Skip to content

Commit

Permalink
Merge branch 'feature/s3-test-error-handling' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
sloshy committed Jan 23, 2021
2 parents cf7128a + 76f0a00 commit 8a92a35
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ trait S3TestingBackend[F[_]] {
}

object S3TestingBackend {

/** An S3 testing backend that runs in-memory using a concurrently available Map. */
def inMemory[F[_]: Sync] =
Ref[F].of(Map.empty[(String, String), (Map[String, String], Array[Byte])]).map { ref =>
new S3TestingBackend[F] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,33 @@ import cats._
import cats.implicits._
import com.rewardsnetwork.pureaws.s3.S3ObjectOps

class TestS3ObjectOps[F[_]](backend: S3TestingBackend[F])(implicit F: MonadError[F, Throwable]) extends S3ObjectOps[F] {
def copy(oldBucket: String, oldKey: String, newBucket: String, newKey: String): F[Unit] = {
/** A test utility for integrating with the `S3ObjectOps` algebra.
*
* @param backend Your `S3TestingBackend`.
* @param failWith An optional `Throwable` that you would like all requests to fail with, to test error recovery.
*/
class TestS3ObjectOps[F[_]](backend: S3TestingBackend[F], failWith: Option[Throwable] = none)(implicit
F: MonadError[F, Throwable]
) extends S3ObjectOps[F] {

private def doOrFail[A](fa: F[A]): F[A] = failWith match {
case Some(t) => F.raiseError(t)
case None => fa
}

def copy(oldBucket: String, oldKey: String, newBucket: String, newKey: String): F[Unit] = doOrFail {
backend.get(oldBucket, oldKey).flatMap {
case None => F.raiseError(new Exception("Object not found"))
case Some((meta, obj)) => backend.put(newBucket, newKey, obj, meta)
}
}

def delete(bucket: String, key: String): F[Unit] =
def delete(bucket: String, key: String): F[Unit] = doOrFail {
backend.delete(bucket, key)
}

def move(oldBucket: String, oldKey: String, newBucket: String, newKey: String): F[Unit] =
def move(oldBucket: String, oldKey: String, newBucket: String, newKey: String): F[Unit] = doOrFail {
copy(oldBucket, oldKey, newBucket, newKey) >> delete(oldBucket, oldKey)
}

}
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
package com.rewardsnetwork.pureaws.s3.testing

import cats.effect.Sync
import cats.implicits._
import com.rewardsnetwork.pureaws.s3.S3Sink
import fs2.Pipe
import fs2.{Pipe, Stream}

/** A test utility for integrating with the `S3Sink` algebra.
*
* @param backend Your `S3TestingBackend`.
* @param emitEtag An iterator of strings for sequentially assigning eTags.
* @param failWith An optional `Throwable` that you would like all requests to fail with, to test error recovery.
* @param emitEtag An iterator of strings for sequentially assigning eTags. Defaults to the sequence 0, 1, 2... as strings.
*/
class TestS3Sink[F[_]: Sync](
backend: S3TestingBackend[F],
failWith: Option[Throwable] = none,
emitEtag: Iterator[String] = Iterator.from(0).map(_.toString)
) extends S3Sink[F] {

def writeText(bucket: String, key: String): Pipe[F, Byte, String] = { s =>
private def doOrFail[A](pipe: Pipe[F, Byte, String]): Pipe[F, Byte, String] = failWith match {
case Some(t) => _ >> Stream.raiseError[F](t)
case None => pipe
}

def writeText(bucket: String, key: String): Pipe[F, Byte, String] = doOrFail { s =>
s.chunkAll.map(_.toByteBuffer.array).evalMap(backend.put(bucket, key, _)).as(emitEtag.next())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,42 @@ import cats.implicits._
import com.rewardsnetwork.pureaws.s3.S3Source
import fs2.Stream

class TestS3Source[F[_]: Sync](backend: S3TestingBackend[F]) extends S3Source[F] {
def readObject(bucket: String, key: String): Stream[F, Byte] =
/** A test utility for integrating with the `S3Source` algebra.
*
* @param backend Your `S3TestingBackend`.
* @param failWith An optional `Throwable` that you would like all requests to fail with, to test error recovery.
*/
class TestS3Source[F[_]: Sync](backend: S3TestingBackend[F], failWith: Option[Throwable] = none) extends S3Source[F] {
private def doOrFail[A](fa: F[A]): F[A] = failWith match {
case Some(t) => Sync[F].raiseError(t)
case None => fa
}

private def doOrFailStream[A](fa: Stream[F, A]): Stream[F, A] = failWith match {
case Some(t) => Stream.raiseError[F](t)
case None => fa
}

def readObject(bucket: String, key: String): Stream[F, Byte] = doOrFailStream {
Stream.eval(readObjectWithMetadata(bucket, key)).flatMap(_._2)
}

def readWholeObject(bucket: String, key: String)(implicit F: Applicative[F]): F[Array[Byte]] =
def readWholeObject(bucket: String, key: String)(implicit F: Applicative[F]): F[Array[Byte]] = doOrFail {
readObject(bucket, key).compile.to(Array)
}

def readObjectWithMetadata(bucket: String, key: String)(implicit
F: Applicative[F]
): F[(Map[String, String], Stream[F, Byte])] =
): F[(Map[String, String], Stream[F, Byte])] = doOrFail {
backend.get(bucket, key).flatMap {
case Some((meta, payload)) => (meta -> Stream.emits(payload.toList).covary[F]).pure[F]
case None => Sync[F].raiseError(new Exception("Object not found"))
}
}

def readWholeObjectWithMetadata(bucket: String, key: String)(implicit
F: Applicative[F]
): F[(Map[String, String], Array[Byte])] =
): F[(Map[String, String], Array[Byte])] = doOrFail {
readObjectWithMetadata(bucket, key).flatMap { case (meta, stream) => stream.compile.to(Array).tupleLeft(meta) }
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.rewardsnetwork.pureaws.s3.testing

import cats.effect._
import cats.implicits._

/** An amalgamation of all available S3 algebras in one client. */
/** All available test helpers for S3 at once. */
sealed trait TestSimpleS3Client[F[_]] {
def ops: TestS3ObjectOps[F]
def sink: TestS3Sink[F]
Expand All @@ -13,10 +14,13 @@ object TestSimpleS3Client {

/** Constructs a `SimpleS3Client` using an existing `PureS3Client` for some `F[_]`.
* Gives you access to all available algebras for the S3 client in one place.
*
* @param backend Your `S3TestingBackend`.
* @param failWith An optional `Throwable` that you would like all requests to fail with, to test error recovery.
*/
def apply[F[_]: Sync](backend: S3TestingBackend[F]) = new TestSimpleS3Client[F] {
val ops = new TestS3ObjectOps(backend)
val sink = new TestS3Sink(backend)
val source = new TestS3Source(backend)
def apply[F[_]: Sync](backend: S3TestingBackend[F], failWith: Option[Throwable] = none) = new TestSimpleS3Client[F] {
val ops = new TestS3ObjectOps(backend, failWith)
val sink = new TestS3Sink(backend, failWith)
val source = new TestS3Source(backend, failWith)
}
}

0 comments on commit 8a92a35

Please sign in to comment.