diff --git a/CHANGELOG.md b/CHANGELOG.md index 734c89ebf..8ebe24d2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The following emojis are used to highlight certain changes: ### Added - `blockservice` now has `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context. Future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will use the session in the context. +- `blockservice` now has `WithContentBlocker` option which allows to filter Add and Get requests by CID. ### Changed diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 7733788ec..e9772928c 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -5,7 +5,6 @@ package blockservice import ( "context" - "io" "sync" "go.opentelemetry.io/otel/attribute" @@ -40,41 +39,15 @@ type BlockGetter interface { GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block } +// Blocker returns err != nil if the CID is disallowed to be fetched or stored in blockservice. +// It returns an error so error messages could be passed. +type Blocker func(cid.Cid) error + // BlockService is a hybrid block datastore. It stores data in a local // datastore and may retrieve data from a remote Exchange. -// It uses an internal `datastore.Datastore` instance to store values. -type BlockService interface { - io.Closer - BlockGetter - - // Blockstore returns a reference to the underlying blockstore - Blockstore() blockstore.Blockstore - - // Exchange returns a reference to the underlying exchange (usually bitswap) - Exchange() exchange.Interface - - // AddBlock puts a given block to the underlying datastore - AddBlock(ctx context.Context, o blocks.Block) error - - // AddBlocks adds a slice of blocks at the same time using batching - // capabilities of the underlying datastore whenever possible. - AddBlocks(ctx context.Context, bs []blocks.Block) error - - // DeleteBlock deletes the given block from the blockservice. - DeleteBlock(ctx context.Context, o cid.Cid) error -} - -// BoundedBlockService is a Blockservice bounded via strict multihash Allowlist. -type BoundedBlockService interface { - BlockService - - Allowlist() verifcid.Allowlist -} - -var _ BoundedBlockService = (*blockService)(nil) - -type blockService struct { +type BlockService struct { allowlist verifcid.Allowlist + blocker Blocker blockstore blockstore.Blockstore exchange exchange.Interface // If checkFirst is true then first check that a block doesn't @@ -82,30 +55,37 @@ type blockService struct { checkFirst bool } -type Option func(*blockService) +type Option func(*BlockService) // WriteThrough disable cache checks for writes and make them go straight to // the blockstore. func WriteThrough() Option { - return func(bs *blockService) { + return func(bs *BlockService) { bs.checkFirst = false } } // WithAllowlist sets a custom [verifcid.Allowlist] which will be used func WithAllowlist(allowlist verifcid.Allowlist) Option { - return func(bs *blockService) { + return func(bs *BlockService) { bs.allowlist = allowlist } } +// WithContentBlocker allows to filter what blocks can be fetched or added to the blockservice. +func WithContentBlocker(blocker Blocker) Option { + return func(bs *BlockService) { + bs.blocker = blocker + } +} + // New creates a BlockService with given datastore instance. -func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) BlockService { +func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) *BlockService { if exchange == nil { logger.Debug("blockservice running in local (offline) mode.") } - service := &blockService{ + service := &BlockService{ allowlist: verifcid.DefaultAllowlist, blockstore: bs, exchange: exchange, @@ -119,50 +99,46 @@ func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) return service } -// NewWriteThrough creates a BlockService that guarantees writes will go -// through to the blockstore and are not skipped by cache checks. -// -// Deprecated: Use [New] with the [WriteThrough] option. -func NewWriteThrough(bs blockstore.Blockstore, exchange exchange.Interface) BlockService { - return New(bs, exchange, WriteThrough()) -} - // Blockstore returns the blockstore behind this blockservice. -func (s *blockService) Blockstore() blockstore.Blockstore { +func (s *BlockService) Blockstore() blockstore.Blockstore { return s.blockstore } // Exchange returns the exchange behind this blockservice. -func (s *blockService) Exchange() exchange.Interface { +func (s *BlockService) Exchange() exchange.Interface { return s.exchange } -func (s *blockService) Allowlist() verifcid.Allowlist { +func (s *BlockService) Allowlist() verifcid.Allowlist { return s.allowlist } +func (s *BlockService) Blocker() Blocker { + return s.blocker +} + // NewSession creates a new session that allows for // controlled exchange of wantlists to decrease the bandwidth overhead. // If the current exchange is a SessionExchange, a new exchange // session will be created. Otherwise, the current exchange will be used // directly. // Sessions are lazily setup, this is cheap. -func NewSession(ctx context.Context, bs BlockService) *Session { - ses := grabSessionFromContext(ctx, bs) +func (s *BlockService) NewSession(ctx context.Context) *Session { + ses := grabSessionFromContext(ctx, s) if ses != nil { return ses } - return newSession(ctx, bs) + return s.newSession(ctx) } // newSession is like [NewSession] but it does not attempt to reuse session from the existing context. -func newSession(ctx context.Context, bs BlockService) *Session { - return &Session{bs: bs, sesctx: ctx} +func (s *BlockService) newSession(ctx context.Context) *Session { + return &Session{bs: s, sesctx: ctx} } // AddBlock adds a particular block to the service, Putting it into the datastore. -func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { +func (s *BlockService) AddBlock(ctx context.Context, o blocks.Block) error { ctx, span := internal.StartSpan(ctx, "blockService.AddBlock") defer span.End() @@ -171,6 +147,13 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { if err != nil { return err } + + if s.blocker != nil { + if err := s.blocker(c); err != nil { + return err + } + } + if s.checkFirst { if has, err := s.blockstore.Has(ctx, c); has || err != nil { return err @@ -192,16 +175,23 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { return nil } -func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { +func (s *BlockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { ctx, span := internal.StartSpan(ctx, "blockService.AddBlocks") defer span.End() // hash security for _, b := range bs { - err := verifcid.ValidateCid(s.allowlist, b.Cid()) + c := b.Cid() + err := verifcid.ValidateCid(s.allowlist, c) if err != nil { return err } + + if s.blocker != nil { + if err := s.blocker(c); err != nil { + return err + } + } } var toput []blocks.Block if s.checkFirst { @@ -239,7 +229,7 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). -func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { +func (s *BlockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { if ses := grabSessionFromContext(ctx, s); ses != nil { return ses.GetBlock(ctx, c) } @@ -247,21 +237,27 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - return getBlock(ctx, c, s, s.getExchangeFetcher) + return s.getBlock(ctx, c, s.getExchangeFetcher) } // Look at what I have to do, no interface covariance :'( -func (s *blockService) getExchangeFetcher() exchange.Fetcher { +func (s *BlockService) getExchangeFetcher() exchange.Fetcher { return s.exchange } -func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func() exchange.Fetcher) (blocks.Block, error) { - err := verifcid.ValidateCid(grabAllowlistFromBlockservice(bs), c) // hash security +func (s *BlockService) getBlock(ctx context.Context, c cid.Cid, fetchFactory func() exchange.Fetcher) (blocks.Block, error) { + err := verifcid.ValidateCid(s.allowlist, c) // hash security if err != nil { return nil, err } - blockstore := bs.Blockstore() + if s.blocker != nil { + if err := s.blocker(c); err != nil { + return nil, err + } + } + + blockstore := s.Blockstore() block, err := blockstore.Get(ctx, c) switch { @@ -289,7 +285,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func if err != nil { return nil, err } - if ex := bs.Exchange(); ex != nil { + if ex := s.Exchange(); ex != nil { err = ex.NotifyNewBlocks(ctx, blk) if err != nil { return nil, err @@ -302,7 +298,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func // GetBlocks gets a list of blocks asynchronously and returns through // the returned channel. // NB: No guarantees are made about order. -func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { +func (s *BlockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { if ses := grabSessionFromContext(ctx, s); ses != nil { return ses.GetBlocks(ctx, ks) } @@ -310,23 +306,27 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks") defer span.End() - return getBlocks(ctx, ks, s, s.getExchangeFetcher) + return s.getBlocks(ctx, ks, s.getExchangeFetcher) } -func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fetchFactory func() exchange.Fetcher) <-chan blocks.Block { +func (s *BlockService) getBlocks(ctx context.Context, ks []cid.Cid, fetchFactory func() exchange.Fetcher) <-chan blocks.Block { out := make(chan blocks.Block) go func() { defer close(out) - allowlist := grabAllowlistFromBlockservice(blockservice) - var lastAllValidIndex int var c cid.Cid for lastAllValidIndex, c = range ks { - if err := verifcid.ValidateCid(allowlist, c); err != nil { + if err := verifcid.ValidateCid(s.allowlist, c); err != nil { break } + + if s.blocker != nil { + if err := s.blocker(c); err != nil { + break + } + } } if lastAllValidIndex != len(ks) { @@ -335,16 +335,24 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet copy(ks2, ks[:lastAllValidIndex]) // fast path for already filtered elements for _, c := range ks[lastAllValidIndex:] { // don't rescan already scanned elements // hash security - if err := verifcid.ValidateCid(allowlist, c); err == nil { - ks2 = append(ks2, c) - } else { + if err := verifcid.ValidateCid(s.allowlist, c); err != nil { logger.Errorf("unsafe CID (%s) passed to blockService.GetBlocks: %s", c, err) + continue + } + + if s.blocker != nil { + if err := s.blocker(c); err != nil { + logger.Errorf("blocked CID (%s) passed to blockService.GetBlocks: %s", c, err) + continue + } } + + ks2 = append(ks2, c) } ks = ks2 } - bs := blockservice.Blockstore() + bs := s.Blockstore() var misses []cid.Cid for _, c := range ks { @@ -371,7 +379,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet return } - ex := blockservice.Exchange() + ex := s.Exchange() var cache [1]blocks.Block // preallocate once for all iterations for { var b blocks.Block @@ -414,7 +422,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet } // DeleteBlock deletes a block in the blockservice from the datastore -func (s *blockService) DeleteBlock(ctx context.Context, c cid.Cid) error { +func (s *BlockService) DeleteBlock(ctx context.Context, c cid.Cid) error { ctx, span := internal.StartSpan(ctx, "blockService.DeleteBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() @@ -425,7 +433,7 @@ func (s *blockService) DeleteBlock(ctx context.Context, c cid.Cid) error { return err } -func (s *blockService) Close() error { +func (s *BlockService) Close() error { logger.Debug("blockservice is shutting down...") if s.exchange == nil { return nil @@ -436,7 +444,7 @@ func (s *blockService) Close() error { // Session is a helper type to provide higher level access to bitswap sessions type Session struct { createSession sync.Once - bs BlockService + bs *BlockService ses exchange.Fetcher sesctx context.Context } @@ -469,7 +477,7 @@ func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - return getBlock(ctx, c, s.bs, s.grabSession) + return s.bs.getBlock(ctx, c, s.grabSession) } // GetBlocks gets blocks in the context of a request session @@ -477,7 +485,7 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo ctx, span := internal.StartSpan(ctx, "Session.GetBlocks") defer span.End() - return getBlocks(ctx, ks, s.bs, s.grabSession) + return s.bs.getBlocks(ctx, ks, s.grabSession) } var _ BlockGetter = (*Session)(nil) @@ -487,11 +495,11 @@ var _ BlockGetter = (*Session)(nil) // will be redirected to this same session instead. // Sessions are lazily setup, this is cheap. // It wont make a new session if one exists already in the context. -func ContextWithSession(ctx context.Context, bs BlockService) context.Context { - if grabSessionFromContext(ctx, bs) != nil { +func (s *BlockService) ContextWithSession(ctx context.Context) context.Context { + if grabSessionFromContext(ctx, s) != nil { return ctx } - return EmbedSessionInContext(ctx, newSession(ctx, bs)) + return EmbedSessionInContext(ctx, s.newSession(ctx)) } // EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session. @@ -504,7 +512,7 @@ func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context { // This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety, // if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app. // By having this private we allow consumers to follow the trace of where the blockservice is passed and used. -func grabSessionFromContext(ctx context.Context, bs BlockService) *Session { +func grabSessionFromContext(ctx context.Context, bs *BlockService) *Session { s := ctx.Value(bs) if s == nil { return nil @@ -518,11 +526,3 @@ func grabSessionFromContext(ctx context.Context, bs BlockService) *Session { return ss } - -// grabAllowlistFromBlockservice never returns nil -func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist { - if bbs, ok := bs.(BoundedBlockService); ok { - return bbs.Allowlist() - } - return verifcid.DefaultAllowlist -} diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 53fd725f3..16dd92940 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -2,6 +2,7 @@ package blockservice import ( "context" + "errors" "testing" blockstore "github.com/ipfs/boxo/blockstore" @@ -67,7 +68,7 @@ func TestExchangeWrite(t *testing.T) { for name, fetcher := range map[string]BlockGetter{ "blockservice": bserv, - "session": NewSession(context.Background(), bserv), + "session": bserv.NewSession(context.Background()), } { t.Run(name, func(t *testing.T) { // GetBlock @@ -154,7 +155,7 @@ func TestLazySessionInitialization(t *testing.T) { t.Fatal(err) } - bsession := NewSession(ctx, bservSessEx) + bsession := bservSessEx.NewSession(ctx) if bsession.ses != nil { t.Fatal("Session exchange should not instantiated session immediately") } @@ -235,7 +236,7 @@ func TestNilExchange(t *testing.T) { bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) bserv := New(bs, nil, WriteThrough()) - sess := NewSession(ctx, bserv) + sess := bserv.NewSession(ctx) _, err := sess.GetBlock(ctx, block.Cid()) if !ipld.IsNotFound(err) { t.Fatal("expected block to not be found") @@ -286,7 +287,7 @@ func TestAllowlist(t *testing.T) { blockservice := New(bs, nil, WithAllowlist(verifcid.NewAllowlist(map[uint64]bool{multihash.BLAKE3: true}))) check(blockservice.GetBlock) - check(NewSession(ctx, blockservice).GetBlock) + check(blockservice.NewSession(ctx).GetBlock) } type fakeIsNewSessionCreateExchange struct { @@ -335,7 +336,7 @@ func TestContextSession(t *testing.T) { service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx) - ctx = ContextWithSession(ctx, service) + ctx = service.ContextWithSession(ctx) b, err := service.GetBlock(ctx, block1.Cid()) a.NoError(err) @@ -348,8 +349,77 @@ func TestContextSession(t *testing.T) { a.False(sesEx.newSessionWasCalled, "session should be reused in context") a.Equal( - NewSession(ctx, service), - NewSession(ContextWithSession(ctx, service), service), + service.NewSession(ctx), + service.NewSession(service.ContextWithSession(ctx)), "session must be deduped in all invocations on the same context", ) } + +func TestBlocker(t *testing.T) { + t.Parallel() + a := assert.New(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + bgen := butil.NewBlockGenerator() + allowed := bgen.Next() + notAllowed := bgen.Next() + + var disallowed = errors.New("disallowed") + + bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + service := New(bs, nil, WithContentBlocker(func(c cid.Cid) error { + if c == notAllowed.Cid() { + return disallowed + } + return nil + })) + + // try putting + a.NoError(service.AddBlock(ctx, allowed)) + has, err := bs.Has(ctx, allowed.Cid()) + a.NoError(err) + a.True(has, "block was not added even tho it is not blocked") + a.NoError(service.DeleteBlock(ctx, allowed.Cid())) + + a.ErrorIs(service.AddBlock(ctx, notAllowed), disallowed) + has, err = bs.Has(ctx, notAllowed.Cid()) + a.NoError(err) + a.False(has, "block was added even tho it is blocked") + + a.NoError(service.AddBlocks(ctx, []blocks.Block{allowed})) + has, err = bs.Has(ctx, allowed.Cid()) + a.NoError(err) + a.True(has, "block was not added even tho it is not blocked") + a.NoError(service.DeleteBlock(ctx, allowed.Cid())) + + a.ErrorIs(service.AddBlocks(ctx, []blocks.Block{notAllowed}), disallowed) + has, err = bs.Has(ctx, notAllowed.Cid()) + a.NoError(err) + a.False(has, "block was added even tho it is blocked") + + // now try fetch + a.NoError(bs.Put(ctx, allowed)) + a.NoError(bs.Put(ctx, notAllowed)) + + block, err := service.GetBlock(ctx, allowed.Cid()) + a.NoError(err) + a.Equal(block.RawData(), allowed.RawData()) + + _, err = service.GetBlock(ctx, notAllowed.Cid()) + a.ErrorIs(err, disallowed) + + var gotAllowed bool + for block := range service.GetBlocks(ctx, []cid.Cid{allowed.Cid(), notAllowed.Cid()}) { + switch block.Cid() { + case allowed.Cid(): + gotAllowed = true + case notAllowed.Cid(): + t.Error("got disallowed block") + default: + t.Fatalf("got unrelated block: %s", block.Cid()) + } + } + a.True(gotAllowed, "did not got allowed block") +} diff --git a/blockservice/test/mock.go b/blockservice/test/mock.go index e32b10b99..e33551f4c 100644 --- a/blockservice/test/mock.go +++ b/blockservice/test/mock.go @@ -9,13 +9,13 @@ import ( ) // Mocks returns |n| connected mock Blockservices -func Mocks(n int, opts ...blockservice.Option) []blockservice.BlockService { +func Mocks(n int, opts ...blockservice.Option) []*blockservice.BlockService { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) sg := testinstance.NewTestInstanceGenerator(net, nil, nil) instances := sg.Instances(n) - var servs []blockservice.BlockService + var servs []*blockservice.BlockService for _, i := range instances { servs = append(servs, blockservice.New(i.Blockstore(), i.Exchange, opts...)) } diff --git a/fetcher/impl/blockservice/fetcher.go b/fetcher/impl/blockservice/fetcher.go index a02e6ebbf..010bb2b28 100644 --- a/fetcher/impl/blockservice/fetcher.go +++ b/fetcher/impl/blockservice/fetcher.go @@ -23,13 +23,13 @@ type fetcherSession struct { // FetcherConfig defines a configuration object from which Fetcher instances are constructed type FetcherConfig struct { - blockService blockservice.BlockService + blockService *blockservice.BlockService NodeReifier ipld.NodeReifier PrototypeChooser traversal.LinkTargetNodePrototypeChooser } // NewFetcherConfig creates a FetchConfig from which session may be created and nodes retrieved. -func NewFetcherConfig(blockService blockservice.BlockService) FetcherConfig { +func NewFetcherConfig(blockService *blockservice.BlockService) FetcherConfig { return FetcherConfig{ blockService: blockService, PrototypeChooser: DefaultPrototypeChooser, @@ -39,7 +39,7 @@ func NewFetcherConfig(blockService blockservice.BlockService) FetcherConfig { // NewSession creates a session from which nodes may be retrieved. // The session ends when the provided context is canceled. func (fc FetcherConfig) NewSession(ctx context.Context) fetcher.Fetcher { - return fc.FetcherWithSession(ctx, blockservice.NewSession(ctx, fc.blockService)) + return fc.FetcherWithSession(ctx, fc.blockService.NewSession(ctx)) } func (fc FetcherConfig) FetcherWithSession(ctx context.Context, s *blockservice.Session) fetcher.Fetcher { diff --git a/gateway/blocks_backend.go b/gateway/blocks_backend.go index fe188ae71..bb76eff66 100644 --- a/gateway/blocks_backend.go +++ b/gateway/blocks_backend.go @@ -52,7 +52,7 @@ import ( // BlocksBackend is an [IPFSBackend] implementation based on a [blockservice.BlockService]. type BlocksBackend struct { blockStore blockstore.Blockstore - blockService blockservice.BlockService + blockService *blockservice.BlockService dagService format.DAGService resolver resolver.Resolver @@ -97,7 +97,7 @@ func WithResolver(r resolver.Resolver) BlocksBackendOption { type BlocksBackendOption func(options *blocksBackendOptions) error -func NewBlocksBackend(blockService blockservice.BlockService, opts ...BlocksBackendOption) (*BlocksBackend, error) { +func NewBlocksBackend(blockService *blockservice.BlockService, opts ...BlocksBackendOption) (*BlocksBackend, error) { var compiledOptions blocksBackendOptions for _, o := range opts { if err := o(&compiledOptions); err != nil { @@ -687,7 +687,7 @@ func (bb *BlocksBackend) IsCached(ctx context.Context, p path.Path) bool { var _ WithContextHint = (*BlocksBackend)(nil) func (bb *BlocksBackend) WrapContextForRequest(ctx context.Context) context.Context { - return blockservice.ContextWithSession(ctx, bb.blockService) + return bb.blockService.ContextWithSession(ctx) } func (bb *BlocksBackend) ResolvePath(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, error) { diff --git a/ipld/merkledag/merkledag.go b/ipld/merkledag/merkledag.go index a227780ff..1581279d9 100644 --- a/ipld/merkledag/merkledag.go +++ b/ipld/merkledag/merkledag.go @@ -35,7 +35,7 @@ const progressContextKey contextKey = "progress" // NewDAGService constructs a new DAGService (using the default implementation). // Note that the default implementation is also an ipld.LinkGetter. -func NewDAGService(bs bserv.BlockService) *dagService { +func NewDAGService(bs *bserv.BlockService) *dagService { return &dagService{ Blocks: bs, decoder: ipldLegacyDecoder, @@ -49,7 +49,7 @@ func NewDAGService(bs bserv.BlockService) *dagService { // // able to free some of them when vm pressure is high type dagService struct { - Blocks bserv.BlockService + Blocks *bserv.BlockService decoder *legacy.Decoder } @@ -162,7 +162,7 @@ func WrapSession(s *bserv.Session) format.NodeGetter { // Session returns a NodeGetter using a new session for block fetches. func (n *dagService) Session(ctx context.Context) format.NodeGetter { - session := bserv.NewSession(ctx, n.Blocks) + session := n.Blocks.NewSession(ctx) return &sesGetter{ bs: session, decoder: n.decoder, diff --git a/ipld/merkledag/test/utils.go b/ipld/merkledag/test/utils.go index 302d194d6..855196af1 100644 --- a/ipld/merkledag/test/utils.go +++ b/ipld/merkledag/test/utils.go @@ -17,7 +17,7 @@ func Mock() ipld.DAGService { } // Bserv returns a new, thread-safe, mock BlockService. -func Bserv() bsrv.BlockService { +func Bserv() *bsrv.BlockService { bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) return bsrv.New(bstore, offline.Exchange(bstore)) }