From da34d4128728ee01bdab8532102091237957cadf Mon Sep 17 00:00:00 2001
From: Rod Vagg
Date: Fri, 28 Apr 2023 14:21:40 +1000
Subject: [PATCH] feat(http): refactor http & graphsync specific pieces to "TransportProtocol" iface

---
 pkg/retriever/graphsyncretriever.go    | 266 ++++++++++++++
 pkg/retriever/httpretriever.go         | 177 ++++++++++
 pkg/retriever/httputils.go             | 354 -------------------
 pkg/retriever/parallelpeerretriever.go | 462 ++++++-------------------
 4 files changed, 556 insertions(+), 703 deletions(-)
 create mode 100644 pkg/retriever/graphsyncretriever.go
 create mode 100644 pkg/retriever/httpretriever.go
 delete mode 100644 pkg/retriever/httputils.go

diff --git a/pkg/retriever/graphsyncretriever.go b/pkg/retriever/graphsyncretriever.go
new file mode 100644
index 00000000..f9df71c2
--- /dev/null
+++ b/pkg/retriever/graphsyncretriever.go
@@ -0,0 +1,266 @@
+package retriever
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/benbjohnson/clock"
+	datatransfer "github.com/filecoin-project/go-data-transfer/v2"
+	retrievaltypes "github.com/filecoin-project/go-retrieval-types"
+	"github.com/filecoin-project/go-state-types/big"
+	"github.com/filecoin-project/lassie/pkg/events"
+	"github.com/filecoin-project/lassie/pkg/types"
+	"github.com/ipfs/go-cid"
+	"github.com/ipld/go-ipld-prime"
+	"github.com/ipld/go-ipld-prime/codec/dagjson"
+	selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
+	"github.com/ipni/go-libipni/metadata"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/multiformats/go-multicodec"
+	"go.uber.org/multierr"
+)
+
+type GraphsyncClient interface {
+	Connect(ctx context.Context, peerAddr peer.AddrInfo) error
+	RetrieveFromPeer(
+		ctx context.Context,
+		linkSystem ipld.LinkSystem,
+		peerID peer.ID,
+		proposal *retrievaltypes.DealProposal,
+		selector ipld.Node,
+		maxLinks uint64,
+		eventsCallback datatransfer.Subscriber,
+		gracefulShutdownRequested <-chan struct{},
+	) (*types.RetrievalStats, error)
+}
+
+var _ TransportProtocol = &ProtocolGraphsync{}
+
+type ProtocolGraphsync struct {
+	Client GraphsyncClient
+}
+
+// NewGraphsyncRetriever makes a new CandidateRetriever for Graphsync retrievals
+// (transport-graphsync-filecoinv1).
+func NewGraphsyncRetriever(getStorageProviderTimeout GetStorageProviderTimeout, client GraphsyncClient) types.CandidateRetriever {
+	return &parallelPeerRetriever{
+		Protocol: &ProtocolGraphsync{
+			Client: client,
+		},
+		GetStorageProviderTimeout: getStorageProviderTimeout,
+		Clock:                     clock.New(),
+		QueueInitialPause:         2 * time.Millisecond,
+	}
+}
+
+func (pg ProtocolGraphsync) Code() multicodec.Code {
+	return multicodec.TransportGraphsyncFilecoinv1
+}
+
+func (pg ProtocolGraphsync) GetMergedMetadata(cid cid.Cid, currentMetadata, newMetadata metadata.Protocol) metadata.Protocol {
+	gsNewMetadata, ok := newMetadata.(*metadata.GraphsyncFilecoinV1)
+	if !ok {
+		gsNewMetadata = &metadata.GraphsyncFilecoinV1{PieceCID: cid}
+	}
+	if currentMetadata != nil { // seen this candidate before
+		if !ok {
+			return currentMetadata
+		}
+		gsCurrentMetadata := currentMetadata.(*metadata.GraphsyncFilecoinV1)
+		if !graphsyncMetadataCompare(gsNewMetadata, gsCurrentMetadata, false) {
+			return currentMetadata // old one is better
+		}
+	}
+	return gsNewMetadata
+}
+
+// graphsyncMetadataCompare compares two metadata.GraphsyncFilecoinV1s and
+// returns true if the first is preferable to the second.
+func graphsyncMetadataCompare(a, b *metadata.GraphsyncFilecoinV1, defaultValue bool) bool {
+	// prioritize verified deals over not verified deals
+	if a.VerifiedDeal != b.VerifiedDeal {
+		return a.VerifiedDeal
+	}
+
+	// prioritize fast retrieval over not fast retrieval
+	if a.FastRetrieval != b.FastRetrieval {
+		return a.FastRetrieval
+	}
+
+	return defaultValue
+}
+
+func (pg ProtocolGraphsync) CompareCandidates(a, b connectCandidate, mda, mdb metadata.Protocol) bool {
+	gsmda := mda.(*metadata.GraphsyncFilecoinV1)
+	gsmdb := mdb.(*metadata.GraphsyncFilecoinV1)
+	return graphsyncMetadataCompare(gsmda, gsmdb, a.Duration < b.Duration)
+}
+
+func (pg *ProtocolGraphsync) Connect(ctx context.Context, retrieval *retrieval, candidate types.RetrievalCandidate) error {
+	return pg.Client.Connect(ctx, candidate.MinerPeer)
+}
+
+func (pg *ProtocolGraphsync) Retrieve(
+	ctx context.Context,
+	retrieval *retrieval,
+	session *retrievalSession,
+	phaseStartTime time.Time,
+	timeout time.Duration,
+	candidate types.RetrievalCandidate,
+) (*types.RetrievalStats, error) {
+
+	eventsCallback := makeEventsCallback(
+		session,
+		retrieval.parallelPeerRetriever.Clock,
+		retrieval.request.RetrievalID,
+		phaseStartTime,
+		candidate,
+	)
+	return pg.retrievalPhase(
+		ctx,
+		retrieval,
+		timeout,
+		candidate,
+		eventsCallback,
+	)
+}
+
+func makeEventsCallback(
+	session *retrievalSession,
+	clock clock.Clock,
+	retrievalId types.RetrievalID,
+	phaseStartTime time.Time,
+	candidate types.RetrievalCandidate) datatransfer.Subscriber {
+
+	var receivedFirstByte bool
+	return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
+		switch event.Code {
+		case datatransfer.Open:
+			session.sendEvent(events.Proposed(clock.Now(), retrievalId, phaseStartTime, candidate))
+		case datatransfer.NewVoucherResult:
+			lastVoucher := channelState.LastVoucherResult()
+			resType, err := retrievaltypes.DealResponseFromNode(lastVoucher.Voucher)
+			if err != nil {
+				return
+			}
+			if resType.Status == retrievaltypes.DealStatusAccepted {
+				session.sendEvent(events.Accepted(clock.Now(), retrievalId, phaseStartTime, candidate))
+			}
+		case datatransfer.DataReceivedProgress:
+			if !receivedFirstByte {
+				receivedFirstByte = true
+				session.sendEvent(events.FirstByte(clock.Now(), retrievalId, phaseStartTime, candidate))
+			}
+		}
+	}
+}
+
+func (pg *ProtocolGraphsync) retrievalPhase(
+	ctx context.Context,
+	retrieval *retrieval,
+	timeout time.Duration,
+	candidate types.RetrievalCandidate,
+	eventsCallback datatransfer.Subscriber,
+) (*types.RetrievalStats, error) {
+
+	ss := "*"
+	selector := retrieval.request.GetSelector()
+	if !ipld.DeepEqual(selector, selectorparse.CommonSelector_ExploreAllRecursively) {
+		byts, err := ipld.Encode(selector, dagjson.Encode)
+		if err != nil {
+			return nil, err
+		}
+		ss = string(byts)
+	}
+
+	log.Infof(
+		"Attempting retrieval from SP %s for %s (with selector: [%s])",
+		candidate.MinerPeer.ID,
+		candidate.RootCid,
+		ss,
+	)
+
+	params, err := retrievaltypes.NewParamsV1(big.Zero(), 0, 0, selector, nil, big.Zero())
+	if err != nil {
+		return nil, multierr.Append(multierr.Append(ErrRetrievalFailed, ErrProposalCreationFailed), err)
+	}
+	proposal := &retrievaltypes.DealProposal{
+		PayloadCID: candidate.RootCid,
+		ID:         retrievaltypes.DealID(dealIdGen.Next()),
+		Params:     params,
+	}
+
+	retrieveCtx, retrieveCancel := context.WithCancel(ctx)
+	defer retrieveCancel()
+
+	var lastBytesReceived uint64
+	var doneLk sync.Mutex
+	var done, timedOut bool
+	var lastBytesReceivedTimer, gracefulShutdownTimer
*clock.Timer + + gracefulShutdownChan := make(chan struct{}) + + // Start the timeout tracker only if retrieval timeout isn't 0 + if timeout != 0 { + lastBytesReceivedTimer = retrieval.parallelPeerRetriever.Clock.AfterFunc(timeout, func() { + doneLk.Lock() + done = true + timedOut = true + doneLk.Unlock() + + gracefulShutdownChan <- struct{}{} + gracefulShutdownTimer = retrieval.parallelPeerRetriever.Clock.AfterFunc(1*time.Minute, retrieveCancel) + }) + } + + eventsSubscriber := func(event datatransfer.Event, channelState datatransfer.ChannelState) { + if event.Code == datatransfer.DataReceivedProgress { + if lastBytesReceivedTimer != nil { + doneLk.Lock() + if !done { + if lastBytesReceived != channelState.Received() { + lastBytesReceivedTimer.Reset(timeout) + lastBytesReceived = channelState.Received() + } + } + doneLk.Unlock() + } + } + eventsCallback(event, channelState) + } + + stats, err := pg.Client.RetrieveFromPeer( + retrieveCtx, + retrieval.request.LinkSystem, + candidate.MinerPeer.ID, + proposal, + selector, + uint64(retrieval.request.MaxBlocks), + eventsSubscriber, + gracefulShutdownChan, + ) + + if timedOut { + return nil, multierr.Append(ErrRetrievalFailed, + fmt.Errorf( + "%w after %s", + ErrRetrievalTimedOut, + timeout, + ), + ) + } + + if lastBytesReceivedTimer != nil { + lastBytesReceivedTimer.Stop() + } + if gracefulShutdownTimer != nil { + gracefulShutdownTimer.Stop() + } + + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrRetrievalFailed, err) + } + return stats, nil +} diff --git a/pkg/retriever/httpretriever.go b/pkg/retriever/httpretriever.go new file mode 100644 index 00000000..7b211105 --- /dev/null +++ b/pkg/retriever/httpretriever.go @@ -0,0 +1,177 @@ +package retriever + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/benbjohnson/clock" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lassie/pkg/types" + "github.com/ipfs/go-cid" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/linking" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipni/go-libipni/metadata" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multicodec" +) + +var ( + ErrHttpSelectorRequest = errors.New("HTTP retrieval for an explicit selector request") + ErrNoHttpForPeer = errors.New("no HTTP url for peer") + ErrBadPathForRequest = errors.New("bad path for request") +) + +var _ TransportProtocol = &ProtocolHttp{} + +type ProtocolHttp struct { + Client *http.Client + + req *http.Request + resp *http.Response +} + +// NewHttpRetriever makes a new CandidateRetriever for verified CAR HTTP +// retrievals (transport-ipfs-gateway-http). +func NewHttpRetriever(getStorageProviderTimeout GetStorageProviderTimeout, client *http.Client) types.CandidateRetriever { + return ¶llelPeerRetriever{ + Protocol: &ProtocolHttp{ + Client: client, + }, + GetStorageProviderTimeout: getStorageProviderTimeout, + Clock: clock.New(), + QueueInitialPause: 2 * time.Millisecond, + } +} + +func (ph ProtocolHttp) Code() multicodec.Code { + return multicodec.TransportIpfsGatewayHttp +} + +func (ph ProtocolHttp) GetMergedMetadata(cid cid.Cid, currentMetadata, newMetadata metadata.Protocol) metadata.Protocol { + return &metadata.IpfsGatewayHttp{} +} + +func (ph ProtocolHttp) CompareCandidates(a, b connectCandidate, mda, mdb metadata.Protocol) bool { + // we only have duration .. 
currently + return a.Duration < b.Duration +} + +func (ph *ProtocolHttp) Connect(ctx context.Context, retrieval *retrieval, candidate types.RetrievalCandidate) error { + // we currently start a full HTTP request during "connect" phase, which means + // we'll hit all candidates in parallel before proceeding to read the body + // of ones we choose, one by one, until we get success. This may not be + // optimal for servers that have to queue with a body, and may also result in + // timeouts for body reading when we fail on one and move to another. + // This may all need to move into the Retrieve() call and Connect() be a noop. + var err error + ph.req, err = makeRequest(ctx, retrieval.request, candidate) + if err == nil { + ph.resp, err = ph.Client.Do(ph.req) + } + return err +} + +func (ph *ProtocolHttp) Retrieve( + ctx context.Context, + retrieval *retrieval, + session *retrievalSession, + phaseStartTime time.Time, + timeout time.Duration, + candidate types.RetrievalCandidate, +) (*types.RetrievalStats, error) { + + defer ph.resp.Body.Close() + return readBody(candidate.RootCid, candidate.MinerPeer.ID, ph.resp.Body, retrieval.request.LinkSystem.StorageWriteOpener) +} + +func makeRequest(ctx context.Context, request types.RetrievalRequest, candidate types.RetrievalCandidate) (*http.Request, error) { + candidateURL, err := candidate.ToURL() + fmt.Println("candidateURL", candidateURL, "err", err, "candidate", candidate) + if err != nil { + log.Warnf("Couldn't construct a url for miner %s: %v", candidate.MinerPeer.ID, err) + return nil, fmt.Errorf("%w: %v", ErrNoHttpForPeer, err) + } + + path, err := request.GetUrlPath() + if err != nil { + log.Warnf("Couldn't construct a url path for request: %v", err) + return nil, fmt.Errorf("%w: %v", ErrBadPathForRequest, err) + } + + reqURL := fmt.Sprintf("%s/ipfs/%s%s", candidateURL, request.Cid, path) + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + log.Warnf("Couldn't construct a http request %s: %v", candidate.MinerPeer.ID, err) + return nil, fmt.Errorf("%w for peer %s: %v", ErrBadPathForRequest, candidate.MinerPeer.ID, err) + } + req.Header.Add("Accept", request.Scope.AcceptHeader()) + + return req, nil +} + +func readBody(rootCid cid.Cid, peerId peer.ID, body io.ReadCloser, writer linking.BlockWriteOpener) (*types.RetrievalStats, error) { + startTime := time.Now() // TODO: consider whether this should be at connection time rather than body read time + cr := &countingReader{Reader: body} + cbr, err := car.NewBlockReader(cr) + if err != nil { + return nil, err + } + ttfb := time.Since(startTime) + + var blockCount uint64 + for { + blk, err := cbr.Next() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return nil, err + } + w, d, err := writer(ipld.LinkContext{}) + if err != nil { + return nil, err + } + _, err = w.Write(blk.RawData()) + if err != nil { + return nil, err + } + err = d(cidlink.Link{Cid: blk.Cid()}) + if err != nil { + return nil, err + } + blockCount++ + } + + duration := time.Since(startTime) + speed := uint64(float64(cr.total) / duration.Seconds()) + + return &types.RetrievalStats{ + RootCid: rootCid, + StorageProviderId: peerId, + Size: cr.total, + Blocks: blockCount, + Duration: duration, + AverageSpeed: speed, + TotalPayment: big.Zero(), + NumPayments: 0, + AskPrice: big.Zero(), + TimeToFirstByte: ttfb, + }, nil +} + +type countingReader struct { + io.Reader + total uint64 +} + +func (cr *countingReader) Read(p []byte) (n int, err error) { + n, err = cr.Reader.Read(p) + cr.total += 
uint64(n) + return +} diff --git a/pkg/retriever/httputils.go b/pkg/retriever/httputils.go deleted file mode 100644 index d7341e2c..00000000 --- a/pkg/retriever/httputils.go +++ /dev/null @@ -1,354 +0,0 @@ -package retriever - -import ( - "context" - "errors" - "fmt" - "io" - "net/http" - "time" - - "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/lassie/pkg/types" - "github.com/ipfs/go-cid" - "github.com/ipld/go-car/v2" - "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/linking" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/libp2p/go-libp2p/core/peer" -) - -var ( - ErrHttpSelectorRequest = errors.New("HTTP retrieval for an explicit selector request") - ErrNoHttpForPeer = errors.New("no HTTP url for peer") - ErrBadPathForRequest = errors.New("bad path for request") - //ErrHttpConstruction = errors.New("failed to construct a HTTP request") -) - -/* -type HTTPRetriever struct { - Client *http.Client - Clock clock.Clock -} - -func NewHTTPRetriever(getStorageProviderTimeout GetStorageProviderTimeout, client *http.Client) *HTTPRetriever { - return &HTTPRetriever{ - Client: client, - Clock: clock.New(), - } -} - -type httpRetrieval struct { - *HTTPRetriever - ctx context.Context - request types.RetrievalRequest - resultChan chan retrievalResult - finishChan chan struct{} - eventsCallback func(types.RetrievalEvent) -} - -func (cfg *HTTPRetriever) Retrieve( - ctx context.Context, - retrievalRequest types.RetrievalRequest, - eventsCallback func(types.RetrievalEvent), -) types.CandidateRetrieval { - - if cfg == nil { - cfg = &HTTPRetriever{} - } - if eventsCallback == nil { - eventsCallback = func(re types.RetrievalEvent) {} - } - - // state local to this CID's retrieval - return &httpRetrieval{ - ctx: ctx, - request: retrievalRequest, - resultChan: make(chan retrievalResult), - finishChan: make(chan struct{}), - eventsCallback: eventsCallback, - } -} - -func (retrieval *httpRetrieval) RetrieveFromAsyncCandidates(asyncCandidates types.InboundAsyncCandidates) (*types.RetrievalStats, error) { - if retrieval.request.Selector != nil { // explicit selector, can't handle these here - return nil, ErrHttpSelectorRequest - } - - ctx, cancelCtx := context.WithCancel(retrieval.ctx) - - // start retrievals - phaseStartTime := retrieval.Clock.Now() - var waitGroup sync.WaitGroup - waitGroup.Add(1) - go func() { - defer waitGroup.Done() - for { - hasCandidates, candidates, err := asyncCandidates.Next(ctx) - if !hasCandidates || err != nil { - return - } - for _, candidate := range candidates { - // Start the retrieval with the candidate - waitGroup.Add(1) - localCandidate := candidate - go func() { - defer waitGroup.Done() - runHttpRetrievalCandidate( - ctx, - retrieval.HTTPRetriever, - retrieval.request, - nil, // TODO: nope: retrieval.HTTPRetriever.Client, - retrieval.request.LinkSystem, - retrieval, //TODO:Nope - phaseStartTime, - localCandidate, - ) - }() - } - } - }() - - finishAll := make(chan struct{}, 1) - go func() { - waitGroup.Wait() - close(retrieval.resultChan) - finishAll <- struct{}{} - }() - - stats, err := retrieval.collectHTTPResults(ctx) - fmt.Println("cancelling context") - cancelCtx() - // optimistically try to wait for all routines to finish - select { - case <-finishAll: - case <-time.After(100 * time.Millisecond): - log.Warn("Unable to successfully cancel all retrieval attempts withing 100ms") - } - return stats, err -} - -func (r *httpRetrieval) collectHTTPResults(ctx context.Context) (*types.RetrievalStats, error) { - 
var retrievalErrors error - for { - select { - case result, ok := <-r.resultChan: - // have we got all responses but no success? - if !ok { - // we failed, and got only retrieval errors - fmt.Println("closed resultChan") - retrievalErrors = multierr.Append(retrievalErrors, ErrAllRetrievalsFailed) - return nil, retrievalErrors - } - - fmt.Println("result.Event", result.Event) - if result.Event != nil { - r.eventsCallback(*result.Event) - break - } - if result.Err != nil { - fmt.Println("result.Err", result.Err) - retrievalErrors = multierr.Append(retrievalErrors, result.Err) - } - if result.Stats != nil { - return result.Stats, nil - } - case <-ctx.Done(): - return nil, context.Canceled - } - } -} - -// runHttpRetrievalCandidate is a singular CID:SP retrieval, expected to be run in a goroutine -// and coordinate with other candidate retrievals to block after query phase and -// only attempt one retrieval-proper at a time. -func runHttpRetrievalCandidate( - ctx context.Context, - cfg *HTTPRetriever, - retrievalRequest types.RetrievalRequest, - client GraphsyncClient, - linkSystem ipld.LinkSystem, - retrieval *httpRetrieval, - phaseStartTime time.Time, - candidate types.RetrievalCandidate, -) { - var stats *types.RetrievalStats - var retrievalErr error - - retrieval.sendEvent(events.Started(cfg.Clock.Now(), retrievalRequest.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate)) - - req, err := makeRequest(ctx, retrievalRequest, candidate) - if err != nil { - retrievalErr = err - retrieval.sendEvent(events.Failed(cfg.Clock.Now(), retrievalRequest.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) - } else { - resp, err := cfg.Client.Do(req) - if err != nil { - if ctx.Err() == nil { // not cancelled, maybe timed out though - log.Warnf("Failed to connect to http %s: %v", candidate.MinerPeer.ID, err) - retrievalErr = fmt.Errorf("%w: %v", ErrConnectFailed, err) - retrieval.sendEvent(events.Failed(cfg.Clock.Now(), retrievalRequest.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) - } - } else { - retrieval.sendEvent(events.Connected(cfg.Clock.Now(), retrievalRequest.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate)) - - // TODO: move this properly to fb - if retrieval.canSendResult() { // move on to retrieval phase - retrieval.sendEvent(events.FirstByte(cfg.Clock.Now(), retrievalRequest.RetrievalID, phaseStartTime, candidate)) - } - - defer resp.Body.Close() - - if _, err := readBody(candidate.RootCid, candidate.MinerPeer.ID, resp.Body, retrievalRequest.LinkSystem.StorageWriteOpener); err != nil { - retrievalErr = err - retrieval.sendEvent(events.Failed(cfg.Clock.Now(), retrievalRequest.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) - } else { - retrieval.sendEvent(events.Success( - cfg.Clock.Now(), - retrievalRequest.RetrievalID, - phaseStartTime, - candidate, - stats.Size, - stats.Blocks, - stats.Duration, - stats.TotalPayment, - 0, - )) - } - } - } - - fmt.Println("retrievalErr", retrievalErr, "stats", stats, "ctx.Err()", ctx.Err()) - if retrieval.canSendResult() { - if retrievalErr != nil { - if ctx.Err() != nil { // cancelled, don't report the error - retrieval.sendResult(retrievalResult{PhaseStart: phaseStartTime, PeerID: candidate.MinerPeer.ID}) - } else { - // an error of some kind to report - retrieval.sendResult(retrievalResult{PhaseStart: phaseStartTime, PeerID: candidate.MinerPeer.ID, Err: retrievalErr}) - } - } else { // success, we have stats and no errors - 
retrieval.sendResult(retrievalResult{PhaseStart: phaseStartTime, PeerID: candidate.MinerPeer.ID, Stats: stats}) - } - } else { - fmt.Println("can't send result") - } // else nothing to do, we were cancelled -} -*/ - -func makeRequest(ctx context.Context, request types.RetrievalRequest, candidate types.RetrievalCandidate) (*http.Request, error) { - candidateURL, err := candidate.ToURL() - fmt.Println("candidateURL", candidateURL, "err", err, "candidate", candidate) - if err != nil { - log.Warnf("Couldn't construct a url for miner %s: %v", candidate.MinerPeer.ID, err) - return nil, fmt.Errorf("%w: %v", ErrNoHttpForPeer, err) - } - - path, err := request.GetUrlPath() - if err != nil { - log.Warnf("Couldn't construct a url path for request: %v", err) - return nil, fmt.Errorf("%w: %v", ErrBadPathForRequest, err) - } - - reqURL := fmt.Sprintf("%s/ipfs/%s%s", candidateURL, request.Cid, path) - req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) - if err != nil { - log.Warnf("Couldn't construct a http request %s: %v", candidate.MinerPeer.ID, err) - return nil, fmt.Errorf("%w for peer %s: %v", ErrBadPathForRequest, candidate.MinerPeer.ID, err) - } - req.Header.Add("Accept", request.Scope.AcceptHeader()) - - return req, nil -} - -func readBody(rootCid cid.Cid, peerId peer.ID, body io.ReadCloser, writer linking.BlockWriteOpener) (*types.RetrievalStats, error) { - startTime := time.Now() // TODO: consider whether this should be at connection time rather than body read time - cr := &countingReader{Reader: body} - cbr, err := car.NewBlockReader(cr) - if err != nil { - return nil, err - } - ttfb := time.Since(startTime) - - var blockCount uint64 - for { - blk, err := cbr.Next() - if err != nil { - if errors.Is(err, io.EOF) { - break - } - return nil, err - } - w, d, err := writer(ipld.LinkContext{}) - if err != nil { - return nil, err - } - _, err = w.Write(blk.RawData()) - if err != nil { - return nil, err - } - err = d(cidlink.Link{Cid: blk.Cid()}) - if err != nil { - return nil, err - } - blockCount++ - } - - duration := time.Since(startTime) - speed := uint64(float64(cr.total) / duration.Seconds()) - - return &types.RetrievalStats{ - RootCid: rootCid, - StorageProviderId: peerId, - Size: cr.total, - Blocks: blockCount, - Duration: duration, - AverageSpeed: speed, - TotalPayment: big.Zero(), - NumPayments: 0, - AskPrice: big.Zero(), - TimeToFirstByte: ttfb, - }, nil -} - -type countingReader struct { - io.Reader - total uint64 -} - -func (cr *countingReader) Read(p []byte) (n int, err error) { - n, err = cr.Reader.Read(p) - cr.total += uint64(n) - return -} - -/* -// sendResult will only send a result to the parent goroutine if a retrieval has -// finished (likely by a success), otherwise it will send the result -func (retrieval *httpRetrieval) sendResult(result retrievalResult) bool { - select { - case <-retrieval.finishChan: - return false - case retrieval.resultChan <- result: - if result.Stats != nil { - close(retrieval.finishChan) - } - } - return true -} - -func (retrieval *httpRetrieval) sendEvent(event types.RetrievalEvent) { - retrieval.sendResult(retrievalResult{PeerID: event.StorageProviderId(), Event: &event}) -} - -// canSendResult will indicate whether a result is likely to be accepted (true) -// or whether the retrieval is already finished (likely by a success) -func (retrieval *httpRetrieval) canSendResult() bool { - select { - case <-retrieval.finishChan: - return false - default: - } - return true -} -*/ diff --git a/pkg/retriever/parallelpeerretriever.go 
b/pkg/retriever/parallelpeerretriever.go index 6e4e531a..c17571e6 100644 --- a/pkg/retriever/parallelpeerretriever.go +++ b/pkg/retriever/parallelpeerretriever.go @@ -4,96 +4,58 @@ import ( "context" "errors" "fmt" - "net/http" "sync" "time" "github.com/benbjohnson/clock" - datatransfer "github.com/filecoin-project/go-data-transfer/v2" - retrievaltypes "github.com/filecoin-project/go-retrieval-types" - "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/lassie/pkg/events" "github.com/filecoin-project/lassie/pkg/retriever/prioritywaitqueue" "github.com/filecoin-project/lassie/pkg/types" - "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/codec/dagjson" - selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" + "github.com/ipfs/go-cid" "github.com/ipni/go-libipni/metadata" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multicodec" "go.uber.org/multierr" ) -type CounterCallback func(int) error -type CandidateCallback func(types.RetrievalCandidate) error -type CandidateErrorCallback func(types.RetrievalCandidate, error) - type GetStorageProviderTimeout func(peer peer.ID) time.Duration -type GraphsyncClient interface { - Connect(ctx context.Context, peerAddr peer.AddrInfo) error - RetrieveFromPeer( +// TransportProtocol implements the protocol-specific portions of a parallel- +// peer retriever. It is responsible for communicating with individual peers +// and also bears responsibility for some of the peer-selection logic. +type TransportProtocol interface { + Code() multicodec.Code + GetMergedMetadata(cid cid.Cid, currentMetadata, newMetadata metadata.Protocol) metadata.Protocol + CompareCandidates(a, b connectCandidate, mda, mdb metadata.Protocol) bool + Connect(ctx context.Context, retrieval *retrieval, candidate types.RetrievalCandidate) error + Retrieve( ctx context.Context, - linkSystem ipld.LinkSystem, - peerID peer.ID, - proposal *retrievaltypes.DealProposal, - selector ipld.Node, - maxLinks uint64, - eventsCallback datatransfer.Subscriber, - gracefulShutdownRequested <-chan struct{}, + retrieval *retrieval, + session *retrievalSession, + phaseStartTime time.Time, + timeout time.Duration, + candidate types.RetrievalCandidate, ) (*types.RetrievalStats, error) } -type TransportProtocol multicodec.Code - -const ( - ProtocolGraphsync = TransportProtocol(multicodec.TransportGraphsyncFilecoinv1) - ProtocolHttp = TransportProtocol(multicodec.TransportIpfsGatewayHttp) -) - +var _ types.CandidateRetriever = (*parallelPeerRetriever)(nil) +var _ types.CandidateRetrieval = (*retrieval)(nil) + +// parallelPeerRetriever is an abstract utility type that implements a retrieval +// flow that retrieves from multiple peers separately but needs to manage that +// flow in parallel. Unlike a Bitswap retrieval, in which all peers are managed +// as a group and may all be collectively retrieved from, parallelPeerRetriever +// is used for protocols where a retrieval is performed directly with a single +// peer, but many peers may be prioritised for attempts. +// +// The concrete implementation of the retrieval protocol is provided by the +// TransportProtocol interface. parallelPeerRetriever manages candidates and +// the parallel+serial flow of connect+retrieve. 
type parallelPeerRetriever struct { Protocol TransportProtocol GetStorageProviderTimeout GetStorageProviderTimeout Clock clock.Clock QueueInitialPause time.Duration - GraphsyncClient GraphsyncClient - HttpClient *http.Client -} - -func NewGraphsyncRetriever(getStorageProviderTimeout GetStorageProviderTimeout, client GraphsyncClient) types.CandidateRetriever { - return ¶llelPeerRetriever{ - Protocol: ProtocolGraphsync, - GetStorageProviderTimeout: getStorageProviderTimeout, - Clock: clock.New(), - QueueInitialPause: 2 * time.Millisecond, - GraphsyncClient: client, - } -} - -func NewHttpRetriever(getStorageProviderTimeout GetStorageProviderTimeout, client *http.Client) types.CandidateRetriever { - return ¶llelPeerRetriever{ - Protocol: ProtocolGraphsync, - GetStorageProviderTimeout: getStorageProviderTimeout, - Clock: clock.New(), - QueueInitialPause: 2 * time.Millisecond, - HttpClient: client, - } -} - -// graphsyncMetadataCompare compares two metadata.GraphsyncFilecoinV1s and -// returns true if the first is preferable to the second. -func graphsyncMetadataCompare(a, b *metadata.GraphsyncFilecoinV1, defaultValue bool) bool { - // prioritize verified deals over not verified deals - if a.VerifiedDeal != b.VerifiedDeal { - return a.VerifiedDeal - } - - // prioritize fast retrievel over not fast retrieval - if a.FastRetrieval != b.FastRetrieval { - return a.FastRetrieval - } - - return defaultValue } // retrieval handles state on a per-retrieval (across multiple candidates) basis @@ -106,11 +68,6 @@ type retrieval struct { candidateMetdataLk sync.RWMutex } -type connectCandidate struct { - PeerID peer.ID - Duration time.Duration -} - type retrievalResult struct { PeerID peer.ID PhaseStart time.Time @@ -119,15 +76,18 @@ type retrievalResult struct { Err error } -type candidateRetrieval struct { +// connectCandidate is used for the prioritywaitqueue +type connectCandidate struct { + PeerID peer.ID + Duration time.Duration +} + +type retrievalSession struct { waitQueue prioritywaitqueue.PriorityWaitQueue[connectCandidate] resultChan chan retrievalResult finishChan chan struct{} } -// RetrieveFromCandidates performs a retrieval for a given CID by querying the indexer, then -// attempting to query all candidates and attempting to perform a full retrieval -// from the best and fastest storage provider as the queries are received. func (cfg *parallelPeerRetriever) Retrieve( ctx context.Context, retrievalRequest types.RetrievalRequest, @@ -151,58 +111,37 @@ func (cfg *parallelPeerRetriever) Retrieve( } } -// candidateCompare compares two connectCandidates and returns true if the first is -// preferable to the second. This is used for the PriorityWaitQueue that will -// prioritise execution of retrievals if two candidates are available to compare -// at the same time. 
-func (r *retrieval) candidateCompare(a, b connectCandidate) bool { - r.candidateMetdataLk.RLock() - defer r.candidateMetdataLk.RUnlock() - - mdA, ok := r.candidateMetadata[a.PeerID] - if !ok { - return false - } - - mdB, ok := r.candidateMetadata[b.PeerID] - if !ok { - return true - } - - return graphsyncMetadataCompare(mdA.(*metadata.GraphsyncFilecoinV1), mdB.(*metadata.GraphsyncFilecoinV1), a.Duration < b.Duration) -} +func (retrieval *retrieval) RetrieveFromAsyncCandidates(asyncCandidates types.InboundAsyncCandidates) (*types.RetrievalStats, error) { + ctx, cancelCtx := context.WithCancel(retrieval.ctx) -func (r *retrieval) RetrieveFromAsyncCandidates(asyncCandidates types.InboundAsyncCandidates) (*types.RetrievalStats, error) { - ctx, cancelCtx := context.WithCancel(r.ctx) - - retrieval := &candidateRetrieval{ + session := &retrievalSession{ resultChan: make(chan retrievalResult), finishChan: make(chan struct{}), waitQueue: prioritywaitqueue.New( - r.candidateCompare, - prioritywaitqueue.WithInitialPause[connectCandidate](r.QueueInitialPause), - prioritywaitqueue.WithClock[connectCandidate](r.Clock), + retrieval.candidateCompare, + prioritywaitqueue.WithInitialPause[connectCandidate](retrieval.QueueInitialPause), + prioritywaitqueue.WithClock[connectCandidate](retrieval.Clock), ), } // start retrievals - phaseStartTime := r.Clock.Now() + phaseStartTime := retrieval.Clock.Now() var waitGroup sync.WaitGroup waitGroup.Add(1) go func() { defer waitGroup.Done() for { - active, candidates, err := r.filterCandidates(ctx, asyncCandidates) + active, candidates, err := retrieval.filterCandidates(ctx, asyncCandidates) if !active || err != nil { return } for _, candidate := range candidates { - // Start the retrieval with the candidate + // start the retrieval with the candidate candidate := candidate waitGroup.Add(1) go func() { defer waitGroup.Done() - r.runRetrievalCandidate(ctx, retrieval, phaseStartTime, candidate) + retrieval.runRetrievalCandidate(ctx, session, phaseStartTime, candidate) }() } } @@ -211,11 +150,11 @@ func (r *retrieval) RetrieveFromAsyncCandidates(asyncCandidates types.InboundAsy finishAll := make(chan struct{}, 1) go func() { waitGroup.Wait() - close(retrieval.resultChan) + close(session.resultChan) finishAll <- struct{}{} }() - stats, err := collectResults(ctx, retrieval, r.eventsCallback) + stats, err := collectResults(ctx, session, retrieval.eventsCallback) cancelCtx() // optimistically try to wait for all routines to finish select { @@ -226,60 +165,51 @@ func (r *retrieval) RetrieveFromAsyncCandidates(asyncCandidates types.InboundAsy return stats, err } -func (r *retrieval) filterCandidates(ctx context.Context, asyncCandidates types.InboundAsyncCandidates) (bool, []types.RetrievalCandidate, error) { +// candidateCompare compares two connectCandidates and returns true if the first is +// preferable to the second. This is used for the PriorityWaitQueue that will +// prioritise execution of retrievals if two candidates are available to compare +// at the same time. 
+func (retrieval *retrieval) candidateCompare(a, b connectCandidate) bool { + retrieval.candidateMetdataLk.RLock() + defer retrieval.candidateMetdataLk.RUnlock() + + mdA, ok := retrieval.candidateMetadata[a.PeerID] + if !ok { + return false + } + + mdB, ok := retrieval.candidateMetadata[b.PeerID] + if !ok { + return true + } + + return retrieval.Protocol.CompareCandidates(a, b, mdA, mdB) +} + +// filterCandidates is needed because we can receive duplicate candidates in +// a single batch or across different batches. We need to filter out duplicates +// and make sure we have the best information from candidate metadata across +// those duplicates. +func (retrieval *retrieval) filterCandidates(ctx context.Context, asyncCandidates types.InboundAsyncCandidates) (bool, []types.RetrievalCandidate, error) { filtered := make([]types.RetrievalCandidate, 0) active, candidates, err := asyncCandidates.Next(ctx) if !active || err != nil { return false, nil, err } - for _, candidate := range candidates { - // Grab the current candidate's metadata, adding the piece cid to the metadata if the type assertion failed - candidateMetadata := candidate.Metadata.Get(multicodec.Code(r.Protocol)) - var hasMetadata bool - switch r.Protocol { - case ProtocolGraphsync: - _, hasMetadata = candidateMetadata.(*metadata.GraphsyncFilecoinV1) - if !hasMetadata { - candidateMetadata = &metadata.GraphsyncFilecoinV1{PieceCID: r.request.Cid} - } - case ProtocolHttp: - _, ok := candidateMetadata.(*metadata.IpfsGatewayHttp) - if !ok { - candidateMetadata = &metadata.IpfsGatewayHttp{} - } - default: - panic(fmt.Sprintf("unexpected protocol: %v", r.Protocol)) - } - - // Check if we already started a retrieval for this candidate - r.candidateMetdataLk.RLock() - currMetadata, seenCandidate := r.candidateMetadata[candidate.MinerPeer.ID] - r.candidateMetdataLk.RUnlock() - - // Don't start a new retrieval if we've seen this candidate before, - // but update the metadata if it's more favorable - if r.Protocol == ProtocolGraphsync && seenCandidate { - // We know the metadata is not as favorable if the type assertion failed - // since the metadata will be the zero value of graphsync metadata - if !hasMetadata { - continue - } + retrieval.candidateMetdataLk.Lock() + defer retrieval.candidateMetdataLk.Unlock() - if graphsyncMetadataCompare(candidateMetadata.(*metadata.GraphsyncFilecoinV1), currMetadata.(*metadata.GraphsyncFilecoinV1), false) { - r.candidateMetdataLk.Lock() - r.candidateMetadata[candidate.MinerPeer.ID] = candidateMetadata - r.candidateMetdataLk.Unlock() - } - continue + for _, candidate := range candidates { + // update or add new candidate metadata + currMetadata, seenCandidate := retrieval.candidateMetadata[candidate.MinerPeer.ID] + newMetadata := candidate.Metadata.Get(multicodec.Code(retrieval.Protocol.Code())) + candidateMetadata := retrieval.Protocol.GetMergedMetadata(retrieval.request.Cid, currMetadata, newMetadata) + retrieval.candidateMetadata[candidate.MinerPeer.ID] = candidateMetadata + // if it's a new candidate, include it, otherwise don't start a new retrieval for it + if !seenCandidate { + filtered = append(filtered, candidate) } - - // Track the candidate metadata - r.candidateMetdataLk.Lock() - r.candidateMetadata[candidate.MinerPeer.ID] = candidateMetadata - r.candidateMetdataLk.Unlock() - - filtered = append(filtered, candidate) } return true, filtered, nil @@ -288,7 +218,7 @@ func (r *retrieval) filterCandidates(ctx context.Context, asyncCandidates types. 
// collectResults is responsible for receiving query errors, retrieval errors // and retrieval results and aggregating into an appropriate return of either // a complete RetrievalStats or an bundled multi-error -func collectResults(ctx context.Context, retrieval *candidateRetrieval, eventsCallback func(types.RetrievalEvent)) (*types.RetrievalStats, error) { +func collectResults(ctx context.Context, retrieval *retrievalSession, eventsCallback func(types.RetrievalEvent)) (*types.RetrievalStats, error) { var retrievalErrors error for { select { @@ -319,90 +249,60 @@ func collectResults(ctx context.Context, retrieval *candidateRetrieval, eventsCa // runRetrievalCandidate is a singular CID:SP retrieval, expected to be run in a goroutine // and coordinate with other candidate retrievals to block after query phase and // only attempt one retrieval-proper at a time. -func (r *retrieval) runRetrievalCandidate( +func (retrieval *retrieval) runRetrievalCandidate( ctx context.Context, - retrieval *candidateRetrieval, + session *retrievalSession, phaseStartTime time.Time, candidate types.RetrievalCandidate, ) { var timeout time.Duration - if r.parallelPeerRetriever.GetStorageProviderTimeout != nil { - timeout = r.parallelPeerRetriever.GetStorageProviderTimeout(candidate.MinerPeer.ID) + if retrieval.parallelPeerRetriever.GetStorageProviderTimeout != nil { + timeout = retrieval.parallelPeerRetriever.GetStorageProviderTimeout(candidate.MinerPeer.ID) } var stats *types.RetrievalStats var retrievalErr error var done func() - retrieval.sendEvent(events.Started(r.parallelPeerRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate)) + session.sendEvent(events.Started(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate)) connectCtx := ctx if timeout != 0 { var timeoutFunc func() - connectCtx, timeoutFunc = r.parallelPeerRetriever.Clock.WithDeadline(ctx, r.parallelPeerRetriever.Clock.Now().Add(timeout)) + connectCtx, timeoutFunc = retrieval.parallelPeerRetriever.Clock.WithDeadline(ctx, retrieval.parallelPeerRetriever.Clock.Now().Add(timeout)) defer timeoutFunc() } - // Setup - var err error - var httpResp *http.Response - switch r.Protocol { - case ProtocolGraphsync: - err = r.GraphsyncClient.Connect(connectCtx, candidate.MinerPeer) - case ProtocolHttp: - // TODO: consider whether we want to be starting the requests in parallel here - // and just deferring body read for the serial portion below, we may be causing - // upstream providers to do more work than necessary if we give up on them - // because we get a complete response from another candidate. 
- var httpReq *http.Request - httpReq, err = makeRequest(ctx, r.request, candidate) - if err == nil { - httpResp, err = r.HttpClient.Do(httpReq) - } - default: - panic(fmt.Sprintf("unexpected protocol: %v", r.Protocol)) - } + // Setup in parallel + err := retrieval.Protocol.Connect(connectCtx, retrieval, candidate) if err != nil { if ctx.Err() == nil { // not cancelled, maybe timed out though log.Warnf("Failed to connect to SP %s: %v", candidate.MinerPeer.ID, err) retrievalErr = fmt.Errorf("%w: %v", ErrConnectFailed, err) - retrieval.sendEvent(events.Failed(r.parallelPeerRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) + session.sendEvent(events.Failed(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) } } else { - retrieval.sendEvent(events.Connected(r.parallelPeerRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate)) - // if query is successful, then wait for priority and execute retrieval - done = retrieval.waitQueue.Wait(connectCandidate{ + session.sendEvent(events.Connected(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate)) + + // Form a queue and run retrieval in serial + done = session.waitQueue.Wait(connectCandidate{ PeerID: candidate.MinerPeer.ID, - Duration: r.parallelPeerRetriever.Clock.Now().Sub(phaseStartTime), + Duration: retrieval.parallelPeerRetriever.Clock.Now().Sub(phaseStartTime), }) - if retrieval.canSendResult() { // move on to retrieval phase - switch r.Protocol { - case ProtocolGraphsync: - eventsCallback := retrieval.makeEventsCallback(r.parallelPeerRetriever.Clock, r.request.RetrievalID, phaseStartTime, candidate) - stats, retrievalErr = r.retrievalPhase( - ctx, - timeout, - candidate, - eventsCallback, - ) - case ProtocolHttp: - defer httpResp.Body.Close() - stats, retrievalErr = readBody(candidate.RootCid, candidate.MinerPeer.ID, httpResp.Body, r.request.LinkSystem.StorageWriteOpener) - default: - panic(fmt.Sprintf("unexpected protocol: %v", r.Protocol)) - } + if session.canSendResult() { // move on to retrieval phase + stats, retrievalErr = retrieval.Protocol.Retrieve(ctx, retrieval, session, phaseStartTime, timeout, candidate) if retrievalErr != nil { msg := retrievalErr.Error() if errors.Is(retrievalErr, ErrRetrievalTimedOut) { msg = fmt.Sprintf("timeout after %s", timeout) } - retrieval.sendEvent(events.Failed(r.parallelPeerRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, msg)) + session.sendEvent(events.Failed(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, msg)) } else { - retrieval.sendEvent(events.Success( - r.parallelPeerRetriever.Clock.Now(), - r.request.RetrievalID, + session.sendEvent(events.Success( + retrieval.parallelPeerRetriever.Clock.Now(), + retrieval.request.RetrievalID, phaseStartTime, candidate, stats.Size, @@ -416,16 +316,16 @@ func (r *retrieval) runRetrievalCandidate( } // else we didn't get to retrieval phase because we were cancelled } - if retrieval.canSendResult() { + if session.canSendResult() { if retrievalErr != nil { if ctx.Err() != nil { // cancelled, don't report the error - retrieval.sendResult(retrievalResult{PhaseStart: phaseStartTime, PeerID: candidate.MinerPeer.ID}) + session.sendResult(retrievalResult{PhaseStart: 
phaseStartTime, PeerID: candidate.MinerPeer.ID}) } else { // an error of some kind to report - retrieval.sendResult(retrievalResult{PhaseStart: phaseStartTime, PeerID: candidate.MinerPeer.ID, Err: retrievalErr}) + session.sendResult(retrievalResult{PhaseStart: phaseStartTime, PeerID: candidate.MinerPeer.ID, Err: retrievalErr}) } } else { // success, we have stats and no errors - retrieval.sendResult(retrievalResult{PhaseStart: phaseStartTime, PeerID: candidate.MinerPeer.ID, Stats: stats}) + session.sendResult(retrievalResult{PhaseStart: phaseStartTime, PeerID: candidate.MinerPeer.ID, Stats: stats}) } } // else nothing to do, we were cancelled @@ -434,38 +334,9 @@ func (r *retrieval) runRetrievalCandidate( } } -func (retrieval *candidateRetrieval) makeEventsCallback( - clock clock.Clock, - retrievalId types.RetrievalID, - phaseStartTime time.Time, - candidate types.RetrievalCandidate) datatransfer.Subscriber { - - var receivedFirstByte bool - return func(event datatransfer.Event, channelState datatransfer.ChannelState) { - switch event.Code { - case datatransfer.Open: - retrieval.sendEvent(events.Proposed(clock.Now(), retrievalId, phaseStartTime, candidate)) - case datatransfer.NewVoucherResult: - lastVoucher := channelState.LastVoucherResult() - resType, err := retrievaltypes.DealResponseFromNode(lastVoucher.Voucher) - if err != nil { - return - } - if resType.Status == retrievaltypes.DealStatusAccepted { - retrieval.sendEvent(events.Accepted(clock.Now(), retrievalId, phaseStartTime, candidate)) - } - case datatransfer.DataReceivedProgress: - if !receivedFirstByte { - receivedFirstByte = true - retrieval.sendEvent(events.FirstByte(clock.Now(), retrievalId, phaseStartTime, candidate)) - } - } - } -} - // canSendResult will indicate whether a result is likely to be accepted (true) // or whether the retrieval is already finished (likely by a success) -func (retrieval *candidateRetrieval) canSendResult() bool { +func (retrieval *retrievalSession) canSendResult() bool { select { case <-retrieval.finishChan: return false @@ -476,7 +347,7 @@ func (retrieval *candidateRetrieval) canSendResult() bool { // sendResult will only send a result to the parent goroutine if a retrieval has // finished (likely by a success), otherwise it will send the result -func (retrieval *candidateRetrieval) sendResult(result retrievalResult) bool { +func (retrieval *retrievalSession) sendResult(result retrievalResult) bool { select { case <-retrieval.finishChan: return false @@ -491,113 +362,6 @@ func (retrieval *candidateRetrieval) sendResult(result retrievalResult) bool { return true } -func (retrieval *candidateRetrieval) sendEvent(event types.RetrievalEvent) { - retrieval.sendResult(retrievalResult{PeerID: event.StorageProviderId(), Event: &event}) -} - -func (r *retrieval) retrievalPhase( - ctx context.Context, - timeout time.Duration, - candidate types.RetrievalCandidate, - eventsCallback datatransfer.Subscriber, -) (*types.RetrievalStats, error) { - - ss := "*" - selector := r.request.GetSelector() - if !ipld.DeepEqual(selector, selectorparse.CommonSelector_ExploreAllRecursively) { - byts, err := ipld.Encode(selector, dagjson.Encode) - if err != nil { - return nil, err - } - ss = string(byts) - } - - log.Infof( - "Attempting retrieval from SP %s for %s (with selector: [%s])", - candidate.MinerPeer.ID, - candidate.RootCid, - ss, - ) - - params, err := retrievaltypes.NewParamsV1(big.Zero(), 0, 0, selector, nil, big.Zero()) - if err != nil { - return nil, multierr.Append(multierr.Append(ErrRetrievalFailed, 
ErrProposalCreationFailed), err) - } - proposal := &retrievaltypes.DealProposal{ - PayloadCID: candidate.RootCid, - ID: retrievaltypes.DealID(dealIdGen.Next()), - Params: params, - } - - retrieveCtx, retrieveCancel := context.WithCancel(ctx) - defer retrieveCancel() - - var lastBytesReceived uint64 - var doneLk sync.Mutex - var done, timedOut bool - var lastBytesReceivedTimer, gracefulShutdownTimer *clock.Timer - - gracefulShutdownChan := make(chan struct{}) - - // Start the timeout tracker only if retrieval timeout isn't 0 - if timeout != 0 { - lastBytesReceivedTimer = r.parallelPeerRetriever.Clock.AfterFunc(timeout, func() { - doneLk.Lock() - done = true - timedOut = true - doneLk.Unlock() - - gracefulShutdownChan <- struct{}{} - gracefulShutdownTimer = r.parallelPeerRetriever.Clock.AfterFunc(1*time.Minute, retrieveCancel) - }) - } - - eventsSubscriber := func(event datatransfer.Event, channelState datatransfer.ChannelState) { - if event.Code == datatransfer.DataReceivedProgress { - if lastBytesReceivedTimer != nil { - doneLk.Lock() - if !done { - if lastBytesReceived != channelState.Received() { - lastBytesReceivedTimer.Reset(timeout) - lastBytesReceived = channelState.Received() - } - } - doneLk.Unlock() - } - } - eventsCallback(event, channelState) - } - - stats, err := r.GraphsyncClient.RetrieveFromPeer( - retrieveCtx, - r.request.LinkSystem, - candidate.MinerPeer.ID, - proposal, - selector, - uint64(r.request.MaxBlocks), - eventsSubscriber, - gracefulShutdownChan, - ) - - if timedOut { - return nil, multierr.Append(ErrRetrievalFailed, - fmt.Errorf( - "%w after %s", - ErrRetrievalTimedOut, - timeout, - ), - ) - } - - if lastBytesReceivedTimer != nil { - lastBytesReceivedTimer.Stop() - } - if gracefulShutdownTimer != nil { - gracefulShutdownTimer.Stop() - } - - if err != nil { - return nil, fmt.Errorf("%w: %v", ErrRetrievalFailed, err) - } - return stats, nil +func (session *retrievalSession) sendEvent(event types.RetrievalEvent) { + session.sendResult(retrievalResult{PeerID: event.StorageProviderId(), Event: &event}) }
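
For anyone wiring up a third transport against the new interface, a minimal sketch of a TransportProtocol implementation follows. It is illustrative only and not part of this diff: the ProtocolNoop and NewNoopRetriever names are made up, the multicodec code is a placeholder, and it assumes the file lives in the same retriever package so it can reach the unexported retrieval, retrievalSession and connectCandidate types.

package retriever

import (
	"context"
	"errors"
	"time"

	"github.com/benbjohnson/clock"
	"github.com/filecoin-project/lassie/pkg/types"
	"github.com/ipfs/go-cid"
	"github.com/ipni/go-libipni/metadata"
	"github.com/multiformats/go-multicodec"
)

// ProtocolNoop is a hypothetical, do-nothing transport that shows the shape of
// the TransportProtocol interface introduced by this patch.
var _ TransportProtocol = (*ProtocolNoop)(nil)

type ProtocolNoop struct{}

// NewNoopRetriever wires the no-op transport into the shared
// parallelPeerRetriever flow, mirroring NewGraphsyncRetriever/NewHttpRetriever.
func NewNoopRetriever(getStorageProviderTimeout GetStorageProviderTimeout) types.CandidateRetriever {
	return &parallelPeerRetriever{
		Protocol:                  &ProtocolNoop{},
		GetStorageProviderTimeout: getStorageProviderTimeout,
		Clock:                     clock.New(),
		QueueInitialPause:         2 * time.Millisecond,
	}
}

// Code identifies the transport; TransportBitswap is used here purely as a
// placeholder multicodec value for the sketch.
func (p ProtocolNoop) Code() multicodec.Code {
	return multicodec.TransportBitswap
}

// GetMergedMetadata keeps whatever metadata was seen first; real transports
// merge per-protocol metadata (see ProtocolGraphsync.GetMergedMetadata).
func (p ProtocolNoop) GetMergedMetadata(cid cid.Cid, currentMetadata, newMetadata metadata.Protocol) metadata.Protocol {
	if currentMetadata != nil {
		return currentMetadata
	}
	return newMetadata
}

// CompareCandidates prefers whichever candidate connected faster.
func (p ProtocolNoop) CompareCandidates(a, b connectCandidate, mda, mdb metadata.Protocol) bool {
	return a.Duration < b.Duration
}

// Connect is called once per candidate, in parallel across candidates.
func (p *ProtocolNoop) Connect(ctx context.Context, retrieval *retrieval, candidate types.RetrievalCandidate) error {
	return nil
}

// Retrieve is called serially, in priority order, for candidates that connected.
func (p *ProtocolNoop) Retrieve(
	ctx context.Context,
	retrieval *retrieval,
	session *retrievalSession,
	phaseStartTime time.Time,
	timeout time.Duration,
	candidate types.RetrievalCandidate,
) (*types.RetrievalStats, error) {
	return nil, errors.New("noop transport cannot retrieve")
}

As runRetrievalCandidate shows, Connect runs once per candidate in parallel, while Retrieve only runs for the winning candidates in the serial, prioritywaitqueue-ordered phase, so a transport should keep Connect cheap and do the heavy lifting in Retrieve.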