Skip to content

Commit

Permalink
Itest light node state hash (#1512)
Browse files Browse the repository at this point in the history
* Test fixture that starts one Go node added.
Configuration option to disable mining on Go node added.
Configuration option to enable mining on Scala node converted to the option that disables mining on it.
Existing tests changed accordingly.

* Genesis and key-block snapshot hashes generation added.
Messages sequence implemented.
Score calculation added.

* Added config options to configure preactivated featuers and absence period.

* Fix genesis block snapshot hash calculation.
Added delay before sending key-block.

* Test refactoring.

* Linter issues fixed.

* Rename test suite.

* Better function naming.
Constant extracted.

* TaskGroup goroutine manager added in pkg/execution package.
Tests on TaskGroup added.

* Networking package with a new connection handler Session added.

* Logger interface removed from networking package. Standard slog package is used instead.

* WIP. Simple connection replaced with NetClient.
NetClient usage moved into Universal client.
Handshake proto updated to compatibility with Handshake interface from networking package.

* Fixed NetClient closing issue.
Configuration option to set KeepAliveInterval added to networking.Config.

* Redundant log removed.

* Move save int conversion to safecast lib.

* Accidentially added files removed.

* Fixed handshake issue for the single node test suite.

* Fix data race error in 'networking_test' package

Implement 'io.Stringer' for 'Session' struct.
Data race happens because 'clientHandler' mock in 'TestSessionTimeoutOnHandshake' test
reads 'Session' structure at the same time as 'clientSession.Close' call.

* Refactofing of test containers shutdown. Explicit test containers removal.
Quorum incrased to 2 for itest nodes because of constant test network connection.
Level of Session logging set to INFO for itests.

* Paralles start and shutdown of test nodes implemented.

* Add parallelizm to some Docker functions.

* Support native itest container building for Gowaves node.

* Fix linter issues.

* Replace atomic.Uint32 with atomic.Bool and use CompareAndSwap there it's possible.
Replace random delay with constan to make test not blink.
Simplify assertion in test to make it stable.

* Assertions added.
Style fixed.

* Simplified closing and close logic in NetClient.
Added logs on handshake rejection to clarify the reason of rejections.
Added and used function to configure Session with list of Slog attributes.

* Prepare for new timer in Go 1.23

Co-authored-by: Nikolay Eskov <[email protected]>

* Move constant into function were it used.
Proper error declaration.

* Better way to prevent from running multiple receiveLoops.
Shutdown lock replaced with sync.Once.

* Better data emptyness checks.

* Better read error handling.

Co-authored-by: Nikolay Eskov <[email protected]>

* Use constructor.

* Wrap heavy logging into log level checks.
Fix data lock and data access order.

* Session configuration accepts slog handler to set up logging.
Discarding slog handler implemented and used instead of setting default slog logger.
Checks on interval values added to Session constructor.

* Close error channel on sending data successfully.
Better error channel passing.
Reset receiving buffer by deffering.

* Better error handling while reading.

Co-authored-by: Nikolay Eskov <[email protected]>

* Fine error assertions.

* Fix blinking test.

* Better configuration handling.

Co-authored-by: Nikolay Eskov <[email protected]>

* Fixed blinking test TestCloseParentContext. Wait group added to wait for client to finish sending handshake.
Better wait groups naming.

* Better test workflow. Better wait group naming.

* Fix deadlock in test by introducing wait group instead of sleep.

* Function to disable IPv6 in itest docker containers added.
Healthcheck instruction added to itest docker container.

* Functions to subscribe and wait for network messages of specifed types added to itest network client.
SimpleSnapshot test reimplemented using assertions for expected messages instead of sleeping for some time.

* Better error messages.
Close of handler's channel added.
Unnecessary sleep removed.

* Internal sendPacket reimplemented using io.Reader. Data restoration function removed.
Handler's OnReceive use io.Reader to pass received data.
Tests updated. Mocks regenerated.

* Handler implementation updated.

* Itest network client handler updated.

* Fix multiple containers removal issue.

---------

Co-authored-by: Nikolay Eskov <[email protected]>
  • Loading branch information
alexeykiselev and nickeskov authored Dec 28, 2024
1 parent 14ae6e5 commit 4001464
Show file tree
Hide file tree
Showing 19 changed files with 840 additions and 238 deletions.
10 changes: 7 additions & 3 deletions Dockerfile.gowaves-it
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ COPY cmd cmd
COPY pkg pkg

ARG WITH_RACE_SUFFIX=""
RUN make build-node-linux-amd64${WITH_RACE_SUFFIX}
RUN make build-node-native${WITH_RACE_SUFFIX}

FROM alpine:3.21.0
ENV TZ=Etc/UTC \
APP_USER=gowaves

RUN apk add --no-cache bind-tools
RUN apk add --no-cache bind-tools curl

RUN addgroup -S $APP_USER \
&& adduser -S $APP_USER -G $APP_USER
Expand All @@ -34,7 +34,9 @@ ENV CONFIG_PATH=/home/gowaves/config/gowaves-it.json \

USER $APP_USER

COPY --from=builder /app/build/bin/linux-amd64/node /app/node
COPY --from=builder /app/build/bin/native/node /app/node

HEALTHCHECK CMD curl -f http://localhost:6869/node/status || exit 1

STOPSIGNAL SIGINT

Expand Down Expand Up @@ -64,3 +66,5 @@ CMD /app/node \
-microblock-interval 2s \
-blacklist-residence-time 0 \
-rate-limiter-opts="rps=100&burst=100" \
-min-peers-mining=2 \
-disable-miner=$DISABLE_MINER \
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ dist-blockcmp: release-blockcmp

build-node-native:
@go build -o build/bin/native/node -ldflags="-X 'github.com/wavesplatform/gowaves/pkg/versioning.Version=$(VERSION)'" ./cmd/node
build-node-native-with-race:
@go build -race -o build/bin/native/node -ldflags="-X 'github.com/wavesplatform/gowaves/pkg/versioning.Version=$(VERSION)'" ./cmd/node
build-node-linux-amd64:
@CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/bin/linux-amd64/node -ldflags="-X 'github.com/wavesplatform/gowaves/pkg/versioning.Version=$(VERSION)'" ./cmd/node
build-node-linux-amd64-with-race:
Expand Down
132 changes: 128 additions & 4 deletions itests/clients/net_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"log/slog"
"math/big"
"net"
"reflect"
"sync"
"sync/atomic"
"testing"
Expand All @@ -33,6 +37,7 @@ type NetClient struct {
impl Implementation
n *networking.Network
c *networking.Config
h *handler
s *networking.Session

closing atomic.Bool
Expand All @@ -45,7 +50,17 @@ func NewNetClient(
n := networking.NewNetwork()
p := newProtocol(t, nil)
h := newHandler(t, peers)
log := slogt.New(t)

f := slogt.Factory(func(w io.Writer) slog.Handler {
opts := &slog.HandlerOptions{
AddSource: true,
Level: slog.LevelInfo,
}
return slog.NewTextHandler(w, opts)
})
log := slogt.New(t, f)

slog.SetLogLoggerLevel(slog.LevelError)
conf := networking.NewConfig(p, h).
WithSlogHandler(log.Handler()).
WithWriteTimeout(networkTimeout).
Expand All @@ -58,7 +73,7 @@ func NewNetClient(
s, err := n.NewSession(ctx, conn, conf)
require.NoError(t, err, "failed to establish new session to %s node", impl.String())

cli := &NetClient{ctx: ctx, t: t, impl: impl, n: n, c: conf, s: s}
cli := &NetClient{ctx: ctx, t: t, impl: impl, n: n, c: conf, h: h, s: s}
h.client = cli // Set client reference in handler.
return cli
}
Expand Down Expand Up @@ -93,9 +108,85 @@ func (c *NetClient) Close() {
}
err := c.s.Close()
require.NoError(c.t, err, "failed to close session to %s node at %q", c.impl.String(), c.s.RemoteAddr())
c.h.close()
})
}

