Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(EntityService): batched transactions and ebean updates #8456

Merged
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
0e37907
refactor(rentention): refactor retention
david-leifker Jul 17, 2023
70c5b5e
feat(sql): ebean transaction batches
david-leifker Jul 19, 2023
3f13b99
lint
david-leifker Jul 19, 2023
496362d
fixing tests
david-leifker Jul 19, 2023
dd55590
fix-up patch
david-leifker Jul 19, 2023
e65d540
EntityService refactor
david-leifker Jul 22, 2023
e795eca
Merge remote-tracking branch 'origin/master' into ebean-batched-trans…
david-leifker Jul 22, 2023
f87b74e
Fix merge issues
david-leifker Jul 23, 2023
fc4e48b
Complete futures for async produce
david-leifker Jul 24, 2023
85b51e4
Add metrics around exceptions, skip retry on key aspects
david-leifker Jul 24, 2023
c29a206
Merge remote-tracking branch 'origin/master' into ebean-batched-trans…
david-leifker Jul 24, 2023
843cf06
Use fully qualified path for docker task lookup
david-leifker Jul 25, 2023
d19c533
Avoid pulling mae/mce consumers in smoke-tests (not used)
david-leifker Jul 25, 2023
f448e0e
try non-standalone
david-leifker Jul 25, 2023
38f2ba8
drop kafka-setup
david-leifker Jul 25, 2023
3122414
kafka is required
david-leifker Jul 26, 2023
da47ea1
disable ES threshold
david-leifker Jul 27, 2023
fcf883b
Merge remote-tracking branch 'datahub-project/master' into ebean-batc…
david-leifker Jul 28, 2023
846d95b
fix(test): increase siblings.js test stability
david-leifker Aug 1, 2023
05c9dad
Merge branch 'master' into siblings-cypress-fix
david-leifker Aug 1, 2023
2e6f377
Merge remote-tracking branch 'origin/siblings-cypress-fix' into ebean…
david-leifker Aug 1, 2023
abcfae0
Merge branch 'master' into ebean-batched-transactions
david-leifker Aug 1, 2023
9ef881b
lint
david-leifker Aug 1, 2023
1fe33b6
drop mce/mae consumers, space issues
david-leifker Aug 2, 2023
bd752be
add multi-threading test
david-leifker Aug 11, 2023
4c5fbae
Merge remote-tracking branch 'origin/master' into ebean-batched-trans…
david-leifker Aug 11, 2023
95cbe64
Merge remote-tracking branch 'origin/master' into ebean-batched-trans…
david-leifker Aug 11, 2023
8b66642
fix merge, ebean upgrade
david-leifker Aug 11, 2023
2f6da4c
Improve tests
david-leifker Aug 11, 2023
9ba652e
Merge branch 'master' into ebean-batched-transactions
david-leifker Aug 11, 2023
12eb77e
Fix deserialization
david-leifker Aug 12, 2023
01e489b
only add additional mcp for upsert/create ops
david-leifker Aug 12, 2023
92a3cf1
Merge branch 'master' into ebean-batched-transactions
david-leifker Aug 12, 2023
028f3aa
fix AspectUtils test from patch to upsert
david-leifker Aug 12, 2023
714d67d
Merge remote-tracking branch 'origin/master' into ebean-batched-trans…
david-leifker Aug 18, 2023
cb98802
temp fix incorrect head tag
david-leifker Aug 19, 2023
96a3e56
Merge branch 'master' into ebean-batched-transactions
david-leifker Aug 22, 2023
c6822c5
Merge branch 'master' into ebean-batched-transactions
david-leifker Aug 25, 2023
7ff96a6
Merge branch 'ebean-batched-transactions' of github.com:david-leifker…
david-leifker Aug 26, 2023
7ff7abb
generate default aspects even on patch
david-leifker Aug 29, 2023
738c19b
Merge branch 'master' into ebean-batched-transactions
david-leifker Aug 29, 2023
7f66861
fix patch with default aspects
david-leifker Sep 2, 2023
8428c90
Merge branch 'master' into ebean-batched-transactions
david-leifker Sep 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions .github/workflows/docker-unified.yml
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ jobs:
kafka_setup_build,
mysql_setup_build,
elasticsearch_setup_build,
mae_consumer_build,
mce_consumer_build,
# mae_consumer_build,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temporary until space issues can be resolved.

