Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

During import, also patch the file content type in S3 #5103

Merged
merged 3 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,8 @@ lazy val ship = project
.settings(
name := "nexus-ship",
moduleName := "nexus-ship",
Test / parallelExecution := false
Test / parallelExecution := false,
addCompilerPlugin(betterMonadicFor)
)
.enablePlugins(UniversalPlugin, JavaAppPackaging, DockerPlugin, BuildInfoPlugin)
.settings(shared, compilation, servicePackaging, assertJavaVersion, coverage, release)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

/** Outcome of an attempt to copy an object between two S3 locations. */
sealed trait CopyResult extends Product with Serializable

object CopyResult {

  /** The object was written to the destination key. */
  case object Success extends CopyResult

  /** The destination key already held an object and overwriting was not
    * requested, so nothing was copied.
    */
  case object AlreadyExists extends CopyResult

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client

import akka.http.scaladsl.model.ContentType
import cats.effect.{IO, Resource}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, HeadObject, PutObjectRequest}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, CopyResult, HeadObject, PutObjectRequest}
import fs2.Stream
import io.laserdisc.pure.s3.tagless.{Interpreter, S3AsyncClientOp}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, AwsCredentialsProvider, DefaultCredentialsProvider, StaticCredentialsProvider}
Expand Down Expand Up @@ -34,21 +35,23 @@ trait S3StorageClient {
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[Unit]
): IO[CopyResult]

def copyObjectMultiPart(
sourceBucket: String,
sourceKey: String,
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[Unit]
): IO[CopyResult]

def uploadFile(
put: PutObjectRequest,
fileData: Stream[IO, ByteBuffer]
): IO[Unit]

def updateContentType(bucket: String, key: String, contentType: ContentType): IO[Unit]

def objectExists(bucket: String, key: String): IO[Boolean]
def bucketExists(bucket: String): IO[Boolean]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client

import akka.http.scaladsl.model.ContentType
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, HeadObject, PutObjectRequest}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, CopyResult, HeadObject, PutObjectRequest}
import ch.epfl.bluebrain.nexus.delta.sdk.error.ServiceError.FeatureDisabled
import fs2.Stream
import software.amazon.awssdk.services.s3.model._
Expand All @@ -26,7 +27,7 @@ private[client] object S3StorageClientDisabled extends S3StorageClient {
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[Unit] = raiseDisabledErr
): IO[CopyResult] = raiseDisabledErr

override def objectExists(bucket: String, key: String): IO[Boolean] = raiseDisabledErr

Expand All @@ -35,6 +36,8 @@ private[client] object S3StorageClientDisabled extends S3StorageClient {
data: Stream[IO, ByteBuffer]
): IO[Unit] = raiseDisabledErr

override def updateContentType(bucket: String, key: String, contentType: ContentType): IO[Unit] = raiseDisabledErr

override def bucketExists(bucket: String): IO[Boolean] = raiseDisabledErr

override def copyObjectMultiPart(
Expand All @@ -43,7 +46,7 @@ private[client] object S3StorageClientDisabled extends S3StorageClient {
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[Unit] = raiseDisabledErr
): IO[CopyResult] = raiseDisabledErr

override def readFileMultipart(bucket: String, fileKey: String): Stream[IO, Byte] = throw disabledErr
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageNotAccessible
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{checksumAlgorithm, CopyOptions, HeadObject, PutObjectRequest}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{PutObjectRequest, _}
import eu.timepit.refined.refineMV
import eu.timepit.refined.types.string.NonEmptyString
import fs2.Stream
Expand Down Expand Up @@ -62,9 +62,9 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO]) ext
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[Unit] =
): IO[CopyResult] =
approveCopy(destinationBucket, destinationKey, options.overwriteTarget).flatMap { approved =>
IO.whenA(approved) {
if (approved) {
val requestBuilder = CopyObjectRequest
.builder()
.sourceBucket(sourceBucket)
Expand All @@ -77,8 +77,8 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO]) ext
.contentType(contentType.value)
.metadataDirective(MetadataDirective.REPLACE)
}
client.copyObject(requestWithOptions.build()).void
}
client.copyObject(requestWithOptions.build()).as(CopyResult.Success)
} else IO.pure(CopyResult.AlreadyExists)
}

