From fe051a3780d742102ad3ca968ef7df6902bfe748 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 25 Oct 2024 10:50:30 -0400 Subject: [PATCH] Add test for elasticsearch re-connection after network error & allow graceful shutdown (#40794) This commit reworks the `eslegclient.Connection` to accept a context in its `Connect` method, this allows the caller to cancel any in flight requests made by the connection by cancelling the context. The libbeat `outputs.Connectable` interface (used by `outputs.NetworkClient`) had to be updated to accept the context, which required refactoring in most of the outputs to also accept a context on connect. The worker from libbeat/publisher/pipeline/client_worker.go now uses a context for it's cancellation instead of a channel, this context is also used when creating a connection to Elasticsearch. An integration test is added to ensure the ES output can always recover from network errors. (cherry picked from commit 4dfef8b290af22fb2d8f96b65d2e416614441a2a) --- CHANGELOG.next.asciidoc | 1 + NOTICE.txt | 53 +++++++ filebeat/beater/filebeat.go | 37 +++-- filebeat/fileset/modules_integration_test.go | 5 +- filebeat/fileset/pipelines_test.go | 5 +- go.mod | 2 + go.sum | 4 + heartbeat/beater/heartbeat.go | 8 +- heartbeat/beater/heartbeat_test.go | 3 +- .../wrappers/monitorstate/testutil.go | 5 +- libbeat/cmd/instance/beat.go | 4 +- libbeat/esleg/eslegclient/api_mock_test.go | 13 +- libbeat/esleg/eslegclient/api_test.go | 12 +- .../esleg/eslegclient/bulkapi_mock_test.go | 6 +- libbeat/esleg/eslegclient/connection.go | 29 ++-- .../connection_integration_test.go | 35 ++--- libbeat/esleg/eslegclient/connection_test.go | 4 +- libbeat/esleg/eslegtest/util.go | 8 +- .../client_handler_integration_test.go | 5 +- libbeat/licenser/elastic_fetcher.go | 24 +-- .../elastic_fetcher_integration_test.go | 15 +- libbeat/licenser/elastic_fetcher_test.go | 47 +++++- .../monitoring/report/elasticsearch/client.go | 4 +- .../report/elasticsearch/elasticsearch.go | 5 +- libbeat/outputs/backoff.go | 4 +- libbeat/outputs/elasticsearch/client.go | 6 +- .../elasticsearch/client_integration_test.go | 8 +- .../elasticsearch/client_proxy_test.go | 5 +- libbeat/outputs/elasticsearch/client_test.go | 8 +- libbeat/outputs/failover.go | 4 +- libbeat/outputs/logstash/async.go | 4 +- libbeat/outputs/logstash/async_test.go | 4 +- .../logstash/logstash_integration_test.go | 9 +- libbeat/outputs/logstash/logstash_test.go | 8 +- libbeat/outputs/logstash/sync.go | 4 +- libbeat/outputs/logstash/sync_test.go | 4 +- libbeat/outputs/outputs.go | 2 +- libbeat/outputs/redis/backoff.go | 2 +- .../outputs/redis/redis_integration_test.go | 4 +- libbeat/publisher/pipeline/client_worker.go | 32 ++-- libbeat/publisher/pipeline/testing.go | 2 +- libbeat/template/load_integration_test.go | 13 +- .../tests/integration/elasticsearch_test.go | 148 ++++++++++++++++++ packetbeat/beater/packetbeat.go | 5 +- winlogbeat/beater/winlogbeat.go | 4 +- x-pack/winlogbeat/module/testing.go | 5 +- .../winlogbeat/module/wintest/docker_test.go | 4 +- .../module/wintest/simulate_test.go | 5 +- 48 files changed, 481 insertions(+), 147 deletions(-) create mode 100644 libbeat/tests/integration/elasticsearch_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 81879622f8a..00941c1fbfd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -112,6 +112,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816] - Set timeout of 1 minute for FQDN requests {pull}37756[37756] - Fix issue where old data could be saved in the memory queue after acknowledgment, increasing memory use {pull}41356[41356] +- Ensure Elasticsearch output can always recover from network errors {pull}40794[40794] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index 337c6330c3b..4d7afd34945 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -16257,6 +16257,29 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/mito@v1.15.0/LI limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/elastic/mock-es +Version: v0.0.0-20240712014503-e5b47ece0015 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/elastic/mock-es@v0.0.0-20240712014503-e5b47ece0015/LICENSE: + +Copyright 2024 Elasticsearch B.V. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/elastic/tk-btf Version: v0.1.0 @@ -48611,6 +48634,36 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- +Dependency : github.com/mileusna/useragent +Version: v1.3.4 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/mileusna/useragent@v1.3.4/LICENSE.md: + +MIT License + +Copyright (c) 2017 Miloš Mileusnić + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + -------------------------------------------------------------------------------- Dependency : github.com/minio/asm2plan9s Version: v0.0.0-20200509001527-cdd76441f9d8 diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 9d9cb220d4e..815b6fabfde 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -18,6 +18,7 @@ package beater import ( + "context" "flag" "fmt" "path/filepath" @@ -195,14 +196,16 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { overwritePipelines := true b.OverwritePipelinesCallback = func(esConfig *conf.C) error { - esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat") + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Filebeat") if err != nil { return err } // When running the subcommand setup, configuration from modules.d directories // have to be loaded using cfg.Reloader. Otherwise those configurations are skipped. - pipelineLoaderFactory := newPipelineLoaderFactory(b.Config.Output.Config()) + pipelineLoaderFactory := newPipelineLoaderFactory(ctx, b.Config.Output.Config()) enableAllFilesets, _ := b.BeatConfig.Bool("config.modules.enable_all_filesets", -1) forceEnableModuleFilesets, _ := b.BeatConfig.Bool("config.modules.force_enable_module_filesets", -1) filesetOverrides := fileset.FilesetOverrides{ @@ -322,14 +325,6 @@ func (fb *Filebeat) Run(b *beat.Beat) error { outDone := make(chan struct{}) // outDone closes down all active pipeline connections pipelineConnector := channel.NewOutletFactory(outDone).Create - // Create a ES connection factory for dynamic modules pipeline loading - var pipelineLoaderFactory fileset.PipelineLoaderFactory - if b.Config.Output.Name() == "elasticsearch" { - pipelineLoaderFactory = newPipelineLoaderFactory(b.Config.Output.Config()) - } else { - logp.Warn(pipelinesWarning) - } - inputsLogger := logp.NewLogger("input") v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore) v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType) @@ -350,8 +345,22 @@ func (fb *Filebeat) Run(b *beat.Beat) error { compat.RunnerFactory(inputsLogger, b.Info, v2InputLoader), input.NewRunnerFactory(pipelineConnector, registrar, fb.done), )) - moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines) + // Create a ES connection factory for dynamic modules pipeline loading + var pipelineLoaderFactory fileset.PipelineLoaderFactory + // The pipelineFactory needs a context to control the connections to ES, + // when the pipelineFactory/ESClient are not needed any more the context + // must be cancelled. This pipeline factory will be used by the moduleLoader + // that is run by a crawler, whenever this crawler is stopped we also cancel + // the context. + pipelineFactoryCtx, cancelPipelineFactoryCtx := context.WithCancel(context.Background()) + defer cancelPipelineFactoryCtx() + if b.Config.Output.Name() == "elasticsearch" { + pipelineLoaderFactory = newPipelineLoaderFactory(pipelineFactoryCtx, b.Config.Output.Config()) + } else { + logp.Warn(pipelinesWarning) + } + moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines) crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once) if err != nil { logp.Err("Could not init crawler: %v", err) @@ -389,6 +398,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules) if err != nil { crawler.Stop() + cancelPipelineFactoryCtx() return fmt.Errorf("Failed to start crawler: %w", err) } @@ -444,6 +454,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { modules.Stop() adiscover.Stop() crawler.Stop() + cancelPipelineFactoryCtx() timeout := fb.config.ShutdownTimeout // Checks if on shutdown it should wait for all events to be published @@ -487,9 +498,9 @@ func (fb *Filebeat) Stop() { } // Create a new pipeline loader (es client) factory -func newPipelineLoaderFactory(esConfig *conf.C) fileset.PipelineLoaderFactory { +func newPipelineLoaderFactory(ctx context.Context, esConfig *conf.C) fileset.PipelineLoaderFactory { pipelineLoaderFactory := func() (fileset.PipelineLoader, error) { - esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat") + esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Filebeat") if err != nil { return nil, fmt.Errorf("Error creating Elasticsearch client: %w", err) } diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index 0d5ad2172c0..ffb149e53b3 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -20,6 +20,7 @@ package fileset import ( + "context" "encoding/json" "path/filepath" "testing" @@ -268,7 +269,9 @@ func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection { conn.Encoder = eslegclient.NewJSONEncoder(nil, false) - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err = conn.Connect(ctx) if err != nil { t.Fatal(err) panic(err) // panic in case TestLogger did not stop test diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index a358b0da9be..ac6aa5035de 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -20,6 +20,7 @@ package fileset import ( + "context" "net/http" "net/http/httptest" "testing" @@ -101,7 +102,9 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { }) require.NoError(t, err) - err = testESClient.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err = testESClient.Connect(ctx) require.NoError(t, err) err = testRegistry.LoadPipelines(testESClient, false) diff --git a/go.mod b/go.mod index d176ad49a47..2a866141116 100644 --- a/go.mod +++ b/go.mod @@ -195,6 +195,7 @@ require ( github.com/elastic/go-quark v0.2.0 github.com/elastic/go-sfdc v0.0.0-20241010131323-8e176480d727 github.com/elastic/mito v1.15.0 + github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015 github.com/elastic/tk-btf v0.1.0 github.com/elastic/toutoumomoma v0.0.0-20240626215117-76e39db18dfb github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15 @@ -340,6 +341,7 @@ require ( github.com/mattn/go-ieproxy v0.0.1 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect + github.com/mileusna/useragent v1.3.4 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect diff --git a/go.sum b/go.sum index 1f09e40f8da..0b8530f2327 100644 --- a/go.sum +++ b/go.sum @@ -383,6 +383,8 @@ github.com/elastic/gosigar v0.14.3 h1:xwkKwPia+hSfg9GqrCUKYdId102m9qTJIIr7egmK/u github.com/elastic/gosigar v0.14.3/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/mito v1.15.0 h1:MicOxLSVkgU2Aonbh3i+++66Wl5wvD8y9gALK8PQDYs= github.com/elastic/mito v1.15.0/go.mod h1:J+wCf4HccW2YoSFmZMGu+d06gN+WmnIlj5ehBqine74= +github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015 h1:z8cC8GASpPo8yKlbnXI36HQ/BM9wYjhBPNbDjAWm0VU= +github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015/go.mod h1:qH9DX/Dmflz6EAtaks/+2SsdQzecVAKE174Zl66hk7E= github.com/elastic/pkcs8 v1.0.0 h1:HhitlUKxhN288kcNcYkjW6/ouvuwJWd9ioxpjnD9jVA= github.com/elastic/pkcs8 v1.0.0/go.mod h1:ipsZToJfq1MxclVTwpG7U/bgeDtf+0HkUiOxebk95+0= github.com/elastic/sarama v1.19.1-0.20220310193331-ebc2b0d8eef3 h1:FzA0/n4iMt8ojGDGRoiFPSHFvvdVIvxOxyLtiFnrLBM= @@ -703,6 +705,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/miekg/dns v1.1.22/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.61 h1:nLxbwF3XxhwVSm8g9Dghm9MHPaUZuqhPiGL+675ZmEs= github.com/miekg/dns v1.1.61/go.mod h1:mnAarhS3nWaW+NVP2wTkYVIZyHNJ098SJZUki3eykwQ= +github.com/mileusna/useragent v1.3.4 h1:MiuRRuvGjEie1+yZHO88UBYg8YBC/ddF6T7F56i3PCk= +github.com/mileusna/useragent v1.3.4/go.mod h1:3d8TOmwL/5I8pJjyVDteHtgDGcefrFUX4ccGOMKNYYc= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 9a849f6bc7e..227b375ee90 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -88,7 +88,7 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { if b.Config.Output.Name() == "elasticsearch" && !b.Manager.Enabled() { // Connect to ES and setup the State loader if the output is not managed by agent // Note this, intentionally, blocks until connected or max attempts reached - esClient, err := makeESClient(b.Config.Output.Config(), 3, 2*time.Second) + esClient, err := makeESClient(context.TODO(), b.Config.Output.Config(), 3, 2*time.Second) if err != nil { if parsedConfig.RunOnce { trace.Abort() @@ -275,7 +275,7 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) { } // Backoff panics with 0 duration, set to smallest unit - esClient, err := makeESClient(outCfg.Config(), 1, 1*time.Nanosecond) + esClient, err := makeESClient(context.TODO(), outCfg.Config(), 1, 1*time.Nanosecond) if err != nil { logp.L().Warnf("skipping monitor state management during managed reload: %w", err) } else { @@ -324,7 +324,7 @@ func (bt *Heartbeat) Stop() { } // makeESClient establishes an ES connection meant to load monitors' state -func makeESClient(cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.Connection, error) { +func makeESClient(ctx context.Context, cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.Connection, error) { var ( esClient *eslegclient.Connection err error @@ -353,7 +353,7 @@ func makeESClient(cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.C } for i := 0; i < attempts; i++ { - esClient, err = eslegclient.NewConnectedClient(newCfg, "Heartbeat") + esClient, err = eslegclient.NewConnectedClient(ctx, newCfg, "Heartbeat") if err == nil { connectDelay.Reset() return esClient, nil diff --git a/heartbeat/beater/heartbeat_test.go b/heartbeat/beater/heartbeat_test.go index 669811dc4c8..279366a0e7e 100644 --- a/heartbeat/beater/heartbeat_test.go +++ b/heartbeat/beater/heartbeat_test.go @@ -18,6 +18,7 @@ package beater import ( + "context" "testing" "time" @@ -39,7 +40,7 @@ func TestMakeESClient(t *testing.T) { anyAttempt := 1 anyDuration := 1 * time.Second - _, _ = makeESClient(origCfg, anyAttempt, anyDuration) + _, _ = makeESClient(context.Background(), origCfg, anyAttempt, anyDuration) timeout, err := origCfg.Int("timeout", -1) require.NoError(t, err) diff --git a/heartbeat/monitors/wrappers/monitorstate/testutil.go b/heartbeat/monitors/wrappers/monitorstate/testutil.go index 28a6c260655..be58dcdb924 100644 --- a/heartbeat/monitors/wrappers/monitorstate/testutil.go +++ b/heartbeat/monitors/wrappers/monitorstate/testutil.go @@ -18,6 +18,7 @@ package monitorstate import ( + "context" "encoding/json" "testing" @@ -50,7 +51,9 @@ func IntegES(t *testing.T) (esc *eslegclient.Connection) { conn.Encoder = eslegclient.NewJSONEncoder(nil, false) - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err = conn.Connect(ctx) if err != nil { t.Fatal(err) panic(err) // panic in case TestLogger did not stop test diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 23efa03b489..6332ebac39b 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -898,7 +898,9 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er if !isElasticsearchOutput(outCfg.Name()) { return fmt.Errorf("index management requested but the Elasticsearch output is not configured/enabled") } - esClient, err := eslegclient.NewConnectedClient(outCfg.Config(), b.Info.Beat) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + esClient, err := eslegclient.NewConnectedClient(ctx, outCfg.Config(), b.Info.Beat) if err != nil { return err } diff --git a/libbeat/esleg/eslegclient/api_mock_test.go b/libbeat/esleg/eslegclient/api_mock_test.go index 97834dcda51..231ee437800 100644 --- a/libbeat/esleg/eslegclient/api_mock_test.go +++ b/libbeat/esleg/eslegclient/api_mock_test.go @@ -20,6 +20,7 @@ package eslegclient import ( + "context" "encoding/json" "fmt" "net/http" @@ -63,14 +64,14 @@ func TestOneHostSuccessResp(t *testing.T) { server := ElasticsearchMock(200, expectedResp) - client := newTestConnection(server.URL) + client := newTestConnection(t, server.URL) params := map[string]string{ "refresh": "true", } _, resp, err := client.Index(index, "test", "1", params, body) if err != nil { - t.Errorf("Index() returns error: %s", err) + t.Fatalf("Index() returns error: %s", err) } if !resp.Created { t.Errorf("Index() fails: %s", resp) @@ -89,8 +90,10 @@ func TestOneHost500Resp(t *testing.T) { server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened")) - client := newTestConnection(server.URL) - err := client.Connect() + client := newTestConnection(t, server.URL) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err := client.Connect(ctx) if err != nil { t.Fatalf("Failed to connect: %v", err) } @@ -121,7 +124,7 @@ func TestOneHost503Resp(t *testing.T) { server := ElasticsearchMock(503, []byte("Something wrong happened")) - client := newTestConnection(server.URL) + client := newTestConnection(t, server.URL) params := map[string]string{ "refresh": "true", diff --git a/libbeat/esleg/eslegclient/api_test.go b/libbeat/esleg/eslegclient/api_test.go index 6c7dd675ccf..0bd0f5341b5 100644 --- a/libbeat/esleg/eslegclient/api_test.go +++ b/libbeat/esleg/eslegclient/api_test.go @@ -19,6 +19,7 @@ package eslegclient import ( + "context" "encoding/json" "testing" @@ -170,11 +171,20 @@ func TestReadSearchResult_invalid(t *testing.T) { assert.Error(t, err) } -func newTestConnection(url string) *Connection { +// newTestConnection creates a new connection for testing +// +//nolint:unused // it's used by files with the !integration constraint +func newTestConnection(t *testing.T, url string) *Connection { conn, _ := NewConnection(ConnectionSettings{ URL: url, }) conn.Encoder = NewJSONEncoder(nil, false) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := conn.Connect(ctx); err != nil { + t.Fatalf("cannot connect to Elasticsearch: %s", err) + } + return conn } diff --git a/libbeat/esleg/eslegclient/bulkapi_mock_test.go b/libbeat/esleg/eslegclient/bulkapi_mock_test.go index 96434819eca..598204386f9 100644 --- a/libbeat/esleg/eslegclient/bulkapi_mock_test.go +++ b/libbeat/esleg/eslegclient/bulkapi_mock_test.go @@ -60,7 +60,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) { server := ElasticsearchMock(200, expectedResp) - client := newTestConnection(server.URL) + client := newTestConnection(t, server.URL) params := map[string]string{ "refresh": "true", @@ -95,7 +95,7 @@ func TestOneHost500Resp_Bulk(t *testing.T) { server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened")) - client := newTestConnection(server.URL) + client := newTestConnection(t, server.URL) params := map[string]string{ "refresh": "true", @@ -134,7 +134,7 @@ func TestOneHost503Resp_Bulk(t *testing.T) { server := ElasticsearchMock(503, []byte("Something wrong happened")) - client := newTestConnection(server.URL) + client := newTestConnection(t, server.URL) params := map[string]string{ "refresh": "true", diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 6a22132080f..310aa853e34 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -67,7 +67,6 @@ type Connection struct { // requests will share the same cancellable context // so they can be aborted on Close() reqsContext context.Context - cancelReqs func() } // ConnectionSettings are the settings needed for a Connection @@ -82,7 +81,7 @@ type ConnectionSettings struct { Kerberos *kerberos.Config - OnConnectCallback func() error + OnConnectCallback func(*Connection) error Observer transport.IOStatser Parameters map[string]string @@ -109,7 +108,7 @@ type ESVersionData struct { BuildFlavor string `json:"build_flavor"` } -// NewConnection returns a new Elasticsearch client +// NewConnection returns a new Elasticsearch client. func NewConnection(s ConnectionSettings) (*Connection, error) { logger := logp.NewLogger("esclientleg") @@ -184,15 +183,12 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { logger.Info("kerberos client created") } - ctx, cancelFunc := context.WithCancel(context.Background()) conn := Connection{ ConnectionSettings: s, HTTP: esClient, Encoder: encoder, log: logger, responseBuffer: bytes.NewBuffer(nil), - reqsContext: ctx, - cancelReqs: cancelFunc, } if s.APIKey != "" { @@ -255,7 +251,7 @@ func NewClients(cfg *cfg.C, beatname string) ([]Connection, error) { } // NewConnectedClient returns a non-thread-safe connection. Make sure for each goroutine you initialize a new connection. -func NewConnectedClient(cfg *cfg.C, beatname string) (*Connection, error) { +func NewConnectedClient(ctx context.Context, cfg *cfg.C, beatname string) (*Connection, error) { clients, err := NewClients(cfg, beatname) if err != nil { return nil, err @@ -264,7 +260,7 @@ func NewConnectedClient(cfg *cfg.C, beatname string) (*Connection, error) { errors := []string{} for _, client := range clients { - err = client.Connect() + err = client.Connect(ctx) if err != nil { const errMsg = "error connecting to Elasticsearch at %v: %v" client.log.Errorf(errMsg, client.URL, err) @@ -279,17 +275,22 @@ func NewConnectedClient(cfg *cfg.C, beatname string) (*Connection, error) { // Connect connects the client. It runs a GET request against the root URL of // the configured host, updates the known Elasticsearch version and calls -// globally configured handlers. -func (conn *Connection) Connect() error { +// globally configured handlers. The context is used to control the lifecycle +// of the HTTP requests/connections, the caller is responsible for cancelling +// the context to stop any in-flight requests. +func (conn *Connection) Connect(ctx context.Context) error { if conn.log == nil { conn.log = logp.NewLogger("esclientleg") } + + conn.reqsContext = ctx + if err := conn.getVersion(); err != nil { return err } if conn.OnConnectCallback != nil { - if err := conn.OnConnectCallback(); err != nil { + if err := conn.OnConnectCallback(conn); err != nil { return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %w", err) } } @@ -323,7 +324,7 @@ func (conn *Connection) Ping() (ESPingData, error) { return response, nil } -// Close closes a connection. +// Close closes any idle connections from the HTTP client. func (conn *Connection) Close() error { conn.HTTP.CloseIdleConnections() return nil @@ -358,7 +359,9 @@ func (conn *Connection) Test(d testing.Driver) { }) } - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = conn.Connect(ctx) d.Fatal("talk to server", err) version := conn.GetVersion() d.Info("version", version.String()) diff --git a/libbeat/esleg/eslegclient/connection_integration_test.go b/libbeat/esleg/eslegclient/connection_integration_test.go index b4e277ed1a6..b56360b4232 100644 --- a/libbeat/esleg/eslegclient/connection_integration_test.go +++ b/libbeat/esleg/eslegclient/connection_integration_test.go @@ -21,8 +21,7 @@ package eslegclient import ( "context" - "io/ioutil" - "math/rand" + "io" "net" "net/http" "net/http/httptest" @@ -34,17 +33,25 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/esleg/eslegtest" - "github.com/elastic/beats/v7/libbeat/outputs" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) func TestConnect(t *testing.T) { conn := getTestingElasticsearch(t) - err := conn.Connect() + err := conn.Connect(context.Background()) assert.NoError(t, err) } +func TestConnectionCanBeClosedAndReused(t *testing.T) { + conn := getTestingElasticsearch(t) + ctx, cancel := context.WithCancel(context.Background()) + assert.NoError(t, conn.Connect(ctx), "first connect must succeed") + assert.NoError(t, conn.Close(), "close must succeed") + cancel() + assert.NoError(t, conn.Connect(context.Background()), "calling connect after close must succeed") +} + func TestConnectWithProxy(t *testing.T) { wrongPort, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) @@ -66,7 +73,9 @@ func TestConnectWithProxy(t *testing.T) { "timeout": 5, // seconds }) require.NoError(t, err) - assert.Error(t, client.Connect(), "it should fail without proxy") + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + assert.Error(t, client.Connect(ctx), "it should fail without proxy") client, err = connectTestEs(t, map[string]interface{}{ "hosts": "http://" + wrongPort.Addr().String(), @@ -74,7 +83,7 @@ func TestConnectWithProxy(t *testing.T) { "timeout": 5, // seconds }) require.NoError(t, err) - assert.NoError(t, client.Connect()) + assert.NoError(t, client.Connect(ctx)) } func connectTestEs(t *testing.T, cfg interface{}) (*Connection, error) { @@ -139,16 +148,6 @@ func getTestingElasticsearch(t eslegtest.TestLogger) *Connection { return conn } -func randomClient(grp outputs.Group) outputs.NetworkClient { - L := len(grp.Clients) - if L == 0 { - panic("no elasticsearch client") - } - - client := grp.Clients[rand.Intn(L)] - return client.(outputs.NetworkClient) -} - // startTestProxy starts a proxy that redirects all connections to the specified URL func startTestProxy(t *testing.T, redirectURL string) *httptest.Server { t.Helper() @@ -166,14 +165,14 @@ func startTestProxy(t *testing.T, redirectURL string) *httptest.Server { require.NoError(t, err) defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) require.NoError(t, err) for _, header := range []string{"Content-Encoding", "Content-Type"} { w.Header().Set(header, resp.Header.Get(header)) } w.WriteHeader(resp.StatusCode) - w.Write(body) + w.Write(body) //nolint: errcheck // It's a test, we can ignore this error })) return proxy } diff --git a/libbeat/esleg/eslegclient/connection_test.go b/libbeat/esleg/eslegclient/connection_test.go index 19fe67e9f55..77cbcdda674 100644 --- a/libbeat/esleg/eslegclient/connection_test.go +++ b/libbeat/esleg/eslegclient/connection_test.go @@ -162,7 +162,9 @@ func TestUserAgentHeader(t *testing.T) { testCase.connSettings.URL = server.URL conn, err := NewConnection(testCase.connSettings) require.NoError(t, err) - require.NoError(t, conn.Connect(), "conn.Connect must not return an error") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + require.NoError(t, conn.Connect(ctx), "conn.Connect must not return an error") }) } } diff --git a/libbeat/esleg/eslegtest/util.go b/libbeat/esleg/eslegtest/util.go index 28f33fde2dc..e86ca14363d 100644 --- a/libbeat/esleg/eslegtest/util.go +++ b/libbeat/esleg/eslegtest/util.go @@ -18,6 +18,7 @@ package eslegtest import ( + "context" "fmt" "os" ) @@ -32,20 +33,23 @@ const ( // TestLogger is used to report fatal errors to the testing framework. type TestLogger interface { Fatal(args ...interface{}) + Cleanup(f func()) } // Connectable defines the minimum interface required to initialize a connected // client. type Connectable interface { - Connect() error + Connect(context.Context) error } // InitConnection initializes a new connection if the no error value from creating the // connection instance is reported. // The test logger will be used if an error is found. func InitConnection(t TestLogger, conn Connectable, err error) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) if err == nil { - err = conn.Connect() + err = conn.Connect(ctx) } if err != nil { diff --git a/libbeat/idxmgmt/lifecycle/client_handler_integration_test.go b/libbeat/idxmgmt/lifecycle/client_handler_integration_test.go index 67b9a1cfb06..6f81bf98a02 100644 --- a/libbeat/idxmgmt/lifecycle/client_handler_integration_test.go +++ b/libbeat/idxmgmt/lifecycle/client_handler_integration_test.go @@ -20,6 +20,7 @@ package lifecycle import ( + "context" "fmt" "os" "testing" @@ -141,7 +142,9 @@ func newRawESClient(t *testing.T) ESClient { t.Fatal(err) } - if err := client.Connect(); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { t.Fatalf("Failed to connect to Test Elasticsearch instance: %v", err) } diff --git a/libbeat/licenser/elastic_fetcher.go b/libbeat/licenser/elastic_fetcher.go index bcbe68a938f..1f869d61fef 100644 --- a/libbeat/licenser/elastic_fetcher.go +++ b/libbeat/licenser/elastic_fetcher.go @@ -18,10 +18,10 @@ package licenser import ( + "context" "encoding/json" "errors" "fmt" - "math/rand" "net/http" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" @@ -98,6 +98,7 @@ func (f *ElasticFetcher) parseJSON(b []byte) (License, error) { // esClientMux is taking care of round robin request over an array of elasticsearch client, note that // calling request is not threadsafe. +// nolint: unused // it's used on Linux type esClientMux struct { clients []eslegclient.Connection idx int @@ -107,6 +108,7 @@ type esClientMux struct { // at the end of the function call, if an error occur we return the error and will pick up the next client on the // next call. Not that we just round robin between hosts, any backoff strategy should be handled by // the consumer of this type. +// nolint: unused // it's used on Linux func (mux *esClientMux) Request( method, path string, pipeline string, @@ -115,7 +117,9 @@ func (mux *esClientMux) Request( ) (int, []byte, error) { c := mux.clients[mux.idx] - if err := c.Connect(); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := c.Connect(ctx); err != nil { return 0, nil, err } defer c.Close() @@ -127,19 +131,3 @@ func (mux *esClientMux) Request( } return status, response, err } - -// newESClientMux takes a list of clients and randomize where we start and the list of host we are -// querying. -func newESClientMux(clients []eslegclient.Connection) *esClientMux { - // randomize where we start - idx := rand.Intn(len(clients)) - - // randomize the list of round robin hosts. - tmp := make([]eslegclient.Connection, len(clients)) - copy(tmp, clients) - rand.Shuffle(len(tmp), func(i, j int) { - tmp[i], tmp[j] = tmp[j], tmp[i] - }) - - return &esClientMux{idx: idx, clients: tmp} -} diff --git a/libbeat/licenser/elastic_fetcher_integration_test.go b/libbeat/licenser/elastic_fetcher_integration_test.go index f303bfe0d8c..7560ebb394d 100644 --- a/libbeat/licenser/elastic_fetcher_integration_test.go +++ b/libbeat/licenser/elastic_fetcher_integration_test.go @@ -20,6 +20,7 @@ package licenser import ( + "context" "testing" "time" @@ -35,7 +36,7 @@ const ( elasticsearchPort = "9200" ) -func getTestClient() *eslegclient.Connection { +func getTestClient(t *testing.T) *eslegclient.Connection { transport := httpcommon.DefaultHTTPTransportSettings() transport.Timeout = 60 * time.Second @@ -47,16 +48,22 @@ func getTestClient() *eslegclient.Connection { CompressionLevel: 3, Transport: transport, }) - if err != nil { - panic(err) + t.Fatalf("cannot get new ES connection: %s", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { + t.Fatalf("cannot connect to ES: %s", err) } + return client } // Sanity check for schema change on the HTTP response from a live Elasticsearch instance. func TestElasticsearch(t *testing.T) { - f := NewElasticFetcher(getTestClient()) + f := NewElasticFetcher(getTestClient(t)) license, err := f.Fetch() if !assert.NoError(t, err) { return diff --git a/libbeat/licenser/elastic_fetcher_test.go b/libbeat/licenser/elastic_fetcher_test.go index 731bf5c0618..82ca7e47ca2 100644 --- a/libbeat/licenser/elastic_fetcher_test.go +++ b/libbeat/licenser/elastic_fetcher_test.go @@ -18,7 +18,8 @@ package licenser import ( - "io/ioutil" + "context" + "fmt" "net/http" "net/http/httptest" "os" @@ -26,15 +27,41 @@ import ( "testing" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" + "github.com/elastic/beats/v7/libbeat/version" "github.com/stretchr/testify/assert" ) +func esRootHandler(w http.ResponseWriter, r *http.Request) { + respStr := fmt.Sprintf(` +{ + "name" : "582a64c35c16", + "cluster_name" : "docker-cluster", + "cluster_uuid" : "fnanWPBeSNS9KZ930Z5JmA", + "version" : { + "number" : "%s", + "build_flavor" : "default", + "build_type" : "docker", + "build_hash" : "14b7170921f2f0e4109255b83cb9af175385d87f", + "build_date" : "2024-08-23T00:26:58.284513650Z", + "build_snapshot" : true, + "lucene_version" : "9.11.1", + "minimum_wire_compatibility_version" : "7.17.0", + "minimum_index_compatibility_version" : "7.0.0" + }, + "tagline" : "You Know, for Search" +}`, version.GetDefaultVersion()) + + w.Write([]byte(respStr)) +} + func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Server, *eslegclient.Connection) { mux := http.NewServeMux() - mux.Handle("/_license/", http.HandlerFunc(handler)) + mux.Handle("/", http.HandlerFunc(esRootHandler)) + mux.Handle("/_license/", handler) server := httptest.NewServer(mux) + t.Cleanup(server.Close) client, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ URL: server.URL, @@ -43,13 +70,19 @@ func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Serv t.Fatalf("could not create the elasticsearch client, error: %s", err) } + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { + t.Fatalf("cannot connect to ES: %s", err) + } + return server, client } func TestParseJSON(t *testing.T) { t.Run("OSS release of Elasticsearch (Code: 405)", func(t *testing.T) { h := func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "Method Not Allowed", 405) + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) } s, c := newServerClientPair(t, h) defer s.Close() @@ -75,7 +108,7 @@ func TestParseJSON(t *testing.T) { t.Run("malformed JSON", func(t *testing.T) { h := func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("hello bad JSON")) + _, _ = w.Write([]byte("hello bad JSON")) } s, c := newServerClientPair(t, h) defer s.Close() @@ -88,7 +121,7 @@ func TestParseJSON(t *testing.T) { t.Run("401 response", func(t *testing.T) { h := func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "Unauthorized", 401) + http.Error(w, "Unauthorized", http.StatusUnauthorized) } s, c := newServerClientPair(t, h) defer s.Close() @@ -113,14 +146,14 @@ func TestParseJSON(t *testing.T) { }) t.Run("200 response", func(t *testing.T) { - filepath.Walk("testdata/", func(path string, i os.FileInfo, err error) error { + _ = filepath.Walk("testdata/", func(path string, i os.FileInfo, err error) error { if i.IsDir() { return nil } t.Run(path, func(t *testing.T) { h := func(w http.ResponseWriter, r *http.Request) { - json, err := ioutil.ReadFile(path) + json, err := os.ReadFile(path) if err != nil { t.Fatal("could not read JSON") } diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 56f56ac8e1e..28be1c37917 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -59,10 +59,10 @@ func newPublishClient( return p, nil } -func (c *publishClient) Connect() error { +func (c *publishClient) Connect(ctx context.Context) error { c.log.Debug("Monitoring client: connect.") - err := c.es.Connect() + err := c.es.Connect(ctx) if err != nil { return fmt.Errorf("cannot connect underlying Elasticsearch client: %w", err) } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index da3f6135110..61e051d1222 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -18,6 +18,7 @@ package elasticsearch import ( + "context" "errors" "io" "math/rand" @@ -214,8 +215,10 @@ func (r *reporter) initLoop(c config) { for { // Select one configured endpoint by random and check if xpack is available + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() client := r.out[rand.Intn(len(r.out))] - err := client.Connect() + err := client.Connect(ctx) if err == nil { closing(log, client) break diff --git a/libbeat/outputs/backoff.go b/libbeat/outputs/backoff.go index 3c7f8e51e10..87d94bb66d0 100644 --- a/libbeat/outputs/backoff.go +++ b/libbeat/outputs/backoff.go @@ -45,8 +45,8 @@ func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient { } } -func (b *backoffClient) Connect() error { - err := b.client.Connect() +func (b *backoffClient) Connect(ctx context.Context) error { + err := b.client.Connect(ctx) backoff.WaitOnError(b.backoff, err) return err } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 70c4cc1cce5..56f28cdbf30 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -133,7 +133,7 @@ func NewClient( return nil, err } - conn.OnConnectCallback = func() error { + conn.OnConnectCallback = func(conn *eslegclient.Connection) error { globalCallbackRegistry.mutex.Lock() defer globalCallbackRegistry.mutex.Unlock() @@ -532,8 +532,8 @@ func (client *Client) applyItemStatus( return true } -func (client *Client) Connect() error { - return client.conn.Connect() +func (client *Client) Connect(ctx context.Context) error { + return client.conn.Connect(ctx) } func (client *Client) Close() error { diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 765fd3eec5a..f4fb0e4f9a9 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -429,8 +429,12 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu } client := randomClient(output).(clientWrap).Client().(*Client) - // Load version number - _ = client.Connect() + // Load version ctx + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { + t.Fatalf("cannot connect to ES: %s", err) + } return client, client } diff --git a/libbeat/outputs/elasticsearch/client_proxy_test.go b/libbeat/outputs/elasticsearch/client_proxy_test.go index c2f23f34052..bd6739c3bf0 100644 --- a/libbeat/outputs/elasticsearch/client_proxy_test.go +++ b/libbeat/outputs/elasticsearch/client_proxy_test.go @@ -22,6 +22,7 @@ package elasticsearch import ( "bytes" + "context" "fmt" "net/http" "net/http/httptest" @@ -209,10 +210,12 @@ func doClientPing(t *testing.T) { client, err := NewClient(clientSettings, nil) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // This ping won't succeed; we aren't testing end-to-end communication // (which would require a lot more setup work), we just want to make sure // the client is pointed at the right server or proxy. - _ = client.Connect() + _ = client.Connect(ctx) } // serverState contains the state of the http listeners for proxy tests, diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 5124c0defe9..abda06a02ee 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -748,8 +748,10 @@ func TestClientWithHeaders(t *testing.T) { }, nil) assert.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // simple ping - err = client.Connect() + err = client.Connect(ctx) assert.NoError(t, err) assert.Equal(t, 1, requestCount) @@ -943,11 +945,13 @@ func TestClientWithAPIKey(t *testing.T) { }, nil) assert.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // This connection will fail since the server doesn't return a valid // response. This is fine since we're just testing the headers in the // original client request. //nolint:errcheck // connection doesn't need to succeed - client.Connect() + client.Connect(ctx) assert.Equal(t, "ApiKey aHlva0hHNEJmV2s1dmlLWjE3Mlg6bzQ1SlVreXVTLS15aVNBdXV4bDhVdw==", headers.Get("Authorization")) } diff --git a/libbeat/outputs/failover.go b/libbeat/outputs/failover.go index 3e999e8321f..d69e01b03cc 100644 --- a/libbeat/outputs/failover.go +++ b/libbeat/outputs/failover.go @@ -54,7 +54,7 @@ func NewFailoverClient(clients []NetworkClient) NetworkClient { } } -func (f *failoverClient) Connect() error { +func (f *failoverClient) Connect(ctx context.Context) error { var ( next int active = f.active @@ -82,7 +82,7 @@ func (f *failoverClient) Connect() error { client := f.clients[next] f.active = next - return client.Connect() + return client.Connect(ctx) } func (f *failoverClient) Close() error { diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index b1e20a0e774..a980d1cef32 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -91,7 +91,7 @@ func newAsyncClient( } c.connect = func() error { - err := c.Client.Connect() + err := c.Client.ConnectContext(context.Background()) if err == nil { c.client, err = clientFactory(c.Client) } @@ -116,7 +116,7 @@ func makeClientFactory( } } -func (c *asyncClient) Connect() error { +func (c *asyncClient) Connect(ctx context.Context) error { c.log.Debug("connect") return c.connect() } diff --git a/libbeat/outputs/logstash/async_test.go b/libbeat/outputs/logstash/async_test.go index 6e2a102edf2..12d2edd124c 100644 --- a/libbeat/outputs/logstash/async_test.go +++ b/libbeat/outputs/logstash/async_test.go @@ -72,6 +72,8 @@ func newAsyncTestDriver(client outputs.NetworkClient) *testAsyncDriver { go func() { defer driver.wg.Done() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for { cmd, ok := <-driver.ch if !ok { @@ -82,7 +84,7 @@ func newAsyncTestDriver(client outputs.NetworkClient) *testAsyncDriver { case driverCmdQuit: return case driverCmdConnect: - driver.client.Connect() + driver.client.Connect(ctx) case driverCmdClose: driver.client.Close() case driverCmdPublish: diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index 442145835df..286717e49ed 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -115,6 +115,11 @@ func esConnect(t *testing.T, index string) *esConnection { Password: password, Transport: transport, }) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { + t.Fatalf("cannot connect to LS: %s:", err) + } if err != nil { t.Fatal(err) } @@ -207,7 +212,9 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer { // The Elasticsearch output requires events to be encoded // before calling Publish, so create an event encoder. es.encoder = grp.EncoderFactory() - es.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + es.Connect(ctx) return es } diff --git a/libbeat/outputs/logstash/logstash_test.go b/libbeat/outputs/logstash/logstash_test.go index fa1b57fb841..5be2054cf2a 100644 --- a/libbeat/outputs/logstash/logstash_test.go +++ b/libbeat/outputs/logstash/logstash_test.go @@ -116,7 +116,9 @@ func testConnectionType( output := makeOutputer() t.Logf("new outputter: %v", output) - err := output.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := output.Connect(ctx) if err != nil { t.Error("test client failed to connect: ", err) return @@ -186,8 +188,10 @@ func newTestLumberjackOutput( t.Fatalf("init logstash output plugin failed: %v", err) } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() client := grp.Clients[0].(outputs.NetworkClient) - if err := client.Connect(); err != nil { + if err := client.Connect(ctx); err != nil { t.Fatalf("Client failed to connected: %v", err) } diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index d24ab1ebb97..6a456907365 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -74,9 +74,9 @@ func newSyncClient( return c, nil } -func (c *syncClient) Connect() error { +func (c *syncClient) Connect(ctx context.Context) error { c.log.Debug("connect") - err := c.Client.Connect() + err := c.Client.ConnectContext(ctx) if err != nil { return err } diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go index d0410c2a8a7..0d8a3e0f513 100644 --- a/libbeat/outputs/logstash/sync_test.go +++ b/libbeat/outputs/logstash/sync_test.go @@ -86,6 +86,8 @@ func newClientTestDriver(client outputs.NetworkClient) *testSyncDriver { go func() { defer driver.wg.Done() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for { cmd, ok := <-driver.ch if !ok { @@ -96,7 +98,7 @@ func newClientTestDriver(client outputs.NetworkClient) *testSyncDriver { case driverCmdQuit: return case driverCmdConnect: - driver.client.Connect() + driver.client.Connect(ctx) case driverCmdClose: driver.client.Close() case driverCmdPublish: diff --git a/libbeat/outputs/outputs.go b/libbeat/outputs/outputs.go index 0fdf4d9407b..3cfdb5aef66 100644 --- a/libbeat/outputs/outputs.go +++ b/libbeat/outputs/outputs.go @@ -57,5 +57,5 @@ type Connectable interface { // The connection attempt shall report an error if no connection could been // established within the given time interval. A timeout value of 0 == wait // forever. - Connect() error + Connect(context.Context) error } diff --git a/libbeat/outputs/redis/backoff.go b/libbeat/outputs/redis/backoff.go index ef3dcd7cc48..2abc1f846f0 100644 --- a/libbeat/outputs/redis/backoff.go +++ b/libbeat/outputs/redis/backoff.go @@ -60,7 +60,7 @@ func newBackoffClient(client *client, init, max time.Duration) *backoffClient { } } -func (b *backoffClient) Connect() error { +func (b *backoffClient) Connect(ctx context.Context) error { err := b.client.Connect() if err != nil { // give the client a chance to promote an internal error to a network error. diff --git a/libbeat/outputs/redis/redis_integration_test.go b/libbeat/outputs/redis/redis_integration_test.go index dfd48dc75d2..6fd3e09397a 100644 --- a/libbeat/outputs/redis/redis_integration_test.go +++ b/libbeat/outputs/redis/redis_integration_test.go @@ -336,7 +336,9 @@ func newRedisTestingOutput(t *testing.T, cfg map[string]interface{}) outputs.Cli } client := out.Clients[0].(outputs.NetworkClient) - if err := client.Connect(); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { t.Fatalf("Failed to connect to redis host: %v", err) } diff --git a/libbeat/publisher/pipeline/client_worker.go b/libbeat/publisher/pipeline/client_worker.go index e05658d9749..3e6b8202dd2 100644 --- a/libbeat/publisher/pipeline/client_worker.go +++ b/libbeat/publisher/pipeline/client_worker.go @@ -29,8 +29,8 @@ import ( ) type worker struct { - qu chan publisher.Batch - done chan struct{} + qu chan publisher.Batch + cancel func() } // clientWorker manages output client of type outputs.Client, not supporting reconnect. @@ -50,14 +50,15 @@ type netClientWorker struct { } func makeClientWorker(qu chan publisher.Batch, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker { + ctx, cancel := context.WithCancel(context.Background()) w := worker{ - qu: qu, - done: make(chan struct{}), + qu: qu, + cancel: cancel, } var c interface { outputWorker - run() + run(context.Context) } if nc, ok := client.(outputs.NetworkClient); ok { @@ -71,12 +72,12 @@ func makeClientWorker(qu chan publisher.Batch, client outputs.Client, logger log c = &clientWorker{worker: w, client: client} } - go c.run() + go c.run(ctx) return c } func (w *worker) close() { - close(w.done) + w.cancel() } func (w *clientWorker) Close() error { @@ -84,20 +85,20 @@ func (w *clientWorker) Close() error { return w.client.Close() } -func (w *clientWorker) run() { +func (w *clientWorker) run(ctx context.Context) { for { // We wait for either the worker to be closed or for there to be a batch of // events to publish. select { - case <-w.done: + case <-ctx.Done(): return case batch := <-w.qu: if batch == nil { continue } - if err := w.client.Publish(context.TODO(), batch); err != nil { + if err := w.client.Publish(ctx, batch); err != nil { return } } @@ -109,7 +110,7 @@ func (w *netClientWorker) Close() error { return w.client.Close() } -func (w *netClientWorker) run() { +func (w *netClientWorker) run(ctx context.Context) { var ( connected = false reconnectAttempts = 0 @@ -120,7 +121,7 @@ func (w *netClientWorker) run() { // events to publish. select { - case <-w.done: + case <-ctx.Done(): return case batch := <-w.qu: @@ -139,7 +140,7 @@ func (w *netClientWorker) run() { w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) } - err := w.client.Connect() + err := w.client.Connect(ctx) connected = err == nil if connected { w.logger.Infof("Connection to %v established", w.client) @@ -152,15 +153,14 @@ func (w *netClientWorker) run() { continue } - if err := w.publishBatch(batch); err != nil { + if err := w.publishBatch(ctx, batch); err != nil { connected = false } } } } -func (w *netClientWorker) publishBatch(batch publisher.Batch) error { - ctx := context.Background() +func (w *netClientWorker) publishBatch(ctx context.Context, batch publisher.Batch) error { if w.tracer != nil && w.tracer.Recording() { tx := w.tracer.StartTransaction("publish", "output") defer tx.End() diff --git a/libbeat/publisher/pipeline/testing.go b/libbeat/publisher/pipeline/testing.go index ca357646a81..61977377a75 100644 --- a/libbeat/publisher/pipeline/testing.go +++ b/libbeat/publisher/pipeline/testing.go @@ -54,7 +54,7 @@ type mockNetworkClient struct { outputs.Client } -func (c *mockNetworkClient) Connect() error { return nil } +func (c *mockNetworkClient) Connect(_ context.Context) error { return nil } type mockBatch struct { mu sync.Mutex diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go index b3aafad5d69..4705f9be5a8 100644 --- a/libbeat/template/load_integration_test.go +++ b/libbeat/template/load_integration_test.go @@ -20,6 +20,7 @@ package template import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -66,7 +67,9 @@ func newTestSetup(t *testing.T, cfg TemplateConfig) *testSetup { cfg.Name = fmt.Sprintf("load-test-%+v", rand.Int()) } client := getTestingElasticsearch(t) - if err := client.Connect(); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { t.Fatal(err) } handler := &mockClientHandler{serverless: false, mode: lifecycle.ILM} @@ -554,7 +557,9 @@ func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection { conn.Encoder = eslegclient.NewJSONEncoder(nil, false) - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err = conn.Connect(ctx) if err != nil { t.Fatal(err) panic(err) // panic in case TestLogger did not stop test @@ -586,7 +591,9 @@ func getMockElasticsearchClient(t *testing.T, method, endpoint string, code int, Transport: httpcommon.DefaultHTTPTransportSettings(), }) require.NoError(t, err) - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err = conn.Connect(ctx) require.NoError(t, err) return conn } diff --git a/libbeat/tests/integration/elasticsearch_test.go b/libbeat/tests/integration/elasticsearch_test.go new file mode 100644 index 00000000000..6d8d1a46a08 --- /dev/null +++ b/libbeat/tests/integration/elasticsearch_test.go @@ -0,0 +1,148 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build integration + +package integration + +import ( + "errors" + "io" + "net/http" + "testing" + "time" + + "github.com/gofrs/uuid/v5" + "github.com/rcrowley/go-metrics" + "github.com/stretchr/testify/require" + + "github.com/elastic/mock-es/pkg/api" +) + +var esCfg = ` +mockbeat: +logging: + level: debug + selectors: + - publisher_pipeline_output + - esclientleg +queue.mem: + events: 4096 + flush.min_events: 8 + flush.timeout: 0.1s +output.elasticsearch: + allow_older_versions: true + hosts: + - "http://localhost:4242" + backoff: + init: 0.1s + max: 0.2s +` + +func TestESOutputRecoversFromNetworkError(t *testing.T) { + mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test") + mockbeat.WriteConfigFile(esCfg) + + s, mr := startMockES(t, "localhost:4242") + + mockbeat.Start() + + // 1. Wait for one _bulk call + waitForEventToBePublished(t, mr) + + // 2. Stop the mock-es server + if err := s.Close(); err != nil { + t.Fatalf("cannot close mock-es server: %s", err) + } + + // 3. Wait for connection error logs + mockbeat.WaitForLogs( + `Get \"http://localhost:4242\": dial tcp 127.0.0.1:4242: connect: connection refused`, + 2*time.Second, + "did not find connection refused error") + + mockbeat.WaitForLogs( + "Attempting to reconnect to backoff(elasticsearch(http://localhost:4242)) with 2 reconnect attempt(s)", + 2*time.Second, + "did not find two tries to reconnect") + + // 4. Restart mock-es on the same port + s, mr = startMockES(t, "localhost:4242") + + // 5. Wait for reconnection logs + mockbeat.WaitForLogs( + "Connection to backoff(elasticsearch(http://localhost:4242)) established", + 5*time.Second, // There is a backoff, so ensure we wait enough + "did not find re connection confirmation") + + // 6. Ensure one new call to _bulk is made + waitForEventToBePublished(t, mr) + s.Close() +} + +func startMockES(t *testing.T, addr string) (*http.Server, metrics.Registry) { + uid := uuid.Must(uuid.NewV4()) + mr := metrics.NewRegistry() + es := api.NewAPIHandler(uid, "foo2", mr, time.Now().Add(24*time.Hour), 0, 0, 0, 0, 0) + + s := http.Server{Addr: addr, Handler: es, ReadHeaderTimeout: time.Second} + go func() { + if err := s.ListenAndServe(); !errors.Is(http.ErrServerClosed, err) { + t.Errorf("could not start mock-es server: %s", err) + } + }() + + require.Eventually(t, func() bool { + resp, err := http.Get("http://" + addr) //nolint: noctx // It's just a test + if err != nil { + //nolint: errcheck // We're just draining the body, we can ignore the error + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + return false + } + return true + }, + time.Second, time.Millisecond, "mock-es server did not start on '%s'", addr) + + return &s, mr +} + +// waitForEventToBePublished waits for at least one event published +// by inspecting the count for `bulk.create.total` in `mr`. Once +// the counter is > 1, waitForEventToBePublished returns. If that +// does not happen within 10min, then the test fails with a call to +// t.Fatal. +func waitForEventToBePublished(t *testing.T, mr metrics.Registry) { + t.Helper() + require.Eventually(t, func() bool { + total := mr.Get("bulk.create.total") + if total == nil { + return false + } + + sc, ok := total.(*metrics.StandardCounter) + if !ok { + t.Fatalf("expecting 'bulk.create.total' to be *metrics.StandardCounter, but got '%T' instead", + total, + ) + } + + return sc.Count() > 1 + }, + 10*time.Second, 100*time.Millisecond, + "at least one bulk request must be made") +} diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 6495a733379..e12573f8406 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -18,6 +18,7 @@ package beater import ( + "context" "flag" "fmt" "sync" @@ -111,7 +112,9 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { } overwritePipelines = config.OverwritePipelines b.OverwritePipelinesCallback = func(esConfig *conf.C) error { - esClient, err := eslegclient.NewConnectedClient(esConfig, "Packetbeat") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Packetbeat") if err != nil { return err } diff --git a/winlogbeat/beater/winlogbeat.go b/winlogbeat/beater/winlogbeat.go index e41aa54cb7f..4e6b2b3657d 100644 --- a/winlogbeat/beater/winlogbeat.go +++ b/winlogbeat/beater/winlogbeat.go @@ -108,7 +108,9 @@ func (eb *Winlogbeat) init(b *beat.Beat) error { } b.OverwritePipelinesCallback = func(esConfig *conf.C) error { overwritePipelines := config.OverwritePipelines - esClient, err := eslegclient.NewConnectedClient(esConfig, "Winlogbeat") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Winlogbeat") if err != nil { return err } diff --git a/x-pack/winlogbeat/module/testing.go b/x-pack/winlogbeat/module/testing.go index 3dc628b80a9..f1d38fceac8 100644 --- a/x-pack/winlogbeat/module/testing.go +++ b/x-pack/winlogbeat/module/testing.go @@ -5,6 +5,7 @@ package module import ( + "context" "encoding/json" "flag" "fmt" @@ -105,7 +106,9 @@ func testIngestPipeline(t *testing.T, pipeline, pattern string, p *params) { } defer conn.Close() - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = conn.Connect(ctx) if err != nil { t.Fatalf("unexpected error making connection: %v", err) } diff --git a/x-pack/winlogbeat/module/wintest/docker_test.go b/x-pack/winlogbeat/module/wintest/docker_test.go index e45826f3b08..db7ab341a27 100644 --- a/x-pack/winlogbeat/module/wintest/docker_test.go +++ b/x-pack/winlogbeat/module/wintest/docker_test.go @@ -82,7 +82,9 @@ func TestDocker(t *testing.T) { } defer conn.Close() - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = conn.Connect(ctx) if err != nil { t.Fatalf("unexpected error making connection: %v", err) } diff --git a/x-pack/winlogbeat/module/wintest/simulate_test.go b/x-pack/winlogbeat/module/wintest/simulate_test.go index 1bda1d5fb17..b54d12f1d96 100644 --- a/x-pack/winlogbeat/module/wintest/simulate_test.go +++ b/x-pack/winlogbeat/module/wintest/simulate_test.go @@ -11,6 +11,7 @@ package wintest_test import ( + "context" "encoding/json" "fmt" "os" @@ -72,7 +73,9 @@ func TestSimulate(t *testing.T) { } defer conn.Close() - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = conn.Connect(ctx) if err != nil { t.Fatalf("unexpected error making connection: %v", err) }