Skip to content

Commit

Permalink
Support passing response headers when backend returns an error (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-vovk authored Dec 6, 2024
1 parent 29dab9a commit a2a687e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 16 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ Diagnostic data from the server itself is written to the log file `out/out.log`.

### Connect protocol conformance tests status

Current status: __77/79__ tests pass.
✅ JSON codec conformance status: __79/79__ tests pass.

Known issues:

* Response headers are ignored in case of an error from the server
~~* Response headers are ignored in case of an error from the server~~

## Future improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai
requests = requests
)

val trailers = mkMetadata(Seq.concat(
responseDefinition.responseHeaders,
responseDefinition.responseTrailers.map(h => h.copy(name = s"trailer-${h.name}")),
))

val responseData = responseDefinition.response match {
case UnaryResponseDefinition.Response.ResponseData(bs) =>
bs.some
Expand All @@ -72,22 +77,22 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai
TextFormat.printToSingleLineUnicodeString(requestInfo.toProtoErrorDetailsAny)
)

throw new StatusRuntimeException(status)
throw new StatusRuntimeException(status, trailers)
}

val trailers = mkMetadata(Seq.concat(
responseDefinition.responseHeaders,
responseDefinition.responseTrailers.map(h => h.copy(name = s"trailer-${h.name}")),
))
val sleep = Duration(responseDefinition.responseDelayMs, TimeUnit.MILLISECONDS)

Async[F].sleep(Duration(responseDefinition.responseDelayMs, TimeUnit.MILLISECONDS)) *>
Async[F].pure(UnaryHandlerResponse(
payload = ConformancePayload(
Async[F].delayBy(
UnaryHandlerResponse(
ConformancePayload(
responseData.getOrElse(ByteString.EMPTY),
requestInfo.some
),
trailers = trailers
))
trailers
).pure[F],
sleep
)

}

private def keyof(key: String): Metadata.Key[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.grpc.*
import io.grpc.MethodDescriptor.MethodType
import io.grpc.stub.MetadataUtils
import org.http4s.dsl.Http4sDsl
import org.http4s.{Header, MediaType, MessageFailure, Method, Response}
import org.http4s.{Header, Headers, MediaType, MessageFailure, Method, Response}
import org.ivovk.connect_rpc_scala.Mappings.*
import org.ivovk.connect_rpc_scala.grpc.{GrpcClientCalls, MethodName, MethodRegistry}
import org.ivovk.connect_rpc_scala.http.Headers.`X-Test-Case-Name`
Expand Down Expand Up @@ -150,6 +150,12 @@ class ConnectHandler[F[_] : Async](
case e => e.getMessage
})

val headers = e match {
case e: StatusRuntimeException => e.getTrailers.toHeaders(trailing = !treatTrailersAsHeaders)
case e: StatusException => e.getTrailers.toHeaders(trailing = !treatTrailersAsHeaders)
case _ => Headers.empty
}

val messageWithDetails = rawMessage
.map(
_.split("\n").partition(m => !m.startsWith("type: "))
Expand Down Expand Up @@ -179,7 +185,7 @@ class ConnectHandler[F[_] : Async](
logger.trace(s"<<< Http Status: $httpStatus, Connect Error Code: $connectCode, Message: ${rawMessage.orNull}")
}

Response[F](httpStatus).withEntity(connectrpc.Error(
Response[F](httpStatus, headers = headers).withEntity(connectrpc.Error(
code = connectCode,
message = message,
details = details
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ object GrpcClientCalls {
}

/**
* [[CallbackObserver]] either executes [[onNext]] -> [[onCompleted]] during the happy path or just [[onError]] in case of
* an error.
* [[StreamObserverToCallListenerAdapter]] either executes [[onNext]] -> [[onCompleted]] during the happy path
* or just [[onError]] in case of an error.
*/
private class CallbackObserver[F[_] : Async, Resp](cb: Either[Throwable, Resp] => Unit) extends StreamObserver[Resp] {
private var value: Option[Either[Throwable, Resp]] = None
Expand Down

0 comments on commit a2a687e

Please sign in to comment.