Skip to content

Commit

Permalink
feat: distributed locks
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Nov 10, 2023
1 parent 612ac24 commit 6aa643a
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 18 deletions.
4 changes: 3 additions & 1 deletion packages/server/src/handlers/BaseHandler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import EventEmitter from 'node:events'

import type {ServerOptions} from '../types'
import type {DataStore} from '../models'
import type {DataStore, Locker} from '../models'
import type http from 'node:http'

const reExtractFileID = /([^/]+)\/?$/
Expand All @@ -11,6 +11,7 @@ const reForwardedProto = /proto=(https?)/
export class BaseHandler extends EventEmitter {
options: ServerOptions
store: DataStore
locker?: Locker

constructor(store: DataStore, options: ServerOptions) {
super()
Expand All @@ -20,6 +21,7 @@ export class BaseHandler extends EventEmitter {

this.store = store
this.options = options
this.locker = options.locker
}

write(res: http.ServerResponse, status: number, headers = {}, body = '') {
Expand Down
8 changes: 7 additions & 1 deletion packages/server/src/handlers/DeleteHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ export class DeleteHandler extends BaseHandler {
throw ERRORS.FILE_NOT_FOUND
}

await this.store.remove(id)
try {
await this.locker?.lock(id)
await this.store.remove(id)
} finally {
await this.locker?.unlock(id)
}

const writtenRes = this.write(res, 204, {})
this.emit(EVENTS.POST_TERMINATE, req, writtenRes, id)
return writtenRes
Expand Down
10 changes: 9 additions & 1 deletion packages/server/src/handlers/GetHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {ERRORS} from '../constants'

import type http from 'node:http'
import type {RouteHandler} from '../types'
import {Upload} from '../models'

export class GetHandler extends BaseHandler {
paths: Map<string, RouteHandler> = new Map()
Expand Down Expand Up @@ -35,7 +36,14 @@ export class GetHandler extends BaseHandler {
throw ERRORS.FILE_NOT_FOUND
}

const stats = await this.store.getUpload(id)
let stats: Upload
try {
await this.locker?.lock(id)
stats = await this.store.getUpload(id)
} finally {
await this.locker?.unlock(id)
}

if (!stats || stats.offset !== stats.size) {
throw ERRORS.FILE_NOT_FOUND
}
Expand Down
10 changes: 8 additions & 2 deletions packages/server/src/handlers/HeadHandler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {BaseHandler} from './BaseHandler'

import {ERRORS} from '../constants'
import {Metadata} from '../models'
import {Metadata, Upload} from '../models'

import type http from 'node:http'

Expand All @@ -12,7 +12,13 @@ export class HeadHandler extends BaseHandler {
throw ERRORS.FILE_NOT_FOUND
}

const file = await this.store.getUpload(id)
let file: Upload
try {
await this.locker?.lock(id)
file = await this.store.getUpload(id)
} finally {
await this.locker?.unlock(id)
}

// If a Client does attempt to resume an upload which has since
// been removed by the Server, the Server SHOULD respond with the
Expand Down
11 changes: 10 additions & 1 deletion packages/server/src/handlers/PatchHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ export class PatchHandler extends BaseHandler {
throw ERRORS.INVALID_CONTENT_TYPE
}

await this.locker?.lock(id)
const upload = await this.store.getUpload(id)
await this.locker?.unlock(id)

// If a Client does attempt to resume an upload which has since
// been removed by the Server, the Server SHOULD respond with the
Expand Down Expand Up @@ -77,7 +79,14 @@ export class PatchHandler extends BaseHandler {
upload.size = size
}

const newOffset = await this.store.write(req, id, offset)
let newOffset: number
try {
await this.locker?.lock(id)
newOffset = await this.store.write(req, id, offset)
} finally {
await this.locker?.unlock(id)
}

upload.offset = newOffset
this.emit(EVENTS.POST_RECEIVE, req, res, upload)
if (newOffset === upload.size && this.options.onUploadFinish) {
Expand Down
32 changes: 21 additions & 11 deletions packages/server/src/handlers/PostHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,15 @@ export class PostHandler extends BaseHandler {

// The request MIGHT include a Content-Type header when using creation-with-upload extension
if (validateHeader('content-type', req.headers['content-type'])) {
newOffset = await this.store.write(req, upload.id, 0)
headers['Upload-Offset'] = newOffset.toString()
isFinal = newOffset === Number.parseInt(upload_length as string, 10)
upload.offset = newOffset
try {
await this.locker?.lock(upload.id)
newOffset = await this.store.write(req, upload.id, 0)
headers['Upload-Offset'] = newOffset.toString()
isFinal = newOffset === Number.parseInt(upload_length as string, 10)
upload.offset = newOffset
} finally {
await this.locker?.unlock(upload.id)
}
}
if (isFinal && this.options.onUploadFinish) {
try {
Expand All @@ -119,13 +124,18 @@ export class PostHandler extends BaseHandler {
this.store.getExpiration() > 0 &&
upload.creation_date
) {
const created = await this.store.getUpload(upload.id)
if (created.offset !== Number.parseInt(upload_length as string, 10)) {
const creation = new Date(upload.creation_date)
// Value MUST be in RFC 7231 datetime format
headers['Upload-Expires'] = new Date(
creation.getTime() + this.store.getExpiration()
).toUTCString()
try {
await this.locker?.lock(upload.id)
const created = await this.store.getUpload(upload.id)
if (created.offset !== Number.parseInt(upload_length as string, 10)) {
const creation = new Date(upload.creation_date)
// Value MUST be in RFC 7231 datetime format
headers['Upload-Expires'] = new Date(
creation.getTime() + this.store.getExpiration()
).toUTCString()
}
} finally {
await this.locker?.unlock(upload.id)
}
}

Expand Down
34 changes: 34 additions & 0 deletions packages/server/src/models/Locker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import {EventEmitter} from 'node:events'

export interface Locker {
lock(id: string): Promise<void>
unlock(id: string): Promise<void>
}

export class MemoryLocker implements Locker {
private lockEvent = new EventEmitter()
private locks: Record<string, boolean> = {}

isLocked(id: string) {
return Boolean(this.locks[id])
}

lock(id: string): Promise<void> {
if (!this.isLocked(id)) {
this.locks[id] = true
return Promise.resolve()
}

return new Promise((resolve) => {
this.lockEvent.once(`unlock:${id}`, () => {
this.locks[id] = true
resolve()
})
})
}

async unlock(id: string): Promise<void> {
delete this.locks[id]
this.lockEvent.emit(`unlock:${id}`)
}
}
1 change: 1 addition & 0 deletions packages/server/src/models/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * as Metadata from './Metadata'
export {StreamSplitter} from './StreamSplitter'
export {Uid} from './Uid'
export {Upload} from './Upload'
export {Locker, MemoryLocker} from './Locker'
4 changes: 3 additions & 1 deletion packages/server/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type http from 'node:http'

import type {Upload} from './models'
import type {Locker, Upload} from './models'

export type ServerOptions = {
// The route to accept requests.
Expand All @@ -14,6 +14,8 @@ export type ServerOptions = {
// It is important to make these unique to prevent data loss. Only use it if you really need to.
// Default uses `crypto.randomBytes(16).toString('hex')`.
namingFunction?: (req: http.IncomingMessage) => string
// locker implementation to support distributed locks
locker?: Locker
// `onUploadCreate` will be invoked before a new upload is created.
// If the function returns the (modified) response, the upload will be created.
// If an error is thrown, the HTTP request will be aborted and the provided `body` and `status_code` (or their fallbacks)
Expand Down
19 changes: 19 additions & 0 deletions packages/server/test/Locker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import {MemoryLocker} from '../src/models/Locker'
import * as assert from 'assert'

describe('MemoryLocker', () => {
it('will hold the lock for subsequent calls until released', async () => {
const locker = new MemoryLocker()
const lockId = 'upload-id-1'

const date = new Date()
await locker.lock(lockId)
setTimeout(() => {
locker.unlock(lockId)
}, 300)
await locker.lock(lockId) // will wait until the other lock is released
await locker.unlock(lockId)
const endDate = new Date().valueOf() - date.valueOf()
assert.equal(endDate >= 300, true)
})
})

0 comments on commit 6aa643a

Please sign in to comment.