Skip to content

Commit

Permalink
fix(http): more test coverage, minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed May 8, 2023
1 parent c812ef7 commit f6e8366
Show file tree
Hide file tree
Showing 8 changed files with 527 additions and 132 deletions.
40 changes: 30 additions & 10 deletions pkg/internal/testutil/collectingeventlsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package testutil
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/filecoin-project/lassie/pkg/events"
"github.com/filecoin-project/lassie/pkg/types"
Expand All @@ -30,14 +32,29 @@ func (ev *AsyncCollectingEventsListener) Collect(evt types.RetrievalEvent) {
}
}

func (ev *AsyncCollectingEventsListener) VerifyNextEvents(t *testing.T, expectedEvents []types.RetrievalEvent) {
func (ev *AsyncCollectingEventsListener) VerifyNextEvents(t *testing.T, afterStart time.Duration, expectedEvents []types.RetrievalEvent) {
got := make([]types.EventCode, 0)
for i := 0; i < len(expectedEvents); i++ {
select {
case evt := <-ev.retrievalEventChan:
t.Logf("received event: %s", evt)
VerifyContainsCollectedEvent(t, expectedEvents, evt)
got = append(got, VerifyContainsCollectedEvent(t, afterStart, expectedEvents, evt))
case <-ev.ctx.Done():
require.FailNow(t, "did not receive expected events")
// work out which codes we didn't have
missing := make([]string, 0)
for _, expected := range expectedEvents {
found := false
for _, g := range got {
if g == expected.Code() {
found = true
break
}
}
if !found {
missing = append(missing, string(expected.Code()))
}
}
require.FailNowf(t, "did not receive expected events", "missing: %s", strings.Join(missing, ", "))
}
}
}
Expand Down Expand Up @@ -76,20 +93,23 @@ func VerifyCollectedEventTimings(t *testing.T, events []types.RetrievalEvent) {
}
}

