Skip to content

Commit

Permalink
Merge pull request #198 from foomo/feature/graceful-shutdown
Browse files Browse the repository at this point in the history
feat(server): update graceful shutdown behaviour
  • Loading branch information
franklinkim authored Mar 25, 2024
2 parents 801110d + ee2aba4 commit 8229068
Show file tree
Hide file tree
Showing 32 changed files with 735 additions and 947 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
steps:
- uses: actions/checkout@v4

- uses: actions/setup-go@v4
- uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ import (

var (
ErrServerNotRunning = errors.New("server not running")
ErrServerShutdown = errors.New("server is shutting down")
)
80 changes: 53 additions & 27 deletions examples/graceful/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,58 +3,84 @@ package main
import (
"context"
"net/http"
"sync"
"syscall"
"time"

"github.com/foomo/keel/interfaces"
"github.com/foomo/keel/service"
"go.uber.org/zap"

"github.com/foomo/keel"
"github.com/foomo/keel/log"
)

func main() {
service.DefaultHTTPHealthzAddr = "localhost:9400"

l := zap.NewExample().Named("root")

l.Info("1. starting readiness checks")
go call(l.Named("readiness"), "http://localhost:9400/healthz/readiness")

svr := keel.NewServer(
keel.WithHTTPZapService(true),
keel.WithHTTPViperService(true),
keel.WithHTTPPrometheusService(true),
keel.WithLogger(l.Named("server")),
keel.WithHTTPHealthzService(true),
)

l := svr.Logger()

go waitGroup(svr.CancelContext(), l.With(log.FServiceName("waitGroup")))

// create demo service
svs := http.NewServeMux()
svs.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
})

svr.AddService(
service.NewHTTP(l, "demo", "localhost:8080", svs),
service.NewHTTP(l, "http", "localhost:8080", svs),
)

svr.Run()
}

func waitGroup(ctx context.Context, l *zap.Logger) {
var wg sync.WaitGroup
svr.AddCloser(interfaces.CloserFunc(func(ctx context.Context) error {
l := l.Named("closer")
l.Info("closing stuff")
time.Sleep(3 * time.Second)
l.Info("done closing stuff")
return nil
}))

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
l.Info("Break the loop")
return
case <-time.After(3 * time.Second):
l.Info("Hello in a loop")
}

l.Info("3. starting http checks")
go call(l.Named("http"), "http://localhost:8080")

l.Info("4. sleeping for 5 seconds")
time.Sleep(5 * time.Second)

l.Info("5. sending shutdown signal")
if err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM); err != nil {
l.Fatal(err.Error())
}

}()

wg.Wait()
svr.Run()
l.Info("done")
}

