Skip to content

Commit

Permalink
Extract bundle files in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
iychoi committed Feb 15, 2023
1 parent 08b7ae5 commit f58e15a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 19 deletions.
7 changes: 5 additions & 2 deletions cmd/subcmd/bput.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ func processBputCommand(command *cobra.Command, args []string) error {
logger.Debugf("use staging dir - %s", irodsTempDirPath)

// clean up staging dir in the target dir
unusedStagingDir := commons.GetDefaultStagingDirInTargetPath(targetPath)
defer filesystem.RemoveDir(unusedStagingDir, true, true)
defer func() {
unusedStagingDir := commons.GetDefaultStagingDirInTargetPath(targetPath)
logger.Debugf("delete staging dir - %s", unusedStagingDir)
filesystem.RemoveDir(unusedStagingDir, true, true)
}()

bundleTransferManager := commons.NewBundleTransferManager(filesystem, targetPath, maxFileNum, maxFileSize, localTempDirPath, irodsTempDirPath, diff, noHash, progress)
bundleTransferManager.Start()
Expand Down
46 changes: 29 additions & 17 deletions commons/bundletransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ func (manager *BundleTransferManager) Start() {
// --> remove ------------>

go func() {
logger.Debug("start transfer thread 1")
defer logger.Debug("exit transfer thread 1")
logger.Debug("start input thread")
defer logger.Debug("exit input thread")

defer close(processBundleTarChan)
defer close(processBundleRemoveFilesChan)
Expand Down Expand Up @@ -477,8 +477,8 @@ func (manager *BundleTransferManager) Start() {

// process bundle - tar
go func() {
logger.Debug("start transfer thread 2")
defer logger.Debug("exit transfer thread 2")
logger.Debug("start bundle thread")
defer logger.Debug("exit bundle thread")

defer close(processBundleUploadChan)

Expand Down Expand Up @@ -513,8 +513,8 @@ func (manager *BundleTransferManager) Start() {

// process bundle - upload
funcAsyncUpload := func(id int, wg *sync.WaitGroup) {
logger.Debugf("start transfer thread 3-%d", id)
defer logger.Debugf("exit transfer thread 3-%d", id)
logger.Debugf("start transfer thread %d", id)
defer logger.Debugf("exit transfer thread %d", id)

defer wg.Done()

Expand Down Expand Up @@ -565,8 +565,8 @@ func (manager *BundleTransferManager) Start() {

// process bundle - remove files
go func() {
logger.Debug("start transfer thread 4")
defer logger.Debug("exit transfer thread 4")
logger.Debug("start stale file remove thread")
defer logger.Debug("exit stale file remove thread")

defer close(processBundleExtractChan2)

Expand Down Expand Up @@ -600,15 +600,15 @@ func (manager *BundleTransferManager) Start() {
}()

// process bundle - extract
go func() {
logger.Debug("start transfer thread 5")
defer logger.Debug("exit transfer thread 5")
// order may be different
removeTaskCompleted := map[int64]int{}
removeTaskCompletedMutex := sync.Mutex{}

defer manager.endProgress()
funcAsyncExtract := func(id int, wg *sync.WaitGroup) {
logger.Debugf("start extract thread %d", id)
defer logger.Debugf("exit extract thread %d", id)

// order may be different
removeTaskCompleted := map[int64]int{}
removeTaskCompletedMutex := sync.Mutex{}
defer wg.Done()

for {
select {
Expand Down Expand Up @@ -649,7 +649,7 @@ func (manager *BundleTransferManager) Start() {
}
}

defer manager.transferWait.Done()
manager.transferWait.Done()
} else {
removeTaskCompleted[bundle1.index] = 1
removeTaskCompletedMutex.Unlock()
Expand Down Expand Up @@ -697,7 +697,7 @@ func (manager *BundleTransferManager) Start() {
}
}

defer manager.transferWait.Done()
manager.transferWait.Done()
} else {
removeTaskCompleted[bundle2.index] = 1
removeTaskCompletedMutex.Unlock()
Expand All @@ -713,6 +713,18 @@ func (manager *BundleTransferManager) Start() {
return
}
}
}

waitAsyncExtract := sync.WaitGroup{}
for i := 0; i < 3; i++ {
waitAsyncExtract.Add(1)
go funcAsyncExtract(i, &waitAsyncExtract)
}

go func() {
waitAsyncExtract.Wait()

manager.endProgress()
}()
}

Expand Down

0 comments on commit f58e15a

Please sign in to comment.