diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b56dbdc765d..3e825c8c098 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -53,7 +53,10 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585] - Add kafka compression support for ZSTD. - Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending inputs configurations. {pull}41731[41731] +- Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954] +- Filestream inputs can define `allow_deprecated_id_duplication: true` to keep the previous behaviour of running inputs with duplicated IDs. {issue}41938[41938] {pull}41954[41954] - The Filestream input only starts to ingest a file when it is >= 1024 bytes in size. This happens because the `fingerprint` is the default file identity now. To restore the previous behaviour, set `file_identity.native: ~` and `prospector.scanner.fingerprint.enabled: false` {issue}40197[40197] {pull}41762[41762] + *Heartbeat* diff --git a/filebeat/beater/crawler.go b/filebeat/beater/crawler.go index ecb7a8d1bbf..f0f5ba20b63 100644 --- a/filebeat/beater/crawler.go +++ b/filebeat/beater/crawler.go @@ -79,14 +79,14 @@ func (c *crawler) Start( } if configInputs.Enabled() { - c.inputReloader = cfgfile.NewReloader(pipeline, configInputs) + c.inputReloader = cfgfile.NewReloader(log.Named("input.reloader"), pipeline, configInputs) if err := c.inputReloader.Check(c.inputsFactory); err != nil { return fmt.Errorf("creating input reloader failed: %w", err) } } if configModules.Enabled() { - c.modulesReloader = cfgfile.NewReloader(pipeline, configModules) + c.modulesReloader = cfgfile.NewReloader(log.Named("module.reloader"), pipeline, configModules) if err := c.modulesReloader.Check(c.modulesFactory); err != nil { return fmt.Errorf("creating module reloader failed: %w", err) } diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 4909941b90a..14e3ad79f55 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -224,7 +224,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { newPath := strings.TrimSuffix(origPath, ".yml") _ = fb.config.ConfigModules.SetString("path", -1, newPath) } - modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules) + modulesLoader := cfgfile.NewReloader(logp.L().Named("module.reloader"), fb.pipeline, fb.config.ConfigModules) modulesLoader.Load(modulesFactory) } diff --git a/filebeat/docs/inputs/input-filestream-file-options.asciidoc b/filebeat/docs/inputs/input-filestream-file-options.asciidoc index b87d9e67af6..f4fb4da3096 100644 --- a/filebeat/docs/inputs/input-filestream-file-options.asciidoc +++ b/filebeat/docs/inputs/input-filestream-file-options.asciidoc @@ -19,12 +19,27 @@ supported. ===== `id` A unique identifier for this filestream input. Each filestream input -must have a unique ID. +must have a unique ID. Filestream will not start inputs with duplicated IDs. WARNING: Changing input ID may cause data duplication because the state of the files will be lost and they will be read from the beginning again.
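+
+For example, a minimal illustrative configuration with two filestream
+inputs, each with its own unique ID (the IDs and paths below are only
+examples, they are not part of this change):
+
+[source,yaml]
+----
+filebeat.inputs:
+- type: filestream
+  id: my-unique-filestream-id-1
+  paths:
+    - /var/log/app-one/*.log
+- type: filestream
+  id: my-unique-filestream-id-2
+  paths:
+    - /var/log/app-two/*.log
+----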
+[float] +[[filestream-input-allow_deprecated_id_duplication]] +===== `allow_deprecated_id_duplication` + +This allows {beatname_uc} to run multiple instances of the {type} +input with the same ID. This is intended to provide backwards +compatibility with the behaviour prior to 9.0. It defaults to `false` +and is **not recommended** in new configurations. + +This setting is per input, so make sure to enable it in all {type} +inputs that use duplicated IDs. + +WARNING: Duplicated IDs will lead to data duplication and some input +instances will not produce any metrics. + [float] [[filestream-input-paths]] ===== `paths` diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index 2860dd673c2..ab994fb0b52 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -49,6 +49,10 @@ type config struct { IgnoreInactive ignoreInactiveType `config:"ignore_inactive"` Rotation *conf.Namespace `config:"rotation"` TakeOver bool `config:"take_over"` + + // AllowIDDuplication is used by InputManager.Create + // (see internal/input-logfile/manager.go). + AllowIDDuplication bool `config:"allow_deprecated_id_duplication"` } type closerConfig struct { @@ -142,6 +146,13 @@ func (c *config) Validate() error { return fmt.Errorf("no path is configured") } + if c.AllowIDDuplication { + logp.L().Named("input.filestream").Warn( + "setting `allow_deprecated_id_duplication` will lead to data " + + "duplication and incomplete input metrics, its use is " + + "highly discouraged.") + } + return nil } diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index 5c063481dd5..e8e99213da5 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -101,6 +101,23 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) { "prospector.scanner.check_interval": "1ms", "prospector.scanner.fingerprint.enabled": false, "file_identity.native": map[string]any{}, + // This test became flaky; the root of the flakiness is not in the + // test itself, it is in how a rename operation is detected. + // Even though this test uses `os.Rename`, it does not seem to be an atomic + // operation. https://www.man7.org/linux/man-pages/man2/rename.2.html + // does not make it clear whether 'renameat' (used by `os.Rename`) is + // atomic. + // + // On a flaky execution, the file is actually perceived as removed + // and then a new file is created, both with the same inode. This + // happens on a system that does not reuse inodes as soon as they're + // freed. Because the file is detected as removed, its state is also + // removed. Then when more data is added, only the offset of the new + // data is tracked by the registry, causing the test to fail. + // + // A workaround for this is to not remove the state when the file is + // removed, hence `clean_removed: false` is set here.
+ "clean_removed": false, }) testline := []byte("log line\n") diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index c65ccb5e308..6c7d37a2c66 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/go-concert/unison" v2 "github.com/elastic/beats/v7/filebeat/input/v2" @@ -155,26 +156,54 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { } settings := struct { - ID string `config:"id"` - CleanInactive time.Duration `config:"clean_inactive"` - HarvesterLimit uint64 `config:"harvester_limit"` + ID string `config:"id"` + CleanInactive time.Duration `config:"clean_inactive"` + HarvesterLimit uint64 `config:"harvester_limit"` + AllowIDDuplication bool `config:"allow_deprecated_id_duplication"` }{CleanInactive: cim.DefaultCleanTimeout} if err := config.Unpack(&settings); err != nil { return nil, err } if settings.ID == "" { - cim.Logger.Error("filestream input ID without ID might lead to data" + - " duplication, please add an ID and restart Filebeat") + cim.Logger.Warn("filestream input without ID is discouraged, please add an ID and restart Filebeat") } metricsID := settings.ID cim.idsMux.Lock() if _, exists := cim.ids[settings.ID]; exists { - cim.Logger.Errorf("filestream input with ID '%s' already exists, this "+ - "will lead to data duplication, please use a different ID. Metrics "+ - "collection has been disabled on this input.", settings.ID) - metricsID = "" + duplicatedInput := map[string]any{} + unpackErr := config.Unpack(&duplicatedInput) + if unpackErr != nil { + duplicatedInput["error"] = fmt.Errorf("failed to unpack duplicated input config: %w", unpackErr).Error() + } + + // Keep old behaviour so users can upgrade to 9.0 without + // having their inputs not starting. + if settings.AllowIDDuplication { + cim.Logger.Errorf("filestream input with ID '%s' already exists, "+ + "this will lead to data duplication, please use a different "+ + "ID. Metrics collection has been disabled on this input. The "+ + " input will start only because "+ + "'allow_deprecated_id_duplication' is set to true", + settings.ID) + metricsID = "" + } else { + cim.Logger.Errorw( + fmt.Sprintf( + "filestream input ID '%s' is duplicated: input will NOT start", + settings.ID, + ), + "input.cfg", conf.DebugString(config, true)) + + cim.idsMux.Unlock() + return nil, &common.ErrNonReloadable{ + Err: fmt.Errorf( + "filestream input with ID '%s' already exists, this "+ + "will lead to data duplication, please use a different ID", + settings.ID, + )} + } } // TODO: improve how inputs with empty IDs are tracked. 
diff --git a/filebeat/input/filestream/internal/input-logfile/manager_test.go b/filebeat/input/filestream/internal/input-logfile/manager_test.go index f13f51772f8..19c2eead9a9 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager_test.go +++ b/filebeat/input/filestream/internal/input-logfile/manager_test.go @@ -19,6 +19,8 @@ package input_logfile import ( "bytes" + "fmt" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -26,6 +28,7 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" "github.com/elastic/elastic-agent-libs/config" @@ -42,6 +45,18 @@ func (s *testSource) Name() string { return s.name } +type noopProspector struct{} + +func (m noopProspector) Init(_, _ ProspectorCleaner, _ func(Source) string) error { + return nil +} + +func (m noopProspector) Run(_ v2.Context, _ StateMetadataUpdater, _ HarvesterGroup) {} + +func (m noopProspector) Test() error { + return nil +} + func TestSourceIdentifier_ID(t *testing.T) { testCases := map[string]struct { userID string @@ -198,6 +213,115 @@ func TestInputManager_Create(t *testing.T) { assert.NotContains(t, buff.String(), "already exists") }) + + t.Run("does not start an input with duplicated ID", func(t *testing.T) { + tcs := []struct { + name string + id string + }{ + {name: "ID is empty", id: ""}, + {name: "non-empty ID", id: "non-empty-ID"}, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend()) + testStore, err := storeReg.Get("test") + require.NoError(t, err) + + log, buff := newBufferLogger() + + cim := &InputManager{ + Logger: log, + StateStore: testStateStore{Store: testStore}, + Configure: func(_ *config.C) (Prospector, Harvester, error) { + var wg sync.WaitGroup + + return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil + }} + cfg1 := config.MustNewConfigFrom(fmt.Sprintf(` +type: filestream +id: %s +paths: + - /var/log/foo +`, tc.id)) + + // Create a different 2nd config with duplicated ID to ensure + // the ID itself is the only requirement to prevent the 2nd input + // from being created. 
+ cfg2 := config.MustNewConfigFrom(fmt.Sprintf(` +type: filestream +id: %s +paths: + - /var/log/bar +`, tc.id)) + + _, err = cim.Create(cfg1) + require.NoError(t, err, "1st input should have been created") + + // Attempt to create an input with a duplicated ID + _, err = cim.Create(cfg2) + require.Error(t, err, "filestream should not have created an input with a duplicated ID") + + logs := buff.String() + // Assert the logs contain the correct log message + assert.Contains(t, logs, + fmt.Sprintf("filestream input ID '%s' is duplicated:", tc.id)) + + // Assert the error contains the correct text + assert.Contains(t, err.Error(), + fmt.Sprintf("filestream input with ID '%s' already exists", tc.id)) + }) + } + }) + + t.Run("allow duplicated IDs setting", func(t *testing.T) { + storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend()) + testStore, err := storeReg.Get("test") + require.NoError(t, err) + + log, buff := newBufferLogger() + + cim := &InputManager{ + Logger: log, + StateStore: testStateStore{Store: testStore}, + Configure: func(_ *config.C) (Prospector, Harvester, error) { + var wg sync.WaitGroup + + return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil + }} + cfg1 := config.MustNewConfigFrom(` +type: filestream +id: duplicated-id +allow_deprecated_id_duplication: true +paths: + - /var/log/foo +`) + + // Create a 2nd config with the same duplicated ID to ensure + // both inputs are still created when + // 'allow_deprecated_id_duplication' is enabled. + cfg2 := config.MustNewConfigFrom(` +type: filestream +id: duplicated-id +allow_deprecated_id_duplication: true +paths: + - /var/log/bar +`) + _, err = cim.Create(cfg1) + require.NoError(t, err, "1st input should have been created") + // Create an input with a duplicated ID + _, err = cim.Create(cfg2) + require.NoError(t, err, "filestream should have created an input with a duplicated ID") + + logs := buff.String() + // Assert the logs contain the correct log message + assert.Contains(t, logs, + "filestream input with ID 'duplicated-id' already exists, this "+ + "will lead to data duplication, please use a different ID. 
Metrics "+ + "collection has been disabled on this input.", + "did not find the expected message about the duplicated input ID") + }) } func newBufferLogger() (*logp.Logger, *bytes.Buffer) { diff --git a/filebeat/tests/integration/filestream_test.go b/filebeat/tests/integration/filestream_test.go index 24125469dd8..bc0b96b4377 100644 --- a/filebeat/tests/integration/filestream_test.go +++ b/filebeat/tests/integration/filestream_test.go @@ -274,7 +274,7 @@ logging: filebeat.WaitForLogs( "Input 'filestream' starting", 10*time.Second, - "Filebeat did log a validation error") + "Filebeat did not log a validation error") } func TestFilestreamCanMigrateIdentity(t *testing.T) { diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 227b375ee90..bb6f73b2fde 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -184,7 +184,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { } if bt.config.ConfigMonitors.Enabled() { - bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors) + bt.monitorReloader = cfgfile.NewReloader(logp.L().Named("module.reload"), b.Publisher, bt.config.ConfigMonitors) defer bt.monitorReloader.Stop() err := bt.RunReloadableMonitors() diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 36408051952..27739c2e561 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/v7/libbeat/autodiscover/meta" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/elastic-agent-autodiscover/bus" conf "github.com/elastic/elastic-agent-libs/config" @@ -169,7 +170,10 @@ func (a *Autodiscover) worker() { updated = false // On error, make sure the next run also updates because some runners were not properly loaded - retry = err != nil + retry = common.IsInputReloadable(err) + if err != nil && !retry { + a.logger.Errorw("all new inputs failed to start with a non-retriable error", err) + } if retry { // The recoverable errors that can lead to retry are related // to the harvester state, so we need to give the publishing diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index 5343c093941..40c15d7de66 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -18,8 +18,11 @@ package autodiscover import ( + "bytes" "encoding/json" + "errors" "fmt" + "path/filepath" "reflect" "strings" "sync" @@ -29,9 +32,12 @@ import ( "github.com/gofrs/uuid/v5" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/tests/resources" "github.com/elastic/elastic-agent-autodiscover/bus" conf "github.com/elastic/elastic-agent-libs/config" @@ -116,10 +122,22 @@ func (m *mockAdapter) CheckConfig(c *conf.C) error { return nil } +// Create returns a mockRunner with the provided config. If +// the config contains `err_non_reloadable: true`, then a +// common.ErrNonReloadable is returned alongside a nil runner. 
func (m *mockAdapter) Create(_ beat.PipelineConnector, config *conf.C) (cfgfile.Runner, error) { + // On error false is returned, that's enough to keep a correct behaviour + nonReloadable, _ := config.Bool("err_non_reloadable", -1) + if nonReloadable { + return nil, common.ErrNonReloadable{ + Err: errors.New("a non reloadable error"), + } + } + runner := &mockRunner{ config: config, } + m.mutex.Lock() defer m.mutex.Unlock() m.runners = append(m.runners, runner) @@ -800,3 +818,107 @@ func check(t *testing.T, runners []*mockRunner, expected *conf.C, started, stopp } t.Fatalf("expected cfg %v to be started=%v stopped=%v but have %v", out, started, stopped, runners) } + +func TestErrNonReloadableIsNotRetried(t *testing.T) { + // Register mock autodiscover provider + busChan := make(chan bus.Bus, 1) + Registry = NewRegistry() + err := Registry.AddProvider( + "mock", + func(beatName string, + b bus.Bus, + uuid uuid.UUID, + c *conf.C, + k keystore.Keystore) (Provider, error) { + + // intercept bus to mock events + busChan <- b + + return &mockProvider{}, nil + }) + if err != nil { + t.Fatalf("cannot add provider to registry: %s", err) + } + + // Create a mock adapter, 'err_non_reloadable' will make its Create method + // to return a common.ErrNonReloadable. + adapter := mockAdapter{ + configs: []*conf.C{ + conf.MustNewConfigFrom(map[string]any{ + "err_non_reloadable": true, + }), + }, + } + + // and settings: + providerConfig, _ := conf.NewConfigFrom(map[string]string{ + "type": "mock", + }) + config := Config{ + Providers: []*conf.C{providerConfig}, + } + k, _ := keystore.NewFileKeystore(filepath.Join(t.TempDir(), "keystore")) + // Create autodiscover manager + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k) + if err != nil { + t.Fatal(err) + } + + logger, logsBuffer := newBufferLogger() + autodiscover.logger = logger + // set the debounce period to something small in order to + // speed up the tests. 
This seems to be the sweet spot + // for the fastest test run + autodiscover.debouncePeriod = time.Millisecond + + // Start it + autodiscover.Start() + defer autodiscover.Stop() + eventBus := <-busChan + + // Send an event to the bus, the event itself is not important + // because the mockAdapter will return the same configs regardless + // of the event + eventBus.Publish(bus.Event{ + // That's used in the last assertion, the config key is + // <provider>:<id> + "id": "foo", + "provider": "mock", + "start": true, + "meta": mapstr.M{ + "test_name": t.Name(), + }, + }) + + // Ensure we logged the error about not retrying the input reload + require.Eventually( + t, + func() bool { + return strings.Contains( + logsBuffer.String(), + `all new inputs failed to start with a non-retriable error","error":"Error creating runner from config: ErrNonReloadable: a non reloadable error`, + ) + }, + time.Second*10, + time.Millisecond*10, + "expected log message about a non-retriable error was not found") + + // Ensure nothing is running + requireRunningRunners(t, autodiscover, 0) + runners := adapter.Runners() + require.Equal(t, len(runners), 0) + + // Ensure the autodiscover got the config + require.Equal(t, len(autodiscover.configs["mock:foo"]), 1) +} + +func newBufferLogger() (*logp.Logger, *bytes.Buffer) { + buf := &bytes.Buffer{} + encoderConfig := zap.NewProductionEncoderConfig() + encoder := zapcore.NewJSONEncoder(encoderConfig) + writeSyncer := zapcore.AddSync(buf) + log := logp.NewLogger("", zap.WrapCore(func(_ zapcore.Core) zapcore.Core { + return zapcore.NewCore(encoder, writeSyncer, zapcore.DebugLevel) + })) + return log, buf +} diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go index 3a9e3429d29..3e076ee4c14 100644 --- a/libbeat/cfgfile/list.go +++ b/libbeat/cfgfile/list.go @@ -22,7 +22,6 @@ import ( "fmt" "sync" - "github.com/joeshaw/multierror" "github.com/mitchellh/hashstructure" "github.com/elastic/beats/v7/libbeat/beat" @@ -71,8 +70,7 @@ func (r *RunnerList) Runners() []Runner { // // Runners might fail to start, it's the callers responsibility to // handle any error. During execution, any encountered errors are -// accumulated in a `multierror.Errors` and returned as -// a `multierror.MultiError` upon completion. +// accumulated in an error slice and returned via errors.Join upon completion. // // While the stopping of runners occurs on separate goroutines, // Reload will wait for all runners to finish before starting any new runners. @@ -85,7 +83,7 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error { r.mutex.Lock() defer r.mutex.Unlock() - var errs multierror.Errors + var errs []error startList := map[uint64]*reload.ConfigWithMeta{} stopList := r.copyRunnerList() @@ -179,7 +177,7 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error { // above it is done asynchronously. moduleRunning.Set(int64(len(r.runners))) - return errs.Err() + return errors.Join(errs...)
} // Stop all runners diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 930bd56eafd..2d802f1b30d 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -26,6 +26,7 @@ import ( "github.com/joeshaw/multierror" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -42,8 +43,6 @@ var ( }, } - debugf = logp.MakeDebug("cfgfile") - // configScans measures how many times the config dir was scanned for // changes, configReloads measures how many times there were changes that // triggered an actual reload. @@ -101,10 +100,11 @@ type Reloader struct { path string done chan struct{} wg sync.WaitGroup + logger *logp.Logger } // NewReloader creates new Reloader instance for the given config -func NewReloader(pipeline beat.PipelineConnector, cfg *config.C) *Reloader { +func NewReloader(logger *logp.Logger, pipeline beat.PipelineConnector, cfg *config.C) *Reloader { conf := DefaultDynamicConfig _ = cfg.Unpack(&conf) @@ -118,6 +118,7 @@ func NewReloader(pipeline beat.PipelineConnector, cfg *config.C) *Reloader { config: conf, path: path, done: make(chan struct{}), + logger: logger, } } @@ -128,7 +129,7 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { return nil } - debugf("Checking module configs from: %s", rl.path) + rl.logger.Debugf("Checking module configs from: %s", rl.path) gw := NewGlobWatcher(rl.path) files, _, err := gw.Scan() @@ -142,7 +143,7 @@ return fmt.Errorf("loading configs: %w", err) } - debugf("Number of module configs found: %v", len(configs)) + rl.logger.Debugf("Number of module configs found: %v", len(configs)) // Initialize modules for _, c := range configs { @@ -190,7 +191,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { return case <-time.After(rl.config.Reload.Period): - debugf("Scan for new config files") + rl.logger.Debug("Scan for new config files") configScans.Add(1) files, updated, err := gw.Scan() @@ -209,13 +210,19 @@ // Load all config objects configs, _ := rl.loadConfigs(files) - debugf("Number of module configs found: %v", len(configs)) + rl.logger.Debugf("Number of module configs found: %v", len(configs)) err = list.Reload(configs) - // Force reload on the next iteration if and only if this one failed. - // (Any errors are already logged by list.Reload, so we don't need to - // propagate the details further.) - forceReload = err != nil + // Force reload on the next iteration if and only if the error + // can be retried. + // Errors are already logged by list.Reload, so we don't need to + // propagate details any further. + forceReload = common.IsInputReloadable(err) + if forceReload { + rl.logger.Debugf("error '%v' can be retried. Will try again in %s", err, rl.config.Reload.Period.String()) + } else { + rl.logger.Debugf("error '%v' cannot be retried. Modify any input file to reload.", err) + } } // Path loading is enabled but not reloading. Loads files only once and then stops.
@@ -240,7 +247,7 @@ func (rl *Reloader) Load(runnerFactory RunnerFactory) { gw := NewGlobWatcher(rl.path) - debugf("Scan for config files") + rl.logger.Debug("Scan for config files") files, _, err := gw.Scan() if err != nil { logp.Err("Error fetching new config files: %v", err) @@ -249,7 +256,7 @@ func (rl *Reloader) Load(runnerFactory RunnerFactory) { // Load all config objects configs, _ := rl.loadConfigs(files) - debugf("Number of module configs found: %v", len(configs)) + rl.logger.Debugf("Number of module configs found: %v", len(configs)) if err := list.Reload(configs); err != nil { logp.Err("Error loading configuration files: %+v", err) diff --git a/libbeat/cfgfile/reload_test.go b/libbeat/cfgfile/reload_integration_test.go similarity index 82% rename from libbeat/cfgfile/reload_test.go rename to libbeat/cfgfile/reload_integration_test.go index f28fbd7033a..e5102764b68 100644 --- a/libbeat/cfgfile/reload_test.go +++ b/libbeat/cfgfile/reload_integration_test.go @@ -21,25 +21,34 @@ package cfgfile import ( "fmt" - "io/ioutil" "os" + "path/filepath" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) func TestReloader(t *testing.T) { // Create random temp directory - dir, err := ioutil.TempDir("", "libbeat-reloader") - defer os.RemoveAll(dir) + dir, err := os.MkdirTemp("", "libbeat-reloader") + defer func() { + if t.Failed() { + t.Logf("test failed, temp dir '%s' was kept", dir) + return + } + os.RemoveAll(dir) + }() + if err != nil { - t.Fatal(err) + t.Fatalf("could not create temp dir: %s", err) } - glob := dir + "/*.yml" + glob := filepath.Join(dir, "*.yml") config := conf.MustNewConfigFrom(mapstr.M{ "path": glob, @@ -49,7 +58,7 @@ func TestReloader(t *testing.T) { }, }) // config.C{} - reloader := NewReloader(nil, config) + reloader := NewReloader(logp.L().Named("cfgfile-test.reload"), nil, config) retryCount := 10 go reloader.Run(nil) @@ -70,11 +79,17 @@ func TestReloader(t *testing.T) { // The first scan should cause a reload, but additional ones should not, // so configReloads should still be 1. - assert.Equal(t, int64(1), configReloads.Get()) + require.Equalf( + t, + int64(1), + configReloads.Get(), + "config reload should be called once, but it was called %d times", + configReloads.Get(), + ) // Write a file to the reloader path to trigger a real reload content := []byte("test\n") - err = ioutil.WriteFile(dir+"/config1.yml", content, 0644) + err = os.WriteFile(filepath.Join(dir, "config1.yml"), content, 0644) assert.NoError(t, err) // Wait for the number of scans to increase at least twice. 
This is somewhat diff --git a/libbeat/common/errors.go b/libbeat/common/errors.go index 9f5248e815e..9eddc425ae5 100644 --- a/libbeat/common/errors.go +++ b/libbeat/common/errors.go @@ -18,6 +18,7 @@ package common import ( + "errors" "fmt" ) @@ -31,3 +32,51 @@ type ErrInputNotFinished struct { func (e *ErrInputNotFinished) Error() string { return fmt.Sprintf("Can only start an input when all related states are finished: %+v", e.State) } + +type ErrNonReloadable struct { + Err error +} + +func (e ErrNonReloadable) Error() string { + return fmt.Sprintf("ErrNonReloadable: %v", e.Err) +} + +func (e ErrNonReloadable) Unwrap() error { return e.Err } + +func (e ErrNonReloadable) Is(err error) bool { + switch err.(type) { + case ErrNonReloadable: + return true + default: + return errors.Is(e.Err, err) + } +} + +// IsInputReloadable returns true if err, or any error wrapped +// by err can be retried. +// +// Effectively, it will only return false if ALL +// errors are ErrNonReloadable. +func IsInputReloadable(err error) bool { + if err == nil { + return false + } + + type unwrapList interface { + Unwrap() []error + } + + //nolint:errorlint // we only want to check that specific error, not all errors in the chain + errList, isErrList := err.(unwrapList) + if !isErrList { + return !errors.Is(err, ErrNonReloadable{}) + } + + for _, e := range errList.Unwrap() { + if !errors.Is(e, ErrNonReloadable{}) { + return true + } + } + + return false +} diff --git a/libbeat/common/errors_test.go b/libbeat/common/errors_test.go new file mode 100644 index 00000000000..6c83c675bf1 --- /dev/null +++ b/libbeat/common/errors_test.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package common + +import ( + "errors" + "fmt" + "testing" +) + +func TestIsInputReloadable(t *testing.T) { + testCases := map[string]struct { + err error + expected bool + }{ + "nil error is not retriable": { + err: nil, + expected: false, + }, + "simple error": { + err: errors.New("a generic error"), + expected: true, + }, + "common.ErrNonReloadable": { + err: ErrNonReloadable{}, + expected: false, + }, + "wrapped common.ErrNonReloadable": { + err: fmt.Errorf("wrapping %w", ErrNonReloadable{}), + expected: false, + }, + "errors.Join, all errors are ErrNonReloadable": { + err: errors.Join(ErrNonReloadable{}, ErrNonReloadable{}), + expected: false, + }, + "errors.Join, only one is ErrNonReloadable": { + err: errors.Join(errors.New("generic reloadable error"), ErrNonReloadable{}), + expected: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + reloadable := IsInputReloadable(tc.err) + if reloadable != tc.expected { + t.Errorf( + "expecting isReloadable to return %t, but got %t for: '%v'", + tc.expected, + reloadable, + tc.err, + ) + } + }) + } +} diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index bcb9a893a87..0695e53ea90 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -264,7 +264,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { // Dynamic file based modules (metricbeat.config.modules) if bt.config.ConfigModules.Enabled() { - moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules) + moduleReloader := cfgfile.NewReloader(logp.L().Named("module.reload"), b.Publisher, bt.config.ConfigModules) if err := moduleReloader.Check(factory); err != nil { return err diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 288f28ae0de..c5f074e13d7 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -14,7 +14,6 @@ import ( "syscall" "time" - "github.com/joeshaw/multierror" "go.uber.org/zap/zapcore" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -681,10 +680,19 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*agentUnit) { // // in v2 only a single input type will be started per component, so we don't need to // worry about getting multiple re-loaders (we just need the one for the type) - if err := cm.reloadInputs(inputUnits); err != nil { - merror := &multierror.MultiError{} - if errors.As(err, &merror) { - for _, err := range merror.Errors { + if err := cm.reloadInputs(inputUnits); err != nil { // HERE + // cm.reloadInputs will use fmt.Errorf and join an error slice + // using errors.Join, so we need to unwrap the fmt wrapped error, + // then we can iterate over the errors list. 
+ err = errors.Unwrap(err) + type unwrapList interface { + Unwrap() []error + } + + //nolint:errorlint // That's a custom logic based on how reloadInputs builds the error + errList, isErrList := err.(unwrapList) + if isErrList { + for _, err := range errList.Unwrap() { unitErr := cfgfile.UnitError{} if errors.As(err, &unitErr) { unitErrors[unitErr.UnitID] = append(unitErrors[unitErr.UnitID], unitErr.Err) @@ -824,8 +832,7 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*agentUnit) error { } if err := obj.Reload(inputBeatCfgs); err != nil { - merror := &multierror.MultiError{} - realErrors := multierror.Errors{} + realErrors := []error{} // At the moment this logic is tightly bound to the current RunnerList // implementation from libbeat/cfgfile/list.go and Input.loadStates from @@ -833,8 +840,12 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*agentUnit) error { // If they change the way they report errors, this will break. // TODO (Tiago): update all layers to use the most recent features from // the standard library errors package. - if errors.As(err, &merror) { - for _, err := range merror.Errors { + type unwrapList interface { + Unwrap() []error + } + errList, isErrList := err.(unwrapList) //nolint:errorlint // see the comment above + if isErrList { + for _, err := range errList.Unwrap() { causeErr := errors.Unwrap(err) // A Log input is only marked as finished when all events it // produced are acked by the acker so when we see this error, @@ -855,7 +866,7 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*agentUnit) error { } if len(realErrors) != 0 { - return fmt.Errorf("failed to reload inputs: %w", realErrors.Err()) + return fmt.Errorf("failed to reload inputs: %w", errors.Join(realErrors...)) } } else { // If there was no error reloading input and forceReload was diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index e7515266b0b..73804565dbf 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -12,7 +12,6 @@ import ( "testing" "time" - "github.com/joeshaw/multierror" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -547,14 +546,14 @@ func TestErrorPerUnit(t *testing.T) { r.MustRegisterOutput(output) inputs := &mockReloadable{ ReloadFn: func(configs []*reload.ConfigWithMeta) error { - errs := multierror.Errors{} + errs := []error{} for _, input := range configs { errs = append(errs, cfgfile.UnitError{ UnitID: input.InputUnitID, Err: errors.New(errorMessages[input.InputUnitID]), }) } - return errs.Err() + return errors.Join(errs...) }, } r.MustRegisterInput(inputs)
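Below is a minimal, self-contained sketch (not part of the diff above) of how the new common.ErrNonReloadable and common.IsInputReloadable from libbeat/common/errors.go are meant to interact with errors.Join: a joined reload error keeps being retried as long as at least one wrapped error is reloadable, and retries stop only when every wrapped error is non-reloadable. The error messages used here are illustrative.

package main

import (
	"errors"
	"fmt"

	"github.com/elastic/beats/v7/libbeat/common"
)

func main() {
	// A transient error, e.g. a harvester whose states are not finished yet.
	retriable := errors.New("input states are not finished yet")
	// The kind of error now returned for a duplicated filestream input ID.
	fatal := common.ErrNonReloadable{Err: errors.New("filestream input with ID 'foo' already exists")}

	// At least one wrapped error is reloadable, so the reload is retried.
	fmt.Println(common.IsInputReloadable(errors.Join(retriable, fatal))) // true

	// Every wrapped error is non-reloadable, so the reload is not retried.
	fmt.Println(common.IsInputReloadable(errors.Join(fatal, fatal))) // false

	// A single (possibly wrapped) non-reloadable error is not retried either.
	fmt.Println(common.IsInputReloadable(fmt.Errorf("wrapped: %w", fatal))) // false
}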