Skip to content

Commit

Permalink
merge with upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
ableegoldman committed Oct 28, 2024
2 parents 4a2cea0 + 853518b commit cc48d46
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private static CreateTableWithOptions createTable(
.withColumn(DATA_VALUE.column(), DataTypes.BLOB)
.withColumn(OFFSET.column(), DataTypes.BIGINT)
.withColumn(EPOCH.column(), DataTypes.BIGINT)
.withColumn(TIMESTAMP.column(), DataTypes.BIGINT);
.withColumn(TIMESTAMP.column(), DataTypes.TIMESTAMP);

if (ttlResolver.isPresent() && ttlResolver.get().defaultTtl().isFinite()) {
final int defaultTtlSeconds = (int) ttlResolver.get().defaultTtl().toSeconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void shouldInsertAndDelete() throws Exception {

@SuppressWarnings("OptionalGetWithoutIsPresent")
@Test
public void shouldRespectSemanticDefaultOnlyTtl() throws Exception {
public void shouldConfigureDefaultTtl() throws Exception {
// Given:
final var defaultTtl = Duration.ofMinutes(30);
final var ttlProvider = TtlProvider.<String, String>withDefault(defaultTtl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static dev.responsive.kafka.testutils.IntegrationTestUtils.copyConfigWithOverrides;
import static java.util.Collections.singletonMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

Expand Down Expand Up @@ -59,6 +60,7 @@ public class CassandraKVTableIntegrationTest {
private static final int NUM_KAFKA_PARTITIONS = NUM_SUBPARTITIONS_TOTAL / 4;

private CassandraClient client;
private CqlSession session;
private String storeName; // ie the "kafkaName", NOT the "cassandraName"
private ResponsiveConfig config;

Expand All @@ -68,7 +70,7 @@ public void before(
final CassandraContainer<?> cassandra,
@ResponsiveConfigParam final ResponsiveConfig config
) throws InterruptedException, TimeoutException {
final CqlSession session = CqlSession.builder()
this.session = CqlSession.builder()
.addContactPoint(cassandra.getContactPoint())
.withLocalDatacenter(cassandra.getLocalDatacenter())
.withKeyspace("responsive_itests") // NOTE: this keyspace is expected to exist
Expand Down Expand Up @@ -196,8 +198,32 @@ public void shouldReturnRangeKeysInLexicalOrderAcrossMultipleSubPartitions() {
}
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
@Test
public void shouldRespectSemanticTtlForLookups() {
public void shouldRespectSemanticDefaultOnlyTtl() {
// Given:
final var defaultTtl = Duration.ofMinutes(30);
final var ttlProvider = TtlProvider.<String, String>withDefault(defaultTtl);
final ResponsiveKeyValueParams params =
ResponsiveKeyValueParams.keyValue(storeName).withTtlProvider(ttlProvider);
final String tableName = params.name().tableName();

// When:
createTableFromParams(params);

// Then:
final var table = session.getMetadata()
.getKeyspace(session.getKeyspace().get())
.get()
.getTable(tableName)
.get();
final String describe = table.describe(false);

assertThat(describe, containsString("default_time_to_live = " + (int) defaultTtl.toSeconds()));
}

@Test
public void shouldRespectSemanticDefaultOnlyTtlForLookups() {
// Given:
final TtlProvider<?, ?> ttlProvider = TtlProvider.withDefault(Duration.ofMillis(100));
final RemoteKVTable<BoundStatement> table = createTable(ttlProvider);
Expand Down

0 comments on commit cc48d46

Please sign in to comment.