Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
denis.plotnikov committed Nov 12, 2024
1 parent 4224319 commit afe51e9
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public FilteredGroupedMessageBatchIterator(Iterator<StoredGroupedMessageBatch> s
}

@Override
Iterator<StoredGroupedMessageBatch> getTargetIterator(Iterator<StoredGroupedMessageBatch> sourceIterator)
Iterator<StoredGroupedMessageBatch> createTargetIterator(Iterator<StoredGroupedMessageBatch> sourceIterator)
{
Predicate<StoredGroupedMessageBatch> filterPredicate = createFilterPredicate();
return Streams.stream(sourceIterator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public FilteredMessageIterator(Iterator<StoredMessageBatch> batchIterator, Messa
}

@Override
Iterator<StoredMessage> getTargetIterator(Iterator<StoredMessageBatch> sourceIterator)
Iterator<StoredMessage> createTargetIterator(Iterator<StoredMessageBatch> sourceIterator)
{
Predicate<StoredMessage> filterPredicate = createFilterPredicate();
return Streams.stream(sourceIterator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ public MappedIterator(Iterator<S> sourceIterator, int limit, AtomicInteger retur
this.returned = returned;
}

abstract Iterator<T> getTargetIterator(Iterator<S> sourceIterator);
abstract Iterator<T> createTargetIterator(Iterator<S> sourceIterator);

@Override
public boolean hasNext()
{
if(targetIterator == null) {
targetIterator = getTargetIterator(sourceIterator);
targetIterator = createTargetIterator(sourceIterator);
}
return (limit <= 0 || returned.get() < limit) && targetIterator.hasNext();
}
Expand All @@ -48,7 +48,7 @@ public boolean hasNext()
public T next()
{
if(targetIterator == null) {
targetIterator = getTargetIterator(sourceIterator);
targetIterator = createTargetIterator(sourceIterator);
}
returned.incrementAndGet();
return targetIterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public abstract class BaseCradleCassandraTest {

private final long storeActionRejectionThreshold = new CoreStorageSettings().calculateStoreActionRejectionThreshold();

protected final Instant dataStart = Instant.now().minus(1, ChronoUnit.HOURS);
protected final Instant dataStart = Instant.now().minus(70, ChronoUnit.MINUTES);
protected final BookId bookId = generateBookId();

protected CqlSession session;
Expand All @@ -80,10 +80,11 @@ public abstract class BaseCradleCassandraTest {
private final PageToAdd page_a3 = new PageToAdd(DEFAULT_PAGE_PREFIX + 3, dataStart.plus(30, ChronoUnit.MINUTES), "");
private final PageToAdd page_a4 = new PageToAdd(DEFAULT_PAGE_PREFIX + 4, dataStart.plus(40, ChronoUnit.MINUTES), "");
private final PageToAdd page_a5 = new PageToAdd(DEFAULT_PAGE_PREFIX + 5, dataStart.plus(50, ChronoUnit.MINUTES), "");
private final PageToAdd page_a6 = new PageToAdd(DEFAULT_PAGE_PREFIX + 6, dataStart.plus(60, ChronoUnit.MINUTES), "");

private final List<PageToAdd> pagesToAdd = List.of(
page_r2, page_r1,
page_a0, page_a1, page_a2, page_a3, page_a4, page_a5
page_a0, page_a1, page_a2, page_a3, page_a4, page_a5, page_a6
);
private final List<PageId> pageIdToRemove = Stream.of(page_r2, page_r1)
.map(page -> new PageId(bookId, page.getStart(), page.getName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,21 @@ protected void generateData() {
b6.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 53, 15L));
b6.addMessage(generateMessage(SESSION_ALIAS6, Direction.SECOND, 55, 16L));

// page 6 contains 2 messages from batch 7, 3 from batch 8 and 1 from batch 1
// contains 1 group test_group3
// contains 1 session alias test_session_alias6
// Those batches are added to test different cases with limit 1 filter.
GroupedMessageBatchToStore b7 = new GroupedMessageBatchToStore(GROUP3_NAME, 1024, storeActionRejectionThreshold);
b7.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 55, 17L));
b7.addMessage(generateMessage(SESSION_ALIAS6, Direction.SECOND, 55, 18L));
b7.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 65, 17L));
b7.addMessage(generateMessage(SESSION_ALIAS6, Direction.SECOND, 65, 18L));

GroupedMessageBatchToStore b8 = new GroupedMessageBatchToStore(GROUP3_NAME, 1024, storeActionRejectionThreshold);
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 56, 18L));
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 58, 19L));
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.SECOND, 59, 20L));
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 66, 18L));
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 68, 19L));
b8.addMessage(generateMessage(SESSION_ALIAS6, Direction.SECOND, 69, 20L));

