diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java
index a963f3829..d0108d1ea 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java
@@ -32,6 +32,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
+import dev.responsive.kafka.api.async.internals.AsyncUtils;
 import dev.responsive.kafka.api.config.CompatibilityMode;
 import dev.responsive.kafka.api.config.ResponsiveConfig;
 import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers;
@@ -510,26 +511,16 @@ public Params withTime(final Time time) {
     // that it's impossible to use a Params instance that hasn't called build(),
     // but that felt a little extra
     public Params build() {
-      final int asyncThreadPoolSize = responsiveConfig.getInt(ASYNC_THREAD_POOL_SIZE_CONFIG);
-
-      final KafkaClientSupplier delegateKafkaClientSupplier;
-      if (asyncThreadPoolSize > 0) {
-
-        final AsyncThreadPoolRegistry asyncRegistry = new AsyncThreadPoolRegistry(
-            streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG),
-            asyncThreadPoolSize,
-            responsiveConfig.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG),
-            metrics
-        );
-        delegateKafkaClientSupplier = new AsyncStreamsKafkaClientSupplier(
-            innerClientSupplier,
-            asyncRegistry
-        );
-        this.asyncThreadPoolRegistry = Optional.of(asyncRegistry);
-      } else {
-        delegateKafkaClientSupplier = innerClientSupplier;
-        this.asyncThreadPoolRegistry = Optional.empty();
-      }
+      this.asyncThreadPoolRegistry = AsyncUtils.configuredAsyncThreadPool(
+          responsiveConfig,
+          streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG),
+          metrics
+      );
+      final KafkaClientSupplier delegateKafkaClientSupplier =
+          asyncThreadPoolRegistry.isPresent() ? new AsyncStreamsKafkaClientSupplier(
+              innerClientSupplier,
+              asyncThreadPoolRegistry.get()
+          ) : innerClientSupplier;

       this.responsiveKafkaClientSupplier = new ResponsiveKafkaClientSupplier(
           delegateKafkaClientSupplier,
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java
index 84647e97d..9f31be5cf 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java
@@ -182,6 +182,7 @@ public void scheduleForProcessing(
         = inFlight.computeIfAbsent(asyncProcessorId, k -> new ConcurrentHashMap<>());

     for (final AsyncEvent event : events) {
+      log.info("Scheduled event {}", event.inputRecord());
       try {
         queueSemaphore.acquire();
       } catch (final InterruptedException e) {
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java
index 3ae987e5c..9b8d5d3f6 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java
@@ -13,13 +13,18 @@
 package dev.responsive.kafka.api.async.internals;

 import static dev.responsive.kafka.api.async.internals.AsyncThreadPool.ASYNC_THREAD_NAME;
+import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG;
+import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG;

 import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder;
+import dev.responsive.kafka.api.config.ResponsiveConfig;
+import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
 import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder;
 import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.StoreType;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -130,4 +135,24 @@ public static int processorRecordContextHashCode(
     return result;
   }

+  public static Optional<AsyncThreadPoolRegistry> configuredAsyncThreadPool(
+      final ResponsiveConfig responsiveConfig,
+      final int numStreamThreads,
+      final ResponsiveMetrics metrics
+  ) {
+    final int asyncThreadPoolSize = responsiveConfig.getInt(ASYNC_THREAD_POOL_SIZE_CONFIG);
+
+    if (asyncThreadPoolSize > 0) {
+      final AsyncThreadPoolRegistry asyncRegistry = new AsyncThreadPoolRegistry(
+          numStreamThreads,
+          asyncThreadPoolSize,
+          responsiveConfig.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG),
+          metrics
+      );
+      return Optional.of(asyncRegistry);
+    } else {
+      return Optional.empty();
+    }
+  }
+
 }
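For context on the new helper above: callers are expected to treat the returned Optional as the on/off switch for async processing, mirroring the test-driver change below. A minimal sketch, assuming a ResponsiveConfig, a stream-thread count, and a ResponsiveMetrics instance are already in scope (the variable names and the "example-worker" pool name are illustrative, not part of this change):

    // Resolve the registry from config; empty means ASYNC_THREAD_POOL_SIZE_CONFIG
    // was not set to a positive value, so callers keep the plain (non-async) path.
    final Optional<AsyncThreadPoolRegistry> registry =
        AsyncUtils.configuredAsyncThreadPool(responsiveConfig, numStreamThreads, metrics);

    // Only start a pool when async processing is actually enabled.
    registry.ifPresent(r -> r.startNewAsyncThreadPool("example-worker"));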
diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java
index 73caed274..0acfde7b1 100644
--- a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java
+++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java
@@ -14,15 +14,21 @@
 import static dev.responsive.kafka.internal.stores.TTDRestoreListener.mockRestoreListener;

+import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
+import dev.responsive.kafka.api.async.internals.AsyncUtils;
 import dev.responsive.kafka.api.config.ResponsiveConfig;
 import dev.responsive.kafka.internal.clients.TTDCassandraClient;
 import dev.responsive.kafka.internal.clients.TTDMockAdmin;
 import dev.responsive.kafka.internal.config.InternalSessionConfigs;
+import dev.responsive.kafka.internal.metrics.ClientVersionMetadata;
+import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
 import dev.responsive.kafka.internal.utils.SessionClients;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
@@ -139,12 +145,28 @@ private static Properties testDriverProps(
     final var restoreListener = mockRestoreListener(props);
     sessionClients.initialize(restoreListener.metrics(), restoreListener);

-    props.putAll(new InternalSessionConfigs.Builder()
+    final var metrics = new ResponsiveMetrics(new Metrics());
+    final String appId = userProps.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+    metrics.initializeTags(
+        appId,
+        appId + "-client",
+        ClientVersionMetadata.loadVersionMetadata(),
+        Collections.emptyMap()
+    );
+
+    final var sessionConfig = new InternalSessionConfigs.Builder()
         .withSessionClients(sessionClients)
         .withStoreRegistry(client.storeRegistry())
-        .withTopologyDescription(topologyDescription)
-        .build()
-    );
+        .withMetrics(metrics)
+        .withTopologyDescription(topologyDescription);
+
+    AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(props), 1, metrics)
+        .ifPresent(threadPoolRegistry -> {
+          threadPoolRegistry.startNewAsyncThreadPool("Test worker");
+          sessionConfig.withAsyncThreadPoolRegistry(threadPoolRegistry);
+        });
+
+    props.putAll(sessionConfig.build());

     return props;
   }
diff --git a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java
index 5900301ab..c5c918e31 100644
--- a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java
+++ b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java
@@ -12,10 +12,12 @@
 package dev.responsive.kafka.api;

+import static dev.responsive.kafka.api.async.AsyncFixedKeyProcessorSupplier.createAsyncProcessorSupplier;
 import static dev.responsive.kafka.testutils.processors.Deduplicator.deduplicatorApp;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;

+import dev.responsive.kafka.api.config.ResponsiveConfig;
 import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
 import dev.responsive.kafka.api.stores.ResponsiveStores;
 import dev.responsive.kafka.api.stores.TtlProvider;
@@ -38,6 +40,9 @@
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.hamcrest.MatcherAssert;
@@ -152,7 +157,8 @@ public void shouldEnforceKeyBasedTtlByAdvancingStreamTime(final KVSchema type) {

   @ParameterizedTest
   @EnumSource(SchemaTypes.KVSchema.class)
-  public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type) {
+  public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type)
+      throws InterruptedException {
     // Given:
     final Duration defaultTtl = Duration.ofMillis(15);

@@ -261,6 +267,7 @@ private ResponsiveTopologyTestDriver setupDriver(final Topology topology) {
     final Properties props = new Properties();
     props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
     props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+    props.put(ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, 2);
     return new ResponsiveTopologyTestDriver(topology, props, STARTING_TIME);
   }

@@ -283,6 +290,20 @@ private Topology topology(final ResponsiveKeyValueParams storeParams) {
         .join(people, (bid, person) -> bid + "," + person)
         // state is the 6th column
         .filter((k, v) -> v.split(",")[5].equals("CA"))
+        .processValues(createAsyncProcessorSupplier(() -> new FixedKeyProcessor<String, String, String>() {
+
+          private FixedKeyProcessorContext<String, String> context;
+
+          @Override
+          public void init(final FixedKeyProcessorContext<String, String> context) {
+            this.context = context;
+          }
+
+          @Override
+          public void process(final FixedKeyRecord<String, String> fixedKeyRecord) {
+            context.forward(fixedKeyRecord);
+          }
+        }))
         .to("output");

     return builder.build();
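Taken together, a test only has to opt in through configuration; the rest of the wiring above is automatic. A minimal sketch of the opt-in, assuming a topology built with createAsyncProcessorSupplier as in the test above (the topic name, start time, and serializers are illustrative, and a non-positive or unset pool size is assumed to keep the previous synchronous behavior):

    final Properties props = new Properties();
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
    // A positive pool size makes the driver call AsyncUtils.configuredAsyncThreadPool(...)
    // and start a "Test worker" pool; otherwise it skips the async wiring entirely.
    props.put(ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, 2);

    try (final var driver = new ResponsiveTopologyTestDriver(topology, props, Instant.now())) {
      final var input = driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
      input.pipeInput("key", "value");
      // read from the output topic and assert as in any TopologyTestDriver test
    }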