@s3/store: add backpressure + download incomplete part first #561

@@ -91,6 +91,22 @@ you need to provide a cache implementation that is shared between all instances
See the exported [KV stores][kvstores] from `@tus/server` for more information.

#### `options.maxConcurrentPartUploads`

This setting determines the maximum number of simultaneous part uploads to an S3 storage service.
The default value is 60. This default is chosen in conjunction with the typical `partSize` of 8 MiB, aiming for an effective transfer rate of approximately 480 MiB/s.

> **Review comment:** The calculation seems to assume that one part is uploaded per second; that might be worth mentioning. Also, transfer speeds are commonly given in Mbit/s rather than MiB/s. 480 MiB/s would be about 3.84 Gbit/s, which seems very high; I am not sure transfer speeds to S3 reach that level.
>
> **Reply:** Traffic between Amazon EC2 and Amazon S3 uses up to 100 Gbps of bandwidth to Amazon Virtual Private Cloud (Amazon VPC) endpoints and public IP addresses in the same AWS Region. According to Google that is 12.5 Gbit/s.

**Considerations:**
The ideal value for `maxConcurrentPartUploads` varies based on your `partSize` and the upload bandwidth to your S3 bucket. A larger `partSize` means less overall upload bandwidth available for other concurrent uploads.

- **Lowering the Value**: Reducing `maxConcurrentPartUploads` decreases the number of simultaneous upload requests to S3. This can be beneficial for conserving memory, CPU, and disk I/O resources, especially in environments with limited system resources, where the upload speed is low, or where the part size is large.

- **Increasing the Value**: A higher value potentially enhances the data transfer rate to the server, but at the cost of increased resource usage (memory, CPU, and disk I/O). This can be advantageous when the goal is to maximize throughput and sufficient system resources are available.

- **Bandwidth Considerations**: If your upload bandwidth to S3 is the limiting factor, increasing `maxConcurrentPartUploads` won't lead to higher throughput; it will only result in additional resource consumption without proportional gains in transfer speed.
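
As an illustration, tuning this option when constructing the store might look like the following sketch. The bucket name, region, and credential values below are placeholders; only `maxConcurrentPartUploads` is the setting introduced here.

```ts
import {S3Store} from '@tus/s3-store'

// Sketch with placeholder values, not recommendations.
// With partSize = 8 MiB and maxConcurrentPartUploads = 60, and assuming each
// part takes roughly one second to upload, the theoretical throughput is
// about 60 * 8 MiB/s ≈ 480 MiB/s.
const store = new S3Store({
  partSize: 8 * 1024 * 1024, // 8 MiB
  maxConcurrentPartUploads: 60,
  s3ClientConfig: {
    bucket: 'my-tus-uploads', // placeholder bucket name
    region: 'us-east-1', // placeholder region
    credentials: {
      accessKeyId: process.env.AWS_ACCESS_KEY_ID as string,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY as string,
    },
  },
})
```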

## Extensions

The tus protocol supports optional [extensions][]. Below is a table of the supported extensions in `@tus/s3-store`.

@@ -1,8 +1,7 @@
import os from 'node:os'
import fs, {promises as fsProm} from 'node:fs'
import stream, {promises as streamProm} from 'node:stream'
import stream, {PassThrough, promises as streamProm} from 'node:stream'
import type {Readable} from 'node:stream'
import http from 'node:http'

import AWS, {NoSuchKey, NotFound, S3, S3ClientConfig} from '@aws-sdk/client-s3'
import debug from 'debug'

@@ -15,7 +14,11 @@ import {
  TUS_RESUMABLE,
  KvStore,
  MemoryKvStore,
  Semaphore,
  Permit,
} from '@tus/server'
import crypto from 'node:crypto'
import path from 'node:path'

const log = debug('tus-node-server:stores:s3store')

@@ -25,6 +28,7 @@ type Options = {
  // but may increase it to not exceed the S3 10K parts limit.
  partSize?: number
  useTags?: boolean
  maxConcurrentPartUploads?: number
  cache?: KvStore<MetadataValue>
  expirationPeriodInMilliseconds?: number
  // Options to pass to the AWS S3 SDK.

@@ -42,6 +46,45 @@ function calcOffsetFromParts(parts?: Array<AWS.Part>) {
  return parts && parts.length > 0 ? parts.reduce((a, b) => a + b.Size, 0) : 0
}

function concatenateStreams(streams: Readable[]): Readable {
  const passThrough = new PassThrough()
  let currentStreamIndex = 0
  let errorOccurred = false

  const pipeNextStream = (): void => {
    if (errorOccurred || currentStreamIndex >= streams.length) {
      passThrough.end()
      return
    }

    const stream = streams[currentStreamIndex]
    stream.on('error', (err: Error) => {
      if (!errorOccurred) {
        errorOccurred = true
        passThrough.emit('error', err)
        passThrough.end()

        // notify the other subsequent streams that an error has occurred
        streams.slice(currentStreamIndex + 1).forEach((s) => {
          if (typeof s.destroy === 'function') {
            s.destroy(err)
          }
        })
      }
    })

    stream.pipe(passThrough, {end: false})
    stream.on('end', () => {
      currentStreamIndex++
      pipeNextStream()
    })
  }

  pipeNextStream()

  return passThrough
}

// Implementation (based on https://github.com/tus/tusd/blob/master/s3store/s3store.go)
//
// Once a new tus upload is initiated, multiple objects in S3 are created:
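
For orientation, a minimal usage sketch of the helper above: it replays the given readables strictly in order through a single `PassThrough`. The stream contents here are placeholders.

```ts
import {Readable} from 'node:stream'

// Hypothetical usage: replay the previously stored incomplete part first,
// then continue with the bytes of the new request, as one readable that the
// part splitter can consume.
const previousIncompletePart = Readable.from(Buffer.from('tail of last PATCH'))
const newRequestBody = Readable.from(Buffer.from('body of current PATCH'))

const combined = concatenateStreams([previousIncompletePart, newRequestBody])
combined.on('data', (chunk) => console.log('chunk of', chunk.length, 'bytes'))
combined.on('end', () => console.log('both sources fully consumed, in order'))
```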

@@ -82,6 +125,7 @@ export class S3Store extends DataStore {
  private preferredPartSize: number
  private expirationPeriodInMilliseconds = 0
  private useTags = true
  private semaphore: Semaphore

  public maxMultipartParts = 10_000 as const
  public minPartSize = 5_242_880 as const // 5MiB
  public maxUploadSize = 5_497_558_138_880 as const // 5TiB

@@ -101,8 +145,9 @@ export class S3Store extends DataStore {
    this.preferredPartSize = partSize || 8 * 1024 * 1024
    this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0
    this.useTags = options.useTags ?? true
    this.client = new S3(restS3ClientConfig)
    this.cache = options.cache ?? new MemoryKvStore<MetadataValue>()
    this.client = new S3(restS3ClientConfig)
    this.semaphore = new Semaphore(options.maxConcurrentPartUploads ?? 60)
  }

  protected shouldUseExpirationTags() {

@@ -233,6 +278,61 @@ return data.ETag as string
    return data.ETag as string
  }

  private async downloadIncompletePart(id: string) {
    const incompletePart = await this.getIncompletePart(id)

    if (!incompletePart) {
      return
    }
    const filePath = await this.uniqueTmpFileName('tus-s3-incomplete-part-')

    try {
      let incompletePartSize = 0

      const byteCounterTransform = new stream.Transform({
        transform(chunk, _, callback) {
          incompletePartSize += chunk.length
          callback(null, chunk)
        },
      })

      // write to temporary file
      await streamProm.pipeline(
        incompletePart,
        byteCounterTransform,
        fs.createWriteStream(filePath)
      )

      const createReadStream = (options: {cleanUpOnEnd: boolean}) => {

> **Review reply:** Yeah, this option is more for awareness. Removing it could hide the detail that the file is being removed from the next developer looking at the code. Happy to remove it, but it seems clearer with it.

        const fileReader = fs.createReadStream(filePath)

        if (options.cleanUpOnEnd) {
          fileReader.on('end', () => {
            fs.unlink(filePath, () => {
              // ignore
            })
          })

          fileReader.on('error', (err) => {
            fileReader.destroy(err)
            fs.unlink(filePath, () => {
              // ignore
            })
          })
        }

        return fileReader
      }

      return {size: incompletePartSize, path: filePath, createReader: createReadStream}
    } catch (err) {
      fsProm.rm(filePath).catch(() => {
        /* ignore */
      })
      throw err
    }
  }

  private async getIncompletePart(id: string): Promise<Readable | undefined> {
    try {
      const data = await this.client.getObject({
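
The byte-counting transform used above is a standard Node stream pattern. A standalone sketch, independent of the store (the function name is hypothetical):

```ts
import {Transform, Readable} from 'node:stream'
import {pipeline} from 'node:stream/promises'

// Pass chunks through unchanged while tallying how many bytes went by.
async function countBytes(source: Readable): Promise<number> {
  let total = 0
  const counter = new Transform({
    transform(chunk, _encoding, callback) {
      total += chunk.length
      callback(null, chunk)
    },
  })
  // Drain the counter's readable side so the pipeline can finish; in the
  // store, the data is written to a temporary file instead.
  counter.resume()
  await pipeline(source, counter)
  return total
}
```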

@@ -271,102 +371,50 @@
    })
  }

  private async prependIncompletePart(
    newChunkPath: string,
    previousIncompletePart: Readable
  ): Promise<number> {
    const tempPath = `${newChunkPath}-prepend`
    try {
      let incompletePartSize = 0

      const byteCounterTransform = new stream.Transform({
        transform(chunk, _, callback) {
          incompletePartSize += chunk.length
          callback(null, chunk)
        },
      })

      // write to temporary file, truncating if needed
      await streamProm.pipeline(
        previousIncompletePart,
        byteCounterTransform,
        fs.createWriteStream(tempPath)
      )
      // append to temporary file
      await streamProm.pipeline(
        fs.createReadStream(newChunkPath),
        fs.createWriteStream(tempPath, {flags: 'a'})
      )
      // overwrite existing file
      await fsProm.rename(tempPath, newChunkPath)

      return incompletePartSize
    } catch (err) {
      fsProm.rm(tempPath).catch(() => {
        /* ignore */
      })
      throw err
    }
  }

  /**
   * Uploads a stream to s3 using multiple parts
   */
  private async processUpload(
  private async uploadParts(
    metadata: MetadataValue,
    readStream: http.IncomingMessage | fs.ReadStream,
    readStream: stream.Readable,
    currentPartNumber: number,
    offset: number
  ): Promise<number> {
    const size = metadata.file.size
    const promises: Promise<void>[] = []
    let pendingChunkFilepath: string | null = null
    let bytesUploaded = 0
    let currentChunkNumber = 0
    let permit: Permit | undefined = undefined

    const splitterStream = new StreamSplitter({
      chunkSize: this.calcOptimalPartSize(size),
      directory: os.tmpdir(),
    })
      .on('beforeChunkStarted', async () => {
        permit = await this.semaphore.acquire()
      })
      .on('chunkStarted', (filepath) => {
        pendingChunkFilepath = filepath
      })
      .on('chunkFinished', ({path, size: partSize}) => {
        pendingChunkFilepath = null

        const partNumber = currentPartNumber++
        const chunkNumber = currentChunkNumber++
        const acquiredPermit = permit

        offset += partSize

        const isFirstChunk = chunkNumber === 0
        const isFinalPart = size === offset

        // eslint-disable-next-line no-async-promise-executor
        const deferred = new Promise<void>(async (resolve, reject) => {
          try {
            let incompletePartSize = 0
            // Only the first chunk of each PATCH request can prepend
            // an incomplete part (last chunk) from the previous request.
            if (isFirstChunk) {
              // If we received a chunk under the minimum part size in a previous iteration,
              // we used a regular S3 upload to save it in the bucket. We try to get the incomplete part here.

              const incompletePart = await this.getIncompletePart(metadata.file.id)
              if (incompletePart) {
                // We found an incomplete part, prepend it to the chunk on disk we were about to upload,
                // and delete the incomplete part from the bucket. This can be done in parallel.
                incompletePartSize = await this.prependIncompletePart(
                  path,
                  incompletePart
                )
                await this.deleteIncompletePart(metadata.file.id)
              }
            }

            const readable = fs.createReadStream(path)
            readable.on('error', reject)
            if (partSize + incompletePartSize >= this.minPartSize || isFinalPart) {

            if (partSize >= this.minPartSize || isFinalPart) {
              await this.uploadPart(metadata, readable, partNumber)
            } else {
              await this.uploadIncompletePart(metadata.file.id, readable)

@@ -380,11 +428,15 @@ fsProm.rm(path).catch(() => {
            fsProm.rm(path).catch(() => {
              /* ignore */
            })
            acquiredPermit?.release()
          }
        })

        promises.push(deferred)
      })
      .on('chunkError', () => {
        permit?.release()
      })

    try {
      await streamProm.pipeline(readStream, splitterStream)
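
For context, a minimal sketch of the backpressure pattern used above with the `Semaphore`/`Permit` pair from `@tus/server`. Only `acquire()` and `release()` are taken from the diff; the handler body and function name are placeholders.

```ts
import {Semaphore, Permit} from '@tus/server'

// Cap the number of part uploads that run at the same time. Each unit of work
// must acquire a permit before it starts and release it when it settles,
// whether it succeeded or failed.
const semaphore = new Semaphore(60) // e.g. maxConcurrentPartUploads

async function uploadChunkWithBackpressure(upload: () => Promise<void>) {
  const permit: Permit = await semaphore.acquire()
  try {
    await upload() // placeholder for the actual part upload
  } finally {
    permit.release()
  }
}
```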

@@ -484,9 +536,11 @@ else if (size <= this.preferredPartSize * this.maxMultipartParts) {
    else if (size <= this.preferredPartSize * this.maxMultipartParts) {
      optimalPartSize = this.preferredPartSize
      // The upload is too big for the preferred size.
      // We devide the size with the max amount of parts and round it up.
      // We divide the size with the max amount of parts and round it up.

> **Review comment:** There is no rounding up happening here. The comment likely needs to be updated.

    } else if (size % this.maxMultipartParts === 0) {
      optimalPartSize = size / this.maxMultipartParts
    } else {
      optimalPartSize = Math.ceil(size / this.maxMultipartParts)
      optimalPartSize = size / this.maxMultipartParts + 1

> **Review comment:** Why was the `Math.ceil` removed?
>
> **Reply:** I based this change on the tusd implementation; there is no rounding there as far as I can see.
>
> **Reply:** Go automatically does rounding :)
>
> **Reply:** To add some more explanation to @Murderlon's comment: the code from tusd uses integers. Integer division is different from floating-point division in that the fractional part is ignored; for example, in Go `7 / 2` evaluates to `3`, not `3.5`. That's why you want `Math.ceil` here.
>
> **Reply:** Ah yes, very good point! I totally missed that the data types were ints and not floats. Will bring the rounding back :)

    }

    return optimalPartSize
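
To make the rounding discussion concrete, a small sketch with illustrative numbers showing why the ceiling matters when the size is not an exact multiple of the part count:

```ts
// Illustrative numbers: a 10,000,000,001-byte upload and the 10,000-part S3 limit.
const maxMultipartParts = 10_000
const size = 10_000_000_001

// Plain division yields a fractional part size; truncating it would leave
// bytes that no longer fit into 10,000 parts.
const naive = size / maxMultipartParts // 1_000_000.0001

// Rounding up guarantees maxMultipartParts * optimalPartSize >= size.
const optimalPartSize = Math.ceil(size / maxMultipartParts) // 1_000_001

console.log(optimalPartSize * maxMultipartParts >= size) // true
```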

@@ -533,26 +587,30 @@
  /**
   * Write to the file, starting at the provided offset
   */
  public async write(
    readable: http.IncomingMessage | fs.ReadStream,
    id: string,
    offset: number
  ): Promise<number> {
  public async write(src: stream.Readable, id: string, offset: number): Promise<number> {
    // Metadata request needs to happen first
    const metadata = await this.getMetadata(id)
    const parts = await this.retrieveParts(id)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const partNumber: number = parts.length > 0 ? parts[parts.length - 1].PartNumber! : 0
    const nextPartNumber = partNumber + 1

    const bytesUploaded = await this.processUpload(
      metadata,
      readable,
      nextPartNumber,
      offset
    )
    const incompletePart = await this.downloadIncompletePart(id)
    const requestedOffset = offset

    if (incompletePart) {
      // once the file is on disk, we delete the incomplete part
      await this.deleteIncompletePart(id)

      const newOffset = offset + bytesUploaded
      offset = requestedOffset - incompletePart.size
      src = concatenateStreams([incompletePart.createReader({cleanUpOnEnd: true}), src])
    }

    const bytesUploaded = await this.uploadParts(metadata, src, nextPartNumber, offset)

    // The size of the incomplete part should not be counted, because the
    // process of the incomplete part should be fully transparent to the user.
    const newOffset = requestedOffset + bytesUploaded - (incompletePart?.size ?? 0)

    if (metadata.file.size === newOffset) {
      try {
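
A small worked example of the offset bookkeeping above when an incomplete part from a previous request is prepended; the numbers are illustrative, not taken from the PR:

```ts
// Suppose a previous PATCH left a 2 MiB incomplete part in the bucket and the
// client now resumes at offset 10 MiB.
const MiB = 1024 * 1024
const requestedOffset = 10 * MiB
const incompletePartSize = 2 * MiB

// The incomplete part is downloaded and prepended to the incoming stream, so
// part uploads effectively restart where that incomplete part began.
const uploadOffset = requestedOffset - incompletePartSize // 8 MiB

// If this request then uploads 7 MiB of new data, uploadParts counts the
// prepended bytes as well, so they must be subtracted again for the client.
const bytesUploaded = incompletePartSize + 7 * MiB
const newOffset = requestedOffset + bytesUploaded - incompletePartSize // 17 MiB
console.log(uploadOffset / MiB, newOffset / MiB) // 8 17
```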

@@ -741,4 +799,29 @@

    return deleted
  }

  private async uniqueTmpFileName(template: string): Promise<string> {
    let tries = 0
    const maxTries = 10

    while (tries < maxTries) {
      const fileName =
        template + crypto.randomBytes(10).toString('base64url').slice(0, 10)
      const filePath = path.join(os.tmpdir(), fileName)

      try {
        await fsProm.lstat(filePath)
        // If no error, file exists, so try again
        tries++
      } catch (e) {
        if (e.code === 'ENOENT') {
          // File does not exist, return the path
          return filePath
        }
        throw e // For other errors, rethrow
      }
    }

    throw new Error(`Could not find a unique file name after ${maxTries} tries`)
  }
}

> **Review comment:** It might be helpful to state in this document what a part actually is, perhaps with a brief explanation that this module uses S3 multipart uploads, which split a file into multiple equally sized parts (independent of the PATCH requests). With that context, it is easier to understand what concurrent part uploads actually means.
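
For readers unfamiliar with the term, a minimal sketch of the S3 multipart upload lifecycle that "parts" refer to, using `@aws-sdk/client-s3` directly. The bucket, key, and region are placeholders, and this is not code from the PR.

```ts
import {S3} from '@aws-sdk/client-s3'

// An S3 multipart upload is created once, the file body is then sent as
// numbered parts (each at least 5 MiB except the last), and the upload is
// finally completed by listing every part's ETag.
const client = new S3({region: 'us-east-1'}) // placeholder region
const Bucket = 'example-bucket' // placeholder bucket
const Key = 'example-upload' // placeholder key

async function multipartSketch(parts: Buffer[]) {
  const {UploadId} = await client.createMultipartUpload({Bucket, Key})

  const etags: {ETag?: string; PartNumber: number}[] = []
  for (const [i, Body] of parts.entries()) {
    const PartNumber = i + 1
    const {ETag} = await client.uploadPart({Bucket, Key, UploadId, PartNumber, Body})
    etags.push({ETag, PartNumber})
  }

  await client.completeMultipartUpload({
    Bucket,
    Key,
    UploadId,
    MultipartUpload: {Parts: etags},
  })
}
```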