diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt index 08da9fe9..c2f27e73 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt @@ -28,6 +28,7 @@ import com.exactpro.th2.rptdataprovider.Context import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName import com.exactpro.th2.rptdataprovider.entities.internal.EmptyPipelineObject import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch +import com.exactpro.th2.rptdataprovider.entities.internal.StreamPointer import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest import com.exactpro.th2.rptdataprovider.entities.responses.StoredMessageBatchWrapper import com.exactpro.th2.rptdataprovider.entities.sse.StreamWriter @@ -72,6 +73,17 @@ class MessageExtractor( REVERSE -> Instant::isAfter } + private val firstResumeId: Sequence.((StreamPointer) -> Instant) -> StreamPointer? = + when(order) { + DIRECT -> Sequence::minByOrNull + REVERSE -> Sequence::maxByOrNull + } + + private val emptyStreamTimestamp = when (order) { + DIRECT -> Instant.MAX + REVERSE -> Instant.MIN + } + init { externalScope.launch { try { @@ -85,17 +97,10 @@ class MessageExtractor( } } - private val StoredGroupedMessageBatch.orderedMessages: Collection - get() = when (order) { - DIRECT -> messages - REVERSE -> messagesReverse - } - override suspend fun processMessage() { coroutineScope { launch { while (this@coroutineScope.isActive) { - //FIXME: replace delay-based stream updates with synchronous updates from iterator lastTimestamp?.also { sendToChannel(EmptyPipelineObject(isStreamEmpty, lastElement, it).also { msg -> @@ -108,12 +113,7 @@ class MessageExtractor( val resumeFromId = request.resumeFromIdsList.asSequence() .filter { it.stream.name == commonStreamName.name } - .run { - when(request.searchDirection) { - BEFORE -> maxByOrNull { it.timestamp } - AFTER -> minByOrNull { it.timestamp } - } - } + .firstResumeId { it.timestamp } LOGGER.debug { "acquiring cradle iterator for stream $commonStreamName" } @@ -127,21 +127,26 @@ class MessageExtractor( request.startTimestamp, request.endTimestamp, ) + val cradleMessageIterable = context.cradleService.getGroupedMessages( this, - GroupedMessageFilterBuilder().apply { - bookId(commonStreamName.bookId) - groupName(sessionGroup) - order(order) - - if (order == REVERSE) { // default: DIRECT - request.startTimestamp?.let { timestampTo().isLessThanOrEqualTo(it) } - request.endTimestamp?.let { timestampFrom().isGreaterThan(it) } - } else { - request.startTimestamp?.let { timestampFrom().isGreaterThanOrEqualTo(it) } - request.endTimestamp?.let { timestampTo().isLessThan(it) } - } - }.build() + + GroupedMessageFilterBuilder() + .bookId(commonStreamName.bookId) + .groupName(sessionGroup) + .order(order) + .also { builder -> + when (order) { + DIRECT -> { + request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) } + request.endTimestamp?.let { builder.timestampTo().isLessThan(it) } + } + REVERSE -> { + request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) } + request.endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) } + } + } + }.build() ) LOGGER.debug { "cradle iterator has been built for session group: $sessionGroup, alias: $commonStreamName" } @@ -164,9 +169,9 @@ class MessageExtractor( batch.messages.filterTo(filteredMessages) { it.sessionAlias == commonStreamName.name } filteredMessages.sortBy { it.timestamp } - when (request.searchDirection) { - AFTER -> filteredMessages - BEFORE -> filteredMessages.asReversed() + when (order) { + DIRECT -> filteredMessages + REVERSE -> filteredMessages.asReversed() } } @@ -199,8 +204,9 @@ class MessageExtractor( } LOGGER.trace { - val firstMessage = batch.orderedMessages.firstOrNull() - val lastMessage = batch.orderedMessages.lastOrNull() + val messages = if (order == REVERSE) batch.messagesReverse else batch.messages + val firstMessage = messages.firstOrNull() + val lastMessage = messages.lastOrNull() "batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName has been trimmed (targetStartTimestamp=${request.startTimestamp} targetEndTimestamp=${request.endTimestamp} targetId=${resumeFromId?.sequence}) - ${trimmedMessages.size} of ${batch.messageCount} messages left (firstId=${firstMessage?.id?.sequence} firstTimestamp=${firstMessage?.timestamp} lastId=${lastMessage?.id?.sequence} lastTimestamp=${lastMessage?.timestamp})" } @@ -244,10 +250,7 @@ class MessageExtractor( } isStreamEmpty = true - lastTimestamp = when (order) { - DIRECT -> Instant.MAX - REVERSE -> Instant.MIN - } + lastTimestamp = emptyStreamTimestamp LOGGER.debug { "no more data for stream $commonStreamName (lastId=${lastElement.toString()} lastTimestamp=${lastTimestamp})" } } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt index a8656de6..f7b42ac0 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/StreamMerger.kt @@ -30,7 +30,13 @@ import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus import com.exactpro.th2.rptdataprovider.handlers.messages.helpers.MultipleStreamHolder import com.exactpro.th2.rptdataprovider.isAfterOrEqual import com.exactpro.th2.rptdataprovider.isBeforeOrEqual -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.isActive +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.launch +import kotlinx.coroutines.delay import io.github.oshai.kotlinlogging.KotlinLogging import java.time.Instant import kotlin.coroutines.coroutineContext @@ -49,30 +55,32 @@ class StreamMerger( ) : PipelineComponent(context, searchRequest, externalScope, messageFlowCapacity = messageFlowCapacity) { private val messageStreams = MultipleStreamHolder(pipelineStreams) private var resultCountLimit = searchRequest.resultCountLimit - private val processJob: Job - init { - processJob = externalScope.launch { processMessage() } + private val messageFilterByTimestamp: (PipelineStepObject, PipelineStepObject) -> Boolean = when(searchRequest.searchDirection) { + TimeRelation.AFTER -> { msg1, msg2 -> msg1.lastScannedTime.isBefore(msg2.lastScannedTime) } + TimeRelation.BEFORE -> { msg1, msg2 -> msg2.lastScannedTime.isBefore(msg1.lastScannedTime) } } - private fun timestampInRange(pipelineStepObject: PipelineStepObject): Boolean { - return pipelineStepObject.lastScannedTime.let { timestamp -> - if (searchRequest.searchDirection == TimeRelation.AFTER) { - searchRequest.endTimestamp == null || timestamp.isBeforeOrEqual(searchRequest.endTimestamp) - } else { - searchRequest.endTimestamp == null || timestamp.isAfterOrEqual(searchRequest.endTimestamp) - } - } + private val timestampFilter = when (searchRequest.searchDirection) { + TimeRelation.AFTER -> Instant::isBeforeOrEqual + TimeRelation.BEFORE -> Instant::isAfterOrEqual } - private fun inTimeRange(pipelineStepObject: PipelineStepObject): Boolean { - return if (pipelineStepObject !is EmptyPipelineObject) { - timestampInRange(pipelineStepObject) - } else { - true - } + private val sequenceFilter: (Long, Long) -> Boolean = when(searchRequest.searchDirection) { + TimeRelation.AFTER -> { current, prev -> current > prev } + TimeRelation.BEFORE -> { current, prev -> current < prev } } + private val processJob: Job + init { + processJob = externalScope.launch { processMessage() } + } + + private fun inTimeRange(pipelineStepObject: PipelineStepObject): Boolean = + pipelineStepObject is EmptyPipelineObject + || searchRequest.endTimestamp == null + || timestampFilter(pipelineStepObject.lastScannedTime, searchRequest.endTimestamp) + private suspend fun keepAliveGenerator() { while (coroutineContext.isActive) { val scannedObjectCount = messageStreams.getScannedObjectCount() @@ -140,36 +148,11 @@ class StreamMerger( } } - private fun isLess(firstMessage: PipelineStepObject, secondMessage: PipelineStepObject): Boolean { - return firstMessage.lastScannedTime.isBefore(secondMessage.lastScannedTime) - } - - private fun isGreater(firstMessage: PipelineStepObject, secondMessage: PipelineStepObject): Boolean { - return firstMessage.lastScannedTime.isAfter(secondMessage.lastScannedTime) - } - - private suspend fun getNextMessage(): PipelineStepObject { - return coroutineScope { - - val streams = - if (LOGGER.isTraceEnabled()) - messageStreams.getLoggingStreamInfo() - else null - - let { - if (searchRequest.searchDirection == TimeRelation.AFTER) { - messageStreams.selectMessage { new, old -> - isLess(new, old) - } - } else { - messageStreams.selectMessage { new, old -> - isGreater(new, old) - } - } - }.also { - LOGGER.trace { - "selected ${it.lastProcessedId} - ${it.javaClass.kotlin}-${it.javaClass.hashCode()} ${it.lastScannedTime} out of [${streams}]" - } + private suspend fun getNextMessage(): PipelineStepObject = coroutineScope { + val streams = if (LOGGER.isTraceEnabled()) messageStreams.getLoggingStreamInfo() else null + messageStreams.selectMessage(messageFilterByTimestamp).also { + LOGGER.trace { + "selected ${it.lastProcessedId} - ${it.javaClass.kotlin}-${it.javaClass.hashCode()} ${it.lastScannedTime} out of [${streams}]" } } } @@ -178,13 +161,7 @@ class StreamMerger( LOGGER.info { "Getting streams info" } processJob.join() LOGGER.debug { "Merge job is finished" } - return messageStreams.getStreamsInfo { current, prev -> - if (searchRequest.searchDirection == TimeRelation.AFTER) { - current > prev - } else { - current < prev - } - } + return messageStreams.getStreamsInfo(sequenceFilter) } companion object {