diff --git a/README.md b/README.md index 50d3608e..bdf9b883 100644 --- a/README.md +++ b/README.md @@ -298,7 +298,7 @@ spec: ## 5.15.0 * Migrate to native grouped message request -* Updated th2 gradle plugin: `0.1.4` (th2-bom:4.8.0) +* Updated th2 gradle plugin: `0.1.5` (th2-bom:4.9.0) ## 5.14.1 * Updated cradle api: `5.4.4-dev` diff --git a/build.gradle b/build.gradle index 27416f11..fe5eb805 100644 --- a/build.gradle +++ b/build.gradle @@ -2,14 +2,13 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile plugins { id 'org.jetbrains.kotlin.jvm' version '1.8.22' - id "com.exactpro.th2.gradle.component" version "0.1.4" + id "com.exactpro.th2.gradle.component" version "0.1.5" id 'application' } ext { dockerImageVersion = release_version cradleVersion = '5.4.4-dev' - nettyVersion = '4.1.115.Final' } group 'com.exactpro.th2' @@ -57,17 +56,6 @@ dependencies { implementation 'io.ktor:ktor-server-netty' implementation 'io.ktor:ktor-server' - // override transitive dependency because of CVE-2024-47535 vulnerability - implementation "io.netty:netty-buffer:$nettyVersion" - implementation "io.netty:netty-codec:$nettyVersion" - implementation "io.netty:netty-codec-http:$nettyVersion" - implementation "io.netty:netty-codec-http2:$nettyVersion" - implementation "io.netty:netty-codec-socks:$nettyVersion" - implementation "io.netty:netty-common:$nettyVersion" - implementation "io.netty:netty-handler-proxy:$nettyVersion" - implementation "io.netty:netty-transport-native-epoll:$nettyVersion" - implementation "io.netty:netty-transport-native-kqueue:$nettyVersion" - implementation 'org.ehcache:ehcache:3.10.8' implementation('org.glassfish.jaxb:jaxb-runtime:2.3.9') { because("'2.3.9' version has 'EDL 1.0' license instead of 'CDDL GPL 1.1' in the '2.3.1'") diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt index e3dd5c05..222500b5 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt @@ -154,7 +154,7 @@ class MessageExtractor( }.build() ) - LOGGER.debug { "cradle iterator has been built for session group: $sessionGroup, alias: $commonStreamName" } + LOGGER.debug { "cradle iterator has been built for session group: $sessionGroup, alias: $commonStreamName, order: $order" } val start = request.startTimestamp val end = request.endTimestamp @@ -168,7 +168,7 @@ class MessageExtractor( val timeStart = System.currentTimeMillis() pipelineStatus.fetchedStart(commonStreamName.toString()) - LOGGER.trace { "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName with ${batch.messageCount} messages (${batch.batchSize} bytes) has been extracted" } + LOGGER.trace { "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName (order: $order) with ${batch.messageCount} messages (${batch.batchSize} bytes) has been extracted" } val orderedMessages = run { val filteredMessages: MutableList = ArrayList() @@ -213,7 +213,7 @@ class MessageExtractor( val messages = if (order == REVERSE) batch.messagesReverse else batch.messages val firstMessage = messages.firstOrNull() val lastMessage = messages.lastOrNull() - "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName has been trimmed (targetStartTimestamp=${request.startTimestamp} targetEndTimestamp=${request.endTimestamp} targetId=${resumeFromId?.sequence}) - ${trimmedMessages.size} of ${batch.messageCount} messages left (firstId=${firstMessage?.id?.sequence} firstTimestamp=${firstMessage?.timestamp} lastId=${lastMessage?.id?.sequence} lastTimestamp=${lastMessage?.timestamp})" + "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName (order: $order) has been trimmed (targetStartTimestamp=${request.startTimestamp} targetEndTimestamp=${request.endTimestamp} targetId=${resumeFromId?.sequence}) - ${trimmedMessages.size} of ${batch.messageCount} messages left (firstId=${firstMessage?.id?.sequence} firstTimestamp=${firstMessage?.timestamp} lastId=${lastMessage?.id?.sequence} lastTimestamp=${lastMessage?.timestamp})" } pipelineStatus.fetchedEnd(commonStreamName.toString()) @@ -235,9 +235,9 @@ class MessageExtractor( lastElement = message.id lastTimestamp = message.timestamp - LOGGER.trace { "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName has been sent downstream" } + LOGGER.trace { "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName (order: $order) has been sent downstream" } } else { - LOGGER.trace { "skipping batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName - no messages left after trimming" } + LOGGER.trace { "skipping batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName (order: $order) - no messages left after trimming" } pipelineStatus.countSkippedBatches(commonStreamName.toString()) } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt index 3394d33d..98d4e44f 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt @@ -14,7 +14,6 @@ * limitations under the License. */ - package com.exactpro.th2.rptdataprovider.services.cradle import com.exactpro.cradle.BookId @@ -49,6 +48,7 @@ import kotlinx.coroutines.future.await import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.channels.ReceiveChannel import org.ehcache.Cache import org.ehcache.config.builders.CacheConfigurationBuilder import org.ehcache.config.builders.CacheManagerBuilder @@ -130,20 +130,26 @@ class CradleService(configuration: Configuration, cradleManager: CradleManager) suspend fun getGroupedMessages( scope: CoroutineScope, filter: GroupedMessageFilter, - ): Channel { + ): ReceiveChannel { val channel = Channel(1) scope.launch { withContext(cradleDispatcher) { - storage.getGroupedMessageBatches(filter).forEach { batch -> - K_LOGGER.trace { - "message batch has been received from the iterator, " + - "group: ${batch.group}, start: ${batch.firstTimestamp}, end: ${batch.lastTimestamp}" - } - channel.send(batch) - K_LOGGER.trace { - "message batch has been sent to the channel, " + - "group: ${batch.group}, start: ${batch.firstTimestamp}, end: ${batch.lastTimestamp}" + try { + storage.getGroupedMessageBatches(filter).forEach { batch -> + K_LOGGER.trace { + "message batch has been received from the iterator, " + + "group: ${batch.group}, start: ${batch.firstTimestamp}, " + + "end: ${batch.lastTimestamp}, request order: ${filter.order}" + } + channel.send(batch) + K_LOGGER.trace { + "message batch has been sent to the channel, " + + "group: ${batch.group}, start: ${batch.firstTimestamp}, " + + "end: ${batch.lastTimestamp}, request order: ${filter.order}" + } } + } finally { + channel.close() } } }