Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filestream returns error when an input with duplicated ID is created #41954

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
253d137
filestream: do not run input with duplicated ID
AndersonQ Dec 6, 2024
050af29
wip
AndersonQ Dec 6, 2024
09b5bab
wip
AndersonQ Dec 9, 2024
f56961f
Merge branch 'main' of github.com:elastic/beats into 41938-filestream…
belimawr Dec 16, 2024
f45f406
Restore configuration files/remove test files
belimawr Dec 16, 2024
4ede9b5
Move reload retry logic to reload.go
belimawr Dec 17, 2024
5a7e4ac
Fix isReloadable and improve tests
belimawr Dec 17, 2024
510f68a
Add changelog
belimawr Dec 17, 2024
35e10c7
remove comments
belimawr Dec 17, 2024
c1c9a6c
Merge branch 'main' of github.com:elastic/beats into 41938-filestream…
belimawr Dec 17, 2024
8ea1a31
Fix build
belimawr Dec 17, 2024
341dbb6
Update reloadInputs to use the new error types from cfgfile/list.go
belimawr Dec 17, 2024
b7bd7e6
Merge branch 'main' of github.com:elastic/beats into 41938-filestream…
belimawr Dec 18, 2024
79df181
Fix more tests
belimawr Dec 18, 2024
f576354
Add `allow_deprecated_id_duplication` flag
belimawr Dec 18, 2024
d248381
Add non reloadable logic to autodiscover
belimawr Dec 18, 2024
3d0de94
Add test to autodiscover not reloading common.ErrNonReloadable
belimawr Dec 18, 2024
eaba986
Add test for common.IsInputReloadable
belimawr Dec 18, 2024
6c456d1
Merge branch 'main' of github.com:elastic/beats into 41938-filestream…
belimawr Dec 18, 2024
4d5734d
Update changelog
belimawr Dec 18, 2024
5ddb9eb
Merge branch 'main' of github.com:elastic/beats into 41938-filestream…
belimawr Dec 19, 2024
4c1db16
address lint warnings
belimawr Dec 19, 2024
bea2d8e
Merge branch 'main' into 41938-filestream-do-not-run-duplicated-id
jlind23 Dec 30, 2024
a2f0f2d
Merge branch 'main' of github.com:elastic/beats into 41938-filestream…
belimawr Jan 2, 2025
8bae05e
Update notice to 2025
belimawr Jan 2, 2025
4dff23d
Merge branch '41938-filestream-do-not-run-duplicated-id' of github.co…
belimawr Jan 2, 2025
e1b6c4d
Merge branch 'main' of github.com:elastic/beats into 41938-filestream…
belimawr Jan 2, 2025
e5d6bdf
Implement review suggestions
belimawr Jan 2, 2025
6e2c73a
Fix OTel API
belimawr Jan 2, 2025
17d0c97
Merge branch 'main' of github.com:elastic/beats into 41938-filestream…
belimawr Jan 3, 2025
1183431
Fix flakiness on TestFilestreamMetadataUpdatedOnRename
belimawr Jan 3, 2025
2579f5b
Merge branch 'main' into 41938-filestream-do-not-run-duplicated-id
belimawr Jan 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
- Add kafka compression support for ZSTD.
- Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending inputs configurations. {pull}41731[41731]
- Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954]

*Heartbeat*

Expand Down
4 changes: 2 additions & 2 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ func (c *crawler) Start(
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
c.inputReloader = cfgfile.NewReloader(log.Named("input.reloader"), pipeline, configInputs)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return fmt.Errorf("creating input reloader failed: %w", err)
}
}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
c.modulesReloader = cfgfile.NewReloader(log.Named("module.reloader"), pipeline, configModules)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return fmt.Errorf("creating module reloader failed: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
newPath := strings.TrimSuffix(origPath, ".yml")
_ = fb.config.ConfigModules.SetString("path", -1, newPath)
}
modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules)
modulesLoader := cfgfile.NewReloader(logp.L().Named("module.reloader"), fb.pipeline, fb.config.ConfigModules)
modulesLoader.Load(modulesFactory)
}

Expand Down
28 changes: 22 additions & 6 deletions filebeat/input/filestream/internal/input-logfile/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/go-concert/unison"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
Expand Down Expand Up @@ -164,17 +165,32 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
}

if settings.ID == "" {
cim.Logger.Error("filestream input ID without ID might lead to data" +
" duplication, please add an ID and restart Filebeat")
cim.Logger.Warn("filestream input without ID is discouraged, please add an ID and restart Filebeat")
}

