[Heartbeat] Add publish pipeline timeout to run_once (#35721)
Decouples run_once from the sync pipeline client and adds a shutdown timeout that waits for pending events to be published before exiting.

* Add publish pipeline timeout to run_once

* Clean up ISyncClient

* Nit wait for run_once

* Rename pipeline

* Add signal tests

* Add pipeline sync tests

* Disable linter false positive

* golint

* Apply suggestions from code review

* Update heartbeat/monitors/pipeline.go

* Apply suggestions from code review

* Add changelog and docs
emilioalvap authored Jul 28, 2023
1 parent 3ea0c53 commit b2d5017
Showing 19 changed files with 636 additions and 112 deletions.
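In configuration terms, this change enables the following combination; note that `publish_timeout` only takes effect together with `run_once`. The same example appears in the docs update below.

    # heartbeat.yml
    heartbeat.publish_timeout: 30s
    heartbeat.run_once: true
    heartbeat.monitors:
      # your monitor config here...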
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -133,6 +133,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Filter dev flags for ui monitors inside synthetics_args. {pull}35788[35788]
- Fix temp dir running out of space with project monitors. {issue}35843[35843]
- Fixing the grok expression outputs of log files {pull}35221[35221]
- Enable heartbeat-wide publish timeout setting with run_once. {pull}35721[35721]

*Heartbeat*

58 changes: 36 additions & 22 deletions heartbeat/beater/heartbeat.go
@@ -44,7 +44,6 @@ import (
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
)

// Heartbeat represents the root datastructure of this beat.
@@ -117,17 +116,8 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {

sched := scheduler.Create(limit, hbregistry.SchedulerRegistry, location, jobConfig, parsedConfig.RunOnce)

pipelineClientFactory := func(p beat.Pipeline) (pipeline.ISyncClient, error) {
if parsedConfig.RunOnce {
client, err := pipeline.NewSyncClient(logp.L(), p, beat.ClientConfig{})
if err != nil {
return nil, fmt.Errorf("could not create pipeline sync client for run_once: %w", err)
}
return client, nil
} else {
client, err := p.Connect()
return monitors.SyncPipelineClientAdaptor{C: client}, err
}
pipelineClientFactory := func(p beat.Pipeline) (beat.Client, error) {
return p.Connect()
}

bt := &Heartbeat{
@@ -160,23 +150,33 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
bt.trace.Start()
defer bt.trace.Close()

// Adapt local pipeline to synchronized mode if run_once is enabled
pipeline := b.Publisher
var pipelineWrapper monitors.PipelineWrapper = &monitors.NoopPipelineWrapper{}
if bt.config.RunOnce {
sync := &monitors.SyncPipelineWrapper{}

pipeline = monitors.WithSyncPipelineWrapper(pipeline, sync)
pipelineWrapper = sync
}

logp.L().Info("heartbeat is running! Hit CTRL-C to stop it.")
groups, _ := syscall.Getgroups()
logp.L().Info("Effective user/group ids: %d/%d, with groups: %v", syscall.Geteuid(), syscall.Getegid(), groups)

waitMonitors := monitors.NewSignalWait()

// It is important that this appears before we check for run_once mode.
// In run_once mode we depend on these monitors being loaded, but not on
// other, more dynamic types.
stopStaticMonitors, err := bt.RunStaticMonitors(b)
stopStaticMonitors, err := bt.RunStaticMonitors(b, pipeline)
if err != nil {
return err
}
defer stopStaticMonitors()

if bt.config.RunOnce {
bt.scheduler.WaitForRunOnce()
logp.L().Info("Ending run_once run")
return nil
waitMonitors.Add(monitors.WithLog(bt.scheduler.WaitForRunOnce, "Ending run_once run."))
}

if b.Manager.Enabled() {
@@ -211,20 +211,34 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {

defer bt.scheduler.Stop()

<-bt.done
// Wait until run_once ends or bt is being shut down
waitMonitors.AddChan(bt.done)
waitMonitors.Wait()

if err != nil {
logp.L().Errorf("could not write trace stop event: %s", err)
logp.L().Info("Shutting down, waiting for output to complete")

// Due to defer's LIFO execution order, waitPublished.Wait() has to be
// located _after_ b.Manager.Stop() or else it will exit early
waitPublished := monitors.NewSignalWait()
defer waitPublished.Wait()

// Three possible events: global beat shutdown, run_once pipeline done, and publish timeout
waitPublished.AddChan(bt.done)
waitPublished.Add(monitors.WithLog(pipelineWrapper.Wait, "shutdown: finished publishing events."))
if bt.config.PublishTimeout > 0 {
logp.Info("shutdown: output timer started. Waiting for max %v.", bt.config.PublishTimeout)
waitPublished.Add(monitors.WithLog(monitors.WaitDuration(bt.config.PublishTimeout),
"shutdown: timed out waiting for pipeline to publish events."))
}
logp.L().Info("Shutting down.")

return nil
}

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat, pipeline beat.Pipeline) (stop func(), err error) {
runners := make([]cfgfile.Runner, 0, len(bt.config.Monitors))
for _, cfg := range bt.config.Monitors {
created, err := bt.monitorFactory.Create(b.Publisher, cfg)
created, err := bt.monitorFactory.Create(pipeline, cfg)
if err != nil {
if errors.Is(err, monitors.ErrMonitorDisabled) {
logp.L().Info("skipping disabled monitor: %s", err)
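The shutdown flow above leans on the monitors signal-wait helpers (NewSignalWait, WithLog, WaitDuration): each source is a channel, a blocking function, or a timer, and Wait() returns as soon as the first one fires. The sketch below illustrates the first-signal-wins pattern only; the lowercase names are hypothetical reimplementations, not the libbeat code.

    package main

    import (
    	"fmt"
    	"time"
    )

    // signalWait returns from Wait() as soon as any registered source fires.
    type signalWait struct {
    	signal chan struct{}
    }

    func newSignalWait() *signalWait {
    	return &signalWait{signal: make(chan struct{}, 1)}
    }

    // Add runs fn in a goroutine and signals once it returns.
    func (s *signalWait) Add(fn func()) {
    	go func() {
    		fn()
    		select {
    		case s.signal <- struct{}{}:
    		default: // another source already signaled; drop
    		}
    	}()
    }

    // AddChan signals when c yields a value or is closed.
    func (s *signalWait) AddChan(c <-chan struct{}) {
    	s.Add(func() { <-c })
    }

    // Wait blocks until the first registered source signals.
    func (s *signalWait) Wait() { <-s.signal }

    // withLog wraps a blocking wait func so it logs msg on completion.
    func withLog(fn func(), msg string) func() {
    	return func() {
    		fn()
    		fmt.Println(msg)
    	}
    }

    // waitDuration returns a wait func that blocks for duration d.
    func waitDuration(d time.Duration) func() {
    	return func() { <-time.After(d) }
    }

    func main() {
    	done := make(chan struct{}) // stands in for bt.done
    	wait := newSignalWait()
    	wait.AddChan(done)                                // global shutdown signal
    	wait.Add(waitDuration(50 * time.Millisecond))     // publish timeout
    	wait.Add(withLog(func() {}, "run_once finished")) // completed work
    	wait.Wait()                                       // returns on whichever fires first
    }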
1 change: 1 addition & 0 deletions heartbeat/config/config.go
@@ -41,6 +41,7 @@ type LocationWithID struct {
// Config defines the structure of heartbeat.yml.
type Config struct {
RunOnce bool `config:"run_once"`
PublishTimeout time.Duration `config:"publish_timeout"`
Monitors []*conf.C `config:"monitors"`
ConfigMonitors *conf.C `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
18 changes: 18 additions & 0 deletions heartbeat/docs/heartbeat-options.asciidoc
@@ -133,3 +133,21 @@ heartbeat.run_once: true
heartbeat.monitors:
# your monitor config here...
----------------------------------------------------------------------

[float]
[[publish-timeout]]
=== Publish timeout (Experimental)

You can configure {beatname_uc} to exit after a timeout elapses if it is unable to publish all pending events.
This is an experimental feature and is subject to change.

Note that the `heartbeat.run_once` flag is required for `publish_timeout` to take effect.

[source,yaml]
----------------------------------------------------------------------
# heartbeat.yml
heartbeat.publish_timeout: 30s
heartbeat.run_once: true
heartbeat.monitors:
# your monitor config here...
----------------------------------------------------------------------
3 changes: 1 addition & 2 deletions heartbeat/monitors/factory.go
@@ -38,7 +38,6 @@ import (
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/beats/v7/libbeat/processors/util"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
)

@@ -56,7 +55,7 @@ type RunnerFactory struct {
beatLocation *config.LocationWithID
}

type PipelineClientFactory func(pipeline beat.Pipeline) (pipeline.ISyncClient, error)
type PipelineClientFactory func(pipeline beat.Pipeline) (beat.Client, error)

type publishSettings struct {
// Fields and tags to add to monitor.
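With ISyncClient removed, a PipelineClientFactory reduces to a plain connection call, since synchronous ACK tracking now lives at the pipeline level (see pipeline.go below). A minimal factory, mirroring the one wired up in heartbeat.go above:

    factory := func(p beat.Pipeline) (beat.Client, error) {
    	// No sync adaptor needed anymore: run_once synchronicity is
    	// handled by the SyncPipelineWrapper around the pipeline itself.
    	return p.Connect()
    }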
5 changes: 4 additions & 1 deletion heartbeat/monitors/factory_test.go
@@ -325,8 +325,11 @@ func TestDuplicateMonitorIDs(t *testing.T) {
}
}

c, err := mockPipeline.Connect()
require.NoError(t, err)

// Ensure that an error is returned on a bad config
_, m0Err := newMonitor(badConf, reg, mockPipeline.ConnectSync(), sched.Add, nil, nil)
_, m0Err := newMonitor(badConf, reg, c, sched.Add, nil, nil)
require.Error(t, m0Err)

// Would fail if the previous newMonitor didn't free the monitor.id
12 changes: 2 additions & 10 deletions heartbeat/monitors/mocks.go
@@ -43,7 +43,6 @@ import (
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
beatversion "github.com/elastic/beats/v7/libbeat/version"
)

@@ -80,9 +79,8 @@ func makeMockFactory(pluginsReg *plugin.PluginsReg) (factory *RunnerFactory, sch
AddTask: sched.Add,
StateLoader: monitorstate.NilStateLoader,
PluginsReg: pluginsReg,
PipelineClientFactory: func(pipeline beat.Pipeline) (pipeline.ISyncClient, error) {
c, _ := pipeline.Connect()
return SyncPipelineClientAdaptor{C: c}, nil
PipelineClientFactory: func(pipeline beat.Pipeline) (beat.Client, error) {
return pipeline.Connect()
},
}),
sched,
@@ -164,12 +162,6 @@ func (pc *MockPipeline) ConnectWith(cc beat.ClientConfig) (beat.Client, error) {
return c, nil
}

// Convenience function for tests
func (pc *MockPipeline) ConnectSync() pipeline.ISyncClient {
c, _ := pc.Connect()
return SyncPipelineClientAdaptor{C: c}
}

func (pc *MockPipeline) PublishedEvents() []*beat.Event {
pc.mtx.Lock()
defer pc.mtx.Unlock()
11 changes: 5 additions & 6 deletions heartbeat/monitors/monitor.go
@@ -22,7 +22,6 @@ import (
"sync"

"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"

"github.com/mitchellh/hashstructure"

@@ -63,9 +62,9 @@ type Monitor struct {
internalsMtx sync.Mutex
close func() error

// pubClient accepts an ISyncClient as the lowest common denominator of client
// since async clients are a subset of sync clients
pubClient pipeline.ISyncClient
// pubClient accepts a generic beat.Client. Pipeline synchronicity is implemented
// at client wrapper-level
pubClient beat.Client

// stats is the countersRecorder used to record lifecycle events
// for global metrics + telemetry
@@ -89,7 +88,7 @@ func checkMonitorConfig(config *conf.C, registrar *plugin.PluginsReg) error {
func newMonitor(
config *conf.C,
registrar *plugin.PluginsReg,
pubClient pipeline.ISyncClient,
pubClient beat.Client,
taskAdder scheduler.AddTask,
stateLoader monitorstate.StateLoader,
onStop func(*Monitor),
@@ -106,7 +105,7 @@ func newMonitorUnsafe(
func newMonitorUnsafe(
config *conf.C,
registrar *plugin.PluginsReg,
pubClient pipeline.ISyncClient,
pubClient beat.Client,
addTask scheduler.AddTask,
stateLoader monitorstate.StateLoader,
onStop func(*Monitor),
8 changes: 6 additions & 2 deletions heartbeat/monitors/monitor_test.go
@@ -69,7 +69,9 @@ func testMonitorConfig(t *testing.T, conf *conf.C, eventValidator validator.Vali
sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, false)
defer sched.Stop()

mon, err := newMonitor(conf, reg, pipel.ConnectSync(), sched.Add, nil, nil)
c, err := pipel.Connect()
require.NoError(t, err)
mon, err := newMonitor(conf, reg, c, sched.Add, nil, nil)
require.NoError(t, err)

mon.Start()
@@ -116,7 +118,9 @@ func TestCheckInvalidConfig(t *testing.T) {
sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, false)
defer sched.Stop()

m, err := newMonitor(serverMonConf, reg, pipel.ConnectSync(), sched.Add, nil, nil)
c, err := pipel.Connect()
require.NoError(t, err)
m, err := newMonitor(serverMonConf, reg, c, sched.Add, nil, nil)
require.Error(t, err)
// This could change if we decide the contract for newMonitor should always return a monitor
require.Nil(t, m, "For this test to work we need a nil value for the monitor.")
91 changes: 91 additions & 0 deletions heartbeat/monitors/pipeline.go
@@ -0,0 +1,91 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package monitors

import (
"sync"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
"github.com/elastic/elastic-agent-libs/logp"
)

// PipelineWrapper defines a synchronous pipeline wrapper interface.
type PipelineWrapper interface {
Wait()
}

type NoopPipelineWrapper struct {
}

// Wait is a no-op: the noop wrapper never blocks.
func (n *NoopPipelineWrapper) Wait() {
}

// SyncPipelineWrapper implements a synchronized pipeline: calling Wait() blocks
// until every event published through this pipeline (and any of its linked
// clients) has been ACKed. Safe for concurrent use.
type SyncPipelineWrapper struct {
wg sync.WaitGroup
}

// Used to wrap every client and track emitted vs acked events.
type wrappedClient struct {
wg *sync.WaitGroup
client beat.Client
}

// WithSyncPipelineWrapper returns a new pipeline wired to the provided SyncPipelineWrapper.
func WithSyncPipelineWrapper(pipeline beat.Pipeline, pw *SyncPipelineWrapper) beat.Pipeline {
pipeline = pipetool.WithACKer(pipeline, acker.TrackingCounter(func(_, total int) {
logp.L().Debugf("ack callback receives with events count of %d", total)
pw.onACK(total)
}))

pipeline = pipetool.WithClientWrapper(pipeline, func(client beat.Client) beat.Client {
return &wrappedClient{
wg: &pw.wg,
client: client,
}
})

return pipeline
}

func (c *wrappedClient) Publish(event beat.Event) {
c.wg.Add(1)
c.client.Publish(event)
}

func (c *wrappedClient) PublishAll(events []beat.Event) {
c.wg.Add(len(events))
c.client.PublishAll(events)
}

func (c *wrappedClient) Close() error {
return c.client.Close()
}

// Wait blocks until an ACK has been received for every event that was sent.
func (s *SyncPipelineWrapper) Wait() {
s.wg.Wait()
}

func (s *SyncPipelineWrapper) onACK(n int) {
s.wg.Add(-1 * n)
}
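Taken together, the wrapper is installed in front of the publisher and drained at shutdown. A minimal usage sketch, mirroring the Run() changes in heartbeat.go above; runOncePublish is a hypothetical helper, not part of this commit:

    import (
    	"github.com/elastic/beats/v7/heartbeat/monitors"
    	"github.com/elastic/beats/v7/libbeat/beat"
    )

    // runOncePublish publishes a batch and blocks until every event is ACKed.
    func runOncePublish(publisher beat.Pipeline, events []beat.Event) error {
    	sync := &monitors.SyncPipelineWrapper{}
    	pipeline := monitors.WithSyncPipelineWrapper(publisher, sync)

    	// Clients from the wrapped pipeline increment the shared WaitGroup
    	// on Publish/PublishAll and decrement it as ACKs arrive.
    	client, err := pipeline.Connect()
    	if err != nil {
    		return err
    	}
    	defer client.Close()

    	client.PublishAll(events)

    	// Blocks until every event published above has been ACKed.
    	sync.Wait()
    	return nil
    }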