From dfb80df0fbab4f7a01f42d379149859e7d7e2b0d Mon Sep 17 00:00:00 2001 From: Ihor Vovk Date: Sun, 1 Dec 2024 11:02:06 +0100 Subject: [PATCH] Optimize GRPC method lookup; optimizations in protobuf decoding (#36) --- .../connect_rpc_scala/ConnectHandler.scala | 9 ++++---- .../ConnectRouteBuilder.scala | 8 +++---- .../{ => grpc}/InProcessChannelBridge.scala | 2 +- .../connect_rpc_scala/grpc/MethodName.scala | 15 ++++++++++++ .../{ => grpc}/MethodRegistry.scala | 12 +++++----- .../connect_rpc_scala/http/MessageCodec.scala | 23 ++++++++++--------- 6 files changed, 42 insertions(+), 27 deletions(-) rename core/src/main/scala/org/ivovk/connect_rpc_scala/{ => grpc}/InProcessChannelBridge.scala (98%) create mode 100644 core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/MethodName.scala rename core/src/main/scala/org/ivovk/connect_rpc_scala/{ => grpc}/MethodRegistry.scala (77%) diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala index 2366234..798fe90 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala @@ -4,12 +4,13 @@ import cats.Endo import cats.data.EitherT import cats.effect.Async import cats.implicits.* -import io.grpc.{CallOptions, Channel, ClientInterceptors, Metadata, StatusException, StatusRuntimeException} +import io.grpc.* import io.grpc.MethodDescriptor.MethodType import io.grpc.stub.MetadataUtils import org.http4s.dsl.Http4sDsl import org.http4s.{MediaType, Method, Response} import org.ivovk.connect_rpc_scala.Mappings.* +import org.ivovk.connect_rpc_scala.grpc.{MethodName, MethodRegistry} import org.ivovk.connect_rpc_scala.http.Headers.`X-Test-Case-Name` import org.ivovk.connect_rpc_scala.http.MessageCodec.given import org.ivovk.connect_rpc_scala.http.{MediaTypes, MessageCodec, MessageCodecRegistry, RequestEntity} @@ -35,7 +36,7 @@ class ConnectHandler[F[_]: Async]( httpMethod: Method, contentType: Option[MediaType], entity: RequestEntity[F], - grpcMethodName: String, + grpcMethodName: MethodName, ): F[Response[F]] = { val eitherT = for given MessageCodec[F] <- EitherT.fromOptionM( @@ -48,7 +49,7 @@ class ConnectHandler[F[_]: Async]( methodRegistry.get(grpcMethodName).pure[F], NotFound(connectrpc.Error( code = io.grpc.Status.NOT_FOUND.toConnectCode, - message = s"Method not found: $grpcMethodName".some + message = s"Method not found: ${grpcMethodName.fullyQualifiedName}".some )) ) @@ -58,7 +59,7 @@ class ConnectHandler[F[_]: Async]( (), Forbidden(connectrpc.Error( code = io.grpc.Status.PERMISSION_DENIED.toConnectCode, - message = s"Only POST-requests are allowed for method: $grpcMethodName".some + message = s"Only POST-requests are allowed for method: ${grpcMethodName.fullyQualifiedName}".some )) ).leftSemiflatMap(identity) diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRouteBuilder.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRouteBuilder.scala index 44bfd82..24860b3 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRouteBuilder.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectRouteBuilder.scala @@ -6,6 +6,7 @@ import cats.implicits.* import io.grpc.{ManagedChannelBuilder, ServerBuilder, ServerServiceDefinition} import org.http4s.dsl.Http4sDsl import org.http4s.{HttpApp, HttpRoutes, Method} +import org.ivovk.connect_rpc_scala.grpc.* import org.ivovk.connect_rpc_scala.http.* import org.ivovk.connect_rpc_scala.http.QueryParams.* import scalapb.json4s.{JsonFormat, Printer} @@ -89,12 +90,12 @@ case class ConnectRouteBuilder[F[_] : Async] private( HttpRoutes.of[F] { case req@Method.GET -> Root / serviceName / methodName :? EncodingQP(contentType) +& MessageQP(message) => - val grpcMethod = grpcMethodName(serviceName, methodName) + val grpcMethod = MethodName(serviceName, methodName) val entity = RequestEntity[F](message, req.headers) handler.handle(Method.GET, contentType.some, entity, grpcMethod) case req@Method.POST -> Root / serviceName / methodName => - val grpcMethod = grpcMethodName(serviceName, methodName) + val grpcMethod = MethodName(serviceName, methodName) val contentType = req.contentType.map(_.mediaType) val entity = RequestEntity[F](req) @@ -105,7 +106,4 @@ case class ConnectRouteBuilder[F[_] : Async] private( def build: Resource[F, HttpApp[F]] = buildRoutes.map(_.orNotFound) - private inline def grpcMethodName(service: String, method: String): String = - service + "/" + method - } diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/InProcessChannelBridge.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/InProcessChannelBridge.scala similarity index 98% rename from core/src/main/scala/org/ivovk/connect_rpc_scala/InProcessChannelBridge.scala rename to core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/InProcessChannelBridge.scala index f453653..9ffa3a6 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/InProcessChannelBridge.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/InProcessChannelBridge.scala @@ -1,4 +1,4 @@ -package org.ivovk.connect_rpc_scala +package org.ivovk.connect_rpc_scala.grpc import cats.Endo import cats.effect.{Resource, Sync} diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/MethodName.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/MethodName.scala new file mode 100644 index 0000000..61b37f6 --- /dev/null +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/MethodName.scala @@ -0,0 +1,15 @@ +package org.ivovk.connect_rpc_scala.grpc + +import io.grpc.MethodDescriptor + +type Service = String +type Method = String + +object MethodName { + def apply(descriptor: MethodDescriptor[_, _]): MethodName = + MethodName(descriptor.getServiceName, descriptor.getBareMethodName) +} + +case class MethodName(service: Service, method: Method) { + def fullyQualifiedName: String = s"$service/$method" +} diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/MethodRegistry.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/MethodRegistry.scala similarity index 77% rename from core/src/main/scala/org/ivovk/connect_rpc_scala/MethodRegistry.scala rename to core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/MethodRegistry.scala index 06d1b29..546bfd8 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/MethodRegistry.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/MethodRegistry.scala @@ -1,4 +1,4 @@ -package org.ivovk.connect_rpc_scala +package org.ivovk.connect_rpc_scala.grpc import io.grpc.{MethodDescriptor, ServerMethodDefinition, ServerServiceDefinition} import scalapb.{GeneratedMessage, GeneratedMessageCompanion} @@ -35,18 +35,18 @@ object MethodRegistry { descriptor = methodDescriptor, ) - methodDescriptor.getFullMethodName -> methodEntry + MethodName(methodDescriptor) -> methodEntry } - .toMap + .groupMapReduce((mn, _) => mn.service)((mn, m) => Map(mn.method -> m))(_ ++ _) new MethodRegistry(entries) } } -class MethodRegistry private(entries: Map[String, MethodRegistry.Entry]) { +class MethodRegistry private(entries: Map[Service, Map[Method, MethodRegistry.Entry]]) { - def get(fullMethodName: String): Option[MethodRegistry.Entry] = - entries.get(fullMethodName) + def get(methodName: MethodName): Option[MethodRegistry.Entry] = + entries.getOrElse(methodName.service, Map.empty).get(methodName.method) } diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala index 04950a2..1b67ec4 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/http/MessageCodec.scala @@ -16,6 +16,7 @@ import scalapb.{GeneratedMessage as Message, GeneratedMessageCompanion as Compan import java.net.URLDecoder import java.util.Base64 +import scala.util.chaining.* object MessageCodec { given [F[_] : Applicative, A <: Message](using codec: MessageCodec[F], cmp: Companion[A]): EntityDecoder[F, A] = @@ -93,22 +94,22 @@ class ProtoMessageCodec[F[_] : Async](compressor: Compressor[F]) extends Message override def decode[A <: Message](entity: RequestEntity[F])(using cmp: Companion[A]): DecodeResult[F, A] = { val msg = entity.message match { case str: String => - Async[F].delay(base64dec.decode(str.getBytes(entity.charset.nioCharset))) - .flatMap(arr => Async[F].delay(cmp.parseFrom(arr))) + Async[F].delay(cmp.parseFrom(base64dec.decode(str.getBytes(entity.charset.nioCharset)))) case stream: Stream[F, Byte] => toInputStreamResource(compressor.decompressed(entity.encoding, stream)) .use(is => Async[F].delay(cmp.parseFrom(is))) } msg - .map { message => - if (logger.isTraceEnabled) { - logger.trace(s">>> Headers: ${entity.headers.redactSensitive()}") - logger.trace(s">>> Proto: ${message.toProtoString}") - } - - message - } + .pipe( + if logger.isTraceEnabled then + _.map { msg => + logger.trace(s">>> Headers: ${entity.headers.redactSensitive()}") + logger.trace(s">>> Proto: ${msg.toProtoString}") + msg + } + else identity + ) .attemptT .leftMap(e => InvalidMessageBodyFailure(e.getMessage, e.some)) } @@ -129,7 +130,7 @@ class ProtoMessageCodec[F[_] : Async](compressor: Compressor[F]) extends Message } -class Compressor[F[_]: Sync] { +class Compressor[F[_] : Sync] { given Compression[F] = Compression.forSync[F]