Skip to content

Commit

Permalink
Do not set up the content type with it is already correctly set (#5131)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Sep 4, 2024
1 parent 823d072 commit f3d3e34
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 48 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

sealed trait S3OperationResult extends Product with Serializable

object S3OperationResult {

final case object Success extends S3OperationResult

final case object AlreadyExists extends S3OperationResult

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.cli
import akka.http.scaladsl.model.ContentType
import cats.effect.{IO, Resource}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, CopyResult, HeadObject, PutObjectRequest}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, HeadObject, PutObjectRequest, S3OperationResult}
import fs2.Stream
import io.laserdisc.pure.s3.tagless.{Interpreter, S3AsyncClientOp}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, AwsCredentialsProvider, DefaultCredentialsProvider, StaticCredentialsProvider}
Expand Down Expand Up @@ -35,22 +35,22 @@ trait S3StorageClient {
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[CopyResult]
): IO[S3OperationResult]

def copyObjectMultiPart(
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[CopyResult]
): IO[S3OperationResult]

def uploadFile(
put: PutObjectRequest,
fileData: Stream[IO, ByteBuffer]
): IO[Unit]

def updateContentType(bucket: String, key: String, contentType: ContentType): IO[Unit]
def updateContentType(bucket: String, key: String, contentType: ContentType): IO[S3OperationResult]

def objectExists(bucket: String, key: String): IO[Boolean]
def bucketExists(bucket: String): IO[Boolean]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.cli

import akka.http.scaladsl.model.ContentType
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, CopyResult, HeadObject, PutObjectRequest}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, HeadObject, PutObjectRequest, S3OperationResult}
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.FeatureDisabled
import fs2.Stream
import software.amazon.awssdk.services.s3.model._
Expand All @@ -27,7 +27,7 @@ private[client] object S3StorageClientDisabled extends S3StorageClient {
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[CopyResult] = raiseDisabledErr
): IO[S3OperationResult] = raiseDisabledErr

override def objectExists(bucket: String, key: String): IO[Boolean] = raiseDisabledErr

Expand All @@ -36,7 +36,8 @@ private[client] object S3StorageClientDisabled extends S3StorageClient {
data: Stream[IO, ByteBuffer]
): IO[Unit] = raiseDisabledErr

override def updateContentType(bucket: String, key: String, contentType: ContentType): IO[Unit] = raiseDisabledErr
override def updateContentType(bucket: String, key: String, contentType: ContentType): IO[S3OperationResult] =
raiseDisabledErr

override def bucketExists(bucket: String): IO[Boolean] = raiseDisabledErr

Expand All @@ -46,7 +47,7 @@ private[client] object S3StorageClientDisabled extends S3StorageClient {
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[CopyResult] = raiseDisabledErr
): IO[S3OperationResult] = raiseDisabledErr

override def readFileMultipart(bucket: String, fileKey: String): Stream[IO, Byte] = throw disabledErr
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO]) ext
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[CopyResult] =
): IO[S3OperationResult] =
approveCopy(destinationBucket, destinationKey, options.overwriteTarget).flatMap { approved =>
if (approved) {
val requestBuilder = CopyObjectRequest
Expand All @@ -77,8 +77,8 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO]) ext
.contentType(contentType.value)
.metadataDirective(MetadataDirective.REPLACE)
}
client.copyObject(requestWithOptions.build()).as(CopyResult.Success)
} else IO.pure(CopyResult.AlreadyExists)
client.copyObject(requestWithOptions.build()).as(S3OperationResult.Success)
} else IO.pure(S3OperationResult.AlreadyExists)
}

def copyObjectMultiPart(
Expand All @@ -87,13 +87,13 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO]) ext
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[CopyResult] =
): IO[S3OperationResult] =
approveCopy(destinationBucket, destinationKey, options.overwriteTarget).flatMap { approved =>
if (approved) {
copyObjectMultiPart(sourceBucket, sourceKey, destinationBucket, destinationKey, options.newContentType).as(
CopyResult.Success
S3OperationResult.Success
)
} else IO.pure(CopyResult.AlreadyExists)
} else IO.pure(S3OperationResult.AlreadyExists)
}

private def copyObjectMultiPart(
Expand Down Expand Up @@ -183,19 +183,21 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO]) ext
.compile
.drain

override def updateContentType(bucket: String, key: String, contentType: ContentType): IO[Unit] = {
val requestBuilder = CopyObjectRequest
.builder()
.sourceBucket(bucket)
.sourceKey(key)
.destinationBucket(bucket)
.destinationKey(key)
.checksumAlgorithm(checksumAlgorithm)
.contentType(contentType.value)
.metadataDirective(MetadataDirective.REPLACE)

client.copyObject(requestBuilder.build()).void
}
override def updateContentType(bucket: String, key: String, contentType: ContentType): IO[S3OperationResult] =
headObject(bucket, key).flatMap {
case head if head.contentType.contains(contentType) => IO.pure(S3OperationResult.AlreadyExists)
case _ =>
val requestBuilder = CopyObjectRequest
.builder()
.sourceBucket(bucket)
.sourceKey(key)
.destinationBucket(bucket)
.destinationKey(key)
.checksumAlgorithm(checksumAlgorithm)
.contentType(contentType.value)
.metadataDirective(MetadataDirective.REPLACE)
client.copyObject(requestBuilder.build()).as(S3OperationResult.Success)
}

