Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Jan 13, 2025
1 parent 87af267 commit 9b6c7e4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ public class ResponsiveConfig extends AbstractConfig {
+ "with for best results. However it is important to note that this cannot be changed for "
+ "an active application. Messing with this can corrupt existing state!";

public static final String MONGO_TOMBSTONE_RETENTION_MS_CONFIG = "responsive.mongo.tombstone.retention.ms";
private static final long MONGO_TOMBSTONE_RETENTION_MS_DEFAULT = Duration.ofHours(12).toMillis();
private static final String MONGO_TOMBSTONE_RETENTION_MS_DOC =
public static final String MONGO_TOMBSTONE_RETENTION_SEC_CONFIG = "responsive.mongo.tombstone.retention.seconds";
private static final long MONGO_TOMBSTONE_RETENTION_SEC_DEFAULT = Duration.ofHours(12).toSeconds();
private static final String MONGO_TOMBSTONE_RETENTION_SEC_DOC =
"The duration, in milliseconds, to retain deleted records in MongoDB after they are "
+ "deleted or their TTL expires. This retention period ensures that zombie instances of "
+ "Kafka Streams are fenced from overwriting deleted or expired records. Configure this "
+ "based on your application's tolerance for potential write errors by zombie instances. "
+ "Default: 12 hours (43,200,000 ms).";
+ "Default: 12 hours (43,200s).";

// ------------------ RS3 specific configurations ----------------------

Expand Down Expand Up @@ -546,11 +546,11 @@ public class ResponsiveConfig extends AbstractConfig {
Importance.LOW,
MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_DOC
).define(
MONGO_TOMBSTONE_RETENTION_MS_CONFIG,
MONGO_TOMBSTONE_RETENTION_SEC_CONFIG,
Type.LONG,
MONGO_TOMBSTONE_RETENTION_MS_DEFAULT,
MONGO_TOMBSTONE_RETENTION_SEC_DEFAULT,
Importance.LOW,
MONGO_TOMBSTONE_RETENTION_MS_DOC
MONGO_TOMBSTONE_RETENTION_SEC_DOC
).define(
WINDOW_BLOOM_FILTER_COUNT_CONFIG,
Type.INT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.stores.TtlResolver;
import dev.responsive.kafka.internal.utils.Iterators;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Optional;
Expand Down Expand Up @@ -98,11 +99,11 @@ public MongoKVTable(
metadata = database.getCollection(METADATA_COLLECTION_NAME, KVMetadataDoc.class);

// this is idempotent
final long tombstoneRetentionMs =
config.getLong(ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_MS_CONFIG);
final long tombstoneRetentionSeconds =
config.getLong(ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_SEC_CONFIG);
docs.createIndex(
Indexes.descending(KVDoc.TOMBSTONE_TS),
new IndexOptions().expireAfter(tombstoneRetentionMs, TimeUnit.MILLISECONDS)
new IndexOptions().expireAfter(tombstoneRetentionSeconds, TimeUnit.SECONDS)
);

if (ttlResolver.isPresent()) {
Expand All @@ -112,7 +113,8 @@ public MongoKVTable(
}

this.defaultTtlMs = ttlResolver.get().defaultTtl().toMillis();
final long expireAfterMs = defaultTtlMs + tombstoneRetentionMs;
final long expireAfterMs =
defaultTtlMs + Duration.ofSeconds(tombstoneRetentionSeconds).toMillis();

docs.createIndex(
Indexes.descending(KVDoc.TIMESTAMP),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
package dev.responsive.kafka.internal.db.mongo;

import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_MS_CONFIG;
import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_TOMBSTONE_RETENTION_SEC_CONFIG;
import static dev.responsive.kafka.internal.stores.TtlResolver.NO_TTL;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.defaultOnlyTtl;
import static dev.responsive.kafka.testutils.Matchers.sameKeyValue;
Expand Down Expand Up @@ -404,7 +404,7 @@ public void shouldFilterExpiredFromFullScans() {
@Test
@Disabled("fix this when we fix https://github.com/slatedb/slatedb/issues/442")
public void shouldExpireRecords() {
props.put(MONGO_TOMBSTONE_RETENTION_MS_CONFIG, 1000);
props.put(MONGO_TOMBSTONE_RETENTION_SEC_CONFIG, 1);
final ResponsiveConfig config = ResponsiveConfig.responsiveConfig(props);
final Duration ttl = Duration.ofMillis(100);
final MongoKVTable table = new MongoKVTable(
Expand All @@ -419,12 +419,14 @@ client, name, UNSHARDED, defaultOnlyTtl(ttl), config
// When:
final BooleanSupplier isExpired = () -> {
final byte[] bytes = table.get(0, bytes(10, 11, 12, 13), 100);
System.out.println(bytes == null);
return bytes == null;
};

// Then:
IntegrationTestUtils.awaitCondition(isExpired, Duration.ofSeconds(30), Duration.ofSeconds(1));
// TODO(agavra): we need to wait up to at least one minute to make sure that the ttl
// background job kicks in. Unfortunately I don't see a better way to do this so we
// may want to consider leaving this test disabled for CI/CD
IntegrationTestUtils.awaitCondition(isExpired, Duration.ofMinutes(2), Duration.ofSeconds(1));
}

private byte[] byteArray(int... bytes) {
Expand Down

0 comments on commit 9b6c7e4

Please sign in to comment.