From a21379cddc429347ff31e48960b9cdf985b36f49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Herna=CC=81ndez?= Date: Tue, 25 Jun 2024 13:33:33 +0200 Subject: [PATCH 1/4] gRPC should not use base-64 encoding/decoding --- .../main/scala-2.13+/fs2/pubsub/grpc/GrpcConstructors.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/modules/fs2-pubsub/src/main/scala-2.13+/fs2/pubsub/grpc/GrpcConstructors.scala b/modules/fs2-pubsub/src/main/scala-2.13+/fs2/pubsub/grpc/GrpcConstructors.scala index 8fe42c9f..74361828 100644 --- a/modules/fs2-pubsub/src/main/scala-2.13+/fs2/pubsub/grpc/GrpcConstructors.scala +++ b/modules/fs2-pubsub/src/main/scala-2.13+/fs2/pubsub/grpc/GrpcConstructors.scala @@ -16,8 +16,6 @@ package fs2.pubsub.grpc -import java.util.Base64 - import cats.effect.Temporal import cats.syntax.all._ @@ -114,7 +112,7 @@ object GrpcConstructors { ): F[List[MessageId]] = { val toPubSubMessage = (record: PubSubRecord.Publisher[A]) => PubsubMessage( - data = ByteString.copyFromUtf8(Base64.getEncoder().encodeToString(MessageEncoder[A].encode(record.data))), + data = ByteString.copyFrom(MessageEncoder[A].encode(record.data)), attributes = record.attributes ) @@ -140,7 +138,7 @@ object GrpcConstructors { val toPubSubRecord = (message: ReceivedMessage) => PubSubRecord.Subscriber( - message.message.map(m => m.data.toByteArray()).map(Base64.getDecoder().decode), + message.message.map(m => m.data.toByteArray()), message.message.map(_.attributes).orEmpty, message.message.map(_.messageId).map(MessageId(_)), message.message.flatMap(_.publishTime.map(_.asJavaInstant)), From 5f1fb31d60b3aa1739c4429ecc4afe9d8cab722f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Herna=CC=81ndez?= Date: Tue, 25 Jun 2024 18:41:37 +0200 Subject: [PATCH 2/4] Add deprecated-unimplemented versions of gRPC constructors for Scala 2.12 This way people can easily see they're not available, and it makes it easier for us to test both HTTP and gRPC constructors --- .../fs2/pubsub/GrpcConstructors.scala | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/modules/fs2-pubsub/src/main/scala-2.12/fs2/pubsub/GrpcConstructors.scala b/modules/fs2-pubsub/src/main/scala-2.12/fs2/pubsub/GrpcConstructors.scala index 0f74f404..8e02e702 100644 --- a/modules/fs2-pubsub/src/main/scala-2.12/fs2/pubsub/GrpcConstructors.scala +++ b/modules/fs2-pubsub/src/main/scala-2.12/fs2/pubsub/GrpcConstructors.scala @@ -16,12 +16,34 @@ package fs2.pubsub.grpc +import cats.effect.Temporal + +import fs2.pubsub.MessageEncoder +import fs2.pubsub.dsl.client.PubSubClientStep +import fs2.pubsub.dsl.publisher.PubSubPublisherStep +import fs2.pubsub.dsl.subscriber.PubSubSubscriberStep + object GrpcConstructors { - trait Client {} + trait Publisher { + + @deprecated("gRPC implementation is not available on Scala 2.12", "1.1.0") + def grpc[F[_]: Temporal, A: MessageEncoder]: PubSubPublisherStep[F, A] = ??? + + } + + trait Subscriber { + + @deprecated("gRPC implementation is not available on Scala 2.12", "1.1.0") + def grpc[F[_]: Temporal]: PubSubSubscriberStep[F] = ??? + + } + + trait Client { - trait Subscriber {} + @deprecated("gRPC implementation is not available on Scala 2.12", "1.1.0") + def grpc[F[_]: Temporal]: PubSubClientStep[F] = ??? - trait Publisher {} + } } From 47eb027263f0bba3ce9c45aee320157b8c7f24ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Herna=CC=81ndez?= Date: Tue, 25 Jun 2024 13:33:45 +0200 Subject: [PATCH 3/4] Ensure we test both gRPC & HTTP clients --- .../test/scala/fs2/pubsub/PubSubSuite.scala | 204 +++++++++--------- 1 file changed, 108 insertions(+), 96 deletions(-) diff --git a/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala b/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala index 2b0a04a1..fc8f59c4 100644 --- a/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala +++ b/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala @@ -16,7 +16,9 @@ package fs2.pubsub +import scala.annotation.nowarn import scala.concurrent.duration._ +import scala.util.Properties import cats.effect.IO import cats.effect.std.Queue @@ -26,6 +28,7 @@ import com.dimafeng.testcontainers.GenericContainer import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures import com.permutive.common.types.gcp.http4s._ import fs2.Chunk +import fs2.pubsub.dsl.client.PubSubClientStep import io.circe.Json import io.circe.syntax._ import munit.CatsEffectSuite @@ -38,66 +41,75 @@ import org.testcontainers.containers.wait.strategy.Wait class PubSubSuite extends CatsEffectSuite with TestContainersFixtures { - afterProducing(records = 1) - .test("it should send and receive a message, acknowledging as expected") { subscriber => - val result = subscriber - .evalTap(_.ack) - .map(_.value) - .interruptAfter(2.seconds) - .compile - .toList - - val expected = List("ping".some) - - assertIO(result, expected) - } + @nowarn("msg=deprecated") + val options = + if (Properties.releaseVersion.orEmpty.startsWith("2.12")) + List(("HTTP", PubSubClient.http[IO])) + else + List(("gRPC", PubSubClient.grpc[IO]), ("HTTP", PubSubClient.http[IO])) + + options.foreach { case (clientType, constructor) => + afterProducing(constructor, records = 1) + .test(s"$clientType - it should send and receive a message, acknowledging as expected") { subscriber => + val result = subscriber + .evalTap(_.ack) + .map(_.value) + .interruptAfter(2.seconds) + .compile + .toList + + val expected = List("ping".some) + + assertIO(result, expected) + } - afterProducing(records = 5) - .test("it should preserve chunksize in the underlying stream") { subscriber => - val result = subscriber.chunks - .evalTap(_.traverse(_.ack)) - .interruptAfter(2.seconds) - .map(_.map(_.value)) - .compile - .toList + afterProducing(constructor, records = 5) + .test(s"$clientType - it should preserve chunksize in the underlying stream") { subscriber => + val result = subscriber.chunks + .evalTap(_.traverse(_.ack)) + .interruptAfter(2.seconds) + .map(_.map(_.value)) + .compile + .toList - assertIO(result, List(Chunk("ping".some, "ping".some, "ping".some, "ping".some, "ping".some))) - } + assertIO(result, List(Chunk("ping".some, "ping".some, "ping".some, "ping".some, "ping".some))) + } - afterProducing(records = 1, withAckDeadlineSeconds = 2) - .test("it should extend the deadline for a message") { subscriber => - val deadline = AckDeadline.from(10.seconds).toOption.get + afterProducing(constructor, records = 1, withAckDeadlineSeconds = 2) + .test(s"$clientType - it should extend the deadline for a message") { subscriber => + val deadline = AckDeadline.from(10.seconds).toOption.get - val result = subscriber - .evalTap(_.extendDeadline(deadline)) - .evalTap(_ => IO.sleep(3.seconds)) - .evalTap(_.ack) - .interruptAfter(5.seconds) - .compile - .count + val result = subscriber + .evalTap(_.extendDeadline(deadline)) + .evalTap(_ => IO.sleep(3.seconds)) + .evalTap(_.ack) + .interruptAfter(5.seconds) + .compile + .count - assertIO(result, 1L) - } + assertIO(result, 1L) + } - afterProducing(records = 1) - .test("it should nack a message properly") { subscriber => - val result = subscriber - .evalScan(false) { case (nackedAlready, record) => - if (nackedAlready) record.ack.as(true) else record.nack.as(true) - } - .void - .interruptAfter(2.seconds) - .compile - .count + afterProducing(constructor, records = 1) + .test(s"$clientType - it should nack a message properly") { subscriber => + val result = subscriber + .evalScan(false) { case (nackedAlready, record) => + if (nackedAlready) record.ack.as(true) else record.nack.as(true) + } + .void + .interruptAfter(2.seconds) + .compile + .count - assertIO(result, 3L) - } + assertIO(result, 3L) + } + } ////////////// // Fixtures // ////////////// - val projects = List.fill(4)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) => + val projects = List.fill(8)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) => s"test-project-${index + 1},$topics" } @@ -109,54 +121,54 @@ class PubSubSuite extends CatsEffectSuite with TestContainersFixtures { .toResource ) - def afterProducing(records: Int, withAckDeadlineSeconds: Int = 10) = ResourceFunFixture { - IO(projectsFixture().take).flatten.toResource - .product(EmberClientBuilder.default[IO].build) - .evalTap { case (projectId, client) => - val body = Json.obj( - "subscription" := Json.obj( - "topic" := "example-topic", - "ackDeadlineSeconds" := withAckDeadlineSeconds - ), - "updateMask" := "ackDeadlineSeconds" - ) - - val request = - PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription") - - client.expect[Unit](request) - } - .map { case (projectId, client) => - val pubSubClient = PubSubClient - .http[IO] - .projectId(projectId) - .uri(container.uri) - .httpClient(client) - .noRetry - - val publisher = pubSubClient - .publisher[String] - .topic(Topic("example-topic")) - - val subscriber = pubSubClient.subscriber - .subscription(Subscription("example-subscription")) - .errorHandler { - case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t) - case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t) - case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack - } - .withDefaults - .decodeTo[String] - .subscribe - - (publisher, subscriber) - } - .evalTap { - case (publisher, _) if records === 1 => publisher.publishOne("ping") - case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping"))) - } - ._2F - } + def afterProducing(constructor: PubSubClientStep[IO], records: Int, withAckDeadlineSeconds: Int = 10) = + ResourceFunFixture { + IO(projectsFixture().take).flatten.toResource + .product(EmberClientBuilder.default[IO].withHttp2.build) + .evalTap { case (projectId, client) => + val body = Json.obj( + "subscription" := Json.obj( + "topic" := "example-topic", + "ackDeadlineSeconds" := withAckDeadlineSeconds + ), + "updateMask" := "ackDeadlineSeconds" + ) + + val request = + PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription") + + client.expect[Unit](request) + } + .map { case (projectId, client) => + val pubSubClient = constructor + .projectId(projectId) + .uri(container.uri) + .httpClient(client) + .noRetry + + val publisher = pubSubClient + .publisher[String] + .topic(Topic("example-topic")) + + val subscriber = pubSubClient.subscriber + .subscription(Subscription("example-subscription")) + .errorHandler { + case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t) + case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t) + case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack + } + .withDefaults + .decodeTo[String] + .subscribe + + (publisher, subscriber) + } + .evalTap { + case (publisher, _) if records === 1 => publisher.publishOne("ping") + case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping"))) + } + ._2F + } case object container extends GenericContainer( From 9699bd38073bb5bc46e36857ea64aea5da0f3c22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Herna=CC=81ndez?= Date: Tue, 25 Jun 2024 18:41:59 +0200 Subject: [PATCH 4/4] Relax `versionPolicyIntention` --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index dc641b96..e213b0c3 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ ThisBuild / scalaVersion := "2.13.14" ThisBuild / crossScalaVersions := Seq("2.12.19", "2.13.14", "3.3.3") ThisBuild / organization := "com.permutive" -ThisBuild / versionPolicyIntention := Compatibility.BinaryAndSourceCompatible +ThisBuild / versionPolicyIntention := Compatibility.BinaryCompatible addCommandAlias("ci-test", "fix --check; versionPolicyCheck; mdoc; publishLocal; +test") addCommandAlias("ci-docs", "github; mdoc; headerCreateAll")