Skip to content

Commit

Permalink
[TH2-5139] Add API for submitting download task and checking its stat…
Browse files Browse the repository at this point in the history
…us (#77)

* Extract json stream processing into separate method

* Add task handler

* Add opeapi annotations to generate proper description

* Add negative tests

* Add fail-fast property to download task request

* Add env variable to configure cotext path

* Use context path only for redoc

* Rename env variable

* Update version and readme

* Register handler in server

* Add missing copyright

* Extract task management into a separate object. Add cleanup thread for tasks

* Update readme

* Add info logging for requests and enable debug loggin on trace level

* Correct text in tests

* Add endpoint for getting all the task statuses

* Add method name to request logger

* Change loggin level for fail-fast codec error
  • Loading branch information
OptimumCode authored Jan 31, 2024
1 parent 40fc03a commit ff8091b
Show file tree
Hide file tree
Showing 20 changed files with 1,765 additions and 98 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Lightweight data provider (2.5.2)
# Lightweight data provider (2.6.0)

# Overview
This component serves as a data provider for [th2-data-services](https://github.com/th2-net/th2-data-services). It will connect to the cassandra database via [cradle api](https://github.com/th2-net/cradleapi) and expose the data stored in there as REST resources.
Expand Down Expand Up @@ -224,6 +224,16 @@ spec:
# Release notes:
## 2.6.0
+ Add download task endpoints:
+ POST `/download` - register task
+ GET `/download/{taskID}` - execute task
+ GET `/download/{taskID}/status` - get task status
+ DELETE `/download/{taskID}` - remove task
+ Add parameter `downloadTaskTTL` to clean up the completed or not started task after the specified time in milliseconds. 1 hour by default.
+ Add `EXTERNAL_CONTEXT_PATH` env variable to inform provider about external context that is used in requests

## 2.5.2

+ Fix possible deadlock in case the response queue is filled up with keep-alive events
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

kotlin.code.style=official

release_version=2.5.2
release_version=2.6.0
description='th2 Lightweight data provider component'
vcs_url=https://github.com/th2-net/th2-lw-data-provider
docker_image_name=
Expand Down
19 changes: 17 additions & 2 deletions src/main/kotlin/com/exactpro/th2/lwdataprovider/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import com.exactpro.th2.lwdataprovider.handlers.SearchEventsHandler
import com.exactpro.th2.lwdataprovider.handlers.SearchMessagesHandler
import com.exactpro.th2.lwdataprovider.metrics.DataMeasurementHistogram
import com.exactpro.th2.lwdataprovider.workers.KeepAliveHandler
import com.exactpro.th2.lwdataprovider.workers.TaskManager
import com.exactpro.th2.lwdataprovider.workers.TimerWatcher
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
Expand Down Expand Up @@ -67,7 +68,7 @@ class Context(
configuration.codecUsePinAttributes
),

val timeoutHandler: TimerWatcher = TimerWatcher(mqDecoder, configuration),
val timeoutHandler: TimerWatcher = TimerWatcher(mqDecoder, configuration.decodingTimeout, "decoding"),
val cradleEventExtractor: CradleEventExtractor = CradleEventExtractor(
cradleManager,
DataMeasurementHistogram.create(registry, "cradle event")
Expand Down Expand Up @@ -108,8 +109,22 @@ class Context(
execExecutor,
),
val generalCradleHandler: GeneralCradleHandler = GeneralCradleHandler(generalCradleExtractor, execExecutor),
val applicationName: String
val applicationName: String,
val taskManager: TaskManager = TaskManager(),
val taskWatcher: TimerWatcher = TimerWatcher(taskManager, configuration.downloadTaskTTL, "tasks"),
) {

fun start() {
timeoutHandler.start()
keepAliveHandler.start()
taskWatcher.start()
}

fun stop() {
taskWatcher.stop()
keepAliveHandler.stop()
timeoutHandler.stop()
}
companion object {
@JvmStatic
fun createObjectMapper(): ObjectMapper = jacksonObjectMapper()
Expand Down
6 changes: 2 additions & 4 deletions src/main/kotlin/com/exactpro/th2/lwdataprovider/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,9 @@ class Main {
private fun startServer() {
setupMetrics(context)

context.keepAliveHandler.start()
context.timeoutHandler.start()
context.start()

resources += AutoCloseable { context.keepAliveHandler.stop() }
resources += AutoCloseable { context.timeoutHandler.stop() }
resources += AutoCloseable { context.stop() }

@Suppress("LiftReturnOrAssignment")
when (context.configuration.mode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.exactpro.th2.lwdataprovider.entities.internal.ResponseFormat
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import mu.KotlinLogging
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.math.max
import kotlin.math.min

Expand Down Expand Up @@ -47,6 +48,7 @@ class CustomConfigurationClass(
val gzipCompressionLevel: Int? = null,
@JsonDeserialize(using = ByteSizeDeserializer::class)
val batchSizeBytes: Int? = null,
val downloadTaskTTL: Long? = null,
)

class Configuration(customConfiguration: CustomConfigurationClass) {
Expand Down Expand Up @@ -76,6 +78,7 @@ class Configuration(customConfiguration: CustomConfigurationClass) {
val flushSseAfter: Int = VariableBuilder.getVariable(customConfiguration::flushSseAfter, 0)
val gzipCompressionLevel: Int = VariableBuilder.getVariable(customConfiguration::gzipCompressionLevel, -1)
val batchSizeBytes: Int = VariableBuilder.getVariable(customConfiguration::batchSizeBytes, 256 * 1024)
val downloadTaskTTL: Long = VariableBuilder.getVariable(customConfiguration::downloadTaskTTL, TimeUnit.HOURS.toMillis(1))
init {
require(bufferPerQuery <= maxBufferDecodeQueue) {
"buffer per queue ($bufferPerQuery) must be less or equal to the total buffer size ($maxBufferDecodeQueue)"
Expand All @@ -96,6 +99,7 @@ class Configuration(customConfiguration: CustomConfigurationClass) {
require(gzipCompressionLevel <= 9 && gzipCompressionLevel >= -1) {
"gzipCompressionLevel must be integer in the [-1, 9] range"
}
require(downloadTaskTTL > 0) { "negative download task TTL: $downloadTaskTTL" }
}

override fun toString(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,11 @@ enum class ResponseFormat {
error("only one parsed format can be specified in $formats")
}
}

@JvmStatic
fun isValidCombination(formats: Set<ResponseFormat>): Boolean {
if (formats.size <= 1) return true
return !(PROTO_PARSED in formats && JSON_PARSED in formats)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ data class MessagesGroupRequest(
val responseFormats: Set<ResponseFormat>? = null,
val includeStreams: Set<ProviderMessageStream> = emptySet(),
val limit: Int? = null,
val searchDirection: SearchDirection = SearchDirection.next
val searchDirection: SearchDirection = SearchDirection.next,
val failFast: Boolean = false, // for backward compatibility
) {
init {
if (!responseFormats.isNullOrEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import com.exactpro.th2.lwdataprovider.entities.requests.SearchDirection
import com.exactpro.th2.lwdataprovider.entities.requests.util.convertToMessageStreams
import com.exactpro.th2.lwdataprovider.entities.responses.ProviderMessage53
import com.exactpro.th2.lwdataprovider.handlers.SearchMessagesHandler
import com.exactpro.th2.lwdataprovider.http.util.JSON_STREAM_CONTENT_TYPE
import com.exactpro.th2.lwdataprovider.http.util.listQueryParameters
import com.exactpro.th2.lwdataprovider.http.util.writeJsonStream
import com.exactpro.th2.lwdataprovider.metrics.HttpWriteMetrics
import com.exactpro.th2.lwdataprovider.metrics.ResponseQueue
import com.exactpro.th2.lwdataprovider.workers.KeepAliveHandler
Expand Down Expand Up @@ -188,76 +190,12 @@ class FileDownloadHandler(
)
keepAliveHandler.addKeepAliveData(handler).use {
searchMessagesHandler.loadMessageGroups(request, handler, dataMeasurement)
processData(ctx, queue, handler)
writeJsonStream(ctx, queue, handler, dataMeasurement, LOGGER)
LOGGER.info { "Processing search sse messages group request finished" }
}
}

private fun processData(
ctx: Context,
queue: ArrayBlockingQueue<Supplier<SseEvent>>,
handler: HttpMessagesRequestHandler
) {
val matchedPath = ctx.matchedPath()
var dataSent = 0

var writeHeader = true
var status: HttpStatus = HttpStatus.OK

fun writeHeader() {
if (writeHeader) {
ctx.status(status)
.contentType(JSON_STREAM_CONTENT_TYPE)
.header(Header.TRANSFER_ENCODING, "chunked")
writeHeader = false
}
}

val output = ctx.res().outputStream.buffered()
try {
do {
dataMeasurement.start("process_sse_event").use {
val nextEvent = queue.take()
ResponseQueue.currentSize(matchedPath, queue.size)
val sseEvent = dataMeasurement.start("await_convert_to_json").use { nextEvent.get() }
if (writeHeader && sseEvent is SseEvent.ErrorData.SimpleError) {
// something happened during request
status = HttpStatus.INTERNAL_SERVER_ERROR
}
writeHeader()
when (sseEvent.event) {
EventType.KEEP_ALIVE -> output.flush()
EventType.CLOSE -> {
LOGGER.info { "Received close event" }
return
}

else -> {
LOGGER.debug {
"Write event to output: ${
StringUtils.abbreviate(sseEvent.data.toString(DATA_CHARSET), 100)
}"
}
output.write(sseEvent.data)
output.write('\n'.code)
dataSent++
}
}
}
} while (true)
} catch (ex: Exception) {
LOGGER.error(ex) { "cannot process next event" }
handler.cancel()
queue.clear()
} finally {
HttpWriteMetrics.messageSent(matchedPath, dataSent)
runCatching { output.flush() }
.onFailure { LOGGER.error(it) { "cannot flush the remaining data when processing is finished" } }
}
}

companion object {
private const val JSON_STREAM_CONTENT_TYPE = "application/stream+json"
private const val GROUP_PARAM = "group"
private const val START_TIMESTAMP_PARAM = "startTimestamp"
private const val END_TIMESTAMP_PARAM = "endTimestamp"
Expand Down
45 changes: 38 additions & 7 deletions src/main/kotlin/com/exactpro/th2/lwdataprovider/http/HttpServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ import com.exactpro.th2.lwdataprovider.SseResponseBuilder
import com.exactpro.th2.lwdataprovider.entities.exceptions.InvalidRequestException
import com.exactpro.th2.lwdataprovider.entities.internal.ProviderEventId
import com.exactpro.th2.lwdataprovider.entities.requests.SearchDirection
import com.exactpro.th2.lwdataprovider.http.serializers.BookIdDeserializer
import com.exactpro.th2.lwdataprovider.producers.MessageProducer
import com.exactpro.th2.lwdataprovider.producers.MessageProducer53
import com.exactpro.th2.lwdataprovider.producers.MessageProducer53Transport
import com.exactpro.th2.lwdataprovider.workers.TaskManager
import com.fasterxml.jackson.databind.module.SimpleModule
import io.javalin.Javalin
import io.javalin.config.JavalinConfig
import io.javalin.http.BadRequestResponse
Expand Down Expand Up @@ -131,23 +134,46 @@ class HttpServer(private val context: Context) {
context.searchMessagesHandler,
context.requestsDataMeasurement,
),
TaskDownloadHandler(
configuration,
context.convExecutor,
sseResponseBuilder,
context.keepAliveHandler,
context.searchMessagesHandler,
context.requestsDataMeasurement,
context.taskManager,
),
)

app = Javalin.create {
it.showJavalinBanner = false
it.jsonMapper(JavalinJackson(jacksonMapper))
// it.plugins.enableDevLogging()
it.jsonMapper(JavalinJackson(
jacksonMapper.registerModule(
SimpleModule("th2").apply {
addDeserializer(BookId::class.java, BookIdDeserializer())
}
)
))
if (logger.isTraceEnabled) {
it.plugins.enableDevLogging()
} else {
it.requestLogger.http { ctx, time ->
logger.info { "Request ${ctx.method().name} '${ctx.path()}' executed with status ${ctx.status()}: ${time}.ms" }
}
}
it.plugins.register(MicrometerPlugin.create { micrometer ->
micrometer.registry =
PrometheusMeterRegistry(PrometheusConfig.DEFAULT, CollectorRegistry.defaultRegistry, Clock.SYSTEM)
micrometer.tags = listOf(Tag.of("application", context.applicationName))
})

setupOpenApi(it)
val externalContextPath = System.getenv(EXTERNAL_CONTEXT_PATH_ENV)?.takeUnless(String::isBlank)

setupOpenApi(it, externalContextPath)

// setupSwagger(it)

setupReDoc(it)
setupReDoc(it, externalContextPath)
}.apply {
setupConverters(this)
val javalinContext = JavalinContext(configuration.flushSseAfter)
Expand All @@ -168,8 +194,11 @@ class HttpServer(private val context: Context) {
logger.info { "http server stopped" }
}

private fun setupReDoc(it: JavalinConfig) {
private fun setupReDoc(it: JavalinConfig, externalContextPath: String?) {
val reDocConfiguration = ReDocConfiguration()
externalContextPath?.also { path ->
reDocConfiguration.basePath = path
}
it.plugins.register(ReDocPlugin(reDocConfiguration))
}

Expand All @@ -178,7 +207,7 @@ class HttpServer(private val context: Context) {
// it.plugins.register(SwaggerPlugin(swaggerConfiguration))
// }

private fun setupOpenApi(it: JavalinConfig) {
private fun setupOpenApi(it: JavalinConfig, externalContextPath: String?) {

val openApiConfiguration = OpenApiPluginConfiguration()
.withDefinitionConfiguration { _, definition ->
Expand All @@ -197,7 +226,7 @@ class HttpServer(private val context: Context) {
"Light Weight Data Provider provides you with fast access to data in Cradle"
openApiInfo.contact = openApiContact
openApiInfo.license = openApiLicense
openApiInfo.version = "2.2.0"
openApiInfo.version = "2.6.0"
}.withServer { openApiServer ->
openApiServer.url = "http://localhost:{port}"
openApiServer.addVariable("port", "8080", arrayOf("8080"), "Port of the server")
Expand All @@ -209,6 +238,8 @@ class HttpServer(private val context: Context) {
companion object {
private val logger = KotlinLogging.logger {}

private const val EXTERNAL_CONTEXT_PATH_ENV = "EXTERNAL_CONTEXT_PATH"

const val TIME_EXAMPLE =
"Every value that is greater than 1_000_000_000 ^ 2 will be interpreted as nanos. Otherwise, as millis.\n" +
"Millis: 1676023329533, Nanos: 1676023329533590976"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.exactpro.th2.lwdataprovider.handlers.MessageResponseHandler
import com.exactpro.th2.lwdataprovider.metrics.HttpWriteMetrics
import com.exactpro.th2.lwdataprovider.producers.JsonFormatter
import com.exactpro.th2.lwdataprovider.producers.ParsedFormats
import mu.KotlinLogging
import org.apache.commons.lang3.exception.ExceptionUtils
import java.time.Duration
import java.util.EnumSet
Expand All @@ -49,6 +50,7 @@ class HttpMessagesRequestHandler(
dataMeasurement: DataMeasurement,
maxMessagesPerRequest: Int = 0,
responseFormats: Set<ResponseFormat> = EnumSet.of(ResponseFormat.BASE_64, ResponseFormat.PROTO_PARSED),
private val failFast: Boolean = false,
) : MessageResponseHandler(dataMeasurement, maxMessagesPerRequest), KeepAliveListener {
private val includeRaw: Boolean = responseFormats.isEmpty() || ResponseFormat.BASE_64 in responseFormats
private val jsonFormatter: JsonFormatter? = (responseFormats - ResponseFormat.BASE_64).run {
Expand All @@ -71,7 +73,15 @@ class HttpMessagesRequestHandler(
val future: CompletableFuture<SseEvent> = data.completed.thenApplyAsync({ requestedMessage: RequestedMessage ->
dataMeasurement.start("convert_to_json").use {
if (jsonFormatter != null && requestedMessage.protoMessage == null && requestedMessage.transportMessage == null) {
builder.codecTimeoutError(requestedMessage.storedMessage.id, counter)
builder.codecTimeoutError(requestedMessage.storedMessage.id, counter).also {
if (failFast) {
LOGGER.warn { "Codec timeout. Canceling processing due to fail-fast strategy" }
// note that this might not stop the processing right away
// this is called on a different thread
// so some messages might be loaded before the processing is canceled
fail()
}
}
} else {
builder.build(
requestedMessage, jsonFormatter, includeRaw,
Expand Down Expand Up @@ -105,6 +115,15 @@ class HttpMessagesRequestHandler(
val counter = indexer.nextIndex()
return buffer.offer({ builder.build(scannedObjectInfo, counter) }, duration.toMillis(), TimeUnit.MILLISECONDS)
}

private fun fail() {
cancel()
buffer.put { SseEvent.Closed }
}

companion object {
private val LOGGER = KotlinLogging.logger { }
}
}

class DataIndexer {
Expand Down
Loading

0 comments on commit ff8091b

Please sign in to comment.