Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New network connection #1547

Merged
merged 43 commits into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
470cae5
TaskGroup goroutine manager added in pkg/execution package.
alexeykiselev Nov 14, 2024
561302a
Networking package with a new connection handler Session added.
alexeykiselev Nov 15, 2024
3d5e202
Logger interface removed from networking package. Standard slog packa…
alexeykiselev Nov 18, 2024
60d0178
WIP. Simple connection replaced with NetClient.
alexeykiselev Nov 25, 2024
eb14f66
Merge branch 'master' into new-network-connection
alexeykiselev Nov 25, 2024
948cc56
Fixed NetClient closing issue.
alexeykiselev Nov 25, 2024
67f4b85
Redundant log removed.
alexeykiselev Nov 25, 2024
d44fa7f
Move save int conversion to safecast lib.
alexeykiselev Nov 25, 2024
65414fa
Merge branch 'master' into new-network-connection
alexeykiselev Nov 27, 2024
7bab250
Merge branch 'master' into new-network-connection
nickeskov Nov 28, 2024
df01e60
Fix data race error in 'networking_test' package
nickeskov Nov 28, 2024
9705342
Merge branch 'master' into new-network-connection
nickeskov Dec 3, 2024
70a8c34
Merge branch 'master' into new-network-connection
alexeykiselev Dec 6, 2024
00a9ebe
Replace atomic.Uint32 with atomic.Bool and use CompareAndSwap there i…
alexeykiselev Dec 10, 2024
63a0305
Assertions added.
alexeykiselev Dec 10, 2024
5219227
Simplified closing and close logic in NetClient.
alexeykiselev Dec 11, 2024
ff41cf7
Prepare for new timer in Go 1.23
alexeykiselev Dec 11, 2024
e2f697f
Move constant into function were it used.
alexeykiselev Dec 12, 2024
edd942a
Merge branch 'master' into new-network-connection
alexeykiselev Dec 12, 2024
f832683
Better way to prevent from running multiple receiveLoops.
alexeykiselev Dec 13, 2024
c2ad101
Better data emptyness checks.
alexeykiselev Dec 13, 2024
3aa8a85
Better read error handling.
alexeykiselev Dec 13, 2024
c08bace
Use constructor.
alexeykiselev Dec 13, 2024
abced7f
Wrap heavy logging into log level checks.
alexeykiselev Dec 14, 2024
64bf7d9
Merge branch 'master' into new-network-connection
alexeykiselev Dec 14, 2024
294cf66
Merge branch 'master' into new-network-connection
alexeykiselev Dec 14, 2024
57b9ffb
Session configuration accepts slog handler to set up logging.
alexeykiselev Dec 16, 2024
14420bc
Close error channel on sending data successfully.
alexeykiselev Dec 16, 2024
412377f
Better error handling while reading.
alexeykiselev Dec 16, 2024
52a893e
Fine error assertions.
alexeykiselev Dec 16, 2024
77633f3
Fix blinking test.
alexeykiselev Dec 16, 2024
8747fc4
Merge branch 'master' into new-network-connection
alexeykiselev Dec 18, 2024
df77a57
Better configuration handling.
alexeykiselev Dec 19, 2024
dc78383
Merge branch 'master' into new-network-connection
alexeykiselev Dec 19, 2024
7b3fffb
Fixed blinking test TestCloseParentContext. Wait group added to wait …
alexeykiselev Dec 19, 2024
d2e2646
Better test workflow. Better wait group naming.
alexeykiselev Dec 19, 2024
3d8f38d
Merge branch 'master' into new-network-connection
alexeykiselev Dec 20, 2024
9599840
Fix deadlock in test by introducing wait group instead of sleep.
alexeykiselev Dec 20, 2024
b5d5173
Merge branch 'master' into new-network-connection
alexeykiselev Dec 24, 2024
16a32cb
Merge branch 'master' into new-network-connection
alexeykiselev Dec 26, 2024
63ade4e
Internal sendPacket reimplemented using io.Reader. Data restoration f…
alexeykiselev Dec 27, 2024
78579ca
Itest network client handler updated.
alexeykiselev Dec 27, 2024
de29ff8
Changed the way OnReceive passes the receiveBuffer. Test updated.
alexeykiselev Dec 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
quiet: False
with-expecter: True
dir: "{{.InterfaceDir}}/mocks"
mockname: "Mock{{.InterfaceName}}"
filename: "{{.InterfaceName | snakecase}}.go"

