From abd736e7ecae38b5cbe610e6e97a44d3b45edb4a Mon Sep 17 00:00:00 2001 From: Oleg Date: Mon, 24 Oct 2022 11:46:39 +0400 Subject: [PATCH] [TH2-4351] Check status when submitting events --- .../com/exactpro/th2/check1/AbstractSessionObserver.kt | 3 ++- .../com/exactpro/th2/check1/rule/AbstractCheckTask.kt | 9 ++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt b/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt index b4a8d720..6378c9b4 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt @@ -15,6 +15,7 @@ package com.exactpro.th2.check1 import io.reactivex.observers.DisposableObserver import mu.KotlinLogging import org.slf4j.Logger +import org.slf4j.LoggerFactory abstract class AbstractSessionObserver : DisposableObserver() { override fun onComplete() { @@ -27,6 +28,6 @@ abstract class AbstractSessionObserver : DisposableObserver() { companion object { @JvmField - val LOGGER: Logger = KotlinLogging.logger { } + val LOGGER: Logger = LoggerFactory.getLogger(AbstractSessionObserver::class.java) } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt index 2102c82d..2ce35796 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt @@ -361,7 +361,14 @@ abstract class AbstractCheckTask( private fun taskFinished() { try { - val currentState = taskState.get() + val currentState = taskState.updateAndGet { + when (it) { + // When we complete because of the stream completion the unsubscribe method might be called before the complete method + // Because of that we need to check the status and use the completion status if we are still in BEGIN state + State.BEGIN -> streamCompletedState + else -> it + } + } LOGGER.info("Finishes task '$description' in state ${currentState.name}") if (currentState.callOnTimeoutCallback) { callOnTimeoutCallback()