Row-level TTL for KV stores #369
base: main
Conversation
Resolved (outdated) review threads:
- kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java
- kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueParams.java
- kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java
- kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java
- kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java
- kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java
- kafka-client/src/test/java/dev/responsive/kafka/integration/RowLevelTtlIntegrationTest.java
- kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java
It's looking good! Thanks for putting this together so fast. No major comments.
Resolved (outdated) review threads on kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java (4 threads)
// TODO(sophie): figure out how to account for row-level ttl in migration mode
startTimeMs = System.currentTimeMillis() - params.ttlProvider().defaultTtl().toMillis();
let's create a follow-up ticket, but I think what we can do is just have them pass in the "start time" to the tool, and then just apply individual TTLs as necessary
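Not part of this PR, just to illustrate the follow-up idea: a minimal sketch assuming the migration tool gains a user-supplied start time. The class and method names below are hypothetical, not from the codebase.

```java
import java.time.Duration;

// Hypothetical sketch only: instead of deriving the ttl cutoff from
// System.currentTimeMillis(), the migration tool would take an explicit
// start time from the user and apply each row's individual ttl to it.
final class MigrationTtlCutoff {

  // rowTtl is whichever ttl applies to the row: a row-level override or the default
  static long minValidTs(final long toolStartTimeMs, final Duration rowTtl) {
    return toolStartTimeMs - rowTtl.toMillis();
  }
}
```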
Resolved (outdated) review threads:
- kafka-client/src/main/java/dev/responsive/kafka/internal/utils/StateDeserializer.java
- kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java
- kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java
package dev.responsive.kafka.internal.db;

public class AbstractCassandraKVTableIntegrationTest {
used?
nope, this is a TODO (going to combine some of the common KV vs Fact table tests, especially the ttl unit tests)
if possible, I'd prefer we use static helper methods over abstract classes for tests 🤷
It's not about the static helper methods, it's about the tests themselves. We want to run largely the same set of tests for both of these (same insert semantics, same ttl semantics, etc.) so I was going to extract all the common tests into this class.
If you'd prefer the @Parametrize approach I can take a look at that, but since not all the tests are going to be the same for the kv vs fact tables, I thought the abstract test format would be cleaner. Does that make sense?
(That said, ideally we can share the same tests for the Mongo table as well, in which case another approach might be easier in the end 🤷‍♀️)
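For what it's worth, here's a rough sketch of the shared abstract-test idea being discussed; the helper interface and method names are made up for illustration and are not the actual test code.

```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.nullValue;

import org.junit.jupiter.api.Test;

// Hypothetical sketch: each concrete table test (fact, kv, possibly mongo)
// supplies its own table via tableUnderTest() and inherits the shared ttl tests.
abstract class AbstractKVTableTest {

  // implemented by the concrete fact/kv table test classes
  protected abstract TestTable tableUnderTest();

  @Test
  public void shouldExpireRowsPastDefaultTtl() {
    final TestTable table = tableUnderTest();
    table.insert(0, "key", "value", /* timestampMs= */ 0L);

    // advance stream time past the default ttl and expect the row to be gone
    assertThat(table.get(0, "key", /* streamTimeMs= */ table.defaultTtlMs() + 1), nullValue());
  }

  // minimal interface used only for this sketch
  protected interface TestTable {
    void insert(int partition, String key, String value, long timestampMs);
    String get(int partition, String key, long streamTimeMs);
    long defaultTtlMs();
  }
}
```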
maybeTtlSpec = ((DelegatingTableSpec) maybeTtlSpec).delegate();
if (!spec.ttlResolver().hasConstantTtl()) {
  throw new UnsupportedOperationException("The ResponsiveTopologyTestDriver does not yet "
let's create a ticket for this, I think we have customers that use this
.bind()
.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
.setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(minValidTs));
public byte[] get(final int kafkaPartition, final Bytes key, long streamTimeMs) {
the conditional complexity of this method is pretty high -- it would be good to split it up into individual methods, perhaps following this form:
if !ttlResolver { simpleGet() }
else if !ttlResolver.needsValueToComputeTtl() { getWithMinTtl() }
else { getWithPostFilterForTtl() }
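Expanding that pseudocode into a method-level sketch: the helper method names are illustrative, and the Optional-style ttlResolver field is assumed from the surrounding code rather than copied from the PR.

```java
public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) {
  if (ttlResolver.isEmpty()) {
    // no ttl configured at all: plain lookup
    return simpleGet(kafkaPartition, key);
  } else if (!ttlResolver.get().needsValueToComputeTtl()) {
    // ttl is known without the value, so push the minValidTs bound into the query
    return getWithMinValidTs(kafkaPartition, key, streamTimeMs);
  } else {
    // ttl depends on the value, so fetch first and post-filter the result
    return getWithPostFilterForTtl(kafkaPartition, key, streamTimeMs);
  }
}
```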
final ResultSet result = client.execute(range);
resultsPerPartition.add(Iterators.kv(result.iterator(), CassandraKeyValueTable::rows));
}
// TODO(sophie): filter by minValidTs if based on value
we should make sure to address these TODOs before cutting a release, since this will result in wrong data. I know it's already a big PR, so just make sure we're tracking it somewhere
// if the default ttl is infinite we still have to define the ttl index for
// the table since the ttlProvider may apply row-level overrides. To approximate
// an "infinite" default retention, we just set the default ttl to the maximum value
I think we should change the approach here to be more like the Tombstones: we would create the index on a new field expire_ts (instead of the insertion ts), which could just be null/empty for documents that don't have a TTL. And instead of having expireAfter = ttl + 12h, we should have expireAfter = 12h (by the way, it looks like this change dropped the 12h, which was there as a grace period to make sure that stream time advances past the physical time). This would dramatically reduce the size of the index in situations where many rows do not have a TTL.
From https://www.mongodb.com/docs/manual/core/index-ttl/#expiration-of-data:
If the indexed field in a document doesn't contain one or more date values, the document will not expire.
If a document does not contain the indexed field, the document will not expire.
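A sketch of what that index definition could look like with the MongoDB Java driver; the expireTs field name and wrapper class are illustrative, not from this PR.

```java
import java.util.concurrent.TimeUnit;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import org.bson.Document;

final class ExpireTsIndex {

  // TTL index on a dedicated expire_ts field with a fixed 12h grace, instead of
  // indexing the insertion timestamp with expireAfter = ttl + 12h. Documents that
  // never set expireTs (or set it to null) are never expired by MongoDB.
  static void createTtlIndex(final MongoCollection<Document> collection) {
    collection.createIndex(
        Indexes.ascending("expireTs"),
        new IndexOptions().expireAfter(12L, TimeUnit.HOURS)
    );
  }
}
```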
return v == null ? null : v.getValue();
public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) {
// Need to post-filter if value is needed to compute ttl
I actually think with the approach suggested above, we can do it all server side because expire_ts is already a dedicated column so we don't need to recompute it. We could actually consider doing the same thing in Scylla by inserting an extra column for expire_ts (they just wouldn't create an index on it).
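For example, a read filter along these lines could run entirely server-side (field and class names are illustrative): a document is live if it has no expire_ts at all, or its expire_ts is still ahead of the observed stream time.

```java
import java.util.Date;

import com.mongodb.client.model.Filters;
import org.bson.conversions.Bson;

final class ExpireTsFilter {

  // keep documents with no expireTs (no ttl applies) or whose expireTs
  // has not yet been reached at the given stream time
  static Bson notExpired(final long streamTimeMs) {
    return Filters.or(
        Filters.exists("expireTs", false),
        Filters.gt("expireTs", new Date(streamTimeMs))
    );
  }
}
```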
) {
// TODO(sophie): filter by minValidTs if based on key or default only
Same comment about TODOs
Resolved (outdated) review thread on kafka-client/src/main/java/dev/responsive/kafka/internal/utils/Utils.java
// Just return Optional.empty if the row override is actually equal to the default
// to help simplify handling logic for tables
if (rowTtlOverride.isPresent() && rowTtlOverride.get().equals(defaultTtl)) {
  return Optional.empty();
}
I'd prefer we removed this optimization unless it proves necessary -- does it actually simplify things? I imagine we shouldn't be comparing the result of this to the default anywhere in the code
It works with today's implementation, but if we ever have partial updates it's risky (as in: if I insert a row with a non-default TTL and then update it to the default TTL, we should make sure that every scenario overrides the old TTL).
Filters.gte(KVDoc.TIMESTAMP, minValidTs)
)).first();
return v == null ? null : v.getValue();
public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) {
Let's make sure to add tests for MongoDB as well. I'd actually prefer if we just threw an exception for this PR if there's anything but a static default TTL so we can have all the mongo changes be part of a future PR.
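Something along these lines in the Mongo table setup would do it, assuming the same ttlResolver()/hasConstantTtl() accessors visible elsewhere in the diff; the exact wiring is assumed, not taken from this PR.

```java
// reject anything beyond a static default ttl for now, so full row-level
// ttl support for MongoDB can land in a follow-up PR
if (ttlResolver.isPresent() && !ttlResolver.get().hasConstantTtl()) {
  throw new UnsupportedOperationException(
      "Row-level ttl is not yet supported for MongoDB-backed stores; "
          + "only a constant default ttl can be used for now");
}
```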
}

@Test
public void test() throws Exception {
is this test complete? (also re: TODO below)
haha no, this is the test I've been working on. I just happened to push the skeleton for this, it seems
// Then
assertThat(valAt0, is(val));
assertThat(valAt1, nullValue());
long lookupTime = Duration.ofMinutes(11).toMillis();
no need to change it for this PR, but in the future you can use TimeUnit.MINUTES.toMillis(11) etc. for all of these. IMO it's a little easier to read and has no object allocations 😅
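For reference, the two forms produce the same value; TimeUnit just skips the intermediate Duration allocation.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

final class LookupTimes {
  static final long VIA_DURATION = Duration.ofMinutes(11).toMillis(); // allocates a Duration
  static final long VIA_TIME_UNIT = TimeUnit.MINUTES.toMillis(11);    // plain static call, no allocation
}
```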
Force-pushed from 0436073 to ef015eb
Force-pushed from ef015eb to 993c8eb
TODO (this PR, WIP now):
- RowLevelTtlIntegrationTest
- MongoKeyValueTable unit tests
- AbstractCassandraKVTable unit test (extract ttl unit tests from CassandraFactTableTest)
- range/all support

Follow-up work:
- migration mode ttl
- in-memory kv store ttl?
- TopologyTestDriver ttl