Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller][server] TopicManager refactoring and performance improvements #743

Merged
merged 8 commits into from
Feb 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import static com.linkedin.venice.ConfigKeys.MIN_CONSUMER_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER;
import static com.linkedin.venice.ConfigKeys.OFFSET_LAG_DELTA_RELAX_FACTOR_FOR_FAST_ONLINE_TRANSITION_IN_RESTART;
import static com.linkedin.venice.ConfigKeys.PARTICIPANT_MESSAGE_CONSUMPTION_DELAY_MS;
import static com.linkedin.venice.ConfigKeys.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE;
import static com.linkedin.venice.ConfigKeys.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_THREAD_POOL_SIZE;
import static com.linkedin.venice.ConfigKeys.PUB_SUB_ADMIN_ADAPTER_FACTORY_CLASS;
import static com.linkedin.venice.ConfigKeys.PUB_SUB_CONSUMER_ADAPTER_FACTORY_CLASS;
import static com.linkedin.venice.ConfigKeys.PUB_SUB_PRODUCER_ADAPTER_FACTORY_CLASS;
Expand Down Expand Up @@ -131,6 +133,7 @@
import static com.linkedin.venice.ConfigKeys.SYSTEM_SCHEMA_INITIALIZATION_AT_START_TIME_ENABLED;
import static com.linkedin.venice.ConfigKeys.UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.UNSORTED_INPUT_DRAINER_SIZE;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE_DEFAULT_VALUE;

