Skip to content

Commit

Permalink
Upgrade library to Cats Effect 3
Browse files Browse the repository at this point in the history
  • Loading branch information
sloshy committed Aug 3, 2021
1 parent de4ec44 commit 4e82529
Show file tree
Hide file tree
Showing 28 changed files with 175 additions and 251 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ name: Continuous Integration

on:
pull_request:
branches: ['*']
branches: ['**']
push:
branches: ['*']
branches: ['**']
tags: [v*]

env:
Expand All @@ -33,7 +33,7 @@ jobs:
fetch-depth: 0

- name: Setup Java and Scala
uses: olafurpg/setup-scala@v10
uses: olafurpg/setup-scala@v12
with:
java-version: ${{ matrix.java }}

Expand All @@ -50,10 +50,10 @@ jobs:
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Check that workflows are up to date
run: sbt ++${{ matrix.scala }} githubWorkflowCheck
run: sbt --client '++${{ matrix.scala }}; githubWorkflowCheck'

- name: Build project
run: sbt ++${{ matrix.scala }} test
run: sbt --client '++${{ matrix.scala }}; test'

- name: Compress target directories
run: tar cf targets.tar modules/s3/target target modules/sqs-refined/target modules/sqs/target modules/s3-testing/target modules/core/target project/target
Expand Down Expand Up @@ -81,7 +81,7 @@ jobs:
fetch-depth: 0

- name: Setup Java and Scala
uses: olafurpg/setup-scala@v10
uses: olafurpg/setup-scala@v12
with:
java-version: ${{ matrix.java }}

