Skip to content

Commit

Permalink
support configuring tombstone retention in Mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Jan 13, 2025
1 parent 4c5c0e9 commit 87af267
Show file tree
Hide file tree
Showing 18 changed files with 198 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,15 @@ public class ResponsiveConfig extends AbstractConfig {
+ "with for best results. However it is important to note that this cannot be changed for "
+ "an active application. Messing with this can corrupt existing state!";

// Tombstone retention for the MongoDB storage backend.
//
// The value is a duration in milliseconds and defaults to 12 hours. MongoKVTable reads it
// via config.getLong(...) and uses it (a) as the expiry of the index on the tombstone
// timestamp field and (b) as the amount added to the default TTL when computing the expiry
// for TTL-enabled tables.
//
// NOTE(review): the ConfigDef entry for this key is declared without a range validator, so a
// negative value does not appear to be rejected at config-parsing time -- confirm, and
// consider adding a lower bound.
// NOTE(review): MongoDB TTL indexes are presumably expressed in whole seconds, so sub-second
// values may be truncated by the driver -- confirm before documenting millisecond precision.
public static final String MONGO_TOMBSTONE_RETENTION_MS_CONFIG = "responsive.mongo.tombstone.retention.ms";
// Default retention: 12 hours, expressed in milliseconds.
private static final long MONGO_TOMBSTONE_RETENTION_MS_DEFAULT = Duration.ofHours(12).toMillis();
// User-facing description registered alongside the config definition.
private static final String MONGO_TOMBSTONE_RETENTION_MS_DOC =
"The duration, in milliseconds, to retain deleted records in MongoDB after they are "
+ "deleted or their TTL expires. This retention period ensures that zombie instances of "
+ "Kafka Streams are fenced from overwriting deleted or expired records. Configure this "
+ "based on your application's tolerance for potential write errors by zombie instances. "
+ "Default: 12 hours (43,200,000 ms).";

// ------------------ RS3 specific configurations ----------------------

public static final String RS3_HOSTNAME_CONFIG = "responsive.rs3.hostname";
Expand Down Expand Up @@ -536,6 +545,12 @@ public class ResponsiveConfig extends AbstractConfig {
"",
Importance.LOW,
MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_DOC
).define(
MONGO_TOMBSTONE_RETENTION_MS_CONFIG,
Type.LONG,
MONGO_TOMBSTONE_RETENTION_MS_DEFAULT,
Importance.LOW,
MONGO_TOMBSTONE_RETENTION_MS_DOC
).define(
WINDOW_BLOOM_FILTER_COUNT_CONFIG,
Type.INT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

package dev.responsive.kafka.internal.db;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.ResponsiveWindowParams;
import dev.responsive.kafka.internal.db.partitioning.Segmenter.SegmentPartition;
Expand All @@ -37,20 +38,23 @@ public class RemoteTableSpecFactory {
public static RemoteTableSpec fromKVParams(
final ResponsiveKeyValueParams params,
final TablePartitioner<Bytes, Integer> partitioner,
final Optional<TtlResolver<?, ?>> ttlResolver
final Optional<TtlResolver<?, ?>> ttlResolver,
final ResponsiveConfig config
) {
return new DefaultTableSpec(
params.name().tableName(),
partitioner,
ttlResolver
ttlResolver,
config
);
}

public static RemoteTableSpec fromWindowParams(
final ResponsiveWindowParams params,
final TablePartitioner<WindowedKey, SegmentPartition> partitioner
final TablePartitioner<WindowedKey, SegmentPartition> partitioner,
final ResponsiveConfig config
) {
return new DefaultTableSpec(params.name().tableName(), partitioner, Optional.empty());
return new DefaultTableSpec(params.name().tableName(), partitioner, Optional.empty(), config);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.result.UpdateResult;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.MongoKVFlushManager;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.stores.TtlResolver;
import dev.responsive.kafka.internal.utils.Iterators;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Optional;
Expand Down Expand Up @@ -70,7 +70,8 @@ public MongoKVTable(
final MongoClient client,
final String name,
final CollectionCreationOptions collectionCreationOptions,
final Optional<TtlResolver<?, ?>> ttlResolver
final Optional<TtlResolver<?, ?>> ttlResolver,
final ResponsiveConfig config
) {
this.name = name;
this.keyCodec = new StringKeyCodec();
Expand All @@ -96,11 +97,12 @@ public MongoKVTable(
}
metadata = database.getCollection(METADATA_COLLECTION_NAME, KVMetadataDoc.class);

// TODO(agavra): make the tombstone retention configurable
// this is idempotent
final long tombstoneRetentionMs =
config.getLong(ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_MS_CONFIG);
docs.createIndex(
Indexes.descending(KVDoc.TOMBSTONE_TS),
new IndexOptions().expireAfter(12L, TimeUnit.HOURS)
new IndexOptions().expireAfter(tombstoneRetentionMs, TimeUnit.MILLISECONDS)
);

if (ttlResolver.isPresent()) {
Expand All @@ -110,7 +112,7 @@ public MongoKVTable(
}

this.defaultTtlMs = ttlResolver.get().defaultTtl().toMillis();
final long expireAfterMs = defaultTtlMs + Duration.ofHours(12).toMillis();
final long expireAfterMs = defaultTtlMs + tombstoneRetentionMs;

docs.createIndex(
Indexes.descending(KVDoc.TIMESTAMP),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import com.mongodb.client.MongoClient;
import com.mongodb.client.model.WriteModel;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.RemoteSessionTable;
import dev.responsive.kafka.internal.db.RemoteWindowTable;
Expand Down Expand Up @@ -46,7 +47,8 @@ public ResponsiveMongoClient(
client,
spec.tableName(),
collectionCreationOptions,
spec.ttlResolver()
spec.ttlResolver(),
spec.config()
));
windowTableCache = new WindowedTableCache<>(
(spec, partitioner) -> new MongoWindowTable(
Expand All @@ -69,27 +71,30 @@ public ResponsiveMongoClient(

public RemoteKVTable<WriteModel<KVDoc>> kvTable(
final String name,
final Optional<TtlResolver<?, ?>> ttlResolver
final Optional<TtlResolver<?, ?>> ttlResolver,
final ResponsiveConfig config
) throws InterruptedException, TimeoutException {
return kvTableCache.create(
new DefaultTableSpec(name, TablePartitioner.defaultPartitioner(), ttlResolver)
new DefaultTableSpec(name, TablePartitioner.defaultPartitioner(), ttlResolver, config)
);
}

public RemoteWindowTable<WriteModel<WindowDoc>> windowedTable(
final String name,
final WindowSegmentPartitioner partitioner
final WindowSegmentPartitioner partitioner,
final ResponsiveConfig config
) throws InterruptedException, TimeoutException {
return windowTableCache.create(
new DefaultTableSpec(name, partitioner, Optional.empty()), partitioner);
new DefaultTableSpec(name, partitioner, Optional.empty(), config), partitioner);
}

public RemoteSessionTable<WriteModel<SessionDoc>> sessionTable(
final String name,
final SessionSegmentPartitioner partitioner
final SessionSegmentPartitioner partitioner,
final ResponsiveConfig config
) throws InterruptedException, TimeoutException {
return sessionTableCache.create(
new DefaultTableSpec(name, partitioner, Optional.empty()), partitioner);
new DefaultTableSpec(name, partitioner, Optional.empty(), config), partitioner);
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

package dev.responsive.kafka.internal.db.rs3;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.rs3.client.WalEntry;
import dev.responsive.kafka.internal.db.rs3.client.grpc.GrpcRS3Client;
Expand All @@ -30,7 +31,7 @@ public RS3TableFactory(
this.rs3Port = rs3Port;
}

public RemoteKVTable<WalEntry> kvTable(final String name) {
public RemoteKVTable<WalEntry> kvTable(final String name, final ResponsiveConfig config) {
final UUID storeId = new UUID(0, 0);
final PssPartitioner pssPartitioner = new PssDirectPartitioner();
final var rs3Client = GrpcRS3Client.connect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package dev.responsive.kafka.internal.db.spec;

import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.stores.TtlResolver;
import java.util.Optional;
Expand All @@ -22,15 +23,18 @@ public class DefaultTableSpec implements RemoteTableSpec {
private final String name;
private final TablePartitioner<?, ?> partitioner;
private final Optional<TtlResolver<?, ?>> ttlResolver;
private final ResponsiveConfig config;

public DefaultTableSpec(
final String name,
final TablePartitioner<?, ?> partitioner,
final Optional<TtlResolver<?, ?>> ttlResolver
final Optional<TtlResolver<?, ?>> ttlResolver,
final ResponsiveConfig config
) {
this.name = name;
this.partitioner = partitioner;
this.ttlResolver = ttlResolver;
this.config = config;
}

@Override
Expand All @@ -52,4 +56,9 @@ public String tableName() {
public CreateTableWithOptions applyDefaultOptions(final CreateTableWithOptions base) {
return base;
}

/**
 * Returns the configuration this table spec was constructed with.
 *
 * @return the {@link ResponsiveConfig} instance supplied to the constructor
 */
@Override
public ResponsiveConfig config() {
  return this.config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package dev.responsive.kafka.internal.db.spec;

import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.RemoteTable;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.stores.TtlResolver;
Expand All @@ -32,4 +33,6 @@ public interface RemoteTableSpec {

CreateTableWithOptions applyDefaultOptions(final CreateTableWithOptions base);

/**
 * Returns the {@link ResponsiveConfig} associated with this table spec, so that table
 * implementations created from the spec can read backend-specific settings.
 *
 * @return the configuration carried by this spec
 */
ResponsiveConfig config();

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadSessionClients;
import static dev.responsive.kafka.internal.db.partitioning.TablePartitioner.defaultPartitioner;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.internal.db.CassandraClient;
import dev.responsive.kafka.internal.db.CassandraFactTable;
Expand Down Expand Up @@ -65,7 +66,8 @@ public static GlobalOperations create(
final var spec = RemoteTableSpecFactory.fromKVParams(
params,
defaultPartitioner(),
ttlResolver
ttlResolver,
ResponsiveConfig.responsiveConfig(appConfigs)
);

final var table = client.globalFactory().create(spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ public static PartitionedOperations create(
table = createCassandra(params, config, sessionClients, changelog.topic(), ttlResolver);
break;
case MONGO_DB:
table = createMongo(params, sessionClients, ttlResolver);
table = createMongo(params, sessionClients, ttlResolver, config);
break;
case IN_MEMORY:
table = createInMemory(params, ttlResolver);
table = createInMemory(params, ttlResolver, config);
break;
case RS3:
table = createRS3(params, sessionClients);
table = createRS3(params, sessionClients, config);
break;
default:
throw new IllegalStateException("Unexpected value: " + sessionClients.storageBackend());
Expand Down Expand Up @@ -187,7 +187,8 @@ public static PartitionedOperations create(

private static RemoteKVTable<?> createInMemory(
final ResponsiveKeyValueParams params,
final Optional<TtlResolver<?, ?>> ttlResolver
final Optional<TtlResolver<?, ?>> ttlResolver,
final ResponsiveConfig config
) {
if (ttlResolver.isPresent() && !ttlResolver.get().hasDefaultOnly()) {
throw new UnsupportedOperationException("Row-level ttl is not yet supported "
Expand Down Expand Up @@ -221,7 +222,7 @@ private static RemoteKVTable<?> createCassandra(
changelogTopicName
);
final var client = sessionClients.cassandraClient();
final var spec = RemoteTableSpecFactory.fromKVParams(params, partitioner, ttlResolver);
final var spec = RemoteTableSpecFactory.fromKVParams(params, partitioner, ttlResolver, config);
switch (params.schemaType()) {
case KEY_VALUE:
return client.kvFactory().create(spec);
Expand All @@ -235,16 +236,18 @@ private static RemoteKVTable<?> createCassandra(
private static RemoteKVTable<?> createMongo(
final ResponsiveKeyValueParams params,
final SessionClients sessionClients,
final Optional<TtlResolver<?, ?>> ttlResolver
final Optional<TtlResolver<?, ?>> ttlResolver,
final ResponsiveConfig config
) throws InterruptedException, TimeoutException {
return sessionClients.mongoClient().kvTable(params.name().tableName(), ttlResolver);
return sessionClients.mongoClient().kvTable(params.name().tableName(), ttlResolver, config);
}

private static RemoteKVTable<?> createRS3(
final ResponsiveKeyValueParams params,
final SessionClients sessionClients
final SessionClients sessionClients,
final ResponsiveConfig config
) {
return sessionClients.rs3TableFactory().kvTable(params.name().tableName());
return sessionClients.rs3TableFactory().kvTable(params.name().tableName(), config);
}

@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ public static SegmentedOperations create(
final RemoteWindowTable<?> table;
switch (sessionClients.storageBackend()) {
case CASSANDRA:
table = createCassandra(params, sessionClients, partitioner);
table = createCassandra(params, sessionClients, partitioner, responsiveConfig);
break;
case MONGO_DB:
table = createMongo(params, sessionClients, partitioner);
table = createMongo(params, sessionClients, partitioner, responsiveConfig);
break;
default:
throw new IllegalStateException("Unexpected value: " + sessionClients.storageBackend());
Expand Down Expand Up @@ -168,11 +168,12 @@ public static SegmentedOperations create(
private static RemoteWindowTable<?> createCassandra(
final ResponsiveWindowParams params,
final SessionClients clients,
final WindowSegmentPartitioner partitioner
final WindowSegmentPartitioner partitioner,
final ResponsiveConfig config
) throws InterruptedException, TimeoutException {
final CassandraClient client = clients.cassandraClient();

final var spec = RemoteTableSpecFactory.fromWindowParams(params, partitioner);
final var spec = RemoteTableSpecFactory.fromWindowParams(params, partitioner, config);

switch (params.schemaType()) {
case WINDOW:
Expand All @@ -187,13 +188,14 @@ private static RemoteWindowTable<?> createCassandra(
private static RemoteWindowTable<?> createMongo(
final ResponsiveWindowParams params,
final SessionClients clients,
final WindowSegmentPartitioner partitioner
final WindowSegmentPartitioner partitioner,
final ResponsiveConfig config
) throws InterruptedException, TimeoutException {
final ResponsiveMongoClient client = clients.mongoClient();

switch (params.schemaType()) {
case WINDOW:
return client.windowedTable(params.name().tableName(), partitioner);
return client.windowedTable(params.name().tableName(), partitioner, config);
case STREAM:
throw new UnsupportedOperationException("Not yet implemented");
default:
Expand Down
Loading

0 comments on commit 87af267

Please sign in to comment.