-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Row-level TTL PR 6: TopologyTestDriver #376
Conversation
return null; | ||
} | ||
public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) { | ||
// trigger flush before lookup since CommitBuffer doesn't apply ttl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a bit hacky, but...it works
* Advances the internal mock time which can be used to trigger some Kafka Streams | ||
* functionality such as wall-clock punctuators, or force Responsive to flush | ||
* its internal buffers. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks to this "hack" we no longer require users to use this method to advance wall clock time and manually flush the CommitBuffer in their TTD apps. Generally this is only used for triggering punctuation in Streams so most people wouldn't already be using this in their apps, and will advance time via record timestamps
@@ -98,32 +109,36 @@ public void shouldRunAllKVStoreTypesWithTTLWithoutResponsiveConnection(final KVS | |||
"output", new StringDeserializer(), new StringDeserializer()); | |||
|
|||
// When: | |||
people.pipeInput("1", "1,alice,CA"); // time = 0 | |||
driver.advanceWallClockTime(Duration.ofMillis(5)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test now reflects how a regular Streams app would function and how a regular TTD test would be written
public class InMemoryKVTable implements RemoteKVTable<Object> { | ||
// Note: this class doesn't actually use BoundStatements and applies all operations immediately, | ||
// we just stub the BoundStatement type so we can reuse this table for the TTD | ||
public class InMemoryKVTable implements RemoteKVTable<BoundStatement> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose it's worth noting that this table implementation doesn't actually apply ttl in terms of retention, only in terms of filtering results. I figure this is fine for the TTD since such test should be short-lived, but good to note in case we ever want to do anything else with this class 🤷♀️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't say I love the hack to flush on get, and it may come back to bite us if we want to expand the usage of TTD in the future, but I also think we might want to just totally rethink testing harnesses for Kafka Streams in the future so I'm OK to just get this out the door for now and revisit it later.
That being said, should CommitBuffer respect TTL? I'm OK if it doesn't because flushes generally happen pretty quickly, but that's not something I thought about until now.
Support for row-level ttl in the TopologyTestDriver, as well as the InMemoryKeyValueTable which we now use as the backing kv store for the Responsive TTD.
PR 1: API #370
PR 2: TtlResolver #371
PR 3: compute minValidTs from TtlResolver #373
PR 4: CassandraFactTable implementation #374
PR 5: extract StateSerdes #375
Planned followup PRs
Unplanned future work (can be picked up ad-hoc as needed)