Skip to content

Commit

Permalink
feat: distributed locks
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Nov 24, 2023
1 parent 612ac24 commit a4edaf0
Show file tree
Hide file tree
Showing 14 changed files with 451 additions and 115 deletions.
277 changes: 178 additions & 99 deletions packages/s3-store/index.ts

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions packages/server/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ export const ERRORS = {
status_code: 403,
body: 'Upload-Offset header required\n',
},
ABORTED: {
status_code: 409,
body: 'Request aborted due to lock acquired',
},
ERR_LOCK_TIMEOUT: {
status_code: 500,
body: 'failed to acquire lock before timeout',
},
INVALID_CONTENT_TYPE: {
status_code: 403,
body: 'Content-Type header required\n',
Expand Down
29 changes: 29 additions & 0 deletions packages/server/src/handlers/BaseHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import EventEmitter from 'node:events'
import type {ServerOptions} from '../types'
import type {DataStore} from '../models'
import type http from 'node:http'
import {ERRORS} from '../constants'

const reExtractFileID = /([^/]+)\/?$/
const reForwardedHost = /host="?([^";]+)/
Expand All @@ -27,6 +28,7 @@ export class BaseHandler extends EventEmitter {
// @ts-expect-error not explicitly typed but possible
headers['Content-Length'] = Buffer.byteLength(body, 'utf8')
}

res.writeHead(status, headers)
res.write(body)
return res.end()
Expand Down Expand Up @@ -78,4 +80,31 @@ export class BaseHandler extends EventEmitter {

return decodeURIComponent(match[1])
}

getLocker(req: http.IncomingMessage) {
if (typeof this.options.locker === 'function') {
return this.options.locker(req)
}
return this.options.locker
}

async lock<T>(
req: http.IncomingMessage,
id: string,
fn: (signal: AbortSignal) => Promise<T>
) {
const abortController = new AbortController()
const locker = this.getLocker(req)
await locker?.lock(id, () => {
if (!abortController.signal.aborted) {
abortController.abort(ERRORS.ABORTED)
}
})

try {
return await fn(abortController.signal)
} finally {
await locker?.unlock(id)
}
}
}
5 changes: 4 additions & 1 deletion packages/server/src/handlers/DeleteHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ export class DeleteHandler extends BaseHandler {
throw ERRORS.FILE_NOT_FOUND
}

await this.store.remove(id)
await this.lock(req, id, (signal) => {
return this.store.remove(id, {signal})
})

const writtenRes = this.write(res, 204, {})
this.emit(EVENTS.POST_TERMINATE, req, writtenRes, id)
return writtenRes
Expand Down
1 change: 1 addition & 0 deletions packages/server/src/handlers/GetHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export class GetHandler extends BaseHandler {
}

const stats = await this.store.getUpload(id)

if (!stats || stats.offset !== stats.size) {
throw ERRORS.FILE_NOT_FOUND
}
Expand Down
4 changes: 3 additions & 1 deletion packages/server/src/handlers/HeadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ export class HeadHandler extends BaseHandler {
throw ERRORS.FILE_NOT_FOUND
}

const file = await this.store.getUpload(id)
const file = await this.lock(req, id, (signal) => {
return this.store.getUpload(id, {signal})
})

// If a Client does attempt to resume an upload which has since
// been removed by the Server, the Server SHOULD respond with the
Expand Down
13 changes: 10 additions & 3 deletions packages/server/src/handlers/PatchHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ export class PatchHandler extends BaseHandler {
throw ERRORS.INVALID_CONTENT_TYPE
}

const upload = await this.store.getUpload(id)
const upload = await this.lock(req, id, (signal) => {
return this.store.getUpload(id, {signal: signal})
})

// If a Client does attempt to resume an upload which has since
// been removed by the Server, the Server SHOULD respond with the
Expand Down Expand Up @@ -73,11 +75,16 @@ export class PatchHandler extends BaseHandler {
throw ERRORS.INVALID_LENGTH
}

