Skip to content

Commit

Permalink
HTTP implementation of /eth/v1/beacon/pool/sync_committees (#12782)
Browse files Browse the repository at this point in the history
* impl

* protos

* tests

* register endpoint

* test fix

* test fix

* remove path

* more test fixes

* cleanup

* bzl

---------

Co-authored-by: james-prysm <[email protected]>
  • Loading branch information
rkapka and james-prysm authored Sep 1, 2023
1 parent 3c98c27 commit 34f507f
Show file tree
Hide file tree
Showing 25 changed files with 449 additions and 3,006 deletions.
22 changes: 0 additions & 22 deletions beacon-chain/rpc/apimiddleware/custom_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,6 @@ func wrapValidatorIndicesArray(
return true, nil
}

// https://ethereum.github.io/beacon-APIs/#/Beacon/submitPoolSyncCommitteeSignatures expects posting a top-level array.
// We make it more proto-friendly by wrapping it in a struct with a 'data' field.
func wrapSyncCommitteeSignaturesArray(
endpoint *apimiddleware.Endpoint,
_ http.ResponseWriter,
req *http.Request,
) (apimiddleware.RunDefault, apimiddleware.ErrorJson) {
if _, ok := endpoint.PostRequest.(*SubmitSyncCommitteeSignaturesRequestJson); ok {
data := make([]*SyncCommitteeMessageJson, 0)
if err := json.NewDecoder(req.Body).Decode(&data); err != nil {
return false, apimiddleware.InternalServerErrorWithMessage(err, "could not decode body")
}
j := &SubmitSyncCommitteeSignaturesRequestJson{Data: data}
b, err := json.Marshal(j)
if err != nil {
return false, apimiddleware.InternalServerErrorWithMessage(err, "could not marshal wrapped body")
}
req.Body = io.NopCloser(bytes.NewReader(b))
}
return true, nil
}

type phase0PublishBlockRequestJson struct {
Phase0Block *SignedBeaconBlockJson `json:"phase0_block"`
}
Expand Down
48 changes: 0 additions & 48 deletions beacon-chain/rpc/apimiddleware/custom_hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,54 +100,6 @@ func TestWrapBLSChangesArray(t *testing.T) {
})
}

func TestWrapSyncCommitteeSignaturesArray(t *testing.T) {
t.Run("ok", func(t *testing.T) {
endpoint := &apimiddleware.Endpoint{
PostRequest: &SubmitSyncCommitteeSignaturesRequestJson{},
}
unwrappedSigs := []*SyncCommitteeMessageJson{{
Slot: "1",
BeaconBlockRoot: "root",
ValidatorIndex: "1",
Signature: "sig",
}}
unwrappedSigsJson, err := json.Marshal(unwrappedSigs)
require.NoError(t, err)

var body bytes.Buffer
_, err = body.Write(unwrappedSigsJson)
require.NoError(t, err)
request := httptest.NewRequest("POST", "http://foo.example", &body)

runDefault, errJson := wrapSyncCommitteeSignaturesArray(endpoint, nil, request)
require.Equal(t, true, errJson == nil)
assert.Equal(t, apimiddleware.RunDefault(true), runDefault)
wrappedSigs := &SubmitSyncCommitteeSignaturesRequestJson{}
require.NoError(t, json.NewDecoder(request.Body).Decode(wrappedSigs))
require.Equal(t, 1, len(wrappedSigs.Data), "wrong number of wrapped items")
assert.Equal(t, "1", wrappedSigs.Data[0].Slot)
assert.Equal(t, "root", wrappedSigs.Data[0].BeaconBlockRoot)
assert.Equal(t, "1", wrappedSigs.Data[0].ValidatorIndex)
assert.Equal(t, "sig", wrappedSigs.Data[0].Signature)
})

t.Run("invalid_body", func(t *testing.T) {
endpoint := &apimiddleware.Endpoint{
PostRequest: &SubmitSyncCommitteeSignaturesRequestJson{},
}
var body bytes.Buffer
_, err := body.Write([]byte("invalid"))
require.NoError(t, err)
request := httptest.NewRequest("POST", "http://foo.example", &body)

runDefault, errJson := wrapSyncCommitteeSignaturesArray(endpoint, nil, request)
require.Equal(t, false, errJson == nil)
assert.Equal(t, apimiddleware.RunDefault(false), runDefault)
assert.Equal(t, true, strings.Contains(errJson.Msg(), "could not decode body"))
assert.Equal(t, http.StatusInternalServerError, errJson.StatusCode())
})
}