# mce_consumer_build,
datahub_upgrade_build,
]
steps:
Expand Down Expand Up @@ -475,24 +475,21 @@ jobs:
if: ${{ needs.setup.outputs.publish != 'true' }}
with:
image: ${{ env.DATAHUB_ELASTIC_SETUP_IMAGE }}:${{ needs.setup.outputs.unique_tag }}
- name: Download MCE Consumer image
uses: ishworkh/docker-image-artifact-download@v1
if: ${{ needs.setup.outputs.publish != 'true' }}
with:
image: ${{ env.DATAHUB_MCE_CONSUMER_IMAGE }}:${{ needs.setup.outputs.unique_tag }}
- name: Download MAE Consumer image
uses: ishworkh/docker-image-artifact-download@v1
if: ${{ needs.setup.outputs.publish != 'true' }}
with:
image: ${{ env.DATAHUB_MAE_CONSUMER_IMAGE }}:${{ needs.setup.outputs.unique_tag }}
# - name: Download MCE Consumer image
# uses: ishworkh/docker-image-artifact-download@v1
# if: ${{ needs.setup.outputs.publish != 'true' }}
# with:
# image: ${{ env.DATAHUB_MCE_CONSUMER_IMAGE }}:${{ needs.setup.outputs.unique_tag }}
# - name: Download MAE Consumer image
# uses: ishworkh/docker-image-artifact-download@v1
# if: ${{ needs.setup.outputs.publish != 'true' }}
# with:
# image: ${{ env.DATAHUB_MAE_CONSUMER_IMAGE }}:${{ needs.setup.outputs.unique_tag }}
- name: Download upgrade image
uses: ishworkh/docker-image-artifact-download@v1
if: ${{ needs.setup.outputs.publish != 'true' }}
with:
image: ${{ env.DATAHUB_UPGRADE_IMAGE }}:${{ needs.setup.outputs.unique_tag }}
- name: Disable datahub-actions
run: |
yq -i 'del(.services.datahub-actions)' docker/quickstart/docker-compose-without-neo4j.quickstart.yml
- name: run quickstart
env:
DATAHUB_TELEMETRY_ENABLED: false
Expand Down
6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ buildscript {
ext.logbackClassic = '1.2.12'
ext.hadoop3Version = '3.3.5'
ext.kafkaVersion = '2.3.0'
ext.ebeanVersion = '12.16.1'

ext.docker_registry = 'linkedin'

Expand Down Expand Up @@ -86,8 +87,9 @@ project.ext.externalDependency = [
'dgraph4j' : 'io.dgraph:dgraph4j:21.03.1',
'dropwizardMetricsCore': 'io.dropwizard.metrics:metrics-core:4.2.3',
'dropwizardMetricsJmx': 'io.dropwizard.metrics:metrics-jmx:4.2.3',
'ebean': 'io.ebean:ebean:11.33.3',
'ebeanAgent': 'io.ebean:ebean-agent:11.27.1',
'ebean': 'io.ebean:ebean:' + ebeanVersion,
'ebeanAgent': 'io.ebean:ebean-agent:' + ebeanVersion,
'ebeanDdl': 'io.ebean:ebean-ddl-generator:' + ebeanVersion,
'elasticSearchRest': 'org.elasticsearch.client:elasticsearch-rest-high-level-client:' + elasticsearchVersion,
'elasticSearchTransport': 'org.elasticsearch.client:transport:' + elasticsearchVersion,
'findbugsAnnotations': 'com.google.code.findbugs:annotations:3.0.1',
Expand Down
2 changes: 1 addition & 1 deletion datahub-frontend/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ task unversionZip(type: Copy, dependsOn: [':datahub-web-react:build', dist]) {
into "${buildDir}/docker/"
rename "datahub-frontend-${version}.zip", "datahub-frontend.zip"
}
tasks.getByName("docker").dependsOn(unversionZip)
tasks.getByPath(":datahub-frontend:docker").dependsOn(unversionZip)

task cleanLocalDockerImages {
doLast {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,24 @@
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.MetadataChangeProposal;
import org.mockito.Mockito;

import java.util.List;


public class TestUtils {

public static EntityService getMockEntityService() {
EntityRegistry registry = new ConfigEntityRegistry(TestUtils.class.getResourceAsStream("/test-entity-registry.yaml"));
EntityService mockEntityService = Mockito.mock(EntityService.class);
Mockito.when(mockEntityService.getEntityRegistry()).thenReturn(registry);
return mockEntityService;
}

public static QueryContext getMockAllowContext() {
return getMockAllowContext("urn:li:corpuser:test");
}
Expand Down Expand Up @@ -88,25 +100,47 @@ public static QueryContext getMockDenyContext(String actorUrn, AuthorizationRequ
}

public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations, MetadataChangeProposal proposal) {
verifyIngestProposal(mockService, numberOfInvocations, List.of(proposal));
}

public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations, List<MetadataChangeProposal> proposals) {
AspectsBatchImpl batch = AspectsBatchImpl.builder()
.mcps(proposals, mockService.getEntityRegistry())
.build();
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal(
Mockito.eq(batch),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifySingleIngestProposal(EntityService mockService, int numberOfInvocations, MetadataChangeProposal proposal) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal(
Mockito.eq(proposal),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
Mockito.eq(proposal),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal(
Mockito.any(MetadataChangeProposal.class),
Mockito.any(AspectsBatchImpl.class),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifySingleIngestProposal(EntityService mockService, int numberOfInvocations) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal(
Mockito.any(MetadataChangeProposal.class),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifyNoIngestProposal(EntityService mockService) {
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.any(AuditStamp.class), Mockito.anyBoolean());
Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean());
}

private TestUtils() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class DeleteAssertionResolverTest {
public void testGetSuccess() throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();
Mockito.when(mockService.exists(Urn.createFromString(TEST_ASSERTION_URN))).thenReturn(true);
Mockito.when(mockService.getAspect(
Urn.createFromString(TEST_ASSERTION_URN),
Expand Down Expand Up @@ -78,7 +78,7 @@ public void testGetSuccess() throws Exception {
public void testGetSuccessNoAssertionInfoFound() throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();
Mockito.when(mockService.exists(Urn.createFromString(TEST_ASSERTION_URN))).thenReturn(true);
Mockito.when(mockService.getAspect(
Urn.createFromString(TEST_ASSERTION_URN),
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testGetSuccessAssertionAlreadyRemoved() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();
Mockito.when(mockService.exists(Urn.createFromString(TEST_ASSERTION_URN))).thenReturn(false);

DeleteAssertionResolver resolver = new DeleteAssertionResolver(mockClient, mockService);
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testGetSuccessAssertionAlreadyRemoved() throws Exception {
public void testGetUnauthorized() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();
Mockito.when(mockService.exists(Urn.createFromString(TEST_ASSERTION_URN))).thenReturn(true);
Mockito.when(mockService.getAspect(
Urn.createFromString(TEST_ASSERTION_URN),
Expand Down Expand Up @@ -189,7 +189,7 @@ public void testGetEntityClientException() throws Exception {
Mockito.any(),
Mockito.any(Authentication.class));

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();
Mockito.when(mockService.exists(Urn.createFromString(TEST_ASSERTION_URN))).thenReturn(true);

DeleteAssertionResolver resolver = new DeleteAssertionResolver(mockClient, mockService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
import com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetchingEnvironment;

import java.util.List;
import java.util.concurrent.CompletionException;
import org.mockito.Mockito;
import org.testng.annotations.Test;
Expand All @@ -29,7 +32,7 @@ public class BatchUpdateSoftDeletedResolverTest {

@Test
public void testGetSuccessNoExistingStatus() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.when(mockService.getAspect(
Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN_1)),
Expand Down Expand Up @@ -61,20 +64,17 @@ public void testGetSuccessNoExistingStatus() throws Exception {

final MetadataChangeProposal proposal1 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_1),
STATUS_ASPECT_NAME, newStatus);

verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_2),
STATUS_ASPECT_NAME, newStatus);

verifyIngestProposal(mockService, 1, proposal2);
verifyIngestProposal(mockService, 1, List.of(proposal1, proposal2));
}

@Test
public void testGetSuccessExistingStatus() throws Exception {
final Status originalStatus = new Status().setRemoved(true);

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.when(mockService.getAspect(
Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN_1)),
Expand Down Expand Up @@ -105,18 +105,15 @@ public void testGetSuccessExistingStatus() throws Exception {

final MetadataChangeProposal proposal1 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_1),
STATUS_ASPECT_NAME, newStatus);

verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_2),
STATUS_ASPECT_NAME, newStatus);

verifyIngestProposal(mockService, 1, proposal2);
verifyIngestProposal(mockService, 1, List.of(proposal1, proposal2));
}

@Test
public void testGetFailureResourceDoesNotExist() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.when(mockService.getAspect(
Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN_1)),
Expand Down Expand Up @@ -148,7 +145,7 @@ public void testGetFailureResourceDoesNotExist() throws Exception {

@Test
public void testGetUnauthorized() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

BatchUpdateSoftDeletedResolver resolver = new BatchUpdateSoftDeletedResolver(mockService);

Expand All @@ -166,10 +163,10 @@ public void testGetUnauthorized() throws Exception {

@Test
public void testGetEntityClientException() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal(
Mockito.any(),
Mockito.any(AspectsBatchImpl.class),
Mockito.any(AuditStamp.class), Mockito.anyBoolean());

BatchUpdateSoftDeletedResolver resolver = new BatchUpdateSoftDeletedResolver(mockService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
import com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetchingEnvironment;

import java.util.List;
import java.util.concurrent.CompletionException;
import org.mockito.Mockito;
import org.testng.annotations.Test;
Expand All @@ -30,7 +33,7 @@ public class BatchUpdateDeprecationResolverTest {

@Test
public void testGetSuccessNoExistingDeprecation() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.when(mockService.getAspect(
Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN_1)),
Expand Down Expand Up @@ -68,12 +71,10 @@ public void testGetSuccessNoExistingDeprecation() throws Exception {

final MetadataChangeProposal proposal1 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_1),
DEPRECATION_ASPECT_NAME, newDeprecation);

verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_2),
DEPRECATION_ASPECT_NAME, newDeprecation);
verifyIngestProposal(mockService, 1, proposal2);

verifyIngestProposal(mockService, 1, List.of(proposal1, proposal2));
}

@Test
Expand All @@ -83,7 +84,7 @@ public void testGetSuccessExistingDeprecation() throws Exception {
.setNote("")
.setActor(UrnUtils.getUrn("urn:li:corpuser:test"));

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.when(mockService.getAspect(
Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN_1)),
Expand Down Expand Up @@ -120,18 +121,15 @@ public void testGetSuccessExistingDeprecation() throws Exception {

final MetadataChangeProposal proposal1 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_1),
DEPRECATION_ASPECT_NAME, newDeprecation);

verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_2),
DEPRECATION_ASPECT_NAME, newDeprecation);

verifyIngestProposal(mockService, 1, proposal2);
verifyIngestProposal(mockService, 1, List.of(proposal1, proposal2));
}

@Test
public void testGetFailureResourceDoesNotExist() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.when(mockService.getAspect(
Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN_1)),
Expand Down Expand Up @@ -164,7 +162,7 @@ public void testGetFailureResourceDoesNotExist() throws Exception {

@Test
public void testGetUnauthorized() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

BatchUpdateDeprecationResolver resolver = new BatchUpdateDeprecationResolver(mockService);

Expand All @@ -183,10 +181,10 @@ public void testGetUnauthorized() throws Exception {

@Test
public void testGetEntityClientException() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal(
Mockito.any(),
Mockito.any(AspectsBatchImpl.class),
Mockito.any(AuditStamp.class), Mockito.anyBoolean());

BatchUpdateDeprecationResolver resolver = new BatchUpdateDeprecationResolver(mockService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testGetSuccessNoExistingDeprecation() throws Exception {
.setUrn(Urn.createFromString(TEST_ENTITY_URN))
.setAspects(new EnvelopedAspectMap(Collections.emptyMap()))));

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();
Mockito.when(mockService.exists(Urn.createFromString(TEST_ENTITY_URN))).thenReturn(true);

UpdateDeprecationResolver resolver = new UpdateDeprecationResolver(mockClient, mockService);
Expand Down
Loading
Loading