Skip to content

Commit

Permalink
change logic
Browse files Browse the repository at this point in the history
  • Loading branch information
rkapka committed Jan 11, 2024
1 parent 3a7efcd commit 4c29055
Show file tree
Hide file tree
Showing 18 changed files with 156 additions and 67 deletions.
14 changes: 14 additions & 0 deletions testing/validator-mock/node_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion testing/validator-mock/validator_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions validator/accounts/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,11 @@ func (m *Validator) SetProposerSettings(_ context.Context, settings *validatorse
func (_ *Validator) StartEventStream(_ context.Context) error {
panic("implement me")
}

func (_ *Validator) EventStreamIsRunning() bool {
panic("implement me")
}

func (_ *Validator) NodeIsHealthy(ctx context.Context) bool {
panic("implement me")
}
4 changes: 4 additions & 0 deletions validator/client/beacon-api/beacon_api_node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func (c *beaconApiNodeClient) ListPeers(ctx context.Context, in *empty.Empty) (*
panic("beaconApiNodeClient.ListPeers is not implemented. To use a fallback client, pass a fallback client as the last argument of NewBeaconApiNodeClientWithFallback.")
}

func (c *beaconApiNodeClient) IsHealthy(ctx context.Context) bool {
return c.jsonRestHandler.Get(ctx, "/eth/v1/node/health", nil) == nil
}

func NewNodeClientWithFallback(jsonRestHandler JsonRestHandler, fallbackClient iface.NodeClient) iface.NodeClient {
return &beaconApiNodeClient{
jsonRestHandler: jsonRestHandler,
Expand Down
16 changes: 5 additions & 11 deletions validator/client/beacon-api/beacon_api_validator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,12 @@ func WithEventHandler(h *EventHandler) ValidatorClientOpt {
}
}

func WithEventErrorChannel(ch chan error) ValidatorClientOpt {
return func(c *beaconApiValidatorClient) {
c.eventErrCh = ch
}
}

type beaconApiValidatorClient struct {
genesisProvider GenesisProvider
dutiesProvider dutiesProvider
stateValidatorsProvider StateValidatorsProvider
jsonRestHandler JsonRestHandler
eventHandler *EventHandler
eventErrCh chan error
beaconBlockConverter BeaconBlockConverter
prysmBeaconChainCLient iface.PrysmBeaconChainClient
}
Expand Down Expand Up @@ -172,12 +165,13 @@ func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *emp

func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context) error {
if c.eventHandler != nil {
if c.eventErrCh == nil {
return errors.New("event handler cannot be initialized without an event error channel")
}
if err := c.eventHandler.get(ctx, []string{"head"}, c.eventErrCh); err != nil {
if err := c.eventHandler.get(ctx, []string{"head"}); err != nil {
return errors.Wrapf(err, "event handler stopped working")
}
}
return nil
}

func (c *beaconApiValidatorClient) EventStreamIsRunning() bool {
return c.eventHandler.running
}
43 changes: 23 additions & 20 deletions validator/client/beacon-api/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const eventByteLimit = 512
type EventHandler struct {
httpClient *http.Client
host string
running bool
subs []eventSub
sync.Mutex
}
Expand All @@ -38,6 +39,7 @@ func NewEventHandler(httpClient *http.Client, host string) *EventHandler {
return &EventHandler{
httpClient: httpClient,
host: host,
running: false,
subs: make([]eventSub, 0),
}
}
Expand All @@ -48,28 +50,29 @@ func (h *EventHandler) subscribe(sub eventSub) {
h.Unlock()
}

func (h *EventHandler) get(ctx context.Context, topics []string, eventErrCh chan<- error) error {
func (h *EventHandler) get(ctx context.Context, topics []string) error {
if len(topics) == 0 {
return errors.New("no topics provided")
}

allTopics := strings.Join(topics, ",")
log.Info("Starting listening to Beacon API events on topics " + allTopics)
url := h.host + "/eth/v1/events?topics=" + allTopics
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return errors.Wrap(err, "failed to create HTTP request")
}

req.Header.Set("Accept", api.EventStreamMediaType)
req.Header.Set("Connection", "keep-alive")

resp, err := h.httpClient.Do(req)
if err != nil {
return errors.Wrap(err, "failed to perform HTTP request")
}

go func() {
h.running = true
defer func() { h.running = false }()

allTopics := strings.Join(topics, ",")
log.Info("Starting listening to Beacon API events on topics " + allTopics)
url := h.host + "/eth/v1/events?topics=" + allTopics
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
log.WithError(err).Error("Failed to create HTTP request")
}
req.Header.Set("Accept", api.EventStreamMediaType)
req.Header.Set("Connection", "keep-alive")
resp, err := h.httpClient.Do(req)
if err != nil {
log.WithError(err).Error("Failed to perform HTTP request")
}

defer func() {
if closeErr := resp.Body.Close(); closeErr != nil {
log.WithError(closeErr).Error("Failed to close events response body")
Expand All @@ -81,18 +84,18 @@ func (h *EventHandler) get(ctx context.Context, topics []string, eventErrCh chan
eof := false
for {
if ctx.Err() != nil {
eventErrCh <- ctx.Err()
log.WithError(ctx.Err()).Error("Stopping listening to Beacon API events")
return
}

rawData := make([]byte, eventByteLimit)
_, err = resp.Body.Read(rawData)
if err != nil {
if strings.Contains(err.Error(), "EOF") {
log.Error("Received EOF while reading events response body")
log.Error("Received EOF while reading events response body. Stopping listening to Beacon API events")
eof = true
} else {
eventErrCh <- err
log.WithError(err).Error("Stopping listening to Beacon API events")
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion validator/client/beacon-api/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestEventHandler(t *testing.T) {
handler.subscribe(sub2)
handler.subscribe(sub3)

require.NoError(t, handler.get(context.Background(), []string{"head"}, make(chan error)))
require.NoError(t, handler.get(context.Background(), []string{"head"}))
// make sure the goroutine inside handler.get is invoked
time.Sleep(500 * time.Millisecond)

Expand Down
11 changes: 5 additions & 6 deletions validator/client/beacon-api/json_rest_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"io"
"net/http"
"strings"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/api"
Expand All @@ -25,10 +26,6 @@ type BeaconApiJsonRestHandler struct {
// Get sends a GET request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c BeaconApiJsonRestHandler) Get(ctx context.Context, endpoint string, resp interface{}) error {
if resp == nil {
return errors.New("resp is nil")
}

url := c.Host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
Expand Down Expand Up @@ -92,14 +89,16 @@ func decodeResp(httpResp *http.Response, resp interface{}) error {
}

if httpResp.Header.Get("Content-Type") != api.JsonMediaType {
if httpResp.StatusCode == http.StatusOK {
// 2XX codes are a success
if strings.HasPrefix(httpResp.Status, "2") {
return nil
}
return &httputil.DefaultJsonError{Code: httpResp.StatusCode, Message: string(body)}
}

decoder := json.NewDecoder(bytes.NewBuffer(body))
if httpResp.StatusCode != http.StatusOK {
// non-2XX codes are a failure
if !strings.HasPrefix(httpResp.Status, "2") {
errorJson := &httputil.DefaultJsonError{}
if err = decoder.Decode(errorJson); err != nil {
return errors.Wrapf(err, "failed to decode response body into error json for %s", httpResp.Request.URL)
Expand Down
5 changes: 5 additions & 0 deletions validator/client/grpc-api/grpc_node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func (c *grpcNodeClient) ListPeers(ctx context.Context, in *empty.Empty) (*ethpb
return c.nodeClient.ListPeers(ctx, in)
}

// IsHealthy returns a dummy value for gRPC
func (c *grpcNodeClient) IsHealthy(context.Context) bool {
return true
}

func NewNodeClient(cc grpc.ClientConnInterface) iface.NodeClient {
return &grpcNodeClient{ethpb.NewNodeClient(cc)}
}
5 changes: 5 additions & 0 deletions validator/client/grpc-api/grpc_validator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,8 @@ func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient {
func (c *grpcValidatorClient) StartEventStream(context.Context) error {
return nil
}

// EventStreamIsRunning returns a dummy value for gRPC
func (c *grpcValidatorClient) EventStreamIsRunning() bool {
return true
}
1 change: 1 addition & 0 deletions validator/client/iface/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ type NodeClient interface {
GetGenesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error)
GetVersion(ctx context.Context, in *empty.Empty) (*ethpb.Version, error)
ListPeers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error)
IsHealthy(ctx context.Context) bool
}
2 changes: 2 additions & 0 deletions validator/client/iface/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type Validator interface {
ProposerSettings() *validatorserviceconfig.ProposerSettings
SetProposerSettings(context.Context, *validatorserviceconfig.ProposerSettings) error
StartEventStream(ctx context.Context) error
EventStreamIsRunning() bool
NodeIsHealthy(ctx context.Context) bool
}

// SigningFunc interface defines a type for the a function that signs a message
Expand Down
1 change: 1 addition & 0 deletions validator/client/iface/validator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ type ValidatorClient interface {
StreamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest) (ethpb.BeaconNodeValidator_StreamSlotsClient, error)
SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error)
StartEventStream(ctx context.Context) error
EventStreamIsRunning() bool
}
28 changes: 28 additions & 0 deletions validator/client/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/config/features"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
Expand Down Expand Up @@ -189,10 +190,15 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
if err != nil {
log.WithError(err).Fatal("Could not wait for validator activation")
}

if err = v.StartEventStream(ctx); err != nil {
log.WithError(err).Fatal("Could not start API event stream")
}

if features.Get().EnableBeaconRESTApi {
runHealthCheckRoutine(ctx, v)
}

headSlot, err = v.CanonicalHeadSlot(ctx)
if isConnectionError(err) {
log.WithError(err).Warn("Could not get current canonical head slot")
Expand Down Expand Up @@ -275,3 +281,25 @@ func handleAssignmentError(err error, slot primitives.Slot) {
log.WithField("error", err).Error("Failed to update assignments")
}
}

func runHealthCheckRoutine(ctx context.Context, v iface.Validator) {
healthCheckTicker := time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)
go func() {
for {
select {
case <-healthCheckTicker.C:
if v.NodeIsHealthy(ctx) && !v.EventStreamIsRunning() {
if err := v.StartEventStream(ctx); err != nil {
log.WithError(err).Error("Could not start API event stream")
}
}
case <-ctx.Done():
if ctx.Err() != nil {
log.WithError(ctx.Err()).Error("Context cancelled")
}
log.Error("Context cancelled")
return
}
}
}()
}
16 changes: 5 additions & 11 deletions validator/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,22 +195,17 @@ func (v *ValidatorService) Start() {
HttpClient: http.Client{Timeout: v.conn.GetBeaconApiTimeout()},
Host: v.conn.GetBeaconApiUrl(),
}

evHandler := beaconApi.NewEventHandler(http.DefaultClient, v.conn.GetBeaconApiUrl())
evErrCh := make(chan error)
opts := []beaconApi.ValidatorClientOpt{beaconApi.WithEventHandler(evHandler), beaconApi.WithEventErrorChannel(evErrCh)}
opts := []beaconApi.ValidatorClientOpt{beaconApi.WithEventHandler(evHandler)}
validatorClient := validatorClientFactory.NewValidatorClient(v.conn, restHandler, opts...)
go func() {
e := <-evErrCh
log.WithError(e).Error("Event streaming failed")
v.cancel()
}()

valStruct := &validator{
db: v.db,
validatorClient: validatorClient,
beaconClient: beaconChainClientFactory.NewBeaconChainClient(v.conn, restHandler),
node: nodeClientFactory.NewNodeClient(v.conn, restHandler),
nodeClient: nodeClientFactory.NewNodeClient(v.conn, restHandler),
prysmBeaconClient: beaconChainClientFactory.NewPrysmBeaconClient(v.conn, restHandler),
db: v.db,
graffiti: v.graffiti,
logValidatorBalances: v.logValidatorBalances,
emitAccountMetrics: v.emitAccountMetrics,
Expand All @@ -234,7 +229,6 @@ func (v *ValidatorService) Start() {
Web3SignerConfig: v.Web3SignerConfig,
proposerSettings: v.proposerSettings,
walletInitializedChannel: make(chan *wallet.Wallet, 1),
prysmBeaconClient: beaconChainClientFactory.NewPrysmBeaconClient(v.conn, restHandler),
validatorsRegBatchSize: v.validatorsRegBatchSize,
}

Expand Down
8 changes: 8 additions & 0 deletions validator/client/testutil/mock_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,11 @@ func (fv *FakeValidator) SetProposerSettings(_ context.Context, settings *valida
func (fv *FakeValidator) StartEventStream(_ context.Context) error {
return nil
}

func (fv *FakeValidator) EventStreamIsRunning() bool {
return true
}

func (fv *FakeValidator) NodeIsHealthy(context.Context) bool {
return true
}
Loading

0 comments on commit 4c29055

Please sign in to comment.