From 3222ec42ed37c13e4cb58556f4f3a854f0930e51 Mon Sep 17 00:00:00 2001 From: Oleg Smirnov Date: Fri, 24 Nov 2023 16:17:15 +0400 Subject: [PATCH] [TH2-4983] Remove redundant reversing the returned events chunk (#362) * Remove redundant reversing the returned events chunk * Update readme and version --- README.md | 6 +++- gradle.properties | 2 +- .../th2/rptdataprovider/Extensions.kt | 8 ++--- .../handlers/SearchEventsHandler.kt | 34 +++++++++++++------ .../producers/EventProducer.kt | 8 ++--- 5 files changed, 37 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 9536297a..86c889fb 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Report data provider (5.9.3) +# Report data provider (5.9.4) # Overview This component serves as a backend for rpt-viewer. It will connect to the cassandra database via cradle api and expose the data stored in there as REST resources. @@ -295,6 +295,10 @@ spec: # Release notes +## 5.9.4 + ++ Fix problem with incorrect events order when requesting search in previous direction + ## 5.9.3 + Enabled [Cassandra driver metrics](https://docs.datastax.com/en/developer/java-driver/4.10/manual/core/metrics/) diff --git a/gradle.properties b/gradle.properties index 667afe2f..de8c8757 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,6 +16,6 @@ kotlin.code.style=official -release_version=5.9.3 +release_version=5.9.4 docker_image_name= \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Extensions.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Extensions.kt index 865ca917..f27f860f 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/Extensions.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/Extensions.kt @@ -184,18 +184,18 @@ fun Instant.isAfterOrEqual(other: Instant): Boolean { return this.isAfter(other) || this == other } -fun StoredTestEventBatch.tryToGetTestEvents(parentEventId: StoredTestEventId? = null): Collection { +fun StoredTestEventBatch.tryToGetTestEvents(parentEventId: StoredTestEventId? = null): Sequence { return try { - this.testEvents?.let { events -> + this.testEvents?.asSequence()?.let { events -> if (parentEventId != null) { events.filter { it.parentId == parentEventId } } else { events } - }?: emptyList() + } ?: emptySequence() } catch (e: IOException) { logger.error(e) { "unexpected IO exception while trying to parse an event batch - contents were ignored" } - emptyList() + emptySequence() } } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt index 0334902f..398956d1 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt @@ -50,7 +50,6 @@ import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.buffer -import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.flatMapConcat import kotlinx.coroutines.flow.flow @@ -163,7 +162,7 @@ class SearchEventsHandler(context: Context<*, *, *, *>) { timestampFrom: Instant, timestampTo: Instant, ): List { - return wrappers.flatMap { entry -> + return wrappers.asSequence().flatMap { entry -> if (entry.isBatch) { val batch = entry.asBatch() batch.tryToGetTestEvents(request.parentEvent?.eventId).mapNotNull { event -> @@ -175,10 +174,24 @@ class SearchEventsHandler(context: Context<*, *, *, *>) { } } else { val single = entry.asSingle() - listOf(single to eventProducer.fromStoredEvent(single, null)) + sequenceOf(single to eventProducer.fromStoredEvent(single, null)) } }.let { eventTreesNodes -> - eventProducer.fromEventsProcessed(eventTreesNodes, request) + eventProducer.mutableFromEventsProcessed(eventTreesNodes, request) + }.also { events -> + // The sorting here is performed in order to avoid unexpected events order + // when several batches were added in the same chunk we process here + // Events between those batches might be unordered + events.sortWith( + Comparator.comparing(BaseEventEntity::startTimestamp) + .run { + if (request.searchDirection == AFTER) { + this + } else { + reversed() + } + } + ) } } @@ -197,19 +210,18 @@ class SearchEventsHandler(context: Context<*, *, *, *>) { getEventsSuspend(request, timestampFrom, timestampTo) .asSequence() .chunked(eventSearchChunkSize) + // Cradle suppose to return events in the right order depending on the order in the request + // We don't need to sort entities between each other. + // However, there still might be an issue with batches if it contains events for a long period of time + // For now just keep it in mind when you are investigating the reason + // some events returned in unexpected order for (event in eventsCollection) emit(event) } .map { wrappers -> async(parentContext) { prepareEvents(wrappers, request, timestampFrom, timestampTo) - .let { events -> - if (request.searchDirection == AFTER) { - events - } else { - events.reversed() - } - }.also { parentContext.ensureActive() } + .also { parentContext.ensureActive() } } }.buffer(BUFFERED) } diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/EventProducer.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/EventProducer.kt index fb03f7a0..6477689a 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/EventProducer.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/producers/EventProducer.kt @@ -124,10 +124,10 @@ class EventProducer(private val cradle: CradleService, private val mapper: Objec .toList() } - fun fromEventsProcessed( - events: List>, + fun mutableFromEventsProcessed( + events: Sequence>, request: SseEventSearchRequest - ): List { + ): MutableList { return events.let { if (!request.metadataOnly || request.filterPredicate.getSpecialTypes().contains(NEED_BODY)) { @@ -148,7 +148,7 @@ class EventProducer(private val cradle: CradleService, private val mapper: Objec } } else { it.map { (_, event) -> event } - } + }.toMutableList() } }