From bb9444ddd89041090865715851dbf4bfd819641f Mon Sep 17 00:00:00 2001 From: Oleg Smelov <45400511+lumber1000@users.noreply.github.com> Date: Thu, 6 Oct 2022 16:11:48 +0400 Subject: [PATCH] TH2-3615/3841 (#159) UninitializedPropertyAccessException fixed (TH2-3615) Rules corrected to avoid holding data when the rule is finished (TH2-3841) --- .../th2/check1/rule/AbstractCheckTask.kt | 75 +++++++--- .../th2/check1/rule/check/CheckRuleTask.kt | 55 ++++--- .../rule/nomessage/NoMessageCheckTask.kt | 77 ++++++---- .../rule/sequence/SequenceCheckRuleTask.kt | 136 +++++++++--------- .../check1/rule/sequence/SilenceCheckTask.kt | 83 ++++++----- 5 files changed, 255 insertions(+), 171 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt index 1e904ec4..b375479c 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt @@ -74,18 +74,30 @@ import java.util.concurrent.atomic.AtomicReference */ abstract class AbstractCheckTask( private val ruleConfiguration: RuleConfiguration, - submitTime: Instant, + private val submitTime: Instant, protected val sessionKey: SessionKey, private val parentEventID: EventID, private val messageStream: Observable, private val eventBatchRouter: MessageRouter ) : AbstractSessionObserver() { + protected open class Refs(val rootEvent: Event) + + protected class RefsKeeper(refs: T) { + private var refsNullable: T? = refs + val refs: T get() = refsNullable ?: error("Requesting references after references has been erased.") + fun eraseRefs() { + refsNullable = null + } + } + + protected abstract val refsKeeper: RefsKeeper + private val refs get() = refsKeeper.refs + val description: String? = ruleConfiguration.description private val taskTimeout: TaskTimeout = ruleConfiguration.taskTimeout protected var handledMessageCounter: Long = 0 - protected val rootEvent: Event = Event.from(submitTime).description(description) private val sequenceSubject = SingleSubject.create() private val hasNextTask = AtomicBoolean(false) @@ -130,17 +142,27 @@ abstract class AbstractCheckTask( private var bufferContainsStartMessage: Boolean = false private var isDefaultSequence: Boolean = false - override fun onStart() { + @Volatile + protected var started = false + + protected fun createRootEvent() = Event.from(submitTime).description(description) + + final override fun onStart() { super.onStart() + started = true //Init or re-init variable in TASK_SCHEDULER thread handledMessageCounter = 0 + + onStartInit() } + protected abstract fun onStartInit() + override fun onError(e: Throwable) { super.onError(e) - rootEvent.status(FAILED) + refs.rootEvent.status(FAILED) .bodyData(EventUtils.createMessageBean(e.message)) end(State.ERROR, "Error ${e.message} received in message stream") } @@ -182,7 +204,7 @@ abstract class AbstractCheckTask( } LOGGER.info("Task {} ({}) subscribed to task {} ({})", checkTask.description, checkTask.hashCode(), description, hashCode()) } else { - throw IllegalStateException("Subscription to last sequence for task $description (${hashCode()}) is already executed, subscriber ${checkTask.description} (${checkTask.hashCode()})") + error("Subscription to last sequence for task $description (${hashCode()}) is already executed, subscriber ${checkTask.description} (${checkTask.hashCode()})") } } @@ -272,7 +294,7 @@ abstract class AbstractCheckTask( configureRootEvent() isParentCompleted = previousExecutionData.completed if (!taskState.compareAndSet(State.CREATED, State.BEGIN)) { - throw IllegalStateException("Task $description already has been started") + error("Task $description already has been started") } LOGGER.info("Check begin for session alias '{}' with sequence '{}' and task timeout '{}'", sessionKey, sequence, taskTimeout) RuleMetric.incrementActiveRule(type()) @@ -292,7 +314,7 @@ abstract class AbstractCheckTask( // All sources above will be disposed on this scheduler. // - // This method should be called as closer as possible + // This method should be called as close as possible // to the actual dispose that you want to execute on this scheduler // because other operations are executed on the same single-thread scheduler. // @@ -304,7 +326,7 @@ abstract class AbstractCheckTask( handledMessageCounter++ with(it.metadata.id) { - rootEvent.messageID(this) + refs.rootEvent.messageID(this) } } .takeWhileMessagesInTimeout() @@ -313,7 +335,7 @@ abstract class AbstractCheckTask( .subscribe(this) } catch (exception: Exception) { LOGGER.error("An internal error occurred while executing rule", exception) - rootEvent.addSubEventWithSamePeriod() + refs.rootEvent.addSubEventWithSamePeriod() .name("An error occurred while executing rule") .type("internalError") .status(FAILED) @@ -347,6 +369,7 @@ abstract class AbstractCheckTask( .build()) } finally { RuleMetric.decrementActiveRule(type()) + refsKeeper.eraseRefs() sequenceSubject.onSuccess(Legacy(executorService, SequenceData(lastSequence, lastMessageTimestamp, !hasMessagesInTimeoutInterval))) } } @@ -406,7 +429,7 @@ abstract class AbstractCheckTask( LOGGER.info("Skip event publication for task ${type()} '$description' (${hashCode()})") return } - val batches = rootEvent.disperseToBatches(ruleConfiguration.maxEventBatchContentSize, parentEventID) + val batches = refs.rootEvent.disperseToBatches(ruleConfiguration.maxEventBatchContentSize, parentEventID) RESPONSE_EXECUTOR.execute { batches.forEach { batch -> @@ -422,18 +445,27 @@ abstract class AbstractCheckTask( } } } else { - LOGGER.debug("Event tree id '{}' parent id '{}' is already published", rootEvent.id, parentEventID) + LOGGER.debug("Event tree id '{}' parent id '{}' is already published", refs.rootEvent.id, parentEventID) } } private fun completeEventOrReportError(prevState: State): Boolean { return try { - completeEvent(prevState) - doAfterCompleteEvent() - false + if (started) { + completeEvent(prevState) + doAfterCompleteEvent() + false + } else { + LOGGER.error("Check task was not started.") + refs.rootEvent.addSubEventWithSamePeriod() + .name("Check failed: task timeout elapsed before the check task was started. Please, check component resources for throttling or intensive GC") + .type("taskNotStarted") + .status(FAILED) + true + } } catch (e: Exception) { LOGGER.error("Result event cannot be completed", e) - rootEvent.addSubEventWithSamePeriod() + refs.rootEvent.addSubEventWithSamePeriod() .name("Check result event cannot build completely") .type("eventNotComplete") .bodyData(EventUtils.createMessageBean("An unexpected exception has been thrown during result check build")) @@ -444,8 +476,8 @@ abstract class AbstractCheckTask( } private fun configureRootEvent() { - rootEvent.name(name()).type(type()) - setup(rootEvent) + refs.rootEvent.name(name()).type(type()) + setup(refs.rootEvent) } private fun doAfterCompleteEvent() { @@ -461,7 +493,7 @@ abstract class AbstractCheckTask( } private fun fillUntrustedExecutionEvent() { - rootEvent.addSubEvent( + refs.rootEvent.addSubEvent( Event.start() .name("The current check is untrusted because the start point of the check interval has been selected approximately") .status(FAILED) @@ -470,7 +502,7 @@ abstract class AbstractCheckTask( } private fun fillMissedStartMessageAndMessagesInIntervalEvent() { - rootEvent.addSubEvent( + refs.rootEvent.addSubEvent( Event.start() .name("Check cannot be executed because buffer for session alias '${sessionKey.sessionAlias}' and direction '${sessionKey.direction}' contains neither message in the requested check interval with sequence '$lastSequence' and checkpoint timestamp '${checkpointTimeout?.toJson()}'") .status(FAILED) @@ -479,7 +511,7 @@ abstract class AbstractCheckTask( } private fun fillEmptyStartMessageEvent() { - rootEvent.addSubEvent( + refs.rootEvent.addSubEvent( Event.start() .name("Buffer for session alias '${sessionKey.sessionAlias}' and direction '${sessionKey.direction}' doesn't contain starting message, but contains several messages in the requested check interval") .status(FAILED) @@ -721,7 +753,6 @@ abstract class AbstractCheckTask( null } - private data class Legacy(val executorService: ExecutorService, val sequenceData: SequenceData) private data class SequenceData(val lastSequence: Long, val lastMessageTimestamp: Timestamp?, val untrusted: Boolean) private data class PreviousExecutionData( @@ -740,4 +771,4 @@ abstract class AbstractCheckTask( val DEFAULT = PreviousExecutionData() } } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/check/CheckRuleTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/check/CheckRuleTask.kt index b94dd543..7c82cde1 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/check/CheckRuleTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/check/CheckRuleTask.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -35,53 +35,66 @@ import java.time.Instant /** * This rule checks for the presence of a single message in the messages stream. */ + class CheckRuleTask( ruleConfiguration: RuleConfiguration, startTime: Instant, sessionKey: SessionKey, - private val protoMessageFilter: RootMessageFilter, + protoMessageFilter: RootMessageFilter, parentEventID: EventID, messageStream: Observable, eventBatchRouter: MessageRouter ) : AbstractCheckTask(ruleConfiguration, startTime, sessionKey, parentEventID, messageStream, eventBatchRouter) { - private val messageFilter: SailfishFilter = SailfishFilter( - CONVERTER.fromProtoPreFilter(protoMessageFilter), - protoMessageFilter.toCompareSettings() - ) - private val metadataFilter: SailfishFilter? = protoMessageFilter.metadataFilterOrNull()?.let { - SailfishFilter( - CONVERTER.fromMetadataFilter(it, METADATA_MESSAGE_NAME), - it.toComparisonSettings() + protected class Refs( + rootEvent: Event, + val protoMessageFilter: RootMessageFilter, + val messageFilter: SailfishFilter, + val metadataFilter: SailfishFilter? + ) : AbstractCheckTask.Refs(rootEvent) + + override val refsKeeper = RefsKeeper( + Refs( + rootEvent = createRootEvent(), + protoMessageFilter = protoMessageFilter, + messageFilter = SailfishFilter( + CONVERTER.fromProtoPreFilter(protoMessageFilter), + protoMessageFilter.toCompareSettings() + ), + metadataFilter = protoMessageFilter.metadataFilterOrNull()?.let { + SailfishFilter( + CONVERTER.fromMetadataFilter(it, METADATA_MESSAGE_NAME), + it.toComparisonSettings() + ) + } ) - } + ) - override fun onStart() { - super.onStart() + private val refs get() = refsKeeper.refs + override fun onStartInit() { val subEvent = Event.start() .endTimestamp() .name("Message filter") .type("Filter") - .bodyData(protoMessageFilter.toReadableBodyCollection()) - - rootEvent.addSubEvent(subEvent) + .bodyData(refs.protoMessageFilter.toReadableBodyCollection()) + refs.rootEvent.addSubEvent(subEvent) } override fun onNext(messageContainer: MessageContainer) { - val aggregatedResult = matchFilter(messageContainer, messageFilter, metadataFilter) + val aggregatedResult = matchFilter(messageContainer, refs.messageFilter, refs.metadataFilter) - val container = ComparisonContainer(messageContainer, protoMessageFilter, aggregatedResult) + val container = ComparisonContainer(messageContainer, refs.protoMessageFilter, aggregatedResult) if (container.matchesByKeys) { - rootEvent.appendEventsWithVerification(container) + refs.rootEvent.appendEventsWithVerification(container) checkComplete() } } override fun onTimeout() { - rootEvent.addSubEventWithSamePeriod() + refs.rootEvent.addSubEventWithSamePeriod() .name("No message found by target keys") .type("Check failed") .status(FAILED) @@ -94,4 +107,4 @@ class CheckRuleTask( override fun setup(rootEvent: Event) { rootEvent.bodyData(EventUtils.createMessageBean("Check rule for messages from ${sessionKey.run { "$sessionAlias ($direction direction)"} }")) } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt index c580df8a..3addb4a5 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/nomessage/NoMessageCheckTask.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2022 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -43,41 +43,56 @@ class NoMessageCheckTask( eventBatchRouter: MessageRouter ) : AbstractCheckTask(ruleConfiguration, startTime, sessionKey, parentEventID, messageStream, eventBatchRouter) { - private val protoPreMessageFilter: RootMessageFilter = protoPreFilter.toRootMessageFilter() - private val messagePreFilter = SailfishFilter( - CONVERTER.fromProtoPreFilter(protoPreMessageFilter), - protoPreMessageFilter.toCompareSettings() - ) + protected class Refs( + rootEvent: Event, + val protoPreMessageFilter: RootMessageFilter, + val messagePreFilter: SailfishFilter, + val metadataPreFilter: SailfishFilter?, + ) : AbstractCheckTask.Refs(rootEvent) { + val preFilterEvent: Event by lazy { + Event.start() + .type("preFiltering") + .bodyData(protoPreMessageFilter.toTreeTable()) + } + val resultEvent: Event by lazy { + Event.start() + .type("noMessagesCheckResult") + } + } - private val metadataPreFilter: SailfishFilter? = protoPreMessageFilter.metadataFilterOrNull()?.let { - SailfishFilter( - CONVERTER.fromMetadataFilter(it, VerificationUtil.METADATA_MESSAGE_NAME), - it.toComparisonSettings() + override val refsKeeper = RefsKeeper(protoPreFilter.toRootMessageFilter().let { protoPreMessageFilter -> + Refs( + rootEvent = createRootEvent(), + protoPreMessageFilter = protoPreFilter.toRootMessageFilter(), + messagePreFilter = SailfishFilter( + CONVERTER.fromProtoPreFilter(protoPreMessageFilter), + protoPreMessageFilter.toCompareSettings() + ), + metadataPreFilter = protoPreMessageFilter.metadataFilterOrNull()?.let { + SailfishFilter( + CONVERTER.fromMetadataFilter(it, VerificationUtil.METADATA_MESSAGE_NAME), + it.toComparisonSettings() + ) + } ) - } + }) - private lateinit var preFilterEvent: Event - private lateinit var resultEvent: Event + private val refs get() = refsKeeper.refs private var extraMessagesCounter: Int = 0 - - override fun onStart() { - super.onStart() - preFilterEvent = Event.start() - .type("preFiltering") - .bodyData(protoPreMessageFilter.toTreeTable()) - rootEvent.addSubEvent(preFilterEvent) - resultEvent = Event.start() - .type("noMessagesCheckResult") - rootEvent.addSubEvent(resultEvent) + override fun onStartInit() { + with(refs) { + rootEvent.addSubEvent(preFilterEvent) + rootEvent.addSubEvent(resultEvent) + } } override fun Observable.taskPipeline(): Observable = - preFilterBy(this, protoPreMessageFilter, messagePreFilter, metadataPreFilter, LOGGER) { preFilterContainer -> // Update pre-filter state + preFilterBy(this, refs.protoPreMessageFilter, refs.messagePreFilter, refs.metadataPreFilter, LOGGER) { preFilterContainer -> // Update pre-filter state with(preFilterContainer) { - preFilterEvent.appendEventsWithVerification(preFilterContainer) - preFilterEvent.messageID(protoActual.metadata.id) + refs.preFilterEvent.appendEventsWithVerification(preFilterContainer) + refs.preFilterEvent.messageID(protoActual.metadata.id) } } @@ -92,17 +107,17 @@ class NoMessageCheckTask( override fun onNext(messageContainer: MessageContainer) { messageContainer.protoMessage.metadata.apply { extraMessagesCounter++ - resultEvent.messageID(id) + refs.resultEvent.messageID(id) } } override fun completeEvent(taskState: State) { - preFilterEvent.name("Prefilter: $extraMessagesCounter messages were filtered.") + refs.preFilterEvent.name("Prefilter: $extraMessagesCounter messages were filtered.") if (extraMessagesCounter == 0) { - resultEvent.status(Event.Status.PASSED).name("Check passed") + refs.resultEvent.status(Event.Status.PASSED).name("Check passed") } else { - resultEvent.status(Event.Status.FAILED) + refs.resultEvent.status(Event.Status.FAILED) .name("Check failed: $extraMessagesCounter extra messages were found.") } @@ -113,7 +128,7 @@ class NoMessageCheckTask( if (taskState != State.TIMEOUT || !isCheckpointLastReceivedMessage()) { executionStopEvent.status(Event.Status.FAILED) } - resultEvent.addSubEvent(executionStopEvent) + refs.resultEvent.addSubEvent(executionStopEvent) } } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SequenceCheckRuleTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SequenceCheckRuleTask.kt index 5b1810fc..b2a419fd 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SequenceCheckRuleTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SequenceCheckRuleTask.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.check1.rule.sequence import com.exactpro.th2.check1.SessionKey @@ -44,7 +45,7 @@ import com.exactpro.th2.common.schema.message.MessageRouter import com.google.protobuf.TextFormat.shortDebugString import io.reactivex.Observable import java.time.Instant -import java.util.LinkedHashMap +import kotlin.collections.HashSet import kotlin.collections.component1 import kotlin.collections.component2 import kotlin.collections.set @@ -57,51 +58,66 @@ import kotlin.collections.set * If **checkOrder** parameter is set to `true` the messages must be received in the exact same order as filters were specified. * If this parameter is set to `false`, the order won't be checked. */ + class SequenceCheckRuleTask( ruleConfiguration: RuleConfiguration, startTime: Instant, sessionKey: SessionKey, protoPreFilter: PreFilter, - private val protoMessageFilters: List, + protoMessageFilters: List, private val checkOrder: Boolean, parentEventID: EventID, messageStream: Observable, eventBatchRouter: MessageRouter ) : AbstractCheckTask(ruleConfiguration, startTime, sessionKey, parentEventID, messageStream, eventBatchRouter) { - private val protoPreMessageFilter: RootMessageFilter = protoPreFilter.toRootMessageFilter() - private val messagePreFilter = SailfishFilter( - CONVERTER.fromProtoPreFilter(protoPreMessageFilter), - protoPreMessageFilter.toCompareSettings() - ) - private val metadataPreFilter: SailfishFilter? = protoPreMessageFilter.metadataFilterOrNull()?.let { - SailfishFilter( - CONVERTER.fromMetadataFilter(it, VerificationUtil.METADATA_MESSAGE_NAME), - it.toComparisonSettings() - ) - } - private lateinit var preFilteringResults: MutableMap - - /** - * List of filters which haven't matched yet. It is created from the requested filters and reduced after every match - */ - private lateinit var messageFilters: MutableList + protected class Refs( + rootEvent: Event, + val protoMessageFilters: List, + val protoPreMessageFilter: RootMessageFilter, + val messagePreFilter: SailfishFilter, + val metadataPreFilter: SailfishFilter?, + ) : AbstractCheckTask.Refs(rootEvent) { + val preFilterEvent: Event by lazy { + Event.start() + .type("preFiltering") + .bodyData(protoPreMessageFilter.toReadableBodyCollection()) + } - private lateinit var messageFilteringResults: MutableMap + val preFilteringResults: MutableMap = LinkedHashMap() - private lateinit var preFilterEvent: Event + /** + * List of filters which haven't matched yet. It is created from the requested filters and reduced after every match + */ + lateinit var messageFilters: MutableList - private var reordered: Boolean = false - private lateinit var matchedByKeys: MutableSet + val messageFilteringResults: MutableMap = LinkedHashMap() + val matchedByKeys: MutableSet = HashSet(protoMessageFilters.size) + } + override val refsKeeper = RefsKeeper(protoPreFilter.toRootMessageFilter().let { protoPreMessageFilter -> + Refs( + rootEvent = createRootEvent(), + protoMessageFilters = protoMessageFilters, + protoPreMessageFilter = protoPreMessageFilter, + messagePreFilter = SailfishFilter( + CONVERTER.fromProtoPreFilter(protoPreMessageFilter), + protoPreMessageFilter.toCompareSettings() + ), + metadataPreFilter = protoPreMessageFilter.metadataFilterOrNull()?.let { + SailfishFilter( + CONVERTER.fromMetadataFilter(it, VerificationUtil.METADATA_MESSAGE_NAME), + it.toComparisonSettings() + ) + } + ) + }) - override fun onStart() { - super.onStart() + private val refs get() = refsKeeper.refs - //Init or re-init variable in TASK_SCHEDULER thread - preFilteringResults = LinkedHashMap() + private var reordered: Boolean = false - messageFilteringResults = LinkedHashMap() - messageFilters = protoMessageFilters.map { + override fun onStartInit() { + refs.messageFilters = refs.protoMessageFilters.map { MessageFilterContainer( it, SailfishFilter(CONVERTER.fromProtoPreFilter(it), it.toCompareSettings()), @@ -112,28 +128,21 @@ class SequenceCheckRuleTask( ) }.toMutableList() - matchedByKeys = HashSet(messageFilters.size) - - preFilterEvent = Event.start() - .type("preFiltering") - .bodyData(protoPreMessageFilter.toReadableBodyCollection()) - - rootEvent.addSubEvent(preFilterEvent) + refs.rootEvent.addSubEvent(refs.preFilterEvent) } override fun Observable.taskPipeline(): Observable = - preFilterBy(this, protoPreMessageFilter, messagePreFilter, metadataPreFilter, LOGGER) { preFilterContainer -> // Update pre-filter state + preFilterBy(this, refs.protoPreMessageFilter, refs.messagePreFilter, refs.metadataPreFilter, LOGGER) { preFilterContainer -> // Update pre-filter state with(preFilterContainer) { - preFilterEvent.appendEventsWithVerification(preFilterContainer) - preFilterEvent.messageID(protoActual.metadata.id) - - preFilteringResults[protoActual.metadata.id] = preFilterContainer + refs.preFilterEvent.appendEventsWithVerification(preFilterContainer) + refs.preFilterEvent.messageID(protoActual.metadata.id) + refs.preFilteringResults[protoActual.metadata.id] = preFilterContainer } } override fun onNext(messageContainer: MessageContainer) { - for (index in messageFilters.indices) { - val messageFilterContainer = messageFilters[index] + for (index in refs.messageFilters.indices) { + val messageFilterContainer = refs.messageFilters[index] val messageFilter: SailfishFilter = messageFilterContainer.messageFilter val metadataFilter: SailfishFilter? = messageFilterContainer.metadataFilter @@ -147,10 +156,10 @@ class SequenceCheckRuleTask( if (comparisonContainer.matchesByKeys) { reordered = reordered || index != 0 - messageFilters.removeAt(index) + refs.messageFilters.removeAt(index) - messageFilteringResults[messageContainer.protoMessage.metadata.id] = comparisonContainer - matchedByKeys.add(messageFilterContainer) + refs.messageFilteringResults[messageContainer.protoMessage.metadata.id] = comparisonContainer + refs.matchedByKeys.add(messageFilterContainer) requireNotNull(result.messageResult) { "Message result must not be null because the result said the message is matched by key fields. Filter: " + @@ -160,16 +169,15 @@ class SequenceCheckRuleTask( } } - val expectedMatches = protoMessageFilters.size + val expectedMatches = refs.protoMessageFilters.size // rule has found complete match for all filters or each filter has found a match by key fields at least - if (messageFilters.isEmpty() || (matchedByKeys.size == expectedMatches && messageFilteringResults.size >= expectedMatches)) { + if (refs.messageFilters.isEmpty() || (refs.matchedByKeys.size == expectedMatches && refs.messageFilteringResults.size >= expectedMatches)) { checkComplete() } } override fun completeEvent(taskState: State) { - preFilterEvent.name("Pre-filtering (filtered ${preFilteringResults.size} / processed $handledMessageCounter) messages") - + refs.preFilterEvent.name("Pre-filtering (filtered ${refs.preFilteringResults.size} / processed $handledMessageCounter) messages") fillSequenceEvent() fillCheckMessagesEvent() } @@ -186,14 +194,14 @@ class SequenceCheckRuleTask( * Creates events for check messages */ private fun fillCheckMessagesEvent() { - val checkMessagesEvent = rootEvent.addSubEventWithSamePeriod() + val checkMessagesEvent = refs.rootEvent.addSubEventWithSamePeriod() .name("Check messages") .type(CHECK_MESSAGES_TYPE) - .appendEventWithVerificationsAndFilters(protoMessageFilters, messageFilteringResults.values) - if (protoMessageFilters.size != messageFilteringResults.size) { - messageFilteringResults.values.map(ComparisonContainer::protoFilter) + .appendEventWithVerificationsAndFilters(refs.protoMessageFilters, refs.messageFilteringResults.values) + if (refs.protoMessageFilters.size != refs.messageFilteringResults.size) { + refs.messageFilteringResults.values.map(ComparisonContainer::protoFilter) checkMessagesEvent.status(FAILED) - .bodyData(createMessageBean("Incorrect number of comparisons (expected ${protoMessageFilters.size} / actual ${messageFilteringResults.size})")) + .bodyData(createMessageBean("Incorrect number of comparisons (expected ${refs.protoMessageFilters.size} / actual ${refs.messageFilteringResults.size})")) } else { checkMessagesEvent.bodyData(createMessageBean("Contains comparisons")) } @@ -204,25 +212,25 @@ class SequenceCheckRuleTask( */ private fun fillSequenceEvent() { val sequenceTable = TableBuilder() - preFilteringResults.forEach { (messageID: MessageID, comparisonContainer: ComparisonContainer) -> - val container = messageFilteringResults[messageID] + refs.preFilteringResults.forEach { (messageID: MessageID, comparisonContainer: ComparisonContainer) -> + val container = refs.messageFilteringResults[messageID] sequenceTable.row( container?.let { CheckSequenceUtils.createBothSide(it.sailfishActual, it.protoActual.metadata, it.protoFilter, sessionKey.sessionAlias) } ?: CheckSequenceUtils.createOnlyActualSide(comparisonContainer.sailfishActual, sessionKey.sessionAlias) ) } - messageFilters.forEach { messageFilter: MessageFilterContainer -> + refs.messageFilters.forEach { messageFilter: MessageFilterContainer -> sequenceTable.row(CheckSequenceUtils.createOnlyExpectedSide(messageFilter.protoMessageFilter, sessionKey.sessionAlias)) } - rootEvent.addSubEventWithSamePeriod() - .name("Check sequence (expected ${protoMessageFilters.size} / actual ${preFilteringResults.size} , check order $checkOrder)") + refs.rootEvent.addSubEventWithSamePeriod() + .name("Check sequence (expected ${refs.protoMessageFilters.size} / actual ${refs.preFilteringResults.size} , check order $checkOrder)") .type("checkSequence") - .status(if (protoMessageFilters.size == preFilteringResults.size + .status(if (refs.protoMessageFilters.size == refs.preFilteringResults.size && !(checkOrder && reordered)) PASSED else FAILED) .bodyData(MessageBuilder() - .text("Expected ${protoMessageFilters.size}, Actual ${preFilteringResults.size}" + + .text("Expected ${refs.protoMessageFilters.size}, Actual ${refs.preFilteringResults.size}" + if (checkOrder) ", " + if (reordered) "Out of order" else "In order" @@ -250,4 +258,4 @@ class SequenceCheckRuleTask( const val CHECK_MESSAGES_TYPE = "checkMessages" const val CHECK_SEQUENCE_TYPE = "checkSequence" } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt index 0fdfb386..3f13ba3d 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SilenceCheckTask.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021 Exactpro (Exactpro Systems Limited) + * Copyright 2021-2022 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -43,28 +43,48 @@ class SilenceCheckTask( messageStream: Observable, eventBatchRouter: MessageRouter ) : AbstractCheckTask(ruleConfiguration, submitTime, sessionKey, parentEventID, messageStream, eventBatchRouter) { - private val protoPreMessageFilter: RootMessageFilter = protoPreFilter.toRootMessageFilter() - private val messagePreFilter = SailfishFilter( - CONVERTER.fromProtoPreFilter(protoPreMessageFilter), - protoPreMessageFilter.toCompareSettings() - ) - private val metadataPreFilter: SailfishFilter? = protoPreMessageFilter.metadataFilterOrNull()?.let { - SailfishFilter( - CONVERTER.fromMetadataFilter(it, VerificationUtil.METADATA_MESSAGE_NAME), - it.toComparisonSettings() - ) + + protected class Refs( + rootEvent: Event, + val protoPreMessageFilter: RootMessageFilter, + val messagePreFilter: SailfishFilter, + val metadataPreFilter: SailfishFilter? + ) : AbstractCheckTask.Refs(rootEvent) { + val preFilterEvent: Event by lazy { + Event.start() + .type("preFiltering") + .bodyData(protoPreMessageFilter.toReadableBodyCollection()) + } + val resultEvent: Event by lazy { + Event.start() + .type("noMessagesCheckResult") + } } - private lateinit var preFilterEvent: Event - private lateinit var resultEvent: Event + + override val refsKeeper = RefsKeeper(protoPreFilter.toRootMessageFilter().let { protoPreMessageFilter -> + Refs( + rootEvent = createRootEvent(), + protoPreMessageFilter = protoPreMessageFilter, + messagePreFilter = SailfishFilter( + CONVERTER.fromProtoPreFilter(protoPreMessageFilter), + protoPreMessageFilter.toCompareSettings() + ), + metadataPreFilter = protoPreMessageFilter.metadataFilterOrNull()?.let { + SailfishFilter( + CONVERTER.fromMetadataFilter(it, VerificationUtil.METADATA_MESSAGE_NAME), + it.toComparisonSettings() + ) + } + ) + }) + + private val refs get() = refsKeeper.refs + private var extraMessagesCounter: Int = 0 - @Volatile - private var started = false private val isCanceled = AtomicBoolean() - override fun onStart() { - super.onStart() - started = true + override fun onStartInit() { val hasNextTask = hasNextTask() if (isParentCompleted == false || hasNextTask) { if (hasNextTask) { @@ -75,15 +95,11 @@ class SilenceCheckTask( cancel() return } - preFilterEvent = Event.start() - .type("preFiltering") - .bodyData(protoPreMessageFilter.toReadableBodyCollection()) - rootEvent.addSubEvent(preFilterEvent) - - resultEvent = Event.start() - .type("noMessagesCheckResult") - rootEvent.addSubEvent(resultEvent) + with(refs) { + rootEvent.addSubEvent(preFilterEvent) + rootEvent.addSubEvent(resultEvent) + } } override fun onChainedTaskSubscription() { @@ -105,17 +121,17 @@ class SilenceCheckTask( } override fun Observable.taskPipeline(): Observable = - preFilterBy(this, protoPreMessageFilter, messagePreFilter, metadataPreFilter, LOGGER) { preFilterContainer -> // Update pre-filter state + preFilterBy(this, refs.protoPreMessageFilter, refs.messagePreFilter, refs.metadataPreFilter, LOGGER) { preFilterContainer -> // Update pre-filter state with(preFilterContainer) { - preFilterEvent.appendEventsWithVerification(preFilterContainer) - preFilterEvent.messageID(protoActual.metadata.id) + refs.preFilterEvent.appendEventsWithVerification(preFilterContainer) + refs.preFilterEvent.messageID(protoActual.metadata.id) } } override fun onNext(container: MessageContainer) { container.protoMessage.metadata.apply { extraMessagesCounter++ - resultEvent.messageID(id) + refs.resultEvent.messageID(id) } } @@ -123,12 +139,13 @@ class SilenceCheckTask( if (skipPublication) { return } - preFilterEvent.name("Prefilter: $extraMessagesCounter messages were filtered.") + + refs.preFilterEvent.name("Prefilter: $extraMessagesCounter messages were filtered.") if (extraMessagesCounter == 0) { - resultEvent.status(Event.Status.PASSED).name("Check passed") + refs.resultEvent.status(Event.Status.PASSED).name("Check passed") } else { - resultEvent.status(Event.Status.FAILED) + refs.resultEvent.status(Event.Status.FAILED) .name("Check failed: $extraMessagesCounter extra messages were found.") } }