Skip to content

Commit

Permalink
Row-level TTL PR 6: TopologyTestDriver (#376)
Browse files Browse the repository at this point in the history
Support for row-level ttl in the TopologyTestDriver, as well as the InMemoryKeyValueTable which we now use as the backing kv store for the Responsive TTD.
  • Loading branch information
ableegoldman authored Oct 29, 2024
1 parent ee59069 commit ce24a17
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 305 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static <K, V> TtlProvider<K, V> withDefault(final Duration defaultTtl) {
public static <K, V> TtlProvider<K, V> withNoDefault() {
return new TtlProvider<>(
TtlType.DEFAULT_ONLY,
TtlDuration.noTtl(),
TtlDuration.infinite(),
(ignoredK, ignoredV) -> Optional.empty()
);
}
Expand Down Expand Up @@ -131,7 +131,7 @@ public static TtlDuration of(final Duration ttl) {
}

// No ttl will be applied, in other words infinite retention
public static TtlDuration noTtl() {
public static TtlDuration infinite() {
return new TtlDuration(Duration.ZERO, Ttl.INFINITE);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package dev.responsive.kafka.internal.db.inmemory;

import com.datastax.oss.driver.api.core.cql.BoundStatement;
import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration;
import dev.responsive.kafka.internal.db.KVFlushManager;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.RemoteWriter;
Expand All @@ -22,7 +24,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryKVTable implements RemoteKVTable<Object> {
// Note: this class doesn't actually use BoundStatements and applies all operations immediately,
// we just stub the BoundStatement type so we can reuse this table for the TTD
public class InMemoryKVTable implements RemoteKVTable<BoundStatement> {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryKVTable.class);

private int kafkaPartition;
Expand Down Expand Up @@ -50,13 +54,16 @@ public byte[] get(final int kafkaPartition, final Bytes key, final long streamTi
return null;
}

final long minValidTs = ttlResolver.isEmpty()
? -1L
: streamTimeMs - ttlResolver.get().defaultTtl().toMillis();

if (value.epochMillis() < minValidTs) {
return null;
if (ttlResolver.isPresent()) {
final TtlDuration rowTtl = ttlResolver.get().resolveTtl(key, value.value());
if (rowTtl.isFinite()) {
final long minValidTs = streamTimeMs - rowTtl.toMillis();
if (value.epochMillis < minValidTs) {
return null;
}
}
}

return value.value();
}

Expand All @@ -67,6 +74,11 @@ public KeyValueIterator<Bytes, byte[]> range(
final Bytes to,
final long streamTimeMs
) {
if (ttlResolver.isPresent() && !ttlResolver.get().hasDefaultOnly()) {
throw new UnsupportedOperationException("Row-level ttl is not yet supported for range "
+ "queries on in-memory tables or TTD");
}

checkKafkaPartition(kafkaPartition);

final var iter = store
Expand All @@ -84,6 +96,11 @@ public KeyValueIterator<Bytes, byte[]> range(

@Override
public KeyValueIterator<Bytes, byte[]> all(final int kafkaPartition, final long streamTimeMs) {
if (ttlResolver.isPresent() && !ttlResolver.get().hasDefaultOnly()) {
throw new UnsupportedOperationException("Row-level ttl is not yet supported for range "
+ "queries on in-memory tables or TTD");
}

checkKafkaPartition(kafkaPartition);
final var iter = store.entrySet().iterator();

Expand All @@ -106,15 +123,19 @@ public String name() {
}

@Override
public Object insert(int kafkaPartition, Bytes key, byte[] value, long epochMillis) {
public BoundStatement insert(int kafkaPartition, Bytes key, byte[] value, long epochMillis) {
checkKafkaPartition(kafkaPartition);
return store.put(key, new Value(epochMillis, value));

store.put(key, new Value(epochMillis, value));
return null;
}

@Override
public Object delete(int kafkaPartition, Bytes key) {
public BoundStatement delete(int kafkaPartition, Bytes key) {
checkKafkaPartition(kafkaPartition);
return store.remove(key);
store.remove(key);

return null;
}

@Override
Expand Down Expand Up @@ -152,17 +173,7 @@ public String tableName() {

@Override
public TablePartitioner<Bytes, Integer> partitioner() {
return new TablePartitioner<>() {
@Override
public Integer tablePartition(int kafkaPartition, Bytes key) {
return kafkaPartition;
}

@Override
public Integer metadataTablePartition(int kafkaPartition) {
return kafkaPartition;
}
};
return TablePartitioner.defaultPartitioner();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void shouldRespectSemanticKeyBasedTtl() throws Exception {
final var defaultTtl = Duration.ofMinutes(30);
final Function<String, Optional<TtlDuration>> ttlForKey = k -> {
if (k.equals("KEEP_FOREVER")) {
return Optional.of(TtlDuration.noTtl());
return Optional.of(TtlDuration.infinite());
} else if (k.endsWith("DEFAULT_TTL")) {
return Optional.empty();
} else {
Expand Down Expand Up @@ -264,7 +264,7 @@ public void shouldRespectSemanticKeyValueBasedTtl() throws Exception {
if (v.equals("DEFAULT")) {
return Optional.empty();
} else if (v.equals("NO_TTL")) {
return Optional.of(TtlDuration.noTtl());
return Optional.of(TtlDuration.infinite());
} else {
return Optional.of(TtlDuration.of(Duration.ofMinutes(Long.parseLong(v))));
}
Expand Down Expand Up @@ -345,7 +345,7 @@ public void shouldRespectOverridesWithValueBasedTtl() throws Exception {
if (v.equals("DEFAULT")) {
return Optional.empty();
} else if (v.equals("NO_TTL")) {
return Optional.of(TtlDuration.noTtl());
return Optional.of(TtlDuration.infinite());
} else {
return Optional.of(TtlDuration.of(Duration.ofMinutes(Long.parseLong(v))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,21 @@ public ResponsiveTopologyTestDriver(
topology,
config,
initialWallClockTime,
new TTDCassandraClient(
new TTDMockAdmin(baseProps(config), topology),
mockTime(initialWallClockTime))
new TTDCassandraClient(new TTDMockAdmin(baseProps(config), topology))
);
}

/**
* Advances the internal mock time used for Responsive-specific features such as ttl, as well
* as the mock time used for various Kafka Streams functionality such as wall-clock punctuators.
* Advances the internal mock time which can be used to trigger some Kafka Streams
* functionality such as wall-clock punctuators, or force Responsive to flush
* its internal buffers.
* See {@link TopologyTestDriver#advanceWallClockTime(Duration)} for more details.
*
* @param advance the amount of time to advance wall-clock time
*/
@Override
public void advanceWallClockTime(final Duration advance) {
client.advanceWallClockTime(advance);
client.flush();
super.advanceWallClockTime(advance);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,30 @@
import dev.responsive.kafka.internal.db.TTDWindowedTable;
import dev.responsive.kafka.internal.db.TableCache;
import dev.responsive.kafka.internal.db.WindowedTableCache;
import dev.responsive.kafka.internal.db.inmemory.InMemoryKVTable;
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.kafka.internal.utils.RemoteMonitor;
import java.time.Duration;
import java.util.OptionalInt;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.utils.Time;

public class TTDCassandraClient extends CassandraClient {
private final Time time;
private final ResponsiveStoreRegistry storeRegistry = new ResponsiveStoreRegistry();
private final TTDMockAdmin admin;

private final TableCache<RemoteKVTable<BoundStatement>> kvFactory;
private final WindowedTableCache<RemoteWindowedTable<BoundStatement>> windowedFactory;

public TTDCassandraClient(final TTDMockAdmin admin, final Time time) {
public TTDCassandraClient(final TTDMockAdmin admin) {
super(loggedConfig(admin.props()));
this.time = time;
this.admin = admin;

kvFactory = new TableCache<>(spec -> TTDKeyValueTable.create(spec, this));
kvFactory = new TableCache<>(spec -> new TTDKeyValueTable(spec, this));
windowedFactory = new WindowedTableCache<>((spec, partitioner) -> TTDWindowedTable.create(spec,
this,
partitioner
));
}

public Time time() {
return time;
}

public ResponsiveStoreRegistry storeRegistry() {
return storeRegistry;
}
Expand All @@ -71,12 +64,7 @@ public TTDMockAdmin mockAdmin() {
return admin;
}

public void advanceWallClockTime(final Duration advance) {
flush();
time.sleep(advance.toMillis());
}

private void flush() {
public void flush() {
storeRegistry.stores().forEach(s -> s.onCommit().accept(0L));
}

Expand Down Expand Up @@ -109,9 +97,10 @@ public RemoteMonitor awaitTable(

@Override
public long count(final String tableName, final int tablePartition) {
final var kv = (TTDKeyValueTable) kvFactory.getTable(tableName);
final var kv = (InMemoryKVTable) kvFactory.getTable(tableName);
final var window = (TTDWindowedTable) windowedFactory.getTable(tableName);
return (kv == null ? 0 : kv.count()) + (window == null ? 0 : window.count());
return (kv == null ? 0 : kv.approximateNumEntries(tablePartition))
+ (window == null ? 0 : window.count());
}

@Override
Expand Down
Loading

0 comments on commit ce24a17

Please sign in to comment.