Skip to content

Commit

Permalink
Merge branch 'master' into th2-2484
Browse files Browse the repository at this point in the history
  • Loading branch information
viktor-235 authored Nov 16, 2021
2 parents c9f8792 + 54c3e93 commit 146f6b4
Show file tree
Hide file tree
Showing 47 changed files with 1,290 additions and 2,012 deletions.
66 changes: 54 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2 common library (Java) (3.27.0)
# th2 common library (Java) (3.30.0)

## Usage

Expand Down Expand Up @@ -53,7 +53,7 @@ Then you will create an instance of imported class, by choosing one of the follo
var factory = CommonFactory.createFromKubernetes(namespace, boxName);
```
It also can be called by using `createFromArguments(args)` with arguments `--namespace` and `--boxName`.
1. Create factory with a namespace in Kubernetes, the name of the target th2 box from Kubernetes and the name of context to choose the context from Kube config:
1. Create factory with a namespace in Kubernetes, the name of the target th2 box from Kubernetes and the name of context to choose from Kube config:
```
var factory = CommonFactory.createFromKubernetes(namespace, boxName, contextName);
```
Expand Down Expand Up @@ -120,12 +120,12 @@ The `CommonFactory` reads a message's router configuration from the `mq.json` fi

Filters format:
* fieldName - a field's name
* expectedValue - expected field's value (used not for all operations)
* expectedValue - expected field's value (not used for all operations)
* operation - operation's type
* `EQUAL` - filter is pass if the field equals exact value
* `NOT_EQUAL` - filter is pass if the field doesn't equal exact value
* `EMPTY` - filter is pass if the field is empty
* `NOT_EMPTY` - filter is pass if the field isn't empty
* `EQUAL` - the filter passes if the field is equal to the exact value
* `NOT_EQUAL` - the filter passes if the field does not equal the exact value
* `EMPTY` - the filter passes if the field is empty
* `NOT_EMPTY` - the filter passes if the field is not empty
* `WILDCARD` - filters the field by wildcard expression

```json
Expand Down Expand Up @@ -195,7 +195,7 @@ The `CommonFactory` reads a Cradle configuration from the cradle.json file.

1. Also note that `generated_configs` directory will be created to store `.json` files with configs from Kubernetes. Those files are overridden when `CommonFactory.createFromKubernetes(namespace, boxName)` and `CommonFactory.createFromKubernetes(namespace, boxName, contextName)` are invoked again.

1. User needs to have authentication with service account token that has necessary access to read CRs and secrets from the specified namespace.
1. User needs to have authentication with service account token that has the necessary access to read CRs and secrets from the specified namespace.

