From bfeee52bb8acc5431ac7613fcbcbb65667d7584d Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Mon, 12 Feb 2024 18:17:44 +0400 Subject: [PATCH] [TS-2095] Fixed problem - connect reorders sequence numbers when sending messages in parallel mode by one session alias. --- README.md | 6 ++- .../exactpro/th2/http/client/Application.kt | 39 +++++++++++++------ 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 542c703..87c24b2 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# HTTP Client v2.2.0 +# HTTP Client v2.2.1 This microservice allows performing HTTP requests and receive HTTP responses. It also can perform basic authentication @@ -177,6 +177,10 @@ spec: ## Changelog +### v2.2.1 + +* Fixed problem - connect reorders sequence numbers when sending messages in parallel mode by one session alias. + ### v2.2.0 * Puts unique `th2-request-id` property to metadata of request/response messages diff --git a/src/main/kotlin/com/exactpro/th2/http/client/Application.kt b/src/main/kotlin/com/exactpro/th2/http/client/Application.kt index 085cf1a..3eff5d3 100644 --- a/src/main/kotlin/com/exactpro/th2/http/client/Application.kt +++ b/src/main/kotlin/com/exactpro/th2/http/client/Application.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 Exactpro (Exactpro Systems Limited) + * Copyright 2023-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,6 +67,8 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock private const val SEND_PIN_ATTRIBUTE = "send" internal const val INPUT_QUEUE_TRANSPORT_ATTRIBUTE = SEND_PIN_ATTRIBUTE @@ -110,6 +112,10 @@ class Application( } fun start() { + // component supported multithreading sending via single http client. + // increment sequence and putting into message batcher should be executed atomically. + val incomingLock = ReentrantLock() + val outgoingLock = ReentrantLock() val incomingSequence = createSequence() val outgoingSequence = createSequence() @@ -140,9 +146,11 @@ class Application( .also { registerResource("transport message batcher", it::close) } onRequest = { request: RawHttpRequest -> - val rawMessage = request.toTransportMessage(sessionAlias, outgoingSequence()) - - messageBatcher.onMessage(rawMessage, sessionGroup) + val rawMessage = outgoingLock.withLock { + request.toTransportMessage(sessionAlias, outgoingSequence()).also { + messageBatcher.onMessage(it, sessionGroup) + } + } eventBatcher.storeEvent( rawMessage.eventId?.toProto() ?: rootEventId, "Sent HTTP request", @@ -150,10 +158,12 @@ class Application( ) } onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> -> - messageBatcher.onMessage( - response.toTransportMessage(sessionAlias, incomingSequence(), request), - sessionGroup - ) + incomingLock.withLock { + messageBatcher.onMessage( + response.toTransportMessage(sessionAlias, incomingSequence(), request), + sessionGroup + ) + } stateManager.onResponse(response) } } else { @@ -167,9 +177,10 @@ class Application( }.also { registerResource("proto message batcher", it::close) } onRequest = { request: RawHttpRequest -> - val rawMessage = request.toProtoMessage(connectionId, outgoingSequence()) - - messageBatcher.onMessage(rawMessage) + val rawMessage = outgoingLock.withLock { + request.toProtoMessage(connectionId, outgoingSequence()) + .also(messageBatcher::onMessage) + } eventBatcher.storeEvent( if (rawMessage.hasParentEventId()) rawMessage.parentEventId else rootEventId, "Sent HTTP request", @@ -177,7 +188,11 @@ class Application( ) } onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> -> - messageBatcher.onMessage(response.toProtoMessage(connectionId, incomingSequence(), request)) + incomingLock.withLock { + messageBatcher.onMessage( + response.toProtoMessage(connectionId, incomingSequence(), request) + ) + } stateManager.onResponse(response) } }