Skip to content

Commit

Permalink
Ensure we test both gRPC & HTTP clients
Browse files Browse the repository at this point in the history
  • Loading branch information
alejandrohdezma committed Jun 25, 2024
1 parent a21379c commit 7d55dd9
Showing 1 changed file with 103 additions and 94 deletions.
197 changes: 103 additions & 94 deletions modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package fs2.pubsub

import scala.annotation.nowarn
import scala.concurrent.duration._

import cats.effect.IO
Expand All @@ -26,6 +27,7 @@ import com.dimafeng.testcontainers.GenericContainer
import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures
import com.permutive.common.types.gcp.http4s._
import fs2.Chunk
import fs2.pubsub.dsl.client.PubSubClientStep
import io.circe.Json
import io.circe.syntax._
import munit.CatsEffectSuite
Expand All @@ -36,68 +38,75 @@ import org.http4s.client.dsl.io._
import org.http4s.ember.client.EmberClientBuilder
import org.testcontainers.containers.wait.strategy.Wait

@nowarn("msg=unused")
class PubSubSuite extends CatsEffectSuite with TestContainersFixtures {

afterProducing(records = 1)
.test("it should send and receive a message, acknowledging as expected") { subscriber =>
val result = subscriber
.evalTap(_.ack)
.map(_.value)
.interruptAfter(2.seconds)
.compile
.toList
for {
(clientType, constructor) <- List(("gRPC", PubSubClient.grpc[IO]), ("HTTP", PubSubClient.http[IO]))
} yield {

val expected = List("ping".some)
afterProducing(constructor, records = 1)
.test(s"$clientType - it should send and receive a message, acknowledging as expected") { subscriber =>
val result = subscriber
.evalTap(_.ack)
.map(_.value)
.interruptAfter(2.seconds)
.compile
.toList

assertIO(result, expected)
}
val expected = List("ping".some)

afterProducing(records = 5)
.test("it should preserve chunksize in the underlying stream") { subscriber =>
val result = subscriber.chunks
.evalTap(_.traverse(_.ack))
.interruptAfter(2.seconds)
.map(_.map(_.value))
.compile
.toList
assertIO(result, expected)
}

assertIO(result, List(Chunk("ping".some, "ping".some, "ping".some, "ping".some, "ping".some)))
}
afterProducing(constructor, records = 5)
.test(s"$clientType - it should preserve chunksize in the underlying stream") { subscriber =>
val result = subscriber.chunks
.evalTap(_.traverse(_.ack))
.interruptAfter(2.seconds)
.map(_.map(_.value))
.compile
.toList

afterProducing(records = 1, withAckDeadlineSeconds = 2)
.test("it should extend the deadline for a message") { subscriber =>
val deadline = AckDeadline.from(10.seconds).toOption.get
assertIO(result, List(Chunk("ping".some, "ping".some, "ping".some, "ping".some, "ping".some)))
}

val result = subscriber
.evalTap(_.extendDeadline(deadline))
.evalTap(_ => IO.sleep(3.seconds))
.evalTap(_.ack)
.interruptAfter(5.seconds)
.compile
.count
afterProducing(constructor, records = 1, withAckDeadlineSeconds = 2)
.test(s"$clientType - it should extend the deadline for a message") { subscriber =>
val deadline = AckDeadline.from(10.seconds).toOption.get

assertIO(result, 1L)
}
val result = subscriber
.evalTap(_.extendDeadline(deadline))
.evalTap(_ => IO.sleep(3.seconds))
.evalTap(_.ack)
.interruptAfter(5.seconds)
.compile
.count

afterProducing(records = 1)
.test("it should nack a message properly") { subscriber =>
val result = subscriber
.evalScan(false) { case (nackedAlready, record) =>
if (nackedAlready) record.ack.as(true) else record.nack.as(true)
}
.void
.interruptAfter(2.seconds)
.compile
.count
assertIO(result, 1L)
}

assertIO(result, 3L)
}
afterProducing(constructor, records = 1)
.test(s"$clientType - it should nack a message properly") { subscriber =>
val result = subscriber
.evalScan(false) { case (nackedAlready, record) =>
if (nackedAlready) record.ack.as(true) else record.nack.as(true)
}
.void
.interruptAfter(2.seconds)
.compile
.count

assertIO(result, 3L)
}

}

//////////////
// Fixtures //
//////////////

val projects = List.fill(4)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) =>
val projects = List.fill(8)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) =>
s"test-project-${index + 1},$topics"
}

Expand All @@ -109,54 +118,54 @@ class PubSubSuite extends CatsEffectSuite with TestContainersFixtures {
.toResource
)

def afterProducing(records: Int, withAckDeadlineSeconds: Int = 10) = ResourceFunFixture {
IO(projectsFixture().take).flatten.toResource
.product(EmberClientBuilder.default[IO].build)
.evalTap { case (projectId, client) =>
val body = Json.obj(
"subscription" := Json.obj(
"topic" := "example-topic",
"ackDeadlineSeconds" := withAckDeadlineSeconds
),
"updateMask" := "ackDeadlineSeconds"
)

val request =
PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription")

client.expect[Unit](request)
}
.map { case (projectId, client) =>
val pubSubClient = PubSubClient
.http[IO]
.projectId(projectId)
.uri(container.uri)
.httpClient(client)
.noRetry

val publisher = pubSubClient
.publisher[String]
.topic(Topic("example-topic"))

val subscriber = pubSubClient.subscriber
.subscription(Subscription("example-subscription"))
.errorHandler {
case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack
}
.withDefaults
.decodeTo[String]
.subscribe

(publisher, subscriber)
}
.evalTap {
case (publisher, _) if records === 1 => publisher.publishOne("ping")
case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping")))
}
._2F
}
def afterProducing(constructor: PubSubClientStep[IO], records: Int, withAckDeadlineSeconds: Int = 10) =
ResourceFunFixture {
IO(projectsFixture().take).flatten.toResource
.product(EmberClientBuilder.default[IO].withHttp2.build)
.evalTap { case (projectId, client) =>
val body = Json.obj(
"subscription" := Json.obj(
"topic" := "example-topic",
"ackDeadlineSeconds" := withAckDeadlineSeconds
),
"updateMask" := "ackDeadlineSeconds"
)

val request =
PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription")

client.expect[Unit](request)
}
.map { case (projectId, client) =>
val pubSubClient = constructor
.projectId(projectId)
.uri(container.uri)
.httpClient(client)
.noRetry

val publisher = pubSubClient
.publisher[String]
.topic(Topic("example-topic"))

val subscriber = pubSubClient.subscriber
.subscription(Subscription("example-subscription"))
.errorHandler {
case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack
}
.withDefaults
.decodeTo[String]
.subscribe

(publisher, subscriber)
}
.evalTap {
case (publisher, _) if records === 1 => publisher.publishOne("ping")
case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping")))
}
._2F
}

case object container
extends GenericContainer(
Expand Down

0 comments on commit 7d55dd9

Please sign in to comment.