-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Row-level TTL PR 9: TTD test with ttl deduplicator #381
Conversation
@@ -62,6 +62,19 @@ tasks.publish { | |||
dependsOn(tasks[writeVersionPropertiesFile]) | |||
} | |||
|
|||
configurations { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll admit I had to ask chatGPT how to access the stuff in the testutils of kafka-client
from the tests in responsive-test-utils
, and this was how it said to do it. Not sure it's necessary the right way to do so, if you have a different suggestion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤷 when it comes to build systems, if it works it works
public Processor<String, String, String, String> get() { | ||
return new TtlDeduplicator(); | ||
} | ||
final var params = ResponsiveKeyValueParams.fact(STORE_NAME).withTtlProvider( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no actual changes to this test at all, just moving some things to the testutils and getting rid of the duplicate code here
@@ -1,4 +1,20 @@ | |||
package dev.responsive.kafka.internal.db.testutils; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
felt a bit confusing to have multiple testutils packages in different places, and since this one had only the one class in it I decided to just move this class to the main testutils and get rid of this other testutils package.
)); | ||
driver.close(); | ||
final Topology topology = topology(paramsForType(type)); | ||
try (final TopologyTestDriver driver = setupDriver(topology)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no changes to any of the existing tests in this file, just moved the driver into a try-with-resources
"input", | ||
new StringSerializer(), | ||
new StringSerializer(), | ||
STARTING_TIME, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once we fix this issue in Streams we should be able to remove this explicit time initialization here and in the driver itself, and have the tests still pass. Until then, we can't verify ttl with millisecond precision because the system time might drift between the driver initialization and the test topic creation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
No changes to the ttl implementation, just adding another TTD test to replicate a ttl deduplicator type app. Due to a small discrepency in the way timestamps are resolved in the TestInputTopics created by the TTD, users will need to make sure they specify an initial time for the TTD itself and either (a) pass the same starting time in to the the driver's #createInputTopic method when creating a TestInputTopic, or (b) set timestamps for each record passed in via the topic's #pipeInput method.
I submitted a fix for this issue in the Kafka Streams TTD, but we'll need to wait for that to be released before the Responsive TTD will work without setting the times as described above.
Also includes some refactoring of test code to reduce duplicate code for common things like custom processors, processor suppliers, and certain topologies we use in multiple places like the deduplicator
PR 1: API #370
PR 2: TtlResolver #371
PR 3: compute minValidTs from TtlResolver #373
PR 4: CassandraFactTable implementation #374
PR 5: extract StateSerdes #375
PR 6: support for ResponsiveTopologyTestDriver #376
PR 7: TopologyTestDriver#advanceWallClockTime #379
PR 8: advance streamTime in #get and add RowLevelTtlIntegrationTest #380