Skip to content

Commit

Permalink
Fix upload empty files on remote storage (#3241)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored May 9, 2022
1 parent 0d37d43 commit 163b273
Show file tree
Hide file tree
Showing 15 changed files with 92 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.files

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{EntityStreamSizeException, ExceptionWithErrorInfo, HttpEntity, Multipart}
import akka.http.scaladsl.model._
import akka.http.scaladsl.server._
import akka.http.scaladsl.unmarshalling.Unmarshaller.UnsupportedContentTypeException
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
Expand All @@ -11,7 +11,6 @@ import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileRejection.{FileTooLarge, InvalidMultipartFieldName, WrappedAkkaRejection}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{FileDescription, FileRejection}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import monix.bio.IO
import monix.execution.Scheduler

Expand All @@ -29,14 +28,14 @@ sealed trait FormDataExtractor {
* @param storageAvailableSpace
* the remaining available space on the storage
* @return
* the file description plus the stream of [[ByteString]] with the file content
* the file description plus the entity with the file content
*/
def apply(
id: Iri,
entity: HttpEntity,
maxFileSize: Long,
storageAvailableSpace: Option[Long]
): IO[FileRejection, (FileDescription, AkkaSource)]
): IO[FileRejection, (FileDescription, BodyPartEntity)]
}
object FormDataExtractor {

Expand All @@ -53,7 +52,7 @@ object FormDataExtractor {
entity: HttpEntity,
maxFileSize: Long,
storageAvailableSpace: Option[Long]
): IO[FileRejection, (FileDescription, AkkaSource)] = {
): IO[FileRejection, (FileDescription, BodyPartEntity)] = {
val sizeLimit = Math.min(storageAvailableSpace.getOrElse(Long.MaxValue), maxFileSize)
IO.deferFuture(um(entity.withSizeLimit(sizeLimit)))
.mapError {
Expand All @@ -76,7 +75,7 @@ object FormDataExtractor {
.mapAsync(parallelism = 1) {
case part if part.name == fieldName =>
FileDescription(part.filename.getOrElse("file"), part.entity.contentType).runToFuture.map { desc =>
Some(desc -> part.entity.dataBytes)
Some(desc -> part.entity)
}
case part =>
part.entity.discardBytes().future.as(None)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations

import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.{BodyPartEntity, Uri}
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{FileAttributes, FileDescription}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient
Expand All @@ -24,10 +23,10 @@ trait SaveFile {
*
* @param description
* the file description
* @param source
* the file stream
* @param entity
* the entity with the file content
*/
def apply(description: FileDescription, source: AkkaSource): IO[SaveFileRejection, FileAttributes]
def apply(description: FileDescription, entity: BodyPartEntity): IO[SaveFileRejection, FileAttributes]
}

object SaveFile {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk

import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.{BodyPartEntity, Uri}
import akka.stream.scaladsl.FileIO
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{FileAttributes, FileDescription}
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.DiskStorage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile.{digestSink, intermediateFolders}
Expand All @@ -15,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.utils.SinkUtils
import monix.bio.IO

import java.nio.file.StandardOpenOption._
import java.nio.file.{FileAlreadyExistsException, Files, OpenOption, Path, Paths}
import java.nio.file._
import java.util.UUID
import scala.concurrent.Future
import scala.util.Try
Expand All @@ -26,26 +25,28 @@ final class DiskStorageSaveFile(storage: DiskStorage)(implicit as: ActorSystem)

private val openOpts: Set[OpenOption] = Set(CREATE_NEW, WRITE)

override def apply(description: FileDescription, source: AkkaSource): IO[SaveFileRejection, FileAttributes] =
override def apply(description: FileDescription, entity: BodyPartEntity): IO[SaveFileRejection, FileAttributes] =
initLocation(description.uuid, description.filename).flatMap { case (fullPath, relativePath) =>
IO.deferFuture(
source.runWith(SinkUtils.combineMat(digestSink(storage.value.algorithm), FileIO.toPath(fullPath, openOpts)) {
case (digest, ioResult) if fullPath.toFile.exists() =>
Future.successful(
FileAttributes(
uuid = description.uuid,
location = Uri(fullPath.toUri.toString),
path = relativePath,
filename = description.filename,
mediaType = description.mediaType,
bytes = ioResult.count,
digest = digest,
origin = Client
entity.dataBytes.runWith(
SinkUtils.combineMat(digestSink(storage.value.algorithm), FileIO.toPath(fullPath, openOpts)) {
case (digest, ioResult) if fullPath.toFile.exists() =>
Future.successful(
FileAttributes(
uuid = description.uuid,
location = Uri(fullPath.toUri.toString),
path = relativePath,
filename = description.filename,
mediaType = description.mediaType,
bytes = ioResult.count,
digest = digest,
origin = Client
)
)
)
case _ =>
Future.failed(new IllegalArgumentException("File was not written"))
})
case _ =>
Future.failed(new IllegalArgumentException("File was not written"))
}
)
).mapError {
case _: FileAlreadyExistsException => ResourceAlreadyExists(fullPath.toString)
case err => UnexpectedSaveError(fullPath.toString, err.getMessage)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote

import akka.actor.ActorSystem
import akka.http.scaladsl.model.BodyPartEntity
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{FileAttributes, FileDescription}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
Expand All @@ -10,7 +11,6 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFil
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.SaveFileRejection
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.model.RemoteDiskStorageFileAttributes
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient
import ch.epfl.bluebrain.nexus.delta.sdk.model.identities.AuthToken
import monix.bio.IO
Expand All @@ -25,10 +25,10 @@ class RemoteDiskStorageSaveFile(storage: RemoteDiskStorage)(implicit

override def apply(
description: FileDescription,
source: AkkaSource
entity: BodyPartEntity
): IO[SaveFileRejection, FileAttributes] = {
val path = intermediateFolders(storage.project, description.uuid, description.filename)
client.createFile(storage.value.folder, path, source).map {
client.createFile(storage.value.folder, path, entity).map {
case RemoteDiskStorageFileAttributes(location, bytes, digest, mediaType) =>
FileAttributes(
uuid = description.uuid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote

import akka.actor.ActorSystem
import akka.http.scaladsl.client.RequestBuilding._
import akka.http.scaladsl.model.ContentTypes.`application/octet-stream`
import akka.http.scaladsl.model.HttpEntity
import akka.http.scaladsl.model.BodyPartEntity
import akka.http.scaladsl.model.Multipart.FormData
import akka.http.scaladsl.model.Multipart.FormData.BodyPart
import akka.http.scaladsl.model.StatusCodes._
Expand All @@ -26,6 +25,7 @@ import io.circe.generic.semiauto.deriveDecoder
import io.circe.syntax._
import io.circe.{Decoder, Json}
import monix.bio.{IO, UIO}

import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -72,13 +72,12 @@ final class RemoteDiskStorageClient(baseUri: BaseUri)(implicit client: HttpClien
* @param source
* the file content
*/
def createFile(bucket: Label, relativePath: Path, source: AkkaSource)(implicit
def createFile(bucket: Label, relativePath: Path, entity: BodyPartEntity)(implicit
cred: Option[AuthToken]
): IO[SaveFileRejection, RemoteDiskStorageFileAttributes] = {
val endpoint = baseUri.endpoint / "buckets" / bucket.value / "files" / relativePath
val bodyPartEntity = HttpEntity.IndefiniteLength(`application/octet-stream`, source)
val filename = relativePath.lastSegment.getOrElse("filename")
val multipartForm = FormData(BodyPart("file", bodyPartEntity, Map("filename" -> filename))).toEntity()
val endpoint = baseUri.endpoint / "buckets" / bucket.value / "files" / relativePath
val filename = relativePath.lastSegment.getOrElse("filename")
val multipartForm = FormData(BodyPart("file", entity, Map("filename" -> filename))).toEntity()
client.fromJsonTo[RemoteDiskStorageFileAttributes](Put(endpoint, multipartForm).withCredentials).mapError {
case HttpClientStatusError(_, `Conflict`, _) =>
SaveFileRejection.ResourceAlreadyExists(relativePath.toString)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Path.Slash
import akka.stream.alpakka.s3.{S3Attributes, S3Exception}
import akka.http.scaladsl.model.{BodyPartEntity, Uri}
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{S3Attributes, S3Exception}
import akka.stream.scaladsl.Sink
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{FileAttributes, FileDescription}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.sdk.AkkaSource
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.S3Storage
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.SaveFile.{digestSink, intermediateFolders, sizeSink}
Expand All @@ -27,7 +26,7 @@ final class S3StorageSaveFile(storage: S3Storage)(implicit config: StorageTypeCo

override def apply(
description: FileDescription,
source: AkkaSource
entity: BodyPartEntity
): IO[SaveFileRejection, FileAttributes] = {
val attributes = S3Attributes.settings(storage.value.alpakkaSettings(config))
val path = intermediateFolders(storage.project, description.uuid, description.filename)
Expand All @@ -39,7 +38,7 @@ final class S3StorageSaveFile(storage: S3Storage)(implicit config: StorageTypeCo
.runWith(Sink.last)
.flatMap {
case None =>
source.runWith(SinkUtils.combineMat(digestSink(storage.value.algorithm), sizeSink, s3Sink) {
entity.dataBytes.runWith(SinkUtils.combineMat(digestSink(storage.value.algorithm), sizeSink, s3Sink) {
case (digest, bytes, s3Result) =>
Future.successful(
FileAttributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ class FormDataExtractorSpec
)
.toEntity()

val expectedDescription = FileDescription(uuid, "file.txt", Some(`text/plain(UTF-8)`))
val (description, source) = extractor(iri, entity, 179, None).accepted
val expectedDescription = FileDescription(uuid, "file.txt", Some(`text/plain(UTF-8)`))
val (description, resultEntity) = extractor(iri, entity, 179, None).accepted
description shouldEqual expectedDescription
consume(source) shouldEqual content
consume(resultEntity.dataBytes) shouldEqual content
}

"fail to be extracted if no file part exists found" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes.`text/plain(UTF-8)`
import akka.http.scaladsl.model.Uri
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.{HttpEntity, Uri}
import akka.testkit.TestKit
import akka.util.ByteString
import ch.epfl.bluebrain.nexus.delta.kernel.Secret
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
Expand Down Expand Up @@ -51,12 +49,12 @@ class DiskStorageSaveFileSpec
val storage = DiskStorage(iri, project, value, Map.empty, Secret(Json.obj()))
val uuid = UUID.fromString("8049ba90-7cc6-4de5-93a1-802c04200dcc")
val content = "file content"
val source = Source(content.map(c => ByteString(c.toString)))
val entity = HttpEntity(content)

"save a file to a volume" in {
val description = FileDescription(uuid, "myfile.txt", Some(`text/plain(UTF-8)`))

val attributes = storage.saveFile.apply(description, source).accepted
val attributes = storage.saveFile.apply(description, entity).accepted

Files.readString(file.value) shouldEqual content

Expand All @@ -78,7 +76,7 @@ class DiskStorageSaveFileSpec

"fail attempting to save the same file again" in {
val description = FileDescription(uuid, "myfile.txt", Some(`text/plain(UTF-8)`))
storage.saveFile.apply(description, source).rejectedWith[ResourceAlreadyExists]
storage.saveFile.apply(description, entity).rejectedWith[ResourceAlreadyExists]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes.`text/plain(UTF-8)`
import akka.http.scaladsl.model.Uri
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.{HttpEntity, Uri}
import akka.testkit.TestKit
import akka.util.ByteString
import ch.epfl.bluebrain.nexus.delta.kernel.Secret
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin.Client
Expand Down Expand Up @@ -77,7 +75,7 @@ class RemoteStorageSaveAndFetchFileSpec(docker: RemoteStorageDocker)

"RemoteDiskStorage operations" should {
val content = "file content"
val source = Source(content.map(c => ByteString(c.toString)))
val entity = HttpEntity(content)

val attributes = FileAttributes(
uuid,
Expand All @@ -92,7 +90,7 @@ class RemoteStorageSaveAndFetchFileSpec(docker: RemoteStorageDocker)

"save a file to a folder" in {
val description = FileDescription(uuid, filename, Some(`text/plain(UTF-8)`))
storage.saveFile.apply(description, source).accepted shouldEqual attributes
storage.saveFile.apply(description, entity).accepted shouldEqual attributes
}

"fetch a file from a folder" in {
Expand All @@ -113,7 +111,7 @@ class RemoteStorageSaveAndFetchFileSpec(docker: RemoteStorageDocker)

"fail attempting to save the same file again" in {
val description = FileDescription(uuid, "myfile.txt", Some(`text/plain(UTF-8)`))
storage.saveFile.apply(description, source).rejectedWith[ResourceAlreadyExists]
storage.saveFile.apply(description, entity).rejectedWith[ResourceAlreadyExists]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes.`text/plain(UTF-8)`
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.{HttpEntity, StatusCodes, Uri}
import akka.testkit.TestKit
import akka.util.ByteString
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.{ComputedDigest, NotComputedDigest}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.DigestAlgorithm
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.AkkaSourceHelpers
Expand Down Expand Up @@ -56,7 +54,7 @@ class RemoteStorageClientSpec(docker: RemoteStorageDocker)

implicit val cred: Option[AuthToken] = None
val content = RemoteStorageDocker.Content
val source = Source(content.map(c => ByteString(c.toString)))
val entity = HttpEntity(content)
val attributes = RemoteDiskStorageFileAttributes(
location = s"file:///app/$BucketName/nexus/my/file.txt",
bytes = 12,
Expand All @@ -75,7 +73,7 @@ class RemoteStorageClientSpec(docker: RemoteStorageDocker)
}

"create a file" in {
client.createFile(bucket, Uri.Path("my/file.txt"), source).accepted shouldEqual attributes
client.createFile(bucket, Uri.Path("my/file.txt"), entity).accepted shouldEqual attributes
}

"get a file" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes.`text/plain(UTF-8)`
import akka.http.scaladsl.model.Uri
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.{HttpEntity, Uri}
import akka.testkit.TestKit
import akka.util.ByteString
import ch.epfl.bluebrain.nexus.delta.kernel.Secret
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.Digest.ComputedDigest
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.FileAttributes.FileAttributesOrigin
Expand Down Expand Up @@ -88,12 +86,12 @@ class S3StorageLinkFileSpec(docker: MinioDocker)

"S3Storage linking operations" should {
val content = "file content"
val source = Source(content.map(c => ByteString(c.toString)))
val entity = HttpEntity(content)

val description = FileDescription(uuid, filename, Some(`text/plain(UTF-8)`))

"succeed" in {
storage.saveFile.apply(description, source).accepted shouldEqual attributes
storage.saveFile.apply(description, entity).accepted shouldEqual attributes

val linkAttributes = attributes.copy(origin = FileAttributesOrigin.Storage)
storage.linkFile.apply(attributes.path, description).accepted shouldEqual linkAttributes
Expand Down
Loading

0 comments on commit 163b273

Please sign in to comment.