Skip to content

Commit

Permalink
[TH2-1686] Use common scheduler for executing rules pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
OptimumCode committed Oct 21, 2022
1 parent 61a4d7b commit fa12e33
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
package com.exactpro.th2.check1

import io.reactivex.observers.DisposableObserver
import mu.KotlinLogging
import org.slf4j.Logger
import org.slf4j.LoggerFactory

abstract class AbstractSessionObserver<T> : DisposableObserver<T>() {
override fun onComplete() {
Expand All @@ -26,8 +26,7 @@ abstract class AbstractSessionObserver<T> : DisposableObserver<T>() {
}

companion object {
@Suppress("JAVA_CLASS_ON_COMPANION")
@JvmField
val LOGGER: Logger = LoggerFactory.getLogger(javaClass.enclosingClass)
val LOGGER: Logger = KotlinLogging.logger { }
}
}
52 changes: 32 additions & 20 deletions src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import com.exactpro.th2.check1.grpc.ChainID
import com.exactpro.th2.check1.grpc.CheckRuleRequest
import com.exactpro.th2.check1.grpc.CheckSequenceRuleRequest
import com.exactpro.th2.check1.grpc.CheckpointRequestOrBuilder
import com.exactpro.th2.check1.metrics.BufferMetric
import com.exactpro.th2.check1.grpc.NoMessageCheckRequest
import com.exactpro.th2.check1.metrics.BufferMetric
import com.exactpro.th2.check1.rule.AbstractCheckTask
import com.exactpro.th2.check1.rule.RuleFactory
import com.exactpro.th2.common.event.Event
Expand All @@ -34,15 +34,18 @@ import com.exactpro.th2.common.schema.message.MessageListener
import com.exactpro.th2.common.schema.message.MessageRouter
import com.exactpro.th2.common.schema.message.SubscriberMonitor
import com.fasterxml.jackson.core.JsonProcessingException
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.google.protobuf.TextFormat.shortDebugString
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import org.slf4j.LoggerFactory
import java.io.IOException
import mu.KotlinLogging
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit
import com.exactpro.th2.common.grpc.Checkpoint as GrpcCheckpoint
import com.exactpro.th2.check1.utils.toMessageID
import com.exactpro.th2.common.message.toJson
Expand All @@ -53,7 +56,7 @@ class CollectorService(
private val configuration: Check1Configuration,
) {

private val logger = LoggerFactory.getLogger(javaClass.name + '@' + hashCode())
private val logger = KotlinLogging.logger(javaClass.name + '@' + hashCode())

/**
* Queue name to subscriber. Messages with different connectivity can be transferred with one queue.
Expand All @@ -67,7 +70,9 @@ class CollectorService(
private val olderThanDelta = configuration.cleanupOlderThan
private val olderThanTimeUnit = configuration.cleanupTimeUnit
private val defaultAutoSilenceCheck: Boolean = configuration.isAutoSilenceCheckAfterSequenceRule

private val commonRuleExecutor: ExecutorService = Executors.newSingleThreadExecutor(
ThreadFactoryBuilder().setNameFormat("rule-executor-%d").build()
)
private var ruleFactory: RuleFactory

init {
Expand Down Expand Up @@ -143,7 +148,7 @@ class CollectorService(
} else {
checkpoint
}
value?.subscribeNextTask(this) ?: begin(realCheckpoint)
value?.subscribeNextTask(this) ?: begin(realCheckpoint, commonRuleExecutor)
}

private fun CheckRuleRequest.getChainIdOrGenerate(): ChainID {
Expand Down Expand Up @@ -178,13 +183,13 @@ class CollectorService(
val endTime = task.endTime
when {
!olderThan(now, delta, unit, endTime) -> false
task.tryShutdownExecutor() -> {
logger.info("Removed task ${task.description} ($endTime) from tasks map")
true
}
else -> {
logger.warn("Task ${task.description} can't be removed because it has a continuation")
false
!task.hasNextRule().also { canBeRemoved ->
when {
canBeRemoved -> logger.info("Removed task ${task.description} ($endTime) from tasks map")
else -> logger.warn("Task ${task.description} can't be removed because it has a continuation")
}
}
}
}
}
Expand All @@ -197,10 +202,7 @@ class CollectorService(
private fun sendEvents(parentEventID: EventID, event: Event) {
logger.debug("Sending event thee id '{}' parent id '{}'", event.id, parentEventID)

val batch = EventBatch.newBuilder()
.setParentEventId(parentEventID)
.addAllEvents(event.toListProto(parentEventID))
.build()
val batch = event.toBatchProto(parentEventID)

ForkJoinPool.commonPool().execute {
try {
Expand All @@ -222,12 +224,22 @@ class CollectorService(
}

fun close() {
try {
subscriberMonitor.unsubscribe()
} catch (e: IOException) {
logger.error("Close subscriber failure", e)
runCatching(subscriberMonitor::unsubscribe).onFailure {
logger.error(it) { "Close subscriber failure" }
}
mqSubject.onComplete()
runCatching {
commonRuleExecutor.shutdown()
val timeout: Long = 10
val unit = TimeUnit.SECONDS
if (!commonRuleExecutor.awaitTermination(timeout, unit)) {
logger.warn { "Cannot shutdown executor during ${unit.toMillis(timeout)} ms. Force shutdown" }
val remainingTasks = commonRuleExecutor.shutdownNow()
logger.warn { "Tasks left: ${remainingTasks.size}" }
}
}.onFailure {
logger.error(it) { "Cannot shutdown common task executor" }
}
}

private fun subscribe(listener: MessageListener<MessageBatch>): SubscriberMonitor {
Expand Down
30 changes: 8 additions & 22 deletions src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,9 @@ abstract class AbstractCheckTask(
}

/**
* Shutdown the executor that is used to perform this task in case it doesn't have a next task
* @return true if the task doesn't have a next task, otherwise it will return false
* Returns `true` if the rule has a continuation (the rule that should start after the current on is finished)
*/
fun tryShutdownExecutor(): Boolean {
if (hasNextTask.get()) {
LOGGER.warn("Cannot shutdown executor for task '$description' that has a connected task")
return false
}
executorService.shutdown()
return true
}
fun hasNextRule(): Boolean = hasNextTask.get()

/**
* Registers a task as the next task in the continuous verification chain. Its [begin] method will be called
Expand All @@ -195,14 +187,8 @@ abstract class AbstractCheckTask(
if (hasNextTask.compareAndSet(false, true)) {
onChainedTaskSubscription()
sequenceSubject.subscribe { legacy ->
val executor = if (legacy.executorService.isShutdown) {
LOGGER.warn("Executor has been shutdown before next task has been subscribed. Create a new one")
createExecutorService()
} else {
legacy.executorService
}
legacy.sequenceData.apply {
checkTask.begin(lastSequence, lastMessageTimestamp, executor, PreviousExecutionData(untrusted, completed))
checkTask.begin(lastSequence, lastMessageTimestamp, executorService, PreviousExecutionData(untrusted, completed))
}
}
LOGGER.info("Task {} ({}) subscribed to task {} ({})", checkTask.description, checkTask.hashCode(), description, hashCode())
Expand All @@ -213,14 +199,14 @@ abstract class AbstractCheckTask(

/**
* Observe a message sequence from the checkpoint.
* Task subscribe to messages stream with its sequence after call.
* Task subscribe to message's stream with its sequence after call.
* This method should be called only once otherwise it throws IllegalStateException.
* @param checkpoint message sequence and checkpoint timestamp from previous task.
* @throws IllegalStateException when method is called more than once.
*/
fun begin(checkpoint: Checkpoint? = null) {
fun begin(checkpoint: Checkpoint? = null, executorService: ExecutorService = createExecutorService()) {
val checkpointData = checkpoint?.getCheckpointData(sessionKey)
begin(checkpointData?.sequence ?: DEFAULT_SEQUENCE, checkpointData?.timestamp)
begin(checkpointData?.sequence ?: DEFAULT_SEQUENCE, checkpointData?.timestamp, executorService)
}

/**
Expand Down Expand Up @@ -279,7 +265,7 @@ abstract class AbstractCheckTask(

/**
* Observe a message sequence from the previous task.
* Task subscribe to messages stream with sequence after call.
* Task subscribe to message's stream with sequence after call.
* This method should be called only once otherwise it throws IllegalStateException.
* @param sequence message sequence from the previous task.
* @param checkpointTimestamp checkpoint timestamp from the previous task
Expand All @@ -291,7 +277,7 @@ abstract class AbstractCheckTask(
private fun begin(
sequence: Long = DEFAULT_SEQUENCE,
checkpointTimestamp: Timestamp? = null,
executorService: ExecutorService = createExecutorService(),
executorService: ExecutorService,
previousExecutionData: PreviousExecutionData = PreviousExecutionData.DEFAULT
) {
configureRootEvent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import org.junit.jupiter.params.provider.ValueSource
import java.lang.IllegalArgumentException
import java.time.Instant
import java.util.stream.Stream
import java.util.concurrent.Executors
import kotlin.test.assertEquals
import kotlin.test.assertNotNull

Expand All @@ -81,6 +82,35 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {
clientStub
)

@Test
fun `success verification several rules on same executor`() {
val streams = createStreams(SESSION_ALIAS, Direction.FIRST, listOf(
message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS)
.mergeMetadata(MessageMetadata.newBuilder()
.putProperties("keyProp", "42")
.putProperties("notKeyProp", "2")
.build())
.build()
))

val eventID = EventID.newBuilder().setId("root").build()
val filter = RootMessageFilter.newBuilder()
.setMessageType(MESSAGE_TYPE)
.setMetadataFilter(MetadataFilter.newBuilder()
.putPropertyFilters("keyProp", "42".toSimpleFilter(FilterOperation.EQUAL, true)))
.build()
val executor = Executors.newSingleThreadExecutor()

checkTask(filter, eventID, streams).apply { begin(executorService = executor) }
checkTask(filter, eventID, streams).apply { begin(executorService = executor) }

val eventBatches = awaitEventBatchRequest(1000L, 4)
val eventList = eventBatches.flatMap(EventBatch::getEventsList)
val eventsPerRule = 5
assertEquals(2 * eventsPerRule, eventList.size)
assertEquals(2 * eventsPerRule, eventList.filter { it.status == SUCCESS }.size)
}

@Test
fun `success verification`() {
val streams = createStreams(SESSION_ALIAS, Direction.FIRST, listOf(
Expand Down Expand Up @@ -613,4 +643,4 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() {
Arguments.of(null, false)
)
}
}
}

0 comments on commit fa12e33

Please sign in to comment.