From 1429f91a3b0da82f755493ff6d50d40fbd7f96ff Mon Sep 17 00:00:00 2001 From: Nikita Smirnov <46124551+Nikita-Smirnov-Exactpro@users.noreply.github.com> Date: Fri, 20 Sep 2024 19:15:49 +0400 Subject: [PATCH] [TH2-5239] Reduced required memory for executing sse event request with `limitForParent` parameter (#370) --- README.md | 5 +- build.gradle | 4 +- gradle.properties | 18 +- .../handlers/IParentEventCounter.kt | 73 +++++ .../handlers/SearchEventsHandler.kt | 42 +-- .../handlers/IParentEventCounterTest.kt | 267 ++++++++++++++++++ .../handlers/events/TestEventPipeline.kt | 2 + .../handlers/events/TimestampGeneratorTest.kt | 5 +- .../kotlin/handlers/messages/ExtractorTest.kt | 1 + .../kotlin/handlers/messages/MergerTest.kt | 8 +- 10 files changed, 360 insertions(+), 65 deletions(-) create mode 100644 src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt create mode 100644 src/test/kotlin/handlers/IParentEventCounterTest.kt diff --git a/README.md b/README.md index ca9d8920..28a76d83 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Report data provider (5.13.1) +# Report data provider (5.13.2) # Overview This component serves as a backend for rpt-viewer. It will connect to the cassandra database via cradle api and expose the data stored in there as REST resources. @@ -297,6 +297,9 @@ spec: # Release notes +## 5.13.2 +* Reduced required memory for executing sse event request with `limitForParent` parameter + ## 5.13.1 * Fixed the problem data provider can't handle `messageIds` request with `messageId` but without `startTimestamp` arguments diff --git a/build.gradle b/build.gradle index 16e49638..049865c2 100644 --- a/build.gradle +++ b/build.gradle @@ -93,9 +93,7 @@ application { test { -// FIXME: the tests were temporary disabled since they're not compatible with new api - -// useJUnitPlatform() + useJUnitPlatform() } dependencyCheck { diff --git a/gradle.properties b/gradle.properties index 46c80001..cff52657 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,19 +1,3 @@ -################################################################################ -# Copyright 2009-2024 Exactpro (Exactpro Systems Limited) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - kotlin.code.style=official -release_version=5.13.1 +release_version=5.13.2 docker_image_name= \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt new file mode 100644 index 00000000..34f32461 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/IParentEventCounter.kt @@ -0,0 +1,73 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.rptdataprovider.handlers + +import com.exactpro.th2.rptdataprovider.entities.responses.BaseEventEntity +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +internal interface IParentEventCounter { + /** + * This method use parent event id or event id to limit number of child events. + * WARNING: event id isn't grantee event unique then this method can't be used for strict limitation. + * @return false if limit exceeded otherwise true + */ + fun checkCountAndGet(event: BaseEventEntity): Boolean + + private object NoLimitedParentEventCounter : IParentEventCounter { + override fun checkCountAndGet(event: BaseEventEntity): Boolean = true + } + + private class LimitedParentEventCounter( + private val limitForParent: Long + ) : IParentEventCounter { + private val parentEventCounter = ConcurrentHashMap() + + override fun checkCountAndGet(event: BaseEventEntity): Boolean { + if (event.parentEventId == null) { + return true + } + + val value = parentEventCounter.compute(event.parentEventId.eventId.id) { _, value -> + if (value == null) { + AtomicLong(1) + } else { + if (value === MAX_EVENT_COUNTER) { + parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER) + MAX_EVENT_COUNTER + } else { + if (value.incrementAndGet() > limitForParent) { + parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER) + MAX_EVENT_COUNTER + } else { + value + } + } + } + } + + return value !== MAX_EVENT_COUNTER + } + } + + companion object { + private val MAX_EVENT_COUNTER = AtomicLong(Long.MAX_VALUE) + + fun create(limitForParent: Long? = null): IParentEventCounter = + limitForParent?.let { LimitedParentEventCounter(it) } ?: NoLimitedParentEventCounter + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt index 2a4b0e44..ba2a86b0 100644 --- a/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt +++ b/src/main/kotlin/com/exactpro/th2/rptdataprovider/handlers/SearchEventsHandler.kt @@ -36,6 +36,7 @@ import com.exactpro.th2.rptdataprovider.minInstant import com.exactpro.th2.rptdataprovider.producers.EventProducer import com.exactpro.th2.rptdataprovider.services.cradle.CradleService import com.exactpro.th2.rptdataprovider.tryToGetTestEvents +import io.github.oshai.kotlinlogging.KotlinLogging import io.prometheus.client.Counter import kotlinx.coroutines.Deferred import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -61,11 +62,9 @@ import kotlinx.coroutines.flow.take import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.isActive import kotlinx.coroutines.launch -import io.github.oshai.kotlinlogging.KotlinLogging import java.time.Instant import java.time.LocalTime import java.time.ZoneOffset -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext @@ -85,34 +84,6 @@ class SearchEventsHandler(context: Context<*, *, *, *>) { private val eventSearchChunkSize: Int = context.configuration.eventSearchChunkSize.value.toInt() private val keepAliveTimeout: Long = context.configuration.keepAliveTimeout.value.toLong() - - private data class ParentEventCounter private constructor( - private val parentEventCounter: ConcurrentHashMap?, - val limitForParent: Long? - ) { - - constructor(limitForParent: Long?) : this( - parentEventCounter = limitForParent?.let { ConcurrentHashMap() }, - limitForParent = limitForParent - ) - - fun checkCountAndGet(event: BaseEventEntity): BaseEventEntity? { - if (limitForParent == null || event.parentEventId == null) - return event - - return parentEventCounter!!.getOrPut(event.parentEventId.toString(), { AtomicLong(1) }).let { parentCount -> - if (parentCount.get() <= limitForParent) { - parentCount.incrementAndGet() - event - } else { - parentEventCounter.putIfAbsent(event.id.toString(), AtomicLong(Long.MAX_VALUE)) - null - } - } - } - } - - private suspend fun keepAlive( writer: StreamWriter<*, *>, lastScannedObjectInfo: LastScannedObjectInfo, @@ -323,7 +294,7 @@ class SearchEventsHandler(context: Context<*, *, *, *>) { requireNotNull(resumeTimestamp) { "timestamp for $resumeProviderId cannot be extracted" } } val timeIntervals = getTimeIntervals(request, sseEventSearchStep, startTimestamp) - val parentEventCounter = ParentEventCounter(request.limitForParent) + val parentEventCounter = IParentEventCounter.create(request.limitForParent) flow { for ((start, end) in timeIntervals) { @@ -359,14 +330,7 @@ class SearchEventsHandler(context: Context<*, *, *, *>) { lastScannedObject.update(event, scanCnt) processedEventCount.inc() } - .filter { request.filterPredicate.apply(it) } - .let { - if (parentEventCounter.limitForParent != null) { - it.filter { event -> parentEventCounter.checkCountAndGet(event) != null } - } else { - it - } - } + .filter { request.filterPredicate.apply(it) && parentEventCounter.checkCountAndGet(it) } .let { fl -> request.resultCountLimit?.let { fl.take(it) } ?: fl } .onStart { launch { diff --git a/src/test/kotlin/handlers/IParentEventCounterTest.kt b/src/test/kotlin/handlers/IParentEventCounterTest.kt new file mode 100644 index 00000000..8361cec0 --- /dev/null +++ b/src/test/kotlin/handlers/IParentEventCounterTest.kt @@ -0,0 +1,267 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package handlers + +import com.exactpro.cradle.BookId +import com.exactpro.cradle.testevents.StoredTestEventId +import com.exactpro.th2.rptdataprovider.entities.internal.ProviderEventId +import com.exactpro.th2.rptdataprovider.entities.responses.BaseEventEntity +import com.exactpro.th2.rptdataprovider.handlers.IParentEventCounter +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import java.time.Instant +import java.util.UUID + +class IParentEventCounterTest { + + @Test + fun `no limit test`() { + val eventCounter = IParentEventCounter.create(null) + + val rootEventId = NEXT_UUID + val parentEventId = ProviderEventId( + batchId = null, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ) + + assertAll( + { + assertTrue( + eventCounter.checkCountAndGet(createEventEntity(ProviderEventId( + batchId = null, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ))), + "root event with unique id", + ) + }, + { + assertTrue( + eventCounter.checkCountAndGet(createEventEntity(ProviderEventId( + batchId = null, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), rootEventId), + ))), + "root event with same id", + ) + }, + { + assertTrue( + eventCounter.checkCountAndGet( + createEventEntity( + ProviderEventId( + batchId = null, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ), + parentEventId, + ) + ), + "single event id", + ) + }, + { + assertTrue( + eventCounter.checkCountAndGet(createEventEntity( + ProviderEventId( + batchId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ), + parentEventId, + )), + "batched event id", + ) + }, + ) + } + + @Test + fun `limit root event test`() { + val limitForParent = 50 + val eventCounter = IParentEventCounter.create(limitForParent.toLong()) + + val rootEventId = NEXT_UUID + + repeat(limitForParent * 2) { + assertAll( + { + assertTrue( + eventCounter.checkCountAndGet(createEventEntity(ProviderEventId( + batchId = null, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ))), + "root event with unique id, attempt $it", + ) + }, + { + assertTrue( + eventCounter.checkCountAndGet(createEventEntity(ProviderEventId( + batchId = null, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), rootEventId), + ))), + "root event with same id, attempt $it", + ) + }, + ) + } + } + + @Test + fun `singe event test`() { + val limitForParent = 50 + val eventCounter = IParentEventCounter.create(limitForParent.toLong()) + + val parentEventId = ProviderEventId( + batchId = null, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ) + + repeat(limitForParent) { + assertTrue( + eventCounter.checkCountAndGet( + createEventEntity( + ProviderEventId( + batchId = null, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ), + parentEventId, + ) + ), + "single event id, attempt $it", + ) + } + + val nextEventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID) + assertAll( + { + assertFalse( + eventCounter.checkCountAndGet( + createEventEntity( + ProviderEventId( + batchId = null, + eventId = nextEventId, + ), + parentEventId, + ), + ), + "single event id, attempt ${limitForParent + 1}", + ) + }, + { + assertFalse( + eventCounter.checkCountAndGet( + createEventEntity( + ProviderEventId( + batchId = null, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ), + ProviderEventId( + batchId = null, + eventId = nextEventId, + ) + ), + ), + "child of single event id, attempt ${limitForParent + 1}", + ) + }, + ) + } + + @Test + fun `batched event test`() { + val limitForParent = 50 + val eventCounter = IParentEventCounter.create(limitForParent.toLong()) + + val parentEventId = ProviderEventId( + batchId = null, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ) + val batchId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID) + + repeat(limitForParent) { + assertTrue( + eventCounter.checkCountAndGet( + createEventEntity( + ProviderEventId( + batchId = batchId, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ), + parentEventId, + ) + ), + "single event id, attempt $it", + ) + } + + val nextEventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID) + assertAll( + { + assertFalse( + eventCounter.checkCountAndGet( + createEventEntity( + ProviderEventId( + batchId = batchId, + eventId = nextEventId, + ), + parentEventId, + ), + ), + "single event id, attempt ${limitForParent + 1}", + ) + }, + { + assertFalse( + eventCounter.checkCountAndGet( + createEventEntity( + ProviderEventId( + batchId = batchId, + eventId = StoredTestEventId(BOOK_ID, SCOPE, Instant.now(), NEXT_UUID), + ), + ProviderEventId( + batchId = batchId, + eventId = nextEventId + ) + ), + ), + "child of single event id, attempt ${limitForParent + 1}", + ) + }, + ) + } + + companion object { + private val BOOK_ID = BookId("test-book") + private const val SCOPE = "test-scope" + + private val NEXT_UUID: String + get() = UUID.randomUUID().toString() + + private fun createEventEntity( + id: ProviderEventId, + parentEventId: ProviderEventId? = null, + ) = BaseEventEntity( + type = "event", + id = id, + batchId = id.batchId, + isBatched = id.batchId != null, + eventName = "test-event", + eventType = "test-type", + startTimestamp = id.eventId.startTimestamp, + endTimestamp = null, + parentEventId = parentEventId, + successful = true, + ) + } +} \ No newline at end of file diff --git a/src/test/kotlin/handlers/events/TestEventPipeline.kt b/src/test/kotlin/handlers/events/TestEventPipeline.kt index 5b779d98..3a7b7575 100644 --- a/src/test/kotlin/handlers/events/TestEventPipeline.kt +++ b/src/test/kotlin/handlers/events/TestEventPipeline.kt @@ -45,6 +45,7 @@ import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions.assertArrayEquals import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInstance import java.time.Instant @@ -54,6 +55,7 @@ import kotlin.math.abs @TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Disabled("update required") class TestEventPipeline { companion object { private const val STORE_ACTION_REJECTION_THRESHOLD = 30_000L diff --git a/src/test/kotlin/handlers/events/TimestampGeneratorTest.kt b/src/test/kotlin/handlers/events/TimestampGeneratorTest.kt index 1b987f30..edd84137 100644 --- a/src/test/kotlin/handlers/events/TimestampGeneratorTest.kt +++ b/src/test/kotlin/handlers/events/TimestampGeneratorTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,6 +68,9 @@ class TimestampGeneratorTest { parameters["resumeFromId"] = listOf(resumeId.toString()) } + parameters["bookId"] = listOf(bookId.toString()) + parameters["scope"] = listOf(scope) + return SseEventSearchRequest(parameters, FilterPredicate(emptyList())) .copy(searchDirection = searchDirection) .also { diff --git a/src/test/kotlin/handlers/messages/ExtractorTest.kt b/src/test/kotlin/handlers/messages/ExtractorTest.kt index 1d5ea1a9..e9d8f245 100644 --- a/src/test/kotlin/handlers/messages/ExtractorTest.kt +++ b/src/test/kotlin/handlers/messages/ExtractorTest.kt @@ -125,6 +125,7 @@ class ExtractorTest { ) } every { msg.metadata } answers { null } + every { msg.serializedSize } answers { 1 } return msg } diff --git a/src/test/kotlin/handlers/messages/MergerTest.kt b/src/test/kotlin/handlers/messages/MergerTest.kt index 10bbdff0..89c4afc5 100644 --- a/src/test/kotlin/handlers/messages/MergerTest.kt +++ b/src/test/kotlin/handlers/messages/MergerTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -201,7 +201,7 @@ class MergerTest { getMessages(startTimestamp, 2, 3), limit = 4, streamInfo = listOf( - StreamInfo(streamNameObjects[0], StoredMessageId(BOOK, baseStreamName, Direction.FIRST, TIMESTAMP_EMPTY, -1)), + StreamInfo(streamNameObjects[0], StoredMessageId(BOOK, baseStreamName, Direction.FIRST, TIMESTAMP_EMPTY, 0)), StreamInfo(streamNameObjects[1], StoredMessageId(BOOK, baseStreamName, Direction.SECOND, TIMESTAMP, 3)), ) ), @@ -211,7 +211,7 @@ class MergerTest { getMessages(startTimestamp, 2, 4), limit = 4, streamInfo = listOf( - StreamInfo(streamNameObjects[0], StoredMessageId(BOOK, baseStreamName, Direction.FIRST, TIMESTAMP_EMPTY, -1)), + StreamInfo(streamNameObjects[0], StoredMessageId(BOOK, baseStreamName, Direction.FIRST, TIMESTAMP_EMPTY, 0)), StreamInfo(streamNameObjects[1], StoredMessageId(BOOK, baseStreamName, Direction.SECOND, TIMESTAMP, 3)), ) ), @@ -221,7 +221,7 @@ class MergerTest { getMessages(startTimestamp, 2, 4), limit = 2, streamInfo = listOf( - StreamInfo(streamNameObjects[0], StoredMessageId(BOOK, baseStreamName, Direction.FIRST, TIMESTAMP_EMPTY, -1)), + StreamInfo(streamNameObjects[0], StoredMessageId(BOOK, baseStreamName, Direction.FIRST, TIMESTAMP_EMPTY, 0)), StreamInfo(streamNameObjects[1], StoredMessageId(BOOK, baseStreamName, Direction.SECOND, TIMESTAMP, 1)), ) )