Skip to content

Commit

Permalink
Pass error details in headers instead of an error message (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-vovk authored Dec 6, 2024
1 parent a2a687e commit 6db2aed
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import cats.implicits.*
import com.google.protobuf.ByteString
import connectrpc.conformance.v1.*
import io.grpc.{Metadata, Status, StatusRuntimeException}
import scalapb.TextFormat
import org.ivovk.connect_rpc_scala.syntax.all.*

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
Expand All @@ -15,8 +15,6 @@ case class UnaryHandlerResponse(payload: ConformancePayload, trailers: Metadata)

class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrailers[F, Metadata] {

import org.ivovk.connect_rpc_scala.Mappings.*

override def unary(
request: UnaryRequest,
ctx: Metadata
Expand Down Expand Up @@ -71,13 +69,9 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai
case UnaryResponseDefinition.Response.Empty =>
none
case UnaryResponseDefinition.Response.Error(Error(code, message, _)) =>
val status = Status.fromCodeValue(code.value)
.withDescription(message.orNull)
.augmentDescription(
TextFormat.printToSingleLineUnicodeString(requestInfo.toProtoErrorDetailsAny)
)
val status = Status.fromCodeValue(code.value).withDescription(message.orNull)

throw new StatusRuntimeException(status, trailers)
throw new StatusRuntimeException(status, trailers).withDetails(requestInfo)
}

val sleep = Duration(responseDefinition.responseDelayMs, TimeUnit.MILLISECONDS)
Expand Down
1 change: 0 additions & 1 deletion core/src/main/protobuf/connectrpc/error.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ enum Code {
CODE_UNAUTHENTICATED = 16;
}


// This message is similar to the google.protobuf.Any message.
//
// Separate type was needed to introduce a separate JSON serializer for this message, since Any in error details
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ import io.grpc.*
import io.grpc.MethodDescriptor.MethodType
import io.grpc.stub.MetadataUtils
import org.http4s.dsl.Http4sDsl
import org.http4s.{Header, Headers, MediaType, MessageFailure, Method, Response}
import org.http4s.{Header, MediaType, MessageFailure, Method, Response}
import org.ivovk.connect_rpc_scala.Mappings.*
import org.ivovk.connect_rpc_scala.grpc.{GrpcClientCalls, MethodName, MethodRegistry}
import org.ivovk.connect_rpc_scala.grpc.{GrpcClientCalls, GrpcHeaders, MethodName, MethodRegistry}
import org.ivovk.connect_rpc_scala.http.Headers.`X-Test-Case-Name`
import org.ivovk.connect_rpc_scala.http.codec.MessageCodec.given
import org.ivovk.connect_rpc_scala.http.codec.{MessageCodec, MessageCodecRegistry}
import org.ivovk.connect_rpc_scala.http.{MediaTypes, RequestEntity}
import org.slf4j.{Logger, LoggerFactory}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion, TextFormat}
import scalapb.{GeneratedMessage, GeneratedMessageCompanion}

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*
import scala.util.chaining.*

class ConnectHandler[F[_] : Async](
Expand Down Expand Up @@ -144,45 +145,23 @@ class ConnectHandler[F[_] : Async](
case _ => io.grpc.Status.INTERNAL
}

val rawMessage = Option(e match {
case e: StatusRuntimeException => e.getStatus.getDescription
case e: StatusException => e.getStatus.getDescription
case e => e.getMessage
})

val headers = e match {
case e: StatusRuntimeException => e.getTrailers.toHeaders(trailing = !treatTrailersAsHeaders)
case e: StatusException => e.getTrailers.toHeaders(trailing = !treatTrailersAsHeaders)
case _ => Headers.empty
val (message, metadata) = e match {
case e: StatusRuntimeException => (Option(e.getStatus.getDescription), e.getTrailers)
case e: StatusException => (Option(e.getStatus.getDescription), e.getTrailers)
case e => (Option(e.getMessage), new Metadata())
}

val messageWithDetails = rawMessage
.map(
_.split("\n").partition(m => !m.startsWith("type: "))
)
.map((messageParts, additionalDetails) =>
val details = additionalDetails
.map(TextFormat.fromAscii(connectrpc.ErrorDetailsAny, _) match {
case Right(details) => details
case Left(e) =>
logger.warn(s"Failed to parse additional details", e)

com.google.protobuf.wrappers.StringValue(e.msg).toProtoErrorDetailsAny
})
.toSeq

(messageParts.mkString("\n"), details)
)

val message = messageWithDetails.map(_._1)
val details = messageWithDetails.map(_._2).getOrElse(Seq.empty)

val httpStatus = grpcStatus.toHttpStatus
val connectCode = grpcStatus.toConnectCode

val details = Option(metadata.removeAll(GrpcHeaders.ErrorDetailsKey))
.fold(Seq.empty)(_.asScala.toSeq)

val headers = metadata.toHeaders(trailing = !treatTrailersAsHeaders)

if (logger.isTraceEnabled) {
logger.warn(s"<<< Error processing request", e)
logger.trace(s"<<< Http Status: $httpStatus, Connect Error Code: $connectCode, Message: ${rawMessage.orNull}")
logger.trace(s"<<< Http Status: $httpStatus, Connect Error Code: $connectCode, Message: ${message.orNull}")
}

Response[F](httpStatus, headers = headers).withEntity(connectrpc.Error(
Expand Down
27 changes: 2 additions & 25 deletions core/src/main/scala/org/ivovk/connect_rpc_scala/Mappings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package org.ivovk.connect_rpc_scala

import io.grpc.{Metadata, Status}
import org.http4s.{Header, Headers}
import org.ivovk.connect_rpc_scala.grpc.GrpcHeaders.asciiKey
import org.typelevel.ci.CIString
import scalapb.GeneratedMessage

object Mappings extends HeaderMappings, StatusCodeMappings, ProtoMappings
object Mappings extends HeaderMappings, StatusCodeMappings

trait HeaderMappings {

private inline def asciiKey(name: String): Metadata.Key[String] =
Metadata.Key.of(name, Metadata.ASCII_STRING_MARSHALLER)

extension (headers: Headers) {
def toMetadata: Metadata = {
val metadata = new Metadata()
Expand Down Expand Up @@ -123,23 +120,3 @@ trait StatusCodeMappings {
}

}

trait ProtoMappings {

extension [T <: GeneratedMessage](t: T) {
def toProtoAny: com.google.protobuf.any.Any = {
com.google.protobuf.any.Any(
typeUrl = "type.googleapis.com/" + t.companion.scalaDescriptor.fullName,
value = t.toByteString
)
}

def toProtoErrorDetailsAny: connectrpc.ErrorDetailsAny =
connectrpc.ErrorDetailsAny(
`type` = t.companion.scalaDescriptor.fullName,
value = t.toByteString
)

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ object GrpcClientCalls {
* [[StreamObserverToCallListenerAdapter]] either executes [[onNext]] -> [[onCompleted]] during the happy path
* or just [[onError]] in case of an error.
*/
private class CallbackObserver[F[_] : Async, Resp](cb: Either[Throwable, Resp] => Unit) extends StreamObserver[Resp] {
private class CallbackObserver[Resp](cb: Either[Throwable, Resp] => Unit) extends StreamObserver[Resp] {
private var value: Option[Either[Throwable, Resp]] = None

override def onNext(value: Resp): Unit = {
Expand All @@ -81,7 +81,6 @@ object GrpcClientCalls {
case Some(v) => cb(v)
case None => cb(Left(new IllegalStateException("No value received or call to onCompleted after onError")))
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.ivovk.connect_rpc_scala.grpc

import connectrpc.ErrorDetailsAny
import io.grpc.Metadata

object GrpcHeaders {

val ErrorDetailsKey: Metadata.Key[ErrorDetailsAny] =
Metadata.Key.of("connect-error-details-bin", new Metadata.BinaryMarshaller[ErrorDetailsAny] {
override def toBytes(value: ErrorDetailsAny): Array[Byte] = value.toByteArray

override def parseBytes(serialized: Array[Byte]): ErrorDetailsAny = ErrorDetailsAny.parseFrom(serialized)
})

inline def asciiKey(name: String): Metadata.Key[String] =
Metadata.Key.of(name, Metadata.ASCII_STRING_MARSHALLER)

}
44 changes: 44 additions & 0 deletions core/src/main/scala/org/ivovk/connect_rpc_scala/syntax/all.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.ivovk.connect_rpc_scala.syntax

import io.grpc.{StatusException, StatusRuntimeException}
import org.ivovk.connect_rpc_scala.grpc.GrpcHeaders
import scalapb.GeneratedMessage

object all extends RuntimeExceptionSyntax, ProtoMappingsSyntax

trait RuntimeExceptionSyntax {

extension (e: StatusRuntimeException) {
def withDetails[T <: GeneratedMessage](t: T): StatusRuntimeException = {
e.getTrailers.put(GrpcHeaders.ErrorDetailsKey, connectrpc.ErrorDetailsAny(
`type` = t.companion.scalaDescriptor.fullName,
value = t.toByteString
))
e
}
}

extension (e: StatusException) {
def withDetails[T <: GeneratedMessage](t: T): StatusException = {
e.getTrailers.put(GrpcHeaders.ErrorDetailsKey, connectrpc.ErrorDetailsAny(
`type` = t.companion.scalaDescriptor.fullName,
value = t.toByteString
))
e
}
}

}

trait ProtoMappingsSyntax {

extension [T <: GeneratedMessage](t: T) {
def toProtoAny: com.google.protobuf.any.Any = {
com.google.protobuf.any.Any(
typeUrl = "type.googleapis.com/" + t.companion.scalaDescriptor.fullName,
value = t.toByteString
)
}
}

}

0 comments on commit 6db2aed

Please sign in to comment.