Skip to content

Commit

Permalink
[core] Adding http metrics endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Tichák committed Dec 11, 2024
1 parent de4d866 commit bfa4f7e
Show file tree
Hide file tree
Showing 8 changed files with 481 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT))
GENERATE_DIRS := ./apricot ./coconut/cmd ./common ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping
SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./walnut
TEST_DIRS := ./apricot/local ./common/gera ./common/utils/safeacks ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment
GO_TEST_DIRS := ./core/repos ./core/integration/dcs
GO_TEST_DIRS := ./core/repos ./core/integration/dcs ./common/monitoring

coverage:COVERAGE_PREFIX := ./coverage_results
coverage:GOTEST_COVERAGE_FILE := $(COVERAGE_PREFIX)/gotest.out
Expand Down
14 changes: 14 additions & 0 deletions common/ecsmetrics/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package ecsmetrics

import (
"time"

"github.com/AliceO2Group/Control/common/monitoring"
)

// NewMetric builds a monitoring.Metric with the given name, stamped with the
// current wall-clock time in Unix milliseconds and tagged with
// subsystem=ECS.
func NewMetric(name string) monitoring.Metric {
	m := monitoring.Metric{
		Name:      name,
		Timestamp: time.Now().UnixMilli(),
	}
	m.AddTag("subsystem", "ECS")
	return m
}
69 changes: 69 additions & 0 deletions common/ecsmetrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package ecsmetrics

import (
"fmt"
internalmetrics "runtime/metrics"
"time"

"github.com/AliceO2Group/Control/common/monitoring"
)

// endRequestChannel carries the stop handshake between StopGolangMetrics and
// the goroutine launched by StartGolangMetrics: the stop request is sent on
// it, and the goroutine answers on the same channel right before it returns.
var endRequestChannel chan struct{}

// gather reads a fixed set of Go runtime metrics and packs them as values of
// a single metric named "golangruntimemetrics", keyed by the runtime metric
// name.
//
// Only uint64 and float64 samples are exported. Histogram samples and samples
// of any other kind (for example a metric name the running Go version does
// not know) are reported on stdout and skipped.
func gather() monitoring.Metric {
	// Every runtime metric is listed exactly once; values are stored in a map
	// keyed by name, so duplicates would only cost extra reads.
	samples := []internalmetrics.Sample{
		{Name: "/gc/cycles/total:gc-cycles"},
		{Name: "/memory/classes/other:bytes"},
		{Name: "/memory/classes/total:bytes"},
		{Name: "/sched/goroutines:goroutines"},
		{Name: "/sync/mutex/wait/total:seconds"},
		{Name: "/memory/classes/heap/free:bytes"},
		{Name: "/memory/classes/heap/objects:bytes"},
		{Name: "/memory/classes/heap/released:bytes"},
		{Name: "/memory/classes/heap/stacks:bytes"},
		{Name: "/memory/classes/heap/unused:bytes"},
	}

	// Fill in the Value of every sample in place.
	internalmetrics.Read(samples)

	metric := NewMetric("golangruntimemetrics")

	for _, sample := range samples {
		switch sample.Value.Kind() {
		case internalmetrics.KindUint64:
			metric.AddValue(sample.Name, sample.Value.Uint64())
		case internalmetrics.KindFloat64:
			metric.AddValue(sample.Name, sample.Value.Float64())
		case internalmetrics.KindFloat64Histogram:
			fmt.Printf("Error: Histogram is not supported yet for metric [%s]\n", sample.Name)
		default:
			fmt.Printf("Unsupported kind %v for metric %s\n", sample.Value.Kind(), sample.Name)
		}
	}
	return metric
}

