diff --git a/packages/s3-store/README.md b/packages/s3-store/README.md
index da206e4e..b0f1afda 100644
--- a/packages/s3-store/README.md
+++ b/packages/s3-store/README.md
@@ -91,6 +91,22 @@ you need to provide a cache implementation that is shared between all instances
 
 See the exported [KV stores][kvstores] from `@tus/server` for more information.
 
+#### `options.maxConcurrentPartUploads`
+
+This setting determines the maximum number of simultaneous part uploads to an S3 storage service.
+The default value is 60. This default is chosen in conjunction with the typical `partSize` of 8MiB, aiming for an effective transfer rate of 3.84Gbit/s.
+
+**Considerations:**
+The ideal value for `maxConcurrentPartUploads` varies based on your `partSize` and the upload bandwidth to your S3 bucket. A larger `partSize` means less overall upload bandwidth is available for other concurrent uploads.
+
+- **Lowering the Value**: Reducing `maxConcurrentPartUploads` decreases the number of simultaneous upload requests to S3. This can be beneficial for conserving memory, CPU, and disk I/O resources, especially in environments with limited system resources, or where the upload speed is low or the part size is large.
+
+
+- **Increasing the Value**: A higher value potentially enhances the data transfer rate to the server, but at the cost of increased resource usage (memory, CPU, and disk I/O). This can be advantageous when the goal is to maximize throughput and sufficient system resources are available.
+
+
+- **Bandwidth Considerations**: It's important to note that if your upload bandwidth to S3 is the limiting factor, increasing `maxConcurrentPartUploads` won’t lead to higher throughput. Instead, it will result in additional resource consumption without proportional gains in transfer speed.
+
 ## Extensions
 
 The tus protocol supports optional [extensions][]. Below is a table of the supported extensions in `@tus/s3-store`.
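As a point of reference, here is a minimal configuration sketch showing where `maxConcurrentPartUploads` fits next to `partSize`; the bucket, region, and credential values are placeholders, not part of this change:

```ts
import {Server} from '@tus/server'
import {S3Store} from '@tus/s3-store'

const store = new S3Store({
  partSize: 8 * 1024 * 1024, // 8MiB parts
  maxConcurrentPartUploads: 30, // half the default, to reduce memory and disk pressure
  s3ClientConfig: {
    bucket: 'my-tus-bucket', // placeholder
    region: 'eu-west-1', // placeholder
    credentials: {
      accessKeyId: process.env.AWS_ACCESS_KEY_ID as string,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY as string,
    },
  },
})

const server = new Server({path: '/files', datastore: store})
server.listen({port: 1080})
```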
diff --git a/packages/s3-store/index.ts b/packages/s3-store/index.ts
index 44af250c..5a5c9023 100644
--- a/packages/s3-store/index.ts
+++ b/packages/s3-store/index.ts
@@ -2,7 +2,6 @@ import os from 'node:os'
 import fs, {promises as fsProm} from 'node:fs'
 import stream, {promises as streamProm} from 'node:stream'
 import type {Readable} from 'node:stream'
-import http from 'node:http'
 
 import AWS, {NoSuchKey, NotFound, S3, S3ClientConfig} from '@aws-sdk/client-s3'
 import debug from 'debug'
@@ -17,6 +16,11 @@ import {
   MemoryKvStore,
 } from '@tus/utils'
 
+import {Semaphore, Permit} from '@shopify/semaphore'
+import MultiStream from 'multistream'
+import crypto from 'node:crypto'
+import path from 'node:path'
+
 const log = debug('tus-node-server:stores:s3store')
 
 type Options = {
@@ -25,6 +29,7 @@ type Options = {
   // but may increase it to not exceed the S3 10K parts limit.
   partSize?: number
   useTags?: boolean
+  maxConcurrentPartUploads?: number
   cache?: KvStore<MetadataValue>
   expirationPeriodInMilliseconds?: number
   // Options to pass to the AWS S3 SDK.
@@ -82,6 +87,7 @@ export class S3Store extends DataStore {
   private preferredPartSize: number
   private expirationPeriodInMilliseconds = 0
   private useTags = true
+  private partUploadSemaphore: Semaphore
   public maxMultipartParts = 10_000 as const
   public minPartSize = 5_242_880 as const // 5MiB
   public maxUploadSize = 5_497_558_138_880 as const // 5TiB
@@ -101,8 +107,9 @@ export class S3Store extends DataStore {
     this.preferredPartSize = partSize || 8 * 1024 * 1024
     this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0
     this.useTags = options.useTags ?? true
-    this.client = new S3(restS3ClientConfig)
     this.cache = options.cache ?? new MemoryKvStore<MetadataValue>()
+    this.client = new S3(restS3ClientConfig)
+    this.partUploadSemaphore = new Semaphore(options.maxConcurrentPartUploads ?? 60)
   }
 
   protected shouldUseExpirationTags() {
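The new `partUploadSemaphore` is what enforces `maxConcurrentPartUploads`. Outside of the store, the same acquire/release pattern with `@shopify/semaphore` looks roughly like the sketch below; the upload helper and its arguments are made up for illustration:

```ts
import {Semaphore} from '@shopify/semaphore'

// Hypothetical stand-in for the store's per-part upload; only the throttling matters here.
async function uploadPartToS3(partNumber: number): Promise<void> {
  // ...an UploadPart request would go here
}

async function uploadAllParts(partNumbers: number[], maxConcurrent = 60) {
  const semaphore = new Semaphore(maxConcurrent)

  await Promise.all(
    partNumbers.map(async (partNumber) => {
      // Resolves immediately while fewer than `maxConcurrent` permits are out,
      // otherwise waits until another part finishes.
      const permit = await semaphore.acquire()
      try {
        await uploadPartToS3(partNumber)
      } finally {
        // Always release, otherwise the remaining parts would wait forever.
        await permit.release()
      }
    })
  )
}
```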
@@ -233,6 +240,61 @@
     return data.ETag as string
   }
 
+  private async downloadIncompletePart(id: string) {
+    const incompletePart = await this.getIncompletePart(id)
+
+    if (!incompletePart) {
+      return
+    }
+    const filePath = await this.uniqueTmpFileName('tus-s3-incomplete-part-')
+
+    try {
+      let incompletePartSize = 0
+
+      const byteCounterTransform = new stream.Transform({
+        transform(chunk, _, callback) {
+          incompletePartSize += chunk.length
+          callback(null, chunk)
+        },
+      })
+
+      // write to temporary file
+      await streamProm.pipeline(
+        incompletePart,
+        byteCounterTransform,
+        fs.createWriteStream(filePath)
+      )
+
+      const createReadStream = (options: {cleanUpOnEnd: boolean}) => {
+        const fileReader = fs.createReadStream(filePath)
+
+        if (options.cleanUpOnEnd) {
+          fileReader.on('end', () => {
+            fs.unlink(filePath, () => {
+              // ignore
+            })
+          })
+
+          fileReader.on('error', (err) => {
+            fileReader.destroy(err)
+            fs.unlink(filePath, () => {
+              // ignore
+            })
+          })
+        }
+
+        return fileReader
+      }
+
+      return {size: incompletePartSize, path: filePath, createReader: createReadStream}
+    } catch (err) {
+      fsProm.rm(filePath).catch(() => {
+        /* ignore */
+      })
+      throw err
+    }
+  }
+
   private async getIncompletePart(id: string): Promise<Readable | undefined> {
     try {
       const data = await this.client.getObject({
@@ -271,50 +333,12 @@
     })
   }
 
-  private async prependIncompletePart(
-    newChunkPath: string,
-    previousIncompletePart: Readable
-  ): Promise<number> {
-    const tempPath = `${newChunkPath}-prepend`
-    try {
-      let incompletePartSize = 0
-
-      const byteCounterTransform = new stream.Transform({
-        transform(chunk, _, callback) {
-          incompletePartSize += chunk.length
-          callback(null, chunk)
-        },
-      })
-
-      // write to temporary file, truncating if needed
-      await streamProm.pipeline(
-        previousIncompletePart,
-        byteCounterTransform,
-        fs.createWriteStream(tempPath)
-      )
-      // append to temporary file
-      await streamProm.pipeline(
-        fs.createReadStream(newChunkPath),
-        fs.createWriteStream(tempPath, {flags: 'a'})
-      )
-      // overwrite existing file
-      await fsProm.rename(tempPath, newChunkPath)
-
-      return incompletePartSize
-    } catch (err) {
-      fsProm.rm(tempPath).catch(() => {
-        /* ignore */
-      })
-      throw err
-    }
-  }
-
   /**
    * Uploads a stream to s3 using multiple parts
    */
-  private async processUpload(
+  private async uploadParts(
     metadata: MetadataValue,
-    readStream: http.IncomingMessage | fs.ReadStream,
+    readStream: stream.Readable,
     currentPartNumber: number,
     offset: number
   ): Promise<number> {
@@ -322,12 +346,15 @@
     const promises: Promise<void>[] = []
     let pendingChunkFilepath: string | null = null
     let bytesUploaded = 0
-    let currentChunkNumber = 0
+    let permit: Permit | undefined = undefined
 
     const splitterStream = new StreamSplitter({
       chunkSize: this.calcOptimalPartSize(size),
       directory: os.tmpdir(),
     })
+      .on('beforeChunkStarted', async () => {
+        permit = await this.partUploadSemaphore.acquire()
+      })
       .on('chunkStarted', (filepath) => {
         pendingChunkFilepath = filepath
       })
@@ -335,38 +362,21 @@
         pendingChunkFilepath = null
 
         const partNumber = currentPartNumber++
-        const chunkNumber = currentChunkNumber++
+        const acquiredPermit = permit
 
         offset += partSize
 
-        const isFirstChunk = chunkNumber === 0
         const isFinalPart = size === offset
 
         // eslint-disable-next-line no-async-promise-executor
         const deferred = new Promise<void>(async (resolve, reject) => {
           try {
-            let incompletePartSize = 0
             // Only the first chunk of each PATCH request can prepend
             // an incomplete part (last chunk) from the previous request.
-            if (isFirstChunk) {
-              // If we received a chunk under the minimum part size in a previous iteration,
-              // we used a regular S3 upload to save it in the bucket. We try to get the incomplete part here.
-
-              const incompletePart = await this.getIncompletePart(metadata.file.id)
-              if (incompletePart) {
-                // We found an incomplete part, prepend it to the chunk on disk we were about to upload,
-                // and delete the incomplete part from the bucket. This can be done in parallel.
-                incompletePartSize = await this.prependIncompletePart(
-                  path,
-                  incompletePart
-                )
-                await this.deleteIncompletePart(metadata.file.id)
-              }
-            }
-
             const readable = fs.createReadStream(path)
             readable.on('error', reject)
-            if (partSize + incompletePartSize >= this.minPartSize || isFinalPart) {
+
+            if (partSize >= this.minPartSize || isFinalPart) {
               await this.uploadPart(metadata, readable, partNumber)
             } else {
               await this.uploadIncompletePart(metadata.file.id, readable)
@@ -380,11 +390,15 @@
             fsProm.rm(path).catch(() => {
               /* ignore */
             })
+            acquiredPermit?.release()
           }
         })
 
         promises.push(deferred)
       })
+      .on('chunkError', () => {
+        permit?.release()
+      })
 
     try {
       await streamProm.pipeline(readStream, splitterStream)
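`downloadIncompletePart` cannot know the size of the stored incomplete part up front, so it counts bytes with a pass-through `Transform` while spooling the S3 object to a temporary file. A standalone sketch of that pattern, with an illustrative function name:

```ts
import fs from 'node:fs'
import stream from 'node:stream'
import {pipeline} from 'node:stream/promises'

// Writes `source` to `filePath` and returns how many bytes passed through.
async function spoolToDiskAndMeasure(
  source: stream.Readable,
  filePath: string
): Promise<number> {
  let size = 0

  const byteCounter = new stream.Transform({
    transform(chunk, _encoding, callback) {
      size += chunk.length // count without buffering the whole object in memory
      callback(null, chunk) // pass the chunk through unchanged
    },
  })

  await pipeline(source, byteCounter, fs.createWriteStream(filePath))
  return size
}
```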
@@ -533,11 +547,7 @@
   /**
    * Write to the file, starting at the provided offset
    */
-  public async write(
-    readable: http.IncomingMessage | fs.ReadStream,
-    id: string,
-    offset: number
-  ): Promise<number> {
+  public async write(src: stream.Readable, id: string, offset: number): Promise<number> {
     // Metadata request needs to happen first
     const metadata = await this.getMetadata(id)
     const parts = await this.retrieveParts(id)
@@ -545,14 +555,22 @@
     const partNumber: number = parts.length > 0 ? parts[parts.length - 1].PartNumber! : 0
     const nextPartNumber = partNumber + 1
 
-    const bytesUploaded = await this.processUpload(
-      metadata,
-      readable,
-      nextPartNumber,
-      offset
-    )
+    const incompletePart = await this.downloadIncompletePart(id)
+    const requestedOffset = offset
+
+    if (incompletePart) {
+      // once the file is on disk, we delete the incomplete part
+      await this.deleteIncompletePart(id)
+
+      offset = requestedOffset - incompletePart.size
+      src = new MultiStream([incompletePart.createReader({cleanUpOnEnd: true}), src])
+    }
+
+    const bytesUploaded = await this.uploadParts(metadata, src, nextPartNumber, offset)
 
-    const newOffset = offset + bytesUploaded
+    // The size of the incomplete part should not be counted, because the
+    // process of the incomplete part should be fully transparent to the user.
+    const newOffset = requestedOffset + bytesUploaded - (incompletePart?.size ?? 0)
 
     if (metadata.file.size === newOffset) {
       try {
@@ -741,4 +759,29 @@
 
     return deleted
   }
+
+  private async uniqueTmpFileName(template: string): Promise<string> {
+    let tries = 0
+    const maxTries = 10
+
+    while (tries < maxTries) {
+      const fileName =
+        template + crypto.randomBytes(10).toString('base64url').slice(0, 10)
+      const filePath = path.join(os.tmpdir(), fileName)
+
+      try {
+        await fsProm.lstat(filePath)
+        // If no error, file exists, so try again
+        tries++
+      } catch (e) {
+        if (e.code === 'ENOENT') {
+          // File does not exist, return the path
+          return filePath
+        }
+        throw e // For other errors, rethrow
+      }
+    }
+
+    throw new Error(`Could not find a unique file name after ${maxTries} tries`)
+  }
 }
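In the reworked `write`, the stored incomplete part is stitched in front of the incoming request body with `multistream`, the upload offset is rewound by the part's size, and the offset reported to the client is corrected again afterwards so the re-uploaded bytes are never counted twice. A toy illustration of that bookkeeping, with made-up sizes:

```ts
import {Readable} from 'node:stream'
import MultiStream from 'multistream'

async function main() {
  // A 3-byte incomplete part is left over from the previous PATCH,
  // and the current PATCH arrives with Upload-Offset: 3 and 5 more bytes.
  const incompletePartSize = 3
  const requestedOffset = 3

  const combined = new MultiStream([
    Readable.from([Buffer.from('abc')]), // previously stored incomplete part
    Readable.from([Buffer.from('defgh')]), // current request body
  ])

  // The multipart upload restarts where the incomplete part originally began.
  const uploadOffset = requestedOffset - incompletePartSize // 0

  // Count what the part uploader would see on the wire.
  let bytesUploaded = 0
  for await (const chunk of combined) {
    bytesUploaded += (chunk as Buffer).length // 8 in total
  }

  // The incomplete part's bytes must not be reported back to the client twice.
  const newOffset = requestedOffset + bytesUploaded - incompletePartSize // 8
  console.log({uploadOffset, newOffset})
}

main()
```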
diff --git a/packages/s3-store/package.json b/packages/s3-store/package.json
index 59d8dea4..d961ef4c 100644
--- a/packages/s3-store/package.json
+++ b/packages/s3-store/package.json
@@ -22,12 +22,15 @@
   },
   "dependencies": {
     "@aws-sdk/client-s3": "^3.490.0",
+    "@shopify/semaphore": "^3.0.2",
     "@tus/utils": "workspace:*",
-    "debug": "^4.3.4"
+    "debug": "^4.3.4",
+    "multistream": "^4.1.0"
   },
   "devDependencies": {
     "@types/debug": "^4.1.12",
     "@types/mocha": "^10.0.6",
+    "@types/multistream": "^4.1.3",
     "@types/node": "^20.11.5",
     "eslint": "^8.56.0",
     "eslint-config-custom": "workspace:*",
diff --git a/packages/s3-store/test.ts b/packages/s3-store/test.ts
index dd37911b..2101ccb3 100644
--- a/packages/s3-store/test.ts
+++ b/packages/s3-store/test.ts
@@ -1,5 +1,4 @@
 import path from 'node:path'
-import fs from 'node:fs/promises'
 import assert from 'node:assert/strict'
 import {Readable} from 'node:stream'
 
@@ -39,17 +38,6 @@ describe('S3DataStore', function () {
     assert.strictEqual(Number.isFinite(store.calcOptimalPartSize(undefined)), true)
   })
 
-  it('should correctly prepend a buffer to a file', async function () {
-    const p = path.resolve(fixturesPath, 'foo.txt')
-    await fs.writeFile(p, 'world!')
-    await this.datastore.prependIncompletePart(
-      p,
-      Readable.from([new TextEncoder().encode('Hello, ')])
-    )
-    assert.strictEqual(await fs.readFile(p, 'utf8'), 'Hello, world!')
-    await fs.unlink(p)
-  })
-
   it('should store in between chunks under the minimum part size and prepend it to the next call', async function () {
     const store = this.datastore
     const size = 1024
diff --git a/packages/server/src/handlers/BaseHandler.ts b/packages/server/src/handlers/BaseHandler.ts
index c6f8a1d9..9cee0519 100644
--- a/packages/server/src/handlers/BaseHandler.ts
+++ b/packages/server/src/handlers/BaseHandler.ts
@@ -149,9 +149,11 @@
       reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
     })
 
-    req.on('error', (err) => {
+    req.on('error', () => {
       if (!proxy.closed) {
-        proxy.destroy(err)
+        // we end the stream gracefully here so that we can upload the remaining bytes to the store
+        // as an incompletePart
+        proxy.end()
       }
     })
 
diff --git a/packages/utils/src/models/StreamSplitter.ts b/packages/utils/src/models/StreamSplitter.ts
index 97d79e09..2afbe3f9 100644
--- a/packages/utils/src/models/StreamSplitter.ts
+++ b/packages/utils/src/models/StreamSplitter.ts
@@ -33,6 +33,7 @@
     this.directory = directory
     this.filenameTemplate = randomString(10)
     this.part = 0
+
     this.on('error', this._handleError.bind(this))
   }
 
@@ -87,6 +88,7 @@
   }
 
   async _handleError() {
+    await this.emitEvent('chunkError', this.currentChunkPath)
     // If there was an error, we want to stop allowing to write on disk as we cannot advance further.
     // At this point the chunk might be incomplete advancing further might cause data loss.
     // some scenarios where this might happen is if the disk is full or if we abort the stream midway.
@@ -106,7 +108,7 @@
 
     await this.fileHandle.close()
 
-    this.emit('chunkFinished', {
+    await this.emitEvent('chunkFinished', {
       path: this.currentChunkPath,
       size: this.currentChunkSize,
     })
@@ -117,13 +119,23 @@
     this.part += 1
   }
 
+  async emitEvent<T>(name: string, payload: T) {
+    const listeners = this.listeners(name)
+    for (const listener of listeners) {
+      await listener(payload)
+    }
+  }
+
   async _newChunk(): Promise<void> {
-    this.currentChunkPath = path.join(
+    const currentChunkPath = path.join(
       this.directory,
       `${this.filenameTemplate}-${this.part}`
     )
+
+    await this.emitEvent('beforeChunkStarted', currentChunkPath)
+    this.currentChunkPath = currentChunkPath
+
     const fileHandle = await fs.open(this.currentChunkPath, 'w')
-    this.emit('chunkStarted', this.currentChunkPath)
+    await this.emitEvent('chunkStarted', this.currentChunkPath)
     this.currentChunkSize = 0
     this.fileHandle = fileHandle
   }
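`StreamSplitter` now awaits its listeners through `emitEvent` instead of the fire-and-forget `emit`, which is what lets the `beforeChunkStarted` handler block on semaphore acquisition before the next chunk file is opened. A minimal sketch of that pattern, not the actual `@tus/utils` class:

```ts
import {EventEmitter} from 'node:events'

class AsyncEmitter extends EventEmitter {
  // Await each listener in registration order instead of firing and forgetting.
  async emitEvent<T>(name: string, payload: T) {
    for (const listener of this.listeners(name)) {
      await listener(payload)
    }
  }
}

async function main() {
  const emitter = new AsyncEmitter()

  emitter.on('beforeChunkStarted', async (chunkPath: string) => {
    // e.g. wait for a semaphore permit before the chunk file is created
    await new Promise((resolve) => setTimeout(resolve, 10))
    console.log('ready for', chunkPath)
  })

  // A plain emit() would return before the listener finished; emitEvent() waits.
  await emitter.emitEvent('beforeChunkStarted', '/tmp/chunk-0')
  console.log('safe to create the chunk now')
}

main()
```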
diff --git a/test/s3.e2e.ts b/test/s3.e2e.ts
index fea2697e..802d723d 100644
--- a/test/s3.e2e.ts
+++ b/test/s3.e2e.ts
@@ -33,12 +33,16 @@ const createStore = (options: S3Options = {}) =>
     s3ClientConfig: s3Credentials,
   })
 
-const createUpload = async (agent: SuperAgentTest, uploadLength: number) => {
-  const response = await agent
-    .post(STORE_PATH)
-    .set('Tus-Resumable', TUS_RESUMABLE)
-    .set('Upload-Length', uploadLength.toString())
-    .expect(201)
+const createUpload = async (agent: SuperAgentTest, uploadLength?: number) => {
+  const req = agent.post(STORE_PATH).set('Tus-Resumable', TUS_RESUMABLE)
+
+  if (uploadLength) {
+    req.set('Upload-Length', uploadLength.toString())
+  } else {
+    req.set('Upload-Defer-Length', '1')
+  }
+
+  const response = await req.expect(201)
 
   assert(Boolean(response.headers.location), 'location not returned')
   const uploadId = response.headers.location.split('/').pop()
@@ -52,15 +56,20 @@ const patchUpload = async (
   agent: SuperAgentTest,
   uploadId: string,
   data: Buffer,
-  offset = 0
+  offset = 0,
+  uploadLength?: number
 ) => {
-  const res = await agent
+  const req = agent
     .patch(`${STORE_PATH}/${uploadId}`)
    .set('Tus-Resumable', TUS_RESUMABLE)
     .set('Upload-Offset', offset.toString())
     .set('Content-Type', 'application/offset+octet-stream')
-    .send(data)
-    .expect(204)
+
+  if (uploadLength) {
+    req.set('Upload-Length', uploadLength.toString())
+  }
+
+  const res = await req.send(data).expect(204)
 
   return {offset: parseInt(res.headers['upload-offset'], 10)}
 }
@@ -283,4 +292,50 @@ describe('S3 Store E2E', () => {
         .expect(410)
     })
   })
+
+  describe('Upload', () => {
+    let server: Server
+    let listener: http.Server
+    let agent: SuperAgentTest
+    let store: S3Store
+
+    before((done) => {
+      store = createStore({
+        partSize: 5_242_880,
+      })
+      server = new Server({
+        path: STORE_PATH,
+        datastore: store,
+      })
+      listener = server.listen()
+      agent = request.agent(listener)
+      done()
+    })
+
+    after((done) => {
+      listener.close(done)
+    })
+
+    it('can upload a file smaller than the minimum part size using a deferred length', async () => {
+      const data = allocMB(1)
+      const {uploadId} = await createUpload(agent)
+      const {offset} = await patchUpload(agent, uploadId, data)
+      const {offset: offset2} = await patchUpload(
+        agent,
+        uploadId,
+        Buffer.alloc(0),
+        offset,
+        data.length
+      )
+
+      assert.equal(offset2, data.length)
+
+      const head = await s3Client.headObject({
+        Bucket: s3Credentials.bucket,
+        Key: uploadId,
+      })
+
+      assert.equal(head.$metadata.httpStatusCode, 200)
+    })
+  })
 })
diff --git a/yarn.lock b/yarn.lock
index e4d1dc1d..5eb9e47d 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -1206,6 +1206,13 @@ __metadata:
   languageName: node
   linkType: hard
 
+"@shopify/semaphore@npm:^3.0.2":
+  version: 3.0.2
+  resolution: "@shopify/semaphore@npm:3.0.2"
+  checksum: fcee7f95830539f0fb4a085cd8c3f6b39c5fa99aa8067c4718aa088a4fb1d6096d80635e97f146651da5acc00a2d72ce977b9c54cd5ebee5cb1aec8af9dc6181
+  languageName: node
+  linkType: hard
+
 "@sinonjs/commons@npm:^2.0.0":
   version: 2.0.0
   resolution: "@sinonjs/commons@npm:2.0.0"
@@ -2012,14 +2019,17 @@
   resolution: "@tus/s3-store@workspace:packages/s3-store"
   dependencies:
     "@aws-sdk/client-s3": "npm:^3.490.0"
+    "@shopify/semaphore": "npm:^3.0.2"
     "@tus/utils": "workspace:*"
     "@types/debug": "npm:^4.1.12"
     "@types/mocha": "npm:^10.0.6"
+    "@types/multistream": "npm:^4.1.3"
     "@types/node": "npm:^20.11.5"
     debug: "npm:^4.3.4"
     eslint: "npm:^8.56.0"
     eslint-config-custom: "workspace:*"
     mocha: "npm:^10.2.0"
+    multistream: "npm:^4.1.0"
     should: "npm:^13.2.3"
     typescript: "npm:^5.3.3"
   languageName: unknown
@@ -2210,6 +2220,15 @@
   languageName: node
   linkType: hard
 
+"@types/multistream@npm:^4.1.3":
+  version: 4.1.3
+  resolution: "@types/multistream@npm:4.1.3"
+  dependencies:
+    "@types/node": "npm:*"
+  checksum: 9b91fbc513a030e45491c6f79e6165d4c7ca56360bc7ab355feb3a57b1fd3c5eda9096b56e78fcdce7a2e4526da8ff59dbc6c1bd52844258e31677e5ca3208fc
+  languageName: node
+  linkType: hard
+
 "@types/node@npm:*":
   version: 20.4.2
   resolution: "@types/node@npm:20.4.2"
@@ -5626,6 +5645,16 @@
   languageName: node
   linkType: hard
 
+"multistream@npm:^4.1.0":
+  version: 4.1.0
+  resolution: "multistream@npm:4.1.0"
+  dependencies:
+    once: "npm:^1.4.0"
+    readable-stream: "npm:^3.6.0"
+  checksum: 305c49a1aadcb7f63f64d8ca2bb6e7852e5f7dba94c7329e9a72ce53cd0046686b71668dc1adbf123f17d2dd107765fc946e64c36a26b15c470a3146ea3bc923
+  languageName: node
+  linkType: hard
+
 "nanoid@npm:3.3.3":
   version: 3.3.3
   resolution: "nanoid@npm:3.3.3"
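For context, the new e2e test above exercises the tus creation-defer-length flow end to end: create without a length, send data smaller than the minimum part size (stored as an incomplete part), then finalize with an empty PATCH carrying `Upload-Length`. The sketch below shows the same sequence as raw tus requests; the endpoint URL is a placeholder and it assumes Node 18+ for global `fetch`:

```ts
const endpoint = 'http://localhost:1080/s3/store' // placeholder
const body = new Uint8Array(1024 * 1024) // 1MiB, below the 5MiB S3 part minimum

async function main() {
  // 1. Create the upload without a known length (creation-defer-length).
  const created = await fetch(endpoint, {
    method: 'POST',
    headers: {'Tus-Resumable': '1.0.0', 'Upload-Defer-Length': '1'},
  })
  const location = new URL(created.headers.get('Location') as string, endpoint)

  // 2. Send the data; being below the part minimum, it is kept as an incomplete part.
  const first = await fetch(location, {
    method: 'PATCH',
    headers: {
      'Tus-Resumable': '1.0.0',
      'Upload-Offset': '0',
      'Content-Type': 'application/offset+octet-stream',
    },
    body,
  })

  // 3. Declare the final length with an empty PATCH so the upload can complete.
  await fetch(location, {
    method: 'PATCH',
    headers: {
      'Tus-Resumable': '1.0.0',
      'Upload-Offset': first.headers.get('Upload-Offset') as string,
      'Upload-Length': String(body.length),
      'Content-Type': 'application/offset+octet-stream',
    },
  })
}

main()
```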