From 26642f6dd09d49adf90051527a68ed049319ac38 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Sat, 21 Oct 2023 22:50:12 -0700 Subject: [PATCH] Remove batchSize parameter from writers (#179) Relatively small refactor to clear up how batches are handled/sized between the CommitBuffer and RemoteWriters. It seems that the batchSize we pass in is always either the MAX_BATCH_SIZE constant or, in the case of restoration only, the size of the final batch that contains whatever records are remaining up to the MAX_BATCH_SIZE. In other words, the batch size is always capped at MAX_BATCH_SIZE and, furthermore, the only time the batchSize parameter is different from the hard-coded constant is when there are fewer updates in the batch and thus the batch will be naturally capped at this smaller size since we will run out of inserts (or deletes) before hitting the batchSize condition. Also, only the LWT writer seems to be using this at the moment, so it feels silly for us to pass this in to every other writer as we do right now. 
In addition to just being easier to reason about and removing unnecessary complexity from the code, this is a necessary cleanup/precursor for the window store implementation. --- .../kafka/internal/db/FactWriterFactory.java | 3 +- .../kafka/internal/db/LwtWriter.java | 27 ++++++++++++--- .../kafka/internal/db/LwtWriterFactory.java | 6 ++-- .../kafka/internal/db/WriterFactory.java | 3 +- .../internal/db/mongo/MongoWriterFactory.java | 3 +- .../kafka/internal/stores/CommitBuffer.java | 7 ++-- .../{stores => db}/FactSchemaWriterTest.java | 26 +++++++------- .../{stores => db}/LwtWriterTest.java | 34 ++++++++----------- .../kafka/internal/stores/TTDTable.java | 2 +- 9 files changed, 61 insertions(+), 50 deletions(-) rename kafka-client/src/test/java/dev/responsive/kafka/internal/{stores => db}/FactSchemaWriterTest.java (83%) rename kafka-client/src/test/java/dev/responsive/kafka/internal/{stores => db}/LwtWriterTest.java (84%) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/FactWriterFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/FactWriterFactory.java index 148275b02..e935b3daf 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/FactWriterFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/FactWriterFactory.java @@ -30,8 +30,7 @@ public FactWriterFactory(final RemoteTable table) { @Override public RemoteWriter createWriter( final SessionClients client, - final int partition, - final int batchSize + final int partition ) { return new FactSchemaWriter<>( client.cassandraClient(), diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriter.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriter.java index c441800c8..c676d7813 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriter.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriter.java @@ -16,11 +16,14 @@ package 
dev.responsive.kafka.internal.db; +import static dev.responsive.kafka.internal.stores.CommitBuffer.MAX_BATCH_SIZE; + import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.BatchableStatement; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import dev.responsive.kafka.internal.stores.RemoteWriteResult; import java.util.ArrayList; import java.util.List; @@ -34,22 +37,38 @@ public class LwtWriter implements RemoteWriter { private final Supplier> fencingStatementFactory; private final RemoteTable table; private final int partition; - private final int batchSize; + private final long maxBatchSize; private final List> statements; public LwtWriter( + final CassandraClient client, + final Supplier> fencingStatementFactory, + final RemoteTable table, + final int partition + ) { + this( + client, + fencingStatementFactory, + table, + partition, + MAX_BATCH_SIZE + ); + } + + @VisibleForTesting + LwtWriter( final CassandraClient client, final Supplier> fencingStatementFactory, final RemoteTable table, final int partition, - final int batchSize + final long maxBatchSize ) { this.client = client; this.fencingStatementFactory = fencingStatementFactory; this.table = table; this.partition = partition; - this.batchSize = batchSize; + this.maxBatchSize = maxBatchSize; statements = new ArrayList<>(); } @@ -74,7 +93,7 @@ public CompletionStage flush() { builder.setIdempotence(true); builder.addStatement(fencingStatementFactory.get()); - for (int i = 0; i < batchSize && it.hasNext(); i++) { + for (int i = 0; i < maxBatchSize && it.hasNext(); i++) { builder.addStatement(it.next()); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriterFactory.java 
b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriterFactory.java index d3900d20a..e5a5ed3e6 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriterFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/LwtWriterFactory.java @@ -136,15 +136,13 @@ public LwtWriterFactory( @Override public RemoteWriter createWriter( final SessionClients client, - final int partition, - final int batchSize + final int partition ) { return new LwtWriter<>( client.cassandraClient(), () -> ensureEpoch.bind().setInt(PARTITION_KEY.bind(), partition), table, - partition, - batchSize + partition ); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/WriterFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/WriterFactory.java index 9e53d0b16..9bc3235f9 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/WriterFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/WriterFactory.java @@ -22,8 +22,7 @@ public interface WriterFactory { RemoteWriter createWriter( final SessionClients client, - final int partition, - final int batchSize + final int partition ); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWriterFactory.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWriterFactory.java index 14c317084..36c32ff1c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWriterFactory.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWriterFactory.java @@ -40,8 +40,7 @@ public MongoWriterFactory( @Override public RemoteWriter createWriter( final SessionClients client, - final int partition, - final int batchSize + final int partition ) { return new MongoWriter<>(table, partition, genericCollection); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java 
b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java index 6e23019fa..bb06c7dd7 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java @@ -85,7 +85,7 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.slf4j.Logger; -class CommitBuffer> +public class CommitBuffer> implements RecordBatchingStateRestoreCallback, Closeable { public static final int MAX_BATCH_SIZE = 1000; @@ -488,8 +488,7 @@ private void doFlush(final long consumedOffset, final int batchSize) { final RemoteWriter writer = writers .computeIfAbsent(subPartition, k -> writerFactory.createWriter( sessionClients, - subPartition, - batchSize + subPartition )); if (result.isTombstone) { @@ -509,7 +508,7 @@ private void doFlush(final long consumedOffset, final int batchSize) { // the first subpartition final var offsetWriteResult = writers.computeIfAbsent( subPartitioner.first(changelog.partition()), - subPartition -> writerFactory.createWriter(sessionClients, subPartition, batchSize) + subPartition -> writerFactory.createWriter(sessionClients, subPartition) ).setOffset(consumedOffset); if (!offsetWriteResult.wasApplied()) { diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/FactSchemaWriterTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/FactSchemaWriterTest.java similarity index 83% rename from kafka-client/src/test/java/dev/responsive/kafka/internal/stores/FactSchemaWriterTest.java rename to kafka-client/src/test/java/dev/responsive/kafka/internal/db/FactSchemaWriterTest.java index e0611bfdf..c5493e91a 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/FactSchemaWriterTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/FactSchemaWriterTest.java @@ -1,20 +1,22 @@ /* - * Copyright 2023 Responsive Computing, Inc. 
* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * * Copyright 2023 Responsive Computing, Inc. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/ -package dev.responsive.kafka.internal.stores; +package dev.responsive.kafka.internal.db; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/LwtWriterTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/LwtWriterTest.java similarity index 84% rename from kafka-client/src/test/java/dev/responsive/kafka/internal/stores/LwtWriterTest.java rename to kafka-client/src/test/java/dev/responsive/kafka/internal/db/LwtWriterTest.java index 1ce600545..21e514769 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/LwtWriterTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/db/LwtWriterTest.java @@ -1,20 +1,22 @@ /* - * Copyright 2023 Responsive Computing, Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * * Copyright 2023 Responsive Computing, Inc. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. */ -package dev.responsive.kafka.internal.stores; +package dev.responsive.kafka.internal.db; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -28,10 +30,6 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.internal.core.cql.DefaultBatchStatement; -import dev.responsive.kafka.internal.db.CassandraClient; -import dev.responsive.kafka.internal.db.CassandraKeyValueTable; -import dev.responsive.kafka.internal.db.LwtWriter; -import dev.responsive.kafka.internal.db.WriterFactory; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -51,9 +49,7 @@ class LwtWriterTest { private static final long CURRENT_TS = 100L; - - @Mock - private WriterFactory writerFactory; + @Mock private CassandraClient client; @Mock diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDTable.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDTable.java index 777aeab7d..4b86cae76 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDTable.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/TTDTable.java @@ -48,7 +48,7 @@ public WriterFactory init( final SubPartitioner partitioner, final int kafkaPartition ) { - return (client, partition, batchSize) -> new TTDWriter<>(this, partition); + return (client, partition) -> new TTDWriter<>(this, partition); } @Override