Skip to content

Commit

Permalink
send error codes on transport conn and stream failures
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Nov 21, 2024
1 parent e5acd28 commit 1ad628f
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 17 deletions.
14 changes: 13 additions & 1 deletion p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,19 @@ func (s *stream) Reset() error {
}

func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
panic("not implemented")
// Cancel any pending reads/writes with an error.
// TODO: Should these be the other way round(remote=true)?
s.write.CloseWithError(&network.StreamError{Remote: false, ErrorCode: errCode})
s.read.CloseWithError(&network.StreamError{Remote: false, ErrorCode: errCode})

select {
case s.reset <- struct{}{}:
default:
}
<-s.closed

// No meaningful error case here.
return nil
}

func (s *stream) teardown() {
Expand Down
8 changes: 6 additions & 2 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,12 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
// If we do this in the Upgrader, we will not be able to do this.
if s.gater != nil {
if allow, _ := s.gater.InterceptUpgraded(c); !allow {
// TODO Send disconnect with reason here
err := tc.Close()
var err error
if tcc, ok := tc.(network.CloseWithErrorer); ok {
err = tcc.CloseWithError(network.ConnGated)
} else {
err = tc.Close()
}
if err != nil {
log.Warnf("failed to close connection with peer %s and addr %s; err: %s", p, addr, err)
}
Expand Down
6 changes: 5 additions & 1 deletion p2p/net/swarm/swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ func (c *Conn) start() {
}
scope, err := c.swarm.ResourceManager().OpenStream(c.RemotePeer(), network.DirInbound)
if err != nil {
ts.Reset()
if tse, ok := ts.(network.ResetWithErrorer); ok {
tse.ResetWithError(network.StreamResourceLimitExceeded)
} else {
ts.Reset()
}
continue
}
c.swarm.refs.Add(1)
Expand Down
6 changes: 5 additions & 1 deletion p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,11 @@ func (l *listener) handleIncoming() {
// if we stop accepting connections for some reason,
// we'll eventually close all the open ones
// instead of hanging onto them.
conn.Close()
if cc, ok := conn.(network.CloseWithErrorer); ok {
cc.CloseWithError(network.ConnRateLimited)
} else {
conn.Close()
}
}
}()
}
Expand Down
5 changes: 2 additions & 3 deletions p2p/transport/quic/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
tpt "github.com/libp2p/go-libp2p/core/transport"
p2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"

ma "github.com/multiformats/go-multiaddr"
"github.com/quic-go/quic-go"
)
Expand Down Expand Up @@ -54,12 +53,12 @@ func (l *listener) Accept() (tpt.CapableConn, error) {
c, err := l.wrapConn(qconn)
if err != nil {
log.Debugf("failed to setup connection: %s", err)
qconn.CloseWithError(1, "")
qconn.CloseWithError(quic.ApplicationErrorCode(network.ConnResourceLimitExceeded), "")
continue
}
l.transport.addConn(qconn, c)
if l.transport.gater != nil && !(l.transport.gater.InterceptAccept(c) && l.transport.gater.InterceptSecured(network.DirInbound, c.remotePeerID, c)) {
c.closeWithError(errorCodeConnectionGating, "connection gated")
c.closeWithError(quic.ApplicationErrorCode(network.ConnGated), "connection gated")
continue
}

Expand Down
4 changes: 1 addition & 3 deletions p2p/transport/quic/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ var ErrHolePunching = errors.New("hole punching attempted; no active dial")

var HolePunchTimeout = 5 * time.Second

const errorCodeConnectionGating = 0x47415445 // GATE in ASCII

// The Transport implements the tpt.Transport interface for QUIC connections.
type transport struct {
privKey ic.PrivKey
Expand Down Expand Up @@ -169,7 +167,7 @@ func (t *transport) dialWithScope(ctx context.Context, raddr ma.Multiaddr, p pee
remoteMultiaddr: raddr,
}
if t.gater != nil && !t.gater.InterceptSecured(network.DirOutbound, p, c) {
pconn.CloseWithError(errorCodeConnectionGating, "connection gated")
pconn.CloseWithError(quic.ApplicationErrorCode(network.ConnGated), "connection gated")
return nil, fmt.Errorf("secured connection gated")
}
t.addConn(pconn, c)
Expand Down
3 changes: 2 additions & 1 deletion p2p/transport/quic/virtuallistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package libp2pquic
import (
"sync"

"github.com/libp2p/go-libp2p/core/network"
tpt "github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"

Expand Down Expand Up @@ -142,8 +143,8 @@ func (r *acceptLoopRunner) innerAccept(l *listener, expectedVersion quic.Version
select {
case ch <- acceptVal{conn: conn}:
default:
conn.(network.CloseWithErrorer).CloseWithError(network.ConnRateLimited)
// accept queue filled up, drop the connection
conn.Close()
log.Warn("Accept queue filled. Dropping connection.")
}

Expand Down
3 changes: 2 additions & 1 deletion p2p/transport/quicreuse/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"sync"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
"github.com/quic-go/quic-go"
Expand Down Expand Up @@ -212,7 +213,7 @@ func (l *listener) Close() error {
close(l.queue)
// drain the queue
for conn := range l.queue {
conn.CloseWithError(1, "closing")
conn.CloseWithError(quic.ApplicationErrorCode(network.ConnShutdown), "closing")
}
})
return nil
Expand Down
4 changes: 0 additions & 4 deletions p2p/transport/webtransport/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ func (s *stream) Reset() error {
return nil
}

func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
panic("not implemented")
}

func (s *stream) Close() error {
s.Stream.CancelRead(reset)
return s.Stream.Close()
Expand Down

0 comments on commit 1ad628f

Please sign in to comment.