Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-4316] Add additional events to the root event with information a… #173

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,13 @@ test {
}

application {
mainClassName 'com.exactpro.th2.check1.Check1Main'
mainClass.set('com.exactpro.th2.check1.Check1Main')
}

applicationName = 'service'

distTar {
archiveName "${applicationName}.tar"
archiveFileName.set("${applicationName}.tar")
}

dockerPrepare {
Expand Down
9 changes: 1 addition & 8 deletions src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ForkJoinPool
import com.exactpro.th2.common.grpc.Checkpoint as GrpcCheckpoint
import com.exactpro.th2.check1.utils.toMessageID
import com.exactpro.th2.common.message.toJson
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
Expand Down Expand Up @@ -325,14 +326,6 @@ class CollectorService(
return checkNotNull(messageRouter.subscribeAll(listener)) { "Can not subscribe to queues" }
}

/**
 * Creates a [MessageID] for this session key: the connection carries the key's session alias,
 * the direction is taken from the key and the sequence is the supplied [sequence].
 */
private fun SessionKey.toMessageID(sequence: Long) = MessageID.newBuilder()
    // The nested builder is handed over as is; the outer builder builds it when storing the field.
    .setConnectionId(ConnectionID.newBuilder().setSessionAlias(sessionAlias))
    .setSequence(sequence)
    .setDirection(direction)
    .build()

private fun publishCheckpoint(request: CheckpointRequestOrBuilder, checkpoint: Checkpoint, event: Event) {
if (!request.hasParentEventId()) {
if (logger.isWarnEnabled) {
Expand Down
89 changes: 83 additions & 6 deletions src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import com.exactpro.th2.check1.utils.toMessageID
import com.exactpro.th2.common.event.EventUtils.createMessageBean
import com.exactpro.th2.common.util.toInstant

/**
* Implements common logic for check task.
Expand Down Expand Up @@ -152,7 +155,7 @@ abstract class AbstractCheckTask(
@Volatile
protected var started = false

protected fun createRootEvent() = Event.from(submitTime).description(description)
protected fun createRootEvent(): Event = Event.from(submitTime).description(description)

final override fun onStart() {
super.onStart()
Expand All @@ -170,7 +173,7 @@ abstract class AbstractCheckTask(
super.onError(e)

refs.rootEvent.status(FAILED)
.bodyData(EventUtils.createMessageBean(e.message))
.exception(e, true)
end(State.ERROR, "Error ${e.message} received in message stream")
}

Expand Down Expand Up @@ -311,6 +314,7 @@ abstract class AbstractCheckTask(
this.checkpointTimeout = calculateCheckpointTimeout(checkpointTimestamp, taskTimeout.messageTimeout)
this.isDefaultSequence = sequence == DEFAULT_SEQUENCE
val scheduler = Schedulers.from(executorService)
addStartInfo(refs.rootEvent.addSubEventWithSamePeriod(), sequence, checkpointTimestamp)

endFuture = Single.timer(taskTimeout.timeout, MILLISECONDS, Schedulers.computation())
.subscribe { _ -> end(State.TIMEOUT, "Timeout is exited") }
Expand Down Expand Up @@ -352,6 +356,30 @@ abstract class AbstractCheckTask(
}
}

private fun addStartInfo(event: Event, lastSequence: Long, checkpointTimestamp: Timestamp?) {
with(event) {
name(
if (lastSequence == DEFAULT_SEQUENCE) {
"Rule works from the beginning of the cache"
} else {
"Rule works from the $lastSequence sequence in session ${sessionKey.sessionAlias} and direction ${sessionKey.direction}"
}
)
status(PASSED)
type("ruleStartPoint")
if (lastSequence != DEFAULT_SEQUENCE) {
messageID(sessionKey.toMessageID(lastSequence))
}
bodyData(createMessageBean("The rule starts working from " +
(if (lastSequence == DEFAULT_SEQUENCE) "start of cache" else "sequence $lastSequence") +
(checkpointTimestamp?.let {
val instant = checkpointTimestamp.toInstant()
" and expects messages between $instant and ${instant.plusMillis(taskTimeout.messageTimeout)}"
} ?: "")))
bodyData(createMessageBean("Rule timeout is set to ${taskTimeout.timeout} mls"))
}
Comment on lines +361 to +380

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
name(
if (lastSequence == DEFAULT_SEQUENCE) {
"Rule works from the beginning of the cache"
} else {
"Rule works from the $lastSequence sequence in session ${sessionKey.sessionAlias} and direction ${sessionKey.direction}"
}
)
status(PASSED)
type("ruleStartPoint")
if (lastSequence != DEFAULT_SEQUENCE) {
messageID(sessionKey.toMessageID(lastSequence))
}
bodyData(createMessageBean("The rule starts working from " +
(if (lastSequence == DEFAULT_SEQUENCE) "start of cache" else "sequence $lastSequence") +
(checkpointTimestamp?.let {
val instant = checkpointTimestamp.toInstant()
" and expects messages between $instant and ${instant.plusMillis(taskTimeout.messageTimeout)}"
} ?: "")))
bodyData(createMessageBean("Rule timeout is set to ${taskTimeout.timeout} mls"))
}
val endOfMessage = checkpointTimestamp?.let {
val instant = checkpointTimestamp.toInstant()
" and expects messages between $instant and ${instant.plusMillis(taskTimeout.messageTimeout)}"
} ?: ""
if (lastSequence == DEFAULT_SEQUENCE) {
name("Rule works from the beginning of the cache")
bodyData(createMessageBean("The rule starts working from start of cache$endOfMessage"))
} else {
name("Rule works from the $lastSequence sequence in session ${sessionKey.sessionAlias} and direction ${sessionKey.direction}")
messageID(sessionKey.toMessageID(lastSequence))
bodyData(createMessageBean("The rule starts working from sequence $lastSequence$endOfMessage"))
}
status(PASSED)
type("ruleStartPoint")
bodyData(createMessageBean("Rule timeout is set to ${taskTimeout.timeout} mls"))
}

}

private fun taskFinished() {
var ruleEventStatus = EventStatus.FAILED
try {
Expand All @@ -371,8 +399,8 @@ abstract class AbstractCheckTask(
.name("Check rule $description problem")
.type("Exception")
.status(FAILED)
.bodyData(EventUtils.createMessageBean(message))
.bodyData(EventUtils.createMessageBean(ex.message))
.bodyData(createMessageBean(message))
.bodyData(createMessageBean(ex.message))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.bodyData(createMessageBean(ex.message))
.exception(ex, true)

.toProto(parentEventID))
.build())
} finally {
Expand Down Expand Up @@ -424,6 +452,9 @@ abstract class AbstractCheckTask(

protected open val skipPublication: Boolean = false

/**
 * Controls whether a dedicated failed sub-event describing the timeout is attached to the root event
 * when the task completes after ending in one of the timeout states.
 *
 * Returns `true` here; subclasses may override it to suppress that sub-event.
 */
protected open val errorEventOnTimeout: Boolean
get() = true

protected fun isCheckpointLastReceivedMessage(): Boolean = bufferContainsStartMessage && !hasMessagesInTimeoutInterval

/**
Expand Down Expand Up @@ -466,6 +497,9 @@ abstract class AbstractCheckTask(
private fun completeEventOrReportError(prevState: State): Boolean {
return try {
if (started) {
if (errorEventOnTimeout && prevState in TIMEOUT_STATES) {
addTimeoutEvent(prevState)
}
completeEvent(prevState)
doAfterCompleteEvent()
false
Expand All @@ -482,13 +516,55 @@ abstract class AbstractCheckTask(
refs.rootEvent.addSubEventWithSamePeriod()
.name("Check result event cannot build completely")
.type("eventNotComplete")
.bodyData(EventUtils.createMessageBean("An unexpected exception has been thrown during result check build"))
.bodyData(EventUtils.createMessageBean(e.message))
.bodyData(createMessageBean("An unexpected exception has been thrown during result check build"))
.bodyData(createMessageBean(e.message))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.bodyData(createMessageBean(e.message))
.exception(e, true)

.status(FAILED)
true
}
}

/**
 * Attaches a failed sub-event to the root event explaining which timeout interrupted the rule.
 *
 * The sub-event type, the timeout value mentioned in its name and the body text all depend on [timeoutType].
 *
 * @param timeoutType the state the task ended with; only [State.TIMEOUT] and [State.MESSAGE_TIMEOUT] are supported
 * @throws IllegalStateException if [timeoutType] is any other state
 */
private fun addTimeoutEvent(timeoutType: State) {
    // Resolved up front so that an unsupported state fails before the root event is touched.
    val (interruptedAfter, eventType) = when (timeoutType) {
        State.TIMEOUT -> taskTimeout.timeout to "CheckTimeoutInterrupted"
        State.MESSAGE_TIMEOUT -> taskTimeout.messageTimeout to "CheckMessageTimeoutInterrupted"
        else -> error("unexpected timeout state: $timeoutType")
    }

    val timeoutEvent = refs.rootEvent.addSubEventWithSamePeriod()
    timeoutEvent.status(FAILED)
    timeoutEvent.type(eventType)
    timeoutEvent.name("Rule processed $handledMessageCounter message(s) and was interrupted due to $interruptedAfter mls ${timeoutType.name.lowercase()}")

    // Only the two supported states can reach this point, so anything that is not TIMEOUT is MESSAGE_TIMEOUT.
    val explanation = if (timeoutType == State.TIMEOUT) timeoutText() else messageTimeoutText()
    timeoutEvent.bodyData(createMessageBean(explanation))
}

/**
 * Builds the body text for the sub-event reported when the rule is interrupted by the message timeout.
 *
 * When [checkpointTimeout] is available the text includes the time range the rule expected messages in;
 * otherwise it states that the message timeout is not specified.
 */
private fun messageTimeoutText(): String {
    // NOTE(review): checkpointTimeout is assigned from calculateCheckpointTimeout(checkpointTimestamp, messageTimeout).
    // If that value already includes the message timeout, the range printed below is shifted by one timeout
    // compared to the range reported in the rule start-point event - confirm.
    val checkpointInstant = checkpointTimeout?.toInstant()
    val details = if (checkpointInstant == null) {
        "But the message timeout is not specified. Contact the developers."
    } else {
        "Rule expects messages between $checkpointInstant and ${checkpointInstant.plusMillis(taskTimeout.messageTimeout)} " +
            "but processed one outside this range. Check the messages attached to the root rule event to find all processed messages."
    }
    return "Check task was interrupted because the timestamp on the last processed message exceeds the message timeout. " + details
}

/**
 * Builds the body text for the sub-event reported when the rule is interrupted by the overall task timeout.
 *
 * The text mentions the configured timeout and lists the likely causes, one per line.
 */
private fun timeoutText(): String {
    // Everything up to and including the '|' margin is stripped from each line, so the indentation here is cosmetic.
    val explanation = """
        |Check task was interrupted because the task execution took longer than ${taskTimeout.timeout} mls. The possible reasons are:
        |* incorrect message filter - rule didn't find a match for all requested messages and kept working until the timeout exceeded (check key fields)
        |* incorrect point of start - some of the expected messages were behind the start point and rule couldn't find them (check the checkpoint)
        |* lack of the resources - rule might perform slow and didn't get to the expected messages in specified timeout (check component resources)
    """
    return explanation.trimMargin()
}

private fun configureRootEvent() {
refs.rootEvent.name(name()).type(type())
setup(refs.rootEvent)
Expand Down Expand Up @@ -585,6 +661,7 @@ abstract class AbstractCheckTask(
private val RESPONSE_EXECUTOR = ForkJoinPool.commonPool()
@JvmField
val CONVERTER = ProtoToIMessageConverter(VerificationUtil.FACTORY_PROXY, null, null, createParameters().setUseMarkerForNullsInMessage(true))
private val TIMEOUT_STATES: Set<State> = setOf(State.TIMEOUT, State.MESSAGE_TIMEOUT)
val EMPTY_STATUS_CONSUMER: (EventStatus) -> Unit = {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ class NoMessageCheckTask(

override fun type(): String = "noMessageCheck"

// Opts this rule out of the extra timeout sub-event that the base task adds by default.
// NOTE(review): presumably because running until the timeout is the normal way for this check to end - confirm.
override val errorEventOnTimeout: Boolean
get() = false

/**
 * Adds a body message to [rootEvent] naming the session alias and direction this rule checks.
 */
override fun setup(rootEvent: Event) {
    val source = "${sessionKey.sessionAlias} (${sessionKey.direction} direction)"
    val description = EventUtils.createMessageBean("No message check rule for messages from $source")
    rootEvent.bodyData(description)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class SilenceCheckTask(
}
}

// Opts this rule out of the extra timeout sub-event that the base task adds by default.
// NOTE(review): presumably because a timeout is an expected outcome for the silence check - confirm.
override val errorEventOnTimeout: Boolean
get() = false

override fun name(): String = "AutoSilenceCheck"

override fun type(): String = "AutoSilenceCheck"
Expand Down
26 changes: 26 additions & 0 deletions src/main/kotlin/com/exactpro/th2/check1/utils/SessionUtils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.exactpro.th2.check1.utils

import com.exactpro.th2.check1.SessionKey
import com.exactpro.th2.common.grpc.ConnectionID
import com.exactpro.th2.common.grpc.MessageID

/**
 * Creates a [MessageID] that points to the message with the given [sequence] in this session.
 *
 * The session alias and the direction are taken from the receiver key.
 *
 * @param sequence sequence number to put into the identifier
 * @return the built message identifier
 */
fun SessionKey.toMessageID(sequence: Long): MessageID {
    val connection = ConnectionID.newBuilder()
        .setSessionAlias(sessionAlias)
        .build()
    val idBuilder = MessageID.newBuilder()
    idBuilder.setConnectionId(connection)
    idBuilder.setSequence(sequence)
    idBuilder.setDirection(direction)
    return idBuilder.build()
}
26 changes: 13 additions & 13 deletions src/test/kotlin/com/exactpro/th2/check1/rule/TestChain.kt
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ class TestChain: AbstractCheckTaskTest() {

val task = sequenceCheckRuleTask(listOf(3, 4), eventID, streams).also { it.begin() }
var eventList = awaitEventBatchAndGetEvents(6, 6)
assertEquals(8, eventList.size)
assertEquals(4, eventList.filter { it.status == SUCCESS }.size)
assertEquals(9, eventList.size)
assertEquals(5, eventList.filter { it.status == SUCCESS }.size)
assertEquals(4, eventList.filter { it.status == FAILED }.size)

sequenceCheckRuleTask(listOf(1, 2), eventID, streams).also { task.subscribeNextTask(it) }
Expand All @@ -110,8 +110,8 @@ class TestChain: AbstractCheckTaskTest() {

val task = sequenceCheckRuleTask(listOf(1, 4), eventID, streams).also { it.begin() }
var eventList = awaitEventBatchAndGetEvents(6, 6)
assertEquals(9, eventList.size)
assertEquals(5, eventList.filter { it.status == SUCCESS }.size)
assertEquals(10, eventList.size)
assertEquals(6, eventList.filter { it.status == SUCCESS }.size)
assertEquals(4, eventList.filter { it.status == FAILED }.size)

sequenceCheckRuleTask(listOf(2, 3), eventID, streams).also { task.subscribeNextTask(it) }
Expand All @@ -134,8 +134,8 @@ class TestChain: AbstractCheckTaskTest() {
task.begin()

val eventList = awaitEventBatchRequest(1000L, 4 * 2).flatMap(EventBatch::getEventsList)
assertEquals(4 * 3, eventList.size)
assertEquals(4 * 3, eventList.filter { it.status == SUCCESS }.size)
assertEquals(4 * 4, eventList.size)
assertEquals(4 * 4, eventList.filter { it.status == SUCCESS }.size)
assertEquals(listOf(1L, 2L, 3L, 4L), eventList.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList).map(MessageID::getSequence))
}

Expand Down Expand Up @@ -240,7 +240,7 @@ class TestChain: AbstractCheckTaskTest() {
val rootEvent = eventsList.first()
assertEquals(FAILED, rootEvent.status)
assertEquals(3, rootEvent.attachedMessageIdsCount)
assertEquals(1, eventsList[2].attachedMessageIdsCount)
assertEquals(1, eventsList.single { it.type == "Verification" }.attachedMessageIdsCount)
assertEquals(FAILED, eventsList.last().status)
})
}
Expand All @@ -250,20 +250,20 @@ class TestChain: AbstractCheckTaskTest() {
awaitEventBatchRequest(1000L, times).drop(times - last).flatMap(EventBatch::getEventsList)

private fun checkSimpleVerifySuccess(eventList: List<Event>, sequence: Long) {
assertEquals(3, eventList.size)
assertEquals(3, eventList.filter { it.status == SUCCESS }.size)
assertEquals(4, eventList.size)
assertEquals(4, eventList.filter { it.status == SUCCESS }.size)
assertEquals(listOf(sequence), eventList.filter { it.type == VERIFICATION_TYPE }.flatMap(Event::getAttachedMessageIdsList).map(MessageID::getSequence))
}

private fun checkSimpleVerifyFailure(eventList: List<Event>) {
assertEquals(3, eventList.size)
assertEquals(1, eventList.filter { it.status == SUCCESS }.size)
assertEquals(4, eventList.size)
assertEquals(2, eventList.filter { it.status == SUCCESS }.size)
assertEquals(2, eventList.filter { it.status == FAILED }.size)
}

private fun checkSequenceVerifySuccess(eventList: List<Event>, sequences: List<Long>) {
assertEquals(8, eventList.size)
assertEquals(8, eventList.filter { it.status == SUCCESS }.size)
assertEquals(9, eventList.size)
assertEquals(9, eventList.filter { it.status == SUCCESS }.size)
assertEquals(sequences, eventList
.dropWhile { it.type != CHECK_MESSAGES_TYPE } // Skip prefilter
.filter { it.type == VERIFICATION_TYPE }
Expand Down
Loading