Skip to content

Commit

Permalink
Ensure we test both gRPC & HTTP clients
Browse files Browse the repository at this point in the history
  • Loading branch information
alejandrohdezma committed Jun 25, 2024
1 parent 5f1fb31 commit 47eb027
Showing 1 changed file with 108 additions and 96 deletions.
204 changes: 108 additions & 96 deletions modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package fs2.pubsub

import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.util.Properties

import cats.effect.IO
import cats.effect.std.Queue
Expand All @@ -26,6 +28,7 @@ import com.dimafeng.testcontainers.GenericContainer
import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures
import com.permutive.common.types.gcp.http4s._
import fs2.Chunk
import fs2.pubsub.dsl.client.PubSubClientStep
import io.circe.Json
import io.circe.syntax._
import munit.CatsEffectSuite
Expand All @@ -38,66 +41,75 @@ import org.testcontainers.containers.wait.strategy.Wait

class PubSubSuite extends CatsEffectSuite with TestContainersFixtures {

afterProducing(records = 1)
.test("it should send and receive a message, acknowledging as expected") { subscriber =>
val result = subscriber
.evalTap(_.ack)
.map(_.value)
.interruptAfter(2.seconds)
.compile
.toList

val expected = List("ping".some)

assertIO(result, expected)
}
@nowarn("msg=deprecated")
val options =
if (Properties.releaseVersion.orEmpty.startsWith("2.12"))
List(("HTTP", PubSubClient.http[IO]))
else
List(("gRPC", PubSubClient.grpc[IO]), ("HTTP", PubSubClient.http[IO]))

options.foreach { case (clientType, constructor) =>
afterProducing(constructor, records = 1)
.test(s"$clientType - it should send and receive a message, acknowledging as expected") { subscriber =>
val result = subscriber
.evalTap(_.ack)
.map(_.value)
.interruptAfter(2.seconds)
.compile
.toList

val expected = List("ping".some)

assertIO(result, expected)
}

afterProducing(records = 5)
.test("it should preserve chunksize in the underlying stream") { subscriber =>
val result = subscriber.chunks
.evalTap(_.traverse(_.ack))
.interruptAfter(2.seconds)
.map(_.map(_.value))
.compile
.toList
afterProducing(constructor, records = 5)
.test(s"$clientType - it should preserve chunksize in the underlying stream") { subscriber =>
val result = subscriber.chunks
.evalTap(_.traverse(_.ack))
.interruptAfter(2.seconds)
.map(_.map(_.value))
.compile
.toList

assertIO(result, List(Chunk("ping".some, "ping".some, "ping".some, "ping".some, "ping".some)))
}
assertIO(result, List(Chunk("ping".some, "ping".some, "ping".some, "ping".some, "ping".some)))
}

afterProducing(records = 1, withAckDeadlineSeconds = 2)
.test("it should extend the deadline for a message") { subscriber =>
val deadline = AckDeadline.from(10.seconds).toOption.get
afterProducing(constructor, records = 1, withAckDeadlineSeconds = 2)
.test(s"$clientType - it should extend the deadline for a message") { subscriber =>
val deadline = AckDeadline.from(10.seconds).toOption.get

val result = subscriber
.evalTap(_.extendDeadline(deadline))
.evalTap(_ => IO.sleep(3.seconds))
.evalTap(_.ack)
.interruptAfter(5.seconds)
.compile
.count
val result = subscriber
.evalTap(_.extendDeadline(deadline))
.evalTap(_ => IO.sleep(3.seconds))
.evalTap(_.ack)
.interruptAfter(5.seconds)
.compile
.count

assertIO(result, 1L)
}
assertIO(result, 1L)
}

afterProducing(records = 1)
.test("it should nack a message properly") { subscriber =>
val result = subscriber
.evalScan(false) { case (nackedAlready, record) =>
if (nackedAlready) record.ack.as(true) else record.nack.as(true)
}
.void
.interruptAfter(2.seconds)
.compile
.count
afterProducing(constructor, records = 1)
.test(s"$clientType - it should nack a message properly") { subscriber =>
val result = subscriber
.evalScan(false) { case (nackedAlready, record) =>
if (nackedAlready) record.ack.as(true) else record.nack.as(true)
}
.void
.interruptAfter(2.seconds)
.compile
.count

assertIO(result, 3L)
}
assertIO(result, 3L)
}
}

//////////////
// Fixtures //
//////////////

val projects = List.fill(4)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) =>
val projects = List.fill(8)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) =>
s"test-project-${index + 1},$topics"
}

Expand All @@ -109,54 +121,54 @@ class PubSubSuite extends CatsEffectSuite with TestContainersFixtures {
.toResource
)

def afterProducing(records: Int, withAckDeadlineSeconds: Int = 10) = ResourceFunFixture {
IO(projectsFixture().take).flatten.toResource
.product(EmberClientBuilder.default[IO].build)
.evalTap { case (projectId, client) =>
val body = Json.obj(
"subscription" := Json.obj(
"topic" := "example-topic",
"ackDeadlineSeconds" := withAckDeadlineSeconds
),
"updateMask" := "ackDeadlineSeconds"
)

val request =
PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription")

client.expect[Unit](request)
}
.map { case (projectId, client) =>
val pubSubClient = PubSubClient
.http[IO]
.projectId(projectId)
.uri(container.uri)
.httpClient(client)
.noRetry

val publisher = pubSubClient
.publisher[String]
.topic(Topic("example-topic"))

val subscriber = pubSubClient.subscriber
.subscription(Subscription("example-subscription"))
.errorHandler {
case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack
}
.withDefaults
.decodeTo[String]
.subscribe

(publisher, subscriber)
}
.evalTap {
case (publisher, _) if records === 1 => publisher.publishOne("ping")
case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping")))
}
._2F
}
def afterProducing(constructor: PubSubClientStep[IO], records: Int, withAckDeadlineSeconds: Int = 10) =
ResourceFunFixture {
IO(projectsFixture().take).flatten.toResource
.product(EmberClientBuilder.default[IO].withHttp2.build)
.evalTap { case (projectId, client) =>
val body = Json.obj(
"subscription" := Json.obj(
"topic" := "example-topic",
"ackDeadlineSeconds" := withAckDeadlineSeconds
),
"updateMask" := "ackDeadlineSeconds"
)

val request =
PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription")

client.expect[Unit](request)
}
.map { case (projectId, client) =>
val pubSubClient = constructor
.projectId(projectId)
.uri(container.uri)
.httpClient(client)
.noRetry

val publisher = pubSubClient
.publisher[String]
.topic(Topic("example-topic"))

val subscriber = pubSubClient.subscriber
.subscription(Subscription("example-subscription"))
.errorHandler {
case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack
}
.withDefaults
.decodeTo[String]
.subscribe

(publisher, subscriber)
}
.evalTap {
case (publisher, _) if records === 1 => publisher.publishOne("ping")
case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping")))
}
._2F
}

case object container
extends GenericContainer(
Expand Down

0 comments on commit 47eb027

Please sign in to comment.