diff --git a/README.md b/README.md index 7a387fc..f062c7c 100644 --- a/README.md +++ b/README.md @@ -150,11 +150,11 @@ Diagnostic data from the server itself is written to the log file `out/out.log`. ### Connect protocol conformance tests status -Current status: __77/79__ tests pass. +✅ JSON codec conformance status: __79/79__ tests pass. Known issues: -* Response headers are ignored in case of an error from the server +~~* Response headers are ignored in case of an error from the server~~ ## Future improvements diff --git a/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala b/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala index 22207ea..87d571e 100644 --- a/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala +++ b/conformance/src/main/scala/org/ivovk/connect_rpc_scala/conformance/ConformanceServiceImpl.scala @@ -60,6 +60,11 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai requests = requests ) + val trailers = mkMetadata(Seq.concat( + responseDefinition.responseHeaders, + responseDefinition.responseTrailers.map(h => h.copy(name = s"trailer-${h.name}")), + )) + val responseData = responseDefinition.response match { case UnaryResponseDefinition.Response.ResponseData(bs) => bs.some @@ -72,22 +77,22 @@ class ConformanceServiceImpl[F[_] : Async] extends ConformanceServiceFs2GrpcTrai TextFormat.printToSingleLineUnicodeString(requestInfo.toProtoErrorDetailsAny) ) - throw new StatusRuntimeException(status) + throw new StatusRuntimeException(status, trailers) } - val trailers = mkMetadata(Seq.concat( - responseDefinition.responseHeaders, - responseDefinition.responseTrailers.map(h => h.copy(name = s"trailer-${h.name}")), - )) + val sleep = Duration(responseDefinition.responseDelayMs, TimeUnit.MILLISECONDS) - Async[F].sleep(Duration(responseDefinition.responseDelayMs, TimeUnit.MILLISECONDS)) *> - Async[F].pure(UnaryHandlerResponse( - payload = ConformancePayload( + Async[F].delayBy( + UnaryHandlerResponse( + ConformancePayload( responseData.getOrElse(ByteString.EMPTY), requestInfo.some ), - trailers = trailers - )) + trailers + ).pure[F], + sleep + ) + } private def keyof(key: String): Metadata.Key[String] = diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala index 2c79181..57edb4a 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/ConnectHandler.scala @@ -7,7 +7,7 @@ import io.grpc.* import io.grpc.MethodDescriptor.MethodType import io.grpc.stub.MetadataUtils import org.http4s.dsl.Http4sDsl -import org.http4s.{Header, MediaType, MessageFailure, Method, Response} +import org.http4s.{Header, Headers, MediaType, MessageFailure, Method, Response} import org.ivovk.connect_rpc_scala.Mappings.* import org.ivovk.connect_rpc_scala.grpc.{GrpcClientCalls, MethodName, MethodRegistry} import org.ivovk.connect_rpc_scala.http.Headers.`X-Test-Case-Name` @@ -150,6 +150,12 @@ class ConnectHandler[F[_] : Async]( case e => e.getMessage }) + val headers = e match { + case e: StatusRuntimeException => e.getTrailers.toHeaders(trailing = !treatTrailersAsHeaders) + case e: StatusException => e.getTrailers.toHeaders(trailing = !treatTrailersAsHeaders) + case _ => Headers.empty + } + val messageWithDetails = rawMessage .map( _.split("\n").partition(m => !m.startsWith("type: ")) @@ -179,7 +185,7 @@ class ConnectHandler[F[_] : Async]( logger.trace(s"<<< Http Status: $httpStatus, Connect Error Code: $connectCode, Message: ${rawMessage.orNull}") } - Response[F](httpStatus).withEntity(connectrpc.Error( + Response[F](httpStatus, headers = headers).withEntity(connectrpc.Error( code = connectCode, message = message, details = details diff --git a/core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/GrpcClientCalls.scala b/core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/GrpcClientCalls.scala index eb27679..1bee183 100644 --- a/core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/GrpcClientCalls.scala +++ b/core/src/main/scala/org/ivovk/connect_rpc_scala/grpc/GrpcClientCalls.scala @@ -59,8 +59,8 @@ object GrpcClientCalls { } /** - * [[CallbackObserver]] either executes [[onNext]] -> [[onCompleted]] during the happy path or just [[onError]] in case of - * an error. + * [[StreamObserverToCallListenerAdapter]] either executes [[onNext]] -> [[onCompleted]] during the happy path + * or just [[onError]] in case of an error. */ private class CallbackObserver[F[_] : Async, Resp](cb: Either[Throwable, Resp] => Unit) extends StreamObserver[Resp] { private var value: Option[Either[Throwable, Resp]] = None