From ff8091b3ea01f8fd511fe06b8effb6daa2892f73 Mon Sep 17 00:00:00 2001 From: Oleg Smirnov Date: Wed, 31 Jan 2024 15:05:03 +0400 Subject: [PATCH] [TH2-5139] Add API for submitting download task and checking its status (#77) * Extract json stream processing into separate method * Add task handler * Add opeapi annotations to generate proper description * Add negative tests * Add fail-fast property to download task request * Add env variable to configure cotext path * Use context path only for redoc * Rename env variable * Update version and readme * Register handler in server * Add missing copyright * Extract task management into a separate object. Add cleanup thread for tasks * Update readme * Add info logging for requests and enable debug loggin on trace level * Correct text in tests * Add endpoint for getting all the task statuses * Add method name to request logger * Change loggin level for fail-fast codec error --- README.md | 12 +- gradle.properties | 2 +- .../exactpro/th2/lwdataprovider/Context.kt | 19 +- .../com/exactpro/th2/lwdataprovider/Main.kt | 6 +- .../configuration/Configuration.kt | 4 + .../entities/internal/ResponseFormat.kt | 6 + .../entities/requests/MessagesGroupRequest.kt | 3 +- .../http/FileDownloadHandler.kt | 68 +- .../th2/lwdataprovider/http/HttpServer.kt | 45 +- .../lwdataprovider/http/SseRequestContext.kt | 21 +- .../http/TaskDownloadHandler.kt | 439 +++++++++++ .../http/listener/ProgressListener.kt | 41 ++ .../http/serializers/BookIdDeserializer.kt | 34 + .../CustomMillisOrNanosInstantDeserializer.kt | 38 + .../http/util/JsonStreamingUtil.kt | 116 +++ .../th2/lwdataprovider/workers/TaskManager.kt | 227 ++++++ .../lwdataprovider/workers/TimeoutWatcher.kt | 23 +- .../http/AbstractHttpHandlerTest.kt | 63 +- .../http/TestTaskDownloadHandler.kt | 680 ++++++++++++++++++ .../th2/lwdataprovider/util/CradleTestUtil.kt | 16 + 20 files changed, 1765 insertions(+), 98 deletions(-) create mode 100644 src/main/kotlin/com/exactpro/th2/lwdataprovider/http/TaskDownloadHandler.kt create mode 100644 src/main/kotlin/com/exactpro/th2/lwdataprovider/http/listener/ProgressListener.kt create mode 100644 src/main/kotlin/com/exactpro/th2/lwdataprovider/http/serializers/BookIdDeserializer.kt create mode 100644 src/main/kotlin/com/exactpro/th2/lwdataprovider/http/serializers/CustomMillisOrNanosInstantDeserializer.kt create mode 100644 src/main/kotlin/com/exactpro/th2/lwdataprovider/http/util/JsonStreamingUtil.kt create mode 100644 src/main/kotlin/com/exactpro/th2/lwdataprovider/workers/TaskManager.kt create mode 100644 src/test/kotlin/com/exactpro/th2/lwdataprovider/http/TestTaskDownloadHandler.kt diff --git a/README.md b/README.md index 0ddfda1f..ed22bbce 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Lightweight data provider (2.5.2) +# Lightweight data provider (2.6.0) # Overview This component serves as a data provider for [th2-data-services](https://github.com/th2-net/th2-data-services). It will connect to the cassandra database via [cradle api](https://github.com/th2-net/cradleapi) and expose the data stored in there as REST resources. @@ -224,6 +224,16 @@ spec: # Release notes: +## 2.6.0 + ++ Add download task endpoints: + + POST `/download` - register task + + GET `/download/{taskID}` - execute task + + GET `/download/{taskID}/status` - get task status + + DELETE `/download/{taskID}` - remove task ++ Add parameter `downloadTaskTTL` to clean up the completed or not started task after the specified time in milliseconds. 1 hour by default. ++ Add `EXTERNAL_CONTEXT_PATH` env variable to inform provider about external context that is used in requests + ## 2.5.2 + Fix possible deadlock in case the response queue is filled up with keep-alive events diff --git a/gradle.properties b/gradle.properties index b246cc74..53fdc36c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,7 +16,7 @@ kotlin.code.style=official -release_version=2.5.2 +release_version=2.6.0 description='th2 Lightweight data provider component' vcs_url=https://github.com/th2-net/th2-lw-data-provider docker_image_name= diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/Context.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/Context.kt index 9a0b7487..5d63ed2b 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/Context.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/Context.kt @@ -34,6 +34,7 @@ import com.exactpro.th2.lwdataprovider.handlers.SearchEventsHandler import com.exactpro.th2.lwdataprovider.handlers.SearchMessagesHandler import com.exactpro.th2.lwdataprovider.metrics.DataMeasurementHistogram import com.exactpro.th2.lwdataprovider.workers.KeepAliveHandler +import com.exactpro.th2.lwdataprovider.workers.TaskManager import com.exactpro.th2.lwdataprovider.workers.TimerWatcher import com.fasterxml.jackson.databind.DeserializationFeature import com.fasterxml.jackson.databind.ObjectMapper @@ -67,7 +68,7 @@ class Context( configuration.codecUsePinAttributes ), - val timeoutHandler: TimerWatcher = TimerWatcher(mqDecoder, configuration), + val timeoutHandler: TimerWatcher = TimerWatcher(mqDecoder, configuration.decodingTimeout, "decoding"), val cradleEventExtractor: CradleEventExtractor = CradleEventExtractor( cradleManager, DataMeasurementHistogram.create(registry, "cradle event") @@ -108,8 +109,22 @@ class Context( execExecutor, ), val generalCradleHandler: GeneralCradleHandler = GeneralCradleHandler(generalCradleExtractor, execExecutor), - val applicationName: String + val applicationName: String, + val taskManager: TaskManager = TaskManager(), + val taskWatcher: TimerWatcher = TimerWatcher(taskManager, configuration.downloadTaskTTL, "tasks"), ) { + + fun start() { + timeoutHandler.start() + keepAliveHandler.start() + taskWatcher.start() + } + + fun stop() { + taskWatcher.stop() + keepAliveHandler.stop() + timeoutHandler.stop() + } companion object { @JvmStatic fun createObjectMapper(): ObjectMapper = jacksonObjectMapper() diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/Main.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/Main.kt index 348e6513..f0f4c589 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/Main.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/Main.kt @@ -87,11 +87,9 @@ class Main { private fun startServer() { setupMetrics(context) - context.keepAliveHandler.start() - context.timeoutHandler.start() + context.start() - resources += AutoCloseable { context.keepAliveHandler.stop() } - resources += AutoCloseable { context.timeoutHandler.stop() } + resources += AutoCloseable { context.stop() } @Suppress("LiftReturnOrAssignment") when (context.configuration.mode) { diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt index 50f697ba..a7075033 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/configuration/Configuration.kt @@ -20,6 +20,7 @@ import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat import com.fasterxml.jackson.databind.annotation.JsonDeserialize import mu.KotlinLogging import java.util.* +import java.util.concurrent.TimeUnit import kotlin.math.max import kotlin.math.min @@ -47,6 +48,7 @@ class CustomConfigurationClass( val gzipCompressionLevel: Int? = null, @JsonDeserialize(using = ByteSizeDeserializer::class) val batchSizeBytes: Int? = null, + val downloadTaskTTL: Long? = null, ) class Configuration(customConfiguration: CustomConfigurationClass) { @@ -76,6 +78,7 @@ class Configuration(customConfiguration: CustomConfigurationClass) { val flushSseAfter: Int = VariableBuilder.getVariable(customConfiguration::flushSseAfter, 0) val gzipCompressionLevel: Int = VariableBuilder.getVariable(customConfiguration::gzipCompressionLevel, -1) val batchSizeBytes: Int = VariableBuilder.getVariable(customConfiguration::batchSizeBytes, 256 * 1024) + val downloadTaskTTL: Long = VariableBuilder.getVariable(customConfiguration::downloadTaskTTL, TimeUnit.HOURS.toMillis(1)) init { require(bufferPerQuery <= maxBufferDecodeQueue) { "buffer per queue ($bufferPerQuery) must be less or equal to the total buffer size ($maxBufferDecodeQueue)" @@ -96,6 +99,7 @@ class Configuration(customConfiguration: CustomConfigurationClass) { require(gzipCompressionLevel <= 9 && gzipCompressionLevel >= -1) { "gzipCompressionLevel must be integer in the [-1, 9] range" } + require(downloadTaskTTL > 0) { "negative download task TTL: $downloadTaskTTL" } } override fun toString(): String { diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/internal/ResponseFormat.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/internal/ResponseFormat.kt index 8851a2e3..e88f594d 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/internal/ResponseFormat.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/internal/ResponseFormat.kt @@ -33,5 +33,11 @@ enum class ResponseFormat { error("only one parsed format can be specified in $formats") } } + + @JvmStatic + fun isValidCombination(formats: Set): Boolean { + if (formats.size <= 1) return true + return !(PROTO_PARSED in formats && JSON_PARSED in formats) + } } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt index a3e98cf2..2ed537d1 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/entities/requests/MessagesGroupRequest.kt @@ -35,7 +35,8 @@ data class MessagesGroupRequest( val responseFormats: Set? = null, val includeStreams: Set = emptySet(), val limit: Int? = null, - val searchDirection: SearchDirection = SearchDirection.next + val searchDirection: SearchDirection = SearchDirection.next, + val failFast: Boolean = false, // for backward compatibility ) { init { if (!responseFormats.isNullOrEmpty()) { diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/FileDownloadHandler.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/FileDownloadHandler.kt index d8dee96c..a1052533 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/FileDownloadHandler.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/FileDownloadHandler.kt @@ -29,7 +29,9 @@ import com.exactpro.th2.lwdataprovider.entities.requests.SearchDirection import com.exactpro.th2.lwdataprovider.entities.requests.util.convertToMessageStreams import com.exactpro.th2.lwdataprovider.entities.responses.ProviderMessage53 import com.exactpro.th2.lwdataprovider.handlers.SearchMessagesHandler +import com.exactpro.th2.lwdataprovider.http.util.JSON_STREAM_CONTENT_TYPE import com.exactpro.th2.lwdataprovider.http.util.listQueryParameters +import com.exactpro.th2.lwdataprovider.http.util.writeJsonStream import com.exactpro.th2.lwdataprovider.metrics.HttpWriteMetrics import com.exactpro.th2.lwdataprovider.metrics.ResponseQueue import com.exactpro.th2.lwdataprovider.workers.KeepAliveHandler @@ -188,76 +190,12 @@ class FileDownloadHandler( ) keepAliveHandler.addKeepAliveData(handler).use { searchMessagesHandler.loadMessageGroups(request, handler, dataMeasurement) - processData(ctx, queue, handler) + writeJsonStream(ctx, queue, handler, dataMeasurement, LOGGER) LOGGER.info { "Processing search sse messages group request finished" } } } - private fun processData( - ctx: Context, - queue: ArrayBlockingQueue>, - handler: HttpMessagesRequestHandler - ) { - val matchedPath = ctx.matchedPath() - var dataSent = 0 - - var writeHeader = true - var status: HttpStatus = HttpStatus.OK - - fun writeHeader() { - if (writeHeader) { - ctx.status(status) - .contentType(JSON_STREAM_CONTENT_TYPE) - .header(Header.TRANSFER_ENCODING, "chunked") - writeHeader = false - } - } - - val output = ctx.res().outputStream.buffered() - try { - do { - dataMeasurement.start("process_sse_event").use { - val nextEvent = queue.take() - ResponseQueue.currentSize(matchedPath, queue.size) - val sseEvent = dataMeasurement.start("await_convert_to_json").use { nextEvent.get() } - if (writeHeader && sseEvent is SseEvent.ErrorData.SimpleError) { - // something happened during request - status = HttpStatus.INTERNAL_SERVER_ERROR - } - writeHeader() - when (sseEvent.event) { - EventType.KEEP_ALIVE -> output.flush() - EventType.CLOSE -> { - LOGGER.info { "Received close event" } - return - } - - else -> { - LOGGER.debug { - "Write event to output: ${ - StringUtils.abbreviate(sseEvent.data.toString(DATA_CHARSET), 100) - }" - } - output.write(sseEvent.data) - output.write('\n'.code) - dataSent++ - } - } - } - } while (true) - } catch (ex: Exception) { - LOGGER.error(ex) { "cannot process next event" } - handler.cancel() - queue.clear() - } finally { - HttpWriteMetrics.messageSent(matchedPath, dataSent) - runCatching { output.flush() } - .onFailure { LOGGER.error(it) { "cannot flush the remaining data when processing is finished" } } - } - } - companion object { - private const val JSON_STREAM_CONTENT_TYPE = "application/stream+json" private const val GROUP_PARAM = "group" private const val START_TIMESTAMP_PARAM = "startTimestamp" private const val END_TIMESTAMP_PARAM = "endTimestamp" diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/HttpServer.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/HttpServer.kt index 1492b323..91e19b6a 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/HttpServer.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/HttpServer.kt @@ -23,9 +23,12 @@ import com.exactpro.th2.lwdataprovider.SseResponseBuilder import com.exactpro.th2.lwdataprovider.entities.exceptions.InvalidRequestException import com.exactpro.th2.lwdataprovider.entities.internal.ProviderEventId import com.exactpro.th2.lwdataprovider.entities.requests.SearchDirection +import com.exactpro.th2.lwdataprovider.http.serializers.BookIdDeserializer import com.exactpro.th2.lwdataprovider.producers.MessageProducer import com.exactpro.th2.lwdataprovider.producers.MessageProducer53 import com.exactpro.th2.lwdataprovider.producers.MessageProducer53Transport +import com.exactpro.th2.lwdataprovider.workers.TaskManager +import com.fasterxml.jackson.databind.module.SimpleModule import io.javalin.Javalin import io.javalin.config.JavalinConfig import io.javalin.http.BadRequestResponse @@ -131,23 +134,46 @@ class HttpServer(private val context: Context) { context.searchMessagesHandler, context.requestsDataMeasurement, ), + TaskDownloadHandler( + configuration, + context.convExecutor, + sseResponseBuilder, + context.keepAliveHandler, + context.searchMessagesHandler, + context.requestsDataMeasurement, + context.taskManager, + ), ) app = Javalin.create { it.showJavalinBanner = false - it.jsonMapper(JavalinJackson(jacksonMapper)) -// it.plugins.enableDevLogging() + it.jsonMapper(JavalinJackson( + jacksonMapper.registerModule( + SimpleModule("th2").apply { + addDeserializer(BookId::class.java, BookIdDeserializer()) + } + ) + )) + if (logger.isTraceEnabled) { + it.plugins.enableDevLogging() + } else { + it.requestLogger.http { ctx, time -> + logger.info { "Request ${ctx.method().name} '${ctx.path()}' executed with status ${ctx.status()}: ${time}.ms" } + } + } it.plugins.register(MicrometerPlugin.create { micrometer -> micrometer.registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT, CollectorRegistry.defaultRegistry, Clock.SYSTEM) micrometer.tags = listOf(Tag.of("application", context.applicationName)) }) - setupOpenApi(it) + val externalContextPath = System.getenv(EXTERNAL_CONTEXT_PATH_ENV)?.takeUnless(String::isBlank) + + setupOpenApi(it, externalContextPath) // setupSwagger(it) - setupReDoc(it) + setupReDoc(it, externalContextPath) }.apply { setupConverters(this) val javalinContext = JavalinContext(configuration.flushSseAfter) @@ -168,8 +194,11 @@ class HttpServer(private val context: Context) { logger.info { "http server stopped" } } - private fun setupReDoc(it: JavalinConfig) { + private fun setupReDoc(it: JavalinConfig, externalContextPath: String?) { val reDocConfiguration = ReDocConfiguration() + externalContextPath?.also { path -> + reDocConfiguration.basePath = path + } it.plugins.register(ReDocPlugin(reDocConfiguration)) } @@ -178,7 +207,7 @@ class HttpServer(private val context: Context) { // it.plugins.register(SwaggerPlugin(swaggerConfiguration)) // } - private fun setupOpenApi(it: JavalinConfig) { + private fun setupOpenApi(it: JavalinConfig, externalContextPath: String?) { val openApiConfiguration = OpenApiPluginConfiguration() .withDefinitionConfiguration { _, definition -> @@ -197,7 +226,7 @@ class HttpServer(private val context: Context) { "Light Weight Data Provider provides you with fast access to data in Cradle" openApiInfo.contact = openApiContact openApiInfo.license = openApiLicense - openApiInfo.version = "2.2.0" + openApiInfo.version = "2.6.0" }.withServer { openApiServer -> openApiServer.url = "http://localhost:{port}" openApiServer.addVariable("port", "8080", arrayOf("8080"), "Port of the server") @@ -209,6 +238,8 @@ class HttpServer(private val context: Context) { companion object { private val logger = KotlinLogging.logger {} + private const val EXTERNAL_CONTEXT_PATH_ENV = "EXTERNAL_CONTEXT_PATH" + const val TIME_EXAMPLE = "Every value that is greater than 1_000_000_000 ^ 2 will be interpreted as nanos. Otherwise, as millis.\n" + "Millis: 1676023329533, Nanos: 1676023329533590976" diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/SseRequestContext.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/SseRequestContext.kt index 995d023e..1bcfd193 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/SseRequestContext.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/SseRequestContext.kt @@ -32,6 +32,7 @@ import com.exactpro.th2.lwdataprovider.handlers.MessageResponseHandler import com.exactpro.th2.lwdataprovider.metrics.HttpWriteMetrics import com.exactpro.th2.lwdataprovider.producers.JsonFormatter import com.exactpro.th2.lwdataprovider.producers.ParsedFormats +import mu.KotlinLogging import org.apache.commons.lang3.exception.ExceptionUtils import java.time.Duration import java.util.EnumSet @@ -49,6 +50,7 @@ class HttpMessagesRequestHandler( dataMeasurement: DataMeasurement, maxMessagesPerRequest: Int = 0, responseFormats: Set = EnumSet.of(ResponseFormat.BASE_64, ResponseFormat.PROTO_PARSED), + private val failFast: Boolean = false, ) : MessageResponseHandler(dataMeasurement, maxMessagesPerRequest), KeepAliveListener { private val includeRaw: Boolean = responseFormats.isEmpty() || ResponseFormat.BASE_64 in responseFormats private val jsonFormatter: JsonFormatter? = (responseFormats - ResponseFormat.BASE_64).run { @@ -71,7 +73,15 @@ class HttpMessagesRequestHandler( val future: CompletableFuture = data.completed.thenApplyAsync({ requestedMessage: RequestedMessage -> dataMeasurement.start("convert_to_json").use { if (jsonFormatter != null && requestedMessage.protoMessage == null && requestedMessage.transportMessage == null) { - builder.codecTimeoutError(requestedMessage.storedMessage.id, counter) + builder.codecTimeoutError(requestedMessage.storedMessage.id, counter).also { + if (failFast) { + LOGGER.warn { "Codec timeout. Canceling processing due to fail-fast strategy" } + // note that this might not stop the processing right away + // this is called on a different thread + // so some messages might be loaded before the processing is canceled + fail() + } + } } else { builder.build( requestedMessage, jsonFormatter, includeRaw, @@ -105,6 +115,15 @@ class HttpMessagesRequestHandler( val counter = indexer.nextIndex() return buffer.offer({ builder.build(scannedObjectInfo, counter) }, duration.toMillis(), TimeUnit.MILLISECONDS) } + + private fun fail() { + cancel() + buffer.put { SseEvent.Closed } + } + + companion object { + private val LOGGER = KotlinLogging.logger { } + } } class DataIndexer { diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/TaskDownloadHandler.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/TaskDownloadHandler.kt new file mode 100644 index 00000000..4a248cbd --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/TaskDownloadHandler.kt @@ -0,0 +1,439 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.http + +import com.exactpro.cradle.BookId +import com.exactpro.cradle.Direction +import com.exactpro.th2.common.event.EventUtils +import com.exactpro.th2.lwdataprovider.SseEvent +import com.exactpro.th2.lwdataprovider.SseResponseBuilder +import com.exactpro.th2.lwdataprovider.configuration.Configuration +import com.exactpro.th2.lwdataprovider.db.DataMeasurement +import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat +import com.exactpro.th2.lwdataprovider.entities.requests.MessagesGroupRequest +import com.exactpro.th2.lwdataprovider.entities.requests.ProviderMessageStream +import com.exactpro.th2.lwdataprovider.entities.requests.SearchDirection +import com.exactpro.th2.lwdataprovider.entities.responses.ProviderMessage53 +import com.exactpro.th2.lwdataprovider.handlers.SearchMessagesHandler +import com.exactpro.th2.lwdataprovider.http.serializers.CustomMillisOrNanosInstantDeserializer +import com.exactpro.th2.lwdataprovider.http.util.JSON_STREAM_CONTENT_TYPE +import com.exactpro.th2.lwdataprovider.http.util.writeJsonStream +import com.exactpro.th2.lwdataprovider.workers.KeepAliveHandler +import com.exactpro.th2.lwdataprovider.workers.TaskID +import com.exactpro.th2.lwdataprovider.workers.TaskInformation +import com.exactpro.th2.lwdataprovider.workers.TaskManager +import com.exactpro.th2.lwdataprovider.workers.TaskStatus +import com.fasterxml.jackson.annotation.JsonFormat +import com.fasterxml.jackson.annotation.JsonInclude +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import com.fasterxml.jackson.databind.annotation.JsonSerialize +import com.fasterxml.jackson.datatype.jsr310.ser.InstantSerializer +import io.javalin.Javalin +import io.javalin.http.Context +import io.javalin.http.HttpStatus +import io.javalin.http.bodyValidator +import io.javalin.openapi.HttpMethod +import io.javalin.openapi.Nullability +import io.javalin.openapi.OpenApi +import io.javalin.openapi.OpenApiContent +import io.javalin.openapi.OpenApiDescription +import io.javalin.openapi.OpenApiExample +import io.javalin.openapi.OpenApiParam +import io.javalin.openapi.OpenApiPropertyType +import io.javalin.openapi.OpenApiRequestBody +import io.javalin.openapi.OpenApiResponse +import mu.KotlinLogging +import java.time.Instant +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.Executor +import java.util.function.Supplier + +class TaskDownloadHandler( + private val configuration: Configuration, + private val convExecutor: Executor, + private val sseResponseBuilder: SseResponseBuilder, + private val keepAliveHandler: KeepAliveHandler, + private val searchMessagesHandler: SearchMessagesHandler, + private val dataMeasurement: DataMeasurement, + private val taskManager: TaskManager, +) : JavalinHandler { + + override fun setup(app: Javalin, context: JavalinContext) { + app.post(DOWNLOAD_ROUTE, this::registerTask) + app.get(TASK_STATUSES_ROUTE, this::listOfStatuses) + app.get(TASK_STATUS_ROUTE, this::getTaskStatus) + app.get(TASK_ROUTE, this::executeTask) + app.delete(TASK_ROUTE, this::deleteTask) + } + + @OpenApi( + path = TASK_STATUSES_ROUTE, + methods = [HttpMethod.GET], + responses = [ + OpenApiResponse( + status = "200", + content = [OpenApiContent(from = Array::class)] + ) + ] + ) + private fun listOfStatuses(context: Context) { + LOGGER.info { "Getting possible task statuses" } + context.status(HttpStatus.OK) + .json(TaskStatus.values().map { it.toInfo() }) + } + + @OpenApi( + path = TASK_ROUTE, + methods = [HttpMethod.DELETE], + pathParams = [ + OpenApiParam( + name = TASK_ID, + description = "task ID", + required = true, + ), + ], + responses = [ + OpenApiResponse( + status = "204", + description = "task successfully removed", + ), + OpenApiResponse( + status = "404", + content = [OpenApiContent(from = ErrorMessage::class)], + description = "task with specified ID is not found", + ) + ] + ) + private fun deleteTask(context: Context) { + val taskID = TaskID.create(context.pathParam(TASK_ID)) + LOGGER.info { "Removing task $taskID" } + val removed = taskManager.remove(taskID) + if (removed == null) { + LOGGER.error { "Task $taskID not found" } + context.status(HttpStatus.NOT_FOUND) + .json(ErrorMessage("task with id '${taskID.id}' is not found")) + } else { + LOGGER.info { "Task $taskID removed" } + context.status(HttpStatus.NO_CONTENT) + } + } + + @OpenApi( + path = TASK_STATUS_ROUTE, + methods = [HttpMethod.GET], + pathParams = [ + OpenApiParam( + name = TASK_ID, + description = "task ID", + required = true, + ), + ], + responses = [ + OpenApiResponse( + status = "200", + content = [OpenApiContent(from = TaskStatusResponse::class)], + description = "task current status", + ), + OpenApiResponse( + status = "404", + content = [OpenApiContent(from = ErrorMessage::class)], + description = "task with specified ID is not found", + ), + ] + ) + private fun getTaskStatus(context: Context) { + val taskID = TaskID.create(context.pathParam(TASK_ID)) + LOGGER.info { "Checking status for task $taskID" } + val taskInfo = taskManager[taskID] ?: run { + LOGGER.error { "Task $taskID not found" } + context.status(HttpStatus.NOT_FOUND) + .json(ErrorMessage("task with id '${taskID.id}' is not found")) + return + } + context.status(HttpStatus.OK) + .json(taskInfo.toTaskStatusResponse()) + } + + @OpenApi( + path = TASK_ROUTE, + methods = [HttpMethod.GET], + pathParams = [ + OpenApiParam( + name = TASK_ID, + description = "task ID", + required = true, + ), + ], + responses = [ + OpenApiResponse( + status = "200", + content = [ + OpenApiContent( + from = ProviderMessage53::class, + mimeType = JSON_STREAM_CONTENT_TYPE, + ), + ], + ), + OpenApiResponse( + status = "404", + content = [OpenApiContent(from = ErrorMessage::class)], + description = "task with specified ID is not found", + ), + OpenApiResponse( + status = "409", + content = [OpenApiContent(from = ErrorMessage::class)], + description = "task already in progress", + ) + ] + ) + private fun executeTask(context: Context) { + val taskID = TaskID.create(context.pathParam(TASK_ID)) + LOGGER.info { "Executing task $taskID" } + val taskState: TaskState = taskManager.execute(taskID) { + if (it == null) { + return@execute TaskState.NotFound + } + val queue = ArrayBlockingQueue>(configuration.responseQueueSize) + val handler = HttpMessagesRequestHandler( + queue, sseResponseBuilder, convExecutor, dataMeasurement, + maxMessagesPerRequest = configuration.bufferPerQuery, + responseFormats = it.request.responseFormats + ?: configuration.responseFormats, + failFast = it.request.failFast, + ) + if (!it.attachHandler(handler)) { + return@execute TaskState.AlreadyInProgress + } + TaskState.Ready(it, handler, queue) + } + when (taskState) { + TaskState.AlreadyInProgress -> { + LOGGER.error { "Task $taskID already in progress" } + context.status(HttpStatus.CONFLICT) + .json(ErrorMessage("task with id '${taskID.id}' already in progress")) + } + + TaskState.NotFound -> { + LOGGER.error { "Task $taskID not found" } + context.status(HttpStatus.NOT_FOUND) + .json(ErrorMessage("task with id '${taskID.id}' is not found")) + } + + is TaskState.Ready -> { + val (taskInfo, handler, queue) = taskState + keepAliveHandler.addKeepAliveData(handler).use { + searchMessagesHandler.loadMessageGroups(taskInfo.request, handler, dataMeasurement) + writeJsonStream(context, queue, handler, dataMeasurement, LOGGER, taskInfo) + LOGGER.info { "Task $taskID completed with status ${taskInfo.status}" } + } + } + } + } + + @OpenApi( + path = DOWNLOAD_ROUTE, + methods = [HttpMethod.POST], + requestBody = OpenApiRequestBody( + required = true, + content = [ + OpenApiContent(from = CreateTaskRequest::class) + ] + ), + responses = [ + OpenApiResponse( + status = "201", + content = [ + OpenApiContent(from = TaskIDResponse::class) + ], + description = "task successfully created", + ), + OpenApiResponse( + status = "404", + description = "invalid parameters", + ) + ] + ) + private fun registerTask(context: Context) { + val request = context.bodyValidator() + .check( + CreateTaskRequest::bookID.name, + { it.bookID.name.isNotEmpty() }, + "empty value", + ) + .check( + CreateTaskRequest::groups.name, + { it.groups.isNotEmpty() }, + "empty value", + ) + .check( + CreateTaskRequest::responseFormats.name, + { ResponseFormat.isValidCombination(it.responseFormats) }, + "only one ${ResponseFormat.PROTO_PARSED} or ${ResponseFormat.JSON_PARSED} must be used", + ) + .check( + CreateTaskRequest::limit.name, + { it.limit == null || it.limit >= 0 }, + "negative limit", + ) + .get() + + val taskID = TaskID(EventUtils.generateUUID()) + + LOGGER.info { "Registering task $taskID" } + taskManager[taskID] = TaskInformation(taskID, request.toGroupRequest()) + LOGGER.info { "Task $taskID registered" } + + context.status(HttpStatus.CREATED) + .json(TaskIDResponse(taskID)) + } + + private sealed class TaskState { + object AlreadyInProgress : TaskState() + object NotFound : TaskState() + data class Ready( + val info: TaskInformation, + val handler: HttpMessagesRequestHandler, + val queue: ArrayBlockingQueue>, + ) : TaskState() + } + + private class TaskIDResponse( + @get:OpenApiPropertyType(definedBy = String::class) + val taskID: TaskID, + ) + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + private class TaskStatusResponse( + @get:OpenApiPropertyType(definedBy = String::class) + val taskID: TaskID, + @get:OpenApiPropertyType(definedBy = String::class) + @field:JsonSerialize(using = InstantSerializer::class) + @field:JsonFormat(shape = JsonFormat.Shape.STRING) + val createdAt: Instant, + @get:OpenApiPropertyType(definedBy = String::class, nullability = Nullability.NULLABLE) + @field:JsonSerialize(using = InstantSerializer::class) + @field:JsonFormat(shape = JsonFormat.Shape.STRING) + val completedAt: Instant? = null, + val status: TaskStatus, + @get:OpenApiPropertyType(definedBy = Array::class, nullability = Nullability.NULLABLE) + val errors: List = emptyList(), + ) + + @JsonInclude(JsonInclude.Include.NON_NULL) + private class StatusInfoResponse( + val status: TaskStatus, + val terminal: Boolean, + val description: String? = null, + ) + + private fun TaskInformation.toTaskStatusResponse(): TaskStatusResponse = + TaskStatusResponse( + taskID = taskID, + createdAt = creationTime, + completedAt = completionTime, + status = status, + errors = errors.map { holder -> + ErrorMessage( + error = "${holder.message}${holder.cause?.let { " cause $it" } ?: ""}" + ) + } + ) + + private fun CreateTaskRequest.toGroupRequest(): MessagesGroupRequest { + return MessagesGroupRequest( + groups = groups, + startTimestamp = startTimestamp, + endTimestamp = endTimestamp, + keepOpen = false, + bookId = bookID, + responseFormats = responseFormats.ifEmpty { configuration.responseFormats }, + includeStreams = streams.asSequence().flatMap { it.toProviderMessageStreams() } + .toSet(), + searchDirection = searchDirection, + limit = limit, + failFast = failFast, + ) + } + + private fun MessageStream.toProviderMessageStreams(): Sequence { + return directions.asSequence().map { ProviderMessageStream(sessionAlias, it) } + } + + private class CreateTaskRequest( + val resource: Resource, + @get:OpenApiPropertyType(definedBy = String::class) + val bookID: BookId, + @get:OpenApiPropertyType(definedBy = Long::class) + @get:OpenApiExample(HttpServer.TIME_EXAMPLE) + @field:JsonDeserialize(using = CustomMillisOrNanosInstantDeserializer::class) + val startTimestamp: Instant, + @get:OpenApiPropertyType(definedBy = Long::class) + @get:OpenApiExample(HttpServer.TIME_EXAMPLE) + @field:JsonDeserialize(using = CustomMillisOrNanosInstantDeserializer::class) + val endTimestamp: Instant, + val groups: Set, + @get:OpenApiPropertyType(definedBy = Array::class, nullability = Nullability.NULLABLE) + val responseFormats: Set = emptySet(), + val limit: Int? = null, + @get:OpenApiPropertyType(definedBy = Array::class, nullability = Nullability.NULLABLE) + val streams: List = emptyList(), + @get:OpenApiPropertyType(definedBy = SearchDirection::class, nullability = Nullability.NULLABLE) + val searchDirection: SearchDirection = SearchDirection.next, + @get:OpenApiPropertyType(definedBy = Boolean::class, nullability = Nullability.NULLABLE) + @get:OpenApiDescription("the request will stop right after the first error reported. Enabled by default") + val failFast: Boolean = true, + ) + + private class MessageStream( + val sessionAlias: String, + @get:OpenApiPropertyType(definedBy = Array::class, nullability = Nullability.NULLABLE) + val directions: Set = setOf(Direction.SECOND, Direction.FIRST) + ) + + private enum class Resource { + MESSAGES, + } + + private class ErrorMessage( + val error: String, + ) + + private fun TaskStatus.toInfo(): StatusInfoResponse { + return StatusInfoResponse( + status = this, + terminal = this.ordinal > TaskStatus.EXECUTING_WITH_ERRORS.ordinal, + description = when (this) { + TaskStatus.CREATED -> "task is created and ready for execution" + TaskStatus.EXECUTING -> "task is executing" + TaskStatus.EXECUTING_WITH_ERRORS -> "task is executing but have some errors during execution" + TaskStatus.COMPLETED -> "task completed" + TaskStatus.COMPLETED_WITH_ERRORS -> "task completed with errors" + TaskStatus.CANCELED -> "task was canceled" + TaskStatus.CANCELED_WITH_ERRORS -> "task was canceled and have some errors" + }, + ) + } + + companion object { + private val LOGGER = KotlinLogging.logger { } + private const val TASK_ID = "taskID" + private const val DOWNLOAD_ROUTE = "/download" + private const val TASK_STATUSES_ROUTE = "/download/status" + private const val TASK_ROUTE = "$DOWNLOAD_ROUTE/{$TASK_ID}" + private const val TASK_STATUS_ROUTE = "$DOWNLOAD_ROUTE/{$TASK_ID}/status" + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/listener/ProgressListener.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/listener/ProgressListener.kt new file mode 100644 index 00000000..47211bb6 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/listener/ProgressListener.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.http.listener + +import com.exactpro.th2.lwdataprovider.SseEvent + +interface ProgressListener { + fun onStart() + fun onError(ex: Exception) + fun onError(errorEvent: SseEvent.ErrorData) + fun onCompleted() + + fun onCanceled() +} + +@JvmField +val DEFAULT_PROCESS_LISTENER: ProgressListener = object : ProgressListener { + override fun onStart() = Unit + + override fun onError(ex: Exception) = Unit + + override fun onError(errorEvent: SseEvent.ErrorData) = Unit + + override fun onCompleted() = Unit + + override fun onCanceled() = Unit +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/serializers/BookIdDeserializer.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/serializers/BookIdDeserializer.kt new file mode 100644 index 00000000..c99f4e53 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/serializers/BookIdDeserializer.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.http.serializers + +import com.exactpro.cradle.BookId +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.databind.DeserializationContext +import com.fasterxml.jackson.databind.deser.std.StdDeserializer + +internal class BookIdDeserializer : StdDeserializer(BookId::class.java) { + override fun deserialize(p: JsonParser, ctxt: DeserializationContext): BookId { + require(p.currentToken == JsonToken.VALUE_STRING) { "value must be a string" } + return BookId(p.valueAsString) + } + + companion object { + private const val serialVersionUID: Long = -5889633617969813739L + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/serializers/CustomMillisOrNanosInstantDeserializer.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/serializers/CustomMillisOrNanosInstantDeserializer.kt new file mode 100644 index 00000000..024f5763 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/serializers/CustomMillisOrNanosInstantDeserializer.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.http.serializers + +import com.exactpro.th2.lwdataprovider.http.HttpServer +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.core.JsonToken +import com.fasterxml.jackson.databind.DeserializationContext +import com.fasterxml.jackson.databind.deser.std.StdDeserializer +import java.time.Instant + +internal class CustomMillisOrNanosInstantDeserializer : StdDeserializer(Instant::class.java) { + override fun deserialize(p: JsonParser, ctxt: DeserializationContext): Instant { + require(p.currentToken == JsonToken.VALUE_NUMBER_INT) { + "cannot deserialize ${Instant::class.simpleName} from ${p.currentToken}" + } + return HttpServer.convertToInstant(p.longValue) + } + + companion object { + private const val serialVersionUID: Long = -8423525654021997687L + } + +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/util/JsonStreamingUtil.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/util/JsonStreamingUtil.kt new file mode 100644 index 00000000..713afb87 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/http/util/JsonStreamingUtil.kt @@ -0,0 +1,116 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.http.util + +import com.exactpro.th2.lwdataprovider.EventType +import com.exactpro.th2.lwdataprovider.SseEvent +import com.exactpro.th2.lwdataprovider.db.DataMeasurement +import com.exactpro.th2.lwdataprovider.http.HttpMessagesRequestHandler +import com.exactpro.th2.lwdataprovider.http.listener.DEFAULT_PROCESS_LISTENER +import com.exactpro.th2.lwdataprovider.http.listener.ProgressListener +import com.exactpro.th2.lwdataprovider.metrics.HttpWriteMetrics +import com.exactpro.th2.lwdataprovider.metrics.ResponseQueue +import io.javalin.http.Context +import io.javalin.http.Header +import io.javalin.http.HttpStatus +import mu.KLogger +import org.apache.commons.lang3.StringUtils +import java.util.concurrent.ArrayBlockingQueue +import java.util.function.Supplier + +const val JSON_STREAM_CONTENT_TYPE = "application/stream+json" + +fun writeJsonStream( + ctx: Context, + queue: ArrayBlockingQueue>, + handler: HttpMessagesRequestHandler, + dataMeasurement: DataMeasurement, + logger: KLogger, + progressListener: ProgressListener = DEFAULT_PROCESS_LISTENER, +) { + progressListener.onStart() + + val matchedPath = ctx.matchedPath() + var dataSent = 0 + + var writeHeader = true + var status: HttpStatus = HttpStatus.OK + + fun writeHeader() { + if (writeHeader) { + ctx.status(status) + .contentType(JSON_STREAM_CONTENT_TYPE) + .header(Header.TRANSFER_ENCODING, "chunked") + writeHeader = false + } + } + + val output = ctx.res().outputStream.buffered() + try { + do { + dataMeasurement.start("process_sse_event").use { + val nextEvent = queue.take() + ResponseQueue.currentSize(matchedPath, queue.size) + val sseEvent = dataMeasurement.start("await_convert_to_json").use { nextEvent.get() } + if (writeHeader && sseEvent is SseEvent.ErrorData.SimpleError) { + // something happened during request + status = HttpStatus.INTERNAL_SERVER_ERROR + } + writeHeader() + if (sseEvent is SseEvent.ErrorData) { + progressListener.onError(sseEvent) + } + when (sseEvent.event) { + EventType.KEEP_ALIVE -> output.flush() + EventType.CLOSE -> { + logger.info { "Received close event" } + return + } + + else -> { + logger.debug { + "Write event to output: ${ + StringUtils.abbreviate(sseEvent.data.toString(SseEvent.DATA_CHARSET), 100) + }" + } + output.write(sseEvent.data) + output.write('\n'.code) + dataSent++ + } + } + if (queue.isEmpty() && !handler.isAlive) { + logger.info { "Request canceled" } + return + } + } + } while (true) + } catch (ex: Exception) { + logger.error(ex) { "cannot process next event" } + progressListener.onError(ex) + handler.cancel() + queue.clear() + } finally { + if (handler.isAlive) { + progressListener.onCompleted() + } else { + progressListener.onCanceled() + } + HttpWriteMetrics.messageSent(matchedPath, dataSent) + runCatching { output.flush() } + .onFailure { logger.error(it) { "cannot flush the remaining data when processing is finished" } } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/workers/TaskManager.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/workers/TaskManager.kt new file mode 100644 index 00000000..1545b49d --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/workers/TaskManager.kt @@ -0,0 +1,227 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.workers + +import com.exactpro.th2.lwdataprovider.CancelableResponseHandler +import com.exactpro.th2.lwdataprovider.SseEvent +import com.exactpro.th2.lwdataprovider.entities.requests.MessagesGroupRequest +import com.exactpro.th2.lwdataprovider.http.listener.ProgressListener +import com.fasterxml.jackson.annotation.JsonCreator +import com.fasterxml.jackson.annotation.JsonValue +import mu.KotlinLogging +import org.apache.commons.lang3.exception.ExceptionUtils +import java.time.Duration +import java.time.Instant +import java.util.concurrent.locks.ReentrantReadWriteLock +import javax.annotation.concurrent.NotThreadSafe +import kotlin.concurrent.read +import kotlin.concurrent.write +import kotlin.math.min + +class TaskManager : AutoCloseable, TimeoutChecker { + private val tasksLock = ReentrantReadWriteLock() + private val tasks: MutableMap = HashMap() + + operator fun get(taskID: TaskID): TaskInformation? = + tasksLock.read { tasks[taskID] } + + operator fun set(taskID: TaskID, information: TaskInformation): Unit = + tasksLock.write { tasks[taskID] = information } + + fun remove(taskID: TaskID): TaskInformation? = + tasksLock.write { internalRemove(taskID) } + + fun execute(taskID: TaskID, action: (TaskInformation?) -> T): T = + tasksLock.write { action(tasks[taskID]) } + + override fun removeOlderThen(timeout: Long): Long { + val currentTime = Instant.now() + val (tasksToCleanup: Map, minCreationTime: Instant) = tasksLock.read { + val mitTime: Instant = tasks.values.minOfOrNull { info -> + info.completionTime?.let { minOf(it, info.creationTime) } + ?: info.creationTime + } ?: currentTime + + tasks.filter { (_, info) -> + createdButNotStarted(info, currentTime, timeout) + || completed(info, currentTime, timeout) + } to mitTime + } + if (tasksToCleanup.isNotEmpty()) { + tasksLock.write { + for ((id, info) in tasksToCleanup) { + LOGGER.info { "Canceling task $id in status ${info.status} due to timeout in $timeout mls" } + internalRemove(id) + } + } + } + return minCreationTime.toEpochMilli() + } + + private fun internalRemove(taskID: TaskID) = tasks.remove(taskID)?.apply(TaskInformation::onCanceled) + + private fun completed(info: TaskInformation, currentTime: Instant, timeout: Long): Boolean { + return info.completionTime.let { completedAt -> + completedAt != null && checkDiff(currentTime, timeout, completedAt) + } + } + + private fun createdButNotStarted(info: TaskInformation, currentTime: Instant, timeout: Long) = + info.status == TaskStatus.CREATED && checkDiff(currentTime, timeout, info.creationTime) + + private fun checkDiff(currentTime: Instant, timeout: Long, taskTime: Instant) = + Duration.between(taskTime, currentTime).abs().toMillis() > timeout + + override fun close() { + tasksLock.write { + tasks.forEach { (id, taskInfo) -> + LOGGER.info { "Closing task $id in status ${taskInfo.status}" } + taskInfo.onCanceled() + } + tasks.clear() + } + } + + companion object { + private val LOGGER = KotlinLogging.logger { } + } +} + +data class TaskID( + @field:JsonValue + val id: String, +) { + companion object { + @JsonCreator + @JvmStatic + fun create(id: String): TaskID = TaskID(id) + } +} + +enum class TaskStatus { + CREATED, + EXECUTING, + EXECUTING_WITH_ERRORS, + COMPLETED, + COMPLETED_WITH_ERRORS, + CANCELED, + CANCELED_WITH_ERRORS, +} + +@NotThreadSafe +class TaskInformation( + val taskID: TaskID, + val request: MessagesGroupRequest, +) : ProgressListener { + val creationTime: Instant = Instant.now() + @Volatile + var completionTime: Instant? = null + private set + @Volatile + private var _status: TaskStatus = TaskStatus.CREATED + private val _errors: MutableList = ArrayList() + @Volatile + private var handler: CancelableResponseHandler? = null + + val status: TaskStatus + get() = _status + val errors: List + get() = _errors.toList() + + override fun onStart() { + LOGGER.trace { "Task $taskID started" } + _status = TaskStatus.EXECUTING + } + + override fun onError(ex: Exception) { + LOGGER.trace(ex) { "Task $taskID received an error" } + _status = TaskStatus.EXECUTING_WITH_ERRORS + _errors += ErrorHolder( + ExceptionUtils.getMessage(ex), + ExceptionUtils.getRootCauseMessage(ex), + ) + } + + override fun onError(errorEvent: SseEvent.ErrorData) { + LOGGER.trace { "Task $taskID received error event" } + _status = TaskStatus.EXECUTING_WITH_ERRORS + _errors += ErrorHolder( + errorEvent.data.toString(Charsets.UTF_8) + ) + } + + override fun onCompleted() { + LOGGER.trace { "Task $taskID completed" } + _status = when (_status) { + TaskStatus.CANCELED -> + TaskStatus.CANCELED + + TaskStatus.EXECUTING_WITH_ERRORS -> + TaskStatus.COMPLETED_WITH_ERRORS + + TaskStatus.CREATED, + TaskStatus.EXECUTING, + TaskStatus.COMPLETED, + TaskStatus.COMPLETED_WITH_ERRORS, + TaskStatus.CANCELED_WITH_ERRORS -> + TaskStatus.COMPLETED + } + completionTime = Instant.now() + } + + override fun onCanceled() { + LOGGER.trace { "Task $taskID canceled" } + _status = when (_status) { + TaskStatus.EXECUTING_WITH_ERRORS -> + TaskStatus.CANCELED_WITH_ERRORS + + TaskStatus.COMPLETED_WITH_ERRORS, + TaskStatus.COMPLETED -> + _status + + TaskStatus.CREATED, + TaskStatus.EXECUTING, + TaskStatus.CANCELED, + TaskStatus.CANCELED_WITH_ERRORS -> + TaskStatus.CANCELED + } + handler?.also { + if (it.isAlive) { + it.cancel() + } + } + completionTime = Instant.now() + } + + fun attachHandler(handler: CancelableResponseHandler): Boolean { + LOGGER.trace { "Attaching handler to task $taskID" } + if (this.handler != null) { + return false + } + this.handler = handler + return true + } + + companion object { + private val LOGGER = KotlinLogging.logger(TaskInformation::class.qualifiedName!!) + } +} + +class ErrorHolder( + val message: String, + val cause: String? = null, +) \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/lwdataprovider/workers/TimeoutWatcher.kt b/src/main/kotlin/com/exactpro/th2/lwdataprovider/workers/TimeoutWatcher.kt index 2a275d45..6c4aed73 100644 --- a/src/main/kotlin/com/exactpro/th2/lwdataprovider/workers/TimeoutWatcher.kt +++ b/src/main/kotlin/com/exactpro/th2/lwdataprovider/workers/TimeoutWatcher.kt @@ -16,20 +16,23 @@ package com.exactpro.th2.lwdataprovider.workers -import com.exactpro.th2.lwdataprovider.configuration.Configuration import mu.KotlinLogging import java.util.concurrent.atomic.AtomicBoolean import kotlin.concurrent.thread class TimerWatcher( private val decodeBuffer: TimeoutChecker, - configuration: Configuration + private val timeout: Long, + private val name: String, ) { - - private val timeout: Long = configuration.decodingTimeout private val running = AtomicBoolean() private var thread: Thread? = null + init { + require(timeout > 0) { "negative timeout: $timeout" } + require(name.isNotBlank()) { "blank name" } + } + companion object { private val logger = KotlinLogging.logger { } } @@ -38,26 +41,26 @@ class TimerWatcher( fun start() { running.set(true) thread?.interrupt() - thread = thread(name="timeout-watcher", start = true, priority = 2) { run() } + thread = thread(name="timeout-watcher-$name", start = true, priority = 2) { run() } } fun stop() { if (running.compareAndSet(true, false)) { thread?.interrupt() } else { - logger.warn { "Timeout watcher already stopped" } + logger.warn { "Timeout watcher $name already stopped" } } } private fun run() { - logger.info { "Timeout watcher started" } + logger.info { "Timeout watcher $name started" } try { while (running.get()) { val minTime: Long = try { decodeBuffer.removeOlderThen(timeout) } catch (ex: Exception) { - logger.error(ex) { "cannot remove old elements from buffer" } + logger.error(ex) { "cannot remove old elements in time watcher $name" } System.currentTimeMillis() } @@ -67,14 +70,14 @@ class TimerWatcher( Thread.sleep(sleepTime) } catch (e: InterruptedException) { if (running.compareAndSet(true, false)) { - logger.warn(e) { "Someone stopped timeout watcher" } + logger.warn(e) { "Someone stopped timeout watcher $name" } break } } } } } finally { - logger.info { "Timeout watcher finished" } + logger.info { "Timeout watcher $name finished" } } } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/lwdataprovider/http/AbstractHttpHandlerTest.kt b/src/test/kotlin/com/exactpro/th2/lwdataprovider/http/AbstractHttpHandlerTest.kt index ef626133..bf1efd66 100644 --- a/src/test/kotlin/com/exactpro/th2/lwdataprovider/http/AbstractHttpHandlerTest.kt +++ b/src/test/kotlin/com/exactpro/th2/lwdataprovider/http/AbstractHttpHandlerTest.kt @@ -51,6 +51,8 @@ import org.junit.jupiter.api.* import org.mockito.invocation.InvocationOnMock import org.mockito.kotlin.* import strikt.api.Assertion +import strikt.api.ExpectationBuilder +import strikt.api.expectCatching import strikt.assertions.isNotNull import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors @@ -68,9 +70,14 @@ abstract class AbstractHttpHandlerTest { .setNameFormat("test-executor-%d") .build() ) + private val converterExecutor = Executors.newSingleThreadExecutor( + ThreadFactoryBuilder() + .setNameFormat("test-conv-executor-%d") + .build() + ) protected open val configuration = Configuration( CustomConfigurationClass( - decodingTimeout = 200, + decodingTimeout = 400, ) ) @@ -120,7 +127,7 @@ abstract class AbstractHttpHandlerTest { transportMessageRouter = transportMessageRouter, eventRouter = eventRouter, execExecutor = executor, - convExecutor = executor, + convExecutor = converterExecutor, applicationName = "test-lw-data-provider", ) } @@ -129,14 +136,14 @@ abstract class AbstractHttpHandlerTest { @BeforeAll fun setup() { - context.keepAliveHandler.start() - context.timeoutHandler.start() + context.start() } @AfterAll fun shutdown() { - context.keepAliveHandler.stop() - context.timeoutHandler.stop() + context.stop() + executor.shutdown() + converterExecutor.shutdown() } @BeforeEach @@ -251,6 +258,9 @@ abstract class AbstractHttpHandlerTest { protected fun Assertion.Builder.jsonBody(): Assertion.Builder = get { body }.isNotNull().get { MAPPER.readTree(bytes()) } + protected fun Response.bodyAsJson(): JsonNode = + MAPPER.readTree(requireNotNull(body) { "empty body" }.bytes()) + protected fun HttpClient.sse(path: String, requestCfg: Request.Builder.() -> Unit = {}): Response = get(path) { requestCfg(it) it.header(Header.ACCEPT, "text/event-stream") @@ -266,5 +276,46 @@ abstract class AbstractHttpHandlerTest { const val SESSION_GROUP = "test-session-group" const val SESSION_ALIAS = "test-session-alias" const val MESSAGE_TYPE = "test-message-type" + + @JvmStatic + fun expectEventually( + timeout: Long, + delay: Long = 100, + description: String = "expected condition was not met withing $timeout mls", + action: suspend () -> Boolean, + ): Assertion.Builder> { + require(timeout > 0) { "invalid timeout value $timeout" } + require(delay > 0) { "invalid delay value $delay" } + return expectCatching { + val deadline = System.currentTimeMillis() + timeout + while (System.currentTimeMillis() < deadline) { + if (action()) { + return@expectCatching + } + Thread.sleep(delay) + } + throw IllegalStateException("condition was not met") + }.describedAs(description) + } + + @JvmStatic + fun expectNever( + timeout: Long, + delay: Long = 100, + description: String = "unexpected condition happened within $timeout mls", + action: suspend () -> Boolean, + ): Assertion.Builder> { + require(timeout > 0) { "invalid timeout value $timeout" } + require(delay > 0) { "invalid delay value $delay" } + return expectCatching { + val deadline = System.currentTimeMillis() + timeout + while (System.currentTimeMillis() < deadline) { + if (action()) { + throw IllegalStateException("condition is met") + } + Thread.sleep(delay) + } + }.describedAs(description) + } } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/lwdataprovider/http/TestTaskDownloadHandler.kt b/src/test/kotlin/com/exactpro/th2/lwdataprovider/http/TestTaskDownloadHandler.kt new file mode 100644 index 00000000..17ca5b8e --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/lwdataprovider/http/TestTaskDownloadHandler.kt @@ -0,0 +1,680 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.lwdataprovider.http + +import com.exactpro.cradle.Direction +import com.exactpro.cradle.messages.StoredGroupedMessageBatch +import com.exactpro.cradle.messages.StoredMessageIdUtils +import com.exactpro.th2.lwdataprovider.configuration.Configuration +import com.exactpro.th2.lwdataprovider.configuration.CustomConfigurationClass +import com.exactpro.th2.lwdataprovider.util.CradleResult +import com.exactpro.th2.lwdataprovider.util.GroupBatch +import com.exactpro.th2.lwdataprovider.util.SupplierResult +import com.exactpro.th2.lwdataprovider.util.createCradleStoredMessage +import com.exactpro.th2.lwdataprovider.workers.TaskStatus +import io.javalin.http.HttpStatus +import okhttp3.internal.closeQuietly +import org.junit.jupiter.api.Test +import org.mockito.kotlin.argThat +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.whenever +import strikt.api.expect +import strikt.api.expectThat +import strikt.assertions.allIndexed +import strikt.assertions.elementAt +import strikt.assertions.isEqualTo +import strikt.assertions.isNotBlank +import strikt.assertions.isNotNull +import strikt.jackson.booleanValue +import strikt.jackson.has +import strikt.jackson.hasSize +import strikt.jackson.isArray +import strikt.jackson.isBoolean +import strikt.jackson.isObject +import strikt.jackson.isTextual +import strikt.jackson.path +import strikt.jackson.textValue +import strikt.jackson.textValues +import java.time.Instant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock + +class TestTaskDownloadHandler : AbstractHttpHandlerTest() { + override fun createHandler(): TaskDownloadHandler { + return TaskDownloadHandler( + configuration, + convExecutor = context.convExecutor, + sseResponseBuilder, + keepAliveHandler = context.keepAliveHandler, + searchMessagesHandler = context.searchMessagesHandler, + context.requestsDataMeasurement, + context.taskManager, + ) + } + + override val configuration: Configuration + get() = Configuration( + CustomConfigurationClass( + decodingTimeout = 400, + batchSizeBytes = 30, + downloadTaskTTL = 500, + ) + ) + + @Test + fun `get possible statuses`() { + startTest { _, client -> + val response = client.get("/download/status") + val statuses = TaskStatus.values() + val firstTerminalIndex = TaskStatus.COMPLETED.ordinal + expectThat(response) + .jsonBody() + .isArray() + .hasSize(statuses.size) + .allIndexed { index -> + isObject() and { + path("status") + .isTextual() + .textValue() + .isEqualTo(statuses[index].name) + path("terminal") + .isBoolean() + .booleanValue() + .isEqualTo(index >= firstTerminalIndex) + path("description") + .isTextual() + .textValue() + .isNotNull() + .isNotBlank() + } + } + } + } + + @Test + fun `creates task`() { + startTest { _, client -> + val response = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to Instant.now().toEpochMilli(), + "endTimestamp" to Instant.now().plusSeconds(10).toEpochMilli(), + "groups" to setOf("group1", "group2"), + "limit" to 42, + "streams" to listOf( + mapOf( + "sessionAlias" to "test", + "directions" to setOf("FIRST"), + ), + ), + "searchDirection" to "previous", + "responseFormats" to setOf("BASE_64", "JSON_PARSED"), + "failFast" to true, + ) + ) + + expectThat(response) { + get { code } isEqualTo HttpStatus.CREATED.code + jsonBody() + .isObject() + .has("taskID") + .path("taskID") + .isTextual() + } + } + } + + @Test + fun `reports incorrect params`() { + startTest { _, client -> + val response = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "", + "startTimestamp" to Instant.now().toEpochMilli(), + "endTimestamp" to Instant.now().plusSeconds(10).toEpochMilli(), + "groups" to emptySet(), + "limit" to -5, + "searchDirection" to "previous", + "responseFormats" to setOf("PROTO_PARSED", "JSON_PARSED"), + ) + ) + + expectThat(response) { + get { code } isEqualTo HttpStatus.BAD_REQUEST.code + jsonBody() + .isObject() + .has("bookID") + .has("groups") + .has("limit") + .has("responseFormats") + } + } + } + + @Test + fun `removes existing task`() { + startTest { _, client -> + val response = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to Instant.now().toEpochMilli(), + "endTimestamp" to Instant.now().plusSeconds(10).toEpochMilli(), + "groups" to setOf("group1", "group2"), + ) + ) + val taskID = response.bodyAsJson()["taskID"].asText() + val deleteResp = client.delete( + path = "/download/$taskID" + ) + + expectThat(deleteResp) { + get { code } isEqualTo HttpStatus.NO_CONTENT.code + } + } + } + + @Test + fun `created task is removed automatically after TTL expires`() { + startTest { _, client -> + val createResp = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to Instant.now().toEpochMilli(), + "endTimestamp" to Instant.now().toEpochMilli(), + "groups" to setOf("test-group"), + "responseFormats" to setOf("BASE_64"), + ) + ) + val taskID = createResp.use { it.bodyAsJson()["taskID"].asText() } + + val downloadTaskTTL = configuration.downloadTaskTTL + expectEventually( + timeout = downloadTaskTTL + 500L, + delay = 300L, + description = "task with id $taskID is not removed due cleanup timeout", + ) { + client.get("/download/$taskID/status").use { it.code == HttpStatus.NOT_FOUND.code } + } + } + } + + @Test + fun `executing task is not removed automatically after TTL expires`() { + val start = Instant.now() + val lock = ReentrantLock() + val condition = lock.newCondition() + doReturn( + SupplierResult( + { + lock.withLock { + condition.await() + } + generateBatch(start, 1) + }, + ) + ).whenever(storage).getGroupedMessageBatches(argThat { + groupName == "test-group" && bookId.name == "test-book" + }) + + startTest { _, client -> + val createResp = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to Instant.now().toEpochMilli(), + "endTimestamp" to Instant.now().toEpochMilli(), + "groups" to setOf("test-group"), + "responseFormats" to setOf("BASE_64"), + ) + ) + val taskID = createResp.use { it.bodyAsJson()["taskID"].asText() } + val downloadInProgress = CompletableFuture.supplyAsync { + client.get("/download/$taskID") + } + + fun freeRequest() { + lock.withLock { + condition.signalAll() + } + } + + try { + val downloadTaskTTL = configuration.downloadTaskTTL + expectNever( + timeout = downloadTaskTTL + 500L, + delay = 300L, + description = "executing task with id $taskID was removed due cleanup timeout", + ) { + client.get("/download/$taskID/status").use { + it.code == HttpStatus.NOT_FOUND.code + } + } + freeRequest() + downloadInProgress.get(100, TimeUnit.MILLISECONDS).closeQuietly() + } finally { + freeRequest() + } + } + } + + @Test + fun `completed task is removed automatically after TTL expires`() { + val start = Instant.now() + doReturn( + SupplierResult( + { generateBatch(start, 1) }, + ) + ).whenever(storage).getGroupedMessageBatches(argThat { + groupName == "test-group" && bookId.name == "test-book" + }) + + startTest { _, client -> + val createResp = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to Instant.now().toEpochMilli(), + "endTimestamp" to Instant.now().toEpochMilli(), + "groups" to setOf("test-group"), + "responseFormats" to setOf("BASE_64"), + ) + ) + val taskID = createResp.use { it.bodyAsJson()["taskID"].asText() } + client.get("/download/$taskID").closeQuietly() + + val downloadTaskTTL = configuration.downloadTaskTTL + expectEventually( + timeout = downloadTaskTTL + 500L, + delay = 300L, + description = "completed task with id $taskID was not removed due cleanup timeout", + ) { + client.get("/download/$taskID/status").use { + it.code == HttpStatus.NOT_FOUND.code + } + } + } + } + + @Test + fun `checks status for existing task`() { + startTest { _, client -> + val createResp = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to Instant.now().toEpochMilli(), + "endTimestamp" to Instant.now().plusSeconds(10).toEpochMilli(), + "groups" to setOf("group1", "group2"), + ) + ) + val taskID = createResp.bodyAsJson()["taskID"].asText() + val deleteResp = client.get( + path = "/download/$taskID/status" + ) + + expectThat(deleteResp) { + get { code } isEqualTo HttpStatus.OK.code + jsonBody() + .isObject() + .apply { + path("taskID").textValue() isEqualTo taskID + path("status").textValue() isEqualTo "CREATED" + not().has("errors") + } + } + } + } + + @Test + fun `launches the existing task`() { + val start = Instant.now() + doReturn( + CradleResult( + generateBatch(start, 6) + ) + ).whenever(storage).getGroupedMessageBatches(argThat { + groupName == "test-group" && bookId.name == "test-book" + }) + + startTest { _, client -> + val createResp = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to start.toEpochMilli(), + "endTimestamp" to Instant.now().toEpochMilli(), + "groups" to setOf("test-group"), + "responseFormats" to setOf("BASE_64"), + ) + ) + val taskID = createResp.bodyAsJson()["taskID"].asText() + val response = client.get("/download/$taskID") + + val expectedTimestamp = StoredMessageIdUtils.timestampToString(start) + val seconds = start.epochSecond + val nanos = start.nano + expectThat(response) { + get { code } isEqualTo HttpStatus.OK.code + get { body?.bytes()?.toString(Charsets.UTF_8) } + .isNotNull() + .isEqualTo( + """{"timestamp":{"epochSecond":${seconds},"nano":${nanos}},"direction":"IN","sessionId":"test-0","messageType":"","attachedEventIds":[],"body":{},"bodyBase64":"aGVsbG8=","messageId":"test-book:test-0:1:${expectedTimestamp}:1"} + |{"timestamp":{"epochSecond":${seconds},"nano":${nanos}},"direction":"IN","sessionId":"test-1","messageType":"","attachedEventIds":[],"body":{},"bodyBase64":"aGVsbG8=","messageId":"test-book:test-1:1:${expectedTimestamp}:2"} + |{"timestamp":{"epochSecond":${seconds},"nano":${nanos}},"direction":"IN","sessionId":"test-2","messageType":"","attachedEventIds":[],"body":{},"bodyBase64":"aGVsbG8=","messageId":"test-book:test-2:1:${expectedTimestamp}:3"} + |{"timestamp":{"epochSecond":${seconds},"nano":${nanos}},"direction":"IN","sessionId":"test-0","messageType":"","attachedEventIds":[],"body":{},"bodyBase64":"aGVsbG8=","messageId":"test-book:test-0:1:${expectedTimestamp}:4"} + |{"timestamp":{"epochSecond":${seconds},"nano":${nanos}},"direction":"IN","sessionId":"test-1","messageType":"","attachedEventIds":[],"body":{},"bodyBase64":"aGVsbG8=","messageId":"test-book:test-1:1:${expectedTimestamp}:5"} + |{"timestamp":{"epochSecond":${seconds},"nano":${nanos}},"direction":"IN","sessionId":"test-2","messageType":"","attachedEventIds":[],"body":{},"bodyBase64":"aGVsbG8=","messageId":"test-book:test-2:1:${expectedTimestamp}:6"} + |""".trimMargin(marginPrefix = "|") + ) + } + } + } + + @Test + fun `report decoding timeout during task execution with fail fast`() { + val start = Instant.now() + doReturn( + // This a bit relies on internal implementation. + // We request first bath and the next right way. + // So, first two batches are extract almost at the same time. + // In order to emulate the delay we introduce a sleep in third batch + // And make the batch size small enough to fit only a single message + // In this case first two requests will be sent one by one + // And at the moment we process the last one the firs one is already failed + SupplierResult( + { generateBatch(start, 1) }, + { generateBatch(start, 1, index = 2) }, + { + Thread.sleep(600) // let codec timeout expire + generateBatch(start, 1, index = 3) + }, + + ) + ).whenever(storage).getGroupedMessageBatches(argThat { + groupName == "test-group" && bookId.name == "test-book" + }) + + startTest { _, client -> + val createResp = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to start.toEpochMilli(), + "endTimestamp" to Instant.now().toEpochMilli(), + "groups" to setOf("test-group"), + "responseFormats" to setOf("JSON_PARSED"), + ) + ) + val taskID = createResp.bodyAsJson()["taskID"].asText() + + val expectedTimestamp = StoredMessageIdUtils.timestampToString(start) + expect { + that(client.get("/download/$taskID")) { + get { code } isEqualTo HttpStatus.OK.code + get { body?.bytes()?.toString(Charsets.UTF_8) } + .isNotNull() + .isEqualTo( + """{"id":"test-book:test-0:1:$expectedTimestamp:1","error":"Codec response wasn\u0027t received during timeout"} + |""".trimMargin(marginPrefix = "|") + ) + } + that(client.get("/download/$taskID/status")) { + get { code } isEqualTo HttpStatus.OK.code + jsonBody() + .isObject() and { + path("status").textValue() isEqualTo "CANCELED_WITH_ERRORS" + path("errors").isArray() + .hasSize(1) + .elementAt(0) + .path("error") + .textValue() isEqualTo "{\"id\":\"test-book:test-0:1:$expectedTimestamp:1\",\"error\":\"Codec response wasn\\u0027t received during timeout\"}" + } + } + } + } + } + + @Test + fun `report decoding timeout during task execution without fail fast`() { + val start = Instant.now() + doReturn( + CradleResult( + generateBatch(start, 3) + ) + ).whenever(storage).getGroupedMessageBatches(argThat { + groupName == "test-group" && bookId.name == "test-book" + }) + + startTest { _, client -> + val createResp = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to start.toEpochMilli(), + "endTimestamp" to Instant.now().toEpochMilli(), + "groups" to setOf("test-group"), + "responseFormats" to setOf("JSON_PARSED"), + "failFast" to false, + ) + ) + val taskID = createResp.bodyAsJson()["taskID"].asText() + + val expectedTimestamp = StoredMessageIdUtils.timestampToString(start) + expect { + that(client.get("/download/$taskID")) { + get { code } isEqualTo HttpStatus.OK.code + get { body?.bytes()?.toString(Charsets.UTF_8) } + .isNotNull() + .isEqualTo( + """{"id":"test-book:test-0:1:$expectedTimestamp:1","error":"Codec response wasn\u0027t received during timeout"} + |{"id":"test-book:test-1:1:$expectedTimestamp:2","error":"Codec response wasn\u0027t received during timeout"} + |{"id":"test-book:test-2:1:$expectedTimestamp:3","error":"Codec response wasn\u0027t received during timeout"} + |""".trimMargin(marginPrefix = "|") + ) + } + that(client.get("/download/$taskID/status")) { + get { code } isEqualTo HttpStatus.OK.code + jsonBody() + .isObject() and { + path("status").textValue() isEqualTo "COMPLETED_WITH_ERRORS" + path("errors").isArray() + .hasSize(3) + .allIndexed { + path("error").textValue() isEqualTo + "{\"id\":\"test-book:test-$it:1:$expectedTimestamp:${it + 1}\",\"error\":\"Codec response wasn\\u0027t received during timeout\"}" + } + } + } + } + } + } + + @Test + fun `report error during task execution`() { + val start = Instant.now() + doReturn( + SupplierResult( + { generateBatch(start, 1) }, + { generateBatch(start, 2, index = 2) }, + { throw IllegalStateException("ignore") }, + ) + ).whenever(storage).getGroupedMessageBatches(argThat { + groupName == "test-group" && bookId.name == "test-book" + }) + + + startTest { _, client -> + val createResp = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to start.toEpochMilli(), + "endTimestamp" to Instant.now().toEpochMilli(), + "groups" to setOf("test-group"), + "responseFormats" to setOf("BASE_64"), + ) + ) + val taskID = createResp.bodyAsJson()["taskID"].asText() + + val expectedTimestamp = StoredMessageIdUtils.timestampToString(start) + val seconds = start.epochSecond + val nanos = start.nano + expect { + that(client.get("/download/$taskID")) { + get { code } isEqualTo HttpStatus.OK.code + get { body?.bytes()?.toString(Charsets.UTF_8) } + .isNotNull() + .isEqualTo( + """{"timestamp":{"epochSecond":${seconds},"nano":${nanos}},"direction":"IN","sessionId":"test-0","messageType":"","attachedEventIds":[],"body":{},"bodyBase64":"aGVsbG8=","messageId":"test-book:test-0:1:${expectedTimestamp}:1"} + |{"error":"ignore"} + |""".trimMargin(marginPrefix = "|") + ) + } + that(client.get("/download/$taskID/status")) { + get { code } isEqualTo HttpStatus.OK.code + jsonBody() + .isObject() and { + path("status").textValue() isEqualTo "CANCELED_WITH_ERRORS" + path("errors").isArray() + .hasSize(1) + .elementAt(0) + .path("error") + .textValue() isEqualTo + "{\"error\":\"ignore\"}" + } + } + } + } + } + + @Test + fun `task cannot be started twice`() { + val start = Instant.now() + doReturn( + CradleResult( + generateBatch(start, 1) + ) + ).whenever(storage).getGroupedMessageBatches(argThat { + groupName == "test-group" && bookId.name == "test-book" + }) + + startTest { _, client -> + val createResp = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to start.toEpochMilli(), + "endTimestamp" to Instant.now().toEpochMilli(), + "groups" to setOf("test-group"), + "responseFormats" to setOf("BASE_64"), + ) + ) + val taskID = createResp.bodyAsJson()["taskID"].asText() + + val expectedTimestamp = StoredMessageIdUtils.timestampToString(start) + val seconds = start.epochSecond + val nanos = start.nano + expect { + that(client.get("/download/$taskID")) { + get { code } isEqualTo HttpStatus.OK.code + get { body?.bytes()?.toString(Charsets.UTF_8) } + .isNotNull() + .isEqualTo( + """{"timestamp":{"epochSecond":${seconds},"nano":${nanos}},"direction":"IN","sessionId":"test-0","messageType":"","attachedEventIds":[],"body":{},"bodyBase64":"aGVsbG8=","messageId":"test-book:test-0:1:${expectedTimestamp}:1"} + |""".trimMargin(marginPrefix = "|") + ) + } + that(client.get("/download/$taskID")) { + get { code } isEqualTo HttpStatus.CONFLICT.code + jsonBody() + .isObject() + .path("error") + .textValue() isEqualTo "task with id '$taskID' already in progress" + } + } + } + } + + @Test + fun `task cannot be started once removed`() { + + startTest { _, client -> + val createResp = client.post( + path = "/download", + json = mapOf( + "resource" to "MESSAGES", + "bookID" to "test-book", + "startTimestamp" to Instant.now().toEpochMilli(), + "endTimestamp" to Instant.now().toEpochMilli(), + "groups" to setOf("test-group"), + "responseFormats" to setOf("BASE_64"), + ) + ) + val taskID = createResp.bodyAsJson()["taskID"].asText() + client.delete("/download/$taskID") + + expect { + that(client.get("/download/$taskID")) { + get { code } isEqualTo HttpStatus.NOT_FOUND.code + jsonBody() + .isObject() + .path("error") + .textValue() isEqualTo "task with id '$taskID' is not found" + } + } + } + } + + private fun generateBatch(start: Instant, count: Int, index: Long = 1L): StoredGroupedMessageBatch { + var startIndex = index + return GroupBatch( + "test-group", + book = "test-book", + messages = buildList { + repeat(count) { + add( + createCradleStoredMessage( + "test-${it % 3}", + Direction.FIRST, + startIndex++, + timestamp = start, + book = "test-book", + ) + ) + } + }, + ) + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/CradleTestUtil.kt b/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/CradleTestUtil.kt index f5a17d33..ead13bca 100644 --- a/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/CradleTestUtil.kt +++ b/src/test/kotlin/com/exactpro/th2/lwdataprovider/util/CradleTestUtil.kt @@ -27,6 +27,7 @@ import com.exactpro.cradle.resultset.CradleResultSet import com.exactpro.cradle.testevents.StoredTestEventId import com.exactpro.cradle.testevents.StoredTestEventSingle import com.exactpro.cradle.testevents.TestEventSingleToStore +import org.apache.logging.log4j.util.Supplier import java.time.Instant fun createCradleStoredMessage( @@ -109,9 +110,24 @@ class ImmutableListCradleResult(collection: Collection) : CradleResultSet< override fun next(): T = iterator.next() } +class SupplierCradleResult( + suppliers: Collection>, +) : CradleResultSet { + private val iterator: Iterator> = suppliers.iterator() + override fun remove() = throw UnsupportedOperationException() + + override fun hasNext(): Boolean = iterator.hasNext() + + override fun next(): T = iterator.next().get() + +} + @Suppress("TestFunctionName") fun CradleResult(vararg data: T): CradleResultSet = ImmutableListCradleResult(data.toList()) +@Suppress("TestFunctionName") +fun SupplierResult(vararg suppliers: Supplier): CradleResultSet = SupplierCradleResult(suppliers.toList()) + const val TEST_SESSION_GROUP = "test-group" const val TEST_SESSION_ALIAS = "test-alias"