Skip to content

Commit

Permalink
upload files in a bundle one-by-one to avoid race
Browse files Browse the repository at this point in the history
  • Loading branch information
iychoi committed Mar 1, 2023
1 parent b2f5c76 commit a5eea3f
Showing 1 changed file with 28 additions and 53 deletions.
81 changes: 28 additions & 53 deletions commons/bundletransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
const (
MaxBundleFileNumDefault int = 50
MaxBundleFileSizeDefault int64 = 1 * 1024 * 1024 * 1024 // 1GB
MinBundleFileNumDefault int = 5
MinBundleFileNumDefault int = 3
)

const (
Expand Down Expand Up @@ -877,73 +877,48 @@ func (manager *BundleTransferManager) processBundleUpload(bundle *Bundle) error
manager.progress(progressName, 0, totalFileSize, progress.UnitsBytes, false)
}

var asyncErr error
wg := sync.WaitGroup{}

for fileIdx := range bundle.files {
// this is for safe access to file in the bundle array
file := bundle.files[fileIdx]
wg.Add(1)

go func(progressIdx int) {
defer wg.Done()

var callbackFileUpload func(processed int64, total int64)
if manager.showProgress {
callbackFileUpload = func(processed int64, total int64) {
fileUploadProgressMutex.Lock()
defer fileUploadProgressMutex.Unlock()

fileUploadProgress[progressIdx] = processed

progressSum := int64(0)
for _, progress := range fileUploadProgress {
progressSum += progress
}

manager.progress(progressName, progressSum, totalFileSize, progress.UnitsBytes, false)
}
}
for fileIdx, file := range bundle.files {
var callbackFileUpload func(processed int64, total int64)
if manager.showProgress {
callbackFileUpload = func(processed int64, total int64) {
fileUploadProgressMutex.Lock()
defer fileUploadProgressMutex.Unlock()

if !ExistsIRODSDir(manager.filesystem, path.Dir(file.IRODSPath)) {
// if parent dir does not exist, create
err := manager.filesystem.MakeDir(path.Dir(file.IRODSPath), true)
if err != nil {
if manager.showProgress {
manager.progress(progressName, -1, totalFileSize, progress.UnitsBytes, true)
}
fileUploadProgress[fileIdx] = processed

err = xerrors.Errorf("failed to create a dir %s to upload file %s in bundle %d to %s: %w", path.Dir(file.IRODSPath), file.LocalPath, bundle.index, file.IRODSPath, err)
logger.Error(err)
asyncErr = err
//return err
return
progressSum := int64(0)
for _, progress := range fileUploadProgress {
progressSum += progress
}

ClearIRODSDirCache(manager.filesystem, path.Dir(file.IRODSPath))
manager.progress(progressName, progressSum, totalFileSize, progress.UnitsBytes, false)
}
}

err := manager.filesystem.UploadFileParallel(file.LocalPath, file.IRODSPath, "", 0, manager.replication, callbackFileUpload)
if !ExistsIRODSDir(manager.filesystem, path.Dir(file.IRODSPath)) {
// if parent dir does not exist, create
err := manager.filesystem.MakeDir(path.Dir(file.IRODSPath), true)
if err != nil {
if manager.showProgress {
manager.progress(progressName, -1, totalFileSize, progress.UnitsBytes, true)
}

err = xerrors.Errorf("failed to upload file %s in bundle %d to %s: %w", file.LocalPath, bundle.index, file.IRODSPath, err)
logger.Error(err)
asyncErr = err
//return err
return
return xerrors.Errorf("failed to create a dir %s to upload file %s in bundle %d to %s: %w", path.Dir(file.IRODSPath), file.LocalPath, bundle.index, file.IRODSPath, err)
}

logger.Debugf("uploaded file %s in bundle %d to %s", file.LocalPath, bundle.index, file.IRODSPath)
}(fileIdx)
}
ClearIRODSDirCache(manager.filesystem, path.Dir(file.IRODSPath))
}

err := manager.filesystem.UploadFileParallel(file.LocalPath, file.IRODSPath, "", 0, manager.replication, callbackFileUpload)
if err != nil {
if manager.showProgress {
manager.progress(progressName, -1, totalFileSize, progress.UnitsBytes, true)
}

wg.Wait()
return xerrors.Errorf("failed to upload file %s in bundle %d to %s: %w", file.LocalPath, bundle.index, file.IRODSPath, err)
}

if asyncErr != nil {
return asyncErr
logger.Debugf("uploaded file %s in bundle %d to %s", file.LocalPath, bundle.index, file.IRODSPath)
}
}

Expand Down

0 comments on commit a5eea3f

Please sign in to comment.