diff --git a/pkg/internal/testutil/mockclient.go b/pkg/internal/testutil/mockclient.go
index 99cdddd7..391bf209 100644
--- a/pkg/internal/testutil/mockclient.go
+++ b/pkg/internal/testutil/mockclient.go
@@ -83,6 +83,12 @@ func (mc *MockClient) VerifyRetrievalsReceived(ctx context.Context, t *testing.T
 	require.ElementsMatch(t, expectedRetrievals, retrievals)
 }
 
+func (mc *MockClient) VerifyRetrievalsServed(ctx context.Context, t *testing.T, expectedServed []RemoteStats) {
+}
+
+func (mc *MockClient) VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID) {
+}
+
 func (mc *MockClient) VerifyReceivedRetrievalFrom(ctx context.Context, t *testing.T, p peer.ID) ClientRetrievalRequest {
 	for {
 		select {
diff --git a/pkg/internal/testutil/verifier.go b/pkg/internal/testutil/verifier.go
index 86d71db3..288d8842 100644
--- a/pkg/internal/testutil/verifier.go
+++ b/pkg/internal/testutil/verifier.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/benbjohnson/clock"
 	"github.com/filecoin-project/lassie/pkg/types"
+	"github.com/ipfs/go-cid"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/stretchr/testify/require"
 )
@@ -15,17 +16,40 @@ type ExpectedActionsAtTime struct {
 	AfterStart           time.Duration
 	ReceivedConnections  []peer.ID
 	ReceivedRetrievals   []peer.ID
+	ServedRetrievals     []RemoteStats
+	CompletedRetrievals  []peer.ID
 	CandidatesDiscovered []DiscoveredCandidate
 	ExpectedEvents       []types.RetrievalEvent
 }
 
+type RemoteStats struct {
+	Peer      peer.ID
+	Root      cid.Cid
+	ByteCount uint64
+	Blocks    []cid.Cid
+}
+
 type RetrievalVerifier struct {
 	ExpectedSequence []ExpectedActionsAtTime
 }
 
 type RunRetrieval func(cb func(types.RetrievalEvent)) (*types.RetrievalStats, error)
 
-func (rv RetrievalVerifier) RunWithVerification(ctx context.Context, t *testing.T, clock *clock.Mock, mockClient *MockClient, mockCandidateFinder *MockCandidateFinder, runRetrievals []RunRetrieval) []types.RetrievalResult {
+type VerifierClient interface {
+	VerifyConnectionsReceived(ctx context.Context, t *testing.T, expectedConnections []peer.ID)
+	VerifyRetrievalsReceived(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID)
+	VerifyRetrievalsServed(ctx context.Context, t *testing.T, expectedServed []RemoteStats)
+	VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID)
+}
+
+func (rv RetrievalVerifier) RunWithVerification(ctx context.Context,
+	t *testing.T,
+	clock *clock.Mock,
+	client VerifierClient,
+	mockCandidateFinder *MockCandidateFinder,
+	runRetrievals []RunRetrieval,
+) []types.RetrievalResult {
+
 	resultChan := make(chan types.RetrievalResult, len(runRetrievals))
 	asyncCollectingEventsListener := NewAsyncCollectingEventsListener(ctx)
 	for _, runRetrieval := range runRetrievals {
@@ -44,9 +68,11 @@ func (rv RetrievalVerifier) RunWithVerification(ctx context.Context, t *testing.
 		if mockCandidateFinder != nil {
 			mockCandidateFinder.VerifyCandidatesDiscovered(ctx, t, expectedActionsAtTime.CandidatesDiscovered)
 		}
-		if mockClient != nil {
-			mockClient.VerifyConnectionsReceived(ctx, t, expectedActionsAtTime.ReceivedConnections)
-			mockClient.VerifyRetrievalsReceived(ctx, t, expectedActionsAtTime.ReceivedRetrievals)
+		if client != nil {
+			client.VerifyConnectionsReceived(ctx, t, expectedActionsAtTime.ReceivedConnections)
+			client.VerifyRetrievalsReceived(ctx, t, expectedActionsAtTime.ReceivedRetrievals)
+			client.VerifyRetrievalsServed(ctx, t, expectedActionsAtTime.ServedRetrievals)
+			client.VerifyRetrievalsCompleted(ctx, t, expectedActionsAtTime.CompletedRetrievals)
 		}
 	}
 	results := make([]types.RetrievalResult, 0, len(runRetrievals))
diff --git a/pkg/retriever/bitswapretriever_test.go b/pkg/retriever/bitswapretriever_test.go
index 4b7458bb..d95cf949 100644
--- a/pkg/retriever/bitswapretriever_test.go
+++ b/pkg/retriever/bitswapretriever_test.go
@@ -354,7 +354,7 @@ func TestBitswapRetriever(t *testing.T) {
 		clock.Set(time.Now())
 		retrievalResult := make(chan types.RetrievalResult, 1)
 		go func() {
-			stats, err := retrieval1.RetrieveFromAsyncCandidates(makeAsyncCandidates(t, spreadCandidates(expectedCandidates[rid1])))
+			stats, err := retrieval1.RetrieveFromAsyncCandidates(makeAsyncCandidates(t, expectedCandidates[rid1]))
 			retrievalResult <- types.RetrievalResult{Stats: stats, Err: err}
 		}()
 		if len(expectedCandidates[rid1]) > 0 {
@@ -384,7 +384,7 @@ func TestBitswapRetriever(t *testing.T) {
 		clock.Set(time.Now())
 		unlockExchange = make(chan struct{})
 		go func() {
-			stats, err := retrieval2.RetrieveFromAsyncCandidates(makeAsyncCandidates(t, spreadCandidates(expectedCandidates[rid2])))
+			stats, err := retrieval2.RetrieveFromAsyncCandidates(makeAsyncCandidates(t, expectedCandidates[rid2]))
 			retrievalResult <- types.RetrievalResult{Stats: stats, Err: err}
 		}()
 		if len(expectedCandidates[rid2]) > 0 {
diff --git a/pkg/retriever/graphsyncretriever_test.go b/pkg/retriever/graphsyncretriever_test.go
index ca614a51..d550054c 100644
--- a/pkg/retriever/graphsyncretriever_test.go
+++ b/pkg/retriever/graphsyncretriever_test.go
@@ -521,7 +521,7 @@ func TestRetrievalRacing(t *testing.T) {
 			Cid:         cid.Undef,
 			RetrievalID: retrievalID,
 			LinkSystem:  cidlink.DefaultLinkSystem(),
-		}, cb).RetrieveFromAsyncCandidates(makeAsyncCandidates(t, spreadCandidates(candidates)))
+		}, cb).RetrieveFromAsyncCandidates(makeAsyncCandidates(t, candidates))
 	}})
 	require.Len(t, results, 1)
 	stats, err := results[0].Stats, results[0].Err
@@ -650,22 +650,22 @@ func TestMultipleRetrievals(t *testing.T) {
 			Cid:         cid1,
 			RetrievalID: retrievalID,
 			LinkSystem:  cidlink.DefaultLinkSystem(),
-		}, cb).RetrieveFromAsyncCandidates(makeAsyncCandidates(t, spreadCandidates([]types.RetrievalCandidate{
+		}, cb).RetrieveFromAsyncCandidates(makeAsyncCandidates(t, []types.RetrievalCandidate{
 			types.NewRetrievalCandidate(peer.ID("foo"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{}),
 			types.NewRetrievalCandidate(peer.ID("bar"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{}),
 			types.NewRetrievalCandidate(peer.ID("baz"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{}),
-		})))
+		}))
 	}, func(cb func(types.RetrievalEvent)) (*types.RetrievalStats, error) {
 		return cfg.Retrieve(context.Background(), types.RetrievalRequest{
 			Cid:         cid2,
 			RetrievalID: retrievalID,
 			LinkSystem:  cidlink.DefaultLinkSystem(),
-		}, cb).RetrieveFromAsyncCandidates(makeAsyncCandidates(t, spreadCandidates([]types.RetrievalCandidate{
+		}, cb).RetrieveFromAsyncCandidates(makeAsyncCandidates(t, []types.RetrievalCandidate{
 			types.NewRetrievalCandidate(peer.ID("bang"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{}),
 			types.NewRetrievalCandidate(peer.ID("boom"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{}),
 			types.NewRetrievalCandidate(peer.ID("bing"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{}),
-		})))
+		}))
 	}})
 	require.Len(t, results, 2)
 	stats, err := results[0].Stats, results[0].Err
@@ -702,7 +702,7 @@ func TestRetrievalSelector(t *testing.T) {
 		LinkSystem:  cidlink.DefaultLinkSystem(),
 		Selector:    selector,
 	}, nil)
