[da-vinci][server] Tweaked the offset lag threshold logic
Previously, the store-level 'offsetLagThresholdToGoOnline' config was
being applied at the partition level, which has caused a lot of confusion
since users expect it to apply at the store level.
This change divides the configured value by the partition count, while
keeping the minimum lag threshold per partition at 1.

This change also removes some deprecated logic for incremental push to VT.
gaojieliu committed Feb 2, 2024
1 parent 4f11a4a commit db27699
Showing 3 changed files with 207 additions and 137 deletions.
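
For context, here is a minimal, self-contained sketch of the per-partition threshold arithmetic described in the commit message. The class and method names are illustrative only; the actual change adds a getOffsetToOnlineLagThresholdPerPartition helper, shown in the diff below.

public final class OffsetLagThresholdExample {
  /**
   * Derives a per-partition offset lag threshold from the store-level config value.
   * A negative store-level threshold means the offset lag criterion is disabled
   * (only the producer time lag criterion applies), so it is returned unchanged.
   */
  static long perPartitionLagThreshold(long storeLevelThreshold, int partitionCount) {
    if (storeLevelThreshold < 0) {
      return storeLevelThreshold; // offset lag check disabled; time lag is the only criterion
    }
    // Split the store-level budget evenly across partitions, but never go below 1.
    return Math.max(storeLevelThreshold / partitionCount, 1);
  }

  public static void main(String[] args) {
    System.out.println(perPartitionLagThreshold(1000, 16)); // 62: 1000 / 16, integer division
    System.out.println(perPartitionLagThreshold(10, 64));   // 1: clamped to the per-partition minimum
    System.out.println(perPartitionLagThreshold(-1, 16));   // -1: offset lag criterion stays disabled
  }
}

With this in place, a threshold that previously gated every partition at the full store-level value now gates each partition at roughly value / partitionCount, which matches what users expect from a store-level config.
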
@@ -807,145 +807,140 @@ protected boolean isReadyToServe(PartitionConsumptionState partitionConsumptionS
int partitionId = partitionConsumptionState.getPartition();
boolean isLagAcceptable = false;

if (!hybridStoreConfig.isPresent()) {
long lag = measureLagWithCallToPubSub(
localKafkaServer,
versionTopic,
partitionId,
partitionConsumptionState.getLatestProcessedLocalVersionTopicOffset());
isLagAcceptable = lag <= 0;
} else {
try {
// Looks like none of the short-circuitry fired, so we need to measure lag!
long offsetThreshold = hybridStoreConfig.get().getOffsetLagThresholdToGoOnline();
long producerTimeLagThresholdInSeconds =
hybridStoreConfig.get().getProducerTimestampLagThresholdToGoOnlineInSeconds();
String msg = msgForLagMeasurement[partitionId];

// Log only once a minute per partition.
boolean shouldLogLag = !REDUNDANT_LOGGING_FILTER.isRedundantException(msg);
/**
* If offset lag threshold is set to -1, time lag threshold will be the only criterion for going online.
*/
if (offsetThreshold >= 0) {
isLagAcceptable = checkAndLogIfLagIsAcceptableForHybridStore(
partitionConsumptionState,
measureHybridOffsetLag(partitionConsumptionState, shouldLogLag),
offsetThreshold,
shouldLogLag,
OFFSET_LAG,
0);
}
try {
// Looks like none of the short-circuitry fired, so we need to measure lag!
long offsetThreshold = getOffsetToOnlineLagThresholdPerPartition(
hybridStoreConfig,
storeRepository,
storeName,
subPartitionCount);
long producerTimeLagThresholdInSeconds =
hybridStoreConfig.get().getProducerTimestampLagThresholdToGoOnlineInSeconds();
String msg = msgForLagMeasurement[partitionId];

// Log only once a minute per partition.
boolean shouldLogLag = !REDUNDANT_LOGGING_FILTER.isRedundantException(msg);
/**
* If offset lag threshold is set to -1, time lag threshold will be the only criterion for going online.
*/
if (offsetThreshold >= 0) {
isLagAcceptable = checkAndLogIfLagIsAcceptableForHybridStore(
partitionConsumptionState,
measureHybridOffsetLag(partitionConsumptionState, shouldLogLag),
offsetThreshold,
shouldLogLag,
OFFSET_LAG,
0);
}

/**
* If the hybrid producer time lag threshold is positive, check the difference between current time and latest
* producer timestamp; ready-to-serve will not be reported until the diff is smaller than the defined time lag threshold.
*
* If timestamp lag threshold is set to -1, offset lag threshold will be the only criterion for going online.
*/
if (producerTimeLagThresholdInSeconds > 0) {
long producerTimeLagThresholdInMS = TimeUnit.SECONDS.toMillis(producerTimeLagThresholdInSeconds);
long latestConsumedProducerTimestamp =
partitionConsumptionState.getOffsetRecord().getLatestProducerProcessingTimeInMs();
if (amplificationFactor != 1) {
latestConsumedProducerTimestamp = getLatestConsumedProducerTimestampWithSubPartition(
latestConsumedProducerTimestamp,
partitionConsumptionState);
}
boolean timestampLagIsAcceptable = checkAndLogIfLagIsAcceptableForHybridStore(
partitionConsumptionState,
LatencyUtils.getElapsedTimeInMs(latestConsumedProducerTimestamp),
producerTimeLagThresholdInMS,
shouldLogLag,
TIME_LAG,
latestConsumedProducerTimestamp);
/**
* If the hybrid producer time lag threshold is positive, check the difference between current time and latest
* producer timestamp; ready-to-serve will not be reported until the diff is smaller than the defined time lag threshold.
* If time lag is not acceptable but the producer timestamp of the last message of RT is smaller or equal than
* the known latest producer timestamp in server, it means ingestion task has reached the end of RT, so it's
* safe to ignore the time lag.
*
* If timestamp lag threshold is set to -1, offset lag threshold will be the only criterion for going online.
* Notice that if EOP is not received, this function will be short circuit before reaching here, so there is
* no risk of meeting the time lag earlier than expected.
*/
if (producerTimeLagThresholdInSeconds > 0) {
long producerTimeLagThresholdInMS = TimeUnit.SECONDS.toMillis(producerTimeLagThresholdInSeconds);
long latestConsumedProducerTimestamp =
partitionConsumptionState.getOffsetRecord().getLatestProducerProcessingTimeInMs();
if (amplificationFactor != 1) {
latestConsumedProducerTimestamp = getLatestConsumedProducerTimestampWithSubPartition(
latestConsumedProducerTimestamp,
partitionConsumptionState);
if (!timestampLagIsAcceptable) {
String msgIdentifier = msg + "_ignore_time_lag";
String realTimeTopicKafkaURL;
Set<String> realTimeTopicKafkaURLs = getRealTimeDataSourceKafkaAddress(partitionConsumptionState);
if (realTimeTopicKafkaURLs.isEmpty()) {
throw new VeniceException("Expect a real-time topic Kafka URL for store " + storeName);
} else if (realTimeTopicKafkaURLs.size() == 1) {
realTimeTopicKafkaURL = realTimeTopicKafkaURLs.iterator().next();
} else if (realTimeTopicKafkaURLs.contains(localKafkaServer)) {
realTimeTopicKafkaURL = localKafkaServer;
} else {
throw new VeniceException(
String.format(
"Expect source RT Kafka URLs contains local Kafka URL. Got local "
+ "Kafka URL %s and RT source Kafka URLs %s",
localKafkaServer,
realTimeTopicKafkaURLs));
}
boolean timestampLagIsAcceptable = checkAndLogIfLagIsAcceptableForHybridStore(
partitionConsumptionState,
LatencyUtils.getElapsedTimeInMs(latestConsumedProducerTimestamp),
producerTimeLagThresholdInMS,
shouldLogLag,
TIME_LAG,
latestConsumedProducerTimestamp);
/**
* If time lag is not acceptable but the producer timestamp of the last message of RT is smaller or equal than
* the known latest producer timestamp in server, it means ingestion task has reached the end of RT, so it's
* safe to ignore the time lag.
*
* Notice that if EOP is not received, this function will be short circuit before reaching here, so there is
* no risk of meeting the time lag earlier than expected.
*/
if (!timestampLagIsAcceptable) {
String msgIdentifier = msg + "_ignore_time_lag";
String realTimeTopicKafkaURL;
Set<String> realTimeTopicKafkaURLs = getRealTimeDataSourceKafkaAddress(partitionConsumptionState);
if (realTimeTopicKafkaURLs.isEmpty()) {
throw new VeniceException("Expect a real-time topic Kafka URL for store " + storeName);
} else if (realTimeTopicKafkaURLs.size() == 1) {
realTimeTopicKafkaURL = realTimeTopicKafkaURLs.iterator().next();
} else if (realTimeTopicKafkaURLs.contains(localKafkaServer)) {
realTimeTopicKafkaURL = localKafkaServer;
} else {
throw new VeniceException(
String.format(
"Expect source RT Kafka URLs contains local Kafka URL. Got local "
+ "Kafka URL %s and RT source Kafka URLs %s",
localKafkaServer,
realTimeTopicKafkaURLs));
}

final PubSubTopic lagMeasurementTopic = pubSubTopicRepository.getTopic(realTimeTopic.getName());
final PubSubTopicPartition pubSubTopicPartition =
new PubSubTopicPartitionImpl(lagMeasurementTopic, partitionId);

// DaVinci and STANDBY checks the local consumption and leaderCompleteState status
final String lagMeasurementKafkaUrl =
(isHybridFollower(partitionConsumptionState)) ? localKafkaServer : realTimeTopicKafkaURL;

if (!cachedPubSubMetadataGetter.containsTopic(getTopicManager(lagMeasurementKafkaUrl), realTimeTopic)) {
final PubSubTopic lagMeasurementTopic = pubSubTopicRepository.getTopic(realTimeTopic.getName());
final PubSubTopicPartition pubSubTopicPartition =
new PubSubTopicPartitionImpl(lagMeasurementTopic, partitionId);

// DaVinci and STANDBY checks the local consumption and leaderCompleteState status
final String lagMeasurementKafkaUrl =
(isHybridFollower(partitionConsumptionState)) ? localKafkaServer : realTimeTopicKafkaURL;

if (!cachedPubSubMetadataGetter.containsTopic(getTopicManager(lagMeasurementKafkaUrl), realTimeTopic)) {
timestampLagIsAcceptable = true;
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msgIdentifier)) {
LOGGER.info(
"{} [Time lag] Topic {} doesn't exist; ignoring time lag.",
consumerTaskId,
lagMeasurementTopic);
}
} else {
long latestProducerTimestampInTopic = cachedPubSubMetadataGetter
.getProducerTimestampOfLastDataMessage(getTopicManager(lagMeasurementKafkaUrl), pubSubTopicPartition);
if (latestProducerTimestampInTopic < 0
|| latestProducerTimestampInTopic <= latestConsumedProducerTimestamp) {
timestampLagIsAcceptable = true;
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msgIdentifier)) {
LOGGER.info(
"{} [Time lag] Topic {} doesn't exist; ignoring time lag.",
consumerTaskId,
lagMeasurementTopic);
}
} else {
long latestProducerTimestampInTopic = cachedPubSubMetadataGetter
.getProducerTimestampOfLastDataMessage(getTopicManager(lagMeasurementKafkaUrl), pubSubTopicPartition);
if (latestProducerTimestampInTopic < 0
|| latestProducerTimestampInTopic <= latestConsumedProducerTimestamp) {
timestampLagIsAcceptable = true;
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msgIdentifier)) {
if (latestProducerTimestampInTopic < 0) {
LOGGER.info(
"{} [Time lag] Topic {} is empty or all messages have been truncated; ignoring time lag.",
consumerTaskId,
lagMeasurementTopic);
} else {
LOGGER.info(
"{} [Time lag] Producer timestamp of last message in topic {} "
+ "partition {}: {}, which is smaller or equal than the known latest producer time: {}. "
+ "Consumption lag is caught up already.",
consumerTaskId,
lagMeasurementTopic,
partitionId,
latestProducerTimestampInTopic,
latestConsumedProducerTimestamp);
}
if (latestProducerTimestampInTopic < 0) {
LOGGER.info(
"{} [Time lag] Topic {} is empty or all messages have been truncated; ignoring time lag.",
consumerTaskId,
lagMeasurementTopic);
} else {
LOGGER.info(
"{} [Time lag] Producer timestamp of last message in topic {} "
+ "partition {}: {}, which is smaller or equal than the known latest producer time: {}. "
+ "Consumption lag is caught up already.",
consumerTaskId,
lagMeasurementTopic,
partitionId,
latestProducerTimestampInTopic,
latestConsumedProducerTimestamp);
}
}
}
}
if (offsetThreshold >= 0) {
/**
* If both threshold configs are on, both offset lag and time lag must be within thresholds before online.
*/
isLagAcceptable &= timestampLagIsAcceptable;
} else {
isLagAcceptable = timestampLagIsAcceptable;
}
}
} catch (Exception e) {
String exceptionMsgIdentifier =
new StringBuilder().append(kafkaVersionTopic).append("_isReadyToServe").toString();
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(exceptionMsgIdentifier)) {
LOGGER.info("Exception when trying to determine if hybrid store is ready to serve: {}", storeName, e);
if (offsetThreshold >= 0) {
/**
* If both threshold configs are on, both offset lag and time lag must be within thresholds before online.
*/
isLagAcceptable &= timestampLagIsAcceptable;
} else {
isLagAcceptable = timestampLagIsAcceptable;
}
isLagAcceptable = false;
}
} catch (Exception e) {
String exceptionMsgIdentifier =
new StringBuilder().append(kafkaVersionTopic).append("_isReadyToServe").toString();
if (!REDUNDANT_LOGGING_FILTER.isRedundantException(exceptionMsgIdentifier)) {
LOGGER.info("Exception when trying to determine if hybrid store is ready to serve: {}", storeName, e);
}
isLagAcceptable = false;
}

if (isLagAcceptable) {
@@ -1156,7 +1151,7 @@ private void processIngestionException() {
/**
* Since the partition is already unsubscribed, we will clear the exception to avoid excessive logging, and in theory,
* this shouldn't happen since {@link #processCommonConsumerAction} will clear the exception list during un-subscribing.
*/
*/
partitionIngestionExceptionList.set(exceptionPartition, null);
} else {
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, exceptionPartition);
@@ -1165,7 +1160,7 @@
*/
if (partitionException instanceof MemoryLimitExhaustedException
|| partitionException.getCause() instanceof MemoryLimitExhaustedException
&& isCurrentVersion.getAsBoolean()) {
&& isCurrentVersion.getAsBoolean()) {
LOGGER.warn(
"Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and"
+ " resume the consumption after killing ingestion tasks for non current versions");
@@ -1625,6 +1620,25 @@ protected TopicSwitch resolveSourceKafkaServersWithinTopicSwitch(TopicSwitch ori
return originalTopicSwitch;
}

protected static long getOffsetToOnlineLagThresholdPerPartition(
Optional<HybridStoreConfig> hybridStoreConfig,
ReadOnlyStoreRepository storeRepository,
String storeName,
int subPartitionCount) {
if (!hybridStoreConfig.isPresent()) {
throw new VeniceException("This is not a hybrid store: " + storeName);
}
long lagThreshold = hybridStoreConfig.get().getOffsetLagThresholdToGoOnline();
if (lagThreshold < 0) {
/**
* Offset lag threshold is disabled, use time lag.
*/
return lagThreshold;
}

return Math.max(lagThreshold / subPartitionCount, 1);
}

private void checkConsumptionStateWhenStart(
OffsetRecord offsetRecord,
PartitionConsumptionState newPartitionConsumptionState) {
@@ -1694,7 +1708,11 @@ private void checkConsumptionStateWhenStart(
long offsetLagDeltaRelaxFactor = serverConfig.getOffsetLagDeltaRelaxFactorForFastOnlineTransitionInRestart();
long previousOffsetLag = newPartitionConsumptionState.getOffsetRecord().getOffsetLag();
if (hybridStoreConfig.isPresent() && newPartitionConsumptionState.isEndOfPushReceived()) {
long offsetLagThreshold = hybridStoreConfig.get().getOffsetLagThresholdToGoOnline();
long offsetLagThreshold = getOffsetToOnlineLagThresholdPerPartition(
hybridStoreConfig,
storeRepository,
storeName,
subPartitionCount);
// Only enable this feature with positive offset lag delta relax factor and offset lag threshold.
if (offsetLagDeltaRelaxEnabled && offsetLagThreshold > 0) {
long offsetLag = measureHybridOffsetLag(newPartitionConsumptionState, true);
@@ -2862,8 +2880,8 @@ protected void validateMessage(

return (isDataRecovery && isHybridMode() && partitionConsumptionState.getTopicSwitch() == null)
|| (topicManager.isTopicCompactionEnabled(pubSubTopic)
&& LatencyUtils.getElapsedTimeInMs(consumerRecord.getPubSubMessageTime()) >= topicManager
.getTopicMinLogCompactionLagMs(pubSubTopic));
&& LatencyUtils.getElapsedTimeInMs(consumerRecord.getPubSubMessageTime()) >= topicManager
.getTopicMinLogCompactionLagMs(pubSubTopic));
});

try {
@@ -3845,4 +3863,4 @@ public boolean isProducingVersionTopicHealthy() {
}
return true;
}
}
}