Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.12] Revert "Revise wait_for_integration ES implementation (#12150)" (backport #12240) #12249

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 15 additions & 31 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"net/http"
"os"
"runtime"
"sync"
"time"

"github.com/dustin/go-humanize"
Expand All @@ -43,9 +42,11 @@ import (
_ "google.golang.org/grpc/encoding/gzip"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/licenser"
"github.com/elastic/beats/v7/libbeat/outputs"
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
agentconfig "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -346,13 +347,7 @@ func (s *Runner) Run(ctx context.Context) error {
// any events to Elasticsearch before the integration is ready.
publishReady := make(chan struct{})
drain := make(chan struct{})
startWaitReady := make(chan struct{})
var waitReadyOnce sync.Once
g.Go(func() error {
select {
case <-ctx.Done():
case <-startWaitReady:
}
if err := s.waitReady(ctx, kibanaClient, tracer); err != nil {
// One or more preconditions failed; drop events.
close(drain)
Expand All @@ -363,25 +358,24 @@ func (s *Runner) Run(ctx context.Context) error {
close(publishReady)
return nil
})
prePublish := func(ctx context.Context) error {
waitReadyOnce.Do(func() {
close(startWaitReady)
})
callbackUUID, err := esoutput.RegisterConnectCallback(func(*eslegclient.Connection) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-drain:
return errServerShuttingDown
case <-publishReady:
return nil
default:
}
return nil
return errors.New("not ready for publishing events")
})
if err != nil {
return err
}
defer esoutput.DeregisterConnectCallback(callbackUUID)
newElasticsearchClient := func(cfg *elasticsearch.Config) (*elasticsearch.Client, error) {
httpTransport, err := elasticsearch.NewHTTPTransport(cfg)
if err != nil {
return nil, err
}
transport := &waitReadyRoundTripper{Transport: httpTransport, onBulk: prePublish}
transport := &waitReadyRoundTripper{Transport: httpTransport, ready: publishReady, drain: drain}
return elasticsearch.NewClientParams(elasticsearch.ClientParams{
Config: cfg,
Transport: transport,
Expand Down Expand Up @@ -438,7 +432,7 @@ func (s *Runner) Run(ctx context.Context) error {
// Create the BatchProcessor chain that is used to process all events,
// including the metrics aggregated by APM Server.
finalBatchProcessor, closeFinalBatchProcessor, err := s.newFinalBatchProcessor(
tracer, newElasticsearchClient, memLimitGB, prePublish,
tracer, newElasticsearchClient, memLimitGB,
)
if err != nil {
return err
Expand Down Expand Up @@ -653,9 +647,7 @@ func (s *Runner) waitReady(
return errors.New("cannot wait for integration without either Kibana or Elasticsearch config")
}
preconditions = append(preconditions, func(ctx context.Context) error {
return checkIndexTemplatesInstalled(
ctx, kibanaClient, esOutputClient, s.config.DataStreams.Namespace, s.logger,
)
return checkIntegrationInstalled(ctx, kibanaClient, esOutputClient, s.logger)
})
}

Expand All @@ -680,13 +672,12 @@ func (s *Runner) newFinalBatchProcessor(
tracer *apm.Tracer,
newElasticsearchClient func(cfg *elasticsearch.Config) (*elasticsearch.Client, error),
memLimit float64,
prePublish func(context.Context) error,
) (modelpb.BatchProcessor, func(context.Context) error, error) {

monitoring.Default.Remove("libbeat")
libbeatMonitoringRegistry := monitoring.Default.NewRegistry("libbeat")
if s.elasticsearchOutputConfig == nil {
return s.newLibbeatFinalBatchProcessor(tracer, prePublish, libbeatMonitoringRegistry)
return s.newLibbeatFinalBatchProcessor(tracer, libbeatMonitoringRegistry)
}

stateRegistry := monitoring.GetNamespace("state").GetRegistry()
Expand Down Expand Up @@ -838,7 +829,6 @@ func docappenderConfig(

func (s *Runner) newLibbeatFinalBatchProcessor(
tracer *apm.Tracer,
prePublish func(context.Context) error,
libbeatMonitoringRegistry *monitoring.Registry,
) (modelpb.BatchProcessor, func(context.Context) error, error) {
// When the publisher stops cleanly it will close its pipeline client,
Expand Down Expand Up @@ -899,13 +889,7 @@ func (s *Runner) newLibbeatFinalBatchProcessor(
}
return acker.Wait(ctx)
}
processor := modelprocessor.Chained{
modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
return prePublish(ctx)
}),
publisher,
}
return processor, stop, nil
return publisher, stop, nil
}

const sourcemapIndex = ".apm-source-map"
Expand Down
3 changes: 1 addition & 2 deletions internal/beater/beatertest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ func NewUnstartedServer(t testing.TB, opts ...option) *Server {
require.NoError(t, err)
if !outputConfig.Output.IsSet() {
err = cfg.Merge(map[string]any{
"output.null": map[string]any{},
"queue.mem.flush": map[string]any{"min_events": 1, "timeout": "1ns"},
"output.null": map[string]any{},
})
require.NoError(t, err)
}
Expand Down
91 changes: 54 additions & 37 deletions internal/beater/checkintegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,23 @@ import (
"fmt"
"io"
"net/http"
"strings"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/kibana"
"github.com/elastic/go-elasticsearch/v8/typedapi/indices/createdatastream"
"github.com/elastic/go-elasticsearch/v8/typedapi/types"
"github.com/elastic/go-elasticsearch/v8/esapi"
)

// checkIndexTemplatesInstalled checks if the APM index templates are installed by querying the
// APM integration status via Kibana, or by attempting to create a data stream via Elasticsearch,
// returning nil if and only if it is installed.
func checkIndexTemplatesInstalled(
// checkIntegrationInstalled checks if the APM integration is installed by querying Kibana
// and/or Elasticsearch, returning nil if and only if it is installed.
func checkIntegrationInstalled(
ctx context.Context,
kibanaClient *kibana.Client,
esClient *elasticsearch.Client,
namespace string,
logger *logp.Logger,
) (err error) {
defer func() {
Expand All @@ -56,36 +53,37 @@ func checkIndexTemplatesInstalled(
}
}
}()
if esClient != nil {
installed, err := checkCreateDataStream(ctx, esClient, namespace)
if err != nil {
return fmt.Errorf("error checking Elasticsearch index template setup: %w", err)
}
if !installed {
return errors.New("index templates not installed")
}
return nil
}
if kibanaClient != nil {
installed, err := checkIntegrationInstalled(ctx, kibanaClient, logger)
installed, err := checkIntegrationInstalledKibana(ctx, kibanaClient, logger)
if err != nil {
// We only return the Kibana error if we have no Elasticsearch client,
// as we may not have sufficient privileges to query the Fleet API.
if esClient == nil {
return fmt.Errorf("error querying Kibana for integration package status: %w", err)
}
}
if !installed {
} else if !installed {
// We were able to query Kibana, but the package is not yet installed.
// We should continue querying the package status via Kibana, as it is
// more authoritative than checking for index template installation.
return errors.New("integration package not yet installed")
}
// Fall through and query Elasticsearch (if we have a client). Kibana may prematurely
// report packages as installed: https://github.com/elastic/kibana/issues/108649
}
if esClient != nil {
installed, err := checkIntegrationInstalledElasticsearch(ctx, esClient, logger)
if err != nil {
return fmt.Errorf("error querying Elasticsearch for integration index templates: %w", err)
} else if !installed {
return errors.New("integration index templates not installed")
}
}
return nil
}

// checkIntegrationInstalledKibana checks if the APM integration package
// is installed by querying Kibana.
func checkIntegrationInstalled(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) {
func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient *kibana.Client, logger *logp.Logger) (bool, error) {
resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil)
if err != nil {
return false, err
Expand All @@ -108,22 +106,41 @@ func checkIntegrationInstalled(ctx context.Context, kibanaClient *kibana.Client,
return result.Response.Status == "installed", nil
}

// checkCreateDataStream attempts to create a traces-apm-<namespace> data stream,
// returning an error if it could not be created. This will fail if there is no
// index template matching the pattern.
func checkCreateDataStream(ctx context.Context, esClient *elasticsearch.Client, namespace string) (bool, error) {
if _, err := createdatastream.New(esClient).Name("traces-apm-" + namespace).Do(ctx); err != nil {
var esError *types.ElasticsearchError
if errors.As(err, &esError) {
cause := esError.ErrorCause
if cause.Type == "resource_already_exists_exception" {
return true, nil
func checkIntegrationInstalledElasticsearch(ctx context.Context, esClient *elasticsearch.Client, _ *logp.Logger) (bool, error) {
// TODO(axw) generate the list of expected index templates.
templates := []string{
"traces-apm",
"traces-apm.sampled",
"metrics-apm.app",
"metrics-apm.internal",
"logs-apm.error",
}
for _, intervals := range []string{"1m", "10m", "60m"} {
for _, ds := range []string{"metrics-apm.transaction", "metrics-apm.service_transaction", "metrics-apm.service_destination", "metrics-apm.service_summary"} {
templates = append(templates, fmt.Sprintf("%s.%s", ds, intervals))
}
}
// IndicesGetIndexTemplateRequest accepts a slice of template names,
// but the REST API expects just one index template name. Query them
// in parallel.
g, ctx := errgroup.WithContext(ctx)
for _, template := range templates {
template := template // copy for closure
g.Go(func() error {
req := esapi.IndicesGetIndexTemplateRequest{Name: template}
resp, err := req.Do(ctx, esClient)
if err != nil {
return err
}
if cause.Reason != nil && strings.HasPrefix(*cause.Reason, "no matching index template") {
return false, nil
defer resp.Body.Close()

if resp.IsError() {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status(), bytes.TrimSpace(body))
}
}
return false, err
return nil
})
}
return true, nil
err := g.Wait()
return err == nil, err
}
46 changes: 15 additions & 31 deletions internal/beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestServerOTLPGRPC(t *testing.T) {

func TestServerWaitForIntegrationKibana(t *testing.T) {
var requests int64
requestCh := make(chan struct{}, 3)
requestCh := make(chan struct{})
mux := http.NewServeMux()
mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"version":{"number":"1.2.3"}}`))
Expand Down Expand Up @@ -363,14 +363,6 @@ func TestServerWaitForIntegrationKibana(t *testing.T) {
},
})))

// Send some events to the server. They should be accepted and enqueued.
req := makeTransactionRequest(t, srv.URL)
req.Header.Add("Content-Type", "application/x-ndjson")
resp, err := srv.Client.Do(req)
assert.NoError(t, err)
assert.Equal(t, http.StatusAccepted, resp.StatusCode)
resp.Body.Close()

timeout := time.After(10 * time.Second)
for i := 0; i < 3; i++ {
select {
Expand All @@ -395,8 +387,8 @@ func TestServerWaitForIntegrationKibana(t *testing.T) {

func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
var mu sync.Mutex
var createDataStreamRequests int
createDataStreamRequestsCh := make(chan int)
var tracesRequests int
tracesRequestsCh := make(chan int)
bulkCh := make(chan struct{}, 1)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -405,24 +397,17 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
// elasticsearch client to send bulk requests.
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
mux.HandleFunc("/_data_stream/", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
name := path.Base(r.URL.Path)
if name != "traces-apm-testing" {
panic("unexpected data stream name: " + name)
}
createDataStreamRequests++
switch createDataStreamRequests {
case 1:
w.WriteHeader(500)
case 2:
w.WriteHeader(400)
w.Write([]byte(`{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"}],"type":"illegal_argument_exception","reason":"no matching index template found for data stream [traces-apm-testing]"},"status":400}`))
case 3:
w.Write([]byte(`{"acknowledged":true}`))
template := path.Base(r.URL.Path)
if template == "traces-apm" {
tracesRequests++
if tracesRequests == 1 {
w.WriteHeader(404)
}
tracesRequestsCh <- tracesRequests
}
createDataStreamRequestsCh <- createDataStreamRequests
})
mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
select {
Expand All @@ -437,7 +422,6 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
"apm-server": map[string]interface{}{
"wait_ready_interval": "100ms",
"data_streams.wait_for_integration": true,
"data_streams.namespace": "testing",
},
"output.elasticsearch": map[string]interface{}{
"hosts": []string{elasticsearchServer.URL},
Expand Down Expand Up @@ -472,8 +456,8 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
var done bool
for !done {
select {
case n := <-createDataStreamRequestsCh:
done = n == 3
case n := <-tracesRequestsCh:
done = n == 2
case <-timeout:
t.Fatal("timed out waiting for request")
}
Expand All @@ -487,7 +471,7 @@ func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
}

logs := srv.Logs.FilterMessageSnippet("please install the apm integration")
assert.Len(t, logs.All(), 2, "couldn't find remediation message logs")
assert.Len(t, logs.All(), 1, "couldn't find remediation message logs")

// Healthcheck should now report that the server is publish-ready.
resp, err = srv.Client.Get(srv.URL + api.RootPath)
Expand All @@ -506,7 +490,7 @@ func TestServerFailedPreconditionDoesNotIndex(t *testing.T) {
// elasticsearch client to send bulk requests.
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
mux.HandleFunc("/_data_stream/traces-apm-default", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
})
mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading