Skip to content

Commit

Permalink
HTTP CAR validation (#222)
Browse files Browse the repository at this point in the history
* feat(verifiedcar): initial verifiedcar package

* feat(verifiedcar): verify http retrievals

* chore(verifiedcar): tests for basic error cases

* fix(verifiedcar): coverage of more cases, handle known edges properly

* fix(verifiedcar): remove extraneous go-routine (#226)

Co-authored-by: Rod Vagg <[email protected]>

* fix(verifiedcar): address feedback

* fix(verifiedcar): fix flaky tests

---------

Co-authored-by: Hannah Howard <[email protected]>
  • Loading branch information
rvagg and hannahhoward committed May 12, 2023
1 parent d4f07d4 commit 0afc0a6
Show file tree
Hide file tree
Showing 8 changed files with 937 additions and 120 deletions.
101 changes: 36 additions & 65 deletions pkg/internal/itest/http_fetch_test.go

Large diffs are not rendered by default.

48 changes: 48 additions & 0 deletions pkg/internal/itest/unixfs/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,51 @@ func CompareDirEntries(t *testing.T, a, b DirEntry) {
require.True(t, found, fmt.Sprintf("%s child %s not found in b", a.Path, a.Children[i].Path))
}
}

// WrapPath is the slash-separated path ("want2" containing "want1" containing
// "want0") at which WrapContent and WrapContentExclusive place the supplied
// content inside the directory they return.
const WrapPath = "/want2/want1/want0"

// WrapContent nests content three directories deep so that it can be fetched
// at "/want2/want1/want0". Every level also receives two randomly generated
// siblings, named "!before" and "~after", so the returned tree contains data
// that is not on the path to the wanted content.
//
// Generated siblings are drawn from rndReader in a fixed order (the "!before"
// entry first, then the "~after" entry, innermost level first), so a
// deterministic reader yields a deterministic tree. The returned DirEntry is
// the outermost directory.
func WrapContent(t *testing.T, rndReader io.Reader, lsys *ipld.LinkSystem, content DirEntry) DirEntry {
	// nest names the three entries and builds one directory level holding
	// them, with the wanted entry in the middle.
	// NOTE(review): "!" and "~" presumably make the siblings sort either side
	// of "want*" — confirm against BuildDirectory's ordering.
	nest := func(name string, lead, inner, trail DirEntry) DirEntry {
		lead.Path, inner.Path, trail.Path = "!before", name, "~after"
		return BuildDirectory(t, lsys, []DirEntry{lead, inner, trail}, false)
	}

	// Innermost level: a directory ahead of the target, a file behind it.
	lead := GenerateDirectory(t, lsys, rndReader, 4<<10, false)
	trail := GenerateFile(t, lsys, rndReader, 4<<11)
	wrapped := nest("want0", lead, content, trail)

	// Middle level: a file ahead, a directory (generated with its final
	// argument set to true) behind.
	lead = GenerateFile(t, lsys, rndReader, 4<<10)
	trail = GenerateDirectory(t, lsys, rndReader, 4<<11, true)
	wrapped = nest("want1", lead, wrapped, trail)

	// Outermost level: plain files on both sides.
	lead = GenerateFile(t, lsys, rndReader, 4<<10)
	trail = GenerateFile(t, lsys, rndReader, 4<<11)
	return nest("want2", lead, wrapped, trail)
}

// WrapContentExclusive nests content at "/want2/want1/want0" like WrapContent
// does, but each wrapping directory holds only the single wanted child; no
// extra sibling entries are generated. rndReader is accepted but not read.
func WrapContentExclusive(t *testing.T, rndReader io.Reader, lsys *ipld.LinkSystem, content DirEntry) DirEntry {
	wrapped := content
	// Work outwards from the innermost name: label the current entry, then
	// enclose it in a new directory that contains nothing else.
	for _, name := range []string{"want0", "want1", "want2"} {
		wrapped.Path = name
		wrapped = BuildDirectory(t, lsys, []DirEntry{wrapped}, false)
	}
	return wrapped
}
1 change: 1 addition & 0 deletions pkg/internal/itest/unixfs/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func BuildDirectory(t *testing.T, linkSys *linking.LinkSystem, children []DirEnt
root, size, err = builder.BuildUnixFSDirectory(dirLinks, linkSys)
require.NoError(t, err)
}

return DirEntry{
Path: "",
Root: root.(cidlink.Link).Cid,
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/testutil/collectingeventlsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func VerifyContainsCollectedEvent(t *testing.T, afterStart time.Duration, expect
return actual.Code()
}
if actual.Code() == expected.Code() {
fmt.Printf("non-match: %s <> %s\n", actual, expected)
t.Logf("non-matching event: %s <> %s\n", actual, expected)
}
}
require.Failf(t, "unexpected event", "got '%s' @ %s", actual.Code(), afterStart)
Expand Down
77 changes: 43 additions & 34 deletions pkg/retriever/httpretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import (
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lassie/pkg/events"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/filecoin-project/lassie/pkg/verifiedcar"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipni/go-libipni/metadata"
"github.com/multiformats/go-multicodec"
)
Expand Down Expand Up @@ -103,7 +102,6 @@ func (ph *ProtocolHttp) Retrieve(
timeout time.Duration,
candidate types.RetrievalCandidate,
) (*types.RetrievalStats, error) {

// Connect and read body in one flow, we can move ph.beginRequest() to Connect()
// to parallelise connections if we have confidence in not wasting server time
// by requesting but not reading bodies (or delayed reading which may result in
Expand All @@ -115,46 +113,34 @@ func (ph *ProtocolHttp) Retrieve(

defer resp.Body.Close()

var blockBytes uint64
cbr, err := car.NewBlockReader(resp.Body)
var ttfb time.Duration
rdr := newTimeToFirstByteReader(resp.Body, func() {
ttfb = retrieval.Clock.Since(phaseStartTime)
session.sendEvent(events.FirstByte(retrieval.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, candidate))
})

sel, err := selector.CompileSelector(retrieval.request.GetSelector())
if err != nil {
return nil, err
}
ttfb := retrieval.Clock.Since(phaseStartTime)
session.sendEvent(events.FirstByte(retrieval.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, candidate))

var blockCount uint64
for {
blk, err := cbr.Next()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, err
}
w, d, err := retrieval.request.LinkSystem.StorageWriteOpener(ipld.LinkContext{})
if err != nil {
return nil, err
}
_, err = w.Write(blk.RawData())
if err != nil {
return nil, err
}
blockBytes += uint64(len(blk.RawData()))
err = d(cidlink.Link{Cid: blk.Cid()})
if err != nil {
return nil, err
}
blockCount++

cfg := verifiedcar.Config{
Root: retrieval.request.Cid,
Selector: sel,
}

blockCount, byteCount, err := cfg.Verify(ctx, rdr, retrieval.request.LinkSystem)
if err != nil {
return nil, err
}

duration := retrieval.Clock.Since(phaseStartTime)
speed := uint64(float64(blockBytes) / duration.Seconds())
speed := uint64(float64(byteCount) / duration.Seconds())

return &types.RetrievalStats{
RootCid: candidate.RootCid,
StorageProviderId: candidate.MinerPeer.ID,
Size: blockBytes,
Size: byteCount,
Blocks: blockCount,
Duration: duration,
AverageSpeed: speed,
Expand Down Expand Up @@ -197,3 +183,26 @@ func makeRequest(ctx context.Context, request types.RetrievalRequest, candidate

return req, nil
}

// Compile-time check that *timeToFirstByteReader satisfies io.Reader.
var _ io.Reader = (*timeToFirstByteReader)(nil)

// timeToFirstByteReader wraps an io.Reader and runs a callback exactly once:
// as the first call to Read returns.
type timeToFirstByteReader struct {
	r     io.Reader // underlying reader every Read is delegated to
	first bool      // true once the first Read call has been made
	cb    func()    // run when the first Read call returns
}

// newTimeToFirstByteReader returns a reader that delegates to r and calls cb
// when the first Read on it completes.
func newTimeToFirstByteReader(r io.Reader, cb func()) *timeToFirstByteReader {
	ttfb := new(timeToFirstByteReader)
	ttfb.r = r
	ttfb.cb = cb
	return ttfb
}

// Read delegates to the wrapped reader. On the first call only, the callback
// is deferred so that it runs after the underlying Read has returned; it runs
// whatever that Read produced, including zero bytes or an error.
func (t *timeToFirstByteReader) Read(p []byte) (n int, err error) {
	if t.first {
		// The callback has already been scheduled by an earlier call.
		return t.r.Read(p)
	}
	t.first = true
	defer t.cb()
	return t.r.Read(p)
}
42 changes: 22 additions & 20 deletions pkg/retriever/httpretriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
gstestutil "github.com/ipfs/go-graphsync/testutil"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/storage"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
Expand Down Expand Up @@ -302,7 +303,8 @@ func TestHTTPRetriever(t *testing.T) {
{
AfterStart: initialPause + time.Millisecond*10,
ExpectedEvents: []types.RetrievalEvent{
events.Failed(startTime.Add(initialPause+time.Millisecond*10), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer), "unexpected EOF"),
events.FirstByte(startTime.Add(initialPause+time.Millisecond*10), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer)),
events.Failed(startTime.Add(initialPause+time.Millisecond*10), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer), "malformed CAR; unexpected EOF"),
},
CompletedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID},
ReceivedRetrievals: []peer.ID{cid1Cands[1].MinerPeer.ID},
Expand All @@ -318,7 +320,8 @@ func TestHTTPRetriever(t *testing.T) {
{
AfterStart: initialPause + time.Millisecond*20,
ExpectedEvents: []types.RetrievalEvent{
events.Failed(startTime.Add(initialPause+time.Millisecond*20), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[1].MinerPeer), "unexpected EOF"),
events.FirstByte(startTime.Add(initialPause+time.Millisecond*20), rid1, startTime, toCandidate(cid1, cid1Cands[1].MinerPeer)),
events.Failed(startTime.Add(initialPause+time.Millisecond*20), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[1].MinerPeer), "malformed CAR; unexpected EOF"),
},
CompletedRetrievals: []peer.ID{cid1Cands[1].MinerPeer.ID},
ReceivedRetrievals: []peer.ID{cid1Cands[2].MinerPeer.ID},
Expand All @@ -334,7 +337,8 @@ func TestHTTPRetriever(t *testing.T) {
{
AfterStart: initialPause + time.Millisecond*30,
ExpectedEvents: []types.RetrievalEvent{
events.Failed(startTime.Add(initialPause+time.Millisecond*30), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[2].MinerPeer), "unexpected EOF"),
events.FirstByte(startTime.Add(initialPause+time.Millisecond*30), rid1, startTime, toCandidate(cid1, cid1Cands[2].MinerPeer)),
events.Failed(startTime.Add(initialPause+time.Millisecond*30), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[2].MinerPeer), "malformed CAR; unexpected EOF"),
},
CompletedRetrievals: []peer.ID{cid1Cands[2].MinerPeer.ID},
ServedRetrievals: []testutil.RemoteStats{
Expand Down Expand Up @@ -408,7 +412,8 @@ func TestHTTPRetriever(t *testing.T) {
{
AfterStart: initialPause + time.Millisecond*10,
ExpectedEvents: []types.RetrievalEvent{
events.Failed(startTime.Add(initialPause+time.Millisecond*10), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer), "unexpected EOF"),
events.FirstByte(startTime.Add(initialPause+time.Millisecond*10), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer)),
events.Failed(startTime.Add(initialPause+time.Millisecond*10), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer), "malformed CAR; unexpected EOF"),
},
CompletedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID},
ReceivedRetrievals: []peer.ID{cid1Cands[1].MinerPeer.ID},
Expand All @@ -424,7 +429,8 @@ func TestHTTPRetriever(t *testing.T) {
{
AfterStart: initialPause + time.Millisecond*20,
ExpectedEvents: []types.RetrievalEvent{
events.Failed(startTime.Add(initialPause+time.Millisecond*20), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[1].MinerPeer), "unexpected EOF"),
events.FirstByte(startTime.Add(initialPause+time.Millisecond*20), rid1, startTime, toCandidate(cid1, cid1Cands[1].MinerPeer)),
events.Failed(startTime.Add(initialPause+time.Millisecond*20), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[1].MinerPeer), "malformed CAR; unexpected EOF"),
},
CompletedRetrievals: []peer.ID{cid1Cands[1].MinerPeer.ID},
ReceivedRetrievals: []peer.ID{cid1Cands[2].MinerPeer.ID},
Expand Down Expand Up @@ -477,18 +483,8 @@ func TestHTTPRetriever(t *testing.T) {
},
},
expectedCids: map[cid.Cid][]cid.Cid{cid1: tbc1Cids[0:50]},
expectedStats: map[cid.Cid]*types.RetrievalStats{
cid1: {
RootCid: cid1,
StorageProviderId: cid1Cands[0].MinerPeer.ID,
Size: sizeOf(tbc1.AllBlocks()[0:50]),
Blocks: 50,
Duration: initialPause + 40*time.Millisecond + remoteBlockDuration*50,
AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks()[0:50])) / (initialPause + 40*time.Millisecond + remoteBlockDuration*50).Seconds()),
TimeToFirstByte: initialPause + 40*time.Millisecond,
TotalPayment: big.Zero(),
AskPrice: big.Zero(),
},
expectedErrors: map[cid.Cid]struct{}{
cid1: {},
},
expectSequence: []testutil.ExpectedActionsAtTime{
{
Expand All @@ -511,7 +507,7 @@ func TestHTTPRetriever(t *testing.T) {
{
AfterStart: initialPause + time.Millisecond*40 + remoteBlockDuration*50,
ExpectedEvents: []types.RetrievalEvent{
events.Success(startTime.Add(initialPause+time.Millisecond*40+remoteBlockDuration*50), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer), sizeOf(tbc2.AllBlocks()[0:50]), 50, initialPause+40*time.Millisecond+remoteBlockDuration*50, big.Zero(), 0),
events.Failed(startTime.Add(initialPause+time.Millisecond*40+remoteBlockDuration*50), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer), "missing block in CAR; ipld: could not find "+tbc1.AllBlocks()[50].Cid().String()),
},
ServedRetrievals: []testutil.RemoteStats{
{
Expand All @@ -531,6 +527,8 @@ func TestHTTPRetriever(t *testing.T) {
for _, testCase := range testCases {
testCase := testCase
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()

req := require.New(t)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -925,7 +923,11 @@ func traverseCar(
// load and register the root link so it's pushed to the CAR since
// the traverser won't load it (we feed the traverser the root _node_
// not the link)
rootNode, err := lsys.Load(linking.LinkContext{}, cidlink.Link{Cid: root}, basicnode.Prototype.Any)
var proto datamodel.NodePrototype = basicnode.Prototype.Any
if root.Prefix().Codec == cid.DagProtobuf {
proto = dagpb.Type.PBNode
}
rootNode, err := lsys.Load(linking.LinkContext{}, cidlink.Link{Cid: root}, proto)
if err != nil {
stats.Err = struct{}{}
} else {
Expand All @@ -934,7 +936,7 @@ func traverseCar(
Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: *lsys,
LinkTargetNodePrototypeChooser: basicnode.Chooser,
LinkTargetNodePrototypeChooser: dagpb.AddSupportToChooser(basicnode.Chooser),
},
}.WalkAdv(rootNode, sel, func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil })
if err != nil {
Expand Down
Loading

0 comments on commit 0afc0a6

Please sign in to comment.