diff --git a/packages/s3-store/index.ts b/packages/s3-store/index.ts index 5282c0fc..fffac8cb 100644 --- a/packages/s3-store/index.ts +++ b/packages/s3-store/index.ts @@ -1,8 +1,7 @@ import os from 'node:os' import fs, {promises as fsProm} from 'node:fs' -import stream, {promises as streamProm} from 'node:stream' +import stream, {PassThrough, promises as streamProm} from 'node:stream' import type {Readable} from 'node:stream' -import http from 'node:http' import AWS, {NoSuchKey, NotFound, S3, S3ClientConfig} from '@aws-sdk/client-s3' import debug from 'debug' @@ -15,7 +14,11 @@ import { TUS_RESUMABLE, KvStore, MemoryKvStore, + Semaphore, + Permit, } from '@tus/server' +import crypto from 'node:crypto' +import path from 'node:path' const log = debug('tus-node-server:stores:s3store') @@ -25,6 +28,7 @@ type Options = { // but may increase it to not exceed the S3 10K parts limit. partSize?: number useTags?: boolean + maxConcurrentPartUploads?: number cache?: KvStore expirationPeriodInMilliseconds?: number // Options to pass to the AWS S3 SDK. @@ -42,6 +46,35 @@ function calcOffsetFromParts(parts?: Array) { return parts && parts.length > 0 ? parts.reduce((a, b) => a + b.Size, 0) : 0 } +function concatenateStreams(streams: Readable[]): Readable { + const passThrough = new PassThrough() + let currentStreamIndex = 0 + + const pipeNextStream = () => { + if (currentStreamIndex < streams.length) { + const stream = streams[currentStreamIndex] + + // Error handling for the current stream + stream.on('error', (err) => { + passThrough.emit('error', err) + passThrough.end() + }) + + stream.pipe(passThrough, {end: false}) + stream.on('end', () => { + currentStreamIndex++ + pipeNextStream() + }) + } else { + passThrough.end() + } + } + + pipeNextStream() + + return passThrough +} + // Implementation (based on https://github.com/tus/tusd/blob/master/s3store/s3store.go) // // Once a new tus upload is initiated, multiple objects in S3 are created: @@ -82,6 +115,7 @@ export class S3Store extends DataStore { private preferredPartSize: number private expirationPeriodInMilliseconds = 0 private useTags = true + private semaphore: Semaphore public maxMultipartParts = 10_000 as const public minPartSize = 5_242_880 as const // 5MiB public maxUploadSize = 5_497_558_138_880 as const // 5TiB @@ -101,8 +135,9 @@ export class S3Store extends DataStore { this.preferredPartSize = partSize || 8 * 1024 * 1024 this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0 this.useTags = options.useTags ?? true - this.client = new S3(restS3ClientConfig) this.cache = options.cache ?? new MemoryKvStore() + this.client = new S3(restS3ClientConfig) + this.semaphore = new Semaphore(options.maxConcurrentPartUploads ?? 30) } protected shouldUseExpirationTags() { @@ -233,6 +268,58 @@ export class S3Store extends DataStore { return data.ETag as string } + private async downloadIncompletePart(id: string) { + const incompletePart = await this.getIncompletePart(id) + + if (!incompletePart) { + return + } + const fileName = crypto.randomBytes(15).toString('base64url').slice(0, 15) + const filePath = path.join(os.tmpdir(), fileName) + + try { + let incompletePartSize = 0 + + const byteCounterTransform = new stream.Transform({ + transform(chunk, _, callback) { + incompletePartSize += chunk.length + callback(null, chunk) + }, + }) + + // write to temporary file + await streamProm.pipeline( + incompletePart, + byteCounterTransform, + fs.createWriteStream(filePath) + ) + + const createReadStream = () => { + const fileReader = fs.createReadStream(filePath) + fileReader.on('end', () => { + fs.unlink(filePath, () => { + // ignore + }) + }) + + fileReader.on('error', (err) => { + fileReader.destroy(err) + fs.unlink(filePath, () => { + // ignore + }) + }) + return fileReader + } + + return {size: incompletePartSize, createReader: createReadStream} + } catch (err) { + fsProm.rm(filePath).catch(() => { + /* ignore */ + }) + throw err + } + } + private async getIncompletePart(id: string): Promise { try { const data = await this.client.getObject({ @@ -271,50 +358,12 @@ export class S3Store extends DataStore { }) } - private async prependIncompletePart( - newChunkPath: string, - previousIncompletePart: Readable - ): Promise { - const tempPath = `${newChunkPath}-prepend` - try { - let incompletePartSize = 0 - - const byteCounterTransform = new stream.Transform({ - transform(chunk, _, callback) { - incompletePartSize += chunk.length - callback(null, chunk) - }, - }) - - // write to temporary file, truncating if needed - await streamProm.pipeline( - previousIncompletePart, - byteCounterTransform, - fs.createWriteStream(tempPath) - ) - // append to temporary file - await streamProm.pipeline( - fs.createReadStream(newChunkPath), - fs.createWriteStream(tempPath, {flags: 'a'}) - ) - // overwrite existing file - await fsProm.rename(tempPath, newChunkPath) - - return incompletePartSize - } catch (err) { - fsProm.rm(tempPath).catch(() => { - /* ignore */ - }) - throw err - } - } - /** * Uploads a stream to s3 using multiple parts */ - private async processUpload( + private async uploadParts( metadata: MetadataValue, - readStream: http.IncomingMessage | fs.ReadStream, + readStream: stream.Readable, currentPartNumber: number, offset: number ): Promise { @@ -322,51 +371,45 @@ export class S3Store extends DataStore { const promises: Promise[] = [] let pendingChunkFilepath: string | null = null let bytesUploaded = 0 - let currentChunkNumber = 0 + let permit: Permit | undefined = undefined const splitterStream = new StreamSplitter({ chunkSize: this.calcOptimalPartSize(size), directory: os.tmpdir(), + asyncEvents: true, }) - .on('chunkStarted', (filepath) => { + .on('beforeChunkStarted', (_, done) => { + this.semaphore + .acquire() + .catch(done) + .then((acquiredPermit) => { + permit = acquiredPermit + done() + }) + }) + .on('chunkStarted', (filepath, done) => { pendingChunkFilepath = filepath + done() }) - .on('chunkFinished', ({path, size: partSize}) => { + .on('chunkFinished', ({path, size: partSize}, done) => { pendingChunkFilepath = null const partNumber = currentPartNumber++ - const chunkNumber = currentChunkNumber++ + const acquiredPermit = permit offset += partSize - const isFirstChunk = chunkNumber === 0 const isFinalPart = size === offset // eslint-disable-next-line no-async-promise-executor const deferred = new Promise(async (resolve, reject) => { try { - let incompletePartSize = 0 // Only the first chunk of each PATCH request can prepend // an incomplete part (last chunk) from the previous request. - if (isFirstChunk) { - // If we received a chunk under the minimum part size in a previous iteration, - // we used a regular S3 upload to save it in the bucket. We try to get the incomplete part here. - - const incompletePart = await this.getIncompletePart(metadata.file.id) - if (incompletePart) { - // We found an incomplete part, prepend it to the chunk on disk we were about to upload, - // and delete the incomplete part from the bucket. This can be done in parallel. - incompletePartSize = await this.prependIncompletePart( - path, - incompletePart - ) - await this.deleteIncompletePart(metadata.file.id) - } - } - const readable = fs.createReadStream(path) readable.on('error', reject) - if (partSize + incompletePartSize >= this.minPartSize || isFinalPart) { + + if (partSize >= this.minPartSize || isFinalPart) { await this.uploadPart(metadata, readable, partNumber) } else { await this.uploadIncompletePart(metadata.file.id, readable) @@ -380,10 +423,16 @@ export class S3Store extends DataStore { fsProm.rm(path).catch(() => { /* ignore */ }) + acquiredPermit?.release() } }) promises.push(deferred) + done() + }) + .on('chunkError', (_, done) => { + permit?.release() + done() }) try { @@ -484,9 +533,11 @@ export class S3Store extends DataStore { else if (size <= this.preferredPartSize * this.maxMultipartParts) { optimalPartSize = this.preferredPartSize // The upload is too big for the preferred size. - // We devide the size with the max amount of parts and round it up. + // We divide the size with the max amount of parts and round it up. + } else if (size % this.maxMultipartParts === 0) { + optimalPartSize = size / this.maxMultipartParts } else { - optimalPartSize = Math.ceil(size / this.maxMultipartParts) + optimalPartSize = size / this.maxMultipartParts + 1 } return optimalPartSize @@ -533,11 +584,7 @@ export class S3Store extends DataStore { /** * Write to the file, starting at the provided offset */ - public async write( - readable: http.IncomingMessage | fs.ReadStream, - id: string, - offset: number - ): Promise { + public async write(src: stream.Readable, id: string, offset: number): Promise { // Metadata request needs to happen first const metadata = await this.getMetadata(id) const parts = await this.retrieveParts(id) @@ -545,14 +592,22 @@ export class S3Store extends DataStore { const partNumber: number = parts.length > 0 ? parts[parts.length - 1].PartNumber! : 0 const nextPartNumber = partNumber + 1 - const bytesUploaded = await this.processUpload( - metadata, - readable, - nextPartNumber, - offset - ) + const incompletePart = await this.downloadIncompletePart(id) + const requestedOffset = offset + + if (incompletePart) { + // once the file is on disk, we delete the incomplete part + await this.deleteIncompletePart(id) + + offset = requestedOffset - incompletePart.size + src = concatenateStreams([incompletePart.createReader(), src]) + } + + const bytesUploaded = await this.uploadParts(metadata, src, nextPartNumber, offset) - const newOffset = offset + bytesUploaded + // The size of the incomplete part should not be counted, because the + // process of the incomplete part should be fully transparent to the user. + const newOffset = requestedOffset + bytesUploaded - (incompletePart?.size ?? 0) if (metadata.file.size === newOffset) { try { diff --git a/packages/s3-store/test.ts b/packages/s3-store/test.ts index 30c39e8a..b52d76d0 100644 --- a/packages/s3-store/test.ts +++ b/packages/s3-store/test.ts @@ -1,5 +1,4 @@ import path from 'node:path' -import fs from 'node:fs/promises' import assert from 'node:assert/strict' import {Readable} from 'node:stream' @@ -39,17 +38,6 @@ describe('S3DataStore', function () { assert.strictEqual(Number.isFinite(store.calcOptimalPartSize(undefined)), true) }) - it('should correctly prepend a buffer to a file', async function () { - const p = path.resolve(fixturesPath, 'foo.txt') - await fs.writeFile(p, 'world!') - await this.datastore.prependIncompletePart( - p, - Readable.from([new TextEncoder().encode('Hello, ')]) - ) - assert.strictEqual(await fs.readFile(p, 'utf8'), 'Hello, world!') - await fs.unlink(p) - }) - it('should store in between chunks under the minimum part size and prepend it to the next call', async function () { const store = this.datastore const size = 1024 diff --git a/packages/server/src/models/Semaphore.ts b/packages/server/src/models/Semaphore.ts new file mode 100644 index 00000000..fdf2f8a0 --- /dev/null +++ b/packages/server/src/models/Semaphore.ts @@ -0,0 +1,63 @@ +// Credits: https://github.com/Shopify/quilt/blob/main/packages/semaphore/src/Semaphore.ts + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +type ReleaseCallback = () => Promise + +export class Permit { + private onRelease: ReleaseCallback + private isReleased = false + + constructor(onRelease: ReleaseCallback) { + this.onRelease = onRelease + } + + async release() { + if (!this.isReleased) { + this.isReleased = true + await this.onRelease() + } + } +} + +interface Deferred { + resolve?(permit: Permit): void + promise?: Promise +} + +export class Semaphore { + private availablePermits: number + private deferreds: Deferred[] = [] + + constructor(count: number) { + this.availablePermits = count + } + + acquire(): Promise { + if (this.availablePermits > 0) { + return Promise.resolve(this.createPermit()) + } else { + const deferred: Deferred = {} + deferred.promise = new Promise((resolve) => { + deferred.resolve = resolve + }) + this.deferreds.push(deferred) + return deferred.promise + } + } + + private createPermit(): Permit { + this.availablePermits-- + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return new Permit(async (): Promise => { + this.availablePermits++ + + if (this.deferreds.length > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const deferred = this.deferreds.shift()! + deferred.resolve?.(this.createPermit()) + await deferred.promise + } + }) + } +} diff --git a/packages/server/src/models/StreamSplitter.ts b/packages/server/src/models/StreamSplitter.ts index 97d79e09..faa2598a 100644 --- a/packages/server/src/models/StreamSplitter.ts +++ b/packages/server/src/models/StreamSplitter.ts @@ -3,6 +3,7 @@ import crypto from 'node:crypto' import fs from 'node:fs/promises' import path from 'node:path' import stream from 'node:stream' +import {Semaphore} from './Semaphore' function randomString(size: number) { return crypto.randomBytes(size).toString('base64url').slice(0, size) @@ -11,6 +12,7 @@ function randomString(size: number) { type Options = { chunkSize: number directory: string + asyncEvents?: boolean } type Callback = (error: Error | null) => void @@ -23,8 +25,12 @@ export class StreamSplitter extends stream.Writable { filenameTemplate: string chunkSize: Options['chunkSize'] part: number + asyncEvents?: boolean - constructor({chunkSize, directory}: Options, options?: stream.WritableOptions) { + constructor( + {chunkSize, directory, asyncEvents}: Options, + options?: stream.WritableOptions + ) { super(options) this.chunkSize = chunkSize this.currentChunkPath = null @@ -33,6 +39,8 @@ export class StreamSplitter extends stream.Writable { this.directory = directory this.filenameTemplate = randomString(10) this.part = 0 + this.asyncEvents = asyncEvents + this.on('error', this._handleError.bind(this)) } @@ -87,6 +95,7 @@ export class StreamSplitter extends stream.Writable { } async _handleError() { + await this.emitEvent('chunkError', this.currentChunkPath) // If there was an error, we want to stop allowing to write on disk as we cannot advance further. // At this point the chunk might be incomplete advancing further might cause data loss. // some scenarios where this might happen is if the disk is full or if we abort the stream midway. @@ -106,7 +115,7 @@ export class StreamSplitter extends stream.Writable { await this.fileHandle.close() - this.emit('chunkFinished', { + await this.emitEvent('chunkFinished', { path: this.currentChunkPath, size: this.currentChunkSize, }) @@ -117,13 +126,32 @@ export class StreamSplitter extends stream.Writable { this.part += 1 } + async emitEvent(name: string, payload: T) { + if (this.asyncEvents) { + await new Promise((resolve, reject) => { + const doneCb = (err?: unknown) => { + if (err) { + reject(err) + return + } + resolve() + } + + this.emit(name, ...[payload, doneCb]) + }) + } else { + this.emit(name, payload) + } + } + async _newChunk(): Promise { this.currentChunkPath = path.join( this.directory, `${this.filenameTemplate}-${this.part}` ) + await this.emitEvent('beforeChunkStarted', this.currentChunkPath) const fileHandle = await fs.open(this.currentChunkPath, 'w') - this.emit('chunkStarted', this.currentChunkPath) + await this.emitEvent('chunkStarted', this.currentChunkPath) this.currentChunkSize = 0 this.fileHandle = fileHandle } diff --git a/packages/server/src/models/index.ts b/packages/server/src/models/index.ts index 908f53ca..11b79132 100644 --- a/packages/server/src/models/index.ts +++ b/packages/server/src/models/index.ts @@ -5,3 +5,4 @@ export {Uid} from './Uid' export {Upload} from './Upload' export {Locker, Lock, RequestRelease} from './Locker' export {CancellationContext} from './Context' +export {Semaphore, Permit} from './Semaphore' diff --git a/packages/server/test/Semaphore.test.ts b/packages/server/test/Semaphore.test.ts new file mode 100644 index 00000000..5edceaa1 --- /dev/null +++ b/packages/server/test/Semaphore.test.ts @@ -0,0 +1,153 @@ +// Credits: https://github.com/Shopify/quilt/blob/main/packages/semaphore/src/tests/Semaphore.test.ts + +import {Permit, Semaphore} from '../src' +import assert from 'node:assert' +import sinon from 'sinon' + +describe('Semaphore', () => { + describe('acquire()', () => { + it('resolves with a permit when counter is > 0', async () => { + const semaphore = new Semaphore(1) + + assert.equal((await semaphore.acquire()) instanceof Permit, true) + }) + + it('does not resolve if counter is = 0', async () => { + const semaphore = new Semaphore(1) + + await semaphore.acquire() + + const spy = sinon.spy() + + semaphore + .acquire() + .then(spy) + .catch(() => {}) + + await setTimeout(() => {}, 0) + + assert.equal(spy.callCount, 0) + }) + + it('resolves when previous permit is released before the call', async () => { + const semaphore = new Semaphore(1) + + const permit = await semaphore.acquire() + permit.release() + + assert.equal((await semaphore.acquire()) instanceof Permit, true) + }) + + it('resolves when previous permit is released after the call', async () => { + const semaphore = new Semaphore(1) + + const permit = await semaphore.acquire() + + const spy = sinon.spy() + + semaphore + .acquire() + .then(spy) + .catch(() => {}) + + await setTimeout(() => {}, 0) + + assert.equal(spy.callCount, 0) + + await permit.release() + + assert.equal(spy.callCount > 0, true) + }) + + it('calls resolve in same order as called when previous permits are released', async () => { + const semaphore = new Semaphore(2) + + const permit1 = await semaphore.acquire() + const permit2 = await semaphore.acquire() + + const spy3 = sinon.spy() + const spy4 = sinon.spy() + + semaphore + .acquire() + .then(spy3) + .catch(() => {}) + semaphore + .acquire() + .then(spy4) + .catch(() => {}) + + await setTimeout(() => {}, 0) + + assert.equal(spy3.callCount, 0) + assert.equal(spy4.callCount, 0) + + await permit2.release() + + assert.equal(spy3.callCount > 0, true) + assert.equal(spy4.callCount, 0) + + await permit1.release() + + assert.equal(spy3.callCount > 0, true) + }) + + it('does not allow acquiring more permits than initially allowed', async () => { + const semaphore = new Semaphore(1) + + const promise1 = semaphore.acquire() + const promise2 = semaphore.acquire() + + ;(await promise1).release() + ;(await promise2).release() + + const spy3 = sinon.spy() + const spy4 = sinon.spy() + + semaphore + .acquire() + .then(spy3) + .catch(() => {}) + semaphore + .acquire() + .then(spy4) + .catch(() => {}) + + await setTimeout(() => {}, 0) + + assert.equal(spy3.callCount > 0, true) + assert.equal(spy4.callCount, 0) + }) + }) +}) + +describe('Permit', () => { + describe('release()', () => { + it('has no effect when called a second time', async () => { + const semaphore = new Semaphore(1) + + const permit = await semaphore.acquire() + + const spy2 = sinon.spy() + const spy3 = sinon.spy() + + semaphore + .acquire() + .then(spy2) + .catch(() => {}) + semaphore + .acquire() + .then(spy3) + .catch(() => {}) + + await permit.release() + + assert.equal(spy2.callCount > 0, true) + assert.equal(spy3.callCount, 0) + + await permit.release() + + assert.equal(spy3.callCount, 0) + }) + }) +}) diff --git a/test/s3.e2e.ts b/test/s3.e2e.ts index 22ffb961..60612cb3 100644 --- a/test/s3.e2e.ts +++ b/test/s3.e2e.ts @@ -33,12 +33,16 @@ const createStore = (options: S3Options = {}) => s3ClientConfig: s3Credentials, }) -const createUpload = async (agent: SuperAgentTest, uploadLength: number) => { - const response = await agent - .post(STORE_PATH) - .set('Tus-Resumable', TUS_RESUMABLE) - .set('Upload-Length', uploadLength.toString()) - .expect(201) +const createUpload = async (agent: SuperAgentTest, uploadLength?: number) => { + const req = agent.post(STORE_PATH).set('Tus-Resumable', TUS_RESUMABLE) + + if (uploadLength) { + req.set('Upload-Length', uploadLength.toString()) + } else { + req.set('Upload-Defer-Length', '1') + } + + const response = await req.expect(201) assert(Boolean(response.headers.location), 'location not returned') const uploadId = response.headers.location.split('/').pop() @@ -52,15 +56,20 @@ const patchUpload = async ( agent: SuperAgentTest, uploadId: string, data: Buffer, - offset = 0 + offset = 0, + uploadLength?: number ) => { - const res = await agent + const req = agent .patch(`${STORE_PATH}/${uploadId}`) .set('Tus-Resumable', TUS_RESUMABLE) .set('Upload-Offset', offset.toString()) .set('Content-Type', 'application/offset+octet-stream') - .send(data) - .expect(204) + + if (uploadLength) { + req.set('Upload-Length', uploadLength.toString()) + } + + const res = await req.send(data).expect(204) return {offset: parseInt(res.headers['upload-offset'], 10)} } @@ -282,4 +291,44 @@ describe('S3 Store E2E', () => { .expect(410) }) }) + + describe('Upload', () => { + let server: Server + let listener: http.Server + let agent: SuperAgentTest + let store: S3Store + + before((done) => { + store = createStore({ + expirationPeriodInMilliseconds: expireTime, + partSize: 5_242_880, + }) + server = new Server({ + path: STORE_PATH, + datastore: store, + }) + listener = server.listen() + agent = request.agent(listener) + done() + }) + + after((done) => { + listener.close(done) + }) + + it('can a upload a smaller size file than the minPreferred size using a deferred length', async () => { + const data = allocMB(1) + const {uploadId} = await createUpload(agent) + const {offset} = await patchUpload(agent, uploadId, data) + const {offset: offset2} = await patchUpload( + agent, + uploadId, + new Buffer(0), + offset, + data.length + ) + + assert.equal(offset2, data.length) + }) + }) })