diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/Headers.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/Headers.scala index 057566d..2894883 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/Headers.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/Headers.scala @@ -10,7 +10,7 @@ object Headers { @targetName("ConnectTimeoutMs") case class `Connect-Timeout-Ms`(value: Long) - @targetName("HeaderConnectTimeoutMs") + @targetName("ConnectTimeoutMs$") object `Connect-Timeout-Ms` { def parse(s: String): ParseResult[`Connect-Timeout-Ms`] = { ParseResult.fromTryCatchNonFatal(s)(`Connect-Timeout-Ms`(s.toLong)) @@ -26,7 +26,7 @@ object Headers { @targetName("XTestCaseName") case class `X-Test-Case-Name`(value: String) - @targetName("HeaderXTestCaseName") + @targetName("XTestCaseName$") object `X-Test-Case-Name` { @targetName("HeaderXTestCaseName") implicit val header: Header[`X-Test-Case-Name`, Header.Single] = Header.createRendered( diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala index deeb798..9c7fb5d 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala @@ -1,7 +1,6 @@ package org.ivovk.connect_rpc_scala.http import cats.Applicative -import cats.data.EitherT import cats.effect.{Async, Sync} import cats.implicits.* import com.google.protobuf.CodedOutputStream @@ -10,7 +9,7 @@ import fs2.io.{readOutputStream, toInputStreamResource} import fs2.text.decodeWithCharset import fs2.{Chunk, Stream} import org.http4s.headers.`Content-Type` -import org.http4s.{ContentCoding, DecodeResult, Entity, EntityDecoder, EntityEncoder, MediaRange, MediaType} +import org.http4s.{ContentCoding, DecodeResult, Entity, EntityDecoder, EntityEncoder, InvalidMessageBodyFailure, MediaRange, MediaType} import org.ivovk.connect_rpc_scala.ConnectRpcHttpRoutes.getClass import org.slf4j.{Logger, LoggerFactory} import scalapb.json4s.{JsonFormat, Printer} @@ -54,17 +53,17 @@ class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends Mess .compile.string } - val f = string + string .flatMap { str => if (logger.isTraceEnabled) { - logger.trace(s">>> Headers: ${entity.headers.redactSensitive}") + logger.trace(s">>> Headers: ${entity.headers.redactSensitive()}") logger.trace(s">>> JSON: $str") } Sync[F].delay(JsonFormat.fromJsonString(str)) } - - EitherT.right(f) + .attemptT + .leftMap(e => InvalidMessageBodyFailure(e.getMessage, e.some)) } override def encode[A <: Message](message: A): Entity[F] = { @@ -93,7 +92,7 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] { override val mediaType: MediaType = MediaTypes.`application/proto` override def decode[A <: Message](entity: RequestEntity[F])(using cmp: Companion[A]): DecodeResult[F, A] = { - val f = entity.message match { + val msg = entity.message match { case str: String => Async[F].delay(base64dec.decode(str.getBytes(entity.charset.nioCharset))) .flatMap(arr => Async[F].delay(cmp.parseFrom(arr))) @@ -102,14 +101,17 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] { .use(is => Async[F].delay(cmp.parseFrom(is))) } - EitherT.right(f.map { message => - if (logger.isTraceEnabled) { - logger.trace(s">>> Headers: ${entity.headers.redactSensitive}") - logger.trace(s">>> Proto: ${message.toProtoString}") - } + msg + .map { message => + if (logger.isTraceEnabled) { + logger.trace(s">>> Headers: ${entity.headers.redactSensitive()}") + logger.trace(s">>> Proto: ${message.toProtoString}") + } - message - }) + message + } + .attemptT + .leftMap(e => InvalidMessageBodyFailure(e.getMessage, e.some)) } override def encode[A <: Message](message: A): Entity[F] = {