diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index 3665b4636..0e958965e 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -111,6 +111,15 @@ public class ResponsiveConfig extends AbstractConfig { + "with for best results. However it is important to note that this cannot be changed for " + "an active application. Messing with this can corrupt existing state!"; + public static final String MONGO_TOMBSTONE_RETENTION_SEC_CONFIG = "responsive.mongo.tombstone.retention.seconds"; + private static final long MONGO_TOMBSTONE_RETENTION_SEC_DEFAULT = Duration.ofHours(12).toSeconds(); + private static final String MONGO_TOMBSTONE_RETENTION_SEC_DOC = + "The duration, in milliseconds, to retain deleted records in MongoDB after they are " + + "deleted or their TTL expires. This retention period ensures that zombie instances of " + + "Kafka Streams are fenced from overwriting deleted or expired records. Configure this " + + "based on your application's tolerance for potential write errors by zombie instances. " + + "Default: 12 hours (43,200s)."; + // ------------------ RS3 specific configurations ---------------------- public static final String RS3_HOSTNAME_CONFIG = "responsive.rs3.hostname"; @@ -536,6 +545,12 @@ public class ResponsiveConfig extends AbstractConfig { "", Importance.LOW, MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_DOC + ).define( + MONGO_TOMBSTONE_RETENTION_SEC_CONFIG, + Type.LONG, + MONGO_TOMBSTONE_RETENTION_SEC_DEFAULT, + Importance.LOW, + MONGO_TOMBSTONE_RETENTION_SEC_DOC ).define( WINDOW_BLOOM_FILTER_COUNT_CONFIG, Type.INT, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTableSpecFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTableSpecFactory.java index b1d4426cd..38bb686b9 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTableSpecFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteTableSpecFactory.java @@ -12,6 +12,7 @@ package dev.responsive.kafka.internal.db; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; import dev.responsive.kafka.api.stores.ResponsiveWindowParams; import dev.responsive.kafka.internal.db.partitioning.Segmenter.SegmentPartition; @@ -37,20 +38,23 @@ public class RemoteTableSpecFactory { public static RemoteTableSpec fromKVParams( final ResponsiveKeyValueParams params, final TablePartitioner partitioner, - final Optional> ttlResolver + final Optional> ttlResolver, + final ResponsiveConfig config ) { return new DefaultTableSpec( params.name().tableName(), partitioner, - ttlResolver + ttlResolver, + config ); } public static RemoteTableSpec fromWindowParams( final ResponsiveWindowParams params, - final TablePartitioner partitioner + final TablePartitioner partitioner, + final ResponsiveConfig config ) { - return new DefaultTableSpec(params.name().tableName(), partitioner, Optional.empty()); + return new DefaultTableSpec(params.name().tableName(), partitioner, Optional.empty(), config); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java index 29dac725b..dcfd4c5fe 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoKVTable.java @@ -31,6 +31,7 @@ import com.mongodb.client.model.Updates; import com.mongodb.client.model.WriteModel; import com.mongodb.client.result.UpdateResult; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.db.MongoKVFlushManager; import dev.responsive.kafka.internal.db.RemoteKVTable; import dev.responsive.kafka.internal.stores.TtlResolver; @@ -70,7 +71,8 @@ public MongoKVTable( final MongoClient client, final String name, final CollectionCreationOptions collectionCreationOptions, - final Optional> ttlResolver + final Optional> ttlResolver, + final ResponsiveConfig config ) { this.name = name; this.keyCodec = new StringKeyCodec(); @@ -96,11 +98,12 @@ public MongoKVTable( } metadata = database.getCollection(METADATA_COLLECTION_NAME, KVMetadataDoc.class); - // TODO(agavra): make the tombstone retention configurable // this is idempotent + final long tombstoneRetentionSeconds = + config.getLong(ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_SEC_CONFIG); docs.createIndex( Indexes.descending(KVDoc.TOMBSTONE_TS), - new IndexOptions().expireAfter(12L, TimeUnit.HOURS) + new IndexOptions().expireAfter(tombstoneRetentionSeconds, TimeUnit.SECONDS) ); if (ttlResolver.isPresent()) { @@ -110,7 +113,8 @@ public MongoKVTable( } this.defaultTtlMs = ttlResolver.get().defaultTtl().toMillis(); - final long expireAfterMs = defaultTtlMs + Duration.ofHours(12).toMillis(); + final long expireAfterMs = + defaultTtlMs + Duration.ofSeconds(tombstoneRetentionSeconds).toMillis(); docs.createIndex( Indexes.descending(KVDoc.TIMESTAMP), diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java index f365c312a..dd58359db 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java @@ -14,6 +14,7 @@ import com.mongodb.client.MongoClient; import com.mongodb.client.model.WriteModel; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.db.RemoteKVTable; import dev.responsive.kafka.internal.db.RemoteSessionTable; import dev.responsive.kafka.internal.db.RemoteWindowTable; @@ -46,7 +47,8 @@ public ResponsiveMongoClient( client, spec.tableName(), collectionCreationOptions, - spec.ttlResolver() + spec.ttlResolver(), + spec.config() )); windowTableCache = new WindowedTableCache<>( (spec, partitioner) -> new MongoWindowTable( @@ -69,27 +71,30 @@ public ResponsiveMongoClient( public RemoteKVTable> kvTable( final String name, - final Optional> ttlResolver + final Optional> ttlResolver, + final ResponsiveConfig config ) throws InterruptedException, TimeoutException { return kvTableCache.create( - new DefaultTableSpec(name, TablePartitioner.defaultPartitioner(), ttlResolver) + new DefaultTableSpec(name, TablePartitioner.defaultPartitioner(), ttlResolver, config) ); } public RemoteWindowTable> windowedTable( final String name, - final WindowSegmentPartitioner partitioner + final WindowSegmentPartitioner partitioner, + final ResponsiveConfig config ) throws InterruptedException, TimeoutException { return windowTableCache.create( - new DefaultTableSpec(name, partitioner, Optional.empty()), partitioner); + new DefaultTableSpec(name, partitioner, Optional.empty(), config), partitioner); } public RemoteSessionTable> sessionTable( final String name, - final SessionSegmentPartitioner partitioner + final SessionSegmentPartitioner partitioner, + final ResponsiveConfig config ) throws InterruptedException, TimeoutException { return sessionTableCache.create( - new DefaultTableSpec(name, partitioner, Optional.empty()), partitioner); + new DefaultTableSpec(name, partitioner, Optional.empty(), config), partitioner); } public void close() { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java index c52245cf0..dd3cdd555 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/rs3/RS3TableFactory.java @@ -12,6 +12,7 @@ package dev.responsive.kafka.internal.db.rs3; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.db.RemoteKVTable; import dev.responsive.kafka.internal.db.rs3.client.WalEntry; import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client; @@ -30,7 +31,7 @@ public RS3TableFactory( this.rs3Port = rs3Port; } - public RemoteKVTable kvTable(final String name) { + public RemoteKVTable kvTable(final String name, final ResponsiveConfig config) { final UUID storeId = new UUID(0, 0); final PssPartitioner pssPartitioner = new PssDirectPartitioner(); final var rs3Client = GrpcRS3Client.connect( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/spec/DefaultTableSpec.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/spec/DefaultTableSpec.java index 733cd46cb..f9cc337f7 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/spec/DefaultTableSpec.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/spec/DefaultTableSpec.java @@ -13,6 +13,7 @@ package dev.responsive.kafka.internal.db.spec; import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.db.partitioning.TablePartitioner; import dev.responsive.kafka.internal.stores.TtlResolver; import java.util.Optional; @@ -22,15 +23,18 @@ public class DefaultTableSpec implements RemoteTableSpec { private final String name; private final TablePartitioner partitioner; private final Optional> ttlResolver; + private final ResponsiveConfig config; public DefaultTableSpec( final String name, final TablePartitioner partitioner, - final Optional> ttlResolver + final Optional> ttlResolver, + final ResponsiveConfig config ) { this.name = name; this.partitioner = partitioner; this.ttlResolver = ttlResolver; + this.config = config; } @Override @@ -52,4 +56,9 @@ public String tableName() { public CreateTableWithOptions applyDefaultOptions(final CreateTableWithOptions base) { return base; } + + @Override + public ResponsiveConfig config() { + return config; + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/spec/RemoteTableSpec.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/spec/RemoteTableSpec.java index a635df1dd..12d2363c8 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/spec/RemoteTableSpec.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/spec/RemoteTableSpec.java @@ -13,6 +13,7 @@ package dev.responsive.kafka.internal.db.spec; import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.db.RemoteTable; import dev.responsive.kafka.internal.db.partitioning.TablePartitioner; import dev.responsive.kafka.internal.stores.TtlResolver; @@ -32,4 +33,6 @@ public interface RemoteTableSpec { CreateTableWithOptions applyDefaultOptions(final CreateTableWithOptions base); + ResponsiveConfig config(); + } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/GlobalOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/GlobalOperations.java index f600d0697..b27be6e6e 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/GlobalOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/GlobalOperations.java @@ -15,6 +15,7 @@ import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadSessionClients; import static dev.responsive.kafka.internal.db.partitioning.TablePartitioner.defaultPartitioner; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; import dev.responsive.kafka.internal.db.CassandraClient; import dev.responsive.kafka.internal.db.CassandraFactTable; @@ -65,7 +66,8 @@ public static GlobalOperations create( final var spec = RemoteTableSpecFactory.fromKVParams( params, defaultPartitioner(), - ttlResolver + ttlResolver, + ResponsiveConfig.responsiveConfig(appConfigs) ); final var table = client.globalFactory().create(spec); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java index 01664e4ad..1a1147fee 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java @@ -100,13 +100,13 @@ public static PartitionedOperations create( table = createCassandra(params, config, sessionClients, changelog.topic(), ttlResolver); break; case MONGO_DB: - table = createMongo(params, sessionClients, ttlResolver); + table = createMongo(params, sessionClients, ttlResolver, config); break; case IN_MEMORY: - table = createInMemory(params, ttlResolver); + table = createInMemory(params, ttlResolver, config); break; case RS3: - table = createRS3(params, sessionClients); + table = createRS3(params, sessionClients, config); break; default: throw new IllegalStateException("Unexpected value: " + sessionClients.storageBackend()); @@ -187,7 +187,8 @@ public static PartitionedOperations create( private static RemoteKVTable createInMemory( final ResponsiveKeyValueParams params, - final Optional> ttlResolver + final Optional> ttlResolver, + final ResponsiveConfig config ) { if (ttlResolver.isPresent() && !ttlResolver.get().hasDefaultOnly()) { throw new UnsupportedOperationException("Row-level ttl is not yet supported " @@ -221,7 +222,7 @@ private static RemoteKVTable createCassandra( changelogTopicName ); final var client = sessionClients.cassandraClient(); - final var spec = RemoteTableSpecFactory.fromKVParams(params, partitioner, ttlResolver); + final var spec = RemoteTableSpecFactory.fromKVParams(params, partitioner, ttlResolver, config); switch (params.schemaType()) { case KEY_VALUE: return client.kvFactory().create(spec); @@ -235,16 +236,18 @@ private static RemoteKVTable createCassandra( private static RemoteKVTable createMongo( final ResponsiveKeyValueParams params, final SessionClients sessionClients, - final Optional> ttlResolver + final Optional> ttlResolver, + final ResponsiveConfig config ) throws InterruptedException, TimeoutException { - return sessionClients.mongoClient().kvTable(params.name().tableName(), ttlResolver); + return sessionClients.mongoClient().kvTable(params.name().tableName(), ttlResolver, config); } private static RemoteKVTable createRS3( final ResponsiveKeyValueParams params, - final SessionClients sessionClients + final SessionClients sessionClients, + final ResponsiveConfig config ) { - return sessionClients.rs3TableFactory().kvTable(params.name().tableName()); + return sessionClients.rs3TableFactory().kvTable(params.name().tableName(), config); } @SuppressWarnings("rawtypes") diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java index 2fdc984f5..7d26e4161 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java @@ -97,10 +97,10 @@ public static SegmentedOperations create( final RemoteWindowTable table; switch (sessionClients.storageBackend()) { case CASSANDRA: - table = createCassandra(params, sessionClients, partitioner); + table = createCassandra(params, sessionClients, partitioner, responsiveConfig); break; case MONGO_DB: - table = createMongo(params, sessionClients, partitioner); + table = createMongo(params, sessionClients, partitioner, responsiveConfig); break; default: throw new IllegalStateException("Unexpected value: " + sessionClients.storageBackend()); @@ -168,11 +168,12 @@ public static SegmentedOperations create( private static RemoteWindowTable createCassandra( final ResponsiveWindowParams params, final SessionClients clients, - final WindowSegmentPartitioner partitioner + final WindowSegmentPartitioner partitioner, + final ResponsiveConfig config ) throws InterruptedException, TimeoutException { final CassandraClient client = clients.cassandraClient(); - final var spec = RemoteTableSpecFactory.fromWindowParams(params, partitioner); + final var spec = RemoteTableSpecFactory.fromWindowParams(params, partitioner, config); switch (params.schemaType()) { case WINDOW: @@ -187,13 +188,14 @@ private static RemoteWindowTable createCassandra( private static RemoteWindowTable createMongo( final ResponsiveWindowParams params, final SessionClients clients, - final WindowSegmentPartitioner partitioner + final WindowSegmentPartitioner partitioner, + final ResponsiveConfig config ) throws InterruptedException, TimeoutException { final ResponsiveMongoClient client = clients.mongoClient(); switch (params.schemaType()) { case WINDOW: - return client.windowedTable(params.name().tableName(), partitioner); + return client.windowedTable(params.name().tableName(), partitioner, config); case STREAM: throw new UnsupportedOperationException("Not yet implemented"); default: diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java index ea3e34573..4311c4a61 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java @@ -93,7 +93,7 @@ public static SessionOperationsImpl create( ); final RemoteSessionTable table = - createRemoteSessionTable(params, sessionClients, partitioner); + createRemoteSessionTable(params, sessionClients, partitioner, responsiveConfig); final SessionFlushManager flushManager = table.init(changelog.partition()); @@ -151,13 +151,14 @@ private static RemoteSessionTable createCassandra( private static RemoteSessionTable createMongo( final ResponsiveSessionParams params, final SessionClients clients, - final SessionSegmentPartitioner partitioner + final SessionSegmentPartitioner partitioner, + final ResponsiveConfig config ) throws InterruptedException, TimeoutException { final ResponsiveMongoClient client = clients.mongoClient(); switch (params.schemaType()) { case SESSION: - return client.sessionTable(params.name().tableName(), partitioner); + return client.sessionTable(params.name().tableName(), partitioner, config); default: throw new IllegalArgumentException(params.schemaType().name()); } @@ -166,13 +167,14 @@ private static RemoteSessionTable createMongo( private static RemoteSessionTable createRemoteSessionTable( final ResponsiveSessionParams params, final SessionClients sessionClients, - final SessionSegmentPartitioner partitioner + final SessionSegmentPartitioner partitioner, + final ResponsiveConfig config ) throws InterruptedException, TimeoutException { switch (sessionClients.storageBackend()) { case CASSANDRA: return createCassandra(params, sessionClients, partitioner); case MONGO_DB: - return createMongo(params, sessionClients, partitioner); + return createMongo(params, sessionClients, partitioner, config); default: throw new IllegalStateException("Unexpected value: " + sessionClients.storageBackend()); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java index d523df003..0255d8fd2 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreRestoreIntegrationTest.java @@ -363,7 +363,8 @@ private RemoteKVTable remoteKVTable( .create(new DefaultTableSpec( aggName(), TablePartitioner.defaultPartitioner(), - TtlResolver.NO_TTL + TtlResolver.NO_TTL, + config )); } else if (type == KVSchema.KEY_VALUE) { @@ -381,7 +382,8 @@ private RemoteKVTable remoteKVTable( mongoClient, aggName(), CollectionCreationOptions.fromConfig(config), - TtlResolver.NO_TTL + TtlResolver.NO_TTL, + config ); table.init(0); } else { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java index 96f0eb651..4c4150d99 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/TablePartitionerIntegrationTest.java @@ -112,6 +112,7 @@ public class TablePartitionerIntegrationTest { private String storeName; private Admin admin; private CassandraClient client; + private ResponsiveConfig config; @BeforeEach public void before( @@ -124,6 +125,7 @@ public void before( storeName = name + "store"; this.responsiveProps.putAll(responsiveProps); + config = ResponsiveConfig.responsiveConfig(responsiveProps); this.admin = admin; admin.createTopics( @@ -195,7 +197,7 @@ public void shouldFlushToRemoteTableWithSubpartitions() throws Exception { ); final CassandraKeyValueTable table = CassandraKeyValueTable.create( - new DefaultTableSpec(cassandraName, partitioner, TtlResolver.NO_TTL), client); + new DefaultTableSpec(cassandraName, partitioner, TtlResolver.NO_TTL, config), client); assertThat(client.numPartitions(cassandraName), is(OptionalInt.of(32))); assertThat(client.count(cassandraName, 0), is(2L)); @@ -253,7 +255,7 @@ public void shouldFlushToRemoteTableWithoutSubpartitions() throws Exception { final String cassandraName = new TableName(storeName).tableName(); final var partitioner = TablePartitioner.defaultPartitioner(); final CassandraFactTable table = CassandraFactTable.create( - new DefaultTableSpec(cassandraName, partitioner, TtlResolver.NO_TTL), client); + new DefaultTableSpec(cassandraName, partitioner, TtlResolver.NO_TTL, config), client); final var offset0 = table.fetchOffset(0); final var offset1 = table.fetchOffset(1); @@ -333,7 +335,7 @@ public void shouldFlushToRemoteTableWithSegmentPartitions() throws Exception { storeName + "-changelog" ); final CassandraKeyValueTable table = CassandraKeyValueTable.create( - new DefaultTableSpec(cassandraName, partitioner, TtlResolver.NO_TTL), client); + new DefaultTableSpec(cassandraName, partitioner, TtlResolver.NO_TTL, config), client); assertThat(client.numPartitions(cassandraName), is(OptionalInt.of(32))); assertThat(client.count(cassandraName, 0), is(2L)); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java index 6a72bf14e..a773e10db 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraFactTableIntegrationTest.java @@ -59,6 +59,7 @@ class CassandraFactTableIntegrationTest { private ResponsiveKeyValueParams params; private CassandraClient client; private CqlSession session; + private ResponsiveConfig config; @BeforeEach public void before( @@ -75,6 +76,7 @@ public void before( .withKeyspace("responsive_itests") // NOTE: this keyspace is expected to exist .build(); client = new CassandraClient(session, ResponsiveConfig.responsiveConfig(responsiveProps)); + config = ResponsiveConfig.responsiveConfig(responsiveProps); } @Test @@ -84,7 +86,7 @@ public void shouldInitializeWithCorrectMetadata() throws Exception { final String tableName = params.name().tableName(); final CassandraFactTable schema = (CassandraFactTable) client .factFactory() - .create(RemoteTableSpecFactory.fromKVParams(params, defaultPartitioner(), NO_TTL)); + .create(RemoteTableSpecFactory.fromKVParams(params, defaultPartitioner(), NO_TTL, config)); // When: final var token = schema.init(1); @@ -112,7 +114,7 @@ public void shouldInsertAndDelete() throws Exception { params = ResponsiveKeyValueParams.fact(storeName); final RemoteKVTable table = client .factFactory() - .create(RemoteTableSpecFactory.fromKVParams(params, defaultPartitioner(), NO_TTL)); + .create(RemoteTableSpecFactory.fromKVParams(params, defaultPartitioner(), NO_TTL, config)); table.init(1); @@ -144,7 +146,8 @@ public void shouldConfigureDefaultTtl() throws Exception { client.factFactory().create(RemoteTableSpecFactory.fromKVParams( params, defaultPartitioner(), - defaultOnlyTtl(Duration.ofMillis(ttlMs)) + defaultOnlyTtl(Duration.ofMillis(ttlMs)), + config )); // Then: @@ -170,7 +173,8 @@ public void shouldRespectSemanticDefaultOnlyTtl() throws Exception { final var table = client.factFactory().create(RemoteTableSpecFactory.fromKVParams( params, defaultPartitioner(), - defaultOnlyTtl(Duration.ofMillis(ttlMs)) + defaultOnlyTtl(Duration.ofMillis(ttlMs)), + config )); final long insertTimeMs = 0L; @@ -213,7 +217,8 @@ public void shouldRespectSemanticKeyBasedTtl() throws Exception { defaultPartitioner(), Optional.of(new TtlResolver<>( new StateDeserializer<>("ignored", new StringDeserializer(), new StringDeserializer()), - ttlProvider)) + ttlProvider)), + config )); table.init(1); @@ -280,7 +285,8 @@ public void shouldRespectSemanticKeyValueBasedTtl() throws Exception { defaultPartitioner(), Optional.of(new TtlResolver<>( new StateDeserializer<>("ignored", new StringDeserializer(), new StringDeserializer()), - ttlProvider)) + ttlProvider)), + config )); table.init(1); @@ -361,7 +367,8 @@ public void shouldRespectOverridesWithValueBasedTtl() throws Exception { Optional.of(new TtlResolver<>( new StateDeserializer<>("ignored", new StringDeserializer(), new StringDeserializer()), ttlProvider) - ))); + ), config + )); table.init(1); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java index 8138cef1b..f277dd295 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/CassandraKVTableIntegrationTest.java @@ -116,7 +116,7 @@ private RemoteKVTable createTableFromParams( try { return CassandraKeyValueTable.create( - new DefaultTableSpec(tableName, partitioner, ttlResolver), client); + new DefaultTableSpec(tableName, partitioner, ttlResolver, config), client); } catch (final Exception e) { throw new AssertionError("Failed to create table", e); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java index 9add66497..de89f7c67 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoKVTableTest.java @@ -13,6 +13,7 @@ package dev.responsive.kafka.internal.db.mongo; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG; +import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_SEC_CONFIG; import static dev.responsive.kafka.internal.stores.TtlResolver.NO_TTL; import static dev.responsive.kafka.testutils.IntegrationTestUtils.defaultOnlyTtl; import static dev.responsive.kafka.testutils.Matchers.sameKeyValue; @@ -22,9 +23,11 @@ import static org.hamcrest.Matchers.nullValue; import com.mongodb.client.MongoClient; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.config.StorageBackend; import dev.responsive.kafka.internal.stores.RemoteWriteResult; import dev.responsive.kafka.internal.utils.SessionUtil; +import dev.responsive.kafka.testutils.IntegrationTestUtils; import dev.responsive.kafka.testutils.ResponsiveConfigParam; import dev.responsive.kafka.testutils.ResponsiveExtension; import java.time.Duration; @@ -33,9 +36,11 @@ import java.util.Map; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; +import java.util.function.BooleanSupplier; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.RegisterExtension; @@ -49,8 +54,10 @@ class MongoKVTableTest { 0 ); + private Map props; private String name; private MongoClient client; + private ResponsiveConfig config; @BeforeEach public void before( @@ -60,13 +67,15 @@ public void before( name = info.getDisplayName().replace("()", ""); final String mongoConnection = (String) props.get(MONGO_ENDPOINT_CONFIG); + this.props = props; client = SessionUtil.connect(mongoConnection, null, null, "", null); + config = ResponsiveConfig.responsiveConfig(props); } @Test public void shouldSucceedWriterWithSameEpoch() throws ExecutionException, InterruptedException { // Given: - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); var writerFactory = table.init(0); var writer = writerFactory.createWriter(0, 0); @@ -85,7 +94,7 @@ public void shouldSucceedWriterWithSameEpoch() throws ExecutionException, Interr @Test public void shouldSucceedWriterWithLargerEpoch() throws ExecutionException, InterruptedException { // Given: - var table = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + var table = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); var writerFactory = table.init(0); var writer = writerFactory.createWriter(0, 0); writer.insert(bytes(1), byteArray(1), 100); @@ -93,7 +102,7 @@ public void shouldSucceedWriterWithLargerEpoch() throws ExecutionException, Inte // When: // initialize new writer with higher epoch - table = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + table = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); writerFactory = table.init(0); writer = writerFactory.createWriter(0, 0); writer.insert(bytes(1), byteArray(1), 101); @@ -106,11 +115,11 @@ public void shouldSucceedWriterWithLargerEpoch() throws ExecutionException, Inte @Test public void shouldFenceWriterSmallerEpoch() throws ExecutionException, InterruptedException { // Given: - var table0 = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + var table0 = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); var writerFactory0 = table0.init(0); var writer0 = writerFactory0.createWriter(0, 0); - var table1 = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + var table1 = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); var writerFactory1 = table1.init(0); var writer1 = writerFactory1.createWriter(0, 0); @@ -129,7 +138,7 @@ public void shouldFenceWriterSmallerEpoch() throws ExecutionException, Interrupt @Test public void shouldReturnNullForNotInserted() { // given: - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); table.init(0); // when: @@ -142,7 +151,7 @@ public void shouldReturnNullForNotInserted() { @Test public void shouldReturnNullForDeletedRecord() { // given: - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); var writerFactory = table.init(0); var writer = writerFactory.createWriter(0, 0); writer.insert(bytes(1), byteArray(1), 100); @@ -161,7 +170,7 @@ public void shouldReturnNullForDeletedRecord() { @Test public void shouldGetInsertedRecord() { // given: - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); var writerFactory = table.init(0); var writer = writerFactory.createWriter(0, 0); writer.insert(bytes(1), byteArray(1), 100); @@ -179,7 +188,8 @@ public void shouldGetInsertedRecord() { public void shouldFilterResultsWithOldTimestamp() { // given: final Duration ttl = Duration.ofMillis(100); - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, defaultOnlyTtl(ttl)); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, defaultOnlyTtl(ttl), + config); var writerFactory = table.init(0); var writer = writerFactory.createWriter(0, 0); writer.insert(bytes(1), byteArray(1), 0); @@ -196,7 +206,8 @@ public void shouldFilterResultsWithOldTimestamp() { public void shouldIncludeResultsWithNewerTimestamp() { // given: final Duration ttl = Duration.ofMillis(100); - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, defaultOnlyTtl(ttl)); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, defaultOnlyTtl(ttl), + config); var writerFactory = table.init(0); var writer = writerFactory.createWriter(0, 0); writer.insert(bytes(1), byteArray(1), 0); @@ -213,7 +224,7 @@ public void shouldIncludeResultsWithNewerTimestamp() { @Test public void shouldHandlePartitionedRangeScansCorrectly() { // Given: - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); final var writerFactory0 = table.init(0); final var writer0 = writerFactory0.createWriter(0, 0); @@ -248,7 +259,7 @@ public void shouldHandlePartitionedRangeScansCorrectly() { @Test public void shouldFilterTombstonesFromRangeScans() { - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); var writerFactory = table.init(0); var writer = writerFactory.createWriter(0, 0); writer.insert(bytes(10, 11, 12, 13), byteArray(2), 100); @@ -277,7 +288,8 @@ public void shouldFilterTombstonesFromRangeScans() { @Test public void shouldFilterExpiredItemsFromRangeScans() { final Duration ttl = Duration.ofMillis(100); - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, defaultOnlyTtl(ttl)); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, defaultOnlyTtl(ttl), + config); var writerFactory = table.init(0); var writer = writerFactory.createWriter(0, 0); writer.insert(bytes(10, 11, 12, 13), byteArray(2), 100); @@ -302,7 +314,7 @@ public void shouldFilterExpiredItemsFromRangeScans() { @Test public void shouldHandleFullScansCorrectly() { - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); final var writerFactory0 = table.init(0); final var writer0 = writerFactory0.createWriter(0, 0); @@ -335,7 +347,7 @@ public void shouldHandleFullScansCorrectly() { @Test public void shouldFilterTombstonesFromFullScans() { - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, NO_TTL, config); var writerFactory = table.init(0); var writer = writerFactory.createWriter(0, 0); writer.insert(bytes(10, 11, 12, 13), byteArray(2), 100); @@ -364,7 +376,8 @@ public void shouldFilterTombstonesFromFullScans() { @Test public void shouldFilterExpiredFromFullScans() { final Duration ttl = Duration.ofMillis(100); - final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, defaultOnlyTtl(ttl)); + final MongoKVTable table = new MongoKVTable(client, name, UNSHARDED, defaultOnlyTtl(ttl), + config); var writerFactory = table.init(0); var writer = writerFactory.createWriter(0, 0); @@ -388,6 +401,34 @@ public void shouldFilterExpiredFromFullScans() { iter.close(); } + @Test + @Disabled("fix this when we fix https://github.com/slatedb/slatedb/issues/442") + public void shouldExpireRecords() { + props.put(MONGO_TOMBSTONE_RETENTION_SEC_CONFIG, 1); + final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(props); + final Duration ttl = Duration.ofMillis(100); + final MongoKVTable table = new MongoKVTable( + client, name, UNSHARDED, defaultOnlyTtl(ttl), config + ); + + var writerFactory = table.init(0); + var writer = writerFactory.createWriter(0, 0); + writer.insert(bytes(10, 11, 12, 13), byteArray(2), 100); + writer.flush(); + + // When: + final BooleanSupplier isExpired = () -> { + final byte[] bytes = table.get(0, bytes(10, 11, 12, 13), 100); + return bytes == null; + }; + + // Then: + // TODO(agavra): we need to wait up to at least one minute to make sure that the ttl + // background job kicks in. Unfortunately I don't see a better way to do this so we + // may want to consider leaving this test disabled for CI/CD + IntegrationTestUtils.awaitCondition(isExpired, Duration.ofMinutes(2), Duration.ofSeconds(1)); + } + private byte[] byteArray(int... bytes) { byte[] byteArray = new byte[bytes.length]; for (int i = 0; i < bytes.length; i++) { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java index 5e3961f5e..84920e01e 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java @@ -189,7 +189,7 @@ public void before( sessionClients.initialize(responsiveMetrics, null); table = (CassandraKeyValueTable) client.kvFactory().create( - new DefaultTableSpec(name, partitioner, NO_TTL)); + new DefaultTableSpec(name, partitioner, NO_TTL, config)); changelog = new TopicPartition(name + "-changelog", KAFKA_PARTITION); when(admin.deleteRecords(Mockito.any())).thenReturn(new DeleteRecordsResult(Map.of( diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java index c20d3c60b..24dc42935 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java @@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -75,6 +76,7 @@ import org.apache.kafka.streams.KafkaStreams.StateListener; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.Topology; +import org.junit.Assert; import org.junit.jupiter.api.TestInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -616,6 +618,35 @@ public static void waitTillConsumedPast( throw new TimeoutException("timed out waiting for app to fully consume input"); } + + /** + * Waits for a specific condition to become true within the given timeout period. + * Uses a linear backoff strategy to retry the condition check. + * + * @param condition the condition to check + * @param timeout the maximum time to wait for the condition + * @param backoff the backoff duration between retries + * @throws AssertionError if the timeout is reached and the condition is not met + */ + public static void awaitCondition(BooleanSupplier condition, Duration timeout, Duration backoff) { + Instant startTime = Instant.now(); + Instant deadline = startTime.plus(timeout); + + while (Instant.now().isBefore(deadline)) { + if (condition.getAsBoolean()) { + return; + } + try { + Thread.sleep(backoff.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("Await operation was interrupted", e); + } + } + + Assert.fail("Condition was not met within the timeout of " + timeout); + } + private IntegrationTestUtils() { }