import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModelFactory;
import com.linkedin.davinci.kafka.consumer.KafkaConsumerService;
Expand Down Expand Up @@ -271,6 +274,9 @@ public class VeniceServerConfig extends VeniceClusterConfig {
*/
private final int topicOffsetCheckIntervalMs;

private final int topicManagerMetadataFetcherConsumerPoolSize;
private final int topicManagerMetadataFetcherThreadPoolSize;

/**
* Graceful shutdown period.
* Venice SN needs to explicitly do graceful shutdown since Netty's graceful shutdown logic
Expand Down Expand Up @@ -511,6 +517,11 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
maxRequestSize = (int) serverProperties.getSizeInBytes(SERVER_MAX_REQUEST_SIZE, 256 * 1024);
topicOffsetCheckIntervalMs =
serverProperties.getInt(SERVER_SOURCE_TOPIC_OFFSET_CHECK_INTERVAL_MS, (int) TimeUnit.SECONDS.toMillis(60));
this.topicManagerMetadataFetcherConsumerPoolSize = serverProperties.getInt(
PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE,
PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE_DEFAULT_VALUE);
this.topicManagerMetadataFetcherThreadPoolSize = serverProperties
.getInt(PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_THREAD_POOL_SIZE, topicManagerMetadataFetcherConsumerPoolSize);
sushantmane marked this conversation as resolved.
Show resolved Hide resolved
nettyGracefulShutdownPeriodSeconds = serverProperties.getInt(SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS, 30);
nettyWorkerThreadCount = serverProperties.getInt(SERVER_NETTY_WORKER_THREADS, 0);
grpcWorkerThreadCount =
Expand Down Expand Up @@ -1358,4 +1369,12 @@ public boolean isDedicatedConsumerPoolForAAWCLeaderEnabled() {
public int getDedicatedConsumerPoolSizeForAAWCLeader() {
return dedicatedConsumerPoolSizeForAAWCLeader;
}

public int getTopicManagerMetadataFetcherConsumerPoolSize() {
return topicManagerMetadataFetcherConsumerPoolSize;
}

public int getTopicManagerMetadataFetcherThreadPoolSize() {
return topicManagerMetadataFetcherThreadPoolSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.kafka.partitionoffset.PartitionOffsetFetcherImpl.DEFAULT_KAFKA_OFFSET_API_TIMEOUT;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_OFFSET_API_TIMEOUT_DURATION_DEFAULT_VALUE;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.davinci.callback.BytesStreamingCallback;
Expand Down Expand Up @@ -147,7 +147,8 @@ private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKep
}

PubSubTopicPartition topicPartition = getTopicPartition(partition);
Long earliestOffset = pubSubConsumer.beginningOffset(topicPartition, DEFAULT_KAFKA_OFFSET_API_TIMEOUT);
Long earliestOffset =
pubSubConsumer.beginningOffset(topicPartition, PUBSUB_OFFSET_API_TIMEOUT_DURATION_DEFAULT_VALUE);
VeniceChangeCoordinate earliestCheckpoint = earliestOffset == null
? null
: new VeniceChangeCoordinate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public abstract void startConsumptionIntoDataReceiver(
long lastReadOffset,
ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver);

public abstract long getOffsetLagFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition);
public abstract long getOffsetLagBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition);

public abstract long getLatestOffsetFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition);
public abstract long getLatestOffsetBasedOnMetrics(
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.linkedin.venice.exceptions.PersistenceFailureException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceMessageException;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.Delete;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
Expand All @@ -43,6 +42,7 @@
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pubsub.PubSubConstants;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
Expand Down Expand Up @@ -876,7 +876,7 @@ private long calculateRewindStartTime(PartitionConsumptionState partitionConsump
long rewindTimeInMs = hybridStoreConfig.get().getRewindTimeInSeconds() * Time.MS_PER_SECOND;
if (isDataRecovery) {
// Override the user rewind if the version is under data recovery to avoid data loss when user have short rewind.
rewindTimeInMs = Math.max(TopicManager.BUFFER_REPLAY_MINIMAL_SAFETY_MARGIN, rewindTimeInMs);
rewindTimeInMs = Math.max(PubSubConstants.BUFFER_REPLAY_MINIMAL_SAFETY_MARGIN, rewindTimeInMs);
}
switch (hybridStoreConfig.get().getBufferReplayPolicy()) {
case REWIND_FROM_SOP:
Expand Down Expand Up @@ -1107,7 +1107,7 @@ protected boolean processTopicSwitch(
try {
PubSubTopicPartition newSourceTP = new PubSubTopicPartitionImpl(newSourceTopic, newSourceTopicPartition);
upstreamStartOffset =
getTopicManager(sourceKafkaURL.toString()).getPartitionOffsetByTime(newSourceTP, rewindStartTimestamp);
getTopicManager(sourceKafkaURL.toString()).getOffsetByTime(newSourceTP, rewindStartTimestamp);
numberOfContactedBrokers.getAndIncrement();
} catch (Exception e) {
// TODO: Catch more specific Exception?
Expand Down Expand Up @@ -1398,7 +1398,7 @@ public long getRegionHybridOffsetLag(int regionId) {
// Consumer might not existed after the consumption state is created, but before attaching the corresponding
// consumer.
long offsetLagOptional =
getPartitionOffsetLag(kafkaSourceAddress, currentLeaderTopic, pcs.getUserPartition());
getPartitionOffsetLagBasedOnMetrics(kafkaSourceAddress, currentLeaderTopic, pcs.getUserPartition());
if (offsetLagOptional >= 0) {
return offsetLagOptional;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import com.linkedin.davinci.ingestion.consumption.ConsumedDataReceiver;
import com.linkedin.davinci.stats.StuckConsumerRepairStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
Expand All @@ -16,6 +15,8 @@
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.manager.TopicManager;
import com.linkedin.venice.pubsub.manager.TopicManagerContext.PubSubPropertiesSupplier;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.DaemonThreadFactory;
Expand Down Expand Up @@ -68,8 +69,8 @@ public class AggKafkaConsumerService extends AbstractVeniceService {
private final Map<String, String> kafkaClusterUrlToAliasMap;
private final Object2IntMap<String> kafkaClusterUrlToIdMap;
private final PubSubMessageDeserializer pubSubDeserializer;
private final TopicManagerRepository.SSLPropertiesSupplier sslPropertiesSupplier;
private final Function<String, String> kafkaClusterUrlResolver;
private final PubSubPropertiesSupplier pubSubPropertiesSupplier;

private final Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping = new VeniceConcurrentHashMap<>();
private ScheduledExecutorService stuckConsumerRepairExecutorService;
Expand All @@ -81,7 +82,7 @@ public class AggKafkaConsumerService extends AbstractVeniceService {

public AggKafkaConsumerService(
final PubSubConsumerAdapterFactory consumerFactory,
TopicManagerRepository.SSLPropertiesSupplier sslPropertiesSupplier,
final PubSubPropertiesSupplier pubSubPropertiesSupplier,
final VeniceServerConfig serverConfig,
final EventThrottler bandwidthThrottler,
final EventThrottler recordsThrottler,
Expand All @@ -107,10 +108,8 @@ public AggKafkaConsumerService(
this.kafkaClusterUrlToIdMap = serverConfig.getKafkaClusterUrlToIdMap();
this.isKafkaConsumerOffsetCollectionEnabled = serverConfig.isKafkaConsumerOffsetCollectionEnabled();
this.pubSubDeserializer = pubSubDeserializer;
this.sslPropertiesSupplier = sslPropertiesSupplier;
this.kafkaClusterUrlResolver = serverConfig.getKafkaClusterUrlResolver();
this.metadataRepository = metadataRepository;

if (serverConfig.isStuckConsumerRepairEnabled()) {
this.stuckConsumerRepairExecutorService = Executors.newSingleThreadScheduledExecutor(
new DaemonThreadFactory(this.getClass().getName() + "-StuckConsumerRepair"));
Expand All @@ -130,7 +129,7 @@ public AggKafkaConsumerService(
LOGGER.info("Started stuck consumer repair service with checking interval: {} seconds", intervalInSeconds);
}
this.isAAOrWCEnabledFunc = isAAOrWCEnabledFunc;

this.pubSubPropertiesSupplier = pubSubPropertiesSupplier;
LOGGER.info("Successfully initialized AggKafkaConsumerService");
}

Expand Down Expand Up @@ -259,7 +258,7 @@ protected static Runnable getStuckConsumerDetectionAndRepairRunnable(
* @return the {@link KafkaConsumerService} for a specific Kafka bootstrap url,
* or null if there isn't any.
*/
private AbstractKafkaConsumerService getKafkaConsumerService(final String kafkaURL) {
AbstractKafkaConsumerService getKafkaConsumerService(final String kafkaURL) {
AbstractKafkaConsumerService consumerService = kafkaServerToConsumerServiceMap.get(kafkaURL);
if (consumerService == null && kafkaClusterUrlResolver != null) {
consumerService = kafkaServerToConsumerServiceMap.get(kafkaClusterUrlResolver.apply(kafkaURL));
Expand All @@ -276,7 +275,7 @@ private AbstractKafkaConsumerService getKafkaConsumerService(final String kafkaU
*/
public synchronized AbstractKafkaConsumerService createKafkaConsumerService(final Properties consumerProperties) {
String kafkaUrl = consumerProperties.getProperty(KAFKA_BOOTSTRAP_SERVERS);
Properties properties = sslPropertiesSupplier.get(kafkaUrl).toProperties();
Properties properties = pubSubPropertiesSupplier.get(kafkaUrl).toProperties();
consumerProperties.putAll(properties);
if (kafkaUrl == null || kafkaUrl.isEmpty()) {
throw new IllegalArgumentException("Kafka URL must be set in the consumer properties config. Got: " + kafkaUrl);
Expand Down Expand Up @@ -393,24 +392,32 @@ public ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, L

versionTopicStoreIngestionTaskMapping.put(storeIngestionTask.getVersionTopic().getName(), storeIngestionTask);
consumerService.startConsumptionIntoDataReceiver(pubSubTopicPartition, lastOffset, dataReceiver);

TopicManager topicManager = storeIngestionTask.getTopicManager(kafkaURL);
if (topicManager != null) {
// prefetch and cache the latest offset for the topic partition
topicManager.prefetchAndCacheLatestOffset(pubSubTopicPartition);
sushantmane marked this conversation as resolved.
Show resolved Hide resolved
}
return dataReceiver;
}

public long getOffsetLagFor(
public long getOffsetLagBasedOnMetrics(
final String kafkaURL,
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition) {
AbstractKafkaConsumerService consumerService = getKafkaConsumerService(kafkaURL);
return consumerService == null ? -1 : consumerService.getOffsetLagFor(versionTopic, pubSubTopicPartition);
return consumerService == null
? -1
: consumerService.getOffsetLagBasedOnMetrics(versionTopic, pubSubTopicPartition);
}

public long getLatestOffsetFor(
public long getLatestOffsetBasedOnMetrics(
final String kafkaURL,
PubSubTopic versionTopic,
PubSubTopicPartition pubSubTopicPartition) {
AbstractKafkaConsumerService consumerService = getKafkaConsumerService(kafkaURL);
return consumerService == null ? -1 : consumerService.getLatestOffsetFor(versionTopic, pubSubTopicPartition);
return consumerService == null
? -1
: consumerService.getLatestOffsetBasedOnMetrics(versionTopic, pubSubTopicPartition);
}

/**
Expand Down
Loading
Loading