Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement error codes spec #2927

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions core/network/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package network

import (
"context"
"fmt"
"io"

ic "github.com/libp2p/go-libp2p/core/crypto"
Expand All @@ -11,6 +12,17 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

type ConnErrorCode uint32

type ConnError struct {
Remote bool
ErrorCode ConnErrorCode
}

func (c *ConnError) Error() string {
return fmt.Sprintf("connection closed: code: %d", c.ErrorCode)
}

// Conn is a connection to a remote peer. It multiplexes streams.
// Usually there is no need to use a Conn directly, but it may
// be useful to get information about the peer on the other side:
Expand All @@ -24,6 +36,11 @@ type Conn interface {
ConnStat
ConnScoper

// CloseWithError closes the connection with errCode. The errCode is sent to the
// peer on a best effort basis. For transports that do not support sending error
// codes on connection close, the behavior is identical to calling Close.
CloseWithError(errCode ConnErrorCode) error

// ID returns an identifier that uniquely identifies this Conn within this
// host, during this run. Connection IDs may repeat across restarts.
ID() string
Expand Down
29 changes: 29 additions & 0 deletions core/network/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package network
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
Expand All @@ -11,6 +12,21 @@ import (
// ErrReset is returned when reading or writing on a reset stream.
var ErrReset = errors.New("stream reset")

type StreamErrorCode uint32

type StreamError struct {
ErrorCode StreamErrorCode
Remote bool
}

func (s *StreamError) Error() string {
return fmt.Sprintf("stream reset: code: %d", s.ErrorCode)
}

func (s *StreamError) Is(target error) bool {
return target == ErrReset
}

// MuxedStream is a bidirectional io pipe within a connection.
type MuxedStream interface {
io.Reader
Expand Down Expand Up @@ -61,6 +77,13 @@ type MuxedStream interface {
SetWriteDeadline(time.Time) error
}

type ResetWithErrorer interface {
// ResetWithError closes both ends of the stream with errCode. The errCode is sent
// to the peer on a best effort basis. For transports that do not support sending
// error codes to remote peer, the behavior is identical to calling Reset
ResetWithError(errCode StreamErrorCode) error
}

// MuxedConn represents a connection to a remote peer that has been
// extended to support stream multiplexing.
//
Expand All @@ -86,6 +109,12 @@ type MuxedConn interface {
AcceptStream() (MuxedStream, error)
}

type CloseWithErrorer interface {
// CloseWithError closes the connection with errCode. The errCode is sent
// to the peer.
CloseWithError(errCode ConnErrorCode) error
}

// Multiplexer wraps a net.Conn with a stream multiplexing
// implementation and returns a MuxedConn that supports opening
// multiple streams over the underlying net.Conn
Expand Down
4 changes: 4 additions & 0 deletions core/network/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ type Stream interface {

// Scope returns the user's view of this stream's resource scope
Scope() StreamScope

// ResetWithError closes both ends of the stream with errCode. The errCode is sent
// to the peer.
ResetWithError(errCode StreamErrorCode) error
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/libp2p/go-nat v0.2.0
github.com/libp2p/go-netroute v0.2.1
github.com/libp2p/go-reuseport v0.4.0
github.com/libp2p/go-yamux/v4 v4.0.1
github.com/libp2p/go-yamux/v4 v4.0.2-0.20240828193053-e17eaa82d8a7
github.com/libp2p/zeroconf/v2 v2.2.0
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ github.com/libp2p/go-netroute v0.2.1 h1:V8kVrpD8GK0Riv15/7VN6RbUQ3URNZVosw7H2v9t
github.com/libp2p/go-netroute v0.2.1/go.mod h1:hraioZr0fhBjG0ZRXJJ6Zj2IVEVNx6tDTFQfSmcq7mQ=
github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s=
github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU=
github.com/libp2p/go-yamux/v4 v4.0.1 h1:FfDR4S1wj6Bw2Pqbc8Uz7pCxeRBPbwsBbEdfwiCypkQ=
github.com/libp2p/go-yamux/v4 v4.0.1/go.mod h1:NWjl8ZTLOGlozrXSOZ/HlfG++39iKNnM5wwmtQP1YB4=
github.com/libp2p/go-yamux/v4 v4.0.2-0.20240828193053-e17eaa82d8a7 h1:9DQhrYNrteSCiE8EZC1Na9AZNothvTF+NQtbnOjbxzo=
github.com/libp2p/go-yamux/v4 v4.0.2-0.20240828193053-e17eaa82d8a7/go.mod h1:PGP+3py2ZWDKABvqstBZtMnixEHNC7U/odnGylzur5o=
github.com/libp2p/zeroconf/v2 v2.2.0 h1:Cup06Jv6u81HLhIj1KasuNM/RHHrJ8T7wOTS4+Tv53Q=
github.com/libp2p/zeroconf/v2 v2.2.0/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0DmyYyNO1Xs=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
Expand Down
8 changes: 6 additions & 2 deletions p2p/muxer/yamux/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func (c *conn) Close() error {
return c.yamux().Close()
}

func (c *conn) CloseWithError(errCode network.ConnErrorCode) error {
return c.yamux().CloseWithError(uint32(errCode))
}

// IsClosed checks if yamux.Session is in closed state.
func (c *conn) IsClosed() bool {
return c.yamux().IsClosed()
Expand All @@ -32,7 +36,7 @@ func (c *conn) IsClosed() bool {
func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
s, err := c.yamux().OpenStream(ctx)
if err != nil {
return nil, err
return nil, parseResetError(err)
}

return (*stream)(s), nil
Expand All @@ -41,7 +45,7 @@ func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (network.MuxedStream, error) {
s, err := c.yamux().AcceptStream()
return (*stream)(s), err
return (*stream)(s), parseResetError(err)
}

func (c *conn) yamux() *yamux.Session {
Expand Down
32 changes: 22 additions & 10 deletions p2p/muxer/yamux/stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package yamux

import (
"errors"
"time"

"github.com/libp2p/go-libp2p/core/network"
Expand All @@ -13,22 +14,29 @@ type stream yamux.Stream

var _ network.MuxedStream = &stream{}

func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.yamux().Read(b)
if err == yamux.ErrStreamReset {
err = network.ErrReset
func parseResetError(err error) error {
if err == nil {
return err
}
se := &yamux.StreamError{}
if errors.As(err, &se) {
return &network.StreamError{Remote: se.Remote, ErrorCode: network.StreamErrorCode(se.ErrorCode)}
}
ce := &yamux.GoAwayError{}
if errors.As(err, &ce) {
return &network.ConnError{Remote: ce.Remote, ErrorCode: network.ConnErrorCode(ce.ErrorCode)}
}
return err
}

return n, err
func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.yamux().Read(b)
return n, parseResetError(err)
}

func (s *stream) Write(b []byte) (n int, err error) {
n, err = s.yamux().Write(b)
if err == yamux.ErrStreamReset {
err = network.ErrReset
}

return n, err
return n, parseResetError(err)
}

func (s *stream) Close() error {
Expand All @@ -39,6 +47,10 @@ func (s *stream) Reset() error {
return s.yamux().Reset()
}

func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
return s.yamux().ResetWithError(uint32(errCode))
}

func (s *stream) CloseRead() error {
return s.yamux().CloseRead()
}
Expand Down
1 change: 1 addition & 0 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ type mockConn struct {
}

func (m mockConn) Close() error { panic("implement me") }
func (m mockConn) CloseWithError(errCode network.ConnErrorCode) error { panic("implement me") }
func (m mockConn) LocalPeer() peer.ID { panic("implement me") }
func (m mockConn) RemotePeer() peer.ID { panic("implement me") }
func (m mockConn) RemotePublicKey() crypto.PubKey { panic("implement me") }
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func (s *stream) Reset() error {
return nil
}

func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
panic("not implemented")
}

func (s *stream) teardown() {
// at this point, no streams are writing.
s.conn.removeStream(s)
Expand Down
8 changes: 8 additions & 0 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,14 @@ func (c connWithMetrics) Close() error {
return c.CapableConn.Close()
}

func (c connWithMetrics) CloseWithError(errCode network.ConnErrorCode) error {
c.metricsTracer.ClosedConnection(c.dir, time.Since(c.opened), c.ConnState(), c.LocalMultiaddr())
if ce, ok := c.CapableConn.(network.CloseWithErrorer); ok {
return ce.CloseWithError(errCode)
}
return c.CapableConn.Close()
}

func (c connWithMetrics) Stat() network.ConnStats {
if cs, ok := c.CapableConn.(network.ConnStat); ok {
return cs.Stat()
Expand Down
23 changes: 20 additions & 3 deletions p2p/net/swarm/swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,20 @@ func (c *Conn) ID() string {
// open notifications must finish before we can fire off the close
// notifications).
func (c *Conn) Close() error {
c.closeOnce.Do(c.doClose)
c.closeOnce.Do(func() {
c.doClose(0)
})
return c.err
}

func (c *Conn) doClose() {
func (c *Conn) CloseWithError(errCode network.ConnErrorCode) error {
c.closeOnce.Do(func() {
c.doClose(errCode)
})
return c.err
}

func (c *Conn) doClose(errCode network.ConnErrorCode) {
c.swarm.removeConn(c)

// Prevent new streams from opening.
Expand All @@ -71,7 +80,15 @@ func (c *Conn) doClose() {
c.streams.m = nil
c.streams.Unlock()

c.err = c.conn.Close()
if errCode != 0 {
if ce, ok := c.conn.(network.CloseWithErrorer); ok {
c.err = ce.CloseWithError(errCode)
} else {
c.err = c.conn.Close()
}
} else {
c.err = c.conn.Close()
}

// Send the connectedness event after closing the connection.
// This ensures that both remote connection close and local connection
Expand Down
11 changes: 11 additions & 0 deletions p2p/net/swarm/swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ func (s *Stream) Reset() error {
return err
}

func (s *Stream) ResetWithError(errCode network.StreamErrorCode) error {
var err error
if se, ok := s.stream.(network.ResetWithErrorer); ok {
err = se.ResetWithError(errCode)
} else {
err = s.stream.Reset()
}
s.closeAndRemoveStream()
return err
}

func (s *Stream) closeAndRemoveStream() {
s.closeMx.Lock()
defer s.closeMx.Unlock()
Expand Down
7 changes: 7 additions & 0 deletions p2p/net/upgrader/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,10 @@ func (t *transportConn) ConnState() network.ConnectionState {
UsedEarlyMuxerNegotiation: t.usedEarlyMuxerNegotiation,
}
}

func (t *transportConn) CloseWithError(errCode network.ConnErrorCode) error {
if ce, ok := t.MuxedConn.(network.CloseWithErrorer); ok {
return ce.CloseWithError(errCode)
}
return t.Close()
}
Loading
Loading