Skip to content

Commit

Permalink
Merge branch 'master' into ingestion/glue
Browse files Browse the repository at this point in the history
  • Loading branch information
aviv-julienjehannet authored Oct 22, 2024
2 parents fb6a5b8 + 6c55511 commit f21c8bc
Show file tree
Hide file tree
Showing 55 changed files with 998 additions and 284 deletions.
2 changes: 2 additions & 0 deletions datahub-frontend/app/auth/AuthModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.util.Configuration;
import config.ConfigurationProvider;
import controllers.SsoCallbackController;
import io.datahubproject.metadata.context.ValidationContext;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

Expand Down Expand Up @@ -187,6 +188,7 @@ protected OperationContext provideOperationContext(
.authorizationContext(AuthorizationContext.builder().authorizer(Authorizer.EMPTY).build())
.searchContext(SearchContext.EMPTY)
.entityRegistryContext(EntityRegistryContext.builder().build(EmptyEntityRegistry.EMPTY))
.validationContext(ValidationContext.builder().alternateValidation(false).build())
.build(systemAuthentication);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.datahubproject.metadata.context.OperationContextConfig;
import io.datahubproject.metadata.context.RetrieverContext;
import io.datahubproject.metadata.context.ServicesRegistryContext;
import io.datahubproject.metadata.context.ValidationContext;
import io.datahubproject.metadata.services.RestrictedService;
import java.util.List;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -161,7 +162,8 @@ protected OperationContext javaSystemOperationContext(
@Nonnull final GraphService graphService,
@Nonnull final SearchService searchService,
@Qualifier("baseElasticSearchComponents")
BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components) {
BaseElasticSearchComponentsFactory.BaseElasticSearchComponents components,
@Nonnull final ConfigurationProvider configurationProvider) {

EntityServiceAspectRetriever entityServiceAspectRetriever =
EntityServiceAspectRetriever.builder()
Expand All @@ -186,6 +188,10 @@ protected OperationContext javaSystemOperationContext(
.aspectRetriever(entityServiceAspectRetriever)
.graphRetriever(systemGraphRetriever)
.searchRetriever(searchServiceSearchRetriever)
.build(),
ValidationContext.builder()
.alternateValidation(
configurationProvider.getFeatureFlags().isAlternateMCPValidation())
.build());

entityServiceAspectRetriever.setSystemOperationContext(systemOperationContext);
Expand Down
2 changes: 2 additions & 0 deletions docker/profiles/docker-compose.gms.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ x-datahub-gms-service: &datahub-gms-service
environment: &datahub-gms-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env]
ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE: ${ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE:-search_config.yaml}
ALTERNATE_MCP_VALIDATION: ${ALTERNATE_MCP_VALIDATION:-true}
healthcheck:
test: curl -sS --fail http://datahub-gms:${DATAHUB_GMS_PORT:-8080}/health
start_period: 90s
Expand Down Expand Up @@ -182,6 +183,7 @@ x-datahub-mce-consumer-service: &datahub-mce-consumer-service
- ${DATAHUB_LOCAL_MCE_ENV:-empty2.env}
environment: &datahub-mce-consumer-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env]
ALTERNATE_MCP_VALIDATION: ${ALTERNATE_MCP_VALIDATION:-true}

