Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

feat: StreamBlocks() #327

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (

bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/internal/session"
bsnet "github.com/ipfs/go-bitswap/network"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
bsnet "github.com/ipfs/go-bitswap/network"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
Expand Down Expand Up @@ -99,6 +99,8 @@ var benches = []bench{
bench{"10Nodes-OnePeerPerBlock-BigBatch", 10, 100, onePeerPerBlock, batchFetchAll},
// - request 1, then 10, then 89 blocks (similar to how IPFS would fetch a file)
bench{"10Nodes-OnePeerPerBlock-UnixfsFetch", 10, 100, onePeerPerBlock, unixfsFileFetch},
// - request 1, then 10, then 89 blocks using StreamBlocks
bench{"10Nodes-OnePeerPerBlock-UnixfsStream", 10, 100, onePeerPerBlock, unixfsStreamFetch},

// Fetch from 199 seed nodes, all nodes have all blocks, fetch all 20 blocks with a single GetBlocks() call
bench{"200Nodes-AllToAll-BigBatch", 200, 20, allToAll, batchFetchAll},
Expand Down Expand Up @@ -572,6 +574,28 @@ func unixfsFileFetch(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
}
}

// unixfsStreamFetch simulates the fetch pattern of trying to sync a unixfs
// file graph as fast as possible using StreamBlocks(): it requests the first
// key, then the next ten, then all remaining keys, waiting for every block of
// a batch before requesting the next batch.
//
// ks must hold at least 11 keys.
func unixfsStreamFetch(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	ksch := make(chan []cid.Cid)
	out, err := ses.StreamBlocks(context.Background(), ksch)
	if err != nil {
		b.Fatal(err)
	}
	// Closing the key channel tells the streaming side that no more wants
	// are coming, so it can shut down instead of lingering after the run.
	defer close(ksch)

	batches := [][]cid.Cid{ks[:1], ks[1:11], ks[11:]}
	for _, batch := range batches {
		ksch <- batch
		// Wait for exactly as many blocks as were just requested; deriving
		// the count from the batch keeps it in sync with the slice bounds.
		for range batch {
			if _, ok := <-out; !ok {
				b.Fatal("blocks channel closed before all requested blocks were received")
			}
		}
	}
}

func unixfsFileFetchLarge(b *testing.B, bs *bitswap.Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
_, err := ses.GetBlock(context.Background(), ks[0])
Expand Down
35 changes: 27 additions & 8 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (

var log = logging.Logger("bitswap")

var _ exchange.SessionExchange = (*Bitswap)(nil)
// var _ exchange.SessionExchange = (*Bitswap)(nil)

const (
// these requests take at _least_ two minutes at the moment.
Expand Down Expand Up @@ -144,7 +144,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
notif *notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
Expand Down Expand Up @@ -197,7 +197,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
go func() {
<-px.Closing() // process closes first
cancelFunc()
notif.Shutdown()
}()
procctx.CloseAfterContext(px, ctx) // parent cancelled first

Expand Down Expand Up @@ -225,7 +224,7 @@ type Bitswap struct {
blockstore blockstore.Blockstore

// manages channels of outgoing blocks for sessions
notif notifications.PubSub
notif *notifications.PubSub

// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
Expand Down Expand Up @@ -294,9 +293,13 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
return bs.engine.LedgerForPeer(p)
}

// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
// GetBlocks returns a stream of blocks, given a list of CIDs. It will
// return blocks in any order.
//
// To wait for all remaining blocks, wait for the blocks channel to be
// closed. A closed channel does not mean that _all_ blocks were
// retrieved, it just means that the fetcher is done retrieving blocks.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
Expand All @@ -306,6 +309,22 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
return session.GetBlocks(ctx, keys)
}

// StreamBlocks takes a stream of CID batches and returns a stream of the
// corresponding blocks. Blocks may be delivered in any order.
//
// Each call creates a fresh session and delegates the request to it.
//
// To wait for all remaining blocks, close the CID channel and wait for the
// blocks channel to be closed. A closed blocks channel only signals that the
// fetcher is done retrieving blocks; it does not guarantee that _all_ blocks
// were retrieved.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) StreamBlocks(ctx context.Context, keys <-chan []cid.Cid) (<-chan blocks.Block, error) {
	return bs.sm.
		NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay).
		StreamBlocks(ctx, keys)
}

// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
Expand Down Expand Up @@ -530,6 +549,6 @@ func (bs *Bitswap) IsOnline() bool {
// method, but the session will use the fact that the requests are related to
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
func (bs *Bitswap) NewSession(ctx context.Context) bssm.Fetcher {
	// The session manager builds the session using this instance's
	// provider-search and rebroadcast delays.
	sess := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
	return sess
}
58 changes: 58 additions & 0 deletions bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,64 @@ func TestSessionBetweenPeers(t *testing.T) {
}
}

// TestSessionBetweenPeersStream checks that a session on one peer can fetch
// blocks held by another peer through StreamBlocks(), requesting them ten at
// a time, and that peers not involved in the exchange receive at most two
// messages.
func TestSessionBetweenPeersStream(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	vnet := getVirtualNetwork()
	ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
	defer ig.Close()
	bgen := blocksutil.NewBlockGenerator()

	inst := ig.Instances(10)

	// Add 101 blocks to Peer A
	blks := bgen.Blocks(101)
	if err := inst[0].Blockstore().PutMany(blks); err != nil {
		t.Fatal(err)
	}

	var cids []cid.Cid
	for _, blk := range blks {
		cids = append(cids, blk.Cid())
	}

	// Create a session on Peer B and fetch the first block on its own
	ses := inst[1].Exchange.NewSession(ctx)
	if _, err := ses.GetBlock(ctx, cids[0]); err != nil {
		t.Fatal(err)
	}
	blks = blks[1:]
	cids = cids[1:]

	// Open the stream. The error must be checked before the first send on
	// ksch: if the stream failed to start, nothing would be receiving from
	// the channel and the send would block forever.
	ksch := make(chan []cid.Cid)
	ch, err := ses.StreamBlocks(ctx, ksch)
	if err != nil {
		t.Fatal(err)
	}
	// Signal that no more wants are coming once the test is done.
	defer close(ksch)

	// Fetch blocks with the session, 10 at a time
	for i := 0; i < 10; i++ {
		batch := cids[i*10 : (i+1)*10]
		ksch <- batch

		var got []blocks.Block
		for range batch {
			blk, ok := <-ch
			if !ok {
				t.Fatal("blocks channel closed before all requested blocks were received")
			}
			got = append(got, blk)
		}
		if err := assertBlockLists(got, blks[i*10:(i+1)*10]); err != nil {
			t.Fatal(err)
		}
	}

	// Peers other than A and B took no part in the transfer
	for _, is := range inst[2:] {
		stat, err := is.Exchange.Stat()
		if err != nil {
			t.Fatal(err)
		}
		if stat.MessagesReceived > 2 {
			t.Fatal("uninvolved nodes should only receive two messages", stat.MessagesReceived)
		}
	}
}

func TestSessionSplitFetch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
137 changes: 78 additions & 59 deletions internal/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,75 +58,94 @@ func SyncGetBlock(p context.Context, k cid.Cid, gb GetBlocksFunc) (blocks.Block,
}

// WantFunc is any function that can express a want for set of blocks.
type WantFunc func(context.Context, []cid.Cid)

// AsyncGetBlocks take a set of block cids, a pubsub channel for incoming
// blocks, a want function, and a close function, and returns a channel of
// incoming blocks.
func AsyncGetBlocks(ctx context.Context, sessctx context.Context, keys []cid.Cid, notif notifications.PubSub,
want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {

// If there are no keys supplied, just return a closed channel
if len(keys) == 0 {
out := make(chan blocks.Block)
close(out)
return out, nil
}

// Use a PubSub notifier to listen for incoming blocks for each key
remaining := cid.NewSet()
promise := notif.Subscribe(ctx, keys...)
for _, k := range keys {
log.Debugw("Bitswap.GetBlockRequest.Start", "cid", k)
remaining.Add(k)
}

// Send the want request for the keys to the network
want(ctx, keys)

type WantFunc func([]cid.Cid)

// AsyncGetBlocks listens for the blocks corresponding to the requested wants,
// and outputs them on the returned channel.
// If the wants channel is closed and all wanted blocks are received, closes
// the returned channel.
// If the session context or request context are cancelled, calls cancelWants
// with all pending wants and closes the returned channel.
func AsyncGetBlocks(ctx context.Context, sessctx context.Context, wants <-chan []cid.Cid, notif *notifications.PubSub,
want WantFunc, cancelWants func([]cid.Cid)) (<-chan blocks.Block, error) {

// Channel of blocks to return to the client
out := make(chan blocks.Block)
go handleIncoming(ctx, sessctx, remaining, promise, out, cwants)
return out, nil
}

// Listens for incoming blocks, passing them to the out channel.
// If the context is cancelled or the incoming channel closes, calls cfun with
// any keys corresponding to blocks that were never received.
func handleIncoming(ctx context.Context, sessctx context.Context, remaining *cid.Set,
in <-chan blocks.Block, out chan blocks.Block, cfun func([]cid.Cid)) {

ctx, cancel := context.WithCancel(ctx)

// Clean up before exiting this function, and call the cancel function on
// any remaining keys
defer func() {
cancel()
close(out)
// can't just defer this call on its own, arguments are resolved *when* the defer is created
cfun(remaining.Keys())
}()
// Keep track of which wants we haven't yet received a block
pending := cid.NewSet()

for {
select {
case blk, ok := <-in:
// If the channel is closed, we're done (note that PubSub closes
// the channel once all the keys have been received)
if !ok {
return
// Use a PubSub notifier to listen for incoming blocks for each key
sub := notif.NewSubscription()

go func() {
// Before exiting
defer func() {
// Close the client's channel of blocks
close(out)
// Close the subscription
sub.Close()

// Cancel any pending wants
if pending.Len() > 0 {
cancelWants(pending.Keys())
}
}()

remaining.Remove(blk.Cid())
blksCh := sub.Blocks()
for {
select {
case out <- blk:

// For each wanted key
case ks, ok := <-wants:
// Stop receiving from the channel if it's closed
if !ok {
wants = nil
if pending.Len() == 0 {
return
}
} else {
for _, k := range ks {
// Record that the want is pending
log.Debugw("Bitswap.GetBlockRequest.Start", "cid", k)
pending.Add(k)
}

// Add the keys to the subscriber so that we'll be notified
// if the corresponding block arrives
sub.Add(ks...)

// Send the want request for the keys to the network
want(ks)
}

// For each received block
case blk := <-blksCh:
// Remove the want from the pending set
pending.Remove(blk.Cid())

// Send the block to the client
select {
case out <- blk:
case <-ctx.Done():
return
case <-sessctx.Done():
return
}

// If the wants channel has been closed, and we're not
// expecting any more blocks, exit
if wants == nil && pending.Len() == 0 {
return
}

case <-ctx.Done():
return
case <-sessctx.Done():
return
}
case <-ctx.Done():
return
case <-sessctx.Done():
return
}
}
}()

return out, nil
}
Loading