Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batcher for performance improvements #12

Open
wants to merge 10 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# WebSocket Client v0.3.1
# WebSocket Client v0.3.2

This microservice allows sending and receiving messages via WebSocket protocol

Expand Down Expand Up @@ -86,13 +86,15 @@ metadata:
name: ws-client
spec:
image-name: ghcr.io/th2-net/th2-conn-ws-client
image-version: 0.3.1
image-version: 0.3.2
custom-config:
uri: wss://echo.websocket.org
sessionAlias: api_session
grpcStartControl: true
autoStart: true
autoStopAfter: 300
maxBatchSize: 100
maxFlushTime: 1000
handlerSettings:
pingInterval: 30000
type: th2-conn
Expand Down Expand Up @@ -125,6 +127,12 @@ spec:

## Changelog

### v0.3.2

#### Added:
* batching for messages (dependency on common-utils)
* batching for events (dependency on common-utils)

### v0.3.1

#### Added:
Expand Down
12 changes: 8 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,17 @@ clean {
}

dependencies {
api platform('com.exactpro.th2:bom:3.1.0')
api platform('com.exactpro.th2:bom:4.0.1')

implementation 'com.exactpro.th2:common:3.40.0'
implementation "com.exactpro.th2:common:3.41.0"
implementation 'com.exactpro.th2:common-utils:0.0.1'
implementation 'com.exactpro.th2:grpc-conn:0.0.1'

implementation 'org.slf4j:slf4j-log4j12'
implementation 'org.slf4j:slf4j-api'
implementation "org.slf4j:slf4j-api:2.0.3"
implementation ("org.apache.logging.log4j:log4j-slf4j2-impl:2.19.0")
implementation ("org.apache.logging.log4j:log4j-1.2-api:2.19.0")
implementation ("org.apache.logging.log4j:log4j-api:2.19.0")
implementation ("org.apache.logging.log4j:log4j-core:2.19.0")

implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: kotlin_version
implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect', version: kotlin_version
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
kotlin.code.style=official
kotlin_version=1.4.10
release_version=0.3.1
kotlin_version=1.6.21
release_version=0.3.2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump minor version instead since there was an extension of functionality

description='Websocket Client'
vcs_url=https://github.com/th2-net/th2-conn-ws-client
64 changes: 56 additions & 8 deletions src/main/kotlin/com/exactpro/th2/ws/client/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.exactpro.th2.ws.client

import com.exactpro.th2.common.event.Event
import com.exactpro.th2.common.event.EventUtils
import com.exactpro.th2.common.grpc.ConnectionID
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.EventBatch
Expand All @@ -29,6 +30,10 @@ import com.exactpro.th2.common.schema.message.MessageListener
import com.exactpro.th2.common.schema.message.MessageRouter
import com.exactpro.th2.common.schema.message.QueueAttribute
import com.exactpro.th2.common.schema.message.storeEvent
import com.exactpro.th2.common.utils.event.EventBatcher
import com.exactpro.th2.common.utils.message.RAW_DIRECTION_SELECTOR
import com.exactpro.th2.common.utils.message.RawMessageBatcher
import com.exactpro.th2.common.utils.message.direction
import com.exactpro.th2.ws.client.Settings.FrameType.TEXT
import com.exactpro.th2.ws.client.api.IClient
import com.exactpro.th2.ws.client.api.IHandler
Expand All @@ -37,16 +42,18 @@ import com.exactpro.th2.ws.client.api.IHandlerSettingsTypeProvider
import com.exactpro.th2.ws.client.api.impl.DefaultHandler
import com.exactpro.th2.ws.client.api.impl.DefaultHandlerSettingsTypeProvider
import com.exactpro.th2.ws.client.api.impl.WebSocketClient
import com.exactpro.th2.ws.client.util.toBatch
import com.exactpro.th2.ws.client.util.toPrettyString
import com.exactpro.th2.ws.client.util.toRawMessage
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import mu.KotlinLogging
import org.apache.commons.lang3.exception.ExceptionUtils
import java.net.URI
import java.time.Instant
import java.util.ServiceLoader
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
Expand Down Expand Up @@ -121,16 +128,36 @@ fun run(
val incomingSequence = createSequence()
val outgoingSequence = createSequence()

//TODO: add batching (by size or time)
val scheduledExecutorService = Executors.newScheduledThreadPool(1).also {
registerResource("Batcher scheduled executor", it::shutdownNow)
}

val batcher = RawMessageBatcher(settings.maxBatchSize, settings.maxFlushTime, RAW_DIRECTION_SELECTOR, scheduledExecutorService, { throwable: Throwable ->
LOGGER.error(throwable) { "Can't send message group batch due inner error" }
}) {
when (it.groupsList.first().direction) {
Direction.FIRST -> messageRouter.send(it, QueueAttribute.FIRST.value)
Direction.SECOND -> messageRouter.send(it, QueueAttribute.SECOND.value)
else -> error("Unrecognized direction")
}
}.also {
registerResource("Raw message batcher", it::close)
}

val onMessage = { message: ByteArray, _: Boolean, direction: Direction ->
val sequence = if (direction == Direction.FIRST) incomingSequence else outgoingSequence
val attribute = if (direction == Direction.FIRST) QueueAttribute.FIRST else QueueAttribute.SECOND
messageRouter.send(message.toBatch(connectionId, direction, sequence()), attribute.toString())
batcher.onMessage(message.toRawMessage(
connectionId,
direction,
(if (direction == Direction.FIRST) incomingSequence else outgoingSequence)()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(if (direction == Direction.FIRST) incomingSequence else outgoingSequence)()
(if (direction == Direction.FIRST) incomingSequence else outgoingSequence).invoke()

For readability

))
}

val eventBatcher = EventBatcher(settings.maxBatchSize, settings.maxFlushTime, scheduledExecutorService, eventRouter::send).also {
registerResource("Event batcher", it::close)
}

val onEvent = { cause: Throwable?, message: () -> String ->
val type = if (cause != null) "Error" else "Info"
eventRouter.storeEvent(rootEventId, message(), type, cause)
eventBatcher.storeEvent(message(), cause, rootEventId)
}

val client = WebSocketClient(
Expand Down Expand Up @@ -190,7 +217,9 @@ data class Settings(
val handlerSettings: IHandlerSettings? = null,
val grpcStartControl: Boolean = false,
val autoStart: Boolean = true,
val autoStopAfter: Int = 0
val autoStopAfter: Int = 0,
val maxBatchSize: Int = 1000,
val maxFlushTime: Long = 1000
) {
enum class FrameType {
TEXT {
Expand Down Expand Up @@ -218,3 +247,22 @@ private inline fun <reified T> load(defaultImpl: Class<out T>): T {
private fun createSequence(): () -> Long = Instant.now().run {
AtomicLong(epochSecond * SECONDS.toNanos(1) + nano)
}::incrementAndGet

fun EventBatcher.storeEvent(name: String, cause: Throwable?, parentEventId: String) {
val event = createEvent(name, cause)
onEvent(event.toProtoEvent(parentEventId))
}

fun createEvent(
name: String,
cause: Throwable? = null
): Event = Event.start().apply {
endTimestamp()
name(name)
type(if (cause != null) "Error" else "Info")
status(if (cause != null) Event.Status.FAILED else Event.Status.PASSED)

generateSequence(cause, Throwable::cause).forEach { error ->
bodyData(EventUtils.createMessageBean(ExceptionUtils.getMessage(error)))
}
}
16 changes: 4 additions & 12 deletions src/main/kotlin/com/exactpro/th2/ws/client/util/MessageUtil.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@

package com.exactpro.th2.ws.client.util

import com.exactpro.th2.common.grpc.AnyMessage
import com.exactpro.th2.common.grpc.ConnectionID
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.MessageGroup
import com.exactpro.th2.common.grpc.MessageGroupBatch
import com.exactpro.th2.common.grpc.RawMessage
import com.exactpro.th2.common.message.toTimestamp
import com.google.protobuf.ByteString
Expand All @@ -35,17 +32,12 @@ private inline operator fun <T : Builder> T.invoke(block: T.() -> Unit) = apply(

fun MessageOrBuilder.toPrettyString(): String = JsonFormat.printer().omittingInsignificantWhitespace().includingDefaultValueFields().print(this)

private fun RawMessage.Builder.toBatch() = run(AnyMessage.newBuilder()::setRawMessage)
.run(MessageGroup.newBuilder()::addMessages)
.run(MessageGroupBatch.newBuilder()::addGroups)
.build()

fun ByteArray.toBatch(
fun ByteArray.toRawMessage(
connectionId: ConnectionID,
direction: Direction,
sequence: Long,
): MessageGroupBatch = RawMessage.newBuilder().apply {
this.body = ByteString.copyFrom(this@toBatch)
): RawMessage.Builder = RawMessage.newBuilder().apply {
this.body = ByteString.copyFrom(this@toRawMessage)
this.metadataBuilder {
this.timestamp = Instant.now().toTimestamp()
this.idBuilder {
Expand All @@ -54,4 +46,4 @@ fun ByteArray.toBatch(
this.sequence = sequence
}
}
}.toBatch()
}