packages:
github.com/wavesplatform/gowaves/pkg/networking:
interfaces:
Header:
Protocol:
Handler:
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
filippo.io/edwards25519 v1.1.0
github.com/beevik/ntp v1.4.3
github.com/btcsuite/btcd/btcec/v2 v2.3.4
github.com/ccoveille/go-safecast v1.2.0
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/consensys/gnark v0.11.0
Expand All @@ -22,6 +23,7 @@ require (
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
github.com/jinzhu/copier v0.4.0
github.com/mr-tron/base58 v1.2.0
github.com/neilotoole/slogt v1.1.0
github.com/ory/dockertest/v3 v3.11.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
Expand All @@ -42,6 +44,7 @@ require (
github.com/valyala/bytebufferpool v1.0.0
github.com/xenolf/lego v2.7.2+incompatible
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.31.0
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
Expand Down Expand Up @@ -98,6 +101,7 @@ require (
github.com/rs/zerolog v1.33.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tidwall/gjson v1.14.2 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurT
github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 h1:59Kx4K6lzOW5w6nFlA0v5+lk/6sjybR934QNHSJZPTQ=
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/ccoveille/go-safecast v1.2.0 h1:H4X7aosepsU1Mfk+098CTdKpsDH0cfYJ2RmwXFjgvfc=
github.com/ccoveille/go-safecast v1.2.0/go.mod h1:QqwNjxQ7DAqY0C721OIO9InMk9zCwcsO7tnRuHytad8=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -199,6 +201,8 @@ github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjW
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/neilotoole/slogt v1.1.0 h1:c7qE92sq+V0yvCuaxph+RQ2jOKL61c4hqS1Bv9W7FZE=
github.com/neilotoole/slogt v1.1.0/go.mod h1:RCrGXkPc/hYybNulqQrMHRtvlQ7F6NktNVLuLwk6V+w=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down Expand Up @@ -282,6 +286,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down
5 changes: 5 additions & 0 deletions itests/clients/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func (c *GRPCClient) GetAssetsInfo(t *testing.T, id []byte) *g.AssetInfoResponse
return assetInfo
}

func (c *GRPCClient) Close(t testing.TB) {
err := c.conn.Close()
assert.NoError(t, err, "failed to close GRPC connection to %s node", c.impl.String())
}

func (c *GRPCClient) getBalance(t *testing.T, req *g.BalancesRequest) *g.BalanceResponse {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
Expand Down
211 changes: 211 additions & 0 deletions itests/clients/net_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package clients

import (
"bytes"
"context"
"encoding/base64"
"log/slog"
"net"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/neilotoole/slogt"
"github.com/stretchr/testify/require"

"github.com/wavesplatform/gowaves/itests/config"
"github.com/wavesplatform/gowaves/pkg/networking"
"github.com/wavesplatform/gowaves/pkg/proto"
)

const (
appName = "wavesL"
nonce = uint64(0)
networkTimeout = 3 * time.Second
pingInterval = 5 * time.Second
)

type NetClient struct {
ctx context.Context
t testing.TB
impl Implementation
n *networking.Network
c *networking.Config
s *networking.Session

closing atomic.Bool
closed sync.Once
}

func NewNetClient(
ctx context.Context, t testing.TB, impl Implementation, port string, peers []proto.PeerInfo,
) *NetClient {
n := networking.NewNetwork()
p := newProtocol(t, nil)
h := newHandler(t, peers)
log := slogt.New(t)
conf := networking.NewConfig(p, h).
WithSlogHandler(log.Handler()).
WithWriteTimeout(networkTimeout).
WithKeepAliveInterval(pingInterval).
WithSlogAttributes(slog.String("suite", t.Name()), slog.String("impl", impl.String()))

conn, err := net.Dial("tcp", config.DefaultIP+":"+port)
require.NoError(t, err, "failed to dial TCP to %s node", impl.String())

s, err := n.NewSession(ctx, conn, conf)
require.NoError(t, err, "failed to establish new session to %s node", impl.String())

cli := &NetClient{ctx: ctx, t: t, impl: impl, n: n, c: conf, s: s}
h.client = cli // Set client reference in handler.
return cli
}

func (c *NetClient) SendHandshake() {
handshake := &proto.Handshake{
AppName: appName,
Version: proto.ProtocolVersion(),
NodeName: "itest",
NodeNonce: nonce,
DeclaredAddr: proto.HandshakeTCPAddr{},
Timestamp: proto.NewTimestampFromTime(time.Now()),
}
buf := bytes.NewBuffer(nil)
_, err := handshake.WriteTo(buf)
require.NoError(c.t, err,
"failed to marshal handshake to %s node at %q", c.impl.String(), c.s.RemoteAddr())
_, err = c.s.Write(buf.Bytes())
require.NoError(c.t, err,
"failed to send handshake to %s node at %q", c.impl.String(), c.s.RemoteAddr())
}

func (c *NetClient) SendMessage(m proto.Message) {
_, err := m.WriteTo(c.s)
require.NoError(c.t, err, "failed to send message to %s node at %q", c.impl.String(), c.s.RemoteAddr())
}

func (c *NetClient) Close() {
c.closed.Do(func() {
if c.closing.CompareAndSwap(false, true) {
c.t.Logf("Closing connection to %s node at %q", c.impl.String(), c.s.RemoteAddr().String())
}
err := c.s.Close()
require.NoError(c.t, err, "failed to close session to %s node at %q", c.impl.String(), c.s.RemoteAddr())
})
}

func (c *NetClient) reconnect() {
c.t.Logf("Reconnecting to %q", c.s.RemoteAddr().String())
conn, err := net.Dial("tcp", c.s.RemoteAddr().String())
require.NoError(c.t, err, "failed to dial TCP to %s node", c.impl.String())

s, err := c.n.NewSession(c.ctx, conn, c.c)
require.NoError(c.t, err, "failed to re-establish the session to %s node", c.impl.String())
c.s = s
nickeskov marked this conversation as resolved.
Show resolved Hide resolved

c.SendHandshake()
}

type protocol struct {
t testing.TB
dropLock sync.Mutex
drop map[proto.PeerMessageID]struct{}
}

func newProtocol(t testing.TB, drop []proto.PeerMessageID) *protocol {
m := make(map[proto.PeerMessageID]struct{})
for _, id := range drop {
m[id] = struct{}{}
}
return &protocol{t: t, drop: m}
}

func (p *protocol) EmptyHandshake() networking.Handshake {
return &proto.Handshake{}
}

func (p *protocol) EmptyHeader() networking.Header {
return &proto.Header{}
}

func (p *protocol) Ping() ([]byte, error) {
msg := &proto.GetPeersMessage{}
return msg.MarshalBinary()
}

func (p *protocol) IsAcceptableHandshake(h networking.Handshake) bool {
hs, ok := h.(*proto.Handshake)
if !ok {
return false
}
// Reject nodes with incorrect network bytes, unsupported protocol versions,
// or a zero nonce (indicating a self-connection).
if hs.AppName != appName || hs.Version.Cmp(proto.ProtocolVersion()) < 0 || hs.NodeNonce == 0 {
p.t.Logf("Unacceptable handshake:")
if hs.AppName != appName {
p.t.Logf("\tinvalid application name %q, expected %q", hs.AppName, appName)
}
if hs.Version.Cmp(proto.ProtocolVersion()) < 0 {
p.t.Logf("\tinvalid application version %q should be equal or more than %q",
hs.Version, proto.ProtocolVersion())
}
if hs.NodeNonce == 0 {
p.t.Logf("\tinvalid node nonce %d", hs.NodeNonce)
}
return false
}
return true
}

func (p *protocol) IsAcceptableMessage(h networking.Header) bool {
hdr, ok := h.(*proto.Header)
if !ok {
return false
}
p.dropLock.Lock()
defer p.dropLock.Unlock()
_, ok = p.drop[hdr.ContentID]
return !ok
}

type handler struct {
peers []proto.PeerInfo
t testing.TB
client *NetClient
}

func newHandler(t testing.TB, peers []proto.PeerInfo) *handler {
return &handler{t: t, peers: peers}
}

func (h *handler) OnReceive(s *networking.Session, data []byte) {
msg, err := proto.UnmarshalMessage(data)
if err != nil { // Fail test on unmarshal error.
h.t.Logf("Failed to unmarshal message from bytes: %q", base64.StdEncoding.EncodeToString(data))
h.t.FailNow()
return
}
switch msg.(type) { // Only reply with peers on GetPeersMessage.
case *proto.GetPeersMessage:
h.t.Logf("Received GetPeersMessage from %q", s.RemoteAddr())
rpl := &proto.PeersMessage{Peers: h.peers}
if _, sErr := rpl.WriteTo(s); sErr != nil {
h.t.Logf("Failed to send peers message: %v", sErr)
h.t.FailNow()
return
}
default:
}
nickeskov marked this conversation as resolved.
Show resolved Hide resolved
}

func (h *handler) OnHandshake(_ *networking.Session, _ networking.Handshake) {
h.t.Logf("Connection to %s node at %q was established", h.client.impl.String(), h.client.s.RemoteAddr())
}

func (h *handler) OnClose(s *networking.Session) {
h.t.Logf("Connection to %q was closed", s.RemoteAddr())
if !h.client.closing.Load() && h.client != nil {
h.client.reconnect()
}
}
41 changes: 37 additions & 4 deletions itests/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/wavesplatform/gowaves/itests/config"
d "github.com/wavesplatform/gowaves/itests/docker"
"github.com/wavesplatform/gowaves/pkg/crypto"
"github.com/wavesplatform/gowaves/pkg/proto"
Expand All @@ -24,10 +26,19 @@ type NodesClients struct {
ScalaClient *NodeUniversalClient
}

func NewNodesClients(t *testing.T, goPorts, scalaPorts *d.PortConfig) *NodesClients {
func NewNodesClients(ctx context.Context, t *testing.T, goPorts, scalaPorts *d.PortConfig) *NodesClients {
sp, err := proto.NewPeerInfoFromString(config.DefaultIP + ":" + scalaPorts.BindPort)
require.NoError(t, err, "failed to create Scala peer info")
gp, err := proto.NewPeerInfoFromString(config.DefaultIP + ":" + goPorts.BindPort)
require.NoError(t, err, "failed to create Go peer info")
peers := []proto.PeerInfo{sp, gp}
return &NodesClients{
GoClient: NewNodeUniversalClient(t, NodeGo, goPorts.RESTAPIPort, goPorts.GRPCPort),
ScalaClient: NewNodeUniversalClient(t, NodeScala, scalaPorts.RESTAPIPort, scalaPorts.GRPCPort),
GoClient: NewNodeUniversalClient(
ctx, t, NodeGo, goPorts.RESTAPIPort, goPorts.GRPCPort, goPorts.BindPort, peers,
),
ScalaClient: NewNodeUniversalClient(
ctx, t, NodeScala, scalaPorts.RESTAPIPort, scalaPorts.GRPCPort, scalaPorts.BindPort, peers,
),
}
}

Expand Down Expand Up @@ -236,7 +247,6 @@ func (c *NodesClients) SynchronizedWavesBalances(
if err != nil {
t.Logf("Errors while requesting balances: %v", err)
}
t.Log("Entering loop")
for {
commonHeight := mostCommonHeight(sbs)
toRetry := make([]proto.WavesAddress, 0, len(addresses))
Expand Down Expand Up @@ -273,6 +283,29 @@ func (c *NodesClients) SynchronizedWavesBalances(
return r
}

func (c *NodesClients) Handshake() {
c.GoClient.Connection.SendHandshake()
c.ScalaClient.Connection.SendHandshake()
}

func (c *NodesClients) SendToNodes(t *testing.T, m proto.Message, scala bool) {
t.Logf("Sending message to Go node: %T", m)
c.GoClient.Connection.SendMessage(m)
t.Log("Message sent to Go node")
if scala {
t.Logf("Sending message to Scala node: %T", m)
c.ScalaClient.Connection.SendMessage(m)
t.Log("Message sent to Scala node")
}
}

func (c *NodesClients) Close(t *testing.T) {
c.GoClient.GRPCClient.Close(t)
c.GoClient.Connection.Close()
c.ScalaClient.GRPCClient.Close(t)
c.ScalaClient.Connection.Close()
}

func (c *NodesClients) requestNodesAvailableBalances(
ctx context.Context, address proto.WavesAddress,
) (addressedBalanceAtHeight, error) {
Expand Down
9 changes: 8 additions & 1 deletion itests/clients/universal_client.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package clients

import (
"context"
"testing"

"github.com/wavesplatform/gowaves/pkg/proto"
)

type NodeUniversalClient struct {
Implementation Implementation
HTTPClient *HTTPClient
GRPCClient *GRPCClient
Connection *NetClient
}

func NewNodeUniversalClient(t *testing.T, impl Implementation, httpPort string, grpcPort string) *NodeUniversalClient {
func NewNodeUniversalClient(
ctx context.Context, t *testing.T, impl Implementation, httpPort, grpcPort, netPort string, peers []proto.PeerInfo,
) *NodeUniversalClient {
return &NodeUniversalClient{
Implementation: impl,
HTTPClient: NewHTTPClient(t, impl, httpPort),
GRPCClient: NewGRPCClient(t, impl, grpcPort),
Connection: NewNetClient(ctx, t, impl, netPort, peers),
}
}
Loading
Loading