def copyObjectMultiPart(
Expand All @@ -87,11 +87,13 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO]) ext
destinationBucket: String,
destinationKey: String,
options: CopyOptions
): IO[Unit] =
): IO[CopyResult] =
approveCopy(destinationBucket, destinationKey, options.overwriteTarget).flatMap { approved =>
IO.whenA(approved) {
copyObjectMultiPart(sourceBucket, sourceKey, destinationBucket, destinationKey, options.newContentType)
}
if (approved) {
copyObjectMultiPart(sourceBucket, sourceKey, destinationBucket, destinationKey, options.newContentType).as(
CopyResult.Success
)
} else IO.pure(CopyResult.AlreadyExists)
}

private def copyObjectMultiPart(
Expand Down Expand Up @@ -181,6 +183,20 @@ final private[client] class S3StorageClientImpl(client: S3AsyncClientOp[IO]) ext
.compile
.drain

// Rewrites the stored content type of an existing object. S3 does not expose
// an in-place metadata update, so this copies the object onto itself (same
// bucket and key as both source and destination) with
// MetadataDirective.REPLACE so the new content type takes effect.
override def updateContentType(bucket: String, key: String, contentType: ContentType): IO[Unit] = {
  val requestBuilder = CopyObjectRequest
    .builder()
    .sourceBucket(bucket)
    .sourceKey(key)
    .destinationBucket(bucket)
    .destinationKey(key)
    .checksumAlgorithm(checksumAlgorithm) // same checksum algorithm as the other copy operations in this client
    .contentType(contentType.value)
    .metadataDirective(MetadataDirective.REPLACE)

  client.copyObject(requestBuilder.build()).void
}

override def bucketExists(bucket: String): IO[Boolean] = {
listObjectsV2(bucket)
.redeemWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.cli
import akka.http.scaladsl.model.ContentTypes
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.S3StorageConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, LocalStackS3StorageClient, S3Helpers}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, CopyResult, LocalStackS3StorageClient, S3Helpers}
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import io.laserdisc.pure.s3.tagless.S3AsyncClientOp
import munit.AnyFixture
Expand All @@ -30,9 +30,10 @@ class S3StorageClientSuite extends NexusSuite with LocalStackS3StorageClient.Fix
givenAFileInABucket(bucket, fileContents) { key =>
val newKey = genString()
for {
_ <- s3StorageClient.copyObject(bucket, key, bucket, newKey, options)
head <- s3StorageClient.headObject(bucket, newKey)
result <- s3StorageClient.copyObject(bucket, key, bucket, newKey, options)
head <- s3StorageClient.headObject(bucket, newKey)
} yield {
assertEquals(result, CopyResult.Success)
assertEquals(head.fileSize, contentLength)
assertEquals(head.contentType, Some(expectedContentType))
}
Expand All @@ -46,9 +47,10 @@ class S3StorageClientSuite extends NexusSuite with LocalStackS3StorageClient.Fix
givenAFileInABucket(bucket, fileContents) { key =>
val newKey = genString()
for {
_ <- s3StorageClient.copyObject(bucket, key, bucket, newKey, options)
head <- s3StorageClient.headObject(bucket, newKey)
result <- s3StorageClient.copyObject(bucket, key, bucket, newKey, options)
head <- s3StorageClient.headObject(bucket, newKey)
} yield {
assertEquals(result, CopyResult.Success)
assertEquals(head.fileSize, contentLength)
assertEquals(head.contentType, Some(contentType))
}
Expand All @@ -61,10 +63,11 @@ class S3StorageClientSuite extends NexusSuite with LocalStackS3StorageClient.Fix
val options = CopyOptions(overwriteTarget = false, Some(contentType))
givenFilesInABucket(bucket, fileContents, anotherContent) { case (sourceKey, existingTargetKey) =>
for {
_ <- s3StorageClient.copyObject(bucket, sourceKey, bucket, existingTargetKey, options)
head <- s3StorageClient.headObject(bucket, existingTargetKey)
result <- s3StorageClient.copyObject(bucket, sourceKey, bucket, existingTargetKey, options)
head <- s3StorageClient.headObject(bucket, existingTargetKey)
} yield {
val clue = "The file should not have been overwritten"
assertEquals(result, CopyResult.AlreadyExists)
assertEquals(head.fileSize, anotherContentLength, clue)
assertEquals(head.contentType, Some(expectedContentType), clue)
}
Expand All @@ -77,15 +80,29 @@ class S3StorageClientSuite extends NexusSuite with LocalStackS3StorageClient.Fix
val options = CopyOptions(overwriteTarget = true, Some(contentType))
givenFilesInABucket(bucket, fileContents, anotherContent) { case (sourceKey, existingTargetKey) =>
for {
_ <- s3StorageClient.copyObject(bucket, sourceKey, bucket, existingTargetKey, options)
head <- s3StorageClient.headObject(bucket, existingTargetKey)
result <- s3StorageClient.copyObject(bucket, sourceKey, bucket, existingTargetKey, options)
head <- s3StorageClient.headObject(bucket, existingTargetKey)
} yield {
val clue = "The file should have been overwritten"
assertEquals(result, CopyResult.Success)
assertEquals(head.fileSize, contentLength, clue)
assertEquals(head.contentType, Some(contentType), clue)
}
}
}
}

// Verifies updateContentType against a real (LocalStack) bucket: after the
// call, a HEAD on the same key must report the new content type.
// `givenAnS3Bucket` / `givenAFileInABucket` are suite fixtures that provision
// a fresh bucket and object — presumably cleaned up afterwards (defined
// outside this view).
test("Update the content type of an existing object") {
  givenAnS3Bucket { bucket =>
    givenAFileInABucket(bucket, fileContents) { key =>
      for {
        _    <- s3StorageClient.updateContentType(bucket, key, contentType)
        head <- s3StorageClient.headObject(bucket, key)
      } yield {
        assertEquals(head.contentType, Some(contentType))
      }
    }
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,25 @@ package ch.epfl.bluebrain.nexus.ship.files
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Path
import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy.logError
import ch.epfl.bluebrain.nexus.delta.kernel.{Logger, RetryStrategy}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, S3LocationGenerator}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, CopyResult, S3LocationGenerator}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.client.S3StorageClient
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import ch.epfl.bluebrain.nexus.ship.config.FileProcessingConfig
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.CopyResult
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.CopyResult.{CopySkipped, CopySuccess}
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.FileCopyResult
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.FileCopyResult.{FileCopySkipped, FileCopySuccess}
import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import software.amazon.awssdk.services.s3.model.S3Exception

import scala.concurrent.duration.DurationInt

trait FileCopier {

def copyFile(project: ProjectRef, attributes: FileAttributes): IO[CopyResult]
def copyFile(project: ProjectRef, attributes: FileAttributes, forceContentType: Boolean): IO[FileCopyResult]

}

Expand All @@ -37,13 +38,13 @@ object FileCopier {
logError(logger, "s3Copy")
)

sealed trait CopyResult extends Product with Serializable
sealed trait FileCopyResult extends Product with Serializable

object CopyResult {
object FileCopyResult {

final case class CopySuccess(newPath: Uri.Path) extends CopyResult
final case class FileCopySuccess(newPath: Uri.Path) extends FileCopyResult

final case object CopySkipped extends CopyResult
final case object FileCopySkipped extends FileCopyResult

}

Expand All @@ -54,7 +55,7 @@ object FileCopier {
val importBucket = config.importBucket
val targetBucket = config.targetBucket
val locationGenerator = new S3LocationGenerator(config.prefix.getOrElse(Path.Empty))
(project: ProjectRef, attributes: FileAttributes) =>
(project: ProjectRef, attributes: FileAttributes, forceContentType: Boolean) =>
{
val origin = attributes.path
val patchedFileName = if (attributes.filename.isEmpty) "file" else attributes.filename
Expand All @@ -72,11 +73,23 @@ object FileCopier {
s3StorageClient.copyObjectMultiPart(importBucket, originKey, targetBucket, targetKey, copyOptions)
} else
s3StorageClient.copyObject(importBucket, originKey, targetBucket, targetKey, copyOptions)
}.timed.flatMap { case (duration, _) =>
IO.whenA(duration > longCopyThreshold)(
logger.info(s"Copy file ${attributes.path} of size ${attributes.bytes} took ${duration.toSeconds} seconds.")
)
}
}.flatMap {
case CopyResult.Success => IO.unit
case CopyResult.AlreadyExists =>
IO.whenA(forceContentType) {
attributes.mediaType.traverse { mediaType =>
logger.info(s"Patching to content type $mediaType for file $patchedFileName")
s3StorageClient.updateContentType(targetBucket, targetKey, mediaType)
}.void
}
}.timed
.flatMap { case (duration, _) =>
IO.whenA(duration > longCopyThreshold)(
logger.info(
s"Copy file ${attributes.path} of size ${attributes.bytes} took ${duration.toSeconds} seconds."
)
)
}

for {
isObject <- s3StorageClient.objectExists(importBucket, originKey)
Expand All @@ -87,10 +100,11 @@ object FileCopier {
_ <- IO.whenA(!isFolder && !isObject) {
logger.error(s"$target is neither an object or folder, something is wrong.")
}
} yield if (isObject) CopySuccess(target) else CopySkipped
} yield if (isObject) FileCopySuccess(target) else FileCopySkipped
}.retry(copyRetryStrategy)
}

