[DRAFT] Automatic async wrapping for stateful processors #392

Open · wants to merge 10 commits into base: main
kafka-client-bootstrap/build.gradle.kts (4 additions, 0 deletions)

@@ -15,6 +15,10 @@ plugins {
   id("responsive.docker")
 }
 
+repositories {
+  mavenLocal()
+}
+
 application {
   mainClass.set("dev.responsive.kafka.bootstrap.main.Main")
 }

[Author comment, on the added repositories block] Had to put this into all the build.gradle files so I could compile from my local AK branch. I can remove all of these before merging if anyone feels strongly about it, but if it doesn't do any harm then I'd rather keep these for the next time I'm working off of a dev branch.
kafka-client-examples/e2e-test/build.gradle.kts (4 additions, 0 deletions)

@@ -15,6 +15,10 @@ plugins {
   id("responsive.docker")
 }
 
+repositories {
+  mavenLocal()
+}
+
 application {
   mainClass.set("dev.responsive.examples.e2etest.Main")
 }
BatchTransformer: migrated from the deprecated Transformer API to the Processor API

@@ -32,9 +32,10 @@
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;

@@ -70,7 +71,7 @@ protected Topology buildTopology() {
     }

     builder.stream(ORDERS, Consumed.with(Serdes.String(), RegressionSchema.orderSerde()))
-        .transform(BatchTransformer::new, "grouped-orders-store")
+        .process(BatchTransformer::new, "grouped-orders-store")
         .peek((k, v) -> {
           if (responsive) {
             final var random = Math.abs(randomGenerator.nextLong() % 10000);

[Author comment, on the .process(...) line] Not part of async -- related to the 4.0 upgrade.

@@ -84,14 +85,13 @@ protected Topology buildTopology() {
     return builder.build();
   }

-  private static class BatchTransformer
-      implements Transformer<String, Order, KeyValue<String, GroupedOrder>> {
+  private static class BatchTransformer implements Processor<String, Order, String, GroupedOrder> {

-    private ProcessorContext context;
+    private ProcessorContext<String, GroupedOrder> context;
     private KeyValueStore<String, StoredOrder> store;

     @Override
-    public void init(final ProcessorContext context) {
+    public void init(final ProcessorContext<String, GroupedOrder> context) {
       this.context = context;
       this.store = context.getStateStore("grouped-orders-store");
       this.context.schedule(

@@ -102,15 +102,18 @@ public void init(final ProcessorContext context) {
     }

     @Override
-    public KeyValue<String, GroupedOrder> transform(final String key, final Order value) {
-      final long ts = context.timestamp();
+    public void process(final Record<String, Order> record) {
+      final long ts = record.timestamp();

       // first add the order to the list of orders that are stored
-      store.put(storedKey(key, ts), new StoredOrder(Optional.of(value), Optional.empty()));
+      store.put(
+          storedKey(record.key(), ts),
+          new StoredOrder(Optional.of(record.value()), Optional.empty())
+      );

       // next, we need to update the tracked metadata row to
       // check whether the value ought to be emitted
-      final String mKey = metaKey(key);
+      final String mKey = metaKey(record.key());
       final StoredOrder.Meta meta = Optional.ofNullable(store.get(mKey))
           .orElse(new StoredOrder(Optional.empty(), Optional.of(new StoredOrder.Meta(ts, 0, 0))))
           .meta()

@@ -122,17 +125,15 @@
       final StoredOrder.Meta newMeta = new StoredOrder.Meta(
           ts,
           meta.count() + 1,
-          meta.size() + (long) value.amount()
+          meta.size() + (long) record.value().amount()
       );

       if (shouldFlush(newMeta, ts)) {
-        doFlush(key);
+        doFlush(record.key());
         store.delete(mKey);
       } else {
         store.put(mKey, new StoredOrder(Optional.empty(), Optional.of(newMeta)));
       }
-
-      return null;
     }

     private void flushExpired(long ts) {

@@ -174,7 +175,7 @@ private void doFlush(final String key) {
               "Got stored order with no order! %s".formatted(value))));
     }

-    context.forward(key, result);
+    context.forward(new Record<>(key, result, 0L));
   }
 }
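For reference, here is a minimal, self-contained sketch (not from this PR) of the Processor API contract the migration above targets: a context typed on the output key/value types, a Record-based process(), and forward() taking a full Record instead of a (key, value) pair.

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Illustrative stateless processor using the new (processor.api) interfaces.
public class UppercaseProcessor implements Processor<String, String, String, String> {

  private ProcessorContext<String, String> context;

  @Override
  public void init(final ProcessorContext<String, String> context) {
    // the context is now parameterized on the *output* key/value types
    this.context = context;
  }

  @Override
  public void process(final Record<String, String> record) {
    // key, value, and timestamp travel together on the Record;
    // withValue() keeps the key and timestamp while replacing the value
    context.forward(record.withValue(record.value().toUpperCase()));
  }
}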
kafka-client-examples/simple-example/build.gradle.kts (6 additions, 2 deletions)

@@ -19,12 +19,16 @@ application {
   mainClass.set("dev.responsive.examples.simpleapp.Main")
 }
 
+repositories {
+  mavenLocal()
+}
+
 dependencies {
   // todo: how to set the version here?
   implementation(project(":kafka-client"))
   implementation("com.google.guava:guava:32.1.1-jre")
-  implementation("org.apache.kafka:kafka-clients:3.4.0")
-  implementation("org.apache.kafka:kafka-streams:3.4.0")
+  implementation(libs.kafka.clients)
+  implementation(libs.kafka.streams)
   implementation("io.opentelemetry.javaagent:opentelemetry-javaagent:1.25.0")
   implementation("org.apache.logging.log4j:log4j-slf4j-impl:2.20.0")
   implementation("org.apache.commons:commons-text:1.10.0")
kafka-client/build.gradle.kts (4 additions, 0 deletions)

@@ -33,6 +33,10 @@ plugins {
   id("java")
 }
 
+repositories {
+  mavenLocal()
+}
+
 /*********** Generated Resources ***********/

 val gitCommitId: String by lazy {
Task assignor config override, updated for the 4.0 upgrade:

@@ -284,16 +284,16 @@ private static Properties propsWithOverrides(
     return propsWithOverrides;
   }

-  final Object o = configs.originals().get(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS);
+  final Object o = configs.originals().get(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
   if (o == null) {
     propsWithOverrides.put(
-        InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS,
+        StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG,
         TASK_ASSIGNOR_CLASS_OVERRIDE
     );
   } else if (!TASK_ASSIGNOR_CLASS_OVERRIDE.equals(o.toString())) {
     final String errorMsg = String.format(
         "Invalid Streams configuration value for '%s': got %s, expected '%s'",
-        InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS,
+        StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG,
         o,
         TASK_ASSIGNOR_CLASS_OVERRIDE
     );

[Author comment] Not part of async -- related to the 4.0 upgrade.
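For context: the new key is the public task assignor config (introduced by KIP-924), which replaces the internal config key used before the upgrade. A minimal standalone sketch of setting it directly, with a hypothetical assignor class name:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class AssignorConfigExample {
  public static void main(final String[] args) {
    final Properties props = new Properties();
    // Public config key that replaces InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS:
    props.put(
        StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG,
        "com.example.MyTaskAssignor" // hypothetical assignor FQCN
    );
  }
}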
AsyncFixedKeyProcessorSupplier.java:

@@ -23,6 +23,7 @@
 import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier;
 import org.apache.kafka.streams.state.StoreBuilder;

@@ -34,10 +35,10 @@
  * documentation on the async processing framework.
  */
 public class AsyncFixedKeyProcessorSupplier<KIn, VIn, VOut>
-    implements FixedKeyProcessorSupplier<KIn, VIn, VOut> {
+    implements WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> {

   private final FixedKeyProcessorSupplier<KIn, VIn, VOut> userProcessorSupplier;
-  private final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> asyncStoreBuilders;
+  private Map<String, AbstractAsyncStoreBuilder<?>> asyncStoreBuilders = null;

   /**
    * Create an AsyncProcessorSupplier that wraps a custom {@link ProcessorSupplier}

@@ -52,24 +53,31 @@ public class AsyncFixedKeyProcessorSupplier<KIn, VIn, VOut>
   public static <KIn, VIn, VOut> AsyncFixedKeyProcessorSupplier<KIn, VIn, VOut> createAsyncProcessorSupplier(
       final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier
   ) {
-    return new AsyncFixedKeyProcessorSupplier<>(processorSupplier, processorSupplier.stores());
+    return new AsyncFixedKeyProcessorSupplier<>(processorSupplier);
   }

   private AsyncFixedKeyProcessorSupplier(
-      final FixedKeyProcessorSupplier<KIn, VIn, VOut> userProcessorSupplier,
-      final Set<StoreBuilder<?>> userStoreBuilders
+      final FixedKeyProcessorSupplier<KIn, VIn, VOut> userProcessorSupplier
   ) {
     this.userProcessorSupplier = userProcessorSupplier;
-    this.asyncStoreBuilders = initializeAsyncBuilders(userStoreBuilders);
   }

   @Override
   public AsyncProcessor<KIn, VIn, KIn, VOut> get() {
+    maybeInitializeAsyncStoreBuilders();
+
     return createAsyncFixedKeyProcessor(userProcessorSupplier.get(), asyncStoreBuilders);
   }

   @Override
   public Set<StoreBuilder<?>> stores() {
+    maybeInitializeAsyncStoreBuilders();
     return new HashSet<>(asyncStoreBuilders.values());
   }
+
+  private void maybeInitializeAsyncStoreBuilders() {
+    if (asyncStoreBuilders == null) {
+      asyncStoreBuilders = initializeAsyncBuilders(userProcessorSupplier.stores());
+    }
+  }
 }
AsyncProcessorSupplier.java:

@@ -28,6 +28,7 @@
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;

@@ -114,10 +115,11 @@
  * {@link Processor} and requires at least one state store be connected.
  */
 public final class AsyncProcessorSupplier<KIn, VIn, KOut, VOut>
-    implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
+    implements WrappedProcessorSupplier<KIn, VIn, KOut, VOut> {

   private final ProcessorSupplier<KIn, VIn, KOut, VOut> userProcessorSupplier;
-  private final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> asyncStoreBuilders;
+
+  private Map<String, AbstractAsyncStoreBuilder<?>> asyncStoreBuilders = null;

   /**
    * Create an AsyncProcessorSupplier that wraps a custom {@link ProcessorSupplier}

@@ -132,32 +134,30 @@ public final class AsyncProcessorSupplier<KIn, VIn, KOut, VOut>
   public static <KIn, VIn, KOut, VOut> AsyncProcessorSupplier<KIn, VIn, KOut, VOut> createAsyncProcessorSupplier(
       final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
   ) {
-    return new AsyncProcessorSupplier<>(processorSupplier, processorSupplier.stores());
+    return new AsyncProcessorSupplier<>(processorSupplier);
   }

   private AsyncProcessorSupplier(
-      final ProcessorSupplier<KIn, VIn, KOut, VOut> userProcessorSupplier,
-      final Set<StoreBuilder<?>> userStoreBuilders
+      final ProcessorSupplier<KIn, VIn, KOut, VOut> userProcessorSupplier
   ) {
-    if (userStoreBuilders == null || userStoreBuilders.isEmpty()) {
-      throw new UnsupportedOperationException(
-          "Async processing currently requires at least one state store be "
-          + "connected to the async processor, and that stores be connected "
-          + "by implementing the #stores method in your processor supplier");
-    }
-
     this.userProcessorSupplier = userProcessorSupplier;
-    this.asyncStoreBuilders = initializeAsyncBuilders(userStoreBuilders);
   }

[Author comment, on the removed userStoreBuilders check] Not related to KIP-1112 -- this check was supposed to be deleted already but I must have missed it before (the corresponding check in FixedKeyProcessorSupplier was correctly removed at that time).

   @Override
   public AsyncProcessor<KIn, VIn, KOut, VOut> get() {
+    maybeInitializeAsyncStoreBuilders();
     return createAsyncProcessor(userProcessorSupplier.get(), asyncStoreBuilders);
   }

   @Override
   public Set<StoreBuilder<?>> stores() {
+    maybeInitializeAsyncStoreBuilders();
     return new HashSet<>(asyncStoreBuilders.values());
   }
+
+  private void maybeInitializeAsyncStoreBuilders() {
+    if (asyncStoreBuilders == null) {
+      asyncStoreBuilders = initializeAsyncBuilders(userProcessorSupplier.stores());
+    }
+  }
 }

[Author comment, on maybeInitializeAsyncStoreBuilders] We need to delay the initialization of the async store builders because the call to the inner processor's #stores method might return different things at different points in the build process (for example, if a downstream operator forces materialization, the #stores will return null/empty until that downstream operator is processed).
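The delayed initialization above is plain memoization: compute on first access from either get() or stores(), then reuse the cached result. A generic sketch of the same idiom (not PR code); like the diff, it omits synchronization and so assumes single-threaded topology building:

import java.util.function.Supplier;

// Memoizing supplier: the delegate runs the first time get() is called,
// after which the cached value is returned. This mirrors the
// maybeInitializeAsyncStoreBuilders() pattern above.
final class Memoized<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private T value = null;

  Memoized(final Supplier<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public T get() {
    if (value == null) {
      value = delegate.get(); // deferred until first use
    }
    return value;
  }
}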
AsyncProcessorWrapper.java (new file, +50 lines):

/*
 * Copyright 2024 Responsive Computing, Inc.
 *
 * This source code is licensed under the Responsive Business Source License Agreement v1.0
 * available at:
 *
 * https://www.responsive.dev/legal/responsive-bsl-10
 *
 * This software requires a valid Commercial License Key for production use. Trial and commercial
 * licenses can be obtained at https://www.responsive.dev
 */

package dev.responsive.kafka.api.async;

import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorWrapper;
import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier;

// TODO(sophie): only wrap when stores are Responsive (rather than eg rocksdb or in-memory)
public class AsyncProcessorWrapper implements ProcessorWrapper {

  @Override
  public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(
      final String processorName,
      final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
  ) {
    final var stores = processorSupplier.stores();
    if (stores != null && !stores.isEmpty()) {
      return AsyncProcessorSupplier.createAsyncProcessorSupplier(processorSupplier);
    } else {
      return ProcessorWrapper.asWrapped(processorSupplier);
    }
  }

  @Override
  public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(
      final String processorName,
      final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier
  ) {
    final var stores = processorSupplier.stores();
    if (stores != null && !stores.isEmpty()) {
      return AsyncFixedKeyProcessorSupplier.createAsyncProcessorSupplier(processorSupplier);
    } else {
      return ProcessorWrapper.asWrappedFixedKey(processorSupplier);
    }
  }
}
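For orientation, a sketch of how a wrapper like this is plugged in under KIP-1112 (Kafka Streams 4.0). The application id and bootstrap address below are placeholders; the key point is that the processor.wrapper.class config must reach the StreamsBuilder constructor through TopologyConfig, since wrapping happens at topology build time:

import java.util.Properties;

import dev.responsive.kafka.api.async.AsyncProcessorWrapper;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;

public class AsyncWrapperExample {
  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "async-example"); // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

    // KIP-1112: every processor added to this builder is passed through
    // AsyncProcessorWrapper when the topology is built
    props.put(
        StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG,
        AsyncProcessorWrapper.class.getName()
    );

    // the wrapper config only takes effect when supplied to the StreamsBuilder
    // constructor via TopologyConfig, not just to new KafkaStreams(...)
    final StreamsBuilder builder =
        new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));

    // ... define the topology; stateful processors get wrapped automatically
  }
}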
AsyncProcessor.java:

@@ -71,7 +71,7 @@ public class AsyncProcessor<KIn, VIn, KOut, VOut>
   private final Processor<KIn, VIn, KOut, VOut> userProcessor;
   private final FixedKeyProcessor<KIn, VIn, VOut> userFixedKeyProcessor;

-  private final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders;
+  private final Map<String, AbstractAsyncStoreBuilder<?>> connectedStoreBuilders;

   // Tracks all pending events, ie from moment of creation to end of finalization/transition
   // to "DONE" state. Used to make sure all events are flushed during a commit.

@@ -115,14 +115,14 @@ public class AsyncProcessor<KIn, VIn, KOut, VOut>
   public static <KIn, VIn, KOut, VOut> AsyncProcessor<KIn, VIn, KOut, VOut> createAsyncProcessor(
       final Processor<KIn, VIn, KOut, VOut> userProcessor,
-      final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders
+      final Map<String, AbstractAsyncStoreBuilder<?>> connectedStoreBuilders
   ) {
     return new AsyncProcessor<>(userProcessor, null, connectedStoreBuilders);
   }

   public static <KIn, VIn, VOut> AsyncProcessor<KIn, VIn, KIn, VOut> createAsyncFixedKeyProcessor(
       final FixedKeyProcessor<KIn, VIn, VOut> userProcessor,
-      final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders
+      final Map<String, AbstractAsyncStoreBuilder<?>> connectedStoreBuilders
   ) {
     return new AsyncProcessor<>(null, userProcessor, connectedStoreBuilders);
   }

@@ -135,7 +135,7 @@ public static <KIn, VIn, VOut> AsyncProcessor<KIn, VIn, KIn, VOut> createAsyncFixedKeyProcessor(
   private AsyncProcessor(
       final Processor<KIn, VIn, KOut, VOut> userProcessor,
       final FixedKeyProcessor<KIn, VIn, VOut> userFixedKeyProcessor,
-      final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders
+      final Map<String, AbstractAsyncStoreBuilder<?>> connectedStoreBuilders
   ) {
     this.userProcessor = userProcessor;
     this.userFixedKeyProcessor = userFixedKeyProcessor;

@@ -377,10 +377,10 @@ public void close() {
   private static void registerFlushListenerForStoreBuilders(
       final String streamThreadName,
       final int partition,
-      final Collection<AbstractAsyncStoreBuilder<?, ?, ?>> asyncStoreBuilders,
+      final Collection<AbstractAsyncStoreBuilder<?>> asyncStoreBuilders,
       final AsyncFlushListener flushPendingEvents
   ) {
-    for (final AbstractAsyncStoreBuilder<?, ?, ?> builder : asyncStoreBuilders) {
+    for (final AbstractAsyncStoreBuilder<?> builder : asyncStoreBuilders) {
       builder.registerFlushListenerWithAsyncStore(streamThreadName, partition, flushPendingEvents);
     }
   }

@@ -738,7 +738,7 @@ private boolean isCleared() {
   private void verifyConnectedStateStores(
       final Map<String, AsyncKeyValueStore<?, ?>> accessedStores,
-      final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStores
+      final Map<String, AbstractAsyncStoreBuilder<?>> connectedStores
   ) {
     if (!accessedStores.keySet().equals(connectedStores.keySet())) {
       log.error(