Skip to content

Commit

Permalink
[TH2-4316] Add information about start point for rule work
Browse files Browse the repository at this point in the history
  • Loading branch information
OptimumCode committed May 31, 2023
1 parent 2dc6311 commit 993d35b
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 42 deletions.
9 changes: 1 addition & 8 deletions src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ForkJoinPool
import com.exactpro.th2.common.grpc.Checkpoint as GrpcCheckpoint
import com.exactpro.th2.check1.utils.toMessageID
import com.exactpro.th2.common.message.toJson
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
Expand Down Expand Up @@ -325,14 +326,6 @@ class CollectorService(
return checkNotNull(messageRouter.subscribeAll(listener)) { "Can not subscribe to queues" }
}

private fun SessionKey.toMessageID(sequence: Long) = MessageID.newBuilder()
.setConnectionId(ConnectionID.newBuilder()
.setSessionAlias(sessionAlias)
.build())
.setSequence(sequence)
.setDirection(direction)
.build()

private fun publishCheckpoint(request: CheckpointRequestOrBuilder, checkpoint: Checkpoint, event: Event) {
if (!request.hasParentEventId()) {
if (logger.isWarnEnabled) {
Expand Down
39 changes: 33 additions & 6 deletions src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import com.exactpro.th2.check1.utils.toMessageID
import com.exactpro.th2.common.event.EventUtils.createMessageBean
import com.exactpro.th2.common.util.toInstant

/**
Expand Down Expand Up @@ -153,7 +155,7 @@ abstract class AbstractCheckTask(
@Volatile
protected var started = false

protected fun createRootEvent() = Event.from(submitTime).description(description)
protected fun createRootEvent(): Event = Event.from(submitTime).description(description)

final override fun onStart() {
super.onStart()
Expand Down Expand Up @@ -312,6 +314,7 @@ abstract class AbstractCheckTask(
this.checkpointTimeout = calculateCheckpointTimeout(checkpointTimestamp, taskTimeout.messageTimeout)
this.isDefaultSequence = sequence == DEFAULT_SEQUENCE
val scheduler = Schedulers.from(executorService)
addStartInfo(refs.rootEvent.addSubEventWithSamePeriod(), sequence, checkpointTimestamp)

endFuture = Single.timer(taskTimeout.timeout, MILLISECONDS, Schedulers.computation())
.subscribe { _ -> end(State.TIMEOUT, "Timeout is exited") }
Expand Down Expand Up @@ -353,6 +356,30 @@ abstract class AbstractCheckTask(
}
}

private fun addStartInfo(event: Event, lastSequence: Long, checkpointTimestamp: Timestamp?) {
with(event) {
name(
if (lastSequence == DEFAULT_SEQUENCE) {
"Rule works from the beginning of the cache"
} else {
"Rule works from the $lastSequence sequence in session ${sessionKey.sessionAlias} and direction ${sessionKey.direction}"
}
)
status(PASSED)
type("ruleStartPoint")
if (lastSequence != DEFAULT_SEQUENCE) {
messageID(sessionKey.toMessageID(lastSequence))
}
bodyData(createMessageBean("The rule starts working from " +
(if (lastSequence == DEFAULT_SEQUENCE) "start of cache" else "sequence $lastSequence") +
(checkpointTimestamp?.let {
val instant = checkpointTimestamp.toInstant()
" and expects messages between $instant and ${instant.plusMillis(taskTimeout.messageTimeout)}"
} ?: "")))
bodyData(createMessageBean("Rule timeout is set to ${taskTimeout.timeout} mls"))
}
}

private fun taskFinished() {
var ruleEventStatus = EventStatus.FAILED
try {
Expand All @@ -372,8 +399,8 @@ abstract class AbstractCheckTask(
.name("Check rule $description problem")
.type("Exception")
.status(FAILED)
.bodyData(EventUtils.createMessageBean(message))
.bodyData(EventUtils.createMessageBean(ex.message))
.bodyData(createMessageBean(message))
.bodyData(createMessageBean(ex.message))
.toProto(parentEventID))
.build())
} finally {
Expand Down Expand Up @@ -489,8 +516,8 @@ abstract class AbstractCheckTask(
refs.rootEvent.addSubEventWithSamePeriod()
.name("Check result event cannot build completely")
.type("eventNotComplete")
.bodyData(EventUtils.createMessageBean("An unexpected exception has been thrown during result check build"))
.bodyData(EventUtils.createMessageBean(e.message))
.bodyData(createMessageBean("An unexpected exception has been thrown during result check build"))
.bodyData(createMessageBean(e.message))
.status(FAILED)
true
}
Expand All @@ -507,7 +534,7 @@ abstract class AbstractCheckTask(
}
).name("Check task was interrupter because of ${timeoutType.name.lowercase()}")
.bodyData(
EventUtils.createMessageBean(
createMessageBean(
when (timeoutType) {
State.TIMEOUT ->
"Check task was interrupted because the task execution took longer than ${taskTimeout.timeout} mls. " +
Expand Down
26 changes: 26 additions & 0 deletions src/main/kotlin/com/exactpro/th2/check1/utils/SessionUtils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.exactpro.th2.check1.utils

import com.exactpro.th2.check1.SessionKey
import com.exactpro.th2.common.grpc.ConnectionID
import com.exactpro.th2.common.grpc.MessageID

fun SessionKey.toMessageID(sequence: Long): MessageID = MessageID.newBuilder()
.setConnectionId(
ConnectionID.newBuilder()
.setSessionAlias(sessionAlias)
.build())
.setSequence(sequence)
.setDirection(direction)
.build()
26 changes: 13 additions & 13 deletions src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ class TestChain: AbstractCheckTaskTest() {

val task = sequenceCheckRuleTask(listOf(3, 4), eventID, streams).also { it.begin() }
var eventList = awaitEventBatchAndGetEvents(6, 6)
assertEquals(8, eventList.size)
assertEquals(4, eventList.filter { it.status == SUCCESS }.size)
assertEquals(9, eventList.size)
assertEquals(5, eventList.filter { it.status == SUCCESS }.size)
assertEquals(4, eventList.filter { it.status == FAILED }.size)

sequenceCheckRuleTask(listOf(1, 2), eventID, streams).also { task.subscribeNextTask(it) }
Expand All @@ -110,8 +110,8 @@ class TestChain: AbstractCheckTaskTest() {

val task = sequenceCheckRuleTask(listOf(1, 4), eventID, streams).also { it.begin() }
var eventList = awaitEventBatchAndGetEvents(6, 6)
assertEquals(9, eventList.size)
assertEquals(5, eventList.filter { it.status == SUCCESS }.size)
assertEquals(10, eventList.size)
assertEquals(6, eventList.filter { it.status == SUCCESS }.size)
assertEquals(4, eventList.filter { it.status == FAILED }.size)

sequenceCheckRuleTask(listOf(2, 3), eventID, streams).also { task.subscribeNextTask(it) }
Expand All @@ -134,8 +134,8 @@ class TestChain: AbstractCheckTaskTest() {
task.begin()

val eventList = awaitEventBatchRequest(1000L, 4 * 2).flatMap(EventBatch::getEventsList)
assertEquals(4 * 3, eventList.size)
assertEquals(4 * 3, eventList.filter { it.status == SUCCESS }.size)
assertEquals(4 * 4, eventList.size)
assertEquals(4 * 4, eventList.filter { it.status == SUCCESS }.size)
assertEquals(listOf(1L, 2L, 3L, 4L), eventList.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList).map(MessageID::getSequence))
}

Expand Down Expand Up @@ -240,7 +240,7 @@ class TestChain: AbstractCheckTaskTest() {
val rootEvent = eventsList.first()
assertEquals(FAILED, rootEvent.status)
assertEquals(3, rootEvent.attachedMessageIdsCount)
assertEquals(1, eventsList[2].attachedMessageIdsCount)
assertEquals(1, eventsList.single { it.type == "Verification" }.attachedMessageIdsCount)
assertEquals(FAILED, eventsList.last().status)
})
}
Expand All @@ -250,20 +250,20 @@ class TestChain: AbstractCheckTaskTest() {
awaitEventBatchRequest(1000L, times).drop(times - last).flatMap(EventBatch::getEventsList)

private fun checkSimpleVerifySuccess(eventList: List<Event>, sequence: Long) {
assertEquals(3, eventList.size)
assertEquals(3, eventList.filter { it.status == SUCCESS }.size)
assertEquals(4, eventList.size)
assertEquals(4, eventList.filter { it.status == SUCCESS }.size)
assertEquals(listOf(sequence), eventList.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList).map(MessageID::getSequence))
}

private fun checkSimpleVerifyFailure(eventList: List<Event>) {
assertEquals(3, eventList.size)
assertEquals(1, eventList.filter { it.status == SUCCESS }.size)
assertEquals(4, eventList.size)
assertEquals(2, eventList.filter { it.status == SUCCESS }.size)
assertEquals(2, eventList.filter { it.status == FAILED }.size)
}

private fun checkSequenceVerifySuccess(eventList: List<Event>, sequences: List<Long>) {
assertEquals(8, eventList.size)
assertEquals(8, eventList.filter { it.status == SUCCESS }.size)
assertEquals(9, eventList.size)
assertEquals(9, eventList.filter { it.status == SUCCESS }.size)
assertEquals(sequences, eventList
.dropWhile { it.type != CHECK_MESSAGES_TYPE } // Skip prefilter
.filter { it.type == VERIFICATION_TYPE }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {
val eventBatches = awaitEventBatchRequest(1000L, 2)
val eventList = eventBatches.flatMap(EventBatch::getEventsList)

assertEquals(4, eventList.size)
assertEquals(5, eventList.size)
assertTrue(eventList.all { it.status == SUCCESS })

verify(onTaskFinishedMock, timeout(1000).only()).invoke(SUCCESS)
Expand Down Expand Up @@ -171,9 +171,9 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {
val task = checkTask(filter, eventID, streams, 200)
task.begin()

val eventBatches = awaitEventBatchRequest(1000L, 3)
val eventBatches = awaitEventBatchRequest(1000L, 4)
val eventList = eventBatches.flatMap(EventBatch::getEventsList)
assertEquals(4, eventList.size)
assertEquals(5, eventList.size)
assertEquals(2, eventList.filter { it.status == FAILED }.size) // Message filter and verification exceed max event batch content size
}

Expand All @@ -199,7 +199,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {

val eventBatch = awaitEventBatchRequest(1000L, 2)
val eventList = eventBatch.flatMap(EventBatch::getEventsList)
assertEquals(4, eventList.size)
assertEquals(5, eventList.size)
assertTrue({
eventList.none { it.status == FAILED }
}) {
Expand Down Expand Up @@ -312,7 +312,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {
val eventBatches = awaitEventBatchRequest(1000L, 2)
val eventList = eventBatches.flatMap(EventBatch::getEventsList)
assertAll({
assertEquals(3, eventList.size)
assertEquals(4, eventList.size)
}, {
val verificationEvent = eventList.find { it.type == "Verification" }
assertNotNull(verificationEvent) { "Missed verification event" }
Expand Down Expand Up @@ -388,7 +388,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {
val eventBatches = awaitEventBatchRequest(1000L, 2)
val eventList = eventBatches.flatMap(EventBatch::getEventsList)
assertAll({
assertEquals(3, eventList.size)
assertEquals(4, eventList.size)
}, {
val verificationEvent = eventList.find { it.type == "Verification" }
assertNotNull(verificationEvent) { "Missed verification event" }
Expand Down Expand Up @@ -430,7 +430,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {
val eventBatches = awaitEventBatchRequest(1000L, 2)
val eventList = eventBatches.flatMap(EventBatch::getEventsList)
assertAll({
assertEquals(3, eventList.size)
assertEquals(4, eventList.size)
}, {
val verificationEvent = eventList.find { it.type == "Verification" }
assertNotNull(verificationEvent) { "Missed verification event" }
Expand Down Expand Up @@ -471,8 +471,8 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {

val eventBatches = awaitEventBatchRequest(1000L, 2)
val eventList = eventBatches.flatMap(EventBatch::getEventsList)
assertEquals(4, eventList.size)
assertEquals(4, eventList.filter { it.status == SUCCESS }.size)
assertEquals(5, eventList.size)
assertEquals(5, eventList.filter { it.status == SUCCESS }.size)
}

@Test
Expand Down Expand Up @@ -500,7 +500,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {

val eventBatches = awaitEventBatchRequest(1000L, 2)
val eventList = eventBatches.flatMap(EventBatch::getEventsList)
assertEquals(5, eventList.size)
assertEquals(6, eventList.size)
assertEquals(2, eventList.filter { it.status == SUCCESS && it.type == "Verification" }.size)
assertEquals(FAILED, eventList.last().status)
}
Expand Down Expand Up @@ -538,7 +538,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {

val eventBatches = awaitEventBatchRequest(1000L, 2)
val eventList = eventBatches.flatMap(EventBatch::getEventsList)
assertEquals(4, eventList.size)
assertEquals(5, eventList.size)
assertEquals(3, eventList.filter { it.status == FAILED }.size)
}

Expand Down Expand Up @@ -595,7 +595,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {
val eventBatches = awaitEventBatchRequest(1000L, 2)
val eventList = eventBatches.flatMap(EventBatch::getEventsList)
assertAll({
assertEquals(3, eventList.size)
assertEquals(4, eventList.size)
}, {
val verificationEvent = eventList.find { it.type == "Verification" }
assertNotNull(verificationEvent) { "Missed verification event" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ import java.util.stream.Stream
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
import com.exactpro.th2.common.message.toJson
import org.junit.jupiter.api.Assertions

class TestSequenceCheckTask : AbstractCheckTaskTest() {

Expand Down Expand Up @@ -128,6 +130,7 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() {

/*
checkSequenceRule
ruleStartPoint
preFiltering
Verification x 3
checkMessages
Expand Down Expand Up @@ -156,8 +159,13 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() {
assertTrue (getEventsList().all { VERIFICATION_TYPE == it.type })
}
with(batchRequest[5]) {
assertEquals(1, eventsCount)
assertEquals("checkSequence", getEvents(0).type)
assertEquals(2, eventsCount)
Assertions.assertEquals(1, eventsList.count { it.type == "checkSequence" }) {
"unexpected count of \"checkSequence\" events: ${eventsList.joinToString { it.toJson() }}"
}
Assertions.assertEquals(1, eventsList.count { it.type == "ruleStartPoint" }) {
"unexpected count of \"ruleStartPoint\" events: ${eventsList.joinToString { it.toJson() }}"
}
}
}, {
val checkedMessages = assertNotNull(eventsList.find { it.type == CHECK_MESSAGES_TYPE }, "Cannot find checkMessages event")
Expand Down Expand Up @@ -767,7 +775,7 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() {
val eventBatches = awaitEventBatchRequest(1000L, 2)
val eventList = eventBatches.flatMap(EventBatch::getEventsList)
assertAll({
assertEquals(3, eventList.size)
assertEquals(4, eventList.size)
assertEquals(1, eventList.filter { it.type == "internalError" }.size)
})
}
Expand Down

0 comments on commit 993d35b

Please sign in to comment.