From 10c688f06785d8a471a30124097a566776ec0e97 Mon Sep 17 00:00:00 2001 From: Skyler Calaman <54462713+Blckbrry-Pi@users.noreply.github.com> Date: Fri, 29 Mar 2024 20:02:35 -0400 Subject: [PATCH] feat: Create `uploads` module --- modules/uploads/config.ts | 23 ++ .../migration.sql | 27 ++ .../uploads/db/migrations/migration_lock.toml | 3 + modules/uploads/db/schema.prisma | 33 ++ modules/uploads/module.yaml | 45 +++ modules/uploads/scripts/complete.ts | 140 +++++++++ modules/uploads/scripts/delete.ts | 86 ++++++ modules/uploads/scripts/get.ts | 48 +++ .../uploads/scripts/get_public_file_urls.ts | 77 +++++ modules/uploads/scripts/list_for_user.ts | 44 +++ modules/uploads/scripts/prepare.ts | 168 ++++++++++ modules/uploads/tests/e2e.ts | 100 ++++++ modules/uploads/tests/multipart.ts | 123 ++++++++ modules/uploads/utils/bucket.ts | 286 ++++++++++++++++++ modules/uploads/utils/data_size.ts | 35 +++ modules/uploads/utils/env.ts | 40 +++ modules/uploads/utils/types.ts | 81 +++++ tests/basic/backend.yaml | 10 + 18 files changed, 1369 insertions(+) create mode 100644 modules/uploads/config.ts create mode 100644 modules/uploads/db/migrations/20240419133543_initial_setup/migration.sql create mode 100644 modules/uploads/db/migrations/migration_lock.toml create mode 100644 modules/uploads/db/schema.prisma create mode 100644 modules/uploads/module.yaml create mode 100644 modules/uploads/scripts/complete.ts create mode 100644 modules/uploads/scripts/delete.ts create mode 100644 modules/uploads/scripts/get.ts create mode 100644 modules/uploads/scripts/get_public_file_urls.ts create mode 100644 modules/uploads/scripts/list_for_user.ts create mode 100644 modules/uploads/scripts/prepare.ts create mode 100644 modules/uploads/tests/e2e.ts create mode 100644 modules/uploads/tests/multipart.ts create mode 100644 modules/uploads/utils/bucket.ts create mode 100644 modules/uploads/utils/data_size.ts create mode 100644 modules/uploads/utils/env.ts create mode 100644 modules/uploads/utils/types.ts diff --git a/modules/uploads/config.ts b/modules/uploads/config.ts new file mode 100644 index 00000000..43ac4ca6 --- /dev/null +++ b/modules/uploads/config.ts @@ -0,0 +1,23 @@ +import { UploadSize } from "./utils/data_size.ts"; + +export interface Config { + maxUploadSize: UploadSize; + maxMultipartUploadSize: UploadSize; + maxFilesPerUpload?: number; + defaultMultipartChunkSize?: UploadSize; + + s3: { + bucketName: string; + region: string; + endpoint: string; + + accessKeyId?: string; + secretAccessKey?: string; + }; +} + +export const DEFAULT_MAX_FILES_PER_UPLOAD = 10; + +export const DEFAULT_MAX_UPLOAD_SIZE: UploadSize = "30mib"; +export const DEFAULT_MAX_MULTIPART_UPLOAD_SIZE: UploadSize = "10gib"; +export const DEFAULT_MULTIPART_CHUNK_SIZE: UploadSize = "10mib"; diff --git a/modules/uploads/db/migrations/20240419133543_initial_setup/migration.sql b/modules/uploads/db/migrations/20240419133543_initial_setup/migration.sql new file mode 100644 index 00000000..73227e16 --- /dev/null +++ b/modules/uploads/db/migrations/20240419133543_initial_setup/migration.sql @@ -0,0 +1,27 @@ +-- CreateTable +CREATE TABLE "Upload" ( + "id" UUID NOT NULL, + "userId" UUID, + "bucket" TEXT NOT NULL, + "contentLength" BIGINT NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + "completedAt" TIMESTAMP(3), + "deletedAt" TIMESTAMP(3), + + CONSTRAINT "Upload_pkey" PRIMARY KEY ("id") +); + +-- CreateTable +CREATE TABLE "Files" ( + "uploadId" UUID NOT NULL, + "multipartUploadId" TEXT, + 
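-- Set to the S3 multipart upload ID when the file is uploaded in parts; NULL for single-part uploads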
"path" TEXT NOT NULL, + "mime" TEXT, + "contentLength" BIGINT NOT NULL, + + CONSTRAINT "Files_pkey" PRIMARY KEY ("uploadId","path") +); + +-- AddForeignKey +ALTER TABLE "Files" ADD CONSTRAINT "Files_uploadId_fkey" FOREIGN KEY ("uploadId") REFERENCES "Upload"("id") ON DELETE RESTRICT ON UPDATE CASCADE; diff --git a/modules/uploads/db/migrations/migration_lock.toml b/modules/uploads/db/migrations/migration_lock.toml new file mode 100644 index 00000000..fbffa92c --- /dev/null +++ b/modules/uploads/db/migrations/migration_lock.toml @@ -0,0 +1,3 @@ +# Please do not edit this file manually +# It should be added in your version-control system (i.e. Git) +provider = "postgresql" \ No newline at end of file diff --git a/modules/uploads/db/schema.prisma b/modules/uploads/db/schema.prisma new file mode 100644 index 00000000..e29bb4e6 --- /dev/null +++ b/modules/uploads/db/schema.prisma @@ -0,0 +1,33 @@ +// Do not modify this `datasource` block +datasource db { + provider = "postgresql" + url = env("DATABASE_URL") +} + +model Upload { + id String @id @default(uuid()) @db.Uuid + userId String? @db.Uuid + + bucket String + contentLength BigInt + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + completedAt DateTime? + deletedAt DateTime? + + files Files[] @relation("Files") +} + +model Files { + uploadId String @db.Uuid + upload Upload @relation("Files", fields: [uploadId], references: [id]) + + multipartUploadId String? + + path String + mime String? + contentLength BigInt + + @@id([uploadId, path]) +} diff --git a/modules/uploads/module.yaml b/modules/uploads/module.yaml new file mode 100644 index 00000000..179e8ad9 --- /dev/null +++ b/modules/uploads/module.yaml @@ -0,0 +1,45 @@ +scripts: + prepare: + name: Prepare Upload + description: Prepare an upload batch for data transfer + complete: + name: Complete Upload + description: Alert the module that the upload has been completed + get: + name: Get Upload Metadata + description: Get the metadata (including contained files) for specified upload IDs + get_public_file_urls: + name: Get File Link + description: Get presigned download links for each of the specified files + list_for_user: + name: List Uploads for Users + description: Get a list of upload IDs associated with the specified user IDs + delete: + name: Delete Upload + description: Removes the upload and deletes the files from the bucket +errors: + no_files: + name: No Files Provided + description: An upload must have at least 1 file + too_many_files: + name: Too Many Files Provided + description: There is a limit to how many files can be put into a single upload (see config) + duplicate_paths: + name: Duplicate Paths Provided + description: An upload cannot contain 2 files with the same paths (see `cause` for offending paths) + size_limit_exceeded: + name: Combined Size Limit Exceeded + description: There is a maximum total size per upload (see config) + upload_not_found: + name: Upload Not Found + description: The provided upload ID didn't match any known existing uploads + upload_already_completed: + name: Upload Already completed + description: \`complete\` was already called on this upload + s3_not_configured: + name: S3 Not Configured + description: The S3 bucket is not configured (missing env variables) + multipart_upload_completion_fail: + name: Multipart Upload Completion Failure + description: The multipart upload failed to complete (see `cause` for more information) +dependencies: {} diff --git a/modules/uploads/scripts/complete.ts 
b/modules/uploads/scripts/complete.ts new file mode 100644 index 00000000..097676b6 --- /dev/null +++ b/modules/uploads/scripts/complete.ts @@ -0,0 +1,140 @@ +import { RuntimeError, ScriptContext } from "../_gen/scripts/complete.ts"; +import { keyExists, getMultipartUploadParts, completeMultipartUpload } from "../utils/bucket.ts"; +import { getS3EnvConfig } from "../utils/env.ts"; +import { + prismaToOutputWithFiles, + getKey, + Upload, +} from "../utils/types.ts"; + +export interface Request { + uploadId: string; +} + +export interface Response { + upload: Upload; +} + +export async function run( + ctx: ScriptContext, + req: Request, +): Promise { + const s3 = getS3EnvConfig(ctx.userConfig.s3); + if (!s3) throw new RuntimeError("s3_not_configured"); + + const newUpload = await ctx.db.$transaction(async (db) => { + // Find the upload by ID + const upload = await db.upload.findFirst({ + where: { + id: req.uploadId, + }, + select: { + id: true, + userId: true, + bucket: true, + contentLength: true, + files: true, + createdAt: true, + updatedAt: true, + completedAt: true, + }, + }); + + // Error if the upload wasn't prepared + if (!upload) { + throw new RuntimeError( + "upload_not_found", + { + meta: { uploadId: req.uploadId }, + }, + ); + } + + // Check with S3 to see if the files were uploaded + const fileExistencePromises = upload.files.map( + async file => { + // If the file was uploaded in parts, complete the multipart upload + if (file.multipartUploadId) { + try { + const parts = await getMultipartUploadParts( + s3, + getKey(upload.id, file.path), + file.multipartUploadId, + ); + if (parts.length === 0) return false; + + await completeMultipartUpload( + s3, + getKey(upload.id, file.path), + file.multipartUploadId, + parts, + ); + + + } catch (e) { + throw new RuntimeError( + "multipart_upload_completion_fail", + { cause: e }, + ) + } + + return true; + } else { + // Check if the file exists + return await keyExists(s3, getKey(upload.id, file.path)) + } + + }, + ); + const fileExistence = await Promise.all(fileExistencePromises); + const filesAllExist = fileExistence.every(Boolean); + if (!filesAllExist) { + const missingFiles = upload.files.filter((_, i) => !fileExistence[i]); + throw new RuntimeError( + "files_not_uploaded", + { + meta: { + uploadId: req.uploadId, + missingFiles: missingFiles.map((file) => file.path), + }, + }, + ); + } + + // Error if `complete` was already called with this ID + if (upload.completedAt !== null) { + throw new RuntimeError( + "upload_already_completed", + { + meta: { uploadId: req.uploadId }, + }, + ); + } + + // Update the upload to mark it as completed + const completedUpload = await db.upload.update({ + where: { + id: req.uploadId, + }, + data: { + completedAt: new Date().toISOString(), + }, + select: { + id: true, + userId: true, + bucket: true, + contentLength: true, + files: true, + createdAt: true, + updatedAt: true, + completedAt: true, + }, + }); + + return completedUpload; + }); + + return { + upload: prismaToOutputWithFiles(newUpload), + }; +} diff --git a/modules/uploads/scripts/delete.ts b/modules/uploads/scripts/delete.ts new file mode 100644 index 00000000..202555c3 --- /dev/null +++ b/modules/uploads/scripts/delete.ts @@ -0,0 +1,86 @@ +import { RuntimeError, ScriptContext } from "../_gen/scripts/delete.ts"; +import { getKey } from "../utils/types.ts"; +import { deleteKeys } from "../utils/bucket.ts"; +import { getS3EnvConfig } from "../utils/env.ts"; + +export interface Request { + uploadId: string; +} + +export interface Response { + 
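/** Total content length of the deleted upload; stringified because the underlying Prisma column is a BigInt */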
bytesDeleted: string; +} + +export async function run( + ctx: ScriptContext, + req: Request, +): Promise { + const s3 = getS3EnvConfig(ctx.userConfig.s3); + if (!s3) throw new RuntimeError("s3_not_configured"); + + const bytesDeleted = await ctx.db.$transaction(async (db) => { + const upload = await db.upload.findFirst({ + where: { + id: req.uploadId, + completedAt: { not: null }, + deletedAt: null, + }, + select: { + id: true, + userId: true, + bucket: true, + contentLength: true, + files: true, + createdAt: true, + updatedAt: true, + completedAt: true, + }, + }); + if (!upload) { + throw new RuntimeError( + "upload_not_found", + { + meta: { + modified: false, + uploadId: req.uploadId, + }, + }, + ); + } + + const filesToDelete = upload.files.map((file) => + getKey(file.uploadId, file.path) + ); + const deleteResults = await deleteKeys(s3, filesToDelete); + + const failures = upload.files + .map((file, i) => [file, deleteResults[i]] as const) + .filter(([, successfullyDeleted]) => !successfullyDeleted) + .map(([file]) => file); + + if (failures.length) { + const failedPaths = JSON.stringify(failures.map((file) => file.path)); + throw new RuntimeError( + "failed_to_delete", + { + meta: { + modified: failures.length !== filesToDelete.length, + reason:`Failed to delete files with paths ${failedPaths}`, + }, + }, + ); + } + + await db.upload.update({ + where: { + id: req.uploadId, + }, + data: { + deletedAt: new Date().toISOString(), + }, + }); + + return upload.contentLength.toString(); + }); + return { bytesDeleted }; +} diff --git a/modules/uploads/scripts/get.ts b/modules/uploads/scripts/get.ts new file mode 100644 index 00000000..07f87fda --- /dev/null +++ b/modules/uploads/scripts/get.ts @@ -0,0 +1,48 @@ +import { ScriptContext } from "../_gen/scripts/get.ts"; +import { + PrismaUploadWithOptionalFiles, + UploadWithOptionalFiles, + prismaToOutput, +} from "../utils/types.ts"; + +export interface Request { + uploadIds: string[]; + includeFiles?: boolean; +} + +export interface Response { + uploads: UploadWithOptionalFiles[]; +} + +export async function run( + ctx: ScriptContext, + req: Request, +): Promise { + // Find uploads that match the IDs in the request + const dbUploads = await ctx.db.upload.findMany({ + where: { + id: { + in: req.uploadIds, + }, + completedAt: { not: null }, + deletedAt: null, + }, + select: { + id: true, + userId: true, + bucket: true, + contentLength: true, + files: !!req.includeFiles, + createdAt: true, + updatedAt: true, + completedAt: true, + }, + orderBy: { + id: "asc", + }, + }) as PrismaUploadWithOptionalFiles[]; + + return { + uploads: dbUploads.map(up => prismaToOutput(up)), + }; +} diff --git a/modules/uploads/scripts/get_public_file_urls.ts b/modules/uploads/scripts/get_public_file_urls.ts new file mode 100644 index 00000000..41252f52 --- /dev/null +++ b/modules/uploads/scripts/get_public_file_urls.ts @@ -0,0 +1,77 @@ +import { ScriptContext, RuntimeError } from "../_gen/scripts/get_public_file_urls.ts"; +import { getKey, UploadFile } from "../utils/types.ts"; +import { getPresignedGetUrl } from "../utils/bucket.ts"; +import { getS3EnvConfig } from "../utils/env.ts"; + +export interface Request { + files: { uploadId: string; path: string }[]; + validSecs?: number; +} + +export interface Response { + files: (UploadFile & { uploadId: string; url: string })[]; +} + +export async function run( + ctx: ScriptContext, + req: Request, +): Promise { + const s3 = getS3EnvConfig(ctx.userConfig.s3); + if (!s3) throw new RuntimeError("s3_not_configured"); + + const 
dbFiles = await ctx.db.files.findMany({ + where: { + uploadId: { + in: req.files.map((file) => file.uploadId), + }, + path: { + in: req.files.map((file) => file.path), + }, + upload: { + completedAt: { not: null }, + deletedAt: null, + }, + }, + select: { + uploadId: true, + path: true, + contentLength: true, + mime: true, + }, + }); + + const keys = new Set( + req.files.map((file) => getKey(file.uploadId, file.path)), + ); + const map = new Map( + dbFiles.map((file) => [getKey(file.uploadId, file.path), file]), + ); + for (const [mapKey] of map) { + // Remove any keys that don't have a corresponding file + if (!keys.has(mapKey)) map.delete(mapKey); + } + + // Create presigned URLs that can be accessed using a simple GET request + const formattedDownloadPromises = Array.from(map) + .map(([key, file]) => ({ + ...file, + url: getPresignedGetUrl( + s3, + key, + file.mime, + req.validSecs ?? 60 * 60, + ), + })) + .map(async (file) => ({ + ...file, + contentLength: file.contentLength.toString(), + url: await file.url, + })); + + // Wait for all presigned URLs to be created + const formattedUploads = await Promise.all(formattedDownloadPromises); + + return { + files: formattedUploads, + }; +} diff --git a/modules/uploads/scripts/list_for_user.ts b/modules/uploads/scripts/list_for_user.ts new file mode 100644 index 00000000..e14ffe73 --- /dev/null +++ b/modules/uploads/scripts/list_for_user.ts @@ -0,0 +1,44 @@ +import { ScriptContext } from "../_gen/scripts/list_for_user.ts"; + +export interface Request { + userIds: string[]; +} + +export interface Response { + uploadIds: Record; +} + +export async function run( + ctx: ScriptContext, + req: Request, +): Promise { + // Find uploads that match the IDs in the request + const dbUploads = await ctx.db.upload.findMany({ + where: { + userId: { + in: req.userIds, + }, + completedAt: { not: null }, + deletedAt: null, + }, + select: { + id: true, + userId: true, + }, + orderBy: { + id: "asc", + }, + }); + + // Map each userId to an array of upload IDs with that associated user ID + // TODO: There may be a more efficient way to do this? Not sure. 
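// One possible single-pass alternative (untested sketch, same Response shape as below):
// bucket the rows by userId with a Map first, then look each user up when building
// the result:
//
//   const byUser = new Map<string, string[]>();
//   for (const upload of dbUploads) {
//     if (!upload.userId) continue;
//     byUser.set(upload.userId, [...(byUser.get(upload.userId) ?? []), upload.id]);
//   }
//   const uploadIds = Object.fromEntries(
//     req.userIds.map((userId) => [userId, { uploadIds: byUser.get(userId) ?? [] }]),
//   );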
+ const uploadIds = Object.fromEntries(req.userIds.map((userId) => { + const uploads = dbUploads.filter((upload) => upload.userId === userId); + return [ + userId, + { uploadIds: uploads.map((upload) => upload.id) }, + ] as const; + })); + + return { uploadIds }; +} diff --git a/modules/uploads/scripts/prepare.ts b/modules/uploads/scripts/prepare.ts new file mode 100644 index 00000000..946cda27 --- /dev/null +++ b/modules/uploads/scripts/prepare.ts @@ -0,0 +1,168 @@ +import { RuntimeError, ScriptContext } from "../_gen/scripts/prepare.ts"; +import { + PresignedUpload, + prismaToOutput, + UploadFile, +} from "../utils/types.ts"; +import { getPresignedPutUrl, getPresignedMultipartUploadUrls } from "../utils/bucket.ts"; +import { + DEFAULT_MAX_FILES_PER_UPLOAD, + DEFAULT_MAX_UPLOAD_SIZE, + DEFAULT_MAX_MULTIPART_UPLOAD_SIZE, + DEFAULT_MULTIPART_CHUNK_SIZE, +} from "../config.ts"; +import { getS3EnvConfig } from "../utils/env.ts"; +import { getBytes } from "../utils/data_size.ts"; + +export interface Request { + userId?: string; + files: (UploadFile & { multipart: boolean })[]; +} + +export interface Response { + upload: PresignedUpload; +} + +export async function run( + ctx: ScriptContext, + req: Request, +): Promise { + const s3 = getS3EnvConfig(ctx.userConfig.s3); + if (!s3) throw new RuntimeError("s3_not_configured"); + + // Ensure there are files in the upload + if (req.files.length === 0) { + throw new RuntimeError("no_files"); + } + + // Ensure the number of files is within the limit + const maxFilesPerUpload = ctx.userConfig.maxFilesPerUpload ?? DEFAULT_MAX_FILES_PER_UPLOAD; + if (req.files.length > maxFilesPerUpload) { + throw new RuntimeError( + "too_many_files", + { + meta: { + maxFilesPerUpload, + count: req.files.length, + }, + }, + ); + } + + // Ensure there are no duplicate paths + const paths = new Set(); + const duplicates = new Set(); + for (const file of req.files) { + if (paths.has(file.path)) { + duplicates.add(file.path); + } + paths.add(file.path); + } + if (duplicates.size > 0) { + throw new RuntimeError("duplicate_paths", { + meta: { paths: Array.from(duplicates) }, + }); + } + + // Ensure the total content length is within the limit for single part uploads + const singlepartUploadContentLength = req.files.filter(f => !f.multipart).reduce( + (acc, file) => acc + BigInt(file.contentLength), + 0n, + ); + const maxSinglepartUploadSize = ctx.userConfig.maxUploadSize ?? DEFAULT_MAX_UPLOAD_SIZE; + if (singlepartUploadContentLength > getBytes(maxSinglepartUploadSize)) { + throw new RuntimeError( + "size_limit_exceeded", + { + meta: { + maxUploadSize: maxSinglepartUploadSize, + uploadContentLength: singlepartUploadContentLength, + }, + }, + ); + } + + // Ensure the total content length is within the limit for multipart uploads + const multipartUploadContentLength = req.files.filter(f => f.multipart).reduce( + (acc, file) => acc + BigInt(file.contentLength), + 0n, + ); + const maxMultipartUploadSize = ctx.userConfig.maxMultipartUploadSize ?? 
DEFAULT_MAX_MULTIPART_UPLOAD_SIZE; + if (multipartUploadContentLength > getBytes(maxMultipartUploadSize)) { + throw new RuntimeError( + "size_limit_exceeded", + { + meta: { + maxMultipartUploadSize: maxMultipartUploadSize, + multipartUploadContentLength, + }, + }, + ); + } + + const uploadContentLength = singlepartUploadContentLength + multipartUploadContentLength; + + const uploadId = crypto.randomUUID(); + const presignedInputFilePromises = req.files.map(async (file) => { + if (file.multipart) { + const chunkSize = ctx.userConfig.defaultMultipartChunkSize ?? DEFAULT_MULTIPART_CHUNK_SIZE; + const { urls, multipartUploadId } = await getPresignedMultipartUploadUrls(s3, uploadId, file, getBytes(chunkSize)); + + return { + ...file, + presignedUrls: urls, + multipartUploadId, + }; + } else { + const { urls } = await getPresignedPutUrl(s3, uploadId, file); + + return { + ...file, + presignedUrls: urls, + multipartUploadId: null, + }; + } + }); + const presignedInputFiles = await Promise.all(presignedInputFilePromises); + + // Format the input files for prisma + const inputFiles = presignedInputFiles.map((file) => ({ + path: file.path, + mime: file.mime, + contentLength: BigInt(file.contentLength), + multipartUploadId: file.multipartUploadId, + })); + + // Create the upload in the database + const upload = await ctx.db.upload.create({ + data: { + id: uploadId, + userId: req.userId, + bucket: s3.bucket, + contentLength: uploadContentLength, + files: { + create: inputFiles, + }, + }, + select: { + id: true, + userId: true, + bucket: true, + contentLength: true, + + files: true, + + createdAt: true, + updatedAt: true, + completedAt: true, + }, + }); + + + return { + upload: { + ...prismaToOutput(upload), + files: presignedInputFiles, + }, + }; +} diff --git a/modules/uploads/tests/e2e.ts b/modules/uploads/tests/e2e.ts new file mode 100644 index 00000000..cedd0546 --- /dev/null +++ b/modules/uploads/tests/e2e.ts @@ -0,0 +1,100 @@ +import { test, TestContext } from "../_gen/test.ts"; +import { + assert, + assertEquals, +} from "https://deno.land/std@0.220.0/assert/mod.ts"; +import { faker } from "https://deno.land/x/deno_faker@v1.0.3/mod.ts"; + +test("e2e", async (ctx: TestContext) => { + const path = faker.system.fileName(); + const contentLength = String(faker.random.number(100)); + const mime = faker.system.mimeType(); + + const fileData = crypto.getRandomValues(new Uint8Array(parseInt(contentLength))); + + // Tell the module the metadata about the upload. + const { upload: presigned } = await ctx.modules.uploads.prepare({ + files: [ + { + path, + contentLength, + mime, + multipart: false, + }, + ], + }); + + // Upload the data using the presigned URL(s) returned + const uploadPutReq = await fetch( + presigned.files[0].presignedUrls[0].url, + { + method: "PUT", + body: fileData, + }, + ); + assert(uploadPutReq.ok); + + // Tell the module that the module had completed uploading. 
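// (`complete` checks with S3 that every file in the batch actually exists before
// marking the upload as completed, so this call fails if the PUT above did not land)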
+ const { upload: completed } = await ctx.modules.uploads.complete({ + uploadId: presigned.id, + }); + + // Ensure the presigned and completed uploads are the same, except for + // expected timestamp differences + assertEquals({ + ...presigned, + files: presigned.files.map((file) => ({ + path: file.path, + contentLength: file.contentLength, + mime: file.mime, + })), + completedAt: null, + updatedAt: null, + }, { + ...completed, + files: completed.files.map((file) => ({ + path: file.path, + contentLength: file.contentLength, + mime: file.mime, + })), + completedAt: null, + updatedAt: null, + }); + + // Lookup the completed upload + const { uploads: [retrieved] } = await ctx.modules.uploads.get({ + uploadIds: [completed.id], + includeFiles: true, + }); + assertEquals(completed, retrieved); + + // Get presigned URLs to download the files from + const { files: [{ url: fileDownloadUrl }] } = await ctx.modules.uploads + .getPublicFileUrls({ + files: [{ uploadId: completed.id, path: path }], + }); + + // Download the files, and make sure the data matches + const fileDownloadReq = await fetch(fileDownloadUrl); + const fileDownloadData = new Uint8Array(await fileDownloadReq.arrayBuffer()); + assertEquals(fileData, fileDownloadData); + + // Delete the file and assert that the amount of bytes deleted matches + // what's expected + const { bytesDeleted } = await ctx.modules.uploads.delete({ + uploadId: completed.id, + }); + assertEquals(bytesDeleted, completed.contentLength); + assertEquals(bytesDeleted, presigned.contentLength); + assertEquals(bytesDeleted, retrieved?.contentLength); + assertEquals(parseInt(bytesDeleted), fileData.byteLength); + + // Check that the upload can't still be retrieved + const { uploads: uploadList } = await ctx.modules.uploads.get({ + uploadIds: [completed.id], + }); + assertEquals(uploadList, []); + + const fileDownloadReqAfterDelete = await fetch(fileDownloadUrl); + assert(!fileDownloadReqAfterDelete.ok); +}); diff --git a/modules/uploads/tests/multipart.ts b/modules/uploads/tests/multipart.ts new file mode 100644 index 00000000..6ccaef1c --- /dev/null +++ b/modules/uploads/tests/multipart.ts @@ -0,0 +1,123 @@ +import { test, TestContext } from "../_gen/test.ts"; +import { + assert, + assertEquals, +} from "https://deno.land/std@0.220.0/assert/mod.ts"; +import { faker } from "https://deno.land/x/deno_faker@v1.0.3/mod.ts"; + +function randomBuffer(size: number): Uint8Array { + const buffer = new Uint8Array(size); + + const bytesPerChunk = 1024; + + for (let i = 0; i < size; i += bytesPerChunk) { + crypto.getRandomValues(buffer.slice(i, i + bytesPerChunk)); + } + + return buffer; +} + +test("multipart uploads", async (ctx: TestContext) => { + const path = faker.system.fileName(); + const contentLength = 20_000_000; // 20MB + const mime = faker.system.mimeType(); + + const fileData = randomBuffer(contentLength); + + // Tell the module the metadata about the upload. + const { upload: presigned } = await ctx.modules.uploads.prepare({ + files: [ + { + path, + contentLength: contentLength.toString(), + mime, + multipart: true, + }, + ], + }); + + + const { files: [{ presignedUrls }] } = presigned; + + for (const chunk of presignedUrls) { + // Upload the data using the presigned URL(s) returned + const uploadPutReq = await fetch( + chunk.url, + { + method: "PUT", + body: fileData.slice( + parseInt(chunk.offset), + parseInt(chunk.offset) + parseInt(chunk.contentLength), + ), + }, + ); + assert(uploadPutReq.ok); + } + + // Tell the module that the module had completed uploading. 
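// (for multipart files, `complete` lists the uploaded parts via ListParts and then
// issues CompleteMultipartUpload before marking the upload as completed)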
+ const { upload: completed } = await ctx.modules.uploads.complete({ + uploadId: presigned.id, + }); + + // Ensure the presigned and completed uploads are the same, except for + // expected timestamp differences + assertEquals({ + ...presigned, + files: presigned.files.map((file) => ({ + path: file.path, + contentLength: file.contentLength, + mime: file.mime, + })), + completedAt: null, + updatedAt: null, + }, { + ...completed, + files: completed.files.map((file) => ({ + path: file.path, + contentLength: file.contentLength, + mime: file.mime, + })), + completedAt: null, + updatedAt: null, + }); + + // Lookup the completed upload + const { uploads: [retrieved] } = await ctx.modules.uploads.get({ + uploadIds: [completed.id], + includeFiles: true, + }); + assertEquals(completed, retrieved); + + // Get presigned URLs to download the files from + const { files: [{ url: fileDownloadUrl }] } = await ctx.modules.uploads + .getPublicFileUrls({ + files: [{ uploadId: completed.id, path: path }], + }); + + // Download the files, and make sure the data matches + const fileDownloadReq = await fetch(fileDownloadUrl); + const fileDownloadData = new Uint8Array(await fileDownloadReq.arrayBuffer()); + + const fileDataHash = await crypto.subtle.digest("SHA-1", fileData); + const fileDownloadDataHash = await crypto.subtle.digest("SHA-1", fileDownloadData); + assertEquals(fileDataHash, fileDownloadDataHash); + + // Delete the file and assert that the amount of bytes deleted matches + // what's expected + const { bytesDeleted } = await ctx.modules.uploads.delete({ + uploadId: completed.id, + }); + assertEquals(bytesDeleted, completed.contentLength); + assertEquals(bytesDeleted, presigned.contentLength); + assertEquals(bytesDeleted, retrieved?.contentLength); + assertEquals(parseInt(bytesDeleted), fileData.byteLength); + + // Check that the upload can't still be retrieved + const { uploads: uploadList } = await ctx.modules.uploads.get({ + uploadIds: [completed.id], + }); + assertEquals(uploadList, []); + + const fileDownloadReqAfterDelete = await fetch(fileDownloadUrl); + assert(!fileDownloadReqAfterDelete.ok); +}); diff --git a/modules/uploads/utils/bucket.ts b/modules/uploads/utils/bucket.ts new file mode 100644 index 00000000..c58a065b --- /dev/null +++ b/modules/uploads/utils/bucket.ts @@ -0,0 +1,286 @@ +import { + HeadObjectCommand, // Check if object exists + GetObjectCommand, // Get URL uploaded file + DeleteObjectCommand, // Delete object + DeleteObjectsCommand, // Delete multiple objects + PutObjectCommand, // Get URL to upload single part file + + CreateMultipartUploadCommand, // Initiate a multipart upload + UploadPartCommand, // Create URL for multipart upload chunk + ListPartsCommand, // List parts of multipart upload + CompleteMultipartUploadCommand, // Complete multipart upload + + S3Client, +} from "npm:@aws-sdk/client-s3"; +import { getSignedUrl } from "npm:@aws-sdk/s3-request-presigner"; + +import { S3Config } from "./env.ts"; +import { getKey, UploadFile } from "./types.ts"; +import { PresignedUrl } from "./types.ts"; + +/** + * Create a new S3 client instance based on the S3Config + * + * @param config S3 Config + * @returns A new S3 client + */ +export function getClient(config: S3Config) { + return new S3Client({ + region: config.region, + credentials: { + accessKeyId: config.accessKeyId, + secretAccessKey: config.secretAccessKey, + }, + endpoint: config.endpoint, + defaultUserAgentProvider: () => + Promise.resolve([ + ["opengb/uploads", "0.1.0"], + ]), + }); +} + +/** + * Create a presigned 
URL to perform a SINGLE PART upload to S3 + * + * @param config S3 Config + * @param uploadId The ID of the Upload batch + * @param file The file to upload + * @param expirySeconds How long the upload URL should be valid for + * @returns A URL that exposes an HTTP PUT endpoint to upload the file + */ +export async function getPresignedPutUrl( + config: S3Config, + uploadId: string, + file: UploadFile, + expirySeconds = 60 * 60, +): Promise<{ key: string, urls: PresignedUrl[] }> { + const client = getClient(config); + + const key = getKey(uploadId, file.path); + const command = new PutObjectCommand({ + Bucket: config.bucket, + Key: key, + ContentType: file.mime ?? undefined, + ContentLength: parseInt(file.contentLength), + }); + const url = await getSignedUrl( + client, + command, + { expiresIn: expirySeconds }, + ); + + return { + urls: [{ + url, + partNumber: 0, + offset: "0", + contentLength: file.contentLength, + }], + key, + }; +} + +export async function getPresignedMultipartUploadUrls( + config: S3Config, + uploadId: string, + file: UploadFile, + chunkSize: bigint, + expirySeconds = 60 * 60 * 6, +): Promise<{ key: string, urls: PresignedUrl[], multipartUploadId: string }> { + const client = getClient(config); + + const key = getKey(uploadId, file.path); + const command = new CreateMultipartUploadCommand({ + Bucket: config.bucket, + Key: key, + ContentType: file.mime ?? undefined, + + }); + const multipartUpload = await client.send(command); + + const id = multipartUpload.UploadId; + if (!id) throw new Error("Multipart upload ID not returned"); + + // Round up to the nearest chunk count + const chunkCount = (BigInt(file.contentLength) + chunkSize - 1n) / chunkSize; + + const urls: PresignedUrl[] = []; + for (let i = 0n; i < chunkCount; i++) { + const offset = i * chunkSize; + const remaining = BigInt(file.contentLength) - offset; + const length = remaining < chunkSize ? remaining : chunkSize; + const partNumber = Number(i) + 1; // S3's part number is 1-based because reasons + + const command = new UploadPartCommand({ + Bucket: config.bucket, + Key: key, + UploadId: id, + ContentLength: Number(length), + PartNumber: partNumber, + }); + + const chunkPutUrl = await getSignedUrl( + client, + command, + { expiresIn: expirySeconds }, + ); + + urls.push({ + url: chunkPutUrl, + partNumber, + offset: offset.toString(), + contentLength: length.toString(), + }); + } + + return { + urls, + multipartUploadId: id, + key, + }; +} + + +export async function getMultipartUploadParts( + config: S3Config, + key: string, + multipartUploadId: string, +): Promise<{ PartNumber: number, ETag: string }[]> { + const client = getClient(config); + + console.log(multipartUploadId); + const command = new ListPartsCommand({ + Bucket: config.bucket, + Key: key, + UploadId: multipartUploadId, + }); + + const response = await client.send(command); + if (!response.Parts) return []; + + return response.Parts.map((part) => ({ + PartNumber: part.PartNumber ?? 0, + ETag: part.ETag ?? 
"", + })); +} + +export async function completeMultipartUpload( + config: S3Config, + key: string, + multipartUploadId: string, + parts: { PartNumber: number, ETag: string }[], +): Promise { + const client = getClient(config); + + const command = new CompleteMultipartUploadCommand({ + Bucket: config.bucket, + Key: key, + UploadId: multipartUploadId, + MultipartUpload: { + Parts: parts, + }, + }); + + await client.send(command); +} + +/** + * Create a presigned URL to get a file from S3 + * @param config S3 Config + * @param key A combination of the upload ID and the file path. (See `getKey` + * function) + * @param expirySeconds How long the URL should be valid for + * @returns A URL that exposes an HTTP GET endpoint to download the file + */ +export async function getPresignedGetUrl( + config: S3Config, + key: string, + mime: string | null, + expirySeconds = 60 * 60, +) { + const client = getClient(config); + + const command = new GetObjectCommand({ + Bucket: config.bucket, + Key: key, + ResponseContentType: mime ?? undefined, + }); + const url = await getSignedUrl( + client, + command, + { expiresIn: expirySeconds }, + ); + + return url; +}; + +/** + * Check if a key exists in the S3 bucket. (Used on `complete` script to check + * if the upload was actually completed.) + * + * @param config S3 Config + * @param key A combination of the upload ID and the file path to check. (See + * `getKey`) + * @returns Whether the key exists in the S3 bucket + */ +export async function keyExists( + config: S3Config, + key: string, +): Promise { + const client = getClient(config); + + const command = new HeadObjectCommand({ + Bucket: config.bucket, + Key: key, + }); + + try { + await client.send(command); + return true; + } catch (error) { + if (error.name === "NotFound") { + return false; + } + throw error; + } +} + +export async function deleteKey( + config: S3Config, + key: string, +): Promise { + const client = getClient(config); + + const command = new DeleteObjectCommand({ + Bucket: config.bucket, + Key: key, + }); + + const response = await client.send(command); + return response.DeleteMarker ?? false; +} + +export async function deleteKeys( + config: S3Config, + keys: string[], +): Promise { + const client = getClient(config); + + const command = new DeleteObjectsCommand({ + Bucket: config.bucket, + Delete: { + Objects: keys.map((key) => ({ Key: key })), + }, + }); + + const response = await client.send(command); + if (response.Deleted) { + const deletedKeys = response.Deleted.flatMap((obj) => + obj.Key ? 
[obj.Key] : [] + ); + const keySet = new Set(deletedKeys); + return keys.map((key) => keySet.has(key)); + } else { + return keys.map(() => false); + } +} diff --git a/modules/uploads/utils/data_size.ts b/modules/uploads/utils/data_size.ts new file mode 100644 index 00000000..e08a5a78 --- /dev/null +++ b/modules/uploads/utils/data_size.ts @@ -0,0 +1,35 @@ +type Units = "b" | "kb" | "mb" | "gb" | "tb" | "kib" | "mib" | "gib" | "tib"; + +export type UploadSize = `${number}${Units}`; + +export function getBytes(size: UploadSize): bigint { + const b = 1n; + const kb = 1000n * b; + const mb = 1000n * kb; + const gb = 1000n * mb; + const tb = 1000n * gb; + + const kib = 1024n * b; + const mib = 1024n * kib; + const gib = 1024n * mib; + const tib = 1024n * gib; + + switch (size.slice(-3)) { + case "kib": return BigInt(size.slice(0, -3)) * kib; + case "mib": return BigInt(size.slice(0, -3)) * mib; + case "gib": return BigInt(size.slice(0, -3)) * gib; + case "tib": return BigInt(size.slice(0, -3)) * tib; + } + + switch (size.slice(-2)) { + case "kb": return BigInt(size.slice(0, -2)) * kb; + case "mb": return BigInt(size.slice(0, -2)) * mb; + case "gb": return BigInt(size.slice(0, -2)) * gb; + case "tb": return BigInt(size.slice(0, -2)) * tb; + } + + return BigInt(size.slice(0, -1)) * b; +}; + + + diff --git a/modules/uploads/utils/env.ts b/modules/uploads/utils/env.ts new file mode 100644 index 00000000..fbf72ef3 --- /dev/null +++ b/modules/uploads/utils/env.ts @@ -0,0 +1,40 @@ +import { Config } from "../config.ts"; + +export interface S3EnvConfig { + S3_AWS_ACCESS_KEY_ID: string; + S3_AWS_SECRET_ACCESS_KEY: string; +} + +export interface S3Config { + bucket: string; + region: string; + endpoint: string; + + accessKeyId: string; + secretAccessKey: string; +} + +export function getS3EnvConfig(s3Cfg: Config["s3"]): S3Config | null { + const { + S3_AWS_ACCESS_KEY_ID, + S3_AWS_SECRET_ACCESS_KEY, + } = Deno.env.toObject() as Partial; + + const accessKeyId = S3_AWS_ACCESS_KEY_ID ?? s3Cfg.accessKeyId; + const secretAccessKey = S3_AWS_SECRET_ACCESS_KEY ?? 
s3Cfg.secretAccessKey; + + if ( + !accessKeyId || + !secretAccessKey + ) { + return null; + } + + return { + bucket: s3Cfg.bucketName, + region: s3Cfg.region, + endpoint: s3Cfg.endpoint, + accessKeyId, + secretAccessKey, + }; +} diff --git a/modules/uploads/utils/types.ts b/modules/uploads/utils/types.ts new file mode 100644 index 00000000..03ba37d1 --- /dev/null +++ b/modules/uploads/utils/types.ts @@ -0,0 +1,81 @@ +import { + Files as PrismaFiles, + Upload as _PrismaUpload, +} from "../_gen/prisma/default.d.ts"; + +interface PrismaUpload extends Omit<_PrismaUpload, "deletedAt"> { + files: PrismaFiles[]; +} + +export interface Upload { + id: string; + userId: string | null; + + bucket: string; + contentLength: string; + + files: UploadFile[]; + + createdAt: string; + updatedAt: string; + completedAt: string | null; +} + +export interface UploadFile { + path: string; + mime: string | null; + contentLength: string; +} + +export interface PresignedUpload extends Omit { + files: PresignedUploadFile[]; +} + +export interface PresignedUploadFile extends UploadFile { + presignedUrls: PresignedUrl[]; +} + +export interface PresignedUrl { + url: string; + + partNumber: number; + contentLength: string; + offset: string; +} + +export type UploadWithOptionalFiles = Omit & Partial>; +export type PrismaUploadWithOptionalFiles = Omit & Partial>; +export function prismaToOutput(upload: PrismaUploadWithOptionalFiles): UploadWithOptionalFiles { + return { + id: upload.id, + userId: upload.userId, + + bucket: upload.bucket, + contentLength: upload.contentLength.toString(), + + files: upload.files?.map((file) => ({ + path: file.path, + mime: file.mime, + contentLength: file.contentLength.toString(), + })), + + createdAt: upload.createdAt.toISOString(), + updatedAt: upload.updatedAt.toISOString(), + completedAt: upload.completedAt?.toISOString() ?? null, + }; +} + +export function prismaToOutputWithFiles(upload: PrismaUpload): Upload { + return { + ...prismaToOutput(upload), + files: upload.files?.map((file) => ({ + path: file.path, + mime: file.mime, + contentLength: file.contentLength.toString(), + })), + }; +} + +export function getKey(uploadId: string, path: string): string { + return `${uploadId}/${path}`; +} diff --git a/tests/basic/backend.yaml b/tests/basic/backend.yaml index 19142bec..9ebd94d5 100644 --- a/tests/basic/backend.yaml +++ b/tests/basic/backend.yaml @@ -13,6 +13,16 @@ modules: registry: local users: registry: local + uploads: + registry: local + config: + maxFilesPerUpload: 16 + maxUploadSize: 100mib + maxMultipartUploadSize: 10gib + s3: + bucketName: opengb-test-bucket + region: us-east-1 + endpoint: https://s3.us-east-1.amazonaws.com auth: registry: local config:
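For reference, a short sketch of how the `UploadSize` strings used in `config.ts` and `backend.yaml` resolve to byte counts via `getBytes` from `utils/data_size.ts`. The import path and logged values are illustrative only (they assume the snippet is run from the repository root); the comparison at the end mirrors the size check in `scripts/prepare.ts`.

import { getBytes } from "./modules/uploads/utils/data_size.ts";

// Binary suffixes scale by 1024 per step, decimal suffixes by 1000.
console.log(getBytes("30mib"));  // 31457280n    (30 * 1024 * 1024), the default maxUploadSize
console.log(getBytes("10gib"));  // 10737418240n (10 * 1024^3), the default maxMultipartUploadSize
console.log(getBytes("100mb"));  // 100000000n   (100 * 1000 * 1000), decimal megabytes

// `prepare` sums the requested files' contentLength values as bigints and rejects the
// batch with `size_limit_exceeded` when the total exceeds the configured limit.
const total = ["15000000", "20000000"].reduce((acc, len) => acc + BigInt(len), 0n);
console.log(total > getBytes("30mib")); // true, so this two-file batch would be rejected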