// StartGolangMetrics launches a goroutine that sends the Go runtime metrics
// to the monitoring package, first immediately and then once per period,
// until StopGolangMetrics is called.
//
// The stop channel is created here, before the goroutine starts. A nil
// channel would never deliver the stop request and would make
// StopGolangMetrics block forever.
func StartGolangMetrics(period time.Duration) {
	stop := make(chan struct{})
	endRequestChannel = stop

	go func() {
		for {
			// Honour a pending stop request before doing any more work.
			select {
			case <-stop:
				stop <- struct{}{}
				return
			default:
			}

			monitoring.Send(gather())

			// Wait for the next period, but wake up at once on a stop
			// request instead of sleeping the period out. time.After fires
			// immediately for a non-positive period, like time.Sleep.
			select {
			case <-stop:
				// Acknowledge so that StopGolangMetrics can return.
				stop <- struct{}{}
				return
			case <-time.After(period):
			}
		}
	}()
}

// StopGolangMetrics asks the runtime-metrics goroutine to end and waits for
// its acknowledgement.
//
// It returns immediately when there is no stop channel, i.e. the collector
// was never given one or has already been stopped. A send on a nil channel
// would otherwise block the caller forever.
func StopGolangMetrics() {
	if endRequestChannel == nil {
		return
	}

	endRequestChannel <- struct{}{}
	<-endRequestChannel

	// Forget the channel so that a repeated Stop is a no-op rather than a
	// send nobody will ever receive.
	endRequestChannel = nil
}
27 changes: 27 additions & 0 deletions common/monitoring/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package monitoring

// TagsType maps a tag name to the tag's value.
type TagsType map[string]any

// ValuesType maps a value name to the measured value.
type ValuesType map[string]any

// Metric is one named measurement: a set of values, an optional set of tags
// and a timestamp. The struct tags define its JSON representation; Tags is
// left out of the JSON when it is empty.
type Metric struct {
	Name      string     `json:"name"`
	Values    ValuesType `json:"values"`
	Tags      TagsType   `json:"tags,omitempty"`
	Timestamp int64      `json:"timestamp"`
}

// AddTag stores value under tagName, replacing any previous value of that
// tag. The tag map is allocated on first use.
func (m *Metric) AddTag(tagName string, value any) {
	if m.Tags == nil {
		m.Tags = TagsType{}
	}
	m.Tags[tagName] = value
}

// AddValue stores value under valueName, replacing any previous value of that
// name. The value map is allocated on first use.
func (m *Metric) AddValue(valueName string, value any) {
	if m.Values == nil {
		m.Values = ValuesType{}
	}
	m.Values[valueName] = value
}
125 changes: 125 additions & 0 deletions common/monitoring/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package monitoring

import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/AliceO2Group/Control/common/logger"
"github.com/sirupsen/logrus"
)

// Package-level state of the metrics scraping endpoint.
var (
// server is the HTTP server of the scraping endpoint; Start and Stop treat nil as "not running".
server *http.Server
// metricsLimit is the maximum number of metrics kept between two scrapes; initChannels overwrites it.
metricsLimit int = 1000000
// metrics is the buffer of metrics collected by the event loop since the last scrape.
metrics []Metric
// Channel used to request the end of the event loop; the event loop sends a notification back on it
// right before it returns. That notification must be read by whoever requested the end.
endChannel chan struct{}

// Channel used to send metrics into the event loop.
metricsChannel chan Metric

// Channel used to notify the event loop that a new HTTP request to report metrics has arrived.
metricsRequestChannel chan struct{}

// Channel used by the event loop to hand the collected metrics over to the HTTP request handler.
metricsToRequest chan []Metric

// Log is the logger of this package, created with the "metrics" prefix.
Log = logger.New(logrus.StandardLogger(), "metrics")
)

func initChannels(messageBufferSize int) {
endChannel = make(chan struct{})
metricsRequestChannel = make(chan struct{})
metricsChannel = make(chan Metric, 100)
metricsToRequest = make(chan []Metric)
metricsLimit = messageBufferSize
}

// closeChannels closes all four channels created by initChannels.
//
// NOTE(review): no caller is visible in this excerpt. eventLoop receives from
// endChannel and then sends on it, and a send on a closed channel panics, so
// this presumably must only run after the event loop has returned. Confirm
// against the callers.
func closeChannels() {
close(endChannel)
close(metricsRequestChannel)
close(metricsChannel)
close(metricsToRequest)
}

func eventLoop() {
for {
select {
case <-metricsRequestChannel:
shallowCopyMetrics := metrics
metrics = make([]Metric, 0)
metricsToRequest <- shallowCopyMetrics

case metric := <-metricsChannel:
if len(metrics) < metricsLimit {
metrics = append(metrics, metric)
} else {
Log.Warn("too many metrics waiting to be scraped. Are you sure that metrics scraping is running?")
}

case <-endChannel:
endChannel <- struct{}{}
return
}
}
}

// exportMetricsAndReset is the HTTP handler of the scraping endpoint. It asks
// the event loop for everything collected since the previous scrape, which
// also empties the event loop's buffer, and writes it to the response as a
// JSON array.
func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	// Request/response handshake with the event loop.
	metricsRequestChannel <- struct{}{}
	metricsToConvert := <-metricsToRequest

	// A nil slice would be encoded as "null"; scrapers get "[]" instead.
	if metricsToConvert == nil {
		metricsToConvert = make([]Metric, 0)
	}

	// The batch has already been removed from the event loop's buffer, so a
	// failed write loses it. Log the failure instead of dropping it silently.
	if err := json.NewEncoder(w).Encode(metricsToConvert); err != nil {
		Log.Warn("failed to write scraped metrics to the http response: " + err.Error())
	}
}

// Send hands a metric over to the event loop through metricsChannel.
//
// This is a plain blocking channel send: it waits while the channel's buffer
// is full, and it blocks forever while metricsChannel is still nil (the
// channel is only created by initChannels).
func Send(metric Metric) {
metricsChannel <- metric
}

// handleFunc registers exportMetricsAndReset for endpointName on the default
// HTTP mux.
//
// Any panic raised by the registration is swallowed on purpose: this is what
// allows the server to be started and stopped multiple times.
func handleFunc(endpointName string) {
	defer func() { _ = recover() }()

	http.HandleFunc(endpointName, exportMetricsAndReset)
}

// Start creates the metrics scraping endpoint and serves it.
//
// port is the port where the scraping endpoint is created. endpointName is
// the name of the endpoint, which must start with a slash, e.g.
// "/internalmetrics". messageBufferSize is the size of the buffer where
// messages are kept between scraping requests; if more messages are sent than
// the buffer can hold, the overflowing messages are ignored and a warning is
// logged.
//
// Start blocks while the server is serving and returns the error from
// ListenAndServe. It returns nil right away when a server is already
// registered.
func Start(port uint16, endpointName string, messageBufferSize int) error {
	if server != nil {
		return nil
	}

	initChannels(messageBufferSize)

	go eventLoop()

	// Assign the package-level server. Declaring a new local with := would
	// shadow it and leave the global nil, which defeats the guard above and
	// turns Stop into a no-op.
	srv := &http.Server{Addr: fmt.Sprintf(":%d", port)}
	server = srv
	handleFunc(endpointName)

	// Serve through the local handle: Stop resets the global to nil.
	return srv.ListenAndServe()
}

// Stop shuts the scraping endpoint down and ends the event loop. It does
// nothing when no server is registered.
//
// The HTTP server gets up to 5 seconds to shut down. A shutdown error is
// logged and the event loop is stopped regardless, so that a later Start can
// begin from a clean state.
func Stop() {
	if server == nil {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		Log.Warn("metrics http server did not shut down cleanly: " + err.Error())
	}

	// Request the end of the event loop and wait for its acknowledgement.
	endChannel <- struct{}{}
	<-endChannel
	server = nil
}
Loading

0 comments on commit bfa4f7e

Please sign in to comment.