diff --git a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileCopier.scala b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileCopier.scala
index db6f66f82d..4bf436a6e8 100644
--- a/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileCopier.scala
+++ b/ship/src/main/scala/ch/epfl/bluebrain/nexus/ship/files/FileCopier.scala
@@ -3,7 +3,8 @@ package ch.epfl.bluebrain.nexus.ship.files
 import akka.http.scaladsl.model.Uri
 import akka.http.scaladsl.model.Uri.Path
 import cats.effect.IO
-import ch.epfl.bluebrain.nexus.delta.kernel.Logger
+import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategy.logError
+import ch.epfl.bluebrain.nexus.delta.kernel.{Logger, RetryStrategy}
 import ch.epfl.bluebrain.nexus.delta.kernel.utils.UrlUtils
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes
 import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{CopyOptions, S3LocationGenerator}
@@ -12,6 +13,8 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
 import ch.epfl.bluebrain.nexus.ship.config.FileProcessingConfig
 import ch.epfl.bluebrain.nexus.ship.files.FileCopier.CopyResult
 import ch.epfl.bluebrain.nexus.ship.files.FileCopier.CopyResult.{CopySkipped, CopySuccess}
+import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
+import software.amazon.awssdk.services.s3.model.S3Exception
 
 import scala.concurrent.duration.DurationInt
 
@@ -27,6 +30,13 @@ object FileCopier {
 
   private val longCopyThreshold = 5.seconds
 
+  private val copyRetryStrategy: RetryStrategy[S3Exception] = RetryStrategy.constant(
+    30.seconds,
+    10,
+    e => e.statusCode() >= 500 && e.statusCode() < 600,
+    logError(logger, "s3Copy")
+  )
+
   sealed trait CopyResult extends Product with Serializable
 
   object CopyResult {
@@ -44,40 +54,41 @@ object FileCopier {
     val importBucket      = config.importBucket
     val targetBucket      = config.targetBucket
     val locationGenerator = new S3LocationGenerator(config.prefix.getOrElse(Path.Empty))
-    (project: ProjectRef, attributes: FileAttributes) => {
-      val origin          = attributes.path
-      val patchedFileName = if (attributes.filename.isEmpty) "file" else attributes.filename
-      val target          = locationGenerator.file(project, attributes.uuid, patchedFileName).path
-      val FIVE_GB         = 5_000_000_000L
-
-      val originKey = UrlUtils.decode(origin)
-      val targetKey = UrlUtils.decode(target)
-
-      val copyOptions = CopyOptions(overwriteTarget = false, attributes.mediaType)
-
-      def copy = {
-        if (attributes.bytes >= FIVE_GB) {
-          logger.info(s"Attempting to copy a large file from $importBucket/$originKey to $targetBucket/$targetKey") >>
-            s3StorageClient.copyObjectMultiPart(importBucket, originKey, targetBucket, targetKey, copyOptions)
-        } else
-          s3StorageClient.copyObject(importBucket, originKey, targetBucket, targetKey, copyOptions)
-      }.timed.flatMap { case (duration, _) =>
-        IO.whenA(duration > longCopyThreshold)(
-          logger.info(s"Copy file ${attributes.path} of size ${attributes.bytes} took ${duration.toSeconds} seconds.")
-        )
-      }
-
-      for {
-        isObject <- s3StorageClient.objectExists(importBucket, originKey)
-        isFolder <-
-          if (isObject) IO.pure(false) else s3StorageClient.listObjectsV2(importBucket, originKey).map(_.hasContents)
-        _        <- IO.whenA(isObject) { copy }
-        _        <- IO.whenA(isFolder) { logger.info(s"$target has been found to be a folder, skipping the file copy...") }
-        _        <- IO.whenA(!isFolder && !isObject) {
-                      logger.error(s"$target is neither an object or folder, something is wrong.")
-                    }
-      } yield if (isObject) CopySuccess(target) else CopySkipped
-    }
+    (project: ProjectRef, attributes: FileAttributes) =>
+      {
+        val origin          = attributes.path
+        val patchedFileName = if (attributes.filename.isEmpty) "file" else attributes.filename
+        val target          = locationGenerator.file(project, attributes.uuid, patchedFileName).path
+        val FIVE_GB         = 5_000_000_000L
+
+        val originKey = UrlUtils.decode(origin)
+        val targetKey = UrlUtils.decode(target)
+
+        val copyOptions = CopyOptions(overwriteTarget = false, attributes.mediaType)
+
+        def copy = {
+          if (attributes.bytes >= FIVE_GB) {
+            logger.info(s"Attempting to copy a large file from $importBucket/$originKey to $targetBucket/$targetKey") >>
+              s3StorageClient.copyObjectMultiPart(importBucket, originKey, targetBucket, targetKey, copyOptions)
+          } else
+            s3StorageClient.copyObject(importBucket, originKey, targetBucket, targetKey, copyOptions)
+        }.timed.flatMap { case (duration, _) =>
+          IO.whenA(duration > longCopyThreshold)(
+            logger.info(s"Copy file ${attributes.path} of size ${attributes.bytes} took ${duration.toSeconds} seconds.")
+          )
+        }
+
+        for {
+          isObject <- s3StorageClient.objectExists(importBucket, originKey)
+          isFolder <-
+            if (isObject) IO.pure(false) else s3StorageClient.listObjectsV2(importBucket, originKey).map(_.hasContents)
+          _        <- IO.whenA(isObject) { copy }
+          _        <- IO.whenA(isFolder) { logger.info(s"$target has been found to be a folder, skipping the file copy...") }
+          _        <- IO.whenA(!isFolder && !isObject) {
+                        logger.error(s"$target is neither an object or folder, something is wrong.")
+                      }
+        } yield if (isObject) CopySuccess(target) else CopySkipped
+      }.retry(copyRetryStrategy)
   }
 
   def apply(): FileCopier = (_: ProjectRef, attributes: FileAttributes) => IO.pure(CopySuccess(attributes.path))
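
Note on the policy this diff installs: `RetryStrategy.constant(30.seconds, 10, ...)` re-runs the whole copy effect at a fixed 30-second interval, up to 10 times, but only when S3 reports a server-side (5xx) failure; client errors (4xx) still fail immediately. A minimal standalone sketch of the same semantics in plain cats-effect follows; `S3Error` and `retryOn5xx` are hypothetical names used for illustration only, whereas the real code goes through `RetryStrategy.constant` and the `.retry` syntax imported from `ch.epfl.bluebrain.nexus.delta.sdk.syntax._`.

import scala.concurrent.duration._
import cats.effect.IO

// Hypothetical stand-in for software.amazon.awssdk.services.s3.model.S3Exception,
// which exposes the HTTP status code S3 returned.
final case class S3Error(statusCode: Int) extends Exception(s"S3 returned $statusCode")

// Constant-delay retry restricted to server-side (5xx) errors: transient failures
// such as 503 SlowDown are retried up to `maxRetries` times with a fixed pause,
// while 4xx errors and anything non-S3 are re-raised immediately.
def retryOn5xx[A](io: IO[A], delay: FiniteDuration = 30.seconds, maxRetries: Int = 10): IO[A] =
  io.handleErrorWith {
    case e: S3Error if e.statusCode >= 500 && e.statusCode < 600 && maxRetries > 0 =>
      IO.sleep(delay) >> retryOn5xx(io, delay, maxRetries - 1)
    case other =>
      IO.raiseError(other)
  }

Under such a policy a persistently failing copy gives up after roughly 10 × 30s = 5 minutes of waiting, which bounds how long a single bad object can stall a batch import while still riding out short S3 throttling episodes.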