GroupedMessageBatchToStore b9 = new GroupedMessageBatchToStore(GROUP3_NAME, 1024, storeActionRejectionThreshold);
b9.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 60, 21L));
b9.addMessage(generateMessage(SESSION_ALIAS6, Direction.FIRST, 70, 21L));

List<GroupedMessageBatchToStore> data = List.of(b1, b2, b3, b4, b5, b6, b7, b8, b9);
for (GroupedMessageBatchToStore el : data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void getGroupedMessagesWithWideIntervalTest2() throws CradleStorageExcept
.groupName(GROUP3_NAME)
.bookId(bookId)
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(25, ChronoUnit.MINUTES))
.timestampTo().isLessThan(dataStart.plus(61, ChronoUnit.MINUTES))
.timestampTo().isLessThan(dataStart.plus(71, ChronoUnit.MINUTES))
.build();
var actual = storage.getGroupedMessageBatches(filter);
var resultAsList = Lists.newArrayList(actual.asIterable());
Expand Down Expand Up @@ -198,10 +198,10 @@ public void getGroupedMessageWithLeftTimeLimitInTheMiddleOfTheBatch() throws Cra
Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(45, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1 without time borders. First page first batch expected.")
@Test(description = "Get grouped messages with limit 1 without time borders.")
public void getGroupedMessageWithoutTimeLimits() throws CradleStorageException, IOException {
GroupedMessageFilter filter = GroupedMessageFilter.builder()
.groupName(GROUP2_NAME)
.groupName(GROUP3_NAME)
.bookId(bookId)
.limit(1)
.build();
Expand All @@ -211,8 +211,9 @@ public void getGroupedMessageWithoutTimeLimits() throws CradleStorageException,
var actual = storage.getGroupedMessageBatches(filter);
var resultAsList = Lists.newArrayList(actual.asIterable());
Assertions.assertThat(resultAsList.size()).isEqualTo(1);
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(1);
Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(5, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(4);
Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(25, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(29, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1 with time borders corresponding to no message.")
Expand All @@ -237,8 +238,8 @@ public void getGroupedMessageWithBothTimeLimitsResultInTheMiddleOfBatch() throws
GroupedMessageFilter filter = GroupedMessageFilter.builder()
.groupName(GROUP3_NAME)
.bookId(bookId)
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(57, ChronoUnit.MINUTES))
.timestampTo().isLessThan(dataStart.plus(59, ChronoUnit.MINUTES))
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(67, ChronoUnit.MINUTES))
.timestampTo().isLessThan(dataStart.plus(69, ChronoUnit.MINUTES))
.limit(1)
.build();

Expand All @@ -249,17 +250,17 @@ public void getGroupedMessageWithBothTimeLimitsResultInTheMiddleOfBatch() throws
Assertions.assertThat(resultAsList.size()).isEqualTo(1);
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(3);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(56, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(59, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(66, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(69, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1 with time borders corresponding batch8 but not equal to first and last timestamp")
public void getGroupedMessageWithBothTimeLimitsCoveringWholeBatch() throws CradleStorageException, IOException {
GroupedMessageFilter filter = GroupedMessageFilter.builder()
.groupName(GROUP3_NAME)
.bookId(bookId)
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(55 * 60 + 1, ChronoUnit.SECONDS))
.timestampTo().isLessThan(dataStart.plus(61, ChronoUnit.MINUTES))
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(65 * 60 + 1, ChronoUnit.SECONDS))
.timestampTo().isLessThan(dataStart.plus(71, ChronoUnit.MINUTES))
.limit(1)
.build();

Expand All @@ -270,8 +271,8 @@ public void getGroupedMessageWithBothTimeLimitsCoveringWholeBatch() throws Cradl
Assertions.assertThat(resultAsList.size()).isEqualTo(1);
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(3);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(56, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(59, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(66, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(69, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1 with start time limit corresponding to batch5 but less than its start time.")
Expand Down Expand Up @@ -299,7 +300,7 @@ public void getGroupedMessageWithLeftLimitBeforeLastBatchStart() throws CradleSt
GroupedMessageFilter filter = GroupedMessageFilter.builder()
.groupName(GROUP3_NAME)
.bookId(bookId)
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(56 * 60 - 1, ChronoUnit.SECONDS))
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(66 * 60 - 1, ChronoUnit.SECONDS))
.limit(1)
.build();

Expand All @@ -310,16 +311,16 @@ public void getGroupedMessageWithLeftLimitBeforeLastBatchStart() throws CradleSt
Assertions.assertThat(resultAsList.size()).isEqualTo(1);
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(3);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(56, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(59, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(66, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(69, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1 with start time limit is the same as start time of the batch")
public void getGroupedMessagesWithLimitInReverseStartTimesEqual() throws CradleStorageException, IOException {
GroupedMessageFilter filter = GroupedMessageFilter.builder()
.groupName(GROUP3_NAME)
.bookId(bookId)
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(60, ChronoUnit.MINUTES))
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(70, ChronoUnit.MINUTES))
.order(Order.REVERSE)
.limit(1)
.build();
Expand All @@ -331,16 +332,16 @@ public void getGroupedMessagesWithLimitInReverseStartTimesEqual() throws CradleS
Assertions.assertThat(resultAsList.size()).isEqualTo(1);
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(1);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(60, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(60, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(70, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(70, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1.")
public void getGroupedMessagesWithLimitInReverseStartTimeRequestLessThanStartTimeBatch() throws CradleStorageException, IOException {
GroupedMessageFilter filter = GroupedMessageFilter.builder()
.groupName(GROUP3_NAME)
.bookId(bookId)
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(59, ChronoUnit.MINUTES))
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(69, ChronoUnit.MINUTES))
.order(Order.REVERSE)
.limit(1)
.build();
Expand All @@ -352,17 +353,17 @@ public void getGroupedMessagesWithLimitInReverseStartTimeRequestLessThanStartTim
Assertions.assertThat(resultAsList.size()).isEqualTo(1);
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(1);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(60, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(60, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(70, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(70, ChronoUnit.MINUTES));
}

@Test(description = "Get grouped messages with limit 1.")
public void getGroupedMessagesWithLimitInReverseStartTimeRequestInTheMiddleOfTheBatch() throws CradleStorageException, IOException {
GroupedMessageFilter filter = GroupedMessageFilter.builder()
.groupName(GROUP3_NAME)
.bookId(bookId)
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(58, ChronoUnit.MINUTES))
.timestampTo().isLessThanOrEqualTo(dataStart.plus(59, ChronoUnit.MINUTES))
.timestampFrom().isGreaterThanOrEqualTo(dataStart.plus(68, ChronoUnit.MINUTES))
.timestampTo().isLessThanOrEqualTo(dataStart.plus(69, ChronoUnit.MINUTES))
.order(Order.REVERSE)
.limit(1)
.build();
Expand All @@ -375,7 +376,7 @@ public void getGroupedMessagesWithLimitInReverseStartTimeRequestInTheMiddleOfThe
System.out.println(resultAsList.get(0).getFirstTimestamp() + " " + resultAsList.get(0).getLastTimestamp());
Assertions.assertThat(resultAsList.get(0).getMessages().size()).isEqualTo(3);

Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(56, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(59, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getFirstTimestamp()).isEqualTo(dataStart.plus(66, ChronoUnit.MINUTES));
Assertions.assertThat(resultAsList.get(0).getLastTimestamp()).isEqualTo(dataStart.plus(69, ChronoUnit.MINUTES));
}
}

0 comments on commit afe51e9

Please sign in to comment.