x-datahub-mce-consumer-service-dev: &datahub-mce-consumer-service-dev
<<: *datahub-mce-consumer-service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
@AllArgsConstructor
@EqualsAndHashCode
public abstract class PluginSpec {
protected static String ENTITY_WILDCARD = "*";
protected static String WILDCARD = "*";

@Nonnull
public abstract AspectPluginConfig getConfig();
Expand Down Expand Up @@ -50,7 +50,7 @@ protected boolean isEntityAspectSupported(
return (getConfig().getSupportedEntityAspectNames().stream()
.anyMatch(
supported ->
ENTITY_WILDCARD.equals(supported.getEntityName())
WILDCARD.equals(supported.getEntityName())
|| supported.getEntityName().equals(entityName)))
&& isAspectSupported(aspectName);
}
Expand All @@ -59,13 +59,16 @@ protected boolean isAspectSupported(@Nonnull String aspectName) {
return getConfig().getSupportedEntityAspectNames().stream()
.anyMatch(
supported ->
ENTITY_WILDCARD.equals(supported.getAspectName())
WILDCARD.equals(supported.getAspectName())
|| supported.getAspectName().equals(aspectName));
}

protected boolean isChangeTypeSupported(@Nullable ChangeType changeType) {
return (changeType == null && getConfig().getSupportedOperations().isEmpty())
|| getConfig().getSupportedOperations().stream()
.anyMatch(supported -> supported.equalsIgnoreCase(String.valueOf(changeType)));
.anyMatch(
supported ->
WILDCARD.equals(supported)
|| supported.equalsIgnoreCase(String.valueOf(changeType)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.SystemMetadata;
import java.sql.Timestamp;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -65,7 +66,7 @@ public static class EntitySystemAspect implements SystemAspect {
@Nullable private final RecordTemplate recordTemplate;

@Nonnull private final EntitySpec entitySpec;
@Nonnull private final AspectSpec aspectSpec;
@Nullable private final AspectSpec aspectSpec;

@Nonnull
public String getUrnRaw() {
Expand Down Expand Up @@ -151,15 +152,19 @@ private EntityAspect.EntitySystemAspect build() {

public EntityAspect.EntitySystemAspect build(
@Nonnull EntitySpec entitySpec,
@Nonnull AspectSpec aspectSpec,
@Nullable AspectSpec aspectSpec,
@Nonnull EntityAspect entityAspect) {
this.entityAspect = entityAspect;
this.urn = UrnUtils.getUrn(entityAspect.getUrn());
this.aspectSpec = aspectSpec;
if (entityAspect.getMetadata() != null) {
this.recordTemplate =
RecordUtils.toRecordTemplate(
aspectSpec.getDataTemplateClass(), entityAspect.getMetadata());
(Class<? extends RecordTemplate>)
(aspectSpec == null
? GenericAspect.class
: aspectSpec.getDataTemplateClass()),
entityAspect.getMetadata());
}

return new EntitySystemAspect(entityAspect, urn, recordTemplate, entitySpec, aspectSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.common.AuditStamp;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
Expand All @@ -11,6 +12,7 @@
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
import java.util.Collection;
Expand Down Expand Up @@ -114,39 +116,33 @@ private Stream<? extends BatchItem> proposedItemsToChangeItemStream(List<MCPItem
proposedItem.getChangeType(),
proposedItem.getUrn(),
proposedItem.getAspectName())))
.map(
mcpItem -> {
if (ChangeType.PATCH.equals(mcpItem.getChangeType())) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
retrieverContext.getAspectRetriever().getEntityRegistry());
}
return ChangeItemImpl.ChangeItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
retrieverContext.getAspectRetriever());
});
.map(mcpItem -> patchDiscriminator(mcpItem, retrieverContext.getAspectRetriever()));
List<MCPItem> mutatedItems =
applyProposalMutationHooks(proposedItems, retrieverContext).collect(Collectors.toList());
Stream<? extends BatchItem> proposedItemsToChangeItems =
mutatedItems.stream()
.filter(mcpItem -> mcpItem.getMetadataChangeProposal() != null)
// Filter on proposed items again to avoid applying builder to Patch Item side effects
.filter(mcpItem -> mcpItem instanceof ProposedItem)
.map(
mcpItem ->
ChangeItemImpl.ChangeItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
retrieverContext.getAspectRetriever()));
.map(mcpItem -> patchDiscriminator(mcpItem, retrieverContext.getAspectRetriever()));
Stream<? extends BatchItem> sideEffectItems =
mutatedItems.stream().filter(mcpItem -> !(mcpItem instanceof ProposedItem));
Stream<? extends BatchItem> combinedChangeItems =
Stream.concat(proposedItemsToChangeItems, unmutatedItems);
return Stream.concat(combinedChangeItems, sideEffectItems);
}

private static BatchItem patchDiscriminator(MCPItem mcpItem, AspectRetriever aspectRetriever) {
if (ChangeType.PATCH.equals(mcpItem.getChangeType())) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),

Check warning on line 138 in metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java

View workflow job for this annotation

GitHub Actions / qodana

Nullability and data flow problems

Argument `mcpItem.getMetadataChangeProposal()` might be null
mcpItem.getAuditStamp(),
aspectRetriever.getEntityRegistry());
}
return ChangeItemImpl.ChangeItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(), mcpItem.getAuditStamp(), aspectRetriever);

Check warning on line 143 in metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java

View workflow job for this annotation

GitHub Actions / qodana

Nullability and data flow problems

Argument `mcpItem.getMetadataChangeProposal()` might be null
}