metricsID := settings.ID
cim.idsMux.Lock()
if _, exists := cim.ids[settings.ID]; exists {
cim.Logger.Errorf("filestream input with ID '%s' already exists, this "+
"will lead to data duplication, please use a different ID. Metrics "+
"collection has been disabled on this input.", settings.ID)
metricsID = ""
duplicatedInput := map[string]any{}
unpackErr := config.Unpack(&duplicatedInput)
if unpackErr != nil {
duplicatedInput["error"] = fmt.Errorf("failed to umpack dupliucated input config: %w", unpackErr).Error()
belimawr marked this conversation as resolved.
Show resolved Hide resolved
}

cim.Logger.Errorw(
fmt.Sprintf(
"filestream input '%s' is duplicated: input will NOT start",
settings.ID,
),
"input.cfg", conf.DebugString(config, true))

cim.idsMux.Unlock()
return nil, &common.ErrNonReloadable{
Err: fmt.Errorf(
"filestream input with ID '%s' already exists, this "+
"will lead to data duplication, please use a different ID",
settings.ID,
)}
}

// TODO: improve how inputs with empty IDs are tracked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package input_logfile

import (
"bytes"
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/elastic-agent-libs/config"
Expand All @@ -42,6 +45,18 @@ func (s *testSource) Name() string {
return s.name
}

// noopProspector is a no-op Prospector implementation used by tests that
// exercise InputManager.Create without needing any real file discovery.
type noopProspector struct{}

// Init implements Prospector; it performs no cleanup and always succeeds.
func (m noopProspector) Init(_, _ ProspectorCleaner, _ func(Source) string) error {
	return nil
}

// Run implements Prospector; it discovers nothing and returns immediately.
func (m noopProspector) Run(_ v2.Context, _ StateMetadataUpdater, _ HarvesterGroup) {}

// Test implements Prospector; it always reports the configuration as valid.
func (m noopProspector) Test() error {
	return nil
}

func TestSourceIdentifier_ID(t *testing.T) {
testCases := map[string]struct {
userID string
Expand Down Expand Up @@ -198,6 +213,67 @@ func TestInputManager_Create(t *testing.T) {
assert.NotContains(t, buff.String(),
"already exists")
})

t.Run("does not start an input with duplicated ID", func(t *testing.T) {
tcs := []struct {
name string
id string
}{
{name: "ID is empty", id: ""},
{name: "non-empty ID", id: "non-empty-ID"},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend())
testStore, err := storeReg.Get("test")
require.NoError(t, err)

log, buff := newBufferLogger()

cim := &InputManager{
Logger: log,
StateStore: testStateStore{Store: testStore},
Configure: func(_ *config.C) (Prospector, Harvester, error) {
var wg sync.WaitGroup

return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil
}}
cfg1 := config.MustNewConfigFrom(fmt.Sprintf(`
type: filestream
id: %s
paths:
- /var/log/foo
`, tc.id))

// Create a different 2nd config with duplicated ID to ensure
// the ID itself is the only requirement to prevent the 2nd input
// from being created.
cfg2 := config.MustNewConfigFrom(fmt.Sprintf(`
type: filestream
id: %s
paths:
- /var/log/bar
`, tc.id))

_, err = cim.Create(cfg1)
require.NoError(t, err, "1st input should have been created")

// Attempt to create an input with a duplicated ID
_, err = cim.Create(cfg2)
require.Error(t, err, "filestream should not have created an input with a duplicated ID")

logs := buff.String()
// Assert the logs contain the correct log message
assert.Contains(t, logs,
fmt.Sprintf("filestream input '%s' is duplicated:", tc.id))

// Assert the error contains the correct text
assert.Contains(t, err.Error(),
fmt.Sprintf("filestream input with ID '%s' already exists", tc.id))
})
}
})
}

func newBufferLogger() (*logp.Logger, *bytes.Buffer) {
Expand Down
2 changes: 1 addition & 1 deletion filebeat/tests/integration/filestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,5 +269,5 @@ logging:
filebeat.WaitForLogs(
"Input 'filestream' starting",
10*time.Second,
"Filebeat did log a validation error")
"Filebeat did not log a validation error")
}
14 changes: 9 additions & 5 deletions libbeat/cfgfile/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"sync"

"github.com/joeshaw/multierror"
"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -71,8 +70,7 @@ func (r *RunnerList) Runners() []Runner {
//
// Runners might fail to start, it's the callers responsibility to
// handle any error. During execution, any encountered errors are
// accumulated in a `multierror.Errors` and returned as
// a `multierror.MultiError` upon completion.
// accumulated in a []error slice and returned via errors.Join(errs...) upon completion.
//
// While the stopping of runners occurs on separate goroutines,
// Reload will wait for all runners to finish before starting any new runners.
Expand All @@ -85,7 +83,7 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {
r.mutex.Lock()
defer r.mutex.Unlock()

var errs multierror.Errors
var errs []error

startList := map[uint64]*reload.ConfigWithMeta{}
stopList := r.copyRunnerList()
Expand Down Expand Up @@ -147,6 +145,12 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {
}
}

