Row-level TTL PR 6: TopologyTestDriver #376

Merged 6 commits on Oct 29, 2024
TtlProvider.java (dev.responsive.kafka.api.stores)
@@ -53,7 +53,7 @@ public static <K, V> TtlProvider<K, V> withDefault(final Duration defaultTtl) {
public static <K, V> TtlProvider<K, V> withNoDefault() {
return new TtlProvider<>(
TtlType.DEFAULT_ONLY,
- TtlDuration.noTtl(),
+ TtlDuration.infinite(),
(ignoredK, ignoredV) -> Optional.empty()
);
}
@@ -131,7 +131,7 @@ public static TtlDuration of(final Duration ttl) {
}

// No ttl will be applied, in other words infinite retention
- public static TtlDuration noTtl() {
+ public static TtlDuration infinite() {
return new TtlDuration(Duration.ZERO, Ttl.INFINITE);
}

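For readers skimming the diff: the rename only touches naming, not behavior. Below is a minimal sketch of the two TtlDuration flavors, using only the factory methods visible in this diff (the import path comes from the InMemoryKVTable changes further down):

import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration;
import java.time.Duration;

public class TtlDurationExample {
  public static void main(final String[] args) {
    // A finite ttl: rows older than streamTime - 30min are treated as expired
    final TtlDuration thirtyMinutes = TtlDuration.of(Duration.ofMinutes(30));
    // An infinite ttl: the row is retained forever and the expiry check is skipped
    final TtlDuration keepForever = TtlDuration.infinite();

    System.out.println(thirtyMinutes.isFinite()); // true
    System.out.println(keepForever.isFinite());   // false
  }
}

Because infinite() reports isFinite() == false, the read path in InMemoryKVTable below can skip the expiry check entirely for such rows.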
InMemoryKVTable.java (dev.responsive.kafka.internal.db.inmemory)
@@ -1,5 +1,7 @@
package dev.responsive.kafka.internal.db.inmemory;

+ import com.datastax.oss.driver.api.core.cql.BoundStatement;
+ import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration;
import dev.responsive.kafka.internal.db.KVFlushManager;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.RemoteWriter;
@@ -22,7 +24,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

- public class InMemoryKVTable implements RemoteKVTable<Object> {
+ // Note: this class doesn't actually use BoundStatements and applies all operations immediately,
+ // we just stub the BoundStatement type so we can reuse this table for the TTD
+ public class InMemoryKVTable implements RemoteKVTable<BoundStatement> {
Review comment (Contributor Author):
I suppose it's worth noting that this table implementation doesn't actually apply ttl in terms of retention, only in terms of filtering results. I figure this is fine for the TTD since such tests should be short-lived, but it's good to note in case we ever want to do anything else with this class 🤷‍♀️
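A hypothetical sketch of the lazy-expiry tradeoff described above, with all names illustrative rather than actual Responsive classes: expired rows are hidden from reads but stay in memory until overwritten or deleted.

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for an in-memory table with read-time ttl filtering.
class LazyTtlStore {
  record Value(long epochMillis, byte[] bytes) {}

  private final Map<String, Value> store = new HashMap<>();
  private final Duration ttl;

  LazyTtlStore(final Duration ttl) {
    this.ttl = ttl;
  }

  void put(final String key, final byte[] bytes, final long timestampMs) {
    store.put(key, new Value(timestampMs, bytes));
  }

  byte[] get(final String key, final long streamTimeMs) {
    final Value value = store.get(key);
    if (value == null) {
      return null;
    }
    // Expiry is enforced only here, at read time: the expired entry is filtered
    // out of the result but remains in the map until overwritten or deleted.
    if (value.epochMillis() < streamTimeMs - ttl.toMillis()) {
      return null;
    }
    return value.bytes();
  }
}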

private static final Logger LOG = LoggerFactory.getLogger(InMemoryKVTable.class);

private int kafkaPartition;
@@ -50,13 +54,16 @@ public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) {
return null;
}

- final long minValidTs = ttlResolver.isEmpty()
-     ? -1L
-     : streamTimeMs - ttlResolver.get().defaultTtl().toMillis();
-
- if (value.epochMillis() < minValidTs) {
-   return null;
- }
+ if (ttlResolver.isPresent()) {
+   final TtlDuration rowTtl = ttlResolver.get().resolveTtl(key, value.value());
+   if (rowTtl.isFinite()) {
+     final long minValidTs = streamTimeMs - rowTtl.toMillis();
+     if (value.epochMillis() < minValidTs) {
+       return null;
+     }
+   }
+ }

return value.value();
}

@@ -67,6 +74,11 @@ public KeyValueIterator<Bytes, byte[]> range(
final Bytes to,
final long streamTimeMs
) {
+ if (ttlResolver.isPresent() && !ttlResolver.get().hasDefaultOnly()) {
+   throw new UnsupportedOperationException("Row-level ttl is not yet supported for range "
+       + "queries on in-memory tables or TTD");
+ }

checkKafkaPartition(kafkaPartition);

final var iter = store
@@ -84,6 +96,11 @@ public KeyValueIterator<Bytes, byte[]> range(

@Override
public KeyValueIterator<Bytes, byte[]> all(final int kafkaPartition, final long streamTimeMs) {
+ if (ttlResolver.isPresent() && !ttlResolver.get().hasDefaultOnly()) {
+   throw new UnsupportedOperationException("Row-level ttl is not yet supported for range "
+       + "queries on in-memory tables or TTD");
+ }

checkKafkaPartition(kafkaPartition);
final var iter = store.entrySet().iterator();

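For context on the guards added above: a default-only ttl gives every row the same expiry cutoff, so a scan can filter with one shared predicate, while row-level ttl would need to resolve a potentially different ttl per entry. A hypothetical illustration of the default-only case (not the actual implementation):

import java.time.Duration;
import java.util.Map;

class DefaultTtlScanExample {
  // entries map a key to its write timestamp (epoch millis)
  static long countLiveEntries(final Map<String, Long> entries,
                               final Duration defaultTtl,
                               final long streamTimeMs) {
    final long minValidTs = streamTimeMs - defaultTtl.toMillis(); // one shared cutoff
    return entries.values().stream()
        .filter(writtenAtMs -> writtenAtMs >= minValidTs)
        .count();
  }
}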
@@ -106,15 +123,19 @@ public String name() {
}

@Override
- public Object insert(int kafkaPartition, Bytes key, byte[] value, long epochMillis) {
+ public BoundStatement insert(int kafkaPartition, Bytes key, byte[] value, long epochMillis) {
checkKafkaPartition(kafkaPartition);
- return store.put(key, new Value(epochMillis, value));
+ store.put(key, new Value(epochMillis, value));
+ return null;
}

@Override
- public Object delete(int kafkaPartition, Bytes key) {
+ public BoundStatement delete(int kafkaPartition, Bytes key) {
checkKafkaPartition(kafkaPartition);
- return store.remove(key);
+ store.remove(key);
+ return null;
}

@Override
@@ -152,17 +173,7 @@ public String tableName() {

@Override
public TablePartitioner<Bytes, Integer> partitioner() {
- return new TablePartitioner<>() {
-   @Override
-   public Integer tablePartition(int kafkaPartition, Bytes key) {
-     return kafkaPartition;
-   }
-
-   @Override
-   public Integer metadataTablePartition(int kafkaPartition) {
-     return kafkaPartition;
-   }
- };
+ return TablePartitioner.defaultPartitioner();
}

@Override
Row-level ttl test file
@@ -196,7 +196,7 @@ public void shouldRespectSemanticKeyBasedTtl() throws Exception {
final var defaultTtl = Duration.ofMinutes(30);
final Function<String, Optional<TtlDuration>> ttlForKey = k -> {
if (k.equals("KEEP_FOREVER")) {
- return Optional.of(TtlDuration.noTtl());
+ return Optional.of(TtlDuration.infinite());
} else if (k.endsWith("DEFAULT_TTL")) {
return Optional.empty();
} else {
@@ -264,7 +264,7 @@ public void shouldRespectSemanticKeyValueBasedTtl() throws Exception {
if (v.equals("DEFAULT")) {
return Optional.empty();
} else if (v.equals("NO_TTL")) {
- return Optional.of(TtlDuration.noTtl());
+ return Optional.of(TtlDuration.infinite());
} else {
return Optional.of(TtlDuration.of(Duration.ofMinutes(Long.parseLong(v))));
}
@@ -345,7 +345,7 @@ public void shouldRespectOverridesWithValueBasedTtl() throws Exception {
if (v.equals("DEFAULT")) {
return Optional.empty();
} else if (v.equals("NO_TTL")) {
- return Optional.of(TtlDuration.noTtl());
+ return Optional.of(TtlDuration.infinite());
} else {
return Optional.of(TtlDuration.of(Duration.ofMinutes(Long.parseLong(v))));
}
ResponsiveTopologyTestDriver.java
@@ -97,22 +97,21 @@ public ResponsiveTopologyTestDriver(
topology,
config,
initialWallClockTime,
- new TTDCassandraClient(
-     new TTDMockAdmin(baseProps(config), topology),
-     mockTime(initialWallClockTime))
+ new TTDCassandraClient(new TTDMockAdmin(baseProps(config), topology))
);
}

/**
- * Advances the internal mock time used for Responsive-specific features such as ttl, as well
- * as the mock time used for various Kafka Streams functionality such as wall-clock punctuators.
+ * Advances the internal mock time which can be used to trigger some Kafka Streams
+ * functionality such as wall-clock punctuators, or force Responsive to flush
+ * its internal buffers.
Review comment (Contributor Author) on lines +105 to +107:
Thanks to this "hack" we no longer require users to call this method to advance wall-clock time and manually flush the CommitBuffer in their TTD apps. Generally this is only used for triggering punctuators in Streams, so most people wouldn't be using it in their apps anyway and will advance time via record timestamps instead.

* See {@link TopologyTestDriver#advanceWallClockTime(Duration)} for more details.
*
* @param advance the amount of time to advance wall-clock time
*/
@Override
public void advanceWallClockTime(final Duration advance) {
- client.advanceWallClockTime(advance);
+ client.flush();
super.advanceWallClockTime(advance);
}
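A hedged sketch of the simplified workflow the comment above describes. The import path and (topology, properties) constructor for ResponsiveTopologyTestDriver are assumed here, and the topology and topic names are made up for illustration:

import dev.responsive.kafka.api.ResponsiveTopologyTestDriver; // package path assumed
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TtdExample {

  // A trivial pass-through topology; real tests would use their own.
  static Topology buildTopology() {
    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
        .to("output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
  }

  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttd-example");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ignored:9092");

    try (final var driver = new ResponsiveTopologyTestDriver(buildTopology(), props)) {
      final var input = driver.createInputTopic(
          "input", new StringSerializer(), new StringSerializer());
      final var output = driver.createOutputTopic(
          "output", new StringDeserializer(), new StringDeserializer());

      // Stream time (and therefore ttl) advances via the record timestamps;
      // no manual buffer flush is needed anymore.
      input.pipeInput("key", "v1", Instant.parse("2024-01-01T00:00:00Z"));
      input.pipeInput("key", "v2", Instant.parse("2024-01-01T01:00:00Z"));

      // Still available, but only needed to fire wall-clock punctuators:
      driver.advanceWallClockTime(Duration.ofMinutes(5));

      System.out.println(output.readKeyValuesToList());
    }
  }
}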

TTDCassandraClient.java
@@ -32,37 +32,30 @@
import dev.responsive.kafka.internal.db.TTDWindowedTable;
import dev.responsive.kafka.internal.db.TableCache;
import dev.responsive.kafka.internal.db.WindowedTableCache;
+ import dev.responsive.kafka.internal.db.inmemory.InMemoryKVTable;
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.kafka.internal.utils.RemoteMonitor;
- import java.time.Duration;
import java.util.OptionalInt;
import java.util.concurrent.CompletionStage;
- import org.apache.kafka.common.utils.Time;

public class TTDCassandraClient extends CassandraClient {
- private final Time time;
private final ResponsiveStoreRegistry storeRegistry = new ResponsiveStoreRegistry();
private final TTDMockAdmin admin;

private final TableCache<RemoteKVTable<BoundStatement>> kvFactory;
private final WindowedTableCache<RemoteWindowedTable<BoundStatement>> windowedFactory;

- public TTDCassandraClient(final TTDMockAdmin admin, final Time time) {
+ public TTDCassandraClient(final TTDMockAdmin admin) {
super(loggedConfig(admin.props()));
- this.time = time;
this.admin = admin;

- kvFactory = new TableCache<>(spec -> TTDKeyValueTable.create(spec, this));
+ kvFactory = new TableCache<>(spec -> new TTDKeyValueTable(spec, this));
windowedFactory = new WindowedTableCache<>((spec, partitioner) -> TTDWindowedTable.create(spec,
this,
partitioner
));
}

- public Time time() {
-   return time;
- }
-
public ResponsiveStoreRegistry storeRegistry() {
return storeRegistry;
}
@@ -71,12 +64,7 @@ public TTDMockAdmin mockAdmin() {
return admin;
}

- public void advanceWallClockTime(final Duration advance) {
-   flush();
-   time.sleep(advance.toMillis());
- }
-
- private void flush() {
+ public void flush() {
storeRegistry.stores().forEach(s -> s.onCommit().accept(0L));
}

@@ -109,9 +97,10 @@ public RemoteMonitor awaitTable(

@Override
public long count(final String tableName, final int tablePartition) {
- final var kv = (TTDKeyValueTable) kvFactory.getTable(tableName);
+ final var kv = (InMemoryKVTable) kvFactory.getTable(tableName);
final var window = (TTDWindowedTable) windowedFactory.getTable(tableName);
- return (kv == null ? 0 : kv.count()) + (window == null ? 0 : window.count());
+ return (kv == null ? 0 : kv.approximateNumEntries(tablePartition))
+     + (window == null ? 0 : window.count());
}

@Override