From b3a90d1e716cba8a0bcddaa6d62ce2b157a08af1 Mon Sep 17 00:00:00 2001 From: Jian Xiao <99709935+jianoaix@users.noreply.github.com> Date: Tue, 26 Nov 2024 15:58:50 -0800 Subject: [PATCH] Integrate pprof to encoder and node (#935) --- disperser/cmd/encoder/config.go | 2 ++ disperser/cmd/encoder/flags/flags.go | 15 ++++++++++++++ disperser/encoder/config.go | 2 ++ disperser/encoder/server.go | 29 ++++++++++++++++------------ node/config.go | 5 +++++ node/flags/flags.go | 15 ++++++++++++++ node/node.go | 6 ++++++ 7 files changed, 62 insertions(+), 12 deletions(-) diff --git a/disperser/cmd/encoder/config.go b/disperser/cmd/encoder/config.go index 69c0ff640..4003adcf9 100644 --- a/disperser/cmd/encoder/config.go +++ b/disperser/cmd/encoder/config.go @@ -58,6 +58,8 @@ func NewConfig(ctx *cli.Context) (Config, error) { RequestPoolSize: ctx.GlobalInt(flags.RequestPoolSizeFlag.Name), EnableGnarkChunkEncoding: ctx.Bool(flags.EnableGnarkChunkEncodingFlag.Name), PreventReencoding: ctx.Bool(flags.PreventReencodingFlag.Name), + PprofHttpPort: ctx.GlobalString(flags.PprofHttpPort.Name), + EnablePprof: ctx.GlobalBool(flags.EnablePprof.Name), }, MetricsConfig: encoder.MetrisConfig{ HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name), diff --git a/disperser/cmd/encoder/flags/flags.go b/disperser/cmd/encoder/flags/flags.go index d14b45e04..8c9399a39 100644 --- a/disperser/cmd/encoder/flags/flags.go +++ b/disperser/cmd/encoder/flags/flags.go @@ -73,6 +73,19 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(envVarPrefix, "PREVENT_REENCODING"), } + PprofHttpPort = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "pprof-http-port"), + Usage: "the http port which the pprof server is listening", + Required: false, + Value: "6060", + EnvVar: common.PrefixEnvVar(envVarPrefix, "PPROF_HTTP_PORT"), + } + EnablePprof = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "enable-pprof"), + Usage: "start prrof server", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_PPROF"), + } ) var requiredFlags = []cli.Flag{ @@ -88,6 +101,8 @@ var optionalFlags = []cli.Flag{ EncoderVersionFlag, S3BucketNameFlag, PreventReencodingFlag, + PprofHttpPort, + EnablePprof, } // Flags contains the list of configuration options available to the binary. diff --git a/disperser/encoder/config.go b/disperser/encoder/config.go index 8fcba36cd..b543efe7b 100644 --- a/disperser/encoder/config.go +++ b/disperser/encoder/config.go @@ -10,4 +10,6 @@ type ServerConfig struct { RequestPoolSize int EnableGnarkChunkEncoding bool PreventReencoding bool + PprofHttpPort string + EnablePprof bool } diff --git a/disperser/encoder/server.go b/disperser/encoder/server.go index 4eb0c39f6..f7f06e682 100644 --- a/disperser/encoder/server.go +++ b/disperser/encoder/server.go @@ -10,12 +10,13 @@ import ( "time" "github.com/Layr-Labs/eigenda/common/healthcheck" + commonpprof "github.com/Layr-Labs/eigenda/common/pprof" "github.com/Layr-Labs/eigenda/disperser" pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder" - grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/Layr-Labs/eigenda/disperser/common" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ) @@ -23,12 +24,12 @@ import ( type EncoderServer struct { pb.UnimplementedEncoderServer - config ServerConfig - logger logging.Logger - prover encoding.Prover - metrics *Metrics + config ServerConfig + logger logging.Logger + prover encoding.Prover + metrics *Metrics grpcMetrics *grpcprom.ServerMetrics - close func() + close func() runningRequests chan struct{} requestPool chan blobRequest @@ -46,10 +47,10 @@ func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encodin metrics.SetQueueCapacity(config.RequestPoolSize) return &EncoderServer{ - config: config, - logger: logger.With("component", "EncoderServer"), - prover: prover, - metrics: metrics, + config: config, + logger: logger.With("component", "EncoderServer"), + prover: prover, + metrics: metrics, grpcMetrics: grpcMetrics, runningRequests: make(chan struct{}, config.MaxConcurrentRequests), @@ -59,6 +60,12 @@ func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encodin } func (s *EncoderServer) Start() error { + pprofProfiler := commonpprof.NewPprofProfiler(s.config.PprofHttpPort, s.logger) + if s.config.EnablePprof { + go pprofProfiler.Start() + s.logger.Info("Enabled pprof for encoder server", "port", s.config.PprofHttpPort) + } + // Serve grpc requests addr := fmt.Sprintf("%s:%s", disperser.Localhost, s.config.GrpcPort) listener, err := net.Listen("tcp", addr) @@ -104,8 +111,6 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques blobSize := len(req.GetData()) sizeBucket := common.BlobSizeBucket(blobSize) - - select { case s.requestPool <- blobRequest{blobSizeByte: blobSize}: s.queueLock.Lock() diff --git a/node/config.go b/node/config.go index 7d73bdcb2..c5244e55b 100644 --- a/node/config.go +++ b/node/config.go @@ -91,6 +91,9 @@ type Config struct { EnableV2 bool OnchainStateRefreshInterval time.Duration + + PprofHttpPort string + EnablePprof bool } // NewConfig parses the Config from the provided flags or environment variables and @@ -237,5 +240,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { BLSRemoteSignerEnabled: blsRemoteSignerEnabled, EnableV2: ctx.GlobalBool(flags.EnableV2Flag.Name), OnchainStateRefreshInterval: ctx.GlobalDuration(flags.OnchainStateRefreshIntervalFlag.Name), + PprofHttpPort: ctx.GlobalString(flags.PprofHttpPort.Name), + EnablePprof: ctx.GlobalBool(flags.EnablePprof.Name), }, nil } diff --git a/node/flags/flags.go b/node/flags/flags.go index b16e2709e..40c1237a7 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -313,6 +313,19 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(EnvVarPrefix, "BLS_SIGNER_CERT_FILE"), } + PprofHttpPort = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "pprof-http-port"), + Usage: "the http port which the pprof server is listening", + Required: false, + Value: "6060", + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "PPROF_HTTP_PORT"), + } + EnablePprof = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "enable-pprof"), + Usage: "start prrof server", + Required: false, + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_PPROF"), + } ) var requiredFlags = []cli.Flag{ @@ -361,6 +374,8 @@ var optionalFlags = []cli.Flag{ BLSSignerCertFileFlag, EnableV2Flag, OnchainStateRefreshIntervalFlag, + PprofHttpPort, + EnablePprof, } func init() { diff --git a/node/node.go b/node/node.go index 550228e9c..07055e55c 100644 --- a/node/node.go +++ b/node/node.go @@ -18,6 +18,7 @@ import ( "time" "github.com/Layr-Labs/eigenda/common/kvstore/tablestore" + "github.com/Layr-Labs/eigenda/common/pprof" "github.com/Layr-Labs/eigenda/common/pubip" "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" @@ -280,6 +281,11 @@ func NewNode( // Start starts the Node. If the node is not registered, register it on chain, otherwise just // update its socket on chain. func (n *Node) Start(ctx context.Context) error { + pprofProfiler := pprof.NewPprofProfiler(n.Config.PprofHttpPort, n.Logger) + if n.Config.EnablePprof { + go pprofProfiler.Start() + n.Logger.Info("Enabled pprof for Node", "port", n.Config.PprofHttpPort) + } if n.Config.EnableMetrics { n.Metrics.Start() n.Logger.Info("Enabled metrics", "socket", n.Metrics.socketAddr)