From 6ef7538f21fd4eeed290ce8e0b74cbc50b50d80e Mon Sep 17 00:00:00 2001 From: Ian Shim <100327837+ian-shim@users.noreply.github.com> Date: Thu, 10 Oct 2024 11:03:04 -0700 Subject: [PATCH] [v2] Node v2 gRPC Entrypoint (#797) --- node/cmd/main.go | 9 +-- node/grpc/run.go | 81 +++++++++++++++++++++++ node/grpc/server.go | 84 ----------------------- node/grpc/server_v2.go | 62 +++++++++++++++++ node/grpc/server_v2_test.go | 128 ++++++++++++++++++++++++++++++++++++ test/integration_test.go | 21 +++--- 6 files changed, 289 insertions(+), 96 deletions(-) create mode 100644 node/grpc/run.go create mode 100644 node/grpc/server_v2.go create mode 100644 node/grpc/server_v2_test.go diff --git a/node/cmd/main.go b/node/cmd/main.go index 00b959213..0225b33d7 100644 --- a/node/cmd/main.go +++ b/node/cmd/main.go @@ -17,7 +17,7 @@ import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/node" "github.com/Layr-Labs/eigenda/node/flags" - "github.com/Layr-Labs/eigenda/node/grpc" + nodegrpc "github.com/Layr-Labs/eigenda/node/grpc" ) var ( @@ -85,8 +85,9 @@ func NodeMain(ctx *cli.Context) error { } // Creates the GRPC server. - server := grpc.NewServer(config, node, logger, ratelimiter) - server.Start() + server := nodegrpc.NewServer(config, node, logger, ratelimiter) + serverV2 := nodegrpc.NewServerV2(config, node, logger, ratelimiter) + err = nodegrpc.RunServers(server, serverV2, config, logger) - return nil + return err } diff --git a/node/grpc/run.go b/node/grpc/run.go new file mode 100644 index 000000000..5fea03d62 --- /dev/null +++ b/node/grpc/run.go @@ -0,0 +1,81 @@ +package grpc + +import ( + "errors" + "fmt" + "net" + + pb "github.com/Layr-Labs/eigenda/api/grpc/node" + pbv2 "github.com/Layr-Labs/eigenda/api/grpc/node/v2" + "github.com/Layr-Labs/eigenda/common/healthcheck" + "github.com/Layr-Labs/eigenda/node" + "github.com/Layr-Labs/eigensdk-go/logging" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +const localhost = "0.0.0.0" + +func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logger logging.Logger) error { + if serverV1 == nil { + return errors.New("node V1 server is not configured") + } + if serverV2 == nil { + return errors.New("node V2 server is not configured") + } + + go func() { + for { + addr := fmt.Sprintf("%s:%s", localhost, config.InternalDispersalPort) + listener, err := net.Listen("tcp", addr) + if err != nil { + logger.Fatalf("Could not start tcp listener: %v", err) + } + + opt := grpc.MaxRecvMsgSize(60 * 1024 * 1024 * 1024) // 60 GiB + gs := grpc.NewServer(opt) + + // Register reflection service on gRPC server + // This makes "grpcurl -plaintext localhost:9000 list" command work + reflection.Register(gs) + + pb.RegisterDispersalServer(gs, serverV1) + pbv2.RegisterDispersalServer(gs, serverV2) + + healthcheck.RegisterHealthServer("node.Dispersal", gs) + + logger.Info("port", config.InternalDispersalPort, "address", listener.Addr().String(), "GRPC Listening") + if err := gs.Serve(listener); err != nil { + logger.Error("dispersal server failed; restarting.", "err", err) + } + } + }() + + go func() { + for { + addr := fmt.Sprintf("%s:%s", localhost, config.InternalRetrievalPort) + listener, err := net.Listen("tcp", addr) + if err != nil { + logger.Fatalf("Could not start tcp listener: %v", err) + } + + opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB + gs := grpc.NewServer(opt) + + // Register reflection service on gRPC server + // This makes "grpcurl -plaintext localhost:9000 list" command work + reflection.Register(gs) + + pb.RegisterRetrievalServer(gs, serverV1) + pbv2.RegisterRetrievalServer(gs, serverV2) + healthcheck.RegisterHealthServer("node.Retrieval", gs) + + logger.Info("port", config.InternalRetrievalPort, "address", listener.Addr().String(), "GRPC Listening") + if err := gs.Serve(listener); err != nil { + logger.Error("retrieval server failed; restarting.", "err", err) + } + } + }() + + return nil +} diff --git a/node/grpc/server.go b/node/grpc/server.go index 71df57b91..0cba1b187 100644 --- a/node/grpc/server.go +++ b/node/grpc/server.go @@ -10,12 +10,9 @@ import ( "sync" "time" - "net" - "github.com/Layr-Labs/eigenda/api" pb "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/Layr-Labs/eigenda/common" - "github.com/Layr-Labs/eigenda/common/healthcheck" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/node" @@ -27,14 +24,10 @@ import ( _ "go.uber.org/automaxprocs" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/wrapperspb" ) -const localhost = "0.0.0.0" - // Server implements the Node proto APIs. type Server struct { pb.UnimplementedDispersalServer @@ -63,83 +56,6 @@ func NewServer(config *node.Config, node *node.Node, logger logging.Logger, rate } } -func (s *Server) Start() { - - // TODO: In order to facilitate integration testing with multiple nodes, we need to be able to set the port. - // TODO: Properly implement the health check. - // go func() { - // http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { - // w.WriteHeader(http.StatusOK) - // }) - // }() - - // TODO: Add monitoring - go func() { - for { - err := s.serveDispersal() - s.logger.Error("dispersal server failed; restarting.", "err", err) - } - }() - - go func() { - for { - err := s.serveRetrieval() - s.logger.Error("retrieval server failed; restarting.", "err", err) - } - }() -} - -func (s *Server) serveDispersal() error { - - addr := fmt.Sprintf("%s:%s", localhost, s.config.InternalDispersalPort) - listener, err := net.Listen("tcp", addr) - if err != nil { - s.logger.Fatalf("Could not start tcp listener: %v", err) - } - - opt := grpc.MaxRecvMsgSize(60 * 1024 * 1024 * 1024) // 60 GiB - gs := grpc.NewServer(opt) - - // Register reflection service on gRPC server - // This makes "grpcurl -plaintext localhost:9000 list" command work - reflection.Register(gs) - - pb.RegisterDispersalServer(gs, s) - healthcheck.RegisterHealthServer("node.Dispersal", gs) - - s.logger.Info("port", s.config.InternalDispersalPort, "address", listener.Addr().String(), "GRPC Listening") - if err := gs.Serve(listener); err != nil { - return err - } - return nil - -} - -func (s *Server) serveRetrieval() error { - addr := fmt.Sprintf("%s:%s", localhost, s.config.InternalRetrievalPort) - listener, err := net.Listen("tcp", addr) - if err != nil { - s.logger.Fatalf("Could not start tcp listener: %v", err) - } - - opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB - gs := grpc.NewServer(opt) - - // Register reflection service on gRPC server - // This makes "grpcurl -plaintext localhost:9000 list" command work - reflection.Register(gs) - - pb.RegisterRetrievalServer(gs, s) - healthcheck.RegisterHealthServer("node.Retrieval", gs) - - s.logger.Info("port", s.config.InternalRetrievalPort, "address", listener.Addr().String(), "GRPC Listening") - if err := gs.Serve(listener); err != nil { - return err - } - return nil - -} - func (s *Server) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.NodeInfoReply, error) { if s.config.DisableNodeInfoResources { return &pb.NodeInfoReply{Semver: node.SemVer}, nil diff --git a/node/grpc/server_v2.go b/node/grpc/server_v2.go new file mode 100644 index 000000000..ca690e294 --- /dev/null +++ b/node/grpc/server_v2.go @@ -0,0 +1,62 @@ +package grpc + +import ( + "context" + "runtime" + "sync" + + "github.com/Layr-Labs/eigenda/api" + pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2" + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/node" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/shirou/gopsutil/mem" + "google.golang.org/grpc/codes" +) + +// ServerV2 implements the Node v2 proto APIs. +type ServerV2 struct { + pb.UnimplementedDispersalServer + pb.UnimplementedRetrievalServer + + node *node.Node + config *node.Config + logger logging.Logger + + ratelimiter common.RateLimiter + + mu *sync.Mutex +} + +// NewServerV2 creates a new Server instance with the provided parameters. +func NewServerV2(config *node.Config, node *node.Node, logger logging.Logger, ratelimiter common.RateLimiter) *ServerV2 { + return &ServerV2{ + config: config, + logger: logger, + node: node, + ratelimiter: ratelimiter, + mu: &sync.Mutex{}, + } +} + +func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.NodeInfoReply, error) { + if s.config.DisableNodeInfoResources { + return &pb.NodeInfoReply{Semver: node.SemVer}, nil + } + + memBytes := uint64(0) + v, err := mem.VirtualMemory() + if err == nil { + memBytes = v.Total + } + + return &pb.NodeInfoReply{Semver: node.SemVer, Os: runtime.GOOS, Arch: runtime.GOARCH, NumCpu: uint32(runtime.GOMAXPROCS(0)), MemBytes: memBytes}, nil +} + +func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) { + return &pb.StoreChunksReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented") +} + +func (s *ServerV2) GetChunks(context.Context, *pb.GetChunksRequest) (*pb.GetChunksReply, error) { + return &pb.GetChunksReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented") +} diff --git a/node/grpc/server_v2_test.go b/node/grpc/server_v2_test.go new file mode 100644 index 000000000..d166624f2 --- /dev/null +++ b/node/grpc/server_v2_test.go @@ -0,0 +1,128 @@ +package grpc_test + +import ( + "context" + "fmt" + "os" + "testing" + + commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" + pbv2 "github.com/Layr-Labs/eigenda/api/grpc/node/v2" + "github.com/Layr-Labs/eigenda/common" + commonmock "github.com/Layr-Labs/eigenda/common/mock" + "github.com/Layr-Labs/eigenda/core" + coremock "github.com/Layr-Labs/eigenda/core/mock" + "github.com/Layr-Labs/eigenda/node" + "github.com/Layr-Labs/eigenda/node/grpc" + "github.com/Layr-Labs/eigensdk-go/metrics" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func newTestServerV2(t *testing.T, mockValidator bool) *grpc.ServerV2 { + return newTestServerV2WithConfig(t, mockValidator, makeConfig(t)) +} + +func newTestServerV2WithConfig(t *testing.T, mockValidator bool, config *node.Config) *grpc.ServerV2 { + var err error + keyPair, err = core.GenRandomBlsKeys() + if err != nil { + panic("failed to create a BLS Key") + } + opID = [32]byte{} + copy(opID[:], []byte(fmt.Sprintf("%d", 3))) + loggerConfig := common.DefaultLoggerConfig() + logger, err := common.NewLogger(loggerConfig) + if err != nil { + panic("failed to create a logger") + } + + err = os.MkdirAll(config.DbPath, os.ModePerm) + if err != nil { + panic("failed to create a directory for db") + } + noopMetrics := metrics.NewNoopMetrics() + reg := prometheus.NewRegistry() + tx := &coremock.MockTransactor{} + + ratelimiter := &commonmock.NoopRatelimiter{} + + var val core.ShardValidator + + if mockValidator { + mockVal := coremock.NewMockShardValidator() + mockVal.On("ValidateBlobs", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mockVal.On("ValidateBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil) + val = mockVal + } else { + + _, v, err := makeTestComponents() + if err != nil { + panic("failed to create test encoder") + } + + asn := &core.StdAssignmentCoordinator{} + + cst, err := coremock.MakeChainDataMock(map[uint8]int{ + 0: 10, + 1: 10, + 2: 10, + }) + if err != nil { + panic("failed to create test encoder") + } + + val = core.NewShardValidator(v, asn, cst, opID) + } + + metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", opID, -1, tx, chainState) + store, err := node.NewLevelDBStore(config.DbPath, logger, metrics, 1e9, 1e9) + if err != nil { + panic("failed to create a new levelDB store") + } + + node := &node.Node{ + Config: config, + Logger: logger, + KeyPair: keyPair, + Metrics: metrics, + Store: store, + ChainState: chainState, + Validator: val, + } + return grpc.NewServerV2(config, node, logger, ratelimiter) +} + +func TestV2NodeInfoRequest(t *testing.T) { + server := newTestServerV2(t, true) + resp, err := server.NodeInfo(context.Background(), &pbv2.NodeInfoRequest{}) + assert.True(t, resp.Semver == "0.0.0") + assert.True(t, err == nil) +} + +func TestV2StoreChunks(t *testing.T) { + server := newTestServerV2(t, true) + _, err := server.StoreChunks(context.Background(), &pbv2.StoreChunksRequest{ + BlobCertificates: []*commonpb.BlobCertificate{}, + }) + assert.ErrorContains(t, err, "not implemented") +} + +func TestV2GetChunks(t *testing.T) { + server := newTestServerV2(t, true) + + _, err := server.GetChunks(context.Background(), &pbv2.GetChunksRequest{ + BlobKey: []byte{0}, + }) + assert.ErrorContains(t, err, "not implemented") +} + +func GetV2BlobCertificate(t *testing.T) { + server := newTestServerV2(t, true) + + _, err := server.GetBlobCertificate(context.Background(), &pbv2.GetBlobCertificateRequest{ + BlobKey: []byte{0}, + }) + assert.ErrorContains(t, err, "not implemented") +} diff --git a/test/integration_test.go b/test/integration_test.go index 6ce9f8bf6..693f75e49 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -43,6 +43,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser/common/inmem" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/node" + "github.com/Layr-Labs/eigenda/node/grpc" nodegrpc "github.com/Layr-Labs/eigenda/node/grpc" nodepb "github.com/Layr-Labs/eigenda/api/grpc/node" @@ -206,8 +207,9 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser } type TestOperator struct { - Node *node.Node - Server *nodegrpc.Server + Node *node.Node + ServerV1 *nodegrpc.Server + ServerV2 *nodegrpc.ServerV2 } func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging.Logger) map[core.OperatorID]TestOperator { @@ -315,11 +317,13 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging ratelimiter := &commonmock.NoopRatelimiter{} - s := nodegrpc.NewServer(config, n, logger, ratelimiter) + serverV1 := nodegrpc.NewServer(config, n, logger, ratelimiter) + serverV2 := nodegrpc.NewServerV2(config, n, logger, ratelimiter) ops[id] = TestOperator{ - Node: n, - Server: s, + Node: n, + ServerV1: serverV1, + ServerV2: serverV2, } } @@ -389,7 +393,8 @@ func TestDispersalAndRetrieval(t *testing.T) { assert.NoError(t, err) fmt.Println("Starting server") - go op.Server.Start() + err = grpc.RunServers(op.ServerV1, op.ServerV2, op.Node.Config, logger) + assert.NoError(t, err) } blob := mustMakeTestBlob() @@ -513,7 +518,7 @@ func TestDispersalAndRetrieval(t *testing.T) { fmt.Println("Processing operator: ", hexutil.Encode(op.Node.Config.ID[:])) // check that blob headers can be retrieved from operators - headerReply, err := op.Server.GetBlobHeader(ctx, &nodepb.GetBlobHeaderRequest{ + headerReply, err := op.ServerV1.GetBlobHeader(ctx, &nodepb.GetBlobHeaderRequest{ BatchHeaderHash: batchHeaderHash, BlobIndex: metadata.ConfirmationInfo.BlobIndex, QuorumId: uint32(0), @@ -549,7 +554,7 @@ func TestDispersalAndRetrieval(t *testing.T) { } // check that chunks can be retrieved from operators - chunksReply, err := op.Server.RetrieveChunks(ctx, &nodepb.RetrieveChunksRequest{ + chunksReply, err := op.ServerV1.RetrieveChunks(ctx, &nodepb.RetrieveChunksRequest{ BatchHeaderHash: batchHeaderHash, BlobIndex: metadata.ConfirmationInfo.BlobIndex, QuorumId: uint32(0),