Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@tus/server: add support for lockers #514

Merged
merged 15 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .yarnrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ plugins:
spec: "@yarnpkg/plugin-typescript"
- path: .yarn/plugins/@yarnpkg/plugin-interactive-tools.cjs
spec: "@yarnpkg/plugin-interactive-tools"

yarnPath: .yarn/releases/yarn-3.2.3.cjs
8 changes: 8 additions & 0 deletions packages/server/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ export const ERRORS = {
status_code: 403,
body: 'Upload-Offset header required\n',
},
ABORTED: {
status_code: 400,
body: 'Request aborted due to lock acquired',
},
ERR_LOCK_TIMEOUT: {
status_code: 500,
body: 'failed to acquire lock before timeout',
},
INVALID_CONTENT_TYPE: {
status_code: 403,
body: 'Content-Type header required\n',
Expand Down
66 changes: 65 additions & 1 deletion packages/server/src/handlers/BaseHandler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import EventEmitter from 'node:events'

import type {ServerOptions} from '../types'
import type {DataStore} from '../models'
import type {DataStore, CancellationContext} from '../models'
import type http from 'node:http'
import stream from 'node:stream'
import {ERRORS} from '../constants'

const reExtractFileID = /([^/]+)\/?$/
const reForwardedHost = /host="?([^";]+)/
Expand All @@ -27,6 +29,7 @@ export class BaseHandler extends EventEmitter {
// @ts-expect-error not explicitly typed but possible
headers['Content-Length'] = Buffer.byteLength(body, 'utf8')
}

res.writeHead(status, headers)
res.write(body)
return res.end()
Expand Down Expand Up @@ -101,4 +104,65 @@ export class BaseHandler extends EventEmitter {

return {host: host as string, proto}
}

protected getLocker(req: http.IncomingMessage) {
fenos marked this conversation as resolved.
Show resolved Hide resolved
if (typeof this.options.locker === 'function') {
return this.options.locker(req)
fenos marked this conversation as resolved.
Show resolved Hide resolved
}
return this.options.locker
}

protected async acquireLock(
req: http.IncomingMessage,
id: string,
context: CancellationContext
) {
const locker = await this.getLocker(req)

if (!locker) {
return
}

const lock = locker.newLock(id)

await lock.lock(() => {
context.cancel()
})

return lock
}

protected writeToStore(
req: http.IncomingMessage,
id: string,
offset: number,
context: CancellationContext
) {
return new Promise<number>(async (resolve, reject) => {
if (context.signal.aborted) {
reject(ERRORS.ABORTED)
return
}

const proxy = new stream.PassThrough()
stream.addAbortSignal(context.signal, proxy)

proxy.on('error', (err) => {
req.unpipe(proxy)
if (err.name === 'AbortError') {
reject(ERRORS.ABORTED)
} else {
reject(err)
}
})

req.on('error', (err) => {
if (!proxy.closed) {
proxy.destroy(err)
}
})

this.store.write(req.pipe(proxy), id, offset).then(resolve).catch(reject)
Acconut marked this conversation as resolved.
Show resolved Hide resolved
})
}
}
14 changes: 12 additions & 2 deletions packages/server/src/handlers/DeleteHandler.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import {BaseHandler} from './BaseHandler'
import {ERRORS, EVENTS} from '../constants'
import {CancellationContext} from '../models'

import type http from 'node:http'

export class DeleteHandler extends BaseHandler {
async send(req: http.IncomingMessage, res: http.ServerResponse) {
async send(
req: http.IncomingMessage,
res: http.ServerResponse,
context: CancellationContext
) {
const id = this.getFileIdFromRequest(req)
if (!id) {
throw ERRORS.FILE_NOT_FOUND
Expand All @@ -14,7 +19,12 @@ export class DeleteHandler extends BaseHandler {
await this.options.onIncomingRequest(req, res, id)
}

await this.store.remove(id)
const lock = await this.acquireLock(req, id, context)
try {
await this.store.remove(id)
} finally {
await lock?.unlock()
}
const writtenRes = this.write(res, 204, {})
this.emit(EVENTS.POST_TERMINATE, req, writtenRes, id)
return writtenRes
Expand Down
1 change: 1 addition & 0 deletions packages/server/src/handlers/GetHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export class GetHandler extends BaseHandler {
}

const stats = await this.store.getUpload(id)

if (!stats || stats.offset !== stats.size) {
throw ERRORS.FILE_NOT_FOUND
}
Expand Down
17 changes: 14 additions & 3 deletions packages/server/src/handlers/HeadHandler.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import {BaseHandler} from './BaseHandler'

import {ERRORS} from '../constants'
import {Metadata} from '../models'
import {Metadata, Upload, CancellationContext} from '../models'

import type http from 'node:http'

export class HeadHandler extends BaseHandler {
async send(req: http.IncomingMessage, res: http.ServerResponse) {
async send(
req: http.IncomingMessage,
res: http.ServerResponse,
context: CancellationContext
) {
const id = this.getFileIdFromRequest(req)
if (!id) {
throw ERRORS.FILE_NOT_FOUND
Expand All @@ -16,7 +20,14 @@ export class HeadHandler extends BaseHandler {
await this.options.onIncomingRequest(req, res, id)
}

const file = await this.store.getUpload(id)
const lock = await this.acquireLock(req, id, context)

let file: Upload
try {
file = await this.store.getUpload(id)
} finally {
await lock?.unlock()
}

// If a Client does attempt to resume an upload which has since
// been removed by the Server, the Server SHOULD respond with the
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/handlers/OptionsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ export class OptionsHandler extends BaseHandler {
res.setHeader('Tus-Extension', this.store.extensions.join(','))
}

return this.write(res, 204)
return this.write(res, 204, {})
}
}
Loading