Skip to content

Commit

Permalink
Fixed bug: GroupedMessageFilterBuilder was always given DIRECT order, regardless of the requested search direction
Browse files Browse the repository at this point in the history
Some refactoring of MessageExtractor and StreamMerger
  • Loading branch information
lumber1000 committed Nov 18, 2024
1 parent 0bdd05b commit 5c58bec
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.exactpro.th2.rptdataprovider.Context
import com.exactpro.th2.rptdataprovider.entities.internal.CommonStreamName
import com.exactpro.th2.rptdataprovider.entities.internal.EmptyPipelineObject
import com.exactpro.th2.rptdataprovider.entities.internal.PipelineRawBatch
import com.exactpro.th2.rptdataprovider.entities.internal.StreamPointer
import com.exactpro.th2.rptdataprovider.entities.requests.SseMessageSearchRequest
import com.exactpro.th2.rptdataprovider.entities.responses.StoredMessageBatchWrapper
import com.exactpro.th2.rptdataprovider.entities.sse.StreamWriter
Expand Down Expand Up @@ -72,6 +73,17 @@ class MessageExtractor<B, G, RM, PM>(
REVERSE -> Instant::isAfter
}

// Selects which resume pointer to start from when several are supplied for this
// stream: the pointer with the smallest timestamp when iterating DIRECT, the
// largest when iterating REVERSE. The extension reference is chosen once, at
// construction time, based on the requested order.
private val firstResumeId: Sequence<StreamPointer>.((StreamPointer) -> Instant) -> StreamPointer? =
when(order) {
DIRECT -> Sequence<StreamPointer>::minByOrNull
REVERSE -> Sequence<StreamPointer>::maxByOrNull
}

// Sentinel reported as lastTimestamp once the stream is exhausted: the extreme
// Instant in the direction of iteration, so downstream consumers treat this
// stream as past any requested range.
private val emptyStreamTimestamp: Instant =
if (order == DIRECT) Instant.MAX else Instant.MIN

