Skip to content

Commit

Permalink
move types from core to support Solana plugin command extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Oct 9, 2023
1 parent 9d58ef1 commit 1809045
Show file tree
Hide file tree
Showing 7 changed files with 590 additions and 2 deletions.
35 changes: 35 additions & 0 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package loop

import (
"fmt"
"os"
"strconv"
)

var EnvPromPort = "CL_PROMETHEUS_PORT"

// EnvConfig is the configuration interface between the application and the LOOP executable. The values
// are fully resolved and static and passed via the environment.
type EnvConfig interface {
PrometheusPort() int
}

// getEnvConfig deserializes LOOP-specific environment variables to an EnvConfig
func getEnvConfig() (EnvConfig, error) {
promPortStr := os.Getenv(EnvPromPort)
promPort, err := strconv.Atoi(promPortStr)
if err != nil {
return nil, fmt.Errorf("failed to parse %s = %q: %w", EnvPromPort, promPortStr, err)
}

return &envConfig{prometheusPort: promPort}, nil
}

// envConfig is an implementation of EnvConfig.
type envConfig struct {
prometheusPort int
}

func (e *envConfig) PrometheusPort() int {
return e.prometheusPort
}
42 changes: 42 additions & 0 deletions pkg/loop/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package loop

import (
"sync"

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/services"
)

// Plugin is a base layer for plugins to easily manage sub-[types.Service]s.
// Useful for implementing PluginRelayer and PluginMedian.
type Plugin struct {
Logger logger.Logger

mu sync.RWMutex
ss []services.Service
}

func (p *Plugin) Ready() error { return nil }
func (p *Plugin) Name() string { return p.Logger.Name() }

func (p *Plugin) SubService(s services.Service) {
p.mu.Lock()
p.ss = append(p.ss, s)
p.mu.Unlock()
}

func (p *Plugin) HealthReport() map[string]error {
hr := map[string]error{p.Name(): nil}
p.mu.RLock()
defer p.mu.RUnlock()
for _, s := range p.ss {
services.CopyHealth(hr, s.HealthReport())
}
return hr
}

func (p *Plugin) Close() (err error) {
p.mu.RLock()
defer p.mu.RUnlock()
return services.MultiCloser(p.ss).Close()
}
109 changes: 109 additions & 0 deletions pkg/loop/prom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package loop

import (
"errors"
"fmt"
"net"
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/net/context"

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
)

type promServer struct {
port int
srvrDone chan struct{} // closed when the http server is done
srvr *http.Server
tcpListener *net.TCPListener
lggr logger.Logger

handler http.Handler
}

type promServerOpt func(*promServer)

func withHandler(h http.Handler) promServerOpt {
return func(s *promServer) {
s.handler = h
}
}

// TODO there is one core/web test using this....
func newPromServer(port int, lggr logger.Logger, opts ...promServerOpt) *promServer {
s := &promServer{
port: port,
lggr: lggr,
srvrDone: make(chan struct{}),
srvr: &http.Server{
// reasonable default based on typical prom poll interval of 15s.
ReadTimeout: 5 * time.Second,
},

handler: promhttp.HandlerFor(
prometheus.DefaultGatherer,
promhttp.HandlerOpts{
EnableOpenMetrics: true,
},
),
}

for _, opt := range opts {
opt(s)
}

return s
}

// Start starts HTTP server on specified port to handle metrics requests
func (p *promServer) Start() error {
p.lggr.Debugf("Starting prom server on port %d", p.port)
err := p.setupListener()
if err != nil {
return err
}

http.Handle("/metrics", p.handler)

go func() {
defer close(p.srvrDone)
err := p.srvr.Serve(p.tcpListener)
if errors.Is(err, net.ErrClosed) {
// ErrClose is expected on gracefully shutdown
p.lggr.Warnf("%s closed", p.Name())
} else {
p.lggr.Errorf("%s: %s", p.Name(), err)
}

}()
return nil
}

// Close shuts down the underlying HTTP server. See [http.Server.Close] for details
func (p *promServer) Close() error {
err := p.srvr.Shutdown(context.Background())
<-p.srvrDone
return err
}

// Name of the server
func (p *promServer) Name() string {
return fmt.Sprintf("%s-prom-server", p.lggr.Name())
}

// setupListener creates explicit listener so that we can resolve `:0` port, which is needed for testing
// if we didn't need the resolved addr, or could pick a static port we could use p.srvr.ListenAndServer
func (p *promServer) setupListener() error {
l, err := net.ListenTCP("tcp", &net.TCPAddr{
Port: p.port,
})
if err != nil {
return err
}

p.tcpListener = l
return nil
}
56 changes: 56 additions & 0 deletions pkg/loop/prom_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package loop

import (
"fmt"
"io"
"net"
"net/http"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
)

