From 55c72d3162234bf26e673bdb02f9dd3d7c7a9d89 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Thu, 24 Oct 2024 15:59:05 -0400 Subject: [PATCH 01/11] [filebeat] Elasticsearch state storage for httpjson input --- filebeat/beater/filebeat.go | 47 ++- filebeat/beater/store.go | 49 ++- filebeat/features/features.go | 48 +++ .../internal/input-logfile/manager.go | 55 ++- .../internal/input-logfile/store.go | 65 ++-- .../internal/input-logfile/store_test.go | 4 +- filebeat/input/v2/input-cursor/manager.go | 59 +-- filebeat/input/v2/input-cursor/store.go | 66 ++-- filebeat/input/v2/input-cursor/store_test.go | 4 +- filebeat/registrar/registrar.go | 8 +- libbeat/statestore/backend/backend.go | 4 + libbeat/statestore/backend/es/error.go | 24 ++ libbeat/statestore/backend/es/notifier.go | 66 ++++ libbeat/statestore/backend/es/registry.go | 55 +++ libbeat/statestore/backend/es/store.go | 347 ++++++++++++++++++ libbeat/statestore/backend/es/store_test.go | 163 ++++++++ libbeat/statestore/backend/memlog/store.go | 4 + libbeat/statestore/store.go | 4 + x-pack/filebeat/input/awss3/states.go | 2 +- 19 files changed, 953 insertions(+), 121 deletions(-) create mode 100644 filebeat/features/features.go create mode 100644 libbeat/statestore/backend/es/error.go create mode 100644 libbeat/statestore/backend/es/notifier.go create mode 100644 libbeat/statestore/backend/es/registry.go create mode 100644 libbeat/statestore/backend/es/store.go create mode 100644 libbeat/statestore/backend/es/store_test.go diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 9d9cb220d4e..0223fb6fda2 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -18,8 +18,10 @@ package beater import ( + "context" "flag" "fmt" + "os" "path/filepath" "strings" "sync" @@ -39,6 +41,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" @@ -79,7 +82,7 @@ type Filebeat struct { type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin type StateStore interface { - Access() (*statestore.Store, error) + Access(typ string) (*statestore.Store, error) CleanupInterval() time.Duration } @@ -281,13 +284,44 @@ func (fb *Filebeat) Run(b *beat.Beat) error { return err } - stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry) + // Use context, like normal people do, hooking up to the beat.done channel + ctx, cn := context.WithCancel(context.Background()) + go func() { + <-fb.done + cn() + }() + + stateStore, err := openStateStore(ctx, b.Info, logp.NewLogger("filebeat"), config.Registry) if err != nil { logp.Err("Failed to open state store: %+v", err) return err } defer stateStore.Close() + // If notifier is set, configure the listener for output configuration + // The notifier passes the elasticsearch output configuration down to the Elasticsearch backed state storage + // in order to allow it fully configure + if stateStore.notifier != nil { + b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error { + outCfg := conf.Namespace{} + if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" { + return nil + } + + // TODO: REMOVE THIS HACK BEFORE MERGE. LEAVING FOR TESTING FOR DRAFT + // Injecting the ApiKey that has enough permissions to write to the index + // TODO: need to figure out how add permissions for the state index + // agentless-state-, for example httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959 + apiKey := os.Getenv("AGENTLESS_ELASTICSEARCH_APIKEY") + if apiKey != "" { + outCfg.Config().SetString("api_key", -1, apiKey) + } + + stateStore.notifier.NotifyConfigUpdate(outCfg.Config()) + return nil + }) + } + err = processLogInputTakeOver(stateStore, config) if err != nil { logp.Err("Failed to attempt filestream state take over: %+v", err) @@ -341,6 +375,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error { defer func() { _ = inputTaskGroup.Stop() }() + + // Store needs to be fully configured at this point if err := v2InputLoader.Init(&inputTaskGroup); err != nil { logp.Err("Failed to initialize the input managers: %v", err) return err @@ -509,7 +545,7 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error { return nil } - store, err := stateStore.Access() + store, err := stateStore.Access("") if err != nil { return fmt.Errorf("Failed to access state when attempting take over: %w", err) } @@ -567,3 +603,8 @@ func fetchInputConfiguration(config *cfg.Config) (inputs []*conf.C, err error) { return inputs, nil } + +func useElasticsearchStorage() bool { + s := os.Getenv("AGENTLESS_ELASTICSEARCH_STATE_STORE") + return s != "" +} diff --git a/filebeat/beater/store.go b/filebeat/beater/store.go index 745c507d6e5..5b11792d666 100644 --- a/filebeat/beater/store.go +++ b/filebeat/beater/store.go @@ -18,11 +18,15 @@ package beater import ( + "context" "time" "github.com/elastic/beats/v7/filebeat/config" + "github.com/elastic/beats/v7/filebeat/features" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/beats/v7/libbeat/statestore/backend/es" "github.com/elastic/beats/v7/libbeat/statestore/backend/memlog" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/paths" @@ -30,12 +34,34 @@ import ( type filebeatStore struct { registry *statestore.Registry + esRegistry *statestore.Registry storeName string cleanInterval time.Duration + + // Notifies the Elasticsearch store about configuration change + // which is available only after the beat runtime manager connects to the Agent + // and receives the output configuration + notifier *es.Notifier } -func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) { - memlog, err := memlog.New(logger, memlog.Settings{ +func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) { + var ( + reg backend.Registry + err error + + esreg *es.Registry + notifier *es.Notifier + ) + + if features.IsElasticsearchStateStoreEnabled() { + notifier = es.NewNotifier() + esreg, err = es.New(ctx, logger, notifier) + if err != nil { + return nil, err + } + } + + reg, err = memlog.New(logger, memlog.Settings{ Root: paths.Resolve(paths.Data, cfg.Path), FileMode: cfg.Permissions, }) @@ -43,18 +69,29 @@ func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (* return nil, err } - return &filebeatStore{ - registry: statestore.NewRegistry(memlog), + store := &filebeatStore{ + registry: statestore.NewRegistry(reg), storeName: info.Beat, cleanInterval: cfg.CleanInterval, - }, nil + notifier: notifier, + } + + if esreg != nil { + store.esRegistry = statestore.NewRegistry(esreg) + } + + return store, nil } func (s *filebeatStore) Close() { s.registry.Close() } -func (s *filebeatStore) Access() (*statestore.Store, error) { +// Access returns the storage registry depending on the type. Default is the file store. +func (s *filebeatStore) Access(typ string) (*statestore.Store, error) { + if features.IsElasticsearchStateStoreEnabledForInput(typ) && s.esRegistry != nil { + return s.esRegistry.Get(s.storeName) + } return s.registry.Get(s.storeName) } diff --git a/filebeat/features/features.go b/filebeat/features/features.go new file mode 100644 index 00000000000..2e9811cbb79 --- /dev/null +++ b/filebeat/features/features.go @@ -0,0 +1,48 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package features + +import "os" + +type void struct{} + +// List of input types Elasticsearch state store is enabled for +var esTypesEnabled = map[string]void{ + "httpjson": {}, +} + +var isESEnabled bool + +func init() { + isESEnabled = (os.Getenv("AGENTLESS_ELASTICSEARCH_STATE_STORE_ENABLED") != "") +} + +// IsElasticsearchStateStoreEnabled returns true if feature is enabled for agentless +func IsElasticsearchStateStoreEnabled() bool { + return isESEnabled +} + +// IsElasticsearchStateStoreEnabledForInput returns true if the provided input type uses Elasticsearch for state storage if the Elasticsearch state store feature is enabled +func IsElasticsearchStateStoreEnabledForInput(inputType string) bool { + if IsElasticsearchStateStoreEnabled() { + if _, ok := esTypesEnabled[inputType]; ok { + return true + } + } + return false +} diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index c65ccb5e308..a048fcb58b5 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/go-concert/unison" + "github.com/elastic/beats/v7/filebeat/features" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/statestore" conf "github.com/elastic/elastic-agent-libs/config" @@ -64,7 +65,7 @@ type InputManager struct { // that will be used to collect events from each source. Configure func(cfg *conf.C) (Prospector, Harvester, error) - initOnce sync.Once + initedFull bool initErr error store *store ackUpdater *updateWriter @@ -88,26 +89,40 @@ const globalInputID = ".global" // StateStore interface and configurations used to give the Manager access to the persistent store. type StateStore interface { - Access() (*statestore.Store, error) + Access(typ string) (*statestore.Store, error) CleanupInterval() time.Duration } -func (cim *InputManager) init() error { - cim.initOnce.Do(func() { +// init initializes the state store +// This function is called from: +// 1. InputManager::Init on beat start +// 2. InputManager::Create when the input is initialized with configuration +// When Elasticsearch state storage is used for the input it will be only fully configured on InputManager::Create, +// so skip reading the state from the storage on InputManager::Init in this case +func (cim *InputManager) init(inputID string) error { + if cim.initedFull { + return nil + } - log := cim.Logger.With("input_type", cim.Type) + log := cim.Logger.With("input_type", cim.Type) - var store *store - store, cim.initErr = openStore(log, cim.StateStore, cim.Type) - if cim.initErr != nil { - return - } + var store *store - cim.store = store - cim.ackCH = newUpdateChan() - cim.ackUpdater = newUpdateWriter(store, cim.ackCH) - cim.ids = map[string]struct{}{} - }) + useES := features.IsElasticsearchStateStoreEnabledForInput(cim.Type) + fullInit := !useES || (inputID != "" && useES) + store, cim.initErr = openStore(log, cim.StateStore, cim.Type, inputID, fullInit) + if cim.initErr != nil { + return cim.initErr + } + + cim.store = store + cim.ackCH = newUpdateChan() + cim.ackUpdater = newUpdateWriter(store, cim.ackCH) + cim.ids = map[string]struct{}{} + + if fullInit { + cim.initedFull = true + } return cim.initErr } @@ -115,7 +130,7 @@ func (cim *InputManager) init() error { // Init starts background processes for deleting old entries from the // persistent store if mode is ModeRun. func (cim *InputManager) Init(group unison.Group) error { - if err := cim.init(); err != nil { + if err := cim.init(""); err != nil { return err } @@ -150,10 +165,6 @@ func (cim *InputManager) shutdown() { // Create builds a new v2.Input using the provided Configure function. // The Input will run a go-routine per source that has been configured. func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { - if err := cim.init(); err != nil { - return nil, err - } - settings := struct { ID string `config:"id"` CleanInactive time.Duration `config:"clean_inactive"` @@ -163,6 +174,10 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { return nil, err } + if err := cim.init(settings.ID); err != nil { + return nil, err + } + if settings.ID == "" { cim.Logger.Error("filestream input ID without ID might lead to data" + " duplication, please add an ID and restart Filebeat") diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go index 024ca5c9bfd..d947511f16d 100644 --- a/filebeat/input/filestream/internal/input-logfile/store.go +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -141,16 +141,19 @@ type ( // hook into store close for testing purposes var closeStore = (*store).close -func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) { +func openStore(log *logp.Logger, statestore StateStore, prefix string, inputID string, fullInit bool) (*store, error) { ok := false - persistentStore, err := statestore.Access() + log.Debugf("input-logfile::openStore: prefix: %v", prefix) + + persistentStore, err := statestore.Access(prefix) if err != nil { return nil, err } defer cleanup.IfNot(&ok, func() { persistentStore.Close() }) + persistentStore.SetID(inputID) - states, err := readStates(log, persistentStore, prefix) + states, err := readStates(log, persistentStore, prefix, fullInit) if err != nil { return nil, err } @@ -574,41 +577,43 @@ func (r *resource) stateSnapshot() state { } } -func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*states, error) { +func readStates(log *logp.Logger, store *statestore.Store, prefix string, fullInit bool) (*states, error) { keyPrefix := prefix + "::" states := &states{ table: map[string]*resource{}, } - err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { - if !strings.HasPrefix(key, keyPrefix) { - return true, nil - } + if fullInit { + err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { + if !strings.HasPrefix(key, keyPrefix) { + return true, nil + } - var st state - if err := dec.Decode(&st); err != nil { - log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v", - key, err) - return true, nil - } + var st state + if err := dec.Decode(&st); err != nil { + log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v", + key, err) + return true, nil + } - resource := &resource{ - key: key, - stored: true, - lock: unison.MakeMutex(), - internalState: stateInternal{ - TTL: st.TTL, - Updated: st.Updated, - }, - cursor: st.Cursor, - cursorMeta: st.Meta, - } - states.table[resource.key] = resource + resource := &resource{ + key: key, + stored: true, + lock: unison.MakeMutex(), + internalState: stateInternal{ + TTL: st.TTL, + Updated: st.Updated, + }, + cursor: st.Cursor, + cursorMeta: st.Meta, + } + states.table[resource.key] = resource - return true, nil - }) - if err != nil { - return nil, err + return true, nil + }) + if err != nil { + return nil, err + } } return states, nil } diff --git a/filebeat/input/filestream/internal/input-logfile/store_test.go b/filebeat/input/filestream/internal/input-logfile/store_test.go index 6f19e1afad7..374c1cdd8fd 100644 --- a/filebeat/input/filestream/internal/input-logfile/store_test.go +++ b/filebeat/input/filestream/internal/input-logfile/store_test.go @@ -70,7 +70,7 @@ func TestStore_OpenClose(t *testing.T) { }) t.Run("fail if persistent store can not be accessed", func(t *testing.T) { - _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test") + _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test", false, "") require.Error(t, err) }) @@ -478,7 +478,7 @@ func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *sto persistentStore = createSampleStore(t, nil) } - store, err := openStore(logp.NewLogger("test"), persistentStore, prefix) + store, err := openStore(logp.NewLogger("test"), persistentStore, prefix, false, "") if err != nil { t.Fatalf("failed to open the store") } diff --git a/filebeat/input/v2/input-cursor/manager.go b/filebeat/input/v2/input-cursor/manager.go index 1d5578a7122..9b6c1372bd7 100644 --- a/filebeat/input/v2/input-cursor/manager.go +++ b/filebeat/input/v2/input-cursor/manager.go @@ -21,11 +21,11 @@ import ( "context" "errors" "fmt" - "sync" "time" "github.com/elastic/go-concert/unison" + "github.com/elastic/beats/v7/filebeat/features" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/statestore" conf "github.com/elastic/elastic-agent-libs/config" @@ -63,9 +63,9 @@ type InputManager struct { // that will be used to collect events from each source. Configure func(cfg *conf.C) ([]Source, Input, error) - initOnce sync.Once - initErr error - store *store + initedFull bool + initErr error + store *store } // Source describe a source the input can collect data from. @@ -82,25 +82,38 @@ var ( // StateStore interface and configurations used to give the Manager access to the persistent store. type StateStore interface { - Access() (*statestore.Store, error) + Access(typ string) (*statestore.Store, error) CleanupInterval() time.Duration } -func (cim *InputManager) init() error { - cim.initOnce.Do(func() { - if cim.DefaultCleanTimeout <= 0 { - cim.DefaultCleanTimeout = 30 * time.Minute - } +// init initializes the state store +// This function is called from: +// 1. InputManager::Init on beat start +// 2. InputManager::Create when the input is initialized with configuration +// When Elasticsearch state storage is used for the input it will be only fully configured on InputManager::Create, +// so skip reading the state from the storage on InputManager::Init in this case +func (cim *InputManager) init(inputID string) error { + if cim.initedFull { + return nil + } - log := cim.Logger.With("input_type", cim.Type) - var store *store - store, cim.initErr = openStore(log, cim.StateStore, cim.Type) - if cim.initErr != nil { - return - } + if cim.DefaultCleanTimeout <= 0 { + cim.DefaultCleanTimeout = 30 * time.Minute + } - cim.store = store - }) + log := cim.Logger.With("input_type", cim.Type) + var store *store + useES := features.IsElasticsearchStateStoreEnabledForInput(cim.Type) + fullInit := !useES || (inputID != "" && useES) + store, cim.initErr = openStore(log, cim.StateStore, cim.Type, inputID, fullInit) + if cim.initErr != nil { + return cim.initErr + } + + cim.store = store + if fullInit { + cim.initedFull = true + } return cim.initErr } @@ -108,7 +121,7 @@ func (cim *InputManager) init() error { // Init starts background processes for deleting old entries from the // persistent store if mode is ModeRun. func (cim *InputManager) Init(group unison.Group) error { - if err := cim.init(); err != nil { + if err := cim.init(""); err != nil { return err } @@ -143,10 +156,6 @@ func (cim *InputManager) shutdown() { // Create builds a new v2.Input using the provided Configure function. // The Input will run a go-routine per source that has been configured. func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { - if err := cim.init(); err != nil { - return nil, err - } - settings := struct { ID string `config:"id"` CleanInactive time.Duration `config:"clean_inactive"` @@ -155,6 +164,10 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { return nil, err } + if err := cim.init(settings.ID); err != nil { + return nil, err + } + sources, inp, err := cim.Configure(config) if err != nil { return nil, err diff --git a/filebeat/input/v2/input-cursor/store.go b/filebeat/input/v2/input-cursor/store.go index cc755f046ca..bf48d603d81 100644 --- a/filebeat/input/v2/input-cursor/store.go +++ b/filebeat/input/v2/input-cursor/store.go @@ -127,16 +127,18 @@ type ( // hook into store close for testing purposes var closeStore = (*store).close -func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) { +func openStore(log *logp.Logger, statestore StateStore, prefix string, inputID string, fullInit bool) (*store, error) { ok := false - persistentStore, err := statestore.Access() + log.Debugf("input-cursor::openStore: prefix: %v", prefix) + persistentStore, err := statestore.Access(prefix) if err != nil { return nil, err } defer cleanup.IfNot(&ok, func() { persistentStore.Close() }) + persistentStore.SetID(inputID) - states, err := readStates(log, persistentStore, prefix) + states, err := readStates(log, persistentStore, prefix, fullInit) if err != nil { return nil, err } @@ -283,41 +285,45 @@ func (r *resource) stateSnapshot() state { } } -func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*states, error) { +func readStates(log *logp.Logger, store *statestore.Store, prefix string, fullInit bool) (*states, error) { keyPrefix := prefix + "::" states := &states{ table: map[string]*resource{}, } - err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { - if !strings.HasPrefix(string(key), keyPrefix) { - return true, nil - } + if fullInit { + err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { + if !strings.HasPrefix(string(key), keyPrefix) { + return true, nil + } + + var st state + if err := dec.Decode(&st); err != nil { + log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v", + key, err) + return true, nil + } + + log.Debugf("input-cursor store.Each, got: key:%v, val: %#v", key, st) + + resource := &resource{ + key: key, + stored: true, + lock: unison.MakeMutex(), + internalInSync: true, + internalState: stateInternal{ + TTL: st.TTL, + Updated: st.Updated, + }, + cursor: st.Cursor, + } + states.table[resource.key] = resource - var st state - if err := dec.Decode(&st); err != nil { - log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v", - key, err) return true, nil + }) + if err != nil { + return nil, err } - - resource := &resource{ - key: key, - stored: true, - lock: unison.MakeMutex(), - internalInSync: true, - internalState: stateInternal{ - TTL: st.TTL, - Updated: st.Updated, - }, - cursor: st.Cursor, - } - states.table[resource.key] = resource - - return true, nil - }) - if err != nil { - return nil, err } return states, nil } diff --git a/filebeat/input/v2/input-cursor/store_test.go b/filebeat/input/v2/input-cursor/store_test.go index fc1d57fac3e..2cb2abe3fc4 100644 --- a/filebeat/input/v2/input-cursor/store_test.go +++ b/filebeat/input/v2/input-cursor/store_test.go @@ -52,7 +52,7 @@ func TestStore_OpenClose(t *testing.T) { }) t.Run("fail if persistent store can not be accessed", func(t *testing.T) { - _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test") + _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test", false) require.Error(t, err) }) @@ -240,7 +240,7 @@ func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *sto persistentStore = createSampleStore(t, nil) } - store, err := openStore(logp.NewLogger("test"), persistentStore, prefix) + store, err := openStore(logp.NewLogger("test"), persistentStore, prefix, false, "") if err != nil { t.Fatalf("failed to open the store") } diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 3ba8427e55f..a5621fc4462 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -55,7 +55,7 @@ type successLogger interface { } type StateStore interface { - Access() (*statestore.Store, error) + Access(typ string) (*statestore.Store, error) } var ( @@ -72,7 +72,7 @@ const fileStatePrefix = "filebeat::logs::" // New creates a new Registrar instance, updating the registry file on // `file.State` updates. New fails if the file can not be opened or created. func New(stateStore StateStore, out successLogger, flushTimeout time.Duration) (*Registrar, error) { - store, err := stateStore.Access() + store, err := stateStore.Access("") if err != nil { return nil, err } @@ -98,7 +98,7 @@ func (r *Registrar) GetStates() []file.State { // loadStates fetches the previous reading state from the configure RegistryFile file // The default file is `registry` in the data path. func (r *Registrar) loadStates() error { - states, err := readStatesFrom(r.store) + states, err := r.readStatesFrom(r.store) if err != nil { return fmt.Errorf("can not load filebeat registry state: %w", err) } @@ -266,7 +266,7 @@ func (r *Registrar) processEventStates(states []file.State) { } } -func readStatesFrom(store *statestore.Store) ([]file.State, error) { +func (r *Registrar) readStatesFrom(store *statestore.Store) ([]file.State, error) { var states []file.State err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { diff --git a/libbeat/statestore/backend/backend.go b/libbeat/statestore/backend/backend.go index c40d8515977..5987814f5b0 100644 --- a/libbeat/statestore/backend/backend.go +++ b/libbeat/statestore/backend/backend.go @@ -68,4 +68,8 @@ type Store interface { // is assumed to be invalidated once fn returns // The loop shall return if fn returns an error or false. Each(fn func(string, ValueDecoder) (bool, error)) error + + // Sets the store ID when the full input configuration is aquired + // This is needed in order to support Elasticsearch state store naming convention based on the input ID + SetID(id string) } diff --git a/libbeat/statestore/backend/es/error.go b/libbeat/statestore/backend/es/error.go new file mode 100644 index 00000000000..df8b1a734d6 --- /dev/null +++ b/libbeat/statestore/backend/es/error.go @@ -0,0 +1,24 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package es + +import "errors" + +var ( + ErrKeyUnknown = errors.New("key unknown") +) diff --git a/libbeat/statestore/backend/es/notifier.go b/libbeat/statestore/backend/es/notifier.go new file mode 100644 index 00000000000..b4e029a7a51 --- /dev/null +++ b/libbeat/statestore/backend/es/notifier.go @@ -0,0 +1,66 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package es + +import ( + "sync" + + conf "github.com/elastic/elastic-agent-libs/config" +) + +type OnConfigUpdateFunc func(c *conf.C) + +type Notifier struct { + mx sync.RWMutex + + listeners map[int]OnConfigUpdateFunc + counter int +} + +func NewNotifier() *Notifier { + n := &Notifier{ + listeners: make(map[int]OnConfigUpdateFunc), + } + return n +} + +func (n *Notifier) Subscribe(fn OnConfigUpdateFunc) int { + n.mx.Lock() + defer n.mx.Unlock() + + id := n.counter + n.counter++ + n.listeners[id] = fn + + return id +} + +func (n *Notifier) Unsubscribe(id int) { + n.mx.Lock() + defer n.mx.Unlock() + delete(n.listeners, id) +} + +func (n *Notifier) NotifyConfigUpdate(c *conf.C) { + n.mx.RLock() + defer n.mx.RUnlock() + + for _, listener := range n.listeners { + go listener(c) + } +} diff --git a/libbeat/statestore/backend/es/registry.go b/libbeat/statestore/backend/es/registry.go new file mode 100644 index 00000000000..12b5dd8b9cb --- /dev/null +++ b/libbeat/statestore/backend/es/registry.go @@ -0,0 +1,55 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package es + +import ( + "context" + "sync" + + "github.com/elastic/beats/v7/libbeat/statestore/backend" + "github.com/elastic/elastic-agent-libs/logp" +) + +type Registry struct { + ctx context.Context + + log *logp.Logger + mx sync.Mutex + + notifier *Notifier +} + +func New(ctx context.Context, log *logp.Logger, notifier *Notifier) (*Registry, error) { + r := &Registry{ + ctx: ctx, + log: log, + notifier: notifier, + } + + return r, nil +} + +func (r *Registry) Access(name string) (backend.Store, error) { + r.mx.Lock() + defer r.mx.Unlock() + return openStore(r.ctx, r.log, name, r.notifier) +} + +func (r *Registry) Close() error { + return nil +} diff --git a/libbeat/statestore/backend/es/store.go b/libbeat/statestore/backend/es/store.go new file mode 100644 index 00000000000..08193a8ef2d --- /dev/null +++ b/libbeat/statestore/backend/es/store.go @@ -0,0 +1,347 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package es + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "sync" + + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" + "github.com/elastic/beats/v7/libbeat/statestore/backend" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +// TODO: Possibly add in-memory cache, since the operations could have delays +// for example when the key is deleted, it's still could be searchable until the next refresh +// the refresh delay is even worse for serverless +type store struct { + ctx context.Context + cn context.CancelFunc + log *logp.Logger + name string + index string + notifier *Notifier + + chReady chan struct{} + once sync.Once + + mx sync.Mutex + cli *eslegclient.Connection + cliErr error +} + +const docType = "_doc" + +func openStore(ctx context.Context, log *logp.Logger, name string, notifier *Notifier) (*store, error) { + ctx, cn := context.WithCancel(ctx) + s := &store{ + ctx: ctx, + cn: cn, + log: log.With("name", name).With("backend", "elasticsearch"), + name: name, + index: renderIndexName(name), + notifier: notifier, + chReady: make(chan struct{}), + } + + chCfg := make(chan *conf.C) + + id := s.notifier.Subscribe(func(c *conf.C) { + select { + case chCfg <- c: + case <-ctx.Done(): + } + }) + + go s.loop(ctx, cn, id, chCfg) + + return s, nil +} + +func renderIndexName(name string) string { + return "agentless-state-" + name +} + +func (s *store) waitReady() error { + select { + case <-s.ctx.Done(): + return s.ctx.Err() + case <-s.chReady: + return s.cliErr + } +} + +func (s *store) SetID(id string) { + s.mx.Lock() + defer s.mx.Unlock() + + if id == "" { + return + } + s.index = renderIndexName(id) +} + +func (s *store) Close() error { + s.mx.Lock() + defer s.mx.Unlock() + + if s.cn != nil { + s.cn() + } + + if s.cli != nil { + err := s.cli.Close() + s.cli = nil + return err + } + return nil +} + +func (s *store) Has(key string) (bool, error) { + if err := s.waitReady(); err != nil { + return false, err + } + s.mx.Lock() + defer s.mx.Unlock() + + var v interface{} + err := s.get(key, v) + if err != nil { + if errors.Is(err, ErrKeyUnknown) { + return false, nil + } + return false, err + } + return true, nil +} + +func (s *store) Get(key string, to interface{}) error { + if err := s.waitReady(); err != nil { + return err + } + s.mx.Lock() + defer s.mx.Unlock() + + return s.get(key, to) +} + +func (s *store) get(key string, to interface{}) error { + status, data, err := s.cli.Request("GET", fmt.Sprintf("/%s/%s/%s", s.index, docType, url.QueryEscape(key)), "", nil, nil) + + if err != nil && status != http.StatusNotFound { + return err + } + + if status == http.StatusNotFound { + return ErrKeyUnknown + } + if err != nil { + return err + } + + var qr queryResult + err = json.Unmarshal(data, &qr) + if err != nil { + return err + } + + err = json.Unmarshal(qr.Source.Value, to) + if err != nil { + return err + } + return nil +} + +type queryResult struct { + Found bool `json:"found"` + Source struct { + Value json.RawMessage `json:"v"` + } `json:"_source"` +} + +type doc struct { + Value any `struct:"v"` +} + +type upsertRequest struct { + Doc doc `struct:"doc"` + Upsert doc `struct:"upsert"` +} + +type entry struct { + value interface{} +} + +func (e entry) Decode(to interface{}) error { + return typeconv.Convert(to, e.value) +} + +func renderUpsertRequest(val interface{}) upsertRequest { + d := doc{ + Value: val, + } + return upsertRequest{ + Doc: d, + Upsert: d, + } +} + +func (s *store) Set(key string, value interface{}) error { + if err := s.waitReady(); err != nil { + return err + } + s.mx.Lock() + defer s.mx.Unlock() + + // The advantage of using upsert here is the the seqno doesn't increase if the document is the same + upsert := renderUpsertRequest(value) + _, _, err := s.cli.Request("POST", fmt.Sprintf("/%s/%s/%s", s.index, "_update", url.QueryEscape(key)), "", nil, upsert) + if err != nil { + return err + } + return nil +} + +func (s *store) Remove(key string) error { + if err := s.waitReady(); err != nil { + return err + } + s.mx.Lock() + defer s.mx.Unlock() + + _, _, err := s.cli.Delete(s.index, docType, url.QueryEscape(key), nil) + if err != nil { + return err + } + return nil +} + +type searchResult struct { + ID string `json:"_id"` + Source struct { + Value json.RawMessage `json:"v"` + } `json:"_source"` +} + +func (s *store) Each(fn func(string, backend.ValueDecoder) (bool, error)) error { + // Can't wait here for when the connection is configured. + // Currently Each method is being called for each plugin with store (input-logfile, input-cursor) as well as the registrar + // from filebeat::Run. All these pieces have to be initialized before the libbeat Manager starts and gets a chance to connect + // to the Agent and get the configuration. + // + // At this point it's not clear how to hook up the elasticsearch storage or if it's even possible without some major rewrite. + // + // Commented for now: + // if err := s.waitReady(); err != nil { + // return err + // } + + s.mx.Lock() + defer s.mx.Unlock() + + // Do nothing for now if the store was not initialized + if s.cli == nil { + return nil + } + + status, result, err := s.cli.SearchURIWithBody(s.index, "", nil, map[string]any{ + "query": map[string]any{ + "match_all": map[string]any{}, + }, + "size": 1000, // TODO: we might have to do scroll of there are more than 1000 keys + }) + + if err != nil && status != http.StatusNotFound { + return err + } + err = nil + + if result == nil || len(result.Hits.Hits) == 0 { + return nil + } + + for _, hit := range result.Hits.Hits { + var sres searchResult + err = json.Unmarshal(hit, &sres) + if err != nil { + return err + } + var e entry + err = json.Unmarshal(sres.Source.Value, &e.value) + if err != nil { + return err + } + key, err := url.QueryUnescape(sres.ID) + if err != nil { + return err + } + + fn(key, e) + } + + return nil +} + +func (s *store) configure(c *conf.C) { + s.mx.Lock() + defer s.mx.Unlock() + + if s.cli != nil { + _ = s.cli.Close() + s.cli = nil + } + s.cliErr = nil + + cli, err := eslegclient.NewConnectedClient(c, s.name) + if err != nil { + s.log.Errorf("ES store, failed to create elasticsearch client: %v", err) + s.cliErr = err + } else { + s.cli = cli + } + + // Signal store is ready + s.once.Do(func() { + close(s.chReady) + }) + +} + +func (s *store) loop(ctx context.Context, cn context.CancelFunc, subId int, chCfg chan *conf.C) { + defer cn() + + defer s.notifier.Unsubscribe(subId) + + defer s.log.Debug("ES store exit main loop") + + for { + select { + case <-ctx.Done(): + return + case cu := <-chCfg: + s.configure(cu) + } + } +} diff --git a/libbeat/statestore/backend/es/store_test.go b/libbeat/statestore/backend/es/store_test.go new file mode 100644 index 00000000000..979618033e3 --- /dev/null +++ b/libbeat/statestore/backend/es/store_test.go @@ -0,0 +1,163 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package es + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/elastic/beats/v7/libbeat/statestore/backend" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/google/go-cmp/cmp" +) + +func TestStore(t *testing.T) { + // This just a convenience test for store development + // REMOVE: before the opening PR + //t.Skip() + + ctx, cn := context.WithCancel(context.Background()) + defer cn() + + notifier := NewNotifier() + + store, err := openStore(ctx, logp.NewLogger("tester"), "filebeat", notifier) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + config, err := conf.NewConfigFrom(map[string]interface{}{ + "api_key": "xxxxxxxxxx:xxxxxxxc-U6VH4DK8g", + "hosts": []string{ + "https://6598f1d41f9d4e81a78117dddbb2b03e.us-central1.gcp.cloud.es.io:443", + }, + "preset": "balanced", + "type": "elasticsearch", + }) + + if err != nil { + t.Fatal(err) + } + + notifier.NotifyConfigUpdate(config) + + var m map[string]any + store.SetID("httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959") + err = store.Get("foo", &m) + if err != nil && !errors.Is(err, ErrKeyUnknown) { + t.Fatal(err) + } + + err = store.Each(func(s string, vd backend.ValueDecoder) (bool, error) { + var v any + err := vd.Decode(&v) + if err != nil { + return false, err + } + fmt.Printf("%v :: %v\n", s, v) + return true, nil + }) + + v := map[string]interface{}{ + "updated": []interface{}{ + float64(280444598839616), + float64(1729277837), + }, + "cursor": map[string]interface{}{ + "published": "2024-10-17T18:33:58.960Z", + }, + "ttl": float64(1800000000000), + } + + err = store.Set("foo", v) + if err != nil { + t.Fatal(err) + } + + err = store.Get("foo", &m) + if err != nil && !errors.Is(err, ErrKeyUnknown) { + t.Fatal(err) + } + + diff := cmp.Diff(v, m) + if diff != "" { + t.Fatal(diff) + } + + var s1 = "dfsdf" + err = store.Set("foo1", s1) + if err != nil { + t.Fatal(err) + } + + var s2 string + err = store.Get("foo1", &s2) + if err != nil { + t.Fatal(err) + } + + diff = cmp.Diff(s1, s2) + if diff != "" { + t.Fatal(diff) + } + + var n1 = 12345 + err = store.Set("foon", n1) + if err != nil { + t.Fatal(err) + } + + var n2 int + err = store.Get("foon", &n2) + if err != nil { + t.Fatal(err) + } + + diff = cmp.Diff(n1, n2) + if diff != "" { + t.Fatal(diff) + } + + if err != nil { + t.Fatal(err) + } + + err = store.Remove("foon") + if err != nil { + t.Fatal(err) + } + + err = store.Each(func(s string, vd backend.ValueDecoder) (bool, error) { + var v any + err := vd.Decode(&v) + if err != nil { + return false, err + } + fmt.Printf("%v :: %v\n", s, v) + return true, nil + }) + + if err != nil { + t.Fatal(err) + } + +} diff --git a/libbeat/statestore/backend/memlog/store.go b/libbeat/statestore/backend/memlog/store.go index 5bd6aac77fd..67b94862262 100644 --- a/libbeat/statestore/backend/memlog/store.go +++ b/libbeat/statestore/backend/memlog/store.go @@ -276,6 +276,10 @@ func (m *memstore) Remove(key string) bool { return true } +func (s *store) SetID(_ string) { + // NOOP +} + func (e entry) Decode(to interface{}) error { return typeconv.Convert(to, e.value) } diff --git a/libbeat/statestore/store.go b/libbeat/statestore/store.go index c204fcde8f5..875ba43e870 100644 --- a/libbeat/statestore/store.go +++ b/libbeat/statestore/store.go @@ -61,6 +61,10 @@ func newStore(shared *sharedStore) *Store { } } +func (s *Store) SetID(id string) { + s.shared.backend.SetID(id) +} + // Close deactivates the current store. No new transacation can be generated. // Already active transaction will continue to function until Closed. // The backing store will be closed once all stores and active transactions have been closed. diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index cb40abbd41f..484d0ca2c7d 100644 --- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -32,7 +32,7 @@ type states struct { // newStates generates a new states registry. func newStates(log *logp.Logger, stateStore beater.StateStore) (*states, error) { - store, err := stateStore.Access() + store, err := stateStore.Access("") if err != nil { return nil, fmt.Errorf("can't access persistent store: %w", err) } From 1bf288d3fee63ffcb096422cf3adcf2fb3352352 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Thu, 24 Oct 2024 16:54:34 -0400 Subject: [PATCH 02/11] Fixup tests --- filebeat/input/filestream/environment_test.go | 8 ++++---- filebeat/input/filestream/input_test.go | 2 +- .../input/filestream/internal/input-logfile/store_test.go | 6 +++--- filebeat/input/journald/environment_test.go | 2 +- filebeat/input/journald/input_filtering_test.go | 2 +- filebeat/input/v2/input-cursor/store_test.go | 6 +++--- libbeat/statestore/storetest/storetest.go | 4 ++++ x-pack/filebeat/input/awss3/states_test.go | 2 +- x-pack/filebeat/input/salesforce/input_manager_test.go | 2 +- 9 files changed, 19 insertions(+), 15 deletions(-) diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index f9804bb16f3..680cb77215c 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -194,7 +194,7 @@ func (e *inputTestingEnvironment) abspath(filename string) string { } func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) { - inputStore, _ := e.stateStore.Access() + inputStore, _ := e.stateStore.Access("") actual := 0 err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) { @@ -331,7 +331,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str e.t.Fatalf("cannot stat file when cheking for offset: %+v", err) } - inputStore, _ := e.stateStore.Access() + inputStore, _ := e.stateStore.Access("") id := getIDFromPath(filepath, inputID, fi) var entry registryEntry @@ -352,7 +352,7 @@ func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expect } func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, error) { - inputStore, _ := e.stateStore.Access() + inputStore, _ := e.stateStore.Access("") var entry registryEntry err := inputStore.Get(key, &entry) @@ -538,7 +538,7 @@ func (s *testInputStore) Close() { s.registry.Close() } -func (s *testInputStore) Access() (*statestore.Store, error) { +func (s *testInputStore) Access(_ string) (*statestore.Store, error) { return s.registry.Get("filebeat") } diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go index 3dfe176ac01..f94494b6e00 100644 --- a/filebeat/input/filestream/input_test.go +++ b/filebeat/input/filestream/input_test.go @@ -244,7 +244,7 @@ func (s *testStore) Close() { s.registry.Close() } -func (s *testStore) Access() (*statestore.Store, error) { +func (s *testStore) Access(_ string) (*statestore.Store, error) { return s.registry.Get("filestream-benchmark") } diff --git a/filebeat/input/filestream/internal/input-logfile/store_test.go b/filebeat/input/filestream/internal/input-logfile/store_test.go index 374c1cdd8fd..a9f84e12517 100644 --- a/filebeat/input/filestream/internal/input-logfile/store_test.go +++ b/filebeat/input/filestream/internal/input-logfile/store_test.go @@ -70,7 +70,7 @@ func TestStore_OpenClose(t *testing.T) { }) t.Run("fail if persistent store can not be accessed", func(t *testing.T) { - _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test", false, "") + _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test", "", true) require.Error(t, err) }) @@ -478,7 +478,7 @@ func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *sto persistentStore = createSampleStore(t, nil) } - store, err := openStore(logp.NewLogger("test"), persistentStore, prefix, false, "") + store, err := openStore(logp.NewLogger("test"), persistentStore, prefix, "", true) if err != nil { t.Fatalf("failed to open the store") } @@ -505,7 +505,7 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore { func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts } func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod } -func (ts testStateStore) Access() (*statestore.Store, error) { +func (ts testStateStore) Access(_ string) (*statestore.Store, error) { if ts.Store == nil { return nil, errors.New("no store configured") } diff --git a/filebeat/input/journald/environment_test.go b/filebeat/input/journald/environment_test.go index 57f75163e92..9ea77d017d1 100644 --- a/filebeat/input/journald/environment_test.go +++ b/filebeat/input/journald/environment_test.go @@ -139,7 +139,7 @@ func (s *testInputStore) Close() { s.registry.Close() } -func (s *testInputStore) Access() (*statestore.Store, error) { +func (s *testInputStore) Access(_ string) (*statestore.Store, error) { return s.registry.Get("filebeat") } diff --git a/filebeat/input/journald/input_filtering_test.go b/filebeat/input/journald/input_filtering_test.go index 1aa58d1f8bc..32c947de57e 100644 --- a/filebeat/input/journald/input_filtering_test.go +++ b/filebeat/input/journald/input_filtering_test.go @@ -251,7 +251,7 @@ func TestInputSeek(t *testing.T) { env := newInputTestingEnvironment(t) if testCase.cursor != "" { - store, _ := env.stateStore.Access() + store, _ := env.stateStore.Access(_ string) tmp := map[string]any{} if err := json.Unmarshal([]byte(testCase.cursor), &tmp); err != nil { t.Fatal(err) diff --git a/filebeat/input/v2/input-cursor/store_test.go b/filebeat/input/v2/input-cursor/store_test.go index 2cb2abe3fc4..b7fbba9c8ad 100644 --- a/filebeat/input/v2/input-cursor/store_test.go +++ b/filebeat/input/v2/input-cursor/store_test.go @@ -52,7 +52,7 @@ func TestStore_OpenClose(t *testing.T) { }) t.Run("fail if persistent store can not be accessed", func(t *testing.T) { - _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test", false) + _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test", "", true) require.Error(t, err) }) @@ -240,7 +240,7 @@ func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *sto persistentStore = createSampleStore(t, nil) } - store, err := openStore(logp.NewLogger("test"), persistentStore, prefix, false, "") + store, err := openStore(logp.NewLogger("test"), persistentStore, prefix, "", true) if err != nil { t.Fatalf("failed to open the store") } @@ -267,7 +267,7 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore { func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts } func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod } -func (ts testStateStore) Access() (*statestore.Store, error) { +func (ts testStateStore) Access(_ string) (*statestore.Store, error) { if ts.Store == nil { return nil, errors.New("no store configured") } diff --git a/libbeat/statestore/storetest/storetest.go b/libbeat/statestore/storetest/storetest.go index a7a91074696..2cdc0e1b4e9 100644 --- a/libbeat/statestore/storetest/storetest.go +++ b/libbeat/statestore/storetest/storetest.go @@ -213,3 +213,7 @@ func (s *MapStore) Each(fn func(string, backend.ValueDecoder) (bool, error)) err func (d valueUnpacker) Decode(to interface{}) error { return typeconv.Convert(to, d.from) } + +func (s *MapStore) SetID(_ string) { + // NOOP +} diff --git a/x-pack/filebeat/input/awss3/states_test.go b/x-pack/filebeat/input/awss3/states_test.go index dc345d5f88e..24259752ef5 100644 --- a/x-pack/filebeat/input/awss3/states_test.go +++ b/x-pack/filebeat/input/awss3/states_test.go @@ -30,7 +30,7 @@ func (s *testInputStore) Close() { _ = s.registry.Close() } -func (s *testInputStore) Access() (*statestore.Store, error) { +func (s *testInputStore) Access(_ string) (*statestore.Store, error) { return s.registry.Get("filebeat") } diff --git a/x-pack/filebeat/input/salesforce/input_manager_test.go b/x-pack/filebeat/input/salesforce/input_manager_test.go index 8b73763f93f..fc69f918040 100644 --- a/x-pack/filebeat/input/salesforce/input_manager_test.go +++ b/x-pack/filebeat/input/salesforce/input_manager_test.go @@ -34,7 +34,7 @@ func makeTestStore(data map[string]interface{}) *statestore.Store { type stateStore struct{} -func (stateStore) Access() (*statestore.Store, error) { +func (stateStore) Access(_ string) (*statestore.Store, error) { return makeTestStore(map[string]interface{}{"hello": "world"}), nil } func (stateStore) CleanupInterval() time.Duration { return time.Duration(0) } From e00305333c5cd722394502dddfb3a02c8e127b11 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Fri, 25 Oct 2024 09:20:39 -0400 Subject: [PATCH 03/11] Linter --- libbeat/statestore/backend/backend.go | 2 +- libbeat/statestore/backend/es/store_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/statestore/backend/backend.go b/libbeat/statestore/backend/backend.go index 5987814f5b0..3685165007e 100644 --- a/libbeat/statestore/backend/backend.go +++ b/libbeat/statestore/backend/backend.go @@ -69,7 +69,7 @@ type Store interface { // The loop shall return if fn returns an error or false. Each(fn func(string, ValueDecoder) (bool, error)) error - // Sets the store ID when the full input configuration is aquired + // Sets the store ID when the full input configuration is acquired // This is needed in order to support Elasticsearch state store naming convention based on the input ID SetID(id string) } diff --git a/libbeat/statestore/backend/es/store_test.go b/libbeat/statestore/backend/es/store_test.go index 979618033e3..c8e7ac54ee5 100644 --- a/libbeat/statestore/backend/es/store_test.go +++ b/libbeat/statestore/backend/es/store_test.go @@ -32,7 +32,7 @@ import ( func TestStore(t *testing.T) { // This just a convenience test for store development // REMOVE: before the opening PR - //t.Skip() + t.Skip() ctx, cn := context.WithCancel(context.Background()) defer cn() From dfce978dc750e6763fee87929c388bfcd89a3afc Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Mon, 28 Oct 2024 08:44:35 -0400 Subject: [PATCH 04/11] Enabled elastisearch storage support for cel input and some cleanup --- filebeat/beater/filebeat.go | 11 +++++------ filebeat/features/features.go | 1 + filebeat/input/journald/input_filtering_test.go | 2 +- filebeat/input/v2/input-cursor/store.go | 2 +- filebeat/registrar/registrar.go | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 0223fb6fda2..fab69ae82d4 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -305,6 +305,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error { outCfg := conf.Namespace{} if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" { + logp.Err("Failed to unpack the output config: %v", err) return nil } @@ -314,7 +315,10 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // agentless-state-, for example httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959 apiKey := os.Getenv("AGENTLESS_ELASTICSEARCH_APIKEY") if apiKey != "" { - outCfg.Config().SetString("api_key", -1, apiKey) + err := outCfg.Config().SetString("api_key", -1, apiKey) + if err != nil { + return fmt.Errorf("failed to overwrite api_key: %w", err) + } } stateStore.notifier.NotifyConfigUpdate(outCfg.Config()) @@ -603,8 +607,3 @@ func fetchInputConfiguration(config *cfg.Config) (inputs []*conf.C, err error) { return inputs, nil } - -func useElasticsearchStorage() bool { - s := os.Getenv("AGENTLESS_ELASTICSEARCH_STATE_STORE") - return s != "" -} diff --git a/filebeat/features/features.go b/filebeat/features/features.go index 2e9811cbb79..e839669d9d5 100644 --- a/filebeat/features/features.go +++ b/filebeat/features/features.go @@ -24,6 +24,7 @@ type void struct{} // List of input types Elasticsearch state store is enabled for var esTypesEnabled = map[string]void{ "httpjson": {}, + "cel": {}, } var isESEnabled bool diff --git a/filebeat/input/journald/input_filtering_test.go b/filebeat/input/journald/input_filtering_test.go index 32c947de57e..d497b5b6c22 100644 --- a/filebeat/input/journald/input_filtering_test.go +++ b/filebeat/input/journald/input_filtering_test.go @@ -251,7 +251,7 @@ func TestInputSeek(t *testing.T) { env := newInputTestingEnvironment(t) if testCase.cursor != "" { - store, _ := env.stateStore.Access(_ string) + store, _ := env.stateStore.Access("") tmp := map[string]any{} if err := json.Unmarshal([]byte(testCase.cursor), &tmp); err != nil { t.Fatal(err) diff --git a/filebeat/input/v2/input-cursor/store.go b/filebeat/input/v2/input-cursor/store.go index bf48d603d81..fec6f0ff588 100644 --- a/filebeat/input/v2/input-cursor/store.go +++ b/filebeat/input/v2/input-cursor/store.go @@ -293,7 +293,7 @@ func readStates(log *logp.Logger, store *statestore.Store, prefix string, fullIn if fullInit { err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { - if !strings.HasPrefix(string(key), keyPrefix) { + if !strings.HasPrefix(key, keyPrefix) { return true, nil } diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index a5621fc4462..b1eeaf7509f 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -113,7 +113,7 @@ func (r *Registrar) Start() error { // Load the previous log file locations now, for use in input err := r.loadStates() if err != nil { - return fmt.Errorf("error loading state: %v", err) + return fmt.Errorf("error loading state: %w", err) } r.wg.Add(1) From e2e25fa18fd581c5e968edc8a4bd4568867c4601 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Wed, 30 Oct 2024 08:24:59 -0400 Subject: [PATCH 05/11] Remove the "hack" with .Each implementation --- libbeat/statestore/backend/es/store.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/libbeat/statestore/backend/es/store.go b/libbeat/statestore/backend/es/store.go index 08193a8ef2d..5c2774dd211 100644 --- a/libbeat/statestore/backend/es/store.go +++ b/libbeat/statestore/backend/es/store.go @@ -246,17 +246,9 @@ type searchResult struct { } func (s *store) Each(fn func(string, backend.ValueDecoder) (bool, error)) error { - // Can't wait here for when the connection is configured. - // Currently Each method is being called for each plugin with store (input-logfile, input-cursor) as well as the registrar - // from filebeat::Run. All these pieces have to be initialized before the libbeat Manager starts and gets a chance to connect - // to the Agent and get the configuration. - // - // At this point it's not clear how to hook up the elasticsearch storage or if it's even possible without some major rewrite. - // - // Commented for now: - // if err := s.waitReady(); err != nil { - // return err - // } + if err := s.waitReady(); err != nil { + return err + } s.mx.Lock() defer s.mx.Unlock() From c1fc2a8c91f0a93d10ae38b6961d4f7eacb3d6a3 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Thu, 31 Oct 2024 07:32:22 -0400 Subject: [PATCH 06/11] Adjust for the latest main es client signature change --- libbeat/statestore/backend/es/store.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libbeat/statestore/backend/es/store.go b/libbeat/statestore/backend/es/store.go index 5c2774dd211..bb95e3dcbbd 100644 --- a/libbeat/statestore/backend/es/store.go +++ b/libbeat/statestore/backend/es/store.go @@ -296,7 +296,7 @@ func (s *store) Each(fn func(string, backend.ValueDecoder) (bool, error)) error return nil } -func (s *store) configure(c *conf.C) { +func (s *store) configure(ctx context.Context, c *conf.C) { s.mx.Lock() defer s.mx.Unlock() @@ -306,7 +306,7 @@ func (s *store) configure(c *conf.C) { } s.cliErr = nil - cli, err := eslegclient.NewConnectedClient(c, s.name) + cli, err := eslegclient.NewConnectedClient(ctx, c, s.name) if err != nil { s.log.Errorf("ES store, failed to create elasticsearch client: %v", err) s.cliErr = err @@ -333,7 +333,7 @@ func (s *store) loop(ctx context.Context, cn context.CancelFunc, subId int, chCf case <-ctx.Done(): return case cu := <-chCfg: - s.configure(cu) + s.configure(ctx, cu) } } } From c9b025685bd1da1d49275d705cadf1c0105b84e5 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Thu, 31 Oct 2024 08:02:03 -0400 Subject: [PATCH 07/11] Make check happy --- libbeat/statestore/backend/es/store_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/statestore/backend/es/store_test.go b/libbeat/statestore/backend/es/store_test.go index c8e7ac54ee5..867b0165861 100644 --- a/libbeat/statestore/backend/es/store_test.go +++ b/libbeat/statestore/backend/es/store_test.go @@ -23,10 +23,11 @@ import ( "fmt" "testing" + "github.com/google/go-cmp/cmp" + "github.com/elastic/beats/v7/libbeat/statestore/backend" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - "github.com/google/go-cmp/cmp" ) func TestStore(t *testing.T) { From 21d451d987544a177941789f71758883f5e78b86 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Thu, 31 Oct 2024 08:51:17 -0400 Subject: [PATCH 08/11] Fixed missing interface method on test mock store --- libbeat/statestore/mock_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libbeat/statestore/mock_test.go b/libbeat/statestore/mock_test.go index 69a1d80303c..c446aee5140 100644 --- a/libbeat/statestore/mock_test.go +++ b/libbeat/statestore/mock_test.go @@ -87,3 +87,6 @@ func (m *mockStore) Each(fn func(string, backend.ValueDecoder) (bool, error)) er args := m.Called(fn) return args.Error(0) } + +func (m *mockStore) SetID(_ string) { +} From ffb9364b6a9180b8a6d2d8ce277ce232b32900e3 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Thu, 31 Oct 2024 09:20:48 -0400 Subject: [PATCH 09/11] Add error check in ES store Each --- libbeat/statestore/backend/es/store.go | 7 ++++++- libbeat/statestore/backend/es/store_test.go | 3 --- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/libbeat/statestore/backend/es/store.go b/libbeat/statestore/backend/es/store.go index bb95e3dcbbd..1aa16fe67b5 100644 --- a/libbeat/statestore/backend/es/store.go +++ b/libbeat/statestore/backend/es/store.go @@ -280,17 +280,22 @@ func (s *store) Each(fn func(string, backend.ValueDecoder) (bool, error)) error if err != nil { return err } + var e entry err = json.Unmarshal(sres.Source.Value, &e.value) if err != nil { return err } + key, err := url.QueryUnescape(sres.ID) if err != nil { return err } - fn(key, e) + cont, err := fn(key, e) + if !cont || err != nil { + return err + } } return nil diff --git a/libbeat/statestore/backend/es/store_test.go b/libbeat/statestore/backend/es/store_test.go index 867b0165861..12afc90ae99 100644 --- a/libbeat/statestore/backend/es/store_test.go +++ b/libbeat/statestore/backend/es/store_test.go @@ -20,7 +20,6 @@ package es import ( "context" "errors" - "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -74,7 +73,6 @@ func TestStore(t *testing.T) { if err != nil { return false, err } - fmt.Printf("%v :: %v\n", s, v) return true, nil }) @@ -153,7 +151,6 @@ func TestStore(t *testing.T) { if err != nil { return false, err } - fmt.Printf("%v :: %v\n", s, v) return true, nil }) From 10d212f677310fd96e9e587ab347d89bcfc4db94 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Tue, 5 Nov 2024 12:32:37 -0500 Subject: [PATCH 10/11] Parameterize the supported input types through environment variables --- filebeat/features/features.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/filebeat/features/features.go b/filebeat/features/features.go index e839669d9d5..85402e513b0 100644 --- a/filebeat/features/features.go +++ b/filebeat/features/features.go @@ -17,20 +17,36 @@ package features -import "os" +import ( + "os" + "strings" +) + +const ( + envAgentlessElasticsearchStateStoreEnabled = "AGENTLESS_ELASTICSEARCH_STATE_STORE_ENABLED" + envAgentlessElasticsearchStateStoreInputTypes = "AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES" +) type void struct{} // List of input types Elasticsearch state store is enabled for -var esTypesEnabled = map[string]void{ - "httpjson": {}, - "cel": {}, -} +var esTypesEnabled map[string]void var isESEnabled bool func init() { - isESEnabled = (os.Getenv("AGENTLESS_ELASTICSEARCH_STATE_STORE_ENABLED") != "") + isESEnabled = (os.Getenv(envAgentlessElasticsearchStateStoreEnabled) != "") + + esTypesEnabled = make(map[string]void) + + s := os.Getenv(envAgentlessElasticsearchStateStoreInputTypes) + arr := strings.Split(s, ",") + for _, e := range arr { + k := strings.TrimSpace(e) + if k != "" { + esTypesEnabled[k] = void{} + } + } } // IsElasticsearchStateStoreEnabled returns true if feature is enabled for agentless From 24000d79aa5c7f64a33bcc1c96e161329af78962 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Tue, 5 Nov 2024 14:55:32 -0500 Subject: [PATCH 11/11] Delete the dev tests file --- libbeat/statestore/backend/es/store_test.go | 161 -------------------- 1 file changed, 161 deletions(-) delete mode 100644 libbeat/statestore/backend/es/store_test.go diff --git a/libbeat/statestore/backend/es/store_test.go b/libbeat/statestore/backend/es/store_test.go deleted file mode 100644 index 12afc90ae99..00000000000 --- a/libbeat/statestore/backend/es/store_test.go +++ /dev/null @@ -1,161 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package es - -import ( - "context" - "errors" - "testing" - - "github.com/google/go-cmp/cmp" - - "github.com/elastic/beats/v7/libbeat/statestore/backend" - conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" -) - -func TestStore(t *testing.T) { - // This just a convenience test for store development - // REMOVE: before the opening PR - t.Skip() - - ctx, cn := context.WithCancel(context.Background()) - defer cn() - - notifier := NewNotifier() - - store, err := openStore(ctx, logp.NewLogger("tester"), "filebeat", notifier) - if err != nil { - t.Fatal(err) - } - defer store.Close() - - config, err := conf.NewConfigFrom(map[string]interface{}{ - "api_key": "xxxxxxxxxx:xxxxxxxc-U6VH4DK8g", - "hosts": []string{ - "https://6598f1d41f9d4e81a78117dddbb2b03e.us-central1.gcp.cloud.es.io:443", - }, - "preset": "balanced", - "type": "elasticsearch", - }) - - if err != nil { - t.Fatal(err) - } - - notifier.NotifyConfigUpdate(config) - - var m map[string]any - store.SetID("httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959") - err = store.Get("foo", &m) - if err != nil && !errors.Is(err, ErrKeyUnknown) { - t.Fatal(err) - } - - err = store.Each(func(s string, vd backend.ValueDecoder) (bool, error) { - var v any - err := vd.Decode(&v) - if err != nil { - return false, err - } - return true, nil - }) - - v := map[string]interface{}{ - "updated": []interface{}{ - float64(280444598839616), - float64(1729277837), - }, - "cursor": map[string]interface{}{ - "published": "2024-10-17T18:33:58.960Z", - }, - "ttl": float64(1800000000000), - } - - err = store.Set("foo", v) - if err != nil { - t.Fatal(err) - } - - err = store.Get("foo", &m) - if err != nil && !errors.Is(err, ErrKeyUnknown) { - t.Fatal(err) - } - - diff := cmp.Diff(v, m) - if diff != "" { - t.Fatal(diff) - } - - var s1 = "dfsdf" - err = store.Set("foo1", s1) - if err != nil { - t.Fatal(err) - } - - var s2 string - err = store.Get("foo1", &s2) - if err != nil { - t.Fatal(err) - } - - diff = cmp.Diff(s1, s2) - if diff != "" { - t.Fatal(diff) - } - - var n1 = 12345 - err = store.Set("foon", n1) - if err != nil { - t.Fatal(err) - } - - var n2 int - err = store.Get("foon", &n2) - if err != nil { - t.Fatal(err) - } - - diff = cmp.Diff(n1, n2) - if diff != "" { - t.Fatal(diff) - } - - if err != nil { - t.Fatal(err) - } - - err = store.Remove("foon") - if err != nil { - t.Fatal(err) - } - - err = store.Each(func(s string, vd backend.ValueDecoder) (bool, error) { - var v any - err := vd.Decode(&v) - if err != nil { - return false, err - } - return true, nil - }) - - if err != nil { - t.Fatal(err) - } - -}