diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchOperations.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchOperations.java index 77e961b4a..c17799a5a 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchOperations.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchOperations.java @@ -17,6 +17,7 @@ import java.util.Collections; +import org.springframework.data.cassandra.core.cql.QueryOptions; import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.util.Assert; @@ -59,6 +60,16 @@ public interface CassandraBatchOperations { */ CassandraBatchOperations withTimestamp(long timestamp); + /** + * Apply given {@link QueryOptions} to the whole batch statement. + * + * @param options the options to apply. + * @return {@code this} {@link CassandraBatchOperations}. + * @throws IllegalStateException if the batch was already executed. + * @since 4.4 + */ + CassandraBatchOperations withQueryOptions(QueryOptions options); + /** * Add a {@link BatchableStatement statement} to the batch. * diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java index e569f4b57..5ab5b8639 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java @@ -20,6 +20,7 @@ import org.springframework.data.cassandra.core.convert.CassandraConverter; import org.springframework.data.cassandra.core.cql.QueryOptions; +import org.springframework.data.cassandra.core.cql.QueryOptionsUtil; import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.data.cassandra.core.mapping.BasicCassandraPersistentEntity; import org.springframework.data.cassandra.core.mapping.CassandraMappingContext; @@ -58,6 +59,8 @@ class CassandraBatchTemplate implements CassandraBatchOperations { private final StatementFactory statementFactory; + private QueryOptions options = QueryOptions.empty(); + /** * Create a new {@link CassandraBatchTemplate} given {@link CassandraOperations} and {@link BatchType}. * @@ -114,7 +117,8 @@ protected StatementFactory getStatementFactory() { public WriteResult execute() { if (this.executed.compareAndSet(false, true)) { - return WriteResult.of(this.operations.getCqlOperations().queryForResultSet(batch.build())); + BatchStatement statement = QueryOptionsUtil.addQueryOptions(batch.build(), this.options); + return WriteResult.of(this.operations.getCqlOperations().queryForResultSet(statement)); } throw new IllegalStateException("This Cassandra Batch was already executed"); @@ -130,9 +134,21 @@ public CassandraBatchOperations withTimestamp(long timestamp) { return this; } + @Override + public CassandraBatchOperations withQueryOptions(QueryOptions options) { + + assertNotExecuted(); + Assert.notNull(options, "QueryOptions must not be null"); + + this.options = options; + + return this; + } + @Override public CassandraBatchOperations addStatement(BatchableStatement statement) { + assertNotExecuted(); Assert.notNull(statement, "Statement must not be null"); this.batch.addStatement(statement); @@ -143,6 +159,7 @@ public CassandraBatchOperations addStatement(BatchableStatement statement) { @Override public CassandraBatchOperations addStatements(BatchableStatement... statements) { + assertNotExecuted(); Assert.notNull(statements, "Statements must not be null"); this.batch.addStatements(statements); @@ -154,6 +171,7 @@ public CassandraBatchOperations addStatements(BatchableStatement... statement @SuppressWarnings("unchecked") public CassandraBatchOperations addStatements(Iterable> statements) { + assertNotExecuted(); Assert.notNull(statements, "Statements must not be null"); this.batch.addStatements((Iterable>) statements); diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchOperations.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchOperations.java index fb7c02cab..fcf4cf8fd 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchOperations.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchOperations.java @@ -22,6 +22,7 @@ import org.reactivestreams.Subscriber; +import org.springframework.data.cassandra.core.cql.QueryOptions; import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.util.Assert; @@ -65,6 +66,16 @@ public interface ReactiveCassandraBatchOperations { */ ReactiveCassandraBatchOperations withTimestamp(long timestamp); + /** + * Apply given {@link QueryOptions} to the whole batch statement. + * + * @param options the options to apply. + * @return {@code this} {@link CassandraBatchOperations}. + * @throws IllegalStateException if the batch was already executed. + * @since 4.4 + */ + ReactiveCassandraBatchOperations withQueryOptions(QueryOptions options); + /** * Add a {@link BatchableStatement statement} to the batch. * diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplate.java index c4038606a..dee207954 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplate.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplate.java @@ -27,6 +27,7 @@ import org.springframework.data.cassandra.core.convert.CassandraConverter; import org.springframework.data.cassandra.core.cql.QueryOptions; +import org.springframework.data.cassandra.core.cql.QueryOptionsUtil; import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.data.cassandra.core.mapping.BasicCassandraPersistentEntity; import org.springframework.data.cassandra.core.mapping.CassandraMappingContext; @@ -66,6 +67,8 @@ class ReactiveCassandraBatchTemplate implements ReactiveCassandraBatchOperations private final StatementFactory statementFactory; + private QueryOptions options = QueryOptions.empty(); + /** * Create a new {@link CassandraBatchTemplate} given {@link CassandraOperations} and {@link BatchType}. * @@ -141,7 +144,8 @@ public Mono execute() { this.batch.addStatements((List>) statements); - return this.operations.getReactiveCqlOperations().queryForResultSet(this.batch.build()); + return this.operations.getReactiveCqlOperations() + .queryForResultSet(QueryOptionsUtil.addQueryOptions(this.batch.build(), this.options)); }) // .flatMap(resultSet -> resultSet.rows().collectList() .map(rows -> new WriteResult(resultSet.getAllExecutionInfo(), resultSet.wasApplied(), rows))); @@ -160,9 +164,21 @@ public ReactiveCassandraBatchOperations withTimestamp(long timestamp) { return this; } + @Override + public ReactiveCassandraBatchOperations withQueryOptions(QueryOptions options) { + + assertNotExecuted(); + Assert.notNull(options, "QueryOptions must not be null"); + + this.options = options; + + return this; + } + @Override public ReactiveCassandraBatchOperations addStatement(Mono> statement) { + assertNotExecuted(); Assert.notNull(statement, "Statement mono must not be null"); this.batchMonos.add(statement.map(List::of)); @@ -174,6 +190,7 @@ public ReactiveCassandraBatchOperations addStatement(Mono>> statements) { + assertNotExecuted(); Assert.notNull(statements, "Statements mono must not be null"); this.batchMonos.add(statements); diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/CassandraBatchTemplateIntegrationTests.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/CassandraBatchTemplateIntegrationTests.java index 628805a25..29c2d5c57 100644 --- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/CassandraBatchTemplateIntegrationTests.java +++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/CassandraBatchTemplateIntegrationTests.java @@ -24,6 +24,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.data.cassandra.CassandraConnectionFailureException; +import org.springframework.data.cassandra.core.cql.QueryOptions; import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.data.cassandra.domain.FlatGroup; import org.springframework.data.cassandra.domain.Group; @@ -31,6 +33,8 @@ import org.springframework.data.cassandra.repository.support.SchemaTestUtils; import org.springframework.data.cassandra.test.util.AbstractKeyspaceCreatingIntegrationTests; +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; @@ -297,7 +301,19 @@ void shouldApplyTimestampToAllEntities() { } } - @Test // DATACASS-288 + @Test // GH-1192 + void shouldApplyQueryOptions() { + + QueryOptions options = QueryOptions.builder().consistencyLevel(ConsistencyLevel.THREE).build(); + + CassandraBatchOperations batchOperations = new CassandraBatchTemplate(template, BatchType.LOGGED); + CassandraBatchOperations ops = batchOperations.insert(walter).withQueryOptions(options); + + assertThatExceptionOfType(CassandraConnectionFailureException.class).isThrownBy(ops::execute) + .withRootCauseInstanceOf(AllNodesFailedException.class); + } + + @Test // GH-1192 void shouldNotExecuteTwice() { CassandraBatchOperations batchOperations = new CassandraBatchTemplate(template, BatchType.LOGGED); diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplateIntegrationTests.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplateIntegrationTests.java index f1d4e4534..90edc5b01 100644 --- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplateIntegrationTests.java +++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/ReactiveCassandraBatchTemplateIntegrationTests.java @@ -30,8 +30,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.data.cassandra.CassandraConnectionFailureException; import org.springframework.data.cassandra.ReactiveResultSet; import org.springframework.data.cassandra.core.convert.MappingCassandraConverter; +import org.springframework.data.cassandra.core.cql.QueryOptions; import org.springframework.data.cassandra.core.cql.ReactiveCqlTemplate; import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.data.cassandra.core.cql.session.DefaultBridgedReactiveSession; @@ -41,6 +43,8 @@ import org.springframework.data.cassandra.repository.support.SchemaTestUtils; import org.springframework.data.cassandra.test.util.AbstractKeyspaceCreatingIntegrationTests; +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.cql.SimpleStatement; @@ -153,8 +157,7 @@ void shouldInsertCollectionOfEntitiesWithTtl() { .then(template.getReactiveCqlOperations().queryForResultSet("SELECT TTL(email), email FROM group;")); resultSet.flatMapMany(ReactiveResultSet::availableRows) // - .collectList() - .as(StepVerifier::create) // + .collectList().as(StepVerifier::create) // .assertNext(rows -> { for (Row row : rows) { @@ -252,8 +255,7 @@ void shouldUpdateCollectionOfEntitiesWithTtl() { .then(template.getReactiveCqlOperations().queryForResultSet("SELECT TTL(email), email FROM group;")); resultSet.flatMapMany(ReactiveResultSet::availableRows) // - .collectList() - .as(StepVerifier::create) // + .collectList().as(StepVerifier::create) // .assertNext(rows -> { for (Row row : rows) { @@ -388,6 +390,20 @@ void shouldApplyTimestampToAllEntities() { .assertNext(row -> assertThat(row.getLong(0)).isEqualTo(timestamp)).verifyComplete(); } + @Test // GH-1192 + void shouldApplyQueryOptions() { + + QueryOptions options = QueryOptions.builder().consistencyLevel(ConsistencyLevel.THREE).build(); + + ReactiveCassandraBatchOperations batchOperations = new ReactiveCassandraBatchTemplate(template, BatchType.LOGGED); + Mono execute = batchOperations.insert(walter).insert(mike).withQueryOptions(options).execute(); + + execute.as(StepVerifier::create).verifyErrorSatisfies(e -> { + assertThat(e).isInstanceOf(CassandraConnectionFailureException.class) + .hasRootCauseInstanceOf(AllNodesFailedException.class); + }); + } + @Test // DATACASS-574 void shouldNotExecuteTwice() { @@ -428,10 +444,12 @@ void shouldSupportMultithreadedMerge() { for (int i = 0; i < 100; i++) { - batchOperations.insert(Mono.just(Arrays.asList(new Group(new GroupKey("users", "0x1", "walter" + random.longs())), - new Group(new GroupKey("users", "0x1", "walter" + random.longs())), - new Group(new GroupKey("users", "0x1", "walter" + random.longs())), - new Group(new GroupKey("users", "0x1", "walter" + random.longs())))).publishOn(Schedulers.boundedElastic())); + batchOperations.insert(Mono + .just(Arrays.asList(new Group(new GroupKey("users", "0x1", "walter" + random.longs())), + new Group(new GroupKey("users", "0x1", "walter" + random.longs())), + new Group(new GroupKey("users", "0x1", "walter" + random.longs())), + new Group(new GroupKey("users", "0x1", "walter" + random.longs())))) + .publishOn(Schedulers.boundedElastic())); } batchOperations.execute()