Skip to content

Commit

Permalink
Integrate pprof to encoder and node (#935)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Nov 26, 2024
1 parent f6732a5 commit b3a90d1
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 12 deletions.
2 changes: 2 additions & 0 deletions disperser/cmd/encoder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func NewConfig(ctx *cli.Context) (Config, error) {
RequestPoolSize: ctx.GlobalInt(flags.RequestPoolSizeFlag.Name),
EnableGnarkChunkEncoding: ctx.Bool(flags.EnableGnarkChunkEncodingFlag.Name),
PreventReencoding: ctx.Bool(flags.PreventReencodingFlag.Name),
PprofHttpPort: ctx.GlobalString(flags.PprofHttpPort.Name),
EnablePprof: ctx.GlobalBool(flags.EnablePprof.Name),
},
MetricsConfig: encoder.MetrisConfig{
HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name),
Expand Down
15 changes: 15 additions & 0 deletions disperser/cmd/encoder/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ var (
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "PREVENT_REENCODING"),
}
PprofHttpPort = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "pprof-http-port"),
Usage: "the http port which the pprof server is listening",
Required: false,
Value: "6060",
EnvVar: common.PrefixEnvVar(envVarPrefix, "PPROF_HTTP_PORT"),
}
EnablePprof = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "enable-pprof"),
Usage: "start prrof server",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_PPROF"),
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -88,6 +101,8 @@ var optionalFlags = []cli.Flag{
EncoderVersionFlag,
S3BucketNameFlag,
PreventReencodingFlag,
PprofHttpPort,
EnablePprof,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
2 changes: 2 additions & 0 deletions disperser/encoder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ type ServerConfig struct {
RequestPoolSize int
EnableGnarkChunkEncoding bool
PreventReencoding bool
PprofHttpPort string
EnablePprof bool
}
29 changes: 17 additions & 12 deletions disperser/encoder/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,26 @@ import (
"time"

"github.com/Layr-Labs/eigenda/common/healthcheck"
commonpprof "github.com/Layr-Labs/eigenda/common/pprof"
"github.com/Layr-Labs/eigenda/disperser"
pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/Layr-Labs/eigenda/disperser/common"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

type EncoderServer struct {
pb.UnimplementedEncoderServer

config ServerConfig
logger logging.Logger
prover encoding.Prover
metrics *Metrics
config ServerConfig
logger logging.Logger
prover encoding.Prover
metrics *Metrics
grpcMetrics *grpcprom.ServerMetrics
close func()
close func()

runningRequests chan struct{}
requestPool chan blobRequest
Expand All @@ -46,10 +47,10 @@ func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encodin
metrics.SetQueueCapacity(config.RequestPoolSize)

return &EncoderServer{
config: config,
logger: logger.With("component", "EncoderServer"),
prover: prover,
metrics: metrics,
config: config,
logger: logger.With("component", "EncoderServer"),
prover: prover,
metrics: metrics,
grpcMetrics: grpcMetrics,

runningRequests: make(chan struct{}, config.MaxConcurrentRequests),
Expand All @@ -59,6 +60,12 @@ func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encodin
}

func (s *EncoderServer) Start() error {
pprofProfiler := commonpprof.NewPprofProfiler(s.config.PprofHttpPort, s.logger)
if s.config.EnablePprof {
go pprofProfiler.Start()
s.logger.Info("Enabled pprof for encoder server", "port", s.config.PprofHttpPort)
}

// Serve grpc requests
addr := fmt.Sprintf("%s:%s", disperser.Localhost, s.config.GrpcPort)
listener, err := net.Listen("tcp", addr)
Expand Down Expand Up @@ -104,8 +111,6 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques
blobSize := len(req.GetData())
sizeBucket := common.BlobSizeBucket(blobSize)



select {
case s.requestPool <- blobRequest{blobSizeByte: blobSize}:
s.queueLock.Lock()
Expand Down
5 changes: 5 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type Config struct {

EnableV2 bool
OnchainStateRefreshInterval time.Duration

PprofHttpPort string
EnablePprof bool
}

// NewConfig parses the Config from the provided flags or environment variables and
Expand Down Expand Up @@ -237,5 +240,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
BLSRemoteSignerEnabled: blsRemoteSignerEnabled,
EnableV2: ctx.GlobalBool(flags.EnableV2Flag.Name),
OnchainStateRefreshInterval: ctx.GlobalDuration(flags.OnchainStateRefreshIntervalFlag.Name),
PprofHttpPort: ctx.GlobalString(flags.PprofHttpPort.Name),
EnablePprof: ctx.GlobalBool(flags.EnablePprof.Name),
}, nil
}
15 changes: 15 additions & 0 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,19 @@ var (
Required: false,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "BLS_SIGNER_CERT_FILE"),
}
PprofHttpPort = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "pprof-http-port"),
Usage: "the http port which the pprof server is listening",
Required: false,
Value: "6060",
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "PPROF_HTTP_PORT"),
}
EnablePprof = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "enable-pprof"),
Usage: "start prrof server",
Required: false,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_PPROF"),
}
)

var requiredFlags = []cli.Flag{
Expand Down Expand Up @@ -361,6 +374,8 @@ var optionalFlags = []cli.Flag{
BLSSignerCertFileFlag,
EnableV2Flag,
OnchainStateRefreshIntervalFlag,
PprofHttpPort,
EnablePprof,
}

func init() {
Expand Down
6 changes: 6 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/Layr-Labs/eigenda/common/kvstore/tablestore"
"github.com/Layr-Labs/eigenda/common/pprof"
"github.com/Layr-Labs/eigenda/common/pubip"
"github.com/Layr-Labs/eigenda/encoding/kzg/verifier"

Expand Down Expand Up @@ -280,6 +281,11 @@ func NewNode(
// Start starts the Node. If the node is not registered, register it on chain, otherwise just
// update its socket on chain.
func (n *Node) Start(ctx context.Context) error {
pprofProfiler := pprof.NewPprofProfiler(n.Config.PprofHttpPort, n.Logger)
if n.Config.EnablePprof {
go pprofProfiler.Start()
n.Logger.Info("Enabled pprof for Node", "port", n.Config.PprofHttpPort)
}
if n.Config.EnableMetrics {
n.Metrics.Start()
n.Logger.Info("Enabled metrics", "socket", n.Metrics.socketAddr)
Expand Down

0 comments on commit b3a90d1

Please sign in to comment.