Skip to content

Commit

Permalink
输出下载失败的文件列表, #189
Browse files Browse the repository at this point in the history
  • Loading branch information
iikira committed May 19, 2018
1 parent f8b4325 commit 6ab5bc4
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 55 deletions.
21 changes: 20 additions & 1 deletion internal/pcscommand/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ func download(id int, downloadURL, savePath string, loadBalansers []string, clie
err = download.Execute()
close(exitChan)
if err != nil {
// 下载失败, 删去空文件
if info, infoErr := file.Stat(); infoErr == nil {
if info.Size() == 0 {
pcsCommandVerbose.Infof("[%d] remove empty file: %s\n", id, savePath)
os.Remove(savePath)
}
}
return err
}

Expand Down Expand Up @@ -213,6 +220,7 @@ func RunDownload(paths []string, option DownloadOption) {
}

var (
failedList []string
handleTaskErr = func(task *dtask, errManifest string, err error) {
if task == nil {
panic("task is nil")
Expand All @@ -235,9 +243,12 @@ func RunDownload(paths []string, option DownloadOption) {
if task.retry < task.MaxRetry {
task.retry++
dlist.PushBack(task)
} else {
failedList = append(failedList, task.path)
}
time.Sleep(3 * time.Duration(task.retry) * time.Second)
}
startTime = time.Now()
totalSize int64
)

Expand Down Expand Up @@ -385,7 +396,15 @@ func RunDownload(paths []string, option DownloadOption) {
totalSize += task.downloadInfo.Size
}

fmt.Printf("任务结束, 数据总量: %s\n", converter.ConvertFileSize(totalSize))
fmt.Printf("任务结束, 时间: %s, 数据总量: %s\n", time.Since(startTime), converter.ConvertFileSize(totalSize))
if len(failedList) != 0 {
fmt.Printf("以下文件下载失败: \n")
tb := pcstable.NewTable(os.Stdout)
for k := range failedList {
tb.Append([]string{strconv.Itoa(k), failedList[k]})
}
tb.Render()
}
}

// RunLocateDownload 执行获取直链
Expand Down
29 changes: 17 additions & 12 deletions internal/pcscommand/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@ type utask struct {
savePath string
}

// SumOption 计算文件摘要值配置
type SumOption struct {
// SumConfig 计算文件摘要值配置
type SumConfig struct {
IsMD5Sum bool
IsSliceMD5Sum bool
IsCRC32Sum bool
}

// // UploadOptions 上传文件可选项
// type UploadOptions struct {
// IsRapidUpload bool
// }

// LocalPathInfo 本地文件详情
type LocalPathInfo struct {
Path string // 本地路径
Expand Down Expand Up @@ -119,38 +124,38 @@ func (lp *LocalPathInfo) repeatRead(ws ...io.Writer) {
}

// Sum 计算文件摘要值
func (lp *LocalPathInfo) Sum(opt SumOption) {
func (lp *LocalPathInfo) Sum(cfg SumConfig) {
var (
md5w hash.Hash
crc32w hash.Hash32
)

ws := make([]io.Writer, 0, 2)
if opt.IsMD5Sum {
if cfg.IsMD5Sum {
md5w = md5.New()
ws = append(ws, md5w)
}
if opt.IsCRC32Sum {
if cfg.IsCRC32Sum {
crc32w = crc32.NewIEEE()
ws = append(ws, crc32w)
}
if opt.IsSliceMD5Sum {
if cfg.IsSliceMD5Sum {
lp.SliceMD5Sum()
}

lp.repeatRead(ws...)

if opt.IsMD5Sum {
if cfg.IsMD5Sum {
lp.MD5 = md5w.Sum(nil)
}
if opt.IsCRC32Sum {
if cfg.IsCRC32Sum {
lp.CRC32 = crc32w.Sum32()
}
}

// Md5Sum 获取文件的 md5 值
func (lp *LocalPathInfo) Md5Sum() {
lp.Sum(SumOption{
lp.Sum(SumConfig{
IsMD5Sum: true,
})
}
Expand Down Expand Up @@ -184,7 +189,7 @@ md5sum:

// Crc32Sum 获取文件的 crc32 值
func (lp *LocalPathInfo) Crc32Sum() {
lp.Sum(SumOption{
lp.Sum(SumConfig{
IsCRC32Sum: true,
})
}
Expand Down Expand Up @@ -477,7 +482,7 @@ func RunUpload(localPaths []string, savePath string) {
}

// GetFileSum 获取文件的大小, md5, 前256KB切片的 md5, crc32
func GetFileSum(localPath string, opt *SumOption) (lp *LocalPathInfo, err error) {
func GetFileSum(localPath string, cfg *SumConfig) (lp *LocalPathInfo, err error) {
file, err := os.Open(localPath)
if err != nil {
return nil, err
Expand All @@ -499,7 +504,7 @@ func GetFileSum(localPath string, opt *SumOption) (lp *LocalPathInfo, err error)
Length: fileStat.Size(),
}

lp.Sum(*opt)
lp.Sum(*cfg)

return lp, nil
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ func main() {
}

for k, filePath := range c.Args() {
lp, err := pcscommand.GetFileSum(filePath, &pcscommand.SumOption{
lp, err := pcscommand.GetFileSum(filePath, &pcscommand.SumConfig{
IsMD5Sum: true,
IsCRC32Sum: true,
IsSliceMD5Sum: true,
Expand Down
95 changes: 54 additions & 41 deletions requester/downloader/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,66 +333,38 @@ func (mt *Monitor) Execute(cancelCtx context.Context) {
// 速度减慢或者全部失败, 开始监控
isLeftWorkersAllFailed := mt.IsLeftWorkersAllFailed()
if mt.status.SpeedsPerSecond() < mt.status.MaxSpeeds()/20 || isLeftWorkersAllFailed {
pcsverbose.Verbosef("DEBUG: monitor: start reload.\n")
if isLeftWorkersAllFailed {
pcsverbose.Verbosef("DEBUG: monitor: All workers failed\n")
}
mt.status.ResetMaxSpeeds() //清空统计

for _, worker := range mt.workers {
// 重设长时间无响应, 和下载速度为 0 的线程
go func(worker *Worker) {
if atomic.LoadInt32(&reloadNum) > maxReloadNumFunc() { //达到最大重载次数
return
}

if worker.Completed() {
return
}

// 忽略正在写入数据到硬盘的
// 过滤速度有变化的线程
status := worker.GetStatus()
speeds := worker.GetSpeedsPerSecond()
if speeds != 0 {
return
}

switch status.StatusCode() {
case StatusCodePending, StatusCodeReseted:
fallthrough
case StatusCodeWaitToWrite: // 正在写入数据
fallthrough
case StatusCodePaused: // 已暂停
// 忽略, 返回
return
}

atomic.AddInt32(&reloadNum, 1) // 增加重载次数

// 重设连接
pcsverbose.Verbosef("MONITER: worker reload, worker id: %d\n", worker.ID())
worker.Reset()
}(worker)
}
// 先进行动态分配线程

//下载快完成了, 动态分配线程
if float64(mt.status.Downloaded()) > float64(mt.status.TotalSize())*0.7 {
pcsverbose.Verbosef("DEBUG: monitor: start duplicate.\n")

var (
dwg = sync.WaitGroup{}
dupNum int32
)
for k := range mt.workers {
if mt.workers[k] == nil {
continue
}
//动态分配线程

dwg.Add(1)
go func(worker *Worker) {
//过滤速度为0的worker
if worker.GetSpeedsPerSecond() == 0 {
return
}
defer dwg.Done()

mt.dymanicMu.Lock()
defer mt.dymanicMu.Unlock()

if atomic.LoadInt32(&dupNum) >= 32 {
return
}

// 筛选空闲的Worker
avaliableWorker := mt.GetAvaliableWorker()
if avaliableWorker == nil || worker == avaliableWorker { // 没有空的
Expand All @@ -408,6 +380,8 @@ func (mt *Monitor) Execute(cancelCtx context.Context) {
return
}

atomic.AddInt32(&dupNum, 1)

// 折半

avaliableWorkerRange := avaliableWorker.GetRange()
Expand All @@ -423,7 +397,46 @@ func (mt *Monitor) Execute(cancelCtx context.Context) {
time.Sleep(10 * time.Microsecond)
}(mt.workers[k])
} //end for
dwg.Wait()
} // end if 1

// 重设长时间无响应, 和下载速度为 0 的线程
pcsverbose.Verbosef("DEBUG: monitor: start reload.\n")
for _, worker := range mt.workers {
go func(worker *Worker) {
if atomic.LoadInt32(&reloadNum) > maxReloadNumFunc() { //达到最大重载次数
return
}

if worker.Completed() {
return
}

// 忽略正在写入数据到硬盘的
// 过滤速度有变化的线程
status := worker.GetStatus()
speeds := worker.GetSpeedsPerSecond()
if speeds != 0 {
return
}

switch status.StatusCode() {
case StatusCodePending, StatusCodeReseted:
fallthrough
case StatusCodeWaitToWrite: // 正在写入数据
fallthrough
case StatusCodePaused: // 已暂停
// 忽略, 返回
return
}

atomic.AddInt32(&reloadNum, 1) // 增加重载次数

// 重设连接
pcsverbose.Verbosef("MONITER: worker reload, worker id: %d\n", worker.ID())
worker.Reset()
}(worker)
} // end for
} // end if 2
} //end select
} //end for
Expand Down

0 comments on commit 6ab5bc4

Please sign in to comment.