Skip to content

Commit

Permalink
[TH2-5234] Added messageIdsLookupLimit option
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro committed Aug 27, 2024
1 parent ab47aca commit aee9869
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 27 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ spec:
# Release notes

## 5.13.0
* Provided ability to limit `messageIds` request by `lookupLimitDays` argument
* Provided ability to limit `messageIds` request by `lookupLimitDays` argument or `messageIdsLookupLimit` option
* Updated:
* th2 gradle plugin: `0.1.1`
* common: `5.14.0-dev`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class CustomConfigurationClass {
val messageUnpackerOutputMessageBuffer: Int = 100
val messageFilterOutputMessageBuffer: Int = 100
val messageMergerOutputMessageBuffer: Int = 10
val messageIdsLookupLimit: Int = 60 * 30 * 1_000

val codecResponseTimeout: Int = 6_000
val codecPendingBatchLimit: Int = 16
Expand Down Expand Up @@ -177,6 +178,12 @@ class Configuration(customConfiguration: CustomConfigurationClass) {
"10"
)

val messageIdsLookupLimit: Variable = Variable(
"messageIdsLookupLimit",
customConfiguration.messageIdsLookupLimit.toString(),
"1800000"
)

val codecResponseTimeout: Variable = Variable(
"codecResponseTimeout",
customConfiguration.codecResponseTimeout.toString(), "6000"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ data class SseMessageSearchRequest<RM, PM>(
val endTimestamp: Instant?,
val resultCountLimit: Int?,
val attachedEvents: Boolean,
val lookupLimitDays: Int?,
val lookupLimit: Long?,
val resumeFromIdsList: List<StreamPointer>,
val includeProtocols: List<String>?,
val excludeProtocols: List<String>?,
Expand Down Expand Up @@ -70,7 +70,7 @@ data class SseMessageSearchRequest<RM, PM>(

resultCountLimit = parameters["resultCountLimit"]?.firstOrNull()?.toInt(),
attachedEvents = parameters["attachedEvents"]?.firstOrNull()?.toBoolean() ?: false,
lookupLimitDays = parameters["lookupLimitDays"]?.firstOrNull()?.toInt(),
lookupLimit = parameters["lookupLimitDays"]?.firstOrNull()?.toLong()?.run { this * 24 * 60 * 60 * 1_000 },

includeProtocols = parameters["includeProtocols"],
excludeProtocols = parameters["excludeProtocols"],
Expand Down Expand Up @@ -127,7 +127,7 @@ data class SseMessageSearchRequest<RM, PM>(

attachedEvents = false,

lookupLimitDays = null,
lookupLimit = null,

includeProtocols = null,

Expand Down Expand Up @@ -165,13 +165,19 @@ data class SseMessageSearchRequest<RM, PM>(
}

private fun checkLookupLimitDays() {
if (lookupLimitDays != null && endTimestamp != null) {
if (lookupLimit != null && endTimestamp != null) {
throw InvalidRequestException(
"endTimestamp: $endTimestamp must be null if lookupLimitDays: $lookupLimitDays isn't null"
"endTimestamp: $endTimestamp must be null if lookupLimit: $lookupLimit isn't null"
)
}
}

private fun checkNoEndTimestamp() {
if (endTimestamp != null) {
throw InvalidRequestException("endTimestamp: $endTimestamp must be null")
}
}

private fun checkStartPoint() {
if (startTimestamp == null && resumeFromIdsList.isEmpty())
throw InvalidRequestException("One of the 'startTimestamp' or 'messageId' must not be null")
Expand Down Expand Up @@ -211,7 +217,7 @@ data class SseMessageSearchRequest<RM, PM>(

fun checkIdsRequest() {
checkStartPoint()
checkEndTimestamp()
checkNoEndTimestamp()
checkStreamList()
checkTimestampAndId()
checkLookupLimitDays()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(
}
}

suspend fun getIds(request: SseMessageSearchRequest<RM, PM>): Map<String, List<StreamInfo>> {
suspend fun getIds(request: SseMessageSearchRequest<RM, PM>, lookupLimit: Long): Map<String, List<StreamInfo>> {
require(request.startTimestamp != null && request.endTimestamp == null) {
"startTimestamp must be not null and endTimestamp be null in request: $request"
}
searchMessageRequests.inc()
val resumeId = request.resumeFromIdsList.firstOrNull()
val messageId = resumeId?.let {
Expand All @@ -143,8 +146,8 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(
request.copy(startTimestamp = resumeId.timestamp)
} ?: request

val before = getIds(resultRequest, messageId, TimeRelation.BEFORE)
val after = getIds(resultRequest, messageId, TimeRelation.AFTER)
val before = getIds(resultRequest, messageId, lookupLimit, TimeRelation.BEFORE)
val after = getIds(resultRequest, messageId, lookupLimit, TimeRelation.AFTER)

return mapOf(
TimeRelationMapper.toHttp(TimeRelation.BEFORE) to before,
Expand All @@ -161,9 +164,13 @@ abstract class SearchMessagesHandler<B, G, RM, PM>(
private suspend fun getIds(
request: SseMessageSearchRequest<RM, PM>,
messageId: StoredMessageId?,
lookupLimit: Long,
searchDirection: TimeRelation
): MutableList<StreamInfo> {
val resultRequest = request.copy(searchDirection = searchDirection)
val resultRequest = request.copy(
searchDirection = searchDirection,
lookupLimit = request.lookupLimit ?: lookupLimit
)

val pipelineStatus = PipelineStatus()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.time.Instant
import java.time.temporal.ChronoUnit


class MessageExtractor<B, G, RM, PM>(
Expand Down Expand Up @@ -184,25 +183,15 @@ class MessageExtractor<B, G, RM, PM>(
startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) }
endTimestamp?.let { builder.timestampTo().isLessThan(it) }

if (startTimestamp != null &&
endTimestamp == null &&
lookupLimitDays != null
) {
builder.timestampTo().isLessThan(
startTimestamp.plus(lookupLimitDays.toLong(), ChronoUnit.DAYS)
)
if (startTimestamp != null && endTimestamp == null && lookupLimit != null) {
builder.timestampTo().isLessThan(startTimestamp.plusMillis(lookupLimit))
}
} else {
startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) }
endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) }

if (startTimestamp != null &&
endTimestamp == null &&
lookupLimitDays != null
) {
builder.timestampFrom().isGreaterThan(
startTimestamp.minus(lookupLimitDays.toLong(), ChronoUnit.DAYS)
)
if (startTimestamp != null && endTimestamp == null && lookupLimit != null) {
builder.timestampFrom().isGreaterThan(startTimestamp.minusMillis(lookupLimit))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ class HttpServer<B, G, RM, PM>(
queryParametersMap,
messageFiltersPredicateFactory.getEmptyPredicate(),
).also(SseMessageSearchRequest<*, *>::checkIdsRequest)
searchMessagesHandler.getIds(request)
searchMessagesHandler.getIds(request, configuration.messageIdsLookupLimit.value.toLong())
}
}

Expand Down

0 comments on commit aee9869

Please sign in to comment.