// SubscribeForMessages adds specified types to the message waiting queue.
// Once the awaited message received the corresponding type is removed from the queue.
func (c *NetClient) SubscribeForMessages(messageType ...reflect.Type) error {
for _, mt := range messageType {
if err := c.h.waitFor(mt); err != nil {
return err
}
}
return nil
}

// AwaitMessage waits for a message from the node for the specified timeout.
func (c *NetClient) AwaitMessage(messageType reflect.Type, timeout time.Duration) (proto.Message, error) {
select {
case <-c.ctx.Done():
return nil, c.ctx.Err()
case <-time.After(timeout):
return nil, fmt.Errorf("timeout waiting for message of type %q", messageType.String())
case msg := <-c.h.receiveChan():
if reflect.TypeOf(msg) != messageType {
return nil, fmt.Errorf("unexpected message type %q, expecting %q",
reflect.TypeOf(msg).String(), messageType.String())
}
return msg, nil
}
}

// AwaitGetBlockMessage waits for a GetBlockMessage from the node for the specified timeout and
// returns the requested block ID.
func (c *NetClient) AwaitGetBlockMessage(timeout time.Duration) (proto.BlockID, error) {
msg, err := c.AwaitMessage(reflect.TypeOf(&proto.GetBlockMessage{}), timeout)
if err != nil {
return proto.BlockID{}, err
}
getBlockMessage, ok := msg.(*proto.GetBlockMessage)
if !ok {
return proto.BlockID{}, fmt.Errorf("failed to cast message of type %q to GetBlockMessage",
reflect.TypeOf(msg).String())
}
return getBlockMessage.BlockID, nil
}

