Skip to content

Commit

Permalink
Remove Compression implicit requirement (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-vovk authored Nov 30, 2024
1 parent 1428dd0 commit b5dd950
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package org.ivovk.connect_rpc_scala
import cats.Endo
import cats.effect.{Async, Resource}
import cats.implicits.*
import fs2.compression.Compression
import io.grpc.{ManagedChannelBuilder, ServerBuilder, ServerServiceDefinition}
import org.http4s.dsl.Http4sDsl
import org.http4s.{HttpApp, HttpRoutes, Method}
Expand All @@ -15,18 +14,18 @@ import scala.concurrent.duration.*

object ConnectRouteBuilder {

def forService[F[_] : Async: Compression](service: ServerServiceDefinition): ConnectRouteBuilder[F] =
def forService[F[_] : Async](service: ServerServiceDefinition): ConnectRouteBuilder[F] =
ConnectRouteBuilder(Seq(service))

def forServices[F[_] : Async: Compression](service: ServerServiceDefinition, other: ServerServiceDefinition*): ConnectRouteBuilder[F] =
def forServices[F[_] : Async](service: ServerServiceDefinition, other: ServerServiceDefinition*): ConnectRouteBuilder[F] =
ConnectRouteBuilder(service +: other)

def forServices[F[_] : Async: Compression](services: Seq[ServerServiceDefinition]): ConnectRouteBuilder[F] =
def forServices[F[_] : Async](services: Seq[ServerServiceDefinition]): ConnectRouteBuilder[F] =
ConnectRouteBuilder(services)

}

case class ConnectRouteBuilder[F[_] : Async: Compression] private(
case class ConnectRouteBuilder[F[_] : Async] private(
services: Seq[ServerServiceDefinition],
jsonPrinterConfigurator: Endo[Printer] = identity,
serverBuilderConfigurator: Endo[ServerBuilder[_]] = identity,
Expand Down Expand Up @@ -56,11 +55,12 @@ case class ConnectRouteBuilder[F[_] : Async: Compression] private(
val httpDsl = Http4sDsl[F]
import httpDsl.*

val compressor = Compressor[F]
val jsonPrinter = jsonPrinterConfigurator(JsonFormat.printer)

val codecRegistry = MessageCodecRegistry[F](
JsonMessageCodec[F](jsonPrinter),
ProtoMessageCodec[F],
JsonMessageCodec[F](compressor, jsonPrinter),
ProtoMessageCodec[F](compressor)
)

val methodRegistry = MethodRegistry(services)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ trait MessageCodec[F[_]] {

}

class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends MessageCodec[F] {
class JsonMessageCodec[F[_] : Sync](compressor: Compressor[F], printer: Printer) extends MessageCodec[F] {

private val logger: Logger = LoggerFactory.getLogger(getClass)

Expand All @@ -47,7 +47,7 @@ class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends Mess
case str: String =>
Sync[F].delay(URLDecoder.decode(str, charset))
case stream: Stream[F, Byte] =>
decompressed(entity.encoding, stream)
compressor.decompressed(entity.encoding, stream)
.through(decodeWithCharset(charset))
.compile.string
}
Expand Down Expand Up @@ -82,7 +82,7 @@ class JsonMessageCodec[F[_] : Sync : Compression](printer: Printer) extends Mess

}

class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {
class ProtoMessageCodec[F[_] : Async](compressor: Compressor[F]) extends MessageCodec[F] {

private val logger: Logger = LoggerFactory.getLogger(getClass)

Expand All @@ -96,7 +96,7 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {
Async[F].delay(base64dec.decode(str.getBytes(entity.charset.nioCharset)))
.flatMap(arr => Async[F].delay(cmp.parseFrom(arr)))
case stream: Stream[F, Byte] =>
toInputStreamResource(decompressed(entity.encoding, stream))
toInputStreamResource(compressor.decompressed(entity.encoding, stream))
.use(is => Async[F].delay(cmp.parseFrom(is)))
}

Expand Down Expand Up @@ -129,11 +129,16 @@ class ProtoMessageCodec[F[_] : Async : Compression] extends MessageCodec[F] {

}

def decompressed[F[_] : Compression](encoding: Option[ContentCoding], body: Stream[F, Byte]): Stream[F, Byte] = {
body.through(encoding match {
case Some(ContentCoding.gzip) =>
Compression[F].gunzip().andThen(_.flatMap(_.content))
case _ =>
identity
})
}
class Compressor[F[_]: Sync] {

given Compression[F] = Compression.forSync[F]

def decompressed(encoding: Option[ContentCoding], body: Stream[F, Byte]): Stream[F, Byte] =
body.through(encoding match {
case Some(ContentCoding.gzip) =>
Compression[F].gunzip().andThen(_.flatMap(_.content))
case _ =>
identity
})

}

0 comments on commit b5dd950

Please sign in to comment.