diff --git a/cmd/subcmd/bput.go b/cmd/subcmd/bput.go index acc6a84..d6a1acf 100644 --- a/cmd/subcmd/bput.go +++ b/cmd/subcmd/bput.go @@ -180,8 +180,11 @@ func processBputCommand(command *cobra.Command, args []string) error { logger.Debugf("use staging dir - %s", irodsTempDirPath) // clean up staging dir in the target dir - unusedStagingDir := commons.GetDefaultStagingDirInTargetPath(targetPath) - defer filesystem.RemoveDir(unusedStagingDir, true, true) + defer func() { + unusedStagingDir := commons.GetDefaultStagingDirInTargetPath(targetPath) + logger.Debugf("delete staging dir - %s", unusedStagingDir) + filesystem.RemoveDir(unusedStagingDir, true, true) + }() bundleTransferManager := commons.NewBundleTransferManager(filesystem, targetPath, maxFileNum, maxFileSize, localTempDirPath, irodsTempDirPath, diff, noHash, progress) bundleTransferManager.Start() diff --git a/commons/bundletransfer.go b/commons/bundletransfer.go index ef4c5d8..8992713 100644 --- a/commons/bundletransfer.go +++ b/commons/bundletransfer.go @@ -446,8 +446,8 @@ func (manager *BundleTransferManager) Start() { // --> remove ------------> go func() { - logger.Debug("start transfer thread 1") - defer logger.Debug("exit transfer thread 1") + logger.Debug("start input thread") + defer logger.Debug("exit input thread") defer close(processBundleTarChan) defer close(processBundleRemoveFilesChan) @@ -477,8 +477,8 @@ func (manager *BundleTransferManager) Start() { // process bundle - tar go func() { - logger.Debug("start transfer thread 2") - defer logger.Debug("exit transfer thread 2") + logger.Debug("start bundle thread") + defer logger.Debug("exit bundle thread") defer close(processBundleUploadChan) @@ -513,8 +513,8 @@ func (manager *BundleTransferManager) Start() { // process bundle - upload funcAsyncUpload := func(id int, wg *sync.WaitGroup) { - logger.Debugf("start transfer thread 3-%d", id) - defer logger.Debugf("exit transfer thread 3-%d", id) + logger.Debugf("start transfer thread %d", id) + defer logger.Debugf("exit transfer thread %d", id) defer wg.Done() @@ -565,8 +565,8 @@ func (manager *BundleTransferManager) Start() { // process bundle - remove files go func() { - logger.Debug("start transfer thread 4") - defer logger.Debug("exit transfer thread 4") + logger.Debug("start stale file remove thread") + defer logger.Debug("exit stale file remove thread") defer close(processBundleExtractChan2) @@ -600,15 +600,15 @@ func (manager *BundleTransferManager) Start() { }() // process bundle - extract - go func() { - logger.Debug("start transfer thread 5") - defer logger.Debug("exit transfer thread 5") + // order may be different + removeTaskCompleted := map[int64]int{} + removeTaskCompletedMutex := sync.Mutex{} - defer manager.endProgress() + funcAsyncExtract := func(id int, wg *sync.WaitGroup) { + logger.Debugf("start extract thread %d", id) + defer logger.Debugf("exit extract thread %d", id) - // order may be different - removeTaskCompleted := map[int64]int{} - removeTaskCompletedMutex := sync.Mutex{} + defer wg.Done() for { select { @@ -649,7 +649,7 @@ func (manager *BundleTransferManager) Start() { } } - defer manager.transferWait.Done() + manager.transferWait.Done() } else { removeTaskCompleted[bundle1.index] = 1 removeTaskCompletedMutex.Unlock() @@ -697,7 +697,7 @@ func (manager *BundleTransferManager) Start() { } } - defer manager.transferWait.Done() + manager.transferWait.Done() } else { removeTaskCompleted[bundle2.index] = 1 removeTaskCompletedMutex.Unlock() @@ -713,6 +713,18 @@ func (manager *BundleTransferManager) Start() { return } } + } + + waitAsyncExtract := sync.WaitGroup{} + for i := 0; i < 3; i++ { + waitAsyncExtract.Add(1) + go funcAsyncExtract(i, &waitAsyncExtract) + } + + go func() { + waitAsyncExtract.Wait() + + manager.endProgress() }() }