diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 3128731ddbe..58fdee03cc3 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -14,6 +14,7 @@ https://github.com/elastic/apm-server/compare/8.5\...main[View commits] ==== Bug fixes - Set `message` instead of `labels.event` for Jaeger span events {pull}8765[8765] - Fix event loss during reload of TBS processor {pull}8809[8809] +- Fix sporadically missing custom libbeat metrics {pull}8900[8900] [float] ==== Intake API Changes diff --git a/internal/beater/beater.go b/internal/beater/beater.go index b3988bdef79..5aa743e8a05 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -235,7 +235,10 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b * if b.Config != nil { reloader.outputConfig = b.Config.Output } - if err := reloader.reload(); err != nil { + reloader.mu.Lock() + err := reloader.reload() + reloader.mu.Unlock() + if err != nil { return err } } @@ -289,6 +292,7 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { } r.mu.Lock() + defer r.mu.Unlock() r.rawConfig = integrationConfig.APMServer // Merge in datastream namespace passed in from apm integration if integrationConfig.DataStream != nil && integrationConfig.DataStream.Namespace != "" { @@ -300,7 +304,6 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error { } } r.fleetConfig = &integrationConfig.Fleet - r.mu.Unlock() return r.reload() } @@ -312,14 +315,16 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error { } } r.mu.Lock() + defer r.mu.Unlock() r.outputConfig = outputConfig - r.mu.Unlock() return r.reload() } +// reload creates a new serverRunner, launches it in a new goroutine, waits +// for it to have successfully started and returns after waiting for the previous +// serverRunner (if any) to exit. Calls to reload must be sycnhronized explicitly +// by acquiring reloader#mu by callers. func (r *reloader) reload() error { - r.mu.Lock() - defer r.mu.Unlock() if r.rawConfig == nil { // APM Server config not loaded yet. return nil @@ -345,6 +350,17 @@ func (r *reloader) reload() error { r.args.Logger.Error(err) } }() + + // Wait for the new runner to start; this avoids the race condition in updating + // the monitoring#Deafult global registry inside the runner due to two reloads, + // one for the inputs and the other for the elasticsearch output + select { + case <-runner.done: + return errors.New("runner exited unexpectedly") + case <-runner.started: + // runner has started + } + // If the old runner exists, cancel it if r.runner != nil { r.runner.cancelRunServerContext() @@ -364,6 +380,7 @@ type serverRunner struct { // immediately when the Stop method is invoked. runServerContext context.Context cancelRunServerContext context.CancelFunc + started chan struct{} done chan struct{} pipeline beat.PipelineConnector @@ -416,6 +433,7 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne runServerContext: runServerContext, cancelRunServerContext: cancel, done: make(chan struct{}), + started: make(chan struct{}), config: cfg, rawConfig: args.RawConfig, @@ -581,6 +599,10 @@ func (s *serverRunner) run(listener net.Listener) error { NewElasticsearchClient: newElasticsearchClient, }) }) + + // Signal that the runner has started + close(s.started) + result := g.Wait() if err := closeFinalBatchProcessor(s.backgroundContext); err != nil { result = multierror.Append(result, err) @@ -664,19 +686,12 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client return waitReady(ctx, s.config.WaitReadyInterval, s.tracer, s.logger, check) } -// This mutex must be held when updating the libbeat monitoring registry, -// as there may be multiple servers running concurrently. -var monitoringRegistryMu sync.Mutex - // newFinalBatchProcessor returns the final model.BatchProcessor that publishes events, // and a cleanup function which should be called on server shutdown. If the output is // "elasticsearch", then we use modelindexer; otherwise we use the libbeat publisher. func (s *serverRunner) newFinalBatchProcessor( newElasticsearchClient func(cfg *elasticsearch.Config) (elasticsearch.Client, error), ) (model.BatchProcessor, func(context.Context) error, error) { - monitoringRegistryMu.Lock() - defer monitoringRegistryMu.Unlock() - if s.elasticsearchOutputConfig == nil { // When the publisher stops cleanly it will close its pipeline client, // calling the acker's Close method. We need to call Open for each new