From 3052f77c20ceb743b49146ab6032cadb1fa8931c Mon Sep 17 00:00:00 2001 From: Nikita Shaposhnikov Date: Thu, 8 Sep 2022 14:26:50 +0400 Subject: [PATCH 01/10] batcher for performance improvements --- README.md | 12 +++- build.gradle | 1 + gradle.properties | 2 +- .../kotlin/com/exactpro/th2/ws/client/Main.kt | 55 ++++++++++++++++--- .../th2/ws/client/util/MessageUtil.kt | 16 ++---- 5 files changed, 62 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 876850a..88927a6 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# WebSocket Client v0.3.1 +# WebSocket Client v0.3.2 This microservice allows sending and receiving messages via WebSocket protocol @@ -86,13 +86,15 @@ metadata: name: ws-client spec: image-name: ghcr.io/th2-net/th2-conn-ws-client - image-version: 0.3.1 + image-version: 0.3.2 custom-config: uri: wss://echo.websocket.org sessionAlias: api_session grpcStartControl: true autoStart: true autoStopAfter: 300 + maxBatchSize: 100 + maxFlushTime: 1000 handlerSettings: pingInterval: 30000 type: th2-conn @@ -125,6 +127,12 @@ spec: ## Changelog +### v0.3.2 + +#### Added: +* batching for messages (dependency on common-utils) +* batching for events (dependency on common-utils) + ### v0.3.1 #### Added: diff --git a/build.gradle b/build.gradle index 0da5f4d..da94df9 100644 --- a/build.gradle +++ b/build.gradle @@ -166,6 +166,7 @@ dependencies { api platform('com.exactpro.th2:bom:3.1.0') implementation 'com.exactpro.th2:common:3.40.0' + implementation 'com.exactpro.th2:common-utils:0.0.1-dev-3014065584-SNAPSHOT' implementation 'com.exactpro.th2:grpc-conn:0.0.1' implementation 'org.slf4j:slf4j-log4j12' diff --git a/gradle.properties b/gradle.properties index 11437ba..32ed106 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ kotlin.code.style=official kotlin_version=1.4.10 -release_version=0.3.1 +release_version=0.3.2 description='Websocket Client' vcs_url=https://github.com/th2-net/th2-conn-ws-client diff --git a/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt b/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt index 8adef5b..f258c53 100644 --- a/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt +++ b/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt @@ -19,6 +19,7 @@ package com.exactpro.th2.ws.client import com.exactpro.th2.common.event.Event +import com.exactpro.th2.common.event.EventUtils import com.exactpro.th2.common.grpc.ConnectionID import com.exactpro.th2.common.grpc.Direction import com.exactpro.th2.common.grpc.EventBatch @@ -27,8 +28,9 @@ import com.exactpro.th2.common.schema.factory.CommonFactory import com.exactpro.th2.common.schema.grpc.router.GrpcRouter import com.exactpro.th2.common.schema.message.MessageListener import com.exactpro.th2.common.schema.message.MessageRouter -import com.exactpro.th2.common.schema.message.QueueAttribute import com.exactpro.th2.common.schema.message.storeEvent +import com.exactpro.th2.common.utils.event.EventBatcher +import com.exactpro.th2.common.utils.message.RawMessageBatcher import com.exactpro.th2.ws.client.Settings.FrameType.TEXT import com.exactpro.th2.ws.client.api.IClient import com.exactpro.th2.ws.client.api.IHandler @@ -37,16 +39,18 @@ import com.exactpro.th2.ws.client.api.IHandlerSettingsTypeProvider import com.exactpro.th2.ws.client.api.impl.DefaultHandler import com.exactpro.th2.ws.client.api.impl.DefaultHandlerSettingsTypeProvider import com.exactpro.th2.ws.client.api.impl.WebSocketClient -import com.exactpro.th2.ws.client.util.toBatch import com.exactpro.th2.ws.client.util.toPrettyString +import com.exactpro.th2.ws.client.util.toRawMessage import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.module.kotlin.KotlinModule import mu.KotlinLogging +import org.apache.commons.lang3.exception.ExceptionUtils import java.net.URI import java.time.Instant import java.util.ServiceLoader import java.util.concurrent.ConcurrentLinkedDeque +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.locks.ReentrantLock @@ -121,16 +125,28 @@ fun run( val incomingSequence = createSequence() val outgoingSequence = createSequence() - //TODO: add batching (by size or time) + val scheduledExecutorService = Executors.newScheduledThreadPool(1).also { + registerResource("Batcher scheduled executor", it::shutdownNow) + } + + val batcher = RawMessageBatcher(settings.maxBatchSize, settings.maxFlushTime, { + it.metadataOrBuilder.id.direction + }, scheduledExecutorService, messageRouter::send) + val onMessage = { message: ByteArray, _: Boolean, direction: Direction -> - val sequence = if (direction == Direction.FIRST) incomingSequence else outgoingSequence - val attribute = if (direction == Direction.FIRST) QueueAttribute.FIRST else QueueAttribute.SECOND - messageRouter.send(message.toBatch(connectionId, direction, sequence()), attribute.toString()) + batcher.onMessage(message.toRawMessage( + connectionId, + direction, + (if (direction == Direction.FIRST) incomingSequence else outgoingSequence)() + )) + } + + val eventBatcher = EventBatcher(settings.maxBatchSize, settings.maxFlushTime, scheduledExecutorService, eventRouter::send).also { + registerResource("Event batcher", it::close) } val onEvent = { cause: Throwable?, message: () -> String -> - val type = if (cause != null) "Error" else "Info" - eventRouter.storeEvent(rootEventId, message(), type, cause) + eventBatcher.storeEvent(message(), cause, rootEventId) } val client = WebSocketClient( @@ -190,7 +206,9 @@ data class Settings( val handlerSettings: IHandlerSettings? = null, val grpcStartControl: Boolean = false, val autoStart: Boolean = true, - val autoStopAfter: Int = 0 + val autoStopAfter: Int = 0, + val maxBatchSize: Int = 100, + val maxFlushTime: Long = 1000 ) { enum class FrameType { TEXT { @@ -218,3 +236,22 @@ private inline fun load(defaultImpl: Class): T { private fun createSequence(): () -> Long = Instant.now().run { AtomicLong(epochSecond * SECONDS.toNanos(1) + nano) }::incrementAndGet + +fun EventBatcher.storeEvent(name: String, cause: Throwable?, parentEventId: String) { + val event = createEvent(name, cause) + onEvent(event.toProtoEvent(parentEventId)) +} + +fun createEvent( + name: String, + cause: Throwable? = null +): Event = Event.start().apply { + endTimestamp() + name(name) + type(if (cause != null) "Error" else "Info") + status(if (cause != null) Event.Status.FAILED else Event.Status.PASSED) + + generateSequence(cause, Throwable::cause).forEach { error -> + bodyData(EventUtils.createMessageBean(ExceptionUtils.getMessage(error))) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/ws/client/util/MessageUtil.kt b/src/main/kotlin/com/exactpro/th2/ws/client/util/MessageUtil.kt index 1de4eb0..84435c7 100644 --- a/src/main/kotlin/com/exactpro/th2/ws/client/util/MessageUtil.kt +++ b/src/main/kotlin/com/exactpro/th2/ws/client/util/MessageUtil.kt @@ -18,11 +18,8 @@ package com.exactpro.th2.ws.client.util -import com.exactpro.th2.common.grpc.AnyMessage import com.exactpro.th2.common.grpc.ConnectionID import com.exactpro.th2.common.grpc.Direction -import com.exactpro.th2.common.grpc.MessageGroup -import com.exactpro.th2.common.grpc.MessageGroupBatch import com.exactpro.th2.common.grpc.RawMessage import com.exactpro.th2.common.message.toTimestamp import com.google.protobuf.ByteString @@ -35,17 +32,12 @@ private inline operator fun T.invoke(block: T.() -> Unit) = apply( fun MessageOrBuilder.toPrettyString(): String = JsonFormat.printer().omittingInsignificantWhitespace().includingDefaultValueFields().print(this) -private fun RawMessage.Builder.toBatch() = run(AnyMessage.newBuilder()::setRawMessage) - .run(MessageGroup.newBuilder()::addMessages) - .run(MessageGroupBatch.newBuilder()::addGroups) - .build() - -fun ByteArray.toBatch( +fun ByteArray.toRawMessage( connectionId: ConnectionID, direction: Direction, sequence: Long, -): MessageGroupBatch = RawMessage.newBuilder().apply { - this.body = ByteString.copyFrom(this@toBatch) +): RawMessage.Builder = RawMessage.newBuilder().apply { + this.body = ByteString.copyFrom(this@toRawMessage) this.metadataBuilder { this.timestamp = Instant.now().toTimestamp() this.idBuilder { @@ -54,4 +46,4 @@ fun ByteArray.toBatch( this.sequence = sequence } } -}.toBatch() +} From f90a58d1663ece78b35df6e1e0bbd0d69a044ef8 Mon Sep 17 00:00:00 2001 From: Nikita Shaposhnikov Date: Thu, 8 Sep 2022 17:37:56 +0400 Subject: [PATCH 02/10] raw message batcher into resource --- src/main/kotlin/com/exactpro/th2/ws/client/Main.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt b/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt index f258c53..56092b2 100644 --- a/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt +++ b/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt @@ -131,7 +131,9 @@ fun run( val batcher = RawMessageBatcher(settings.maxBatchSize, settings.maxFlushTime, { it.metadataOrBuilder.id.direction - }, scheduledExecutorService, messageRouter::send) + }, scheduledExecutorService, messageRouter::send).also { + registerResource("Raw message batcher", it::close) + } val onMessage = { message: ByteArray, _: Boolean, direction: Direction -> batcher.onMessage(message.toRawMessage( From 8a91849ceebc2167f13b6a12af9fbdf789ae35c6 Mon Sep 17 00:00:00 2001 From: Nikita Shaposhnikov Date: Tue, 13 Sep 2022 05:02:31 +0400 Subject: [PATCH 03/10] LOG4J version update --- build.gradle | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index da94df9..0e20030 100644 --- a/build.gradle +++ b/build.gradle @@ -169,8 +169,9 @@ dependencies { implementation 'com.exactpro.th2:common-utils:0.0.1-dev-3014065584-SNAPSHOT' implementation 'com.exactpro.th2:grpc-conn:0.0.1' - implementation 'org.slf4j:slf4j-log4j12' - implementation 'org.slf4j:slf4j-api' + implementation 'org.apache.logging.log4j:log4j-slf4j-impl' + implementation 'org.apache.logging.log4j:log4j-1.2-api' + implementation 'org.apache.logging.log4j:log4j-api' implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: kotlin_version implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect', version: kotlin_version From 4bf914e475f54f9f865ca97855d8626d368f1573 Mon Sep 17 00:00:00 2001 From: Nikita Shaposhnikov Date: Tue, 13 Sep 2022 05:14:08 +0400 Subject: [PATCH 04/10] LOG4J version update --- build.gradle | 5 ++--- gradle.properties | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index 0e20030..3a318a5 100644 --- a/build.gradle +++ b/build.gradle @@ -169,9 +169,8 @@ dependencies { implementation 'com.exactpro.th2:common-utils:0.0.1-dev-3014065584-SNAPSHOT' implementation 'com.exactpro.th2:grpc-conn:0.0.1' - implementation 'org.apache.logging.log4j:log4j-slf4j-impl' - implementation 'org.apache.logging.log4j:log4j-1.2-api' - implementation 'org.apache.logging.log4j:log4j-api' + implementation group: 'org.slf4j', name: 'slf4j-api', version: slf4j_version + implementation group: 'org.slf4j', name: 'slf4j-log4j12', version: slf4j_version implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: kotlin_version implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect', version: kotlin_version diff --git a/gradle.properties b/gradle.properties index 32ed106..279c5d2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,6 @@ kotlin.code.style=official kotlin_version=1.4.10 +slf4j_version=1.7.32 release_version=0.3.2 description='Websocket Client' vcs_url=https://github.com/th2-net/th2-conn-ws-client From 10efdda3ac8d9ddda30aa6f32194a2549d7a76bb Mon Sep 17 00:00:00 2001 From: Nikita Shaposhnikov Date: Fri, 30 Sep 2022 17:05:18 +0400 Subject: [PATCH 05/10] fixed error with published message --- build.gradle | 2 +- src/main/kotlin/com/exactpro/th2/ws/client/Main.kt | 12 +++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 3a318a5..6ed4f74 100644 --- a/build.gradle +++ b/build.gradle @@ -166,7 +166,7 @@ dependencies { api platform('com.exactpro.th2:bom:3.1.0') implementation 'com.exactpro.th2:common:3.40.0' - implementation 'com.exactpro.th2:common-utils:0.0.1-dev-3014065584-SNAPSHOT' + implementation 'com.exactpro.th2:common-utils:0.0.1-dev-3139295943-SNAPSHOT' implementation 'com.exactpro.th2:grpc-conn:0.0.1' implementation group: 'org.slf4j', name: 'slf4j-api', version: slf4j_version diff --git a/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt b/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt index 56092b2..511d198 100644 --- a/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt +++ b/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt @@ -28,9 +28,11 @@ import com.exactpro.th2.common.schema.factory.CommonFactory import com.exactpro.th2.common.schema.grpc.router.GrpcRouter import com.exactpro.th2.common.schema.message.MessageListener import com.exactpro.th2.common.schema.message.MessageRouter +import com.exactpro.th2.common.schema.message.QueueAttribute import com.exactpro.th2.common.schema.message.storeEvent import com.exactpro.th2.common.utils.event.EventBatcher import com.exactpro.th2.common.utils.message.RawMessageBatcher +import com.exactpro.th2.common.utils.message.direction import com.exactpro.th2.ws.client.Settings.FrameType.TEXT import com.exactpro.th2.ws.client.api.IClient import com.exactpro.th2.ws.client.api.IHandler @@ -131,7 +133,15 @@ fun run( val batcher = RawMessageBatcher(settings.maxBatchSize, settings.maxFlushTime, { it.metadataOrBuilder.id.direction - }, scheduledExecutorService, messageRouter::send).also { + }, scheduledExecutorService, { + when (it.groupsList.first().direction) { + Direction.FIRST -> messageRouter.send(it, QueueAttribute.FIRST.value) + Direction.SECOND -> messageRouter.send(it, QueueAttribute.SECOND.value) + else -> error("Unrecognized direction") + } + }) { + LOGGER.error(it) { "Can't send message group batch due inner error" } + }.also { registerResource("Raw message batcher", it::close) } From f6d0a0b8d7794c1cc8c95f9fe99b39bd489957a2 Mon Sep 17 00:00:00 2001 From: Nikita Shaposhnikov Date: Fri, 30 Sep 2022 17:33:13 +0400 Subject: [PATCH 06/10] fixed kotlin compile error, updated dependency --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 279c5d2..31604ff 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ kotlin.code.style=official -kotlin_version=1.4.10 +kotlin_version=1.6.21 slf4j_version=1.7.32 release_version=0.3.2 description='Websocket Client' From a98f14ef45709fbe72c6799da29fe909f1381356 Mon Sep 17 00:00:00 2001 From: Nikita Shaposhnikov Date: Mon, 3 Oct 2022 06:56:18 +0400 Subject: [PATCH 07/10] temporary changes due log4j issues --- build.gradle | 7 +++---- gradle.properties | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/build.gradle b/build.gradle index 6ed4f74..ce48791 100644 --- a/build.gradle +++ b/build.gradle @@ -163,14 +163,13 @@ clean { } dependencies { - api platform('com.exactpro.th2:bom:3.1.0') + api platform('com.exactpro.th2:bom:4.0.1') - implementation 'com.exactpro.th2:common:3.40.0' + implementation 'com.exactpro.th2:common:3.41.0' implementation 'com.exactpro.th2:common-utils:0.0.1-dev-3139295943-SNAPSHOT' implementation 'com.exactpro.th2:grpc-conn:0.0.1' - implementation group: 'org.slf4j', name: 'slf4j-api', version: slf4j_version - implementation group: 'org.slf4j', name: 'slf4j-log4j12', version: slf4j_version + implementation "org.slf4j:slf4j-api" implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: kotlin_version implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect', version: kotlin_version diff --git a/gradle.properties b/gradle.properties index 31604ff..bcfb10c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ kotlin.code.style=official kotlin_version=1.6.21 -slf4j_version=1.7.32 +#slf4j_version=1.7.32 release_version=0.3.2 description='Websocket Client' vcs_url=https://github.com/th2-net/th2-conn-ws-client From 0fb53a7ab893f8f675f8a74ee204cffdb1eded4a Mon Sep 17 00:00:00 2001 From: Nikita Shaposhnikov Date: Mon, 3 Oct 2022 07:03:36 +0400 Subject: [PATCH 08/10] common and bom version downgrade --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index ce48791..93b98df 100644 --- a/build.gradle +++ b/build.gradle @@ -163,9 +163,9 @@ clean { } dependencies { - api platform('com.exactpro.th2:bom:4.0.1') + api platform('com.exactpro.th2:bom:3.1.0') - implementation 'com.exactpro.th2:common:3.41.0' + implementation "com.exactpro.th2:common:3.37.2" implementation 'com.exactpro.th2:common-utils:0.0.1-dev-3139295943-SNAPSHOT' implementation 'com.exactpro.th2:grpc-conn:0.0.1' From e45dbc90a5a19619995b4283f1b6aae4e1dc0ddf Mon Sep 17 00:00:00 2001 From: Nikita Shaposhnikov Date: Mon, 3 Oct 2022 13:15:09 +0400 Subject: [PATCH 09/10] updated back to last common logger issues --- build.gradle | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index 93b98df..99c510c 100644 --- a/build.gradle +++ b/build.gradle @@ -163,13 +163,17 @@ clean { } dependencies { - api platform('com.exactpro.th2:bom:3.1.0') + api platform('com.exactpro.th2:bom:4.0.1') - implementation "com.exactpro.th2:common:3.37.2" + implementation "com.exactpro.th2:common:3.41.0" implementation 'com.exactpro.th2:common-utils:0.0.1-dev-3139295943-SNAPSHOT' implementation 'com.exactpro.th2:grpc-conn:0.0.1' - implementation "org.slf4j:slf4j-api" + implementation "org.slf4j:slf4j-api:2.0.3" + implementation ("org.apache.logging.log4j:log4j-slf4j2-impl:2.19.0") + implementation ("org.apache.logging.log4j:log4j-1.2-api:2.19.0") + implementation ("org.apache.logging.log4j:log4j-api:2.19.0") + implementation ("org.apache.logging.log4j:log4j-core:2.19.0") implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: kotlin_version implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect', version: kotlin_version From e7807f50c50c569d814b89006e8cad50232dfad2 Mon Sep 17 00:00:00 2001 From: Nikita Shaposhnikov Date: Thu, 6 Oct 2022 05:15:25 +0400 Subject: [PATCH 10/10] updated common-utils lib to master version 0.0.1 --- build.gradle | 2 +- gradle.properties | 1 - src/main/kotlin/com/exactpro/th2/ws/client/Main.kt | 11 +++++------ 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/build.gradle b/build.gradle index 99c510c..739d749 100644 --- a/build.gradle +++ b/build.gradle @@ -166,7 +166,7 @@ dependencies { api platform('com.exactpro.th2:bom:4.0.1') implementation "com.exactpro.th2:common:3.41.0" - implementation 'com.exactpro.th2:common-utils:0.0.1-dev-3139295943-SNAPSHOT' + implementation 'com.exactpro.th2:common-utils:0.0.1' implementation 'com.exactpro.th2:grpc-conn:0.0.1' implementation "org.slf4j:slf4j-api:2.0.3" diff --git a/gradle.properties b/gradle.properties index bcfb10c..68bef28 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,5 @@ kotlin.code.style=official kotlin_version=1.6.21 -#slf4j_version=1.7.32 release_version=0.3.2 description='Websocket Client' vcs_url=https://github.com/th2-net/th2-conn-ws-client diff --git a/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt b/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt index 511d198..1007295 100644 --- a/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt +++ b/src/main/kotlin/com/exactpro/th2/ws/client/Main.kt @@ -31,6 +31,7 @@ import com.exactpro.th2.common.schema.message.MessageRouter import com.exactpro.th2.common.schema.message.QueueAttribute import com.exactpro.th2.common.schema.message.storeEvent import com.exactpro.th2.common.utils.event.EventBatcher +import com.exactpro.th2.common.utils.message.RAW_DIRECTION_SELECTOR import com.exactpro.th2.common.utils.message.RawMessageBatcher import com.exactpro.th2.common.utils.message.direction import com.exactpro.th2.ws.client.Settings.FrameType.TEXT @@ -131,16 +132,14 @@ fun run( registerResource("Batcher scheduled executor", it::shutdownNow) } - val batcher = RawMessageBatcher(settings.maxBatchSize, settings.maxFlushTime, { - it.metadataOrBuilder.id.direction - }, scheduledExecutorService, { + val batcher = RawMessageBatcher(settings.maxBatchSize, settings.maxFlushTime, RAW_DIRECTION_SELECTOR, scheduledExecutorService, { throwable: Throwable -> + LOGGER.error(throwable) { "Can't send message group batch due inner error" } + }) { when (it.groupsList.first().direction) { Direction.FIRST -> messageRouter.send(it, QueueAttribute.FIRST.value) Direction.SECOND -> messageRouter.send(it, QueueAttribute.SECOND.value) else -> error("Unrecognized direction") } - }) { - LOGGER.error(it) { "Can't send message group batch due inner error" } }.also { registerResource("Raw message batcher", it::close) } @@ -219,7 +218,7 @@ data class Settings( val grpcStartControl: Boolean = false, val autoStart: Boolean = true, val autoStopAfter: Int = 0, - val maxBatchSize: Int = 100, + val maxBatchSize: Int = 1000, val maxFlushTime: Long = 1000 ) { enum class FrameType {