diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index f82079ad45..f28e58c50c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -54,7 +54,6 @@ import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_CONSUMER_POOL_FOR_AA_WC_LEADER_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_CONSUMER_POOL_SIZE_FOR_AA_WC_LEADER; import static com.linkedin.venice.ConfigKeys.SERVER_DEDICATED_DRAINER_FOR_SORTED_INPUT_ENABLED; -import static com.linkedin.venice.ConfigKeys.SERVER_DELAY_REPORT_READY_TO_SERVE_MS; import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD; import static com.linkedin.venice.ConfigKeys.SERVER_DISK_HEALTH_CHECK_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_DISK_HEALTH_CHECK_SERVICE_ENABLED; @@ -364,7 +363,6 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final KafkaConsumerService.ConsumerAssignmentStrategy sharedConsumerAssignmentStrategy; private final int consumerPoolSizePerKafkaCluster; private final boolean leakedResourceCleanupEnabled; - private final long delayReadyToServeMS; private final IngestionMode ingestionMode; private final int ingestionServicePort; @@ -612,7 +610,6 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map> oldValueProvider = Lazy.of( @@ -489,7 +489,7 @@ protected void processMessageAndMaybeProduceToKafka( break; default: throw new VeniceMessageException( - consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType); + ingestionTaskName + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType); } if (mergeConflictResult.isUpdateIgnored()) { @@ -838,7 +838,8 @@ protected void startConsumingAsLeader(PartitionConsumptionState partitionConsump */ if (shouldNewLeaderSwitchToRemoteConsumption(partitionConsumptionState)) { partitionConsumptionState.setConsumeRemotely(true); - LOGGER.info("{} enabled remote consumption from topic {} partition {}", consumerTaskId, leaderTopic, partition); + LOGGER + .info("{} enabled remote consumption from topic {} partition {}", ingestionTaskName, leaderTopic, partition); } partitionConsumptionState.setLeaderFollowerState(LEADER); @@ -850,7 +851,7 @@ protected void startConsumingAsLeader(PartitionConsumptionState partitionConsump LOGGER.info( "{} is promoted to leader for partition {} and it is going to start consuming from " + "topic {} with offset by Kafka URL mapping {}", - consumerTaskId, + ingestionTaskName, partition, leaderTopic, leaderOffsetByKafkaURL); @@ -864,7 +865,7 @@ protected void startConsumingAsLeader(PartitionConsumptionState partitionConsump LOGGER.info( "{}, as a leader, started consuming from topic {} partition {} with offset by Kafka URL mapping {}", - consumerTaskId, + ingestionTaskName, leaderTopic, partition, leaderOffsetByKafkaURL); @@ -919,7 +920,7 @@ protected void leaderExecuteTopicSwitch( rewindStartTimestamp = calculateRewindStartTime(partitionConsumptionState); LOGGER.info( "{} leader calculated rewindStartTimestamp {} for {}", - consumerTaskId, + ingestionTaskName, rewindStartTimestamp, sourceTopicPartition); } else { @@ -998,7 +999,7 @@ protected void leaderExecuteTopicSwitch( partitionConsumptionState.setConsumeRemotely(true); 
LOGGER.info( "{} enabled remote consumption and switch to topic {} partition {} with offset by Kafka URL mapping {}", - consumerTaskId, + ingestionTaskName, newSourceTopic, sourceTopicPartition, upstreamOffsetsByKafkaURLs); @@ -1031,7 +1032,7 @@ protected void leaderExecuteTopicSwitch( syncConsumedUpstreamRTOffsetMapIfNeeded(partitionConsumptionState, upstreamOffsetsByKafkaURLs); LOGGER.info( "{} leader successfully switch feed topic from {} to {} on partition {} with offset by Kafka URL mapping {}", - consumerTaskId, + ingestionTaskName, currentLeaderTopic, newSourceTopic, partition, @@ -1094,7 +1095,7 @@ protected boolean processTopicSwitch( rewindStartTimestamp = calculateRewindStartTime(partitionConsumptionState); LOGGER.info( "{} leader calculated rewindStartTimestamp {} for topic {} partition {}", - consumerTaskId, + ingestionTaskName, rewindStartTimestamp, newSourceTopicName, newSourceTopicPartition); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java index 48b0ff3a26..5202407759 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java @@ -19,6 +19,7 @@ import com.linkedin.venice.service.AbstractVeniceService; import com.linkedin.venice.throttle.EventThrottler; import com.linkedin.venice.utils.DaemonThreadFactory; +import com.linkedin.venice.utils.RedundantExceptionFilter; import com.linkedin.venice.utils.SystemTime; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; @@ -47,6 +48,8 @@ */ public class AggKafkaConsumerService extends AbstractVeniceService { private static final Logger LOGGER = LogManager.getLogger(AggKafkaConsumerService.class); + private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER = + RedundantExceptionFilter.getRedundantExceptionFilter(); private final PubSubConsumerAdapterFactory consumerFactory; private final VeniceServerConfig serverConfig; @@ -73,6 +76,9 @@ public class AggKafkaConsumerService extends AbstractVeniceService { private final Function isAAOrWCEnabledFunc; private final ReadOnlyStoreRepository metadataRepository; + private final static String STUCK_CONSUMER_MSG = + "Didn't find any suspicious ingestion task, and please contact developers to investigate it further"; + public AggKafkaConsumerService( final PubSubConsumerAdapterFactory consumerFactory, TopicManagerRepository.SSLPropertiesSupplier sslPropertiesSupplier, @@ -241,8 +247,9 @@ protected static Runnable getStuckConsumerDetectionAndRepairRunnable( } }); if (!hasRepairedSomeIngestionTask.get()) { - LOGGER.error( - "Didn't find any suspicious ingestion task, and please contact developers to investigate it further"); + if (!REDUNDANT_LOGGING_FILTER.isRedundantException(STUCK_CONSUMER_MSG)) { + LOGGER.warn(STUCK_CONSUMER_MSG); + } stuckConsumerRepairStats.recordRepairFailure(); } }; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java index 6c8ce12fc7..dcbfe193d2 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java +++ 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ConsumptionTask.java @@ -90,6 +90,7 @@ public void run() { Set topicPartitionsToUnsub = new HashSet<>(); int payloadBytesConsumedInOnePoll; int polledPubSubMessagesCount = 0; + Map storePollCounterMap = new HashMap<>(); try { while (running) { try { @@ -126,7 +127,6 @@ public void run() { payloadBytesConsumedInOnePoll = 0; polledPubSubMessagesCount = 0; beforeProducingToWriteBufferTimestamp = System.currentTimeMillis(); - Map storePollCounterMap = new HashMap<>(); for (Map.Entry>> entry: polledPubSubMessages .entrySet()) { PubSubTopicPartition pubSubTopicPartition = entry.getKey(); @@ -164,6 +164,7 @@ public void run() { recordsThrottler.accept(polledPubSubMessagesCount); cleaner.unsubscribe(topicPartitionsToUnsub); aggStats.recordTotalDetectedNoRunningIngestionTopicPartitionNum(topicPartitionsToUnsub.size()); + storePollCounterMap.clear(); } else { // No result came back, here will add some delay addSomeDelay = true; @@ -230,7 +231,7 @@ void removeDataReceiver(PubSubTopicPartition topicPartition) { /** * This class is used to count the number of messages and the byte size of the messages for a given store per poll. */ - class StorePollCounter { + static class StorePollCounter { protected int msgCount; protected int byteSize; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index e70e8b37f6..a0a48e3b3a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -506,7 +506,6 @@ public void handleStoreDeleted(Store store) { .setServerConfig(serverConfig) .setDiskUsage(diskUsage) .setAggKafkaConsumerService(aggKafkaConsumerService) - .setStartReportingReadyToServeTimestamp(System.currentTimeMillis() + serverConfig.getDelayReadyToServeMS()) .setPartitionStateSerializer(partitionStateSerializer) .setIsDaVinciClient(isDaVinciClient) .setRemoteIngestionRepairService(remoteIngestionRepairService) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 21a8dde37a..96f61fe452 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -395,12 +395,12 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws } if (store.isMigrationDuplicateStore()) { partitionConsumptionState.setLeaderFollowerState(PAUSE_TRANSITION_FROM_STANDBY_TO_LEADER); - LOGGER.info("{} for partition {} is paused transition from STANDBY to LEADER", consumerTaskId, partition); + LOGGER.info("{} for partition {} is paused transition from STANDBY to LEADER", ingestionTaskName, partition); } else { // Mark this partition in the middle of STANDBY to LEADER transition partitionConsumptionState.setLeaderFollowerState(IN_TRANSITION_FROM_STANDBY_TO_LEADER); - LOGGER.info("{} for partition {} is in transition from STANDBY to LEADER", consumerTaskId, partition); + LOGGER.info("{} for partition {} is in transition from STANDBY 
to LEADER", ingestionTaskName, partition); } break; case LEADER_TO_STANDBY: @@ -455,7 +455,7 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws partitionConsumptionState.setConsumeRemotely(false); LOGGER.info( "{} disabled remote consumption from topic {} partition {}", - consumerTaskId, + ingestionTaskName, leaderTopic, partition); // Followers always consume local VT and should not skip kafka message @@ -468,7 +468,7 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset(), localKafkaServer); - LOGGER.info("{} demoted to standby for partition {}", consumerTaskId, partition); + LOGGER.info("{} demoted to standby for partition {}", ingestionTaskName, partition); } else { partitionConsumptionState.setLeaderFollowerState(STANDBY); updateLeaderTopicOnFollower(partitionConsumptionState); @@ -520,7 +520,7 @@ protected void checkLongRunningTaskState() throws InterruptedException { partitionConsumptionState.setLeaderFollowerState(IN_TRANSITION_FROM_STANDBY_TO_LEADER); LOGGER.info( "{} became in transition to leader for partition {}", - consumerTaskId, + ingestionTaskName, partitionConsumptionState.getPartition()); } break; @@ -529,7 +529,7 @@ protected void checkLongRunningTaskState() throws InterruptedException { if (canSwitchToLeaderTopic(partitionConsumptionState)) { LOGGER.info( "{} start promoting to leader for partition {} unsubscribing from current topic: {}", - consumerTaskId, + ingestionTaskName, partition, kafkaVersionTopic); /** @@ -541,7 +541,7 @@ protected void checkLongRunningTaskState() throws InterruptedException { LOGGER.info( "{} start promoting to leader for partition {}, unsubscribed from current topic: {}", - consumerTaskId, + ingestionTaskName, partition, kafkaVersionTopic); OffsetRecord offsetRecord = partitionConsumptionState.getOffsetRecord(); @@ -589,7 +589,7 @@ protected void checkLongRunningTaskState() throws InterruptedException { PubSubTopic currentLeaderTopic = partitionConsumptionState.getOffsetRecord().getLeaderTopic(pubSubTopicRepository); if (currentLeaderTopic == null) { - String errorMsg = consumerTaskId + " Missing leader topic for actual leader. OffsetRecord: " + String errorMsg = ingestionTaskName + " Missing leader topic for actual leader. 
OffsetRecord: " + partitionConsumptionState.getOffsetRecord().toSimplifiedString(); LOGGER.error(errorMsg); throw new VeniceException(errorMsg); @@ -608,7 +608,7 @@ protected void checkLongRunningTaskState() throws InterruptedException { partitionConsumptionState.setConsumeRemotely(false); LOGGER.info( "{} disabled remote consumption from topic {} partition {}", - consumerTaskId, + ingestionTaskName, currentLeaderTopic, partition); if (isDataRecovery && partitionConsumptionState.isBatchOnly() && !versionTopic.equals(currentLeaderTopic)) { @@ -772,7 +772,7 @@ protected void startConsumingAsLeader(PartitionConsumptionState partitionConsump partitionConsumptionState.setConsumeRemotely(true); LOGGER.info( "{} enabled remote consumption from topic {} partition {}", - consumerTaskId, + ingestionTaskName, offsetRecord.getLeaderTopic(pubSubTopicRepository), partition); } @@ -799,7 +799,7 @@ protected void startConsumingAsLeader(PartitionConsumptionState partitionConsump LOGGER.info( "{} is promoted to leader for partition {} and it is going to start consuming from " + "{} at offset {}; source Kafka url: {}; remote consumption flag: {}", - consumerTaskId, + ingestionTaskName, partition, leaderTopicPartition, leaderStartOffset, @@ -813,7 +813,7 @@ protected void startConsumingAsLeader(PartitionConsumptionState partitionConsump LOGGER.info( "{}, as a leader, started consuming from {} at offset {}", - consumerTaskId, + ingestionTaskName, leaderTopicPartition, leaderStartOffset); } @@ -876,7 +876,7 @@ protected void leaderExecuteTopicSwitch( // subscribe to the new upstream if (isNativeReplicationEnabled && !newSourceKafkaServer.equals(localKafkaServer)) { partitionConsumptionState.setConsumeRemotely(true); - LOGGER.info("{} enabled remote consumption from {}", consumerTaskId, newSourceTopicPartition); + LOGGER.info("{} enabled remote consumption from {}", ingestionTaskName, newSourceTopicPartition); } partitionConsumptionState.getOffsetRecord().setLeaderTopic(newSourceTopic); partitionConsumptionState.getOffsetRecord() @@ -895,7 +895,7 @@ protected void leaderExecuteTopicSwitch( LOGGER.info( "{} leader successfully switch feed topic from {} to {} offset {} partition {}", - consumerTaskId, + ingestionTaskName, currentLeaderTopic, newSourceTopic, upstreamStartOffset, @@ -978,17 +978,16 @@ protected Set getRealTimeDataSourceKafkaAddress(PartitionConsumptionStat } /** - * This method get the timestamp of the "last" message in topic/partition; notice that when the function - * returns, new messages can be appended to the partition already, so it's not guaranteed that this timestamp - * is from the last message. + * This method gets the timestamp when consumption thread is about to process the "last" message in topic/partition; + * note that when the function returns, new messages can be appended to the partition already, so it's not guaranteed + * that this timestamp is from the last message. + * + * See {@link PartitionConsumptionState#latestMessageConsumedTimestampInMs} for details. */ private long getLastConsumedMessageTimestamp(int partition) { - - /** - * Ingestion thread would update the last consumed message timestamp for the corresponding partition. - */ + // Consumption thread would update the last consumed message timestamp for the corresponding partition. 
PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateMap.get(partition); - return partitionConsumptionState.getLatestMessageConsumptionTimestampInMs(); + return partitionConsumptionState.getLatestMessageConsumedTimestampInMs(); } protected boolean shouldNewLeaderSwitchToRemoteConsumption(PartitionConsumptionState partitionConsumptionState) { @@ -1390,7 +1389,7 @@ private void updateOffsetsAsRemoteConsumeLeader( } } else { // Ideally this should never happen. - String msg = consumerTaskId + " UpdateOffset: Produced record should not be null in LEADER for: " + String msg = ingestionTaskName + " UpdateOffset: Produced record should not be null in LEADER for: " + consumerRecord.getTopicPartition(); if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) { LOGGER.warn(msg); @@ -1451,7 +1450,7 @@ protected void checkAndHandleUpstreamOffsetRewind( * otherwise, don't fail the push job, it's streaming ingestion now so it's serving online traffic already. */ String logMsg = String.format( - consumerTaskId + " partition %d received message with upstreamOffset: %d;" + ingestionTaskName + " partition %d received message with upstreamOffset: %d;" + " but recorded upstreamOffset is: %d. Received message producer GUID: %s; Recorded producer GUID: %s;" + " Received message producer host: %s; Recorded producer host: %s." + " Multiple leaders are producing. ", @@ -1512,7 +1511,7 @@ protected void checkAndHandleUpstreamOffsetRewind( break; } } catch (Exception e) { - LOGGER.warn("{} failed comparing the rewind message with the actual value in Venice", consumerTaskId, e); + LOGGER.warn("{} failed comparing the rewind message with the actual value in Venice", ingestionTaskName, e); } if (lossy) { @@ -1634,7 +1633,7 @@ protected long measureRTOffsetLagForMultiRegions( throw new VeniceException( String.format( "%s Multi colo RT offset lag calculation is not supported for non Active-Active stores", - consumerTaskId)); + ingestionTaskName)); } @Override @@ -1765,8 +1764,9 @@ protected boolean shouldProcessRecord(PubSubMessage= record.getOffset()) { - String message = consumerTaskId + " Current L/F state:" + partitionConsumptionState.getLeaderFollowerState() - + "; The record was already processed partition " + subPartition; + String message = + ingestionTaskName + " Current L/F state:" + partitionConsumptionState.getLeaderFollowerState() + + "; The record was already processed partition " + subPartition; if (!REDUNDANT_LOGGING_FILTER.isRedundantException(message)) { LOGGER.info("{}; LastKnown {}; Current {}", message, lastOffset, record.getOffset()); } @@ -1945,7 +1946,7 @@ protected void logLag( String lagLogFooter) { LOGGER.info( "{} [{} lag] partition {} is {}. {}Lag: [{}] {} Threshold [{}]. {}", - this.consumerTaskId, + this.ingestionTaskName, lagType.prettyString(), partition, (isLagAcceptable ? 
"not lagging" : "lagging"), @@ -2120,7 +2121,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( MessageType msgType = MessageType.valueOf(kafkaValue); if (msgType == MessageType.UPDATE && !produceToLocalKafka) { throw new VeniceMessageException( - consumerTaskId + " hasProducedToKafka: Received UPDATE message in non-leader for: " + ingestionTaskName + " hasProducedToKafka: Received UPDATE message in non-leader for: " + consumerRecord.getTopicPartition() + " Offset " + consumerRecord.getOffset()); } else if (msgType == MessageType.CONTROL_MESSAGE) { ControlMessage controlMessage = (ControlMessage) kafkaValue.payloadUnion; @@ -2180,7 +2181,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( * messages to disk, and potentially rewind a k/v pair to an old value. */ divErrorMetricCallback.accept(e); - LOGGER.debug("{} : Skipping a duplicate record at offset: {}", consumerTaskId, consumerRecord.getOffset()); + LOGGER.debug("{} : Skipping a duplicate record at offset: {}", ingestionTaskName, consumerRecord.getOffset()); return DelegateConsumerRecordResult.DUPLICATE_MESSAGE; } @@ -2366,7 +2367,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( if (!isSegmentControlMsg(controlMessageType)) { LOGGER.info( "{} hasProducedToKafka: {}; ControlMessage: {}; topicPartition: {}; offset: {}", - consumerTaskId, + ingestionTaskName, producedFinally, controlMessageType.name(), consumerRecord.getTopicPartition(), @@ -2374,7 +2375,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( } } else if (kafkaValue == null) { throw new VeniceMessageException( - consumerTaskId + " hasProducedToKafka: Given null Venice Message. TopicPartition: " + ingestionTaskName + " hasProducedToKafka: Given null Venice Message. TopicPartition: " + consumerRecord.getTopicPartition() + " Offset " + consumerRecord.getOffset()); } else { // This function may modify the original record in KME and it is unsafe to use the payload from KME directly @@ -2391,7 +2392,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord( return DelegateConsumerRecordResult.PRODUCED_TO_KAFKA; } catch (Exception e) { throw new VeniceException( - consumerTaskId + " hasProducedToKafka: exception for message received from: " + ingestionTaskName + " hasProducedToKafka: exception for message received from: " + consumerRecord.getTopicPartition() + ", Offset: " + consumerRecord.getOffset() + ". 
Bubbling up.", e); } @@ -2997,7 +2998,7 @@ protected void processMessageAndMaybeProduceToKafka( default: throw new VeniceMessageException( - consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType); + ingestionTaskName + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType); } } @@ -3298,7 +3299,7 @@ protected long measureRTOffsetLagForSingleRegion( if (shouldLog) { LOGGER.info( "{} partition {} RT lag offset for {} is: Latest RT offset [{}] - persisted offset [{}] = Lag [{}]", - consumerTaskId, + ingestionTaskName, pcs.getPartition(), sourceRealTimeTopicKafkaURL, lastOffsetInRealTimeTopic, diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java index b1dc9273f0..8a8f654108 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducerCallback.java @@ -212,7 +212,7 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) { boolean endOfPushReceived = partitionConsumptionState.isEndOfPushReceived(); LOGGER.error( "{} received exception in kafka callback thread; EOP received: {}, {}, Offset: {}", - ingestionTask.consumerTaskId, + ingestionTask.ingestionTaskName, endOfPushReceived, sourceConsumerRecord.getTopicPartition(), sourceConsumerRecord.getOffset(), diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java index 17f811f934..1856d44c93 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java @@ -88,7 +88,21 @@ public class PartitionConsumptionState { private CheckSum expectedSSTFileChecksum; - private long latestMessageConsumptionTimestampInMs; + /** + * The timestamp when the consumption thread is about to process the "latest" message in this topic/partition. + * Note that by the time this timestamp is read, newer messages may already have been appended to the partition, + * because the field is only updated once the "latest" message has been produced to Kafka or queued to the drainer, + * so it is not guaranteed that this timestamp is from the last message. + */ + private long latestMessageConsumedTimestampInMs; + + /** + * The timestamp when the consumption thread is about to process the "latest" message in this topic/partition. + * The difference between this field and {@link #latestMessageConsumedTimestampInMs} is that this field is updated + * whenever there's a new record for this topic/partition, while the other field is updated only when the record is + * fully processed. 
+ */ + private long latestPolledMessageTimestampInMs; /** * An in-memory state to record the ingestion start time of this partition; in O/O state model, the push timeout clock @@ -214,11 +228,13 @@ public PartitionConsumptionState(int partition, int amplificationFactor, OffsetR this.leaderFollowerState = LeaderFollowerStateType.STANDBY; this.expectedSSTFileChecksum = null; /** - * Initialize the latest consumption time with current time; otherwise, it's 0 by default + * Initialize the latest consumed time with current time; otherwise, it's 0 by default * and leader will be promoted immediately. */ - this.latestMessageConsumptionTimestampInMs = System.currentTimeMillis(); - this.consumptionStartTimeInMs = System.currentTimeMillis(); + long currentTimeInMs = System.currentTimeMillis(); + this.latestMessageConsumedTimestampInMs = currentTimeInMs; + this.latestPolledMessageTimestampInMs = currentTimeInMs; + this.consumptionStartTimeInMs = currentTimeInMs; // Restore previous status from offset record. for (CharSequence status: offsetRecord.getSubPartitionStatus().keySet()) { @@ -481,12 +497,20 @@ public byte[] getExpectedChecksum() { return this.expectedSSTFileChecksum == null ? null : this.expectedSSTFileChecksum.getCheckSum(); } - public long getLatestMessageConsumptionTimestampInMs() { - return latestMessageConsumptionTimestampInMs; + public long getLatestMessageConsumedTimestampInMs() { + return latestMessageConsumedTimestampInMs; + } + + public void setLatestMessageConsumedTimestampInMs(long consumedTimestampInMs) { + this.latestMessageConsumedTimestampInMs = consumedTimestampInMs; + } + + public long getLatestPolledMessageTimestampInMs() { + return latestPolledMessageTimestampInMs; } - public void setLatestMessageConsumptionTimestampInMs(long consumptionTimestampInMs) { - this.latestMessageConsumptionTimestampInMs = consumptionTimestampInMs; + public void setLatestPolledMessageTimestampInMs(long timestampInMs) { + this.latestPolledMessageTimestampInMs = timestampInMs; } public long getConsumptionStartTimeInMs() { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 73d9def7c5..060bacca5c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -179,7 +179,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final int versionNumber; protected final ReadOnlySchemaRepository schemaRepository; protected final ReadOnlyStoreRepository storeRepository; - protected final String consumerTaskId; + protected final String ingestionTaskName; protected final Properties kafkaProps; protected final AtomicBoolean isRunning; protected final AtomicBoolean emitMetrics; // TODO: remove this once we migrate to versioned stats @@ -250,12 +250,6 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final AggKafkaConsumerService aggKafkaConsumerService; - /** - * Please refer to {@link com.linkedin.venice.ConfigKeys#SERVER_DELAY_REPORT_READY_TO_SERVE_MS} to - * find more details. 
- */ - private final long startReportingReadyToServeTimestamp; - protected int writeComputeFailureCode = 0; private final InternalAvroSpecificSerializer partitionStateSerializer; @@ -366,7 +360,7 @@ public StoreIngestionTask( this.kafkaVersionTopic, KafkaDataIntegrityValidator.DISABLED, builder.getServerConfig().getDivProducerStateMaxAgeMs()); - this.consumerTaskId = String.format(CONSUMER_TASK_ID_FORMAT, kafkaVersionTopic); + this.ingestionTaskName = String.format(CONSUMER_TASK_ID_FORMAT, kafkaVersionTopic); this.topicManagerRepository = builder.getTopicManagerRepository(); this.cachedPubSubMetadataGetter = new CachedPubSubMetadataGetter(storeConfig.getTopicOffsetCheckIntervalMs()); @@ -395,7 +389,6 @@ public StoreIngestionTask( this.aggKafkaConsumerService = Objects.requireNonNull(builder.getAggKafkaConsumerService()); this.errorPartitionId = errorPartitionId; - this.startReportingReadyToServeTimestamp = builder.getStartReportingReadyToServeTimestamp(); this.isWriteComputationEnabled = store.isWriteComputationEnabled(); @@ -899,7 +892,7 @@ protected boolean isReadyToServe(PartitionConsumptionState partitionConsumptionS if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msgIdentifier)) { LOGGER.info( "{} [Time lag] Topic {} doesn't exist; ignoring time lag.", - consumerTaskId, + ingestionTaskName, lagMeasurementTopic); } } else { @@ -912,14 +905,14 @@ protected boolean isReadyToServe(PartitionConsumptionState partitionConsumptionS if (latestProducerTimestampInTopic < 0) { LOGGER.info( "{} [Time lag] Topic {} is empty or all messages have been truncated; ignoring time lag.", - consumerTaskId, + ingestionTaskName, lagMeasurementTopic); } else { LOGGER.info( "{} [Time lag] Producer timestamp of last message in topic {} " + "partition {}: {}, which is smaller or equal than the known latest producer time: {}. 
" + "Consumption lag is caught up already.", - consumerTaskId, + ingestionTaskName, lagMeasurementTopic, partitionId, latestProducerTimestampInTopic, @@ -1047,8 +1040,11 @@ protected void produceToStoreBufferServiceOrKafka( long currentTimeForMetricsMs = System.currentTimeMillis(); for (PubSubMessage record: records) { long beforeProcessingRecordTimestampNs = System.nanoTime(); + PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateMap.get(subPartition); + if (partitionConsumptionState != null) { + partitionConsumptionState.setLatestPolledMessageTimestampInMs(currentTimeForMetricsMs); + } if (!shouldProcessRecord(record, subPartition)) { - PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateMap.get(subPartition); if (partitionConsumptionState != null) { partitionConsumptionState.updateLatestIgnoredUpstreamRTOffset(kafkaUrl, record.getOffset()); } @@ -1103,13 +1099,13 @@ protected void produceToStoreBufferServiceOrKafka( break; default: throw new VeniceException( - consumerTaskId + " received unknown DelegateConsumerRecordResult enum for " + record.getTopicPartition()); + ingestionTaskName + " received unknown DelegateConsumerRecordResult enum for " + + record.getTopicPartition()); } totalBytesRead += record.getPayloadSize(); - // Update the latest message consumption time - PartitionConsumptionState partitionConsumptionState = partitionConsumptionStateMap.get(subPartition); + // Update the latest message consumed time if (partitionConsumptionState != null) { - partitionConsumptionState.setLatestMessageConsumptionTimestampInMs(currentTimeForMetricsMs); + partitionConsumptionState.setLatestMessageConsumedTimestampInMs(currentTimeForMetricsMs); } } @@ -1242,8 +1238,11 @@ protected void checkIngestionProgress(Store store) throws InterruptedException { * {@link IllegalStateException} with empty subscription. */ if (!consumerHasAnySubscription()) { + // TODO add the idle metrics here. + // TODO Maybe add some delays or whatsever. + // why the 100 idle count is 1 sec? 
readCycleDelayMs is 1 sec if (++idleCounter <= getMaxIdleCounter()) { - String message = consumerTaskId + " Not subscribed to any partitions "; + String message = ingestionTaskName + " Not subscribed to any partitions "; if (!REDUNDANT_LOGGING_FILTER.isRedundantException(message)) { LOGGER.info(message); } @@ -1252,7 +1251,8 @@ protected void checkIngestionProgress(Store store) throws InterruptedException { } else { if (!hybridStoreConfig.isPresent() && serverConfig.isUnsubscribeAfterBatchpushEnabled() && subscribedCount != 0 && subscribedCount == forceUnSubscribedCount) { - String msg = consumerTaskId + " Going back to sleep as consumption has finished and topics are unsubscribed"; + String msg = + ingestionTaskName + " Going back to sleep as consumption has finished and topics are unsubscribed"; if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) { LOGGER.info(msg); } @@ -1267,6 +1267,7 @@ protected void checkIngestionProgress(Store store) throws InterruptedException { idleCounter = 0; maybeUnsubscribeCompletedPartitions(store); recordQuotaMetrics(store); + recordMaxIdleTime(); /** * While using the shared consumer, we still need to check hybrid quota here since the actual disk usage could change @@ -1306,6 +1307,18 @@ private void maybeUnsubscribeCompletedPartitions(Store store) { } } + private void recordMaxIdleTime() { + if (emitMetrics.get()) { + long maxIdleTime = 0; + for (PartitionConsumptionState state: partitionConsumptionStateMap.values()) { + if (state != null) { + maxIdleTime = Math.max(maxIdleTime, System.currentTimeMillis() - state.getLatestPolledMessageTimestampInMs()); + } + } + versionedIngestionStats.recordMaxIdleTime(storeName, versionNumber, maxIdleTime); + } + } + private void recordQuotaMetrics(Store store) { if (emitMetrics.get()) { long currentQuota = store.getStorageQuotaInByte(); @@ -1330,7 +1343,7 @@ public void run() { try { // Update thread name to include topic to make it easy debugging Thread.currentThread().setName("venice-SIT-" + kafkaVersionTopic); - LOGGER.info("Running {}", consumerTaskId); + LOGGER.info("Running {}", ingestionTaskName); versionedIngestionStats.resetIngestionTaskPushTimeoutGauge(storeName, versionNumber); while (isRunning()) { @@ -1395,7 +1408,7 @@ public void run() { CompletableFuture.allOf(shutdownFutures.toArray(new CompletableFuture[0])).get(60, SECONDS); } } catch (VeniceIngestionTaskKilledException e) { - LOGGER.info("{} has been killed.", consumerTaskId); + LOGGER.info("{} has been killed.", ingestionTaskName); statusReportAdapter.reportKilled(partitionConsumptionStateMap.values(), e); doFlush = false; if (isCurrentVersion.getAsBoolean()) { @@ -1415,7 +1428,7 @@ public void run() { if (!isRunning()) { LOGGER.info( "{} encountered checksum verification failure, skipping error reporting because server is shutting down", - consumerTaskId, + ingestionTaskName, e); } else { handleIngestionException(e); @@ -1432,7 +1445,7 @@ public void run() { if (!isRunning() && ExceptionUtils.recursiveClassEquals(e, InterruptedException.class)) { // Known exceptions during graceful shutdown of storage server. Report error only if the server is still // running. 
- LOGGER.info("{} interrupted, skipping error reporting because server is shutting down", consumerTaskId, e); + LOGGER.info("{} interrupted, skipping error reporting because server is shutting down", ingestionTaskName, e); return; } handleIngestionException(e); @@ -1443,30 +1456,14 @@ public void run() { } } - private void recordStalePartitionsWithoutIngestionTask() { - /** - * Completed partitions will remain ONLINE without a backing ingestion task. If the partition belongs to a hybrid - * store then it will remain stale until the host is restarted. This is because both the auto reset task and Helix - * controller doesn't think there is a problem with the replica since it's COMPLETED and ONLINE. Stale replicas is - * better than dropping availability and that's why we do not put COMPLETED replicas to ERROR state immediately. - */ - partitionIngestionExceptionList.forEach(ep -> { - if (ep != null && ep.isReplicaCompleted()) { - versionedIngestionStats.recordStalePartitionsWithoutIngestionTask(storeName, versionNumber); - } - }); - } - private void handleIngestionException(Exception e) { - LOGGER.error("{} has failed.", consumerTaskId, e); - recordStalePartitionsWithoutIngestionTask(); + LOGGER.error("{} has failed.", ingestionTaskName, e); reportError(partitionConsumptionStateMap.values(), errorPartitionId, "Caught Exception during ingestion.", e); hostLevelIngestionStats.recordIngestionFailure(); } private void handleIngestionThrowable(Throwable t) { - LOGGER.error("{} has failed.", consumerTaskId, t); - recordStalePartitionsWithoutIngestionTask(); + LOGGER.error("{} has failed.", ingestionTaskName, t); reportError( partitionConsumptionStateMap.values(), errorPartitionId, @@ -1497,14 +1494,14 @@ private void internalClose(boolean doFlush) { if (opType == ConsumerActionType.RESET_OFFSET) { String topic = message.getTopic(); int partition = message.getPartition(); - LOGGER.info("{} Cleanup Reset OffSet : Topic {} Partition Id {}", consumerTaskId, topic, partition); + LOGGER.info("{} Cleanup Reset OffSet : Topic {} Partition Id {}", ingestionTaskName, topic, partition); storageMetadataService.clearOffset(topic, partition); } else { - LOGGER.info("{} Cleanup ignoring the Message {}", consumerTaskId, message); + LOGGER.info("{} Cleanup ignoring the Message {}", ingestionTaskName, message); } } } catch (Exception e) { - LOGGER.error("{} Error while resetting offset.", consumerTaskId, e); + LOGGER.error("{} Error while resetting offset.", ingestionTaskName, e); } // Unsubscribe any topic partitions related to this version topic from the shared consumer. 
aggKafkaConsumerService.unsubscribeAll(versionTopic); @@ -1513,7 +1510,7 @@ private void internalClose(boolean doFlush) { partitionConsumptionStateMap.values().parallelStream().forEach(PartitionConsumptionState::unsubscribe); partitionConsumptionStateMap.clear(); } catch (Exception e) { - LOGGER.error("{} Error while unsubscribing topic.", consumerTaskId, e); + LOGGER.error("{} Error while unsubscribing topic.", ingestionTaskName, e); } try { closeVeniceWriters(doFlush); @@ -1794,14 +1791,14 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws localKafkaServer); LOGGER.info( "{} subscribed to: {} Offset {}", - consumerTaskId, + ingestionTaskName, topicPartition, offsetRecord.getLocalVersionTopicOffset()); } storageUtilizationManager.initPartition(partition); break; case UNSUBSCRIBE: - LOGGER.info("{} Unsubscribing to: {}", consumerTaskId, topicPartition); + LOGGER.info("{} Unsubscribing to: {}", ingestionTaskName, topicPartition); PartitionConsumptionState consumptionState = partitionConsumptionStateMap.get(partition); forceUnSubscribedCount--; subscribedCount--; @@ -1848,11 +1845,11 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws partitionIngestionExceptionList.set(partition, null); LOGGER.info( "{} Unsubscribed to: {}, which has errored with exception", - consumerTaskId, + ingestionTaskName, topicPartition, partitionExceptionInfo.getException()); } else { - LOGGER.info("{} Unsubscribed to: {}", consumerTaskId, topicPartition); + LOGGER.info("{} Unsubscribed to: {}", ingestionTaskName, topicPartition); } // Only remove the partition from the maintained set if it's triggered by Helix. // Otherwise, keep it in the set since it is used for alerts; alerts should be sent out if unsubscription @@ -1892,12 +1889,12 @@ private void resetOffset(int partition, PubSubTopicPartition topicPartition, boo */ try { consumerResetOffset(topicPartition.getPubSubTopic(), partitionConsumptionState); - LOGGER.info("{} Reset OffSet : {}", consumerTaskId, topicPartition); + LOGGER.info("{} Reset OffSet : {}", ingestionTaskName, topicPartition); } catch (PubSubUnsubscribedTopicPartitionException e) { LOGGER.error( "{} Kafka consumer should have subscribed to the partition already but it fails " + "on resetting offset for: {}", - consumerTaskId, + ingestionTaskName, topicPartition); } partitionConsumptionStateMap.put( @@ -1914,7 +1911,7 @@ private void resetOffset(int partition, PubSubTopicPartition topicPartition, boo } else { LOGGER.info( "{} No need to reset offset by Kafka consumer, since the consumer is not subscribing: {}", - consumerTaskId, + ingestionTaskName, topicPartition); } kafkaDataIntegrityValidator.clearPartition(partition); @@ -2170,7 +2167,7 @@ public void processConsumerRecord( } } catch (VeniceMessageException | UnsupportedOperationException e) { throw new VeniceException( - consumerTaskId + " : Received an exception for message at partition: " + ingestionTaskName + " : Received an exception for message at partition: " + record.getTopicPartition().getPartitionNumber() + ", offset: " + record.getOffset() + ". Bubbling up.", e); } @@ -2508,7 +2505,7 @@ protected void processEndOfPush( if (partitionConsumptionState.getOffsetRecord().isEndOfPushReceived()) { LOGGER.warn( "{} Received duplicate EOP control message, ignoring it. 
Partition: {}, Offset: {}", - consumerTaskId, + ingestionTaskName, partition, offset); return; @@ -2620,7 +2617,7 @@ private boolean processControlMessage( if (!isSegmentControlMsg(type)) { LOGGER.info( "{} : Received {} control message. Partition: {}, Offset: {}", - consumerTaskId, + ingestionTaskName, type.name(), partition, offset); @@ -2777,7 +2774,7 @@ private int internalProcessConsumerRecord( } catch (DuplicateDataException e) { divErrorMetricCallback.accept(e); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("{} : Skipping a duplicate record at offset: {}", consumerTaskId, consumerRecord.getOffset()); + LOGGER.debug("{} : Skipping a duplicate record at offset: {}", ingestionTaskName, consumerRecord.getOffset()); } } catch (PersistenceFailureException ex) { if (partitionConsumptionStateMap.containsKey(consumerRecord.getTopicPartition().getPartitionNumber())) { @@ -2955,7 +2952,7 @@ private void writeToStorageEngine(int partition, byte[] keyBytes, Put put, long if (traceEnabled) { LOGGER.trace( "{} : Completed PUT to Store: {} in {} ns at {}", - consumerTaskId, + ingestionTaskName, kafkaVersionTopic, System.nanoTime() - putStartTimeNs, System.currentTimeMillis()); @@ -3015,6 +3012,7 @@ public boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionSt } public void consumerUnSubscribe(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState) { + // TODO may be remove them from here too Instant startTime = Instant.now(); int partitionId = partitionConsumptionState.getPartition(); aggKafkaConsumerService.unsubscribeConsumerFor(versionTopic, new PubSubTopicPartitionImpl(topic, partitionId)); @@ -3158,11 +3156,11 @@ private int processKafkaDataMessage( case UPDATE: throw new VeniceMessageException( - consumerTaskId + ": Not expecting UPDATE message from: " + consumerRecord.getTopicPartition() + ", Offset: " - + consumerRecord.getOffset()); + ingestionTaskName + ": Not expecting UPDATE message from: " + consumerRecord.getTopicPartition() + + ", Offset: " + consumerRecord.getOffset()); default: throw new VeniceMessageException( - consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType); + ingestionTaskName + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType); } /* @@ -3226,7 +3224,7 @@ private void waitReadyToProcessRecord(PubSubMessage partitionStateSerializer; private boolean isDaVinciClient; private RemoteIngestionRepairService remoteIngestionRepairService; @@ -268,14 +267,6 @@ public Builder setAggKafkaConsumerService(AggKafkaConsumerService aggKafkaConsum return set(() -> this.aggKafkaConsumerService = aggKafkaConsumerService); } - public long getStartReportingReadyToServeTimestamp() { - return startReportingReadyToServeTimestamp; - } - - public Builder setStartReportingReadyToServeTimestamp(long timestamp) { - return set(() -> this.startReportingReadyToServeTimestamp = timestamp); - } - public InternalAvroSpecificSerializer getPartitionStateSerializer() { return partitionStateSerializer; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/listener/response/AdminResponse.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/listener/response/AdminResponse.java index d3303f4328..1962e20cdb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/listener/response/AdminResponse.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/listener/response/AdminResponse.java @@ -57,7 +57,7 @@ public void 
addPartitionConsumptionState(PartitionConsumptionState state) { snapshot.isLatchReleased = state.isLatchReleased(); snapshot.processedRecordSizeSinceLastSync = state.getProcessedRecordSizeSinceLastSync(); snapshot.consumeRemotely = state.consumeRemotely(); - snapshot.latestMessageConsumptionTimestampInMs = state.getLatestMessageConsumptionTimestampInMs(); + snapshot.latestMessageConsumptionTimestampInMs = state.getLatestMessageConsumedTimestampInMs(); responseRecord.partitionConsumptionStates.add(snapshot); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggKafkaConsumerServiceStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggKafkaConsumerServiceStats.java index 02b881ab07..987f37849e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggKafkaConsumerServiceStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggKafkaConsumerServiceStats.java @@ -12,6 +12,9 @@ /** * This class is an aggregate place that keeps stats objects for multiple stores and total stats for each region for * AggKafkaConsumerService. + * + * For total stats for a given region, use this class to record stats. For store-level stats, delegate them to + * {@link KafkaConsumerServiceStats}. */ public class AggKafkaConsumerServiceStats extends AbstractVeniceAggStoreStats { public AggKafkaConsumerServiceStats( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java index 24b73f7399..244c89737a 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java @@ -123,10 +123,6 @@ public void resetIngestionTaskPushTimeoutGauge(String storeName, int version) { getStats(storeName, version).setIngestionTaskPushTimeoutGauge(0); } - public void recordStalePartitionsWithoutIngestionTask(String storeName, int version) { - recordVersionedAndTotalStat(storeName, version, IngestionStats::recordStalePartitionsWithoutIngestionTask); - } - public void recordSubscribePrepLatency(String storeName, int version, double value) { long currentTimeMs = System.currentTimeMillis(); recordVersionedAndTotalStat(storeName, version, stat -> stat.recordSubscribePrepLatency(value, currentTimeMs)); @@ -169,6 +165,10 @@ public void recordTransformerLatency(String storeName, int version, double value recordVersionedAndTotalStat(storeName, version, stat -> stat.recordTransformerLatency(value, timestamp)); } + public void recordMaxIdleTime(String storeName, int version, long idleTimeMs) { + getStats(storeName, version).recordIdleTime(idleTimeMs); + } + public void registerTransformerLatencySensor(String storeName, int version) { getStats(storeName, version).registerTransformerLatencySensor(); getTotalStats(storeName).registerTransformerLatencySensor(); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java index b95ab8c1be..5c17e4e522 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java @@ -59,6 +59,7 @@ public class IngestionStats { public static final String 
NEARLINE_LOCAL_BROKER_TO_READY_TO_SERVE_LATENCY = "nearline_local_broker_to_ready_to_serve_latency"; public static final String TRANSFORMER_LATENCY = "transformer_latency"; + public static final String IDLE_TIME = "idle_time"; private static final MetricConfig METRIC_CONFIG = new MetricConfig(); private StoreIngestionTask ingestionTask; @@ -66,7 +67,6 @@ public class IngestionStats { private final Int2ObjectMap regionIdToHybridBytesConsumedRateMap; private final Int2ObjectMap regionIdToHybridRecordsConsumedRateMap; private final Int2ObjectMap regionIdToHybridAvgConsumedOffsetMap; - private final Count stalePartitionsWithoutIngestionTaskCount; private final LongAdderRateGauge recordsConsumedSensor = new LongAdderRateGauge(); private final LongAdderRateGauge bytesConsumedSensor = new LongAdderRateGauge(); private final LongAdderRateGauge leaderRecordsConsumedSensor = new LongAdderRateGauge(); @@ -78,7 +78,6 @@ public class IngestionStats { private final Int2ObjectMap regionIdToHybridBytesConsumedSensorMap; private final Int2ObjectMap regionIdToHybridRecordsConsumedSensorMap; private final Int2ObjectMap regionIdToHybridAvgConsumedOffsetSensorMap; - private final Sensor stalePartitionsWithoutIngestionTaskSensor; private final WritePathLatencySensor subscribePrepLatencySensor; private final WritePathLatencySensor consumedRecordEndToEndProcessingLatencySensor; private final WritePathLatencySensor nearlineProducerToLocalBrokerLatencySensor; @@ -97,6 +96,9 @@ public class IngestionStats { private final Sensor versionTopicEndOffsetRewindSensor; private final MetricsRepository localMetricRepository; + // Measure the max idle time among partitions for a given the store on this host + private final LongAdderRateGauge idleTimeSensor = new LongAdderRateGauge(); + public IngestionStats(VeniceServerConfig serverConfig) { Int2ObjectMap kafkaClusterIdToAliasMap = serverConfig.getKafkaClusterIdToAliasMap(); @@ -150,14 +152,6 @@ public IngestionStats(VeniceServerConfig serverConfig) { registerSensor(localMetricRepository, LEADER_RECORDS_PRODUCED_METRIC_NAME, leaderRecordsProducedSensor); registerSensor(localMetricRepository, LEADER_BYTES_PRODUCED_METRIC_NAME, leaderBytesProducedSensor); - stalePartitionsWithoutIngestionTaskCount = new Count(); - stalePartitionsWithoutIngestionTaskSensor = - localMetricRepository.sensor(STALE_PARTITIONS_WITHOUT_INGESTION_TASK_METRIC_NAME); - stalePartitionsWithoutIngestionTaskSensor.add( - STALE_PARTITIONS_WITHOUT_INGESTION_TASK_METRIC_NAME - + stalePartitionsWithoutIngestionTaskCount.getClass().getSimpleName(), - stalePartitionsWithoutIngestionTaskCount); - versionTopicEndOffsetRewindSensor = localMetricRepository.sensor(VERSION_TOPIC_END_OFFSET_REWIND_COUNT); versionTopicEndOffsetRewindSensor.add(VERSION_TOPIC_END_OFFSET_REWIND_COUNT, versionTopicEndOffsetRewindCount); @@ -177,6 +171,7 @@ public IngestionStats(VeniceServerConfig serverConfig) { registerSensor(localMetricRepository, TIMESTAMP_REGRESSION_DCR_ERROR, timestampRegressionDCRErrorSensor); registerSensor(localMetricRepository, OFFSET_REGRESSION_DCR_ERROR, offsetRegressionDCRErrorSensor); registerSensor(localMetricRepository, TOMBSTONE_CREATION_DCR, tombstoneCreationDCRSensor); + registerSensor(localMetricRepository, IDLE_TIME, idleTimeSensor); } private void registerSensor(MetricsRepository localMetricRepository, String sensorName, LongAdderRateGauge gauge) { @@ -301,10 +296,6 @@ public double getReadyToServeWithRTLag() { return 0; } - public double getStalePartitionsWithoutIngestionTaskCount() { - return 
stalePartitionsWithoutIngestionTaskCount.measure(METRIC_CONFIG, System.currentTimeMillis()); - } - public double getSubscribePrepLatencyAvg() { return subscribePrepLatencySensor.getAvg(); } @@ -317,10 +308,6 @@ public void recordSubscribePrepLatency(double value, long currentTimeMs) { subscribePrepLatencySensor.record(value, currentTimeMs); } - public void recordStalePartitionsWithoutIngestionTask() { - stalePartitionsWithoutIngestionTaskSensor.record(); - } - public void recordVersionTopicEndOffsetRewind() { versionTopicEndOffsetRewindSensor.record(); } @@ -521,6 +508,14 @@ public void registerTransformerLatencySensor() { transformerLatencySensor = new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LATENCY); } + public void recordIdleTime(long value) { + idleTimeSensor.record(value); + } + + public double getIdleTime() { + return idleTimeSensor.getRate(); + } + public static double unAvailableToZero(double value) { /* When data is unavailable, return 0 instead of NaN or Infinity. Some metrics are initialized to -INF. This can cause problems when metrics are aggregated. Use only when zero makes semantic sense. diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStatsReporter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStatsReporter.java index 07827f7fac..0dea83070f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStatsReporter.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStatsReporter.java @@ -10,6 +10,7 @@ import static com.linkedin.davinci.stats.IngestionStats.FOLLOWER_RECORDS_CONSUMED_METRIC_NAME; import static com.linkedin.davinci.stats.IngestionStats.HYBRID_FOLLOWER_OFFSET_LAG; import static com.linkedin.davinci.stats.IngestionStats.HYBRID_LEADER_OFFSET_LAG; +import static com.linkedin.davinci.stats.IngestionStats.IDLE_TIME; import static com.linkedin.davinci.stats.IngestionStats.INGESTION_TASK_ERROR_GAUGE; import static com.linkedin.davinci.stats.IngestionStats.INGESTION_TASK_PUSH_TIMEOUT_GAUGE; import static com.linkedin.davinci.stats.IngestionStats.LEADER_BYTES_CONSUMED_METRIC_NAME; @@ -23,7 +24,6 @@ import static com.linkedin.davinci.stats.IngestionStats.OFFSET_REGRESSION_DCR_ERROR; import static com.linkedin.davinci.stats.IngestionStats.READY_TO_SERVE_WITH_RT_LAG_METRIC_NAME; import static com.linkedin.davinci.stats.IngestionStats.RECORDS_CONSUMED_METRIC_NAME; -import static com.linkedin.davinci.stats.IngestionStats.STALE_PARTITIONS_WITHOUT_INGESTION_TASK_METRIC_NAME; import static com.linkedin.davinci.stats.IngestionStats.SUBSCRIBE_ACTION_PREP_LATENCY; import static com.linkedin.davinci.stats.IngestionStats.TIMESTAMP_REGRESSION_DCR_ERROR; import static com.linkedin.davinci.stats.IngestionStats.TOMBSTONE_CREATION_DCR; @@ -120,9 +120,6 @@ protected void registerStats() { registerSensor( LEADER_BYTES_PRODUCED_METRIC_NAME, new IngestionStatsGauge(this, () -> getStats().getLeaderBytesProduced(), 0)); - registerSensor( - STALE_PARTITIONS_WITHOUT_INGESTION_TASK_METRIC_NAME, - new IngestionStatsGauge(this, () -> getStats().getStalePartitionsWithoutIngestionTaskCount(), 0)); registerSensor( SUBSCRIBE_ACTION_PREP_LATENCY + "_avg", new IngestionStatsGauge(this, () -> getStats().getSubscribePrepLatencyAvg(), 0)); @@ -135,6 +132,7 @@ protected void registerStats() { registerSensor( CONSUMED_RECORD_END_TO_END_PROCESSING_LATENCY + "_max", new IngestionStatsGauge(this, () -> 
getStats().getConsumedRecordEndToEndProcessingLatencyMax(), 0)); + registerSensor(IDLE_TIME + "_max", new IngestionStatsGauge(this, () -> getStats().getIdleTime())); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index b1121ccb9c..e44e1b42fd 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -409,7 +409,7 @@ public static Object[][] sortedInputAndAAConfigProvider() { createReplicationMetadataWithValueSchemaId(DELETE_KEY_FOO_TIMESTAMP, DELETE_KEY_FOO_OFFSET, EXISTING_SCHEMA_ID); private boolean databaseChecksumVerificationEnabled = false; - private AggKafkaConsumerServiceStats _kafkaConsumerServiceStats = mock(AggKafkaConsumerServiceStats.class); + private AggKafkaConsumerServiceStats kafkaConsumerServiceStats = mock(AggKafkaConsumerServiceStats.class); private PubSubConsumerAdapterFactory mockFactory = mock(PubSubConsumerAdapterFactory.class); private Supplier storeVersionStateSupplier = () -> new StoreVersionState(); @@ -549,7 +549,7 @@ public void methodSetUp() throws Exception { KafkaConsumerServiceStats regionStats = mock(KafkaConsumerServiceStats.class); doNothing().when(regionStats).recordByteSizePerPoll(anyDouble()); doNothing().when(regionStats).recordPollResultNum(anyInt()); - doReturn(regionStats).when(_kafkaConsumerServiceStats).getStoreStats(anyString()); + doReturn(regionStats).when(kafkaConsumerServiceStats).getStoreStats(anyString()); } private VeniceWriter getVeniceWriter(String topic, PubSubProducerAdapter producerAdapter, int amplificationFactor) { @@ -941,7 +941,7 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder( isLiveConfigEnabled, pubSubDeserializer, SystemTime.INSTANCE, - _kafkaConsumerServiceStats, + kafkaConsumerServiceStats, false, mock(ReadOnlyStoreRepository.class), false); @@ -964,7 +964,7 @@ private StoreIngestionTaskFactory.Builder getIngestionTaskFactoryBuilder( isLiveConfigEnabled, pubSubDeserializer, SystemTime.INSTANCE, - _kafkaConsumerServiceStats, + kafkaConsumerServiceStats, false, mock(ReadOnlyStoreRepository.class), false); @@ -1740,7 +1740,7 @@ public void testBadMessageTypesFailFast(AAConfig aaConfig) throws Exception { localVeniceWriter.put(putKeyBar, putValue, SCHEMA_ID); runTest(Utils.setOf(PARTITION_FOO, PARTITION_BAR), () -> { - verify(_kafkaConsumerServiceStats, timeout(TEST_TIMEOUT_MS).atLeastOnce()).recordTotalPollError(); + verify(kafkaConsumerServiceStats, timeout(TEST_TIMEOUT_MS).atLeastOnce()).recordTotalPollError(); }, aaConfig); } diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java index 879f0307ce..a192236341 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/AbstractVeniceStats.java @@ -138,6 +138,15 @@ protected Sensor registerSensorWithAggregate( } } + /** + * Register sensor for both total and per store stats. + * When per-store stats is recorded, the total stats would be recorded as well. 
* @param sensorName name of the sensor to register + * @param totalStats the total (cross-store) stats object that per-store recordings should also roll up into + * @param totalSensor the corresponding sensor on the total stats, used as the parent of the per-store sensor + * @param stats the stats to attach to the registered sensor + * @return the registered per-store sensor + */ protected Sensor registerPerStoreAndTotalSensor( String sensorName, AbstractVeniceStats totalStats, @@ -147,6 +156,14 @@ protected Sensor registerPerStoreAndTotalSensor( return registerSensor(sensorName, parent, stats); } + /** + * Only register the sensor for total stats; if no total stats object is provided, create and register a new rate gauge. + * @param sensorName name of the sensor to register + * @param totalStats the total (cross-store) stats object, may be absent + * @param totalSensor the rate gauge already registered on the total stats + * @param time time source used when a new rate gauge has to be created + * @return the rate gauge that callers should record into + */ protected LongAdderRateGauge registerOnlyTotalRate( String sensorName, AbstractVeniceStats totalStats, diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 81f3284a75..6a249614d5 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -620,14 +620,6 @@ private ConfigKeys() { */ public static final String SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED = "server.leaked.resource.cleanup.enabled"; - /** - * The delay serving of the newly started storage node. - * The reason to have this config is that we noticed a high GC pause for some time because of connection warming or initializing the - * internal components. - * We will need to do some experiment to find the right value. - */ - public static final String SERVER_DELAY_REPORT_READY_TO_SERVE_MS = "server.delay.report.ready.to.serve.ms"; - /** * Ingestion mode in target storage instance. * This will be applied to Da Vinci and Storage Node.
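Note on the new idle-time metric: a minimal sketch of how the pieces introduced above are meant to fit together, not part of the diff itself. The names taken from the diff (setLatestPolledMessageTimestampInMs, recordMaxIdleTime) are real; the surrounding scaffolding (PartitionState, IdleTimeSketch) is simplified stand-in code for illustration only.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for PartitionConsumptionState: only the "last polled" timestamp matters here.
class PartitionState {
  private volatile long latestPolledMessageTimestampInMs = System.currentTimeMillis();

  void setLatestPolledMessageTimestampInMs(long ts) {
    this.latestPolledMessageTimestampInMs = ts;
  }

  long getLatestPolledMessageTimestampInMs() {
    return latestPolledMessageTimestampInMs;
  }
}

public class IdleTimeSketch {
  private final Map<Integer, PartitionState> partitionStates = new ConcurrentHashMap<>();

  // Poll loop: every record polled for a partition refreshes that partition's "last polled" timestamp,
  // mirroring the new setLatestPolledMessageTimestampInMs() call in produceToStoreBufferServiceOrKafka().
  void onRecordPolled(int partition, long nowMs) {
    PartitionState state = partitionStates.get(partition);
    if (state != null) {
      state.setLatestPolledMessageTimestampInMs(nowMs);
    }
  }

  // Periodic check (recordMaxIdleTime() in the diff): the partition that has gone longest without a polled
  // record defines the store-version's max idle time, which is then recorded as the idle_time metric.
  long computeMaxIdleTimeMs() {
    long nowMs = System.currentTimeMillis();
    long maxIdleTime = 0;
    for (PartitionState state: partitionStates.values()) {
      maxIdleTime = Math.max(maxIdleTime, nowMs - state.getLatestPolledMessageTimestampInMs());
    }
    return maxIdleTime;
  }
}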
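Similarly, a brief sketch (not part of the diff) of the throttled-logging pattern AggKafkaConsumerService now applies to the stuck-consumer warning. RedundantExceptionFilter.getRedundantExceptionFilter() and isRedundantException() are the calls shown in the diff; the wrapper class below is illustrative.

import com.linkedin.venice.utils.RedundantExceptionFilter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ThrottledWarningSketch {
  private static final Logger LOGGER = LogManager.getLogger(ThrottledWarningSketch.class);
  // Shared filter: isRedundantException() returns true when the same message was seen recently,
  // so a repeated warning is emitted at most once per filter window instead of on every loop iteration.
  private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER =
      RedundantExceptionFilter.getRedundantExceptionFilter();

  void warnOnce(String message) {
    if (!REDUNDANT_LOGGING_FILTER.isRedundantException(message)) {
      LOGGER.warn(message);
    }
  }
}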