func call(l *zap.Logger, url string) {
l = l.With(zap.String("url", url))
for {
func() {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
l.With(zap.Error(err)).Error("failed to create request")
return
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
l.With(zap.Error(err)).Error("failed to send request")
return
}
l.Info("ok", zap.Int("status", resp.StatusCode))
}()
time.Sleep(time.Second)
}
}
6 changes: 3 additions & 3 deletions examples/healthz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
// See k8s for probe documentation
// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#types-of-probe
func main() {
service.DefaultHTTPHealthzAddr = "localhost:9400"

// you can override the below config by settings env vars
_ = os.Setenv("SERVICE_HEALTHZ_ENABLED", "true")

svr := keel.NewServer(
keel.WithHTTPZapService(true),
keel.WithHTTPViperService(true),
// allows you to use probes for health checks in cluster:
// GET :9400/healthz
// GET :9400/healthz/readiness
Expand Down Expand Up @@ -65,7 +65,7 @@ func main() {
select {
case <-time.After(10 * time.Second):
l.Info("initialization done")
case <-svr.CancelContext().Done():
case <-svr.ShutdownContext().Done():
l.Info("initialization canceled")
}

Expand Down
9 changes: 6 additions & 3 deletions examples/persistence/mongo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ func main() {
l := svr.Logger()

cDateTime := &store.DateTimeCodec{}
rb := bson.NewRegistryBuilder()
rb.RegisterCodec(store.TDateTime, cDateTime)
rb := bson.NewRegistry()
rb.RegisterTypeEncoder(store.TDateTime, cDateTime)
rb.RegisterTypeDecoder(store.TDateTime, cDateTime)

// create persistor
persistor, err := keelmongo.New(
Expand All @@ -36,7 +37,9 @@ func main() {
// enable telemetry (enabled by default)
keelmongo.WithOtelEnabled(true),
keelmongo.WithClientOptions(
options.Client().SetRegistry(rb.Build()),
func(clientOptions *options.ClientOptions) {
clientOptions.SetRegistry(rb)
},
),
)
// use log must helper to exit on error
Expand Down
3 changes: 3 additions & 0 deletions examples/services/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"os"

"github.com/foomo/keel"
"github.com/foomo/keel/service"
)

func main() {
service.DefaultHTTPPrometheusAddr = "localhost:9200"

// you can override the below config by settings env vars
_ = os.Setenv("SERVICE_ZAP_ENABLED", "true")
_ = os.Setenv("SERVICE_VIPER_ENABLED", "true")
Expand Down
45 changes: 34 additions & 11 deletions examples/telemetry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@ package main
import (
"math/rand"
"net/http"
"time"

"github.com/foomo/keel/service"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"

"github.com/foomo/keel"
"github.com/foomo/keel/log"
"github.com/foomo/keel/net/http/middleware"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

var metricRequestLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "demo",
Name: "request_latency_seconds",
Help: "Request Latency",
Buckets: prometheus.ExponentialBuckets(.0001, 2, 50),
})

func main() {
// Run this example with the following env vars:
//
Expand Down Expand Up @@ -48,14 +57,14 @@ func main() {
svs := http.NewServeMux()

{ // counter
counter, err := meter.SyncInt64().Counter(
counter, err := meter.Int64Counter(
"a.counter",
instrument.WithDescription("Count things"),
metric.WithDescription("Count things"),
)
log.Must(l, err, "failed to create counter meter")

svs.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) {
counter.Add(r.Context(), 1, attribute.String("key", "value"))
counter.Add(r.Context(), 1, metric.WithAttributes(attribute.String("key", "value")))
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK!"))
})
Expand All @@ -70,33 +79,47 @@ func main() {
})

{ // up down
upDown, err := meter.SyncInt64().UpDownCounter(
upDown, err := meter.Int64UpDownCounter(
"a.updown",
instrument.WithDescription("Up down values"),
metric.WithDescription("Up down values"),
)
log.Must(l, err, "failed to create up down meter")

svs.HandleFunc("/up", func(w http.ResponseWriter, r *http.Request) {
upDown.Add(r.Context(), 1, attribute.String("key", "value"))
upDown.Add(r.Context(), 1, metric.WithAttributes(attribute.String("key", "value")))
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK!"))
})
svs.HandleFunc("/down", func(w http.ResponseWriter, r *http.Request) {
upDown.Add(r.Context(), -1, attribute.String("key", "value"))
upDown.Add(r.Context(), -1, metric.WithAttributes(attribute.String("key", "value")))
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK!"))
})
}

{ // histogram
histogram, err := meter.SyncInt64().Histogram(
histogram, err := meter.Int64Histogram(
"a.histogram",
instrument.WithDescription("Up down values"),
metric.WithDescription("Up down values"),
metric.WithUnit("ms"),
)
log.Must(l, err, "failed to create up down meter")

svs.HandleFunc("/histogram", func(w http.ResponseWriter, r *http.Request) {
histogram.Record(r.Context(), int64(rand.Int()), attribute.String("key", "value"))
start := time.Now()
time.Sleep(time.Second)
traceID := trace.SpanContextFromContext(r.Context())
histogram.Record(r.Context(), int64(rand.Int()),
metric.WithAttributes(
attribute.String("key", "value"),
attribute.String("traceID", traceID.TraceID().String()),
),
)

metricRequestLatency.(prometheus.ExemplarObserver).ObserveWithExemplar(
time.Since(start).Seconds(), prometheus.Labels{"traceID": traceID.TraceID().String()},
)

w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK!"))
})
Expand Down
Loading

0 comments on commit 8229068

Please sign in to comment.