Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@s3/store: add backpressure + download incomplete part first #561

Merged
merged 11 commits into from
Feb 5, 2024
16 changes: 16 additions & 0 deletions packages/s3-store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,22 @@ you need to provide a cache implementation that is shared between all instances

See the exported [KV stores][kvstores] from `@tus/server` for more information.

### `options.maxConcurrentPartUploads`
fenos marked this conversation as resolved.
Show resolved Hide resolved

This setting determines the maximum number of simultaneous part uploads to an S3 storage service.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be helpful to state in this document what a part actually is. Maybe a brief explanation saying that this module uses S3 multipart uploads, which splits a file into multiple equally-sized parts (independent of the PATCH requests). With this context, it's easier to understand what concurrent part uploads actually means.

The default value is 60. This default is chosen in conjunction with the typical partSize of 8MiB, aiming for an effective transfer rate of approximately 480MiB/s.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The calculation seems assume that one part is uploaded in 1s. Might be worth mentioning. Also transfer speeds are commonly given in Mbit/s not MiB/s. That would be 3.84 Gbit/s, which seems very high. I am not sure if the transfer speeds to S3 reach that level.

Copy link
Collaborator Author

@fenos fenos Feb 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Traffic between Amazon EC2 and Amazon S3 uses up to 100 Gbps of bandwidth to Amazon Virtual Private Cloud (Amazon VPC) endpoints and public IP addresses in the same AWS Region.

According to google is 12.5 Gbit/s


**Considerations:**
The ideal value for maxConcurrentPartUploads varies based on your `partSize` and the upload bandwidth to your S3 bucket. A larger partSize means less overall upload bandwidth available for other concurrent uploads.

- **Lowering the Value**: Reducing maxConcurrentPartUploads decreases the number of simultaneous upload requests to S3. This can be beneficial for conserving memory, CPU, and disk I/O resources, especially in environments with limited system resources or where the upload speed it low or the part size is large.


- **Increasing the Value**: A higher value potentially enhances the data transfer rate to the server, but at the cost of increased resource usage (memory, CPU, and disk I/O). This can be advantageous when the goal is to maximize throughput, and sufficient system resources are available.


- **Bandwidth Considerations**: It's important to note that if your upload bandwidth to S3 is a limiting factor, increasing maxConcurrentPartUploads won’t lead to higher throughput. Instead, it will result in additional resource consumption without proportional gains in transfer speed.
fenos marked this conversation as resolved.
Show resolved Hide resolved

## Extensions

The tus protocol supports optional [extensions][]. Below is a table of the supported extensions in `@tus/s3-store`.
Expand Down
239 changes: 161 additions & 78 deletions packages/s3-store/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import os from 'node:os'
import fs, {promises as fsProm} from 'node:fs'
import stream, {promises as streamProm} from 'node:stream'
import stream, {PassThrough, promises as streamProm} from 'node:stream'
import type {Readable} from 'node:stream'
import http from 'node:http'

import AWS, {NoSuchKey, NotFound, S3, S3ClientConfig} from '@aws-sdk/client-s3'
import debug from 'debug'
Expand All @@ -15,7 +14,11 @@ import {
TUS_RESUMABLE,
KvStore,
MemoryKvStore,
Semaphore,
Permit,
} from '@tus/server'
import crypto from 'node:crypto'
import path from 'node:path'

const log = debug('tus-node-server:stores:s3store')