After that you can receive various Routers through factory properties:
```
Expand All @@ -214,7 +214,7 @@ With the router created, you can subscribe to pins (by specifying the callback f
```
router.subscribe(callback) # subscribe to only one pin
router.subscribeAll(callback) # subscribe to one or several pins
router.send(message) # send to only one pim
router.send(message) # send to only one pin
router.sendAll(message) # send to one or several pins
```
You can perform these actions by providing pin attributes in addition to the default ones.
Expand Down Expand Up @@ -248,12 +248,54 @@ NOTES:

* in order for the metrics to be exported, you also will need to create an instance of CommonFactory
* common JVM metrics will also be exported alongside common service metrics
* some metric labels are enumerations (`th2_type`: `MESSAGE_GROUP`, `EVENT`, `<customTag>`;`message_type`: `RAW_MESSAGE`, `MESSAGE`)

ABSTRACT METRICS:
* th2_rabbitmq_message_size_publish_bytes (`th2_pin`, `th2_type`, `exchange`, `routing_key`): number of published message bytes to RabbitMQ. The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content
* th2_rabbitmq_message_publish_total (`th2_pin`, `th2_type`, `exchange`, `routing_key`): quantity of published messages to RabbitMQ. The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content
* th2_rabbitmq_message_size_subscribe_bytes (`th2_pin`, `th2_type`, `queue`): number of bytes received from RabbitMQ, it includes bytes of messages dropped after filters. For information about the number of dropped messages, please refer to 'th2_message_dropped_subscribe_total' and 'th2_message_group_dropped_subscribe_total'. The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content
* th2_rabbitmq_message_process_duration_seconds (`th2_pin`, `th2_type`, `queue`): time of message processing during subscription from RabbitMQ in seconds. The message is meant for any data transferred via RabbitMQ, for example, th2 batch message or event or custom content

MESSAGES METRICS:
* th2_message_publish_total (`th2_pin`, `session_alias`, `direction`, `message_type`): quantity of published raw or parsed messages
* th2_message_subscribe_total (`th2_pin`, `session_alias`, `direction`, `message_type`): quantity of received raw or parsed messages, includes dropped after filters. For information about the number of dropped messages, please refer to 'th2_message_dropped_subscribe_total'
* th2_message_dropped_publish_total (`th2_pin`, `session_alias`, `direction`, `message_type`): quantity of published raw or parsed messages dropped after filters
* th2_message_dropped_subscribe_total (`th2_pin`, `session_alias`, `direction`, `message_type`): quantity of received raw or parsed messages dropped after filters
* th2_message_group_publish_total (`th2_pin`, `session_alias`, `direction`): quantity of published message groups
* th2_message_group_subscribe_total (`th2_pin`, `session_alias`, `direction`): quantity of received message groups, includes dropped after filters. For information about the number of dropped messages, please refer to 'th2_message_group_dropped_subscribe_total'
* th2_message_group_dropped_publish_total (`th2_pin`, `session_alias`, `direction`): quantity of published message groups dropped after filters
* th2_message_group_dropped_subscribe_total (`th2_pin`, `session_alias`, `direction`): quantity of received message groups dropped after filters
* th2_message_group_sequence_publish (`th2_pin`, `session_alias`, `direction`): last published sequence
* th2_message_group_sequence_subscribe (`th2_pin`, `session_alias`, `direction`): last received sequence

EVENTS METRICS:
* th2_event_publish_total (`th2_pin`): quantity of published events
* th2_event_subscribe_total (`th2_pin`): quantity of received events

## Release notes

### 3.27.0
### 3.30.0

+ Updated `messageRecursionLimit` default value from `100` to `500`

### 3.29.1

+ Fix problem with filtering by `message_type` in MessageGroupBatch router

### 3.29.0

+ Update Cradle version from `2.13.0` to `2.20.0`

### 3.28.0

+ Added new parameter `hint` for `VerificationEntry`

### 3.27.0

+ Added new abstract router `AbstractRabbitRouter`, removed `MessageQueue` hierarchy
+ Parsed/raw routers work with `MessageGroupBatch` router
+ Added new metrics and removed old

### 3.26.5
+ Migrated `grpc-common` version from `3.7.0` to `3.8.0`
+ Added `time_precision` and `decimal_precision` parameters to `RootComparisonSettings`
Expand Down Expand Up @@ -295,7 +337,7 @@ NOTES:
+ Fixed `messageRecursionLimit` setting for all kind of RabbitMQ subscribers

### 3.24.0
+ Added setting `messageRecursionLimit`(default 100) to RabbitMQ configuration that denotes how deep nested protobuf messages might be.
+ Added setting `messageRecursionLimit`(the default value is set to 100) to RabbitMQ configuration that denotes how deep nested protobuf messages might be.

### 3.23.0
+ Update the grpc-common version to 3.4.0
Expand Down Expand Up @@ -359,7 +401,7 @@ NOTES:

### 3.16.3

+ Change the way channels are stored (they mapped to the pin instead of the thread).
+ Change the way that channels are stored (they are mapped to the pin instead of to the thread).
It might increase the average number of channels used by the box, but it also limits the max number of channels to the number of pins

### 3.16.2
Expand Down
8 changes: 3 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ sourceCompatibility = 11
targetCompatibility = 11

ext {
cradleVersion = '2.13.0'
junitVersion = '5.4.2'
cradleVersion = '2.20.0'
junitVersion = '5.7.2'
sharedDir = file("${project.rootDir}/shared")
}

Expand Down Expand Up @@ -201,9 +201,7 @@ dependencies {

implementation 'io.fabric8:kubernetes-client:4.13.0'

testImplementation "org.junit.jupiter:junit-jupiter-api:${junitVersion}"
testImplementation "org.junit.jupiter:junit-jupiter-params:${junitVersion}"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${junitVersion}"
testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"
}

jar {
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
#

release_version=3.27.0
release_version=3.30.0

description = 'th2 common library (Java)'

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/******************************************************************************
* Copyright 2020-2020 Exactpro (Exactpro Systems Limited)
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
*/
package com.exactpro.th2.common.event.bean;

import java.util.Map;
Expand All @@ -24,6 +24,7 @@ public class VerificationEntry {
private VerificationStatus status;
private String operation;
private boolean key;
private String hint;
private Map<String, VerificationEntry> fields;

public String getType() {
Expand Down Expand Up @@ -81,4 +82,12 @@ public Map<String, VerificationEntry> getFields() {
public void setFields(Map<String, VerificationEntry> fields) {
this.fields = fields;
}

public String getHint() {
return hint;
}

public void setHint(String hint) {
this.hint = hint;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,53 +15,73 @@

package com.exactpro.th2.common.schema.event;

import java.util.Set;

import org.apache.commons.collections4.SetUtils;
import org.jetbrains.annotations.NotNull;

import com.exactpro.th2.common.grpc.EventBatch;
import com.exactpro.th2.common.schema.message.FilterFunction;
import com.exactpro.th2.common.schema.message.MessageQueue;
import com.exactpro.th2.common.schema.message.MessageSender;
import com.exactpro.th2.common.schema.message.MessageSubscriber;
import com.exactpro.th2.common.schema.message.QueueAttribute;
import com.exactpro.th2.common.schema.message.configuration.QueueConfiguration;
import com.exactpro.th2.common.schema.message.configuration.RouterFilter;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitMessageRouter;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
import com.google.protobuf.Message;
import org.apache.commons.collections4.SetUtils;
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitRouter;
import com.google.protobuf.TextFormat;

public class EventBatchRouter extends AbstractRabbitMessageRouter<EventBatch> {
public class EventBatchRouter extends AbstractRabbitRouter<EventBatch> {
protected static final String EVENT_TYPE = "EVENT";

private static final Set<String> REQUIRED_SUBSCRIBE_ATTRIBUTES = SetUtils.unmodifiableSet(QueueAttribute.EVENT.toString(), QueueAttribute.SUBSCRIBE.toString());
private static final Set<String> REQUIRED_SEND_ATTRIBUTES = SetUtils.unmodifiableSet(QueueAttribute.EVENT.toString(), QueueAttribute.PUBLISH.toString());

@NotNull
@Override
protected MessageQueue<EventBatch> createQueue(@NotNull ConnectionManager connectionManager, @NotNull QueueConfiguration queueConfiguration, @NotNull FilterFunction filterFunction) {
EventBatchQueue eventBatchQueue = new EventBatchQueue();
eventBatchQueue.init(connectionManager, queueConfiguration, filterFunction);
return eventBatchQueue;
protected EventBatch splitAndFilter(
EventBatch message,
@NotNull QueueConfiguration pinConfiguration,
@NotNull String pinName
) {
return message;
}

@NotNull
@Override
protected Map<String, EventBatch> findQueueByFilter(Map<String, QueueConfiguration> queues, EventBatch msg) {
return queues.entrySet().stream().collect(Collectors.toMap(Entry::getKey, v -> msg));
protected Set<String> getRequiredSendAttributes() {
return REQUIRED_SEND_ATTRIBUTES;
}

@NotNull
@Override
protected boolean filterMessage(Message msg, List<? extends RouterFilter> filters) {
return true;
protected Set<String> getRequiredSubscribeAttributes() {
return REQUIRED_SUBSCRIBE_ATTRIBUTES;
}

@NotNull
@Override
protected Set<String> requiredSubscribeAttributes() {
return REQUIRED_SUBSCRIBE_ATTRIBUTES;
protected MessageSender<EventBatch> createSender(QueueConfiguration queueConfiguration, @NotNull String pinName) {
return new EventBatchSender(
getConnectionManager(),
queueConfiguration.getExchange(),
queueConfiguration.getRoutingKey(),
pinName
);
}

@NotNull
@Override
protected Set<String> requiredSendAttributes() {
return REQUIRED_SEND_ATTRIBUTES;
protected MessageSubscriber<EventBatch> createSubscriber(QueueConfiguration queueConfiguration, @NotNull String pinName) {
return new EventBatchSubscriber(
getConnectionManager(),
queueConfiguration.getQueue(),
FilterFunction.DEFAULT_FILTER_FUNCTION,
pinName
);
}

@NotNull
@Override
protected String toErrorString(EventBatch eventBatch) {
return TextFormat.shortDebugString(eventBatch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,41 @@

package com.exactpro.th2.common.schema.event;

import java.io.IOException;

import org.jetbrains.annotations.NotNull;

import com.exactpro.th2.common.grpc.EventBatch;
import com.exactpro.th2.common.message.MessageUtils;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSender;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;

import static com.exactpro.th2.common.metrics.CommonMetrics.TH2_PIN_LABEL;
import static com.exactpro.th2.common.schema.event.EventBatchRouter.EVENT_TYPE;
import io.prometheus.client.Counter;

public class EventBatchSender extends AbstractRabbitSender<EventBatch> {
private static final Counter EVENT_PUBLISH_TOTAL = Counter.build()
.name("th2_event_publish_total")
.labelNames(TH2_PIN_LABEL)
.help("Quantity of published events")
.register();

private static final Counter OUTGOING_EVENT_BATCH_QUANTITY = Counter.build("th2_mq_outgoing_event_batch_quantity", "Quantity of outgoing event batches").register();
private static final Counter OUTGOING_EVENT_QUANTITY = Counter.build("th2_mq_outgoing_event_quantity", "Quantity of outgoing events").register();

@Override
protected Counter getDeliveryCounter() {
return OUTGOING_EVENT_BATCH_QUANTITY;
}

@Override
protected Counter getContentCounter() {
return OUTGOING_EVENT_QUANTITY;
public EventBatchSender(
@NotNull ConnectionManager connectionManager,
@NotNull String exchangeName,
@NotNull String routingKey,
@NotNull String th2Pin
) {
super(connectionManager, exchangeName, routingKey, th2Pin, EVENT_TYPE);
}

@Override
protected int extractCountFrom(EventBatch batch) {
return batch.getEventsCount();
public void send(EventBatch value) throws IOException {
EVENT_PUBLISH_TOTAL
.labels(th2Pin)
.inc(value.getEventsCount());
super.send(value);
}

@Override
Expand Down
Loading

0 comments on commit 146f6b4

Please sign in to comment.