Skip to content

Commit

Permalink
add metrics and tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
mkysel committed Aug 30, 2024
1 parent 308ef7a commit f88b6f2
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 18 deletions.
45 changes: 38 additions & 7 deletions cmd/replication/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"log"
"os"
"os/signal"
Expand All @@ -11,6 +13,7 @@ import (
"github.com/jessevdk/go-flags"
"github.com/xmtp/xmtpd/pkg/config"
"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/metrics"
"github.com/xmtp/xmtpd/pkg/registry"
"github.com/xmtp/xmtpd/pkg/server"
"github.com/xmtp/xmtpd/pkg/tracing"
Expand All @@ -31,11 +34,20 @@ func main() {
return
}

log, _, err := buildLogger(options)
logger, _, err := buildLogger(options)
if err != nil {
fatal("Could not build logger: %s", err)
}

if options.Tracing.Enable {
logger.Info("starting tracer")
tracing.Start(Commit, logger)
defer func() {
logger.Info("stopping tracer")
tracing.Stop()
}()
}

ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
Expand All @@ -49,17 +61,34 @@ func main() {
)

if err != nil {
log.Fatal("initializing database", zap.Error(err))
logger.Fatal("initializing database", zap.Error(err))
}

privateKey, err := utils.ParseEcdsaPrivateKey(options.SignerPrivateKey)
if err != nil {
log.Fatal("parsing private key", zap.Error(err))
}

var mtcs *metrics.Server
if options.Metrics.Enable {
promReg := prometheus.NewRegistry()
promReg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
promReg.MustRegister(collectors.NewGoCollector())

mtcs, err = metrics.NewMetricsServer(ctx,
options.Metrics.Address,
options.Metrics.Port,
logger,
promReg,
)
if err != nil {
logger.Fatal("initializing metrics server", zap.Error(err))
}
}

s, err := server.NewReplicationServer(
ctx,
log,
logger,
options,
// TODO:nm replace with real node registry
registry.NewFixedNodeRegistry(
Expand All @@ -74,10 +103,12 @@ func main() {
},
),
db,
mtcs,
)
if err != nil {
log.Fatal("initializing server", zap.Error(err))
}

s.WaitForShutdown()
doneC <- true
})
Expand All @@ -91,7 +122,7 @@ func main() {
)
select {
case sig := <-sigC:
log.Info("ending on signal", zap.String("signal", sig.String()))
logger.Info("ending on signal", zap.String("signal", sig.String()))
case <-doneC:
}
cancel()
Expand Down Expand Up @@ -126,12 +157,12 @@ func buildLogger(options config.ServerOptions) (*zap.Logger, *zap.Config, error)
EncodeCaller: zapcore.ShortCallerEncoder,
},
}
log, err := cfg.Build()
logger, err := cfg.Build()
if err != nil {
return nil, nil, err
}

log = log.Named("replication")
logger = logger.Named("replication")

return log, &cfg, nil
return logger, &cfg, nil
}
2 changes: 1 addition & 1 deletion dev/run
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ set -eu

. dev/local.env

go run cmd/replication/main.go
go run cmd/replication/main.go "$@"
30 changes: 21 additions & 9 deletions pkg/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,31 @@ type DbOptions struct {
WaitForDB time.Duration `long:"wait-for" env:"XMTPD_DB_WAIT_FOR" description:"wait for DB on start, up to specified duration"`
}

// MetricsOptions groups the command-line / environment settings that control
// the Prometheus metrics HTTP server.
type MetricsOptions struct {
	// Enable switches the metrics server on.
	Enable bool `long:"enable" env:"XMTPD_METRICS_ENABLE" description:"Enable the metrics server"`

	// Address is the interface the metrics server listens on.
	Address string `long:"metrics-address" env:"XMTPD_METRICS_METRICS_ADDRESS" description:"Listening address of the metrics server" default:"127.0.0.1"`

	// Port is the HTTP port the metrics server listens on.
	Port int `long:"metrics-port" env:"XMTPD_METRICS_METRICS_PORT" description:"Listening HTTP port of the metrics server" default:"8008"`
}

// PayerOptions groups the command-line / environment settings for the payer.
type PayerOptions struct {
	// PrivateKey is the key used to sign blockchain transactions.
	PrivateKey string `long:"private-key" env:"XMTPD_PAYER_PRIVATE_KEY" description:"Private key used to sign blockchain transactions"`
}

type ServerOptions struct {
LogLevel string `short:"l" long:"log-level" env:"XMTPD_LOG_LEVEL" description:"Define the logging level, supported strings are: DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL, and their lower-case forms." default:"INFO"`
//nolint:staticcheck
LogEncoding string ` long:"log-encoding" env:"XMTPD_LOG_ENCODING" description:"Log encoding format. Either console or json" default:"console" choice:"console"`

SignerPrivateKey string `long:"signer-private-key" env:"XMTPD_SIGNER_PRIVATE_KEY" description:"Private key used to sign messages"`
LogLevel string `short:"l" long:"log-level" env:"XMTPD_LOG_LEVEL" description:"Define the logging level, supported strings are: DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL, and their lower-case forms." default:"INFO"`
LogEncoding string ` long:"log-encoding" env:"XMTPD_LOG_ENCODING" description:"Log encoding format. Either console or json" default:"console" choice:"console"`
SignerPrivateKey string ` long:"signer-private-key" env:"XMTPD_SIGNER_PRIVATE_KEY" description:"Private key used to sign messages"`

API ApiOptions `group:"API Options" namespace:"api"`
DB DbOptions `group:"Database Options" namespace:"db"`
Contracts ContractsOptions `group:"Contracts Options" namespace:"contracts"`
Metrics MetricsOptions `group:"Metrics Options" namespace:"metrics"`
Payer PayerOptions `group:"Payer Options" namespace:"payer"`
Tracing TracingOptions `group:"DD APM Tracing Options" namespace:"tracing"`
}

