diff --git a/.yarnrc.yml b/.yarnrc.yml index faa024cd..b229bb79 100644 --- a/.yarnrc.yml +++ b/.yarnrc.yml @@ -7,3 +7,5 @@ plugins: spec: "@yarnpkg/plugin-typescript" - path: .yarn/plugins/@yarnpkg/plugin-interactive-tools.cjs spec: "@yarnpkg/plugin-interactive-tools" + +yarnPath: .yarn/releases/yarn-3.2.3.cjs diff --git a/packages/file-store/configstores/FileConfigstore.ts b/packages/file-store/configstores/FileConfigstore.ts index ef573293..e118e349 100644 --- a/packages/file-store/configstores/FileConfigstore.ts +++ b/packages/file-store/configstores/FileConfigstore.ts @@ -1,7 +1,6 @@ import fs from 'node:fs/promises' import path from 'node:path' import {Upload} from '@tus/server' -import PQueue from 'p-queue' import {Configstore} from './Types' @@ -11,16 +10,14 @@ import {Configstore} from './Types' */ export class FileConfigstore implements Configstore { directory: string - queue: PQueue constructor(path: string) { this.directory = path - this.queue = new PQueue({concurrency: 1}) } async get(key: string): Promise { try { - const buffer = await this.queue.add(() => fs.readFile(this.resolve(key), 'utf8')) + const buffer = await fs.readFile(this.resolve(key), 'utf8') return JSON.parse(buffer as string) } catch { return undefined @@ -28,23 +25,21 @@ export class FileConfigstore implements Configstore { } async set(key: string, value: Upload): Promise { - await this.queue.add(() => fs.writeFile(this.resolve(key), JSON.stringify(value))) + await fs.writeFile(this.resolve(key), JSON.stringify(value)) } async delete(key: string): Promise { - await this.queue.add(() => fs.rm(this.resolve(key))) + await fs.rm(this.resolve(key)) } async list(): Promise> { - return this.queue.add(async () => { - const files = await fs.readdir(this.directory) - const sorted = files.sort((a, b) => a.localeCompare(b)) - const name = (file: string) => path.basename(file, '.json') - // To only return tus file IDs we check if the file has a corresponding JSON info file - return sorted.filter( - 
(file, idx) => idx < sorted.length - 1 && name(file) === name(sorted[idx + 1]) - ) - }) + const files = await fs.readdir(this.directory) + const sorted = files.sort((a, b) => a.localeCompare(b)) + const name = (file: string) => path.basename(file, '.json') + // To only return tus file IDs we check if the file has a corresponding JSON info file + return sorted.filter( + (file, idx) => idx < sorted.length - 1 && name(file) === name(sorted[idx + 1]) + ) } private resolve(key: string): string { diff --git a/packages/file-store/package.json b/packages/file-store/package.json index 788c9c46..894a9c0a 100644 --- a/packages/file-store/package.json +++ b/packages/file-store/package.json @@ -1,7 +1,7 @@ { "$schema": "https://json.schemastore.org/package.json", "name": "@tus/file-store", - "version": "1.0.1", + "version": "1.1.0", "description": "Local file storage for @tus/server", "main": "dist/index.js", "types": "dist/index.d.ts", @@ -21,8 +21,7 @@ "test": "mocha test.ts --exit --extension ts --require ts-node/register" }, "dependencies": { - "debug": "^4.3.4", - "p-queue": "^6.6.2" + "debug": "^4.3.4" }, "devDependencies": { "@tus/server": "workspace:^", diff --git a/packages/s3-store/package.json b/packages/s3-store/package.json index 93bd1c33..c7b14218 100644 --- a/packages/s3-store/package.json +++ b/packages/s3-store/package.json @@ -1,7 +1,7 @@ { "$schema": "https://json.schemastore.org/package.json", "name": "@tus/s3-store", - "version": "1.1.1", + "version": "1.2.0", "description": "AWS S3 store for @tus/server", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/packages/server/package.json b/packages/server/package.json index 73d324e0..ae9f0db7 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -1,7 +1,7 @@ { "$schema": "https://json.schemastore.org/package.json", "name": "@tus/server", - "version": "1.0.1", + "version": "1.1.0", "description": "Tus resumable upload protocol in Node.js", "main": "dist/index.js", 
"types": "dist/index.d.ts", diff --git a/packages/server/src/constants.ts b/packages/server/src/constants.ts index 1048b489..c9247711 100644 --- a/packages/server/src/constants.ts +++ b/packages/server/src/constants.ts @@ -33,6 +33,14 @@ export const ERRORS = { status_code: 403, body: 'Upload-Offset header required\n', }, + ABORTED: { + status_code: 400, + body: 'Request aborted due to lock acquired', + }, + ERR_LOCK_TIMEOUT: { + status_code: 500, + body: 'failed to acquire lock before timeout', + }, INVALID_CONTENT_TYPE: { status_code: 403, body: 'Content-Type header required\n', diff --git a/packages/server/src/handlers/BaseHandler.ts b/packages/server/src/handlers/BaseHandler.ts index 26eedf87..aae7bf0f 100644 --- a/packages/server/src/handlers/BaseHandler.ts +++ b/packages/server/src/handlers/BaseHandler.ts @@ -1,8 +1,10 @@ import EventEmitter from 'node:events' import type {ServerOptions} from '../types' -import type {DataStore} from '../models' +import type {DataStore, CancellationContext} from '../models' import type http from 'node:http' +import stream from 'node:stream' +import {ERRORS} from '../constants' const reExtractFileID = /([^/]+)\/?$/ const reForwardedHost = /host="?([^";]+)/ @@ -27,6 +29,7 @@ export class BaseHandler extends EventEmitter { // @ts-expect-error not explicitly typed but possible headers['Content-Length'] = Buffer.byteLength(body, 'utf8') } + res.writeHead(status, headers) res.write(body) return res.end() @@ -101,4 +104,61 @@ export class BaseHandler extends EventEmitter { return {host: host as string, proto} } + + protected async getLocker(req: http.IncomingMessage) { + if (typeof this.options.locker === 'function') { + return this.options.locker(req) + } + return this.options.locker + } + + protected async acquireLock( + req: http.IncomingMessage, + id: string, + context: CancellationContext + ) { + const locker = await this.getLocker(req) + + const lock = locker.newLock(id) + + await lock.lock(() => { + context.cancel() + }) + + 
return lock + } + + protected writeToStore( + req: http.IncomingMessage, + id: string, + offset: number, + context: CancellationContext + ) { + return new Promise(async (resolve, reject) => { + if (context.signal.aborted) { + reject(ERRORS.ABORTED) + return + } + + const proxy = new stream.PassThrough() + stream.addAbortSignal(context.signal, proxy) + + proxy.on('error', (err) => { + req.unpipe(proxy) + if (err.name === 'AbortError') { + reject(ERRORS.ABORTED) + } else { + reject(err) + } + }) + + req.on('error', (err) => { + if (!proxy.closed) { + proxy.destroy(err) + } + }) + + this.store.write(req.pipe(proxy), id, offset).then(resolve).catch(reject) + }) + } } diff --git a/packages/server/src/handlers/DeleteHandler.ts b/packages/server/src/handlers/DeleteHandler.ts index 8625fe61..cb685070 100644 --- a/packages/server/src/handlers/DeleteHandler.ts +++ b/packages/server/src/handlers/DeleteHandler.ts @@ -1,10 +1,15 @@ import {BaseHandler} from './BaseHandler' import {ERRORS, EVENTS} from '../constants' +import {CancellationContext} from '../models' import type http from 'node:http' export class DeleteHandler extends BaseHandler { - async send(req: http.IncomingMessage, res: http.ServerResponse) { + async send( + req: http.IncomingMessage, + res: http.ServerResponse, + context: CancellationContext + ) { const id = this.getFileIdFromRequest(req) if (!id) { throw ERRORS.FILE_NOT_FOUND @@ -14,7 +19,12 @@ export class DeleteHandler extends BaseHandler { await this.options.onIncomingRequest(req, res, id) } - await this.store.remove(id) + const lock = await this.acquireLock(req, id, context) + try { + await this.store.remove(id) + } finally { + await lock.unlock() + } const writtenRes = this.write(res, 204, {}) this.emit(EVENTS.POST_TERMINATE, req, writtenRes, id) return writtenRes diff --git a/packages/server/src/handlers/GetHandler.ts b/packages/server/src/handlers/GetHandler.ts index 2f704cd4..d8c34990 100644 --- a/packages/server/src/handlers/GetHandler.ts +++ 
b/packages/server/src/handlers/GetHandler.ts @@ -40,6 +40,7 @@ export class GetHandler extends BaseHandler { } const stats = await this.store.getUpload(id) + if (!stats || stats.offset !== stats.size) { throw ERRORS.FILE_NOT_FOUND } diff --git a/packages/server/src/handlers/HeadHandler.ts b/packages/server/src/handlers/HeadHandler.ts index 6d5946bd..86846aab 100644 --- a/packages/server/src/handlers/HeadHandler.ts +++ b/packages/server/src/handlers/HeadHandler.ts @@ -1,12 +1,16 @@ import {BaseHandler} from './BaseHandler' import {ERRORS} from '../constants' -import {Metadata} from '../models' +import {Metadata, Upload, CancellationContext} from '../models' import type http from 'node:http' export class HeadHandler extends BaseHandler { - async send(req: http.IncomingMessage, res: http.ServerResponse) { + async send( + req: http.IncomingMessage, + res: http.ServerResponse, + context: CancellationContext + ) { const id = this.getFileIdFromRequest(req) if (!id) { throw ERRORS.FILE_NOT_FOUND @@ -16,7 +20,14 @@ export class HeadHandler extends BaseHandler { await this.options.onIncomingRequest(req, res, id) } - const file = await this.store.getUpload(id) + const lock = await this.acquireLock(req, id, context) + + let file: Upload + try { + file = await this.store.getUpload(id) + } finally { + await lock.unlock() + } // If a Client does attempt to resume an upload which has since // been removed by the Server, the Server SHOULD respond with the diff --git a/packages/server/src/handlers/PatchHandler.ts b/packages/server/src/handlers/PatchHandler.ts index fc3be942..93f8f2e2 100644 --- a/packages/server/src/handlers/PatchHandler.ts +++ b/packages/server/src/handlers/PatchHandler.ts @@ -4,6 +4,7 @@ import {BaseHandler} from './BaseHandler' import {ERRORS, EVENTS} from '../constants' import type http from 'node:http' +import {CancellationContext, Upload} from '../models' const log = debug('tus-node-server:handlers:patch') @@ -11,116 +12,132 @@ export class PatchHandler 
extends BaseHandler { /** * Write data to the DataStore and return the new offset. */ - async send(req: http.IncomingMessage, res: http.ServerResponse) { - const id = this.getFileIdFromRequest(req) - if (!id) { - throw ERRORS.FILE_NOT_FOUND - } - - // The request MUST include a Upload-Offset header - if (req.headers['upload-offset'] === undefined) { - throw ERRORS.MISSING_OFFSET - } - - const offset = Number.parseInt(req.headers['upload-offset'] as string, 10) + async send( + req: http.IncomingMessage, + res: http.ServerResponse, + context: CancellationContext + ) { + try { + const id = this.getFileIdFromRequest(req) + if (!id) { + throw ERRORS.FILE_NOT_FOUND + } - // The request MUST include a Content-Type header - const content_type = req.headers['content-type'] - if (content_type === undefined) { - throw ERRORS.INVALID_CONTENT_TYPE - } + // The request MUST include a Upload-Offset header + if (req.headers['upload-offset'] === undefined) { + throw ERRORS.MISSING_OFFSET + } - if (this.options.onIncomingRequest) { - await this.options.onIncomingRequest(req, res, id) - } + const offset = Number.parseInt(req.headers['upload-offset'] as string, 10) - const upload = await this.store.getUpload(id) - - // If a Client does attempt to resume an upload which has since - // been removed by the Server, the Server SHOULD respond with the - // with the 404 Not Found or 410 Gone status. The latter one SHOULD - // be used if the Server is keeping track of expired uploads. - const now = Date.now() - const creation = upload.creation_date ? 
new Date(upload.creation_date).getTime() : now - const expiration = creation + this.store.getExpiration() - if ( - this.store.hasExtension('expiration') && - this.store.getExpiration() > 0 && - now > expiration - ) { - throw ERRORS.FILE_NO_LONGER_EXISTS - } + // The request MUST include a Content-Type header + const content_type = req.headers['content-type'] + if (content_type === undefined) { + throw ERRORS.INVALID_CONTENT_TYPE + } - if (upload.offset !== offset) { - // If the offsets do not match, the Server MUST respond with the 409 Conflict status without modifying the upload resource. - log( - `[PatchHandler] send: Incorrect offset - ${offset} sent but file is ${upload.offset}` - ) - throw ERRORS.INVALID_OFFSET - } + const lock = await this.acquireLock(req, id, context) - // The request MUST validate upload-length related headers - const upload_length = req.headers['upload-length'] as string | undefined - if (upload_length !== undefined) { - const size = Number.parseInt(upload_length, 10) - // Throw error if extension is not supported - if (!this.store.hasExtension('creation-defer-length')) { - throw ERRORS.UNSUPPORTED_CREATION_DEFER_LENGTH_EXTENSION + let upload: Upload + let newOffset: number + try { + upload = await this.store.getUpload(id) + + // If a Client does attempt to resume an upload which has since + // been removed by the Server, the Server SHOULD respond with the + // with the 404 Not Found or 410 Gone status. The latter one SHOULD + // be used if the Server is keeping track of expired uploads. + const now = Date.now() + const creation = upload.creation_date + ? 
new Date(upload.creation_date).getTime() + : now + const expiration = creation + this.store.getExpiration() + if ( + this.store.hasExtension('expiration') && + this.store.getExpiration() > 0 && + now > expiration + ) { + throw ERRORS.FILE_NO_LONGER_EXISTS + } + + if (upload.offset !== offset) { + // If the offsets do not match, the Server MUST respond with the 409 Conflict status without modifying the upload resource. + log( + `[PatchHandler] send: Incorrect offset - ${offset} sent but file is ${upload.offset}` + ) + throw ERRORS.INVALID_OFFSET + } + + // The request MUST validate upload-length related headers + const upload_length = req.headers['upload-length'] as string | undefined + if (upload_length !== undefined) { + const size = Number.parseInt(upload_length, 10) + // Throw error if extension is not supported + if (!this.store.hasExtension('creation-defer-length')) { + throw ERRORS.UNSUPPORTED_CREATION_DEFER_LENGTH_EXTENSION + } + + // Throw error if upload-length is already set. + if (upload.size !== undefined) { + throw ERRORS.INVALID_LENGTH + } + + if (size < upload.offset) { + throw ERRORS.INVALID_LENGTH + } + + await this.store.declareUploadLength(id, size) + upload.size = size + } + + newOffset = await this.writeToStore(req, id, offset, context) + } finally { + await lock.unlock() } - // Throw error if upload-length is already set. 
- if (upload.size !== undefined) { - throw ERRORS.INVALID_LENGTH + upload.offset = newOffset + this.emit(EVENTS.POST_RECEIVE, req, res, upload) + if (newOffset === upload.size && this.options.onUploadFinish) { + try { + res = await this.options.onUploadFinish(req, res, upload) + } catch (error) { + log(`onUploadFinish: ${error.body}`) + throw error + } } - if (size < upload.offset) { - throw ERRORS.INVALID_LENGTH + const headers: { + 'Upload-Offset': number + 'Upload-Expires'?: string + } = { + 'Upload-Offset': newOffset, } - await this.store.declareUploadLength(id, size) - upload.size = size - } - - const newOffset = await this.store.write(req, id, offset) - upload.offset = newOffset - this.emit(EVENTS.POST_RECEIVE, req, res, upload) - if (newOffset === upload.size && this.options.onUploadFinish) { - try { - res = await this.options.onUploadFinish(req, res, upload) - } catch (error) { - log(`onUploadFinish: ${error.body}`) - throw error + if ( + this.store.hasExtension('expiration') && + this.store.getExpiration() > 0 && + upload.creation_date && + (upload.size === undefined || newOffset < upload.size) + ) { + const creation = new Date(upload.creation_date) + // Value MUST be in RFC 7231 datetime format + const dateString = new Date( + creation.getTime() + this.store.getExpiration() + ).toUTCString() + headers['Upload-Expires'] = dateString } - } - const headers: { - 'Upload-Offset': number - 'Upload-Expires'?: string - } = { - 'Upload-Offset': newOffset, - } - - if ( - this.store.hasExtension('expiration') && - this.store.getExpiration() > 0 && - upload.creation_date && - (upload.size === undefined || newOffset < upload.size) - ) { - const creation = new Date(upload.creation_date) - // Value MUST be in RFC 7231 datetime format - const dateString = new Date( - creation.getTime() + this.store.getExpiration() - ).toUTCString() - headers['Upload-Expires'] = dateString - } + // The Server MUST acknowledge successful PATCH requests with the 204 + const writtenRes = 
this.write(res, 204, headers) - // The Server MUST acknowledge successful PATCH requests with the 204 - const writtenRes = this.write(res, 204, headers) + if (newOffset === upload.size) { + this.emit(EVENTS.POST_FINISH, req, writtenRes, upload) + } - if (newOffset === upload.size) { - this.emit(EVENTS.POST_FINISH, req, writtenRes, upload) + return writtenRes + } catch (e) { + context.abort() + throw e } - - return writtenRes } } diff --git a/packages/server/src/handlers/PostHandler.ts b/packages/server/src/handlers/PostHandler.ts index 61ddf7fc..9d93f4af 100644 --- a/packages/server/src/handlers/PostHandler.ts +++ b/packages/server/src/handlers/PostHandler.ts @@ -6,15 +6,14 @@ import {validateHeader} from '../validators/HeaderValidator' import {EVENTS, ERRORS} from '../constants' import type http from 'node:http' -import type {ServerOptions} from '../types' -import type {DataStore} from '../models' +import type {ServerOptions, WithRequired} from '../types' +import {DataStore, CancellationContext} from '../models' const log = debug('tus-node-server:handlers:post') export class PostHandler extends BaseHandler { // Overriding the `BaseHandler` type. We always set `namingFunction` in the constructor. - options!: Required> & - Omit + options!: WithRequired constructor(store: DataStore, options: ServerOptions) { if (options.namingFunction && typeof options.namingFunction !== 'function') { @@ -31,7 +30,11 @@ export class PostHandler extends BaseHandler { /** * Create a file in the DataStore. 
*/ - async send(req: http.IncomingMessage, res: http.ServerResponse) { + async send( + req: http.IncomingMessage, + res: http.ServerResponse, + context: CancellationContext + ) { if ('upload-concat' in req.headers && !this.store.hasExtension('concatentation')) { throw ERRORS.UNSUPPORTED_CONCATENATION_EXTENSION } @@ -88,25 +91,37 @@ export class PostHandler extends BaseHandler { } } - await this.store.create(upload) - const url = this.generateUrl(req, upload.id) - - this.emit(EVENTS.POST_CREATE, req, res, upload, url) - - let newOffset - let isFinal = upload.size === 0 && !upload.sizeIsDeferred - const headers: { + const lock = await this.acquireLock(req, id, context) + let isFinal: boolean + let url: string + let headers: { 'Upload-Offset'?: string 'Upload-Expires'?: string - } = {} - - // The request MIGHT include a Content-Type header when using creation-with-upload extension - if (validateHeader('content-type', req.headers['content-type'])) { - newOffset = await this.store.write(req, upload.id, 0) - headers['Upload-Offset'] = newOffset.toString() - isFinal = newOffset === Number.parseInt(upload_length as string, 10) - upload.offset = newOffset } + try { + await this.store.create(upload) + url = this.generateUrl(req, upload.id) + + this.emit(EVENTS.POST_CREATE, req, res, upload, url) + + isFinal = upload.size === 0 && !upload.sizeIsDeferred + headers = {} + + // The request MIGHT include a Content-Type header when using creation-with-upload extension + if (validateHeader('content-type', req.headers['content-type'])) { + const newOffset = await this.writeToStore(req, id, 0, context) + + headers['Upload-Offset'] = newOffset.toString() + isFinal = newOffset === Number.parseInt(upload_length as string, 10) + upload.offset = newOffset + } + } catch (e) { + context.abort() + throw e + } finally { + await lock.unlock() + } + if (isFinal && this.options.onUploadFinish) { try { res = await this.options.onUploadFinish(req, res, upload) @@ -124,6 +139,7 @@ export class 
PostHandler extends BaseHandler { upload.creation_date ) { const created = await this.store.getUpload(upload.id) + if (created.offset !== Number.parseInt(upload_length as string, 10)) { const creation = new Date(upload.creation_date) // Value MUST be in RFC 7231 datetime format diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 538d7111..2bb16a96 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -1,4 +1,5 @@ export {Server} from './server' export * from './types' export * from './models' +export * from './lockers' export * from './constants' diff --git a/packages/server/src/lockers/MemoryLocker.ts b/packages/server/src/lockers/MemoryLocker.ts new file mode 100644 index 00000000..8e68fdbe --- /dev/null +++ b/packages/server/src/lockers/MemoryLocker.ts @@ -0,0 +1,122 @@ +import {ERRORS} from '../constants' +import {Lock, Locker, RequestRelease} from '../models' + +/** + * MemoryLocker is an implementation of the Locker interface that manages locks in memory. + * This class is designed for exclusive access control over resources, often used in scenarios like upload management. + * + * Key Features: + * - Ensures exclusive resource access by using a memory-based map to track locks. + * - Implements timeout for lock acquisition, mitigating deadlock situations. + * - Facilitates both immediate and graceful release of locks through different mechanisms. + * + * Locking Behavior: + * - When the `lock` method is invoked for an already locked resource, the `cancelReq` callback is called. + * This signals to the current lock holder that another process is requesting the lock, encouraging them to release it as soon as possible. + * - The lock attempt continues until the specified timeout is reached. If the timeout expires and the lock is still not + * available, an error is thrown to indicate lock acquisition failure. 
+ * + * Lock Acquisition and Release: + * - The `lock` method implements a wait mechanism, allowing a lock request to either succeed when the lock becomes available, + * or fail after the timeout period. + * - The `unlock` method releases a lock, making the resource available for other requests. + */ + +export interface MemoryLockerOptions { + acquireLockTimeout: number +} + +interface LockEntry { + requestRelease: RequestRelease +} + +export class MemoryLocker implements Locker { + timeout: number + locks = new Map() + + constructor(options?: MemoryLockerOptions) { + this.timeout = options?.acquireLockTimeout ?? 1000 * 30 + } + + newLock(id: string) { + return new MemoryLock(id, this, this.timeout) + } +} + +class MemoryLock implements Lock { + constructor( + private id: string, + private locker: MemoryLocker, + private timeout: number = 1000 * 30 + ) {} + + async lock(requestRelease: RequestRelease): Promise { + const abortController = new AbortController() + const lock = await Promise.race([ + this.waitTimeout(abortController.signal), + this.acquireLock(this.id, requestRelease, abortController.signal), + ]) + + abortController.abort() + + if (!lock) { + throw ERRORS.ERR_LOCK_TIMEOUT + } + } + + protected async acquireLock( + id: string, + requestRelease: RequestRelease, + signal: AbortSignal + ): Promise { + if (signal.aborted) { + return false + } + + const lock = this.locker.locks.get(id) + + if (!lock) { + const lock = { + requestRelease, + } + this.locker.locks.set(id, lock) + return true + } + + await lock.requestRelease?.() + + return await new Promise((resolve, reject) => { + // Using setImmediate to: + // 1. Prevent stack overflow by deferring recursive calls to the next event loop iteration. + // 2. Allow event loop to process other pending events, maintaining server responsiveness. + // 3. Ensure fairness in lock acquisition by giving other requests a chance to acquire the lock. 
+ setImmediate(() => { + this.acquireLock(id, requestRelease, signal).then(resolve).catch(reject) + }) + }) + } + + async unlock(): Promise { + const lock = this.locker.locks.get(this.id) + if (!lock) { + throw new Error('Releasing an unlocked lock!') + } + + this.locker.locks.delete(this.id) + } + + protected waitTimeout(signal: AbortSignal) { + return new Promise((resolve) => { + const timeout = setTimeout(() => { + resolve(false) + }, this.timeout) + + const abortListener = () => { + clearTimeout(timeout) + signal.removeEventListener('abort', abortListener) + resolve(false) + } + signal.addEventListener('abort', abortListener) + }) + } +} diff --git a/packages/server/src/lockers/index.ts b/packages/server/src/lockers/index.ts new file mode 100644 index 00000000..b6de79c2 --- /dev/null +++ b/packages/server/src/lockers/index.ts @@ -0,0 +1 @@ +export * from './MemoryLocker' diff --git a/packages/server/src/models/Context.ts b/packages/server/src/models/Context.ts new file mode 100644 index 00000000..18d20c49 --- /dev/null +++ b/packages/server/src/models/Context.ts @@ -0,0 +1,25 @@ +/** + * The CancellationContext interface provides mechanisms to manage the termination of a request. + * It is designed to handle two types of request terminations: immediate abortion and graceful cancellation. + * + * Properties: + * - signal: An instance of AbortSignal. It allows external entities to listen for cancellation requests, + * making it possible to react accordingly. + * + * Methods: + * - abort(): This function should be called to immediately terminate the request. It is intended for scenarios + * where the request cannot continue and needs to be stopped as soon as possible, such as due to upload errors + * or invalid conditions. Implementers should ensure that invoking this method leads to the swift cessation of all + * request-related operations to save resources. + * + * - cancel(): This function is used for more controlled termination of the request. 
It signals that the request should + * be concluded, but allows for a short period of time to finalize operations gracefully. This could involve + * completing current transactions or cleaning up resources. The exact behavior and the time allowed for cancellation + * completion are determined by the implementation, but the goal is to try to end the request without abrupt interruption, + * ensuring orderly shutdown of ongoing processes. + */ +export interface CancellationContext { + signal: AbortSignal + abort: () => void + cancel: () => void +} diff --git a/packages/server/src/models/Locker.ts b/packages/server/src/models/Locker.ts new file mode 100644 index 00000000..e4365f4b --- /dev/null +++ b/packages/server/src/models/Locker.ts @@ -0,0 +1,31 @@ +export type RequestRelease = () => Promise | void + +/** + * The Locker interface creates a Lock instance for a given resource identifier. + */ +export interface Locker { + newLock(id: string): Lock +} + +/** + * The Lock interface defines methods for implementing a locking mechanism. + * It is primarily used to ensure exclusive access to resources, such as uploads and their metadata. + * + * The interface adheres to TUS protocol recommendations, emphasizing the need to prevent prolonged lock retention. + * This approach helps manage resources efficiently and avoids issues with half-open TCP connections. + * + * Methods: + * - lock(id, cancelReq): Acquires a lock on a resource identified by 'id'. If the lock is already held by another request, + * the 'cancelReq' callback is provided to signal the current lock holder to release the lock. + * The 'cancelReq' callback should be invoked when there's an attempt by another request to acquire a previously locked resource. + * This mechanism ensures that locks are held only as long as necessary and are promptly released for other requests. + * + * - unlock(id): Releases the lock held on the resource identified by 'id'. 
This should be called by the lock holder + * after completing their operation or upon receiving a signal through the 'cancelReq' callback from a subsequent request + * attempting to acquire the lock. + * + */ +export interface Lock { + lock(cancelReq: RequestRelease): Promise + unlock(): Promise +} diff --git a/packages/server/src/models/index.ts b/packages/server/src/models/index.ts index e3fcbc9c..908f53ca 100644 --- a/packages/server/src/models/index.ts +++ b/packages/server/src/models/index.ts @@ -3,3 +3,5 @@ export * as Metadata from './Metadata' export {StreamSplitter} from './StreamSplitter' export {Uid} from './Uid' export {Upload} from './Upload' +export {Locker, Lock, RequestRelease} from './Locker' +export {CancellationContext} from './Context' diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index 819e3241..e9cee2bb 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -20,8 +20,9 @@ import { } from './constants' import type stream from 'node:stream' -import type {ServerOptions, RouteHandler} from './types' -import type {DataStore, Upload} from './models' +import type {ServerOptions, RouteHandler, WithOptional} from './types' +import type {DataStore, Upload, CancellationContext} from './models' +import {MemoryLocker} from './lockers' type Handlers = { GET: InstanceType @@ -58,6 +59,7 @@ interface TusEvents { type on = EventEmitter['on'] type emit = EventEmitter['emit'] + export declare interface Server { on(event: Event, listener: TusEvents[Event]): this on(eventName: Parameters[0], listener: Parameters[1]): this @@ -77,7 +79,7 @@ export class Server extends EventEmitter { handlers: Handlers options: ServerOptions - constructor(options: ServerOptions & {datastore: DataStore}) { + constructor(options: WithOptional & {datastore: DataStore}) { super() if (!options) { @@ -92,8 +94,12 @@ export class Server extends EventEmitter { throw new Error("'datastore' is not defined; must have a datastore") } + if 
(!options.locker) { + options.locker = new MemoryLocker() + } + const {datastore, ...rest} = options - this.options = rest + this.options = rest as ServerOptions this.datastore = datastore this.handlers = { // GET handlers should be written in the implementations @@ -139,6 +145,8 @@ export class Server extends EventEmitter { res: http.ServerResponse // TODO: this return type does not make sense ): Promise { + const context = this.createContext(req) + log(`[TusServer] handle: ${req.method} ${req.url}`) // Allow overriding the HTTP method. The reason for this is // that some libraries/environments to not support PATCH and @@ -163,7 +171,7 @@ export class Server extends EventEmitter { } } - return this.write(res, status_code, body) + return this.write(context, req, res, status_code, body) } if (req.method === 'GET') { @@ -177,7 +185,7 @@ export class Server extends EventEmitter { res.setHeader('Tus-Resumable', TUS_RESUMABLE) if (req.method !== 'OPTIONS' && req.headers['tus-resumable'] === undefined) { - return this.write(res, 412, 'Tus-Resumable Required\n') + return this.write(context, req, res, 412, 'Tus-Resumable Required\n') } // Validate all required headers to adhere to the tus protocol @@ -203,7 +211,7 @@ export class Server extends EventEmitter { } if (invalid_headers.length > 0) { - return this.write(res, 400, `Invalid ${invalid_headers.join(' ')}\n`) + return this.write(context, req, res, 400, `Invalid ${invalid_headers.join(' ')}\n`) } // Enable CORS @@ -215,17 +223,47 @@ export class Server extends EventEmitter { // Invoke the handler for the method requested const handler = this.handlers[req.method as keyof Handlers] if (handler) { - return handler.send(req, res).catch(onError) + return handler.send(req, res, context).catch(onError) } - return this.write(res, 404, 'Not found\n') + return this.write(context, req, res, 404, 'Not found\n') } - write(res: http.ServerResponse, status: number, body = '', headers = {}) { + write( + context: CancellationContext, 
+ req: http.IncomingMessage, + res: http.ServerResponse, + status: number, + body = '', + headers = {} + ) { + const isAborted = context.signal.aborted + if (status !== 204) { // @ts-expect-error not explicitly typed but possible headers['Content-Length'] = Buffer.byteLength(body, 'utf8') } + + if (isAborted) { + // This condition handles situations where the request has been flagged as aborted. + // In such cases, the server informs the client that the connection will be closed. + // This is communicated by setting the 'Connection' header to 'close' in the response. + // This step is essential to prevent the server from continuing to process a request + // that is no longer needed, thereby saving resources. + + // @ts-expect-error not explicitly typed but possible + headers['Connection'] = 'close' + + // An event listener is added to the response ('res') for the 'finish' event. + // The 'finish' event is triggered when the response has been sent to the client. + // Once the response is complete, the request ('req') object is destroyed. + // Destroying the request object is a crucial step to release any resources + // tied to this request, as it has already been aborted. + res.on('finish', () => { + req.destroy() + }) + } + res.writeHead(status, headers) res.write(body) return res.end() @@ -243,4 +281,41 @@ export class Server extends EventEmitter { return this.datastore.deleteExpired() } + + protected createContext(req: http.IncomingMessage) { + // Initialize two AbortControllers: + // 1. `requestAbortController` for instant request termination, particularly useful for stopping clients to upload when errors occur. + // 2. `abortWithDelayController` to introduce a delay before aborting, allowing the server time to complete ongoing operations. + // This is particularly useful when a future request may need to acquire a lock currently held by this request. 
+ const requestAbortController = new AbortController() + const abortWithDelayController = new AbortController() + + const onDelayedAbort = (err: unknown) => { + abortWithDelayController.signal.removeEventListener('abort', onDelayedAbort) + setTimeout(() => { + requestAbortController.abort(err) + }, 3000) + } + abortWithDelayController.signal.addEventListener('abort', onDelayedAbort) + + req.on('close', () => { + abortWithDelayController.signal.removeEventListener('abort', onDelayedAbort) + }) + + return { + signal: requestAbortController.signal, + abort: () => { + // abort the request immediately + if (!requestAbortController.signal.aborted) { + requestAbortController.abort(ERRORS.ABORTED) + } + }, + cancel: () => { + // Initiates the delayed abort sequence unless it's already in progress. + if (!abortWithDelayController.signal.aborted) { + abortWithDelayController.abort(ERRORS.ABORTED) + } + }, + } + } } diff --git a/packages/server/src/types.ts b/packages/server/src/types.ts index 39b8dec9..7fcbfd21 100644 --- a/packages/server/src/types.ts +++ b/packages/server/src/types.ts @@ -1,53 +1,116 @@ import type http from 'node:http' -import type {Upload} from './models' +import type {Locker, Upload} from './models' +/** + * Represents the configuration options for a server. + */ export type ServerOptions = { - // The route to accept requests. + /** + * The route to accept requests. + */ path: string - // Return a relative URL as the `Location` header. + + /** + * Return a relative URL as the `Location` header. + */ relativeLocation?: boolean - // Allow `Forwarded`, `X-Forwarded-Proto`, and `X-Forwarded-Host` headers - // to override the `Location` header returned by the server. + + /** + * Allow `Forwarded`, `X-Forwarded-Proto`, and `X-Forwarded-Host` headers + * to override the `Location` header returned by the server. + */ respectForwardedHeaders?: boolean - // adds custom headers sent in `Access-Control-Allow-Headers`. 
+ + /** + * Additional headers sent in `Access-Control-Allow-Headers`. + */ allowedHeaders?: string[] - // Control how the upload url is generated + + /** + * Control how the upload URL is generated. + * @param req - The incoming HTTP request. + * @param options - Options for generating the URL. + */ generateUrl?: ( req: http.IncomingMessage, options: {proto: string; host: string; baseUrl: string; path: string; id: string} ) => string - // Control how the Upload-ID is extracted from the request + + /** + * Control how the Upload-ID is extracted from the request. + * @param req - The incoming HTTP request. + */ getFileIdFromRequest?: (req: http.IncomingMessage) => string | void - // Control how you want to name files. - // It is important to make these unique to prevent data loss. Only use it if you really need to. - // Default uses `crypto.randomBytes(16).toString('hex')`. + + /** + * Control how you want to name files. + * It is important to make these unique to prevent data loss. + * Only use it if you really need to. + * Default uses `crypto.randomBytes(16).toString('hex')`. + * @param req - The incoming HTTP request. + */ namingFunction?: (req: http.IncomingMessage) => string - // `onUploadCreate` will be invoked before a new upload is created. - // If the function returns the (modified) response, the upload will be created. - // If an error is thrown, the HTTP request will be aborted and the provided `body` and `status_code` (or their fallbacks) - // will be sent to the client. This can be used to implement validation of upload metadata or add headers. + + /** + * The Lock interface defines methods for implementing a locking mechanism. + * It is primarily used to ensure exclusive access to resources, such as uploads and their metadata. + */ + locker: + | Locker + | Promise + | ((req: http.IncomingMessage) => Locker | Promise) + + /** + * `onUploadCreate` will be invoked before a new upload is created. 
+ * If the function returns the (modified) response, the upload will be created. + * If an error is thrown, the HTTP request will be aborted, and the provided `body` and `status_code` + * (or their fallbacks) will be sent to the client. This can be used to implement validation of upload + * metadata or add headers. + * @param req - The incoming HTTP request. + * @param res - The HTTP response. + * @param upload - The Upload object. + */ onUploadCreate?: ( req: http.IncomingMessage, res: http.ServerResponse, upload: Upload ) => Promise - // `onUploadFinish` will be invoked after an upload is completed but before a response is returned to the client. - // If the function returns the (modified) response, the upload will finish. - // If an error is thrown, the HTTP request will be aborted and the provided `body` and `status_code` (or their fallbacks) - // will be sent to the client. This can be used to implement post-processing validation. + + /** + * `onUploadFinish` will be invoked after an upload is completed but before a response is returned to the client. + * If the function returns the (modified) response, the upload will finish. + * If an error is thrown, the HTTP request will be aborted, and the provided `body` and `status_code` + * (or their fallbacks) will be sent to the client. This can be used to implement post-processing validation. + * @param req - The incoming HTTP request. + * @param res - The HTTP response. + * @param upload - The Upload object. + */ onUploadFinish?: ( req: http.IncomingMessage, res: http.ServerResponse, upload: Upload ) => Promise + + /** + * `onIncomingRequest` will be invoked when an incoming request is received. + * @param req - The incoming HTTP request. + * @param res - The HTTP response. + * @param uploadId - The ID of the upload. 
+ */ onIncomingRequest?: ( req: http.IncomingMessage, res: http.ServerResponse, uploadId: string ) => Promise - // `onResponseError` will be invoked when an error response is about to be sent by the server. - // you use this function to map custom errors to tus errors or for custom observability. + + /** + * `onResponseError` will be invoked when an error response is about to be sent by the server. + * Use this function to map custom errors to tus errors or for custom observability. + * @param req - The incoming HTTP request. + * @param res - The HTTP response. + * @param err - The error object or response. + */ onResponseError?: ( req: http.IncomingMessage, res: http.ServerResponse, @@ -59,3 +122,7 @@ export type ServerOptions = { } export type RouteHandler = (req: http.IncomingMessage, res: http.ServerResponse) => void + +export type WithOptional = Omit & {[P in K]+?: T[P]} + +export type WithRequired = T & {[P in K]-?: T[P]} diff --git a/packages/server/test/BaseHandler.test.ts b/packages/server/test/BaseHandler.test.ts index 664ee2aa..7fe2117b 100644 --- a/packages/server/test/BaseHandler.test.ts +++ b/packages/server/test/BaseHandler.test.ts @@ -5,10 +5,14 @@ import httpMocks from 'node-mocks-http' import {BaseHandler} from '../src/handlers//BaseHandler' import {DataStore} from '../src/models' +import {MemoryLocker} from '../src' describe('BaseHandler', () => { const store = new DataStore() - const handler = new BaseHandler(store, {path: '/test/output'}) + const handler = new BaseHandler(store, { + path: '/test/output', + locker: new MemoryLocker(), + }) let res: httpMocks.MockResponse beforeEach(() => { @@ -69,6 +73,7 @@ describe('BaseHandler', () => { it('should allow to to generate a url with a custom function', () => { const handler = new BaseHandler(store, { path: '/path', + locker: new MemoryLocker(), generateUrl: (req: http.IncomingMessage, info) => { const {proto, host, baseUrl, path, id} = info return 
`${proto}://${host}${baseUrl}${path}/${id}?customParam=1` @@ -89,6 +94,7 @@ describe('BaseHandler', () => { it('should allow extracting the request id with a custom function', () => { const handler = new BaseHandler(store, { path: '/path', + locker: new MemoryLocker(), getFileIdFromRequest: (req: http.IncomingMessage) => { return req.url?.split('/').pop() + '-custom' }, diff --git a/packages/server/test/DeleteHandler.test.ts b/packages/server/test/DeleteHandler.test.ts index 64ff2207..a785df3a 100644 --- a/packages/server/test/DeleteHandler.test.ts +++ b/packages/server/test/DeleteHandler.test.ts @@ -9,6 +9,8 @@ import httpMocks from 'node-mocks-http' import {DataStore} from '../src/models/DataStore' import {DeleteHandler} from '../src/handlers/DeleteHandler' import {ERRORS, EVENTS} from '../src/constants' +import {CancellationContext} from '../src/models' +import {MemoryLocker} from '../src' describe('DeleteHandler', () => { const path = '/test/output' @@ -16,28 +18,39 @@ describe('DeleteHandler', () => { let handler: InstanceType let req: http.IncomingMessage let res: httpMocks.MockResponse + let context: CancellationContext beforeEach(() => { fake_store.remove.resetHistory() - handler = new DeleteHandler(fake_store, {relativeLocation: true, path}) + handler = new DeleteHandler(fake_store, { + relativeLocation: true, + path, + locker: new MemoryLocker(), + }) req = {url: `${path}/1234`, method: 'DELETE'} as http.IncomingMessage res = httpMocks.createResponse() + const abortController = new AbortController() + context = { + signal: abortController.signal, + cancel: () => abortController.abort(), + abort: () => abortController.abort(), + } }) it('should 404 if no file id match', () => { fake_store.remove.rejects(ERRORS.FILE_NOT_FOUND) - return assert.rejects(() => handler.send(req, res), {status_code: 404}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 404}) }) it('should 404 if no file ID', async () => { sinon.stub(handler, 
'getFileIdFromRequest').returns(undefined) - await assert.rejects(() => handler.send(req, res), {status_code: 404}) + await assert.rejects(() => handler.send(req, res, context), {status_code: 404}) assert.equal(fake_store.remove.callCount, 0) }) it('must acknowledge successful DELETE requests with the 204', async () => { fake_store.remove.resolves() - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(res.statusCode, 204) }) @@ -48,6 +61,6 @@ describe('DeleteHandler', () => { assert.equal(id, '1234') done() }) - handler.send(req, res) + handler.send(req, res, context) }) }) diff --git a/packages/server/test/GetHandler.test.ts b/packages/server/test/GetHandler.test.ts index 03ba3985..db9e3253 100644 --- a/packages/server/test/GetHandler.test.ts +++ b/packages/server/test/GetHandler.test.ts @@ -12,10 +12,11 @@ import {GetHandler} from '../src/handlers/GetHandler' import {DataStore} from '../src/models/DataStore' import {FileStore} from '@tus/file-store' import {Upload} from '../src/models/Upload' +import {MemoryLocker} from '../src' describe('GetHandler', () => { const path = '/test/output' - const serverOptions = {path} + const serverOptions = {path, locker: new MemoryLocker()} let req: http.IncomingMessage let res: http.ServerResponse @@ -28,7 +29,7 @@ describe('GetHandler', () => { it('should 404 when file does not exist', async () => { const store = sinon.createStubInstance(FileStore) store.getUpload.rejects({status_code: 404}) - const handler = new GetHandler(store, {path}) + const handler = new GetHandler(store, {path, locker: new MemoryLocker()}) const spy_getFileIdFromRequest = sinon.spy(handler, 'getFileIdFromRequest') req.url = `${path}/1234` await assert.rejects(() => handler.send(req, res), {status_code: 404}) @@ -37,7 +38,7 @@ describe('GetHandler', () => { it('should 404 for non registered path', async () => { const store = sinon.createStubInstance(FileStore) - const handler = new GetHandler(store, {path}) + const handler 
= new GetHandler(store, {path, locker: new MemoryLocker()}) const spy_getFileIdFromRequest = sinon.spy(handler, 'getFileIdFromRequest') req.url = `/not_a_valid_file_path` await assert.rejects(() => handler.send(req, res), {status_code: 404}) @@ -47,7 +48,7 @@ describe('GetHandler', () => { it('should 404 when file is not complete', async () => { const store = sinon.createStubInstance(FileStore) store.getUpload.resolves(new Upload({id: '1234', offset: 512, size: 1024})) - const handler = new GetHandler(store, {path}) + const handler = new GetHandler(store, {path, locker: new MemoryLocker()}) const fileId = '1234' req.url = `${path}/${fileId}` await assert.rejects(() => handler.send(req, res), {status_code: 404}) @@ -71,7 +72,7 @@ describe('GetHandler', () => { store.getUpload.resolves(new Upload({id: '1234', offset: size, size})) // @ts-expect-error what should this be? store.read.returns(stream.Readable.from(fs.createReadStream('invalid_path'))) - const handler = new GetHandler(store, {path}) + const handler = new GetHandler(store, {path, locker: new MemoryLocker()}) const fileId = '1234' req.url = `${path}/${fileId}` await handler.send(req, res) @@ -87,7 +88,7 @@ describe('GetHandler', () => { store.getUpload.resolves(new Upload({id: '1234', offset: 512, size: 512})) // @ts-expect-error should store.read.returns(stream.Readable.from(Buffer.alloc(512))) - const handler = new GetHandler(store, {path}) + const handler = new GetHandler(store, {path, locker: new MemoryLocker()}) const fileId = '1234' req.url = `${path}/${fileId}` await handler.send(req, res) @@ -101,7 +102,7 @@ describe('GetHandler', () => { store.getUpload.resolves(new Upload({id: '1234', offset: size, size})) // @ts-expect-error what should this be? 
store.read.returns(stream.Readable.from(Buffer.alloc(size), {objectMode: false})) - const handler = new GetHandler(store, {path}) + const handler = new GetHandler(store, {path, locker: new MemoryLocker()}) const fileId = '1234' req.url = `${path}/${fileId}` await handler.send(req, res) diff --git a/packages/server/test/HeadHandler.test.ts b/packages/server/test/HeadHandler.test.ts index e0414207..243e9f60 100644 --- a/packages/server/test/HeadHandler.test.ts +++ b/packages/server/test/HeadHandler.test.ts @@ -4,35 +4,47 @@ import http from 'node:http' import sinon from 'sinon' import httpMocks from 'node-mocks-http' -import {DataStore, Upload} from '../src/models' +import {DataStore, Upload, CancellationContext} from '../src/models' import {HeadHandler} from '../src/handlers/HeadHandler' import {ERRORS} from '../src/constants' +import {MemoryLocker} from '../src' describe('HeadHandler', () => { const path = '/test/output' const fake_store = sinon.createStubInstance(DataStore) - const handler = new HeadHandler(fake_store, {relativeLocation: true, path}) + const handler = new HeadHandler(fake_store, { + relativeLocation: true, + path, + locker: new MemoryLocker(), + }) let req: http.IncomingMessage let res: httpMocks.MockResponse + let context: CancellationContext beforeEach(() => { req = {url: `${path}/1234`, method: 'HEAD'} as http.IncomingMessage res = httpMocks.createResponse({req}) + const abortController = new AbortController() + context = { + cancel: () => abortController.abort(), + abort: () => abortController.abort(), + signal: abortController.signal, + } }) it('should 404 if no file id match', () => { fake_store.getUpload.rejects(ERRORS.FILE_NOT_FOUND) - return assert.rejects(() => handler.send(req, res), {status_code: 404}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 404}) }) it('should 404 if no file ID', () => { req.url = `${path}/` - return assert.rejects(() => handler.send(req, res), {status_code: 404}) + return 
assert.rejects(() => handler.send(req, res, context), {status_code: 404}) }) it('should resolve with the offset and cache-control', async () => { fake_store.getUpload.resolves(new Upload({id: '1234', offset: 0})) - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(res.getHeader('Upload-Offset'), 0) assert.equal(res.getHeader('Cache-Control'), 'no-store') assert.equal(res.statusCode, 200) @@ -45,7 +57,7 @@ describe('HeadHandler', () => { size: 512, }) fake_store.getUpload.resolves(file) - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(res.getHeader('Upload-Length'), file.size) assert.equal(res.hasHeader('Upload-Defer-Length'), false) }) @@ -56,7 +68,7 @@ describe('HeadHandler', () => { offset: 0, }) fake_store.getUpload.resolves(file) - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(res.getHeader('Upload-Defer-Length'), '1') assert.equal(res.hasHeader('Upload-Length'), false) }) @@ -68,7 +80,7 @@ describe('HeadHandler', () => { metadata: {is_confidential: null, foo: 'bar'}, }) fake_store.getUpload.resolves(file) - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(res.getHeader('Upload-Metadata'), 'is_confidential,foo YmFy') }) @@ -78,7 +90,7 @@ describe('HeadHandler', () => { offset: 0, }) fake_store.getUpload.resolves(file) - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(res.hasHeader('Upload-Metadata'), false) }) }) diff --git a/packages/server/test/Locker.test.ts b/packages/server/test/Locker.test.ts new file mode 100644 index 00000000..0f377025 --- /dev/null +++ b/packages/server/test/Locker.test.ts @@ -0,0 +1,82 @@ +import assert from 'node:assert' +import sinon from 'sinon' +import {ERRORS, MemoryLocker} from '../src' + +describe('MemoryLocker', () => { + it('will acquire a lock by notifying another to release it', async () => { + const locker = new MemoryLocker() + const lockId = 
'upload-id-1' + + const cancel = sinon.spy() + const cancel2 = sinon.spy() + + const lock1 = locker.newLock(lockId) + const lock2 = locker.newLock(lockId) + + await lock1.lock(async () => { + await lock1.unlock() + cancel() + }) + + await lock2.lock(async () => { + cancel2() + }) + + await lock2.unlock() + + assert(cancel.callCount === 1, `calls count dont match ${cancel.callCount} !== 1`) + assert(cancel2.callCount === 0, `calls count dont match ${cancel.callCount} !== 1`) + }) + + it('will return a lock timeout error', async () => { + const locker = new MemoryLocker({ + acquireLockTimeout: 500, + }) + const lockId = 'upload-id-1' + const lock = locker.newLock(lockId) + + const cancel = sinon.spy() + + await lock.lock(async () => { + cancel() + // We note that the function has been called, but do not + // release the lock + }) + + try { + await lock.lock(async () => { + throw new Error('panic should not be called') + }) + } catch (e) { + assert(!(e instanceof Error), `error returned is not correct ${e.message}`) + assert('body' in e, 'body is not present in the error') + assert(e.body === ERRORS.ERR_LOCK_TIMEOUT.body) + } + }) + + it('request lock and unlock', async () => { + const locker = new MemoryLocker() + const lockId = 'upload-id-1' + const lock = locker.newLock(lockId) + const lock2 = locker.newLock(lockId) + + const cancel = sinon.spy() + await lock.lock(() => { + cancel() + setTimeout(async () => { + await lock.unlock() + }, 50) + }) + + await lock2.lock(() => { + throw new Error('should not be called') + }) + + await lock2.unlock() + + assert( + cancel.callCount > 0, + `request released called more times than expected - ${cancel.callCount}` + ) + }) +}) diff --git a/packages/server/test/OptionsHandler.test.ts b/packages/server/test/OptionsHandler.test.ts index b4a35f49..5a3aedb2 100644 --- a/packages/server/test/OptionsHandler.test.ts +++ b/packages/server/test/OptionsHandler.test.ts @@ -8,9 +8,10 @@ import httpMocks from 'node-mocks-http' import 
{OptionsHandler} from '../src/handlers/OptionsHandler' import {DataStore} from '../src/models/DataStore' import {ALLOWED_METHODS, ALLOWED_HEADERS, MAX_AGE} from '../src/constants' +import {MemoryLocker} from '../src' describe('OptionsHandler', () => { - const options = {path: '/test/output'} + const options = {path: '/test/output', locker: new MemoryLocker()} const store = new DataStore() const handler = new OptionsHandler(store, options) diff --git a/packages/server/test/PatchHandler.test.ts b/packages/server/test/PatchHandler.test.ts index 79ac2a92..f8f6c91e 100644 --- a/packages/server/test/PatchHandler.test.ts +++ b/packages/server/test/PatchHandler.test.ts @@ -7,8 +7,11 @@ import sinon from 'sinon' import httpMocks from 'node-mocks-http' import {PatchHandler} from '../src/handlers/PatchHandler' -import {Upload, DataStore} from '../src/models' +import {Upload, DataStore, CancellationContext} from '../src/models' import {EVENTS} from '../src/constants' +import {EventEmitter} from 'node:events' +import {addPipableStreamBody} from './utils' +import {MemoryLocker} from '../src' describe('PatchHandler', () => { const path = '/test/output' @@ -16,22 +19,35 @@ describe('PatchHandler', () => { let res: httpMocks.MockResponse let store: sinon.SinonStubbedInstance let handler: InstanceType + let context: CancellationContext beforeEach(() => { store = sinon.createStubInstance(DataStore) - handler = new PatchHandler(store, {path}) - req = {method: 'PATCH', url: `${path}/1234`} as http.IncomingMessage + handler = new PatchHandler(store, {path, locker: new MemoryLocker()}) + req = addPipableStreamBody( + httpMocks.createRequest({ + method: 'PATCH', + url: `${path}/1234`, + eventEmitter: EventEmitter, + }) + ) res = httpMocks.createResponse({req}) + const abortController = new AbortController() + context = { + cancel: () => abortController.abort(), + abort: () => abortController.abort(), + signal: abortController.signal, + } }) it('should 403 if no Content-Type header', () => 
{ req.headers = {} - return assert.rejects(() => handler.send(req, res), {status_code: 403}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 403}) }) it('should 403 if no Upload-Offset header', () => { req.headers = {'content-type': 'application/offset+octet-stream'} - return assert.rejects(() => handler.send(req, res), {status_code: 403}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 403}) }) it('should call onUploadFinished hook', async function () { @@ -39,6 +55,7 @@ describe('PatchHandler', () => { const handler = new PatchHandler(store, { path: '/test/output', onUploadFinish: spy, + locker: new MemoryLocker(), }) req.headers = { @@ -48,7 +65,7 @@ describe('PatchHandler', () => { store.getUpload.resolves(new Upload({id: '1234', offset: 0, size: 1024})) store.write.resolves(1024) - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(spy.calledOnce, true) const upload = spy.args[0][2] assert.equal(upload.offset, 1024) @@ -58,7 +75,7 @@ describe('PatchHandler', () => { describe('send()', () => { it('should 404 urls without a path', () => { req.url = `${path}/` - return assert.rejects(() => handler.send(req, res), {status_code: 404}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 404}) }) it('should 403 if the offset is omitted', () => { @@ -66,13 +83,13 @@ describe('PatchHandler', () => { 'content-type': 'application/offset+octet-stream', } req.url = `${path}/file` - return assert.rejects(() => handler.send(req, res), {status_code: 403}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 403}) }) it('should 403 the content-type is omitted', () => { req.headers = {'upload-offset': '0'} req.url = `${path}/file` - return assert.rejects(() => handler.send(req, res), {status_code: 403}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 403}) }) it('should declare upload-length once it is send', 
async () => { @@ -88,7 +105,7 @@ describe('PatchHandler', () => { store.write.resolves(5) store.declareUploadLength.resolves() - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(store.declareUploadLength.calledOnceWith('file', 10), true) }) @@ -104,7 +121,7 @@ describe('PatchHandler', () => { store.getUpload.resolves(new Upload({id: '1234', offset: 0, size: 20})) store.hasExtension.withArgs('creation-defer-length').returns(true) - return assert.rejects(() => handler.send(req, res), {status_code: 400}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 400}) }) it('must return a promise if the headers validate', () => { @@ -115,7 +132,7 @@ describe('PatchHandler', () => { } req.url = `${path}/1234` // eslint-disable-next-line new-cap - handler.send(req, res).should.be.a.Promise() + handler.send(req, res, context).should.be.a.Promise() }) it('must 409 if the offset does not match', () => { @@ -127,7 +144,7 @@ describe('PatchHandler', () => { store.getUpload.resolves(new Upload({id: '1234', offset: 0, size: 512})) - return assert.rejects(() => handler.send(req, res), {status_code: 409}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 409}) }) it('must acknowledge successful PATCH requests with the 204', async () => { @@ -139,7 +156,7 @@ describe('PatchHandler', () => { store.getUpload.resolves(new Upload({id: '1234', offset: 0, size: 1024})) store.write.resolves(10) - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(res._getHeaders()['upload-offset'], 10) assert.equal(res.hasHeader('Content-Length'), false) @@ -158,7 +175,7 @@ describe('PatchHandler', () => { store.write.resolves(10) handler.on(EVENTS.POST_RECEIVE, spy) - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(spy.calledOnce, true) assert.ok(spy.args[0][0]) diff --git a/packages/server/test/PostHandler.test.ts b/packages/server/test/PostHandler.test.ts 
index e17a7de0..e7ce3559 100644 --- a/packages/server/test/PostHandler.test.ts +++ b/packages/server/test/PostHandler.test.ts @@ -7,25 +7,35 @@ import http from 'node:http' import httpMocks from 'node-mocks-http' import sinon from 'sinon' -import {Upload, DataStore} from '../src/models' +import {Upload, DataStore, CancellationContext} from '../src/models' import {PostHandler} from '../src/handlers/PostHandler' import {EVENTS} from '../src/constants' +import {addPipableStreamBody} from './utils' +import {MemoryLocker} from '../src' const SERVER_OPTIONS = { path: '/test', namingFunction: () => '1234', + locker: new MemoryLocker(), } describe('PostHandler', () => { let req: http.IncomingMessage let res: httpMocks.MockResponse + let context: CancellationContext const fake_store = sinon.createStubInstance(DataStore) fake_store.hasExtension.withArgs('creation-defer-length').returns(true) beforeEach(() => { - req = {url: '/files', method: 'POST'} as http.IncomingMessage + req = addPipableStreamBody(httpMocks.createRequest({method: 'POST'})) res = httpMocks.createResponse({req}) + const abortController = new AbortController() + context = { + cancel: () => abortController.abort(), + abort: () => abortController.abort(), + signal: abortController.signal, + } }) describe('constructor()', () => { @@ -46,39 +56,44 @@ describe('PostHandler', () => { const handler = new PostHandler(fake_store, SERVER_OPTIONS) req.headers = {} - return assert.rejects(() => handler.send(req, res), {status_code: 400}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 400}) }) it('must 400 if the Upload-Length and Upload-Defer-Length headers are both present', async () => { const handler = new PostHandler(fake_store, SERVER_OPTIONS) req.headers = {'upload-length': '512', 'upload-defer-length': '1'} - return assert.rejects(() => handler.send(req, res), {status_code: 400}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 400}) }) it("must 501 if 
the 'concatenation' extension is not supported", async () => { const handler = new PostHandler(fake_store, SERVER_OPTIONS) req.headers = {'upload-concat': 'partial'} - return assert.rejects(() => handler.send(req, res), {status_code: 501}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 501}) }) it('should send error when naming function throws', async () => { const fake_store = sinon.createStubInstance(DataStore) const handler = new PostHandler(fake_store, { path: '/test', + locker: new MemoryLocker(), namingFunction: sinon.stub().throws(), }) req.headers = {'upload-length': '1000'} - return assert.rejects(() => handler.send(req, res), {status_code: 500}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 500}) }) it('should call custom namingFunction', async () => { const fake_store = sinon.createStubInstance(DataStore) const namingFunction = sinon.stub().returns('1234') - const handler = new PostHandler(fake_store, {path: '/test/', namingFunction}) + const handler = new PostHandler(fake_store, { + path: '/test/', + namingFunction, + locker: new MemoryLocker(), + }) req.headers = {'upload-length': '1000'} - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(namingFunction.calledOnce, true) }) @@ -89,7 +104,7 @@ describe('PostHandler', () => { const handler = new PostHandler(fake_store, SERVER_OPTIONS) req.headers = {'upload-length': '1000'} - return assert.rejects(() => handler.send(req, res), {status_code: 500}) + return assert.rejects(() => handler.send(req, res, context), {status_code: 500}) }) }) @@ -97,10 +112,11 @@ describe('PostHandler', () => { it('must acknowledge successful POST requests with the 201', async () => { const handler = new PostHandler(fake_store, { path: '/test/output', + locker: new MemoryLocker(), namingFunction: () => '1234', }) req.headers = {'upload-length': '1000', host: 'localhost:3000'} - await handler.send(req, res) + await handler.send(req, res, 
context) assert.equal(res._getHeaders().location, 'http://localhost:3000/test/output/1234') assert.equal(res.statusCode, 201) }) @@ -109,6 +125,7 @@ describe('PostHandler', () => { describe('respect forwarded headers', () => { const handler = new PostHandler(fake_store, { path: '/test/output', + locker: new MemoryLocker(), respectForwardedHeaders: true, namingFunction: () => '1234', }) @@ -120,7 +137,7 @@ describe('PostHandler', () => { 'x-forwarded-host': 'foo.com', 'x-forwarded-proto': 'https', } - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(res._getHeaders().location, 'https://foo.com/test/output/1234') assert.equal(res.statusCode, 201) }) @@ -131,7 +148,7 @@ describe('PostHandler', () => { host: 'localhost:3000', forwarded: 'for=localhost:3000;by=203.0.113.60;proto=https;host=foo.com', } - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(res._getHeaders().location, 'https://foo.com/test/output/1234') assert.equal(res.statusCode, 201) }) @@ -142,7 +159,7 @@ describe('PostHandler', () => { host: 'localhost:3000', forwarded: 'invalid', } - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(res._getHeaders().location, 'http://localhost:3000/test/output/1234') assert.equal(res.statusCode, 201) }) @@ -153,7 +170,7 @@ describe('PostHandler', () => { host: 'localhost:3000', 'x-forwarded-proto': 'foo', } - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(res._getHeaders().location, 'http://localhost:3000/test/output/1234') assert.equal(res.statusCode, 201) }) @@ -161,11 +178,12 @@ describe('PostHandler', () => { it('should handle root as path', async () => { const handler = new PostHandler(fake_store, { path: '/', + locker: new MemoryLocker(), respectForwardedHeaders: true, namingFunction: () => '1234', }) req.headers = {'upload-length': '1000', host: 'localhost:3000'} - await handler.send(req, res) + await handler.send(req, res, 
context) assert.equal(res._getHeaders().location, 'http://localhost:3000/1234') assert.equal(res.statusCode, 201) }) @@ -182,7 +200,7 @@ describe('PostHandler', () => { store.create.resolves(file) handler.on(EVENTS.POST_CREATE, spy) - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(spy.calledOnce, true) }) @@ -194,6 +212,7 @@ describe('PostHandler', () => { const handler = new PostHandler(fake_store, { path: '/test/output', + locker: new MemoryLocker(), namingFunction: () => '1234', }) handler.on(EVENTS.POST_CREATE, (_, __, ___, url) => { @@ -202,7 +221,7 @@ describe('PostHandler', () => { }) req.headers = {'upload-length': '1000', host: 'localhost:3000'} - handler.send(req, res) + handler.send(req, res, context) }) it(`must fire the ${EVENTS.POST_CREATE} event with relative URL`, (done) => { @@ -213,6 +232,7 @@ describe('PostHandler', () => { const handler = new PostHandler(fake_store, { path: '/test/output', + locker: new MemoryLocker(), relativeLocation: true, namingFunction: () => '1234', }) @@ -222,7 +242,7 @@ describe('PostHandler', () => { }) req.headers = {'upload-length': '1000', host: 'localhost:3000'} - handler.send(req, res) + handler.send(req, res, context) }) it(`must fire the ${EVENTS.POST_CREATE} event when upload is complete with single request`, (done) => { @@ -233,7 +253,10 @@ describe('PostHandler', () => { fake_store.create.resolvesArg(0) fake_store.write.resolves(upload_length) - const handler = new PostHandler(fake_store, {path: '/test/output'}) + const handler = new PostHandler(fake_store, { + path: '/test/output', + locker: new MemoryLocker(), + }) handler.on(EVENTS.POST_CREATE, () => { done() }) @@ -243,7 +266,7 @@ describe('PostHandler', () => { host: 'localhost:3000', 'content-type': 'application/offset+octet-stream', } - handler.send(req, res) + handler.send(req, res, context) }) it('should call onUploadCreate hook', async function () { @@ -251,6 +274,7 @@ describe('PostHandler', () => { const spy = 
sinon.stub().resolvesArg(1) const handler = new PostHandler(store, { path: '/test/output', + locker: new MemoryLocker(), onUploadCreate: spy, }) @@ -260,7 +284,7 @@ describe('PostHandler', () => { } store.create.resolvesArg(0) - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(spy.calledOnce, true) const upload = spy.args[0][2] assert.equal(upload.offset, 0) @@ -272,6 +296,7 @@ describe('PostHandler', () => { const spy = sinon.stub().resolvesArg(1) const handler = new PostHandler(store, { path: '/test/output', + locker: new MemoryLocker(), onUploadFinish: spy, }) @@ -283,7 +308,7 @@ describe('PostHandler', () => { store.create.resolvesArg(0) store.write.resolves(1024) - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(spy.calledOnce, true) const upload = spy.args[0][2] assert.equal(upload.offset, 1024) @@ -295,12 +320,13 @@ describe('PostHandler', () => { const spy = sinon.stub().resolvesArg(1) const handler = new PostHandler(store, { path: '/test/output', + locker: new MemoryLocker(), onUploadFinish: spy, }) req.headers = {'upload-length': '0', host: 'localhost:3000'} - await handler.send(req, res) + await handler.send(req, res, context) assert.equal(spy.calledOnce, true) const upload = spy.args[0][2] assert.equal(upload.offset, 0) diff --git a/packages/server/test/Server.test.ts b/packages/server/test/Server.test.ts index 96846556..3c5ee442 100644 --- a/packages/server/test/Server.test.ts +++ b/packages/server/test/Server.test.ts @@ -12,6 +12,7 @@ import {Server} from '../src' import {FileStore} from '@tus/file-store' import {DataStore} from '../src/models' import {TUS_RESUMABLE, EVENTS} from '../src/constants' +import httpMocks from 'node-mocks-http' import sinon from 'sinon' // Test server crashes on http://{some-ip} so we remove the protocol... 
@@ -234,13 +235,13 @@ describe('Server', () => { }) it('should allow overriding the HTTP method', (done) => { - const req = { + const req = httpMocks.createRequest({ headers: {'x-http-method-override': 'OPTIONS'}, method: 'GET', - } + }) + // @ts-expect-error todo const res = new http.ServerResponse({method: 'OPTIONS'}) - // @ts-expect-error todo server.handle(req, res) assert.equal(req.method, 'OPTIONS') done() @@ -248,10 +249,13 @@ describe('Server', () => { it('should allow overriding the HTTP method', async () => { const origin = 'vimeo.com' - const req = {headers: {origin}, method: 'OPTIONS', url: '/'} + const req = httpMocks.createRequest({ + headers: {origin}, + method: 'OPTIONS', + url: '/', + }) // @ts-expect-error todo const res = new http.ServerResponse({method: 'OPTIONS'}) - // @ts-expect-error todo await server.handle(req, res) assert.equal(res.hasHeader('Access-Control-Allow-Origin'), true) }) diff --git a/packages/server/test/utils.ts b/packages/server/test/utils.ts new file mode 100644 index 00000000..a16866bd --- /dev/null +++ b/packages/server/test/utils.ts @@ -0,0 +1,21 @@ +import httpMocks from 'node-mocks-http' +import stream from 'node:stream' + +export function addPipableStreamBody>( + mockRequest: T +) { + // Create a Readable stream that simulates the request body + const bodyStream = new stream.Readable({ + read() { + this.push(JSON.stringify(mockRequest.body)) + this.push(null) + }, + }) + + // Add the pipe method to the mockRequest + // @ts-expect-error pipe exists + mockRequest.pipe = function (dest: stream.Writable) { + bodyStream.pipe(dest) + } + return mockRequest +} diff --git a/test/e2e.test.ts b/test/e2e.test.ts index 6c178241..b915b373 100644 --- a/test/e2e.test.ts +++ b/test/e2e.test.ts @@ -7,12 +7,15 @@ import rimraf from 'rimraf' import request from 'supertest' import {Storage} from '@google-cloud/storage' -import {Server, TUS_RESUMABLE} from '@tus/server' +import {MemoryLocker, Server, TUS_RESUMABLE} from '@tus/server' import 
{Configstore, MemoryConfigstore} from '@tus/file-store' import {FileStore} from '@tus/file-store' import {GCSStore} from '@tus/gcs-store' -import type http from 'node:http' +import http from 'node:http' +import sinon from 'sinon' +import Throttle from 'throttle' +import {Agent} from 'http' const STORE_PATH = '/output' const PROJECT_ID = 'tus-node-server' @@ -676,4 +679,107 @@ describe('EndToEnd', () => { }) }) }) + + describe('File Store with Locking', () => { + before(() => { + server = new Server({ + path: STORE_PATH, + datastore: new FileStore({directory: `./${STORE_PATH}`}), + locker: new MemoryLocker(), + }) + listener = server.listen() + agent = request.agent(listener) + }) + + after((done) => { + // Remove the files directory + rimraf(FILES_DIRECTORY, async (err) => { + if (err) { + return done(err) + } + + // Clear the config + // @ts-expect-error we can consider a generic to pass to + // datastore to narrow down the store type + const uploads = (await (server.datastore.configstore as Configstore).list?.()) ?? 
[] + for (const upload of uploads) { + // @ts-expect-error we can consider a generic to pass to + // datastore to narrow down the store type + await (server.datastore.configstore as Configstore).delete(upload) + } + listener.close() + return done() + }) + }) + + it('will allow another request to acquire the lock by cancelling the previous request', async () => { + const res = await agent + .post(STORE_PATH) + .set('Tus-Resumable', TUS_RESUMABLE) + .set('Upload-Length', TEST_FILE_SIZE) + .set('Upload-Metadata', TEST_METADATA) + .set('Tus-Resumable', TUS_RESUMABLE) + .expect(201) + + assert.equal('location' in res.headers, true) + assert.equal(res.headers['tus-resumable'], TUS_RESUMABLE) + // Save the id for subsequent tests + const file_id = res.headers.location.split('/').pop() + const file_size = parseInt(TEST_FILE_SIZE, 10) + + // Slow down writing + const originalWrite = server.datastore.write.bind(server.datastore) + sinon.stub(server.datastore, 'write').callsFake((stream, ...args) => { + const throttleStream = new Throttle({bps: file_size / 4}) + return originalWrite(stream.pipe(throttleStream), ...args) + }) + + const data = Buffer.alloc(parseInt(TEST_FILE_SIZE, 10), 'a') + const httpAgent = new Agent({ + maxSockets: 2, + maxFreeSockets: 10, + timeout: 10000, + keepAlive: true, + }) + + const createPatchReq = (offset: number) => { + return agent + .patch(`${STORE_PATH}/${file_id}`) + .agent(httpAgent) + .set('Tus-Resumable', TUS_RESUMABLE) + .set('Upload-Offset', offset.toString()) + .set('Content-Type', 'application/offset+octet-stream') + .send(data.subarray(offset)) + } + + const req1 = createPatchReq(0).then((e) => e) + await wait(100) + + const req2 = agent + .head(`${STORE_PATH}/${file_id}`) + .agent(httpAgent) + .set('Tus-Resumable', TUS_RESUMABLE) + .expect(200) + .then((e) => e) + + const [res1, res2] = await Promise.allSettled([req1, req2]) + assert.equal(res1.status, 'fulfilled') + assert.equal(res2.status, 'fulfilled') + 
assert.equal(res1.value.statusCode, 400) + assert.equal(res1.value.headers['upload-offset'] !== TEST_FILE_SIZE, true) + + assert.equal(res2.value.statusCode, 200) + + // Verify that we are able to resume even if the first request + // was cancelled by the second request trying to acquire the lock + const offset = parseInt(res2.value.headers['upload-offset'], 10) + + const finishedUpload = await createPatchReq(offset) + + assert.equal(finishedUpload.statusCode, 204) + assert.equal(finishedUpload.headers['upload-offset'], TEST_FILE_SIZE) + }).timeout(20000) + }) }) + +const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) diff --git a/test/package.json b/test/package.json index e5833471..422792ec 100644 --- a/test/package.json +++ b/test/package.json @@ -18,11 +18,13 @@ "@types/rimraf": "^3.0.2", "@types/sinon": "^10.0.16", "@types/supertest": "^2.0.12", + "@types/throttle": "^1.0.4", "mocha": "^10.2.0", "rimraf": "^3.0.2", "should": "^13.2.3", "sinon": "^15.2.0", "supertest": "^6.3.3", + "throttle": "^1.0.3", "ts-node": "^10.9.1", "tsconfig": "*", "typescript": "^5.2.2" diff --git a/test/s3.e2e.ts b/test/s3.e2e.ts index e935415a..7bb6f623 100644 --- a/test/s3.e2e.ts +++ b/test/s3.e2e.ts @@ -165,7 +165,7 @@ describe('S3 Store E2E', () => { assert(infoFile.$metadata.httpStatusCode === 200) assert(partFile.$metadata.httpStatusCode === 200) - await wait(expireTime) + await wait(expireTime + 100) // .info file and .part should be deleted since now they should be expired const deletedFiles = await store.deleteExpired() diff --git a/yarn.lock b/yarn.lock index d4f16063..fd8bb4bc 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1500,7 +1500,6 @@ __metadata: eslint: ^8.48.0 eslint-config-custom: "workspace:*" mocha: ^10.2.0 - p-queue: ^6.6.2 should: ^13.2.3 typescript: ^5.2.2 peerDependencies: @@ -1719,6 +1718,15 @@ __metadata: languageName: node linkType: hard +"@types/throttle@npm:^1.0.4": + version: 1.0.4 + resolution: "@types/throttle@npm:1.0.4" + 
dependencies: + "@types/node": "*" + checksum: 54eb7238bace356775275687e3f6725bd0a114c5a4a2f39f660dd99ced68b3dda224b403a8dc0741dbdc30eacda24edda0a22121c7a9114b89073c643eb3249b + languageName: node + linkType: hard + "@typescript-eslint/eslint-plugin@npm:^5.62.0": version: 5.62.0 resolution: "@typescript-eslint/eslint-plugin@npm:5.62.0" @@ -2101,7 +2109,7 @@ __metadata: languageName: node linkType: hard -"base64-js@npm:^1.3.0": +"base64-js@npm:^1.3.0, base64-js@npm:^1.3.1": version: 1.5.1 resolution: "base64-js@npm:1.5.1" checksum: 669632eb3745404c2f822a18fc3a0122d2f9a7a13f7fb8b5823ee19d1d2ff9ee5b52c53367176ea4ad093c332fd5ab4bd0ebae5a8e27917a4105a4cfc86b1005 @@ -2178,6 +2186,16 @@ __metadata: languageName: node linkType: hard +"buffer@npm:^6.0.3": + version: 6.0.3 + resolution: "buffer@npm:6.0.3" + dependencies: + base64-js: ^1.3.1 + ieee754: ^1.2.1 + checksum: 5ad23293d9a731e4318e420025800b42bf0d264004c0286c8cc010af7a270c7a0f6522e84f54b9ad65cbd6db20b8badbfd8d2ebf4f80fa03dab093b89e68c3f9 + languageName: node + linkType: hard + "cacache@npm:^17.0.0": version: 17.1.3 resolution: "cacache@npm:17.1.3" @@ -2410,6 +2428,15 @@ __metadata: languageName: node linkType: hard +"debug@npm:2": + version: 2.6.9 + resolution: "debug@npm:2.6.9" + dependencies: + ms: 2.0.0 + checksum: d2f51589ca66df60bf36e1fa6e4386b318c3f1e06772280eea5b1ae9fd3d05e9c2b7fd8a7d862457d00853c75b00451aa2d7459b924629ee385287a650f58fe6 + languageName: node + linkType: hard + "debug@npm:4, debug@npm:4.3.4, debug@npm:^4.1.0, debug@npm:^4.1.1, debug@npm:^4.3.2, debug@npm:^4.3.3, debug@npm:^4.3.4": version: 4.3.4 resolution: "debug@npm:4.3.4" @@ -2830,10 +2857,10 @@ __metadata: languageName: node linkType: hard -"eventemitter3@npm:^4.0.4": - version: 4.0.7 - resolution: "eventemitter3@npm:4.0.7" - checksum: 1875311c42fcfe9c707b2712c32664a245629b42bb0a5a84439762dd0fd637fc54d078155ea83c2af9e0323c9ac13687e03cfba79b03af9f40c89b4960099374 +"events@npm:^3.3.0": + version: 3.3.0 + resolution: "events@npm:3.3.0" + 
checksum: f6f487ad2198aa41d878fa31452f1a3c00958f46e9019286ff4787c84aac329332ab45c9cdc8c445928fc6d7ded294b9e005a7fce9426488518017831b272780 languageName: node linkType: hard @@ -3371,6 +3398,13 @@ __metadata: languageName: node linkType: hard +"ieee754@npm:^1.2.1": + version: 1.2.1 + resolution: "ieee754@npm:1.2.1" + checksum: 5144c0c9815e54ada181d80a0b810221a253562422e7c6c3a60b1901154184f49326ec239d618c416c1c5945a2e197107aee8d986a3dd836b53dffefd99b5e7e + languageName: node + linkType: hard + "ignore@npm:^5.2.0": version: 5.2.4 resolution: "ignore@npm:5.2.4" @@ -3991,6 +4025,13 @@ __metadata: languageName: node linkType: hard +"ms@npm:2.0.0": + version: 2.0.0 + resolution: "ms@npm:2.0.0" + checksum: 0e6a22b8b746d2e0b65a430519934fefd41b6db0682e3477c10f60c76e947c4c0ad06f63ffdf1d78d335f83edee8c0aa928aa66a36c7cd95b69b26f468d527f4 + languageName: node + linkType: hard + "ms@npm:2.1.2": version: 2.1.2 resolution: "ms@npm:2.1.2" @@ -4168,13 +4209,6 @@ __metadata: languageName: node linkType: hard -"p-finally@npm:^1.0.0": - version: 1.0.0 - resolution: "p-finally@npm:1.0.0" - checksum: 93a654c53dc805dd5b5891bab16eb0ea46db8f66c4bfd99336ae929323b1af2b70a8b0654f8f1eae924b2b73d037031366d645f1fd18b3d30cbd15950cc4b1d4 - languageName: node - linkType: hard - "p-limit@npm:^3.0.1, p-limit@npm:^3.0.2": version: 3.1.0 resolution: "p-limit@npm:3.1.0" @@ -4202,25 +4236,6 @@ __metadata: languageName: node linkType: hard -"p-queue@npm:^6.6.2": - version: 6.6.2 - resolution: "p-queue@npm:6.6.2" - dependencies: - eventemitter3: ^4.0.4 - p-timeout: ^3.2.0 - checksum: 832642fcc4ab6477b43e6d7c30209ab10952969ed211c6d6f2931be8a4f9935e3578c72e8cce053dc34f2eb6941a408a2c516a54904e989851a1a209cf19761c - languageName: node - linkType: hard - -"p-timeout@npm:^3.2.0": - version: 3.2.0 - resolution: "p-timeout@npm:3.2.0" - dependencies: - p-finally: ^1.0.0 - checksum: 3dd0eaa048780a6f23e5855df3dd45c7beacff1f820476c1d0d1bcd6648e3298752ba2c877aa1c92f6453c7dd23faaf13d9f5149fc14c0598a142e2c5e8d649c - 
languageName: node - linkType: hard - "parent-module@npm:^1.0.0": version: 1.0.1 resolution: "parent-module@npm:1.0.1" @@ -4316,6 +4331,13 @@ __metadata: languageName: node linkType: hard +"process@npm:^0.11.10": + version: 0.11.10 + resolution: "process@npm:0.11.10" + checksum: bfcce49814f7d172a6e6a14d5fa3ac92cc3d0c3b9feb1279774708a719e19acd673995226351a082a9ae99978254e320ccda4240ddc474ba31a76c79491ca7c3 + languageName: node + linkType: hard + "promise-retry@npm:^2.0.1": version: 2.0.1 resolution: "promise-retry@npm:2.0.1" @@ -4382,6 +4404,19 @@ __metadata: languageName: node linkType: hard +"readable-stream@npm:>= 0.3.0": + version: 4.4.2 + resolution: "readable-stream@npm:4.4.2" + dependencies: + abort-controller: ^3.0.0 + buffer: ^6.0.3 + events: ^3.3.0 + process: ^0.11.10 + string_decoder: ^1.3.0 + checksum: 6f4063763dbdb52658d22d3f49ca976420e1fbe16bbd241f744383715845350b196a2f08b8d6330f8e219153dff34b140aeefd6296da828e1041a7eab1f20d5e + languageName: node + linkType: hard + "readable-stream@npm:^3.1.1, readable-stream@npm:^3.6.0": version: 3.6.2 resolution: "readable-stream@npm:3.6.2" @@ -4698,6 +4733,15 @@ __metadata: languageName: node linkType: hard +"stream-parser@npm:>= 0.0.2": + version: 0.3.1 + resolution: "stream-parser@npm:0.3.1" + dependencies: + debug: 2 + checksum: 4d86ff8cffe7c7587dc91433fff9dce38a93ea7e9f47560055addc81eae6b6befab22b75643ce539faf325fe2b17d371778242566bed086e75f6cffb1e76c06c + languageName: node + linkType: hard + "stream-shift@npm:^1.0.0": version: 1.0.1 resolution: "stream-shift@npm:1.0.1" @@ -4727,7 +4771,7 @@ __metadata: languageName: node linkType: hard -"string_decoder@npm:^1.1.1": +"string_decoder@npm:^1.1.1, string_decoder@npm:^1.3.0": version: 1.3.0 resolution: "string_decoder@npm:1.3.0" dependencies: @@ -4861,11 +4905,13 @@ __metadata: "@types/rimraf": ^3.0.2 "@types/sinon": ^10.0.16 "@types/supertest": ^2.0.12 + "@types/throttle": ^1.0.4 mocha: ^10.2.0 rimraf: ^3.0.2 should: ^13.2.3 sinon: ^15.2.0 supertest: ^6.3.3 + 
throttle: ^1.0.3 ts-node: ^10.9.1 tsconfig: "*" typescript: ^5.2.2 @@ -4879,6 +4925,16 @@ __metadata: languageName: node linkType: hard +"throttle@npm:^1.0.3": + version: 1.0.3 + resolution: "throttle@npm:1.0.3" + dependencies: + readable-stream: ">= 0.3.0" + stream-parser: ">= 0.0.2" + checksum: acbda5a239b6a6453bdd0d98018123fbed04606b3cfacb189713c2e710a167b42e67e00bb05927891c47fc5d90fb8f80eff811a1643393b08bc03491576d3139 + languageName: node + linkType: hard + "to-regex-range@npm:^5.0.1": version: 5.0.1 resolution: "to-regex-range@npm:5.0.1"