Skip to content

Commit

Permalink
Merge pull request #1326 from nextcloud-libraries/fix/batch-upload
Browse files Browse the repository at this point in the history
  • Loading branch information
skjnldsv authored Aug 14, 2024
2 parents 3690ad2 + e4b9d1c commit a4a27b6
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 59 deletions.
30 changes: 23 additions & 7 deletions cypress/components/UploadPicker/UploadPicker.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// eslint-disable-next-line import/no-unresolved,n/no-missing-import
import { Folder, Permission, addNewFileMenuEntry, type Entry } from '@nextcloud/files'
import { generateRemoteUrl } from '@nextcloud/router'
import { UploadPicker, getUploader } from '../../../lib/index.ts'
import { UploadPicker, UploadStatus, getUploader } from '../../../lib/index.ts'

let state: string | undefined
before(() => {
Expand Down Expand Up @@ -347,7 +347,7 @@ describe('Destination management', () => {

cy.wait('@upload').then((upload) => {
expect(upload.request.url).to.have.string(
'/remote.php/dav/files/user/image.jpg'
'/remote.php/dav/files/user/image.jpg',
)
})

Expand All @@ -374,7 +374,7 @@ describe('Destination management', () => {

cy.wait('@upload').then((upload) => {
expect(upload.request.url).to.have.string(
'/remote.php/dav/files/user/Photos/image.jpg'
'/remote.php/dav/files/user/Photos/image.jpg',
)
})
})
Expand Down Expand Up @@ -517,9 +517,17 @@ describe('UploadPicker notify testing', () => {
cy.get('@progress')
.children('progress')
.should('not.have.value', '0')
expect(notify).to.be.calledOnce
expect(listeners.uploaded).to.be.calledOnce
expect(notify).to.be.calledTwice
// The image upload
expect(notify.getCall(0).args[0].file.name).to.eq('image.jpg')
expect(notify.getCall(0).args[0].status).to.eq(UploadStatus.FINISHED)
// The meta upload
expect(notify.getCall(1).args[0].status).to.eq(UploadStatus.FINISHED)
expect(notify.getCall(1).args[0].file.name).to.eq('')
expect(notify.getCall(1).args[0].file.type).to.eq('httpd/unix-directory')
// the listeners
expect(listeners.failed).to.not.be.called
expect(listeners.uploaded).to.be.calledTwice
})
})

Expand Down Expand Up @@ -549,9 +557,17 @@ describe('UploadPicker notify testing', () => {
cy.get('@progress')
.children('progress')
.should('not.have.value', '0')
expect(notify).to.be.calledOnce
expect(notify).to.be.calledTwice
// The image upload
expect(notify.getCall(0).args[0].file.name).to.eq('image.jpg')
expect(notify.getCall(0).args[0].status).to.eq(UploadStatus.FAILED)
// The meta upload
expect(notify.getCall(1).args[0].status).to.eq(UploadStatus.FAILED)
expect(notify.getCall(1).args[0].file.name).to.eq('')
expect(notify.getCall(1).args[0].file.type).to.eq('httpd/unix-directory')
// the listeners
expect(listeners.uploaded).to.not.be.called
expect(listeners.failed).to.be.calledOnce
expect(listeners.failed).to.be.calledTwice
})
})
})
132 changes: 80 additions & 52 deletions lib/uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,40 +234,88 @@ export class Uploader {
batchUpload(
destination: string,
files: (File|FileSystemEntry)[],
callback?: (nodes: Array<File|IDirectory>,
currentPath: string) => Promise<Array<File|IDirectory>|false>,
callback?: (nodes: Array<File|IDirectory>, currentPath: string) => Promise<Array<File|IDirectory>|false>,
): PCancelable<Upload[]> {
const rootFolder = new Directory('', files)
if (!callback) {
callback = async (files: Array<File|Directory>) => files
}

try {
// Increase concurrency to 4 to keep 3 parallel uploads as one if blocked by the directory meta-upload
this._jobQueue.concurrency += 1
return new PCancelable(async (resolve, reject, onCancel) => {
// create a meta upload to ensure all ongoing child requests are listed
const upload = new Upload(`${this.root.replace(/\/$/, '')}/${destination.replace(/^\//, '')}`, false, 0, rootFolder)
upload.status = UploadStatus.UPLOADING
this._uploadQueue.push(upload)
try {
// Create the promise for the virtual root directory
const promise = this.uploadDirectory(destination, rootFolder, callback, davGetClient(this.root))
// Make sure to cancel it when requested
onCancel(() => promise.cancel())
// await the uploads and resolve with "finished" status
const uploads = await promise
upload.status = UploadStatus.FINISHED
resolve(uploads)
} catch (error) {
logger.error('Error in batch upload', { error })
upload.status = UploadStatus.FAILED
reject(t('Upload has been cancelled'))
} finally {
this._notifyAll(upload)
this.updateStats()
}
})
}

/**
* Helper to create a directory wrapped inside an Upload class
* @param destination Destination where to create the directory
* @param directory The directory to create
* @param client The cached WebDAV client
*/
private createDirectory(destination: string, directory: Directory, client: WebDAVClient): PCancelable<Upload> {
const folderPath = normalize(`${destination}/${directory.name}`).replace(/\/$/, '')
const rootPath = `${this.root.replace(/\/$/, '')}/${folderPath.replace(/^\//, '')}`

if (!directory.name) {
throw new Error('Can not create empty directory')
}

// Add a new upload to the upload queue
const currentUpload: Upload = new Upload(rootPath, false, 0, directory)
this._uploadQueue.push(currentUpload)

// Return the cancelable promise
return new PCancelable(async (resolve, reject, onCancel) => {
const abort = new AbortController()
onCancel(() => abort.abort())
currentUpload.signal.addEventListener('abort', () => reject(t('Upload has been cancelled')))

return new PCancelable(async (resolve, reject, onCancel) => {
// Add the request to the job queue -> wait for finish to resolve the promise
await this._jobQueue.add(async () => {
currentUpload.status = UploadStatus.UPLOADING
try {
const value = await this._jobQueue.add(() => {
const promise = this.uploadDirectory(destination, rootFolder, callback, davGetClient(this.root))
onCancel(() => promise.cancel())
return promise
})
if (value) {
resolve(value)
}
await client.createDirectory(folderPath, { signal: abort.signal })
resolve(currentUpload)
} catch (error) {
logger.error('Error in batch upload', { error })
if (error && typeof error === 'object' && 'status' in error && error.status === 405) {
// Directory already exists, so just write into it and ignore the error
currentUpload.status = UploadStatus.FINISHED
logger.debug('Directory already exists, writing into it', { directory: directory.name })
} else {
// Another error happened, so abort uploading the directory
currentUpload.status = UploadStatus.FAILED
reject(error)
}
} finally {
// Update statistics
this._notifyAll(currentUpload)
this.updateStats()
}
reject(t('Upload has been cancelled'))
})
} finally {
// Reset concurrency
this._jobQueue.concurrency -= 1
}
})
}

// Helper for uploading directories (recursivly)
// Helper for uploading directories (recursively)
private uploadDirectory(
destination: string,
directory: Directory,
Expand All @@ -276,7 +324,6 @@ export class Uploader {
client: WebDAVClient,
): PCancelable<Upload[]> {
const folderPath = normalize(`${destination}/${directory.name}`).replace(/\/$/, '')
const rootPath = `${this.root.replace(/\/$/, '')}/${folderPath.replace(/^\//, '')}`

return new PCancelable(async (resolve, reject, onCancel) => {
const abort = new AbortController()
Expand All @@ -294,27 +341,19 @@ export class Uploader {

const directories: PCancelable<Upload[]>[] = []
const uploads: PCancelable<Upload>[] = []
const currentUpload: Upload = new Upload(rootPath, false, 0, directory)
currentUpload.signal.addEventListener('abort', () => reject(t('Upload has been cancelled')))
currentUpload.status = UploadStatus.UPLOADING
// Setup abort controller to cancel all child requests
abort.signal.addEventListener('abort', () => {
directories.forEach((upload) => upload.cancel())
uploads.forEach((upload) => upload.cancel())
})

try {
// Wait for own directory to be created (if not the virtual root)
if (directory.name) {
try {
await client.createDirectory(folderPath, { signal: abort.signal })
// We add the "upload" to get some information of changed nodes
uploads.push(new PCancelable((resolve) => resolve(currentUpload!)))
this._uploadQueue.push(currentUpload)
} catch (error) {
if (error && typeof error === 'object' && 'status' in error && error.status === 405) {
// Directory already exists, so just write into it and ignore the error
logger.debug('Directory already exists, writing into it', { directory: directory.name })
} else {
// Another error happend, so abort uploading the directory
throw error
}
}
// If not the virtual root we need to create the directory first before uploading
// Make sure the promise is listed in the final result
uploads.push(this.createDirectory(destination, directory, client) as PCancelable<Upload>)
// Ensure the directory is created before uploading / creating children
await uploads.at(-1)
}

for (const node of selectedForUpload) {
Expand All @@ -325,24 +364,13 @@ export class Uploader {
}
}

abort.signal.addEventListener('abort', () => {
uploads.forEach((upload) => upload.cancel())
directories.forEach((upload) => upload.cancel())
})

const resolvedUploads = await Promise.all(uploads)
const resolvedDirectoryUploads = await Promise.all(directories)
currentUpload.status = UploadStatus.FINISHED
resolve([resolvedUploads, ...resolvedDirectoryUploads].flat())
} catch (e) {
// Ensure a failure cancels all other requests
abort.abort(e)
currentUpload.status = UploadStatus.FAILED
reject(e)
} finally {
if (directory.name) {
this._notifyAll(currentUpload)
this.updateStats()
}
}
})
}
Expand Down

0 comments on commit a4a27b6

Please sign in to comment.