[controller][server] TopicManager refactoring and performance improvements #743
A few comments, all minor... looks great so far. I still have about half to review.
Nice work and it makes a lot of sense to me to have a centralized place/class to coordinate the work of getting offset. Thank you @sushantmane!
Finished first pass... a few more comments. Nothing dramatic, LG overall.
Just one minor comment... probably good to go very soon! Thanks for continuing to iterate on this big change!
Looks great overall, and left some minor comments.
Thank you Sushant! Let's take this monster PR across the finish line 🚀🚢 !!!!!
Thanks a lot @FelixGV, @lluwm, and @gaojieliu for the review, discussions, and help in improving this PR. 🙏 🚀
TopicManager Refactoring and Performance Improvements
Currently, three components work together to fetch topic metadata, such as endOffset and the timestamp of the last produced message: TopicManager, PartitionOffsetFetcher, and CachedPubSubMetadataGetter. We create one TopicManager instance per broker address/region, and it is shared by all the components in the system. PartitionOffsetFetcher is part of the topic manager. There is one CachedPubSubMetadataGetter instance per StoreIngestionTask, which uses TopicManager and PartitionOffsetFetcher to retrieve topic metadata.
PartitionOffsetFetcher uses PubSub admin and consumer clients to fetch offsets on behalf of CachedPubSubMetadataGetter and other components. Each PartitionOffsetFetcher has just one consumer instance, which is protected by a consumer mutex because consumers are not thread-safe. This means that PartitionOffsetFetcher can handle only one request at a time. The consumer mutex is coarse-grained: it is held even while making calls with the admin client, which is unnecessary because admin clients are thread-safe.
Also, for topics that do not exist or have metadata fetching issues, we perform retries while holding the mutex. As a result, a problem with one topic keeps the mutex held for much longer and directly delays other requests waiting for the lock. We have often observed high contention on this mutex, leading to several issues.
CachedPubSubMetadataGetter fetches offsets asynchronously using TopicManager/PartitionOffsetFetcher and caches them in a local cache. These asynchronous requests are initiated when an entry is either missing from the cache or stale. Currently, they are served by the ForkJoin common thread pool.
When there is an issue such as a non-existent topic, we see high contention on the consumer mutex, as discussed earlier. ForkJoin threads then stay blocked on the consumer lock for longer periods. When all the threads in the ForkJoin common pool are blocked, other components in the system that rely on the pool are starved, which affects their functionality.
To enhance the resiliency of TopicManager in the face of failures, this PR introduces the following high-level changes:
- Admin operations during metadata fetch are now performed without holding any lock. This keeps requests for bad topics from acquiring the consumer in the first place and reduces contention on the consumer.
- A configurable fetcher consumer pool is introduced for each topic manager. This allows requests to be processed by other consumers even if one is busy.
- The ForkJoin common thread pool is no longer used for submitting async fetch offset requests. Instead, dedicated fetcher thread pools are used for this purpose.
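The three changes above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: the class name PooledMetadataFetcher, its methods, and the topicExists predicate (standing in for a thread-safe admin client call) are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical sketch; names and structure are assumptions, not the Venice implementation. */
final class PooledMetadataFetcher<C> implements AutoCloseable {
  // Pool of non-thread-safe consumers; each one is used by a single thread at a time.
  private final BlockingQueue<C> consumerPool;
  // Dedicated executor, so blocked fetches never occupy ForkJoinPool.commonPool().
  private final ExecutorService fetcherExecutor;
  // Stand-in for a thread-safe admin client check (assumption).
  private final Predicate<String> topicExists;

  PooledMetadataFetcher(List<C> consumers, int threadPoolSize, Predicate<String> topicExists) {
    this.consumerPool = new ArrayBlockingQueue<>(consumers.size(), true, consumers);
    this.fetcherExecutor = Executors.newFixedThreadPool(threadPoolSize);
    this.topicExists = topicExists;
  }

  /** Runs the query against a pooled consumer after an admin check that holds no lock. */
  <R> R fetch(String topic, Function<C, R> query) {
    // Admin check happens before any consumer is acquired, so a bad topic fails fast.
    if (!topicExists.test(topic)) {
      throw new IllegalArgumentException("Topic does not exist: " + topic);
    }
    C consumer;
    try {
      consumer = consumerPool.take(); // blocks only if every consumer is busy
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for a consumer", e);
    }
    try {
      return query.apply(consumer);
    } finally {
      consumerPool.add(consumer); // always hand the consumer back to the pool
    }
  }

  /** Submits the fetch to the dedicated fetcher pool instead of the common pool. */
  <R> CompletableFuture<R> fetchAsync(String topic, Function<C, R> query) {
    return CompletableFuture.supplyAsync(() -> fetch(topic, query), fetcherExecutor);
  }

  int idleConsumers() {
    return consumerPool.size();
  }

  @Override
  public void close() {
    fetcherExecutor.shutdown();
  }
}
```

In this sketch a request for a missing topic is rejected before it touches the consumer pool, and a slow query ties up only one consumer and one fetcher thread rather than a shared lock or the common pool.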
Miscellaneous
- pubsub.topic.manager.metadata.fetcher.consumer.pool.size: Specifies the number of PubSub consumer clients to use per topic manager (default: 2).
- pubsub.topic.manager.metadata.fetcher.thread.pool.size: Controls the size of the per-TopicManager thread pool used for async get-offset requests (default: 2).
- TopicManagerRepository.SSLPropertiesSupplier has been extracted into PubSubPropertiesSupplier.
- CachedPubSubMetadataGetter and PartitionOffsetFetcher have been merged to form TopicManagerMetadataFetcher.
- getOffsetLagFor in AggKafkaConsumerService has been renamed to getConsumerLagBasedOnMetrics.
- getPartitionOffsetLag in ActiveActiveStoreIngestionTask has been renamed to getPartitionOffsetLagBasedOnMetrics.
- TopicManagerContext has been created/extracted from TopicManagerRepository.builder() to pass the required dependencies for TopicManagerRepository.
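
As a rough illustration of how the two pool-size configs listed above might be read, the sketch below parses them from java.util.Properties and falls back to the stated default of 2. The config keys and defaults come from this description; the FetcherPoolSettings class and the use of java.util.Properties are assumptions made for the example, not how Venice itself loads configs.

```java
import java.util.Properties;

/** Hypothetical helper; only the two config keys and their defaults come from the PR description. */
final class FetcherPoolSettings {
  static final String CONSUMER_POOL_SIZE = "pubsub.topic.manager.metadata.fetcher.consumer.pool.size";
  static final String THREAD_POOL_SIZE = "pubsub.topic.manager.metadata.fetcher.thread.pool.size";
  static final int DEFAULT_POOL_SIZE = 2;

  final int consumerPoolSize;
  final int threadPoolSize;

  FetcherPoolSettings(Properties props) {
    this.consumerPoolSize = positiveInt(props, CONSUMER_POOL_SIZE);
    this.threadPoolSize = positiveInt(props, THREAD_POOL_SIZE);
  }

  // Reads an integer config, applying the default when the key is absent.
  private static int positiveInt(Properties props, String key) {
    String raw = props.getProperty(key, String.valueOf(DEFAULT_POOL_SIZE)).trim();
    int value = Integer.parseInt(raw);
    if (value < 1) {
      throw new IllegalArgumentException(key + " must be at least 1, got " + value);
    }
    return value;
  }
}
```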
How was this PR tested?
CI, frankenwar testing
Does this PR introduce any user-facing changes?