Skip to content

Commit

Permalink
Validator HTTP endpoints (#13167)
Browse files Browse the repository at this point in the history
* HTTP validator endpoints

* Sammy's review

* capitalize errors

* test fix

---------

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
rkapka and prylabs-bulldozer[bot] authored Nov 3, 2023
1 parent 0f65e51 commit 1f250f7
Show file tree
Hide file tree
Showing 19 changed files with 694 additions and 3,226 deletions.
1 change: 0 additions & 1 deletion beacon-chain/gateway/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func DefaultConfig(enableDebugRPCEndpoints bool, httpModules string) MuxConfig {
if flags.EnableHTTPEthAPI(httpModules) {
ethRegistrations := []gateway.PbHandlerRegistration{
ethpbservice.RegisterBeaconChainHandler,
ethpbservice.RegisterBeaconValidatorHandler,
ethpbservice.RegisterEventsHandler,
}
ethMux := gwruntime.NewServeMux(
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/gateway/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestDefaultConfig(t *testing.T) {
assert.NotNil(t, cfg.EthPbMux.Mux)
require.Equal(t, 2, len(cfg.EthPbMux.Patterns))
assert.Equal(t, "/internal/eth/v1/", cfg.EthPbMux.Patterns[0])
assert.Equal(t, 3, len(cfg.EthPbMux.Registrations))
assert.Equal(t, 2, len(cfg.EthPbMux.Registrations))
assert.Equal(t, (*gateway.PbMux)(nil), cfg.V1AlphaPbMux)
})
t.Run("Without Eth API", func(t *testing.T) {
Expand Down
8 changes: 0 additions & 8 deletions beacon-chain/rpc/apimiddleware/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/apimiddleware",
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//api/gateway/apimiddleware:go_default_library",
"//api/grpc:go_default_library",
"//beacon-chain/rpc/eth/events:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//network/http:go_default_library",
"//proto/eth/v2:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
Expand All @@ -37,16 +33,12 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//api:go_default_library",
"//api/gateway/apimiddleware:go_default_library",
"//api/grpc:go_default_library",
"//beacon-chain/rpc/eth/events:go_default_library",
"//config/params:go_default_library",
"//proto/eth/v2:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//time/slots:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_r3labs_sse_v2//:go_default_library",
],
)
161 changes: 0 additions & 161 deletions beacon-chain/rpc/apimiddleware/custom_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,178 +2,17 @@ package apimiddleware

import (
"bytes"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"

"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/api/gateway/apimiddleware"
"github.com/prysmaticlabs/prysm/v4/api/grpc"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events"
http2 "github.com/prysmaticlabs/prysm/v4/network/http"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/r3labs/sse/v2"
)

type sszConfig struct {
fileName string
responseJson SszResponse
}

func handleProduceBlockSSZ(m *apimiddleware.ApiProxyMiddleware, endpoint apimiddleware.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
config := sszConfig{
fileName: "produce_beacon_block.ssz",
responseJson: &VersionedSSZResponseJson{},
}
return handleGetSSZ(m, endpoint, w, req, config)
}

func handleProduceBlindedBlockSSZ(
m *apimiddleware.ApiProxyMiddleware,
endpoint apimiddleware.Endpoint,
w http.ResponseWriter,
req *http.Request,
) (handled bool) {
config := sszConfig{
fileName: "produce_blinded_beacon_block.ssz",
responseJson: &VersionedSSZResponseJson{},
}
return handleGetSSZ(m, endpoint, w, req, config)
}

func handleGetSSZ(
m *apimiddleware.ApiProxyMiddleware,
endpoint apimiddleware.Endpoint,
w http.ResponseWriter,
req *http.Request,
config sszConfig,
) (handled bool) {
ssz := http2.SszRequested(req)
if !ssz {
return false
}

if errJson := prepareSSZRequestForProxying(m, endpoint, req); errJson != nil {
apimiddleware.WriteError(w, errJson, nil)
return true
}
grpcResponse, errJson := m.ProxyRequest(req)
if errJson != nil {
apimiddleware.WriteError(w, errJson, nil)
return true
}
grpcResponseBody, errJson := apimiddleware.ReadGrpcResponseBody(grpcResponse.Body)
if errJson != nil {
apimiddleware.WriteError(w, errJson, nil)
return true
}
respHasError, errJson := apimiddleware.HandleGrpcResponseError(endpoint.Err, grpcResponse, grpcResponseBody, w)
if errJson != nil {
apimiddleware.WriteError(w, errJson, nil)
return true
}
if respHasError {
return true
}
if errJson := apimiddleware.DeserializeGrpcResponseBodyIntoContainer(grpcResponseBody, config.responseJson); errJson != nil {
apimiddleware.WriteError(w, errJson, nil)
return true
}
respVersion, responseSsz, errJson := serializeMiddlewareResponseIntoSSZ(config.responseJson)
if errJson != nil {
apimiddleware.WriteError(w, errJson, nil)
return true
}
if errJson := writeSSZResponseHeaderAndBody(grpcResponse, w, responseSsz, respVersion, config.fileName); errJson != nil {
apimiddleware.WriteError(w, errJson, nil)
return true
}
if errJson := apimiddleware.Cleanup(grpcResponse.Body); errJson != nil {
apimiddleware.WriteError(w, errJson, nil)
return true
}

return true
}

func prepareSSZRequestForProxying(m *apimiddleware.ApiProxyMiddleware, endpoint apimiddleware.Endpoint, req *http.Request) apimiddleware.ErrorJson {
req.URL.Scheme = "http"
req.URL.Host = m.GatewayAddress
req.RequestURI = ""
if errJson := apimiddleware.HandleURLParameters(endpoint.Path, req, endpoint.RequestURLLiterals); errJson != nil {
return errJson
}
if errJson := apimiddleware.HandleQueryParameters(req, endpoint.RequestQueryParams); errJson != nil {
return errJson
}
// We have to add new segments after handling parameters because it changes URL segment indexing.
req.URL.Path = "/internal" + req.URL.Path + "/ssz"
return nil
}

func preparePostedSSZData(req *http.Request) apimiddleware.ErrorJson {
buf, err := io.ReadAll(req.Body)
if err != nil {
return apimiddleware.InternalServerErrorWithMessage(err, "could not read body")
}
j := SszRequestJson{Data: base64.StdEncoding.EncodeToString(buf)}
data, err := json.Marshal(j)
if err != nil {
return apimiddleware.InternalServerErrorWithMessage(err, "could not prepare POST data")
}
req.Body = io.NopCloser(bytes.NewBuffer(data))
req.ContentLength = int64(len(data))
req.Header.Set("Content-Type", api.JsonMediaType)
return nil
}

func serializeMiddlewareResponseIntoSSZ(respJson SszResponse) (version string, ssz []byte, errJson apimiddleware.ErrorJson) {
// Serialize the SSZ part of the deserialized value.
data, err := base64.StdEncoding.DecodeString(respJson.SSZData())
if err != nil {
return "", nil, apimiddleware.InternalServerErrorWithMessage(err, "could not decode response body into base64")
}
return strings.ToLower(respJson.SSZVersion()), data, nil
}

func writeSSZResponseHeaderAndBody(grpcResp *http.Response, w http.ResponseWriter, respSsz []byte, respVersion, fileName string) apimiddleware.ErrorJson {
var statusCodeHeader string
for h, vs := range grpcResp.Header {
// We don't want to expose any gRPC metadata in the HTTP response, so we skip forwarding metadata headers.
if strings.HasPrefix(h, "Grpc-Metadata") {
if h == "Grpc-Metadata-"+grpc.HttpCodeMetadataKey {
statusCodeHeader = vs[0]
}
} else {
for _, v := range vs {
w.Header().Set(h, v)
}
}
}
w.Header().Set("Content-Length", strconv.Itoa(len(respSsz)))
w.Header().Set("Content-Type", api.OctetStreamMediaType)
w.Header().Set("Content-Disposition", "attachment; filename="+fileName)
w.Header().Set(api.VersionHeader, respVersion)
if statusCodeHeader != "" {
code, err := strconv.Atoi(statusCodeHeader)
if err != nil {
return apimiddleware.InternalServerErrorWithMessage(err, "could not parse status code")
}
w.WriteHeader(code)
} else {
w.WriteHeader(grpcResp.StatusCode)
}
if _, err := io.Copy(w, io.NopCloser(bytes.NewReader(respSsz))); err != nil {
return apimiddleware.InternalServerErrorWithMessage(err, "could not write response message")
}
return nil
}

func handleEvents(m *apimiddleware.ApiProxyMiddleware, _ apimiddleware.Endpoint, w http.ResponseWriter, req *http.Request) (handled bool) {
sseClient := sse.NewClient("http://" + m.GatewayAddress + "/internal" + req.URL.RequestURI())
sseClient.Headers["Grpc-Timeout"] = "0S"
Expand Down
149 changes: 0 additions & 149 deletions beacon-chain/rpc/apimiddleware/custom_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,165 +4,16 @@ import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/api/gateway/apimiddleware"
"github.com/prysmaticlabs/prysm/v4/api/grpc"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/r3labs/sse/v2"
)

type testSSZResponseJson struct {
Version string `json:"version"`
ExecutionOptimistic bool `json:"execution_optimistic"`
Finalized bool `json:"finalized"`
Data string `json:"data"`
}

func (t testSSZResponseJson) SSZVersion() string {
return t.Version
}

func (t testSSZResponseJson) SSZOptimistic() bool {
return t.ExecutionOptimistic
}

func (t testSSZResponseJson) SSZData() string {
return t.Data
}

func (t testSSZResponseJson) SSZFinalized() bool {
return t.Finalized
}

func TestPrepareSSZRequestForProxying(t *testing.T) {
middleware := &apimiddleware.ApiProxyMiddleware{
GatewayAddress: "http://apimiddleware.example",
}
endpoint := apimiddleware.Endpoint{
Path: "http://foo.example",
}
var body bytes.Buffer
request := httptest.NewRequest("GET", "http://foo.example", &body)

errJson := prepareSSZRequestForProxying(middleware, endpoint, request)
require.Equal(t, true, errJson == nil)
assert.Equal(t, "/internal/ssz", request.URL.Path)
}

func TestPreparePostedSszData(t *testing.T) {
var body bytes.Buffer
body.Write([]byte("body"))
request := httptest.NewRequest("POST", "http://foo.example", &body)

preparePostedSSZData(request)
assert.Equal(t, int64(19), request.ContentLength)
assert.Equal(t, api.JsonMediaType, request.Header.Get("Content-Type"))
}

func TestSerializeMiddlewareResponseIntoSSZ(t *testing.T) {
t.Run("ok", func(t *testing.T) {
j := testSSZResponseJson{
Version: "Version",
Data: "Zm9v",
}
v, ssz, errJson := serializeMiddlewareResponseIntoSSZ(j)
require.Equal(t, true, errJson == nil)
assert.DeepEqual(t, []byte("foo"), ssz)
assert.Equal(t, "version", v)
})

t.Run("invalid_data", func(t *testing.T) {
j := testSSZResponseJson{
Version: "Version",
Data: "invalid",
}
_, _, errJson := serializeMiddlewareResponseIntoSSZ(j)
require.Equal(t, false, errJson == nil)
assert.Equal(t, true, strings.Contains(errJson.Msg(), "could not decode response body into base64"))
assert.Equal(t, http.StatusInternalServerError, errJson.StatusCode())
})
}

func TestWriteSSZResponseHeaderAndBody(t *testing.T) {
responseSsz := []byte("ssz")
version := "version"
fileName := "test.ssz"

t.Run("ok", func(t *testing.T) {
response := &http.Response{
Header: http.Header{
"Foo": []string{"foo"},
"Grpc-Metadata-" + grpc.HttpCodeMetadataKey: []string{"204"},
},
}

writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}

errJson := writeSSZResponseHeaderAndBody(response, writer, responseSsz, version, fileName)
require.Equal(t, true, errJson == nil)
v, ok := writer.Header()["Foo"]
require.Equal(t, true, ok, "header not found")
require.Equal(t, 1, len(v), "wrong number of header values")
assert.Equal(t, "foo", v[0])
v, ok = writer.Header()["Content-Length"]
require.Equal(t, true, ok, "header not found")
require.Equal(t, 1, len(v), "wrong number of header values")
assert.Equal(t, "3", v[0])
v, ok = writer.Header()["Content-Type"]
require.Equal(t, true, ok, "header not found")
require.Equal(t, 1, len(v), "wrong number of header values")
assert.Equal(t, api.OctetStreamMediaType, v[0])
v, ok = writer.Header()["Content-Disposition"]
require.Equal(t, true, ok, "header not found")
require.Equal(t, 1, len(v), "wrong number of header values")
assert.Equal(t, "attachment; filename=test.ssz", v[0])
v, ok = writer.Header()[api.VersionHeader]
require.Equal(t, true, ok, "header not found")
require.Equal(t, 1, len(v), "wrong number of header values")
assert.Equal(t, "version", v[0])
assert.Equal(t, 204, writer.Code)
})

t.Run("no_grpc_status_code_header", func(t *testing.T) {
response := &http.Response{
Header: http.Header{},
StatusCode: 204,
}
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}

errJson := writeSSZResponseHeaderAndBody(response, writer, responseSsz, version, fileName)
require.Equal(t, true, errJson == nil)
assert.Equal(t, 204, writer.Code)
})

t.Run("invalid_status_code", func(t *testing.T) {
response := &http.Response{
Header: http.Header{
"Foo": []string{"foo"},
"Grpc-Metadata-" + grpc.HttpCodeMetadataKey: []string{"invalid"},
},
}
responseSsz := []byte("ssz")
writer := httptest.NewRecorder()
writer.Body = &bytes.Buffer{}

errJson := writeSSZResponseHeaderAndBody(response, writer, responseSsz, version, fileName)
require.Equal(t, false, errJson == nil)
assert.Equal(t, true, strings.Contains(errJson.Msg(), "could not parse status code"))
assert.Equal(t, http.StatusInternalServerError, errJson.StatusCode())
})
}

func TestReceiveEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan *sse.Event)
Expand Down
Loading

0 comments on commit 1f250f7

Please sign in to comment.