func TestPromServer(t *testing.T) {

testReg := prometheus.NewRegistry()
testHandler := promhttp.HandlerFor(testReg, promhttp.HandlerOpts{})
testMetric := prometheus.NewCounter(prometheus.CounterOpts{
Name: "test_metric",
})
testReg.MustRegister(testMetric)
testMetric.Inc()

s := newPromServer(0, logger.Test(t), withHandler(testHandler))
// check that port is not resolved yet
require.Equal(t, -1, s.Port())
require.NoError(t, s.Start())

url := fmt.Sprintf("http://localhost:%d/metrics", s.Port())
resp, err := http.Get(url) //nolint
require.NoError(t, err)
require.NoError(t, err, "endpoint %s", url)
require.NotNil(t, resp.Body)
b, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Contains(t, string(b), "test_metric")
defer resp.Body.Close()

require.NoError(t, s.Close())
}

// Port is the resolved port and is only known after Start().
// returns -1 before it is resolved or if there was an error during resolution.
func (p *promServer) Port() int {
if p.tcpListener == nil {
return -1
}
// always safe to cast because we explicitly have a tcp listener
// there is direct access to Port without the addr casting
// Note: addr `:0` is not resolved to non-zero port until ListenTCP is called
// net.ResolveTCPAddr sounds promising, but doesn't work in practice
return p.tcpListener.Addr().(*net.TCPAddr).Port
}
135 changes: 135 additions & 0 deletions pkg/loop/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package loop

import (
"fmt"
"os"
"sync"

"github.com/smartcontractkit/chainlink-relay/pkg/logger"
"github.com/smartcontractkit/chainlink-relay/pkg/services"
)

// NewStartedServer returns a started Server.
// The caller is responsible for calling Server.Stop().
func NewStartedServer(loggerName string) (*Server, error) {
s, err := newServer(loggerName)
if err != nil {
return nil, err
}
err = s.start()
if err != nil {
return nil, err
}

return s, nil
}

// MustNewStartedServer returns a new started Server like NewStartedServer, but logs and exits in the event of error.
// The caller is responsible for calling Server.Stop().
func MustNewStartedServer(loggerName string) *Server {
s, err := newServer(loggerName)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to start server: %s\n", err)
os.Exit(1)
}
err = s.start()
if err != nil {
s.Logger.Fatalf("Failed to start server: %s", err)
}

return s
}

// Server holds common plugin server fields.
type Server struct {
GRPCOpts GRPCOpts
Logger logger.SugaredLogger
promServer *promServer
checker *services.HealthChecker
}

func newServer(loggerName string) (*Server, error) {
s := &Server{
// default prometheus.Registerer
GRPCOpts: SetupTelemetry(nil),
}

lggr, err := NewLogger()
if err != nil {
return nil, fmt.Errorf("error creating logger: %s", err)
}
lggr = logger.Named(lggr, loggerName)
s.Logger = logger.Sugared(lggr)
return s, nil
}

func (s *Server) start() error {
envCfg, err := getEnvConfig()
if err != nil {
return fmt.Errorf("error getting environment configuration: %w", err)
}
s.promServer = newPromServer(envCfg.PrometheusPort(), s.Logger)
err = s.promServer.Start()
if err != nil {
return fmt.Errorf("error starting prometheus server: %w", err)
}

s.checker = services.NewChecker()
err = s.checker.Start()
if err != nil {
return fmt.Errorf("error starting health checker: %w", err)
}

return nil
}

// MustRegister registers the HealthReporter with services.HealthChecker, or exits upon failure.
func (s *Server) MustRegister(c services.HealthReporter) {
if err := s.Register(c); err != nil {
s.Logger.Fatalf("Failed to register %s with health checker: %v", c.Name(), err)
}
}

func (s *Server) Register(c services.HealthReporter) error { return s.checker.Register(c) }

// Stop closes resources and flushes logs.
func (s *Server) Stop() {
s.Logger.ErrorIfFn(s.checker.Close, "Failed to close health checker")
s.Logger.ErrorIfFn(s.promServer.Close, "Failed to close prometheus server")
if err := s.Logger.Sync(); err != nil {
fmt.Println("Failed to sync logger:", err)
}
}

// MultiService is a base layer for a plugin services.Service to easily manage sub-[services.Service]s.
type MultiService struct {
Logger logger.Logger

mu sync.RWMutex
srvs []services.Service
}

func (p *MultiService) Ready() error { return nil }
func (p *MultiService) Name() string { return p.Logger.Name() }

func (p *MultiService) SubService(s services.Service) {
p.mu.Lock()
p.srvs = append(p.srvs, s)
p.mu.Unlock()
}

func (p *MultiService) HealthReport() map[string]error {
hr := map[string]error{p.Name(): nil}
p.mu.RLock()
defer p.mu.RUnlock()
for _, s := range p.srvs {
services.CopyHealth(hr, s.HealthReport())
}
return hr
}

func (p *MultiService) Close() (err error) {
p.mu.RLock()
defer p.mu.RUnlock()
return services.MultiCloser(p.srvs).Close()
}
Loading

0 comments on commit 1809045

Please sign in to comment.