Row-level TTL PR 4: CassandraFactTable implementation (#374)
Implement row-level ttl for Scylla fact tables
ableegoldman authored Oct 28, 2024
1 parent 19ea566 commit 09c25d7
Showing 5 changed files with 387 additions and 19 deletions.
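Before the file-by-file diff, some context on the mechanism this change builds on: in Scylla (and Cassandra), a per-row TTL is attached to a write via a USING TTL clause, and a bound TTL of 0 means the row never expires, while omitting the clause falls back to the table's default_time_to_live. A minimal, self-contained sketch using the DataStax query builder (the table and column names here are illustrative, not the ones used in this PR):

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;

import com.datastax.oss.driver.api.querybuilder.QueryBuilder;

public class RowTtlSketch {
  public static void main(final String[] args) {
    // The TTL is bound at execution time, so every insert can carry its own
    // expiry. Binding 0 disables expiration for that row; omitting usingTtl()
    // entirely falls back to the table's default_time_to_live.
    final String cql = QueryBuilder
        .insertInto("example_facts")
        .value("key", bindMarker("key"))
        .value("value", bindMarker("value"))
        .usingTtl(bindMarker("ttl"))
        .build()
        .getQuery();

    // Prints something like:
    // INSERT INTO example_facts (key,value) VALUES (:key,:value) USING TTL :ttl
    System.out.println(cql);
  }
}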
@@ -23,6 +23,7 @@
import static dev.responsive.kafka.internal.db.ColumnName.PARTITION_KEY;
import static dev.responsive.kafka.internal.db.ColumnName.ROW_TYPE;
import static dev.responsive.kafka.internal.db.ColumnName.TIMESTAMP;
+import static dev.responsive.kafka.internal.db.ColumnName.TTL_SECONDS;
import static dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration.NO_COMMITTED_OFFSET;

import com.datastax.oss.driver.api.core.cql.BoundStatement;
@@ -32,6 +33,7 @@
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
+import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration;
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
import dev.responsive.kafka.internal.stores.TtlResolver;
import java.nio.ByteBuffer;
@@ -55,7 +57,9 @@ public class CassandraFactTable implements RemoteKVTable<BoundStatement> {
private final Optional<TtlResolver<?, ?>> ttlResolver;

private final PreparedStatement get;
+private final PreparedStatement getWithTimestamp;
private final PreparedStatement insert;
+private final PreparedStatement insertWithTtl;
private final PreparedStatement delete;
private final PreparedStatement fetchOffset;
private final PreparedStatement setOffset;
@@ -65,7 +69,9 @@ public CassandraFactTable(
final CassandraClient client,
final Optional<TtlResolver<?, ?>> ttlResolver,
final PreparedStatement get,
+final PreparedStatement getWithTimestamp,
final PreparedStatement insert,
+final PreparedStatement insertWithTtl,
final PreparedStatement delete,
final PreparedStatement fetchOffset,
final PreparedStatement setOffset
Expand All @@ -74,7 +80,9 @@ public CassandraFactTable(
this.client = client;
this.ttlResolver = ttlResolver;
this.get = get;
+this.getWithTimestamp = getWithTimestamp;
this.insert = insert;
+this.insertWithTtl = insertWithTtl;
this.delete = delete;
this.fetchOffset = fetchOffset;
this.setOffset = setOffset;
@@ -119,6 +127,18 @@ public static CassandraFactTable create(
QueryOp.WRITE
);

+final var insertWithTtl = client.prepare(
+QueryBuilder
+.insertInto(name)
+.value(ROW_TYPE.column(), RowType.DATA_ROW.literal())
+.value(DATA_KEY.column(), bindMarker(DATA_KEY.bind()))
+.value(TIMESTAMP.column(), bindMarker(TIMESTAMP.bind()))
+.value(DATA_VALUE.column(), bindMarker(DATA_VALUE.bind()))
+.usingTtl(bindMarker(TTL_SECONDS.bind()))
+.build(),
+QueryOp.WRITE
+);

final var get = client.prepare(
QueryBuilder
.selectFrom(name)
@@ -133,6 +153,19 @@
QueryOp.READ
);

+final var getWithTimestamp = client.prepare(
+QueryBuilder
+.selectFrom(name)
+.columns(DATA_VALUE.column(), TIMESTAMP.column())
+.where(ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal()))
+.where(DATA_KEY.relation().isEqualTo(bindMarker(DATA_KEY.bind())))
+// ALLOW FILTERING is OK b/c the query only scans one partition (it actually only
+// returns a single value)
+.allowFiltering()
+.build(),
+QueryOp.READ
+);

final var delete = client.prepare(
QueryBuilder
.deleteFrom(name)
@@ -167,7 +200,9 @@ public static CassandraFactTable create(
client,
ttlResolver,
get,
+getWithTimestamp,
insert,
+insertWithTtl,
delete,
fetchOffset,
setOffset
@@ -272,6 +307,27 @@ public BoundStatement insert(
final byte[] value,
final long epochMillis
) {
+if (ttlResolver.isPresent()) {
+final Optional<TtlDuration> rowTtl = ttlResolver.get().computeTtl(key, value);
+
+// If the user happens to return the same ttl value as the default, skip applying
+// it at the row level, since row-level ttl is less efficient in Scylla
+if (rowTtl.isPresent() && !rowTtl.get().equals(ttlResolver.get().defaultTtl())) {
+
+// A row ttl of 0 in Scylla means no expiration, ie an infinite ttl
+final int rowTtlOverrideSeconds = rowTtl.get().isFinite()
+? (int) rowTtl.get().toSeconds()
+: 0;
+
+return insertWithTtl
+.bind()
+.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
+.setByteBuffer(DATA_VALUE.bind(), ByteBuffer.wrap(value))
+.setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(epochMillis))
+.setInt(TTL_SECONDS.bind(), rowTtlOverrideSeconds);
+}
+}

return insert
.bind()
.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
@@ -281,24 +337,82 @@ public BoundStatement insert(

@Override
public byte[] get(final int kafkaPartition, final Bytes key, long streamTimeMs) {
-final long minValidTs = ttlResolver.isEmpty()
-? -1L
-: streamTimeMs - ttlResolver.get().defaultTtl().toMillis();
+if (ttlResolver.isEmpty()) {
+return simpleGet(key);
+} else if (ttlResolver.get().needsValueToComputeTtl()) {
+return postFilterGet(key, streamTimeMs);
+} else {
+final TtlDuration ttl = ttlResolver.get().resolveTtl(key, null);
+if (ttl.isFinite()) {
+final long minValidTimeMs = streamTimeMs - ttl.toMillis();
+return preFilterGet(key, minValidTimeMs);
+} else {
+return simpleGet(key);
+}
+}
+}

-final BoundStatement get = this.get
+/**
+ * Simple "get" with no filtering, for when the ttl is infinite or there is no ttl at all
+ */
+private byte[] simpleGet(final Bytes key) {
+// Just delegate to preFilterGet with a min valid timestamp of -1, which will
+// not exclude anything; it's not worth maintaining a third "get"
+// PreparedStatement without the gte(timestamp) filter
+return preFilterGet(key, -1L);
+}
+
+/**
+ * "Get" with server-side filtering on ttl. Used when the ttl can be computed
+ * from the key alone, is the default only, or there is no ttl at all
+ */
+private byte[] preFilterGet(final Bytes key, final long minValidTimeMs) {
+final BoundStatement getQuery = get
.bind()
.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
-.setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(minValidTs));
+.setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(minValidTimeMs));

+final List<Row> result = client.execute(getQuery).all();

-final List<Row> result = client.execute(get).all();
if (result.size() > 1) {
-throw new IllegalArgumentException();
+throw new IllegalStateException("Received multiple results for the same key");
} else if (result.isEmpty()) {
return null;
} else {
-final ByteBuffer value = result.get(0).getByteBuffer(DATA_VALUE.column());
-return Objects.requireNonNull(value).array();
+return getValueFromRow(result.get(0));
}
}

+private byte[] postFilterGet(final Bytes key, long streamTimeMs) {
+final BoundStatement getQuery = getWithTimestamp
+.bind()
+.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()));
+
+final List<Row> result = client.execute(getQuery).all();
+
+if (result.size() > 1) {
+throw new IllegalStateException("Received multiple results for the same key");
+} else if (result.isEmpty()) {
+return null;
+}
+
+final Row rowResult = result.get(0);
+final byte[] value = getValueFromRow(rowResult);
+final TtlDuration ttl = ttlResolver.get().resolveTtl(key, value);
+
+if (ttl.isFinite()) {
+final long minValidTsFromValue = streamTimeMs - ttl.toMillis();
+final long recordTs = rowResult.getInstant(TIMESTAMP.column()).toEpochMilli();
+if (recordTs < minValidTsFromValue) {
+return null;
+}
+}
+
+return value;
+}
+
+private byte[] getValueFromRow(final Row row) {
+return Objects.requireNonNull(row.getByteBuffer(DATA_VALUE.column())).array();
+}

@Override
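An aside on why the read path in CassandraFactTable now has three variants: when the TtlProvider needs the record's value to compute the ttl, the minimum valid timestamp is not known until after the value has been fetched, so expired rows must be filtered client-side. A hypothetical value-dependent rule (purely illustrative, not the PR's API) showing the shape of the problem, and the same expiry check that postFilterGet() performs:

import java.nio.charset.StandardCharsets;
import java.time.Duration;

public class ValueDependentTtlSketch {

  // Hypothetical rule: records whose value is marked "premium" live 30 days,
  // everything else lives 1 day. Because the cutoff depends on the stored
  // value, a "ts >= minValidTs" filter cannot be pushed into the CQL query;
  // the row must be fetched first and filtered afterwards.
  static Duration ttlFor(final byte[] value) {
    final String v = new String(value, StandardCharsets.UTF_8);
    return v.startsWith("premium:") ? Duration.ofDays(30) : Duration.ofDays(1);
  }

  public static void main(final String[] args) {
    final long streamTimeMs = System.currentTimeMillis();
    final byte[] value = "premium:payload".getBytes(StandardCharsets.UTF_8);
    final long recordTsMs = streamTimeMs - Duration.ofDays(2).toMillis();

    // Same check as postFilterGet(): the record is valid only if its
    // timestamp is at or after streamTime minus the resolved ttl.
    final long minValidTs = streamTimeMs - ttlFor(value).toMillis();
    System.out.println(recordTsMs >= minValidTs ? "valid" : "expired"); // valid
  }
}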
@@ -42,7 +42,6 @@
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
-import com.datastax.oss.driver.internal.querybuilder.schema.compaction.DefaultLeveledCompactionStrategy;
import dev.responsive.kafka.internal.db.partitioning.Segmenter;
import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner;
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
@@ -425,8 +424,7 @@ private static CreateTableWithOptions createTable(final String tableName) {
.withColumn(DATA_VALUE.column(), DataTypes.BLOB)
.withColumn(OFFSET.column(), DataTypes.BIGINT)
.withColumn(EPOCH.column(), DataTypes.BIGINT)
-.withColumn(STREAM_TIME.column(), DataTypes.BIGINT)
-.withCompaction(new DefaultLeveledCompactionStrategy()); // TODO: create a LCSTableSpec?
+.withColumn(STREAM_TIME.column(), DataTypes.BIGINT);
}

public CassandraWindowedTable(
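A side note on the CassandraWindowedTable change above: the deleted line pinned leveled compaction by instantiating DefaultLeveledCompactionStrategy from the driver's internal package. If compaction were to be configured again (per the removed TODO about an LCSTableSpec), the public SchemaBuilder API offers the same thing. A sketch with an illustrative table name, not part of this commit:

import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;

public class LcsSketch {
  public static void main(final String[] args) {
    // Same effect as the removed withCompaction(...) call, but via the public
    // SchemaBuilder factory rather than an internal implementation class.
    final CreateTableWithOptions table = SchemaBuilder
        .createTable("example_windowed")
        .ifNotExists()
        .withPartitionKey("key", DataTypes.BLOB)
        .withColumn("value", DataTypes.BLOB)
        .withCompaction(SchemaBuilder.leveledCompactionStrategy());

    System.out.println(table.asCql());
  }
}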
@@ -40,7 +40,8 @@ public enum ColumnName {
EPOCH("epoch", "epoch"),
STREAM_TIME("streamTime", "streamtime"),
WINDOW_START("windowStart", "windowstart", ts -> timestamp((long) ts)),
TIMESTAMP("ts", "ts", ts -> timestamp((long) ts));
TIMESTAMP("ts", "ts", ts -> timestamp((long) ts)),
TTL_SECONDS("ttl", "ttl", ttl -> ttlSeconds((int) ttl));

static final Bytes METADATA_KEY
= Bytes.wrap("_metadata".getBytes(StandardCharsets.UTF_8));
@@ -61,6 +62,10 @@ private static Literal timestamp(final long ts) {
return QueryBuilder.literal(Instant.ofEpochMilli(ts));
}

+private static Literal ttlSeconds(final int ttl) {
+return QueryBuilder.literal(ttl);
+}

ColumnName(final String column, final String bind) {
this(column, bind, QueryBuilder::literal);
}
