Skip to content

Commit

Permalink
Row-level TTL PR 2.5: fix default ttl for CassandraKeyValueTable (#372)
Browse files Browse the repository at this point in the history
Quick followup to PR #2 (#371) which switched the method of setting a ttl from the old Spec layers to the new TtlResolver. Carried over the ttl to the CassandraFactTable but forgot to apply it for the CassandraKeyValueTable, which is done in this PR
  • Loading branch information
ableegoldman authored Oct 25, 2024
1 parent f238757 commit 853518b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public KeyValueStore<Bytes, byte[]> get() {
if (isTimestamped) {
return new ResponsiveTimestampedKeyValueStore(params);
} else {
return new ResponsiveKeyValueStore(params, isTimestamped);
return new ResponsiveKeyValueStore(params, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ public TtlProvider<K, V> fromValue(
valueSerde);
}

/**
* @return the same TtlProvider with a key-and-value-based override function
*/
public TtlProvider<K, V> fromKeyAndValue(
final BiFunction<K, V, Optional<TtlDuration>> computeTtlFromKeyAndValue,
final Serde<K> keySerde,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@
import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
import dev.responsive.kafka.internal.db.partitioning.SubPartitioner;
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
import dev.responsive.kafka.internal.stores.TtlResolver;
import dev.responsive.kafka.internal.utils.Iterators;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
Expand Down Expand Up @@ -81,8 +83,9 @@ public static CassandraKeyValueTable create(
final CassandraClient client
) throws InterruptedException, TimeoutException {
final String name = spec.tableName();
final var ttlResolver = spec.ttlResolver();
LOG.info("Creating data table {} in remote store.", name);
client.execute(spec.applyDefaultOptions(createTable(name)).build());
client.execute(spec.applyDefaultOptions(createTable(name, ttlResolver)).build());

client.awaitTable(name).await(Duration.ofSeconds(60));

Expand Down Expand Up @@ -224,8 +227,11 @@ public static CassandraKeyValueTable create(
);
}

private static CreateTableWithOptions createTable(final String tableName) {
return SchemaBuilder
private static CreateTableWithOptions createTable(
final String tableName,
final Optional<TtlResolver<?, ?>> ttlResolver
) {
final var baseOptions = SchemaBuilder
.createTable(tableName)
.ifNotExists()
.withPartitionKey(PARTITION_KEY.column(), DataTypes.INT)
Expand All @@ -235,6 +241,13 @@ private static CreateTableWithOptions createTable(final String tableName) {
.withColumn(OFFSET.column(), DataTypes.BIGINT)
.withColumn(EPOCH.column(), DataTypes.BIGINT)
.withColumn(TIMESTAMP.column(), DataTypes.TIMESTAMP);

if (ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite()) {
final int defaultTtlSeconds = (int) ttlResolver.get().defaultTtl().toSeconds();
return baseOptions.withDefaultTimeToLiveSeconds(defaultTtlSeconds);
} else {
return baseOptions;
}
}

// Visible for Testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void shouldInsertAndDelete() throws Exception {

@SuppressWarnings("OptionalGetWithoutIsPresent")
@Test
public void shouldRespectSemanticDefaultOnlyTtl() throws Exception {
public void shouldConfigureDefaultTtl() throws Exception {
// Given:
final var defaultTtl = Duration.ofMinutes(30);
final var ttlProvider = TtlProvider.<String, String>withDefault(defaultTtl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static dev.responsive.kafka.testutils.IntegrationTestUtils.copyConfigWithOverrides;
import static java.util.Collections.singletonMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

Expand Down Expand Up @@ -59,6 +60,7 @@ public class CassandraKVTableIntegrationTest {
private static final int NUM_KAFKA_PARTITIONS = NUM_SUBPARTITIONS_TOTAL / 4;

private CassandraClient client;
private CqlSession session;
private String storeName; // ie the "kafkaName", NOT the "cassandraName"
private ResponsiveConfig config;

Expand All @@ -68,7 +70,7 @@ public void before(
final CassandraContainer<?> cassandra,
@ResponsiveConfigParam final ResponsiveConfig config
) throws InterruptedException, TimeoutException {
final CqlSession session = CqlSession.builder()
this.session = CqlSession.builder()
.addContactPoint(cassandra.getContactPoint())
.withLocalDatacenter(cassandra.getLocalDatacenter())
.withKeyspace("responsive_itests") // NOTE: this keyspace is expected to exist
Expand Down Expand Up @@ -196,8 +198,32 @@ public void shouldReturnRangeKeysInLexicalOrderAcrossMultipleSubPartitions() {
}
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
@Test
public void shouldRespectSemanticTtlForLookups() {
public void shouldRespectSemanticDefaultOnlyTtl() {
// Given:
final var defaultTtl = Duration.ofMinutes(30);
final var ttlProvider = TtlProvider.<String, String>withDefault(defaultTtl);
final ResponsiveKeyValueParams params =
ResponsiveKeyValueParams.keyValue(storeName).withTtlProvider(ttlProvider);
final String tableName = params.name().tableName();

// When:
createTableFromParams(params);

// Then:
final var table = session.getMetadata()
.getKeyspace(session.getKeyspace().get())
.get()
.getTable(tableName)
.get();
final String describe = table.describe(false);

assertThat(describe, containsString("default_time_to_live = " + (int) defaultTtl.toSeconds()));
}

@Test
public void shouldRespectSemanticDefaultOnlyTtlForLookups() {
// Given:
final TtlProvider<?, ?> ttlProvider = TtlProvider.withDefault(Duration.ofMillis(100));
final RemoteKVTable<BoundStatement> table = createTable(ttlProvider);
Expand Down

0 comments on commit 853518b

Please sign in to comment.