// AwaitScoreMessage waits for a ScoreMessage from the node for the specified timeout and returns the received score.
func (c *NetClient) AwaitScoreMessage(timeout time.Duration) (*big.Int, error) {
msg, err := c.AwaitMessage(reflect.TypeOf(&proto.ScoreMessage{}), timeout)
if err != nil {
return nil, err
}
scoreMessage, ok := msg.(*proto.ScoreMessage)
if !ok {
return nil, fmt.Errorf("failed to cast message of type %q to ScoreMessage", reflect.TypeOf(msg).String())
}
score := new(big.Int).SetBytes(scoreMessage.Score)
return score, nil
}

// AwaitMicroblockRequest waits for a MicroBlockRequestMessage from the node for the specified timeout and
// returns the received block ID.
func (c *NetClient) AwaitMicroblockRequest(timeout time.Duration) (proto.BlockID, error) {
msg, err := c.AwaitMessage(reflect.TypeOf(&proto.MicroBlockRequestMessage{}), timeout)
if err != nil {
return proto.BlockID{}, err
}
mbr, ok := msg.(*proto.MicroBlockRequestMessage)
if !ok {
return proto.BlockID{}, fmt.Errorf("failed to cast message of type %q to MicroBlockRequestMessage",
reflect.TypeOf(msg).String())
}
r, err := proto.NewBlockIDFromBytes(mbr.TotalBlockSig)
if err != nil {
return proto.BlockID{}, err
}
return r, nil
}

func (c *NetClient) reconnect() {
c.t.Logf("Reconnecting to %q", c.s.RemoteAddr().String())
conn, err := net.Dial("tcp", c.s.RemoteAddr().String())
Expand Down Expand Up @@ -174,10 +265,13 @@ type handler struct {
peers []proto.PeerInfo
t testing.TB
client *NetClient
queue []reflect.Type
ch chan proto.Message
}

func newHandler(t testing.TB, peers []proto.PeerInfo) *handler {
return &handler{t: t, peers: peers}
ch := make(chan proto.Message, 1)
return &handler{t: t, peers: peers, ch: ch}
}

func (h *handler) OnReceive(s *networking.Session, r io.Reader) {
Expand All @@ -195,14 +289,22 @@ func (h *handler) OnReceive(s *networking.Session, r io.Reader) {
}
switch msg.(type) { // Only reply with peers on GetPeersMessage.
case *proto.GetPeersMessage:
h.t.Logf("Received GetPeersMessage from %q", s.RemoteAddr())
rpl := &proto.PeersMessage{Peers: h.peers}
if _, sErr := rpl.WriteTo(s); sErr != nil {
h.t.Logf("Failed to send peers message: %v", sErr)
h.t.FailNow()
return
}
default:
if len(h.queue) == 0 { // No messages to wait for.
return
}
et := h.queue[0]
if reflect.TypeOf(msg) == et {
h.t.Logf("Received expected message of type %q", reflect.TypeOf(msg).String())
h.queue = h.queue[1:] // Pop the expected type.
h.ch <- msg
}
}
}

Expand All @@ -216,3 +318,25 @@ func (h *handler) OnClose(s *networking.Session) {
h.client.reconnect()
}
}

