From 154fd633dc3009d94ae4ced77116c7ea13b6c7ab Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 20 Apr 2023 11:43:21 +0200 Subject: [PATCH] First pass at adapting graphsync to http --- go.mod | 2 +- go.sum | 4 +- pkg/lassie/lassie.go | 3 + pkg/retriever/httpretriever.go | 260 +++++++++++++++++++++++++++++++++ pkg/types/request.go | 4 + 5 files changed, 270 insertions(+), 3 deletions(-) create mode 100644 pkg/retriever/httpretriever.go diff --git a/go.mod b/go.mod index f809a66e..6391e80e 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/libp2p/go-libp2p-testing v0.12.0 github.com/mitchellh/go-server-timing v1.0.1 github.com/multiformats/go-multiaddr v0.8.0 - github.com/multiformats/go-multicodec v0.8.1 + github.com/multiformats/go-multicodec v0.8.2-0.20230419141826-bd7ef45ca89b github.com/multiformats/go-multihash v0.2.1 github.com/prometheus/client_golang v1.14.0 github.com/stretchr/testify v1.8.2 diff --git a/go.sum b/go.sum index 4a56df66..12290bda 100644 --- a/go.sum +++ b/go.sum @@ -571,8 +571,8 @@ github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/g github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= github.com/multiformats/go-multibase v0.1.1 h1:3ASCDsuLX8+j4kx58qnJ4YFq/JWTJpCyDW27ztsVTOI= github.com/multiformats/go-multibase v0.1.1/go.mod h1:ZEjHE+IsUrgp5mhlEAYjMtZwK1k4haNkcaPg9aoe1a8= -github.com/multiformats/go-multicodec v0.8.1 h1:ycepHwavHafh3grIbR1jIXnKCsFm0fqsfEOsJ8NtKE8= -github.com/multiformats/go-multicodec v0.8.1/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k= +github.com/multiformats/go-multicodec v0.8.2-0.20230419141826-bd7ef45ca89b h1:MEoPnsX6gSJh8Esy/sYKIkLlToWF0XWtYBlsr/lDG9U= +github.com/multiformats/go-multicodec v0.8.2-0.20230419141826-bd7ef45ca89b/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k= github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.0.9/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= diff --git a/pkg/lassie/lassie.go b/pkg/lassie/lassie.go index c2f7f841..84b4780d 100644 --- a/pkg/lassie/lassie.go +++ b/pkg/lassie/lassie.go @@ -2,6 +2,7 @@ package lassie import ( "context" + "net/http" "time" "github.com/filecoin-project/lassie/pkg/indexerlookup" @@ -107,6 +108,8 @@ func NewLassieWithConfig(ctx context.Context, cfg *LassieConfig) (*Lassie, error TempDir: cfg.TempDir, Concurrency: cfg.BitswapConcurrency, }) + case multicodec.TransportIpfsGatewayHttp: + protocolRetrievers[protocol] = retriever.NewHTTPRetriever(session.GetStorageProviderTimeout, *http.DefaultClient) } } diff --git a/pkg/retriever/httpretriever.go b/pkg/retriever/httpretriever.go new file mode 100644 index 00000000..e5ccbee4 --- /dev/null +++ b/pkg/retriever/httpretriever.go @@ -0,0 +1,260 @@ +package retriever + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "sync" + "time" + + "github.com/benbjohnson/clock" + "github.com/filecoin-project/lassie/pkg/events" + "github.com/filecoin-project/lassie/pkg/types" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipni/go-libipni/maurl" + "go.uber.org/multierr" +) + +type HTTPRetriever struct { + Client http.Client + Clock clock.Clock +} + +func NewHTTPRetriever(getStorageProviderTimeout GetStorageProviderTimeout, client http.Client) *HTTPRetriever { + return &HTTPRetriever{ + Client: client, + } +} + +type httpRetrieval struct { + *HTTPRetriever + ctx context.Context + request types.RetrievalRequest + eventChan chan retrievalResult + eventsCallback func(types.RetrievalEvent) +} + +func (cfg *HTTPRetriever) Retrieve( + ctx context.Context, + retrievalRequest types.RetrievalRequest, + eventsCallback func(types.RetrievalEvent), +) types.CandidateRetrieval { + + if cfg == nil { + cfg = &HTTPRetriever{} + } + if eventsCallback == nil { + eventsCallback = func(re types.RetrievalEvent) {} + } + + // state local to this CID's retrieval + return &httpRetrieval{ + HTTPRetriever: cfg, + ctx: ctx, + request: retrievalRequest, + eventChan: make(chan retrievalResult), + eventsCallback: eventsCallback, + } +} + +func (r *httpRetrieval) RetrieveFromAsyncCandidates(asyncCandidates types.InboundAsyncCandidates) (*types.RetrievalStats, error) { + ctx, cancelCtx := context.WithCancel(r.ctx) + + // start retrievals + phaseStartTime := r.Clock.Now() + var waitGroup sync.WaitGroup + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + for { + hasCandidates, candidates, err := asyncCandidates.Next(ctx) + if !hasCandidates || err != nil { + return + } + for _, candidate := range candidates { + // Start the retrieval with the candidate + waitGroup.Add(1) + localCandidate := candidate + go func() { + defer waitGroup.Done() + r.doHTTPDownload(ctx, phaseStartTime, localCandidate) + }() + } + } + }() + + finishAll := make(chan struct{}, 1) + go func() { + waitGroup.Wait() + close(r.eventChan) + finishAll <- struct{}{} + }() + + stats, err := r.collectHTTPResults(ctx) + cancelCtx() + // optimistically try to wait for all routines to finish + select { + case <-finishAll: + case <-time.After(100 * time.Millisecond): + log.Warn("Unable to successfully cancel all retrieval attempts withing 100ms") + } + return stats, err +} + +func (r *httpRetrieval) collectHTTPResults(ctx context.Context) (*types.RetrievalStats, error) { + var retrievalErrors error + for { + select { + case result, ok := <-r.eventChan: + // have we got all responses but no success? + if !ok { + // we failed, and got only retrieval errors + retrievalErrors = multierr.Append(retrievalErrors, ErrAllRetrievalsFailed) + return nil, retrievalErrors + } + + if result.Event != nil { + r.eventsCallback(*result.Event) + break + } + if result.Err != nil { + retrievalErrors = multierr.Append(retrievalErrors, result.Err) + } + if result.Stats != nil { + return result.Stats, nil + } + case <-ctx.Done(): + return nil, context.Canceled + } + } +} + +func (r *httpRetrieval) doHTTPDownload( + ctx context.Context, + phaseStartTime time.Time, + candidate types.RetrievalCandidate, +) { + var stats *types.RetrievalStats + var retrievalErr error + + r.sendEvent(events.Started(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate)) + + var candidateURL *url.URL + var err error + for _, addr := range candidate.MinerPeer.Addrs { + candidateURL, err = maurl.ToURL(addr) + if err == nil { + break + } + } + if err != nil { + log.Warnf("Couldn't construct a url for miner %s: %v", candidate.MinerPeer.ID, err) + retrievalErr = fmt.Errorf("%w: %v", ErrConnectFailed, err) + r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) + return + } + + reqURL := fmt.Sprintf("%s/ipfs/%s%s", candidateURL, r.request.Cid, r.request.Path) + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + log.Warnf("Couldn't construct a http request %s: %v", candidate.MinerPeer.ID, err) + retrievalErr = fmt.Errorf("%w: %v", ErrConnectFailed, err) + r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) + return + } + if r.request.Scope == types.CarScopeBlock { + req.Header.Add("Accept", "application/vnd.ipld.block") + } else { + req.Header.Add("Accept", "application/vnd.ipld.car") + } + + resp, err := r.Client.Do(req) + + if err != nil { + if ctx.Err() == nil { // not cancelled, maybe timed out though + log.Warnf("Failed to connect to http %s: %v", candidate.MinerPeer.ID, err) + retrievalErr = fmt.Errorf("%w: %v", ErrConnectFailed, err) + r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) + } + } else { + r.sendEvent(events.Connected(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate)) + + if r.canSendResult() { // move on to retrieval phase + r.sendEvent(events.FirstByte(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, candidate)) + } + } + defer resp.Body.Close() + + cbr, err := car.NewBlockReader(resp.Body) + if err != nil { + r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) + return + } + + for { + blk, err := cbr.Next() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) + return + } + w, d, err := r.request.LinkSystem.StorageWriteOpener(ipld.LinkContext{}) + if err != nil { + r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) + return + } + _, err = w.Write(blk.RawData()) + if err != nil { + r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) + return + } + err = d(cidlink.Link{Cid: blk.Cid()}) + if err != nil { + r.sendEvent(events.Failed(r.HTTPRetriever.Clock.Now(), r.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrievalErr.Error())) + return + } + } + + r.sendEvent(events.Success( + r.HTTPRetriever.Clock.Now(), + r.request.RetrievalID, + phaseStartTime, + candidate, + stats.Size, + stats.Blocks, + stats.Duration, + stats.TotalPayment, + 0, + ), + ) + + if r.canSendResult() { + r.sendResult(retrievalResult{PhaseStart: phaseStartTime, PeerID: candidate.MinerPeer.ID, Stats: stats}) + } +} + +// sendResult will only send a result to the parent goroutine if a retrieval has +// finished (likely by a success), otherwise it will send the result +func (retrieval *httpRetrieval) sendResult(result retrievalResult) bool { + select { + case <-retrieval.ctx.Done(): + return false + case retrieval.eventChan <- result: + } + return true +} + +func (retrieval *httpRetrieval) sendEvent(event types.RetrievalEvent) { + retrieval.sendResult(retrievalResult{PeerID: event.StorageProviderId(), Event: &event}) +} + +func (retrieval *httpRetrieval) canSendResult() bool { + return retrieval.ctx.Err() != nil +} diff --git a/pkg/types/request.go b/pkg/types/request.go index 4b486c4e..7f07c803 100644 --- a/pkg/types/request.go +++ b/pkg/types/request.go @@ -52,6 +52,8 @@ type RetrievalRequest struct { Cid cid.Cid LinkSystem ipld.LinkSystem Selector ipld.Node + Path string + Scope CarScope Protocols []multicodec.Code PreloadLinkSystem ipld.LinkSystem MaxBlocks uint64 @@ -97,6 +99,8 @@ func NewRequestForPath(store ipldstorage.WritableStorage, cid cid.Cid, path stri RetrievalID: retrievalId, Cid: cid, Selector: selector, + Path: fmt.Sprintf("%s?car-scope=%s", path, carScope), + Scope: carScope, LinkSystem: linkSystem, }, nil }