func VerifyContainsCollectedEvent(t *testing.T, expectedList []types.RetrievalEvent, actual types.RetrievalEvent) {
func VerifyContainsCollectedEvent(t *testing.T, afterStart time.Duration, expectedList []types.RetrievalEvent, actual types.RetrievalEvent) types.EventCode {
for _, expected := range expectedList {
// this matching might need to evolve to be more sophisticated, particularly SP ID
if actual.Code() == expected.Code() &&
actual.RetrievalId() == expected.RetrievalId() &&
actual.PayloadCid() == expected.PayloadCid() &&
actual.Phase() == expected.Phase() {
if actual.StorageProviderId() == expected.StorageProviderId() {
VerifyCollectedEvent(t, actual, expected)
return
}
actual.Phase() == expected.Phase() &&
actual.StorageProviderId() == expected.StorageProviderId() {
VerifyCollectedEvent(t, actual, expected)
return actual.Code()
}
if actual.Code() == expected.Code() {
fmt.Printf("non-match: %s <> %s\n", actual, expected)
}
}
require.Fail(t, "event not found", actual.Code())
require.Failf(t, "unexpected event", "got '%s' @ %s", actual.Code(), afterStart)
return ""
}

func VerifyCollectedEvent(t *testing.T, actual types.RetrievalEvent, expected types.RetrievalEvent) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/internal/testutil/mockcandidatefinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package testutil
import (
"context"
"testing"
"time"

"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
Expand All @@ -28,14 +29,14 @@ func NewMockCandidateFinder(err error, candidates map[cid.Cid][]types.RetrievalC
}
}

func (me *MockCandidateFinder) VerifyCandidatesDiscovered(ctx context.Context, t *testing.T, expectedCandidatesDiscovered []DiscoveredCandidate) {
func (me *MockCandidateFinder) VerifyCandidatesDiscovered(ctx context.Context, t *testing.T, afterStart time.Duration, expectedCandidatesDiscovered []DiscoveredCandidate) {
candidatesDiscovered := make([]DiscoveredCandidate, 0, len(expectedCandidatesDiscovered))
for i := 0; i < len(expectedCandidatesDiscovered); i++ {
select {
case candidate := <-me.discoveredCandidates:
candidatesDiscovered = append(candidatesDiscovered, candidate)
case <-ctx.Done():
require.FailNowf(t, "failed to receive expected candidates", "expected %d, received %d", len(expectedCandidatesDiscovered), i)
require.FailNowf(t, "failed to receive expected candidates", "expected %d, received %d @", len(expectedCandidatesDiscovered), i, afterStart)
}
}
require.ElementsMatch(t, expectedCandidatesDiscovered, candidatesDiscovered)
Expand Down
18 changes: 12 additions & 6 deletions pkg/internal/testutil/mockclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,37 +56,43 @@ func NewMockClient(connectReturns map[string]DelayedConnectReturn, retrievalRetu
}
}

func (mc *MockClient) VerifyConnectionsReceived(ctx context.Context, t *testing.T, expectedConnections []peer.ID) {
func (mc *MockClient) VerifyConnectionsReceived(ctx context.Context, t *testing.T, afterStart time.Duration, expectedConnections []peer.ID) {
connections := make([]peer.ID, 0, len(expectedConnections))
for i := 0; i < len(expectedConnections); i++ {
select {
case connection := <-mc.received_connections:
t.Logf("connecting to peer: %s", connection)
connections = append(connections, connection)
case <-ctx.Done():
require.FailNowf(t, "failed to receive expected connections", "expected %d, received %d", len(expectedConnections), i)
require.FailNowf(t, "failed to receive expected connections", "expected %d, received %d @ %s", len(expectedConnections), i, afterStart)
}
}
require.ElementsMatch(t, expectedConnections, connections)
}

func (mc *MockClient) VerifyRetrievalsReceived(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID) {
func (mc *MockClient) VerifyRetrievalsReceived(ctx context.Context, t *testing.T, afterStart time.Duration, expectedRetrievals []peer.ID) {
retrievals := make([]peer.ID, 0, len(expectedRetrievals))
for i := 0; i < len(expectedRetrievals); i++ {
select {
case retrieval := <-mc.received_retrievals:
retrievals = append(retrievals, retrieval.Peer)
case <-ctx.Done():
require.FailNowf(t, "failed to receive expected retrievals", "expected %d, received %d", len(expectedRetrievals), i)
require.FailNowf(t, "failed to receive expected retrievals", "expected %d, received %d @ %s", len(expectedRetrievals), i, afterStart)
}
}
require.ElementsMatch(t, expectedRetrievals, retrievals)
}

func (mc *MockClient) VerifyRetrievalsServed(ctx context.Context, t *testing.T, expectedServed []RemoteStats) {
func (mc *MockClient) VerifyRetrievalsServed(ctx context.Context, t *testing.T, afterStart time.Duration, expectedServed []RemoteStats) {
if len(expectedServed) > 0 {
require.FailNowf(t, "unexpected RetrievalsServed", "@ %s", afterStart)
}
}

func (mc *MockClient) VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID) {
func (mc *MockClient) VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, afterStart time.Duration, expectedRetrievals []peer.ID) {
if len(expectedRetrievals) > 0 {
require.FailNowf(t, "unexpected RetrievalsCompleted", "@ %s", afterStart)
}
}

func (mc *MockClient) VerifyReceivedRetrievalFrom(ctx context.Context, t *testing.T, p peer.ID) ClientRetrievalRequest {
Expand Down
21 changes: 11 additions & 10 deletions pkg/internal/testutil/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type RemoteStats struct {
Root cid.Cid
ByteCount uint64
Blocks []cid.Cid
Err struct{}
}

type RetrievalVerifier struct {
Expand All @@ -36,10 +37,10 @@ type RetrievalVerifier struct {
type RunRetrieval func(cb func(types.RetrievalEvent)) (*types.RetrievalStats, error)

type VerifierClient interface {
VerifyConnectionsReceived(ctx context.Context, t *testing.T, expectedConnections []peer.ID)
VerifyRetrievalsReceived(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID)
VerifyRetrievalsServed(ctx context.Context, t *testing.T, expectedServed []RemoteStats)
VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID)
VerifyConnectionsReceived(ctx context.Context, t *testing.T, afterStart time.Duration, expectedConnections []peer.ID)
VerifyRetrievalsReceived(ctx context.Context, t *testing.T, afterStart time.Duration, expectedRetrievals []peer.ID)
VerifyRetrievalsServed(ctx context.Context, t *testing.T, afterStart time.Duration, expectedServed []RemoteStats)
VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, afterStart time.Duration, expectedRetrievals []peer.ID)
}

func (rv RetrievalVerifier) RunWithVerification(ctx context.Context,
Expand All @@ -64,15 +65,15 @@ func (rv RetrievalVerifier) RunWithVerification(ctx context.Context,
clock.Add(expectedActionsAtTime.AfterStart - currentTime)
currentTime = expectedActionsAtTime.AfterStart
t.Logf("current time: %s", clock.Now())
asyncCollectingEventsListener.VerifyNextEvents(t, expectedActionsAtTime.ExpectedEvents)
asyncCollectingEventsListener.VerifyNextEvents(t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.ExpectedEvents)
if mockCandidateFinder != nil {
mockCandidateFinder.VerifyCandidatesDiscovered(ctx, t, expectedActionsAtTime.CandidatesDiscovered)
mockCandidateFinder.VerifyCandidatesDiscovered(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.CandidatesDiscovered)
}
if client != nil {
client.VerifyConnectionsReceived(ctx, t, expectedActionsAtTime.ReceivedConnections)
client.VerifyRetrievalsReceived(ctx, t, expectedActionsAtTime.ReceivedRetrievals)
client.VerifyRetrievalsServed(ctx, t, expectedActionsAtTime.ServedRetrievals)
client.VerifyRetrievalsCompleted(ctx, t, expectedActionsAtTime.CompletedRetrievals)
client.VerifyConnectionsReceived(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.ReceivedConnections)
client.VerifyRetrievalsReceived(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.ReceivedRetrievals)
client.VerifyRetrievalsServed(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.ServedRetrievals)
client.VerifyRetrievalsCompleted(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.CompletedRetrievals)
}
}
results := make([]types.RetrievalResult, 0, len(runRetrievals))
Expand Down
2 changes: 1 addition & 1 deletion pkg/retriever/graphsyncretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func graphsyncMetadataCompare(a, b *metadata.GraphsyncFilecoinV1, defaultValue b
return defaultValue
}

func (pg ProtocolGraphsync) CompareCandidates(a, b connectCandidate, mda, mdb metadata.Protocol) bool {
func (pg ProtocolGraphsync) CompareCandidates(a, b ComparableCandidate, mda, mdb metadata.Protocol) bool {
gsmda := mda.(*metadata.GraphsyncFilecoinV1)
gsmdb := mdb.(*metadata.GraphsyncFilecoinV1)
return graphsyncMetadataCompare(gsmda, gsmdb, a.Duration < b.Duration)
Expand Down
52 changes: 37 additions & 15 deletions pkg/retriever/httpretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,34 @@ var _ TransportProtocol = &ProtocolHttp{}
type ProtocolHttp struct {
Client *http.Client

req *http.Request
resp *http.Response
// customCompare is for testing only, and should be removed when we have more
// ways to compare candidates so we can control ordering deterministically in
// our tests.
customCompare func(a, b ComparableCandidate, mda, mdb metadata.Protocol) bool
}

// NewHttpRetriever makes a new CandidateRetriever for verified CAR HTTP
// retrievals (transport-ipfs-gateway-http).
func NewHttpRetriever(getStorageProviderTimeout GetStorageProviderTimeout, client *http.Client) types.CandidateRetriever {
return NewHttpRetrieverWithDeps(getStorageProviderTimeout, client, clock.New(), nil)
return NewHttpRetrieverWithDeps(getStorageProviderTimeout, client, clock.New(), nil, 0, nil)
}

func NewHttpRetrieverWithDeps(getStorageProviderTimeout GetStorageProviderTimeout, client *http.Client, clock clock.Clock, awaitReceivedCandidates chan<- struct{}) types.CandidateRetriever {
func NewHttpRetrieverWithDeps(
getStorageProviderTimeout GetStorageProviderTimeout,
client *http.Client,
clock clock.Clock,
awaitReceivedCandidates chan<- struct{},
initialPause time.Duration,
customCompare func(a, b ComparableCandidate, mda, mdb metadata.Protocol) bool,
) types.CandidateRetriever {
return &parallelPeerRetriever{
Protocol: &ProtocolHttp{
Client: client,
Client: client,
customCompare: customCompare,
},
GetStorageProviderTimeout: getStorageProviderTimeout,
Clock: clock,
QueueInitialPause: initialPause,
awaitReceivedCandidates: awaitReceivedCandidates,
}
}
Expand All @@ -60,15 +71,25 @@ func (ph ProtocolHttp) GetMergedMetadata(cid cid.Cid, currentMetadata, newMetada
return &metadata.IpfsGatewayHttp{}
}

func (ph ProtocolHttp) CompareCandidates(a, b connectCandidate, mda, mdb metadata.Protocol) bool {
// we only have duration .. currently
func (ph ProtocolHttp) CompareCandidates(a, b ComparableCandidate, mda, mdb metadata.Protocol) bool {
if ph.customCompare != nil {
return ph.customCompare(a, b, mda, mdb)
}
// since Connect is a noop, Duration should be ~0; i.e. meaningless since it
// mainly relates to internal timings, including goroutine scheduling.
return a.Duration < b.Duration
}

func (ph *ProtocolHttp) Connect(ctx context.Context, retrieval *retrieval, candidate types.RetrievalCandidate) error {
// We could begin the request here by moving ph.beginRequest() to this function.
// That would result in parallel connections to candidates as they are received,
// then serial reading of bodies.
// If/when we need to share connection state between a Connect() and Retrieve()
// call, we'll need a shared state that we can pass - either return a Context
// here that we pick up in Retrieve, or have something on `retrieval` that can
// be keyed by `candidate` to do this; or similar. ProtocolHttp is not
// per-connection, it's per-protocol, and `retrieval` is not per-candidate
// either, it's per-retrieval.
return nil
}

Expand All @@ -85,14 +106,15 @@ func (ph *ProtocolHttp) Retrieve(
// to parallelise connections if we have confidence in not wasting server time
// by requesting but not reading bodies (or delayed reading which may result in
// timeouts).
if err := ph.beginRequest(ctx, retrieval.request, candidate); err != nil {
resp, err := ph.beginRequest(ctx, retrieval.request, candidate)
if err != nil {
return nil, err
}

defer ph.resp.Body.Close()
defer resp.Body.Close()

var blockBytes uint64
cbr, err := car.NewBlockReader(ph.resp.Body)
cbr, err := car.NewBlockReader(resp.Body)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -141,13 +163,13 @@ func (ph *ProtocolHttp) Retrieve(
}, nil
}

func (ph *ProtocolHttp) beginRequest(ctx context.Context, request types.RetrievalRequest, candidate types.RetrievalCandidate) error {
var err error
ph.req, err = makeRequest(ctx, request, candidate)
func (ph *ProtocolHttp) beginRequest(ctx context.Context, request types.RetrievalRequest, candidate types.RetrievalCandidate) (resp *http.Response, err error) {
var req *http.Request
req, err = makeRequest(ctx, request, candidate)
if err == nil {
ph.resp, err = ph.Client.Do(ph.req)
resp, err = ph.Client.Do(req)
}
return err
return resp, err
}

func makeRequest(ctx context.Context, request types.RetrievalRequest, candidate types.RetrievalCandidate) (*http.Request, error) {
Expand Down
Loading

0 comments on commit f6e8366

Please sign in to comment.