func (h *handler) waitFor(messageType reflect.Type) error {
if messageType == nil {
return errors.New("nil message type")
}
if messageType == reflect.TypeOf(proto.GetPeersMessage{}) {
return errors.New("cannot wait for GetPeersMessage")
}
h.queue = append(h.queue, messageType)
return nil
}

func (h *handler) receiveChan() <-chan proto.Message {
return h.ch
}

func (h *handler) close() {
if h.ch != nil {
close(h.ch)
h.ch = nil
}
}
10 changes: 5 additions & 5 deletions itests/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func NewNodesClients(ctx context.Context, t *testing.T, goPorts, scalaPorts *d.P
}

func (c *NodesClients) SendStartMessage(t *testing.T) {
c.GoClient.HTTPClient.PrintMsg(t, "------------- Start test: "+t.Name()+" -------------")
c.ScalaClient.HTTPClient.PrintMsg(t, "------------- Start test: "+t.Name()+" -------------")
c.GoClient.SendStartMessage(t)
c.ScalaClient.SendStartMessage(t)
}

func (c *NodesClients) SendEndMessage(t *testing.T) {
c.GoClient.HTTPClient.PrintMsg(t, "------------- End test: "+t.Name()+" -------------")
c.ScalaClient.HTTPClient.PrintMsg(t, "------------- End test: "+t.Name()+" -------------")
c.GoClient.SendEndMessage(t)
c.ScalaClient.SendEndMessage(t)
}

