Skip to content

Commit

Permalink
New network connection (#1547)
Browse files Browse the repository at this point in the history
* TaskGroup goroutine manager added in pkg/execution package.
Tests on TaskGroup added.

* Networking package with a new connection handler Session added.

* Logger interface removed from networking package. Standard slog package is used instead.

* WIP. Simple connection replaced with NetClient.
NetClient usage moved into Universal client.
Handshake proto updated to compatibility with Handshake interface from networking package.

* Fixed NetClient closing issue.
Configuration option to set KeepAliveInterval added to networking.Config.

* Redundant log removed.

* Move save int conversion to safecast lib.

* Fix data race error in 'networking_test' package

Implement 'io.Stringer' for 'Session' struct.
Data race happens because 'clientHandler' mock in 'TestSessionTimeoutOnHandshake' test
reads 'Session' structure at the same time as 'clientSession.Close' call.

* Replace atomic.Uint32 with atomic.Bool and use CompareAndSwap there it's possible.
Replace random delay with constan to make test not blink.
Simplify assertion in test to make it stable.

* Assertions added.
Style fixed.

* Simplified closing and close logic in NetClient.
Added logs on handshake rejection to clarify the reason of rejections.
Added and used function to configure Session with list of Slog attributes.

* Prepare for new timer in Go 1.23

Co-authored-by: Nikolay Eskov <[email protected]>

* Move constant into function were it used.
Proper error declaration.

* Better way to prevent from running multiple receiveLoops.
Shutdown lock replaced with sync.Once.

* Better data emptyness checks.

* Better read error handling.

Co-authored-by: Nikolay Eskov <[email protected]>

* Use constructor.

* Wrap heavy logging into log level checks.
Fix data lock and data access order.

* Session configuration accepts slog handler to set up logging.
Discarding slog handler implemented and used instead of setting default slog logger.
Checks on interval values added to Session constructor.

* Close error channel on sending data successfully.
Better error channel passing.
Reset receiving buffer by deffering.

* Better error handling while reading.

Co-authored-by: Nikolay Eskov <[email protected]>

* Fine error assertions.

* Fix blinking test.

* Better configuration handling.

Co-authored-by: Nikolay Eskov <[email protected]>

* Fixed blinking test TestCloseParentContext. Wait group added to wait for client to finish sending handshake.
Better wait groups naming.

* Better test workflow. Better wait group naming.

* Fix deadlock in test by introducing wait group instead of sleep.

* Internal sendPacket reimplemented using io.Reader. Data restoration function removed.
Handler's OnReceive use io.Reader to pass received data.
Tests updated. Mocks regenerated.

* Itest network client handler updated.

* Changed the way OnReceive passes the receiveBuffer. Test updated.

---------

Co-authored-by: Nikolay Eskov <[email protected]>
  • Loading branch information
alexeykiselev and nickeskov authored Dec 28, 2024
1 parent 4559244 commit 14ae6e5
Show file tree
Hide file tree
Showing 29 changed files with 2,508 additions and 235 deletions.
12 changes: 12 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
quiet: False
with-expecter: True
dir: "{{.InterfaceDir}}/mocks"
mockname: "Mock{{.InterfaceName}}"
filename: "{{.InterfaceName | snakecase}}.go"

packages:
github.com/wavesplatform/gowaves/pkg/networking:
interfaces:
Header:
Protocol:
Handler:
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
filippo.io/edwards25519 v1.1.0
github.com/beevik/ntp v1.4.3
github.com/btcsuite/btcd/btcec/v2 v2.3.4
github.com/ccoveille/go-safecast v1.2.0
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/consensys/gnark v0.11.0
Expand All @@ -22,6 +23,7 @@ require (
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
github.com/jinzhu/copier v0.4.0
github.com/mr-tron/base58 v1.2.0
github.com/neilotoole/slogt v1.1.0
github.com/ory/dockertest/v3 v3.11.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
Expand All @@ -42,6 +44,7 @@ require (
github.com/valyala/bytebufferpool v1.0.0
github.com/xenolf/lego v2.7.2+incompatible
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.31.0
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
Expand Down Expand Up @@ -98,6 +101,7 @@ require (
github.com/rs/zerolog v1.33.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tidwall/gjson v1.14.2 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurT
github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 h1:59Kx4K6lzOW5w6nFlA0v5+lk/6sjybR934QNHSJZPTQ=
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/ccoveille/go-safecast v1.2.0 h1:H4X7aosepsU1Mfk+098CTdKpsDH0cfYJ2RmwXFjgvfc=
github.com/ccoveille/go-safecast v1.2.0/go.mod h1:QqwNjxQ7DAqY0C721OIO9InMk9zCwcsO7tnRuHytad8=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -199,6 +201,8 @@ github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjW
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/neilotoole/slogt v1.1.0 h1:c7qE92sq+V0yvCuaxph+RQ2jOKL61c4hqS1Bv9W7FZE=
github.com/neilotoole/slogt v1.1.0/go.mod h1:RCrGXkPc/hYybNulqQrMHRtvlQ7F6NktNVLuLwk6V+w=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down Expand Up @@ -282,6 +286,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down
5 changes: 5 additions & 0 deletions itests/clients/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func (c *GRPCClient) GetAssetsInfo(t *testing.T, id []byte) *g.AssetInfoResponse
return assetInfo
}

func (c *GRPCClient) Close(t testing.TB) {
err := c.conn.Close()
assert.NoError(t, err, "failed to close GRPC connection to %s node", c.impl.String())
}

func (c *GRPCClient) getBalance(t *testing.T, req *g.BalancesRequest) *g.BalanceResponse {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
Expand Down
218 changes: 218 additions & 0 deletions itests/clients/net_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package clients

import (
"bytes"
"context"
"encoding/base64"
"io"
"log/slog"
"net"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/neilotoole/slogt"
"github.com/stretchr/testify/require"

"github.com/wavesplatform/gowaves/itests/config"
"github.com/wavesplatform/gowaves/pkg/networking"
"github.com/wavesplatform/gowaves/pkg/proto"
)

const (
appName = "wavesL"
nonce = uint64(0)
networkTimeout = 3 * time.Second
pingInterval = 5 * time.Second
)

type NetClient struct {
ctx context.Context
t testing.TB
impl Implementation
n *networking.Network
c *networking.Config
s *networking.Session

closing atomic.Bool
closed sync.Once
}

func NewNetClient(
ctx context.Context, t testing.TB, impl Implementation, port string, peers []proto.PeerInfo,
) *NetClient {
n := networking.NewNetwork()
p := newProtocol(t, nil)
h := newHandler(t, peers)
log := slogt.New(t)
conf := networking.NewConfig(p, h).
WithSlogHandler(log.Handler()).
WithWriteTimeout(networkTimeout).
WithKeepAliveInterval(pingInterval).
WithSlogAttributes(slog.String("suite", t.Name()), slog.String("impl", impl.String()))

conn, err := net.Dial("tcp", config.DefaultIP+":"+port)
require.NoError(t, err, "failed to dial TCP to %s node", impl.String())

s, err := n.NewSession(ctx, conn, conf)
require.NoError(t, err, "failed to establish new session to %s node", impl.String())

cli := &NetClient{ctx: ctx, t: t, impl: impl, n: n, c: conf, s: s}
h.client = cli // Set client reference in handler.
return cli
}

func (c *NetClient) SendHandshake() {
handshake := &proto.Handshake{
AppName: appName,
Version: proto.ProtocolVersion(),
NodeName: "itest",
NodeNonce: nonce,
DeclaredAddr: proto.HandshakeTCPAddr{},
Timestamp: proto.NewTimestampFromTime(time.Now()),
}
buf := bytes.NewBuffer(nil)
_, err := handshake.WriteTo(buf)
require.NoError(c.t, err,
"failed to marshal handshake to %s node at %q", c.impl.String(), c.s.RemoteAddr())
_, err = c.s.Write(buf.Bytes())
require.NoError(c.t, err,
"failed to send handshake to %s node at %q", c.impl.String(), c.s.RemoteAddr())
}

func (c *NetClient) SendMessage(m proto.Message) {
_, err := m.WriteTo(c.s)
require.NoError(c.t, err, "failed to send message to %s node at %q", c.impl.String(), c.s.RemoteAddr())
}

func (c *NetClient) Close() {
c.closed.Do(func() {
if c.closing.CompareAndSwap(false, true) {
c.t.Logf("Closing connection to %s node at %q", c.impl.String(), c.s.RemoteAddr().String())
}
err := c.s.Close()
require.NoError(c.t, err, "failed to close session to %s node at %q", c.impl.String(), c.s.RemoteAddr())
})
}

func (c *NetClient) reconnect() {
c.t.Logf("Reconnecting to %q", c.s.RemoteAddr().String())
conn, err := net.Dial("tcp", c.s.RemoteAddr().String())
require.NoError(c.t, err, "failed to dial TCP to %s node", c.impl.String())

s, err := c.n.NewSession(c.ctx, conn, c.c)
require.NoError(c.t, err, "failed to re-establish the session to %s node", c.impl.String())
c.s = s

c.SendHandshake()
}

type protocol struct {
t testing.TB
dropLock sync.Mutex
drop map[proto.PeerMessageID]struct{}
}

func newProtocol(t testing.TB, drop []proto.PeerMessageID) *protocol {
m := make(map[proto.PeerMessageID]struct{})
for _, id := range drop {
m[id] = struct{}{}
}
return &protocol{t: t, drop: m}
}

func (p *protocol) EmptyHandshake() networking.Handshake {
return &proto.Handshake{}
}

func (p *protocol) EmptyHeader() networking.Header {
return &proto.Header{}
}

func (p *protocol) Ping() ([]byte, error) {
msg := &proto.GetPeersMessage{}
return msg.MarshalBinary()
}

func (p *protocol) IsAcceptableHandshake(h networking.Handshake) bool {
hs, ok := h.(*proto.Handshake)
if !ok {
return false
}
// Reject nodes with incorrect network bytes, unsupported protocol versions,
// or a zero nonce (indicating a self-connection).
if hs.AppName != appName || hs.Version.Cmp(proto.ProtocolVersion()) < 0 || hs.NodeNonce == 0 {
p.t.Logf("Unacceptable handshake:")
if hs.AppName != appName {
p.t.Logf("\tinvalid application name %q, expected %q", hs.AppName, appName)
}
if hs.Version.Cmp(proto.ProtocolVersion()) < 0 {
p.t.Logf("\tinvalid application version %q should be equal or more than %q",
hs.Version, proto.ProtocolVersion())
}
if hs.NodeNonce == 0 {
p.t.Logf("\tinvalid node nonce %d", hs.NodeNonce)
}
return false
}
return true
}

func (p *protocol) IsAcceptableMessage(h networking.Header) bool {
hdr, ok := h.(*proto.Header)
if !ok {
return false
}
p.dropLock.Lock()
defer p.dropLock.Unlock()
_, ok = p.drop[hdr.ContentID]
return !ok
}

type handler struct {
peers []proto.PeerInfo
t testing.TB
client *NetClient
}

func newHandler(t testing.TB, peers []proto.PeerInfo) *handler {
return &handler{t: t, peers: peers}
}

func (h *handler) OnReceive(s *networking.Session, r io.Reader) {
data, err := io.ReadAll(r)
if err != nil {
h.t.Logf("Failed to read message from %q: %v", s.RemoteAddr(), err)
h.t.FailNow()
return
}
msg, err := proto.UnmarshalMessage(data)
if err != nil { // Fail test on unmarshal error.
h.t.Logf("Failed to unmarshal message from bytes: %q", base64.StdEncoding.EncodeToString(data))
h.t.FailNow()
return
}
switch msg.(type) { // Only reply with peers on GetPeersMessage.
case *proto.GetPeersMessage:
h.t.Logf("Received GetPeersMessage from %q", s.RemoteAddr())
rpl := &proto.PeersMessage{Peers: h.peers}
if _, sErr := rpl.WriteTo(s); sErr != nil {
h.t.Logf("Failed to send peers message: %v", sErr)
h.t.FailNow()
return
}
default:
}
}

func (h *handler) OnHandshake(_ *networking.Session, _ networking.Handshake) {
h.t.Logf("Connection to %s node at %q was established", h.client.impl.String(), h.client.s.RemoteAddr())
}

func (h *handler) OnClose(s *networking.Session) {
h.t.Logf("Connection to %q was closed", s.RemoteAddr())
if !h.client.closing.Load() && h.client != nil {
h.client.reconnect()
}
}
41 changes: 37 additions & 4 deletions itests/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/wavesplatform/gowaves/itests/config"
d "github.com/wavesplatform/gowaves/itests/docker"
"github.com/wavesplatform/gowaves/pkg/crypto"
"github.com/wavesplatform/gowaves/pkg/proto"
Expand All @@ -24,10 +26,19 @@ type NodesClients struct {
ScalaClient *NodeUniversalClient
}

func NewNodesClients(t *testing.T, goPorts, scalaPorts *d.PortConfig) *NodesClients {
func NewNodesClients(ctx context.Context, t *testing.T, goPorts, scalaPorts *d.PortConfig) *NodesClients {
sp, err := proto.NewPeerInfoFromString(config.DefaultIP + ":" + scalaPorts.BindPort)
require.NoError(t, err, "failed to create Scala peer info")
gp, err := proto.NewPeerInfoFromString(config.DefaultIP + ":" + goPorts.BindPort)
require.NoError(t, err, "failed to create Go peer info")
peers := []proto.PeerInfo{sp, gp}
return &NodesClients{
GoClient: NewNodeUniversalClient(t, NodeGo, goPorts.RESTAPIPort, goPorts.GRPCPort),
ScalaClient: NewNodeUniversalClient(t, NodeScala, scalaPorts.RESTAPIPort, scalaPorts.GRPCPort),
GoClient: NewNodeUniversalClient(
ctx, t, NodeGo, goPorts.RESTAPIPort, goPorts.GRPCPort, goPorts.BindPort, peers,
),
ScalaClient: NewNodeUniversalClient(
ctx, t, NodeScala, scalaPorts.RESTAPIPort, scalaPorts.GRPCPort, scalaPorts.BindPort, peers,
),
}
}

Expand Down Expand Up @@ -236,7 +247,6 @@ func (c *NodesClients) SynchronizedWavesBalances(
if err != nil {
t.Logf("Errors while requesting balances: %v", err)
}
t.Log("Entering loop")
for {
commonHeight := mostCommonHeight(sbs)
toRetry := make([]proto.WavesAddress, 0, len(addresses))
Expand Down Expand Up @@ -273,6 +283,29 @@ func (c *NodesClients) SynchronizedWavesBalances(
return r
}

func (c *NodesClients) Handshake() {
c.GoClient.Connection.SendHandshake()
c.ScalaClient.Connection.SendHandshake()
}

func (c *NodesClients) SendToNodes(t *testing.T, m proto.Message, scala bool) {
t.Logf("Sending message to Go node: %T", m)
c.GoClient.Connection.SendMessage(m)
t.Log("Message sent to Go node")
if scala {
t.Logf("Sending message to Scala node: %T", m)
c.ScalaClient.Connection.SendMessage(m)
t.Log("Message sent to Scala node")
}
}

func (c *NodesClients) Close(t *testing.T) {
c.GoClient.GRPCClient.Close(t)
c.GoClient.Connection.Close()
c.ScalaClient.GRPCClient.Close(t)
c.ScalaClient.Connection.Close()
}

func (c *NodesClients) requestNodesAvailableBalances(
ctx context.Context, address proto.WavesAddress,
) (addressedBalanceAtHeight, error) {
Expand Down
Loading

0 comments on commit 14ae6e5

Please sign in to comment.