-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DRAFT] Automatic async wrapping for stateful processors #392
base: main
Are you sure you want to change the base?
Conversation
@@ -15,6 +15,10 @@ plugins { | |||
id("responsive.docker") | |||
} | |||
|
|||
repositories { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had to put this into all the build.gradle files so I could compile from my local AK branch. I can remove all of these before merging if anyone feels strongly about it, but if it doesn't do any harm then I'd rather keep these for the next time I'm working off of a dev branch
@@ -70,7 +71,7 @@ protected Topology buildTopology() { | |||
} | |||
|
|||
builder.stream(ORDERS, Consumed.with(Serdes.String(), RegressionSchema.orderSerde())) | |||
.transform(BatchTransformer::new, "grouped-orders-store") | |||
.process(BatchTransformer::new, "grouped-orders-store") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not part of async -- related to the 4.0 upgrade
@@ -284,16 +284,16 @@ private static Properties propsWithOverrides( | |||
return propsWithOverrides; | |||
} | |||
|
|||
final Object o = configs.originals().get(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS); | |||
final Object o = configs.originals().get(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not part of async -- related to the 4.0 upgrade
) { | ||
if (userStoreBuilders == null || userStoreBuilders.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not related to KIP-1112 -- this check was supposed to be deleted already but I must have missed it before (the corresponding check in FixedKeyProcessorSupplier was correctly removed at that time)
return new HashSet<>(asyncStoreBuilders.values()); | ||
} | ||
|
||
private void maybeInitializeAsyncStoreBuilders() { | ||
if (asyncStoreBuilders == null) { | ||
asyncStoreBuilders = initializeAsyncBuilders(userProcessorSupplier.stores()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to delay the initialization of the async store builders because the call to the inner processor's #stores method might return different things at different points in the build process (for example, if a downstream operator forces materialization, the #stores will return null/empty until that downstream operator is processed)
final String processorName, | ||
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier | ||
) { | ||
return AsyncProcessorSupplier.createAsyncProcessorSupplier(processorSupplier); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we really want to wrap every processor as async -- WDYT about only wrapping stateful operators by default?
(We can of course still allow users to inject their own custom async processors if they have some heavier processing without state like an RPC)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking we'd leverage configure()
to allow us to plug into exactly this consideration. Not sure how we want to expose that as an API, but we can figure that out later. For now we can do just stateful ones. Like you said, if it's something stateless they want to make async it's pretty easy to write a processor for it manually.
@@ -182,7 +183,8 @@ static SingletonConsumerRecords of(final ConsumerRecords<byte[], byte[]> records | |||
public SingletonConsumerRecords( | |||
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records | |||
) { | |||
super(records); | |||
super(records, Collections.emptyMap()); | |||
// TODO(sophie): need to pass in the actual next offsets here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related to the upgrade -- a recent KIP added offset tracking to the consumer records, haven't looked into it much yet but I suspect Streams may be using this in some way and so we might need to actually implement this
import org.apache.kafka.streams.state.StoreBuilder; | ||
import org.apache.kafka.streams.state.TimestampedBytesStore; | ||
|
||
public class DelayedAsyncStoreBuilder<K, V, T extends StateStore> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this class is basically where all the nasty hacky stuff is confined to. For now there are two main culprits:
- checking and unwrapping the
FactoryWrappingStoreBuilder
, an internal AK class I had to add while converting DSL operators to use #stores, so that the DSLStoreSuppliers are resolved correctly - some "light" reflection to grab the storeSupplier from a storeBuilder instance
This PR is mainly a POC to show that it can be done -- we'll need to wait for 4.0 to be released before merging this. It also includes the changes we'll need to make for that upgrade (eg removal of deprecated APIs)
Until we can merge and release this new feature, it has to be run on top of my WIP KIP-1112 branch (here).
Remaining work beyond this POC:
-AK: converting all of the other stateful DSL operators to implement the ProcessorSupplier#stores method
-Responsive: fill in the async store builders with all the store types (eg versioned stores, window and session stores)