Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Increased Wait After Unsubscribe During State Transitions #1213

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_STUCK_CONSUMER_REPAIR_THRESHOLD_SECOND;
import static com.linkedin.venice.ConfigKeys.SERVER_SYSTEM_STORE_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH;
import static com.linkedin.venice.ConfigKeys.SERVER_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.SEVER_CALCULATE_QUOTA_USAGE_BASED_ON_PARTITIONS_ASSIGNMENT_ENABLED;
import static com.linkedin.venice.ConfigKeys.SORTED_INPUT_DRAINER_SIZE;
import static com.linkedin.venice.ConfigKeys.STORE_WRITER_BUFFER_AFTER_LEADER_LOGIC_ENABLED;
Expand Down Expand Up @@ -555,6 +556,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final int aaWCWorkloadParallelProcessingThreadPoolSize;
private final boolean isGlobalRtDivEnabled;
private final boolean nearlineWorkloadProducerThroughputOptimizationEnabled;
private final long pubSubConsumerWaitAfterUnsubscribeTimeoutMs;

public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
this(serverProperties, Collections.emptyMap());
Expand Down Expand Up @@ -929,6 +931,9 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getInt(SERVER_AA_WC_WORKLOAD_PARALLEL_PROCESSING_THREAD_POOL_SIZE, 8);
nearlineWorkloadProducerThroughputOptimizationEnabled =
serverProperties.getBoolean(SERVER_NEARLINE_WORKLOAD_PRODUCER_THROUGHPUT_OPTIMIZATION_ENABLED, true);
pubSubConsumerWaitAfterUnsubscribeTimeoutMs = serverProperties.getLong(
SERVER_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS,
KafkaConsumerService.DEFAULT_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS);
}

long extractIngestionMemoryLimit(
Expand Down Expand Up @@ -1179,6 +1184,10 @@ public int getPubSubConsumerPollRetryBackoffMs() {
return pubSubConsumerPollRetryBackoffMs;
}

/**
 * Returns the maximum time, in milliseconds, that a shared consumer should wait for the next poll()
 * after unsubscribing from a topic partition. Configured via SERVER_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS,
 * defaulting to KafkaConsumerService.DEFAULT_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS.
 */
public long getPubSubConsumerWaitAfterUnsubscribeTimeoutMs() {
return pubSubConsumerWaitAfterUnsubscribeTimeoutMs;
}

/**
 * Returns the disk health check interval in milliseconds.
 * NOTE(review): the backing field's initialization is outside this view; the unit is taken from the name — confirm.
 */
public long getDiskHealthCheckIntervalInMS() {
return diskHealthCheckIntervalInMS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ public synchronized AbstractKafkaConsumerService createKafkaConsumerService(fina
metricsRepository,
kafkaClusterUrlToAliasMap.getOrDefault(url, url) + poolType.getStatSuffix(),
sharedConsumerNonExistingTopicCleanupDelayMS,
serverConfig.getPubSubConsumerWaitAfterUnsubscribeTimeoutMs(),
topicExistenceChecker,
liveConfigBasedKafkaThrottlingEnabled,
pubSubDeserializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@
* @see AggKafkaConsumerService which wraps one instance of this class per Kafka cluster.
*/
public abstract class KafkaConsumerService extends AbstractKafkaConsumerService {
/**
 * Maximum time to wait for the next poll() after unsubscribing; a subsequent poll() indicates that all
 * previously in-flight messages for the unsubscribed partition(s) have been processed.
 */
public static final long DEFAULT_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(30);
/**
* A shorter timeout wait during shutdown / termination to avoid blocking the shutdown process
*/
public static final long SHUTDOWN_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);

protected final String kafkaUrl;
protected final String kafkaUrlForLogger;
protected final ConsumerPoolType poolType;
Expand Down Expand Up @@ -97,6 +106,7 @@ protected KafkaConsumerService(
final MetricsRepository metricsRepository,
final String kafkaClusterAlias,
final long sharedConsumerNonExistingTopicCleanupDelayMS,
final long consumerWaitAfterUnsubscribeTimeoutMs,
final TopicExistenceChecker topicExistenceChecker,
final boolean liveConfigBasedKafkaThrottlingEnabled,
final PubSubMessageDeserializer pubSubDeserializer,
Expand Down Expand Up @@ -135,6 +145,7 @@ protected KafkaConsumerService(
pubSubDeserializer,
null),
aggStats,
consumerWaitAfterUnsubscribeTimeoutMs,
this::recordPartitionsPerConsumerSensor,
this::handleUnsubscription);

Expand Down Expand Up @@ -230,7 +241,7 @@ public void unsubscribeAll(PubSubTopic versionTopic) {
* setting data receiver and unsubscribing concurrently for the same topic partition on a shared consumer.
*/
synchronized (sharedConsumer) {
sharedConsumer.unSubscribe(topicPartition);
sharedConsumer.unSubscribe(topicPartition, SHUTDOWN_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS);
removeTopicPartitionFromConsumptionTask(sharedConsumer, topicPartition);
}
});
Expand Down Expand Up @@ -447,6 +458,7 @@ KafkaConsumerService construct(
MetricsRepository metricsRepository,
String kafkaClusterAlias,
long sharedConsumerNonExistingTopicCleanupDelayMS,
long consumerWaitAfterUnsubscribeTimeoutMs,
TopicExistenceChecker topicExistenceChecker,
boolean liveConfigBasedKafkaThrottlingEnabled,
PubSubMessageDeserializer pubSubDeserializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class PartitionWiseKafkaConsumerService extends KafkaConsumerService {
final MetricsRepository metricsRepository,
final String kafkaClusterAlias,
final long sharedConsumerNonExistingTopicCleanupDelayMS,
final long consumerWaitAfterUnsubscribeTimeoutMs,
final TopicExistenceChecker topicExistenceChecker,
final boolean liveConfigBasedKafkaThrottlingEnabled,
final PubSubMessageDeserializer pubSubDeserializer,
Expand All @@ -69,6 +70,7 @@ public class PartitionWiseKafkaConsumerService extends KafkaConsumerService {
metricsRepository,
kafkaClusterAlias,
sharedConsumerNonExistingTopicCleanupDelayMS,
consumerWaitAfterUnsubscribeTimeoutMs,
topicExistenceChecker,
liveConfigBasedKafkaThrottlingEnabled,
pubSubDeserializer,
Expand All @@ -91,6 +93,7 @@ public class PartitionWiseKafkaConsumerService extends KafkaConsumerService {
final MetricsRepository metricsRepository,
final String kafkaClusterAlias,
final long sharedConsumerNonExistingTopicCleanupDelayMS,
final long consumerWaitAfterUnsubscribeTimeoutMs,
final TopicExistenceChecker topicExistenceChecker,
final boolean liveConfigBasedKafkaThrottlingEnabled,
final PubSubMessageDeserializer pubSubDeserializer,
Expand All @@ -111,6 +114,7 @@ public class PartitionWiseKafkaConsumerService extends KafkaConsumerService {
metricsRepository,
kafkaClusterAlias,
sharedConsumerNonExistingTopicCleanupDelayMS,
consumerWaitAfterUnsubscribeTimeoutMs,
topicExistenceChecker,
liveConfigBasedKafkaThrottlingEnabled,
pubSubDeserializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
*/
class SharedKafkaConsumer implements PubSubConsumerAdapter {
private static final Logger LOGGER = LogManager.getLogger(SharedKafkaConsumer.class);
private long nextPollTimeOutSeconds = 10;

protected final PubSubConsumerAdapter delegate;

Expand All @@ -61,6 +60,8 @@ class SharedKafkaConsumer implements PubSubConsumerAdapter {
*/
private final AtomicBoolean waitingForPoll = new AtomicBoolean(false);

private long waitAfterUnsubscribeTimeoutMs;

private final Time time;

/**
Expand All @@ -85,19 +86,28 @@ class SharedKafkaConsumer implements PubSubConsumerAdapter {
public SharedKafkaConsumer(
PubSubConsumerAdapter delegate,
AggKafkaConsumerServiceStats stats,
long waitAfterUnsubscribeTimeoutMs,
Runnable assignmentChangeListener,
UnsubscriptionListener unsubscriptionListener) {
this(delegate, stats, assignmentChangeListener, unsubscriptionListener, new SystemTime());
this(
delegate,
stats,
waitAfterUnsubscribeTimeoutMs,
assignmentChangeListener,
unsubscriptionListener,
new SystemTime());
}

SharedKafkaConsumer(
PubSubConsumerAdapter delegate,
AggKafkaConsumerServiceStats stats,
long waitAfterUnsubscribeTimeoutMs,
Runnable assignmentChangeListener,
UnsubscriptionListener unsubscriptionListener,
Time time) {
this.delegate = delegate;
this.stats = stats;
this.waitAfterUnsubscribeTimeoutMs = waitAfterUnsubscribeTimeoutMs;
this.assignmentChangeListener = assignmentChangeListener;
this.unsubscriptionListener = unsubscriptionListener;
this.time = time;
Expand Down Expand Up @@ -145,20 +155,29 @@ synchronized void subscribe(
updateCurrentAssignment(delegate.getAssignment());
}

/**
 * Unsubscribes from the given topic partition, waiting for the next poll() for up to the
 * {@code waitAfterUnsubscribeTimeoutMs} configured at construction time.
 */
@Override
public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition) {
unSubscribe(pubSubTopicPartition, waitAfterUnsubscribeTimeoutMs);
}

/**
* There is an additional goal of this function which is to make sure that all the records consumed for this {topic,partition} prior to
* calling unsubscribe here is produced to drainer service. {@link ConsumptionTask#run()} ends up calling
* {@link SharedKafkaConsumer#poll(long)} and produceToStoreBufferService sequentially. So waiting for at least one more
* invocation of {@link SharedKafkaConsumer#poll(long)} achieves the above objective.
*/
@Override
public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition) {
public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition, long timeoutMsArg) {
/*
Other values of timeoutMs are provided by the shutdown code path for a shorter timeout wait than the default
value of the server config. However, if the server config waitAfterUnsubscribeTimeoutMs is smaller, then use it.
*/
final long timeoutMs = Math.min(waitAfterUnsubscribeTimeoutMs, timeoutMsArg);
unSubscribeAction(() -> {
// Perform the actual unsubscription on the delegate, then notify the listener with the
// version topic this partition was registered under (may be null if it was never tracked).
this.delegate.unSubscribe(pubSubTopicPartition);
PubSubTopic versionTopic = subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition);
unsubscriptionListener.call(this, versionTopic, pubSubTopicPartition);
return Collections.singleton(pubSubTopicPartition);
}, timeoutMs);
}

@Override
Expand All @@ -170,7 +189,7 @@ public synchronized void batchUnsubscribe(Set<PubSubTopicPartition> pubSubTopicP
unsubscriptionListener.call(this, versionTopic, pubSubTopicPartition);
}
return pubSubTopicPartitionSet;
});
}, waitAfterUnsubscribeTimeoutMs);
}

/**
Expand All @@ -179,7 +198,7 @@ public synchronized void batchUnsubscribe(Set<PubSubTopicPartition> pubSubTopicP
*
* @param supplier which performs the unsubscription and returns a set of partitions which were unsubscribed
*/
protected synchronized void unSubscribeAction(Supplier<Set<PubSubTopicPartition>> supplier) {
protected synchronized void unSubscribeAction(Supplier<Set<PubSubTopicPartition>> supplier, long timeoutMs) {
long currentPollTimes = pollTimes;
Set<PubSubTopicPartition> topicPartitions = supplier.get();
long startTime = System.currentTimeMillis();
Expand All @@ -191,24 +210,27 @@ protected synchronized void unSubscribeAction(Supplier<Set<PubSubTopicPartition>
topicPartitions,
elapsedTime);
updateCurrentAssignment(delegate.getAssignment());
waitAfterUnsubscribe(currentPollTimes, topicPartitions);
waitAfterUnsubscribe(currentPollTimes, topicPartitions, timeoutMs);
}

protected void waitAfterUnsubscribe(long currentPollTimes, Set<PubSubTopicPartition> topicPartitions) {
protected void waitAfterUnsubscribe(
long currentPollTimes,
Set<PubSubTopicPartition> topicPartitions,
long timeoutMs) {
currentPollTimes++;
waitingForPoll.set(true);
// Wait for the next poll, up to timeoutMs. Interestingly, the wait API does not provide any indication
// of whether it returned due to timeout, so an explicit time check is necessary.
long timeoutMs = (time.getNanoseconds() / Time.NS_PER_MS) + nextPollTimeOutSeconds * Time.MS_PER_SECOND;
final long endTimeMs = time.getMilliseconds() + timeoutMs;
try {
while (currentPollTimes > pollTimes) {
long waitMs = timeoutMs - (time.getNanoseconds() / Time.NS_PER_MS);
final long waitMs = endTimeMs - time.getMilliseconds();
if (waitMs <= 0) {
LOGGER.warn(
"Wait for poll request after unsubscribe topic partition(s) ({}) timed out after {} seconds",
"Wait for poll request after unsubscribe topic partition(s) ({}) timed out after {} milliseconds",
topicPartitions,
nextPollTimeOutSeconds);
waitAfterUnsubscribeTimeoutMs);
break;
}
wait(waitMs);
Expand All @@ -221,8 +243,8 @@ protected void waitAfterUnsubscribe(long currentPollTimes, Set<PubSubTopicPartit
}

// Only for testing.
void setNextPollTimeoutSeconds(long nextPollTimeOutSeconds) {
this.nextPollTimeOutSeconds = nextPollTimeOutSeconds;
// Visible for testing only: overrides the wait-after-unsubscribe timeout set at construction time.
void setWaitAfterUnsubscribeTimeoutMs(long waitAfterUnsubscribeTimeoutMs) {
this.waitAfterUnsubscribeTimeoutMs = waitAfterUnsubscribeTimeoutMs;
}

// Only for testing.
Expand All @@ -248,7 +270,7 @@ public synchronized Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, Kafka
/**
* Always invoke this method no matter whether the consumer have subscription or not. Therefore we could notify any
* waiter who might be waiting for a invocation of poll to happen even if the consumer does not have subscription
* after calling {@link SharedKafkaConsumer#unSubscribe(String, int)}.
* after calling {@link SharedKafkaConsumer#unSubscribe(PubSubTopicPartition)}.
*/
pollTimes++;
if (waitingForPoll.get()) {
Expand Down Expand Up @@ -347,9 +369,4 @@ public Long endOffset(PubSubTopicPartition pubSubTopicPartition) {
/**
 * Not supported by this shared-consumer wrapper.
 *
 * @throws UnsupportedOperationException always
 */
public List<PubSubTopicPartitionInfo> partitionsFor(PubSubTopic topic) {
throw new UnsupportedOperationException("partitionsFor is not supported in SharedKafkaConsumer");
}

// Test only
public void setNextPollTimeOutSeconds(long seconds) {
this.nextPollTimeOutSeconds = seconds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class StoreAwarePartitionWiseKafkaConsumerService extends PartitionWiseKa
final MetricsRepository metricsRepository,
final String kafkaClusterAlias,
final long sharedConsumerNonExistingTopicCleanupDelayMS,
final long consumerWaitAfterUnsubscribeTimeoutMs,
final TopicExistenceChecker topicExistenceChecker,
final boolean liveConfigBasedKafkaThrottlingEnabled,
final PubSubMessageDeserializer pubSubDeserializer,
Expand All @@ -57,6 +58,7 @@ public class StoreAwarePartitionWiseKafkaConsumerService extends PartitionWiseKa
metricsRepository,
kafkaClusterAlias,
sharedConsumerNonExistingTopicCleanupDelayMS,
consumerWaitAfterUnsubscribeTimeoutMs,
topicExistenceChecker,
liveConfigBasedKafkaThrottlingEnabled,
pubSubDeserializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class TopicWiseKafkaConsumerService extends KafkaConsumerService {
final MetricsRepository metricsRepository,
final String kafkaClusterAlias,
final long sharedConsumerNonExistingTopicCleanupDelayMS,
final long consumerWaitAfterUnsubscribeTimeoutMs,
final TopicExistenceChecker topicExistenceChecker,
final boolean liveConfigBasedKafkaThrottlingEnabled,
final PubSubMessageDeserializer pubSubDeserializer,
Expand All @@ -63,6 +64,7 @@ public class TopicWiseKafkaConsumerService extends KafkaConsumerService {
metricsRepository,
kafkaClusterAlias,
sharedConsumerNonExistingTopicCleanupDelayMS,
consumerWaitAfterUnsubscribeTimeoutMs,
topicExistenceChecker,
liveConfigBasedKafkaThrottlingEnabled,
pubSubDeserializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ public void testKafkaConsumerServiceResubscriptionConcurrency() throws Exception
mockMetricsRepository,
"test_kafka_cluster_alias",
TimeUnit.MINUTES.toMillis(1),
KafkaConsumerService.DEFAULT_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS,
mock(TopicExistenceChecker.class),
false,
pubSubDeserializer,
Expand Down Expand Up @@ -614,7 +615,8 @@ private Runnable getResubscriptionRunnableFor(
consumerServiceDelegator
.startConsumptionIntoDataReceiver(partitionReplicaIngestionContext, 0, consumedDataReceiver);
// Avoid wait time here to increase the chance for race condition.
consumerServiceDelegator.assignConsumerFor(versionTopic, pubSubTopicPartition).setNextPollTimeOutSeconds(0);
consumerServiceDelegator.assignConsumerFor(versionTopic, pubSubTopicPartition)
.setWaitAfterUnsubscribeTimeoutMs(0L);
int versionNum =
Version.parseVersionFromKafkaTopicName(partitionReplicaIngestionContext.getVersionTopic().getName());
if (versionNum % 3 == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void testTopicWiseGetConsumer() throws Exception {
mockMetricsRepository,
testKafkaClusterAlias,
TimeUnit.MINUTES.toMillis(1),
KafkaConsumerService.DEFAULT_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS,
mock(TopicExistenceChecker.class),
false,
pubSubDeserializer,
Expand Down Expand Up @@ -240,6 +241,7 @@ private KafkaConsumerService getKafkaConsumerServiceWithSingleConsumer(
mockMetricsRepository,
"test_kafka_cluster_alias",
TimeUnit.MINUTES.toMillis(1),
KafkaConsumerService.DEFAULT_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS,
mock(TopicExistenceChecker.class),
false,
pubSubDeserializer,
Expand Down Expand Up @@ -294,6 +296,7 @@ public void testTopicWiseGetConsumerForHybridMode() throws Exception {
mockMetricsRepository,
"test_kafka_cluster_alias",
TimeUnit.MINUTES.toMillis(1),
KafkaConsumerService.DEFAULT_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS,
mock(TopicExistenceChecker.class),
false,
pubSubDeserializer,
Expand Down Expand Up @@ -394,6 +397,7 @@ public void testPartitionWiseGetConsumer() {
mockMetricsRepository,
"test_kafka_cluster_alias",
TimeUnit.MINUTES.toMillis(1),
KafkaConsumerService.DEFAULT_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS,
mock(TopicExistenceChecker.class),
false,
pubSubDeserializer,
Expand Down Expand Up @@ -515,6 +519,7 @@ public void testGetMaxElapsedTimeMSSinceLastPollInConsumerPool() {
mockMetricsRepository,
"test_kafka_cluster_alias",
TimeUnit.MINUTES.toMillis(1),
KafkaConsumerService.DEFAULT_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS,
mock(TopicExistenceChecker.class),
false,
pubSubDeserializer,
Expand Down
Loading