func (c *NodesClients) StateHashCmp(t *testing.T, height uint64) (*proto.StateHash, *proto.StateHash, bool) {
Expand Down Expand Up @@ -242,7 +242,7 @@ func (c *NodesClients) SynchronizedWavesBalances(
ctx, cancel := context.WithTimeout(context.Background(), synchronizedBalancesTimeout)
defer cancel()

t.Logf("Initial balacnces request")
t.Logf("Initial balances request")
sbs, err := c.requestAvailableBalancesForAddresses(ctx, addresses)
if err != nil {
t.Logf("Errors while requesting balances: %v", err)
Expand Down
17 changes: 17 additions & 0 deletions itests/clients/universal_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,20 @@ func NewNodeUniversalClient(
Connection: NewNetClient(ctx, t, impl, netPort, peers),
}
}

func (c *NodeUniversalClient) SendStartMessage(t *testing.T) {
c.HTTPClient.PrintMsg(t, "------------- Start test: "+t.Name()+" -------------")
}

func (c *NodeUniversalClient) SendEndMessage(t *testing.T) {
c.HTTPClient.PrintMsg(t, "------------- End test: "+t.Name()+" -------------")
}

func (c *NodeUniversalClient) Handshake() {
c.Connection.SendHandshake()
}

func (c *NodeUniversalClient) Close(t testing.TB) {
c.GRPCClient.Close(t)
c.Connection.Close()
}
88 changes: 15 additions & 73 deletions itests/config/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,19 @@ func (ra *RewardAddresses) AddressesAfter21() []proto.WavesAddress {
return []proto.WavesAddress{}
}

// BlockchainOption is a function type that allows to set additional parameters to BlockchainConfig.
type BlockchainOption func(*BlockchainConfig) error

// BlockchainConfig is a struct that contains settings for blockchain.
// This configuration is used both for building Scala and Go configuration files.
// Also, it's used to produce a Docker container run configurations for both nodes.
type BlockchainConfig struct {
accounts []AccountInfo
supported []int16
desiredReward uint64

Settings *settings.BlockchainSettings
Features []FeatureInfo
RewardAddresses RewardAddresses
EnableScalaMining bool
accounts []AccountInfo
supported []int16
desiredReward uint64
disableGoMining bool
disableScalaMining bool

Settings *settings.BlockchainSettings
Features []FeatureInfo
RewardAddresses RewardAddresses
}

func NewBlockchainConfig(options ...BlockchainOption) (*BlockchainConfig, error) {
Expand Down Expand Up @@ -174,71 +172,15 @@ func (c *BlockchainConfig) TestConfig() TestConfig {
}
}

// WithFeatureSettingFromFile is a BlockchainOption that allows to set feature settings from configuration file.
// Feature settings configuration file is a JSON file with the structure of `featureSettings`.
func WithFeatureSettingFromFile(path ...string) BlockchainOption {
return func(cfg *BlockchainConfig) error {
fs, err := NewFeatureSettingsFromFile(path...)
if err != nil {
return errors.Wrap(err, "failed to modify features settings")
}
cfg.supported = fs.SupportedFeatures
if ftErr := cfg.UpdatePreactivatedFeatures(fs.PreactivatedFeatures); ftErr != nil {
return errors.Wrap(ftErr, "failed to modify preactivated features")
}
return nil
}
}

// WithPaymentsSettingFromFile is a BlockchainOption that allows to set payment settings from configuration file.
// Payment settings configuration file is a JSON file with the structure of `paymentSettings`.
func WithPaymentsSettingFromFile(path ...string) BlockchainOption {
return func(cfg *BlockchainConfig) error {
fs, err := NewPaymentSettingsFromFile(path...)
if err != nil {
return errors.Wrap(err, "failed to modify payments settings")
}
cfg.Settings.PaymentsFixAfterHeight = fs.PaymentsFixAfterHeight
cfg.Settings.InternalInvokePaymentsValidationAfterHeight = fs.InternalInvokePaymentsValidationAfterHeight
cfg.Settings.InternalInvokeCorrectFailRejectBehaviourAfterHeight =
fs.InternalInvokeCorrectFailRejectBehaviourAfterHeight
cfg.Settings.InvokeNoZeroPaymentsAfterHeight = fs.InvokeNoZeroPaymentsAfterHeight
return nil
}
}

// WithRewardSettingFromFile is a BlockchainOption that allows to set reward settings from configuration file.
// Reward settings configuration file is a JSON file with the structure of `rewardSettings`.
func WithRewardSettingFromFile(path ...string) BlockchainOption {
return func(cfg *BlockchainConfig) error {
rs, err := NewRewardSettingsFromFile(path...)
if err != nil {
return errors.Wrap(err, "failed to modify reward settings")
}
cfg.Settings.InitialBlockReward = rs.InitialBlockReward
cfg.Settings.BlockRewardIncrement = rs.BlockRewardIncrement
cfg.Settings.BlockRewardVotingPeriod = rs.BlockRewardVotingPeriod
cfg.Settings.BlockRewardTermAfter20 = rs.BlockRewardTermAfter20
cfg.Settings.BlockRewardTerm = rs.BlockRewardTerm
cfg.Settings.MinXTNBuyBackPeriod = rs.MinXTNBuyBackPeriod

ras, err := NewRewardAddresses(rs.DaoAddress, rs.XtnBuybackAddress)
if err != nil {
return errors.Wrap(err, "failed to modify reward settings")
}
cfg.RewardAddresses = ras
cfg.Settings.RewardAddresses = ras.Addresses()
cfg.Settings.RewardAddressesAfter21 = ras.AddressesAfter21()
cfg.desiredReward = rs.DesiredBlockReward
return nil
}
func (c *BlockchainConfig) DisableGoMiningString() string {
return strconv.FormatBool(c.disableGoMining)
}

func WithScalaMining() BlockchainOption {
return func(cfg *BlockchainConfig) error {
cfg.EnableScalaMining = true
return nil
func (c *BlockchainConfig) EnableScalaMiningString() string {
if c.disableScalaMining {
return "no"
}
return "yes"
}

func safeNow() uint64 {
Expand Down
Loading

0 comments on commit 4001464

Please sign in to comment.