override def bucketExists(bucket: String): IO[Boolean] = {
listObjectsV2(bucket)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.cli
import akka.http.scaladsl.model.ContentTypes
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, CopyResult, LocalStackS3StorageClient, S3Helpers}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, LocalStackS3StorageClient, S3Helpers, S3OperationResult}
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import io.laserdisc.pure.s3.tagless.S3AsyncClientOp
import munit.AnyFixture
Expand Down Expand Up @@ -33,7 +33,7 @@ class S3StorageClientSuite extends NexusSuite with LocalStackS3StorageClient.Fix
result <- s3StorageClient.copyObject(bucket, key, bucket, newKey, options)
head <- s3StorageClient.headObject(bucket, newKey)
} yield {
assertEquals(result, CopyResult.Success)
assertEquals(result, S3OperationResult.Success)
assertEquals(head.fileSize, contentLength)
assertEquals(head.contentType, Some(expectedContentType))
}
Expand All @@ -50,7 +50,7 @@ class S3StorageClientSuite extends NexusSuite with LocalStackS3StorageClient.Fix
result <- s3StorageClient.copyObject(bucket, key, bucket, newKey, options)
head <- s3StorageClient.headObject(bucket, newKey)
} yield {
assertEquals(result, CopyResult.Success)
assertEquals(result, S3OperationResult.Success)
assertEquals(head.fileSize, contentLength)
assertEquals(head.contentType, Some(contentType))
}
Expand All @@ -67,7 +67,7 @@ class S3StorageClientSuite extends NexusSuite with LocalStackS3StorageClient.Fix
head <- s3StorageClient.headObject(bucket, existingTargetKey)
} yield {
val clue = "The file should not have been overwritten"
assertEquals(result, CopyResult.AlreadyExists)
assertEquals(result, S3OperationResult.AlreadyExists)
assertEquals(head.fileSize, anotherContentLength, clue)
assertEquals(head.contentType, Some(expectedContentType), clue)
}
Expand All @@ -84,7 +84,7 @@ class S3StorageClientSuite extends NexusSuite with LocalStackS3StorageClient.Fix
head <- s3StorageClient.headObject(bucket, existingTargetKey)
} yield {
val clue = "The file should have been overwritten"
assertEquals(result, CopyResult.Success)
assertEquals(result, S3OperationResult.Success)
assertEquals(head.fileSize, contentLength, clue)
assertEquals(head.contentType, Some(contentType), clue)
}
Expand All @@ -96,13 +96,29 @@ class S3StorageClientSuite extends NexusSuite with LocalStackS3StorageClient.Fix
givenAnS3Bucket { bucket =>
givenAFileInABucket(bucket, fileContents) { key =>
for {
_ <- s3StorageClient.updateContentType(bucket, key, contentType)
head <- s3StorageClient.headObject(bucket, key)
result <- s3StorageClient.updateContentType(bucket, key, contentType)
head <- s3StorageClient.headObject(bucket, key)
} yield {
assertEquals(result, S3OperationResult.Success)
assertEquals(head.contentType, Some(contentType))
}
}
}
}

test("Do not update the content type of an existing object if it is already set to this value") {
val originalContentType = ContentTypes.`text/plain(UTF-8)`
givenAnS3Bucket { bucket =>
givenAFileInABucket(bucket, fileContents) { key =>
for {
result <- s3StorageClient.updateContentType(bucket, key, originalContentType)
head <- s3StorageClient.headObject(bucket, key)
} yield {
assertEquals(result, S3OperationResult.AlreadyExists)
assertEquals(head.contentType, Some(originalContentType))
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy.logError
import ch.epfl.bluebrain.nexus.delta.kernel.{Logger, RetryStrategy}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, CopyResult, S3LocationGenerator}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, S3LocationGenerator, S3OperationResult}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.ship.config.FileProcessingConfig
Expand Down Expand Up @@ -74,8 +74,8 @@ object FileCopier {
} else
s3StorageClient.copyObject(importBucket, originKey, targetBucket, targetKey, copyOptions)
}.flatMap {
case CopyResult.Success => IO.unit
case CopyResult.AlreadyExists =>
case S3OperationResult.Success => IO.unit
case S3OperationResult.AlreadyExists =>
IO.whenA(forceContentType) {
attributes.mediaType.traverse { mediaType =>
logger.info(s"Patching to content type $mediaType for file $patchedFileName") >>
Expand Down

0 comments on commit f3d3e34

Please sign in to comment.