// if the error is returned, the broken config will be reloaded indefinitely.
// That means, for the use-case where the user configured it manually,
// it'll be logging the error as often as `reload.period`. Which might
// flood the logs with the error.
// perhaps the test to check if the error on non-reloadable should be here https://github.com/elastic/beats/blob/bf39c9a6e3e76ed1d3789d35a69c7bd9aba82eb1/libbeat/cfgfile/reload.go#L214

errs = append(errs, fmt.Errorf("Error creating runner from config: %w", err))
continue
}
Expand Down Expand Up @@ -179,7 +183,7 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {
// above it is done asynchronously.
moduleRunning.Set(int64(len(r.runners)))

return errs.Err()
return errors.Join(errs...)
}

// Stop all runners
Expand Down
57 changes: 44 additions & 13 deletions libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cfgfile

import (
"errors"
"fmt"
"path/filepath"
"sync"
Expand All @@ -26,6 +27,7 @@
"github.com/joeshaw/multierror"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -42,8 +44,6 @@
},
}

debugf = logp.MakeDebug("cfgfile")

// configScans measures how many times the config dir was scanned for
// changes, configReloads measures how many times there were changes that
// triggered an actual reload.
Expand Down Expand Up @@ -101,10 +101,11 @@
path string
done chan struct{}
wg sync.WaitGroup
logger *logp.Logger
}

// NewReloader creates new Reloader instance for the given config
func NewReloader(pipeline beat.PipelineConnector, cfg *config.C) *Reloader {
func NewReloader(logger *logp.Logger, pipeline beat.PipelineConnector, cfg *config.C) *Reloader {
conf := DefaultDynamicConfig
_ = cfg.Unpack(&conf)

Expand All @@ -118,6 +119,7 @@
config: conf,
path: path,
done: make(chan struct{}),
logger: logger,
}
}

Expand All @@ -128,7 +130,7 @@
return nil
}

debugf("Checking module configs from: %s", rl.path)
rl.logger.Debugf("Checking module configs from: %s", rl.path)
gw := NewGlobWatcher(rl.path)

files, _, err := gw.Scan()
Expand All @@ -142,7 +144,7 @@
return fmt.Errorf("loading configs: %w", err)
}

debugf("Number of module configs found: %v", len(configs))
rl.logger.Debugf("Number of module configs found: %v", len(configs))

// Initialize modules
for _, c := range configs {
Expand Down Expand Up @@ -190,7 +192,7 @@
return

case <-time.After(rl.config.Reload.Period):
debugf("Scan for new config files")
rl.logger.Debug("Scan for new config files")
configScans.Add(1)

files, updated, err := gw.Scan()
Expand All @@ -209,13 +211,19 @@
// Load all config objects
configs, _ := rl.loadConfigs(files)

debugf("Number of module configs found: %v", len(configs))
rl.logger.Debugf("Number of module configs found: %v", len(configs))

err = list.Reload(configs)
// Force reload on the next iteration if and only if this one failed.
// (Any errors are already logged by list.Reload, so we don't need to
// propagate the details further.)
forceReload = err != nil
// Force reload on the next iteration if and only if the error
// can be retried.
// Errors are already logged by list.Reload, so we don't need to
// propagate details any further.
forceReload = isReloadable(err)
if forceReload {
rl.logger.Debugf("error '%v' can be retried. Will try again in %s", err, rl.config.Reload.Period.String())
} else {
rl.logger.Debugf("error '%v' cannot retried. Modify any input file to reload.", err)
}
}

// Path loading is enabled but not reloading. Loads files only once and then stops.
Expand All @@ -228,6 +236,29 @@
}
}

func isReloadable(err error) bool {
if err == nil {
return false
}

type unwrapList interface {
Unwrap() []error
}

errList, isErrList := err.(unwrapList)

Check failure on line 248 in libbeat/cfgfile/reload.go

View workflow job for this annotation

GitHub Actions / lint (linux)

type assertion on error will fail on wrapped errors. Use errors.As to check for specific errors (errorlint)
if !isErrList {
return !errors.Is(err, common.ErrNonReloadable{})
}

for _, e := range errList.Unwrap() {
if !errors.Is(e, common.ErrNonReloadable{}) {
return true
}
}

return false
}

// Load loads configuration files once.
func (rl *Reloader) Load(runnerFactory RunnerFactory) {
list := NewRunnerList("load", runnerFactory, rl.pipeline)
Expand All @@ -240,7 +271,7 @@

gw := NewGlobWatcher(rl.path)

debugf("Scan for config files")
rl.logger.Debug("Scan for config files")
files, _, err := gw.Scan()
if err != nil {
logp.Err("Error fetching new config files: %v", err)
Expand All @@ -249,7 +280,7 @@
// Load all config objects
configs, _ := rl.loadConfigs(files)

debugf("Number of module configs found: %v", len(configs))
rl.logger.Debugf("Number of module configs found: %v", len(configs))

if err := list.Reload(configs); err != nil {
logp.Err("Error loading configuration files: %+v", err)
Expand Down
Loading
Loading