diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/FactWriterFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/FactWriterFactory.java index 148275b02..e935b3daf 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/FactWriterFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/FactWriterFactory.java @@ -30,8 +30,7 @@ public FactWriterFactory(final RemoteTable table) { @Override public RemoteWriter createWriter( final SessionClients client, - final int partition, - final int batchSize + final int partition ) { return new FactSchemaWriter<>( client.cassandraClient(), diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriter.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriter.java index c441800c8..c676d7813 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriter.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriter.java @@ -16,11 +16,14 @@ package dev.responsive.kafka.internal.db; +import static dev.responsive.kafka.internal.stores.CommitBuffer.MAX_BATCH_SIZE; + import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.BatchableStatement; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import dev.responsive.kafka.internal.stores.RemoteWriteResult; import java.util.ArrayList; import java.util.List; @@ -34,22 +37,38 @@ public class LwtWriter implements RemoteWriter { private final Supplier> fencingStatementFactory; private final RemoteTable table; private final int partition; - private final int batchSize; + private final long maxBatchSize; private final List> statements; public LwtWriter( + final CassandraClient client, + final Supplier> fencingStatementFactory, + final RemoteTable table, + final int partition + ) { + this( + client, + fencingStatementFactory, + table, + partition, + MAX_BATCH_SIZE + ); + } + + @VisibleForTesting + LwtWriter( final CassandraClient client, final Supplier> fencingStatementFactory, final RemoteTable table, final int partition, - final int batchSize + final long maxBatchSize ) { this.client = client; this.fencingStatementFactory = fencingStatementFactory; this.table = table; this.partition = partition; - this.batchSize = batchSize; + this.maxBatchSize = maxBatchSize; statements = new ArrayList<>(); } @@ -74,7 +93,7 @@ public CompletionStage flush() { builder.setIdempotence(true); builder.addStatement(fencingStatementFactory.get()); - for (int i = 0; i < batchSize && it.hasNext(); i++) { + for (int i = 0; i < maxBatchSize && it.hasNext(); i++) { builder.addStatement(it.next()); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriterFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriterFactory.java index d3900d20a..e5a5ed3e6 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriterFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriterFactory.java @@ -136,15 +136,13 @@ public LwtWriterFactory( @Override public RemoteWriter createWriter( final SessionClients client, - final int partition, - final int batchSize + final int partition ) { return new LwtWriter<>( client.cassandraClient(), () -> ensureEpoch.bind().setInt(PARTITION_KEY.bind(), partition), table, - partition, - batchSize + partition ); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/WriterFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/WriterFactory.java index 9e53d0b16..9bc3235f9 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/WriterFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/WriterFactory.java @@ -22,8 +22,7 @@ public interface WriterFactory { RemoteWriter createWriter( final SessionClients client, - final int partition, - final int batchSize + final int partition ); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWriterFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWriterFactory.java index 14c317084..36c32ff1c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWriterFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWriterFactory.java @@ -40,8 +40,7 @@ public MongoWriterFactory( @Override public RemoteWriter createWriter( final SessionClients client, - final int partition, - final int batchSize + final int partition ) { return new MongoWriter<>(table, partition, genericCollection); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java index 6e23019fa..bb06c7dd7 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java @@ -85,7 +85,7 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.slf4j.Logger; -class CommitBuffer> +public class CommitBuffer> implements RecordBatchingStateRestoreCallback, Closeable { public static final int MAX_BATCH_SIZE = 1000; @@ -488,8 +488,7 @@ private void doFlush(final long consumedOffset, final int batchSize) { final RemoteWriter writer = writers .computeIfAbsent(subPartition, k -> writerFactory.createWriter( sessionClients, - subPartition, - batchSize + subPartition )); if (result.isTombstone) { @@ -509,7 +508,7 @@ private void doFlush(final long consumedOffset, final int batchSize) { // the first subpartition final var offsetWriteResult = writers.computeIfAbsent( subPartitioner.first(changelog.partition()), - subPartition -> writerFactory.createWriter(sessionClients, subPartition, batchSize) + subPartition -> writerFactory.createWriter(sessionClients, subPartition) ).setOffset(consumedOffset); if (!offsetWriteResult.wasApplied()) { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/FactSchemaWriterTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/FactSchemaWriterTest.java similarity index 83% rename from kafka-client/src/test/java/dev/responsive/kafka/internal/stores/FactSchemaWriterTest.java rename to kafka-client/src/test/java/dev/responsive/kafka/internal/db/FactSchemaWriterTest.java index e0611bfdf..c5493e91a 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/FactSchemaWriterTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/FactSchemaWriterTest.java @@ -1,20 +1,22 @@ /* - * Copyright 2023 Responsive Computing, Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * * Copyright 2023 Responsive Computing, Inc. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. */ -package dev.responsive.kafka.internal.stores; +package dev.responsive.kafka.internal.db; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/LwtWriterTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/LwtWriterTest.java similarity index 84% rename from kafka-client/src/test/java/dev/responsive/kafka/internal/stores/LwtWriterTest.java rename to kafka-client/src/test/java/dev/responsive/kafka/internal/db/LwtWriterTest.java index 1ce600545..21e514769 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/LwtWriterTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/LwtWriterTest.java @@ -1,20 +1,22 @@ /* - * Copyright 2023 Responsive Computing, Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * * Copyright 2023 Responsive Computing, Inc. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. */ -package dev.responsive.kafka.internal.stores; +package dev.responsive.kafka.internal.db; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -28,10 +30,6 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.internal.core.cql.DefaultBatchStatement; -import dev.responsive.kafka.internal.db.CassandraClient; -import dev.responsive.kafka.internal.db.CassandraKeyValueTable; -import dev.responsive.kafka.internal.db.LwtWriter; -import dev.responsive.kafka.internal.db.WriterFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -51,9 +49,7 @@ class LwtWriterTest { private static final long CURRENT_TS = 100L; - - @Mock - private WriterFactory writerFactory; + @Mock private CassandraClient client; @Mock diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDTable.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDTable.java index 777aeab7d..4b86cae76 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDTable.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDTable.java @@ -48,7 +48,7 @@ public WriterFactory init( final SubPartitioner partitioner, final int kafkaPartition ) { - return (client, partition, batchSize) -> new TTDWriter<>(this, partition); + return (client, partition) -> new TTDWriter<>(this, partition); } @Override