init {
externalScope.launch {
try {
Expand All @@ -85,17 +97,10 @@ class MessageExtractor<B, G, RM, PM>(
}
}

// Messages of a stored batch in iteration order: natural storage order for
// DIRECT, reversed for REVERSE.
private val StoredGroupedMessageBatch.orderedMessages: Collection<StoredMessage>
get() = when (order) {
DIRECT -> messages
REVERSE -> messagesReverse
}

override suspend fun processMessage() {
coroutineScope {
launch {
while (this@coroutineScope.isActive) {

//FIXME: replace delay-based stream updates with synchronous updates from iterator
lastTimestamp?.also {
sendToChannel(EmptyPipelineObject(isStreamEmpty, lastElement, it).also { msg ->
Expand All @@ -108,12 +113,7 @@ class MessageExtractor<B, G, RM, PM>(

val resumeFromId = request.resumeFromIdsList.asSequence()
.filter { it.stream.name == commonStreamName.name }
.run {
when(request.searchDirection) {
BEFORE -> maxByOrNull { it.timestamp }
AFTER -> minByOrNull { it.timestamp }
}
}
.firstResumeId { it.timestamp }

LOGGER.debug { "acquiring cradle iterator for stream $commonStreamName" }

Expand All @@ -127,21 +127,26 @@ class MessageExtractor<B, G, RM, PM>(
request.startTimestamp,
request.endTimestamp,
)

val cradleMessageIterable = context.cradleService.getGroupedMessages(
this,
GroupedMessageFilterBuilder().apply {
bookId(commonStreamName.bookId)
groupName(sessionGroup)
order(order)

if (order == REVERSE) { // default: DIRECT
request.startTimestamp?.let { timestampTo().isLessThanOrEqualTo(it) }
request.endTimestamp?.let { timestampFrom().isGreaterThan(it) }
} else {
request.startTimestamp?.let { timestampFrom().isGreaterThanOrEqualTo(it) }
request.endTimestamp?.let { timestampTo().isLessThan(it) }
}
}.build()

GroupedMessageFilterBuilder()
.bookId(commonStreamName.bookId)
.groupName(sessionGroup)
.order(order)
.also { builder ->
when (order) {
DIRECT -> {
request.startTimestamp?.let { builder.timestampFrom().isGreaterThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampTo().isLessThan(it) }
}
REVERSE -> {
request.startTimestamp?.let { builder.timestampTo().isLessThanOrEqualTo(it) }
request.endTimestamp?.let { builder.timestampFrom().isGreaterThan(it) }
}
}
}.build()
)

LOGGER.debug { "cradle iterator has been built for session group: $sessionGroup, alias: $commonStreamName" }
Expand All @@ -164,9 +169,9 @@ class MessageExtractor<B, G, RM, PM>(
batch.messages.filterTo(filteredMessages) { it.sessionAlias == commonStreamName.name }
filteredMessages.sortBy { it.timestamp }

when (request.searchDirection) {
AFTER -> filteredMessages
BEFORE -> filteredMessages.asReversed()
when (order) {
DIRECT -> filteredMessages
REVERSE -> filteredMessages.asReversed()
}
}

Expand Down Expand Up @@ -199,8 +204,9 @@ class MessageExtractor<B, G, RM, PM>(
}

LOGGER.trace {
val firstMessage = batch.orderedMessages.firstOrNull()
val lastMessage = batch.orderedMessages.lastOrNull()
val messages = if (order == REVERSE) batch.messagesReverse else batch.messages
val firstMessage = messages.firstOrNull()
val lastMessage = messages.lastOrNull()
"batch ${batch.firstTimestamp} of group $sessionGroup for stream $commonStreamName has been trimmed (targetStartTimestamp=${request.startTimestamp} targetEndTimestamp=${request.endTimestamp} targetId=${resumeFromId?.sequence}) - ${trimmedMessages.size} of ${batch.messageCount} messages left (firstId=${firstMessage?.id?.sequence} firstTimestamp=${firstMessage?.timestamp} lastId=${lastMessage?.id?.sequence} lastTimestamp=${lastMessage?.timestamp})"
}

Expand Down Expand Up @@ -244,10 +250,7 @@ class MessageExtractor<B, G, RM, PM>(
}

isStreamEmpty = true
lastTimestamp = when (order) {
DIRECT -> Instant.MAX
REVERSE -> Instant.MIN
}
lastTimestamp = emptyStreamTimestamp

LOGGER.debug { "no more data for stream $commonStreamName (lastId=${lastElement.toString()} lastTimestamp=${lastTimestamp})" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ import com.exactpro.th2.rptdataprovider.handlers.PipelineStatus
import com.exactpro.th2.rptdataprovider.handlers.messages.helpers.MultipleStreamHolder
import com.exactpro.th2.rptdataprovider.isAfterOrEqual
import com.exactpro.th2.rptdataprovider.isBeforeOrEqual
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.isActive
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlinx.coroutines.delay
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import kotlin.coroutines.coroutineContext
Expand All @@ -49,30 +55,32 @@ class StreamMerger<B, G, RM, PM>(
) : PipelineComponent<B, G, RM, PM>(context, searchRequest, externalScope, messageFlowCapacity = messageFlowCapacity) {
private val messageStreams = MultipleStreamHolder(pipelineStreams)
private var resultCountLimit = searchRequest.resultCountLimit
private val processJob: Job

init {
processJob = externalScope.launch { processMessage() }
// Selection predicate used when merging streams: returns true when the first
// message should be emitted before the second. Forward search (AFTER) prefers
// the earlier lastScannedTime; backward search (BEFORE) prefers the later one.
private val messageFilterByTimestamp: (PipelineStepObject, PipelineStepObject) -> Boolean = when(searchRequest.searchDirection) {
TimeRelation.AFTER -> { msg1, msg2 -> msg1.lastScannedTime.isBefore(msg2.lastScannedTime) }
TimeRelation.BEFORE -> { msg1, msg2 -> msg2.lastScannedTime.isBefore(msg1.lastScannedTime) }
}

private fun timestampInRange(pipelineStepObject: PipelineStepObject): Boolean {
return pipelineStepObject.lastScannedTime.let { timestamp ->
if (searchRequest.searchDirection == TimeRelation.AFTER) {
searchRequest.endTimestamp == null || timestamp.isBeforeOrEqual(searchRequest.endTimestamp)
} else {
searchRequest.endTimestamp == null || timestamp.isAfterOrEqual(searchRequest.endTimestamp)
}
}
// Direction-aware end-bound check: when searching forward (AFTER) a timestamp
// is in range while it is before or equal to the bound; when searching
// backward (BEFORE) while it is after or equal to it. Used by inTimeRange.
private val timestampFilter = when (searchRequest.searchDirection) {
TimeRelation.AFTER -> Instant::isBeforeOrEqual
TimeRelation.BEFORE -> Instant::isAfterOrEqual
}

private fun inTimeRange(pipelineStepObject: PipelineStepObject): Boolean {
return if (pipelineStepObject !is EmptyPipelineObject) {
timestampInRange(pipelineStepObject)
} else {
true
}
// Checks that message sequence numbers advance in the requested direction:
// strictly increasing for a forward (AFTER) search, strictly decreasing for a
// backward (BEFORE) search.
private val sequenceFilter: (Long, Long) -> Boolean =
when (searchRequest.searchDirection) {
TimeRelation.AFTER -> { current, prev -> prev < current }
TimeRelation.BEFORE -> { current, prev -> prev > current }
}

// Merge loop launched for the lifetime of the component; getStreamsInfo()
// joins this job to wait for the merge to finish.
private val processJob: Job
init {
processJob = externalScope.launch { processMessage() }
}

// A pipeline object is considered in range when it is a keep-alive placeholder
// (EmptyPipelineObject), when no end bound was requested, or when its
// lastScannedTime satisfies the direction-aware end-bound check.
private fun inTimeRange(pipelineStepObject: PipelineStepObject): Boolean =
pipelineStepObject is EmptyPipelineObject
|| searchRequest.endTimestamp == null
|| timestampFilter(pipelineStepObject.lastScannedTime, searchRequest.endTimestamp)

private suspend fun keepAliveGenerator() {
while (coroutineContext.isActive) {
val scannedObjectCount = messageStreams.getScannedObjectCount()
Expand Down Expand Up @@ -140,36 +148,11 @@ class StreamMerger<B, G, RM, PM>(
}
}

private fun isLess(firstMessage: PipelineStepObject, secondMessage: PipelineStepObject): Boolean {
return firstMessage.lastScannedTime.isBefore(secondMessage.lastScannedTime)
}

private fun isGreater(firstMessage: PipelineStepObject, secondMessage: PipelineStepObject): Boolean {
return firstMessage.lastScannedTime.isAfter(secondMessage.lastScannedTime)
}

private suspend fun getNextMessage(): PipelineStepObject {
return coroutineScope {

val streams =
if (LOGGER.isTraceEnabled())
messageStreams.getLoggingStreamInfo()
else null

let {
if (searchRequest.searchDirection == TimeRelation.AFTER) {
messageStreams.selectMessage { new, old ->
isLess(new, old)
}
} else {
messageStreams.selectMessage { new, old ->
isGreater(new, old)
}
}
}.also {
LOGGER.trace {
"selected ${it.lastProcessedId} - ${it.javaClass.kotlin}-${it.javaClass.hashCode()} ${it.lastScannedTime} out of [${streams}]"
}
// Picks the next message to emit across all merged streams using the
// direction-aware selection predicate. Stream state for the trace message is
// captured only when trace logging is enabled, to avoid the formatting cost.
private suspend fun getNextMessage(): PipelineStepObject = coroutineScope {
val streams = if (LOGGER.isTraceEnabled()) messageStreams.getLoggingStreamInfo() else null
messageStreams.selectMessage(messageFilterByTimestamp).also {
LOGGER.trace {
"selected ${it.lastProcessedId} - ${it.javaClass.kotlin}-${it.javaClass.hashCode()} ${it.lastScannedTime} out of [${streams}]"
}
}
}
Expand All @@ -178,13 +161,7 @@ class StreamMerger<B, G, RM, PM>(
LOGGER.info { "Getting streams info" }
processJob.join()
LOGGER.debug { "Merge job is finished" }
return messageStreams.getStreamsInfo { current, prev ->
if (searchRequest.searchDirection == TimeRelation.AFTER) {
current > prev
} else {
current < prev
}
}
return messageStreams.getStreamsInfo(sequenceFilter)
}

companion object {
Expand Down

0 comments on commit 5c58bec

Please sign in to comment.