From 8f79373489cea1a0e005d49d26f8bd2cbdea1691 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Fri, 1 Nov 2024 16:37:29 -0700 Subject: [PATCH] test plus fix --- .../api/ResponsiveTopologyTestDriver.java | 11 +-- .../internal/clients/TTDCassandraClient.java | 15 +++- .../kafka/internal/db/TTDKeyValueTable.java | 15 +++- ...veTopologyTestDriverKeyValueStoreTest.java | 68 +++++++++++++++++-- 4 files changed, 95 insertions(+), 14 deletions(-) diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java index 4ecab6647..f1d83a68e 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java @@ -97,21 +97,22 @@ public ResponsiveTopologyTestDriver( topology, config, initialWallClockTime, - new TTDCassandraClient(new TTDMockAdmin(baseProps(config), topology)) + new TTDCassandraClient( + new TTDMockAdmin(baseProps(config), topology), + mockTime(initialWallClockTime)) ); } /** - * Advances the internal mock time which can be used to trigger some Kafka Streams - * functionality such as wall-clock punctuators, or force Responsive to flush - * its internal buffers. + * Advances the internal mock time used for Responsive-specific features such as ttl, as well + * as the mock time used for various Kafka Streams functionality such as wall-clock punctuators. * See {@link TopologyTestDriver#advanceWallClockTime(Duration)} for more details. * * @param advance the amount of time to advance wall-clock time */ @Override public void advanceWallClockTime(final Duration advance) { - client.flush(); + client.advanceWallClockTime(advance); super.advanceWallClockTime(advance); } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java index 8e707e1a2..372e8da72 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java @@ -35,19 +35,23 @@ import dev.responsive.kafka.internal.db.inmemory.InMemoryKVTable; import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import dev.responsive.kafka.internal.utils.RemoteMonitor; +import java.time.Duration; import java.util.OptionalInt; import java.util.concurrent.CompletionStage; +import org.apache.kafka.common.utils.Time; public class TTDCassandraClient extends CassandraClient { private final ResponsiveStoreRegistry storeRegistry = new ResponsiveStoreRegistry(); private final TTDMockAdmin admin; + private final Time time; private final TableCache> kvFactory; private final WindowedTableCache> windowedFactory; - public TTDCassandraClient(final TTDMockAdmin admin) { + public TTDCassandraClient(final TTDMockAdmin admin, final Time time) { super(loggedConfig(admin.props())); this.admin = admin; + this.time = time; kvFactory = new TableCache<>(spec -> new TTDKeyValueTable(spec, this)); windowedFactory = new WindowedTableCache<>((spec, partitioner) -> TTDWindowedTable.create(spec, @@ -64,6 +68,15 @@ public TTDMockAdmin mockAdmin() { return admin; } + public long currentWallClockTimeMs() { + return time.milliseconds(); + } + + public void advanceWallClockTime(final Duration advance) { + flush(); + time.sleep(advance.toMillis()); + } + public void flush() { storeRegistry.stores().forEach(s -> s.onCommit().accept(0L)); } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDKeyValueTable.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDKeyValueTable.java index b67837cb2..314693935 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDKeyValueTable.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDKeyValueTable.java @@ -36,7 +36,10 @@ public byte[] get(final int kafkaPartition, final Bytes key, final long streamTi // trigger flush before lookup since CommitBuffer doesn't apply ttl client.flush(); - return super.get(kafkaPartition, key, streamTimeMs); + // take the max of mock "wallclock" time and stream-time since ttl is applied on both + final long currentTimeMs = Math.max(streamTimeMs, client.currentWallClockTimeMs()); + + return super.get(kafkaPartition, key, currentTimeMs); } @Override @@ -49,7 +52,10 @@ public KeyValueIterator range( // trigger flush before lookup since CommitBuffer doesn't apply ttl client.flush(); - return super.range(kafkaPartition, from, to, streamTimeMs); + // take the max of mock "wallclock" time and stream-time since ttl is applied on both + final long currentTimeMs = Math.max(streamTimeMs, client.currentWallClockTimeMs()); + + return super.range(kafkaPartition, from, to, currentTimeMs); } @Override @@ -57,7 +63,10 @@ public KeyValueIterator all(final int kafkaPartition, final long // trigger flush before lookup since CommitBuffer doesn't apply ttl client.flush(); - return super.all(kafkaPartition, streamTimeMs); + // take the max of mock "wallclock" time and stream-time since ttl is applied on both + final long currentTimeMs = Math.max(streamTimeMs, client.currentWallClockTimeMs()); + + return super.all(kafkaPartition, currentTimeMs); } } \ No newline at end of file diff --git a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java index 5dc3baeae..335c1c852 100644 --- a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java +++ b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java @@ -86,7 +86,7 @@ public void shouldRunWithoutResponsiveConnectionAndNoTtl(final KVSchema type) { @ParameterizedTest @EnumSource(SchemaTypes.KVSchema.class) - public void shouldRunWithoutResponsiveConnectionAndKeyBasedTtl(final KVSchema type) { + public void shouldEnforceKeyBasedTtlByAdvancingStreamTime(final KVSchema type) { // Given: final Duration defaultTtl = Duration.ofMillis(15); @@ -114,15 +114,15 @@ public void shouldRunWithoutResponsiveConnectionAndKeyBasedTtl(final KVSchema ty people.pipeInput("2", "2,bob,OR", 5); // insert time = 5 (advances streamTime to 5) people.pipeInput("3", "3,carol,CA", 10); // insert time = 10 (advances streamTime to 10) - bids.pipeInput("a", "a,100,1", 10); // streamTime = 10 - bids.pipeInput("b", "b,101,2", 10); // streamTime = 10 + bids.pipeInput("a", "a,100,1", 10); // streamTime = 10 -- result as alice is not expired + bids.pipeInput("b", "b,101,2", 10); // streamTime = 10 -- result as bob is not expired people.pipeInput("4", "4,dean-ignored,VA", 20); // advances streamTime to 20 bids.pipeInput("c", "c,102,1", 20); // streamTime = 20 -- no result b/c alice has expired - bids.pipeInput("d", "d,103,3", 20); // streamTime = 20 + bids.pipeInput("d", "d,103,3", 20); // streamTime = 20 -- result as carol is not expired - people.pipeInput("1", "1,alex,CA", 20); // streamTime = 20 + people.pipeInput("1", "1,alex,CA", 20); // insert streamTime = 20 bids.pipeInput("e", "e,104,1", 20); // streamTime = 20 -- yes result as alex replaced alice people.pipeInput("4", "4,ignored,VA", 30); // advances streamTime to 30 @@ -142,6 +142,64 @@ public void shouldRunWithoutResponsiveConnectionAndKeyBasedTtl(final KVSchema ty driver.close(); } + @ParameterizedTest + @EnumSource(SchemaTypes.KVSchema.class) + public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type) { + // Given: + final Duration defaultTtl = Duration.ofMillis(15); + + // Apply infinite retention only to key (ie "person id") of 0, everyone else is default + final TtlProvider ttlProvider = TtlProvider.withDefault(defaultTtl) + .fromKey(k -> { + if (k.equals("0")) { + return Optional.of(TtlDuration.infinite()); + } else { + return Optional.empty(); + } + }); + final TopologyTestDriver driver = setupDriver(paramsForType(type).withTtlProvider(ttlProvider)); + + final TestInputTopic bids = driver.createInputTopic( + "bids", new StringSerializer(), new StringSerializer()); + final TestInputTopic people = driver.createInputTopic( + "people", new StringSerializer(), new StringSerializer()); + final TestOutputTopic output = driver.createOutputTopic( + "output", new StringDeserializer(), new StringDeserializer()); + + // When: + people.pipeInput("0", "0,infinite,CA", 0); // insert time = 0 + people.pipeInput("1", "1,alice,CA", 0); // insert time = 0 + people.pipeInput("2", "2,bob,OR", 5); // insert time = 5 + people.pipeInput("3", "3,carol,CA", 10); // insert time = 10 + + bids.pipeInput("a", "a,100,1", 10); // streamTime = 10 -- result as alice is not expired + bids.pipeInput("b", "b,101,2", 10); // streamTime = 10 -- result as bob is not expired + + driver.advanceWallClockTime(Duration.ofMillis(20)); // advances wallclock time to 20 + + bids.pipeInput("c", "c,102,1", 20); // time = 20 -- no result b/c alice has expired + bids.pipeInput("d", "d,103,3", 20); // time = 20 -- result since carol is not expired + + people.pipeInput("1", "1,alex,CA", 20); // insert time = 20 + bids.pipeInput("e", "e,104,1", 20); // time = 20 -- result as alex has replaced alice + + driver.advanceWallClockTime(Duration.ofMillis(30)); // advances wallclock time to 30 + + bids.pipeInput("f", "f,105,3", 30); // time = 30 -- no result b/c carol has expired + + bids.pipeInput("g", "g,106,0", 30); // time = 30 -- result b/c person w/ id 0 is infinite + + // Then: + final List outputs = output.readValuesToList(); + MatcherAssert.assertThat(outputs, Matchers.contains( + "a,100,1,1,alice,CA", + "d,103,3,3,carol,CA", + "e,104,1,1,alex,CA", + "g,106,0,0,infinite,CA" + )); + driver.close(); + } + private ResponsiveTopologyTestDriver setupDriver(final ResponsiveKeyValueParams storeParams) { final Properties props = new Properties(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);