API ApiOptions `group:"API Options" namespace:"api"`
DB DbOptions `group:"Database Options" namespace:"db"`
Contracts ContractsOptions `group:"Contracts Options" namespace:"contracts"`
Payer PayerOptions `group:"Payer Options" namespace:"payer"`
// TracingOptions groups the settings that control collection of DD APM traces
// and error tracking.
type TracingOptions struct {
	// Enable switches DD APM trace collection on.
	Enable bool `long:"enable" env:"XMTPD_TRACING_ENABLE" description:"Enable DD APM trace collection"`
}
67 changes: 67 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package metrics

import (
"context"
"fmt"
"github.com/xmtp/xmtpd/pkg/tracing"
"net"
"net/http"

"github.com/pires/go-proxyproto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)

// Server is the handle returned by NewMetricsServer. It owns the listener the
// metrics HTTP endpoint is served on.
type Server struct {
	// ctx is the context supplied to NewMetricsServer.
	ctx context.Context
	// log is the caller's logger, named "metrics".
	log *zap.Logger
	// http is the listener the metrics handler is served on; Close closes it.
	http net.Listener
}

// NewMetricsServer opens a TCP listener on address:port and serves the
// Prometheus handler for reg on it from a background goroutine.
//
// The raw listener is wrapped in a go-proxyproto Listener, this package's
// collectors are registered on reg, and the serving goroutine is run through
// tracing.PanicWrap. If the listener cannot be opened the error from
// net.Listen is returned and nothing is started. Call Close on the returned
// Server to stop accepting connections.
func NewMetricsServer(
	ctx context.Context,
	address string,
	port int,
	log *zap.Logger,
	reg *prometheus.Registry,
) (*Server, error) {
	addr := fmt.Sprintf("%s:%d", address, port)
	httpListener, err := net.Listen("tcp", addr)
	// Check the error before touching the listener: on failure it is nil and
	// must not be wrapped or stored.
	if err != nil {
		return nil, err
	}

	s := &Server{
		ctx:  ctx,
		log:  log.Named("metrics"),
		http: &proxyproto.Listener{Listener: httpListener},
	}

	registerCollectors(reg)
	srv := http.Server{
		Addr:    addr,
		Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}),
	}

	go tracing.PanicWrap(ctx, "metrics server", func(_ context.Context) {
		s.log.Info("serving metrics http", zap.String("address", s.http.Addr().String()))
		// Use a goroutine-local err; the enclosing function has already
		// returned by the time Serve finishes.
		if err := srv.Serve(s.http); err != nil {
			s.log.Error("serving http", zap.Error(err))
		}
	})

	return s, nil
}

// Close closes the listener the metrics endpoint is served on and returns the
// listener's close error, if any.
func (s *Server) Close() error {
	listener := s.http
	return listener.Close()
}

// registerCollectors registers this package's collectors on reg. The list is
// currently empty, so the call registers nothing. MustRegister panics if a
// registration fails.
func registerCollectors(reg prometheus.Registerer) {
	//TODO: add metrics here
	var collectors []prometheus.Collector
	reg.MustRegister(collectors...)
}
11 changes: 11 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"database/sql"
"github.com/xmtp/xmtpd/pkg/metrics"
"net"
"os"
"os/signal"
Expand All @@ -24,6 +25,7 @@ type ReplicationServer struct {
registrant *registrant.Registrant
nodeRegistry registry.NodeRegistry
options config.ServerOptions
metrics *metrics.Server
writerDB *sql.DB
// Can add reader DB later if needed
}
Expand All @@ -34,13 +36,15 @@ func NewReplicationServer(
options config.ServerOptions,
nodeRegistry registry.NodeRegistry,
writerDB *sql.DB,
metrics *metrics.Server,
) (*ReplicationServer, error) {
var err error
s := &ReplicationServer{
options: options,
log: log,
nodeRegistry: nodeRegistry,
writerDB: writerDB,
metrics: metrics,
}

s.registrant, err = registrant.NewRegistrant(
Expand Down Expand Up @@ -78,4 +82,11 @@ func (s *ReplicationServer) Shutdown() {
if s.apiServer != nil {
s.apiServer.Close()
}

// Close metrics server.
if s.metrics != nil {
if err := s.metrics.Close(); err != nil {
s.log.Error("stopping metrics", zap.Error(err))
}
}
}
2 changes: 1 addition & 1 deletion pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewTestServer(
API: config.ApiOptions{
Port: 0,
},
}, registry, db)
}, registry, db, nil)
require.NoError(t, err)

return server
Expand Down

0 comments on commit f88b6f2

Please sign in to comment.