Skip to content

Commit

Permalink
Pass information about book and session group down to the MessageID
Browse files Browse the repository at this point in the history
  • Loading branch information
OptimumCode committed May 15, 2024
1 parent 81aaf29 commit 9fdb8b9
Show file tree
Hide file tree
Showing 4 changed files with 324 additions and 162 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2024 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.common.schema.message.impl.rabbitmq.transport

import kotlin.properties.Delegates

/**
* This class holds information about book and session group from the batch root.
* It is used to pass this information down to [MessageId].
*/
internal sealed interface BatchInfoProvider {
val book: String
val sessionGroup: String

class Mutable : BatchInfoProvider {
override var book: String by Delegates.notNull()
override var sessionGroup: String by Delegates.notNull()
override fun equals(other: Any?): Boolean {
return this === other
}

override fun hashCode(): Int {
return System.identityHashCode(this)
}
}

object Empty : BatchInfoProvider {
override val book: String
get() = ""
override val sessionGroup: String
get() = ""
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,28 @@ enum class ValueType(val codec: ValueCodec<*>) {
}
}

class DecodeContext private constructor(
internal val batchInfoProvider: BatchInfoProvider
) {
companion object {
private val DEFAULT = DecodeContext(BatchInfoProvider.Empty)
@JvmStatic
internal fun create(provider: BatchInfoProvider): DecodeContext = DecodeContext(provider)

@JvmStatic
fun create(): DecodeContext = DEFAULT
}
}

sealed interface ValueCodec<T> {
val type: UByte
fun encode(source: T, target: ByteBuf)
fun decode(source: ByteBuf): T
fun decode(context: DecodeContext, source: ByteBuf): T
}

object UnknownValueCodec : ValueCodec<ByteBuf> {
override val type: UByte = 0u
override fun decode(source: ByteBuf): ByteBuf = source.readSlice(source.skipBytes(Byte.SIZE_BYTES).readIntLE())
override fun decode(context: DecodeContext, source: ByteBuf): ByteBuf = source.readSlice(source.skipBytes(Byte.SIZE_BYTES).readIntLE())
override fun encode(source: ByteBuf, target: ByteBuf): Nothing = throw UnsupportedOperationException()
}

Expand All @@ -97,47 +110,47 @@ abstract class AbstractCodec<T>(final override val type: UByte) : ValueCodec<T>

protected abstract fun write(buffer: ByteBuf, value: T)

override fun decode(source: ByteBuf): T {
override fun decode(context: DecodeContext, source: ByteBuf): T {
val tag = source.readByte().toUByte()
check(tag == this.type) { "Unexpected type tag: $tag (expected: ${this.type})" }
val length = source.readIntLE()
return read(source.readSlice(length)) // FIXME: avoid slicing to avoid buffer allocation
return read(context, source.readSlice(length)) // FIXME: avoid slicing to avoid buffer allocation
}

protected abstract fun read(buffer: ByteBuf): T
protected abstract fun read(context: DecodeContext, buffer: ByteBuf): T
}

abstract class StringCodec(
type: UByte,
private val charset: Charset = Charsets.UTF_8,
) : AbstractCodec<String>(type) {
override fun read(buffer: ByteBuf): String = buffer.readCharSequence(buffer.readableBytes(), charset).toString()
override fun read(context: DecodeContext, buffer: ByteBuf): String = buffer.readCharSequence(buffer.readableBytes(), charset).toString()

override fun write(buffer: ByteBuf, value: String) {
buffer.writeCharSequence(value, charset)
}
}

abstract class LongCodec(type: UByte) : AbstractCodec<Long>(type) {
override fun read(buffer: ByteBuf): Long = buffer.readLongLE()
override fun read(context: DecodeContext, buffer: ByteBuf): Long = buffer.readLongLE()

override fun write(buffer: ByteBuf, value: Long) {
buffer.writeLongLE(value)
}
}

abstract class IntCodec(type: UByte) : AbstractCodec<Int>(type) {
override fun read(buffer: ByteBuf): Int = buffer.readIntLE()
override fun read(context: DecodeContext, buffer: ByteBuf): Int = buffer.readIntLE()

override fun write(buffer: ByteBuf, value: Int) {
buffer.writeIntLE(value)
}
}

abstract class ListCodec<T>(type: UByte, private val elementCodec: ValueCodec<T>) : AbstractCodec<List<T>>(type) {
override fun read(buffer: ByteBuf): MutableList<T> = mutableListOf<T>().also { list ->
override fun read(context: DecodeContext, buffer: ByteBuf): MutableList<T> = mutableListOf<T>().also { list ->
while (buffer.isReadable) {
list += elementCodec.decode(buffer)
list += elementCodec.decode(context, buffer)
}
}

Expand All @@ -151,9 +164,9 @@ abstract class MapCodec<K, V>(
private val keyCodec: ValueCodec<K>,
private val valueCodec: ValueCodec<V>,
) : AbstractCodec<Map<K, V>>(type) {
override fun read(buffer: ByteBuf): MutableMap<K, V> = hashMapOf<K, V>().apply {
override fun read(context: DecodeContext, buffer: ByteBuf): MutableMap<K, V> = hashMapOf<K, V>().apply {
while (buffer.isReadable) {
this[keyCodec.decode(buffer)] = valueCodec.decode(buffer)
this[keyCodec.decode(context, buffer)] = valueCodec.decode(context, buffer)
}
}

Expand All @@ -164,15 +177,15 @@ abstract class MapCodec<K, V>(
}

abstract class ByteBufCodec(type: UByte) : AbstractCodec<ByteBuf>(type) {
override fun read(buffer: ByteBuf): ByteBuf = buffer.copy()
override fun read(context: DecodeContext, buffer: ByteBuf): ByteBuf = buffer.copy()

override fun write(buffer: ByteBuf, value: ByteBuf) {
value.markReaderIndex().apply(buffer::writeBytes).resetReaderIndex()
}
}

abstract class InstantCodec(type: UByte) : AbstractCodec<Instant>(type) {
override fun read(buffer: ByteBuf): Instant = Instant.ofEpochSecond(buffer.readLongLE(), buffer.readIntLE().toLong())
override fun read(context: DecodeContext, buffer: ByteBuf): Instant = Instant.ofEpochSecond(buffer.readLongLE(), buffer.readIntLE().toLong())

override fun write(buffer: ByteBuf, value: Instant) {
buffer.writeLongLE(value.epochSecond).writeIntLE(value.nano)
Expand All @@ -187,18 +200,19 @@ object StringTypeCodec : StringCodec(2u)
object IntTypeCodec : IntCodec(3u)

object MessageIdCodec : AbstractCodec<MessageId>(10u) {
override fun read(buffer: ByteBuf): MessageId = MessageId.builder().apply {
buffer.forEachValue { codec ->
when (codec) {
is SessionAliasCodec -> setSessionAlias(codec.decode(buffer))
is DirectionCodec -> setDirection(codec.decode(buffer))
is SequenceCodec -> setSequence(codec.decode(buffer))
is SubsequenceCodec -> setSubsequence(codec.decode(buffer))
is TimestampCodec -> setTimestamp(codec.decode(buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}")
override fun read(context: DecodeContext, buffer: ByteBuf): MessageId =
MessageId.builder(context.batchInfoProvider).apply {
buffer.forEachValue(context) { codec ->
when (codec) {
is SessionAliasCodec -> setSessionAlias(codec.decode(context, buffer))
is DirectionCodec -> setDirection(codec.decode(context, buffer))
is SequenceCodec -> setSequence(codec.decode(context, buffer))
is SubsequenceCodec -> setSubsequence(codec.decode(context, buffer))
is TimestampCodec -> setTimestamp(codec.decode(context, buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}")
}
}
}
}.build()
}.build()

override fun write(buffer: ByteBuf, value: MessageId) {
SessionAliasCodec.encode(value.sessionAlias, buffer)
Expand All @@ -216,7 +230,7 @@ object SessionGroupCodec : StringCodec(102u)
object SessionAliasCodec : StringCodec(103u)

object DirectionCodec : AbstractCodec<Direction>(104u) {
override fun read(buffer: ByteBuf): Direction = Direction.forId(buffer.readByte().toInt())
override fun read(context: DecodeContext, buffer: ByteBuf): Direction = Direction.forId(buffer.readByte().toInt())

override fun write(buffer: ByteBuf, value: Direction) {
buffer.writeByte(value.id)
Expand All @@ -240,15 +254,15 @@ object IdCodec : StringCodec(14u)
object ScopeCodec : StringCodec(15u)

object EventIdCodec : AbstractCodec<EventId>(16u) {
override fun read(buffer: ByteBuf): EventId {
override fun read(context: DecodeContext, buffer: ByteBuf): EventId {
return EventId.builder().apply {
buffer.forEachValue { codec ->
buffer.forEachValue(context) { codec ->
when (codec) {
is IdCodec -> setId(codec.decode(buffer))
is BookCodec -> setBook(codec.decode(buffer))
is ScopeCodec -> setScope(codec.decode(buffer))
is TimestampCodec -> setTimestamp(codec.decode(buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}")
is IdCodec -> setId(codec.decode(context, buffer))
is BookCodec -> setBook(codec.decode(context, buffer))
is ScopeCodec -> setScope(codec.decode(context, buffer))
is TimestampCodec -> setTimestamp(codec.decode(context, buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}")
}
}
}.build()
Expand All @@ -263,15 +277,15 @@ object EventIdCodec : AbstractCodec<EventId>(16u) {
}

object RawMessageCodec : AbstractCodec<RawMessage>(20u) {
override fun read(buffer: ByteBuf): RawMessage = RawMessage.builder().apply {
buffer.forEachValue { codec ->
override fun read(context: DecodeContext, buffer: ByteBuf): RawMessage = RawMessage.builder().apply {
buffer.forEachValue(context) { codec ->
when (codec) {
is MessageIdCodec -> setId(codec.decode(buffer))
is EventIdCodec -> setEventId(codec.decode(buffer))
is MetadataCodec -> setMetadata(codec.decode(buffer))
is ProtocolCodec -> setProtocol(codec.decode(buffer))
is RawMessageBodyCodec -> setBody(codec.decode(buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}")
is MessageIdCodec -> setId(codec.decode(context, buffer))
is EventIdCodec -> setEventId(codec.decode(context, buffer))
is MetadataCodec -> setMetadata(codec.decode(context, buffer))
is ProtocolCodec -> setProtocol(codec.decode(context, buffer))
is RawMessageBodyCodec -> setBody(codec.decode(context, buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}")
}
}
}.build()
Expand All @@ -288,18 +302,18 @@ object RawMessageCodec : AbstractCodec<RawMessage>(20u) {
object RawMessageBodyCodec : ByteBufCodec(21u)

object ParsedMessageCodec : AbstractCodec<ParsedMessage>(30u) {
override fun read(buffer: ByteBuf): ParsedMessage = ParsedMessage.builder { buf ->
override fun read(context: DecodeContext, buffer: ByteBuf): ParsedMessage = ParsedMessage.builder { buf ->
ByteBufInputStream(buf).use { MAPPER.readValue(it) }
}.apply {
buffer.forEachValue { codec ->
buffer.forEachValue(context) { codec ->
when (codec) {
is MessageIdCodec -> setId(codec.decode(buffer))
is EventIdCodec -> setEventId(codec.decode(buffer))
is MetadataCodec -> setMetadata(codec.decode(buffer))
is ProtocolCodec -> setProtocol(codec.decode(buffer))
is MessageTypeCodec -> setType(codec.decode(buffer))
is ParsedMessageRawBodyCodec -> setRawBody(codec.decode(buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}")
is MessageIdCodec -> setId(codec.decode(context, buffer))
is EventIdCodec -> setEventId(codec.decode(context, buffer))
is MetadataCodec -> setMetadata(codec.decode(context, buffer))
is ProtocolCodec -> setProtocol(codec.decode(context, buffer))
is MessageTypeCodec -> setType(codec.decode(context, buffer))
is ParsedMessageRawBodyCodec -> setRawBody(codec.decode(context, buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}")
}
}
}.build()
Expand Down Expand Up @@ -331,11 +345,11 @@ object ParsedMessageCodec : AbstractCodec<ParsedMessage>(30u) {
object ParsedMessageRawBodyCodec : ByteBufCodec(31u)

object MessageGroupCodec : AbstractCodec<MessageGroup>(40u) {
override fun read(buffer: ByteBuf): MessageGroup = MessageGroup.builder().apply {
buffer.forEachValue { codec ->
override fun read(context: DecodeContext, buffer: ByteBuf): MessageGroup = MessageGroup.builder().apply {
buffer.forEachValue(context) { codec ->
when (codec) {
is MessageListCodec -> setMessages(codec.decode(buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}")
is MessageListCodec -> setMessages(codec.decode(context, buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}")
}
}
}.build()
Expand All @@ -346,12 +360,12 @@ object MessageGroupCodec : AbstractCodec<MessageGroup>(40u) {
}

object MessageListCodec : AbstractCodec<List<Message<*>>>(41u) {
override fun read(buffer: ByteBuf): MutableList<Message<*>> = mutableListOf<Message<*>>().apply {
buffer.forEachValue { codec ->
override fun read(context: DecodeContext, buffer: ByteBuf): MutableList<Message<*>> = mutableListOf<Message<*>>().apply {
buffer.forEachValue(context) { codec ->
when (codec) {
is RawMessageCodec -> this += codec.decode(buffer)
is ParsedMessageCodec -> this += codec.decode(buffer)
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}")
is RawMessageCodec -> this += codec.decode(context, buffer)
is ParsedMessageCodec -> this += codec.decode(context, buffer)
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}")
}
}
}
Expand All @@ -366,32 +380,51 @@ object MessageListCodec : AbstractCodec<List<Message<*>>>(41u) {
}

object GroupBatchCodec : AbstractCodec<GroupBatch>(50u) {
override fun read(buffer: ByteBuf): GroupBatch = GroupBatch.builder().apply {
buffer.forEachValue { codec ->
override fun read(context: DecodeContext, buffer: ByteBuf): GroupBatch = GroupBatch.builder().apply {
val mutableProvider: BatchInfoProvider.Mutable
val groupContext: DecodeContext
if (context.batchInfoProvider is BatchInfoProvider.Mutable) {
mutableProvider = context.batchInfoProvider
groupContext = context
} else {
// Mutable provider is initialized only inside
mutableProvider = BatchInfoProvider.Mutable()
groupContext = DecodeContext.create(mutableProvider)
}
buffer.forEachValue(context) { codec ->
when (codec) {
is BookCodec -> setBook(codec.decode(buffer))
is SessionGroupCodec -> setSessionGroup(codec.decode(buffer))
is GroupListCodec -> setGroups(codec.decode(buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}")
is BookCodec -> setBook(codec.decode(context, buffer))
is SessionGroupCodec -> setSessionGroup(codec.decode(context, buffer))
is GroupListCodec -> setGroups(codec.decode(groupContext, buffer))
else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}")
}
}
// set this in the end
// if for some reason batch did not have book or session group this will cause an error
mutableProvider.sessionGroup = sessionGroup
mutableProvider.book = book
}.build()

override fun write(buffer: ByteBuf, value: GroupBatch) {
BookCodec.encode(value.book, buffer)
SessionGroupCodec.encode(value.sessionGroup, buffer)
GroupListCodec.encode(value.groups, buffer)
}

fun decode(buffer: ByteBuf): GroupBatch = decode(DecodeContext.create(BatchInfoProvider.Mutable()), buffer)
}

object GroupListCodec : ListCodec<MessageGroup>(51u, MessageGroupCodec)

inline fun ByteBuf.forEachValue(action: (codec: ValueCodec<*>) -> Unit) {
inline fun ByteBuf.forEachValue(context: DecodeContext, action: (codec: ValueCodec<*>) -> Unit) {
while (isReadable) {
val type = getByte(readerIndex()).toUByte()

when (val codec = ValueType.forId(type).codec) {
is UnknownValueCodec -> println("Skipping unknown type $type value: ${ByteBufUtil.hexDump(codec.decode(this))}")
is UnknownValueCodec -> println("Skipping unknown type $type value: ${ByteBufUtil.hexDump(codec.decode(
context,
this
))}")
else -> action(codec)
}
}
Expand Down
Loading

0 comments on commit 9fdb8b9

Please sign in to comment.