diff --git a/README.md b/README.md new file mode 100644 index 00000000..ca171645 --- /dev/null +++ b/README.md @@ -0,0 +1,254 @@ +Google Cloud Pub/Sub stream-based client built on top of cats-effect, fs2 and http4s. + +--- + +- [Installation](#installation) +- [Usage](#usage) + - [Publishing messages to a Pub/Sub topic](#publishing-messages-to-a-pubsub-topic) + - [Configuring the publisher](#configuring-the-publisher) + - [Using gRPC (only available on 2.13 or 3.x)](#using-grpc-only-available-on-213-or-3x) + - [Publishing messages asynchronously (in batches)](#publishing-messages-asynchronously-in-batches) + - [Subscribing to a Pub/Sub subscription](#subscribing-to-a-pubsub-subscription) + - [Configuring the subscriber](#configuring-the-subscriber) + - [Using gRPC (only available on 2.13 or 3.x)](#using-grpc-only-available-on-213-or-3x) + - [Creating a raw subscriber](#creating-a-raw-subscriber) + - [Pureconfig integration](#pureconfig-integration) +- [Contributors to this project](#contributors-to-this-project) + +## Installation + +Add the following line to your `build.sbt` file: + +```sbt +libraryDependencies += "com.permutive" %% "fs2-pubsub" % "0.22.0" +``` + +The library is published for Scala versions: `2.12`, `2.13` and `3`. + +## Usage + +To start using the library, you'll need an http4s `Client` with permission to +call Pub/Sub APIs in GCP. You can create one using [`gcp-auth`]: + +```scala +import org.http4s.ember.client.EmberClientBuilder +import cats.effect.IO +import cats.syntax.all._ +import com.permutive.gcp.auth.TokenProvider + +val client = EmberClientBuilder + .default[IO] + .withHttp2 + .build + .mproduct(client => TokenProvider.userAccount(client).toResource) + .map { case (client, tokenProvider) => tokenProvider.clientMiddleware(client) } +``` + + +### Publishing messages to a Pub/Sub topic + +To publish messages to Pub/Sub, you can use the `PubsubPublisher` class: + +```scala +import fs2.pubsub._ + +val publisher: PubSubPublisher[IO, String] = PubSubPublisher + .http[IO, String] + .projectId(ProjectId("my-project")) + .topic(Topic("my-topic")) + .defaultUri + .httpClient(client) + .noRetry +``` + +Then you can use any of the `PubSubPublisher` methods to send messages to Pub/Sub. + +```scala +// Producing a single message + +publisher.publishOne("message") +``` + +```scala +// Producing multiple messages + +val records = List( + PubSubRecord.Publisher("message1"), + PubSubRecord.Publisher("message2"), + PubSubRecord.Publisher("message3") +) + +publisher.publishMany(records) +``` + +```scala +// Producing a message with attributes + +publisher.publishOne("message", "key" -> "value") +``` + +```scala +// Producing a message using the record type + +val record = PubSubRecord.Publisher("message").withAttribute("key", "value") + +publisher.publishOne(record) +``` + +#### Configuring the publisher + +There are several configuration options available for the publisher: + +- `projectId`: The GCP project ID. +- `topic`: The Pub/Sub topic name. +- `uri`: The URI of the Pub/Sub API. By default, it uses the Google Cloud +Pub/Sub API. +- `httpClient`: The http4s `Client` to use for making requests to the +Pub/Sub API. +- `retry`: The retry policy to use when sending messages to Pub/Sub. By +default, it retries up to 3 times with exponential backoff. + +These configurations can either by provided by using a configuration object +(`PubSubPublisher.Config`) or by using the builder pattern. + +#### Using gRPC (only available on 2.13 or 3.x) + +You can use `PubSubPublisher.grpc` to create a publisher that uses gRPC to connect +to Pub/Sub. + +This type of publisher is only available on Scala `2.13` or `3.x`. + +#### Publishing messages asynchronously (in batches) + +In order to publish messages asynchronously, you can use the `PubSubPublisher.Async`. +You can create an instance of this class from a regular `PubSubPublisher` by using the +`batching` method: + +```scala +import cats.effect.Resource +import scala.concurrent.duration._ + +val asyncPublisher: Resource[IO, PubSubPublisher.Async[IO, String]] = + publisher + .batching + .batchSize(10) + .maxLatency(1.second) +``` + +Then you can use any of the `PubSubPublisher.Async` methods to send messages to Pub/Sub. +These methods are the same ones you'll find in the regular `PubSubPublisher`, with +the difference that they return a `F[Unit]` instead of a `F[MessageId]` and that +they expect a `PubSubRecord.Publisher.WithCallback` instead of a regular +`PubSubRecord.Publisher`. + +In order to construct such class you can either use the `PubSubRecord.Publisher.WithCallback` +constructor or use the `withCallback` method on a regular `PubSubRecord.Publisher`: + +```scala +val recordWithCallback = PubSubRecord.Publisher("message").withCallback { _ => + IO(println("Message sent!")) +} +``` + +### Subscribing to a Pub/Sub subscription + +To subscribe to a Pub/Sub subscription, you can use the `PubSubSubscriber` class: + +```scala +import fs2.Stream + +val subscriber: Stream[IO, Option[String]] = PubSubSubscriber + .http[IO] + .projectId(ProjectId("my-project")) + .subscription(Subscription("my-subscription")) + .defaultUri + .httpClient(client) + .noRetry + .noErrorHandling + .withDefaults + .decodeTo[String] + .subscribeAndAck +``` + +#### Configuring the subscriber + +There are several configuration options available for the subscriber: + +- `projectId`: The GCP project ID. +- `subscription`: The Pub/Sub subscription name. +- `uri`: The URI of the Pub/Sub API. By default, it uses the Google Cloud +Pub/Sub API. +- `httpClient`: The http4s `Client` to use for making requests to the +Pub/Sub API. +- `retry`: The retry policy to use when receiving messages from Pub/Sub. By +default, it retries up to 3 times with exponential backoff. +- `errorHandling`: The error handling policy to use when performing operations +such as decoding messages or acknowledging them. +- `batchSize`: The maximum number of messages to acknowledge at once. +- `maxLatency`: The maximum time to wait for a batch of messages before +acknowledging them. +- `maxMessages`: The maximum number of messages to receive in a single batch. +- `readConcurrency`: The number of concurrent reads from the subscription. + +These configurations can either by provided by using a configuration object +(`PubSubSubscriber.Config`) or by using the builder pattern. + +#### Using gRPC (only available on 2.13 or 3.x) + +You can use `PubSubSubscriber.grpc` to create a subscriber that uses gRPC to connect +to Pub/Sub. + +This type of subscriber is only available on Scala `2.13` or `3.x`. + +#### Creating a raw subscriber + +There are two types of subscribers available in the library: raw and decoded. + +The raw subscriber returns the raw message received from Pub/Sub, while the +decoded subscriber decodes the message to a specific type. + +The former is useful when you want to handle the message yourself, while the +latter is useful when you want to work with a specific type. You can create +a raw subscriber by using the `raw` method instead of `decodeTo`. + +### Pureconfig integration + +The library provides a way to load the configuration from a `ConfigSource` using +[`pureconfig`]. + +You just need to add the following line to your `build.sbt` file: + +```sbt +libraryDependencies += "com.permutive" %% "fs2-pubsub-pureconfig" % "0.22.0" +``` + +And then add the following import when you want to use the `pureconfig` integration: + + +```scala +import pureconfig.ConfigSource + +import fs2.pubsub.PubSubPublisher +import fs2.pubsub.pureconfig._ + +val config = ConfigSource.default.loadOrThrow[PubSubPublisher.Config] + +PubSubPublisher + .http[IO, String] + .fromConfig(config) + .httpClient(client) + .noRetry +``` + +## Contributors to this project + +| CremboC | bastewart | TimWSpence | travisbrown | ChristianJohnston97 | chrisjl154 | janstenpickle | +| :--: | :--: | :--: | :--: | :--: | :--: | :--: | +| CremboC | bastewart | TimWSpence | travisbrown | ChristianJohnston97 | chrisjl154 | janstenpickle | + +| marcelocarlos | desbo | kythyra | mcgizzle | istreeter | Joe8Bit | arunas-cesonis | +| :--: | :--: | :--: | :--: | :--: | :--: | :--: | +| marcelocarlos | desbo | kythyra | mcgizzle | istreeter | Joe8Bit | arunas-cesonis | + +[`gcp-auth`]: https://github.com/permutive-engineering/gcp-auth/ +[`pureconfig`]: https://pureconfig.github.io/ \ No newline at end of file