From 36607ecfd6dcc2f108bf6940c4006ca5d7544723 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Sun, 23 Jul 2023 17:07:11 -0500 Subject: [PATCH] Fix merge issues --- .../resolvers/embed/UpdateEmbedResolver.java | 2 +- .../resolvers/mutate/MutationUtils.java | 2 +- .../mutate/UpdateUserSettingResolver.java | 2 +- .../resolvers/mutate/util/DeleteUtils.java | 4 +- .../mutate/util/DeprecationUtils.java | 4 +- .../resolvers/mutate/util/DomainUtils.java | 4 +- .../resolvers/mutate/util/LabelUtils.java | 4 +- .../resolvers/mutate/util/OwnerUtils.java | 4 +- .../linkedin/datahub/graphql/TestUtils.java | 13 ++- .../BatchUpdateSoftDeletedResolverTest.java | 3 +- .../BatchUpdateDeprecationResolverTest.java | 3 +- .../domain/BatchSetDomainResolverTest.java | 3 +- .../embed/UpdateEmbedResolverTest.java | 5 +- .../owner/AddOwnersResolverTest.java | 3 +- .../owner/BatchAddOwnersResolverTest.java | 3 +- .../owner/BatchRemoveOwnersResolverTest.java | 3 +- .../resolvers/tag/AddTagsResolverTest.java | 3 +- .../tag/BatchAddTagsResolverTest.java | 9 +- .../tag/BatchRemoveTagsResolverTest.java | 7 +- .../resolvers/term/AddTermsResolverTest.java | 14 +-- .../term/BatchAddTermsResolverTest.java | 3 +- .../term/BatchRemoveTermsResolverTest.java | 3 +- .../metadata/client/JavaEntityClient.java | 8 +- .../metadata/entity/EntityServiceImpl.java | 59 ++++--------- .../linkedin/metadata/entity/EntityUtils.java | 2 +- .../cassandra/CassandraRetentionService.java | 10 +++ .../metadata/entity/ebean/EbeanAspectDao.java | 11 ++- .../entity/ebean/EbeanRetentionService.java | 10 +++ ...spectsBatch.java => AspectsBatchImpl.java} | 33 +++---- .../ebean/transactions/PatchBatchItem.java | 10 ++- .../ebean/transactions/UpsertBatchItem.java | 11 ++- .../entity/validation/ValidationUtils.java | 38 ++++++++ .../metadata/AspectIngestionUtils.java | 8 +- .../entity/EbeanEntityServiceTest.java | 6 +- .../metadata/entity/EntityServiceTest.java | 54 ++++++------ .../token/StatefulTokenService.java | 4 +- .../linkedin/metadata/boot/BootstrapStep.java | 2 +- .../linkedin/metadata/boot/UpgradeStep.java | 4 +- .../boot/steps/BackfillBrowsePathsV2Step.java | 2 +- .../IngestDataPlatformInstancesStep.java | 4 +- .../boot/steps/IngestDataPlatformsStep.java | 4 +- .../IngestDefaultGlobalSettingsStep.java | 2 +- .../boot/steps/IngestOwnershipTypesStep.java | 4 +- .../boot/steps/IngestPoliciesStep.java | 4 +- .../metadata/boot/steps/IngestRolesStep.java | 4 +- .../boot/steps/RestoreDbtSiblingsIndices.java | 2 +- .../steps/UpgradeDefaultBrowsePathsStep.java | 2 +- .../steps/BackfillBrowsePathsV2StepTest.java | 4 +- .../IngestDefaultGlobalSettingsStepTest.java | 4 +- .../RestoreColumnLineageIndicesTest.java | 6 +- .../steps/RestoreGlossaryIndicesTest.java | 6 +- .../UpgradeDefaultBrowsePathsStepTest.java | 8 +- .../openapi/util/MappingUtil.java | 10 ++- .../java/entities/EntitiesControllerTest.java | 6 +- .../resources/entity/AspectResource.java | 12 +-- .../entity/BatchIngestionRunResource.java | 2 +- .../resources/entity/AspectResourceTest.java | 12 +-- .../linkedin/metadata/entity/AspectUtils.java | 37 -------- .../metadata/entity/DeleteEntityService.java | 2 +- .../metadata/entity/EntityService.java | 86 +++++-------------- .../metadata/entity/IngestResult.java | 18 ++++ .../metadata/entity/RetentionService.java | 10 +-- .../metadata/entity/UpdateAspectResult.java | 8 ++ .../transactions/AbstractBatchItem.java | 12 ++- .../entity/transactions/AspectsBatch.java | 22 +++++ 65 files changed, 346 insertions(+), 318 deletions(-) rename metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/{AspectsBatch.java => AspectsBatchImpl.java} (59%) create mode 100644 metadata-service/services/src/main/java/com/linkedin/metadata/entity/IngestResult.java rename {metadata-io/src/main/java/com/linkedin/metadata/entity/ebean => metadata-service/services/src/main/java/com/linkedin/metadata/entity}/transactions/AbstractBatchItem.java (91%) create mode 100644 metadata-service/services/src/main/java/com/linkedin/metadata/entity/transactions/AspectsBatch.java diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/embed/UpdateEmbedResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/embed/UpdateEmbedResolver.java index 8fc931310570af..86b8eb5564152c 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/embed/UpdateEmbedResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/embed/UpdateEmbedResolver.java @@ -58,7 +58,7 @@ public CompletableFuture get(DataFetchingEnvironment environment) throw updateEmbed(embed, input); final MetadataChangeProposal proposal = buildMetadataChangeProposalWithUrn(entityUrn, EMBED_ASPECT_NAME, embed); - _entityService.ingestSingleProposal( + _entityService.ingestProposal( proposal, new AuditStamp().setActor(UrnUtils.getUrn(context.getActorUrn())).setTime(System.currentTimeMillis()), false diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutationUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutationUtils.java index 524080bc94edb3..0cf9acd62f736c 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutationUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/MutationUtils.java @@ -29,7 +29,7 @@ private MutationUtils() { } public static void persistAspect(Urn urn, String aspectName, RecordTemplate aspect, Urn actor, EntityService entityService) { final MetadataChangeProposal proposal = buildMetadataChangeProposalWithUrn(urn, aspectName, aspect); - entityService.ingestSingleProposal(proposal, getAuditStamp(actor), false); + entityService.ingestProposal(proposal, getAuditStamp(actor), false); } /** diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolver.java index e97c20b77434e2..8c1d32c470f44b 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/UpdateUserSettingResolver.java @@ -56,7 +56,7 @@ public CompletableFuture get(DataFetchingEnvironment environment) throw MetadataChangeProposal proposal = buildMetadataChangeProposalWithUrn(actor, CORP_USER_SETTINGS_ASPECT_NAME, newSettings); - _entityService.ingestSingleProposal(proposal, getAuditStamp(actor), false); + _entityService.ingestProposal(proposal, getAuditStamp(actor), false); return true; } catch (Exception e) { diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeleteUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeleteUtils.java index 9ec91b4784f087..94acb5a75918f6 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeleteUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeleteUtils.java @@ -12,7 +12,7 @@ import com.linkedin.metadata.Constants; import com.linkedin.metadata.authorization.PoliciesConfig; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.mxe.MetadataChangeProposal; import java.util.ArrayList; import java.util.List; @@ -73,7 +73,7 @@ private static MetadataChangeProposal buildSoftDeleteProposal( } private static void ingestChangeProposals(List changes, EntityService entityService, Urn actor) { - entityService.ingestProposal(AspectsBatch.builder() + entityService.ingestProposal(AspectsBatchImpl.builder() .mcps(changes, entityService.getEntityRegistry()).build(), getAuditStamp(actor), false); } } \ No newline at end of file diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeprecationUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeprecationUtils.java index 4d3b0770bd3634..0a1712aaf9ab46 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeprecationUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DeprecationUtils.java @@ -14,7 +14,7 @@ import com.linkedin.metadata.Constants; import com.linkedin.metadata.authorization.PoliciesConfig; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.mxe.MetadataChangeProposal; import java.util.ArrayList; import java.util.List; @@ -88,7 +88,7 @@ private static MetadataChangeProposal buildUpdateDeprecationProposal( } private static void ingestChangeProposals(List changes, EntityService entityService, Urn actor) { - entityService.ingestProposal(AspectsBatch.builder() + entityService.ingestProposal(AspectsBatchImpl.builder() .mcps(changes, entityService.getEntityRegistry()).build(), getAuditStamp(actor), false); } } \ No newline at end of file diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DomainUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DomainUtils.java index 4da6ee5c2db07c..3809fcee286497 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DomainUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/DomainUtils.java @@ -14,7 +14,7 @@ import com.linkedin.metadata.Constants; import com.linkedin.metadata.authorization.PoliciesConfig; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.mxe.MetadataChangeProposal; import java.util.ArrayList; import java.util.List; @@ -87,7 +87,7 @@ public static void validateDomain(Urn domainUrn, EntityService entityService) { } private static void ingestChangeProposals(List changes, EntityService entityService, Urn actor) { - entityService.ingestProposal(AspectsBatch.builder() + entityService.ingestProposal(AspectsBatchImpl.builder() .mcps(changes, entityService.getEntityRegistry()).build(), getAuditStamp(actor), false); } } \ No newline at end of file diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/LabelUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/LabelUtils.java index ffb499a3a833de..eb2d49ce987e38 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/LabelUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/LabelUtils.java @@ -20,7 +20,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils; import com.linkedin.metadata.authorization.PoliciesConfig; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.schema.EditableSchemaFieldInfo; import com.linkedin.schema.EditableSchemaMetadata; @@ -559,7 +559,7 @@ private static GlossaryTermAssociationArray removeTermsIfExists(GlossaryTerms te } private static void ingestChangeProposals(List changes, EntityService entityService, Urn actor) { - entityService.ingestProposal(AspectsBatch.builder() + entityService.ingestProposal(AspectsBatchImpl.builder() .mcps(changes, entityService.getEntityRegistry()).build(), getAuditStamp(actor), false); } } diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java index 9563e55f91f705..cf9c2a4ba6adcc 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/mutate/util/OwnerUtils.java @@ -21,7 +21,7 @@ import com.linkedin.metadata.Constants; import com.linkedin.metadata.authorization.PoliciesConfig; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.mxe.MetadataChangeProposal; import java.util.ArrayList; import java.util.List; @@ -270,7 +270,7 @@ public static Boolean validateRemoveInput( } private static void ingestChangeProposals(List changes, EntityService entityService, Urn actor) { - entityService.ingestProposal(AspectsBatch.builder() + entityService.ingestProposal(AspectsBatchImpl.builder() .mcps(changes, entityService.getEntityRegistry()).build(), getAuditStamp(actor), false); } diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java index 67afd4d3a0cc86..272a93fa1989c9 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/TestUtils.java @@ -9,7 +9,7 @@ import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.UrnUtils; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.models.registry.ConfigEntityRegistry; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.mxe.MetadataChangeProposal; @@ -104,7 +104,7 @@ public static void verifyIngestProposal(EntityService mockService, int numberOfI } public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations, List proposals) { - AspectsBatch batch = AspectsBatch.builder() + AspectsBatchImpl batch = AspectsBatchImpl.builder() .mcps(proposals, mockService.getEntityRegistry()) .build(); Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal( @@ -115,7 +115,7 @@ public static void verifyIngestProposal(EntityService mockService, int numberOfI } public static void verifySingleIngestProposal(EntityService mockService, int numberOfInvocations, MetadataChangeProposal proposal) { - Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal( Mockito.eq(proposal), Mockito.any(AuditStamp.class), Mockito.eq(false) @@ -124,14 +124,14 @@ public static void verifySingleIngestProposal(EntityService mockService, int num public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations) { Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal( - Mockito.any(AspectsBatch.class), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.eq(false) ); } public static void verifySingleIngestProposal(EntityService mockService, int numberOfInvocations) { - Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(AuditStamp.class), Mockito.eq(false) @@ -140,8 +140,7 @@ public static void verifySingleIngestProposal(EntityService mockService, int num public static void verifyNoIngestProposal(EntityService mockService) { Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), - Mockito.any(AuditStamp.class), Mockito.anyBoolean()); + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } private TestUtils() { } diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/delete/BatchUpdateSoftDeletedResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/delete/BatchUpdateSoftDeletedResolverTest.java index 6703d8726fa5db..bae6f27a854bc7 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/delete/BatchUpdateSoftDeletedResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/delete/BatchUpdateSoftDeletedResolverTest.java @@ -11,6 +11,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; @@ -165,7 +166,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchUpdateSoftDeletedResolver resolver = new BatchUpdateSoftDeletedResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/deprecation/BatchUpdateDeprecationResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/deprecation/BatchUpdateDeprecationResolverTest.java index 444d7481a23b2f..ce5a02bb573e18 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/deprecation/BatchUpdateDeprecationResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/deprecation/BatchUpdateDeprecationResolverTest.java @@ -12,6 +12,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; @@ -183,7 +184,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchUpdateDeprecationResolver resolver = new BatchUpdateDeprecationResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/domain/BatchSetDomainResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/domain/BatchSetDomainResolverTest.java index 138fd1d9bf0240..8cd3c71a21555b 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/domain/BatchSetDomainResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/domain/BatchSetDomainResolverTest.java @@ -14,6 +14,7 @@ import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; @@ -277,7 +278,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchSetDomainResolver resolver = new BatchSetDomainResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/embed/UpdateEmbedResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/embed/UpdateEmbedResolverTest.java index 3cfb1c55b32759..f1d44fcb472556 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/embed/UpdateEmbedResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/embed/UpdateEmbedResolverTest.java @@ -15,6 +15,7 @@ import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.r2.RemoteInvocationException; import graphql.schema.DataFetchingEnvironment; @@ -136,7 +137,7 @@ public void testGetFailureEntityDoesNotExist() throws Exception { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.eq(false) );; @@ -156,7 +157,7 @@ public void testGetUnauthorized() throws Exception { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.eq(false) ); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java index e1324edaf8a7ff..efc0c5dfcf36d8 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/AddOwnersResolverTest.java @@ -13,6 +13,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.util.OwnerUtils; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.CompletionException; import org.mockito.Mockito; @@ -198,7 +199,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); AddOwnersResolver resolver = new AddOwnersResolver(Mockito.mock(EntityService.class)); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchAddOwnersResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchAddOwnersResolverTest.java index ff119451dd859b..79fc62742f4442 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchAddOwnersResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchAddOwnersResolverTest.java @@ -17,6 +17,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.util.OwnerUtils; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.CompletionException; import org.mockito.Mockito; @@ -280,7 +281,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchAddOwnersResolver resolver = new BatchAddOwnersResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchRemoveOwnersResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchRemoveOwnersResolverTest.java index c76f2ec4acb121..9dc2ec81278069 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchRemoveOwnersResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/owner/BatchRemoveOwnersResolverTest.java @@ -14,6 +14,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.BatchRemoveOwnersResolver; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.CompletionException; import org.mockito.Mockito; @@ -175,7 +176,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchRemoveOwnersResolver resolver = new BatchRemoveOwnersResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/AddTagsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/AddTagsResolverTest.java index 0596924b54a253..268d6a6bc4268d 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/AddTagsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/AddTagsResolverTest.java @@ -13,6 +13,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.AddTagsResolver; import com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.CompletionException; @@ -206,7 +207,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.eq(false)); AddTagsResolver resolver = new AddTagsResolver(Mockito.mock(EntityService.class)); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchAddTagsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchAddTagsResolverTest.java index 6009912f498ec9..651b89359c83fa 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchAddTagsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchAddTagsResolverTest.java @@ -15,6 +15,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; @@ -184,7 +185,7 @@ public void testGetFailureTagDoesNotExist() throws Exception { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @@ -223,7 +224,7 @@ public void testGetFailureResourceDoesNotExist() throws Exception { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @@ -247,7 +248,7 @@ public void testGetUnauthorized() throws Exception { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @@ -256,7 +257,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchAddTagsResolver resolver = new BatchAddTagsResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchRemoveTagsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchRemoveTagsResolverTest.java index 2e4314ef27e544..f302540eba9048 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchRemoveTagsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/tag/BatchRemoveTagsResolverTest.java @@ -16,6 +16,7 @@ import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetchingEnvironment; @@ -179,7 +180,7 @@ public void testGetFailureResourceDoesNotExist() throws Exception { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @@ -203,7 +204,7 @@ public void testGetUnauthorized() throws Exception { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @@ -212,7 +213,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchRemoveTagsResolver resolver = new BatchRemoveTagsResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/AddTermsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/AddTermsResolverTest.java index 94584c3750d2c6..213d21fd35dc1e 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/AddTermsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/AddTermsResolverTest.java @@ -13,7 +13,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.AddTermsResolver; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.CompletionException; import org.mockito.Mockito; @@ -58,7 +58,7 @@ public void testGetSuccessNoExistingTerms() throws Exception { // Unable to easily validate exact payload due to the injected timestamp Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.any(AspectsBatch.class), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.eq(false) ); @@ -104,7 +104,7 @@ public void testGetSuccessExistingTerms() throws Exception { // Unable to easily validate exact payload due to the injected timestamp Mockito.verify(mockService, Mockito.times(1)).ingestProposal( - Mockito.any(AspectsBatch.class), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.eq(false) ); @@ -143,7 +143,7 @@ public void testGetFailureTermDoesNotExist() throws Exception { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @@ -173,7 +173,7 @@ public void testGetFailureResourceDoesNotExist() throws Exception { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @@ -194,7 +194,7 @@ public void testGetUnauthorized() throws Exception { assertThrows(CompletionException.class, () -> resolver.get(mockEnv).join()); Mockito.verify(mockService, Mockito.times(0)).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); } @@ -203,7 +203,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); AddTermsResolver resolver = new AddTermsResolver(Mockito.mock(EntityService.class)); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchAddTermsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchAddTermsResolverTest.java index 77200c8b72a220..8887bb452b478c 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchAddTermsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchAddTermsResolverTest.java @@ -14,6 +14,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.BatchAddTermsResolver; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.CompletionException; import org.mockito.Mockito; @@ -220,7 +221,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchAddTermsResolver resolver = new BatchAddTermsResolver(mockService); diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchRemoveTermsResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchRemoveTermsResolverTest.java index c0f1c06ba0556d..995a4acb8a4676 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchRemoveTermsResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/term/BatchRemoveTermsResolverTest.java @@ -14,6 +14,7 @@ import com.linkedin.datahub.graphql.resolvers.mutate.BatchRemoveTermsResolver; import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import graphql.schema.DataFetchingEnvironment; import java.util.concurrent.CompletionException; import org.mockito.Mockito; @@ -182,7 +183,7 @@ public void testGetEntityClientException() throws Exception { EntityService mockService = getMockEntityService(); Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal( - Mockito.any(), + Mockito.any(AspectsBatchImpl.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean()); BatchRemoveTermsResolver resolver = new BatchRemoveTermsResolver(mockService); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index 7cd9e57cde0669..cae29f3bfee19c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -25,7 +25,9 @@ import com.linkedin.metadata.entity.AspectUtils; import com.linkedin.metadata.entity.DeleteEntityService; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.IngestResult; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; +import com.linkedin.metadata.entity.transactions.AspectsBatch; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.graph.LineageDirection; import com.linkedin.metadata.query.AutoCompleteResult; @@ -536,11 +538,11 @@ public String ingestProposal(@Nonnull final MetadataChangeProposal metadataChang Stream proposalStream = Stream.concat(Stream.of(metadataChangeProposal), additionalChanges.stream()); - AspectsBatch batch = AspectsBatch.builder() + AspectsBatch batch = AspectsBatchImpl.builder() .mcps(proposalStream.collect(Collectors.toList()), _entityService.getEntityRegistry()) .build(); - EntityService.IngestResult one = _entityService.ingestProposal(batch, auditStamp, async).stream() + IngestResult one = _entityService.ingestProposal(batch, auditStamp, async).stream() .findFirst().get(); Urn urn = one.getUrn(); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 6430ab9afdd479..0d76ea19482144 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -34,14 +34,15 @@ import com.linkedin.metadata.aspect.Aspect; import com.linkedin.metadata.aspect.VersionedAspect; import com.linkedin.metadata.entity.ebean.EbeanAspectV2; -import com.linkedin.metadata.entity.ebean.transactions.AbstractBatchItem; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; +import com.linkedin.metadata.entity.transactions.AbstractBatchItem; import com.linkedin.metadata.entity.ebean.transactions.PatchBatchItem; import com.linkedin.metadata.entity.ebean.transactions.UpsertBatchItem; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult; import com.linkedin.metadata.entity.retention.BulkApplyRetentionArgs; import com.linkedin.metadata.entity.retention.BulkApplyRetentionResult; +import com.linkedin.metadata.entity.transactions.AspectsBatch; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; @@ -88,8 +89,6 @@ import javax.annotation.Nullable; import javax.persistence.EntityNotFoundException; -import lombok.Builder; -import lombok.Value; import lombok.extern.slf4j.Slf4j; import static com.linkedin.metadata.Constants.*; @@ -133,34 +132,6 @@ public class EntityServiceImpl implements EntityService { * monotonically increasing version incrementing as usual once the latest version is replaced. */ - @Builder(toBuilder = true) - @Value - public static class UpdateAspectResult { - Urn urn; - UpsertBatchItem request; - RecordTemplate oldValue; - RecordTemplate newValue; - SystemMetadata oldSystemMetadata; - SystemMetadata newSystemMetadata; - MetadataAuditOperation operation; - AuditStamp auditStamp; - long maxVersion; - boolean processedMCL; - Future mclFuture; - } - - @Builder(toBuilder = true) - @Value - public static class IngestResult { - Urn urn; - AbstractBatchItem request; - boolean publishedMCL; - boolean processedMCL; - boolean publishedMCP; - boolean sqlCommitted; - boolean isUpdate; // update else insert - } - private static final int DEFAULT_MAX_TRANSACTION_RETRY = 3; protected final AspectDao _aspectDao; @@ -199,6 +170,7 @@ public EntityServiceImpl( * @param aspectNames aspects to fetch for each urn in urns set * @return a map of provided {@link Urn} to a List containing the requested aspects. */ + @Override public Map> getLatestAspects( @Nonnull final Set urns, @Nonnull final Set aspectNames) { @@ -517,7 +489,8 @@ public ListResult listLatestAspects( * @param systemMetadata system metadata * @return update result */ - public List ingestAspects(Urn entityUrn, + @Override + public List ingestAspects(@Nonnull Urn entityUrn, List> pairList, @Nonnull final AuditStamp auditStamp, SystemMetadata systemMetadata) { @@ -529,7 +502,7 @@ public List ingestAspects(Urn entityUrn, .systemMetadata(systemMetadata) .build(_entityRegistry)) .collect(Collectors.toList()); - return ingestAspects(AspectsBatch.builder().items(items).build(), auditStamp, true, true); + return ingestAspects(AspectsBatchImpl.builder().items(items).build(), auditStamp, true, true); } /** @@ -541,6 +514,7 @@ public List ingestAspects(Urn entityUrn, * successful update * @return the {@link RecordTemplate} representation of the written aspect object */ + @Override public List ingestAspects(@Nonnull final AspectsBatch aspectsBatch, @Nonnull final AuditStamp auditStamp, boolean emitMCL, @@ -719,7 +693,7 @@ public RecordTemplate ingestAspectIfNotPresent(@Nonnull Urn urn, @Nonnull SystemMetadata systemMetadata) { log.debug("Invoked ingestAspectIfNotPresent with urn: {}, aspectName: {}, newValue: {}", urn, aspectName, newValue); - AspectsBatch aspectsBatch = AspectsBatch.builder() + AspectsBatchImpl aspectsBatch = AspectsBatchImpl.builder() .one(UpsertBatchItem.builder() .urn(urn) .aspectName(aspectName) @@ -739,9 +713,10 @@ public RecordTemplate ingestAspectIfNotPresent(@Nonnull Urn urn, * @param async a flag to control whether we commit to primary store or just write to proposal log before returning * @return an {@link IngestResult} containing the results */ - public IngestResult ingestSingleProposal(MetadataChangeProposal proposal, AuditStamp auditStamp, final boolean async) { - return ingestProposal(AspectsBatch.builder().mcps(List.of(proposal), getEntityRegistry()).build(), auditStamp, async) - .stream().findFirst().get(); + @Override + public IngestResult ingestProposal(MetadataChangeProposal proposal, AuditStamp auditStamp, final boolean async) { + return ingestProposal(AspectsBatchImpl.builder().mcps(List.of(proposal), getEntityRegistry()).build(), auditStamp, + async).stream().findFirst().get(); } /** @@ -844,7 +819,7 @@ private Stream ingestProposalAsync(AspectsBatch aspectsBatch) { } private Stream ingestProposalSync(AspectsBatch aspectsBatch, AuditStamp auditStamp) { - AspectsBatch nonTimeseries = AspectsBatch.builder() + AspectsBatchImpl nonTimeseries = AspectsBatchImpl.builder() .items(aspectsBatch.getItems().stream() .filter(item -> !item.getAspectSpec().isTimeseries()) .collect(Collectors.toList())) @@ -1085,12 +1060,14 @@ public Map getEntities(@Nonnull final Set urns, @Nonnull Set toEntity(entry.getValue()))); } + @Override public Pair, Boolean> alwaysProduceMCLAsync(@Nonnull final Urn urn, @Nonnull final AspectSpec aspectSpec, @Nonnull final MetadataChangeLog metadataChangeLog) { Future future = _producer.produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog); return Pair.of(future, preprocessEvent(metadataChangeLog)); } + @Override public Pair, Boolean> alwaysProduceMCLAsync(@Nonnull final Urn urn, @Nonnull String entityName, @Nonnull String aspectName, @Nonnull final AspectSpec aspectSpec, @Nullable final RecordTemplate oldAspectValue, @Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata, @@ -1276,7 +1253,7 @@ private void ingestSnapshotUnion(@Nonnull final Snapshot snapshotUnion, @Nonnull aspectRecordsToIngest.addAll(generateDefaultAspectsIfMissing(urn, aspectRecordsToIngest.stream().map(Pair::getFirst).collect(Collectors.toSet()))); - AspectsBatch aspectsBatch = AspectsBatch.builder() + AspectsBatchImpl aspectsBatch = AspectsBatchImpl.builder() .items(aspectRecordsToIngest.stream().map(pair -> UpsertBatchItem.builder() .urn(urn) .aspectName(pair.getKey()) @@ -1594,7 +1571,7 @@ public RollbackResult deleteAspect(String urn, String aspectName, @Nonnull Map mcps) { + return AspectsBatchImpl.builder() + .mcps(mcps, _entityService.getEntityRegistry()) + .build(); + } + @Override @WithSpan protected void applyRetention(List retentionContexts) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index 2cfebaeab9839b..93c40aab2a1fa7 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -14,7 +14,16 @@ import com.linkedin.metadata.query.ExtraInfoArray; import com.linkedin.metadata.query.ListResultMetadata; import com.linkedin.metadata.search.utils.QueryUtils; -import io.ebean.*; +import io.ebean.DuplicateKeyException; +import io.ebean.EbeanServer; +import io.ebean.ExpressionList; +import io.ebean.Junction; +import io.ebean.PagedList; +import io.ebean.Query; +import io.ebean.RawSql; +import io.ebean.RawSqlBuilder; +import io.ebean.Transaction; +import io.ebean.TxScope; import io.ebean.annotation.Platform; import io.ebean.annotation.TxIsolation; import java.net.URISyntaxException; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanRetentionService.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanRetentionService.java index 21bbc38430436e..33888594c44589 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanRetentionService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanRetentionService.java @@ -4,8 +4,11 @@ import com.datahub.util.RecordUtils; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.RetentionService; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.entity.retention.BulkApplyRetentionArgs; import com.linkedin.metadata.entity.retention.BulkApplyRetentionResult; +import com.linkedin.metadata.entity.transactions.AspectsBatch; +import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.retention.DataHubRetentionConfig; import com.linkedin.retention.Retention; import com.linkedin.retention.TimeBasedRetention; @@ -48,6 +51,13 @@ public EntityService getEntityService() { return _entityService; } + @Override + protected AspectsBatch buildAspectsBatch(List mcps) { + return AspectsBatchImpl.builder() + .mcps(mcps, _entityService.getEntityRegistry()) + .build(); + } + @Override @WithSpan protected void applyRetention(List retentionContexts) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/AspectsBatch.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/AspectsBatchImpl.java similarity index 59% rename from metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/AspectsBatch.java rename to metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/AspectsBatchImpl.java index 17037a4bced686..ca5e070bc5ca73 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/AspectsBatch.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/AspectsBatchImpl.java @@ -1,6 +1,8 @@ package com.linkedin.metadata.entity.ebean.transactions; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.entity.transactions.AbstractBatchItem; +import com.linkedin.metadata.entity.transactions.AspectsBatch; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.mxe.MetadataChangeProposal; import lombok.Builder; @@ -8,40 +10,28 @@ import lombok.extern.slf4j.Slf4j; import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; + @Slf4j @Getter @Builder(toBuilder = true) -public class AspectsBatch { +public class AspectsBatchImpl implements AspectsBatch { private final List items; - public Map> getUrnAspectsMap() { - return items.stream() - .map(aspect -> Map.entry(aspect.getUrn().toString(), aspect.getAspectName())) - .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.mapping(Map.Entry::getValue, Collectors.toSet()))); - } - - public boolean containsDuplicateAspects() { - return items.stream().map(i -> String.format("%s_%s", i.getClass().getName(), i.hashCode())) - .distinct().count() != items.size(); - } - - public static class AspectsBatchBuilder { + public static class AspectsBatchImplBuilder { /** * Just one aspect record template * @param data aspect data * @return builder */ - public AspectsBatchBuilder one(AbstractBatchItem data) { + public AspectsBatchImplBuilder one(AbstractBatchItem data) { this.items = List.of(data); return this; } - public AspectsBatchBuilder mcps(List mcps, EntityRegistry entityRegistry) { + public AspectsBatchImplBuilder mcps(List mcps, EntityRegistry entityRegistry) { this.items = mcps.stream().map(mcp -> { if (mcp.getChangeType().equals(ChangeType.PATCH)) { return PatchBatchItem.PatchBatchItemBuilder.build(mcp, entityRegistry); @@ -61,8 +51,8 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - AspectsBatch that = (AspectsBatch) o; - return items.equals(that.items); + AspectsBatchImpl that = (AspectsBatchImpl) o; + return Objects.equals(items, that.items); } @Override @@ -72,9 +62,6 @@ public int hashCode() { @Override public String toString() { - return "AspectsBatch{" - + "items=" - + items - + '}'; + return "AspectsBatchImpl{" + "items=" + items + '}'; } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/PatchBatchItem.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/PatchBatchItem.java index b5edccbc57fd83..cc0b3d915b4077 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/PatchBatchItem.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/PatchBatchItem.java @@ -10,8 +10,9 @@ import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; -import com.linkedin.metadata.entity.AspectUtils; import com.linkedin.metadata.entity.EntityUtils; +import com.linkedin.metadata.entity.transactions.AbstractBatchItem; +import com.linkedin.metadata.entity.validation.ValidationUtils; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.registry.EntityRegistry; @@ -58,6 +59,11 @@ public ChangeType getChangeType() { return ChangeType.PATCH; } + @Override + public void validateUrn(EntityRegistry entityRegistry, Urn urn) { + EntityUtils.validateUrn(entityRegistry, urn); + } + public UpsertBatchItem applyPatch(EntityRegistry entityRegistry, RecordTemplate recordTemplate) { UpsertBatchItem.UpsertBatchItemBuilder builder = UpsertBatchItem.builder() .urn(getUrn()) @@ -94,7 +100,7 @@ public PatchBatchItem build(EntityRegistry entityRegistry) { entitySpec(entityRegistry.getEntitySpec(this.urn.getEntityType())); log.debug("entity spec = {}", this.entitySpec); - aspectSpec(AspectUtils.validate(this.entitySpec, this.aspectName)); + aspectSpec(ValidationUtils.validate(this.entitySpec, this.aspectName)); log.debug("aspect spec = {}", this.aspectSpec); if (this.patch == null) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/UpsertBatchItem.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/UpsertBatchItem.java index 9c89858f33c316..bd58d267a8308e 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/UpsertBatchItem.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/UpsertBatchItem.java @@ -5,9 +5,9 @@ import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; -import com.linkedin.metadata.entity.AspectUtils; import com.linkedin.metadata.entity.EntityAspect; import com.linkedin.metadata.entity.EntityUtils; +import com.linkedin.metadata.entity.transactions.AbstractBatchItem; import com.linkedin.metadata.entity.validation.ValidationUtils; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; @@ -50,6 +50,11 @@ public ChangeType getChangeType() { return ChangeType.UPSERT; } + @Override + public void validateUrn(EntityRegistry entityRegistry, Urn urn) { + EntityUtils.validateUrn(entityRegistry, urn); + } + public EntityAspect toLatestEntityAspect(AuditStamp auditStamp) { EntityAspect latest = new EntityAspect(); latest.setAspect(getAspectName()); @@ -70,10 +75,10 @@ public UpsertBatchItem build(EntityRegistry entityRegistry) { entitySpec(entityRegistry.getEntitySpec(this.urn.getEntityType())); log.debug("entity spec = {}", this.entitySpec); - aspectSpec(AspectUtils.validate(this.entitySpec, this.aspectName)); + aspectSpec(ValidationUtils.validate(this.entitySpec, this.aspectName)); log.debug("aspect spec = {}", this.aspectSpec); - AspectUtils.validateRecordTemplate(entityRegistry, this.entitySpec, this.urn, this.aspect); + ValidationUtils.validateRecordTemplate(entityRegistry, this.entitySpec, this.urn, this.aspect); return new UpsertBatchItem(this.urn, this.aspectName, AbstractBatchItem.generateSystemMetadataIfEmpty(this.systemMetadata), this.aspect, this.metadataChangeProposal, this.entitySpec, this.aspectSpec); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/validation/ValidationUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/validation/ValidationUtils.java index 99bb323e51ecb0..6182b27333cbb3 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/validation/ValidationUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/validation/ValidationUtils.java @@ -1,8 +1,16 @@ package com.linkedin.metadata.entity.validation; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.schema.validation.ValidationResult; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.entity.EntityUtils; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.registry.EntityRegistry; import lombok.extern.slf4j.Slf4j; +import java.util.function.Consumer; + @Slf4j public class ValidationUtils { @@ -33,6 +41,36 @@ public static void validateOrWarn(RecordTemplate record) { }); } + public static AspectSpec validate(EntitySpec entitySpec, String aspectName) { + if (aspectName == null || aspectName.isEmpty()) { + throw new UnsupportedOperationException("Aspect name is required for create and update operations"); + } + + AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName); + + if (aspectSpec == null) { + throw new RuntimeException( + String.format("Unknown aspect %s for entity %s", aspectName, entitySpec.getName())); + } + + return aspectSpec; + } + + public static void validateRecordTemplate(EntityRegistry entityRegistry, EntitySpec entitySpec, Urn urn, RecordTemplate aspect) { + EntityRegistryUrnValidator validator = new EntityRegistryUrnValidator(entityRegistry); + validator.setCurrentEntitySpec(entitySpec); + Consumer resultFunction = validationResult -> { + throw new IllegalArgumentException("Invalid format for aspect: " + entitySpec.getName() + "\n Cause: " + + validationResult.getMessages()); }; + RecordTemplateValidator.validate(EntityUtils.buildKeyAspect(entityRegistry, urn), resultFunction, validator); + RecordTemplateValidator.validate(aspect, resultFunction, validator); + } + + public static void validateRecordTemplate(EntityRegistry entityRegistry, Urn urn, RecordTemplate aspect) { + EntitySpec entitySpec = entityRegistry.getEntitySpec(urn.getEntityType()); + validateRecordTemplate(entityRegistry, entitySpec, urn, aspect); + } + private ValidationUtils() { } } \ No newline at end of file diff --git a/metadata-io/src/test/java/com/linkedin/metadata/AspectIngestionUtils.java b/metadata-io/src/test/java/com/linkedin/metadata/AspectIngestionUtils.java index cf1e262a04343c..e95378a616d971 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/AspectIngestionUtils.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/AspectIngestionUtils.java @@ -5,7 +5,7 @@ import com.linkedin.common.urn.UrnUtils; import com.linkedin.identity.CorpUserInfo; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.entity.ebean.transactions.UpsertBatchItem; import com.linkedin.metadata.key.CorpUserKey; import java.util.HashMap; @@ -41,7 +41,7 @@ public static Map ingestCorpUserKeyAspects(EntityService entit .systemMetadata(AspectGenerationUtils.createSystemMetadata()) .build(entityService.getEntityRegistry())); } - entityService.ingestAspects(AspectsBatch.builder().items(items).build(), AspectGenerationUtils.createAuditStamp(), true, true); + entityService.ingestAspects(AspectsBatchImpl.builder().items(items).build(), AspectGenerationUtils.createAuditStamp(), true, true); return aspects; } @@ -67,7 +67,7 @@ public static Map ingestCorpUserInfoAspects(@Nonnull final En .systemMetadata(AspectGenerationUtils.createSystemMetadata()) .build(entityService.getEntityRegistry())); } - entityService.ingestAspects(AspectsBatch.builder().items(items).build(), AspectGenerationUtils.createAuditStamp(), true, true); + entityService.ingestAspects(AspectsBatchImpl.builder().items(items).build(), AspectGenerationUtils.createAuditStamp(), true, true); return aspects; } @@ -94,7 +94,7 @@ public static Map ingestChartInfoAspects(@Nonnull final EntitySe .systemMetadata(AspectGenerationUtils.createSystemMetadata()) .build(entityService.getEntityRegistry())); } - entityService.ingestAspects(AspectsBatch.builder().items(items).build(), AspectGenerationUtils.createAuditStamp(), true, true); + entityService.ingestAspects(AspectsBatchImpl.builder().items(items).build(), AspectGenerationUtils.createAuditStamp(), true, true); return aspects; } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java index 87c8f0ae30498c..bf720166fdd9aa 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java @@ -10,7 +10,7 @@ import com.linkedin.metadata.EbeanTestUtils; import com.linkedin.metadata.entity.ebean.EbeanAspectDao; import com.linkedin.metadata.entity.ebean.EbeanRetentionService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.entity.ebean.transactions.UpsertBatchItem; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.key.CorpUserKey; @@ -117,7 +117,7 @@ public void testIngestListLatestAspects() throws AssertionError { .systemMetadata(metadata1) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); // List aspects ListResult batch1 = _entityServiceImpl.listLatestAspects(entityUrn1.getEntityType(), aspectName, 0, 2); @@ -181,7 +181,7 @@ public void testIngestListUrns() throws AssertionError { .systemMetadata(metadata1) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); // List aspects urns ListUrnsResult batch1 = _entityServiceImpl.listUrns(entityUrn1.getEntityType(), 0, 2); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java index 1c116dee79f8f2..c0d2a3783c0a7c 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java @@ -32,7 +32,7 @@ import com.linkedin.metadata.aspect.CorpUserAspect; import com.linkedin.metadata.aspect.CorpUserAspectArray; import com.linkedin.metadata.aspect.VersionedAspect; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.entity.ebean.transactions.UpsertBatchItem; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.event.EventProducer; @@ -48,7 +48,6 @@ import com.linkedin.metadata.snapshot.Snapshot; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.GenericAspect; -import com.linkedin.mxe.MetadataAuditOperation; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.SystemMetadata; @@ -544,7 +543,7 @@ public void testReingestLineageProposal() throws Exception { mcp1.setSystemMetadata(metadata1); mcp1.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME); - _entityServiceImpl.ingestSingleProposal(mcp1, TEST_AUDIT_STAMP, false); + _entityServiceImpl.ingestProposal(mcp1, TEST_AUDIT_STAMP, false); final MetadataChangeLog initialChangeLog = new MetadataChangeLog(); initialChangeLog.setEntityType(entityUrn.getEntityType()); @@ -579,7 +578,7 @@ public void testReingestLineageProposal() throws Exception { // Mockito detects the previous invocation and throws an error in verifying the second call unless invocations are cleared clearInvocations(_mockProducer); - _entityServiceImpl.ingestSingleProposal(mcp1, TEST_AUDIT_STAMP, false); + _entityServiceImpl.ingestProposal(mcp1, TEST_AUDIT_STAMP, false); verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), Mockito.eq(restateChangeLog)); @@ -606,7 +605,7 @@ public void testIngestTimeseriesAspect() throws Exception { genericAspect.setValue(ByteString.unsafeWrap(datasetProfileSerialized)); genericAspect.setContentType("application/json"); gmce.setAspect(genericAspect); - _entityServiceImpl.ingestSingleProposal(gmce, TEST_AUDIT_STAMP, false); + _entityServiceImpl.ingestProposal(gmce, TEST_AUDIT_STAMP, false); } @Test @@ -625,7 +624,7 @@ public void testAsyncProposalVersioned() throws Exception { genericAspect.setValue(ByteString.unsafeWrap(datasetPropertiesSerialized)); genericAspect.setContentType("application/json"); gmce.setAspect(genericAspect); - _entityServiceImpl.ingestSingleProposal(gmce, TEST_AUDIT_STAMP, true); + _entityServiceImpl.ingestProposal(gmce, TEST_AUDIT_STAMP, true); verify(_mockProducer, times(0)).produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), Mockito.any()); verify(_mockProducer, times(1)).produceMetadataChangeProposal(Mockito.eq(entityUrn), @@ -651,7 +650,7 @@ public void testAsyncProposalTimeseries() throws Exception { genericAspect.setValue(ByteString.unsafeWrap(datasetProfileSerialized)); genericAspect.setContentType("application/json"); gmce.setAspect(genericAspect); - _entityServiceImpl.ingestSingleProposal(gmce, TEST_AUDIT_STAMP, true); + _entityServiceImpl.ingestProposal(gmce, TEST_AUDIT_STAMP, true); verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), Mockito.any()); verify(_mockProducer, times(0)).produceMetadataChangeProposal(Mockito.eq(entityUrn), @@ -719,7 +718,7 @@ public void testGetAspectAtVersion() throws AssertionError { assertTrue(DataTemplateUtil.areEqual(writtenVersionedAspect1, readAspect1)); // Validate retrieval of CorpUserInfo Aspect #2 - _entityService.ingestAspects(entityUrn, List.of(Pair.of(aspectName, writeAspect2)), TEST_AUDIT_STAMP, null); + _entityServiceImpl.ingestAspects(entityUrn, List.of(Pair.of(aspectName, writeAspect2)), TEST_AUDIT_STAMP, null); VersionedAspect writtenVersionedAspect2 = new VersionedAspect(); writtenVersionedAspect2.setAspect(Aspect.create(writeAspect2)); @@ -731,7 +730,7 @@ public void testGetAspectAtVersion() throws AssertionError { verify(_mockProducer, times(2)).produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.eq(corpUserInfoSpec), Mockito.any()); - readAspect1 = _entityService.getVersionedAspect(entityUrn, aspectName, -1); + readAspect1 = _entityServiceImpl.getVersionedAspect(entityUrn, aspectName, -1); assertFalse(DataTemplateUtil.areEqual(writtenVersionedAspect1, readAspect1)); verifyNoMoreInteractions(_mockProducer); @@ -786,7 +785,7 @@ public void testRollbackAspect() throws AssertionError { .systemMetadata(metadata2) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); // this should no-op since this run has been overwritten AspectRowSummary rollbackOverwrittenAspect = new AspectRowSummary(); @@ -854,7 +853,7 @@ public void testRollbackKey() throws AssertionError { .systemMetadata(metadata2) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); // this should no-op since the key should have been written in the furst run AspectRowSummary rollbackKeyWithWrongRunId = new AspectRowSummary(); @@ -898,7 +897,6 @@ public void testRollbackUrn() throws AssertionError { // Ingest CorpUserInfo Aspect #1 CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("email@test.com"); - _entityServiceImpl.ingestAspect(entityUrn1, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); RecordTemplate writeKey1 = EntityUtils.buildKeyAspect(_testEntityRegistry, entityUrn1); @@ -943,7 +941,7 @@ public void testRollbackUrn() throws AssertionError { .systemMetadata(metadata2) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); // this should no-op since the key should have been written in the furst run AspectRowSummary rollbackKeyWithWrongRunId = new AspectRowSummary(); @@ -981,10 +979,10 @@ public void testIngestGetLatestAspect() throws AssertionError { .systemMetadata(metadata1) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); // Validate retrieval of CorpUserInfo Aspect #1 - RecordTemplate readAspect1 = _entityService.getLatestAspect(entityUrn, aspectName); + RecordTemplate readAspect1 = _entityServiceImpl.getLatestAspect(entityUrn, aspectName); assertTrue(DataTemplateUtil.areEqual(writeAspect1, readAspect1)); ArgumentCaptor mclCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class); @@ -1010,10 +1008,10 @@ public void testIngestGetLatestAspect() throws AssertionError { .systemMetadata(metadata2) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); // Validate retrieval of CorpUserInfo Aspect #2 - RecordTemplate readAspect2 = _entityService.getLatestAspect(entityUrn, aspectName); + RecordTemplate readAspect2 = _entityServiceImpl.getLatestAspect(entityUrn, aspectName); EntityAspect readAspectDao1 = _aspectDao.getAspect(entityUrn.toString(), aspectName, 1); EntityAspect readAspectDao2 = _aspectDao.getAspect(entityUrn.toString(), aspectName, 0); @@ -1050,10 +1048,10 @@ public void testIngestGetLatestEnvelopedAspect() throws Exception { .systemMetadata(metadata1) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); // Validate retrieval of CorpUserInfo Aspect #1 - EnvelopedAspect readAspect1 = _entityService.getLatestEnvelopedAspect("corpuser", entityUrn, aspectName); + EnvelopedAspect readAspect1 = _entityServiceImpl.getLatestEnvelopedAspect("corpuser", entityUrn, aspectName); assertTrue(DataTemplateUtil.areEqual(writeAspect1, new CorpUserInfo(readAspect1.getValue().data()))); // Ingest CorpUserInfo Aspect #2 @@ -1067,10 +1065,10 @@ public void testIngestGetLatestEnvelopedAspect() throws Exception { .systemMetadata(metadata2) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); // Validate retrieval of CorpUserInfo Aspect #2 - EnvelopedAspect readAspect2 = _entityService.getLatestEnvelopedAspect("corpuser", entityUrn, aspectName); + EnvelopedAspect readAspect2 = _entityServiceImpl.getLatestEnvelopedAspect("corpuser", entityUrn, aspectName); EntityAspect readAspectDao1 = _aspectDao.getAspect(entityUrn.toString(), aspectName, 1); EntityAspect readAspectDao2 = _aspectDao.getAspect(entityUrn.toString(), aspectName, 0); @@ -1104,10 +1102,10 @@ public void testIngestSameAspect() throws AssertionError { .systemMetadata(metadata1) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); // Validate retrieval of CorpUserInfo Aspect #1 - RecordTemplate readAspect1 = _entityService.getLatestAspect(entityUrn, aspectName); + RecordTemplate readAspect1 = _entityServiceImpl.getLatestAspect(entityUrn, aspectName); assertTrue(DataTemplateUtil.areEqual(writeAspect1, readAspect1)); ArgumentCaptor mclCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class); @@ -1133,10 +1131,10 @@ public void testIngestSameAspect() throws AssertionError { .systemMetadata(metadata2) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); // Validate retrieval of CorpUserInfo Aspect #2 - RecordTemplate readAspect2 = _entityService.getLatestAspect(entityUrn, aspectName); + RecordTemplate readAspect2 = _entityServiceImpl.getLatestAspect(entityUrn, aspectName); EntityAspect readAspectDao2 = _aspectDao.getAspect(entityUrn.toString(), aspectName, ASPECT_LATEST_VERSION); assertTrue(DataTemplateUtil.areEqual(writeAspect2, readAspect2)); @@ -1207,7 +1205,7 @@ public void testRetention() throws AssertionError { .systemMetadata(metadata1) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); assertEquals(_entityServiceImpl.getAspect(entityUrn, aspectName, 1), writeAspect1); assertEquals(_entityServiceImpl.getAspect(entityUrn, aspectName2, 1), writeAspect2); @@ -1236,7 +1234,7 @@ public void testRetention() throws AssertionError { .systemMetadata(metadata1) .build(_testEntityRegistry) ); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); + _entityServiceImpl.ingestAspects(AspectsBatchImpl.builder().items(items).build(), TEST_AUDIT_STAMP, true, true); assertNull(_entityServiceImpl.getAspect(entityUrn, aspectName, 1)); assertEquals(_entityServiceImpl.getAspect(entityUrn, aspectName2, 1), writeAspect2); @@ -1451,7 +1449,7 @@ public void testUIPreProcessedProposal() throws Exception { genericAspect.setValue(ByteString.unsafeWrap(datasetPropertiesSerialized)); genericAspect.setContentType("application/json"); gmce.setAspect(genericAspect); - _entityServiceImpl.ingestSingleProposal(gmce, TEST_AUDIT_STAMP, false); + _entityServiceImpl.ingestProposal(gmce, TEST_AUDIT_STAMP, false); ArgumentCaptor captor = ArgumentCaptor.forClass(MetadataChangeLog.class); verify(_mockProducer, times(1)).produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), captor.capture()); diff --git a/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java b/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java index 40cf3ae1d5b9a6..125bba7ec32805 100644 --- a/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java +++ b/metadata-service/auth-impl/src/main/java/com/datahub/authentication/token/StatefulTokenService.java @@ -12,7 +12,7 @@ import com.linkedin.metadata.Constants; import com.linkedin.metadata.entity.AspectUtils; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.key.DataHubAccessTokenKey; import com.linkedin.metadata.utils.AuditStampUtils; import com.linkedin.metadata.utils.GenericRecordUtils; @@ -129,7 +129,7 @@ public String generateAccessToken(@Nonnull final TokenType type, @Nonnull final Stream proposalStream = Stream.concat(Stream.of(proposal), AspectUtils.getAdditionalChanges(proposal, _entityService).stream()); - _entityService.ingestProposal(AspectsBatch.builder() + _entityService.ingestProposal(AspectsBatchImpl.builder() .mcps(proposalStream.collect(Collectors.toList()), _entityService.getEntityRegistry()) .build(), auditStamp, false); diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java index b7477fdcc0af90..876a0871fa4cbe 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java @@ -63,6 +63,6 @@ static void setUpgradeResult(Urn urn, EntityService entityService) throws URISyn upgradeProposal.setAspectName(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME); upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeResult)); upgradeProposal.setChangeType(ChangeType.UPSERT); - entityService.ingestSingleProposal(upgradeProposal, auditStamp, false); + entityService.ingestProposal(upgradeProposal, auditStamp, false); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java index b408af8b66720b..dbbcf3a139bf1c 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/UpgradeStep.java @@ -92,7 +92,7 @@ private void ingestUpgradeRequestAspect() throws URISyntaxException { upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeRequest)); upgradeProposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestSingleProposal(upgradeProposal, auditStamp, false); + _entityService.ingestProposal(upgradeProposal, auditStamp, false); } private void ingestUpgradeResultAspect() throws URISyntaxException { @@ -107,7 +107,7 @@ private void ingestUpgradeResultAspect() throws URISyntaxException { upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeResult)); upgradeProposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestSingleProposal(upgradeProposal, auditStamp, false); + _entityService.ingestProposal(upgradeProposal, auditStamp, false); } private void cleanUpgradeAfterError(Exception e, String errorMessage) { diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2Step.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2Step.java index 51b1da92756a77..ea9ac57778550b 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2Step.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2Step.java @@ -138,7 +138,7 @@ private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exceptio proposal.setChangeType(ChangeType.UPSERT); proposal.setSystemMetadata(new SystemMetadata().setRunId(DEFAULT_RUN_ID).setLastObserved(System.currentTimeMillis())); proposal.setAspect(GenericRecordUtils.serializeAspect(browsePathsV2)); - _entityService.ingestSingleProposal( + _entityService.ingestProposal( proposal, auditStamp, false diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java index d27324377163e2..30608e984a0f26 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java @@ -8,7 +8,7 @@ import com.linkedin.metadata.boot.BootstrapStep; import com.linkedin.metadata.entity.AspectMigrationsDao; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.entity.ebean.transactions.UpsertBatchItem; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.utils.DataPlatformInstanceUtils; @@ -81,7 +81,7 @@ public void execute() throws Exception { final AuditStamp aspectAuditStamp = new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); - _entityService.ingestAspects(AspectsBatch.builder().items(items).build(), aspectAuditStamp, true, true); + _entityService.ingestAspects(AspectsBatchImpl.builder().items(items).build(), aspectAuditStamp, true, true); log.info("Finished ingesting DataPlatformInstance for urn {} to {}", start, start + BATCH_SIZE); start += BATCH_SIZE; diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java index c8efc7e3095171..e4ad215eec8640 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformsStep.java @@ -18,7 +18,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.entity.ebean.transactions.UpsertBatchItem; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -80,7 +80,7 @@ public void execute() throws IOException, URISyntaxException { .build(_entityService.getEntityRegistry()); }).collect(Collectors.toList()); - _entityService.ingestAspects(AspectsBatch.builder().items(dataPlatformAspects).build(), + _entityService.ingestAspects(AspectsBatchImpl.builder().items(dataPlatformAspects).build(), new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()), true, false); diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStep.java index 1541ca40d8911a..5bc80f46e64784 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStep.java @@ -116,7 +116,7 @@ public void execute() throws IOException, URISyntaxException { proposal.setAspect(GenericRecordUtils.serializeAspect(newSettings)); proposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestSingleProposal( + _entityService.ingestProposal( proposal, new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()), false); diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestOwnershipTypesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestOwnershipTypesStep.java index 596af9cb857f67..55d612618ff9fc 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestOwnershipTypesStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestOwnershipTypesStep.java @@ -9,7 +9,7 @@ import com.linkedin.metadata.Constants; import com.linkedin.metadata.boot.UpgradeStep; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; @@ -108,7 +108,7 @@ private void ingestOwnershipType(final Urn ownershipTypeUrn, final OwnershipType proposal.setAspect(GenericRecordUtils.serializeAspect(info)); proposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestProposal(AspectsBatch.builder() + _entityService.ingestProposal(AspectsBatchImpl.builder() .mcps(List.of(keyAspectProposal, proposal), _entityService.getEntityRegistry()).build(), auditStamp, false); } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java index 67291946f83dd8..87dcfd736da401 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestPoliciesStep.java @@ -13,7 +13,7 @@ import com.linkedin.metadata.Constants; import com.linkedin.metadata.boot.BootstrapStep; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.query.ListUrnsResult; @@ -182,7 +182,7 @@ private void ingestPolicy(final Urn urn, final DataHubPolicyInfo info) throws UR proposal.setAspect(GenericRecordUtils.serializeAspect(info)); proposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestProposal(AspectsBatch.builder() + _entityService.ingestProposal(AspectsBatchImpl.builder() .mcps(List.of(keyAspectProposal, proposal), _entityRegistry) .build(), new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()), diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRolesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRolesStep.java index 4b67c014b26e8d..99be1851139682 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRolesStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestRolesStep.java @@ -10,7 +10,7 @@ import com.linkedin.metadata.Constants; import com.linkedin.metadata.boot.BootstrapStep; import com.linkedin.metadata.entity.EntityService; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.utils.EntityKeyUtils; @@ -108,7 +108,7 @@ private void ingestRole(final Urn roleUrn, final DataHubRoleInfo dataHubRoleInfo proposal.setAspect(GenericRecordUtils.serializeAspect(dataHubRoleInfo)); proposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestProposal(AspectsBatch.builder() + _entityService.ingestProposal(AspectsBatchImpl.builder() .mcps(List.of(keyAspectProposal, proposal), _entityRegistry).build(), new AuditStamp().setActor(Urn.createFromString(SYSTEM_ACTOR)).setTime(System.currentTimeMillis()), false); diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java index ea71a41c7baa59..4828e3b2b2b289 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java @@ -168,6 +168,6 @@ private void ingestUpgradeAspect(String aspectName, RecordTemplate aspect, Audit upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(aspect)); upgradeProposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestSingleProposal(upgradeProposal, auditStamp, false); + _entityService.ingestProposal(upgradeProposal, auditStamp, false); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java index 38cf773e577b4b..7fcafa24d7b452 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStep.java @@ -128,7 +128,7 @@ private void migrateBrowsePath(Urn urn, AuditStamp auditStamp) throws Exception proposal.setChangeType(ChangeType.UPSERT); proposal.setSystemMetadata(new SystemMetadata().setRunId(DEFAULT_RUN_ID).setLastObserved(System.currentTimeMillis())); proposal.setAspect(GenericRecordUtils.serializeAspect(newPaths)); - _entityService.ingestSingleProposal( + _entityService.ingestProposal( proposal, auditStamp, false diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2StepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2StepTest.java index 1fb979cba2d3cb..49fce75ab7c611 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2StepTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/BackfillBrowsePathsV2StepTest.java @@ -97,7 +97,7 @@ public void testExecuteNoExistingBrowsePaths() throws Exception { Mockito.eq(null) ); // Verify that 11 aspects are ingested, 2 for the upgrade request / result, 9 for ingesting 1 of each entity type - Mockito.verify(mockService, Mockito.times(11)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(11)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(), Mockito.eq(false) @@ -124,7 +124,7 @@ public void testDoesNotRunWhenAlreadyExecuted() throws Exception { BackfillBrowsePathsV2Step backfillBrowsePathsV2Step = new BackfillBrowsePathsV2Step(mockService, mockSearchService); backfillBrowsePathsV2Step.execute(); - Mockito.verify(mockService, Mockito.times(0)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean() diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStepTest.java index 50ab1f53455f15..24bdd193a39c80 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStepTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestDefaultGlobalSettingsStepTest.java @@ -38,7 +38,7 @@ public void testExecuteValidSettingsNoExistingSettings() throws Exception { GlobalSettingsInfo expectedResult = new GlobalSettingsInfo(); expectedResult.setViews(new GlobalViewsSettings().setDefaultView(UrnUtils.getUrn("urn:li:dataHubView:test"))); - Mockito.verify(entityService, times(1)).ingestSingleProposal( + Mockito.verify(entityService, times(1)).ingestProposal( Mockito.eq(buildUpdateSettingsProposal(expectedResult)), Mockito.any(AuditStamp.class), Mockito.eq(false) @@ -65,7 +65,7 @@ public void testExecuteValidSettingsExistingSettings() throws Exception { GlobalSettingsInfo expectedResult = new GlobalSettingsInfo(); expectedResult.setViews(new GlobalViewsSettings().setDefaultView(UrnUtils.getUrn("urn:li:dataHubView:custom"))); - Mockito.verify(entityService, times(1)).ingestSingleProposal( + Mockito.verify(entityService, times(1)).ingestProposal( Mockito.eq(buildUpdateSettingsProposal(expectedResult)), Mockito.any(AuditStamp.class), Mockito.eq(false) diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java index 29664436a28d91..d90c1947b3e5e4 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreColumnLineageIndicesTest.java @@ -59,7 +59,7 @@ public void testExecuteFirstTime() throws Exception { Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.CHART_ENTITY_NAME); Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DASHBOARD_ENTITY_NAME); // creates upgradeRequest and upgradeResult aspects - Mockito.verify(mockService, Mockito.times(2)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(AuditStamp.class), Mockito.eq(false) @@ -121,7 +121,7 @@ public void testExecuteWithNewVersion() throws Exception { Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.CHART_ENTITY_NAME); Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.DASHBOARD_ENTITY_NAME); // creates upgradeRequest and upgradeResult aspects - Mockito.verify(mockService, Mockito.times(2)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(AuditStamp.class), Mockito.eq(false) @@ -183,7 +183,7 @@ public void testDoesNotExecuteWithSameVersion() throws Exception { Mockito.verify(mockRegistry, Mockito.times(0)).getEntitySpec(Constants.CHART_ENTITY_NAME); Mockito.verify(mockRegistry, Mockito.times(0)).getEntitySpec(Constants.DASHBOARD_ENTITY_NAME); // creates upgradeRequest and upgradeResult aspects - Mockito.verify(mockService, Mockito.times(0)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(AuditStamp.class), Mockito.eq(false) diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java index 2ebc16b3973d1b..e6104c9c590638 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/RestoreGlossaryIndicesTest.java @@ -105,7 +105,7 @@ public void testExecuteFirstTime() throws Exception { Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.GLOSSARY_TERM_ENTITY_NAME); Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.GLOSSARY_NODE_ENTITY_NAME); - Mockito.verify(mockService, Mockito.times(2)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(AuditStamp.class), Mockito.eq(false) @@ -166,7 +166,7 @@ public void testExecutesWithNewVersion() throws Exception { Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.GLOSSARY_TERM_ENTITY_NAME); Mockito.verify(mockRegistry, Mockito.times(1)).getEntitySpec(Constants.GLOSSARY_NODE_ENTITY_NAME); - Mockito.verify(mockService, Mockito.times(2)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(AuditStamp.class), Mockito.eq(false) @@ -227,7 +227,7 @@ public void testDoesNotRunWhenAlreadyExecuted() throws Exception { Mockito.verify(mockSearchService, Mockito.times(0)).search(Constants.GLOSSARY_NODE_ENTITY_NAME, "", null, null, 0, 1000, new SearchFlags().setFulltext(false) .setSkipAggregates(true).setSkipHighlighting(true)); - Mockito.verify(mockService, Mockito.times(0)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean() diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java index a9410fb2522f21..5e4ad6e7fe8809 100644 --- a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/UpgradeDefaultBrowsePathsStepTest.java @@ -87,7 +87,7 @@ public void testExecuteNoExistingBrowsePaths() throws Exception { Mockito.eq(5000) ); // Verify that 4 aspects are ingested, 2 for the upgrade request / result, but none for ingesting - Mockito.verify(mockService, Mockito.times(2)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(), Mockito.eq(false) @@ -155,7 +155,7 @@ public void testExecuteFirstTime() throws Exception { Mockito.eq(5000) ); // Verify that 4 aspects are ingested, 2 for the upgrade request / result and 2 for the browse pahts - Mockito.verify(mockService, Mockito.times(4)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(4)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(), Mockito.eq(false) @@ -223,7 +223,7 @@ public void testDoesNotRunWhenBrowsePathIsNotQualified() throws Exception { Mockito.eq(5000) ); // Verify that 2 aspects are ingested, only those for the upgrade step - Mockito.verify(mockService, Mockito.times(2)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(2)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(), Mockito.eq(false) @@ -249,7 +249,7 @@ public void testDoesNotRunWhenAlreadyExecuted() throws Exception { UpgradeDefaultBrowsePathsStep step = new UpgradeDefaultBrowsePathsStep(mockService); step.execute(); - Mockito.verify(mockService, Mockito.times(0)).ingestSingleProposal( + Mockito.verify(mockService, Mockito.times(0)).ingestProposal( Mockito.any(MetadataChangeProposal.class), Mockito.any(AuditStamp.class), Mockito.anyBoolean() diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java index ea0d5ff2b24f01..4d0e5e7df29d58 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/util/MappingUtil.java @@ -19,8 +19,10 @@ import com.linkedin.entity.Aspect; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.IngestResult; import com.linkedin.metadata.entity.RollbackRunResult; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; +import com.linkedin.metadata.entity.transactions.AspectsBatch; import com.linkedin.metadata.entity.validation.ValidationException; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.entity.AspectUtils; @@ -270,14 +272,14 @@ public static Pair ingestProposal(com.linkedin.mxe.MetadataChan Stream proposalStream = Stream.concat(Stream.of(serviceProposal), AspectUtils.getAdditionalChanges(serviceProposal, entityService).stream()); - AspectsBatch batch = AspectsBatch.builder().mcps(proposalStream.collect(Collectors.toList()), + AspectsBatch batch = AspectsBatchImpl.builder().mcps(proposalStream.collect(Collectors.toList()), entityService.getEntityRegistry()).build(); - Set proposalResult = + Set proposalResult = entityService.ingestProposal(batch, auditStamp, false); Urn urn = proposalResult.stream().findFirst().get().getUrn(); - return new Pair<>(urn.toString(), proposalResult.stream().anyMatch(EntityService.IngestResult::isSqlCommitted)); + return new Pair<>(urn.toString(), proposalResult.stream().anyMatch(IngestResult::isSqlCommitted)); } catch (ValidationException ve) { exceptionally = ve; throw HttpClientErrorException.create(HttpStatus.UNPROCESSABLE_ENTITY, ve.getMessage(), null, null, null); diff --git a/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java b/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java index db155e192e653c..6a0c336e42bfc1 100644 --- a/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java +++ b/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java @@ -9,7 +9,7 @@ import com.linkedin.metadata.config.PreProcessHooks; import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.metadata.entity.AspectDao; -import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.UpdateAspectResult; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.service.UpdateIndicesService; @@ -66,8 +66,8 @@ public void setup() EntityRegistry mockEntityRegistry = new MockEntityRegistry(); AspectDao aspectDao = Mockito.mock(AspectDao.class); Mockito.when(aspectDao.runInTransactionWithRetry( - ArgumentMatchers.>any(), anyInt())).thenAnswer(i -> - ((Function) i.getArgument(0)).apply(Mockito.mock(Transaction.class)) + ArgumentMatchers.>any(), anyInt())).thenAnswer(i -> + ((Function) i.getArgument(0)).apply(Mockito.mock(Transaction.class)) ); EventProducer mockEntityEventProducer = Mockito.mock(EventProducer.class); diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java index 846b098d1ab358..1ce57b4b95d9e8 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java @@ -8,7 +8,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.linkedin.aspect.GetTimeseriesAspectValuesResponse; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; +import com.linkedin.metadata.entity.IngestResult; +import com.linkedin.metadata.entity.ebean.transactions.AspectsBatchImpl; +import com.linkedin.metadata.entity.transactions.AspectsBatch; import com.linkedin.metadata.resources.operations.Utils; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; @@ -207,22 +209,22 @@ public Task ingestProposal( final AspectsBatch batch; if (asyncBool) { // if async we'll expand the additional changes later, no need to do this early - batch = AspectsBatch.builder() + batch = AspectsBatchImpl.builder() .mcps(List.of(metadataChangeProposal), _entityService.getEntityRegistry()) .build(); } else { Stream proposalStream = Stream.concat(Stream.of(metadataChangeProposal), AspectUtils.getAdditionalChanges(metadataChangeProposal, _entityService).stream()); - batch = AspectsBatch.builder() + batch = AspectsBatchImpl.builder() .mcps(proposalStream.collect(Collectors.toList()), _entityService.getEntityRegistry()) .build(); } - Set results = + Set results = _entityService.ingestProposal(batch, auditStamp, asyncBool); - EntityService.IngestResult one = results.stream() + IngestResult one = results.stream() .findFirst() .get(); diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java index f5a009b7018885..3ff22fb7676760 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java @@ -294,7 +294,7 @@ private void updateExecutionRequestStatus(String runId, String status) { proposal.setAspect(GenericRecordUtils.serializeAspect(requestResult)); proposal.setChangeType(ChangeType.UPSERT); - _entityService.ingestSingleProposal(proposal, + _entityService.ingestProposal(proposal, new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()), false); } } catch (Exception e) { diff --git a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java index 0c7a0bb6d47e2e..59201d875e5ce2 100644 --- a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java +++ b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java @@ -15,9 +15,9 @@ import com.linkedin.metadata.config.PreProcessHooks; import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.UpdateAspectResult; import com.linkedin.metadata.entity.ebean.transactions.UpsertBatchItem; import com.linkedin.metadata.entity.EntityServiceImpl; -import com.linkedin.metadata.entity.UpdateAspectResult; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.registry.EntityRegistry; @@ -90,23 +90,23 @@ public void testAsyncDefaultAspects() throws URISyntaxException { .build(_entityRegistry); when(_aspectDao.runInTransactionWithRetry(any(), anyInt())) .thenReturn(List.of( - EntityService.UpdateAspectResult.builder().urn(urn) + UpdateAspectResult.builder().urn(urn) .newValue(new DatasetProperties().setName("name1")) .auditStamp(new AuditStamp()) .request(req).build(), - EntityService.UpdateAspectResult.builder().urn(urn) + UpdateAspectResult.builder().urn(urn) .newValue(new DatasetProperties().setName("name2")) .auditStamp(new AuditStamp()) .request(req).build(), - EntityService.UpdateAspectResult.builder().urn(urn) + UpdateAspectResult.builder().urn(urn) .newValue(new DatasetProperties().setName("name3")) .auditStamp(new AuditStamp()) .request(req).build(), - EntityService.UpdateAspectResult.builder().urn(urn) + UpdateAspectResult.builder().urn(urn) .newValue(new DatasetProperties().setName("name4")) .auditStamp(new AuditStamp()) .request(req).build(), - EntityService.UpdateAspectResult.builder().urn(urn) + UpdateAspectResult.builder().urn(urn) .newValue(new DatasetProperties().setName("name5")) .auditStamp(new AuditStamp()) .request(req).build())); diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java index 58e24825d1f2ba..e062d55254f901 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/AspectUtils.java @@ -4,17 +4,11 @@ import com.google.common.collect.ImmutableSet; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; -import com.linkedin.data.schema.validation.ValidationResult; import com.linkedin.data.template.RecordTemplate; import com.linkedin.entity.Aspect; import com.linkedin.entity.EntityResponse; import com.linkedin.entity.client.EntityClient; import com.linkedin.events.metadata.ChangeType; -import com.linkedin.metadata.entity.validation.EntityRegistryUrnValidator; -import com.linkedin.metadata.entity.validation.RecordTemplateValidator; -import com.linkedin.metadata.models.AspectSpec; -import com.linkedin.metadata.models.EntitySpec; -import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.GenericAspect; @@ -25,7 +19,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.function.Consumer; import java.util.stream.Collectors; import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; @@ -138,34 +131,4 @@ public static AuditStamp getAuditStamp(Urn actor) { auditStamp.setActor(actor); return auditStamp; } - - public static AspectSpec validate(EntitySpec entitySpec, String aspectName) { - if (aspectName == null || aspectName.isEmpty()) { - throw new UnsupportedOperationException("Aspect name is required for create and update operations"); - } - - AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName); - - if (aspectSpec == null) { - throw new RuntimeException( - String.format("Unknown aspect %s for entity %s", aspectName, entitySpec.getName())); - } - - return aspectSpec; - } - - public static void validateRecordTemplate(EntityRegistry entityRegistry, EntitySpec entitySpec, Urn urn, RecordTemplate aspect) { - EntityRegistryUrnValidator validator = new EntityRegistryUrnValidator(entityRegistry); - validator.setCurrentEntitySpec(entitySpec); - Consumer resultFunction = validationResult -> { - throw new IllegalArgumentException("Invalid format for aspect: " + entitySpec.getName() + "\n Cause: " - + validationResult.getMessages()); }; - RecordTemplateValidator.validate(EntityUtils.buildKeyAspect(entityRegistry, urn), resultFunction, validator); - RecordTemplateValidator.validate(aspect, resultFunction, validator); - } - - public static void validateRecordTemplate(EntityRegistry entityRegistry, Urn urn, RecordTemplate aspect) { - EntitySpec entitySpec = entityRegistry.getEntitySpec(urn.getEntityType()); - validateRecordTemplate(entityRegistry, entitySpec, urn, aspect); - } } \ No newline at end of file diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java index f95227373f5564..40284efe7ac82f 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/DeleteEntityService.java @@ -270,7 +270,7 @@ private void updateAspect(Urn urn, String aspectName, RecordTemplate prevAspect, proposal.setAspect(GenericRecordUtils.serializeAspect(newAspect)); final AuditStamp auditStamp = new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); - final EntityService.IngestResult ingestProposalResult = _entityService.ingestSingleProposal(proposal, auditStamp, false); + final IngestResult ingestProposalResult = _entityService.ingestProposal(proposal, auditStamp, false); if (!ingestProposalResult.isSqlCommitted()) { log.error("Failed to ingest aspect with references removed. Before {}, after: {}, please check MCP processor" diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java index 25edff740037ed..a84b8aabee9e6e 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/EntityService.java @@ -13,12 +13,11 @@ import com.linkedin.metadata.aspect.VersionedAspect; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult; +import com.linkedin.metadata.entity.transactions.AspectsBatch; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.query.ListUrnsResult; import com.linkedin.metadata.run.AspectRowSummary; -import com.linkedin.metadata.snapshot.Snapshot; -import com.linkedin.mxe.MetadataAuditOperation; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.SystemMetadata; @@ -28,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Future; import java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -140,22 +140,6 @@ EnvelopedAspect getLatestEnvelopedAspect( @Nonnull final Urn urn, @Nonnull final String aspectName) throws Exception; - /** - * Retrieves the specific version of the aspect for the given urn - * - * @param entityName name of the entity to fetch - * @param urn urn to fetch - * @param aspectName name of the aspect to fetch - * @param version version to fetch - * @return {@link EnvelopedAspect} object, or null if one cannot be found - */ - EnvelopedAspect getEnvelopedAspect( - // TODO: entityName is only used for a debug statement, can we remove this as a param? - String entityName, - @Nonnull Urn urn, - @Nonnull String aspectName, - long version) throws Exception; - @Deprecated VersionedAspect getVersionedAspect(@Nonnull Urn urn, @Nonnull String aspectName, long version); @@ -165,24 +149,11 @@ ListResult listLatestAspects( final int start, final int count); - void ingestAspects(@Nonnull final Urn urn, @Nonnull List> aspectRecordsToIngest, + List ingestAspects(@Nonnull final Urn urn, @Nonnull List> aspectRecordsToIngest, @Nonnull final AuditStamp auditStamp, @Nullable SystemMetadata systemMetadata); - /** - * Ingests (inserts) a new version of an entity aspect & emits a {@link com.linkedin.mxe.MetadataAuditEvent}. - * - * Note that in general, this should not be used externally. It is currently serving upgrade scripts and - * is as such public. - * - * @param urn an urn associated with the new aspect - * @param aspectName name of the aspect being inserted - * @param newValue value of the aspect being inserted - * @param auditStamp an {@link AuditStamp} containing metadata about the writer & current time - * @param systemMetadata - * @return the {@link RecordTemplate} representation of the written aspect object - */ - RecordTemplate ingestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName, - @Nonnull final RecordTemplate newValue, @Nonnull final AuditStamp auditStamp, @Nullable SystemMetadata systemMetadata); + List ingestAspects(@Nonnull final AspectsBatch aspectsBatch, @Nonnull final AuditStamp auditStamp, + boolean emitMCL, boolean overwrite); /** * Ingests (inserts) a new version of an entity aspect & emits a {@link com.linkedin.mxe.MetadataAuditEvent}. @@ -211,17 +182,6 @@ String batchApplyRetention(Integer start, Integer count, Integer attemptWithVers // TODO: Extract this to a different service, doesn't need to be here RestoreIndicesResult restoreIndices(@Nonnull RestoreIndicesArgs args, @Nonnull Consumer logger); - @Deprecated - RecordTemplate updateAspect( - @Nonnull final Urn urn, - @Nonnull final String entityName, - @Nonnull final String aspectName, - @Nonnull final AspectSpec aspectSpec, - @Nonnull final RecordTemplate newValue, - @Nonnull final AuditStamp auditStamp, - @Nonnull final long version, - @Nonnull final boolean emitMae); - ListUrnsResult listUrns(@Nonnull final String entityName, final int start, final int count); @Deprecated @@ -230,23 +190,14 @@ RecordTemplate updateAspect( @Deprecated Map getEntities(@Nonnull final Set urns, @Nonnull Set aspectNames); - @Deprecated - void produceMetadataAuditEvent(@Nonnull final Urn urn, @Nonnull final String aspectName, - @Nullable final RecordTemplate oldAspectValue, @Nullable final RecordTemplate newAspectValue, - @Nullable final SystemMetadata oldSystemMetadata, @Nullable final SystemMetadata newSystemMetadata, - @Nullable final MetadataAuditOperation operation); - - @Deprecated - void produceMetadataAuditEventForKey(@Nonnull final Urn urn, - @Nullable final SystemMetadata newSystemMetadata); - - void produceMetadataChangeLog(@Nonnull final Urn urn, AspectSpec aspectSpec, + Pair, Boolean> alwaysProduceMCLAsync(@Nonnull final Urn urn, AspectSpec aspectSpec, @Nonnull final MetadataChangeLog metadataChangeLog); - void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull String entityName, @Nonnull String aspectName, - @Nonnull final AspectSpec aspectSpec, @Nullable final RecordTemplate oldAspectValue, - @Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata, - @Nullable final SystemMetadata newSystemMetadata, @Nonnull AuditStamp auditStamp, @Nonnull final ChangeType changeType); + Pair, Boolean> alwaysProduceMCLAsync(@Nonnull final Urn urn, @Nonnull String entityName, @Nonnull String aspectName, + @Nonnull final AspectSpec aspectSpec, @Nullable final RecordTemplate oldAspectValue, + @Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata, + @Nullable final SystemMetadata newSystemMetadata, @Nonnull AuditStamp auditStamp, + @Nonnull final ChangeType changeType); RecordTemplate getLatestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName); @@ -261,9 +212,6 @@ void ingestEntities(@Nonnull final List entities, @Nonnull final AuditSt void ingestEntity(@Nonnull Entity entity, @Nonnull AuditStamp auditStamp, @Nonnull SystemMetadata systemMetadata); - @Deprecated - Snapshot buildSnapshot(@Nonnull final Urn urn, @Nonnull final RecordTemplate aspectValue); - void setRetentionService(RetentionService retentionService); AspectSpec getKeyAspectSpec(@Nonnull final Urn urn); @@ -289,8 +237,16 @@ List> generateDefaultAspectsIfMissing(@Nonnull fina RollbackRunResult rollbackWithConditions(List aspectRows, Map conditions, boolean hardDelete); - IngestProposalResult ingestProposal(@Nonnull MetadataChangeProposal mcp, - AuditStamp auditStamp, final boolean async); + Set ingestProposal(AspectsBatch aspectsBatch, AuditStamp auditStamp, final boolean async); + + /** + * If you have more than 1 proposal use the {AspectsBatch} method + * @param proposal the metadata proposal to ingest + * @param auditStamp audit information + * @param async async ingestion or sync ingestion + * @return ingestion result + */ + IngestResult ingestProposal(MetadataChangeProposal proposal, AuditStamp auditStamp, final boolean async); Boolean exists(Urn urn); diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/IngestResult.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/IngestResult.java new file mode 100644 index 00000000000000..5e4ed6259a7f7b --- /dev/null +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/IngestResult.java @@ -0,0 +1,18 @@ +package com.linkedin.metadata.entity; + +import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.entity.transactions.AbstractBatchItem; +import lombok.Builder; +import lombok.Value; + +@Builder(toBuilder = true) +@Value +public class IngestResult { + Urn urn; + AbstractBatchItem request; + boolean publishedMCL; + boolean processedMCL; + boolean publishedMCP; + boolean sqlCommitted; + boolean isUpdate; // update else insert +} diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/RetentionService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/RetentionService.java index b1d654864d674d..1cdd9965c4bfce 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/RetentionService.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/RetentionService.java @@ -7,9 +7,9 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; -import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch; import com.linkedin.metadata.entity.retention.BulkApplyRetentionArgs; import com.linkedin.metadata.entity.retention.BulkApplyRetentionResult; +import com.linkedin.metadata.entity.transactions.AspectsBatch; import com.linkedin.metadata.key.DataHubRetentionKey; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; @@ -111,14 +111,14 @@ public boolean setRetention(@Nullable String entityName, @Nullable String aspect AuditStamp auditStamp = new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); - AspectsBatch batch = AspectsBatch.builder() - .mcps(List.of(keyProposal, aspectProposal), getEntityService().getEntityRegistry()) - .build(); + AspectsBatch batch = buildAspectsBatch(List.of(keyProposal, aspectProposal)); return getEntityService().ingestProposal(batch, auditStamp, false).stream() - .anyMatch(EntityService.IngestResult::isSqlCommitted); + .anyMatch(IngestResult::isSqlCommitted); } + protected abstract AspectsBatch buildAspectsBatch(List mcps); + /** * Delete the retention policy set for given entity and aspect. * diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/UpdateAspectResult.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/UpdateAspectResult.java index 68ecdbd87dd16f..06199814d30ddb 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/UpdateAspectResult.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/UpdateAspectResult.java @@ -3,14 +3,20 @@ import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.entity.transactions.AbstractBatchItem; import com.linkedin.mxe.MetadataAuditOperation; import com.linkedin.mxe.SystemMetadata; +import lombok.Builder; import lombok.Value; +import java.util.concurrent.Future; + +@Builder(toBuilder = true) @Value public class UpdateAspectResult { Urn urn; + AbstractBatchItem request; RecordTemplate oldValue; RecordTemplate newValue; SystemMetadata oldSystemMetadata; @@ -18,4 +24,6 @@ public class UpdateAspectResult { MetadataAuditOperation operation; AuditStamp auditStamp; long maxVersion; + boolean processedMCL; + Future mclFuture; } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/AbstractBatchItem.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/transactions/AbstractBatchItem.java similarity index 91% rename from metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/AbstractBatchItem.java rename to metadata-service/services/src/main/java/com/linkedin/metadata/entity/transactions/AbstractBatchItem.java index a51bf0c943687c..03a2b4e2a7f736 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/transactions/AbstractBatchItem.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/transactions/AbstractBatchItem.java @@ -1,11 +1,10 @@ -package com.linkedin.metadata.entity.ebean.transactions; - +package com.linkedin.metadata.entity.transactions; import com.linkedin.common.urn.Urn; import com.linkedin.events.metadata.ChangeType; -import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.template.AspectTemplateEngine; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.SystemMetadata; @@ -13,6 +12,9 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; +import static com.linkedin.metadata.Constants.*; + + public abstract class AbstractBatchItem { // urn an urn associated with the new aspect public abstract Urn getUrn(); @@ -30,11 +32,13 @@ public abstract class AbstractBatchItem { public abstract MetadataChangeProposal getMetadataChangeProposal(); + public abstract void validateUrn(EntityRegistry entityRegistry, Urn urn); + @Nonnull protected static SystemMetadata generateSystemMetadataIfEmpty(@Nullable SystemMetadata systemMetadata) { if (systemMetadata == null) { systemMetadata = new SystemMetadata(); - systemMetadata.setRunId(EntityService.DEFAULT_RUN_ID); + systemMetadata.setRunId(DEFAULT_RUN_ID); systemMetadata.setLastObserved(System.currentTimeMillis()); } return systemMetadata; diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/entity/transactions/AspectsBatch.java b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/transactions/AspectsBatch.java new file mode 100644 index 00000000000000..1d3da081300718 --- /dev/null +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/entity/transactions/AspectsBatch.java @@ -0,0 +1,22 @@ +package com.linkedin.metadata.entity.transactions; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + + +public interface AspectsBatch { + List getItems(); + + default boolean containsDuplicateAspects() { + return getItems().stream().map(i -> String.format("%s_%s", i.getClass().getName(), i.hashCode())) + .distinct().count() != getItems().size(); + } + + default Map> getUrnAspectsMap() { + return getItems().stream() + .map(aspect -> Map.entry(aspect.getUrn().toString(), aspect.getAspectName())) + .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.mapping(Map.Entry::getValue, Collectors.toSet()))); + } +}