diff --git a/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt b/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt index 697739d2..58b09fa0 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt @@ -51,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CompletableFuture import java.util.concurrent.ForkJoinPool import com.exactpro.th2.common.grpc.Checkpoint as GrpcCheckpoint +import com.exactpro.th2.check1.utils.toMessageID import com.exactpro.th2.common.message.toJson import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException @@ -325,14 +326,6 @@ class CollectorService( return checkNotNull(messageRouter.subscribeAll(listener)) { "Can not subscribe to queues" } } - private fun SessionKey.toMessageID(sequence: Long) = MessageID.newBuilder() - .setConnectionId(ConnectionID.newBuilder() - .setSessionAlias(sessionAlias) - .build()) - .setSequence(sequence) - .setDirection(direction) - .build() - private fun publishCheckpoint(request: CheckpointRequestOrBuilder, checkpoint: Checkpoint, event: Event) { if (!request.hasParentEventId()) { if (logger.isWarnEnabled) { diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt index e3a300e4..d464ddf0 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt @@ -70,6 +70,8 @@ import java.util.concurrent.ForkJoinPool import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import com.exactpro.th2.check1.utils.toMessageID +import com.exactpro.th2.common.event.EventUtils.createMessageBean import com.exactpro.th2.common.util.toInstant /** @@ -153,7 +155,7 @@ abstract class AbstractCheckTask( @Volatile protected var started = false - protected fun createRootEvent() = Event.from(submitTime).description(description) + protected fun createRootEvent(): Event = Event.from(submitTime).description(description) final override fun onStart() { super.onStart() @@ -312,6 +314,7 @@ abstract class AbstractCheckTask( this.checkpointTimeout = calculateCheckpointTimeout(checkpointTimestamp, taskTimeout.messageTimeout) this.isDefaultSequence = sequence == DEFAULT_SEQUENCE val scheduler = Schedulers.from(executorService) + addStartInfo(refs.rootEvent.addSubEventWithSamePeriod(), sequence, checkpointTimestamp) endFuture = Single.timer(taskTimeout.timeout, MILLISECONDS, Schedulers.computation()) .subscribe { _ -> end(State.TIMEOUT, "Timeout is exited") } @@ -353,6 +356,30 @@ abstract class AbstractCheckTask( } } + private fun addStartInfo(event: Event, lastSequence: Long, checkpointTimestamp: Timestamp?) { + with(event) { + name( + if (lastSequence == DEFAULT_SEQUENCE) { + "Rule works from the beginning of the cache" + } else { + "Rule works from the $lastSequence sequence in session ${sessionKey.sessionAlias} and direction ${sessionKey.direction}" + } + ) + status(PASSED) + type("ruleStartPoint") + if (lastSequence != DEFAULT_SEQUENCE) { + messageID(sessionKey.toMessageID(lastSequence)) + } + bodyData(createMessageBean("The rule starts working from " + + (if (lastSequence == DEFAULT_SEQUENCE) "start of cache" else "sequence $lastSequence") + + (checkpointTimestamp?.let { + val instant = checkpointTimestamp.toInstant() + " and expects messages between $instant and ${instant.plusMillis(taskTimeout.messageTimeout)}" + } ?: ""))) + bodyData(createMessageBean("Rule timeout is set to ${taskTimeout.timeout} mls")) + } + } + private fun taskFinished() { var ruleEventStatus = EventStatus.FAILED try { @@ -372,8 +399,8 @@ abstract class AbstractCheckTask( .name("Check rule $description problem") .type("Exception") .status(FAILED) - .bodyData(EventUtils.createMessageBean(message)) - .bodyData(EventUtils.createMessageBean(ex.message)) + .bodyData(createMessageBean(message)) + .bodyData(createMessageBean(ex.message)) .toProto(parentEventID)) .build()) } finally { @@ -489,8 +516,8 @@ abstract class AbstractCheckTask( refs.rootEvent.addSubEventWithSamePeriod() .name("Check result event cannot build completely") .type("eventNotComplete") - .bodyData(EventUtils.createMessageBean("An unexpected exception has been thrown during result check build")) - .bodyData(EventUtils.createMessageBean(e.message)) + .bodyData(createMessageBean("An unexpected exception has been thrown during result check build")) + .bodyData(createMessageBean(e.message)) .status(FAILED) true } @@ -507,7 +534,7 @@ abstract class AbstractCheckTask( } ).name("Check task was interrupter because of ${timeoutType.name.lowercase()}") .bodyData( - EventUtils.createMessageBean( + createMessageBean( when (timeoutType) { State.TIMEOUT -> "Check task was interrupted because the task execution took longer than ${taskTimeout.timeout} mls. " + diff --git a/src/main/kotlin/com/exactpro/th2/check1/utils/SessionUtils.kt b/src/main/kotlin/com/exactpro/th2/check1/utils/SessionUtils.kt new file mode 100644 index 00000000..194f2935 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/check1/utils/SessionUtils.kt @@ -0,0 +1,26 @@ +/* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.check1.utils + +import com.exactpro.th2.check1.SessionKey +import com.exactpro.th2.common.grpc.ConnectionID +import com.exactpro.th2.common.grpc.MessageID + +fun SessionKey.toMessageID(sequence: Long): MessageID = MessageID.newBuilder() + .setConnectionId( + ConnectionID.newBuilder() + .setSessionAlias(sessionAlias) + .build()) + .setSequence(sequence) + .setDirection(direction) + .build() \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt index e1340400..6b2482c8 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt @@ -95,8 +95,8 @@ class TestChain: AbstractCheckTaskTest() { val task = sequenceCheckRuleTask(listOf(3, 4), eventID, streams).also { it.begin() } var eventList = awaitEventBatchAndGetEvents(6, 6) - assertEquals(8, eventList.size) - assertEquals(4, eventList.filter { it.status == SUCCESS }.size) + assertEquals(9, eventList.size) + assertEquals(5, eventList.filter { it.status == SUCCESS }.size) assertEquals(4, eventList.filter { it.status == FAILED }.size) sequenceCheckRuleTask(listOf(1, 2), eventID, streams).also { task.subscribeNextTask(it) } @@ -110,8 +110,8 @@ class TestChain: AbstractCheckTaskTest() { val task = sequenceCheckRuleTask(listOf(1, 4), eventID, streams).also { it.begin() } var eventList = awaitEventBatchAndGetEvents(6, 6) - assertEquals(9, eventList.size) - assertEquals(5, eventList.filter { it.status == SUCCESS }.size) + assertEquals(10, eventList.size) + assertEquals(6, eventList.filter { it.status == SUCCESS }.size) assertEquals(4, eventList.filter { it.status == FAILED }.size) sequenceCheckRuleTask(listOf(2, 3), eventID, streams).also { task.subscribeNextTask(it) } @@ -134,8 +134,8 @@ class TestChain: AbstractCheckTaskTest() { task.begin() val eventList = awaitEventBatchRequest(1000L, 4 * 2).flatMap(EventBatch::getEventsList) - assertEquals(4 * 3, eventList.size) - assertEquals(4 * 3, eventList.filter { it.status == SUCCESS }.size) + assertEquals(4 * 4, eventList.size) + assertEquals(4 * 4, eventList.filter { it.status == SUCCESS }.size) assertEquals(listOf(1L, 2L, 3L, 4L), eventList.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList).map(MessageID::getSequence)) } @@ -240,7 +240,7 @@ class TestChain: AbstractCheckTaskTest() { val rootEvent = eventsList.first() assertEquals(FAILED, rootEvent.status) assertEquals(3, rootEvent.attachedMessageIdsCount) - assertEquals(1, eventsList[2].attachedMessageIdsCount) + assertEquals(1, eventsList.single { it.type == "Verification" }.attachedMessageIdsCount) assertEquals(FAILED, eventsList.last().status) }) } @@ -250,20 +250,20 @@ class TestChain: AbstractCheckTaskTest() { awaitEventBatchRequest(1000L, times).drop(times - last).flatMap(EventBatch::getEventsList) private fun checkSimpleVerifySuccess(eventList: List, sequence: Long) { - assertEquals(3, eventList.size) - assertEquals(3, eventList.filter { it.status == SUCCESS }.size) + assertEquals(4, eventList.size) + assertEquals(4, eventList.filter { it.status == SUCCESS }.size) assertEquals(listOf(sequence), eventList.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList).map(MessageID::getSequence)) } private fun checkSimpleVerifyFailure(eventList: List) { - assertEquals(3, eventList.size) - assertEquals(1, eventList.filter { it.status == SUCCESS }.size) + assertEquals(4, eventList.size) + assertEquals(2, eventList.filter { it.status == SUCCESS }.size) assertEquals(2, eventList.filter { it.status == FAILED }.size) } private fun checkSequenceVerifySuccess(eventList: List, sequences: List) { - assertEquals(8, eventList.size) - assertEquals(8, eventList.filter { it.status == SUCCESS }.size) + assertEquals(9, eventList.size) + assertEquals(9, eventList.filter { it.status == SUCCESS }.size) assertEquals(sequences, eventList .dropWhile { it.type != CHECK_MESSAGES_TYPE } // Skip prefilter .filter { it.type == VERIFICATION_TYPE } diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt index e8b3d79a..95a1e0fc 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt @@ -116,7 +116,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) + assertEquals(5, eventList.size) assertTrue(eventList.all { it.status == SUCCESS }) verify(onTaskFinishedMock, timeout(1000).only()).invoke(SUCCESS) @@ -171,9 +171,9 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val task = checkTask(filter, eventID, streams, 200) task.begin() - val eventBatches = awaitEventBatchRequest(1000L, 3) + val eventBatches = awaitEventBatchRequest(1000L, 4) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) + assertEquals(5, eventList.size) assertEquals(2, eventList.filter { it.status == FAILED }.size) // Message filter and verification exceed max event batch content size } @@ -199,7 +199,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatch = awaitEventBatchRequest(1000L, 2) val eventList = eventBatch.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) + assertEquals(5, eventList.size) assertTrue({ eventList.none { it.status == FAILED } }) { @@ -312,7 +312,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } @@ -388,7 +388,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } @@ -430,7 +430,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } @@ -471,8 +471,8 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) - assertEquals(4, eventList.filter { it.status == SUCCESS }.size) + assertEquals(5, eventList.size) + assertEquals(5, eventList.filter { it.status == SUCCESS }.size) } @Test @@ -500,7 +500,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(5, eventList.size) + assertEquals(6, eventList.size) assertEquals(2, eventList.filter { it.status == SUCCESS && it.type == "Verification" }.size) assertEquals(FAILED, eventList.last().status) } @@ -538,7 +538,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) + assertEquals(5, eventList.size) assertEquals(3, eventList.filter { it.status == FAILED }.size) } @@ -595,7 +595,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt index ccda1062..2021791a 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt @@ -55,6 +55,8 @@ import java.util.stream.Stream import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue +import com.exactpro.th2.common.message.toJson +import org.junit.jupiter.api.Assertions class TestSequenceCheckTask : AbstractCheckTaskTest() { @@ -128,6 +130,7 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() { /* checkSequenceRule + ruleStartPoint preFiltering Verification x 3 checkMessages @@ -156,8 +159,13 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() { assertTrue (getEventsList().all { VERIFICATION_TYPE == it.type }) } with(batchRequest[5]) { - assertEquals(1, eventsCount) - assertEquals("checkSequence", getEvents(0).type) + assertEquals(2, eventsCount) + Assertions.assertEquals(1, eventsList.count { it.type == "checkSequence" }) { + "unexpected count of \"checkSequence\" events: ${eventsList.joinToString { it.toJson() }}" + } + Assertions.assertEquals(1, eventsList.count { it.type == "ruleStartPoint" }) { + "unexpected count of \"ruleStartPoint\" events: ${eventsList.joinToString { it.toJson() }}" + } } }, { val checkedMessages = assertNotNull(eventsList.find { it.type == CHECK_MESSAGES_TYPE }, "Cannot find checkMessages event") @@ -767,7 +775,7 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) assertEquals(1, eventList.filter { it.type == "internalError" }.size) }) }