Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat: use session key type from exchange interface
Browse files Browse the repository at this point in the history
This allows multiple subsystems to share the same conceptual "session" and
removes the need for managing session IDs.
  • Loading branch information
Stebalien committed Apr 23, 2020
1 parent 140cb07 commit de85e28
Show file tree
Hide file tree
Showing 18 changed files with 111 additions and 111 deletions.
4 changes: 2 additions & 2 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
wm = bswm.New(ctx, pm, sim, bpm)
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager,
sessionFactory := func(ctx context.Context, id exchange.SessionID, spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
Expand All @@ -151,7 +151,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
self peer.ID) bssm.Session {
return bssession.New(ctx, id, wm, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
sessionPeerManagerFactory := func(ctx context.Context, id exchange.SessionID) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
}
notif := notifications.New()
Expand Down
15 changes: 8 additions & 7 deletions internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-metrics-interface"

Expand All @@ -23,7 +24,7 @@ type PeerQueue interface {
}

type Session interface {
ID() uint64
ID() exchange.SessionID
SignalAvailability(peer.ID, bool)
}

Expand All @@ -42,8 +43,8 @@ type PeerManager struct {
ctx context.Context

psLk sync.RWMutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}
sessions map[exchange.SessionID]Session
peerSessions map[peer.ID]map[exchange.SessionID]struct{}

self peer.ID
}
Expand All @@ -58,8 +59,8 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *P
ctx: ctx,
self: self,

sessions: make(map[uint64]Session),
peerSessions: make(map[peer.ID]map[uint64]struct{}),
sessions: make(map[exchange.SessionID]Session),
peerSessions: make(map[peer.ID]map[exchange.SessionID]struct{}),
}
}

Expand Down Expand Up @@ -202,7 +203,7 @@ func (pm *PeerManager) RegisterSession(p peer.ID, s Session) bool {
}

if _, ok := pm.peerSessions[p]; !ok {
pm.peerSessions[p] = make(map[uint64]struct{})
pm.peerSessions[p] = make(map[exchange.SessionID]struct{})
}
pm.peerSessions[p][s.ID()] = struct{}{}

