Skip to content

Commit

Permalink
[TH2-4731] Using refactored iterators in cradle-cassandra (#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
LukaTchumburidze authored Jun 28, 2023
1 parent 62467dc commit c4bcb64
Show file tree
Hide file tree
Showing 53 changed files with 2,381 additions and 516 deletions.
14 changes: 13 additions & 1 deletion cradle-cassandra/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation "com.datastax.oss:java-driver-mapper-processor:${driver_version}"
implementation "com.datastax.oss:java-driver-mapper-runtime:${driver_version}"
implementation 'io.prometheus:simpleclient_dropwizard:0.16.0'
implementation 'com.google.guava:guava'

// this section is required to bypass failing vulnerability check caused by cassandra driver's transitive dependencies
annotationProcessor platform('com.exactpro.th2:bom:4.3.0')
Expand All @@ -28,6 +29,8 @@ dependencies {
testImplementation 'org.assertj:assertj-core:3.12.2'
testImplementation "org.apache.logging.log4j:log4j-slf4j2-impl"
testImplementation 'org.apache.logging.log4j:log4j-core'
testImplementation 'org.mockito:mockito-core:5.2.0'
testImplementation 'org.testcontainers:cassandra:1.17.5'
}

def gen_dir = 'build/generated/sources/annotationProcessor/main'
Expand All @@ -38,7 +41,16 @@ compileJava {
}

test {
useTestNG()
useTestNG() {
suites "src/test/resources/core.xml"
}
}

tasks.register('integrationTest', Test) {
group = 'verification'
useTestNG() {
suites "src/test/resources/integration.xml"
}
}

jar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.exactpro.cradle.cassandra.dao.statistics.MessageStatisticsOperator;
import com.exactpro.cradle.cassandra.dao.statistics.SessionStatisticsOperator;
import com.exactpro.cradle.cassandra.dao.testevents.*;
import com.exactpro.cradle.cassandra.iterators.ConvertingPagedIterator;
import com.exactpro.cradle.cassandra.iterators.PagedIterator;
import com.exactpro.cradle.cassandra.keyspaces.CradleInfoKeyspaceCreator;
import com.exactpro.cradle.cassandra.metrics.DriverMetrics;
Expand All @@ -56,6 +55,7 @@
import com.exactpro.cradle.counters.CounterSample;
import com.exactpro.cradle.counters.Interval;
import com.exactpro.cradle.intervals.IntervalsWorker;
import com.exactpro.cradle.iterators.ConvertingIterator;
import com.exactpro.cradle.messages.*;
import com.exactpro.cradle.resultset.CradleResultSet;
import com.exactpro.cradle.testevents.StoredTestEvent;
Expand All @@ -75,7 +75,6 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -1349,12 +1348,14 @@ protected CompletableFuture<Iterator<PageInfo>> doGetPagesAsync (BookId bookId,
endDate,
endTime,
readAttrs)
.thenApply(rs -> new ConvertingPagedIterator<>(rs,
selectExecutor,
0,
new AtomicInteger(0),
PageEntity::toPageInfo,
(el) -> operators.getPageEntityConverter().getEntity(el), queryInfo));
.thenApply(rs -> {
PagedIterator<PageEntity> pagedIterator = new PagedIterator<>(rs,
selectExecutor,
operators.getPageEntityConverter()::getEntity,
queryInfo);

return new ConvertingIterator<>(pagedIterator, PageEntity::toPageInfo);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.exactpro.cradle.cassandra.counters;

import com.exactpro.cradle.BookId;
import com.exactpro.cradle.EntityType;
import com.exactpro.cradle.serialization.SerializedEntityMetadata;

import java.util.Collection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
import com.datastax.oss.driver.shaded.guava.common.collect.Iterables;
import com.exactpro.cradle.BookId;
import com.exactpro.cradle.cassandra.EntityConverter;
import com.exactpro.cradle.cassandra.iterators.ConvertingPagedIterator;
import com.exactpro.cradle.cassandra.iterators.PagedIterator;
import com.exactpro.cradle.cassandra.workers.Worker;
import com.exactpro.cradle.cassandra.workers.WorkerSupplies;
import com.exactpro.cradle.intervals.Interval;
import com.exactpro.cradle.intervals.IntervalsWorker;
import com.exactpro.cradle.iterators.ConvertingIterator;
import com.exactpro.cradle.utils.CradleStorageException;
import com.exactpro.cradle.utils.UpdateNotAppliedException;
import org.slf4j.Logger;
Expand All @@ -39,7 +40,6 @@
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static com.exactpro.cradle.CradleStorage.TIMEZONE_OFFSET;
Expand Down Expand Up @@ -83,14 +83,17 @@ private Interval mapEntityToInterval (IntervalEntity entity) {
}

private Iterable<Interval> getIntervalsIterator(MappedAsyncPagingIterable<IntervalEntity> iterable, String queryInfo) {
return () -> new ConvertingPagedIterator<>(
iterable,
selectQueryExecutor,
0,
new AtomicInteger(0),
this::mapEntityToInterval,
converter::getEntity,
queryInfo);
return () -> {
PagedIterator<IntervalEntity> pagedIterator = new PagedIterator<>(iterable,
selectQueryExecutor,
converter::getEntity,
queryInfo);

return new ConvertingIterator<>(
pagedIterator,
this::mapEntityToInterval);
};

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.exactpro.cradle.cassandra.dao.messages;

import com.datastax.oss.driver.api.core.MappedAsyncPagingIterable;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.Row;
import com.exactpro.cradle.BookInfo;
Expand All @@ -24,30 +25,45 @@
import com.exactpro.cradle.PageInfo;
import com.exactpro.cradle.cassandra.dao.CassandraOperators;
import com.exactpro.cradle.cassandra.dao.messages.converters.MessageBatchEntityConverter;
import com.exactpro.cradle.cassandra.dao.messages.sequences.MessageBatchIteratorCondition;
import com.exactpro.cradle.cassandra.dao.messages.sequences.MessageBatchIteratorFilter;
import com.exactpro.cradle.cassandra.dao.messages.sequences.SequenceRange;
import com.exactpro.cradle.cassandra.dao.messages.sequences.SequenceRangeExtractor;
import com.exactpro.cradle.cassandra.iterators.PagedIterator;
import com.exactpro.cradle.cassandra.resultset.IteratorProvider;
import com.exactpro.cradle.cassandra.retries.SelectQueryExecutor;
import com.exactpro.cradle.cassandra.utils.FilterUtils;
import com.exactpro.cradle.cassandra.workers.MessagesWorker;
import com.exactpro.cradle.filters.FilterForAny;
import com.exactpro.cradle.filters.FilterForGreater;
import com.exactpro.cradle.filters.FilterForLess;
import com.exactpro.cradle.iterators.ConvertingIterator;
import com.exactpro.cradle.iterators.FilteringIterator;
import com.exactpro.cradle.iterators.TakeWhileIterator;
import com.exactpro.cradle.messages.MessageFilter;
import com.exactpro.cradle.messages.StoredMessageBatch;
import com.exactpro.cradle.utils.CradleStorageException;
import com.exactpro.cradle.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static com.exactpro.cradle.cassandra.dao.messages.MessageBatchEntity.FIELD_FIRST_MESSAGE_TIME;

abstract public class AbstractMessageIteratorProvider<T> extends IteratorProvider<T>
{
abstract public class AbstractMessageIteratorProvider<T> extends IteratorProvider<T> {

private static final Logger logger = LoggerFactory.getLogger(AbstractMessageIteratorProvider.class);
protected final MessageBatchOperator op;
protected final MessageBatchEntityConverter messageBatchEntityConverter;
protected final MessageBatchEntityConverter converter;
protected final BookInfo book;
protected final ExecutorService composingService;
protected final SelectQueryExecutor selectQueryExecutor;
Expand All @@ -56,17 +72,21 @@ abstract public class AbstractMessageIteratorProvider<T> extends IteratorProvide
protected PageInfo firstPage, lastPage;
protected final Function<BoundStatementBuilder, BoundStatementBuilder> readAttrs;
protected final MessageFilter filter;
/** limit must be strictly positive ( limit greater than 0 ) */
protected final int limit;
protected final AtomicInteger returned;
protected CassandraStoredMessageFilter cassandraFilter;
protected final MessageBatchIteratorFilter<MessageBatchEntity> batchFilter;
protected final MessageBatchIteratorCondition<MessageBatchEntity> iterationCondition;
protected TakeWhileIterator<MessageBatchEntity> takeWhileIterator;

public AbstractMessageIteratorProvider(String requestInfo, MessageFilter filter, CassandraOperators operators, BookInfo book,
ExecutorService composingService, SelectQueryExecutor selectQueryExecutor,
Function<BoundStatementBuilder, BoundStatementBuilder> readAttrs) throws CradleStorageException
{
super(requestInfo);
this.op = operators.getMessageBatchOperator();
this.messageBatchEntityConverter = operators.getMessageBatchEntityConverter();
this.converter = operators.getMessageBatchEntityConverter();
this.book = book;
this.composingService = composingService;
this.selectQueryExecutor = selectQueryExecutor;
Expand All @@ -77,6 +97,23 @@ public AbstractMessageIteratorProvider(String requestInfo, MessageFilter filter,
this.leftBoundFilter = createLeftBoundFilter(filter);
this.rightBoundFilter = createRightBoundFilter(filter);
this.cassandraFilter = createInitialFilter(filter);

FilterForAny<Long> sequenceFilter = filter.getSequence();
MessageBatchIteratorFilter<MessageBatchEntity> batchFilter;
MessageBatchIteratorCondition<MessageBatchEntity> iterationCondition;
if (sequenceFilter == null) {
batchFilter = MessageBatchIteratorFilter.none();
iterationCondition = MessageBatchIteratorCondition.none();
} else {
SequenceRangeExtractor<MessageBatchEntity> extractor = entity -> new SequenceRange(
entity.getSequence(),
entity.getLastSequence());
batchFilter = new MessageBatchIteratorFilter<>(filter, extractor);
iterationCondition = new MessageBatchIteratorCondition<>(filter, extractor);
}

this.batchFilter = batchFilter;
this.iterationCondition = iterationCondition;
}

protected FilterForGreater<Instant> createLeftBoundFilter(MessageFilter filter) throws CradleStorageException
Expand Down Expand Up @@ -200,4 +237,46 @@ protected CassandraStoredMessageFilter createNextFilter(CassandraStoredMessageFi
updatedLimit,
filter.getOrder());
}

protected boolean performNextIteratorChecks () {
if (cassandraFilter == null) {
return false;
}

if (takeWhileIterator != null && takeWhileIterator.isHalted()) {
logger.debug("Iterator was interrupted because iterator condition was not met");
return false;
}

if (limit > 0 && returned.get() >= limit) {
logger.debug("Filtering interrupted because limit for records to return ({}) is reached ({})", limit, returned);
return false;
}

return true;
}

protected Iterator<StoredMessageBatch> getBatchedIterator (MappedAsyncPagingIterable<MessageBatchEntity> resultSet) {
PageId pageId = new PageId(book.getId(), cassandraFilter.getPage());
// Updated limit should be smaller, since we already got entities from previous batch
cassandraFilter = createNextFilter(cassandraFilter, Math.max(limit - returned.get(),0));

PagedIterator<MessageBatchEntity> pagedIterator = new PagedIterator<>(
resultSet,
selectQueryExecutor,
converter::getEntity,
getRequestInfo());
FilteringIterator<MessageBatchEntity> filteringIterator = new FilteringIterator<>(
pagedIterator,
batchFilter::test);
// We need to store this iterator since
// it gives info whether or no iterator was halted
takeWhileIterator = new TakeWhileIterator<>(
filteringIterator,
iterationCondition);

return new ConvertingIterator<>(
takeWhileIterator, entity ->
MessagesWorker.mapMessageBatchEntity(pageId, entity));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
import com.exactpro.cradle.Order;
import com.exactpro.cradle.cassandra.dao.CassandraFilter;
import com.exactpro.cradle.cassandra.utils.FilterUtils;
Expand All @@ -33,7 +34,9 @@

public class CassandraGroupedMessageFilter implements CassandraFilter<GroupedMessageBatchEntity> {
private final String book, page, groupName;
private final Integer limit;

/** limit must be strictly positive ( limit greater than 0 ) */
private final int limit;
private final FilterForGreater<Instant> messageTimeFrom;
private final FilterForLess<Instant> messageTimeTo;
private final Order order;
Expand Down Expand Up @@ -66,7 +69,7 @@ public Select addConditions(Select select) {
if (messageTimeTo != null)
select = FilterUtils.timestampFilterToWhere(messageTimeTo.getOperation(), select, FIELD_FIRST_MESSAGE_DATE, FIELD_FIRST_MESSAGE_TIME, DATE_TO, TIME_TO);

if (limit != 0) {
if (limit > 0) {
select = select.limit(limit);
}

Expand Down Expand Up @@ -108,7 +111,7 @@ public String getGroupName() {
return groupName;
}

public Integer getLimit() {
public int getLimit() {
return limit;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.querybuilder.relation.MultiColumnRelationBuilder;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
import com.exactpro.cradle.Order;
import com.exactpro.cradle.cassandra.dao.CassandraFilter;
import com.exactpro.cradle.cassandra.utils.FilterUtils;
Expand Down Expand Up @@ -49,7 +50,8 @@ public class CassandraStoredMessageFilter implements CassandraFilter<MessageBatc
private final FilterForLess<Instant> messageTimeTo;
private final FilterForAny<Long> sequence;

private final Integer limit;
/** limit must be strictly positive ( limit greater than 0 ) */
private final int limit;

private final Order order;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@

public class FilteredGroupedMessageBatchIterator extends MappedIterator<StoredGroupedMessageBatch, StoredGroupedMessageBatch>
{
private FilterForGreater<Instant> filterFrom;
private FilterForLess<Instant> filterTo;
private final FilterForGreater<Instant> filterFrom;
private final FilterForLess<Instant> filterTo;


public FilteredGroupedMessageBatchIterator(Iterator<StoredGroupedMessageBatch> sourceIterator, GroupedMessageFilter filter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class FilteredMessageIterator extends MappedIterator<StoredMessageBatch, StoredMessage>
{
Expand Down
Loading

0 comments on commit c4bcb64

Please sign in to comment.