diff --git a/cmd/retransmitter/retransmit/network/incoming_peer.go b/cmd/retransmitter/retransmit/network/incoming_peer.go index ab7ad13bc..af6b65e54 100644 --- a/cmd/retransmitter/retransmit/network/incoming_peer.go +++ b/cmd/retransmitter/retransmit/network/incoming_peer.go @@ -23,16 +23,16 @@ type IncomingPeer struct { } type incomingPeerID struct { - remoteAddr net.Addr - localAddr net.Addr + remoteAddr string + localAddr string } func newPeerID(remoteAddr net.Addr, localAddr net.Addr) incomingPeerID { - return incomingPeerID{remoteAddr: remoteAddr, localAddr: localAddr} + return incomingPeerID{remoteAddr: remoteAddr.String(), localAddr: localAddr.String()} } func (id incomingPeerID) String() string { - return fmt.Sprintf("incoming Connection %s -> %s", id.remoteAddr.String(), id.localAddr.String()) + return fmt.Sprintf("incoming Connection %s -> %s", id.remoteAddr, id.localAddr) } type IncomingPeerParams struct { @@ -60,7 +60,7 @@ func RunIncomingPeer(ctx context.Context, params IncomingPeerParams) { default: } - id := fmt.Sprintf("incoming Connection %s -> %s", c.RemoteAddr().String(), c.LocalAddr().String()) + id := newPeerID(c.RemoteAddr(), c.LocalAddr()) zap.S().Infof("read handshake from %s %+v", id, readHandshake) writeHandshake := proto.Handshake{ @@ -95,7 +95,7 @@ func RunIncomingPeer(ctx context.Context, params IncomingPeerParams) { params: params, conn: connection, remote: remote, - uniqueID: newPeerID(c.RemoteAddr(), c.LocalAddr()), + uniqueID: id, cancel: cancel, handshake: readHandshake, } diff --git a/pkg/p2p/incoming/incoming.go b/pkg/p2p/incoming/incoming.go index c7254b374..fc72f2076 100644 --- a/pkg/p2p/incoming/incoming.go +++ b/pkg/p2p/incoming/incoming.go @@ -76,7 +76,12 @@ func runIncomingPeer(ctx context.Context, cancel context.CancelFunc, params Peer remote := peer.NewRemote() connection := conn.WrapConnection(c, remote.ToCh, remote.FromCh, remote.ErrCh, params.Skip) - peerImpl := peer.NewPeerImpl(readHandshake, connection, peer.Incoming, remote, cancel) + peerImpl, err := peer.NewPeerImpl(readHandshake, connection, peer.Incoming, remote, cancel) + if err != nil { + _ = c.Close() // TODO: handle error + zap.S().Warn("Failed to create new peer impl: ", err) + return errors.Wrap(err, "failed to run incoming peer") + } out := peer.InfoMessage{ Peer: peerImpl, diff --git a/pkg/p2p/outgoing/outgoing.go b/pkg/p2p/outgoing/outgoing.go index a2f2874c2..e734bee24 100644 --- a/pkg/p2p/outgoing/outgoing.go +++ b/pkg/p2p/outgoing/outgoing.go @@ -29,6 +29,7 @@ type EstablishParams struct { func EstablishConnection(ctx context.Context, params EstablishParams, v proto.Version) error { ctx, cancel := context.WithCancel(ctx) + // FIXME: cancel should be defered remote := peer.NewRemote() p := connector{ params: params, @@ -36,19 +37,27 @@ func EstablishConnection(ctx context.Context, params EstablishParams, v proto.Ve remote: remote, } + // TODO: use net.DialTimeout c, err := net.Dial("tcp", params.Address.String()) if err != nil { return err } + // FIXME: connection.close should be called in case of any error, or it should be deferred in any case connection, handshake, err := p.connect(ctx, c, v) if err != nil { + // FIXME: close connection zap.S().Debugf("Outgoing connection to address %s failed with error: %v", params.Address.String(), err) return errors.Wrapf(err, "%q", params.Address) } p.connection = connection - peerImpl := peer.NewPeerImpl(*handshake, connection, peer.Outgoing, remote, cancel) + peerImpl, err := peer.NewPeerImpl(*handshake, connection, peer.Outgoing, remote, cancel) + if err != nil { + _ = c.Close() // TODO: handle error + zap.S().Debugf("Failed to create new peer impl for outgoing conn to %s: %v", params.Address, err) + return errors.Wrapf(err, "failed to establish connection to %s", params.Address.String()) + } connected := peer.InfoMessage{ Peer: peerImpl, diff --git a/pkg/p2p/peer/handle.go b/pkg/p2p/peer/handle.go index f8643ba92..810238e86 100644 --- a/pkg/p2p/peer/handle.go +++ b/pkg/p2p/peer/handle.go @@ -55,6 +55,7 @@ type HandlerParams struct { } // Handle sends and receives messages no matter outgoing or incoming connection. +// TODO: caller should be responsible for closing network connection func Handle(params HandlerParams) error { for { select { diff --git a/pkg/p2p/peer/peer_impl.go b/pkg/p2p/peer/peer_impl.go index 8c272e488..bbcc2aa5f 100644 --- a/pkg/p2p/peer/peer_impl.go +++ b/pkg/p2p/peer/peer_impl.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "net" - "strings" + "net/netip" "github.com/pkg/errors" "github.com/wavesplatform/gowaves/pkg/p2p/conn" @@ -13,17 +13,27 @@ import ( ) type peerImplID struct { - addr net.Addr - nonce uint64 + addr16 [16]byte + nonce uint64 } -func newPeerImplID(addr net.Addr, nonce uint64) peerImplID { - return peerImplID{addr: addr, nonce: nonce} +func newPeerImplID(addr net.Addr, nonce uint64) (peerImplID, error) { + var ( + netStr = addr.Network() + addrStr = addr.String() + ) + tcpAddr, err := net.ResolveTCPAddr(netStr, addrStr) + if err != nil { + return peerImplID{}, errors.Wrapf(err, "failed to resolve '%s' addr from '%s'", netStr, addrStr) + } + var addr16 [16]byte + copy(addr16[:], tcpAddr.IP.To16()) + return peerImplID{addr16: addr16, nonce: nonce}, nil } func (id peerImplID) String() string { - a := strings.Split(id.addr.String(), ":")[0] - return fmt.Sprintf("%s-%d", a, id.nonce) + addr := netip.AddrFrom16(id.addr16).Unmap() + return fmt.Sprintf("%s-%d", addr.String(), id.nonce) } type PeerImpl struct { @@ -35,15 +45,19 @@ type PeerImpl struct { cancel context.CancelFunc } -func NewPeerImpl(handshake proto.Handshake, conn conn.Connection, direction Direction, remote Remote, cancel context.CancelFunc) *PeerImpl { +func NewPeerImpl(handshake proto.Handshake, conn conn.Connection, direction Direction, remote Remote, cancel context.CancelFunc) (*PeerImpl, error) { + id, err := newPeerImplID(conn.Conn().RemoteAddr(), handshake.NodeNonce) + if err != nil { + return nil, errors.Wrap(err, "failed to create new peer") + } return &PeerImpl{ handshake: handshake, conn: conn, direction: direction, remote: remote, - id: newPeerImplID(conn.Conn().RemoteAddr(), handshake.NodeNonce), + id: id, cancel: cancel, - } + }, nil } func (a *PeerImpl) Direction() Direction { diff --git a/pkg/p2p/peer/peer_test.go b/pkg/p2p/peer/peer_test.go index 4ed81d931..d541325cb 100644 --- a/pkg/p2p/peer/peer_test.go +++ b/pkg/p2p/peer/peer_test.go @@ -1,13 +1,87 @@ package peer import ( - "net" + "fmt" + "net/netip" + "strconv" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestID(t *testing.T) { - addr, _ := net.ResolveTCPAddr("", "127.0.0.1:6868") - assert.Equal(t, "127.0.0.1-100500", peerImplID{addr: addr, nonce: 100500}.String()) +type netAddr struct{ net, addr string } + +func (n netAddr) Network() string { return n.net } + +func (n netAddr) String() string { return n.addr } + +func TestPeerImplID(t *testing.T) { + tests := []struct { + net, addr string + nonce uint64 + errorStr string + }{ + {net: "tcp", addr: "127.0.0.1:100", nonce: 100501}, + {net: "tcp4", addr: "127.0.0.1:100", nonce: 100502}, + {net: "", addr: "127.0.0.1:100", nonce: 100504}, + {net: "tcp", addr: "[2001:db8::1]:8080", nonce: 80}, + {net: "tcp6", addr: "[2001:db8::1]:8080", nonce: 82}, + { + net: "tcp6", addr: "127.0.0.1:100", nonce: 100503, + errorStr: "failed to resolve 'tcp6' addr from '127.0.0.1:100': address 127.0.0.1: no suitable address found", + }, + { + net: "tcp4", addr: "[2001:db8::1]:8080", nonce: 81, + errorStr: "failed to resolve 'tcp4' addr from '[2001:db8::1]:8080': address 2001:db8::1: no suitable address found", + }, + { + net: "udp", addr: "[2001:db8::1]:8080", nonce: 80, + errorStr: "failed to resolve 'udp' addr from '[2001:db8::1]:8080': unknown network udp", + }, + { + net: "tcp", addr: "127.0.0.01", nonce: 90, + errorStr: "failed to resolve 'tcp' addr from '127.0.0.01': address 127.0.0.01: missing port in address", + }, + } + for i, test := range tests { + t.Run(strconv.Itoa(i+1), func(t *testing.T) { + id, err := newPeerImplID(netAddr{net: test.net, addr: test.addr}, test.nonce) + if test.errorStr != "" { + assert.EqualError(t, err, test.errorStr) + } else { + addrP, err := netip.ParseAddrPort(test.addr) + require.NoError(t, err) + expectedAddr := addrP.Addr() + assert.Equal(t, expectedAddr.As16(), id.addr16) + assert.Equal(t, test.nonce, id.nonce) + expectedString := fmt.Sprintf("%s-%d", expectedAddr, test.nonce) + assert.Equal(t, expectedString, id.String()) + } + }) + } +} + +func TestPeerImplId_InMap(t *testing.T) { + const ( + net = "tcp" + addr = "127.0.0.1:8080" + ) + type noncePair struct{ first, second uint64 } + for i, np := range []noncePair{{100, 500}, {100, 100}} { + t.Run(strconv.Itoa(i+1), func(t *testing.T) { + first, err := newPeerImplID(netAddr{net: net, addr: addr}, np.first) + require.NoError(t, err) + second, err := newPeerImplID(netAddr{net: net, addr: addr}, np.second) + require.NoError(t, err) + + m := map[ID]struct{}{first: {}} + _, ok := m[second] + if unique := np.first != np.second; unique { + assert.False(t, ok) + } else { + assert.True(t, ok) + } + }) + } }