Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat] Elasticsearch state storage for httpjson and cel inputs #41446

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
47 changes: 44 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package beater

import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
Expand All @@ -39,6 +41,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
Expand Down Expand Up @@ -79,7 +82,7 @@ type Filebeat struct {
type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin

type StateStore interface {
Access() (*statestore.Store, error)
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}

Expand Down Expand Up @@ -281,13 +284,44 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry)
// Use context, like normal people do, hooking up to the beat.done channel
ctx, cn := context.WithCancel(context.Background())
go func() {
<-fb.done
cn()
}()

stateStore, err := openStateStore(ctx, b.Info, logp.NewLogger("filebeat"), config.Registry)
if err != nil {
logp.Err("Failed to open state store: %+v", err)
return err
}
defer stateStore.Close()

// If notifier is set, configure the listener for output configuration
// The notifier passes the elasticsearch output configuration down to the Elasticsearch backed state storage
// in order to allow it fully configure
if stateStore.notifier != nil {
b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
outCfg := conf.Namespace{}
if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
return nil
}

// TODO: REMOVE THIS HACK BEFORE MERGE. LEAVING FOR TESTING FOR DRAFT
// Injecting the ApiKey that has enough permissions to write to the index
// TODO: need to figure out how add permissions for the state index
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fleet knows when something is an agentless package and that is probably what would hook into this to generate the key.

We could add a new state storage section to an agent policy (agent.storage?) that Fleet knows how to template when this happens.

Agent could then send it down as another output unit with a new type (or we could define a new type of unit but that is even more work).

This would allow the key to update on the fly through Fleet and control protocol.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also possibly be handled in the agentless api / controller and hidden from Fleet if we just inject it in as an env var. No opposition to that either really.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also possibly be handled in the agentless api / controller and hidden from Fleet if we just inject it in as an env var. No opposition to that either really.

I brought this up during the meeting today as an option. IMHO it's just one thing to manage, might be cleaner if all in one place in the policy.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of details we need to think about with respect to these keys is what the process should be for rotating and/or revoking them.

// agentless-state-<input id>, for example httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959
apiKey := os.Getenv("AGENTLESS_ELASTICSEARCH_APIKEY")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will collaborate with agentless team on addressing this part

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When running under Elastic agent, every change of the output configuration results in a restart of the Beat process, in case that simplifies anything here for you.

if apiKey != "" {
outCfg.Config().SetString("api_key", -1, apiKey)
}

stateStore.notifier.NotifyConfigUpdate(outCfg.Config())
return nil
})
}

