Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@tus/server: add GCS locker #616

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
415 changes: 247 additions & 168 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions packages/gcs-store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"debug": "^4.3.4"
},
"devDependencies": {
"@google-cloud/storage": "^6.12.0",
"@google-cloud/storage": "^7.12.0",
"@tus/server": "^1.7.0",
"@types/debug": "^4.1.12",
"@types/mocha": "^10.0.6",
Expand All @@ -35,7 +35,7 @@
"should": "^13.2.3"
},
"peerDependencies": {
"@google-cloud/storage": "*"
"@google-cloud/storage": "^7.12.0"
},
"engines": {
"node": ">=16"
Expand Down
34 changes: 22 additions & 12 deletions packages/gcs-store/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const log = debug('tus-node-server:stores:gcsstore')

type Options = {bucket: Bucket}

export {GCSLocker} from './locker/GCSLocker'

export class GCSStore extends DataStore {
bucket: Bucket

Expand Down Expand Up @@ -39,11 +41,7 @@ export class GCSStore extends DataStore {
metadata: {
metadata: {
tus_version: TUS_RESUMABLE,
size: file.size,
sizeIsDeferred: `${file.sizeIsDeferred}`,
offset: file.offset,
metadata: JSON.stringify(file.metadata),
storage: JSON.stringify(file.storage),
...this.#stringifyUploadKeys(file),
},
},
}
Expand Down Expand Up @@ -77,15 +75,14 @@ export class GCSStore extends DataStore {
return new Promise((resolve, reject) => {
const file = this.bucket.file(id)
const destination = upload.offset === 0 ? file : this.bucket.file(`${id}_patch`)

upload.offset = offset

const options = {
metadata: {
metadata: {
tus_version: TUS_RESUMABLE,
size: upload.size,
sizeIsDeferred: `${upload.sizeIsDeferred}`,
offset,
metadata: JSON.stringify(upload.metadata),
storage: JSON.stringify(upload.storage),
...this.#stringifyUploadKeys(upload),
},
},
}
Expand Down Expand Up @@ -151,7 +148,7 @@ export class GCSStore extends DataStore {
return resolve(
new Upload({
id,
size: size ? Number.parseInt(size, 10) : size,
size: size ? Number.parseInt(size, 10) : undefined,
offset: Number.parseInt(metadata.size, 10), // `size` is set by GCS
metadata: meta ? JSON.parse(meta) : undefined,
storage: {type: 'gcs', path: id, bucket: this.bucket.name},
Expand All @@ -166,6 +163,19 @@ export class GCSStore extends DataStore {

upload.size = upload_length

await this.bucket.file(id).setMetadata({metadata: upload})
await this.bucket.file(id).setMetadata({metadata: this.#stringifyUploadKeys(upload)})
}

/**
* Convert the Upload object to a format that can be stored in GCS metadata.
*/
#stringifyUploadKeys(upload: Upload) {
return {
size: upload.size ?? null,
sizeIsDeferred: `${upload.sizeIsDeferred}`,
offset: upload.offset,
metadata: JSON.stringify(upload.metadata),
storage: JSON.stringify(upload.storage),
}
}
}
118 changes: 118 additions & 0 deletions packages/gcs-store/src/locker/GCSLock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import {RequestRelease} from '@tus/utils'
import {Bucket} from '@google-cloud/storage'
import GCSLockFile, {GCSLockFileMetadata} from './GCSLockFile'

/**
* Handles interaction with a lock.
*/
export default class GCSLock {
protected resourceId: string
protected file: GCSLockFile
protected ttl: number
protected watchInterval: number
protected watcher: NodeJS.Timeout | undefined

constructor(
resourceId: string,
lockBucket: Bucket,
ttl: number,
watchInterval: number
) {
this.resourceId = resourceId
this.file = new GCSLockFile(lockBucket, `${resourceId}.lock`)
this.ttl = ttl
this.watchInterval = watchInterval
}

/**
* Try to create the lockfile and start the watcher. If lock is already taken, requests for release and returns FALSE.
*/
public async take(cancelHandler: RequestRelease): Promise<boolean> {
try {
//Try to create lock file
const exp = Date.now() + this.ttl
await this.file.create(exp)

//Lock acquired, start watcher
this.startWatcher(cancelHandler)

return true
} catch (err) {
//Probably lock is already taken
const isHealthy = await this.insureHealth()

if (!isHealthy) {
//Lock is not healthy, restart the process
return await this.take(cancelHandler)
} else {
//Lock is still healthy, request release
await this.file.requestRelease()

return false
}
}
}

/**
* Release the lock - clear watcher and delete the file.
*/
public async release() {
//Clear watcher
clearInterval(this.watcher)

//Delete the lock file
this.file.deleteOwn()
}

/**
* Check if the lock is healthy, delete if not.
* Returns TRUE if the lock is healthy.
*/
protected async insureHealth() {
try {
const meta = await this.file.getMeta()

if (this.hasExpired(meta)) {
//TTL expired, delete unhealthy lock
await this.file.deleteUnhealthy(meta.metageneration as number)

return false
}
} catch (err) {
//Probably lock does not exist (anymore)
return false
}

return true
}

/**
* Start watching the lock file - keep it healthy and handle release requests.
*/
protected startWatcher(cancelHandler: RequestRelease) {
this.watcher = setInterval(() => {
const handleError = () => {
//Probably the watched lock is freed, terminate watcher
clearInterval(this.watcher)
}

this.file.checkOwnReleaseRequest().then((shouldRelease) => {
if (shouldRelease) {
cancelHandler()
}

//Update TTL to keep the lock healthy
const exp = Date.now() + this.ttl
this.file.refreshOwn(exp).catch(handleError)
}, handleError)
}, this.watchInterval)
}

/**
* Compare lock expiration timestamp with the current time.
*/
protected hasExpired(meta: GCSLockFileMetadata) {
const expDate = Date.parse(meta.exp + '')
return !expDate || expDate < Date.now()
}
}
128 changes: 128 additions & 0 deletions packages/gcs-store/src/locker/GCSLockFile.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import {Bucket, File} from '@google-cloud/storage'
import type {FileMetadata} from '@google-cloud/storage'

export type GCSLockFileMetadata = FileMetadata & {
/**
* The lock file expires at this time (in ms) if its not refreshed.
*/
exp: number
}

type MetaGeneration = string | number | undefined

/**
* Handles communication with GCS.
*/
export default class GCSLockFile {
/**
* Name of the file in the bucket.
*/
protected name: string
/**
* GCS File instance for the lock.
*/
protected lockFile: File
/**
* GCS File instance for release request.
*/
protected releaseFile: File
/**
* The last known metageneration of the file. If it does not match the GCS metageneration, this lockfile has been deleted and another instance has already created a new one.
*/
protected currentMetaGeneration: MetaGeneration

constructor(bucket: Bucket, name: string) {
this.name = name
this.lockFile = bucket.file(name)
this.releaseFile = bucket.file(name + '.release')
}
/**
* Create the lockfile with the specified exp time. Throws if the file already exists
*/
public async create(exp: number) {
const metadata: GCSLockFileMetadata = {
exp,
cacheControl: 'no-store',
}

await this.lockFile.save('', {
preconditionOpts: {ifGenerationMatch: 0},
metadata,
})
this.currentMetaGeneration = 0
}

/**
* Fetch metadata of the lock file.
*/
public async getMeta() {
return (await this.lockFile.getMetadata())[0] as GCSLockFileMetadata
}

/**
* Refresh our own lockfile. Throws if it does not exist or the file is modified by another instance.
*/
public async refreshOwn(exp: number) {
const metadata: GCSLockFileMetadata = {
exp,
}
const res = await this.lockFile.setMetadata(metadata, {
ifMetaGenerationMatch: this.currentMetaGeneration,
})
this.currentMetaGeneration = res[0].metageneration
}
/**
* Check if a release request has been submitted to our own lockfile. Throws if it does not exist or the file is modified by another instance.
*/
public async checkOwnReleaseRequest() {
const meta = await this.getMeta()
if (meta.metageneration !== this.currentMetaGeneration) {
throw new Error('This lockfile has been already taken by another instance.')
}

const releaseRequestExists = (await this.releaseFile.exists())[0]
return releaseRequestExists
}

/**
* Delete our own lockfile if it still exists.
*/
public async deleteOwn() {
try {
await this.deleteReleaseRequest()
await this.lockFile.delete({ifGenerationMatch: this.currentMetaGeneration})
} catch (err) {
//Probably already deleted, no need to report
}
}

/**
* Request releasing the lock from another instance. As metadata edits are only prohibited for the owner (so it can keep track of metageneration), we write to a separate file.
*/
public async requestRelease() {
try {
await this.releaseFile.save('', {
preconditionOpts: {ifGenerationMatch: 0},
})
} catch (err) {
//Release file already created, no need to report
}
}

/**
* Delete the unhealthy file of a previous lock.
*/
public async deleteUnhealthy(metaGeneration: number) {
await this.deleteReleaseRequest()
await this.lockFile.delete({ifMetagenerationMatch: metaGeneration})
}

/**
* Delete release request file of the lock if exists.
*/
protected async deleteReleaseRequest() {
try {
await this.releaseFile.delete()
} catch (err) {}
}
}
Loading
Loading