Skip to content

Commit

Permalink
[TH2-4983] Remove redundant reversing the returned events chunk (#362)
Browse files Browse the repository at this point in the history
* Remove redundant reversing the returned events chunk

* Update readme and version
  • Loading branch information
OptimumCode authored Nov 24, 2023
1 parent 3a47735 commit 3222ec4
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 21 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Report data provider (5.9.3)
# Report data provider (5.9.4)

# Overview
This component serves as a backend for rpt-viewer. It will connect to the cassandra database via cradle api and expose the data stored in there as REST resources.
Expand Down Expand Up @@ -295,6 +295,10 @@ spec:

# Release notes

## 5.9.4

+ Fix problem with incorrect events order when requesting search in previous direction

## 5.9.3
+ Enabled [Cassandra driver metrics](https://docs.datastax.com/en/developer/java-driver/4.10/manual/core/metrics/)

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@

kotlin.code.style=official

release_version=5.9.3
release_version=5.9.4

docker_image_name=
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,18 @@ fun Instant.isAfterOrEqual(other: Instant): Boolean {
return this.isAfter(other) || this == other
}

fun StoredTestEventBatch.tryToGetTestEvents(parentEventId: StoredTestEventId? = null): Collection<BatchedStoredTestEvent> {
fun StoredTestEventBatch.tryToGetTestEvents(parentEventId: StoredTestEventId? = null): Sequence<BatchedStoredTestEvent> {
return try {
this.testEvents?.let { events ->
this.testEvents?.asSequence()?.let { events ->
if (parentEventId != null) {
events.filter { it.parentId == parentEventId }
} else {
events
}
}?: emptyList()
} ?: emptySequence()
} catch (e: IOException) {
logger.error(e) { "unexpected IO exception while trying to parse an event batch - contents were ignored" }
emptyList()
emptySequence()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flow
Expand Down Expand Up @@ -163,7 +162,7 @@ class SearchEventsHandler(context: Context<*, *, *, *>) {
timestampFrom: Instant,
timestampTo: Instant,
): List<BaseEventEntity> {
return wrappers.flatMap { entry ->
return wrappers.asSequence().flatMap { entry ->
if (entry.isBatch) {
val batch = entry.asBatch()
batch.tryToGetTestEvents(request.parentEvent?.eventId).mapNotNull { event ->
Expand All @@ -175,10 +174,24 @@ class SearchEventsHandler(context: Context<*, *, *, *>) {
}
} else {
val single = entry.asSingle()
listOf(single to eventProducer.fromStoredEvent(single, null))
sequenceOf(single to eventProducer.fromStoredEvent(single, null))
}
}.let { eventTreesNodes ->
eventProducer.fromEventsProcessed(eventTreesNodes, request)
eventProducer.mutableFromEventsProcessed(eventTreesNodes, request)
}.also { events ->
// The sorting here is performed in order to avoid unexpected events order
// when several batches were added in the same chunk we process here
// Events between those batches might be unordered
events.sortWith(
Comparator.comparing(BaseEventEntity::startTimestamp)
.run {
if (request.searchDirection == AFTER) {
this
} else {
reversed()
}
}
)
}
}

Expand All @@ -197,19 +210,18 @@ class SearchEventsHandler(context: Context<*, *, *, *>) {
getEventsSuspend(request, timestampFrom, timestampTo)
.asSequence()
.chunked(eventSearchChunkSize)
// Cradle suppose to return events in the right order depending on the order in the request
// We don't need to sort entities between each other.
// However, there still might be an issue with batches if it contains events for a long period of time
// For now just keep it in mind when you are investigating the reason
// some events returned in unexpected order
for (event in eventsCollection)
emit(event)
}
.map { wrappers ->
async(parentContext) {
prepareEvents(wrappers, request, timestampFrom, timestampTo)
.let { events ->
if (request.searchDirection == AFTER) {
events
} else {
events.reversed()
}
}.also { parentContext.ensureActive() }
.also { parentContext.ensureActive() }
}
}.buffer(BUFFERED)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ class EventProducer(private val cradle: CradleService, private val mapper: Objec
.toList()
}

fun fromEventsProcessed(
events: List<Pair<TestEventSingle, BaseEventEntity>>,
fun mutableFromEventsProcessed(
events: Sequence<Pair<TestEventSingle, BaseEventEntity>>,
request: SseEventSearchRequest
): List<BaseEventEntity> {
): MutableList<BaseEventEntity> {

return events.let {
if (!request.metadataOnly || request.filterPredicate.getSpecialTypes().contains(NEED_BODY)) {
Expand All @@ -148,7 +148,7 @@ class EventProducer(private val cradle: CradleService, private val mapper: Objec
}
} else {
it.map { (_, event) -> event }
}
}.toMutableList()
}
}

Expand Down

0 comments on commit 3222ec4

Please sign in to comment.