Skip to content

Commit

Permalink
Correct information in events and get rid of tuples (#205)
Browse files Browse the repository at this point in the history
* Correct information in events and get rid of tuples

* Correct flaky test
  • Loading branch information
OptimumCode authored Jan 16, 2024
1 parent 17e6ef9 commit 35eefda
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
18 changes: 13 additions & 5 deletions src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import com.exactpro.th2.common.grpc.MessageID
import com.exactpro.th2.common.message.toJson
import com.exactpro.th2.common.schema.message.DeliveryMetadata
import com.exactpro.th2.common.grpc.RequestStatus
import com.exactpro.th2.common.schema.message.MessageListener
import com.exactpro.th2.common.schema.message.MessageRouter
import com.exactpro.th2.common.schema.message.SubscriberMonitor
import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch
Expand All @@ -58,7 +57,6 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ForkJoinPool
import com.exactpro.th2.common.grpc.Checkpoint as GrpcCheckpoint
import com.exactpro.th2.common.message.toJson
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicLong
Expand Down Expand Up @@ -138,13 +136,19 @@ class CollectorService(
ruleFactory = RuleFactory(configuration, streamObservable, eventBatchRouter)
}

private fun prepareStoringResults(storeResult: Boolean): Triple<Long, CompletableFuture<Pair<EventStatus, Instant>>?, ((EventStatus) -> Unit)> {
private data class StoringResult(
val id: Long,
val resultFuture: CompletableFuture<Pair<EventStatus, Instant>>?,
val onTaskFinished: (EventStatus) -> Unit,
)

private fun prepareStoringResults(storeResult: Boolean): StoringResult {
return if (storeResult) {
val future = CompletableFuture<Pair<EventStatus, Instant>>()
val ruleId = ruleIdCounter.incrementAndGet()
Triple(ruleId, future) { future.complete(it to Instant.now()) }
StoringResult(ruleId, future) { future.complete(it to Instant.now()) }
} else {
Triple(0L, null, AbstractCheckTask.EMPTY_STATUS_CONSUMER)
EMPTY_STORING_RESULT
}
}

Expand Down Expand Up @@ -410,4 +414,8 @@ class CollectorService(
.setSequence(data.sequence)
.setDirection(direction)
.build()

companion object {
private val EMPTY_STORING_RESULT = StoringResult(0L, null, AbstractCheckTask.EMPTY_STATUS_CONSUMER)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ abstract class AbstractCheckTask(

protected open class Refs(
val rootEvent: Event,
val onTaskFinished: ((EventStatus) -> Unit)
val onTaskFinished: (EventStatus) -> Unit
)

protected class RefsKeeper<T : Refs>(refs: T) {
Expand Down Expand Up @@ -396,7 +396,11 @@ abstract class AbstractCheckTask(
(if (lastSequence == DEFAULT_SEQUENCE) "start of cache" else "sequence $lastSequence") +
(if (checkpointTimestamp != null && !Timestamp.getDefaultInstance().equals(checkpointTimestamp)) {
val instant = checkpointTimestamp.toInstant()
" and expects messages between $instant and ${instant.plusMillis(taskTimeout.messageTimeout)}"
if (taskTimeout.messageTimeout > 0) {
" and expects messages between $instant and ${instant.plusMillis(taskTimeout.messageTimeout)}"
} else {
" and expects messages from $instant until rule is stopped by the timeout"
}
} else "")))
bodyData(createMessageBean("Rule timeout is set to ${taskTimeout.timeout} mls"))
}
Expand Down
9 changes: 8 additions & 1 deletion src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import com.exactpro.th2.common.grpc.ValueFilter
import com.exactpro.th2.common.utils.message.ProtoMessageHolder
import com.exactpro.th2.common.utils.message.TransportMessageHolder
import com.exactpro.th2.common.value.toValue
import com.google.protobuf.util.Timestamps
import io.reactivex.Observable
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
Expand Down Expand Up @@ -145,10 +146,16 @@ class TestChain : AbstractCheckTaskTest() {
val eventList = awaitEventBatchRequest(1000L, 4 * 2).flatMap(EventBatch::getEventsList)
assertEquals(4 * 4, eventList.size)
assertEquals(4 * 4, eventList.filter { it.status == SUCCESS }.size)
// Event publication is done in a separate thread,
// so, there is no guarantee that flatted event list will be in the right order.
// But we can sort events by timestamp and it should be enough
assertEquals(
listOf(1L, 2L, 3L, 4L),
eventList.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList)
eventList.asSequence()
.sortedWith(compareBy(Timestamps.comparator()) { it.id.startTimestamp })
.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList)
.map(MessageID::getSequence)
.toList()
)
}

Expand Down

0 comments on commit 35eefda

Please sign in to comment.