Skip to content

Commit

Permalink
Merge pull request #457 from bloxapp/stage
Browse files Browse the repository at this point in the history
Stage to Main (v0.1.7 preparation)
  • Loading branch information
nivBlox authored Nov 30, 2021
2 parents 95a305e + f8347e3 commit a4fa01c
Show file tree
Hide file tree
Showing 20 changed files with 446 additions and 107 deletions.
6 changes: 5 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ RUN apt-get update && \
RUN go version
RUN python --version

ARG APP_VERSION

WORKDIR /go/src/github.com/bloxapp/ssv/
COPY go.mod .
COPY go.sum .
Expand All @@ -21,11 +23,13 @@ RUN go mod download
#
FROM preparer AS builder

ARG APP_VERSION
# Copy files and install app
COPY . .

RUN go get -d -v ./...
RUN CGO_ENABLED=1 GOOS=linux go install -tags blst_enabled -ldflags "-X main.Version=`git describe --tags $(git rev-list --tags --max-count=1)` -linkmode external -extldflags \"-static -lm\"" ./cmd/ssvnode

RUN CGO_ENABLED=1 GOOS=linux go install -tags blst_enabled -ldflags "-X main.Version=`if [ ! -z "${APP_VERSION}" ]; then echo $APP_VERSION; else git describe --tags $(git rev-list --tags --max-count=1); fi` -linkmode external -extldflags \"-static -lm\"" ./cmd/ssvnode

#
# STEP 3: Prepare image to run the binary
Expand Down
2 changes: 1 addition & 1 deletion docs/OPERATOR_GETTING_STARTED.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ OperatorPrivateKey: LS0tLS...
Run the docker image in the same folder you created the `config.yaml`:
```shell
$ docker run -d --restart unless-stopped --name=ssv_node -e CONFIG_PATH=./config.yaml -p 13000:13000 -p 12000:12000/udp -v $(pwd)/config.yaml:/config.yaml -v $(pwd):/data -it 'bloxstaking/ssv-node:latest' make BUILD_PATH=/go/bin/ssvnode start-node \
$ docker run -d --restart unless-stopped --name=ssv_node -e CONFIG_PATH=./config.yaml -p 13000:13000 -p 12000:12000/udp -v $(pwd)/config.yaml:/config.yaml -v $(pwd):/data --log-opt max-size=500m --log-opt max-file=10 -it 'bloxstaking/ssv-node:latest' make BUILD_PATH=/go/bin/ssvnode start-node \
&& docker logs ssv_node --follow
```

