Skip to content

Commit

Permalink
Support response compression (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-vovk authored Dec 8, 2024
1 parent 68513b3 commit de571fd
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 15 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Known issues:

## Future improvements

- [x] Support GET-requests
- [ ] Support `google.api.http` annotations (GRPC transcoding)
- [x] Support GET-requests ([#10](https://github.com/igor-vovk/connect-rpc-scala/issues/10))
- [ ] Support `google.api.http` annotations (GRPC transcoding) ([#51](https://github.com/igor-vovk/connect-rpc-scala/issues/51))
- [ ] Support configurable timeouts
- [ ] Support non-unary (streaming) methods
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import org.ivovk.connect_rpc_scala.Mappings.*
import org.ivovk.connect_rpc_scala.grpc.{ClientCalls, GrpcHeaders, MethodRegistry}
import org.ivovk.connect_rpc_scala.http.Headers.`X-Test-Case-Name`
import org.ivovk.connect_rpc_scala.http.RequestEntity
import org.ivovk.connect_rpc_scala.http.codec.MessageCodec
import org.ivovk.connect_rpc_scala.http.codec.MessageCodec.given
import org.ivovk.connect_rpc_scala.http.codec.{Compressor, EncodeOptions, MessageCodec}
import org.slf4j.{Logger, LoggerFactory}
import scalapb.GeneratedMessage

Expand All @@ -33,6 +33,10 @@ class ConnectHandler[F[_] : Async](
req: RequestEntity[F],
method: MethodRegistry.Entry,
)(using MessageCodec[F]): F[Response[F]] = {
given EncodeOptions = EncodeOptions(
encoding = req.encoding.filter(Compressor.supportedEncodings.contains)
)

method.descriptor.getType match
case MethodType.UNARY =>
handleUnary(req, method)
Expand All @@ -46,7 +50,7 @@ class ConnectHandler[F[_] : Async](
private def handleUnary(
req: RequestEntity[F],
method: MethodRegistry.Entry,
)(using MessageCodec[F]): F[Response[F]] = {
)(using MessageCodec[F], EncodeOptions): F[Response[F]] = {
if (logger.isTraceEnabled) {
// Used in conformance tests
req.headers.get[`X-Test-Case-Name`] match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package org.ivovk.connect_rpc_scala.http.codec
import cats.effect.Sync
import fs2.Stream
import fs2.compression.Compression
import org.http4s.ContentCoding
import org.http4s.{ContentCoding, Entity}

object Compressor {
val supportedEncodings: Set[ContentCoding] = Set(ContentCoding.gzip)
}

class Compressor[F[_] : Sync] {

Expand All @@ -13,8 +17,22 @@ class Compressor[F[_] : Sync] {
body.through(encoding match {
case Some(ContentCoding.gzip) =>
Compression[F].gunzip().andThen(_.flatMap(_.content))
case _ =>
case Some(other) =>
throw new IllegalArgumentException(s"Unsupported encoding: $other")
case None =>
identity
})

def compressed(encoding: Option[ContentCoding], entity: Entity[F]): Entity[F] =
encoding match {
case Some(ContentCoding.gzip) =>
Entity(
body = entity.body.through(Compression[F].gzip()),
)
case Some(other) =>
throw new IllegalArgumentException(s"Unsupported encoding: $other")
case None =>
entity
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class JsonMessageCodec[F[_] : Sync](
.leftMap(e => InvalidMessageBodyFailure(e.getMessage, e.some))
}

override def encode[A <: Message](message: A): Entity[F] = {
override def encode[A <: Message](message: A, options: EncodeOptions): Entity[F] = {
val string = printer.print(message)

if (logger.isTraceEnabled) {
Expand All @@ -55,10 +55,12 @@ class JsonMessageCodec[F[_] : Sync](

val bytes = string.getBytes()

Entity(
val entity = Entity(
body = Stream.chunk(Chunk.array(bytes)),
length = Some(bytes.length.toLong),
)

compressor.compressed(options.encoding, entity)
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
package org.ivovk.connect_rpc_scala.http.codec

import org.http4s.headers.`Content-Type`
import org.http4s.{DecodeResult, Entity, EntityEncoder, MediaType}
import org.http4s.headers.{`Content-Encoding`, `Content-Type`}
import org.http4s.{ContentCoding, DecodeResult, Entity, EntityEncoder, Header, Headers, MediaType}
import org.ivovk.connect_rpc_scala.http.RequestEntity
import scalapb.{GeneratedMessage as Message, GeneratedMessageCompanion as Companion}

import scala.util.chaining.*

case class EncodeOptions(
encoding: Option[ContentCoding]
)

object MessageCodec {
given [F[_], A <: Message](using codec: MessageCodec[F]): EntityEncoder[F, A] =
EntityEncoder.encodeBy(`Content-Type`(codec.mediaType))(codec.encode)
given [F[_], A <: Message](using codec: MessageCodec[F], options: EncodeOptions): EntityEncoder[F, A] = {
val headers = Headers(`Content-Type`(codec.mediaType))
.pipe(
options.encoding match
case Some(encoding) => _.put(`Content-Encoding`(encoding))
case None => identity
)

EntityEncoder.encodeBy(headers)(codec.encode(_, options))
}
}

trait MessageCodec[F[_]] {
Expand All @@ -16,6 +30,6 @@ trait MessageCodec[F[_]] {

def decode[A <: Message](m: RequestEntity[F])(using cmp: Companion[A]): DecodeResult[F, A]

def encode[A <: Message](message: A): Entity[F]
def encode[A <: Message](message: A, options: EncodeOptions): Entity[F]

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,20 @@ class ProtoMessageCodec[F[_] : Async] extends MessageCodec[F] {
.leftMap(e => InvalidMessageBodyFailure(e.getMessage, e.some))
}

override def encode[A <: Message](message: A): Entity[F] = {
override def encode[A <: Message](message: A, options: EncodeOptions): Entity[F] = {
if (logger.isTraceEnabled) {
logger.trace(s"<<< Proto: ${message.toProtoString}")
}

val dataLength = message.serializedSize
val chunkSize = CodedOutputStream.DEFAULT_BUFFER_SIZE min dataLength

Entity(
val entity = Entity(
body = readOutputStream(chunkSize)(os => Async[F].delay(message.writeTo(os))),
length = Some(dataLength.toLong),
)

compressor.compressed(options.encoding, entity)
}

}

0 comments on commit de571fd

Please sign in to comment.