Skip to content

Commit

Permalink
[TH2-5106] disable cradle message check by default (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita-Smirnov-Exactpro authored Oct 20, 2023
1 parent 05f029b commit 6e54dbc
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 9 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ spec:
# maxBufferDecodeQueue: 10000 # buffer size for messages that sent to decode but answers hasn't been received
# decodingTimeout: 60000 # timeout expecting answers from codec.
# batchSizeBytes: 256KB # the max size of the batch in bytes. You can use 'MB,KB' suffixes or a plain int value
# validateCradleData: false # validate data loaded from cradle. NOTE: Enabled validation affect performance
# codecUsePinAttributes: true # send raw message to specified codec (true) or send to all codecs (false)
# responseFormats: string list # resolve data for selected formats only. (allowed values: BASE_64, PARSED)

Expand Down Expand Up @@ -225,7 +226,9 @@ spec:
## 2.4.0
+ Add `batchSizeBytes` parameter to limit batch size by size in bytes rather than count of messages.
+ Added `batchSizeBytes` parameter to limit batch size by size in bytes rather than count of messages.
+ Added `validateCradleData` parameter to enable/disable validation logic for data loaded from cradle.
Currently, managed validation logic includes check for message sequence and timestamp inside a group batch.
+ Parameters `batchSize` and `groupRequestBuffer` removed.
The maximum batch size in messages is computed based on `bufferPerQuery` or `maxBufferDecodeQueue` if previous parameter is not set.

Expand Down
3 changes: 2 additions & 1 deletion src/main/kotlin/com/exactpro/th2/lwdataprovider/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class Context(
),
val cradleMsgExtractor: CradleMessageExtractor = CradleMessageExtractor(
cradleManager,
DataMeasurementHistogram.create(registry, "cradle message")
DataMeasurementHistogram.create(registry, "cradle message"),
configuration.validateCradleData
),
val generalCradleExtractor: GeneralCradleExtractor = GeneralCradleExtractor(cradleManager),
val execExecutor: Executor = Executors.newFixedThreadPool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class CustomConfigurationClass(
val codecUsePinAttributes: Boolean? = null,
val listOfMessageAsSingleMessage: Boolean? = null,
val useTransportMode: Boolean? = null,
val validateCradleData: Boolean? = null,
val flushSseAfter: Int? = null,
val gzipCompressionLevel: Int? = null,
@JsonDeserialize(using = ByteSizeDeserializer::class)
Expand Down Expand Up @@ -70,6 +71,7 @@ class Configuration(customConfiguration: CustomConfigurationClass) {
val codecUsePinAttributes: Boolean = VariableBuilder.getVariable(customConfiguration::codecUsePinAttributes, true)
val listOfMessageAsSingleMessage: Boolean = VariableBuilder.getVariable(customConfiguration::listOfMessageAsSingleMessage, true)
val useTransportMode: Boolean = VariableBuilder.getVariable(customConfiguration::useTransportMode, false)
val validateCradleData: Boolean = VariableBuilder.getVariable(customConfiguration::validateCradleData, false)
val flushSseAfter: Int = VariableBuilder.getVariable(customConfiguration::flushSseAfter, 0)
val gzipCompressionLevel: Int = VariableBuilder.getVariable(customConfiguration::gzipCompressionLevel, -1)
val batchSizeBytes: Int = VariableBuilder.getVariable(customConfiguration::batchSizeBytes, 256 * 1024)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import kotlin.system.measureTimeMillis
class CradleMessageExtractor(
cradleManager: CradleManager,
private val dataMeasurement: DataMeasurement,
private val validateCradleData: Boolean
) {

private val storage: CradleStorage = cradleManager.storage
Expand Down Expand Up @@ -120,7 +121,7 @@ class CradleMessageExtractor(
val orderStrategy = filter.order?.toOrderStrategy() ?: OrderStrategy.DIRECT
val iterator: Iterator<StoredGroupedMessageBatch> =
measure("init_groups") { storage.getGroupedMessageBatches(filter) }
.withCheck()
.withCheck(validateCradleData)
.withMeasurements("groups", dataMeasurement)
if (!iterator.hasNext()) {
logger.info { "Empty response received from cradle" }
Expand Down Expand Up @@ -171,7 +172,7 @@ class CradleMessageExtractor(
}
tryDrain(group, buffer, sink)
} else {
orderStrategy.reorder(prev.messages).forEachIndexed { index, msg ->
orderStrategy.reorder(prev.messages).forEachIndexed { _, msg ->
if ((needFiltration && !msg.inRange()) || parameters.preFilter?.invoke(msg) == false) {
return@forEachIndexed
}
Expand Down Expand Up @@ -358,8 +359,13 @@ internal class GroupBatchCheckIterator(
}

companion object {
fun Iterator<StoredGroupedMessageBatch>.withCheck(): Iterator<StoredGroupedMessageBatch> =
fun Iterator<StoredGroupedMessageBatch>.withCheck(
validateCradleData: Boolean
): Iterator<StoredGroupedMessageBatch> = if (validateCradleData) {
GroupBatchCheckIterator(this)
} else {
this
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ internal class TestCradleMessageExtractor {
internal fun setUp() {
storage = mock { }
manager = mock { on { this.storage }.thenReturn(storage) }
extractor = CradleMessageExtractor(manager, DummyDataMeasurement)
extractor = CradleMessageExtractor(manager, DummyDataMeasurement, false)
clearInvocations(storage, messageRouter, manager)
}

Expand Down Expand Up @@ -305,6 +305,7 @@ internal class TestCradleMessageExtractor {
@ParameterizedTest
@EnumSource(Order::class)
fun getMessagesGroupUnorderedMessagesByTimestamp(order: Order) {
val extractorWithValidation = CradleMessageExtractor(manager, DummyDataMeasurement, true)
val correctMessages = listOf(
createCradleStoredMessage(
TEST_SESSION_ALIAS,
Expand Down Expand Up @@ -333,7 +334,7 @@ internal class TestCradleMessageExtractor {
whenever(storage.getGroupedMessageBatches(any())).thenReturn(ListCradleResult(batchesList))

val exception = assertThrowsExactly(IllegalStateException::class.java) {
extractor.getMessagesGroup(
extractorWithValidation.getMessagesGroup(
GroupedMessageFilter.builder()
.bookId(BookId("book"))
.groupName("test")
Expand All @@ -360,6 +361,7 @@ internal class TestCradleMessageExtractor {
@ParameterizedTest
@EnumSource(Order::class)
fun getMessagesGroupUnorderedMessagesBySequence(order: Order) {
val extractorWithValidation = CradleMessageExtractor(manager, DummyDataMeasurement, true)
val now = Instant.now()
val correctMessages = listOf(
createCradleStoredMessage(
Expand Down Expand Up @@ -389,7 +391,7 @@ internal class TestCradleMessageExtractor {
whenever(storage.getGroupedMessageBatches(any())).thenReturn(ListCradleResult(batchesList))

val exception = assertThrowsExactly(IllegalStateException::class.java) {
extractor.getMessagesGroup(
extractorWithValidation.getMessagesGroup(
GroupedMessageFilter.builder()
.bookId(BookId("book"))
.groupName("test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ abstract class AbstractCradleIntegrationTest {
messageExtractor = CradleMessageExtractor(
cradleManager = cradleManager,
DummyDataMeasurement,
false
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ internal class TestSearchMessagesHandler {
decoder: Decoder,
useTransportMode: Boolean
) = SearchMessagesHandler(
CradleMessageExtractor(manager, DummyDataMeasurement),
CradleMessageExtractor(manager, DummyDataMeasurement, false),
decoder,
executor,
Configuration(
Expand Down

0 comments on commit 6e54dbc

Please sign in to comment.