Skip to content

Commit

Permalink
Properly map decoding failures (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-vovk authored Nov 30, 2024
1 parent 6f36826 commit 50732cc
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ object Headers {
@targetName("ConnectTimeoutMs")
case class `Connect-Timeout-Ms`(value: Long)

@targetName("HeaderConnectTimeoutMs")
@targetName("ConnectTimeoutMs$")
object `Connect-Timeout-Ms` {
def parse(s: String): ParseResult[`Connect-Timeout-Ms`] = {
ParseResult.fromTryCatchNonFatal(s)(`Connect-Timeout-Ms`(s.toLong))
Expand All @@ -26,7 +26,7 @@ object Headers {
@targetName("XTestCaseName")
case class `X-Test-Case-Name`(value: String)

@targetName("HeaderXTestCaseName")
@targetName("XTestCaseName$")
object `X-Test-Case-Name` {
@targetName("HeaderXTestCaseName")
implicit val header: Header[`X-Test-Case-Name`, Header.Single] = Header.createRendered(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.ivovk.connect_rpc_scala.http

import cats.Applicative
import cats.data.EitherT
import cats.effect.{Async, Sync}
import cats.implicits.*
import com.google.protobuf.CodedOutputStream
Expand All @@ -10,7 +9,7 @@ import fs2.io.{readOutputStream, toInputStreamResource}
import fs2.text.decodeWithCharset
import fs2.{Chunk, Stream}
import org.http4s.headers.`Content-Type`
import org.http4s.{ContentCoding, DecodeResult, Entity, EntityDecoder, EntityEncoder, MediaRange, MediaType}
import org.http4s.{ContentCoding, DecodeResult, Entity, EntityDecoder, EntityEncoder, InvalidMessageBodyFailure, MediaRange, MediaType}
import org.ivovk.connect_rpc_scala.ConnectRpcHttpRoutes.getClass
import org.slf4j.{Logger, LoggerFactory}
import scalapb.json4s.{JsonFormat, Printer}
Expand Down Expand Up @@ -54,17 +53,17 @@ class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends Mess
.compile.string
}

val f = string
string
.flatMap { str =>
if (logger.isTraceEnabled) {
logger.trace(s">>> Headers: ${entity.headers.redactSensitive}")
logger.trace(s">>> Headers: ${entity.headers.redactSensitive()}")
logger.trace(s">>> JSON: $str")
}

Sync[F].delay(JsonFormat.fromJsonString(str))
}

EitherT.right(f)
.attemptT
.leftMap(e => InvalidMessageBodyFailure(e.getMessage, e.some))
}

override def encode[A <: Message](message: A): Entity[F] = {
Expand Down Expand Up @@ -93,7 +92,7 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {
override val mediaType: MediaType = MediaTypes.`application/proto`

override def decode[A <: Message](entity: RequestEntity[F])(using cmp: Companion[A]): DecodeResult[F, A] = {
val f = entity.message match {
val msg = entity.message match {
case str: String =>
Async[F].delay(base64dec.decode(str.getBytes(entity.charset.nioCharset)))
.flatMap(arr => Async[F].delay(cmp.parseFrom(arr)))
Expand All @@ -102,14 +101,17 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {
.use(is => Async[F].delay(cmp.parseFrom(is)))
}

EitherT.right(f.map { message =>
if (logger.isTraceEnabled) {
logger.trace(s">>> Headers: ${entity.headers.redactSensitive}")
logger.trace(s">>> Proto: ${message.toProtoString}")
}
msg
.map { message =>
if (logger.isTraceEnabled) {
logger.trace(s">>> Headers: ${entity.headers.redactSensitive()}")
logger.trace(s">>> Proto: ${message.toProtoString}")
}

message
})
message
}
.attemptT
.leftMap(e => InvalidMessageBodyFailure(e.getMessage, e.some))
}

override def encode[A <: Message](message: A): Entity[F] = {
Expand Down

0 comments on commit 50732cc

Please sign in to comment.