diff --git a/.github/workflows/dev-docker-publish.yml b/.github/workflows/dev-docker-publish.yml deleted file mode 100644 index f2d1c4de..00000000 --- a/.github/workflows/dev-docker-publish.yml +++ /dev/null @@ -1,48 +0,0 @@ -name: Dev build and publish Docker distributions to Github Container Registry ghcr.io - -on: - push: - branches-ignore: - - master - - version-* - - dependabot* - paths-ignore: - - README.md - -jobs: - build: - runs-on: ubuntu-20.04 - steps: - - uses: actions/checkout@v2 - # Prepare custom build version - - name: Get branch name - id: branch - run: echo ::set-output name=branch_name::${GITHUB_REF#refs/*/} - - name: Get release_version - id: ver - uses: christian-draeger/read-properties@1.0.1 - with: - path: gradle.properties - property: release_version - - name: Build custom release version - id: release_ver - run: echo ::set-output name=value::"${{ steps.ver.outputs.value }}-${{ steps.branch.outputs.branch_name }}-${{ github.run_id }}" - - name: Show custom release version - run: echo ${{ steps.release_ver.outputs.value }} - # Build and publish image - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v1 - - uses: docker/login-action@v1 - with: - registry: ghcr.io - username: ${{ github.repository_owner }} - password: ${{ secrets.CR_PAT }} - - run: echo "::set-output name=REPOSITORY_NAME::$(echo '${{ github.repository }}' | awk -F '/' '{print $2}')" - id: meta - - name: Build and push - id: docker_build - uses: docker/build-push-action@v2 - with: - push: true - tags: ghcr.io/${{ github.repository }}:${{ steps.release_ver.outputs.value }} - labels: com.exactpro.th2.${{ steps.meta.outputs.REPOSITORY_NAME }}=${{ steps.ver.outputs.value }} \ No newline at end of file diff --git a/.github/workflows/dev-java-publish-sonatype-and-docker.yml b/.github/workflows/dev-java-publish-sonatype-and-docker.yml new file mode 100644 index 00000000..a9816d9c --- /dev/null +++ b/.github/workflows/dev-java-publish-sonatype-and-docker.yml @@ -0,0 +1,30 @@ +name: Dev build and publish Java distributions to sonatype snapshot repository + +on: + push: + branches-ignore: + - master + - version-* + - dependabot* + paths-ignore: + - README.md + # paths: + # - gradle.properties + +jobs: + build-job: + uses: th2-net/.github/.github/workflows/compound-java-dev.yml@main + with: + build-target: 'Sonatype,Docker' + runsOn: ubuntu-latest + gradleVersion: '7' + docker-username: ${{ github.actor }} + secrets: + sonatypeUsername: ${{ secrets.SONATYPE_NEXUS_USERNAME }} + sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }} + sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }} + sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }} + docker-password: ${{ secrets.GITHUB_TOKEN }} + + + diff --git a/.github/workflows/dev-java-publish-sonatype.yml b/.github/workflows/dev-java-publish-sonatype.yml deleted file mode 100644 index f516fd9d..00000000 --- a/.github/workflows/dev-java-publish-sonatype.yml +++ /dev/null @@ -1,53 +0,0 @@ -name: Dev build and publish Java distributions to sonatype snapshot repository - -on: - push: - branches-ignore: - - master - - version-* - - dependabot* - paths-ignore: - - README.md - # paths: - # - gradle.properties - -jobs: - build: - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 -# Prepare custom build version - - name: Get branch name - id: branch - run: echo ::set-output name=branch_name::${GITHUB_REF#refs/*/} - - name: Get release_version - id: ver - uses: christian-draeger/read-properties@1.0.1 - with: - path: gradle.properties - property: release_version - - name: Build custom release version - id: release_ver - run: echo ::set-output name=value::"${{ steps.ver.outputs.value }}-${{ steps.branch.outputs.branch_name }}-${{ github.run_id }}-SNAPSHOT" - - name: Write custom release version to file - uses: christian-draeger/write-properties@1.0.1 - with: - path: gradle.properties - property: release_version - value: ${{ steps.release_ver.outputs.value }} - - name: Show custom release version - run: echo ${{ steps.release_ver.outputs.value }} -# Build and publish package - - name: Set up JDK 11 - uses: actions/setup-java@v1 - with: - java-version: '11' - - name: Build with Gradle - run: ./gradlew --info clean build publish - env: - ORG_GRADLE_PROJECT_sonatypeUsername: ${{ secrets.SONATYPE_NEXUS_USERNAME }} - ORG_GRADLE_PROJECT_sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }} - ORG_GRADLE_PROJECT_signingKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }} - ORG_GRADLE_PROJECT_signingPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }} - diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml deleted file mode 100644 index f01d7de0..00000000 --- a/.github/workflows/docker-publish.yml +++ /dev/null @@ -1,37 +0,0 @@ -name: Build and publish Docker distributions to Github Container Registry ghcr.io - -on: - push: - branches: - - master - - version-* - paths: - - gradle.properties - -jobs: - build: - runs-on: ubuntu-20.04 - steps: - - uses: actions/checkout@v2 - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v1 - - uses: docker/login-action@v1 - with: - registry: ghcr.io - username: ${{ github.repository_owner }} - password: ${{ secrets.CR_PAT }} - - run: echo "::set-output name=REPOSITORY_NAME::$(echo '${{ github.repository }}' | awk -F '/' '{print $2}')" - id: meta - - name: Read version from gradle.properties - id: read_property - uses: christian-draeger/read-properties@1.0.1 - with: - path: ./gradle.properties - property: release_version - - name: Build and push - id: docker_build - uses: docker/build-push-action@v2 - with: - push: true - tags: ghcr.io/${{ github.repository }}:${{ steps.read_property.outputs.value }} - labels: com.exactpro.th2.${{ steps.meta.outputs.REPOSITORY_NAME }}=${{ steps.read_property.outputs.value }} diff --git a/.github/workflows/java-publish-sonatype-and-docker.yml b/.github/workflows/java-publish-sonatype-and-docker.yml new file mode 100644 index 00000000..630b5598 --- /dev/null +++ b/.github/workflows/java-publish-sonatype-and-docker.yml @@ -0,0 +1,24 @@ +name: Build and release Java distributions to sonatype. + +on: + push: + branches: + - master + - version-* + paths: + - gradle.properties + +jobs: + build: + uses: th2-net/.github/.github/workflows/compound-java.yml@main + with: + build-target: 'Sonatype,Docker' + runsOn: ubuntu-latest + gradleVersion: '7' + docker-username: ${{ github.actor }} + secrets: + sonatypeUsername: ${{ secrets.SONATYPE_NEXUS_USERNAME }} + sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }} + sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }} + sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }} + docker-password: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/java-publish-sonatype.yml b/.github/workflows/java-publish-sonatype.yml deleted file mode 100644 index b8c36688..00000000 --- a/.github/workflows/java-publish-sonatype.yml +++ /dev/null @@ -1,27 +0,0 @@ -name: Build and release Java distributions to sonatype. - -on: - push: - branches: - - master - - version-* - paths: - - gradle.properties - -jobs: - build: - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - name: Set up JDK 11 - uses: actions/setup-java@v1 - with: - java-version: '11' - - name: Build with Gradle - run: ./gradlew --info clean build publish closeAndReleaseSonatypeStagingRepository - env: - ORG_GRADLE_PROJECT_sonatypeUsername: ${{ secrets.SONATYPE_NEXUS_USERNAME }} - ORG_GRADLE_PROJECT_sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }} - ORG_GRADLE_PROJECT_signingKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }} - ORG_GRADLE_PROJECT_signingPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }} diff --git a/README.md b/README.md index 760e3627..e61c0c23 100644 --- a/README.md +++ b/README.md @@ -206,11 +206,11 @@ The `th2_check1_active_tasks_number` metric separate rules with label `rule_type + Support for disabling of order verification for simple collection + Switch for events publication in checkpoint request. Parameter `enable-checkpoint-events-publication` should be used for that. -### 3.9.1 - #### Changed: + Migrated `sailfish-utils` version from `3.12.2` to `3.12.3` + Improved condition output format for `EQ_PRECISION`, `WILDCARD`, `LIKE`, `IN`, `MORE`, `LESS` operations and their negative versions ++ Changed the way the check1 works with threads internally. + Now it uses a common executor for running check rules instead of creating an executor per each rule ### 3.9.0 diff --git a/build.gradle b/build.gradle index a4521038..10b994b6 100644 --- a/build.gradle +++ b/build.gradle @@ -191,13 +191,13 @@ test { } application { - mainClassName 'com.exactpro.th2.check1.Check1Main' + mainClass.set('com.exactpro.th2.check1.Check1Main') } applicationName = 'service' distTar { - archiveName "${applicationName}.tar" + archiveFileName.set("${applicationName}.tar") } dockerPrepare { diff --git a/src/main/java/com/exactpro/th2/check1/util/VerificationUtil.java b/src/main/java/com/exactpro/th2/check1/util/VerificationUtil.java index a7133e93..3ed58b77 100644 --- a/src/main/java/com/exactpro/th2/check1/util/VerificationUtil.java +++ b/src/main/java/com/exactpro/th2/check1/util/VerificationUtil.java @@ -15,9 +15,7 @@ */ package com.exactpro.th2.check1.util; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; import com.exactpro.sf.aml.AMLLangConst; @@ -85,7 +83,7 @@ private static void toMetaContainer(String fieldName, ValueFilter value, MetaCon Set keyFields, boolean listItemAsSeparate) { if (value.hasMessageFilter()) { parent.add(fieldName, toMetaContainer(value.getMessageFilter(), listItemAsSeparate)); - } else if (value.hasListFilter() && value.getListFilter().getValues(0).hasMessageFilter()) { + } else if (value.hasListFilter() && containsMessageFilter(value.getListFilter())) { if (listItemAsSeparate) { convertListAsSeparateContainers(parent, fieldName, value.getListFilter()); } else { @@ -116,4 +114,8 @@ private static void convertListAsSeparateContainers(MetaContainer parent, String } parent.add(fieldName, result); } + + private static boolean containsMessageFilter(ListValueFilter listFilter) { + return listFilter.getValuesCount() > 0 && listFilter.getValues(0).hasMessageFilter(); + } } diff --git a/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt b/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt index 1161fa81..6378c9b4 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt @@ -13,6 +13,7 @@ package com.exactpro.th2.check1 import io.reactivex.observers.DisposableObserver +import mu.KotlinLogging import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -26,8 +27,7 @@ abstract class AbstractSessionObserver : DisposableObserver() { } companion object { - @Suppress("JAVA_CLASS_ON_COMPANION") @JvmField - val LOGGER: Logger = LoggerFactory.getLogger(javaClass.enclosingClass) + val LOGGER: Logger = LoggerFactory.getLogger(AbstractSessionObserver::class.java) } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt b/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt index c516025e..4826f1ae 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt @@ -19,8 +19,8 @@ import com.exactpro.th2.check1.grpc.ChainID import com.exactpro.th2.check1.grpc.CheckRuleRequest import com.exactpro.th2.check1.grpc.CheckSequenceRuleRequest import com.exactpro.th2.check1.grpc.CheckpointRequestOrBuilder -import com.exactpro.th2.check1.metrics.BufferMetric import com.exactpro.th2.check1.grpc.NoMessageCheckRequest +import com.exactpro.th2.check1.metrics.BufferMetric import com.exactpro.th2.check1.rule.AbstractCheckTask import com.exactpro.th2.check1.rule.RuleFactory import com.exactpro.th2.common.event.Event @@ -34,16 +34,20 @@ import com.exactpro.th2.common.schema.message.MessageListener import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.common.schema.message.SubscriberMonitor import com.fasterxml.jackson.core.JsonProcessingException +import com.google.common.util.concurrent.ThreadFactoryBuilder import com.google.protobuf.TextFormat.shortDebugString import io.reactivex.Observable import io.reactivex.subjects.PublishSubject -import org.slf4j.LoggerFactory -import java.io.IOException +import mu.KotlinLogging import java.time.Instant import java.time.temporal.ChronoUnit import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import java.util.concurrent.ForkJoinPool +import java.util.concurrent.TimeUnit import com.exactpro.th2.common.grpc.Checkpoint as GrpcCheckpoint +import com.exactpro.th2.check1.utils.toMessageID import com.exactpro.th2.common.message.toJson class CollectorService( @@ -52,7 +56,7 @@ class CollectorService( private val configuration: Check1Configuration, ) { - private val logger = LoggerFactory.getLogger(javaClass.name + '@' + hashCode()) + private val logger = KotlinLogging.logger(javaClass.name + '@' + hashCode()) /** * Queue name to subscriber. Messages with different connectivity can be transferred with one queue. @@ -66,7 +70,9 @@ class CollectorService( private val olderThanDelta = configuration.cleanupOlderThan private val olderThanTimeUnit = configuration.cleanupTimeUnit private val defaultAutoSilenceCheck: Boolean = configuration.isAutoSilenceCheckAfterSequenceRule - + private val commonRuleExecutor: ExecutorService = Executors.newSingleThreadExecutor( + ThreadFactoryBuilder().setNameFormat("rule-executor-%d").build() + ) private var ruleFactory: RuleFactory init { @@ -142,7 +148,7 @@ class CollectorService( } else { checkpoint } - value?.subscribeNextTask(this) ?: begin(realCheckpoint) + value?.subscribeNextTask(this) ?: begin(realCheckpoint, commonRuleExecutor) } private fun CheckRuleRequest.getChainIdOrGenerate(): ChainID { @@ -177,13 +183,13 @@ class CollectorService( val endTime = task.endTime when { !olderThan(now, delta, unit, endTime) -> false - task.tryShutdownExecutor() -> { - logger.info("Removed task ${task.description} ($endTime) from tasks map") - true - } else -> { - logger.warn("Task ${task.description} can't be removed because it has a continuation") - false + !task.hasNextRule().also { canBeRemoved -> + when { + canBeRemoved -> logger.info("Removed task ${task.description} ($endTime) from tasks map") + else -> logger.warn("Task ${task.description} can't be removed because it has a continuation") + } + } } } } @@ -196,10 +202,7 @@ class CollectorService( private fun sendEvents(parentEventID: EventID, event: Event) { logger.debug("Sending event thee id '{}' parent id '{}'", event.id, parentEventID) - val batch = EventBatch.newBuilder() - .setParentEventId(parentEventID) - .addAllEvents(event.toProtoEvents(parentEventID.id)) - .build() + val batch = event.toBatchProto(parentEventID) ForkJoinPool.commonPool().execute { try { @@ -221,26 +224,28 @@ class CollectorService( } fun close() { - try { - subscriberMonitor.unsubscribe() - } catch (e: IOException) { - logger.error("Close subscriber failure", e) + runCatching(subscriberMonitor::unsubscribe).onFailure { + logger.error(it) { "Close subscriber failure" } } mqSubject.onComplete() + runCatching { + commonRuleExecutor.shutdown() + val timeout: Long = 10 + val unit = TimeUnit.SECONDS + if (!commonRuleExecutor.awaitTermination(timeout, unit)) { + logger.warn { "Cannot shutdown executor during ${unit.toMillis(timeout)} ms. Force shutdown" } + val remainingTasks = commonRuleExecutor.shutdownNow() + logger.warn { "Tasks left: ${remainingTasks.size}" } + } + }.onFailure { + logger.error(it) { "Cannot shutdown common task executor" } + } } private fun subscribe(listener: MessageListener): SubscriberMonitor { return checkNotNull(messageRouter.subscribeAll(listener)) { "Can not subscribe to queues" } } - private fun SessionKey.toMessageID(sequence: Long) = MessageID.newBuilder() - .setConnectionId(ConnectionID.newBuilder() - .setSessionAlias(sessionAlias) - .build()) - .setSequence(sequence) - .setDirection(direction) - .build() - private fun publishCheckpoint(request: CheckpointRequestOrBuilder, checkpoint: Checkpoint, event: Event) { if (!request.hasParentEventId()) { if (logger.isWarnEnabled) { diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt index b375479c..0047a8e4 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt @@ -66,6 +66,9 @@ import java.util.concurrent.ForkJoinPool import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import com.exactpro.th2.check1.utils.toMessageID +import com.exactpro.th2.common.event.EventUtils.createMessageBean +import com.exactpro.th2.common.util.toInstant /** * Implements common logic for check task. @@ -145,16 +148,17 @@ abstract class AbstractCheckTask( @Volatile protected var started = false - protected fun createRootEvent() = Event.from(submitTime).description(description) + protected fun createRootEvent(): Event = Event.from(submitTime).description(description) final override fun onStart() { super.onStart() - started = true //Init or re-init variable in TASK_SCHEDULER thread handledMessageCounter = 0 onStartInit() + + started = true } protected abstract fun onStartInit() @@ -163,22 +167,14 @@ abstract class AbstractCheckTask( super.onError(e) refs.rootEvent.status(FAILED) - .bodyData(EventUtils.createMessageBean(e.message)) + .exception(e, true) end(State.ERROR, "Error ${e.message} received in message stream") } /** - * Shutdown the executor that is used to perform this task in case it doesn't have a next task - * @return true if the task doesn't have a next task, otherwise it will return false + * Returns `true` if the rule has a continuation (the rule that should start after the current on is finished) */ - fun tryShutdownExecutor(): Boolean { - if (hasNextTask.get()) { - LOGGER.warn("Cannot shutdown executor for task '$description' that has a connected task") - return false - } - executorService.shutdown() - return true - } + fun hasNextRule(): Boolean = hasNextTask.get() /** * Registers a task as the next task in the continuous verification chain. Its [begin] method will be called @@ -192,14 +188,8 @@ abstract class AbstractCheckTask( if (hasNextTask.compareAndSet(false, true)) { onChainedTaskSubscription() sequenceSubject.subscribe { legacy -> - val executor = if (legacy.executorService.isShutdown) { - LOGGER.warn("Executor has been shutdown before next task has been subscribed. Create a new one") - createExecutorService() - } else { - legacy.executorService - } legacy.sequenceData.apply { - checkTask.begin(lastSequence, lastMessageTimestamp, executor, PreviousExecutionData(untrusted, completed)) + checkTask.begin(lastSequence, lastMessageTimestamp, executorService, PreviousExecutionData(untrusted, completed)) } } LOGGER.info("Task {} ({}) subscribed to task {} ({})", checkTask.description, checkTask.hashCode(), description, hashCode()) @@ -210,14 +200,14 @@ abstract class AbstractCheckTask( /** * Observe a message sequence from the checkpoint. - * Task subscribe to messages stream with its sequence after call. + * Task subscribe to message's stream with its sequence after call. * This method should be called only once otherwise it throws IllegalStateException. * @param checkpoint message sequence and checkpoint timestamp from previous task. * @throws IllegalStateException when method is called more than once. */ - fun begin(checkpoint: Checkpoint? = null) { + fun begin(checkpoint: Checkpoint? = null, executorService: ExecutorService = createExecutorService()) { val checkpointData = checkpoint?.getCheckpointData(sessionKey) - begin(checkpointData?.sequence ?: DEFAULT_SEQUENCE, checkpointData?.timestamp) + begin(checkpointData?.sequence ?: DEFAULT_SEQUENCE, checkpointData?.timestamp, executorService) } /** @@ -276,7 +266,7 @@ abstract class AbstractCheckTask( /** * Observe a message sequence from the previous task. - * Task subscribe to messages stream with sequence after call. + * Task subscribe to message's stream with sequence after call. * This method should be called only once otherwise it throws IllegalStateException. * @param sequence message sequence from the previous task. * @param checkpointTimestamp checkpoint timestamp from the previous task @@ -288,7 +278,7 @@ abstract class AbstractCheckTask( private fun begin( sequence: Long = DEFAULT_SEQUENCE, checkpointTimestamp: Timestamp? = null, - executorService: ExecutorService = createExecutorService(), + executorService: ExecutorService, previousExecutionData: PreviousExecutionData = PreviousExecutionData.DEFAULT ) { configureRootEvent() @@ -304,6 +294,7 @@ abstract class AbstractCheckTask( this.checkpointTimeout = calculateCheckpointTimeout(checkpointTimestamp, taskTimeout.messageTimeout) this.isDefaultSequence = sequence == DEFAULT_SEQUENCE val scheduler = Schedulers.from(executorService) + addStartInfo(refs.rootEvent.addSubEventWithSamePeriod(), sequence, checkpointTimestamp) endFuture = Single.timer(taskTimeout.timeout, MILLISECONDS, Schedulers.computation()) .subscribe { _ -> end(State.TIMEOUT, "Timeout is exited") } @@ -345,9 +336,40 @@ abstract class AbstractCheckTask( } } + private fun addStartInfo(event: Event, lastSequence: Long, checkpointTimestamp: Timestamp?) { + with(event) { + name( + if (lastSequence == DEFAULT_SEQUENCE) { + "Rule works from the beginning of the cache" + } else { + "Rule works from the $lastSequence sequence in session ${sessionKey.sessionAlias} and direction ${sessionKey.direction}" + } + ) + status(PASSED) + type("ruleStartPoint") + if (lastSequence != DEFAULT_SEQUENCE) { + messageID(sessionKey.toMessageID(lastSequence)) + } + bodyData(createMessageBean("The rule starts working from " + + (if (lastSequence == DEFAULT_SEQUENCE) "start of cache" else "sequence $lastSequence") + + (checkpointTimestamp?.let { + val instant = checkpointTimestamp.toInstant() + " and expects messages between $instant and ${instant.plusMillis(taskTimeout.messageTimeout)}" + } ?: ""))) + bodyData(createMessageBean("Rule timeout is set to ${taskTimeout.timeout} mls")) + } + } + private fun taskFinished() { try { - val currentState = taskState.get() + val currentState = taskState.updateAndGet { + when (it) { + // When we complete because of the stream completion the unsubscribe method might be called before the complete method + // Because of that we need to check the status and use the completion status if we are still in BEGIN state + State.BEGIN -> streamCompletedState + else -> it + } + } LOGGER.info("Finishes task '$description' in state ${currentState.name}") if (currentState.callOnTimeoutCallback) { callOnTimeoutCallback() @@ -363,14 +385,21 @@ abstract class AbstractCheckTask( .name("Check rule $description problem") .type("Exception") .status(FAILED) - .bodyData(EventUtils.createMessageBean(message)) - .bodyData(EventUtils.createMessageBean(ex.message)) + .bodyData(createMessageBean(message)) + .bodyData(createMessageBean(ex.message)) .toProto(parentEventID)) .build()) } finally { RuleMetric.decrementActiveRule(type()) refsKeeper.eraseRefs() - sequenceSubject.onSuccess(Legacy(executorService, SequenceData(lastSequence, lastMessageTimestamp, !hasMessagesInTimeoutInterval))) + val sequenceData = SequenceData( + lastSequence = lastSequence, + lastMessageTimestamp = lastMessageTimestamp, + // we use started here because we don't want to fail next rule in the chain + // if the current rule was not initialized + untrusted = !hasMessagesInTimeoutInterval && started, + ) + sequenceSubject.onSuccess(Legacy(executorService, sequenceData)) } } @@ -414,6 +443,9 @@ abstract class AbstractCheckTask( protected open val skipPublication: Boolean = false + protected open val errorEventOnTimeout: Boolean + get() = true + protected fun isCheckpointLastReceivedMessage(): Boolean = bufferContainsStartMessage && !hasMessagesInTimeoutInterval /** @@ -452,6 +484,9 @@ abstract class AbstractCheckTask( private fun completeEventOrReportError(prevState: State): Boolean { return try { if (started) { + if (errorEventOnTimeout && prevState in TIMEOUT_STATES) { + addTimeoutEvent(prevState) + } completeEvent(prevState) doAfterCompleteEvent() false @@ -468,13 +503,55 @@ abstract class AbstractCheckTask( refs.rootEvent.addSubEventWithSamePeriod() .name("Check result event cannot build completely") .type("eventNotComplete") - .bodyData(EventUtils.createMessageBean("An unexpected exception has been thrown during result check build")) - .bodyData(EventUtils.createMessageBean(e.message)) + .bodyData(createMessageBean("An unexpected exception has been thrown during result check build")) + .bodyData(createMessageBean(e.message)) .status(FAILED) true } } + private fun addTimeoutEvent(timeoutType: State) { + val timeoutValue: Long = when (timeoutType) { + State.TIMEOUT -> taskTimeout.timeout + State.MESSAGE_TIMEOUT -> taskTimeout.messageTimeout + else -> error("unexpected timeout state: $timeoutType") + } + refs.rootEvent.addSubEventWithSamePeriod() + .status(FAILED) + .type( + when (timeoutType) { + State.TIMEOUT -> "CheckTimeoutInterrupted" + State.MESSAGE_TIMEOUT -> "CheckMessageTimeoutInterrupted" + else -> error("unexpected timeout state: $timeoutType") + } + ).name("Rule processed $handledMessageCounter message(s) and was interrupted due to $timeoutValue mls ${timeoutType.name.lowercase()}") + .bodyData( + createMessageBean( + when (timeoutType) { + State.TIMEOUT -> timeoutText() + State.MESSAGE_TIMEOUT -> messageTimeoutText() + else -> error("unexpected timeout state: $timeoutType") + } + ) + ) + } + + private fun messageTimeoutText(): String = "Check task was interrupted because the timestamp on the last processed message exceeds the message timeout. " + + (checkpointTimeout + ?.toInstant() + ?.let { + "Rule expects messages between $it and ${it.plusMillis(taskTimeout.messageTimeout)} " + + "but processed one outside this range. Check the messages attached to the root rule event to find all processed messages." + } ?: "But the message timeout is not specified. Contact the developers.") + + private fun timeoutText(): String = + """ + |Check task was interrupted because the task execution took longer than ${taskTimeout.timeout} mls. The possible reasons are: + |* incorrect message filter - rule didn't find a match for all requested messages and kept working until the timeout exceeded (check key fields) + |* incorrect point of start - some of the expected messages were behind the start point and rule couldn't find them (check the checkpoint) + |* lack of the resources - rule might perform slow and didn't get to the expected messages in specified timeout (check component resources) + """.trimMargin() + private fun configureRootEvent() { refs.rootEvent.name(name()).type(type()) setup(refs.rootEvent) @@ -495,9 +572,11 @@ abstract class AbstractCheckTask( private fun fillUntrustedExecutionEvent() { refs.rootEvent.addSubEvent( Event.start() - .name("The current check is untrusted because the start point of the check interval has been selected approximately") + .name("The current check is untrusted because previous rule in the chain started from approximate start point") .status(FAILED) .type("untrustedExecution") + .bodyData(createMessageBean("The previous rule in the chain didn't found the start point in the messages cache. " + + "That means this rule might be started from an unexpected position. Be careful with its work results")) ) } @@ -571,6 +650,7 @@ abstract class AbstractCheckTask( private val RESPONSE_EXECUTOR = ForkJoinPool.commonPool() @JvmField val CONVERTER = ProtoToIMessageConverter(VerificationUtil.FACTORY_PROXY, null, null, createParameters().setUseMarkerForNullsInMessage(true)) + private val TIMEOUT_STATES: Set = setOf(State.TIMEOUT, State.MESSAGE_TIMEOUT) } protected fun RootMessageFilter.metadataFilterOrNull(): MetadataFilter? = diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt index 3addb4a5..bbe76ae1 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt @@ -100,6 +100,9 @@ class NoMessageCheckTask( override fun type(): String = "noMessageCheck" + override val errorEventOnTimeout: Boolean + get() = false + override fun setup(rootEvent: Event) { rootEvent.bodyData(EventUtils.createMessageBean("No message check rule for messages from ${sessionKey.run { "$sessionAlias ($direction direction)" }}")) } diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt index 3f13ba3d..76028858 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt @@ -112,6 +112,9 @@ class SilenceCheckTask( } } + override val errorEventOnTimeout: Boolean + get() = false + override fun name(): String = "AutoSilenceCheck" override fun type(): String = "AutoSilenceCheck" diff --git a/src/main/kotlin/com/exactpro/th2/check1/utils/SessionUtils.kt b/src/main/kotlin/com/exactpro/th2/check1/utils/SessionUtils.kt new file mode 100644 index 00000000..194f2935 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/check1/utils/SessionUtils.kt @@ -0,0 +1,26 @@ +/* + * Copyright 2022 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.check1.utils + +import com.exactpro.th2.check1.SessionKey +import com.exactpro.th2.common.grpc.ConnectionID +import com.exactpro.th2.common.grpc.MessageID + +fun SessionKey.toMessageID(sequence: Long): MessageID = MessageID.newBuilder() + .setConnectionId( + ConnectionID.newBuilder() + .setSessionAlias(sessionAlias) + .build()) + .setSequence(sequence) + .setDirection(direction) + .build() \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt index a8d83f7f..4abf44b5 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt @@ -91,8 +91,8 @@ class TestChain: AbstractCheckTaskTest() { val task = sequenceCheckRuleTask(listOf(3, 4), eventID, streams).also { it.begin() } var eventList = awaitEventBatchAndGetEvents(6, 6) - assertEquals(8, eventList.size) - assertEquals(4, eventList.filter { it.status == SUCCESS }.size) + assertEquals(9, eventList.size) + assertEquals(5, eventList.filter { it.status == SUCCESS }.size) assertEquals(4, eventList.filter { it.status == FAILED }.size) sequenceCheckRuleTask(listOf(1, 2), eventID, streams).also { task.subscribeNextTask(it) } @@ -106,8 +106,8 @@ class TestChain: AbstractCheckTaskTest() { val task = sequenceCheckRuleTask(listOf(1, 4), eventID, streams).also { it.begin() } var eventList = awaitEventBatchAndGetEvents(6, 6) - assertEquals(9, eventList.size) - assertEquals(5, eventList.filter { it.status == SUCCESS }.size) + assertEquals(10, eventList.size) + assertEquals(6, eventList.filter { it.status == SUCCESS }.size) assertEquals(4, eventList.filter { it.status == FAILED }.size) sequenceCheckRuleTask(listOf(2, 3), eventID, streams).also { task.subscribeNextTask(it) } @@ -130,8 +130,8 @@ class TestChain: AbstractCheckTaskTest() { task.begin() val eventList = awaitEventBatchRequest(1000L, 4 * 2).flatMap(EventBatch::getEventsList) - assertEquals(4 * 3, eventList.size) - assertEquals(4 * 3, eventList.filter { it.status == SUCCESS }.size) + assertEquals(4 * 4, eventList.size) + assertEquals(4 * 4, eventList.filter { it.status == SUCCESS }.size) assertEquals(listOf(1L, 2L, 3L, 4L), eventList.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList).map(MessageID::getSequence)) } @@ -236,7 +236,7 @@ class TestChain: AbstractCheckTaskTest() { val rootEvent = eventsList.first() assertEquals(FAILED, rootEvent.status) assertEquals(3, rootEvent.attachedMessageIdsCount) - assertEquals(1, eventsList[2].attachedMessageIdsCount) + assertEquals(1, eventsList.single { it.type == "Verification" }.attachedMessageIdsCount) assertEquals(FAILED, eventsList.last().status) }) } @@ -246,20 +246,20 @@ class TestChain: AbstractCheckTaskTest() { awaitEventBatchRequest(1000L, times).drop(times - last).flatMap(EventBatch::getEventsList) private fun checkSimpleVerifySuccess(eventList: List, sequence: Long) { - assertEquals(3, eventList.size) - assertEquals(3, eventList.filter { it.status == SUCCESS }.size) + assertEquals(4, eventList.size) + assertEquals(4, eventList.filter { it.status == SUCCESS }.size) assertEquals(listOf(sequence), eventList.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList).map(MessageID::getSequence)) } private fun checkSimpleVerifyFailure(eventList: List) { - assertEquals(3, eventList.size) - assertEquals(1, eventList.filter { it.status == SUCCESS }.size) + assertEquals(4, eventList.size) + assertEquals(2, eventList.filter { it.status == SUCCESS }.size) assertEquals(2, eventList.filter { it.status == FAILED }.size) } private fun checkSequenceVerifySuccess(eventList: List, sequences: List) { - assertEquals(8, eventList.size) - assertEquals(8, eventList.filter { it.status == SUCCESS }.size) + assertEquals(9, eventList.size) + assertEquals(9, eventList.filter { it.status == SUCCESS }.size) assertEquals(sequences, eventList .dropWhile { it.type != CHECK_MESSAGES_TYPE } // Skip prefilter .filter { it.type == VERIFICATION_TYPE } @@ -339,6 +339,6 @@ class TestChain: AbstractCheckTaskTest() { companion object { private const val KEY_FIELD = "key" private const val NOT_KEY_FIELD = "not_key" - private const val UNTRUSTED_EXECUTION_EVENT_NAME: String = "The current check is untrusted because the start point of the check interval has been selected approximately" + private const val UNTRUSTED_EXECUTION_EVENT_NAME: String = "The current check is untrusted because previous rule in the chain started from approximate start point" } } diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt index 5fc4a381..344f0343 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt @@ -61,6 +61,7 @@ import org.junit.jupiter.params.provider.ValueSource import java.lang.IllegalArgumentException import java.time.Instant import java.util.stream.Stream +import java.util.concurrent.Executors import kotlin.test.assertEquals import kotlin.test.assertNotNull @@ -81,6 +82,35 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { clientStub ) + @Test + fun `success verification several rules on same executor`() { + val streams = createStreams(SESSION_ALIAS, Direction.FIRST, listOf( + message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS) + .mergeMetadata(MessageMetadata.newBuilder() + .putProperties("keyProp", "42") + .putProperties("notKeyProp", "2") + .build()) + .build() + )) + + val eventID = EventID.newBuilder().setId("root").build() + val filter = RootMessageFilter.newBuilder() + .setMessageType(MESSAGE_TYPE) + .setMetadataFilter(MetadataFilter.newBuilder() + .putPropertyFilters("keyProp", "42".toSimpleFilter(FilterOperation.EQUAL, true))) + .build() + val executor = Executors.newSingleThreadExecutor() + + checkTask(filter, eventID, streams).apply { begin(executorService = executor) } + checkTask(filter, eventID, streams).apply { begin(executorService = executor) } + + val eventBatches = awaitEventBatchRequest(1000L, 4) + val eventList = eventBatches.flatMap(EventBatch::getEventsList) + val eventsPerRule = 5 + assertEquals(2 * eventsPerRule, eventList.size) + assertEquals(2 * eventsPerRule, eventList.filter { it.status == SUCCESS }.size) + } + @Test fun `success verification`() { val streams = createStreams(SESSION_ALIAS, Direction.FIRST, listOf( @@ -103,8 +133,8 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) - assertEquals(4, eventList.filter { it.status == SUCCESS }.size) + assertEquals(5, eventList.size) + assertEquals(5, eventList.filter { it.status == SUCCESS }.size) } @Test @@ -153,9 +183,9 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val task = checkTask(filter, eventID, streams, 200) task.begin() - val eventBatches = awaitEventBatchRequest(1000L, 3) + val eventBatches = awaitEventBatchRequest(1000L, 4) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) + assertEquals(5, eventList.size) assertEquals(2, eventList.filter { it.status == EventStatus.FAILED }.size) // Message filter and verification exceed max event batch content size } @@ -181,7 +211,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatch = awaitEventBatchRequest(1000L, 2) val eventList = eventBatch.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) + assertEquals(5, eventList.size) assertTrue({ eventList.none { it.status == EventStatus.FAILED } }) { @@ -294,7 +324,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } @@ -367,7 +397,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } @@ -407,7 +437,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } @@ -448,8 +478,8 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(4, eventList.size) - assertEquals(4, eventList.filter { it.status == SUCCESS }.size) + assertEquals(5, eventList.size) + assertEquals(5, eventList.filter { it.status == SUCCESS }.size) } @Test @@ -477,7 +507,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(5, eventList.size) + assertEquals(6, eventList.size) assertEquals(2, eventList.filter { it.status == SUCCESS && it.type == "Verification" }.size) assertEquals(FAILED, eventList.last().status) } @@ -515,8 +545,8 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) - assertEquals(3, eventList.size) - assertEquals(2, eventList.filter { it.status == FAILED }.size) + assertEquals(5, eventList.size) + assertEquals(3, eventList.filter { it.status == FAILED }.size) } @Test @@ -572,7 +602,7 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) }, { val verificationEvent = eventList.find { it.type == "Verification" } assertNotNull(verificationEvent) { "Missed verification event" } @@ -613,4 +643,4 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { Arguments.of(null, false) ) } -} \ No newline at end of file +} diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt index 6af402bc..e036be63 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt @@ -51,6 +51,8 @@ import java.util.stream.Stream import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue +import com.exactpro.th2.common.message.toJson +import org.junit.jupiter.api.Assertions class TestSequenceCheckTask : AbstractCheckTaskTest() { @@ -124,6 +126,7 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() { /* checkSequenceRule + ruleStartPoint preFiltering Verification x 3 checkMessages @@ -152,8 +155,13 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() { assertTrue (getEventsList().all { VERIFICATION_TYPE == it.type }) } with(batchRequest[5]) { - assertEquals(1, eventsCount) - assertEquals("checkSequence", getEvents(0).type) + assertEquals(2, eventsCount) + Assertions.assertEquals(1, eventsList.count { it.type == "checkSequence" }) { + "unexpected count of \"checkSequence\" events: ${eventsList.joinToString { it.toJson() }}" + } + Assertions.assertEquals(1, eventsList.count { it.type == "ruleStartPoint" }) { + "unexpected count of \"ruleStartPoint\" events: ${eventsList.joinToString { it.toJson() }}" + } } }, { val checkedMessages = assertNotNull(eventsList.find { it.type == CHECK_MESSAGES_TYPE }, "Cannot find checkMessages event") @@ -763,7 +771,7 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() { val eventBatches = awaitEventBatchRequest(1000L, 2) val eventList = eventBatches.flatMap(EventBatch::getEventsList) assertAll({ - assertEquals(3, eventList.size) + assertEquals(4, eventList.size) assertEquals(1, eventList.filter { it.type == "internalError" }.size) }) }