diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java index 513d2ddd1..48bab920f 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java @@ -47,7 +47,7 @@ public KeyValueStore get() { if (isTimestamped) { return new ResponsiveTimestampedKeyValueStore(params); } else { - return new ResponsiveKeyValueStore(params, isTimestamped); + return new ResponsiveKeyValueStore(params, false); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueParams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueParams.java index c5af98bc9..287af289c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueParams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueParams.java @@ -71,13 +71,4 @@ public KVSchema schemaType() { return ttlProvider; } - public Optional defaultTimeToLive() { - if (ttlProvider.isPresent()) { - return Optional.ofNullable(ttlProvider.get().defaultTtl()); - - } else { - return Optional.empty(); - } - } - } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java index 2d481a0f7..2e58bcb0d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java @@ -110,6 +110,9 @@ public TtlProvider fromValue( valueSerde); } + /** + * @return the same TtlProvider with a key-and-value-based override function + */ public TtlProvider fromKeyAndValue( final BiFunction> computeTtlFromKeyAndValue, final Serde keySerde, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java index 362688ee3..32dd44060 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraFactTable.java @@ -23,6 +23,7 @@ import static dev.responsive.kafka.internal.db.ColumnName.PARTITION_KEY; import static dev.responsive.kafka.internal.db.ColumnName.ROW_TYPE; import static dev.responsive.kafka.internal.db.ColumnName.TIMESTAMP; +import static dev.responsive.kafka.internal.db.ColumnName.TTL_SECONDS; import static dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration.NO_COMMITTED_OFFSET; import com.datastax.oss.driver.api.core.cql.BoundStatement; @@ -32,10 +33,10 @@ import com.datastax.oss.driver.api.querybuilder.QueryBuilder; import com.datastax.oss.driver.api.querybuilder.SchemaBuilder; import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions; +import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration; import dev.responsive.kafka.internal.db.spec.RemoteTableSpec; import dev.responsive.kafka.internal.stores.TtlResolver; import java.nio.ByteBuffer; -import java.time.Instant; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -47,14 +48,16 @@ public class CassandraFactTable implements RemoteKVTable { - private static final Logger LOG = LoggerFactory.getLogger( - 
CassandraFactTable.class); + private static final Logger LOG = LoggerFactory.getLogger(CassandraFactTable.class); private final String name; private final CassandraClient client; + private final Optional> ttlResolver; private final PreparedStatement get; + private final PreparedStatement getWithTimestamp; private final PreparedStatement insert; + private final PreparedStatement insertWithTtl; private final PreparedStatement delete; private final PreparedStatement fetchOffset; private final PreparedStatement setOffset; @@ -62,16 +65,22 @@ public class CassandraFactTable implements RemoteKVTable { public CassandraFactTable( final String name, final CassandraClient client, + final Optional> ttlResolver, final PreparedStatement get, + final PreparedStatement getWithTimestamp, final PreparedStatement insert, + final PreparedStatement insertWithTtl, final PreparedStatement delete, final PreparedStatement fetchOffset, final PreparedStatement setOffset ) { this.name = name; this.client = client; + this.ttlResolver = ttlResolver; this.get = get; + this.getWithTimestamp = getWithTimestamp; this.insert = insert; + this.insertWithTtl = insertWithTtl; this.delete = delete; this.fetchOffset = fetchOffset; this.setOffset = setOffset; @@ -84,6 +93,7 @@ public static CassandraFactTable create( final String name = spec.tableName(); LOG.info("Creating fact data table {} in remote store.", name); + final Optional> ttlResolver = spec.ttlResolver(); final CreateTableWithOptions createTable = spec.applyDefaultOptions( createTable(name, spec.ttlResolver()) ); @@ -115,6 +125,18 @@ public static CassandraFactTable create( QueryOp.WRITE ); + final var insertWithTtl = client.prepare( + QueryBuilder + .insertInto(name) + .value(ROW_TYPE.column(), RowType.DATA_ROW.literal()) + .value(DATA_KEY.column(), bindMarker(DATA_KEY.bind())) + .value(TIMESTAMP.column(), bindMarker(TIMESTAMP.bind())) + .value(DATA_VALUE.column(), bindMarker(DATA_VALUE.bind())) + .usingTtl(bindMarker(TTL_SECONDS.bind())) + .build(), + QueryOp.WRITE + ); + final var get = client.prepare( QueryBuilder .selectFrom(name) @@ -129,6 +151,20 @@ public static CassandraFactTable create( QueryOp.READ ); + final var getWithTimestamp = client.prepare( + QueryBuilder + .selectFrom(name) + .columns(DATA_VALUE.column(), TIMESTAMP.column()) + .where(ROW_TYPE.relation().isEqualTo(RowType.DATA_ROW.literal())) + .where(DATA_KEY.relation().isEqualTo(bindMarker(DATA_KEY.bind()))) + .where(TIMESTAMP.relation().isGreaterThanOrEqualTo(bindMarker(TIMESTAMP.bind()))) + // ALLOW FILTERING is OK b/c the query only scans one partition (it actually only + // returns a single value) + .allowFiltering() + .build(), + QueryOp.READ + ); + final var delete = client.prepare( QueryBuilder .deleteFrom(name) @@ -161,8 +197,11 @@ public static CassandraFactTable create( return new CassandraFactTable( name, client, + ttlResolver, get, + getWithTimestamp, insert, + insertWithTtl, delete, fetchOffset, setOffset @@ -178,7 +217,7 @@ private static CreateTableWithOptions createTable( .ifNotExists() .withPartitionKey(ROW_TYPE.column(), DataTypes.TINYINT) .withPartitionKey(DATA_KEY.column(), DataTypes.BLOB) - .withColumn(TIMESTAMP.column(), DataTypes.TIMESTAMP) + .withColumn(TIMESTAMP.column(), DataTypes.BIGINT) .withColumn(DATA_VALUE.column(), DataTypes.BLOB); if (ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite()) { @@ -267,44 +306,98 @@ public BoundStatement insert( final byte[] value, final long epochMillis ) { + if (ttlResolver.isPresent()) { + final Optional rowTtl = 
ttlResolver.get().computeTtl(key, value); + + if (rowTtl.isPresent()) { + return insertWithTtl + .bind() + .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get())) + .setByteBuffer(DATA_VALUE.bind(), ByteBuffer.wrap(value)) + .setLong(TIMESTAMP.bind(), epochMillis) + .setInt(TTL_SECONDS.bind(), (int) rowTtl.get().toSeconds()); + } + } + return insert .bind() .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get())) .setByteBuffer(DATA_VALUE.bind(), ByteBuffer.wrap(value)) - .setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(epochMillis)); + .setLong(TIMESTAMP.bind(), epochMillis); } @Override - public byte[] get(final int kafkaPartition, final Bytes key, long minValidTs) { - final BoundStatement get = this.get - .bind() - .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get())) - .setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(minValidTs)); + public byte[] get(final int kafkaPartition, final Bytes key, long streamTimeMs) { + long minValidTs = 0L; + if (ttlResolver.isPresent() && !ttlResolver.get().needsValueToComputeTtl()) { + final TtlDuration ttl = ttlResolver.get().resolveTtl(key, null); + if (ttl.isFinite()) { + minValidTs = streamTimeMs - ttl.toMillis(); + } + } - final List result = client.execute(get).all(); - if (result.size() > 1) { - throw new IllegalArgumentException(); - } else if (result.isEmpty()) { - return null; + if (ttlResolver.isEmpty() || !ttlResolver.get().needsValueToComputeTtl()) { + final BoundStatement getQuery = get + .bind() + .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get())) + .setLong(TIMESTAMP.bind(), minValidTs); + final List result = client.execute(getQuery).all(); + + if (result.size() > 1) { + throw new IllegalStateException("Received multiple results for the same key"); + } else if (result.isEmpty()) { + return null; + } else { + return getValueFromRow(result.get(0)); + } } else { - final ByteBuffer value = result.get(0).getByteBuffer(DATA_VALUE.column()); - return Objects.requireNonNull(value).array(); + final BoundStatement getQuery = getWithTimestamp + .bind() + .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get())) + .setLong(TIMESTAMP.bind(), minValidTs); + final List result = client.execute(getQuery).all(); + + if (result.size() > 1) { + throw new IllegalStateException("Received multiple results for the same key"); + } else if (result.isEmpty()) { + return null; + } + + final Row rowResult = result.get(0); + final byte[] value = getValueFromRow(rowResult); + final TtlDuration ttl = ttlResolver.get().resolveTtl(key, value); + + if (ttl.isFinite()) { + final long minValidTsFromValue = streamTimeMs - ttl.toMillis(); + final long recordTs = rowResult.getLong(TIMESTAMP.column()); + if (recordTs < minValidTsFromValue) { + return null; + } + } + + return value; } } + private byte[] getValueFromRow(final Row row) { + return Objects.requireNonNull(row.getByteBuffer(DATA_VALUE.column())).array(); + } + @Override public KeyValueIterator range( final int kafkaPartition, final Bytes from, final Bytes to, - long minValidTs) { + long streamTimeMs + ) { throw new UnsupportedOperationException("range scans are not supported on fact tables."); } @Override public KeyValueIterator all( final int kafkaPartition, - long minValidTs) { + long streamTimeMs + ) { throw new UnsupportedOperationException("all is not supported on fact tables"); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java index 
d3b303730..9ddfab268 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java @@ -26,6 +26,7 @@ import static dev.responsive.kafka.internal.db.ColumnName.PARTITION_KEY; import static dev.responsive.kafka.internal.db.ColumnName.ROW_TYPE; import static dev.responsive.kafka.internal.db.ColumnName.TIMESTAMP; +import static dev.responsive.kafka.internal.db.ColumnName.TTL_SECONDS; import static dev.responsive.kafka.internal.db.RowType.DATA_ROW; import static dev.responsive.kafka.internal.db.RowType.METADATA_ROW; import static dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration.NO_COMMITTED_OFFSET; @@ -38,15 +39,17 @@ import com.datastax.oss.driver.api.querybuilder.QueryBuilder; import com.datastax.oss.driver.api.querybuilder.SchemaBuilder; import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions; +import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration; import dev.responsive.kafka.internal.db.partitioning.SubPartitioner; import dev.responsive.kafka.internal.db.spec.RemoteTableSpec; +import dev.responsive.kafka.internal.stores.TtlResolver; import dev.responsive.kafka.internal.utils.Iterators; import java.nio.ByteBuffer; import java.time.Duration; -import java.time.Instant; import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeoutException; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -64,11 +67,14 @@ public class CassandraKeyValueTable implements RemoteKVTable { private final String name; private final CassandraClient client; private final SubPartitioner partitioner; + private final Optional> ttlResolver; private final PreparedStatement get; + private final PreparedStatement getWithTimestamp; private final PreparedStatement range; private final PreparedStatement all; private final PreparedStatement insert; + private final PreparedStatement insertWithTtl; private final PreparedStatement delete; private final PreparedStatement fetchOffset; private final PreparedStatement setOffset; @@ -81,8 +87,9 @@ public static CassandraKeyValueTable create( final CassandraClient client ) throws InterruptedException, TimeoutException { final String name = spec.tableName(); + final var ttlResolver = spec.ttlResolver(); LOG.info("Creating data table {} in remote store.", name); - client.execute(spec.applyDefaultOptions(createTable(name)).build()); + client.execute(spec.applyDefaultOptions(createTable(name, ttlResolver)).build()); client.awaitTable(name).await(Duration.ofSeconds(60)); @@ -98,6 +105,19 @@ public static CassandraKeyValueTable create( QueryOp.WRITE ); + final var insertWithTtl = client.prepare( + QueryBuilder + .insertInto(name) + .value(PARTITION_KEY.column(), bindMarker(PARTITION_KEY.bind())) + .value(ROW_TYPE.column(), RowType.DATA_ROW.literal()) + .value(DATA_KEY.column(), bindMarker(DATA_KEY.bind())) + .value(TIMESTAMP.column(), bindMarker(TIMESTAMP.bind())) + .value(DATA_VALUE.column(), bindMarker(DATA_VALUE.bind())) + .usingTtl(bindMarker(TTL_SECONDS.bind())) + .build(), + QueryOp.WRITE + ); + final var get = client.prepare( QueryBuilder .selectFrom(name) @@ -112,6 +132,20 @@ public static CassandraKeyValueTable create( QueryOp.READ ); + final var getWithTimestamp = client.prepare( + QueryBuilder + .selectFrom(name) + .columns(DATA_VALUE.column(), TIMESTAMP.column()) + 
.where(PARTITION_KEY.relation().isEqualTo(bindMarker(PARTITION_KEY.bind()))) + .where(ROW_TYPE.relation().isEqualTo(DATA_ROW.literal())) + .where(DATA_KEY.relation().isEqualTo(bindMarker(DATA_KEY.bind()))) + .where(TIMESTAMP.relation().isGreaterThanOrEqualTo(bindMarker(TIMESTAMP.bind()))) + // ALLOW FILTERING is OK b/c the query only scans one partition + .allowFiltering() + .build(), + QueryOp.READ + ); + final var range = client.prepare( QueryBuilder .selectFrom(name) @@ -211,10 +245,13 @@ public static CassandraKeyValueTable create( name, client, (SubPartitioner) spec.partitioner(), + ttlResolver, get, + getWithTimestamp, range, all, insert, + insertWithTtl, delete, fetchOffset, setOffset, @@ -224,8 +261,11 @@ public static CassandraKeyValueTable create( ); } - private static CreateTableWithOptions createTable(final String tableName) { - return SchemaBuilder + private static CreateTableWithOptions createTable( + final String tableName, + final Optional> ttlResolver + ) { + final var baseOptions = SchemaBuilder .createTable(tableName) .ifNotExists() .withPartitionKey(PARTITION_KEY.column(), DataTypes.INT) @@ -234,7 +274,14 @@ private static CreateTableWithOptions createTable(final String tableName) { .withColumn(DATA_VALUE.column(), DataTypes.BLOB) .withColumn(OFFSET.column(), DataTypes.BIGINT) .withColumn(EPOCH.column(), DataTypes.BIGINT) - .withColumn(TIMESTAMP.column(), DataTypes.TIMESTAMP); + .withColumn(TIMESTAMP.column(), DataTypes.BIGINT); + + if (ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite()) { + final int defaultTtlSeconds = (int) ttlResolver.get().defaultTtl().toSeconds(); + return baseOptions.withDefaultTimeToLiveSeconds(defaultTtlSeconds); + } else { + return baseOptions; + } } // Visible for Testing @@ -242,10 +289,13 @@ public CassandraKeyValueTable( final String name, final CassandraClient client, final SubPartitioner partitioner, + final Optional> ttlResolver, final PreparedStatement get, + final PreparedStatement getWithTimestamp, final PreparedStatement range, final PreparedStatement all, final PreparedStatement insert, + final PreparedStatement insertWithTtl, final PreparedStatement delete, final PreparedStatement fetchOffset, final PreparedStatement setOffset, @@ -256,10 +306,13 @@ public CassandraKeyValueTable( this.name = name; this.client = client; this.partitioner = partitioner; + this.ttlResolver = ttlResolver; this.get = get; + this.getWithTimestamp = getWithTimestamp; this.range = range; this.all = all; this.insert = insert; + this.insertWithTtl = insertWithTtl; this.delete = delete; this.fetchOffset = fetchOffset; this.setOffset = setOffset; @@ -338,39 +391,82 @@ public CassandraKVFlushManager init( public byte[] get( final int kafkaPartition, final Bytes key, - final long minValidTs + final long streamTimeMs ) { final int tablePartition = partitioner.tablePartition(kafkaPartition, key); - final BoundStatement get = this.get - .bind() - .setInt(PARTITION_KEY.bind(), tablePartition) - .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get())) - .setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(minValidTs)); + long minValidTs = 0L; + if (ttlResolver.isPresent() && !ttlResolver.get().needsValueToComputeTtl()) { + final TtlDuration ttl = ttlResolver.get().resolveTtl(key, null); + if (ttl.isFinite()) { + minValidTs = streamTimeMs - ttl.toMillis(); + } + } - final List result = client.execute(get).all(); - if (result.size() > 1) { - throw new IllegalStateException("Unexpected multiple results for point lookup"); - } else if 
(result.isEmpty()) { - return null; + if (ttlResolver.isEmpty() || !ttlResolver.get().needsValueToComputeTtl()) { + final BoundStatement getQuery = get + .bind() + .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get())) + .setInt(PARTITION_KEY.bind(), tablePartition) + .setLong(TIMESTAMP.bind(), minValidTs); + final List result = client.execute(getQuery).all(); + + if (result.size() > 1) { + throw new IllegalStateException("Received multiple results for the same key"); + } else if (result.isEmpty()) { + return null; + } else { + return getValueFromRow(result.get(0)); + } } else { - final ByteBuffer value = result.get(0).getByteBuffer(DATA_VALUE.column()); - return Objects.requireNonNull(value).array(); + final BoundStatement getQuery = getWithTimestamp + .bind() + .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get())) + .setInt(PARTITION_KEY.bind(), tablePartition) + .setLong(TIMESTAMP.bind(), minValidTs); + final List result = client.execute(getQuery).all(); + + if (result.size() > 1) { + throw new IllegalStateException("Received multiple results for the same key"); + } else if (result.isEmpty()) { + return null; + } + + final Row rowResult = result.get(0); + final byte[] value = getValueFromRow(rowResult); + final TtlDuration ttl = ttlResolver.get().resolveTtl(key, value); + + if (ttl.isFinite()) { + final long minValidTsFromValue = streamTimeMs - ttl.toMillis(); + final long recordTs = rowResult.getLong(TIMESTAMP.column()); + if (recordTs < minValidTsFromValue) { + return null; + } + } + + return value; } } + private byte[] getValueFromRow(final Row row) { + return Objects.requireNonNull(row.getByteBuffer(DATA_VALUE.column())).array(); + } + @Override public KeyValueIterator range( final int kafkaPartition, final Bytes from, final Bytes to, - final long minValidTs + final long streamTimeMs ) { // TODO: explore more efficient ways to serve bounded range queries, for now we have to // iterate over all subpartitions and merge the results since we don't know which subpartitions // hold keys within the given range // One option would be to configure the partitioner with an alternative hasher that's optimized // for range queries with a Comparator-aware key-->subpartition mapping strategy. 
+ + // TODO(sophie): filter by minValidTs if based on key or default only + final long minValidTs = 0L; final List> resultsPerPartition = new LinkedList<>(); for (final int partition : partitioner.allTablePartitions(kafkaPartition)) { final BoundStatement range = this.range @@ -378,29 +474,33 @@ public KeyValueIterator range( .setInt(PARTITION_KEY.bind(), partition) .setByteBuffer(FROM_BIND, ByteBuffer.wrap(from.get())) .setByteBuffer(TO_BIND, ByteBuffer.wrap(to.get())) - .setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(minValidTs)); + .setLong(TIMESTAMP.bind(), minValidTs); final ResultSet result = client.execute(range); resultsPerPartition.add(Iterators.kv(result.iterator(), CassandraKeyValueTable::rows)); } + // TODO(sophie): filter by minValidTs if based on value return Iterators.wrapped(resultsPerPartition); } @Override public KeyValueIterator all( final int kafkaPartition, - final long minValidTs + final long streamTimeMs ) { + // TODO(sophie): filter by minValidTs if based on key or default only + final long minValidTs = 0L; final List> resultsPerPartition = new LinkedList<>(); for (final int partition : partitioner.allTablePartitions(kafkaPartition)) { final BoundStatement range = this.all .bind() .setInt(PARTITION_KEY.bind(), partition) - .setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(minValidTs)); + .setLong(TIMESTAMP.bind(), minValidTs); final ResultSet result = client.execute(range); resultsPerPartition.add(Iterators.kv(result.iterator(), CassandraKeyValueTable::rows)); } + // TODO(sophie): filter by minValidTs if based on value return Iterators.wrapped(resultsPerPartition); } @@ -412,11 +512,26 @@ public BoundStatement insert( final long epochMillis ) { final int tablePartition = partitioner.tablePartition(kafkaPartition, key); + + if (ttlResolver.isPresent()) { + final Optional rowTtl = ttlResolver.get().computeTtl(key, value); + + if (rowTtl.isPresent()) { + return insertWithTtl + .bind() + .setInt(PARTITION_KEY.bind(), tablePartition) + .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get())) + .setLong(TIMESTAMP.bind(), epochMillis) + .setByteBuffer(DATA_VALUE.bind(), ByteBuffer.wrap(value)) + .setInt(TTL_SECONDS.bind(), (int) rowTtl.get().toSeconds()); + } + } + return insert .bind() .setInt(PARTITION_KEY.bind(), tablePartition) .setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.get())) - .setInstant(TIMESTAMP.bind(), Instant.ofEpochMilli(epochMillis)) + .setLong(TIMESTAMP.bind(), epochMillis) .setByteBuffer(DATA_VALUE.bind(), ByteBuffer.wrap(value)); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java index e39bea605..b68fa50ff 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java @@ -42,7 +42,6 @@ import com.datastax.oss.driver.api.querybuilder.QueryBuilder; import com.datastax.oss.driver.api.querybuilder.SchemaBuilder; import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions; -import com.datastax.oss.driver.internal.querybuilder.schema.compaction.DefaultLeveledCompactionStrategy; import dev.responsive.kafka.internal.db.partitioning.Segmenter; import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner; import dev.responsive.kafka.internal.db.spec.RemoteTableSpec; @@ -425,8 +424,7 @@ private static CreateTableWithOptions createTable(final String 
tableName) { .withColumn(DATA_VALUE.column(), DataTypes.BLOB) .withColumn(OFFSET.column(), DataTypes.BIGINT) .withColumn(EPOCH.column(), DataTypes.BIGINT) - .withColumn(STREAM_TIME.column(), DataTypes.BIGINT) - .withCompaction(new DefaultLeveledCompactionStrategy()); // TODO: create a LCSTableSpec? + .withColumn(STREAM_TIME.column(), DataTypes.BIGINT); } public CassandraWindowedTable( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/ColumnName.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/ColumnName.java index 579ebca76..7d43c17c0 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/ColumnName.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/ColumnName.java @@ -22,7 +22,6 @@ import com.datastax.oss.driver.api.querybuilder.relation.Relation; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.time.Instant; import java.util.function.Function; import org.apache.kafka.common.utils.Bytes; @@ -40,7 +39,8 @@ public enum ColumnName { EPOCH("epoch", "epoch"), STREAM_TIME("streamTime", "streamtime"), WINDOW_START("windowStart", "windowstart", ts -> timestamp((long) ts)), - TIMESTAMP("ts", "ts", ts -> timestamp((long) ts)); + TIMESTAMP("ts", "ts", ts -> timestamp((long) ts)), + TTL_SECONDS("ttl", "ttl", ttl -> ttlSeconds((int) ttl)); static final Bytes METADATA_KEY = Bytes.wrap("_metadata".getBytes(StandardCharsets.UTF_8)); @@ -58,7 +58,11 @@ private static Literal bytes(final Bytes b) { } private static Literal timestamp(final long ts) { - return QueryBuilder.literal(Instant.ofEpochMilli(ts)); + return QueryBuilder.literal(ts); + } + + private static Literal ttlSeconds(final int ttl) { + return QueryBuilder.literal(ttl); } ColumnName(final String column, final String bind) { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteKVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteKVTable.java index a352540ce..8e6fdc2fa 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteKVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteKVTable.java @@ -38,12 +38,11 @@ KVFlushManager init( * * @param kafkaPartition the kafka partition * @param key the data key - * @param minValidTs the minimum valid timestamp to apply semantic TTL, - * in epochMillis + * @param streamTimeMs the current stream-time, in epoch milliseconds * * @return the value previously set */ - byte[] get(int kafkaPartition, Bytes key, long minValidTs); + byte[] get(int kafkaPartition, Bytes key, long streamTimeMs); /** * Retrieves a range of key value pairs from the given {@code partitionKey} and @@ -57,7 +56,7 @@ KVFlushManager init( * @param kafkaPartition the kafka partition * @param from the starting key (inclusive) * @param to the ending key (inclusive) - * @param minValidTs the minimum timestamp, in epochMillis, to consider valid + * @param streamTimeMs the current stream-time, in epoch milliseconds * * @return an iterator of all key-value pairs in the range */ @@ -65,7 +64,7 @@ KeyValueIterator range( int kafkaPartition, Bytes from, Bytes to, - long minValidTs + long streamTimeMs ); /** * @@ -77,11 +76,11 @@ KeyValueIterator range( * session). 
* * @param kafkaPartition the kafka partition - * @param minValidTs the minimum valid timestamp, in epochMilliis, to return + * @param streamTimeMs the current stream-time, in epoch milliseconds * * @return an iterator of all key-value pairs */ - KeyValueIterator all(int kafkaPartition, long minValidTs); + KeyValueIterator all(int kafkaPartition, long streamTimeMs); /** * An approximate count of the total number of entries across all sub-partitions diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java index 7c3371fc4..038f17e1d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/inmemory/InMemoryKVTable.java @@ -39,13 +39,13 @@ public KVFlushManager init(int kafkaPartition) { } @Override - public byte[] get(final int kafkaPartition, final Bytes key, final long minValidTs) { + public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) { checkKafkaPartition(kafkaPartition); final var value = store.get(key); if (value == null) { return null; } - if (value.epochMillis() < minValidTs) { + if (value.epochMillis() < streamTimeMs) { return null; } return value.value(); @@ -56,7 +56,7 @@ public KeyValueIterator range( final int kafkaPartition, final Bytes from, final Bytes to, - final long minValidTs + final long streamTimeMs ) { checkKafkaPartition(kafkaPartition); final var iter = store @@ -64,14 +64,14 @@ public KeyValueIterator range( .headMap(to, true) .entrySet() .iterator(); - return iteratorWithTimeFilter(iter, minValidTs); + return iteratorWithTimeFilter(iter, streamTimeMs); } @Override - public KeyValueIterator all(final int kafkaPartition, final long minValidTs) { + public KeyValueIterator all(final int kafkaPartition, final long streamTimeMs) { checkKafkaPartition(kafkaPartition); final var iter = store.entrySet().iterator(); - return iteratorWithTimeFilter(iter, minValidTs); + return iteratorWithTimeFilter(iter, streamTimeMs); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/KVDoc.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/KVDoc.java index 82a6958f6..05510e261 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/KVDoc.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/KVDoc.java @@ -29,6 +29,7 @@ public class KVDoc { public static final String VALUE = "value"; public static final String EPOCH = "epoch"; public static final String TIMESTAMP = "ts"; + public static final String TTL_TIMESTAMP = "ttlTs"; public static final String KAFKA_PARTITION = "partition"; public static final String TOMBSTONE_TS = "tombstoneTs"; @@ -39,6 +40,7 @@ public class KVDoc { byte[] value; long epoch; long timestamp; + long ttlTimestamp; int kafkaPartition; Date tombstoneTs; @@ -51,12 +53,14 @@ public KVDoc( @BsonProperty(VALUE) byte[] value, @BsonProperty(EPOCH) long epoch, @BsonProperty(TIMESTAMP) long timestamp, + @BsonProperty(TTL_TIMESTAMP) long ttlTimestamp, @BsonProperty(KAFKA_PARTITION) int kafkaPartition ) { this.id = id; this.value = value; this.epoch = epoch; this.timestamp = timestamp; + this.ttlTimestamp = ttlTimestamp; this.kafkaPartition = kafkaPartition; } @@ -80,6 +84,10 @@ public void setTimestamp(final long timestamp) { this.timestamp = timestamp; } + public void setTtlTimestamp(final long ttlTimestamp) { + 
this.ttlTimestamp = ttlTimestamp; + } + public byte[] getValue() { return value; } @@ -92,6 +100,10 @@ public long getTimestamp() { return timestamp; } + public long getTtlTimestamp() { + return ttlTimestamp; + } + public int getKafkaPartition() { return kafkaPartition; } @@ -121,13 +133,14 @@ public boolean equals(final Object o) { && Objects.equals(id, kvDoc.id) && Arrays.equals(value, kvDoc.value) && Objects.equals(timestamp, kvDoc.timestamp) + && Objects.equals(ttlTimestamp, kvDoc.ttlTimestamp) && Objects.equals(kafkaPartition, kvDoc.kafkaPartition) && Objects.equals(tombstoneTs, kvDoc.tombstoneTs); } @Override public int hashCode() { - int result = Objects.hash(id, epoch, tombstoneTs, timestamp, kafkaPartition); + int result = Objects.hash(id, epoch, tombstoneTs, timestamp, ttlTimestamp, kafkaPartition); result = 31 * result + Arrays.hashCode(value); return result; } @@ -140,6 +153,7 @@ public String toString() { + ", epoch=" + epoch + ", tombstoneTs=" + tombstoneTs + ", timestamp=" + timestamp + + ", ttlTimestamp=" + ttlTimestamp + ", kafkaPartition=" + kafkaPartition + '}'; } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java index a0245ae12..2b757e759 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java @@ -35,11 +35,11 @@ import com.mongodb.client.model.Updates; import com.mongodb.client.model.WriteModel; import com.mongodb.client.result.UpdateResult; +import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration; import dev.responsive.kafka.internal.db.MongoKVFlushManager; import dev.responsive.kafka.internal.db.RemoteKVTable; import dev.responsive.kafka.internal.stores.TtlResolver; import dev.responsive.kafka.internal.utils.Iterators; -import java.time.Duration; import java.time.Instant; import java.util.Date; import java.util.Optional; @@ -52,6 +52,7 @@ import org.bson.codecs.configuration.CodecProvider; import org.bson.codecs.configuration.CodecRegistry; import org.bson.codecs.pojo.PojoCodecProvider; +import org.bson.conversions.Bson; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +64,8 @@ public class MongoKVTable implements RemoteKVTable> { private final String name; private final KeyCodec keyCodec; + private final Optional> ttlResolver; + private final long defaultTtlSeconds; private final MongoCollection docs; private final MongoCollection metadata; @@ -76,6 +79,7 @@ public MongoKVTable( ) { this.name = name; this.keyCodec = new StringKeyCodec(); + this.ttlResolver = ttlResolver; final CodecProvider pojoCodecProvider = PojoCodecProvider.builder().automatic(true).build(); final CodecRegistry pojoCodecRegistry = fromRegistries( getDefaultCodecRegistry(), @@ -105,14 +109,19 @@ public MongoKVTable( ); if (ttlResolver.isPresent()) { - // TODO(sophie): account for infinite default ttl - final Duration expireAfter = - ttlResolver.get().defaultTtl().duration().plus(Duration.ofHours(12)); - final long expireAfterSeconds = expireAfter.getSeconds(); + // if the default ttl is infinite we still have to define the ttl index for + // the table since the ttlProvider may apply row-level overrides. To approximate + // an "infinite" default retention, we just set the default ttl to the maximum value + defaultTtlSeconds = ttlResolver.get().defaultTtl().isFinite() + ? 
ttlResolver.get().defaultTtl().toSeconds() + : Long.MAX_VALUE; + docs.createIndex( - Indexes.descending(KVDoc.TIMESTAMP), - new IndexOptions().expireAfter(expireAfterSeconds, TimeUnit.SECONDS) + Indexes.descending(KVDoc.TTL_TIMESTAMP), + new IndexOptions().expireAfter(defaultTtlSeconds, TimeUnit.SECONDS) ); + } else { + defaultTtlSeconds = 0L; } } @@ -147,12 +156,52 @@ public MongoKVFlushManager init(final int kafkaPartition) { } @Override - public byte[] get(final int kafkaPartition, final Bytes key, final long minValidTs) { - final KVDoc v = docs.find(Filters.and( - Filters.eq(KVDoc.ID, keyCodec.encode(key)), - Filters.gte(KVDoc.TIMESTAMP, minValidTs) - )).first(); - return v == null ? null : v.getValue(); + public byte[] get(final int kafkaPartition, final Bytes key, final long streamTimeMs) { + + // Need to post-filter if value is needed to compute ttl + if (ttlResolver.isPresent() && ttlResolver.get().needsValueToComputeTtl()) { + final KVDoc v = docs.find(Filters.and( + Filters.eq(KVDoc.ID, keyCodec.encode(key)) + )).first(); + + if (v == null) { + return null; + } + + final byte[] value = v.getValue(); + final TtlDuration ttl = ttlResolver.get().resolveTtl(key, value); + + if (ttl.isFinite()) { + final long minValidTsFromValue = streamTimeMs - ttl.toMillis(); + final long recordTs = v.getTimestamp(); + if (recordTs < minValidTsFromValue) { + return null; + } + } + + return value; + + } else { + // If ttl is default-only or key-based and computed ttl is finite, we can pre-filter + if (ttlResolver.isPresent()) { + + final TtlDuration ttl = ttlResolver.get().resolveTtl(key, null); + if (ttl.isFinite()) { + final long minValidTs = streamTimeMs - ttl.toMillis(); + final KVDoc v = docs.find(Filters.and( + Filters.eq(KVDoc.ID, keyCodec.encode(key)), + Filters.gte(KVDoc.TIMESTAMP, minValidTs) + )).first(); + return v == null ? null : v.getValue(); + } + } + + // If ttl is not used or infinite for this row, no filter is needed + final KVDoc v = docs.find(Filters.and( + Filters.eq(KVDoc.ID, keyCodec.encode(key)) + )).first(); + return v == null ? 
null : v.getValue(); + } } @Override @@ -160,8 +209,10 @@ public KeyValueIterator range( final int kafkaPartition, final Bytes from, final Bytes to, - final long minValidTs + final long streamTimeMs ) { + // TODO(sophie): filter by minValidTs if based on key or default only + final long minValidTs = 0L; final FindIterable result = docs.find( Filters.and( Filters.gte(KVDoc.ID, keyCodec.encode(from)), @@ -171,6 +222,8 @@ public KeyValueIterator range( Filters.eq(KVDoc.KAFKA_PARTITION, kafkaPartition) ) ); + // TODO(sophie): filter by minValidTs if based on value + return Iterators.kv( result.iterator(), doc -> new KeyValue<>( @@ -180,10 +233,10 @@ public KeyValueIterator range( } @Override - public KeyValueIterator all(final int kafkaPartition, final long minValidTs) { + public KeyValueIterator all(final int kafkaPartition, final long streamTimeMs) { final FindIterable result = docs.find(Filters.and( Filters.not(Filters.exists(KVDoc.TOMBSTONE_TS)), - Filters.gte(KVDoc.TIMESTAMP, minValidTs), + Filters.gte(KVDoc.TIMESTAMP, streamTimeMs), Filters.eq(KVDoc.KAFKA_PARTITION, kafkaPartition) )); return Iterators.kv( @@ -203,18 +256,47 @@ public WriteModel insert( final long epochMillis ) { final long epoch = kafkaPartitionToEpoch.get(kafkaPartition); + + Bson update = Updates.combine( + Updates.set(KVDoc.VALUE, value), + Updates.set(KVDoc.EPOCH, epoch), + Updates.set(KVDoc.TIMESTAMP, epochMillis), + Updates.set(KVDoc.KAFKA_PARTITION, kafkaPartition), + Updates.unset(KVDoc.TOMBSTONE_TS)); + + if (ttlResolver.isPresent()) { + final Optional rowTtl = ttlResolver.get().computeTtl(key, value); + + final long ttlTimestamp; + if (rowTtl.isPresent()) { + final var rowTtlDuration = rowTtl.get(); + if (rowTtlDuration.isFinite()) { + // Mongo does not actually support row-level ttl so we have to "trick" it by building + // the ttl index from a special "ttlTimestamp" field rather than the true record + // timestamp, then adjusting the timestamp by the difference between row and default ttl. 
+ // This effectively makes these records appear older or younger than they really are + // so that they're retained according to the row-level ttl override + final long ttlTsAdjustment = rowTtlDuration.toSeconds() - defaultTtlSeconds; + ttlTimestamp = TimeUnit.MILLISECONDS.toSeconds(epochMillis) + ttlTsAdjustment; + } else { + // approximate row-level "infinite" ttl by setting ttlTimestamp to largest possible value + ttlTimestamp = Long.MAX_VALUE; + } + } else { + // to apply the default ttl we just use the current time for ttlTimestamp + ttlTimestamp = TimeUnit.MILLISECONDS.toSeconds(epochMillis); + } + update = Updates.combine( + update, + Updates.set(KVDoc.TTL_TIMESTAMP, ttlTimestamp)); + } + return new UpdateOneModel<>( Filters.and( Filters.eq(KVDoc.ID, keyCodec.encode(key)), Filters.lte(KVDoc.EPOCH, epoch) ), - Updates.combine( - Updates.set(KVDoc.VALUE, value), - Updates.set(KVDoc.EPOCH, epoch), - Updates.set(KVDoc.TIMESTAMP, epochMillis), - Updates.set(KVDoc.KAFKA_PARTITION, kafkaPartition), - Updates.unset(KVDoc.TOMBSTONE_TS) - ), + update, new UpdateOptions().upsert(true) ); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java index 4236f4986..1a8a85482 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java @@ -18,6 +18,7 @@ import com.mongodb.client.MongoClient; import com.mongodb.client.model.WriteModel; +import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; import dev.responsive.kafka.internal.db.RemoteKVTable; import dev.responsive.kafka.internal.db.RemoteSessionTable; import dev.responsive.kafka.internal.db.RemoteWindowedTable; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java index fa7f5c754..88954b4ce 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java @@ -316,7 +316,7 @@ public byte[] get(final Bytes key) { return table.get( changelog.partition(), key, - minValidTimestamp() + currentRecordTimestamp() ); } @@ -336,7 +336,7 @@ public KeyValueIterator range(final Bytes from, final Bytes to) { return new LocalRemoteKvIterator<>( buffer.range(from, to), - table.range(changelog.partition(), from, to, minValidTimestamp()) + table.range(changelog.partition(), from, to, currentRecordTimestamp()) ); } @@ -350,7 +350,7 @@ public KeyValueIterator reverseRange(final Bytes from, final Byte public KeyValueIterator all() { return new LocalRemoteKvIterator<>( buffer.all(), - table.all(changelog.partition(), minValidTimestamp()) + table.all(changelog.partition(), currentRecordTimestamp()) ); } @@ -387,15 +387,6 @@ private long currentRecordTimestamp() { return context.timestamp(); } - private long minValidTimestamp() { - if (params.ttlProvider().isEmpty()) { - return -1L; - } - - final long ttlMs = params.defaultTimeToLive().get().toMillis(); - return currentRecordTimestamp() - ttlMs; - } - private boolean migratingAndTimestampTooEarly() { if (!migrationMode) { return false; diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/RowLevelTtlIntegrationTest.java 
b/kafka-client/src/test/java/dev/responsive/kafka/integration/RowLevelTtlIntegrationTest.java new file mode 100644 index 000000000..d6974c1af --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/RowLevelTtlIntegrationTest.java @@ -0,0 +1,220 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.integration; + +import static dev.responsive.kafka.api.config.ResponsiveConfig.STORE_FLUSH_RECORDS_TRIGGER_CONFIG; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.createTopicsAndWait; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeInput; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.readOutput; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItems; + +import dev.responsive.kafka.api.ResponsiveKafkaStreams; +import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.api.config.StorageBackend; +import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; +import dev.responsive.kafka.api.stores.ResponsiveStores; +import dev.responsive.kafka.api.stores.TtlProvider; +import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration; +import dev.responsive.kafka.testutils.ResponsiveConfigParam; +import dev.responsive.kafka.testutils.ResponsiveExtension; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import 
org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class RowLevelTtlIntegrationTest { + + @RegisterExtension + static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.CASSANDRA); + + private static final String INPUT_TOPIC = "input"; + private static final String OUTPUT_TOPIC = "output"; + private static final String STORE_NAME = "store"; + + private static final Duration DEFAULT_TTL = Duration.ofSeconds(5); + + private final Map responsiveProps = new HashMap<>(); + + private String name; + private Admin admin; + + @BeforeEach + public void before( + final TestInfo info, + final Admin admin, + @ResponsiveConfigParam final Map responsiveProps + ) { + // add displayName to name to account for parameterized tests + name = info.getDisplayName().replace("()", ""); + + this.responsiveProps.putAll(responsiveProps); + + this.admin = admin; + createTopicsAndWait(admin, Map.of(inputTopic(), 2, outputTopic(), 1)); + } + + @AfterEach + public void after() { + admin.deleteTopics(List.of(inputTopic(), outputTopic())); + } + + private String inputTopic() { + return name + "." + INPUT_TOPIC; + } + + private String outputTopic() { + return name + "." 
+ OUTPUT_TOPIC; + } + + @Test + public void test() throws Exception { + // Given: + final Map properties = getMutableProperties(); + final KafkaProducer producer = new KafkaProducer<>(properties); + try (final ResponsiveKafkaStreams streams = buildStreams(properties)) { + startAppAndAwaitRunning(Duration.ofSeconds(10), streams); + for (int i = 0; i < 10000; i++) { + pipeInput(inputTopic(), 2, producer, System::currentTimeMillis, 0, 10, 0, 1); + Thread.sleep(100); + } + + final var kvs = readOutput(outputTopic(), 0, 20, true, properties); + assertThat( + kvs, + hasItems(new KeyValue<>(0L, 10L), new KeyValue<>(1L, 10L))); + + } + } + + private ResponsiveKafkaStreams buildStreams(final Map properties) { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream input = builder.stream(inputTopic()); + input.process(new TtlProcessorSupplier(), STORE_NAME) + .to(outputTopic()); + + return new ResponsiveKafkaStreams(builder.build(), properties); + } + + @SuppressWarnings("checkstyle:linelength") + private static class TtlProcessorSupplier implements ProcessorSupplier { + + @Override + public Processor get() { + return new TtlProcessor(); + } + + @Override + public Set> stores() { + return Collections.singleton(ResponsiveStores.keyValueStoreBuilder( + ResponsiveStores.keyValueStore( + ResponsiveKeyValueParams.fact(STORE_NAME).withTtlProvider( + TtlProvider.withDefault(DEFAULT_TTL) + .fromKey(RowLevelTtlIntegrationTest::ttlForKey, Serdes.String()) + )), + Serdes.String(), + Serdes.String() + )); + } + } + + private static Optional ttlForKey(final String key) { + return Optional.empty(); //TODO(sophie): fill me out + } + + private static class TtlProcessor implements Processor { + + private ProcessorContext context; + private KeyValueStore ttlStore; + + @Override + public void init(final ProcessorContext context) { + this.context = context; + this.ttlStore = context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Record record) { + // TODO(sophie): implement processor logic that's based on ttl + } + } + + private Map getMutableProperties() { + final Map properties = new HashMap<>(responsiveProps); + + properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + properties.put(APPLICATION_ID_CONFIG, name); + properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + properties.put(NUM_STREAM_THREADS_CONFIG, 1); + properties.put(STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); + properties.put(STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 1); + properties.put(COMMIT_INTERVAL_MS_CONFIG, 1); + + properties.put( + ResponsiveConfig.MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG, "maxIdleTimeMs=60000" + ); + + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); + properties.put(consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); + properties.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); + + return properties; + } + +} + diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/AbstractCassandraKVTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/AbstractCassandraKVTableIntegrationTest.java new file mode 100644 index 000000000..2d0c02080 --- 
/dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/AbstractCassandraKVTableIntegrationTest.java @@ -0,0 +1,52 @@ +/* + * Copyright 2024 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.internal.db; + +import com.datastax.oss.driver.api.core.CqlSession; +import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; +import dev.responsive.kafka.testutils.ResponsiveConfigParam; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.testcontainers.containers.CassandraContainer; + +public class AbstractCassandraKVTableIntegrationTest { + + private String storeName; // ie the "kafkaName", NOT the "cassandraName" + private ResponsiveKeyValueParams params; + private CassandraClient client; + private CqlSession session; + + @BeforeEach + public void before( + final TestInfo info, + @ResponsiveConfigParam final Map responsiveProps, + final CassandraContainer cassandra + ) { + String name = info.getTestMethod().orElseThrow().getName(); + storeName = name + "--store"; + + session = CqlSession.builder() + .addContactPoint(cassandra.getContactPoint()) + .withLocalDatacenter(cassandra.getLocalDatacenter()) + .withKeyspace("responsive_itests") // NOTE: this keyspace is expected to exist + .build(); + client = new CassandraClient(session, ResponsiveConfig.responsiveConfig(responsiveProps)); + } + +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java index fc21033a6..0f83a3947 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java @@ -18,6 +18,8 @@ import static dev.responsive.kafka.internal.db.partitioning.TablePartitioner.defaultPartitioner; import static dev.responsive.kafka.internal.stores.TtlResolver.NO_TTL; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.serializedKey; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.serializedValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsStringIgnoringCase; @@ -29,12 +31,16 @@ import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; import dev.responsive.kafka.api.stores.TtlProvider; +import dev.responsive.kafka.api.stores.TtlProvider.TtlDuration; import dev.responsive.kafka.internal.stores.TtlResolver; import dev.responsive.kafka.testutils.ResponsiveConfigParam; import dev.responsive.kafka.testutils.ResponsiveExtension; import java.time.Duration; import java.util.Map; import java.util.Optional; +import 
java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -45,9 +51,6 @@ @ExtendWith(ResponsiveExtension.class) class CassandraFactTableIntegrationTest { - private static final long CURRENT_TIME = 100L; - private static final long MIN_VALID_TS = 0L; - private String storeName; // ie the "kafkaName", NOT the "cassandraName" private ResponsiveKeyValueParams params; private CassandraClient client; @@ -60,7 +63,7 @@ public void before( final CassandraContainer cassandra ) { String name = info.getTestMethod().orElseThrow().getName(); - storeName = name + "store"; + storeName = name + "-store"; session = CqlSession.builder() .addContactPoint(cassandra.getContactPoint()) @@ -80,7 +83,7 @@ public void shouldInitializeWithCorrectMetadata() throws Exception { .create(RemoteTableSpecFactory.fromKVParams(params, defaultPartitioner(), NO_TTL)); // When: - final var token = schema.init(1); + final var ignored = schema.init(1); schema.init(2); client.execute(schema.setOffset(2, 10)); final long offset1 = schema.fetchOffset(1); @@ -111,12 +114,13 @@ public void shouldInsertAndDelete() throws Exception { final Bytes key = Bytes.wrap(new byte[]{0}); final byte[] val = new byte[]{1}; + final long minValidTs = 0L; // When: - client.execute(table.insert(1, key, val, CURRENT_TIME)); - final byte[] valAt0 = table.get(1, key, MIN_VALID_TS); + client.execute(table.insert(1, key, val, 0L)); + final byte[] valAt0 = table.get(1, key, minValidTs); client.execute(table.delete(1, key)); - final byte[] valAt1 = table.get(1, key, MIN_VALID_TS); + final byte[] valAt1 = table.get(1, key, minValidTs); // Then assertThat(valAt0, is(val)); @@ -150,4 +154,221 @@ public void shouldRespectSemanticDefaultOnlyTtl() throws Exception { assertThat(describe, containsString("default_time_to_live = " + (int) defaultTtl.toSeconds())); } + @Test + public void shouldRespectSemanticKeyBasedTtl() throws Exception { + // Given: + final var defaultTtl = Duration.ofMinutes(30); + final Function> ttlForKey = k -> { + if (k.equals("KEEP_FOREVER")) { + return Optional.of(TtlDuration.noTtl()); + } else if (k.endsWith("DEFAULT_TTL")) { + return Optional.empty(); + } else { + final long ttlMinutes = Long.parseLong(k); + return Optional.of(TtlDuration.of(Duration.ofMinutes(ttlMinutes))); + } + }; + final TtlProvider ttlProvider = TtlProvider + .withDefault(defaultTtl) + .fromKey(ttlForKey, Serdes.String()); + params = ResponsiveKeyValueParams.fact(storeName).withTtlProvider(ttlProvider); + + final var table = client.factFactory().create(RemoteTableSpecFactory.fromKVParams( + params, + defaultPartitioner(), + Optional.of(new TtlResolver<>(false, "changelog-ignored", ttlProvider)) + )); + + table.init(1); + + final Bytes noTtlKey = serializedKey("KEEP_FOREVER"); + final Bytes defaultTtlKey = serializedKey("DEFAULT_TTL"); // default is 30min + final Bytes tenMinTtlKey = serializedKey(String.valueOf(10L)); + final Bytes fiftyMinTtlKey = serializedKey(String.valueOf(50L)); + + final byte[] val = new byte[]{1}; + + // When: + final long insertTimeMs = 0L; + client.execute(table.insert(1, noTtlKey, val, insertTimeMs)); + client.execute(table.insert(1, defaultTtlKey, val, insertTimeMs)); + client.execute(table.insert(1, tenMinTtlKey, val, insertTimeMs)); + client.execute(table.insert(1, fiftyMinTtlKey, val, insertTimeMs)); + + // Then + long lookupTime = 
+    assertThat(table.get(1, noTtlKey, lookupTime), is(val));
+    assertThat(table.get(1, defaultTtlKey, lookupTime), is(val));
+    assertThat(table.get(1, tenMinTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, fiftyMinTtlKey, lookupTime), is(val));
+
+    lookupTime = Duration.ofMinutes(31).toMillis();
+    assertThat(table.get(1, noTtlKey, lookupTime), is(val));
+    assertThat(table.get(1, defaultTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, tenMinTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, fiftyMinTtlKey, lookupTime), is(val));
+
+    lookupTime = Duration.ofMinutes(51).toMillis();
+    assertThat(table.get(1, noTtlKey, lookupTime), is(val));
+    assertThat(table.get(1, defaultTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, tenMinTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, fiftyMinTtlKey, lookupTime), nullValue()); // expired
+  }
+
+  @Test
+  public void shouldRespectSemanticKeyValueBasedTtl() throws Exception {
+    // Given:
+    final var defaultTtl = Duration.ofMinutes(30);
+    final BiFunction<String, String, Optional<TtlDuration>> ttlForKeyAndValue = (k, v) -> {
+      if (k.equals("10_MINUTE_RETENTION")) {
+        return Optional.of(TtlDuration.of(Duration.ofMinutes(10)));
+      } else {
+        if (v.equals("DEFAULT")) {
+          return Optional.empty();
+        } else if (v.equals("NO_TTL")) {
+          return Optional.of(TtlDuration.noTtl());
+        } else {
+          return Optional.of(TtlDuration.of(Duration.ofMinutes(Long.parseLong(v))));
+        }
+      }
+    };
+    final TtlProvider ttlProvider = TtlProvider
+        .withDefault(defaultTtl)
+        .fromKeyAndValue(ttlForKeyAndValue, Serdes.String(), Serdes.String());
+    params = ResponsiveKeyValueParams.fact(storeName).withTtlProvider(ttlProvider);
+
+    final var table = client.factFactory().create(RemoteTableSpecFactory.fromKVParams(
+        params,
+        defaultPartitioner(),
+        Optional.of(new TtlResolver<>(false, "changelog-ignored", ttlProvider))
+    ));
+
+    table.init(1);
+
+    final Bytes tenMinTtlKey = serializedKey("10_MINUTE_RETENTION");
+    final Bytes defaultTtlKey = serializedKey("DEFAULT_30_MIN_RETENTION");
+    final Bytes noTtlKey = serializedKey("NO_TTL");
+    final Bytes twoMinTtlKey = serializedKey("2_MINUTE_RETENTION");
+    final Bytes fiftyMinTtlKey = serializedKey("50_MINUTE_RETENTION");
+
+    final byte[] defaultTtlValue = serializedValue("DEFAULT"); // default is 30min
+    final byte[] noTtlValue = serializedValue("NO_TTL");
+    final byte[] twoMinTtlValue = serializedValue(String.valueOf(2));
+    final byte[] fiftyMinTtlValue = serializedValue(String.valueOf(50));
+
+    final byte[] val = new byte[]{1};
+
+    // When
+    long insertTimeMs = 0L;
+    client.execute(table.insert(1, tenMinTtlKey, val, insertTimeMs));
+    client.execute(table.insert(1, defaultTtlKey, defaultTtlValue, insertTimeMs));
+    client.execute(table.insert(1, noTtlKey, noTtlValue, insertTimeMs));
+    client.execute(table.insert(1, twoMinTtlKey, twoMinTtlValue, insertTimeMs));
+    client.execute(table.insert(1, fiftyMinTtlKey, fiftyMinTtlValue, insertTimeMs));
+
+    // Then:
+    long lookupTime = Duration.ofMinutes(3).toMillis();
+    assertThat(table.get(1, tenMinTtlKey, lookupTime), is(val));
+    assertThat(table.get(1, defaultTtlKey, lookupTime), is(defaultTtlValue));
+    assertThat(table.get(1, noTtlKey, lookupTime), is(noTtlValue));
+    assertThat(table.get(1, twoMinTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, fiftyMinTtlKey, lookupTime), is(fiftyMinTtlValue));
+
+    lookupTime = Duration.ofMinutes(11).toMillis();
+    assertThat(table.get(1, tenMinTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, defaultTtlKey, lookupTime), is(defaultTtlValue));
+    assertThat(table.get(1, noTtlKey, lookupTime), is(noTtlValue));
+    assertThat(table.get(1, twoMinTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, fiftyMinTtlKey, lookupTime), is(fiftyMinTtlValue));
+
+    lookupTime = Duration.ofMinutes(31).toMillis();
+    assertThat(table.get(1, tenMinTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, defaultTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, noTtlKey, lookupTime), is(noTtlValue));
+    assertThat(table.get(1, twoMinTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, fiftyMinTtlKey, lookupTime), is(fiftyMinTtlValue));
+
+    lookupTime = Duration.ofMinutes(51).toMillis();
+    assertThat(table.get(1, tenMinTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, defaultTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, noTtlKey, lookupTime), is(noTtlValue));
+    assertThat(table.get(1, twoMinTtlKey, lookupTime), nullValue()); // expired
+    assertThat(table.get(1, fiftyMinTtlKey, lookupTime), nullValue()); // expired
+  }
+
+  @Test
+  public void shouldRespectOverridesWithValueBasedTtl() throws Exception {
+    // Given:
+    final var defaultTtl = Duration.ofMinutes(30);
+    final Function<String, Optional<TtlDuration>> ttlForValue = v -> {
+      if (v.equals("DEFAULT")) {
+        return Optional.empty();
+      } else if (v.equals("NO_TTL")) {
+        return Optional.of(TtlDuration.noTtl());
+      } else {
+        return Optional.of(TtlDuration.of(Duration.ofMinutes(Long.parseLong(v))));
+      }
+    };
+    final TtlProvider ttlProvider = TtlProvider
+        .withDefault(defaultTtl)
+        .fromValue(ttlForValue, Serdes.String());
+    params = ResponsiveKeyValueParams.fact(storeName).withTtlProvider(ttlProvider);
+
+    final var table = client.factFactory().create(RemoteTableSpecFactory.fromKVParams(
+        params,
+        defaultPartitioner(),
+        Optional.of(new TtlResolver<>(false, "changelog-ignored", ttlProvider))
+    ));
+
+    table.init(1);
+
+    final Bytes key = serializedKey("ignored");
+
+    final byte[] defaultTtlValue = serializedValue("DEFAULT"); // default is 30min
+    final byte[] noTtlValue = serializedValue("NO_TTL");
+    final byte[] threeMinTtlValue = serializedValue(String.valueOf(3));
+    final byte[] tenMinTtlValue = serializedValue(String.valueOf(10));
+
+    // When
+    long currentTime = 0L;
+    // first record set to expire at 3min
+    client.execute(table.insert(1, key, threeMinTtlValue, currentTime));
+
+    // Then:
+    currentTime = Duration.ofMinutes(4).toMillis();
+    assertThat(table.get(1, key, currentTime), nullValue()); // expired
+
+    // insert new record with 3min ttl -- now set to expire at 10min
+    currentTime = Duration.ofMinutes(7).toMillis();
+    client.execute(table.insert(1, key, threeMinTtlValue, currentTime));
+
+    // override with 10min ttl -- now set to expire at 18min
+    currentTime = Duration.ofMinutes(8).toMillis();
+    client.execute(table.insert(1, key, tenMinTtlValue, currentTime));
+
+    // record should still exist after 10min
+    currentTime = Duration.ofMinutes(11).toMillis();
+    assertThat(table.get(1, key, currentTime), is(tenMinTtlValue));
+
+    // override with default ttl (30min) -- now set to expire at 45min
+    currentTime = Duration.ofMinutes(15).toMillis();
+    client.execute(table.insert(1, key, defaultTtlValue, currentTime));
+
+    // record should still exist after 18min
+    currentTime = Duration.ofMinutes(20).toMillis();
+    assertThat(table.get(1, key, currentTime), is(defaultTtlValue));
+
+    // override with no ttl -- now set to never expire
+    currentTime = Duration.ofMinutes(30).toMillis();
+    client.execute(table.insert(1, key, noTtlValue, currentTime));
+
+    // record should still exist after 45min
+    currentTime = Duration.ofMinutes(50).toMillis();
+    assertThat(table.get(1, key, currentTime), is(noTtlValue));
+  }
+
+}
\ No newline at end of file
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/LwtWriterTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/LwtWriterTest.java
index 9825454ad..680d7fda8 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/LwtWriterTest.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/LwtWriterTest.java
@@ -32,6 +32,7 @@
 import com.datastax.oss.driver.internal.core.cql.DefaultBatchStatement;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.BeforeEach;
@@ -141,7 +142,8 @@ public void shouldIssueSecondBatchEvenIfFirstWasNotApplied() {
 
   private static class TestRemoteTable extends CassandraKeyValueTable {
     public TestRemoteTable(final String tableName, final CassandraClient client) {
-      super(tableName, client, null, null, null, null, null, null, null, null, null, null, null);
+      super(tableName, client, null, Optional.empty(),
+          null, null, null, null, null, null, null, null, null, null, null, null);
     }
 
     @Override
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java
index a02d5680e..984cd307c 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java
@@ -188,6 +188,7 @@ public void before(
     sessionClients.initialize(responsiveMetrics, null);
     table = (CassandraKeyValueTable) client.kvFactory().create(
         new DefaultTableSpec(name, partitioner, NO_TTL));
+    changelog = new TopicPartition(name + "-changelog", KAFKA_PARTITION);
 
     when(admin.deleteRecords(Mockito.any())).thenReturn(new DeleteRecordsResult(Map.of(
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java
index cbae361e3..6fc085464 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java
@@ -51,19 +51,27 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KafkaStreams.StateListener;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.ValueAndTimestampSerde;
 import org.junit.jupiter.api.TestInfo;
 
 public final class IntegrationTestUtils {
 
+  private static final Serde<String> STRING_SERDE = Serdes.String();
+  private static final Serde<ValueAndTimestamp<String>> VALUE_AND_TIMESTAMP_STRING_SERDE =
+      new ValueAndTimestampSerde<>(STRING_SERDE);
+
   /**
    * Simple override that allows plugging in a custom CassandraClientFactory
    * to mock or verify this connection in tests
@@ -114,6 +122,23 @@ public static ResponsiveConfig dummyConfig(final Map<String, Object> overrides) {
     return ResponsiveConfig.responsiveConfig(props);
   }
 
+  public static <D> byte[] serialize(final D data, final Serde<D> serde) {
+    return serde.serializer().serialize("ignored", data);
+  }
+
+  public static Bytes serializedKey(final String key) {
+    return Bytes.wrap(serialize(key, STRING_SERDE));
+  }
+
+  public static byte[] serializedValue(final String value) {
+    return serialize(value, STRING_SERDE);
+  }
+
+  public static byte[] serializedValueAndTimestamp(final String value, final long timestamp) {
+    final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make(value, timestamp);
+    return serialize(valueAndTimestamp, VALUE_AND_TIMESTAMP_STRING_SERDE);
+  }
+
   public static long minutesToMillis(final long minutes) {
     return minutes * 60 * 1000L;
   }
diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDKeyValueTable.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDKeyValueTable.java
index 86dca7148..be913f34a 100644
--- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDKeyValueTable.java
+++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDKeyValueTable.java
@@ -94,7 +94,7 @@ public BoundStatement delete(final int kafkaPartition, final Bytes key) {
   }
 
   @Override
-  public byte[] get(final int kafkaPartition, final Bytes key, long minValidTs) {
+  public byte[] get(final int kafkaPartition, final Bytes key, long streamTimeMs) {
     return stub.get(key);
   }
 
@@ -103,7 +103,7 @@ public KeyValueIterator<Bytes, byte[]> range(
       final int kafkaPartition,
       Bytes from,
      final Bytes to,
-      long minValidTs
+      long streamTimeMs
  ) {
     return stub.range(from, to);
   }
 
@@ -111,7 +111,7 @@ public KeyValueIterator<Bytes, byte[]> range(
   @Override
   public KeyValueIterator<Bytes, byte[]> all(
       final int kafkaPartition,
-      long minValidTs
+      long streamTimeMs
   ) {
     return stub.all();
   }