diff --git a/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt b/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt index 1161fa8..b4a8d72 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/AbstractSessionObserver.kt @@ -13,8 +13,8 @@ package com.exactpro.th2.check1 import io.reactivex.observers.DisposableObserver +import mu.KotlinLogging import org.slf4j.Logger -import org.slf4j.LoggerFactory abstract class AbstractSessionObserver : DisposableObserver() { override fun onComplete() { @@ -26,8 +26,7 @@ abstract class AbstractSessionObserver : DisposableObserver() { } companion object { - @Suppress("JAVA_CLASS_ON_COMPANION") @JvmField - val LOGGER: Logger = LoggerFactory.getLogger(javaClass.enclosingClass) + val LOGGER: Logger = KotlinLogging.logger { } } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt b/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt index f880914..4826f1a 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt @@ -19,8 +19,8 @@ import com.exactpro.th2.check1.grpc.ChainID import com.exactpro.th2.check1.grpc.CheckRuleRequest import com.exactpro.th2.check1.grpc.CheckSequenceRuleRequest import com.exactpro.th2.check1.grpc.CheckpointRequestOrBuilder -import com.exactpro.th2.check1.metrics.BufferMetric import com.exactpro.th2.check1.grpc.NoMessageCheckRequest +import com.exactpro.th2.check1.metrics.BufferMetric import com.exactpro.th2.check1.rule.AbstractCheckTask import com.exactpro.th2.check1.rule.RuleFactory import com.exactpro.th2.common.event.Event @@ -34,15 +34,18 @@ import com.exactpro.th2.common.schema.message.MessageListener import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.common.schema.message.SubscriberMonitor import com.fasterxml.jackson.core.JsonProcessingException +import com.google.common.util.concurrent.ThreadFactoryBuilder import com.google.protobuf.TextFormat.shortDebugString import io.reactivex.Observable import io.reactivex.subjects.PublishSubject -import org.slf4j.LoggerFactory -import java.io.IOException +import mu.KotlinLogging import java.time.Instant import java.time.temporal.ChronoUnit import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import java.util.concurrent.ForkJoinPool +import java.util.concurrent.TimeUnit import com.exactpro.th2.common.grpc.Checkpoint as GrpcCheckpoint import com.exactpro.th2.check1.utils.toMessageID import com.exactpro.th2.common.message.toJson @@ -53,7 +56,7 @@ class CollectorService( private val configuration: Check1Configuration, ) { - private val logger = LoggerFactory.getLogger(javaClass.name + '@' + hashCode()) + private val logger = KotlinLogging.logger(javaClass.name + '@' + hashCode()) /** * Queue name to subscriber. Messages with different connectivity can be transferred with one queue. @@ -67,7 +70,9 @@ class CollectorService( private val olderThanDelta = configuration.cleanupOlderThan private val olderThanTimeUnit = configuration.cleanupTimeUnit private val defaultAutoSilenceCheck: Boolean = configuration.isAutoSilenceCheckAfterSequenceRule - + private val commonRuleExecutor: ExecutorService = Executors.newSingleThreadExecutor( + ThreadFactoryBuilder().setNameFormat("rule-executor-%d").build() + ) private var ruleFactory: RuleFactory init { @@ -143,7 +148,7 @@ class CollectorService( } else { checkpoint } - value?.subscribeNextTask(this) ?: begin(realCheckpoint) + value?.subscribeNextTask(this) ?: begin(realCheckpoint, commonRuleExecutor) } private fun CheckRuleRequest.getChainIdOrGenerate(): ChainID { @@ -178,13 +183,13 @@ class CollectorService( val endTime = task.endTime when { !olderThan(now, delta, unit, endTime) -> false - task.tryShutdownExecutor() -> { - logger.info("Removed task ${task.description} ($endTime) from tasks map") - true - } else -> { - logger.warn("Task ${task.description} can't be removed because it has a continuation") - false + !task.hasNextRule().also { canBeRemoved -> + when { + canBeRemoved -> logger.info("Removed task ${task.description} ($endTime) from tasks map") + else -> logger.warn("Task ${task.description} can't be removed because it has a continuation") + } + } } } } @@ -197,10 +202,7 @@ class CollectorService( private fun sendEvents(parentEventID: EventID, event: Event) { logger.debug("Sending event thee id '{}' parent id '{}'", event.id, parentEventID) - val batch = EventBatch.newBuilder() - .setParentEventId(parentEventID) - .addAllEvents(event.toListProto(parentEventID)) - .build() + val batch = event.toBatchProto(parentEventID) ForkJoinPool.commonPool().execute { try { @@ -222,12 +224,22 @@ class CollectorService( } fun close() { - try { - subscriberMonitor.unsubscribe() - } catch (e: IOException) { - logger.error("Close subscriber failure", e) + runCatching(subscriberMonitor::unsubscribe).onFailure { + logger.error(it) { "Close subscriber failure" } } mqSubject.onComplete() + runCatching { + commonRuleExecutor.shutdown() + val timeout: Long = 10 + val unit = TimeUnit.SECONDS + if (!commonRuleExecutor.awaitTermination(timeout, unit)) { + logger.warn { "Cannot shutdown executor during ${unit.toMillis(timeout)} ms. Force shutdown" } + val remainingTasks = commonRuleExecutor.shutdownNow() + logger.warn { "Tasks left: ${remainingTasks.size}" } + } + }.onFailure { + logger.error(it) { "Cannot shutdown common task executor" } + } } private fun subscribe(listener: MessageListener): SubscriberMonitor { diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt index ab650bf..2102c82 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt @@ -171,17 +171,9 @@ abstract class AbstractCheckTask( } /** - * Shutdown the executor that is used to perform this task in case it doesn't have a next task - * @return true if the task doesn't have a next task, otherwise it will return false + * Returns `true` if the rule has a continuation (the rule that should start after the current on is finished) */ - fun tryShutdownExecutor(): Boolean { - if (hasNextTask.get()) { - LOGGER.warn("Cannot shutdown executor for task '$description' that has a connected task") - return false - } - executorService.shutdown() - return true - } + fun hasNextRule(): Boolean = hasNextTask.get() /** * Registers a task as the next task in the continuous verification chain. Its [begin] method will be called @@ -195,14 +187,8 @@ abstract class AbstractCheckTask( if (hasNextTask.compareAndSet(false, true)) { onChainedTaskSubscription() sequenceSubject.subscribe { legacy -> - val executor = if (legacy.executorService.isShutdown) { - LOGGER.warn("Executor has been shutdown before next task has been subscribed. Create a new one") - createExecutorService() - } else { - legacy.executorService - } legacy.sequenceData.apply { - checkTask.begin(lastSequence, lastMessageTimestamp, executor, PreviousExecutionData(untrusted, completed)) + checkTask.begin(lastSequence, lastMessageTimestamp, executorService, PreviousExecutionData(untrusted, completed)) } } LOGGER.info("Task {} ({}) subscribed to task {} ({})", checkTask.description, checkTask.hashCode(), description, hashCode()) @@ -213,14 +199,14 @@ abstract class AbstractCheckTask( /** * Observe a message sequence from the checkpoint. - * Task subscribe to messages stream with its sequence after call. + * Task subscribe to message's stream with its sequence after call. * This method should be called only once otherwise it throws IllegalStateException. * @param checkpoint message sequence and checkpoint timestamp from previous task. * @throws IllegalStateException when method is called more than once. */ - fun begin(checkpoint: Checkpoint? = null) { + fun begin(checkpoint: Checkpoint? = null, executorService: ExecutorService = createExecutorService()) { val checkpointData = checkpoint?.getCheckpointData(sessionKey) - begin(checkpointData?.sequence ?: DEFAULT_SEQUENCE, checkpointData?.timestamp) + begin(checkpointData?.sequence ?: DEFAULT_SEQUENCE, checkpointData?.timestamp, executorService) } /** @@ -279,7 +265,7 @@ abstract class AbstractCheckTask( /** * Observe a message sequence from the previous task. - * Task subscribe to messages stream with sequence after call. + * Task subscribe to message's stream with sequence after call. * This method should be called only once otherwise it throws IllegalStateException. * @param sequence message sequence from the previous task. * @param checkpointTimestamp checkpoint timestamp from the previous task @@ -291,7 +277,7 @@ abstract class AbstractCheckTask( private fun begin( sequence: Long = DEFAULT_SEQUENCE, checkpointTimestamp: Timestamp? = null, - executorService: ExecutorService = createExecutorService(), + executorService: ExecutorService, previousExecutionData: PreviousExecutionData = PreviousExecutionData.DEFAULT ) { configureRootEvent() diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt index 3eeeeea..344f034 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt @@ -61,6 +61,7 @@ import org.junit.jupiter.params.provider.ValueSource import java.lang.IllegalArgumentException import java.time.Instant import java.util.stream.Stream +import java.util.concurrent.Executors import kotlin.test.assertEquals import kotlin.test.assertNotNull @@ -81,6 +82,35 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { clientStub ) + @Test + fun `success verification several rules on same executor`() { + val streams = createStreams(SESSION_ALIAS, Direction.FIRST, listOf( + message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS) + .mergeMetadata(MessageMetadata.newBuilder() + .putProperties("keyProp", "42") + .putProperties("notKeyProp", "2") + .build()) + .build() + )) + + val eventID = EventID.newBuilder().setId("root").build() + val filter = RootMessageFilter.newBuilder() + .setMessageType(MESSAGE_TYPE) + .setMetadataFilter(MetadataFilter.newBuilder() + .putPropertyFilters("keyProp", "42".toSimpleFilter(FilterOperation.EQUAL, true))) + .build() + val executor = Executors.newSingleThreadExecutor() + + checkTask(filter, eventID, streams).apply { begin(executorService = executor) } + checkTask(filter, eventID, streams).apply { begin(executorService = executor) } + + val eventBatches = awaitEventBatchRequest(1000L, 4) + val eventList = eventBatches.flatMap(EventBatch::getEventsList) + val eventsPerRule = 5 + assertEquals(2 * eventsPerRule, eventList.size) + assertEquals(2 * eventsPerRule, eventList.filter { it.status == SUCCESS }.size) + } + @Test fun `success verification`() { val streams = createStreams(SESSION_ALIAS, Direction.FIRST, listOf( @@ -613,4 +643,4 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { Arguments.of(null, false) ) } -} \ No newline at end of file +}