Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logs): add change event details to log context and improve some logs in MCL/MCP #11690

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -458,5 +458,11 @@ public class Constants {
// DAO
public static final long LATEST_VERSION = 0;

// Logging MDC keys. These MUST match the %X{...} keys referenced in the
// logback.xml encoder patterns (urn=%X{entityUrn}, aspect=%X{aspectName},
// entityType=%X{entityType}, changeType=%X{changeType}), since MDC.put is
// called with these constants as the map keys.
public static final String MDC_ENTITY_URN = "entityUrn";
// BUG FIX: was "" — MDC.put(MDC_ASPECT_NAME, ...) stored the aspect name under
// the empty-string key, so %X{aspectName:-none} in the log pattern never
// resolved and every log line rendered "aspect=none".
public static final String MDC_ASPECT_NAME = "aspectName";
public static final String MDC_ENTITY_TYPE = "entityType";
public static final String MDC_CHANGE_TYPE = "changeType";

private Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import com.linkedin.metadata.utils.SchemaFieldUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
Expand All @@ -56,6 +57,9 @@
@Slf4j
public class UpdateGraphIndicesService implements SearchIndicesService {
private static final String DOWNSTREAM_OF = "DownstreamOf";
private static final String GRAPH_DIFF_MODE_REMOVE_METRIC = "diff_remove_edge";
private static final String GRAPH_DIFF_MODE_ADD_METRIC = "diff_add_edge";
private static final String GRAPH_DIFF_MODE_UPDATE_METRIC = "diff_update_edge";

public static UpdateGraphIndicesService withService(GraphService graphService) {
return new UpdateGraphIndicesService(graphService);
Expand Down Expand Up @@ -382,20 +386,25 @@ private void updateGraphServiceDiff(
final List<Edge> mergedEdges = getMergedEdges(oldEdgeSet, newEdgeSet);

// Remove any old edges that no longer exist first
if (subtractiveDifference.size() > 0) {
if (!subtractiveDifference.isEmpty()) {
log.debug("Removing edges: {}", subtractiveDifference);
MetricUtils.counter(this.getClass(), GRAPH_DIFF_MODE_REMOVE_METRIC)
.inc(subtractiveDifference.size());
subtractiveDifference.forEach(graphService::removeEdge);
}

// Then add new edges
if (additiveDifference.size() > 0) {
if (!additiveDifference.isEmpty()) {
log.debug("Adding edges: {}", additiveDifference);
MetricUtils.counter(this.getClass(), GRAPH_DIFF_MODE_ADD_METRIC)
.inc(additiveDifference.size());
additiveDifference.forEach(graphService::addEdge);
}

// Then update existing edges
if (mergedEdges.size() > 0) {
if (!mergedEdges.isEmpty()) {
log.debug("Updating edges: {}", mergedEdges);
MetricUtils.counter(this.getClass(), GRAPH_DIFF_MODE_UPDATE_METRIC).inc(mergedEdges.size());
mergedEdges.forEach(graphService::upsertEdge);
}
}
Expand Down Expand Up @@ -437,7 +446,7 @@ private void deleteGraphData(

final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
edgeAndRelationTypes.getSecond();
if (urnToRelationshipTypesBeingAdded.size() > 0) {
if (!urnToRelationshipTypesBeingAdded.isEmpty()) {
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
graphService.removeEdgesFromNode(
opContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.timeseries.transformer.TimeseriesAspectTransformer;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.structured.StructuredPropertyDefinition;
Expand Down Expand Up @@ -61,6 +62,9 @@ public class UpdateIndicesService implements SearchIndicesService {

@Getter private final boolean structuredPropertiesWriteEnabled;

private static final String DOCUMENT_TRANSFORM_FAILED_METRIC = "document_transform_failed";
private static final String SEARCH_DIFF_MODE_SKIPPED_METRIC = "search_diff_no_changes_detected";

private static final Set<ChangeType> UPDATE_CHANGE_TYPES =
ImmutableSet.of(
ChangeType.CREATE,
Expand Down Expand Up @@ -283,11 +287,13 @@ private void updateSearchService(@Nonnull OperationContext opContext, MCLItem ev
event.getAuditStamp()));
} catch (Exception e) {
log.error(
"Error in getting documents from aspect: {} for aspect {}", e, aspectSpec.getName());
"Error in getting documents for urn: {} from aspect: {}", urn, aspectSpec.getName(), e);
MetricUtils.counter(this.getClass(), DOCUMENT_TRANSFORM_FAILED_METRIC).inc();
return;
}

if (!searchDocument.isPresent()) {
if (searchDocument.isEmpty()) {
log.info("Search document for urn: {} aspect: {} was empty", urn, aspect);
return;
}

Expand All @@ -304,15 +310,22 @@ private void updateSearchService(@Nonnull OperationContext opContext, MCLItem ev
opContext, urn, previousAspect, aspectSpec, false);
} catch (Exception e) {
log.error(
"Error in getting documents from previous aspect state: {} for aspect {}, continuing without diffing.",
e,
aspectSpec.getName());
"Error in getting documents from previous aspect state for urn: {} for aspect {}, continuing without diffing.",
urn,
aspectSpec.getName(),
e);
MetricUtils.counter(this.getClass(), DOCUMENT_TRANSFORM_FAILED_METRIC).inc();
}
}

if (previousSearchDocument.isPresent()) {
if (searchDocument.get().toString().equals(previousSearchDocument.get().toString())) {
// No changes to search document, skip writing no-op update
log.info(
"No changes detected for search document for urn: {} aspect: {}",
urn,
aspectSpec.getName());
MetricUtils.counter(this.getClass(), SEARCH_DIFF_MODE_SKIPPED_METRIC).inc();
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
<pattern>%date{ISO8601} [%thread] %-5level %logger{36} urn=%X{entityUrn:-none} aspect=%X{aspectName:-none} entityType=%X{entityType:-none} changeType=%X{changeType:-none} - %msg%n</pattern>
RyanHolstien marked this conversation as resolved.
Show resolved Hide resolved
</encoder>
<filter class="com.linkedin.metadata.utils.log.LogMessageFilter">
<excluded>scanned from multiple locations</excluded>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package com.linkedin.metadata.kafka;

import static com.linkedin.metadata.Constants.MDC_ASPECT_NAME;
import static com.linkedin.metadata.Constants.MDC_CHANGE_TYPE;
import static com.linkedin.metadata.Constants.MDC_ENTITY_TYPE;
import static com.linkedin.metadata.Constants.MDC_ENTITY_URN;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataChangeLog;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.MDC;

@Slf4j
public class MCLKafkaListener {
Expand Down Expand Up @@ -66,13 +75,23 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
return;
}

Urn entityUrn = event.getEntityUrn();
String aspectName = event.hasAspectName() ? event.getAspectName() : null;
String entityType = event.hasEntityType() ? event.getEntityType() : null;
ChangeType changeType = event.hasChangeType() ? event.getChangeType() : null;
MDC.put(MDC_ENTITY_URN, Optional.ofNullable(entityUrn).map(Urn::toString).orElse(""));
MDC.put(MDC_ASPECT_NAME, aspectName);
MDC.put(MDC_ENTITY_TYPE, entityType);
MDC.put(
MDC_CHANGE_TYPE, Optional.ofNullable(changeType).map(ChangeType::toString).orElse(""));

log.info(
"Invoking MCL hooks for consumer: {} urn: {}, aspect name: {}, entity type: {}, change type: {}",
consumerGroupId,
event.getEntityUrn(),
event.hasAspectName() ? event.getAspectName() : null,
event.hasEntityType() ? event.getEntityType() : null,
event.hasChangeType() ? event.getChangeType() : null);
entityUrn,
aspectName,
entityType,
changeType);

// Here - plug in additional "custom processor hooks"
for (MetadataChangeLogHook hook : this.hooks) {
Expand All @@ -98,6 +117,8 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
"Successfully completed MCL hooks for consumer: {} urn: {}",
consumerGroupId,
event.getEntityUrn());
} finally {
MDC.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
<pattern>%date{ISO8601} [%thread] %-5level %logger{36} urn=%X{entityUrn:-none} aspect=%X{aspectName:-none} entityType=%X{entityType:-none} changeType=%X{changeType:-none} - %msg%n</pattern>
</encoder>
<filter class="com.linkedin.metadata.utils.log.LogMessageFilter">
<excluded>scanned from multiple locations</excluded>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package com.linkedin.metadata.kafka;

import static com.linkedin.metadata.Constants.MDC_ASPECT_NAME;
import static com.linkedin.metadata.Constants.MDC_CHANGE_TYPE;
import static com.linkedin.metadata.Constants.MDC_ENTITY_TYPE;
import static com.linkedin.metadata.Constants.MDC_ENTITY_URN;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.linkedin.common.urn.Urn;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.entityclient.RestliEntityClientFactory;
import com.linkedin.metadata.EventUtils;
Expand All @@ -27,6 +34,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Conditional;
Expand Down Expand Up @@ -128,6 +136,17 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
MetadataChangeProposal event = new MetadataChangeProposal();
try {
event = EventUtils.avroToPegasusMCP(record);

Urn entityUrn = event.getEntityUrn();
String aspectName = event.hasAspectName() ? event.getAspectName() : null;
String entityType = event.hasEntityType() ? event.getEntityType() : null;
ChangeType changeType = event.hasChangeType() ? event.getChangeType() : null;
MDC.put(MDC_ENTITY_URN, Optional.ofNullable(entityUrn).map(Urn::toString).orElse(""));
MDC.put(MDC_ASPECT_NAME, aspectName);
MDC.put(MDC_ENTITY_TYPE, entityType);
MDC.put(
MDC_CHANGE_TYPE, Optional.ofNullable(changeType).map(ChangeType::toString).orElse(""));

log.debug("MetadataChangeProposal {}", event);
// TODO: Get this from the event itself.
String urn = entityClient.ingestProposal(systemOperationContext, event, false);
Expand All @@ -137,6 +156,8 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
log.error("Message: {}", record);
sendFailedMCP(event, throwable);
}
} finally {
MDC.clear();
}
}

Expand Down
Loading