From 3be9d9986b18ce163ecca732e30a2ee9ddde739f Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Thu, 29 Aug 2024 11:20:41 +0400 Subject: [PATCH] [TH2-5234] Corrected after review --- README.md | 4 ++- .../entities/configuration/Configuration.kt | 10 +++---- .../requests/SseMessageSearchRequest.kt | 23 +++------------ .../handlers/SearchMessagesHandler.kt | 29 ++++++++++++------- .../handlers/messages/MessageExtractor.kt | 22 ++++---------- .../th2/rptdataprovider/server/HttpServer.kt | 6 ++-- 6 files changed, 40 insertions(+), 54 deletions(-) diff --git a/README.md b/README.md index bdde59b7..aef00ad8 100644 --- a/README.md +++ b/README.md @@ -222,6 +222,8 @@ spec: messageUnpackerOutputMessageBuffer: 100 messageFilterOutputMessageBuffer: 100 messageMergerOutputMessageBuffer: 10 + + messageIdsLookupLimitDays: 7 // lookup limit value for seacing next and previous message ids. codecPendingBatchLimit: 16 // the total number of messages sent to the codec batches in parallel for all pipelines codecCallbackThreadPool: 4 // thread pool for parsing messages received from codecs @@ -296,7 +298,7 @@ spec: # Release notes ## 5.13.0 -* Provided ability to limit `messageIds` request by `lookupLimitDays` argument or `messageIdsLookupLimit` option +* Provided ability to limit `messageIds` request by `lookupLimitDays` argument or `messageIdsLookupLimitDays` option * Updated: * th2 gradle plugin: `0.1.1` * common: `5.14.0-dev` diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt index d4d88831..5be9bd2a 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/configuration/Configuration.kt @@ -48,7 +48,7 @@ class CustomConfigurationClass { val messageUnpackerOutputMessageBuffer: Int = 100 val messageFilterOutputMessageBuffer: Int = 100 val messageMergerOutputMessageBuffer: Int = 10 - val messageIdsLookupLimit: Int = 60 * 30 * 1_000 + val messageIdsLookupLimitDays: Int = 7 val codecResponseTimeout: Int = 6_000 val codecPendingBatchLimit: Int = 16 @@ -178,10 +178,10 @@ class Configuration(customConfiguration: CustomConfigurationClass) { "10" ) - val messageIdsLookupLimit: Variable = Variable( - "messageIdsLookupLimit", - customConfiguration.messageIdsLookupLimit.toString(), - "1800000" + val messageIdsLookupLimitDays: Variable = Variable( + "messageIdsLookupLimitDays", + customConfiguration.messageIdsLookupLimitDays.toString(), + "7" ) val codecResponseTimeout: Variable = Variable( diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/requests/SseMessageSearchRequest.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/requests/SseMessageSearchRequest.kt index 4f200ad1..5ae2fb82 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/requests/SseMessageSearchRequest.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/entities/requests/SseMessageSearchRequest.kt @@ -39,7 +39,7 @@ data class SseMessageSearchRequest( val endTimestamp: Instant?, val resultCountLimit: Int?, val attachedEvents: Boolean, - val lookupLimit: Long?, + val lookupLimitDays: Long?, val resumeFromIdsList: List, val includeProtocols: List?, val excludeProtocols: List?, @@ -70,7 +70,7 @@ data class SseMessageSearchRequest( resultCountLimit = parameters["resultCountLimit"]?.firstOrNull()?.toInt(), attachedEvents = parameters["attachedEvents"]?.firstOrNull()?.toBoolean() ?: false, - lookupLimit = parameters["lookupLimitDays"]?.firstOrNull()?.toLong()?.run { this * 24 * 60 * 60 * 1_000 }, + lookupLimitDays = parameters["lookupLimitDays"]?.firstOrNull()?.toLong(), includeProtocols = parameters["includeProtocols"], excludeProtocols = parameters["excludeProtocols"], @@ -127,7 +127,7 @@ data class SseMessageSearchRequest( attachedEvents = false, - lookupLimit = null, + lookupLimitDays = null, includeProtocols = null, @@ -164,20 +164,6 @@ data class SseMessageSearchRequest( } } - private fun checkLookupLimitDays() { - if (lookupLimit != null && endTimestamp != null) { - throw InvalidRequestException( - "endTimestamp: $endTimestamp must be null if lookupLimit: $lookupLimit isn't null" - ) - } - } - - private fun checkNoEndTimestamp() { - if (endTimestamp != null) { - throw InvalidRequestException("endTimestamp: $endTimestamp must be null") - } - } - private fun checkStartPoint() { if (startTimestamp == null && resumeFromIdsList.isEmpty()) throw InvalidRequestException("One of the 'startTimestamp' or 'messageId' must not be null") @@ -217,10 +203,9 @@ data class SseMessageSearchRequest( fun checkIdsRequest() { checkStartPoint() - checkNoEndTimestamp() + checkEndTimestamp() checkStreamList() checkTimestampAndId() - checkLookupLimitDays() checkResumeIds() } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt index 799452ba..9a15bff1 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchMessagesHandler.kt @@ -18,6 +18,8 @@ package com.exactpro.th2.rptdataprovider.handlers import com.exactpro.cradle.Direction import com.exactpro.cradle.TimeRelation +import com.exactpro.cradle.TimeRelation.AFTER +import com.exactpro.cradle.TimeRelation.BEFORE import com.exactpro.cradle.messages.StoredMessageId import com.exactpro.th2.common.grpc.Message import com.exactpro.th2.common.grpc.MessageGroupBatch @@ -54,6 +56,7 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.withContext +import java.time.temporal.ChronoUnit.DAYS import java.util.concurrent.atomic.AtomicLong import kotlin.coroutines.coroutineContext import kotlin.math.roundToInt @@ -127,7 +130,7 @@ abstract class SearchMessagesHandler( } } - suspend fun getIds(request: SseMessageSearchRequest, lookupLimit: Long): Map> { + suspend fun getIds(request: SseMessageSearchRequest, lookupLimitDays: Long): Map> { require(request.startTimestamp != null && request.endTimestamp == null) { "startTimestamp must be not null and endTimestamp be null in request: $request" } @@ -146,12 +149,12 @@ abstract class SearchMessagesHandler( request.copy(startTimestamp = resumeId.timestamp) } ?: request - val before = getIds(resultRequest, messageId, lookupLimit, TimeRelation.BEFORE) - val after = getIds(resultRequest, messageId, lookupLimit, TimeRelation.AFTER) + val before = getIds(resultRequest, messageId, lookupLimitDays, BEFORE) + val after = getIds(resultRequest, messageId, lookupLimitDays, AFTER) return mapOf( - TimeRelationMapper.toHttp(TimeRelation.BEFORE) to before, - TimeRelationMapper.toHttp(TimeRelation.AFTER) to after, + TimeRelationMapper.toHttp(BEFORE) to before, + TimeRelationMapper.toHttp(AFTER) to after, ) } @@ -164,13 +167,19 @@ abstract class SearchMessagesHandler( private suspend fun getIds( request: SseMessageSearchRequest, messageId: StoredMessageId?, - lookupLimit: Long, + lookupLimitDays: Long, searchDirection: TimeRelation ): MutableList { - val resultRequest = request.copy( - searchDirection = searchDirection, - lookupLimit = request.lookupLimit ?: lookupLimit - ) + val lookupLimit = request.lookupLimitDays ?: lookupLimitDays + val resultRequest = request.run { + copy( + searchDirection = searchDirection, + endTimestamp = when (searchDirection) { + BEFORE -> startTimestamp?.minus(lookupLimit, DAYS) + AFTER -> startTimestamp?.plus(lookupLimit, DAYS) + } + ).also(SseMessageSearchRequest<*, *>::checkIdsRequest) + } val pipelineStatus = PipelineStatus() diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt index bd06fb43..bc067cf8 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/messages/MessageExtractor.kt @@ -178,22 +178,12 @@ class MessageExtractor( } } // always need to make sure that we send messages within the specified timestamp (in case the resume ID points to the past) - with(request) { - if (order == Order.DIRECT) { - startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) } - endTimestamp?.let { builder.timestampTo().isLessThan(it) } - - if (startTimestamp != null && endTimestamp == null && lookupLimit != null) { - builder.timestampTo().isLessThan(startTimestamp.plusMillis(lookupLimit)) - } - } else { - startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) } - endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) } - - if (startTimestamp != null && endTimestamp == null && lookupLimit != null) { - builder.timestampFrom().isGreaterThan(startTimestamp.minusMillis(lookupLimit)) - } - } + if (order == Order.DIRECT) { + request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) } + request.endTimestamp?.let { builder.timestampTo().isLessThan(it) } + } else { + request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) } + request.endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) } } }.build() ) diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt index 7891dcfa..a7199188 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/server/HttpServer.kt @@ -445,19 +445,19 @@ class HttpServer( queryParametersMap, messageFiltersPredicateFactory.getEmptyPredicate(), ).also(SseMessageSearchRequest<*, *>::checkIdsRequest) - searchMessagesHandler.getIds(request, configuration.messageIdsLookupLimit.value.toLong()) + searchMessagesHandler.getIds(request, configuration.messageIdsLookupLimitDays.value.toLong()) } } get("/bookIds") { - handleRequest(call, context, "book ids", null, false, false) { + handleRequest(call, context, "book ids", null, probe = false, useSse = false) { cradleService.getBookIds() } } get("/scopeIds") { val book = call.parameters["bookId"]!! - handleRequest(call, context, "event scopes", null, false, false) { + handleRequest(call, context, "event scopes", null, probe = false, useSse = false) { cradleService.getEventScopes(BookId(book)) } }