Skip to content

Commit

Permalink
REST VC: Subscribe to Beacon API events (#13453)
Browse files Browse the repository at this point in the history
* Revert "Revert "REST VC: Subscribe to Beacon API events  (#13354)" (#13428)"

This reverts commit 8d092a1.

* change logic

* review

* test fix

* fix critical error

* merge flag check

* change error msg

* return on errors

---------

Co-authored-by: james-prysm <[email protected]>
  • Loading branch information
rkapka and james-prysm authored Jan 18, 2024
1 parent f3ef1b6 commit 204de13
Show file tree
Hide file tree
Showing 32 changed files with 502 additions and 83 deletions.
2 changes: 2 additions & 0 deletions api/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ const (
ConsensusBlockValueHeader = "Eth-Consensus-Block-Value"
JsonMediaType = "application/json"
OctetStreamMediaType = "application/octet-stream"
EventStreamMediaType = "text/event-stream"
KeepAlive = "keep-alive"
)
1 change: 1 addition & 0 deletions beacon-chain/gateway/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/gateway",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//api:go_default_library",
"//api/gateway:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/gateway/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gateway

import (
gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/api/gateway"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
ethpbalpha "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -40,7 +41,7 @@ func DefaultConfig(enableDebugRPCEndpoints bool, httpModules string) MuxConfig {
},
}),
gwruntime.WithMarshalerOption(
"text/event-stream", &gwruntime.EventSourceJSONPb{},
api.EventStreamMediaType, &gwruntime.EventSourceJSONPb{},
),
)
v1AlphaPbHandler = &gateway.PbMux{
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/rpc/eth/events/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ go_library(
"structs.go",
],
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events",
visibility = ["//beacon-chain:__subpackages__"],
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
Expand All @@ -18,6 +19,7 @@ go_library(
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/rpc/eth/shared:go_default_library",
"//config/params:go_default_library",
"//network/httputil:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/eth/v2:go_default_library",
Expand Down
24 changes: 19 additions & 5 deletions beacon-chain/rpc/eth/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"encoding/json"
"fmt"
"net/http"
time2 "time"

"github.com/ethereum/go-ethereum/common/hexutil"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"

"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
Expand All @@ -18,11 +17,14 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/network/httputil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/eth/v1"
ethpbv2 "github.com/prysmaticlabs/prysm/v4/proto/eth/v2"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/prysmaticlabs/prysm/v4/time/slots"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

const (
Expand Down Expand Up @@ -114,16 +116,24 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) {
defer stateSub.Unsubscribe()

// Set up SSE response headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Type", api.EventStreamMediaType)
w.Header().Set("Connection", api.KeepAlive)

// Handle each event received and context cancellation.
// We send a keepalive dummy message immediately to prevent clients
// stalling while waiting for the first response chunk.
// After that we send a keepalive dummy message every SECONDS_PER_SLOT
// to prevent anyone (e.g. proxy servers) from closing connections.
sendKeepalive(w, flusher)
keepaliveTicker := time2.NewTicker(time2.Duration(params.BeaconConfig().SecondsPerSlot) * time2.Second)
for {
select {
case event := <-opsChan:
handleBlockOperationEvents(w, flusher, topicsMap, event)
case event := <-stateChan:
s.handleStateEvents(ctx, w, flusher, topicsMap, event)
case <-keepaliveTicker.C:
sendKeepalive(w, flusher)
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -505,6 +515,10 @@ func send(w http.ResponseWriter, flusher http.Flusher, name string, data interfa
write(w, flusher, "event: %s\ndata: %s\n\n", name, string(j))
}

func sendKeepalive(w http.ResponseWriter, flusher http.Flusher) {
write(w, flusher, ":\n\n")
}

func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) {
_, err := fmt.Fprintf(w, format, a...)
if err != nil {
Expand Down
20 changes: 15 additions & 5 deletions beacon-chain/rpc/eth/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,9 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
})
}

const operationsResult = `event: attestation
const operationsResult = `:
event: attestation
data: {"aggregation_bits":"0x00","data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"}
event: attestation
Expand All @@ -401,7 +403,9 @@ data: {"signed_header_1":{"message":{"slot":"0","proposer_index":"0","parent_roo
`

const stateResult = `event: head
const stateResult = `:
event: head
data: {"slot":"0","block":"0x0000000000000000000000000000000000000000000000000000000000000000","state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch_transition":true,"execution_optimistic":false,"previous_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","current_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000"}
event: finalized_checkpoint
Expand All @@ -415,17 +419,23 @@ data: {"slot":"0","block":"0xeade62f0457b2fdf48e7d3fc4b60736688286be7c7a3ac4c9a1
`

const payloadAttributesBellatrixResult = `event: payload_attributes
const payloadAttributesBellatrixResult = `:
event: payload_attributes
data: {"version":"bellatrix","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000"}}}
`

const payloadAttributesCapellaResult = `event: payload_attributes
const payloadAttributesCapellaResult = `:
event: payload_attributes
data: {"version":"capella","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[]}}}
`

const payloadAttributesDenebResult = `event: payload_attributes
const payloadAttributesDenebResult = `:
event: payload_attributes
data: {"version":"deneb","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[],"parent_beacon_block_root":"0xbef96cb938fd48b2403d3e662664325abb0102ed12737cbb80d717520e50cf4a"}}}
`
14 changes: 14 additions & 0 deletions testing/validator-mock/node_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions testing/validator-mock/validator_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions validator/accounts/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,15 @@ func (m *Validator) SetProposerSettings(_ context.Context, settings *validatorse
m.proposerSettings = settings
return nil
}

func (_ *Validator) StartEventStream(_ context.Context) error {
panic("implement me")
}

func (_ *Validator) EventStreamIsRunning() bool {
panic("implement me")
}

func (_ *Validator) NodeIsHealthy(ctx context.Context) bool {
panic("implement me")
}
4 changes: 4 additions & 0 deletions validator/client/beacon-api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"domain_data.go",
"doppelganger.go",
"duties.go",
"event_handler.go",
"genesis.go",
"get_beacon_block.go",
"index.go",
Expand Down Expand Up @@ -43,6 +44,7 @@ go_library(
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/rpc/eth/beacon:go_default_library",
"//beacon-chain/rpc/eth/config:go_default_library",
"//beacon-chain/rpc/eth/events:go_default_library",
"//beacon-chain/rpc/eth/node:go_default_library",
"//beacon-chain/rpc/eth/shared:go_default_library",
"//beacon-chain/rpc/eth/validator:go_default_library",
Expand Down Expand Up @@ -83,6 +85,7 @@ go_test(
"domain_data_test.go",
"doppelganger_test.go",
"duties_test.go",
"event_handler_test.go",
"genesis_test.go",
"get_beacon_block_test.go",
"index_test.go",
Expand Down Expand Up @@ -139,6 +142,7 @@ go_test(
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_golang_protobuf//ptypes/empty",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@org_golang_google_protobuf//types/known/emptypb:go_default_library",
"@org_golang_google_protobuf//types/known/timestamppb:go_default_library",
],
Expand Down
4 changes: 4 additions & 0 deletions validator/client/beacon-api/beacon_api_node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func (c *beaconApiNodeClient) ListPeers(ctx context.Context, in *empty.Empty) (*
panic("beaconApiNodeClient.ListPeers is not implemented. To use a fallback client, pass a fallback client as the last argument of NewBeaconApiNodeClientWithFallback.")
}

func (c *beaconApiNodeClient) IsHealthy(ctx context.Context) bool {
return c.jsonRestHandler.Get(ctx, "/eth/v1/node/health", nil) == nil
}

func NewNodeClientWithFallback(jsonRestHandler JsonRestHandler, fallbackClient iface.NodeClient) iface.NodeClient {
return &beaconApiNodeClient{
jsonRestHandler: jsonRestHandler,
Expand Down
30 changes: 28 additions & 2 deletions validator/client/beacon-api/beacon_api_validator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,26 @@ import (
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
)

type ValidatorClientOpt func(*beaconApiValidatorClient)

func WithEventHandler(h *EventHandler) ValidatorClientOpt {
return func(c *beaconApiValidatorClient) {
c.eventHandler = h
}
}

type beaconApiValidatorClient struct {
genesisProvider GenesisProvider
dutiesProvider dutiesProvider
stateValidatorsProvider StateValidatorsProvider
jsonRestHandler JsonRestHandler
eventHandler *EventHandler
beaconBlockConverter BeaconBlockConverter
prysmBeaconChainCLient iface.PrysmBeaconChainClient
}

func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler) iface.ValidatorClient {
return &beaconApiValidatorClient{
func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler, opts ...ValidatorClientOpt) iface.ValidatorClient {
c := &beaconApiValidatorClient{
genesisProvider: beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler},
dutiesProvider: beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler},
stateValidatorsProvider: beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler},
Expand All @@ -34,6 +43,10 @@ func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler) iface.Validato
jsonRestHandler: jsonRestHandler,
},
}
for _, o := range opts {
o(c)
}
return c
}

func (c *beaconApiValidatorClient) GetDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) {
Expand Down Expand Up @@ -149,3 +162,16 @@ func (c *beaconApiValidatorClient) WaitForActivation(ctx context.Context, in *et
func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *empty.Empty) (*ethpb.ChainStartResponse, error) {
return c.waitForChainStart(ctx)
}

func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context) error {
if c.eventHandler != nil {
if err := c.eventHandler.get(ctx, []string{"head"}); err != nil {
return errors.Wrapf(err, "could not invoke event handler")
}
}
return nil
}

func (c *beaconApiValidatorClient) EventStreamIsRunning() bool {
return c.eventHandler.running
}
Loading

0 comments on commit 204de13

Please sign in to comment.