Skip to content

Commit

Permalink
TH2-3615/3841 (#159)
Browse files Browse the repository at this point in the history
UninitializedPropertyAccessException fixed (TH2-3615)
Rules corrected to avoid holding data when the rule is finished (TH2-3841)
  • Loading branch information
lumber1000 authored Oct 6, 2022
1 parent 0b8e211 commit bb9444d
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 171 deletions.
75 changes: 53 additions & 22 deletions src/main/kotlin/com/exactpro/th2/check1/rule/AbstractCheckTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,30 @@ import java.util.concurrent.atomic.AtomicReference
*/
abstract class AbstractCheckTask(
private val ruleConfiguration: RuleConfiguration,
submitTime: Instant,
private val submitTime: Instant,
protected val sessionKey: SessionKey,
private val parentEventID: EventID,
private val messageStream: Observable<StreamContainer>,
private val eventBatchRouter: MessageRouter<EventBatch>
) : AbstractSessionObserver<MessageContainer>() {

protected open class Refs(val rootEvent: Event)

protected class RefsKeeper<T : Refs>(refs: T) {
private var refsNullable: T? = refs
val refs: T get() = refsNullable ?: error("Requesting references after references has been erased.")
fun eraseRefs() {
refsNullable = null
}
}

protected abstract val refsKeeper: RefsKeeper<out Refs>
private val refs get() = refsKeeper.refs

val description: String? = ruleConfiguration.description
private val taskTimeout: TaskTimeout = ruleConfiguration.taskTimeout

protected var handledMessageCounter: Long = 0
protected val rootEvent: Event = Event.from(submitTime).description(description)

private val sequenceSubject = SingleSubject.create<Legacy>()
private val hasNextTask = AtomicBoolean(false)
Expand Down Expand Up @@ -130,17 +142,27 @@ abstract class AbstractCheckTask(
private var bufferContainsStartMessage: Boolean = false
private var isDefaultSequence: Boolean = false

override fun onStart() {
@Volatile
protected var started = false

protected fun createRootEvent() = Event.from(submitTime).description(description)

final override fun onStart() {
super.onStart()
started = true

//Init or re-init variable in TASK_SCHEDULER thread
handledMessageCounter = 0

onStartInit()
}

protected abstract fun onStartInit()

override fun onError(e: Throwable) {
super.onError(e)

rootEvent.status(FAILED)
refs.rootEvent.status(FAILED)
.bodyData(EventUtils.createMessageBean(e.message))
end(State.ERROR, "Error ${e.message} received in message stream")
}
Expand Down Expand Up @@ -182,7 +204,7 @@ abstract class AbstractCheckTask(
}
LOGGER.info("Task {} ({}) subscribed to task {} ({})", checkTask.description, checkTask.hashCode(), description, hashCode())
} else {
throw IllegalStateException("Subscription to last sequence for task $description (${hashCode()}) is already executed, subscriber ${checkTask.description} (${checkTask.hashCode()})")
error("Subscription to last sequence for task $description (${hashCode()}) is already executed, subscriber ${checkTask.description} (${checkTask.hashCode()})")
}
}

Expand Down Expand Up @@ -272,7 +294,7 @@ abstract class AbstractCheckTask(
configureRootEvent()
isParentCompleted = previousExecutionData.completed
if (!taskState.compareAndSet(State.CREATED, State.BEGIN)) {
throw IllegalStateException("Task $description already has been started")
error("Task $description already has been started")
}
LOGGER.info("Check begin for session alias '{}' with sequence '{}' and task timeout '{}'", sessionKey, sequence, taskTimeout)
RuleMetric.incrementActiveRule(type())
Expand All @@ -292,7 +314,7 @@ abstract class AbstractCheckTask(

// All sources above will be disposed on this scheduler.
//
// This method should be called as closer as possible
// This method should be called as close as possible
// to the actual dispose that you want to execute on this scheduler
// because other operations are executed on the same single-thread scheduler.
//
Expand All @@ -304,7 +326,7 @@ abstract class AbstractCheckTask(
handledMessageCounter++

with(it.metadata.id) {
rootEvent.messageID(this)
refs.rootEvent.messageID(this)
}
}
.takeWhileMessagesInTimeout()
Expand All @@ -313,7 +335,7 @@ abstract class AbstractCheckTask(
.subscribe(this)
} catch (exception: Exception) {
LOGGER.error("An internal error occurred while executing rule", exception)
rootEvent.addSubEventWithSamePeriod()
refs.rootEvent.addSubEventWithSamePeriod()
.name("An error occurred while executing rule")
.type("internalError")
.status(FAILED)
Expand Down Expand Up @@ -347,6 +369,7 @@ abstract class AbstractCheckTask(
.build())
} finally {
RuleMetric.decrementActiveRule(type())
refsKeeper.eraseRefs()
sequenceSubject.onSuccess(Legacy(executorService, SequenceData(lastSequence, lastMessageTimestamp, !hasMessagesInTimeoutInterval)))
}
}
Expand Down Expand Up @@ -406,7 +429,7 @@ abstract class AbstractCheckTask(
LOGGER.info("Skip event publication for task ${type()} '$description' (${hashCode()})")
return
}
val batches = rootEvent.disperseToBatches(ruleConfiguration.maxEventBatchContentSize, parentEventID)
val batches = refs.rootEvent.disperseToBatches(ruleConfiguration.maxEventBatchContentSize, parentEventID)

RESPONSE_EXECUTOR.execute {
batches.forEach { batch ->
Expand All @@ -422,18 +445,27 @@ abstract class AbstractCheckTask(
}
}
} else {
LOGGER.debug("Event tree id '{}' parent id '{}' is already published", rootEvent.id, parentEventID)
LOGGER.debug("Event tree id '{}' parent id '{}' is already published", refs.rootEvent.id, parentEventID)
}
}

private fun completeEventOrReportError(prevState: State): Boolean {
return try {
completeEvent(prevState)
doAfterCompleteEvent()
false
if (started) {
completeEvent(prevState)
doAfterCompleteEvent()
false
} else {
LOGGER.error("Check task was not started.")
refs.rootEvent.addSubEventWithSamePeriod()
.name("Check failed: task timeout elapsed before the check task was started. Please, check component resources for throttling or intensive GC")
.type("taskNotStarted")
.status(FAILED)
true
}
} catch (e: Exception) {
LOGGER.error("Result event cannot be completed", e)
rootEvent.addSubEventWithSamePeriod()
refs.rootEvent.addSubEventWithSamePeriod()
.name("Check result event cannot build completely")
.type("eventNotComplete")
.bodyData(EventUtils.createMessageBean("An unexpected exception has been thrown during result check build"))
Expand All @@ -444,8 +476,8 @@ abstract class AbstractCheckTask(
}

private fun configureRootEvent() {
rootEvent.name(name()).type(type())
setup(rootEvent)
refs.rootEvent.name(name()).type(type())
setup(refs.rootEvent)
}

private fun doAfterCompleteEvent() {
Expand All @@ -461,7 +493,7 @@ abstract class AbstractCheckTask(
}

private fun fillUntrustedExecutionEvent() {
rootEvent.addSubEvent(
refs.rootEvent.addSubEvent(
Event.start()
.name("The current check is untrusted because the start point of the check interval has been selected approximately")
.status(FAILED)
Expand All @@ -470,7 +502,7 @@ abstract class AbstractCheckTask(
}

private fun fillMissedStartMessageAndMessagesInIntervalEvent() {
rootEvent.addSubEvent(
refs.rootEvent.addSubEvent(
Event.start()
.name("Check cannot be executed because buffer for session alias '${sessionKey.sessionAlias}' and direction '${sessionKey.direction}' contains neither message in the requested check interval with sequence '$lastSequence' and checkpoint timestamp '${checkpointTimeout?.toJson()}'")
.status(FAILED)
Expand All @@ -479,7 +511,7 @@ abstract class AbstractCheckTask(
}

private fun fillEmptyStartMessageEvent() {
rootEvent.addSubEvent(
refs.rootEvent.addSubEvent(
Event.start()
.name("Buffer for session alias '${sessionKey.sessionAlias}' and direction '${sessionKey.direction}' doesn't contain starting message, but contains several messages in the requested check interval")
.status(FAILED)
Expand Down Expand Up @@ -721,7 +753,6 @@ abstract class AbstractCheckTask(
null
}


private data class Legacy(val executorService: ExecutorService, val sequenceData: SequenceData)
private data class SequenceData(val lastSequence: Long, val lastMessageTimestamp: Timestamp?, val untrusted: Boolean)
private data class PreviousExecutionData(
Expand All @@ -740,4 +771,4 @@ abstract class AbstractCheckTask(
val DEFAULT = PreviousExecutionData()
}
}
}
}
55 changes: 34 additions & 21 deletions src/main/kotlin/com/exactpro/th2/check1/rule/check/CheckRuleTask.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand Down Expand Up @@ -35,53 +35,66 @@ import java.time.Instant
/**
* This rule checks for the presence of a single message in the messages stream.
*/

class CheckRuleTask(
ruleConfiguration: RuleConfiguration,
startTime: Instant,
sessionKey: SessionKey,
private val protoMessageFilter: RootMessageFilter,
protoMessageFilter: RootMessageFilter,
parentEventID: EventID,
messageStream: Observable<StreamContainer>,
eventBatchRouter: MessageRouter<EventBatch>
) : AbstractCheckTask(ruleConfiguration, startTime, sessionKey, parentEventID, messageStream, eventBatchRouter) {

private val messageFilter: SailfishFilter = SailfishFilter(
CONVERTER.fromProtoPreFilter(protoMessageFilter),
protoMessageFilter.toCompareSettings()
)
private val metadataFilter: SailfishFilter? = protoMessageFilter.metadataFilterOrNull()?.let {
SailfishFilter(
CONVERTER.fromMetadataFilter(it, METADATA_MESSAGE_NAME),
it.toComparisonSettings()
protected class Refs(
rootEvent: Event,
val protoMessageFilter: RootMessageFilter,
val messageFilter: SailfishFilter,
val metadataFilter: SailfishFilter?
) : AbstractCheckTask.Refs(rootEvent)

override val refsKeeper = RefsKeeper(
Refs(
rootEvent = createRootEvent(),
protoMessageFilter = protoMessageFilter,
messageFilter = SailfishFilter(
CONVERTER.fromProtoPreFilter(protoMessageFilter),
protoMessageFilter.toCompareSettings()
),
metadataFilter = protoMessageFilter.metadataFilterOrNull()?.let {
SailfishFilter(
CONVERTER.fromMetadataFilter(it, METADATA_MESSAGE_NAME),
it.toComparisonSettings()
)
}
)
}
)

override fun onStart() {
super.onStart()
private val refs get() = refsKeeper.refs

override fun onStartInit() {
val subEvent = Event.start()
.endTimestamp()
.name("Message filter")
.type("Filter")
.bodyData(protoMessageFilter.toReadableBodyCollection())

rootEvent.addSubEvent(subEvent)
.bodyData(refs.protoMessageFilter.toReadableBodyCollection())

refs.rootEvent.addSubEvent(subEvent)
}

override fun onNext(messageContainer: MessageContainer) {
val aggregatedResult = matchFilter(messageContainer, messageFilter, metadataFilter)
val aggregatedResult = matchFilter(messageContainer, refs.messageFilter, refs.metadataFilter)

val container = ComparisonContainer(messageContainer, protoMessageFilter, aggregatedResult)
val container = ComparisonContainer(messageContainer, refs.protoMessageFilter, aggregatedResult)

if (container.matchesByKeys) {
rootEvent.appendEventsWithVerification(container)
refs.rootEvent.appendEventsWithVerification(container)
checkComplete()
}
}

override fun onTimeout() {
rootEvent.addSubEventWithSamePeriod()
refs.rootEvent.addSubEventWithSamePeriod()
.name("No message found by target keys")
.type("Check failed")
.status(FAILED)
Expand All @@ -94,4 +107,4 @@ class CheckRuleTask(
override fun setup(rootEvent: Event) {
rootEvent.bodyData(EventUtils.createMessageBean("Check rule for messages from ${sessionKey.run { "$sessionAlias ($direction direction)"} }"))
}
}
}
Loading

0 comments on commit bb9444d

Please sign in to comment.