Skip to content

Commit

Permalink
[TH2-5207] skip null values when writing to Cassandra (#258)
Browse files Browse the repository at this point in the history
Co-authored-by: Oleg Smelov <[email protected]>
Co-authored-by: nikita.smirnov <[email protected]>
  • Loading branch information
3 people authored Jul 2, 2024
1 parent bc6f219 commit 7dd2bdd
Show file tree
Hide file tree
Showing 11 changed files with 353 additions and 41 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ Test events have mandatory parameters that are verified when storing an event. T
## Release notes

### 5.4.1
* Skip null values for 'type', 'messages' and 'labels' fields when writing to Cassandra to avoid tombstones creation.
* Migrated to th2 gradle plugin: `0.0.8` (bom: `4.6.1`)
* Page interval in page cache hols all pages covered the interval.<br>
Fixed the problem - components reload page interval and last page each time when page starts in day before and ends day after requested period.
* Corrected default settings:
Expand All @@ -218,7 +220,7 @@ Test events have mandatory parameters that are verified when storing an event. T
* composingServiceThreads: `5` -> `1`

### 5.4.0
* Using internal executor instead of ForkJoinPool.commonPool() to process intermediate tasks
* Using internal executor instead of ForkJoinPool.commonPool() to process intermediate tasks

### 5.3.0
+ Migrated to th2 gradle plugin: `0.0.5` (bom: `4.6.1`)
Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
plugins {
id "com.exactpro.th2.gradle.base" version "0.0.5"
id "com.exactpro.th2.gradle.publish" version "0.0.5"
id "com.exactpro.th2.gradle.base" version "0.0.8"
id "com.exactpro.th2.gradle.publish" version "0.0.8"
}

allprojects {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.cradle.cassandra.dao;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Set;
import java.util.function.Function;

/**
* This wrapper skips null values to avoid tombstones creation in Cassandra tables
*/
public class BoundStatementBuilderWrapper {
private BoundStatementBuilder builder;

public BoundStatementBuilderWrapper setConsistencyLevel(@Nullable ConsistencyLevel consistencyLevel) {
builder.setConsistencyLevel(consistencyLevel);
return this;
}

public BoundStatementBuilderWrapper setTimeout(@Nullable Duration timeout) {
builder.setTimeout(timeout);
return this;
}

private BoundStatementBuilderWrapper(PreparedStatement statement) {
builder = statement.boundStatementBuilder();
}

public BoundStatementBuilderWrapper setBoolean(@NonNull String name, boolean v) {
builder = builder.setBoolean(name, v);
return this;
}

public BoundStatementBuilderWrapper setByte(@NonNull String name, byte v) {
builder = builder.setByte(name, v);
return this;
}

public BoundStatementBuilderWrapper setInt(@NonNull String name, int v) {
builder = builder.setInt(name, v);
return this;
}

public BoundStatementBuilderWrapper setLong(@NonNull String name, long v) {
builder = builder.setLong(name, v);
return this;
}

public BoundStatementBuilderWrapper setInstant(@NonNull String name, @Nullable Instant v) {
if (v != null) {
builder = builder.setInstant(name, v);
}
return this;
}

public BoundStatementBuilderWrapper setLocalDate(@NonNull String name, @Nullable LocalDate v) {
if (v != null) {
builder = builder.setLocalDate(name, v);
}
return this;
}

public BoundStatementBuilderWrapper setLocalTime(@NonNull String name, @Nullable LocalTime v) {
if (v != null) {
builder = builder.setLocalTime(name, v);
}
return this;
}

public BoundStatementBuilderWrapper setByteBuffer(@NonNull String name, @Nullable ByteBuffer v) {
if (v != null) {
builder = builder.setByteBuffer(name, v);
}
return this;
}

public BoundStatementBuilderWrapper setString(@NonNull String name, @Nullable String v) {
if (v != null) {
builder = builder.setString(name, v);
}
return this;
}

public <ElementT> BoundStatementBuilderWrapper setSet(@NonNull String name, @Nullable Set<ElementT> v, @NonNull Class<ElementT> elementsClass) {
if (v != null) {
builder = builder.setSet(name, v, elementsClass);
}
return this;
}

public BoundStatementBuilderWrapper apply(Function<BoundStatementBuilder, BoundStatementBuilder> func) {
builder = func.apply(builder);
return this;
}

public BoundStatement build() {
return builder.build();
}

public static BoundStatementBuilderWrapper builder(PreparedStatement statement) {
return new BoundStatementBuilderWrapper(statement);
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2022-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.cradle.cassandra.dao.messages;

import com.datastax.oss.driver.api.core.CqlSession;
Expand All @@ -7,6 +23,7 @@
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.mapper.MapperContext;
import com.datastax.oss.driver.api.mapper.entity.EntityHelper;
import com.exactpro.cradle.cassandra.dao.BoundStatementBuilderWrapper;

import java.time.Instant;
import java.util.concurrent.CompletableFuture;
Expand All @@ -15,7 +32,17 @@
import static com.exactpro.cradle.cassandra.dao.CradleEntity.FIELD_COMPRESSED;
import static com.exactpro.cradle.cassandra.dao.CradleEntity.FIELD_CONTENT;
import static com.exactpro.cradle.cassandra.dao.CradleEntity.FIELD_LABELS;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.*;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.FIELD_ALIAS_GROUP;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.FIELD_BOOK;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.FIELD_CONTENT_SIZE;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.FIELD_FIRST_MESSAGE_DATE;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.FIELD_FIRST_MESSAGE_TIME;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.FIELD_LAST_MESSAGE_DATE;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.FIELD_LAST_MESSAGE_TIME;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.FIELD_MESSAGE_COUNT;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.FIELD_PAGE;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.FIELD_REC_DATE;
import static com.exactpro.cradle.cassandra.dao.messages.GroupedMessageBatchEntity.FIELD_UNCOMPRESSED_CONTENT_SIZE;

public class GroupedMessageBatchInserter {
private final CqlSession session;
Expand All @@ -26,8 +53,9 @@ public GroupedMessageBatchInserter(MapperContext context, EntityHelper<GroupedMe
this.insertStatement = session.prepare(helper.insert().build());
}

public CompletableFuture<AsyncResultSet> insert(GroupedMessageBatchEntity groupedMessageBatch, Function<BoundStatementBuilder, BoundStatementBuilder> attributes) {
BoundStatementBuilder builder = insertStatement.boundStatementBuilder()
public CompletableFuture<AsyncResultSet> insert(GroupedMessageBatchEntity groupedMessageBatch,
Function<BoundStatementBuilder, BoundStatementBuilder> attributes) {
BoundStatement statement = BoundStatementBuilderWrapper.builder(insertStatement)
.setString(FIELD_BOOK, groupedMessageBatch.getBook())
.setString(FIELD_PAGE, groupedMessageBatch.getPage())
.setString(FIELD_ALIAS_GROUP, groupedMessageBatch.getGroup())
Expand All @@ -43,11 +71,11 @@ public CompletableFuture<AsyncResultSet> insert(GroupedMessageBatchEntity groupe
.setByteBuffer(FIELD_CONTENT, groupedMessageBatch.getContent())
.setInstant(FIELD_REC_DATE, Instant.now())
.setInt(FIELD_CONTENT_SIZE, groupedMessageBatch.getContentSize())
.setInt(FIELD_UNCOMPRESSED_CONTENT_SIZE, groupedMessageBatch.getUncompressedContentSize());
.setInt(FIELD_UNCOMPRESSED_CONTENT_SIZE, groupedMessageBatch.getUncompressedContentSize())

.apply(attributes)
.build();

attributes.apply(builder);
BoundStatement statement = builder.build();
return session.executeAsync(statement).toCompletableFuture();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,11 +28,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import com.exactpro.cradle.cassandra.dao.BoundStatementBuilderWrapper;
import static com.exactpro.cradle.cassandra.dao.messages.MessageBatchEntity.*;


public class MessageBatchInserter {

private final CqlSession session;
private final PreparedStatement insertStatement;

Expand All @@ -42,7 +41,7 @@ public MessageBatchInserter(MapperContext context, EntityHelper<MessageBatchEnti
}

public CompletableFuture<AsyncResultSet> insert(MessageBatchEntity messageBatch, Function<BoundStatementBuilder, BoundStatementBuilder> attributes) {
BoundStatementBuilder builder = insertStatement.boundStatementBuilder()
BoundStatement statement = BoundStatementBuilderWrapper.builder(insertStatement)
.setString(FIELD_BOOK, messageBatch.getBook())
.setString(FIELD_PAGE, messageBatch.getPage())
.setString(FIELD_SESSION_ALIAS, messageBatch.getSessionAlias())
Expand All @@ -61,11 +60,11 @@ public CompletableFuture<AsyncResultSet> insert(MessageBatchEntity messageBatch,
.setByteBuffer(FIELD_CONTENT, messageBatch.getContent())
.setInstant(FIELD_REC_DATE, Instant.now())
.setInt(FIELD_CONTENT_SIZE, messageBatch.getContentSize())
.setInt(FIELD_UNCOMPRESSED_CONTENT_SIZE, messageBatch.getUncompressedContentSize());
.setInt(FIELD_UNCOMPRESSED_CONTENT_SIZE, messageBatch.getUncompressedContentSize())

.apply(attributes)
.build();

attributes.apply(builder);
BoundStatement statement = builder.build();
return session.executeAsync(statement).toCompletableFuture();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2021-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.cradle.cassandra.dao.testevents;

import com.datastax.oss.driver.api.core.CqlSession;
Expand All @@ -27,6 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import com.exactpro.cradle.cassandra.dao.BoundStatementBuilderWrapper;
import static com.exactpro.cradle.cassandra.dao.testevents.TestEventEntity.*;

public class TestEvenInserter {
Expand All @@ -39,8 +41,9 @@ public TestEvenInserter(MapperContext context, EntityHelper<TestEventEntity> hel
this.insertStatement = session.prepare(helper.insert().build());
}

public CompletableFuture<AsyncResultSet> insert(TestEventEntity testEvent, Function<BoundStatementBuilder, BoundStatementBuilder> attributes) {
BoundStatementBuilder builder = insertStatement.boundStatementBuilder()
public CompletableFuture<AsyncResultSet> insert(TestEventEntity testEvent,
Function<BoundStatementBuilder, BoundStatementBuilder> attributes) {
BoundStatement statement = BoundStatementBuilderWrapper.builder(insertStatement)
.setString(FIELD_BOOK, testEvent.getBook())
.setString(FIELD_PAGE, testEvent.getPage())
.setString(FIELD_SCOPE, testEvent.getScope())
Expand All @@ -64,11 +67,11 @@ public CompletableFuture<AsyncResultSet> insert(TestEventEntity testEvent, Funct
.setByteBuffer(FIELD_CONTENT, testEvent.getContent())
.setInstant(FIELD_REC_DATE, Instant.now())
.setInt(FIELD_CONTENT_SIZE, testEvent.getContentSize())
.setInt(FIELD_UNCOMPRESSED_CONTENT_SIZE, testEvent.getUncompressedContentSize());
.setInt(FIELD_UNCOMPRESSED_CONTENT_SIZE, testEvent.getUncompressedContentSize())

.apply(attributes)
.build();

attributes.apply(builder);
BoundStatement statement = builder.build();
return session.executeAsync(statement).toCompletableFuture();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,11 +37,12 @@ public QueryExecutor(CqlSession session, long timeout, ConsistencyLevel writeCon
this.readConsistencyLevel = readConsistencyLevel;
}

private ResultSet executeQuery(String cqlQuery, ConsistencyLevel consistencyLevel) throws IOException {
private ResultSet executeQuery(String cqlQuery, ConsistencyLevel consistencyLevel, boolean tracing) throws IOException {

SimpleStatement statement = SimpleStatement.newInstance(cqlQuery)
.setTimeout(Duration.ofMillis(timeout))
.setConsistencyLevel(consistencyLevel);
.setConsistencyLevel(consistencyLevel)
.setTracing(tracing);

ResultSet rs = session.execute(statement);
if (!rs.wasApplied())
Expand All @@ -50,14 +51,18 @@ private ResultSet executeQuery(String cqlQuery, ConsistencyLevel consistencyLeve
}

public ResultSet executeWrite(String cqlQuery) throws IOException {
return executeQuery(cqlQuery, writeConsistencyLevel);
return executeQuery(cqlQuery, writeConsistencyLevel, false);
}

public ResultSet executeRead(String cqlQuery) throws IOException {
return executeQuery(cqlQuery, readConsistencyLevel);
return executeQuery(cqlQuery, readConsistencyLevel, false);
}

public ResultSet executeReadWithTracing(String cqlQuery) throws IOException {
return executeQuery(cqlQuery, readConsistencyLevel, true);
}

public CqlSession getSession() {
return session;
}
}
}
Loading

0 comments on commit 7dd2bdd

Please sign in to comment.