Skip to content

Commit

Permalink
Add support for Batch-level QueryOptions.
Browse files Browse the repository at this point in the history
Closes #1192
  • Loading branch information
mp911de committed Sep 9, 2024
1 parent 7d0a718 commit bb2b7a4
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.Collections;

import org.springframework.data.cassandra.core.cql.QueryOptions;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -59,6 +60,16 @@ public interface CassandraBatchOperations {
*/
CassandraBatchOperations withTimestamp(long timestamp);

/**
* Apply given {@link QueryOptions} to the whole batch statement.
*
* @param options the options to apply.
* @return {@code this} {@link CassandraBatchOperations}.
* @throws IllegalStateException if the batch was already executed.
* @since 4.4
*/
CassandraBatchOperations withQueryOptions(QueryOptions options);

/**
* Add a {@link BatchableStatement statement} to the batch.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.springframework.data.cassandra.core.convert.CassandraConverter;
import org.springframework.data.cassandra.core.cql.QueryOptions;
import org.springframework.data.cassandra.core.cql.QueryOptionsUtil;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.data.cassandra.core.mapping.BasicCassandraPersistentEntity;
import org.springframework.data.cassandra.core.mapping.CassandraMappingContext;
Expand Down Expand Up @@ -58,6 +59,8 @@ class CassandraBatchTemplate implements CassandraBatchOperations {

private final StatementFactory statementFactory;

private QueryOptions options = QueryOptions.empty();

/**
* Create a new {@link CassandraBatchTemplate} given {@link CassandraOperations} and {@link BatchType}.
*
Expand Down Expand Up @@ -114,7 +117,8 @@ protected StatementFactory getStatementFactory() {
public WriteResult execute() {

if (this.executed.compareAndSet(false, true)) {
return WriteResult.of(this.operations.getCqlOperations().queryForResultSet(batch.build()));
BatchStatement statement = QueryOptionsUtil.addQueryOptions(batch.build(), this.options);
return WriteResult.of(this.operations.getCqlOperations().queryForResultSet(statement));
}

throw new IllegalStateException("This Cassandra Batch was already executed");
Expand All @@ -130,9 +134,21 @@ public CassandraBatchOperations withTimestamp(long timestamp) {
return this;
}

@Override
public CassandraBatchOperations withQueryOptions(QueryOptions options) {

assertNotExecuted();
Assert.notNull(options, "QueryOptions must not be null");

this.options = options;

return this;
}

@Override
public CassandraBatchOperations addStatement(BatchableStatement<?> statement) {

assertNotExecuted();
Assert.notNull(statement, "Statement must not be null");

this.batch.addStatement(statement);
Expand All @@ -143,6 +159,7 @@ public CassandraBatchOperations addStatement(BatchableStatement<?> statement) {
@Override
public CassandraBatchOperations addStatements(BatchableStatement<?>... statements) {

assertNotExecuted();
Assert.notNull(statements, "Statements must not be null");

this.batch.addStatements(statements);
Expand All @@ -154,6 +171,7 @@ public CassandraBatchOperations addStatements(BatchableStatement<?>... statement
@SuppressWarnings("unchecked")
public CassandraBatchOperations addStatements(Iterable<? extends BatchableStatement<?>> statements) {

assertNotExecuted();
Assert.notNull(statements, "Statements must not be null");

this.batch.addStatements((Iterable<BatchableStatement<?>>) statements);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.reactivestreams.Subscriber;

import org.springframework.data.cassandra.core.cql.QueryOptions;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -65,6 +66,16 @@ public interface ReactiveCassandraBatchOperations {
*/
ReactiveCassandraBatchOperations withTimestamp(long timestamp);

/**
* Apply given {@link QueryOptions} to the whole batch statement.
*
* @param options the options to apply.
* @return {@code this} {@link CassandraBatchOperations}.
* @throws IllegalStateException if the batch was already executed.
* @since 4.4
*/
ReactiveCassandraBatchOperations withQueryOptions(QueryOptions options);

/**
* Add a {@link BatchableStatement statement} to the batch.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.springframework.data.cassandra.core.convert.CassandraConverter;
import org.springframework.data.cassandra.core.cql.QueryOptions;
import org.springframework.data.cassandra.core.cql.QueryOptionsUtil;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.data.cassandra.core.mapping.BasicCassandraPersistentEntity;
import org.springframework.data.cassandra.core.mapping.CassandraMappingContext;
Expand Down Expand Up @@ -66,6 +67,8 @@ class ReactiveCassandraBatchTemplate implements ReactiveCassandraBatchOperations

private final StatementFactory statementFactory;

private QueryOptions options = QueryOptions.empty();

/**
* Create a new {@link CassandraBatchTemplate} given {@link CassandraOperations} and {@link BatchType}.
*
Expand Down Expand Up @@ -141,7 +144,8 @@ public Mono<WriteResult> execute() {

this.batch.addStatements((List<BatchableStatement<?>>) statements);

return this.operations.getReactiveCqlOperations().queryForResultSet(this.batch.build());
return this.operations.getReactiveCqlOperations()
.queryForResultSet(QueryOptionsUtil.addQueryOptions(this.batch.build(), this.options));
}) //
.flatMap(resultSet -> resultSet.rows().collectList()
.map(rows -> new WriteResult(resultSet.getAllExecutionInfo(), resultSet.wasApplied(), rows)));
Expand All @@ -160,9 +164,21 @@ public ReactiveCassandraBatchOperations withTimestamp(long timestamp) {
return this;
}

@Override
public ReactiveCassandraBatchOperations withQueryOptions(QueryOptions options) {

assertNotExecuted();
Assert.notNull(options, "QueryOptions must not be null");

this.options = options;

return this;
}

@Override
public ReactiveCassandraBatchOperations addStatement(Mono<? extends BatchableStatement<?>> statement) {

assertNotExecuted();
Assert.notNull(statement, "Statement mono must not be null");

this.batchMonos.add(statement.map(List::of));
Expand All @@ -174,6 +190,7 @@ public ReactiveCassandraBatchOperations addStatement(Mono<? extends BatchableSta
public ReactiveCassandraBatchOperations addStatements(
Mono<? extends Iterable<? extends BatchableStatement<?>>> statements) {

assertNotExecuted();
Assert.notNull(statements, "Statements mono must not be null");

this.batchMonos.add(statements);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.data.cassandra.CassandraConnectionFailureException;
import org.springframework.data.cassandra.core.cql.QueryOptions;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.data.cassandra.domain.FlatGroup;
import org.springframework.data.cassandra.domain.Group;
import org.springframework.data.cassandra.domain.GroupKey;
import org.springframework.data.cassandra.repository.support.SchemaTestUtils;
import org.springframework.data.cassandra.test.util.AbstractKeyspaceCreatingIntegrationTests;

import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
Expand Down Expand Up @@ -297,7 +301,19 @@ void shouldApplyTimestampToAllEntities() {
}
}

@Test // DATACASS-288
@Test // GH-1192
void shouldApplyQueryOptions() {

QueryOptions options = QueryOptions.builder().consistencyLevel(ConsistencyLevel.THREE).build();

CassandraBatchOperations batchOperations = new CassandraBatchTemplate(template, BatchType.LOGGED);
CassandraBatchOperations ops = batchOperations.insert(walter).withQueryOptions(options);

assertThatExceptionOfType(CassandraConnectionFailureException.class).isThrownBy(ops::execute)
.withRootCauseInstanceOf(AllNodesFailedException.class);
}

@Test // GH-1192
void shouldNotExecuteTwice() {

CassandraBatchOperations batchOperations = new CassandraBatchTemplate(template, BatchType.LOGGED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.data.cassandra.CassandraConnectionFailureException;
import org.springframework.data.cassandra.ReactiveResultSet;
import org.springframework.data.cassandra.core.convert.MappingCassandraConverter;
import org.springframework.data.cassandra.core.cql.QueryOptions;
import org.springframework.data.cassandra.core.cql.ReactiveCqlTemplate;
import org.springframework.data.cassandra.core.cql.WriteOptions;
import org.springframework.data.cassandra.core.cql.session.DefaultBridgedReactiveSession;
Expand All @@ -41,6 +43,8 @@
import org.springframework.data.cassandra.repository.support.SchemaTestUtils;
import org.springframework.data.cassandra.test.util.AbstractKeyspaceCreatingIntegrationTests;

import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
Expand Down Expand Up @@ -153,8 +157,7 @@ void shouldInsertCollectionOfEntitiesWithTtl() {
.then(template.getReactiveCqlOperations().queryForResultSet("SELECT TTL(email), email FROM group;"));

resultSet.flatMapMany(ReactiveResultSet::availableRows) //
.collectList()
.as(StepVerifier::create) //
.collectList().as(StepVerifier::create) //
.assertNext(rows -> {

for (Row row : rows) {
Expand Down Expand Up @@ -252,8 +255,7 @@ void shouldUpdateCollectionOfEntitiesWithTtl() {
.then(template.getReactiveCqlOperations().queryForResultSet("SELECT TTL(email), email FROM group;"));

resultSet.flatMapMany(ReactiveResultSet::availableRows) //
.collectList()
.as(StepVerifier::create) //
.collectList().as(StepVerifier::create) //
.assertNext(rows -> {

for (Row row : rows) {
Expand Down Expand Up @@ -388,6 +390,20 @@ void shouldApplyTimestampToAllEntities() {
.assertNext(row -> assertThat(row.getLong(0)).isEqualTo(timestamp)).verifyComplete();
}

@Test // GH-1192
void shouldApplyQueryOptions() {

QueryOptions options = QueryOptions.builder().consistencyLevel(ConsistencyLevel.THREE).build();

ReactiveCassandraBatchOperations batchOperations = new ReactiveCassandraBatchTemplate(template, BatchType.LOGGED);
Mono<WriteResult> execute = batchOperations.insert(walter).insert(mike).withQueryOptions(options).execute();

execute.as(StepVerifier::create).verifyErrorSatisfies(e -> {
assertThat(e).isInstanceOf(CassandraConnectionFailureException.class)
.hasRootCauseInstanceOf(AllNodesFailedException.class);
});
}

@Test // DATACASS-574
void shouldNotExecuteTwice() {

Expand Down Expand Up @@ -428,10 +444,12 @@ void shouldSupportMultithreadedMerge() {

for (int i = 0; i < 100; i++) {

batchOperations.insert(Mono.just(Arrays.asList(new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
new Group(new GroupKey("users", "0x1", "walter" + random.longs())))).publishOn(Schedulers.boundedElastic()));
batchOperations.insert(Mono
.just(Arrays.asList(new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
new Group(new GroupKey("users", "0x1", "walter" + random.longs())),
new Group(new GroupKey("users", "0x1", "walter" + random.longs()))))
.publishOn(Schedulers.boundedElastic()));
}

batchOperations.execute()
Expand Down

0 comments on commit bb2b7a4

Please sign in to comment.