diff --git a/README.md b/README.md index e57da7f..6f939c3 100644 --- a/README.md +++ b/README.md @@ -160,7 +160,7 @@ Known issues: ## Future improvements -- [x] Support GET-requests -- [ ] Support `google.api.http` annotations (GRPC transcoding) +- [x] Support GET-requests ([#10](https://github.com/igor-vovk/connect-rpc-scala/issues/10)) +- [ ] Support `google.api.http` annotations (GRPC transcoding) ([#51](https://github.com/igor-vovk/connect-rpc-scala/issues/51)) - [ ] Support configurable timeouts - [ ] Support non-unary (streaming) methods diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala index 638b4ad..78342e6 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala @@ -10,8 +10,8 @@ import org.ivovk.connect_rpc_scala.Mappings.* import org.ivovk.connect_rpc_scala.grpc.{ClientCalls, GrpcHeaders, MethodRegistry} import org.ivovk.connect_rpc_scala.http.Headers.`X-Test-Case-Name` import org.ivovk.connect_rpc_scala.http.RequestEntity -import org.ivovk.connect_rpc_scala.http.codec.MessageCodec import org.ivovk.connect_rpc_scala.http.codec.MessageCodec.given +import org.ivovk.connect_rpc_scala.http.codec.{Compressor, EncodeOptions, MessageCodec} import org.slf4j.{Logger, LoggerFactory} import scalapb.GeneratedMessage @@ -33,6 +33,10 @@ class ConnectHandler[F[_] : Async]( req: RequestEntity[F], method: MethodRegistry.Entry, )(using MessageCodec[F]): F[Response[F]] = { + given EncodeOptions = EncodeOptions( + encoding = req.encoding.filter(Compressor.supportedEncodings.contains) + ) + method.descriptor.getType match case MethodType.UNARY => handleUnary(req, method) @@ -46,7 +50,7 @@ class ConnectHandler[F[_] : Async]( private def handleUnary( req: RequestEntity[F], method: MethodRegistry.Entry, - )(using MessageCodec[F]): F[Response[F]] = { + )(using MessageCodec[F], EncodeOptions): F[Response[F]] = { if (logger.isTraceEnabled) { // Used in conformance tests req.headers.get[`X-Test-Case-Name`] match { diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/Compressor.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/Compressor.scala index 4f3111b..0882c89 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/Compressor.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/Compressor.scala @@ -3,7 +3,11 @@ package org.ivovk.connect_rpc_scala.http.codec import cats.effect.Sync import fs2.Stream import fs2.compression.Compression -import org.http4s.ContentCoding +import org.http4s.{ContentCoding, Entity} + +object Compressor { + val supportedEncodings: Set[ContentCoding] = Set(ContentCoding.gzip) +} class Compressor[F[_] : Sync] { @@ -13,8 +17,22 @@ class Compressor[F[_] : Sync] { body.through(encoding match { case Some(ContentCoding.gzip) => Compression[F].gunzip().andThen(_.flatMap(_.content)) - case _ => + case Some(other) => + throw new IllegalArgumentException(s"Unsupported encoding: $other") + case None => identity }) + def compressed(encoding: Option[ContentCoding], entity: Entity[F]): Entity[F] = + encoding match { + case Some(ContentCoding.gzip) => + Entity( + body = entity.body.through(Compression[F].gzip()), + ) + case Some(other) => + throw new IllegalArgumentException(s"Unsupported encoding: $other") + case None => + entity + } + } diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/JsonMessageCodec.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/JsonMessageCodec.scala index 9f31d42..9078d37 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/JsonMessageCodec.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/JsonMessageCodec.scala @@ -46,7 +46,7 @@ class JsonMessageCodec[F[_] : Sync]( .leftMap(e => InvalidMessageBodyFailure(e.getMessage, e.some)) } - override def encode[A <: Message](message: A): Entity[F] = { + override def encode[A <: Message](message: A, options: EncodeOptions): Entity[F] = { val string = printer.print(message) if (logger.isTraceEnabled) { @@ -55,10 +55,12 @@ class JsonMessageCodec[F[_] : Sync]( val bytes = string.getBytes() - Entity( + val entity = Entity( body = Stream.chunk(Chunk.array(bytes)), length = Some(bytes.length.toLong), ) + + compressor.compressed(options.encoding, entity) } } diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/MessageCodec.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/MessageCodec.scala index f5726b7..802a679 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/MessageCodec.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/MessageCodec.scala @@ -1,13 +1,27 @@ package org.ivovk.connect_rpc_scala.http.codec -import org.http4s.headers.`Content-Type` -import org.http4s.{DecodeResult, Entity, EntityEncoder, MediaType} +import org.http4s.headers.{`Content-Encoding`, `Content-Type`} +import org.http4s.{ContentCoding, DecodeResult, Entity, EntityEncoder, Header, Headers, MediaType} import org.ivovk.connect_rpc_scala.http.RequestEntity import scalapb.{GeneratedMessage as Message, GeneratedMessageCompanion as Companion} +import scala.util.chaining.* + +case class EncodeOptions( + encoding: Option[ContentCoding] +) + object MessageCodec { - given [F[_], A <: Message](using codec: MessageCodec[F]): EntityEncoder[F, A] = - EntityEncoder.encodeBy(`Content-Type`(codec.mediaType))(codec.encode) + given [F[_], A <: Message](using codec: MessageCodec[F], options: EncodeOptions): EntityEncoder[F, A] = { + val headers = Headers(`Content-Type`(codec.mediaType)) + .pipe( + options.encoding match + case Some(encoding) => _.put(`Content-Encoding`(encoding)) + case None => identity + ) + + EntityEncoder.encodeBy(headers)(codec.encode(_, options)) + } } trait MessageCodec[F[_]] { @@ -16,6 +30,6 @@ trait MessageCodec[F[_]] { def decode[A <: Message](m: RequestEntity[F])(using cmp: Companion[A]): DecodeResult[F, A] - def encode[A <: Message](message: A): Entity[F] + def encode[A <: Message](message: A, options: EncodeOptions): Entity[F] } diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/ProtoMessageCodec.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/ProtoMessageCodec.scala index 38e216a..5edb489 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/ProtoMessageCodec.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/codec/ProtoMessageCodec.scala @@ -44,7 +44,7 @@ class ProtoMessageCodec[F[_] : Async] extends MessageCodec[F] { .leftMap(e => InvalidMessageBodyFailure(e.getMessage, e.some)) } - override def encode[A <: Message](message: A): Entity[F] = { + override def encode[A <: Message](message: A, options: EncodeOptions): Entity[F] = { if (logger.isTraceEnabled) { logger.trace(s"<<< Proto: ${message.toProtoString}") } @@ -52,10 +52,12 @@ class ProtoMessageCodec[F[_] : Async] extends MessageCodec[F] { val dataLength = message.serializedSize val chunkSize = CodedOutputStream.DEFAULT_BUFFER_SIZE min dataLength - Entity( + val entity = Entity( body = readOutputStream(chunkSize)(os => Async[F].delay(message.writeTo(os))), length = Some(dataLength.toLong), ) + + compressor.compressed(options.encoding, entity) } }