Skip to content

Commit

Permalink
Support external configuration for GRPC client and server builders (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-vovk authored Nov 30, 2024
1 parent 4da983f commit 6f36826
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object Main extends IOApp.Simple {
.create[IO](
Seq(service),
Configuration(
jsonPrinterConfiguration = { p =>
jsonPrinterConfigurer = { p =>
// Registering message types in TypeRegistry is required to pass com.google.protobuf.any.Any
// JSON-serialization conformance tests
p.withTypeRegistry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import scala.concurrent.duration.*
import scala.util.chaining.*

case class Configuration(
jsonPrinterConfiguration: Endo[Printer] = identity,
jsonPrinterConfigurer: Endo[Printer] = identity,
serverBuilderConfigurer: Endo[ServerBuilder[_]] = identity,
channelBuilderConfigurer: Endo[ManagedChannelBuilder[_]] = identity,
waitForShutdown: Duration = 10.seconds,
)

Expand All @@ -42,7 +44,7 @@ object ConnectRpcHttpRoutes {
val dsl = Http4sDsl[F]
import dsl.*

val jsonPrinter = configuration.jsonPrinterConfiguration(JsonFormat.printer)
val jsonPrinter = configuration.jsonPrinterConfigurer(JsonFormat.printer)

val codecRegistry = MessageCodecRegistry[F](
JsonMessageCodec[F](jsonPrinter),
Expand All @@ -52,7 +54,12 @@ object ConnectRpcHttpRoutes {
val methodRegistry = MethodRegistry(services)

for
ipChannel <- InProcessChannelBridge.create(services, configuration.waitForShutdown)
ipChannel <- InProcessChannelBridge.create(
services,
configuration.serverBuilderConfigurer,
configuration.channelBuilderConfigurer,
configuration.waitForShutdown,
)
yield
def handle(
httpMethod: Method,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,42 @@
package org.ivovk.connect_rpc_scala

import cats.Endo
import cats.effect.{Resource, Sync}
import cats.implicits.*
import io.grpc.*
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import io.grpc.{Channel, ManagedChannel, Server, ServerServiceDefinition}

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters.*
import scala.util.chaining.*

object InProcessChannelBridge {

def create[F[_] : Sync](
services: Seq[ServerServiceDefinition],
serverBuilderConfigurer: Endo[ServerBuilder[?]] = identity,
channelBuilderConfigurer: Endo[ManagedChannelBuilder[?]] = identity,
waitForShutdown: Duration,
): Resource[F, Channel] = {
for
name <- Resource.eval(Sync[F].delay(InProcessServerBuilder.generateName()))
server <- createServer(name, services, waitForShutdown)
channel <- createStub(name, waitForShutdown)
server <- createServer(name, services, waitForShutdown, serverBuilderConfigurer)
channel <- createStub(name, waitForShutdown, channelBuilderConfigurer)
yield channel
}

private def createServer[F[_] : Sync](
name: String,
services: Seq[ServerServiceDefinition],
waitForShutdown: Duration,
serverBuilderConfigurer: Endo[ServerBuilder[?]] = identity,
): Resource[F, Server] = {
val acquire = Sync[F].delay {
InProcessServerBuilder.forName(name)
.directExecutor()
.addServices(services.asJava)
.pipe(serverBuilderConfigurer)
.build()
.start()
}
Expand All @@ -43,10 +49,12 @@ object InProcessChannelBridge {
private def createStub[F[_] : Sync](
name: String,
waitForShutdown: Duration,
channelBuilderConfigurer: Endo[ManagedChannelBuilder[?]] = identity,
): Resource[F, ManagedChannel] = {
val acquire = Sync[F].delay {
InProcessChannelBuilder.forName(name)
.directExecutor()
.pipe(channelBuilderConfigurer)
.build()
}
val release = (c: ManagedChannel) =>
Expand Down

0 comments on commit 6f36826

Please sign in to comment.