Skip to content

Commit

Permalink
pkg/services: add services.Config.NewService/Engine helper (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 authored Jul 18, 2024
1 parent 20d4c18 commit 672e10f
Show file tree
Hide file tree
Showing 9 changed files with 530 additions and 101 deletions.
19 changes: 19 additions & 0 deletions pkg/internal/example/example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Package example contains helpers for implementing testable examples.
package example

import (
"go.uber.org/zap"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

// Logger returns a logger.Logger which outputs simplified, plaintext logs to std out, without timestamps or caller info.
func Logger() (logger.Logger, error) {
return logger.NewWith(func(config *zap.Config) {
config.OutputPaths = []string{"stdout"}
config.Encoding = "console"
config.EncoderConfig = zap.NewDevelopmentEncoderConfig()
config.EncoderConfig.TimeKey = ""
config.EncoderConfig.CallerKey = ""
})
}
3 changes: 3 additions & 0 deletions pkg/logger/sugared.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type SugaredLogger interface {
// Sugared returns a new SugaredLogger wrapping the given Logger.
// Prefer to store the SugaredLogger for reuse, instead of recreating it as needed.
func Sugared(l Logger) SugaredLogger {
if sl, ok := l.(SugaredLogger); ok {
return sl
}
return &sugared{
Logger: l,
h: Helper(l, 1),
Expand Down
14 changes: 14 additions & 0 deletions pkg/services/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

// HealthReporter should be implemented by any type requiring health checks.
type HealthReporter interface {
// Ready should return nil if ready, or an error message otherwise. From the k8s docs:
// > ready means it’s initialized and clearCond means that it can accept traffic in kubernetes
// See: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
Ready() error
// HealthReport returns a full health report of the callee including its dependencies.
// key is the dep name, value is nil if clearCond, or error message otherwise.
// Use CopyHealth to collect reports from sub-services.
HealthReport() map[string]error
// Name returns the fully qualified name of the component. Usually the logger name.
Name() string
}

// CopyHealth copies health statuses from src to dest. Useful when implementing HealthReporter.HealthReport.
// If duplicate names are encountered, the errors are joined, unless testing in which case a panic is thrown.
func CopyHealth(dest, src map[string]error) {
Expand Down
253 changes: 253 additions & 0 deletions pkg/services/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package services

import (
"context"
"errors"
"fmt"
"sync"

"github.com/google/uuid"
"golang.org/x/exp/maps"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

// Service represents a long-running service inside the Application.
//
// The simplest way to implement a Service is via NewService.
//
// For other cases, consider embedding a services.StateMachine to implement these
// calls in a safe manner.
type Service interface {
// Start the service.
// - Must return promptly if the context is cancelled.
// - Must not retain the context after returning (only applies to start-up)
// - Must not depend on external resources (no blocking network calls)
Start(context.Context) error
// Close stops the Service.
// Invariants: Usually after this call the Service cannot be started
// again, you need to build a new Service to do so.
//
// See MultiCloser
Close() error

HealthReporter
}

// Engine manages service internals like health, goroutine tracking, and shutdown signals.
// See Config.NewServiceEngine
type Engine struct {
StopChan
logger.SugaredLogger

wg sync.WaitGroup

emitHealthErr func(error)
conds map[string]error
condsMu sync.RWMutex
}

// Go runs fn in a tracked goroutine that will block closing the service.
func (e *Engine) Go(fn func(context.Context)) {
e.wg.Add(1)
go func() {
defer e.wg.Done()
ctx, cancel := e.StopChan.NewCtx()
defer cancel()
fn(ctx)
}()
}

// GoTick is like Go but calls fn for each tick.
//
// v.e.GoTick(services.NewTicker(time.Minute), v.method)
func (e *Engine) GoTick(ticker *timeutil.Ticker, fn func(context.Context)) {
e.Go(func(ctx context.Context) {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
fn(ctx)
}
}
})
}

// EmitHealthErr records an error to be reported via the next call to Healthy().
func (e *Engine) EmitHealthErr(err error) { e.emitHealthErr(err) }

// SetHealthCond records a condition key and an error, which causes an unhealthy report, until ClearHealthCond(condition) is called.
// condition keys are for internal use only, and do not show up in the health report.
func (e *Engine) SetHealthCond(condition string, err error) {
e.condsMu.Lock()
defer e.condsMu.Unlock()
e.conds[condition] = fmt.Errorf("%s: %e", condition, err)
}

// ClearHealthCond removes a condition and error recorded by SetHealthCond.
func (e *Engine) ClearHealthCond(condition string) {
e.condsMu.Lock()
defer e.condsMu.Unlock()
delete(e.conds, condition)
}

// NewHealthCond causes an unhealthy report, until the returned clear func() is called.
// Use this for simple cases where the func() can be kept in scope, and prefer to defer it inline if possible:
//
// defer NewHealthCond(fmt.Errorf("foo bar: %i", err))()
//
// See SetHealthCond for an alternative API.
func (e *Engine) NewHealthCond(err error) (clear func()) {
cond := uuid.NewString()
e.SetHealthCond(cond, err)
return func() { e.ClearHealthCond(cond) }
}

func (e *Engine) clearCond() error {
e.condsMu.RLock()
errs := maps.Values(e.conds)
e.condsMu.RUnlock()
return errors.Join(errs...)
}

// Config is a configuration for constructing a Service, typically with an Engine, to be embedded and extended as part
// of a Service implementation.
type Config struct {
// Name is required. It will be logged shorthand on Start and Close, and appended to the logger name.
// It must be unique among services sharing the same logger, in order to ensure uniqueness of the fully qualified name.
Name string
// NewSubServices is an optional constructor for dependent Services to Start and Close along with this one.
NewSubServices func(logger.Logger) []Service
// Start is an optional hook called after starting SubServices.
Start func(context.Context) error
// Close is an optional hook called before closing SubServices.
Close func() error
}

// NewServiceEngine returns a new Service defined by Config, and an Engine for managing health, goroutines, and logging.
// - You *should* embed the Service, in order to inherit the methods.
// - You *should not* embed the Engine. Use an unexported field instead.
//
// For example:
//
// type myType struct {
// services.Service
// env *service.Engine
// }
// t := myType{}
// t.Service, t.eng = service.Config{
// Name: "MyType",
// Start: t.start,
// Close: t.close,
// }.NewServiceEngine(lggr)
func (c Config) NewServiceEngine(lggr logger.Logger) (Service, *Engine) {
s := c.new(logger.Sugared(lggr))
return s, &s.eng
}

// NewService returns a new Service defined by Config.
// - You *should* embed the Service, in order to inherit the methods.
func (c Config) NewService(lggr logger.Logger) Service {
return c.new(logger.Sugared(lggr))
}

func (c Config) new(lggr logger.SugaredLogger) *service {
lggr = lggr.Named(c.Name)
s := &service{
cfg: c,
eng: Engine{
StopChan: make(StopChan),
SugaredLogger: lggr,
conds: make(map[string]error),
},
}
s.eng.emitHealthErr = s.StateMachine.SvcErrBuffer.Append
if c.NewSubServices != nil {
s.subs = c.NewSubServices(lggr)
}
return s
}

type service struct {
StateMachine
cfg Config
eng Engine
subs []Service
}

// Ready implements [HealthReporter.Ready] and overrides and extends [utils.StartStopOnce.Ready()] to include [Config.SubServices]
// readiness as well.
func (s *service) Ready() (err error) {
err = s.StateMachine.Ready()
for _, sub := range s.subs {
err = errors.Join(err, sub.Ready())
}
return
}

// Healthy overrides [StateMachine.Healthy] and extends it to include Engine errors as well.
// Do not override this method in your service. Instead, report errors via the Engine.
func (s *service) Healthy() (err error) {
err = s.StateMachine.Healthy()
if err == nil {
err = s.eng.clearCond()
}
return
}

func (s *service) HealthReport() map[string]error {
m := map[string]error{s.Name(): s.Healthy()}
for _, sub := range s.subs {
CopyHealth(m, sub.HealthReport())
}
return m
}

func (s *service) Name() string { return s.eng.SugaredLogger.Name() }

func (s *service) Start(ctx context.Context) error {
return s.StartOnce(s.cfg.Name, func() error {
s.eng.Info("Starting")
if len(s.subs) > 0 {
var ms MultiStart
s.eng.Infof("Starting %d sub-services", len(s.subs))
for _, sub := range s.subs {
if err := ms.Start(ctx, sub); err != nil {
s.eng.Errorw("Failed to start sub-service", "error", err)
return fmt.Errorf("failed to start sub-service of %s: %w", s.cfg.Name, err)
}
}
}
if s.cfg.Start != nil {
if err := s.cfg.Start(ctx); err != nil {
return fmt.Errorf("failed to start service %s: %w", s.cfg.Name, err)
}
}
s.eng.Info("Started")
return nil
})
}

func (s *service) Close() error {
return s.StopOnce(s.cfg.Name, func() (err error) {
s.eng.Info("Closing")
defer s.eng.Info("Closed")

close(s.eng.StopChan)
s.eng.wg.Wait()

if s.cfg.Close != nil {
err = s.cfg.Close()
}

if len(s.subs) == 0 {
return
}
s.eng.Infof("Closing %d sub-services", len(s.subs))
err = errors.Join(err, MultiCloser(s.subs).Close())
return
})
}
Loading

0 comments on commit 672e10f

Please sign in to comment.