diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java index 513d2ddd1..48bab920f 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java @@ -47,7 +47,7 @@ public KeyValueStore get() { if (isTimestamped) { return new ResponsiveTimestampedKeyValueStore(params); } else { - return new ResponsiveKeyValueStore(params, isTimestamped); + return new ResponsiveKeyValueStore(params, false); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java index 2d481a0f7..2e58bcb0d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java @@ -110,6 +110,9 @@ public TtlProvider fromValue( valueSerde); } + /** + * @return the same TtlProvider with a key-and-value-based override function + */ public TtlProvider fromKeyAndValue( final BiFunction> computeTtlFromKeyAndValue, final Serde keySerde, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java index d3b303730..4f61bfd2f 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java @@ -40,6 +40,7 @@ import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions; import dev.responsive.kafka.internal.db.partitioning.SubPartitioner; import dev.responsive.kafka.internal.db.spec.RemoteTableSpec; +import dev.responsive.kafka.internal.stores.TtlResolver; import dev.responsive.kafka.internal.utils.Iterators; import java.nio.ByteBuffer; import java.time.Duration; @@ -47,6 +48,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeoutException; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -81,8 +83,9 @@ public static CassandraKeyValueTable create( final CassandraClient client ) throws InterruptedException, TimeoutException { final String name = spec.tableName(); + final var ttlResolver = spec.ttlResolver(); LOG.info("Creating data table {} in remote store.", name); - client.execute(spec.applyDefaultOptions(createTable(name)).build()); + client.execute(spec.applyDefaultOptions(createTable(name, ttlResolver)).build()); client.awaitTable(name).await(Duration.ofSeconds(60)); @@ -224,8 +227,11 @@ public static CassandraKeyValueTable create( ); } - private static CreateTableWithOptions createTable(final String tableName) { - return SchemaBuilder + private static CreateTableWithOptions createTable( + final String tableName, + final Optional> ttlResolver + ) { + final var baseOptions = SchemaBuilder .createTable(tableName) .ifNotExists() .withPartitionKey(PARTITION_KEY.column(), DataTypes.INT) @@ -235,6 +241,13 @@ private static CreateTableWithOptions createTable(final String tableName) { .withColumn(OFFSET.column(), DataTypes.BIGINT) .withColumn(EPOCH.column(), DataTypes.BIGINT) .withColumn(TIMESTAMP.column(), DataTypes.TIMESTAMP); + + if (ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite()) { + final int defaultTtlSeconds = (int) ttlResolver.get().defaultTtl().toSeconds(); + return baseOptions.withDefaultTimeToLiveSeconds(defaultTtlSeconds); + } else { + return baseOptions; + } } // Visible for Testing diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java index fc21033a6..04e959b56 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java @@ -125,7 +125,7 @@ public void shouldInsertAndDelete() throws Exception { @SuppressWarnings("OptionalGetWithoutIsPresent") @Test - public void shouldRespectSemanticDefaultOnlyTtl() throws Exception { + public void shouldConfigureDefaultTtl() throws Exception { // Given: final var defaultTtl = Duration.ofMinutes(30); final var ttlProvider = TtlProvider.withDefault(defaultTtl); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java index 88f54a634..629f4df4e 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java @@ -20,6 +20,7 @@ import static dev.responsive.kafka.testutils.IntegrationTestUtils.copyConfigWithOverrides; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -59,6 +60,7 @@ public class CassandraKVTableIntegrationTest { private static final int NUM_KAFKA_PARTITIONS = NUM_SUBPARTITIONS_TOTAL / 4; private CassandraClient client; + private CqlSession session; private String storeName; // ie the "kafkaName", NOT the "cassandraName" private ResponsiveConfig config; @@ -68,7 +70,7 @@ public void before( final CassandraContainer cassandra, @ResponsiveConfigParam final ResponsiveConfig config ) throws InterruptedException, TimeoutException { - final CqlSession session = CqlSession.builder() + this.session = CqlSession.builder() .addContactPoint(cassandra.getContactPoint()) .withLocalDatacenter(cassandra.getLocalDatacenter()) .withKeyspace("responsive_itests") // NOTE: this keyspace is expected to exist @@ -196,8 +198,32 @@ public void shouldReturnRangeKeysInLexicalOrderAcrossMultipleSubPartitions() { } } + @SuppressWarnings("OptionalGetWithoutIsPresent") @Test - public void shouldRespectSemanticTtlForLookups() { + public void shouldRespectSemanticDefaultOnlyTtl() { + // Given: + final var defaultTtl = Duration.ofMinutes(30); + final var ttlProvider = TtlProvider.withDefault(defaultTtl); + final ResponsiveKeyValueParams params = + ResponsiveKeyValueParams.keyValue(storeName).withTtlProvider(ttlProvider); + final String tableName = params.name().tableName(); + + // When: + createTableFromParams(params); + + // Then: + final var table = session.getMetadata() + .getKeyspace(session.getKeyspace().get()) + .get() + .getTable(tableName) + .get(); + final String describe = table.describe(false); + + assertThat(describe, containsString("default_time_to_live = " + (int) defaultTtl.toSeconds())); + } + + @Test + public void shouldRespectSemanticDefaultOnlyTtlForLookups() { // Given: final TtlProvider ttlProvider = TtlProvider.withDefault(Duration.ofMillis(100)); final RemoteKVTable table = createTable(ttlProvider);