Skip to content

Commit

Permalink
[TH2-2212] recover channel-level exceptions (#283)
Browse files Browse the repository at this point in the history
  • Loading branch information
lumber1000 authored Mar 5, 2024
1 parent 81304f0 commit 1a76450
Show file tree
Hide file tree
Showing 16 changed files with 1,492 additions and 248 deletions.
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2 common library (Java) (5.8.0)
# th2 common library (Java) (5.9.0)

## Usage

Expand Down Expand Up @@ -93,6 +93,9 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
* maxConnectionRecoveryTimeout - this option defines a maximum interval in milliseconds between reconnect attempts, with
its default value set to 60000. Common factory increases the reconnect interval values from
minConnectionRecoveryTimeout to maxConnectionRecoveryTimeout.
* retryTimeDeviationPercent - specifies random deviation to delay interval duration. Default value is 10 percents.
E.g. if delay interval is 30 seconds and `retryTimeDeviationPercent` is 10 percents the actual duration of interval
will be random value from 27 to 33 seconds.
* prefetchCount - this option is the maximum number of messages that the server will deliver, with its value set to 0 if
unlimited, the default value is set to 10.
* messageRecursionLimit - an integer number denotes how deep the nested protobuf message might be, set by default 100
Expand All @@ -110,6 +113,7 @@ The `CommonFactory` reads a RabbitMQ configuration from the rabbitMQ.json file.
"maxRecoveryAttempts": 5,
"minConnectionRecoveryTimeout": 10000,
"maxConnectionRecoveryTimeout": 60000,
"retryTimeDeviationPercent": 10,
"prefetchCount": 10,
"messageRecursionLimit": 100
}
Expand Down Expand Up @@ -379,6 +383,7 @@ describes gRPC service structure)

This kind of router provides the ability for component to send / receive messages via RabbitMQ.
Router has several methods to subscribe and publish RabbitMQ messages steam (th2 use batches of messages or events as transport).
Supports recovery of subscriptions cancelled by RabbitMQ due to following errors: "delivery acknowledgement timed out" and "queue not found".

#### Choice pin by attributes

