Skip to content

Commit

Permalink
[v2] Node v2 gRPC Entrypoint (Layr-Labs#797)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Oct 10, 2024
1 parent 9910b72 commit 6ef7538
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 96 deletions.
9 changes: 5 additions & 4 deletions node/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigenda/node/flags"
"github.com/Layr-Labs/eigenda/node/grpc"
nodegrpc "github.com/Layr-Labs/eigenda/node/grpc"
)

var (
Expand Down Expand Up @@ -85,8 +85,9 @@ func NodeMain(ctx *cli.Context) error {
}

// Creates the GRPC server.
server := grpc.NewServer(config, node, logger, ratelimiter)
server.Start()
server := nodegrpc.NewServer(config, node, logger, ratelimiter)
serverV2 := nodegrpc.NewServerV2(config, node, logger, ratelimiter)
err = nodegrpc.RunServers(server, serverV2, config, logger)

return nil
return err
}
81 changes: 81 additions & 0 deletions node/grpc/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package grpc

import (
"errors"
"fmt"
"net"

pb "github.com/Layr-Labs/eigenda/api/grpc/node"
pbv2 "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigensdk-go/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

const localhost = "0.0.0.0"

func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logger logging.Logger) error {
if serverV1 == nil {
return errors.New("node V1 server is not configured")
}
if serverV2 == nil {
return errors.New("node V2 server is not configured")
}

go func() {
for {
addr := fmt.Sprintf("%s:%s", localhost, config.InternalDispersalPort)
listener, err := net.Listen("tcp", addr)
if err != nil {
logger.Fatalf("Could not start tcp listener: %v", err)
}

opt := grpc.MaxRecvMsgSize(60 * 1024 * 1024 * 1024) // 60 GiB
gs := grpc.NewServer(opt)

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
reflection.Register(gs)

pb.RegisterDispersalServer(gs, serverV1)
pbv2.RegisterDispersalServer(gs, serverV2)

healthcheck.RegisterHealthServer("node.Dispersal", gs)

logger.Info("port", config.InternalDispersalPort, "address", listener.Addr().String(), "GRPC Listening")
if err := gs.Serve(listener); err != nil {
logger.Error("dispersal server failed; restarting.", "err", err)
}
}
}()

go func() {
for {
addr := fmt.Sprintf("%s:%s", localhost, config.InternalRetrievalPort)
listener, err := net.Listen("tcp", addr)
if err != nil {
logger.Fatalf("Could not start tcp listener: %v", err)
}

opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB
gs := grpc.NewServer(opt)

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
reflection.Register(gs)

pb.RegisterRetrievalServer(gs, serverV1)
pbv2.RegisterRetrievalServer(gs, serverV2)
healthcheck.RegisterHealthServer("node.Retrieval", gs)

logger.Info("port", config.InternalRetrievalPort, "address", listener.Addr().String(), "GRPC Listening")
if err := gs.Serve(listener); err != nil {
logger.Error("retrieval server failed; restarting.", "err", err)
}
}
}()

return nil
}
84 changes: 0 additions & 84 deletions node/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ import (
"sync"
"time"

"net"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/node"
Expand All @@ -27,14 +24,10 @@ import (

_ "go.uber.org/automaxprocs"

"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"
)

const localhost = "0.0.0.0"

// Server implements the Node proto APIs.
type Server struct {
pb.UnimplementedDispersalServer
Expand Down Expand Up @@ -63,83 +56,6 @@ func NewServer(config *node.Config, node *node.Node, logger logging.Logger, rate
}
}

func (s *Server) Start() {

// TODO: In order to facilitate integration testing with multiple nodes, we need to be able to set the port.
// TODO: Properly implement the health check.
// go func() {
// http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
// w.WriteHeader(http.StatusOK)
// })
// }()

// TODO: Add monitoring
go func() {
for {
err := s.serveDispersal()
s.logger.Error("dispersal server failed; restarting.", "err", err)
}
}()

go func() {
for {
err := s.serveRetrieval()
s.logger.Error("retrieval server failed; restarting.", "err", err)
}
}()
}

func (s *Server) serveDispersal() error {

addr := fmt.Sprintf("%s:%s", localhost, s.config.InternalDispersalPort)
listener, err := net.Listen("tcp", addr)
if err != nil {
s.logger.Fatalf("Could not start tcp listener: %v", err)
}

opt := grpc.MaxRecvMsgSize(60 * 1024 * 1024 * 1024) // 60 GiB
gs := grpc.NewServer(opt)

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
reflection.Register(gs)

pb.RegisterDispersalServer(gs, s)
healthcheck.RegisterHealthServer("node.Dispersal", gs)

s.logger.Info("port", s.config.InternalDispersalPort, "address", listener.Addr().String(), "GRPC Listening")
if err := gs.Serve(listener); err != nil {
return err
}
return nil

}

func (s *Server) serveRetrieval() error {
addr := fmt.Sprintf("%s:%s", localhost, s.config.InternalRetrievalPort)
listener, err := net.Listen("tcp", addr)
if err != nil {
s.logger.Fatalf("Could not start tcp listener: %v", err)
}

opt := grpc.MaxRecvMsgSize(1024 * 1024 * 300) // 300 MiB
gs := grpc.NewServer(opt)

// Register reflection service on gRPC server
// This makes "grpcurl -plaintext localhost:9000 list" command work
reflection.Register(gs)

pb.RegisterRetrievalServer(gs, s)
healthcheck.RegisterHealthServer("node.Retrieval", gs)

s.logger.Info("port", s.config.InternalRetrievalPort, "address", listener.Addr().String(), "GRPC Listening")
if err := gs.Serve(listener); err != nil {
return err
}
return nil

}

func (s *Server) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.NodeInfoReply, error) {
if s.config.DisableNodeInfoResources {
return &pb.NodeInfoReply{Semver: node.SemVer}, nil
Expand Down
62 changes: 62 additions & 0 deletions node/grpc/server_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package grpc

import (
"context"
"runtime"
"sync"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/shirou/gopsutil/mem"
"google.golang.org/grpc/codes"
)

// ServerV2 implements the Node v2 proto APIs.
type ServerV2 struct {
pb.UnimplementedDispersalServer
pb.UnimplementedRetrievalServer

node *node.Node
config *node.Config
logger logging.Logger

ratelimiter common.RateLimiter

mu *sync.Mutex
}

// NewServerV2 creates a new Server instance with the provided parameters.
func NewServerV2(config *node.Config, node *node.Node, logger logging.Logger, ratelimiter common.RateLimiter) *ServerV2 {
return &ServerV2{
config: config,
logger: logger,
node: node,
ratelimiter: ratelimiter,
mu: &sync.Mutex{},
}
}

func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.NodeInfoReply, error) {
if s.config.DisableNodeInfoResources {
return &pb.NodeInfoReply{Semver: node.SemVer}, nil
}

memBytes := uint64(0)
v, err := mem.VirtualMemory()
if err == nil {
memBytes = v.Total
}

return &pb.NodeInfoReply{Semver: node.SemVer, Os: runtime.GOOS, Arch: runtime.GOARCH, NumCpu: uint32(runtime.GOMAXPROCS(0)), MemBytes: memBytes}, nil
}

func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) {
return &pb.StoreChunksReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented")
}

func (s *ServerV2) GetChunks(context.Context, *pb.GetChunksRequest) (*pb.GetChunksReply, error) {
return &pb.GetChunksReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented")
}
Loading

0 comments on commit 6ef7538

Please sign in to comment.