support configuring tombstone retention in Mongo #409

Merged · 2 commits · Jan 13, 2025
Changes from 1 commit
address feedback
agavra committed Jan 13, 2025
Verified: this commit was created on GitHub.com and signed with GitHub's verified signature.
commit 9b6c7e4d841da1d73f0d0c54dc4f61f539abc61e
@@ -111,14 +111,14 @@ public class ResponsiveConfig extends AbstractConfig {
       + "with for best results. However it is important to note that this cannot be changed for "
       + "an active application. Messing with this can corrupt existing state!";

-  public static final String MONGO_TOMBSTONE_RETENTION_MS_CONFIG = "responsive.mongo.tombstone.retention.ms";
-  private static final long MONGO_TOMBSTONE_RETENTION_MS_DEFAULT = Duration.ofHours(12).toMillis();
-  private static final String MONGO_TOMBSTONE_RETENTION_MS_DOC =
+  public static final String MONGO_TOMBSTONE_RETENTION_SEC_CONFIG = "responsive.mongo.tombstone.retention.seconds";
+  private static final long MONGO_TOMBSTONE_RETENTION_SEC_DEFAULT = Duration.ofHours(12).toSeconds();
+  private static final String MONGO_TOMBSTONE_RETENTION_SEC_DOC =
       "The duration, in milliseconds, to retain deleted records in MongoDB after they are "
           + "deleted or their TTL expires. This retention period ensures that zombie instances of "
           + "Kafka Streams are fenced from overwriting deleted or expired records. Configure this "
           + "based on your application's tolerance for potential write errors by zombie instances. "
-          + "Default: 12 hours (43,200,000 ms).";
+          + "Default: 12 hours (43,200s).";

  // ------------------ RS3 specific configurations ----------------------

@@ -546,11 +546,11 @@ public class ResponsiveConfig extends AbstractConfig {
          Importance.LOW,
          MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_DOC
      ).define(
-          MONGO_TOMBSTONE_RETENTION_MS_CONFIG,
+          MONGO_TOMBSTONE_RETENTION_SEC_CONFIG,
          Type.LONG,
-          MONGO_TOMBSTONE_RETENTION_MS_DEFAULT,
+          MONGO_TOMBSTONE_RETENTION_SEC_DEFAULT,
          Importance.LOW,
-          MONGO_TOMBSTONE_RETENTION_MS_DOC
+          MONGO_TOMBSTONE_RETENTION_SEC_DOC
      ).define(
          WINDOW_BLOOM_FILTER_COUNT_CONFIG,
          Type.INT,
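
For reference, a minimal sketch of how an application could override the new seconds-based key before building its config. Only the config constant and ResponsiveConfig.responsiveConfig(...) appear in this diff; the map-based setup and the 6-hour value are assumed for illustration (the test further down uses the same props.put pattern).

import dev.responsive.kafka.api.config.ResponsiveConfig;
import java.util.HashMap;
import java.util.Map;

// Hedged sketch: override the tombstone retention (PR default is 12 hours = 43,200s).
final Map<String, Object> props = new HashMap<>();
props.put(ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_SEC_CONFIG, 6L * 60 * 60); // 6 hours, example value
final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(props);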
@@ -36,6 +36,7 @@
 import dev.responsive.kafka.internal.db.RemoteKVTable;
 import dev.responsive.kafka.internal.stores.TtlResolver;
 import dev.responsive.kafka.internal.utils.Iterators;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.Date;
 import java.util.Optional;

@@ -98,11 +99,11 @@ public MongoKVTable(
    metadata = database.getCollection(METADATA_COLLECTION_NAME, KVMetadataDoc.class);

    // this is idempotent
-    final long tombstoneRetentionMs =
-        config.getLong(ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_MS_CONFIG);
+    final long tombstoneRetentionSeconds =
+        config.getLong(ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_SEC_CONFIG);
    docs.createIndex(
        Indexes.descending(KVDoc.TOMBSTONE_TS),
-        new IndexOptions().expireAfter(tombstoneRetentionMs, TimeUnit.MILLISECONDS)
+        new IndexOptions().expireAfter(tombstoneRetentionSeconds, TimeUnit.SECONDS)
    );

    if (ttlResolver.isPresent()) {

@@ -112,7 +113,8 @@ public MongoKVTable(
    }

    this.defaultTtlMs = ttlResolver.get().defaultTtl().toMillis();
-    final long expireAfterMs = defaultTtlMs + tombstoneRetentionMs;
+    final long expireAfterMs =
+        defaultTtlMs + Duration.ofSeconds(tombstoneRetentionSeconds).toMillis();

    docs.createIndex(
        Indexes.descending(KVDoc.TIMESTAMP),
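
The driver calls above create MongoDB TTL indexes, which the server enforces with second granularity via a periodic background sweep; that is presumably why the retention config is expressed in seconds. A standalone sketch of the same idempotent index creation is below. The database, collection, and field names are hypothetical; only Indexes.descending and IndexOptions.expireAfter are taken from the diff.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import java.util.concurrent.TimeUnit;
import org.bson.Document;

public class TombstoneTtlIndexSketch {
  public static void main(String[] args) {
    try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
      // hypothetical database/collection/field names, for illustration only
      final MongoCollection<Document> docs =
          client.getDatabase("example").getCollection("kv");
      final long tombstoneRetentionSeconds = 12 * 60 * 60; // the PR's 12-hour default

      // Re-running createIndex with identical options is a no-op (idempotent);
      // changing expireAfter on an existing index requires collMod instead.
      docs.createIndex(
          Indexes.descending("tombstoneTs"),
          new IndexOptions().expireAfter(tombstoneRetentionSeconds, TimeUnit.SECONDS)
      );
    }
  }
}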
@@ -13,7 +13,7 @@
 package dev.responsive.kafka.internal.db.mongo;

 import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG;
-import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_MS_CONFIG;
+import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_SEC_CONFIG;
 import static dev.responsive.kafka.internal.stores.TtlResolver.NO_TTL;
 import static dev.responsive.kafka.testutils.IntegrationTestUtils.defaultOnlyTtl;
 import static dev.responsive.kafka.testutils.Matchers.sameKeyValue;

@@ -404,7 +404,7 @@ public void shouldFilterExpiredFromFullScans() {
  @Test
  @Disabled("fix this when we fix https://github.com/slatedb/slatedb/issues/442")
  public void shouldExpireRecords() {
-    props.put(MONGO_TOMBSTONE_RETENTION_MS_CONFIG, 1000);
+    props.put(MONGO_TOMBSTONE_RETENTION_SEC_CONFIG, 1);
    final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(props);
    final Duration ttl = Duration.ofMillis(100);
    final MongoKVTable table = new MongoKVTable(

@@ -419,12 +419,14 @@ client, name, UNSHARDED, defaultOnlyTtl(ttl), config
    // When:
    final BooleanSupplier isExpired = () -> {
      final byte[] bytes = table.get(0, bytes(10, 11, 12, 13), 100);
-      System.out.println(bytes == null);
      return bytes == null;
    };

    // Then:
-    IntegrationTestUtils.awaitCondition(isExpired, Duration.ofSeconds(30), Duration.ofSeconds(1));
+    // TODO(agavra): we need to wait up to at least one minute to make sure that the ttl
+    // background job kicks in. Unfortunately I don't see a better way to do this so we
+    // may want to consider leaving this test disabled for CI/CD
+    IntegrationTestUtils.awaitCondition(isExpired, Duration.ofMinutes(2), Duration.ofSeconds(1));
  }

  private byte[] byteArray(int... bytes) {
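
The two-minute wait reflects MongoDB's TTL monitor, which by default sweeps expired documents roughly once every 60 seconds. If the test is ever re-enabled, one hedged option, not part of this change, is to shorten the sweep interval on the test-only MongoDB instance (client here stands for the test's MongoClient):

import org.bson.Document;

// Hedged, test-only tweak: ttlMonitorSleepSecs is a MongoDB server parameter that
// controls how often the TTL background job runs (default 60s). Only appropriate
// for a disposable test instance, never a shared cluster.
client.getDatabase("admin")
    .runCommand(new Document("setParameter", 1).append("ttlMonitorSleepSecs", 1));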