diff --git a/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/S3TestingBackend.scala b/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/S3TestingBackend.scala index 3442eb6..d29159b 100644 --- a/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/S3TestingBackend.scala +++ b/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/S3TestingBackend.scala @@ -17,6 +17,8 @@ trait S3TestingBackend[F[_]] { } object S3TestingBackend { + + /** An S3 testing backend that runs in-memory using a concurrently available Map. */ def inMemory[F[_]: Sync] = Ref[F].of(Map.empty[(String, String), (Map[String, String], Array[Byte])]).map { ref => new S3TestingBackend[F] { diff --git a/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3ObjectOps.scala b/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3ObjectOps.scala index 50e9c63..e01ceee 100644 --- a/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3ObjectOps.scala +++ b/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3ObjectOps.scala @@ -4,18 +4,33 @@ import cats._ import cats.implicits._ import com.rewardsnetwork.pureaws.s3.S3ObjectOps -class TestS3ObjectOps[F[_]](backend: S3TestingBackend[F])(implicit F: MonadError[F, Throwable]) extends S3ObjectOps[F] { - def copy(oldBucket: String, oldKey: String, newBucket: String, newKey: String): F[Unit] = { +/** A test utility for integrating with the `S3ObjectOps` algebra. + * + * @param backend Your `S3TestingBackend`. + * @param failWith An optional `Throwable` that you would like all requests to fail with, to test error recovery. + */ +class TestS3ObjectOps[F[_]](backend: S3TestingBackend[F], failWith: Option[Throwable] = none)(implicit + F: MonadError[F, Throwable] +) extends S3ObjectOps[F] { + + private def doOrFail[A](fa: F[A]): F[A] = failWith match { + case Some(t) => F.raiseError(t) + case None => fa + } + + def copy(oldBucket: String, oldKey: String, newBucket: String, newKey: String): F[Unit] = doOrFail { backend.get(oldBucket, oldKey).flatMap { case None => F.raiseError(new Exception("Object not found")) case Some((meta, obj)) => backend.put(newBucket, newKey, obj, meta) } } - def delete(bucket: String, key: String): F[Unit] = + def delete(bucket: String, key: String): F[Unit] = doOrFail { backend.delete(bucket, key) + } - def move(oldBucket: String, oldKey: String, newBucket: String, newKey: String): F[Unit] = + def move(oldBucket: String, oldKey: String, newBucket: String, newKey: String): F[Unit] = doOrFail { copy(oldBucket, oldKey, newBucket, newKey) >> delete(oldBucket, oldKey) + } } diff --git a/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3Sink.scala b/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3Sink.scala index 13a745a..74506e9 100644 --- a/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3Sink.scala +++ b/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3Sink.scala @@ -1,20 +1,28 @@ package com.rewardsnetwork.pureaws.s3.testing import cats.effect.Sync +import cats.implicits._ import com.rewardsnetwork.pureaws.s3.S3Sink -import fs2.Pipe +import fs2.{Pipe, Stream} /** A test utility for integrating with the `S3Sink` algebra. * * @param backend Your `S3TestingBackend`. - * @param emitEtag An iterator of strings for sequentially assigning eTags. + * @param failWith An optional `Throwable` that you would like all requests to fail with, to test error recovery. + * @param emitEtag An iterator of strings for sequentially assigning eTags. Defaults to the sequence 0, 1, 2... as strings. */ class TestS3Sink[F[_]: Sync]( backend: S3TestingBackend[F], + failWith: Option[Throwable] = none, emitEtag: Iterator[String] = Iterator.from(0).map(_.toString) ) extends S3Sink[F] { - def writeText(bucket: String, key: String): Pipe[F, Byte, String] = { s => + private def doOrFail[A](pipe: Pipe[F, Byte, String]): Pipe[F, Byte, String] = failWith match { + case Some(t) => _ >> Stream.raiseError[F](t) + case None => pipe + } + + def writeText(bucket: String, key: String): Pipe[F, Byte, String] = doOrFail { s => s.chunkAll.map(_.toByteBuffer.array).evalMap(backend.put(bucket, key, _)).as(emitEtag.next()) } diff --git a/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3Source.scala b/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3Source.scala index af00be4..00cece4 100644 --- a/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3Source.scala +++ b/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestS3Source.scala @@ -6,23 +6,42 @@ import cats.implicits._ import com.rewardsnetwork.pureaws.s3.S3Source import fs2.Stream -class TestS3Source[F[_]: Sync](backend: S3TestingBackend[F]) extends S3Source[F] { - def readObject(bucket: String, key: String): Stream[F, Byte] = +/** A test utility for integrating with the `S3Source` algebra. + * + * @param backend Your `S3TestingBackend`. + * @param failWith An optional `Throwable` that you would like all requests to fail with, to test error recovery. + */ +class TestS3Source[F[_]: Sync](backend: S3TestingBackend[F], failWith: Option[Throwable] = none) extends S3Source[F] { + private def doOrFail[A](fa: F[A]): F[A] = failWith match { + case Some(t) => Sync[F].raiseError(t) + case None => fa + } + + private def doOrFailStream[A](fa: Stream[F, A]): Stream[F, A] = failWith match { + case Some(t) => Stream.raiseError[F](t) + case None => fa + } + + def readObject(bucket: String, key: String): Stream[F, Byte] = doOrFailStream { Stream.eval(readObjectWithMetadata(bucket, key)).flatMap(_._2) + } - def readWholeObject(bucket: String, key: String)(implicit F: Applicative[F]): F[Array[Byte]] = + def readWholeObject(bucket: String, key: String)(implicit F: Applicative[F]): F[Array[Byte]] = doOrFail { readObject(bucket, key).compile.to(Array) + } def readObjectWithMetadata(bucket: String, key: String)(implicit F: Applicative[F] - ): F[(Map[String, String], Stream[F, Byte])] = + ): F[(Map[String, String], Stream[F, Byte])] = doOrFail { backend.get(bucket, key).flatMap { case Some((meta, payload)) => (meta -> Stream.emits(payload.toList).covary[F]).pure[F] case None => Sync[F].raiseError(new Exception("Object not found")) } + } def readWholeObjectWithMetadata(bucket: String, key: String)(implicit F: Applicative[F] - ): F[(Map[String, String], Array[Byte])] = + ): F[(Map[String, String], Array[Byte])] = doOrFail { readObjectWithMetadata(bucket, key).flatMap { case (meta, stream) => stream.compile.to(Array).tupleLeft(meta) } + } } diff --git a/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestSimpleS3Client.scala b/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestSimpleS3Client.scala index 3a332de..083be85 100644 --- a/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestSimpleS3Client.scala +++ b/modules/s3-testing/src/main/scala/com/rewardsnetwork/pureaws/s3/testing/TestSimpleS3Client.scala @@ -1,8 +1,9 @@ package com.rewardsnetwork.pureaws.s3.testing import cats.effect._ +import cats.implicits._ -/** An amalgamation of all available S3 algebras in one client. */ +/** All available test helpers for S3 at once. */ sealed trait TestSimpleS3Client[F[_]] { def ops: TestS3ObjectOps[F] def sink: TestS3Sink[F] @@ -13,10 +14,13 @@ object TestSimpleS3Client { /** Constructs a `SimpleS3Client` using an existing `PureS3Client` for some `F[_]`. * Gives you access to all available algebras for the S3 client in one place. + * + * @param backend Your `S3TestingBackend`. + * @param failWith An optional `Throwable` that you would like all requests to fail with, to test error recovery. */ - def apply[F[_]: Sync](backend: S3TestingBackend[F]) = new TestSimpleS3Client[F] { - val ops = new TestS3ObjectOps(backend) - val sink = new TestS3Sink(backend) - val source = new TestS3Source(backend) + def apply[F[_]: Sync](backend: S3TestingBackend[F], failWith: Option[Throwable] = none) = new TestSimpleS3Client[F] { + val ops = new TestS3ObjectOps(backend, failWith) + val sink = new TestS3Sink(backend, failWith) + val source = new TestS3Source(backend, failWith) } }