diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/requests/SseMessageSearchRequest.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/requests/SseMessageSearchRequest.kt index 5ae2fb82..9617ea60 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/requests/SseMessageSearchRequest.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/requests/SseMessageSearchRequest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ package com.exactpro.th2.rptdataprovider.entities.requests import com.exactpro.cradle.Direction import com.exactpro.cradle.BookId import com.exactpro.cradle.TimeRelation +import com.exactpro.cradle.TimeRelation.AFTER +import com.exactpro.cradle.TimeRelation.BEFORE import com.exactpro.cradle.messages.StoredMessageId import com.exactpro.th2.common.util.toInstant import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest @@ -96,7 +98,7 @@ data class SseMessageSearchRequest( searchDirection = request.searchDirection.let { when (it) { - PREVIOUS -> TimeRelation.BEFORE + PREVIOUS -> BEFORE else -> searchDirection } }, @@ -138,7 +140,7 @@ data class SseMessageSearchRequest( constructor(parameters: Map>, filterPredicate: FilterPredicate>) : this( parameters = parameters, filterPredicate = filterPredicate, - searchDirection = TimeRelation.AFTER + searchDirection = AFTER ) constructor( @@ -148,19 +150,24 @@ data class SseMessageSearchRequest( ) : this( request = request, filterPredicate = filterPredicate, - searchDirection = TimeRelation.AFTER, + searchDirection = AFTER, bookId = bookId ) private fun checkEndTimestamp() { if (endTimestamp == null || startTimestamp == null) return - if (searchDirection == TimeRelation.AFTER) { - if (startTimestamp.isAfter(endTimestamp)) - throw InvalidRequestException("startTimestamp: $startTimestamp > endTimestamp: $endTimestamp") - } else { - if (startTimestamp.isBefore(endTimestamp)) - throw InvalidRequestException("startTimestamp: $startTimestamp < endTimestamp: $endTimestamp") + when(searchDirection) { + BEFORE -> { + if (startTimestamp < endTimestamp) { + throw InvalidRequestException("startTimestamp: $startTimestamp < endTimestamp: $endTimestamp") + } + } + AFTER -> { + if (startTimestamp > endTimestamp) { + throw InvalidRequestException("startTimestamp: $startTimestamp > endTimestamp: $endTimestamp") + } + } } } @@ -176,7 +183,26 @@ data class SseMessageSearchRequest( } private fun checkTimestampAndId() { - if (startTimestamp != null && resumeFromIdsList.isNotEmpty()) + if (startTimestamp != null && resumeFromIdsList.isNotEmpty()) { + when(searchDirection) { + BEFORE -> { + val pointers = resumeFromIdsList.filter { it.timestamp > startTimestamp } + if (pointers.isNotEmpty()) { + throw InvalidRequestException( + "You cannot specify resume Ids $pointers with timestamp greater than startTimestamp $startTimestamp" + ) + } + } + AFTER -> { + val pointers = resumeFromIdsList.filter { it.timestamp < startTimestamp } + if (pointers.isNotEmpty()) { + throw InvalidRequestException( + "You cannot specify resume Ids $pointers with timestamp less than startTimestamp $startTimestamp" + ) + } + } + } + } throw InvalidRequestException("You cannot specify resume Id and start timestamp at the same time") } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt index 393fef9e..a8b3089b 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt @@ -36,6 +36,7 @@ import com.exactpro.th2.rptdataprovider.entities.internal.PipelineKeepAlive import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch import com.exactpro.th2.rptdataprovider.entities.internal.StreamEndObject import com.exactpro.th2.rptdataprovider.entities.internal.StreamName +import com.exactpro.th2.rptdataprovider.entities.internal.StreamPointer import com.exactpro.th2.rptdataprovider.entities.mappers.TimeRelationMapper import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest import com.exactpro.th2.rptdataprovider.entities.responses.StreamInfo @@ -74,6 +75,14 @@ abstract class SearchMessagesHandler( private val searchMessageRequests = Counter.build("th2_search_messages", "Count of search message requests") .register() + + private fun StreamPointer.toStoredMessageId() = StoredMessageId( + stream.bookId, + stream.name, + stream.direction, + timestamp, + sequence + ) } @@ -135,22 +144,9 @@ abstract class SearchMessagesHandler( "(startTimestamp must not be null or resumeFromIdsList must not be empty) and endTimestamp must be null in request: $request" } searchMessageRequests.inc() - val resumeId = request.resumeFromIdsList.firstOrNull() - val messageId = resumeId?.let { - StoredMessageId( - it.stream.bookId, - it.stream.name, - it.stream.direction, - it.timestamp, - it.sequence - ) - } - val resultRequest = resumeId?.let { - request.copy(startTimestamp = resumeId.timestamp) - } ?: request - val before = getIds(resultRequest, messageId, lookupLimitDays, BEFORE) - val after = getIds(resultRequest, messageId, lookupLimitDays, AFTER) + val before = getIds(request, request.resumeFromIdsList, lookupLimitDays, BEFORE) + val after = getIds(request, request.resumeFromIdsList, lookupLimitDays, AFTER) return mapOf( TimeRelationMapper.toHttp(BEFORE) to before, @@ -166,10 +162,15 @@ abstract class SearchMessagesHandler( private suspend fun getIds( request: SseMessageSearchRequest, - messageId: StoredMessageId?, + messageIds: List, lookupLimitDays: Long, searchDirection: TimeRelation ): MutableList { + val messageId = when(searchDirection) { + BEFORE -> messageIds.maxByOrNull(StreamPointer::timestamp) + AFTER -> messageIds.minByOrNull(StreamPointer::timestamp) + }?.toStoredMessageId() + val lookupLimit = request.lookupLimitDays ?: lookupLimitDays val resultRequest = request.run { val calculatedStartTimestamp = startTimestamp ?: messageId?.timestamp diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt index bc067cf8..f9475e6e 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt @@ -16,8 +16,10 @@ package com.exactpro.th2.rptdataprovider.handlers.messages -import com.exactpro.cradle.Order -import com.exactpro.cradle.TimeRelation +import com.exactpro.cradle.Order.DIRECT +import com.exactpro.cradle.Order.REVERSE +import com.exactpro.cradle.TimeRelation.AFTER +import com.exactpro.cradle.TimeRelation.BEFORE import com.exactpro.cradle.messages.MessageFilterBuilder import com.exactpro.cradle.messages.StoredMessage import com.exactpro.cradle.messages.StoredMessageBatch @@ -66,10 +68,9 @@ class MessageExtractor( private var lastElement: StoredMessageId? = null private var lastTimestamp: Instant? = null - private val order = if (request.searchDirection == TimeRelation.AFTER) { - Order.DIRECT - } else { - Order.REVERSE + private val order = when(request.searchDirection) { + BEFORE -> REVERSE + AFTER -> DIRECT } init { @@ -86,10 +87,9 @@ class MessageExtractor( } private fun getMessagesFromBatch(batch: StoredMessageBatch): Collection { - return if (order == Order.DIRECT) { - batch.messages - } else { - batch.messagesReverse + return when(order) { + DIRECT -> batch.messages + REVERSE -> batch.messagesReverse } } @@ -102,10 +102,9 @@ class MessageExtractor( if (resumeFromId?.sequence != null) { val startSeq = resumeFromId.sequence dropWhile { - if (order == Order.DIRECT) { - it.sequence < startSeq - } else { - it.sequence > startSeq + when(order) { + DIRECT -> it.sequence < startSeq + REVERSE -> it.sequence > startSeq } } } else { @@ -113,10 +112,9 @@ class MessageExtractor( } }.dropWhile { //trim messages that do not strictly match time filter request.startTimestamp?.let { startTimestamp -> - if (order == Order.DIRECT) { - it.timestamp.isBefore(startTimestamp) - } else { - it.timestamp.isAfter(startTimestamp) + when(order) { + DIRECT -> it.timestamp.isBefore(startTimestamp) + REVERSE -> it.timestamp.isAfter(startTimestamp) } } ?: false } @@ -124,10 +122,9 @@ class MessageExtractor( private fun trimMessagesListTail(message: StoredMessage): Boolean { return request.endTimestamp?.let { endTimestamp -> - if (order == Order.DIRECT) { - message.timestamp.isAfterOrEqual(endTimestamp) - } else { - message.timestamp.isBeforeOrEqual(endTimestamp) + when(order) { + DIRECT -> message.timestamp.isAfterOrEqual(endTimestamp) + REVERSE -> message.timestamp.isBeforeOrEqual(endTimestamp) } } ?: false } @@ -149,9 +146,14 @@ class MessageExtractor( streamName!! - val resumeFromId = request.resumeFromIdsList.firstOrNull { - it.stream.name == streamName.name && it.stream.direction == streamName.direction - } + val resumeFromId = request.resumeFromIdsList.asSequence() + .filter { it.stream.name == streamName.name && it.stream.direction == streamName.direction } + .run { + when(order) { + DIRECT -> minByOrNull(StreamPointer::sequence) + REVERSE -> maxByOrNull(StreamPointer::sequence) + } + } logger.debug { "acquiring cradle iterator for stream $streamName" } @@ -170,20 +172,22 @@ class MessageExtractor( .also { builder -> if (resumeFromId != null) { builder.sequence().let { - if (order == Order.DIRECT) { - it.isGreaterThanOrEqualTo(resumeFromId.sequence) - } else { - it.isLessThanOrEqualTo(resumeFromId.sequence) + when(order) { + DIRECT -> it.isGreaterThanOrEqualTo(resumeFromId.sequence) + REVERSE -> it.isLessThanOrEqualTo(resumeFromId.sequence) } } } // always need to make sure that we send messages within the specified timestamp (in case the resume ID points to the past) - if (order == Order.DIRECT) { - request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) } - request.endTimestamp?.let { builder.timestampTo().isLessThan(it) } - } else { - request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) } - request.endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) } + when(order) { + DIRECT -> { + request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) } + request.endTimestamp?.let { builder.timestampTo().isLessThan(it) } + } + REVERSE -> { + request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) } + request.endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) } + } } }.build() ) @@ -207,8 +211,14 @@ class MessageExtractor( } } - val firstMessage = if (order == Order.DIRECT) batch.messages.first() else batch.messages.last() - val lastMessage = if (order == Order.DIRECT) batch.messages.last() else batch.messages.first() + val firstMessage = when(order) { + DIRECT -> batch.messages.first() + REVERSE -> batch.messages.last() + } + val lastMessage = when(order) { + DIRECT -> batch.messages.last() + REVERSE -> batch.messages.first() + } logger.trace { "batch ${batch.id.sequence} of stream $streamName has been trimmed (targetStartTimestamp=${request.startTimestamp} targetEndTimestamp=${request.endTimestamp} targetId=${resumeFromId?.sequence}) - ${trimmedMessages.size} of ${batch.messages.size} messages left (firstId=${firstMessage.id.sequence} firstTimestamp=${firstMessage.timestamp} lastId=${lastMessage.id.sequence} lastTimestamp=${lastMessage.timestamp})" @@ -254,10 +264,9 @@ class MessageExtractor( } isStreamEmpty = true - lastTimestamp = if (order == Order.DIRECT) { - Instant.ofEpochMilli(Long.MAX_VALUE) - } else { - Instant.ofEpochMilli(Long.MIN_VALUE) + lastTimestamp = when(order) { + DIRECT -> Instant.ofEpochMilli(Long.MAX_VALUE) + REVERSE -> Instant.ofEpochMilli(Long.MIN_VALUE) } logger.debug { "no more data for stream $streamName (lastId=${lastElement.toString()} lastTimestamp=${lastTimestamp})" }