Expand Down Expand Up @@ -122,4 +122,4 @@ jobs:
PGP_SECRET: ${{ secrets.PGP_SECRET }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
run: sbt ++${{ matrix.scala }} ci-release
run: sbt --client '++${{ matrix.scala }}; ci-release'
20 changes: 9 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,15 @@ Each one adds enhancements on top of the other, and the base libraries feature b

For example, here is how you create an S3 and SQS client:
```scala
import cats.effect.{Blocker, IO}
import cats.effect.{IO, Resource}
import com.rewardsnetwork.pureaws.s3.PureS3Client
import com.rewardsnetwork.pureaws.sqs.PureSqsClient
import software.amazon.awssdk.regions.Region

Blocker[IO].flatMap { blocker => //Everything needs a Blocker
val region = Region.US_EAST_1
val region = Region.US_EAST_1

val pureS3clientResource = PureS3Client.async[IO](blocker, region)
val pureSQSclientResource = PureSqsClient.async[IO](blocker, region)
}
val pureS3clientResource: Resource[IO, PureS3Client[IO]] = PureS3Client.async[IO](region)
val pureSQSclientResource: Resource[IO, PureSqsClient[IO]] = PureSqsClient.async[IO](region)
```

The "Pure" clients for each library have Sync/Async variants, based on the underlying AWS client.
Expand All @@ -79,7 +77,7 @@ For modularity and separation of concerns, we've separated out the client types
```scala
import com.rewardsnetwork.pureaws.SimpleS3Client

val s3ClientResource: Resource[IO, SimpleS3Client[IO]] = Blocker[IO].flatMap(SimpleS3Client.async[IO](_, region))
val s3ClientResource: Resource[IO, SimpleS3Client[IO]] = SimpleS3Client.async[IO](region)

s3ClientResource.use { client =>
///Access each of the clients within here
Expand All @@ -95,7 +93,7 @@ Perform basic operations on S3 buckets and objects, available at `SimpleS3Client
import com.rewardsnetwork.pureaws.{S3BucketOps, S3ObjectOps}
import software.amazon.awssdk.services.s3.model.BucketLocationConstraint

val pureClient: Resource[IO, PureS3Client[IO]] = Blocker[IO].flatMap(PureS3Client.async[IO](_, region))
val pureClient: Resource[IO, PureS3Client[IO]] = PureS3Client.async[IO](region)
val bucketAndObjectOps = pureClient.map(p => S3BucketOps(p) -> S3ObjectOps(p))

bucketAndObjectOps.use { case (bucketOps, objectOps) =>
Expand All @@ -122,7 +120,7 @@ Write S3 objects using S3 (multipart not currently supported), available at `Sim
```scala
import com.rewardsnetwork.pureaws.S3Sink

val sinkResource: Resource[IO, S3Sink[IO]] = Blocker[IO].flatMap(S3Sink.async[IO](_, region))
val sinkResource: Resource[IO, S3Sink[IO]] = S3Sink.async[IO](_, region)

sinkResource.use { sink =>
Stream("hello", "world", "and all who inhabit it")
Expand All @@ -139,7 +137,7 @@ Stream S3 objects from S3 as bytes, available at `SimpleS3Client#source` or by i
```scala
import com.rewardsnetwork.pureaws.S3Source

val sourceResource: Resource[IO, S3Source[IO]] = Blocker[IO].flatMap(S3Source.async[IO](_, region))
val sourceResource: Resource[IO, S3Source[IO]] = S3Source.async[IO](region)

Stream.resource(sourceResource).flatMap { source =>
//Stream bytes from an object
Expand Down Expand Up @@ -169,7 +167,7 @@ The preferred way to use `pureaws-sqs` is to pull in the Simple client with an A
```scala
import com.rewardsnetwork.pureaws.sqs.SimpleSqsClient

val client: Resource[IO, SimpleSqsClient[IO]] = SimpleSqsClient.async[IO](blocker, region)
val client: Resource[IO, SimpleSqsClient[IO]] = SimpleSqsClient.async[IO](region)
client.use { c =>
c.streamMessages("url-to-my-queue", maxMessages = 10).take(3).compile.drain.as(ExitCode.Success)
}
Expand Down
14 changes: 5 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
//Core deps
val amazonV = "2.16.67"
val amazonV = "2.17.11"
val catsV = "2.6.1"
val catsEffectV = "2.5.1"
val fs2V = "2.5.6"
val catsEffectV = "3.2.1"
val fs2V = "3.0.6"
val log4catsV = "1.2.0"
val refinedV = "0.9.25"
val monixV = "3.4.0"
val collectionCompatV = "2.4.4"
val collectionCompatV = "2.5.0"

val catsCore = "org.typelevel" %% "cats-core" % catsV
val catsEffect = "org.typelevel" %% "cats-effect" % catsEffectV
val catsEffect = "org.typelevel" %% "cats-effect-std" % catsEffectV
val fs2Core = "co.fs2" %% "fs2-core" % fs2V
val fs2Io = "co.fs2" %% "fs2-io" % fs2V
val fs2ReactiveStreams = "co.fs2" %% "fs2-reactive-streams" % fs2V
val awsSdkCore = "software.amazon.awssdk" % "sdk-core" % amazonV
val awsSQS = "software.amazon.awssdk" % "sqs" % amazonV
val awsS3 = "software.amazon.awssdk" % "s3" % amazonV
val refined = "eu.timepit" %% "refined" % refinedV
val monixCatnap = "io.monix" %% "monix-catnap" % monixV
val collectionCompat =
"org.scala-lang.modules" %% "scala-collection-compat" % collectionCompatV

Expand Down Expand Up @@ -107,7 +105,6 @@ lazy val sqs = (project in file("modules/sqs"))
libraryDependencies ++= Seq(
//Core deps
awsSQS,
monixCatnap,
//Test deps
catsEffectLaws
)
Expand All @@ -132,7 +129,6 @@ lazy val s3 = (project in file("modules/s3"))
//Core deps
awsS3,
fs2Io,
monixCatnap,
//Test deps
catsEffectLaws
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package com.rewardsnetwork.pureaws
import java.nio.ByteBuffer
import java.util.concurrent.CompletableFuture

import fs2.Stream
import cats.effect.kernel.Async
import fs2.{Chunk, Stream}
import software.amazon.awssdk.core.async.{AsyncResponseTransformer, SdkPublisher}
import fs2.Chunk
import cats.effect.ConcurrentEffect

/** An implementation of the `AsyncResponseTransformer` interface, but using FS2 Stream */
trait Fs2AsyncResponseTransformer[F[_], A] extends AsyncResponseTransformer[A, (A, Stream[F, Byte])] {
Expand All @@ -22,7 +21,7 @@ trait Fs2AsyncResponseTransformer[F[_], A] extends AsyncResponseTransformer[A, (
object Fs2AsyncResponseTransformer {

/** Creates an `Fs2AsyncResponseTransformer` that returns your response object as well as a stream of bytes. */
def apply[F[_]: ConcurrentEffect, A]: Fs2AsyncResponseTransformer[F, A] =
def apply[F[_]: Async, A]: Fs2AsyncResponseTransformer[F, A] =
new Fs2AsyncResponseTransformer[F, A] {

private val cf: CompletableFuture[Stream[F, Byte]] = new CompletableFuture[Stream[F, Byte]]()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package com.rewardsnetwork.pureaws

import java.nio.ByteBuffer

import cats.effect.IO
import fs2.interop.reactivestreams._
import org.scalatest.freespec.AnyFreeSpec
import org.scalatest.matchers.should.Matchers
import software.amazon.awssdk.core.internal.async.SdkPublishers

import scala.concurrent.ExecutionContext.global
import java.nio.ByteBuffer
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
import cats.effect.unsafe.IORuntime

class Fs2AsyncResponseTransformerSpec extends AnyFreeSpec with Matchers with ScalaCheckPropertyChecks {
implicit val cs = IO.contextShift(global)
implicit val timer = IO.timer(global)
implicit val iort = IORuntime.global

"Fs2AsyncResponseTransformer" - {
"should provide the stream of bytes to the user" in {
Expand All @@ -21,17 +21,28 @@ class Fs2AsyncResponseTransformerSpec extends AnyFreeSpec with Matchers with Sca
val exampleStrByteBuf = ByteBuffer.wrap(exampleStrBytes)
val transformer = Fs2AsyncResponseTransformer[IO, String]
val fs2publisher = fs2.Stream.emit[IO, ByteBuffer](exampleStrByteBuf).toUnicastPublisher
val sdkPublisher = SdkPublishers.envelopeWrappedPublisher(fs2publisher, "", "")
val cf = transformer.prepare()

//Use the transformer the way the SDK would
transformer.onResponse(exampleStr)
transformer.onStream(sdkPublisher)
val program = fs2publisher.use { p =>
val sdkPublisher = SdkPublishers.envelopeWrappedPublisher(p, "", "")
val cf = transformer.prepare()

//Use the transformer the way the SDK would
transformer.onResponse(exampleStr)
transformer.onStream(sdkPublisher)

//Get and test results
val (str, byteStream) = cf.get()
str shouldBe exampleStr
byteStream.compile
.to(Array)
.flatMap(bytes =>
IO {
bytes shouldBe exampleStrBytes
}
)
}

//Get and test results
val (str, byteStream) = cf.get()
str shouldBe exampleStr
byteStream.compile.to(Array).unsafeRunSync() shouldBe exampleStrBytes
program.unsafeRunSync()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import java.time.Instant

import cats.data.OptionT
import cats.effect.Sync
import cats.effect.concurrent.Ref
import cats.implicits._
import cats.effect.kernel.Ref
import cats.syntax.all._
import internal.newInstant
import S3TestingBackend._

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.rewardsnetwork.pureaws.s3.testing

import cats._
import cats.implicits._
import cats.syntax.all._
import com.rewardsnetwork.pureaws.s3.{S3BucketPermission, S3BucketOps}
import software.amazon.awssdk.services.s3.model._
import com.rewardsnetwork.pureaws.s3.S3BucketInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.rewardsnetwork.pureaws.s3.testing
import java.time.Instant

import cats._
import cats.implicits._
import cats.syntax.all._
import com.rewardsnetwork.pureaws.s3._
import fs2.Stream
import software.amazon.awssdk.services.s3.model.RequestPayer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.rewardsnetwork.pureaws.s3.testing

import cats.effect.Sync
import cats.implicits._
import cats.syntax.all._
import com.rewardsnetwork.pureaws.s3.S3Sink
import fs2.{Pipe, Stream}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.rewardsnetwork.pureaws.s3.testing

import cats.Applicative
import cats.effect.Sync
import cats.implicits._
import cats.syntax.all._
import com.rewardsnetwork.pureaws.s3.S3Source
import fs2.Stream

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.rewardsnetwork.pureaws.s3.testing

import cats.effect._
import cats.implicits._
import cats.syntax.all._

/** All available test helpers for S3 at once. */
sealed trait TestSimpleS3Client[F[_]] {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.rewardsnetwork.pureaws.s3.testing

import java.time.Instant
import cats.effect.Sync

import cats.effect.kernel.Sync

object internal {
def newInstant[F[_]: Sync]: F[Instant] = Sync[F].delay(Instant.now())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package com.rewardsnetwork.pureaws.s3.testing

import java.time.Instant

import cats.Traverse
import cats.effect.unsafe.IORuntime
import cats.effect.IO
import cats.syntax.all._
import org.scalatest.freespec.AnyFreeSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks
import org.scalacheck.Gen
import com.rewardsnetwork.pureaws.s3.S3ObjectInfo
import java.time.Instant
import com.rewardsnetwork.pureaws.s3.S3ObjectListing
import cats.Traverse
import com.rewardsnetwork.pureaws.s3.{S3ObjectInfo, S3ObjectListing}

class TestS3ObjectOpsSpec extends AnyFreeSpec with Matchers with ScalaCheckPropertyChecks {
implicit val iort = IORuntime.global

"TestS3ObjectOps" - {
"listObjectsPaginated" - {
Expand Down
Loading

0 comments on commit 4e82529

Please sign in to comment.