Skip to content

Commit

Permalink
feat: implement backfill endpoint with aspect routing (linkedin#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhixuanjia authored May 23, 2022
1 parent 57937d2 commit dcb00a9
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 18 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ project.ext.externalDependency = [
'junitJupiterParams': "org.junit.jupiter:junit-jupiter-params:$junitJupiterVersion",
'junitJupiterEngine': "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion",
'junitVintageEngine': "org.junit.vintage:junit-vintage-engine:$junitJupiterVersion",
'lombok': 'org.projectlombok:lombok:1.18.12',
'lombok': 'org.projectlombok:lombok:1.18.22',
'mockito': 'org.mockito:mockito-core:3.0.0',
'mockitoInline': 'org.mockito:mockito-inline:3.0.0',
'neo4jHarness': 'org.neo4j.test:neo4j-harness:3.4.11',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,15 @@ public Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTe
return backfill(BackfillMode.BACKFILL_ALL, aspectClasses, urns);
}

/**
* Backfill a set of aspects for each corresponding urn.
*/
public void backfill(@Nonnull Map<URN, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> urnToAspects) {
urnToAspects.forEach((urn, aspects) -> {
aspects.forEach((aspectClass, aspect) -> aspect.ifPresent(value -> backfill(BackfillMode.BACKFILL_ALL, value, urn)));
});
}

/**
* Similar to {@link #backfill(Set, Set)} but does a scoped backfill.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
package com.linkedin.metadata.restli;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.restli.server.RestLiServiceException;
import java.util.Map;
import java.util.Set;


/**
* A client interacts with standard GMS APIs.
*/
public abstract class BaseAspectRoutingGmsClient {
public abstract class BaseAspectRoutingGmsClient<ASPECT extends RecordTemplate> {

/**
* Retrieves the latest version of the routing aspect for an entity.
*/
public abstract <KEY, ASPECT extends RecordTemplate> ASPECT get(KEY id) throws RestLiServiceException;
public abstract <URN extends Urn> ASPECT get(URN urn) throws RestLiServiceException;

/**
* Batch retrieve the latest version of the routing aspect for a set of entities.
*/
public abstract <URN extends Urn> Map<URN, ASPECT> batchGet(Set<URN> urn) throws RestLiServiceException;

/**
* Ingests the latest version of the routing aspect for an entity.
*/
public abstract <KEY, ASPECT extends RecordTemplate> void ingest(KEY id, ASPECT aspect) throws RestLiServiceException;
public abstract <URN extends Urn> void ingest(URN urn, ASPECT aspect) throws RestLiServiceException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -81,7 +84,7 @@ public BaseAspectRoutingResource(@Nonnull Class<SNAPSHOT> snapshotClass, @Nonnul
* Get the client of GMS that routing aspect will be routed to.
* @return A client of the GMS for routing aspect.
*/
public abstract BaseAspectRoutingGmsClient getGmsClient();
public abstract BaseAspectRoutingGmsClient<ROUTING_ASPECT> getGmsClient();

/**
* Retrieves the value for an entity that is made up of latest versions of specified aspects.
Expand All @@ -97,7 +100,7 @@ public Task<VALUE> get(@Nonnull KEY id,

// Get entity from aspect GMS
if (containsRoutingAspect(aspectClasses) && aspectClasses.size() == 1) {
return merge(null, getGmsClient().get(id));
return merge(null, getGmsClient().get(toUrn(id)));
}

// The assumption is main GMS must have this entity.
Expand All @@ -116,7 +119,7 @@ public Task<VALUE> get(@Nonnull KEY id,
// TODO: Confirm ownership gms will return null value if there is no ownership aspect for id.
// Need to read from both aspect GMS and local DAO.
final VALUE valueFromLocalDao = getValueFromLocalDao(id, withoutRoutingAspect);
final ROUTING_ASPECT aspectValueFromGms = getGmsClient().get(id);
final ROUTING_ASPECT aspectValueFromGms = getGmsClient().get(toUrn(id));

return merge(valueFromLocalDao, aspectValueFromGms);
});
Expand Down Expand Up @@ -155,6 +158,47 @@ public Task<SNAPSHOT> getSnapshot(@ActionParam(PARAM_URN) @Nonnull String urnStr
});
}

/**
* An action method for emitting MAE backfill messages for a set of entities.
*/
@Action(name = ACTION_BACKFILL_WITH_URNS)
@Nonnull
public Task<BackfillResult> backfill(@ActionParam(PARAM_URNS) @Nonnull String[] urns,
@ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) {

return RestliUtils.toTask(() -> {
final Set<URN> urnSet = Arrays.stream(urns).map(urnString -> parseUrnParam(urnString)).collect(Collectors.toSet());
final Set<Class<? extends RecordTemplate>> aspectClasses = parseAspectsParam(aspectNames);
Map<URN, Map<Class<? extends RecordTemplate>, java.util.Optional<? extends RecordTemplate>>> urnToAspect = new HashMap<>();

if (!containsRoutingAspect(aspectClasses)) {
// Backfill only needs local DAO.
return buildBackfillResult(getLocalDAO().backfill(aspectClasses, urnSet));
}

if (containsRoutingAspect(aspectClasses) && aspectClasses.size() == 1) {
// Backfill only needs aspect GMS
getGmsClient().batchGet(urnSet).forEach((urn, routingAspect) -> {
urnToAspect.put(urn, Collections.singletonMap(_routingAspectClass, java.util.Optional.ofNullable(routingAspect)));
});
} else {
getGmsClient().batchGet(urnSet).forEach((urn, routingAspect) -> {
Map<Class<? extends RecordTemplate>, java.util.Optional<? extends RecordTemplate>> aspectMap = new HashMap<>();
aspectMap.put(_routingAspectClass, java.util.Optional.ofNullable(routingAspect));
urnToAspect.put(urn, aspectMap);
});

getLocalDAO().get(removeRoutingAspect(aspectClasses), urnSet).forEach((urn, aspect) -> {
urnToAspect.get(urn).putAll(aspect);
});
}

// MAE is emitted in LocalDAO so we still need to invoke backfill in LocalDAO.
getLocalDAO().backfill(urnToAspect);
return buildBackfillResult(urnToAspect);
});
}

@Nonnull
@Override
protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
Expand All @@ -165,7 +209,7 @@ protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
ModelUtils.getAspectsFromSnapshot(snapshot).stream().forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (aspect.getClass().equals(_routingAspectClass)) {
getGmsClient().ingest(toKey(urn), aspect);
getGmsClient().ingest(urn, (ROUTING_ASPECT) aspect);
} else {
getLocalDAO().add(urn, aspect, auditStamp);
}
Expand Down Expand Up @@ -235,7 +279,7 @@ private List<ASPECT_UNION> getAspectsFromLocalDao(URN urn, Set<Class<? extends R
@Nonnull
@ParametersAreNonnullByDefault
private List<ASPECT_UNION> getAspectsFromGms(URN urn) {
final ROUTING_ASPECT routingAspect = getGmsClient().get(toKey(urn));
final ROUTING_ASPECT routingAspect = getGmsClient().get(urn);
if (routingAspect == null) {
return new ArrayList<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public Task<BackfillResult> backfill(@ActionParam(PARAM_MODE) @Nonnull BackfillM
}

@Nonnull
private BackfillResult buildBackfillResult(@Nonnull Map<URN, Map<Class<? extends RecordTemplate>,
protected BackfillResult buildBackfillResult(@Nonnull Map<URN, Map<Class<? extends RecordTemplate>,
java.util.Optional<? extends RecordTemplate>>> backfilledAspects) {

final Set<URN> urns = new TreeSet<>(Comparator.comparing(Urn::toString));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -160,7 +161,7 @@ public void testGetWithRoutingAspect() {
when(_mockLocalDAO.exists(urn)).thenReturn(true);
when(_mockLocalDAO.get(new HashSet<>(Arrays.asList(aspectBarKey)))).thenReturn(
Collections.singletonMap(aspectBarKey, Optional.of(bar)));
when(_mockGmsClient.get(makeResourceKey(urn))).thenReturn(foo);
when(_mockGmsClient.get(urn)).thenReturn(foo);

EntityValue value = runAndWait(_resource.get(makeResourceKey(urn), new String[]{AspectFoo.class.getCanonicalName(), AspectBar.class.getCanonicalName()}));

Expand Down Expand Up @@ -197,7 +198,7 @@ public void testGetWithOnlyRoutingAspect() {
AspectFoo foo = new AspectFoo().setValue("foo");

when(_mockLocalDAO.exists(urn)).thenReturn(true);
when(_mockGmsClient.get(makeResourceKey(urn))).thenReturn(foo);
when(_mockGmsClient.get(urn)).thenReturn(foo);

EntityValue value = runAndWait(_resource.get(makeResourceKey(urn), new String[]{AspectFoo.class.getCanonicalName()}));

Expand All @@ -218,7 +219,7 @@ public void testGetWithEmptyValueFromLocalDao() {
when(_mockLocalDAO.exists(urn)).thenReturn(true);
when(_mockLocalDAO.get(new HashSet<>(Arrays.asList(aspectBarKey)))).thenReturn(
Collections.singletonMap(aspectBarKey, Optional.empty()));
when(_mockGmsClient.get(makeResourceKey(urn))).thenReturn(foo);
when(_mockGmsClient.get(urn)).thenReturn(foo);

EntityValue value = runAndWait(_resource.get(makeResourceKey(urn), new String[]{AspectFoo.class.getCanonicalName(), AspectBar.class.getCanonicalName()}));

Expand All @@ -237,7 +238,7 @@ public void testGetWithNullValueFromGms() {
when(_mockLocalDAO.exists(urn)).thenReturn(true);
when(_mockLocalDAO.get(new HashSet<>(Arrays.asList(aspectBarKey)))).thenReturn(
Collections.singletonMap(aspectBarKey, Optional.of(bar)));
when(_mockGmsClient.get(makeResourceKey(urn))).thenReturn(null);
when(_mockGmsClient.get(urn)).thenReturn(null);

EntityValue value = runAndWait(_resource.get(makeResourceKey(urn), new String[]{AspectFoo.class.getCanonicalName(), AspectBar.class.getCanonicalName()}));

Expand All @@ -258,7 +259,7 @@ public void testIngestWithRoutingAspect() {
runAndWait(_resource.ingest(snapshot));

verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any());
verify(_mockGmsClient, times(1)).ingest(eq(_resource.toKey(urn)), eq(foo));
verify(_mockGmsClient, times(1)).ingest(eq(urn), eq(foo));
verifyNoMoreInteractions(_mockLocalDAO);
}

Expand Down Expand Up @@ -286,7 +287,7 @@ public void testIngestWithOnlyRoutingAspect() {
runAndWait(_resource.ingest(snapshot));

verifyZeroInteractions(_mockLocalDAO);
verify(_mockGmsClient, times(1)).ingest(eq(_resource.toKey(urn)), eq(foo));
verify(_mockGmsClient, times(1)).ingest(eq(urn), eq(foo));
verifyNoMoreInteractions(_mockGmsClient);
}

Expand Down Expand Up @@ -314,7 +315,7 @@ public void testGetSnapshotWithRoutingAspect() {
AspectKey<FooUrn, ? extends RecordTemplate> barKey = new AspectKey<>(AspectBar.class, urn, LATEST_VERSION);
Set<AspectKey<FooUrn, ? extends RecordTemplate>> aspectKeys = ImmutableSet.of(barKey);
when(_mockLocalDAO.get(aspectKeys)).thenReturn(ImmutableMap.of(barKey, Optional.of(bar)));
when(_mockGmsClient.get(makeResourceKey(urn))).thenReturn(foo);
when(_mockGmsClient.get(urn)).thenReturn(foo);

EntitySnapshot snapshot = runAndWait(_resource.getSnapshot(urn.toString(),
new String[]{AspectFoo.class.getCanonicalName(), AspectBar.class.getCanonicalName()}));
Expand All @@ -329,7 +330,7 @@ public void testGetSnapshotWithRoutingAspect() {
public void testGetSnapshotWithOnlyRoutingAspect() {
FooUrn urn = makeFooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
when(_mockGmsClient.get(makeResourceKey(urn))).thenReturn(foo);
when(_mockGmsClient.get(urn)).thenReturn(foo);

EntitySnapshot snapshot = runAndWait(_resource.getSnapshot(urn.toString(), new String[]{AspectFoo.class.getCanonicalName()}));
assertEquals(snapshot.getUrn(), urn);
Expand All @@ -341,4 +342,71 @@ public void testGetSnapshotWithOnlyRoutingAspect() {
assertEquals(aspects, ImmutableSet.of(foo));
verifyZeroInteractions(_mockLocalDAO);
}

@Test
public void testBackfillWithRoutingAspect() {
FooUrn fooUrn1 = makeFooUrn(1);
FooUrn fooUrn2 = makeFooUrn(2);
AspectFoo foo1 = new AspectFoo().setValue("foo1");
AspectBar bar1 = new AspectBar().setValue("bar1");
AspectFoo foo2 = new AspectFoo().setValue("foo2");
AspectBar bar2 = new AspectBar().setValue("bar2");

Map<FooUrn, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> daoResult =
ImmutableMap.of(fooUrn1, Collections.singletonMap(AspectBar.class, Optional.of(bar1)),
fooUrn2, Collections.singletonMap(AspectBar.class, Optional.of(bar2)));

when(_mockLocalDAO.get(Collections.singleton(AspectBar.class), ImmutableSet.of(fooUrn1, fooUrn2))).thenReturn(daoResult);
when(_mockGmsClient.batchGet(ImmutableSet.of(fooUrn1, fooUrn2))).thenReturn(ImmutableMap.of(fooUrn1, foo1, fooUrn2, foo2));

BackfillResult backfillResult = runAndWait(_resource.backfill(new String[]{fooUrn1.toString(), fooUrn2.toString()},
new String[]{AspectFoo.class.getCanonicalName(), AspectBar.class.getCanonicalName()}));

assertEquals(backfillResult.getEntities().size(), 2);
assertTrue(backfillResult.getEntities().get(0).getAspects().contains(AspectBar.class.getCanonicalName()));
assertTrue(backfillResult.getEntities().get(0).getAspects().contains(AspectFoo.class.getCanonicalName()));
assertTrue(backfillResult.getEntities().get(1).getAspects().contains(AspectBar.class.getCanonicalName()));
assertTrue(backfillResult.getEntities().get(1).getAspects().contains(AspectFoo.class.getCanonicalName()));
verify(_mockLocalDAO, times(1)).backfill(anyMap());
}

@Test
public void testBackfillWithoutRoutingAspect() {
FooUrn fooUrn1 = makeFooUrn(1);
FooUrn fooUrn2 = makeFooUrn(2);
AspectBar bar1 = new AspectBar().setValue("bar1");
AspectBar bar2 = new AspectBar().setValue("bar2");

Map<FooUrn, Map<Class<? extends RecordTemplate>, Optional<? extends RecordTemplate>>> daoResult =
ImmutableMap.of(fooUrn1, Collections.singletonMap(AspectBar.class, Optional.of(bar1)),
fooUrn2, Collections.singletonMap(AspectBar.class, Optional.of(bar2)));

when(_mockLocalDAO.backfill(Collections.singleton(AspectBar.class), ImmutableSet.of(fooUrn1, fooUrn2))).thenReturn(daoResult);
BackfillResult backfillResult = runAndWait(_resource.backfill(new String[]{fooUrn1.toString(), fooUrn2.toString()},
new String[]{AspectBar.class.getCanonicalName()}));

assertEquals(backfillResult.getEntities().size(), 2);
verifyZeroInteractions(_mockGmsClient);
verify(_mockLocalDAO, times(0)).backfill(anyMap());
}

@Test
public void testBackfillWithOnlyRoutingAspect() {
FooUrn fooUrn1 = makeFooUrn(1);
FooUrn fooUrn2 = makeFooUrn(2);
AspectFoo foo1 = new AspectFoo().setValue("foo1");
AspectFoo foo2 = new AspectFoo().setValue("foo2");

when(_mockGmsClient.batchGet(ImmutableSet.of(fooUrn1, fooUrn2))).thenReturn(ImmutableMap.of(fooUrn1, foo1, fooUrn2, foo2));

BackfillResult backfillResult = runAndWait(_resource.backfill(new String[]{fooUrn1.toString(), fooUrn2.toString()},
new String[]{AspectFoo.class.getCanonicalName()}));

assertEquals(backfillResult.getEntities().size(), 2);
assertFalse(backfillResult.getEntities().get(0).getAspects().contains(AspectBar.class.getCanonicalName()));
assertTrue(backfillResult.getEntities().get(0).getAspects().contains(AspectFoo.class.getCanonicalName()));
assertFalse(backfillResult.getEntities().get(1).getAspects().contains(AspectBar.class.getCanonicalName()));
assertTrue(backfillResult.getEntities().get(1).getAspects().contains(AspectFoo.class.getCanonicalName()));
verify(_mockLocalDAO, times(1)).backfill(anyMap());
}
}

0 comments on commit dcb00a9

Please sign in to comment.