-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[controller][server][vpj][samza][producer] Use lazy initialization of producer in VeniceWriter #713
Conversation
internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except above clarification.
OPEN_VENICE_WRITER_COUNT.decrementAndGet(); | ||
} catch (Exception e) { | ||
logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e); | ||
VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet(); | ||
} | ||
threadPoolExecutor.shutdown(); | ||
getThreadPoolExecutor().shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might need a bit improvement, shutdown
only makes sure no new tasks will be accepted. Take a look at our conventions for close a thread pool, e.g., VeniceParentHelixAdmin.close
or somewhere else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean use shutdownNow to interrupt the running threads?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, shutdownnow + awaitTermination for a limited timeout. does it make any sense?
OPEN_VENICE_WRITER_COUNT.decrementAndGet(); | ||
} catch (Exception e) { | ||
logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e); | ||
VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet(); | ||
} | ||
threadPoolExecutor.shutdown(); | ||
getThreadPoolExecutor().shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
@@ -87,7 +88,7 @@ public class VeniceWriter<K, V, U> extends AbstractVeniceWriter<K, V, U> { | |||
new ChunkedPayloadAndManifest(null, null); | |||
|
|||
// use for running async close and to fetch number of partitions with timeout from producer | |||
private final ThreadPoolExecutor threadPoolExecutor; | |||
private final Lazy<ThreadPoolExecutor> threadPoolExecutorLazy = Lazy.of(this::createThreadPoolExecutor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we make this static? Do we really want one thread pool per VW? We can have a lot of VW per server...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But with single pool, you won't be able to limit the thread count per VW? Or is it still possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The limit would need to be higher, but do we really need some per-VW threads? If we have 1K VWs in a server, do we really want 2K threads spread across 1K TPs? Would it be enough to have a single static TP with, e.g., max 100 threads, and other tasks get queued if we tap out all threads?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, yeah that's probably more reasonable.
I thought the 2 per VW is required for XInfra purpose. Maybe @sushantmane knows better about this...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Originally I was thinking about having just one ThreadPool for all VWs. But I wanted let callers's have some control over the concurrency of close operations which is where this thread pool is primarily used. Making this a static pool would mean that the concurrency isn't fully controlled by the callers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see... so I guess it could be fine then...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FelixGV - Should we merge this PR then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm changing the scope of this PR slightly:
- Keep ThreadPoolExecutor per VW for now
- Keep lazy init of PubSubProducer and ThreadPool
- Close segments concurrently (VW::endAllSegments)
Do let me know if there are other things that I should address. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update PR with the above changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I left another review.
Since this touches VW, can you update the components affected to include everything that uses VW? (or use all) since probably only routers, tc and fc are excluded controller, server, dvc, vpj, samza, producer, ... |
726638b
to
04e94b3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some minor comments. Thanks for working on this.
@@ -41,6 +43,7 @@ public <K, V, U> VeniceWriter<K, V, U> createVeniceWriter(VeniceWriterOptions op | |||
return new VeniceWriter<>( | |||
options, | |||
props, | |||
producerAdapterFactory.create(props, options.getTopicName(), options.getBrokerAddress())); | |||
Lazy.of(() -> producerAdapterFactory.create(props, options.getTopicName(), options.getBrokerAddress())), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious about this... have you found specific code paths that were instantiating a VW
but then not actually using it to produce anything?
In many places, we are already wrapping the VW
into Lazy
, or else use other patterns of lazy init (e.g. computeIfAbsent
). So now we'll be doing double-lazy wrapping in many places. In theory it's fine but it seems a bit dirty to me... maybe the clean thing to do is to remove the "outer" lazy wrapping and just keep this inner one, or perhaps to make the remaining unwrapped VW
into Lazy<VW>
(and avoid the inner lazy). I don't have a strong opinion either way but I feel like it's something we should consider.
Anyway, I don't mean to say you should embark on a big refactoring in this PR, but I am interested in hearing your opinion on the above... I imagine you looked at this code more recently than I did if you're doing this, so maybe you caught some interesting context IDK about along the way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. I just double-checked the code, and, except for two place (and none of them are in server startup path), we're indeed initializing VW
only when we need it. One trigger to add lazy initialization for VW's producer was based on hazy log reading, where I saw producers being one after another during startup . I should have properly checked the code before adding this code.
Will update the PR to remove lazy init. Thanks!
...e-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/MetaSystemStoreTest.java
Show resolved
Hide resolved
private PubSubProducerAdapter getProducerAdapter() { | ||
return producerAdapterLazy.get(); | ||
} | ||
|
||
private ThreadPoolExecutor getThreadPoolExecutor() { | ||
return threadPoolExecutorLazy.get(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think personally I would prefer not hiding usages of Lazy
in this way. I prefer letting the producerAdapterLazy.get()
be present at the call site, as it gives maintainers a nudge to think about whether they really want to initialize the Lazy
instance. In some cases, it may be desirable to only use the lazy if it's already initialized, by calling ifPresent(instance -> { ... })
on it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense. Will update the PR.
getProducerAdapter().close(topicName, closeTimeOutInMs, gracefulClose); | ||
OPEN_VENICE_WRITER_COUNT.decrementAndGet(); | ||
} catch (Exception e) { | ||
logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e); | ||
VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet(); | ||
} | ||
threadPoolExecutor.shutdown(); | ||
getThreadPoolExecutor().shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrap inside ifPresent
rather than doing a late initialization just for the sake of tearing it down right after...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! Thanks
internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
Show resolved
Hide resolved
internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
Show resolved
Hide resolved
I'm gonna put it in draft mode because I want to take some time to think it over and make the PR better. |
PR #720 has substantially reduced the number of MetaStore VW's per host. I'm considering updating the PR based on Felix's suggestion to use a single thread pool for all VW's instead of a thread pool per VW. I would also like to consider sending CMs concurrently. |
Closing this PR for now. I would like to complete it at some point.. |
Use lazy initialization of producer in VeniceWriter
My old PR #224 removed lazy producer init in VWFactory.
Bringing it back.
How was this PR tested?
CI
Does this PR introduce any user-facing changes?