diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt index 69dbfdcf..5078f1d9 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Context.kt @@ -37,7 +37,6 @@ import com.exactpro.th2.rptdataprovider.producers.MessageProducer import com.exactpro.th2.rptdataprovider.serialization.InstantBackwardCompatibilitySerializer import com.exactpro.th2.rptdataprovider.server.ServerType import com.exactpro.th2.rptdataprovider.services.cradle.CradleService -import com.exactpro.th2.rptdataprovider.services.cradle.MessageGroupCradleService import com.exactpro.th2.rptdataprovider.services.rabbitmq.RabbitMqService import com.fasterxml.jackson.databind.DeserializationFeature import com.fasterxml.jackson.databind.ObjectMapper @@ -68,7 +67,7 @@ abstract class Context( val grpcConfig: GrpcConfiguration, - val cradleService: CradleService = createCradleService(configuration, cradleManager), + val cradleService: CradleService = CradleService(configuration, cradleManager), val rabbitMqService: RabbitMqService, @@ -143,15 +142,5 @@ abstract class Context( ) } } - - @JvmStatic - protected fun createCradleService( - configuration: Configuration, - cradleManager: CradleManager - ) = if (configuration.searchBySessionGroup.value.toBoolean()) { - MessageGroupCradleService(configuration, cradleManager) - } else { - CradleService(configuration, cradleManager) - } } } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/ProtoContext.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/ProtoContext.kt index 0263ffe9..6a1822cb 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/ProtoContext.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/ProtoContext.kt @@ -70,7 +70,7 @@ class ProtoContext( grpcConfig: GrpcConfiguration, - cradleService: CradleService = createCradleService(configuration, cradleManager), + 
cradleService: CradleService = CradleService(configuration, cradleManager), rabbitMqService: RabbitMqService = ProtoRabbitMqService( configuration, diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/TransportContext.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/TransportContext.kt index 7bfc6145..a85b5f64 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/TransportContext.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/TransportContext.kt @@ -70,7 +70,7 @@ class TransportContext( grpcConfig: GrpcConfiguration, - cradleService: CradleService = createCradleService(configuration, cradleManager), + cradleService: CradleService = CradleService(configuration, cradleManager), rabbitMqService: RabbitMqService = TransportRabbitMqService( configuration, diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt index 5be9bd2a..0d2030b4 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt @@ -72,8 +72,6 @@ class CustomConfigurationClass { val useStrictMode = false - val searchBySessionGroup = true - val useTransportMode: Boolean = true val serverType: ServerType = ServerType.HTTP @@ -103,9 +101,6 @@ class Configuration(customConfiguration: CustomConfigurationClass) { val messageCacheSize: Variable = Variable("messageCacheSize", customConfiguration.messageCacheSize.toString(), "1") - val searchBySessionGroup: Variable = - Variable("searchBySessionGroup", customConfiguration.searchBySessionGroup.toString(), "true") - val aliasToGroupCacheSize: Variable = // TODO: added value check Variable("aliasToGroupCacheSize", customConfiguration.aliasToGroupCacheSize.toString(), "1000") diff --git 
/**
 * Identifies a message stream by book and session alias, without a direction.
 * Equality and hashing are based solely on [fullName] ("<book>:<name>").
 */
class CommonStreamName(
    val bookId: BookId,
    val name: String
) {
    internal val fullName = "${bookId.name}:$name"

    override fun toString(): String = fullName

    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (javaClass != other?.javaClass) return false

        other as CommonStreamName

        return fullName == other.fullName
    }

    override fun hashCode(): Int = fullName.hashCode()
}

/**
 * Identifies a message stream by book, session alias and [direction].
 *
 * [bookId] and [name] delegate to the embedded [common] name so that code which
 * only needs the direction-less identity can share one [CommonStreamName] value.
 *
 * FIX: the previous implementation called `super.equals(other)` /
 * `super.hashCode()`, but the superclass is [Any], so `super.equals` is
 * reference equality — `if (!super.equals(other)) return false` made two
 * distinct-but-equal instances ALWAYS unequal (the fullName comparison was
 * unreachable), and mixing the identity hash into `hashCode` broke the
 * equals/hashCode contract. Equality and hashing now use [fullName] only.
 */
class StreamName(
    bookId: BookId,
    name: String,
    val direction: Direction,
) {
    val common = CommonStreamName(bookId, name)
    val bookId: BookId
        get() = common.bookId
    val name: String
        get() = common.name

    // "<book>:<name>:<direction>" — extends the common name with the direction
    internal val fullName = "${common.fullName}:$direction"

    override fun toString(): String = fullName

    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (javaClass != other?.javaClass) return false

        other as StreamName

        return fullName == other.fullName
    }

    override fun hashCode(): Int = fullName.hashCode()
}
timestamp, hasStarted = hasStarted diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/MessageBatch.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/MessageBatch.kt index c0b508cb..694a803f 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/MessageBatch.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/MessageBatch.kt @@ -18,13 +18,14 @@ package com.exactpro.th2.rptdataprovider.entities.responses import com.exactpro.cradle.messages.StoredMessage import com.exactpro.cradle.messages.StoredMessageId +import java.time.Instant data class MessageBatchWrapper( - val batchId: StoredMessageId, + val batchFirstTime: Instant, val trimmedMessages: Collection> ) data class StoredMessageBatchWrapper( - val batchId: StoredMessageId, + val batchFirstTime: Instant, val trimmedMessages: Collection ) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/StreamInfo.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/StreamInfo.kt index 4fc948e9..623790c3 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/StreamInfo.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/responses/StreamInfo.kt @@ -16,16 +16,17 @@ package com.exactpro.th2.rptdataprovider.entities.responses +import com.exactpro.cradle.Direction import com.exactpro.cradle.messages.StoredMessageId import com.exactpro.th2.dataprovider.grpc.Stream import com.exactpro.th2.rptdataprovider.convertToProto import com.exactpro.th2.rptdataprovider.cradleDirectionToGrpc -import com.exactpro.th2.rptdataprovider.entities.internal.StreamName +import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName import com.fasterxml.jackson.annotation.JsonIgnore import io.github.oshai.kotlinlogging.KotlinLogging import java.time.Instant -data class StreamInfo(val streamPointer: StreamName, @JsonIgnore val messageId: StoredMessageId? 
= null) { +data class StreamInfo(val streamPointer: CommonStreamName, @JsonIgnore val messageId: StoredMessageId? = null) { @JsonIgnore private val lastMessage = messageId ?: let { @@ -35,7 +36,7 @@ data class StreamInfo(val streamPointer: StreamName, @JsonIgnore val messageId: StoredMessageId( streamPointer.bookId, streamPointer.name, - streamPointer.direction, + Direction.FIRST, Instant.ofEpochMilli(0), 0 ) @@ -48,16 +49,13 @@ data class StreamInfo(val streamPointer: StreamName, @JsonIgnore val messageId: } fun convertToProto(): Stream { - return Stream.newBuilder() - .setDirection(cradleDirectionToGrpc(streamPointer.direction)) - .setSession(streamPointer.name) - .setLastId( - lastMessage.let { - logger.trace { "stream $streamPointer - lastElement is ${it.sequence}" } - it.convertToProto() - - } - ) - .build() + return lastMessage.run { + logger.trace { "stream $streamPointer - lastElement is $sequence" } + Stream.newBuilder() + .setDirection(cradleDirectionToGrpc(direction)) + .setSession(sessionAlias) + .setLastId(this.convertToProto()) + .build() + } } } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/PipelineComponent.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/PipelineComponent.kt index ad4b28df..9723b630 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/PipelineComponent.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/PipelineComponent.kt @@ -17,19 +17,19 @@ package com.exactpro.th2.rptdataprovider.handlers import com.exactpro.th2.rptdataprovider.Context +import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName import com.exactpro.th2.rptdataprovider.entities.internal.PipelineStepObject -import com.exactpro.th2.rptdataprovider.entities.internal.StreamName import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest +import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.CoroutineScope import 
kotlinx.coroutines.channels.Channel -import io.github.oshai.kotlinlogging.KotlinLogging abstract class PipelineComponent( val context: Context, val searchRequest: SseMessageSearchRequest, val externalScope: CoroutineScope, - val streamName: StreamName? = null, + open val commonStreamName: CommonStreamName? = null, val previousComponent: PipelineComponent? = null, messageFlowCapacity: Int ) { diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt index 9a15bff1..1a3fcd2d 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt @@ -16,7 +16,6 @@ package com.exactpro.th2.rptdataprovider.handlers -import com.exactpro.cradle.Direction import com.exactpro.cradle.TimeRelation import com.exactpro.cradle.TimeRelation.AFTER import com.exactpro.cradle.TimeRelation.BEFORE @@ -30,12 +29,12 @@ import com.exactpro.th2.rptdataprovider.ProtoMessageGroup import com.exactpro.th2.rptdataprovider.ProtoRawMessage import com.exactpro.th2.rptdataprovider.TransportMessageGroup import com.exactpro.th2.rptdataprovider.TransportRawMessage +import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName import com.exactpro.th2.rptdataprovider.entities.internal.EmptyPipelineObject import com.exactpro.th2.rptdataprovider.entities.internal.PipelineFilteredMessage import com.exactpro.th2.rptdataprovider.entities.internal.PipelineKeepAlive import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch import com.exactpro.th2.rptdataprovider.entities.internal.StreamEndObject -import com.exactpro.th2.rptdataprovider.entities.internal.StreamName import com.exactpro.th2.rptdataprovider.entities.mappers.TimeRelationMapper import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest import 
com.exactpro.th2.rptdataprovider.entities.responses.StreamInfo @@ -183,9 +182,7 @@ abstract class SearchMessagesHandler( val pipelineStatus = PipelineStatus() - val streamNames = resultRequest.stream.flatMap { stream -> - Direction.values().map { StreamName(stream, it, request.bookId) } - } + val streamNames = resultRequest.stream.map { stream -> CommonStreamName(request.bookId, stream) } val coroutineScope = CoroutineScope(coroutineContext + Job(coroutineContext[Job])) pipelineStatus.addStreams(streamNames.map { it.toString() }) @@ -222,7 +219,7 @@ abstract class SearchMessagesHandler( } while (listPair.isEmpty()) listPair.first().let { - streamInfoList.add(StreamInfo(messageExtractor.streamName!!, it.first)) + streamInfoList.add(StreamInfo(messageExtractor.commonStreamName, it.first)) } } return streamInfoList diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/ChainBuilder.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/ChainBuilder.kt index 018a8018..71cf62e1 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/ChainBuilder.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/ChainBuilder.kt @@ -16,7 +16,6 @@ package com.exactpro.th2.rptdataprovider.handlers.messages -import com.exactpro.cradle.Direction import com.exactpro.th2.common.grpc.Message import com.exactpro.th2.common.grpc.MessageGroupBatch import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch @@ -26,6 +25,7 @@ import com.exactpro.th2.rptdataprovider.ProtoMessageGroup import com.exactpro.th2.rptdataprovider.ProtoRawMessage import com.exactpro.th2.rptdataprovider.TransportMessageGroup import com.exactpro.th2.rptdataprovider.TransportRawMessage +import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName import com.exactpro.th2.rptdataprovider.entities.internal.StreamName import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest import 
com.exactpro.th2.rptdataprovider.handlers.PipelineStatus @@ -38,9 +38,7 @@ abstract class ChainBuilder( protected val pipelineStatus: PipelineStatus ) { fun buildChain(): StreamMerger { - val streamNames = - request.stream.flatMap { stream -> Direction.values().map { StreamName(stream, it, request.bookId) } } - + val streamNames = request.stream.map { stream -> CommonStreamName(request.bookId, stream) } pipelineStatus.addStreams(streamNames.map { it.toString() }) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt index 9c638690..c16dbff2 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchConverter.kt @@ -30,9 +30,9 @@ import com.exactpro.th2.rptdataprovider.ProtoMessageGroup import com.exactpro.th2.rptdataprovider.ProtoRawMessage import com.exactpro.th2.rptdataprovider.TransportMessageGroup import com.exactpro.th2.rptdataprovider.TransportRawMessage +import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName import com.exactpro.th2.rptdataprovider.entities.internal.PipelineCodecRequest import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch -import com.exactpro.th2.rptdataprovider.entities.internal.StreamName import com.exactpro.th2.rptdataprovider.entities.mappers.ProtoMessageMapper import com.exactpro.th2.rptdataprovider.entities.mappers.TransportMessageMapper import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest @@ -44,15 +44,15 @@ import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchRequest import com.exactpro.th2.rptdataprovider.services.rabbitmq.ProtoCodecBatchRequest import 
com.exactpro.th2.rptdataprovider.services.rabbitmq.TransportCodecBatchRequest +import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.isActive import kotlinx.coroutines.launch -import io.github.oshai.kotlinlogging.KotlinLogging abstract class MessageBatchConverter( context: Context, searchRequest: SseMessageSearchRequest, - streamName: StreamName?, + streamName: CommonStreamName?, externalScope: CoroutineScope, previousComponent: PipelineComponent?, messageFlowCapacity: Int, @@ -84,7 +84,7 @@ abstract class MessageBatchConverter( ) : this( pipelineComponent.context, pipelineComponent.searchRequest, - pipelineComponent.streamName, + pipelineComponent.commonStreamName, pipelineComponent.externalScope, pipelineComponent, messageFlowCapacity, @@ -100,13 +100,13 @@ abstract class MessageBatchConverter( if (pipelineMessage is PipelineRawBatch) { pipelineStatus.convertStart( - streamName.toString(), + commonStreamName.toString(), pipelineMessage.storedBatchWrapper.trimmedMessages.size.toLong() ) val timeStart = System.currentTimeMillis() - logger.trace { "received raw batch (stream=${streamName.toString()} id=${pipelineMessage.storedBatchWrapper.batchId})" } + logger.trace { "received raw batch (stream=${commonStreamName.toString()} first-time=${pipelineMessage.storedBatchWrapper.batchFirstTime})" } val filteredMessages: List> = pipelineMessage.storedBatchWrapper.trimmedMessages.convertAndFilter() @@ -114,7 +114,7 @@ abstract class MessageBatchConverter( pipelineMessage.streamEmpty, pipelineMessage.lastProcessedId, pipelineMessage.lastScannedTime, - MessageBatchWrapper(pipelineMessage.storedBatchWrapper.batchId, filteredMessages.map(MessageHolder::messageWrapper)), + MessageBatchWrapper(pipelineMessage.storedBatchWrapper.batchFirstTime, filteredMessages.map(MessageHolder::messageWrapper)), createCodecBatchRequest(filteredMessages), info = pipelineMessage.info.also { it.startConvert = timeStart @@ -124,24 
+124,24 @@ abstract class MessageBatchConverter( ) pipelineStatus.convertEnd( - streamName.toString(), + commonStreamName.toString(), pipelineMessage.storedBatchWrapper.trimmedMessages.size.toLong() ) if (codecRequest.codecRequest.groupsCount > 0) { sendToChannel(codecRequest) - logger.trace { "converted batch is sent downstream (stream=${streamName.toString()} id=${codecRequest.storedBatchWrapper.batchId} requestHash=${codecRequest.codecRequest.requestHash})" } + logger.trace { "converted batch is sent downstream (stream=${commonStreamName.toString()} first-time=${codecRequest.storedBatchWrapper.batchFirstTime} requestHash=${codecRequest.codecRequest.requestHash})" } } else { - logger.trace { "converted batch is discarded because it has no messages (stream=${streamName.toString()} id=${pipelineMessage.storedBatchWrapper.batchId})" } + logger.trace { "converted batch is discarded because it has no messages (stream=${commonStreamName.toString()} first-time=${pipelineMessage.storedBatchWrapper.batchFirstTime})" } } pipelineStatus.convertSendDownstream( - streamName.toString(), + commonStreamName.toString(), pipelineMessage.storedBatchWrapper.trimmedMessages.size.toLong() ) pipelineStatus.countParsePrepared( - streamName.toString(), + commonStreamName.toString(), filteredMessages.size.toLong() ) @@ -188,7 +188,7 @@ class ProtoMessageBatchConverter( ((included.isNullOrEmpty() || included.contains(protocol)) && (excluded.isNullOrEmpty() || !excluded.contains(protocol))) .also { - logger.trace { "message ${message?.sequence} has protocol $protocol (matchesProtocolFilter=${it}) (stream=${streamName.toString()})" } + logger.trace { "message ${message?.sequence} has protocol $protocol (matchesProtocolFilter=${it}) (stream=${commonStreamName.toString()})" } } }.toList() @@ -198,7 +198,7 @@ class ProtoMessageBatchConverter( .newBuilder() .addAllGroups(filteredMessages.map(MessageHolder::messageGroup)) .build(), - streamName.toString() + commonStreamName.toString() ) } } @@ 
-224,7 +224,7 @@ class TransportMessageBatchConverter( ((included.isNullOrEmpty() || included.contains(protocol)) && (excluded.isNullOrEmpty() || !excluded.contains(protocol))) .also { - logger.trace { "message ${message?.id?.sequence} has protocol $protocol (matchesProtocolFilter=$it) (stream=$streamName)" } + logger.trace { "message ${message?.id?.sequence} has protocol $protocol (matchesProtocolFilter=$it) (stream=$commonStreamName)" } } }.toList() @@ -236,7 +236,7 @@ class TransportMessageBatchConverter( .setSessionGroup(firstMessageWrapper.sessionGroup) .setGroups(filteredMessages.map(MessageHolder::messageGroup)) .build(), - streamName.toString() + commonStreamName.toString() ) } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt index d7813a58..6e4ca4b5 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchDecoder.kt @@ -16,25 +16,25 @@ package com.exactpro.th2.rptdataprovider.handlers.messages import com.exactpro.th2.rptdataprovider.Context +import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName import com.exactpro.th2.rptdataprovider.entities.internal.PipelineCodecRequest import com.exactpro.th2.rptdataprovider.entities.internal.PipelineDecodedBatch import com.exactpro.th2.rptdataprovider.entities.internal.ProtoProtocolInfo.isImage -import com.exactpro.th2.rptdataprovider.entities.internal.StreamName import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest import com.exactpro.th2.rptdataprovider.handlers.PipelineComponent import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus import com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchRequest import 
com.exactpro.th2.rptdataprovider.services.rabbitmq.CodecBatchResponse +import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.isActive import kotlinx.coroutines.launch -import io.github.oshai.kotlinlogging.KotlinLogging class MessageBatchDecoder( context: Context, searchRequest: SseMessageSearchRequest, - streamName: StreamName?, + streamName: CommonStreamName?, externalScope: CoroutineScope, previousComponent: PipelineComponent?, messageFlowCapacity: Int, @@ -55,7 +55,7 @@ class MessageBatchDecoder( ) : this( pipelineComponent.context, pipelineComponent.searchRequest, - pipelineComponent.streamName, + pipelineComponent.commonStreamName, pipelineComponent.externalScope, pipelineComponent, messageFlowCapacity, @@ -95,10 +95,10 @@ class MessageBatchDecoder( ) ) } else { - logger.trace { "received converted batch (stream=${streamName.toString()} id=${pipelineMessage.storedBatchWrapper.batchId} requestHash=${pipelineMessage.codecRequest.requestHash})" } + logger.trace { "received converted batch (stream=${commonStreamName.toString()} first-time=${pipelineMessage.storedBatchWrapper.batchFirstTime} requestHash=${pipelineMessage.codecRequest.requestHash})" } pipelineStatus.decodeStart( - streamName.toString(), + commonStreamName.toString(), pipelineMessage.codecRequest.groupsCount.toLong() ) @@ -111,20 +111,20 @@ class MessageBatchDecoder( protocol ) pipelineStatus.decodeEnd( - streamName.toString(), + commonStreamName.toString(), pipelineMessage.codecRequest.groupsCount.toLong() ) sendToChannel(result) pipelineStatus.decodeSendDownstream( - streamName.toString(), + commonStreamName.toString(), pipelineMessage.codecRequest.groupsCount.toLong() ) - logger.trace { "decoded batch is sent downstream (stream=${streamName.toString()} id=${result.storedBatchWrapper.batchId} requestHash=${pipelineMessage.codecRequest.requestHash})" } + logger.trace { "decoded batch is sent 
downstream (stream=${commonStreamName.toString()} first-time=${result.storedBatchWrapper.batchFirstTime} requestHash=${pipelineMessage.codecRequest.requestHash})" } } pipelineStatus.countParseRequested( - streamName.toString(), + commonStreamName.toString(), pipelineMessage.storedBatchWrapper.trimmedMessages.count().toLong() ) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchUnpacker.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchUnpacker.kt index f32bddc0..439aa825 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchUnpacker.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageBatchUnpacker.kt @@ -40,7 +40,7 @@ import kotlin.time.measureTimedValue abstract class MessageBatchUnpacker( context: Context, searchRequest: SseMessageSearchRequest, - streamName: StreamName?, + streamName: CommonStreamName?, externalScope: CoroutineScope, previousComponent: PipelineComponent?, messageFlowCapacity: Int, @@ -60,7 +60,7 @@ abstract class MessageBatchUnpacker( ) : this( pipelineComponent.context, pipelineComponent.searchRequest, - pipelineComponent.streamName, + pipelineComponent.commonStreamName, pipelineComponent.externalScope, pipelineComponent, messageFlowCapacity, @@ -92,7 +92,7 @@ abstract class MessageBatchUnpacker( val messages = pipelineMessage.storedBatchWrapper.trimmedMessages val errorMessage = """"codec response is null - | (stream=${streamName} + | (stream=${commonStreamName} | firstRequestId=${messages.first().id.sequence} | lastRequestId=${messages.last().id.sequence} | requestSize=${messages.size}) @@ -114,7 +114,7 @@ abstract class MessageBatchUnpacker( if (pipelineMessage is PipelineDecodedBatch<*, *, *, *>) { pipelineStatus.unpackStart( - streamName.toString(), + commonStreamName.toString(), pipelineMessage.storedBatchWrapper.trimmedMessages.size.toLong() ) @@ -127,7 +127,7 @@ abstract class 
MessageBatchUnpacker( } }.also { logger.debug { - "awaited codec response for ${it.duration.toDouble(DurationUnit.MILLISECONDS)}ms (stream=${streamName} firstRequestId=${requests.first().id.sequence} lastRequestId=${requests.last().id.sequence} requestSize=${requests.size} responseSize=${it.value?.groupCount})" + "awaited codec response for ${it.duration.toDouble(DurationUnit.MILLISECONDS)}ms (stream=${commonStreamName} firstRequestId=${requests.first().id.sequence} lastRequestId=${requests.last().id.sequence} requestSize=${requests.size} responseSize=${it.value?.groupCount})" } }.value?.groups @@ -148,24 +148,24 @@ abstract class MessageBatchUnpacker( val messages = pipelineMessage.storedBatchWrapper.trimmedMessages - logger.debug { "codec response unpacking took ${result.duration.toDouble(DurationUnit.MILLISECONDS)}ms (stream=${streamName.toString()} firstId=${messages.first().id.sequence} lastId=${messages.last().id.sequence} messages=${messages.size})" } + logger.debug { "codec response unpacking took ${result.duration.toDouble(DurationUnit.MILLISECONDS)}ms (stream=${commonStreamName.toString()} firstId=${messages.first().id.sequence} lastId=${messages.last().id.sequence} messages=${messages.size})" } pipelineMessage.info.buildMessage = result.duration.toDouble(DurationUnit.MILLISECONDS).toLong() StreamWriter.setBuildMessage(pipelineMessage.info) pipelineStatus.unpackEnd( - streamName.toString(), + commonStreamName.toString(), pipelineMessage.storedBatchWrapper.trimmedMessages.size.toLong() ) result.value.forEach { (sendToChannel(it)) } pipelineStatus.unpackSendDownstream( - streamName.toString(), + commonStreamName.toString(), pipelineMessage.storedBatchWrapper.trimmedMessages.size.toLong() ) - logger.debug { "unpacked responses are sent (stream=${streamName.toString()} firstId=${messages.first().id.sequence} lastId=${messages.last().id.sequence} messages=${result.value.size})" } + logger.debug { "unpacked responses are sent 
(stream=${commonStreamName.toString()} firstId=${messages.first().id.sequence} lastId=${messages.last().id.sequence} messages=${result.value.size})" } } else { sendToChannel(pipelineMessage) @@ -233,7 +233,7 @@ class ProtoMessageBatchUnpacker( throw CodecResponseException( """codec dont parsed all messages - | (stream=${streamName} + | (stream=${commonStreamName} | firstRequestId=${messages.first().id.sequence} | lastRequestId=${messages.last().id.sequence} | notParsedMessagesId=$notParsed @@ -259,10 +259,10 @@ class ProtoMessageBatchUnpacker( let { if (response == null) { - pipelineStatus.countParseReceivedFailed(streamName.toString()) + pipelineStatus.countParseReceivedFailed(commonStreamName.toString()) } - pipelineStatus.countParseReceivedTotal(streamName.toString()) + pipelineStatus.countParseReceivedTotal(commonStreamName.toString()) response?.messagesList?.map { ProtoBodyWrapper(it.message) } }, @@ -321,7 +321,7 @@ class TransportMessageBatchUnpacker( throw CodecResponseException( """codec dont parsed all messages - | (stream=${streamName} + | (stream=${commonStreamName} | firstRequestId=${messages.first().id.sequence} | lastRequestId=${messages.last().id.sequence} | notParsedMessagesId=$notParsed @@ -347,10 +347,10 @@ class TransportMessageBatchUnpacker( let { if (response == null) { - pipelineStatus.countParseReceivedFailed(streamName.toString()) + pipelineStatus.countParseReceivedFailed(commonStreamName.toString()) } - pipelineStatus.countParseReceivedTotal(streamName.toString()) + pipelineStatus.countParseReceivedTotal(commonStreamName.toString()) response?.messages?.map { TransportBodyWrapper((it as ParsedMessage), rawMessageWrapper.book, rawMessageWrapper.sessionAlias) } }, diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt index bc067cf8..1a960798 100644 --- 
a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt @@ -17,15 +17,17 @@ package com.exactpro.th2.rptdataprovider.handlers.messages import com.exactpro.cradle.Order -import com.exactpro.cradle.TimeRelation -import com.exactpro.cradle.messages.MessageFilterBuilder +import com.exactpro.cradle.TimeRelation.AFTER +import com.exactpro.cradle.TimeRelation.BEFORE +import com.exactpro.cradle.messages.GroupedMessageFilterBuilder +import com.exactpro.cradle.messages.StoredGroupedMessageBatch import com.exactpro.cradle.messages.StoredMessage import com.exactpro.cradle.messages.StoredMessageBatch import com.exactpro.cradle.messages.StoredMessageId import com.exactpro.th2.rptdataprovider.Context +import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName import com.exactpro.th2.rptdataprovider.entities.internal.EmptyPipelineObject import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch -import com.exactpro.th2.rptdataprovider.entities.internal.StreamName import com.exactpro.th2.rptdataprovider.entities.internal.StreamPointer import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest import com.exactpro.th2.rptdataprovider.entities.responses.StoredMessageBatchWrapper @@ -47,12 +49,12 @@ import java.time.Instant class MessageExtractor( context: Context, val request: SseMessageSearchRequest, - stream: StreamName, + override val commonStreamName: CommonStreamName, externalScope: CoroutineScope, messageFlowCapacity: Int, private val pipelineStatus: PipelineStatus ) : PipelineComponent( - context, request, externalScope, stream, messageFlowCapacity = messageFlowCapacity + context, request, externalScope, commonStreamName, messageFlowCapacity = messageFlowCapacity ) { companion object { @@ -66,7 +68,7 @@ class MessageExtractor( private var lastElement: StoredMessageId? 
= null private var lastTimestamp: Instant? = null - private val order = if (request.searchDirection == TimeRelation.AFTER) { + private val order = if (request.searchDirection == AFTER) { Order.DIRECT } else { Order.REVERSE @@ -77,7 +79,7 @@ class MessageExtractor( try { processMessage() } catch (e: CancellationException) { - logger.debug { "message extractor for stream $streamName has been stopped" } + logger.debug { "message extractor for stream $commonStreamName has been stopped" } } catch (e: Exception) { logger.error(e) { "unexpected exception" } throw e @@ -85,6 +87,32 @@ class MessageExtractor( } } + private val StoredGroupedMessageBatch.orderedMessages: Collection + get() = if (order == Order.DIRECT) messages else messagesReverse + + private val StoredMessage.isNotCorrectAlias: Boolean + get() = commonStreamName?.let { sessionAlias != it.name } ?: false + + private fun StoredMessage.isNotResume(point: StreamPointer?): Boolean = + point?.let { + point.sequence == sequence && + point.timestamp == timestamp + point.stream.name == sessionAlias && + point.stream.direction == direction + + + } ?: false + + private val StoredMessage.isEarlierStart + get() = request.startTimestamp?.let { + if (order == Order.DIRECT) timestamp.isBefore(it) else timestamp.isAfter(it) + } ?: false + + private val StoredMessage.isLaterEnd + get() = request.endTimestamp?.let { + if (order == Order.DIRECT) timestamp.isAfterOrEqual(it) else timestamp.isBeforeOrEqual(it) + } ?: false + private fun getMessagesFromBatch(batch: StoredMessageBatch): Collection { return if (order == Order.DIRECT) { batch.messages @@ -147,74 +175,117 @@ class MessageExtractor( } } - streamName!! + commonStreamName!! 
- val resumeFromId = request.resumeFromIdsList.firstOrNull { - it.stream.name == streamName.name && it.stream.direction == streamName.direction - } + val resumeFromId = request.resumeFromIdsList.asSequence() + .filter { it.stream.name == commonStreamName.name } + .run { + when(request.searchDirection) { + BEFORE -> maxByOrNull { it.timestamp } + AFTER -> minByOrNull { it.timestamp } + } + } - logger.debug { "acquiring cradle iterator for stream $streamName" } + logger.debug { "acquiring cradle iterator for stream $commonStreamName" } - resumeFromId?.let { logger.debug { "resume sequence for stream $streamName is set to ${it.sequence}" } } - request.startTimestamp?.let { logger.debug { "start timestamp for stream $streamName is set to $it" } } + resumeFromId?.let { logger.debug { "resume sequence for stream $commonStreamName is set to ${it.sequence}" } } + request.startTimestamp?.let { logger.debug { "start timestamp for stream $commonStreamName is set to $it" } } if (resumeFromId == null || resumeFromId.hasStarted) { - val cradleMessageIterable = context.cradleService.getMessagesBatchesSuspend( - MessageFilterBuilder() - .sessionAlias(streamName.name) - .direction(streamName.direction) - .order(order) - .bookId(streamName.bookId) - - // timestamps will be ignored if resumeFromId is present - .also { builder -> - if (resumeFromId != null) { - builder.sequence().let { - if (order == Order.DIRECT) { - it.isGreaterThanOrEqualTo(resumeFromId.sequence) - } else { - it.isLessThanOrEqualTo(resumeFromId.sequence) - } - } - } - // always need to make sure that we send messages within the specified timestamp (in case the resume ID points to the past) - if (order == Order.DIRECT) { - request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) } - request.endTimestamp?.let { builder.timestampTo().isLessThan(it) } - } else { - request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) } - request.endTimestamp?.let { 
builder.timestampFrom().isGreaterThan(it) } - } - }.build() + val sessionGroup = context.cradleService.getSessionGroup( + commonStreamName.bookId, + commonStreamName.name, + request.startTimestamp, + request.endTimestamp, ) + val cradleMessageIterable = context.cradleService.getGroupedMessages( + this, + GroupedMessageFilterBuilder().apply { + groupName( + sessionGroup, + ) + order(order) + + if (order == Order.DIRECT) { + request.startTimestamp?.let { timestampFrom().isGreaterThanOrEqualTo(it) } + request.endTimestamp?.let { timestampTo().isLessThan(it) } + } else { + request.startTimestamp?.let { timestampTo().isLessThanOrEqualTo(it) } + request.endTimestamp?.let { timestampFrom().isGreaterThan(it) } + } + }.build() + ) +// getMessagesBatchesSuspend( +// MessageFilterBuilder() +// .sessionAlias(streamName.name) +// .direction(streamName.direction) +// .order(order) +// .bookId(streamName.bookId) - logger.debug { "cradle iterator has been built for stream $streamName" } - + // timestamps will be ignored if resumeFromId is present +// .also { builder -> +// if (resumeFromId != null) { +// builder.sequence().let { +// if (order == Order.DIRECT) { +// it.isGreaterThanOrEqualTo(resumeFromId.sequence) +// } else { +// it.isLessThanOrEqualTo(resumeFromId.sequence) +// } +// } +// } +// // always need to make sure that we send messages within the specified timestamp (in case the resume ID points to the past) +// if (order == Order.DIRECT) { +// request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) } +// request.endTimestamp?.let { builder.timestampTo().isLessThan(it) } +// } else { +// request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) } +// request.endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) } +// } +// }.build() +// ) + + logger.debug { "cradle iterator has been built for session group: $sessionGroup, alias: $commonStreamName" } + + var firstFound = false + var lastFound = false var 
isLastMessageTrimmed = false - for (batch in cradleMessageIterable) { - if (externalScope.isActive && !isLastMessageTrimmed) { + for (batch: StoredGroupedMessageBatch in cradleMessageIterable) { + if (externalScope.isActive && !lastFound) { val timeStart = System.currentTimeMillis() - pipelineStatus.fetchedStart(streamName.toString()) - logger.trace { "batch ${batch.id.sequence} of stream $streamName with ${batch.messageCount} messages (${batch.batchSize} bytes) has been extracted" } + pipelineStatus.fetchedStart(commonStreamName.toString()) + logger.trace { "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName with ${batch.messageCount} messages (${batch.batchSize} bytes) has been extracted" } - val trimmedMessages = trimMessagesListHead(getMessagesFromBatch(batch), resumeFromId) - .dropLastWhile { message -> - trimMessagesListTail(message).also { - isLastMessageTrimmed = isLastMessageTrimmed || it + val trimmedMessages = ArrayList(batch.messageCount) + batch.orderedMessages.forEach { msg -> + if (msg.isNotCorrectAlias) { return@forEach } + + if (!firstFound) { + if (msg.isEarlierStart) { return@forEach } + if (msg.isNotResume(resumeFromId)) { return@forEach } + firstFound = true + } + + if(!lastFound) { + if (msg.isLaterEnd) { + lastFound = true + return@forEach } } + trimmedMessages.add(msg) + } + val firstMessage = if (order == Order.DIRECT) batch.messages.first() else batch.messages.last() val lastMessage = if (order == Order.DIRECT) batch.messages.last() else batch.messages.first() logger.trace { - "batch ${batch.id.sequence} of stream $streamName has been trimmed (targetStartTimestamp=${request.startTimestamp} targetEndTimestamp=${request.endTimestamp} targetId=${resumeFromId?.sequence}) - ${trimmedMessages.size} of ${batch.messages.size} messages left (firstId=${firstMessage.id.sequence} firstTimestamp=${firstMessage.timestamp} lastId=${lastMessage.id.sequence} lastTimestamp=${lastMessage.timestamp})" + "batch 
${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName has been trimmed (targetStartTimestamp=${request.startTimestamp} targetEndTimestamp=${request.endTimestamp} targetId=${resumeFromId?.sequence}) - ${trimmedMessages.size} of ${batch.messageCount} messages left (firstId=${firstMessage.id.sequence} firstTimestamp=${firstMessage.timestamp} lastId=${lastMessage.id.sequence} lastTimestamp=${lastMessage.timestamp})" } - pipelineStatus.fetchedEnd(streamName.toString()) + pipelineStatus.fetchedEnd(commonStreamName.toString()) try { trimmedMessages.last().let { message -> @@ -222,7 +293,7 @@ class MessageExtractor( false, message.id, message.timestamp, - StoredMessageBatchWrapper(batch.id, trimmedMessages) + StoredMessageBatchWrapper(batch.firstTimestamp, trimmedMessages) ).also { it.info.startExtract = timeStart it.info.endExtract = System.currentTimeMillis() @@ -231,23 +302,23 @@ class MessageExtractor( lastElement = message.id lastTimestamp = message.timestamp - logger.trace { "batch ${batch.id.sequence} of stream $streamName has been sent downstream" } + logger.trace { "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName has been sent downstream" } } } catch (e: NoSuchElementException) { - logger.trace { "skipping batch ${batch.id.sequence} of stream $streamName - no messages left after trimming" } - pipelineStatus.countSkippedBatches(streamName.toString()) + logger.trace { "skipping batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName - no messages left after trimming" } + pipelineStatus.countSkippedBatches(commonStreamName.toString()) } - pipelineStatus.fetchedSendDownstream(streamName.toString()) + pipelineStatus.fetchedSendDownstream(commonStreamName.toString()) - pipelineStatus.countFetchedBytes(streamName.toString(), batch.batchSize.toLong()) - pipelineStatus.countFetchedBatches(streamName.toString()) - pipelineStatus.countFetchedMessages(streamName.toString(), 
trimmedMessages.size.toLong()) + pipelineStatus.countFetchedBytes(commonStreamName.toString(), batch.batchSize.toLong()) + pipelineStatus.countFetchedBatches(commonStreamName.toString()) + pipelineStatus.countFetchedMessages(commonStreamName.toString(), trimmedMessages.size.toLong()) pipelineStatus.countSkippedMessages( - streamName.toString(), batch.messageCount - trimmedMessages.size.toLong() + commonStreamName.toString(), batch.messageCount - trimmedMessages.size.toLong() ) } else { - logger.debug { "Exiting $streamName loop. External scope active: '${externalScope.isActive}', LastMessageTrimmed: '$isLastMessageTrimmed'" } + logger.debug { "Exiting $commonStreamName loop. External scope active: '${externalScope.isActive}', LastMessageTrimmed: '$isLastMessageTrimmed'" } break } } @@ -260,7 +331,7 @@ class MessageExtractor( Instant.ofEpochMilli(Long.MIN_VALUE) } - logger.debug { "no more data for stream $streamName (lastId=${lastElement.toString()} lastTimestamp=${lastTimestamp})" } + logger.debug { "no more data for stream $commonStreamName (lastId=${lastElement.toString()} lastTimestamp=${lastTimestamp})" } } } } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageFilter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageFilter.kt index f08521a5..94364c7e 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageFilter.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageFilter.kt @@ -25,7 +25,7 @@ import kotlinx.coroutines.* class MessageFilter( context: Context, searchRequest: SseMessageSearchRequest, - streamName: StreamName?, + streamName: CommonStreamName?, externalScope: CoroutineScope, previousComponent: PipelineComponent?, messageFlowCapacity: Int @@ -49,7 +49,7 @@ class MessageFilter( ) : this( pipelineComponent.context, pipelineComponent.searchRequest, - pipelineComponent.streamName, + pipelineComponent.commonStreamName, 
pipelineComponent.externalScope, pipelineComponent, messageFlowCapacity diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/helpers/StreamHolder.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/helpers/StreamHolder.kt index 793becd0..075fa759 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/helpers/StreamHolder.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/helpers/StreamHolder.kt @@ -35,7 +35,7 @@ class StreamHolder(val messageStream: PipelineComponent<*, *, *, *>) { .register() } - private val streamName = messageStream.streamName?.toString() + private val streamName = messageStream.commonStreamName?.toString() private val labelMetric = pullFromStream.labels(streamName) var currentElement: PipelineStepObject? = null @@ -53,7 +53,7 @@ class StreamHolder(val messageStream: PipelineComponent<*, *, *, *>) { if (previousElement == null && currentElement == null) { currentElement = it } else { - throw InvalidInitializationException("StreamHolder ${messageStream.streamName} already initialized") + throw InvalidInitializationException("StreamHolder ${messageStream.commonStreamName} already initialized") } } } @@ -68,7 +68,7 @@ class StreamHolder(val messageStream: PipelineComponent<*, *, *, *>) { previousElement = currentElement currentElement = newElement } - ?: throw InvalidInitializationException("StreamHolder ${messageStream.streamName} need initialization") + ?: throw InvalidInitializationException("StreamHolder ${messageStream.commonStreamName} need initialization") } }.let { labelMetric.observe(it.duration.toDouble(DurationUnit.SECONDS)) @@ -86,7 +86,7 @@ class StreamHolder(val messageStream: PipelineComponent<*, *, *, *>) { while (isNeedSearchResumeId(sequenceFilter)) { pop() } - val streamName = checkNotNull(messageStream.streamName) { "stream name is null" } + val streamName = checkNotNull(messageStream.commonStreamName) { "stream name is null" } val 
stepObject = currentElement return if (stepObject != null && stepObject.streamEmpty) { StreamInfo(streamName, null) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt index 671bb51a..3394d33d 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/CradleService.kt @@ -20,8 +20,16 @@ package com.exactpro.th2.rptdataprovider.services.cradle import com.exactpro.cradle.BookId import com.exactpro.cradle.CradleManager import com.exactpro.cradle.CradleStorage +import com.exactpro.cradle.Direction +import com.exactpro.cradle.Direction.FIRST +import com.exactpro.cradle.Direction.SECOND +import com.exactpro.cradle.FrameType +import com.exactpro.cradle.PageInfo import com.exactpro.cradle.cassandra.CassandraStorageSettings +import com.exactpro.cradle.counters.Interval +import com.exactpro.cradle.messages.GroupedMessageFilter import com.exactpro.cradle.messages.MessageFilter +import com.exactpro.cradle.messages.StoredGroupedMessageBatch import com.exactpro.cradle.messages.StoredMessage import com.exactpro.cradle.messages.StoredMessageBatch import com.exactpro.cradle.messages.StoredMessageId @@ -33,6 +41,7 @@ import com.exactpro.th2.rptdataprovider.convertToString import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration import com.exactpro.th2.rptdataprovider.logMetrics import com.exactpro.th2.rptdataprovider.logTime +import com.exactpro.th2.rptdataprovider.toGroupedMessageFilter import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.channels.Channel @@ -40,26 +49,107 @@ import kotlinx.coroutines.future.await import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import io.github.oshai.kotlinlogging.KotlinLogging +import org.ehcache.Cache +import 
org.ehcache.config.builders.CacheConfigurationBuilder +import org.ehcache.config.builders.CacheManagerBuilder +import org.ehcache.config.builders.ResourcePoolsBuilder +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors -open class CradleService(configuration: Configuration, cradleManager: CradleManager) { +class CradleService(configuration: Configuration, cradleManager: CradleManager) { companion object { private val K_LOGGER = KotlinLogging.logger {} + private val CACHE_MANAGER = CacheManagerBuilder.newCacheManagerBuilder().build(true) + private val GET_MAP_ALIAS_TO_GROUP_ASYNC_METRIC: Metrics = + Metrics("map_session_alias_to_group_async", "findPage;getSessionGroups;getGroupedMessageBatches") + private val CRADLE_MIN_TIMESTAMP: Instant = Instant.ofEpochMilli(0L) + // FIXME: minusMillis(FrameType.values().asSequence().map(FrameType::getMillisInFrame).maxOf(Long::toLong)) is used as workaround for `long overflow` problem in CradleStorage.getMessageCountersAsync method. 
+ // CradleStorage.getMessageCountersAsync add FrameType.getMillisInFrame to get the last next lest interval + // Cradle should support null value for from / to Interval properties + private val CRADLE_MAX_TIMESTAMP: Instant = Instant.ofEpochMilli(Long.MAX_VALUE).minusMillis( + FrameType.values().asSequence().map(FrameType::getMillisInFrame).maxOf(Long::toLong) + ) + private val GET_MESSAGES_BATCHES: Metrics = Metrics("get_messages_batches_async", "getMessagesBatchesAsync") private val GET_MESSAGE_ASYNC_METRIC: Metrics = Metrics("get_message_async", "getMessageAsync") private val GET_TEST_EVENTS_ASYNC_METRIC: Metrics = Metrics("get_test_events_async", "getTestEventsAsync") private val GET_TEST_EVENT_ASYNC_METRIC: Metrics = Metrics("get_test_event_async", "getTestEventAsync") private val GET_STREAMS_METRIC: Metrics = Metrics("get_streams", "getStreams") + + private fun createInterval(from: Instant?, to: Instant?): Interval { + return Interval( + if (from == null || from.isBefore(CRADLE_MIN_TIMESTAMP)) CRADLE_MIN_TIMESTAMP else from, + if (to == null || to.isAfter(CRADLE_MAX_TIMESTAMP)) CRADLE_MAX_TIMESTAMP else to, + ) + } + + // source interval changes to exclude intersection to next or previous page + private fun PageInfo.toInterval(): Interval = Interval( + started?.plusNanos(1) ?: CRADLE_MIN_TIMESTAMP, + ended?.minusNanos(1) ?: CRADLE_MAX_TIMESTAMP + ) + + private fun Interval.print(): String = "[$start, $end]" + private fun max(instantA: Instant, instantB: Instant):Instant = if (instantA.isAfter(instantB)) { + instantA + } else { + instantB + } + + private fun min(instantA: Instant, instantB: Instant):Instant = if (instantA.isBefore(instantB)) { + instantA + } else { + instantB + } + + private fun intersection(intervalA: Interval, intervalB: Interval): Interval { + require(!intervalA.start.isAfter(intervalB.end) + && !intervalB.start.isAfter(intervalA.end)) { + "${intervalA.print()}, ${intervalB.print()} intervals aren't intersection" + } + return Interval( + 
max(intervalA.start, intervalB.start), + min(intervalA.end, intervalB.end) + ) + } } + private val bookToCache = ConcurrentHashMap>() + + private val aliasToGroupCacheSize = configuration.aliasToGroupCacheSize.value.toLong() + private val cradleDispatcherPoolSize = configuration.cradleDispatcherPoolSize.value.toInt() - protected val storage: CradleStorage = cradleManager.storage + private val storage: CradleStorage = cradleManager.storage private val cradleDispatcher = Executors.newFixedThreadPool(cradleDispatcherPoolSize).asCoroutineDispatcher() + suspend fun getGroupedMessages( + scope: CoroutineScope, + filter: GroupedMessageFilter, + ): Channel { + val channel = Channel(1) + scope.launch { + withContext(cradleDispatcher) { + storage.getGroupedMessageBatches(filter).forEach { batch -> + K_LOGGER.trace { + "message batch has been received from the iterator, " + + "group: ${batch.group}, start: ${batch.firstTimestamp}, end: ${batch.lastTimestamp}" + } + channel.send(batch) + K_LOGGER.trace { + "message batch has been sent to the channel, " + + "group: ${batch.group}, start: ${batch.firstTimestamp}, end: ${batch.lastTimestamp}" + } + } + } + } + return channel + } + // FIXME: // It is not correct to create scope manually inside the suspend function // If the function is going to launch extra coroutines it should accept a coroutine scope. 
@@ -179,11 +269,205 @@ open class CradleService(configuration: Configuration, cradleManager: CradleMana } ?: emptyList() } - protected open suspend fun getMessageBatches( + private suspend fun getMessageBatches( + filter: MessageFilter + ): Sequence = + getSessionGroup(filter)?.let { group -> + val groupedMessageFilter = filter.toGroupedMessageFilter(group).also { + K_LOGGER.debug { "Start searching group batches by $it" } + } + storage.getGroupedMessageBatchesAsync(groupedMessageFilter).await().asSequence() + .mapNotNull { batch -> + val messages = batch.messages.filter { message -> + filter.sessionAlias == message.sessionAlias + && filter.direction == message.direction + && filter.timestampFrom?.check(message.timestamp) ?: true + && filter.timestampTo?.check(message.timestamp) ?: true + } + if (messages.isEmpty()) { + null + } else { + StoredMessageBatch( + messages, + storage.findPage(batch.bookId, batch.recDate).id, + batch.recDate + ) + } + } + + } ?: emptySequence() + + private suspend fun getMessageBatches( + id: StoredMessageId + ): StoredMessage? = getSessionGroup(id)?.run { + storage.getGroupedMessageBatches(GroupedMessageFilter.builder().apply { + bookId(id.bookId) + timestampFrom().isGreaterThanOrEqualTo(id.timestamp) + timestampTo().isLessThanOrEqualTo(id.timestamp) + groupName(this@run) + }.build()).asSequence() + .flatMap(StoredGroupedMessageBatch::getMessages) + .filter { message -> id == message.id } + .firstOrNull() + } + + suspend fun getSessionGroup( + bookId: BookId, + sessionAlias: String, + from: Instant?, + to: Instant? + ): String? 
= withContext(cradleDispatcher) { + val cache = bookToCache.computeIfAbsent(bookId.name) { + CACHE_MANAGER.createCache( + "aliasToGroup(${bookId.name})", + CacheConfigurationBuilder.newCacheConfigurationBuilder( + String::class.java, + String::class.java, + ResourcePoolsBuilder.heap(aliasToGroupCacheSize) + ).build() + ) + } + cache.get(sessionAlias) ?: searchSessionGroup(bookId, from, to, sessionAlias, cache) + } + + private suspend fun searchSessionGroup( + bookId: BookId, + from: Instant?, + to: Instant?, + sessionAlias: String, + cache: Cache + ): String? = logMetrics(GET_MAP_ALIAS_TO_GROUP_ASYNC_METRIC) { + logTime("getSessionGroup (book=${bookId.name}, from=${from}, to=${to}, session alias=${sessionAlias})") { + val interval = createInterval(from, to) + + val sessionGroup: String? = eachSessionGroupByStatistics(sessionAlias, interval, bookId, cache) + ?: eachSessionGroupByPage(sessionAlias, interval, bookId, cache) + + if (sessionGroup == null) { + cache.get(sessionAlias)?.let { group -> + K_LOGGER.debug { "Another coroutine has found '$sessionAlias' session alias to '$group' group pair" } + return@logTime group + } ?: error("Mapping between a session group and the '${sessionAlias}' alias isn't found, book: ${bookId.name}, [from: $from, to: $to]") + } else { + return@logTime sessionGroup + } + } + } + + private suspend fun eachSessionGroupByPage( + sessionAlias: String, + interval: Interval, + bookId: BookId, + cache: Cache + ): String? 
{ + K_LOGGER.debug { "Start searching '$sessionAlias' session alias in cradle in [${interval.start}, ${interval.end}] interval by page" } + val pageInterval = storage.getPagesAsync(bookId, interval).await().asSequence() + .map { pageInfo -> pageInfo.toInterval() } + .filter { pageInterval -> + storage.getSessionAliases(bookId, pageInterval).asSequence() + .any { alias -> alias == sessionAlias } + }.firstOrNull() + + cache.get(sessionAlias)?.let { group -> + K_LOGGER.debug { "Another coroutine has found '$sessionAlias' session alias to '$group' group pair" } + return group + } + + if (pageInterval == null) { + K_LOGGER.info { "'$sessionAlias' session alias isn't in [${interval.start}, ${interval.end}] interval" } + return null + } + + val targetInterval = intersection(interval, pageInterval) + + return searchSessionGroupByGroupedMessage(sessionAlias, targetInterval, bookId, cache).also { + if (it == null) { + K_LOGGER.warn { "Mapping between a session group and the '${sessionAlias}' alias isn't found by page, book: ${bookId.name}, interval: [${interval.start}, ${interval.end}] " } + } + } + } + + private suspend fun eachSessionGroupByStatistics( + sessionAlias: String, + interval: Interval, + bookId: BookId, + cache: Cache + ): String? 
{ + K_LOGGER.debug { "Start searching '$sessionAlias' session alias counters in cradle in [${interval.start}, ${interval.end}] interval, ${FrameType.TYPE_100MS} frame type" } + val counter = getCounterSample(bookId, sessionAlias, FIRST, interval) + ?: getCounterSample(bookId, sessionAlias, SECOND, interval) + + cache.get(sessionAlias)?.let { group -> + K_LOGGER.debug { "Another coroutine has found '$sessionAlias' session alias to '$group' group pair" } + return group + } + + if (counter == null) { + K_LOGGER.info { "'$sessionAlias' session alias isn't in [${interval.start}, ${interval.end}] interval" } + return null + } + + val shortInterval = Interval(counter.frameStart, counter.frameStart.plusMillis(FrameType.TYPE_100MS.millisInFrame)) + return searchSessionGroupByGroupedMessage(sessionAlias, shortInterval, bookId, cache).also { + if (it == null) { + K_LOGGER.warn { "Mapping between a session group and the '${sessionAlias}' alias isn't found by statistics, book: ${bookId.name}, interval: [${interval.start}, ${interval.end}] " } + } + } + } + + private suspend fun getCounterSample( + bookId: BookId, + sessionAlias: String, + direction: Direction, + interval: Interval, + ) = storage.getMessageCountersAsync(bookId, sessionAlias, direction, FrameType.TYPE_100MS, interval) + .await().asSequence().firstOrNull() + + private suspend fun searchSessionGroupByGroupedMessage( + sessionAlias: String, + shortInterval: Interval, + bookId: BookId, + cache: Cache + ): String? 
{ + K_LOGGER.debug { "Start searching session group by '$sessionAlias' alias in cradle in (${shortInterval.start}, ${shortInterval.end}) interval" } + storage.getSessionGroupsAsync(bookId, shortInterval).await().asSequence() + .flatMap { group -> + storage.getGroupedMessageBatches( + GroupedMessageFilter.builder() + .bookId(bookId) + .timestampFrom().isGreaterThanOrEqualTo(shortInterval.start) + .timestampTo().isLessThanOrEqualTo(shortInterval.end) + .groupName(group) + .build() + ).asSequence() + }.forEach searchInBatch@{ batch -> + cache.get(sessionAlias)?.let { group -> + K_LOGGER.debug { "Another coroutine has found '$sessionAlias' session alias to '$group' group pair" } + return group + } + K_LOGGER.debug { "Search session group by '$sessionAlias' alias in grouped batch" } + batch.messages.forEach { message -> + cache.putIfAbsent(message.sessionAlias, batch.group) ?: run { + K_LOGGER.info { "Put '${message.sessionAlias}' session alias to '${batch.group}' group to cache" } + if (sessionAlias == message.sessionAlias) { + K_LOGGER.debug { "Found '${message.sessionAlias}' session alias to '${batch.group}' group pair" } + } + } + } + } + return null + } + + private suspend fun getSessionGroup( filter: MessageFilter - ): Sequence = storage.getMessageBatchesAsync(filter).await().asSequence() + ): String? = getSessionGroup( + filter.bookId, + filter.sessionAlias, + filter.timestampFrom?.value, + filter.timestampTo?.value + ) - protected open suspend fun getMessageBatches( + private suspend fun getSessionGroup( id: StoredMessageId - ): StoredMessage? = storage.getMessageAsync(id).await() + ): String? 
= getSessionGroup(id.bookId, id.sessionAlias, id.timestamp, id.timestamp) } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/MessageGroupCradleService.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/MessageGroupCradleService.kt deleted file mode 100644 index 0c79ba5c..00000000 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/services/cradle/MessageGroupCradleService.kt +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Copyright 2024 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package com.exactpro.th2.rptdataprovider.services.cradle - -import com.exactpro.cradle.BookId -import com.exactpro.cradle.CradleManager -import com.exactpro.cradle.Direction -import com.exactpro.cradle.FrameType -import com.exactpro.cradle.PageInfo -import com.exactpro.cradle.counters.Interval -import com.exactpro.cradle.messages.GroupedMessageFilter -import com.exactpro.cradle.messages.MessageFilter -import com.exactpro.cradle.messages.StoredGroupedMessageBatch -import com.exactpro.cradle.messages.StoredMessage -import com.exactpro.cradle.messages.StoredMessageBatch -import com.exactpro.cradle.messages.StoredMessageId -import com.exactpro.th2.rptdataprovider.Metrics -import com.exactpro.th2.rptdataprovider.entities.configuration.Configuration -import com.exactpro.th2.rptdataprovider.logMetrics -import com.exactpro.th2.rptdataprovider.logTime -import com.exactpro.th2.rptdataprovider.toGroupedMessageFilter -import kotlinx.coroutines.future.await -import io.github.oshai.kotlinlogging.KotlinLogging -import org.ehcache.Cache -import org.ehcache.config.builders.CacheConfigurationBuilder -import org.ehcache.config.builders.CacheManagerBuilder -import org.ehcache.config.builders.ResourcePoolsBuilder -import java.time.Instant -import java.util.concurrent.ConcurrentHashMap - -class MessageGroupCradleService( - configuration: Configuration, - cradleManager: CradleManager -): CradleService( - configuration, - cradleManager, -) { - - companion object { - private val K_LOGGER = KotlinLogging.logger {} - - private val CACHE_MANAGER = CacheManagerBuilder.newCacheManagerBuilder().build(true) - - private val GET_MAP_ALIAS_TO_GROUP_ASYNC_METRIC: Metrics = - Metrics("map_session_aslias_to_group_async", "findPage;getSessionGroups;getGroupedMessageBatches") - - private fun createInterval(from: Instant?, to: Instant?): Interval { - return Interval( - if (from == null || from.isBefore(CRADLE_MIN_TIMESTAMP)) CRADLE_MIN_TIMESTAMP else from, - if (to == null || 
to.isAfter(CRADLE_MAX_TIMESTAMP)) CRADLE_MAX_TIMESTAMP else to, - ) - } - - private val CRADLE_MIN_TIMESTAMP: Instant = Instant.ofEpochMilli(0L) - // FIXME: minusMillis(FrameType.values().asSequence().map(FrameType::getMillisInFrame).maxOf(Long::toLong)) is used as workaround for `long overflow` problem in CradleStorage.getMessageCountersAsync method. - // CradleStorage.getMessageCountersAsync add FrameType.getMillisInFrame to get the last next lest interval - // Cradle should support null value for from / to Interval properties - private val CRADLE_MAX_TIMESTAMP: Instant = Instant.ofEpochMilli(Long.MAX_VALUE).minusMillis(FrameType.values().asSequence().map(FrameType::getMillisInFrame).maxOf(Long::toLong)) - - // source interval changes to exclude intersection to next or previous page - private fun PageInfo.toInterval(): Interval = Interval( - started?.plusNanos(1) ?: CRADLE_MIN_TIMESTAMP, - ended?.minusNanos(1) ?: CRADLE_MAX_TIMESTAMP - ) - - private fun Interval.print(): String = "[$start, $end]" - private fun max(instantA: Instant, instantB: Instant):Instant = if (instantA.isAfter(instantB)) { - instantA - } else { - instantB - } - - private fun min(instantA: Instant, instantB: Instant):Instant = if (instantA.isBefore(instantB)) { - instantA - } else { - instantB - } - - private fun intersection(intervalA: Interval, intervalB: Interval): Interval { - require(!intervalA.start.isAfter(intervalB.end) - && !intervalB.start.isAfter(intervalA.end)) { - "${intervalA.print()}, ${intervalB.print()} intervals aren't intersection" - } - return Interval(max(intervalA.start, intervalB.start), min(intervalA.end, intervalB.end)) - } - } - - private val aliasToGroupCacheSize = configuration.aliasToGroupCacheSize.value.toLong() - - private val bookToCache = ConcurrentHashMap>() - - override suspend fun getMessageBatches( - filter: MessageFilter - ): Sequence = - getSessionGroupSuspend(filter)?.let { group -> - val groupedMessageFilter = filter.toGroupedMessageFilter(group).also 
{ - K_LOGGER.debug { "Start searching group batches by $it" } - } - storage.getGroupedMessageBatchesAsync(groupedMessageFilter).await().asSequence() - .mapNotNull { batch -> - val messages = batch.messages.filter { message -> - filter.sessionAlias == message.sessionAlias - && filter.direction == message.direction - && filter.timestampFrom?.check(message.timestamp) ?: true - && filter.timestampTo?.check(message.timestamp) ?: true - } - if (messages.isEmpty()) { - null - } else { - StoredMessageBatch( - messages, - storage.findPage(batch.bookId, batch.recDate).id, - batch.recDate - ) - } - } - - } ?: emptySequence() - - override suspend fun getMessageBatches( - id: StoredMessageId - ): StoredMessage? = getSessionGroupSuspend(id)?.run { - storage.getGroupedMessageBatches(GroupedMessageFilter.builder().apply { - bookId(id.bookId) - timestampFrom().isGreaterThanOrEqualTo(id.timestamp) - timestampTo().isLessThanOrEqualTo(id.timestamp) - groupName(this@run) - }.build()).asSequence() - .flatMap(StoredGroupedMessageBatch::getMessages) - .filter { message -> id == message.id } - .firstOrNull() - } - - private suspend fun getSessionGroupSuspend( - bookId: BookId, - from: Instant?, - to: Instant?, - sessionAlias: String, - direction: Direction - ): String? { - val cache = bookToCache.computeIfAbsent(bookId.name) { - CACHE_MANAGER.createCache( - "aliasToGroup(${bookId.name})", - CacheConfigurationBuilder.newCacheConfigurationBuilder( - String::class.java, - String::class.java, - ResourcePoolsBuilder.heap(aliasToGroupCacheSize) - ).build() - ) - } - return cache.get(sessionAlias) ?: searchSessionGroup(bookId, from, to, sessionAlias, direction, cache) - } - - private suspend fun searchSessionGroup( - bookId: BookId, - from: Instant?, - to: Instant?, - sessionAlias: String, - direction: Direction, - cache: Cache - ): String? 
= logMetrics(GET_MAP_ALIAS_TO_GROUP_ASYNC_METRIC) { - logTime("getSessionGroup (book=${bookId.name}, from=${from}, to=${to}, session alias=${sessionAlias})") { - val interval = createInterval(from, to) - - val sessionGroup: String? = seachSessionGorupByStatistics(sessionAlias, interval, bookId, direction, cache) - ?: seachSessionGorupByPage(sessionAlias, interval, bookId, cache) - - if (sessionGroup == null) { - cache.get(sessionAlias)?.let { group -> - K_LOGGER.debug { "Another coroutine has dound '$sessionAlias' session alias to '$group' group pair" } - return@logTime group - } ?: error("Mapping between a session group and the '${sessionAlias}' alias isn't found, book: ${bookId.name}, [from: $from, to: $to]") - } else { - return@logTime sessionGroup - } - } - } - - private suspend fun seachSessionGorupByStatistics( - sessionAlias: String, - interval: Interval, - bookId: BookId, - direction: Direction, - cache: Cache - ): String? { - K_LOGGER.debug { "Strat searching '$sessionAlias' session alias counters in cradle in [${interval.start}, ${interval.end}] interval, ${FrameType.TYPE_100MS} frame type" } - val counter = storage.getMessageCountersAsync(bookId, sessionAlias, direction, FrameType.TYPE_100MS, interval) - .await().asSequence().firstOrNull() - - cache.get(sessionAlias)?.let { group -> - K_LOGGER.debug { "Another coroutine has dound '$sessionAlias' session alias to '$group' group pair" } - return group - } - - if (counter == null) { - K_LOGGER.info { "'$sessionAlias' session alias isn't in [${interval.start}, ${interval.end}] interval" } - return null - } - - val shortInterval = Interval(counter.frameStart, counter.frameStart.plusMillis(FrameType.TYPE_100MS.millisInFrame)) - return seachSessionGorupByGroupedMessage(sessionAlias, shortInterval, bookId, cache).also { - if (it == null) { - K_LOGGER.warn { "Mapping between a session group and the '${sessionAlias}' alias isn't found by statistics, book: ${bookId.name}, interval: [${interval.start}, 
${interval.end}] " } - } - } - } - - private suspend fun seachSessionGorupByPage( - sessionAlias: String, - interval: Interval, - bookId: BookId, - cache: Cache - ): String? { - K_LOGGER.debug { "Strat searching '$sessionAlias' session alias in cradle in [${interval.start}, ${interval.end}] interval by page" } - val pageInterval = storage.getPagesAsync(bookId, interval).await().asSequence() - .map { pageInfo -> pageInfo.toInterval() } - .filter { pageInterval -> - storage.getSessionAliases(bookId, pageInterval).asSequence() - .any { alias -> alias == sessionAlias } - }.firstOrNull() - - cache.get(sessionAlias)?.let { group -> - K_LOGGER.debug { "Another coroutine has dound '$sessionAlias' session alias to '$group' group pair" } - return group - } - - if (pageInterval == null) { - K_LOGGER.info { "'$sessionAlias' session alias isn't in [${interval.start}, ${interval.end}] interval" } - return null - } - - val targetInterval = intersection(interval, pageInterval) - - return seachSessionGorupByGroupedMessage(sessionAlias, targetInterval, bookId, cache).also { - if (it == null) { - K_LOGGER.warn { "Mapping between a session group and the '${sessionAlias}' alias isn't found by page, book: ${bookId.name}, interval: [${interval.start}, ${interval.end}] " } - } - } - } - - private suspend fun seachSessionGorupByGroupedMessage( - sessionAlias: String, - shortInterval: Interval, - bookId: BookId, - cache: Cache - ): String? 
{ - K_LOGGER.debug { "Strat searching session group by '$sessionAlias' alias in cradle in (${shortInterval.start}, ${shortInterval.end}) interval" } - storage.getSessionGroupsAsync(bookId, shortInterval).await().asSequence() - .flatMap { group -> - storage.getGroupedMessageBatches( - GroupedMessageFilter.builder() - .bookId(bookId) - .timestampFrom().isGreaterThanOrEqualTo(shortInterval.start) - .timestampTo().isLessThanOrEqualTo(shortInterval.end) - .groupName(group) - .build() - ).asSequence() - }.forEach searchInBatch@{ batch -> - cache.get(sessionAlias)?.let { group -> - K_LOGGER.debug { "Another coroutine has dound '$sessionAlias' session alias to '$group' group pair" } - return group - } - K_LOGGER.debug { "Search session group by '$sessionAlias' alias in grouped batch" } - batch.messages.forEach { message -> - cache.putIfAbsent(message.sessionAlias, batch.group) ?: run { - K_LOGGER.info { "Put '${message.sessionAlias}' session alias to '${batch.group}' group to cache" } - if (sessionAlias == message.sessionAlias) { - K_LOGGER.debug { "Found '${message.sessionAlias}' session alias to '${batch.group}' group pair" } - } - } - } - } - return null - } - - private suspend fun getSessionGroupSuspend( - filter: MessageFilter - ): String? = getSessionGroupSuspend( - filter.bookId, - filter.timestampFrom?.value, - filter.timestampTo?.value, - filter.sessionAlias, - filter.direction - ) - - private suspend fun getSessionGroupSuspend( - id: StoredMessageId - ): String? 
= getSessionGroupSuspend(id.bookId, id.timestamp, id.timestamp, id.sessionAlias, id.direction) -} diff --git a/src/test/kotlin/handlers/messages/ExtractorTest.kt b/src/test/kotlin/handlers/messages/ExtractorTest.kt index 1e48301a..509eb811 100644 --- a/src/test/kotlin/handlers/messages/ExtractorTest.kt +++ b/src/test/kotlin/handlers/messages/ExtractorTest.kt @@ -68,7 +68,7 @@ class ExtractorTest { private val bookId = BookId("") private val fullStreamName = "${baseStreamName}:${streamDirection}" - private val streamNameObject = StreamName(baseStreamName, Direction.valueOf(streamDirection), BookId("")) + private val streamNameObject = StreamName(BookId(""), baseStreamName, Direction.valueOf(streamDirection)) inner class BorderTestParameters( val startTimestamp: Instant, diff --git a/src/test/kotlin/handlers/messages/MergerTest.kt b/src/test/kotlin/handlers/messages/MergerTest.kt index 9a3775da..fbe30d36 100644 --- a/src/test/kotlin/handlers/messages/MergerTest.kt +++ b/src/test/kotlin/handlers/messages/MergerTest.kt @@ -59,7 +59,7 @@ class MergerTest { private val fullStreamName = streamDirection.map { "${baseStreamName}:${it}" } private val streamNameObjects = streamDirection.map { - StreamName(baseStreamName, Direction.valueOf(it.uppercase()), BOOK) + StreamName(BOOK, baseStreamName, Direction.valueOf(it.uppercase())) } private val direction = "next"