From 316e824a4089cffd59d05d73ff70c9b0da55d69f Mon Sep 17 00:00:00 2001 From: Oleg Smelov <45400511+lumber1000@users.noreply.github.com> Date: Wed, 6 Dec 2023 13:56:49 +0400 Subject: [PATCH] [TH2-4918] th2 transport protocol support (#16) * [TH2-4987] Added gradle-git-properties plugin * [TS-1512] Add parameter to disable type prefix * [TH2-5051] Update common version with correct mapper settings * [TH2-5082] Use dev release. Add support for event batching --------- Co-authored-by: Oleg Smelov Co-authored-by: nikita.smirnov Co-authored-by: Oleg Smirnov --- Dockerfile | 2 +- README.md | 12 +- build.gradle | 143 ++----------- gradle.properties | 3 +- gradle/wrapper/gradle-wrapper.properties | 2 +- .../exactpro/th2/codec/AbstractAvroCodec.kt | 164 +++++++++++---- .../th2/codec/AbstractMessageWriter.kt | 79 +++++++ .../com/exactpro/th2/codec/AliasAvroCodec.kt | 34 ++- .../exactpro/th2/codec/AvroCodecFactory.kt | 5 +- .../exactpro/th2/codec/AvroCodecSettings.kt | 3 +- .../exactpro/th2/codec/GenericDataFactory.kt | 32 +++ .../exactpro/th2/codec/MessageDatumReader.kt | 48 ++--- .../exactpro/th2/codec/MessageDatumWriter.kt | 95 ++------- .../exactpro/th2/codec/StandardAvroCodec.kt | 88 ++++++-- .../th2/codec/TransportMessageDatumReader.kt | 139 ++++++++++++ .../th2/codec/TransportMessageDatumWriter.kt | 199 ++++++++++++++++++ .../th2/codec/resolver/AliasDatumResolver.kt | 26 ++- .../th2/codec/resolver/IDatumResolver.kt | 4 + .../codec/resolver/SchemaIdDatumResolver.kt | 39 +++- .../com/exactpro/th2/codec/TestAvroCodec.kt | 122 +++++++++-- 20 files changed, 905 insertions(+), 334 deletions(-) create mode 100644 src/main/kotlin/com/exactpro/th2/codec/AbstractMessageWriter.kt create mode 100644 src/main/kotlin/com/exactpro/th2/codec/GenericDataFactory.kt create mode 100644 src/main/kotlin/com/exactpro/th2/codec/TransportMessageDatumReader.kt create mode 100644 src/main/kotlin/com/exactpro/th2/codec/TransportMessageDatumWriter.kt diff --git a/Dockerfile b/Dockerfile index ac05dc3..3ee4f84 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM gradle:7.5.1-jdk11 AS build +FROM gradle:7.6-jdk11 AS build ARG release_version COPY ./ . RUN gradle --no-daemon clean build dockerPrepare -Prelease_version=${release_version} diff --git a/README.md b/README.md index e5c8264..8183033 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# AVRO codec (2.2.1) +# AVRO codec (2.3.0) ## Description Designed for decode AVRO raw messages to parsed messages and encode back. It is based on [th2-codec](https://github.com/th2-net/th2-codec). @@ -15,7 +15,7 @@ The first byte of the AVRO header is always 0(MAGIC BYTE). It is followed by four bytes which identify the AVRO schema to be used for decoding. ```text [0],[][][][], [][][][]][][]...[] - schema id data + schema id data ``` ### Schema resolution mode by session alias Enabled when setting `sessionAliasToDictionaryAlias` is filled. @@ -61,6 +61,7 @@ AVRO codec has the following parameters: ```yaml enableIdPrefixEnumFields: false +enablePrefixEnumFieldsDecode: true avroMessageIdToDictionaryAlias: '1': "${dictionary_link:avro-schema-1-dictionary}" '2': "${dictionary_link:avro-schema-2-dictionary}" @@ -70,6 +71,10 @@ sessionAliasToDictionaryAlias: 'sessionGroup1Alias*': "${dictionary_link:avro-schema-2-dictionary}" '???????Group2Alias*': "${dictionary_link:avro-schema-3-dictionary}" ``` +**enablePrefixEnumFieldsDecode** - enables prefix before field name. +If disabled no type or ID prefix will be added before field name. +_NOTE: in this case the message decoded by codec cannot be encoded - encoding requires type or ID prefix_ + **enableIdPrefixEnumFields** - prefix setting for UNION fields. If `false`, use prefix as `AVRO data type`(for example `Record-`, `Map-`), if `true` then use `schema id` prefix(for example `Id0-`, `Id3-`). The default value is `false` **avroMessageIdToDictionaryAlias** - matching `schema id` pairs with its `alias` available for loading in the pipelineCodecContext. @@ -80,6 +85,9 @@ Only one of settings `sessionAliasToDictionaryAlias` or `avroMessageIdToDictiona ## Release notes +### 2.3.0 ++ TH2 transport protocol support + ### 2.2.1 * Added detailed TRACE logging for parsing into class com.exactpro.th2.codec.MessageDatumReader diff --git a/build.gradle b/build.gradle index 1f27a94..3b88bc5 100644 --- a/build.gradle +++ b/build.gradle @@ -1,150 +1,47 @@ -plugins { - id 'com.palantir.docker' version '0.25.0' apply false - id 'org.jetbrains.kotlin.jvm' version '1.6.21' - // for run ./gradlew jmh - id 'me.champeau.jmh' version '0.6.8' - id "org.owasp.dependencycheck" version "8.1.2" -} - -apply plugin: 'application' -apply plugin: 'com.palantir.docker' -apply plugin: 'kotlin-kapt' - -ext { - sailfishVersion = '3.3.54' -} -ext.excludeSailfish = { rcd -> - rcd.excludeModule("com.exactpro.sf", "sailfish-common") - rcd.excludeModule("com.exactpro.sf", "sailfish-rest-api-client") -} - -group = 'com.exactpro.th2' -version = release_version - -sourceCompatibility = 11 -targetCompatibility = 11 - -repositories { - maven { - name 'Sonatype_snapshots' - url 'https://s01.oss.sonatype.org/content/repositories/snapshots/' - metadataSources { - mavenPom() - artifact() - ignoreGradleMetadataRedirection() +buildscript { + repositories { + maven { + url = "https://s01.oss.sonatype.org/content/repositories/snapshots/" } } - maven { - name 'Sonatype_releases' - url 'https://s01.oss.sonatype.org/content/repositories/releases/' - content { - excludeSailfish(it) - } - } - maven { - name 'Sonatype_releases' - url 'https://s01.oss.sonatype.org/content/repositories/releases/' - metadataSources { - mavenPom() - artifact() - ignoreGradleMetadataRedirection() - } - } - mavenCentral() - - configurations.configureEach { - resolutionStrategy.cacheChangingModulesFor 0, 'seconds' - resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds' + dependencies { + classpath "com.exactpro.th2:th2-gradle-plugin:0.0.1-dev-5915772757-13a28ae-SNAPSHOT" } } -jar { - manifest { - attributes( - 'Created-By': "${System.getProperty('java.version')} (${System.getProperty('java.vendor')})", - 'Specification-Title': '', - 'Specification-Vendor': 'Exactpro Systems LLC', - 'Implementation-Title': project.archivesBaseName, - 'Implementation-Vendor': 'Exactpro Systems LLC', - 'Implementation-Vendor-Id': 'com.exactpro', - 'Implementation-Version': project.version - ) - } +plugins { + // for run ./gradlew jmh + id 'me.champeau.jmh' version '0.6.8' } -configurations.implementation { - exclude group: 'org.slf4j', module: 'slf4j-log4j12' -} +apply plugin: "com.exactpro.th2.common-conventions" +apply plugin: "com.exactpro.th2.docker-conventions" + +apply plugin: 'kotlin-kapt' dependencies { - api platform('com.exactpro.th2:bom:4.2.0') + api platform('com.exactpro.th2:bom:4.5.0') - implementation 'com.exactpro.th2:common:5.2.0-dev' //FIXME: migrate to release - implementation 'com.exactpro.th2:codec:5.2.0-dev' + implementation "com.exactpro.th2:common:5.4.2-dev" + implementation "com.exactpro.th2:common-utils:2.2.0-dev" + implementation "com.exactpro.th2:codec:5.4.0-dev" implementation 'javax.xml.bind:jaxb-api:2.3.1' - implementation 'io.netty:netty-buffer' + implementation 'org.apache.avro:avro:1.11.1' + implementation 'commons-io:commons-io:2.12.0' compileOnly 'com.google.auto.service:auto-service:1.0.1' - - implementation 'org.slf4j:slf4j-api' - - implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8' - implementation 'org.jetbrains.kotlin:kotlin-reflect' - implementation 'io.github.microutils:kotlin-logging:3.0.0' - - testImplementation "org.junit.jupiter:junit-jupiter:5.9.0" testImplementation 'org.jetbrains.kotlin:kotlin-test-junit5:1.6.21' - kapt 'com.google.auto.service:auto-service:1.0.1' jmh 'org.openjdk.jmh:jmh-core:1.36' jmh 'org.openjdk.jmh:jmh-generator-annprocess:1.36' jmhAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:1.36' - - implementation 'org.apache.avro:avro:1.11.1' -} - -test { - useJUnitPlatform() -} - -application { - mainClass.set('com.exactpro.th2.codec.MainKt') -} - -applicationName = 'service' - -distTar { - archiveFileName.set("${applicationName}.tar") -} - -dockerPrepare { - dependsOn distTar -} - -docker { - copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar")) } -tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).configureEach { - kotlinOptions { - jvmTarget = "11" - } -} jmh { //profilers = ['stack'] // Use profilers to collect additional data. Supported profilers: [cl, comp, gc, stack, perf, perfnorm, perfasm, xperf, xperfasm, hs_cl, hs_comp, hs_gc, hs_rt, hs_thr, async] } dependencyLocking { lockAllConfigurations() -} -dependencyCheck { - formats=['SARIF', 'JSON', 'HTML'] - failBuildOnCVSS=5 - - analyzers { - assemblyEnabled = false - nugetconfEnabled = false - nodeEnabled = false - } -} +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 8e4a655..10e466d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1,2 @@ -release_version=2.2.1 \ No newline at end of file +release_version=2.3.0 +app_main_class=com.exactpro.th2.codec.MainKt \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ae04661..070cb70 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/src/main/kotlin/com/exactpro/th2/codec/AbstractAvroCodec.kt b/src/main/kotlin/com/exactpro/th2/codec/AbstractAvroCodec.kt index 2c81473..6098544 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/AbstractAvroCodec.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/AbstractAvroCodec.kt @@ -13,14 +13,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.codec import com.exactpro.th2.codec.api.IPipelineCodec +import com.exactpro.th2.codec.api.IReportingContext +import com.exactpro.th2.codec.util.toJson import com.exactpro.th2.codec.util.toMessageMetadataBuilder import com.exactpro.th2.codec.util.toRawMetadataBuilder -import com.exactpro.th2.common.grpc.* import com.exactpro.th2.common.message.sessionAlias import com.exactpro.th2.common.message.toJson +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.toByteArray import com.google.protobuf.ByteString import com.google.protobuf.UnsafeByteOperations import io.netty.buffer.ByteBuf @@ -32,22 +38,24 @@ import org.apache.avro.io.EncoderFactory import java.io.ByteArrayOutputStream import java.io.IOException import javax.xml.bind.DatatypeConverter +import com.exactpro.th2.common.grpc.AnyMessage as ProtoAnyMessage +import com.exactpro.th2.common.grpc.Message as ProtoMessage +import com.exactpro.th2.common.grpc.MessageGroup as ProtoMessageGroup +import com.exactpro.th2.common.grpc.RawMessage as ProtoRawMessage abstract class AbstractAvroCodec( settings: AvroCodecSettings, ) : IPipelineCodec { - protected val enableIdPrefixEnumFields = settings.enableIdPrefixEnumFields - - override fun decode(messageGroup: MessageGroup): MessageGroup { - val messages = messageGroup.messagesList - - - if (messages.isEmpty().or(messages.stream().allMatch(AnyMessage::hasMessage))) { - return messageGroup - } + protected val enableIdPrefixEnumFields: Boolean = settings.enableIdPrefixEnumFields + protected val enablePrefixEnumFieldsDecode: Boolean? = if (settings.enablePrefixEnumFieldsDecode) { + enableIdPrefixEnumFields + } else { + null + } - val msgBuilder = MessageGroup.newBuilder() - messages.forEach { message -> + override fun decode(messageGroup: ProtoMessageGroup, context: IReportingContext): ProtoMessageGroup { + val msgBuilder = ProtoMessageGroup.newBuilder() + messageGroup.messagesList.forEach { message -> if (!message.hasRawMessage().or(message.message.metadata.run { protocol.isNotEmpty().and(protocol != AvroCodecFactory.PROTOCOL) })) { @@ -56,24 +64,33 @@ abstract class AbstractAvroCodec( val rawMessage = message.rawMessage val sessionAlias = rawMessage.sessionAlias val decodeMessage = decodeRawMessage(rawMessage, sessionAlias) - msgBuilder.addMessages(AnyMessage.newBuilder().setMessage(decodeMessage).build()) + msgBuilder.addMessages(ProtoAnyMessage.newBuilder().setMessage(decodeMessage).build()) } } return msgBuilder.build() } - abstract fun decodeRawMessage(rawMessage: RawMessage, sessionAlias: String): Message - - override fun encode(messageGroup: MessageGroup): MessageGroup { - val messages = messageGroup.messagesList + override fun decode(messageGroup: MessageGroup, context: IReportingContext): MessageGroup { + return MessageGroup.builder().apply { + messageGroup.messages.forEach { message -> + addMessage( + if (message !is RawMessage || (message.protocol.isNotEmpty() && (message.protocol != AvroCodecFactory.PROTOCOL))) { + message + } else { + decodeRawMessage(message, message.id.sessionAlias) + } + ) + } + }.build() + } - if (messages.isEmpty().or(messages.stream().allMatch(AnyMessage::hasRawMessage))) { - return messageGroup - } + abstract fun decodeRawMessage(rawMessage: ProtoRawMessage, sessionAlias: String): ProtoMessage + abstract fun decodeRawMessage(rawMessage: RawMessage, sessionAlias: String): ParsedMessage - val msgBuilder = MessageGroup.newBuilder() + override fun encode(messageGroup: ProtoMessageGroup, context: IReportingContext): ProtoMessageGroup { + val msgBuilder = ProtoMessageGroup.newBuilder() Unpooled.buffer() - messages.forEach { message -> + messageGroup.messagesList.forEach { message -> if (!message.hasMessage().or(message.message.metadata.run { protocol.isNotEmpty().and(protocol != AvroCodecFactory.PROTOCOL) })) { @@ -82,7 +99,7 @@ abstract class AbstractAvroCodec( val parsedMessage = message.message val sessionAlias = parsedMessage.sessionAlias val messageBody = encodeMessage(parsedMessage, sessionAlias) - val rawMessage = RawMessage.newBuilder() + val rawMessage = ProtoRawMessage.newBuilder() .apply { if (parsedMessage.hasParentEventId()) this.parentEventId = parsedMessage.parentEventId } .setMetadata( parsedMessage.toRawMetadataBuilder(listOf(AvroCodecFactory.PROTOCOL)) @@ -90,44 +107,85 @@ abstract class AbstractAvroCodec( .setBody(messageBody) .build() - msgBuilder.addMessages(AnyMessage.newBuilder().setRawMessage(rawMessage).build()) + msgBuilder.addMessages(ProtoAnyMessage.newBuilder().setRawMessage(rawMessage).build()) } } return msgBuilder.build() } - abstract fun encodeMessage(parsedMessage: Message, sessionAlias: String): ByteString? + override fun encode(messageGroup: MessageGroup, context: IReportingContext): MessageGroup { + return MessageGroup.builder().apply { + messageGroup.messages.forEach { message -> + addMessage( + if (message !is ParsedMessage || message.protocol.isNotEmpty() && message.protocol != AvroCodecFactory.PROTOCOL) { + message + } else { + val sessionAlias = message.id.sessionAlias + val messageBody = encodeMessage(message, sessionAlias) + RawMessage.builder().apply { + setId(message.id) + message.eventId?.let(this::setEventId) + setMetadata(message.metadata) + setProtocol(AvroCodecFactory.PROTOCOL) + setBody(Unpooled.wrappedBuffer(messageBody)) + }.build() + } + ) + } + }.build() + } + + abstract fun encodeMessage(parsedMessage: ProtoMessage, sessionAlias: String): ByteString + abstract fun encodeMessage(parsedMessage: ParsedMessage, sessionAlias: String): ByteArray protected fun getDecodedData( reader: MessageDatumReader, decoder: Decoder, + rawMessage: ProtoRawMessage, + bytes: ByteArray, + id: Any + ): ProtoMessage = runCatching { + reader.read(ProtoMessage.newBuilder(), decoder) + .apply { if (rawMessage.hasParentEventId()) this.parentEventId = rawMessage.parentEventId } + .setMetadata( + rawMessage.toMessageMetadataBuilder(listOf(AvroCodecFactory.PROTOCOL)) + .setMessageType(reader.schema.name) + ).build() + }.getOrElse { + throw DecodeException( + "Can't parse message data: ${DatatypeConverter.printHexBinary(bytes)} by schema id: $id", + it + ) + } + + protected fun getDecodedData( + reader: TransportMessageDatumReader, + decoder: Decoder, rawMessage: RawMessage, bytes: ByteArray?, id: Any - ): Message { - try { - return reader.read(Message.newBuilder(), decoder) - .apply { if (rawMessage.hasParentEventId()) this.parentEventId = rawMessage.parentEventId } - .setMetadata( - rawMessage.toMessageMetadataBuilder(listOf(AvroCodecFactory.PROTOCOL)) - .setMessageType(reader.schema.name) - ) - .build() - - } catch (e: IOException) { - throw DecodeException( - "Can't parse message data: ${DatatypeConverter.printHexBinary(bytes)} by schema id: $id", - e - ) - } + ): ParsedMessage = runCatching { + ParsedMessage.builder().apply { + setId(rawMessage.id) + rawMessage.eventId?.let(this::setEventId) + setMetadata(rawMessage.metadata) + setProtocol(AvroCodecFactory.PROTOCOL) + setType(reader.schema.name) + setBody(reader.read(hashMapOf(), decoder)) + }.build() + }.getOrElse { + throw DecodeException( + "Can't parse message data: ${DatatypeConverter.printHexBinary(bytes)} by schema id: $id", + it + ) } protected fun getEncodedData( writer: MessageDatumWriter, - parsedMessage: Message, - byteBuf: ByteBuf? - ): ByteString? { + parsedMessage: ProtoMessage, + byteBuf: ByteBuf + ): ByteString { val byteArrayOutputStream = ByteArrayOutputStream() val encoder: Encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null) try { @@ -140,4 +198,22 @@ abstract class AbstractAvroCodec( val header = UnsafeByteOperations.unsafeWrap(ByteBufUtil.getBytes(byteBuf)) return header.concat(UnsafeByteOperations.unsafeWrap(byteArrayOutputStream.toByteArray())) } + + protected fun getEncodedData( + writer: TransportMessageDatumWriter, + parsedMessage: ParsedMessage, + byteBuf: ByteBuf + ): ByteArray { + val byteArrayOutputStream = ByteArrayOutputStream() + val encoder: Encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null) + try { + writer.write(parsedMessage.body, encoder) + } catch (e: IOException) { + throw IllegalStateException("Can't parse message data: ${parsedMessage.toJson()}", e) + } + encoder.flush() + + val header = byteBuf.toByteArray() + return header + byteArrayOutputStream.toByteArray() + } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/AbstractMessageWriter.kt b/src/main/kotlin/com/exactpro/th2/codec/AbstractMessageWriter.kt new file mode 100644 index 0000000..bf92e00 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/AbstractMessageWriter.kt @@ -0,0 +1,79 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec + +import org.apache.avro.AvroTypeException +import org.apache.avro.Schema +import org.apache.avro.generic.GenericDatumWriter +import java.time.Instant +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter + +abstract class AbstractMessageWriter(schema: Schema, private val enableIdPrefixEnumFields: Boolean) + : GenericDatumWriter(schema, getData()) { + + protected fun resolveUnion(union: Schema, fieldName: String?, enumValue: T?): Int { + if (enableIdPrefixEnumFields.and(enumValue != null)) { + try { + return checkNotNull( + fieldName?.substringBefore(UNION_FIELD_NAME_TYPE_DELIMITER)?.substringAfter(UNION_ID_PREFIX) + ?.toInt() + ) { "Schema id not found in field name: $fieldName" } + } catch (e: NumberFormatException) { + throw AvroTypeException( + "Union prefix: $UNION_ID_PREFIX'{schema id}'$UNION_FIELD_NAME_TYPE_DELIMITER not found in field name: $fieldName", + e + ) + } + } + val schemaName = checkNotNull( + if (enumValue == null) Schema.Type.NULL.getName() else fieldName?.substringBefore( + UNION_FIELD_NAME_TYPE_DELIMITER + ) + ) { "Union prefix: {avro type}$UNION_FIELD_NAME_TYPE_DELIMITER not found in field name: $fieldName" } + return checkNotNull(union.getIndexNamed(schemaName)) { "Schema with name: $schemaName not found in union parent schema: ${union.name}" } + } + + protected fun convertFieldValue(value: String, typeName: String): Any { + val converter = typeNameToConverter[typeName] ?: throw AvroTypeException("Logical type $typeName is not supported}") + return converter(value) + } + + companion object { + private val typeNameToConverter = buildMap Any> { + put("decimal") { it.toBigDecimal() } + put("date") { LocalDate.parse(it) } + put("time-millis") { java.time.LocalTime.parse(it, localTimeWithMillisConverter) } + put("time-micros") { java.time.LocalTime.parse(it, localTimeWithMicrosConverter) } + put("timestamp-millis") { Instant.parse(it) } + put("timestamp-micros") { Instant.parse(it) } + put("local-timestamp-millis") { LocalDateTime.parse(it, localDateTimeWithMillisConverter) } + put("local-timestamp-micros") { LocalDateTime.parse(it, localDateTimeWithMicrosConverter) } + } + + private val localTimeWithMillisConverter: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS").withZone(ZoneOffset.UTC) + private val localTimeWithMicrosConverter: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC) + private val localDateTimeWithMillisConverter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS").withZone(ZoneOffset.UTC) + private val localDateTimeWithMicrosConverter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC) + + const val FORMAT_TYPE_ERROR = "Unsupported type %s for %s" + const val UNION_FIELD_NAME_TYPE_DELIMITER = '-' + const val UNION_ID_PREFIX = "Id" + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/AliasAvroCodec.kt b/src/main/kotlin/com/exactpro/th2/codec/AliasAvroCodec.kt index e31a5c9..ee7955f 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/AliasAvroCodec.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/AliasAvroCodec.kt @@ -13,12 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.codec import com.exactpro.th2.codec.resolver.AliasDatumResolver +import com.exactpro.th2.codec.util.toJson import com.exactpro.th2.common.grpc.Message -import com.exactpro.th2.common.grpc.RawMessage +import com.exactpro.th2.common.grpc.RawMessage as ProtoRawMessage import com.exactpro.th2.common.message.toJson +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.toByteArray import com.google.protobuf.ByteString import io.netty.buffer.Unpooled import org.apache.avro.Schema @@ -26,18 +31,35 @@ import org.apache.avro.io.DecoderFactory class AliasAvroCodec(sessionAliasToSchema: Map, settings: AvroCodecSettings) : AbstractAvroCodec(settings) { - private val aliasResolver = AliasDatumResolver(sessionAliasToSchema, enableIdPrefixEnumFields) - override fun decodeRawMessage(rawMessage: RawMessage, sessionAlias: String): Message { + private val aliasResolver = AliasDatumResolver(sessionAliasToSchema, enableIdPrefixEnumFields, enablePrefixEnumFieldsDecode) + + override fun decodeRawMessage(rawMessage: ProtoRawMessage, sessionAlias: String): Message { + check(sessionAlias.isNotBlank()) { "Session alias cannot be empty. Raw message: ${rawMessage.toJson()}" } val bytes = rawMessage.body.toByteArray() - check(sessionAlias.isNotEmpty()) { "Session alias cannot be empty. Raw message: ${rawMessage.toJson()}" } val reader = aliasResolver.getReader(sessionAlias) val decoder = DecoderFactory.get().binaryDecoder(bytes, null) return getDecodedData(reader, decoder, rawMessage, bytes, sessionAlias) } - override fun encodeMessage(parsedMessage: Message, sessionAlias: String): ByteString? { + + override fun decodeRawMessage(rawMessage: RawMessage, sessionAlias: String): ParsedMessage { + check(sessionAlias.isNotBlank()) { "Session alias cannot be empty. Raw message: ${rawMessage.toJson()}" } + val bytes = rawMessage.body.toByteArray() + val reader = aliasResolver.getTransportReader(sessionAlias) + val decoder = DecoderFactory.get().binaryDecoder(bytes, null) + return getDecodedData(reader, decoder, rawMessage, bytes, sessionAlias) + } + + override fun encodeMessage(parsedMessage: Message, sessionAlias: String): ByteString { + check(sessionAlias.isNotBlank()) { "Session alias cannot be empty. Parsed message: ${parsedMessage.toJson()}" } val byteBuf = Unpooled.buffer() - check(sessionAlias.isNotEmpty()) { "Session alias cannot be empty. Parsed message: ${parsedMessage.toJson()}" } val writer = aliasResolver.getWriter(sessionAlias) return getEncodedData(writer, parsedMessage, byteBuf) } + + override fun encodeMessage(parsedMessage: ParsedMessage, sessionAlias: String): ByteArray { + check(sessionAlias.isNotBlank()) { "Session alias cannot be empty. Parsed message: ${parsedMessage.toJson()}" } + val byteBuf = Unpooled.buffer() + val writer = aliasResolver.getTransportWriter(sessionAlias) + return getEncodedData(writer, parsedMessage, byteBuf) + } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/AvroCodecFactory.kt b/src/main/kotlin/com/exactpro/th2/codec/AvroCodecFactory.kt index 88d3f2b..31d068e 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/AvroCodecFactory.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/AvroCodecFactory.kt @@ -26,8 +26,8 @@ class AvroCodecFactory : IPipelineCodecFactory { private lateinit var codecContext: IPipelineCodecContext - @Deprecated("Please migrate to the protocols property") - override val protocol: String = PROTOCOL + override val protocols: Set + get() = PROTOCOLS override val settingsClass: Class = AvroCodecSettings::class.java @@ -61,5 +61,6 @@ class AvroCodecFactory : IPipelineCodecFactory { companion object { const val PROTOCOL = "AVRO" + private val PROTOCOLS = setOf(PROTOCOL) } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/AvroCodecSettings.kt b/src/main/kotlin/com/exactpro/th2/codec/AvroCodecSettings.kt index 4593c41..2baa9bb 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/AvroCodecSettings.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/AvroCodecSettings.kt @@ -20,5 +20,6 @@ import com.exactpro.th2.codec.api.IPipelineCodecSettings class AvroCodecSettings( val avroMessageIdToDictionaryAlias: Map = emptyMap(), val sessionAliasToDictionaryAlias: Map = emptyMap(), - val enableIdPrefixEnumFields: Boolean = false + val enableIdPrefixEnumFields: Boolean = false, + val enablePrefixEnumFieldsDecode: Boolean = true, ) : IPipelineCodecSettings diff --git a/src/main/kotlin/com/exactpro/th2/codec/GenericDataFactory.kt b/src/main/kotlin/com/exactpro/th2/codec/GenericDataFactory.kt new file mode 100644 index 0000000..439dd86 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/GenericDataFactory.kt @@ -0,0 +1,32 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec + +import org.apache.avro.Conversions +import org.apache.avro.data.TimeConversions +import org.apache.avro.generic.GenericData + +fun getData(): GenericData = GenericData.get().apply { + addLogicalTypeConversion(Conversions.DecimalConversion()) + addLogicalTypeConversion(TimeConversions.DateConversion()) + addLogicalTypeConversion(TimeConversions.TimeMillisConversion()) + addLogicalTypeConversion(TimeConversions.TimeMicrosConversion()) + addLogicalTypeConversion(TimeConversions.TimestampMillisConversion()) + addLogicalTypeConversion(TimeConversions.TimestampMicrosConversion()) + addLogicalTypeConversion(TimeConversions.LocalTimestampMillisConversion()) + addLogicalTypeConversion(TimeConversions.LocalTimestampMicrosConversion()) + } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/MessageDatumReader.kt b/src/main/kotlin/com/exactpro/th2/codec/MessageDatumReader.kt index 5501249..2468f03 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/MessageDatumReader.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/MessageDatumReader.kt @@ -13,34 +13,38 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.codec -import com.exactpro.th2.codec.MessageDatumWriter.Companion.UNION_FIELD_NAME_TYPE_DELIMITER -import com.exactpro.th2.codec.MessageDatumWriter.Companion.UNION_ID_PREFIX +import com.exactpro.th2.codec.AbstractMessageWriter.Companion.UNION_FIELD_NAME_TYPE_DELIMITER +import com.exactpro.th2.codec.AbstractMessageWriter.Companion.UNION_ID_PREFIX import com.exactpro.th2.common.grpc.Message import com.exactpro.th2.common.grpc.Value import com.exactpro.th2.common.message.addField import com.exactpro.th2.common.value.toValue import com.google.protobuf.TextFormat.shortDebugString -import org.apache.avro.* -import org.apache.avro.data.TimeConversions.* -import org.apache.avro.generic.* +import org.apache.avro.Schema +import org.apache.avro.LogicalType +import org.apache.avro.Conversion +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.generic.GenericFixed import org.apache.avro.io.Decoder import org.apache.avro.io.ResolvingDecoder import java.io.IOException import java.nio.ByteBuffer -import java.util.* import javax.xml.bind.DatatypeConverter import mu.KotlinLogging -class MessageDatumReader(schema: Schema, private val enableIdPrefixEnumFields: Boolean = false) : - GenericDatumReader(schema, schema, getData()) { +class MessageDatumReader( + schema: Schema, + private val enableIdPrefixEnumFields: Boolean? = false, +) : GenericDatumReader(schema, schema, getData()) { @Throws(IOException::class) override fun readWithoutConversion(old: Any?, expected: Schema, decoder: ResolvingDecoder): Any? { return if (expected.type == Schema.Type.UNION) { val readIndex = decoder.readIndex() val schema = expected.types[readIndex] - UnionData(read(old, schema, decoder), if(enableIdPrefixEnumFields) "$UNION_ID_PREFIX$readIndex" else schema.name) + UnionData(read(old, schema, decoder), enableIdPrefixEnumFields?.let { if(it) "$UNION_ID_PREFIX$readIndex" else schema.name }) } else { super.readWithoutConversion(old, expected, decoder) } @@ -73,8 +77,12 @@ class MessageDatumReader(schema: Schema, private val enableIdPrefixEnumFields: B } } - private fun resolveUnionFieldName(fieldName: String, description: String): String { - return "$description$UNION_FIELD_NAME_TYPE_DELIMITER$fieldName" + private fun resolveUnionFieldName(fieldName: String, description: String?): String { + return if (description == null) { + fieldName + } else { + "$description$UNION_FIELD_NAME_TYPE_DELIMITER$fieldName" + } } private fun createRecord(): Message.Builder { @@ -128,21 +136,9 @@ class MessageDatumReader(schema: Schema, private val enableIdPrefixEnumFields: B } data class UnionData( val value: Any?, - val description: String + val description: String? ) companion object { - private val LOGGER = KotlinLogging.logger { } - fun getData(): GenericData? { - return GenericData.get().apply { - addLogicalTypeConversion(Conversions.DecimalConversion()) - addLogicalTypeConversion(DateConversion()) - addLogicalTypeConversion(TimeMillisConversion()) - addLogicalTypeConversion(TimeMicrosConversion()) - addLogicalTypeConversion(TimestampMillisConversion()) - addLogicalTypeConversion(TimestampMicrosConversion()) - addLogicalTypeConversion(LocalTimestampMillisConversion()) - addLogicalTypeConversion(LocalTimestampMicrosConversion()) - } - } + private val LOGGER = KotlinLogging.logger {} } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/MessageDatumWriter.kt b/src/main/kotlin/com/exactpro/th2/codec/MessageDatumWriter.kt index c3f2a19..85e3c46 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/MessageDatumWriter.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/MessageDatumWriter.kt @@ -13,28 +13,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.codec -import com.exactpro.th2.codec.MessageDatumReader.Companion.getData import com.exactpro.th2.common.grpc.Message import com.exactpro.th2.common.grpc.Value import com.exactpro.th2.common.message.getField import com.exactpro.th2.common.value.getList import com.exactpro.th2.common.value.getMessage -import org.apache.avro.* -import org.apache.avro.generic.GenericDatumWriter +import org.apache.avro.UnresolvedUnionException +import org.apache.avro.AvroTypeException +import org.apache.avro.Schema +import org.apache.avro.Conversion import org.apache.avro.io.Encoder -import org.apache.avro.path.* +import org.apache.avro.path.TracingAvroTypeException +import org.apache.avro.path.TracingClassCastException +import org.apache.avro.path.TracingNullPointException +import org.apache.avro.path.LocationStep import org.apache.avro.util.SchemaUtil import java.io.IOException import java.nio.ByteBuffer -import java.time.* -import java.time.format.DateTimeFormatter import javax.xml.bind.DatatypeConverter - -class MessageDatumWriter(schema: Schema, private val enableIdPrefixEnumFields: Boolean = false) : - GenericDatumWriter(schema, getData()) { +class MessageDatumWriter(schema: Schema, enableIdPrefixEnumFields: Boolean = false) : + AbstractMessageWriter(schema, enableIdPrefixEnumFields) { @Throws(IOException::class) override fun writeField(datum: Any?, f: Schema.Field, out: Encoder, state: Any?) { val value = resolveUnionValue(f, datum) @@ -97,9 +99,7 @@ class MessageDatumWriter(schema: Schema, private val enableIdPrefixEnumFields: B when (datum) { is Value -> out.writeInt(datum.simpleValue.toInt()) is Int -> out.writeInt(datum) - else -> throw AvroTypeException( - String.format(FORMAT_TYPE_ERROR, datum.javaClass, schemaType.name) - ) + else -> throw AvroTypeException(String.format(FORMAT_TYPE_ERROR, datum.javaClass, schemaType.name)) } } @@ -107,9 +107,7 @@ class MessageDatumWriter(schema: Schema, private val enableIdPrefixEnumFields: B when (datum) { is Value -> out.writeLong(datum.simpleValue.toLong()) is Long -> out.writeLong(datum) - else -> throw AvroTypeException( - String.format(FORMAT_TYPE_ERROR, datum.javaClass, schemaType.name) - ) + else -> throw AvroTypeException(String.format(FORMAT_TYPE_ERROR, datum.javaClass, schemaType.name)) } } @@ -164,66 +162,13 @@ class MessageDatumWriter(schema: Schema, private val enableIdPrefixEnumFields: B @Throws(IOException::class) override fun write(schema: Schema, datum: Any, out: Encoder) { - val logicalType = schema.logicalType - if (logicalType != null) { + val value = schema.logicalType?.let { val simpleValue = (datum as Value).simpleValue - val convertedValue = when (logicalType.name) { - Type.DECIMAL.type -> simpleValue.toBigDecimal() - Type.DATE.type -> LocalDate.parse(simpleValue.toString()) - Type.TIME_MILLIS.type -> LocalTime.parse(simpleValue.toString(), localTimeWithMillisConverter) - Type.TIME_MICROS.type -> LocalTime.parse(simpleValue.toString(), localTimeWithMicrosConverter) - Type.TIMESTAMP_MILLIS.type, - Type.TIMESTAMP_MICROS.type -> Instant.parse(simpleValue.toString()) - Type.LOCAL_TIMESTAMP_MILLIS.type -> LocalDateTime.parse(simpleValue.toString(), localDateTimeWithMillisConverter) - Type.LOCAL_TIMESTAMP_MICROS.type -> LocalDateTime.parse(simpleValue.toString(), localDateTimeWithMicrosConverter) - else -> throw AvroTypeException( - "Logical type ${logicalType.name} is not supported}" - ) - } - val conversion: Conversion<*> = data.getConversionByClass(convertedValue.javaClass, logicalType) - writeWithoutConversion(schema, convert(schema, logicalType, conversion, convertedValue), out) - } else { - writeWithoutConversion(schema, datum, out) - } - } - private fun resolveUnion(union: Schema, fieldName: String?, enumValue: Value?): Int { - if (enableIdPrefixEnumFields.and(enumValue != null)) { - try { - return checkNotNull( - fieldName?.substringBefore(UNION_FIELD_NAME_TYPE_DELIMITER)?.substringAfter(UNION_ID_PREFIX) - ?.toInt() - ) { "Schema id not found in field name: $fieldName" } - } catch (e: NumberFormatException) { - throw AvroTypeException( - "Union prefix: $UNION_ID_PREFIX'{schema id}'$UNION_FIELD_NAME_TYPE_DELIMITER not found in field name: $fieldName", - e - ) - } - } - val schemaName = checkNotNull( - if (enumValue == null) Schema.Type.NULL.getName() else fieldName?.substringBefore( - UNION_FIELD_NAME_TYPE_DELIMITER - ) - ) { "Union prefix: {avro type}$UNION_FIELD_NAME_TYPE_DELIMITER not found in field name: $fieldName" } - return checkNotNull(union.getIndexNamed(schemaName)) { "Schema with name: $schemaName not found in union parent schema: ${union.name}" } - } - companion object { - const val FORMAT_TYPE_ERROR = "Unsupported type %s for %s" - const val UNION_FIELD_NAME_TYPE_DELIMITER = '-' - const val UNION_ID_PREFIX = "Id" - val localTimeWithMillisConverter: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS").withZone(ZoneOffset.UTC) - val localTimeWithMicrosConverter: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC) - val localDateTimeWithMillisConverter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS").withZone(ZoneOffset.UTC) - val localDateTimeWithMicrosConverter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS").withZone(ZoneOffset.UTC) - enum class Type(val type: String) { - DECIMAL("decimal"), - DATE("date"), - TIME_MILLIS("time-millis"), - TIME_MICROS("time-micros"), - TIMESTAMP_MILLIS("timestamp-millis"), - TIMESTAMP_MICROS("timestamp-micros"), - LOCAL_TIMESTAMP_MILLIS("local-timestamp-millis"), - LOCAL_TIMESTAMP_MICROS("local-timestamp-micros") - } + val convertedValue = convertFieldValue(simpleValue, it.name) + val conversion: Conversion<*> = data.getConversionByClass(convertedValue.javaClass, it) + convert(schema, it, conversion, convertedValue) + } ?: datum + + writeWithoutConversion(schema, value, out) } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/StandardAvroCodec.kt b/src/main/kotlin/com/exactpro/th2/codec/StandardAvroCodec.kt index bacc83b..cb84788 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/StandardAvroCodec.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/StandardAvroCodec.kt @@ -13,54 +13,104 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.codec import com.exactpro.th2.codec.resolver.SchemaIdDatumResolver -import com.exactpro.th2.common.grpc.Message -import com.exactpro.th2.common.grpc.RawMessage import com.exactpro.th2.common.message.toJson +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.toByteArray import com.google.protobuf.ByteString +import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled import org.apache.avro.Schema +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.io.Decoder import org.apache.avro.io.DecoderFactory -import java.util.* +import com.exactpro.th2.common.grpc.Message as ProtoMessage +import com.exactpro.th2.common.grpc.RawMessage as ProtoRawMessage class StandardAvroCodec(schemaIdToSchema: Map, settings: AvroCodecSettings) : AbstractAvroCodec(settings) { - private val schemaIdResolver = SchemaIdDatumResolver(schemaIdToSchema, enableIdPrefixEnumFields) - override fun decodeRawMessage(rawMessage: RawMessage, sessionAlias: String): Message { - val bytes = rawMessage.body.toByteArray() + private val schemaIdResolver = SchemaIdDatumResolver(schemaIdToSchema, enableIdPrefixEnumFields, enablePrefixEnumFieldsDecode) + override fun decodeRawMessage(rawMessage: ProtoRawMessage, sessionAlias: String): ProtoMessage = decodeRawMessage( + rawMessage, + rawMessage.body.toByteArray(), + schemaIdResolver::getReader, + this::getDecodedData + ) + + override fun decodeRawMessage(rawMessage: RawMessage, sessionAlias: String): ParsedMessage = decodeRawMessage( + rawMessage, + rawMessage.body.toByteArray(), + schemaIdResolver::getTransportReader, + this::getDecodedData + ) + + private inline fun , T> decodeRawMessage( + rawMessage: S, + bytes: ByteArray, + supplyReader: (Int) -> R, + toParse: (R, Decoder, S, ByteArray, Int) -> T + ): T { val byteBuf = Unpooled.wrappedBuffer(bytes, 0, AVRO_HEADER_SIZE) val magicNumber: Byte = byteBuf.readByte() if (magicNumber.toInt() != MAGIC_BYTE_VALUE) { throw DecodeException( - "Message starts with not the magic value ${MAGIC_BYTE_VALUE}, data: ${ - Arrays.toString( - bytes - ) - }" + "Message starts with not the magic value ${MAGIC_BYTE_VALUE}, data: ${bytes.contentToString()}" ) } val schemaId = byteBuf.readInt() - val reader = schemaIdResolver.getReader(schemaId) + val reader = supplyReader(schemaId) val decoder = DecoderFactory.get().binaryDecoder( bytes, - AVRO_HEADER_SIZE, bytes.size - AVRO_HEADER_SIZE, null + AVRO_HEADER_SIZE, + bytes.size - AVRO_HEADER_SIZE, + null ) - return getDecodedData(reader, decoder, rawMessage, bytes, schemaId) + + return toParse(reader, decoder, rawMessage, bytes, schemaId) } - override fun encodeMessage(parsedMessage: Message, sessionAlias: String): ByteString? { + override fun encodeMessage(parsedMessage: ProtoMessage, sessionAlias: String): ByteString = encodeMessage( + parsedMessage, + checkNotBlank(parsedMessage.metadata.messageType) { + "Message type is required. Message ${parsedMessage.toJson()} does not have it" + }, + schemaIdResolver::getWriter, + this::getEncodedData + ) + + override fun encodeMessage(parsedMessage: ParsedMessage, sessionAlias: String): ByteArray = encodeMessage( + parsedMessage, + checkNotBlank(parsedMessage.type) { + "Message type is required. Message $parsedMessage does not have it" + }, + schemaIdResolver::getTransportWriter, + this::getEncodedData + ) + + private inline fun encodeMessage( + parsedMessage: S, + messageType: String, + supplyWriter: (Int) -> W, + toRaw: (W, S, ByteBuf) -> T + ): T { val byteBuf = Unpooled.buffer() - val messageType = - checkNotNull(parsedMessage.metadata.messageType) { "Message type is required. Message ${parsedMessage.toJson()} does not have it" } val schemaId = schemaIdResolver.getSchemaId(messageType) byteBuf.writeByte(MAGIC_BYTE_VALUE) byteBuf.writeInt(schemaId) - val writer = schemaIdResolver.getWriter(schemaId) - return getEncodedData(writer, parsedMessage, byteBuf) + val writer = supplyWriter(schemaId) + return toRaw(writer, parsedMessage, byteBuf) } + companion object { const val AVRO_HEADER_SIZE = 5 private const val MAGIC_BYTE_VALUE = 0 + + private inline fun checkNotBlank(value: String, lazyMessage: () -> Any): String { + require(value.isNotBlank(), lazyMessage) + return value + } } } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/TransportMessageDatumReader.kt b/src/main/kotlin/com/exactpro/th2/codec/TransportMessageDatumReader.kt new file mode 100644 index 0000000..f341660 --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/TransportMessageDatumReader.kt @@ -0,0 +1,139 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec + +import com.exactpro.th2.codec.AbstractMessageWriter.Companion.UNION_FIELD_NAME_TYPE_DELIMITER +import com.exactpro.th2.codec.AbstractMessageWriter.Companion.UNION_ID_PREFIX +import org.apache.avro.Schema +import org.apache.avro.LogicalType +import org.apache.avro.Conversion +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.generic.GenericFixed +import org.apache.avro.io.Decoder +import org.apache.avro.io.ResolvingDecoder +import java.io.IOException +import java.nio.ByteBuffer +import javax.xml.bind.DatatypeConverter +import mu.KotlinLogging + +class TransportMessageDatumReader( + schema: Schema, + private val enableIdPrefixEnumFields: Boolean? = false, +) : GenericDatumReader>(schema, schema, getData()) { + @Throws(IOException::class) + override fun readWithoutConversion(old: Any?, expected: Schema, decoder: ResolvingDecoder): Any? { + return if (expected.type == Schema.Type.UNION) { + val readIndex = decoder.readIndex() + val schema = expected.types[readIndex] + UnionData(read(old, schema, decoder), enableIdPrefixEnumFields?.let { if(it) "$UNION_ID_PREFIX$readIndex" else schema.name }) + } else { + super.readWithoutConversion(old, expected, decoder) + } + } + + @Throws(IOException::class) + override fun readRecord(old: Any?, expected: Schema, decoder: ResolvingDecoder): MutableMap { + val r = createRecord() + for (f in decoder.readFieldOrder()) { + readField(r, f, old, decoder, null) + } + return r + } + + @Throws(IOException::class) + override fun readField(r: Any, f: Schema.Field, oldDatum: Any?, decoder: ResolvingDecoder, state: Any?) { + var readValue = read(oldDatum, f.schema(), decoder) + var fieldName = f.name() + if (readValue is UnionData) { + val description = readValue.description + readValue = readValue.value + if (readValue != null) { + fieldName = resolveUnionFieldName(fieldName, description) + } + } + if (readValue != null) { + val convertedValue = readValue.convertToValue() + LOGGER.trace { "Read value ${f.name()}: $convertedValue (origin: $readValue)" } + (r as MutableMap)[fieldName] = convertedValue + } + } + + private fun resolveUnionFieldName(fieldName: String, description: String?): String { + return if (description == null) { + fieldName + } else { + "$description$UNION_FIELD_NAME_TYPE_DELIMITER$fieldName" + } + } + + private fun createRecord(): MutableMap { + return mutableMapOf() + } + + @Throws(IOException::class) + override fun readString(old: Any?, expected: Schema, decoder: Decoder): String { + return super.readString(old, expected, decoder).toString() + } + + @Throws(IOException::class) + override fun readEnum(expected: Schema, decoder: Decoder): String { + return expected.enumSymbols[decoder.readEnum()] + } + + override fun addToMap(map: Any, key: Any?, value: Any?) { + if (value != null) { + (map as MutableMap)[key.toString()] = value + } + } + + override fun newMap(old: Any?, size: Int): MutableMap { + return mutableMapOf() + } + + override fun convert(datum: Any?, schema: Schema?, type: LogicalType?, conversion: Conversion<*>?): Any { + val convertedValue = super.convert(datum, schema, type, conversion) + if(LOGGER.isTraceEnabled) { + val rawValueString = when(datum) { + is ByteBuffer -> datum.asHexString() + else -> datum.toString() + } + LOGGER.trace { "Converting value using logical type ${type?.name} from $rawValueString to $convertedValue" } + } + return convertedValue + } + + private fun ByteBuffer.asHexString(): String { + val bytes = ByteArray(this.remaining()) + this.get(bytes) + return DatatypeConverter.printHexBinary(bytes) + } + + private fun Any.convertToValue(): Any = when (this) { + is ByteBuffer -> ByteArray(remaining()).also { get(it) } + is GenericFixed -> bytes() + else -> this + } + + data class UnionData( + val value: Any?, + val description: String? + ) + + companion object { + private val LOGGER = KotlinLogging.logger {} + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/TransportMessageDatumWriter.kt b/src/main/kotlin/com/exactpro/th2/codec/TransportMessageDatumWriter.kt new file mode 100644 index 0000000..e9e66fd --- /dev/null +++ b/src/main/kotlin/com/exactpro/th2/codec/TransportMessageDatumWriter.kt @@ -0,0 +1,199 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.codec + +import org.apache.avro.Schema +import org.apache.avro.AvroTypeException +import org.apache.avro.UnresolvedUnionException +import org.apache.avro.io.Encoder +import org.apache.avro.path.TracingAvroTypeException +import org.apache.avro.path.TracingNullPointException +import org.apache.avro.path.TracingClassCastException +import org.apache.avro.path.LocationStep +import org.apache.avro.util.SchemaUtil +import java.io.IOException +import java.nio.ByteBuffer +import javax.xml.bind.DatatypeConverter + +class TransportMessageDatumWriter(schema: Schema, enableIdPrefixEnumFields: Boolean = false) : + AbstractMessageWriter>(schema, enableIdPrefixEnumFields) { + @Throws(IOException::class) + override fun writeField(datum: Any?, f: Schema.Field, out: Encoder, state: Any?) { + val value = resolveUnionValue(f, datum) + if (value != null) { + try { + write(f.schema(), value, out) + } catch (e: Exception) { + when (e) { + is UnresolvedUnionException -> throw UnresolvedUnionException(f.schema(), f, value).apply { addSuppressed(e) } + is TracingNullPointException -> throw e.apply {tracePath(LocationStep(".", f.name()))} + is TracingClassCastException -> throw e.apply {tracePath(LocationStep(".", f.name()))} + is TracingAvroTypeException -> throw e.apply {tracePath(LocationStep(".", f.name()))} + is NullPointerException -> throw npe(e, " in field ${f.name()}") + is ClassCastException -> throw addClassCastMsg(e, " in field ${f.name()}") + is AvroTypeException -> throw addAvroTypeMsg(e, " in field ${f.name()}") + else -> throw e + } + } + } + } + + private fun resolveUnionValue(f: Schema.Field, datum: Any?): Any? { + @Suppress("UNCHECKED_CAST") + val map = (datum as Map) + if (f.schema().type == Schema.Type.UNION) { + val fieldName = + map.keys.firstOrNull { s -> s.endsWith("$UNION_FIELD_NAME_TYPE_DELIMITER${f.name()}") } + val unionValue = if (fieldName == null) null else map[fieldName] + return Pair(resolveUnion(f.schema(), fieldName, unionValue), unionValue) + } + return map[f.name()] + } + + @Throws(IOException::class) + override fun writeRecord(schema: Schema, datum: Any?, out: Encoder) { + for (field in schema.fields) { + writeField(datum, field, out, null) + } + } + + @Throws(IOException::class) + override fun writeWithoutConversion(schema: Schema, datum: Any, out: Encoder) { + val schemaType = schema.type + try { + when (schemaType) { + Schema.Type.RECORD -> writeRecord(schema, datum, out) + Schema.Type.ENUM -> writeEnum(schema, datum, out) + Schema.Type.ARRAY -> writeArray(schema, datum, out) + Schema.Type.MAP -> writeMap(schema, datum, out) + Schema.Type.UNION -> { + val (unionIndex, value) = datum as Pair<*, *> + out.writeIndex(unionIndex as Int) + if (value != null) { + write(schema.types[unionIndex], value, out) + } + } + + Schema.Type.FIXED -> { + val bytes = when (datum) { + is ByteArray -> datum + is String -> DatatypeConverter.parseHexBinary(datum) + else -> throw AvroTypeException(String.format(FORMAT_TYPE_ERROR, datum.javaClass, schemaType.name)) + } + writeFixed(schema, bytes, out) + } + Schema.Type.STRING -> writeString(schema, datum as String, out) + Schema.Type.BYTES -> writeBytes(datum, out) + Schema.Type.INT -> { + val int = when (datum) { + is Int -> datum + is String -> datum.toInt() + else -> throw AvroTypeException(String.format(FORMAT_TYPE_ERROR, datum.javaClass, schemaType.name)) + } + out.writeInt(int) + } + + Schema.Type.LONG -> { + val long = when (datum) { + is Long -> datum + is String -> datum.toLong() + else -> throw AvroTypeException(String.format(FORMAT_TYPE_ERROR, datum.javaClass, schemaType.name)) + } + out.writeLong(long) + } + + Schema.Type.FLOAT -> { + val float = when (datum) { + is Float -> datum + is String -> datum.toFloat() + else -> throw AvroTypeException(String.format(FORMAT_TYPE_ERROR, datum.javaClass, schemaType.name)) + } + out.writeFloat(float) + } + + Schema.Type.DOUBLE -> { + val double = when (datum) { + is Double -> datum + is String -> datum.toDouble() + else -> throw AvroTypeException(String.format(FORMAT_TYPE_ERROR, datum.javaClass, schemaType.name)) + } + out.writeDouble(double) + } + + Schema.Type.BOOLEAN -> { + val boolean = when (datum) { + is Boolean -> datum + is String -> datum.toBoolean() + else -> throw AvroTypeException(String.format(FORMAT_TYPE_ERROR, datum.javaClass, schemaType.name)) + } + out.writeBoolean(boolean) + } + + Schema.Type.NULL -> out.writeNull() + else -> throw AvroTypeException("Value ${SchemaUtil.describe(datum)} is not a ${SchemaUtil.describe(schema)}") + } + } catch (e: Exception) { + when (e) { + is NullPointerException -> throw TracingNullPointException(e, schema, false) + is ClassCastException -> throw TracingClassCastException(e, datum, schema, false) + is AvroTypeException -> throw TracingAvroTypeException(e) + else -> throw e + } + } + } + + @Throws(IOException::class) + override fun writeEnum(schema: Schema, datum: Any, out: Encoder) { + out.writeEnum(schema.getEnumOrdinal(datum as String)) + } + + override fun getMapSize(map: Any): Int = (map as Map<*, *>).size + override fun getMapEntries(map: Any): Iterable?> = (map as Map<*, *>).entries + + @Throws(IOException::class) + override fun writeFixed(schema: Schema, datum: Any, out: Encoder) { + out.writeFixed(datum as ByteArray, 0, schema.fixedSize) + } + + @Throws(IOException::class) + override fun writeBytes(datum: Any, out: Encoder) { + when(datum){ + is ByteBuffer -> { + val bytes = ByteArray(datum.remaining()) + datum.get(bytes) + out.writeBytes(bytes) + } + is ByteArray -> out.writeBytes(datum) + is String -> out.writeBytes(DatatypeConverter.parseHexBinary(datum)) + else -> throw AvroTypeException("Class ${datum::class.java} is not supported}") + } + } + + @Throws(IOException::class) + override fun write(schema: Schema, datum: Any, out: Encoder) { + val value = schema.logicalType?.let { + val convertedValue = if (datum is String) { + convertFieldValue(datum, it.name) + } else { + datum + } + convert(schema, it, data.getConversionByClass(convertedValue.javaClass, it), convertedValue) + } ?: datum + + writeWithoutConversion(schema, value, out) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/resolver/AliasDatumResolver.kt b/src/main/kotlin/com/exactpro/th2/codec/resolver/AliasDatumResolver.kt index 8798d64..635720f 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/resolver/AliasDatumResolver.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/resolver/AliasDatumResolver.kt @@ -13,10 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.codec.resolver import com.exactpro.th2.codec.MessageDatumReader import com.exactpro.th2.codec.MessageDatumWriter +import com.exactpro.th2.codec.TransportMessageDatumReader +import com.exactpro.th2.codec.TransportMessageDatumWriter import org.apache.avro.Schema import org.apache.commons.io.FilenameUtils import java.util.concurrent.locks.Lock @@ -24,7 +27,8 @@ import java.util.concurrent.locks.ReentrantLock class AliasDatumResolver( aliasToSchema: Map, - enableIdPrefixEnumFields: Boolean = false + enableIdPrefixEnumFields: Boolean = false, + enablePrefixEnumFieldsDecode: Boolean? = true, ) : IDatumResolver { private val datumCache: MutableMap = mutableMapOf() private val wildcardAliases: List @@ -35,8 +39,10 @@ class AliasDatumResolver( Alias( it.first, DatumPair( - MessageDatumReader(it.second, enableIdPrefixEnumFields), - MessageDatumWriter(it.second, enableIdPrefixEnumFields) + MessageDatumReader(it.second, enablePrefixEnumFieldsDecode), + MessageDatumWriter(it.second, enableIdPrefixEnumFields), + TransportMessageDatumReader(it.second, enablePrefixEnumFieldsDecode), + TransportMessageDatumWriter(it.second, enableIdPrefixEnumFields) ) ) }.partition { isWildcard(it.wildcardAlias) } @@ -55,17 +61,27 @@ class AliasDatumResolver( data class DatumPair( val reader: MessageDatumReader, - val writer: MessageDatumWriter + val writer: MessageDatumWriter, + val transportReader: TransportMessageDatumReader, + val transportWriter: TransportMessageDatumWriter ) override fun getReader(value: String): MessageDatumReader { return getDatums(value)?.reader ?: throw IllegalStateException("No reader found for session alias: $value") } + override fun getTransportReader(value: String): TransportMessageDatumReader { + return getDatums(value)?.transportReader ?: throw IllegalStateException("No reader found for session alias: $value") + } + override fun getWriter(value: String): MessageDatumWriter { return getDatums(value)?.writer ?: throw IllegalStateException("No writer found for session alias: $value") } + override fun getTransportWriter(value: String): TransportMessageDatumWriter { + return getDatums(value)?.transportWriter ?: throw IllegalStateException("No writer found for session alias: $value") + } + private fun getDatums(value: String): DatumPair? { lock.lock() try { @@ -75,7 +91,6 @@ class AliasDatumResolver( } } - private fun resolveAlias(alias: String): DatumPair? { wildcardAliases.forEach { aliasElement -> if (FilenameUtils.wildcardMatch(alias, aliasElement.wildcardAlias)) { @@ -90,5 +105,4 @@ class AliasDatumResolver( private fun isWildcard(value: String): Boolean { return (value.indexOf('?') != -1).or(value.indexOf('*') != -1) } - } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/resolver/IDatumResolver.kt b/src/main/kotlin/com/exactpro/th2/codec/resolver/IDatumResolver.kt index 66e802b..af0af63 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/resolver/IDatumResolver.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/resolver/IDatumResolver.kt @@ -17,9 +17,13 @@ package com.exactpro.th2.codec.resolver import com.exactpro.th2.codec.MessageDatumReader import com.exactpro.th2.codec.MessageDatumWriter +import com.exactpro.th2.codec.TransportMessageDatumReader +import com.exactpro.th2.codec.TransportMessageDatumWriter interface IDatumResolver { fun getReader(value: T): MessageDatumReader + fun getTransportReader(value: T): TransportMessageDatumReader fun getWriter(value: T): MessageDatumWriter + fun getTransportWriter(value: T): TransportMessageDatumWriter } \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/resolver/SchemaIdDatumResolver.kt b/src/main/kotlin/com/exactpro/th2/codec/resolver/SchemaIdDatumResolver.kt index 76a77cf..647983c 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/resolver/SchemaIdDatumResolver.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/resolver/SchemaIdDatumResolver.kt @@ -13,38 +13,55 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.codec.resolver import com.exactpro.th2.codec.MessageDatumReader import com.exactpro.th2.codec.MessageDatumWriter +import com.exactpro.th2.codec.TransportMessageDatumReader +import com.exactpro.th2.codec.TransportMessageDatumWriter import org.apache.avro.Schema class SchemaIdDatumResolver( schemaIdToSchema: Map, - enableIdPrefixEnumFields: Boolean = false + enableIdPrefixEnumFields: Boolean = false, + enablePrefixEnumFieldsDecode: Boolean? = true, ) : IDatumResolver { + private val datumReaders = schemaIdToSchema.mapValues { - MessageDatumReader( - it.value, - enableIdPrefixEnumFields - ) + MessageDatumReader(it.value, enablePrefixEnumFieldsDecode) + } + + private val transportDatumReaders = schemaIdToSchema.mapValues { + TransportMessageDatumReader(it.value, enablePrefixEnumFieldsDecode) } + private val datumWriters = schemaIdToSchema.mapValues { - MessageDatumWriter( - it.value, - enableIdPrefixEnumFields - ) + MessageDatumWriter(it.value, enableIdPrefixEnumFields) + } + + private val transportDatumWriters = schemaIdToSchema.mapValues { + TransportMessageDatumWriter(it.value, enableIdPrefixEnumFields) } + private val schemaIdToMessageName = checkSchemaNames(schemaIdToSchema.mapValues { it.value.name }) private val messageNameToSchemaId = schemaIdToMessageName.entries.associate { (key, value) -> value to key } override fun getReader(value: Int): MessageDatumReader { - return checkNotNull(datumReaders[value]) { "No reader found for schema id: $value" } + return checkNotNull(datumReaders[value]) { "No proto reader found for schema id: $value" } + } + + override fun getTransportReader(value: Int): TransportMessageDatumReader { + return checkNotNull(transportDatumReaders[value]) { "No transport reader found for schema id: $value" } } override fun getWriter(value: Int): MessageDatumWriter { - return checkNotNull(datumWriters[value]) { "No writer found for schema id: $value" } + return checkNotNull(datumWriters[value]) { "No proto writer found for schema id: $value" } + } + + override fun getTransportWriter(value: Int): TransportMessageDatumWriter { + return checkNotNull(transportDatumWriters[value]) { "No transport writer found for schema id: $value" } } fun getSchemaId(messageName: String): Int { diff --git a/src/test/kotlin/com/exactpro/th2/codec/TestAvroCodec.kt b/src/test/kotlin/com/exactpro/th2/codec/TestAvroCodec.kt index 6c2e18e..2230e69 100644 --- a/src/test/kotlin/com/exactpro/th2/codec/TestAvroCodec.kt +++ b/src/test/kotlin/com/exactpro/th2/codec/TestAvroCodec.kt @@ -13,13 +13,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.exactpro.th2.codec import com.exactpro.th2.codec.api.DictionaryAlias import com.exactpro.th2.codec.api.IPipelineCodecContext -import com.exactpro.th2.common.grpc.* +import com.exactpro.th2.codec.api.impl.ReportingContext +import com.exactpro.th2.common.grpc.MessageGroup as ProtoMessageGroup +import com.exactpro.th2.common.grpc.RawMessage as ProtoRawMessage +import com.exactpro.th2.common.grpc.AnyMessage as ProtoAnyMessage +import com.exactpro.th2.common.grpc.MessageID as ProtoMessageID +import com.exactpro.th2.common.grpc.ConnectionID as ProtoConnectionID +import com.exactpro.th2.common.grpc.RawMessageMetadata as ProtoRawMessageMetadata import com.exactpro.th2.common.schema.dictionary.DictionaryType +import com.exactpro.th2.common.schema.grpc.router.GrpcRouter +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.* +import com.exactpro.th2.common.utils.message.toTransport import com.google.protobuf.ByteString +import io.netty.buffer.Unpooled +import com.google.protobuf.UnsafeByteOperations import org.apache.avro.Schema.Parser import org.apache.avro.generic.GenericDatumWriter import org.apache.avro.io.BinaryEncoder @@ -32,6 +44,10 @@ import java.io.InputStream import javax.xml.bind.DatatypeConverter import kotlin.test.assertEquals import org.junit.jupiter.api.Disabled +import java.time.Instant +import kotlin.test.assertNotNull +import kotlin.test.assertContentEquals +import kotlin.test.assertTrue class TestAvroCodec { private val pipelineCodecContext = CodecContext() @@ -51,6 +67,7 @@ class TestAvroCodec { ) decodeToEncode(rawBytes, 37) } + @Test fun `test full decode encode union id prefix`() { codec = codecFactory.create(AvroCodecSettings(schemaIdToSchemaAlias, emptyMap(), true)) @@ -96,6 +113,15 @@ class TestAvroCodec { decodeToEncode(rawBytes, 12) } + @Test + fun `test decode using th2 transport protocol`() { + val rawBytes = + DatatypeConverter.parseHexBinary( + "0000000002000012B7ADB9A75E63FF6149CAFFDA9A0DF7C3FC8CDFF08FED78EF3369D249F1EE3F3E66676F71776D75686762676D6E636A78797679786F67666E636A746E646C6600020A326F686B786B6B686C6D616671776C716162747561696F6E716E0E6278796D7269792069626D7167766B697476676662666B644A62646168726F657964676B6F74676A766275646862757279657374716261716F766B626474267466776D6278676D6977647163786F6179777500044E6770636D6A796C777061736D71667562677377656B7671696169776A7965616F676A726A7962610C7978666E6D78246E6267776D686E69687265786A776E6E75631E68726461726771686179646A6A6471002E" + ) + decodeToEncode(rawBytes, 12) + } + @Test fun `test decode encode logical types`() { val rawBytes = @@ -107,46 +133,106 @@ class TestAvroCodec { @Test fun `test decode encode union with logical types`() { + val rawBytes = DatatypeConverter.parseHexBinary("000000000402F586B9CF0E") + decodeToEncode(rawBytes, 1) + } + + @Test + fun `test decode encode logical types without type prefix for proto`() { + codec = codecFactory.create(AvroCodecSettings(schemaIdToSchemaAlias, enablePrefixEnumFieldsDecode = false)) val rawBytes = DatatypeConverter.parseHexBinary( "000000000402F586B9CF0E" ) - decodeToEncode(rawBytes, 1) + val messageGroup = decode(UnsafeByteOperations.unsafeWrap(rawBytes), sessionAlias = null) + assertEquals(1, messageGroup.messagesCount, "unexpected groups count") + val message = messageGroup.getMessages(0) + assertNotNull(message.message.fieldsMap["enumWithLogical"], "cannot find field without type prefix in ${message.message.fieldsMap}") + } + + @Test + fun `test decode encode logical types without type prefix for transport`() { + codec = codecFactory.create(AvroCodecSettings(schemaIdToSchemaAlias, enablePrefixEnumFieldsDecode = false)) + val rawBytes = + DatatypeConverter.parseHexBinary( + "000000000402F586B9CF0E" + ) + val messageGroup = transportDecode(rawBytes, sessionAlias = null) + assertEquals(1, messageGroup.messages.size, "unexpected groups count") + val message = messageGroup.messages[0] + assertTrue(message is ParsedMessage, "got unexpected message type: ${message::class}") + assertNotNull(message.body["enumWithLogical"], "cannot find field without type prefix in ${message.body}") } - private fun decodeToEncode(rawBytes: ByteArray?, expected: Int, sessionAlias: String? = null) { + private fun decodeToEncode(rawBytes: ByteArray, expected: Int, sessionAlias: String? = null) { + // proto decode val body = ByteString.copyFrom(rawBytes) val decodeGroup = decode(body, sessionAlias) - val actualCountFields = decodeGroup.messagesList[0].message.fieldsMap.size - assertEquals(expected, actualCountFields) - val encodeBody = encode(decodeGroup) - assertEquals(body, encodeBody) + assertEquals(expected, decodeGroup.messagesList[0].message.fieldsMap.size) + + // proto encode + val protoEncoded = encode(decodeGroup)?.toByteArray() + assertContentEquals(protoEncoded, rawBytes) + + // transport decode + val transportDecodeGroup = transportDecode(rawBytes, sessionAlias) + assertEquals(expected, (transportDecodeGroup.messages[0].body as Map).size) + + // transport encode + val transportEncoded = transportEncode(transportDecodeGroup) + assertContentEquals(transportEncoded, rawBytes) + + // transport encode (string values - converted from proto) + val convertedMsg = decodeGroup.messagesList[0].message.toTransport() + val transportEncodedFromStrings = transportEncode(MessageGroup(listOf(convertedMsg))) + assertContentEquals(transportEncodedFromStrings, rawBytes) } - private fun decode(body: ByteString?, sessionAlias: String?): MessageGroup { - val rawMessage = RawMessage.newBuilder() + private fun decode(body: ByteString?, sessionAlias: String?): ProtoMessageGroup { + val rawMessage = ProtoRawMessage.newBuilder() .setMetadata( - RawMessageMetadata.newBuilder() - .setId(MessageID.newBuilder() + ProtoRawMessageMetadata.newBuilder() + .setId(ProtoMessageID.newBuilder() .setSequence(1) - .apply { if(sessionAlias != null) setConnectionId(ConnectionID.newBuilder().setSessionAlias(sessionAlias))}) + .apply { if(sessionAlias != null) setConnectionId(ProtoConnectionID.newBuilder().setSessionAlias(sessionAlias))}) .setProtocol(AvroCodecFactory.PROTOCOL) ) .setBody(body) .build() - val group = MessageGroup.newBuilder().addMessages(AnyMessage.newBuilder().setRawMessage(rawMessage)).build() - val decodeGroup = codec.decode(group) + val group = ProtoMessageGroup.newBuilder().addMessages(ProtoAnyMessage.newBuilder().setRawMessage(rawMessage)).build() + val decodeGroup = codec.decode(group, ReportingContext()) val decodeMessages = decodeGroup.messagesList assertEquals(1, decodeMessages.size) return decodeGroup } - private fun encode(messageGroup: MessageGroup): ByteString? { - val encodeMessages = codec.encode(messageGroup).messagesList + private fun transportDecode(body: ByteArray, sessionAlias: String?): MessageGroup { + val rawMessage = RawMessage( + id = MessageId(sessionAlias ?: "", Direction.OUTGOING, 1, Instant.now()), + protocol = AvroCodecFactory.PROTOCOL, + body = Unpooled.wrappedBuffer(body) + ) + + val group = MessageGroup(mutableListOf(rawMessage)) + val decodeGroup = codec.decode(group, ReportingContext()) + val decodeMessages = decodeGroup.messages + assertEquals(1, decodeMessages.size) + return decodeGroup + } + + private fun encode(messageGroup: ProtoMessageGroup): ByteString? { + val encodeMessages = codec.encode(messageGroup, ReportingContext()).messagesList assertEquals(1, encodeMessages.size) return encodeMessages[0].rawMessage.body } + private fun transportEncode(messageGroup: MessageGroup): ByteArray { + val encodeMessages = codec.encode(messageGroup, ReportingContext()).messages + assertEquals(1, encodeMessages.size) + val rawMessage = encodeMessages[0] as RawMessage + return rawMessage.body.toByteArray() + } + @Disabled @Test fun generateAvroRandomDataBySchema() { @@ -179,6 +265,10 @@ class TestAvroCodec { TODO("Not yet implemented") } + override fun getGrpcRouter(): GrpcRouter { + TODO("Not yet implemented") + } + } enum class SchemaAlias(val alias: String) { BIG_SCHEMA("big_schema"),