Skip to content

Commit

Permalink
Merge pull request #1072 from dusk-network/peer-bottleneck
Browse files Browse the repository at this point in the history
Use ringbuffer directly in favor of respChan
  • Loading branch information
jules authored Jun 10, 2021
2 parents 28f843e + a7303c2 commit 3ea984c
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 176 deletions.
20 changes: 15 additions & 5 deletions cmd/voucher/challenger/challenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/dusk-network/dusk-blockchain/cmd/voucher/node"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/peer"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/message"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
log "github.com/sirupsen/logrus"
)
Expand All @@ -25,16 +26,17 @@ const challengeLength = 20

// Challenger is the component responsible for vetting incoming connections.
type Challenger struct {
nodes *node.Store
nodes *node.Store
gossip *protocol.Gossip
}

// New creates a new, initialized Challenger.
func New(store *node.Store) *Challenger {
return &Challenger{nodes: store}
return &Challenger{nodes: store, gossip: protocol.NewGossip(protocol.TestNet)}
}

// SendChallenge to a connecting peer.
func (c *Challenger) SendChallenge(ctx context.Context, r *peer.Reader, w *peer.Writer, ch chan bytes.Buffer) {
func (c *Challenger) SendChallenge(ctx context.Context, r *peer.Reader, w *peer.Writer) {
challenge, err := generateRandomBytes(challengeLength)
if err != nil {
log.Panic(err)
Expand All @@ -45,12 +47,20 @@ func (c *Challenger) SendChallenge(ctx context.Context, r *peer.Reader, w *peer.
log.Panic(err)
}

ch <- *buf
if err := c.gossip.Process(buf); err != nil {
log.WithError(err).Warnln("could not send challenge")
return
}

if _, err := w.Write(buf.Bytes()); err != nil {
log.WithError(err).Warnln("could not send challenge")
return
}

// Enter the node into the store, for future reference.
c.nodes.Add(r.Addr(), challenge)

peer.Create(ctx, r, w, ch)
peer.Create(ctx, r, w)

c.nodes.SetInactive(r.Addr())
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/core/candidate/requestor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func TestRequestor(t *testing.T) {

streamer := eventbus.NewGossipStreamer(protocol.TestNet)

bus.Subscribe(topics.Gossip, eventbus.NewStreamListener(streamer))
l := eventbus.NewStreamListener(streamer)
bus.Subscribe(topics.Gossip, l)

cChan := make(chan block.Block, 1)

Expand Down
3 changes: 2 additions & 1 deletion pkg/core/chain/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func TestAcceptFromPeer(t *testing.T) {
}

streamer := eventbus.NewGossipStreamer(protocol.TestNet)
eb.Subscribe(topics.Gossip, eventbus.NewStreamListener(streamer))
l := eventbus.NewStreamListener(streamer)
eb.Subscribe(topics.Gossip, l)

blk := mockAcceptableBlock(*c.tip)

Expand Down
3 changes: 2 additions & 1 deletion pkg/core/consensus/reduction/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func PrepareSendReductionTest(hlp *Helper, stepFn consensus.PhaseFn) func(t *tes
require := require.New(t)

streamer := eventbus.NewGossipStreamer(protocol.TestNet)
hlp.EventBus.Subscribe(topics.Gossip, eventbus.NewStreamListener(streamer))
l := eventbus.NewStreamListener(streamer)
hlp.EventBus.Subscribe(topics.Gossip, l)

ctx, cancel := context.WithCancel(context.Background())
go func(cancel context.CancelFunc) {
Expand Down
16 changes: 7 additions & 9 deletions pkg/p2p/peer/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (
peerCountTime = 30 * time.Second
)

type connectFunc func(context.Context, *Reader, *Writer, chan bytes.Buffer)
type connectFunc func(context.Context, *Reader, *Writer)

// Connector is responsible for accepting incoming connection requests, and
// establishing outward connections with desired peers.
Expand Down Expand Up @@ -168,9 +168,8 @@ func (c *Connector) Dial(addr string) (net.Conn, error) {
}

func (c *Connector) acceptConnection(conn net.Conn) {
writeQueueChan := make(chan bytes.Buffer, 1000)
pConn := NewConnection(conn, c.gossip)
peerReader := c.readerFactory.SpawnReader(pConn, writeQueueChan)
peerReader := c.readerFactory.SpawnReader(pConn)

if err := peerReader.Accept(c.services); err != nil {
log.WithField("process", "peer connector").
Expand All @@ -182,18 +181,17 @@ func (c *Connector) acceptConnection(conn net.Conn) {
WithField("address", peerReader.Addr()).
Debugln("incoming connection established")

peerWriter := NewWriter(pConn, c.eventBus)

c.addPeer(peerReader.Addr())

peerWriter := NewWriter(pConn, c.eventBus)

go func() {
c.connectFunc(context.Background(), peerReader, peerWriter, writeQueueChan)
c.connectFunc(context.Background(), peerReader, peerWriter)
c.removePeer(peerReader.Addr())
}()
}

func (c *Connector) proposeConnection(conn net.Conn) {
writeQueueChan := make(chan bytes.Buffer, 1000)
pConn := NewConnection(conn, c.gossip)
peerWriter := NewWriter(pConn, c.eventBus)

Expand All @@ -209,12 +207,12 @@ func (c *Connector) proposeConnection(conn net.Conn) {
WithField("address", address).
Debugln("outgoing connection established")

peerReader := c.readerFactory.SpawnReader(pConn, writeQueueChan)
peerReader := c.readerFactory.SpawnReader(pConn)

c.addPeer(peerWriter.Addr())

go func() {
c.connectFunc(context.Background(), peerReader, peerWriter, writeQueueChan)
c.connectFunc(context.Background(), peerReader, peerWriter)
c.removePeer(peerWriter.Addr())
}()
}
Expand Down
19 changes: 3 additions & 16 deletions pkg/p2p/peer/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@

package peer

import (
"bytes"

"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
)

// ReaderFactory is responsible for spawning peers. It provides them with the
// reference to the message processor, which will process the received messages.
type ReaderFactory struct {
Expand All @@ -25,18 +19,11 @@ func NewReaderFactory(processor *MessageProcessor) *ReaderFactory {

// SpawnReader returns a Reader. It will still need to be launched by
// running ReadLoop in a goroutine.
func (f *ReaderFactory) SpawnReader(conn *Connection, responseChan chan<- bytes.Buffer) *Reader {
func (f *ReaderFactory) SpawnReader(conn *Connection) *Reader {
reader := &Reader{
Connection: conn,
responseChan: responseChan,
processor: f.processor,
Connection: conn,
processor: f.processor,
}

// On each new connection the node sends topics.Mempool to retrieve mempool
// txs from the new peer
go func() {
responseChan <- topics.MemPool.ToBuffer()
}()

return reader
}
12 changes: 5 additions & 7 deletions pkg/p2p/peer/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package peer

import (
"bytes"
"net"
"os"
"testing"
Expand Down Expand Up @@ -37,22 +36,21 @@ func TestHandshake(t *testing.T) {

client, srv := net.Pipe()

g := protocol.NewGossip(protocol.TestNet)
pConn := NewConnection(client, g)
pw := NewWriter(pConn, eb)

go func() {
responseChan := make(chan bytes.Buffer, 100)
pConn := NewConnection(srv, protocol.NewGossip(protocol.TestNet))

peerReader := factory.SpawnReader(pConn, responseChan)
peerReader := factory.SpawnReader(pConn)
if err := peerReader.Accept(protocol.FullNode); err != nil {
panic(err)
}
}()

time.Sleep(500 * time.Millisecond)

g := protocol.NewGossip(protocol.TestNet)
pConn := NewConnection(client, g)
pw := NewWriter(pConn, eb)

defer func() {
_ = pw.Conn.Close()
}()
Expand Down
87 changes: 23 additions & 64 deletions pkg/p2p/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus/capi"

"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"

"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/checksum"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/protocol"
"github.com/dusk-network/dusk-blockchain/pkg/p2p/wire/topics"
"github.com/dusk-network/dusk-blockchain/pkg/util/container/ring"
"github.com/dusk-network/dusk-blockchain/pkg/util/nativeutils/eventbus"
)

Expand Down Expand Up @@ -88,8 +90,7 @@ type Writer struct {
// other network nodes.
type Reader struct {
*Connection
processor *MessageProcessor
responseChan chan<- bytes.Buffer
processor *MessageProcessor
}

// NewWriter returns a Writer. It will still need to be initialized by
Expand Down Expand Up @@ -209,29 +210,27 @@ func (p *Reader) Accept(services protocol.ServiceFlag) error {
// Create two-way communication with a peer. This function will allow both
// goroutines to run as long as no errors are encountered. Once the first error
// comes through, the context is canceled, and both goroutines are cleaned up.
func Create(ctx context.Context, reader *Reader, writer *Writer, writeQueueChan chan bytes.Buffer) {
func Create(ctx context.Context, reader *Reader, writer *Writer) {
pCtx, cancel := context.WithCancel(ctx)
defer cancel()

g := &GossipConnector{writer.Connection}
l := eventbus.NewStreamListener(g)
writer.gossipID = writer.subscriber.Subscribe(topics.Gossip, l)
ringBuf := ring.NewBuffer(1000)

// On each new connection the node sends topics.Mempool to retrieve mempool
// txs from the new peer
buf := topics.MemPool.ToBuffer()
if !ringBuf.Put(buf.Bytes()) {
logrus.WithField("process", "peer").
Errorln("could not send mempool message to peer")
}

go func() {
reader.ReadLoop(pCtx)
cancel()
}()
_ = ring.NewConsumer(ringBuf, eventbus.Consume, g)

writer.Serve(pCtx, writeQueueChan)
cancel()
}

// Serve utilizes two different methods for writing to the open connection.
func (w *Writer) Serve(ctx context.Context, writeQueueChan <-chan bytes.Buffer) {
// Any gossip topics are written into interrupt-driven ringBuffer
// Single-consumer pushes messages to the socket
g := &GossipConnector{w.Connection}
w.gossipID = w.subscriber.Subscribe(topics.Gossip, eventbus.NewStreamListener(g))

// writeQueue - FIFO queue
// writeLoop pushes first-in message to the socket
w.writeLoop(ctx, writeQueueChan)
w.onDisconnect()
reader.ReadLoop(pCtx, ringBuf)
writer.onDisconnect()
}

func (w *Writer) onDisconnect() {
Expand Down Expand Up @@ -260,50 +259,10 @@ func (w *Writer) onDisconnect() {
}
}

func (w *Writer) writeLoop(ctx context.Context, writeQueueChan <-chan bytes.Buffer) {
for {
select {
case buf := <-writeQueueChan:
if !canRoute(w.services, topics.Topic(buf.Bytes()[0])) {
l.WithField("topic", topics.Topic(buf.Bytes()[0]).String()).
WithField("service flag", w.services).
Warnln("dropping message")
continue
}

if err := w.gossip.Process(&buf); err != nil {
l.WithError(err).Warnln("error processing outgoing message")
continue
}

if _, err := w.Connection.Write(buf.Bytes()); err != nil {
l.WithField("process", "writeloop").
WithError(err).Warnln("error writing message")
return
}
case <-ctx.Done():
log.WithField("process", "writeloop").Debug("context canceled")
return
}
}
}

// ReadLoop will block on the read until a message is read, or until the deadline
// is reached. Should be called in a go-routine, after a successful handshake with
// a peer. Eventual duplicated messages are silently discarded.
func (p *Reader) ReadLoop(ctx context.Context) {
// As the peer ReadLoop is at the front-line of P2P network, receiving a
// malformed frame by an adversary node could lead to a panic.
// In such situation, the node should survive but adversary conn gets dropped
// defer func() {
// if r := recover(); r != nil {
// log.Errorf("Peer %s failed with critical issue: %v", p.RemoteAddr(), r)
// }
// }()
p.readLoop(ctx)
}

func (p *Reader) readLoop(ctx context.Context) {
func (p *Reader) ReadLoop(ctx context.Context, ringBuf *ring.Buffer) {
defer func() {
_ = p.Conn.Close()
}()
Expand Down Expand Up @@ -364,7 +323,7 @@ func (p *Reader) readLoop(ctx context.Context) {
// or blacklist spammers
startTime := time.Now().UnixNano()

if _, err = p.processor.Collect(p.Addr(), message, p.responseChan, p.services, nil); err != nil {
if _, err = p.processor.Collect(p.Addr(), message, ringBuf, p.services, nil); err != nil {
l.WithField("process", "readloop").WithField("cs", hex.EncodeToString(cs)).
WithError(err).Error("failed to process message")
}
Expand Down
Loading

0 comments on commit 3ea984c

Please sign in to comment.