diff --git a/kafka-client-bootstrap/build.gradle.kts b/kafka-client-bootstrap/build.gradle.kts index 4feb388e0..d0099b315 100644 --- a/kafka-client-bootstrap/build.gradle.kts +++ b/kafka-client-bootstrap/build.gradle.kts @@ -15,6 +15,10 @@ plugins { id("responsive.docker") } +repositories { + mavenLocal() +} + application { mainClass.set("dev.responsive.kafka.bootstrap.main.Main") } diff --git a/kafka-client-examples/e2e-test/build.gradle.kts b/kafka-client-examples/e2e-test/build.gradle.kts index a554a07da..b0bb1b1b7 100644 --- a/kafka-client-examples/e2e-test/build.gradle.kts +++ b/kafka-client-examples/e2e-test/build.gradle.kts @@ -15,6 +15,10 @@ plugins { id("responsive.docker") } +repositories { + mavenLocal() +} + application { mainClass.set("dev.responsive.examples.e2etest.Main") } diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/tests/KeyBatchExample.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/tests/KeyBatchExample.java index af9b96b56..6c9a15182 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/tests/KeyBatchExample.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/tests/KeyBatchExample.java @@ -32,9 +32,10 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; @@ -70,7 +71,7 @@ protected Topology buildTopology() { } builder.stream(ORDERS, Consumed.with(Serdes.String(), RegressionSchema.orderSerde())) - .transform(BatchTransformer::new, "grouped-orders-store") + .process(BatchTransformer::new, "grouped-orders-store") .peek((k, v) -> { if (responsive) { final var random = Math.abs(randomGenerator.nextLong() % 10000); @@ -84,14 +85,13 @@ protected Topology buildTopology() { return builder.build(); } - private static class BatchTransformer - implements Transformer> { + private static class BatchTransformer implements Processor { - private ProcessorContext context; + private ProcessorContext context; private KeyValueStore store; @Override - public void init(final ProcessorContext context) { + public void init(final ProcessorContext context) { this.context = context; this.store = context.getStateStore("grouped-orders-store"); this.context.schedule( @@ -102,15 +102,18 @@ public void init(final ProcessorContext context) { } @Override - public KeyValue transform(final String key, final Order value) { - final long ts = context.timestamp(); + public void process(final Record record) { + final long ts = record.timestamp(); // first add the order to the list of orders that are stored - store.put(storedKey(key, ts), new StoredOrder(Optional.of(value), Optional.empty())); + store.put( + storedKey(record.key(), ts), + new StoredOrder(Optional.of(record.value()), Optional.empty()) + ); // next, we need to update the tracked metadata row to // check whether the value ought to be emitted - final String mKey = metaKey(key); + final String mKey = metaKey(record.key()); final StoredOrder.Meta meta = Optional.ofNullable(store.get(mKey)) .orElse(new StoredOrder(Optional.empty(), Optional.of(new StoredOrder.Meta(ts, 0, 0)))) .meta() @@ -122,17 +125,15 @@ public KeyValue transform(final String key, final Order va final StoredOrder.Meta newMeta = new StoredOrder.Meta( ts, meta.count() + 1, - meta.size() + (long) value.amount() + meta.size() + (long) record.value().amount() ); if (shouldFlush(newMeta, ts)) { - doFlush(key); + doFlush(record.key()); store.delete(mKey); } else { store.put(mKey, new StoredOrder(Optional.empty(), Optional.of(newMeta))); } - - return null; } private void flushExpired(long ts) { @@ -174,7 +175,7 @@ private void doFlush(final String key) { "Got stored order with no order! %s".formatted(value)))); } - context.forward(key, result); + context.forward(new Record<>(key, result, 0L)); } } diff --git a/kafka-client-examples/simple-example/build.gradle.kts b/kafka-client-examples/simple-example/build.gradle.kts index 2c07a81e8..112f140b3 100644 --- a/kafka-client-examples/simple-example/build.gradle.kts +++ b/kafka-client-examples/simple-example/build.gradle.kts @@ -19,12 +19,16 @@ application { mainClass.set("dev.responsive.examples.simpleapp.Main") } +repositories { + mavenLocal() +} + dependencies { // todo: how to set the version here? implementation(project(":kafka-client")) implementation("com.google.guava:guava:32.1.1-jre") - implementation("org.apache.kafka:kafka-clients:3.4.0") - implementation("org.apache.kafka:kafka-streams:3.4.0") + implementation(libs.kafka.clients) + implementation(libs.kafka.streams) implementation("io.opentelemetry.javaagent:opentelemetry-javaagent:1.25.0") implementation("org.apache.logging.log4j:log4j-slf4j-impl:2.20.0") implementation("org.apache.commons:commons-text:1.10.0") diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts index e61e2b56f..d88cb2102 100644 --- a/kafka-client/build.gradle.kts +++ b/kafka-client/build.gradle.kts @@ -33,6 +33,10 @@ plugins { id("java") } +repositories { + mavenLocal() +} + /*********** Generated Resources ***********/ val gitCommitId: String by lazy { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 9377d4702..4a4da205b 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -284,16 +284,16 @@ private static Properties propsWithOverrides( return propsWithOverrides; } - final Object o = configs.originals().get(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS); + final Object o = configs.originals().get(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG); if (o == null) { propsWithOverrides.put( - InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, + StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, TASK_ASSIGNOR_CLASS_OVERRIDE ); } else if (!TASK_ASSIGNOR_CLASS_OVERRIDE.equals(o.toString())) { final String errorMsg = String.format( "Invalid Streams configuration value for '%s': got %s, expected '%s'", - InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, + StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, o, TASK_ASSIGNOR_CLASS_OVERRIDE ); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java index 130f55737..4c9513ceb 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncFixedKeyProcessorSupplier.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier; import org.apache.kafka.streams.state.StoreBuilder; /** @@ -34,10 +35,10 @@ * documentation on the async processing framework. */ public class AsyncFixedKeyProcessorSupplier - implements FixedKeyProcessorSupplier { + implements WrappedFixedKeyProcessorSupplier { private final FixedKeyProcessorSupplier userProcessorSupplier; - private final Map> asyncStoreBuilders; + private Map> asyncStoreBuilders = null; /** * Create an AsyncProcessorSupplier that wraps a custom {@link ProcessorSupplier} @@ -52,24 +53,31 @@ public class AsyncFixedKeyProcessorSupplier public static AsyncFixedKeyProcessorSupplier createAsyncProcessorSupplier( final FixedKeyProcessorSupplier processorSupplier ) { - return new AsyncFixedKeyProcessorSupplier<>(processorSupplier, processorSupplier.stores()); + return new AsyncFixedKeyProcessorSupplier<>(processorSupplier); } private AsyncFixedKeyProcessorSupplier( - final FixedKeyProcessorSupplier userProcessorSupplier, - final Set> userStoreBuilders + final FixedKeyProcessorSupplier userProcessorSupplier ) { this.userProcessorSupplier = userProcessorSupplier; - this.asyncStoreBuilders = initializeAsyncBuilders(userStoreBuilders); } @Override public AsyncProcessor get() { + maybeInitializeAsyncStoreBuilders(); + return createAsyncFixedKeyProcessor(userProcessorSupplier.get(), asyncStoreBuilders); } @Override public Set> stores() { + maybeInitializeAsyncStoreBuilders(); return new HashSet<>(asyncStoreBuilders.values()); } + + private void maybeInitializeAsyncStoreBuilders() { + if (asyncStoreBuilders == null) { + asyncStoreBuilders = initializeAsyncBuilders(userProcessorSupplier.stores()); + } + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java index 33ce0cb1d..e1647f539 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorSupplier.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -114,10 +115,11 @@ * {@link Processor} and requires at least one state store be connected. */ public final class AsyncProcessorSupplier - implements ProcessorSupplier { + implements WrappedProcessorSupplier { private final ProcessorSupplier userProcessorSupplier; - private final Map> asyncStoreBuilders; + + private Map> asyncStoreBuilders = null; /** * Create an AsyncProcessorSupplier that wraps a custom {@link ProcessorSupplier} @@ -132,32 +134,30 @@ public final class AsyncProcessorSupplier public static AsyncProcessorSupplier createAsyncProcessorSupplier( final ProcessorSupplier processorSupplier ) { - return new AsyncProcessorSupplier<>(processorSupplier, processorSupplier.stores()); + return new AsyncProcessorSupplier<>(processorSupplier); } private AsyncProcessorSupplier( - final ProcessorSupplier userProcessorSupplier, - final Set> userStoreBuilders + final ProcessorSupplier userProcessorSupplier ) { - if (userStoreBuilders == null || userStoreBuilders.isEmpty()) { - throw new UnsupportedOperationException( - "Async processing currently requires at least one state store be " - + "connected to the async processor, and that stores be connected " - + "by implementing the #stores method in your processor supplier"); - } - this.userProcessorSupplier = userProcessorSupplier; - this.asyncStoreBuilders = initializeAsyncBuilders(userStoreBuilders); } @Override public AsyncProcessor get() { + maybeInitializeAsyncStoreBuilders(); return createAsyncProcessor(userProcessorSupplier.get(), asyncStoreBuilders); } @Override public Set> stores() { + maybeInitializeAsyncStoreBuilders(); return new HashSet<>(asyncStoreBuilders.values()); } + private void maybeInitializeAsyncStoreBuilders() { + if (asyncStoreBuilders == null) { + asyncStoreBuilders = initializeAsyncBuilders(userProcessorSupplier.stores()); + } + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorWrapper.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorWrapper.java new file mode 100644 index 000000000..50d899ba9 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/AsyncProcessorWrapper.java @@ -0,0 +1,50 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.api.async; + +import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.ProcessorWrapper; +import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier; +import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier; + +// TODO(sophie): only wrap when stores are Responsive (rather than eg rocksdb or in-memory) +public class AsyncProcessorWrapper implements ProcessorWrapper { + + @Override + public WrappedProcessorSupplier wrapProcessorSupplier( + final String processorName, + final ProcessorSupplier processorSupplier + ) { + final var stores = processorSupplier.stores(); + if (stores != null && !stores.isEmpty()) { + return AsyncProcessorSupplier.createAsyncProcessorSupplier(processorSupplier); + } else { + return ProcessorWrapper.asWrapped(processorSupplier); + } + } + + @Override + public WrappedFixedKeyProcessorSupplier wrapFixedKeyProcessorSupplier( + final String processorName, + final FixedKeyProcessorSupplier processorSupplier + ) { + final var stores = processorSupplier.stores(); + if (stores != null && !stores.isEmpty()) { + return AsyncFixedKeyProcessorSupplier.createAsyncProcessorSupplier(processorSupplier); + } else { + return ProcessorWrapper.asWrappedFixedKey(processorSupplier); + } + + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java index 81e3ba163..0b47e389a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java @@ -71,7 +71,7 @@ public class AsyncProcessor private final Processor userProcessor; private final FixedKeyProcessor userFixedKeyProcessor; - private final Map> connectedStoreBuilders; + private final Map> connectedStoreBuilders; // Tracks all pending events, ie from moment of creation to end of finalization/transition // to "DONE" state. Used to make sure all events are flushed during a commit. @@ -115,14 +115,14 @@ public class AsyncProcessor public static AsyncProcessor createAsyncProcessor( final Processor userProcessor, - final Map> connectedStoreBuilders + final Map> connectedStoreBuilders ) { return new AsyncProcessor<>(userProcessor, null, connectedStoreBuilders); } public static AsyncProcessor createAsyncFixedKeyProcessor( final FixedKeyProcessor userProcessor, - final Map> connectedStoreBuilders + final Map> connectedStoreBuilders ) { return new AsyncProcessor<>(null, userProcessor, connectedStoreBuilders); } @@ -135,7 +135,7 @@ public static AsyncProcessor createAsyncFi private AsyncProcessor( final Processor userProcessor, final FixedKeyProcessor userFixedKeyProcessor, - final Map> connectedStoreBuilders + final Map> connectedStoreBuilders ) { this.userProcessor = userProcessor; this.userFixedKeyProcessor = userFixedKeyProcessor; @@ -377,10 +377,10 @@ public void close() { private static void registerFlushListenerForStoreBuilders( final String streamThreadName, final int partition, - final Collection> asyncStoreBuilders, + final Collection> asyncStoreBuilders, final AsyncFlushListener flushPendingEvents ) { - for (final AbstractAsyncStoreBuilder builder : asyncStoreBuilders) { + for (final AbstractAsyncStoreBuilder builder : asyncStoreBuilders) { builder.registerFlushListenerWithAsyncStore(streamThreadName, partition, flushPendingEvents); } } @@ -738,7 +738,7 @@ private boolean isCleared() { */ private void verifyConnectedStateStores( final Map> accessedStores, - final Map> connectedStores + final Map> connectedStores ) { if (!accessedStores.keySet().equals(connectedStores.keySet())) { log.error( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java index 3ae987e5c..4148d2ab3 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java @@ -15,8 +15,6 @@ import static dev.responsive.kafka.api.async.internals.AsyncThreadPool.ASYNC_THREAD_NAME; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; -import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder; -import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.StoreType; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -24,8 +22,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.internals.AsyncKeyValueStoreBuilder; -import org.apache.kafka.streams.state.internals.AsyncTimestampedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.DelayedAsyncStoreBuilder; public class AsyncUtils { @@ -60,43 +57,22 @@ public static boolean isStreamThreadOrAsyncThread( || isStreamThread(threadName, streamThreadName); } - public static Map> initializeAsyncBuilders( + public static Map> initializeAsyncBuilders( final Set> userConnectedStores ) { if (userConnectedStores == null || userConnectedStores.isEmpty()) { return Collections.emptyMap(); } - final Map> asyncStoreBuilders = new HashMap<>(); + final Map> asyncStoreBuilders = new HashMap<>(); for (final StoreBuilder builder : userConnectedStores) { final String storeName = builder.name(); - if (builder instanceof ResponsiveStoreBuilder) { - final ResponsiveStoreBuilder responsiveBuilder = - (ResponsiveStoreBuilder) builder; - final StoreType storeType = responsiveBuilder.storeType(); + asyncStoreBuilders.put( + storeName, + new DelayedAsyncStoreBuilder<>(builder) + ); - final AbstractAsyncStoreBuilder storeBuilder; - if (storeType.equals(StoreType.TIMESTAMPED_KEY_VALUE)) { - storeBuilder = new AsyncTimestampedKeyValueStoreBuilder<>(responsiveBuilder); - } else if (storeType.equals(StoreType.KEY_VALUE)) { - storeBuilder = new AsyncKeyValueStoreBuilder<>(responsiveBuilder); - } else { - throw new UnsupportedOperationException( - "Only key-value stores are supported by async processors at this time"); - } - - asyncStoreBuilders.put( - storeName, - storeBuilder - ); - - } else { - throw new IllegalStateException(String.format( - "Detected the StoreBuilder for %s was not created via the ResponsiveStores factory, " - + "please ensure that all store builders and suppliers are provided through the " - + "appropriate API from ResponsiveStores", storeName)); - } } return asyncStoreBuilders; } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/DelegatingProcessorContext.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/DelegatingProcessorContext.java index 1f4eb1215..5da261075 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/DelegatingProcessorContext.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/DelegatingProcessorContext.java @@ -259,8 +259,8 @@ public void setProcessorMetadata(final ProcessorMetadata metadata) { } @Override - public ProcessorMetadata getProcessorMetadata() { - return delegate().getProcessorMetadata(); + public ProcessorMetadata processorMetadata() { + return delegate().processorMetadata(); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AbstractAsyncFlushingStore.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AbstractAsyncFlushingStore.java new file mode 100644 index 000000000..f044ff4f9 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AbstractAsyncFlushingStore.java @@ -0,0 +1,131 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.api.async.internals.stores; + +import dev.responsive.kafka.api.async.internals.stores.StreamThreadFlushListeners.AsyncFlushListener; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.state.internals.CacheFlushListener; +import org.apache.kafka.streams.state.internals.CachedStateStore; +import org.apache.kafka.streams.state.internals.WrappedStateStore; +import org.slf4j.Logger; + +public abstract class AbstractAsyncFlushingStore + extends WrappedStateStore + implements CachedStateStore { + + private final Logger log; + + private final StreamThreadFlushListeners flushListeners; + + // Effectively final but can't be initialized until the store's #init + private int partition; + + // Effectively final but can't be initialized until the corresponding processor's #init + private AsyncFlushListener flushAsyncProcessor; + + public AbstractAsyncFlushingStore( + final S inner, + final StreamThreadFlushListeners flushListeners + ) { + super(inner); + this.flushListeners = flushListeners; + this.log = new LogContext( + String.format("stream-thread [%s] %s: ", + flushListeners.streamThreadName(), + inner.name()) + ).logger(AbstractAsyncFlushingStore.class); + } + + @Override + public void init(final StateStoreContext context, + final StateStore root) { + this.partition = context.taskId().partition(); + + flushListeners.registerStoreConnectorForPartition( + partition, + flushListener -> flushAsyncProcessor = flushListener + ); + + try { + super.init(context, root); + } catch (final RuntimeException e) { + log.error("failed to initialize the wrapped store. Deregistering the store connector as " + + "its likely that this store was not registered with streams and close will " + + "not be called"); + flushListeners.unregisterListenerForPartition(partition); + throw e; + } + } + + @Override + public void flushCache() { + if (flushAsyncProcessor != null) { + // We wait on/clear out the async processor buffers first so that any pending async events + // that write to the state store are guaranteed to be inserted in the cache before we + // proceed with flushing the cache. This is the reason we hook into the commit to block + // on pending async results via this #flushCache API, and not, for example, the consumer's + // commit or producer's commitTxn -- because #flushCache is the first call in a commit, and + // if we waited until #commit/#commitTxn we would have to flush the cache a second time in + // case any pending async events issued new writes to the state store/cache + flushAsyncProcessor.flushBuffers(); + } else { + log.warn("A flush was triggered on the async state store but the flush listener was " + + "not yet initialized. This can happen when a task is closed before " + + "it can be initialized."); + } + + super.flushCache(); + } + + /** + * Used by Streams to clear the cache (without flushing) when a task is transitioning + * from active to standby and the state stores are being recycled. Standby tasks + * have no caching layer, so Streams simply clears the cache here in case the + * task is re-recycled back into an active task and the caching layer is revived. + */ + @Override + public void clearCache() { + // this is technically a Responsive-specific constraint, and should be relaxed if we open + // up the async framework to general use cases + throw new IllegalStateException( + "Attempted to clear cache of async store, this implies the task is " + + "transitioning to standby which should not happen"); + } + + @Override + public void close() { + flushListeners.unregisterListenerForPartition(partition); + super.close(); + } + + /** + * NOTE: this is NOT the same as the AsyncFlushListener, which is used to flush the entire + * async processor when the cache is flushed as part of a Streams commit. + * This API is used by Streams, internally, to register a listener for the records that + * are evicted from the Streams cache and need to be forwarded downstream through the + * topology. This method would be better named #setCacheEvictionListener since evictions + * can happen when a new record is added that pushes the cache beyond its maximum size, + * and not just when the cache is flushed. Unfortunately, it's a Streams API that we're + * being forced to implement here, not something we can control. + */ + @Override + public boolean setFlushListener( + final CacheFlushListener listener, + final boolean sendOldValues + ) { + return super.setFlushListener(listener, sendOldValues); + } + +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AbstractAsyncStoreBuilder.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AbstractAsyncStoreBuilder.java index 5a0e1951c..21604ff06 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AbstractAsyncStoreBuilder.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AbstractAsyncStoreBuilder.java @@ -17,25 +17,21 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.internals.AsyncKeyValueStoreBuilder; +import org.apache.kafka.streams.state.WindowStore; /** * A copy of the {@link org.apache.kafka.streams.state.internals.AbstractStoreBuilder} class with * a few additional methods related to async processing, such as de/registering flush listeners */ -public abstract class AbstractAsyncStoreBuilder +public abstract class AbstractAsyncStoreBuilder implements StoreBuilder { protected final String name; - protected final Serde keySerde; - protected final Serde valueSerde; - protected final Time time; protected final Map logConfig = new HashMap<>(); private boolean cachingEnabled = false; @@ -48,26 +44,19 @@ public abstract class AbstractAsyncStoreBuilder new ConcurrentHashMap<>(); public AbstractAsyncStoreBuilder( - final String name, - final Serde keySerde, - final Serde valueSerde, - final Time time + final String name ) { Objects.requireNonNull(name, "name cannot be null"); - Objects.requireNonNull(time, "time cannot be null"); this.name = name; - this.keySerde = keySerde; - this.valueSerde = valueSerde; - this.time = time; } /** - * Similar to the #maybeWrapCaching or #maybeWrapLogging methods in the StoreBuilder classes - * (eg {@link AsyncKeyValueStoreBuilder}, this method adds an additional layer to the store + * Similar to the #maybeWrapCaching or #maybeWrapLogging methods in the + * {@link org.apache.kafka.streams.state.internals.DelayedAsyncStoreBuilder}, + * this method adds an additional layer to the store * hierarchy by wrapping it in a {@link AsyncFlushingKeyValueStore}. *

* This specific method is for use with KV stores, whether plain or timestamped. - * TODO: add equivalent for window/session stores */ protected KeyValueStore wrapAsyncFlushingKV( final KeyValueStore inner @@ -78,6 +67,40 @@ protected KeyValueStore wrapAsyncFlushingKV( return new AsyncFlushingKeyValueStore(inner, threadFlushListeners); } + /** + * Similar to the #maybeWrapCaching or #maybeWrapLogging methods in the + * {@link org.apache.kafka.streams.state.internals.DelayedAsyncStoreBuilder}, + * this method adds an additional layer to the store + * hierarchy by wrapping it in a {@link AsyncFlushingWindowStore}. + *

+ * This specific method is for use with window stores, whether plain or timestamped. + */ + protected WindowStore wrapAsyncFlushingWindow( + final WindowStore inner + ) { + final StreamThreadFlushListeners threadFlushListeners = + getOrCreateFlushListeners(Thread.currentThread().getName()); + + return new AsyncFlushingWindowStore(inner, threadFlushListeners); + } + + /** + * Similar to the #maybeWrapCaching or #maybeWrapLogging methods in the + * {@link org.apache.kafka.streams.state.internals.DelayedAsyncStoreBuilder}, + * this method adds an additional layer to the store + * hierarchy by wrapping it in a {@link AsyncFlushingSessionStore}. + *

+ * This specific method is for use with session stores, whether plain or timestamped. + */ + protected SessionStore wrapAsyncFlushingSession( + final SessionStore inner + ) { + final StreamThreadFlushListeners threadFlushListeners = + getOrCreateFlushListeners(Thread.currentThread().getName()); + + return new AsyncFlushingSessionStore(inner, threadFlushListeners); + } + /** * Register a flush listener and the partition it's associated with for the * given StreamThread. diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncFlushingKeyValueStore.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncFlushingKeyValueStore.java index 51066da11..aefdfc5c1 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncFlushingKeyValueStore.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncFlushingKeyValueStore.java @@ -12,20 +12,13 @@ package dev.responsive.kafka.api.async.internals.stores; -import dev.responsive.kafka.api.async.internals.stores.StreamThreadFlushListeners.AsyncFlushListener; import java.util.List; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.internals.CacheFlushListener; import org.apache.kafka.streams.state.internals.CachedStateStore; import org.apache.kafka.streams.state.internals.CachingKeyValueStore; -import org.apache.kafka.streams.state.internals.WrappedStateStore; -import org.slf4j.Logger; /** * Simple wrapper class around Kafka Streams' {@link CachingKeyValueStore} class that @@ -38,92 +31,14 @@ * package -- so we can call the package-private constructor of the super class) */ public class AsyncFlushingKeyValueStore - extends WrappedStateStore, byte[], byte[]> - implements KeyValueStore, CachedStateStore { - - private final Logger log; - - private final StreamThreadFlushListeners flushListeners; - - // Effectively final but can't be initialized until the store's #init - private int partition; - - // Effectively final but can't be initialized until the corresponding processor's #init - private AsyncFlushListener flushAsyncProcessor; + extends AbstractAsyncFlushingStore> + implements KeyValueStore { public AsyncFlushingKeyValueStore( final KeyValueStore inner, final StreamThreadFlushListeners flushListeners ) { - super(inner); - this.flushListeners = flushListeners; - this.log = new LogContext( - String.format("stream-thread [%s] %s: ", - flushListeners.streamThreadName(), - inner.name()) - ).logger(AsyncFlushingKeyValueStore.class); - } - - @Override - public void init(final StateStoreContext context, - final StateStore root) { - this.partition = context.taskId().partition(); - - flushListeners.registerStoreConnectorForPartition( - partition, - flushListener -> flushAsyncProcessor = flushListener - ); - - try { - super.init(context, root); - } catch (final RuntimeException e) { - log.error("failed to initialize the wrapped store. Deregistering the store connector as " - + "its likely that this store was not registered with streams and close will not be" - + " called"); - flushListeners.unregisterListenerForPartition(partition); - throw e; - } - } - - @Override - public void flushCache() { - if (flushAsyncProcessor != null) { - // We wait on/clear out the async processor buffers first so that any pending async events - // that write to the state store are guaranteed to be inserted in the cache before we - // proceed with flushing the cache. This is the reason we hook into the commit to block - // on pending async results via this #flushCache API, and not, for example, the consumer's - // commit or producer's commitTxn -- because #flushCache is the first call in a commit, and - // if we waited until #commit/#commitTxn we would have to flush the cache a second time in - // case any pending async events issued new writes to the state store/cache - flushAsyncProcessor.flushBuffers(); - } else { - log.warn("A flush was triggered on the async state store but the flush listener was " - + "not yet initialized. This can happen when a task is closed before " - + "it can be initialized."); - } - - super.flushCache(); - } - - /** - * Used by Streams to clear the cache (without flushing) when a task is transitioning - * from active to standby and the state stores are being recycled. Standby tasks - * have no caching layer, so Streams simply clears the cache here in case the - * task is re-recycled back into an active task and the caching layer is revived. - */ - @Override - public void clearCache() { - // this is technically a Responsive-specific constraint, and should be relaxed if we open - // up the async framework to general use cases - throw new IllegalStateException( - "Attempted to clear cache of async store, this implies the task is " - + "transitioning to standby which should not happen"); - } - - @Override - public void close() { - flushListeners.unregisterListenerForPartition(partition); - super.close(); + super(inner, flushListeners); } @Override @@ -166,21 +81,4 @@ public long approximateNumEntries() { return wrapped().approximateNumEntries(); } - /** - * NOTE: this is NOT the same as the AsyncFlushListener, which is used to flush the entire - * async processor when the cache is flushed as part of a Streams commit. - * This API is used by Streams, internally, to register a listener for the records that - * are evicted from the Streams cache and need to be forwarded downstream through the - * topology. This method would be better named #setCacheEvictionListener since evictions - * can happen when a new record is added that pushes the cache beyond its maximum size, - * and not just when the cache is flushed. Unfortunately, it's a Streams API that we're - * being forced to implement here, not something we can control. - */ - @Override - public boolean setFlushListener( - final CacheFlushListener listener, - final boolean sendOldValues - ) { - return super.setFlushListener(listener, sendOldValues); - } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncFlushingSessionStore.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncFlushingSessionStore.java new file mode 100644 index 000000000..5c29ab643 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncFlushingSessionStore.java @@ -0,0 +1,179 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.api.async.internals.stores; + +import java.time.Instant; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.SessionStore; + +@SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder") +public class AsyncFlushingSessionStore + extends AbstractAsyncFlushingStore> + implements SessionStore { + + public AsyncFlushingSessionStore( + final SessionStore inner, + final StreamThreadFlushListeners flushListeners + ) { + super(inner, flushListeners); + } + + @Override + public KeyValueIterator, byte[]> findSessions( + final long earliestSessionEndTime, + final long latestSessionEndTime + ) { + return wrapped().findSessions(earliestSessionEndTime, latestSessionEndTime); + } + + @Override + public KeyValueIterator, byte[]> findSessions( + final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime + ) { + return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime); + } + + @Override + public KeyValueIterator, byte[]> findSessions( + final Bytes key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime + ) { + return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime); + } + + @Override + public KeyValueIterator, byte[]> backwardFindSessions( + final Bytes key, + final long earliestSessionEndTime, + final long latestSessionStartTime + ) { + return wrapped().backwardFindSessions( + key, earliestSessionEndTime, latestSessionStartTime + ); + } + + @Override + public KeyValueIterator, byte[]> backwardFindSessions( + final Bytes key, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime + ) { + return wrapped().backwardFindSessions( + key, earliestSessionEndTime, latestSessionStartTime + ); + } + + @Override + public KeyValueIterator, byte[]> findSessions( + final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime + ) { + return wrapped().findSessions( + keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime + ); + } + + @Override + public KeyValueIterator, byte[]> findSessions( + final Bytes keyFrom, + final Bytes keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime + ) { + return wrapped().findSessions( + keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime + ); + } + + @Override + public KeyValueIterator, byte[]> backwardFindSessions( + final Bytes keyFrom, + final Bytes keyTo, + final long earliestSessionEndTime, + final long latestSessionStartTime + ) { + return wrapped().backwardFindSessions( + keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime + ); + } + + @Override + public KeyValueIterator, byte[]> backwardFindSessions( + final Bytes keyFrom, + final Bytes keyTo, + final Instant earliestSessionEndTime, + final Instant latestSessionStartTime + ) { + return wrapped().backwardFindSessions( + keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime + ); + } + + @Override + public byte[] fetchSession( + final Bytes key, + final long sessionStartTime, + final long sessionEndTime + ) { + return wrapped().fetchSession(key, sessionStartTime, sessionEndTime); + } + + @Override + public byte[] fetchSession( + final Bytes key, + final Instant sessionStartTime, + final Instant sessionEndTime + ) { + return wrapped().fetchSession(key, sessionStartTime, sessionEndTime); + } + + @Override + public void remove(final Windowed windowedKey) { + wrapped().remove(windowedKey); + } + + @Override + public void put(final Windowed windowedKey, final byte[] value) { + wrapped().put(windowedKey, value); + } + + @Override + public KeyValueIterator, byte[]> fetch(final Bytes key) { + return wrapped().fetch(key); + } + + @Override + public KeyValueIterator, byte[]> backwardFetch(final Bytes key) { + return wrapped().backwardFetch(key); + } + + @Override + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { + return wrapped().fetch(keyFrom, keyTo); + } + + @Override + public KeyValueIterator, byte[]> backwardFetch( + final Bytes keyFrom, + final Bytes keyTo + ) { + return wrapped().backwardFetch(keyFrom, keyTo); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncFlushingWindowStore.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncFlushingWindowStore.java new file mode 100644 index 000000000..525497f94 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncFlushingWindowStore.java @@ -0,0 +1,159 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package dev.responsive.kafka.api.async.internals.stores; + +import java.time.Instant; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; + +@SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder") +public class AsyncFlushingWindowStore + extends AbstractAsyncFlushingStore> + implements WindowStore { + + public AsyncFlushingWindowStore( + final WindowStore inner, + final StreamThreadFlushListeners flushListeners + ) { + super(inner, flushListeners); + } + + @Override + public void put(final Bytes key, final byte[] value, final long timestamp) { + wrapped().put(key, value, timestamp); + } + + @Override + public WindowStoreIterator fetch( + final Bytes key, + final long timeFrom, + final long timeTo + ) { + return wrapped().fetch(key, timeFrom, timeTo); + } + + @Override + public WindowStoreIterator fetch( + final Bytes key, + final Instant timeFrom, + final Instant timeTo + ) throws IllegalArgumentException { + return wrapped().fetch(key, timeFrom, timeTo); + } + + @Override + public WindowStoreIterator backwardFetch( + final Bytes key, + final long timeFrom, + final long timeTo + ) { + return wrapped().backwardFetch(key, timeFrom, timeTo); + } + + @Override + public WindowStoreIterator backwardFetch( + final Bytes key, + final Instant timeFrom, + final Instant timeTo + ) throws IllegalArgumentException { + return wrapped().backwardFetch(key, timeFrom, timeTo); + } + + @Override + public KeyValueIterator, byte[]> fetch( + final Bytes keyFrom, + final Bytes keyTo, final long timeFrom, final long timeTo + ) { + return wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo); + } + + @Override + public KeyValueIterator, byte[]> fetch( + final Bytes keyFrom, + final Bytes keyTo, + final Instant timeFrom, + final Instant timeTo + ) throws IllegalArgumentException { + return wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo); + } + + @Override + public KeyValueIterator, byte[]> backwardFetch( + final Bytes keyFrom, + final Bytes keyTo, + final long timeFrom, + final long timeTo + ) { + return wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo); + } + + @Override + public KeyValueIterator, byte[]> backwardFetch( + final Bytes keyFrom, + final Bytes keyTo, + final Instant timeFrom, + final Instant timeTo + ) throws IllegalArgumentException { + return wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo); + } + + @Override + public KeyValueIterator, byte[]> fetchAll( + final long timeFrom, + final long timeTo + ) { + return wrapped().fetchAll(timeFrom, timeTo); + } + + @Override + public KeyValueIterator, byte[]> fetchAll( + final Instant timeFrom, + final Instant timeTo + ) throws IllegalArgumentException { + return wrapped().fetchAll(timeFrom, timeTo); + } + + @Override + public KeyValueIterator, byte[]> backwardFetchAll( + final long timeFrom, + final long timeTo + ) { + return wrapped().backwardFetchAll(timeFrom, timeTo); + } + + @Override + public KeyValueIterator, byte[]> backwardFetchAll( + final Instant timeFrom, + final Instant timeTo + ) throws IllegalArgumentException { + return wrapped().backwardFetchAll(timeFrom, timeTo); + } + + @Override + public byte[] fetch(final Bytes key, final long timestamp) { + return wrapped().fetch(key, timestamp); + } + + @Override + public KeyValueIterator, byte[]> all() { + return wrapped().all(); + } + + @Override + public KeyValueIterator, byte[]> backwardAll() { + return wrapped().backwardAll(); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncKeyValueStore.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncKeyValueStore.java index 845728f5d..a60ce6d6e 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncKeyValueStore.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncKeyValueStore.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryConfig; @@ -115,16 +114,6 @@ public String name() { return userDelegate.name(); } - @Override - @Deprecated - public void init( - final org.apache.kafka.streams.processor.ProcessorContext context, - final StateStore root - ) { - throw new UnsupportedOperationException("This init method is deprecated, please implement" - + "init(StateStoreContext, StateStore) instead"); - } - @Override public QueryResult query( final Query query, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index 429e133da..b852c08d7 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.config.ConfigDef.Validator; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor; +import org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor; /** * Configurations for {@link ResponsiveKafkaStreams} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java index 5fa2cac19..9f41c2e23 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java @@ -13,11 +13,15 @@ package dev.responsive.kafka.api.stores; import dev.responsive.kafka.internal.stores.ResponsiveMaterialized; -import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder; -import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.StoreType; +import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.ResponsiveKeyValueStoreBuilder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.ResponsiveSessionStoreBuilder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.ResponsiveTimestampedKeyValueStoreBuilder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.ResponsiveTimestampedWindowStoreBuilder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.ResponsiveWindowStoreBuilder; import java.time.Duration; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; @@ -117,12 +121,11 @@ public static StoreBuilder> keyValueStoreBuilder( final Serde keySerde, final Serde valueSerde ) { - return new ResponsiveStoreBuilder<>( - StoreType.KEY_VALUE, + return new ResponsiveKeyValueStoreBuilder<>( storeSupplier, - Stores.keyValueStoreBuilder(storeSupplier, keySerde, valueSerde), keySerde, - valueSerde + valueSerde, + Time.SYSTEM ); } @@ -151,15 +154,11 @@ public static StoreBuilder> timestampedKey ) { storeSupplier.asTimestamped(); - return new ResponsiveStoreBuilder<>( - StoreType.TIMESTAMPED_KEY_VALUE, + return new ResponsiveTimestampedKeyValueStoreBuilder<>( storeSupplier, - Stores.timestampedKeyValueStoreBuilder( - storeSupplier, - keySerde, - valueSerde), keySerde, - valueSerde + valueSerde, + Time.SYSTEM ); } @@ -259,12 +258,11 @@ public static StoreBuilder> windowStoreBuilder( final Serde keySerde, final Serde valueSerde ) { - return new ResponsiveStoreBuilder<>( - StoreType.WINDOW, + return new ResponsiveWindowStoreBuilder<>( storeSupplier, - Stores.windowStoreBuilder(storeSupplier, keySerde, valueSerde), keySerde, - valueSerde + valueSerde, + Time.SYSTEM ); } @@ -286,15 +284,11 @@ public static StoreBuilder> timestampedWindo final Serde keySerde, final Serde valueSerde ) { - return new ResponsiveStoreBuilder<>( - StoreType.TIMESTAMPED_WINDOW, + return new ResponsiveTimestampedWindowStoreBuilder<>( storeSupplier, - Stores.timestampedWindowStoreBuilder( - storeSupplier, - keySerde, - valueSerde), keySerde, - valueSerde + valueSerde, + Time.SYSTEM ); } @@ -345,12 +339,11 @@ public static StoreBuilder> sessionStoreBuilder( final Serde keySerde, final Serde valueSerde ) { - return new ResponsiveStoreBuilder<>( - StoreType.SESSION, + return new ResponsiveSessionStoreBuilder<>( storeSupplier, - Stores.sessionStoreBuilder(storeSupplier, keySerde, valueSerde), keySerde, - valueSerde + valueSerde, + Time.SYSTEM ); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java index c28131ade..325b31ea1 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java @@ -26,11 +26,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metrics.KafkaMetric; public abstract class DelegatingConsumer implements Consumer { @@ -76,14 +78,21 @@ public void subscribe(final Pattern pattern) { } @Override - public void unsubscribe() { - delegate.unsubscribe(); + public void subscribe( + final SubscriptionPattern pattern, + final ConsumerRebalanceListener callback + ) { + delegate.subscribe(pattern, callback); } @Override - @Deprecated - public ConsumerRecords poll(final long timeout) { - return delegate.poll(timeout); + public void subscribe(final SubscriptionPattern pattern) { + delegate.subscribe(pattern); + } + + @Override + public void unsubscribe() { + delegate.unsubscribe(); } @Override @@ -158,18 +167,6 @@ public long position(final TopicPartition partition, final Duration timeout) { return delegate.position(partition, timeout); } - @Override - @Deprecated - public OffsetAndMetadata committed(final TopicPartition partition) { - return delegate.committed(partition); - } - - @Override - @Deprecated - public OffsetAndMetadata committed(final TopicPartition partition, final Duration timeout) { - return delegate.committed(partition, timeout); - } - @Override public Map committed(final Set partitions) { return delegate.committed(partitions); @@ -294,4 +291,14 @@ public void wakeup() { public Uuid clientInstanceId(final Duration duration) { return delegate.clientInstanceId(duration); } + + @Override + public void registerMetricForSubscription(final KafkaMetric metric) { + delegate.registerMetricForSubscription(metric); + } + + @Override + public void unregisterMetricFromSubscription(final KafkaMetric metric) { + delegate.unregisterMetricFromSubscription(metric); + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingProducer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingProducer.java index b0e90133c..d6f0e5e9c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingProducer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingProducer.java @@ -28,10 +28,11 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.metrics.KafkaMetric; public abstract class DelegatingProducer implements Producer { - private final Producer delegate; + protected final Producer delegate; public DelegatingProducer(final Producer delegate) { this.delegate = delegate; @@ -47,15 +48,6 @@ public void beginTransaction() throws ProducerFencedException { delegate.beginTransaction(); } - @Override - @SuppressWarnings("deprecation") - public void sendOffsetsToTransaction( - final Map offsets, - final String consumerGroupId - ) throws ProducerFencedException { - delegate.sendOffsetsToTransaction(offsets, consumerGroupId); - } - @Override public void sendOffsetsToTransaction( final Map offsets, @@ -114,4 +106,14 @@ public void close(final Duration timeout) { delegate.close(); } + @Override + public void registerMetricForSubscription(final KafkaMetric metric) { + delegate.registerMetricForSubscription(metric); + } + + @Override + public void unregisterMetricFromSubscription(final KafkaMetric metric) { + delegate.unregisterMetricFromSubscription(metric); + } + } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java index a51fc251c..1a409e9d4 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveConsumer.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.SubscriptionPattern; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; @@ -106,6 +107,20 @@ public void subscribe(final Pattern pattern) { + " without a rebalance listener"); } + @Override + public void subscribe( + final SubscriptionPattern pattern, + final ConsumerRebalanceListener callback + ) { + super.subscribe(pattern, callback); + } + + @Override + public void subscribe(final SubscriptionPattern pattern) { + throw new IllegalStateException("Unexpected call to subscribe(SubscriptionPattern) on main consumer" + + " without a rebalance listener"); + } + @Override public void unsubscribe() { listeners.forEach(Listener::onUnsubscribe); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveGlobalConsumer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveGlobalConsumer.java index 0942e525f..facad2ecc 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveGlobalConsumer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveGlobalConsumer.java @@ -15,6 +15,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -182,7 +183,8 @@ static SingletonConsumerRecords of(final ConsumerRecords records public SingletonConsumerRecords( final Map>> records ) { - super(records); + super(records, Collections.emptyMap()); + // TODO(sophie): need to pass in the actual next offsets here? } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java index d7f46793c..1764a33a0 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java @@ -26,17 +26,12 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ProducerFencedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ResponsiveProducer implements Producer { - private final Producer wrapped; +public class ResponsiveProducer extends DelegatingProducer { private final List listeners; private final Logger logger; @@ -65,40 +60,19 @@ public ResponsiveProducer( final Producer wrapped, final List listeners ) { + super(wrapped); this.logger = LoggerFactory.getLogger( ResponsiveProducer.class.getName() + "." + Objects.requireNonNull(clientid)); - this.wrapped = Objects.requireNonNull(wrapped); this.listeners = Objects.requireNonNull(listeners); } - @Override - public void initTransactions() { - wrapped.initTransactions(); - } - - @Override - public void beginTransaction() throws ProducerFencedException { - wrapped.beginTransaction(); - } - - @Override - @SuppressWarnings("deprecation") - public void sendOffsetsToTransaction( - final Map offsets, - final String consumerGroupId - ) throws ProducerFencedException { - wrapped.sendOffsetsToTransaction(offsets, consumerGroupId); - for (final var l : listeners) { - l.onSendOffsetsToTransaction(offsets, consumerGroupId); - } - } @Override public void sendOffsetsToTransaction( final Map offsets, final ConsumerGroupMetadata groupMetadata ) throws ProducerFencedException { - wrapped.sendOffsetsToTransaction(offsets, groupMetadata); + delegate.sendOffsetsToTransaction(offsets, groupMetadata); for (final var l : listeners) { l.onSendOffsetsToTransaction(offsets, groupMetadata.groupId()); } @@ -106,57 +80,38 @@ public void sendOffsetsToTransaction( @Override public void commitTransaction() throws ProducerFencedException { - wrapped.commitTransaction(); + delegate.commitTransaction(); listeners.forEach(Listener::onCommit); } @Override public void abortTransaction() throws ProducerFencedException { - wrapped.abortTransaction(); + delegate.abortTransaction(); listeners.forEach(Listener::onAbort); } @Override public Future send(final ProducerRecord record) { - return new RecordingFuture(wrapped.send(record), listeners); + return new RecordingFuture(delegate.send(record), listeners); } @Override public Future send(final ProducerRecord record, final Callback callback) { return new RecordingFuture( - wrapped.send(record, new RecordingCallback(callback, listeners)), listeners + delegate.send(record, new RecordingCallback(callback, listeners)), listeners ); } - @Override - public void flush() { - wrapped.flush(); - } - - @Override - public List partitionsFor(final String topic) { - return wrapped.partitionsFor(topic); - } - - @Override - public Map metrics() { - return wrapped.metrics(); - } - - @Override - public Uuid clientInstanceId(final Duration duration) { - return wrapped.clientInstanceId(duration); - } @Override public void close() { - wrapped.close(); + delegate.close(); closeListeners(); } @Override public void close(final Duration timeout) { - wrapped.close(); + delegate.close(timeout); closeListeners(); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/config/ResponsiveStreamsConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/config/ResponsiveStreamsConfig.java index f93b9e63f..0ec016455 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/config/ResponsiveStreamsConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/config/ResponsiveStreamsConfig.java @@ -36,7 +36,6 @@ public static ResponsiveStreamsConfig streamsConfig(final Map props) { public static void validateStreamsConfig(final StreamsConfig streamsConfig) { verifyNoStandbys(streamsConfig); - verifyNotEosV1(streamsConfig); } static void verifyNoStandbys(final StreamsConfig config) throws ConfigException { @@ -55,13 +54,6 @@ static void verifyNoStandbys(final StreamsConfig config) throws ConfigException } } - @SuppressWarnings("deprecation") - static void verifyNotEosV1(final StreamsConfig config) throws ConfigException { - if (EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) { - throw new ConfigException("Responsive driver can only be used with ALOS/EOS-V2"); - } - } - private ResponsiveStreamsConfig(final Map props, final boolean logConfigs) { super(props, logConfigs); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveKeyValueStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveKeyValueStore.java index 8329fc4c0..59e53bacc 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveKeyValueStore.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveKeyValueStore.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.Task.TaskType; @@ -79,18 +78,6 @@ public String name() { return name.kafkaName(); } - @Override - @Deprecated - public void init(final ProcessorContext context, final StateStore root) { - if (context instanceof StateStoreContext) { - init((StateStoreContext) context, root); - } else { - throw new UnsupportedOperationException( - "Use ResponsiveStore#init(StateStoreContext, StateStore) instead." - ); - } - } - @Override public void init(final StateStoreContext storeContext, final StateStore root) { try { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveSessionStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveSessionStore.java index 6b9b1a160..e2bfb3c9d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveSessionStore.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveSessionStore.java @@ -28,7 +28,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; @@ -61,18 +60,6 @@ public ResponsiveSessionStore(final ResponsiveSessionParams params) { ).logger(ResponsiveSessionStore.class); } - @Override - @Deprecated - public void init(final ProcessorContext context, final StateStore root) { - if (context instanceof StateStoreContext) { - init((StateStoreContext) context, root); - } else { - throw new UnsupportedOperationException( - "Use ResponsiveSessionStore#init(StateStoreContext, StateStore) instead." - ); - } - } - @Override public void init(final StateStoreContext storeContext, final StateStore root) { log.info("Initializing state store"); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreBuilder.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreBuilder.java index 368e0e78e..ae9f20b21 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreBuilder.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreBuilder.java @@ -12,134 +12,133 @@ package dev.responsive.kafka.internal.stores; -import java.util.Map; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.StoreSupplier; - -public class ResponsiveStoreBuilder implements StoreBuilder { - - private final StoreType storeType; - private final StoreSupplier userStoreSupplier; - private final StoreBuilder userStoreBuilder; - private final Serde keySerde; - // Note: the valueSerde is not necessary of type V, eg in case of timestamped stores - private final Serde valueSerde; - private final Time time; - - public enum StoreType { - KEY_VALUE, - TIMESTAMPED_KEY_VALUE, - WINDOW, - TIMESTAMPED_WINDOW, - SESSION - } - - public ResponsiveStoreBuilder( - final StoreType storeType, - final StoreSupplier userStoreSupplier, - final StoreBuilder userStoreBuilder, - final Serde keySerde, - final Serde valueSerde - ) { - // the time parameter only exists for Streams unit tests and in non-testing code - // will always hard-code Time.SYSTEM - this( - storeType, - userStoreSupplier, - userStoreBuilder, - keySerde, - valueSerde, - Time.SYSTEM - ); - } - - private ResponsiveStoreBuilder( - final StoreType storeType, - final StoreSupplier userStoreSupplier, - final StoreBuilder userStoreBuilder, - final Serde keySerde, - final Serde valueSerde, - final Time time - ) { - this.storeType = storeType; - this.userStoreSupplier = userStoreSupplier; - this.userStoreBuilder = userStoreBuilder; - this.keySerde = keySerde; - this.valueSerde = valueSerde; - this.time = time; - } - - public StoreType storeType() { - return storeType; - } - - public StoreSupplier storeSupplier() { - return userStoreSupplier; - } - - public Serde keySerde() { - return keySerde; - } - - // For timestamped stores, this will be the serde for the inner value type - // which will not be the same type as V, which is the store's actual V type - // (and would actually be TimestampAndValue for timestamped stores) - @SuppressWarnings("unchecked") - public Serde innerValueSerde() { - return (Serde) valueSerde; - } - - public Time time() { - return time; - } - - @Override - public StoreBuilder withCachingEnabled() { - userStoreBuilder.withCachingEnabled(); - return this; - } - - @Override - public StoreBuilder withCachingDisabled() { - userStoreBuilder.withCachingDisabled(); - return this; - } - - @Override - public StoreBuilder withLoggingEnabled(final Map config) { - userStoreBuilder.withLoggingEnabled(config); - return this; - } - - @Override - public StoreBuilder withLoggingDisabled() { - userStoreBuilder.withLoggingDisabled(); - throw new UnsupportedOperationException( - "Responsive stores are currently incompatible with disabling the changelog. " - + "Please reach out to us to request this feature."); - } - - @Override - public T build() { - return userStoreBuilder.build(); - } - - @Override - public Map logConfig() { - return userStoreBuilder.logConfig(); - } - - @Override - public boolean loggingEnabled() { - return userStoreBuilder.loggingEnabled(); - } - - @Override - public String name() { - return userStoreBuilder.name(); +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.SessionStoreBuilder; +import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; +import org.apache.kafka.streams.state.internals.WindowStoreBuilder; + +public interface ResponsiveStoreBuilder extends StoreBuilder { + + StoreSupplier storeSupplier(); + + class ResponsiveKeyValueStoreBuilder extends KeyValueStoreBuilder + implements ResponsiveStoreBuilder> { + + private final KeyValueBytesStoreSupplier storeSupplier; + + public ResponsiveKeyValueStoreBuilder( + final KeyValueBytesStoreSupplier storeSupplier, + final Serde keySerde, + final Serde valueSerde, + final Time time + ) { + super(storeSupplier, keySerde, valueSerde, time); + this.storeSupplier = storeSupplier; + } + + @Override + public KeyValueBytesStoreSupplier storeSupplier() { + return storeSupplier; + } + } + + class ResponsiveTimestampedKeyValueStoreBuilder + extends TimestampedKeyValueStoreBuilder + implements ResponsiveStoreBuilder, TimestampedKeyValueStore> { + + private final KeyValueBytesStoreSupplier storeSupplier; + + public ResponsiveTimestampedKeyValueStoreBuilder( + final KeyValueBytesStoreSupplier storeSupplier, + final Serde keySerde, + final Serde valueSerde, + final Time time + ) { + super(storeSupplier, keySerde, valueSerde, time); + this.storeSupplier = storeSupplier; + } + + @Override + public KeyValueBytesStoreSupplier storeSupplier() { + return storeSupplier; + } + } + + class ResponsiveWindowStoreBuilder extends WindowStoreBuilder + implements ResponsiveStoreBuilder> { + + private final WindowBytesStoreSupplier storeSupplier; + + public ResponsiveWindowStoreBuilder( + final WindowBytesStoreSupplier storeSupplier, + final Serde keySerde, + final Serde valueSerde, + final Time time + ) { + super(storeSupplier, keySerde, valueSerde, time); + this.storeSupplier = storeSupplier; + } + + @Override + public WindowBytesStoreSupplier storeSupplier() { + return storeSupplier; + } + } + + class ResponsiveTimestampedWindowStoreBuilder extends TimestampedWindowStoreBuilder + implements ResponsiveStoreBuilder> { + + private final WindowBytesStoreSupplier storeSupplier; + + public ResponsiveTimestampedWindowStoreBuilder( + final WindowBytesStoreSupplier storeSupplier, + final Serde keySerde, + final Serde valueSerde, + final Time time + ) { + super(storeSupplier, keySerde, valueSerde, time); + this.storeSupplier = storeSupplier; + } + + @Override + public WindowBytesStoreSupplier storeSupplier() { + return storeSupplier; + } + } + + class ResponsiveSessionStoreBuilder extends SessionStoreBuilder + implements ResponsiveStoreBuilder> { + + private final SessionBytesStoreSupplier storeSupplier; + + public ResponsiveSessionStoreBuilder( + final SessionBytesStoreSupplier storeSupplier, + final Serde keySerde, + final Serde valueSerde, + final Time time + ) { + super(storeSupplier, keySerde, valueSerde, time); + this.storeSupplier = storeSupplier; + } + + @Override + public SessionBytesStoreSupplier storeSupplier() { + return storeSupplier; + } } - } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java index 5a78dc426..6ba81ff96 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java @@ -31,7 +31,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; @@ -81,18 +80,6 @@ public String name() { return name.kafkaName(); } - @Override - @Deprecated - public void init(final ProcessorContext context, final StateStore root) { - if (context instanceof StateStoreContext) { - init((StateStoreContext) context, root); - } else { - throw new UnsupportedOperationException( - "Use ResponsiveWindowStore#init(StateStoreContext, StateStore) instead." - ); - } - } - @Override public void init(final StateStoreContext storeContext, final StateStore root) { try { diff --git a/kafka-client/src/main/java/org/apache/kafka/streams/state/internals/AsyncKeyValueStoreBuilder.java b/kafka-client/src/main/java/org/apache/kafka/streams/state/internals/AsyncKeyValueStoreBuilder.java deleted file mode 100644 index 0b2465419..000000000 --- a/kafka-client/src/main/java/org/apache/kafka/streams/state/internals/AsyncKeyValueStoreBuilder.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright 2024 Responsive Computing, Inc. - * - * This source code is licensed under the Responsive Business Source License Agreement v1.0 - * available at: - * - * https://www.responsive.dev/legal/responsive-bsl-10 - * - * This software requires a valid Commercial License Key for production use. Trial and commercial - * licenses can be obtained at https://www.responsive.dev - */ - -package org.apache.kafka.streams.state.internals; - -import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; -import dev.responsive.kafka.api.async.internals.stores.AsyncFlushingKeyValueStore; -import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.KeyValueStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Essentially a copy of the {@link KeyValueStoreBuilder} class that - * allows us to inject an additional layer, the {@link AsyncFlushingKeyValueStore}. - * We also use this builder to coordinate between the async processor (which is - * responsible for creating this builder) and the async flushing store (which is - * created by this builder). - */ -public class AsyncKeyValueStoreBuilder - extends AbstractAsyncStoreBuilder> { - - private static final Logger LOG = - LoggerFactory.getLogger(AsyncKeyValueStoreBuilder.class); - private final KeyValueBytesStoreSupplier storeSupplier; - - @SuppressWarnings("unchecked") - public AsyncKeyValueStoreBuilder( - final ResponsiveStoreBuilder responsiveBuilder - ) { - this( - (KeyValueBytesStoreSupplier) responsiveBuilder.storeSupplier(), - (Serde) responsiveBuilder.keySerde(), - responsiveBuilder.innerValueSerde(), - responsiveBuilder.time() - ); - } - - private AsyncKeyValueStoreBuilder( - final KeyValueBytesStoreSupplier storeSupplier, - final Serde keySerde, - final Serde valueSerde, - final Time time - ) { - super( - storeSupplier.name(), - keySerde, - valueSerde, - time - ); - this.storeSupplier = storeSupplier; - LOG.debug("Created async KV store builder with valueSerde = {}", valueSerde); - } - - @Override - public KeyValueStore build() { - final KeyValueStore store = storeSupplier.get(); - - return new MeteredKeyValueStore<>( - wrapAsyncFlushingKV( - maybeWrapCaching( - maybeWrapLogging(store)) - ), - storeSupplier.metricsScope(), - time, - keySerde, - valueSerde - ); - } - - private KeyValueStore maybeWrapCaching(final KeyValueStore inner) { - if (!cachingEnabled()) { - return inner; - } - return new CachingKeyValueStore(inner, true); - } - - private KeyValueStore maybeWrapLogging(final KeyValueStore inner) { - if (!loggingEnabled()) { - return inner; - } - return new ChangeLoggingKeyValueBytesStore(inner); - } - -} diff --git a/kafka-client/src/main/java/org/apache/kafka/streams/state/internals/AsyncTimestampedKeyValueStoreBuilder.java b/kafka-client/src/main/java/org/apache/kafka/streams/state/internals/AsyncTimestampedKeyValueStoreBuilder.java deleted file mode 100644 index 667f1b73f..000000000 --- a/kafka-client/src/main/java/org/apache/kafka/streams/state/internals/AsyncTimestampedKeyValueStoreBuilder.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright 2024 Responsive Computing, Inc. - * - * This source code is licensed under the Responsive Business Source License Agreement v1.0 - * available at: - * - * https://www.responsive.dev/legal/responsive-bsl-10 - * - * This software requires a valid Commercial License Key for production use. Trial and commercial - * licenses can be obtained at https://www.responsive.dev - */ - -package org.apache.kafka.streams.state.internals; - -import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; -import dev.responsive.kafka.api.async.internals.stores.AsyncFlushingKeyValueStore; -import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.TimestampedBytesStore; -import org.apache.kafka.streams.state.TimestampedKeyValueStore; -import org.apache.kafka.streams.state.ValueAndTimestamp; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Essentially a copy of the {@link TimestampedKeyValueStoreBuilder} class that - * allows us to inject an additional layer, the {@link AsyncFlushingKeyValueStore}. - * We also use this builder to coordinate between the async processor (which is - * responsible for creating this builder) and the async flushing store (which is - * created by this builder). - */ -public class AsyncTimestampedKeyValueStoreBuilder - extends AbstractAsyncStoreBuilder, TimestampedKeyValueStore> { - - private static final Logger LOG = - LoggerFactory.getLogger(AsyncTimestampedKeyValueStoreBuilder.class); - private final KeyValueBytesStoreSupplier storeSupplier; - - @SuppressWarnings("unchecked") - public AsyncTimestampedKeyValueStoreBuilder( - final ResponsiveStoreBuilder responsiveBuilder - ) { - this( - (KeyValueBytesStoreSupplier) responsiveBuilder.storeSupplier(), - (Serde) responsiveBuilder.keySerde(), - responsiveBuilder.innerValueSerde(), - responsiveBuilder.time() - ); - } - - private AsyncTimestampedKeyValueStoreBuilder( - final KeyValueBytesStoreSupplier storeSupplier, - final Serde keySerde, - final Serde valueSerde, - final Time time - ) { - super( - storeSupplier.name(), - keySerde, - valueSerde == null ? null : new ValueAndTimestampSerde<>(valueSerde), - time - ); - this.storeSupplier = storeSupplier; - LOG.debug("Created async timestamped-KV store builder with valueSerde = {}", valueSerde); - } - - @Override - public TimestampedKeyValueStore build() { - final KeyValueStore store = storeSupplier.get(); - if (!(store instanceof TimestampedBytesStore)) { - throw new IllegalStateException("Timestamped store builder expects store supplier to provide " - + "store that implements TimestampedBytesStore"); - } - - return new MeteredTimestampedKeyValueStore<>( - wrapAsyncFlushingKV( - maybeWrapCaching( - maybeWrapLogging(store)) - ), - storeSupplier.metricsScope(), - time, - keySerde, - valueSerde - ); - } - - private KeyValueStore maybeWrapCaching(final KeyValueStore inner) { - if (!cachingEnabled()) { - return inner; - } - return new CachingKeyValueStore(inner, true); - } - - private KeyValueStore maybeWrapLogging(final KeyValueStore inner) { - if (!loggingEnabled()) { - return inner; - } - return new ChangeLoggingTimestampedKeyValueBytesStore(inner); - } - -} diff --git a/kafka-client/src/main/java/org/apache/kafka/streams/state/internals/DelayedAsyncStoreBuilder.java b/kafka-client/src/main/java/org/apache/kafka/streams/state/internals/DelayedAsyncStoreBuilder.java new file mode 100644 index 000000000..66efe67c1 --- /dev/null +++ b/kafka-client/src/main/java/org/apache/kafka/streams/state/internals/DelayedAsyncStoreBuilder.java @@ -0,0 +1,420 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * This source code is licensed under the Responsive Business Source License Agreement v1.0 + * available at: + * + * https://www.responsive.dev/legal/responsive-bsl-10 + * + * This software requires a valid Commercial License Key for production use. Trial and commercial + * licenses can be obtained at https://www.responsive.dev + */ + +package org.apache.kafka.streams.state.internals; + +import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.ResponsiveKeyValueStoreBuilder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.ResponsiveSessionStoreBuilder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.ResponsiveTimestampedKeyValueStoreBuilder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.ResponsiveTimestampedWindowStoreBuilder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.ResponsiveWindowStoreBuilder; +import java.lang.reflect.Field; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; + +public class DelayedAsyncStoreBuilder + extends AbstractAsyncStoreBuilder { + + private final StoreBuilder inner; + private StoreBuilder innerResolved; + + public DelayedAsyncStoreBuilder(final StoreBuilder inner) { + super(inner.name()); + this.inner = inner; + } + + // We need to implement equals to handle the case of stores shared by multiple processors + // which check StoreBuilder equality to avoid adding the same store again for each processor + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final DelayedAsyncStoreBuilder that = (DelayedAsyncStoreBuilder) o; + + return inner.equals(that.inner); + } + + @Override + public int hashCode() { + return inner.hashCode(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public T build() { + + if (innerResolved == null) { + if (inner instanceof FactoryWrappingStoreBuilder) { + innerResolved = (StoreBuilder) ((FactoryWrappingStoreBuilder) inner).storeFactory().builder(); + } else { + innerResolved = inner; + } + } + + if (innerResolved instanceof KeyValueStoreBuilder) { + return (T) getKeyValueStore((KeyValueStoreBuilder) innerResolved); + } else if (innerResolved instanceof TimestampedKeyValueStoreBuilder) { + return (T) getTimestampedKeyValueStore((TimestampedKeyValueStoreBuilder) innerResolved); + } else if (innerResolved instanceof WindowStoreBuilder) { + return (T) getWindowStore((WindowStoreBuilder) innerResolved); + } else if (innerResolved instanceof TimestampedWindowStoreBuilder) { + return (T) getTimestampedWindowStore((TimestampedWindowStoreBuilder) innerResolved); + } else if (innerResolved instanceof SessionStoreBuilder) { + return (T) getSessionStore((SessionStoreBuilder) innerResolved); + } else { + throw new UnsupportedOperationException("Other store types not yet supported"); + } + } + + @SuppressWarnings({"rawtypes"}) + private StateStore getKeyValueStore(final KeyValueStoreBuilder builder) { + final KeyValueBytesStoreSupplier storeSupplier; + if (builder instanceof ResponsiveStoreBuilder.ResponsiveKeyValueStoreBuilder) { + storeSupplier = ((ResponsiveKeyValueStoreBuilder) builder).storeSupplier(); + } else { + try { + final Field storeSupplierField = + KeyValueStoreBuilder.class.getDeclaredField("storeSupplier"); + storeSupplierField.setAccessible(true); + + storeSupplier = + (KeyValueBytesStoreSupplier) storeSupplierField.get(builder); + } catch (final Exception e) { + throw new IllegalStateException("Failed to retrieve store supplier for async " + + "key-value store", e); + } + } + return getKeyValueStore(storeSupplier, builder.time, builder.keySerde, builder.valueSerde); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private StateStore getKeyValueStore( + final KeyValueBytesStoreSupplier storeSupplier, + final Time time, + final Serde keySerde, + final Serde valueSerde + ) { + final KeyValueStore store = storeSupplier.get(); + + return new MeteredKeyValueStore<>( + wrapAsyncFlushingKV( + maybeWrapCachingKV( + maybeWrapLoggingKV(store)) + ), + storeSupplier.metricsScope(), + time, + keySerde, + valueSerde + ); + } + + @SuppressWarnings({"rawtypes"}) + private StateStore getTimestampedKeyValueStore( + final TimestampedKeyValueStoreBuilder builder + ) { + final KeyValueBytesStoreSupplier storeSupplier; + if (builder instanceof ResponsiveStoreBuilder.ResponsiveTimestampedKeyValueStoreBuilder) { + storeSupplier = ((ResponsiveTimestampedKeyValueStoreBuilder) builder).storeSupplier(); + } else { + try { + final Field storeSupplierField = + TimestampedKeyValueStoreBuilder.class.getDeclaredField("storeSupplier"); + storeSupplierField.setAccessible(true); + + storeSupplier = (KeyValueBytesStoreSupplier) storeSupplierField.get(builder); + } catch (final Exception e) { + throw new IllegalStateException("Failed to build async timestamped key-value store", e); + } + } + + return getTimestampedKeyValueStore( + storeSupplier, + builder.time, + builder.keySerde, + builder.valueSerde + ); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private StateStore getTimestampedKeyValueStore( + final KeyValueBytesStoreSupplier storeSupplier, + final Time time, + final Serde keySerde, + final Serde valueSerde + ) { + final KeyValueStore store = storeSupplier.get(); + + return new MeteredTimestampedKeyValueStore<>( + wrapAsyncFlushingKV( + maybeWrapCachingKV( + maybeWrapLoggingTimestampedKV(store)) + ), + storeSupplier.metricsScope(), + time, + keySerde, + valueSerde + ); + } + + @SuppressWarnings({"rawtypes"}) + private StateStore getWindowStore(final WindowStoreBuilder builder) { + final WindowBytesStoreSupplier storeSupplier; + if (builder instanceof ResponsiveWindowStoreBuilder) { + storeSupplier = ((ResponsiveWindowStoreBuilder) builder).storeSupplier(); + } else { + try { + final Field storeSupplierField = + WindowStoreBuilder.class.getDeclaredField("storeSupplier"); + storeSupplierField.setAccessible(true); + + storeSupplier = + (WindowBytesStoreSupplier) storeSupplierField.get(builder); + } catch (final Exception e) { + throw new IllegalStateException("Failed to retrieve store supplier for async " + + "window store", e); + } + } + return getWindowStore( + storeSupplier, + builder.time, + builder.keySerde, + builder.valueSerde + ); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private StateStore getWindowStore( + final WindowBytesStoreSupplier storeSupplier, + final Time time, + final Serde keySerde, + final Serde valueSerde + ) { + final WindowStore store = storeSupplier.get(); + + final long windowSize = storeSupplier.windowSize(); + return new MeteredWindowStore<>( + wrapAsyncFlushingWindow( + maybeWrapCachingWindow( + maybeWrapLoggingWindow(store, storeSupplier.retainDuplicates()), + windowSize, + storeSupplier.segmentIntervalMs()) + ), + windowSize, + storeSupplier.metricsScope(), + time, + keySerde, + valueSerde + ); + } + + @SuppressWarnings({"rawtypes"}) + private StateStore getTimestampedWindowStore( + final TimestampedWindowStoreBuilder builder + ) { + final WindowBytesStoreSupplier storeSupplier; + if (builder instanceof ResponsiveTimestampedWindowStoreBuilder) { + storeSupplier = ((ResponsiveTimestampedWindowStoreBuilder) builder).storeSupplier(); + } else { + try { + final Field storeSupplierField = + TimestampedWindowStoreBuilder.class.getDeclaredField("storeSupplier"); + storeSupplierField.setAccessible(true); + + storeSupplier = (WindowBytesStoreSupplier) storeSupplierField.get(builder); + } catch (final Exception e) { + throw new IllegalStateException("Failed to build async timestamped window store", e); + } + } + + return getTimestampedWindowStore( + storeSupplier, + builder.time, + builder.keySerde, + builder.valueSerde + ); + } + + + @SuppressWarnings({"unchecked", "rawtypes"}) + private StateStore getTimestampedWindowStore( + final WindowBytesStoreSupplier storeSupplier, + final Time time, + final Serde keySerde, + final Serde valueSerde + ) { + final WindowStore store = storeSupplier.get(); + + final long windowSize = storeSupplier.windowSize(); + return new MeteredTimestampedWindowStore<>( + wrapAsyncFlushingWindow( + maybeWrapCachingWindow( + maybeWrapLoggingTimestampedWindow(store, storeSupplier.retainDuplicates()), + windowSize, + storeSupplier.segmentIntervalMs()) + ), + windowSize, + storeSupplier.metricsScope(), + time, + keySerde, + valueSerde + ); + } + + @SuppressWarnings({"rawtypes"}) + private StateStore getSessionStore(final SessionStoreBuilder builder) { + final SessionBytesStoreSupplier storeSupplier; + if (builder instanceof ResponsiveSessionStoreBuilder) { + storeSupplier = ((ResponsiveSessionStoreBuilder) builder).storeSupplier(); + } else { + try { + final Field storeSupplierField = + SessionStoreBuilder.class.getDeclaredField("storeSupplier"); + storeSupplierField.setAccessible(true); + + storeSupplier = + (SessionBytesStoreSupplier) storeSupplierField.get(builder); + } catch (final Exception e) { + throw new IllegalStateException("Failed to retrieve store supplier for async " + + "session store", e); + } + } + return getSessionStore( + storeSupplier, + builder.time, + builder.keySerde, + builder.valueSerde + ); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private StateStore getSessionStore( + final SessionBytesStoreSupplier storeSupplier, + final Time time, + final Serde keySerde, + final Serde valueSerde + ) { + final SessionStore store = storeSupplier.get(); + + return new MeteredSessionStore<>( + wrapAsyncFlushingSession( + maybeWrapCachingSession( + maybeWrapLoggingSession(store), + storeSupplier.segmentIntervalMs()) + ), + storeSupplier.metricsScope(), + keySerde, + valueSerde, + time + ); + } + + private KeyValueStore maybeWrapCachingKV( + final KeyValueStore inner + ) { + if (!cachingEnabled()) { + return inner; + } + return new CachingKeyValueStore(inner, true); + } + + private KeyValueStore maybeWrapLoggingKV( + final KeyValueStore inner + ) { + if (!loggingEnabled()) { + return inner; + } + return new ChangeLoggingKeyValueBytesStore(inner); + } + + private KeyValueStore maybeWrapLoggingTimestampedKV( + final KeyValueStore inner + ) { + if (!loggingEnabled()) { + return inner; + } + return new ChangeLoggingTimestampedKeyValueBytesStore(inner); + } + + private WindowStore maybeWrapCachingWindow( + final WindowStore inner, + final long windowSize, + final long segmentInterval + ) { + if (!cachingEnabled()) { + return inner; + } + return new CachingWindowStore(inner, windowSize, segmentInterval); + } + + private WindowStore maybeWrapLoggingWindow( + final WindowStore inner, + final boolean retainDuplicates + ) { + if (!loggingEnabled()) { + return inner; + } + return new ChangeLoggingWindowBytesStore( + inner, + retainDuplicates, + WindowKeySchema::toStoreKeyBinary + ); + } + + private WindowStore maybeWrapLoggingTimestampedWindow( + final WindowStore inner, + final boolean retainDuplicates + ) { + if (!loggingEnabled()) { + return inner; + } + return new ChangeLoggingTimestampedWindowBytesStore(inner, retainDuplicates); + } + + private SessionStore maybeWrapCachingSession( + final SessionStore inner, + final long segmentInterval + ) { + if (!cachingEnabled()) { + return inner; + } + return new CachingSessionStore(inner, segmentInterval); + } + + private SessionStore maybeWrapLoggingSession( + final SessionStore inner + ) { + if (!loggingEnabled()) { + return inner; + } + return new ChangeLoggingSessionBytesStore(inner); + } + +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java b/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java index 6deeaa566..a721890ed 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/api/ResponsiveKafkaStreamsTest.java @@ -15,38 +15,58 @@ import static dev.responsive.kafka.api.config.ResponsiveConfig.COMPATIBILITY_MODE_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ENV_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.RESPONSIVE_ORG_CONFIG; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeRecords; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.readOutput; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.when; import com.datastax.oss.driver.api.core.CqlSession; +import dev.responsive.kafka.api.async.AsyncProcessorWrapper; import dev.responsive.kafka.api.config.CompatibilityMode; import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers; import dev.responsive.kafka.internal.db.CassandraClient; import dev.responsive.kafka.internal.db.CassandraClientFactory; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.testutils.IntegrationTestUtils; +import java.time.Duration; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaClientSupplier; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; diff --git a/kafka-client/src/test/java/dev/responsive/kafka/async/AsyncProcessorIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/async/AsyncProcessorIntegrationTest.java index 5ecfca938..df2034efd 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/async/AsyncProcessorIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/async/AsyncProcessorIntegrationTest.java @@ -33,13 +33,17 @@ import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; @@ -52,7 +56,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import dev.responsive.kafka.api.ResponsiveKafkaStreams; +import dev.responsive.kafka.api.async.AsyncProcessorWrapper; import dev.responsive.kafka.api.config.StorageBackend; +import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers; import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; import dev.responsive.kafka.api.stores.ResponsiveStores; import dev.responsive.kafka.testutils.ResponsiveConfigParam; @@ -70,6 +76,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -85,10 +92,14 @@ import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; @@ -141,6 +152,7 @@ public class AsyncProcessorIntegrationTest { private static final int INPUT_RECORDS_PER_KEY = 10; private String inputTopic; + private String inputTableTopic; private String outputTopic; private String inKVStore; @@ -165,6 +177,7 @@ public void before( this.name = info.getDisplayName().replace("()", ""); this.inputTopic = name + "input"; + this.inputTableTopic = name + "input-table"; this.outputTopic = name + "output"; this.inKVStore = name + "in"; this.asyncStore1 = name + "a1"; @@ -179,21 +192,118 @@ public void before( createTopicsAndWait( admin, - Map.of(inputTopic(), numInputPartitions, outputTopic(), numOutputPartitions) + Map.of( + inputTopic, numInputPartitions, + inputTableTopic, numInputPartitions, + outputTopic, numOutputPartitions) ); } @AfterEach public void after() { - admin.deleteTopics(List.of(inputTopic(), outputTopic())); + admin.deleteTopics(List.of(inputTopic, inputTableTopic, outputTopic)); } - private String inputTopic() { - return name + "." + inputTopic; + @Test + public void shouldWrapDSLWithAsync() throws Exception { + final Map properties = getMutableProperties(); + properties.put(PROCESSOR_WRAPPER_CLASS_CONFIG, AsyncProcessorWrapper.class); + properties.put(DSL_STORE_SUPPLIERS_CLASS_CONFIG, ResponsiveDslStoreSuppliers.class); + properties.put(COMMIT_INTERVAL_MS_CONFIG, 0L); + properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + final StreamsBuilder builder = new StreamsBuilder( + new TopologyConfig(new StreamsConfig(properties)) + ); + + final KStream stream = builder.stream(inputTopic); + + stream + .groupByKey() + .count() + .toStream() + .mapValues(String::valueOf) + .to(outputTopic); + + final Properties props = new Properties(); + props.putAll(properties); + final Topology topology = builder.build(props); + + final KafkaProducer producer = new KafkaProducer<>(properties); + + final List> streamInput = new LinkedList<>(); + streamInput.add(new KeyValue<>("A", "a1")); + streamInput.add(new KeyValue<>("B", "b1")); + streamInput.add(new KeyValue<>("A", "a2")); + pipeRecords(producer, inputTopic, streamInput); + + try (final var streams = new ResponsiveKafkaStreams(topology, properties)) { + startAppAndAwaitRunning(Duration.ofSeconds(30), streams); + + final List> kvs = readOutput( + outputTopic, 0, 3, numOutputPartitions, false, properties + ); + + assertThat(kvs, containsInAnyOrder( + new KeyValue<>("A", "1"), + new KeyValue<>("B", "1"), + new KeyValue<>("A", "2")) + ); + } } - private String outputTopic() { - return name + "." + outputTopic; + @Test + public void shouldExecuteDSLStreamTableJoinWithAsync() throws Exception { + final Map properties = getMutableProperties(); + properties.put(PROCESSOR_WRAPPER_CLASS_CONFIG, AsyncProcessorWrapper.class); + properties.put(DSL_STORE_SUPPLIERS_CLASS_CONFIG, ResponsiveDslStoreSuppliers.class); + properties.put(COMMIT_INTERVAL_MS_CONFIG, 0L); + properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + final StreamsBuilder builder = new StreamsBuilder( + new TopologyConfig(new StreamsConfig(properties)) + ); + + final KStream stream = builder.stream(inputTopic); + final KTable table = builder.table(inputTableTopic); + + stream + .join(table, (l, r) -> l + "-" + r) + .to(outputTopic); + + + final Properties props = new Properties(); + props.putAll(properties); + final Topology topology = builder.build(props); + + final KafkaProducer producer = new KafkaProducer<>(properties); + + final List> tableInput = new LinkedList<>(); + tableInput.add(new KeyValue<>("A", "a")); + tableInput.add(new KeyValue<>("B", "b")); + tableInput.add(new KeyValue<>("C", "c")); + pipeRecords(producer, inputTableTopic, tableInput); + + final List> streamInput = new LinkedList<>(); + streamInput.add(new KeyValue<>("A", "a-joined")); + streamInput.add(new KeyValue<>("B", "b-joined")); + streamInput.add(new KeyValue<>("C", "c-joined")); + pipeRecords(producer, inputTopic, streamInput); + + try (final var streams = new ResponsiveKafkaStreams(topology, properties)) { + startAppAndAwaitRunning(Duration.ofSeconds(30), streams); + + final List> kvs = readOutput( + outputTopic, 0, 3, numOutputPartitions, false, properties + ); + + final List> expectedOutput = new LinkedList<>(); + expectedOutput.add(new KeyValue<>("A", "a-joined-a")); + expectedOutput.add(new KeyValue<>("B", "b-joined-b")); + expectedOutput.add(new KeyValue<>("C", "c-joined-c")); + + assertThat(kvs.containsAll(expectedOutput), is(true)); + } } @Test @@ -235,7 +345,7 @@ public void shouldExecuteMultipleMixedAsyncProcessorsNoCaching() throws Exceptio final StreamsBuilder builder = new StreamsBuilder(); final KStream input = builder.stream( - inputTopic(), + inputTopic, Consumed.with( Serdes.String(), Serdes.serdeFrom(new InputRecordSerializer(), new InputRecordDeserializer()) @@ -298,7 +408,7 @@ public void shouldExecuteMultipleMixedAsyncProcessorsNoCaching() throws Exceptio )), Named.as("S2"), asyncStore2) - .to(outputTopic(), Produced.with(Serdes.String(), Serdes.String())); + .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); final List caughtExceptions = new LinkedList<>(); try (final var streams = new ResponsiveKafkaStreams(builder.build(), properties)) { @@ -309,11 +419,11 @@ public void shouldExecuteMultipleMixedAsyncProcessorsNoCaching() throws Exceptio startAppAndAwaitRunning(Duration.ofSeconds(30), streams); // When: - pipeRecords(producer, inputTopic(), inputRecords); + pipeRecords(producer, inputTopic, inputRecords); // Then: final List> kvs = readOutput( - outputTopic(), 0, numInputRecords, numOutputPartitions, false, properties + outputTopic, 0, numInputRecords, numOutputPartitions, false, properties ); final Map> observedOutputValuesByKey = new HashMap<>(keys.size()); @@ -373,7 +483,7 @@ public void shouldProcessStatelessEventsInOrderByKey() throws Exception { final StreamsBuilder builder = new StreamsBuilder(); final KStream input = builder.stream( - inputTopic(), + inputTopic, Consumed.with( Serdes.String(), Serdes.serdeFrom(new InputRecordSerializer(), new InputRecordDeserializer()) @@ -399,7 +509,7 @@ public void shouldProcessStatelessEventsInOrderByKey() throws Exception { latestValues, inputRecordsLatch), outKVStore) - .to(outputTopic(), Produced.with(Serdes.String(), Serdes.String())); + .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); final List caughtExceptions = new LinkedList<>(); try (final var streams = new ResponsiveKafkaStreams(builder.build(), properties)) { @@ -410,7 +520,7 @@ public void shouldProcessStatelessEventsInOrderByKey() throws Exception { startAppAndAwaitRunning(Duration.ofSeconds(30), streams); // When: - pipeRecords(producer, inputTopic(), inputRecords); + pipeRecords(producer, inputTopic, inputRecords); // Then: final long timeoutMs = 60_000L + DEFAULT_ASYNC_SLEEP_DURATION_MS * numInputRecords; @@ -421,7 +531,7 @@ public void shouldProcessStatelessEventsInOrderByKey() throws Exception { } final var kvs = readOutput( - outputTopic(), 0, numInputRecords, numOutputPartitions, false, properties + outputTopic, 0, numInputRecords, numOutputPartitions, false, properties ); final Map latestByKey = new HashMap<>(); @@ -492,7 +602,7 @@ public void shouldProcessStatefulEventsInOrderByKey() throws Exception { final StreamsBuilder builder = new StreamsBuilder(); final KStream input = builder.stream( - inputTopic(), + inputTopic, Consumed.with( Serdes.String(), Serdes.serdeFrom(new InputRecordSerializer(), new InputRecordDeserializer()) @@ -523,7 +633,7 @@ public void shouldProcessStatefulEventsInOrderByKey() throws Exception { latestValues, inputRecordsLatch), outKVStore) - .to(outputTopic(), Produced.with(Serdes.String(), Serdes.String())); + .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); final List caughtExceptions = new LinkedList<>(); try (final var streams = new ResponsiveKafkaStreams(builder.build(), properties)) { @@ -534,7 +644,7 @@ public void shouldProcessStatefulEventsInOrderByKey() throws Exception { startAppAndAwaitRunning(Duration.ofSeconds(30), streams); // When: - pipeRecords(producer, inputTopic(), inputRecords); + pipeRecords(producer, inputTopic, inputRecords); // Then: final long timeout = 60_000L + DEFAULT_ASYNC_SLEEP_DURATION_MS * numInputRecords; @@ -545,7 +655,7 @@ public void shouldProcessStatefulEventsInOrderByKey() throws Exception { } final var kvs = readOutput( - outputTopic(), 0, numInputRecords, numOutputPartitions, false, properties + outputTopic, 0, numInputRecords, numOutputPartitions, false, properties ); final Map latestByKey = new HashMap<>(); @@ -570,7 +680,7 @@ public void shouldThrowIfStoresNotConnectedCorrectly() throws Exception { final StreamsBuilder builder = new StreamsBuilder(); final KStream input = builder.stream( - inputTopic(), + inputTopic, Consumed.with( Serdes.String(), Serdes.serdeFrom(new InputRecordSerializer(), new InputRecordDeserializer()) diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java index d523df003..01ae37a3d 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java @@ -530,12 +530,6 @@ public RestoreRecordRecordingConsumer( this.recorded = recorded; } - @Override - @SuppressWarnings("deprecation") - public ConsumerRecords poll(long timeoutMs) { - return record(super.poll(timeoutMs)); - } - @Override public ConsumerRecords poll(Duration timeout) { return record(super.poll(timeout)); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java index 9510af6e9..abe58e5f1 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java @@ -27,6 +27,7 @@ import static org.apache.kafka.streams.StreamsConfig.APPLICATION_SERVER_CONFIG; import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG; @@ -35,6 +36,7 @@ import static org.hamcrest.Matchers.equalTo; import dev.responsive.kafka.api.ResponsiveKafkaStreams; +import dev.responsive.kafka.api.async.AsyncProcessorWrapper; import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.config.StorageBackend; import dev.responsive.kafka.api.stores.ResponsiveStores; @@ -408,6 +410,8 @@ public boolean await() { public void shouldDoStreamStreamJoin() throws Exception { // Given: final Map properties = getMutablePropertiesWithStringSerdes(); + properties.put(PROCESSOR_WRAPPER_CLASS_CONFIG, AsyncProcessorWrapper.class); + properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); final StreamsBuilder builder = new StreamsBuilder(); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveProducerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveProducerTest.java index 650b05c30..a4448ab10 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveProducerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveProducerTest.java @@ -23,13 +23,13 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import dev.responsive.kafka.internal.clients.ResponsiveProducer; import dev.responsive.kafka.internal.clients.ResponsiveProducer.Listener; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; @@ -143,7 +143,7 @@ public void shouldNotifyOnSendOffsetsToTransaction() { PARTITION1, new OffsetAndMetadata(10), PARTITION2, new OffsetAndMetadata(11) ), - "foo" + new ConsumerGroupMetadata("groupId") ); // then: @@ -158,7 +158,10 @@ PARTITION2, new OffsetAndMetadata(11L) @Test public void shouldThrowExceptionFromCommitCallback() { // given: - producer.sendOffsetsToTransaction(Map.of(PARTITION1, new OffsetAndMetadata(10)), "foo"); + producer.sendOffsetsToTransaction( + Map.of(PARTITION1, new OffsetAndMetadata(10)), + new ConsumerGroupMetadata("groupId") + ); doThrow(new RuntimeException("oops")).when(listener1).onCommit(); // when/then: diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/config/ResponsiveStreamsConfigTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/config/ResponsiveStreamsConfigTest.java index f1703533d..7f6a88071 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/config/ResponsiveStreamsConfigTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/config/ResponsiveStreamsConfigTest.java @@ -13,7 +13,6 @@ package dev.responsive.kafka.internal.config; import static dev.responsive.kafka.internal.config.ResponsiveStreamsConfig.verifyNoStandbys; -import static dev.responsive.kafka.internal.config.ResponsiveStreamsConfig.verifyNotEosV1; import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.Map; @@ -56,16 +55,4 @@ public void shouldNotThrowWhenNumStandbysSetToZero() { ))); } - @SuppressWarnings("deprecation") - @Test - public void shouldThrowOnEOSV1() { - assertThrows( - ConfigException.class, - () -> verifyNotEosV1(new StreamsConfig(Map.of( - StreamsConfig.APPLICATION_ID_CONFIG, "foo", - CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "foo.bar", - StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE - ))) - ); - } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java index 8476394df..41ed1b31a 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java @@ -396,12 +396,12 @@ public static List> readOutput( consumer.assign(partitions); partitions.forEach(tp -> consumer.seek(tp, from)); - final long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(300); + final long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(60); final List> result = new ArrayList<>(); while (result.size() < numEvents) { // this is configured to only poll one record at a time, so we // can guarantee we won't accidentally poll more than numEvents - final ConsumerRecords polled = consumer.poll(Duration.ofSeconds(30)); + final ConsumerRecords polled = consumer.poll(Duration.ofSeconds(5)); for (ConsumerRecord rec : polled) { result.add(new KeyValue<>(rec.key(), rec.value())); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/KeyValueStoreComparator.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/KeyValueStoreComparator.java index 3b72d7c20..c10b0266f 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/KeyValueStoreComparator.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/KeyValueStoreComparator.java @@ -17,7 +17,6 @@ import java.util.ArrayList; import java.util.List; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -169,18 +168,6 @@ public String name() { return this.sourceOfTruth.name(); } - @Override - @Deprecated - public void init(final ProcessorContext context, final StateStore root) { - if (context instanceof StateStoreContext) { - init((StateStoreContext) context, root); - } else { - throw new UnsupportedOperationException( - "Use ResponsiveSessionStore#init(StateStoreContext, StateStore) instead." - ); - } - } - @Override public void init(final StateStoreContext context, final StateStore root) { StateStoreContext proxy = (StateStoreContext) Proxy.newProxyInstance( diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/SessionStoreComparator.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/SessionStoreComparator.java index 81e81f3bc..d077b970c 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/SessionStoreComparator.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/SessionStoreComparator.java @@ -18,7 +18,6 @@ import java.util.List; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -78,18 +77,6 @@ public void init(final StateStoreContext storeContext, final StateStore root) { this.candidate.init(proxy, root); } - @Override - @Deprecated - public void init(final ProcessorContext context, final StateStore root) { - if (context instanceof StateStoreContext) { - init((StateStoreContext) context, root); - } else { - throw new UnsupportedOperationException( - "Use ResponsiveSessionStore#init(StateStoreContext, StateStore) instead." - ); - } - } - @Override public void flush() { this.sourceOfTruth.flush(); diff --git a/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java b/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java index fcf029543..8a9ec1b03 100644 --- a/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java +++ b/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java @@ -44,7 +44,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateRestoreListener; @@ -263,16 +262,9 @@ private GlobalStreamThread getThread( final TestStoreSupplier storeSupplier, final StateRestoreListener restoreListener, final File tempDir) { - final Time time = new SystemTime(); + final Time time = Time.SYSTEM; final InternalTopologyBuilder builder = new InternalTopologyBuilder(); builder.addGlobalStore( - new StoreBuilderWrapper( - new KeyValueStoreBuilder<>( - storeSupplier, - new ByteArraySerde(), - new ByteArraySerde(), - time).withLoggingDisabled() - ), "global", null, null, @@ -291,7 +283,8 @@ public void init(final ProcessorContext context) { public void process(final Record record) { global.put(record.key(), record.value()); } - } + }, + false ); final String baseDirectoryName = tempDir.getAbsolutePath(); diff --git a/operator/build.gradle.kts b/operator/build.gradle.kts index 02f2edd4c..57efe793b 100644 --- a/operator/build.gradle.kts +++ b/operator/build.gradle.kts @@ -16,6 +16,10 @@ plugins { id("responsive.helm") } +repositories { + mavenLocal() +} + application { mainClass.set("dev.responsive.k8s.operator.OperatorMain") } diff --git a/operator/src/test/java/dev/responsive/k8s/operator/reconciler/KafkaStreamsPolicyPluginTest.java b/operator/src/test/java/dev/responsive/k8s/operator/reconciler/KafkaStreamsPolicyPluginTest.java index a06493608..06531448c 100644 --- a/operator/src/test/java/dev/responsive/k8s/operator/reconciler/KafkaStreamsPolicyPluginTest.java +++ b/operator/src/test/java/dev/responsive/k8s/operator/reconciler/KafkaStreamsPolicyPluginTest.java @@ -220,7 +220,7 @@ public void shouldSetSecondaryMapperForDeploymentEventSource() { final Optional> src = maybePullSrc(sources, Deployment.class); assert src.isPresent(); - final var s2pMapper = src.get().getConfiguration() + final var s2pMapper = src.get().configuration() .getSecondaryToPrimaryMapper(); final var ids = s2pMapper.toPrimaryResourceIDs(deployment); assertThat(ids, contains(new ResourceID("bar", "foo"))); @@ -235,7 +235,7 @@ public void shouldSetPrimaryToSecondaryMapperForDeploymentEventSource() { final Optional> src = maybePullSrc(sources, Deployment.class); assert src.isPresent(); - final var s2pMapper = src.get().getConfiguration() + final var s2pMapper = src.get().configuration() .getPrimaryToSecondaryMapper(); final var ids = s2pMapper.toSecondaryResourceIDs(policy); assertThat(ids, contains(new ResourceID("baz", "biz"))); @@ -469,7 +469,7 @@ public void shouldSetSecondaryMapperForStatefulSetEventSource() { final Optional> src = maybePullSrc(sources, StatefulSet.class); assert src.isPresent(); - final var s2pMapper = src.get().getConfiguration() + final var s2pMapper = src.get().configuration() .getSecondaryToPrimaryMapper(); final var ids = s2pMapper.toPrimaryResourceIDs(statefulSet); assertThat(ids, contains(new ResourceID("bar", "foo"))); @@ -484,7 +484,7 @@ public void shouldSetPrimaryToSecondaryMapperForStatefulSetEventSource() { final Optional> src = maybePullSrc(sources, StatefulSet.class); assert src.isPresent(); - final var s2pMapper = src.get().getConfiguration() + final var s2pMapper = src.get().configuration() .getPrimaryToSecondaryMapper(); final var ids = s2pMapper.toSecondaryResourceIDs(policy); assertThat(ids, contains(new ResourceID("baz", "biz"))); @@ -634,7 +634,7 @@ private Optional) { - if (((InformerEventSource) source).getConfiguration().getResourceClass() + if (((InformerEventSource) source).configuration().getResourceClass() .equals(clazz)) { return Optional.of((InformerEventSource) source); } diff --git a/responsive-spring/build.gradle.kts b/responsive-spring/build.gradle.kts index 8d25b1f15..101fd9fa0 100644 --- a/responsive-spring/build.gradle.kts +++ b/responsive-spring/build.gradle.kts @@ -39,6 +39,10 @@ java { } } +repositories { + mavenLocal() +} + version = project(":kafka-client").version dependencies { diff --git a/responsive-spring/src/test/java/dev/responsive/spring/KafkaStreamsApp.java b/responsive-spring/src/test/java/dev/responsive/spring/KafkaStreamsApp.java index 16791cde4..ca2fe6e81 100644 --- a/responsive-spring/src/test/java/dev/responsive/spring/KafkaStreamsApp.java +++ b/responsive-spring/src/test/java/dev/responsive/spring/KafkaStreamsApp.java @@ -68,7 +68,7 @@ public KafkaStreamsConfiguration streamsConfigs() { props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.MONGO_DB.name()); props.put(ResponsiveConfig.MONGO_ENDPOINT_CONFIG, "mongodb://localhost:27017"); - props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, RecoveringDeserializationExceptionHandler.class); props.put( RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, diff --git a/responsive-test-utils/build.gradle.kts b/responsive-test-utils/build.gradle.kts index bd9397b06..a75582d65 100644 --- a/responsive-test-utils/build.gradle.kts +++ b/responsive-test-utils/build.gradle.kts @@ -14,6 +14,10 @@ plugins { id("responsive.java-library-conventions") } +repositories { + mavenLocal() +} + version = project(":kafka-client").version dependencies { diff --git a/settings.gradle.kts b/settings.gradle.kts index 72225c6d0..b38407e44 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -43,7 +43,7 @@ dependencyResolutionManagement { versionCatalogs { create("libs") { version("jackson", "2.15.2") - version("kafka", "3.7.1") + version("kafka", "4.0.0-SNAPSHOT") version("scylla", "4.15.0.0") version("javaoperatorsdk", "4.9.6") version("grpc", "1.52.1")