-	stats, err := retrieval.RetrieveFromAsyncCandidates(makeAsyncCandidates(t, [][]types.RetrievalCandidate{{types.NewRetrievalCandidate(peer.ID("foo"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{})}}))
+	stats, err := retrieval.RetrieveFromAsyncCandidates(makeAsyncCandidates(t, []types.RetrievalCandidate{types.NewRetrievalCandidate(peer.ID("foo"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{})}))
 	require.NoError(t, err)
 	require.NotNil(t, stats)
 	require.Equal(t, mockClient.GetRetrievalReturns()["foo"].ResultStats, stats)
@@ -797,13 +797,13 @@ func TestDuplicateRetreivals(t *testing.T) {
 			Cid:         cid1,
 			RetrievalID: retrievalID,
 			LinkSystem:  cidlink.DefaultLinkSystem(),
-		}, cb).RetrieveFromAsyncCandidates(makeAsyncCandidates(t, spreadCandidates([]types.RetrievalCandidate{
+		}, cb).RetrieveFromAsyncCandidates(makeAsyncCandidates(t, []types.RetrievalCandidate{
 			types.NewRetrievalCandidate(peer.ID("foo"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{PieceCID: cid.Cid{}, VerifiedDeal: false, FastRetrieval: false}),
 			types.NewRetrievalCandidate(peer.ID("baz"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{PieceCID: cid.Cid{}, VerifiedDeal: false, FastRetrieval: false}),
 			types.NewRetrievalCandidate(peer.ID("bar"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{PieceCID: cid.Cid{}, VerifiedDeal: false, FastRetrieval: false}),
 			types.NewRetrievalCandidate(peer.ID("bar"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{PieceCID: cid.Cid{}, VerifiedDeal: false, FastRetrieval: true}),
 			types.NewRetrievalCandidate(peer.ID("bar"), nil, cid.Undef, &metadata.GraphsyncFilecoinV1{PieceCID: cid.Cid{}, VerifiedDeal: true, FastRetrieval: false}),
-		})))
+		}))
 	},
 	})
 	require.Len(t, results, 1)
@@ -814,19 +814,10 @@ func TestDuplicateRetreivals(t *testing.T) {
 	require.Equal(t, mockClient.GetRetrievalReturns()["bar"].ResultStats, stats)
 }
 