public static class AspectsBatchImplBuilder {
/**
* Just one aspect record template
Expand All @@ -164,13 +160,33 @@ public AspectsBatchImplBuilder mcps(
Collection<MetadataChangeProposal> mcps,
AuditStamp auditStamp,
RetrieverContext retrieverContext) {
return mcps(mcps, auditStamp, retrieverContext, false);
}

public AspectsBatchImplBuilder mcps(
Collection<MetadataChangeProposal> mcps,
AuditStamp auditStamp,
RetrieverContext retrieverContext,
boolean alternateMCPValidation) {

retrieverContext(retrieverContext);
items(
mcps.stream()
.map(
mcp -> {
try {
if (alternateMCPValidation) {
EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(mcp.getEntityType());
return ProposedItem.builder()
.metadataChangeProposal(mcp)
.entitySpec(entitySpec)
.auditStamp(auditStamp)
.build();
}
if (mcp.getChangeType().equals(ChangeType.PATCH)) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public static PatchItemImpl build(
.build(entityRegistry);
}

private static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) {
public static JsonPatch convertToJsonPatch(MetadataChangeProposal mcp) {
JsonNode json;
try {
return Json.createPatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Builder;
Expand Down Expand Up @@ -83,4 +85,18 @@ public SystemMetadata getSystemMetadata() {
public ChangeType getChangeType() {
return metadataChangeProposal.getChangeType();
}

public static class ProposedItemBuilder {
public ProposedItem build() {
// Ensure systemMetadata
return new ProposedItem(
Objects.requireNonNull(this.metadataChangeProposal)
.setSystemMetadata(
SystemMetadataUtils.generateSystemMetadataIfEmpty(
this.metadataChangeProposal.getSystemMetadata())),
this.auditStamp,
this.entitySpec,
this.aspectSpec);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package com.linkedin.metadata.aspect.hooks;

import static com.linkedin.events.metadata.ChangeType.CREATE;
import static com.linkedin.events.metadata.ChangeType.CREATE_ENTITY;
import static com.linkedin.events.metadata.ChangeType.UPDATE;
import static com.linkedin.events.metadata.ChangeType.UPSERT;

import com.datahub.util.exception.ModelConversionException;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.transform.filter.request.MaskTree;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.RetrieverContext;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig;
Expand All @@ -14,6 +20,7 @@
import com.linkedin.mxe.GenericAspect;
import com.linkedin.restli.internal.server.util.RestUtils;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
Expand All @@ -27,6 +34,11 @@
@Getter
@Accessors(chain = true)
public class IgnoreUnknownMutator extends MutationHook {
private static final Set<String> SUPPORTED_MIME_TYPES =
Set.of("application/json", "application/json-patch+json");
private static final Set<ChangeType> MUTATION_TYPES =
Set.of(CREATE, CREATE_ENTITY, UPSERT, UPDATE);

@Nonnull private AspectPluginConfig config;

@Override
Expand All @@ -42,8 +54,8 @@ protected Stream<MCPItem> proposalMutation(
item.getAspectSpec().getName());
return false;
}
if (!"application/json"
.equals(item.getMetadataChangeProposal().getAspect().getContentType())) {
if (!SUPPORTED_MIME_TYPES.contains(
item.getMetadataChangeProposal().getAspect().getContentType())) {
log.warn(
"Dropping unknown content type {} for aspect {} on entity {}",
item.getMetadataChangeProposal().getAspect().getContentType(),
Expand All @@ -55,25 +67,27 @@ protected Stream<MCPItem> proposalMutation(
})
.peek(
item -> {
try {
AspectSpec aspectSpec = item.getEntitySpec().getAspectSpec(item.getAspectName());
GenericAspect aspect = item.getMetadataChangeProposal().getAspect();
RecordTemplate recordTemplate =
GenericRecordUtils.deserializeAspect(
aspect.getValue(), aspect.getContentType(), aspectSpec);
if (MUTATION_TYPES.contains(item.getChangeType())) {
try {
ValidationApiUtils.validateOrThrow(recordTemplate);
} catch (ValidationException | ModelConversionException e) {
log.warn(
"Failed to validate aspect. Coercing aspect {} on entity {}",
item.getAspectName(),
item.getEntitySpec().getName());
RestUtils.trimRecordTemplate(recordTemplate, new MaskTree(), false);
item.getMetadataChangeProposal()
.setAspect(GenericRecordUtils.serializeAspect(recordTemplate));
AspectSpec aspectSpec = item.getEntitySpec().getAspectSpec(item.getAspectName());
GenericAspect aspect = item.getMetadataChangeProposal().getAspect();
RecordTemplate recordTemplate =
GenericRecordUtils.deserializeAspect(
aspect.getValue(), aspect.getContentType(), aspectSpec);
try {
ValidationApiUtils.validateOrThrow(recordTemplate);
} catch (ValidationException | ModelConversionException e) {
log.warn(
"Failed to validate aspect. Coercing aspect {} on entity {}",
item.getAspectName(),
item.getEntitySpec().getName());
RestUtils.trimRecordTemplate(recordTemplate, new MaskTree(), false);
item.getMetadataChangeProposal()
.setAspect(GenericRecordUtils.serializeAspect(recordTemplate));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
Expand Down
Loading

0 comments on commit f21c8bc

Please sign in to comment.