diff --git a/README.md b/README.md index 4316482..ff85316 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Overview (5.2.3) +# Overview (5.3.0) Message store (mstore) is an important th2 component responsible for storing raw messages into Cradle. Please refer to [Cradle repository] (https://github.com/th2-net/cradleapi/blob/master/README.md) for more details. This component has a pin for listening messages via MQ. @@ -115,6 +115,12 @@ spec: This is a list of supported features provided by libraries. Please see more details about this feature via [link](https://github.com/th2-net/th2-common-j#configuration-formats). +## 5.3.0 + +* Mstore publishes event with aggregated statistics about internal errors into event router periodically +* Updated common: `5.6.0-dev` +* Added common-utils: `2.2.2-dev` + ## 5.2.4 * Migrated to the cradle version with fixed load pages where `removed` field is null problem. diff --git a/build.gradle b/build.gradle index 9bc0360..7717f6c 100644 --- a/build.gradle +++ b/build.gradle @@ -9,14 +9,15 @@ plugins { id 'java-library' id 'application' id 'com.palantir.docker' version '0.25.0' - id "org.owasp.dependencycheck" version "8.3.1" + id "org.owasp.dependencycheck" version "8.4.0" id 'com.github.jk1.dependency-license-report' version '2.5' id "de.undercouch.download" version "5.4.0" } ext { cradleVersion = '5.1.4-dev' - commonVersion = '5.4.1-dev' + commonVersion = '5.6.0-dev' + commonUtilsVersion = '2.2.2-dev' } group = 'com.exactpro.th2' @@ -88,7 +89,7 @@ tasks.withType(Sign).configureEach { } // disable running task 'initializeSonatypeStagingRepository' on a gitlab tasks.configureEach { task -> - if (task.name.equals('initializeSonatypeStagingRepository') && + if (task.name == 'initializeSonatypeStagingRepository' && !(project.hasProperty('sonatypeUsername') && project.hasProperty('sonatypePassword')) ) { task.enabled = false @@ -159,6 +160,9 @@ dependencies { api platform('com.exactpro.th2:bom:4.5.0') implementation "com.exactpro.th2:common:$commonVersion" + implementation("com.exactpro.th2:common-utils:$commonUtilsVersion") { + because("executor service utils is used") + } implementation 'com.exactpro.th2:task-utils:0.1.1' implementation 'com.google.protobuf:protobuf-java-util' diff --git a/gradle.properties b/gradle.properties index 31f05df..d49ae86 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -release_version=5.2.4 +release_version=5.3.0 description='th2 mstore component' vcs_url=https://github.com/th2-net/th2-mstore \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/mstore/AbstractMessageProcessor.java b/src/main/java/com/exactpro/th2/mstore/AbstractMessageProcessor.java index 0d10e2f..bc2ba71 100644 --- a/src/main/java/com/exactpro/th2/mstore/AbstractMessageProcessor.java +++ b/src/main/java/com/exactpro/th2/mstore/AbstractMessageProcessor.java @@ -56,6 +56,7 @@ public abstract class AbstractMessageProcessor implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessageProcessor.class); + protected final ErrorCollector errorCollector; protected final CradleStorage cradleStorage; private final ScheduledExecutorService drainExecutor = Executors.newSingleThreadScheduledExecutor(); protected final Map sessions = new ConcurrentHashMap<>(); @@ -69,11 +70,13 @@ public abstract class AbstractMessageProcessor implements AutoCloseable { private final ManualDrainTrigger manualDrain; public AbstractMessageProcessor( + @NotNull ErrorCollector errorCollector, @NotNull CradleStorage cradleStorage, @NotNull Persistor persistor, @NotNull Configuration configuration, @NotNull Integer prefetchCount ) { + this.errorCollector = requireNonNull(errorCollector, "Error collector can't be null"); this.cradleStorage = requireNonNull(cradleStorage, "Cradle storage can't be null"); this.persistor = requireNonNull(persistor, "Persistor can't be null"); this.configuration = requireNonNull(configuration, "'Configuration' parameter"); @@ -103,13 +106,13 @@ public void close() { future.cancel(false); } } catch (RuntimeException ex) { - LOGGER.error("Cannot cancel drain task", ex); + errorCollector.collect(LOGGER, "Cannot cancel drain task", ex); } try { drain(true); } catch (RuntimeException ex) { - LOGGER.error("Cannot drain left batches during shutdown", ex); + errorCollector.collect(LOGGER, "Cannot drain left batches during shutdown", ex); } try { @@ -122,27 +125,27 @@ public void close() { } } } catch (InterruptedException e) { - LOGGER.error("Cannot gracefully shutdown drain executor", e); + errorCollector.collect(LOGGER, "Cannot gracefully shutdown drain executor", e); Thread.currentThread().interrupt(); } catch (RuntimeException e) { - LOGGER.error("Cannot gracefully shutdown drain executor", e); + errorCollector.collect(LOGGER, "Cannot gracefully shutdown drain executor", e); } } - private static void confirm(Confirmation confirmation) { + protected void confirm(Confirmation confirmation) { try { confirmation.confirm(); } catch (Exception e) { - LOGGER.error("Exception confirming message", e); + errorCollector.collect(LOGGER, "Exception confirming message", e); } } - private static void reject(Confirmation confirmation) { + protected void reject(Confirmation confirmation) { try { confirmation.reject(); } catch (Exception e) { - LOGGER.error("Exception rejecting message", e); + errorCollector.collect(LOGGER, "Exception rejecting message", e); } } @@ -316,23 +319,14 @@ private void drain(boolean force) { protected void persist(ConsolidatedBatch data) { GroupedMessageBatchToStore batch = data.batch; try (Histogram.Timer ignored = metrics.startMeasuringPersistenceLatency()) { - persistor.persist(batch, new Callback<>() { - @Override - public void onSuccess(GroupedMessageBatchToStore batch) { - data.confirmations.forEach(AbstractMessageProcessor::confirm); - } - - @Override - public void onFail(GroupedMessageBatchToStore batch) { - data.confirmations.forEach(AbstractMessageProcessor::reject); - } - }); + persistor.persist(batch, new ProcessorCallback(data)); } catch (Exception e) { + errorCollector.collect("Exception storing batch for group \"" + batch.getGroup() + '\"'); if (LOGGER.isErrorEnabled()) { LOGGER.error("Exception storing batch for group \"{}\": {}", batch.getGroup(), formatMessageBatchToStore(batch, false), e); } - data.confirmations.forEach(AbstractMessageProcessor::reject); + data.confirmations.forEach(this::reject); } } @@ -432,4 +426,22 @@ public static String identifySessionGroup(String sessionGroup, String sessionAli return (sessionGroup == null || sessionGroup.isBlank()) ? requireNonBlank(sessionAlias, "'Session alias' parameter can not be blank") : sessionGroup; } } + + private class ProcessorCallback implements Callback { + private final ConsolidatedBatch data; + + public ProcessorCallback(@NotNull ConsolidatedBatch data) { + this.data = requireNonNull(data, "Data can't be bull"); + } + + @Override + public void onSuccess(GroupedMessageBatchToStore batch) { + data.confirmations.forEach(AbstractMessageProcessor.this::confirm); + } + + @Override + public void onFail(GroupedMessageBatchToStore batch) { + data.confirmations.forEach(AbstractMessageProcessor.this::reject); + } + } } diff --git a/src/main/java/com/exactpro/th2/mstore/ErrorCollector.java b/src/main/java/com/exactpro/th2/mstore/ErrorCollector.java new file mode 100644 index 0000000..4bfa3e0 --- /dev/null +++ b/src/main/java/com/exactpro/th2/mstore/ErrorCollector.java @@ -0,0 +1,177 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.mstore; + +import com.exactpro.th2.common.event.Event; +import com.exactpro.th2.common.event.Event.Status; +import com.exactpro.th2.common.event.IBodyData; +import com.exactpro.th2.common.grpc.EventBatch; +import com.exactpro.th2.common.grpc.EventID; +import com.exactpro.th2.common.schema.message.MessageRouter; +import com.fasterxml.jackson.annotation.JsonCreator; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static java.util.Objects.requireNonNull; + +@SuppressWarnings("unused") +public class ErrorCollector implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(ErrorCollector.class); + private final ScheduledFuture drainFuture; + private final MessageRouter eventRouter; + private final EventID rootEvent; + private final Lock lock = new ReentrantLock(); + private Map errors = new HashMap<>(); + + public ErrorCollector(@NotNull ScheduledExecutorService executor, + @NotNull MessageRouter eventRouter, + @NotNull EventID rootEvent, + long period, + @NotNull TimeUnit unit) { + this.eventRouter = requireNonNull(eventRouter, "Event router can't be null"); + this.rootEvent = requireNonNull(rootEvent, "Root event can't be null"); + requireNonNull(unit, "Unit can't be null"); + this.drainFuture = requireNonNull(executor, "Executor can't be null") + .scheduleAtFixedRate(this::drain, period, period, unit); + } + + public ErrorCollector(@NotNull ScheduledExecutorService executor, + @NotNull MessageRouter eventRouter, + @NotNull EventID rootEvent) { + this(executor, eventRouter, rootEvent, 1, TimeUnit.MINUTES); + } + + /** + * Log error and call the {@link #collect(String)}} method + * @param error is used as key identifier. Avoid put a lot of unique values + */ + public void collect(Logger logger, String error, Throwable cause) { + logger.error(error, cause); + collect(error); + } + + /** + * @param error is used as key identifier. Avoid put a lot of unique values + */ + public void collect(String error) { + lock.lock(); + try { + errors.compute(error, (key, metadata) -> { + if (metadata == null) { + return new ErrorMetadata(); + } + metadata.inc(); + return metadata; + }); + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws Exception { + drainFuture.cancel(true); + drain(); + } + + private void drain() { + try { + Map map = clear(); + if (map.isEmpty()) { return; } + + eventRouter.sendAll(Event.start() + .name("mstore internal problem(s): " + calculateTotalQty(map.values())) + .type("InternalError") + .status(Status.FAILED) + .bodyData(new BodyData(map)) + .toBatchProto(rootEvent)); + + } catch (IOException | RuntimeException e) { + LOGGER.error("Drain events task failure", e); + } + } + + private Map clear() { + lock.lock(); + try { + Map result = errors; + errors = new HashMap<>(); + return result; + } finally { + lock.unlock(); + } + } + + private static int calculateTotalQty(Collection errors) { + return errors.stream() + .map(ErrorMetadata::getQuantity) + .reduce(0, Integer::sum); + } + + private static class BodyData implements IBodyData { + private final Map errors; + @JsonCreator + private BodyData(Map errors) { + this.errors = errors; + } + public Map getErrors() { + return errors; + } + } + + private static class ErrorMetadata { + private final Instant firstDate = Instant.now(); + private Instant lastDate; + private int quantity = 1; + + public void inc() { + quantity += 1; + lastDate = Instant.now(); + } + + public Instant getFirstDate() { + return firstDate; + } + + public Instant getLastDate() { + return lastDate; + } + + public void setLastDate(Instant lastDate) { + this.lastDate = lastDate; + } + + public int getQuantity() { + return quantity; + } + + public void setQuantity(int quantity) { + this.quantity = quantity; + } + } +} diff --git a/src/main/java/com/exactpro/th2/mstore/MessagePersistor.java b/src/main/java/com/exactpro/th2/mstore/MessagePersistor.java index 214613d..139937a 100644 --- a/src/main/java/com/exactpro/th2/mstore/MessagePersistor.java +++ b/src/main/java/com/exactpro/th2/mstore/MessagePersistor.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -19,6 +19,7 @@ import com.exactpro.cradle.errors.BookNotFoundException; import com.exactpro.cradle.errors.PageNotFoundException; import com.exactpro.cradle.messages.GroupedMessageBatchToStore; +import com.exactpro.th2.common.utils.ExecutorServiceUtilsKt; import com.exactpro.th2.taskutils.BlockingScheduledRetryableTaskQueue; import com.exactpro.th2.taskutils.FutureTracker; import com.exactpro.th2.taskutils.RetryScheduler; @@ -47,6 +48,7 @@ public class MessagePersistor implements Runnable, AutoCloseable, Persistor> taskQueue; private final int maxTaskRetries; private final CradleStorage cradleStorage; + private final ErrorCollector errorCollector; private final FutureTracker futures; private final MessagePersistorMetrics> metrics; @@ -55,13 +57,14 @@ public class MessagePersistor implements Runnable, AutoCloseable, Persistor config.getRetryDelayBase() * 1_000_000 * (r + 1)); + public MessagePersistor(@NotNull ErrorCollector errorCollector, @NotNull CradleStorage cradleStorage, @NotNull Configuration config) { + this(errorCollector, cradleStorage, (r) -> config.getRetryDelayBase() * 1_000_000 * (r + 1), config); } - public MessagePersistor(@NotNull Configuration config, @NotNull CradleStorage cradleStorage, RetryScheduler scheduler) { + public MessagePersistor(@NotNull ErrorCollector errorCollector, @NotNull CradleStorage cradleStorage, RetryScheduler scheduler, @NotNull Configuration config) { this.maxTaskRetries = config.getMaxRetryCount(); this.cradleStorage = requireNonNull(cradleStorage, "Cradle storage can't be null"); + this.errorCollector = requireNonNull(errorCollector, "Error collector can't be null"); this.taskQueue = new BlockingScheduledRetryableTaskQueue<>(config.getMaxTaskCount(), config.getMaxTaskDataSize(), scheduler); this.futures = new FutureTracker<>(); @@ -72,6 +75,7 @@ public MessagePersistor(@NotNull Configuration config, @NotNull CradleStorage cr public void start() throws InterruptedException { this.stopped = false; synchronized (signal) { + // FIXME: control resource new Thread(this, THREAD_NAME_PREFIX + this.hashCode()).start(); signal.wait(); } @@ -132,7 +136,9 @@ void processTask(ScheduledRetryableTask> task, Throwable e) { if (e instanceof BookNotFoundException || e instanceof PageNotFoundException) { // If following exceptions were thrown there's no point in retrying - logAndFail(task, String.format("Can't retry after %s exception", e.getClass()), e); + logAndFail(task, + "Can't retry after an exception", + String.format("Can't retry after %s exception", e.getClass()), e); } else { logAndRetry(task, e); } @@ -147,22 +153,18 @@ public void close () { futures.awaitRemaining(); LOGGER.info("All waiting futures are completed"); } catch (Exception ex) { - LOGGER.error("Cannot await all futures to be finished", ex); - } - try { - executor.shutdown(); - executor.awaitTermination(1, TimeUnit.MINUTES); - } catch (InterruptedException e) { + errorCollector.collect(LOGGER, "Cannot await all futures to be finished", ex); } + ExecutorServiceUtilsKt.shutdownGracefully(executor, 1, TimeUnit.MINUTES); } private void logAndRetry(ScheduledRetryableTask> task, Throwable e) { int retriesDone = task.getRetriesDone() + 1; - final GroupedMessageBatchToStore messageBatch = task.getPayload().data; + GroupedMessageBatchToStore messageBatch = task.getPayload().data; if (task.getRetriesLeft() > 0) { - + errorCollector.collect("Failed to store the message batch for group '" + messageBatch.getGroup() + "' retries left, rescheduling"); LOGGER.error("Failed to store the message batch for group '{}', {} retries left, rescheduling", messageBatch.getGroup(), task.getRetriesLeft(), @@ -171,8 +173,8 @@ private void logAndRetry(ScheduledRetryableTask> task, String logMessage, Throwable e) { + private void logAndFail( + ScheduledRetryableTask> task, + String errorKey, + String logMessage, + Throwable cause + ) { taskQueue.complete(task); metrics.registerAbortedPersistence(); - LOGGER.error(logMessage, e); + errorCollector.collect(errorKey); + LOGGER.error(logMessage, cause); task.getPayload().fail(); } @@ -210,13 +218,15 @@ private static class PersistenceTask { } void complete() { - if (callback != null) + if (callback != null) { callback.onSuccess(data); + } } void fail() { - if (callback != null) + if (callback != null) { callback.onFail(data); + } } } } \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/mstore/MessageStore.java b/src/main/java/com/exactpro/th2/mstore/MessageStore.java index 95d35a2..c1dd04a 100644 --- a/src/main/java/com/exactpro/th2/mstore/MessageStore.java +++ b/src/main/java/com/exactpro/th2/mstore/MessageStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -18,15 +18,22 @@ import com.exactpro.cradle.CradleManager; import com.exactpro.cradle.CradleStorage; import com.exactpro.th2.common.schema.factory.CommonFactory; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + import static com.exactpro.th2.common.metrics.CommonMetrics.LIVENESS_MONITOR; import static com.exactpro.th2.common.metrics.CommonMetrics.READINESS_MONITOR; +import static com.exactpro.th2.common.utils.ExecutorServiceUtilsKt.shutdownGracefully; public class MessageStore { private static final Logger LOGGER = LoggerFactory.getLogger(MessageStore.class); + private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("error-collector-%d").build(); public static void main(String[] args) { @@ -40,32 +47,36 @@ public static void main(String[] args) { shutdownManager.registerResource(factory); Configuration config = factory.getCustomConfiguration(Configuration.class); - if (config == null) + if (config == null) { config = Configuration.createDefault(); - - ObjectMapper mapper = new ObjectMapper(); - LOGGER.info("Effective configuration:\n{}", mapper.writerWithDefaultPrettyPrinter().writeValueAsString(config)); + } // Initialize Cradle CradleManager cradleManager = factory.getCradleManager(); shutdownManager.registerResource(cradleManager); CradleStorage storage = cradleManager.getStorage(); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY); + shutdownManager.registerResource(() -> shutdownGracefully(executor, 5, TimeUnit.SECONDS)); + + ErrorCollector errorCollector = new ErrorCollector(executor, factory.getEventBatchRouter(), factory.getRootEventId()); + shutdownManager.registerResource(errorCollector); + // Initialize persistor - MessagePersistor persistor = new MessagePersistor(config, storage); + MessagePersistor persistor = new MessagePersistor(errorCollector, storage, config); shutdownManager.registerResource(persistor); // Initialize processors - ProtoRawMessageProcessor protoProcessor = new ProtoRawMessageProcessor( - factory.getMessageRouterRawBatch(), + AbstractMessageProcessor protoProcessor = new ProtoRawMessageProcessor( + errorCollector, factory.getMessageRouterRawBatch(), storage, persistor, config, factory.getConnectionManagerConfiguration().getPrefetchCount()); shutdownManager.registerResource(protoProcessor); - TransportGroupProcessor transportProcessor = new TransportGroupProcessor( - factory.getTransportGroupBatchRouter(), + AbstractMessageProcessor transportProcessor = new TransportGroupProcessor( + errorCollector, factory.getTransportGroupBatchRouter(), storage, persistor, config, diff --git a/src/main/java/com/exactpro/th2/mstore/ProtoRawMessageProcessor.java b/src/main/java/com/exactpro/th2/mstore/ProtoRawMessageProcessor.java index fbe0e52..35e1f82 100644 --- a/src/main/java/com/exactpro/th2/mstore/ProtoRawMessageProcessor.java +++ b/src/main/java/com/exactpro/th2/mstore/ProtoRawMessageProcessor.java @@ -56,13 +56,14 @@ public class ProtoRawMessageProcessor extends AbstractMessageProcessor { private SubscriberMonitor monitor; public ProtoRawMessageProcessor( + @NotNull ErrorCollector errorCollector, @NotNull MessageRouter router, @NotNull CradleStorage cradleStorage, @NotNull Persistor persistor, @NotNull Configuration configuration, @NotNull Integer prefetchCount ) { - super(cradleStorage, persistor, configuration, prefetchCount); + super(errorCollector, cradleStorage, persistor, configuration, prefetchCount); this.router = requireNonNull(router, "Message router can't be null"); } @@ -81,7 +82,7 @@ public void close() { try { monitor.unsubscribe(); } catch (Exception e) { - LOGGER.error("Can not unsubscribe from queues", e); + errorCollector.collect(LOGGER, "Can not unsubscribe from queues", e); } } super.close(); @@ -121,7 +122,7 @@ void process(DeliveryMetadata deliveryMetadata, RawMessageBatch messageBatch, Co storeMessages(groupedMessageBatchToStore, groupKey, confirmation); } } catch (Exception ex) { - LOGGER.error("Cannot handle the batch of type {}, rejecting", messageBatch.getClass(), ex); + errorCollector.collect(LOGGER, "Cannot handle the batch of type " + messageBatch.getClass() + ", rejecting", ex); reject(confirmation); } } @@ -133,24 +134,6 @@ protected MessageOrderingProperties extractOrderingProperties(MessageID messageI ); } - private static void confirm(Confirmation confirmation) { - try { - confirmation.confirm(); - } catch (Exception e) { - LOGGER.error("Exception confirming message", e); - } - } - - - private static void reject(Confirmation confirmation) { - try { - confirmation.reject(); - } catch (Exception e) { - LOGGER.error("Exception rejecting message", e); - } - } - - //FIXME: com.exactpro.th2.mstore.MessageProcessor.toCradleBatch() 98,242 ms (40.5%) private GroupedMessageBatchToStore toCradleBatch(String group, List messagesList) throws CradleStorageException { GroupedMessageBatchToStore batch = cradleStorage.getEntitiesFactory().groupedMessageBatch(group); diff --git a/src/main/java/com/exactpro/th2/mstore/TransportGroupProcessor.java b/src/main/java/com/exactpro/th2/mstore/TransportGroupProcessor.java index 8146370..40c1536 100644 --- a/src/main/java/com/exactpro/th2/mstore/TransportGroupProcessor.java +++ b/src/main/java/com/exactpro/th2/mstore/TransportGroupProcessor.java @@ -36,7 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.time.Instant; import java.util.HashMap; import java.util.List; @@ -51,13 +50,14 @@ public class TransportGroupProcessor extends AbstractMessageProcessor { private SubscriberMonitor monitor; public TransportGroupProcessor( + @NotNull ErrorCollector errorCollector, @NotNull MessageRouter router, @NotNull CradleStorage cradleStorage, @NotNull Persistor persistor, @NotNull Configuration configuration, @NotNull Integer prefetchCount ) { - super(cradleStorage, persistor, configuration, prefetchCount); + super(errorCollector, cradleStorage, persistor, configuration, prefetchCount); this.router = requireNonNull(router, "Message router can't be null"); } @@ -77,7 +77,7 @@ public void close() { try { monitor.unsubscribe(); } catch (Exception e) { - LOGGER.error("Can not unsubscribe from queues", e); + errorCollector.collect(LOGGER, "Can not unsubscribe from queues", e); } } super.close(); @@ -115,7 +115,7 @@ void process(DeliveryMetadata deliveryMetadata, GroupBatch messageBatch, Confirm storeMessages(groupedMessageBatchToStore, groupKey, confirmation); } } catch (Exception ex) { - LOGGER.error("Cannot handle the batch of type {}, rejecting", messageBatch.getClass(), ex); + errorCollector.collect(LOGGER, "Cannot handle the batch of type " + messageBatch.getClass() + ", rejecting", ex); reject(confirmation); } } @@ -127,23 +127,6 @@ protected MessageOrderingProperties extractOrderingProperties(MessageId messageI ); } - private static void confirm(Confirmation confirmation) { - try { - confirmation.confirm(); - } catch (Exception e) { - LOGGER.error("Exception confirming message", e); - } - } - - - private static void reject(Confirmation confirmation) { - try { - confirmation.reject(); - } catch (Exception e) { - LOGGER.error("Exception rejecting message", e); - } - } - private GroupedMessageBatchToStore toCradleBatch(GroupBatch groupBatch) throws CradleStorageException { GroupedMessageBatchToStore batch = cradleStorage.getEntitiesFactory().groupedMessageBatch(groupBatch.getSessionGroup()); for (MessageGroup messageGroup : groupBatch.getGroups()) { diff --git a/src/test/java/com/exactpro/th2/mstore/TestErrorCollector.java b/src/test/java/com/exactpro/th2/mstore/TestErrorCollector.java new file mode 100644 index 0000000..f7b48fc --- /dev/null +++ b/src/test/java/com/exactpro/th2/mstore/TestErrorCollector.java @@ -0,0 +1,169 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.mstore; + +import com.exactpro.th2.common.grpc.Event; +import com.exactpro.th2.common.grpc.EventBatch; +import com.exactpro.th2.common.grpc.EventID; +import com.exactpro.th2.common.grpc.EventStatus; +import com.exactpro.th2.common.schema.message.MessageRouter; +import com.google.protobuf.util.Timestamps; +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +@ExtendWith(MockitoExtension.class) +@SuppressWarnings("DynamicRegexReplaceableByCompiledPattern") +class TestErrorCollector { + + private static final long PERIOD = RandomUtils.nextLong(0, Long.MAX_VALUE); + private static final TimeUnit TIME_UNIT = TimeUnit.values()[RandomUtils.nextInt(0, TimeUnit.values().length)] ; + + @Mock + private Logger logger; + @Mock + private ScheduledExecutorService executor; + @Mock + private ScheduledFuture future; + @Mock + private MessageRouter eventRouter; + private final EventID rootEvent = EventID.newBuilder() + .setBookName("test-book") + .setScope("test-scope") + .setId("test-id") + .setStartTimestamp(Timestamps.now()) + .build(); + private ErrorCollector errorCollector; + @Captor + private ArgumentCaptor taskCaptor; + + @BeforeEach + void beforeEach() { + doReturn(future).when(executor).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); + errorCollector = new ErrorCollector(executor, eventRouter, rootEvent, PERIOD, TIME_UNIT); + verify(executor).scheduleAtFixedRate(taskCaptor.capture(), eq(PERIOD), eq(PERIOD), eq(TIME_UNIT)); + verifyNoMoreInteractions(executor); + } + + @AfterEach + void afterEach() { + verifyNoMoreInteractions(logger); + verifyNoMoreInteractions(executor); + verifyNoMoreInteractions(future); + verifyNoMoreInteractions(eventRouter); + } + + @SuppressWarnings("DynamicRegexReplaceableByCompiledPattern") + @Test + void testCollect() throws IOException { + errorCollector.collect("A"); + for (int i = 0; i < 2; i++) { + errorCollector.collect("B"); + } + verifyNoMoreInteractions(eventRouter); + + taskCaptor.getValue().run(); + + ArgumentCaptor eventBatchCaptor = ArgumentCaptor.forClass(EventBatch.class); + verify(eventRouter).sendAll(eventBatchCaptor.capture()); + + assertEquals(1, eventBatchCaptor.getValue().getEventsCount()); + Event event = eventBatchCaptor.getValue().getEvents(0); + + assertEquals("mstore internal problem(s): 3", event.getName()); + assertEquals("InternalError", event.getType()); + assertEquals(EventStatus.FAILED, event.getStatus()); + + String body = event.getBody().toStringUtf8(); + assertTrue(body.matches(".*\"A\":\\{\"firstDate\":\"\\d+-\\d+-\\d+T\\d+:\\d+:\\d+.\\d+Z\",\"quantity\":1}.*"), () -> "body: " + body); + assertTrue(body.matches(".*\"B\":\\{\"firstDate\":\"\\d+-\\d+-\\d+T\\d+:\\d+:\\d+.\\d+Z\",\"lastDate\":\"\\d+-\\d+-\\d+T\\d+:\\d+:\\d+.\\d+Z\",\"quantity\":2}.*"), () -> "body: " + body); + + taskCaptor.getValue().run(); + } + + @SuppressWarnings("DynamicRegexReplaceableByCompiledPattern") + @Test + void testLogAndCollect() throws IOException { + RuntimeException exception = new RuntimeException("test-message"); + errorCollector.collect(logger, "A", exception); + verify(logger).error(eq("A"), same(exception)); + + verifyNoMoreInteractions(logger); + verifyNoMoreInteractions(eventRouter); + + taskCaptor.getValue().run(); + + ArgumentCaptor eventBatchCaptor = ArgumentCaptor.forClass(EventBatch.class); + verify(eventRouter).sendAll(eventBatchCaptor.capture()); + + assertEquals(1, eventBatchCaptor.getValue().getEventsCount()); + Event event = eventBatchCaptor.getValue().getEvents(0); + + assertEquals("mstore internal problem(s): 1", event.getName()); + assertEquals("InternalError", event.getType()); + assertEquals(EventStatus.FAILED, event.getStatus()); + + String body = event.getBody().toStringUtf8(); + assertTrue(body.matches("\\[\\{\"errors\":\\{\"A\":\\{\"firstDate\":\"\\d+-\\d+-\\d+T\\d+:\\d+:\\d+.\\d+Z\",\"quantity\":1}}}]"), () -> "body: " + body); + + taskCaptor.getValue().run(); + } + + @Test + void testClose() throws Exception { + errorCollector.collect("A"); + verifyNoMoreInteractions(eventRouter); + + errorCollector.close(); + + verify(future).cancel(eq(true)); + + ArgumentCaptor eventBatchCaptor = ArgumentCaptor.forClass(EventBatch.class); + verify(eventRouter).sendAll(eventBatchCaptor.capture()); + + assertEquals(1, eventBatchCaptor.getValue().getEventsCount()); + Event event = eventBatchCaptor.getValue().getEvents(0); + + assertEquals("mstore internal problem(s): 1", event.getName()); + assertEquals("InternalError", event.getType()); + assertEquals(EventStatus.FAILED, event.getStatus()); + + String body = event.getBody().toStringUtf8(); + assertTrue(body.matches("\\[\\{\"errors\":\\{\"A\":\\{\"firstDate\":\"\\d+-\\d+-\\d+T\\d+:\\d+:\\d+.\\d+Z\",\"quantity\":1}}}]"), () -> "body: " + body); + } +} \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/mstore/TestMessagePersistor.java b/src/test/java/com/exactpro/th2/mstore/TestMessagePersistor.java index 6236850..118b4ea 100644 --- a/src/test/java/com/exactpro/th2/mstore/TestMessagePersistor.java +++ b/src/test/java/com/exactpro/th2/mstore/TestMessagePersistor.java @@ -75,6 +75,7 @@ public class TestMessagePersistor { private final CradleStorage storageMock = mock(CradleStorage.class); private final Callback callback = mock(Callback.class); + private final ErrorCollector errorCollector = mock(ErrorCollector.class); private MessagePersistor persistor; @@ -95,7 +96,7 @@ void setUp() throws InterruptedException, CradleStorageException { .withRetryDelayBase(10L) .withMaxTaskDataSize(MAX_MESSAGE_QUEUE_DATA_SIZE) .build(); - persistor = spy(new MessagePersistor(config, storageMock)); + persistor = spy(new MessagePersistor(errorCollector, storageMock, config)); persistor.start(); } diff --git a/src/test/java/com/exactpro/th2/mstore/TestProtoRawMessageProcessor.java b/src/test/java/com/exactpro/th2/mstore/TestProtoRawMessageProcessor.java index 3f032cb..99d943c 100644 --- a/src/test/java/com/exactpro/th2/mstore/TestProtoRawMessageProcessor.java +++ b/src/test/java/com/exactpro/th2/mstore/TestProtoRawMessageProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -90,6 +90,7 @@ class TestProtoRawMessageProcessor { private final DeliveryMetadata deliveryMetadata = new DeliveryMetadata("", false); @SuppressWarnings("UnnecessarilyQualifiedInnerClassAccess") private final ManualAckDeliveryCallback.Confirmation confirmation = mock(ManualAckDeliveryCallback.Confirmation.class); + private final ErrorCollector errorCollector = mock(ErrorCollector.class); private final Random random = new Random(); @@ -601,13 +602,13 @@ void testSeparateBatches() { } } - private static ProtoRawMessageProcessor createStore( + private ProtoRawMessageProcessor createStore( CradleStorage cradleStorageMock, MessageRouter routerMock, Persistor persistor, Configuration configuration ) { - return new ProtoRawMessageProcessor(routerMock, cradleStorageMock, persistor, configuration, 0); + return new ProtoRawMessageProcessor(errorCollector, routerMock, cradleStorageMock, persistor, configuration, 0); } private static RawMessage createMessage(String sessionAlias, String sessionGroup, Direction direction, long sequence, String bookName) { diff --git a/src/test/java/com/exactpro/th2/mstore/TestTransportGroupProcessor.java b/src/test/java/com/exactpro/th2/mstore/TestTransportGroupProcessor.java index f54991a..cd8f9e5 100644 --- a/src/test/java/com/exactpro/th2/mstore/TestTransportGroupProcessor.java +++ b/src/test/java/com/exactpro/th2/mstore/TestTransportGroupProcessor.java @@ -46,7 +46,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Instant; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; @@ -56,6 +55,7 @@ import static com.exactpro.th2.mstore.TransportGroupProcessor.toCradleMessage; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -83,6 +83,7 @@ class TestTransportGroupProcessor { private final MessagePersistor persistor = mock(MessagePersistor.class); private final DeliveryMetadata deliveryMetadata = new DeliveryMetadata("", false); private final ManualAckDeliveryCallback.Confirmation confirmation = mock(ManualAckDeliveryCallback.Confirmation.class); + private final ErrorCollector errorCollector = mock(ErrorCollector.class); private final Random random = new Random(); @@ -131,14 +132,14 @@ public void testToCradleMessage() throws CradleStorageException { MessageToStore cradleMessage = toCradleMessage(book, rawMessage); - Assertions.assertEquals(rawMessage.getProtocol(), cradleMessage.getProtocol()); - Assertions.assertEquals(rawMessage.getMetadata(), cradleMessage.getMetadata().toMap()); - Assertions.assertEquals(rawMessage.getId().getSessionAlias(), cradleMessage.getSessionAlias()); - Assertions.assertEquals(book, cradleMessage.getBookId().getName()); - Assertions.assertEquals(toCradleDirection(rawMessage.getId().getDirection()), cradleMessage.getDirection()); - Assertions.assertEquals(rawMessage.getId().getTimestamp(), cradleMessage.getTimestamp()); - Assertions.assertEquals(rawMessage.getId().getSequence(), cradleMessage.getSequence()); - Assertions.assertArrayEquals(TransportUtilsKt.toByteArray(rawMessage.getBody()), cradleMessage.getContent()); + assertEquals(rawMessage.getProtocol(), cradleMessage.getProtocol()); + assertEquals(rawMessage.getMetadata(), cradleMessage.getMetadata().toMap()); + assertEquals(rawMessage.getId().getSessionAlias(), cradleMessage.getSessionAlias()); + assertEquals(book, cradleMessage.getBookId().getName()); + assertEquals(toCradleDirection(rawMessage.getId().getDirection()), cradleMessage.getDirection()); + assertEquals(rawMessage.getId().getTimestamp(), cradleMessage.getTimestamp()); + assertEquals(rawMessage.getId().getSequence(), cradleMessage.getSequence()); + assertArrayEquals(TransportUtilsKt.toByteArray(rawMessage.getBody()), cradleMessage.getContent()); } @AfterEach @@ -445,7 +446,7 @@ private TransportGroupProcessor createStore( Persistor persistor, Configuration configuration ) { - return new TransportGroupProcessor(routerMock, cradleStorageMock, persistor, configuration, 0); + return new TransportGroupProcessor(errorCollector, routerMock, cradleStorageMock, persistor, configuration, 0); } private RawMessage createMessage(String sessionAlias, Direction direction, long sequence) {