From 3af3c29470bf19bdcc51d277de1447ed20a1add4 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 30 Mar 2023 18:48:28 +0200 Subject: [PATCH] blockservice & exchange & bitswap: add non variadic NotifyNewBlock Variadicts in go are just syntactic sugar around passing a slice, that means all go memory reachability rules apply, this force the compiler to heap allocate the variadic slice for virtual call, because the implementation is allowed to leak the slice (and go's interprocedural optimisations do not cover virtuals). Passing a block without variadic will pass the itab either on the stack or decomposed through registers. Skipping having to allocate a slice. --- bitswap/bitswap.go | 7 +++++++ bitswap/client/client.go | 10 ++++++++++ bitswap/server/server.go | 11 +++++++++++ blockservice/blockservice.go | 12 +++++------- blockservice/blockservice_test.go | 5 +++++ exchange/interface.go | 2 ++ exchange/offline/offline.go | 6 ++++++ 7 files changed, 46 insertions(+), 7 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index e80a407aa..4794ded77 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -96,6 +96,13 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc return bs } +func (bs *Bitswap) NotifyNewBlock(ctx context.Context, blk blocks.Block) error { + return multierr.Combine( + bs.Client.NotifyNewBlock(ctx, blk), + bs.Server.NotifyNewBlock(ctx, blk), + ) +} + func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error { return multierr.Combine( bs.Client.NotifyNewBlocks(ctx, blks...), diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 46e3a0ecc..51645ed14 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -259,6 +259,16 @@ func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks. return session.GetBlocks(ctx, keys) } +// NotifyNewBlock announces the existence of blocks to this bitswap service. +// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure +// that those blocks are available in the blockstore before calling this function. +func (bs *Client) NotifyNewBlock(ctx context.Context, blk blocks.Block) error { + // Call to the variadic to avoid code duplication. + // This is actually fine to do because no calls is virtual the compiler is able + // to see that the slice does not leak and the slice is stack allocated. + return bs.NotifyNewBlocks(ctx, blk) +} + // NotifyNewBlocks announces the existence of blocks to this bitswap service. // Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure // that those blocks are available in the blockstore before calling this function. diff --git a/bitswap/server/server.go b/bitswap/server/server.go index a3378d6c4..924bcee38 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -404,6 +404,17 @@ func (bs *Server) Stat() (Stat, error) { return s, nil } +// NotifyNewBlock announces the existence of block to this bitswap service. The +// service will potentially notify its peers. +// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure +// that those blocks are available in the blockstore before calling this function. +func (bs *Server) NotifyNewBlock(ctx context.Context, blk blocks.Block) error { + // Call to the variadic to avoid code duplication. + // This is actually fine to do because no calls is virtual the compiler is able + // to see that the slice does not leak and the slice is stack allocated. + return bs.NotifyNewBlocks(ctx, blk) +} + // NotifyNewBlocks announces the existence of blocks to this bitswap service. The // service will potentially notify its peers. // Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 773fb5303..3691640df 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -157,8 +157,8 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { logger.Debugf("BlockService.BlockAdded %s", c) if s.exchange != nil { - if err := s.exchange.NotifyNewBlocks(ctx, o); err != nil { - logger.Errorf("NotifyNewBlocks: %s", err.Error()) + if err := s.exchange.NotifyNewBlock(ctx, o); err != nil { + logger.Errorf("NotifyNewBlock: %s", err.Error()) } } @@ -254,7 +254,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget fun if err != nil { return nil, err } - err = f.NotifyNewBlocks(ctx, blk) + err = f.NotifyNewBlock(ctx, blk) if err != nil { return nil, err } @@ -334,7 +334,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget return } - var cache [1]blocks.Block // preallocate once for all iterations for { var b blocks.Block select { @@ -355,13 +354,11 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget } // inform the exchange that the blocks are available - cache[0] = b - err = f.NotifyNewBlocks(ctx, cache[:]...) + err = f.NotifyNewBlock(ctx, b) if err != nil { logger.Errorf("could not tell the exchange about new blocks: %s", err) return } - cache[0] = nil // early gc select { case out <- b: @@ -391,6 +388,7 @@ func (s *blockService) Close() error { } type notifier interface { + NotifyNewBlock(context.Context, blocks.Block) error NotifyNewBlocks(context.Context, ...blocks.Block) error } diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 14396c8a1..4258a8c71 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -195,6 +195,11 @@ type notifyCountingExchange struct { notifyCount int } +func (n *notifyCountingExchange) NotifyNewBlock(ctx context.Context, blocks blocks.Block) error { + n.notifyCount++ + return n.Interface.NotifyNewBlock(ctx, blocks) +} + func (n *notifyCountingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { n.notifyCount += len(blocks) return n.Interface.NotifyNewBlocks(ctx, blocks...) diff --git a/exchange/interface.go b/exchange/interface.go index 3ae174d5c..7d32960a5 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -13,6 +13,8 @@ import ( type Interface interface { // type Exchanger interface Fetcher + // NotifyNewBlock tells the exchange that a new block is available and can be served. + NotifyNewBlock(ctx context.Context, blocks blocks.Block) error // NotifyNewBlocks tells the exchange that new blocks are available and can be served. NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index f3590893e..1e8cdd2fe 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -34,6 +34,12 @@ func (e *offlineExchange) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block return blk, err } +// NotifyNewBlock tells the exchange that a new block is available and can be served. +func (e *offlineExchange) NotifyNewBlock(ctx context.Context, block blocks.Block) error { + // as an offline exchange we have nothing to do + return nil +} + // NotifyNewBlocks tells the exchange that new blocks are available and can be served. func (e *offlineExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { // as an offline exchange we have nothing to do