err = processLogInputTakeOver(stateStore, config)
if err != nil {
logp.Err("Failed to attempt filestream state take over: %+v", err)
Expand Down Expand Up @@ -341,6 +375,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
defer func() {
_ = inputTaskGroup.Stop()
}()

// Store needs to be fully configured at this point
if err := v2InputLoader.Init(&inputTaskGroup); err != nil {
logp.Err("Failed to initialize the input managers: %v", err)
return err
Expand Down Expand Up @@ -509,7 +545,7 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {
return nil
}

store, err := stateStore.Access()
store, err := stateStore.Access("")
if err != nil {
return fmt.Errorf("Failed to access state when attempting take over: %w", err)
}
Expand Down Expand Up @@ -567,3 +603,8 @@ func fetchInputConfiguration(config *cfg.Config) (inputs []*conf.C, err error) {

return inputs, nil
}

func useElasticsearchStorage() bool {
s := os.Getenv("AGENTLESS_ELASTICSEARCH_STATE_STORE")
return s != ""
}
49 changes: 43 additions & 6 deletions filebeat/beater/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,80 @@
package beater

import (
"context"
"time"

"github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/features"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/backend"
"github.com/elastic/beats/v7/libbeat/statestore/backend/es"
"github.com/elastic/beats/v7/libbeat/statestore/backend/memlog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

type filebeatStore struct {
registry *statestore.Registry
esRegistry *statestore.Registry
storeName string
cleanInterval time.Duration

// Notifies the Elasticsearch store about configuration change
// which is available only after the beat runtime manager connects to the Agent
// and receives the output configuration
notifier *es.Notifier
}

func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
memlog, err := memlog.New(logger, memlog.Settings{
func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
var (
reg backend.Registry
err error

esreg *es.Registry
notifier *es.Notifier
)

if features.IsElasticsearchStateStoreEnabled() {
notifier = es.NewNotifier()
esreg, err = es.New(ctx, logger, notifier)
if err != nil {
return nil, err
}
}

reg, err = memlog.New(logger, memlog.Settings{
Root: paths.Resolve(paths.Data, cfg.Path),
FileMode: cfg.Permissions,
})
if err != nil {
return nil, err
}

return &filebeatStore{
registry: statestore.NewRegistry(memlog),
store := &filebeatStore{
registry: statestore.NewRegistry(reg),
storeName: info.Beat,
cleanInterval: cfg.CleanInterval,
}, nil
notifier: notifier,
}

if esreg != nil {
store.esRegistry = statestore.NewRegistry(esreg)
}

return store, nil
}

func (s *filebeatStore) Close() {
s.registry.Close()
}

func (s *filebeatStore) Access() (*statestore.Store, error) {
// Access returns the storage registry depending on the type. Default is the file store.
func (s *filebeatStore) Access(typ string) (*statestore.Store, error) {
if features.IsElasticsearchStateStoreEnabledForInput(typ) && s.esRegistry != nil {
return s.esRegistry.Get(s.storeName)
}
return s.registry.Get(s.storeName)
}

Expand Down
48 changes: 48 additions & 0 deletions filebeat/features/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package features

import "os"

type void struct{}

// List of input types Elasticsearch state store is enabled for
var esTypesEnabled = map[string]void{
"httpjson": {},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be configuration instead of in the code, maybe another env var?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure can do. Something like this?
AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES=httpjson,cel

}

var isESEnabled bool

func init() {
isESEnabled = (os.Getenv("AGENTLESS_ELASTICSEARCH_STATE_STORE_ENABLED") != "")
}

// IsElasticsearchStateStoreEnabled returns true if feature is enabled for agentless
func IsElasticsearchStateStoreEnabled() bool {
return isESEnabled
}

// IsElasticsearchStateStoreEnabledForInput returns true if the provided input type uses Elasticsearch for state storage if the Elasticsearch state store feature is enabled
func IsElasticsearchStateStoreEnabledForInput(inputType string) bool {
if IsElasticsearchStateStoreEnabled() {
if _, ok := esTypesEnabled[inputType]; ok {
return true
}
}
return false
}
55 changes: 35 additions & 20 deletions filebeat/input/filestream/internal/input-logfile/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/elastic/go-concert/unison"

"github.com/elastic/beats/v7/filebeat/features"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/statestore"
conf "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -64,7 +65,7 @@ type InputManager struct {
// that will be used to collect events from each source.
Configure func(cfg *conf.C) (Prospector, Harvester, error)

initOnce sync.Once
initedFull bool
initErr error
store *store
ackUpdater *updateWriter
Expand All @@ -88,34 +89,48 @@ const globalInputID = ".global"

// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
Access() (*statestore.Store, error)
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}

func (cim *InputManager) init() error {
cim.initOnce.Do(func() {
// init initializes the state store
// This function is called from:
// 1. InputManager::Init on beat start
// 2. InputManager::Create when the input is initialized with configuration
// When Elasticsearch state storage is used for the input it will be only fully configured on InputManager::Create,
// so skip reading the state from the storage on InputManager::Init in this case
func (cim *InputManager) init(inputID string) error {
if cim.initedFull {
return nil
}

log := cim.Logger.With("input_type", cim.Type)
log := cim.Logger.With("input_type", cim.Type)

var store *store
store, cim.initErr = openStore(log, cim.StateStore, cim.Type)
if cim.initErr != nil {
return
}
var store *store

cim.store = store
cim.ackCH = newUpdateChan()
cim.ackUpdater = newUpdateWriter(store, cim.ackCH)
cim.ids = map[string]struct{}{}
})
useES := features.IsElasticsearchStateStoreEnabledForInput(cim.Type)
fullInit := !useES || (inputID != "" && useES)
store, cim.initErr = openStore(log, cim.StateStore, cim.Type, inputID, fullInit)
if cim.initErr != nil {
return cim.initErr
}

cim.store = store
cim.ackCH = newUpdateChan()
cim.ackUpdater = newUpdateWriter(store, cim.ackCH)
cim.ids = map[string]struct{}{}

if fullInit {
cim.initedFull = true
}

return cim.initErr
}

// Init starts background processes for deleting old entries from the
// persistent store if mode is ModeRun.
func (cim *InputManager) Init(group unison.Group) error {
if err := cim.init(); err != nil {
if err := cim.init(""); err != nil {
return err
}

Expand Down Expand Up @@ -150,10 +165,6 @@ func (cim *InputManager) shutdown() {
// Create builds a new v2.Input using the provided Configure function.
// The Input will run a go-routine per source that has been configured.
func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
if err := cim.init(); err != nil {
return nil, err
}

settings := struct {
ID string `config:"id"`
CleanInactive time.Duration `config:"clean_inactive"`
Expand All @@ -163,6 +174,10 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
return nil, err
}

if err := cim.init(settings.ID); err != nil {
return nil, err
}

if settings.ID == "" {
cim.Logger.Error("filestream input ID without ID might lead to data" +
" duplication, please add an ID and restart Filebeat")
Expand Down
Loading
Loading