Skip to content

Commit

Permalink
[TH2-5086] Problem with timeout when searching messages (#360)
Browse files Browse the repository at this point in the history
* Add missing limit to the group filter. Log the original message filter

* Use lazy iterator processing instead of collecting all to a list

* Remove deprecated PipelinePhase

* Update version and readme
  • Loading branch information
OptimumCode authored Sep 28, 2023
1 parent 84164cd commit 1e29815
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 27 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Report data provider (5.9.1)
# Report data provider (5.9.2)

# Overview
This component serves as a backend for rpt-viewer. It will connect to the cassandra database via cradle api and expose the data stored in there as REST resources.
Expand Down Expand Up @@ -295,6 +295,9 @@ spec:

# Release notes

## 5.9.2
+ Fix problem with accumulating all batches in memory when provider loads messages by group

## 5.9.1
+ Migrated to the cradle version with fixed load pages where `removed` field is null problem.
+ Updated cradle: `5.1.4-dev`
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@

kotlin.code.style=official

release_version=5.9.1
release_version=5.9.2

docker_image_name=
40 changes: 21 additions & 19 deletions src/main/kotlin/com/exactpro/th2/rptdataprovider/Extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,27 @@ fun MessageFilter.convertToString(): String {
"order=${filter.order}"
}

fun MessageFilter.toGroupedMessageFilter(group: String?): GroupedMessageFilter = GroupedMessageFilter.builder().also { builder ->
builder.bookId(bookId)
.pageId(pageId)
.groupName(group)
.order(order)

when(timestampFrom?.operation) {
null -> { /* do noting */ }
ComparisonOperation.GREATER -> builder.timestampFrom().isGreaterThan(timestampFrom.value)
ComparisonOperation.GREATER_OR_EQUALS -> builder.timestampFrom().isGreaterThanOrEqualTo(timestampFrom.value)
else -> error("The '${timestampFrom.operation}' operation isn't supported")
}
when(timestampTo?.operation) {
null -> { /* do noting */ }
ComparisonOperation.LESS -> builder.timestampTo().isLessThan(timestampTo.value)
ComparisonOperation.LESS_OR_EQUALS -> builder.timestampTo().isLessThanOrEqualTo(timestampTo.value)
else -> error("The '${timestampTo.operation}' operation isn't supported")
}
}.build()
fun MessageFilter.toGroupedMessageFilter(group: String): GroupedMessageFilter =
GroupedMessageFilter.builder().also { builder ->
logger.debug { "Creating group filter from ${this.convertToString()}" }
builder.bookId(bookId)
.pageId(pageId)
.groupName(group)
.order(order)

when (timestampFrom?.operation) {
null -> { /* do noting */ }
ComparisonOperation.GREATER -> builder.timestampFrom().isGreaterThan(timestampFrom.value)
ComparisonOperation.GREATER_OR_EQUALS -> builder.timestampFrom().isGreaterThanOrEqualTo(timestampFrom.value)
else -> error("The '${timestampFrom.operation}' operation isn't supported")
}
when (timestampTo?.operation) {
null -> { /* do noting */ }
ComparisonOperation.LESS -> builder.timestampTo().isLessThan(timestampTo.value)
ComparisonOperation.LESS_OR_EQUALS -> builder.timestampTo().isLessThanOrEqualTo(timestampTo.value)
else -> error("The '${timestampTo.operation}' operation isn't supported")
}
}.build()

suspend fun <T> logTime(methodName: String, lambda: suspend () -> T): T? {
var result: T?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class HttpServer<B, G, RM, PM>(

if (timeout <= 0) return

pipeline.intercept(ApplicationCallPipeline.Features) {
pipeline.intercept(ApplicationCallPipeline.Plugins) {
if (excludes.any { call.request.uri.contains(it) }) return@intercept
withTimeout(timeout) {
proceed()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ open class CradleService(configuration: Configuration, cradleManager: CradleMana
logTime("getMessagesBatches (filter=${filter.convertToString()})") {
getMessageBatches(filter)
}
} ?: listOf())
} ?: emptySequence())
.let { iterable ->
Channel<StoredMessageBatch>(1)
.also { channel ->
Expand Down Expand Up @@ -181,7 +181,7 @@ open class CradleService(configuration: Configuration, cradleManager: CradleMana

protected open suspend fun getMessageBatches(
filter: MessageFilter
): Iterable<StoredMessageBatch> = storage.getMessageBatchesAsync(filter).await().asIterable()
): Sequence<StoredMessageBatch> = storage.getMessageBatchesAsync(filter).await().asSequence()

protected open suspend fun getMessageBatches(
id: StoredMessageId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ class MessageGroupCradleService(

override suspend fun getMessageBatches(
filter: MessageFilter
): Iterable<StoredMessageBatch> =
): Sequence<StoredMessageBatch> =
getSessionGroupSuspend(filter)?.let { group ->
val groupedMessageFilter = filter.toGroupedMessageFilter(group).also {
K_LOGGER.debug { "Start searching group batches by $it" }
}
storage.getGroupedMessageBatchesAsync(groupedMessageFilter).await().asIterable()
storage.getGroupedMessageBatchesAsync(groupedMessageFilter).await().asSequence()
.mapNotNull { batch ->
val messages = batch.messages.filter { message ->
filter.sessionAlias == message.sessionAlias
Expand All @@ -130,7 +130,7 @@ class MessageGroupCradleService(
}
}

} ?: listOf()
} ?: emptySequence()

override suspend fun getMessageBatches(
id: StoredMessageId
Expand Down

0 comments on commit 1e29815

Please sign in to comment.