From 853518be30c14faa61e07ac189575503e070d79f Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 24 Oct 2024 19:56:57 -0700 Subject: [PATCH] Row-level TTL PR 2.5: fix default ttl for CassandraKeyValueTable (#372) Quick followup to PR #2 (#371) which switched the method of setting a ttl from the old Spec layers to the new TtlResolver. Carried over the ttl to the CassandraFactTable but forgot to apply it for the CassandraKeyValueTable, which is done in this PR --- .../ResponsiveKeyValueBytesStoreSupplier.java | 2 +- .../kafka/api/stores/TtlProvider.java | 3 ++ .../internal/db/CassandraKeyValueTable.java | 19 ++++++++++-- .../db/CassandraFactTableIntegrationTest.java | 2 +- .../db/CassandraKVTableIntegrationTest.java | 30 +++++++++++++++++-- 5 files changed, 49 insertions(+), 7 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java index 513d2ddd1..48bab920f 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java @@ -47,7 +47,7 @@ public KeyValueStore get() { if (isTimestamped) { return new ResponsiveTimestampedKeyValueStore(params); } else { - return new ResponsiveKeyValueStore(params, isTimestamped); + return new ResponsiveKeyValueStore(params, false); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java index 2d481a0f7..2e58bcb0d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/TtlProvider.java @@ -110,6 +110,9 @@ public TtlProvider fromValue( valueSerde); } + /** + * @return the same TtlProvider with a key-and-value-based override function + */ public TtlProvider fromKeyAndValue( final BiFunction> computeTtlFromKeyAndValue, final Serde keySerde, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java index d3b303730..4f61bfd2f 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraKeyValueTable.java @@ -40,6 +40,7 @@ import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions; import dev.responsive.kafka.internal.db.partitioning.SubPartitioner; import dev.responsive.kafka.internal.db.spec.RemoteTableSpec; +import dev.responsive.kafka.internal.stores.TtlResolver; import dev.responsive.kafka.internal.utils.Iterators; import java.nio.ByteBuffer; import java.time.Duration; @@ -47,6 +48,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeoutException; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -81,8 +83,9 @@ public static CassandraKeyValueTable create( final CassandraClient client ) throws InterruptedException, TimeoutException { final String name = spec.tableName(); + final var ttlResolver = spec.ttlResolver(); LOG.info("Creating data table {} in remote store.", name); - client.execute(spec.applyDefaultOptions(createTable(name)).build()); + client.execute(spec.applyDefaultOptions(createTable(name, ttlResolver)).build()); client.awaitTable(name).await(Duration.ofSeconds(60)); @@ -224,8 +227,11 @@ public static CassandraKeyValueTable create( ); } - private static CreateTableWithOptions createTable(final String tableName) { - return SchemaBuilder + private static CreateTableWithOptions createTable( + final String tableName, + final Optional> ttlResolver + ) { + final var baseOptions = SchemaBuilder .createTable(tableName) .ifNotExists() .withPartitionKey(PARTITION_KEY.column(), DataTypes.INT) @@ -235,6 +241,13 @@ private static CreateTableWithOptions createTable(final String tableName) { .withColumn(OFFSET.column(), DataTypes.BIGINT) .withColumn(EPOCH.column(), DataTypes.BIGINT) .withColumn(TIMESTAMP.column(), DataTypes.TIMESTAMP); + + if (ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite()) { + final int defaultTtlSeconds = (int) ttlResolver.get().defaultTtl().toSeconds(); + return baseOptions.withDefaultTimeToLiveSeconds(defaultTtlSeconds); + } else { + return baseOptions; + } } // Visible for Testing diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java index fc21033a6..04e959b56 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java @@ -125,7 +125,7 @@ public void shouldInsertAndDelete() throws Exception { @SuppressWarnings("OptionalGetWithoutIsPresent") @Test - public void shouldRespectSemanticDefaultOnlyTtl() throws Exception { + public void shouldConfigureDefaultTtl() throws Exception { // Given: final var defaultTtl = Duration.ofMinutes(30); final var ttlProvider = TtlProvider.withDefault(defaultTtl); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java index 88f54a634..629f4df4e 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java @@ -20,6 +20,7 @@ import static dev.responsive.kafka.testutils.IntegrationTestUtils.copyConfigWithOverrides; import static java.util.Collections.singletonMap; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -59,6 +60,7 @@ public class CassandraKVTableIntegrationTest { private static final int NUM_KAFKA_PARTITIONS = NUM_SUBPARTITIONS_TOTAL / 4; private CassandraClient client; + private CqlSession session; private String storeName; // ie the "kafkaName", NOT the "cassandraName" private ResponsiveConfig config; @@ -68,7 +70,7 @@ public void before( final CassandraContainer cassandra, @ResponsiveConfigParam final ResponsiveConfig config ) throws InterruptedException, TimeoutException { - final CqlSession session = CqlSession.builder() + this.session = CqlSession.builder() .addContactPoint(cassandra.getContactPoint()) .withLocalDatacenter(cassandra.getLocalDatacenter()) .withKeyspace("responsive_itests") // NOTE: this keyspace is expected to exist @@ -196,8 +198,32 @@ public void shouldReturnRangeKeysInLexicalOrderAcrossMultipleSubPartitions() { } } + @SuppressWarnings("OptionalGetWithoutIsPresent") @Test - public void shouldRespectSemanticTtlForLookups() { + public void shouldRespectSemanticDefaultOnlyTtl() { + // Given: + final var defaultTtl = Duration.ofMinutes(30); + final var ttlProvider = TtlProvider.withDefault(defaultTtl); + final ResponsiveKeyValueParams params = + ResponsiveKeyValueParams.keyValue(storeName).withTtlProvider(ttlProvider); + final String tableName = params.name().tableName(); + + // When: + createTableFromParams(params); + + // Then: + final var table = session.getMetadata() + .getKeyspace(session.getKeyspace().get()) + .get() + .getTable(tableName) + .get(); + final String describe = table.describe(false); + + assertThat(describe, containsString("default_time_to_live = " + (int) defaultTtl.toSeconds())); + } + + @Test + public void shouldRespectSemanticDefaultOnlyTtlForLookups() { // Given: final TtlProvider ttlProvider = TtlProvider.withDefault(Duration.ofMillis(100)); final RemoteKVTable table = createTable(ttlProvider);