Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

feat: switch to boxo and fix CAR fetch timeouts #68

Merged
merged 4 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"net/url"
"time"

ipfsblockstore "github.com/ipfs/boxo/blockstore"
ipath "github.com/ipfs/boxo/coreiface/path"
gateway "github.com/ipfs/boxo/gateway"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipfsblockstore "github.com/ipfs/go-ipfs-blockstore"
blocks "github.com/ipfs/go-libipfs/blocks"
gateway "github.com/ipfs/go-libipfs/gateway"
ipath "github.com/ipfs/interface-go-ipfs-core/path"
)

type Config struct {
Expand Down Expand Up @@ -75,11 +75,19 @@ type Config struct {
MaxNCoolOff int
}

const DefaultLoggingInterval = 5 * time.Second
const DefaultSaturnLoggerRequestTimeout = 1 * time.Minute

const DefaultSaturnOrchestratorRequestTimeout = 30 * time.Second

const DefaultSaturnBlockRequestTimeout = 19 * time.Second
const DefaultSaturnCarRequestTimeout = 30 * time.Minute
Comment on lines +83 to +84
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ Had to add separate timeout for CARs.
Not feeling strongly about 30m, but that is for how long I was able to stream wikipedia DAG from the old gateway, so a good starting point.


const DefaultMaxRetries = 3
const DefaultPoolFailureDownvoteDebounce = 1 * time.Minute
const DefaultPoolMembershipDebounce = 3 * DefaultPoolRefreshInterval
const DefaultPoolLowWatermark = 5
const DefaultSaturnRequestTimeout = 19 * time.Second

const maxBlockSize = 4194305 // 4 Mib + 1 byte
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=1000"
const DefaultPoolRefreshInterval = 5 * time.Minute
Expand Down Expand Up @@ -122,7 +130,16 @@ type ErrCoolDown struct {
}

func (e *ErrCoolDown) Error() string {
return fmt.Sprintf("multiple saturn retrieval failures seen for CID %s/Path %s, please retry after %s", e.Cid, e.Path, humanRetry(e.retryAfter))
Copy link
Contributor Author

@lidel lidel Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ This was producing very confusing log errors failures seen for CID b/Path due to e.Cid being undefined, and printed not sanitized input from the user.

switch true {
case e.Cid != cid.Undef && e.Path != "":
return fmt.Sprintf("multiple saturn retrieval failures seen for CID %q and Path %q, please retry after %s", e.Cid, e.Path, humanRetry(e.retryAfter))
case e.Path != "":
return fmt.Sprintf("multiple saturn retrieval failures seen for Path %q, please retry after %s", e.Path, humanRetry(e.retryAfter))
case e.Cid != cid.Undef:
return fmt.Sprintf("multiple saturn retrieval failures seen for CID %q, please retry after %s", e.Cid, humanRetry(e.retryAfter))
default:
return fmt.Sprintf("multiple saturn retrieval failures for unknown CID/Path (BUG), please retry after %s", humanRetry(e.retryAfter))
}
}

func (e *ErrCoolDown) RetryAfter() time.Duration {
Expand Down Expand Up @@ -188,7 +205,7 @@ func NewCaboose(config *Config) (*Caboose, error) {

if c.config.SaturnClient == nil {
c.config.SaturnClient = &http.Client{
Timeout: DefaultSaturnRequestTimeout,
Timeout: DefaultSaturnCarRequestTimeout,
}
}
if c.config.OrchestratorEndpoint == nil {
Expand Down
2 changes: 1 addition & 1 deletion caboose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"time"

"github.com/filecoin-saturn/caboose"
"github.com/ipfs/boxo/ipld/car/v2"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
Expand Down
10 changes: 5 additions & 5 deletions cmd/caboose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"time"

"github.com/filecoin-saturn/caboose"
carv2 "github.com/ipfs/boxo/ipld/car/v2"
"github.com/ipfs/boxo/ipld/car/v2/blockstore"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
Expand Down Expand Up @@ -49,15 +49,15 @@ func main1() int {

cb, err := caboose.NewCaboose(&caboose.Config{
OrchestratorClient: &http.Client{
Timeout: 30 * time.Second,
Timeout: caboose.DefaultSaturnOrchestratorRequestTimeout,
},

LoggingEndpoint: *le,
LoggingClient: http.DefaultClient,
LoggingInterval: 5 * time.Second,
LoggingInterval: caboose.DefaultLoggingInterval,

DoValidation: true,
PoolRefresh: 5 * time.Minute,
PoolRefresh: caboose.DefaultPoolRefreshInterval,
SaturnClient: &saturnClient,
})
if err != nil {
Expand Down
22 changes: 18 additions & 4 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"time"

"github.com/google/uuid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blocks "github.com/ipfs/go-libipfs/blocks"
)

var saturnReqTmpl = "/ipfs/%s?format=raw"
Expand Down Expand Up @@ -81,6 +81,11 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
isCacheHit := false
networkError := ""

isBlockRequest := false
if mime == "application/vnd.ipld.raw" {
isBlockRequest = true
}

defer func() {
var ttfbMs int64
durationSecs := time.Since(start).Seconds()
Expand All @@ -92,15 +97,15 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
ttfbMs = fb.Sub(start).Milliseconds()
fetchTTFBPerBlockPerPeerSuccessMetric.Observe(float64(ttfbMs))
// track individual block metrics separately
if mime == "application/vnd.ipld.raw" {
if isBlockRequest {
fetchDurationPerBlockPerPeerSuccessMetric.Observe(float64(response_success_end.Sub(start).Milliseconds()))
} else {
fetchDurationPerCarPerPeerSuccessMetric.Observe(float64(response_success_end.Sub(start).Milliseconds()))
}
fetchSpeedPerBlockPerPeerMetric.Observe(float64(received) / float64(durationMs))
} else {
fetchTTFBPerBlockPerPeerFailureMetric.Observe(float64(ttfbMs))
if mime == "application/vnd.ipld.raw" {
if isBlockRequest {
fetchDurationPerBlockPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
} else {
fetchDurationPerCarPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
Expand Down Expand Up @@ -145,7 +150,16 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
}
}()

reqCtx, cancel := context.WithTimeout(ctx, DefaultSaturnRequestTimeout)
// TODO: Ideally, we would have additional "PerRequestInactivityTimeout"
// which is the amount of time without any NEW data from the server, but
// that can be added later. We need both because a slow trickle of data
// could take a large amount of time.
requestTimeout := DefaultSaturnCarRequestTimeout
if isBlockRequest {
requestTimeout = DefaultSaturnBlockRequestTimeout
}

reqCtx, cancel := context.WithTimeout(ctx, requestTimeout)
Comment on lines +153 to +162
Copy link
Contributor Author

@lidel lidel Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 ⚠️ we may have a DoS vector here, if L1 is malicious, it could be keeping a transferless connection open for 30 minutes – not the best.

Implementing "PerRequestInactivityTimeout" would help a lot – we could then have this 30m timeout as a hard ceiling (or even raise it), but then have the same timeout for block and for CAR when L1 did not send any new bytes for some time.

defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
if err != nil {
Expand Down
48 changes: 22 additions & 26 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@ go 1.19

require (
github.com/google/uuid v1.3.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-ipfs-blockstore v1.2.0
github.com/ipfs/go-libipfs v0.6.1-0.20230224134131-7ba1df55d53b
github.com/ipfs/boxo v0.8.0-rc2.0.20230329082438-360b031ed895
github.com/ipfs/go-block-format v0.1.2
github.com/ipfs/go-cid v0.4.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/interface-go-ipfs-core v0.11.1
github.com/ipld/go-car/v2 v2.6.0
github.com/ipld/go-ipld-prime v0.19.0
github.com/ipld/go-ipld-prime v0.20.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd
github.com/multiformats/go-multicodec v0.7.0
github.com/multiformats/go-multicodec v0.8.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/client_golang v1.14.0
github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.2
github.com/urfave/cli/v2 v2.24.2
)

require (
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand All @@ -39,35 +38,31 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-block-format v0.1.1 // indirect
github.com/ipfs/go-blockservice v0.5.0 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect
github.com/ipfs/go-fetcher v1.6.1 // indirect
github.com/ipfs/go-ipfs-blockstore v1.3.0 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.0 // indirect
github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
github.com/ipfs/go-ipld-format v0.4.0 // indirect
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
github.com/ipfs/go-ipns v0.3.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-merkledag v0.9.0 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-namesys v0.7.0 // indirect
github.com/ipfs/go-path v0.3.0 // indirect
github.com/ipfs/go-verifcid v0.0.2 // indirect
github.com/ipld/go-car v0.5.0 // indirect
github.com/ipld/go-codec-dagpb v1.5.0 // indirect
github.com/ipfs/go-unixfsnode v1.6.0 // indirect
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-libp2p v0.25.1 // indirect
github.com/libp2p/go-doh-resolver v0.4.0 // indirect
github.com/libp2p/go-libp2p v0.26.3 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.21.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.21.1 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.4.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
Expand Down Expand Up @@ -100,16 +95,17 @@ require (
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.12.0 // indirect
go.opentelemetry.io/otel/trace v1.12.0 // indirect
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/tools v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.28.1 // indirect
Expand Down
Loading