Skip to content

Commit

Permalink
fix #29
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikubill committed Mar 13, 2021
1 parent ade6af7 commit 0ffb403
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 115 deletions.
5 changes: 3 additions & 2 deletions apis/public/cowtransfer/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package cowtransfer

import (
"fmt"
"github.com/spf13/cobra"
"transfer/apis"
"transfer/utils"

"github.com/spf13/cobra"
)

var (
Expand All @@ -21,7 +22,7 @@ type cowTransfer struct {
func (b *cowTransfer) SetArgs(cmd *cobra.Command) {
cmd.Flags().IntVarP(&b.Config.Parallel, "parallel", "p", 2, "Set the number of upload threads")
cmd.Flags().StringVarP(&b.Config.token, "cookie", "c", "", "Your user cookie (optional)")
cmd.Flags().IntVarP(&b.Config.blockSize, "block", "", 262144, "Upload block size")
cmd.Flags().IntVarP(&b.Config.blockSize, "block", "", 1200000, "Upload block size")
cmd.Flags().IntVarP(&b.Config.interval, "timeout", "t", 10, "Request retry/timeout limit in second")
cmd.Flags().BoolVarP(&b.Config.singleMode, "single", "s", false, "Upload multi files in a single link")
cmd.Flags().BoolVarP(&b.Config.hashCheck, "hash", "", false, "Check hash after block upload")
Expand Down
31 changes: 27 additions & 4 deletions apis/public/cowtransfer/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
)

type requestConfig struct {
debug bool
debug bool
action string
//retry int
timeout time.Duration
modifier func(r *http.Request)
Expand All @@ -24,7 +25,7 @@ type uploadPart struct {

type uploadConfig struct {
wg *sync.WaitGroup
token string
config *initResp
hashMap *cmap.ConcurrentMap
}

Expand All @@ -38,6 +39,15 @@ type cowOptions struct {
passCode string
}

type initResp struct {
Token string
TransferGUID string
FileGUID string
EncodeID string
Exp int64 `json:"expireAt"`
ID string `json:"uploadId"`
}

type prepareSendResp struct {
UploadToken string `json:"uptoken"`
TransferGUID string `json:"transferguid"`
Expand All @@ -54,8 +64,21 @@ type beforeSendResp struct {
}

type uploadResponse struct {
Ticket string `json:"ctx"`
Hash int64 `json:"crc32"`
Etag string `json:"etag"`
MD5 string `json:"md5"`
}

type slek struct {
ETag string `json:"etag"`
Part int64 `json:"partNumber"`
}

type clds struct {
Parts []slek `json:"parts"`
FName string `json:"fname"`
Mimetype string `json:"mimeType"`
Metadata map[string]string
Vars map[string]string
}

type finishResponse struct {
Expand Down
188 changes: 82 additions & 106 deletions apis/public/cowtransfer/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@ package cowtransfer

import (
"bytes"
"crypto/md5"
"encoding/json"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"log"
"math"
"mime/multipart"
"net/http"
"strconv"
"strings"
"sync"
"time"
"transfer/apis"
Expand All @@ -23,15 +21,14 @@ import (
)

const (
prepareSend = "https://cowtransfer.com/transfer/preparesend"
setPassword = "https://cowtransfer.com/transfer/v2/bindpasscode"
beforeUpload = "https://cowtransfer.com/transfer/beforeupload"
uploadInitEndpoint = "https://upload.qiniup.com/mkblk/%d"
uploadEndpoint = "https://upload.qiniup.com/bput/%s/%d"
uploadFinish = "https://cowtransfer.com/transfer/uploaded"
uploadComplete = "https://cowtransfer.com/transfer/complete"
uploadMergeFile = "https://upload.qiniup.com/mkfile/%s/key/%s/fname/%s"
block = 4194304
prepareSend = "https://cowtransfer.com/transfer/preparesend"
setPassword = "https://cowtransfer.com/transfer/v2/bindpasscode"
beforeUpload = "https://cowtransfer.com/transfer/beforeupload"
uploadFinish = "https://cowtransfer.com/transfer/uploaded"
uploadComplete = "https://cowtransfer.com/transfer/complete"
initUpload = "https://upload-fog-cn-east-1.qiniup.com/buckets/cowtransfer-yz/objects/%s/uploads"
doUpload = "https://upload-fog-cn-east-1.qiniup.com/buckets/cowtransfer-yz/objects/%s/uploads/%s/%d"
finUpload = "https://upload-fog-cn-east-1.qiniup.com/buckets/cowtransfer-yz/objects/%s/uploads/%s"
)

func (b *cowTransfer) InitUpload(_ []string, sizes []int64) error {
Expand Down Expand Up @@ -87,15 +84,18 @@ func (b cowTransfer) DoUpload(name string, size int64, file io.Reader) error {
for i := 0; i < b.Config.Parallel; i++ {
go b.uploader(&ch, uploadConfig{
wg: wg,
token: config.UploadToken,
config: config,
hashMap: &hashMap,
})
}

if b.Config.blockSize < 1200000 {
b.Config.blockSize = 1200000
}
part := int64(0)
for {
part++
buf := make([]byte, block)
buf := make([]byte, b.Config.blockSize)
nr, err := io.ReadFull(file, buf)
if nr <= 0 {
break
Expand Down Expand Up @@ -127,75 +127,17 @@ func (b cowTransfer) DoUpload(name string, size int64, file io.Reader) error {
func (b cowTransfer) uploader(ch *chan *uploadPart, conf uploadConfig) {
for item := range *ch {
Start:
postURL := fmt.Sprintf(uploadInitEndpoint, len(item.content))
postURL := fmt.Sprintf(doUpload, conf.config.EncodeID, conf.config.ID, item.count)
if apis.DebugMode {
log.Printf("part %d start uploading, size: %d", item.count, len(item.content))
log.Printf("part %d posting %s", item.count, postURL)
}

// makeBlock
body, err := newPostRequest(postURL, nil, requestConfig{
debug: apis.DebugMode,
//retry: 0,
timeout: time.Duration(b.Config.interval) * time.Second,
modifier: addToken(conf.token),
})
if err != nil {
if apis.DebugMode {
log.Printf("failed make mkblk on part %d, error: %s (retrying)",
item.count, err)
}
if apis.DebugMode {
log.Printf("part %d retrying", item.count)
}
goto Start
}
var rBody uploadResponse
if err := json.Unmarshal(body, &rBody); err != nil {
if apis.DebugMode {
log.Printf("failed make mkblk on part %d error: %v, returns: %s (retrying)",
item.count, string(body), strings.ReplaceAll(err.Error(), "\n", ""))
}
if apis.DebugMode {
log.Printf("part %d retrying", item.count)
}
goto Start
}

//blockPut
failFlag := false
blockCount := int(math.Ceil(float64(len(item.content)) / float64(b.Config.blockSize)))
if apis.DebugMode {
log.Printf("init: part %d block %d ", item.count, blockCount)
}
ticket := rBody.Ticket
for i := 0; i < blockCount; i++ {
start := i * b.Config.blockSize
end := (i + 1) * b.Config.blockSize
var buf []byte
if end > len(item.content) {
end = len(item.content)
}
buf = item.content[start:end]
if apis.DebugMode {
log.Printf("part %d block %d %d - [%d:%d] start upload...", item.count, i, len(buf), start, end)
}
postURL = fmt.Sprintf(uploadEndpoint, ticket, start)
ticket, err = b.blockPut(postURL, buf, conf.token)
if err != nil {
if apis.DebugMode {
log.Printf("part %d block %d failed. error: %s", item.count, i, err)
}
failFlag = true
break
}
if item.bar != nil {
item.bar.Add(len(buf))
}
}
if failFlag {
ticket, err := b.blockPut(postURL, item.content, conf.config.Token)
if err != nil {
if apis.DebugMode {
log.Printf("part %d retrying", item.count)
log.Printf("part %d failed. error: %s", item.count, err)
}
goto Start
}
Expand All @@ -205,15 +147,20 @@ func (b cowTransfer) uploader(ch *chan *uploadPart, conf uploadConfig) {
}
conf.hashMap.Set(strconv.FormatInt(item.count, 10), ticket)
conf.wg.Done()
if item.bar != nil {
item.bar.Add(len(item.content))
}
}

}

func (b cowTransfer) blockPut(postURL string, buf []byte, token string) (string, error) {
data := new(bytes.Buffer)
data.Write(buf)
body, err := newPostRequest(postURL, data, requestConfig{
debug: apis.DebugMode,
body, err := newRequest(postURL, data, requestConfig{
debug: apis.DebugMode,
action: "PUT",

//retry: 0,
timeout: time.Duration(b.Config.interval) * time.Second,
modifier: addToken(token),
Expand All @@ -238,7 +185,7 @@ func (b cowTransfer) blockPut(postURL string, buf []byte, token string) (string,
//return b.blockPut(postURL, buf, token, retry+1)
}
if b.Config.hashCheck {
if hashBlock(buf) != rBody.Hash {
if hashBlock(buf) != rBody.MD5 {
if apis.DebugMode {
log.Printf("block hashcheck failed (retrying)")
}
Expand All @@ -248,45 +195,44 @@ func (b cowTransfer) blockPut(postURL string, buf []byte, token string) (string,
//return b.blockPut(postURL, buf, token, retry+1)
}
}
return rBody.Ticket, nil
return rBody.Etag, nil
}

func hashBlock(buf []byte) int64 {
return int64(crc32.ChecksumIEEE(buf))
func hashBlock(buf []byte) string {
return fmt.Sprintf("%x", md5.Sum(buf))
}

func (b cowTransfer) finishUpload(config *prepareSendResp, name string, size int64, hashMap *cmap.ConcurrentMap, limit int64) error {
func (b cowTransfer) finishUpload(config *initResp, name string, size int64, hashMap *cmap.ConcurrentMap, limit int64) error {
if apis.DebugMode {
log.Println("finishing upload...")
log.Println("step1 -> api/mergeFile")
}
filename := utils.URLSafeEncode(name)
fileLocate := utils.URLSafeEncode(fmt.Sprintf("%s/%s/%s", config.Prefix, config.TransferGUID, name))
mergeFileURL := fmt.Sprintf(uploadMergeFile, strconv.FormatInt(size, 10), fileLocate, filename)
postBody := ""
cal := 0
mergeFileURL := fmt.Sprintf(finUpload, config.EncodeID, config.ID)
var postData clds
for i := int64(1); i <= limit; i++ {
item, alimasu := hashMap.Get(strconv.FormatInt(i, 10))
if alimasu {
postBody += item.(string) + ","
postData.Parts = append(postData.Parts, slek{
ETag: item.(string),
Part: i,
})
}
cal++
}
if apis.DebugMode {
log.Println("resource status: ", cal)
}
if strings.HasSuffix(postBody, ",") {
postBody = postBody[:len(postBody)-1]
postData.FName = name
postBody, err := json.Marshal(postData)
if err != nil {
return err
}
if apis.DebugMode {
log.Printf("merge payload: %s\n", postBody)
}
reader := bytes.NewReader([]byte(postBody))
resp, err := newPostRequest(mergeFileURL, reader, requestConfig{
debug: apis.DebugMode,
resp, err := newRequest(mergeFileURL, reader, requestConfig{
debug: apis.DebugMode,
action: "POST",
//retry: 0,
timeout: time.Duration(b.Config.interval) * time.Second,
modifier: addToken(config.UploadToken),
modifier: addToken(config.Token),
})
if err != nil {
return err
Expand Down Expand Up @@ -315,8 +261,8 @@ func (b cowTransfer) finishUpload(config *prepareSendResp, name string, size int
if err != nil {
return err
}
if string(body) != "true" {
return fmt.Errorf("finish upload failed: status != true")
if string(body) != "true" || bytes.Contains(body, []byte("error")) {
return fmt.Errorf("finish upload failed: %s", body)
}
return nil
}
Expand Down Expand Up @@ -412,7 +358,7 @@ func (b cowTransfer) getSendConfig(totalSize int64) (*prepareSendResp, error) {
return config, nil
}

func (b cowTransfer) getUploadConfig(name string, size int64, config prepareSendResp) (*prepareSendResp, error) {
func (b cowTransfer) getUploadConfig(name string, size int64, config prepareSendResp) (*initResp, error) {

if apis.DebugMode {
log.Println("retrieving upload config...")
Expand Down Expand Up @@ -440,13 +386,43 @@ func (b cowTransfer) getUploadConfig(name string, size int64, config prepareSend
var beforeResp *beforeSendResp
if err = json.Unmarshal(resp, &beforeResp); err != nil {
return nil, err
} else {
config.FileGUID = beforeResp.FileGuid
}
return &config, nil
config.FileGUID = beforeResp.FileGuid

data = map[string]string{
"transferGuid": config.TransferGUID,
"storagePrefix": config.Prefix,
}
p, err := json.Marshal(data)
if err != nil {
return nil, err
}
w := utils.URLSafeEncode(fmt.Sprintf("%s/%s/%s", config.Prefix, config.TransferGUID, name))
inits := fmt.Sprintf(initUpload, w)
resp, err = newRequest(inits, bytes.NewReader(p), requestConfig{
debug: apis.DebugMode,
action: "POST",
//retry: 0,
timeout: time.Duration(b.Config.interval) * time.Second,
modifier: addToken(config.UploadToken),
})
if err != nil {
return nil, err
}
var initResp *initResp
if err = json.Unmarshal(resp, &initResp); err != nil {
return nil, err
}
initResp.Token = config.UploadToken
initResp.EncodeID = w
initResp.TransferGUID = config.TransferGUID
initResp.FileGUID = config.FileGUID

// return config, nil
return initResp, nil
}

func newPostRequest(link string, postBody io.Reader, config requestConfig) ([]byte, error) {
func newRequest(link string, postBody io.Reader, config requestConfig) ([]byte, error) {
if config.debug {
//if config.retry != 0 {
// log.Printf("retrying: %v", config.retry)
Expand All @@ -457,7 +433,7 @@ func newPostRequest(link string, postBody io.Reader, config requestConfig) ([]by
if config.timeout != 0 {
client = http.Client{Timeout: config.timeout}
}
req, err := http.NewRequest("POST", link, postBody)
req, err := http.NewRequest(config.action, link, postBody)
if err != nil {
if config.debug {
log.Printf("build requests returns error: %v", err)
Expand Down
Loading

0 comments on commit 0ffb403

Please sign in to comment.