Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Jan 22, 2024
1 parent 6235da5 commit 7ccd14f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 52 deletions.
55 changes: 24 additions & 31 deletions packages/s3-store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,22 +389,14 @@ export class S3Store extends DataStore {
const splitterStream = new StreamSplitter({
chunkSize: this.calcOptimalPartSize(size),
directory: os.tmpdir(),
asyncEvents: true,
})
.on('beforeChunkStarted', (_, done) => {
this.semaphore
.acquire()
.catch(done)
.then((acquiredPermit) => {
permit = acquiredPermit
done()
})
.on('beforeChunkStarted', async () => {
permit = await this.semaphore.acquire()
})
.on('chunkStarted', (filepath, done) => {
.on('chunkStarted', (filepath) => {
pendingChunkFilepath = filepath
done()
})
.on('chunkFinished', ({path, size: partSize}, done) => {
.on('chunkFinished', ({path, size: partSize}) => {
pendingChunkFilepath = null

const partNumber = currentPartNumber++
Expand Down Expand Up @@ -441,11 +433,9 @@ export class S3Store extends DataStore {
})

promises.push(deferred)
done()
})
.on('chunkError', (_, done) => {
.on('chunkError', () => {
permit?.release()
done()
})

try {
Expand Down Expand Up @@ -811,24 +801,27 @@ export class S3Store extends DataStore {
}

private async uniqueTmpFileName(template: string): Promise<string> {
const fileName = template + crypto.randomBytes(10).toString('base64url').slice(0, 10)
const filePath = path.join(os.tmpdir(), fileName)
let tries = 0
const maxTries = 10

const fileExists = await new Promise<boolean>((resolve, reject) => {
fs.lstat(filePath, (error) => {
if (!error) {
return resolve(true)
}
if (error.code === 'ENOENT') {
return resolve(false)
}
reject(error)
})
})
while (tries < maxTries) {
const fileName =
template + crypto.randomBytes(10).toString('base64url').slice(0, 10)
const filePath = path.join(os.tmpdir(), fileName)

if (fileExists) {
return this.uniqueTmpFileName(template)
try {
await fsProm.lstat(filePath)
// If no error, file exists, so try again
tries++
} catch (e) {
if (e.code === 'ENOENT') {
// File does not exist, return the path
return filePath
}
throw e // For other errors, rethrow
}
}
return filePath

throw new Error(`Could not find a unique file name after ${maxTries} tries`)
}
}
25 changes: 4 additions & 21 deletions packages/server/src/models/StreamSplitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ function randomString(size: number) {
type Options = {
chunkSize: number
directory: string
asyncEvents?: boolean
}

type Callback = (error: Error | null) => void
Expand All @@ -24,12 +23,8 @@ export class StreamSplitter extends stream.Writable {
filenameTemplate: string
chunkSize: Options['chunkSize']
part: number
asyncEvents?: boolean

constructor(
{chunkSize, directory, asyncEvents}: Options,
options?: stream.WritableOptions
) {
constructor({chunkSize, directory}: Options, options?: stream.WritableOptions) {
super(options)
this.chunkSize = chunkSize
this.currentChunkPath = null
Expand All @@ -38,7 +33,6 @@ export class StreamSplitter extends stream.Writable {
this.directory = directory
this.filenameTemplate = randomString(10)
this.part = 0
this.asyncEvents = asyncEvents

this.on('error', this._handleError.bind(this))
}
Expand Down Expand Up @@ -126,20 +120,9 @@ export class StreamSplitter extends stream.Writable {
}

async emitEvent<T>(name: string, payload: T) {
if (this.asyncEvents) {
await new Promise<void>((resolve, reject) => {
const doneCb = (err?: unknown) => {
if (err) {
reject(err)
return
}
resolve()
}

this.emit(name, payload, doneCb)
})
} else {
this.emit(name, payload)
const listeners = this.listeners(name)
for await (const listener of listeners) {
await listener(payload)
}
}

Expand Down

0 comments on commit 7ccd14f

Please sign in to comment.