Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-5063] Made error collector to collect and publish errors to even… #105

Merged
merged 6 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Overview (5.2.3)
# Overview (5.3.0)

Message store (mstore) is an important th2 component responsible for storing raw messages into Cradle. Please refer to [Cradle repository] (https://github.com/th2-net/cradleapi/blob/master/README.md) for more details. This component has a pin for listening messages via MQ.

Expand Down Expand Up @@ -115,6 +115,12 @@ spec:
This is a list of supported features provided by libraries.
Please see more details about this feature via [link](https://github.com/th2-net/th2-common-j#configuration-formats).

## 5.3.0

* Mstore publishes event with aggregated statistics about internal errors into event router periodically
* Updated common: `5.6.0-dev`
* Added common-utils: `2.2.2-dev`

## 5.2.4

* Migrated to the cradle version with fixed load pages where `removed` field is null problem.
Expand Down
10 changes: 7 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ plugins {
id 'java-library'
id 'application'
id 'com.palantir.docker' version '0.25.0'
id "org.owasp.dependencycheck" version "8.3.1"
id "org.owasp.dependencycheck" version "8.4.0"
id 'com.github.jk1.dependency-license-report' version '2.5'
id "de.undercouch.download" version "5.4.0"
}

ext {
cradleVersion = '5.1.4-dev'
commonVersion = '5.4.1-dev'
commonVersion = '5.6.0-dev'
commonUtilsVersion = '2.2.2-dev'
}

group = 'com.exactpro.th2'
Expand Down Expand Up @@ -88,7 +89,7 @@ tasks.withType(Sign).configureEach {
}
// disable running task 'initializeSonatypeStagingRepository' on a gitlab
tasks.configureEach { task ->
if (task.name.equals('initializeSonatypeStagingRepository') &&
if (task.name == 'initializeSonatypeStagingRepository' &&
!(project.hasProperty('sonatypeUsername') && project.hasProperty('sonatypePassword'))
) {
task.enabled = false
Expand Down Expand Up @@ -159,6 +160,9 @@ dependencies {
api platform('com.exactpro.th2:bom:4.5.0')

implementation "com.exactpro.th2:common:$commonVersion"
implementation("com.exactpro.th2:common-utils:$commonUtilsVersion") {
because("executor service utils is used")
}
implementation 'com.exactpro.th2:task-utils:0.1.1'

implementation 'com.google.protobuf:protobuf-java-util'
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
release_version=5.2.4
release_version=5.3.0
description='th2 mstore component'
vcs_url=https://github.com/th2-net/th2-mstore
52 changes: 32 additions & 20 deletions src/main/java/com/exactpro/th2/mstore/AbstractMessageProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

public abstract class AbstractMessageProcessor implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessageProcessor.class);
protected final ErrorCollector errorCollector;
protected final CradleStorage cradleStorage;
private final ScheduledExecutorService drainExecutor = Executors.newSingleThreadScheduledExecutor();
protected final Map<SessionKey, MessageOrderingProperties> sessions = new ConcurrentHashMap<>();
Expand All @@ -69,11 +70,13 @@ public abstract class AbstractMessageProcessor implements AutoCloseable {
private final ManualDrainTrigger manualDrain;

public AbstractMessageProcessor(
@NotNull ErrorCollector errorCollector,
@NotNull CradleStorage cradleStorage,
@NotNull Persistor<GroupedMessageBatchToStore> persistor,
@NotNull Configuration configuration,
@NotNull Integer prefetchCount
) {
this.errorCollector = requireNonNull(errorCollector, "Error collector can't be null");
this.cradleStorage = requireNonNull(cradleStorage, "Cradle storage can't be null");
this.persistor = requireNonNull(persistor, "Persistor can't be null");
this.configuration = requireNonNull(configuration, "'Configuration' parameter");
Expand Down Expand Up @@ -103,13 +106,13 @@ public void close() {
future.cancel(false);
}
} catch (RuntimeException ex) {
LOGGER.error("Cannot cancel drain task", ex);
errorCollector.collect(LOGGER, "Cannot cancel drain task", ex);
}

try {
drain(true);
} catch (RuntimeException ex) {
LOGGER.error("Cannot drain left batches during shutdown", ex);
errorCollector.collect(LOGGER, "Cannot drain left batches during shutdown", ex);
}

try {
Expand All @@ -122,27 +125,27 @@ public void close() {
}
}
} catch (InterruptedException e) {
LOGGER.error("Cannot gracefully shutdown drain executor", e);
errorCollector.collect(LOGGER, "Cannot gracefully shutdown drain executor", e);
Thread.currentThread().interrupt();
} catch (RuntimeException e) {
LOGGER.error("Cannot gracefully shutdown drain executor", e);
errorCollector.collect(LOGGER, "Cannot gracefully shutdown drain executor", e);
}
}

private static void confirm(Confirmation confirmation) {
protected void confirm(Confirmation confirmation) {
try {
confirmation.confirm();
} catch (Exception e) {
LOGGER.error("Exception confirming message", e);
errorCollector.collect(LOGGER, "Exception confirming message", e);
}
}


private static void reject(Confirmation confirmation) {
protected void reject(Confirmation confirmation) {
try {
confirmation.reject();
} catch (Exception e) {
LOGGER.error("Exception rejecting message", e);
errorCollector.collect(LOGGER, "Exception rejecting message", e);
}
}

Expand Down Expand Up @@ -316,23 +319,14 @@ private void drain(boolean force) {
protected void persist(ConsolidatedBatch data) {
GroupedMessageBatchToStore batch = data.batch;
try (Histogram.Timer ignored = metrics.startMeasuringPersistenceLatency()) {
persistor.persist(batch, new Callback<>() {
@Override
public void onSuccess(GroupedMessageBatchToStore batch) {
data.confirmations.forEach(AbstractMessageProcessor::confirm);
}

@Override
public void onFail(GroupedMessageBatchToStore batch) {
data.confirmations.forEach(AbstractMessageProcessor::reject);
}
});
persistor.persist(batch, new ProcessorCallback(data));
} catch (Exception e) {
errorCollector.collect("Exception storing batch for group \"" + batch.getGroup() + '\"');
if (LOGGER.isErrorEnabled()) {
LOGGER.error("Exception storing batch for group \"{}\": {}", batch.getGroup(),
formatMessageBatchToStore(batch, false), e);
}
data.confirmations.forEach(AbstractMessageProcessor::reject);
data.confirmations.forEach(this::reject);
}
}

Expand Down Expand Up @@ -432,4 +426,22 @@ public static String identifySessionGroup(String sessionGroup, String sessionAli
return (sessionGroup == null || sessionGroup.isBlank()) ? requireNonBlank(sessionAlias, "'Session alias' parameter can not be blank") : sessionGroup;
}
}

private class ProcessorCallback implements Callback<GroupedMessageBatchToStore> {
private final ConsolidatedBatch data;

public ProcessorCallback(@NotNull ConsolidatedBatch data) {
this.data = requireNonNull(data, "Data can't be bull");
}

@Override
public void onSuccess(GroupedMessageBatchToStore batch) {
data.confirmations.forEach(AbstractMessageProcessor.this::confirm);
}

@Override
public void onFail(GroupedMessageBatchToStore batch) {
data.confirmations.forEach(AbstractMessageProcessor.this::reject);
}
}
}
177 changes: 177 additions & 0 deletions src/main/java/com/exactpro/th2/mstore/ErrorCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.exactpro.th2.mstore;

import com.exactpro.th2.common.event.Event;
import com.exactpro.th2.common.event.Event.Status;
import com.exactpro.th2.common.event.IBodyData;
import com.exactpro.th2.common.grpc.EventBatch;
import com.exactpro.th2.common.grpc.EventID;
import com.exactpro.th2.common.schema.message.MessageRouter;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.Objects.requireNonNull;

@SuppressWarnings("unused")
public class ErrorCollector implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ErrorCollector.class);
private final ScheduledFuture<?> drainFuture;
private final MessageRouter<EventBatch> eventRouter;
private final EventID rootEvent;
private final Lock lock = new ReentrantLock();
private Map<String, ErrorMetadata> errors = new HashMap<>();
OptimumCode marked this conversation as resolved.
Show resolved Hide resolved

public ErrorCollector(@NotNull ScheduledExecutorService executor,
@NotNull MessageRouter<EventBatch> eventRouter,
@NotNull EventID rootEvent,
long period,
@NotNull TimeUnit unit) {
this.eventRouter = requireNonNull(eventRouter, "Event router can't be null");
this.rootEvent = requireNonNull(rootEvent, "Root event can't be null");
requireNonNull(unit, "Unit can't be null");
this.drainFuture = requireNonNull(executor, "Executor can't be null")
.scheduleAtFixedRate(this::drain, period, period, unit);
Nikita-Smirnov-Exactpro marked this conversation as resolved.
Show resolved Hide resolved
}

public ErrorCollector(@NotNull ScheduledExecutorService executor,
@NotNull MessageRouter<EventBatch> eventRouter,
@NotNull EventID rootEvent) {
this(executor, eventRouter, rootEvent, 1, TimeUnit.MINUTES);
}

/**
* Log error and call the {@link #collect(String)}} method
* @param error is used as key identifier. Avoid put a lot of unique values
*/
public void collect(Logger logger, String error, Throwable cause) {
logger.error(error, cause);
collect(error);
}

/**
* @param error is used as key identifier. Avoid put a lot of unique values
*/
public void collect(String error) {
lock.lock();
try {
errors.compute(error, (key, metadata) -> {
if (metadata == null) {
return new ErrorMetadata();
}
metadata.inc();
return metadata;
});
} finally {
lock.unlock();
}
}

@Override
public void close() throws Exception {
drainFuture.cancel(true);
drain();
}

private void drain() {
try {
Map<String, ErrorMetadata> map = clear();
if (map.isEmpty()) { return; }

eventRouter.sendAll(Event.start()
.name("mstore internal problem(s): " + calculateTotalQty(map.values()))
.type("InternalError")
.status(Status.FAILED)
.bodyData(new BodyData(map))
.toBatchProto(rootEvent));

} catch (IOException | RuntimeException e) {
LOGGER.error("Drain events task failure", e);
}
}

private Map<String, ErrorMetadata> clear() {
lock.lock();
try {
Map<String, ErrorMetadata> result = errors;
errors = new HashMap<>();
return result;
} finally {
lock.unlock();
}
}

private static int calculateTotalQty(Collection<ErrorMetadata> errors) {
return errors.stream()
.map(ErrorMetadata::getQuantity)
.reduce(0, Integer::sum);
}

private static class BodyData implements IBodyData {
private final Map<String, ErrorMetadata> errors;
@JsonCreator
private BodyData(Map<String, ErrorMetadata> errors) {
this.errors = errors;
}
public Map<String, ErrorMetadata> getErrors() {
return errors;
}
}

private static class ErrorMetadata {
private final Instant firstDate = Instant.now();
private Instant lastDate;
private int quantity = 1;

public void inc() {
quantity += 1;
lastDate = Instant.now();
}

public Instant getFirstDate() {
return firstDate;
}

public Instant getLastDate() {
return lastDate;
}

public void setLastDate(Instant lastDate) {
this.lastDate = lastDate;
}

public int getQuantity() {
return quantity;
}

public void setQuantity(int quantity) {
this.quantity = quantity;
}
}
}
Loading
Loading