Skip to content

Commit

Permalink
fixed: unclosed channel causing a request timeout
Browse files Browse the repository at this point in the history
add request direction to log records
th2 gradle plugin 0.1.5
  • Loading branch information
lumber1000 committed Nov 25, 2024
1 parent 2513c7a commit 4caebc7
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 30 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ spec:

## 5.15.0
* Migrate to native grouped message request
* Updated th2 gradle plugin: `0.1.4` (th2-bom:4.8.0)
* Updated th2 gradle plugin: `0.1.5` (th2-bom:4.9.0)

## 5.14.1
* Updated cradle api: `5.4.4-dev`
Expand Down
14 changes: 1 addition & 13 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
id 'org.jetbrains.kotlin.jvm' version '1.8.22'
id "com.exactpro.th2.gradle.component" version "0.1.4"
id "com.exactpro.th2.gradle.component" version "0.1.5"
id 'application'
}

ext {
dockerImageVersion = release_version
cradleVersion = '5.4.4-dev'
nettyVersion = '4.1.115.Final'
}

group 'com.exactpro.th2'
Expand Down Expand Up @@ -57,17 +56,6 @@ dependencies {
implementation 'io.ktor:ktor-server-netty'
implementation 'io.ktor:ktor-server'

// override transitive dependency because of CVE-2024-47535 vulnerability
implementation "io.netty:netty-buffer:$nettyVersion"
implementation "io.netty:netty-codec:$nettyVersion"
implementation "io.netty:netty-codec-http:$nettyVersion"
implementation "io.netty:netty-codec-http2:$nettyVersion"
implementation "io.netty:netty-codec-socks:$nettyVersion"
implementation "io.netty:netty-common:$nettyVersion"
implementation "io.netty:netty-handler-proxy:$nettyVersion"
implementation "io.netty:netty-transport-native-epoll:$nettyVersion"
implementation "io.netty:netty-transport-native-kqueue:$nettyVersion"

implementation 'org.ehcache:ehcache:3.10.8'
implementation('org.glassfish.jaxb:jaxb-runtime:2.3.9') {
because("'2.3.9' version has 'EDL 1.0' license instead of 'CDDL GPL 1.1' in the '2.3.1'")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class MessageExtractor<B, G, RM, PM>(
}.build()
)

LOGGER.debug { "cradle iterator has been built for session group: $sessionGroup, alias: $commonStreamName" }
LOGGER.debug { "cradle iterator has been built for session group: $sessionGroup, alias: $commonStreamName, order: $order" }

val start = request.startTimestamp
val end = request.endTimestamp
Expand All @@ -168,7 +168,7 @@ class MessageExtractor<B, G, RM, PM>(
val timeStart = System.currentTimeMillis()

pipelineStatus.fetchedStart(commonStreamName.toString())
LOGGER.trace { "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName with ${batch.messageCount} messages (${batch.batchSize} bytes) has been extracted" }
LOGGER.trace { "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName (order: $order) with ${batch.messageCount} messages (${batch.batchSize} bytes) has been extracted" }

val orderedMessages = run {
val filteredMessages: MutableList<StoredMessage> = ArrayList()
Expand Down Expand Up @@ -213,7 +213,7 @@ class MessageExtractor<B, G, RM, PM>(
val messages = if (order == REVERSE) batch.messagesReverse else batch.messages
val firstMessage = messages.firstOrNull()
val lastMessage = messages.lastOrNull()
"batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName has been trimmed (targetStartTimestamp=${request.startTimestamp} targetEndTimestamp=${request.endTimestamp} targetId=${resumeFromId?.sequence}) - ${trimmedMessages.size} of ${batch.messageCount} messages left (firstId=${firstMessage?.id?.sequence} firstTimestamp=${firstMessage?.timestamp} lastId=${lastMessage?.id?.sequence} lastTimestamp=${lastMessage?.timestamp})"
"batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName (order: $order) has been trimmed (targetStartTimestamp=${request.startTimestamp} targetEndTimestamp=${request.endTimestamp} targetId=${resumeFromId?.sequence}) - ${trimmedMessages.size} of ${batch.messageCount} messages left (firstId=${firstMessage?.id?.sequence} firstTimestamp=${firstMessage?.timestamp} lastId=${lastMessage?.id?.sequence} lastTimestamp=${lastMessage?.timestamp})"
}

pipelineStatus.fetchedEnd(commonStreamName.toString())
Expand All @@ -235,9 +235,9 @@ class MessageExtractor<B, G, RM, PM>(
lastElement = message.id
lastTimestamp = message.timestamp

LOGGER.trace { "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName has been sent downstream" }
LOGGER.trace { "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName (order: $order) has been sent downstream" }
} else {
LOGGER.trace { "skipping batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName - no messages left after trimming" }
LOGGER.trace { "skipping batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName (order: $order) - no messages left after trimming" }
pipelineStatus.countSkippedBatches(commonStreamName.toString())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/


package com.exactpro.th2.rptdataprovider.services.cradle

import com.exactpro.cradle.BookId
Expand Down Expand Up @@ -49,6 +48,7 @@ import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.channels.ReceiveChannel
import org.ehcache.Cache
import org.ehcache.config.builders.CacheConfigurationBuilder
import org.ehcache.config.builders.CacheManagerBuilder
Expand Down Expand Up @@ -130,20 +130,26 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager)
suspend fun getGroupedMessages(
scope: CoroutineScope,
filter: GroupedMessageFilter,
): Channel<StoredGroupedMessageBatch> {
): ReceiveChannel<StoredGroupedMessageBatch> {
val channel = Channel<StoredGroupedMessageBatch>(1)
scope.launch {
withContext(cradleDispatcher) {
storage.getGroupedMessageBatches(filter).forEach { batch ->
K_LOGGER.trace {
"message batch has been received from the iterator, " +
"group: ${batch.group}, start: ${batch.firstTimestamp}, end: ${batch.lastTimestamp}"
}
channel.send(batch)
K_LOGGER.trace {
"message batch has been sent to the channel, " +
"group: ${batch.group}, start: ${batch.firstTimestamp}, end: ${batch.lastTimestamp}"
try {
storage.getGroupedMessageBatches(filter).forEach { batch ->
K_LOGGER.trace {
"message batch has been received from the iterator, " +
"group: ${batch.group}, start: ${batch.firstTimestamp}, " +
"end: ${batch.lastTimestamp}, request order: ${filter.order}"
}
channel.send(batch)
K_LOGGER.trace {
"message batch has been sent to the channel, " +
"group: ${batch.group}, start: ${batch.firstTimestamp}, " +
"end: ${batch.lastTimestamp}, request order: ${filter.order}"
}
}
} finally {
channel.close()
}
}
}
Expand Down

0 comments on commit 4caebc7

Please sign in to comment.