await this.store.declareUploadLength(id, size)
await this.lock(req, id, (signal) => {
return this.store.declareUploadLength(id, size, {signal: signal})
})
upload.size = size
}

const newOffset = await this.store.write(req, id, offset)
const newOffset = await this.lock(req, id, (signal) => {
return this.store.write(req, id, offset, {signal: signal})
})

upload.offset = newOffset
this.emit(EVENTS.POST_RECEIVE, req, res, upload)
if (newOffset === upload.size && this.options.onUploadFinish) {
Expand Down
11 changes: 8 additions & 3 deletions packages/server/src/handlers/PostHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ export class PostHandler extends BaseHandler {

this.emit(EVENTS.POST_CREATE, req, res, upload, url)

let newOffset
let isFinal = upload.size === 0 && !upload.sizeIsDeferred
const headers: {
'Upload-Offset'?: string
Expand All @@ -98,7 +97,10 @@ export class PostHandler extends BaseHandler {

// The request MIGHT include a Content-Type header when using creation-with-upload extension
if (validateHeader('content-type', req.headers['content-type'])) {
newOffset = await this.store.write(req, upload.id, 0)
const newOffset = await this.lock(req, id, async (signal) => {
return this.store.write(req, upload.id, 0, {signal: signal})
})

headers['Upload-Offset'] = newOffset.toString()
isFinal = newOffset === Number.parseInt(upload_length as string, 10)
upload.offset = newOffset
Expand All @@ -119,7 +121,10 @@ export class PostHandler extends BaseHandler {
this.store.getExpiration() > 0 &&
upload.creation_date
) {
const created = await this.store.getUpload(upload.id)
const created = await this.lock(req, id, async (signal) => {
return this.store.getUpload(upload.id, {signal: signal})
})

if (created.offset !== Number.parseInt(upload_length as string, 10)) {
const creation = new Date(upload.creation_date)
// Value MUST be in RFC 7231 datetime format
Expand Down
21 changes: 15 additions & 6 deletions packages/server/src/models/DataStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import {Upload} from './Upload'
import type stream from 'node:stream'
import type http from 'node:http'

export interface DataStoreDefaultOptions {
signal?: AbortSignal
}

export class DataStore extends EventEmitter {
extensions: string[] = []

Expand All @@ -19,15 +23,15 @@ export class DataStore extends EventEmitter {
*
* http://tus.io/protocols/resumable-upload.html#creation
*/
async create(file: Upload) {
async create(file: Upload, options?: DataStoreDefaultOptions) {
return file
}

/**
* Called in DELETE requests. This method just deletes the file from the store.
* http://tus.io/protocols/resumable-upload.html#termination
*/
async remove(id: string) {}
async remove(id: string, options?: DataStoreDefaultOptions) {}

/**
* Called in PATCH requests. This method should write data
Expand All @@ -39,7 +43,8 @@ export class DataStore extends EventEmitter {
async write(
stream: http.IncomingMessage | stream.Readable,
id: string,
offset: number
offset: number,
options?: DataStoreDefaultOptions
) {
return 0
}
Expand All @@ -49,19 +54,23 @@ export class DataStore extends EventEmitter {
* writen to the DataStore, for the client to know where to resume
* the upload.
*/
async getUpload(id: string): Promise<Upload> {
async getUpload(id: string, options?: DataStoreDefaultOptions): Promise<Upload> {
return new Upload({id, size: 0, offset: 0})
}

/**
* Called in PATCH requests when upload length is known after being defered.
*/
async declareUploadLength(id: string, upload_length: number) {}
async declareUploadLength(
id: string,
upload_length: number,
options?: DataStoreDefaultOptions
) {}

/**
* Returns number of expired uploads that were deleted.
*/
async deleteExpired(): Promise<number> {
async deleteExpired(options?: DataStoreDefaultOptions): Promise<number> {
return 0
}

Expand Down
101 changes: 101 additions & 0 deletions packages/server/src/models/Locker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import {ERRORS} from '../constants'

export type RequestRelease = () => Promise<void> | void

export interface Locker {
lock(id: string, cancelReq: RequestRelease): Promise<void>
unlock(id: string): Promise<void>
}

export interface MemoryLockerOptions {
acquireLockTimeout: number
}

class Lock {
public locked = false
public requestRelease?: RequestRelease
}

export class MemoryLocker implements Locker {
private locks = new Map<string, Lock>()
protected timeout: number

constructor(options?: MemoryLockerOptions) {
this.timeout = options?.acquireLockTimeout ?? 1000 * 30
}

private getLock(id: string): Lock {
let lock = this.locks.get(id)
if (lock === undefined) {
lock = new Lock()
this.locks.set(id, lock)
}
return lock
}

async lock(id: string, requestRelease: RequestRelease): Promise<void> {
const abortController = new AbortController()
const lock = await Promise.race([
this.waitTimeout(abortController.signal),
this.acquireLock(id, abortController.signal),
])

abortController.abort()

if (!lock) {
throw ERRORS.ERR_LOCK_TIMEOUT
}
lock.requestRelease = requestRelease
}

protected async acquireLock(id: string, signal: AbortSignal): Promise<Lock | void> {
if (signal.aborted) {
return
}

const lock = this.getLock(id)

if (!lock.locked) {
lock.locked = true
return lock
}

await lock.requestRelease?.()

const potentialNewLock = this.getLock(id)
if (!potentialNewLock.locked) {
potentialNewLock.locked = true
return potentialNewLock
}

return await new Promise((resolve, reject) => {
setImmediate(() => {
this.acquireLock(id, signal).then(resolve).catch(reject)
})
})
}

async unlock(id: string): Promise<void> {
const lock = this.getLock(id)
if (!lock.locked) {
throw new Error('Releasing an unlocked lock!')
}

this.locks.delete(id)
}

protected waitTimeout(signal: AbortSignal) {
return new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
resolve()
}, this.timeout)

const abortListener = () => {
clearTimeout(timeout)
signal.removeEventListener('abort', abortListener)
resolve()
}
signal.addEventListener('abort', abortListener)
})
}
}
1 change: 1 addition & 0 deletions packages/server/src/models/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * as Metadata from './Metadata'
export {StreamSplitter} from './StreamSplitter'
export {Uid} from './Uid'
export {Upload} from './Upload'
export {Locker, MemoryLocker} from './Locker'
14 changes: 13 additions & 1 deletion packages/server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,19 @@ export class Server extends EventEmitter {
const onError = (error: {status_code?: number; body?: string; message: string}) => {
const status_code = error.status_code || ERRORS.UNKNOWN_ERROR.status_code
const body = error.body || `${ERRORS.UNKNOWN_ERROR.body}${error.message || ''}\n`
return this.write(res, status_code, body)
const isAbortError =
status_code === ERRORS.ABORTED.status_code && error.body === ERRORS.ABORTED.body

if (isAbortError) {
res.setHeader('Connection', 'close')
}

const response = this.write(res, status_code, body)

if (isAbortError) {
req.destroy()
}
return response
}

try {
Expand Down
4 changes: 3 additions & 1 deletion packages/server/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type http from 'node:http'

import type {Upload} from './models'
import type {Locker, Upload} from './models'

export type ServerOptions = {
// The route to accept requests.
Expand All @@ -14,6 +14,8 @@ export type ServerOptions = {
// It is important to make these unique to prevent data loss. Only use it if you really need to.
// Default uses `crypto.randomBytes(16).toString('hex')`.
namingFunction?: (req: http.IncomingMessage) => string
// locker implementation to support distributed locks
locker?: Locker | ((req: http.IncomingMessage) => Locker)
// `onUploadCreate` will be invoked before a new upload is created.
// If the function returns the (modified) response, the upload will be created.
// If an error is thrown, the HTTP request will be aborted and the provided `body` and `status_code` (or their fallbacks)
Expand Down
Loading

0 comments on commit a4edaf0

Please sign in to comment.