Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Row-level TTL PR 7: TopologyTestDriver#advanceWallClockTime #379

Merged
merged 1 commit into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,22 @@ public ResponsiveTopologyTestDriver(
topology,
config,
initialWallClockTime,
new TTDCassandraClient(new TTDMockAdmin(baseProps(config), topology))
new TTDCassandraClient(
new TTDMockAdmin(baseProps(config), topology),
mockTime(initialWallClockTime))
);
}

/**
* Advances the internal mock time which can be used to trigger some Kafka Streams
* functionality such as wall-clock punctuators, or force Responsive to flush
* its internal buffers.
* Advances the internal mock time used for Responsive-specific features such as ttl, as well
* as the mock time used for various Kafka Streams functionality such as wall-clock punctuators.
* See {@link TopologyTestDriver#advanceWallClockTime(Duration)} for more details.
*
* @param advance the amount of time to advance wall-clock time
*/
@Override
public void advanceWallClockTime(final Duration advance) {
client.flush();
client.advanceWallClockTime(advance);
super.advanceWallClockTime(advance);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,23 @@
import dev.responsive.kafka.internal.db.inmemory.InMemoryKVTable;
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.kafka.internal.utils.RemoteMonitor;
import java.time.Duration;
import java.util.OptionalInt;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.utils.Time;

public class TTDCassandraClient extends CassandraClient {
private final ResponsiveStoreRegistry storeRegistry = new ResponsiveStoreRegistry();
private final TTDMockAdmin admin;
private final Time time;

private final TableCache<RemoteKVTable<BoundStatement>> kvFactory;
private final WindowedTableCache<RemoteWindowedTable<BoundStatement>> windowedFactory;

public TTDCassandraClient(final TTDMockAdmin admin) {
public TTDCassandraClient(final TTDMockAdmin admin, final Time time) {
super(loggedConfig(admin.props()));
this.admin = admin;
this.time = time;

kvFactory = new TableCache<>(spec -> new TTDKeyValueTable(spec, this));
windowedFactory = new WindowedTableCache<>((spec, partitioner) -> TTDWindowedTable.create(spec,
Expand All @@ -64,6 +68,15 @@ public TTDMockAdmin mockAdmin() {
return admin;
}

public long currentWallClockTimeMs() {
return time.milliseconds();
}

public void advanceWallClockTime(final Duration advance) {
flush();
time.sleep(advance.toMillis());
}

public void flush() {
storeRegistry.stores().forEach(s -> s.onCommit().accept(0L));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ public byte[] get(final int kafkaPartition, final Bytes key, final long streamTi
// trigger flush before lookup since CommitBuffer doesn't apply ttl
client.flush();

return super.get(kafkaPartition, key, streamTimeMs);
// take the max of mock "wallclock" time and stream-time since ttl is applied on both
final long currentTimeMs = Math.max(streamTimeMs, client.currentWallClockTimeMs());

return super.get(kafkaPartition, key, currentTimeMs);
}

@Override
Expand All @@ -49,15 +52,21 @@ public KeyValueIterator<Bytes, byte[]> range(
// trigger flush before lookup since CommitBuffer doesn't apply ttl
client.flush();

return super.range(kafkaPartition, from, to, streamTimeMs);
// take the max of mock "wallclock" time and stream-time since ttl is applied on both
final long currentTimeMs = Math.max(streamTimeMs, client.currentWallClockTimeMs());

return super.range(kafkaPartition, from, to, currentTimeMs);
}

@Override
public KeyValueIterator<Bytes, byte[]> all(final int kafkaPartition, final long streamTimeMs) {
// trigger flush before lookup since CommitBuffer doesn't apply ttl
client.flush();

return super.all(kafkaPartition, streamTimeMs);
// take the max of mock "wallclock" time and stream-time since ttl is applied on both
final long currentTimeMs = Math.max(streamTimeMs, client.currentWallClockTimeMs());

return super.all(kafkaPartition, currentTimeMs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void shouldRunWithoutResponsiveConnectionAndNoTtl(final KVSchema type) {

@ParameterizedTest
@EnumSource(SchemaTypes.KVSchema.class)
public void shouldRunWithoutResponsiveConnectionAndKeyBasedTtl(final KVSchema type) {
public void shouldEnforceKeyBasedTtlByAdvancingStreamTime(final KVSchema type) {
// Given:
final Duration defaultTtl = Duration.ofMillis(15);

Expand Down Expand Up @@ -114,15 +114,15 @@ public void shouldRunWithoutResponsiveConnectionAndKeyBasedTtl(final KVSchema ty
people.pipeInput("2", "2,bob,OR", 5); // insert time = 5 (advances streamTime to 5)
people.pipeInput("3", "3,carol,CA", 10); // insert time = 10 (advances streamTime to 10)

bids.pipeInput("a", "a,100,1", 10); // streamTime = 10
bids.pipeInput("b", "b,101,2", 10); // streamTime = 10
bids.pipeInput("a", "a,100,1", 10); // streamTime = 10 -- result as alice is not expired
bids.pipeInput("b", "b,101,2", 10); // streamTime = 10 -- result as bob is not expired

people.pipeInput("4", "4,dean-ignored,VA", 20); // advances streamTime to 20

bids.pipeInput("c", "c,102,1", 20); // streamTime = 20 -- no result b/c alice has expired
bids.pipeInput("d", "d,103,3", 20); // streamTime = 20
bids.pipeInput("d", "d,103,3", 20); // streamTime = 20 -- result as carol is not expired

people.pipeInput("1", "1,alex,CA", 20); // streamTime = 20
people.pipeInput("1", "1,alex,CA", 20); // insert streamTime = 20
bids.pipeInput("e", "e,104,1", 20); // streamTime = 20 -- yes result as alex replaced alice

people.pipeInput("4", "4,ignored,VA", 30); // advances streamTime to 30
Expand All @@ -142,6 +142,64 @@ public void shouldRunWithoutResponsiveConnectionAndKeyBasedTtl(final KVSchema ty
driver.close();
}

@ParameterizedTest
@EnumSource(SchemaTypes.KVSchema.class)
public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

verified that this test failed without the fix

// Given:
final Duration defaultTtl = Duration.ofMillis(15);

// Apply infinite retention only to key (ie "person id") of 0, everyone else is default
final TtlProvider<String, ?> ttlProvider = TtlProvider.<String, Object>withDefault(defaultTtl)
.fromKey(k -> {
if (k.equals("0")) {
return Optional.of(TtlDuration.infinite());
} else {
return Optional.empty();
}
});
final TopologyTestDriver driver = setupDriver(paramsForType(type).withTtlProvider(ttlProvider));

final TestInputTopic<String, String> bids = driver.createInputTopic(
"bids", new StringSerializer(), new StringSerializer());
final TestInputTopic<String, String> people = driver.createInputTopic(
"people", new StringSerializer(), new StringSerializer());
final TestOutputTopic<String, String> output = driver.createOutputTopic(
"output", new StringDeserializer(), new StringDeserializer());

// When:
people.pipeInput("0", "0,infinite,CA", 0); // insert time = 0
people.pipeInput("1", "1,alice,CA", 0); // insert time = 0
people.pipeInput("2", "2,bob,OR", 5); // insert time = 5
people.pipeInput("3", "3,carol,CA", 10); // insert time = 10

bids.pipeInput("a", "a,100,1", 10); // streamTime = 10 -- result as alice is not expired
bids.pipeInput("b", "b,101,2", 10); // streamTime = 10 -- result as bob is not expired

driver.advanceWallClockTime(Duration.ofMillis(20)); // advances wallclock time to 20

bids.pipeInput("c", "c,102,1", 20); // time = 20 -- no result b/c alice has expired
bids.pipeInput("d", "d,103,3", 20); // time = 20 -- result since carol is not expired

people.pipeInput("1", "1,alex,CA", 20); // insert time = 20
bids.pipeInput("e", "e,104,1", 20); // time = 20 -- result as alex has replaced alice

driver.advanceWallClockTime(Duration.ofMillis(30)); // advances wallclock time to 30

bids.pipeInput("f", "f,105,3", 30); // time = 30 -- no result b/c carol has expired

bids.pipeInput("g", "g,106,0", 30); // time = 30 -- result b/c person w/ id 0 is infinite

// Then:
final List<String> outputs = output.readValuesToList();
MatcherAssert.assertThat(outputs, Matchers.contains(
"a,100,1,1,alice,CA",
"d,103,3,3,carol,CA",
"e,104,1,1,alex,CA",
"g,106,0,0,infinite,CA"
));
driver.close();
}

private ResponsiveTopologyTestDriver setupDriver(final ResponsiveKeyValueParams storeParams) {
final Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
Expand Down
Loading