diff --git a/rbdeal/deal_repair.go b/rbdeal/deal_repair.go index e6d3a78..f44a092 100644 --- a/rbdeal/deal_repair.go +++ b/rbdeal/deal_repair.go @@ -7,7 +7,6 @@ import ( "github.com/lotus-web3/ribs/ributil" "golang.org/x/xerrors" "io" - "net/http" "os" "path" "path/filepath" @@ -198,63 +197,11 @@ func (r *ribs) fetchGroupHttp(ctx context.Context, workerID int, group ribs2.Gro log.Errorw("attempting http repair retrieval", "url", reqUrl.String(), "group", group, "provider", candidate.Provider) - // custom http client allowing for conn deadlines - - /*dialer := &net.Dialer{ - Timeout: 20 * time.Second, - } - - var nc net.Conn - - // Create a custom HTTP client - client := &http.Client{ - Transport: &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - if nc != nil { - return nil, xerrors.Errorf("one connection already made") - } - - conn, err := dialer.DialContext(ctx, network, addr) - if err != nil { - return nil, err - } - - nc = conn - - // Set a deadline for the whole operation, including reading the response - if err := conn.SetDeadline(time.Now().Add(30 * time.Second)); err != nil { - return nil, xerrors.Errorf("set deadline: %w", err) - } - - return conn, nil - }, - }, - }*/ - // make the request!! - req, err := http.NewRequestWithContext(ctx, "GET", reqUrl.String(), nil) - if err != nil { - //return xerrors.Errorf("new request: %w", err) - log.Errorw("failed to create request", "err", err, "provider", candidate.Provider) - continue - } - - // set content length to gm.CarSize - req.Header.Set("Content-Length", fmt.Sprintf("%d", gm.CarSize)) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - //return xerrors.Errorf("do request: %w", err) - log.Errorw("failed to do request", "err", err, "provider", candidate.Provider) - continue - } - - if resp.StatusCode != 200 { - //return xerrors.Errorf("http status: %d", resp.StatusCode) - log.Errorw("http status", "status", resp.StatusCode, "provider", candidate.Provider) - continue - } + rc := ributil.RobustGet(reqUrl.String(), gm.CarSize, func() *ributil.RateCounter { + return r.repairFetchCounters.Get(group) + }) r.updateRepairStats(workerID, func(r *ribs2.RepairJob) { r.FetchProgress = 0 @@ -291,23 +238,23 @@ func (r *ribs) fetchGroupHttp(ctx context.Context, workerID int, group ribs2.Gro }() cc := new(ributil.DataCidWriter) - commdReader := io.TeeReader(resp.Body, cc) + commdReader := io.TeeReader(rc, cc) _, err = io.Copy(f, commdReader) done() if err != nil { _ = f.Close() _ = os.Remove(groupFile) - _ = resp.Body.Close() + _ = rc.Close() return xerrors.Errorf("copy response body: %w", err) } if err := f.Close(); err != nil { - _ = resp.Body.Close() + _ = rc.Close() return xerrors.Errorf("close group file: %w", err) } - if err := resp.Body.Close(); err != nil { + if err := rc.Close(); err != nil { return xerrors.Errorf("close response body: %w", err) } diff --git a/ributil/robusthttp.go b/ributil/robusthttp.go index 72687ec..ea0aabd 100644 --- a/ributil/robusthttp.go +++ b/ributil/robusthttp.go @@ -55,6 +55,7 @@ func (r *robustHttpResponse) Read(p []byte) (n int, err error) { continue } if n == 0 { + r.curCloser.Close() return 0, xerrors.Errorf("read 0 bytes") } @@ -136,13 +137,23 @@ func (r *robustHttpResponse) startReq() error { rw := NewRateEnforcingReader(dlRead, rc, reqTxIdleTimeout) r.cur = rw - r.curCloser = resp.Body + r.curCloser = funcCloser(func() error { + rc.release() + return req.Body.Close() + }) return nil } -func RobustGet(url string, dataSize int64) io.ReadCloser { +type funcCloser func() error + +func (fc funcCloser) Close() error { + return fc() +} + +func RobustGet(url string, dataSize int64, rcf func() *RateCounter) io.ReadCloser { return &robustHttpResponse{ + getRC: rcf, url: url, dataSize: dataSize, }