Skip to content

Commit

Permalink
Tidy
Browse files Browse the repository at this point in the history
  • Loading branch information
dantb committed Apr 8, 2024
1 parent 7aa18fc commit c17d447
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,7 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient, storage: S3Stora

(for {
_ <- log(key, s"Checking for object existence")
_ <- getFileAttributes(key).redeemWith(
{
case _: NoSuchKeyException => IO.unit
case e => IO.raiseError(e)
},
_ => IO.raiseError(ResourceAlreadyExists(key))
)
_ <- validateObjectDoesNotExist(key)
_ <- log(key, s"Beginning multipart upload")
maybeEtags <- uploadFileMultipart(fileData, key)
_ <- log(key, s"Finished multipart upload. Etag by part: $maybeEtags")
Expand All @@ -73,15 +67,22 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient, storage: S3Stora
.adaptError { err => UnexpectedSaveError(key, err.getMessage) }
}

private def validateObjectDoesNotExist(key: String) =
  // Probe the object's attributes: a NoSuchKeyException is the success path here,
  // since it proves the key is free. Any other failure is propagated unchanged,
  // and a successful lookup means the caller would overwrite an existing object.
  getFileAttributes(key).attempt.flatMap {
    case Left(_: NoSuchKeyException) => IO.unit
    case Left(otherError)            => IO.raiseError(otherError)
    case Right(_)                    => IO.raiseError(ResourceAlreadyExists(key))
  }

// Bridges a Source of ByteString chunks into an fs2 Stream of individual bytes.
// Each ByteString chunk is flattened to its constituent bytes via its iterator before
// crossing the interop boundary, and the materialized value is discarded (NotUsed)
// since only the byte flow matters downstream.
// NOTE(review): StreamConverter presumably handles the runtime interop between the
// two streaming libraries — confirm its backpressure semantics against its definition.
private def convertStream(source: Source[ByteString, Any]): Stream[IO, Byte] =
  StreamConverter(
    source
      .flatMapConcat(x => Source.fromIterator(() => x.iterator))
      .mapMaterializedValue(_ => NotUsed)
  )

// TODO test etags and file round trip for true multipart uploads.
// It's not clear whether the last value in the stream will be the final aggregate checksum
private def uploadFileMultipart(fileData: Stream[IO, Byte], key: String): IO[List[Option[ETag]]] =
fileData
.through(
Expand Down Expand Up @@ -117,8 +118,8 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient, storage: S3Stora
case Some(onlyPartETag :: Nil) =>
// TODO our tests expect specific values for digests and the only algorithm currently used is SHA-256.
// If we want to continue to check this, but allow for different algorithms, this needs to be abstracted
// in the tests and checked on specific file contents.
// The result will depend on whether we use a multipart upload or a standard put object.
// in the tests and verified for specific file contents.
// The result will also depend on whether we use a multipart upload or a standard put object.
for {
_ <- log(key, s"Received ETag for single part upload: $onlyPartETag")
fileSize <- computeSize(bytes)
Expand Down Expand Up @@ -161,8 +162,5 @@ final class S3StorageSaveFile(s3StorageClient: S3StorageClient, storage: S3Stora

// Fails the effect with an UnexpectedSaveError carrying the object key and a human-readable reason.
private def raiseUnexpectedErr[A](key: String, msg: String): IO[A] = IO.raiseError(UnexpectedSaveError(key, msg))

private def log(key: String, msg: String) = {
val thing = s"Bucket: ${bucket.value}. Key: $key. $msg"
logger.info(s"Bucket: ${bucket.value}. Key: $key. $msg") >> IO.println(thing)
}
// Info-level log with a consistent "Bucket … Key …" prefix so all S3 save operations are traceable.
private def log(key: String, msg: String) = logger.info(s"Bucket: ${bucket.value}. Key: $key. $msg")
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ object S3StorageClient {

// Streams the bytes of an object from S3.
// Both raw strings are first refined to be non-empty; a failed refinement surfaces
// inside the stream as an IllegalArgumentException rather than throwing eagerly.
def readFile(bucket: String, fileKey: String): fs2.Stream[IO, Byte] =
  for {
    refinedBucket <- Stream.fromEither[IO](refineV[NonEmpty](bucket).leftMap(err => new IllegalArgumentException(err)))
    refinedKey    <- Stream.fromEither[IO](refineV[NonEmpty](fileKey).leftMap(err => new IllegalArgumentException(err)))
    byte          <- s3.readFile(BucketName(refinedBucket), FileKey(refinedKey))
  } yield byte

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, DeleteBuck

import scala.concurrent.duration.{Duration, DurationInt}

class S3StorageAccessSpecLocalStack
class S3StorageAccessSpec
extends NexusSuite
with StorageFixtures
with LocalStackS3StorageClient.Fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.UUID
import scala.concurrent.duration.{Duration, DurationInt}
import scala.jdk.CollectionConverters.ListHasAsScala

class S3StorageFetchSaveSpecLocalStack
class S3StorageFetchSaveSpec
extends NexusSuite
with StorageFixtures
with ActorSystemSetup.Fixture
Expand Down

This file was deleted.

2 changes: 1 addition & 1 deletion tests/docker/config/delta-postgres.conf
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ plugins {
}

remote-disk {
enabled = false
enabled = true
credentials {
type: "client-credentials"
user: "delta"
Expand Down

0 comments on commit c17d447

Please sign in to comment.