Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add publishing confirmation to redeliver messages that were not confi… #306

Merged
merged 16 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: "Run integration tests for common"

on:
push:
branches:
- '*'

jobs:
tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 'zulu' '11'
uses: actions/setup-java@v4
with:
distribution: 'zulu'
java-version: '11'
- name: Setup Gradle
uses: gradle/actions/setup-gradle@v3
- name: Build with Gradle
run: ./gradlew --info integrationTest
- uses: actions/upload-artifact@v4
if: failure()
with:
name: integration-test-results
path: build/reports/tests/integrationTest/
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2 common library (Java) (5.12.0)
# th2 common library (Java) (5.13.0)

## Usage

Expand Down Expand Up @@ -511,6 +511,15 @@ dependencies {

## Release notes

### 5.13.0-dev

+ Added functionality for publisher confirmations to mitigate network issues for message producers.
+ New parameters are added to connection manager configuration:
+ enablePublisherConfirmation - enables publisher confirmation. `false` by default.
+ maxInflightPublicationsBytes - the max number of unconfirmed published messages per channel. `52428800` (50 MB), by default.
+ heartbeatIntervalSeconds - rabbitmq connection heartbeat interval in seconds.
`0` by default (that means the default interval will be set by the internal library used to communicate with RabbitMQ).

### 5.12.0-dev

+ Updated kubernetes-client: `6.12.1`
Expand Down
10 changes: 9 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,12 @@ tasks.named('extractIncludeProto') {enabled = false }
tasks.named('extractIncludeTestFixturesProto') {enabled = false }

compileTestJava.dependsOn.add('generateTestProto')
processTestResources.dependsOn.add('generateTestProto')
processTestResources.dependsOn.add('generateTestProto')

tasks.register("publicationManualBench", JavaExec.class) {
mainClass.set('com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManualBenchmark')
classpath(sourceSets.test.runtimeClasspath)
dependsOn('testClasses')

jvmArgs('-XX:StartFlightRecording=duration=60s,settings=profile,filename=publishing-profile-record.jfr')
}
OptimumCode marked this conversation as resolved.
Show resolved Hide resolved

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ data class ConnectionManagerConfiguration(
val retryTimeDeviationPercent: Int = 10,
val messageRecursionLimit: Int = 100,
val workingThreads: Int = 1,
val confirmationTimeout: Duration = Duration.ofMinutes(5)
val confirmationTimeout: Duration = Duration.ofMinutes(5),
val enablePublisherConfirmation: Boolean = false,
// Default value 50MB is taken based on measurement done in ConnectionManualBenchmark class
val maxInflightPublicationsBytes: Int = 50 * 1024 * 1024,
val heartbeatIntervalSeconds: Int = DEFAULT_HB_INTERVAL_SECONDS,
) : Configuration() {
init {
check(maxRecoveryAttempts > 0) { "expected 'maxRecoveryAttempts' greater than 0 but was $maxRecoveryAttempts" }
Expand All @@ -64,6 +68,11 @@ data class ConnectionManagerConfiguration(
))
}
}

companion object {
const val NO_LIMIT_INFLIGHT_REQUESTS: Int = -1
const val DEFAULT_HB_INTERVAL_SECONDS: Int = 0
}
}

