Row-level ttl for KV stores #369

Open
wants to merge 3 commits into main
@@ -71,13 +71,4 @@ public KVSchema schemaType() {
return ttlProvider;
}

public Optional<TtlDuration> defaultTimeToLive() {
if (ttlProvider.isPresent()) {
return Optional.ofNullable(ttlProvider.get().defaultTtl());

} else {
return Optional.empty();
}
}

}
CassandraFactTable.java
@@ -23,6 +23,7 @@
import static dev.responsive.kafka.internal.db.ColumnName.PARTITION_KEY;
import static dev.responsive.kafka.internal.db.ColumnName.ROW_TYPE;
import static dev.responsive.kafka.internal.db.ColumnName.TIMESTAMP;
import static dev.responsive.kafka.internal.db.ColumnName.TTL_SECONDS;
import static dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration.NO_COMMITTED_OFFSET;

import com.datastax.oss.driver.api.core.cql.BoundStatement;
@@ -32,10 +33,10 @@
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration;
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
import dev.responsive.kafka.internal.stores.TtlResolver;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -47,31 +48,39 @@

public class CassandraFactTable implements RemoteKVTable<BoundStatement> {

private static final Logger LOG = LoggerFactory.getLogger(
CassandraFactTable.class);
private static final Logger LOG = LoggerFactory.getLogger(CassandraFactTable.class);

private final String name;
private final CassandraClient client;
private final Optional<TtlResolver<?, ?>> ttlResolver;

private final PreparedStatement get;
private final PreparedStatement getWithTimestamp;
private final PreparedStatement insert;
private final PreparedStatement insertWithTtl;
private final PreparedStatement delete;
private final PreparedStatement fetchOffset;
private final PreparedStatement setOffset;

public CassandraFactTable(
final String name,
final CassandraClient client,
final Optional<TtlResolver<?, ?>> ttlResolver,
final PreparedStatement get,
final PreparedStatement getWithTimestamp,
final PreparedStatement insert,
final PreparedStatement insertWithTtl,
final PreparedStatement delete,
final PreparedStatement fetchOffset,
final PreparedStatement setOffset
) {
this.name = name;
this.client = client;
this.ttlResolver = ttlResolver;
this.get = get;
this.getWithTimestamp = getWithTimestamp;
this.insert = insert;
this.insertWithTtl = insertWithTtl;
this.delete = delete;
this.fetchOffset = fetchOffset;
this.setOffset = setOffset;
@@ -84,8 +93,9 @@ public static CassandraFactTable create(
final String name = spec.tableName();
LOG.info("Creating fact data table {} in remote store.", name);

final Optional<TtlResolver<?, ?>> ttlResolver = spec.ttlResolver();
final CreateTableWithOptions createTable = spec.applyDefaultOptions(
createTable(name, spec.ttlResolver())
createTable(name, ttlResolver)
);

// separate metadata from the main table for the fact schema, this is acceptable
@@ -115,6 +125,18 @@ public static CassandraFactTable create(
QueryOp.WRITE
);

final var insertWithTtl = client.prepare(
QueryBuilder
.insertInto(name)
.value(ROW_TYPE.column(), RowType.DATA_ROW.literal())
.value(DATA_KEY.column(), bindMarker(DATA_KEY.bind()))
.value(TIMESTAMP.column(), bindMarker(TIMESTAMP.bind()))
.value(DATA_VALUE.column(), bindMarker(DATA_VALUE.bind()))
.usingTtl(bindMarker(TTL_SECONDS.bind()))
.build(),
QueryOp.WRITE
);

final var get = client.prepare(
QueryBuilder
.selectFrom(name)
@@ -129,6 +151,20 @@
QueryOp.READ
);

final var getWithTimestamp = client.prepare(
QueryBuilder
.selectFrom(name)
.columns(DATA_VALUE.column(), TIMESTAMP.column())
.where(ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal()))
.where(DATA_KEY.relation().isEqualTo(bindMarker(DATA_KEY.bind())))
.where(TIMESTAMP.relation().isGreaterThanOrEqualTo(bindMarker(TIMESTAMP.bind())))
// ALLOW FILTERING is OK b/c the query only scans one partition (it actually only
// returns a single value)
.allowFiltering()
.build(),
QueryOp.READ
);

final var delete = client.prepare(
QueryBuilder
.deleteFrom(name)
@@ -161,8 +197,11 @@ public static CassandraFactTable create(
return new CassandraFactTable(
name,
client,
ttlResolver,
get,
getWithTimestamp,
insert,
insertWithTtl,
delete,
fetchOffset,
setOffset
@@ -178,7 +217,7 @@ private static CreateTableWithOptions createTable(
.ifNotExists()
.withPartitionKey(ROW_TYPE.column(), DataTypes.TINYINT)
.withPartitionKey(DATA_KEY.column(), DataTypes.BLOB)
.withColumn(TIMESTAMP.column(), DataTypes.TIMESTAMP)
.withColumn(TIMESTAMP.column(), DataTypes.BIGINT)
.withColumn(DATA_VALUE.column(), DataTypes.BLOB);

if (ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite()) {
@@ -267,44 +306,98 @@ public BoundStatement insert(
final byte[] value,
final long epochMillis
) {
if (ttlResolver.isPresent()) {
final Optional<TtlDuration> rowTtl = ttlResolver.get().computeTtl(key, value);

if (rowTtl.isPresent()) {
return insertWithTtl
.bind()
.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
.setByteBuffer(DATA_VALUE.bind(), ByteBuffer.wrap(value))
.setLong(TIMESTAMP.bind(), epochMillis)
.setInt(TTL_SECONDS.bind(), (int) rowTtl.get().toSeconds());
}
}

return insert
.bind()
.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
.setByteBuffer(DATA_VALUE.bind(), ByteBuffer.wrap(value))
.setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(epochMillis));
.setLong(TIMESTAMP.bind(), epochMillis);
}

@Override
public byte[] get(final int kafkaPartition, final Bytes key, long minValidTs) {
final BoundStatement get = this.get
.bind()
.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
.setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(minValidTs));
public byte[] get(final int kafkaPartition, final Bytes key, long streamTimeMs) {
Review comment (Contributor):
the conditional complexity of this method is pretty high -- would be good to split it up into individual methods, perhaps follow the form:

if !ttlResolver { simpleGet() }
else if !ttlResolver.needsValueToComputeTtl() { getWithMinTtl() }
else { getWithPostFilterForTtl() }
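
A minimal sketch of what that split could look like, reusing the prepared statements, TtlResolver, and getValueFromRow helper already in this class. The three method names follow the reviewer's suggestion, extractSingleValue is a hypothetical helper, and the bodies only mirror the logic in the diff below rather than the code actually merged in this PR:

@Override
public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) {
  if (ttlResolver.isEmpty()) {
    return simpleGet(key);
  } else if (!ttlResolver.get().needsValueToComputeTtl()) {
    return getWithMinTtl(key, streamTimeMs);
  } else {
    return getWithPostFilterForTtl(key, streamTimeMs);
  }
}

// no ttl configured: look up the value with no lower bound on the timestamp
private byte[] simpleGet(final Bytes key) {
  final BoundStatement query = get
      .bind()
      .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
      .setLong(TIMESTAMP.bind(), 0L);
  return extractSingleValue(client.execute(query).all());
}

// key-only (or default) ttl: push the minimum valid timestamp down into the query
private byte[] getWithMinTtl(final Bytes key, final long streamTimeMs) {
  final TtlDuration ttl = ttlResolver.get().resolveTtl(key, null);
  final long minValidTs = ttl.isFinite() ? streamTimeMs - ttl.toMillis() : 0L;
  final BoundStatement query = get
      .bind()
      .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
      .setLong(TIMESTAMP.bind(), minValidTs);
  return extractSingleValue(client.execute(query).all());
}

// value-dependent ttl: fetch the value plus its timestamp, then filter expired rows client-side
private byte[] getWithPostFilterForTtl(final Bytes key, final long streamTimeMs) {
  final BoundStatement query = getWithTimestamp
      .bind()
      .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
      .setLong(TIMESTAMP.bind(), 0L);
  final List<Row> result = client.execute(query).all();
  if (result.size() > 1) {
    throw new IllegalStateException("Received multiple results for the same key");
  } else if (result.isEmpty()) {
    return null;
  }
  final Row row = result.get(0);
  final byte[] value = getValueFromRow(row);
  final TtlDuration ttl = ttlResolver.get().resolveTtl(key, value);
  if (ttl.isFinite() && row.getLong(TIMESTAMP.column()) < streamTimeMs - ttl.toMillis()) {
    return null;
  }
  return value;
}

// a fact-table key maps to at most one row, so anything more is an invariant violation
private byte[] extractSingleValue(final List<Row> rows) {
  if (rows.size() > 1) {
    throw new IllegalStateException("Received multiple results for the same key");
  }
  return rows.isEmpty() ? null : getValueFromRow(rows.get(0));
}

Splitting along the three ttl modes keeps each query path linear and makes the post-filtering branch, the only one that has to read the row timestamp back, easy to spot.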

long minValidTs = 0L;
if (ttlResolver.isPresent() && !ttlResolver.get().needsValueToComputeTtl()) {
final TtlDuration ttl = ttlResolver.get().resolveTtl(key, null);
if (ttl.isFinite()) {
minValidTs = streamTimeMs - ttl.toMillis();
}
}

final List<Row> result = client.execute(get).all();
if (result.size() > 1) {
throw new IllegalArgumentException();
} else if (result.isEmpty()) {
return null;
if (ttlResolver.isEmpty() || !ttlResolver.get().needsValueToComputeTtl()) {
final BoundStatement getQuery = get
.bind()
.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
.setLong(TIMESTAMP.bind(), minValidTs);
final List<Row> result = client.execute(getQuery).all();

if (result.size() > 1) {
throw new IllegalStateException("Received multiple results for the same key");
} else if (result.isEmpty()) {
return null;
} else {
return getValueFromRow(result.get(0));
}
} else {
final ByteBuffer value = result.get(0).getByteBuffer(DATA_VALUE.column());
return Objects.requireNonNull(value).array();
final BoundStatement getQuery = getWithTimestamp
.bind()
.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get()))
.setLong(TIMESTAMP.bind(), minValidTs);
final List<Row> result = client.execute(getQuery).all();

if (result.size() > 1) {
throw new IllegalStateException("Received multiple results for the same key");
} else if (result.isEmpty()) {
return null;
}

final Row rowResult = result.get(0);
final byte[] value = getValueFromRow(rowResult);
final TtlDuration ttl = ttlResolver.get().resolveTtl(key, value);

if (ttl.isFinite()) {
final long minValidTsFromValue = streamTimeMs - ttl.toMillis();
final long recordTs = rowResult.getLong(TIMESTAMP.column());
if (recordTs < minValidTsFromValue) {
return null;
}
}

return value;
}
}

private byte[] getValueFromRow(final Row row) {
return Objects.requireNonNull(row.getByteBuffer(DATA_VALUE.column())).array();
}

@Override
public KeyValueIterator<Bytes, byte[]> range(
final int kafkaPartition,
final Bytes from,
final Bytes to,
long minValidTs) {
long streamTimeMs
) {
throw new UnsupportedOperationException("range scans are not supported on fact tables.");
}

@Override
public KeyValueIterator<Bytes, byte[]> all(
final int kafkaPartition,
long minValidTs) {
long streamTimeMs
) {
throw new UnsupportedOperationException("all is not supported on fact tables");
}
