diff --git a/README.md b/README.md index 44e47aa0..04afc8fa 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# th2 check1 (3.7.2) +# th2 check1 (3.8.0) ## Overview @@ -108,8 +108,39 @@ spec: - "parsed" ``` +## Prometheus metrics +The Check1 component publishes Prometheus metrics to observe the actual state of it +* `th2_check1_actual_cache_number` - actual number of messages in caches +* `th2_check1_active_tasks_number` - actual number of currently working rules + +The `th2_check1_actual_cache_number` metric separate messages with two labels: +* `session_alias` - session alias of received message +* `direction` - direction of received message + +The `th2_check1_active_tasks_number` metric separate rules with label `rule_type` + ## Release Notes +### 3.8.0 + +#### Added: ++ Added check for positive timeout ++ Added mechanism for handling exceptions when creating and executing rules which publishes events about an error that has occurred ++ Added metric for monitoring active rules and messages count ++ Added check for required message type in the message filter ++ Provided more detailed logging in comparable messages ++ Provided the ability to attach verification description to event ++ Provided the ability to verify repeating groups according to defined filters via `check_repeating_group_order` parameter in the `RootComparisonSettings` message + +#### Changed: ++ Migrated `common` version from `3.25.0` to `3.26.4` + + Added support for converting SimpleList to readable payload body + + Added the new `description` parameter to `RootMessageFilter` message ++ Migrated `grpc-check1` version from `3.2.0` to `3.4.2` ++ Migrated sailfish-utils from `3.7.0` to `3.8.1` + + Now Check1 keep the order of repeating result groups by default + + Fix IN, NOT_IN FilterOperation interaction + ### 3.7.2 #### Changed: diff --git a/build.gradle b/build.gradle index b55a51eb..4de63bf9 100644 --- a/build.gradle +++ b/build.gradle @@ -165,9 +165,9 @@ signing { dependencies { api platform('com.exactpro.th2:bom:3.0.0') - implementation 'com.exactpro.th2:grpc-check1:3.2.0' - implementation 'com.exactpro.th2:common:3.25.0' - implementation 'com.exactpro.th2:sailfish-utils:3.7.0' + implementation 'com.exactpro.th2:grpc-check1:3.4.2' + implementation 'com.exactpro.th2:common:3.26.4' + implementation 'com.exactpro.th2:sailfish-utils:3.9.1' implementation "org.slf4j:slf4j-log4j12" implementation "org.slf4j:slf4j-api" @@ -177,6 +177,10 @@ dependencies { implementation "io.reactivex.rxjava2:rxjava:2.2.19" // https://github.com/salesforce/reactive-grpc/issues/202 + implementation('io.prometheus:simpleclient') { + because('metrics from messages and rules') + } + testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2' testImplementation 'org.jetbrains.kotlin:kotlin-test-junit' testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0" @@ -202,4 +206,4 @@ dockerPrepare { docker { copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar")) -} +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 95ad3705..a340e47d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -release_version = 3.7.2 +release_version = 3.8.0 description = 'th2 check1 box' diff --git a/src/main/java/com/exactpro/th2/check1/event/CheckSequenceUtils.java b/src/main/java/com/exactpro/th2/check1/event/CheckSequenceUtils.java index 90165f59..c84cd766 100644 --- a/src/main/java/com/exactpro/th2/check1/event/CheckSequenceUtils.java +++ b/src/main/java/com/exactpro/th2/check1/event/CheckSequenceUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2020 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2021 Exactpro (Exactpro Systems Limited) * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -16,6 +16,7 @@ import java.util.Map.Entry; import com.exactpro.th2.check1.event.bean.CheckSequenceRow; +import com.exactpro.th2.common.grpc.FilterOperation; import com.exactpro.th2.common.grpc.MessageFilter; import com.exactpro.th2.common.grpc.MessageMetadata; import com.exactpro.th2.common.grpc.MetadataFilter; @@ -134,6 +135,8 @@ private static String getKeyFields(String name, ValueFilter valueFilter) { messageFilter.getFieldsMap().forEach((childName, filter) -> result.append(getKeyFields(childName, filter))); } else if (valueFilter.hasListFilter()) { valueFilter.getListFilter().getValuesList().forEach(filter -> result.append(getKeyFields(name, filter))); + } else if (valueFilter.hasSimpleList() && (valueFilter.getOperation() == FilterOperation.IN || valueFilter.getOperation() == FilterOperation.NOT_IN)) { + result.append(", ").append(name).append(' ').append(valueFilter.getOperation()).append(' ').append(valueFilter.getSimpleList().getSimpleValuesList()); } else if (valueFilter.getKey()) { result.append(", ").append(name).append('=').append(valueFilter.getSimpleFilter()); } @@ -142,6 +145,9 @@ private static String getKeyFields(String name, ValueFilter valueFilter) { private static String getKeyFields(String name, SimpleFilter valueFilter) { if (valueFilter.getKey()) { + if (valueFilter.hasSimpleList() && (valueFilter.getOperation() == FilterOperation.IN || valueFilter.getOperation() == FilterOperation.NOT_IN)) { + return ", " + name + ' ' + valueFilter.getOperation() + ' ' + valueFilter.getSimpleList().getSimpleValuesList(); + } return ", " + name + '=' + valueFilter.getValue(); } return ""; diff --git a/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt b/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt index 922bd783..fcea0869 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt @@ -17,21 +17,16 @@ import com.exactpro.th2.check1.grpc.ChainID import com.exactpro.th2.check1.grpc.CheckRuleRequest import com.exactpro.th2.check1.grpc.CheckSequenceRuleRequest import com.exactpro.th2.check1.grpc.CheckpointRequestOrBuilder +import com.exactpro.th2.check1.metrics.BufferMetric import com.exactpro.th2.check1.rule.AbstractCheckTask -import com.exactpro.th2.check1.rule.check.CheckRuleTask -import com.exactpro.th2.check1.rule.sequence.SequenceCheckRuleTask +import com.exactpro.th2.check1.rule.RuleFactory import com.exactpro.th2.common.event.Event import com.exactpro.th2.common.event.EventUtils -import com.exactpro.th2.common.grpc.ComparisonSettings import com.exactpro.th2.common.grpc.ConnectionID -import com.exactpro.th2.common.grpc.Direction import com.exactpro.th2.common.grpc.EventBatch import com.exactpro.th2.common.grpc.EventID import com.exactpro.th2.common.grpc.MessageBatch -import com.exactpro.th2.common.grpc.MessageFilter import com.exactpro.th2.common.grpc.MessageID -import com.exactpro.th2.common.grpc.RootComparisonSettings -import com.exactpro.th2.common.grpc.RootMessageFilter import com.exactpro.th2.common.schema.message.MessageListener import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.common.schema.message.SubscriberMonitor @@ -64,41 +59,34 @@ class CollectorService( private val olderThanDelta = configuration.cleanupOlderThan private val olderThanTimeUnit = configuration.cleanupTimeUnit private val maxEventBatchContentSize = configuration.maxEventBatchContentSize + + private var ruleFactory: RuleFactory init { + BufferMetric.configure(configuration) + val limitSize = configuration.messageCacheSize mqSubject = PublishSubject.create() subscriberMonitor = subscribe(MessageListener { _: String, batch: MessageBatch -> mqSubject.onNext(batch) }) streamObservable = mqSubject.flatMapIterable(MessageBatch::getMessagesList) - .groupBy { message -> message.metadata.id.run { SessionKey(connectionId.sessionAlias, direction) } } + .groupBy { message -> + message.metadata.id.run { + SessionKey(connectionId.sessionAlias, direction) + }.also(BufferMetric::processMessage) + } .map { group -> StreamContainer(group.key!!, limitSize, group) } .replay().apply { connect() } checkpointSubscriber = streamObservable.subscribeWith(CheckpointSubscriber()) + + ruleFactory = RuleFactory(maxEventBatchContentSize, streamObservable, eventBatchRouter) } @Throws(InterruptedException::class) fun verifyCheckRule(request: CheckRuleRequest): ChainID { - check(request.hasParentEventId()) { "Parent event id can't be null" } - val parentEventID: EventID = request.parentEventId - check(request.connectivityId.sessionAlias.isNotEmpty()) { "Session alias cannot be empty" } - val sessionAlias: String = request.connectivityId.sessionAlias - - check(request.kindCase != CheckRuleRequest.KindCase.KIND_NOT_SET) { - "Either old filter or root filter must be set" - } - val filter: RootMessageFilter = if (request.hasRootFilter()) { - request.rootFilter - } else { - request.filter.toRootMessageFilter() - } - val direction = directionOrDefault(request.direction) - val chainID = request.getChainIdOrGenerate() - - val task = CheckRuleTask(request.description, Instant.now(), SessionKey(sessionAlias, direction), request.timeout, maxEventBatchContentSize, - filter, parentEventID, streamObservable, eventBatchRouter) + val task = ruleFactory.createCheckRule(request) cleanupTasksOlderThan(olderThanDelta, olderThanTimeUnit) @@ -110,26 +98,8 @@ class CollectorService( @Throws(InterruptedException::class) fun verifyCheckSequenceRule(request: CheckSequenceRuleRequest): ChainID { - check(request.hasParentEventId()) { "Parent event id can't be null" } - val parentEventID: EventID = request.parentEventId - check(request.connectivityId.sessionAlias.isNotEmpty()) { "Session alias cannot be empty" } - val sessionAlias: String = request.connectivityId.sessionAlias - val direction = directionOrDefault(request.direction) - - check((request.messageFiltersList.isEmpty() && request.rootMessageFiltersList.isNotEmpty()) - || (request.messageFiltersList.isNotEmpty() && request.rootMessageFiltersList.isEmpty())) { - "Either messageFilters or rootMessageFilters must be set but not both" - } - val chainID = request.getChainIdOrGenerate() - - val protoMessageFilters: List = if (request.rootMessageFiltersList.isNotEmpty()) { - request.rootMessageFiltersList - } else { - request.messageFiltersList.map { it.toRootMessageFilter() } - } - val task = SequenceCheckRuleTask(request.description, Instant.now(), SessionKey(sessionAlias, direction), request.timeout, maxEventBatchContentSize, - request.preFilter, protoMessageFilters, request.checkOrder, parentEventID, streamObservable, eventBatchRouter) + val task = ruleFactory.createSequenceCheckRule(request) cleanupTasksOlderThan(olderThanDelta, olderThanTimeUnit) @@ -139,24 +109,6 @@ class CollectorService( return chainID } - private fun MessageFilter.toRootMessageFilter(): RootMessageFilter { - return RootMessageFilter.newBuilder() - .setMessageType(this.messageType) - .setComparisonSettings(this.comparisonSettings.toRootComparisonSettings()) - .setMessageFilter(this) - .build() - } - - private fun ComparisonSettings.toRootComparisonSettings(): RootComparisonSettings { - return RootComparisonSettings.newBuilder() - .addAllIgnoreFields(this.ignoreFieldsList) - .build() - } - - - private fun directionOrDefault(direction: Direction) = - if (direction == Direction.UNRECOGNIZED) Direction.FIRST else direction - private fun AbstractCheckTask.addToChainOrBegin( value: AbstractCheckTask?, checkpoint: com.exactpro.th2.common.grpc.Checkpoint diff --git a/src/main/kotlin/com/exactpro/th2/check1/exception/RuleExceptions.kt b/src/main/kotlin/com/exactpro/th2/check1/exception/RuleExceptions.kt new file mode 100644 index 00000000..053e6b02 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/check1/exception/RuleExceptions.kt @@ -0,0 +1,20 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.check1.exception + +open class RuleException @JvmOverloads constructor(message: String? = null, cause: Throwable? = null) : Exception(message, cause) + +class RuleCreationException @JvmOverloads constructor(message: String? = null, cause: Throwable? = null) : RuleException(message, cause) + +class RuleInternalException @JvmOverloads constructor(message: String? = null, cause: Throwable? = null) : RuleException(message, cause) diff --git a/src/main/kotlin/com/exactpro/th2/check1/metrics/BufferMetric.kt b/src/main/kotlin/com/exactpro/th2/check1/metrics/BufferMetric.kt new file mode 100644 index 00000000..a415547d --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/check1/metrics/BufferMetric.kt @@ -0,0 +1,49 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.check1.metrics + +import com.exactpro.th2.check1.SessionKey +import com.exactpro.th2.check1.configuration.Check1Configuration +import com.exactpro.th2.common.metrics.DEFAULT_DIRECTION_LABEL_NAME +import com.exactpro.th2.common.metrics.DEFAULT_SESSION_ALIAS_LABEL_NAME +import io.prometheus.client.Counter +import java.util.concurrent.ConcurrentHashMap +import kotlin.math.min + +object BufferMetric { + + private val actualBufferCountMetric: Counter = Counter + .build("th2_check1_actual_cache_number", "The actual number of messages in caches") + .labelNames(DEFAULT_SESSION_ALIAS_LABEL_NAME, DEFAULT_DIRECTION_LABEL_NAME) + .register() + + private val bufferMessagesSizeBySessionKey: MutableMap = ConcurrentHashMap() + private var maxBufferSize: Int = -1 + + fun configure(configuration: Check1Configuration) { + this.maxBufferSize = configuration.messageCacheSize + } + + fun processMessage(sessionKey: SessionKey) { + val labels = arrayOf(sessionKey.sessionAlias, sessionKey.direction.name) + + bufferMessagesSizeBySessionKey.compute(sessionKey) { _, old -> + min(maxBufferSize, (old ?: 0) + 1).also { + if (it != old) { + actualBufferCountMetric.labels(*labels).inc() + } + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/check1/metrics/RuleMetric.kt b/src/main/kotlin/com/exactpro/th2/check1/metrics/RuleMetric.kt new file mode 100644 index 00000000..ae5f4083 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/check1/metrics/RuleMetric.kt @@ -0,0 +1,31 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.check1.metrics + +import io.prometheus.client.Gauge + +object RuleMetric { + private val ACTIVE_TASK_COUNTER = Gauge + .build("th2_check1_active_tasks_number", "The number of currently working rules") + .labelNames("rule_type") + .register() + + fun incrementActiveRule(ruleType: String) { + ACTIVE_TASK_COUNTER.labels(ruleType).inc() + } + + fun decrementActiveRule(ruleType: String) { + ACTIVE_TASK_COUNTER.labels(ruleType).dec() + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt index 714ec9fd..3a1683d5 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt @@ -13,6 +13,7 @@ package com.exactpro.th2.check1.rule +import com.exactpro.sf.common.messages.IMessage import com.exactpro.sf.comparison.ComparatorSettings import com.exactpro.sf.comparison.ComparisonResult import com.exactpro.sf.comparison.MessageComparator @@ -21,6 +22,8 @@ import com.exactpro.th2.check1.AbstractSessionObserver import com.exactpro.th2.check1.SessionKey import com.exactpro.th2.check1.StreamContainer import com.exactpro.th2.check1.event.bean.builder.VerificationBuilder +import com.exactpro.th2.check1.exception.RuleInternalException +import com.exactpro.th2.check1.metrics.RuleMetric import com.exactpro.th2.check1.util.VerificationUtil import com.exactpro.th2.common.event.Event import com.exactpro.th2.common.event.Event.Status.FAILED @@ -34,6 +37,7 @@ import com.exactpro.th2.common.grpc.MessageFilter import com.exactpro.th2.common.grpc.MessageMetadata import com.exactpro.th2.common.grpc.MetadataFilter import com.exactpro.th2.common.grpc.RootMessageFilter +import com.exactpro.th2.common.message.toJson import com.exactpro.th2.common.message.toReadableBodyCollection import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.sailfish.utils.ProtoToIMessageConverter @@ -71,13 +75,15 @@ abstract class AbstractCheckTask( require(maxEventBatchContentSize > 0) { "'maxEventBatchContentSize' should be greater than zero, actual: $maxEventBatchContentSize" } + require(timeout > 0) { + "'timeout' should be set or be greater than zero, actual: $timeout" + } } protected var handledMessageCounter: Long = 0 protected val converter = ProtoToIMessageConverter(VerificationUtil.FACTORY_PROXY, null, null) - protected val rootEvent: Event = Event.from(submitTime) - .description(description) + protected val rootEvent: Event = Event.from(submitTime).description(description) private val sequenceSubject = SingleSubject.create() private val hasNextTask = AtomicBoolean(false) @@ -199,6 +205,10 @@ abstract class AbstractCheckTask( */ protected open fun Observable.taskPipeline() : Observable = this + protected abstract fun name(): String + protected abstract fun type(): String + protected abstract fun setup(rootEvent: Event) + /** * Observe a message sequence from the previous task. * Task subscribe to messages stream with sequence after call. @@ -208,10 +218,12 @@ abstract class AbstractCheckTask( * @throws IllegalStateException when method is called more than once. */ private fun begin(sequence: Long = DEFAULT_SEQUENCE, executorService: ExecutorService = createExecutorService()) { + configureRootEvent() if (!taskState.compareAndSet(State.CREATED, State.BEGIN)) { throw IllegalStateException("Task $description already has been started") } LOGGER.info("Check begin for session alias '{}' with sequence '{}' timeout '{}'", sessionKey, sequence, timeout) + RuleMetric.incrementActiveRule(type()) this.lastSequence = sequence this.executorService = executorService val scheduler = Schedulers.from(executorService) @@ -219,29 +231,40 @@ abstract class AbstractCheckTask( endFuture = Single.timer(timeout, MILLISECONDS, Schedulers.computation()) .subscribe { _ -> end("Timeout is exited") } - messageStream.observeOn(scheduler) // Defined scheduler to execution in one thread to avoid race-condition. - .doFinally(this::taskFinished) // will be executed if the source is complete or an error received or the timeout is exited. - - // All sources above will be disposed on this scheduler. - // - // This method should be called as closer as possible - // to the actual dispose that you want to execute on this scheduler - // because other operations are executed on the same single-thread scheduler. - // - // If we move [Observable#unsubscribeOn] after them, they won't be disposed until the scheduler is free. - // In the worst-case scenario, it might never happen. - .unsubscribeOn(scheduler) - .continueObserve(sessionKey, sequence) - .doOnNext { - handledMessageCounter++ - - with(it.metadata.id) { - rootEvent.messageID(this) - } - } - .mapToMessageContainer() - .taskPipeline() - .subscribe(this) + try { + messageStream.observeOn(scheduler) // Defined scheduler to execution in one thread to avoid race-condition. + .doFinally(this::taskFinished) // will be executed if the source is complete or an error received or the timeout is exited. + + // All sources above will be disposed on this scheduler. + // + // This method should be called as closer as possible + // to the actual dispose that you want to execute on this scheduler + // because other operations are executed on the same single-thread scheduler. + // + // If we move [Observable#unsubscribeOn] after them, they won't be disposed until the scheduler is free. + // In the worst-case scenario, it might never happen. + .unsubscribeOn(scheduler) + .continueObserve(sessionKey, sequence) + .doOnNext { + handledMessageCounter++ + + with(it.metadata.id) { + rootEvent.messageID(this) + } + } + .mapToMessageContainer() + .taskPipeline() + .subscribe(this) + } catch (exception: Exception) { + LOGGER.error("An internal error occurred while executing rule", exception) + rootEvent.addSubEventWithSamePeriod() + .name("An error occurred while executing rule") + .type("internalError") + .status(FAILED) + .exception(exception, true) + taskFinished() + throw RuleInternalException("An internal error occurred while executing rule", exception) + } } private fun taskFinished() { @@ -267,6 +290,7 @@ abstract class AbstractCheckTask( .toProto(parentEventID)) .build()) } finally { + RuleMetric.decrementActiveRule(type()) sequenceSubject.onSuccess(Legacy(executorService, lastSequence)) } } @@ -314,7 +338,7 @@ abstract class AbstractCheckTask( private fun publishEvent() { val prevState = taskState.getAndSet(State.PUBLISHED) if (prevState != State.PUBLISHED) { - completeEvent(prevState == State.TIMEOUT) + completeEventOrReportError(prevState) _endTime = Instant.now() val batches = rootEvent.disperseToBatches(maxEventBatchContentSize, parentEventID) @@ -337,6 +361,25 @@ abstract class AbstractCheckTask( } } + private fun completeEventOrReportError(prevState: State) { + try { + completeEvent(prevState == State.TIMEOUT) + } catch (e: Exception) { + LOGGER.error("Result event cannot be completed", e) + rootEvent.addSubEventWithSamePeriod() + .name("Check result event cannot build completely") + .type("eventNotComplete") + .bodyData(EventUtils.createMessageBean("An unexpected exception has been thrown during result check build")) + .bodyData(EventUtils.createMessageBean(e.message)) + .status(FAILED) + } + } + + private fun configureRootEvent() { + rootEvent.name(name()).type(type()) + setup(rootEvent) + } + protected fun matchFilter( messageContainer: MessageContainer, messageFilter: SailfishFilter, @@ -349,21 +392,27 @@ abstract class AbstractCheckTask( messageContainer.metadataMessage, it.message, it.comparatorSettings, matchNames - ).also { comparisonResult -> + )?.also { comparisonResult -> LOGGER.debug("Metadata comparison result\n {}", comparisonResult) } } if (metadataFilter != null && metadataComparisonResult == null) { if (LOGGER.isDebugEnabled) { LOGGER.debug("Metadata for message {} does not match the filter by key fields. Skip message checking", - shortDebugString(messageContainer.protoMessage.metadata.id)) + messageContainer.protoMessage.metadata.id.toJson()) } return AggregatedFilterResult.EMPTY } val comparisonResult: ComparisonResult? = messageFilter.let { MessageComparator.compare(messageContainer.sailfishMessage, it.message, it.comparatorSettings, matchNames) } - LOGGER.debug("Compare message '{}' result\n{}", messageContainer.sailfishMessage.name, comparisonResult) + + if (comparisonResult == null) { + LOGGER.debug("Comparison result for the message '{}' with the message `{}` does not match the filter by key fields or message type", + messageContainer.sailfishMessage.name, messageFilter.message.name) + } else { + LOGGER.debug("Compare message '{}' result\n{}", messageContainer.sailfishMessage.name, comparisonResult) + } return if (comparisonResult != null || metadataComparisonResult != null) { if (significant) { @@ -387,6 +436,11 @@ abstract class AbstractCheckTask( ComparatorSettings().also { it.metaContainer = VerificationUtil.toMetaContainer(this.messageFilter, false) it.ignoredFields = this.comparisonSettings.ignoreFieldsList.toSet() + if (this.comparisonSettings.checkRepeatingGroupOrder) { + it.isCheckGroupsOrder = true + } else { + it.isKeepResultGroupOrder = true + } } protected fun MessageFilter.toCompareSettings(): ComparatorSettings = @@ -399,10 +453,10 @@ abstract class AbstractCheckTask( it.metaContainer = VerificationUtil.toMetaContainer(this) } - protected fun Event.appendEventWithVerification(protoMessage: Message, protoMessageFilter: MessageFilter, comparisonResult: ComparisonResult): Event { + protected fun Event.appendEventWithVerification(protoMessage: Message, protoFilter: RootMessageFilter, comparisonResult: ComparisonResult): Event { val verificationComponent = VerificationBuilder() comparisonResult.results.forEach { (key: String?, value: ComparisonResult?) -> - verificationComponent.verification(key, value, protoMessageFilter, true) + verificationComponent.verification(key, value, protoFilter.messageFilter, true) } with(protoMessage.metadata) { @@ -411,6 +465,9 @@ abstract class AbstractCheckTask( .status(if (comparisonResult.getStatusType() == StatusType.FAILED) FAILED else PASSED) .messageID(id) .bodyData(verificationComponent.build()) + if (protoFilter.hasDescription()) { + description(protoFilter.description.value) + } } return this } @@ -452,13 +509,18 @@ abstract class AbstractCheckTask( protected fun Event.appendEventsWithVerification(comparisonContainer: ComparisonContainer): Event = this.apply { val protoFilter = comparisonContainer.protoFilter addSubEventWithSamePeriod() - .appendEventWithVerification(comparisonContainer.protoActual, protoFilter.messageFilter, comparisonContainer.result.messageResult!!) + .appendEventWithVerification(comparisonContainer.protoActual, protoFilter, comparisonContainer.result.messageResult!!) if (protoFilter.hasMetadataFilter()) { addSubEventWithSamePeriod() .appendEventWithVerification(comparisonContainer.protoActual.metadata, protoFilter.metadataFilter, comparisonContainer.result.metadataResult!!) } } + protected fun ProtoToIMessageConverter.fromProtoPreFilter(protoPreMessageFilter: RootMessageFilter, + messageName: String = protoPreMessageFilter.messageType): IMessage { + return fromProtoFilter(protoPreMessageFilter.messageFilter, messageName) + } + private fun Observable.mapToMessageContainer(): Observable = map { message -> MessageContainer(message, converter.fromProtoMessage(message, false)) } diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/RuleFactory.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/RuleFactory.kt new file mode 100644 index 00000000..aa3479cf --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/RuleFactory.kt @@ -0,0 +1,198 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.check1.rule + +import com.exactpro.th2.check1.SessionKey +import com.exactpro.th2.check1.StreamContainer +import com.exactpro.th2.check1.exception.RuleCreationException +import com.exactpro.th2.check1.exception.RuleInternalException +import com.exactpro.th2.check1.grpc.CheckRuleRequest +import com.exactpro.th2.check1.grpc.CheckSequenceRuleRequest +import com.exactpro.th2.check1.rule.check.CheckRuleTask +import com.exactpro.th2.check1.rule.sequence.SequenceCheckRuleTask +import com.exactpro.th2.common.event.Event +import com.exactpro.th2.common.grpc.ComparisonSettings +import com.exactpro.th2.common.grpc.Direction +import com.exactpro.th2.common.grpc.EventBatch +import com.exactpro.th2.common.grpc.EventID +import com.exactpro.th2.common.grpc.MessageFilter +import com.exactpro.th2.common.grpc.RootComparisonSettings +import com.exactpro.th2.common.grpc.RootMessageFilter +import com.exactpro.th2.common.message.toJson +import com.exactpro.th2.common.schema.message.MessageRouter +import com.google.protobuf.GeneratedMessageV3 +import io.reactivex.Observable +import mu.KotlinLogging +import org.slf4j.Logger +import java.time.Instant +import java.util.concurrent.ForkJoinPool + +class RuleFactory( + private val maxEventBatchContentSize: Int, + private val streamObservable: Observable, + private val eventBatchRouter: MessageRouter +) { + + fun createCheckRule(request: CheckRuleRequest): CheckRuleTask = + ruleCreation(request, request.parentEventId) { + checkAndCreateRule { request -> + check(request.hasParentEventId()) { "Parent event id can't be null" } + check(request.connectivityId.sessionAlias.isNotEmpty()) { "Session alias cannot be empty" } + val sessionAlias: String = request.connectivityId.sessionAlias + + check(request.kindCase != CheckRuleRequest.KindCase.KIND_NOT_SET) { + "Either old filter or root filter must be set" + } + val filter: RootMessageFilter = if (request.hasRootFilter()) { + request.rootFilter + } else { + request.filter.toRootMessageFilter() + }.also { it.validateRootMessageFilter() } + val direction = directionOrDefault(request.direction) + + CheckRuleTask( + request.description, + Instant.now(), + SessionKey(sessionAlias, direction), + request.timeout, + maxEventBatchContentSize, + filter, + request.parentEventId, + streamObservable, + eventBatchRouter + ) + } + onErrorEvent { + Event.start() + .name("Check rule cannot be created") + .type("checkRuleCreation") + } + } + + fun createSequenceCheckRule(request: CheckSequenceRuleRequest): SequenceCheckRuleTask = + ruleCreation(request, request.parentEventId) { + checkAndCreateRule { request -> + check(request.hasParentEventId()) { "Parent event id can't be null" } + check(request.connectivityId.sessionAlias.isNotEmpty()) { "Session alias cannot be empty" } + val sessionAlias: String = request.connectivityId.sessionAlias + val direction = directionOrDefault(request.direction) + + check((request.messageFiltersList.isEmpty() && request.rootMessageFiltersList.isNotEmpty()) + || (request.messageFiltersList.isNotEmpty() && request.rootMessageFiltersList.isEmpty())) { + "Either messageFilters or rootMessageFilters must be set but not both" + } + + val protoMessageFilters: List = request.rootMessageFiltersList.ifEmpty { + request.messageFiltersList.map { it.toRootMessageFilter() } + }.onEach { it.validateRootMessageFilter() } + + SequenceCheckRuleTask( + request.description, + Instant.now(), + SessionKey(sessionAlias, direction), + request.timeout, + maxEventBatchContentSize, + request.preFilter, + protoMessageFilters, + request.checkOrder, + request.parentEventId, + streamObservable, + eventBatchRouter + ) + } + onErrorEvent { + Event.start() + .name("Sequence check rule cannot be created") + .type("sequenceCheckRuleCreation") + } + } + + + private inline fun ruleCreation(request: T, parentEventId: EventID, block: RuleCreationContext.() -> Unit): R { + val ruleCreationContext = RuleCreationContext().apply(block) + try { + return ruleCreationContext.action(request) + } catch (e: RuleInternalException) { + throw e + } catch (e: Exception) { + val rootEvent = ruleCreationContext.event() + rootEvent.addSubEventWithSamePeriod() + .name("An error occurred while creating rule") + .type("ruleCreationException") + .exception(e, true) + .status(Event.Status.FAILED) + publishEvents(rootEvent, parentEventId) + throw RuleCreationException("An error occurred while creating rule", e) + } + } + + private fun MessageFilter.toRootMessageFilter(): RootMessageFilter { + return RootMessageFilter.newBuilder() + .setMessageType(this.messageType) + .setComparisonSettings(this.comparisonSettings.toRootComparisonSettings()) + .setMessageFilter(this) + .build() + } + + private fun RootMessageFilter.validateRootMessageFilter() { + check(this.messageType.isNotBlank()) { "Rule cannot be executed because the message filter does not contain 'message type'" } + } + + private fun ComparisonSettings.toRootComparisonSettings(): RootComparisonSettings { + return RootComparisonSettings.newBuilder() + .addAllIgnoreFields(this.ignoreFieldsList) + .build() + } + + private fun directionOrDefault(direction: Direction) = + if (direction == Direction.UNRECOGNIZED) Direction.FIRST else direction + + private fun publishEvents(event: Event, parentEventId: EventID) { + if (parentEventId == EventID.getDefaultInstance()) { + return + } + + val batch = EventBatch.newBuilder() + .addAllEvents(event.toListProto(parentEventId)) + .build() + RESPONSE_EXECUTOR.execute { + try { + eventBatchRouter.send(batch) + if (LOGGER.isDebugEnabled) { + LOGGER.debug("Sent event batch '{}'", batch.toJson()) + } + } catch (e: Exception) { + LOGGER.error("Can not send event batch '{}'", batch.toJson(), e) + } + } + } + + private class RuleCreationContext { + lateinit var action: (T) -> R + lateinit var event: () -> Event + + fun checkAndCreateRule(block: (T) -> R) { + action = block + } + + fun onErrorEvent(block: () -> Event) { + event = block + } + } + + companion object { + private val LOGGER: Logger = KotlinLogging.logger { } + private val RESPONSE_EXECUTOR = ForkJoinPool.commonPool() + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/check/CheckRuleTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/check/CheckRuleTask.kt index 7a142796..ab90e41a 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/check/CheckRuleTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/check/CheckRuleTask.kt @@ -23,7 +23,6 @@ import com.exactpro.th2.check1.util.VerificationUtil.METADATA_MESSAGE_NAME import com.exactpro.th2.common.event.Event import com.exactpro.th2.common.event.Event.Status.FAILED import com.exactpro.th2.common.event.EventUtils -import com.exactpro.th2.common.event.EventUtils.createMessageBean import com.exactpro.th2.common.grpc.EventBatch import com.exactpro.th2.common.grpc.EventID import com.exactpro.th2.common.grpc.RootMessageFilter @@ -48,7 +47,7 @@ class CheckRuleTask( ) : AbstractCheckTask(description, timeout, maxEventBatchContentSize, startTime, sessionKey, parentEventID, messageStream, eventBatchRouter) { private val messageFilter: SailfishFilter = SailfishFilter( - converter.fromProtoFilter(protoMessageFilter.messageFilter, protoMessageFilter.messageType), + converter.fromProtoPreFilter(protoMessageFilter), protoMessageFilter.toCompareSettings() ) private val metadataFilter: SailfishFilter? = protoMessageFilter.metadataFilterOrNull()?.let { @@ -58,13 +57,6 @@ class CheckRuleTask( ) } - init { - rootEvent - .name("Check rule") - .bodyData(createMessageBean("Check rule for messages from ${sessionKey.run { "$sessionAlias ($direction direction)"} }")) - .type("Check rule") - } - override fun onStart() { super.onStart() @@ -95,4 +87,12 @@ class CheckRuleTask( .type("Check failed") .status(FAILED) } + + override fun name(): String = "Check rule" + + override fun type(): String = "Check rule" + + override fun setup(rootEvent: Event) { + rootEvent.bodyData(EventUtils.createMessageBean("Check rule for messages from ${sessionKey.run { "$sessionAlias ($direction direction)"} }")) + } } diff --git a/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SequenceCheckRuleTask.kt b/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SequenceCheckRuleTask.kt index e320bae4..f7495a08 100644 --- a/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SequenceCheckRuleTask.kt +++ b/src/main/kotlin/com/exactpro/th2/check1/rule/sequence/SequenceCheckRuleTask.kt @@ -15,7 +15,6 @@ */ package com.exactpro.th2.check1.rule.sequence -import com.exactpro.sf.common.messages.IMessage import com.exactpro.th2.check1.SessionKey import com.exactpro.th2.check1.StreamContainer import com.exactpro.th2.check1.event.CheckSequenceUtils @@ -40,7 +39,6 @@ import com.exactpro.th2.common.grpc.MessageID import com.exactpro.th2.common.grpc.RootMessageFilter import com.exactpro.th2.common.message.toReadableBodyCollection import com.exactpro.th2.common.schema.message.MessageRouter -import com.exactpro.th2.sailfish.utils.ProtoToIMessageConverter import com.google.protobuf.TextFormat.shortDebugString import io.reactivex.Observable import java.time.Instant @@ -96,13 +94,6 @@ class SequenceCheckRuleTask( private var reordered: Boolean = false private lateinit var matchedByKeys: MutableSet - init { - rootEvent - .name("Check sequence rule") - .bodyData(createMessageBean("Check sequence rule for messages from ${sessionKey.run { "$sessionAlias ($direction direction)"} }")) - .type("checkSequenceRule") - } - override fun onStart() { super.onStart() @@ -113,7 +104,7 @@ class SequenceCheckRuleTask( messageFilters = protoMessageFilters.map { MessageFilterContainer( it, - SailfishFilter(converter.fromProtoFilter(it.messageFilter, it.messageType), it.toCompareSettings()), + SailfishFilter(converter.fromProtoPreFilter(it), it.toCompareSettings()), it.metadataFilterOrNull()?.let { metadataFilter -> SailfishFilter(converter.fromMetadataFilter(metadataFilter, VerificationUtil.METADATA_MESSAGE_NAME), metadataFilter.toComparisonSettings()) @@ -191,6 +182,14 @@ class SequenceCheckRuleTask( fillCheckMessagesEvent() } + override fun name(): String = "Check sequence rule" + + override fun type(): String = "checkSequenceRule" + + override fun setup(rootEvent: Event) { + rootEvent.bodyData(createMessageBean("Check sequence rule for messages from ${sessionKey.run { "$sessionAlias ($direction direction)"} }")) + } + /** * Creates events for check messages */ @@ -240,9 +239,6 @@ class SequenceCheckRuleTask( .bodyData(sequenceTable.build()) } - private fun ProtoToIMessageConverter.fromProtoPreFilter(protoPreMessageFilter: RootMessageFilter): IMessage = - fromProtoFilter(protoPreMessageFilter.messageFilter, PRE_FILTER_MESSAGE_NAME) - private fun PreFilter.toRootMessageFilter() = RootMessageFilter.newBuilder() .setMessageType(PRE_FILTER_MESSAGE_NAME) .setMessageFilter(toMessageFilter()) diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTaskTest.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTaskTest.kt index e46cf455..e1d9cda0 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTaskTest.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTaskTest.kt @@ -14,16 +14,26 @@ package com.exactpro.th2.check1.rule import com.exactpro.th2.check1.SessionKey import com.exactpro.th2.check1.StreamContainer +import com.exactpro.th2.common.event.IBodyData +import com.exactpro.th2.common.event.bean.Verification +import com.exactpro.th2.common.event.bean.VerificationEntry import com.exactpro.th2.common.grpc.Direction import com.exactpro.th2.common.grpc.Direction.FIRST +import com.exactpro.th2.common.grpc.Event import com.exactpro.th2.common.grpc.EventBatch import com.exactpro.th2.common.grpc.Message import com.exactpro.th2.common.schema.message.MessageRouter +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.readValue import com.nhaarman.mockitokotlin2.argumentCaptor import com.nhaarman.mockitokotlin2.spy import com.nhaarman.mockitokotlin2.timeout import com.nhaarman.mockitokotlin2.verify import io.reactivex.Observable +import kotlin.test.assertEquals +import kotlin.test.assertNotNull abstract class AbstractCheckTaskTest { protected val clientStub: MessageRouter = spy { } @@ -51,6 +61,56 @@ abstract class AbstractCheckTaskTest { } } + protected fun extractEventBody(verificationEvent: Event): List { + return jacksonObjectMapper() + .addMixIn(IBodyData::class.java, IBodyDataMixIn::class.java) + .readValue(verificationEvent.body.toByteArray()) + } + + protected fun assertVerification(verificationEvent: Event): Verification { + val verifications = extractEventBody(verificationEvent) + val verification = verifications.filterIsInstance().firstOrNull() + assertNotNull(verification) { "Verification event does not contain the verification" } + return verification + } + + protected fun assertVerificationEntries( + expectedVerificationEntries: Map, + actualVerificationEntries: Map?, + asserts: (VerificationEntry, VerificationEntry) -> Unit) { + assertNotNull(actualVerificationEntries) { "Actual verification entry is null" } + expectedVerificationEntries.forEach { (expectedFieldName, expectedVerificationEntry) -> + val actualVerificationEntry = actualVerificationEntries[expectedFieldName] + assertNotNull(actualVerificationEntry) { + "Actual verification entry does not contains field '${expectedFieldName}'" + } + asserts(expectedVerificationEntry, actualVerificationEntry) + if (expectedVerificationEntry.fields == null) { + return@forEach + } + assertVerificationEntries(expectedVerificationEntry.fields, actualVerificationEntry.fields, asserts) + } + } + + protected fun assertVerifications( + expectedVerification: Verification, + actualVerification: Verification, + asserts: (VerificationEntry, VerificationEntry) -> Unit) = assertVerificationEntries(expectedVerification.fields, actualVerification.fields, asserts) + + protected fun assertVerificationByStatus(verification: Verification, expectedVerificationEntries: Map) { + assertVerificationEntries(expectedVerificationEntries, verification.fields) { expected, actual -> + assertEquals(expected.status, actual.status) + } + } + + + @JsonSubTypes(value = [ + JsonSubTypes.Type(value = Verification::class, name = Verification.TYPE), + JsonSubTypes.Type(value = com.exactpro.th2.common.event.bean.Message::class, name = com.exactpro.th2.common.event.bean.Message.TYPE) + ]) + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", include = JsonTypeInfo.As.PROPERTY, visible = true) + private interface IBodyDataMixIn + companion object { const val MESSAGE_TYPE = "TestMsg" const val SESSION_ALIAS = "test_session" diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/RuleFactoryTest.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/RuleFactoryTest.kt new file mode 100644 index 00000000..692c1026 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/RuleFactoryTest.kt @@ -0,0 +1,82 @@ +/* + * Copyright 2021-2021 Exactpro (Exactpro Systems Limited) + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.check1.rule + +import com.exactpro.th2.check1.SessionKey +import com.exactpro.th2.check1.StreamContainer +import com.exactpro.th2.check1.exception.RuleCreationException +import com.exactpro.th2.check1.grpc.CheckRuleRequest +import com.exactpro.th2.common.event.EventUtils +import com.exactpro.th2.common.grpc.Checkpoint +import com.exactpro.th2.common.grpc.Direction +import com.exactpro.th2.common.grpc.EventBatch +import com.exactpro.th2.common.grpc.EventID +import com.exactpro.th2.common.grpc.Message +import com.exactpro.th2.common.grpc.MessageMetadata +import com.exactpro.th2.common.message.message +import com.exactpro.th2.common.schema.message.MessageRouter +import com.nhaarman.mockitokotlin2.argumentCaptor +import com.nhaarman.mockitokotlin2.spy +import com.nhaarman.mockitokotlin2.timeout +import com.nhaarman.mockitokotlin2.verify +import io.reactivex.Observable +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import kotlin.test.assertEquals + +class RuleFactoryTest { + private val clientStub: MessageRouter = spy { } + + @Test + fun `failed rule creation because one of required fields is empty`() { + val streams = createStreams(AbstractCheckTaskTest.SESSION_ALIAS, Direction.FIRST, listOf( + message(AbstractCheckTaskTest.MESSAGE_TYPE, Direction.FIRST, AbstractCheckTaskTest.SESSION_ALIAS) + .mergeMetadata(MessageMetadata.newBuilder() + .putProperties("keyProp", "42") + .putProperties("notKeyProp", "2") + .build()) + .build() + )) + + val ruleFactory = RuleFactory(1024 * 1024, streams, clientStub) + + val request = CheckRuleRequest.newBuilder() + .setParentEventId(EventID.newBuilder().setId("root").build()) + .setCheckpoint(Checkpoint.newBuilder().setId(EventUtils.generateUUID()).build()).build() + + assertThrows { + ruleFactory.createCheckRule(request) + } + + val eventBatches = awaitEventBatchRequest(1000L, 1) + val eventList = eventBatches.flatMap(EventBatch::getEventsList) + assertAll({ + assertEquals(2, eventList.size) + assertEquals(1, eventList.filter { it.type == "ruleCreationException" }.size) + }) + } + + private fun awaitEventBatchRequest(timeoutValue: Long = 1000L, times: Int): List { + val argumentCaptor = argumentCaptor() + verify(clientStub, timeout(timeoutValue).times(times)).send(argumentCaptor.capture()) + return argumentCaptor.allValues + } + + private fun createStreams(alias: String = AbstractCheckTaskTest.SESSION_ALIAS, direction: Direction = Direction.FIRST, messages: List): Observable { + return Observable.just( + StreamContainer(SessionKey(alias, direction), messages.size + 1, Observable.fromIterable(messages)) + ) + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt index a3f15ce2..7d9abb79 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/check/TestCheckRuleTask.kt @@ -16,35 +16,53 @@ package com.exactpro.th2.check1.rule.check import com.exactpro.th2.check1.SessionKey import com.exactpro.th2.check1.StreamContainer import com.exactpro.th2.check1.rule.AbstractCheckTaskTest +import com.exactpro.th2.check1.util.createVerificationEntry import com.exactpro.th2.check1.util.toSimpleFilter +import com.exactpro.th2.common.event.bean.VerificationStatus import com.exactpro.th2.common.grpc.Direction import com.exactpro.th2.common.grpc.EventBatch import com.exactpro.th2.common.grpc.EventID import com.exactpro.th2.common.grpc.EventStatus import com.exactpro.th2.common.grpc.EventStatus.SUCCESS import com.exactpro.th2.common.grpc.FilterOperation +import com.exactpro.th2.common.grpc.ListValueFilter import com.exactpro.th2.common.grpc.MessageMetadata import com.exactpro.th2.common.grpc.MetadataFilter +import com.exactpro.th2.common.grpc.RootComparisonSettings import com.exactpro.th2.common.grpc.RootMessageFilter +import com.exactpro.th2.common.grpc.ValueFilter import com.exactpro.th2.common.message.message +import com.exactpro.th2.common.message.messageFilter +import com.exactpro.th2.common.message.rootMessageFilter +import com.exactpro.th2.common.value.add +import com.exactpro.th2.common.value.listValue +import com.exactpro.th2.common.value.toValue +import com.exactpro.th2.common.value.toValueFilter +import com.google.protobuf.StringValue import io.reactivex.Observable -import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import java.lang.IllegalArgumentException import java.time.Instant import kotlin.test.assertEquals +import kotlin.test.assertNotNull internal class TestCheckRuleTask : AbstractCheckTaskTest() { private fun checkTask( messageFilter: RootMessageFilter, parentEventID: EventID, messageStream: Observable, - maxEventBatchContentSize: Int = 1024 * 1024 + maxEventBatchContentSize: Int = 1024 * 1024, + timeout: Long = 1000 ) = CheckRuleTask( SESSION_ALIAS, Instant.now(), SessionKey(SESSION_ALIAS, Direction.FIRST), - 1000, + timeout, maxEventBatchContentSize, messageFilter, parentEventID, @@ -189,4 +207,218 @@ internal class TestCheckRuleTask : AbstractCheckTaskTest() { "No failed event $eventBatch" } } + + @ParameterizedTest(name = "timeout = {0}") + @ValueSource(longs = [0, -1]) + fun `handle error if the timeout is zero or negative`(timeout: Long) { + val streams = createStreams(SESSION_ALIAS, Direction.FIRST, listOf( + message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS) + .mergeMetadata(MessageMetadata.newBuilder() + .putProperties("keyProp", "42") + .putProperties("notKeyProp", "2") + .build()) + .build() + )) + + val eventID = EventID.newBuilder().setId("root").build() + val filter = RootMessageFilter.newBuilder() + .setMessageType(MESSAGE_TYPE) + .setMetadataFilter(MetadataFilter.newBuilder() + .putPropertyFilters("keyProp", "42".toSimpleFilter(FilterOperation.EQUAL, true))) + .build() + + val exception = assertThrows("Task cannot be created due to invalid timeout") { + checkTask(filter, eventID, streams, timeout = timeout) + } + assertEquals("'timeout' should be set or be greater than zero, actual: $timeout", exception.message) + } + + @Test + fun `check that the order is kept in repeating groups`() { + val streams = createStreams(SESSION_ALIAS, Direction.FIRST, listOf( + message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS) + .putFields("legs", listValue() + .add(message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS) + .putAllFields(mapOf( + "A" to "1".toValue(), + "B" to "1".toValue() + ))) + .add(message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS) + .putAllFields(mapOf( + "A" to "2".toValue(), + "B" to "2".toValue() + ))) + .toValue()) + .build() + )) + + val messageFilterForCheckOrder: RootMessageFilter = rootMessageFilter(MESSAGE_TYPE).apply { + messageFilter = messageFilter().apply { + putFields("legs", ValueFilter.newBuilder() + .setListFilter(ListValueFilter.newBuilder().apply { + addValues(ValueFilter.newBuilder().apply { + messageFilter = messageFilter().apply { + putAllFields(mapOf( + "A" to "2".toValueFilter(), + "B" to "2".toValueFilter() + )) + }.build() + }.build()) + addValues(ValueFilter.newBuilder().apply { + messageFilter = messageFilter().apply { + putAllFields(mapOf( + "A" to "1".toValueFilter(), + "B" to "3".toValueFilter() + )) + }.build() + }.build()) + }.build()).build()) + }.build() + }.build() + + val eventID = EventID.newBuilder().setId("root").build() + + checkTask(messageFilterForCheckOrder, eventID, streams).begin() + + val eventBatches = awaitEventBatchRequest(1000L, 2) + val eventList = eventBatches.flatMap(EventBatch::getEventsList) + assertAll({ + assertEquals(3, eventList.size) + }, { + val verificationEvent = eventList.find { it.type == "Verification" } + assertNotNull(verificationEvent) { "Missed verification event" } + val verification = assertVerification(verificationEvent) + + val expectedLegs = mapOf( + "legs" to createVerificationEntry( + "0" to createVerificationEntry( + "A" to createVerificationEntry(VerificationStatus.PASSED), + "B" to createVerificationEntry(VerificationStatus.FAILED) + ), + "1" to createVerificationEntry( + "A" to createVerificationEntry(VerificationStatus.PASSED), + "B" to createVerificationEntry(VerificationStatus.PASSED) + ) + ) + ) + + assertVerificationByStatus(verification, expectedLegs) + }) + } + + @ParameterizedTest + @ValueSource(booleans = [true, false]) + fun `check verification description`(includeDescription: Boolean) { + val streams = createStreams(SESSION_ALIAS, Direction.FIRST, listOf( + message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS) + .putFields("A", "1".toValue()) + .build() + )) + val messageFilterForCheckOrder: RootMessageFilter = RootMessageFilter.newBuilder().apply { + messageType = MESSAGE_TYPE + messageFilter = messageFilter().putFields("A", "1".toValueFilter()).build() + if (includeDescription) { + description = StringValue.of(VERIFICATION_DESCRIPTION) + } + }.build() + val eventID = EventID.newBuilder().setId("root").build() + + checkTask(messageFilterForCheckOrder, eventID, streams).begin() + + val eventBatches = awaitEventBatchRequest(1000L, 2) + val eventList = eventBatches.flatMap(EventBatch::getEventsList) + assertAll({ + assertEquals(3, eventList.size) + }, { + val verificationEvent = eventList.find { it.type == "Verification" } + assertNotNull(verificationEvent) { "Missed verification event" } + assertEquals(includeDescription, verificationEvent.name.contains(VERIFICATION_DESCRIPTION)) + }) + } + + @Test + fun `verify repeating groups according to defined filters`() { + val streams = createStreams(SESSION_ALIAS, Direction.FIRST, listOf( + message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS) + .putFields("legs", listValue() + .add(message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS) + .putAllFields(mapOf( + "A" to "1".toValue(), + "B" to "2".toValue() + ))) + .add(message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS) + .putAllFields(mapOf( + "C" to "3".toValue(), + "D" to "4".toValue() + ))) + .toValue()) + .build() + )) + + val messageFilterForCheckOrder: RootMessageFilter = rootMessageFilter(MESSAGE_TYPE).apply { + comparisonSettings = RootComparisonSettings.newBuilder().apply { + checkRepeatingGroupOrder = true + }.build() + messageFilter = messageFilter().apply { + putFields("legs", ValueFilter.newBuilder() + .setListFilter(ListValueFilter.newBuilder().apply { + addValues(ValueFilter.newBuilder().apply { + messageFilter = messageFilter().apply { + putAllFields(mapOf( + "C" to "3".toValueFilter(), + "D" to "4".toValueFilter() + )) + }.build() + }.build()) + addValues(ValueFilter.newBuilder().apply { + messageFilter = messageFilter().apply { + putAllFields(mapOf( + "A" to "1".toValueFilter(), + "B" to "2".toValueFilter() + )) + }.build() + }.build()) + }.build()).build()) + }.build() + }.build() + + val eventID = EventID.newBuilder().setId("root").build() + + checkTask(messageFilterForCheckOrder, eventID, streams).begin() + + val eventBatches = awaitEventBatchRequest(1000L, 2) + val eventList = eventBatches.flatMap(EventBatch::getEventsList) + assertAll({ + assertEquals(3, eventList.size) + }, { + val verificationEvent = eventList.find { it.type == "Verification" } + assertNotNull(verificationEvent) { "Missed verification event" } + + val verification = assertVerification(verificationEvent) + + val expectedLegs = mapOf( + "legs" to createVerificationEntry( + "0" to createVerificationEntry( + "A" to createVerificationEntry(VerificationStatus.NA), + "B" to createVerificationEntry(VerificationStatus.NA), + "C" to createVerificationEntry(VerificationStatus.FAILED), + "D" to createVerificationEntry(VerificationStatus.FAILED) + ), + "1" to createVerificationEntry( + "C" to createVerificationEntry(VerificationStatus.NA), + "D" to createVerificationEntry(VerificationStatus.NA), + "A" to createVerificationEntry(VerificationStatus.FAILED), + "B" to createVerificationEntry(VerificationStatus.FAILED) + ) + ) + ) + + assertVerificationByStatus(verification, expectedLegs) + }) + } + + + companion object { + private const val VERIFICATION_DESCRIPTION = "Test verification with description" + } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt b/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt index c12d5df2..5c9af9c0 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/rule/sequence/TestSequenceCheckTask.kt @@ -14,10 +14,12 @@ package com.exactpro.th2.check1.rule.sequence import com.exactpro.th2.check1.SessionKey import com.exactpro.th2.check1.StreamContainer +import com.exactpro.th2.check1.exception.RuleInternalException import com.exactpro.th2.check1.grpc.PreFilter import com.exactpro.th2.check1.rule.AbstractCheckTaskTest import com.exactpro.th2.check1.rule.sequence.SequenceCheckRuleTask.Companion.CHECK_MESSAGES_TYPE import com.exactpro.th2.check1.rule.sequence.SequenceCheckRuleTask.Companion.CHECK_SEQUENCE_TYPE +import com.exactpro.th2.check1.util.toSimpleFilter import com.exactpro.th2.common.event.EventUtils import com.exactpro.th2.common.grpc.Direction import com.exactpro.th2.common.grpc.Event @@ -27,12 +29,17 @@ import com.exactpro.th2.common.grpc.EventStatus import com.exactpro.th2.common.grpc.FilterOperation import com.exactpro.th2.common.grpc.Message import com.exactpro.th2.common.grpc.MessageFilter +import com.exactpro.th2.common.grpc.MessageMetadata +import com.exactpro.th2.common.grpc.MetadataFilter import com.exactpro.th2.common.grpc.RootMessageFilter import com.exactpro.th2.common.grpc.Value import com.exactpro.th2.common.grpc.ValueFilter +import com.exactpro.th2.common.message.message +import com.exactpro.th2.common.message.messageFilter import io.reactivex.Observable import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll +import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.Arguments.arguments @@ -451,6 +458,37 @@ class TestSequenceCheckTask : AbstractCheckTaskTest() { }) } + @Test + fun `failed rule creation due to invalid regex operation in the message filter`() { + val streams = createStreams(SESSION_ALIAS, Direction.FIRST, listOf( + message(MESSAGE_TYPE, Direction.FIRST, SESSION_ALIAS) + .mergeMetadata(MessageMetadata.newBuilder() + .putProperties("keyProp", "42") + .putProperties("notKeyProp", "2") + .build()) + .build() + )) + + val eventID = EventID.newBuilder().setId("root").build() + val filter = RootMessageFilter.newBuilder() + .setMessageType(MESSAGE_TYPE) + .setMetadataFilter(MetadataFilter.newBuilder() + .putPropertyFilters("keyProp", "42".toSimpleFilter(FilterOperation.EQUAL))) + .setMessageFilter(messageFilter().putFields("keyProp", ValueFilter.newBuilder().setOperation(FilterOperation.LIKE).setSimpleFilter(".[").build())) + .build() + + assertThrows { + sequenceCheckRuleTask(parentEventID = eventID, messageStream = streams, filtersParam = listOf(filter), checkOrder = false).begin() + } + + val eventBatches = awaitEventBatchRequest(1000L, 2) + val eventList = eventBatches.flatMap(EventBatch::getEventsList) + assertAll({ + assertEquals(3, eventList.size) + assertEquals(1, eventList.filter { it.type == "internalError" }.size) + }) + } + private fun assertCheckSequenceStatus(expectedStatus: EventStatus, eventsList: List) { val checkSequenceEvent = assertNotNull(eventsList.find { it.type == "checkSequence" }, "Cannot find checkSequence event") assertEquals(expectedStatus, checkSequenceEvent.status) diff --git a/src/test/kotlin/com/exactpro/th2/check1/util/Utils.kt b/src/test/kotlin/com/exactpro/th2/check1/util/Utils.kt index 1c8760ee..02bec5cc 100644 --- a/src/test/kotlin/com/exactpro/th2/check1/util/Utils.kt +++ b/src/test/kotlin/com/exactpro/th2/check1/util/Utils.kt @@ -13,6 +13,8 @@ package com.exactpro.th2.check1.util +import com.exactpro.th2.common.event.bean.VerificationEntry +import com.exactpro.th2.common.event.bean.VerificationStatus import com.exactpro.th2.common.grpc.FilterOperation import com.exactpro.th2.common.grpc.MetadataFilter @@ -20,4 +22,13 @@ fun String.toSimpleFilter(op: FilterOperation, key: Boolean = false): MetadataFi .setOperation(op) .setValue(this) .setKey(key) - .build() \ No newline at end of file + .build() + + +fun createVerificationEntry(status: VerificationStatus): VerificationEntry = VerificationEntry().apply { + this.status = status +} + +fun createVerificationEntry(vararg verificationEntries: Pair): VerificationEntry = VerificationEntry().apply { + fields = linkedMapOf(*verificationEntries) +} \ No newline at end of file