Skip to content

Commit

Permalink
Merge branch 'develop' into graffiti-keymanager-api
Browse files Browse the repository at this point in the history
  • Loading branch information
james-prysm authored Jan 18, 2024
2 parents 8fa12b4 + fc05e30 commit 33fbd8f
Show file tree
Hide file tree
Showing 39 changed files with 693 additions and 114 deletions.
2 changes: 2 additions & 0 deletions api/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ const (
ConsensusBlockValueHeader = "Eth-Consensus-Block-Value"
JsonMediaType = "application/json"
OctetStreamMediaType = "application/octet-stream"
EventStreamMediaType = "text/event-stream"
KeepAlive = "keep-alive"
)
1 change: 1 addition & 0 deletions beacon-chain/gateway/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/gateway",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//api:go_default_library",
"//api/gateway:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/gateway/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gateway

import (
gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/api/gateway"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
ethpbalpha "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -40,7 +41,7 @@ func DefaultConfig(enableDebugRPCEndpoints bool, httpModules string) MuxConfig {
},
}),
gwruntime.WithMarshalerOption(
"text/event-stream", &gwruntime.EventSourceJSONPb{},
api.EventStreamMediaType, &gwruntime.EventSourceJSONPb{},
),
)
v1AlphaPbHandler = &gateway.PbMux{
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/rpc/eth/events/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ go_library(
"structs.go",
],
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events",
visibility = ["//beacon-chain:__subpackages__"],
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
Expand All @@ -18,6 +19,7 @@ go_library(
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/rpc/eth/shared:go_default_library",
"//config/params:go_default_library",
"//network/httputil:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/eth/v2:go_default_library",
Expand Down
24 changes: 19 additions & 5 deletions beacon-chain/rpc/eth/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"encoding/json"
"fmt"
"net/http"
time2 "time"

"github.com/ethereum/go-ethereum/common/hexutil"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"

"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
Expand All @@ -18,11 +17,14 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/network/httputil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/eth/v1"
ethpbv2 "github.com/prysmaticlabs/prysm/v4/proto/eth/v2"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/prysmaticlabs/prysm/v4/time/slots"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

const (
Expand Down Expand Up @@ -114,16 +116,24 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) {
defer stateSub.Unsubscribe()

// Set up SSE response headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Type", api.EventStreamMediaType)
w.Header().Set("Connection", api.KeepAlive)

// Handle each event received and context cancellation.
// We send a keepalive dummy message immediately to prevent clients
// stalling while waiting for the first response chunk.
// After that we send a keepalive dummy message every SECONDS_PER_SLOT
// to prevent anyone (e.g. proxy servers) from closing connections.
sendKeepalive(w, flusher)
keepaliveTicker := time2.NewTicker(time2.Duration(params.BeaconConfig().SecondsPerSlot) * time2.Second)
for {
select {
case event := <-opsChan:
handleBlockOperationEvents(w, flusher, topicsMap, event)
case event := <-stateChan:
s.handleStateEvents(ctx, w, flusher, topicsMap, event)
case <-keepaliveTicker.C:
sendKeepalive(w, flusher)
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -505,6 +515,10 @@ func send(w http.ResponseWriter, flusher http.Flusher, name string, data interfa
write(w, flusher, "event: %s\ndata: %s\n\n", name, string(j))
}

func sendKeepalive(w http.ResponseWriter, flusher http.Flusher) {
write(w, flusher, ":\n\n")
}

func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) {
_, err := fmt.Fprintf(w, format, a...)
if err != nil {
Expand Down
20 changes: 15 additions & 5 deletions beacon-chain/rpc/eth/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,9 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
})
}

const operationsResult = `event: attestation
const operationsResult = `:
event: attestation
data: {"aggregation_bits":"0x00","data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"}
event: attestation
Expand All @@ -401,7 +403,9 @@ data: {"signed_header_1":{"message":{"slot":"0","proposer_index":"0","parent_roo
`

const stateResult = `event: head
const stateResult = `:
event: head
data: {"slot":"0","block":"0x0000000000000000000000000000000000000000000000000000000000000000","state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch_transition":true,"execution_optimistic":false,"previous_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","current_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000"}
event: finalized_checkpoint
Expand All @@ -415,17 +419,23 @@ data: {"slot":"0","block":"0xeade62f0457b2fdf48e7d3fc4b60736688286be7c7a3ac4c9a1
`

const payloadAttributesBellatrixResult = `event: payload_attributes
const payloadAttributesBellatrixResult = `:
event: payload_attributes
data: {"version":"bellatrix","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000"}}}
`

const payloadAttributesCapellaResult = `event: payload_attributes
const payloadAttributesCapellaResult = `:
event: payload_attributes
data: {"version":"capella","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[]}}}
`

const payloadAttributesDenebResult = `event: payload_attributes
const payloadAttributesDenebResult = `:
event: payload_attributes
data: {"version":"deneb","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[],"parent_beacon_block_root":"0xbef96cb938fd48b2403d3e662664325abb0102ed12737cbb80d717520e50cf4a"}}}
`
7 changes: 0 additions & 7 deletions beacon-chain/sync/pending_attestations_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,6 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
seen := s.seenPendingBlocks[bRoot]
s.pendingQueueLock.RUnlock()
if !seen {
// Pending attestation's missing block has not arrived yet.
log.WithFields(logrus.Fields{
"currentSlot": s.cfg.clock.CurrentSlot(),
"attSlot": attestations[0].Message.Aggregate.Data.Slot,
"attCount": len(attestations),
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
}).Debug("Requesting block for pending attestation")
pendingRoots = append(pendingRoots, bRoot)
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/pending_attestations_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
a := &ethpb.AggregateAttestationAndProof{Aggregate: &ethpb.Attestation{Data: &ethpb.AttestationData{Target: &ethpb.Checkpoint{Root: make([]byte, 32)}}}}
r.blkRootToPendingAtts[[32]byte{'A'}] = []*ethpb.SignedAggregateAttestationAndProof{{Message: a}}
require.NoError(t, r.processPendingAtts(context.Background()))
require.LogsContain(t, hook, "Requesting block for pending attestation")
require.LogsContain(t, hook, "Requesting block by root")
}

func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/sync/pending_blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
// Request parent block if not in the pending queue and not in the database.
isParentBlockInDB := s.cfg.beaconDB.HasBlock(ctx, parentRoot)
if !inPendingQueue && !isParentBlockInDB && s.hasPeer() {
log.WithFields(logrus.Fields{"currentSlot": b.Block().Slot(), "parentRoot": hex.EncodeToString(parentRoot[:])}).Debug("Requesting parent block")
parentRoots = append(parentRoots, parentRoot)
continue
}
Expand Down Expand Up @@ -283,6 +282,8 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
r := roots[i]
if s.seenPendingBlocks[r] || s.cfg.chain.BlockBeingSynced(r) {
roots = append(roots[:i], roots[i+1:]...)
} else {
log.WithField("blockRoot", fmt.Sprintf("%#x", r)).Debug("Requesting block by root")
}
}
s.pendingQueueLock.RUnlock()
Expand Down
21 changes: 19 additions & 2 deletions encoding/ssz/detect/configfork.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,23 @@ func FromState(marshaled []byte) (*VersionedUnmarshaler, error) {
return FromForkVersion(cv)
}

// FromBlock uses the known size of an offset and signature to determine the slot of a block without unmarshalling it.
// The slot is used to determine the version along with the respective config.
func FromBlock(marshaled []byte) (*VersionedUnmarshaler, error) {
slot, err := slotFromBlock(marshaled)
if err != nil {
return nil, err
}
copiedCfg := params.BeaconConfig().Copy()
epoch := slots.ToEpoch(slot)
fs := forks.NewOrderedSchedule(copiedCfg)
ver, err := fs.VersionForEpoch(epoch)
if err != nil {
return nil, err
}
return FromForkVersion(ver)
}

var ErrForkNotFound = errors.New("version found in fork schedule but can't be matched to a named fork")

// FromForkVersion uses a lookup table to resolve a Version (from a beacon node api for instance, or obtained by peeking at
Expand Down Expand Up @@ -162,7 +179,7 @@ var errBlockForkMismatch = errors.New("fork or config detected in unmarshaler is

// UnmarshalBeaconBlock uses internal knowledge in the VersionedUnmarshaler to pick the right concrete ReadOnlySignedBeaconBlock type,
// then Unmarshal()s the type and returns an instance of block.ReadOnlySignedBeaconBlock if successful.
func (cf *VersionedUnmarshaler) UnmarshalBeaconBlock(marshaled []byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
func (cf *VersionedUnmarshaler) UnmarshalBeaconBlock(marshaled []byte) (interfaces.SignedBeaconBlock, error) {
slot, err := slotFromBlock(marshaled)
if err != nil {
return nil, err
Expand Down Expand Up @@ -197,7 +214,7 @@ func (cf *VersionedUnmarshaler) UnmarshalBeaconBlock(marshaled []byte) (interfac
// UnmarshalBlindedBeaconBlock uses internal knowledge in the VersionedUnmarshaler to pick the right concrete blinded ReadOnlySignedBeaconBlock type,
// then Unmarshal()s the type and returns an instance of block.ReadOnlySignedBeaconBlock if successful.
// For Phase0 and Altair it works exactly line UnmarshalBeaconBlock.
func (cf *VersionedUnmarshaler) UnmarshalBlindedBeaconBlock(marshaled []byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
func (cf *VersionedUnmarshaler) UnmarshalBlindedBeaconBlock(marshaled []byte) (interfaces.SignedBeaconBlock, error) {
slot, err := slotFromBlock(marshaled)
if err != nil {
return nil, err
Expand Down
91 changes: 91 additions & 0 deletions encoding/ssz/detect/configfork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,97 @@ func TestUnmarshalState(t *testing.T) {
}
}

func TestDetectAndUnmarshalBlock(t *testing.T) {
undo := util.HackDenebMaxuint(t)
defer undo()
altairS, err := slots.EpochStart(params.BeaconConfig().AltairForkEpoch)
require.NoError(t, err)
bellaS, err := slots.EpochStart(params.BeaconConfig().BellatrixForkEpoch)
require.NoError(t, err)
capellaS, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
require.NoError(t, err)
denebS, err := slots.EpochStart(params.BeaconConfig().DenebForkEpoch)
require.NoError(t, err)
cases := []struct {
b func(*testing.T, primitives.Slot) interfaces.ReadOnlySignedBeaconBlock
name string
slot primitives.Slot
errExists bool
}{
{
name: "genesis - slot 0",
b: signedTestBlockGenesis,
},
{
name: "last slot of phase 0",
b: signedTestBlockGenesis,
slot: altairS - 1,
},
{
name: "first slot of altair",
b: signedTestBlockAltair,
slot: altairS,
},
{
name: "last slot of altair",
b: signedTestBlockAltair,
slot: bellaS - 1,
},
{
name: "first slot of bellatrix",
b: signedTestBlockBellatrix,
slot: bellaS,
},
{
name: "first slot of capella",
b: signedTestBlockCapella,
slot: capellaS,
},
{
name: "last slot of capella",
b: signedTestBlockCapella,
slot: denebS - 1,
},
{
name: "first slot of deneb",
b: signedTestBlockDeneb,
slot: denebS,
},
{
name: "bellatrix block in altair slot",
b: signedTestBlockBellatrix,
slot: bellaS - 1,
errExists: true,
},
{
name: "genesis block in altair slot",
b: signedTestBlockGenesis,
slot: bellaS - 1,
errExists: true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
b := c.b(t, c.slot)
marshaled, err := b.MarshalSSZ()
require.NoError(t, err)
cf, err := FromBlock(marshaled)
require.NoError(t, err)
bcf, err := cf.UnmarshalBeaconBlock(marshaled)
if c.errExists {
require.NotNil(t, err)
return
}
require.NoError(t, err)
expected, err := b.Block().HashTreeRoot()
require.NoError(t, err)
actual, err := bcf.Block().HashTreeRoot()
require.NoError(t, err)
require.Equal(t, expected, actual)
})
}
}

func TestUnmarshalBlock(t *testing.T) {
undo := util.HackDenebMaxuint(t)
defer undo()
Expand Down
14 changes: 14 additions & 0 deletions testing/validator-mock/node_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 33fbd8f

Please sign in to comment.