Expand All @@ -25,6 +28,7 @@ type Options = {
// but may increase it to not exceed the S3 10K parts limit.
partSize?: number
useTags?: boolean
maxConcurrentPartUploads?: number
Murderlon marked this conversation as resolved.
Show resolved Hide resolved
cache?: KvStore<MetadataValue>
expirationPeriodInMilliseconds?: number
// Options to pass to the AWS S3 SDK.
Expand All @@ -42,6 +46,45 @@ function calcOffsetFromParts(parts?: Array<AWS.Part>) {
return parts && parts.length > 0 ? parts.reduce((a, b) => a + b.Size, 0) : 0
}

function concatenateStreams(streams: Readable[]): Readable {
fenos marked this conversation as resolved.
Show resolved Hide resolved
const passThrough = new PassThrough()
let currentStreamIndex = 0
let errorOccurred = false

const pipeNextStream = (): void => {
if (errorOccurred || currentStreamIndex >= streams.length) {
passThrough.end()
return
}

const stream = streams[currentStreamIndex]
stream.on('error', (err: Error) => {
if (!errorOccurred) {
errorOccurred = true
passThrough.emit('error', err)
passThrough.end()

// notify the other subsequent streams that an error has occurred
streams.slice(currentStreamIndex + 1).forEach((s) => {
if (typeof s.destroy === 'function') {
s.destroy(err)
}
})
}
})

stream.pipe(passThrough, {end: false})
stream.on('end', () => {
currentStreamIndex++
pipeNextStream()
})
}

pipeNextStream()

return passThrough
}

// Implementation (based on https://github.com/tus/tusd/blob/master/s3store/s3store.go)
//
// Once a new tus upload is initiated, multiple objects in S3 are created:
Expand Down Expand Up @@ -82,6 +125,7 @@ export class S3Store extends DataStore {
private preferredPartSize: number
private expirationPeriodInMilliseconds = 0
private useTags = true
private semaphore: Semaphore
fenos marked this conversation as resolved.
Show resolved Hide resolved
public maxMultipartParts = 10_000 as const
public minPartSize = 5_242_880 as const // 5MiB
public maxUploadSize = 5_497_558_138_880 as const // 5TiB
Expand All @@ -101,8 +145,9 @@ export class S3Store extends DataStore {
this.preferredPartSize = partSize || 8 * 1024 * 1024
this.expirationPeriodInMilliseconds = options.expirationPeriodInMilliseconds ?? 0
this.useTags = options.useTags ?? true
this.client = new S3(restS3ClientConfig)
this.cache = options.cache ?? new MemoryKvStore<MetadataValue>()
this.client = new S3(restS3ClientConfig)
this.semaphore = new Semaphore(options.maxConcurrentPartUploads ?? 60)
}

protected shouldUseExpirationTags() {
Expand Down Expand Up @@ -233,6 +278,61 @@ export class S3Store extends DataStore {
return data.ETag as string
}

private async downloadIncompletePart(id: string) {
const incompletePart = await this.getIncompletePart(id)

if (!incompletePart) {
return
}
const filePath = await this.uniqueTmpFileName('tus-s3-incomplete-part-')

try {
let incompletePartSize = 0

const byteCounterTransform = new stream.Transform({
fenos marked this conversation as resolved.
Show resolved Hide resolved
transform(chunk, _, callback) {
incompletePartSize += chunk.length
callback(null, chunk)
},
})

// write to temporary file
await streamProm.pipeline(
incompletePart,
byteCounterTransform,
fs.createWriteStream(filePath)
)

const createReadStream = (options: {cleanUpOnEnd: boolean}) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleanUpOnEnd seems to always be true. Do we need this option then?

Copy link
Collaborator Author

@fenos fenos Jan 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yhea, this option is more for awareness.

It could hide the detail in which the file is being removed to the next developer looking at the code if we simply call: createReadStream() without the option.

Happy to remove it, but it seems clearer with it

const fileReader = fs.createReadStream(filePath)

if (options.cleanUpOnEnd) {
fileReader.on('end', () => {
fs.unlink(filePath, () => {
// ignore
})
})

fileReader.on('error', (err) => {
fileReader.destroy(err)
fs.unlink(filePath, () => {
// ignore
})
})
}

return fileReader
}

return {size: incompletePartSize, path: filePath, createReader: createReadStream}
} catch (err) {
fsProm.rm(filePath).catch(() => {
/* ignore */
})
throw err
}
}

private async getIncompletePart(id: string): Promise<Readable | undefined> {
try {
const data = await this.client.getObject({
Expand Down Expand Up @@ -271,102 +371,50 @@ export class S3Store extends DataStore {
})
}

private async prependIncompletePart(
newChunkPath: string,
previousIncompletePart: Readable
): Promise<number> {
const tempPath = `${newChunkPath}-prepend`
try {
let incompletePartSize = 0

const byteCounterTransform = new stream.Transform({
transform(chunk, _, callback) {
incompletePartSize += chunk.length
callback(null, chunk)
},
})

// write to temporary file, truncating if needed
await streamProm.pipeline(
previousIncompletePart,
byteCounterTransform,
fs.createWriteStream(tempPath)
)
// append to temporary file
await streamProm.pipeline(
fs.createReadStream(newChunkPath),
fs.createWriteStream(tempPath, {flags: 'a'})
)
// overwrite existing file
await fsProm.rename(tempPath, newChunkPath)

return incompletePartSize
} catch (err) {
fsProm.rm(tempPath).catch(() => {
/* ignore */
})
throw err
}
}

/**
* Uploads a stream to s3 using multiple parts
*/
private async processUpload(
private async uploadParts(
metadata: MetadataValue,
readStream: http.IncomingMessage | fs.ReadStream,
readStream: stream.Readable,
currentPartNumber: number,
offset: number
): Promise<number> {
const size = metadata.file.size
const promises: Promise<void>[] = []
let pendingChunkFilepath: string | null = null
let bytesUploaded = 0
let currentChunkNumber = 0
let permit: Permit | undefined = undefined

const splitterStream = new StreamSplitter({
chunkSize: this.calcOptimalPartSize(size),
directory: os.tmpdir(),
})
.on('beforeChunkStarted', async () => {
permit = await this.semaphore.acquire()
})
.on('chunkStarted', (filepath) => {
pendingChunkFilepath = filepath
})
.on('chunkFinished', ({path, size: partSize}) => {
pendingChunkFilepath = null

const partNumber = currentPartNumber++
const chunkNumber = currentChunkNumber++
const acquiredPermit = permit

offset += partSize

const isFirstChunk = chunkNumber === 0
const isFinalPart = size === offset

// eslint-disable-next-line no-async-promise-executor
const deferred = new Promise<void>(async (resolve, reject) => {
try {
let incompletePartSize = 0
// Only the first chunk of each PATCH request can prepend
// an incomplete part (last chunk) from the previous request.
if (isFirstChunk) {
// If we received a chunk under the minimum part size in a previous iteration,
// we used a regular S3 upload to save it in the bucket. We try to get the incomplete part here.

const incompletePart = await this.getIncompletePart(metadata.file.id)
if (incompletePart) {
// We found an incomplete part, prepend it to the chunk on disk we were about to upload,
// and delete the incomplete part from the bucket. This can be done in parallel.
incompletePartSize = await this.prependIncompletePart(
path,
incompletePart
)
await this.deleteIncompletePart(metadata.file.id)
}
}

const readable = fs.createReadStream(path)
readable.on('error', reject)
if (partSize + incompletePartSize >= this.minPartSize || isFinalPart) {

if (partSize >= this.minPartSize || isFinalPart) {
await this.uploadPart(metadata, readable, partNumber)
} else {
await this.uploadIncompletePart(metadata.file.id, readable)
Expand All @@ -380,11 +428,15 @@ export class S3Store extends DataStore {
fsProm.rm(path).catch(() => {
/* ignore */
})
acquiredPermit?.release()
}
})

promises.push(deferred)
})
.on('chunkError', () => {
permit?.release()
})

try {
await streamProm.pipeline(readStream, splitterStream)
Expand Down Expand Up @@ -484,9 +536,11 @@ export class S3Store extends DataStore {
else if (size <= this.preferredPartSize * this.maxMultipartParts) {
optimalPartSize = this.preferredPartSize
// The upload is too big for the preferred size.
// We devide the size with the max amount of parts and round it up.
// We divide the size with the max amount of parts and round it up.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no rounding up here happening. The comment likely needs to be updated.

} else if (size % this.maxMultipartParts === 0) {
optimalPartSize = size / this.maxMultipartParts
} else {
optimalPartSize = Math.ceil(size / this.maxMultipartParts)
optimalPartSize = size / this.maxMultipartParts + 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was the Math.ceil removed? If we reach this conditional block, we know that size / this.maxMultipartParts is not an integer, so rounding it would be sense to get rid of the fractional part.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I based this change on this:
https://github.com/tus/tusd/blob/main/pkg/s3store/s3store.go#L1176

There is no rounding as far as i can see

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go automatically does rounding :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To add some more explanation to @Murderlon's comment: The code from tusd uses integers. Integer division is different than floating point division in that the fractional is ignored. For example:

14 / 5 == 2 // integer division
14.0 / 5 == 2.8 // float divison

That's why you want Math.floor when trying to replicate the Go logic, or Math.ceil when you want to replicate the integer division + 1 from tusd.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, very good point! I totally missed that the data types were ints and not float. Will bring the rounding back :)

}

return optimalPartSize
Expand Down Expand Up @@ -533,26 +587,30 @@ export class S3Store extends DataStore {
/**
* Write to the file, starting at the provided offset
*/
public async write(
readable: http.IncomingMessage | fs.ReadStream,
id: string,
offset: number
): Promise<number> {
public async write(src: stream.Readable, id: string, offset: number): Promise<number> {
// Metadata request needs to happen first
const metadata = await this.getMetadata(id)
const parts = await this.retrieveParts(id)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const partNumber: number = parts.length > 0 ? parts[parts.length - 1].PartNumber! : 0
const nextPartNumber = partNumber + 1

const bytesUploaded = await this.processUpload(
metadata,
readable,
nextPartNumber,
offset
)
const incompletePart = await this.downloadIncompletePart(id)
const requestedOffset = offset

if (incompletePart) {
// once the file is on disk, we delete the incomplete part
await this.deleteIncompletePart(id)

const newOffset = offset + bytesUploaded
offset = requestedOffset - incompletePart.size
src = concatenateStreams([incompletePart.createReader({cleanUpOnEnd: true}), src])
}

const bytesUploaded = await this.uploadParts(metadata, src, nextPartNumber, offset)

// The size of the incomplete part should not be counted, because the
// process of the incomplete part should be fully transparent to the user.
const newOffset = requestedOffset + bytesUploaded - (incompletePart?.size ?? 0)

if (metadata.file.size === newOffset) {
try {
Expand Down Expand Up @@ -741,4 +799,29 @@ export class S3Store extends DataStore {

return deleted
}

private async uniqueTmpFileName(template: string): Promise<string> {
fenos marked this conversation as resolved.
Show resolved Hide resolved
let tries = 0
const maxTries = 10

while (tries < maxTries) {
const fileName =
template + crypto.randomBytes(10).toString('base64url').slice(0, 10)
const filePath = path.join(os.tmpdir(), fileName)

try {
await fsProm.lstat(filePath)
// If no error, file exists, so try again
tries++
} catch (e) {
if (e.code === 'ENOENT') {
// File does not exist, return the path
return filePath
}
throw e // For other errors, rethrow
}
}

throw new Error(`Could not find a unique file name after ${maxTries} tries`)
}
}
Loading
Loading