Skip to content

Commit

Permalink
[TS-2095] Fixed problem - connect reorders sequence numbers when send…
Browse files Browse the repository at this point in the history
…ing messages in parallel mode by one session alias.
  • Loading branch information
Nikita-Smirnov-Exactpro committed Feb 12, 2024
1 parent bb6b448 commit bfeee52
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# HTTP Client v2.2.0
# HTTP Client v2.2.1

This microservice allows performing HTTP requests and receive HTTP responses. It also can perform basic authentication

Expand Down Expand Up @@ -177,6 +177,10 @@ spec:

## Changelog

### v2.2.1

* Fixed problem - connect reorders sequence numbers when sending messages in parallel mode by one session alias.

### v2.2.0

* Puts unique `th2-request-id` property to metadata of request/response messages
Expand Down
39 changes: 27 additions & 12 deletions src/main/kotlin/com/exactpro/th2/http/client/Application.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
* Copyright 2023-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,6 +67,8 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

private const val SEND_PIN_ATTRIBUTE = "send"
internal const val INPUT_QUEUE_TRANSPORT_ATTRIBUTE = SEND_PIN_ATTRIBUTE
Expand Down Expand Up @@ -110,6 +112,10 @@ class Application(
}

fun start() {
// component supported multithreading sending via single http client.
// increment sequence and putting into message batcher should be executed atomically.
val incomingLock = ReentrantLock()
val outgoingLock = ReentrantLock()
val incomingSequence = createSequence()
val outgoingSequence = createSequence()

Expand Down Expand Up @@ -140,20 +146,24 @@ class Application(
.also { registerResource("transport message batcher", it::close) }

onRequest = { request: RawHttpRequest ->
val rawMessage = request.toTransportMessage(sessionAlias, outgoingSequence())

messageBatcher.onMessage(rawMessage, sessionGroup)
val rawMessage = outgoingLock.withLock {
request.toTransportMessage(sessionAlias, outgoingSequence()).also {
messageBatcher.onMessage(it, sessionGroup)
}
}
eventBatcher.storeEvent(
rawMessage.eventId?.toProto() ?: rootEventId,
"Sent HTTP request",
"Send message"
)
}
onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> ->
messageBatcher.onMessage(
response.toTransportMessage(sessionAlias, incomingSequence(), request),
sessionGroup
)
incomingLock.withLock {
messageBatcher.onMessage(
response.toTransportMessage(sessionAlias, incomingSequence(), request),
sessionGroup
)
}
stateManager.onResponse(response)
}
} else {
Expand All @@ -167,17 +177,22 @@ class Application(
}.also { registerResource("proto message batcher", it::close) }

onRequest = { request: RawHttpRequest ->
val rawMessage = request.toProtoMessage(connectionId, outgoingSequence())

messageBatcher.onMessage(rawMessage)
val rawMessage = outgoingLock.withLock {
request.toProtoMessage(connectionId, outgoingSequence())
.also(messageBatcher::onMessage)
}
eventBatcher.storeEvent(
if (rawMessage.hasParentEventId()) rawMessage.parentEventId else rootEventId,
"Sent HTTP request",
"Send message"
)
}
onResponse = { request: RawHttpRequest, response: RawHttpResponse<*> ->
messageBatcher.onMessage(response.toProtoMessage(connectionId, incomingSequence(), request))
incomingLock.withLock {
messageBatcher.onMessage(
response.toProtoMessage(connectionId, incomingSequence(), request)
)
}
stateManager.onResponse(response)
}
}
Expand Down

0 comments on commit bfeee52

Please sign in to comment.