Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-5239] Reduced required memory for executing sse event request with limitForParent parameter #370

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Report data provider (5.13.1)
# Report data provider (5.13.2)

# Overview
This component serves as a backend for rpt-viewer. It will connect to the cassandra database via cradle api and expose the data stored in there as REST resources.
Expand Down Expand Up @@ -297,6 +297,9 @@ spec:

# Release notes

## 5.13.2
* Reduced required memory for executing sse event request with `limitForParent` parameter

## 5.13.1
* Fixed the problem data provider can't handle `messageIds` request with `messageId` but without `startTimestamp` arguments

Expand Down
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ application {


test {
// FIXME: the tests were temporary disabled since they're not compatible with new api

// useJUnitPlatform()
useJUnitPlatform()
}

dependencyCheck {
Expand Down
18 changes: 1 addition & 17 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
################################################################################
# Copyright 2009-2024 Exactpro (Exactpro Systems Limited)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

kotlin.code.style=official
release_version=5.13.1
release_version=5.13.2
docker_image_name=
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.rptdataprovider.handlers

import com.exactpro.th2.rptdataprovider.entities.responses.BaseEventEntity
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

internal interface IParentEventCounter {
/**
* This method use parent event id or event id to limit number of child events.
* WARNING: event id isn't grantee event unique then this method can't be used for strict limitation.
* @return false if limit exceeded otherwise true
*/
fun checkCountAndGet(event: BaseEventEntity): Boolean

private object NoLimitedParentEventCounter : IParentEventCounter {
override fun checkCountAndGet(event: BaseEventEntity): Boolean = true
}

private class LimitedParentEventCounter(
private val limitForParent: Long
) : IParentEventCounter {
private val parentEventCounter = ConcurrentHashMap<String, AtomicLong>()

override fun checkCountAndGet(event: BaseEventEntity): Boolean {
if (event.parentEventId == null) {
return true
}

val value = parentEventCounter.compute(event.parentEventId.eventId.id) { _, value ->
if (value == null) {
AtomicLong(1)
} else {
if (value === MAX_EVENT_COUNTER) {
parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER)
MAX_EVENT_COUNTER
} else {
if (value.incrementAndGet() > limitForParent) {
parentEventCounter.putIfAbsent(event.id.eventId.id, MAX_EVENT_COUNTER)
MAX_EVENT_COUNTER
} else {
value
}
}
}
}

return value !== MAX_EVENT_COUNTER
}
}

companion object {
private val MAX_EVENT_COUNTER = AtomicLong(Long.MAX_VALUE)

fun create(limitForParent: Long? = null): IParentEventCounter =
limitForParent?.let { LimitedParentEventCounter(it) } ?: NoLimitedParentEventCounter
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import com.exactpro.th2.rptdataprovider.minInstant
import com.exactpro.th2.rptdataprovider.producers.EventProducer
import com.exactpro.th2.rptdataprovider.services.cradle.CradleService
import com.exactpro.th2.rptdataprovider.tryToGetTestEvents
import io.github.oshai.kotlinlogging.KotlinLogging
import io.prometheus.client.Counter
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
Expand All @@ -61,11 +62,9 @@ import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import java.time.LocalTime
import java.time.ZoneOffset
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
Expand All @@ -85,34 +84,6 @@ class SearchEventsHandler(context: Context<*, *, *, *>) {
private val eventSearchChunkSize: Int = context.configuration.eventSearchChunkSize.value.toInt()
private val keepAliveTimeout: Long = context.configuration.keepAliveTimeout.value.toLong()


private data class ParentEventCounter private constructor(
private val parentEventCounter: ConcurrentHashMap<String, AtomicLong>?,
val limitForParent: Long?
) {

constructor(limitForParent: Long?) : this(
parentEventCounter = limitForParent?.let { ConcurrentHashMap<String, AtomicLong>() },
limitForParent = limitForParent
)

fun checkCountAndGet(event: BaseEventEntity): BaseEventEntity? {
if (limitForParent == null || event.parentEventId == null)
return event

return parentEventCounter!!.getOrPut(event.parentEventId.toString(), { AtomicLong(1) }).let { parentCount ->
if (parentCount.get() <= limitForParent) {
parentCount.incrementAndGet()
event
} else {
parentEventCounter.putIfAbsent(event.id.toString(), AtomicLong(Long.MAX_VALUE))
null
}
}
}
}


private suspend fun keepAlive(
writer: StreamWriter<*, *>,
lastScannedObjectInfo: LastScannedObjectInfo,
Expand Down Expand Up @@ -323,7 +294,7 @@ class SearchEventsHandler(context: Context<*, *, *, *>) {
requireNotNull(resumeTimestamp) { "timestamp for $resumeProviderId cannot be extracted" }
}
val timeIntervals = getTimeIntervals(request, sseEventSearchStep, startTimestamp)
val parentEventCounter = ParentEventCounter(request.limitForParent)
val parentEventCounter = IParentEventCounter.create(request.limitForParent)

flow {
for ((start, end) in timeIntervals) {
Expand Down Expand Up @@ -359,14 +330,7 @@ class SearchEventsHandler(context: Context<*, *, *, *>) {
lastScannedObject.update(event, scanCnt)
processedEventCount.inc()
}
.filter { request.filterPredicate.apply(it) }
.let {
if (parentEventCounter.limitForParent != null) {
it.filter { event -> parentEventCounter.checkCountAndGet(event) != null }
} else {
it
}
}
.filter { request.filterPredicate.apply(it) && parentEventCounter.checkCountAndGet(it) }
.let { fl -> request.resultCountLimit?.let { fl.take(it) } ?: fl }
.onStart {
launch {
Expand Down
Loading
Loading