From 8525c1afd2a99bfc4d5aea77f6a142181f43be21 Mon Sep 17 00:00:00 2001 From: Nikita Smirnov <46124551+Nikita-Smirnov-Exactpro@users.noreply.github.com> Date: Fri, 29 Dec 2023 14:07:22 +0400 Subject: [PATCH] [Th2 5143] problem when codec publishes event for book `A` with attached messages for book `B` (#16) * [TH2-4934] th2 transport protocol support --------- Co-authored-by: Oleg Smelov --- ...lease-java-publish-sonatype-and-docker.yml | 22 ++ Dockerfile | 2 +- README.md | 49 ++-- build.gradle | 99 +++---- gradle.properties | 4 +- gradle/wrapper/gradle-wrapper.properties | 2 +- .../th2/codec/http/HttpPipelineCodec.kt | 255 +++++++++++++++--- .../codec/http/HttpPipelineCodecFactory.kt | 6 +- .../com/exactpro/th2/codec/http/DecodeTest.kt | 119 +++++++- .../com/exactpro/th2/codec/http/EncodeTest.kt | 79 +++++- suppressions.xml | 9 + 11 files changed, 483 insertions(+), 163 deletions(-) create mode 100644 .github/workflows/dev-release-java-publish-sonatype-and-docker.yml create mode 100644 suppressions.xml diff --git a/.github/workflows/dev-release-java-publish-sonatype-and-docker.yml b/.github/workflows/dev-release-java-publish-sonatype-and-docker.yml new file mode 100644 index 0000000..e515f3a --- /dev/null +++ b/.github/workflows/dev-release-java-publish-sonatype-and-docker.yml @@ -0,0 +1,22 @@ +name: Build and release Java distributions to sonatype. + +on: + push: + tags: + - \d+.\d+.\d+-dev + +jobs: + build: + uses: th2-net/.github/.github/workflows/compound-java.yml@main + with: + build-target: 'Docker' + runsOn: ubuntu-latest + gradleVersion: '7' + docker-username: ${{ github.actor }} + devRelease: true + secrets: + sonatypeUsername: ${{ secrets.SONATYPE_NEXUS_USERNAME }} + sonatypePassword: ${{ secrets.SONATYPE_NEXUS_PASSWORD }} + sonatypeSigningKey: ${{ secrets.SONATYPE_GPG_ARMORED_KEY }} + sonatypeSigningPassword: ${{ secrets.SONATYPE_SIGNING_PASSWORD }} + docker-password: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 84de259..3ee4f84 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM gradle:7.5-jdk11 AS build +FROM gradle:7.6-jdk11 AS build ARG release_version COPY ./ . RUN gradle --no-daemon clean build dockerPrepare -Prelease_version=${release_version} diff --git a/README.md b/README.md index 85dad91..cc401ce 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# HTTP Codec v0.2.0 +# HTTP Codec v0.4.0 This microservice can encode and decode HTTP messages @@ -27,32 +27,33 @@ This codec processes parsed messages with `http` protocol in metadata field 1. HTTP message - `Message` with `Request` or `Response` type (protocol = `http`) 2. HTTP message body - `RawMessage` with a raw message body (if present) -If decoded message was an HTTP request, message body metadata would contain `method` and `uri` properties with HTTP request method name and URI respectively +If decoded message was an HTTP request, message body metadata would contain `method` and `uri` properties with HTTP +request method name and URI respectively ## Message types * Request -|Field|Type|Description| -|:---:|:---:|:---:| -|method|String|HTTP method name (e.g. GET, POST, etc.)| -|uri|String|Request URI (e.g. /some/request/path?param1=value1¶m2=value2...)| -|headers|List\
|HTTP headers (e.g. Host, Content-Length, etc.)| +| Field | Type | Description | +|:-------:|:-------------:|:--------------------------------------------------------------------:| +| method | String | HTTP method name (e.g. GET, POST, etc.) | +| uri | String | Request URI (e.g. /some/request/path?param1=value1¶m2=value2...) | +| headers | List\
| HTTP headers (e.g. Host, Content-Length, etc.) | * Response -|Field|Type|Description| -|:---:|:---:|:---:| -|statusCode|String|HTTP status code (e.g. 200, 403, 500, etc)| -|reason|String|HTTP status reason (e.g. OK, Forbidden, Internal Server Error, etc.)| -|headers|List\
|HTTP headers (e.g. Set-Cookie, Content-Length, etc.)| +| Field | Type | Description | +|:----------:|:-------------:|:--------------------------------------------------------------------:| +| statusCode | String | HTTP status code (e.g. 200, 403, 500, etc) | +| reason | String | HTTP status reason (e.g. OK, Forbidden, Internal Server Error, etc.) | +| headers | List\
| HTTP headers (e.g. Set-Cookie, Content-Length, etc.) | * Header -|Field|Type|Description| -|:---:|:---:|:---:| -|name|String|HTTP header name| -|value|String|HTTP header value| +| Field | Type | Description | +|:-----:|:------:|:-----------------:| +| name | String | HTTP header name | +| value | String | HTTP header value | ## Deployment via `infra-mgr` @@ -119,16 +120,20 @@ spec: enabled: false ``` -### Gradle metadata note -ignoreGradleMetadataRedirection is used for sonatype because Sailfish dependencies have constrains that interfere with BOM, -so we exclude Gradle metadata for these repositories. - -It's been verified that Sailfish itself is compatible with versions from BOM and therefore safe to use. +# Release notes +## 0.4.0 -# Release notes ++ th2 transport protocol support ++ Updated bom: `4.5.0-dev` ++ Updated common: `5.7.2-dev` ++ Updated common-utils: `2.2.2-dev` ++ Updated codec: `5.4.1-dev` ++ Updated kotlin: `1.8.22` ++ Removed sailfish-utils ## 0.3.0 + * th2-common upgrade to `3.44.0` * th2-bom upgrade to `4.1.0` * th2-codec upgrade to `4.7.6` diff --git a/build.gradle b/build.gradle index 9c8567b..9155ccd 100644 --- a/build.gradle +++ b/build.gradle @@ -2,18 +2,8 @@ plugins { id 'com.palantir.docker' version '0.25.0' id 'org.jetbrains.kotlin.jvm' version "${kotlin_version}" id 'application' - id "org.owasp.dependencycheck" version "7.2.0" id 'org.jetbrains.kotlin.kapt' version "${kotlin_version}" -} - -dependencyCheck { - format='HTML' - failBuildOnCVSS=5 -} - -ext { - sharedDir = file("${project.rootDir}/shared") - sailfishVersion = '3.3.54' + id "org.owasp.dependencycheck" version "8.2.1" } group = 'com.exactpro.th2' @@ -22,60 +12,19 @@ version = release_version sourceCompatibility = 11 targetCompatibility = 11 -ext.excludeSailfish = { rcd -> - rcd.excludeModule("com.exactpro.sf", "sailfish-core") - rcd.excludeModule("com.exactpro.sf", "sailfish-common") - rcd.excludeModule("com.exactpro.sf", "sailfish-rest-api-client") - rcd.excludeModule("com.exactpro.sf", "service-http") -} - repositories { - maven { - name 'MavenLocal' - url sharedDir - } - - maven { - name 'Sonatype_snapshots' - url 'https://s01.oss.sonatype.org/content/repositories/snapshots/' - content { - excludeSailfish(it) - } - } - - // ignoreGradleMetadataRedirection is used for sonatype because - // Sailfish dependencies have constrains that interfere with our BOM - // so we exclude Gradle metadata for this repositories. - // We've checked these versions - they are compatible and safe to use + mavenCentral() maven { name 'Sonatype_snapshots' url 'https://s01.oss.sonatype.org/content/repositories/snapshots/' - metadataSources { - mavenPom() - artifact() - ignoreGradleMetadataRedirection() - } } maven { name 'Sonatype_releases' url 'https://s01.oss.sonatype.org/content/repositories/releases/' - content { - excludeSailfish(it) - } } - maven { - name 'Sonatype_releases' - url 'https://s01.oss.sonatype.org/content/repositories/releases/' - metadataSources { - mavenPom() - artifact() - ignoreGradleMetadataRedirection() - } - } - mavenCentral() mavenLocal() - configurations.all { + configurations.configureEach { resolutionStrategy.cacheChangingModulesFor 0, 'seconds' resolutionStrategy.cacheDynamicVersionsFor 0, 'seconds' } @@ -96,24 +45,21 @@ jar { } dependencies { - api platform('com.exactpro.th2:bom:4.1.0') + api platform("com.exactpro.th2:bom:4.5.0") - implementation 'com.exactpro.th2:common:5.1.0-dev-version-5+' - implementation 'com.exactpro.th2:codec:5.0.0-dev-version-5+' + implementation "com.exactpro.th2:common:5.7.2-dev" + implementation "com.exactpro.th2:common-utils:2.2.2-dev" + implementation "com.exactpro.th2:codec:5.4.1-dev" - implementation 'com.exactpro.th2:sailfish-utils:3.14.0' - - compileOnly 'com.google.auto.service:auto-service:1.0.1' - annotationProcessor 'com.google.auto.service:auto-service:1.0.1' + compileOnly "com.google.auto.service:auto-service:1.1.1" + annotationProcessor "com.google.auto.service:auto-service:1.1.1" + kapt "com.google.auto.service:auto-service:1.1.1" - implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: kotlin_version - implementation group: 'org.jetbrains.kotlin', name: 'kotlin-reflect', version: kotlin_version - implementation group: 'io.github.microutils', name: 'kotlin-logging', version: '1.7.9' - implementation group: 'com.athaydes.rawhttp', name: 'rawhttp-core', version: '2.4.1' + implementation "io.github.microutils:kotlin-logging:3.0.5" + implementation "com.athaydes.rawhttp:rawhttp-core:2.4.1" - testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test-junit5', version: kotlin_version - testImplementation 'org.junit.jupiter:junit-jupiter:5.9.0' - kapt 'com.google.auto.service:auto-service:1.0.1' + testImplementation "org.jetbrains.kotlin:kotlin-test-junit5" + testImplementation "org.junit.jupiter:junit-jupiter:5.10.0" } application { @@ -149,3 +95,20 @@ compileTestKotlin { test { useJUnitPlatform() } + +configurations { + compileClasspath { + resolutionStrategy.activateDependencyLocking() + } +} + +dependencyCheck { + formats = ['SARIF', 'JSON', 'HTML'] + failBuildOnCVSS = 5 + suppressionFile = file('suppressions.xml') + analyzers { + assemblyEnabled = false + nugetconfEnabled = false + nodeEnabled = false + } +} diff --git a/gradle.properties b/gradle.properties index b63fa03..2412c75 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ kotlin.code.style=official -kotlin_version=1.6.21 -release_version=0.3.0 \ No newline at end of file +kotlin_version=1.8.22 +release_version=0.4.0 diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 11abb6d..2b20ca3 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ #Mon May 25 11:22:24 MSK 2020 -distributionUrl=https\://services.gradle.org/distributions/gradle-7.5-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-all.zip distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStorePath=wrapper/dists diff --git a/src/main/kotlin/com/exactpro/th2/codec/http/HttpPipelineCodec.kt b/src/main/kotlin/com/exactpro/th2/codec/http/HttpPipelineCodec.kt index 762283c..700bc9e 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/http/HttpPipelineCodec.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/http/HttpPipelineCodec.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,26 +17,26 @@ package com.exactpro.th2.codec.http import com.exactpro.th2.codec.api.IPipelineCodec +import com.exactpro.th2.codec.api.IReportingContext import com.exactpro.th2.codec.http.HttpPipelineCodecFactory.Companion.PROTOCOL import com.exactpro.th2.common.grpc.AnyMessage.KindCase.MESSAGE import com.exactpro.th2.common.grpc.AnyMessage.KindCase.RAW_MESSAGE import com.exactpro.th2.common.grpc.Direction.FIRST import com.exactpro.th2.common.grpc.Direction.SECOND import com.exactpro.th2.common.grpc.EventID -import com.exactpro.th2.common.grpc.Message -import com.exactpro.th2.common.grpc.MessageGroup import com.exactpro.th2.common.grpc.MessageID -import com.exactpro.th2.common.grpc.RawMessage import com.exactpro.th2.common.grpc.Value import com.exactpro.th2.common.grpc.Value.KindCase.LIST_VALUE import com.exactpro.th2.common.grpc.Value.KindCase.MESSAGE_VALUE import com.exactpro.th2.common.grpc.Value.KindCase.SIMPLE_VALUE import com.exactpro.th2.common.message.addField import com.exactpro.th2.common.message.plusAssign +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.* +import com.exactpro.th2.common.utils.message.transport.getMap import com.exactpro.th2.common.value.toValue import com.google.protobuf.ByteString import com.google.protobuf.MessageLite.Builder -import com.google.protobuf.Timestamp +import io.netty.buffer.Unpooled import rawhttp.core.HttpMessage import rawhttp.core.HttpVersion.HTTP_1_1 import rawhttp.core.RawHttp @@ -52,14 +52,14 @@ import rawhttp.core.body.BytesBody import java.io.ByteArrayOutputStream import java.net.URI import kotlin.text.Charsets.UTF_8 +import com.exactpro.th2.common.grpc.Message as ProtoMessage +import com.exactpro.th2.common.grpc.MessageGroup as ProtoMessageGroup +import com.exactpro.th2.common.grpc.RawMessage as ProtoRawMessage class HttpPipelineCodec : IPipelineCodec { - override fun encode(messageGroup: MessageGroup): MessageGroup { - val messages = messageGroup.messagesList - if (messages.isEmpty()) { - return messageGroup - } + override fun encode(messageGroup: ProtoMessageGroup, context: IReportingContext): ProtoMessageGroup { + val messages = messageGroup.messagesList require(messages.size <= 2) { "Message group must contain at most 2 messages" } require(messages[0].kindCase == MESSAGE) { "First message must be a parsed message" } @@ -68,13 +68,13 @@ class HttpPipelineCodec : IPipelineCodec { require(message.metadata.protocol == PROTOCOL) { "Unsupported protocol: ${message.metadata.protocol}" } - val body: RawMessage? = messages.getOrNull(1)?.run { + val body: ProtoRawMessage? = messages.getOrNull(1)?.run { require(kindCase == RAW_MESSAGE) { "Second message must be a raw message" } rawMessage } val messageFields = message.fieldsMap - val builder = MessageGroup.newBuilder() + val builder = ProtoMessageGroup.newBuilder() val httpMessage: HttpMessage = when (val messageType = message.metadata.messageType) { REQUEST_MESSAGE -> messageFields.run { @@ -95,10 +95,12 @@ class HttpPipelineCodec : IPipelineCodec { body?.toBody().run(::withBody) ?: this } } + RESPONSE_MESSAGE -> messageFields.run { requireKnownFields(RESPONSE_FIELDS) - val statusCode = getString(STATUS_CODE_FIELD).toIntOrNull() ?: error("$STATUS_CODE_FIELD is not a number") + val statusCode = + getString(STATUS_CODE_FIELD).toIntOrNull() ?: error("$STATUS_CODE_FIELD is not a number") val reason = getString(REASON_FIELD) val headers = RawHttpHeaders.newBuilder() @@ -114,12 +116,13 @@ class HttpPipelineCodec : IPipelineCodec { body?.toBody().run(::withBody) ?: this } } + else -> error("Unsupported message type: $messageType") } val metadata = message.metadata - builder += httpMessage.toByteArray().toRawMessage( + builder += httpMessage.toByteArray().toProtoRawMessage( metadata.id, metadata.propertiesMap, body?.metadata?.propertiesMap, @@ -129,37 +132,138 @@ class HttpPipelineCodec : IPipelineCodec { return builder.build() } - override fun decode(messageGroup: MessageGroup): MessageGroup { - val messages = messageGroup.messagesList + override fun encode(messageGroup: MessageGroup, context: IReportingContext): MessageGroup { + val messages = messageGroup.messages + + require(messages.size <= 2) { "Message group must contain at most 2 messages" } + val message = messages[0] as? ParsedMessage ?: error("First message must be a parsed message") + + require(message.protocol == PROTOCOL) { "Unsupported protocol: ${message.protocol}" } + + val body: RawMessage? = messages.getOrNull(1)?.run { + require(this is RawMessage) { "Second message must be a raw message" } + this + } + + val encodedMessages = mutableListOf>() + + val httpMessage: HttpMessage = when (val messageType = message.type) { + REQUEST_MESSAGE -> message.body.run { + requireKnownFields(REQUEST_FIELDS) + + val uri = get(URI_FIELD) as String + val method = get(METHOD_FIELD) as String + val headers = RawHttpHeaders.newBuilder().apply { + (getMap(HEADERS_FIELD) as? Map)?.fillHeaders(this) + } + + RawHttpRequest( + RequestLine(method, URI(uri), HTTP_1_1), + headers.build(), + null, + null + ).run { + body?.toBody().run(::withBody) ?: this + } + } + + RESPONSE_MESSAGE -> message.body.run { + requireKnownFields(RESPONSE_FIELDS) - if (messages.isEmpty()) { - return messageGroup + val statusCode = + (get(STATUS_CODE_FIELD) as String).toIntOrNull() ?: error("$STATUS_CODE_FIELD is not a number") + val reason = get(REASON_FIELD) as String + val headers = RawHttpHeaders.newBuilder().apply { + (getMap(HEADERS_FIELD) as? Map)?.fillHeaders(this) + } + + RawHttpResponse( + null, + null, + StatusLine(HTTP_1_1, statusCode, reason), + headers.build(), + null + ).run { + body?.toBody().run(::withBody) ?: this + } + } + + else -> error("Unsupported message type: $messageType") } + encodedMessages += RawMessage( + message.id, + message.eventId, + message.metadata, + message.protocol, + Unpooled.wrappedBuffer(httpMessage.toByteArray()) + ) + + return MessageGroup(encodedMessages) + } + + override fun decode(messageGroup: ProtoMessageGroup, context: IReportingContext): ProtoMessageGroup { + val messages = messageGroup.messagesList + require(messages.size == 1) { "Message group must contain only 1 message" } require(messages[0].kindCase == RAW_MESSAGE) { "Message must be a raw message" } val message = messages[0].rawMessage val body = message.body.toByteArray().toString(UTF_8) - val builder = MessageGroup.newBuilder() + val builder = ProtoMessageGroup.newBuilder() when (val direction = message.metadata.id.direction) { FIRST -> RAW_HTTP.parseResponse(body).convert(RESPONSE_MESSAGE, message, builder) { httpMessage, _ -> httpMessage.addField(STATUS_CODE_FIELD, statusCode) httpMessage.addField(REASON_FIELD, reason) } - SECOND -> RAW_HTTP.parseRequest(body).convert(REQUEST_MESSAGE, message, builder) { httpMessage, metadataProperties -> - httpMessage.addField(METHOD_FIELD, method) - httpMessage.addField(URI_FIELD, uri) - metadataProperties[METHOD_METADATA_PROPERTY] = method - metadataProperties[URI_METADATA_PROPERTY] = uri.toString() - } + + SECOND -> RAW_HTTP.parseRequest(body) + .convert(REQUEST_MESSAGE, message, builder) { httpMessage, metadataProperties -> + httpMessage.addField(METHOD_FIELD, method) + httpMessage.addField(URI_FIELD, uri) + metadataProperties[METHOD_METADATA_PROPERTY] = method + metadataProperties[URI_METADATA_PROPERTY] = uri.toString() + } + else -> error("Unsupported message direction: $direction") } return builder.build() } + override fun decode(messageGroup: MessageGroup, context: IReportingContext): MessageGroup { + val messages = messageGroup.messages + + require(messages.size == 1) { "Message group must contain only 1 message" } + + val message = messages[0] + require(message is RawMessage) { "Message must be a raw message" } + + val body = message.body.toString(UTF_8) + val decodedMessages = mutableListOf>() + + when (val direction = message.id.direction) { + Direction.INCOMING -> RAW_HTTP.parseResponse(body) + .convert(RESPONSE_MESSAGE, message, decodedMessages) { httpMessage, _ -> + httpMessage[STATUS_CODE_FIELD] = statusCode + httpMessage[REASON_FIELD] = reason + } + + Direction.OUTGOING -> RAW_HTTP.parseRequest(body) + .convert(REQUEST_MESSAGE, message, decodedMessages) { httpMessage, metadataProperties -> + httpMessage[METHOD_FIELD] = method + httpMessage[URI_FIELD] = uri + metadataProperties[METHOD_METADATA_PROPERTY] = method + metadataProperties[URI_METADATA_PROPERTY] = uri.toString() + } + + else -> error("Unsupported message direction: $direction") + } + + return MessageGroup(decodedMessages) + } + companion object { const val REQUEST_MESSAGE = "Request" const val RESPONSE_MESSAGE = "Response" @@ -180,9 +284,10 @@ class HttpPipelineCodec : IPipelineCodec { private val RAW_HTTP = RawHttp() - private fun Map.requireKnownFields(messageFields: Set) = require(keys.all(messageFields::contains)) { - "Message contains unknown fields: ${keys.filterNot(messageFields::contains)}" - } + private fun Map.requireKnownFields(messageFields: Set) = + require(keys.all(messageFields::contains)) { + "Message contains unknown fields: ${keys.filterNot(messageFields::contains)}" + } private fun Map.getString(fieldName: String) = get(fieldName)?.run { require(kindCase == SIMPLE_VALUE) { "$fieldName is not a string" } @@ -208,20 +313,24 @@ class HttpPipelineCodec : IPipelineCodec { } } + private fun Map.fillHeaders(builder: RawHttpHeaders.Builder) = + forEach { (name, value) -> builder.with(name, value) } + + private fun ProtoRawMessage.toBody() = BytesBody(body.toByteArray()) private fun RawMessage.toBody() = BytesBody(body.toByteArray()) private fun Writable.toByteArray() = ByteArrayOutputStream().apply(::writeTo).toByteArray() private inline operator fun T.invoke(block: T.() -> Unit) = apply(block) - private fun ByteArray.toRawMessage( + private fun ByteArray.toProtoRawMessage( messageId: MessageID, metadataProperties: Map, additionalMetadataProperties: Map? = null, subsequence: Iterable = messageId.subsequenceList.dropLast(1), eventID: EventID - ): RawMessage = RawMessage.newBuilder().apply { - this.body = ByteString.copyFrom(this@toRawMessage) + ): ProtoRawMessage = ProtoRawMessage.newBuilder().apply { + this.body = ByteString.copyFrom(this@toProtoRawMessage) parentEventIdBuilder.mergeFrom(eventID) this.metadataBuilder { putAllProperties(metadataProperties) @@ -234,10 +343,10 @@ class HttpPipelineCodec : IPipelineCodec { private fun HttpMessage.convert( type: String, - source: RawMessage, - builder: MessageGroup.Builder, + source: ProtoRawMessage, + builder: ProtoMessageGroup.Builder, handleStartLine: T.( - httpMessage: Message.Builder, + httpMessage: ProtoMessage.Builder, metadataProperties: MutableMap ) -> Unit ) { @@ -247,7 +356,7 @@ class HttpPipelineCodec : IPipelineCodec { val messageId = metadata.id val subsequence = messageId.subsequenceList - builder += Message.newBuilder().apply { + builder += ProtoMessage.newBuilder().apply { handleStartLine(startLine as T, this, additionalMetadataProperties) parentEventIdBuilder.mergeFrom(source.parentEventId) @@ -255,7 +364,7 @@ class HttpPipelineCodec : IPipelineCodec { val headerList = arrayListOf() headers.forEach { name, value -> - val headerMessage = Message.newBuilder() + val headerMessage = ProtoMessage.newBuilder() headerMessage.addField(HEADER_NAME_FIELD, name) headerMessage.addField(HEADER_VALUE_FIELD, value) headerList += headerMessage.toValue() @@ -278,7 +387,7 @@ class HttpPipelineCodec : IPipelineCodec { body.map(BodyReader::decodeBody) .filter(ByteArray::isNotEmpty) .ifPresent { - builder += it.toRawMessage( + builder += it.toProtoRawMessage( messageId, metadataProperties, additionalMetadataProperties, @@ -288,12 +397,74 @@ class HttpPipelineCodec : IPipelineCodec { } } + private fun HttpMessage.convert( + type: String, + source: RawMessage, + resultMessages: MutableList>, + handleStartLine: T.( + httpMessageBody: MutableMap, + metadataProperties: MutableMap + ) -> Unit + ) { + val additionalMetadataProperties = mutableMapOf() + val messageId = source.id.toBuilder().addSubsequence(1).build() + val parsedBody = mutableMapOf() + + handleStartLine(startLine as T, parsedBody, additionalMetadataProperties) + + if (!headers.isEmpty) { + parsedBody[HEADERS_FIELD] = mutableMapOf().apply { + headers.forEach { name, value -> this[name] = value } + } + } + + resultMessages += ParsedMessage( + messageId.toBuilder().addSubsequence(1).build(), + source.eventId, + type, + source.metadata, + PROTOCOL, + parsedBody + ) + + body.map(BodyReader::decodeBody) + .filter(ByteArray::isNotEmpty) + .ifPresent { + resultMessages += RawMessage( + messageId.toBuilder().addSubsequence(2).build(), + source.eventId, + source.metadata, + body = Unpooled.wrappedBuffer(it) + ) + } + } + + private fun RawHttpRequest.convert( + type: String, + source: ProtoRawMessage, + builder: ProtoMessageGroup.Builder, + handleStartLine: RequestLine.( + httpMessage: ProtoMessage.Builder, + metadataProperties: MutableMap + ) -> Unit + ) = convert(type, source, builder, handleStartLine) + + private fun RawHttpResponse<*>.convert( + type: String, + source: ProtoRawMessage, + builder: ProtoMessageGroup.Builder, + handleStartLine: StatusLine.( + httpMessage: ProtoMessage.Builder, + metadataProperties: MutableMap + ) -> Unit + ) = convert(type, source, builder, handleStartLine) + private fun RawHttpRequest.convert( type: String, source: RawMessage, - builder: MessageGroup.Builder, + builder: MutableList>, handleStartLine: RequestLine.( - httpMessage: Message.Builder, + httpMessage: MutableMap, metadataProperties: MutableMap ) -> Unit ) = convert(type, source, builder, handleStartLine) @@ -301,11 +472,11 @@ class HttpPipelineCodec : IPipelineCodec { private fun RawHttpResponse<*>.convert( type: String, source: RawMessage, - builder: MessageGroup.Builder, + builder: MutableList>, handleStartLine: StatusLine.( - httpMessage: Message.Builder, + httpMessage: MutableMap, metadataProperties: MutableMap ) -> Unit ) = convert(type, source, builder, handleStartLine) } -} +} \ No newline at end of file diff --git a/src/main/kotlin/com/exactpro/th2/codec/http/HttpPipelineCodecFactory.kt b/src/main/kotlin/com/exactpro/th2/codec/http/HttpPipelineCodecFactory.kt index 16993a8..c38224f 100644 --- a/src/main/kotlin/com/exactpro/th2/codec/http/HttpPipelineCodecFactory.kt +++ b/src/main/kotlin/com/exactpro/th2/codec/http/HttpPipelineCodecFactory.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,12 +25,12 @@ import com.google.auto.service.AutoService @AutoService(IPipelineCodecFactory::class) class HttpPipelineCodecFactory : IPipelineCodecFactory { override val settingsClass: Class = HttpPipelineCodecSettings::class.java - @Deprecated("Please migrate to the protocols property") - override val protocol: String = PROTOCOL + override val protocols: Set get() = PROTOCOLS override fun init(pipelineCodecContext: IPipelineCodecContext) = Unit override fun create(settings: IPipelineCodecSettings?): IPipelineCodec = HttpPipelineCodec() companion object { const val PROTOCOL = "http" + private val PROTOCOLS = setOf(PROTOCOL) } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/codec/http/DecodeTest.kt b/src/test/kotlin/com/exactpro/th2/codec/http/DecodeTest.kt index 9450571..186d996 100644 --- a/src/test/kotlin/com/exactpro/th2/codec/http/DecodeTest.kt +++ b/src/test/kotlin/com/exactpro/th2/codec/http/DecodeTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,24 @@ package com.exactpro.th2.codec.http +import com.exactpro.th2.codec.api.impl.ReportingContext import com.exactpro.th2.codec.http.HttpPipelineCodecFactory.Companion.PROTOCOL -import com.exactpro.th2.common.grpc.AnyMessage -import com.exactpro.th2.common.grpc.Direction -import com.exactpro.th2.common.grpc.MessageGroup -import com.exactpro.th2.common.grpc.RawMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.EventId +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageId +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.Direction +import com.exactpro.th2.common.grpc.AnyMessage as ProtoAnyMessage +import com.exactpro.th2.common.grpc.Direction as ProtoDirection +import com.exactpro.th2.common.grpc.MessageGroup as ProtoMessageGroup +import com.exactpro.th2.common.grpc.RawMessage as ProtoRawMessage import com.google.protobuf.ByteString +import io.netty.buffer.Unpooled import kotlin.test.assertEquals import org.junit.jupiter.api.Test +import java.net.URI +import java.time.Instant class DecodeTest { @@ -40,16 +50,16 @@ class DecodeTest { val codec = HttpPipelineCodec() - val message = RawMessage.newBuilder().apply { + val message = ProtoRawMessage.newBuilder().apply { parentEventIdBuilder.id = eventID metadataBuilder.protocol = PROTOCOL - metadataBuilder.idBuilder.direction = Direction.SECOND + metadataBuilder.idBuilder.direction = ProtoDirection.SECOND body = ByteString.copyFrom(request.toByteArray()) } - val messageGroup = MessageGroup.newBuilder().addMessages(AnyMessage.newBuilder().setRawMessage(message).build()).build() + val messageGroup = ProtoMessageGroup.newBuilder().addMessages(ProtoAnyMessage.newBuilder().setRawMessage(message).build()).build() - val decodedEventID = codec.decode(messageGroup).getMessages(0).message.parentEventId + val decodedEventID = codec.decode(messageGroup, ReportingContext()).getMessages(0).message.parentEventId assertEquals(eventID, decodedEventID.id) } @@ -66,18 +76,101 @@ class DecodeTest { val codec = HttpPipelineCodec() - val message = RawMessage.newBuilder().apply { + val message = ProtoRawMessage.newBuilder().apply { parentEventIdBuilder.id = eventID metadataBuilder.protocol = PROTOCOL - metadataBuilder.idBuilder.direction = Direction.FIRST + metadataBuilder.idBuilder.direction = ProtoDirection.FIRST body = ByteString.copyFrom(response.toByteArray()) } - val messageGroup = MessageGroup.newBuilder().addMessages(AnyMessage.newBuilder().setRawMessage(message).build()).build() + val messageGroup = ProtoMessageGroup.newBuilder().addMessages(ProtoAnyMessage.newBuilder().setRawMessage(message).build()).build() - val decodedEventID = codec.decode(messageGroup).getMessages(0).message.parentEventId + val decodedEventID = codec.decode(messageGroup, ReportingContext()).getMessages(0).message.parentEventId assertEquals(eventID, decodedEventID.id) } + @Test + fun `parent event id test - response (transport)`() { + val eventID = "123" + + val response = """ + HTTP/1.1 200 OK + Content-Type: text/plain + Content-Length: 0 + """.trimIndent() + + + val codec = HttpPipelineCodec() + val message = RawMessage( + id = MessageId("alias_01", Direction.INCOMING, 1, Instant.now()), + eventId = EventId(eventID, "book_01", "scope_01", Instant.now()), + protocol = PROTOCOL, + body = Unpooled.wrappedBuffer(response.toByteArray()) + ) + + val messageGroup = MessageGroup(listOf(message)) + + val decodedGroup = codec.decode(messageGroup, ReportingContext()) + + val decodedMessage = decodedGroup.messages[0] as ParsedMessage + + val decodedEventID = decodedMessage.eventId + val decodedBody = decodedMessage.body + + assertEquals(PROTOCOL, decodedMessage.protocol) + assertEquals("Response", decodedMessage.type) + assertEquals(3, decodedBody.size) + assertEquals(200, decodedBody["statusCode"]) + assertEquals("OK", decodedBody["reason"]) + val decodedHeaders = decodedBody["headers"] as Map + assertEquals(2, decodedHeaders.size) + assertEquals("text/plain", decodedHeaders["Content-Type"]) + assertEquals("0", decodedHeaders["Content-Length"]) + assertEquals(eventID, decodedEventID?.id) + } + + @Test + fun `parent event id test - request (transport)`() { + val eventID = "123" + + val request = """ + GET /hello.txt HTTP/1.1 + User-Agent: OpenSSL/0.9.7l + Host: www.test.com + Accept-Language: en, mi + """.trimIndent() + + + val codec = HttpPipelineCodec() + val message = RawMessage( + id = MessageId("alias_01", Direction.OUTGOING, 1, Instant.now()), + eventId = EventId(eventID, "book_01", "scope_01", Instant.now()), + protocol = PROTOCOL, + body = Unpooled.wrappedBuffer(request.toByteArray()) + ) + + val messageGroup = MessageGroup(listOf(message)) + + val decodedGroup = codec.decode(messageGroup, ReportingContext()) + + assertEquals(1, decodedGroup.messages.size) + + val decodedMessage = decodedGroup.messages[0] as ParsedMessage + + val decodedEventID = decodedMessage.eventId + val decodedBody = decodedMessage.body + + assertEquals(PROTOCOL, decodedMessage.protocol) + assertEquals("Request", decodedMessage.type) + assertEquals("GET", decodedBody["method"]) + assertEquals(URI("http://www.test.com/hello.txt"), decodedBody["uri"]) + val decodedHeaders = decodedBody["headers"] as Map + assertEquals(3, decodedHeaders.size) + assertEquals("OpenSSL/0.9.7l", decodedHeaders["User-Agent"]) + assertEquals("www.test.com", decodedHeaders["Host"]) + assertEquals("en, mi", decodedHeaders["Accept-Language"]) + + assertEquals(eventID, decodedEventID?.id) + } } \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/codec/http/EncodeTest.kt b/src/test/kotlin/com/exactpro/th2/codec/http/EncodeTest.kt index f0ea6d5..dd10d43 100644 --- a/src/test/kotlin/com/exactpro/th2/codec/http/EncodeTest.kt +++ b/src/test/kotlin/com/exactpro/th2/codec/http/EncodeTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2023 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,20 @@ package com.exactpro.th2.codec.http +import com.exactpro.th2.codec.api.impl.ReportingContext import com.exactpro.th2.codec.http.HttpPipelineCodecFactory.Companion.PROTOCOL -import com.exactpro.th2.common.grpc.AnyMessage -import com.exactpro.th2.common.grpc.Message -import com.exactpro.th2.common.grpc.MessageGroup +import com.exactpro.th2.common.grpc.AnyMessage as ProtoAnyMessage +import com.exactpro.th2.common.grpc.Message as ProtoMessage +import com.exactpro.th2.common.grpc.MessageGroup as ProtoMessageGroup import com.exactpro.th2.common.message.addField import com.exactpro.th2.common.message.messageType +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.EventId +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.MessageGroup +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.ParsedMessage +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage import kotlin.test.assertEquals import org.junit.jupiter.api.Test +import java.time.Instant class EncodeTest { @@ -32,7 +38,7 @@ class EncodeTest { val eventID = "123" val codec = HttpPipelineCodec() - val message = Message.newBuilder().apply { + val message = ProtoMessage.newBuilder().apply { messageType = HttpPipelineCodec.REQUEST_MESSAGE addField(HttpPipelineCodec.URI_FIELD, "/test") addField(HttpPipelineCodec.METHOD_FIELD, "GET") @@ -40,9 +46,9 @@ class EncodeTest { metadataBuilder.protocol = PROTOCOL } - val messageGroup = MessageGroup.newBuilder().addMessages(AnyMessage.newBuilder().setMessage(message).build()).build() + val messageGroup = ProtoMessageGroup.newBuilder().addMessages(ProtoAnyMessage.newBuilder().setMessage(message).build()).build() - val encodedEventID = codec.encode(messageGroup).getMessages(0).rawMessage.parentEventId + val encodedEventID = codec.encode(messageGroup, ReportingContext()).getMessages(0).rawMessage.parentEventId assertEquals(eventID, encodedEventID.id) } @@ -52,7 +58,7 @@ class EncodeTest { val eventID = "123" val codec = HttpPipelineCodec() - val message = Message.newBuilder().apply { + val message = ProtoMessage.newBuilder().apply { messageType = HttpPipelineCodec.RESPONSE_MESSAGE addField(HttpPipelineCodec.STATUS_CODE_FIELD, "200") addField(HttpPipelineCodec.REASON_FIELD, "OK") @@ -60,11 +66,62 @@ class EncodeTest { metadataBuilder.protocol = PROTOCOL } - val messageGroup = MessageGroup.newBuilder().addMessages(AnyMessage.newBuilder().setMessage(message).build()).build() - - val encodedEventID = codec.encode(messageGroup).getMessages(0).rawMessage.parentEventId + val messageGroup = ProtoMessageGroup.newBuilder().addMessages(ProtoAnyMessage.newBuilder().setMessage(message).build()).build() + val encodedEventID = codec.encode(messageGroup, ReportingContext()).getMessages(0).rawMessage.parentEventId assertEquals(eventID, encodedEventID.id) } + @Test + fun `parent event id test - request (transport)`() { + val eventID = "123" + + val codec = HttpPipelineCodec() + val message = ParsedMessage( + eventId = EventId(eventID, "book_1", "scope_1", Instant.now()), + type = HttpPipelineCodec.REQUEST_MESSAGE, + protocol = PROTOCOL, + body = mapOf( + HttpPipelineCodec.URI_FIELD to "/test", + HttpPipelineCodec.METHOD_FIELD to "GET" + ) + ) + + val messageGroup = MessageGroup(listOf(message)) + val encodedGroup = codec.encode(messageGroup, ReportingContext()) + val encodedMessage = encodedGroup.messages[0] as RawMessage + val encodedEventID = encodedMessage.eventId + + assertEquals(1, encodedGroup.messages.size) + assertEquals(PROTOCOL, encodedMessage.protocol) + assertEquals(22, encodedMessage.body.readableBytes()) + assertEquals(eventID, encodedEventID?.id) + } + + @Test + fun `parent event id test - response (transport)`() { + val eventID = "123" + + val codec = HttpPipelineCodec() + val message = ParsedMessage( + eventId = EventId(eventID, "book_1", "scope_1", Instant.now()), + type = HttpPipelineCodec.RESPONSE_MESSAGE, + protocol = PROTOCOL, + body = mapOf( + HttpPipelineCodec.STATUS_CODE_FIELD to "200", + HttpPipelineCodec.REASON_FIELD to "OK" + ) + ) + + val messageGroup = MessageGroup(listOf(message)) + val encodedGroup = codec.encode(messageGroup, ReportingContext()) + + val encodedMessage = encodedGroup.messages[0] as RawMessage + val encodedEventID = encodedMessage.eventId + + assertEquals(1, encodedGroup.messages.size) + assertEquals(PROTOCOL, encodedMessage.protocol) + assertEquals(19, encodedMessage.body.readableBytes()) + assertEquals(eventID, encodedEventID?.id) + } } \ No newline at end of file diff --git a/suppressions.xml b/suppressions.xml new file mode 100644 index 0000000..52da5f2 --- /dev/null +++ b/suppressions.xml @@ -0,0 +1,9 @@ + + + + + + ^pkg:maven/com\.exactpro\.th2/task-utils@.*$ + cpe:/a:utils_project:utils + + \ No newline at end of file