From ee47f1261160c3d06260edcc71d57e1e146c8f79 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 9 Dec 2024 17:53:11 -0800 Subject: [PATCH] Support async in Responsive TTD (#400) Combination of #398 and #399 for the full fix --- .../kafka/api/ResponsiveKafkaStreams.java | 33 +++----- .../api/async/internals/AsyncProcessor.java | 19 +---- .../api/async/internals/AsyncThreadPool.java | 1 + .../kafka/api/async/internals/AsyncUtils.java | 41 +++++++++ .../internal/db/partitioning/Segmenter.java | 4 + .../api/ResponsiveTopologyTestDriver.java | 84 ++++++++++++++++--- .../internal/clients/TTDCassandraClient.java | 1 - .../kafka/internal/db/TTDWindowTable.java | 8 +- .../internal/stores/WindowStoreStub.java | 7 +- .../org/apache/kafka/streams/TTDUtils.java | 32 +++++++ ...veTopologyTestDriverKeyValueStoreTest.java | 30 ++++++- 11 files changed, 198 insertions(+), 62 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index a963f3829..c908d7fba 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -12,8 +12,6 @@ package dev.responsive.kafka.api; -import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG; -import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.METRICS_ENABLED_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG; @@ -32,6 +30,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry; +import dev.responsive.kafka.api.async.internals.AsyncUtils; import dev.responsive.kafka.api.config.CompatibilityMode; import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers; @@ -510,26 +509,16 @@ public Params withTime(final Time time) { // that it's impossible to use a Params instance that hasn't called build(), // but that felt a little extra public Params build() { - final int asyncThreadPoolSize = responsiveConfig.getInt(ASYNC_THREAD_POOL_SIZE_CONFIG); - - final KafkaClientSupplier delegateKafkaClientSupplier; - if (asyncThreadPoolSize > 0) { - - final AsyncThreadPoolRegistry asyncRegistry = new AsyncThreadPoolRegistry( - streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG), - asyncThreadPoolSize, - responsiveConfig.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG), - metrics - ); - delegateKafkaClientSupplier = new AsyncStreamsKafkaClientSupplier( - innerClientSupplier, - asyncRegistry - ); - this.asyncThreadPoolRegistry = Optional.of(asyncRegistry); - } else { - delegateKafkaClientSupplier = innerClientSupplier; - this.asyncThreadPoolRegistry = Optional.empty(); - } + this.asyncThreadPoolRegistry = AsyncUtils.configuredAsyncThreadPool( + responsiveConfig, + streamsConfig.getInt(NUM_STREAM_THREADS_CONFIG), + metrics + ); + final KafkaClientSupplier delegateKafkaClientSupplier = + asyncThreadPoolRegistry.isPresent() ? new AsyncStreamsKafkaClientSupplier( + innerClientSupplier, + asyncThreadPoolRegistry.get() + ) : innerClientSupplier; this.responsiveKafkaClientSupplier = new ResponsiveKafkaClientSupplier( delegateKafkaClientSupplier, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java index 81e3ba163..00efebc0d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java @@ -12,9 +12,9 @@ package dev.responsive.kafka.api.async.internals; +import static dev.responsive.kafka.api.async.internals.AsyncUtils.getAsyncThreadPool; import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_FLUSH_INTERVAL_MS_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG; -import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry; import dev.responsive.kafka.api.async.AsyncProcessorSupplier; import dev.responsive.kafka.api.async.internals.contexts.AsyncUserProcessorContext; @@ -40,7 +40,6 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; @@ -218,7 +217,7 @@ private void initFields( final long punctuationInterval = configs.getLong(ASYNC_FLUSH_INTERVAL_MS_CONFIG); final int maxEventsPerKey = configs.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG); - this.asyncThreadPoolRegistration = getAsyncThreadPool(taskContext, streamThreadName); + this.asyncThreadPoolRegistration = getAsyncThreadPool(appConfigs, streamThreadName); asyncThreadPoolRegistration.registerAsyncProcessor(taskId, this::flushPendingEventsForCommit); asyncThreadPoolRegistration.threadPool().maybeInitThreadPoolMetrics(); @@ -756,17 +755,5 @@ private void verifyConnectedStateStores( } } - private static AsyncThreadPoolRegistration getAsyncThreadPool( - final ProcessingContext context, - final String streamThreadName - ) { - try { - final AsyncThreadPoolRegistry registry = loadAsyncThreadPoolRegistry(context.appConfigs()); - return registry.asyncThreadPoolForStreamThread(streamThreadName); - } catch (final Exception e) { - throw new ConfigException( - "Unable to locate async thread pool registry. Make sure to configure " - + ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, e); - } - } + } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java index 84647e97d..860dcd547 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java @@ -182,6 +182,7 @@ public void scheduleForProcessing( = inFlight.computeIfAbsent(asyncProcessorId, k -> new ConcurrentHashMap<>()); for (final AsyncEvent event : events) { + log.trace("Scheduled event {}", event.inputRecord()); try { queueSemaphore.acquire(); } catch (final InterruptedException e) { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java index 3ae987e5c..926237fcf 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java @@ -13,14 +13,21 @@ package dev.responsive.kafka.api.async.internals; import static dev.responsive.kafka.api.async.internals.AsyncThreadPool.ASYNC_THREAD_NAME; +import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG; +import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; +import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder; import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.StoreType; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.state.StoreBuilder; @@ -130,4 +137,38 @@ public static int processorRecordContextHashCode( return result; } + public static Optional configuredAsyncThreadPool( + final ResponsiveConfig responsiveConfig, + final int numStreamThreads, + final ResponsiveMetrics metrics + ) { + final int asyncThreadPoolSize = responsiveConfig.getInt(ASYNC_THREAD_POOL_SIZE_CONFIG); + + if (asyncThreadPoolSize > 0) { + final AsyncThreadPoolRegistry asyncRegistry = new AsyncThreadPoolRegistry( + numStreamThreads, + asyncThreadPoolSize, + responsiveConfig.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG), + metrics + ); + return Optional.of(asyncRegistry); + } else { + return Optional.empty(); + } + } + + public static AsyncThreadPoolRegistration getAsyncThreadPool( + final Map appConfigs, + final String streamThreadName + ) { + try { + final AsyncThreadPoolRegistry registry = loadAsyncThreadPoolRegistry(appConfigs); + return registry.asyncThreadPoolForStreamThread(streamThreadName); + } catch (final Exception e) { + throw new ConfigException( + "Unable to locate async thread pool registry. Make sure to configure " + + ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, e); + } + } + } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java index a3142576d..1c24d3643 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java @@ -143,6 +143,10 @@ public Segmenter( ); } + public long retentionPeriodMs() { + return retentionPeriodMs; + } + public long segmentIntervalMs() { return segmentIntervalMs; } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java index 73caed274..3ac86d74f 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java @@ -12,28 +12,42 @@ package dev.responsive.kafka.api; +import static dev.responsive.kafka.api.async.internals.AsyncUtils.getAsyncThreadPool; +import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG; import static dev.responsive.kafka.internal.stores.TTDRestoreListener.mockRestoreListener; +import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistration; +import dev.responsive.kafka.api.async.internals.AsyncUtils; import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.clients.TTDCassandraClient; import dev.responsive.kafka.internal.clients.TTDMockAdmin; import dev.responsive.kafka.internal.config.InternalSessionConfigs; +import dev.responsive.kafka.internal.metrics.ClientVersionMetadata; +import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; import dev.responsive.kafka.internal.utils.SessionClients; import java.time.Duration; import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.Properties; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TTDUtils.TopologyTestDriverAccessor; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.test.TestRecord; -public class ResponsiveTopologyTestDriver extends TopologyTestDriver { +public class ResponsiveTopologyTestDriver extends TopologyTestDriverAccessor { public static final String RESPONSIVE_TTD_ORG = "Responsive"; public static final String RESPONSIVE_TTD_ENV = "TopologyTestDriver"; private final TTDCassandraClient client; + private final Optional asyncThreadPool; /** * Create a new test diver instance with default test properties. @@ -91,7 +105,7 @@ public ResponsiveTopologyTestDriver( ) { this( topology, - config, + baseProps(config), initialWallClockTime, new TTDCassandraClient( new TTDMockAdmin(baseProps(config), topology), @@ -109,9 +123,16 @@ public ResponsiveTopologyTestDriver( @Override public void advanceWallClockTime(final Duration advance) { client.advanceWallClockTime(advance); + client.flush(); super.advanceWallClockTime(advance); } + public void flush() { + asyncThreadPool.ifPresent(AsyncThreadPoolRegistration::flushAllAsyncEvents); + client.flush(); + super.advanceWallClockTime(Duration.ZERO); + } + private ResponsiveTopologyTestDriver( final Topology topology, final Properties config, @@ -124,28 +145,55 @@ private ResponsiveTopologyTestDriver( initialWallClockTime ); this.client = cassandraClient; + this.asyncThreadPool = getAsyncThreadPoolRegistration(super.props()); + } + + @Override + protected void pipeRecord( + final String topic, + final TestRecord record, + final Serializer keySerializer, + final Serializer valueSerializer, + final Instant time + ) { + super.pipeRecord(topic, record, keySerializer, valueSerializer, time); + flush(); } private static Properties testDriverProps( - final Properties userProps, + final Properties baseProps, final TopologyDescription topologyDescription, final TTDCassandraClient client ) { - final Properties props = baseProps(userProps); - final SessionClients sessionClients = new SessionClients( Optional.empty(), Optional.of(client), Optional.empty(), false, client.mockAdmin() ); - final var restoreListener = mockRestoreListener(props); + final var restoreListener = mockRestoreListener(baseProps); sessionClients.initialize(restoreListener.metrics(), restoreListener); - props.putAll(new InternalSessionConfigs.Builder() + final var metrics = new ResponsiveMetrics(new Metrics()); + final String appId = baseProps.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); + metrics.initializeTags( + appId, + appId + "-client", + ClientVersionMetadata.loadVersionMetadata(), + Collections.emptyMap() + ); + + final var sessionConfig = new InternalSessionConfigs.Builder() .withSessionClients(sessionClients) .withStoreRegistry(client.storeRegistry()) - .withTopologyDescription(topologyDescription) - .build() - ); - return props; + .withMetrics(metrics) + .withTopologyDescription(topologyDescription); + + AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(baseProps), 1, metrics) + .ifPresent(threadPoolRegistry -> { + threadPoolRegistry.startNewAsyncThreadPool(Thread.currentThread().getName()); + sessionConfig.withAsyncThreadPoolRegistry(threadPoolRegistry); + }); + + baseProps.putAll(sessionConfig.build()); + return baseProps; } @SuppressWarnings("deprecation") @@ -175,6 +223,20 @@ private static MockTime mockTime(final Instant initialWallClockTime) { return mockTime; } + private static Optional getAsyncThreadPoolRegistration( + final Properties props + ) { + final int asyncThreadPoolSize = (int) props.getOrDefault(ASYNC_THREAD_POOL_SIZE_CONFIG, 0); + + if (asyncThreadPoolSize > 0) { + final Map configMap = new HashMap<>(); + // stupid conversion to deal with Map vs Properties type discrepancy + props.forEach((key, value) -> configMap.put(key.toString(), value)); + return Optional.of(getAsyncThreadPool(configMap, Thread.currentThread().getName())); + } else { + return Optional.empty(); + } + } } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java index 57a46328b..aa027fc61 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java @@ -67,7 +67,6 @@ public long currentWallClockTimeMs() { } public void advanceWallClockTime(final Duration advance) { - flush(); time.sleep(advance.toMillis()); } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java index 09571bc62..94fbf0128 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java @@ -46,7 +46,7 @@ public TTDWindowTable( ) { super(client); this.name = spec.tableName(); - this.stub = new WindowStoreStub(); + this.stub = new WindowStoreStub(partitioner.segmenter().retentionPeriodMs()); this.partitioner = partitioner; } @@ -207,17 +207,17 @@ protected RemoteWriteResult updateOffsetAndStreamTime( final long consumedOffset, final long streamTime ) { - return null; + return RemoteWriteResult.success(null); } @Override protected RemoteWriteResult createSegment(final SegmentPartition partition) { - return null; + return RemoteWriteResult.success(null); } @Override protected RemoteWriteResult deleteSegment(final SegmentPartition partition) { - return null; + return RemoteWriteResult.success(null); } } } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java index b2abaa74e..dd2a1303c 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java @@ -28,9 +28,8 @@ public class WindowStoreStub { private final long retentionPeriod; private long observedStreamTime = 0L; - public WindowStoreStub() { - // TODO: how can we pass the actual retention period through to the store stub? - this.retentionPeriod = 15L; + public WindowStoreStub(final long retentionPeriod) { + this.retentionPeriod = retentionPeriod; } public void put(final WindowedKey key, final byte[] value) { @@ -48,7 +47,7 @@ public byte[] fetch( final long windowStart ) { final WindowedKey windowedKey = new WindowedKey(key, windowStart); - if (windowStart < minValidTimestamp() && records.containsKey(windowedKey)) { + if (windowStart > minValidTimestamp() && records.containsKey(windowedKey)) { return records.get(windowedKey); } else { return null; diff --git a/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java b/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java index 51136525b..ab42a6333 100644 --- a/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java +++ b/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java @@ -14,9 +14,13 @@ import static org.apache.kafka.streams.processor.internals.ProcessorStateManager.storeChangelogTopic; +import java.time.Instant; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.test.TestRecord; /** * A utility class that lives in the o.a.k.streams package so we can access @@ -47,4 +51,32 @@ public static Set extractChangelogTopics(final Topology topology) { .flatMap(t -> t.stateChangelogTopics.keySet().stream()) .collect(Collectors.toSet()); } + + public static class TopologyTestDriverAccessor extends TopologyTestDriver { + + private final Properties props; + + public TopologyTestDriverAccessor( + final Topology topology, + final Properties config, + final Instant initialWallClockTime + ) { + super(topology, config, initialWallClockTime); + this.props = config; + } + + public Properties props() { + return props; + } + + @Override + protected void pipeRecord(final String topic, + final TestRecord record, + final Serializer keySerializer, + final Serializer valueSerializer, + final Instant time) { + super.pipeRecord(topic, record, keySerializer, valueSerializer, time); + } + + } } diff --git a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java index 5900301ab..3f49a91ca 100644 --- a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java +++ b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java @@ -12,10 +12,12 @@ package dev.responsive.kafka.api; +import static dev.responsive.kafka.api.async.AsyncFixedKeyProcessorSupplier.createAsyncProcessorSupplier; import static dev.responsive.kafka.testutils.processors.Deduplicator.deduplicatorApp; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; import dev.responsive.kafka.api.stores.ResponsiveStores; import dev.responsive.kafka.api.stores.TtlProvider; @@ -38,6 +40,9 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.hamcrest.MatcherAssert; @@ -61,7 +66,7 @@ private static ResponsiveKeyValueParams paramsForType(final KVSchema type) { public void shouldRunWithoutResponsiveConnectionAndNoTtl(final KVSchema type) { // Given: final Topology topology = topology(paramsForType(type)); - try (final TopologyTestDriver driver = setupDriver(topology)) { + try (final ResponsiveTopologyTestDriver driver = setupDriver(topology)) { final TestInputTopic bids = driver.createInputTopic( "bids", new StringSerializer(), new StringSerializer()); @@ -152,7 +157,8 @@ public void shouldEnforceKeyBasedTtlByAdvancingStreamTime(final KVSchema type) { @ParameterizedTest @EnumSource(SchemaTypes.KVSchema.class) - public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type) { + public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type) + throws InterruptedException { // Given: final Duration defaultTtl = Duration.ofMillis(15); @@ -200,7 +206,7 @@ public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type // Then: final List outputs = output.readValuesToList(); - MatcherAssert.assertThat(outputs, Matchers.contains( + MatcherAssert.assertThat(outputs, Matchers.containsInAnyOrder( "a,100,1,1,alice,CA", "d,103,3,3,carol,CA", "e,104,1,1,alex,CA", @@ -251,7 +257,7 @@ public void shouldDeduplicateWithTtlProviderToExpireOldRecords(final KVSchema ty assertNull(transactionIdStore.get(key), "should have no txn id in state store"); // send the same event in again, outside the dedupe window - inputTopic.pipeInput(key, value); + inputTopic.pipeInput(key, value, ttlMs + 1); assertNotNull( transactionIdStore.get(key), "should have a single txn id in state store, again"); } @@ -261,6 +267,7 @@ private ResponsiveTopologyTestDriver setupDriver(final Topology topology) { final Properties props = new Properties(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + props.put(ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, 2); return new ResponsiveTopologyTestDriver(topology, props, STARTING_TIME); } @@ -283,6 +290,21 @@ private Topology topology(final ResponsiveKeyValueParams storeParams) { .join(people, (bid, person) -> bid + "," + person) // state is the 6th column .filter((k, v) -> v.split(",")[5].equals("CA")) + .processValues(createAsyncProcessorSupplier( + () -> new FixedKeyProcessor() { + + private FixedKeyProcessorContext context; + + @Override + public void init(final FixedKeyProcessorContext context) { + this.context = context; + } + + @Override + public void process(final FixedKeyRecord fixedKeyRecord) { + context.forward(fixedKeyRecord); + } + })) .to("output"); return builder.build();