From 9fdb8b9ae5dcebd2a6ffa68505941917417c89b0 Mon Sep 17 00:00:00 2001 From: Oleg Date: Wed, 15 May 2024 13:42:24 +0400 Subject: [PATCH] Pass information about book and session group down to the MessageID --- .../rabbitmq/transport/BatchInfoProvider.kt | 48 +++++ .../message/impl/rabbitmq/transport/Codecs.kt | 167 +++++++++------ .../impl/rabbitmq/transport/MessageId.kt | 202 ++++++++++-------- .../impl/rabbitmq/transport/CodecsTest.kt | 69 +++++- 4 files changed, 324 insertions(+), 162 deletions(-) create mode 100644 src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/BatchInfoProvider.kt diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/BatchInfoProvider.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/BatchInfoProvider.kt new file mode 100644 index 00000000..427d5c7f --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/BatchInfoProvider.kt @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.common.schema.message.impl.rabbitmq.transport + +import kotlin.properties.Delegates + +/** + * This class holds information about book and session group from the batch root. + * It is used to pass this information down to [MessageId]. + */ +internal sealed interface BatchInfoProvider { + val book: String + val sessionGroup: String + + class Mutable : BatchInfoProvider { + override var book: String by Delegates.notNull() + override var sessionGroup: String by Delegates.notNull() + override fun equals(other: Any?): Boolean { + return this === other + } + + override fun hashCode(): Int { + return System.identityHashCode(this) + } + } + + object Empty : BatchInfoProvider { + override val book: String + get() = "" + override val sessionGroup: String + get() = "" + } +} + diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/Codecs.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/Codecs.kt index 5fbd8d8a..b05cfb1e 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/Codecs.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/Codecs.kt @@ -74,15 +74,28 @@ enum class ValueType(val codec: ValueCodec<*>) { } } +class DecodeContext private constructor( + internal val batchInfoProvider: BatchInfoProvider +) { + companion object { + private val DEFAULT = DecodeContext(BatchInfoProvider.Empty) + @JvmStatic + internal fun create(provider: BatchInfoProvider): DecodeContext = DecodeContext(provider) + + @JvmStatic + fun create(): DecodeContext = DEFAULT + } +} + sealed interface ValueCodec { val type: UByte fun encode(source: T, target: ByteBuf) - fun decode(source: ByteBuf): T + fun decode(context: DecodeContext, source: ByteBuf): T } object UnknownValueCodec : ValueCodec { override val type: UByte = 0u - override fun decode(source: ByteBuf): ByteBuf = source.readSlice(source.skipBytes(Byte.SIZE_BYTES).readIntLE()) + override fun decode(context: DecodeContext, source: ByteBuf): ByteBuf = source.readSlice(source.skipBytes(Byte.SIZE_BYTES).readIntLE()) override fun encode(source: ByteBuf, target: ByteBuf): Nothing = throw UnsupportedOperationException() } @@ -97,21 +110,21 @@ abstract class AbstractCodec(final override val type: UByte) : ValueCodec protected abstract fun write(buffer: ByteBuf, value: T) - override fun decode(source: ByteBuf): T { + override fun decode(context: DecodeContext, source: ByteBuf): T { val tag = source.readByte().toUByte() check(tag == this.type) { "Unexpected type tag: $tag (expected: ${this.type})" } val length = source.readIntLE() - return read(source.readSlice(length)) // FIXME: avoid slicing to avoid buffer allocation + return read(context, source.readSlice(length)) // FIXME: avoid slicing to avoid buffer allocation } - protected abstract fun read(buffer: ByteBuf): T + protected abstract fun read(context: DecodeContext, buffer: ByteBuf): T } abstract class StringCodec( type: UByte, private val charset: Charset = Charsets.UTF_8, ) : AbstractCodec(type) { - override fun read(buffer: ByteBuf): String = buffer.readCharSequence(buffer.readableBytes(), charset).toString() + override fun read(context: DecodeContext, buffer: ByteBuf): String = buffer.readCharSequence(buffer.readableBytes(), charset).toString() override fun write(buffer: ByteBuf, value: String) { buffer.writeCharSequence(value, charset) @@ -119,7 +132,7 @@ abstract class StringCodec( } abstract class LongCodec(type: UByte) : AbstractCodec(type) { - override fun read(buffer: ByteBuf): Long = buffer.readLongLE() + override fun read(context: DecodeContext, buffer: ByteBuf): Long = buffer.readLongLE() override fun write(buffer: ByteBuf, value: Long) { buffer.writeLongLE(value) @@ -127,7 +140,7 @@ abstract class LongCodec(type: UByte) : AbstractCodec(type) { } abstract class IntCodec(type: UByte) : AbstractCodec(type) { - override fun read(buffer: ByteBuf): Int = buffer.readIntLE() + override fun read(context: DecodeContext, buffer: ByteBuf): Int = buffer.readIntLE() override fun write(buffer: ByteBuf, value: Int) { buffer.writeIntLE(value) @@ -135,9 +148,9 @@ abstract class IntCodec(type: UByte) : AbstractCodec(type) { } abstract class ListCodec(type: UByte, private val elementCodec: ValueCodec) : AbstractCodec>(type) { - override fun read(buffer: ByteBuf): MutableList = mutableListOf().also { list -> + override fun read(context: DecodeContext, buffer: ByteBuf): MutableList = mutableListOf().also { list -> while (buffer.isReadable) { - list += elementCodec.decode(buffer) + list += elementCodec.decode(context, buffer) } } @@ -151,9 +164,9 @@ abstract class MapCodec( private val keyCodec: ValueCodec, private val valueCodec: ValueCodec, ) : AbstractCodec>(type) { - override fun read(buffer: ByteBuf): MutableMap = hashMapOf().apply { + override fun read(context: DecodeContext, buffer: ByteBuf): MutableMap = hashMapOf().apply { while (buffer.isReadable) { - this[keyCodec.decode(buffer)] = valueCodec.decode(buffer) + this[keyCodec.decode(context, buffer)] = valueCodec.decode(context, buffer) } } @@ -164,7 +177,7 @@ abstract class MapCodec( } abstract class ByteBufCodec(type: UByte) : AbstractCodec(type) { - override fun read(buffer: ByteBuf): ByteBuf = buffer.copy() + override fun read(context: DecodeContext, buffer: ByteBuf): ByteBuf = buffer.copy() override fun write(buffer: ByteBuf, value: ByteBuf) { value.markReaderIndex().apply(buffer::writeBytes).resetReaderIndex() @@ -172,7 +185,7 @@ abstract class ByteBufCodec(type: UByte) : AbstractCodec(type) { } abstract class InstantCodec(type: UByte) : AbstractCodec(type) { - override fun read(buffer: ByteBuf): Instant = Instant.ofEpochSecond(buffer.readLongLE(), buffer.readIntLE().toLong()) + override fun read(context: DecodeContext, buffer: ByteBuf): Instant = Instant.ofEpochSecond(buffer.readLongLE(), buffer.readIntLE().toLong()) override fun write(buffer: ByteBuf, value: Instant) { buffer.writeLongLE(value.epochSecond).writeIntLE(value.nano) @@ -187,18 +200,19 @@ object StringTypeCodec : StringCodec(2u) object IntTypeCodec : IntCodec(3u) object MessageIdCodec : AbstractCodec(10u) { - override fun read(buffer: ByteBuf): MessageId = MessageId.builder().apply { - buffer.forEachValue { codec -> - when (codec) { - is SessionAliasCodec -> setSessionAlias(codec.decode(buffer)) - is DirectionCodec -> setDirection(codec.decode(buffer)) - is SequenceCodec -> setSequence(codec.decode(buffer)) - is SubsequenceCodec -> setSubsequence(codec.decode(buffer)) - is TimestampCodec -> setTimestamp(codec.decode(buffer)) - else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}") + override fun read(context: DecodeContext, buffer: ByteBuf): MessageId = + MessageId.builder(context.batchInfoProvider).apply { + buffer.forEachValue(context) { codec -> + when (codec) { + is SessionAliasCodec -> setSessionAlias(codec.decode(context, buffer)) + is DirectionCodec -> setDirection(codec.decode(context, buffer)) + is SequenceCodec -> setSequence(codec.decode(context, buffer)) + is SubsequenceCodec -> setSubsequence(codec.decode(context, buffer)) + is TimestampCodec -> setTimestamp(codec.decode(context, buffer)) + else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}") + } } - } - }.build() + }.build() override fun write(buffer: ByteBuf, value: MessageId) { SessionAliasCodec.encode(value.sessionAlias, buffer) @@ -216,7 +230,7 @@ object SessionGroupCodec : StringCodec(102u) object SessionAliasCodec : StringCodec(103u) object DirectionCodec : AbstractCodec(104u) { - override fun read(buffer: ByteBuf): Direction = Direction.forId(buffer.readByte().toInt()) + override fun read(context: DecodeContext, buffer: ByteBuf): Direction = Direction.forId(buffer.readByte().toInt()) override fun write(buffer: ByteBuf, value: Direction) { buffer.writeByte(value.id) @@ -240,15 +254,15 @@ object IdCodec : StringCodec(14u) object ScopeCodec : StringCodec(15u) object EventIdCodec : AbstractCodec(16u) { - override fun read(buffer: ByteBuf): EventId { + override fun read(context: DecodeContext, buffer: ByteBuf): EventId { return EventId.builder().apply { - buffer.forEachValue { codec -> + buffer.forEachValue(context) { codec -> when (codec) { - is IdCodec -> setId(codec.decode(buffer)) - is BookCodec -> setBook(codec.decode(buffer)) - is ScopeCodec -> setScope(codec.decode(buffer)) - is TimestampCodec -> setTimestamp(codec.decode(buffer)) - else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}") + is IdCodec -> setId(codec.decode(context, buffer)) + is BookCodec -> setBook(codec.decode(context, buffer)) + is ScopeCodec -> setScope(codec.decode(context, buffer)) + is TimestampCodec -> setTimestamp(codec.decode(context, buffer)) + else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}") } } }.build() @@ -263,15 +277,15 @@ object EventIdCodec : AbstractCodec(16u) { } object RawMessageCodec : AbstractCodec(20u) { - override fun read(buffer: ByteBuf): RawMessage = RawMessage.builder().apply { - buffer.forEachValue { codec -> + override fun read(context: DecodeContext, buffer: ByteBuf): RawMessage = RawMessage.builder().apply { + buffer.forEachValue(context) { codec -> when (codec) { - is MessageIdCodec -> setId(codec.decode(buffer)) - is EventIdCodec -> setEventId(codec.decode(buffer)) - is MetadataCodec -> setMetadata(codec.decode(buffer)) - is ProtocolCodec -> setProtocol(codec.decode(buffer)) - is RawMessageBodyCodec -> setBody(codec.decode(buffer)) - else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}") + is MessageIdCodec -> setId(codec.decode(context, buffer)) + is EventIdCodec -> setEventId(codec.decode(context, buffer)) + is MetadataCodec -> setMetadata(codec.decode(context, buffer)) + is ProtocolCodec -> setProtocol(codec.decode(context, buffer)) + is RawMessageBodyCodec -> setBody(codec.decode(context, buffer)) + else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}") } } }.build() @@ -288,18 +302,18 @@ object RawMessageCodec : AbstractCodec(20u) { object RawMessageBodyCodec : ByteBufCodec(21u) object ParsedMessageCodec : AbstractCodec(30u) { - override fun read(buffer: ByteBuf): ParsedMessage = ParsedMessage.builder { buf -> + override fun read(context: DecodeContext, buffer: ByteBuf): ParsedMessage = ParsedMessage.builder { buf -> ByteBufInputStream(buf).use { MAPPER.readValue(it) } }.apply { - buffer.forEachValue { codec -> + buffer.forEachValue(context) { codec -> when (codec) { - is MessageIdCodec -> setId(codec.decode(buffer)) - is EventIdCodec -> setEventId(codec.decode(buffer)) - is MetadataCodec -> setMetadata(codec.decode(buffer)) - is ProtocolCodec -> setProtocol(codec.decode(buffer)) - is MessageTypeCodec -> setType(codec.decode(buffer)) - is ParsedMessageRawBodyCodec -> setRawBody(codec.decode(buffer)) - else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}") + is MessageIdCodec -> setId(codec.decode(context, buffer)) + is EventIdCodec -> setEventId(codec.decode(context, buffer)) + is MetadataCodec -> setMetadata(codec.decode(context, buffer)) + is ProtocolCodec -> setProtocol(codec.decode(context, buffer)) + is MessageTypeCodec -> setType(codec.decode(context, buffer)) + is ParsedMessageRawBodyCodec -> setRawBody(codec.decode(context, buffer)) + else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}") } } }.build() @@ -331,11 +345,11 @@ object ParsedMessageCodec : AbstractCodec(30u) { object ParsedMessageRawBodyCodec : ByteBufCodec(31u) object MessageGroupCodec : AbstractCodec(40u) { - override fun read(buffer: ByteBuf): MessageGroup = MessageGroup.builder().apply { - buffer.forEachValue { codec -> + override fun read(context: DecodeContext, buffer: ByteBuf): MessageGroup = MessageGroup.builder().apply { + buffer.forEachValue(context) { codec -> when (codec) { - is MessageListCodec -> setMessages(codec.decode(buffer)) - else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}") + is MessageListCodec -> setMessages(codec.decode(context, buffer)) + else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}") } } }.build() @@ -346,12 +360,12 @@ object MessageGroupCodec : AbstractCodec(40u) { } object MessageListCodec : AbstractCodec>>(41u) { - override fun read(buffer: ByteBuf): MutableList> = mutableListOf>().apply { - buffer.forEachValue { codec -> + override fun read(context: DecodeContext, buffer: ByteBuf): MutableList> = mutableListOf>().apply { + buffer.forEachValue(context) { codec -> when (codec) { - is RawMessageCodec -> this += codec.decode(buffer) - is ParsedMessageCodec -> this += codec.decode(buffer) - else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}") + is RawMessageCodec -> this += codec.decode(context, buffer) + is ParsedMessageCodec -> this += codec.decode(context, buffer) + else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}") } } } @@ -366,15 +380,29 @@ object MessageListCodec : AbstractCodec>>(41u) { } object GroupBatchCodec : AbstractCodec(50u) { - override fun read(buffer: ByteBuf): GroupBatch = GroupBatch.builder().apply { - buffer.forEachValue { codec -> + override fun read(context: DecodeContext, buffer: ByteBuf): GroupBatch = GroupBatch.builder().apply { + val mutableProvider: BatchInfoProvider.Mutable + val groupContext: DecodeContext + if (context.batchInfoProvider is BatchInfoProvider.Mutable) { + mutableProvider = context.batchInfoProvider + groupContext = context + } else { + // Mutable provider is initialized only inside + mutableProvider = BatchInfoProvider.Mutable() + groupContext = DecodeContext.create(mutableProvider) + } + buffer.forEachValue(context) { codec -> when (codec) { - is BookCodec -> setBook(codec.decode(buffer)) - is SessionGroupCodec -> setSessionGroup(codec.decode(buffer)) - is GroupListCodec -> setGroups(codec.decode(buffer)) - else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(buffer)}") + is BookCodec -> setBook(codec.decode(context, buffer)) + is SessionGroupCodec -> setSessionGroup(codec.decode(context, buffer)) + is GroupListCodec -> setGroups(codec.decode(groupContext, buffer)) + else -> println("Skipping unexpected type ${codec.type} value: ${codec.decode(context, buffer)}") } } + // set this in the end + // if for some reason batch did not have book or session group this will cause an error + mutableProvider.sessionGroup = sessionGroup + mutableProvider.book = book }.build() override fun write(buffer: ByteBuf, value: GroupBatch) { @@ -382,16 +410,21 @@ object GroupBatchCodec : AbstractCodec(50u) { SessionGroupCodec.encode(value.sessionGroup, buffer) GroupListCodec.encode(value.groups, buffer) } + + fun decode(buffer: ByteBuf): GroupBatch = decode(DecodeContext.create(BatchInfoProvider.Mutable()), buffer) } object GroupListCodec : ListCodec(51u, MessageGroupCodec) -inline fun ByteBuf.forEachValue(action: (codec: ValueCodec<*>) -> Unit) { +inline fun ByteBuf.forEachValue(context: DecodeContext, action: (codec: ValueCodec<*>) -> Unit) { while (isReadable) { val type = getByte(readerIndex()).toUByte() when (val codec = ValueType.forId(type).codec) { - is UnknownValueCodec -> println("Skipping unknown type $type value: ${ByteBufUtil.hexDump(codec.decode(this))}") + is UnknownValueCodec -> println("Skipping unknown type $type value: ${ByteBufUtil.hexDump(codec.decode( + context, + this + ))}") else -> action(codec) } } diff --git a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/MessageId.kt b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/MessageId.kt index 5bd43a5d..bb730ee9 100644 --- a/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/MessageId.kt +++ b/src/main/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/MessageId.kt @@ -20,15 +20,27 @@ import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.builders.C import java.time.Instant import java.util.StringJoiner -data class MessageId( +class MessageId private constructor( val sessionAlias: String, val direction: Direction, val sequence: Long, val timestamp: Instant, /** The subsequence is not mutable by default */ val subsequence: List = emptyList(), + private val batchInfoProvider: BatchInfoProvider, ) { + val book: String by batchInfoProvider::book + val sessionGroup: String by batchInfoProvider::sessionGroup + + constructor( + sessionAlias: String, + direction: Direction, + sequence: Long, + timestamp: Instant, + subsequence: List = emptyList(), + ) : this(sessionAlias, direction, sequence, timestamp, subsequence, BatchInfoProvider.Empty) + override fun equals(other: Any?): Boolean { if (this === other) return true if (javaClass != other?.javaClass) return false @@ -80,119 +92,123 @@ data class MessageId( fun toBuilder(): Builder = MessageIdBuilderImpl(this) - companion object { - @JvmStatic - val DEFAULT: MessageId = MessageId("", Direction.OUTGOING, 0, Instant.EPOCH) - - @JvmStatic - fun builder(): Builder = MessageIdBuilderImpl() - } -} - + private class MessageIdBuilderImpl( + private val provider: BatchInfoProvider = BatchInfoProvider.Empty, + ) : Builder { + private var _sessionAlias: String? = null + private var _direction: Direction? = null + private var _sequence: Long = SEQUENCE_NOT_SET + private var _timestamp: Instant? = null + private var _subsequenceBuilder: CollectionBuilder? = null + private var _subsequence: List = emptyList() + + constructor(source: MessageId) : this(source.batchInfoProvider) { + _sessionAlias = source.sessionAlias + _direction = source.direction + _sequence = source.sequence + _timestamp = source.timestamp + _subsequence = source.subsequence + } -private const val SEQUENCE_NOT_SET = Long.MIN_VALUE + override fun setSessionAlias(sessionAlias: String): Builder = apply { + this._sessionAlias = sessionAlias + } -private class MessageIdBuilderImpl : MessageId.Builder { - private var _sessionAlias: String? = null - private var _direction: Direction? = null - private var _sequence: Long = SEQUENCE_NOT_SET - private var _timestamp: Instant? = null - private var _subsequenceBuilder: CollectionBuilder? = null - private var _subsequence: List = emptyList() - - constructor() - constructor(source: MessageId) { - _sessionAlias = source.sessionAlias - _direction = source.direction - _sequence = source.sequence - _timestamp = source.timestamp - _subsequence = source.subsequence - } + override val sessionAlias: String + get() = checkNotNull(_sessionAlias) { "Property \"sessionAlias\" has not been set" } - override fun setSessionAlias(sessionAlias: String): MessageId.Builder = apply { - this._sessionAlias = sessionAlias - } + override fun isSessionAliasSet(): Boolean = _sessionAlias != null - override val sessionAlias: String - get() = checkNotNull(_sessionAlias) { "Property \"sessionAlias\" has not been set" } + override fun setDirection(direction: Direction): Builder = apply { + this._direction = direction + } - override fun isSessionAliasSet(): Boolean = _sessionAlias != null + override val direction: Direction + get() = checkNotNull(_direction) { "Property \"direction\" has not been set" } - override fun setDirection(direction: Direction): MessageId.Builder = apply { - this._direction = direction - } + override fun isDirectionSet(): Boolean = _direction != null - override val direction: Direction - get() = checkNotNull(_direction) { "Property \"direction\" has not been set" } + override fun setSequence(sequence: Long): Builder = apply { + require(sequence != SEQUENCE_NOT_SET) { "Value $sequence for property \"sequence\" is reserved" } + this._sequence = sequence + } - override fun isDirectionSet(): Boolean = _direction != null + override val sequence: Long + get() { + check(_sequence != SEQUENCE_NOT_SET) { "Property \"sequence\" has not been set" } + return _sequence + } - override fun setSequence(sequence: Long): MessageId.Builder = apply { - require(sequence != SEQUENCE_NOT_SET) { "Value $sequence for property \"sequence\" is reserved" } - this._sequence = sequence - } + override fun isSequenceSet(): Boolean = _sequence != SEQUENCE_NOT_SET - override val sequence: Long - get() { - check(_sequence != SEQUENCE_NOT_SET) { "Property \"sequence\" has not been set" } - return _sequence + override fun setTimestamp(timestamp: Instant): Builder = apply { + this._timestamp = timestamp } - override fun isSequenceSet(): Boolean = _sequence != SEQUENCE_NOT_SET - - override fun setTimestamp(timestamp: Instant): MessageId.Builder = apply { - this._timestamp = timestamp - } - - override val timestamp: Instant - get() = checkNotNull(_timestamp) { "Property \"timestamp\" has not been set" } + override val timestamp: Instant + get() = checkNotNull(_timestamp) { "Property \"timestamp\" has not been set" } - override fun isTimestampSet(): Boolean = _timestamp != null + override fun isTimestampSet(): Boolean = _timestamp != null - override fun setSubsequence(subsequence: List): MessageId.Builder = apply { - check(_subsequenceBuilder == null) { "Cannot set subsequence after calling subsequenceBuilder()" } - this._subsequence = subsequence - } + override fun setSubsequence(subsequence: List): Builder = apply { + check(_subsequenceBuilder == null) { "Cannot set subsequence after calling subsequenceBuilder()" } + this._subsequence = subsequence + } - override fun subsequenceBuilder(): CollectionBuilder { - if (_subsequenceBuilder == null) { - if (_subsequence.isEmpty()) { - _subsequenceBuilder = CollectionBuilder() - } else { - _subsequenceBuilder = CollectionBuilder().apply { - addAll(_subsequence) + override fun subsequenceBuilder(): CollectionBuilder { + if (_subsequenceBuilder == null) { + if (_subsequence.isEmpty()) { + _subsequenceBuilder = CollectionBuilder() + } else { + _subsequenceBuilder = CollectionBuilder().apply { + addAll(_subsequence) + } + _subsequence = emptyList() } - _subsequence = emptyList() } + return checkNotNull(_subsequenceBuilder) { "subsequenceBuilder" } } - return checkNotNull(_subsequenceBuilder) { "subsequenceBuilder" } - } - override fun build(): MessageId { - _subsequence = _subsequenceBuilder?.build() ?: _subsequence - if (_sessionAlias == null || _direction == null || _sequence == SEQUENCE_NOT_SET || _timestamp == null) { - val missing = StringJoiner(",", "[", "]") - if (_sessionAlias == null) { - missing.add("sessionAlias") - } - if (_direction == null) { - missing.add("direction") - } - if (_sequence == SEQUENCE_NOT_SET) { - missing.add("sequence") - } - if (_timestamp == null) { - missing.add("timestamp") + override fun build(): MessageId { + _subsequence = _subsequenceBuilder?.build() ?: _subsequence + if (_sessionAlias == null || _direction == null || _sequence == SEQUENCE_NOT_SET || _timestamp == null) { + val missing = StringJoiner(",", "[", "]") + if (_sessionAlias == null) { + missing.add("sessionAlias") + } + if (_direction == null) { + missing.add("direction") + } + if (_sequence == SEQUENCE_NOT_SET) { + missing.add("sequence") + } + if (_timestamp == null) { + missing.add("timestamp") + } + error("Missing required properties: $missing") } - error("Missing required properties: $missing") + return MessageId( + _sessionAlias!!, + _direction!!, + _sequence, + _timestamp!!, + _subsequence, + provider, + ) } - return MessageId( - _sessionAlias!!, - _direction!!, - _sequence, - _timestamp!!, - _subsequence, - ) + } + + companion object { + @JvmStatic + val DEFAULT: MessageId = MessageId("", Direction.OUTGOING, 0, Instant.EPOCH) + + @JvmStatic + fun builder(): Builder = MessageIdBuilderImpl() + + @JvmStatic + internal fun builder(provider: BatchInfoProvider): Builder = MessageIdBuilderImpl(provider) } } +private const val SEQUENCE_NOT_SET = Long.MIN_VALUE + diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/CodecsTest.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/CodecsTest.kt index c4c706bd..37696372 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/CodecsTest.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/transport/CodecsTest.kt @@ -22,6 +22,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.DynamicTest import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestFactory +import org.junit.jupiter.api.assertAll import java.time.Instant import java.time.LocalDate import java.time.LocalDateTime @@ -116,7 +117,7 @@ class CodecsTest { val dest = Unpooled.buffer() ParsedMessageCodec.encode(parsedMessage, dest) - val decoded = ParsedMessageCodec.decode(dest) + val decoded = ParsedMessageCodec.decode(DecodeContext.create(), dest) assertEquals(0, dest.readableBytes()) { "unexpected bytes left: ${ByteBufUtil.hexDump(dest)}" } assertEquals(parsedMessage, decoded, "unexpected parsed result decoded") @@ -129,6 +130,70 @@ class CodecsTest { ) } + @Test + fun `book and session group from batch are available on message id level`() { + val message1 = RawMessage( + id = MessageId( + sessionAlias = "alias2", + direction = Direction.OUTGOING, + sequence = 2, + subsequence = mutableListOf(3, 4), + timestamp = Instant.now() + ), + metadata = mutableMapOf( + "prop3" to "value3", + "prop4" to "value4" + ), + protocol = "proto2", + body = Unpooled.wrappedBuffer(byteArrayOf(5, 6, 7, 8)) + ) + + val message2 = ParsedMessage( + id = MessageId( + sessionAlias = "alias3", + direction = Direction.OUTGOING, + sequence = 3, + subsequence = mutableListOf(5, 6), + timestamp = Instant.now() + ), + metadata = mutableMapOf( + "prop5" to "value6", + "prop7" to "value8" + ), + protocol = "proto3", + type = "some-type", + rawBody = Unpooled.buffer().apply { writeCharSequence("{}", Charsets.UTF_8) } + ) + + val batch = GroupBatch( + book = "book1", + sessionGroup = "group1", + groups = mutableListOf(MessageGroup(mutableListOf(message1, message2))) + ) + val target = Unpooled.buffer() + GroupBatchCodec.encode(batch, target) + val decoded = GroupBatchCodec.decode(target) + + val messages = decoded.groups.flatMap { it.messages } + assertEquals(2, messages.size) + + assertAll( + messages.map { + { + assertAll( + heading = "${it::class} has book and session group", + { + assertEquals("book1", it.id.book, "unexpected book") + }, + { + assertEquals("group1", it.id.sessionGroup, "unexpected session group") + }, + ) + } + } + ) + } + @TestFactory fun dateTypesTests(): Collection { LocalTime.parse("16:36:38.035420").toString() @@ -154,7 +219,7 @@ class CodecsTest { val dest = Unpooled.buffer() ParsedMessageCodec.encode(parsedMessage, dest) - val decoded = ParsedMessageCodec.decode(dest) + val decoded = ParsedMessageCodec.decode(DecodeContext.create(), dest) assertEquals(0, dest.readableBytes()) { "unexpected bytes left: ${ByteBufUtil.hexDump(dest)}" } assertEquals(parsedMessage, decoded, "unexpected parsed result decoded")