From 0b9aff6e3e27c09d4297ac7452fc738b5aceae00 Mon Sep 17 00:00:00 2001 From: Gregory-Berkman-Imprivata Date: Tue, 22 Aug 2023 16:14:18 -0400 Subject: [PATCH 1/4] Add StatusException trailers to response metadata --- .../zio_grpc/server/ListenerDriver.scala | 3 +- .../scalapb/zio_grpc/TestServiceImpl.scala | 78 +++++++++++-------- .../scalapb/zio_grpc/CommonServiceSpec.scala | 2 +- .../scala/scalapb/zio_grpc/TestUtils.scala | 9 +++ 4 files changed, 57 insertions(+), 35 deletions(-) diff --git a/core/src/main/scalajvm/scalapb/zio_grpc/server/ListenerDriver.scala b/core/src/main/scalajvm/scalapb/zio_grpc/server/ListenerDriver.scala index 09555a96d..85e3bb2cc 100644 --- a/core/src/main/scalajvm/scalapb/zio_grpc/server/ListenerDriver.scala +++ b/core/src/main/scalajvm/scalapb/zio_grpc/server/ListenerDriver.scala @@ -29,7 +29,8 @@ object ListenerDriver { completed.await *> call.sendHeaders(new Metadata) *> request.await flatMap writeResponse - ).onExit(ex => call.close(ListenerDriver.exitToStatus(ex), requestContext.responseMetadata.metadata).ignore) + ).tapError(statusException => requestContext.responseMetadata.wrap(_.merge(statusException.getTrailers)).ignore) + .onExit(ex => call.close(ListenerDriver.exitToStatus(ex), requestContext.responseMetadata.metadata).ignore) .ignore // Why forkDaemon? we need the driver to keep runnning in the background after we return a listener // back to grpc-java. If it was just fork, the call to unsafeRun would not return control, so grpc-java diff --git a/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala b/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala index 3f6e85fc0..eabe51286 100644 --- a/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala +++ b/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala @@ -3,7 +3,8 @@ package scalapb.zio_grpc import scalapb.zio_grpc.testservice.Request import zio.{Clock, Console, Exit, Promise, ZIO, ZLayer} import scalapb.zio_grpc.testservice.Response -import io.grpc.{Status, StatusException} +import io.grpc.Metadata.BinaryMarshaller +import io.grpc.{Metadata, Status, StatusException} import scalapb.zio_grpc.testservice.Request.Scenario import zio.stream.{Stream, ZStream} import zio.ZEnvironment @@ -17,11 +18,11 @@ package object server { object TestServiceImpl { class Service( - requestReceived: zio.Promise[Nothing, Unit], - delayReceived: zio.Promise[Nothing, Unit], - exit: zio.Promise[Nothing, Exit[StatusException, Response]] - )(clock: Clock, console: Console) - extends testservice.ZioTestservice.TestService { + requestReceived: zio.Promise[Nothing, Unit], + delayReceived: zio.Promise[Nothing, Unit], + exit: zio.Promise[Nothing, Exit[StatusException, Response]] + )(clock: Clock, console: Console) + extends testservice.ZioTestservice.TestService { def unary(request: Request): ZIO[Any, StatusException, Response] = (requestReceived.succeed(()) *> (request.scenario match { case Scenario.OK => @@ -29,18 +30,29 @@ package object server { Response(out = "Res" + request.in.toString) ) case Scenario.ERROR_NOW => - ZIO.fail(Status.INTERNAL.withDescription("FOO!").asException()) - case Scenario.DELAY => ZIO.never - case Scenario.DIE => ZIO.die(new RuntimeException("FOO")) - case _ => ZIO.fail(Status.UNKNOWN.asException()) + ZIO.succeed { + val metadataKey = Metadata.Key.of("foo-bin", new BinaryMarshaller[String] { + override def toBytes(value: String): Array[Byte] = value.getBytes + + override def parseBytes(serialized: Array[Byte]): String = new String(serialized) + }) + val metadata = new Metadata() + metadata.put(metadataKey, "bar") + metadata + }.flatMap { metadata => + ZIO.fail(Status.INTERNAL.withDescription("FOO!").asException(metadata)) + } + case Scenario.DELAY => ZIO.never + case Scenario.DIE => ZIO.die(new RuntimeException("FOO")) + case _ => ZIO.fail(Status.UNKNOWN.asException()) })).onExit(exit.succeed(_)) def unaryTypeMapped(request: Request): ZIO[Any, StatusException, WrappedString] = unary(request).map(r => WrappedString(r.out)) def serverStreaming( - request: Request - ): ZStream[Any, StatusException, Response] = + request: Request + ): ZStream[Any, StatusException, Response] = ZStream .acquireReleaseExitWith(requestReceived.succeed(())) { (_, ex) => ex.foldExit( @@ -53,22 +65,22 @@ package object server { } .flatMap { _ => request.scenario match { - case Scenario.OK => + case Scenario.OK => ZStream(Response(out = "X1"), Response(out = "X2")) - case Scenario.ERROR_NOW => + case Scenario.ERROR_NOW => ZStream.fail(Status.INTERNAL.withDescription("FOO!").asException()) case Scenario.ERROR_AFTER => ZStream(Response(out = "X1"), Response(out = "X2")) ++ ZStream .fail( Status.INTERNAL.withDescription("FOO!").asException() ) - case Scenario.DELAY => + case Scenario.DELAY => ZStream( Response(out = "X1"), Response(out = "X2") ) ++ ZStream.never - case Scenario.DIE => ZStream.die(new RuntimeException("FOO")) - case _ => ZStream.fail(Status.UNKNOWN.asException()) + case Scenario.DIE => ZStream.die(new RuntimeException("FOO")) + case _ => ZStream.fail(Status.UNKNOWN.asException()) } } @@ -76,37 +88,37 @@ package object server { serverStreaming(request).map(r => WrappedString(r.out)) def clientStreaming( - request: Stream[StatusException, Request] - ): ZIO[Any, StatusException, Response] = + request: Stream[StatusException, Request] + ): ZIO[Any, StatusException, Response] = requestReceived.succeed(()) *> request .runFoldZIO(0)((state, req) => req.scenario match { - case Scenario.OK => ZIO.succeed(state + req.in) - case Scenario.DELAY => delayReceived.succeed(()) *> ZIO.never - case Scenario.DIE => ZIO.die(new RuntimeException("foo")) + case Scenario.OK => ZIO.succeed(state + req.in) + case Scenario.DELAY => delayReceived.succeed(()) *> ZIO.never + case Scenario.DIE => ZIO.die(new RuntimeException("foo")) case Scenario.ERROR_NOW => ZIO.fail((Status.INTERNAL.withDescription("InternalError").asException())) - case _: Scenario => ZIO.fail(Status.UNKNOWN.asException()) + case _: Scenario => ZIO.fail(Status.UNKNOWN.asException()) } ) .map(r => Response(r.toString)) .onExit(exit.succeed(_)) def bidiStreaming( - request: Stream[StatusException, Request] - ): Stream[StatusException, Response] = + request: Stream[StatusException, Request] + ): Stream[StatusException, Response] = (ZStream.fromZIO(requestReceived.succeed(())).drain ++ (request.flatMap { r => r.scenario match { - case Scenario.OK => + case Scenario.OK => ZStream(Response(r.in.toString)) .repeat(Schedule.recurs(r.in - 1)) - case Scenario.DELAY => ZStream.never - case Scenario.DIE => ZStream.die(new RuntimeException("FOO")) + case Scenario.DELAY => ZStream.never + case Scenario.DIE => ZStream.die(new RuntimeException("FOO")) case Scenario.ERROR_NOW => ZStream.fail(Status.INTERNAL.withDescription("Intentional error").asException()) - case _ => + case _ => ZStream.fail( Status.INVALID_ARGUMENT.withDescription(s"Got request: ${r.toProtoString}").asException() ) @@ -123,9 +135,9 @@ package object server { } def make( - clock: Clock, - console: Console - ): zio.IO[Nothing, TestServiceImpl.Service] = + clock: Clock, + console: Console + ): zio.IO[Nothing, TestServiceImpl.Service] = for { p1 <- Promise.make[Nothing, Unit] p2 <- Promise.make[Nothing, Unit] @@ -134,7 +146,7 @@ package object server { def makeFromEnv: ZIO[Any, Nothing, Service] = for { - clock <- ZIO.clock + clock <- ZIO.clock console <- ZIO.console service <- make(clock, console) } yield service diff --git a/e2e/src/test/scala/scalapb/zio_grpc/CommonServiceSpec.scala b/e2e/src/test/scala/scalapb/zio_grpc/CommonServiceSpec.scala index 4aceaccd8..7e4f3c6b4 100644 --- a/e2e/src/test/scala/scalapb/zio_grpc/CommonServiceSpec.scala +++ b/e2e/src/test/scala/scalapb/zio_grpc/CommonServiceSpec.scala @@ -29,7 +29,7 @@ trait CommonTestServiceSpec { .unary(Request(Request.Scenario.ERROR_NOW, in = 12)) .exit )( - fails(hasStatusCode(Status.INTERNAL)) + fails(hasStatusCode(Status.INTERNAL) && hasMetadataKey("foo-bin")) ) }, test("returns response on failures") { diff --git a/e2e/src/test/scala/scalapb/zio_grpc/TestUtils.scala b/e2e/src/test/scala/scalapb/zio_grpc/TestUtils.scala index 92a6a429f..ab5fe0678 100644 --- a/e2e/src/test/scala/scalapb/zio_grpc/TestUtils.scala +++ b/e2e/src/test/scala/scalapb/zio_grpc/TestUtils.scala @@ -1,5 +1,7 @@ package scalapb.zio_grpc +import scala.jdk.CollectionConverters.CollectionHasAsScala + import zio.test.Assertion._ import io.grpc.{Status, StatusException} import io.grpc.Status.Code @@ -18,6 +20,13 @@ object TestUtils { equalTo(d) ) + def hasMetadataKey(key: String) = + hasField[StatusException, Iterable[String]]( + "metadataKey", + e => e.getTrailers.keys.asScala, + contains(key) + ) + def collectWithError[R, E, A]( zs: ZStream[R, E, A] ): URIO[R, (List[A], Option[E])] = From dbe8b3ff63c7243c77a08e752f14466d7dd279ae Mon Sep 17 00:00:00 2001 From: Gregory-Berkman-Imprivata Date: Tue, 22 Aug 2023 16:30:07 -0400 Subject: [PATCH 2/4] fixed formatting --- .../scalapb/zio_grpc/TestServiceImpl.scala | 78 ++++++++----------- 1 file changed, 33 insertions(+), 45 deletions(-) diff --git a/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala b/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala index eabe51286..3f6e85fc0 100644 --- a/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala +++ b/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala @@ -3,8 +3,7 @@ package scalapb.zio_grpc import scalapb.zio_grpc.testservice.Request import zio.{Clock, Console, Exit, Promise, ZIO, ZLayer} import scalapb.zio_grpc.testservice.Response -import io.grpc.Metadata.BinaryMarshaller -import io.grpc.{Metadata, Status, StatusException} +import io.grpc.{Status, StatusException} import scalapb.zio_grpc.testservice.Request.Scenario import zio.stream.{Stream, ZStream} import zio.ZEnvironment @@ -18,11 +17,11 @@ package object server { object TestServiceImpl { class Service( - requestReceived: zio.Promise[Nothing, Unit], - delayReceived: zio.Promise[Nothing, Unit], - exit: zio.Promise[Nothing, Exit[StatusException, Response]] - )(clock: Clock, console: Console) - extends testservice.ZioTestservice.TestService { + requestReceived: zio.Promise[Nothing, Unit], + delayReceived: zio.Promise[Nothing, Unit], + exit: zio.Promise[Nothing, Exit[StatusException, Response]] + )(clock: Clock, console: Console) + extends testservice.ZioTestservice.TestService { def unary(request: Request): ZIO[Any, StatusException, Response] = (requestReceived.succeed(()) *> (request.scenario match { case Scenario.OK => @@ -30,29 +29,18 @@ package object server { Response(out = "Res" + request.in.toString) ) case Scenario.ERROR_NOW => - ZIO.succeed { - val metadataKey = Metadata.Key.of("foo-bin", new BinaryMarshaller[String] { - override def toBytes(value: String): Array[Byte] = value.getBytes - - override def parseBytes(serialized: Array[Byte]): String = new String(serialized) - }) - val metadata = new Metadata() - metadata.put(metadataKey, "bar") - metadata - }.flatMap { metadata => - ZIO.fail(Status.INTERNAL.withDescription("FOO!").asException(metadata)) - } - case Scenario.DELAY => ZIO.never - case Scenario.DIE => ZIO.die(new RuntimeException("FOO")) - case _ => ZIO.fail(Status.UNKNOWN.asException()) + ZIO.fail(Status.INTERNAL.withDescription("FOO!").asException()) + case Scenario.DELAY => ZIO.never + case Scenario.DIE => ZIO.die(new RuntimeException("FOO")) + case _ => ZIO.fail(Status.UNKNOWN.asException()) })).onExit(exit.succeed(_)) def unaryTypeMapped(request: Request): ZIO[Any, StatusException, WrappedString] = unary(request).map(r => WrappedString(r.out)) def serverStreaming( - request: Request - ): ZStream[Any, StatusException, Response] = + request: Request + ): ZStream[Any, StatusException, Response] = ZStream .acquireReleaseExitWith(requestReceived.succeed(())) { (_, ex) => ex.foldExit( @@ -65,22 +53,22 @@ package object server { } .flatMap { _ => request.scenario match { - case Scenario.OK => + case Scenario.OK => ZStream(Response(out = "X1"), Response(out = "X2")) - case Scenario.ERROR_NOW => + case Scenario.ERROR_NOW => ZStream.fail(Status.INTERNAL.withDescription("FOO!").asException()) case Scenario.ERROR_AFTER => ZStream(Response(out = "X1"), Response(out = "X2")) ++ ZStream .fail( Status.INTERNAL.withDescription("FOO!").asException() ) - case Scenario.DELAY => + case Scenario.DELAY => ZStream( Response(out = "X1"), Response(out = "X2") ) ++ ZStream.never - case Scenario.DIE => ZStream.die(new RuntimeException("FOO")) - case _ => ZStream.fail(Status.UNKNOWN.asException()) + case Scenario.DIE => ZStream.die(new RuntimeException("FOO")) + case _ => ZStream.fail(Status.UNKNOWN.asException()) } } @@ -88,37 +76,37 @@ package object server { serverStreaming(request).map(r => WrappedString(r.out)) def clientStreaming( - request: Stream[StatusException, Request] - ): ZIO[Any, StatusException, Response] = + request: Stream[StatusException, Request] + ): ZIO[Any, StatusException, Response] = requestReceived.succeed(()) *> request .runFoldZIO(0)((state, req) => req.scenario match { - case Scenario.OK => ZIO.succeed(state + req.in) - case Scenario.DELAY => delayReceived.succeed(()) *> ZIO.never - case Scenario.DIE => ZIO.die(new RuntimeException("foo")) + case Scenario.OK => ZIO.succeed(state + req.in) + case Scenario.DELAY => delayReceived.succeed(()) *> ZIO.never + case Scenario.DIE => ZIO.die(new RuntimeException("foo")) case Scenario.ERROR_NOW => ZIO.fail((Status.INTERNAL.withDescription("InternalError").asException())) - case _: Scenario => ZIO.fail(Status.UNKNOWN.asException()) + case _: Scenario => ZIO.fail(Status.UNKNOWN.asException()) } ) .map(r => Response(r.toString)) .onExit(exit.succeed(_)) def bidiStreaming( - request: Stream[StatusException, Request] - ): Stream[StatusException, Response] = + request: Stream[StatusException, Request] + ): Stream[StatusException, Response] = (ZStream.fromZIO(requestReceived.succeed(())).drain ++ (request.flatMap { r => r.scenario match { - case Scenario.OK => + case Scenario.OK => ZStream(Response(r.in.toString)) .repeat(Schedule.recurs(r.in - 1)) - case Scenario.DELAY => ZStream.never - case Scenario.DIE => ZStream.die(new RuntimeException("FOO")) + case Scenario.DELAY => ZStream.never + case Scenario.DIE => ZStream.die(new RuntimeException("FOO")) case Scenario.ERROR_NOW => ZStream.fail(Status.INTERNAL.withDescription("Intentional error").asException()) - case _ => + case _ => ZStream.fail( Status.INVALID_ARGUMENT.withDescription(s"Got request: ${r.toProtoString}").asException() ) @@ -135,9 +123,9 @@ package object server { } def make( - clock: Clock, - console: Console - ): zio.IO[Nothing, TestServiceImpl.Service] = + clock: Clock, + console: Console + ): zio.IO[Nothing, TestServiceImpl.Service] = for { p1 <- Promise.make[Nothing, Unit] p2 <- Promise.make[Nothing, Unit] @@ -146,7 +134,7 @@ package object server { def makeFromEnv: ZIO[Any, Nothing, Service] = for { - clock <- ZIO.clock + clock <- ZIO.clock console <- ZIO.console service <- make(clock, console) } yield service From f60fd3cc3ff3e2c8f6fbc7f84941e056ec0747e4 Mon Sep 17 00:00:00 2001 From: Gregory-Berkman-Imprivata Date: Tue, 22 Aug 2023 16:54:09 -0400 Subject: [PATCH 3/4] used JavaConverter for test --- .../scalapb/zio_grpc/TestServiceImpl.scala | 16 ++++++++++++++-- .../test/scala/scalapb/zio_grpc/TestUtils.scala | 2 +- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala b/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala index 3f6e85fc0..aedb8b29d 100644 --- a/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala +++ b/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala @@ -3,7 +3,8 @@ package scalapb.zio_grpc import scalapb.zio_grpc.testservice.Request import zio.{Clock, Console, Exit, Promise, ZIO, ZLayer} import scalapb.zio_grpc.testservice.Response -import io.grpc.{Status, StatusException} +import io.grpc.Metadata.BinaryMarshaller +import io.grpc.{Metadata, Status, StatusException} import scalapb.zio_grpc.testservice.Request.Scenario import zio.stream.{Stream, ZStream} import zio.ZEnvironment @@ -29,7 +30,18 @@ package object server { Response(out = "Res" + request.in.toString) ) case Scenario.ERROR_NOW => - ZIO.fail(Status.INTERNAL.withDescription("FOO!").asException()) + ZIO.succeed { + val metadataKey = Metadata.Key.of("foo-bin", new BinaryMarshaller[String] { + override def toBytes(value: String): Array[Byte] = value.getBytes + + override def parseBytes(serialized: Array[Byte]): String = new String(serialized) + }) + val metadata = new Metadata() + metadata.put(metadataKey, "bar") + metadata + }.flatMap { metadata => + ZIO.fail(Status.INTERNAL.withDescription("FOO!").asException(metadata)) + } case Scenario.DELAY => ZIO.never case Scenario.DIE => ZIO.die(new RuntimeException("FOO")) case _ => ZIO.fail(Status.UNKNOWN.asException()) diff --git a/e2e/src/test/scala/scalapb/zio_grpc/TestUtils.scala b/e2e/src/test/scala/scalapb/zio_grpc/TestUtils.scala index ab5fe0678..8b160b248 100644 --- a/e2e/src/test/scala/scalapb/zio_grpc/TestUtils.scala +++ b/e2e/src/test/scala/scalapb/zio_grpc/TestUtils.scala @@ -1,6 +1,6 @@ package scalapb.zio_grpc -import scala.jdk.CollectionConverters.CollectionHasAsScala +import scala.collection.JavaConverters._ import zio.test.Assertion._ import io.grpc.{Status, StatusException} From c293b3459711c62bd61cddbc9feccece3e733a9e Mon Sep 17 00:00:00 2001 From: Gregory-Berkman-Imprivata Date: Tue, 22 Aug 2023 17:04:09 -0400 Subject: [PATCH 4/4] scalafmt --- .../scalapb/zio_grpc/TestServiceImpl.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala b/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala index aedb8b29d..2f662ca34 100644 --- a/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala +++ b/e2e/src/main/scalajvm/scalapb/zio_grpc/TestServiceImpl.scala @@ -30,18 +30,18 @@ package object server { Response(out = "Res" + request.in.toString) ) case Scenario.ERROR_NOW => - ZIO.succeed { - val metadataKey = Metadata.Key.of("foo-bin", new BinaryMarshaller[String] { + val metadataKey = Metadata.Key.of( + "foo-bin", + new BinaryMarshaller[String] { override def toBytes(value: String): Array[Byte] = value.getBytes override def parseBytes(serialized: Array[Byte]): String = new String(serialized) - }) - val metadata = new Metadata() - metadata.put(metadataKey, "bar") - metadata - }.flatMap { metadata => - ZIO.fail(Status.INTERNAL.withDescription("FOO!").asException(metadata)) - } + } + ) + ZIO + .succeed(new Metadata()) + .tap(m => ZIO.succeed(m.put(metadataKey, "bar"))) + .flatMap(metadata => ZIO.fail(Status.INTERNAL.withDescription("FOO!").asException(metadata))) case Scenario.DELAY => ZIO.never case Scenario.DIE => ZIO.die(new RuntimeException("FOO")) case _ => ZIO.fail(Status.UNKNOWN.asException())