diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraClient.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraClient.java index 4d0199a06..d88170d65 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraClient.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraClient.java @@ -47,7 +47,7 @@ public class CassandraClient { private final ResponsiveConfig config; private final TableCache> kvFactory; private final TableCache> factFactory; - private final WindowedTableCache> windowedFactory; + private final WindowedTableCache> windowedFactory; private final TableCache globalFactory; /** @@ -63,9 +63,9 @@ public CassandraClient(final CqlSession session, final ResponsiveConfig config) this.kvFactory = new TableCache<>(spec -> CassandraKeyValueTable.create(spec, this)); this.factFactory = new TableCache<>(spec -> CassandraFactTable.create(spec, this)); this.windowedFactory = - new WindowedTableCache<>((spec, partitioner) -> CassandraWindowedTable.create(spec, - this, - partitioner + new WindowedTableCache<>((spec, partitioner) -> CassandraWindowTable.create(spec, + this, + partitioner )); this.globalFactory = new TableCache<>(spec -> CassandraFactTable.create(spec, this)); } @@ -186,7 +186,7 @@ public TableCache> factFactory() { return factFactory; } - public WindowedTableCache> windowedFactory() { + public WindowedTableCache> windowedFactory() { return windowedFactory; } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowFlushManager.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowFlushManager.java index 6087e7c86..b0c2cc29b 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowFlushManager.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowFlushManager.java @@ -27,7 +27,7 @@ public class CassandraWindowFlushManager extends 
WindowFlushManager { private final String logPrefix; private final Logger log; - private final CassandraWindowedTable table; + private final CassandraWindowTable table; private final CassandraClient client; private final TablePartitioner partitioner; @@ -35,7 +35,7 @@ public class CassandraWindowFlushManager extends WindowFlushManager { private final long epoch; public CassandraWindowFlushManager( - final CassandraWindowedTable table, + final CassandraWindowTable table, final CassandraClient client, final WindowSegmentPartitioner partitioner, final int kafkaPartition, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowTable.java similarity index 98% rename from kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java rename to kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowTable.java index 3d749ef10..61146a29c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowTable.java @@ -59,9 +59,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class CassandraWindowedTable implements RemoteWindowedTable { +public class CassandraWindowTable implements RemoteWindowTable { - private static final Logger LOG = LoggerFactory.getLogger(CassandraWindowedTable.class); + private static final Logger LOG = LoggerFactory.getLogger(CassandraWindowTable.class); private static final String KEY_FROM_BIND = "kf"; private static final String KEY_TO_BIND = "kt"; @@ -91,7 +91,7 @@ public class CassandraWindowedTable implements RemoteWindowedTable fetch( .setInstant(WINDOW_TO_BIND, Instant.ofEpochMilli(timeTo)); final ResultSet result = client.execute(get); - segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows)); + 
segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows)); } return Iterators.wrapped(segmentIterators); @@ -872,7 +872,7 @@ public KeyValueIterator backFetch( .setInstant(WINDOW_TO_BIND, Instant.ofEpochMilli(timeTo)); final ResultSet result = client.execute(get); - segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows)); + segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows)); } return Iterators.wrapped(segmentIterators); @@ -897,7 +897,7 @@ public KeyValueIterator fetchRange( .setByteBuffer(KEY_TO_BIND, ByteBuffer.wrap(toKey.get())); final ResultSet result = client.execute(get); - segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows)); + segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows)); } return Iterators.filterKv( @@ -925,7 +925,7 @@ public KeyValueIterator backFetchRange( .setByteBuffer(KEY_TO_BIND, ByteBuffer.wrap(toKey.get())); final ResultSet result = client.execute(get); - segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows)); + segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows)); } return Iterators.filterKv( @@ -951,7 +951,7 @@ public KeyValueIterator fetchAll( .setInstant(KEY_TO_BIND, Instant.ofEpochMilli(timeTo)); final ResultSet result = client.execute(get); - segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows)); + segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows)); } return Iterators.filterKv( @@ -977,7 +977,7 @@ public KeyValueIterator backFetchAll( .setInstant(KEY_TO_BIND, Instant.ofEpochMilli(timeTo)); final ResultSet result = client.execute(get); - segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowedTable::windowRows)); + segmentIterators.add(Iterators.kv(result.iterator(), CassandraWindowTable::windowRows)); } return 
Iterators.filterKv( diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoWindowFlushManager.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoWindowFlushManager.java index 6f30dbda4..6848e9edc 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoWindowFlushManager.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/MongoWindowFlushManager.java @@ -16,7 +16,7 @@ import com.mongodb.MongoException; import com.mongodb.bulk.WriteConcernError; import com.mongodb.client.MongoCollection; -import dev.responsive.kafka.internal.db.mongo.MongoWindowedTable; +import dev.responsive.kafka.internal.db.mongo.MongoWindowTable; import dev.responsive.kafka.internal.db.mongo.MongoWriter; import dev.responsive.kafka.internal.db.mongo.WindowDoc; import dev.responsive.kafka.internal.db.partitioning.Segmenter.SegmentPartition; @@ -33,14 +33,14 @@ public class MongoWindowFlushManager extends WindowFlushManager { private final String logPrefix; private final Logger log; - private final MongoWindowedTable table; + private final MongoWindowTable table; private final Function> windowsForSegment; private final WindowSegmentPartitioner partitioner; private final int kafkaPartition; public MongoWindowFlushManager( - final MongoWindowedTable table, + final MongoWindowTable table, final Function> windowsForSegment, final WindowSegmentPartitioner partitioner, final int kafkaPartition, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteWindowedTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteWindowTable.java similarity index 98% rename from kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteWindowedTable.java rename to kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteWindowTable.java index 6eb7b5107..b6c69fcf6 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteWindowedTable.java +++ 
b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteWindowTable.java @@ -16,7 +16,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.state.KeyValueIterator; -public interface RemoteWindowedTable extends RemoteTable { +public interface RemoteWindowTable extends RemoteTable { /** * Initializes the table by setting the metadata fields to diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTable.java similarity index 83% rename from kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTable.java rename to kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTable.java index 7b8f75a54..612c657f3 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTable.java @@ -13,6 +13,8 @@ package dev.responsive.kafka.internal.db.mongo; import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry; +import static dev.responsive.kafka.internal.db.mongo.WindowDoc.ID_SUBFIELD_KEY; +import static dev.responsive.kafka.internal.db.mongo.WindowDoc.ID_SUBFIELD_WINDOW_START; import static dev.responsive.kafka.internal.db.partitioning.Segmenter.UNINITIALIZED_STREAM_TIME; import static dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration.NO_COMMITTED_OFFSET; import static org.bson.codecs.configuration.CodecRegistries.fromProviders; @@ -26,14 +28,16 @@ import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.FindOneAndUpdateOptions; +import com.mongodb.client.model.Indexes; import com.mongodb.client.model.ReturnDocument; import com.mongodb.client.model.UpdateOneModel; import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.model.Updates; import 
com.mongodb.client.model.WriteModel; import com.mongodb.client.result.UpdateResult; +import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.db.MongoWindowFlushManager; -import dev.responsive.kafka.internal.db.RemoteWindowedTable; +import dev.responsive.kafka.internal.db.RemoteWindowTable; import dev.responsive.kafka.internal.db.partitioning.Segmenter; import dev.responsive.kafka.internal.db.partitioning.Segmenter.SegmentPartition; import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner; @@ -58,9 +62,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MongoWindowedTable implements RemoteWindowedTable> { +public class MongoWindowTable implements RemoteWindowTable> { - private static final Logger LOG = LoggerFactory.getLogger(MongoWindowedTable.class); + private static final Logger LOG = LoggerFactory.getLogger(MongoWindowTable.class); private static final String METADATA_COLLECTION_NAME = "window_metadata"; private static final UpdateOptions UPSERT_OPTIONS = new UpdateOptions().upsert(true); @@ -87,6 +91,7 @@ private static class PartitionSegments { private final Segmenter segmenter; private final long epoch; private final CollectionCreationOptions collectionCreationOptions; + private final boolean timestampFirstOrder; // Recommended to keep the total number of collections under 10,000, so we should not // let num_segments * num_kafka_partitions exceed 10k at the most @@ -99,13 +104,15 @@ public PartitionSegments( final int kafkaPartition, final long streamTime, final long epoch, - final CollectionCreationOptions collectionCreationOptions + final CollectionCreationOptions collectionCreationOptions, + final boolean timestampFirstOrder ) { this.database = database; this.adminDatabase = adminDatabase; this.segmenter = segmenter; this.epoch = epoch; this.collectionCreationOptions = collectionCreationOptions; + this.timestampFirstOrder = timestampFirstOrder; this.segmentWindows = new 
ConcurrentHashMap<>(); final List activeSegments = @@ -165,6 +172,17 @@ private void createSegment(final SegmentPartition segmentToCreate) { WindowDoc.class ); } + if (timestampFirstOrder) { + windowDocs.createIndex(Indexes.compoundIndex( + Indexes.ascending(WindowDoc.ID_WINDOW_START_TS), + Indexes.ascending(WindowDoc.ID_RECORD_KEY) + )); + } else { + windowDocs.createIndex(Indexes.compoundIndex( + Indexes.ascending(WindowDoc.ID_RECORD_KEY), + Indexes.ascending(WindowDoc.ID_WINDOW_START_TS) + )); + } segmentWindows.put(segmentToCreate, windowDocs); } @@ -189,7 +207,7 @@ private void deleteSegment(final SegmentPartition segmentToExpire) { } } - public MongoWindowedTable( + public MongoWindowTable( final MongoClient client, final String name, final WindowSegmentPartitioner partitioner, @@ -258,7 +276,8 @@ public MongoWindowFlushManager init( kafkaPartition, metaDoc.streamTime, metaDoc.epoch, - collectionCreationOptions + collectionCreationOptions, + timestampFirstOrder ) ); @@ -522,8 +541,38 @@ public KeyValueIterator fetchRange( final long timeFrom, final long timeTo ) { - throw new UnsupportedOperationException("fetchRange not yet supported for Mongo backends"); + final List> segmentIterators = new LinkedList<>(); + final var partitionSegments = kafkaPartitionToSegments.get(kafkaPartition); + + for (final var segment : partitioner.segmenter().range(kafkaPartition, timeFrom, timeTo)) { + final var segmentWindows = partitionSegments.segmentWindows.get(segment); + if (segmentWindows == null) { + continue; + } + final ArrayList filters = new ArrayList<>(4); + // order matters when filtering on fields in a compound index + if (timestampFirstOrder) { + filters.add(Filters.gte(ID_SUBFIELD_WINDOW_START, timeFrom)); + filters.add(Filters.lte(ID_SUBFIELD_WINDOW_START, timeTo)); + filters.add(Filters.gte(ID_SUBFIELD_KEY, keyCodec.encode(fromKey))); + filters.add(Filters.lte(ID_SUBFIELD_KEY, keyCodec.encode(toKey))); + } else { + filters.add(Filters.gte(ID_SUBFIELD_KEY, 
keyCodec.encode(fromKey))); + filters.add(Filters.lte(ID_SUBFIELD_KEY, keyCodec.encode(toKey))); + filters.add(Filters.gte(ID_SUBFIELD_WINDOW_START, timeFrom)); + filters.add(Filters.lte(ID_SUBFIELD_WINDOW_START, timeTo)); + } + final FindIterable fetchResults = segmentWindows.find(Filters.and(filters)); + + if (retainDuplicates) { + segmentIterators.add( + new DuplicateKeyListValueIterator(fetchResults.iterator(), this::windowedKeyFromDoc)); + } else { + segmentIterators.add(Iterators.kv(fetchResults.iterator(), this::windowFromDoc)); + } + } + return Iterators.wrapped(segmentIterators); } @Override @@ -543,7 +592,37 @@ public KeyValueIterator fetchAll( final long timeFrom, final long timeTo ) { - throw new UnsupportedOperationException("fetchAll not yet supported for Mongo backends"); + final List> segmentIterators = new LinkedList<>(); + final var partitionSegments = kafkaPartitionToSegments.get(kafkaPartition); + + for (final var segment : partitioner.segmenter().range(kafkaPartition, timeFrom, timeTo)) { + final var segmentWindows = partitionSegments.segmentWindows.get(segment); + if (segmentWindows == null) { + continue; + } + final ArrayList filters = new ArrayList<>(2); + filters.add(Filters.gte(ID_SUBFIELD_WINDOW_START, timeFrom)); + filters.add(Filters.lte(ID_SUBFIELD_WINDOW_START, timeTo)); + + if (!timestampFirstOrder) { + LOG.warn("WindowStore#fetchAll should be used with caution as a full table scan is " + + "required for range queries with no key bound when the key is indexed " + + "first. 
If your application makes heavy use of this API, consider " + "setting " + ResponsiveConfig.MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_CONFIG + " to 'true' for better performance of #fetchAll at the cost of worse " + "performance of the #fetch(key, timeFrom, timeTo) API."); + } + + final FindIterable fetchResults = segmentWindows.find(Filters.and(filters)); + + if (retainDuplicates) { + segmentIterators.add( + new DuplicateKeyListValueIterator(fetchResults.iterator(), this::windowedKeyFromDoc)); + } else { + segmentIterators.add(Iterators.kv(fetchResults.iterator(), this::windowFromDoc)); + } + } + return Iterators.wrapped(segmentIterators); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java index 6491f8123..f365c312a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/ResponsiveMongoClient.java @@ -16,7 +16,7 @@ import com.mongodb.client.model.WriteModel; import dev.responsive.kafka.internal.db.RemoteKVTable; import dev.responsive.kafka.internal.db.RemoteSessionTable; -import dev.responsive.kafka.internal.db.RemoteWindowedTable; +import dev.responsive.kafka.internal.db.RemoteWindowTable; import dev.responsive.kafka.internal.db.SessionTableCache; import dev.responsive.kafka.internal.db.TableCache; import dev.responsive.kafka.internal.db.WindowedTableCache; @@ -31,7 +31,7 @@ public class ResponsiveMongoClient { private final TableCache kvTableCache; - private final WindowedTableCache windowTableCache; + private final WindowedTableCache windowTableCache; private final SessionTableCache sessionTableCache; private final MongoClient client; @@ -49,7 +49,7 @@ public ResponsiveMongoClient( spec.ttlResolver() )); windowTableCache = new WindowedTableCache<>( - (spec, partitioner) -> new 
MongoWindowedTable( + (spec, partitioner) -> new MongoWindowTable( client, spec.tableName(), partitioner, @@ -76,7 +76,7 @@ public RemoteKVTable> kvTable( ); } - public RemoteWindowedTable> windowedTable( + public RemoteWindowTable> windowedTable( final String name, final WindowSegmentPartitioner partitioner ) throws InterruptedException, TimeoutException { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/WindowDoc.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/WindowDoc.java index 99a31f9f6..1fdf7a40d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/WindowDoc.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/WindowDoc.java @@ -30,6 +30,10 @@ public class WindowDoc { public static final String ID_RECORD_KEY = "key"; public static final String ID_WINDOW_START_TS = "windowStartTs"; + // combined name of id and subfield, for use in query filters + public static final String ID_SUBFIELD_KEY = String.join(".", ID, ID_RECORD_KEY); + public static final String ID_SUBFIELD_WINDOW_START = String.join(".", ID, ID_WINDOW_START_TS); + BasicDBObject id; byte[] value; List values; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java index 54acd55ff..2fdc984f5 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java @@ -24,7 +24,7 @@ import dev.responsive.kafka.internal.db.BatchFlusher; import dev.responsive.kafka.internal.db.CassandraClient; import dev.responsive.kafka.internal.db.RemoteTableSpecFactory; -import dev.responsive.kafka.internal.db.RemoteWindowedTable; +import dev.responsive.kafka.internal.db.RemoteWindowTable; import dev.responsive.kafka.internal.db.WindowFlushManager; import 
dev.responsive.kafka.internal.db.WindowedKeySpec; import dev.responsive.kafka.internal.db.mongo.ResponsiveMongoClient; @@ -56,7 +56,7 @@ public class SegmentedOperations implements WindowOperations { @SuppressWarnings("rawtypes") private final InternalProcessorContext context; private final ResponsiveWindowParams params; - private final RemoteWindowedTable table; + private final RemoteWindowTable table; private final CommitBuffer buffer; private final TopicPartition changelog; @@ -94,7 +94,7 @@ public static SegmentedOperations create( params.retainDuplicates() ); - final RemoteWindowedTable table; + final RemoteWindowTable table; switch (sessionClients.storageBackend()) { case CASSANDRA: table = createCassandra(params, sessionClients, partitioner); @@ -165,7 +165,7 @@ public static SegmentedOperations create( } } - private static RemoteWindowedTable createCassandra( + private static RemoteWindowTable createCassandra( final ResponsiveWindowParams params, final SessionClients clients, final WindowSegmentPartitioner partitioner @@ -184,7 +184,7 @@ private static RemoteWindowedTable createCassandra( } } - private static RemoteWindowedTable createMongo( + private static RemoteWindowTable createMongo( final ResponsiveWindowParams params, final SessionClients clients, final WindowSegmentPartitioner partitioner @@ -205,7 +205,7 @@ private static RemoteWindowedTable createMongo( public SegmentedOperations( final InternalProcessorContext context, final ResponsiveWindowParams params, - final RemoteWindowedTable table, + final RemoteWindowTable table, final CommitBuffer buffer, final TopicPartition changelog, final ResponsiveStoreRegistry storeRegistry, diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTableTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTableTest.java index 6da8293fd..2fa9ba327 100644 --- 
a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTableTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowTableTest.java @@ -13,19 +13,30 @@ package dev.responsive.kafka.internal.db.mongo; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG; +import static dev.responsive.kafka.testutils.Matchers.sameKeyValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; import com.mongodb.client.MongoClient; +import com.mongodb.client.model.Filters; import dev.responsive.kafka.api.config.StorageBackend; +import dev.responsive.kafka.internal.db.MongoWindowFlushManager; +import dev.responsive.kafka.internal.db.partitioning.Segmenter; import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner; import dev.responsive.kafka.internal.utils.SessionUtil; import dev.responsive.kafka.internal.utils.WindowedKey; import dev.responsive.kafka.testutils.ResponsiveConfigParam; import dev.responsive.kafka.testutils.ResponsiveExtension; import java.util.ArrayList; +import java.util.Arrays; import java.util.Map; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -33,17 +44,19 @@ import org.junit.jupiter.api.extension.RegisterExtension; class MongoWindowTableTest { - @RegisterExtension public static final ResponsiveExtension EXT = new ResponsiveExtension(StorageBackend.MONGO_DB); private static final CollectionCreationOptions UNSHARDED = new CollectionCreationOptions( false, 0 ); - private static final byte[] DEFAULT_VALUE = new byte[] {1}; private String name; private MongoClient client; + private 
WindowSegmentPartitioner partitioner; + private Segmenter.SegmentPartition segment; + private MongoWindowTable table; + private MongoWindowFlushManager flushManager; @BeforeEach public void before( @@ -54,6 +67,14 @@ public void before( final String mongoConnection = (String) props.get(MONGO_ENDPOINT_CONFIG); client = SessionUtil.connect(mongoConnection, null, null, "", null); + + partitioner = new WindowSegmentPartitioner(10_000L, 1_000L, false); + segment = partitioner.segmenter().activeSegments(0, 100).get(0); + + table = new MongoWindowTable(client, name, partitioner, true, UNSHARDED); + flushManager = table.init(0); + flushManager.updateOffsetAndStreamTime(0, 100); + flushManager.createSegment(segment); } @Test @@ -63,8 +84,8 @@ public void shouldSucceedSimpleSetGet() { new WindowSegmentPartitioner(10_000L, 1_000L, false); final var segment = partitioner.segmenter().activeSegments(0, 100).get(0); - final MongoWindowedTable table = - new MongoWindowedTable(client, name, partitioner, false, UNSHARDED); + final MongoWindowTable table = + new MongoWindowTable(client, name, partitioner, false, UNSHARDED); final var flushManager = table.init(0); flushManager.updateOffsetAndStreamTime(0, 100); @@ -72,17 +93,18 @@ public void shouldSucceedSimpleSetGet() { // When: final var byteKey = Bytes.wrap("key".getBytes()); + final byte[] valueByte = new byte[] {1}; var writer = flushManager.createWriter(segment); writer.insert( new WindowedKey(byteKey, 0), - DEFAULT_VALUE, + valueByte, table.localEpoch(0) ); writer.flush(); // Then: var value = table.fetch(0, byteKey, 0); - assertThat(value, Matchers.equalTo(DEFAULT_VALUE)); + assertThat(value, Matchers.equalTo(valueByte)); value = table.fetch(0, byteKey, 100); assertThat(value, Matchers.nullValue()); value = table.fetch(0, Bytes.wrap("other".getBytes()), 0); @@ -96,25 +118,26 @@ public void shouldSucceedRangeSetGet() { new WindowSegmentPartitioner(10_000L, 1_000L, false); final var segment = 
partitioner.segmenter().activeSegments(0, 100).get(0); - final MongoWindowedTable table = - new MongoWindowedTable(client, name, partitioner, false, UNSHARDED); + final MongoWindowTable table = + new MongoWindowTable(client, name, partitioner, false, UNSHARDED); final var flushManager = table.init(0); flushManager.updateOffsetAndStreamTime(0, 6_000); flushManager.createSegment(segment); // When: final var byteKey = Bytes.wrap("key".getBytes()); + final byte[] byteValue = new byte[] {1}; final var windowedKey1 = new WindowedKey(byteKey, 500); final var windowedKey2 = new WindowedKey(byteKey, 5_000); var writer = flushManager.createWriter(segment); writer.insert( windowedKey1, - DEFAULT_VALUE, + byteValue, table.localEpoch(0) ); writer.insert( windowedKey2, - DEFAULT_VALUE, + byteValue, table.localEpoch(0) ); writer.flush(); @@ -127,10 +150,252 @@ public void shouldSucceedRangeSetGet() { assertThat(kvs, Matchers.hasSize(2)); assertThat(kvs.get(0).key.key, Matchers.equalTo(windowedKey1.key)); assertThat(kvs.get(0).key.windowStartMs, Matchers.equalTo(windowedKey1.windowStartMs)); - assertThat(kvs.get(0).value, Matchers.equalTo(DEFAULT_VALUE)); + assertThat(kvs.get(0).value, Matchers.equalTo(byteValue)); assertThat(kvs.get(1).key.key, Matchers.equalTo(windowedKey2.key)); assertThat(kvs.get(1).key.windowStartMs, Matchers.equalTo(windowedKey2.windowStartMs)); - assertThat(kvs.get(1).value, Matchers.equalTo(DEFAULT_VALUE)); + assertThat(kvs.get(1).value, Matchers.equalTo(byteValue)); + } + + @Test + public void shouldFetchInserts() { + // given: + final var writer = flushManager.createWriter(segment); + final Bytes key = Bytes.wrap("key".getBytes()); + writer.insert(new WindowedKey(key, 100), "val".getBytes(), 110); + writer.flush(); + + // when: + final var val = table.fetch(0, Bytes.wrap("key".getBytes()), 100); + + // then: + assertThat(val, equalsByteArray("val".getBytes())); + } + + private void shouldReturnWindowsForFetch() { + // given: + final var writer = 
flushManager.createWriter(segment); + final Bytes key = Bytes.wrap("key".getBytes()); + final Bytes keyOther = Bytes.wrap("keyOther".getBytes()); + writer.insert(new WindowedKey(key, 99), "valEarly".getBytes(), 110); + writer.insert(new WindowedKey(key, 100), "val".getBytes(), 110); + writer.insert(new WindowedKey(keyOther, 105), "valOther".getBytes(), 110); + writer.insert(new WindowedKey(key, 200), "val2".getBytes(), 210); + writer.insert(new WindowedKey(key, 201), "valLate".getBytes(), 210); + writer.flush(); + + // when: + final var iterator = table.fetch(0, key, 100, 200); + + // then: + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(key, 100), "val".getBytes())) + ); + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(key, 200), "val2".getBytes())) + ); + assertThat(iterator.hasNext(), is(false)); + iterator.close(); + } + + @Test + public void shouldFetchWhenTimestampFirstEnabled() { + assertThat(table.isTimestampFirstOrder(), is(true)); + shouldReturnWindowsForFetch(); + } + + @Test + public void shouldFetchWhenTimestampFirstDisabled() { + table = new MongoWindowTable(client, name, partitioner, false, UNSHARDED); + flushManager = table.init(0); + flushManager.updateOffsetAndStreamTime(0, 100); + flushManager.createSegment(segment); + + shouldReturnWindowsForFetch(); + } + + private void shouldReturnWindowsForFetchRange() { + // given: + final var writer = flushManager.createWriter(segment); + final Bytes key = Bytes.wrap("key".getBytes()); // key out of range + final Bytes keyA = Bytes.wrap("keyA".getBytes()); + final Bytes keyAA = Bytes.wrap("keyAA".getBytes()); + final Bytes keyAB = Bytes.wrap("keyAB".getBytes()); + final Bytes keyB = Bytes.wrap("keyB".getBytes()); + final Bytes keyBA = Bytes.wrap("keyBA".getBytes()); // key out of range + + writer.insert(new WindowedKey(key, 100), "val".getBytes(), 110); // key 
out of range + writer.insert(new WindowedKey(keyA, 99), "valAEarly".getBytes(), 110); // time out of range + writer.insert(new WindowedKey(keyA, 100), "valA".getBytes(), 110); + writer.insert(new WindowedKey(keyAA, 99), "valAAEarly".getBytes(), 110); // time out of range + writer.insert(new WindowedKey(keyAA, 100), "valAA".getBytes(), 110); + writer.insert(new WindowedKey(keyAB, 105), "valAB".getBytes(), 110); + writer.insert(new WindowedKey(keyB, 200), "valB".getBytes(), 210); + writer.insert(new WindowedKey(keyB, 201), "valBLate".getBytes(), 210); // time out of range + writer.insert(new WindowedKey(keyBA, 200), "valBA".getBytes(), 210); // key out of range + writer.flush(); + + // when: + final var iterator = table.fetchRange(0, keyA, keyB, 100, 200); + + // then: + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(keyA, 100), "valA".getBytes())) + ); + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(keyAA, 100), "valAA".getBytes())) + ); + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(keyAB, 105), "valAB".getBytes())) + ); + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(keyB, 200), "valB".getBytes())) + ); + assertThat(iterator.hasNext(), is(false)); + iterator.close(); + } + + @Test + public void shouldFetchRangeWhenTimestampFirstEnabled() { + assertThat(table.isTimestampFirstOrder(), is(true)); + shouldReturnWindowsForFetchRange(); + } + + @Test + public void shouldFetchRangeWhenTimestampFirstDisabled() { + table = new MongoWindowTable(client, name, partitioner, false, UNSHARDED); + flushManager = table.init(0); + flushManager.updateOffsetAndStreamTime(0, 100); + flushManager.createSegment(segment); + + shouldReturnWindowsForFetchRange(); + } + + private void 
shouldReturnWindowsForFetchAll() { + // given: + final var writer = flushManager.createWriter(segment); + final Bytes key = Bytes.wrap("key".getBytes()); + final Bytes keyA = Bytes.wrap("keyA".getBytes()); + final Bytes keyAA = Bytes.wrap("keyAA".getBytes()); + final Bytes keyAB = Bytes.wrap("keyAB".getBytes()); + final Bytes keyB = Bytes.wrap("keyB".getBytes()); + final Bytes keyBA = Bytes.wrap("keyBA".getBytes()); + + writer.insert(new WindowedKey(key, 100), "val".getBytes(), 110); + writer.insert(new WindowedKey(keyA, 99), "valAEarly".getBytes(), 110); // time out of range + writer.insert(new WindowedKey(keyA, 100), "valA".getBytes(), 110); + writer.insert(new WindowedKey(keyAA, 99), "valAAEarly".getBytes(), 110); // time out of range + writer.insert(new WindowedKey(keyAA, 100), "valAA".getBytes(), 110); + writer.insert(new WindowedKey(keyAB, 105), "valAB".getBytes(), 110); + writer.insert(new WindowedKey(keyB, 200), "valB".getBytes(), 210); + writer.insert(new WindowedKey(keyB, 201), "valBLate".getBytes(), 210); // time out of range + writer.insert(new WindowedKey(keyBA, 200), "valBA".getBytes(), 210); + writer.flush(); + + // when: + final var iterator = table.fetchAll(0, 100, 200); + + // then: + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(key, 100), "val".getBytes())) + ); + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(keyA, 100), "valA".getBytes())) + ); + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(keyAA, 100), "valAA".getBytes())) + ); + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(keyAB, 105), "valAB".getBytes())) + ); + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(keyB, 200), 
"valB".getBytes())) + ); + assertThat(iterator.hasNext(), is(true)); + assertThat( + iterator.next(), + sameKeyValue(new KeyValue<>(new WindowedKey(keyBA, 200), "valBA".getBytes())) + ); + assertThat(iterator.hasNext(), is(false)); + iterator.close(); + } + + @Test + public void shouldFetchAllWhenTimestampFirstEnabled() { + assertThat(table.isTimestampFirstOrder(), is(true)); + shouldReturnWindowsForFetchAll(); + } + + @Test + public void shouldFetchAllWhenTimestampFirstDisabled() { + table = new MongoWindowTable(client, name, partitioner, false, UNSHARDED); + flushManager = table.init(0); + flushManager.updateOffsetAndStreamTime(0, 100); + flushManager.createSegment(segment); + + shouldReturnWindowsForFetchAll(); + } + + @Test + public void shouldEncodeKeyAsBase64String() { + // given: + final var writer = flushManager.createWriter(segment); + final Bytes key = Bytes.wrap("key".getBytes()); + writer.insert(new WindowedKey(key, 100), "val".getBytes(), 110); + writer.flush(); + final var collection = table.windowsForSegmentPartition(0, segment); + + // when: + final var result = collection.find(Filters.eq( + String.join(".", WindowDoc.ID, WindowDoc.ID_RECORD_KEY), + new StringKeyCodec().encode(key) + )).first(); + + // then: + assertThat(result, not(nullValue())); + } + + private static Matcher equalsByteArray(final byte[] expected) { + return new EqualByteArray(expected); + } + + private static class EqualByteArray extends BaseMatcher { + private final byte[] expected; + + public EqualByteArray(byte[] expected) { + this.expected = expected; + } + + @Override + public boolean matches(Object actual) { + return actual instanceof byte[] && Arrays.equals(expected, (byte[]) actual); + } + + @Override + public void describeTo(Description description) { + description.appendValue(expected); + } } } \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTableTest.java 
b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTableTest.java deleted file mode 100644 index b05effbef..000000000 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTableTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Copyright 2024 Responsive Computing, Inc. - * - * This source code is licensed under the Responsive Business Source License Agreement v1.0 - * available at: - * - * https://www.responsive.dev/legal/responsive-bsl-10 - * - * This software requires a valid Commercial License Key for production use. Trial and commercial - * licenses can be obtained at https://www.responsive.dev - */ - -package dev.responsive.kafka.internal.db.mongo; - -import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG; -import static dev.responsive.kafka.testutils.Matchers.sameKeyValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; - -import com.mongodb.client.MongoClient; -import com.mongodb.client.model.Filters; -import dev.responsive.kafka.api.config.StorageBackend; -import dev.responsive.kafka.internal.db.MongoWindowFlushManager; -import dev.responsive.kafka.internal.db.partitioning.Segmenter; -import dev.responsive.kafka.internal.db.partitioning.WindowSegmentPartitioner; -import dev.responsive.kafka.internal.utils.SessionUtil; -import dev.responsive.kafka.internal.utils.WindowedKey; -import dev.responsive.kafka.testutils.ResponsiveConfigParam; -import dev.responsive.kafka.testutils.ResponsiveExtension; -import java.util.Arrays; -import java.util.Map; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KeyValue; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import 
org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.RegisterExtension; - -class MongoWindowedTableTest { - @RegisterExtension - public static final ResponsiveExtension EXT = new ResponsiveExtension(StorageBackend.MONGO_DB); - private static final CollectionCreationOptions UNSHARDED = new CollectionCreationOptions( - false, - 0 - ); - - private String name; - private MongoClient client; - private WindowSegmentPartitioner partitioner; - private Segmenter.SegmentPartition segment; - private MongoWindowedTable table; - private MongoWindowFlushManager flushManager; - - @BeforeEach - public void before( - final TestInfo info, - @ResponsiveConfigParam final Map props - ) { - name = info.getDisplayName().replace("()", ""); - - final String mongoConnection = (String) props.get(MONGO_ENDPOINT_CONFIG); - client = SessionUtil.connect(mongoConnection, null, null, "", null); - - partitioner = new WindowSegmentPartitioner(10_000L, 1_000L, false); - segment = partitioner.segmenter().activeSegments(0, 100).get(0); - - table = new MongoWindowedTable(client, name, partitioner, true, UNSHARDED); - flushManager = table.init(0); - flushManager.updateOffsetAndStreamTime(0, 100); - flushManager.createSegment(segment); - } - - @Test - public void shouldFetchInserts() { - // given: - final var writer = flushManager.createWriter(segment); - final Bytes key = Bytes.wrap("key".getBytes()); - writer.insert(new WindowedKey(key, 100), "val".getBytes(), 110); - writer.flush(); - - // when: - final var val = table.fetch(0, Bytes.wrap("key".getBytes()), 100); - - // then: - assertThat(val, equalsByteArray("val".getBytes())); - } - - private void shouldFetchWindowsInRange() { - // given: - final var writer = flushManager.createWriter(segment); - final Bytes key = Bytes.wrap("key".getBytes()); - final Bytes keyOther = Bytes.wrap("keyOther".getBytes()); - writer.insert(new WindowedKey(key, 99), "valEarly".getBytes(), 110); - writer.insert(new WindowedKey(key, 100), "val".getBytes(), 
110); - writer.insert(new WindowedKey(keyOther, 105), "valOther".getBytes(), 110); - writer.insert(new WindowedKey(key, 200), "val2".getBytes(), 210); - writer.insert(new WindowedKey(key, 201), "valLate".getBytes(), 210); - writer.flush(); - - // when: - final var iterator = table.fetch(0, key, 100, 200); - - // then: - assertThat(iterator.hasNext(), is(true)); - assertThat( - iterator.next(), - sameKeyValue(new KeyValue<>(new WindowedKey(key, 100), "val".getBytes())) - ); - assertThat(iterator.hasNext(), is(true)); - assertThat( - iterator.next(), - sameKeyValue(new KeyValue<>(new WindowedKey(key, 200), "val2".getBytes())) - ); - assertThat(iterator.hasNext(), is(false)); - iterator.close(); - } - - @Test - public void shouldFetchWindowsInRangeWhenTimestampFirstEnabled() { - assertThat(table.isTimestampFirstOrder(), is(true)); - shouldFetchWindowsInRange(); - } - - @Test - public void shouldFetchWindowsInRangeWhenTimestampFirstDisabled() { - table = new MongoWindowedTable(client, name, partitioner, false, UNSHARDED); - flushManager = table.init(0); - flushManager.updateOffsetAndStreamTime(0, 100); - flushManager.createSegment(segment); - - shouldFetchWindowsInRange(); - } - - @Test - public void shouldEncodeKeyAsBase64String() { - // given: - final var writer = flushManager.createWriter(segment); - final Bytes key = Bytes.wrap("key".getBytes()); - writer.insert(new WindowedKey(key, 100), "val".getBytes(), 110); - writer.flush(); - final var collection = table.windowsForSegmentPartition(0, segment); - - // when: - final var result = collection.find(Filters.eq( - String.join(".", WindowDoc.ID, WindowDoc.ID_RECORD_KEY), - new StringKeyCodec().encode(key) - )).first(); - - // then: - assertThat(result, not(nullValue())); - } - - private static Matcher equalsByteArray(final byte[] expected) { - return new EqualByteArray(expected); - } - - private static class EqualByteArray extends BaseMatcher { - private final byte[] expected; - - public EqualByteArray(byte[] 
expected) { - this.expected = expected; - } - - @Override - public boolean matches(Object actual) { - return actual instanceof byte[] && Arrays.equals(expected, (byte[]) actual); - } - - @Override - public void describeTo(Description description) { - description.appendValue(expected); - } - } -} \ No newline at end of file diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java index 1bbd15dc7..57a46328b 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java @@ -23,9 +23,9 @@ import dev.responsive.kafka.internal.db.CassandraClient; import dev.responsive.kafka.internal.db.QueryOp; import dev.responsive.kafka.internal.db.RemoteKVTable; -import dev.responsive.kafka.internal.db.RemoteWindowedTable; +import dev.responsive.kafka.internal.db.RemoteWindowTable; import dev.responsive.kafka.internal.db.TTDKeyValueTable; -import dev.responsive.kafka.internal.db.TTDWindowedTable; +import dev.responsive.kafka.internal.db.TTDWindowTable; import dev.responsive.kafka.internal.db.TableCache; import dev.responsive.kafka.internal.db.WindowedTableCache; import dev.responsive.kafka.internal.db.inmemory.InMemoryKVTable; @@ -42,7 +42,7 @@ public class TTDCassandraClient extends CassandraClient { private final Time time; private final TableCache> kvFactory; - private final WindowedTableCache> windowedFactory; + private final WindowedTableCache> windowedFactory; public TTDCassandraClient(final TTDMockAdmin admin, final Time time) { super(loggedConfig(admin.props())); @@ -50,10 +50,8 @@ public TTDCassandraClient(final TTDMockAdmin admin, final Time time) { this.time = time; kvFactory = new TableCache<>(spec -> new TTDKeyValueTable(spec, this)); - windowedFactory = new 
WindowedTableCache<>((spec, partitioner) -> TTDWindowedTable.create(spec, - this, - partitioner - )); + windowedFactory = new WindowedTableCache<>( + (spec, partitioner) -> TTDWindowTable.create(spec, this, partitioner)); } public ResponsiveStoreRegistry storeRegistry() { @@ -107,7 +105,7 @@ public RemoteMonitor awaitTable( @Override public long count(final String tableName, final int tablePartition) { final var kv = (InMemoryKVTable) kvFactory.getTable(tableName); - final var window = (TTDWindowedTable) windowedFactory.getTable(tableName); + final var window = (TTDWindowTable) windowedFactory.getTable(tableName); return (kv == null ? 0 : kv.approximateNumEntries(tablePartition)) + (window == null ? 0 : window.count()); } @@ -128,7 +126,7 @@ public TableCache> factFactory() { } @Override - public WindowedTableCache> windowedFactory() { + public WindowedTableCache> windowedFactory() { return windowedFactory; } } \ No newline at end of file diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowedTable.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java similarity index 94% rename from responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowedTable.java rename to responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java index 913ae13f2..00832ff86 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowedTable.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java @@ -24,22 +24,22 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.state.KeyValueIterator; -public class TTDWindowedTable extends TTDTable - implements RemoteWindowedTable { +public class TTDWindowTable extends TTDTable + implements RemoteWindowTable { private final String name; private final WindowStoreStub stub; private final WindowSegmentPartitioner partitioner; - 
public static TTDWindowedTable create( + public static TTDWindowTable create( final RemoteTableSpec spec, final CassandraClient client, final WindowSegmentPartitioner partitioner ) { - return new TTDWindowedTable(spec, (TTDCassandraClient) client, partitioner); + return new TTDWindowTable(spec, (TTDCassandraClient) client, partitioner); } - public TTDWindowedTable( + public TTDWindowTable( final RemoteTableSpec spec, final TTDCassandraClient client, WindowSegmentPartitioner partitioner @@ -157,11 +157,11 @@ public long count() { private static class TTDWindowFlushManager extends WindowFlushManager { private final String logPrefix; - private final TTDWindowedTable table; + private final TTDWindowTable table; private final WindowSegmentPartitioner partitioner; public TTDWindowFlushManager( - final TTDWindowedTable table, + final TTDWindowTable table, final int kafkaPartition, final WindowSegmentPartitioner partitioner ) {