Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add wrapper around AsyncProcessor to enable async processing engine only with async threads configured #306

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

ableegoldman
Copy link
Contributor

Add a wrapper around the AsyncProcessor class and only instantiate it if the user has configured a nonzero thread pool size. This allows turning async on/off with just a config, without needing to add/remove the wrapper around the ProcessorSupplier as well.

@ableegoldman ableegoldman requested a review from rodesai June 7, 2024 03:53
Comment on lines +71 to +80
private final String logPrefix;
private final Logger log;

private final String streamThreadName;
private final String asyncProcessorName;
private final TaskId taskId;

private final AsyncThreadPool threadPool;
private final SchedulingQueue<KIn> schedulingQueue;
private final FinalizingQueue finalizingQueue;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nice thing about this change is that we only instantiate the AsyncProcessor after the processor is initialized, so we can actually make all these variables final. I was hoping to do this anyway

@@ -69,12 +66,18 @@
* -Coordinates the handoff of records between the StreamThread and AyncThreads
* -The starting and ending point of all async events -- see {@link AsyncEvent}
*/
public class AsyncProcessor<KIn, VIn, KOut, VOut>
implements Processor<KIn, VIn, KOut, VOut>, FixedKeyProcessor<KIn, VIn, VOut> {
public class AsyncProcessor<KIn, VIn, KOut, VOut> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this no longer implements the Processor interfaces, I think I'll rename it to "AsyncProcessingEngine" or something to that effect (suggestions welcome). And then rename the new wrapper class that creates/delegates to this class to "AsyncProcessor". But I'll wait until the PR has been reviewed/approved to do the renamings since that might impact a bunch of code

* 1. deduplicating and delegating code between the regular and fixed-key processor versions
* 2. disabling async processing altogether if the thread pool size is 0
*/
public class MaybeAsyncProcessor<KIn, VIn, KOut, VOut>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not the final name for this -- plan to rename it to "AsyncProcessor" or similar. See this comment

public AsyncProcessor<KIn, VIn, KIn, VOut> get() {
return createAsyncFixedKeyProcessor(userProcessorSupplier.get(), asyncStoreBuilders);
public MaybeAsyncProcessor<KIn, VIn, KIn, VOut> get() {
return MaybeAsyncProcessor.createFixedKeyProcessor(userProcessorSupplier.get(), asyncStoreBuilders);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the one thing I haven't implemented yet is that we initialize the store builders as AsyncStoreBuilders right from the start. I still need to push that back so that we don't end up wrapping the user's store suppleirs in the async stuff if that's not enabled

@ableegoldman
Copy link
Contributor Author

Shelving this for now since the approach becomes rather messy once we fix the AsyncStoreBuilder issue (essentially we'll need to provide wrapper classes for every single async store type, which are already wrappers for each of the actual StateStore types)

Will probably pick this back up again at some point in the future, but in the meantime we advise users to simply branch their DSL operators based on the configuration in their own code. For example:

boolean enableAsync = asyncThreadPoolSize > 0;
if (enableAsync) {
    kstream
        .process(createAsyncProcessorSupplier(new MyProcessorSupplier))
        .to(OUTPUT_TOPIC);
} else {
    kstream
        .process(new MyProcessorSupplier)
        .to(OUTPUT_TOPIC);
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant