diff --git a/build.gradle b/build.gradle index a6c9b20..f62177d 100644 --- a/build.gradle +++ b/build.gradle @@ -12,6 +12,7 @@ plugins { id "org.owasp.dependencycheck" version "9.0.9" id 'com.github.jk1.dependency-license-report' version '2.5' id "de.undercouch.download" version "5.4.0" + id "me.champeau.jmh" version "0.7.2" } ext { @@ -66,6 +67,14 @@ jar { } } +jmh { + jmhTimeout = "1m" + iterations = 3 + fork = 2 + warmupIterations = 3 + warmupForks = 2 +} + configurations.configureEach { resolutionStrategy { force "com.exactpro.th2:cradle-core:$cradleVersion" @@ -182,6 +191,9 @@ dependencies { because("Error collector serialise Instant values") } + jmh 'org.openjdk.jmh:jmh-core:1.37' + jmh 'org.openjdk.jmh:jmh-generator-annprocess:1.37' + testImplementation 'org.apache.commons:commons-lang3' testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2' testImplementation 'org.mockito:mockito-junit-jupiter:5.10.0' @@ -190,6 +202,7 @@ dependencies { test { useJUnitPlatform() + maxHeapSize = "3G" } application { diff --git a/src/jmh/java/com/exactpro/th2/estore/EventWrapperBenchmark.java b/src/jmh/java/com/exactpro/th2/estore/EventWrapperBenchmark.java new file mode 100644 index 0000000..33443fd --- /dev/null +++ b/src/jmh/java/com/exactpro/th2/estore/EventWrapperBenchmark.java @@ -0,0 +1,106 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.estore; + +import com.exactpro.cradle.CradleEntitiesFactory; +import com.exactpro.cradle.utils.CradleStorageException; +import com.exactpro.th2.common.grpc.Direction; +import com.exactpro.th2.common.grpc.Event; +import com.exactpro.th2.common.grpc.EventBatch; +import com.exactpro.th2.common.grpc.EventID; +import com.exactpro.th2.common.grpc.MessageID; +import com.google.protobuf.UnsafeByteOperations; +import org.apache.commons.lang3.RandomStringUtils; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; + +import java.time.Instant; +import java.util.Set; +import java.util.UUID; + +import static com.exactpro.cradle.CoreStorageSettings.DEFAULT_BOOK_REFRESH_INTERVAL_MILLIS; +import static com.exactpro.cradle.CradleStorage.DEFAULT_MAX_MESSAGE_BATCH_SIZE; +import static com.exactpro.cradle.CradleStorage.DEFAULT_MAX_TEST_EVENT_BATCH_SIZE; +import static com.exactpro.th2.common.utils.message.MessageUtilsKt.toTimestamp; +import static org.openjdk.jmh.annotations.Mode.Throughput; + +@State(Scope.Benchmark) +public class EventWrapperBenchmark { + private static final CradleEntitiesFactory FACTORY = new CradleEntitiesFactory(DEFAULT_MAX_MESSAGE_BATCH_SIZE, DEFAULT_MAX_TEST_EVENT_BATCH_SIZE, DEFAULT_BOOK_REFRESH_INTERVAL_MILLIS); + public static final String BOOK = "benchmark-book"; + private static final String SCOPE = "benchmark-scope"; + private static final String SESSION_ALIAS_PREFIX = "benchmark-alias-"; + private static final String EVENT_NAME_PREFIX = "benchmark-event-"; + private static final int CONTENT_SIZE = 500; + private static final int EVENT_NUMBER = 100; + private static final int SESSION_ALIAS_NUMBER = 5; + private static final int MESSAGES_PER_DIRECTION = 2; + @State(Scope.Thread) + public static class EventBatchState { + private EventBatch batch; + @Setup + public void init() { + EventID parentId = EventID.newBuilder() + .setBookName(BOOK) + .setScope(SCOPE) + .setStartTimestamp(toTimestamp(Instant.now())) + .setId(UUID.randomUUID().toString()) + .build(); + EventBatch.Builder batchBuilder = EventBatch.newBuilder() + .setParentEventId(parentId); + + int seqCounter = 0; + for (int eventIndex = 0; eventIndex < EVENT_NUMBER; eventIndex++) { + Event.Builder eventBuilder = Event.newBuilder() + .setId(EventID.newBuilder() + .setBookName(BOOK) + .setScope(SCOPE) + .setStartTimestamp(toTimestamp(Instant.now())) + .setId(UUID.randomUUID().toString())) + .setParentId(parentId) + .setName(EVENT_NAME_PREFIX + eventIndex) + .setBody(UnsafeByteOperations.unsafeWrap(RandomStringUtils.random(CONTENT_SIZE, true, true).getBytes())); + + for (int aliasIndex = 0; aliasIndex < SESSION_ALIAS_NUMBER; aliasIndex++) { + for (Direction direction : Set.of(Direction.FIRST, Direction.SECOND)) { + for (int msgIndex = 0; msgIndex < MESSAGES_PER_DIRECTION; msgIndex++) { + MessageID.Builder messageIdBuilder = MessageID.newBuilder() + .setBookName(BOOK) + .setDirection(direction) + .setTimestamp(toTimestamp(Instant.now())) + .setSequence(++seqCounter); + messageIdBuilder.getConnectionIdBuilder() + .setSessionAlias(SESSION_ALIAS_PREFIX + aliasIndex); + eventBuilder.addAttachedMessageIds(messageIdBuilder.build()); + } + } + } + batchBuilder.addEvents(eventBuilder.build()); + } + batch = batchBuilder.build(); + } + } + + @Benchmark + @BenchmarkMode({Throughput}) + public void benchmarkToCradleBatch(EventBatchState state) throws CradleStorageException { + IEventWrapper.ProtoEventWrapper.toCradleBatch(FACTORY, state.batch); + } +} diff --git a/src/main/java/com/exactpro/th2/estore/ErrorCollector.java b/src/main/java/com/exactpro/th2/estore/ErrorCollector.java index 25a8f7a..b6f8295 100644 --- a/src/main/java/com/exactpro/th2/estore/ErrorCollector.java +++ b/src/main/java/com/exactpro/th2/estore/ErrorCollector.java @@ -46,19 +46,19 @@ @SuppressWarnings("unused") public class ErrorCollector implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(ErrorCollector.class); - private static final Callback PERSIST_CALL_BACK = new LogCallBack(LOGGER, Level.TRACE); + private static final Callback PERSIST_CALL_BACK = new LogCallBack(LOGGER, Level.TRACE); private static final ThreadLocal OBJECT_MAPPER = ThreadLocal.withInitial(() -> new ObjectMapper() .registerModule(new JavaTimeModule()) // otherwise, type supported by JavaTimeModule will be serialized as array of date component .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) .setSerializationInclusion(NON_NULL)); - private static final Persistor DYMMY_PERSISTOR = new DymmyPersistor(); + private static final Persistor DYMMY_PERSISTOR = new DymmyPersistor(); private final ScheduledFuture drainFuture; private final CradleEntitiesFactory entitiesFactory; private final Lock lock = new ReentrantLock(); private volatile StoredTestEventId rootEvent; - private volatile Persistor persistor = DYMMY_PERSISTOR; + private volatile Persistor persistor = DYMMY_PERSISTOR; private Map errors = new HashMap<>(); public ErrorCollector(@NotNull ScheduledExecutorService executor, @@ -76,7 +76,7 @@ public ErrorCollector(@NotNull ScheduledExecutorService executor, this(executor, entitiesFactory, 1, TimeUnit.MINUTES); } - public void init(@NotNull Persistor persistor, StoredTestEventId rootEvent) { + public void init(@NotNull Persistor persistor, StoredTestEventId rootEvent) { this.persistor = requireNonNull(persistor, "Persistor factory can't be null"); this.rootEvent = requireNonNull(rootEvent, "Root event id can't be null"); } @@ -124,7 +124,7 @@ private void drain() { if (map.isEmpty()) { return; } Instant now = Instant.now(); - TestEventSingleToStore eventToStore = entitiesFactory.testEventBuilder() + IEventWrapper eventWrapper = IEventWrapper.wrap(entitiesFactory.testEventBuilder() .id(new StoredTestEventId(rootEvent.getBookId(), rootEvent.getScope(), now, Util.generateId())) .name("estore internal problem(s): " + calculateTotalQty(map.values())) .type("InternalError") @@ -133,9 +133,9 @@ private void drain() { .endTimestamp(now) // Content wrapped to list to use the same format as mstore .content(OBJECT_MAPPER.get().writeValueAsBytes(List.of(new BodyData(map)))) - .build(); + .build()); - persistor.persist(eventToStore, PERSIST_CALL_BACK); + persistor.persist(eventWrapper, PERSIST_CALL_BACK); } catch (Exception e) { LOGGER.error("Drain events task failure", e); } @@ -200,10 +200,10 @@ public void setQuantity(int quantity) { } } - private static class DymmyPersistor implements Persistor { + private static class DymmyPersistor implements Persistor { @Override - public void persist(TestEventToStore data, Callback callback) { + public void persist(IEventWrapper data, Callback callback) { LOGGER.warn( "{} isn't initialised", ErrorCollector.class.getSimpleName()); } } diff --git a/src/main/java/com/exactpro/th2/estore/EventPersistor.java b/src/main/java/com/exactpro/th2/estore/EventPersistor.java index 885bbf8..bf7c6af 100644 --- a/src/main/java/com/exactpro/th2/estore/EventPersistor.java +++ b/src/main/java/com/exactpro/th2/estore/EventPersistor.java @@ -18,7 +18,7 @@ import com.exactpro.cradle.CradleStorage; import com.exactpro.cradle.errors.BookNotFoundException; import com.exactpro.cradle.errors.PageNotFoundException; -import com.exactpro.cradle.testevents.TestEventToStore; +import com.exactpro.cradle.testevents.StoredTestEventIdUtils; import com.exactpro.cradle.utils.CradleStorageException; import com.exactpro.th2.taskutils.BlockingScheduledRetryableTaskQueue; import com.exactpro.th2.taskutils.FutureTracker; @@ -40,7 +40,7 @@ import static com.exactpro.th2.common.utils.ExecutorServiceUtilsKt.shutdownGracefully; import static java.util.Objects.requireNonNull; -public class EventPersistor implements Runnable, Persistor, AutoCloseable { +public class EventPersistor implements Runnable, Persistor, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(EventPersistor.class); private static final String THREAD_NAME_PREFIX = "event-persistor-thread-"; @@ -73,7 +73,7 @@ public EventPersistor(@NotNull ErrorCollector errorCollector, this.taskQueue = new BlockingScheduledRetryableTaskQueue<>(config.getMaxTaskCount(), config.getMaxTaskDataSize(), scheduler); this.futures = new FutureTracker<>(); this.metrics = new EventPersistorMetrics<>(taskQueue); - this.samplerService = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY); + this.samplerService = Executors.newScheduledThreadPool(3, THREAD_FACTORY); // FIXME: make thread count configurable } @@ -104,6 +104,7 @@ public void run() { while (!stopped) { try { ScheduledRetryableTask task = taskQueue.awaitScheduled(); + StoredTestEventIdUtils.track(task.getPayload().eventWrapper.id(), "awaited task"); try { processTask(task); } catch (Exception e) { @@ -112,6 +113,8 @@ public void run() { } catch (InterruptedException ie) { LOGGER.debug("Received InterruptedException. aborting"); break; + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); } } } @@ -124,7 +127,7 @@ private void logAndFail(ScheduledRetryableTask task, String err task.getPayload().fail(); } - private void resolveTaskError(ScheduledRetryableTask task, Throwable e) { + private void resolveTaskError(ScheduledRetryableTask task, Throwable e) throws CradleStorageException { if (e instanceof BookNotFoundException || e instanceof PageNotFoundException) { // If following exceptions were thrown there's no point in retrying String message = String.format("Can't retry after %s exception", e.getClass().getSimpleName()); @@ -136,29 +139,12 @@ private void resolveTaskError(ScheduledRetryableTask task, Thro @Override - public void persist(TestEventToStore event, Callback callback) { + public void persist(IEventWrapper event, Callback callback) { metrics.takeQueueMeasurements(); PersistenceTask task = new PersistenceTask(event, callback); - taskQueue.submit(new ScheduledRetryableTask<>(System.nanoTime(), maxTaskRetries, getEventContentSize(event), task)); - } - - - public long getEventContentSize(TestEventToStore event) { - if (event.isSingle()) - return event.asSingle().getContent().length; - else if (event.isBatch()) - return event.asBatch().getBatchSize(); - else - throw new IllegalArgumentException(String.format("Unknown event class (%s)", event.getClass().getSimpleName())); - } - - private int getEventCount(TestEventToStore event) { - if (event.isSingle()) - return 1; - else if (event.isBatch()) - return event.asBatch().getTestEventsCount(); - else - throw new IllegalArgumentException(String.format("Unknown event class (%s)", event.getClass().getSimpleName())); + StoredTestEventIdUtils.track(event.id(), "submitting task"); + taskQueue.submit(new ScheduledRetryableTask<>(System.nanoTime(), maxTaskRetries, event.size(), task)); + StoredTestEventIdUtils.track(event.id(), "submitted task"); } @@ -178,40 +164,65 @@ public void close () { void processTask(ScheduledRetryableTask task) throws IOException, CradleStorageException { - final TestEventToStore event = task.getPayload().eventBatch; + final IEventWrapper eventWrapper = task.getPayload().eventWrapper; final Histogram.Timer timer = metrics.startMeasuringPersistenceLatency(); - CompletableFuture result = cradleStorage.storeTestEventAsync(event) - .thenRun(() -> LOGGER.debug("Stored batch id '{}' parent id '{}'", event.getId(), event.getParentId())) + + StoredTestEventIdUtils.track(eventWrapper.id(), "wait conversion"); + CompletableFuture result = CompletableFuture.supplyAsync(() -> { + try(AutoCloseable ignored = StoredTestEventIdUtils.Statistic.measure("convert")) { + StoredTestEventIdUtils.track(eventWrapper.id(), "converting"); + return eventWrapper.get(); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + throw new RuntimeException(e); + } finally { + StoredTestEventIdUtils.track(eventWrapper.id(), "converted"); + } + }, samplerService) + .thenComposeAsync(event -> + { + try { + return cradleStorage.storeTestEventAsync(event) + .thenApply(ignored -> event); + } catch (IOException | CradleStorageException e) { + throw new RuntimeException(e); + } + }, samplerService) + .thenAccept(event -> LOGGER.debug("Stored batch id '{}' parent id '{}'", event.getId(), event.getParentId())) .whenCompleteAsync((unused, ex) -> { - timer.observeDuration(); - if (ex != null) { - resolveTaskError(task, ex); - } else { - taskQueue.complete(task); - metrics.updateEventMeasurements(getEventCount(event), task.getPayloadSize()); - task.getPayload().complete(); + try { + timer.observeDuration(); + if (ex != null) { + resolveTaskError(task, ex); + } else { + taskQueue.complete(task); + metrics.updateEventMeasurements(eventWrapper.count(), task.getPayloadSize()); + task.getPayload().complete(); + StoredTestEventIdUtils.track(eventWrapper.id(), "completed"); + } + } catch (CradleStorageException e) { + throw new RuntimeException(e); } - } - , samplerService + }, samplerService ); futures.track(result); } - private void logAndRetry(ScheduledRetryableTask task, Throwable e) { + private void logAndRetry(ScheduledRetryableTask task, Throwable e) throws CradleStorageException { metrics.registerPersistenceFailure(); int retriesDone = task.getRetriesDone() + 1; final PersistenceTask persistenceTask = task.getPayload(); - final TestEventToStore eventBatch = persistenceTask.eventBatch; + final IEventWrapper eventBatch = persistenceTask.eventWrapper; if (task.getRetriesLeft() > 0) { errorCollector.collect("Failed to store an event batch, rescheduling"); LOGGER.error("Failed to store the event batch id '{}', {} retries left, rescheduling", - eventBatch.getId(), + eventBatch.get().getId(), task.getRetriesLeft(), e); taskQueue.retry(task); @@ -221,7 +232,7 @@ private void logAndRetry(ScheduledRetryableTask task, Throwable logAndFail(task, "Failed to store an event batch, aborting after several executions", String.format("Failed to store the event batch id '%s', aborting after %d executions", - eventBatch.getId(), + eventBatch.get().getId(), retriesDone), e); } @@ -229,22 +240,22 @@ private void logAndRetry(ScheduledRetryableTask task, Throwable static class PersistenceTask { - final TestEventToStore eventBatch; - final Callback callback; + final IEventWrapper eventWrapper; + final Callback callback; - PersistenceTask(TestEventToStore eventBatch, Callback callback) { - this.eventBatch = eventBatch; + PersistenceTask(IEventWrapper eventWrapper, Callback callback) { + this.eventWrapper = eventWrapper; this.callback = callback; } void complete () { if (callback != null) - callback.onSuccess(eventBatch); + callback.onSuccess(eventWrapper); } void fail() { if (callback != null) - callback.onFail(eventBatch); + callback.onFail(eventWrapper); } } } diff --git a/src/main/java/com/exactpro/th2/estore/EventProcessor.java b/src/main/java/com/exactpro/th2/estore/EventProcessor.java index cc7f590..065a11c 100644 --- a/src/main/java/com/exactpro/th2/estore/EventProcessor.java +++ b/src/main/java/com/exactpro/th2/estore/EventProcessor.java @@ -15,21 +15,14 @@ package com.exactpro.th2.estore; -import com.exactpro.cradle.BookId; import com.exactpro.cradle.CradleEntitiesFactory; -import com.exactpro.cradle.testevents.TestEventBatchToStore; -import com.exactpro.cradle.testevents.TestEventBatchToStoreBuilder; -import com.exactpro.cradle.testevents.TestEventSingleToStore; -import com.exactpro.cradle.testevents.TestEventSingleToStoreBuilder; -import com.exactpro.cradle.testevents.TestEventToStore; -import com.exactpro.cradle.utils.CradleStorageException; +import com.exactpro.cradle.testevents.StoredTestEventIdUtils; import com.exactpro.th2.common.grpc.Event; import com.exactpro.th2.common.grpc.EventBatch; -import com.exactpro.th2.common.grpc.EventBatchOrBuilder; -import com.exactpro.th2.common.grpc.EventOrBuilder; -import com.exactpro.th2.common.schema.message.*; import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation; -import com.exactpro.th2.common.util.StorageUtils; +import com.exactpro.th2.common.schema.message.MessageRouter; +import com.exactpro.th2.common.schema.message.QueueAttribute; +import com.exactpro.th2.common.schema.message.SubscriberMonitor; import com.google.protobuf.TextFormat; import io.prometheus.client.Histogram; import org.jetbrains.annotations.NotNull; @@ -41,8 +34,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; -import java.util.stream.Collectors; +import static com.exactpro.cradle.testevents.StoredTestEventIdUtils.track; import static com.google.protobuf.TextFormat.shortDebugString; import static java.util.Objects.requireNonNull; @@ -52,14 +45,14 @@ public class EventProcessor implements AutoCloseable { private final ErrorCollector errorCollector; private final MessageRouter router; private final CradleEntitiesFactory entitiesFactory; - private final Persistor persistor; + private final Persistor persistor; private SubscriberMonitor monitor; private final EventProcessorMetrics metrics; public EventProcessor(@NotNull ErrorCollector errorCollector, @NotNull MessageRouter router, @NotNull CradleEntitiesFactory entitiesFactory, - @NotNull Persistor persistor) { + @NotNull Persistor persistor) { this.errorCollector = requireNonNull(errorCollector, "Error collector can't be null"); this.router = requireNonNull(router, "Message router can't be null"); this.entitiesFactory = requireNonNull(entitiesFactory, "Cradle entity factory can't be null"); @@ -85,6 +78,7 @@ public void start() { public void process(EventBatch eventBatch, Confirmation confirmation) { + StoredTestEventIdUtils.track(eventBatch.getEvents(0).getId().getId(), "start process"); List events = eventBatch.getEventsList(); if (events.isEmpty()) { if (LOGGER.isWarnEnabled()) @@ -112,12 +106,12 @@ public void close() { private void storeSingleEvents(List events, Confirmation confirmation) { - Callback persistorCallback = new ProcessorCallback(confirmation, events.size()); + Callback persistorCallback = new ProcessorCallback(confirmation, events.size()); events.forEach((event) -> { try { - TestEventSingleToStore cradleEventSingle = toCradleEvent(event); - persist(cradleEventSingle, persistorCallback); + IEventWrapper eventWrapper = IEventWrapper.wrap(entitiesFactory, event); + persist(eventWrapper, persistorCallback); } catch (Exception e) { errorCollector.collect("Failed to process a single event"); if (LOGGER.isErrorEnabled()) { @@ -133,15 +127,15 @@ private void storeSingleEvents(List events, Confirmation confirmation) { private void storeEventBatch(EventBatch eventBatch, Confirmation confirmation) { try { - TestEventBatchToStore cradleBatch = toCradleBatch(eventBatch); - persist(cradleBatch, new Callback<>() { + IEventWrapper eventWrapper = IEventWrapper.wrap(entitiesFactory, eventBatch); + persist(eventWrapper, new Callback<>() { @Override - public void onSuccess(TestEventToStore data) { + public void onSuccess(IEventWrapper data) { confirm(confirmation); } @Override - public void onFail(TestEventToStore data) { + public void onFail(IEventWrapper data) { reject(confirmation); } }); @@ -179,52 +173,15 @@ private boolean reject(Confirmation confirmation) { } - private void persist(TestEventToStore data, Callback callback) throws Exception { + private void persist(IEventWrapper eventWrapper, Callback callback) throws Exception { try (Histogram.Timer ignored = metrics.startMeasuringPersistenceLatency()) { - persistor.persist(data, callback); + persistor.persist(eventWrapper, callback); } } - - public TestEventSingleToStore toCradleEvent(EventOrBuilder protoEvent) throws CradleStorageException { - TestEventSingleToStoreBuilder builder = entitiesFactory - .testEventBuilder() - .id(ProtoUtil.toCradleEventID(protoEvent.getId())) - .name(protoEvent.getName()) - .type(protoEvent.getType()) - .success(ProtoUtil.isSuccess(protoEvent.getStatus())) - .messages(protoEvent.getAttachedMessageIdsList().stream() - .map(ProtoUtil::toStoredMessageId) - .collect(Collectors.toSet())) - .content(protoEvent.getBody().toByteArray()); - if (protoEvent.hasParentId()) { - builder.parentId(ProtoUtil.toCradleEventID(protoEvent.getParentId())); - } - if (protoEvent.hasEndTimestamp()) { - builder.endTimestamp(StorageUtils.toInstant(protoEvent.getEndTimestamp())); - } - return builder.build(); - } - - - private TestEventBatchToStore toCradleBatch(EventBatchOrBuilder protoEventBatch) throws CradleStorageException { - TestEventBatchToStoreBuilder cradleEventBatch = entitiesFactory.testEventBatchBuilder() - .id( - new BookId(protoEventBatch.getParentEventId().getBookName()), - protoEventBatch.getParentEventId().getScope(), - StorageUtils.toInstant(ProtoUtil.getMinStartTimestamp(protoEventBatch.getEventsList())), - Util.generateId() - ) - .parentId(ProtoUtil.toCradleEventID(protoEventBatch.getParentEventId())); - for (Event protoEvent : protoEventBatch.getEventsList()) { - cradleEventBatch.addTestEvent(toCradleEvent(protoEvent)); - } - return cradleEventBatch.build(); - } - - private class ProcessorCallback implements Callback { + private class ProcessorCallback implements Callback { private final AtomicBoolean responded = new AtomicBoolean(false); - private final Map completed = new ConcurrentHashMap<>(); + private final Map completed = new ConcurrentHashMap<>(); private final Confirmation confirmation; private final int eventCount; @@ -234,18 +191,18 @@ public ProcessorCallback(Confirmation confirmation, int eventCount) { } @Override - public void onSuccess(TestEventToStore persistedEvent) { - completed.put(persistedEvent, persistedEvent); + public void onSuccess(IEventWrapper eventWrapper) { + completed.put(eventWrapper, eventWrapper); if (completed.size() == eventCount) { checkAndRespond(responded, () -> confirm(confirmation)); } } @Override - public void onFail(TestEventToStore persistedEvent) { + public void onFail(IEventWrapper eventWrapper) { checkAndRespond(responded, () -> reject(confirmation)); - if (persistedEvent != null) { - completed.put(persistedEvent, persistedEvent); + if (eventWrapper != null) { + completed.put(eventWrapper, eventWrapper); } } diff --git a/src/main/java/com/exactpro/th2/estore/EventStore.java b/src/main/java/com/exactpro/th2/estore/EventStore.java index e17cbb7..6e6733c 100644 --- a/src/main/java/com/exactpro/th2/estore/EventStore.java +++ b/src/main/java/com/exactpro/th2/estore/EventStore.java @@ -105,15 +105,15 @@ private static StoredTestEventId createAndStoreRootEvent(EventPersistor persisto boxConfiguration.getBoxName(), now, Util.generateId()); - TestEventSingleToStore eventToStore = entitiesFactory.testEventBuilder() + IEventWrapper eventWrapper = IEventWrapper.wrap(entitiesFactory.testEventBuilder() .id(rootEventId) .name(boxConfiguration.getBoxName() + " " + now) .type("Microservice") .success(true) .endTimestamp(now) .content(new byte[0]) - .build(); - persistor.persist(eventToStore, new LogCallBack(LOGGER, Level.INFO)); + .build()); + persistor.persist(eventWrapper, new LogCallBack(LOGGER, Level.INFO)); return rootEventId; } diff --git a/src/main/java/com/exactpro/th2/estore/IEventWrapper.java b/src/main/java/com/exactpro/th2/estore/IEventWrapper.java new file mode 100644 index 0000000..35530ae --- /dev/null +++ b/src/main/java/com/exactpro/th2/estore/IEventWrapper.java @@ -0,0 +1,231 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.estore; + +import com.exactpro.cradle.BookId; +import com.exactpro.cradle.CradleEntitiesFactory; +import com.exactpro.cradle.testevents.TestEventBatchToStore; +import com.exactpro.cradle.testevents.TestEventBatchToStoreBuilder; +import com.exactpro.cradle.testevents.TestEventSingleToStore; +import com.exactpro.cradle.testevents.TestEventSingleToStoreBuilder; +import com.exactpro.cradle.testevents.TestEventToStore; +import com.exactpro.cradle.utils.CradleStorageException; +import com.exactpro.th2.common.grpc.Event; +import com.exactpro.th2.common.grpc.EventBatchOrBuilder; +import com.exactpro.th2.common.grpc.EventOrBuilder; +import com.exactpro.th2.common.util.StorageUtils; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public interface IEventWrapper { + int count(); + int size(); + String id(); // FIXME: remove after debugging + TestEventToStore get() throws CradleStorageException; + + static IEventWrapper wrap(TestEventSingleToStore event) { + return new CradleEventSingleWrapper(event); + } + static IEventWrapper wrap(TestEventBatchToStore batch) { + return new CradleEventBatchWrapper(batch); + } + static IEventWrapper wrap(CradleEntitiesFactory entitiesFactory, EventOrBuilder event) { + return new ProtoEventSingleWrapper(entitiesFactory, event); + } + static IEventWrapper wrap(CradleEntitiesFactory entitiesFactory, EventBatchOrBuilder batch) { + return new ProtoEventBatchWrapper(entitiesFactory, batch); + } + + class CradleEventSingleWrapper implements IEventWrapper { + private final TestEventSingleToStore event; + + private CradleEventSingleWrapper(TestEventSingleToStore event) { + this.event = event; + } + + @Override + public int count() { + return 1; + } + + @Override + public int size() { + return event.getSize(); + } + + @Override + public String id() { + return event.getId().getId(); + } + + @Override + public TestEventToStore get() { + return event; + } + } + class CradleEventBatchWrapper implements IEventWrapper { + private final TestEventBatchToStore batch; + + private CradleEventBatchWrapper(TestEventBatchToStore batch) { + this.batch = batch; + } + + @Override + public int count() { + return batch.getTestEvents().size(); + } + + @Override + public int size() { + return batch.getBatchSize(); + } + + @Override + public String id() { + return batch.getTestEvents().iterator().next().getId().getId(); + } + + @Override + public TestEventToStore get() { + return batch; + } + } + class ProtoEventSingleWrapper extends ProtoEventWrapper { + private final EventOrBuilder event; + private ProtoEventSingleWrapper(CradleEntitiesFactory entitiesFactory, EventOrBuilder event) { + super(entitiesFactory); + this.event = event; + } + + @Override + public int count() { + return 1; + } + + @Override + public int size() { + return event.getBody().size(); + } + + @Override + public String id() { + return event.getId().getId(); + } + + @Override + protected TestEventToStore convert() throws CradleStorageException { + return toCradleEvent(entitiesFactory, event); + } + } + class ProtoEventBatchWrapper extends ProtoEventWrapper { + private final EventBatchOrBuilder batch; + + private ProtoEventBatchWrapper(CradleEntitiesFactory entitiesFactory, EventBatchOrBuilder batch) { + super(entitiesFactory); + this.batch = batch; + } + + @Override + public int count() { + return batch.getEventsCount(); + } + + @Override + public int size() { + return batch.getEventsList().stream() + .mapToInt(event -> event.getBody().size()) + .sum(); + } + + @Override + public String id() { + return batch.getEvents(0).getId().getId(); + } + + @Override + protected TestEventToStore convert() throws CradleStorageException { + return toCradleBatch(entitiesFactory, batch); + } + } + + abstract class ProtoEventWrapper implements IEventWrapper { + protected final CradleEntitiesFactory entitiesFactory; + private final Lock lock = new ReentrantLock(); + private volatile TestEventToStore value; + + protected ProtoEventWrapper(CradleEntitiesFactory entitiesFactory) { + this.entitiesFactory = entitiesFactory; + } + + public abstract int count(); + public abstract int size(); + public abstract String id(); // FIXME: remove after debugging + public TestEventToStore get() throws CradleStorageException { + if (value != null) { + return value; + } + lock.lock(); + try { + if (value != null) { + return value; + } + value = convert(); + return value; + } finally { + lock.unlock(); + } + } + protected abstract TestEventToStore convert() throws CradleStorageException; + + static TestEventSingleToStore toCradleEvent(CradleEntitiesFactory entitiesFactory, EventOrBuilder event) throws CradleStorageException { + TestEventSingleToStoreBuilder builder = entitiesFactory + .testEventBuilder() + .id(ProtoUtil.toCradleEventID(event.getId())) + .name(event.getName()) + .type(event.getType()) + .success(ProtoUtil.isSuccess(event.getStatus())) + .content(event.getBody().toByteArray()); + event.getAttachedMessageIdsList().stream() + .map(ProtoUtil::toStoredMessageId) + .forEach(builder::message); + if (event.hasParentId()) { + builder.parentId(ProtoUtil.toCradleEventID(event.getParentId())); + } + if (event.hasEndTimestamp()) { + builder.endTimestamp(StorageUtils.toInstant(event.getEndTimestamp())); + } + return builder.build(); + } + + static TestEventBatchToStore toCradleBatch(CradleEntitiesFactory entitiesFactory, EventBatchOrBuilder batch) throws CradleStorageException { + TestEventBatchToStoreBuilder cradleEventBatch = entitiesFactory.testEventBatchBuilder() + .id( + new BookId(batch.getParentEventId().getBookName()), + batch.getParentEventId().getScope(), + StorageUtils.toInstant(ProtoUtil.getMinStartTimestamp(batch.getEventsList())), + Util.generateId() + ) + .parentId(ProtoUtil.toCradleEventID(batch.getParentEventId())); + for (Event protoEvent : batch.getEventsList()) { + cradleEventBatch.addTestEvent(toCradleEvent(entitiesFactory, protoEvent)); + } + return cradleEventBatch.build(); + } + } +} + diff --git a/src/main/java/com/exactpro/th2/estore/LogCallBack.java b/src/main/java/com/exactpro/th2/estore/LogCallBack.java index a3cbc33..0864131 100644 --- a/src/main/java/com/exactpro/th2/estore/LogCallBack.java +++ b/src/main/java/com/exactpro/th2/estore/LogCallBack.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,8 +15,6 @@ */ package com.exactpro.th2.estore; -import com.exactpro.cradle.testevents.TestEventBatchToStore; -import com.exactpro.cradle.testevents.TestEventToStore; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.event.Level; @@ -24,7 +22,7 @@ import static java.util.Objects.requireNonNull; -class LogCallBack implements Callback { +class LogCallBack implements Callback { private final LoggingEventBuilder loggingEventBuilder; private final boolean loggerEnabled; @@ -34,26 +32,16 @@ public LogCallBack(@NotNull Logger logger, Level level) { this.loggerEnabled = logger.isEnabledForLevel(level); } @Override - public void onSuccess(TestEventToStore data) { + public void onSuccess(IEventWrapper data) { if (loggerEnabled) { - if (data.isBatch()) { - TestEventBatchToStore batch = data.asBatch(); - loggingEventBuilder.log("Stored the {} test event batch with errors, events: {}, size: {} bytes", batch.getId(), batch.getTestEventsCount(), batch.getBatchSize()); - } else { - loggingEventBuilder.log("Stored the {} test event with error", data.getId()); - } + loggingEventBuilder.log("Stored the {} test event single/batch with error", data.id()); } } @Override - public void onFail(TestEventToStore data) { + public void onFail(IEventWrapper data) { if (loggerEnabled) { - if (data.isBatch()) { - TestEventBatchToStore batch = data.asBatch(); - loggingEventBuilder.log("Storing of the {} test event batch with errors failed, events: {}, size: {} bytes", batch.getId(), batch.getTestEventsCount(), batch.getBatchSize()); - } else { - loggingEventBuilder.log("Storing of the {} test event with error failed", data.getId()); - } + loggingEventBuilder.log("Storing of the {} test event single/batch with error failed", data.id()); } } } diff --git a/src/main/java/com/exactpro/th2/estore/ProtoUtil.java b/src/main/java/com/exactpro/th2/estore/ProtoUtil.java index 848d0f3..8d0e0e1 100644 --- a/src/main/java/com/exactpro/th2/estore/ProtoUtil.java +++ b/src/main/java/com/exactpro/th2/estore/ProtoUtil.java @@ -17,6 +17,7 @@ package com.exactpro.th2.estore; import com.exactpro.cradle.BookId; +import com.exactpro.cradle.Direction; import com.exactpro.cradle.messages.StoredMessageId; import com.exactpro.cradle.testevents.StoredTestEventId; import com.exactpro.th2.common.grpc.Event; @@ -26,6 +27,7 @@ import com.exactpro.th2.common.util.StorageUtils; import com.google.protobuf.Timestamp; +import java.time.Instant; import java.util.Collection; import java.util.Comparator; @@ -49,7 +51,7 @@ public static StoredTestEventId toCradleEventID(EventIDOrBuilder protoEventID) { new BookId(protoEventID.getBookName()), protoEventID.getScope(), StorageUtils.toInstant(protoEventID.getStartTimestamp()), - String.valueOf(protoEventID.getId()) + protoEventID.getId() ); } diff --git a/src/test/java/com/exactpro/th2/estore/IntegrationTestEventProcessor.java b/src/test/java/com/exactpro/th2/estore/IntegrationTestEventProcessor.java index 992514f..399fb2c 100644 --- a/src/test/java/com/exactpro/th2/estore/IntegrationTestEventProcessor.java +++ b/src/test/java/com/exactpro/th2/estore/IntegrationTestEventProcessor.java @@ -53,6 +53,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -80,13 +81,13 @@ public void reject() {} @Override public void confirm() {} }; - private static final int ITERATIONS = 1_000; + private static final double ITERATIONS = 2_000D; private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationTestEventProcessor.class); private static final AtomicInteger EVENT_COUNTER = new AtomicInteger(); private static final String TEST_BOOK = "test_book"; private static final String TEST_SCOPE = "test-scope"; public static final String TEST_SESSION_GROUP = "test-session-group"; - public static final String TEST_SESSION_ALIAS = "test-session-alias"; + public static final String TEST_SESSION_ALIAS_PREFIX = "test-session-alias"; private EventProcessor processor; @@ -143,20 +144,20 @@ public void init(@Th2AppFactory CommonFactory appFactory, public void testRootEvents(Event.Status status, int batchSize, int contentSize, - int attachedMessages, + int sessionAliases, CradleManager manager) throws InterruptedException, CradleStorageException, IOException { LOGGER.info("testRootEvents - status: {}, batch size: {}, content size: {}, attached message ids: {}", status, batchSize, contentSize, - attachedMessages); + sessionAliases); BlockingQueue queue = new LinkedBlockingQueue<>(); - String bookName = "testRootEvents_" + status + '_' + batchSize + '_' + contentSize + '_' + attachedMessages; + String bookName = "testRootEvents_" + status + '_' + batchSize + '_' + contentSize + '_' + sessionAliases; createBook(manager.getStorage(), bookName); for (TestStage stage : TestStage.values()) { - generateRootEvents(bookName, status, batchSize, contentSize, attachedMessages) - .limit(ITERATIONS) + generateRootEvents(bookName, status, batchSize, contentSize, sessionAliases) + .limit((long) ITERATIONS) .forEach((packet) -> { TestConfirmation confirmation = new TestConfirmation(packet.statistic); queue.add(confirmation); @@ -186,15 +187,15 @@ public void testRootEvents(Event.Status status, public void testEvents(Event.Status status, int batchSize, int contentSize, - int attachedMessages, + int sessionAliases, CradleManager manager) throws InterruptedException, CradleStorageException, IOException { LOGGER.info("testEvents - status: {}, batch size: {}, content size: {}, attached message ids: {}", status, batchSize, contentSize, - attachedMessages); + sessionAliases); BlockingQueue queue = new LinkedBlockingQueue<>(); - String bookName = "testEvents_" + status + '_' + batchSize + '_' + contentSize + '_' + attachedMessages; + String bookName = "testEvents_" + status + '_' + batchSize + '_' + contentSize + '_' + sessionAliases; createBook(manager.getStorage(), bookName); com.exactpro.th2.common.grpc.Event rootEvent = fillEvent(Event.start(), Event.Status.PASSED, "root", bookName, 0, 0) .toProto(bookName, TEST_SCOPE); @@ -205,19 +206,26 @@ public void testEvents(Event.Status status, for (TestStage stage : TestStage.values()) { LOGGER.info("Sending - stage: {}", stage); long start = System.nanoTime(); - generateEventBatch(rootEventId, status, batchSize, contentSize, attachedMessages) - .limit(ITERATIONS) - .forEach((packet) -> { + List packets = generateEventBatch(rootEventId, status, batchSize, contentSize, sessionAliases) + .limit((long) ITERATIONS) + .collect(Collectors.toList()); + + long prepared = System.nanoTime(); + LOGGER.info("Prepared - stage: {}, batch rate: {}, event rate: {}", + stage, + ITERATIONS / (prepared - start) * 1_000_000_000, + ITERATIONS / (prepared - start) * 1_000_000_000 * batchSize); + packets.forEach((packet) -> { TestConfirmation confirmation = new TestConfirmation(packet.statistic); queue.add(confirmation); processor.process(packet.batch, confirmation); }); long sent = System.nanoTime(); - LOGGER.info("Waiting - stage: {}, batch rate: {}, event rate: {}", + LOGGER.info("Sent - stage: {}, batch rate: {}, event rate: {}", stage, - ((double) ITERATIONS) / (sent - start) * 1_000_000_000, - ((double) ITERATIONS) / (sent - start) * 1_000_000_000 * batchSize); + ITERATIONS / (sent - prepared) * 1_000_000_000, + ITERATIONS / (sent - prepared) * 1_000_000_000 * batchSize); List statistics = new ArrayList<>(); for (int i = 0; i < ITERATIONS; i++) { statistics.add(requireNonNull(queue.poll(1, TimeUnit.DAYS), @@ -228,8 +236,8 @@ public void testEvents(Event.Status status, LOGGER.info("Complete - stage: {}, batch rate: {}, event rate: {}", stage, - ((double) ITERATIONS) / (System.nanoTime() - start) * 1_000_000_000, - ((double) ITERATIONS) / (System.nanoTime() - start) * 1_000_000_000 * batchSize); + ITERATIONS / (System.nanoTime() - prepared) * 1_000_000_000, + ITERATIONS / (System.nanoTime() - prepared) * 1_000_000_000 * batchSize); LOGGER.info("{} (min/median/max) - preparation: {}, packToProto: {}, processing: {}, rate: {}", stage, calculate(statistics, batchSize, Statistic::getPreparation), @@ -272,7 +280,7 @@ private Stream generateEventBatch(EventID parentEventId, Event.Status status, int batchSize, int contentSize, - int attachedMessages) { + int sessionAliases) { String book = parentEventId.getBookName(); return Stream.generate(() -> { long start = System.currentTimeMillis(); @@ -281,7 +289,7 @@ private Stream generateEventBatch(EventID parentEventId, "main", book, contentSize, - attachedMessages); + sessionAliases); for (int item = 0; item < batchSize; item++) { fillEvent(mainEventBuilder.addSubEventWithSamePeriod(), @@ -289,7 +297,7 @@ private Stream generateEventBatch(EventID parentEventId, "sub", book, contentSize, - attachedMessages); + sessionAliases); } long toProto = System.currentTimeMillis(); try { @@ -301,28 +309,29 @@ private Stream generateEventBatch(EventID parentEventId, }); } - private static Event fillEvent(Event eventBuilder, Event.Status status, String name, String book, int contentSize, int attachedMessages) { + private static Event fillEvent(Event eventBuilder, Event.Status status, String name, String book, int contentSize, int sessionAliases) { eventBuilder.name(name + '-' + EVENT_COUNTER.incrementAndGet()) .type(name) .status(status); if (contentSize > 0) { eventBuilder.bodyData(genrateMessage(contentSize)); } - for (int i = 0; i < attachedMessages; i++) { - MessageID.Builder msgBuilder = MessageID.newBuilder(); - msgBuilder.setBookName(book) - .setDirection(Direction.SECOND) - .setTimestamp(MessageUtilsKt.toTimestamp(Instant.now())) - .setSequence(i + 1); - msgBuilder.getConnectionIdBuilder() - .setSessionGroup(TEST_SESSION_GROUP) - .setSessionAlias(TEST_SESSION_ALIAS); - eventBuilder.messageID(msgBuilder.build()); + for (int i = 0; i < sessionAliases; i++) { + for (Direction direction : Set.of(Direction.FIRST, Direction.SECOND)) { + MessageID.Builder msgBuilder = MessageID.newBuilder(); + msgBuilder.setBookName(book) + .setDirection(direction) + .setTimestamp(MessageUtilsKt.toTimestamp(Instant.now())) + .setSequence(i + 1); + msgBuilder.getConnectionIdBuilder() + .setSessionGroup(TEST_SESSION_GROUP) + .setSessionAlias(TEST_SESSION_ALIAS_PREFIX + i); + eventBuilder.messageID(msgBuilder.build()); + } } return eventBuilder; } - private static String calculate(List statistics, double divider, Function getFunc) { List values = statistics.stream().map(getFunc).map(v -> v.doubleValue() / divider).sorted().collect(Collectors.toList()); return String.valueOf(values.stream().min(Double::compareTo).orElseThrow()) + '/' + @@ -341,7 +350,8 @@ private static Stream provideEventsArgs() { // status, batch size, content size, attached messages // Arguments.of(Event.Status.PASSED, 1, 500, 100), // Arguments.of(Event.Status.PASSED, 10, 500, 100), - Arguments.of(Event.Status.PASSED, 100, 500, 100) + Arguments.of(Event.Status.PASSED, 100, 500, 5), + Arguments.of(Event.Status.PASSED, 5, 2500, 4) // Arguments.of(Event.Status.PASSED, 1000, 500, 100) ); } @@ -376,7 +386,7 @@ private Statistic(long start, long toProto, long toPacket, long toSend, long com this.preparation = toProto - start; this.packToProto = toPacket - toProto; - this.processing = complete - toPacket; + this.processing = complete - toSend; this.rate = 1_000_000_000D/this.processing; } diff --git a/src/test/java/com/exactpro/th2/estore/TestErrorCollector.java b/src/test/java/com/exactpro/th2/estore/TestErrorCollector.java index ae453e7..b79e614 100644 --- a/src/test/java/com/exactpro/th2/estore/TestErrorCollector.java +++ b/src/test/java/com/exactpro/th2/estore/TestErrorCollector.java @@ -19,7 +19,6 @@ import com.exactpro.cradle.CradleEntitiesFactory; import com.exactpro.cradle.testevents.StoredTestEventId; import com.exactpro.cradle.testevents.TestEventSingleToStore; -import com.exactpro.cradle.testevents.TestEventToStore; import org.apache.commons.lang3.RandomUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -38,6 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -62,7 +62,7 @@ class TestErrorCollector { @Mock private ScheduledFuture future; @Mock - private Persistor persistor; + private Persistor persistor; private final CradleEntitiesFactory entitiesFactory = spy(new CradleEntitiesFactory(1_024^2, 1_024^2, 30_000L)); private final StoredTestEventId rootEvent = new StoredTestEventId( new BookId("test-book"), @@ -103,12 +103,12 @@ void testCollect() throws Exception { taskCaptor.getValue().run(); - ArgumentCaptor eventBatchCaptor = ArgumentCaptor.forClass(TestEventToStore.class); + ArgumentCaptor eventBatchCaptor = ArgumentCaptor.forClass(IEventWrapper.class); verify(persistor).persist(eventBatchCaptor.capture(), any()); verify(entitiesFactory).testEventBuilder(); - assertTrue(eventBatchCaptor.getValue().isSingle()); - TestEventSingleToStore event = eventBatchCaptor.getValue().asSingle(); + assertInstanceOf(IEventWrapper.ProtoEventSingleWrapper.class, eventBatchCaptor.getValue()); + TestEventSingleToStore event = eventBatchCaptor.getValue().get().asSingle(); assertEquals("estore internal problem(s): 3", event.getName()); assertEquals("InternalError", event.getType()); @@ -133,12 +133,12 @@ void testLogAndCollect() throws Exception { taskCaptor.getValue().run(); - ArgumentCaptor eventBatchCaptor = ArgumentCaptor.forClass(TestEventToStore.class); + ArgumentCaptor eventBatchCaptor = ArgumentCaptor.forClass(IEventWrapper.class); verify(persistor).persist(eventBatchCaptor.capture(), any()); verify(entitiesFactory).testEventBuilder(); - assertTrue(eventBatchCaptor.getValue().isSingle()); - TestEventSingleToStore event = eventBatchCaptor.getValue().asSingle(); + assertInstanceOf(IEventWrapper.ProtoEventSingleWrapper.class, eventBatchCaptor.getValue()); + TestEventSingleToStore event = eventBatchCaptor.getValue().get().asSingle(); assertEquals("estore internal problem(s): 1", event.getName()); assertEquals("InternalError", event.getType()); @@ -159,12 +159,12 @@ void testClose() throws Exception { verify(future).cancel(eq(true)); - ArgumentCaptor eventBatchCaptor = ArgumentCaptor.forClass(TestEventToStore.class); + ArgumentCaptor eventBatchCaptor = ArgumentCaptor.forClass(IEventWrapper.class); verify(persistor).persist(eventBatchCaptor.capture(), any()); verify(entitiesFactory).testEventBuilder(); - assertTrue(eventBatchCaptor.getValue().isSingle()); - TestEventSingleToStore event = eventBatchCaptor.getValue().asSingle(); + assertInstanceOf(IEventWrapper.ProtoEventSingleWrapper.class, eventBatchCaptor.getValue()); + TestEventSingleToStore event = eventBatchCaptor.getValue().get().asSingle(); assertEquals("estore internal problem(s): 1", event.getName()); assertEquals("InternalError", event.getType()); diff --git a/src/test/java/com/exactpro/th2/estore/TestEventPersistor.java b/src/test/java/com/exactpro/th2/estore/TestEventPersistor.java index 69b7bce..f1fe1e9 100644 --- a/src/test/java/com/exactpro/th2/estore/TestEventPersistor.java +++ b/src/test/java/com/exactpro/th2/estore/TestEventPersistor.java @@ -52,6 +52,7 @@ import java.util.stream.Collectors; import static com.exactpro.th2.common.utils.ExecutorServiceUtilsKt.shutdownGracefully; +import static com.exactpro.th2.estore.IEventWrapper.wrap; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -87,7 +88,7 @@ public class TestEventPersistor { private final ErrorCollector errorCollector = mock(ErrorCollector.class); @SuppressWarnings("unchecked") - private final Callback callback = mock(Callback.class); + private final Callback callback = mock(Callback.class); private EventPersistor persistor; private CradleEntitiesFactory cradleEntitiesFactory; @@ -117,7 +118,7 @@ public void testSingleEvent() throws IOException, CradleStorageException, Interr StoredTestEventId parentId = new StoredTestEventId(bookId, SCOPE, timestamp, "test-parent"); TestEventSingleToStore event = createEvent(bookId, SCOPE, parentId, "test-id-1", "test-event", timestamp, 12); - persistor.persist(event, callback); + persistor.persist(wrap(event), callback); Thread.sleep(EVENT_PERSIST_TIMEOUT); @@ -137,7 +138,7 @@ public void testSingleEvent() throws IOException, CradleStorageException, Interr public void testEventBatch() throws IOException, CradleStorageException, InterruptedException { TestEventBatchToStore eventBatch = createEventBatch1(); - persistor.persist(eventBatch, callback); + persistor.persist(wrap(eventBatch), callback); Thread.sleep(EVENT_PERSIST_TIMEOUT * 2); @@ -162,7 +163,7 @@ public void testEventResubmitted() throws IOException, CradleStorageException, I .thenReturn(CompletableFuture.completedFuture(null)); TestEventBatchToStore eventBatch = createEventBatch1(); - persistor.persist(eventBatch, callback); + persistor.persist(wrap(eventBatch), callback); Thread.sleep(EVENT_PERSIST_TIMEOUT * 2); @@ -186,8 +187,8 @@ public void testEventResubmittedLimitedTimes() throws IOException, CradleStorage os = os.thenReturn(CompletableFuture.failedFuture(new IOException("event persistence failure"))); os.thenReturn(CompletableFuture.completedFuture(null)); - TestEventToStore eventBatch = createEventBatch1(); - persistor.persist(eventBatch, callback); + TestEventBatchToStore eventBatch = createEventBatch1(); + persistor.persist(wrap(eventBatch), callback); Thread.sleep(EVENT_PERSIST_TIMEOUT * (MAX_EVENT_PERSIST_RETRIES + 1)); @@ -215,7 +216,7 @@ public void testEventCountQueueing() throws IOException, CradleStorageException // create executor with thread pool size > event queue size to avoid free thread waiting final ExecutorService executor = Executors.newFixedThreadPool(MAX_EVENT_QUEUE_TASK_SIZE * 2); - TestEventToStore eventBatch = createEventBatch1(); + TestEventBatchToStore eventBatch = createEventBatch1(); when(storageMock.storeTestEventAsync(any())) .thenAnswer((ignored) -> CompletableFuture.runAsync(() -> pause(storeExecutionTime), executor)); @@ -223,7 +224,7 @@ public void testEventCountQueueing() throws IOException, CradleStorageException // setup producer thread StartableRunnable runnable = StartableRunnable.of(() -> { for (int i = 0; i < totalEvents; i++) - persistor.persist(eventBatch, callback); + persistor.persist(wrap(eventBatch), callback); }); new Thread(runnable).start(); @@ -260,7 +261,7 @@ public void testEventSizeQueueing() throws IOException, CradleStorageException { Instant timestamp = Instant.now(); StoredTestEventId parentId = new StoredTestEventId(bookId, SCOPE, timestamp, "test-parent"); - TestEventToStore[] events = new TestEventToStore[totalEvents]; + TestEventSingleToStore[] events = new TestEventSingleToStore[totalEvents]; for (int i = 0; i < totalEvents; i++) events[i] = createEvent(bookId, SCOPE, parentId, "test-id-1", "test-event", timestamp, 12, content); @@ -270,7 +271,7 @@ public void testEventSizeQueueing() throws IOException, CradleStorageException { // setup producer thread StartableRunnable runnable = StartableRunnable.of(() -> { for (int i = 0; i < totalEvents; i++) - persistor.persist(events[i], callback); + persistor.persist(wrap(events[i]), callback); }); new Thread(runnable).start(); @@ -307,9 +308,9 @@ private void pause(long millis) { } } - private static Map mapOf(Collection collection) { + private static Map mapOf(Collection collection) { if (collection != null) - return collection.stream().collect(Collectors.toMap(BatchedStoredTestEvent::getId, v -> v)); + return collection.stream().collect(Collectors.toMap(TestEventSingleToStore::getId, v -> v)); else return Collections.emptyMap(); } @@ -342,9 +343,9 @@ private static void assertStoredEvent(TestEventToStore expected, TestEventToStor } if (expected.isBatch()) { - assertEquals(expected.asBatch().getTestEventsCount(), actual.asBatch().getTestEventsCount()); - Map expectedEvents = mapOf(expected.asBatch().getTestEvents()); - Map actualEvents = mapOf(actual.asBatch().getTestEvents()); + assertEquals(expected.asBatch().getTestEventsCount(), actual.asBatch().getTestEvents().size()); + Map expectedEvents = mapOf(expected.asBatch().getTestEvents()); + Map actualEvents = mapOf(actual.asBatch().getTestEvents()); for (TestEventSingle expectedSingle : expectedEvents.values()) { StoredTestEventId id = expectedSingle.getId(); diff --git a/src/test/java/com/exactpro/th2/estore/TestEventProcessor.java b/src/test/java/com/exactpro/th2/estore/TestEventProcessor.java index 5e7e619..a88dc4f 100644 --- a/src/test/java/com/exactpro/th2/estore/TestEventProcessor.java +++ b/src/test/java/com/exactpro/th2/estore/TestEventProcessor.java @@ -49,6 +49,7 @@ import static com.exactpro.th2.common.util.StorageUtils.toInstant; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; @@ -68,7 +69,7 @@ public class TestEventProcessor { private static final Random RANDOM = new Random(); @SuppressWarnings("unchecked") - private final Persistor persistorMock = mock(Persistor.class); + private final Persistor persistorMock = mock(Persistor.class); @SuppressWarnings("unchecked") private final MessageRouter routerMock = mock(MessageRouter.class); private final Confirmation confirmation = mock(Confirmation.class); @@ -99,10 +100,10 @@ public void testRootEventDelivery() throws Exception { verify(cradleEntitiesFactory, never()).testEventBatchBuilder(); - ArgumentCaptor capture = ArgumentCaptor.forClass(TestEventSingleToStore.class); + ArgumentCaptor capture = ArgumentCaptor.forClass(IEventWrapper.class); verify(persistorMock, times(1)).persist(capture.capture(), any()); - TestEventSingleToStore capturedValue = capture.getValue(); + TestEventSingleToStore capturedValue = capture.getValue().get().asSingle(); assertNotNull(capturedValue, "Captured stored root event"); assertStoredEvent(event, capturedValue); } @@ -115,10 +116,10 @@ public void testSubEventDelivery() throws Exception { verify(cradleEntitiesFactory, never()).testEventBatchBuilder(); - ArgumentCaptor capture = ArgumentCaptor.forClass(TestEventSingleToStore.class); + ArgumentCaptor capture = ArgumentCaptor.forClass(IEventWrapper.class); verify(persistorMock, times(1)).persist(capture.capture(), any()); - TestEventSingleToStore capturedValue = capture.getValue(); + TestEventSingleToStore capturedValue = capture.getValue().get().asSingle(); assertNotNull(capturedValue, "Captured stored sub-event"); assertStoredEvent(event, capturedValue); } @@ -133,17 +134,17 @@ public void testMultipleSubEventsDelivery() throws Exception { verify(cradleEntitiesFactory, never()).testEventBatchBuilder(); - ArgumentCaptor capture = ArgumentCaptor.forClass(TestEventSingleToStore.class); + ArgumentCaptor capture = ArgumentCaptor.forClass(IEventWrapper.class); verify(persistorMock, times(2)).persist(capture.capture(), any()); - List capturedValues = capture.getAllValues(); + List capturedValues = capture.getAllValues(); - TestEventSingleToStore capturedValue = capture.getAllValues().get(0); + TestEventSingleToStore capturedValue = capture.getAllValues().get(0).get().asSingle(); assertNotNull(capturedValue, "Captured first stored event"); - assertStoredEvent(first, capturedValues.get(0)); + assertStoredEvent(first, capturedValues.get(0).get().asSingle()); - capturedValue = capture.getAllValues().get(1); + capturedValue = capture.getAllValues().get(1).get().asSingle(); assertNotNull(capturedValue, "Captured second stored event"); - assertStoredEvent(second, capturedValues.get(1)); + assertStoredEvent(second, capturedValues.get(1).get().asSingle()); } @Test @@ -158,20 +159,20 @@ public void testRootEventBatchConfirmation() throws Exception { verify(cradleEntitiesFactory, never()).testEventBatchBuilder(); - ArgumentCaptor eventCapture = ArgumentCaptor.forClass(TestEventSingleToStore.class); - ArgumentCaptor> callbackCapture = ArgumentCaptor.forClass(Callback.class); + ArgumentCaptor eventCapture = ArgumentCaptor.forClass(IEventWrapper.class); + ArgumentCaptor> callbackCapture = ArgumentCaptor.forClass(Callback.class); verify(persistorMock, times(2)).persist(eventCapture.capture(), callbackCapture.capture()); - List capturedEvents = eventCapture.getAllValues(); - List> capturedCallbacks = callbackCapture.getAllValues(); + List capturedEvents = eventCapture.getAllValues(); + List> capturedCallbacks = callbackCapture.getAllValues(); - TestEventSingleToStore capturedValue = capturedEvents.get(0); + TestEventSingleToStore capturedValue = capturedEvents.get(0).get().asSingle(); assertNotNull(capturedValue, "Captured first stored event"); - assertStoredEvent(first, capturedEvents.get(0)); + assertStoredEvent(first, capturedValue); - capturedValue = capturedEvents.get(1); + capturedValue = capturedEvents.get(1).get().asSingle(); assertNotNull(capturedValue, "Captured second stored event"); - assertStoredEvent(second, capturedEvents.get(1)); + assertStoredEvent(second, capturedValue); // trigger and verify confirmations capturedCallbacks.get(0).onSuccess(capturedEvents.get(0)); @@ -192,20 +193,20 @@ public void testRootEventBatchRejection() throws Exception { verify(cradleEntitiesFactory, never()).testEventBatchBuilder(); - ArgumentCaptor eventCapture = ArgumentCaptor.forClass(TestEventSingleToStore.class); - ArgumentCaptor> callbackCapture = ArgumentCaptor.forClass(Callback.class); + ArgumentCaptor eventCapture = ArgumentCaptor.forClass(IEventWrapper.class); + ArgumentCaptor> callbackCapture = ArgumentCaptor.forClass(Callback.class); verify(persistorMock, times(2)).persist(eventCapture.capture(), callbackCapture.capture()); - List capturedEvents = eventCapture.getAllValues(); - List> capturedCallbacks = callbackCapture.getAllValues(); + List capturedEvents = eventCapture.getAllValues(); + List> capturedCallbacks = callbackCapture.getAllValues(); - TestEventSingleToStore capturedValue = capturedEvents.get(0); + TestEventSingleToStore capturedValue = capturedEvents.get(0).get().asSingle(); assertNotNull(capturedValue, "Captured first stored event"); - assertStoredEvent(first, capturedEvents.get(0)); + assertStoredEvent(first, capturedValue); - capturedValue = capturedEvents.get(1); + capturedValue = capturedEvents.get(1).get().asSingle(); assertNotNull(capturedValue, "Captured second stored event"); - assertStoredEvent(second, capturedEvents.get(1)); + assertStoredEvent(second, capturedValue); // trigger and verify confirmations capturedCallbacks.get(0).onFail(capturedEvents.get(0)); @@ -227,20 +228,20 @@ public void testRootEventBatchRejectionOnSingleFailure() throws Exception { verify(cradleEntitiesFactory, never()).testEventBatchBuilder(); - ArgumentCaptor eventCapture = ArgumentCaptor.forClass(TestEventSingleToStore.class); - ArgumentCaptor> callbackCapture = ArgumentCaptor.forClass(Callback.class); + ArgumentCaptor eventCapture = ArgumentCaptor.forClass(IEventWrapper.class); + ArgumentCaptor> callbackCapture = ArgumentCaptor.forClass(Callback.class); verify(persistorMock, times(2)).persist(eventCapture.capture(), callbackCapture.capture()); - List capturedEvents = eventCapture.getAllValues(); - List> capturedCallbacks = callbackCapture.getAllValues(); + List capturedEvents = eventCapture.getAllValues(); + List> capturedCallbacks = callbackCapture.getAllValues(); - TestEventSingleToStore capturedValue = capturedEvents.get(0); + TestEventSingleToStore capturedValue = capturedEvents.get(0).get().asSingle(); assertNotNull(capturedValue, "Captured first stored event"); - assertStoredEvent(first, capturedEvents.get(0)); + assertStoredEvent(first, capturedValue); - capturedValue = capturedEvents.get(1); + capturedValue = capturedEvents.get(1).get().asSingle(); assertNotNull(capturedValue, "Captured second stored event"); - assertStoredEvent(second, capturedEvents.get(1)); + assertStoredEvent(second, capturedValue); // trigger and verify confirmations capturedCallbacks.get(0).onSuccess(capturedEvents.get(0)); @@ -300,10 +301,10 @@ public void testRootEventWithMessagesDelivery() throws Exception { verify(cradleEntitiesFactory, never()).testEventBatchBuilder(); - ArgumentCaptor captureEvent = ArgumentCaptor.forClass(TestEventSingleToStore.class); + ArgumentCaptor captureEvent = ArgumentCaptor.forClass(IEventWrapper.class); verify(persistorMock, times(1)).persist(captureEvent.capture(), any()); - TestEventSingleToStore capturedValue = captureEvent.getValue(); + TestEventSingleToStore capturedValue = captureEvent.getValue().get().asSingle(); assertNotNull(capturedValue, "Captured stored event"); assertStoredEvent(first, capturedValue); } @@ -311,10 +312,10 @@ public void testRootEventWithMessagesDelivery() throws Exception { private void assertTestEventBatchToStore(EventID parentId, Event first, Event second) throws Exception { eventStore.process(deliveryOf(parentId, first, second), confirmation); - ArgumentCaptor capture = ArgumentCaptor.forClass(TestEventBatchToStore.class); + ArgumentCaptor capture = ArgumentCaptor.forClass(IEventWrapper.class); verify(persistorMock, times(1)).persist(capture.capture(), any()); - TestEventBatchToStore testEventBatchToStore = capture.getValue(); + TestEventBatchToStore testEventBatchToStore = capture.getValue().get().asBatch(); assertEquals( new StoredTestEventId( new BookId(parentId.getBookName()), @@ -325,7 +326,7 @@ private void assertTestEventBatchToStore(EventID parentId, Event first, Event se testEventBatchToStore.getParentId() ); List expectedEvents = List.of(first, second); - List actualEvents = new ArrayList<>(testEventBatchToStore.getTestEvents()); + List actualEvents = new ArrayList<>(testEventBatchToStore.getTestEvents()); assertEquals(expectedEvents.size(), actualEvents.size(), "Event batch size"); for (int i = 0; i < expectedEvents.size(); i++) { assertStoredEvent(expectedEvents.get(i), actualEvents.get(i)); diff --git a/src/test/resources/log4j2.properties b/src/test/resources/log4j2.properties index 3806fe5..1dc8ea5 100755 --- a/src/test/resources/log4j2.properties +++ b/src/test/resources/log4j2.properties @@ -17,21 +17,26 @@ name=Th2Logger # Console appender configuration appender.console.type=Console -appender.console.name=consoleLogger +appender.console.name=CONSOLE appender.console.layout.type=PatternLayout #appender.console.layout.pattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-6p [%-15t] %c - %m%n -appender.console.layout.pattern=%d{dd MMM yyyy HH:mm:ss,SSS} - %m%n +appender.console.layout.pattern=%d{dd MMM yyyy HH:mm:ss.SSS} [%-15t] - %m%n + +appender.file.type = File +appender.file.name = LOGFILE +appender.file.append = false +appender.file.fileName = results/log4j2.log +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{dd MMM yyyy HH:mm:ss.SSS} [%-15t] - %m%n logger.th2.name=com.exactpro.th2 logger.th2.level=ERROR logger.datastax.name=com.datastax -logger.datastax.level=OFF +logger.datastax.level=ERROR logger.test.name=com.exactpro.th2.estore.IntegrationTestEventProcessor logger.test.level=INFO # Root logger level -rootLogger.level=OFF -# Root logger referring to console appender -rootLogger.appenderRef.stdout.ref=consoleLogger \ No newline at end of file +rootLogger=ERROR, CONSOLE, LOGFILE \ No newline at end of file