Skip to content

Commit

Permalink
[TH2-5234] Corrected after review
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Aug 29, 2024
1 parent aee9869 commit 3be9d99
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 54 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ spec:
messageUnpackerOutputMessageBuffer: 100
messageFilterOutputMessageBuffer: 100
messageMergerOutputMessageBuffer: 10
messageIdsLookupLimitDays: 7 // lookup limit value for seacing next and previous message ids.
codecPendingBatchLimit: 16 // the total number of messages sent to the codec batches in parallel for all pipelines
codecCallbackThreadPool: 4 // thread pool for parsing messages received from codecs
Expand Down Expand Up @@ -296,7 +298,7 @@ spec:
# Release notes

## 5.13.0
* Provided ability to limit `messageIds` request by `lookupLimitDays` argument or `messageIdsLookupLimit` option
* Provided ability to limit `messageIds` request by `lookupLimitDays` argument or `messageIdsLookupLimitDays` option
* Updated:
* th2 gradle plugin: `0.1.1`
* common: `5.14.0-dev`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class CustomConfigurationClass {
val messageUnpackerOutputMessageBuffer: Int = 100
val messageFilterOutputMessageBuffer: Int = 100
val messageMergerOutputMessageBuffer: Int = 10
val messageIdsLookupLimit: Int = 60 * 30 * 1_000
val messageIdsLookupLimitDays: Int = 7

val codecResponseTimeout: Int = 6_000
val codecPendingBatchLimit: Int = 16
Expand Down Expand Up @@ -178,10 +178,10 @@ class Configuration(customConfiguration: CustomConfigurationClass) {
"10"
)

val messageIdsLookupLimit: Variable = Variable(
"messageIdsLookupLimit",
customConfiguration.messageIdsLookupLimit.toString(),
"1800000"
val messageIdsLookupLimitDays: Variable = Variable(
"messageIdsLookupLimitDays",
customConfiguration.messageIdsLookupLimitDays.toString(),
"7"
)

val codecResponseTimeout: Variable = Variable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ data class SseMessageSearchRequest<RM, PM>(
val endTimestamp: Instant?,
val resultCountLimit: Int?,
val attachedEvents: Boolean,
val lookupLimit: Long?,
val lookupLimitDays: Long?,
val resumeFromIdsList: List<StreamPointer>,
val includeProtocols: List<String>?,
val excludeProtocols: List<String>?,
Expand Down Expand Up @@ -70,7 +70,7 @@ data class SseMessageSearchRequest<RM, PM>(

resultCountLimit = parameters["resultCountLimit"]?.firstOrNull()?.toInt(),
attachedEvents = parameters["attachedEvents"]?.firstOrNull()?.toBoolean() ?: false,
lookupLimit = parameters["lookupLimitDays"]?.firstOrNull()?.toLong()?.run { this * 24 * 60 * 60 * 1_000 },
lookupLimitDays = parameters["lookupLimitDays"]?.firstOrNull()?.toLong(),

includeProtocols = parameters["includeProtocols"],
excludeProtocols = parameters["excludeProtocols"],
Expand Down Expand Up @@ -127,7 +127,7 @@ data class SseMessageSearchRequest<RM, PM>(

attachedEvents = false,

lookupLimit = null,
lookupLimitDays = null,

includeProtocols = null,

Expand Down Expand Up @@ -164,20 +164,6 @@ data class SseMessageSearchRequest<RM, PM>(
}
}

private fun checkLookupLimitDays() {
if (lookupLimit != null && endTimestamp != null) {
throw InvalidRequestException(
"endTimestamp: $endTimestamp must be null if lookupLimit: $lookupLimit isn't null"
)
}
}

private fun checkNoEndTimestamp() {
if (endTimestamp != null) {
throw InvalidRequestException("endTimestamp: $endTimestamp must be null")
}
}

private fun checkStartPoint() {
if (startTimestamp == null && resumeFromIdsList.isEmpty())
throw InvalidRequestException("One of the 'startTimestamp' or 'messageId' must not be null")
Expand Down Expand Up @@ -217,10 +203,9 @@ data class SseMessageSearchRequest<RM, PM>(

fun checkIdsRequest() {
checkStartPoint()
checkNoEndTimestamp()
checkEndTimestamp()
checkStreamList()
checkTimestampAndId()
checkLookupLimitDays()
checkResumeIds()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package com.exactpro.th2.rptdataprovider.handlers

import com.exactpro.cradle.Direction
import com.exactpro.cradle.TimeRelation
import com.exactpro.cradle.TimeRelation.AFTER
import com.exactpro.cradle.TimeRelation.BEFORE
import com.exactpro.cradle.messages.StoredMessageId
import com.exactpro.th2.common.grpc.Message
import com.exactpro.th2.common.grpc.MessageGroupBatch
Expand Down Expand Up @@ -54,6 +56,7 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.withContext
import java.time.temporal.ChronoUnit.DAYS
import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.coroutineContext
import kotlin.math.roundToInt
Expand Down Expand Up @@ -127,7 +130,7 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(
}
}

suspend fun getIds(request: SseMessageSearchRequest<RM, PM>, lookupLimit: Long): Map<String, List<StreamInfo>> {
suspend fun getIds(request: SseMessageSearchRequest<RM, PM>, lookupLimitDays: Long): Map<String, List<StreamInfo>> {
require(request.startTimestamp != null && request.endTimestamp == null) {
"startTimestamp must be not null and endTimestamp be null in request: $request"
}
Expand All @@ -146,12 +149,12 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(
request.copy(startTimestamp = resumeId.timestamp)
} ?: request

val before = getIds(resultRequest, messageId, lookupLimit, TimeRelation.BEFORE)
val after = getIds(resultRequest, messageId, lookupLimit, TimeRelation.AFTER)
val before = getIds(resultRequest, messageId, lookupLimitDays, BEFORE)
val after = getIds(resultRequest, messageId, lookupLimitDays, AFTER)

return mapOf(
TimeRelationMapper.toHttp(TimeRelation.BEFORE) to before,
TimeRelationMapper.toHttp(TimeRelation.AFTER) to after,
TimeRelationMapper.toHttp(BEFORE) to before,
TimeRelationMapper.toHttp(AFTER) to after,
)
}

Expand All @@ -164,13 +167,19 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(
private suspend fun getIds(
request: SseMessageSearchRequest<RM, PM>,
messageId: StoredMessageId?,
lookupLimit: Long,
lookupLimitDays: Long,
searchDirection: TimeRelation
): MutableList<StreamInfo> {
val resultRequest = request.copy(
searchDirection = searchDirection,
lookupLimit = request.lookupLimit ?: lookupLimit
)
val lookupLimit = request.lookupLimitDays ?: lookupLimitDays
val resultRequest = request.run {
copy(
searchDirection = searchDirection,
endTimestamp = when (searchDirection) {
BEFORE -> startTimestamp?.minus(lookupLimit, DAYS)
AFTER -> startTimestamp?.plus(lookupLimit, DAYS)
}
).also(SseMessageSearchRequest<*, *>::checkIdsRequest)
}

val pipelineStatus = PipelineStatus()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,22 +178,12 @@ class MessageExtractor<B, G, RM, PM>(
}
}
// always need to make sure that we send messages within the specified timestamp (in case the resume ID points to the past)
with(request) {
if (order == Order.DIRECT) {
startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) }
endTimestamp?.let { builder.timestampTo().isLessThan(it) }

if (startTimestamp != null && endTimestamp == null && lookupLimit != null) {
builder.timestampTo().isLessThan(startTimestamp.plusMillis(lookupLimit))
}
} else {
startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) }
endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) }

if (startTimestamp != null && endTimestamp == null && lookupLimit != null) {
builder.timestampFrom().isGreaterThan(startTimestamp.minusMillis(lookupLimit))
}
}
if (order == Order.DIRECT) {
request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampTo().isLessThan(it) }
} else {
request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) }
}
}.build()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,19 +445,19 @@ class HttpServer<B, G, RM, PM>(
queryParametersMap,
messageFiltersPredicateFactory.getEmptyPredicate(),
).also(SseMessageSearchRequest<*, *>::checkIdsRequest)
searchMessagesHandler.getIds(request, configuration.messageIdsLookupLimit.value.toLong())
searchMessagesHandler.getIds(request, configuration.messageIdsLookupLimitDays.value.toLong())
}
}

get("/bookIds") {
handleRequest(call, context, "book ids", null, false, false) {
handleRequest(call, context, "book ids", null, probe = false, useSse = false) {
cradleService.getBookIds()
}
}

get("/scopeIds") {
val book = call.parameters["bookId"]!!
handleRequest(call, context, "event scopes", null, false, false) {
handleRequest(call, context, "event scopes", null, probe = false, useSse = false) {
cradleService.getEventScopes(BookId(book))
}
}
Expand Down

0 comments on commit 3be9d99

Please sign in to comment.