Skip to content

Commit

Permalink
remove extra specs and add TtlResolver
Browse files Browse the repository at this point in the history
  • Loading branch information
ableegoldman committed Oct 24, 2024
1 parent e7a5b8e commit 3e24977
Show file tree
Hide file tree
Showing 25 changed files with 208 additions and 349 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public KeyValueStore<Bytes, byte[]> get() {
if (isTimestamped) {
return new ResponsiveTimestampedKeyValueStore(params);
} else {
return new ResponsiveKeyValueStore(params);
return new ResponsiveKeyValueStore(params, isTimestamped);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static CassandraFactTable create(
final String name = spec.tableName();
LOG.info("Creating fact data table {} in remote store.", name);

final CreateTableWithOptions createTable = spec.applyOptions(createTable(name));
final CreateTableWithOptions createTable = spec.defaultOptions(createTable(name));

// separate metadata from the main table for the fact schema, this is acceptable
// because we don't use the metadata at all for fencing operations and writes to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static CassandraKeyValueTable create(
) throws InterruptedException, TimeoutException {
final String name = spec.tableName();
LOG.info("Creating data table {} in remote store.", name);
client.execute(spec.applyOptions(createTable(name)).build());
client.execute(spec.defaultOptions(createTable(name)).build());

client.awaitTable(name).await(Duration.ofSeconds(60));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public static CassandraWindowedTable create(
// (mainly the unpredictable ordering, as well as unidentifiable
// bounds for the fetchRange queries, etc)
LOG.info("Creating windowed data table {} in remote store.", name);
final CreateTableWithOptions createTable = spec.applyOptions(createTable(name));
final CreateTableWithOptions createTable = spec.defaultOptions(createTable(name));

client.execute(createTable.build());
client.awaitTable(name).await(Duration.ofSeconds(60));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
import dev.responsive.kafka.api.stores.ResponsiveWindowParams;
import dev.responsive.kafka.internal.db.partitioning.Segmenter.SegmentPartition;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.spec.BaseTableSpec;
import dev.responsive.kafka.internal.db.spec.GlobalTableSpec;
import dev.responsive.kafka.internal.db.spec.DefaultTableSpec;
import dev.responsive.kafka.internal.db.spec.RemoteTableSpec;
import dev.responsive.kafka.internal.db.spec.TimeWindowedCompactionTableSpec;
import dev.responsive.kafka.internal.db.spec.TtlTableSpec;
import dev.responsive.kafka.internal.stores.SchemaTypes;
import dev.responsive.kafka.internal.stores.TtlResolver;
import dev.responsive.kafka.internal.utils.WindowedKey;
import java.util.Optional;
import org.apache.kafka.common.utils.Bytes;

/**
Expand All @@ -40,35 +38,23 @@
*/
public class RemoteTableSpecFactory {

public static RemoteTableSpec globalSpec(
final ResponsiveKeyValueParams params,
final TablePartitioner<Bytes, Integer> partitioner
) {
return new GlobalTableSpec(new BaseTableSpec(params.name().tableName(), partitioner));
}

public static RemoteTableSpec fromKVParams(
final ResponsiveKeyValueParams params,
final TablePartitioner<Bytes, Integer> partitioner
final TablePartitioner<Bytes, Integer> partitioner,
final Optional<TtlResolver<?, ?>> ttlResolver
) {
RemoteTableSpec spec = new BaseTableSpec(params.name().tableName(), partitioner);

if (params.ttlProvider().isPresent()) {
spec = new TtlTableSpec(spec, params.defaultTimeToLive().get().duration());
}

if (params.schemaType() == SchemaTypes.KVSchema.FACT) {
spec = new TimeWindowedCompactionTableSpec(spec);
}

return spec;
return new DefaultTableSpec(
params.name().tableName(),
partitioner,
ttlResolver
);
}

public static RemoteTableSpec fromWindowParams(
final ResponsiveWindowParams params,
final TablePartitioner<WindowedKey, SegmentPartition> partitioner
) {
return new BaseTableSpec(params.name().tableName(), partitioner);
return new DefaultTableSpec(params.name().tableName(), partitioner, Optional.empty());
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import com.mongodb.client.result.UpdateResult;
import dev.responsive.kafka.internal.db.MongoKVFlushManager;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.stores.TtlResolver;
import dev.responsive.kafka.internal.utils.Iterators;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -78,7 +80,7 @@ public MongoKVTable(
final MongoClient client,
final String name,
final CollectionCreationOptions collectionCreationOptions,
final Duration ttl
final Optional<TtlResolver<?, ?>> ttlResolver
) {
this.name = name;
this.keyCodec = new StringKeyCodec();
Expand Down Expand Up @@ -109,8 +111,11 @@ public MongoKVTable(
Indexes.descending(KVDoc.TOMBSTONE_TS),
new IndexOptions().expireAfter(12L, TimeUnit.HOURS)
);
if (ttl != null) {
final Duration expireAfter = ttl.plus(Duration.ofHours(12));

if (ttlResolver.isPresent()) {
// TODO(sophie): account for infinite default ttl
final Duration expireAfter =
ttlResolver.get().defaultTtl().duration().plus(Duration.ofHours(12));
final long expireAfterSeconds = expireAfter.getSeconds();
docs.createIndex(
Indexes.descending(KVDoc.TIMESTAMP),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
import dev.responsive.kafka.internal.db.partitioning.SessionSegmentPartitioner;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner;
import dev.responsive.kafka.internal.db.spec.BaseTableSpec;
import dev.responsive.kafka.internal.db.spec.TtlTableSpec;
import dev.responsive.kafka.internal.db.spec.DefaultTableSpec;
import dev.responsive.kafka.internal.stores.TtlResolver;
import java.util.Optional;
import java.util.concurrent.TimeoutException;

public class ResponsiveMongoClient {
Expand All @@ -49,7 +50,7 @@ public ResponsiveMongoClient(
client,
spec.tableName(),
collectionCreationOptions,
spec instanceof TtlTableSpec ? ((TtlTableSpec) spec).ttl() : null
spec.ttlResolver()
));
windowTableCache = new WindowedTableCache<>(
(spec, partitioner) -> new MongoWindowedTable(
Expand All @@ -70,23 +71,29 @@ public ResponsiveMongoClient(
);
}

public RemoteKVTable<WriteModel<KVDoc>> kvTable(final String name)
throws InterruptedException, TimeoutException {
return kvTableCache.create(new BaseTableSpec(name, TablePartitioner.defaultPartitioner()));
public RemoteKVTable<WriteModel<KVDoc>> kvTable(
final String name,
final Optional<TtlResolver<?, ?>> ttlResolver
) throws InterruptedException, TimeoutException {
return kvTableCache.create(
new DefaultTableSpec(name, TablePartitioner.defaultPartitioner(), ttlResolver)
);
}

public RemoteWindowedTable<WriteModel<WindowDoc>> windowedTable(
final String name,
final WindowSegmentPartitioner partitioner
) throws InterruptedException, TimeoutException {
return windowTableCache.create(new BaseTableSpec(name, partitioner), partitioner);
return windowTableCache.create(
new DefaultTableSpec(name, partitioner, Optional.empty()), partitioner);
}

public RemoteSessionTable<WriteModel<SessionDoc>> sessionTable(
final String name,
final SessionSegmentPartitioner partitioner
) throws InterruptedException, TimeoutException {
return sessionTableCache.create(new BaseTableSpec(name, partitioner), partitioner);
return sessionTableCache.create(
new DefaultTableSpec(name, partitioner, Optional.empty()), partitioner);
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,29 @@
package dev.responsive.kafka.internal.db.spec;

import com.datastax.oss.driver.api.querybuilder.schema.CreateTableWithOptions;
import dev.responsive.kafka.internal.db.TableOperations;
import com.datastax.oss.driver.api.querybuilder.schema.compaction.CompactionStrategy;
import com.datastax.oss.driver.internal.querybuilder.schema.compaction.DefaultLeveledCompactionStrategy;
import dev.responsive.kafka.internal.db.partitioning.TablePartitioner;
import java.util.EnumSet;
import dev.responsive.kafka.internal.stores.TtlResolver;
import java.util.Optional;

public class BaseTableSpec implements RemoteTableSpec {
public class DefaultTableSpec implements RemoteTableSpec {

private static final CompactionStrategy<?> DEFAULT_CASSANDRA_COMPACTION_STRATEGY =
new DefaultLeveledCompactionStrategy();

private final String name;
final TablePartitioner<?, ?> partitioner;
private final TablePartitioner<?, ?> partitioner;
private final Optional<TtlResolver<?, ?>> ttlResolver;

public BaseTableSpec(final String name, final TablePartitioner<?, ?> partitioner) {
public DefaultTableSpec(
final String name,
final TablePartitioner<?, ?> partitioner,
final Optional<TtlResolver<?, ?>> ttlResolver
) {
this.name = name;
this.partitioner = partitioner;
this.ttlResolver = ttlResolver;
}

@Override
Expand All @@ -42,13 +53,12 @@ public String tableName() {
}

@Override
public EnumSet<TableOperations> restrictedOperations() {
return EnumSet.noneOf(TableOperations.class);
public Optional<TtlResolver<?, ?>> ttlResolver() {
return ttlResolver;
}

@Override
public CreateTableWithOptions applyOptions(final CreateTableWithOptions base) {
// does nothing
return base;
public CreateTableWithOptions defaultOptions(final CreateTableWithOptions base) {
return base.withCompaction(DEFAULT_CASSANDRA_COMPACTION_STRATEGY);
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit 3e24977

Please sign in to comment.