From 0ffb40364e4696c1bfba48371f1709425e5d86e7 Mon Sep 17 00:00:00 2001 From: Miyako Satori <31246794+Mikubill@users.noreply.github.com> Date: Sat, 13 Mar 2021 20:23:27 +0900 Subject: [PATCH] fix https://github.com/Mikubill/transfer/issues/29 --- apis/public/cowtransfer/api.go | 5 +- apis/public/cowtransfer/struct.go | 31 ++++- apis/public/cowtransfer/upload.go | 188 +++++++++++++----------------- go.mod | 6 +- go.sum | 7 ++ 5 files changed, 122 insertions(+), 115 deletions(-) diff --git a/apis/public/cowtransfer/api.go b/apis/public/cowtransfer/api.go index 1581c5a..02acb9c 100644 --- a/apis/public/cowtransfer/api.go +++ b/apis/public/cowtransfer/api.go @@ -2,9 +2,10 @@ package cowtransfer import ( "fmt" - "github.com/spf13/cobra" "transfer/apis" "transfer/utils" + + "github.com/spf13/cobra" ) var ( @@ -21,7 +22,7 @@ type cowTransfer struct { func (b *cowTransfer) SetArgs(cmd *cobra.Command) { cmd.Flags().IntVarP(&b.Config.Parallel, "parallel", "p", 2, "Set the number of upload threads") cmd.Flags().StringVarP(&b.Config.token, "cookie", "c", "", "Your user cookie (optional)") - cmd.Flags().IntVarP(&b.Config.blockSize, "block", "", 262144, "Upload block size") + cmd.Flags().IntVarP(&b.Config.blockSize, "block", "", 1200000, "Upload block size") cmd.Flags().IntVarP(&b.Config.interval, "timeout", "t", 10, "Request retry/timeout limit in second") cmd.Flags().BoolVarP(&b.Config.singleMode, "single", "s", false, "Upload multi files in a single link") cmd.Flags().BoolVarP(&b.Config.hashCheck, "hash", "", false, "Check hash after block upload") diff --git a/apis/public/cowtransfer/struct.go b/apis/public/cowtransfer/struct.go index 5ad3ce8..2608b15 100644 --- a/apis/public/cowtransfer/struct.go +++ b/apis/public/cowtransfer/struct.go @@ -10,7 +10,8 @@ import ( ) type requestConfig struct { - debug bool + debug bool + action string //retry int timeout time.Duration modifier func(r *http.Request) @@ -24,7 +25,7 @@ type uploadPart struct { type uploadConfig struct { wg *sync.WaitGroup - token string + config *initResp hashMap *cmap.ConcurrentMap } @@ -38,6 +39,15 @@ type cowOptions struct { passCode string } +type initResp struct { + Token string + TransferGUID string + FileGUID string + EncodeID string + Exp int64 `json:"expireAt"` + ID string `json:"uploadId"` +} + type prepareSendResp struct { UploadToken string `json:"uptoken"` TransferGUID string `json:"transferguid"` @@ -54,8 +64,21 @@ type beforeSendResp struct { } type uploadResponse struct { - Ticket string `json:"ctx"` - Hash int64 `json:"crc32"` + Etag string `json:"etag"` + MD5 string `json:"md5"` +} + +type slek struct { + ETag string `json:"etag"` + Part int64 `json:"partNumber"` +} + +type clds struct { + Parts []slek `json:"parts"` + FName string `json:"fname"` + Mimetype string `json:"mimeType"` + Metadata map[string]string + Vars map[string]string } type finishResponse struct { diff --git a/apis/public/cowtransfer/upload.go b/apis/public/cowtransfer/upload.go index 6aeb3b5..08a8b76 100644 --- a/apis/public/cowtransfer/upload.go +++ b/apis/public/cowtransfer/upload.go @@ -2,17 +2,15 @@ package cowtransfer import ( "bytes" + "crypto/md5" "encoding/json" "fmt" - "hash/crc32" "io" "io/ioutil" "log" - "math" "mime/multipart" "net/http" "strconv" - "strings" "sync" "time" "transfer/apis" @@ -23,15 +21,14 @@ import ( ) const ( - prepareSend = "https://cowtransfer.com/transfer/preparesend" - setPassword = "https://cowtransfer.com/transfer/v2/bindpasscode" - beforeUpload = "https://cowtransfer.com/transfer/beforeupload" - uploadInitEndpoint = "https://upload.qiniup.com/mkblk/%d" - uploadEndpoint = "https://upload.qiniup.com/bput/%s/%d" - uploadFinish = "https://cowtransfer.com/transfer/uploaded" - uploadComplete = "https://cowtransfer.com/transfer/complete" - uploadMergeFile = "https://upload.qiniup.com/mkfile/%s/key/%s/fname/%s" - block = 4194304 + prepareSend = "https://cowtransfer.com/transfer/preparesend" + setPassword = "https://cowtransfer.com/transfer/v2/bindpasscode" + beforeUpload = "https://cowtransfer.com/transfer/beforeupload" + uploadFinish = "https://cowtransfer.com/transfer/uploaded" + uploadComplete = "https://cowtransfer.com/transfer/complete" + initUpload = "https://upload-fog-cn-east-1.qiniup.com/buckets/cowtransfer-yz/objects/%s/uploads" + doUpload = "https://upload-fog-cn-east-1.qiniup.com/buckets/cowtransfer-yz/objects/%s/uploads/%s/%d" + finUpload = "https://upload-fog-cn-east-1.qiniup.com/buckets/cowtransfer-yz/objects/%s/uploads/%s" ) func (b *cowTransfer) InitUpload(_ []string, sizes []int64) error { @@ -87,15 +84,18 @@ func (b cowTransfer) DoUpload(name string, size int64, file io.Reader) error { for i := 0; i < b.Config.Parallel; i++ { go b.uploader(&ch, uploadConfig{ wg: wg, - token: config.UploadToken, + config: config, hashMap: &hashMap, }) } + if b.Config.blockSize < 1200000 { + b.Config.blockSize = 1200000 + } part := int64(0) for { part++ - buf := make([]byte, block) + buf := make([]byte, b.Config.blockSize) nr, err := io.ReadFull(file, buf) if nr <= 0 { break @@ -127,75 +127,17 @@ func (b cowTransfer) DoUpload(name string, size int64, file io.Reader) error { func (b cowTransfer) uploader(ch *chan *uploadPart, conf uploadConfig) { for item := range *ch { Start: - postURL := fmt.Sprintf(uploadInitEndpoint, len(item.content)) + postURL := fmt.Sprintf(doUpload, conf.config.EncodeID, conf.config.ID, item.count) if apis.DebugMode { log.Printf("part %d start uploading, size: %d", item.count, len(item.content)) log.Printf("part %d posting %s", item.count, postURL) } - // makeBlock - body, err := newPostRequest(postURL, nil, requestConfig{ - debug: apis.DebugMode, - //retry: 0, - timeout: time.Duration(b.Config.interval) * time.Second, - modifier: addToken(conf.token), - }) - if err != nil { - if apis.DebugMode { - log.Printf("failed make mkblk on part %d, error: %s (retrying)", - item.count, err) - } - if apis.DebugMode { - log.Printf("part %d retrying", item.count) - } - goto Start - } - var rBody uploadResponse - if err := json.Unmarshal(body, &rBody); err != nil { - if apis.DebugMode { - log.Printf("failed make mkblk on part %d error: %v, returns: %s (retrying)", - item.count, string(body), strings.ReplaceAll(err.Error(), "\n", "")) - } - if apis.DebugMode { - log.Printf("part %d retrying", item.count) - } - goto Start - } - //blockPut - failFlag := false - blockCount := int(math.Ceil(float64(len(item.content)) / float64(b.Config.blockSize))) - if apis.DebugMode { - log.Printf("init: part %d block %d ", item.count, blockCount) - } - ticket := rBody.Ticket - for i := 0; i < blockCount; i++ { - start := i * b.Config.blockSize - end := (i + 1) * b.Config.blockSize - var buf []byte - if end > len(item.content) { - end = len(item.content) - } - buf = item.content[start:end] - if apis.DebugMode { - log.Printf("part %d block %d %d - [%d:%d] start upload...", item.count, i, len(buf), start, end) - } - postURL = fmt.Sprintf(uploadEndpoint, ticket, start) - ticket, err = b.blockPut(postURL, buf, conf.token) - if err != nil { - if apis.DebugMode { - log.Printf("part %d block %d failed. error: %s", item.count, i, err) - } - failFlag = true - break - } - if item.bar != nil { - item.bar.Add(len(buf)) - } - } - if failFlag { + ticket, err := b.blockPut(postURL, item.content, conf.config.Token) + if err != nil { if apis.DebugMode { - log.Printf("part %d retrying", item.count) + log.Printf("part %d failed. error: %s", item.count, err) } goto Start } @@ -205,6 +147,9 @@ func (b cowTransfer) uploader(ch *chan *uploadPart, conf uploadConfig) { } conf.hashMap.Set(strconv.FormatInt(item.count, 10), ticket) conf.wg.Done() + if item.bar != nil { + item.bar.Add(len(item.content)) + } } } @@ -212,8 +157,10 @@ func (b cowTransfer) uploader(ch *chan *uploadPart, conf uploadConfig) { func (b cowTransfer) blockPut(postURL string, buf []byte, token string) (string, error) { data := new(bytes.Buffer) data.Write(buf) - body, err := newPostRequest(postURL, data, requestConfig{ - debug: apis.DebugMode, + body, err := newRequest(postURL, data, requestConfig{ + debug: apis.DebugMode, + action: "PUT", + //retry: 0, timeout: time.Duration(b.Config.interval) * time.Second, modifier: addToken(token), @@ -238,7 +185,7 @@ func (b cowTransfer) blockPut(postURL string, buf []byte, token string) (string, //return b.blockPut(postURL, buf, token, retry+1) } if b.Config.hashCheck { - if hashBlock(buf) != rBody.Hash { + if hashBlock(buf) != rBody.MD5 { if apis.DebugMode { log.Printf("block hashcheck failed (retrying)") } @@ -248,45 +195,44 @@ func (b cowTransfer) blockPut(postURL string, buf []byte, token string) (string, //return b.blockPut(postURL, buf, token, retry+1) } } - return rBody.Ticket, nil + return rBody.Etag, nil } -func hashBlock(buf []byte) int64 { - return int64(crc32.ChecksumIEEE(buf)) +func hashBlock(buf []byte) string { + return fmt.Sprintf("%x", md5.Sum(buf)) } -func (b cowTransfer) finishUpload(config *prepareSendResp, name string, size int64, hashMap *cmap.ConcurrentMap, limit int64) error { +func (b cowTransfer) finishUpload(config *initResp, name string, size int64, hashMap *cmap.ConcurrentMap, limit int64) error { if apis.DebugMode { log.Println("finishing upload...") log.Println("step1 -> api/mergeFile") } - filename := utils.URLSafeEncode(name) - fileLocate := utils.URLSafeEncode(fmt.Sprintf("%s/%s/%s", config.Prefix, config.TransferGUID, name)) - mergeFileURL := fmt.Sprintf(uploadMergeFile, strconv.FormatInt(size, 10), fileLocate, filename) - postBody := "" - cal := 0 + mergeFileURL := fmt.Sprintf(finUpload, config.EncodeID, config.ID) + var postData clds for i := int64(1); i <= limit; i++ { item, alimasu := hashMap.Get(strconv.FormatInt(i, 10)) if alimasu { - postBody += item.(string) + "," + postData.Parts = append(postData.Parts, slek{ + ETag: item.(string), + Part: i, + }) } - cal++ } - if apis.DebugMode { - log.Println("resource status: ", cal) - } - if strings.HasSuffix(postBody, ",") { - postBody = postBody[:len(postBody)-1] + postData.FName = name + postBody, err := json.Marshal(postData) + if err != nil { + return err } if apis.DebugMode { log.Printf("merge payload: %s\n", postBody) } reader := bytes.NewReader([]byte(postBody)) - resp, err := newPostRequest(mergeFileURL, reader, requestConfig{ - debug: apis.DebugMode, + resp, err := newRequest(mergeFileURL, reader, requestConfig{ + debug: apis.DebugMode, + action: "POST", //retry: 0, timeout: time.Duration(b.Config.interval) * time.Second, - modifier: addToken(config.UploadToken), + modifier: addToken(config.Token), }) if err != nil { return err @@ -315,8 +261,8 @@ func (b cowTransfer) finishUpload(config *prepareSendResp, name string, size int if err != nil { return err } - if string(body) != "true" { - return fmt.Errorf("finish upload failed: status != true") + if string(body) != "true" || bytes.Contains(body, []byte("error")) { + return fmt.Errorf("finish upload failed: %s", body) } return nil } @@ -412,7 +358,7 @@ func (b cowTransfer) getSendConfig(totalSize int64) (*prepareSendResp, error) { return config, nil } -func (b cowTransfer) getUploadConfig(name string, size int64, config prepareSendResp) (*prepareSendResp, error) { +func (b cowTransfer) getUploadConfig(name string, size int64, config prepareSendResp) (*initResp, error) { if apis.DebugMode { log.Println("retrieving upload config...") @@ -440,13 +386,43 @@ func (b cowTransfer) getUploadConfig(name string, size int64, config prepareSend var beforeResp *beforeSendResp if err = json.Unmarshal(resp, &beforeResp); err != nil { return nil, err - } else { - config.FileGUID = beforeResp.FileGuid } - return &config, nil + config.FileGUID = beforeResp.FileGuid + + data = map[string]string{ + "transferGuid": config.TransferGUID, + "storagePrefix": config.Prefix, + } + p, err := json.Marshal(data) + if err != nil { + return nil, err + } + w := utils.URLSafeEncode(fmt.Sprintf("%s/%s/%s", config.Prefix, config.TransferGUID, name)) + inits := fmt.Sprintf(initUpload, w) + resp, err = newRequest(inits, bytes.NewReader(p), requestConfig{ + debug: apis.DebugMode, + action: "POST", + //retry: 0, + timeout: time.Duration(b.Config.interval) * time.Second, + modifier: addToken(config.UploadToken), + }) + if err != nil { + return nil, err + } + var initResp *initResp + if err = json.Unmarshal(resp, &initResp); err != nil { + return nil, err + } + initResp.Token = config.UploadToken + initResp.EncodeID = w + initResp.TransferGUID = config.TransferGUID + initResp.FileGUID = config.FileGUID + + // return config, nil + return initResp, nil } -func newPostRequest(link string, postBody io.Reader, config requestConfig) ([]byte, error) { +func newRequest(link string, postBody io.Reader, config requestConfig) ([]byte, error) { if config.debug { //if config.retry != 0 { // log.Printf("retrying: %v", config.retry) @@ -457,7 +433,7 @@ func newPostRequest(link string, postBody io.Reader, config requestConfig) ([]by if config.timeout != 0 { client = http.Client{Timeout: config.timeout} } - req, err := http.NewRequest("POST", link, postBody) + req, err := http.NewRequest(config.action, link, postBody) if err != nil { if config.debug { log.Printf("build requests returns error: %v", err) diff --git a/go.mod b/go.mod index bfd096d..4f1c582 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,13 @@ module transfer go 1.14 require ( - github.com/cheggaaa/pb/v3 v3.0.5 + github.com/cheggaaa/pb/v3 v3.0.6 github.com/fatih/color v1.10.0 // indirect github.com/google/uuid v1.2.0 github.com/mattn/go-runewidth v0.0.10 // indirect github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 github.com/rivo/uniseg v0.2.0 // indirect - github.com/spf13/cobra v1.1.1 + github.com/spf13/cobra v1.1.3 golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 - golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect + golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 // indirect ) diff --git a/go.sum b/go.sum index 8cc95c6..053eaad 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJm github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cheggaaa/pb/v3 v3.0.5 h1:lmZOti7CraK9RSjzExsY53+WWfub9Qv13B5m4ptEoPE= github.com/cheggaaa/pb/v3 v3.0.5/go.mod h1:X1L61/+36nz9bjIsrDU52qHKOQukUQe2Ge+YvGuquCw= +github.com/cheggaaa/pb/v3 v3.0.6 h1:ULPm1wpzvj60FvmCrX7bIaB80UgbhI+zSaQJKRfCbAs= +github.com/cheggaaa/pb/v3 v3.0.6/go.mod h1:X1L61/+36nz9bjIsrDU52qHKOQukUQe2Ge+YvGuquCw= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= @@ -175,6 +177,8 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v1.1.1 h1:KfztREH0tPxJJ+geloSLaAkaPkr4ki2Er5quFV1TDo4= github.com/spf13/cobra v1.1.1/go.mod h1:WnodtKOvamDL/PwE2M4iKs8aMDBZ5Q5klgD3qfVJQMI= +github.com/spf13/cobra v1.1.3 h1:xghbfqPkxzxP3C/f3n5DdpAbdKLj4ZE4BWQI362l53M= +github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= @@ -260,6 +264,8 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY= +golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -314,6 +320,7 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=