From a4edaf0b9ffd51cd1b71773087207ae45da7fc21 Mon Sep 17 00:00:00 2001 From: fenos Date: Fri, 10 Nov 2023 13:40:12 +0000 Subject: [PATCH 1/8] feat: distributed locks --- packages/s3-store/index.ts | 277 +++++++++++------- packages/server/src/constants.ts | 8 + packages/server/src/handlers/BaseHandler.ts | 29 ++ packages/server/src/handlers/DeleteHandler.ts | 5 +- packages/server/src/handlers/GetHandler.ts | 1 + packages/server/src/handlers/HeadHandler.ts | 4 +- packages/server/src/handlers/PatchHandler.ts | 13 +- packages/server/src/handlers/PostHandler.ts | 11 +- packages/server/src/models/DataStore.ts | 21 +- packages/server/src/models/Locker.ts | 101 +++++++ packages/server/src/models/index.ts | 1 + packages/server/src/server.ts | 14 +- packages/server/src/types.ts | 4 +- packages/server/test/Locker.test.ts | 77 +++++ 14 files changed, 451 insertions(+), 115 deletions(-) create mode 100644 packages/server/src/models/Locker.ts create mode 100644 packages/server/test/Locker.test.ts diff --git a/packages/s3-store/index.ts b/packages/s3-store/index.ts index e81c5dd7..37795670 100644 --- a/packages/s3-store/index.ts +++ b/packages/s3-store/index.ts @@ -9,6 +9,7 @@ import debug from 'debug' import {DataStore, StreamSplitter, Upload} from '@tus/server' import {ERRORS, TUS_RESUMABLE} from '@tus/server' +import {DataStoreDefaultOptions} from '@tus/server/dist/models/DataStore' const log = debug('tus-node-server:stores:s3store') @@ -31,6 +32,11 @@ type MetadataValue = { 'upload-id': string 'tus-version': string } + +interface WithAbortSignal { + signal?: AbortSignal | undefined +} + // Implementation (based on https://github.com/tus/tusd/blob/master/s3store/s3store.go) // // Once a new tus upload is initiated, multiple objects in S3 are created: @@ -94,17 +100,24 @@ export class S3Store extends DataStore { * on the S3 object's `Metadata` field, so that only a `headObject` * is necessary to retrieve the data. */ - private async saveMetadata(upload: Upload, uploadId: string) { + private async saveMetadata( + upload: Upload, + uploadId: string, + options?: WithAbortSignal + ) { log(`[${upload.id}] saving metadata`) - await this.client.putObject({ - Bucket: this.bucket, - Key: `${upload.id}.info`, - Body: JSON.stringify(upload), - Metadata: { - 'upload-id': uploadId, - 'tus-version': TUS_RESUMABLE, + await this.client.putObject( + { + Bucket: this.bucket, + Key: `${upload.id}.info`, + Body: JSON.stringify(upload), + Metadata: { + 'upload-id': uploadId, + 'tus-version': TUS_RESUMABLE, + }, }, - }) + {abortSignal: options?.signal} + ) log(`[${upload.id}] metadata file saved`) } @@ -113,16 +126,23 @@ export class S3Store extends DataStore { * There's a small and simple caching mechanism to avoid multiple * HTTP calls to S3. */ - private async getMetadata(id: string): Promise { + private async getMetadata( + id: string, + options?: WithAbortSignal + ): Promise { const cached = this.cache.get(id) if (cached?.file) { return cached } - const {Metadata, Body} = await this.client.getObject({ - Bucket: this.bucket, - Key: `${id}.info`, - }) + const {Metadata, Body} = await this.client.getObject( + { + Bucket: this.bucket, + Key: `${id}.info`, + }, + {abortSignal: options?.signal} + ) + const file = JSON.parse((await Body?.transformToString()) as string) this.cache.set(id, { 'tus-version': Metadata?.['tus-version'] as string, @@ -152,38 +172,52 @@ export class S3Store extends DataStore { private async uploadPart( metadata: MetadataValue, readStream: fs.ReadStream | Readable, - partNumber: number + partNumber: number, + options?: WithAbortSignal ): Promise { - const data = await this.client.uploadPart({ - Bucket: this.bucket, - Key: metadata.file.id, - UploadId: metadata['upload-id'], - PartNumber: partNumber, - Body: readStream, - }) + const data = await this.client.uploadPart( + { + Bucket: this.bucket, + Key: metadata.file.id, + UploadId: metadata['upload-id'], + PartNumber: partNumber, + Body: readStream, + }, + {abortSignal: options?.signal} + ) log(`[${metadata.file.id}] finished uploading part #${partNumber}`) return data.ETag as string } private async uploadIncompletePart( id: string, - readStream: fs.ReadStream | Readable + readStream: fs.ReadStream | Readable, + options?: WithAbortSignal ): Promise { - const data = await this.client.putObject({ - Bucket: this.bucket, - Key: this.partKey(id, true), - Body: readStream, - }) + const data = await this.client.putObject( + { + Bucket: this.bucket, + Key: this.partKey(id, true), + Body: readStream, + }, + {abortSignal: options?.signal} + ) log(`[${id}] finished uploading incomplete part`) return data.ETag as string } - private async getIncompletePart(id: string): Promise { + private async getIncompletePart( + id: string, + options?: WithAbortSignal + ): Promise { try { - const data = await this.client.getObject({ - Bucket: this.bucket, - Key: this.partKey(id, true), - }) + const data = await this.client.getObject( + { + Bucket: this.bucket, + Key: this.partKey(id, true), + }, + {abortSignal: options?.signal} + ) return data.Body as Readable } catch (error) { if (error instanceof NoSuchKey) { @@ -194,12 +228,18 @@ export class S3Store extends DataStore { } } - private async getIncompletePartSize(id: string): Promise { + private async getIncompletePartSize( + id: string, + options?: WithAbortSignal + ): Promise { try { - const data = await this.client.headObject({ - Bucket: this.bucket, - Key: this.partKey(id, true), - }) + const data = await this.client.headObject( + { + Bucket: this.bucket, + Key: this.partKey(id, true), + }, + {abortSignal: options?.signal} + ) return data.ContentLength } catch (error) { if (error instanceof NotFound) { @@ -209,16 +249,23 @@ export class S3Store extends DataStore { } } - private async deleteIncompletePart(id: string): Promise { - await this.client.deleteObject({ - Bucket: this.bucket, - Key: this.partKey(id, true), - }) + private async deleteIncompletePart( + id: string, + options?: WithAbortSignal + ): Promise { + await this.client.deleteObject( + { + Bucket: this.bucket, + Key: this.partKey(id, true), + }, + {abortSignal: options?.signal} + ) } private async prependIncompletePart( newChunkPath: string, - previousIncompletePart: Readable + previousIncompletePart: Readable, + options?: WithAbortSignal ): Promise { const tempPath = `${newChunkPath}-prepend` try { @@ -235,12 +282,14 @@ export class S3Store extends DataStore { await streamProm.pipeline( previousIncompletePart, byteCounterTransform, - fs.createWriteStream(tempPath) + fs.createWriteStream(tempPath), + {signal: options?.signal} ) // append to temporary file await streamProm.pipeline( fs.createReadStream(newChunkPath), - fs.createWriteStream(tempPath, {flags: 'a'}) + fs.createWriteStream(tempPath, {flags: 'a'}), + {signal: options?.signal} ) // overwrite existing file await fsProm.rename(tempPath, newChunkPath) @@ -261,7 +310,8 @@ export class S3Store extends DataStore { metadata: MetadataValue, readStream: http.IncomingMessage | fs.ReadStream, currentPartNumber: number, - offset: number + offset: number, + options?: WithAbortSignal ): Promise { const size = metadata.file.size const promises: Promise[] = [] @@ -297,24 +347,28 @@ export class S3Store extends DataStore { // If we received a chunk under the minimum part size in a previous iteration, // we used a regular S3 upload to save it in the bucket. We try to get the incomplete part here. - const incompletePart = await this.getIncompletePart(metadata.file.id) + const incompletePart = await this.getIncompletePart( + metadata.file.id, + options + ) if (incompletePart) { // We found an incomplete part, prepend it to the chunk on disk we were about to upload, // and delete the incomplete part from the bucket. This can be done in parallel. incompletePartSize = await this.prependIncompletePart( path, - incompletePart + incompletePart, + options ) - await this.deleteIncompletePart(metadata.file.id) + await this.deleteIncompletePart(metadata.file.id, options) } } const readable = fs.createReadStream(path) readable.on('error', reject) if (partSize + incompletePartSize >= this.minPartSize || isFinalPart) { - await this.uploadPart(metadata, readable, partNumber) + await this.uploadPart(metadata, readable, partNumber, options) } else { - await this.uploadIncompletePart(metadata.file.id, readable) + await this.uploadIncompletePart(metadata.file.id, readable, options) } bytesUploaded += partSize @@ -332,7 +386,7 @@ export class S3Store extends DataStore { }) try { - await streamProm.pipeline(readStream, splitterStream) + await streamProm.pipeline(readStream, splitterStream, {signal: options?.signal}) } catch (error) { if (pendingChunkFilepath !== null) { try { @@ -354,20 +408,27 @@ export class S3Store extends DataStore { * Completes a multipart upload on S3. * This is where S3 concatenates all the uploaded parts. */ - private async finishMultipartUpload(metadata: MetadataValue, parts: Array) { - const response = await this.client.completeMultipartUpload({ - Bucket: this.bucket, - Key: metadata.file.id, - UploadId: metadata['upload-id'], - MultipartUpload: { - Parts: parts.map((part) => { - return { - ETag: part.ETag, - PartNumber: part.PartNumber, - } - }), + private async finishMultipartUpload( + metadata: MetadataValue, + parts: Array, + options?: WithAbortSignal + ) { + const response = await this.client.completeMultipartUpload( + { + Bucket: this.bucket, + Key: metadata.file.id, + UploadId: metadata['upload-id'], + MultipartUpload: { + Parts: parts.map((part) => { + return { + ETag: part.ETag, + PartNumber: part.PartNumber, + } + }), + }, }, - }) + {abortSignal: options?.signal} + ) return response.Location } @@ -377,7 +438,8 @@ export class S3Store extends DataStore { */ private async retrieveParts( id: string, - partNumberMarker?: string + partNumberMarker?: string, + options?: WithAbortSignal ): Promise> { const params: AWS.ListPartsCommandInput = { Bucket: this.bucket, @@ -386,12 +448,14 @@ export class S3Store extends DataStore { PartNumberMarker: partNumberMarker, } - const data = await this.client.listParts(params) + const data = await this.client.listParts(params, { + abortSignal: options?.signal, + }) let parts = data.Parts ?? [] if (data.IsTruncated) { - const rest = await this.retrieveParts(id, data.NextPartNumberMarker) + const rest = await this.retrieveParts(id, data.NextPartNumberMarker, options) parts = [...parts, ...rest] } @@ -440,7 +504,7 @@ export class S3Store extends DataStore { * Also, a `${file_id}.info` file is created which holds some information * about the upload itself like: `upload-id`, `upload-length`, etc. */ - public async create(upload: Upload) { + public async create(upload: Upload, options?: DataStoreDefaultOptions) { log(`[${upload.id}] initializing multipart upload`) const request: AWS.CreateMultipartUploadCommandInput = { Bucket: this.bucket, @@ -453,17 +517,20 @@ export class S3Store extends DataStore { } const res = await this.client.createMultipartUpload(request) - await this.saveMetadata(upload, res.UploadId as string) + await this.saveMetadata(upload, res.UploadId as string, {signal: options?.signal}) log(`[${upload.id}] multipart upload created (${res.UploadId})`) return upload } - async read(id: string) { - const data = await this.client.getObject({ - Bucket: this.bucket, - Key: id, - }) + async read(id: string, options?: WithAbortSignal) { + const data = await this.client.getObject( + { + Bucket: this.bucket, + Key: id, + }, + {abortSignal: options?.signal} + ) return data.Body as Readable } @@ -473,7 +540,8 @@ export class S3Store extends DataStore { public async write( readable: http.IncomingMessage | fs.ReadStream, id: string, - offset: number + offset: number, + options?: DataStoreDefaultOptions ): Promise { // Metadata request needs to happen first const metadata = await this.getMetadata(id) @@ -486,15 +554,16 @@ export class S3Store extends DataStore { metadata, readable, nextPartNumber, - offset + offset, + options ) const newOffset = offset + bytesUploaded if (metadata.file.size === newOffset) { try { - const parts = await this.retrieveParts(id) - await this.finishMultipartUpload(metadata, parts) + const parts = await this.retrieveParts(id, undefined, options) + await this.finishMultipartUpload(metadata, parts, options) this.clearCache(id) } catch (error) { log(`[${id}] failed to finish upload`, error) @@ -505,10 +574,10 @@ export class S3Store extends DataStore { return newOffset } - public async getUpload(id: string): Promise { + public async getUpload(id: string, options?: DataStoreDefaultOptions): Promise { let metadata: MetadataValue try { - metadata = await this.getMetadata(id) + metadata = await this.getMetadata(id, options) } catch (error) { log('getUpload: No file found.', error) throw ERRORS.FILE_NOT_FOUND @@ -517,7 +586,7 @@ export class S3Store extends DataStore { let offset = 0 try { - const parts = await this.retrieveParts(id) + const parts = await this.retrieveParts(id, undefined, options) offset = calcOffsetFromParts(parts) } catch (error) { // Check if the error is caused by the upload not being found. This happens @@ -540,7 +609,7 @@ export class S3Store extends DataStore { throw error } - const incompletePartSize = await this.getIncompletePartSize(id) + const incompletePartSize = await this.getIncompletePartSize(id, options) return new Upload({ id, @@ -550,26 +619,33 @@ export class S3Store extends DataStore { }) } - public async declareUploadLength(file_id: string, upload_length: number) { - const {file, 'upload-id': uploadId} = await this.getMetadata(file_id) + public async declareUploadLength( + file_id: string, + upload_length: number, + options?: DataStoreDefaultOptions + ) { + const {file, 'upload-id': uploadId} = await this.getMetadata(file_id, options) if (!file) { throw ERRORS.FILE_NOT_FOUND } file.size = upload_length - this.saveMetadata(file, uploadId) + await this.saveMetadata(file, uploadId, options) } - public async remove(id: string): Promise { + public async remove(id: string, options?: DataStoreDefaultOptions): Promise { try { - const {'upload-id': uploadId} = await this.getMetadata(id) + const {'upload-id': uploadId} = await this.getMetadata(id, options) if (uploadId) { - await this.client.abortMultipartUpload({ - Bucket: this.bucket, - Key: id, - UploadId: uploadId, - }) + await this.client.abortMultipartUpload( + { + Bucket: this.bucket, + Key: id, + UploadId: uploadId, + }, + {abortSignal: options?.signal} + ) } } catch (error) { if (error?.code && ['NotFound', 'NoSuchKey', 'NoSuchUpload'].includes(error.Code)) { @@ -579,12 +655,15 @@ export class S3Store extends DataStore { throw error } - await this.client.deleteObjects({ - Bucket: this.bucket, - Delete: { - Objects: [{Key: id}, {Key: `${id}.info`}], + await this.client.deleteObjects( + { + Bucket: this.bucket, + Delete: { + Objects: [{Key: id}, {Key: `${id}.info`}], + }, }, - }) + {abortSignal: options?.signal} + ) this.clearCache(id) } diff --git a/packages/server/src/constants.ts b/packages/server/src/constants.ts index 1048b489..3d1ccf33 100644 --- a/packages/server/src/constants.ts +++ b/packages/server/src/constants.ts @@ -33,6 +33,14 @@ export const ERRORS = { status_code: 403, body: 'Upload-Offset header required\n', }, + ABORTED: { + status_code: 409, + body: 'Request aborted due to lock acquired', + }, + ERR_LOCK_TIMEOUT: { + status_code: 500, + body: 'failed to acquire lock before timeout', + }, INVALID_CONTENT_TYPE: { status_code: 403, body: 'Content-Type header required\n', diff --git a/packages/server/src/handlers/BaseHandler.ts b/packages/server/src/handlers/BaseHandler.ts index e04ce0ac..0fb6e06f 100644 --- a/packages/server/src/handlers/BaseHandler.ts +++ b/packages/server/src/handlers/BaseHandler.ts @@ -3,6 +3,7 @@ import EventEmitter from 'node:events' import type {ServerOptions} from '../types' import type {DataStore} from '../models' import type http from 'node:http' +import {ERRORS} from '../constants' const reExtractFileID = /([^/]+)\/?$/ const reForwardedHost = /host="?([^";]+)/ @@ -27,6 +28,7 @@ export class BaseHandler extends EventEmitter { // @ts-expect-error not explicitly typed but possible headers['Content-Length'] = Buffer.byteLength(body, 'utf8') } + res.writeHead(status, headers) res.write(body) return res.end() @@ -78,4 +80,31 @@ export class BaseHandler extends EventEmitter { return decodeURIComponent(match[1]) } + + getLocker(req: http.IncomingMessage) { + if (typeof this.options.locker === 'function') { + return this.options.locker(req) + } + return this.options.locker + } + + async lock( + req: http.IncomingMessage, + id: string, + fn: (signal: AbortSignal) => Promise + ) { + const abortController = new AbortController() + const locker = this.getLocker(req) + await locker?.lock(id, () => { + if (!abortController.signal.aborted) { + abortController.abort(ERRORS.ABORTED) + } + }) + + try { + return await fn(abortController.signal) + } finally { + await locker?.unlock(id) + } + } } diff --git a/packages/server/src/handlers/DeleteHandler.ts b/packages/server/src/handlers/DeleteHandler.ts index 02afae52..14364f85 100644 --- a/packages/server/src/handlers/DeleteHandler.ts +++ b/packages/server/src/handlers/DeleteHandler.ts @@ -10,7 +10,10 @@ export class DeleteHandler extends BaseHandler { throw ERRORS.FILE_NOT_FOUND } - await this.store.remove(id) + await this.lock(req, id, (signal) => { + return this.store.remove(id, {signal}) + }) + const writtenRes = this.write(res, 204, {}) this.emit(EVENTS.POST_TERMINATE, req, writtenRes, id) return writtenRes diff --git a/packages/server/src/handlers/GetHandler.ts b/packages/server/src/handlers/GetHandler.ts index f503322c..42b446ca 100644 --- a/packages/server/src/handlers/GetHandler.ts +++ b/packages/server/src/handlers/GetHandler.ts @@ -36,6 +36,7 @@ export class GetHandler extends BaseHandler { } const stats = await this.store.getUpload(id) + if (!stats || stats.offset !== stats.size) { throw ERRORS.FILE_NOT_FOUND } diff --git a/packages/server/src/handlers/HeadHandler.ts b/packages/server/src/handlers/HeadHandler.ts index 8ca97389..f65b5946 100644 --- a/packages/server/src/handlers/HeadHandler.ts +++ b/packages/server/src/handlers/HeadHandler.ts @@ -12,7 +12,9 @@ export class HeadHandler extends BaseHandler { throw ERRORS.FILE_NOT_FOUND } - const file = await this.store.getUpload(id) + const file = await this.lock(req, id, (signal) => { + return this.store.getUpload(id, {signal}) + }) // If a Client does attempt to resume an upload which has since // been removed by the Server, the Server SHOULD respond with the diff --git a/packages/server/src/handlers/PatchHandler.ts b/packages/server/src/handlers/PatchHandler.ts index c01624bb..fa8ef007 100644 --- a/packages/server/src/handlers/PatchHandler.ts +++ b/packages/server/src/handlers/PatchHandler.ts @@ -30,7 +30,9 @@ export class PatchHandler extends BaseHandler { throw ERRORS.INVALID_CONTENT_TYPE } - const upload = await this.store.getUpload(id) + const upload = await this.lock(req, id, (signal) => { + return this.store.getUpload(id, {signal: signal}) + }) // If a Client does attempt to resume an upload which has since // been removed by the Server, the Server SHOULD respond with the @@ -73,11 +75,16 @@ export class PatchHandler extends BaseHandler { throw ERRORS.INVALID_LENGTH } - await this.store.declareUploadLength(id, size) + await this.lock(req, id, (signal) => { + return this.store.declareUploadLength(id, size, {signal: signal}) + }) upload.size = size } - const newOffset = await this.store.write(req, id, offset) + const newOffset = await this.lock(req, id, (signal) => { + return this.store.write(req, id, offset, {signal: signal}) + }) + upload.offset = newOffset this.emit(EVENTS.POST_RECEIVE, req, res, upload) if (newOffset === upload.size && this.options.onUploadFinish) { diff --git a/packages/server/src/handlers/PostHandler.ts b/packages/server/src/handlers/PostHandler.ts index 49041c09..56a8e012 100644 --- a/packages/server/src/handlers/PostHandler.ts +++ b/packages/server/src/handlers/PostHandler.ts @@ -89,7 +89,6 @@ export class PostHandler extends BaseHandler { this.emit(EVENTS.POST_CREATE, req, res, upload, url) - let newOffset let isFinal = upload.size === 0 && !upload.sizeIsDeferred const headers: { 'Upload-Offset'?: string @@ -98,7 +97,10 @@ export class PostHandler extends BaseHandler { // The request MIGHT include a Content-Type header when using creation-with-upload extension if (validateHeader('content-type', req.headers['content-type'])) { - newOffset = await this.store.write(req, upload.id, 0) + const newOffset = await this.lock(req, id, async (signal) => { + return this.store.write(req, upload.id, 0, {signal: signal}) + }) + headers['Upload-Offset'] = newOffset.toString() isFinal = newOffset === Number.parseInt(upload_length as string, 10) upload.offset = newOffset @@ -119,7 +121,10 @@ export class PostHandler extends BaseHandler { this.store.getExpiration() > 0 && upload.creation_date ) { - const created = await this.store.getUpload(upload.id) + const created = await this.lock(req, id, async (signal) => { + return this.store.getUpload(upload.id, {signal: signal}) + }) + if (created.offset !== Number.parseInt(upload_length as string, 10)) { const creation = new Date(upload.creation_date) // Value MUST be in RFC 7231 datetime format diff --git a/packages/server/src/models/DataStore.ts b/packages/server/src/models/DataStore.ts index c33c51ca..92c61fd7 100644 --- a/packages/server/src/models/DataStore.ts +++ b/packages/server/src/models/DataStore.ts @@ -6,6 +6,10 @@ import {Upload} from './Upload' import type stream from 'node:stream' import type http from 'node:http' +export interface DataStoreDefaultOptions { + signal?: AbortSignal +} + export class DataStore extends EventEmitter { extensions: string[] = [] @@ -19,7 +23,7 @@ export class DataStore extends EventEmitter { * * http://tus.io/protocols/resumable-upload.html#creation */ - async create(file: Upload) { + async create(file: Upload, options?: DataStoreDefaultOptions) { return file } @@ -27,7 +31,7 @@ export class DataStore extends EventEmitter { * Called in DELETE requests. This method just deletes the file from the store. * http://tus.io/protocols/resumable-upload.html#termination */ - async remove(id: string) {} + async remove(id: string, options?: DataStoreDefaultOptions) {} /** * Called in PATCH requests. This method should write data @@ -39,7 +43,8 @@ export class DataStore extends EventEmitter { async write( stream: http.IncomingMessage | stream.Readable, id: string, - offset: number + offset: number, + options?: DataStoreDefaultOptions ) { return 0 } @@ -49,19 +54,23 @@ export class DataStore extends EventEmitter { * writen to the DataStore, for the client to know where to resume * the upload. */ - async getUpload(id: string): Promise { + async getUpload(id: string, options?: DataStoreDefaultOptions): Promise { return new Upload({id, size: 0, offset: 0}) } /** * Called in PATCH requests when upload length is known after being defered. */ - async declareUploadLength(id: string, upload_length: number) {} + async declareUploadLength( + id: string, + upload_length: number, + options?: DataStoreDefaultOptions + ) {} /** * Returns number of expired uploads that were deleted. */ - async deleteExpired(): Promise { + async deleteExpired(options?: DataStoreDefaultOptions): Promise { return 0 } diff --git a/packages/server/src/models/Locker.ts b/packages/server/src/models/Locker.ts new file mode 100644 index 00000000..e1ca20ad --- /dev/null +++ b/packages/server/src/models/Locker.ts @@ -0,0 +1,101 @@ +import {ERRORS} from '../constants' + +export type RequestRelease = () => Promise | void + +export interface Locker { + lock(id: string, cancelReq: RequestRelease): Promise + unlock(id: string): Promise +} + +export interface MemoryLockerOptions { + acquireLockTimeout: number +} + +class Lock { + public locked = false + public requestRelease?: RequestRelease +} + +export class MemoryLocker implements Locker { + private locks = new Map() + protected timeout: number + + constructor(options?: MemoryLockerOptions) { + this.timeout = options?.acquireLockTimeout ?? 1000 * 30 + } + + private getLock(id: string): Lock { + let lock = this.locks.get(id) + if (lock === undefined) { + lock = new Lock() + this.locks.set(id, lock) + } + return lock + } + + async lock(id: string, requestRelease: RequestRelease): Promise { + const abortController = new AbortController() + const lock = await Promise.race([ + this.waitTimeout(abortController.signal), + this.acquireLock(id, abortController.signal), + ]) + + abortController.abort() + + if (!lock) { + throw ERRORS.ERR_LOCK_TIMEOUT + } + lock.requestRelease = requestRelease + } + + protected async acquireLock(id: string, signal: AbortSignal): Promise { + if (signal.aborted) { + return + } + + const lock = this.getLock(id) + + if (!lock.locked) { + lock.locked = true + return lock + } + + await lock.requestRelease?.() + + const potentialNewLock = this.getLock(id) + if (!potentialNewLock.locked) { + potentialNewLock.locked = true + return potentialNewLock + } + + return await new Promise((resolve, reject) => { + setImmediate(() => { + this.acquireLock(id, signal).then(resolve).catch(reject) + }) + }) + } + + async unlock(id: string): Promise { + const lock = this.getLock(id) + if (!lock.locked) { + throw new Error('Releasing an unlocked lock!') + } + + this.locks.delete(id) + } + + protected waitTimeout(signal: AbortSignal) { + return new Promise((resolve) => { + const timeout = setTimeout(() => { + resolve() + }, this.timeout) + + const abortListener = () => { + clearTimeout(timeout) + signal.removeEventListener('abort', abortListener) + resolve() + } + signal.addEventListener('abort', abortListener) + }) + } +} diff --git a/packages/server/src/models/index.ts b/packages/server/src/models/index.ts index e3fcbc9c..4de74840 100644 --- a/packages/server/src/models/index.ts +++ b/packages/server/src/models/index.ts @@ -3,3 +3,4 @@ export * as Metadata from './Metadata' export {StreamSplitter} from './StreamSplitter' export {Uid} from './Uid' export {Upload} from './Upload' +export {Locker, MemoryLocker} from './Locker' diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index 5640b179..8d7b2731 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -150,7 +150,19 @@ export class Server extends EventEmitter { const onError = (error: {status_code?: number; body?: string; message: string}) => { const status_code = error.status_code || ERRORS.UNKNOWN_ERROR.status_code const body = error.body || `${ERRORS.UNKNOWN_ERROR.body}${error.message || ''}\n` - return this.write(res, status_code, body) + const isAbortError = + status_code === ERRORS.ABORTED.status_code && error.body === ERRORS.ABORTED.body + + if (isAbortError) { + res.setHeader('Connection', 'close') + } + + const response = this.write(res, status_code, body) + + if (isAbortError) { + req.destroy() + } + return response } try { diff --git a/packages/server/src/types.ts b/packages/server/src/types.ts index ebdf9297..a6462237 100644 --- a/packages/server/src/types.ts +++ b/packages/server/src/types.ts @@ -1,6 +1,6 @@ import type http from 'node:http' -import type {Upload} from './models' +import type {Locker, Upload} from './models' export type ServerOptions = { // The route to accept requests. @@ -14,6 +14,8 @@ export type ServerOptions = { // It is important to make these unique to prevent data loss. Only use it if you really need to. // Default uses `crypto.randomBytes(16).toString('hex')`. namingFunction?: (req: http.IncomingMessage) => string + // locker implementation to support distributed locks + locker?: Locker | ((req: http.IncomingMessage) => Locker) // `onUploadCreate` will be invoked before a new upload is created. // If the function returns the (modified) response, the upload will be created. // If an error is thrown, the HTTP request will be aborted and the provided `body` and `status_code` (or their fallbacks) diff --git a/packages/server/test/Locker.test.ts b/packages/server/test/Locker.test.ts new file mode 100644 index 00000000..88850414 --- /dev/null +++ b/packages/server/test/Locker.test.ts @@ -0,0 +1,77 @@ +import {MemoryLocker} from '../src/models/Locker' +import assert from 'node:assert' +import sinon from 'sinon' +import {ERRORS} from '../src' + +describe('MemoryLocker', () => { + it('will acquire a lock by notifying another to release it', async () => { + const locker = new MemoryLocker() + const lockId = 'upload-id-1' + + const cancel = sinon.spy() + const cancel2 = sinon.spy() + + await locker.lock(lockId, async () => { + await locker.unlock(lockId) + cancel() + }) + + await locker.lock(lockId, async () => { + cancel2() + }) + + await locker.unlock(lockId) + + assert(cancel.callCount === 1, `calls count dont match ${cancel.callCount} !== 1`) + assert(cancel2.callCount === 0, `calls count dont match ${cancel.callCount} !== 1`) + }) + + it('will return a lock timeout error', async () => { + const locker = new MemoryLocker({ + acquireLockTimeout: 500, + }) + const lockId = 'upload-id-1' + + const cancel = sinon.spy() + + await locker.lock(lockId, async () => { + cancel() + // We note that the function has been called, but do not + // release the lock + }) + + try { + await locker.lock(lockId, async () => { + throw new Error('panic should not be called') + }) + } catch (e) { + assert(!(e instanceof Error), `error returned is not correct ${e.message}`) + assert('body' in e, 'body is not present in the error') + assert(e.body === ERRORS.ERR_LOCK_TIMEOUT.body) + } + }) + + it('request lock and unlock', async () => { + const locker = new MemoryLocker() + const lockId = 'upload-id-1' + + const cancel = sinon.spy() + await locker.lock(lockId, () => { + cancel() + setTimeout(async () => { + await locker.unlock(lockId) + }, 50) + }) + + await locker.lock(lockId, () => { + throw new Error('should not be called') + }) + + await locker.unlock(lockId) + + assert( + cancel.callCount > 0, + `request released called more times than expected - ${cancel.callCount}` + ) + }) +}) From c5a0e35fb56069f7f97ff730c83bd949a667f369 Mon Sep 17 00:00:00 2001 From: fenos Date: Tue, 5 Dec 2023 10:49:10 +0000 Subject: [PATCH 2/8] review improvements --- packages/server/src/models/Locker.ts | 13 ++++++++ packages/server/src/server.ts | 49 +++++++++++++--------------- test/package.json | 4 +-- 3 files changed, 38 insertions(+), 28 deletions(-) diff --git a/packages/server/src/models/Locker.ts b/packages/server/src/models/Locker.ts index a1b974c7..fb75d46d 100644 --- a/packages/server/src/models/Locker.ts +++ b/packages/server/src/models/Locker.ts @@ -2,6 +2,11 @@ import {ERRORS} from '../constants' export type RequestRelease = () => Promise | void +// The Locker interface defines methods for implementing a locking mechanism. +// This is crucial for ensuring exclusive access to uploads and their metadata. +// Following TUS recommendations, it's important to cancel locks from previous requests +// to avoid holding locks for too long and to manage half-open TCP connections effectively. +// The lock method includes a cancel callback to facilitate the cancellation of a request that previously acquired the lock. export interface Locker { lock(id: string, cancelReq: RequestRelease): Promise unlock(id: string): Promise @@ -15,6 +20,10 @@ class Lock { public requestRelease?: RequestRelease } +// MemoryLocker is an implementation of the Locker interface, maintaining locks in memory. +// It ensures exclusive access to upload resources by managing locks for each upload. +// The lock() method ensures that any previously held lock is released before acquiring a new one. +// Lock acquisition attempts will timeout based on the specified 'acquireLockTimeout' duration. export class MemoryLocker implements Locker { private locks = new Map() protected timeout: number @@ -61,6 +70,10 @@ export class MemoryLocker implements Locker { } return await new Promise((resolve, reject) => { + // Using setImmediate to: + // 1. Prevent stack overflow by deferring recursive calls to the next event loop iteration. + // 2. Allow event loop to process other pending events, maintaining server responsiveness. + // 3. Ensure fairness in lock acquisition by giving other requests a chance to acquire the lock. setImmediate(() => { this.acquireLock(id, signal).then(resolve).catch(reject) }) diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index 1c2e3d87..9fda3763 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -159,7 +159,7 @@ export class Server extends EventEmitter { const status_code = error.status_code || ERRORS.UNKNOWN_ERROR.status_code const body = error.body || `${ERRORS.UNKNOWN_ERROR.body}${error.message || ''}\n` - return this.write(req, res, status_code, body, {}, context) + return this.write(context, req, res, status_code, body, {}) } if (req.method === 'GET') { @@ -173,7 +173,7 @@ export class Server extends EventEmitter { res.setHeader('Tus-Resumable', TUS_RESUMABLE) if (req.method !== 'OPTIONS' && req.headers['tus-resumable'] === undefined) { - return this.write(req, res, 412, 'Tus-Resumable Required\n', {}, context) + return this.write(context, req, res, 412, 'Tus-Resumable Required\n') } // Validate all required headers to adhere to the tus protocol @@ -199,14 +199,7 @@ export class Server extends EventEmitter { } if (invalid_headers.length > 0) { - return this.write( - req, - res, - 400, - `Invalid ${invalid_headers.join(' ')}\n`, - {}, - context - ) + return this.write(context, req, res, 400, `Invalid ${invalid_headers.join(' ')}\n`) } // Enable CORS @@ -221,16 +214,16 @@ export class Server extends EventEmitter { return handler.send(req, res, context).catch(onError) } - return this.write(req, res, 404, 'Not found\n', {}, context) + return this.write(context, req, res, 404, 'Not found\n') } write( + context: CancellationContext, req: http.IncomingMessage, res: http.ServerResponse, status: number, body = '', - headers = {}, - context: CancellationContext + headers = {} ) { const isAborted = context.signal.aborted @@ -269,33 +262,37 @@ export class Server extends EventEmitter { } protected createContext(req: http.IncomingMessage) { - const abortController = new AbortController() - const delayedAbortController = new AbortController() + // Initialize two AbortControllers: + // 1. `requestAbortController` for instant request termination, particularly useful for stopping clients to upload when errors occur. + // 2. `abortWithDelayController` to introduce a delay before aborting, allowing the server time to complete ongoing operations. + // This is particularly useful when a future request may need to acquire a lock currently held by this request. + const requestAbortController = new AbortController() + const abortWithDelayController = new AbortController() const onAbort = (err: unknown) => { - abortController.signal.removeEventListener('abort', onAbort) + abortWithDelayController.signal.removeEventListener('abort', onAbort) setTimeout(() => { - delayedAbortController.abort(err) + requestAbortController.abort(err) }, 3000) } - abortController.signal.addEventListener('abort', onAbort) + abortWithDelayController.signal.addEventListener('abort', onAbort) req.on('close', () => { - abortController.signal.removeEventListener('abort', onAbort) + abortWithDelayController.signal.removeEventListener('abort', onAbort) }) return { - signal: delayedAbortController.signal, + signal: requestAbortController.signal, abort: () => { - // abort the delayed signal right away - if (!delayedAbortController.signal.aborted) { - delayedAbortController.abort(ERRORS.ABORTED) + // abort the request immediately + if (!requestAbortController.signal.aborted) { + requestAbortController.abort(ERRORS.ABORTED) } }, cancel: () => { - // cancel and wait until the delayedController time elapses - if (!abortController.signal.aborted) { - abortController.abort(ERRORS.ABORTED) + // Initiates the delayed abort sequence unless it's already in progress. + if (!abortWithDelayController.signal.aborted) { + abortWithDelayController.abort(ERRORS.ABORTED) } }, } diff --git a/test/package.json b/test/package.json index f9a27c64..422792ec 100644 --- a/test/package.json +++ b/test/package.json @@ -10,8 +10,7 @@ "@tus/file-store": "workspace:^", "@tus/gcs-store": "workspace:^", "@tus/s3-store": "workspace:^", - "@tus/server": "workspace:^", - "throttle": "^1.0.3" + "@tus/server": "workspace:^" }, "devDependencies": { "@types/mocha": "^10.0.1", @@ -25,6 +24,7 @@ "should": "^13.2.3", "sinon": "^15.2.0", "supertest": "^6.3.3", + "throttle": "^1.0.3", "ts-node": "^10.9.1", "tsconfig": "*", "typescript": "^5.2.2" From 941e557a16254b924566d57e34bd6d65bbe78a14 Mon Sep 17 00:00:00 2001 From: fenos Date: Wed, 6 Dec 2023 09:01:19 +0000 Subject: [PATCH 3/8] review improvements --- packages/server/src/handlers/BaseHandler.ts | 29 ++++- packages/server/src/handlers/DeleteHandler.ts | 2 +- packages/server/src/handlers/HeadHandler.ts | 2 +- packages/server/src/handlers/PatchHandler.ts | 2 +- packages/server/src/handlers/PostHandler.ts | 2 +- packages/server/src/index.ts | 1 + packages/server/src/lockers/MemoryLocker.ts | 104 +++++++++++++++ packages/server/src/lockers/index.ts | 1 + packages/server/src/models/Locker.ts | 118 +++--------------- packages/server/src/models/index.ts | 2 +- packages/server/src/server.ts | 19 ++- packages/server/test/Locker.test.ts | 3 +- 12 files changed, 171 insertions(+), 114 deletions(-) create mode 100644 packages/server/src/lockers/MemoryLocker.ts create mode 100644 packages/server/src/lockers/index.ts diff --git a/packages/server/src/handlers/BaseHandler.ts b/packages/server/src/handlers/BaseHandler.ts index 26e57cc2..68d3e486 100644 --- a/packages/server/src/handlers/BaseHandler.ts +++ b/packages/server/src/handlers/BaseHandler.ts @@ -10,6 +10,26 @@ const reExtractFileID = /([^/]+)\/?$/ const reForwardedHost = /host="?([^";]+)/ const reForwardedProto = /proto=(https?)/ +/** + * The CancellationContext interface provides mechanisms to manage the termination of a request. + * It is designed to handle two types of request terminations: immediate abortion and graceful cancellation. + * + * Properties: + * - signal: An instance of AbortSignal. It allows external entities to listen for cancellation requests, + * making it possible to react accordingly. + * + * Methods: + * - abort(): This function should be called to immediately terminate the request. It is intended for scenarios + * where the request cannot continue and needs to be stopped as soon as possible, such as due to upload errors + * or invalid conditions. Implementers should ensure that invoking this method leads to the swift cessation of all + * request-related operations to save resources. + * + * - cancel(): This function is used for more controlled termination of the request. It signals that the request should + * be concluded, but allows for a short period of time to finalize operations gracefully. This could involve + * completing current transactions or cleaning up resources. The exact behavior and the time allowed for cancellation + * completion are determined by the implementation, but the goal is to try to end the request without abrupt interruption, + * ensuring orderly shutdown of ongoing processes. + */ export interface CancellationContext { signal: AbortSignal abort: () => void @@ -124,11 +144,16 @@ export class BaseHandler extends EventEmitter { context: CancellationContext ) { const locker = this.getLocker(req) - await locker?.lock(id, () => { + + if (!locker) { + return + } + + await locker.lock(id, () => { context.cancel() }) - return () => locker?.unlock(id) + return () => locker.unlock(id) } protected writeToStore( diff --git a/packages/server/src/handlers/DeleteHandler.ts b/packages/server/src/handlers/DeleteHandler.ts index 7836656a..c7c704ab 100644 --- a/packages/server/src/handlers/DeleteHandler.ts +++ b/packages/server/src/handlers/DeleteHandler.ts @@ -22,7 +22,7 @@ export class DeleteHandler extends BaseHandler { try { await this.store.remove(id) } finally { - await unlock() + await unlock?.() } const writtenRes = this.write(res, 204, {}) this.emit(EVENTS.POST_TERMINATE, req, writtenRes, id) diff --git a/packages/server/src/handlers/HeadHandler.ts b/packages/server/src/handlers/HeadHandler.ts index c9d156df..49ba1e9f 100644 --- a/packages/server/src/handlers/HeadHandler.ts +++ b/packages/server/src/handlers/HeadHandler.ts @@ -26,7 +26,7 @@ export class HeadHandler extends BaseHandler { try { file = await this.store.getUpload(id) } finally { - await unlock() + await unlock?.() } // If a Client does attempt to resume an upload which has since diff --git a/packages/server/src/handlers/PatchHandler.ts b/packages/server/src/handlers/PatchHandler.ts index 3facdb92..0dcb8d14 100644 --- a/packages/server/src/handlers/PatchHandler.ts +++ b/packages/server/src/handlers/PatchHandler.ts @@ -92,7 +92,7 @@ export class PatchHandler extends BaseHandler { newOffset = await this.writeToStore(req, id, offset, context) } finally { - await unlock() + await unlock?.() } upload.offset = newOffset diff --git a/packages/server/src/handlers/PostHandler.ts b/packages/server/src/handlers/PostHandler.ts index a355f810..d070cd9a 100644 --- a/packages/server/src/handlers/PostHandler.ts +++ b/packages/server/src/handlers/PostHandler.ts @@ -120,7 +120,7 @@ export class PostHandler extends BaseHandler { context.abort() throw e } finally { - await unlock() + await unlock?.() } if (isFinal && this.options.onUploadFinish) { diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 538d7111..2bb16a96 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -1,4 +1,5 @@ export {Server} from './server' export * from './types' export * from './models' +export * from './lockers' export * from './constants' diff --git a/packages/server/src/lockers/MemoryLocker.ts b/packages/server/src/lockers/MemoryLocker.ts new file mode 100644 index 00000000..602af32c --- /dev/null +++ b/packages/server/src/lockers/MemoryLocker.ts @@ -0,0 +1,104 @@ +import {ERRORS} from '../constants' +import {Locker, RequestRelease} from '../models/Locker' + +export interface MemoryLockerOptions { + acquireLockTimeout: number +} + +class Lock { + public requestRelease?: RequestRelease +} + +/** + * MemoryLocker is an implementation of the Locker interface that manages locks in memory. + * This class is designed for exclusive access control over resources, often used in scenarios like upload management. + * + * Key Features: + * - Ensures exclusive resource access by using a memory-based map to track locks. + * - Implements timeout for lock acquisition, mitigating deadlock situations. + * - Facilitates both immediate and graceful release of locks through different mechanisms. + * + * Locking Behavior: + * - When the `lock` method is invoked for an already locked resource, the `cancelReq` callback is called. + * This signals to the current lock holder that another process is requesting the lock, encouraging them to release it as soon as possible. + * - The lock attempt continues until the specified timeout is reached. If the timeout expires and the lock is still not + * available, an error is thrown to indicate lock acquisition failure. + * + * Lock Acquisition and Release: + * - The `lock` method implements a wait mechanism, allowing a lock request to either succeed when the lock becomes available, + * or fail after the timeout period. + * - The `unlock` method releases a lock, making the resource available for other requests. + */ +export class MemoryLocker implements Locker { + private locks = new Map() + protected timeout: number + + constructor(options?: MemoryLockerOptions) { + this.timeout = options?.acquireLockTimeout ?? 1000 * 30 + } + + async lock(id: string, requestRelease: RequestRelease): Promise { + const abortController = new AbortController() + const lock = await Promise.race([ + this.waitTimeout(abortController.signal), + this.acquireLock(id, abortController.signal), + ]) + + abortController.abort() + + if (!lock) { + throw ERRORS.ERR_LOCK_TIMEOUT + } + lock.requestRelease = requestRelease + } + + protected async acquireLock(id: string, signal: AbortSignal): Promise { + if (signal.aborted) { + return + } + + const lock = this.locks.get(id) + + if (!lock) { + const lock = new Lock() + this.locks.set(id, lock) + return lock + } + + await lock.requestRelease?.() + + return await new Promise((resolve, reject) => { + // Using setImmediate to: + // 1. Prevent stack overflow by deferring recursive calls to the next event loop iteration. + // 2. Allow event loop to process other pending events, maintaining server responsiveness. + // 3. Ensure fairness in lock acquisition by giving other requests a chance to acquire the lock. + setImmediate(() => { + this.acquireLock(id, signal).then(resolve).catch(reject) + }) + }) + } + + async unlock(id: string): Promise { + const lock = this.locks.get(id) + if (!lock) { + throw new Error('Releasing an unlocked lock!') + } + + this.locks.delete(id) + } + + protected waitTimeout(signal: AbortSignal) { + return new Promise((resolve) => { + const timeout = setTimeout(() => { + resolve() + }, this.timeout) + + const abortListener = () => { + clearTimeout(timeout) + signal.removeEventListener('abort', abortListener) + resolve() + } + signal.addEventListener('abort', abortListener) + }) + } +} diff --git a/packages/server/src/lockers/index.ts b/packages/server/src/lockers/index.ts new file mode 100644 index 00000000..b6de79c2 --- /dev/null +++ b/packages/server/src/lockers/index.ts @@ -0,0 +1 @@ +export * from './MemoryLocker' diff --git a/packages/server/src/models/Locker.ts b/packages/server/src/models/Locker.ts index fb75d46d..8a909e21 100644 --- a/packages/server/src/models/Locker.ts +++ b/packages/server/src/models/Locker.ts @@ -1,106 +1,24 @@ -import {ERRORS} from '../constants' - export type RequestRelease = () => Promise | void -// The Locker interface defines methods for implementing a locking mechanism. -// This is crucial for ensuring exclusive access to uploads and their metadata. -// Following TUS recommendations, it's important to cancel locks from previous requests -// to avoid holding locks for too long and to manage half-open TCP connections effectively. -// The lock method includes a cancel callback to facilitate the cancellation of a request that previously acquired the lock. +/** + * The Locker interface defines methods for implementing a locking mechanism. + * It is primarily used to ensure exclusive access to resources, such as uploads and their metadata. + * + * The interface adheres to TUS protocol recommendations, emphasizing the need to prevent prolonged lock retention. + * This approach helps manage resources efficiently and avoids issues with half-open TCP connections. + * + * Methods: + * - lock(id, cancelReq): Acquires a lock on a resource identified by 'id'. If the lock is already held by another request, + * the 'cancelReq' callback is provided to signal the current lock holder to release the lock. + * The 'cancelReq' callback should be invoked when there's an attempt by another request to acquire a previously locked resource. + * This mechanism ensures that locks are held only as long as necessary and are promptly released for other requests. + * + * - unlock(id): Releases the lock held on the resource identified by 'id'. This should be called by the lock holder + * after completing their operation or upon receiving a signal through the 'cancelReq' callback from a subsequent request + * attempting to acquire the lock. + * + */ export interface Locker { lock(id: string, cancelReq: RequestRelease): Promise unlock(id: string): Promise } - -export interface MemoryLockerOptions { - acquireLockTimeout: number -} - -class Lock { - public requestRelease?: RequestRelease -} - -// MemoryLocker is an implementation of the Locker interface, maintaining locks in memory. -// It ensures exclusive access to upload resources by managing locks for each upload. -// The lock() method ensures that any previously held lock is released before acquiring a new one. -// Lock acquisition attempts will timeout based on the specified 'acquireLockTimeout' duration. -export class MemoryLocker implements Locker { - private locks = new Map() - protected timeout: number - - constructor(options?: MemoryLockerOptions) { - this.timeout = options?.acquireLockTimeout ?? 1000 * 30 - } - - async lock(id: string, requestRelease: RequestRelease): Promise { - const abortController = new AbortController() - const lock = await Promise.race([ - this.waitTimeout(abortController.signal), - this.acquireLock(id, abortController.signal), - ]) - - abortController.abort() - - if (!lock) { - throw ERRORS.ERR_LOCK_TIMEOUT - } - lock.requestRelease = requestRelease - } - - protected async acquireLock(id: string, signal: AbortSignal): Promise { - if (signal.aborted) { - return - } - - const lock = this.locks.get(id) - - if (!lock) { - const lock = new Lock() - this.locks.set(id, lock) - return lock - } - - await lock.requestRelease?.() - - const potentialNewLock = this.locks.get(id) - if (!potentialNewLock) { - const lock = new Lock() - this.locks.set(id, lock) - return lock - } - - return await new Promise((resolve, reject) => { - // Using setImmediate to: - // 1. Prevent stack overflow by deferring recursive calls to the next event loop iteration. - // 2. Allow event loop to process other pending events, maintaining server responsiveness. - // 3. Ensure fairness in lock acquisition by giving other requests a chance to acquire the lock. - setImmediate(() => { - this.acquireLock(id, signal).then(resolve).catch(reject) - }) - }) - } - - async unlock(id: string): Promise { - const lock = this.locks.get(id) - if (!lock) { - throw new Error('Releasing an unlocked lock!') - } - - this.locks.delete(id) - } - - protected waitTimeout(signal: AbortSignal) { - return new Promise((resolve) => { - const timeout = setTimeout(() => { - resolve() - }, this.timeout) - - const abortListener = () => { - clearTimeout(timeout) - signal.removeEventListener('abort', abortListener) - resolve() - } - signal.addEventListener('abort', abortListener) - }) - } -} diff --git a/packages/server/src/models/index.ts b/packages/server/src/models/index.ts index 4de74840..83b4e24a 100644 --- a/packages/server/src/models/index.ts +++ b/packages/server/src/models/index.ts @@ -3,4 +3,4 @@ export * as Metadata from './Metadata' export {StreamSplitter} from './StreamSplitter' export {Uid} from './Uid' export {Upload} from './Upload' -export {Locker, MemoryLocker} from './Locker' +export {Locker} from './Locker' diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index 8e8672e8..71b83ada 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -233,14 +233,23 @@ export class Server extends EventEmitter { } if (isAborted) { + // This condition handles situations where the request has been flagged as aborted. + // In such cases, the server informs the client that the connection will be closed. + // This is communicated by setting the 'Connection' header to 'close' in the response. + // This step is essential to prevent the server from continuing to process a request + // that is no longer needed, thereby saving resources. + // @ts-expect-error not explicitly typed but possible headers['Connection'] = 'close' - res.writeHead(status, headers) - res.write(body) - const sent = res.end() - req.destroy() - return sent + // An event listener is added to the response ('res') for the 'finish' event. + // The 'finish' event is triggered when the response has been sent to the client. + // Once the response is complete, the request ('req') object is destroyed. + // Destroying the request object is a crucial step to release any resources + // tied to this request, as it has already been aborted. + res.on('finish', () => { + req.destroy() + }) } res.writeHead(status, headers) diff --git a/packages/server/test/Locker.test.ts b/packages/server/test/Locker.test.ts index 88850414..39be3c75 100644 --- a/packages/server/test/Locker.test.ts +++ b/packages/server/test/Locker.test.ts @@ -1,7 +1,6 @@ -import {MemoryLocker} from '../src/models/Locker' import assert from 'node:assert' import sinon from 'sinon' -import {ERRORS} from '../src' +import {ERRORS, MemoryLocker} from '../src' describe('MemoryLocker', () => { it('will acquire a lock by notifying another to release it', async () => { From a6bf1b06fac73a3dd63846a6f9fff8e34a32adf8 Mon Sep 17 00:00:00 2001 From: fenos Date: Thu, 7 Dec 2023 11:37:10 +0000 Subject: [PATCH 4/8] revert: custom context creation --- packages/server/src/handlers/BaseHandler.ts | 36 ++-------- packages/server/src/handlers/DeleteHandler.ts | 7 +- packages/server/src/handlers/HeadHandler.ts | 8 +-- packages/server/src/handlers/PatchHandler.ts | 8 +-- packages/server/src/handlers/PostHandler.ts | 8 +-- packages/server/src/lockers/MemoryLocker.ts | 72 ++++++++++++------- packages/server/src/models/Context.ts | 25 +++++++ packages/server/src/models/Locker.ts | 15 ++-- packages/server/src/models/index.ts | 3 +- packages/server/src/server.ts | 3 +- packages/server/src/types.ts | 2 +- packages/server/test/DeleteHandler.test.ts | 2 +- packages/server/test/HeadHandler.test.ts | 3 +- packages/server/test/PatchHandler.test.ts | 3 +- packages/server/test/PostHandler.test.ts | 3 +- 15 files changed, 111 insertions(+), 87 deletions(-) create mode 100644 packages/server/src/models/Context.ts diff --git a/packages/server/src/handlers/BaseHandler.ts b/packages/server/src/handlers/BaseHandler.ts index 68d3e486..575740bb 100644 --- a/packages/server/src/handlers/BaseHandler.ts +++ b/packages/server/src/handlers/BaseHandler.ts @@ -1,7 +1,7 @@ import EventEmitter from 'node:events' import type {ServerOptions} from '../types' -import type {DataStore} from '../models' +import type {DataStore, CancellationContext} from '../models' import type http from 'node:http' import stream from 'node:stream' import {ERRORS} from '../constants' @@ -10,32 +10,6 @@ const reExtractFileID = /([^/]+)\/?$/ const reForwardedHost = /host="?([^";]+)/ const reForwardedProto = /proto=(https?)/ -/** - * The CancellationContext interface provides mechanisms to manage the termination of a request. - * It is designed to handle two types of request terminations: immediate abortion and graceful cancellation. - * - * Properties: - * - signal: An instance of AbortSignal. It allows external entities to listen for cancellation requests, - * making it possible to react accordingly. - * - * Methods: - * - abort(): This function should be called to immediately terminate the request. It is intended for scenarios - * where the request cannot continue and needs to be stopped as soon as possible, such as due to upload errors - * or invalid conditions. Implementers should ensure that invoking this method leads to the swift cessation of all - * request-related operations to save resources. - * - * - cancel(): This function is used for more controlled termination of the request. It signals that the request should - * be concluded, but allows for a short period of time to finalize operations gracefully. This could involve - * completing current transactions or cleaning up resources. The exact behavior and the time allowed for cancellation - * completion are determined by the implementation, but the goal is to try to end the request without abrupt interruption, - * ensuring orderly shutdown of ongoing processes. - */ -export interface CancellationContext { - signal: AbortSignal - abort: () => void - cancel: () => void -} - export class BaseHandler extends EventEmitter { options: ServerOptions store: DataStore @@ -143,17 +117,19 @@ export class BaseHandler extends EventEmitter { id: string, context: CancellationContext ) { - const locker = this.getLocker(req) + const locker = await this.getLocker(req) if (!locker) { return } - await locker.lock(id, () => { + const lock = locker.newLock(id) + + await lock.lock(() => { context.cancel() }) - return () => locker.unlock(id) + return lock } protected writeToStore( diff --git a/packages/server/src/handlers/DeleteHandler.ts b/packages/server/src/handlers/DeleteHandler.ts index c7c704ab..d93e6c50 100644 --- a/packages/server/src/handlers/DeleteHandler.ts +++ b/packages/server/src/handlers/DeleteHandler.ts @@ -1,5 +1,6 @@ -import {BaseHandler, CancellationContext} from './BaseHandler' +import {BaseHandler} from './BaseHandler' import {ERRORS, EVENTS} from '../constants' +import {CancellationContext} from '../models' import type http from 'node:http' @@ -18,11 +19,11 @@ export class DeleteHandler extends BaseHandler { await this.options.onIncomingRequest(req, res, id) } - const unlock = await this.acquireLock(req, id, context) + const lock = await this.acquireLock(req, id, context) try { await this.store.remove(id) } finally { - await unlock?.() + await lock?.unlock() } const writtenRes = this.write(res, 204, {}) this.emit(EVENTS.POST_TERMINATE, req, writtenRes, id) diff --git a/packages/server/src/handlers/HeadHandler.ts b/packages/server/src/handlers/HeadHandler.ts index 49ba1e9f..a8db9f10 100644 --- a/packages/server/src/handlers/HeadHandler.ts +++ b/packages/server/src/handlers/HeadHandler.ts @@ -1,7 +1,7 @@ -import {BaseHandler, CancellationContext} from './BaseHandler' +import {BaseHandler} from './BaseHandler' import {ERRORS} from '../constants' -import {Metadata, Upload} from '../models' +import {Metadata, Upload, CancellationContext} from '../models' import type http from 'node:http' @@ -20,13 +20,13 @@ export class HeadHandler extends BaseHandler { await this.options.onIncomingRequest(req, res, id) } - const unlock = await this.acquireLock(req, id, context) + const lock = await this.acquireLock(req, id, context) let file: Upload try { file = await this.store.getUpload(id) } finally { - await unlock?.() + await lock?.unlock() } // If a Client does attempt to resume an upload which has since diff --git a/packages/server/src/handlers/PatchHandler.ts b/packages/server/src/handlers/PatchHandler.ts index 0dcb8d14..228d64a9 100644 --- a/packages/server/src/handlers/PatchHandler.ts +++ b/packages/server/src/handlers/PatchHandler.ts @@ -1,10 +1,10 @@ import debug from 'debug' -import {BaseHandler, CancellationContext} from './BaseHandler' +import {BaseHandler} from './BaseHandler' import {ERRORS, EVENTS} from '../constants' import type http from 'node:http' -import {Upload} from '../models' +import {CancellationContext, Upload} from '../models' const log = debug('tus-node-server:handlers:patch') @@ -36,7 +36,7 @@ export class PatchHandler extends BaseHandler { throw ERRORS.INVALID_CONTENT_TYPE } - const unlock = await this.acquireLock(req, id, context) + const lock = await this.acquireLock(req, id, context) let upload: Upload let newOffset: number @@ -92,7 +92,7 @@ export class PatchHandler extends BaseHandler { newOffset = await this.writeToStore(req, id, offset, context) } finally { - await unlock?.() + await lock?.unlock() } upload.offset = newOffset diff --git a/packages/server/src/handlers/PostHandler.ts b/packages/server/src/handlers/PostHandler.ts index d070cd9a..ac52e1fa 100644 --- a/packages/server/src/handlers/PostHandler.ts +++ b/packages/server/src/handlers/PostHandler.ts @@ -1,13 +1,13 @@ import debug from 'debug' -import {BaseHandler, CancellationContext} from './BaseHandler' +import {BaseHandler} from './BaseHandler' import {Upload, Uid, Metadata} from '../models' import {validateHeader} from '../validators/HeaderValidator' import {EVENTS, ERRORS} from '../constants' import type http from 'node:http' import type {ServerOptions} from '../types' -import {DataStore} from '../models' +import {DataStore, CancellationContext} from '../models' const log = debug('tus-node-server:handlers:post') @@ -92,7 +92,7 @@ export class PostHandler extends BaseHandler { } } - const unlock = await this.acquireLock(req, id, context) + const lock = await this.acquireLock(req, id, context) let isFinal: boolean let url: string let headers: { @@ -120,7 +120,7 @@ export class PostHandler extends BaseHandler { context.abort() throw e } finally { - await unlock?.() + await lock?.unlock() } if (isFinal && this.options.onUploadFinish) { diff --git a/packages/server/src/lockers/MemoryLocker.ts b/packages/server/src/lockers/MemoryLocker.ts index 602af32c..8e68fdbe 100644 --- a/packages/server/src/lockers/MemoryLocker.ts +++ b/packages/server/src/lockers/MemoryLocker.ts @@ -1,13 +1,5 @@ import {ERRORS} from '../constants' -import {Locker, RequestRelease} from '../models/Locker' - -export interface MemoryLockerOptions { - acquireLockTimeout: number -} - -class Lock { - public requestRelease?: RequestRelease -} +import {Lock, Locker, RequestRelease} from '../models' /** * MemoryLocker is an implementation of the Locker interface that manages locks in memory. @@ -29,19 +21,40 @@ class Lock { * or fail after the timeout period. * - The `unlock` method releases a lock, making the resource available for other requests. */ + +export interface MemoryLockerOptions { + acquireLockTimeout: number +} + +interface LockEntry { + requestRelease: RequestRelease +} + export class MemoryLocker implements Locker { - private locks = new Map() - protected timeout: number + timeout: number + locks = new Map() constructor(options?: MemoryLockerOptions) { this.timeout = options?.acquireLockTimeout ?? 1000 * 30 } - async lock(id: string, requestRelease: RequestRelease): Promise { + newLock(id: string) { + return new MemoryLock(id, this, this.timeout) + } +} + +class MemoryLock implements Lock { + constructor( + private id: string, + private locker: MemoryLocker, + private timeout: number = 1000 * 30 + ) {} + + async lock(requestRelease: RequestRelease): Promise { const abortController = new AbortController() const lock = await Promise.race([ this.waitTimeout(abortController.signal), - this.acquireLock(id, abortController.signal), + this.acquireLock(this.id, requestRelease, abortController.signal), ]) abortController.abort() @@ -49,20 +62,25 @@ export class MemoryLocker implements Locker { if (!lock) { throw ERRORS.ERR_LOCK_TIMEOUT } - lock.requestRelease = requestRelease } - protected async acquireLock(id: string, signal: AbortSignal): Promise { + protected async acquireLock( + id: string, + requestRelease: RequestRelease, + signal: AbortSignal + ): Promise { if (signal.aborted) { - return + return false } - const lock = this.locks.get(id) + const lock = this.locker.locks.get(id) if (!lock) { - const lock = new Lock() - this.locks.set(id, lock) - return lock + const lock = { + requestRelease, + } + this.locker.locks.set(id, lock) + return true } await lock.requestRelease?.() @@ -73,30 +91,30 @@ export class MemoryLocker implements Locker { // 2. Allow event loop to process other pending events, maintaining server responsiveness. // 3. Ensure fairness in lock acquisition by giving other requests a chance to acquire the lock. setImmediate(() => { - this.acquireLock(id, signal).then(resolve).catch(reject) + this.acquireLock(id, requestRelease, signal).then(resolve).catch(reject) }) }) } - async unlock(id: string): Promise { - const lock = this.locks.get(id) + async unlock(): Promise { + const lock = this.locker.locks.get(this.id) if (!lock) { throw new Error('Releasing an unlocked lock!') } - this.locks.delete(id) + this.locker.locks.delete(this.id) } protected waitTimeout(signal: AbortSignal) { - return new Promise((resolve) => { + return new Promise((resolve) => { const timeout = setTimeout(() => { - resolve() + resolve(false) }, this.timeout) const abortListener = () => { clearTimeout(timeout) signal.removeEventListener('abort', abortListener) - resolve() + resolve(false) } signal.addEventListener('abort', abortListener) }) diff --git a/packages/server/src/models/Context.ts b/packages/server/src/models/Context.ts new file mode 100644 index 00000000..18d20c49 --- /dev/null +++ b/packages/server/src/models/Context.ts @@ -0,0 +1,25 @@ +/** + * The CancellationContext interface provides mechanisms to manage the termination of a request. + * It is designed to handle two types of request terminations: immediate abortion and graceful cancellation. + * + * Properties: + * - signal: An instance of AbortSignal. It allows external entities to listen for cancellation requests, + * making it possible to react accordingly. + * + * Methods: + * - abort(): This function should be called to immediately terminate the request. It is intended for scenarios + * where the request cannot continue and needs to be stopped as soon as possible, such as due to upload errors + * or invalid conditions. Implementers should ensure that invoking this method leads to the swift cessation of all + * request-related operations to save resources. + * + * - cancel(): This function is used for more controlled termination of the request. It signals that the request should + * be concluded, but allows for a short period of time to finalize operations gracefully. This could involve + * completing current transactions or cleaning up resources. The exact behavior and the time allowed for cancellation + * completion are determined by the implementation, but the goal is to try to end the request without abrupt interruption, + * ensuring orderly shutdown of ongoing processes. + */ +export interface CancellationContext { + signal: AbortSignal + abort: () => void + cancel: () => void +} diff --git a/packages/server/src/models/Locker.ts b/packages/server/src/models/Locker.ts index 8a909e21..e4365f4b 100644 --- a/packages/server/src/models/Locker.ts +++ b/packages/server/src/models/Locker.ts @@ -1,7 +1,14 @@ export type RequestRelease = () => Promise | void /** - * The Locker interface defines methods for implementing a locking mechanism. + * The Locker interface creates a Lock instance for a given resource identifier. + */ +export interface Locker { + newLock(id: string): Lock +} + +/** + * The Lock interface defines methods for implementing a locking mechanism. * It is primarily used to ensure exclusive access to resources, such as uploads and their metadata. * * The interface adheres to TUS protocol recommendations, emphasizing the need to prevent prolonged lock retention. @@ -18,7 +25,7 @@ export type RequestRelease = () => Promise | void * attempting to acquire the lock. * */ -export interface Locker { - lock(id: string, cancelReq: RequestRelease): Promise - unlock(id: string): Promise +export interface Lock { + lock(cancelReq: RequestRelease): Promise + unlock(): Promise } diff --git a/packages/server/src/models/index.ts b/packages/server/src/models/index.ts index 83b4e24a..908f53ca 100644 --- a/packages/server/src/models/index.ts +++ b/packages/server/src/models/index.ts @@ -3,4 +3,5 @@ export * as Metadata from './Metadata' export {StreamSplitter} from './StreamSplitter' export {Uid} from './Uid' export {Upload} from './Upload' -export {Locker} from './Locker' +export {Locker, Lock, RequestRelease} from './Locker' +export {CancellationContext} from './Context' diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index 71b83ada..15906088 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -21,8 +21,7 @@ import { import type stream from 'node:stream' import type {ServerOptions, RouteHandler} from './types' -import type {DataStore, Upload} from './models' -import {CancellationContext} from './handlers/BaseHandler' +import type {DataStore, Upload, CancellationContext} from './models' type Handlers = { GET: InstanceType diff --git a/packages/server/src/types.ts b/packages/server/src/types.ts index dcaa3067..7e0193c0 100644 --- a/packages/server/src/types.ts +++ b/packages/server/src/types.ts @@ -24,7 +24,7 @@ export type ServerOptions = { // Default uses `crypto.randomBytes(16).toString('hex')`. namingFunction?: (req: http.IncomingMessage) => string // locker implementation to support distributed locks - locker?: Locker | ((req: http.IncomingMessage) => Locker) + locker?: Locker | ((req: http.IncomingMessage) => Locker | Promise) // `onUploadCreate` will be invoked before a new upload is created. // If the function returns the (modified) response, the upload will be created. // If an error is thrown, the HTTP request will be aborted and the provided `body` and `status_code` (or their fallbacks) diff --git a/packages/server/test/DeleteHandler.test.ts b/packages/server/test/DeleteHandler.test.ts index b867272f..825bcb9f 100644 --- a/packages/server/test/DeleteHandler.test.ts +++ b/packages/server/test/DeleteHandler.test.ts @@ -9,7 +9,7 @@ import httpMocks from 'node-mocks-http' import {DataStore} from '../src/models/DataStore' import {DeleteHandler} from '../src/handlers/DeleteHandler' import {ERRORS, EVENTS} from '../src/constants' -import {CancellationContext} from '../src/handlers/BaseHandler' +import {CancellationContext} from '../src/models' describe('DeleteHandler', () => { const path = '/test/output' diff --git a/packages/server/test/HeadHandler.test.ts b/packages/server/test/HeadHandler.test.ts index 536f08f6..08f1ec0b 100644 --- a/packages/server/test/HeadHandler.test.ts +++ b/packages/server/test/HeadHandler.test.ts @@ -4,10 +4,9 @@ import http from 'node:http' import sinon from 'sinon' import httpMocks from 'node-mocks-http' -import {DataStore, Upload} from '../src/models' +import {DataStore, Upload, CancellationContext} from '../src/models' import {HeadHandler} from '../src/handlers/HeadHandler' import {ERRORS} from '../src/constants' -import {CancellationContext} from '../src/handlers/BaseHandler' describe('HeadHandler', () => { const path = '/test/output' diff --git a/packages/server/test/PatchHandler.test.ts b/packages/server/test/PatchHandler.test.ts index dde309e0..4384df94 100644 --- a/packages/server/test/PatchHandler.test.ts +++ b/packages/server/test/PatchHandler.test.ts @@ -7,9 +7,8 @@ import sinon from 'sinon' import httpMocks from 'node-mocks-http' import {PatchHandler} from '../src/handlers/PatchHandler' -import {Upload, DataStore} from '../src/models' +import {Upload, DataStore, CancellationContext} from '../src/models' import {EVENTS} from '../src/constants' -import {CancellationContext} from '../src/handlers/BaseHandler' import {EventEmitter} from 'node:events' import {addPipableStreamBody} from './utils' diff --git a/packages/server/test/PostHandler.test.ts b/packages/server/test/PostHandler.test.ts index 28ead94e..eea9a6c8 100644 --- a/packages/server/test/PostHandler.test.ts +++ b/packages/server/test/PostHandler.test.ts @@ -7,10 +7,9 @@ import http from 'node:http' import httpMocks from 'node-mocks-http' import sinon from 'sinon' -import {Upload, DataStore} from '../src/models' +import {Upload, DataStore, CancellationContext} from '../src/models' import {PostHandler} from '../src/handlers/PostHandler' import {EVENTS} from '../src/constants' -import {CancellationContext} from '../src/handlers/BaseHandler' import {addPipableStreamBody} from './utils' const SERVER_OPTIONS = { From cf9164d685ae9fc488e8ef652f1b7b7d7f8d84fd Mon Sep 17 00:00:00 2001 From: Fabrizio Date: Fri, 8 Dec 2023 13:54:33 +0000 Subject: [PATCH 5/8] Update packages/server/src/handlers/BaseHandler.ts Co-authored-by: Merlijn Vos --- packages/server/src/handlers/BaseHandler.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/server/src/handlers/BaseHandler.ts b/packages/server/src/handlers/BaseHandler.ts index 575740bb..ee3c73c2 100644 --- a/packages/server/src/handlers/BaseHandler.ts +++ b/packages/server/src/handlers/BaseHandler.ts @@ -105,7 +105,7 @@ export class BaseHandler extends EventEmitter { return {host: host as string, proto} } - protected getLocker(req: http.IncomingMessage) { + protected async getLocker(req: http.IncomingMessage) { if (typeof this.options.locker === 'function') { return this.options.locker(req) } From 560852bf86759779e23f9c520d3ff72c91a53b58 Mon Sep 17 00:00:00 2001 From: fenos Date: Fri, 8 Dec 2023 14:23:05 +0000 Subject: [PATCH 6/8] default MemoryLocker --- packages/server/src/handlers/BaseHandler.ts | 15 +++++++++------ packages/server/src/handlers/DeleteHandler.ts | 2 +- packages/server/src/handlers/HeadHandler.ts | 2 +- packages/server/src/handlers/OptionsHandler.ts | 2 +- packages/server/src/handlers/PatchHandler.ts | 2 +- packages/server/src/handlers/PostHandler.ts | 6 +++--- packages/server/src/server.ts | 6 ++++++ packages/server/src/types.ts | 5 ++++- packages/server/test/DeleteHandler.test.ts | 7 ++++++- 9 files changed, 32 insertions(+), 15 deletions(-) diff --git a/packages/server/src/handlers/BaseHandler.ts b/packages/server/src/handlers/BaseHandler.ts index ee3c73c2..203d93dc 100644 --- a/packages/server/src/handlers/BaseHandler.ts +++ b/packages/server/src/handlers/BaseHandler.ts @@ -5,13 +5,16 @@ import type {DataStore, CancellationContext} from '../models' import type http from 'node:http' import stream from 'node:stream' import {ERRORS} from '../constants' +import {MemoryLocker} from '../lockers' const reExtractFileID = /([^/]+)\/?$/ const reForwardedHost = /host="?([^";]+)/ const reForwardedProto = /proto=(https?)/ +type WithRequired = T & {[P in K]-?: T[P]} + export class BaseHandler extends EventEmitter { - options: ServerOptions + options: WithRequired store: DataStore constructor(store: DataStore, options: ServerOptions) { @@ -20,8 +23,12 @@ export class BaseHandler extends EventEmitter { throw new Error('Store must be defined') } + if (!options.locker) { + options.locker = new MemoryLocker() + } + this.store = store - this.options = options + this.options = options as WithRequired } write(res: http.ServerResponse, status: number, headers = {}, body = '') { @@ -119,10 +126,6 @@ export class BaseHandler extends EventEmitter { ) { const locker = await this.getLocker(req) - if (!locker) { - return - } - const lock = locker.newLock(id) await lock.lock(() => { diff --git a/packages/server/src/handlers/DeleteHandler.ts b/packages/server/src/handlers/DeleteHandler.ts index d93e6c50..cb685070 100644 --- a/packages/server/src/handlers/DeleteHandler.ts +++ b/packages/server/src/handlers/DeleteHandler.ts @@ -23,7 +23,7 @@ export class DeleteHandler extends BaseHandler { try { await this.store.remove(id) } finally { - await lock?.unlock() + await lock.unlock() } const writtenRes = this.write(res, 204, {}) this.emit(EVENTS.POST_TERMINATE, req, writtenRes, id) diff --git a/packages/server/src/handlers/HeadHandler.ts b/packages/server/src/handlers/HeadHandler.ts index a8db9f10..86846aab 100644 --- a/packages/server/src/handlers/HeadHandler.ts +++ b/packages/server/src/handlers/HeadHandler.ts @@ -26,7 +26,7 @@ export class HeadHandler extends BaseHandler { try { file = await this.store.getUpload(id) } finally { - await lock?.unlock() + await lock.unlock() } // If a Client does attempt to resume an upload which has since diff --git a/packages/server/src/handlers/OptionsHandler.ts b/packages/server/src/handlers/OptionsHandler.ts index df5d2782..00af77f8 100644 --- a/packages/server/src/handlers/OptionsHandler.ts +++ b/packages/server/src/handlers/OptionsHandler.ts @@ -16,6 +16,6 @@ export class OptionsHandler extends BaseHandler { res.setHeader('Tus-Extension', this.store.extensions.join(',')) } - return this.write(res, 204, {}) + return this.write(res, 204) } } diff --git a/packages/server/src/handlers/PatchHandler.ts b/packages/server/src/handlers/PatchHandler.ts index 228d64a9..93f8f2e2 100644 --- a/packages/server/src/handlers/PatchHandler.ts +++ b/packages/server/src/handlers/PatchHandler.ts @@ -92,7 +92,7 @@ export class PatchHandler extends BaseHandler { newOffset = await this.writeToStore(req, id, offset, context) } finally { - await lock?.unlock() + await lock.unlock() } upload.offset = newOffset diff --git a/packages/server/src/handlers/PostHandler.ts b/packages/server/src/handlers/PostHandler.ts index ac52e1fa..ac079a06 100644 --- a/packages/server/src/handlers/PostHandler.ts +++ b/packages/server/src/handlers/PostHandler.ts @@ -13,8 +13,8 @@ const log = debug('tus-node-server:handlers:post') export class PostHandler extends BaseHandler { // Overriding the `BaseHandler` type. We always set `namingFunction` in the constructor. - options!: Required> & - Omit + options!: Required> & + Omit constructor(store: DataStore, options: ServerOptions) { if (options.namingFunction && typeof options.namingFunction !== 'function') { @@ -120,7 +120,7 @@ export class PostHandler extends BaseHandler { context.abort() throw e } finally { - await lock?.unlock() + await lock.unlock() } if (isFinal && this.options.onUploadFinish) { diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index 72033d33..dd180ffd 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -22,6 +22,7 @@ import { import type stream from 'node:stream' import type {ServerOptions, RouteHandler} from './types' import type {DataStore, Upload, CancellationContext} from './models' +import {MemoryLocker} from './lockers' type Handlers = { GET: InstanceType @@ -58,6 +59,7 @@ interface TusEvents { type on = EventEmitter['on'] type emit = EventEmitter['emit'] + export declare interface Server { on(event: Event, listener: TusEvents[Event]): this on(eventName: Parameters[0], listener: Parameters[1]): this @@ -92,6 +94,10 @@ export class Server extends EventEmitter { throw new Error("'datastore' is not defined; must have a datastore") } + if (!options.locker) { + options.locker = new MemoryLocker() + } + const {datastore, ...rest} = options this.options = rest this.datastore = datastore diff --git a/packages/server/src/types.ts b/packages/server/src/types.ts index 78d92e2d..9ca1e3f9 100644 --- a/packages/server/src/types.ts +++ b/packages/server/src/types.ts @@ -24,7 +24,10 @@ export type ServerOptions = { // Default uses `crypto.randomBytes(16).toString('hex')`. namingFunction?: (req: http.IncomingMessage) => string // locker implementation to support distributed locks - locker?: Locker | ((req: http.IncomingMessage) => Locker | Promise) + locker?: + | Locker + | Promise + | ((req: http.IncomingMessage) => Locker | Promise) // `onUploadCreate` will be invoked before a new upload is created. // If the function returns the (modified) response, the upload will be created. // If an error is thrown, the HTTP request will be aborted and the provided `body` and `status_code` (or their fallbacks) diff --git a/packages/server/test/DeleteHandler.test.ts b/packages/server/test/DeleteHandler.test.ts index 825bcb9f..a785df3a 100644 --- a/packages/server/test/DeleteHandler.test.ts +++ b/packages/server/test/DeleteHandler.test.ts @@ -10,6 +10,7 @@ import {DataStore} from '../src/models/DataStore' import {DeleteHandler} from '../src/handlers/DeleteHandler' import {ERRORS, EVENTS} from '../src/constants' import {CancellationContext} from '../src/models' +import {MemoryLocker} from '../src' describe('DeleteHandler', () => { const path = '/test/output' @@ -21,7 +22,11 @@ describe('DeleteHandler', () => { beforeEach(() => { fake_store.remove.resetHistory() - handler = new DeleteHandler(fake_store, {relativeLocation: true, path}) + handler = new DeleteHandler(fake_store, { + relativeLocation: true, + path, + locker: new MemoryLocker(), + }) req = {url: `${path}/1234`, method: 'DELETE'} as http.IncomingMessage res = httpMocks.createResponse() const abortController = new AbortController() From 24071daee8e0b9191d789d95d5d4234ed7d12504 Mon Sep 17 00:00:00 2001 From: fenos Date: Fri, 8 Dec 2023 14:23:16 +0000 Subject: [PATCH 7/8] removed p-queue from file store --- .../configstores/FileConfigstore.ts | 25 ++++++-------- packages/file-store/package.json | 3 +- packages/server/src/server.ts | 5 --- yarn.lock | 34 ------------------- 4 files changed, 11 insertions(+), 56 deletions(-) diff --git a/packages/file-store/configstores/FileConfigstore.ts b/packages/file-store/configstores/FileConfigstore.ts index ef573293..e118e349 100644 --- a/packages/file-store/configstores/FileConfigstore.ts +++ b/packages/file-store/configstores/FileConfigstore.ts @@ -1,7 +1,6 @@ import fs from 'node:fs/promises' import path from 'node:path' import {Upload} from '@tus/server' -import PQueue from 'p-queue' import {Configstore} from './Types' @@ -11,16 +10,14 @@ import {Configstore} from './Types' */ export class FileConfigstore implements Configstore { directory: string - queue: PQueue constructor(path: string) { this.directory = path - this.queue = new PQueue({concurrency: 1}) } async get(key: string): Promise { try { - const buffer = await this.queue.add(() => fs.readFile(this.resolve(key), 'utf8')) + const buffer = await fs.readFile(this.resolve(key), 'utf8') return JSON.parse(buffer as string) } catch { return undefined @@ -28,23 +25,21 @@ export class FileConfigstore implements Configstore { } async set(key: string, value: Upload): Promise { - await this.queue.add(() => fs.writeFile(this.resolve(key), JSON.stringify(value))) + await fs.writeFile(this.resolve(key), JSON.stringify(value)) } async delete(key: string): Promise { - await this.queue.add(() => fs.rm(this.resolve(key))) + await fs.rm(this.resolve(key)) } async list(): Promise> { - return this.queue.add(async () => { - const files = await fs.readdir(this.directory) - const sorted = files.sort((a, b) => a.localeCompare(b)) - const name = (file: string) => path.basename(file, '.json') - // To only return tus file IDs we check if the file has a corresponding JSON info file - return sorted.filter( - (file, idx) => idx < sorted.length - 1 && name(file) === name(sorted[idx + 1]) - ) - }) + const files = await fs.readdir(this.directory) + const sorted = files.sort((a, b) => a.localeCompare(b)) + const name = (file: string) => path.basename(file, '.json') + // To only return tus file IDs we check if the file has a corresponding JSON info file + return sorted.filter( + (file, idx) => idx < sorted.length - 1 && name(file) === name(sorted[idx + 1]) + ) } private resolve(key: string): string { diff --git a/packages/file-store/package.json b/packages/file-store/package.json index 788c9c46..3b356c5d 100644 --- a/packages/file-store/package.json +++ b/packages/file-store/package.json @@ -21,8 +21,7 @@ "test": "mocha test.ts --exit --extension ts --require ts-node/register" }, "dependencies": { - "debug": "^4.3.4", - "p-queue": "^6.6.2" + "debug": "^4.3.4" }, "devDependencies": { "@tus/server": "workspace:^", diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index dd180ffd..cece359b 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -22,7 +22,6 @@ import { import type stream from 'node:stream' import type {ServerOptions, RouteHandler} from './types' import type {DataStore, Upload, CancellationContext} from './models' -import {MemoryLocker} from './lockers' type Handlers = { GET: InstanceType @@ -94,10 +93,6 @@ export class Server extends EventEmitter { throw new Error("'datastore' is not defined; must have a datastore") } - if (!options.locker) { - options.locker = new MemoryLocker() - } - const {datastore, ...rest} = options this.options = rest this.datastore = datastore diff --git a/yarn.lock b/yarn.lock index 31332497..fd8bb4bc 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1500,7 +1500,6 @@ __metadata: eslint: ^8.48.0 eslint-config-custom: "workspace:*" mocha: ^10.2.0 - p-queue: ^6.6.2 should: ^13.2.3 typescript: ^5.2.2 peerDependencies: @@ -2858,13 +2857,6 @@ __metadata: languageName: node linkType: hard -"eventemitter3@npm:^4.0.4": - version: 4.0.7 - resolution: "eventemitter3@npm:4.0.7" - checksum: 1875311c42fcfe9c707b2712c32664a245629b42bb0a5a84439762dd0fd637fc54d078155ea83c2af9e0323c9ac13687e03cfba79b03af9f40c89b4960099374 - languageName: node - linkType: hard - "events@npm:^3.3.0": version: 3.3.0 resolution: "events@npm:3.3.0" @@ -4217,13 +4209,6 @@ __metadata: languageName: node linkType: hard -"p-finally@npm:^1.0.0": - version: 1.0.0 - resolution: "p-finally@npm:1.0.0" - checksum: 93a654c53dc805dd5b5891bab16eb0ea46db8f66c4bfd99336ae929323b1af2b70a8b0654f8f1eae924b2b73d037031366d645f1fd18b3d30cbd15950cc4b1d4 - languageName: node - linkType: hard - "p-limit@npm:^3.0.1, p-limit@npm:^3.0.2": version: 3.1.0 resolution: "p-limit@npm:3.1.0" @@ -4251,25 +4236,6 @@ __metadata: languageName: node linkType: hard -"p-queue@npm:^6.6.2": - version: 6.6.2 - resolution: "p-queue@npm:6.6.2" - dependencies: - eventemitter3: ^4.0.4 - p-timeout: ^3.2.0 - checksum: 832642fcc4ab6477b43e6d7c30209ab10952969ed211c6d6f2931be8a4f9935e3578c72e8cce053dc34f2eb6941a408a2c516a54904e989851a1a209cf19761c - languageName: node - linkType: hard - -"p-timeout@npm:^3.2.0": - version: 3.2.0 - resolution: "p-timeout@npm:3.2.0" - dependencies: - p-finally: ^1.0.0 - checksum: 3dd0eaa048780a6f23e5855df3dd45c7beacff1f820476c1d0d1bcd6648e3298752ba2c877aa1c92f6453c7dd23faaf13d9f5149fc14c0598a142e2c5e8d649c - languageName: node - linkType: hard - "parent-module@npm:^1.0.0": version: 1.0.1 resolution: "parent-module@npm:1.0.1" From b59f5c2cc996cb30de7172df13cec6194ddc0193 Mon Sep 17 00:00:00 2001 From: fenos Date: Tue, 12 Dec 2023 09:07:51 +0000 Subject: [PATCH 8/8] default locker on server constructor --- packages/server/src/handlers/BaseHandler.ts | 13 +++--------- packages/server/src/handlers/PostHandler.ts | 4 ++-- packages/server/src/server.ts | 11 ++++++++--- packages/server/src/types.ts | 2 ++ packages/server/test/BaseHandler.test.ts | 8 +++++++- packages/server/test/GetHandler.test.ts | 15 +++++++------- packages/server/test/HeadHandler.test.ts | 7 ++++++- packages/server/test/OptionsHandler.test.ts | 3 ++- packages/server/test/PatchHandler.test.ts | 4 +++- packages/server/test/PostHandler.test.ts | 22 +++++++++++++++++++-- 10 files changed, 61 insertions(+), 28 deletions(-) diff --git a/packages/server/src/handlers/BaseHandler.ts b/packages/server/src/handlers/BaseHandler.ts index 203d93dc..2e777dca 100644 --- a/packages/server/src/handlers/BaseHandler.ts +++ b/packages/server/src/handlers/BaseHandler.ts @@ -1,34 +1,27 @@ import EventEmitter from 'node:events' -import type {ServerOptions} from '../types' +import type {ServerOptions, WithRequired} from '../types' import type {DataStore, CancellationContext} from '../models' import type http from 'node:http' import stream from 'node:stream' import {ERRORS} from '../constants' -import {MemoryLocker} from '../lockers' const reExtractFileID = /([^/]+)\/?$/ const reForwardedHost = /host="?([^";]+)/ const reForwardedProto = /proto=(https?)/ -type WithRequired = T & {[P in K]-?: T[P]} - export class BaseHandler extends EventEmitter { options: WithRequired store: DataStore - constructor(store: DataStore, options: ServerOptions) { + constructor(store: DataStore, options: WithRequired) { super() if (!store) { throw new Error('Store must be defined') } - if (!options.locker) { - options.locker = new MemoryLocker() - } - this.store = store - this.options = options as WithRequired + this.options = options } write(res: http.ServerResponse, status: number, headers = {}, body = '') { diff --git a/packages/server/src/handlers/PostHandler.ts b/packages/server/src/handlers/PostHandler.ts index ac079a06..37eb88c3 100644 --- a/packages/server/src/handlers/PostHandler.ts +++ b/packages/server/src/handlers/PostHandler.ts @@ -6,7 +6,7 @@ import {validateHeader} from '../validators/HeaderValidator' import {EVENTS, ERRORS} from '../constants' import type http from 'node:http' -import type {ServerOptions} from '../types' +import type {ServerOptions, WithRequired} from '../types' import {DataStore, CancellationContext} from '../models' const log = debug('tus-node-server:handlers:post') @@ -16,7 +16,7 @@ export class PostHandler extends BaseHandler { options!: Required> & Omit - constructor(store: DataStore, options: ServerOptions) { + constructor(store: DataStore, options: WithRequired) { if (options.namingFunction && typeof options.namingFunction !== 'function') { throw new Error("'namingFunction' must be a function") } diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index cece359b..5f7a02ab 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -20,8 +20,9 @@ import { } from './constants' import type stream from 'node:stream' -import type {ServerOptions, RouteHandler} from './types' +import type {ServerOptions, RouteHandler, WithRequired} from './types' import type {DataStore, Upload, CancellationContext} from './models' +import {MemoryLocker} from './lockers' type Handlers = { GET: InstanceType @@ -76,7 +77,7 @@ const log = debug('tus-node-server') export class Server extends EventEmitter { datastore: DataStore handlers: Handlers - options: ServerOptions + options: WithRequired constructor(options: ServerOptions & {datastore: DataStore}) { super() @@ -93,8 +94,12 @@ export class Server extends EventEmitter { throw new Error("'datastore' is not defined; must have a datastore") } + if (!options.locker) { + options.locker = new MemoryLocker() + } + const {datastore, ...rest} = options - this.options = rest + this.options = rest as WithRequired this.datastore = datastore this.handlers = { // GET handlers should be written in the implementations diff --git a/packages/server/src/types.ts b/packages/server/src/types.ts index 9ca1e3f9..a9ddcbcc 100644 --- a/packages/server/src/types.ts +++ b/packages/server/src/types.ts @@ -64,3 +64,5 @@ export type ServerOptions = { } export type RouteHandler = (req: http.IncomingMessage, res: http.ServerResponse) => void + +export type WithRequired = T & {[P in K]-?: T[P]} diff --git a/packages/server/test/BaseHandler.test.ts b/packages/server/test/BaseHandler.test.ts index 664ee2aa..7fe2117b 100644 --- a/packages/server/test/BaseHandler.test.ts +++ b/packages/server/test/BaseHandler.test.ts @@ -5,10 +5,14 @@ import httpMocks from 'node-mocks-http' import {BaseHandler} from '../src/handlers//BaseHandler' import {DataStore} from '../src/models' +import {MemoryLocker} from '../src' describe('BaseHandler', () => { const store = new DataStore() - const handler = new BaseHandler(store, {path: '/test/output'}) + const handler = new BaseHandler(store, { + path: '/test/output', + locker: new MemoryLocker(), + }) let res: httpMocks.MockResponse beforeEach(() => { @@ -69,6 +73,7 @@ describe('BaseHandler', () => { it('should allow to to generate a url with a custom function', () => { const handler = new BaseHandler(store, { path: '/path', + locker: new MemoryLocker(), generateUrl: (req: http.IncomingMessage, info) => { const {proto, host, baseUrl, path, id} = info return `${proto}://${host}${baseUrl}${path}/${id}?customParam=1` @@ -89,6 +94,7 @@ describe('BaseHandler', () => { it('should allow extracting the request id with a custom function', () => { const handler = new BaseHandler(store, { path: '/path', + locker: new MemoryLocker(), getFileIdFromRequest: (req: http.IncomingMessage) => { return req.url?.split('/').pop() + '-custom' }, diff --git a/packages/server/test/GetHandler.test.ts b/packages/server/test/GetHandler.test.ts index 03ba3985..db9e3253 100644 --- a/packages/server/test/GetHandler.test.ts +++ b/packages/server/test/GetHandler.test.ts @@ -12,10 +12,11 @@ import {GetHandler} from '../src/handlers/GetHandler' import {DataStore} from '../src/models/DataStore' import {FileStore} from '@tus/file-store' import {Upload} from '../src/models/Upload' +import {MemoryLocker} from '../src' describe('GetHandler', () => { const path = '/test/output' - const serverOptions = {path} + const serverOptions = {path, locker: new MemoryLocker()} let req: http.IncomingMessage let res: http.ServerResponse @@ -28,7 +29,7 @@ describe('GetHandler', () => { it('should 404 when file does not exist', async () => { const store = sinon.createStubInstance(FileStore) store.getUpload.rejects({status_code: 404}) - const handler = new GetHandler(store, {path}) + const handler = new GetHandler(store, {path, locker: new MemoryLocker()}) const spy_getFileIdFromRequest = sinon.spy(handler, 'getFileIdFromRequest') req.url = `${path}/1234` await assert.rejects(() => handler.send(req, res), {status_code: 404}) @@ -37,7 +38,7 @@ describe('GetHandler', () => { it('should 404 for non registered path', async () => { const store = sinon.createStubInstance(FileStore) - const handler = new GetHandler(store, {path}) + const handler = new GetHandler(store, {path, locker: new MemoryLocker()}) const spy_getFileIdFromRequest = sinon.spy(handler, 'getFileIdFromRequest') req.url = `/not_a_valid_file_path` await assert.rejects(() => handler.send(req, res), {status_code: 404}) @@ -47,7 +48,7 @@ describe('GetHandler', () => { it('should 404 when file is not complete', async () => { const store = sinon.createStubInstance(FileStore) store.getUpload.resolves(new Upload({id: '1234', offset: 512, size: 1024})) - const handler = new GetHandler(store, {path}) + const handler = new GetHandler(store, {path, locker: new MemoryLocker()}) const fileId = '1234' req.url = `${path}/${fileId}` await assert.rejects(() => handler.send(req, res), {status_code: 404}) @@ -71,7 +72,7 @@ describe('GetHandler', () => { store.getUpload.resolves(new Upload({id: '1234', offset: size, size})) // @ts-expect-error what should this be? store.read.returns(stream.Readable.from(fs.createReadStream('invalid_path'))) - const handler = new GetHandler(store, {path}) + const handler = new GetHandler(store, {path, locker: new MemoryLocker()}) const fileId = '1234' req.url = `${path}/${fileId}` await handler.send(req, res) @@ -87,7 +88,7 @@ describe('GetHandler', () => { store.getUpload.resolves(new Upload({id: '1234', offset: 512, size: 512})) // @ts-expect-error should store.read.returns(stream.Readable.from(Buffer.alloc(512))) - const handler = new GetHandler(store, {path}) + const handler = new GetHandler(store, {path, locker: new MemoryLocker()}) const fileId = '1234' req.url = `${path}/${fileId}` await handler.send(req, res) @@ -101,7 +102,7 @@ describe('GetHandler', () => { store.getUpload.resolves(new Upload({id: '1234', offset: size, size})) // @ts-expect-error what should this be? store.read.returns(stream.Readable.from(Buffer.alloc(size), {objectMode: false})) - const handler = new GetHandler(store, {path}) + const handler = new GetHandler(store, {path, locker: new MemoryLocker()}) const fileId = '1234' req.url = `${path}/${fileId}` await handler.send(req, res) diff --git a/packages/server/test/HeadHandler.test.ts b/packages/server/test/HeadHandler.test.ts index 08f1ec0b..243e9f60 100644 --- a/packages/server/test/HeadHandler.test.ts +++ b/packages/server/test/HeadHandler.test.ts @@ -7,11 +7,16 @@ import httpMocks from 'node-mocks-http' import {DataStore, Upload, CancellationContext} from '../src/models' import {HeadHandler} from '../src/handlers/HeadHandler' import {ERRORS} from '../src/constants' +import {MemoryLocker} from '../src' describe('HeadHandler', () => { const path = '/test/output' const fake_store = sinon.createStubInstance(DataStore) - const handler = new HeadHandler(fake_store, {relativeLocation: true, path}) + const handler = new HeadHandler(fake_store, { + relativeLocation: true, + path, + locker: new MemoryLocker(), + }) let req: http.IncomingMessage let res: httpMocks.MockResponse let context: CancellationContext diff --git a/packages/server/test/OptionsHandler.test.ts b/packages/server/test/OptionsHandler.test.ts index b4a35f49..5a3aedb2 100644 --- a/packages/server/test/OptionsHandler.test.ts +++ b/packages/server/test/OptionsHandler.test.ts @@ -8,9 +8,10 @@ import httpMocks from 'node-mocks-http' import {OptionsHandler} from '../src/handlers/OptionsHandler' import {DataStore} from '../src/models/DataStore' import {ALLOWED_METHODS, ALLOWED_HEADERS, MAX_AGE} from '../src/constants' +import {MemoryLocker} from '../src' describe('OptionsHandler', () => { - const options = {path: '/test/output'} + const options = {path: '/test/output', locker: new MemoryLocker()} const store = new DataStore() const handler = new OptionsHandler(store, options) diff --git a/packages/server/test/PatchHandler.test.ts b/packages/server/test/PatchHandler.test.ts index 4384df94..f8f6c91e 100644 --- a/packages/server/test/PatchHandler.test.ts +++ b/packages/server/test/PatchHandler.test.ts @@ -11,6 +11,7 @@ import {Upload, DataStore, CancellationContext} from '../src/models' import {EVENTS} from '../src/constants' import {EventEmitter} from 'node:events' import {addPipableStreamBody} from './utils' +import {MemoryLocker} from '../src' describe('PatchHandler', () => { const path = '/test/output' @@ -22,7 +23,7 @@ describe('PatchHandler', () => { beforeEach(() => { store = sinon.createStubInstance(DataStore) - handler = new PatchHandler(store, {path}) + handler = new PatchHandler(store, {path, locker: new MemoryLocker()}) req = addPipableStreamBody( httpMocks.createRequest({ method: 'PATCH', @@ -54,6 +55,7 @@ describe('PatchHandler', () => { const handler = new PatchHandler(store, { path: '/test/output', onUploadFinish: spy, + locker: new MemoryLocker(), }) req.headers = { diff --git a/packages/server/test/PostHandler.test.ts b/packages/server/test/PostHandler.test.ts index eea9a6c8..e7ce3559 100644 --- a/packages/server/test/PostHandler.test.ts +++ b/packages/server/test/PostHandler.test.ts @@ -11,10 +11,12 @@ import {Upload, DataStore, CancellationContext} from '../src/models' import {PostHandler} from '../src/handlers/PostHandler' import {EVENTS} from '../src/constants' import {addPipableStreamBody} from './utils' +import {MemoryLocker} from '../src' const SERVER_OPTIONS = { path: '/test', namingFunction: () => '1234', + locker: new MemoryLocker(), } describe('PostHandler', () => { @@ -73,6 +75,7 @@ describe('PostHandler', () => { const fake_store = sinon.createStubInstance(DataStore) const handler = new PostHandler(fake_store, { path: '/test', + locker: new MemoryLocker(), namingFunction: sinon.stub().throws(), }) @@ -83,7 +86,11 @@ describe('PostHandler', () => { it('should call custom namingFunction', async () => { const fake_store = sinon.createStubInstance(DataStore) const namingFunction = sinon.stub().returns('1234') - const handler = new PostHandler(fake_store, {path: '/test/', namingFunction}) + const handler = new PostHandler(fake_store, { + path: '/test/', + namingFunction, + locker: new MemoryLocker(), + }) req.headers = {'upload-length': '1000'} await handler.send(req, res, context) @@ -105,6 +112,7 @@ describe('PostHandler', () => { it('must acknowledge successful POST requests with the 201', async () => { const handler = new PostHandler(fake_store, { path: '/test/output', + locker: new MemoryLocker(), namingFunction: () => '1234', }) req.headers = {'upload-length': '1000', host: 'localhost:3000'} @@ -117,6 +125,7 @@ describe('PostHandler', () => { describe('respect forwarded headers', () => { const handler = new PostHandler(fake_store, { path: '/test/output', + locker: new MemoryLocker(), respectForwardedHeaders: true, namingFunction: () => '1234', }) @@ -169,6 +178,7 @@ describe('PostHandler', () => { it('should handle root as path', async () => { const handler = new PostHandler(fake_store, { path: '/', + locker: new MemoryLocker(), respectForwardedHeaders: true, namingFunction: () => '1234', }) @@ -202,6 +212,7 @@ describe('PostHandler', () => { const handler = new PostHandler(fake_store, { path: '/test/output', + locker: new MemoryLocker(), namingFunction: () => '1234', }) handler.on(EVENTS.POST_CREATE, (_, __, ___, url) => { @@ -221,6 +232,7 @@ describe('PostHandler', () => { const handler = new PostHandler(fake_store, { path: '/test/output', + locker: new MemoryLocker(), relativeLocation: true, namingFunction: () => '1234', }) @@ -241,7 +253,10 @@ describe('PostHandler', () => { fake_store.create.resolvesArg(0) fake_store.write.resolves(upload_length) - const handler = new PostHandler(fake_store, {path: '/test/output'}) + const handler = new PostHandler(fake_store, { + path: '/test/output', + locker: new MemoryLocker(), + }) handler.on(EVENTS.POST_CREATE, () => { done() }) @@ -259,6 +274,7 @@ describe('PostHandler', () => { const spy = sinon.stub().resolvesArg(1) const handler = new PostHandler(store, { path: '/test/output', + locker: new MemoryLocker(), onUploadCreate: spy, }) @@ -280,6 +296,7 @@ describe('PostHandler', () => { const spy = sinon.stub().resolvesArg(1) const handler = new PostHandler(store, { path: '/test/output', + locker: new MemoryLocker(), onUploadFinish: spy, }) @@ -303,6 +320,7 @@ describe('PostHandler', () => { const spy = sinon.stub().resolvesArg(1) const handler = new PostHandler(store, { path: '/test/output', + locker: new MemoryLocker(), onUploadFinish: spy, })