Skip to content

Commit

Permalink
wire up robusthttp
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Nov 12, 2023
1 parent 0ba2a7b commit 926acc7
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 62 deletions.
67 changes: 7 additions & 60 deletions rbdeal/deal_repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/lotus-web3/ribs/ributil"
"golang.org/x/xerrors"
"io"
"net/http"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -198,63 +197,11 @@ func (r *ribs) fetchGroupHttp(ctx context.Context, workerID int, group ribs2.Gro

log.Errorw("attempting http repair retrieval", "url", reqUrl.String(), "group", group, "provider", candidate.Provider)

// custom http client allowing for conn deadlines

/*dialer := &net.Dialer{
Timeout: 20 * time.Second,
}
var nc net.Conn
// Create a custom HTTP client
client := &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
if nc != nil {
return nil, xerrors.Errorf("one connection already made")
}
conn, err := dialer.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
nc = conn
// Set a deadline for the whole operation, including reading the response
if err := conn.SetDeadline(time.Now().Add(30 * time.Second)); err != nil {
return nil, xerrors.Errorf("set deadline: %w", err)
}
return conn, nil
},
},
}*/

// make the request!!

req, err := http.NewRequestWithContext(ctx, "GET", reqUrl.String(), nil)
if err != nil {
//return xerrors.Errorf("new request: %w", err)
log.Errorw("failed to create request", "err", err, "provider", candidate.Provider)
continue
}

// set content length to gm.CarSize
req.Header.Set("Content-Length", fmt.Sprintf("%d", gm.CarSize))

resp, err := http.DefaultClient.Do(req)
if err != nil {
//return xerrors.Errorf("do request: %w", err)
log.Errorw("failed to do request", "err", err, "provider", candidate.Provider)
continue
}

if resp.StatusCode != 200 {
//return xerrors.Errorf("http status: %d", resp.StatusCode)
log.Errorw("http status", "status", resp.StatusCode, "provider", candidate.Provider)
continue
}
rc := ributil.RobustGet(reqUrl.String(), gm.CarSize, func() *ributil.RateCounter {
return r.repairFetchCounters.Get(group)
})

r.updateRepairStats(workerID, func(r *ribs2.RepairJob) {
r.FetchProgress = 0
Expand Down Expand Up @@ -291,23 +238,23 @@ func (r *ribs) fetchGroupHttp(ctx context.Context, workerID int, group ribs2.Gro
}()

cc := new(ributil.DataCidWriter)
commdReader := io.TeeReader(resp.Body, cc)
commdReader := io.TeeReader(rc, cc)

_, err = io.Copy(f, commdReader)
done()
if err != nil {
_ = f.Close()
_ = os.Remove(groupFile)
_ = resp.Body.Close()
_ = rc.Close()
return xerrors.Errorf("copy response body: %w", err)
}

if err := f.Close(); err != nil {
_ = resp.Body.Close()
_ = rc.Close()
return xerrors.Errorf("close group file: %w", err)
}

if err := resp.Body.Close(); err != nil {
if err := rc.Close(); err != nil {
return xerrors.Errorf("close response body: %w", err)
}

Expand Down
15 changes: 13 additions & 2 deletions ributil/robusthttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (r *robustHttpResponse) Read(p []byte) (n int, err error) {
continue
}
if n == 0 {
r.curCloser.Close()
return 0, xerrors.Errorf("read 0 bytes")
}

Expand Down Expand Up @@ -136,13 +137,23 @@ func (r *robustHttpResponse) startReq() error {
rw := NewRateEnforcingReader(dlRead, rc, reqTxIdleTimeout)

r.cur = rw
r.curCloser = resp.Body
r.curCloser = funcCloser(func() error {
rc.release()
return req.Body.Close()
})

return nil
}

func RobustGet(url string, dataSize int64) io.ReadCloser {
type funcCloser func() error

func (fc funcCloser) Close() error {
return fc()
}

func RobustGet(url string, dataSize int64, rcf func() *RateCounter) io.ReadCloser {
return &robustHttpResponse{
getRC: rcf,
url: url,
dataSize: dataSize,
}
Expand Down

0 comments on commit 926acc7

Please sign in to comment.