Skip to content

Commit

Permalink
[TH2-5234] Corrected condition for messageIds request where args cont…
Browse files Browse the repository at this point in the history
…ain messageId and start time
  • Loading branch information
Nikita-Smirnov-Exactpro committed Sep 3, 2024
1 parent 9da0b89 commit 85423a8
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@ package com.exactpro.th2.rptdataprovider.entities.requests
import com.exactpro.cradle.Direction
import com.exactpro.cradle.BookId
import com.exactpro.cradle.TimeRelation
import com.exactpro.cradle.TimeRelation.AFTER
import com.exactpro.cradle.TimeRelation.BEFORE
import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.th2.common.util.toInstant
import com.exactpro.th2.dataprovider.grpc.MessageSearchRequest
Expand Down Expand Up @@ -96,7 +98,7 @@ data class SseMessageSearchRequest<RM, PM>(

searchDirection = request.searchDirection.let {
when (it) {
PREVIOUS -> TimeRelation.BEFORE
PREVIOUS -> BEFORE
else -> searchDirection
}
},
Expand Down Expand Up @@ -138,7 +140,7 @@ data class SseMessageSearchRequest<RM, PM>(
constructor(parameters: Map<String, List<String>>, filterPredicate: FilterPredicate<MessageWithMetadata<RM, PM>>) : this(
parameters = parameters,
filterPredicate = filterPredicate,
searchDirection = TimeRelation.AFTER
searchDirection = AFTER
)

constructor(
Expand All @@ -148,19 +150,24 @@ data class SseMessageSearchRequest<RM, PM>(
) : this(
request = request,
filterPredicate = filterPredicate,
searchDirection = TimeRelation.AFTER,
searchDirection = AFTER,
bookId = bookId
)

private fun checkEndTimestamp() {
if (endTimestamp == null || startTimestamp == null) return

if (searchDirection == TimeRelation.AFTER) {
if (startTimestamp.isAfter(endTimestamp))
throw InvalidRequestException("startTimestamp: $startTimestamp > endTimestamp: $endTimestamp")
} else {
if (startTimestamp.isBefore(endTimestamp))
throw InvalidRequestException("startTimestamp: $startTimestamp < endTimestamp: $endTimestamp")
when(searchDirection) {
BEFORE -> {
if (startTimestamp < endTimestamp) {
throw InvalidRequestException("startTimestamp: $startTimestamp < endTimestamp: $endTimestamp")
}
}
AFTER -> {
if (startTimestamp > endTimestamp) {
throw InvalidRequestException("startTimestamp: $startTimestamp > endTimestamp: $endTimestamp")
}
}
}
}

Expand All @@ -176,7 +183,26 @@ data class SseMessageSearchRequest<RM, PM>(
}

private fun checkTimestampAndId() {
if (startTimestamp != null && resumeFromIdsList.isNotEmpty())
if (startTimestamp != null && resumeFromIdsList.isNotEmpty()) {
when(searchDirection) {
BEFORE -> {
val pointers = resumeFromIdsList.filter { it.timestamp > startTimestamp }
if (pointers.isNotEmpty()) {
throw InvalidRequestException(
"You cannot specify resume Ids $pointers with timestamp greater than startTimestamp $startTimestamp"
)
}
}
AFTER -> {
val pointers = resumeFromIdsList.filter { it.timestamp < startTimestamp }
if (pointers.isNotEmpty()) {
throw InvalidRequestException(
"You cannot specify resume Ids $pointers with timestamp less than startTimestamp $startTimestamp"
)
}
}
}
}
throw InvalidRequestException("You cannot specify resume Id and start timestamp at the same time")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import com.exactpro.th2.rptdataprovider.entities.internal.PipelineKeepAlive
import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch
import com.exactpro.th2.rptdataprovider.entities.internal.StreamEndObject
import com.exactpro.th2.rptdataprovider.entities.internal.StreamName
import com.exactpro.th2.rptdataprovider.entities.internal.StreamPointer
import com.exactpro.th2.rptdataprovider.entities.mappers.TimeRelationMapper
import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest
import com.exactpro.th2.rptdataprovider.entities.responses.StreamInfo
Expand Down Expand Up @@ -74,6 +75,14 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(
private val searchMessageRequests =
Counter.build("th2_search_messages", "Count of search message requests")
.register()

private fun StreamPointer.toStoredMessageId() = StoredMessageId(
stream.bookId,
stream.name,
stream.direction,
timestamp,
sequence
)
}


Expand Down Expand Up @@ -135,22 +144,9 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(
"(startTimestamp must not be null or resumeFromIdsList must not be empty) and endTimestamp must be null in request: $request"
}
searchMessageRequests.inc()
val resumeId = request.resumeFromIdsList.firstOrNull()
val messageId = resumeId?.let {
StoredMessageId(
it.stream.bookId,
it.stream.name,
it.stream.direction,
it.timestamp,
it.sequence
)
}
val resultRequest = resumeId?.let {
request.copy(startTimestamp = resumeId.timestamp)
} ?: request

val before = getIds(resultRequest, messageId, lookupLimitDays, BEFORE)
val after = getIds(resultRequest, messageId, lookupLimitDays, AFTER)
val before = getIds(request, request.resumeFromIdsList, lookupLimitDays, BEFORE)
val after = getIds(request, request.resumeFromIdsList, lookupLimitDays, AFTER)

return mapOf(
TimeRelationMapper.toHttp(BEFORE) to before,
Expand All @@ -166,10 +162,15 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(

private suspend fun getIds(
request: SseMessageSearchRequest<RM, PM>,
messageId: StoredMessageId?,
messageIds: List<StreamPointer>,
lookupLimitDays: Long,
searchDirection: TimeRelation
): MutableList<StreamInfo> {
val messageId = when(searchDirection) {
BEFORE -> messageIds.maxByOrNull(StreamPointer::timestamp)
AFTER -> messageIds.minByOrNull(StreamPointer::timestamp)
}?.toStoredMessageId()

val lookupLimit = request.lookupLimitDays ?: lookupLimitDays
val resultRequest = request.run {
val calculatedStartTimestamp = startTimestamp ?: messageId?.timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package com.exactpro.th2.rptdataprovider.handlers.messages

import com.exactpro.cradle.Order
import com.exactpro.cradle.TimeRelation
import com.exactpro.cradle.Order.DIRECT
import com.exactpro.cradle.Order.REVERSE
import com.exactpro.cradle.TimeRelation.AFTER
import com.exactpro.cradle.TimeRelation.BEFORE
import com.exactpro.cradle.messages.MessageFilterBuilder
import com.exactpro.cradle.messages.StoredMessage
import com.exactpro.cradle.messages.StoredMessageBatch
Expand Down Expand Up @@ -66,10 +68,9 @@ class MessageExtractor<B, G, RM, PM>(
private var lastElement: StoredMessageId? = null
private var lastTimestamp: Instant? = null

private val order = if (request.searchDirection == TimeRelation.AFTER) {
Order.DIRECT
} else {
Order.REVERSE
private val order = when(request.searchDirection) {
BEFORE -> REVERSE
AFTER -> DIRECT
}

init {
Expand All @@ -86,10 +87,9 @@ class MessageExtractor<B, G, RM, PM>(
}

private fun getMessagesFromBatch(batch: StoredMessageBatch): Collection<StoredMessage> {
return if (order == Order.DIRECT) {
batch.messages
} else {
batch.messagesReverse
return when(order) {
DIRECT -> batch.messages
REVERSE -> batch.messagesReverse
}
}

Expand All @@ -102,32 +102,29 @@ class MessageExtractor<B, G, RM, PM>(
if (resumeFromId?.sequence != null) {
val startSeq = resumeFromId.sequence
dropWhile {
if (order == Order.DIRECT) {
it.sequence < startSeq
} else {
it.sequence > startSeq
when(order) {
DIRECT -> it.sequence < startSeq
REVERSE -> it.sequence > startSeq
}
}
} else {
this // nothing to filter by sequence
}
}.dropWhile { //trim messages that do not strictly match time filter
request.startTimestamp?.let { startTimestamp ->
if (order == Order.DIRECT) {
it.timestamp.isBefore(startTimestamp)
} else {
it.timestamp.isAfter(startTimestamp)
when(order) {
DIRECT -> it.timestamp.isBefore(startTimestamp)
REVERSE -> it.timestamp.isAfter(startTimestamp)
}
} ?: false
}
}

private fun trimMessagesListTail(message: StoredMessage): Boolean {
return request.endTimestamp?.let { endTimestamp ->
if (order == Order.DIRECT) {
message.timestamp.isAfterOrEqual(endTimestamp)
} else {
message.timestamp.isBeforeOrEqual(endTimestamp)
when(order) {
DIRECT -> message.timestamp.isAfterOrEqual(endTimestamp)
REVERSE -> message.timestamp.isBeforeOrEqual(endTimestamp)
}
} ?: false
}
Expand All @@ -149,9 +146,14 @@ class MessageExtractor<B, G, RM, PM>(

streamName!!

val resumeFromId = request.resumeFromIdsList.firstOrNull {
it.stream.name == streamName.name && it.stream.direction == streamName.direction
}
val resumeFromId = request.resumeFromIdsList.asSequence()
.filter { it.stream.name == streamName.name && it.stream.direction == streamName.direction }
.run {
when(order) {
DIRECT -> minByOrNull(StreamPointer::sequence)
REVERSE -> maxByOrNull(StreamPointer::sequence)
}
}

logger.debug { "acquiring cradle iterator for stream $streamName" }

Expand All @@ -170,20 +172,22 @@ class MessageExtractor<B, G, RM, PM>(
.also { builder ->
if (resumeFromId != null) {
builder.sequence().let {
if (order == Order.DIRECT) {
it.isGreaterThanOrEqualTo(resumeFromId.sequence)
} else {
it.isLessThanOrEqualTo(resumeFromId.sequence)
when(order) {
DIRECT -> it.isGreaterThanOrEqualTo(resumeFromId.sequence)
REVERSE -> it.isLessThanOrEqualTo(resumeFromId.sequence)
}
}
}
// always need to make sure that we send messages within the specified timestamp (in case the resume ID points to the past)
if (order == Order.DIRECT) {
request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampTo().isLessThan(it) }
} else {
request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) }
when(order) {
DIRECT -> {
request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampTo().isLessThan(it) }
}
REVERSE -> {
request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) }
}
}
}.build()
)
Expand All @@ -207,8 +211,14 @@ class MessageExtractor<B, G, RM, PM>(
}
}

val firstMessage = if (order == Order.DIRECT) batch.messages.first() else batch.messages.last()
val lastMessage = if (order == Order.DIRECT) batch.messages.last() else batch.messages.first()
val firstMessage = when(order) {
DIRECT -> batch.messages.first()
REVERSE -> batch.messages.last()
}
val lastMessage = when(order) {
DIRECT -> batch.messages.last()
REVERSE -> batch.messages.first()
}

logger.trace {
"batch ${batch.id.sequence} of stream $streamName has been trimmed (targetStartTimestamp=${request.startTimestamp} targetEndTimestamp=${request.endTimestamp} targetId=${resumeFromId?.sequence}) - ${trimmedMessages.size} of ${batch.messages.size} messages left (firstId=${firstMessage.id.sequence} firstTimestamp=${firstMessage.timestamp} lastId=${lastMessage.id.sequence} lastTimestamp=${lastMessage.timestamp})"
Expand Down Expand Up @@ -254,10 +264,9 @@ class MessageExtractor<B, G, RM, PM>(
}

isStreamEmpty = true
lastTimestamp = if (order == Order.DIRECT) {
Instant.ofEpochMilli(Long.MAX_VALUE)
} else {
Instant.ofEpochMilli(Long.MIN_VALUE)
lastTimestamp = when(order) {
DIRECT -> Instant.ofEpochMilli(Long.MAX_VALUE)
REVERSE -> Instant.ofEpochMilli(Long.MIN_VALUE)
}

logger.debug { "no more data for stream $streamName (lastId=${lastElement.toString()} lastTimestamp=${lastTimestamp})" }
Expand Down

0 comments on commit 85423a8

Please sign in to comment.