From a621ce008c0372f08d63dc66a3f06cdb576a2f87 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Fri, 4 Oct 2024 19:46:46 +0400 Subject: [PATCH 1/7] [TH2-1928] Support `keepOpen` option for `searchMessageGroups` gRPC request --- README.md | 6 +- app/build.gradle.kts | 2 + app/gradle.properties | 2 +- .../entities/requests/MessagesGroupRequest.kt | 4 +- .../grpc/GrpcDataProviderImplTest.kt | 292 ++++++++++++++++++ 5 files changed, 302 insertions(+), 4 deletions(-) create mode 100644 app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImplTest.kt diff --git a/README.md b/README.md index c358a4fe..9cc22a03 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Lightweight data provider (2.11.0) +# Lightweight data provider (2.11.1) # Overview This component serves as a data provider for [th2-data-services](https://github.com/th2-net/th2-data-services). It will connect to the cassandra database via [cradle api](https://github.com/th2-net/cradleapi) and expose the data stored in there as REST resources. @@ -224,6 +224,10 @@ spec: # Release notes: +## 2.11.1 + ++ Support `keepOpen` option for `searchMessageGroups` gRPC request + ## 2.11.0 + Updated: diff --git a/app/build.gradle.kts b/app/build.gradle.kts index aab5d500..fdf70b40 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -63,6 +63,8 @@ dependencies { testImplementation("org.testcontainers:cassandra") testImplementation("com.datastax.oss:java-driver-core") + testImplementation("io.grpc:grpc-testing") + testImplementation("io.grpc:grpc-inprocess") } application { diff --git a/app/gradle.properties b/app/gradle.properties index 28a27745..b4253d88 100644 --- a/app/gradle.properties +++ b/app/gradle.properties @@ -1,4 +1,4 @@ kotlin.code.style=official -release_version=2.11.0 +release_version=2.11.1 description='th2 Lightweight data provider component' kapt.include.compile.classpath=false \ No newline at end of file diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt index 2ed537d1..8f607284 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,7 +60,7 @@ data class MessagesGroupRequest( }, if (hasStartTimestamp()) startTimestamp.toInstant() else error("missing start timestamp"), if (hasEndTimestamp()) endTimestamp.toInstant() else error("missing end timestamp"), - false, // FIXME: update gRPC + request.keepOpen, if (hasBookId()) bookId.toCradle() else error("parameter '$BOOK_ID_PARAM' is required"), request.responseFormatsList.takeIf { it.isNotEmpty() } ?.mapTo(hashSetOf(), ResponseFormat.Companion::fromString) diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImplTest.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImplTest.kt new file mode 100644 index 00000000..b2cf84e1 --- /dev/null +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImplTest.kt @@ -0,0 +1,292 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.lwdataprovider.grpc + +import com.exactpro.cradle.CradleManager +import com.exactpro.cradle.CradleStorage +import com.exactpro.cradle.messages.StoredMessage +import com.exactpro.th2.common.grpc.MessageGroupBatch +import com.exactpro.th2.common.message.toTimestamp +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderGrpc +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest +import com.exactpro.th2.lwdataprovider.Decoder +import com.exactpro.th2.lwdataprovider.RequestedMessageDetails +import com.exactpro.th2.lwdataprovider.configuration.Configuration +import com.exactpro.th2.lwdataprovider.configuration.CustomConfigurationClass +import com.exactpro.th2.lwdataprovider.db.CradleMessageExtractor +import com.exactpro.th2.lwdataprovider.db.DataMeasurement +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat +import com.exactpro.th2.lwdataprovider.handlers.GeneralCradleHandler +import com.exactpro.th2.lwdataprovider.handlers.MessageResponseHandler +import com.exactpro.th2.lwdataprovider.handlers.SearchEventsHandler +import com.exactpro.th2.lwdataprovider.handlers.SearchMessagesHandler +import com.exactpro.th2.lwdataprovider.util.DummyDataMeasurement +import com.exactpro.th2.lwdataprovider.util.ImmutableListCradleResult +import com.exactpro.th2.lwdataprovider.util.createBatches +import io.github.oshai.kotlinlogging.KotlinLogging +import io.grpc.BindableService +import io.grpc.ManagedChannel +import io.grpc.Server +import io.grpc.inprocess.InProcessChannelBuilder +import io.grpc.inprocess.InProcessServerBuilder +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import org.mockito.kotlin.any +import org.mockito.kotlin.argThat +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import org.mockito.kotlin.spy +import org.mockito.kotlin.whenever +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.* +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.Executor +import java.util.concurrent.TimeUnit + +// FIXME: refactor and extended +class GrpcDataProviderImplTest { + private val executor: Executor = Executor { it.run() } + private val storage = mock() + private val manager = mock { + on { storage } doReturn storage + } + private val searchEventsHandler: SearchEventsHandler = mock { } + private val generalCradleHandler: GeneralCradleHandler = mock { } + private val measurement: DataMeasurement = mock { + on { start(any()) } doReturn mock { } + } + private val decoder = spy(TestDecoder()) + private val searchHandler = createSearchMessagesHandler(decoder, false) + private val configuration = Configuration(CustomConfigurationClass()) + + @ParameterizedTest + @ValueSource(booleans = [true, false]) + fun `stops pulling if data out of range exist`(offsetNewData: Boolean) { + val startTimestamp = Instant.now() + val firstEndTimestamp = startTimestamp.plus(10L, ChronoUnit.MINUTES) + val endTimestamp = firstEndTimestamp.plus(10L, ChronoUnit.MINUTES) + val aliasesCount = 5 + val increase = 5L + val firstBatchMessagesCount = (firstEndTimestamp.epochSecond - startTimestamp.epochSecond) / increase + val firstMessagesPerAlias = firstBatchMessagesCount / aliasesCount + + val lastBatchMessagesCount = (endTimestamp.epochSecond - firstEndTimestamp.epochSecond) / increase + val lastMessagesPerAlias = lastBatchMessagesCount / aliasesCount + + val firstBatches = createBatches( + firstMessagesPerAlias, + aliasesCount, + overlapCount = 0, + increase, + startTimestamp, + firstEndTimestamp, + ) + val lastBatches = createBatches( + lastMessagesPerAlias, + aliasesCount, + overlapCount = 0, + increase, + firstEndTimestamp, + endTimestamp, + aliasIndexOffset = if (offsetNewData) aliasesCount else 0 + ) + val outsideBatches = createBatches( + 10, + 1, + 0, + increase, + endTimestamp.plusNanos(1), + endTimestamp.plus(5, ChronoUnit.MINUTES), + ) + val group = "test" + val firstRequestMessagesCount = firstBatches.sumOf { it.messageCount } + val secondRequestMessagesCount = lastBatches.sumOf { it.messageCount } + val messagesCount = firstRequestMessagesCount + secondRequestMessagesCount + + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && from.value == startTimestamp && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(firstBatches)) + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && from.value == firstBatches.maxOf { it.lastTimestamp } && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(lastBatches)) + whenever(storage.getGroupedMessageBatches(argThat { + limit == 1 && groupName == group + })).thenReturn(ImmutableListCradleResult(outsideBatches)) + + val request = MessageGroupsSearchRequest.newBuilder().apply { + addMessageGroupBuilder().setName("test") + addResponseFormats(ResponseFormat.BASE_64.name) + bookIdBuilder.setName("test") + this.startTimestamp = startTimestamp.toTimestamp() + this.endTimestamp = endTimestamp.toTimestamp() + this.keepOpen = true + }.build() +// MessagesGroupRequest( +// groups = setOf("test"), +// startTimestamp, +// endTimestamp, +// keepOpen = true, +// BookId("test"), +// responseFormats = setOf(ResponseFormat.BASE_64), +// ) + val grpcDataProvider = createGrpcDataProvider() + GrpcTestHolder(grpcDataProvider).use { (stub) -> + val responses = stub.searchMessageGroups(request).asSequence().toList() + + assertEquals(messagesCount + 1, responses.size) { + val missing: List = + (firstBatches.asSequence() + lastBatches.asSequence()).flatMap { it.messages }.filter { stored -> + responses.none { + val messageId = it.message.messageId + messageId.connectionId.sessionAlias == stored.sessionAlias + && messageId.sequence == stored.sequence + && messageId.direction.toCradleDirection() == stored.direction + } + }.toList() + "Missing ${missing.size} message(s): $missing" + } + +// val captor = argumentCaptor() +// verify(handler, atMost(messagesCount)).handleNext(captor.capture()) +// verify(handler, never()).writeErrorMessage(any(), any(), any()) +// verify(handler, never()).writeErrorMessage(any(), any(), any()) +// val messages: List = captor.allValues +// assertEquals(messagesCount, messages.size) { +// val missing: List = +// (firstBatches.asSequence() + lastBatches.asSequence()).flatMap { it.messages }.filter { stored -> +// messages.none { +// val raw = it.storedMessage +// raw.sessionAlias == stored.sessionAlias && raw.sequence == stored.sequence && raw.direction == stored.direction +// } +// }.toList() +// "Missing ${missing.size} message(s): $missing" +// } + +// validateMessagesOrder(messages, messagesCount) + } + } + + private open class TestDecoder( + capacity: Int = 10 + ) : Decoder { + val protoQueue: Queue = ArrayBlockingQueue(capacity) + val transportQueue: Queue = ArrayBlockingQueue(capacity) + override fun sendBatchMessage( + batchBuilder: MessageGroupBatch.Builder, + requests: Collection, + session: String + ) { + protoQueue.addAll(requests) + } + + override fun sendBatchMessage( + batchBuilder: GroupBatch.Builder, + requests: Collection, + session: String + ) { + transportQueue.addAll(requests) + } + + //FIXME: implement for transport + } + + private fun createSearchMessagesHandler( + decoder: Decoder, + useTransportMode: Boolean + ) = SearchMessagesHandler( + CradleMessageExtractor(manager, DummyDataMeasurement, false), + decoder, + executor, + Configuration( + CustomConfigurationClass( + bufferPerQuery = 4, + useTransportMode = useTransportMode, + batchSizeBytes = 300, + ) + ) + ) + + private fun createGrpcDataProvider() = GrpcDataProviderImpl( + configuration, + searchHandler, + searchEventsHandler, + generalCradleHandler, + measurement + ) + + private open class MessageResponseHandlerTestImpl( + measurement: DataMeasurement, + maxQueue: Int = 0, + ) : MessageResponseHandler(measurement, maxQueue) { + override fun handleNextInternal(data: RequestedMessageDetails) { + } + + override fun complete() { + } + + override fun writeErrorMessage(text: String, id: String?, batchId: String?) { + } + + override fun writeErrorMessage(error: Throwable, id: String?, batchId: String?) { + } + } + + private class GrpcTestHolder( + service: BindableService + ) : AutoCloseable { + private val inProcessServer: Server = InProcessServerBuilder + .forName(SERVER_NAME) + .addService(service) + .directExecutor() + .build() + .also(Server::start) + + private val inProcessChannel: ManagedChannel = InProcessChannelBuilder + .forName(SERVER_NAME) + .directExecutor() + .build() + + val stub: DataProviderGrpc.DataProviderBlockingStub = DataProviderGrpc.newBlockingStub(inProcessChannel) + + operator fun component1(): DataProviderGrpc.DataProviderBlockingStub = stub + + override fun close() { + LOGGER.info { "Shutdown process channel" } + inProcessChannel.shutdown() + if (!inProcessChannel.awaitTermination(1, TimeUnit.MINUTES)) { + LOGGER.warn { "Process channel couldn't stop during 1 min" } + inProcessChannel.shutdownNow() + LOGGER.warn { "Process channel shutdown now, is terminated: ${inProcessChannel.isTerminated}" } + } + LOGGER.info { "Shutdown process server" } + inProcessServer.shutdown() + if (!inProcessServer.awaitTermination(1, TimeUnit.MINUTES)) { + LOGGER.warn { "Process server couldn't stop during 1 min" } + inProcessServer.shutdownNow() + LOGGER.warn { "Process server shutdown now, is terminated: ${inProcessChannel.isTerminated}" } + } + } + } + + companion object { + private val LOGGER = KotlinLogging.logger { } + + private const val SERVER_NAME = "server" + } +} \ No newline at end of file From b8021f0293ccfa6bfd2e3d7585b4f795f71bff43 Mon Sep 17 00:00:00 2001 From: Denis Date: Mon, 7 Oct 2024 12:06:53 +0300 Subject: [PATCH 2/7] bump th2 plugin version --- README.md | 1 + gradle/libs.versions.toml | 2 +- grpc/README.md | 5 ++++- grpc/gradle.properties | 2 +- grpc/package_info.json | 2 +- utils/README.md | 5 ++++- utils/gradle.properties | 2 +- 7 files changed, 13 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 9cc22a03..17e6f371 100644 --- a/README.md +++ b/README.md @@ -227,6 +227,7 @@ spec: ## 2.11.1 + Support `keepOpen` option for `searchMessageGroups` gRPC request ++ th2 gradle plugin `0.1.3` ## 2.11.0 diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index efa0fce9..5a6af1ae 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,6 +1,6 @@ [versions] kotlin = "1.8.22" -th2-plugin = "0.1.1" +th2-plugin = "0.1.3" strikt = "0.34.1" javalin = "5.6.5" openapi = "5.6.4" diff --git a/grpc/README.md b/grpc/README.md index 40dc2024..ce6fcb04 100644 --- a/grpc/README.md +++ b/grpc/README.md @@ -1,9 +1,12 @@ -# gRPC for lw-data-provider (2.3.3) +# gRPC for lw-data-provider (2.3.4) ## Release notes: ### 2.3.3 +#### Updates: ++ th2 gradle plugin `0.1.3` + #### Updates: + th2 gradle plugin `0.0.8` diff --git a/grpc/gradle.properties b/grpc/gradle.properties index 8d10f752..19dab062 100644 --- a/grpc/gradle.properties +++ b/grpc/gradle.properties @@ -1,3 +1,3 @@ kotlin.code.style=official -release_version=2.3.3 +release_version=2.3.4 description='th2 Lightweight data provider gRPC' \ No newline at end of file diff --git a/grpc/package_info.json b/grpc/package_info.json index bb3dfeea..2684b138 100644 --- a/grpc/package_info.json +++ b/grpc/package_info.json @@ -1,4 +1,4 @@ { "package_name": "th2_grpc_lw_data_provider", - "package_version": "2.3.3" + "package_version": "2.3.4" } diff --git a/utils/README.md b/utils/README.md index cb529294..71f8c84a 100644 --- a/utils/README.md +++ b/utils/README.md @@ -1,9 +1,12 @@ -# utils for lw-data-provider (0.0.3) +# utils for lw-data-provider (0.0.4) # Release notes: ## 0.0.3 +### Updates: ++ th2 gradle plugin `0.1.3` + ### Updates: + th2 gradle plugin `0.0.8` + common: `5.12.0-dev` diff --git a/utils/gradle.properties b/utils/gradle.properties index 09930ec6..407c8215 100644 --- a/utils/gradle.properties +++ b/utils/gradle.properties @@ -1,3 +1,3 @@ kotlin.code.style=official -release_version=0.0.3 +release_version=0.0.4 description='th2 Lightweight data provider utils' \ No newline at end of file From 52176ecfe47bc99224f08b59fd500d923cf1b935 Mon Sep 17 00:00:00 2001 From: "nikita.smirnov" Date: Tue, 15 Oct 2024 10:01:59 +0400 Subject: [PATCH 3/7] [th2-1928] corrected readme --- grpc/README.md | 4 +++- utils/README.md | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/grpc/README.md b/grpc/README.md index ce6fcb04..7fa1b72c 100644 --- a/grpc/README.md +++ b/grpc/README.md @@ -2,11 +2,13 @@ ## Release notes: -### 2.3.3 +### 2.3.4 #### Updates: + th2 gradle plugin `0.1.3` +### 2.3.3 + #### Updates: + th2 gradle plugin `0.0.8` diff --git a/utils/README.md b/utils/README.md index 71f8c84a..d861101a 100644 --- a/utils/README.md +++ b/utils/README.md @@ -2,11 +2,13 @@ # Release notes: -## 0.0.3 +## 0.0.4 ### Updates: + th2 gradle plugin `0.1.3` +## 0.0.3 + ### Updates: + th2 gradle plugin `0.0.8` + common: `5.12.0-dev` From 274a8be56e7ba690a0dd67965b2496978bfe7cf7 Mon Sep 17 00:00:00 2001 From: Denis Date: Mon, 7 Oct 2024 12:06:53 +0300 Subject: [PATCH 4/7] Make sure that Backpreassure provider is closed on RST_STREAM --- .../grpc/GrpcDataProviderBackPressure.kt | 101 ++++-- .../grpc/GrpcDataProviderImpl.kt | 2 +- .../th2/lwdataprovider/grpc/GRPCBaseTests.kt | 114 ++++++ .../grpc/GrpcDataProviderBackPressureTest.kt | 42 +++ .../grpc/GrpcDataProviderImplTest.kt | 328 ++---------------- .../lwdataprovider/grpc/GrpcImplTestBase.kt | 147 ++++++++ .../lwdataprovider/util/VerificationUtil.kt | 23 ++ grpc/README.md | 3 + utils/README.md | 3 + 9 files changed, 436 insertions(+), 327 deletions(-) create mode 100644 app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GRPCBaseTests.kt create mode 100644 app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressureTest.kt create mode 100644 app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcImplTestBase.kt diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt index 115ffb02..b58a8e09 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt @@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue import java.util.concurrent.Future import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock @@ -42,9 +43,9 @@ class GrpcDataProviderBackPressure( dataMeasurement: DataMeasurement, private val scheduler: ScheduledExecutorService, ) : GrpcDataProviderImpl(configuration, searchMessagesHandler, searchEventsHandler, generalCradleHandler, dataMeasurement) { - companion object { private val logger = KotlinLogging.logger { } + private const val EVENT_POLLING_TIMEOUT = 100L } override fun processResponse( @@ -57,50 +58,90 @@ class GrpcDataProviderBackPressure( val servCallObs = responseObserver as ServerCallStreamObserver val lock = ReentrantLock() var future: Future<*>? = null + val isCancelled = AtomicBoolean(false) fun cleanBuffer() { while (buffer.poll() != null) { buffer.clear() } } + fun cancel() { - handler.cancel() - onClose(handler) - cleanBuffer() - onFinished() + if (isCancelled.compareAndSet(false, true)) { + handler.cancel() + onClose(handler) + cleanBuffer() + onFinished() + logger.info { "Stream cancelled and cleaned up" } + } } + + servCallObs.setOnCancelHandler { + logger.warn { "Execution cancelled" } + lock.withLock { + future?.cancel(true) + future = null + } + cancel() + } + servCallObs.setOnReadyHandler { - if (!handler.isAlive) + if (!handler.isAlive || isCancelled.get()) { + logger.debug { "Handler no longer alive or already cancelled, skipping processing" } return@setOnReadyHandler + } + lock.withLock { future?.cancel(false) future = null } + var inProcess = true - while (servCallObs.isReady && inProcess) { + while (servCallObs.isReady && inProcess && !isCancelled.get()) { if (servCallObs.isCancelled) { logger.warn { "Request is canceled during processing" } - handler.cancel() + cancel() return@setOnReadyHandler } - val event = buffer.take() - if (event.close) { - servCallObs.onCompleted() - inProcess = false - onFinished() - onClose(handler) - logger.info { "Executing finished successfully" } - } else if (event.error != null) { - servCallObs.onError(event.error) - inProcess = false - onFinished() - handler.complete() - logger.warn(event.error) { "Executing finished with error" } - } else { - converter.invoke(event)?.let { servCallObs.onNext(it) } + + try { + // We need to poll because if we will use take and keepOpen option it is possible that we will have to wait here indefinitely + val event = buffer.poll(EVENT_POLLING_TIMEOUT, TimeUnit.MILLISECONDS) ?: continue + when { + event.close -> { + servCallObs.onCompleted() + inProcess = false + onFinished() + onClose(handler) + logger.info { "Executing finished successfully" } + } + event.error != null -> { + servCallObs.onError(event.error) + inProcess = false + onFinished() + handler.complete() + logger.warn(event.error) { "Executing finished with error" } + } + else -> { + converter.invoke(event)?.let { servCallObs.onNext(it) } + } + } + } catch (e: InterruptedException) { + logger.warn(e) { "Processing interrupted" } + cancel() + return@setOnReadyHandler + } catch (e: Exception) { + logger.error(e) { "Error processing event" } + servCallObs.onError(Status.INTERNAL + .withDescription("Internal error during processing") + .withCause(e) + .asRuntimeException()) + cancel() + return@setOnReadyHandler } } - if (inProcess) { + + if (inProcess && !isCancelled.get()) { lock.withLock { future = scheduler.schedule({ runCatching { @@ -118,20 +159,10 @@ class GrpcDataProviderBackPressure( }, configuration.grpcBackPressureReadinessTimeoutMls, TimeUnit.MILLISECONDS) } } + if (!servCallObs.isReady) { logger.trace { "Suspending processing because the opposite side is not ready to receive more messages. In queue: ${buffer.size}" } } } - - servCallObs.setOnCancelHandler { - logger.warn{ "Execution cancelled" } - lock.withLock { - future?.cancel(true) - future = null - } - cancel() - } - - } } \ No newline at end of file diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt index 3c93bf8a..83c28f79 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt @@ -244,7 +244,7 @@ open class GrpcDataProviderImpl( } } - protected open fun processResponse( + open fun processResponse( responseObserver: StreamObserver, buffer: BlockingQueue, handler: CancelableResponseHandler, diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GRPCBaseTests.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GRPCBaseTests.kt new file mode 100644 index 00000000..662e0c0a --- /dev/null +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GRPCBaseTests.kt @@ -0,0 +1,114 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.lwdataprovider.grpc + +import com.exactpro.cradle.messages.StoredMessage +import com.exactpro.th2.common.message.toTimestamp +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat +import com.exactpro.th2.lwdataprovider.util.ImmutableListCradleResult +import com.exactpro.th2.lwdataprovider.util.createBatches +import com.exactpro.th2.lwdataprovider.util.validateMessagesOrderGrpc +import org.junit.jupiter.api.Assertions.assertEquals +import org.mockito.kotlin.argThat +import org.mockito.kotlin.whenever +import java.time.Instant +import java.time.temporal.ChronoUnit + +abstract class GRPCBaseTests : GrpcImplTestBase() { + + protected fun stopsPullingDataWhenOutOfRangeExists(offsetNewData: Boolean) { + val startTimestamp = Instant.now() + val firstEndTimestamp = startTimestamp.plus(10L, ChronoUnit.MINUTES) + val endTimestamp = firstEndTimestamp.plus(10L, ChronoUnit.MINUTES) + val aliasesCount = 5 + val increase = 5L + val firstBatchMessagesCount = (firstEndTimestamp.epochSecond - startTimestamp.epochSecond) / increase + val firstMessagesPerAlias = firstBatchMessagesCount / aliasesCount + + val lastBatchMessagesCount = (endTimestamp.epochSecond - firstEndTimestamp.epochSecond) / increase + val lastMessagesPerAlias = lastBatchMessagesCount / aliasesCount + + val firstBatches = createBatches( + firstMessagesPerAlias, + aliasesCount, + overlapCount = 0, + increase, + startTimestamp, + firstEndTimestamp, + ) + val lastBatches = createBatches( + lastMessagesPerAlias, + aliasesCount, + overlapCount = 0, + increase, + firstEndTimestamp, + endTimestamp, + aliasIndexOffset = if (offsetNewData) aliasesCount else 0 + ) + val outsideBatches = createBatches( + 10, + 1, + 0, + increase, + endTimestamp.plusNanos(1), + endTimestamp.plus(5, ChronoUnit.MINUTES), + ) + val group = "test" + val firstRequestMessagesCount = firstBatches.sumOf { it.messageCount } + val secondRequestMessagesCount = lastBatches.sumOf { it.messageCount } + val messagesCount = firstRequestMessagesCount + secondRequestMessagesCount + + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && from.value == startTimestamp && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(firstBatches)) + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && from.value == firstBatches.maxOf { it.lastTimestamp } && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(lastBatches)) + whenever(storage.getGroupedMessageBatches(argThat { + limit == 1 && groupName == group + })).thenReturn(ImmutableListCradleResult(outsideBatches)) + + val request = MessageGroupsSearchRequest.newBuilder().apply { + addMessageGroupBuilder().setName("test") + addResponseFormats(ResponseFormat.BASE_64.name) + bookIdBuilder.setName("test") + this.startTimestamp = startTimestamp.toTimestamp() + this.endTimestamp = endTimestamp.toTimestamp() + this.keepOpen = true + }.build() + + val grpcDataProvider = createGrpcDataProvider() + GrpcTestHolder(grpcDataProvider).use { (stub) -> + val responses = stub.searchMessageGroups(request).asSequence().toList() + + assertEquals(messagesCount + 1, responses.size) { + val missing: List = + (firstBatches.asSequence() + lastBatches.asSequence()).flatMap { it.messages }.filter { stored -> + responses.none { + val messageId = it.message.messageId + messageId.connectionId.sessionAlias == stored.sessionAlias + && messageId.sequence == stored.sequence + && messageId.direction.toCradleDirection() == stored.direction + } + }.toList() + "Missing ${missing.size} message(s): $missing" + } + + validateMessagesOrderGrpc(responses, messagesCount) + } + } +} \ No newline at end of file diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressureTest.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressureTest.kt new file mode 100644 index 00000000..3a739a2a --- /dev/null +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressureTest.kt @@ -0,0 +1,42 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.grpc + +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderGrpc +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import java.util.concurrent.Executors + +class GrpcDataProviderBackPressureTest : GRPCBaseTests() { + + val backPressureExecutor = Executors.newSingleThreadScheduledExecutor() + + @ParameterizedTest + @ValueSource(booleans = [true, false]) + fun `stops pulling if data out of range exist`(offsetNewData: Boolean) { + this.stopsPullingDataWhenOutOfRangeExists(offsetNewData) + } + + override fun createGrpcDataProvider(): DataProviderGrpc.DataProviderImplBase = GrpcDataProviderBackPressure( + configuration, + searchHandler, + searchEventsHandler, + generalCradleHandler, + measurement, + backPressureExecutor + ) +} \ No newline at end of file diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImplTest.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImplTest.kt index b2cf84e1..ec190b46 100644 --- a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImplTest.kt +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImplTest.kt @@ -1,292 +1,38 @@ -/* - * Copyright 2024 Exactpro (Exactpro Systems Limited) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.exactpro.th2.lwdataprovider.grpc - -import com.exactpro.cradle.CradleManager -import com.exactpro.cradle.CradleStorage -import com.exactpro.cradle.messages.StoredMessage -import com.exactpro.th2.common.grpc.MessageGroupBatch -import com.exactpro.th2.common.message.toTimestamp -import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch -import com.exactpro.th2.dataprovider.lw.grpc.DataProviderGrpc -import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest -import com.exactpro.th2.lwdataprovider.Decoder -import com.exactpro.th2.lwdataprovider.RequestedMessageDetails -import com.exactpro.th2.lwdataprovider.configuration.Configuration -import com.exactpro.th2.lwdataprovider.configuration.CustomConfigurationClass -import com.exactpro.th2.lwdataprovider.db.CradleMessageExtractor -import com.exactpro.th2.lwdataprovider.db.DataMeasurement -import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat -import com.exactpro.th2.lwdataprovider.handlers.GeneralCradleHandler -import com.exactpro.th2.lwdataprovider.handlers.MessageResponseHandler -import com.exactpro.th2.lwdataprovider.handlers.SearchEventsHandler -import com.exactpro.th2.lwdataprovider.handlers.SearchMessagesHandler -import com.exactpro.th2.lwdataprovider.util.DummyDataMeasurement -import com.exactpro.th2.lwdataprovider.util.ImmutableListCradleResult -import com.exactpro.th2.lwdataprovider.util.createBatches -import io.github.oshai.kotlinlogging.KotlinLogging -import io.grpc.BindableService -import io.grpc.ManagedChannel -import io.grpc.Server -import io.grpc.inprocess.InProcessChannelBuilder -import io.grpc.inprocess.InProcessServerBuilder -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource -import org.mockito.kotlin.any -import org.mockito.kotlin.argThat -import org.mockito.kotlin.doReturn -import org.mockito.kotlin.mock -import org.mockito.kotlin.spy -import org.mockito.kotlin.whenever -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.* -import java.util.concurrent.ArrayBlockingQueue -import java.util.concurrent.Executor -import java.util.concurrent.TimeUnit - -// FIXME: refactor and extended -class GrpcDataProviderImplTest { - private val executor: Executor = Executor { it.run() } - private val storage = mock() - private val manager = mock { - on { storage } doReturn storage - } - private val searchEventsHandler: SearchEventsHandler = mock { } - private val generalCradleHandler: GeneralCradleHandler = mock { } - private val measurement: DataMeasurement = mock { - on { start(any()) } doReturn mock { } - } - private val decoder = spy(TestDecoder()) - private val searchHandler = createSearchMessagesHandler(decoder, false) - private val configuration = Configuration(CustomConfigurationClass()) - - @ParameterizedTest - @ValueSource(booleans = [true, false]) - fun `stops pulling if data out of range exist`(offsetNewData: Boolean) { - val startTimestamp = Instant.now() - val firstEndTimestamp = startTimestamp.plus(10L, ChronoUnit.MINUTES) - val endTimestamp = firstEndTimestamp.plus(10L, ChronoUnit.MINUTES) - val aliasesCount = 5 - val increase = 5L - val firstBatchMessagesCount = (firstEndTimestamp.epochSecond - startTimestamp.epochSecond) / increase - val firstMessagesPerAlias = firstBatchMessagesCount / aliasesCount - - val lastBatchMessagesCount = (endTimestamp.epochSecond - firstEndTimestamp.epochSecond) / increase - val lastMessagesPerAlias = lastBatchMessagesCount / aliasesCount - - val firstBatches = createBatches( - firstMessagesPerAlias, - aliasesCount, - overlapCount = 0, - increase, - startTimestamp, - firstEndTimestamp, - ) - val lastBatches = createBatches( - lastMessagesPerAlias, - aliasesCount, - overlapCount = 0, - increase, - firstEndTimestamp, - endTimestamp, - aliasIndexOffset = if (offsetNewData) aliasesCount else 0 - ) - val outsideBatches = createBatches( - 10, - 1, - 0, - increase, - endTimestamp.plusNanos(1), - endTimestamp.plus(5, ChronoUnit.MINUTES), - ) - val group = "test" - val firstRequestMessagesCount = firstBatches.sumOf { it.messageCount } - val secondRequestMessagesCount = lastBatches.sumOf { it.messageCount } - val messagesCount = firstRequestMessagesCount + secondRequestMessagesCount - - whenever(storage.getGroupedMessageBatches(argThat { - groupName == group && from.value == startTimestamp && to.value == endTimestamp - })).thenReturn(ImmutableListCradleResult(firstBatches)) - whenever(storage.getGroupedMessageBatches(argThat { - groupName == group && from.value == firstBatches.maxOf { it.lastTimestamp } && to.value == endTimestamp - })).thenReturn(ImmutableListCradleResult(lastBatches)) - whenever(storage.getGroupedMessageBatches(argThat { - limit == 1 && groupName == group - })).thenReturn(ImmutableListCradleResult(outsideBatches)) - - val request = MessageGroupsSearchRequest.newBuilder().apply { - addMessageGroupBuilder().setName("test") - addResponseFormats(ResponseFormat.BASE_64.name) - bookIdBuilder.setName("test") - this.startTimestamp = startTimestamp.toTimestamp() - this.endTimestamp = endTimestamp.toTimestamp() - this.keepOpen = true - }.build() -// MessagesGroupRequest( -// groups = setOf("test"), -// startTimestamp, -// endTimestamp, -// keepOpen = true, -// BookId("test"), -// responseFormats = setOf(ResponseFormat.BASE_64), -// ) - val grpcDataProvider = createGrpcDataProvider() - GrpcTestHolder(grpcDataProvider).use { (stub) -> - val responses = stub.searchMessageGroups(request).asSequence().toList() - - assertEquals(messagesCount + 1, responses.size) { - val missing: List = - (firstBatches.asSequence() + lastBatches.asSequence()).flatMap { it.messages }.filter { stored -> - responses.none { - val messageId = it.message.messageId - messageId.connectionId.sessionAlias == stored.sessionAlias - && messageId.sequence == stored.sequence - && messageId.direction.toCradleDirection() == stored.direction - } - }.toList() - "Missing ${missing.size} message(s): $missing" - } - -// val captor = argumentCaptor() -// verify(handler, atMost(messagesCount)).handleNext(captor.capture()) -// verify(handler, never()).writeErrorMessage(any(), any(), any()) -// verify(handler, never()).writeErrorMessage(any(), any(), any()) -// val messages: List = captor.allValues -// assertEquals(messagesCount, messages.size) { -// val missing: List = -// (firstBatches.asSequence() + lastBatches.asSequence()).flatMap { it.messages }.filter { stored -> -// messages.none { -// val raw = it.storedMessage -// raw.sessionAlias == stored.sessionAlias && raw.sequence == stored.sequence && raw.direction == stored.direction -// } -// }.toList() -// "Missing ${missing.size} message(s): $missing" -// } - -// validateMessagesOrder(messages, messagesCount) - } - } - - private open class TestDecoder( - capacity: Int = 10 - ) : Decoder { - val protoQueue: Queue = ArrayBlockingQueue(capacity) - val transportQueue: Queue = ArrayBlockingQueue(capacity) - override fun sendBatchMessage( - batchBuilder: MessageGroupBatch.Builder, - requests: Collection, - session: String - ) { - protoQueue.addAll(requests) - } - - override fun sendBatchMessage( - batchBuilder: GroupBatch.Builder, - requests: Collection, - session: String - ) { - transportQueue.addAll(requests) - } - - //FIXME: implement for transport - } - - private fun createSearchMessagesHandler( - decoder: Decoder, - useTransportMode: Boolean - ) = SearchMessagesHandler( - CradleMessageExtractor(manager, DummyDataMeasurement, false), - decoder, - executor, - Configuration( - CustomConfigurationClass( - bufferPerQuery = 4, - useTransportMode = useTransportMode, - batchSizeBytes = 300, - ) - ) - ) - - private fun createGrpcDataProvider() = GrpcDataProviderImpl( - configuration, - searchHandler, - searchEventsHandler, - generalCradleHandler, - measurement - ) - - private open class MessageResponseHandlerTestImpl( - measurement: DataMeasurement, - maxQueue: Int = 0, - ) : MessageResponseHandler(measurement, maxQueue) { - override fun handleNextInternal(data: RequestedMessageDetails) { - } - - override fun complete() { - } - - override fun writeErrorMessage(text: String, id: String?, batchId: String?) { - } - - override fun writeErrorMessage(error: Throwable, id: String?, batchId: String?) { - } - } - - private class GrpcTestHolder( - service: BindableService - ) : AutoCloseable { - private val inProcessServer: Server = InProcessServerBuilder - .forName(SERVER_NAME) - .addService(service) - .directExecutor() - .build() - .also(Server::start) - - private val inProcessChannel: ManagedChannel = InProcessChannelBuilder - .forName(SERVER_NAME) - .directExecutor() - .build() - - val stub: DataProviderGrpc.DataProviderBlockingStub = DataProviderGrpc.newBlockingStub(inProcessChannel) - - operator fun component1(): DataProviderGrpc.DataProviderBlockingStub = stub - - override fun close() { - LOGGER.info { "Shutdown process channel" } - inProcessChannel.shutdown() - if (!inProcessChannel.awaitTermination(1, TimeUnit.MINUTES)) { - LOGGER.warn { "Process channel couldn't stop during 1 min" } - inProcessChannel.shutdownNow() - LOGGER.warn { "Process channel shutdown now, is terminated: ${inProcessChannel.isTerminated}" } - } - LOGGER.info { "Shutdown process server" } - inProcessServer.shutdown() - if (!inProcessServer.awaitTermination(1, TimeUnit.MINUTES)) { - LOGGER.warn { "Process server couldn't stop during 1 min" } - inProcessServer.shutdownNow() - LOGGER.warn { "Process server shutdown now, is terminated: ${inProcessChannel.isTerminated}" } - } - } - } - - companion object { - private val LOGGER = KotlinLogging.logger { } - - private const val SERVER_NAME = "server" - } +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.grpc + +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderGrpc +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +class GrpcDataProviderImplTest : GRPCBaseTests() { + + @ParameterizedTest + @ValueSource(booleans = [true, false]) + fun `stops pulling if data out of range exist`(offsetNewData: Boolean) { + this.stopsPullingDataWhenOutOfRangeExists(offsetNewData) + } + + override fun createGrpcDataProvider(): DataProviderGrpc.DataProviderImplBase = GrpcDataProviderImpl( + configuration, + searchHandler, + searchEventsHandler, + generalCradleHandler, + measurement + ) } \ No newline at end of file diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcImplTestBase.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcImplTestBase.kt new file mode 100644 index 00000000..cb548ed7 --- /dev/null +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcImplTestBase.kt @@ -0,0 +1,147 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.grpc + +import com.exactpro.cradle.CradleManager +import com.exactpro.cradle.CradleStorage +import com.exactpro.th2.common.grpc.MessageGroupBatch +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderGrpc +import com.exactpro.th2.lwdataprovider.Decoder +import com.exactpro.th2.lwdataprovider.RequestedMessageDetails +import com.exactpro.th2.lwdataprovider.configuration.Configuration +import com.exactpro.th2.lwdataprovider.configuration.CustomConfigurationClass +import com.exactpro.th2.lwdataprovider.db.CradleMessageExtractor +import com.exactpro.th2.lwdataprovider.db.DataMeasurement +import com.exactpro.th2.lwdataprovider.handlers.GeneralCradleHandler +import com.exactpro.th2.lwdataprovider.handlers.SearchEventsHandler +import com.exactpro.th2.lwdataprovider.handlers.SearchMessagesHandler +import com.exactpro.th2.lwdataprovider.util.DummyDataMeasurement +import io.github.oshai.kotlinlogging.KotlinLogging +import io.grpc.BindableService +import io.grpc.ManagedChannel +import io.grpc.Server +import io.grpc.inprocess.InProcessChannelBuilder +import io.grpc.inprocess.InProcessServerBuilder +import org.mockito.kotlin.any +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import org.mockito.kotlin.spy +import java.util.Queue +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.Executor +import java.util.concurrent.TimeUnit + + +abstract class GrpcImplTestBase { + protected val executor: Executor = Executor { it.run() } + protected val storage = mock() + protected val manager = mock { + on { storage } doReturn storage + } + protected val searchEventsHandler: SearchEventsHandler = mock { } + protected val generalCradleHandler: GeneralCradleHandler = mock { } + protected val measurement: DataMeasurement = mock { + on { start(any()) } doReturn mock { } + } + protected val decoder = spy(TestDecoder()) + protected val searchHandler = createSearchMessagesHandler(decoder, false) + protected val configuration = Configuration(CustomConfigurationClass()) + + protected open class TestDecoder( + capacity: Int = 10 + ) : Decoder { + val protoQueue: Queue = ArrayBlockingQueue(capacity) + val transportQueue: Queue = ArrayBlockingQueue(capacity) + override fun sendBatchMessage( + batchBuilder: MessageGroupBatch.Builder, + requests: Collection, + session: String + ) { + protoQueue.addAll(requests) + } + + override fun sendBatchMessage( + batchBuilder: GroupBatch.Builder, + requests: Collection, + session: String + ) { + transportQueue.addAll(requests) + } + } + + private fun createSearchMessagesHandler( + decoder: Decoder, + useTransportMode: Boolean + ) = SearchMessagesHandler( + CradleMessageExtractor(manager, DummyDataMeasurement, false), + decoder, + executor, + Configuration( + CustomConfigurationClass( + bufferPerQuery = 4, + useTransportMode = useTransportMode, + batchSizeBytes = 300, + ) + ) + ) + + abstract fun createGrpcDataProvider(): DataProviderGrpc.DataProviderImplBase + + protected class GrpcTestHolder( + service: BindableService + ) : AutoCloseable { + private val inProcessServer: Server = InProcessServerBuilder + .forName(SERVER_NAME) + .addService(service) + .directExecutor() + .build() + .also(Server::start) + + private val inProcessChannel: ManagedChannel = InProcessChannelBuilder + .forName(SERVER_NAME) + .directExecutor() + .build() + + val stub: DataProviderGrpc.DataProviderBlockingStub = DataProviderGrpc.newBlockingStub(inProcessChannel) + + operator fun component1(): DataProviderGrpc.DataProviderBlockingStub = stub + + override fun close() { + LOGGER.info { "Shutdown process channel" } + inProcessChannel.shutdown() + if (!inProcessChannel.awaitTermination(1, TimeUnit.MINUTES)) { + LOGGER.warn { "Process channel couldn't stop during 1 min" } + inProcessChannel.shutdownNow() + LOGGER.warn { "Process channel shutdown now, is terminated: ${inProcessChannel.isTerminated}" } + } + LOGGER.info { "Shutdown process server" } + inProcessServer.shutdown() + if (!inProcessServer.awaitTermination(1, TimeUnit.MINUTES)) { + LOGGER.warn { "Process server couldn't stop during 1 min" } + inProcessServer.shutdownNow() + LOGGER.warn { "Process server shutdown now, is terminated: ${inProcessChannel.isTerminated}" } + } + } + } + + companion object { + private val LOGGER = KotlinLogging.logger { } + + private const val SERVER_NAME = "server" + } +} \ No newline at end of file diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/VerificationUtil.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/VerificationUtil.kt index 1e855465..8da89d5b 100644 --- a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/VerificationUtil.kt +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/VerificationUtil.kt @@ -19,7 +19,10 @@ package com.exactpro.th2.lwdataprovider.util import com.exactpro.cradle.Order import com.exactpro.cradle.messages.StoredMessage import com.exactpro.cradle.messages.StoredMessageId +import com.exactpro.th2.common.grpc.MessageID +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse import com.exactpro.th2.lwdataprovider.RequestedMessageDetails +import com.exactpro.th2.lwdataprovider.grpc.toInstant import com.exactpro.th2.lwdataprovider.workers.RequestId import org.junit.jupiter.api.Assertions @@ -58,4 +61,24 @@ fun validateMessagesOrder(messages: List, expectedUniqu Assertions.assertEquals(expectedUniqueMessages, ids.size) { "Unexpected number of IDs: $ids" } +} + +fun validateMessagesOrderGrpc(messages: List, expectedUniqueMessages: Int) { + var prevMessage: MessageSearchResponse? = null + val ids = HashSet(expectedUniqueMessages) + for (message in messages) { + if (message.hasMessageStreamPointers()) { + continue + } + ids += message.message.messageId + prevMessage?.also { + Assertions.assertTrue(it.message.messageId.timestamp.toInstant() <= message.message.messageId.timestamp.toInstant()) { + "Unordered messages: $it and $message" + } + } + prevMessage = message + } + Assertions.assertEquals(expectedUniqueMessages, ids.size) { + "Unexpected number of IDs: $ids" + } } \ No newline at end of file diff --git a/grpc/README.md b/grpc/README.md index 7fa1b72c..2f260f9c 100644 --- a/grpc/README.md +++ b/grpc/README.md @@ -9,6 +9,9 @@ ### 2.3.3 +#### Updates: ++ th2 gradle plugin `0.1.3` + #### Updates: + th2 gradle plugin `0.0.8` diff --git a/utils/README.md b/utils/README.md index d861101a..e2d23f8d 100644 --- a/utils/README.md +++ b/utils/README.md @@ -9,6 +9,9 @@ ## 0.0.3 +### Updates: ++ th2 gradle plugin `0.1.3` + ### Updates: + th2 gradle plugin `0.0.8` + common: `5.12.0-dev` From 0a02c00de2c9b16dfd66ad370f8e076af20163f3 Mon Sep 17 00:00:00 2001 From: Denis Date: Thu, 21 Nov 2024 18:33:07 +0300 Subject: [PATCH 5/7] Remove isCancelled --- .../grpc/GrpcDataProviderBackPressure.kt | 19 ++++++++----------- .../grpc/GrpcDataProviderImpl.kt | 2 +- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt index b58a8e09..7461adee 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt @@ -58,7 +58,6 @@ class GrpcDataProviderBackPressure( val servCallObs = responseObserver as ServerCallStreamObserver val lock = ReentrantLock() var future: Future<*>? = null - val isCancelled = AtomicBoolean(false) fun cleanBuffer() { while (buffer.poll() != null) { @@ -67,13 +66,11 @@ class GrpcDataProviderBackPressure( } fun cancel() { - if (isCancelled.compareAndSet(false, true)) { - handler.cancel() - onClose(handler) - cleanBuffer() - onFinished() - logger.info { "Stream cancelled and cleaned up" } - } + handler.cancel() + onClose(handler) + cleanBuffer() + onFinished() + logger.info { "Stream cancelled and cleaned up" } } servCallObs.setOnCancelHandler { @@ -86,7 +83,7 @@ class GrpcDataProviderBackPressure( } servCallObs.setOnReadyHandler { - if (!handler.isAlive || isCancelled.get()) { + if (!handler.isAlive) { logger.debug { "Handler no longer alive or already cancelled, skipping processing" } return@setOnReadyHandler } @@ -97,7 +94,7 @@ class GrpcDataProviderBackPressure( } var inProcess = true - while (servCallObs.isReady && inProcess && !isCancelled.get()) { + while (servCallObs.isReady && inProcess) { if (servCallObs.isCancelled) { logger.warn { "Request is canceled during processing" } cancel() @@ -141,7 +138,7 @@ class GrpcDataProviderBackPressure( } } - if (inProcess && !isCancelled.get()) { + if (inProcess && !handler.isAlive) { lock.withLock { future = scheduler.schedule({ runCatching { diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt index 83c28f79..1530345f 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt @@ -244,7 +244,7 @@ open class GrpcDataProviderImpl( } } - open fun processResponse( + internal open fun processResponse( responseObserver: StreamObserver, buffer: BlockingQueue, handler: CancelableResponseHandler, From f46103f7c4e97ac974c722450454dbd88154a6bb Mon Sep 17 00:00:00 2001 From: Denis Date: Fri, 22 Nov 2024 16:34:01 +0300 Subject: [PATCH 6/7] Add test to verify cancellation on RST_STREAM --- .../handlers/SearchMessagesHandler.kt | 1 + .../grpc/GrpcDataProviderBackPressureTest.kt | 152 ++++++++++++++++++ .../lwdataprovider/grpc/GrpcImplTestBase.kt | 11 +- 3 files changed, 161 insertions(+), 3 deletions(-) diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt index 060b6e4f..d96b74ac 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt @@ -293,6 +293,7 @@ class SearchMessagesHandler( } val lastTimestamp: Instant = request.endTimestamp val allGroupLoaded = hashSetOf() + // TODO: Maybe we need to wait between pulling attempts to not overwhelm storage with requests do { val keepPulling = pullUpdates(request, lastTimestamp, sink, parameters, allGroupLoaded) sink.canceled?.apply { diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressureTest.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressureTest.kt index 3a739a2a..325d20b2 100644 --- a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressureTest.kt +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressureTest.kt @@ -16,14 +16,38 @@ package com.exactpro.th2.lwdataprovider.grpc +import com.exactpro.th2.common.message.toTimestamp import com.exactpro.th2.dataprovider.lw.grpc.DataProviderGrpc +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest +import com.exactpro.th2.lwdataprovider.CancelableResponseHandler +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat +import com.exactpro.th2.lwdataprovider.handlers.MessageResponseHandler +import com.exactpro.th2.lwdataprovider.util.ImmutableListCradleResult +import com.exactpro.th2.lwdataprovider.util.createBatches +import io.grpc.Context +import io.grpc.StatusRuntimeException +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Timeout +import org.junit.jupiter.api.assertInstanceOf +import org.junit.jupiter.api.assertThrows import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource import org.junit.jupiter.params.provider.ValueSource +import org.mockito.kotlin.whenever +import org.mockito.kotlin.any +import org.mockito.kotlin.argThat +import org.mockito.kotlin.spy +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.verify +import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit class GrpcDataProviderBackPressureTest : GRPCBaseTests() { val backPressureExecutor = Executors.newSingleThreadScheduledExecutor() + val deadlineExecutor = Executors.newSingleThreadScheduledExecutor() @ParameterizedTest @ValueSource(booleans = [true, false]) @@ -31,6 +55,134 @@ class GrpcDataProviderBackPressureTest : GRPCBaseTests() { this.stopsPullingDataWhenOutOfRangeExists(offsetNewData) } + @ParameterizedTest + @CsvSource(value = ["true,true", "true,false", "false,true", "false, false"]) + @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS) + fun `stops after RST_STREAM event received`(offsetNewData: Boolean, newDataOnEveryRequest: Boolean) { + val startTimestamp = Instant.now() + val firstEndTimestamp = startTimestamp.plus(10L, ChronoUnit.MINUTES) + val endTimestamp = firstEndTimestamp.plus(10L, ChronoUnit.MINUTES) + val aliasesCount = 5 + val increase = 5L + val firstBatchMessagesCount = (firstEndTimestamp.epochSecond - startTimestamp.epochSecond) / increase + val firstMessagesPerAlias = firstBatchMessagesCount / aliasesCount + + val lastBatchMessagesCount = (endTimestamp.epochSecond - firstEndTimestamp.epochSecond) / increase + val lastMessagesPerAlias = lastBatchMessagesCount / aliasesCount + + val firstBatches = createBatches( + firstMessagesPerAlias, + aliasesCount, + overlapCount = 0, + increase, + startTimestamp, + firstEndTimestamp, + ) + val lastBatches = createBatches( + lastMessagesPerAlias, + aliasesCount, + overlapCount = 0, + increase, + firstEndTimestamp, + endTimestamp, + aliasIndexOffset = if (offsetNewData) aliasesCount else 0 + ) + val emptyBatch = createBatches( + 10, + 0, + 0, + increase, + endTimestamp.plusNanos(1), + endTimestamp.plus(5, ChronoUnit.MINUTES), + ) + val group = "test" + + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && from.value == startTimestamp && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(firstBatches)) + + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && from.value == firstBatches.maxOf { it.lastTimestamp } && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(lastBatches)) + + if(newDataOnEveryRequest) { + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(lastBatches)) + + whenever(storage.getGroupedMessageBatches(argThat { + limit == 1 && groupName == group + })).thenReturn(ImmutableListCradleResult(emptyBatch)) + } else { + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(emptyBatch)) + + whenever(storage.getGroupedMessageBatches(argThat { + limit == 1 && groupName == group + })).thenReturn(ImmutableListCradleResult(emptyBatch)) + } + + val request = MessageGroupsSearchRequest.newBuilder().apply { + addMessageGroupBuilder().setName("test") + addResponseFormats(ResponseFormat.BASE_64.name) + bookIdBuilder.setName("test") + this.startTimestamp = startTimestamp.toTimestamp() + this.endTimestamp = endTimestamp.toTimestamp() + this.keepOpen = true + }.build() + + val searchHandler = spy(searchHandler) + val requestContextCaptor = argumentCaptor() + val grpcDataProvider = GrpcDataProviderBackPressure( + configuration, + searchHandler, + searchEventsHandler, + generalCradleHandler, + measurement, + backPressureExecutor + ) + GrpcTestHolder(grpcDataProvider).use { (stub) -> + + assertThrows { + withDeadline(100) { + val iterator = stub.searchMessageGroups(request) + while (iterator.hasNext()) { + iterator.next() // to not trigger backpressure + } + } + } + + verify(searchHandler).loadMessageGroups( + any(), + requestContextCaptor.capture(), + any() + ) + + val capturedRequestContext = requestContextCaptor.firstValue + + assertInstanceOf(capturedRequestContext) + + Thread.sleep(200) + + Assertions.assertFalse(capturedRequestContext.isAlive) + } + } + + private fun withDeadline( + durationMs: Long, + code: () -> Unit + ) { + Context.current() + .withCancellation() + .withDeadlineAfter(durationMs, TimeUnit.MILLISECONDS, deadlineExecutor) + .use { context -> + context.call { + code() + } + } + } + override fun createGrpcDataProvider(): DataProviderGrpc.DataProviderImplBase = GrpcDataProviderBackPressure( configuration, searchHandler, diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcImplTestBase.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcImplTestBase.kt index cb548ed7..bce35f71 100644 --- a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcImplTestBase.kt +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcImplTestBase.kt @@ -44,11 +44,13 @@ import org.mockito.kotlin.spy import java.util.Queue import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.Executor +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit abstract class GrpcImplTestBase { - protected val executor: Executor = Executor { it.run() } + + protected val executor: Executor = Executors.newSingleThreadExecutor() protected val storage = mock() protected val manager = mock { on { storage } doReturn storage @@ -105,16 +107,19 @@ abstract class GrpcImplTestBase { protected class GrpcTestHolder( service: BindableService ) : AutoCloseable { + private val serverExecutor = Executors.newFixedThreadPool(1) + private val clientExecutor = Executors.newFixedThreadPool(1) + private val inProcessServer: Server = InProcessServerBuilder .forName(SERVER_NAME) .addService(service) - .directExecutor() + .executor(serverExecutor) .build() .also(Server::start) private val inProcessChannel: ManagedChannel = InProcessChannelBuilder .forName(SERVER_NAME) - .directExecutor() + .executor(clientExecutor) .build() val stub: DataProviderGrpc.DataProviderBlockingStub = DataProviderGrpc.newBlockingStub(inProcessChannel) From 61b35e0071b9c4f9e8ad614637ccfdb784a56397 Mon Sep 17 00:00:00 2001 From: Denis Date: Fri, 22 Nov 2024 16:42:08 +0300 Subject: [PATCH 7/7] Add new configuration parameter: keepOpenPullingTimeout --- README.md | 2 ++ .../th2/lwdataprovider/configuration/Configuration.kt | 3 +++ .../th2/lwdataprovider/handlers/SearchMessagesHandler.kt | 4 +++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f08651ce..3e39f386 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,7 @@ spec: # * 0: no compression # * 1: best speed # * 9: best compression +# keepOpenPullingTimeout: 100 # time to wait in between attempts to get new data from data storage when `keepOpen` request parameter is used. pins: # pins are used to communicate with codec components to parse message data - name: to_codec @@ -182,6 +183,7 @@ spec: # validateCradleData: false # validate data loaded from cradle. NOTE: Enabled validation affect performance # codecUsePinAttributes: true # send raw message to specified codec (true) or send to all codecs (false) # responseFormats: string list # resolve data for selected formats only. (allowed values: BASE_64, PARSED) +# keepOpenPullingTimeout: 100 # time to wait in between attempts to get new data from data storage when `keepOpen` request parameter is used. # pins are used to communicate with codec components to parse message data diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt index 3f5ebc9b..0d05b4e3 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt @@ -49,6 +49,7 @@ class CustomConfigurationClass( @JsonDeserialize(using = ByteSizeDeserializer::class) val batchSizeBytes: Int? = null, val downloadTaskTTL: Long? = null, + val keepOpenPullingTimeoutMs: Long? = null ) class Configuration(customConfiguration: CustomConfigurationClass) { @@ -79,6 +80,8 @@ class Configuration(customConfiguration: CustomConfigurationClass) { val gzipCompressionLevel: Int = VariableBuilder.getVariable(customConfiguration::gzipCompressionLevel, -1) val batchSizeBytes: Int = VariableBuilder.getVariable(customConfiguration::batchSizeBytes, 256 * 1024) val downloadTaskTTL: Long = VariableBuilder.getVariable(customConfiguration::downloadTaskTTL, TimeUnit.HOURS.toMillis(1)) + val keepOpenPullingTimeoutMs: Long = VariableBuilder.getVariable(customConfiguration::keepOpenPullingTimeoutMs, 10) + init { require(bufferPerQuery <= maxBufferDecodeQueue) { "buffer per queue ($bufferPerQuery) must be less or equal to the total buffer size ($maxBufferDecodeQueue)" diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt index d96b74ac..6674b715 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt @@ -140,6 +140,7 @@ class SearchMessagesHandler( val allLoaded = hashSetOf() do { val continuePulling = pullUpdates(request, order, sink, allLoaded) + Thread.sleep(configuration.keepOpenPullingTimeoutMs) } while (continuePulling) } } @@ -293,13 +294,14 @@ class SearchMessagesHandler( } val lastTimestamp: Instant = request.endTimestamp val allGroupLoaded = hashSetOf() - // TODO: Maybe we need to wait between pulling attempts to not overwhelm storage with requests + do { val keepPulling = pullUpdates(request, lastTimestamp, sink, parameters, allGroupLoaded) sink.canceled?.apply { logger.info { "request canceled: $message" } return@use } + Thread.sleep(configuration.keepOpenPullingTimeoutMs) } while (keepPulling) } } catch (ex: Exception) {