Skip to content

Commit

Permalink
Use builder syntax for configuration (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-vovk authored Nov 30, 2024
1 parent 5d72e8b commit 3c27f7d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,17 @@ object Main extends IOApp.Simple {
httpApp <- ConnectRpcHttpRoutes
.create[IO](
Seq(service),
Configuration(
jsonPrinterConfigurer = { p =>
// Registering message types in TypeRegistry is required to pass com.google.protobuf.any.Any
// JSON-serialization conformance tests
Configuration.default
// Registering message types in TypeRegistry is required to pass com.google.protobuf.any.Any
// JSON-serialization conformance tests
.withJsonPrinterConfigurator { p =>
p.withTypeRegistry(
TypeRegistry.default
.addMessage[connectrpc.conformance.v1.UnaryRequest]
.addMessage[connectrpc.conformance.v1.IdempotentUnaryRequest]
.addMessage[connectrpc.conformance.v1.ConformancePayload.RequestInfo]
)
}
)
)
.map(r => r.orNotFound)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.ivovk.connect_rpc_scala

import cats.Endo
import io.grpc.{ManagedChannelBuilder, ServerBuilder}
import scalapb.json4s.Printer

import scala.concurrent.duration.*

object Configuration {
val default: Configuration = Configuration()
}

case class Configuration private(
jsonPrinterConfigurator: Endo[Printer] = identity,
serverBuilderConfigurator: Endo[ServerBuilder[_]] = identity,
channelBuilderConfigurator: Endo[ManagedChannelBuilder[_]] = identity,
waitForShutdown: Duration = 5.seconds,
) {

def withJsonPrinterConfigurator(jsonPrinterConfigurer: Endo[Printer]): Configuration =
copy(jsonPrinterConfigurator = jsonPrinterConfigurer)

def withServerBuilderConfigurator(serverBuilderConfigurer: Endo[ServerBuilder[_]]): Configuration =
copy(serverBuilderConfigurator = serverBuilderConfigurer)

def withChannelBuilderConfigurator(channelBuilderConfigurer: Endo[ManagedChannelBuilder[_]]): Configuration =
copy(channelBuilderConfigurator = channelBuilderConfigurer)

def withWaitForShutdown(waitForShutdown: Duration): Configuration =
copy(waitForShutdown = waitForShutdown)

}

Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package org.ivovk.connect_rpc_scala

import cats.Endo
import cats.data.EitherT
import cats.effect.Async
import cats.effect.kernel.Resource
import cats.effect.{Async, Resource}
import cats.implicits.*
import fs2.compression.Compression
import io.grpc.*
Expand All @@ -17,19 +16,13 @@ import org.ivovk.connect_rpc_scala.http.MessageCodec.given
import org.ivovk.connect_rpc_scala.http.QueryParams.*
import org.slf4j.{Logger, LoggerFactory}
import scalapb.grpc.ClientCalls
import scalapb.json4s.{JsonFormat, Printer}
import scalapb.json4s.JsonFormat
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TextFormat}

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.*
import scala.concurrent.duration.MILLISECONDS
import scala.util.chaining.*

case class Configuration(
jsonPrinterConfigurer: Endo[Printer] = identity,
serverBuilderConfigurer: Endo[ServerBuilder[_]] = identity,
channelBuilderConfigurer: Endo[ManagedChannelBuilder[_]] = identity,
waitForShutdown: Duration = 10.seconds,
)

object ConnectRpcHttpRoutes {

Expand All @@ -39,12 +32,12 @@ object ConnectRpcHttpRoutes {

def create[F[_] : Async](
services: Seq[ServerServiceDefinition],
configuration: Configuration = Configuration()
configuration: Configuration = Configuration.default
): Resource[F, HttpRoutes[F]] = {
val dsl = Http4sDsl[F]
import dsl.*

val jsonPrinter = configuration.jsonPrinterConfigurer(JsonFormat.printer)
val jsonPrinter = configuration.jsonPrinterConfigurator(JsonFormat.printer)

val codecRegistry = MessageCodecRegistry[F](
JsonMessageCodec[F](jsonPrinter),
Expand All @@ -56,8 +49,8 @@ object ConnectRpcHttpRoutes {
for
ipChannel <- InProcessChannelBridge.create(
services,
configuration.serverBuilderConfigurer,
configuration.channelBuilderConfigurer,
configuration.serverBuilderConfigurator,
configuration.channelBuilderConfigurator,
configuration.waitForShutdown,
)
yield
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,28 @@ object InProcessChannelBridge {

def create[F[_] : Sync](
services: Seq[ServerServiceDefinition],
serverBuilderConfigurer: Endo[ServerBuilder[?]] = identity,
channelBuilderConfigurer: Endo[ManagedChannelBuilder[?]] = identity,
serverBuilderConfigurator: Endo[ServerBuilder[?]] = identity,
channelBuilderConfigurator: Endo[ManagedChannelBuilder[?]] = identity,
waitForShutdown: Duration,
): Resource[F, Channel] = {
for
name <- Resource.eval(Sync[F].delay(InProcessServerBuilder.generateName()))
server <- createServer(name, services, waitForShutdown, serverBuilderConfigurer)
channel <- createStub(name, waitForShutdown, channelBuilderConfigurer)
server <- createServer(name, services, waitForShutdown, serverBuilderConfigurator)
channel <- createStub(name, waitForShutdown, channelBuilderConfigurator)
yield channel
}

private def createServer[F[_] : Sync](
name: String,
services: Seq[ServerServiceDefinition],
waitForShutdown: Duration,
serverBuilderConfigurer: Endo[ServerBuilder[?]] = identity,
serverBuilderConfigurator: Endo[ServerBuilder[?]] = identity,
): Resource[F, Server] = {
val acquire = Sync[F].delay {
InProcessServerBuilder.forName(name)
.directExecutor()
.addServices(services.asJava)
.pipe(serverBuilderConfigurer)
.pipe(serverBuilderConfigurator)
.build()
.start()
}
Expand All @@ -49,12 +49,12 @@ object InProcessChannelBridge {
private def createStub[F[_] : Sync](
name: String,
waitForShutdown: Duration,
channelBuilderConfigurer: Endo[ManagedChannelBuilder[?]] = identity,
channelBuilderConfigurator: Endo[ManagedChannelBuilder[?]] = identity,
): Resource[F, ManagedChannel] = {
val acquire = Sync[F].delay {
InProcessChannelBuilder.forName(name)
.directExecutor()
.pipe(channelBuilderConfigurer)
.pipe(channelBuilderConfigurator)
.build()
}
val release = (c: ManagedChannel) =>
Expand Down

0 comments on commit 3c27f7d

Please sign in to comment.