Skip to content

Commit

Permalink
Remove batchSize parameter from writers (#179)
Browse files Browse the repository at this point in the history
Relatively small refactor to clear up how batches are handled/sized between the CommitBuffer and RemoteWriters. It seems that the batchSize we pass in is always either the MAX_BATCH_SIZE constant or, in the case of restoration only, the size of the final batch, which contains whatever records remain, up to MAX_BATCH_SIZE.

In other words, the batch size is always capped at MAX_BATCH_SIZE. Furthermore, the only time the batchSize parameter differs from the hard-coded constant is when there are fewer updates in the batch, in which case the batch is naturally capped at this smaller size, since we will run out of inserts (or deletes) before hitting the batchSize condition. Also, only the LWT writer seems to be using this parameter at the moment, so it feels silly to pass it in to every other writer as we do right now.

In addition to being easier to reason about and removing unnecessary complexity from the code, this is a necessary cleanup/precursor for the window store implementation.
  • Loading branch information
ableegoldman authored Oct 22, 2023
1 parent d0b9d3c commit 26642f6
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public FactWriterFactory(final RemoteTable<K, BoundStatement> table) {
@Override
public RemoteWriter<K> createWriter(
final SessionClients client,
final int partition,
final int batchSize
final int partition
) {
return new FactSchemaWriter<>(
client.cassandraClient(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

package dev.responsive.kafka.internal.db;

import static dev.responsive.kafka.internal.stores.CommitBuffer.MAX_BATCH_SIZE;

import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -34,22 +37,38 @@ public class LwtWriter<K> implements RemoteWriter<K> {
private final Supplier<BatchableStatement<?>> fencingStatementFactory;
private final RemoteTable<K, BoundStatement> table;
private final int partition;
private final int batchSize;
private final long maxBatchSize;

private final List<BatchableStatement<?>> statements;

public LwtWriter(
final CassandraClient client,
final Supplier<BatchableStatement<?>> fencingStatementFactory,
final RemoteTable<K, BoundStatement> table,
final int partition
) {
this(
client,
fencingStatementFactory,
table,
partition,
MAX_BATCH_SIZE
);
}

@VisibleForTesting
LwtWriter(
final CassandraClient client,
final Supplier<BatchableStatement<?>> fencingStatementFactory,
final RemoteTable<K, BoundStatement> table,
final int partition,
final int batchSize
final long maxBatchSize
) {
this.client = client;
this.fencingStatementFactory = fencingStatementFactory;
this.table = table;
this.partition = partition;
this.batchSize = batchSize;
this.maxBatchSize = maxBatchSize;

statements = new ArrayList<>();
}
Expand All @@ -74,7 +93,7 @@ public CompletionStage<RemoteWriteResult> flush() {
builder.setIdempotence(true);
builder.addStatement(fencingStatementFactory.get());

for (int i = 0; i < batchSize && it.hasNext(); i++) {
for (int i = 0; i < maxBatchSize && it.hasNext(); i++) {
builder.addStatement(it.next());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,13 @@ public LwtWriterFactory(
@Override
public RemoteWriter<K> createWriter(
final SessionClients client,
final int partition,
final int batchSize
final int partition
) {
return new LwtWriter<>(
client.cassandraClient(),
() -> ensureEpoch.bind().setInt(PARTITION_KEY.bind(), partition),
table,
partition,
batchSize
partition
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public interface WriterFactory<K> {

RemoteWriter<K> createWriter(
final SessionClients client,
final int partition,
final int batchSize
final int partition
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public MongoWriterFactory(
@Override
public RemoteWriter<K> createWriter(
final SessionClients client,
final int partition,
final int batchSize
final int partition
) {
return new MongoWriter<>(table, partition, genericCollection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;

class CommitBuffer<K, S extends RemoteTable<K, ?>>
public class CommitBuffer<K, S extends RemoteTable<K, ?>>
implements RecordBatchingStateRestoreCallback, Closeable {

public static final int MAX_BATCH_SIZE = 1000;
Expand Down Expand Up @@ -488,8 +488,7 @@ private void doFlush(final long consumedOffset, final int batchSize) {
final RemoteWriter<K> writer = writers
.computeIfAbsent(subPartition, k -> writerFactory.createWriter(
sessionClients,
subPartition,
batchSize
subPartition
));

if (result.isTombstone) {
Expand All @@ -509,7 +508,7 @@ private void doFlush(final long consumedOffset, final int batchSize) {
// the first subpartition
final var offsetWriteResult = writers.computeIfAbsent(
subPartitioner.first(changelog.partition()),
subPartition -> writerFactory.createWriter(sessionClients, subPartition, batchSize)
subPartition -> writerFactory.createWriter(sessionClients, subPartition)
).setOffset(consumedOffset);

if (!offsetWriteResult.wasApplied()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
/*
* Copyright 2023 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* * Copyright 2023 Responsive Computing, Inc.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.internal.stores;
package dev.responsive.kafka.internal.db;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
/*
* Copyright 2023 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* * Copyright 2023 Responsive Computing, Inc.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.internal.stores;
package dev.responsive.kafka.internal.db;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand All @@ -28,10 +30,6 @@
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.internal.core.cql.DefaultBatchStatement;
import dev.responsive.kafka.internal.db.CassandraClient;
import dev.responsive.kafka.internal.db.CassandraKeyValueTable;
import dev.responsive.kafka.internal.db.LwtWriter;
import dev.responsive.kafka.internal.db.WriterFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -51,9 +49,7 @@
class LwtWriterTest {

private static final long CURRENT_TS = 100L;

@Mock
private WriterFactory<?> writerFactory;

@Mock
private CassandraClient client;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public WriterFactory<K> init(
final SubPartitioner partitioner,
final int kafkaPartition
) {
return (client, partition, batchSize) -> new TTDWriter<>(this, partition);
return (client, partition) -> new TTDWriter<>(this, partition);
}

@Override
Expand Down

0 comments on commit 26642f6

Please sign in to comment.