diff --git a/build.gradle b/build.gradle index 0c45dfc9..1f930ff1 100644 --- a/build.gradle +++ b/build.gradle @@ -229,13 +229,13 @@ test { } application { - mainClassName 'com.exactpro.th2.check1.Check1Main' + mainClass.set('com.exactpro.th2.check1.Check1Main') } applicationName = 'service' distTar { - archiveName "${applicationName}.tar" + archiveFileName.set("${applicationName}.tar") } dockerPrepare { diff --git a/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt b/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt index 697739d2..58b09fa0 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt @@ -51,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CompletableFuture import java.util.concurrent.ForkJoinPool import com.exactpro.th2.common.grpc.Checkpoint as GrpcCheckpoint +import com.exactpro.th2.check1.utils.toMessageID import com.exactpro.th2.common.message.toJson import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException @@ -325,14 +326,6 @@ class CollectorService( return checkNotNull(messageRouter.subscribeAll(listener)) { "Can not subscribe to queues" } } - private fun SessionKey.toMessageID(sequence: Long) = MessageID.newBuilder() - .setConnectionId(ConnectionID.newBuilder() - .setSessionAlias(sessionAlias) - .build()) - .setSequence(sequence) - .setDirection(direction) - .build() - private fun publishCheckpoint(request: CheckpointRequestOrBuilder, checkpoint: Checkpoint, event: Event) { if (!request.hasParentEventId()) { if (logger.isWarnEnabled) { diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt index 340d1359..8d25041a 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt @@ -70,6 +70,9 @@ import java.util.concurrent.ForkJoinPool import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import com.exactpro.th2.check1.utils.toMessageID +import com.exactpro.th2.common.event.EventUtils.createMessageBean +import com.exactpro.th2.common.util.toInstant /** * Implements common logic for check task. @@ -152,7 +155,7 @@ abstract class AbstractCheckTask( @Volatile protected var started = false - protected fun createRootEvent() = Event.from(submitTime).description(description) + protected fun createRootEvent(): Event = Event.from(submitTime).description(description) final override fun onStart() { super.onStart() @@ -170,7 +173,7 @@ abstract class AbstractCheckTask( super.onError(e) refs.rootEvent.status(FAILED) - .bodyData(EventUtils.createMessageBean(e.message)) + .exception(e, true) end(State.ERROR, "Error ${e.message} received in message stream") } @@ -311,6 +314,7 @@ abstract class AbstractCheckTask( this.checkpointTimeout = calculateCheckpointTimeout(checkpointTimestamp, taskTimeout.messageTimeout) this.isDefaultSequence = sequence == DEFAULT_SEQUENCE val scheduler = Schedulers.from(executorService) + addStartInfo(refs.rootEvent.addSubEventWithSamePeriod(), sequence, checkpointTimestamp) endFuture = Single.timer(taskTimeout.timeout, MILLISECONDS, Schedulers.computation()) .subscribe { _ -> end(State.TIMEOUT, "Timeout is exited") } @@ -352,6 +356,30 @@ abstract class AbstractCheckTask( } } + private fun addStartInfo(event: Event, lastSequence: Long, checkpointTimestamp: Timestamp?) { + with(event) { + name( + if (lastSequence == DEFAULT_SEQUENCE) { + "Rule works from the beginning of the cache" + } else { + "Rule works from the $lastSequence sequence in session ${sessionKey.sessionAlias} and direction ${sessionKey.direction}" + } + ) + status(PASSED) + type("ruleStartPoint") + if (lastSequence != DEFAULT_SEQUENCE) { + messageID(sessionKey.toMessageID(lastSequence)) + } + bodyData(createMessageBean("The rule starts working from " + + (if (lastSequence == DEFAULT_SEQUENCE) "start of cache" else "sequence $lastSequence") + + (checkpointTimestamp?.let { + val instant = checkpointTimestamp.toInstant() + " and expects messages between $instant and ${instant.plusMillis(taskTimeout.messageTimeout)}" + } ?: ""))) + bodyData(createMessageBean("Rule timeout is set to ${taskTimeout.timeout} mls")) + } + } + private fun taskFinished() { var ruleEventStatus = EventStatus.FAILED try { @@ -371,8 +399,8 @@ abstract class AbstractCheckTask( .name("Check rule $description problem") .type("Exception") .status(FAILED) - .bodyData(EventUtils.createMessageBean(message)) - .bodyData(EventUtils.createMessageBean(ex.message)) + .bodyData(createMessageBean(message)) + .bodyData(createMessageBean(ex.message)) .toProto(parentEventID)) .build()) } finally { @@ -424,6 +452,9 @@ abstract class AbstractCheckTask( protected open val skipPublication: Boolean = false + protected open val errorEventOnTimeout: Boolean + get() = true + protected fun isCheckpointLastReceivedMessage(): Boolean = bufferContainsStartMessage && !hasMessagesInTimeoutInterval /** @@ -466,6 +497,9 @@ abstract class AbstractCheckTask( private fun completeEventOrReportError(prevState: State): Boolean { return try { if (started) { + if (errorEventOnTimeout && prevState in TIMEOUT_STATES) { + addTimeoutEvent(prevState) + } completeEvent(prevState) doAfterCompleteEvent() false @@ -482,13 +516,55 @@ abstract class AbstractCheckTask( refs.rootEvent.addSubEventWithSamePeriod() .name("Check result event cannot build completely") .type("eventNotComplete") - .bodyData(EventUtils.createMessageBean("An unexpected exception has been thrown during result check build")) - .bodyData(EventUtils.createMessageBean(e.message)) + .bodyData(createMessageBean("An unexpected exception has been thrown during result check build")) + .bodyData(createMessageBean(e.message)) .status(FAILED) true } } + private fun addTimeoutEvent(timeoutType: State) { + val timeoutValue: Long = when (timeoutType) { + State.TIMEOUT -> taskTimeout.timeout + State.MESSAGE_TIMEOUT -> taskTimeout.messageTimeout + else -> error("unexpected timeout state: $timeoutType") + } + refs.rootEvent.addSubEventWithSamePeriod() + .status(FAILED) + .type( + when (timeoutType) { + State.TIMEOUT -> "CheckTimeoutInterrupted" + State.MESSAGE_TIMEOUT -> "CheckMessageTimeoutInterrupted" + else -> error("unexpected timeout state: $timeoutType") + } + ).name("Rule processed $handledMessageCounter message(s) and was interrupted due to $timeoutValue mls ${timeoutType.name.lowercase()}") + .bodyData( + createMessageBean( + when (timeoutType) { + State.TIMEOUT -> timeoutText() + State.MESSAGE_TIMEOUT -> messageTimeoutText() + else -> error("unexpected timeout state: $timeoutType") + } + ) + ) + } + + private fun messageTimeoutText(): String = "Check task was interrupted because the timestamp on the last processed message exceeds the message timeout. " + + (checkpointTimeout + ?.toInstant() + ?.let { + "Rule expects messages between $it and ${it.plusMillis(taskTimeout.messageTimeout)} " + + "but processed one outside this range. Check the messages attached to the root rule event to find all processed messages." + } ?: "But the message timeout is not specified. Contact the developers.") + + private fun timeoutText(): String = + """ + |Check task was interrupted because the task execution took longer than ${taskTimeout.timeout} mls. The possible reasons are: + |* incorrect message filter - rule didn't find a match for all requested messages and kept working until the timeout exceeded (check key fields) + |* incorrect point of start - some of the expected messages were behind the start point and rule couldn't find them (check the checkpoint) + |* lack of the resources - rule might perform slow and didn't get to the expected messages in specified timeout (check component resources) + """.trimMargin() + private fun configureRootEvent() { refs.rootEvent.name(name()).type(type()) setup(refs.rootEvent) @@ -585,6 +661,7 @@ abstract class AbstractCheckTask( private val RESPONSE_EXECUTOR = ForkJoinPool.commonPool() @JvmField val CONVERTER = ProtoToIMessageConverter(VerificationUtil.FACTORY_PROXY, null, null, createParameters().setUseMarkerForNullsInMessage(true)) + private val TIMEOUT_STATES: Set = setOf(State.TIMEOUT, State.MESSAGE_TIMEOUT) val EMPTY_STATUS_CONSUMER: (EventStatus) -> Unit = {} } diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt index 4bbc32d7..d78b64a2 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt @@ -107,6 +107,9 @@ class NoMessageCheckTask( override fun type(): String = "noMessageCheck" + override val errorEventOnTimeout: Boolean + get() = false + override fun setup(rootEvent: Event) { rootEvent.bodyData(EventUtils.createMessageBean("No message check rule for messages from ${sessionKey.run { "$sessionAlias ($direction direction)" }}")) } diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt index addf2abb..fe1f35c9 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt @@ -119,6 +119,9 @@ class SilenceCheckTask( } } + override val errorEventOnTimeout: Boolean + get() = false + override fun name(): String = "AutoSilenceCheck" override fun type(): String = "AutoSilenceCheck" diff --git a/src/main/kotlin/com/exactpro/th2/check1/utils/SessionUtils.kt b/src/main/kotlin/com/exactpro/th2/check1/utils/SessionUtils.kt new file mode 100644 index 00000000..194f2935 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/check1/utils/SessionUtils.kt @@ -0,0 +1,26 @@ +/* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.check1.utils + +import com.exactpro.th2.check1.SessionKey +import com.exactpro.th2.common.grpc.ConnectionID +import com.exactpro.th2.common.grpc.MessageID + +fun SessionKey.toMessageID(sequence: Long): MessageID = MessageID.newBuilder() + .setConnectionId( + ConnectionID.newBuilder() + .setSessionAlias(sessionAlias) + .build()) + .setSequence(sequence) + .setDirection(direction) + .build() \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt index e1340400..6b2482c8 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt @@ -95,8 +95,8 @@ class TestChain: AbstractCheckTaskTest() { val task = sequenceCheckRuleTask(listOf(3, 4), eventID, streams).also { it.begin() } var eventList = awaitEventBatchAndGetEvents(6, 6) - assertEquals(8, eventList.size) - assertEquals(4, eventList.filter { it.status == SUCCESS }.size) + assertEquals(9, eventList.size) + assertEquals(5, eventList.filter { it.status == SUCCESS }.size) assertEquals(4, eventList.filter { it.status == FAILED }.size) sequenceCheckRuleTask(listOf(1, 2), eventID, streams).also { task.subscribeNextTask(it) } @@ -110,8 +110,8 @@ class TestChain: AbstractCheckTaskTest() { val task = sequenceCheckRuleTask(listOf(1, 4), eventID, streams).also { it.begin() } var eventList = awaitEventBatchAndGetEvents(6, 6) - assertEquals(9, eventList.size) - assertEquals(5, eventList.filter { it.status == SUCCESS }.size) + assertEquals(10, eventList.size) + assertEquals(6, eventList.filter { it.status == SUCCESS }.size) assertEquals(4, eventList.filter { it.status == FAILED }.size) sequenceCheckRuleTask(listOf(2, 3), eventID, streams).also { task.subscribeNextTask(it) } @@ -134,8 +134,8 @@ class TestChain: AbstractCheckTaskTest() { task.begin() val eventList = awaitEventBatchRequest(1000L, 4 * 2).flatMap(EventBatch::getEventsList) - assertEquals(4 * 3, eventList.size) - assertEquals(4 * 3, eventList.filter { it.status == SUCCESS }.size) + assertEquals(4 * 4, eventList.size) + assertEquals(4 * 4, eventList.filter { it.status == SUCCESS }.size) assertEquals(listOf(1L, 2L, 3L, 4L), eventList.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList).map(MessageID::getSequence)) } @@ -240,7 +240,7 @@ class TestChain: AbstractCheckTaskTest() { val rootEvent = eventsList.first() assertEquals(FAILED, rootEvent.status) assertEquals(3, rootEvent.attachedMessageIdsCount) - assertEquals(1, eventsList[2].attachedMessageIdsCount) + assertEquals(1, eventsList.single { it.type == "Verification" }.attachedMessageIdsCount) assertEquals(FAILED, eventsList.last().status) }) } @@ -250,20 +250,20 @@ class TestChain: AbstractCheckTaskTest() { awaitEventBatchRequest(1000L, times).drop(times - last).flatMap(EventBatch::getEventsList) private fun checkSimpleVerifySuccess(eventList: List, sequence: Long) { - assertEquals(3, eventList.size) - assertEquals(3, eventList.filter { it.status == SUCCESS }.size) + assertEquals(4, eventList.size) + assertEquals(4, eventList.filter { it.status == SUCCESS }.size) assertEquals(listOf(sequence), eventList.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList).map(MessageID::getSequence)) } private fun checkSimpleVerifyFailure(eventList: List) { - assertEquals(3, eventList.size) - assertEquals(1, eventList.filter { it.status == SUCCESS }.size) + assertEquals(4, eventList.size) + assertEquals(2, eventList.filter { it.status == SUCCESS }.size) assertEquals(2, eventList.filter { it.status == FAILED }.size) } private fun checkSequenceVerifySuccess(eventList: List, sequences: List) { - assertEquals(8, eventList.size) - assertEquals(8, eventList.filter { it.status == SUCCESS }.size) + assertEquals(9, eventList.size) + assertEquals(9, eventList.filter { it.status == SUCCESS }.size) assertEquals(sequences, eventList .dropWhile { it.type != CHECK_MESSAGES_TYPE } // Skip prefilter .filter { it.type == VERIFICATION_TYPE } diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt index a534b325..95a1e0fc 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt @@ -116,7 +116,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) + assertEquals(5, eventList.size) assertTrue(eventList.all { it.status == SUCCESS }) verify(onTaskFinishedMock, timeout(1000).only()).invoke(SUCCESS) @@ -171,9 +171,9 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val task = checkTask(filter, eventID, streams, 200) task.begin() - val eventBatches = awaitEventBatchRequest(1000L, 3) + val eventBatches = awaitEventBatchRequest(1000L, 4) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) + assertEquals(5, eventList.size) assertEquals(2, eventList.filter { it.status == FAILED }.size) // Message filter and verification exceed max event batch content size } @@ -199,7 +199,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatch = awaitEventBatchRequest(1000L, 2) val eventList = eventBatch.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) + assertEquals(5, eventList.size) assertTrue({ eventList.none { it.status == FAILED } }) { @@ -312,7 +312,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } @@ -388,7 +388,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } @@ -430,7 +430,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } @@ -471,8 +471,8 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) - assertEquals(4, eventList.filter { it.status == SUCCESS }.size) + assertEquals(5, eventList.size) + assertEquals(5, eventList.filter { it.status == SUCCESS }.size) } @Test @@ -500,7 +500,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(5, eventList.size) + assertEquals(6, eventList.size) assertEquals(2, eventList.filter { it.status == SUCCESS && it.type == "Verification" }.size) assertEquals(FAILED, eventList.last().status) } @@ -538,8 +538,8 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(3, eventList.size) - assertEquals(2, eventList.filter { it.status == FAILED }.size) + assertEquals(5, eventList.size) + assertEquals(3, eventList.filter { it.status == FAILED }.size) } @Test @@ -595,7 +595,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt index ccda1062..2021791a 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt @@ -55,6 +55,8 @@ import java.util.stream.Stream import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue +import com.exactpro.th2.common.message.toJson +import org.junit.jupiter.api.Assertions class TestSequenceCheckTask : AbstractCheckTaskTest() { @@ -128,6 +130,7 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() { /* checkSequenceRule + ruleStartPoint preFiltering Verification x 3 checkMessages @@ -156,8 +159,13 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() { assertTrue (getEventsList().all { VERIFICATION_TYPE == it.type }) } with(batchRequest[5]) { - assertEquals(1, eventsCount) - assertEquals("checkSequence", getEvents(0).type) + assertEquals(2, eventsCount) + Assertions.assertEquals(1, eventsList.count { it.type == "checkSequence" }) { + "unexpected count of \"checkSequence\" events: ${eventsList.joinToString { it.toJson() }}" + } + Assertions.assertEquals(1, eventsList.count { it.type == "ruleStartPoint" }) { + "unexpected count of \"ruleStartPoint\" events: ${eventsList.joinToString { it.toJson() }}" + } } }, { val checkedMessages = assertNotNull(eventsList.find { it.type == CHECK_MESSAGES_TYPE }, "Cannot find checkMessages event") @@ -767,7 +775,7 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) assertEquals(1, eventList.filter { it.type == "internalError" }.size) }) }