Skip to content

Commit

Permalink
remove batch size param
Browse files Browse the repository at this point in the history
  • Loading branch information
ableegoldman committed Oct 22, 2023
1 parent dce016a commit 320bd3e
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public FactWriterFactory(final RemoteTable<K, BoundStatement> table) {
@Override
public RemoteWriter<K> createWriter(
final SessionClients client,
final int partition,
final int batchSize
final int partition
) {
return new FactSchemaWriter<>(
client.cassandraClient(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -36,20 +37,38 @@ public class LwtWriter<K> implements RemoteWriter<K> {
private final Supplier<BatchableStatement<?>> fencingStatementFactory;
private final RemoteTable<K, BoundStatement> table;
private final int partition;
private final long maxBatchSize;

private final List<BatchableStatement<?>> statements;

public LwtWriter(
final CassandraClient client,
final Supplier<BatchableStatement<?>> fencingStatementFactory,
final RemoteTable<K, BoundStatement> table,
final int partition
) {
this(
client,
fencingStatementFactory,
table,
partition,
MAX_BATCH_SIZE
);
}

@VisibleForTesting
LwtWriter(
final CassandraClient client,
final Supplier<BatchableStatement<?>> fencingStatementFactory,
final RemoteTable<K, BoundStatement> table,
final int partition,
final int batchSize
final long maxBatchSize
) {
this.client = client;
this.fencingStatementFactory = fencingStatementFactory;
this.table = table;
this.partition = partition;
this.maxBatchSize = maxBatchSize;

statements = new ArrayList<>();
}
Expand All @@ -74,7 +93,7 @@ public CompletionStage<RemoteWriteResult> flush() {
builder.setIdempotence(true);
builder.addStatement(fencingStatementFactory.get());

for (int i = 0; i < MAX_BATCH_SIZE && it.hasNext(); i++) {
for (int i = 0; i < maxBatchSize && it.hasNext(); i++) {
builder.addStatement(it.next());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,13 @@ public LwtWriterFactory(
@Override
public RemoteWriter<K> createWriter(
final SessionClients client,
final int partition,
final int batchSize
final int partition
) {
return new LwtWriter<>(
client.cassandraClient(),
() -> ensureEpoch.bind().setInt(PARTITION_KEY.bind(), partition),
table,
partition,
batchSize
partition
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public interface WriterFactory<K> {

RemoteWriter<K> createWriter(
final SessionClients client,
final int partition,
final int batchSize
final int partition
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public MongoWriterFactory(
@Override
public RemoteWriter<K> createWriter(
final SessionClients client,
final int partition,
final int batchSize
final int partition
) {
return new MongoWriter<>(table, partition, genericCollection);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
/*
* Copyright 2023 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* * Copyright 2023 Responsive Computing, Inc.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.internal.stores;
package dev.responsive.kafka.internal.db;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
/*
* Copyright 2023 Responsive Computing, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* * Copyright 2023 Responsive Computing, Inc.
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.responsive.kafka.internal.stores;
package dev.responsive.kafka.internal.db;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand All @@ -28,10 +30,6 @@
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.internal.core.cql.DefaultBatchStatement;
import dev.responsive.kafka.internal.db.CassandraClient;
import dev.responsive.kafka.internal.db.CassandraKeyValueTable;
import dev.responsive.kafka.internal.db.LwtWriter;
import dev.responsive.kafka.internal.db.WriterFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -51,9 +49,7 @@
class LwtWriterTest {

private static final long CURRENT_TS = 100L;

@Mock
private WriterFactory<?> writerFactory;

@Mock
private CassandraClient client;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public WriterFactory<K> init(
final SubPartitioner partitioner,
final int kafkaPartition
) {
return (client, partition, batchSize) -> new TTDWriter<>(this, partition);
return (client, partition) -> new TTDWriter<>(this, partition);
}

@Override
Expand Down

0 comments on commit 320bd3e

Please sign in to comment.