Skip to content

Commit

Permalink
chore: add backpressure + download incomplete part first
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jan 22, 2024
1 parent b4fb15a commit 6235da5
Show file tree
Hide file tree
Showing 8 changed files with 489 additions and 108 deletions.
250 changes: 170 additions & 80 deletions packages/s3-store/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import os from 'node:os'
import fs, {promises as fsProm} from 'node:fs'
import stream, {promises as streamProm} from 'node:stream'
import stream, {PassThrough, promises as streamProm} from 'node:stream'
import type {Readable} from 'node:stream'
import http from 'node:http'

import AWS, {NoSuchKey, NotFound, S3, S3ClientConfig} from '@aws-sdk/client-s3'
import debug from 'debug'
Expand All @@ -15,7 +14,11 @@ import {
TUS_RESUMABLE,
KvStore,
MemoryKvStore,
Semaphore,
Permit,
} from '@tus/server'
import crypto from 'node:crypto'
import path from 'node:path'

const log = debug('tus-node-server:stores:s3store')

Expand All @@ -25,6 +28,7 @@ type Options = {
// but may increase it to not exceed the S3 10K parts limit.
partSize?: number
useTags?: boolean
maxConcurrentPartUploads?: number
cache?: KvStore<MetadataValue>
expirationPeriodInMilliseconds?: number
// Options to pass to the AWS S3 SDK.
Expand All @@ -42,6 +46,45 @@ function calcOffsetFromParts(parts?: Array<AWS.Part>) {
return parts && parts.length > 0 ? parts.reduce((a, b) => a + b.Size, 0) : 0
}

function concatenateStreams(streams: Readable[]): Readable {
const passThrough = new PassThrough()
let currentStreamIndex = 0
let errorOccurred = false

const pipeNextStream = (): void => {
if (errorOccurred || currentStreamIndex >= streams.length) {
passThrough.end()
return
}

const stream = streams[currentStreamIndex]
stream.on('error', (err: Error) => {
if (!errorOccurred) {
errorOccurred = true
passThrough.emit('error', err)
passThrough.end()

// notify the other subsequent streams that an error has occurred
streams.slice(currentStreamIndex + 1).forEach((s) => {
if (typeof s.destroy === 'function') {
s.destroy(err)
}
})
}
})

stream.pipe(passThrough, {end: false})
stream.on('end', () => {
currentStreamIndex++
pipeNextStream()
})
}

pipeNextStream()

return passThrough
}

// Implementation (based on https://github.com/tus/tusd/blob/master/s3store/s3store.go)
//
// Once a new tus upload is initiated, multiple objects in S3 are created:
Expand Down Expand Up @@ -82,6 +125,7 @@ export class S3Store extends DataStore {
private preferredPartSize: number
private expirationPeriodInMilliseconds = 0
private useTags = true
private semaphore: Semaphore
public maxMultipartParts = 10_000 as const
public minPartSize = 5_242_880 as const // 5MiB
public maxUploadSize = 5_497_558_138_880 as const // 5TiB
Expand All @@ -101,8 +145,9 @@ export class S3Store extends DataStore {
this.preferredPartSize = partSize || 8 * 1024 * 1024
this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0
this.useTags = options.useTags ?? true
this.client = new S3(restS3ClientConfig)
this.cache = options.cache ?? new MemoryKvStore<MetadataValue>()
this.client = new S3(restS3ClientConfig)
this.semaphore = new Semaphore(options.maxConcurrentPartUploads ?? 30)
}

protected shouldUseExpirationTags() {
Expand Down Expand Up @@ -233,6 +278,61 @@ export class S3Store extends DataStore {
return data.ETag as string
}

private async downloadIncompletePart(id: string) {
const incompletePart = await this.getIncompletePart(id)

if (!incompletePart) {
return
}
const filePath = await this.uniqueTmpFileName('tus-s3-incomplete-part-')

try {
let incompletePartSize = 0

const byteCounterTransform = new stream.Transform({
transform(chunk, _, callback) {
incompletePartSize += chunk.length
callback(null, chunk)
},
})

// write to temporary file
await streamProm.pipeline(
incompletePart,
byteCounterTransform,
fs.createWriteStream(filePath)
)

const createReadStream = (options: {cleanUpOnEnd: boolean}) => {
const fileReader = fs.createReadStream(filePath)

if (options.cleanUpOnEnd) {
fileReader.on('end', () => {
fs.unlink(filePath, () => {
// ignore
})
})

fileReader.on('error', (err) => {
fileReader.destroy(err)
fs.unlink(filePath, () => {
// ignore
})
})
}

return fileReader
}

return {size: incompletePartSize, path: filePath, createReader: createReadStream}
} catch (err) {
fsProm.rm(filePath).catch(() => {
/* ignore */
})
throw err
}
}

private async getIncompletePart(id: string): Promise<Readable | undefined> {
try {
const data = await this.client.getObject({
Expand Down Expand Up @@ -271,102 +371,58 @@ export class S3Store extends DataStore {
})
}

private async prependIncompletePart(
newChunkPath: string,
previousIncompletePart: Readable
): Promise<number> {
const tempPath = `${newChunkPath}-prepend`
try {
let incompletePartSize = 0

const byteCounterTransform = new stream.Transform({
transform(chunk, _, callback) {
incompletePartSize += chunk.length
callback(null, chunk)
},
})

// write to temporary file, truncating if needed
await streamProm.pipeline(
previousIncompletePart,
byteCounterTransform,
fs.createWriteStream(tempPath)
)
// append to temporary file
await streamProm.pipeline(
fs.createReadStream(newChunkPath),
fs.createWriteStream(tempPath, {flags: 'a'})
)
// overwrite existing file
await fsProm.rename(tempPath, newChunkPath)

return incompletePartSize
} catch (err) {
fsProm.rm(tempPath).catch(() => {
/* ignore */
})
throw err
}
}

/**
* Uploads a stream to s3 using multiple parts
*/
private async processUpload(
private async uploadParts(
metadata: MetadataValue,
readStream: http.IncomingMessage | fs.ReadStream,
readStream: stream.Readable,
currentPartNumber: number,
offset: number
): Promise<number> {
const size = metadata.file.size
const promises: Promise<void>[] = []
let pendingChunkFilepath: string | null = null
let bytesUploaded = 0
let currentChunkNumber = 0
let permit: Permit | undefined = undefined

const splitterStream = new StreamSplitter({
chunkSize: this.calcOptimalPartSize(size),
directory: os.tmpdir(),
asyncEvents: true,
})
.on('chunkStarted', (filepath) => {
.on('beforeChunkStarted', (_, done) => {
this.semaphore
.acquire()
.catch(done)
.then((acquiredPermit) => {
permit = acquiredPermit
done()
})
})
.on('chunkStarted', (filepath, done) => {
pendingChunkFilepath = filepath
done()
})
.on('chunkFinished', ({path, size: partSize}) => {
.on('chunkFinished', ({path, size: partSize}, done) => {
pendingChunkFilepath = null

const partNumber = currentPartNumber++
const chunkNumber = currentChunkNumber++
const acquiredPermit = permit

offset += partSize

const isFirstChunk = chunkNumber === 0
const isFinalPart = size === offset

// eslint-disable-next-line no-async-promise-executor
const deferred = new Promise<void>(async (resolve, reject) => {
try {
let incompletePartSize = 0
// Only the first chunk of each PATCH request can prepend
// an incomplete part (last chunk) from the previous request.
if (isFirstChunk) {
// If we received a chunk under the minimum part size in a previous iteration,
// we used a regular S3 upload to save it in the bucket. We try to get the incomplete part here.

const incompletePart = await this.getIncompletePart(metadata.file.id)
if (incompletePart) {
// We found an incomplete part, prepend it to the chunk on disk we were about to upload,
// and delete the incomplete part from the bucket. This can be done in parallel.
incompletePartSize = await this.prependIncompletePart(
path,
incompletePart
)
await this.deleteIncompletePart(metadata.file.id)
}
}

const readable = fs.createReadStream(path)
readable.on('error', reject)
if (partSize + incompletePartSize >= this.minPartSize || isFinalPart) {

if (partSize >= this.minPartSize || isFinalPart) {
await this.uploadPart(metadata, readable, partNumber)
} else {
await this.uploadIncompletePart(metadata.file.id, readable)
Expand All @@ -380,10 +436,16 @@ export class S3Store extends DataStore {
fsProm.rm(path).catch(() => {
/* ignore */
})
acquiredPermit?.release()
}
})

promises.push(deferred)
done()
})
.on('chunkError', (_, done) => {
permit?.release()
done()
})

try {
Expand Down Expand Up @@ -484,9 +546,11 @@ export class S3Store extends DataStore {
else if (size <= this.preferredPartSize * this.maxMultipartParts) {
optimalPartSize = this.preferredPartSize
// The upload is too big for the preferred size.
// We devide the size with the max amount of parts and round it up.
// We divide the size with the max amount of parts and round it up.
} else if (size % this.maxMultipartParts === 0) {
optimalPartSize = size / this.maxMultipartParts
} else {
optimalPartSize = Math.ceil(size / this.maxMultipartParts)
optimalPartSize = size / this.maxMultipartParts + 1
}

return optimalPartSize
Expand Down Expand Up @@ -533,26 +597,30 @@ export class S3Store extends DataStore {
/**
* Write to the file, starting at the provided offset
*/
public async write(
readable: http.IncomingMessage | fs.ReadStream,
id: string,
offset: number
): Promise<number> {
public async write(src: stream.Readable, id: string, offset: number): Promise<number> {
// Metadata request needs to happen first
const metadata = await this.getMetadata(id)
const parts = await this.retrieveParts(id)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const partNumber: number = parts.length > 0 ? parts[parts.length - 1].PartNumber! : 0
const nextPartNumber = partNumber + 1

const bytesUploaded = await this.processUpload(
metadata,
readable,
nextPartNumber,
offset
)
const incompletePart = await this.downloadIncompletePart(id)
const requestedOffset = offset

if (incompletePart) {
// once the file is on disk, we delete the incomplete part
await this.deleteIncompletePart(id)

const newOffset = offset + bytesUploaded
offset = requestedOffset - incompletePart.size
src = concatenateStreams([incompletePart.createReader({cleanUpOnEnd: true}), src])
}

const bytesUploaded = await this.uploadParts(metadata, src, nextPartNumber, offset)

// The size of the incomplete part should not be counted, because the
// process of the incomplete part should be fully transparent to the user.
const newOffset = requestedOffset + bytesUploaded - (incompletePart?.size ?? 0)

if (metadata.file.size === newOffset) {
try {
Expand Down Expand Up @@ -741,4 +809,26 @@ export class S3Store extends DataStore {

return deleted
}

private async uniqueTmpFileName(template: string): Promise<string> {
const fileName = template + crypto.randomBytes(10).toString('base64url').slice(0, 10)
const filePath = path.join(os.tmpdir(), fileName)

const fileExists = await new Promise<boolean>((resolve, reject) => {
fs.lstat(filePath, (error) => {
if (!error) {
return resolve(true)
}
if (error.code === 'ENOENT') {
return resolve(false)
}
reject(error)
})
})

if (fileExists) {
return this.uniqueTmpFileName(template)
}
return filePath
}
}
Loading

0 comments on commit 6235da5

Please sign in to comment.