Skip to content

Commit

Permalink
Add new configuration parameter: keepOpenPullingTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
purplegen committed Nov 22, 2024
1 parent f46103f commit 61b35e0
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 1 deletion.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ spec:
# * 0: no compression
# * 1: best speed
# * 9: best compression
# keepOpenPullingTimeout: 100 # time to wait in between attempts to get new data from data storage when `keepOpen` request parameter is used.

pins: # pins are used to communicate with codec components to parse message data
- name: to_codec
Expand Down Expand Up @@ -182,6 +183,7 @@ spec:
# validateCradleData: false # validate data loaded from cradle. NOTE: Enabled validation affect performance
# codecUsePinAttributes: true # send raw message to specified codec (true) or send to all codecs (false)
# responseFormats: string list # resolve data for selected formats only. (allowed values: BASE_64, PARSED)
# keepOpenPullingTimeout: 100 # time to wait in between attempts to get new data from data storage when `keepOpen` request parameter is used.


# pins are used to communicate with codec components to parse message data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class CustomConfigurationClass(
@JsonDeserialize(using = ByteSizeDeserializer::class)
val batchSizeBytes: Int? = null,
val downloadTaskTTL: Long? = null,
val keepOpenPullingTimeoutMs: Long? = null
)

class Configuration(customConfiguration: CustomConfigurationClass) {
Expand Down Expand Up @@ -79,6 +80,8 @@ class Configuration(customConfiguration: CustomConfigurationClass) {
val gzipCompressionLevel: Int = VariableBuilder.getVariable(customConfiguration::gzipCompressionLevel, -1)
val batchSizeBytes: Int = VariableBuilder.getVariable(customConfiguration::batchSizeBytes, 256 * 1024)
val downloadTaskTTL: Long = VariableBuilder.getVariable(customConfiguration::downloadTaskTTL, TimeUnit.HOURS.toMillis(1))
val keepOpenPullingTimeoutMs: Long = VariableBuilder.getVariable(customConfiguration::keepOpenPullingTimeoutMs, 10)

init {
require(bufferPerQuery <= maxBufferDecodeQueue) {
"buffer per queue ($bufferPerQuery) must be less or equal to the total buffer size ($maxBufferDecodeQueue)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class SearchMessagesHandler(
val allLoaded = hashSetOf<Stream>()
do {
val continuePulling = pullUpdates(request, order, sink, allLoaded)
Thread.sleep(configuration.keepOpenPullingTimeoutMs)
} while (continuePulling)
}
}
Expand Down Expand Up @@ -293,13 +294,14 @@ class SearchMessagesHandler(
}
val lastTimestamp: Instant = request.endTimestamp
val allGroupLoaded = hashSetOf<String>()
// TODO: Maybe we need to wait between pulling attempts to not overwhelm storage with requests

do {
val keepPulling = pullUpdates(request, lastTimestamp, sink, parameters, allGroupLoaded)
sink.canceled?.apply {
logger.info { "request canceled: $message" }
return@use
}
Thread.sleep(configuration.keepOpenPullingTimeoutMs)
} while (keepPulling)
}
} catch (ex: Exception) {
Expand Down

0 comments on commit 61b35e0

Please sign in to comment.