diff --git a/README.md b/README.md index a99e4040..3e39f386 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # Lightweight data provider (2.12.1) + # Overview This component serves as a data provider for [th2-data-services](https://github.com/th2-net/th2-data-services). It will connect to the cassandra database via [cradle api](https://github.com/th2-net/cradleapi) and expose the data stored in there as REST resources. This component is similar to [rpt-data-provider](https://github.com/th2-net/th2-rpt-data-provider) but the last one contains additional GUI-specific logic. @@ -123,6 +124,7 @@ spec: # * 0: no compression # * 1: best speed # * 9: best compression +# keepOpenPullingTimeout: 100 # time to wait in between attempts to get new data from data storage when `keepOpen` request parameter is used. pins: # pins are used to communicate with codec components to parse message data - name: to_codec @@ -181,6 +183,7 @@ spec: # validateCradleData: false # validate data loaded from cradle. NOTE: Enabled validation affect performance # codecUsePinAttributes: true # send raw message to specified codec (true) or send to all codecs (false) # responseFormats: string list # resolve data for selected formats only. (allowed values: BASE_64, PARSED) +# keepOpenPullingTimeout: 100 # time to wait in between attempts to get new data from data storage when `keepOpen` request parameter is used. # pins are used to communicate with codec components to parse message data @@ -224,6 +227,11 @@ spec: # Release notes: +## 2.12.1 + ++ Support `keepOpen` option for `searchMessageGroups` gRPC request ++ th2 gradle plugin `0.1.3` + ## 2.12.0 + Conversion to JSON in HTTP handlers is executed in the separate executor. diff --git a/app/build.gradle.kts b/app/build.gradle.kts index aab5d500..fdf70b40 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -63,6 +63,8 @@ dependencies { testImplementation("org.testcontainers:cassandra") testImplementation("com.datastax.oss:java-driver-core") + testImplementation("io.grpc:grpc-testing") + testImplementation("io.grpc:grpc-inprocess") } application { diff --git a/app/gradle.properties b/app/gradle.properties index 126392ea..15197833 100644 --- a/app/gradle.properties +++ b/app/gradle.properties @@ -1,4 +1,4 @@ kotlin.code.style=official -release_version=2.12.0 +release_version=2.12.1 description='th2 Lightweight data provider component' kapt.include.compile.classpath=false \ No newline at end of file diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt index 3f5ebc9b..0d05b4e3 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt @@ -49,6 +49,7 @@ class CustomConfigurationClass( @JsonDeserialize(using = ByteSizeDeserializer::class) val batchSizeBytes: Int? = null, val downloadTaskTTL: Long? = null, + val keepOpenPullingTimeoutMs: Long? = null ) class Configuration(customConfiguration: CustomConfigurationClass) { @@ -79,6 +80,8 @@ class Configuration(customConfiguration: CustomConfigurationClass) { val gzipCompressionLevel: Int = VariableBuilder.getVariable(customConfiguration::gzipCompressionLevel, -1) val batchSizeBytes: Int = VariableBuilder.getVariable(customConfiguration::batchSizeBytes, 256 * 1024) val downloadTaskTTL: Long = VariableBuilder.getVariable(customConfiguration::downloadTaskTTL, TimeUnit.HOURS.toMillis(1)) + val keepOpenPullingTimeoutMs: Long = VariableBuilder.getVariable(customConfiguration::keepOpenPullingTimeoutMs, 10) + init { require(bufferPerQuery <= maxBufferDecodeQueue) { "buffer per queue ($bufferPerQuery) must be less or equal to the total buffer size ($maxBufferDecodeQueue)" diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt index 2ed537d1..8f607284 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Exactpro (Exactpro Systems Limited) + * Copyright 2022-2024 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,7 +60,7 @@ data class MessagesGroupRequest( }, if (hasStartTimestamp()) startTimestamp.toInstant() else error("missing start timestamp"), if (hasEndTimestamp()) endTimestamp.toInstant() else error("missing end timestamp"), - false, // FIXME: update gRPC + request.keepOpen, if (hasBookId()) bookId.toCradle() else error("parameter '$BOOK_ID_PARAM' is required"), request.responseFormatsList.takeIf { it.isNotEmpty() } ?.mapTo(hashSetOf(), ResponseFormat.Companion::fromString) diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt index 115ffb02..7461adee 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressure.kt @@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue import java.util.concurrent.Future import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock @@ -42,9 +43,9 @@ class GrpcDataProviderBackPressure( dataMeasurement: DataMeasurement, private val scheduler: ScheduledExecutorService, ) : GrpcDataProviderImpl(configuration, searchMessagesHandler, searchEventsHandler, generalCradleHandler, dataMeasurement) { - companion object { private val logger = KotlinLogging.logger { } + private const val EVENT_POLLING_TIMEOUT = 100L } override fun processResponse( @@ -63,44 +64,81 @@ class GrpcDataProviderBackPressure( buffer.clear() } } + fun cancel() { handler.cancel() onClose(handler) cleanBuffer() onFinished() + logger.info { "Stream cancelled and cleaned up" } + } + + servCallObs.setOnCancelHandler { + logger.warn { "Execution cancelled" } + lock.withLock { + future?.cancel(true) + future = null + } + cancel() } + servCallObs.setOnReadyHandler { - if (!handler.isAlive) + if (!handler.isAlive) { + logger.debug { "Handler no longer alive or already cancelled, skipping processing" } return@setOnReadyHandler + } + lock.withLock { future?.cancel(false) future = null } + var inProcess = true while (servCallObs.isReady && inProcess) { if (servCallObs.isCancelled) { logger.warn { "Request is canceled during processing" } - handler.cancel() + cancel() return@setOnReadyHandler } - val event = buffer.take() - if (event.close) { - servCallObs.onCompleted() - inProcess = false - onFinished() - onClose(handler) - logger.info { "Executing finished successfully" } - } else if (event.error != null) { - servCallObs.onError(event.error) - inProcess = false - onFinished() - handler.complete() - logger.warn(event.error) { "Executing finished with error" } - } else { - converter.invoke(event)?.let { servCallObs.onNext(it) } + + try { + // We need to poll because if we will use take and keepOpen option it is possible that we will have to wait here indefinitely + val event = buffer.poll(EVENT_POLLING_TIMEOUT, TimeUnit.MILLISECONDS) ?: continue + when { + event.close -> { + servCallObs.onCompleted() + inProcess = false + onFinished() + onClose(handler) + logger.info { "Executing finished successfully" } + } + event.error != null -> { + servCallObs.onError(event.error) + inProcess = false + onFinished() + handler.complete() + logger.warn(event.error) { "Executing finished with error" } + } + else -> { + converter.invoke(event)?.let { servCallObs.onNext(it) } + } + } + } catch (e: InterruptedException) { + logger.warn(e) { "Processing interrupted" } + cancel() + return@setOnReadyHandler + } catch (e: Exception) { + logger.error(e) { "Error processing event" } + servCallObs.onError(Status.INTERNAL + .withDescription("Internal error during processing") + .withCause(e) + .asRuntimeException()) + cancel() + return@setOnReadyHandler } } - if (inProcess) { + + if (inProcess && !handler.isAlive) { lock.withLock { future = scheduler.schedule({ runCatching { @@ -118,20 +156,10 @@ class GrpcDataProviderBackPressure( }, configuration.grpcBackPressureReadinessTimeoutMls, TimeUnit.MILLISECONDS) } } + if (!servCallObs.isReady) { logger.trace { "Suspending processing because the opposite side is not ready to receive more messages. In queue: ${buffer.size}" } } } - - servCallObs.setOnCancelHandler { - logger.warn{ "Execution cancelled" } - lock.withLock { - future?.cancel(true) - future = null - } - cancel() - } - - } } \ No newline at end of file diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt index 3c93bf8a..1530345f 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImpl.kt @@ -244,7 +244,7 @@ open class GrpcDataProviderImpl( } } - protected open fun processResponse( + internal open fun processResponse( responseObserver: StreamObserver, buffer: BlockingQueue, handler: CancelableResponseHandler, diff --git a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt index 060b6e4f..6674b715 100644 --- a/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt +++ b/app/src/main/kotlin/com/exactpro/th2/lwdataprovider/handlers/SearchMessagesHandler.kt @@ -140,6 +140,7 @@ class SearchMessagesHandler( val allLoaded = hashSetOf() do { val continuePulling = pullUpdates(request, order, sink, allLoaded) + Thread.sleep(configuration.keepOpenPullingTimeoutMs) } while (continuePulling) } } @@ -293,12 +294,14 @@ class SearchMessagesHandler( } val lastTimestamp: Instant = request.endTimestamp val allGroupLoaded = hashSetOf() + do { val keepPulling = pullUpdates(request, lastTimestamp, sink, parameters, allGroupLoaded) sink.canceled?.apply { logger.info { "request canceled: $message" } return@use } + Thread.sleep(configuration.keepOpenPullingTimeoutMs) } while (keepPulling) } } catch (ex: Exception) { diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GRPCBaseTests.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GRPCBaseTests.kt new file mode 100644 index 00000000..662e0c0a --- /dev/null +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GRPCBaseTests.kt @@ -0,0 +1,114 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.exactpro.th2.lwdataprovider.grpc + +import com.exactpro.cradle.messages.StoredMessage +import com.exactpro.th2.common.message.toTimestamp +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat +import com.exactpro.th2.lwdataprovider.util.ImmutableListCradleResult +import com.exactpro.th2.lwdataprovider.util.createBatches +import com.exactpro.th2.lwdataprovider.util.validateMessagesOrderGrpc +import org.junit.jupiter.api.Assertions.assertEquals +import org.mockito.kotlin.argThat +import org.mockito.kotlin.whenever +import java.time.Instant +import java.time.temporal.ChronoUnit + +abstract class GRPCBaseTests : GrpcImplTestBase() { + + protected fun stopsPullingDataWhenOutOfRangeExists(offsetNewData: Boolean) { + val startTimestamp = Instant.now() + val firstEndTimestamp = startTimestamp.plus(10L, ChronoUnit.MINUTES) + val endTimestamp = firstEndTimestamp.plus(10L, ChronoUnit.MINUTES) + val aliasesCount = 5 + val increase = 5L + val firstBatchMessagesCount = (firstEndTimestamp.epochSecond - startTimestamp.epochSecond) / increase + val firstMessagesPerAlias = firstBatchMessagesCount / aliasesCount + + val lastBatchMessagesCount = (endTimestamp.epochSecond - firstEndTimestamp.epochSecond) / increase + val lastMessagesPerAlias = lastBatchMessagesCount / aliasesCount + + val firstBatches = createBatches( + firstMessagesPerAlias, + aliasesCount, + overlapCount = 0, + increase, + startTimestamp, + firstEndTimestamp, + ) + val lastBatches = createBatches( + lastMessagesPerAlias, + aliasesCount, + overlapCount = 0, + increase, + firstEndTimestamp, + endTimestamp, + aliasIndexOffset = if (offsetNewData) aliasesCount else 0 + ) + val outsideBatches = createBatches( + 10, + 1, + 0, + increase, + endTimestamp.plusNanos(1), + endTimestamp.plus(5, ChronoUnit.MINUTES), + ) + val group = "test" + val firstRequestMessagesCount = firstBatches.sumOf { it.messageCount } + val secondRequestMessagesCount = lastBatches.sumOf { it.messageCount } + val messagesCount = firstRequestMessagesCount + secondRequestMessagesCount + + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && from.value == startTimestamp && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(firstBatches)) + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && from.value == firstBatches.maxOf { it.lastTimestamp } && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(lastBatches)) + whenever(storage.getGroupedMessageBatches(argThat { + limit == 1 && groupName == group + })).thenReturn(ImmutableListCradleResult(outsideBatches)) + + val request = MessageGroupsSearchRequest.newBuilder().apply { + addMessageGroupBuilder().setName("test") + addResponseFormats(ResponseFormat.BASE_64.name) + bookIdBuilder.setName("test") + this.startTimestamp = startTimestamp.toTimestamp() + this.endTimestamp = endTimestamp.toTimestamp() + this.keepOpen = true + }.build() + + val grpcDataProvider = createGrpcDataProvider() + GrpcTestHolder(grpcDataProvider).use { (stub) -> + val responses = stub.searchMessageGroups(request).asSequence().toList() + + assertEquals(messagesCount + 1, responses.size) { + val missing: List = + (firstBatches.asSequence() + lastBatches.asSequence()).flatMap { it.messages }.filter { stored -> + responses.none { + val messageId = it.message.messageId + messageId.connectionId.sessionAlias == stored.sessionAlias + && messageId.sequence == stored.sequence + && messageId.direction.toCradleDirection() == stored.direction + } + }.toList() + "Missing ${missing.size} message(s): $missing" + } + + validateMessagesOrderGrpc(responses, messagesCount) + } + } +} \ No newline at end of file diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressureTest.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressureTest.kt new file mode 100644 index 00000000..325d20b2 --- /dev/null +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderBackPressureTest.kt @@ -0,0 +1,194 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.grpc + +import com.exactpro.th2.common.message.toTimestamp +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderGrpc +import com.exactpro.th2.dataprovider.lw.grpc.MessageGroupsSearchRequest +import com.exactpro.th2.lwdataprovider.CancelableResponseHandler +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat +import com.exactpro.th2.lwdataprovider.handlers.MessageResponseHandler +import com.exactpro.th2.lwdataprovider.util.ImmutableListCradleResult +import com.exactpro.th2.lwdataprovider.util.createBatches +import io.grpc.Context +import io.grpc.StatusRuntimeException +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Timeout +import org.junit.jupiter.api.assertInstanceOf +import org.junit.jupiter.api.assertThrows +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource +import org.junit.jupiter.params.provider.ValueSource +import org.mockito.kotlin.whenever +import org.mockito.kotlin.any +import org.mockito.kotlin.argThat +import org.mockito.kotlin.spy +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.verify +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +class GrpcDataProviderBackPressureTest : GRPCBaseTests() { + + val backPressureExecutor = Executors.newSingleThreadScheduledExecutor() + val deadlineExecutor = Executors.newSingleThreadScheduledExecutor() + + @ParameterizedTest + @ValueSource(booleans = [true, false]) + fun `stops pulling if data out of range exist`(offsetNewData: Boolean) { + this.stopsPullingDataWhenOutOfRangeExists(offsetNewData) + } + + @ParameterizedTest + @CsvSource(value = ["true,true", "true,false", "false,true", "false, false"]) + @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS) + fun `stops after RST_STREAM event received`(offsetNewData: Boolean, newDataOnEveryRequest: Boolean) { + val startTimestamp = Instant.now() + val firstEndTimestamp = startTimestamp.plus(10L, ChronoUnit.MINUTES) + val endTimestamp = firstEndTimestamp.plus(10L, ChronoUnit.MINUTES) + val aliasesCount = 5 + val increase = 5L + val firstBatchMessagesCount = (firstEndTimestamp.epochSecond - startTimestamp.epochSecond) / increase + val firstMessagesPerAlias = firstBatchMessagesCount / aliasesCount + + val lastBatchMessagesCount = (endTimestamp.epochSecond - firstEndTimestamp.epochSecond) / increase + val lastMessagesPerAlias = lastBatchMessagesCount / aliasesCount + + val firstBatches = createBatches( + firstMessagesPerAlias, + aliasesCount, + overlapCount = 0, + increase, + startTimestamp, + firstEndTimestamp, + ) + val lastBatches = createBatches( + lastMessagesPerAlias, + aliasesCount, + overlapCount = 0, + increase, + firstEndTimestamp, + endTimestamp, + aliasIndexOffset = if (offsetNewData) aliasesCount else 0 + ) + val emptyBatch = createBatches( + 10, + 0, + 0, + increase, + endTimestamp.plusNanos(1), + endTimestamp.plus(5, ChronoUnit.MINUTES), + ) + val group = "test" + + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && from.value == startTimestamp && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(firstBatches)) + + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && from.value == firstBatches.maxOf { it.lastTimestamp } && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(lastBatches)) + + if(newDataOnEveryRequest) { + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(lastBatches)) + + whenever(storage.getGroupedMessageBatches(argThat { + limit == 1 && groupName == group + })).thenReturn(ImmutableListCradleResult(emptyBatch)) + } else { + whenever(storage.getGroupedMessageBatches(argThat { + groupName == group && to.value == endTimestamp + })).thenReturn(ImmutableListCradleResult(emptyBatch)) + + whenever(storage.getGroupedMessageBatches(argThat { + limit == 1 && groupName == group + })).thenReturn(ImmutableListCradleResult(emptyBatch)) + } + + val request = MessageGroupsSearchRequest.newBuilder().apply { + addMessageGroupBuilder().setName("test") + addResponseFormats(ResponseFormat.BASE_64.name) + bookIdBuilder.setName("test") + this.startTimestamp = startTimestamp.toTimestamp() + this.endTimestamp = endTimestamp.toTimestamp() + this.keepOpen = true + }.build() + + val searchHandler = spy(searchHandler) + val requestContextCaptor = argumentCaptor() + val grpcDataProvider = GrpcDataProviderBackPressure( + configuration, + searchHandler, + searchEventsHandler, + generalCradleHandler, + measurement, + backPressureExecutor + ) + GrpcTestHolder(grpcDataProvider).use { (stub) -> + + assertThrows { + withDeadline(100) { + val iterator = stub.searchMessageGroups(request) + while (iterator.hasNext()) { + iterator.next() // to not trigger backpressure + } + } + } + + verify(searchHandler).loadMessageGroups( + any(), + requestContextCaptor.capture(), + any() + ) + + val capturedRequestContext = requestContextCaptor.firstValue + + assertInstanceOf(capturedRequestContext) + + Thread.sleep(200) + + Assertions.assertFalse(capturedRequestContext.isAlive) + } + } + + private fun withDeadline( + durationMs: Long, + code: () -> Unit + ) { + Context.current() + .withCancellation() + .withDeadlineAfter(durationMs, TimeUnit.MILLISECONDS, deadlineExecutor) + .use { context -> + context.call { + code() + } + } + } + + override fun createGrpcDataProvider(): DataProviderGrpc.DataProviderImplBase = GrpcDataProviderBackPressure( + configuration, + searchHandler, + searchEventsHandler, + generalCradleHandler, + measurement, + backPressureExecutor + ) +} \ No newline at end of file diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImplTest.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImplTest.kt new file mode 100644 index 00000000..ec190b46 --- /dev/null +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcDataProviderImplTest.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.grpc + +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderGrpc +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +class GrpcDataProviderImplTest : GRPCBaseTests() { + + @ParameterizedTest + @ValueSource(booleans = [true, false]) + fun `stops pulling if data out of range exist`(offsetNewData: Boolean) { + this.stopsPullingDataWhenOutOfRangeExists(offsetNewData) + } + + override fun createGrpcDataProvider(): DataProviderGrpc.DataProviderImplBase = GrpcDataProviderImpl( + configuration, + searchHandler, + searchEventsHandler, + generalCradleHandler, + measurement + ) +} \ No newline at end of file diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcImplTestBase.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcImplTestBase.kt new file mode 100644 index 00000000..bce35f71 --- /dev/null +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/grpc/GrpcImplTestBase.kt @@ -0,0 +1,152 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.grpc + +import com.exactpro.cradle.CradleManager +import com.exactpro.cradle.CradleStorage +import com.exactpro.th2.common.grpc.MessageGroupBatch +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.GroupBatch +import com.exactpro.th2.dataprovider.lw.grpc.DataProviderGrpc +import com.exactpro.th2.lwdataprovider.Decoder +import com.exactpro.th2.lwdataprovider.RequestedMessageDetails +import com.exactpro.th2.lwdataprovider.configuration.Configuration +import com.exactpro.th2.lwdataprovider.configuration.CustomConfigurationClass +import com.exactpro.th2.lwdataprovider.db.CradleMessageExtractor +import com.exactpro.th2.lwdataprovider.db.DataMeasurement +import com.exactpro.th2.lwdataprovider.handlers.GeneralCradleHandler +import com.exactpro.th2.lwdataprovider.handlers.SearchEventsHandler +import com.exactpro.th2.lwdataprovider.handlers.SearchMessagesHandler +import com.exactpro.th2.lwdataprovider.util.DummyDataMeasurement +import io.github.oshai.kotlinlogging.KotlinLogging +import io.grpc.BindableService +import io.grpc.ManagedChannel +import io.grpc.Server +import io.grpc.inprocess.InProcessChannelBuilder +import io.grpc.inprocess.InProcessServerBuilder +import org.mockito.kotlin.any +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import org.mockito.kotlin.spy +import java.util.Queue +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.Executor +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + + +abstract class GrpcImplTestBase { + + protected val executor: Executor = Executors.newSingleThreadExecutor() + protected val storage = mock() + protected val manager = mock { + on { storage } doReturn storage + } + protected val searchEventsHandler: SearchEventsHandler = mock { } + protected val generalCradleHandler: GeneralCradleHandler = mock { } + protected val measurement: DataMeasurement = mock { + on { start(any()) } doReturn mock { } + } + protected val decoder = spy(TestDecoder()) + protected val searchHandler = createSearchMessagesHandler(decoder, false) + protected val configuration = Configuration(CustomConfigurationClass()) + + protected open class TestDecoder( + capacity: Int = 10 + ) : Decoder { + val protoQueue: Queue = ArrayBlockingQueue(capacity) + val transportQueue: Queue = ArrayBlockingQueue(capacity) + override fun sendBatchMessage( + batchBuilder: MessageGroupBatch.Builder, + requests: Collection, + session: String + ) { + protoQueue.addAll(requests) + } + + override fun sendBatchMessage( + batchBuilder: GroupBatch.Builder, + requests: Collection, + session: String + ) { + transportQueue.addAll(requests) + } + } + + private fun createSearchMessagesHandler( + decoder: Decoder, + useTransportMode: Boolean + ) = SearchMessagesHandler( + CradleMessageExtractor(manager, DummyDataMeasurement, false), + decoder, + executor, + Configuration( + CustomConfigurationClass( + bufferPerQuery = 4, + useTransportMode = useTransportMode, + batchSizeBytes = 300, + ) + ) + ) + + abstract fun createGrpcDataProvider(): DataProviderGrpc.DataProviderImplBase + + protected class GrpcTestHolder( + service: BindableService + ) : AutoCloseable { + private val serverExecutor = Executors.newFixedThreadPool(1) + private val clientExecutor = Executors.newFixedThreadPool(1) + + private val inProcessServer: Server = InProcessServerBuilder + .forName(SERVER_NAME) + .addService(service) + .executor(serverExecutor) + .build() + .also(Server::start) + + private val inProcessChannel: ManagedChannel = InProcessChannelBuilder + .forName(SERVER_NAME) + .executor(clientExecutor) + .build() + + val stub: DataProviderGrpc.DataProviderBlockingStub = DataProviderGrpc.newBlockingStub(inProcessChannel) + + operator fun component1(): DataProviderGrpc.DataProviderBlockingStub = stub + + override fun close() { + LOGGER.info { "Shutdown process channel" } + inProcessChannel.shutdown() + if (!inProcessChannel.awaitTermination(1, TimeUnit.MINUTES)) { + LOGGER.warn { "Process channel couldn't stop during 1 min" } + inProcessChannel.shutdownNow() + LOGGER.warn { "Process channel shutdown now, is terminated: ${inProcessChannel.isTerminated}" } + } + LOGGER.info { "Shutdown process server" } + inProcessServer.shutdown() + if (!inProcessServer.awaitTermination(1, TimeUnit.MINUTES)) { + LOGGER.warn { "Process server couldn't stop during 1 min" } + inProcessServer.shutdownNow() + LOGGER.warn { "Process server shutdown now, is terminated: ${inProcessChannel.isTerminated}" } + } + } + } + + companion object { + private val LOGGER = KotlinLogging.logger { } + + private const val SERVER_NAME = "server" + } +} \ No newline at end of file diff --git a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/VerificationUtil.kt b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/VerificationUtil.kt index 1e855465..8da89d5b 100644 --- a/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/VerificationUtil.kt +++ b/app/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/VerificationUtil.kt @@ -19,7 +19,10 @@ package com.exactpro.th2.lwdataprovider.util import com.exactpro.cradle.Order import com.exactpro.cradle.messages.StoredMessage import com.exactpro.cradle.messages.StoredMessageId +import com.exactpro.th2.common.grpc.MessageID +import com.exactpro.th2.dataprovider.lw.grpc.MessageSearchResponse import com.exactpro.th2.lwdataprovider.RequestedMessageDetails +import com.exactpro.th2.lwdataprovider.grpc.toInstant import com.exactpro.th2.lwdataprovider.workers.RequestId import org.junit.jupiter.api.Assertions @@ -58,4 +61,24 @@ fun validateMessagesOrder(messages: List, expectedUniqu Assertions.assertEquals(expectedUniqueMessages, ids.size) { "Unexpected number of IDs: $ids" } +} + +fun validateMessagesOrderGrpc(messages: List, expectedUniqueMessages: Int) { + var prevMessage: MessageSearchResponse? = null + val ids = HashSet(expectedUniqueMessages) + for (message in messages) { + if (message.hasMessageStreamPointers()) { + continue + } + ids += message.message.messageId + prevMessage?.also { + Assertions.assertTrue(it.message.messageId.timestamp.toInstant() <= message.message.messageId.timestamp.toInstant()) { + "Unordered messages: $it and $message" + } + } + prevMessage = message + } + Assertions.assertEquals(expectedUniqueMessages, ids.size) { + "Unexpected number of IDs: $ids" + } } \ No newline at end of file diff --git a/grpc/README.md b/grpc/README.md index 40dc2024..2f260f9c 100644 --- a/grpc/README.md +++ b/grpc/README.md @@ -1,9 +1,17 @@ -# gRPC for lw-data-provider (2.3.3) +# gRPC for lw-data-provider (2.3.4) ## Release notes: +### 2.3.4 + +#### Updates: ++ th2 gradle plugin `0.1.3` + ### 2.3.3 +#### Updates: ++ th2 gradle plugin `0.1.3` + #### Updates: + th2 gradle plugin `0.0.8` diff --git a/grpc/gradle.properties b/grpc/gradle.properties index 8d10f752..19dab062 100644 --- a/grpc/gradle.properties +++ b/grpc/gradle.properties @@ -1,3 +1,3 @@ kotlin.code.style=official -release_version=2.3.3 +release_version=2.3.4 description='th2 Lightweight data provider gRPC' \ No newline at end of file diff --git a/grpc/package_info.json b/grpc/package_info.json index bb3dfeea..2684b138 100644 --- a/grpc/package_info.json +++ b/grpc/package_info.json @@ -1,4 +1,4 @@ { "package_name": "th2_grpc_lw_data_provider", - "package_version": "2.3.3" + "package_version": "2.3.4" } diff --git a/utils/README.md b/utils/README.md index cb529294..e2d23f8d 100644 --- a/utils/README.md +++ b/utils/README.md @@ -1,9 +1,17 @@ -# utils for lw-data-provider (0.0.3) +# utils for lw-data-provider (0.0.4) # Release notes: +## 0.0.4 + +### Updates: ++ th2 gradle plugin `0.1.3` + ## 0.0.3 +### Updates: ++ th2 gradle plugin `0.1.3` + ### Updates: + th2 gradle plugin `0.0.8` + common: `5.12.0-dev` diff --git a/utils/gradle.properties b/utils/gradle.properties index 09930ec6..407c8215 100644 --- a/utils/gradle.properties +++ b/utils/gradle.properties @@ -1,3 +1,3 @@ kotlin.code.style=official -release_version=0.0.3 +release_version=0.0.4 description='th2 Lightweight data provider utils' \ No newline at end of file