Expand All @@ -212,7 +213,7 @@ func (pm *PeerManager) RegisterSession(p peer.ID, s Session) bool {

// UnregisterSession tells the PeerManager that the given session is no longer
// interested in PeerManager events.
func (pm *PeerManager) UnregisterSession(ses uint64) {
func (pm *PeerManager) UnregisterSession(ses exchange.SessionID) {
pm.psLk.Lock()
defer pm.psLk.Unlock()

Expand Down
9 changes: 5 additions & 4 deletions internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ipfs/go-bitswap/internal/testutil"
cid "github.com/ipfs/go-cid"
exchange "github.com/ipfs/go-ipfs-exchange-interface"

"github.com/libp2p/go-libp2p-core/peer"
)
Expand Down Expand Up @@ -255,19 +256,19 @@ func TestSendCancels(t *testing.T) {
}
}

func (s *sess) ID() uint64 {
func (s *sess) ID() exchange.SessionID {
return s.id
}
func (s *sess) SignalAvailability(p peer.ID, isAvailable bool) {
s.available[p] = isAvailable
}

type sess struct {
id uint64
id exchange.SessionID
available map[peer.ID]bool
}

func newSess(id uint64) *sess {
func newSess(id exchange.SessionID) *sess {
return &sess{id, make(map[peer.ID]bool)}
}

Expand All @@ -281,7 +282,7 @@ func TestSessionRegistration(t *testing.T) {
self, p1, p2 := tp[0], tp[1], tp[2]
peerManager := New(ctx, peerQueueFactory, self)

id := uint64(1)
id := testutil.GenerateSessionID()
s := newSess(id)
peerManager.RegisterSession(p1, s)
if s.available[p1] {
Expand Down
13 changes: 7 additions & 6 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
loggables "github.com/libp2p/go-libp2p-loggables"
Expand All @@ -30,10 +31,10 @@ const (
type WantManager interface {
// BroadcastWantHaves sends want-haves to all connected peers (used for
// session discovery)
BroadcastWantHaves(context.Context, uint64, []cid.Cid)
BroadcastWantHaves(context.Context, exchange.SessionID, []cid.Cid)
// RemoveSession removes the session from the WantManager (when the
// session shuts down)
RemoveSession(context.Context, uint64)
RemoveSession(context.Context, exchange.SessionID)
}

// PeerManager keeps track of which sessions are interested in which peers
Expand All @@ -44,7 +45,7 @@ type PeerManager interface {
RegisterSession(peer.ID, bspm.Session) bool
// UnregisterSession tells the PeerManager that the session is no longer
// interested in a peer's connection state
UnregisterSession(uint64)
UnregisterSession(exchange.SessionID)
// SendWants tells the PeerManager to send wants to the given peer
SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
}
Expand Down Expand Up @@ -122,15 +123,15 @@ type Session struct {
// identifiers
notif notifications.PubSub
uuid logging.Loggable
id uint64
id exchange.SessionID

self peer.ID
}

// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context,
id uint64,
id exchange.SessionID,
wm WantManager,
sprm SessionPeerManager,
providerFinder ProviderFinder,
Expand Down Expand Up @@ -166,7 +167,7 @@ func New(ctx context.Context,
return s
}

func (s *Session) ID() uint64 {
func (s *Session) ID() exchange.SessionID {
return s.id
}

Expand Down
10 changes: 6 additions & 4 deletions internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
peer "github.com/libp2p/go-libp2p-core/peer"
)

Expand All @@ -31,16 +32,16 @@ func newFakeWantManager() *fakeWantManager {
}
}

func (fwm *fakeWantManager) BroadcastWantHaves(ctx context.Context, sesid uint64, cids []cid.Cid) {
func (fwm *fakeWantManager) BroadcastWantHaves(ctx context.Context, sesid exchange.SessionID, cids []cid.Cid) {
select {
case fwm.wantReqs <- wantReq{cids}:
case <-ctx.Done():
}
}
func (fwm *fakeWantManager) RemoveSession(context.Context, uint64) {}
func (fwm *fakeWantManager) RemoveSession(context.Context, exchange.SessionID) {}

func newFakeSessionPeerManager() *bsspm.SessionPeerManager {
return bsspm.New(1, newFakePeerTagger())
return bsspm.New(testutil.GenerateSessionID(), newFakePeerTagger())
}

type fakePeerTagger struct {
Expand Down Expand Up @@ -86,7 +87,7 @@ func newFakePeerManager() *fakePeerManager {
func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) bool {
return true
}
func (pm *fakePeerManager) UnregisterSession(uint64) {}
func (pm *fakePeerManager) UnregisterSession(exchange.SessionID) {}
func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}

func TestSessionGetBlocks(t *testing.T) {
Expand Down Expand Up @@ -459,6 +460,7 @@ func TestSessionReceiveMessageAfterShutdown(t *testing.T) {
notif := notifications.New()
defer notif.Shutdown()
id := testutil.GenerateSessionID()

session := New(ctx, id, fwm, fpm, fpf, sim, newFakePeerManager(), bpm, notif, time.Second, delay.Fixed(time.Minute), "")
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(2)
Expand Down
7 changes: 4 additions & 3 deletions internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
exchange "github.com/ipfs/go-ipfs-exchange-interface"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -79,7 +80,7 @@ type sessionWantSender struct {
// finished shutting down
closed chan struct{}
// The session ID
sessionID uint64
sessionID exchange.SessionID
// A channel that collects incoming changes (events)
changes chan change
// Information about each want indexed by CID
Expand All @@ -102,7 +103,7 @@ type sessionWantSender struct {
onPeersExhausted onPeersExhaustedFn
}

func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager,
func newSessionWantSender(sid exchange.SessionID, pm PeerManager, spm SessionPeerManager,
bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender {

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -127,7 +128,7 @@ func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager,
return sws
}

func (sws *sessionWantSender) ID() uint64 {
func (sws *sessionWantSender) ID() exchange.SessionID {
return sws.sessionID
}

Expand Down
25 changes: 13 additions & 12 deletions internal/session/sessionwantsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
bspm "github.com/ipfs/go-bitswap/internal/peermanager"
"github.com/ipfs/go-bitswap/internal/testutil"
cid "github.com/ipfs/go-cid"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
peer "github.com/libp2p/go-libp2p-core/peer"
)

Expand Down Expand Up @@ -66,7 +67,7 @@ func (pm *mockPeerManager) RegisterSession(p peer.ID, sess bspm.Session) bool {
return true
}

func (pm *mockPeerManager) UnregisterSession(sesid uint64) {
func (pm *mockPeerManager) UnregisterSession(sesid exchange.SessionID) {
}

func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
Expand Down Expand Up @@ -132,7 +133,7 @@ func TestSendWants(t *testing.T) {
cids := testutil.GenerateCids(4)
peers := testutil.GeneratePeers(1)
peerA := peers[0]
sid := uint64(1)
sid := testutil.GenerateSessionID()
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
Expand Down Expand Up @@ -170,7 +171,7 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
peers := testutil.GeneratePeers(2)
peerA := peers[0]
peerB := peers[1]
sid := uint64(1)
sid := testutil.GenerateSessionID()
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
Expand Down Expand Up @@ -228,7 +229,7 @@ func TestReceiveBlock(t *testing.T) {
peers := testutil.GeneratePeers(2)
peerA := peers[0]
peerB := peers[1]
sid := uint64(1)
sid := testutil.GenerateSessionID()
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
Expand Down Expand Up @@ -288,7 +289,7 @@ func TestPeerUnavailable(t *testing.T) {
peers := testutil.GeneratePeers(2)
peerA := peers[0]
peerB := peers[1]
sid := uint64(1)
sid := testutil.GenerateSessionID()
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
Expand Down Expand Up @@ -353,7 +354,7 @@ func TestPeersExhausted(t *testing.T) {
peers := testutil.GeneratePeers(2)
peerA := peers[0]
peerB := peers[1]
sid := uint64(1)
sid := testutil.GenerateSessionID()
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
Expand Down Expand Up @@ -429,7 +430,7 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
peers := testutil.GeneratePeers(2)
peerA := peers[0]
peerB := peers[1]
sid := uint64(1)
sid := testutil.GenerateSessionID()
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
Expand Down Expand Up @@ -477,7 +478,7 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
peers := testutil.GeneratePeers(2)
peerA := peers[0]
peerB := peers[1]
sid := uint64(1)
sid := testutil.GenerateSessionID()
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
Expand Down Expand Up @@ -516,7 +517,7 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
func TestConsecutiveDontHaveLimit(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
sid := testutil.GenerateSessionID()
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
Expand Down Expand Up @@ -572,7 +573,7 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
sid := testutil.GenerateSessionID()
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
Expand Down Expand Up @@ -627,7 +628,7 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
sid := testutil.GenerateSessionID()
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
Expand Down Expand Up @@ -711,7 +712,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
func TestConsecutiveDontHaveDontRemoveIfHasWantedBlock(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
sid := testutil.GenerateSessionID()
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
Expand Down
Loading

0 comments on commit de85e28

Please sign in to comment.