Expand Down
2 changes: 1 addition & 1 deletion docs/resources/cov-badge.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
19 changes: 12 additions & 7 deletions exporter/api/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,26 @@ import (
type Broadcaster interface {
FromFeed(feed *event.Feed) error
Broadcast(msg Message) error
Register(conn Conn) bool
Deregister(conn Conn) bool
Register(conn broadcasted) bool
Deregister(conn broadcasted) bool
}

type broadcasted interface {
ID() string
Send([]byte)
}

type broadcaster struct {
logger *zap.Logger
mut sync.Mutex
connections map[string]Conn
connections map[string]broadcasted
}

func newBroadcaster(logger *zap.Logger) Broadcaster {
return &broadcaster{
logger: logger.With(zap.String("component", "exporter/api/broadcaster")),
mut: sync.Mutex{},
connections: map[string]Conn{},
connections: map[string]broadcasted{},
}
}

Expand Down Expand Up @@ -66,7 +71,7 @@ func (b *broadcaster) Broadcast(msg Message) error {
b.mut.Lock()
b.logger.Debug("broadcasting message", zap.Int("total connections", len(b.connections)),
zap.Any("msg", msg))
var conns []Conn
var conns []broadcasted
for _, c := range b.connections {
conns = append(conns, c)
}
Expand All @@ -80,7 +85,7 @@ func (b *broadcaster) Broadcast(msg Message) error {
}

// Register registers a connection for broadcasting
func (b *broadcaster) Register(conn Conn) bool {
func (b *broadcaster) Register(conn broadcasted) bool {
b.mut.Lock()
defer b.mut.Unlock()

Expand All @@ -93,7 +98,7 @@ func (b *broadcaster) Register(conn Conn) bool {
}

// Deregister de-registers a connection for broadcasting
func (b *broadcaster) Deregister(conn Conn) bool {
func (b *broadcaster) Deregister(conn broadcasted) bool {
b.mut.Lock()
defer b.mut.Unlock()

Expand Down
74 changes: 74 additions & 0 deletions exporter/api/broadcaster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package api

import (
"fmt"
"github.com/prysmaticlabs/prysm/async/event"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"sync"
"testing"
"time"
)

func TestBroadcaster(t *testing.T) {
logger := zaptest.NewLogger(t)
b := newBroadcaster(logger)

feed := new(event.Feed)
go func() {
require.NoError(t, b.FromFeed(feed))
}()
bm1 := newBroadcastedMock("1")
bm2 := newBroadcastedMock("2")

require.True(t, b.Register(bm1))
defer b.Deregister(bm1)

require.True(t, b.Register(bm2))

// wait so setup will be finished
<-time.After(10 * time.Millisecond)
go feed.Send(Message{Type: TypeValidator, Filter: MessageFilter{From: 0, To: 0}})
<-time.After(5 * time.Millisecond)
go b.Deregister(bm2)
<-time.After(5 * time.Millisecond)
go feed.Send(Message{Type: TypeValidator, Filter: MessageFilter{From: 0, To: 0}})

// wait so messages will propagate
<-time.After(100 * time.Millisecond)
require.Equal(t, bm1.Size(), 2)
// the second broadcasted was deregistered after the first message
require.Equal(t, bm2.Size(), 1)
}

type broadcastedMock struct {
mut sync.Mutex
msgs [][]byte
id string
}

func newBroadcastedMock(id string) *broadcastedMock {
return &broadcastedMock{
mut: sync.Mutex{},
msgs: [][]byte{},
id: id,
}
}

func (b *broadcastedMock) ID() string {
return b.id
}

func (b *broadcastedMock) Send(msg []byte) {
b.mut.Lock()
defer b.mut.Unlock()
fmt.Println("sent")
b.msgs = append(b.msgs, msg)
}

func (b *broadcastedMock) Size() int {
b.mut.Lock()
defer b.mut.Unlock()

return len(b.msgs)
}
58 changes: 42 additions & 16 deletions ibft/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"github.com/bloxapp/ssv/ibft"
contollerforks "github.com/bloxapp/ssv/ibft/controller/forks"
"github.com/bloxapp/ssv/utils/threadsafe"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
Expand All @@ -16,6 +17,11 @@ import (
"github.com/bloxapp/ssv/validator/storage"
)

var (
// ErrAlreadyRunning is used to express that some process is already running, e.g. sync
ErrAlreadyRunning = errors.New("already running")
)

// Controller implements Controller interface
type Controller struct {
currentInstance ibft.Instance
Expand All @@ -30,7 +36,8 @@ type Controller struct {
signer beacon.Signer

// flags
initFinished bool
initHandlers *threadsafe.SafeBool
initSynced *threadsafe.SafeBool

// locks
currentInstanceLock sync.Locker
Expand Down Expand Up @@ -62,7 +69,8 @@ func New(
signer: signer,

// flags
initFinished: false,
initHandlers: threadsafe.NewSafeBool(),
initSynced: threadsafe.NewSafeBool(),

// locks
currentInstanceLock: &sync.Mutex{},
Expand All @@ -75,25 +83,43 @@ func New(
}

// Init sets all major processes of iBFT while blocking until completed.
// if init fails to sync
func (i *Controller) Init() error {
i.logger.Info("iBFT implementation init started")
ReportIBFTStatus(i.ValidatorShare.PublicKey.SerializeToHexStr(), false, false)
i.processDecidedQueueMessages()
i.processSyncQueueMessages()
i.listenToSyncMessages()
i.listenToNetworkMessages()
i.listenToNetworkDecidedMessages()
i.waitForMinPeerOnInit(1) // minimum of 2 validators (me + 1)
if err := i.SyncIBFT(); err != nil {
ReportIBFTStatus(i.ValidatorShare.PublicKey.SerializeToHexStr(), false, true)
return errors.Wrap(err, "could not sync history, stopping Controller init")
if !i.initHandlers.Get() {
i.logger.Info("iBFT implementation init started")
ReportIBFTStatus(i.ValidatorShare.PublicKey.SerializeToHexStr(), false, false)
i.processDecidedQueueMessages()
i.processSyncQueueMessages()
i.listenToSyncMessages()
i.listenToNetworkMessages()
i.listenToNetworkDecidedMessages()
i.initHandlers.Set(true)
i.logger.Debug("managed to setup iBFT handlers")
}

if !i.initSynced.Get() {
// IBFT sync to make sure the operator is aligned for this validator
if err := i.SyncIBFT(); err != nil {
if err == ErrAlreadyRunning {
// don't fail if init is already running
i.logger.Debug("iBFT init is already running (syncing history)")
return nil
}
i.logger.Warn("iBFT implementation init failed to sync history", zap.Error(err))
ReportIBFTStatus(i.ValidatorShare.PublicKey.SerializeToHexStr(), false, true)
return errors.Wrap(err, "could not sync history")
}
ReportIBFTStatus(i.ValidatorShare.PublicKey.SerializeToHexStr(), true, false)
i.logger.Info("iBFT implementation init finished")
}
i.initFinished = true
ReportIBFTStatus(i.ValidatorShare.PublicKey.SerializeToHexStr(), true, false)
i.logger.Info("iBFT implementation init finished")

return nil
}

func (i *Controller) initialized() bool {
return i.initHandlers.Get() && i.initSynced.Get()
}

// StartInstance - starts an ibft instance or returns error
func (i *Controller) StartInstance(opts ibft.ControllerStartInstanceOptions) (res *ibft.InstanceResult, err error) {
instanceOpts, err := i.instanceOptionsFromStartOptions(opts)
Expand Down
4 changes: 4 additions & 0 deletions ibft/controller/controller_decided.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ func (i *Controller) decidedForCurrentInstance(msg *proto.SignedMessage) bool {
// - highest known seq lower than msg seq
// - AND msg is not for current instance
func (i *Controller) decidedRequiresSync(msg *proto.SignedMessage) (bool, error) {
// if IBFT sync failed to init, trigger it again
if !i.initSynced.Get() {
return true, nil
}
if i.decidedForCurrentInstance(msg) {
return false, nil
}
Expand Down
29 changes: 26 additions & 3 deletions ibft/controller/controller_decided_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestDecidedRequiresSync(t *testing.T) {
msg *proto.SignedMessage
expectedRes bool
expectedErr string
initSynced bool
}{
{
"decided from future, requires sync.",
Expand All @@ -106,6 +107,7 @@ func TestDecidedRequiresSync(t *testing.T) {
}),
true,
"",
true,
},
{
"decided from future, requires sync. current is nil",
Expand All @@ -120,6 +122,22 @@ func TestDecidedRequiresSync(t *testing.T) {
}),
true,
"",
true,
},
{
"decided when init failed to sync",
nil,
SignMsg(t, 1, secretKeys[1], &proto.Message{
Type: proto.RoundState_Commit,
SeqNumber: 1,
}),
SignMsg(t, 1, secretKeys[1], &proto.Message{
Type: proto.RoundState_Commit,
SeqNumber: 1,
}),
true,
"",
false,
},
{
"decided from far future, requires sync.",
Expand All @@ -137,6 +155,7 @@ func TestDecidedRequiresSync(t *testing.T) {
}),
true,
"",
true,
},
{
"decided from past, doesn't requires sync.",
Expand All @@ -154,6 +173,7 @@ func TestDecidedRequiresSync(t *testing.T) {
}),
false,
"",
true,
},
{
"decided for current",
Expand All @@ -171,6 +191,7 @@ func TestDecidedRequiresSync(t *testing.T) {
}),
false,
"",
true,
},
{
"decided for seq 0",
Expand All @@ -185,16 +206,19 @@ func TestDecidedRequiresSync(t *testing.T) {
}),
false,
"",
true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ibft := Controller{
ctrl := Controller{
currentInstance: test.currentInstance,
ibftStorage: newTestStorage(test.highestDecided),
initSynced: threadsafe.NewSafeBool(),
}
res, err := ibft.decidedRequiresSync(test.msg)
ctrl.initSynced.Set(test.initSynced)
res, err := ctrl.decidedRequiresSync(test.msg)
require.EqualValues(t, test.expectedRes, res)
if len(test.expectedErr) > 0 {
require.EqualError(t, err, test.expectedErr)
Expand Down Expand Up @@ -275,7 +299,6 @@ func TestForceDecided(t *testing.T) {
identifier := []byte("lambda_11")
s1 := populatedStorage(t, sks, 3)
i1 := populatedIbft(1, identifier, network, s1, sks, nodes, newTestSigner())

// test before sync
highest, found, err := i1.(*Controller).ibftStorage.GetHighestDecidedInstance(identifier)
require.True(t, found)
Expand Down
Loading

0 comments on commit a4fa01c

Please sign in to comment.