Skip to content

Commit

Permalink
Add new flags 'upload_thread_num', 'download_thread_num', and 'tcp_bu…
Browse files Browse the repository at this point in the history
…ffer_size' for transfer control
  • Loading branch information
iychoi committed Mar 13, 2023
1 parent acfad64 commit 9210aef
Show file tree
Hide file tree
Showing 13 changed files with 297 additions and 46 deletions.
21 changes: 16 additions & 5 deletions cmd/subcmd/bput.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ func AddBputCommand(rootCmd *cobra.Command) {
// unused, but required for compatibility with retry
bputCmd.Flags().BoolP("force", "f", false, "unused")
bputCmd.Flags().MarkHidden("force")

bputCmd.Flags().Bool("clear_leftover", false, "Clear leftover bundle files")
bputCmd.Flags().Int("max_file_num", commons.MaxBundleFileNumDefault, "Specify max file number in a bundle file")
bputCmd.Flags().Int64("max_file_size", commons.MaxBundleFileSizeDefault, "Specify max file size of a bundle file")
bputCmd.Flags().Int("upload_thread_num", commons.UploadTreadNumDefault, "Specify the number of upload threads")
bputCmd.Flags().String("max_file_size", strconv.FormatInt(commons.MaxBundleFileSizeDefault, 10), "Specify max file size of a bundle file (default is 2GB)")
bputCmd.Flags().Int("upload_thread_num", commons.UploadTreadNumDefault, "Specify the number of upload threads (default is 5)")
bputCmd.Flags().String("tcp_buffer_size", strconv.Itoa(commons.TcpBufferSizeDefault), "Specify TCP socket buffer size (default is 4MB)")
bputCmd.Flags().Bool("progress", false, "Display progress bars")
bputCmd.Flags().String("local_temp", os.TempDir(), "Specify local temp directory path to create bundle files")
bputCmd.Flags().String("irods_temp", "", "Specify iRODS temp directory path to upload bundle files to")
Expand Down Expand Up @@ -85,7 +85,7 @@ func processBputCommand(command *cobra.Command, args []string) error {
maxFileSize := commons.MaxBundleFileSizeDefault
maxFileSizeFlag := command.Flags().Lookup("max_file_size")
if maxFileSizeFlag != nil {
n, err := strconv.ParseInt(maxFileSizeFlag.Value.String(), 10, 64)
n, err := commons.ParseSize(maxFileSizeFlag.Value.String())
if err == nil {
maxFileSize = n
}
Expand All @@ -100,6 +100,17 @@ func processBputCommand(command *cobra.Command, args []string) error {
}
}

maxConnectionNum := uploadThreadNum + 2 + 2 // 2 for metadata op, 2 for extraction

tcpBufferSize := commons.TcpBufferSizeDefault
tcpBufferSizeFlag := command.Flags().Lookup("tcp_buffer_size")
if tcpBufferSizeFlag != nil {
n, err := commons.ParseSize(tcpBufferSizeFlag.Value.String())
if err == nil {
tcpBufferSize = int(n)
}
}

progress := false
progressFlag := command.Flags().Lookup("progress")
if progressFlag != nil {
Expand Down Expand Up @@ -189,7 +200,7 @@ func processBputCommand(command *cobra.Command, args []string) error {

// Create a file system
account := commons.GetAccount()
filesystem, err := commons.GetIRODSFSClient(account)
filesystem, err := commons.GetIRODSFSClientAdvanced(account, maxConnectionNum, tcpBufferSize)
if err != nil {
return xerrors.Errorf("failed to get iRODS FS Client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/subcmd/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func processCpCommand(command *cobra.Command, args []string) error {
targetPath := args[len(args)-1]
sourcePaths := args[:len(args)-1]

parallelJobManager := commons.NewParallelJobManager(filesystem, commons.MaxThreadNumDefault, progress)
parallelJobManager := commons.NewParallelJobManager(filesystem, commons.MaxParallelJobThreadNumDefault, progress)
parallelJobManager.Start()

for _, sourcePath := range sourcePaths {
Expand Down
43 changes: 26 additions & 17 deletions cmd/subcmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"

irodsclient_fs "github.com/cyverse/go-irodsclient/fs"
irodsclient_util "github.com/cyverse/go-irodsclient/irods/util"
"github.com/cyverse/gocommands/commons"
"github.com/jedib0t/go-pretty/v6/progress"
log "github.com/sirupsen/logrus"
Expand All @@ -26,6 +27,8 @@ func AddGetCommand(rootCmd *cobra.Command) {
commons.SetCommonFlags(getCmd)

getCmd.Flags().BoolP("force", "f", false, "Get forcefully")
getCmd.Flags().Int("download_thread_num", commons.MaxParallelJobThreadNumDefault, "Specify the number of download threads (default is 5)")
getCmd.Flags().String("tcp_buffer_size", strconv.Itoa(commons.TcpBufferSizeDefault), "Specify TCP socket buffer size (default is 4MB)")
getCmd.Flags().Bool("progress", false, "Display progress bar")
getCmd.Flags().Bool("diff", false, "Get files having different content")
getCmd.Flags().Bool("no_hash", false, "Compare files without using md5 hash")
Expand Down Expand Up @@ -60,6 +63,26 @@ func processGetCommand(command *cobra.Command, args []string) error {
}
}

downloadThreadNum := commons.MaxParallelJobThreadNumDefault
downloadThreadNumFlag := command.Flags().Lookup("download_thread_num")
if downloadThreadNumFlag != nil {
n, err := strconv.ParseInt(downloadThreadNumFlag.Value.String(), 10, 32)
if err == nil {
downloadThreadNum = int(n)
}
}

maxConnectionNum := downloadThreadNum + 2 // 2 for metadata op

tcpBufferSize := commons.TcpBufferSizeDefault
tcpBufferSizeFlag := command.Flags().Lookup("tcp_buffer_size")
if tcpBufferSizeFlag != nil {
n, err := commons.ParseSize(tcpBufferSizeFlag.Value.String())
if err == nil {
tcpBufferSize = int(n)
}
}

progress := false
progressFlag := command.Flags().Lookup("progress")
if progressFlag != nil {
Expand Down Expand Up @@ -126,7 +149,7 @@ func processGetCommand(command *cobra.Command, args []string) error {

// Create a file system
account := commons.GetAccount()
filesystem, err := commons.GetIRODSFSClient(account)
filesystem, err := commons.GetIRODSFSClientAdvanced(account, maxConnectionNum, tcpBufferSize)
if err != nil {
return xerrors.Errorf("failed to get iRODS FS Client: %w", err)
}
Expand All @@ -145,7 +168,7 @@ func processGetCommand(command *cobra.Command, args []string) error {
sourcePaths = args[:len(args)-1]
}

parallelJobManager := commons.NewParallelJobManager(filesystem, commons.MaxThreadNumDefault, progress)
parallelJobManager := commons.NewParallelJobManager(filesystem, downloadThreadNum, progress)
parallelJobManager.Start()

for _, sourcePath := range sourcePaths {
Expand Down Expand Up @@ -269,7 +292,7 @@ func getOne(parallelJobManager *commons.ParallelJobManager, sourcePath string, t
}
}

threadsRequired := computeThreadsRequiredForGet(sourceEntry.Size)
threadsRequired := irodsclient_util.GetNumTasksForParallelTransfer(sourceEntry.Size)
parallelJobManager.Schedule(sourcePath, getTask, threadsRequired, progress.UnitsBytes)
logger.Debugf("scheduled a data object download %s to %s", sourcePath, targetFilePath)
} else {
Expand Down Expand Up @@ -299,17 +322,3 @@ func getOne(parallelJobManager *commons.ParallelJobManager, sourcePath string, t
}
return nil
}

func computeThreadsRequiredForGet(size int64) int {
// compute num threads required
threadsRequired := 1
// 4MB is one thread, max 4 threads
if size > 4*1024*1024 {
threadsRequired = int(size / 4 * 1024 * 1024)
if threadsRequired > 4 {
threadsRequired = 4
}
}

return threadsRequired
}
39 changes: 26 additions & 13 deletions cmd/subcmd/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"

"github.com/cyverse/go-irodsclient/fs"
irodsclient_util "github.com/cyverse/go-irodsclient/irods/util"
"github.com/cyverse/gocommands/commons"
"github.com/jedib0t/go-pretty/v6/progress"
log "github.com/sirupsen/logrus"
Expand All @@ -27,6 +28,8 @@ func AddPutCommand(rootCmd *cobra.Command) {
commons.SetCommonFlags(putCmd)

putCmd.Flags().BoolP("force", "f", false, "Put forcefully")
putCmd.Flags().Int("upload_thread_num", commons.MaxParallelJobThreadNumDefault, "Specify the number of upload threads (default is 5)")
putCmd.Flags().String("tcp_buffer_size", strconv.Itoa(commons.TcpBufferSizeDefault), "Specify TCP socket buffer size (default is 4MB)")
putCmd.Flags().Bool("progress", false, "Display progress bar")
putCmd.Flags().Bool("diff", false, "Put files having different content")
putCmd.Flags().Bool("no_hash", false, "Compare files without using md5 hash")
Expand Down Expand Up @@ -62,6 +65,26 @@ func processPutCommand(command *cobra.Command, args []string) error {
}
}

uploadThreadNum := commons.MaxParallelJobThreadNumDefault
uploadThreadNumFlag := command.Flags().Lookup("upload_thread_num")
if uploadThreadNumFlag != nil {
n, err := strconv.ParseInt(uploadThreadNumFlag.Value.String(), 10, 32)
if err == nil {
uploadThreadNum = int(n)
}
}

maxConnectionNum := uploadThreadNum + 2 // 2 for metadata op

tcpBufferSize := commons.TcpBufferSizeDefault
tcpBufferSizeFlag := command.Flags().Lookup("tcp_buffer_size")
if tcpBufferSizeFlag != nil {
n, err := commons.ParseSize(tcpBufferSizeFlag.Value.String())
if err == nil {
tcpBufferSize = int(n)
}
}

progress := false
progressFlag := command.Flags().Lookup("progress")
if progressFlag != nil {
Expand Down Expand Up @@ -139,7 +162,7 @@ func processPutCommand(command *cobra.Command, args []string) error {

// Create a file system
account := commons.GetAccount()
filesystem, err := commons.GetIRODSFSClient(account)
filesystem, err := commons.GetIRODSFSClientAdvanced(account, maxConnectionNum, tcpBufferSize)
if err != nil {
return xerrors.Errorf("failed to get iRODS FS Client: %w", err)
}
Expand All @@ -158,7 +181,7 @@ func processPutCommand(command *cobra.Command, args []string) error {
sourcePaths = args[:len(args)-1]
}

parallelJobManager := commons.NewParallelJobManager(filesystem, commons.MaxThreadNumDefault, progress)
parallelJobManager := commons.NewParallelJobManager(filesystem, uploadThreadNum, progress)
parallelJobManager.Start()

for _, sourcePath := range sourcePaths {
Expand Down Expand Up @@ -311,17 +334,7 @@ func putOne(parallelJobManager *commons.ParallelJobManager, sourcePath string, t

func computeThreadsRequiredForPut(fs *fs.FileSystem, size int64) int {
if fs.SupportParallelUpload() {
// compute num threads required
threadsRequired := 1
// 4MB is one thread, max 4 threads
if size > 4*1024*1024 {
threadsRequired = int(size / 4 * 1024 * 1024)
if threadsRequired > 4 {
threadsRequired = 4
}
}

return threadsRequired
return irodsclient_util.GetNumTasksForParallelTransfer(size)
}

return 1
Expand Down
33 changes: 29 additions & 4 deletions cmd/subcmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ func AddSyncCommand(rootCmd *cobra.Command) {

syncCmd.Flags().Bool("clear_leftover", false, "Clear leftover bundle files")
syncCmd.Flags().Int("max_file_num", commons.MaxBundleFileNumDefault, "Specify max file number in a bundle file")
syncCmd.Flags().Int64("max_file_size", commons.MaxBundleFileSizeDefault, "Specify max file size of a bundle file")
syncCmd.Flags().String("max_file_size", strconv.FormatInt(commons.MaxBundleFileSizeDefault, 10), "Specify max file size of a bundle file")
syncCmd.Flags().Int("upload_thread_num", commons.UploadTreadNumDefault, "Specify the number of upload threads")
syncCmd.Flags().Int("download_thread_num", commons.MaxParallelJobThreadNumDefault, "Specify the number of download threads")
syncCmd.Flags().String("tcp_buffer_size", strconv.Itoa(commons.TcpBufferSizeDefault), "Specify TCP socket buffer size (default is 4MB)")
syncCmd.Flags().Bool("progress", false, "Display progress bar")
syncCmd.Flags().String("local_temp", os.TempDir(), "Specify local temp directory path to create bundle files")
syncCmd.Flags().String("irods_temp", "", "Specify iRODS temp directory path to upload bundle files to")
Expand Down Expand Up @@ -85,7 +87,7 @@ func processSyncCommand(command *cobra.Command, args []string) error {
maxFileSize := commons.MaxBundleFileSizeDefault
maxFileSizeFlag := command.Flags().Lookup("max_file_size")
if maxFileSizeFlag != nil {
n, err := strconv.ParseInt(maxFileSizeFlag.Value.String(), 10, 64)
n, err := commons.ParseSize(maxFileSizeFlag.Value.String())
if err == nil {
maxFileSize = n
}
Expand All @@ -100,6 +102,29 @@ func processSyncCommand(command *cobra.Command, args []string) error {
}
}

downloadThreadNum := commons.MaxParallelJobThreadNumDefault
downloadThreadNumFlag := command.Flags().Lookup("download_thread_num")
if downloadThreadNumFlag != nil {
n, err := strconv.ParseInt(downloadThreadNumFlag.Value.String(), 10, 32)
if err == nil {
downloadThreadNum = int(n)
}
}

maxConnectionNum := uploadThreadNum + 2 + 2 // 2 for metadata op, 2 for extraction
if downloadThreadNum+2 > maxConnectionNum {
maxConnectionNum = downloadThreadNum + 2
}

tcpBufferSize := commons.TcpBufferSizeDefault
tcpBufferSizeFlag := command.Flags().Lookup("tcp_buffer_size")
if tcpBufferSizeFlag != nil {
n, err := commons.ParseSize(tcpBufferSizeFlag.Value.String())
if err == nil {
tcpBufferSize = int(n)
}
}

progress := false
progressFlag := command.Flags().Lookup("progress")
if progressFlag != nil {
Expand Down Expand Up @@ -180,7 +205,7 @@ func processSyncCommand(command *cobra.Command, args []string) error {

// Create a file system
account := commons.GetAccount()
filesystem, err := commons.GetIRODSFSClient(account)
filesystem, err := commons.GetIRODSFSClientAdvanced(account, maxConnectionNum, tcpBufferSize)
if err != nil {
return xerrors.Errorf("failed to get iRODS FS Client: %w", err)
}
Expand Down Expand Up @@ -335,7 +360,7 @@ func syncFromLocal(filesystem *fs.FileSystem, sourcePaths []string, targetPath s
}

func syncFromRemote(filesystem *fs.FileSystem, sourcePaths []string, targetPath string, progress bool, noHash bool) error {
parallelJobManager := commons.NewParallelJobManager(filesystem, commons.MaxThreadNumDefault, progress)
parallelJobManager := commons.NewParallelJobManager(filesystem, commons.MaxParallelJobThreadNumDefault, progress)
parallelJobManager.Start()

for _, sourcePath := range sourcePaths {
Expand Down
2 changes: 0 additions & 2 deletions commons/bundletransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ const (
MaxBundleFileNumDefault int = 50
MaxBundleFileSizeDefault int64 = 2 * 1024 * 1024 * 1024 // 2GB
MinBundleFileNumDefault int = 1 // it seems untar recreates dir and changes collection ID, causing getting collection by ID fail
UploadTreadNumDefault int = 5
UploadTreadNumMax int = 20
)

const (
Expand Down
7 changes: 7 additions & 0 deletions commons/constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package commons

const (
UploadTreadNumDefault int = 5
UploadTreadNumMax int = 20
TcpBufferSizeDefault int = 4 * 1024 * 1024
)
20 changes: 18 additions & 2 deletions commons/irodsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,29 @@ const (
ClientProgramName string = "gocommands"
connectionTimeout time.Duration = 10 * time.Minute
filesystemTimeout time.Duration = 10 * time.Minute
tcpBufferSize int = 4 * 1024 * 1024
)

// GetIRODSFSClient returns a file system client
func GetIRODSFSClient(account *irodsclient_types.IRODSAccount) (*irodsclient_fs.FileSystem, error) {
fsConfig := irodsclient_fs.NewFileSystemConfig(ClientProgramName, irodsclient_fs.ConnectionLifespanDefault,
filesystemTimeout, filesystemTimeout, irodsclient_fs.FileSystemConnectionMaxDefault, tcpBufferSize,
filesystemTimeout, filesystemTimeout, irodsclient_fs.FileSystemConnectionMaxDefault, TcpBufferSizeDefault,
irodsclient_fs.FileSystemTimeoutDefault, irodsclient_fs.FileSystemTimeoutDefault, []irodsclient_fs.MetadataCacheTimeoutSetting{}, true, true)

return irodsclient_fs.NewFileSystem(account, fsConfig)
}

// GetIRODSFSClientAdvanced returns a file system client
func GetIRODSFSClientAdvanced(account *irodsclient_types.IRODSAccount, maxConnection int, tcpBufferSize int) (*irodsclient_fs.FileSystem, error) {
if maxConnection < irodsclient_fs.FileSystemConnectionMaxDefault {
maxConnection = irodsclient_fs.FileSystemConnectionMaxDefault
}

if tcpBufferSize < TcpBufferSizeDefault {
tcpBufferSize = TcpBufferSizeDefault
}

fsConfig := irodsclient_fs.NewFileSystemConfig(ClientProgramName, irodsclient_fs.ConnectionLifespanDefault,
filesystemTimeout, filesystemTimeout, maxConnection, tcpBufferSize,
irodsclient_fs.FileSystemTimeoutDefault, irodsclient_fs.FileSystemTimeoutDefault, []irodsclient_fs.MetadataCacheTimeoutSetting{}, true, true)

return irodsclient_fs.NewFileSystem(account, fsConfig)
Expand Down
2 changes: 1 addition & 1 deletion commons/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

// default values
const (
MaxThreadNumDefault int = 5
MaxParallelJobThreadNumDefault int = 5
)

type ParallelJobTask func(job *ParallelJob) error
Expand Down
Loading

0 comments on commit 9210aef

Please sign in to comment.