Skip to content

Commit

Permalink
Merge branch 'main' into update/all
Browse files Browse the repository at this point in the history
  • Loading branch information
alejandrohdezma committed Jul 23, 2024
2 parents c61a568 + ad89d29 commit 3045331
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 104 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ThisBuild / scalaVersion := "2.13.14"
ThisBuild / crossScalaVersions := Seq("2.12.19", "2.13.14", "3.3.3")
ThisBuild / organization := "com.permutive"
ThisBuild / versionPolicyIntention := Compatibility.BinaryAndSourceCompatible
ThisBuild / versionPolicyIntention := Compatibility.BinaryCompatible

addCommandAlias("ci-test", "fix --check; versionPolicyCheck; mdoc; publishLocal; +test")
addCommandAlias("ci-docs", "github; mdoc; headerCreateAll")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,34 @@

package fs2.pubsub.grpc

import cats.effect.Temporal

import fs2.pubsub.MessageEncoder
import fs2.pubsub.dsl.client.PubSubClientStep
import fs2.pubsub.dsl.publisher.PubSubPublisherStep
import fs2.pubsub.dsl.subscriber.PubSubSubscriberStep

object GrpcConstructors {

trait Client {}
trait Publisher {

@deprecated("gRPC implementation is not available on Scala 2.12", "1.1.0")
def grpc[F[_]: Temporal, A: MessageEncoder]: PubSubPublisherStep[F, A] = ???

}

trait Subscriber {

@deprecated("gRPC implementation is not available on Scala 2.12", "1.1.0")
def grpc[F[_]: Temporal]: PubSubSubscriberStep[F] = ???

}

trait Client {

trait Subscriber {}
@deprecated("gRPC implementation is not available on Scala 2.12", "1.1.0")
def grpc[F[_]: Temporal]: PubSubClientStep[F] = ???

trait Publisher {}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package fs2.pubsub.grpc

import java.util.Base64

import cats.effect.Temporal
import cats.syntax.all._

Expand Down Expand Up @@ -114,7 +112,7 @@ object GrpcConstructors {
): F[List[MessageId]] = {
val toPubSubMessage = (record: PubSubRecord.Publisher[A]) =>
PubsubMessage(
data = ByteString.copyFromUtf8(Base64.getEncoder().encodeToString(MessageEncoder[A].encode(record.data))),
data = ByteString.copyFrom(MessageEncoder[A].encode(record.data)),
attributes = record.attributes
)

Expand All @@ -140,7 +138,7 @@ object GrpcConstructors {

val toPubSubRecord = (message: ReceivedMessage) =>
PubSubRecord.Subscriber(
message.message.map(m => m.data.toByteArray()).map(Base64.getDecoder().decode),
message.message.map(m => m.data.toByteArray()),
message.message.map(_.attributes).orEmpty,
message.message.map(_.messageId).map(MessageId(_)),
message.message.flatMap(_.publishTime.map(_.asJavaInstant)),
Expand Down
204 changes: 108 additions & 96 deletions modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package fs2.pubsub

import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.util.Properties

import cats.effect.IO
import cats.effect.std.Queue
Expand All @@ -26,6 +28,7 @@ import com.dimafeng.testcontainers.GenericContainer
import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures
import com.permutive.common.types.gcp.http4s._
import fs2.Chunk
import fs2.pubsub.dsl.client.PubSubClientStep
import io.circe.Json
import io.circe.syntax._
import munit.CatsEffectSuite
Expand All @@ -38,66 +41,75 @@ import org.testcontainers.containers.wait.strategy.Wait

class PubSubSuite extends CatsEffectSuite with TestContainersFixtures {

afterProducing(records = 1)
.test("it should send and receive a message, acknowledging as expected") { subscriber =>
val result = subscriber
.evalTap(_.ack)
.map(_.value)
.interruptAfter(2.seconds)
.compile
.toList

val expected = List("ping".some)

assertIO(result, expected)
}
@nowarn("msg=deprecated")
val options =
if (Properties.releaseVersion.orEmpty.startsWith("2.12"))
List(("HTTP", PubSubClient.http[IO]))
else
List(("gRPC", PubSubClient.grpc[IO]), ("HTTP", PubSubClient.http[IO]))

options.foreach { case (clientType, constructor) =>
afterProducing(constructor, records = 1)
.test(s"$clientType - it should send and receive a message, acknowledging as expected") { subscriber =>
val result = subscriber
.evalTap(_.ack)
.map(_.value)
.interruptAfter(2.seconds)
.compile
.toList

val expected = List("ping".some)

assertIO(result, expected)
}

afterProducing(records = 5)
.test("it should preserve chunksize in the underlying stream") { subscriber =>
val result = subscriber.chunks
.evalTap(_.traverse(_.ack))
.interruptAfter(2.seconds)
.map(_.map(_.value))
.compile
.toList
afterProducing(constructor, records = 5)
.test(s"$clientType - it should preserve chunksize in the underlying stream") { subscriber =>
val result = subscriber.chunks
.evalTap(_.traverse(_.ack))
.interruptAfter(2.seconds)
.map(_.map(_.value))
.compile
.toList

assertIO(result, List(Chunk("ping".some, "ping".some, "ping".some, "ping".some, "ping".some)))
}
assertIO(result, List(Chunk("ping".some, "ping".some, "ping".some, "ping".some, "ping".some)))
}

afterProducing(records = 1, withAckDeadlineSeconds = 2)
.test("it should extend the deadline for a message") { subscriber =>
val deadline = AckDeadline.from(10.seconds).toOption.get
afterProducing(constructor, records = 1, withAckDeadlineSeconds = 2)
.test(s"$clientType - it should extend the deadline for a message") { subscriber =>
val deadline = AckDeadline.from(10.seconds).toOption.get

val result = subscriber
.evalTap(_.extendDeadline(deadline))
.evalTap(_ => IO.sleep(3.seconds))
.evalTap(_.ack)
.interruptAfter(5.seconds)
.compile
.count
val result = subscriber
.evalTap(_.extendDeadline(deadline))
.evalTap(_ => IO.sleep(3.seconds))
.evalTap(_.ack)
.interruptAfter(5.seconds)
.compile
.count

assertIO(result, 1L)
}
assertIO(result, 1L)
}

afterProducing(records = 1)
.test("it should nack a message properly") { subscriber =>
val result = subscriber
.evalScan(false) { case (nackedAlready, record) =>
if (nackedAlready) record.ack.as(true) else record.nack.as(true)
}
.void
.interruptAfter(2.seconds)
.compile
.count
afterProducing(constructor, records = 1)
.test(s"$clientType - it should nack a message properly") { subscriber =>
val result = subscriber
.evalScan(false) { case (nackedAlready, record) =>
if (nackedAlready) record.ack.as(true) else record.nack.as(true)
}
.void
.interruptAfter(2.seconds)
.compile
.count

assertIO(result, 3L)
}
assertIO(result, 3L)
}
}

//////////////
// Fixtures //
//////////////

val projects = List.fill(4)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) =>
val projects = List.fill(8)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) =>
s"test-project-${index + 1},$topics"
}

Expand All @@ -109,54 +121,54 @@ class PubSubSuite extends CatsEffectSuite with TestContainersFixtures {
.toResource
)

def afterProducing(records: Int, withAckDeadlineSeconds: Int = 10) = ResourceFunFixture {
IO(projectsFixture().take).flatten.toResource
.product(EmberClientBuilder.default[IO].build)
.evalTap { case (projectId, client) =>
val body = Json.obj(
"subscription" := Json.obj(
"topic" := "example-topic",
"ackDeadlineSeconds" := withAckDeadlineSeconds
),
"updateMask" := "ackDeadlineSeconds"
)

val request =
PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription")

client.expect[Unit](request)
}
.map { case (projectId, client) =>
val pubSubClient = PubSubClient
.http[IO]
.projectId(projectId)
.uri(container.uri)
.httpClient(client)
.noRetry

val publisher = pubSubClient
.publisher[String]
.topic(Topic("example-topic"))

val subscriber = pubSubClient.subscriber
.subscription(Subscription("example-subscription"))
.errorHandler {
case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack
}
.withDefaults
.decodeTo[String]
.subscribe

(publisher, subscriber)
}
.evalTap {
case (publisher, _) if records === 1 => publisher.publishOne("ping")
case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping")))
}
._2F
}
def afterProducing(constructor: PubSubClientStep[IO], records: Int, withAckDeadlineSeconds: Int = 10) =
ResourceFunFixture {
IO(projectsFixture().take).flatten.toResource
.product(EmberClientBuilder.default[IO].withHttp2.build)
.evalTap { case (projectId, client) =>
val body = Json.obj(
"subscription" := Json.obj(
"topic" := "example-topic",
"ackDeadlineSeconds" := withAckDeadlineSeconds
),
"updateMask" := "ackDeadlineSeconds"
)

val request =
PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription")

client.expect[Unit](request)
}
.map { case (projectId, client) =>
val pubSubClient = constructor
.projectId(projectId)
.uri(container.uri)
.httpClient(client)
.noRetry

val publisher = pubSubClient
.publisher[String]
.topic(Topic("example-topic"))

val subscriber = pubSubClient.subscriber
.subscription(Subscription("example-subscription"))
.errorHandler {
case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack
}
.withDefaults
.decodeTo[String]
.subscribe

(publisher, subscriber)
}
.evalTap {
case (publisher, _) if records === 1 => publisher.publishOne("ping")
case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping")))
}
._2F
}

case object container
extends GenericContainer(
Expand Down

0 comments on commit 3045331

Please sign in to comment.