Expand Down Expand Up @@ -502,6 +507,13 @@ dependencies {

## Release notes

### 5.9.0-dev
+ Added retry in case of a RabbitMQ channel or connection error (when possible).
+ Added InterruptedException to basicConsume method signature.
+ Added additional logging for RabbitMQ errors.
+ Fixed connection recovery delay time.
+ Integration tests for RabbitMQ retry scenarios.

### 5.8.0-dev
+ Added `NOT_WILDCARD` filter operation, which filter a field which isn't matched by wildcard expression.

Expand Down Expand Up @@ -972,4 +984,4 @@ dependencies {
### 3.0.1

+ metrics related to time measurement of an incoming message handling (Raw / Parsed / Event) migrated to
Prometheus [histogram](https://prometheus.io/docs/concepts/metric_types/#histogram)
Prometheus [histogram](https://prometheus.io/docs/concepts/metric_types/#histogram)
61 changes: 30 additions & 31 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ buildscript {
}

dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlin_version}"
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
}
}

Expand All @@ -21,15 +21,15 @@ plugins {
id 'java-library'
id 'java-test-fixtures'
id 'maven-publish'
id "io.github.gradle-nexus.publish-plugin" version "1.0.0"
id "io.github.gradle-nexus.publish-plugin" version "1.3.0"
id 'signing'
id 'org.jetbrains.kotlin.jvm' version "${kotlin_version}"
id 'org.jetbrains.kotlin.kapt' version "${kotlin_version}"
id "org.owasp.dependencycheck" version "8.3.1"
id "me.champeau.jmh" version "0.6.8"
id 'org.jetbrains.kotlin.jvm' version "$kotlin_version"
id 'org.jetbrains.kotlin.kapt' version "$kotlin_version"
id "org.owasp.dependencycheck" version "9.0.9"
id "me.champeau.jmh" version "0.7.2"
id "com.gorylenko.gradle-git-properties" version "2.4.1"
id 'com.github.jk1.dependency-license-report' version '2.5'
id "de.undercouch.download" version "5.4.0"
id "de.undercouch.download" version "5.6.0"
id "com.google.protobuf" version "0.9.4"
}

Expand All @@ -41,8 +41,8 @@ ext {
protobufVersion = '3.23.2' // The protoc:3.23.3 https://github.com/protocolbuffers/protobuf/issues/13070
serviceGeneratorVersion = '3.5.1'

cradleVersion = '5.1.1-dev'
junitVersion = '5.10.0'
cradleVersion = '5.1.4-dev'
junitVersion = '5.10.2'

genBaseDir = file("${buildDir}/generated/source/proto")
}
Expand Down Expand Up @@ -181,10 +181,10 @@ tasks.register('integrationTest', Test) {

dependencies {
api platform("com.exactpro.th2:bom:4.5.0")
api('com.exactpro.th2:grpc-common:4.3.0-dev') {
api('com.exactpro.th2:grpc-common:4.4.0-dev') {
because('protobuf transport is main now, this dependnecy should be moved to grpc, mq protobuf modules after splitting')
}
api("com.exactpro.th2:cradle-core:${cradleVersion}") {
api("com.exactpro.th2:cradle-core:$cradleVersion") {
because('cradle is included into common library now, this dependnecy should be moved to a cradle module after splitting')
}
api('io.netty:netty-buffer') {
Expand All @@ -195,10 +195,10 @@ dependencies {
jmh 'org.openjdk.jmh:jmh-generator-annprocess:0.9'

implementation 'com.google.protobuf:protobuf-java-util'
implementation "com.exactpro.th2:grpc-service-generator:${serviceGeneratorVersion}"
implementation "com.exactpro.th2:cradle-cassandra:${cradleVersion}"
implementation "com.exactpro.th2:grpc-service-generator:$serviceGeneratorVersion"
implementation "com.exactpro.th2:cradle-cassandra:$cradleVersion"

def autoValueVersion = '1.10.1'
def autoValueVersion = '1.10.4'
implementation "com.google.auto.value:auto-value-annotations:$autoValueVersion"
kapt("com.google.auto.value:auto-value:$autoValueVersion") {
//FIXME: Updated library because it is fat jar
Expand Down Expand Up @@ -237,7 +237,7 @@ dependencies {
implementation "com.fasterxml.jackson.module:jackson-module-kotlin"
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor'

implementation 'com.fasterxml.uuid:java-uuid-generator:4.0.1'
implementation "com.fasterxml.uuid:java-uuid-generator:5.0.0"

implementation 'org.apache.logging.log4j:log4j-slf4j2-impl'
implementation 'org.apache.logging.log4j:log4j-core'
Expand All @@ -247,30 +247,29 @@ dependencies {
implementation 'io.prometheus:simpleclient_httpserver'
implementation 'io.prometheus:simpleclient_log4j2'

implementation('com.squareup.okio:okio:3.5.0') {
implementation("com.squareup.okio:okio:3.8.0") {
because('fix vulnerability in transitive dependency <com.squareup.okhttp3:okhttp>')
}
implementation('com.squareup.okhttp3:okhttp:4.11.0') {
implementation("com.squareup.okhttp3:okhttp:4.12.0") {
because('fix vulnerability in transitive dependency <kubernetes-client>')
}
implementation('io.fabric8:kubernetes-client:6.8.0') {
implementation("io.fabric8:kubernetes-client:6.10.0") {
exclude group: 'com.fasterxml.jackson.dataformat', module: 'jackson-dataformat-yaml'
}

implementation 'io.github.microutils:kotlin-logging:3.0.0' // The last version bases on kotlin 1.6.0
implementation "io.github.microutils:kotlin-logging:3.0.5"

testImplementation 'javax.annotation:javax.annotation-api:1.3.2'
testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"
testImplementation 'org.mockito.kotlin:mockito-kotlin:4.0.0'
testImplementation "org.junit.jupiter:junit-jupiter:$junitVersion"
testImplementation "org.mockito.kotlin:mockito-kotlin:5.2.1"
testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5'
testImplementation "org.testcontainers:testcontainers:1.17.1"
testImplementation "org.testcontainers:rabbitmq:1.17.1"
testImplementation("org.junit-pioneer:junit-pioneer:2.1.0") {
testImplementation "org.testcontainers:testcontainers:1.19.6"
testImplementation "org.testcontainers:rabbitmq:1.19.6"
testImplementation("org.junit-pioneer:junit-pioneer:2.2.0") {
because("system property tests")
}

testFixturesImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test-junit5', version: kotlin_version
testFixturesImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"
testFixturesImplementation "org.jetbrains.kotlin:kotlin-test-junit5:$kotlin_version"
testFixturesImplementation "org.junit.jupiter:junit-jupiter:$junitVersion"
}

jar {
Expand All @@ -294,14 +293,14 @@ sourceSets {

protobuf {
protoc {
artifact = "com.google.protobuf:protoc:${protobufVersion}"
artifact = "com.google.protobuf:protoc:$protobufVersion"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
artifact = "io.grpc:protoc-gen-grpc-java:$grpcVersion"
}
services {
artifact = "com.exactpro.th2:grpc-service-generator:${serviceGeneratorVersion}:all@jar"
artifact = "com.exactpro.th2:grpc-service-generator:$serviceGeneratorVersion:all@jar"
}
}
generateProtoTasks {
Expand Down Expand Up @@ -355,4 +354,4 @@ licenseReport {
]
excludeOwnGroup = false
allowedLicensesFile = new URL("https://raw.githubusercontent.com/th2-net/.github/main/license-compliance/gradle-license-report/allowed-licenses.json")
}
}
5 changes: 3 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
release_version=5.8.0

release_version=5.9.0
description='th2 common library (Java)'
vcs_url=https://github.com/th2-net/th2-common-j
kapt.include.compile.classpath=false
kapt.include.compile.classpath=false
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ protected PrometheusConfiguration loadPrometheusConfiguration() {
}

protected ConnectionManager createRabbitMQConnectionManager() {
return new ConnectionManager(getRabbitMqConfiguration(), getConnectionManagerConfiguration(), livenessMonitor::disable);
return new ConnectionManager(getRabbitMqConfiguration(), getConnectionManagerConfiguration());
}

protected ConnectionManager getRabbitMqConnectionManager() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -15,8 +15,8 @@

package com.exactpro.th2.common.schema.grpc.router;

import com.exactpro.th2.common.grpc.router.ServerGrpcInterceptor;
import com.exactpro.th2.common.grpc.router.MethodDetails;
import com.exactpro.th2.common.grpc.router.ServerGrpcInterceptor;
import com.exactpro.th2.common.metrics.CommonMetrics;
import com.exactpro.th2.common.schema.grpc.configuration.GrpcConfiguration;
import com.exactpro.th2.common.schema.grpc.configuration.GrpcRouterConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private void subscribe() {
consumerMonitor.updateAndGet(previous -> previous == EMPTY_INITIALIZER
? Suppliers.memoize(this::basicConsume)
: previous)
.get(); // initialize subscribtion
.get(); // initialize subscription
} catch (Exception e) {
throw new IllegalStateException("Can not start listening", e);
}
Expand Down Expand Up @@ -189,6 +189,10 @@ private SubscriberMonitor basicConsume() {
return connectionManager.basicConsume(queue, this::handle, this::canceled);
} catch (IOException e) {
throw new IllegalStateException("Can not subscribe to queue = " + queue, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error("Interrupted exception while consuming from queue '{}'", queue);
throw new IllegalStateException("Thread was interrupted while consuming", e);
}
}

Expand Down Expand Up @@ -232,4 +236,4 @@ private void canceled(String consumerTag) {
private static <T> Supplier<T> emptySupplier() {
return (Supplier<T>) EMPTY_INITIALIZER;
}
}
}
Loading

0 comments on commit 1a76450

Please sign in to comment.