diff --git a/commons/bundletransfer.go b/commons/bundletransfer.go index 8659edd..a437da6 100644 --- a/commons/bundletransfer.go +++ b/commons/bundletransfer.go @@ -21,7 +21,7 @@ import ( const ( MaxBundleFileNumDefault int = 50 MaxBundleFileSizeDefault int64 = 1 * 1024 * 1024 * 1024 // 1GB - MinBundleFileNumDefault int = 5 + MinBundleFileNumDefault int = 3 ) const ( @@ -877,73 +877,48 @@ func (manager *BundleTransferManager) processBundleUpload(bundle *Bundle) error manager.progress(progressName, 0, totalFileSize, progress.UnitsBytes, false) } - var asyncErr error - wg := sync.WaitGroup{} - - for fileIdx := range bundle.files { - // this is for safe access to file in the bundle array - file := bundle.files[fileIdx] - wg.Add(1) - - go func(progressIdx int) { - defer wg.Done() - - var callbackFileUpload func(processed int64, total int64) - if manager.showProgress { - callbackFileUpload = func(processed int64, total int64) { - fileUploadProgressMutex.Lock() - defer fileUploadProgressMutex.Unlock() - - fileUploadProgress[progressIdx] = processed - - progressSum := int64(0) - for _, progress := range fileUploadProgress { - progressSum += progress - } - - manager.progress(progressName, progressSum, totalFileSize, progress.UnitsBytes, false) - } - } + for fileIdx, file := range bundle.files { + var callbackFileUpload func(processed int64, total int64) + if manager.showProgress { + callbackFileUpload = func(processed int64, total int64) { + fileUploadProgressMutex.Lock() + defer fileUploadProgressMutex.Unlock() - if !ExistsIRODSDir(manager.filesystem, path.Dir(file.IRODSPath)) { - // if parent dir does not exist, create - err := manager.filesystem.MakeDir(path.Dir(file.IRODSPath), true) - if err != nil { - if manager.showProgress { - manager.progress(progressName, -1, totalFileSize, progress.UnitsBytes, true) - } + fileUploadProgress[fileIdx] = processed - err = xerrors.Errorf("failed to create a dir %s to upload file %s in bundle %d to %s: %w", path.Dir(file.IRODSPath), file.LocalPath, bundle.index, file.IRODSPath, err) - logger.Error(err) - asyncErr = err - //return err - return + progressSum := int64(0) + for _, progress := range fileUploadProgress { + progressSum += progress } - ClearIRODSDirCache(manager.filesystem, path.Dir(file.IRODSPath)) + manager.progress(progressName, progressSum, totalFileSize, progress.UnitsBytes, false) } + } - err := manager.filesystem.UploadFileParallel(file.LocalPath, file.IRODSPath, "", 0, manager.replication, callbackFileUpload) + if !ExistsIRODSDir(manager.filesystem, path.Dir(file.IRODSPath)) { + // if parent dir does not exist, create + err := manager.filesystem.MakeDir(path.Dir(file.IRODSPath), true) if err != nil { if manager.showProgress { manager.progress(progressName, -1, totalFileSize, progress.UnitsBytes, true) } - err = xerrors.Errorf("failed to upload file %s in bundle %d to %s: %w", file.LocalPath, bundle.index, file.IRODSPath, err) - logger.Error(err) - asyncErr = err - //return err - return + return xerrors.Errorf("failed to create a dir %s to upload file %s in bundle %d to %s: %w", path.Dir(file.IRODSPath), file.LocalPath, bundle.index, file.IRODSPath, err) } - logger.Debugf("uploaded file %s in bundle %d to %s", file.LocalPath, bundle.index, file.IRODSPath) - }(fileIdx) - } + ClearIRODSDirCache(manager.filesystem, path.Dir(file.IRODSPath)) + } + + err := manager.filesystem.UploadFileParallel(file.LocalPath, file.IRODSPath, "", 0, manager.replication, callbackFileUpload) + if err != nil { + if manager.showProgress { + manager.progress(progressName, -1, totalFileSize, progress.UnitsBytes, true) + } - wg.Wait() + return xerrors.Errorf("failed to upload file %s in bundle %d to %s: %w", file.LocalPath, bundle.index, file.IRODSPath, err) + } - if asyncErr != nil { - return asyncErr + logger.Debugf("uploaded file %s in bundle %d to %s", file.LocalPath, bundle.index, file.IRODSPath) } }