From b5dd950be634f5805e5bcac109443af9575caf7c Mon Sep 17 00:00:00 2001 From: Ihor Vovk Date: Sat, 30 Nov 2024 19:59:15 +0100 Subject: [PATCH] Remove Compression implicit requirement (#30) --- .../ConnectRouteBuilder.scala | 14 ++++----- .../connect_rpc_scala/http/MessageCodec.scala | 29 +++++++++++-------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRouteBuilder.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRouteBuilder.scala index 20510c7..9f7ccbd 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRouteBuilder.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRouteBuilder.scala @@ -3,7 +3,6 @@ package org.ivovk.connect_rpc_scala import cats.Endo import cats.effect.{Async, Resource} import cats.implicits.* -import fs2.compression.Compression import io.grpc.{ManagedChannelBuilder, ServerBuilder, ServerServiceDefinition} import org.http4s.dsl.Http4sDsl import org.http4s.{HttpApp, HttpRoutes, Method} @@ -15,18 +14,18 @@ import scala.concurrent.duration.* object ConnectRouteBuilder { - def forService[F[_] : Async: Compression](service: ServerServiceDefinition): ConnectRouteBuilder[F] = + def forService[F[_] : Async](service: ServerServiceDefinition): ConnectRouteBuilder[F] = ConnectRouteBuilder(Seq(service)) - def forServices[F[_] : Async: Compression](service: ServerServiceDefinition, other: ServerServiceDefinition*): ConnectRouteBuilder[F] = + def forServices[F[_] : Async](service: ServerServiceDefinition, other: ServerServiceDefinition*): ConnectRouteBuilder[F] = ConnectRouteBuilder(service +: other) - def forServices[F[_] : Async: Compression](services: Seq[ServerServiceDefinition]): ConnectRouteBuilder[F] = + def forServices[F[_] : Async](services: Seq[ServerServiceDefinition]): ConnectRouteBuilder[F] = ConnectRouteBuilder(services) } -case class ConnectRouteBuilder[F[_] : Async: Compression] private( +case class ConnectRouteBuilder[F[_] : Async] private( services: Seq[ServerServiceDefinition], jsonPrinterConfigurator: Endo[Printer] = identity, serverBuilderConfigurator: Endo[ServerBuilder[_]] = identity, @@ -56,11 +55,12 @@ case class ConnectRouteBuilder[F[_] : Async: Compression] private( val httpDsl = Http4sDsl[F] import httpDsl.* + val compressor = Compressor[F] val jsonPrinter = jsonPrinterConfigurator(JsonFormat.printer) val codecRegistry = MessageCodecRegistry[F]( - JsonMessageCodec[F](jsonPrinter), - ProtoMessageCodec[F], + JsonMessageCodec[F](compressor, jsonPrinter), + ProtoMessageCodec[F](compressor) ) val methodRegistry = MethodRegistry(services) diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala index 215033e..04950a2 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala @@ -35,7 +35,7 @@ trait MessageCodec[F[_]] { } -class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends MessageCodec[F] { +class JsonMessageCodec[F[_] : Sync](compressor: Compressor[F], printer: Printer) extends MessageCodec[F] { private val logger: Logger = LoggerFactory.getLogger(getClass) @@ -47,7 +47,7 @@ class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends Mess case str: String => Sync[F].delay(URLDecoder.decode(str, charset)) case stream: Stream[F, Byte] => - decompressed(entity.encoding, stream) + compressor.decompressed(entity.encoding, stream) .through(decodeWithCharset(charset)) .compile.string } @@ -82,7 +82,7 @@ class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends Mess } -class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] { +class ProtoMessageCodec[F[_] : Async](compressor: Compressor[F]) extends MessageCodec[F] { private val logger: Logger = LoggerFactory.getLogger(getClass) @@ -96,7 +96,7 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] { Async[F].delay(base64dec.decode(str.getBytes(entity.charset.nioCharset))) .flatMap(arr => Async[F].delay(cmp.parseFrom(arr))) case stream: Stream[F, Byte] => - toInputStreamResource(decompressed(entity.encoding, stream)) + toInputStreamResource(compressor.decompressed(entity.encoding, stream)) .use(is => Async[F].delay(cmp.parseFrom(is))) } @@ -129,11 +129,16 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] { } -def decompressed[F[_] : Compression](encoding: Option[ContentCoding], body: Stream[F, Byte]): Stream[F, Byte] = { - body.through(encoding match { - case Some(ContentCoding.gzip) => - Compression[F].gunzip().andThen(_.flatMap(_.content)) - case _ => - identity - }) -} \ No newline at end of file +class Compressor[F[_]: Sync] { + + given Compression[F] = Compression.forSync[F] + + def decompressed(encoding: Option[ContentCoding], body: Stream[F, Byte]): Stream[F, Byte] = + body.through(encoding match { + case Some(ContentCoding.gzip) => + Compression[F].gunzip().andThen(_.flatMap(_.content)) + case _ => + identity + }) + +}