def apply(): FileCopier = (_: ProjectRef, attributes: FileAttributes) => IO.pure(CopySuccess(attributes.path))
def apply(): FileCopier = (_: ProjectRef, attributes: FileAttributes, _: Boolean) =>
IO.pure(FileCopySuccess(attributes.path))

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.{ScopedEventLog, Transactors}
import ch.epfl.bluebrain.nexus.ship._
import ch.epfl.bluebrain.nexus.ship.acls.AclWiring.alwaysAuthorize
import ch.epfl.bluebrain.nexus.ship.config.InputConfig
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.CopyResult.{CopySkipped, CopySuccess}
import ch.epfl.bluebrain.nexus.ship.files.FileCopier.FileCopyResult.{FileCopySkipped, FileCopySuccess}
import ch.epfl.bluebrain.nexus.ship.files.FileProcessor.{logger, patchMediaType}
import ch.epfl.bluebrain.nexus.ship.files.FileWiring._
import ch.epfl.bluebrain.nexus.ship.storages.StorageWiring
Expand Down Expand Up @@ -68,27 +68,29 @@ class FileProcessor private (
event match {
case e: FileCreated =>
val attrs = e.attributes
val customMetadata = Some(getCustomMetadata(attrs))
fileCopier.copyFile(e.project, attrs).flatMap {
case CopySuccess(newPath) =>
val newMediaType = patchMediaType(attrs.filename, attrs.mediaType)
val linkRequest = FileLinkRequest(newPath, newMediaType, customMetadata)
val newMediaType = patchMediaType(attrs.filename, attrs.mediaType)
val newAttrs = e.attributes.copy(mediaType = newMediaType)
val customMetadata = Some(getCustomMetadata(newAttrs))
fileCopier.copyFile(e.project, newAttrs, attrs.mediaType != newMediaType).flatMap {
case FileCopySuccess(newPath) =>
val linkRequest = FileLinkRequest(newPath, newMediaType, customMetadata)
files
.linkFile(Some(event.id), project, None, linkRequest, e.tag)
.as(ImportStatus.Success)
case CopySkipped => IO.pure(ImportStatus.Dropped)
case FileCopySkipped => IO.pure(ImportStatus.Dropped)
}
case e: FileUpdated =>
val attrs = e.attributes
val customMetadata = Some(getCustomMetadata(attrs))
fileCopier.copyFile(e.project, attrs).flatMap {
case CopySuccess(newPath) =>
val newMediaType = patchMediaType(attrs.filename, attrs.mediaType)
val linkRequest = FileLinkRequest(newPath, newMediaType, customMetadata)
val newMediaType = patchMediaType(attrs.filename, attrs.mediaType)
val newAttrs = e.attributes.copy(mediaType = newMediaType)
val customMetadata = Some(getCustomMetadata(newAttrs))
fileCopier.copyFile(e.project, newAttrs, attrs.mediaType != newMediaType).flatMap {
case FileCopySuccess(newPath) =>
val linkRequest = FileLinkRequest(newPath, newMediaType, customMetadata)
files
.updateLinkedFile(fileId, cRev, None, linkRequest, e.tag)
.as(ImportStatus.Success)
case CopySkipped => IO.pure(ImportStatus.Dropped)
case FileCopySkipped => IO.pure(ImportStatus.Dropped)
}
case e: FileCustomMetadataUpdated =>
files.updateMetadata(fileId, cRev, e.metadata, e.tag).as(ImportStatus.Success)
Expand Down
Loading