diff --git a/README.md b/README.md index 5727d39ba..a90997014 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2 common library (Java) (3.27.0) +# th2 common library (Java) (3.30.0) ## Usage @@ -53,7 +53,7 @@ Then you will create an instance of imported class, by choosing one of the follo var factory = CommonFactory.createFromKubernetes(namespace, boxName); ``` It also can be called by using `createFromArguments(args)` with arguments `--namespace` and `--boxName`. -1. Create factory with a namespace in Kubernetes, the name of the target th2 box from Kubernetes and the name of context to choose the context from Kube config: +1. Create factory with a namespace in Kubernetes, the name of the target th2 box from Kubernetes and the name of context to choose from Kube config: ``` var factory = CommonFactory.createFromKubernetes(namespace, boxName, contextName); ``` @@ -120,12 +120,12 @@ The `CommonFactory` reads a message's router configuration from the `mq.json` fi Filters format: * fieldName - a field's name -* expectedValue - expected field's value (used not for all operations) +* expectedValue - expected field's value (not used for all operations) * operation - operation's type - * `EQUAL` - filter is pass if the field equals exact value - * `NOT_EQUAL` - filter is pass if the field doesn't equal exact value - * `EMPTY` - filter is pass if the field is empty - * `NOT_EMPTY` - filter is pass if the field isn't empty + * `EQUAL` - the filter passes if the field is equal to the exact value + * `NOT_EQUAL` - the filter passes if the field does not equal the exact value + * `EMPTY` - the filter passes if the field is empty + * `NOT_EMPTY` - the filter passes if the field is not empty * `WILDCARD` - filters the field by wildcard expression ```json @@ -195,7 +195,7 @@ The `CommonFactory` reads a Cradle configuration from the cradle.json file. 1. Also note that `generated_configs` directory will be created to store `.json` files with configs from Kubernetes. Those files are overridden when `CommonFactory.createFromKubernetes(namespace, boxName)` and `CommonFactory.createFromKubernetes(namespace, boxName, contextName)` are invoked again. -1. User needs to have authentication with service account token that has necessary access to read CRs and secrets from the specified namespace. +1. User needs to have authentication with service account token that has the necessary access to read CRs and secrets from the specified namespace. After that you can receive various Routers through factory properties: ``` @@ -214,7 +214,7 @@ With the router created, you can subscribe to pins (by specifying the callback f ``` router.subscribe(callback) # subscribe to only one pin router.subscribeAll(callback) # subscribe to one or several pins -router.send(message) # send to only one pim +router.send(message) # send to only one pin router.sendAll(message) # send to one or several pins ``` You can perform these actions by providing pin attributes in addition to the default ones. @@ -248,12 +248,54 @@ NOTES: * in order for the metrics to be exported, you also will need to create an instance of CommonFactory * common JVM metrics will also be exported alongside common service metrics +* some metric labels are enumerations (`th2_type`: `MESSAGE_GROUP`, `EVENT`, ``;`message_type`: `RAW_MESSAGE`, `MESSAGE`) + +ABSTRACT METRICS: +* th2_rabbitmq_message_size_publish_bytes (`th2_pin`, `th2_type`, `exchange`, `routing_key`): number of published message bytes to RabbitMQ. The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content +* th2_rabbitmq_message_publish_total (`th2_pin`, `th2_type`, `exchange`, `routing_key`): quantity of published messages to RabbitMQ. The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content +* th2_rabbitmq_message_size_subscribe_bytes (`th2_pin`, `th2_type`, `queue`): number of bytes received from RabbitMQ, it includes bytes of messages dropped after filters. For information about the number of dropped messages, please refer to 'th2_message_dropped_subscribe_total' and 'th2_message_group_dropped_subscribe_total'. The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content +* th2_rabbitmq_message_process_duration_seconds (`th2_pin`, `th2_type`, `queue`): time of message processing during subscription from RabbitMQ in seconds. The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content + +MESSAGES METRICS: +* th2_message_publish_total (`th2_pin`, `session_alias`, `direction`, `message_type`): quantity of published raw or parsed messages +* th2_message_subscribe_total (`th2_pin`, `session_alias`, `direction`, `message_type`): quantity of received raw or parsed messages, includes dropped after filters. For information about the number of dropped messages, please refer to 'th2_message_dropped_subscribe_total' +* th2_message_dropped_publish_total (`th2_pin`, `session_alias`, `direction`, `message_type`): quantity of published raw or parsed messages dropped after filters +* th2_message_dropped_subscribe_total (`th2_pin`, `session_alias`, `direction`, `message_type`): quantity of received raw or parsed messages dropped after filters +* th2_message_group_publish_total (`th2_pin`, `session_alias`, `direction`): quantity of published message groups +* th2_message_group_subscribe_total (`th2_pin`, `session_alias`, `direction`): quantity of received message groups, includes dropped after filters. For information about the number of dropped messages, please refer to 'th2_message_group_dropped_subscribe_total' +* th2_message_group_dropped_publish_total (`th2_pin`, `session_alias`, `direction`): quantity of published message groups dropped after filters +* th2_message_group_dropped_subscribe_total (`th2_pin`, `session_alias`, `direction`): quantity of received message groups dropped after filters +* th2_message_group_sequence_publish (`th2_pin`, `session_alias`, `direction`): last published sequence +* th2_message_group_sequence_subscribe (`th2_pin`, `session_alias`, `direction`): last received sequence + +EVENTS METRICS: +* th2_event_publish_total (`th2_pin`): quantity of published events +* th2_event_subscribe_total (`th2_pin`): quantity of received events ## Release notes -### 3.27.0 +### 3.30.0 + + Updated `messageRecursionLimit` default value from `100` to `500` +### 3.29.1 + ++ Fix problem with filtering by `message_type` in MessageGroupBatch router + +### 3.29.0 + ++ Update Cradle version from `2.13.0` to `2.20.0` + +### 3.28.0 + ++ Added new parameter `hint` for `VerificationEntry` + +### 3.27.0 + ++ Added new abstract router `AbstractRabbitRouter`, removed `MessageQueue` hierarchy ++ Parsed/raw routers work with `MessageGroupBatch` router ++ Added new metrics and removed old + ### 3.26.5 + Migrated `grpc-common` version from `3.7.0` to `3.8.0` + Added `time_precision` and `decimal_precision` parameters to `RootComparisonSettings` @@ -295,7 +337,7 @@ NOTES: + Fixed `messageRecursionLimit` setting for all kind of RabbitMQ subscribers ### 3.24.0 -+ Added setting `messageRecursionLimit`(default 100) to RabbitMQ configuration that denotes how deep nested protobuf messages might be. ++ Added setting `messageRecursionLimit`(the default value is set to 100) to RabbitMQ configuration that denotes how deep nested protobuf messages might be. ### 3.23.0 + Update the grpc-common version to 3.4.0 @@ -359,7 +401,7 @@ NOTES: ### 3.16.3 -+ Change the way channels are stored (they mapped to the pin instead of the thread). ++ Change the way that channels are stored (they are mapped to the pin instead of to the thread). It might increase the average number of channels used by the box, but it also limits the max number of channels to the number of pins ### 3.16.2 diff --git a/build.gradle b/build.gradle index e9d8cb364..cc825d447 100644 --- a/build.gradle +++ b/build.gradle @@ -29,8 +29,8 @@ sourceCompatibility = 11 targetCompatibility = 11 ext { - cradleVersion = '2.13.0' - junitVersion = '5.4.2' + cradleVersion = '2.20.0' + junitVersion = '5.7.2' sharedDir = file("${project.rootDir}/shared") } @@ -201,9 +201,7 @@ dependencies { implementation 'io.fabric8:kubernetes-client:4.13.0' - testImplementation "org.junit.jupiter:junit-jupiter-api:${junitVersion}" - testImplementation "org.junit.jupiter:junit-jupiter-params:${junitVersion}" - testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${junitVersion}" + testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}" } jar { diff --git a/gradle.properties b/gradle.properties index 9a3f8e603..3322e2561 100644 --- a/gradle.properties +++ b/gradle.properties @@ -13,7 +13,7 @@ # limitations under the License. # -release_version=3.27.0 +release_version=3.30.0 description = 'th2 common library (Java)' diff --git a/src/main/java/com/exactpro/th2/common/event/bean/VerificationEntry.java b/src/main/java/com/exactpro/th2/common/event/bean/VerificationEntry.java index 6730b531a..f1b957227 100644 --- a/src/main/java/com/exactpro/th2/common/event/bean/VerificationEntry.java +++ b/src/main/java/com/exactpro/th2/common/event/bean/VerificationEntry.java @@ -1,5 +1,5 @@ -/****************************************************************************** - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) +/* + * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - ******************************************************************************/ + */ package com.exactpro.th2.common.event.bean; import java.util.Map; @@ -24,6 +24,7 @@ public class VerificationEntry { private VerificationStatus status; private String operation; private boolean key; + private String hint; private Map fields; public String getType() { @@ -81,4 +82,12 @@ public Map getFields() { public void setFields(Map fields) { this.fields = fields; } + + public String getHint() { + return hint; + } + + public void setHint(String hint) { + this.hint = hint; + } } diff --git a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchQueue.java b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchQueue.java deleted file mode 100644 index 57bc1e53e..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchQueue.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.event; - -import com.exactpro.th2.common.grpc.EventBatch; -import com.exactpro.th2.common.schema.message.FilterFunction; -import com.exactpro.th2.common.schema.message.MessageSender; -import com.exactpro.th2.common.schema.message.MessageSubscriber; -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitQueue; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.SubscribeTarget; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; -import org.jetbrains.annotations.NotNull; - -public class EventBatchQueue extends AbstractRabbitQueue { - - @Override - protected MessageSender createSender(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration) { - EventBatchSender eventBatchSender = new EventBatchSender(); - eventBatchSender.init(connectionManager, queueConfiguration.getExchange(), queueConfiguration.getRoutingKey()); - return eventBatchSender; - } - - @Override - protected MessageSubscriber createSubscriber(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunction) { - EventBatchSubscriber eventBatchSubscriber = new EventBatchSubscriber(); - eventBatchSubscriber.init(connectionManager, new SubscribeTarget(queueConfiguration.getQueue(), queueConfiguration.getRoutingKey(), queueConfiguration.getExchange()), filterFunction); - return eventBatchSubscriber; - } -} diff --git a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchRouter.java b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchRouter.java index 9452a6351..c4f25c878 100644 --- a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchRouter.java +++ b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchRouter.java @@ -15,53 +15,73 @@ package com.exactpro.th2.common.schema.event; +import java.util.Set; + +import org.apache.commons.collections4.SetUtils; +import org.jetbrains.annotations.NotNull; + import com.exactpro.th2.common.grpc.EventBatch; import com.exactpro.th2.common.schema.message.FilterFunction; -import com.exactpro.th2.common.schema.message.MessageQueue; +import com.exactpro.th2.common.schema.message.MessageSender; +import com.exactpro.th2.common.schema.message.MessageSubscriber; import com.exactpro.th2.common.schema.message.QueueAttribute; import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration; -import com.exactpro.th2.common.schema.message.configuration.RouterFilter; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitMessageRouter; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; -import com.google.protobuf.Message; -import org.apache.commons.collections4.SetUtils; -import org.jetbrains.annotations.NotNull; - -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.stream.Collectors; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitRouter; +import com.google.protobuf.TextFormat; -public class EventBatchRouter extends AbstractRabbitMessageRouter { +public class EventBatchRouter extends AbstractRabbitRouter { + protected static final String EVENT_TYPE = "EVENT"; private static final Set REQUIRED_SUBSCRIBE_ATTRIBUTES = SetUtils.unmodifiableSet(QueueAttribute.EVENT.toString(), QueueAttribute.SUBSCRIBE.toString()); private static final Set REQUIRED_SEND_ATTRIBUTES = SetUtils.unmodifiableSet(QueueAttribute.EVENT.toString(), QueueAttribute.PUBLISH.toString()); + @NotNull @Override - protected MessageQueue createQueue(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunction) { - EventBatchQueue eventBatchQueue = new EventBatchQueue(); - eventBatchQueue.init(connectionManager, queueConfiguration, filterFunction); - return eventBatchQueue; + protected EventBatch splitAndFilter( + EventBatch message, + @NotNull QueueConfiguration pinConfiguration, + @NotNull String pinName + ) { + return message; } + @NotNull @Override - protected Map findQueueByFilter(Map queues, EventBatch msg) { - return queues.entrySet().stream().collect(Collectors.toMap(Entry::getKey, v -> msg)); + protected Set getRequiredSendAttributes() { + return REQUIRED_SEND_ATTRIBUTES; } + @NotNull @Override - protected boolean filterMessage(Message msg, List filters) { - return true; + protected Set getRequiredSubscribeAttributes() { + return REQUIRED_SUBSCRIBE_ATTRIBUTES; } + @NotNull @Override - protected Set requiredSubscribeAttributes() { - return REQUIRED_SUBSCRIBE_ATTRIBUTES; + protected MessageSender createSender(QueueConfiguration queueConfiguration, @NotNull String pinName) { + return new EventBatchSender( + getConnectionManager(), + queueConfiguration.getExchange(), + queueConfiguration.getRoutingKey(), + pinName + ); } + @NotNull @Override - protected Set requiredSendAttributes() { - return REQUIRED_SEND_ATTRIBUTES; + protected MessageSubscriber createSubscriber(QueueConfiguration queueConfiguration, @NotNull String pinName) { + return new EventBatchSubscriber( + getConnectionManager(), + queueConfiguration.getQueue(), + FilterFunction.DEFAULT_FILTER_FUNCTION, + pinName + ); + } + + @NotNull + @Override + protected String toErrorString(EventBatch eventBatch) { + return TextFormat.shortDebugString(eventBatch); } } diff --git a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java index f8ddba6bb..661d4ab70 100644 --- a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java +++ b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSender.java @@ -16,29 +16,41 @@ package com.exactpro.th2.common.schema.event; +import java.io.IOException; + +import org.jetbrains.annotations.NotNull; + import com.exactpro.th2.common.grpc.EventBatch; import com.exactpro.th2.common.message.MessageUtils; import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; + +import static com.exactpro.th2.common.metrics.CommonMetrics.TH2_PIN_LABEL; +import static com.exactpro.th2.common.schema.event.EventBatchRouter.EVENT_TYPE; import io.prometheus.client.Counter; public class EventBatchSender extends AbstractRabbitSender { + private static final Counter EVENT_PUBLISH_TOTAL = Counter.build() + .name("th2_event_publish_total") + .labelNames(TH2_PIN_LABEL) + .help("Quantity of published events") + .register(); - private static final Counter OUTGOING_EVENT_BATCH_QUANTITY = Counter.build("th2_mq_outgoing_event_batch_quantity", "Quantity of outgoing event batches").register(); - private static final Counter OUTGOING_EVENT_QUANTITY = Counter.build("th2_mq_outgoing_event_quantity", "Quantity of outgoing events").register(); - - @Override - protected Counter getDeliveryCounter() { - return OUTGOING_EVENT_BATCH_QUANTITY; - } - - @Override - protected Counter getContentCounter() { - return OUTGOING_EVENT_QUANTITY; + public EventBatchSender( + @NotNull ConnectionManager connectionManager, + @NotNull String exchangeName, + @NotNull String routingKey, + @NotNull String th2Pin + ) { + super(connectionManager, exchangeName, routingKey, th2Pin, EVENT_TYPE); } @Override - protected int extractCountFrom(EventBatch batch) { - return batch.getEventsCount(); + public void send(EventBatch value) throws IOException { + EVENT_PUBLISH_TOTAL + .labels(th2Pin) + .inc(value.getEventsCount()); + super.send(value); } @Override diff --git a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java index 5f4c38fbb..a5986032d 100644 --- a/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java +++ b/src/main/java/com/exactpro/th2/common/schema/event/EventBatchSubscriber.java @@ -17,61 +17,39 @@ package com.exactpro.th2.common.schema.event; import com.exactpro.th2.common.grpc.EventBatch; +import com.exactpro.th2.common.schema.message.FilterFunction; import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; +import com.rabbitmq.client.Delivery; + +import static com.exactpro.th2.common.metrics.CommonMetrics.TH2_PIN_LABEL; +import static com.exactpro.th2.common.schema.event.EventBatchRouter.EVENT_TYPE; import io.prometheus.client.Counter; -import io.prometheus.client.Histogram; -import org.jetbrains.annotations.Nullable; -import java.util.List; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import static com.exactpro.th2.common.message.MessageUtils.toJson; -import static com.exactpro.th2.common.metrics.CommonMetrics.DEFAULT_BUCKETS; public class EventBatchSubscriber extends AbstractRabbitSubscriber { - - private static final Counter INCOMING_EVENT_BATCH_QUANTITY = Counter.build() - .name("th2_mq_incoming_event_batch_quantity") - .help("Quantity of incoming event batches") - .register(); - private static final Counter INCOMING_EVENT_QUANTITY = Counter.build() - .name("th2_mq_incoming_event_quantity") - .help("Quantity of incoming events") - .register(); - private static final Histogram EVENT_PROCESSING_TIME = Histogram.build() - .buckets(DEFAULT_BUCKETS) - .name("th2_mq_event_processing_time") - .help("Time of processing events") + private static final Counter EVENT_SUBSCRIBE_TOTAL = Counter.build() + .name("th2_event_subscribe_total") + .labelNames(TH2_PIN_LABEL) + .help("Quantity of received events") .register(); - private static final String[] NO_LABELS = {}; - - @Override - protected Counter getDeliveryCounter() { - return INCOMING_EVENT_BATCH_QUANTITY; - } - - @Override - protected Counter getContentCounter() { - return INCOMING_EVENT_QUANTITY; - } - - @Override - protected Histogram getProcessingTimer() { - return EVENT_PROCESSING_TIME; - } - @Override - protected String[] extractLabels(EventBatch batch) { - return NO_LABELS; + public EventBatchSubscriber( + @NotNull ConnectionManager connectionManager, + @NotNull String queue, + @NotNull FilterFunction filterFunc, + @NotNull String th2Pin + ) { + super(connectionManager, queue, filterFunc, th2Pin, EVENT_TYPE); } @Override - protected int extractCountFrom(EventBatch batch) { - return batch.getEventsCount(); - } - - @Override - protected List valueFromBytes(byte[] bytes) throws Exception { - return List.of(EventBatch.parseFrom(bytes)); + protected EventBatch valueFromBytes(byte[] bytes) throws Exception { + return EventBatch.parseFrom(bytes); } @Override @@ -89,4 +67,12 @@ protected String toShortDebugString(EventBatch value) { protected EventBatch filter(EventBatch eventBatch) throws Exception { return eventBatch; } + + @Override + protected void handle(String consumeTag, Delivery delivery, EventBatch value) { + EVENT_SUBSCRIBE_TOTAL + .labels(th2Pin) + .inc(value.getEventsCount()); + super.handle(consumeTag, delivery, value); + } } diff --git a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java index 4e9b1d7e2..33333e10d 100644 --- a/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java +++ b/src/main/java/com/exactpro/th2/common/schema/factory/AbstractCommonFactory.java @@ -227,7 +227,7 @@ public MessageRouter getMessageRouterParsedBatch() { if (router == null) { try { router = messageRouterParsedBatchClass.getConstructor().newInstance(); - router.init(getMessageRouterContext()); + router.init(getMessageRouterContext(), getMessageRouterMessageGroupBatch()); } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new CommonFactoryException("Can not create parsed message router", e); } @@ -247,7 +247,7 @@ public MessageRouter getMessageRouterRawBatch() { if (router == null) { try { router = messageRouterRawBatchClass.getConstructor().newInstance(); - router.init(getMessageRouterContext()); + router.init(getMessageRouterContext(), getMessageRouterMessageGroupBatch()); } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new CommonFactoryException("Can not create raw message router", e); } @@ -335,9 +335,9 @@ public void registerCustomMessageRouter( * Registers message router for custom type that is passed via {@code messageClass} parameter.
* * @param messageClass custom message class - * @param messageConverter converter that will used to convert message to bytes and vice versa + * @param messageConverter converter that will be used to convert message to bytes and vice versa * @param defaultSendAttributes set of attributes for sending. A pin must have all of them to be selected for sending the message - * @param defaultSubscribeAttributes set of attributes subscription. A pin must have all of them to be selected for receiving messages + * @param defaultSubscribeAttributes set of attributes for subscription. A pin must have all of them to be selected for receiving messages * @param custom message type * @throws IllegalStateException if the router for {@code messageClass} is already registered */ diff --git a/src/main/java/com/exactpro/th2/common/schema/message/MessageQueue.java b/src/main/java/com/exactpro/th2/common/schema/message/MessageQueue.java deleted file mode 100644 index 759956691..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/MessageQueue.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message; - -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; -import org.jetbrains.annotations.NotNull; - -/** - * Message queue - * @see MessageSubscriber - * @see MessageSender - */ -public interface MessageQueue extends AutoCloseable { - @Deprecated(since = "3.3.0", forRemoval = true) - void init(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration); - - void init(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunc); - - MessageSubscriber getSubscriber(); - - MessageSender getSender(); - -} diff --git a/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java b/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java index 81ac73d91..8a2aef5b2 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/MessageRouter.java @@ -15,12 +15,16 @@ package com.exactpro.th2.common.schema.message; +import com.exactpro.th2.common.grpc.MessageGroupBatch; import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration; +import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext; import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.io.IOException; +import java.util.Objects; /** * Interface for send and receive RabbitMQ messages @@ -34,7 +38,16 @@ public interface MessageRouter extends AutoCloseable { * @param configuration message router configuration */ @Deprecated(since = "3.2.2", forRemoval = true) - void init(@NotNull ConnectionManager connectionManager, @NotNull MessageRouterConfiguration configuration); + default void init(@NotNull ConnectionManager connectionManager, @NotNull MessageRouterConfiguration configuration) { + Objects.requireNonNull(connectionManager, "Connection owner can not be null"); + Objects.requireNonNull(configuration, "Configuration cannot be null"); + + init(new DefaultMessageRouterContext(connectionManager, MessageRouterMonitor.DEFAULT_MONITOR, configuration)); + } + + default void init(@NotNull MessageRouterContext context, @NotNull MessageRouter groupBatchRouter) { + init(context); + } /** * Initialization message router @@ -47,7 +60,7 @@ public interface MessageRouter extends AutoCloseable { * @param queueAttr queues attributes * @param callback listener * @throws IllegalStateException when more than 1 queue is found - * @return {@link SubscriberMonitor} it start listening. Returns null is can not listen this queue + * @return {@link SubscriberMonitor} it start listening. Returns null if can not listen to this queue */ @Nullable SubscriberMonitor subscribe(MessageListener callback, String... queueAttr); @@ -55,16 +68,18 @@ public interface MessageRouter extends AutoCloseable { /** * Listen ALL RabbitMQ queues in configurations * @param callback listener - * @return {@link SubscriberMonitor} it start listening. Returns null is can not listen this queue + * @return {@link SubscriberMonitor} it start listening. Returns null if can not listen to this queue */ @Nullable - SubscriberMonitor subscribeAll(MessageListener callback); + default SubscriberMonitor subscribeAll(MessageListener callback) { + return subscribeAll(callback, QueueAttribute.SUBSCRIBE.toString()); + } /** * Listen SOME RabbitMQ queues by intersection schemas queues attributes * @param callback listener * @param queueAttr queues attributes - * @return {@link SubscriberMonitor} it start listening. Returns null is can not listen this queue + * @return {@link SubscriberMonitor} it start listening. Returns null if can not listen to this queue */ @Nullable SubscriberMonitor subscribeAll(MessageListener callback, String... queueAttr); @@ -74,7 +89,9 @@ public interface MessageRouter extends AutoCloseable { * @param message * @throws IOException if can not send message */ - void send(T message) throws IOException; + default void send(T message) throws IOException { + send(message, QueueAttribute.PUBLISH.toString()); + } /** * Send message to ONE RabbitMQ queue by intersection schemas queues attributes diff --git a/src/main/java/com/exactpro/th2/common/schema/message/MessageSender.java b/src/main/java/com/exactpro/th2/common/schema/message/MessageSender.java index 9d98827ca..bed00d28e 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/MessageSender.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/MessageSender.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,12 +24,12 @@ import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; -/** - * Send message to {@link MessageQueue} - */ @NotThreadSafe public interface MessageSender { - void init(@NotNull ConnectionManager connectionManager, @NotNull String exchangeName, @NotNull String sendQueue); + + // Please use constructor for initialization + @Deprecated + void init(@NotNull ConnectionManager connectionManager, @NotNull String exchangeName, @NotNull String routingKey); void send(T message) throws IOException; } diff --git a/src/main/java/com/exactpro/th2/common/schema/message/MessageSubscriber.java b/src/main/java/com/exactpro/th2/common/schema/message/MessageSubscriber.java index 1b361b038..fb0357134 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/MessageSubscriber.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/MessageSubscriber.java @@ -17,6 +17,7 @@ import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.SubscribeTarget; import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; + import org.jetbrains.annotations.NotNull; import javax.annotation.concurrent.NotThreadSafe; @@ -26,10 +27,12 @@ */ @NotThreadSafe public interface MessageSubscriber extends AutoCloseable { - + // Please use constructor for initialization @Deprecated(since = "3.3.0", forRemoval = true) void init(@NotNull ConnectionManager connectionManager, @NotNull String exchangeName, @NotNull SubscribeTarget subscribeTargets); + // Please use constructor for initialization + @Deprecated void init(@NotNull ConnectionManager connectionManager, @NotNull SubscribeTarget subscribeTarget, @NotNull FilterFunction filterFunc); void start() throws Exception; diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitBatchSubscriber.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitBatchSubscriber.java deleted file mode 100644 index fd7eecbea..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitBatchSubscriber.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq; - -import com.exactpro.th2.common.grpc.Direction; -import com.exactpro.th2.common.grpc.MessageGroupBatch; -import com.exactpro.th2.common.schema.message.configuration.RouterFilter; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.Message; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - - -public abstract class AbstractRabbitBatchSubscriber extends AbstractRabbitSubscriber { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRabbitBatchSubscriber.class); - - private List filters; - private final int messageRecursionLimit; - - public AbstractRabbitBatchSubscriber(List filters, int messageRecursionLimit) { - this.filters = filters; - if (messageRecursionLimit < 0) { - throw new IllegalArgumentException("Recursion limit cannot be negative: " + messageRecursionLimit); - } - this.messageRecursionLimit = messageRecursionLimit; - } - - @Override - protected MB filter(MB batch) { - if (filters.isEmpty()) { - return batch; - } - - var messages = new ArrayList<>(getMessages(batch)); - - var each = messages.iterator(); - - while (each.hasNext()) { - var msg = each.next(); - if (!callFilterFunction(msg, filters)) { - each.remove(); - LOGGER.debug("Message skipped because it did not satisfy filters: " + extractMetadata(msg)); - } - } - - return messages.isEmpty() ? null : createBatch(messages); - } - - - protected abstract List getMessages(MB batch); - - protected abstract MB createBatch(List messages); - - protected abstract Metadata extractMetadata(M message); - - protected MessageGroupBatch parseEncodedBatch(byte[] body) throws IOException { - var ins = CodedInputStream.newInstance(body); - ins.setRecursionLimit(messageRecursionLimit); - return MessageGroupBatch.parseFrom(ins); - } - - protected static class Metadata { - private final long sequence; - private final String messageType; - private final Direction direction; - private final String sessionAlias; - - public Metadata(long sequence, String messageType, Direction direction, String sessionAlias) { - this.sequence = sequence; - this.messageType = messageType; - this.direction = direction; - this.sessionAlias = sessionAlias; - } - - public long getSequence() { - return sequence; - } - - public String getMessageType() { - return messageType; - } - - public Direction getDirection() { - return direction; - } - - public String getSessionAlias() { - return sessionAlias; - } - - @Override - public String toString() { - return new ToStringBuilder(this) - .append("sequence", sequence) - .append("messageType", messageType) - .append("sessionAlias", sessionAlias) - .append("direction", direction) - .toString(); - } - } - -} diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitMessageRouter.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitMessageRouter.java deleted file mode 100644 index c3b2922c5..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitMessageRouter.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq; - -import com.exactpro.th2.common.schema.exception.RouterException; -import com.exactpro.th2.common.schema.filter.strategy.FilterStrategy; -import com.exactpro.th2.common.schema.message.FilterFunction; -import com.exactpro.th2.common.schema.message.MessageListener; -import com.exactpro.th2.common.schema.message.MessageQueue; -import com.exactpro.th2.common.schema.message.MessageRouter; -import com.exactpro.th2.common.schema.message.MessageRouterContext; -import com.exactpro.th2.common.schema.message.MessageRouterMonitor; -import com.exactpro.th2.common.schema.message.MessageSender; -import com.exactpro.th2.common.schema.message.MessageSubscriber; -import com.exactpro.th2.common.schema.message.SubscriberMonitor; -import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration; -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration; -import com.exactpro.th2.common.schema.message.configuration.RouterFilter; -import com.exactpro.th2.common.schema.message.impl.context.DefaultMessageRouterContext; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; -import com.google.protobuf.Message; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -public abstract class AbstractRabbitMessageRouter implements MessageRouter { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRabbitMessageRouter.class); - - private final AtomicReference context = new AtomicReference<>(); - private final AtomicReference> filterStrategy = new AtomicReference<>(getDefaultFilterStrategy()); - private final ConcurrentMap> queueConnections = new ConcurrentHashMap<>(); - - @Override - public void init(@NotNull ConnectionManager connectionManager, @NotNull MessageRouterConfiguration configuration) { - Objects.requireNonNull(connectionManager, "Connection owner can not be null"); - Objects.requireNonNull(configuration, "Configuration cannot be null"); - - init(new DefaultMessageRouterContext(connectionManager, MessageRouterMonitor.DEFAULT_MONITOR, configuration)); - } - - @Override - public void init(@NotNull MessageRouterContext context) { - Objects.requireNonNull(context, "Context can not be null"); - - this.context.updateAndGet(prev -> { - if (prev == null) { - return context; - } else { - throw new IllegalStateException("Router is already initialized"); - } - }); - } - - @Nullable - private SubscriberMonitor subscribe(String queueAlias, MessageListener callback) { - var queue = getMessageQueue(queueAlias); - MessageSubscriber subscriber = queue.getSubscriber(); - subscriber.addListener(callback); - - try { - subscriber.start(); - } catch (Exception e) { - throw new RouterException("Can not start subscriber", e); - } - - return new SubscriberMonitorImpl(subscriber, queue); - } - - @NotNull - @Override - public SubscriberMonitor subscribe(MessageListener callback, String... queueAttr) { - var attributes = addRequiredSubscribeAttributes(queueAttr); - var queues = getConfiguration().findQueuesByAttr(attributes); - if (queues.size() != 1) { - throw new IllegalStateException("Wrong amount of queues for subscribe. Found " + queues.size() + " queues, but must not be more than 1. Search was done by " + attributes + " attributes"); - } - return subscribe(queues.keySet().iterator().next(), callback); - } - - @Override - public SubscriberMonitor subscribeAll(MessageListener callback) { - List subscribers = getConfiguration().findQueuesByAttr(requiredSubscribeAttributes()).keySet().stream().map(alias -> subscribe(alias, callback)).collect(Collectors.toList()); - if (subscribers.isEmpty()) { - throw new IllegalStateException("Wrong amount of queues for subscribeAll. Should not be empty. Search was done by " + requiredSubscribeAttributes() + " attributes"); - } - return new MultiplySubscribeMonitorImpl(subscribers); - } - - @Override - public SubscriberMonitor subscribeAll(MessageListener callback, String... queueAttr) { - var attributes = addRequiredSubscribeAttributes(queueAttr); - List subscribers = getConfiguration().findQueuesByAttr(attributes).keySet().stream().map(queueConfiguration -> subscribe(queueConfiguration, callback)).collect(Collectors.toList()); - if (subscribers.isEmpty()) { - throw new IllegalStateException("Wrong amount of queues for subscribeAll. Should not be empty. Search was done by " + attributes + " attributes"); - } - return new MultiplySubscribeMonitorImpl(subscribers); - } - - @Override - public void send(T message) throws IOException { - var filteredByAttrAndFilter = findQueueByFilter(requiredSendAttributes().size() > 0 ? getConfiguration().findQueuesByAttr(requiredSendAttributes()) : getConfiguration().getQueues(), message); - - if (filteredByAttrAndFilter.size() != 1) { - throw new IllegalStateException("Wrong count of queues for send. Should be equal to 1. Find queues = " + filteredByAttrAndFilter.size()); - } - - send(filteredByAttrAndFilter); - } - - @Override - public void send(T message, String... queueAttr) throws IOException { - - var filteredByAttr = getConfiguration().findQueuesByAttr(addRequiredSendAttributes(queueAttr)); - - var filteredByAttrAndFilter = findQueueByFilter(filteredByAttr, message); - - if (filteredByAttrAndFilter.size() != 1) { - throw new IllegalStateException("Wrong size of queues for send. Should be equal to 1"); - } - - send(filteredByAttrAndFilter); - } - - @Override - public void sendAll(T message, String... queueAttr) throws IOException { - - var filteredByAttr = getConfiguration().findQueuesByAttr(addRequiredSendAttributes(queueAttr)); - - var filteredByAttrAndFilter = findQueueByFilter(filteredByAttr, message); - - if (filteredByAttrAndFilter.isEmpty()) { - throw new IllegalStateException("Wrong size of queues for send. Can't be equal to 0"); - } - - send(filteredByAttrAndFilter); - } - - /** - * Return a fields filter strategy - * @return filter strategy for filtering message fields - */ - @NotNull - public FilterStrategy getFilterStrategy() { - return this.filterStrategy.get(); - } - - @Override - public void close() { - LOGGER.info("Closing message router"); - - Collection exceptions = new ArrayList<>(); - - for (MessageQueue queue : queueConnections.values()) { - try { - queue.close(); - } catch (Exception e) { - exceptions.add(e); - } - } - - queueConnections.clear(); - - if (!exceptions.isEmpty()) { - RuntimeException exception = new RouterException("Can not close message router"); - exceptions.forEach(exception::addSuppressed); - throw exception; - } - - LOGGER.info("Message router has been successfully closed"); - } - - protected abstract MessageQueue createQueue(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunction); - - protected abstract Map findQueueByFilter(Map queues, T msg); - - protected abstract Set requiredSubscribeAttributes(); - - protected abstract Set requiredSendAttributes(); - - @NotNull - protected FilterStrategy getDefaultFilterStrategy() { - return FilterStrategy.DEFAULT_FILTER_STRATEGY; - } - - protected MessageRouterMonitor getMonitor() { - return getContext().getRouterMonitor(); - } - - protected MessageQueue getMessageQueue(String queueAlias) { - return queueConnections.computeIfAbsent(queueAlias, key -> { - ConnectionManager connectionManager = getConnectionManager(); - - QueueConfiguration queueByAlias = getConfiguration().getQueueByAlias(key); - if (queueByAlias == null) { - throw new IllegalStateException("Can not find queue for " + queueAlias); - } - - return createQueue(connectionManager, queueByAlias, this::filterMessage); - }); - } - - protected void send(Map aliasesAndMessagesToSend) { - Map exceptions = new HashMap<>(); - - aliasesAndMessagesToSend.forEach((queueAlias, message) -> { - try { - MessageSender sender = getMessageQueue(queueAlias).getSender(); - sender.send(message); - } catch (IOException e) { - exceptions.put(queueAlias, e); - } catch (Exception e) { - throw new RouterException("Can not start sender to queue: " + queueAlias, e); - } - }); - - if (!exceptions.isEmpty()) { - RouterException exception = new RouterException("Can not send to queue(s): " + String.join(",", exceptions.keySet())); - exceptions.values().forEach(exception::addSuppressed); - throw exception; - } - } - - protected boolean filterMessage(Message msg, List filters) { - return filterStrategy.get().verify(msg, filters); - } - - @NotNull - private ConnectionManager getConnectionManager() { - return getContext().getConnectionManager(); - } - - @NotNull - private MessageRouterConfiguration getConfiguration() { - return getContext().getConfiguration(); - } - - @NotNull - private MessageRouterContext getContext() { - MessageRouterContext context = this.context.get(); - if (context == null) { - throw new IllegalStateException("Router is not initialized"); - } - return context; - } - - private Collection addRequiredSubscribeAttributes(String[] queueAttr) { - Set attributes = new HashSet<>(requiredSubscribeAttributes()); - attributes.addAll(Arrays.asList(queueAttr)); - return attributes; - } - - private Collection addRequiredSendAttributes(String[] queueAttr) { - Set attributes = new HashSet<>(requiredSendAttributes()); - attributes.addAll(Arrays.asList(queueAttr)); - return attributes; - } - - protected static class SubscriberMonitorImpl implements SubscriberMonitor { - private final Object lock; - - private final MessageSubscriber subscriber; - public SubscriberMonitorImpl(@NotNull MessageSubscriber subscriber, @Nullable Object lock) { - this.lock = lock == null ? subscriber : lock; - this.subscriber = subscriber; - } - - @Override - public void unsubscribe() throws Exception { - synchronized (lock) { - subscriber.close(); - } - } - } - protected static class MultiplySubscribeMonitorImpl implements SubscriberMonitor { - - - - private final List subscriberMonitors; - public MultiplySubscribeMonitorImpl(List subscriberMonitors) { - this.subscriberMonitors = subscriberMonitors; - } - - @Override - public void unsubscribe() throws Exception { - Exception exception = null; - for (SubscriberMonitor monitor : subscriberMonitors) { - try { - monitor.unsubscribe(); - } catch (Exception e) { - if (exception == null) { - exception = new Exception("Can not unsubscribe from some subscribe monitors"); - } - exception.addSuppressed(e); - } - } - if (exception != null) { - throw exception; - } - } - - } -} diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitQueue.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitQueue.java deleted file mode 100644 index 0d7b4f5e0..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitQueue.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq; - -import com.exactpro.th2.common.schema.message.FilterFunction; -import com.exactpro.th2.common.schema.message.MessageQueue; -import com.exactpro.th2.common.schema.message.MessageSender; -import com.exactpro.th2.common.schema.message.MessageSubscriber; -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; -import org.jetbrains.annotations.NotNull; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; - -public abstract class AbstractRabbitQueue implements MessageQueue { - - private final AtomicReference connectionManager = new AtomicReference<>(); - private final AtomicReference queueConfiguration = new AtomicReference<>(); - private final AtomicReference filterFunc = new AtomicReference<>(); - - private final AtomicReference> sender = new AtomicReference<>(); - private final AtomicReference> subscriber = new AtomicReference<>(); - - @Override - public void init(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration) { - this.init(connectionManager, queueConfiguration, FilterFunction.DEFAULT_FILTER_FUNCTION); - } - - @Override - public void init(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunc) { - if (isInit()) { - throw new IllegalStateException("Queue is already initialize"); - } - - Objects.requireNonNull(connectionManager, "Connection can not be null"); - Objects.requireNonNull(queueConfiguration, "Queue configuration can not be null"); - Objects.requireNonNull(filterFunc, "Filter function can not be null"); - - this.connectionManager.set(connectionManager); - this.queueConfiguration.set(queueConfiguration); - this.filterFunc.set(filterFunc); - } - - @Override - public MessageSubscriber getSubscriber() { - ConnectionManager connectionManger = connectionManager.get(); - QueueConfiguration queueConfiguration = this.queueConfiguration.get(); - FilterFunction filterFunction = filterFunc.get(); - - if (connectionManger == null || queueConfiguration == null || filterFunction == null) { - throw new IllegalStateException("Queue is not initialized"); - } - - if (!queueConfiguration.isReadable()) { - throw new IllegalStateException("Queue can not read"); - } - - return subscriber.updateAndGet( subscriber -> { - if (subscriber == null) { - return createSubscriber(connectionManger, queueConfiguration, filterFunction); - } - return subscriber; - }); - } - - @Override - public MessageSender getSender() { - ConnectionManager connectionManager = this.connectionManager.get(); - QueueConfiguration queueConfiguration = this.queueConfiguration.get(); - FilterFunction filterFunction = filterFunc.get(); - - if (connectionManager == null || queueConfiguration == null || filterFunction == null) { - throw new IllegalStateException("Queue is not initialized"); - } - - if (!queueConfiguration.isWritable()) { - throw new IllegalStateException("Queue can not write"); - } - - return sender.updateAndGet(sender -> { - if (sender == null) { - return createSender(connectionManager, queueConfiguration); - } - return sender; - }); - } - - @Override - public void close() throws Exception { - Collection exceptions = new ArrayList<>(); - - - subscriber.updateAndGet(subscriber -> { - if (subscriber != null) { - try { - subscriber.close(); - } catch (Exception e) { - exceptions.add(e); - } - } - return null; - }); - - if (!exceptions.isEmpty()) { - Exception exception = new Exception("Can not close message queue"); - exceptions.forEach(exception::addSuppressed); - throw exception; - } - } - - public boolean isInit() { - return this.connectionManager.get() != null || this.queueConfiguration.get() != null || this.filterFunc.get() != null; - } - - protected abstract MessageSender createSender(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration); - - protected abstract MessageSubscriber createSubscriber(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunction); -} diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java index c87a09f22..751bc4113 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSender.java @@ -17,7 +17,6 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq; import java.io.IOException; -import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import org.jetbrains.annotations.NotNull; @@ -27,55 +26,77 @@ import com.exactpro.th2.common.schema.message.MessageSender; import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; +import static com.exactpro.th2.common.metrics.CommonMetrics.EXCHANGE_LABEL; +import static com.exactpro.th2.common.metrics.CommonMetrics.ROUTING_KEY_LABEL; +import static com.exactpro.th2.common.metrics.CommonMetrics.TH2_PIN_LABEL; +import static com.exactpro.th2.common.metrics.CommonMetrics.TH2_TYPE_LABEL; import io.prometheus.client.Counter; +import static java.util.Objects.requireNonNull; public abstract class AbstractRabbitSender implements MessageSender { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRabbitSender.class); - private final AtomicReference sendQueue = new AtomicReference<>(); + private static final Counter MESSAGE_SIZE_PUBLISH_BYTES = Counter.build() + .name("th2_rabbitmq_message_size_publish_bytes") + .labelNames(TH2_PIN_LABEL, TH2_TYPE_LABEL, EXCHANGE_LABEL, ROUTING_KEY_LABEL) + .help("Number of published message bytes to RabbitMQ. " + + "The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content") + .register(); + + private static final Counter MESSAGE_PUBLISH_TOTAL = Counter.build() + .name("th2_rabbitmq_message_publish_total") + .labelNames(TH2_PIN_LABEL, TH2_TYPE_LABEL, EXCHANGE_LABEL, ROUTING_KEY_LABEL) + .help("Quantity of published messages to RabbitMQ. " + + "The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content") + .register(); + + protected final String th2Pin; + private final AtomicReference routingKey = new AtomicReference<>(); private final AtomicReference exchangeName = new AtomicReference<>(); private final AtomicReference connectionManager = new AtomicReference<>(); + private final String th2Type; + + public AbstractRabbitSender( + @NotNull ConnectionManager connectionManager, + @NotNull String exchangeName, + @NotNull String routingKey, + @NotNull String th2Pin, + @NotNull String th2Type + ) { + this.connectionManager.set(requireNonNull(connectionManager, "Connection can not be null")); + this.exchangeName.set(requireNonNull(exchangeName, "Exchange name can not be null")); + this.routingKey.set(requireNonNull(routingKey, "Routing key can not be null")); + this.th2Pin = requireNonNull(th2Pin, "TH2 pin can not be null"); + this.th2Type = requireNonNull(th2Type, "TH2 type can not be null"); + } + @Deprecated @Override - public void init(@NotNull ConnectionManager connectionManager, @NotNull String exchangeName, @NotNull String sendQueue) { - Objects.requireNonNull(connectionManager, "Connection can not be null"); - Objects.requireNonNull(exchangeName, "Exchange name can not be null"); - Objects.requireNonNull(sendQueue, "Send queue can not be null"); - - if (this.connectionManager.get() != null && this.sendQueue.get() != null && this.exchangeName.get() != null) { - throw new IllegalStateException("Sender is already initialize"); - } - - this.connectionManager.set(connectionManager); - this.exchangeName.set(exchangeName); - this.sendQueue.set(sendQueue); + public void init(@NotNull ConnectionManager connectionManager, @NotNull String exchangeName, @NotNull String routingKey) { + throw new UnsupportedOperationException("Method is deprecated, please use constructor"); } - protected abstract Counter getDeliveryCounter(); - - protected abstract Counter getContentCounter(); - - protected abstract int extractCountFrom(T batch); - @Override public void send(T value) throws IOException { - Objects.requireNonNull(value, "Value for send can not be null"); - - Counter counter = getDeliveryCounter(); - counter.inc(); - Counter contentCounter = getContentCounter(); - contentCounter.inc(extractCountFrom(value)); + requireNonNull(value, "Value for send can not be null"); try { ConnectionManager connection = this.connectionManager.get(); - connection.basicPublish(exchangeName.get(), sendQueue.get(), null, valueToBytes(value)); + byte[] bytes = valueToBytes(value); + MESSAGE_SIZE_PUBLISH_BYTES + .labels(th2Pin, th2Type, exchangeName.get(), routingKey.get()) + .inc(bytes.length); + MESSAGE_PUBLISH_TOTAL + .labels(th2Pin, th2Type, exchangeName.get(), routingKey.get()) + .inc(); + connection.basicPublish(exchangeName.get(), routingKey.get(), null, bytes); if (LOGGER.isTraceEnabled()) { LOGGER.trace("Message sent to exchangeName='{}', routing key='{}': '{}'", - exchangeName, sendQueue, toShortTraceString(value)); + exchangeName, routingKey, toShortTraceString(value)); } else if (LOGGER.isDebugEnabled()) { LOGGER.debug("Message sent to exchangeName='{}', routing key='{}': '{}'", - exchangeName, sendQueue, toShortDebugString(value)); + exchangeName, routingKey, toShortDebugString(value)); } } catch (Exception e) { throw new IOException("Can not send message: " + toShortDebugString(value), e); @@ -87,6 +108,4 @@ public void send(T value) throws IOException { protected abstract String toShortDebugString(T value); protected abstract byte[] valueToBytes(T value); - - } diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java index a5822805f..b6c8d3c3e 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitSubscriber.java @@ -25,9 +25,14 @@ import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; import com.google.protobuf.Message; import com.rabbitmq.client.Delivery; + +import static com.exactpro.th2.common.metrics.CommonMetrics.QUEUE_LABEL; +import static com.exactpro.th2.common.metrics.CommonMetrics.TH2_PIN_LABEL; +import static com.exactpro.th2.common.metrics.CommonMetrics.TH2_TYPE_LABEL; import io.prometheus.client.Counter; import io.prometheus.client.Histogram; import io.prometheus.client.Histogram.Timer; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -39,59 +44,105 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicReference; -import static com.exactpro.th2.common.schema.message.FilterFunction.DEFAULT_FILTER_FUNCTION; +import static com.exactpro.th2.common.metrics.CommonMetrics.DEFAULT_BUCKETS; +import static java.util.Objects.requireNonNull; public abstract class AbstractRabbitSubscriber implements MessageSubscriber { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRabbitSubscriber.class); + private static final Counter MESSAGE_SIZE_SUBSCRIBE_BYTES = Counter.build() + .name("th2_rabbitmq_message_size_subscribe_bytes") + .labelNames(TH2_PIN_LABEL, TH2_TYPE_LABEL, QUEUE_LABEL) + .help("Number of bytes received from RabbitMQ, it includes bytes of messages dropped after filters. " + + "For information about the number of dropped messages, please refer to 'th2_message_dropped_subscribe_total' and 'th2_message_group_dropped_subscribe_total'. " + + "The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content") + .register(); + + private static final Histogram MESSAGE_PROCESS_DURATION_SECONDS = Histogram.build() + .buckets(DEFAULT_BUCKETS) + .name("th2_rabbitmq_message_process_duration_seconds") + .labelNames(TH2_PIN_LABEL, TH2_TYPE_LABEL, QUEUE_LABEL) + .help("Time of message processing during subscription from RabbitMQ in seconds. " + + "The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content") + .register(); + + protected final String th2Pin; private final List> listeners = new CopyOnWriteArrayList<>(); - private final AtomicReference subscribeTarget = new AtomicReference<>(); + private final String queue; private final AtomicReference connectionManager = new AtomicReference<>(); private final AtomicReference consumerMonitor = new AtomicReference<>(); private final AtomicReference filterFunc = new AtomicReference<>(); + private final String th2Type; private final HealthMetrics healthMetrics = new HealthMetrics(this); + public AbstractRabbitSubscriber( + @NotNull ConnectionManager connectionManager, + @NotNull String queue, + @NotNull FilterFunction filterFunc, + @NotNull String th2Pin, + @NotNull String th2Type + ) { + this.connectionManager.set(requireNonNull(connectionManager, "Connection can not be null")); + this.queue = requireNonNull(queue, "Queue can not be null"); + this.filterFunc.set(requireNonNull(filterFunc, "Filter function can not be null")); + this.th2Pin = requireNonNull(th2Pin, "TH2 pin can not be null"); + this.th2Type = requireNonNull(th2Type, "TH2 type can not be null"); + } + + @Deprecated @Override - public void init(@NotNull ConnectionManager connectionManager, @NotNull String exchangeName, @NotNull SubscribeTarget subscribeTarget) { - this.init(connectionManager, new SubscribeTarget(subscribeTarget.getQueue(), subscribeTarget.getRoutingKey(), exchangeName), DEFAULT_FILTER_FUNCTION); + public void init(@NotNull ConnectionManager connectionManager, @NotNull String exchangeName, @NotNull SubscribeTarget subscribeTargets) { + throw new UnsupportedOperationException("Method is deprecated, please use constructor"); } + @Deprecated @Override public void init(@NotNull ConnectionManager connectionManager, @NotNull SubscribeTarget subscribeTarget, @NotNull FilterFunction filterFunc) { - Objects.requireNonNull(connectionManager, "Connection can not be null"); - Objects.requireNonNull(subscribeTarget, "Subscriber target can not be null"); - Objects.requireNonNull(filterFunc, "Filter function can not be null"); - - - if (this.connectionManager.get() != null || this.subscribeTarget.get() != null || this.filterFunc.get() != null) { - throw new IllegalStateException("Subscriber is already initialize"); - } - - this.connectionManager.set(connectionManager); - this.subscribeTarget.set(subscribeTarget); - this.filterFunc.set(filterFunc); + throw new UnsupportedOperationException("Method is deprecated, please use constructor"); } @Override public void start() throws Exception { ConnectionManager connectionManager = this.connectionManager.get(); - SubscribeTarget target = subscribeTarget.get(); - - if (connectionManager == null || target == null) { + if (connectionManager == null) { throw new IllegalStateException("Subscriber is not initialized"); } try { - var queue = target.getQueue(); - var routingKey = target.getRoutingKey(); - var exchangeName = target.getExchange(); - consumerMonitor.updateAndGet(monitor -> { if (monitor == null) { try { - monitor = connectionManager.basicConsume(queue, this::handle, this::canceled); - LOGGER.info("Start listening exchangeName='{}', routing key='{}', queue name='{}'", exchangeName, routingKey, queue); + monitor = connectionManager.basicConsume( + queue, + (consumeTag, delivery) -> { + Timer processTimer = MESSAGE_PROCESS_DURATION_SECONDS + .labels(th2Pin, th2Type, queue) + .startTimer(); + MESSAGE_SIZE_SUBSCRIBE_BYTES + .labels(th2Pin, th2Type, queue) + .inc(delivery.getBody().length); + try { + T value; + try { + value = valueFromBytes(delivery.getBody()); + } catch (Exception e) { + throw new IOException( + String.format( + "Can not extract value from bytes for envelope '%s', queue '%s', pin '%s'", + delivery.getEnvelope(), queue, th2Pin + ), + e + ); + } + handle(consumeTag, delivery, value); + } finally { + processTimer.observeDuration(); + } + }, + this::canceled + ); + LOGGER.info("Start listening queue name='{}'", queue); } catch (IOException e) { throw new IllegalStateException("Can not start subscribe to queue = " + queue, e); } @@ -134,7 +185,7 @@ protected boolean callFilterFunction(Message message, List valueFromBytes(byte[] body) throws Exception; + protected abstract T valueFromBytes(byte[] body) throws Exception; protected abstract String toShortTraceString(T value); @@ -143,87 +194,51 @@ protected boolean callFilterFunction(Message message, List values = valueFromBytes(delivery.getBody()); - - for (T value : values) { - Objects.requireNonNull(value, "Received value is null"); - - String[] labels = extractLabels(value); - Objects.requireNonNull(labels, "Labels list extracted from received value is null"); - - Counter counter = getDeliveryCounter(); - Counter contentCounter = getContentCounter(); - - if (labels.length == 0) { - counter.inc(); - contentCounter.inc(extractCountFrom(value)); - } else { - counter.labels(labels).inc(); - contentCounter.labels(labels).inc(extractCountFrom(value)); - } + requireNonNull(value, "Received value is null"); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Received message: {}", toShortTraceString(value)); - } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Received message: {}", toShortDebugString(value)); - } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Received message: {}", toShortTraceString(value)); + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Received message: {}", toShortDebugString(value)); + } - var filteredValue = filter(value); + var filteredValue = filter(value); - if (Objects.isNull(filteredValue)) { - LOGGER.debug("Message is filtered"); - return; - } + if (Objects.isNull(filteredValue)) { + LOGGER.debug("Message is filtered"); + return; + } - for (MessageListener listener : listeners) { - try { - listener.handler(consumeTag, filteredValue); - } catch (Exception listenerExc) { - LOGGER.warn("Message listener from class '{}' threw exception", listener.getClass(), listenerExc); - } + for (MessageListener listener : listeners) { + try { + listener.handler(consumeTag, filteredValue); + } catch (Exception listenerExc) { + LOGGER.warn("Message listener from class '{}' threw exception", listener.getClass(), listenerExc); } } } catch (Exception e) { LOGGER.error("Can not parse value from delivery for: {}", consumeTag, e); - } finally { - processTimer.observeDuration(); } } private void resubscribe() { - SubscribeTarget target = subscribeTarget.get(); - var queue = target.getQueue(); - var routingKey = target.getRoutingKey(); - var exchangeName = target.getExchange(); - LOGGER.info("Try to resubscribe subscriber for exchangeName='{}', routing key='{}', queue name='{}'", exchangeName, routingKey, queue); + LOGGER.info("Try to resubscribe subscriber for queue name='{}'", queue); SubscriberMonitor monitor = consumerMonitor.getAndSet(null); if (monitor != null) { try { monitor.unsubscribe(); } catch (Exception e) { - LOGGER.info("Can not unsubscribe on resubscribe for exchangeName='{}', routing key='{}', queue name='{}'", exchangeName, routingKey, queue); + LOGGER.info("Can not unsubscribe on resubscribe for queue name='{}'", queue); } } try { start(); } catch (Exception e) { - LOGGER.error("Can not resubscribe subscriber for exchangeName='{}', routing key='{}', queue name='{}'", exchangeName, routingKey, queue); + LOGGER.error("Can not resubscribe subscriber for queue name='{}'", queue); healthMetrics.disable(); } } @@ -233,5 +248,4 @@ private void canceled(String consumerTag) { healthMetrics.getReadinessMonitor().disable(); resubscribe(); } - } diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchQueue.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchQueue.java deleted file mode 100644 index ec54e2aad..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchQueue.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq.parsed; - -import com.exactpro.th2.common.grpc.MessageBatch; -import com.exactpro.th2.common.schema.message.FilterFunction; -import com.exactpro.th2.common.schema.message.MessageSender; -import com.exactpro.th2.common.schema.message.MessageSubscriber; -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitQueue; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.SubscribeTarget; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; -import org.jetbrains.annotations.NotNull; - -public class RabbitParsedBatchQueue extends AbstractRabbitQueue { - - @Override - protected MessageSender createSender(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration) { - MessageSender result = new RabbitParsedBatchSender(); - result.init(connectionManager, queueConfiguration.getExchange(), queueConfiguration.getRoutingKey()); - return result; - } - - @Override - protected MessageSubscriber createSubscriber(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunction) { - var result = new RabbitParsedBatchSubscriber(queueConfiguration.getFilters(), connectionManager.getConfiguration().getMessageRecursionLimit()); - result.init(connectionManager, - new SubscribeTarget(queueConfiguration.getQueue(), queueConfiguration.getRoutingKey(), queueConfiguration.getExchange()), - filterFunction); - return result; - } -} diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchRouter.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchRouter.java index 3d4378fc5..6369b2447 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchRouter.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchRouter.java @@ -15,60 +15,56 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.parsed; -import com.exactpro.th2.common.grpc.Message; -import com.exactpro.th2.common.grpc.MessageBatch; -import com.exactpro.th2.common.grpc.MessageBatch.Builder; -import com.exactpro.th2.common.schema.message.FilterFunction; -import com.exactpro.th2.common.schema.message.MessageQueue; -import com.exactpro.th2.common.schema.message.QueueAttribute; -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.router.AbstractRabbitBatchMessageRouter; +import java.util.Set; + import org.apache.commons.collections4.SetUtils; import org.jetbrains.annotations.NotNull; -import java.util.List; -import java.util.Set; - -public class RabbitParsedBatchRouter extends AbstractRabbitBatchMessageRouter { +import com.exactpro.th2.common.grpc.AnyMessage; +import com.exactpro.th2.common.grpc.MessageBatch; +import com.exactpro.th2.common.grpc.MessageGroup; +import com.exactpro.th2.common.grpc.MessageGroupBatch; +import com.exactpro.th2.common.message.MessageUtils; +import com.exactpro.th2.common.schema.message.QueueAttribute; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractGroupBatchAdapterRouter; +public class RabbitParsedBatchRouter extends AbstractGroupBatchAdapterRouter { private static final Set REQUIRED_SUBSCRIBE_ATTRIBUTES = SetUtils.unmodifiableSet(QueueAttribute.PARSED.toString(), QueueAttribute.SUBSCRIBE.toString()); private static final Set REQUIRED_SEND_ATTRIBUTES = SetUtils.unmodifiableSet(QueueAttribute.PARSED.toString(), QueueAttribute.PUBLISH.toString()); + @NotNull @Override - protected MessageQueue createQueue(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunction) { - RabbitParsedBatchQueue queue = new RabbitParsedBatchQueue(); - queue.init(connectionManager, queueConfiguration, filterFunction); - return queue; - } - - @Override - protected Set requiredSubscribeAttributes() { - return REQUIRED_SUBSCRIBE_ATTRIBUTES; - } - - @Override - protected Set requiredSendAttributes() { + public Set getRequiredSendAttributes() { return REQUIRED_SEND_ATTRIBUTES; } + @NotNull @Override - protected List getMessages(MessageBatch batch) { - return batch.getMessagesList(); - } - - @Override - protected Builder createBatchBuilder() { - return MessageBatch.newBuilder(); + public Set getRequiredSubscribeAttributes() { + return REQUIRED_SUBSCRIBE_ATTRIBUTES; } @Override - protected void addMessage(Builder builder, Message message) { - builder.addMessages(message); + protected @NotNull MessageGroupBatch buildGroupBatch(MessageBatch messageBatch) { + var messageGroupBuilder = MessageGroup.newBuilder(); + messageBatch.getMessagesList().forEach(message -> + messageGroupBuilder.addMessages(AnyMessage.newBuilder().setMessage(message).build()) + ); + return MessageGroupBatch.newBuilder().addGroups(messageGroupBuilder).build(); } @Override - protected MessageBatch build(Builder builder) { + protected MessageBatch buildFromGroupBatch(@NotNull MessageGroupBatch groupBatch) { + var builder = MessageBatch.newBuilder(); + groupBatch.getGroupsList().stream() + .flatMap(messageGroup -> messageGroup.getMessagesList().stream()) + .peek(anyMessage -> { + if (!anyMessage.hasMessage()) { + throw new IllegalStateException("Message group batch contains not parsed message: " + MessageUtils.toJson(groupBatch)); + } + }) + .map(AnyMessage::getMessage) + .forEach(builder::addMessages); return builder.build(); } } diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchSender.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchSender.java deleted file mode 100644 index ec905e104..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchSender.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq.parsed; - -import static com.exactpro.th2.common.message.MessageUtils.getDebugString; - -import java.util.stream.Collectors; - -import com.exactpro.th2.common.grpc.AnyMessage; -import com.exactpro.th2.common.grpc.MessageBatch; -import com.exactpro.th2.common.grpc.MessageGroup; -import com.exactpro.th2.common.grpc.MessageGroupBatch; -import com.exactpro.th2.common.message.MessageUtils; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender; - -import io.prometheus.client.Counter; - -public class RabbitParsedBatchSender extends AbstractRabbitSender { - - private static final Counter OUTGOING_PARSED_MSG_BATCH_QUANTITY = Counter.build("th2_mq_outgoing_parsed_msg_batch_quantity", "Quantity of outgoing parsed message batches").register(); - private static final Counter OUTGOING_PARSED_MSG_QUANTITY = Counter.build("th2_mq_outgoing_parsed_msg_quantity", "Quantity of outgoing parsed messages").register(); - - @Override - protected Counter getDeliveryCounter() { - return OUTGOING_PARSED_MSG_BATCH_QUANTITY; - } - - @Override - protected Counter getContentCounter() { - return OUTGOING_PARSED_MSG_QUANTITY; - } - - @Override - protected int extractCountFrom(MessageBatch batch) { - return batch.getMessagesCount(); - } - - @Override - protected byte[] valueToBytes(MessageBatch value) { - var groupBuilder = MessageGroup.newBuilder(); - - for (var message : value.getMessagesList()) { - var anyMessage = AnyMessage.newBuilder().setMessage(message).build(); - groupBuilder.addMessages(anyMessage); - } - - var batchBuilder = MessageGroupBatch.newBuilder(); - var group = groupBuilder.build(); - - return batchBuilder.addGroups(group).build().toByteArray(); - } - - @Override - protected String toShortTraceString(MessageBatch value) { - return MessageUtils.toJson(value); - } - - @Override - protected String toShortDebugString(MessageBatch value) { - return getDebugString(getClass().getSimpleName(), - value.getMessagesList().stream().map(message -> message.getMetadata().getId()).collect(Collectors.toList())); - } -} diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchSubscriber.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchSubscriber.java deleted file mode 100644 index 965ba90df..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/parsed/RabbitParsedBatchSubscriber.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq.parsed; - -import static com.exactpro.th2.common.message.MessageUtils.getDebugString; -import static com.exactpro.th2.common.message.MessageUtils.getSessionAliasAndDirection; -import static com.exactpro.th2.common.metrics.CommonMetrics.DEFAULT_BUCKETS; -import static com.exactpro.th2.common.metrics.CommonMetrics.DEFAULT_DIRECTION_LABEL_NAME; -import static com.exactpro.th2.common.metrics.CommonMetrics.DEFAULT_SESSION_ALIAS_LABEL_NAME; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -import com.exactpro.th2.common.grpc.AnyMessage; -import com.exactpro.th2.common.grpc.AnyMessage.KindCase; -import com.exactpro.th2.common.grpc.Message; -import com.exactpro.th2.common.grpc.MessageBatch; -import com.exactpro.th2.common.grpc.MessageID; -import com.exactpro.th2.common.message.MessageUtils; -import com.exactpro.th2.common.schema.message.configuration.RouterFilter; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitBatchSubscriber; - -import io.prometheus.client.Counter; -import io.prometheus.client.Histogram; - -public class RabbitParsedBatchSubscriber extends AbstractRabbitBatchSubscriber { - - private static final Counter INCOMING_PARSED_MSG_BATCH_QUANTITY = Counter.build() - .name("th2_mq_incoming_parsed_msg_batch_quantity") - .labelNames(DEFAULT_SESSION_ALIAS_LABEL_NAME, DEFAULT_DIRECTION_LABEL_NAME) - .help("Quantity of incoming parsed message batches") - .register(); - private static final Counter INCOMING_PARSED_MSG_QUANTITY = Counter.build() - .name("th2_mq_incoming_parsed_msg_quantity") - .labelNames(DEFAULT_SESSION_ALIAS_LABEL_NAME, DEFAULT_DIRECTION_LABEL_NAME) - .help("Quantity of incoming parsed messages") - .register(); - private static final Histogram PARSED_MSG_PROCESSING_TIME = Histogram.build() - .buckets(DEFAULT_BUCKETS) - .name("th2_mq_parsed_msg_processing_time") - .help("Time of processing parsed messages") - .register(); - - @Override - protected Counter getDeliveryCounter() { - return INCOMING_PARSED_MSG_BATCH_QUANTITY; - } - - @Override - protected Counter getContentCounter() { - return INCOMING_PARSED_MSG_QUANTITY; - } - - @Override - protected Histogram getProcessingTimer() { - return PARSED_MSG_PROCESSING_TIME; - } - - @Override - protected String[] extractLabels(MessageBatch batch) { - MessageID messageID = getMessages(batch).get(0).getMetadata().getId(); - return getSessionAliasAndDirection(messageID); - } - - @Override - protected int extractCountFrom(MessageBatch batch) { - return batch.getMessagesCount(); - } - - public RabbitParsedBatchSubscriber(List filters, int messageRecursionLimit) { - super(filters, messageRecursionLimit); - } - - @Override - protected List valueFromBytes(byte[] body) throws Exception { - var groupBatch = parseEncodedBatch(body); - var messageGroups = groupBatch.getGroupsList(); - var parsedBatches = new ArrayList(messageGroups.size()); - - for (var group : messageGroups) { - var builder = MessageBatch.newBuilder(); - - for (AnyMessage message : group.getMessagesList()) { - if (message.getKindCase() != KindCase.MESSAGE) { - throw new IllegalStateException("Message group batch contains raw messages: " + MessageUtils.toJson(groupBatch)); - } - - builder.addMessages(message.getMessage()); - } - - parsedBatches.add(builder.build()); - } - - return parsedBatches; - } - - @Override - protected List getMessages(MessageBatch batch) { - return batch.getMessagesList(); - } - - @Override - protected MessageBatch createBatch(List messages) { - return MessageBatch.newBuilder().addAllMessages(messages).build(); - } - - @Override - protected String toShortTraceString(MessageBatch value) { - return MessageUtils.toJson(value); - } - - @Override - protected String toShortDebugString(MessageBatch value) { - return getDebugString(getClass().getSimpleName(), - value.getMessagesList().stream().map(message -> message.getMetadata().getId()).collect(Collectors.toList())); - } - - @Override - protected Metadata extractMetadata(Message message) { - var metadata = message.getMetadata(); - var messageID = metadata.getId(); - return new Metadata( - messageID.getSequence(), - metadata.getMessageType(), - messageID.getDirection(), - messageID.getConnectionId().getSessionAlias() - ); - } - -} diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchQueue.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchQueue.java deleted file mode 100644 index c68b3b2d1..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchQueue.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq.raw; - -import com.exactpro.th2.common.grpc.RawMessageBatch; -import com.exactpro.th2.common.schema.message.FilterFunction; -import com.exactpro.th2.common.schema.message.MessageSender; -import com.exactpro.th2.common.schema.message.MessageSubscriber; -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitQueue; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.SubscribeTarget; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; -import org.jetbrains.annotations.NotNull; - -public class RabbitRawBatchQueue extends AbstractRabbitQueue { - - @Override - protected MessageSender createSender(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration) { - var result = new RabbitRawBatchSender(); - result.init(connectionManager, queueConfiguration.getExchange(), queueConfiguration.getRoutingKey()); - return result; - } - - @Override - protected MessageSubscriber createSubscriber(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunction) { - var result = new RabbitRawBatchSubscriber(queueConfiguration.getFilters(), connectionManager.getConfiguration().getMessageRecursionLimit()); - result.init(connectionManager, - new SubscribeTarget(queueConfiguration.getQueue(), queueConfiguration.getRoutingKey(), queueConfiguration.getExchange()), - filterFunction); - return result; - } -} diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchRouter.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchRouter.java index d135f57c2..75a77f812 100644 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchRouter.java +++ b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchRouter.java @@ -15,68 +15,58 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.raw; -import com.exactpro.th2.common.grpc.RawMessage; -import com.exactpro.th2.common.grpc.RawMessageBatch; -import com.exactpro.th2.common.grpc.RawMessageBatch.Builder; -import com.exactpro.th2.common.schema.filter.strategy.FilterStrategy; -import com.exactpro.th2.common.schema.filter.strategy.impl.Th2RawMsgFilterStrategy; -import com.exactpro.th2.common.schema.message.FilterFunction; -import com.exactpro.th2.common.schema.message.MessageQueue; -import com.exactpro.th2.common.schema.message.QueueAttribute; -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.router.AbstractRabbitBatchMessageRouter; -import com.google.protobuf.Message; +import java.util.Set; + import org.apache.commons.collections4.SetUtils; import org.jetbrains.annotations.NotNull; -import java.util.List; -import java.util.Set; - -public class RabbitRawBatchRouter extends AbstractRabbitBatchMessageRouter { +import com.exactpro.th2.common.grpc.AnyMessage; +import com.exactpro.th2.common.grpc.MessageGroup; +import com.exactpro.th2.common.grpc.MessageGroupBatch; +import com.exactpro.th2.common.grpc.RawMessageBatch; +import com.exactpro.th2.common.message.MessageUtils; +import com.exactpro.th2.common.schema.message.QueueAttribute; +import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractGroupBatchAdapterRouter; +public class RabbitRawBatchRouter extends AbstractGroupBatchAdapterRouter { private static final Set REQUIRED_SUBSCRIBE_ATTRIBUTES = SetUtils.unmodifiableSet(QueueAttribute.RAW.toString(), QueueAttribute.SUBSCRIBE.toString()); private static final Set REQUIRED_SEND_ATTRIBUTES = SetUtils.unmodifiableSet(QueueAttribute.RAW.toString(), QueueAttribute.PUBLISH.toString()); + @NotNull @Override - protected @NotNull FilterStrategy getDefaultFilterStrategy() { - return new Th2RawMsgFilterStrategy(); - } - - @Override - protected MessageQueue createQueue(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunction) { - RabbitRawBatchQueue queue = new RabbitRawBatchQueue(); - queue.init(connectionManager, queueConfiguration, filterFunction); - return queue; - } - - @Override - protected Set requiredSubscribeAttributes() { - return REQUIRED_SUBSCRIBE_ATTRIBUTES; - } - - @Override - protected Set requiredSendAttributes() { + public Set getRequiredSendAttributes() { return REQUIRED_SEND_ATTRIBUTES; } + @NotNull @Override - protected List getMessages(RawMessageBatch batch) { - return batch.getMessagesList(); - } - - @Override - protected Builder createBatchBuilder() { - return RawMessageBatch.newBuilder(); + public Set getRequiredSubscribeAttributes() { + return REQUIRED_SUBSCRIBE_ATTRIBUTES; } @Override - protected void addMessage(Builder builder, RawMessage message) { - builder.addMessages(message); + protected @NotNull MessageGroupBatch buildGroupBatch(RawMessageBatch rawMessageBatch) { + var messageGroupBatchBuilder = MessageGroupBatch.newBuilder(); + rawMessageBatch.getMessagesList().forEach(rawMessage -> + messageGroupBatchBuilder.addGroups( + MessageGroup.newBuilder().addMessages(AnyMessage.newBuilder().setRawMessage(rawMessage).build()) + ) + ); + return messageGroupBatchBuilder.build(); } @Override - protected RawMessageBatch build(Builder builder) { + protected RawMessageBatch buildFromGroupBatch(@NotNull MessageGroupBatch groupBatch) { + var builder = RawMessageBatch.newBuilder(); + groupBatch.getGroupsList().stream() + .flatMap(messageGroup -> messageGroup.getMessagesList().stream()) + .peek(anyMessage -> { + if (!anyMessage.hasRawMessage()) { + throw new IllegalStateException("Message group batch contains not raw message: " + MessageUtils.toJson(groupBatch)); + } + }) + .map(AnyMessage::getRawMessage) + .forEach(builder::addMessages); return builder.build(); } } diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchSender.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchSender.java deleted file mode 100644 index a6ece8f68..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchSender.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq.raw; - -import static com.exactpro.th2.common.message.MessageUtils.getDebugString; - -import java.util.stream.Collectors; - -import com.exactpro.th2.common.grpc.AnyMessage; -import com.exactpro.th2.common.grpc.MessageGroup; -import com.exactpro.th2.common.grpc.MessageGroupBatch; -import com.exactpro.th2.common.grpc.RawMessageBatch; -import com.exactpro.th2.common.message.MessageUtils; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender; - -import io.prometheus.client.Counter; - -public class RabbitRawBatchSender extends AbstractRabbitSender { - - private static final Counter OUTGOING_RAW_MSG_BATCH_QUANTITY = Counter.build("th2_mq_outgoing_raw_msg_batch_quantity", "Quantity of outgoing raw message batches").register(); - private static final Counter OUTGOING_RAW_MSG_QUANTITY = Counter.build("th2_mq_outgoing_raw_msg_quantity", "Quantity of outgoing raw messages").register(); - - @Override - protected Counter getDeliveryCounter() { - return OUTGOING_RAW_MSG_BATCH_QUANTITY; - } - - @Override - protected Counter getContentCounter() { - return OUTGOING_RAW_MSG_QUANTITY; - } - - @Override - protected int extractCountFrom(RawMessageBatch batch) { - return batch.getMessagesCount(); - } - - @Override - protected byte[] valueToBytes(RawMessageBatch value) { - var batchBuilder = MessageGroupBatch.newBuilder(); - - for (var rawMessage : value.getMessagesList()) { - var anyMessage = AnyMessage.newBuilder().setRawMessage(rawMessage).build(); - var group = MessageGroup.newBuilder().addMessages(anyMessage).build(); - batchBuilder.addGroups(group); - } - - return batchBuilder.build().toByteArray(); - } - - @Override - protected String toShortTraceString(RawMessageBatch value) { - return MessageUtils.toJson(value); - } - - @Override - protected String toShortDebugString(RawMessageBatch value) { - return getDebugString(getClass().getSimpleName(), - value.getMessagesList().stream().map(message -> message.getMetadata().getId()).collect(Collectors.toList())); - } -} diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchSubscriber.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchSubscriber.java deleted file mode 100644 index 94c27c7ba..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/raw/RabbitRawBatchSubscriber.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq.raw; - -import static com.exactpro.th2.common.message.MessageUtils.getDebugString; -import static com.exactpro.th2.common.message.MessageUtils.getSessionAliasAndDirection; -import static com.exactpro.th2.common.metrics.CommonMetrics.DEFAULT_BUCKETS; -import static com.exactpro.th2.common.metrics.CommonMetrics.DEFAULT_DIRECTION_LABEL_NAME; -import static com.exactpro.th2.common.metrics.CommonMetrics.DEFAULT_SESSION_ALIAS_LABEL_NAME; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -import com.exactpro.th2.common.grpc.AnyMessage; -import com.exactpro.th2.common.grpc.AnyMessage.KindCase; -import com.exactpro.th2.common.grpc.MessageID; -import com.exactpro.th2.common.grpc.RawMessage; -import com.exactpro.th2.common.grpc.RawMessageBatch; -import com.exactpro.th2.common.message.MessageUtils; -import com.exactpro.th2.common.schema.message.configuration.RouterFilter; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitBatchSubscriber; - -import io.prometheus.client.Counter; -import io.prometheus.client.Histogram; - -public class RabbitRawBatchSubscriber extends AbstractRabbitBatchSubscriber { - - private static final Counter INCOMING_RAW_MSG_BATCH_QUANTITY = Counter.build() - .name("th2_mq_incoming_raw_msg_batch_quantity") - .labelNames(DEFAULT_SESSION_ALIAS_LABEL_NAME, DEFAULT_DIRECTION_LABEL_NAME) - .help("Quantity of incoming raw message batches") - .register(); - private static final Counter INCOMING_RAW_MSG_QUANTITY = Counter.build() - .name("th2_mq_incoming_raw_msg_quantity") - .labelNames(DEFAULT_SESSION_ALIAS_LABEL_NAME, DEFAULT_DIRECTION_LABEL_NAME) - .help("Quantity of incoming raw messages") - .register(); - private static final Histogram RAW_MSG_PROCESSING_TIME = Histogram.build() - .buckets(DEFAULT_BUCKETS) - .name("th2_mq_raw_msg_processing_time") - .help("Time of processing raw messages") - .register(); - - private static final String MESSAGE_TYPE = "raw"; - - public RabbitRawBatchSubscriber(List filters, int messageRecursionLimit) { - super(filters, messageRecursionLimit); - } - - @Override - protected Counter getDeliveryCounter() { - return INCOMING_RAW_MSG_BATCH_QUANTITY; - } - - @Override - protected Counter getContentCounter() { - return INCOMING_RAW_MSG_QUANTITY; - } - - @Override - protected Histogram getProcessingTimer() { - return RAW_MSG_PROCESSING_TIME; - } - - @Override - protected String[] extractLabels(RawMessageBatch batch) { - MessageID messageID = getMessages(batch).get(0).getMetadata().getId(); - return getSessionAliasAndDirection(messageID); - } - - @Override - protected int extractCountFrom(RawMessageBatch batch) { - return batch.getMessagesCount(); - } - - @Override - protected List valueFromBytes(byte[] body) throws Exception { - var groupBatch = parseEncodedBatch(body); - var messageGroups = groupBatch.getGroupsList(); - var rawBatches = new ArrayList(messageGroups.size()); - - for (var group : messageGroups) { - var builder = RawMessageBatch.newBuilder(); - - for (AnyMessage message : group.getMessagesList()) { - if (message.getKindCase() != KindCase.RAW_MESSAGE) { - throw new IllegalStateException("Message group batch contains parsed messages: " + MessageUtils.toJson(groupBatch)); - } - - builder.addMessages(message.getRawMessage()); - } - - rawBatches.add(builder.build()); - } - - return rawBatches; - } - - @Override - protected List getMessages(RawMessageBatch batch) { - return batch.getMessagesList(); - } - - @Override - protected RawMessageBatch createBatch(List messages) { - return RawMessageBatch.newBuilder().addAllMessages(messages).build(); - } - - @Override - protected String toShortTraceString(RawMessageBatch value) { - return MessageUtils.toJson(value); - } - - @Override - protected String toShortDebugString(RawMessageBatch value) { - return getDebugString(getClass().getSimpleName(), - value.getMessagesList().stream().map(message -> message.getMetadata().getId()).collect(Collectors.toList())); - } - - @Override - protected Metadata extractMetadata(RawMessage message) { - var metadata = message.getMetadata(); - var messageID = metadata.getId(); - return new Metadata( - messageID.getSequence(), - MESSAGE_TYPE, - messageID.getDirection(), - messageID.getConnectionId().getSessionAlias() - ); - } - -} diff --git a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/router/AbstractRabbitBatchMessageRouter.java b/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/router/AbstractRabbitBatchMessageRouter.java deleted file mode 100644 index 9ee782914..000000000 --- a/src/main/java/com/exactpro/th2/common/schema/message/impl/rabbitmq/router/AbstractRabbitBatchMessageRouter.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq.router; - -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitMessageRouter; -import com.google.protobuf.Message; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.stream.Collectors; - -public abstract class AbstractRabbitBatchMessageRouter extends AbstractRabbitMessageRouter { - - @Override - protected Map findQueueByFilter(Map queues, MB batch) { - - Map result = new HashMap<>(); - - for (var message : getMessages(batch)) { - - for (var queueAlias : filter(queues, message)) { - - result.putIfAbsent(queueAlias, createBatchBuilder()); - - addMessage(result.get(queueAlias), message); - - } - - } - - return result.entrySet().stream() - .collect(Collectors.toMap(Entry::getKey, value -> build(value.getValue()))); - } - - protected Set filter(Map queues, Message message) { - - var aliases = new HashSet(); - - for (var queueEntry : queues.entrySet()) { - - var queueAlias = queueEntry.getKey(); - var filters = queueEntry.getValue().getFilters(); - - if (filters.isEmpty() || filterMessage(message, filters)) { - aliases.add(queueAlias); - } - - } - - return aliases; - } - - - protected abstract List getMessages(MB batch); - - protected abstract MBB createBatchBuilder(); - - protected abstract void addMessage(MBB builder, M message); - - protected abstract MB build(MBB builder); - -} diff --git a/src/main/kotlin/com/exactpro/th2/common/message/MessageUtils.kt b/src/main/kotlin/com/exactpro/th2/common/message/MessageUtils.kt index 94d0b416d..5328995ac 100644 --- a/src/main/kotlin/com/exactpro/th2/common/message/MessageUtils.kt +++ b/src/main/kotlin/com/exactpro/th2/common/message/MessageUtils.kt @@ -67,6 +67,7 @@ import com.exactpro.th2.common.value.updateOrAddMessage import com.exactpro.th2.common.value.updateOrAddString import com.exactpro.th2.common.value.updateString import com.google.protobuf.Duration +import com.google.protobuf.TextFormat.shortDebugString import com.google.protobuf.Timestamp import com.google.protobuf.util.JsonFormat import java.math.BigDecimal @@ -332,14 +333,19 @@ var Message.Builder.sequence fun getSessionAliasAndDirection(messageID: MessageID): Array = arrayOf(messageID.connectionId.sessionAlias, messageID.direction.name) -private val unknownLabels = arrayOf("unknown", "unknown") - fun getSessionAliasAndDirection(anyMessage: AnyMessage): Array = when { anyMessage.hasMessage() -> getSessionAliasAndDirection(anyMessage.message.metadata.id) anyMessage.hasRawMessage() -> getSessionAliasAndDirection(anyMessage.rawMessage.metadata.id) - else -> unknownLabels + else -> error("Message ${shortDebugString(anyMessage)} doesn't have message or rawMessage") } +val AnyMessage.sequence: Long + get() = when { + hasMessage() -> message.metadata.id.sequence + hasRawMessage() -> rawMessage.metadata.id.sequence + else -> error("Message ${shortDebugString(this)} doesn't have message or rawMessage") + } + fun getDebugString(className: String, ids: List): String { val sessionAliasAndDirection = getSessionAliasAndDirection(ids[0]) val sequences = ids.joinToString { it.sequence.toString() } diff --git a/src/main/kotlin/com/exactpro/th2/common/metrics/CommonMetrics.kt b/src/main/kotlin/com/exactpro/th2/common/metrics/CommonMetrics.kt index e3bebc8d5..8125e5587 100644 --- a/src/main/kotlin/com/exactpro/th2/common/metrics/CommonMetrics.kt +++ b/src/main/kotlin/com/exactpro/th2/common/metrics/CommonMetrics.kt @@ -42,8 +42,14 @@ import java.util.concurrent.atomic.AtomicInteger @JvmField val DEFAULT_BUCKETS = doubleArrayOf(0.000_25, 0.000_5, 0.001, 0.005, 0.010, 0.015, 0.025, 0.050, 0.100, 0.250, 0.500, 1.0) -const val DEFAULT_SESSION_ALIAS_LABEL_NAME : String = "session_alias" -const val DEFAULT_DIRECTION_LABEL_NAME : String = "direction" +const val TH2_PIN_LABEL = "th2_pin" +const val TH2_TYPE_LABEL = "th2_type" +const val EXCHANGE_LABEL = "exchange" +const val QUEUE_LABEL = "queue" +const val ROUTING_KEY_LABEL = "routing_key" +const val SESSION_ALIAS_LABEL = "session_alias" +const val DIRECTION_LABEL = "direction" +const val MESSAGE_TYPE_LABEL = "message_type" private val LIVENESS_ARBITER = AggregatingMetric(listOf(PrometheusMetric("th2_liveness", "Service liveness"), FileMetric( "healthy"))) private val READINESS_ARBITER = AggregatingMetric(listOf(PrometheusMetric("th2_readiness", "Service readiness"), FileMetric( "ready"))) diff --git a/src/main/kotlin/com/exactpro/th2/common/metrics/MetricsUtils.kt b/src/main/kotlin/com/exactpro/th2/common/metrics/MetricsUtils.kt new file mode 100644 index 000000000..ead708ff0 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/common/metrics/MetricsUtils.kt @@ -0,0 +1,107 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.common.metrics + +import com.exactpro.th2.common.grpc.AnyMessage +import com.exactpro.th2.common.grpc.MessageGroupBatch +import com.exactpro.th2.common.message.getSessionAliasAndDirection +import com.exactpro.th2.common.message.sequence +import io.prometheus.client.Counter +import io.prometheus.client.Gauge + +fun incrementTotalMetrics( + batch: MessageGroupBatch, + th2Pin: String, + messageCounter: Counter, + groupCounter: Counter, + gauge: Gauge +) { + val groupsBySessionAliasAndDirection = mutableMapOf() + batch.groupsList.forEach { group -> + if (group.messagesList.isNotEmpty()) { + val aliasAndDirectionArray = getSessionAliasAndDirection(group.messagesList[0]) + incrementMetricByMessages( + group.messagesList, + th2Pin, + messageCounter, + aliasAndDirectionArray[0], + aliasAndDirectionArray[1] + ) + + val aliasAndDirection = SessionAliasAndDirection(aliasAndDirectionArray[0], aliasAndDirectionArray[1]) + groupsBySessionAliasAndDirection + .computeIfAbsent(aliasAndDirection) { InternalCounter() } + .inc() + + gauge + .labels(th2Pin, *aliasAndDirection.labels) + .set(group.messagesList[0].sequence.toDouble()) + } + } + groupsBySessionAliasAndDirection.forEach { + groupCounter + .labels(th2Pin, *it.key.labels) + .inc(it.value.number.toDouble()) + } +} + +fun incrementDroppedMetrics( + messages: List, + th2Pin: String, + messageCounter: Counter, + groupCounter: Counter +) { + if (messages.isNotEmpty()) { + val aliasAndDirectionArray = getSessionAliasAndDirection(messages[0]) + incrementMetricByMessages( + messages, + th2Pin, + messageCounter, + aliasAndDirectionArray[0], + aliasAndDirectionArray[1] + ) + groupCounter + .labels(th2Pin, aliasAndDirectionArray[0], aliasAndDirectionArray[1]) + .inc() + } +} + +private fun incrementMetricByMessages( + messages: List, + th2Pin: String, + counter: Counter, + sessionAlias: String, + direction: String +) { + messages + .groupingBy { it.kindCase.name } + .eachCount() + .forEach { + counter + .labels(th2Pin, sessionAlias, direction, it.key) + .inc(it.value.toDouble()) + } +} + +private data class SessionAliasAndDirection(val sessionAlias: String, val direction: String) { + val labels = arrayOf(sessionAlias, direction) +} + +private class InternalCounter { + var number: Int = 0 + fun inc() = number++ +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/filter/strategy/impl/AnyMessageFilterStrategy.kt b/src/main/kotlin/com/exactpro/th2/common/schema/filter/strategy/impl/AnyMessageFilterStrategy.kt index 0ce4aea09..d39f229d1 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/filter/strategy/impl/AnyMessageFilterStrategy.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/filter/strategy/impl/AnyMessageFilterStrategy.kt @@ -33,7 +33,7 @@ class AnyMessageFilterStrategy : AbstractFilterStrategy() { val metadata = message.message.metadata result[AbstractTh2MsgFilterStrategy.SESSION_ALIAS_KEY] = metadata.id.connectionId.sessionAlias - result[AbstractTh2MsgFilterStrategy.MESSAGE_TYPE_KEY] = message.message.descriptorForType.name + result[AbstractTh2MsgFilterStrategy.MESSAGE_TYPE_KEY] = metadata.messageType result[AbstractTh2MsgFilterStrategy.DIRECTION_KEY] = metadata.id.direction.name } message.hasRawMessage() -> { diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/filter/strategy/impl/Th2RawMsgFilterStrategy.kt b/src/main/kotlin/com/exactpro/th2/common/schema/filter/strategy/impl/Th2RawMsgFilterStrategy.kt deleted file mode 100644 index ae33c1e91..000000000 --- a/src/main/kotlin/com/exactpro/th2/common/schema/filter/strategy/impl/Th2RawMsgFilterStrategy.kt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.exactpro.th2.common.schema.filter.strategy.impl - -import com.exactpro.th2.common.grpc.RawMessage -import com.exactpro.th2.common.message.toJson -import com.google.protobuf.Message - -class Th2RawMsgFilterStrategy : AbstractFilterStrategy() { - override fun getFields(message: Message): MutableMap { - check(message is RawMessage) { "Message is not an ${RawMessage::class.qualifiedName}: ${message.toJson()}" } - val metadata = message.metadata - return mutableMapOf( - AbstractTh2MsgFilterStrategy.SESSION_ALIAS_KEY to metadata.id.connectionId.sessionAlias, - AbstractTh2MsgFilterStrategy.DIRECTION_KEY to metadata.id.direction.name - ) - } -} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterUtils.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterUtils.kt index 0bbef8a84..ea514a411 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterUtils.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/MessageRouterUtils.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,4 +59,16 @@ fun MessageRouter.storeEvent( } @Deprecated(message = "Please use MessageUtils.toJson", replaceWith = ReplaceWith("toJson(true)", imports = ["com.exactpro.th2.common.message.toJson"]), level = DeprecationLevel.WARNING) -fun MessageOrBuilder.toJson() : String = toJson(true) \ No newline at end of file +fun MessageOrBuilder.toJson() : String = toJson(true) + +fun appendAttributes( + vararg attributes: String, + requiredAttributes: () -> Set +): Set { + if (attributes.isEmpty()) { + return requiredAttributes() + } + return mutableSetOf(*attributes).apply { + addAll(requiredAttributes()) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractGroupBatchAdapterRouter.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractGroupBatchAdapterRouter.kt new file mode 100644 index 000000000..f2a98ddcd --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractGroupBatchAdapterRouter.kt @@ -0,0 +1,77 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.common.schema.message.impl.rabbitmq + +import com.exactpro.th2.common.grpc.MessageGroupBatch +import com.exactpro.th2.common.schema.message.MessageListener +import com.exactpro.th2.common.schema.message.MessageRouter +import com.exactpro.th2.common.schema.message.MessageRouterContext +import com.exactpro.th2.common.schema.message.SubscriberMonitor +import com.exactpro.th2.common.schema.message.appendAttributes + +abstract class AbstractGroupBatchAdapterRouter : MessageRouter { + private lateinit var groupBatchRouter: MessageRouter + + abstract fun getRequiredSendAttributes(): Set + + abstract fun getRequiredSubscribeAttributes(): Set + + protected abstract fun buildGroupBatch(batch: T): MessageGroupBatch + + protected abstract fun buildFromGroupBatch(groupBatch: MessageGroupBatch): T + + override fun init(context: MessageRouterContext) {} + + override fun init(context: MessageRouterContext, groupBatchRouter: MessageRouter) { + this.groupBatchRouter = groupBatchRouter + } + + override fun subscribe(callback: MessageListener, vararg attributes: String): SubscriberMonitor? { + return groupBatchRouter.subscribe( + MessageListener { consumerTag: String, message: MessageGroupBatch -> + callback.handler(consumerTag, buildFromGroupBatch(message)) + }, + *appendAttributes(*attributes) { getRequiredSubscribeAttributes() }.toTypedArray() + ) + } + + override fun subscribeAll(callback: MessageListener, vararg attributes: String): SubscriberMonitor? { + return groupBatchRouter.subscribeAll( + MessageListener { consumerTag: String, message: MessageGroupBatch -> + callback.handler(consumerTag, buildFromGroupBatch(message)) + }, + *appendAttributes(*attributes) { getRequiredSubscribeAttributes() }.toTypedArray() + ) + } + + override fun send(messageBatch: T, vararg attributes: String) { + groupBatchRouter.send( + buildGroupBatch(messageBatch), + *appendAttributes(*attributes) { getRequiredSendAttributes() }.toTypedArray() + ) + } + + override fun sendAll(messageBatch: T, vararg attributes: String) { + groupBatchRouter.sendAll( + buildGroupBatch(messageBatch), + *appendAttributes(*attributes) { getRequiredSendAttributes() }.toTypedArray() + ) + } + + override fun close() { + groupBatchRouter.close() + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouter.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouter.kt new file mode 100644 index 000000000..a4184564f --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/AbstractRabbitRouter.kt @@ -0,0 +1,242 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.common.schema.message.impl.rabbitmq + +import com.exactpro.th2.common.schema.exception.RouterException +import com.exactpro.th2.common.schema.filter.strategy.FilterStrategy +import com.exactpro.th2.common.schema.message.* +import com.exactpro.th2.common.schema.message.QueueAttribute.PUBLISH +import com.exactpro.th2.common.schema.message.QueueAttribute.SUBSCRIBE +import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfiguration +import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration +import com.exactpro.th2.common.schema.message.configuration.RouterFilter +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.google.protobuf.Message +import mu.KotlinLogging +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicReference + +typealias PinName = String +typealias PinConfiguration = QueueConfiguration +typealias Queue = String +typealias RoutingKey = String + +abstract class AbstractRabbitRouter : MessageRouter { + private val _context = AtomicReference() + protected var context: MessageRouterContext + get() = checkNotNull(_context.get()) { "Router didn't initialized yet" } + private set(context) = check(_context.compareAndSet(null, context)) { + "Router is already initialized" + } + + private val configuration: MessageRouterConfiguration + get() = context.configuration + + protected val connectionManager: ConnectionManager + get() = context.connectionManager + + private val subscribers = ConcurrentHashMap>() + private val senders = ConcurrentHashMap>() + + private val filterStrategy = AtomicReference>(getDefaultFilterStrategy()) + + protected open fun getDefaultFilterStrategy(): FilterStrategy { + return FilterStrategy.DEFAULT_FILTER_STRATEGY + } + + protected open fun filterMessage(msg: Message, filters: List): Boolean { + return filterStrategy.get().verify(msg, filters) + } + + override fun init(context: MessageRouterContext) { + this.context = context + } + + override fun send(message: T, vararg attributes: String) { + val pintAttributes: Set = appendAttributes(*attributes) { getRequiredSendAttributes() } + send(message, pintAttributes) { + check(size == 1) { + "Found incorrect number of pins ${map(Triple::first)} to the send operation by attributes $pintAttributes and filters, expected 1, actual $size" + } + } + } + + override fun sendAll(message: T, vararg attributes: String) { + val pintAttributes: Set = appendAttributes(*attributes) { getRequiredSendAttributes() } + send(message, pintAttributes) { + check(isNotEmpty()) { + "Found incorrect number of pins ${map(Triple::first)} to send all operation by attributes $pintAttributes and filters, expected 1 or more, actual $size" + } + } + } + + override fun subscribe(callback: MessageListener, vararg attributes: String): SubscriberMonitor { + val pintAttributes: Set = appendAttributes(*attributes) { getRequiredSubscribeAttributes() } + return subscribe(pintAttributes, callback) { + check(size == 1) { + "Found incorrect number of pins ${map(Pair::first)} to subscribe operation by attributes $pintAttributes and filters, expected 1, actual $size" + } + } + } + + override fun subscribeAll(callback: MessageListener, vararg attributes: String): SubscriberMonitor? { + val pintAttributes: Set = appendAttributes(*attributes) { getRequiredSubscribeAttributes() } + return subscribe(pintAttributes, callback) { + check(isNotEmpty()) { + "Found incorrect number of pins ${map(Pair::first)} to subscribe all operation by attributes $pintAttributes and filters, expected 1 or more, actual $size" + } + } + } + + override fun close() { + LOGGER.info("Closing message router") + + val exceptions = mutableListOf() + subscribers.values.forEach { subscriber -> + runCatching(subscriber::close) + .onFailure { exceptions.add(it) } + } + subscribers.clear() + + checkOrThrow("Can not close message router", exceptions) + LOGGER.info("Message router has been successfully closed") + } + + /** + * Prepares the message to send via the specified pin. + * An implementation can rebuild the passed message according to pin configuration and returns a new instance of a message. + * @param message a source message which can be reduced according to pin configuration + * @return the source message, part of them, or null if the message is matched to the pin configuration: fully, partially, not match accordingly + */ + protected abstract fun splitAndFilter(message: T, pinConfiguration: PinConfiguration, pinName: PinName): T + + /** + * Returns default set of attributes for send operations + */ + protected open fun getRequiredSendAttributes() = REQUIRED_SEND_ATTRIBUTES + + /** + * Returns default set of attributes for subscribe operations + */ + protected open fun getRequiredSubscribeAttributes() = REQUIRED_SUBSCRIBE_ATTRIBUTES + + //TODO: implement common sender + protected abstract fun createSender(pinConfig: PinConfiguration, pinName: PinName): MessageSender + + //TODO: implement common subscriber + protected abstract fun createSubscriber(pinConfig: PinConfiguration, pinName: PinName): MessageSubscriber + + protected abstract fun T.toErrorString(): String + + private fun send( + message: T, pintAttributes: Set, + check: List>.() -> Unit + ) { + val packages: List> = configuration.queues.asSequence() + .filter { it.value.attributes.containsAll(pintAttributes) } + .map { (pinName, pinConfig) -> + Triple(pinName, pinConfig, splitAndFilter(message, pinConfig, pinName)) + } + .toList() + .also(check) + + val exceptions: MutableMap = mutableMapOf() + packages.forEach { (pinName: PinName, pinConfig: PinConfiguration, message: T) -> + try { + senders.getSender(pinName, pinConfig) + .send(message) + } catch (e: Exception) { + LOGGER.error(e) { "Message ${message.toErrorString()} can't be send through the $pinName pin" } + exceptions[pinName] = e + } + } + checkOrThrow("Can't send to pin(s): ${exceptions.keys}", exceptions.values) + } + + private fun subscribe( + pintAttributes: Set, + messageListener: MessageListener, + check: List>.() -> Unit + ): SubscriberMonitor { + val packages: List> = configuration.queues.asSequence() + .filter { it.value.attributes.containsAll(pintAttributes) } + .map { (pinName, pinConfig) -> Pair(pinName, pinConfig) } + .toList() + .also(check) + + //TODO: catch exceptions during subscriptions and roll back + val exceptions: MutableMap = mutableMapOf() + val monitors: MutableList = mutableListOf() + packages.forEach { (pinName: PinName, pinConfig: PinConfiguration) -> + runCatching { + subscribers.getSubscriber(pinName, pinConfig).apply { + addListener(messageListener) + start() //TODO: replace to lazy start on add listener(s) + } + }.onFailure { e -> + LOGGER.error(e) { "Listener can't be subscribed via the $pinName pin" } + exceptions[pinName] = e + }.onSuccess { + monitors.add(SubscriberMonitor { close() }) + } + } + + checkOrThrow("Can't subscribe to pin(s): ${exceptions.keys}", exceptions.values) + + return when (monitors.size) { + 1 -> monitors[0] + else -> SubscriberMonitor { + monitors.forEach(SubscriberMonitor::unsubscribe) + } + } + } + + private fun checkOrThrow(message: String, exceptions: Collection) { + if (exceptions.isNotEmpty()) { + throw RouterException(message).apply { + exceptions.forEach(this::addSuppressed) + } + } + } + + private fun ConcurrentHashMap>.getSender( + pinName: PinName, + pinConfig: PinConfiguration + ): MessageSender = computeIfAbsent(pinConfig.routingKey) { + check(pinConfig.isWritable) { + "The $pinName isn't writable, configuration: $pinConfig" + } + + return@computeIfAbsent createSender(pinConfig, pinName) + } + + private fun ConcurrentHashMap>.getSubscriber( + pinName: PinName, + pinConfig: PinConfiguration + ): MessageSubscriber = computeIfAbsent(pinConfig.queue) { + check(pinConfig.isReadable) { + "The $pinName isn't readable, configuration: $pinConfig" + } + + return@computeIfAbsent createSubscriber(pinConfig, pinName) + } + + companion object { + private val LOGGER = KotlinLogging.logger {} + + private val REQUIRED_SEND_ATTRIBUTES = setOf(PUBLISH.toString()) + private val REQUIRED_SUBSCRIBE_ATTRIBUTES = setOf(SUBSCRIBE.toString()) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/SubscribeTarget.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/SubscribeTarget.kt index aae280528..442ff3b14 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/SubscribeTarget.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/configuration/SubscribeTarget.kt @@ -15,4 +15,5 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration +@Deprecated(message = "For deprecated MessageSubscriber.init() methods, please use constructor with param queue: String") data class SubscribeTarget(val queue: String, val routingKey: String, val exchange: String) diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/MetricsHolder.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/MetricsHolder.kt deleted file mode 100644 index 51f176d7e..000000000 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/MetricsHolder.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq.custom - -import com.exactpro.th2.common.metrics.DEFAULT_BUCKETS -import com.google.common.base.CaseFormat -import io.prometheus.client.Counter -import io.prometheus.client.Histogram - -class MetricsHolder( - customTag: String, - labels: Array -) { - val incomingDeliveryCounter: Counter - val incomingDataCounter: Counter - val processingTimer: Histogram - val outgoingDeliveryCounter: Counter - val outgoingDataCounter: Counter - - init { - val tag = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, customTag.decapitalize()) - - incomingDeliveryCounter = Counter.build() - .name("th2_mq_incoming_${tag}_delivery_quantity") - .labelNames(*labels) - .help("The received Rabbit MQ delivery quantity") - .register() - incomingDataCounter = Counter.build() - .name("th2_mq_incoming_${tag}_data_quantity") - .labelNames(*labels) - .help("The received data quantity") - .register() - outgoingDeliveryCounter = Counter.build() - .name("th2_mq_outgoing_${tag}_delivery_quantity") - .help("The number of sent Rabbit MQ messages") - .register() - outgoingDataCounter = Counter.build() - .name("th2_mq_outgoing_${tag}_data_quantity") - .help("The number of sent messages") - .register() - processingTimer = Histogram.build() - .buckets(*DEFAULT_BUCKETS) - .name("th2_mq_${tag}_processing_time") - .help("Time spent to process a single delivery") - .register() - } -} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomQueue.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomQueue.kt deleted file mode 100644 index 1350de3ec..000000000 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomQueue.kt +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq.custom - -import com.exactpro.th2.common.schema.message.FilterFunction -import com.exactpro.th2.common.schema.message.MessageSender -import com.exactpro.th2.common.schema.message.MessageSubscriber -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitQueue -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.SubscribeTarget -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager -import io.prometheus.client.Counter -import io.prometheus.client.Histogram - -class RabbitCustomQueue( - private val converter: MessageConverter, - private val metricsHolder: MetricsHolder -) : AbstractRabbitQueue() { - - override fun createSender( - connectionManager: ConnectionManager, - queueConfiguration: QueueConfiguration - ): MessageSender { - return Sender( - converter, - metricsHolder.outgoingDeliveryCounter, - metricsHolder.outgoingDataCounter - ).apply { - init(connectionManager, queueConfiguration.exchange, queueConfiguration.routingKey) - } - } - - override fun createSubscriber( - connectionManager: ConnectionManager, - queueConfiguration: QueueConfiguration, - filterFunction: FilterFunction - ): MessageSubscriber { - return Subscriber( - converter, - metricsHolder.incomingDeliveryCounter, - metricsHolder.processingTimer, - metricsHolder.incomingDataCounter - ).apply { - init( - connectionManager, - SubscribeTarget(queueConfiguration.queue, queueConfiguration.routingKey, queueConfiguration.exchange), - filterFunction - ) - } - } - - private class Sender( - private val converter: MessageConverter, - private val deliveryCounter: Counter, - private val dataCounter: Counter - ) : AbstractRabbitSender() { - override fun valueToBytes(value: T): ByteArray = converter.toByteArray(value) - - override fun toShortTraceString(value: T): String = converter.toTraceString(value) - override fun toShortDebugString(value: T): String = converter.toDebugString(value) - - //region Prometheus stats - override fun getDeliveryCounter(): Counter = deliveryCounter - - override fun getContentCounter(): Counter = dataCounter - - override fun extractCountFrom(batch: T): Int = converter.extractCount(batch) - //endregion - } - - private class Subscriber( - private val converter: MessageConverter, - private val deliveryCounter: Counter, - private val timer: Histogram, - private val dataCounter: Counter - ) : AbstractRabbitSubscriber() { - override fun valueFromBytes(body: ByteArray): List = listOf(converter.fromByteArray(body)) - - override fun toShortTraceString(value: T): String = converter.toTraceString(value) - override fun toShortDebugString(value: T): String = converter.toDebugString(value) - - // FIXME: the filtering is not working for custom objects - override fun filter(value: T): T = value - - //region Prometheus stats - override fun getDeliveryCounter(): Counter = deliveryCounter - - override fun getContentCounter(): Counter = dataCounter - - override fun getProcessingTimer(): Histogram = timer - - override fun extractCountFrom(batch: T): Int = converter.extractCount(batch) - - override fun extractLabels(batch: T): Array = converter.getLabels(batch) - - //endregion - } -} diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt index 552b276c9..5c6296239 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/custom/RabbitCustomRouter.kt @@ -16,40 +16,104 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.custom import com.exactpro.th2.common.schema.message.FilterFunction -import com.exactpro.th2.common.schema.message.MessageQueue +import com.exactpro.th2.common.schema.message.MessageSender +import com.exactpro.th2.common.schema.message.MessageSubscriber import com.exactpro.th2.common.schema.message.QueueAttribute -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration -import com.exactpro.th2.common.schema.message.configuration.RouterFilter -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitMessageRouter +import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitRouter +import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender +import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber +import com.exactpro.th2.common.schema.message.impl.rabbitmq.PinConfiguration +import com.exactpro.th2.common.schema.message.impl.rabbitmq.PinName import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager -import com.google.protobuf.Message +/** + * NOTE: labels are used for back compatibility and may be deleted later + */ class RabbitCustomRouter( - customTag: String, + private val customTag: String, labels: Array, private val converter: MessageConverter, defaultSendAttributes: Set = emptySet(), defaultSubscribeAttributes: Set = emptySet() -) : AbstractRabbitMessageRouter() { - private val requiredSubscribeAttributes: Set = hashSetOf(QueueAttribute.SUBSCRIBE.toString()) + defaultSubscribeAttributes - private val requiredSendAttributes: Set = hashSetOf(QueueAttribute.PUBLISH.toString()) + defaultSendAttributes - private val metricsHolder = MetricsHolder(customTag, labels) - - override fun createQueue(connectionManager: ConnectionManager, queueConfiguration: QueueConfiguration, filterFunction: FilterFunction): MessageQueue { - return RabbitCustomQueue(converter, metricsHolder).apply { - init(connectionManager, queueConfiguration, filterFunction) +) : AbstractRabbitRouter() { + private val requiredSendAttributes: MutableSet = + mutableSetOf(QueueAttribute.PUBLISH.toString()).apply { + addAll(defaultSendAttributes) + } + + private val requiredSubscribeAttributes: MutableSet = + mutableSetOf(QueueAttribute.SUBSCRIBE.toString()).apply { + addAll(defaultSubscribeAttributes) } + + override fun getRequiredSendAttributes(): Set { + return requiredSendAttributes + } + + override fun getRequiredSubscribeAttributes(): Set { + return requiredSubscribeAttributes + } + + override fun splitAndFilter(message: T, pinConfiguration: PinConfiguration, pinName: PinName): T { + return message + } + + override fun createSender(pinConfig: PinConfiguration, pinName: PinName): MessageSender { + return Sender( + connectionManager, + pinConfig.exchange, + pinConfig.routingKey, + pinName, + customTag, + converter + ) } - // FIXME: the filtering is not working for custom objects - override fun filterMessage(msg: Message?, filters: MutableList?): Boolean = true + override fun createSubscriber(pinConfig: PinConfiguration, pinName: PinName): MessageSubscriber { + return Subscriber( + connectionManager, + pinConfig.queue, + FilterFunction.DEFAULT_FILTER_FUNCTION, + pinName, + customTag, + converter + ) + } - // FIXME: the filtering is not working for custom objects - override fun findQueueByFilter(queues: Map, msg: T): Map { - return queues.keys.associateWithTo(HashMap()) { msg } + override fun T.toErrorString(): String { + return this.toString() } - override fun requiredSubscribeAttributes(): Set = requiredSubscribeAttributes + private class Sender( + connectionManager: ConnectionManager, + exchangeName: String, + routingKey: String, + th2Pin: String, + customTag: String, + private val converter: MessageConverter + ) : AbstractRabbitSender(connectionManager, exchangeName, routingKey, th2Pin, customTag) { + override fun valueToBytes(value: T): ByteArray = converter.toByteArray(value) + + override fun toShortTraceString(value: T): String = converter.toTraceString(value) + + override fun toShortDebugString(value: T): String = converter.toDebugString(value) + } - override fun requiredSendAttributes(): Set = requiredSendAttributes + private class Subscriber( + connectionManager: ConnectionManager, + queue: String, + filterFunction: FilterFunction, + th2Pin: String, + customTag: String, + private val converter: MessageConverter + ) : AbstractRabbitSubscriber(connectionManager, queue, filterFunction, th2Pin, customTag) { + override fun valueFromBytes(body: ByteArray): T = converter.fromByteArray(body) + + override fun toShortTraceString(value: T): String = converter.toTraceString(value) + + override fun toShortDebugString(value: T): String = converter.toDebugString(value) + + // FIXME: the filtering is not working for custom objects + override fun filter(value: T): T = value + } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchQueue.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchQueue.kt deleted file mode 100644 index 66d7bab39..000000000 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchQueue.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.exactpro.th2.common.schema.message.impl.rabbitmq.group - -import com.exactpro.th2.common.grpc.MessageGroupBatch -import com.exactpro.th2.common.schema.message.FilterFunction -import com.exactpro.th2.common.schema.message.MessageSender -import com.exactpro.th2.common.schema.message.MessageSubscriber -import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitQueue -import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.SubscribeTarget -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager - -class RabbitMessageGroupBatchQueue : AbstractRabbitQueue() { - override fun createSender(connectionManager: ConnectionManager, queueConfiguration: QueueConfiguration): MessageSender { - return RabbitMessageGroupBatchSender().apply { - init(connectionManager, queueConfiguration.exchange, queueConfiguration.routingKey) - } - } - - override fun createSubscriber(connectionManager: ConnectionManager, queueConfiguration: QueueConfiguration, filterFunction: FilterFunction): MessageSubscriber { - return RabbitMessageGroupBatchSubscriber(queueConfiguration.filters, connectionManager.configuration.messageRecursionLimit).apply { - init(connectionManager, SubscribeTarget(queueConfiguration.queue, queueConfiguration.routingKey, queueConfiguration.exchange), filterFunction) - } - } -} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt index 821413d12..10c88969d 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchRouter.kt @@ -15,89 +15,96 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.group -import com.exactpro.th2.common.grpc.MessageGroup import com.exactpro.th2.common.grpc.MessageGroupBatch -import com.exactpro.th2.common.grpc.MessageGroupBatch.Builder -import com.exactpro.th2.common.message.toJson +import com.exactpro.th2.common.metrics.DIRECTION_LABEL +import com.exactpro.th2.common.metrics.SESSION_ALIAS_LABEL +import com.exactpro.th2.common.metrics.TH2_PIN_LABEL +import com.exactpro.th2.common.metrics.MESSAGE_TYPE_LABEL +import com.exactpro.th2.common.metrics.incrementDroppedMetrics import com.exactpro.th2.common.schema.filter.strategy.impl.AbstractFilterStrategy import com.exactpro.th2.common.schema.filter.strategy.impl.AnyMessageFilterStrategy import com.exactpro.th2.common.schema.message.FilterFunction -import com.exactpro.th2.common.schema.message.MessageQueue -import com.exactpro.th2.common.schema.message.QueueAttribute.PUBLISH -import com.exactpro.th2.common.schema.message.QueueAttribute.SUBSCRIBE +import com.exactpro.th2.common.schema.message.MessageSender +import com.exactpro.th2.common.schema.message.MessageSubscriber import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration -import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager -import com.exactpro.th2.common.schema.message.impl.rabbitmq.router.AbstractRabbitBatchMessageRouter +import com.exactpro.th2.common.schema.message.configuration.RouterFilter +import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitRouter +import com.exactpro.th2.common.schema.message.impl.rabbitmq.PinName import com.google.protobuf.Message +import com.google.protobuf.TextFormat +import io.prometheus.client.Counter +import org.jetbrains.annotations.NotNull -class RabbitMessageGroupBatchRouter : AbstractRabbitBatchMessageRouter() { - +class RabbitMessageGroupBatchRouter : AbstractRabbitRouter() { override fun getDefaultFilterStrategy(): AbstractFilterStrategy { - return AnyMessageFilterStrategy(); + return AnyMessageFilterStrategy() } - override fun createQueue(connectionManager: ConnectionManager, queueConfiguration: QueueConfiguration, filterFunction: FilterFunction): MessageQueue { - return RabbitMessageGroupBatchQueue().apply { - init(connectionManager, queueConfiguration, filterFunction) + override fun splitAndFilter( + message: MessageGroupBatch, + pinConfiguration: @NotNull QueueConfiguration, + pinName: PinName + ): @NotNull MessageGroupBatch { + if (pinConfiguration.filters.isEmpty()) { + return message } - } - - override fun findQueueByFilter(queues: MutableMap, batch: MessageGroupBatch): MutableMap { - val builders = hashMapOf() - getMessages(batch).forEach { group -> - val originalMessages = group.messagesList - val parsedRawPartition = originalMessages.partition { it.hasMessage() } - - if (parsedRawPartition.first.size == originalMessages.size || parsedRawPartition.second.size == originalMessages.size) { - val forFilter = if (parsedRawPartition.first.isEmpty()) parsedRawPartition.second else parsedRawPartition.first - val groups = hashMapOf() - forFilter.forEach { - filter(queues, it).forEach { alias -> - groups.getOrPut(alias) {MessageGroup.newBuilder()}.addMessages(it) - } - } - - groups.forEach { (alias, newGroup) -> - builders.getOrPut(alias, ::createBatchBuilder).addGroups(newGroup) - } + val builder = MessageGroupBatch.newBuilder() + message.groupsList.forEach { group -> + if (group.messagesList.all { filterMessage(it, pinConfiguration.filters) }) { + builder.addGroups(group) } else { - val skippedAliases = hashSetOf() - originalMessages.groupBy { filter(queues, it) }.forEach { (aliases, messages) -> - if (aliases.isNotEmpty() && messages.size == group.messagesCount) { - aliases.forEach { alias -> - builders.getOrPut(alias, ::createBatchBuilder).addGroups(group) - } - } else { - skippedAliases.addAll(aliases) - } - } - - if (skippedAliases.isNotEmpty()) { - monitor.onWarn("Group was skipped for aliases '{}' '{}'", skippedAliases, group.toJson()) - } + incrementDroppedMetrics( + group.messagesList, + pinName, + MESSAGE_DROPPED_PUBLISH_TOTAL, + MESSAGE_GROUP_DROPPED_PUBLISH_TOTAL + ) } } - - return builders.mapValuesTo(hashMapOf()) { it.value.build() } + return builder.build() } - override fun requiredSubscribeAttributes(): Set = REQUIRED_SUBSCRIBE_ATTRIBUTES - - override fun requiredSendAttributes(): Set = REQUIRED_SEND_ATTRIBUTES - - override fun getMessages(batch: MessageGroupBatch): MutableList = batch.groupsList - - override fun createBatchBuilder(): Builder = MessageGroupBatch.newBuilder() + override fun createSender(pinConfig: QueueConfiguration, pinName: PinName): MessageSender { + return RabbitMessageGroupBatchSender( + connectionManager, + pinConfig.exchange, + pinConfig.routingKey, + pinName + ) + } - override fun addMessage(builder: Builder, group: MessageGroup) { - builder.addGroups(group) + override fun createSubscriber( + pinConfig: QueueConfiguration, + pinName: PinName + ): MessageSubscriber { + return RabbitMessageGroupBatchSubscriber( + connectionManager, + pinConfig.queue, + FilterFunction { msg: Message, filters: List -> filterMessage(msg, filters) }, + pinName, + pinConfig.filters, + connectionManager.configuration.messageRecursionLimit + ) } - override fun build(builder: Builder): MessageGroupBatch = builder.build() + override fun MessageGroupBatch.toErrorString(): String { + return TextFormat.shortDebugString(this) + } companion object { - private val REQUIRED_SUBSCRIBE_ATTRIBUTES = setOf(SUBSCRIBE.toString()) - private val REQUIRED_SEND_ATTRIBUTES = setOf(PUBLISH.toString()) + const val MESSAGE_GROUP_TYPE = "MESSAGE_GROUP" + + private val MESSAGE_DROPPED_PUBLISH_TOTAL: Counter = Counter.build() + .name("th2_message_dropped_publish_total") + .labelNames(TH2_PIN_LABEL, SESSION_ALIAS_LABEL, DIRECTION_LABEL, MESSAGE_TYPE_LABEL) + .help("Quantity of published raw or parsed messages dropped after filters") + .register() + + private val MESSAGE_GROUP_DROPPED_PUBLISH_TOTAL: Counter = Counter.build() + .name("th2_message_group_dropped_publish_total") + .labelNames(TH2_PIN_LABEL, SESSION_ALIAS_LABEL, DIRECTION_LABEL) + .help("Quantity of published message groups dropped after filters") + .register() } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt index 7cc809c44..99d841b6f 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSender.kt @@ -19,15 +19,38 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.group import com.exactpro.th2.common.grpc.MessageGroupBatch import com.exactpro.th2.common.message.getSessionAliasAndDirection import com.exactpro.th2.common.message.toJson +import com.exactpro.th2.common.metrics.DIRECTION_LABEL +import com.exactpro.th2.common.metrics.SESSION_ALIAS_LABEL +import com.exactpro.th2.common.metrics.TH2_PIN_LABEL +import com.exactpro.th2.common.metrics.MESSAGE_TYPE_LABEL +import com.exactpro.th2.common.metrics.incrementTotalMetrics import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.group.RabbitMessageGroupBatchRouter.Companion.MESSAGE_GROUP_TYPE import io.prometheus.client.Counter +import io.prometheus.client.Gauge + +class RabbitMessageGroupBatchSender( + connectionManager: ConnectionManager, + exchangeName: String, + routingKey: String, + th2Pin: String +) : AbstractRabbitSender(connectionManager, exchangeName, routingKey, th2Pin, MESSAGE_GROUP_TYPE) { + override fun send(value: MessageGroupBatch) { + incrementTotalMetrics( + value, + th2Pin, + MESSAGE_PUBLISH_TOTAL, + MESSAGE_GROUP_PUBLISH_TOTAL, + MESSAGE_GROUP_SEQUENCE_PUBLISH + ) + super.send(value) + } -class RabbitMessageGroupBatchSender : AbstractRabbitSender() { - override fun getDeliveryCounter(): Counter = OUTGOING_MSG_GROUP_BATCH_QUANTITY - override fun getContentCounter(): Counter = OUTGOING_MSG_GROUP_QUANTITY - override fun extractCountFrom(batch: MessageGroupBatch): Int = batch.groupsCount override fun valueToBytes(value: MessageGroupBatch): ByteArray = value.toByteArray() + override fun toShortTraceString(value: MessageGroupBatch): String = value.toJson() + override fun toShortDebugString(value: MessageGroupBatch): String = "MessageGroupBatch: " + run { val sessionAliasAndDirection = getSessionAliasAndDirection(value.groupsList[0].messagesList[0]) @@ -40,9 +63,24 @@ class RabbitMessageGroupBatchSender : AbstractRabbitSender() else -> "" } } - + companion object { - private val OUTGOING_MSG_GROUP_BATCH_QUANTITY = Counter.build("th2_mq_outgoing_msg_group_batch_quantity", "Quantity of outgoing message group batches").register() - private val OUTGOING_MSG_GROUP_QUANTITY = Counter.build("th2_mq_outgoing_msg_group_quantity", "Quantity of outgoing message groups").register() + private val MESSAGE_PUBLISH_TOTAL = Counter.build() + .name("th2_message_publish_total") + .labelNames(TH2_PIN_LABEL, SESSION_ALIAS_LABEL, DIRECTION_LABEL, MESSAGE_TYPE_LABEL) + .help("Quantity of published raw or parsed messages") + .register() + + private val MESSAGE_GROUP_PUBLISH_TOTAL = Counter.build() + .name("th2_message_group_publish_total") + .labelNames(TH2_PIN_LABEL, SESSION_ALIAS_LABEL, DIRECTION_LABEL) + .help("Quantity of published message groups") + .register() + + private val MESSAGE_GROUP_SEQUENCE_PUBLISH = Gauge.build() + .name("th2_message_group_sequence_publish") + .labelNames(TH2_PIN_LABEL, SESSION_ALIAS_LABEL, DIRECTION_LABEL) + .help("Last published sequence") + .register() } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt index 8e82fdfa7..6df55b05c 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/group/RabbitMessageGroupBatchSubscriber.kt @@ -15,40 +15,40 @@ package com.exactpro.th2.common.schema.message.impl.rabbitmq.group -import com.exactpro.th2.common.grpc.MessageGroup import com.exactpro.th2.common.grpc.MessageGroupBatch import com.exactpro.th2.common.message.getSessionAliasAndDirection import com.exactpro.th2.common.message.toJson -import com.exactpro.th2.common.metrics.DEFAULT_BUCKETS -import com.exactpro.th2.common.metrics.DEFAULT_DIRECTION_LABEL_NAME -import com.exactpro.th2.common.metrics.DEFAULT_SESSION_ALIAS_LABEL_NAME +import com.exactpro.th2.common.metrics.DIRECTION_LABEL +import com.exactpro.th2.common.metrics.SESSION_ALIAS_LABEL +import com.exactpro.th2.common.metrics.TH2_PIN_LABEL +import com.exactpro.th2.common.metrics.MESSAGE_TYPE_LABEL +import com.exactpro.th2.common.metrics.incrementTotalMetrics +import com.exactpro.th2.common.metrics.incrementDroppedMetrics +import com.exactpro.th2.common.schema.message.FilterFunction import com.exactpro.th2.common.schema.message.configuration.RouterFilter -import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitBatchSubscriber +import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber import com.google.protobuf.CodedInputStream +import com.rabbitmq.client.Delivery +import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager +import com.exactpro.th2.common.schema.message.impl.rabbitmq.group.RabbitMessageGroupBatchRouter.Companion.MESSAGE_GROUP_TYPE import io.prometheus.client.Counter -import io.prometheus.client.Histogram +import io.prometheus.client.Gauge import mu.KotlinLogging class RabbitMessageGroupBatchSubscriber( + connectionManager: ConnectionManager, + queue: String, + filterFunction: FilterFunction, + th2Pin: String, private val filters: List, - messageRecursionLimit: Int -) : AbstractRabbitBatchSubscriber(filters, messageRecursionLimit) { + private val messageRecursionLimit: Int +) : AbstractRabbitSubscriber(connectionManager, queue, filterFunction, th2Pin, MESSAGE_GROUP_TYPE) { private val logger = KotlinLogging.logger {} - override fun getDeliveryCounter(): Counter = INCOMING_MSG_GROUP_BATCH_QUANTITY - override fun getContentCounter(): Counter = INCOMING_MSG_GROUP_QUANTITY - override fun getProcessingTimer(): Histogram = MSG_GROUP_PROCESSING_TIME + override fun valueFromBytes(body: ByteArray): MessageGroupBatch = parseEncodedBatch(body) - override fun extractLabels(batch: MessageGroupBatch): Array { - val message = getMessages(batch)[0].messagesList[0] - return getSessionAliasAndDirection(message) - } - - override fun extractCountFrom(batch: MessageGroupBatch): Int = batch.groupsCount - override fun valueFromBytes(body: ByteArray): List = listOf(parseEncodedBatch(body)) - override fun getMessages(batch: MessageGroupBatch): MutableList = batch.groupsList - override fun createBatch(messages: List): MessageGroupBatch = MessageGroupBatch.newBuilder().addAllGroups(messages).build() override fun toShortTraceString(value: MessageGroupBatch): String = value.toJson() + override fun toShortDebugString(value: MessageGroupBatch): String = "MessageGroupBatch: " + run { val sessionAliasAndDirection = getSessionAliasAndDirection(value.groupsList[0].messagesList[0]) @@ -62,39 +62,80 @@ class RabbitMessageGroupBatchSubscriber( } } - override fun extractMetadata(messageGroup: MessageGroup): Metadata = throw UnsupportedOperationException() - override fun filter(batch: MessageGroupBatch): MessageGroupBatch? { if (filters.isEmpty()) { return batch } - val groups = getMessages(batch).asSequence() + val groups = batch.groupsList.asSequence() .filter { group -> group.messagesList.all { message -> callFilterFunction(message, filters) }.also { allMessagesMatch -> if (!allMessagesMatch) { logger.debug { "Skipped message group because none or some of its messages didn't match any filters: ${group.toJson()}" } + incrementDroppedMetrics( + group.messagesList, + th2Pin, + MESSAGE_DROPPED_SUBSCRIBE_TOTAL, + MESSAGE_GROUP_DROPPED_SUBSCRIBE_TOTAL + ) } } } .toList() - return if (groups.isEmpty()) null else createBatch(groups) + return if (groups.isEmpty()) null else MessageGroupBatch.newBuilder().addAllGroups(groups).build() + } + + override fun handle(consumeTag: String, delivery: Delivery, value: MessageGroupBatch) { + incrementTotalMetrics( + value, + th2Pin, + MESSAGE_SUBSCRIBE_TOTAL, + MESSAGE_GROUP_SUBSCRIBE_TOTAL, + MESSAGE_GROUP_SEQUENCE_SUBSCRIBE + ) + super.handle(consumeTag, delivery, value) + } + + private fun parseEncodedBatch(body: ByteArray?): MessageGroupBatch { + val ins = CodedInputStream.newInstance(body) + ins.setRecursionLimit(messageRecursionLimit) + return MessageGroupBatch.parseFrom(ins) } companion object { - private val INCOMING_MSG_GROUP_BATCH_QUANTITY = Counter.build() - .name("th2_mq_incoming_msg_group_batch_quantity") - .labelNames(DEFAULT_SESSION_ALIAS_LABEL_NAME, DEFAULT_DIRECTION_LABEL_NAME) - .help("Quantity of incoming message group batches") + private val MESSAGE_SUBSCRIBE_TOTAL = Counter.build() + .name("th2_message_subscribe_total") + .labelNames(TH2_PIN_LABEL, SESSION_ALIAS_LABEL, DIRECTION_LABEL, MESSAGE_TYPE_LABEL) + .help("Quantity of received raw or parsed messages, includes dropped after filters. " + + "For information about the number of dropped messages, please refer to 'th2_message_dropped_subscribe_total'") .register() - private val INCOMING_MSG_GROUP_QUANTITY = Counter.build() - .name("th2_mq_incoming_msg_group_quantity") - .labelNames(DEFAULT_SESSION_ALIAS_LABEL_NAME, DEFAULT_DIRECTION_LABEL_NAME) - .help("Quantity of incoming message groups") + + private val MESSAGE_GROUP_SUBSCRIBE_TOTAL = Counter.build() + .name("th2_message_group_subscribe_total") + .labelNames(TH2_PIN_LABEL, SESSION_ALIAS_LABEL, DIRECTION_LABEL) + .help("Quantity of received message groups, includes dropped after filters. " + + "For information about the number of dropped messages, please refer to 'th2_message_group_dropped_subscribe_total'") + .register() + + private val MESSAGE_DROPPED_SUBSCRIBE_TOTAL = Counter.build() + .name("th2_message_dropped_subscribe_total") + .labelNames(TH2_PIN_LABEL, SESSION_ALIAS_LABEL, DIRECTION_LABEL, MESSAGE_TYPE_LABEL) + .help("Quantity of received raw or parsed messages dropped after filters") + .register() + + private val MESSAGE_GROUP_DROPPED_SUBSCRIBE_TOTAL = Counter.build() + .name("th2_message_group_dropped_subscribe_total") + .labelNames(TH2_PIN_LABEL, SESSION_ALIAS_LABEL, DIRECTION_LABEL) + .help("Quantity of received message groups dropped after filters") + .register() + + private val MESSAGE_GROUP_SEQUENCE_SUBSCRIBE = Gauge.build() + .name("th2_message_group_sequence_subscribe") + .labelNames(TH2_PIN_LABEL, SESSION_ALIAS_LABEL, DIRECTION_LABEL) + .help("Last received sequence") .register() - private val MSG_GROUP_PROCESSING_TIME = Histogram.build("th2_mq_msg_group_processing_time", "Time of processing message groups").buckets(*DEFAULT_BUCKETS).register() } } \ No newline at end of file diff --git a/src/main/resources/META-INF/services/com.exactpro.th2.common.schema.message.MessageQueue b/src/main/resources/META-INF/services/com.exactpro.th2.common.schema.message.MessageQueue deleted file mode 100644 index 82e2db067..000000000 --- a/src/main/resources/META-INF/services/com.exactpro.th2.common.schema.message.MessageQueue +++ /dev/null @@ -1,2 +0,0 @@ -com.exactpro.th2.common.schema.message.impl.rabbitmq.parsed.RabbitParsedBatchQueue -com.exactpro.th2.common.schema.message.impl.rabbitmq.raw.RabbitRawBatchQueue \ No newline at end of file diff --git a/src/test/java/com/exactpro/th2/common/event/bean/TestVerification.java b/src/test/java/com/exactpro/th2/common/event/bean/TestVerification.java index f4294d572..fff144af9 100644 --- a/src/test/java/com/exactpro/th2/common/event/bean/TestVerification.java +++ b/src/test/java/com/exactpro/th2/common/event/bean/TestVerification.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,6 +73,7 @@ public void testSerializationRecursiveVerification() throws IOException { verificationEntry.setKey(false); verificationEntry.setActual("1"); verificationEntry.setExpected("1"); + verificationEntry.setHint("Hint for collection"); VerificationEntry verificationEntryInner = new VerificationEntry(); verificationEntryInner.setType("field"); @@ -81,6 +82,7 @@ public void testSerializationRecursiveVerification() throws IOException { verificationEntryInner.setKey(false); verificationEntryInner.setActual("9"); verificationEntryInner.setExpected("9"); + verificationEntryInner.setHint("Hint for inner entry"); verificationEntry.setFields(new HashMap<>() {{ put("Field C", verificationEntryInner); @@ -103,6 +105,7 @@ public void testSerializationRecursiveVerification() throws IOException { " \"key\": false,\n" + " \"actual\": \"1\",\n" + " \"expected\": \"1\",\n" + + " \"hint\": \"Hint for collection\",\n" + " \"fields\": {\n" + " \"Field C\": {\n" + " \"type\": \"field\",\n" + @@ -110,7 +113,8 @@ public void testSerializationRecursiveVerification() throws IOException { " \"status\": \"FAILED\",\n" + " \"key\": false,\n" + " \"actual\": \"9\",\n" + - " \"expected\": \"9\"\n" + + " \"expected\": \"9\",\n" + + " \"hint\": \"Hint for inner entry\"\n" + " }\n" + " }\n" + " }\n" + diff --git a/src/test/java/com/exactpro/th2/common/schema/message/MessageQueueTest.java b/src/test/java/com/exactpro/th2/common/schema/message/MessageQueueTest.java deleted file mode 100644 index 35d85f29c..000000000 --- a/src/test/java/com/exactpro/th2/common/schema/message/MessageQueueTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.exactpro.th2.common.schema.message; - -import java.util.ServiceLoader; -import java.util.ServiceLoader.Provider; -import java.util.Set; -import java.util.stream.Collectors; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import com.exactpro.th2.common.schema.message.impl.rabbitmq.parsed.RabbitParsedBatchQueue; -import com.exactpro.th2.common.schema.message.impl.rabbitmq.raw.RabbitRawBatchQueue; - -//FIXME: it maybe redundant -public class MessageQueueTest { - - @Test - public void testClassLoading() { - Set> expected = Set.of(RabbitParsedBatchQueue.class, RabbitRawBatchQueue.class); - Set> actual = ServiceLoader.load(MessageQueue.class).stream() - .map(Provider::type) - .collect(Collectors.toUnmodifiableSet()); - - Assertions.assertEquals(expected, actual); - } -} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/filter/strategy/impl/TestAnyMessageFilterStrategy.kt b/src/test/kotlin/com/exactpro/th2/common/schema/filter/strategy/impl/TestAnyMessageFilterStrategy.kt new file mode 100644 index 000000000..be2cb2a9a --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/common/schema/filter/strategy/impl/TestAnyMessageFilterStrategy.kt @@ -0,0 +1,133 @@ +/* + * Copyright 2021 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.common.schema.filter.strategy.impl + +import com.exactpro.th2.common.grpc.AnyMessage +import com.exactpro.th2.common.grpc.Direction +import com.exactpro.th2.common.grpc.RawMessage +import com.exactpro.th2.common.message.message +import com.exactpro.th2.common.message.toJson +import com.exactpro.th2.common.schema.message.configuration.FieldFilterConfiguration +import com.exactpro.th2.common.schema.message.configuration.FieldFilterOperation +import com.exactpro.th2.common.schema.message.configuration.MqRouterFilterConfiguration +import org.apache.commons.collections4.MultiMapUtils +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.Arguments.arguments +import org.junit.jupiter.params.provider.MethodSource +import org.junit.jupiter.params.provider.ValueSource + +class TestAnyMessageFilterStrategy { + private val strategy = AnyMessageFilterStrategy() + + @ParameterizedTest + @MethodSource("parsedMessages") + fun `matches the parsed message by message type with single filter`(anyMessage: AnyMessage, expectMatch: Boolean) { + val match = strategy.verify( + anyMessage, + MqRouterFilterConfiguration( + metadata = MultiMapUtils.newListValuedHashMap().apply { + put("message_type", FieldFilterConfiguration( + fieldName = "message_type", + operation = FieldFilterOperation.EQUAL, + expectedValue = "test" + )) + } + )) + + assertEquals(expectMatch, match) { "The message ${anyMessage.toJson()} was${if (expectMatch) "" else " not"} matched" } + } + + @ParameterizedTest + @MethodSource("messages") + fun `matches the parsed message by direction with single filter`(message: AnyMessage, expectMatch: Boolean) { + val match = strategy.verify( + message, + MqRouterFilterConfiguration( + metadata = MultiMapUtils.newListValuedHashMap().apply { + put("direction", FieldFilterConfiguration( + fieldName = "direction", + operation = FieldFilterOperation.EQUAL, + expectedValue = "FIRST" + )) + } + )) + + assertEquals(expectMatch, match) { "The message ${message.toJson()} was${if (expectMatch) "" else " not"} matched" } + } + + @ParameterizedTest + @MethodSource("messages") + fun `matches the parsed message by alias with single filter`(message: AnyMessage, expectMatch: Boolean) { + val match = strategy.verify( + message, + MqRouterFilterConfiguration( + metadata = MultiMapUtils.newListValuedHashMap().apply { + put("session_alias", FieldFilterConfiguration( + fieldName = "session_alias", + operation = FieldFilterOperation.EQUAL, + expectedValue = "test-alias" + )) + } + )) + + assertEquals(expectMatch, match) { "The message ${message.toJson()} was${if (expectMatch) "" else " not"} matched" } + } + + companion object { + private val PARSED_MESSAGE_MATCH = AnyMessage.newBuilder().setMessage( + message("test", Direction.FIRST, "test-alias") + ).build() + + private val RAW_MESSAGE_MATCH = AnyMessage.newBuilder().setRawMessage( + RawMessage.newBuilder().apply { + metadataBuilder.idBuilder.apply { + connectionIdBuilder.sessionAlias = "test-alias" + direction = Direction.FIRST + } + } + ).build() + + private val PARSED_MESSAGE_MISS_MATCH = AnyMessage.newBuilder().setMessage( + message("test1", Direction.SECOND, "test-alias1") + ).build() + + private val RAW_MESSAGE_MISS_MATCH = AnyMessage.newBuilder().setRawMessage( + RawMessage.newBuilder().apply { + metadataBuilder.idBuilder.apply { + connectionIdBuilder.sessionAlias = "test-alias1" + direction = Direction.SECOND + } + } + ).build() + + @JvmStatic + fun parsedMessages(): List = listOf( + arguments(PARSED_MESSAGE_MATCH, true), + arguments(PARSED_MESSAGE_MISS_MATCH, false) + ) + + @JvmStatic + fun messages(): List = listOf( + arguments(RAW_MESSAGE_MATCH, true), + arguments(RAW_MESSAGE_MISS_MATCH, false) + ) + parsedMessages() + } +} \ No newline at end of file