data class RetryingDelay(val tryNumber: Int, val delay: Int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class NotificationEventBatchSender(
throw UnsupportedOperationException("Method is deprecated, please use constructor")
}

@Throws(IOException::class)
override fun send(message: EventBatch) {
try {
connectionManager.basicPublish(exchange, EMPTY_ROUTING_KEY, null, message.toByteArray())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ import io.grpc.stub.StreamObserver
import mu.KotlinLogging
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.junit.jupiter.api.extension.AfterTestExecutionCallback
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.extension.ExtensionContext
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.mock
import org.mockito.kotlin.timeout
Expand All @@ -50,23 +55,40 @@ import org.mockito.kotlin.verifyNoMoreInteractions
import org.testcontainers.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue

private const val CANCEL_REASON = "test request is canceled"

@ExtendWith(DefaultGrpcRouterTest.ExecutionListener::class)
@IntegrationTest
internal class DefaultGrpcRouterTest {
/**
* Listener adds additional logging to help understanding from the stdout where test starts and finishes
*/
internal class ExecutionListener : BeforeTestExecutionCallback, AfterTestExecutionCallback {
private val logger = KotlinLogging.logger { }
override fun beforeTestExecution(ctx: ExtensionContext) {
logger.info { "Execution for test '${ctx.testMethod.map { it.name }.orElse("unknown")}' started" }
}

override fun afterTestExecution(ctx: ExtensionContext) {
logger.info { "Execution for test '${ctx.testMethod.map { it.name }.orElse("unknown")}' is finished" }
}

}

@IntegrationTest
abstract inner class AbstractGrpcRouterTest {
private val grpcRouterClient = DefaultGrpcRouter()
Expand Down Expand Up @@ -415,6 +437,7 @@ internal class DefaultGrpcRouterTest {
)
}

@Disabled("this test isn't relevant for async request")
@Test
override fun `interrupt thread during retry request`() {
// this test isn't relevant for async request
Expand Down Expand Up @@ -751,6 +774,7 @@ internal class DefaultGrpcRouterTest {
)
}

@Disabled("this test isn't relevant for async request")
@Test
override fun `interrupt thread during retry request`() {
// this test isn't relevant for async request
Expand Down Expand Up @@ -825,7 +849,7 @@ internal class DefaultGrpcRouterTest {
ExceptionMetadata(
"UNAVAILABLE: io exception",
ExceptionMetadata(
"Connection refused: localhost/127.0.0.1:8080",
"Connection refused: localhost/",
ExceptionMetadata(
"Connection refused"
)
Expand All @@ -851,7 +875,7 @@ internal class DefaultGrpcRouterTest {
ExceptionMetadata(
"UNAVAILABLE: io exception",
ExceptionMetadata(
"Connection refused: localhost/127.0.0.1:8080",
"Connection refused: localhost/",
ExceptionMetadata(
"Connection refused"
)
Expand Down Expand Up @@ -880,7 +904,7 @@ internal class DefaultGrpcRouterTest {
ExceptionMetadata(
"UNAVAILABLE: io exception",
ExceptionMetadata(
"Connection refused: localhost/127.0.0.1:8080",
"Connection refused: localhost/",
ExceptionMetadata(
"Connection refused"
)
Expand Down Expand Up @@ -915,7 +939,7 @@ internal class DefaultGrpcRouterTest {
shutdownNow()
} else {
shutdown()
if (!awaitTermination(5, TimeUnit.SECONDS)) {
if (!awaitTermination(60, TimeUnit.SECONDS)) {
shutdownNow()
error("'Server' can't be closed")
}
Expand All @@ -928,9 +952,9 @@ internal class DefaultGrpcRouterTest {
exceptionMetadata: ExceptionMetadata,
path: List<String?> = emptyList()
) {
assertEquals(
exceptionMetadata.message,
exception.message,
val expectedMessage = exceptionMetadata.message
assertTrue(
exception.message == expectedMessage || exception.message?.startsWith(expectedMessage ?: "null") == true,
"Message for exception: $exception, path: ${path.printAsStackTrace()}"
)
exceptionMetadata.suspended?.let { suspendMetadataList ->
Expand Down Expand Up @@ -962,7 +986,7 @@ internal class DefaultGrpcRouterTest {

private fun ExecutorService.shutdownGracefully() {
shutdown()
if (!awaitTermination(1, TimeUnit.SECONDS)) {
if (!awaitTermination(30, TimeUnit.SECONDS)) {
shutdownNow()
error("'Executor' can't be stopped")
}
Expand All @@ -974,10 +998,25 @@ internal class DefaultGrpcRouterTest {
val suspended: List<ExceptionMetadata>? = null
)

/**
* Baton class can help to synchronize two threads (only **two**).
*
* Baton class was migrated from using queue with size 1 to lock and conditions for synchronization.
*
* The implementation with queue did not provide guarantees that the same thread won't get the permit and put it back
* while another thread was still waiting for a free space in the queue.
*
* Using lock and conditions guarantees that the permit won't be given unless somebody is waiting for that permit.
* And vise-versa, nobody can get a permit unless somebody tries to put the permit
*/
internal class Baton(
private val name: String
) {
private val queue = ArrayBlockingQueue<Any>(1).apply { put(Any()) }
@Volatile
private var permits = 0
private val lock = ReentrantLock()
private val givenCondition = lock.newCondition()
private val getCondition = lock.newCondition()

fun giveAndGet(giveComment: String = "", getComment: String = "") {
give(giveComment)
Expand All @@ -986,13 +1025,25 @@ internal class DefaultGrpcRouterTest {

fun give(comment: String = "") {
K_LOGGER.info { "'$name' baton is giving by [${Thread.currentThread().name}] - $comment" }
queue.put(Any())
lock.withLock {
if (permits == 0) {
getCondition.await()
}
permits += 1
givenCondition.signal()
}
K_LOGGER.info { "'$name' baton is given by [${Thread.currentThread().name}] - $comment" }
}

fun get(comment: String = "") {
K_LOGGER.info { "'$name' baton is getting by [${Thread.currentThread().name}] - $comment" }
queue.poll()
lock.withLock {
getCondition.signal()
permits -= 1
if (permits < 0) {
givenCondition.await()
}
}
K_LOGGER.info { "'$name' baton is got by [${Thread.currentThread().name}] - $comment" }
}
}
Expand All @@ -1009,8 +1060,8 @@ internal class DefaultGrpcRouterTest {
}.build())

responseBaton?.let {
Thread.sleep(1_000)
it.give("response sent")
Thread.sleep(1_000)
}

if (complete) {
Expand All @@ -1027,8 +1078,8 @@ internal class DefaultGrpcRouterTest {
}

responseBaton?.let {
Thread.sleep(1_000)
it.give("response sent")
Thread.sleep(1_000)
}

if (complete) {
Expand Down
Loading