diff --git a/api/headers.go b/api/headers.go index 5973b814087c..b3f7a29ab4be 100644 --- a/api/headers.go +++ b/api/headers.go @@ -7,4 +7,6 @@ const ( ConsensusBlockValueHeader = "Eth-Consensus-Block-Value" JsonMediaType = "application/json" OctetStreamMediaType = "application/octet-stream" + EventStreamMediaType = "text/event-stream" + KeepAlive = "keep-alive" ) diff --git a/beacon-chain/gateway/BUILD.bazel b/beacon-chain/gateway/BUILD.bazel index 5ea81d383e80..f0bb36f59bd7 100644 --- a/beacon-chain/gateway/BUILD.bazel +++ b/beacon-chain/gateway/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/gateway", visibility = ["//beacon-chain:__subpackages__"], deps = [ + "//api:go_default_library", "//api/gateway:go_default_library", "//cmd/beacon-chain/flags:go_default_library", "//proto/prysm/v1alpha1:go_default_library", diff --git a/beacon-chain/gateway/helpers.go b/beacon-chain/gateway/helpers.go index b51777b03c59..d8abfe1dfdef 100644 --- a/beacon-chain/gateway/helpers.go +++ b/beacon-chain/gateway/helpers.go @@ -2,6 +2,7 @@ package gateway import ( gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/prysmaticlabs/prysm/v4/api" "github.com/prysmaticlabs/prysm/v4/api/gateway" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" ethpbalpha "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -40,7 +41,7 @@ func DefaultConfig(enableDebugRPCEndpoints bool, httpModules string) MuxConfig { }, }), gwruntime.WithMarshalerOption( - "text/event-stream", &gwruntime.EventSourceJSONPb{}, + api.EventStreamMediaType, &gwruntime.EventSourceJSONPb{}, ), ) v1AlphaPbHandler = &gateway.PbMux{ diff --git a/beacon-chain/rpc/eth/events/BUILD.bazel b/beacon-chain/rpc/eth/events/BUILD.bazel index 449cce79d4e3..378fd828d59b 100644 --- a/beacon-chain/rpc/eth/events/BUILD.bazel +++ b/beacon-chain/rpc/eth/events/BUILD.bazel @@ -8,8 +8,9 @@ go_library( "structs.go", ], importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events", - visibility = ["//beacon-chain:__subpackages__"], + visibility = ["//visibility:public"], deps = [ + "//api:go_default_library", "//beacon-chain/blockchain:go_default_library", "//beacon-chain/core/feed:go_default_library", "//beacon-chain/core/feed/operation:go_default_library", @@ -18,6 +19,7 @@ go_library( "//beacon-chain/core/time:go_default_library", "//beacon-chain/core/transition:go_default_library", "//beacon-chain/rpc/eth/shared:go_default_library", + "//config/params:go_default_library", "//network/httputil:go_default_library", "//proto/eth/v1:go_default_library", "//proto/eth/v2:go_default_library", diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index dc5c3c98a97f..e922e90600e6 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -5,11 +5,10 @@ import ( "encoding/json" "fmt" "net/http" + time2 "time" "github.com/ethereum/go-ethereum/common/hexutil" - log "github.com/sirupsen/logrus" - "go.opencensus.io/trace" - + "github.com/prysmaticlabs/prysm/v4/api" "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation" @@ -18,11 +17,14 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared" + "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/network/httputil" ethpb "github.com/prysmaticlabs/prysm/v4/proto/eth/v1" ethpbv2 "github.com/prysmaticlabs/prysm/v4/proto/eth/v2" "github.com/prysmaticlabs/prysm/v4/runtime/version" "github.com/prysmaticlabs/prysm/v4/time/slots" + log "github.com/sirupsen/logrus" + "go.opencensus.io/trace" ) const ( @@ -114,16 +116,24 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) { defer stateSub.Unsubscribe() // Set up SSE response headers - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Connection", "keep-alive") + w.Header().Set("Content-Type", api.EventStreamMediaType) + w.Header().Set("Connection", api.KeepAlive) // Handle each event received and context cancellation. + // We send a keepalive dummy message immediately to prevent clients + // stalling while waiting for the first response chunk. + // After that we send a keepalive dummy message every SECONDS_PER_SLOT + // to prevent anyone (e.g. proxy servers) from closing connections. + sendKeepalive(w, flusher) + keepaliveTicker := time2.NewTicker(time2.Duration(params.BeaconConfig().SecondsPerSlot) * time2.Second) for { select { case event := <-opsChan: handleBlockOperationEvents(w, flusher, topicsMap, event) case event := <-stateChan: s.handleStateEvents(ctx, w, flusher, topicsMap, event) + case <-keepaliveTicker.C: + sendKeepalive(w, flusher) case <-ctx.Done(): return } @@ -505,6 +515,10 @@ func send(w http.ResponseWriter, flusher http.Flusher, name string, data interfa write(w, flusher, "event: %s\ndata: %s\n\n", name, string(j)) } +func sendKeepalive(w http.ResponseWriter, flusher http.Flusher) { + write(w, flusher, ":\n\n") +} + func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) { _, err := fmt.Fprintf(w, format, a...) if err != nil { diff --git a/beacon-chain/rpc/eth/events/events_test.go b/beacon-chain/rpc/eth/events/events_test.go index 1acb9244560e..236c3da2570d 100644 --- a/beacon-chain/rpc/eth/events/events_test.go +++ b/beacon-chain/rpc/eth/events/events_test.go @@ -375,7 +375,9 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { }) } -const operationsResult = `event: attestation +const operationsResult = `: + +event: attestation data: {"aggregation_bits":"0x00","data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} event: attestation @@ -401,7 +403,9 @@ data: {"signed_header_1":{"message":{"slot":"0","proposer_index":"0","parent_roo ` -const stateResult = `event: head +const stateResult = `: + +event: head data: {"slot":"0","block":"0x0000000000000000000000000000000000000000000000000000000000000000","state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch_transition":true,"execution_optimistic":false,"previous_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","current_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000"} event: finalized_checkpoint @@ -415,17 +419,23 @@ data: {"slot":"0","block":"0xeade62f0457b2fdf48e7d3fc4b60736688286be7c7a3ac4c9a1 ` -const payloadAttributesBellatrixResult = `event: payload_attributes +const payloadAttributesBellatrixResult = `: + +event: payload_attributes data: {"version":"bellatrix","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000"}}} ` -const payloadAttributesCapellaResult = `event: payload_attributes +const payloadAttributesCapellaResult = `: + +event: payload_attributes data: {"version":"capella","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[]}}} ` -const payloadAttributesDenebResult = `event: payload_attributes +const payloadAttributesDenebResult = `: + +event: payload_attributes data: {"version":"deneb","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[],"parent_beacon_block_root":"0xbef96cb938fd48b2403d3e662664325abb0102ed12737cbb80d717520e50cf4a"}}} ` diff --git a/testing/validator-mock/node_client_mock.go b/testing/validator-mock/node_client_mock.go index 04dc946f05e0..be28b56c38a8 100644 --- a/testing/validator-mock/node_client_mock.go +++ b/testing/validator-mock/node_client_mock.go @@ -81,6 +81,20 @@ func (mr *MockNodeClientMockRecorder) GetVersion(arg0, arg1 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVersion", reflect.TypeOf((*MockNodeClient)(nil).GetVersion), arg0, arg1) } +// IsHealthy mocks base method. +func (m *MockNodeClient) IsHealthy(arg0 context.Context) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsHealthy", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsHealthy indicates an expected call of IsHealthy. +func (mr *MockNodeClientMockRecorder) IsHealthy(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsHealthy", reflect.TypeOf((*MockNodeClient)(nil).IsHealthy), arg0) +} + // ListPeers mocks base method. func (m *MockNodeClient) ListPeers(arg0 context.Context, arg1 *emptypb.Empty) (*eth.Peers, error) { m.ctrl.T.Helper() diff --git a/testing/validator-mock/validator_client_mock.go b/testing/validator-mock/validator_client_mock.go index 071b039d110a..d71baea2b21b 100644 --- a/testing/validator-mock/validator_client_mock.go +++ b/testing/validator-mock/validator_client_mock.go @@ -67,6 +67,20 @@ func (mr *MockValidatorClientMockRecorder) DomainData(arg0, arg1 interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DomainData", reflect.TypeOf((*MockValidatorClient)(nil).DomainData), arg0, arg1) } +// EventStreamIsRunning mocks base method. +func (m *MockValidatorClient) EventStreamIsRunning() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EventStreamIsRunning") + ret0, _ := ret[0].(bool) + return ret0 +} + +// EventStreamIsRunning indicates an expected call of EventStreamIsRunning. +func (mr *MockValidatorClientMockRecorder) EventStreamIsRunning() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventStreamIsRunning", reflect.TypeOf((*MockValidatorClient)(nil).EventStreamIsRunning)) +} + // GetAttestationData mocks base method. func (m *MockValidatorClient) GetAttestationData(arg0 context.Context, arg1 *eth.AttestationDataRequest) (*eth.AttestationData, error) { m.ctrl.T.Helper() @@ -247,6 +261,20 @@ func (mr *MockValidatorClientMockRecorder) ProposeExit(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProposeExit", reflect.TypeOf((*MockValidatorClient)(nil).ProposeExit), arg0, arg1) } +// StartEventStream mocks base method. +func (m *MockValidatorClient) StartEventStream(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StartEventStream", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// StartEventStream indicates an expected call of StartEventStream. +func (mr *MockValidatorClientMockRecorder) StartEventStream(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartEventStream", reflect.TypeOf((*MockValidatorClient)(nil).StartEventStream), arg0) +} + // StreamSlots mocks base method. func (m *MockValidatorClient) StreamSlots(arg0 context.Context, arg1 *eth.StreamSlotsRequest) (eth.BeaconNodeValidator_StreamSlotsClient, error) { m.ctrl.T.Helper() diff --git a/validator/accounts/testing/mock.go b/validator/accounts/testing/mock.go index f43ed5130e89..3a92d08c1f01 100644 --- a/validator/accounts/testing/mock.go +++ b/validator/accounts/testing/mock.go @@ -212,3 +212,15 @@ func (m *Validator) SetProposerSettings(_ context.Context, settings *validatorse m.proposerSettings = settings return nil } + +func (_ *Validator) StartEventStream(_ context.Context) error { + panic("implement me") +} + +func (_ *Validator) EventStreamIsRunning() bool { + panic("implement me") +} + +func (_ *Validator) NodeIsHealthy(ctx context.Context) bool { + panic("implement me") +} diff --git a/validator/client/beacon-api/BUILD.bazel b/validator/client/beacon-api/BUILD.bazel index 07746d098eff..fad6d0ea3bb0 100644 --- a/validator/client/beacon-api/BUILD.bazel +++ b/validator/client/beacon-api/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "domain_data.go", "doppelganger.go", "duties.go", + "event_handler.go", "genesis.go", "get_beacon_block.go", "index.go", @@ -43,6 +44,7 @@ go_library( "//beacon-chain/core/signing:go_default_library", "//beacon-chain/rpc/eth/beacon:go_default_library", "//beacon-chain/rpc/eth/config:go_default_library", + "//beacon-chain/rpc/eth/events:go_default_library", "//beacon-chain/rpc/eth/node:go_default_library", "//beacon-chain/rpc/eth/shared:go_default_library", "//beacon-chain/rpc/eth/validator:go_default_library", @@ -83,6 +85,7 @@ go_test( "domain_data_test.go", "doppelganger_test.go", "duties_test.go", + "event_handler_test.go", "genesis_test.go", "get_beacon_block_test.go", "index_test.go", @@ -139,6 +142,7 @@ go_test( "@com_github_golang_mock//gomock:go_default_library", "@com_github_golang_protobuf//ptypes/empty", "@com_github_pkg_errors//:go_default_library", + "@com_github_sirupsen_logrus//hooks/test:go_default_library", "@org_golang_google_protobuf//types/known/emptypb:go_default_library", "@org_golang_google_protobuf//types/known/timestamppb:go_default_library", ], diff --git a/validator/client/beacon-api/beacon_api_node_client.go b/validator/client/beacon-api/beacon_api_node_client.go index d66f26ca19db..230848791f71 100644 --- a/validator/client/beacon-api/beacon_api_node_client.go +++ b/validator/client/beacon-api/beacon_api_node_client.go @@ -98,6 +98,10 @@ func (c *beaconApiNodeClient) ListPeers(ctx context.Context, in *empty.Empty) (* panic("beaconApiNodeClient.ListPeers is not implemented. To use a fallback client, pass a fallback client as the last argument of NewBeaconApiNodeClientWithFallback.") } +func (c *beaconApiNodeClient) IsHealthy(ctx context.Context) bool { + return c.jsonRestHandler.Get(ctx, "/eth/v1/node/health", nil) == nil +} + func NewNodeClientWithFallback(jsonRestHandler JsonRestHandler, fallbackClient iface.NodeClient) iface.NodeClient { return &beaconApiNodeClient{ jsonRestHandler: jsonRestHandler, diff --git a/validator/client/beacon-api/beacon_api_validator_client.go b/validator/client/beacon-api/beacon_api_validator_client.go index fd9aaa13bd6f..df7fc6a06333 100644 --- a/validator/client/beacon-api/beacon_api_validator_client.go +++ b/validator/client/beacon-api/beacon_api_validator_client.go @@ -13,17 +13,26 @@ import ( "github.com/prysmaticlabs/prysm/v4/validator/client/iface" ) +type ValidatorClientOpt func(*beaconApiValidatorClient) + +func WithEventHandler(h *EventHandler) ValidatorClientOpt { + return func(c *beaconApiValidatorClient) { + c.eventHandler = h + } +} + type beaconApiValidatorClient struct { genesisProvider GenesisProvider dutiesProvider dutiesProvider stateValidatorsProvider StateValidatorsProvider jsonRestHandler JsonRestHandler + eventHandler *EventHandler beaconBlockConverter BeaconBlockConverter prysmBeaconChainCLient iface.PrysmBeaconChainClient } -func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler) iface.ValidatorClient { - return &beaconApiValidatorClient{ +func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler, opts ...ValidatorClientOpt) iface.ValidatorClient { + c := &beaconApiValidatorClient{ genesisProvider: beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler}, dutiesProvider: beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler}, stateValidatorsProvider: beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler}, @@ -34,6 +43,10 @@ func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler) iface.Validato jsonRestHandler: jsonRestHandler, }, } + for _, o := range opts { + o(c) + } + return c } func (c *beaconApiValidatorClient) GetDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) { @@ -149,3 +162,16 @@ func (c *beaconApiValidatorClient) WaitForActivation(ctx context.Context, in *et func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *empty.Empty) (*ethpb.ChainStartResponse, error) { return c.waitForChainStart(ctx) } + +func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context) error { + if c.eventHandler != nil { + if err := c.eventHandler.get(ctx, []string{"head"}); err != nil { + return errors.Wrapf(err, "could not invoke event handler") + } + } + return nil +} + +func (c *beaconApiValidatorClient) EventStreamIsRunning() bool { + return c.eventHandler.running +} diff --git a/validator/client/beacon-api/event_handler.go b/validator/client/beacon-api/event_handler.go new file mode 100644 index 000000000000..dcb9bd800166 --- /dev/null +++ b/validator/client/beacon-api/event_handler.go @@ -0,0 +1,134 @@ +package beacon_api + +import ( + "context" + "net/http" + "strings" + "sync" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/api" +) + +// Currently set to the first power of 2 bigger than the size of the `head` event +// which is 446 bytes +const eventByteLimit = 512 + +// EventHandler is responsible for subscribing to the Beacon API events endpoint +// and dispatching received events to subscribers. +type EventHandler struct { + httpClient *http.Client + host string + running bool + subs []eventSub + sync.Mutex +} + +type eventSub struct { + name string + ch chan<- event +} + +type event struct { + eventType string + data string +} + +// NewEventHandler returns a new handler. +func NewEventHandler(httpClient *http.Client, host string) *EventHandler { + return &EventHandler{ + httpClient: httpClient, + host: host, + running: false, + subs: make([]eventSub, 0), + } +} + +func (h *EventHandler) subscribe(sub eventSub) { + h.Lock() + h.subs = append(h.subs, sub) + h.Unlock() +} + +func (h *EventHandler) get(ctx context.Context, topics []string) error { + if len(topics) == 0 { + return errors.New("no topics provided") + } + if h.running { + log.Warn("Event listener is already running, ignoring function call") + } + + go func() { + h.running = true + defer func() { h.running = false }() + + allTopics := strings.Join(topics, ",") + log.Info("Starting listening to Beacon API events on topics: " + allTopics) + url := h.host + "/eth/v1/events?topics=" + allTopics + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + log.WithError(err).Error("Failed to create HTTP request") + return + } + req.Header.Set("Accept", api.EventStreamMediaType) + req.Header.Set("Connection", api.KeepAlive) + resp, err := h.httpClient.Do(req) + if err != nil { + log.WithError(err).Error("Failed to perform HTTP request") + return + } + + defer func() { + if closeErr := resp.Body.Close(); closeErr != nil { + log.WithError(closeErr).Error("Failed to close events response body") + } + }() + + // We signal an EOF error in a special way. When we get this error while reading the response body, + // there might still be an event received in the body that we should handle. + eof := false + for { + if ctx.Err() != nil { + log.WithError(ctx.Err()).Error("Stopping listening to Beacon API events") + return + } + + rawData := make([]byte, eventByteLimit) + _, err = resp.Body.Read(rawData) + if err != nil { + if strings.Contains(err.Error(), "EOF") { + log.Error("Received EOF while reading events response body. Stopping listening to Beacon API events") + eof = true + } else { + log.WithError(err).Error("Stopping listening to Beacon API events") + return + } + } + + e := strings.Split(string(rawData), "\n") + // We expect that the event format will contain event type and data separated with a newline + if len(e) < 2 { + // We reached EOF and there is no event to send + if eof { + return + } + continue + } + + for _, sub := range h.subs { + select { + case sub.ch <- event{eventType: e[0], data: e[1]}: + // Event sent successfully. + default: + log.Warn("Subscriber '" + sub.name + "' not ready to receive events") + } + } + // We reached EOF and sent the last event + if eof { + return + } + } + }() + + return nil +} diff --git a/validator/client/beacon-api/event_handler_test.go b/validator/client/beacon-api/event_handler_test.go new file mode 100644 index 000000000000..5447b33f4ae9 --- /dev/null +++ b/validator/client/beacon-api/event_handler_test.go @@ -0,0 +1,55 @@ +package beacon_api + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/prysmaticlabs/prysm/v4/testing/assert" + "github.com/prysmaticlabs/prysm/v4/testing/require" + logtest "github.com/sirupsen/logrus/hooks/test" +) + +func TestEventHandler(t *testing.T) { + logHook := logtest.NewGlobal() + + mux := http.NewServeMux() + mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + require.Equal(t, true, ok) + _, err := fmt.Fprint(w, "head\ndata\n\n") + require.NoError(t, err) + flusher.Flush() + }) + server := httptest.NewServer(mux) + defer server.Close() + + handler := NewEventHandler(http.DefaultClient, server.URL) + ch1 := make(chan event, 1) + sub1 := eventSub{ch: ch1} + ch2 := make(chan event, 1) + sub2 := eventSub{ch: ch2} + ch3 := make(chan event, 1) + sub3 := eventSub{name: "sub3", ch: ch3} + // fill up the channel so that it can't receive more events + ch3 <- event{} + handler.subscribe(sub1) + handler.subscribe(sub2) + handler.subscribe(sub3) + + require.NoError(t, handler.get(context.Background(), []string{"head"})) + // make sure the goroutine inside handler.get is invoked + time.Sleep(500 * time.Millisecond) + + e := <-ch1 + assert.Equal(t, "head", e.eventType) + assert.Equal(t, "data", e.data) + e = <-ch2 + assert.Equal(t, "head", e.eventType) + assert.Equal(t, "data", e.data) + + assert.LogsContain(t, logHook, "Subscriber 'sub3' not ready to receive events") +} diff --git a/validator/client/beacon-api/json_rest_handler.go b/validator/client/beacon-api/json_rest_handler.go index 9271a84826df..e5962bf82036 100644 --- a/validator/client/beacon-api/json_rest_handler.go +++ b/validator/client/beacon-api/json_rest_handler.go @@ -6,6 +6,7 @@ import ( "encoding/json" "io" "net/http" + "strings" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v4/api" @@ -25,10 +26,6 @@ type BeaconApiJsonRestHandler struct { // Get sends a GET request and decodes the response body as a JSON object into the passed in object. // If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value. func (c BeaconApiJsonRestHandler) Get(ctx context.Context, endpoint string, resp interface{}) error { - if resp == nil { - return errors.New("resp is nil") - } - url := c.Host + endpoint req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { @@ -92,14 +89,16 @@ func decodeResp(httpResp *http.Response, resp interface{}) error { } if httpResp.Header.Get("Content-Type") != api.JsonMediaType { - if httpResp.StatusCode == http.StatusOK { + // 2XX codes are a success + if strings.HasPrefix(httpResp.Status, "2") { return nil } return &httputil.DefaultJsonError{Code: httpResp.StatusCode, Message: string(body)} } decoder := json.NewDecoder(bytes.NewBuffer(body)) - if httpResp.StatusCode != http.StatusOK { + // non-2XX codes are a failure + if !strings.HasPrefix(httpResp.Status, "2") { errorJson := &httputil.DefaultJsonError{} if err = decoder.Decode(errorJson); err != nil { return errors.Wrapf(err, "failed to decode response body into error json for %s", httpResp.Request.URL) diff --git a/validator/client/beacon-api/json_rest_handler_test.go b/validator/client/beacon-api/json_rest_handler_test.go index 03ba8d9e3088..a94d48189048 100644 --- a/validator/client/beacon-api/json_rest_handler_test.go +++ b/validator/client/beacon-api/json_rest_handler_test.go @@ -103,17 +103,29 @@ func Test_decodeResp(t *testing.T) { t.Run("200 non-JSON", func(t *testing.T) { body := bytes.Buffer{} r := &http.Response{ + Status: "200", StatusCode: http.StatusOK, Body: io.NopCloser(&body), Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}}, } require.NoError(t, decodeResp(r, nil)) }) - t.Run("non-200 non-JSON", func(t *testing.T) { + t.Run("204 non-JSON", func(t *testing.T) { + body := bytes.Buffer{} + r := &http.Response{ + Status: "204", + StatusCode: http.StatusNoContent, + Body: io.NopCloser(&body), + Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}}, + } + require.NoError(t, decodeResp(r, nil)) + }) + t.Run("500 non-JSON", func(t *testing.T) { body := bytes.Buffer{} _, err := body.WriteString("foo") require.NoError(t, err) r := &http.Response{ + Status: "500", StatusCode: http.StatusInternalServerError, Body: io.NopCloser(&body), Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}}, @@ -130,6 +142,7 @@ func Test_decodeResp(t *testing.T) { require.NoError(t, err) body.Write(b) r := &http.Response{ + Status: "200", StatusCode: http.StatusOK, Body: io.NopCloser(&body), Header: map[string][]string{"Content-Type": {api.JsonMediaType}}, @@ -141,18 +154,30 @@ func Test_decodeResp(t *testing.T) { t.Run("200 JSON without resp", func(t *testing.T) { body := bytes.Buffer{} r := &http.Response{ + Status: "200", StatusCode: http.StatusOK, Body: io.NopCloser(&body), Header: map[string][]string{"Content-Type": {api.JsonMediaType}}, } require.NoError(t, decodeResp(r, nil)) }) - t.Run("non-200 JSON", func(t *testing.T) { + t.Run("204 JSON", func(t *testing.T) { + body := bytes.Buffer{} + r := &http.Response{ + Status: "204", + StatusCode: http.StatusNoContent, + Body: io.NopCloser(&body), + Header: map[string][]string{"Content-Type": {api.JsonMediaType}}, + } + require.NoError(t, decodeResp(r, nil)) + }) + t.Run("500 JSON", func(t *testing.T) { body := bytes.Buffer{} b, err := json.Marshal(&httputil.DefaultJsonError{Code: http.StatusInternalServerError, Message: "error"}) require.NoError(t, err) body.Write(b) r := &http.Response{ + Status: "500", StatusCode: http.StatusInternalServerError, Body: io.NopCloser(&body), Header: map[string][]string{"Content-Type": {api.JsonMediaType}}, @@ -168,6 +193,7 @@ func Test_decodeResp(t *testing.T) { _, err := body.WriteString("foo") require.NoError(t, err) r := &http.Response{ + Status: "200", StatusCode: http.StatusOK, Body: io.NopCloser(&body), Header: map[string][]string{"Content-Type": {api.JsonMediaType}}, @@ -177,11 +203,12 @@ func Test_decodeResp(t *testing.T) { err = decodeResp(r, resp) assert.ErrorContains(t, "failed to decode response body into json", err) }) - t.Run("non-200 JSON cannot decode", func(t *testing.T) { + t.Run("500 JSON cannot decode", func(t *testing.T) { body := bytes.Buffer{} _, err := body.WriteString("foo") require.NoError(t, err) r := &http.Response{ + Status: "500", StatusCode: http.StatusInternalServerError, Body: io.NopCloser(&body), Header: map[string][]string{"Content-Type": {api.JsonMediaType}}, diff --git a/validator/client/beacon-api/stream_blocks.go b/validator/client/beacon-api/stream_blocks.go index d68b7e030af2..2d6d453aec3a 100644 --- a/validator/client/beacon-api/stream_blocks.go +++ b/validator/client/beacon-api/stream_blocks.go @@ -4,10 +4,12 @@ import ( "bytes" "context" "encoding/json" + "strconv" "time" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -26,8 +28,8 @@ type streamSlotsClient struct { ctx context.Context beaconApiClient beaconApiValidatorClient streamSlotsRequest *ethpb.StreamSlotsRequest - prevBlockSlot primitives.Slot pingDelay time.Duration + ch chan event } type streamBlocksAltairClient struct { @@ -46,11 +48,14 @@ type headSignedBeaconBlockResult struct { } func (c beaconApiValidatorClient) streamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest, pingDelay time.Duration) ethpb.BeaconNodeValidator_StreamSlotsClient { + ch := make(chan event, 1) + c.eventHandler.subscribe(eventSub{name: "stream slots", ch: ch}) return &streamSlotsClient{ ctx: ctx, beaconApiClient: c, streamSlotsRequest: in, pingDelay: pingDelay, + ch: ch, } } @@ -64,28 +69,27 @@ func (c beaconApiValidatorClient) streamBlocks(ctx context.Context, in *ethpb.St } func (c *streamSlotsClient) Recv() (*ethpb.StreamSlotsResponse, error) { - result, err := c.beaconApiClient.getHeadSignedBeaconBlock(c.ctx) - if err != nil { - return nil, errors.Wrap(err, "failed to get latest signed block") - } - - // We keep querying the beacon chain for the latest block until we receive a new slot - for (c.streamSlotsRequest.VerifiedOnly && result.executionOptimistic) || c.prevBlockSlot == result.slot { + for { select { - case <-time.After(c.pingDelay): - result, err = c.beaconApiClient.getHeadSignedBeaconBlock(c.ctx) + case rawEvent := <-c.ch: + if rawEvent.eventType != events.HeadTopic { + continue + } + e := &events.HeadEvent{} + if err := json.Unmarshal([]byte(rawEvent.data), e); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal head event into JSON") + } + uintSlot, err := strconv.ParseUint(e.Slot, 10, 64) if err != nil { - return nil, errors.Wrap(err, "failed to get latest signed block") + return nil, errors.Wrap(err, "failed to parse slot") } + return ðpb.StreamSlotsResponse{ + Slot: primitives.Slot(uintSlot), + }, nil case <-c.ctx.Done(): return nil, errors.New("context canceled") } } - - c.prevBlockSlot = result.slot - return ðpb.StreamSlotsResponse{ - Slot: result.slot, - }, nil } func (c *streamBlocksAltairClient) Recv() (*ethpb.StreamBlocksResponse, error) { diff --git a/validator/client/grpc-api/grpc_node_client.go b/validator/client/grpc-api/grpc_node_client.go index 72a94390b439..fe35670f25e9 100644 --- a/validator/client/grpc-api/grpc_node_client.go +++ b/validator/client/grpc-api/grpc_node_client.go @@ -29,6 +29,10 @@ func (c *grpcNodeClient) ListPeers(ctx context.Context, in *empty.Empty) (*ethpb return c.nodeClient.ListPeers(ctx, in) } +func (c *grpcNodeClient) IsHealthy(context.Context) bool { + panic("function not supported for gRPC client") +} + func NewNodeClient(cc grpc.ClientConnInterface) iface.NodeClient { return &grpcNodeClient{ethpb.NewNodeClient(cc)} } diff --git a/validator/client/grpc-api/grpc_validator_client.go b/validator/client/grpc-api/grpc_validator_client.go index aa1c079130c6..c97e1a621709 100644 --- a/validator/client/grpc-api/grpc_validator_client.go +++ b/validator/client/grpc-api/grpc_validator_client.go @@ -141,3 +141,11 @@ func (c *grpcValidatorClient) AggregatedSigAndAggregationBits( func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient { return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc)} } + +func (c *grpcValidatorClient) StartEventStream(context.Context) error { + panic("function not supported for gRPC client") +} + +func (c *grpcValidatorClient) EventStreamIsRunning() bool { + panic("function not supported for gRPC client") +} diff --git a/validator/client/iface/node_client.go b/validator/client/iface/node_client.go index fd652384fbb5..3d4f05164cd7 100644 --- a/validator/client/iface/node_client.go +++ b/validator/client/iface/node_client.go @@ -12,4 +12,5 @@ type NodeClient interface { GetGenesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error) GetVersion(ctx context.Context, in *empty.Empty) (*ethpb.Version, error) ListPeers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error) + IsHealthy(ctx context.Context) bool } diff --git a/validator/client/iface/validator.go b/validator/client/iface/validator.go index a43fb97f6606..ebd4e7b41231 100644 --- a/validator/client/iface/validator.go +++ b/validator/client/iface/validator.go @@ -64,6 +64,9 @@ type Validator interface { SignValidatorRegistrationRequest(ctx context.Context, signer SigningFunc, newValidatorRegistration *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) ProposerSettings() *validatorserviceconfig.ProposerSettings SetProposerSettings(context.Context, *validatorserviceconfig.ProposerSettings) error + StartEventStream(ctx context.Context) error + EventStreamIsRunning() bool + NodeIsHealthy(ctx context.Context) bool } // SigningFunc interface defines a type for the a function that signs a message diff --git a/validator/client/iface/validator_client.go b/validator/client/iface/validator_client.go index e83e5d1faa4f..bb04fb3f72bf 100644 --- a/validator/client/iface/validator_client.go +++ b/validator/client/iface/validator_client.go @@ -34,4 +34,6 @@ type ValidatorClient interface { SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error) StreamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest) (ethpb.BeaconNodeValidator_StreamSlotsClient, error) SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error) + StartEventStream(ctx context.Context) error + EventStreamIsRunning() bool } diff --git a/validator/client/runner.go b/validator/client/runner.go index 16d71717ac13..020fa468ee7e 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -7,6 +7,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/config/features" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" @@ -195,6 +196,13 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) ( log.WithError(err).Fatal("Could not wait for validator activation") } + if features.Get().EnableBeaconRESTApi { + if err = v.StartEventStream(ctx); err != nil { + log.WithError(err).Fatal("Could not start API event stream") + } + runHealthCheckRoutine(ctx, v) + } + headSlot, err = v.CanonicalHeadSlot(ctx) if isConnectionError(err) { log.WithError(err).Warn("Could not get current canonical head slot") @@ -279,3 +287,25 @@ func handleAssignmentError(err error, slot primitives.Slot) { log.WithField("error", err).Error("Failed to update assignments") } } + +func runHealthCheckRoutine(ctx context.Context, v iface.Validator) { + healthCheckTicker := time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second) + go func() { + for { + select { + case <-healthCheckTicker.C: + if v.NodeIsHealthy(ctx) && !v.EventStreamIsRunning() { + if err := v.StartEventStream(ctx); err != nil { + log.WithError(err).Error("Could not start API event stream") + } + } + case <-ctx.Done(): + if ctx.Err() != nil { + log.WithError(ctx.Err()).Error("Context cancelled") + } + log.Error("Context cancelled") + return + } + } + }() +} diff --git a/validator/client/service.go b/validator/client/service.go index a499dfca0f2e..d10d3a80c2b1 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -196,11 +196,16 @@ func (v *ValidatorService) Start() { Host: v.conn.GetBeaconApiUrl(), } + evHandler := beaconApi.NewEventHandler(http.DefaultClient, v.conn.GetBeaconApiUrl()) + opts := []beaconApi.ValidatorClientOpt{beaconApi.WithEventHandler(evHandler)} + validatorClient := validatorClientFactory.NewValidatorClient(v.conn, restHandler, opts...) + valStruct := &validator{ - db: v.db, - validatorClient: validatorClientFactory.NewValidatorClient(v.conn, restHandler), + validatorClient: validatorClient, beaconClient: beaconChainClientFactory.NewBeaconChainClient(v.conn, restHandler), - node: nodeClientFactory.NewNodeClient(v.conn, restHandler), + nodeClient: nodeClientFactory.NewNodeClient(v.conn, restHandler), + prysmBeaconClient: beaconChainClientFactory.NewPrysmBeaconClient(v.conn, restHandler), + db: v.db, graffiti: v.graffiti, logValidatorBalances: v.logValidatorBalances, emitAccountMetrics: v.emitAccountMetrics, @@ -224,7 +229,6 @@ func (v *ValidatorService) Start() { Web3SignerConfig: v.Web3SignerConfig, proposerSettings: v.proposerSettings, walletInitializedChannel: make(chan *wallet.Wallet, 1), - prysmBeaconClient: beaconChainClientFactory.NewPrysmBeaconClient(v.conn, restHandler), validatorsRegBatchSize: v.validatorsRegBatchSize, } diff --git a/validator/client/testutil/mock_validator.go b/validator/client/testutil/mock_validator.go index b7956d04ea08..e2affbc7679d 100644 --- a/validator/client/testutil/mock_validator.go +++ b/validator/client/testutil/mock_validator.go @@ -174,18 +174,18 @@ func (fv *FakeValidator) ProposeBlock(_ context.Context, slot primitives.Slot, _ } // SubmitAggregateAndProof for mocking. -func (_ *FakeValidator) SubmitAggregateAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { +func (*FakeValidator) SubmitAggregateAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { } // SubmitSyncCommitteeMessage for mocking. -func (_ *FakeValidator) SubmitSyncCommitteeMessage(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { +func (*FakeValidator) SubmitSyncCommitteeMessage(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { } // LogAttestationsSubmitted for mocking. -func (_ *FakeValidator) LogAttestationsSubmitted() {} +func (*FakeValidator) LogAttestationsSubmitted() {} // UpdateDomainDataCaches for mocking. -func (_ *FakeValidator) UpdateDomainDataCaches(context.Context, primitives.Slot) {} +func (*FakeValidator) UpdateDomainDataCaches(context.Context, primitives.Slot) {} // BalancesByPubkeys for mocking. func (fv *FakeValidator) BalancesByPubkeys(_ context.Context) map[[fieldparams.BLSPubkeyLength]byte]uint64 { @@ -213,7 +213,7 @@ func (fv *FakeValidator) Keymanager() (keymanager.IKeymanager, error) { } // CheckDoppelGanger for mocking -func (_ *FakeValidator) CheckDoppelGanger(_ context.Context) error { +func (*FakeValidator) CheckDoppelGanger(_ context.Context) error { return nil } @@ -237,7 +237,7 @@ func (fv *FakeValidator) HandleKeyReload(_ context.Context, newKeys [][fieldpara } // SubmitSignedContributionAndProof for mocking -func (_ *FakeValidator) SubmitSignedContributionAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { +func (*FakeValidator) SubmitSignedContributionAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { } // HasProposerSettings for mocking @@ -266,22 +266,34 @@ func (fv *FakeValidator) PushProposerSettings(ctx context.Context, km keymanager } // SetPubKeyToValidatorIndexMap for mocking -func (_ *FakeValidator) SetPubKeyToValidatorIndexMap(_ context.Context, _ keymanager.IKeymanager) error { +func (*FakeValidator) SetPubKeyToValidatorIndexMap(_ context.Context, _ keymanager.IKeymanager) error { return nil } // SignValidatorRegistrationRequest for mocking -func (_ *FakeValidator) SignValidatorRegistrationRequest(_ context.Context, _ iface.SigningFunc, _ *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) { +func (*FakeValidator) SignValidatorRegistrationRequest(_ context.Context, _ iface.SigningFunc, _ *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) { return nil, nil } // ProposerSettings for mocking -func (f *FakeValidator) ProposerSettings() *validatorserviceconfig.ProposerSettings { - return f.proposerSettings +func (fv *FakeValidator) ProposerSettings() *validatorserviceconfig.ProposerSettings { + return fv.proposerSettings } // SetProposerSettings for mocking -func (f *FakeValidator) SetProposerSettings(_ context.Context, settings *validatorserviceconfig.ProposerSettings) error { - f.proposerSettings = settings +func (fv *FakeValidator) SetProposerSettings(_ context.Context, settings *validatorserviceconfig.ProposerSettings) error { + fv.proposerSettings = settings return nil } + +func (fv *FakeValidator) StartEventStream(_ context.Context) error { + return nil +} + +func (fv *FakeValidator) EventStreamIsRunning() bool { + return true +} + +func (fv *FakeValidator) NodeIsHealthy(context.Context) bool { + return true +} diff --git a/validator/client/validator-client-factory/validator_client_factory.go b/validator/client/validator-client-factory/validator_client_factory.go index a3e71b74d222..d9e6cf5420d3 100644 --- a/validator/client/validator-client-factory/validator_client_factory.go +++ b/validator/client/validator-client-factory/validator_client_factory.go @@ -11,9 +11,10 @@ import ( func NewValidatorClient( validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.JsonRestHandler, + opt ...beaconApi.ValidatorClientOpt, ) iface.ValidatorClient { if features.Get().EnableBeaconRESTApi { - return beaconApi.NewBeaconApiValidatorClient(jsonRestHandler) + return beaconApi.NewBeaconApiValidatorClient(jsonRestHandler, opt...) } else { return grpcApi.NewGrpcValidatorClient(validatorConn.GetGrpcClientConn()) } diff --git a/validator/client/validator.go b/validator/client/validator.go index bcf28905a12a..281129df1470 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -91,19 +91,19 @@ type validator struct { interopKeysConfig *local.InteropKeymanagerConfig wallet *wallet.Wallet graffitiStruct *graffiti.Graffiti - node iface.NodeClient - db vdb.Database beaconClient iface.BeaconChainClient + nodeClient iface.NodeClient + validatorClient iface.ValidatorClient + prysmBeaconClient iface.PrysmBeaconChainClient + db vdb.Database keyManager keymanager.IKeymanager ticker slots.Ticker - validatorClient iface.ValidatorClient graffiti []byte voteStats voteStats syncCommitteeStats syncCommitteeStats Web3SignerConfig *remoteweb3signer.SetupConfig proposerSettings *validatorserviceconfig.ProposerSettings walletInitializedChannel chan *wallet.Wallet - prysmBeaconClient iface.PrysmBeaconChainClient validatorsRegBatchSize int } @@ -304,7 +304,7 @@ func (v *validator) WaitForSync(ctx context.Context) error { ctx, span := trace.StartSpan(ctx, "validator.WaitForSync") defer span.End() - s, err := v.node.GetSyncStatus(ctx, &emptypb.Empty{}) + s, err := v.nodeClient.GetSyncStatus(ctx, &emptypb.Empty{}) if err != nil { return errors.Wrap(iface.ErrConnectionIssue, errors.Wrap(err, "could not get sync status").Error()) } @@ -316,7 +316,7 @@ func (v *validator) WaitForSync(ctx context.Context) error { select { // Poll every half slot. case <-time.After(slots.DivideSlotBy(2 /* twice per slot */)): - s, err := v.node.GetSyncStatus(ctx, &emptypb.Empty{}) + s, err := v.nodeClient.GetSyncStatus(ctx, &emptypb.Empty{}) if err != nil { return errors.Wrap(iface.ErrConnectionIssue, errors.Wrap(err, "could not get sync status").Error()) } @@ -330,7 +330,7 @@ func (v *validator) WaitForSync(ctx context.Context) error { } } -// ReceiveSlots starts a gRPC client stream listener to obtain +// ReceiveSlots starts a stream listener to obtain // slots from the beacon node when it imports a block. Upon receiving a slot, the service // broadcasts it to a feed for other usages to subscribe to. func (v *validator) ReceiveSlots(ctx context.Context, connectionErrorChannel chan<- error) { @@ -348,13 +348,14 @@ func (v *validator) ReceiveSlots(ctx context.Context, connectionErrorChannel cha } res, err := stream.Recv() if err != nil { - log.WithError(err).Error("Could not receive slots from beacon node, " + iface.ErrConnectionIssue.Error()) + log.WithError(err).Error("Could not receive slots from beacon node: " + iface.ErrConnectionIssue.Error()) connectionErrorChannel <- errors.Wrap(iface.ErrConnectionIssue, err.Error()) return } if res == nil { continue } + log.Error("Setting highest slot") v.setHighestSlot(res.Slot) } } @@ -1042,6 +1043,18 @@ func (v *validator) PushProposerSettings(ctx context.Context, km keymanager.IKey return nil } +func (v *validator) StartEventStream(ctx context.Context) error { + return v.validatorClient.StartEventStream(ctx) +} + +func (v *validator) EventStreamIsRunning() bool { + return v.validatorClient.EventStreamIsRunning() +} + +func (v *validator) NodeIsHealthy(ctx context.Context) bool { + return v.nodeClient.IsHealthy(ctx) +} + func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) { filteredKeys := make([][fieldparams.BLSPubkeyLength]byte, 0) statusRequestKeys := make([][]byte, 0) diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 55aea89d7c3c..458ede6b7842 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -394,7 +394,7 @@ func TestWaitSync_ContextCanceled(t *testing.T) { n := validatormock.NewMockNodeClient(ctrl) v := validator{ - node: n, + nodeClient: n, } ctx, cancel := context.WithCancel(context.Background()) @@ -414,7 +414,7 @@ func TestWaitSync_NotSyncing(t *testing.T) { n := validatormock.NewMockNodeClient(ctrl) v := validator{ - node: n, + nodeClient: n, } n.EXPECT().GetSyncStatus( @@ -431,7 +431,7 @@ func TestWaitSync_Syncing(t *testing.T) { n := validatormock.NewMockNodeClient(ctrl) v := validator{ - node: n, + nodeClient: n, } n.EXPECT().GetSyncStatus( @@ -1304,7 +1304,7 @@ func TestValidator_PushProposerSettings(t *testing.T) { v := validator{ validatorClient: client, - node: nodeClient, + nodeClient: nodeClient, db: db, pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex), signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1), @@ -1386,7 +1386,7 @@ func TestValidator_PushProposerSettings(t *testing.T) { v := validator{ validatorClient: client, - node: nodeClient, + nodeClient: nodeClient, db: db, pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex), signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1), @@ -1464,7 +1464,7 @@ func TestValidator_PushProposerSettings(t *testing.T) { v := validator{ validatorClient: client, - node: nodeClient, + nodeClient: nodeClient, db: db, pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex), signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1), @@ -1525,7 +1525,7 @@ func TestValidator_PushProposerSettings(t *testing.T) { v := validator{ validatorClient: client, - node: nodeClient, + nodeClient: nodeClient, db: db, pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex), signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1), @@ -1593,7 +1593,7 @@ func TestValidator_PushProposerSettings(t *testing.T) { v := validator{ validatorClient: client, - node: nodeClient, + nodeClient: nodeClient, db: db, pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex), signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1), @@ -1657,7 +1657,7 @@ func TestValidator_PushProposerSettings(t *testing.T) { v := validator{ validatorClient: client, - node: nodeClient, + nodeClient: nodeClient, db: db, pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex), signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1), @@ -1752,7 +1752,7 @@ func TestValidator_PushProposerSettings(t *testing.T) { validatorSetter: func(t *testing.T) *validator { v := validator{ validatorClient: client, - node: nodeClient, + nodeClient: nodeClient, db: db, pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex), signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1), diff --git a/validator/node/BUILD.bazel b/validator/node/BUILD.bazel index 90baedd49f56..147fbdbf98ee 100644 --- a/validator/node/BUILD.bazel +++ b/validator/node/BUILD.bazel @@ -43,6 +43,7 @@ go_library( "//validator:__subpackages__", ], deps = [ + "//api:go_default_library", "//api/gateway:go_default_library", "//api/server:go_default_library", "//async/event:go_default_library", diff --git a/validator/node/node.go b/validator/node/node.go index 111e68dcdddc..dc33784abf3f 100644 --- a/validator/node/node.go +++ b/validator/node/node.go @@ -26,6 +26,7 @@ import ( gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/pkg/errors" fastssz "github.com/prysmaticlabs/fastssz" + "github.com/prysmaticlabs/prysm/v4/api" "github.com/prysmaticlabs/prysm/v4/api/gateway" "github.com/prysmaticlabs/prysm/v4/api/server" "github.com/prysmaticlabs/prysm/v4/async/event" @@ -865,7 +866,7 @@ func (c *ValidatorClient) registerRPCGatewayService(router *mux.Router) error { }, }), gwruntime.WithMarshalerOption( - "text/event-stream", &gwruntime.EventSourceJSONPb{}, // TODO: remove this + api.EventStreamMediaType, &gwruntime.EventSourceJSONPb{}, // TODO: remove this ), gwruntime.WithForwardResponseOption(gateway.HttpResponseModifier), ) diff --git a/validator/rpc/handlers_health.go b/validator/rpc/handlers_health.go index 4ab87bc87d05..4501d71b90c5 100644 --- a/validator/rpc/handlers_health.go +++ b/validator/rpc/handlers_health.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" + "github.com/prysmaticlabs/prysm/v4/api" "github.com/prysmaticlabs/prysm/v4/network/httputil" pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/runtime/version" @@ -39,9 +40,9 @@ func (s *Server) StreamBeaconLogs(w http.ResponseWriter, r *http.Request) { ctx, span := trace.StartSpan(r.Context(), "validator.web.health.StreamBeaconLogs") defer span.End() // Set up SSE response headers - w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Content-Type", api.EventStreamMediaType) w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") + w.Header().Set("Connection", api.KeepAlive) // Flush helper function to ensure data is sent to client flusher, ok := w.(http.Flusher) @@ -108,9 +109,9 @@ func (s *Server) StreamValidatorLogs(w http.ResponseWriter, r *http.Request) { close(ch) }() // Set up SSE response headers - w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Content-Type", api.EventStreamMediaType) w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") + w.Header().Set("Connection", api.KeepAlive) recentLogs := s.logsStreamer.GetLastFewLogs() logStrings := make([]string, len(recentLogs)) diff --git a/validator/rpc/handlers_health_test.go b/validator/rpc/handlers_health_test.go index 7be1938bc55b..d6e9c4843f83 100644 --- a/validator/rpc/handlers_health_test.go +++ b/validator/rpc/handlers_health_test.go @@ -11,6 +11,7 @@ import ( "github.com/golang/mock/gomock" "github.com/golang/protobuf/ptypes/empty" + "github.com/prysmaticlabs/prysm/v4/api" "github.com/prysmaticlabs/prysm/v4/io/logs/mock" eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -92,10 +93,10 @@ func TestStreamBeaconLogs(t *testing.T) { } ct, ok := resp.Header["Content-Type"] require.Equal(t, ok, true) - require.Equal(t, ct[0], "text/event-stream") + require.Equal(t, ct[0], api.EventStreamMediaType) cn, ok := resp.Header["Connection"] require.Equal(t, ok, true) - require.Equal(t, cn[0], "keep-alive") + require.Equal(t, cn[0], api.KeepAlive) body, err := io.ReadAll(resp.Body) require.NoError(t, err) require.NotNil(t, body) @@ -143,10 +144,10 @@ func TestStreamValidatorLogs(t *testing.T) { } ct, ok := resp.Header["Content-Type"] require.Equal(t, ok, true) - require.Equal(t, ct[0], "text/event-stream") + require.Equal(t, ct[0], api.EventStreamMediaType) cn, ok := resp.Header["Connection"] require.Equal(t, ok, true) - require.Equal(t, cn[0], "keep-alive") + require.Equal(t, cn[0], api.KeepAlive) // Check if data was written body, err := io.ReadAll(resp.Body) require.NoError(t, err)