diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts index f6b7c2fa5..b084fe206 100644 --- a/kafka-client/build.gradle.kts +++ b/kafka-client/build.gradle.kts @@ -62,6 +62,19 @@ tasks.publish { dependsOn(tasks[writeVersionPropertiesFile]) } +configurations { + create("testArtifacts") +} + +tasks.register("testJar") { + from(sourceSets["test"].output) + archiveClassifier.set("test") +} + +artifacts { + add("testArtifacts", tasks["testJar"]) +} + /********************************************/ dependencies { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/RowLevelTtlIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/RowLevelTtlIntegrationTest.java index 9c82a6ff4..cff676c95 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/RowLevelTtlIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/RowLevelTtlIntegrationTest.java @@ -21,6 +21,7 @@ import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeTimestampedRecords; import static dev.responsive.kafka.testutils.IntegrationTestUtils.readOutputWithTimestamps; import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning; +import static dev.responsive.kafka.testutils.processors.Deduplicator.deduplicatorApp; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; @@ -39,33 +40,23 @@ import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.config.StorageBackend; import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; -import dev.responsive.kafka.api.stores.ResponsiveStores; import dev.responsive.kafka.api.stores.TtlProvider; import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration; import dev.responsive.kafka.testutils.KeyValueTimestamp; import dev.responsive.kafka.testutils.ResponsiveConfigParam; import dev.responsive.kafka.testutils.ResponsiveExtension; import java.time.Duration; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.ProcessorContext; -import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -202,35 +193,13 @@ public void shouldApplyRowLevelTtlForKeyAndValue() throws Exception { } private ResponsiveKafkaStreams buildStreams(final Map properties) { - final StreamsBuilder builder = new StreamsBuilder(); - - final KStream input = builder.stream(inputTopic()); - input.process(new TtlProcessorSupplier(), STORE_NAME) - .to(outputTopic()); - - return new ResponsiveKafkaStreams(builder.build(), properties); - } - - @SuppressWarnings("checkstyle:linelength") - private static class TtlProcessorSupplier implements ProcessorSupplier { - - @Override - public Processor get() { - return new TtlDeduplicator(); - } + final var params = ResponsiveKeyValueParams.fact(STORE_NAME).withTtlProvider( + TtlProvider.>withDefault(DEFAULT_TTL) + .fromKeyAndValue(RowLevelTtlIntegrationTest::ttlForKeyAndValue) + ); + final Topology topology = deduplicatorApp(inputTopic(), outputTopic(), params); - @Override - public Set> stores() { - return Collections.singleton(ResponsiveStores.timestampedKeyValueStoreBuilder( - ResponsiveStores.keyValueStore( - ResponsiveKeyValueParams.fact(STORE_NAME).withTtlProvider( - TtlProvider.>withDefault(DEFAULT_TTL) - .fromKeyAndValue(RowLevelTtlIntegrationTest::ttlForKeyAndValue) - )), - Serdes.String(), - Serdes.String() - )); - } + return new ResponsiveKafkaStreams(topology, properties); } private static Optional ttlForKeyAndValue( @@ -266,31 +235,6 @@ private static Optional ttlForKeyAndValue( return Optional.empty(); } - private static class TtlDeduplicator implements Processor { - - private ProcessorContext context; - private TimestampedKeyValueStore ttlStore; - - @Override - public void init(final ProcessorContext context) { - this.context = context; - this.ttlStore = context.getStateStore(STORE_NAME); - } - - @Override - public void process(final Record record) { - final ValueAndTimestamp previous = ttlStore.putIfAbsent( - record.key(), - ValueAndTimestamp.make(record.value(), record.timestamp()) - ); - - if (previous == null) { - context.forward(record); - } - - } - } - private Map getMutableProperties() { final Map properties = new HashMap<>(responsiveProps); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTableTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTableTest.java index 9d037338f..583969a7b 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTableTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTableTest.java @@ -1,8 +1,8 @@ package dev.responsive.kafka.internal.db.inmemory; -import static dev.responsive.kafka.internal.db.testutils.Matchers.sameKeyValue; import static dev.responsive.kafka.internal.stores.TtlResolver.NO_TTL; import static dev.responsive.kafka.testutils.IntegrationTestUtils.defaultOnlyTtl; +import static dev.responsive.kafka.testutils.Matchers.sameKeyValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java index 987de7870..3cf1b1057 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java @@ -17,9 +17,9 @@ package dev.responsive.kafka.internal.db.mongo; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG; -import static dev.responsive.kafka.internal.db.testutils.Matchers.sameKeyValue; import static dev.responsive.kafka.internal.stores.TtlResolver.NO_TTL; import static dev.responsive.kafka.testutils.IntegrationTestUtils.defaultOnlyTtl; +import static dev.responsive.kafka.testutils.Matchers.sameKeyValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTableTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTableTest.java index 748c21386..74ed2fa9d 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTableTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTableTest.java @@ -1,7 +1,7 @@ package dev.responsive.kafka.internal.db.mongo; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG; -import static dev.responsive.kafka.internal.db.testutils.Matchers.sameKeyValue; +import static dev.responsive.kafka.testutils.Matchers.sameKeyValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/testutils/Matchers.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/Matchers.java similarity index 62% rename from kafka-client/src/test/java/dev/responsive/kafka/internal/db/testutils/Matchers.java rename to kafka-client/src/test/java/dev/responsive/kafka/testutils/Matchers.java index c2eebafa0..c3c867a28 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/testutils/Matchers.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/Matchers.java @@ -1,4 +1,20 @@ -package dev.responsive.kafka.internal.db.testutils; +/* + * Copyright 2024 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.testutils; import java.util.Arrays; import org.apache.kafka.streams.KeyValue; diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/processors/Deduplicator.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/processors/Deduplicator.java new file mode 100644 index 000000000..11689ec31 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/processors/Deduplicator.java @@ -0,0 +1,84 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.testutils.processors; + +import static dev.responsive.kafka.testutils.processors.GenericProcessorSuppliers.getFixedKeySupplier; + +import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; +import dev.responsive.kafka.api.stores.ResponsiveStores; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +public class Deduplicator { + + public static Topology deduplicatorApp( + final String inputTopicName, + final String outputTopicName, + final ResponsiveKeyValueParams params + ) { + final StreamsBuilder builder = new StreamsBuilder(); + final KStream input = builder.stream(inputTopicName); + + final var storeBuilder = ResponsiveStores.timestampedKeyValueStoreBuilder( + ResponsiveStores.keyValueStore(params), Serdes.String(), Serdes.String() + ); + final String storeName = params.name().kafkaName(); + input + .processValues(getFixedKeySupplier(DeduplicatorProcessor::new, storeBuilder), storeName) + .to(outputTopicName); + + return builder.build(); + } + + private static class DeduplicatorProcessor implements FixedKeyProcessor { + + private final String storeName; + + private FixedKeyProcessorContext context; + private TimestampedKeyValueStore ttlStore; + + public DeduplicatorProcessor(final String storeName) { + this.storeName = storeName; + } + + @Override + public void init(final FixedKeyProcessorContext context) { + this.context = context; + this.ttlStore = context.getStateStore(storeName); + } + + @Override + public void process(final FixedKeyRecord record) { + final ValueAndTimestamp previous = ttlStore.putIfAbsent( + record.key(), + ValueAndTimestamp.make(record.value(), record.timestamp()) + ); + + if (previous == null) { + context.forward(record); + } + + } + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/processors/GenericProcessorSuppliers.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/processors/GenericProcessorSuppliers.java new file mode 100644 index 000000000..ed2c4409e --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/processors/GenericProcessorSuppliers.java @@ -0,0 +1,100 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.testutils.processors; + +import java.util.Collections; +import java.util.Set; +import java.util.function.Function; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.state.StoreBuilder; + +public class GenericProcessorSuppliers { + + public static ProcessorSupplier getSupplier( + final Function> processorForStoreName, + final StoreBuilder storeBuilder + ) { + return new GenericProcessorSupplier<>(processorForStoreName, storeBuilder); + } + + public static FixedKeyProcessorSupplier getFixedKeySupplier( + final Function> processorForStoreName, + final StoreBuilder storeBuilder + ) { + return new GenericFixedKeyProcessorSupplier<>(processorForStoreName, storeBuilder); + } + + private static class GenericProcessorSupplier + implements ProcessorSupplier { + + private final Function> processorForStoreName; + private final StoreBuilder storeBuilder; + + public GenericProcessorSupplier( + final Function> processorForStoreName, + final StoreBuilder storeBuilder + ) { + this.processorForStoreName = processorForStoreName; + this.storeBuilder = storeBuilder; + } + + @Override + public Processor get() { + return processorForStoreName.apply(storeBuilder.name()); + } + + @Override + public Set> stores() { + if (storeBuilder != null) { + return Collections.singleton(storeBuilder); + } + return null; + } + } + + private static class GenericFixedKeyProcessorSupplier + implements FixedKeyProcessorSupplier { + + private final Function> processorForStoreName; + private final StoreBuilder storeBuilder; + + public GenericFixedKeyProcessorSupplier( + final Function> processorForStoreName, + final StoreBuilder storeBuilder + ) { + this.processorForStoreName = processorForStoreName; + this.storeBuilder = storeBuilder; + } + + @Override + public FixedKeyProcessor get() { + return processorForStoreName.apply(storeBuilder.name()); + } + + @Override + public Set> stores() { + if (storeBuilder != null) { + return Collections.singleton(storeBuilder); + } + return null; + } + } + +} diff --git a/responsive-test-utils/build.gradle.kts b/responsive-test-utils/build.gradle.kts index 0b0596f9b..059e60266 100644 --- a/responsive-test-utils/build.gradle.kts +++ b/responsive-test-utils/build.gradle.kts @@ -24,11 +24,13 @@ dependencies { implementation(project(":kafka-client")) api(libs.kafka.streams.test.utils) + implementation(libs.bundles.scylla) + implementation(variantOf(libs.kafka.clients) { classifier("test") }) - implementation(libs.bundles.scylla) + testImplementation(project(":kafka-client", configuration = "testArtifacts")) testImplementation(testlibs.bundles.base) testImplementation(libs.bundles.logging) diff --git a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java index 3cc20455c..60c161cc3 100644 --- a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java +++ b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java @@ -16,6 +16,10 @@ package dev.responsive.kafka.api; +import static dev.responsive.kafka.testutils.processors.Deduplicator.deduplicatorApp; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; import dev.responsive.kafka.api.stores.ResponsiveStores; import dev.responsive.kafka.api.stores.TtlProvider; @@ -38,6 +42,8 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.jupiter.params.ParameterizedTest; @@ -45,43 +51,47 @@ public class ResponsiveTopologyTestDriverKeyValueStoreTest { + private static final Instant STARTING_TIME = Instant.EPOCH; + private static final String STORE_NAME = "people"; + private static ResponsiveKeyValueParams paramsForType(final KVSchema type) { return type == KVSchema.KEY_VALUE - ? ResponsiveKeyValueParams.keyValue("people") - : ResponsiveKeyValueParams.fact("people"); + ? ResponsiveKeyValueParams.keyValue(STORE_NAME) + : ResponsiveKeyValueParams.fact(STORE_NAME); } @ParameterizedTest @EnumSource(SchemaTypes.KVSchema.class) public void shouldRunWithoutResponsiveConnectionAndNoTtl(final KVSchema type) { // Given: - final TopologyTestDriver driver = setupDriver(paramsForType(type)); - - final TestInputTopic bids = driver.createInputTopic( - "bids", new StringSerializer(), new StringSerializer()); - final TestInputTopic people = driver.createInputTopic( - "people", new StringSerializer(), new StringSerializer()); - final TestOutputTopic output = driver.createOutputTopic( - "output", new StringDeserializer(), new StringDeserializer()); - - // When: - people.pipeInput("1", "1,alice,CA"); - people.pipeInput("2", "2,bob,OR"); - people.pipeInput("3", "3,carol,CA"); - - bids.pipeInput("a", "a,100,1"); - bids.pipeInput("b", "b,101,2"); - bids.pipeInput("c", "c,102,1"); - bids.pipeInput("d", "d,103,3"); - - // Then: - final List outputs = output.readValuesToList(); - MatcherAssert.assertThat(outputs, Matchers.contains( - "a,100,1,1,alice,CA", - "c,102,1,1,alice,CA", - "d,103,3,3,carol,CA" - )); - driver.close(); + final Topology topology = topology(paramsForType(type)); + try (final TopologyTestDriver driver = setupDriver(topology)) { + + final TestInputTopic bids = driver.createInputTopic( + "bids", new StringSerializer(), new StringSerializer()); + final TestInputTopic people = driver.createInputTopic( + "people", new StringSerializer(), new StringSerializer()); + final TestOutputTopic output = driver.createOutputTopic( + "output", new StringDeserializer(), new StringDeserializer()); + + // When: + people.pipeInput("1", "1,alice,CA"); + people.pipeInput("2", "2,bob,OR"); + people.pipeInput("3", "3,carol,CA"); + + bids.pipeInput("a", "a,100,1"); + bids.pipeInput("b", "b,101,2"); + bids.pipeInput("c", "c,102,1"); + bids.pipeInput("d", "d,103,3"); + + // Then: + final List outputs = output.readValuesToList(); + MatcherAssert.assertThat(outputs, Matchers.contains( + "a,100,1,1,alice,CA", + "c,102,1,1,alice,CA", + "d,103,3,3,carol,CA" + )); + } } @ParameterizedTest @@ -99,45 +109,49 @@ public void shouldEnforceKeyBasedTtlByAdvancingStreamTime(final KVSchema type) { return Optional.empty(); } }); - final TopologyTestDriver driver = setupDriver(paramsForType(type).withTtlProvider(ttlProvider)); - - final TestInputTopic bids = driver.createInputTopic( - "bids", new StringSerializer(), new StringSerializer()); - final TestInputTopic people = driver.createInputTopic( - "people", new StringSerializer(), new StringSerializer()); - final TestOutputTopic output = driver.createOutputTopic( - "output", new StringDeserializer(), new StringDeserializer()); - - // When: - people.pipeInput("0", "0,infinite,CA", 0); // insert time = 0 - people.pipeInput("1", "1,alice,CA", 0); // insert time = 0 - people.pipeInput("2", "2,bob,OR", 5); // insert time = 5 (advances streamTime to 5) - people.pipeInput("3", "3,carol,CA", 10); // insert time = 10 (advances streamTime to 10) - - bids.pipeInput("a", "a,100,1", 10); // streamTime = 10 -- result as alice is not expired - bids.pipeInput("b", "b,101,2", 10); // streamTime = 10 -- result as bob is not expired - - // advance streamTime to 20 - bids.pipeInput("c", "c,102,1", 20); // streamTime = 20 -- no result b/c alice has expired - bids.pipeInput("d", "d,103,3", 20); // streamTime = 20 -- result as carol is not expired - - people.pipeInput("1", "1,alex,CA", 20); // insert streamTime = 20 - bids.pipeInput("e", "e,104,1", 20); // streamTime = 20 -- yes result as alex replaced alice - - // advance streamTime to 30 - bids.pipeInput("f", "f,105,3", 30); // streamTime = 30 -- no result b/c carol has expired - - bids.pipeInput("g", "g,106,0", 30); // streamTime = 30 -- yes result b/c id 0 is infinite - - // Then: - final List outputs = output.readValuesToList(); - MatcherAssert.assertThat(outputs, Matchers.contains( - "a,100,1,1,alice,CA", - "d,103,3,3,carol,CA", - "e,104,1,1,alex,CA", - "g,106,0,0,infinite,CA" - )); - driver.close(); + + final Topology topology = topology(paramsForType(type).withTtlProvider(ttlProvider)); + try (final TopologyTestDriver driver = setupDriver(topology)) { + + final TestInputTopic bids = driver.createInputTopic( + "bids", new StringSerializer(), new StringSerializer()); + final TestInputTopic people = driver.createInputTopic( + "people", new StringSerializer(), new StringSerializer()); + final TestOutputTopic output = driver.createOutputTopic( + "output", new StringDeserializer(), new StringDeserializer()); + + // When: + people.pipeInput("0", "0,infinite,CA", 0); // insert time = 0 + people.pipeInput("1", "1,alice,CA", 0); // insert time = 0 + people.pipeInput("2", "2,bob,OR", 5); // insert time = 5 (advances streamTime to 5) + people.pipeInput("3", "3,carol,CA", 10); // insert time = 10 (advances streamTime to 10) + + bids.pipeInput("a", "a,100,1", 10); // streamTime = 10 -- result as alice is not expired + bids.pipeInput("b", "b,101,2", 10); // streamTime = 10 -- result as bob is not expired + + // advance streamTime to 20 + bids.pipeInput("c", "c,102,1", 20); // streamTime = 20 -- no result b/c alice has expired + bids.pipeInput("d", "d,103,3", 20); // streamTime = 20 -- result as carol is not expired + + people.pipeInput("1", "1,alex,CA", 20); // insert streamTime = 20 + bids.pipeInput( + "e", "e,104,1", 20); // streamTime = 20 -- yes result as alex replaced alice + + // advance streamTime to 30 + bids.pipeInput("f", "f,105,3", 30); // streamTime = 30 -- no result b/c carol has expired + + bids.pipeInput("g", "g,106,0", 30); // streamTime = 30 -- yes result b/c id 0 is infinite + + // Then: + final List outputs = output.readValuesToList(); + MatcherAssert.assertThat(outputs, Matchers.contains( + "a,100,1,1,alice,CA", + "d,103,3,3,carol,CA", + "e,104,1,1,alex,CA", + "g,106,0,0,infinite,CA" + )); + } + } @ParameterizedTest @@ -155,56 +169,104 @@ public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type return Optional.empty(); } }); - final TopologyTestDriver driver = setupDriver(paramsForType(type).withTtlProvider(ttlProvider)); + final Topology topology = topology(paramsForType(type).withTtlProvider(ttlProvider)); + try (final TopologyTestDriver driver = setupDriver(topology)) { - final TestInputTopic bids = driver.createInputTopic( - "bids", new StringSerializer(), new StringSerializer()); - final TestInputTopic people = driver.createInputTopic( - "people", new StringSerializer(), new StringSerializer()); - final TestOutputTopic output = driver.createOutputTopic( - "output", new StringDeserializer(), new StringDeserializer()); + final TestInputTopic bids = driver.createInputTopic( + "bids", new StringSerializer(), new StringSerializer()); + final TestInputTopic people = driver.createInputTopic( + "people", new StringSerializer(), new StringSerializer()); + final TestOutputTopic output = driver.createOutputTopic( + "output", new StringDeserializer(), new StringDeserializer()); - // When: - people.pipeInput("0", "0,infinite,CA", 0); // insert time = 0 - people.pipeInput("1", "1,alice,CA", 0); // insert time = 0 - people.pipeInput("2", "2,bob,OR", 5); // insert time = 5 - people.pipeInput("3", "3,carol,CA", 10); // insert time = 10 + // When: + people.pipeInput("0", "0,infinite,CA", 0); // insert time = 0 + people.pipeInput("1", "1,alice,CA", 0); // insert time = 0 + people.pipeInput("2", "2,bob,OR", 5); // insert time = 5 + people.pipeInput("3", "3,carol,CA", 10); // insert time = 10 - bids.pipeInput("a", "a,100,1", 10); // streamTime = 10 -- result as alice is not expired - bids.pipeInput("b", "b,101,2", 10); // streamTime = 10 -- result as bob is not expired + bids.pipeInput("a", "a,100,1", 10); // streamTime = 10 -- result as alice is not expired + bids.pipeInput("b", "b,101,2", 10); // streamTime = 10 -- result as bob is not expired - driver.advanceWallClockTime(Duration.ofMillis(20)); // advances wallclock time to 20 + driver.advanceWallClockTime(Duration.ofMillis(20)); // advances wallclock time to 20 - bids.pipeInput("c", "c,102,1", 20); // time = 20 -- no result b/c alice has expired - bids.pipeInput("d", "d,103,3", 20); // time = 20 -- result since carol is not expired + bids.pipeInput("c", "c,102,1", 20); // time = 20 -- no result b/c alice has expired + bids.pipeInput("d", "d,103,3", 20); // time = 20 -- result since carol is not expired - people.pipeInput("1", "1,alex,CA", 20); // insert time = 20 - bids.pipeInput("e", "e,104,1", 20); // time = 20 -- result as alex has replaced alice + people.pipeInput("1", "1,alex,CA", 20); // insert time = 20 + bids.pipeInput("e", "e,104,1", 20); // time = 20 -- result as alex has replaced alice - driver.advanceWallClockTime(Duration.ofMillis(30)); // advances wallclock time to 30 + driver.advanceWallClockTime(Duration.ofMillis(30)); // advances wallclock time to 30 - bids.pipeInput("f", "f,105,3", 30); // time = 30 -- no result b/c carol has expired + bids.pipeInput("f", "f,105,3", 30); // time = 30 -- no result b/c carol has expired - bids.pipeInput("g", "g,106,0", 30); // time = 30 -- result b/c person w/ id 0 is infinite + bids.pipeInput("g", "g,106,0", 30); // time = 30 -- result b/c person w/ id 0 is infinite - // Then: - final List outputs = output.readValuesToList(); - MatcherAssert.assertThat(outputs, Matchers.contains( - "a,100,1,1,alice,CA", - "d,103,3,3,carol,CA", - "e,104,1,1,alex,CA", - "g,106,0,0,infinite,CA" - )); - driver.close(); + // Then: + final List outputs = output.readValuesToList(); + MatcherAssert.assertThat(outputs, Matchers.contains( + "a,100,1,1,alice,CA", + "d,103,3,3,carol,CA", + "e,104,1,1,alex,CA", + "g,106,0,0,infinite,CA" + )); + } + } + + @ParameterizedTest + @EnumSource(SchemaTypes.KVSchema.class) + public void shouldDeduplicateWithTtlProviderToExpireOldRecords(final KVSchema type) { + final String value = "ignored"; + + final String key = "key"; + final long ttlMs = 10_000L; // 10s + final var ttlProvider = + TtlProvider.withDefault(Duration.ofMillis(ttlMs)) + .fromKey( + k -> { + final Duration ttl = Duration.ofMillis(ttlMs); + System.out.println("TTL: " + ttl); + return Optional.of(TtlDuration.of(ttl)); + }); + + final ResponsiveKeyValueParams params = paramsForType(type).withTtlProvider(ttlProvider); + final Topology topology = deduplicatorApp("input", "output", params); + try (final ResponsiveTopologyTestDriver driver = setupDriver(topology)) { + + final TestInputTopic inputTopic = driver.createInputTopic( + "input", + new StringSerializer(), + new StringSerializer(), + STARTING_TIME, + Duration.ZERO + ); + inputTopic.pipeInput(key, value); + + final KeyValueStore> transactionIdStore = + driver.getTimestampedKeyValueStore(STORE_NAME); + + assertNotNull( + transactionIdStore.get(key), "should have a single txn id in state store"); + + // advance wall clock time by 1 millisecond past expiration + driver.advanceWallClockTime(Duration.ofMillis(ttlMs).plusMillis(1L)); + + // should be expired + assertNull(transactionIdStore.get(key), "should have no txn id in state store"); + + // send the same event in again, outside the dedupe window + inputTopic.pipeInput(key, value); + assertNotNull( + transactionIdStore.get(key), "should have a single txn id in state store, again"); + } } - private ResponsiveTopologyTestDriver setupDriver(final ResponsiveKeyValueParams storeParams) { + private ResponsiveTopologyTestDriver setupDriver(final Topology topology) { final Properties props = new Properties(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); - final Topology topology = topology(storeParams); - return new ResponsiveTopologyTestDriver(topology, props, Instant.EPOCH); + return new ResponsiveTopologyTestDriver(topology, props, STARTING_TIME); } private Topology topology(final ResponsiveKeyValueParams storeParams) { @@ -214,7 +276,7 @@ private Topology topology(final ResponsiveKeyValueParams storeParams) { final KStream bids = builder.stream("bids"); // schema for people is key: value: final KTable people = builder.table( - "people", + STORE_NAME, ResponsiveStores.materialized(storeParams) );