Row-level TTL PR 3: compute minValidTs from TtlResolver (#373)
Short PR to move the computation of minValidTs from the upper layer of the KV stores into the inner Table implementations, where we can account for the TtlResolver.

Introduces the TtlResolver used to hook everything up within Responsive. Also cleans up the RemoteTableSpec.
ableegoldman authored Oct 28, 2024
1 parent 853518b commit 19ea566
Showing 24 changed files with 326 additions and 123 deletions.
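For orientation before the per-file hunks: a minimal sketch (mine, not code from this commit) of the cutoff computation that moves down into the table implementations. The nested interfaces are simplified stand-ins for Responsive's TtlResolver and TtlProvider.TtlDuration; only the defaultTtl()/toMillis() usage and the -1 no-TTL sentinel are taken from the diff.

import java.util.Optional;

public final class MinValidTsSketch {

  // Simplified stand-in for TtlProvider.TtlDuration
  interface TtlDuration {
    long toMillis();
  }

  // Simplified stand-in for dev.responsive.kafka.internal.stores.TtlResolver
  interface TtlResolver {
    TtlDuration defaultTtl();
  }

  // With no TTL configured, -1 acts as "no lower bound": every record
  // timestamp is >= -1, so nothing gets filtered out on read.
  static long minValidTs(final Optional<TtlResolver> ttlResolver, final long streamTimeMs) {
    return ttlResolver.isEmpty()
        ? -1L
        : streamTimeMs - ttlResolver.get().defaultTtl().toMillis();
  }

  public static void main(final String[] args) {
    final TtlResolver oneHourTtl = () -> () -> 3_600_000L;
    System.out.println(minValidTs(Optional.of(oneHourTtl), 10_000_000L)); // 6400000
    System.out.println(minValidTs(Optional.empty(), 10_000_000L));        // -1
  }
}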
ResponsiveKeyValueParams.java
@@ -16,7 +16,6 @@

package dev.responsive.kafka.api.stores;

-import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration;
import dev.responsive.kafka.internal.stores.SchemaTypes.KVSchema;
import dev.responsive.kafka.internal.utils.TableName;
import java.time.Duration;
@@ -71,13 +70,4 @@ public KVSchema schemaType() {
return ttlProvider;
}

-  public Optional<TtlDuration> defaultTimeToLive() {
-    if (ttlProvider.isPresent()) {
-      return Optional.ofNullable(ttlProvider.get().defaultTtl());
-
-    } else {
-      return Optional.empty();
-    }
-  }
-
}
TtlProvider.java
@@ -176,11 +176,11 @@ public boolean isFinite() {
}

public long toSeconds() {
-    return duration.toSeconds();
+    return duration().toSeconds();
}

public long toMillis() {
-    return duration.toMillis();
+    return duration().toMillis();
}

@Override
CassandraFactTable.java
@@ -52,6 +52,7 @@ public class CassandraFactTable implements RemoteKVTable<BoundStatement> {

private final String name;
private final CassandraClient client;
+  private final Optional<TtlResolver<?, ?>> ttlResolver;

private final PreparedStatement get;
private final PreparedStatement insert;
@@ -62,6 +63,7 @@ public class CassandraFactTable implements RemoteKVTable<BoundStatement> {
public CassandraFactTable(
final String name,
final CassandraClient client,
+      final Optional<TtlResolver<?, ?>> ttlResolver,
final PreparedStatement get,
final PreparedStatement insert,
final PreparedStatement delete,
@@ -70,6 +72,7 @@ public CassandraFactTable(
) {
this.name = name;
this.client = client;
+    this.ttlResolver = ttlResolver;
this.get = get;
this.insert = insert;
this.delete = delete;
@@ -82,10 +85,11 @@ public static CassandraFactTable create(
final CassandraClient client
) {
final String name = spec.tableName();
+    final var ttlResolver = spec.ttlResolver();
LOG.info("Creating fact data table {} in remote store.", name);

final CreateTableWithOptions createTable = spec.applyDefaultOptions(
-        createTable(name, spec.ttlResolver())
+        createTable(name, ttlResolver)
);

// separate metadata from the main table for the fact schema, this is acceptable
@@ -161,6 +165,7 @@ public static CassandraFactTable create(
return new CassandraFactTable(
name,
client,
+        ttlResolver,
get,
insert,
delete,
@@ -275,7 +280,11 @@ public BoundStatement insert(
}

@Override
-  public byte[] get(final int kafkaPartition, final Bytes key, long minValidTs) {
+  public byte[] get(final int kafkaPartition, final Bytes key, long streamTimeMs) {
+    final long minValidTs = ttlResolver.isEmpty()
+        ? -1L
+        : streamTimeMs - ttlResolver.get().defaultTtl().toMillis();
+
final BoundStatement get = this.get
.bind()
.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
@@ -297,14 +306,16 @@ public KeyValueIterator<Bytes, byte[]> range(
final int kafkaPartition,
final Bytes from,
final Bytes to,
-      long minValidTs) {
+      long streamTimeMs
+  ) {
throw new UnsupportedOperationException("range scans are not supported on fact tables.");
}

@Override
public KeyValueIterator<Bytes, byte[]> all(
final int kafkaPartition,
-      long minValidTs) {
+      long streamTimeMs
+  ) {
throw new UnsupportedOperationException("all is not supported on fact tables");
}

CassandraKeyValueTable.java
@@ -66,6 +66,7 @@ public class CassandraKeyValueTable implements RemoteKVTable<BoundStatement> {
private final String name;
private final CassandraClient client;
private final SubPartitioner partitioner;
+  private final Optional<TtlResolver<?, ?>> ttlResolver;

private final PreparedStatement get;
private final PreparedStatement range;
@@ -214,6 +215,7 @@ public static CassandraKeyValueTable create(
name,
client,
(SubPartitioner) spec.partitioner(),
+        ttlResolver,
get,
range,
all,
@@ -255,6 +257,7 @@ public CassandraKeyValueTable(
final String name,
final CassandraClient client,
final SubPartitioner partitioner,
+      final Optional<TtlResolver<?, ?>> ttlResolver,
final PreparedStatement get,
final PreparedStatement range,
final PreparedStatement all,
@@ -269,6 +272,7 @@ public CassandraKeyValueTable(
this.name = name;
this.client = client;
this.partitioner = partitioner;
+    this.ttlResolver = ttlResolver;
this.get = get;
this.range = range;
this.all = all;
@@ -351,8 +355,12 @@ public CassandraKVFlushManager init(
public byte[] get(
final int kafkaPartition,
final Bytes key,
-      final long minValidTs
+      final long streamTimeMs
  ) {
+    final long minValidTs = ttlResolver.isEmpty()
+        ? -1L
+        : streamTimeMs - ttlResolver.get().defaultTtl().toMillis();
+
final int tablePartition = partitioner.tablePartition(kafkaPartition, key);

final BoundStatement get = this.get
@@ -377,8 +385,12 @@ public KeyValueIterator<Bytes, byte[]> range(
final int kafkaPartition,
final Bytes from,
final Bytes to,
-      final long minValidTs
+      final long streamTimeMs
  ) {
+    final long minValidTs = ttlResolver.isEmpty()
+        ? -1L
+        : streamTimeMs - ttlResolver.get().defaultTtl().toMillis();
+
// TODO: explore more efficient ways to serve bounded range queries, for now we have to
// iterate over all subpartitions and merge the results since we don't know which subpartitions
// hold keys within the given range
@@ -402,8 +414,12 @@ public KeyValueIterator<Bytes, byte[]> range(
@Override
public KeyValueIterator<Bytes, byte[]> all(
final int kafkaPartition,
-      final long minValidTs
+      final long streamTimeMs
  ) {
+    final long minValidTs = ttlResolver.isEmpty()
+        ? -1L
+        : streamTimeMs - ttlResolver.get().defaultTtl().toMillis();
+
final List<KeyValueIterator<Bytes, byte[]>> resultsPerPartition = new LinkedList<>();
for (final int partition : partitioner.allTablePartitions(kafkaPartition)) {
final BoundStatement range = this.all
RemoteKVTable.java
@@ -38,12 +38,11 @@ KVFlushManager init(
*
* @param kafkaPartition the kafka partition
* @param key the data key
-   * @param minValidTs the minimum valid timestamp to apply semantic TTL,
-   *                   in epochMillis
+   * @param streamTimeMs the current streamTime
*
* @return the value previously set
*/
-  byte[] get(int kafkaPartition, Bytes key, long minValidTs);
+  byte[] get(int kafkaPartition, Bytes key, long streamTimeMs);

/**
* Retrieves a range of key value pairs from the given {@code partitionKey} and
@@ -57,15 +56,15 @@ KVFlushManager init(
* @param kafkaPartition the kafka partition
* @param from the starting key (inclusive)
* @param to the ending key (inclusive)
-   * @param minValidTs the minimum timestamp, in epochMillis, to consider valid
+   * @param streamTimeMs the current streamTime
*
* @return an iterator of all key-value pairs in the range
*/
KeyValueIterator<Bytes, byte[]> range(
int kafkaPartition,
Bytes from,
Bytes to,
-      long minValidTs
+      long streamTimeMs
);

/**
Expand All @@ -77,11 +76,11 @@ KeyValueIterator<Bytes, byte[]> range(
* session).
*
* @param kafkaPartition the kafka partition
-   * @param minValidTs the minimum valid timestamp, in epochMilliis, to return
+   * @param streamTimeMs the current streamTime
*
* @return an iterator of all key-value pairs
*/
-  KeyValueIterator<Bytes, byte[]> all(int kafkaPartition, long minValidTs);
+  KeyValueIterator<Bytes, byte[]> all(int kafkaPartition, long streamTimeMs);

/**
* An approximate count of the total number of entries across all sub-partitions
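To illustrate the call-site effect of this interface change, a hypothetical sketch (types simplified; nothing here is from the commit): the upper store layer just tracks stream time as the max record timestamp seen and forwards it, leaving the TTL arithmetic to the table.

public final class StreamTimeCallerSketch {

  // Simplified stand-in for the RemoteKVTable get() shown above,
  // with byte[] in place of Kafka's Bytes
  interface TableLike {
    byte[] get(int kafkaPartition, byte[] key, long streamTimeMs);
  }

  private long observedStreamTimeMs = -1L;

  byte[] lookup(final TableLike table, final int partition, final byte[] key,
                final long recordTimestampMs) {
    // Stream time is the max record timestamp seen so far; it never goes backwards
    observedStreamTimeMs = Math.max(observedStreamTimeMs, recordTimestampMs);
    // Before this PR the caller computed minValidTs here; now it just forwards stream time
    return table.get(partition, key, observedStreamTimeMs);
  }
}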
InMemoryKVTable.java
@@ -6,10 +6,12 @@
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration;
+import dev.responsive.kafka.internal.stores.TtlResolver;
import dev.responsive.kafka.internal.utils.Iterators;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -25,10 +27,12 @@ public class InMemoryKVTable implements RemoteKVTable<Object> {

private int kafkaPartition;
private final String name;
+  private final Optional<TtlResolver<?, ?>> ttlResolver;
private final ConcurrentNavigableMap<Bytes, Value> store = new ConcurrentSkipListMap<>();

-  public InMemoryKVTable(String name) {
+  public InMemoryKVTable(final String name, final Optional<TtlResolver<?, ?>> ttlResolver) {
this.name = Objects.requireNonNull(name);
+    this.ttlResolver = ttlResolver;
}

@Override
@@ -39,12 +43,17 @@ public KVFlushManager init(int kafkaPartition) {
}

@Override
-  public byte[] get(final int kafkaPartition, final Bytes key, final long minValidTs) {
+  public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) {
checkKafkaPartition(kafkaPartition);
final var value = store.get(key);
if (value == null) {
return null;
}

+    final long minValidTs = ttlResolver.isEmpty()
+        ? -1L
+        : streamTimeMs - ttlResolver.get().defaultTtl().toMillis();
+
if (value.epochMillis() < minValidTs) {
return null;
}
@@ -56,21 +65,32 @@ public KeyValueIterator<Bytes, byte[]> range(
final int kafkaPartition,
final Bytes from,
final Bytes to,
-      final long minValidTs
+      final long streamTimeMs
) {
checkKafkaPartition(kafkaPartition);

final var iter = store
.tailMap(from, true)
.headMap(to, true)
.entrySet()
.iterator();

+    final long minValidTs = ttlResolver.isEmpty()
+        ? -1L
+        : streamTimeMs - ttlResolver.get().defaultTtl().toMillis();
+
return iteratorWithTimeFilter(iter, minValidTs);
}

@Override
-  public KeyValueIterator<Bytes, byte[]> all(final int kafkaPartition, final long minValidTs) {
+  public KeyValueIterator<Bytes, byte[]> all(final int kafkaPartition, final long streamTimeMs) {
checkKafkaPartition(kafkaPartition);
final var iter = store.entrySet().iterator();

+    final long minValidTs = ttlResolver.isEmpty()
+        ? -1L
+        : streamTimeMs - ttlResolver.get().defaultTtl().toMillis();
+
return iteratorWithTimeFilter(iter, minValidTs);
}

MongoKVTable.java
@@ -63,6 +63,8 @@ public class MongoKVTable implements RemoteKVTable<WriteModel<KVDoc>> {

private final String name;
private final KeyCodec keyCodec;
+  private final Optional<TtlResolver<?, ?>> ttlResolver;
+  private final long defaultTtlMs;
private final MongoCollection<KVDoc> docs;
private final MongoCollection<KVMetadataDoc> metadata;

@@ -76,6 +78,7 @@ public MongoKVTable(
) {
this.name = name;
this.keyCodec = new StringKeyCodec();
+    this.ttlResolver = ttlResolver;
final CodecProvider pojoCodecProvider = PojoCodecProvider.builder().automatic(true).build();
final CodecRegistry pojoCodecRegistry = fromRegistries(
getDefaultCodecRegistry(),
@@ -106,13 +109,15 @@ public MongoKVTable(

if (ttlResolver.isPresent()) {
// TODO(sophie): account for infinite default ttl
-      final Duration expireAfter =
-          ttlResolver.get().defaultTtl().duration().plus(Duration.ofHours(12));
-      final long expireAfterSeconds = expireAfter.getSeconds();
+      this.defaultTtlMs = ttlResolver.get().defaultTtl().toMillis();
+      final long expireAfterMs = defaultTtlMs + Duration.ofHours(12).toMillis();

docs.createIndex(
Indexes.descending(KVDoc.TIMESTAMP),
-          new IndexOptions().expireAfter(expireAfterSeconds, TimeUnit.SECONDS)
+          new IndexOptions().expireAfter(expireAfterMs, TimeUnit.MILLISECONDS)
);
+    } else {
+      defaultTtlMs = 0L;
}
}

@@ -147,7 +152,9 @@ public MongoKVFlushManager init(final int kafkaPartition) {
}

@Override
-  public byte[] get(final int kafkaPartition, final Bytes key, final long minValidTs) {
+  public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) {
+    final long minValidTs = ttlResolver.isEmpty() ? -1L : streamTimeMs - defaultTtlMs;
+
final KVDoc v = docs.find(Filters.and(
Filters.eq(KVDoc.ID, keyCodec.encode(key)),
Filters.gte(KVDoc.TIMESTAMP, minValidTs)
@@ -160,8 +167,10 @@ public KeyValueIterator<Bytes, byte[]> range(
final int kafkaPartition,
final Bytes from,
final Bytes to,
-      final long minValidTs
+      final long streamTimeMs
  ) {
+    final long minValidTs = ttlResolver.isEmpty() ? -1L : streamTimeMs - defaultTtlMs;
+
final FindIterable<KVDoc> result = docs.find(
Filters.and(
Filters.gte(KVDoc.ID, keyCodec.encode(from)),
@@ -180,7 +189,9 @@ public KeyValueIterator<Bytes, byte[]> range(
}

@Override
-  public KeyValueIterator<Bytes, byte[]> all(final int kafkaPartition, final long minValidTs) {
+  public KeyValueIterator<Bytes, byte[]> all(final int kafkaPartition, final long streamTimeMs) {
+    final long minValidTs = ttlResolver.isEmpty() ? -1L : streamTimeMs - defaultTtlMs;
+
final FindIterable<KVDoc> result = docs.find(Filters.and(
Filters.not(Filters.exists(KVDoc.TOMBSTONE_TS)),
Filters.gte(KVDoc.TIMESTAMP, minValidTs),
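One note on the MongoKVTable hunk, as I read it (the rationale for the pad is my inference; the commit only shows the arithmetic): the TTL index lets Mongo lazily purge documents after they logically expire, and the 12-hour pad keeps physical deletion safely behind the exact read-side filter Filters.gte(KVDoc.TIMESTAMP, minValidTs). A worked example of the numbers:

import java.time.Duration;

public final class MongoTtlIndexMath {
  public static void main(final String[] args) {
    // e.g. a 1-hour default ttl, as it would come out of defaultTtl().toMillis()
    final long defaultTtlMs = Duration.ofHours(1).toMillis();        // 3600000

    // expireAfter for the TTL index: logical ttl plus a 12-hour deletion pad
    final long expireAfterMs = defaultTtlMs + Duration.ofHours(12).toMillis();
    System.out.println(expireAfterMs);                               // 46800000

    // reads still enforce exact semantics: TIMESTAMP >= streamTimeMs - defaultTtlMs
    final long streamTimeMs = 100_000_000L;
    System.out.println(streamTimeMs - defaultTtlMs);                 // 96400000
  }
}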