From ca90dc6ec6b24fc30cd58b5035cd83387b35b2b8 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 19 Dec 2023 19:18:07 +0800 Subject: [PATCH] Revert "Revise wait_for_integration ES implementation (#12150)" This reverts commit 3e54384bf79d53c776a0a3d2f0480daa4f432521. --- internal/beater/beater.go | 46 +++++--------- internal/beater/beatertest/server.go | 3 +- internal/beater/checkintegration.go | 91 +++++++++++++++++----------- internal/beater/server_test.go | 46 +++++--------- internal/beater/waitready.go | 23 ++++--- 5 files changed, 99 insertions(+), 110 deletions(-) diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 330753510dd..aee736e3fa1 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -25,7 +25,6 @@ import ( "net/http" "os" "runtime" - "sync" "time" "github.com/dustin/go-humanize" @@ -43,9 +42,11 @@ import ( _ "google.golang.org/grpc/encoding/gzip" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/licenser" "github.com/elastic/beats/v7/libbeat/outputs" + esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" agentconfig "github.com/elastic/elastic-agent-libs/config" @@ -346,13 +347,7 @@ func (s *Runner) Run(ctx context.Context) error { // any events to Elasticsearch before the integration is ready. publishReady := make(chan struct{}) drain := make(chan struct{}) - startWaitReady := make(chan struct{}) - var waitReadyOnce sync.Once g.Go(func() error { - select { - case <-ctx.Done(): - case <-startWaitReady: - } if err := s.waitReady(ctx, kibanaClient, tracer); err != nil { // One or more preconditions failed; drop events. close(drain) @@ -363,25 +358,24 @@ func (s *Runner) Run(ctx context.Context) error { close(publishReady) return nil }) - prePublish := func(ctx context.Context) error { - waitReadyOnce.Do(func() { - close(startWaitReady) - }) + callbackUUID, err := esoutput.RegisterConnectCallback(func(*eslegclient.Connection) error { select { - case <-ctx.Done(): - return ctx.Err() - case <-drain: - return errServerShuttingDown case <-publishReady: + return nil + default: } - return nil + return errors.New("not ready for publishing events") + }) + if err != nil { + return err } + defer esoutput.DeregisterConnectCallback(callbackUUID) newElasticsearchClient := func(cfg *elasticsearch.Config) (*elasticsearch.Client, error) { httpTransport, err := elasticsearch.NewHTTPTransport(cfg) if err != nil { return nil, err } - transport := &waitReadyRoundTripper{Transport: httpTransport, onBulk: prePublish} + transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain} return elasticsearch.NewClientParams(elasticsearch.ClientParams{ Config: cfg, Transport: transport, @@ -438,7 +432,7 @@ func (s *Runner) Run(ctx context.Context) error { // Create the BatchProcessor chain that is used to process all events, // including the metrics aggregated by APM Server. finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor( - tracer, newElasticsearchClient, memLimitGB, prePublish, + tracer, newElasticsearchClient, memLimitGB, ) if err != nil { return err @@ -653,9 +647,7 @@ func (s *Runner) waitReady( return errors.New("cannot wait for integration without either Kibana or Elasticsearch config") } preconditions = append(preconditions, func(ctx context.Context) error { - return checkIndexTemplatesInstalled( - ctx, kibanaClient, esOutputClient, s.config.DataStreams.Namespace, s.logger, - ) + return checkIntegrationInstalled(ctx, kibanaClient, esOutputClient, s.logger) }) } @@ -680,13 +672,12 @@ func (s *Runner) newFinalBatchProcessor( tracer *apm.Tracer, newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error), memLimit float64, - prePublish func(context.Context) error, ) (modelpb.BatchProcessor, func(context.Context) error, error) { monitoring.Default.Remove("libbeat") libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat") if s.elasticsearchOutputConfig == nil { - return s.newLibbeatFinalBatchProcessor(tracer, prePublish, libbeatMonitoringRegistry) + return s.newLibbeatFinalBatchProcessor(tracer, libbeatMonitoringRegistry) } stateRegistry := monitoring.GetNamespace("state").GetRegistry() @@ -838,7 +829,6 @@ func docappenderConfig( func (s *Runner) newLibbeatFinalBatchProcessor( tracer *apm.Tracer, - prePublish func(context.Context) error, libbeatMonitoringRegistry *monitoring.Registry, ) (modelpb.BatchProcessor, func(context.Context) error, error) { // When the publisher stops cleanly it will close its pipeline client, @@ -899,13 +889,7 @@ func (s *Runner) newLibbeatFinalBatchProcessor( } return acker.Wait(ctx) } - processor := modelprocessor.Chained{ - modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error { - return prePublish(ctx) - }), - publisher, - } - return processor, stop, nil + return publisher, stop, nil } const sourcemapIndex = ".apm-source-map" diff --git a/internal/beater/beatertest/server.go b/internal/beater/beatertest/server.go index 63109ac25ae..9812e3cb6e7 100644 --- a/internal/beater/beatertest/server.go +++ b/internal/beater/beatertest/server.go @@ -108,8 +108,7 @@ func NewUnstartedServer(t testing.TB, opts ...option) *Server { require.NoError(t, err) if !outputConfig.Output.IsSet() { err = cfg.Merge(map[string]any{ - "output.null": map[string]any{}, - "queue.mem.flush": map[string]any{"min_events": 1, "timeout": "1ns"}, + "output.null": map[string]any{}, }) require.NoError(t, err) } diff --git a/internal/beater/checkintegration.go b/internal/beater/checkintegration.go index 1db8bf3e84c..f0fe62ebc59 100644 --- a/internal/beater/checkintegration.go +++ b/internal/beater/checkintegration.go @@ -24,26 +24,23 @@ import ( "fmt" "io" "net/http" - "strings" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/apm-server/internal/elasticsearch" "github.com/elastic/apm-server/internal/kibana" - "github.com/elastic/go-elasticsearch/v8/typedapi/indices/createdatastream" - "github.com/elastic/go-elasticsearch/v8/typedapi/types" + "github.com/elastic/go-elasticsearch/v8/esapi" ) -// checkIndexTemplatesInstalled checks if the APM index templates are installed by querying the -// APM integration status via Kibana, or by attempting to create a data stream via Elasticsearch, -// returning nil if and only if it is installed. -func checkIndexTemplatesInstalled( +// checkIntegrationInstalled checks if the APM integration is installed by querying Kibana +// and/or Elasticsearch, returning nil if and only if it is installed. +func checkIntegrationInstalled( ctx context.Context, kibanaClient *kibana.Client, esClient *elasticsearch.Client, - namespace string, logger *logp.Logger, ) (err error) { defer func() { @@ -56,36 +53,37 @@ func checkIndexTemplatesInstalled( } } }() - if esClient != nil { - installed, err := checkCreateDataStream(ctx, esClient, namespace) - if err != nil { - return fmt.Errorf("error checking Elasticsearch index template setup: %w", err) - } - if !installed { - return errors.New("index templates not installed") - } - return nil - } if kibanaClient != nil { - installed, err := checkIntegrationInstalled(ctx, kibanaClient, logger) + installed, err := checkIntegrationInstalledKibana(ctx, kibanaClient, logger) if err != nil { // We only return the Kibana error if we have no Elasticsearch client, // as we may not have sufficient privileges to query the Fleet API. if esClient == nil { return fmt.Errorf("error querying Kibana for integration package status: %w", err) } - } - if !installed { + } else if !installed { // We were able to query Kibana, but the package is not yet installed. + // We should continue querying the package status via Kibana, as it is + // more authoritative than checking for index template installation. return errors.New("integration package not yet installed") } + // Fall through and query Elasticsearch (if we have a client). Kibana may prematurely + // report packages as installed: https://github.com/elastic/kibana/issues/108649 + } + if esClient != nil { + installed, err := checkIntegrationInstalledElasticsearch(ctx, esClient, logger) + if err != nil { + return fmt.Errorf("error querying Elasticsearch for integration index templates: %w", err) + } else if !installed { + return errors.New("integration index templates not installed") + } } return nil } // checkIntegrationInstalledKibana checks if the APM integration package // is installed by querying Kibana. -func checkIntegrationInstalled(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) { +func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) { resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil) if err != nil { return false, err @@ -108,22 +106,41 @@ func checkIntegrationInstalled(ctx context.Context, kibanaClient *kibana.Client, return result.Response.Status == "installed", nil } -// checkCreateDataStream attempts to create a traces-apm- data stream, -// returning an error if it could not be created. This will fail if there is no -// index template matching the pattern. -func checkCreateDataStream(ctx context.Context, esClient *elasticsearch.Client, namespace string) (bool, error) { - if _, err := createdatastream.NewCreateDataStreamFunc(esClient)("traces-apm-" + namespace).Do(ctx); err != nil { - var esError *types.ElasticsearchError - if errors.As(err, &esError) { - cause := esError.ErrorCause - if cause.Type == "resource_already_exists_exception" { - return true, nil +func checkIntegrationInstalledElasticsearch(ctx context.Context, esClient *elasticsearch.Client, _ *logp.Logger) (bool, error) { + // TODO(axw) generate the list of expected index templates. + templates := []string{ + "traces-apm", + "traces-apm.sampled", + "metrics-apm.app", + "metrics-apm.internal", + "logs-apm.error", + } + for _, intervals := range []string{"1m", "10m", "60m"} { + for _, ds := range []string{"metrics-apm.transaction", "metrics-apm.service_transaction", "metrics-apm.service_destination", "metrics-apm.service_summary"} { + templates = append(templates, fmt.Sprintf("%s.%s", ds, intervals)) + } + } + // IndicesGetIndexTemplateRequest accepts a slice of template names, + // but the REST API expects just one index template name. Query them + // in parallel. + g, ctx := errgroup.WithContext(ctx) + for _, template := range templates { + template := template // copy for closure + g.Go(func() error { + req := esapi.IndicesGetIndexTemplateRequest{Name: template} + resp, err := req.Do(ctx, esClient) + if err != nil { + return err } - if cause.Reason != nil && strings.HasPrefix(*cause.Reason, "no matching index template") { - return false, nil + defer resp.Body.Close() + + if resp.IsError() { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status(), bytes.TrimSpace(body)) } - } - return false, err + return nil + }) } - return true, nil + err := g.Wait() + return err == nil, err } diff --git a/internal/beater/server_test.go b/internal/beater/server_test.go index 98429cd9247..1249a41c4af 100644 --- a/internal/beater/server_test.go +++ b/internal/beater/server_test.go @@ -332,7 +332,7 @@ func TestServerOTLPGRPC(t *testing.T) { func TestServerWaitForIntegrationKibana(t *testing.T) { var requests int64 - requestCh := make(chan struct{}, 3) + requestCh := make(chan struct{}) mux := http.NewServeMux() mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte(`{"version":{"number":"1.2.3"}}`)) @@ -363,14 +363,6 @@ func TestServerWaitForIntegrationKibana(t *testing.T) { }, }))) - // Send some events to the server. They should be accepted and enqueued. - req := makeTransactionRequest(t, srv.URL) - req.Header.Add("Content-Type", "application/x-ndjson") - resp, err := srv.Client.Do(req) - assert.NoError(t, err) - assert.Equal(t, http.StatusAccepted, resp.StatusCode) - resp.Body.Close() - timeout := time.After(10 * time.Second) for i := 0; i < 3; i++ { select { @@ -395,8 +387,8 @@ func TestServerWaitForIntegrationKibana(t *testing.T) { func TestServerWaitForIntegrationElasticsearch(t *testing.T) { var mu sync.Mutex - var createDataStreamRequests int - createDataStreamRequestsCh := make(chan int) + var tracesRequests int + tracesRequestsCh := make(chan int) bulkCh := make(chan struct{}, 1) mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { @@ -405,24 +397,17 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) { // elasticsearch client to send bulk requests. fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) }) - mux.HandleFunc("/_data_stream/", func(w http.ResponseWriter, r *http.Request) { + mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) { mu.Lock() defer mu.Unlock() - name := path.Base(r.URL.Path) - if name != "traces-apm-testing" { - panic("unexpected data stream name: " + name) - } - createDataStreamRequests++ - switch createDataStreamRequests { - case 1: - w.WriteHeader(500) - case 2: - w.WriteHeader(400) - w.Write([]byte(`{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"}],"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"},"status":400}`)) - case 3: - w.Write([]byte(`{"acknowledged":true}`)) + template := path.Base(r.URL.Path) + if template == "traces-apm" { + tracesRequests++ + if tracesRequests == 1 { + w.WriteHeader(404) + } + tracesRequestsCh <- tracesRequests } - createDataStreamRequestsCh <- createDataStreamRequests }) mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { select { @@ -437,7 +422,6 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) { "apm-server": map[string]interface{}{ "wait_ready_interval": "100ms", "data_streams.wait_for_integration": true, - "data_streams.namespace": "testing", }, "output.elasticsearch": map[string]interface{}{ "hosts": []string{elasticsearchServer.URL}, @@ -472,8 +456,8 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) { var done bool for !done { select { - case n := <-createDataStreamRequestsCh: - done = n == 3 + case n := <-tracesRequestsCh: + done = n == 2 case <-timeout: t.Fatal("timed out waiting for request") } @@ -487,7 +471,7 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) { } logs := srv.Logs.FilterMessageSnippet("please install the apm integration") - assert.Len(t, logs.All(), 2, "couldn't find remediation message logs") + assert.Len(t, logs.All(), 1, "couldn't find remediation message logs") // Healthcheck should now report that the server is publish-ready. resp, err = srv.Client.Get(srv.URL + api.RootPath) @@ -506,7 +490,7 @@ func TestServerFailedPreconditionDoesNotIndex(t *testing.T) { // elasticsearch client to send bulk requests. fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) }) - mux.HandleFunc("/_data_stream/traces-apm-default", func(w http.ResponseWriter, r *http.Request) { + mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(404) }) mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/beater/waitready.go b/internal/beater/waitready.go index 50811a4d688..7bfaa6b04fd 100644 --- a/internal/beater/waitready.go +++ b/internal/beater/waitready.go @@ -21,7 +21,6 @@ import ( "context" "errors" "net/http" - "path" "time" "go.elastic.co/apm/v2" @@ -72,21 +71,27 @@ func waitReady( } // waitReadyRoundTripper wraps a *net/http.Transport, ensuring the server's -// indexing preconditions have been satisfied prior to allowing any indexing -// requests through. This is used to ensure we don't index any documents prior -// to the data stream index templates being ready. +// indexing preconditions have been satisfied by waiting for "ready" channel +// to be signalled, prior to allowing any requests through. +// +// This is used to prevent elasticsearch clients from proceeding with requests +// until the APM integration is installed to ensure we don't index any documents +// prior to the data stream index templates being ready. type waitReadyRoundTripper struct { *http.Transport - onBulk func(context.Context) error + ready <-chan struct{} + drain <-chan struct{} } var errServerShuttingDown = errors.New("server shutting down") func (c *waitReadyRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { - if path.Base(r.URL.Path) == "_bulk" { - if err := c.onBulk(r.Context()); err != nil { - return nil, err - } + select { + case <-c.drain: + return nil, errServerShuttingDown + case <-c.ready: + case <-r.Context().Done(): + return nil, r.Context().Err() } return c.Transport.RoundTrip(r) }