func TestSetInitialPublishBlockPostRequest(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig().Copy()
Expand Down
7 changes: 0 additions & 7 deletions beacon-chain/rpc/apimiddleware/endpoint_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func (_ *BeaconEndpointFactory) Paths() []string {
"/eth/v1/beacon/pool/attester_slashings",
"/eth/v1/beacon/pool/proposer_slashings",
"/eth/v1/beacon/pool/bls_to_execution_changes",
"/eth/v1/beacon/pool/sync_committees",
"/eth/v1/beacon/pool/bls_to_execution_changes",
"/eth/v1/beacon/weak_subjectivity",
"/eth/v1/node/identity",
Expand Down Expand Up @@ -144,12 +143,6 @@ func (_ *BeaconEndpointFactory) Create(path string) (*apimiddleware.Endpoint, er
endpoint.Hooks = apimiddleware.HookCollection{
OnPreDeserializeRequestBodyIntoContainer: wrapBLSChangesArray,
}
case "/eth/v1/beacon/pool/sync_committees":
endpoint.PostRequest = &SubmitSyncCommitteeSignaturesRequestJson{}
endpoint.Err = &IndexedVerificationFailureErrorJson{}
endpoint.Hooks = apimiddleware.HookCollection{
OnPreDeserializeRequestBodyIntoContainer: wrapSyncCommitteeSignaturesArray,
}
case "/eth/v1/beacon/weak_subjectivity":
endpoint.GetResponse = &WeakSubjectivityResponse{}
case "/eth/v1/node/identity":
Expand Down
11 changes: 0 additions & 11 deletions beacon-chain/rpc/apimiddleware/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,6 @@ type ProposerSlashingsPoolResponseJson struct {
Data []*ProposerSlashingJson `json:"data"`
}

type SubmitSyncCommitteeSignaturesRequestJson struct {
Data []*SyncCommitteeMessageJson `json:"data"`
}

type BLSToExecutionChangesPoolResponseJson struct {
Data []*SignedBLSToExecutionChangeJson `json:"data"`
}
Expand Down Expand Up @@ -841,13 +837,6 @@ type VoluntaryExitJson struct {
ValidatorIndex string `json:"validator_index"`
}

type SyncCommitteeMessageJson struct {
Slot string `json:"slot"`
BeaconBlockRoot string `json:"beacon_block_root" hex:"true"`
ValidatorIndex string `json:"validator_index"`
Signature string `json:"signature" hex:"true"`
}

type IdentityJson struct {
PeerId string `json:"peer_id"`
Enr string `json:"enr"`
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ type Service struct {
OperationNotifier opfeed.Notifier
AttestationCache *cache.AttestationCache
StateGen stategen.StateManager
P2P p2p.Broadcaster
}
27 changes: 27 additions & 0 deletions beacon-chain/rpc/core/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,30 @@ func (s *Service) GetAttestationData(
}
return res, nil
}

// SubmitSyncMessage submits the sync committee message to the network.
// It also saves the sync committee message into the pending pool for block inclusion.
func (s *Service) SubmitSyncMessage(ctx context.Context, msg *ethpb.SyncCommitteeMessage) error {
errs, ctx := errgroup.WithContext(ctx)

headSyncCommitteeIndices, err := s.HeadFetcher.HeadSyncCommitteeIndices(ctx, msg.ValidatorIndex, msg.Slot)
if err != nil {
return err
}
// Broadcasting and saving message into the pool in parallel. As one fail should not affect another.
// This broadcasts for all subnets.
for _, index := range headSyncCommitteeIndices {
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
subnet := uint64(index) / subCommitteeSize
errs.Go(func() error {
return s.P2P.BroadcastSyncCommitteeMessage(ctx, subnet, msg)
})
}

if err := s.SyncCommitteePool.SaveSyncCommitteeMessage(msg); err != nil {
return err
}

// Wait for p2p broadcast to complete and return the first error (if any)
return errs.Wait()
}
7 changes: 2 additions & 5 deletions beacon-chain/rpc/eth/beacon/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"//beacon-chain/operations/slashings:go_default_library",
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/rpc/eth/helpers:go_default_library",
"//beacon-chain/rpc/eth/shared:go_default_library",
"//beacon-chain/rpc/lookup:go_default_library",
Expand All @@ -58,7 +59,6 @@ go_library(
"//encoding/ssz/detect:go_default_library",
"//network/forks:go_default_library",
"//network/http:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/eth/v2:go_default_library",
"//proto/migration:go_default_library",
Expand All @@ -70,7 +70,6 @@ go_library(
"@com_github_gorilla_mux//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_wealdtech_go_bytesutil//:go_default_library",
"@io_bazel_rules_go//proto/wkt:empty_go_proto",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_grpc//:go_default_library",
Expand Down Expand Up @@ -100,7 +99,6 @@ go_test(
embed = [":go_default_library"],
deps = [
"//api:go_default_library",
"//api/grpc:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",
Expand All @@ -116,11 +114,11 @@ go_test(
"//beacon-chain/operations/voluntaryexits/mock:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/rpc/apimiddleware:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/rpc/eth/helpers:go_default_library",
"//beacon-chain/rpc/eth/shared:go_default_library",
"//beacon-chain/rpc/eth/shared/testing:go_default_library",
"//beacon-chain/rpc/lookup:go_default_library",
"//beacon-chain/rpc/prysm/v1alpha1/validator:go_default_library",
"//beacon-chain/rpc/testutil:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
Expand Down Expand Up @@ -156,7 +154,6 @@ go_test(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_stretchr_testify//mock:go_default_library",
"@com_github_wealdtech_go_bytesutil//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
Expand Down
55 changes: 53 additions & 2 deletions beacon-chain/rpc/eth/beacon/handlers_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *Server) ListAttestations(w http.ResponseWriter, r *http.Request) {
http2.WriteJson(w, &ListAttestationsResponse{Data: filteredAtts})
}

// SubmitAttestations submits an Attestation object to node. If the attestation passes all validation
// SubmitAttestations submits an attestation object to node. If the attestation passes all validation
// constraints, node MUST publish the attestation on an appropriate subnet.
func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestations")
Expand Down Expand Up @@ -193,7 +193,7 @@ func (s *Server) ListVoluntaryExits(w http.ResponseWriter, r *http.Request) {
http2.WriteJson(w, &ListVoluntaryExitsResponse{Data: exits})
}

// SubmitVoluntaryExit submits SignedVoluntaryExit object to node's pool
// SubmitVoluntaryExit submits a SignedVoluntaryExit object to node's pool
// and if passes validation node MUST broadcast it to network.
func (s *Server) SubmitVoluntaryExit(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitVoluntaryExit")
Expand Down Expand Up @@ -256,3 +256,54 @@ func (s *Server) SubmitVoluntaryExit(w http.ResponseWriter, r *http.Request) {
return
}
}

// SubmitSyncCommitteeSignatures submits sync committee signature objects to the node.
func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitPoolSyncCommitteeSignatures")
defer span.End()

var req SubmitSyncCommitteeSignaturesRequest
err := json.NewDecoder(r.Body).Decode(&req.Data)
switch {
case err == io.EOF:
http2.HandleError(w, "No data submitted", http.StatusBadRequest)
return
case err != nil:
http2.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
return
}
if len(req.Data) == 0 {
http2.HandleError(w, "No data submitted", http.StatusBadRequest)
return
}

var validMessages []*ethpbalpha.SyncCommitteeMessage
var msgFailures []*shared.IndexedVerificationFailure
for i, sourceMsg := range req.Data {
msg, err := sourceMsg.ToConsensus()
if err != nil {
msgFailures = append(msgFailures, &shared.IndexedVerificationFailure{
Index: i,
Message: "Could not convert request message to consensus message: " + err.Error(),
})
continue
}
validMessages = append(validMessages, msg)
}

for _, msg := range validMessages {
if err = s.CoreService.SubmitSyncMessage(ctx, msg); err != nil {
http2.HandleError(w, "Could not submit message: "+err.Error(), http.StatusInternalServerError)
return
}
}

if len(msgFailures) > 0 {
failuresErr := &shared.IndexedVerificationFailureError{
Code: http.StatusBadRequest,
Message: "One or more messages failed validation",
Failures: msgFailures,
}
http2.WriteError(w, failuresErr)
}
}
Loading

0 comments on commit 34f507f

Please sign in to comment.