Skip to content

Commit

Permalink
API: fill in missing Mongo Window range queries (#394)
Browse files Browse the repository at this point in the history
Fills in some of the missing range queries we had left out of the Mongo implementation due to performance concerns. Also adds a compound index on the windowed key which should alleviate the concerns for all but one of the range query types. For this last type, we log a warning and include a suggestion for flipping a config to trade off between different range query's performance
  • Loading branch information
ableegoldman authored Nov 19, 2024
1 parent 7401bb4 commit 145dc86
Show file tree
Hide file tree
Showing 13 changed files with 414 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class CassandraClient {
private final ResponsiveConfig config;
private final TableCache<RemoteKVTable<BoundStatement>> kvFactory;
private final TableCache<RemoteKVTable<BoundStatement>> factFactory;
private final WindowedTableCache<RemoteWindowedTable<BoundStatement>> windowedFactory;
private final WindowedTableCache<RemoteWindowTable<BoundStatement>> windowedFactory;
private final TableCache<CassandraFactTable> globalFactory;

/**
Expand All @@ -63,9 +63,9 @@ public CassandraClient(final CqlSession session, final ResponsiveConfig config)
this.kvFactory = new TableCache<>(spec -> CassandraKeyValueTable.create(spec, this));
this.factFactory = new TableCache<>(spec -> CassandraFactTable.create(spec, this));
this.windowedFactory =
new WindowedTableCache<>((spec, partitioner) -> CassandraWindowedTable.create(spec,
this,
partitioner
new WindowedTableCache<>((spec, partitioner) -> CassandraWindowTable.create(spec,
this,
partitioner
));
this.globalFactory = new TableCache<>(spec -> CassandraFactTable.create(spec, this));
}
Expand Down Expand Up @@ -186,7 +186,7 @@ public TableCache<RemoteKVTable<BoundStatement>> factFactory() {
return factFactory;
}

public WindowedTableCache<RemoteWindowedTable<BoundStatement>> windowedFactory() {
public WindowedTableCache<RemoteWindowTable<BoundStatement>> windowedFactory() {
return windowedFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public class CassandraWindowFlushManager extends WindowFlushManager {
private final String logPrefix;
private final Logger log;

private final CassandraWindowedTable table;
private final CassandraWindowTable table;
private final CassandraClient client;

private final TablePartitioner<WindowedKey, SegmentPartition> partitioner;
private final int kafkaPartition;
private final long epoch;

public CassandraWindowFlushManager(
final CassandraWindowedTable table,
final CassandraWindowTable table,
final CassandraClient client,
final WindowSegmentPartitioner partitioner,
final int kafkaPartition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraWindowedTable implements RemoteWindowedTable<BoundStatement> {
public class CassandraWindowTable implements RemoteWindowTable<BoundStatement> {

private static final Logger LOG = LoggerFactory.getLogger(CassandraWindowedTable.class);
private static final Logger LOG = LoggerFactory.getLogger(CassandraWindowTable.class);

private static final String KEY_FROM_BIND = "kf";
private static final String KEY_TO_BIND = "kt";
Expand Down Expand Up @@ -91,7 +91,7 @@ public class CassandraWindowedTable implements RemoteWindowedTable<BoundStatemen
private final PreparedStatement reserveEpoch;
private final PreparedStatement ensureEpoch;

public static CassandraWindowedTable create(
public static CassandraWindowTable create(
final RemoteTableSpec spec,
final CassandraClient client,
final WindowSegmentPartitioner partitioner
Expand Down Expand Up @@ -383,7 +383,7 @@ public static CassandraWindowedTable create(
QueryOp.WRITE
);

return new CassandraWindowedTable(
return new CassandraWindowTable(
name,
client,
partitioner,
Expand Down Expand Up @@ -423,7 +423,7 @@ private static CreateTableWithOptions createTable(final String tableName) {
.withColumn(STREAM_TIME.column(), DataTypes.BIGINT);
}

public CassandraWindowedTable(
public CassandraWindowTable(
final String name,
final CassandraClient client,
final WindowSegmentPartitioner partitioner,
Expand Down Expand Up @@ -847,7 +847,7 @@ public KeyValueIterator<WindowedKey, byte[]> fetch(
.setInstant(WINDOW_TO_BIND, Instant.ofEpochMilli(timeTo));

final ResultSet result = client.execute(get);
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows));
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows));
}

return Iterators.wrapped(segmentIterators);
Expand All @@ -872,7 +872,7 @@ public KeyValueIterator<WindowedKey, byte[]> backFetch(
.setInstant(WINDOW_TO_BIND, Instant.ofEpochMilli(timeTo));

final ResultSet result = client.execute(get);
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows));
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows));
}

return Iterators.wrapped(segmentIterators);
Expand All @@ -897,7 +897,7 @@ public KeyValueIterator<WindowedKey, byte[]> fetchRange(
.setByteBuffer(KEY_TO_BIND, ByteBuffer.wrap(toKey.get()));

final ResultSet result = client.execute(get);
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows));
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows));
}

return Iterators.filterKv(
Expand Down Expand Up @@ -925,7 +925,7 @@ public KeyValueIterator<WindowedKey, byte[]> backFetchRange(
.setByteBuffer(KEY_TO_BIND, ByteBuffer.wrap(toKey.get()));

final ResultSet result = client.execute(get);
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows));
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows));
}

return Iterators.filterKv(
Expand All @@ -951,7 +951,7 @@ public KeyValueIterator<WindowedKey, byte[]> fetchAll(
.setInstant(KEY_TO_BIND, Instant.ofEpochMilli(timeTo));

final ResultSet result = client.execute(get);
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows));
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows));
}

return Iterators.filterKv(
Expand All @@ -977,7 +977,7 @@ public KeyValueIterator<WindowedKey, byte[]> backFetchAll(
.setInstant(KEY_TO_BIND, Instant.ofEpochMilli(timeTo));

final ResultSet result = client.execute(get);
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows));
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows));
}

return Iterators.filterKv(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.mongodb.MongoException;
import com.mongodb.bulk.WriteConcernError;
import com.mongodb.client.MongoCollection;
import dev.responsive.kafka.internal.db.mongo.MongoWindowedTable;
import dev.responsive.kafka.internal.db.mongo.MongoWindowTable;
import dev.responsive.kafka.internal.db.mongo.MongoWriter;
import dev.responsive.kafka.internal.db.mongo.WindowDoc;
import dev.responsive.kafka.internal.db.partitioning.Segmenter.SegmentPartition;
Expand All @@ -33,14 +33,14 @@ public class MongoWindowFlushManager extends WindowFlushManager {
private final String logPrefix;
private final Logger log;

private final MongoWindowedTable table;
private final MongoWindowTable table;
private final Function<SegmentPartition, MongoCollection<WindowDoc>> windowsForSegment;

private final WindowSegmentPartitioner partitioner;
private final int kafkaPartition;

public MongoWindowFlushManager(
final MongoWindowedTable table,
final MongoWindowTable table,
final Function<SegmentPartition, MongoCollection<WindowDoc>> windowsForSegment,
final WindowSegmentPartitioner partitioner,
final int kafkaPartition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;

public interface RemoteWindowedTable<S> extends RemoteTable<WindowedKey, S> {
public interface RemoteWindowTable<S> extends RemoteTable<WindowedKey, S> {

/**
* Initializes the table by setting the metadata fields to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
package dev.responsive.kafka.internal.db.mongo;

import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry;
import static dev.responsive.kafka.internal.db.mongo.WindowDoc.ID_SUBFIELD_KEY;
import static dev.responsive.kafka.internal.db.mongo.WindowDoc.ID_SUBFIELD_WINDOW_START;
import static dev.responsive.kafka.internal.db.partitioning.Segmenter.UNINITIALIZED_STREAM_TIME;
import static dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration.NO_COMMITTED_OFFSET;
import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
Expand All @@ -26,14 +28,16 @@
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.result.UpdateResult;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.db.MongoWindowFlushManager;
import dev.responsive.kafka.internal.db.RemoteWindowedTable;
import dev.responsive.kafka.internal.db.RemoteWindowTable;
import dev.responsive.kafka.internal.db.partitioning.Segmenter;
import dev.responsive.kafka.internal.db.partitioning.Segmenter.SegmentPartition;
import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner;
Expand All @@ -58,9 +62,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoWindowedTable implements RemoteWindowedTable<WriteModel<WindowDoc>> {
public class MongoWindowTable implements RemoteWindowTable<WriteModel<WindowDoc>> {

private static final Logger LOG = LoggerFactory.getLogger(MongoWindowedTable.class);
private static final Logger LOG = LoggerFactory.getLogger(MongoWindowTable.class);
private static final String METADATA_COLLECTION_NAME = "window_metadata";

private static final UpdateOptions UPSERT_OPTIONS = new UpdateOptions().upsert(true);
Expand All @@ -87,6 +91,7 @@ private static class PartitionSegments {
private final Segmenter segmenter;
private final long epoch;
private final CollectionCreationOptions collectionCreationOptions;
private final boolean timestampFirstOrder;

// Recommended to keep the total number of collections under 10,000, so we should not
// let num_segments * num_kafka_partitions exceed 10k at the most
Expand All @@ -99,13 +104,15 @@ public PartitionSegments(
final int kafkaPartition,
final long streamTime,
final long epoch,
final CollectionCreationOptions collectionCreationOptions
final CollectionCreationOptions collectionCreationOptions,
final boolean timestampFirstOrder
) {
this.database = database;
this.adminDatabase = adminDatabase;
this.segmenter = segmenter;
this.epoch = epoch;
this.collectionCreationOptions = collectionCreationOptions;
this.timestampFirstOrder = timestampFirstOrder;
this.segmentWindows = new ConcurrentHashMap<>();

final List<SegmentPartition> activeSegments =
Expand Down Expand Up @@ -165,6 +172,17 @@ private void createSegment(final SegmentPartition segmentToCreate) {
WindowDoc.class
);
}
if (timestampFirstOrder) {
windowDocs.createIndex(Indexes.compoundIndex(
Indexes.ascending(WindowDoc.ID_WINDOW_START_TS),
Indexes.ascending(WindowDoc.ID_RECORD_KEY)
));
} else {
windowDocs.createIndex(Indexes.compoundIndex(
Indexes.ascending(WindowDoc.ID_RECORD_KEY),
Indexes.ascending(WindowDoc.ID_WINDOW_START_TS)
));
}
segmentWindows.put(segmentToCreate, windowDocs);
}

Expand All @@ -189,7 +207,7 @@ private void deleteSegment(final SegmentPartition segmentToExpire) {
}
}

public MongoWindowedTable(
public MongoWindowTable(
final MongoClient client,
final String name,
final WindowSegmentPartitioner partitioner,
Expand Down Expand Up @@ -258,7 +276,8 @@ public MongoWindowFlushManager init(
kafkaPartition,
metaDoc.streamTime,
metaDoc.epoch,
collectionCreationOptions
collectionCreationOptions,
timestampFirstOrder
)
);

Expand Down Expand Up @@ -522,8 +541,38 @@ public KeyValueIterator<WindowedKey, byte[]> fetchRange(
final long timeFrom,
final long timeTo
) {
throw new UnsupportedOperationException("fetchRange not yet supported for Mongo backends");
final List<KeyValueIterator<WindowedKey, byte[]>> segmentIterators = new LinkedList<>();
final var partitionSegments = kafkaPartitionToSegments.get(kafkaPartition);

for (final var segment : partitioner.segmenter().range(kafkaPartition, timeFrom, timeTo)) {
final var segmentWindows = partitionSegments.segmentWindows.get(segment);
if (segmentWindows == null) {
continue;
}
final ArrayList<Bson> filters = new ArrayList<>(4);

// order matters when filtering on fields in a compound index
if (timestampFirstOrder) {
filters.add(Filters.gte(ID_SUBFIELD_WINDOW_START, timeFrom));
filters.add(Filters.lte(ID_SUBFIELD_WINDOW_START, timeTo));
filters.add(Filters.gte(ID_SUBFIELD_KEY, keyCodec.encode(fromKey)));
filters.add(Filters.lte(ID_SUBFIELD_KEY, keyCodec.encode(toKey)));
} else {
filters.add(Filters.gte(ID_SUBFIELD_KEY, keyCodec.encode(fromKey)));
filters.add(Filters.lte(ID_SUBFIELD_KEY, keyCodec.encode(toKey)));
filters.add(Filters.gte(ID_SUBFIELD_WINDOW_START, timeFrom));
filters.add(Filters.lte(ID_SUBFIELD_WINDOW_START, timeTo));
}
final FindIterable<WindowDoc> fetchResults = segmentWindows.find(Filters.and(filters));

if (retainDuplicates) {
segmentIterators.add(
new DuplicateKeyListValueIterator(fetchResults.iterator(), this::windowedKeyFromDoc));
} else {
segmentIterators.add(Iterators.kv(fetchResults.iterator(), this::windowFromDoc));
}
}
return Iterators.wrapped(segmentIterators);
}

@Override
Expand All @@ -543,7 +592,37 @@ public KeyValueIterator<WindowedKey, byte[]> fetchAll(
final long timeFrom,
final long timeTo
) {
throw new UnsupportedOperationException("fetchAll not yet supported for Mongo backends");
final List<KeyValueIterator<WindowedKey, byte[]>> segmentIterators = new LinkedList<>();
final var partitionSegments = kafkaPartitionToSegments.get(kafkaPartition);

for (final var segment : partitioner.segmenter().range(kafkaPartition, timeFrom, timeTo)) {
final var segmentWindows = partitionSegments.segmentWindows.get(segment);
if (segmentWindows == null) {
continue;
}
final ArrayList<Bson> filters = new ArrayList<>(2);
filters.add(Filters.gte(ID_SUBFIELD_WINDOW_START, timeFrom));
filters.add(Filters.lte(ID_SUBFIELD_WINDOW_START, timeTo));

if (!timestampFirstOrder) {
LOG.warn("WindowStore#fetchAll should be used with caution as a full table scan is "
+ "required for range queries with no key bound when the key is indexed "
+ "first. If your application makes heavy use of this API, consider "
+ "setting " + ResponsiveConfig.MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_CONFIG
+ "to 'true' for better performance of #fetchAll at the cost of worse "
+ "performance of the #fetch(key, timeFrom, timeTo API.");
}

final FindIterable<WindowDoc> fetchResults = segmentWindows.find(Filters.and(filters));

if (retainDuplicates) {
segmentIterators.add(
new DuplicateKeyListValueIterator(fetchResults.iterator(), this::windowedKeyFromDoc));
} else {
segmentIterators.add(Iterators.kv(fetchResults.iterator(), this::windowFromDoc));
}
}
return Iterators.wrapped(segmentIterators);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.mongodb.client.model.WriteModel;
import dev.responsive.kafka.internal.db.RemoteKVTable;
import dev.responsive.kafka.internal.db.RemoteSessionTable;
import dev.responsive.kafka.internal.db.RemoteWindowedTable;
import dev.responsive.kafka.internal.db.RemoteWindowTable;
import dev.responsive.kafka.internal.db.SessionTableCache;
import dev.responsive.kafka.internal.db.TableCache;
import dev.responsive.kafka.internal.db.WindowedTableCache;
Expand All @@ -31,7 +31,7 @@
public class ResponsiveMongoClient {

private final TableCache<MongoKVTable> kvTableCache;
private final WindowedTableCache<MongoWindowedTable> windowTableCache;
private final WindowedTableCache<MongoWindowTable> windowTableCache;
private final SessionTableCache<MongoSessionTable> sessionTableCache;
private final MongoClient client;

Expand All @@ -49,7 +49,7 @@ public ResponsiveMongoClient(
spec.ttlResolver()
));
windowTableCache = new WindowedTableCache<>(
(spec, partitioner) -> new MongoWindowedTable(
(spec, partitioner) -> new MongoWindowTable(
client,
spec.tableName(),
partitioner,
Expand All @@ -76,7 +76,7 @@ public RemoteKVTable<WriteModel<KVDoc>> kvTable(
);
}

public RemoteWindowedTable<WriteModel<WindowDoc>> windowedTable(
public RemoteWindowTable<WriteModel<WindowDoc>> windowedTable(
final String name,
final WindowSegmentPartitioner partitioner
) throws InterruptedException, TimeoutException {
Expand Down
Loading

0 comments on commit 145dc86

Please sign in to comment.