-func spreadCandidates(cands []types.RetrievalCandidate) [][]types.RetrievalCandidate {
-	// make cands into an slice of slices of candidates where each subslice only has one of the candidates in it
-	out := make([][]types.RetrievalCandidate, 0)
-	for _, cand := range cands {
-		out = append(out, []types.RetrievalCandidate{cand})
-	}
-	return out
-}
-
-func makeAsyncCandidates(t *testing.T, candidates [][]types.RetrievalCandidate) types.InboundAsyncCandidates {
+func makeAsyncCandidates(t *testing.T, candidates []types.RetrievalCandidate) types.InboundAsyncCandidates {
 	incoming, outgoing := types.MakeAsyncCandidates(len(candidates))
 	for _, candidate := range candidates {
-		err := outgoing.SendNext(context.Background(), candidate)
+		err := outgoing.SendNext(context.Background(), []types.RetrievalCandidate{candidate})
 		require.NoError(t, err)
 	}
 	close(outgoing)
diff --git a/pkg/retriever/httpretriever_test.go b/pkg/retriever/httpretriever_test.go
index 2851c960..41f1092c 100644
--- a/pkg/retriever/httpretriever_test.go
+++ b/pkg/retriever/httpretriever_test.go
@@ -13,9 +13,11 @@ import (
 
 	"github.com/benbjohnson/clock"
 	"github.com/filecoin-project/go-state-types/big"
+ "github.com/filecoin-project/lassie/pkg/events" "github.com/filecoin-project/lassie/pkg/internal/testutil" "github.com/filecoin-project/lassie/pkg/retriever" "github.com/filecoin-project/lassie/pkg/types" + "github.com/google/uuid" "github.com/ipfs/go-cid" gstestutil "github.com/ipfs/go-graphsync/testutil" "github.com/ipld/go-car/v2" @@ -34,6 +36,13 @@ import ( "github.com/stretchr/testify/require" ) +type httpRemote struct { + peer peer.AddrInfo + lsys *linking.LinkSystem + sel ipld.Node + responseDelay time.Duration +} + func TestHTTPRetriever(t *testing.T) { ctx := context.Background() @@ -57,121 +66,122 @@ func TestHTTPRetriever(t *testing.T) { cid2 := tbc2.TipLink.(cidlink.Link).Cid cid1Cands := testutil.GenerateRetrievalCandidatesForCID(t, 10, cid1, metadata.IpfsGatewayHttp{}) cid2Cands := testutil.GenerateRetrievalCandidatesForCID(t, 10, cid2, metadata.IpfsGatewayHttp{}) + rid1 := types.RetrievalID(uuid.New()) + rid2 := types.RetrievalID(uuid.New()) remoteBlockDuration := 50 * time.Millisecond + allSelector := selectorparse.CommonSelector_ExploreAllRecursively getTimeout := func(_ peer.ID) time.Duration { return 5 * time.Second } + startTime := time.Now().Add(time.Hour) testCases := []struct { - name string - localLinkSystems map[cid.Cid]*linking.LinkSystem - requestPath []string - requestScope []types.CarScope - remoteLinkSystems map[cid.Cid]*linking.LinkSystem - remoteSelector map[cid.Cid]ipld.Node // selector to run on the remote that matches this path+scope - expectedCandidates map[cid.Cid][][]types.RetrievalCandidate - expectedEvents map[cid.Cid]map[types.EventCode]int - expectedStats map[cid.Cid]*types.RetrievalStats - expectedErrors map[cid.Cid]struct{} - expectedCids map[cid.Cid][]cid.Cid + name string + requests map[cid.Cid]types.RetrievalID + requestPath map[cid.Cid]string + requestScope map[cid.Cid]types.CarScope + remotes map[cid.Cid][]httpRemote + expectedStats map[cid.Cid]types.RetrievalStats + expectedErrors map[cid.Cid]struct{} + expectedCids map[cid.Cid][]cid.Cid + expectSequence []testutil.ExpectedActionsAtTime }{ { - name: "successful full remote fetch, single candidates", - requestPath: []string{"", ""}, - requestScope: []types.CarScope{types.CarScopeAll, types.CarScopeAll}, - remoteSelector: map[cid.Cid]ipld.Node{ - cid1: selectorparse.CommonSelector_ExploreAllRecursively, - cid2: selectorparse.CommonSelector_ExploreAllRecursively, - }, - remoteLinkSystems: map[cid.Cid]*linking.LinkSystem{ - cid1: makeLsys(tbc1.AllBlocks()), - cid2: makeLsys(tbc2.AllBlocks()), - }, - expectedCandidates: map[cid.Cid][][]types.RetrievalCandidate{ - cid1: {cid1Cands[0:1]}, - cid2: {cid2Cands[0:1]}, - }, - expectedEvents: map[cid.Cid]map[types.EventCode]int{ + name: "single full fetch, one peer", + requests: map[cid.Cid]types.RetrievalID{cid1: rid1}, + remotes: map[cid.Cid][]httpRemote{ cid1: { - types.StartedCode: 1, - types.ConnectedCode: 1, - types.FirstByteCode: 1, - types.SuccessCode: 1, - }, - cid2: { - types.StartedCode: 1, - types.ConnectedCode: 1, - types.FirstByteCode: 1, - types.SuccessCode: 1, + { + peer: cid1Cands[0].MinerPeer, + lsys: makeLsys(tbc1.AllBlocks()), + sel: allSelector, + responseDelay: time.Millisecond * 40, + }, }, }, expectedCids: map[cid.Cid][]cid.Cid{ cid1: tbc1Cids, - cid2: tbc2Cids, }, - expectedStats: map[cid.Cid]*types.RetrievalStats{ + expectedStats: map[cid.Cid]types.RetrievalStats{ cid1: { RootCid: cid1, StorageProviderId: cid1Cands[0].MinerPeer.ID, Size: sizeOf(tbc1.AllBlocks()), Blocks: 100, - Duration: remoteBlockDuration * 100, - 
-					AverageSpeed:      uint64(float64(sizeOf(tbc1.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
+					Duration:          40*time.Millisecond + remoteBlockDuration*100,
+					AverageSpeed:      uint64(float64(sizeOf(tbc1.AllBlocks())) / (40*time.Millisecond + remoteBlockDuration*100).Seconds()),
+					TimeToFirstByte:   40 * time.Millisecond,
 					TotalPayment:      big.Zero(),
 					AskPrice:          big.Zero(),
 				},
-				cid2: {
-					RootCid:           cid2,
-					StorageProviderId: cid2Cands[0].MinerPeer.ID,
-					Size:              sizeOf(tbc2.AllBlocks()),
-					Blocks:            100,
-					Duration:          remoteBlockDuration * 100,
-					AverageSpeed:      uint64(float64(sizeOf(tbc2.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
-					TotalPayment:      big.Zero(),
-					AskPrice:          big.Zero(),
+			},
+			expectSequence: []testutil.ExpectedActionsAtTime{
+				{
+					AfterStart: 0,
+					ExpectedEvents: []types.RetrievalEvent{
+						events.Started(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer)),
+						events.Connected(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer)),
+					},
+				},
+				{
+					AfterStart:         0,
+					ReceivedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID},
+				},
+				{
+					AfterStart: time.Millisecond * 40,
+					ExpectedEvents: []types.RetrievalEvent{
+						events.FirstByte(startTime.Add(time.Millisecond*40), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer)),
+					},
+				},
+				{
+					AfterStart: time.Millisecond*40 + remoteBlockDuration*100,
+					ExpectedEvents: []types.RetrievalEvent{
+						events.Success(startTime.Add(time.Millisecond*40+remoteBlockDuration*100), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer), sizeOf(tbc1.AllBlocks()), 100, 40*time.Millisecond+remoteBlockDuration*100, big.Zero(), 0),
+					},
+					ServedRetrievals: []testutil.RemoteStats{
+						{
+							Peer:      cid1Cands[0].MinerPeer.ID,
+							Root:      cid1,
+							ByteCount: sizeOf(tbc1.AllBlocks()),
+							Blocks:    tbc1Cids,
+						},
+					},
+					CompletedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID},
 				},
 			},
 		},
 		{
-			name:         "successful full remote fetch, multiple candidates, first successful",
-			requestPath:  []string{"", ""},
-			requestScope: []types.CarScope{types.CarScopeAll, types.CarScopeAll},
-			remoteSelector: map[cid.Cid]ipld.Node{
-				cid1: selectorparse.CommonSelector_ExploreAllRecursively,
-				cid2: selectorparse.CommonSelector_ExploreAllRecursively,
-			},
-			remoteLinkSystems: map[cid.Cid]*linking.LinkSystem{
-				cid1: makeLsys(tbc1.AllBlocks()),
-				cid2: makeLsys(tbc2.AllBlocks()),
-			},
-			expectedCandidates: map[cid.Cid][][]types.RetrievalCandidate{
-				cid1: {cid1Cands},
-				cid2: {cid2Cands},
-			},
-			expectedEvents: map[cid.Cid]map[types.EventCode]int{
+			name:     "two parallel full fetch, small offset, one peer each",
+			requests: map[cid.Cid]types.RetrievalID{cid1: rid1, cid2: rid2},
+			remotes: map[cid.Cid][]httpRemote{
 				cid1: {
-					types.StartedCode:   10,
-					types.ConnectedCode: 10,
-					types.FirstByteCode: 1,
-					types.SuccessCode:   1,
+					{
+						peer:          cid1Cands[0].MinerPeer,
+						lsys:          makeLsys(tbc1.AllBlocks()),
+						sel:           allSelector,
+						responseDelay: time.Millisecond * 40,
+					},
 				},
 				cid2: {
-					types.StartedCode:   10,
-					types.ConnectedCode: 10,
-					types.FirstByteCode: 1,
-					types.SuccessCode:   1,
+					{
+						peer:          cid2Cands[0].MinerPeer,
+						lsys:          makeLsys(tbc2.AllBlocks()),
+						sel:           allSelector,
+						responseDelay: time.Millisecond * 10,
+					},
 				},
 			},
 			expectedCids: map[cid.Cid][]cid.Cid{
 				cid1: tbc1Cids,
 				cid2: tbc2Cids,
 			},
-			expectedStats: map[cid.Cid]*types.RetrievalStats{
+			expectedStats: map[cid.Cid]types.RetrievalStats{
 				cid1: {
 					RootCid:           cid1,
 					StorageProviderId: cid1Cands[0].MinerPeer.ID,
 					Size:              sizeOf(tbc1.AllBlocks()),
 					Blocks:            100,
-					Duration:          remoteBlockDuration * 100,
-					AverageSpeed:      uint64(float64(sizeOf(tbc1.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
+					Duration:          40*time.Millisecond + remoteBlockDuration*100,
+					AverageSpeed:      uint64(float64(sizeOf(tbc1.AllBlocks())) / (40*time.Millisecond + remoteBlockDuration*100).Seconds()),
+					TimeToFirstByte:   40 * time.Millisecond,
 					TotalPayment:      big.Zero(),
 					AskPrice:          big.Zero(),
 				},
@@ -180,12 +190,70 @@ func TestHTTPRetriever(t *testing.T) {
 					StorageProviderId: cid2Cands[0].MinerPeer.ID,
 					Size:              sizeOf(tbc2.AllBlocks()),
 					Blocks:            100,
-					Duration:          remoteBlockDuration * 100,
-					AverageSpeed:      uint64(float64(sizeOf(tbc2.AllBlocks())) / (remoteBlockDuration * 100).Seconds()),
+					Duration:          10*time.Millisecond + remoteBlockDuration*100,
+					AverageSpeed:      uint64(float64(sizeOf(tbc2.AllBlocks())) / (10*time.Millisecond + remoteBlockDuration*100).Seconds()),
+					TimeToFirstByte:   10 * time.Millisecond,
 					TotalPayment:      big.Zero(),
 					AskPrice:          big.Zero(),
 				},
 			},
+			expectSequence: []testutil.ExpectedActionsAtTime{
+				{
+					AfterStart: 0,
+					ExpectedEvents: []types.RetrievalEvent{
+						events.Started(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer)),
+						events.Started(startTime, rid2, startTime, types.RetrievalPhase, toCandidate(cid2, cid2Cands[0].MinerPeer)),
+						events.Connected(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer)),
+						events.Connected(startTime, rid2, startTime, types.RetrievalPhase, toCandidate(cid2, cid2Cands[0].MinerPeer)),
+					},
+				},
+				{
+					AfterStart:         0,
+					ReceivedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID, cid2Cands[0].MinerPeer.ID},
+				},
+				{
+					AfterStart: time.Millisecond * 10,
+					ExpectedEvents: []types.RetrievalEvent{
+						events.FirstByte(startTime.Add(time.Millisecond*10), rid2, startTime, toCandidate(cid2, cid2Cands[0].MinerPeer)),
+					},
+				},
+				{
+					AfterStart: time.Millisecond * 40,
+					ExpectedEvents: []types.RetrievalEvent{
+						events.FirstByte(startTime.Add(time.Millisecond*40), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer)),
+					},
+				},
+				{
+					AfterStart: time.Millisecond*10 + remoteBlockDuration*100,
+					ExpectedEvents: []types.RetrievalEvent{
+						events.Success(startTime.Add(time.Millisecond*10+remoteBlockDuration*100), rid2, startTime, toCandidate(cid2, cid2Cands[0].MinerPeer), sizeOf(tbc2.AllBlocks()), 100, 10*time.Millisecond+remoteBlockDuration*100, big.Zero(), 0),
+					},
+					ServedRetrievals: []testutil.RemoteStats{
+						{
+							Peer:      cid2Cands[0].MinerPeer.ID,
+							Root:      cid2,
+							ByteCount: sizeOf(tbc2.AllBlocks()),
+							Blocks:    tbc2Cids,
+						},
+					},
+					CompletedRetrievals: []peer.ID{cid2Cands[0].MinerPeer.ID},
+				},
+				{
+					AfterStart: time.Millisecond*40 + remoteBlockDuration*100,
+					ExpectedEvents: []types.RetrievalEvent{
+						events.Success(startTime.Add(time.Millisecond*40+remoteBlockDuration*100), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer), sizeOf(tbc1.AllBlocks()), 100, 40*time.Millisecond+remoteBlockDuration*100, big.Zero(), 0),
+					},
+					ServedRetrievals: []testutil.RemoteStats{
+						{
+							Peer:      cid1Cands[0].MinerPeer.ID,
+							Root:      cid1,
+							ByteCount: sizeOf(tbc1.AllBlocks()),
+							Blocks:    tbc1Cids,
+						},
+					},
+					CompletedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID},
+				},
+			},
 		},
 	}
@@ -199,225 +267,87 @@ func TestHTTPRetriever(t *testing.T) {
 			defer cancel()
 			clock := clock.NewMock()
+			clock.Set(startTime)
 
-			localLinkSystems := testCase.localLinkSystems
-			if localLinkSystems == nil {
-				localLinkSystems = make(map[cid.Cid]*linking.LinkSystem)
-			}
-			remoteLinkSystems := testCase.remoteLinkSystems
-			if remoteLinkSystems == nil {
-				remoteLinkSystems = make(map[cid.Cid]*linking.LinkSystem)
-			}
-			linkSystemForCid := func(c cid.Cid, lsMap map[cid.Cid]*linking.LinkSystem) *linking.LinkSystem {
-				if _, ok := lsMap[c]; !ok {
-					lsMap[c] = makeLsys(nil)
-				}
-				return lsMap[c]
-			}
-			rid1, err := types.NewRetrievalID()
-			req.NoError(err)
-			req1Context := types.RegisterRetrievalIDToContext(ctx, rid1)
-			req1 := types.RetrievalRequest{
-				RetrievalID: rid1,
-				Cid:         cid1,
-				LinkSystem:  *linkSystemForCid(cid1, localLinkSystems),
-				Path:        testCase.requestPath[0],
-				Scope:       testCase.requestScope[0],
-			}
-			rid2, err := types.NewRetrievalID()
-			req.NoError(err)
-			req2Context := types.RegisterRetrievalIDToContext(ctx, rid2)
-			req2 := types.RetrievalRequest{
-				RetrievalID: rid2,
-				Cid:         cid2,
-				LinkSystem:  *linkSystemForCid(cid2, localLinkSystems),
-				Path:        testCase.requestPath[1],
-				Scope:       testCase.requestScope[1],
-			}
 			awaitReceivedCandidates := make(chan struct{}, 1)
-
-			remoteStats := make([]sentStats, 0)
-			var remoteStatsLk sync.Mutex
-			var bodyWg sync.WaitGroup
-			makeBody := func(root cid.Cid) io.ReadCloser {
-				fmt.Println("makeBody")
-				bodyWg.Add(1)
-				// make traverser
-				lsys := linkSystemForCid(root, remoteLinkSystems)
-				carR, carW := io.Pipe()
-				statsCh := traverseCar(t, ctx, clock, remoteBlockDuration, carW, lsys, root, testCase.remoteSelector[root])
-				go func() {
-					select {
-					case <-ctx.Done():
-						return
-					case stats, ok := <-statsCh:
-						if !ok {
-							return
-						}
-						remoteStatsLk.Lock()
-						remoteStats = append(remoteStats, stats)
-						remoteStatsLk.Unlock()
+			getRemote := func(cid cid.Cid, maddr string) httpRemote {
+				remotes, ok := testCase.remotes[cid]
+				req.True(ok)
+				for _, remote := range remotes {
+					if remote.peer.Addrs[0].String() == maddr {
+						return remote
 					}
-					bodyWg.Done()
-					fmt.Println("bodyWg.Done()")
-				}()
-				return carR
-			}
-			rt := NewCannedBytesRoundTripper(t, ctx, testCase.requestPath, testCase.requestScope, testCase.expectedCandidates, makeBody)
-			// mock http client that returns carBytes as the response body regardless
-			// of the request
-			client := &http.Client{Transport: rt}
-			hr := retriever.NewHttpRetrieverWithDeps(getTimeout, client, clock, awaitReceivedCandidates)
-			receivedEvents := make(map[cid.Cid][]types.RetrievalEvent)
-			retrievalCollector := func(evt types.RetrievalEvent) {
-				receivedEvents[evt.PayloadCid()] = append(receivedEvents[evt.PayloadCid()], evt)
-			}
-			retrieval1 := hr.Retrieve(req1Context, req1, retrievalCollector)
-			retrieval2 := hr.Retrieve(req2Context, req2, retrievalCollector)
-			receivedStats := make(map[cid.Cid]*types.RetrievalStats, 2)
-			receivedErrors := make(map[cid.Cid]struct{}, 2)
-
-			// reset the clock
-			clock.Set(time.Now())
-			retrievalResult := make(chan types.RetrievalResult, 1)
-			go func() {
-				stats, err := retrieval1.RetrieveFromAsyncCandidates(makeAsyncCandidates(t, testCase.expectedCandidates[cid1]))
-				retrievalResult <- types.RetrievalResult{Stats: stats, Err: err}
-			}()
-			if len(testCase.expectedCandidates[cid1]) > 0 && len(testCase.expectedCandidates[cid1][0]) > 0 {
-				select {
-				case <-ctx.Done():
-					req.FailNow("did not receive all candidates")
-				case <-awaitReceivedCandidates:
-				}
-			}
-
-			{
-				var stats *types.RetrievalStats
-				select {
-				case <-ctx.Done():
-					req.FailNow("did not receive result")
-				case result := <-retrievalResult:
-					stats, err = result.Stats, result.Err
-				}
-				if stats != nil {
-					receivedStats[cid1] = stats
-				}
-				if err != nil {
-					fmt.Println("retrieval1 error", err)
-					receivedErrors[cid1] = struct{}{}
-				}
-			}
-
-			// reset the clock
-			clock.Set(time.Now())
-			go func() {
-				stats, err := retrieval2.RetrieveFromAsyncCandidates(makeAsyncCandidates(t, testCase.expectedCandidates[cid2]))
-				retrievalResult <- types.RetrievalResult{Stats: stats, Err: err}
-			}()
-			if len(testCase.expectedCandidates[cid2]) > 0 && len(testCase.expectedCandidates[cid2][0]) > 0 {
-				select {
-				case <-ctx.Done():
-					req.FailNow("did not receive all candidates")
-				case <-awaitReceivedCandidates:
-				}
-			}
-
-			{
-				var stats *types.RetrievalStats
-				select {
-				case <-ctx.Done():
-					req.FailNow("did not receive result")
-				case result := <-retrievalResult:
-					stats, err = result.Stats, result.Err
-				}
-				if stats != nil {
-					receivedStats[cid2] = stats
 				}
-				if err != nil {
-					fmt.Println("retrieval2 error", err)
-					receivedErrors[cid2] = struct{}{}
-				}
-			}
-
-			done := make(chan struct{})
-			go func() {
-				bodyWg.Wait()
-				close(done)
-			}()
-			select {
-			case <-done:
-			case <-ctx.Done():
-				req.FailNow("Did not end sending all bodies")
+				t.Fatal("remote not found")
+				return httpRemote{}
 			}
-			expectedErrors := testCase.expectedErrors
-			if expectedErrors == nil {
-				expectedErrors = make(map[cid.Cid]struct{})
-			}
-			req.Equal(expectedErrors, receivedErrors)
-			expectedStats := testCase.expectedStats
-			if expectedStats == nil {
-				expectedStats = make(map[cid.Cid]*types.RetrievalStats)
-			}
-			// print testCase.expectedCandidates for each CID in the map, in String() form as a command separated list to stdout
-			// this is useful for updating the test case
-			for key, candidates := range testCase.expectedCandidates {
-				fmt.Printf("expectedCandidates[%s] = []types.RetrievalCandidate{\n", key)
-				for _, co := range candidates {
-					for _, candidate := range co {
-						fmt.Printf("\t%s,\n", candidate.MinerPeer.ID.String())
+			roundTripper := NewCannedBytesRoundTripper(t, ctx, clock, remoteBlockDuration, testCase.requestPath, testCase.requestScope, getRemote)
+			defer roundTripper.Close()
+			client := &http.Client{Transport: roundTripper}
+			retriever := retriever.NewHttpRetrieverWithDeps(getTimeout, client, clock, awaitReceivedCandidates)
+
+			retrievals := make([]testutil.RunRetrieval, 0)
+			expectedStats := make([]types.RetrievalStats, 0)
+			for cid, rid := range testCase.requests {
+				cid := cid
+				rid := rid
+				expectedStats = append(expectedStats, testCase.expectedStats[cid])
+				retrievals = append(retrievals, func(eventsCb func(types.RetrievalEvent)) (*types.RetrievalStats, error) {
+					request := types.RetrievalRequest{
+						RetrievalID: rid,
+						Cid:         cid,
+						LinkSystem:  *makeLsys(nil),
+						Path:        testCase.requestPath[cid],
+						Scope:       testCase.requestScope[cid],
 					}
-				}
-				fmt.Printf("}\n")
-				// print StorageProviderId from receivedStats map in the same way
-				fmt.Printf("receivedStats[%s] = &types.RetrievalStats{StorageProviderID: %s}\n", key, receivedStats[key].StorageProviderId.String())
-
+					candidates := toCandidates(cid, testCase.remotes[cid])
+					return retriever.Retrieve(context.Background(), request, eventsCb).
+						RetrieveFromAsyncCandidates(makeAsyncCandidates(t, candidates))
+				})
 			}
-			req.Equal(expectedStats, receivedStats)
-
-			receivedCodes := make(map[cid.Cid]map[types.EventCode]int, 0)
-			for key, events := range receivedEvents {
-				if receivedCodes[key] == nil {
-					receivedCodes[key] = make(map[types.EventCode]int, 0)
-				}
-				for _, event := range events {
-					v := receivedCodes[key][event.Code()]
-					receivedCodes[key][event.Code()] = v + 1
-				}
-			}
-			expectedRemoteStats := []sentStats{
-				{
-					root:      cid1,
-					byteCount: testCase.expectedStats[cid1].Size,
-					blocks:    testCase.expectedCids[cid1],
-				},
-				{
-					root:      cid2,
-					byteCount: testCase.expectedStats[cid2].Size,
-					blocks:    testCase.expectedCids[cid2],
-				},
-			}
-			req.Equal(expectedRemoteStats, remoteStats)
-			if testCase.expectedEvents == nil {
-				testCase.expectedEvents = make(map[cid.Cid]map[types.EventCode]int, 0)
+			results := testutil.RetrievalVerifier{
+				ExpectedSequence: testCase.expectSequence,
+			}.RunWithVerification(ctx, t, clock, roundTripper, nil, retrievals)
+
+			require.Len(t, results, len(testCase.requests))
+			actualStats := make([]types.RetrievalStats, 0)
+			for _, result := range results {
+				stats, err := result.Stats, result.Err
+				require.NoError(t, err)
+				require.NotNil(t, stats)
+				actualStats = append(actualStats, *stats)
 			}
-			req.Equal(testCase.expectedEvents, receivedCodes)
+			require.ElementsMatch(t, expectedStats, actualStats)
 		})
 	}
 }
 
+func toCandidates(root cid.Cid, remotes []httpRemote) []types.RetrievalCandidate {
+	candidates := make([]types.RetrievalCandidate, len(remotes))
+	for i, r := range remotes {
+		candidates[i] = toCandidate(root, r.peer)
+	}
+	return candidates
+}
+
+func toCandidate(root cid.Cid, peer peer.AddrInfo) types.RetrievalCandidate {
+	return types.NewRetrievalCandidate(peer.ID, peer.Addrs, root, &metadata.IpfsGatewayHttp{})
+}
+
 type cannedBytesRoundTripper struct {
-	t              *testing.T
-	ctx            context.Context
-	expectedPath   []string
-	expectedScope  []types.CarScope
-	candidates     map[cid.Cid][][]types.RetrievalCandidate
-	makeBody       func(cid.Cid) io.ReadCloser
-	batchCounter   map[cid.Cid][]int
-	batchCounterLk *sync.Mutex
-	batchWait      *sync.Cond
+	StartsCh chan peer.ID
+	StatsCh  chan testutil.RemoteStats
+	EndsCh   chan peer.ID
+
+	t                   *testing.T
+	ctx                 context.Context
+	clock               *clock.Mock
+	remoteBlockDuration time.Duration
+	expectedPath        map[cid.Cid]string
+	expectedScope       map[cid.Cid]types.CarScope
+	getRemote           func(cid cid.Cid, maddr string) httpRemote
 }
 
 var _ http.RoundTripper = (*cannedBytesRoundTripper)(nil)
@@ -425,26 +355,31 @@ var _ http.RoundTripper = (*cannedBytesRoundTripper)(nil)
 func NewCannedBytesRoundTripper(
 	t *testing.T,
 	ctx context.Context,
-	expectedPath []string,
-	expectedScope []types.CarScope,
-	candidates map[cid.Cid][][]types.RetrievalCandidate,
-	makeBody func(cid.Cid) io.ReadCloser,
+	clock *clock.Mock,
+	remoteBlockDuration time.Duration,
+	expectedPath map[cid.Cid]string,
+	expectedScope map[cid.Cid]types.CarScope,
+	getRemote func(cid cid.Cid, maddr string) httpRemote,
 ) *cannedBytesRoundTripper {
-
-	lk := sync.Mutex{}
 	return &cannedBytesRoundTripper{
+		make(chan peer.ID, 32),
+		make(chan testutil.RemoteStats, 32),
+		make(chan peer.ID, 32),
 		t,
 		ctx,
+		clock,
+		remoteBlockDuration,
 		expectedPath,
 		expectedScope,
-		candidates,
-		makeBody,
-		make(map[cid.Cid][]int),
-		&lk,
-		&sync.Cond{L: &lk},
+		getRemote,
 	}
 }
 
+func (c *cannedBytesRoundTripper) Close() {
+	close(c.StartsCh)
+	close(c.EndsCh)
+}
+
 func (c *cannedBytesRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
strings.Split(req.URL.Path, "/") require.True(c.t, len(us) > 2) @@ -452,83 +387,88 @@ func (c *cannedBytesRoundTripper) RoundTrip(req *http.Request) (*http.Response, root, err := cid.Parse(us[2]) require.NoError(c.t, err) path := strings.Join(us[3:], "/") - matchedPath := -1 - for ii, p := range c.expectedPath { - if p == path { - matchedPath = ii - break - } + expectedPath, ok := c.expectedPath[root] + if !ok { + require.Equal(c.t, path, "") + } else { + require.Equal(c.t, path, expectedPath) } - require.True(c.t, matchedPath > -1) - require.Equal(c.t, req.URL.RawQuery, fmt.Sprintf("car-scope=%s", c.expectedScope[matchedPath])) + expectedScope := types.CarScopeAll + if scope, ok := c.expectedScope[root]; ok { + expectedScope = scope + } + require.Equal(c.t, req.URL.RawQuery, fmt.Sprintf("car-scope=%s", expectedScope)) ip := req.URL.Hostname() port := req.URL.Port() - myaddr := fmt.Sprintf("/ip4/%s/tcp/%s/http", ip, port) - - cands, ok := c.candidates[root] - require.True(c.t, ok) - - c.batchCounterLk.Lock() - var batchNum int - var batchCount int - for { - if c.ctx.Err() != nil { - c.batchCounterLk.Unlock() - return nil, c.ctx.Err() - } - - // get next expected candidate to serve - batch, ok := c.batchCounter[root] - if ok { - batchNum = batch[0] - batchCount = batch[1] - } // else start at 0,0 - if batchNum >= len(cands) { - require.FailNowf(c.t, "too many requests (exceeded batch count)", "batchNum: %d, len(cands): %d, batchCount: %d", batchNum, len(cands), batchCount) - } - if batchCount >= len(cands[batchNum]) { - require.FailNowf(c.t, "too many requests", "batchNum: %d, len(cands): %d, batchCount: %d, len(cands[batchNum]): %d", batchNum, len(cands), batchCount, len(cands[batchNum])) - } - if cands[batchNum][batchCount].MinerPeer.Addrs[0].String() == myaddr { - break - } else { - c.batchWait.Wait() - } + maddr := fmt.Sprintf("/ip4/%s/tcp/%s/http", ip, port) + remote := c.getRemote(root, maddr) + c.StartsCh <- remote.peer.ID + + makeBody := func(root cid.Cid, maddr string) io.ReadCloser { + carR, carW := io.Pipe() + statsCh := traverseCar(c.t, c.ctx, remote.peer.ID, c.clock, c.remoteBlockDuration, carW, remote.lsys, root, remote.sel) + go func() { + select { + case <-c.ctx.Done(): + return + case stats, ok := <-statsCh: + if !ok { + return + } + c.StatsCh <- stats + } + }() + return carR } - // increment values for next candidate to pick up from - if batchCount+1 >= len(cands[batchNum]) { - batchNum++ - batchCount = 0 - } else { - batchCount++ - } - fmt.Printf("Setting batch counters for %s to %d, %d\n", root, batchNum, batchCount) - c.batchCounter[root] = []int{batchNum, batchCount} - c.batchCounterLk.Unlock() + c.clock.Sleep(remote.responseDelay) + return &http.Response{ + StatusCode: http.StatusOK, + Body: &deferredReader{root: root, maddr: maddr, makeBody: makeBody, end: func() { c.EndsCh <- remote.peer.ID }}, + }, nil +} - endCh := make(chan struct{}) - go func() { +func (c *cannedBytesRoundTripper) VerifyConnectionsReceived(ctx context.Context, t *testing.T, expectedConnections []peer.ID) { + // connection is currently a noop +} + +func (c *cannedBytesRoundTripper) VerifyRetrievalsReceived(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID) { + retrievals := make([]peer.ID, 0, len(expectedRetrievals)) + for i := 0; i < len(expectedRetrievals); i++ { select { - case <-c.ctx.Done(): - case <-endCh: + case retrieval := <-c.StartsCh: + retrievals = append(retrievals, retrieval) + case <-ctx.Done(): + require.FailNowf(t, "failed to receive expected retrievals", "expected 
%d, received %d", len(expectedRetrievals), i) } - c.batchCounterLk.Lock() - c.batchWait.Broadcast() - c.batchCounterLk.Unlock() - }() + } + require.ElementsMatch(t, expectedRetrievals, retrievals) +} - fmt.Println("setting up response") - return &http.Response{ - StatusCode: http.StatusOK, - Body: &deferredReader{root: root, makeBody: c.makeBody, endCh: endCh}, - }, nil +func (c *cannedBytesRoundTripper) VerifyRetrievalsServed(ctx context.Context, t *testing.T, expectedServed []testutil.RemoteStats) { + remoteStats := make([]testutil.RemoteStats, 0, len(expectedServed)) + for i := 0; i < len(expectedServed); i++ { + select { + case stats := <-c.StatsCh: + remoteStats = append(remoteStats, stats) + case <-ctx.Done(): + require.FailNowf(t, "failed to receive expected served", "expected %d, received %d", len(expectedServed), i) + } + } + require.ElementsMatch(t, expectedServed, remoteStats) } -type sentStats struct { - root cid.Cid - byteCount uint64 - blocks []cid.Cid +func (c *cannedBytesRoundTripper) VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID) { + retrievals := make([]peer.ID, 0, len(expectedRetrievals)) + for i := 0; i < len(expectedRetrievals); i++ { + select { + case retrieval := <-c.EndsCh: + retrievals = append(retrievals, retrieval) + case <-ctx.Done(): + require.FailNowf(t, "failed to complete expected retrievals", "expected %d, received %d", len(expectedRetrievals), i) + } + } + require.ElementsMatch(t, expectedRetrievals, retrievals) } // deferredReader is simply a Reader that lazily calls makeBody on the first Read @@ -536,8 +476,9 @@ type sentStats struct { // the client. type deferredReader struct { root cid.Cid - makeBody func(cid.Cid) io.ReadCloser - endCh chan struct{} + maddr string + makeBody func(cid.Cid, string) io.ReadCloser + end func() r io.ReadCloser once sync.Once @@ -547,11 +488,11 @@ var _ io.ReadCloser = (*deferredReader)(nil) func (d *deferredReader) Read(p []byte) (n int, err error) { d.once.Do(func() { - d.r = d.makeBody(d.root) + d.r = d.makeBody(d.root, d.maddr) }) n, err = d.r.Read(p) if err == io.EOF { - close(d.endCh) + d.end() } return n, err } @@ -568,23 +509,25 @@ func (d *deferredReader) Close() error { // receive basic stats on what was written _after_ the write is finished. func traverseCar(t *testing.T, ctx context.Context, + id peer.ID, clock *clock.Mock, blockDuration time.Duration, carW io.WriteCloser, lsys *linking.LinkSystem, root cid.Cid, selNode ipld.Node, -) chan sentStats { +) chan testutil.RemoteStats { req := require.New(t) sel, err := selector.CompileSelector(selNode) req.NoError(err) - statsCh := make(chan sentStats, 1) + statsCh := make(chan testutil.RemoteStats, 1) go func() { - stats := sentStats{ - root: root, - blocks: make([]cid.Cid, 0), + stats := testutil.RemoteStats{ + Peer: id, + Root: root, + Blocks: make([]cid.Cid, 0), } // instantiating this writes a CARv1 header and waits for more Put()s @@ -596,7 +539,7 @@ func traverseCar(t *testing.T, // to the CARv1 writer. 
 		originalSRO := lsys.StorageReadOpener
 		lsys.StorageReadOpener = func(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
-			stats.blocks = append(stats.blocks, lnk.(cidlink.Link).Cid)
+			stats.Blocks = append(stats.Blocks, lnk.(cidlink.Link).Cid)
 			r, err := originalSRO(lc, lnk)
 			if err != nil {
 				return nil, err
@@ -607,8 +550,8 @@
 			}
 			err = carWriter.Put(ctx, lnk.(cidlink.Link).Cid.KeyString(), byts)
 			req.NoError(err)
-			stats.byteCount += uint64(len(byts)) // only the length of the bytes, not the rest of the CAR infrastructure
-			clock.Add(blockDuration)
+			stats.ByteCount += uint64(len(byts)) // only the length of the bytes, not the rest of the CAR infrastructure
+			clock.Sleep(blockDuration)
 			return bytes.NewReader(byts), nil
 		}
diff --git a/pkg/retriever/parallelpeerretriever.go b/pkg/retriever/parallelpeerretriever.go
index 78b14aff..5bbe3bf1 100644
--- a/pkg/retriever/parallelpeerretriever.go
+++ b/pkg/retriever/parallelpeerretriever.go
@@ -236,7 +236,6 @@ func collectResults(ctx context.Context, retrieval *retrievalSession, eventsCall
 	// have we got all responses but no success?
 	if !ok {
 		// we failed, and got only retrieval errors
-		fmt.Println("all failed, retrievalErrors", retrievalErrors)
 		retrievalErrors = multierr.Append(retrievalErrors, ErrAllRetrievalsFailed)
 		return nil, retrievalErrors
 	}
@@ -246,7 +245,6 @@ func collectResults(ctx context.Context, retrieval *retrievalSession, eventsCall
 			break
 		}
 		if result.Err != nil {
-			fmt.Println("result.Err", result.Err)
 			retrievalErrors = multierr.Append(retrievalErrors, result.Err)
 		}
 		if result.Stats != nil {