From 9487f8d670267ac5baa8fde3ff0f45622cc067c9 Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Thu, 14 Nov 2024 11:03:42 +0100 Subject: [PATCH] fix: clear scroll after completing scroll requests (#14551) * fix: clear scroll after completing scroll requests clear the search context using a clear scroll request for sourcemap and agentcfg search scroll requests * lint: fix linter issues * fix: clear scroll on error too * test: validate scrollID when clearing * test: increase sourcemap test coverage to use scroll requests * feat: log failed clear scroll requests * Update elasticsearch.go * Update metadata_fetcher.go --- internal/agentcfg/elasticsearch.go | 19 +++++++++++++++++++ internal/agentcfg/elasticsearch_test.go | 11 +++++++++-- internal/sourcemap/metadata_fetcher.go | 19 +++++++++++++++++++ internal/sourcemap/metadata_fetcher_test.go | 15 +++++++++++++++ 4 files changed, 62 insertions(+), 2 deletions(-) diff --git a/internal/agentcfg/elasticsearch.go b/internal/agentcfg/elasticsearch.go index 066b9374817..dcc371ef947 100644 --- a/internal/agentcfg/elasticsearch.go +++ b/internal/agentcfg/elasticsearch.go @@ -222,6 +222,7 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) { for { result, err := f.singlePageRefresh(ctx, scrollID) if err != nil { + f.clearScroll(ctx, scrollID) return err } @@ -240,6 +241,8 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) { } } + f.clearScroll(ctx, scrollID) + f.mu.Lock() f.cache = buffer f.mu.Unlock() @@ -249,6 +252,22 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) { return nil } +func (f *ElasticsearchFetcher) clearScroll(ctx context.Context, scrollID string) { + resp, err := esapi.ClearScrollRequest{ + ScrollID: []string{scrollID}, + }.Do(ctx, f.client) + if err != nil { + f.logger.Warnf("failed to clear scroll: %v", err) + return + } + + if resp.IsError() { + f.logger.Warnf("clearscroll request returned error: %s", resp.Status()) + } + + resp.Body.Close() +} + func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID string) (cacheResult, error) { var result cacheResult var err error diff --git a/internal/agentcfg/elasticsearch_test.go b/internal/agentcfg/elasticsearch_test.go index 4671934c847..ea8391d1756 100644 --- a/internal/agentcfg/elasticsearch_test.go +++ b/internal/agentcfg/elasticsearch_test.go @@ -22,6 +22,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -79,6 +80,11 @@ func newElasticsearchFetcher( i := 0 fetcher := NewElasticsearchFetcher(newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodDelete && strings.HasPrefix(r.URL.Path, "/_search/scroll") { + scrollID := strings.TrimPrefix(r.URL.Path, "/_search/scroll/") + assert.Equal(t, respTmpl["_scroll_id"], scrollID) + return + } switch r.URL.Path { case "/_search/scroll": scrollID := r.URL.Query().Get("scroll_id") @@ -117,14 +123,15 @@ func TestRun(t *testing.T) { assert.Eventually(t, func() bool { rt.Tracer.Flush(nil) payloads := rt.Payloads() - return len(payloads.Transactions) == 1 && len(payloads.Spans) == 3 + return len(payloads.Transactions) == 1 && len(payloads.Spans) == 4 }, 10*time.Second, 10*time.Millisecond) payloads := rt.Payloads() assert.Equal(t, "ElasticsearchFetcher.refresh", payloads.Transactions[0].Name) assert.Equal(t, "Elasticsearch: POST .apm-agent-configuration/_search", payloads.Spans[0].Name) assert.Equal(t, "Elasticsearch: POST _search/scroll", payloads.Spans[1].Name) - assert.Equal(t, "ElasticsearchFetcher.refreshCache", payloads.Spans[2].Name) + assert.Equal(t, "Elasticsearch: DELETE _search/scroll/", payloads.Spans[2].Name[:37]) // trim scrollID + assert.Equal(t, "ElasticsearchFetcher.refreshCache", payloads.Spans[3].Name) } func TestFetch(t *testing.T) { diff --git a/internal/sourcemap/metadata_fetcher.go b/internal/sourcemap/metadata_fetcher.go
index d375a501fda..afb6d561406 100644 --- a/internal/sourcemap/metadata_fetcher.go +++ b/internal/sourcemap/metadata_fetcher.go @@ -159,6 +159,7 @@ func (s *MetadataESFetcher) sync(ctx context.Context) error { for { result, err = s.scrollsearch(ctx, scrollID, sourcemaps) if err != nil { + s.clearScroll(ctx, scrollID) if e := apm.CaptureError(ctx, err); e != nil { e.Send() } @@ -179,10 +180,28 @@ func (s *MetadataESFetcher) sync(ctx context.Context) error { } } + s.clearScroll(ctx, scrollID) + s.update(ctx, sourcemaps) return nil } +func (s *MetadataESFetcher) clearScroll(ctx context.Context, scrollID string) { + resp, err := esapi.ClearScrollRequest{ + ScrollID: []string{scrollID}, + }.Do(ctx, s.esClient) + if err != nil { + s.logger.Warnf("failed to clear scroll: %v", err) + return + } + + if resp.IsError() { + s.logger.Warnf("clearscroll request returned error: %s", resp.Status()) + } + + resp.Body.Close() +} + func (s *MetadataESFetcher) update(ctx context.Context, sourcemaps map[identifier]string) { span := apm.TransactionFromContext(ctx).StartSpan("MetadataESFetcher.update", "", nil) defer span.End() diff --git a/internal/sourcemap/metadata_fetcher_test.go b/internal/sourcemap/metadata_fetcher_test.go index 7c892890906..686711f7dc5 100644 --- a/internal/sourcemap/metadata_fetcher_test.go +++ b/internal/sourcemap/metadata_fetcher_test.go @@ -103,6 +103,12 @@ func TestMetadataFetcher(t *testing.T) { require.NotNil(t, tc.searchReponse, "nil searchReponse, possible unexpected request") // search request from the metadata fetcher tc.searchReponse(w, r) + case "/_search/scroll": + scrollIDValue := r.URL.Query().Get("scroll_id") + assert.Equal(t, scrollID, scrollIDValue) + w.Write([]byte(`{}`)) + case "/_search/scroll/" + scrollID: + assert.Equal(t, r.Method, http.MethodDelete) default: w.WriteHeader(http.StatusTeapot) t.Fatalf("unhandled request path: %s", r.URL.Path) @@ -234,6 +240,12 @@ func TestInvalidation(t *testing.T) { case "/.apm-source-map/_search": // search request from the metadata fetcher tc.searchReponse(w, r) + case "/_search/scroll": + scrollIDValue := r.URL.Query().Get("scroll_id") + assert.Equal(t, scrollID, scrollIDValue) + w.Write([]byte(`{}`)) + case "/_search/scroll/" + scrollID: + assert.Equal(t, r.Method, http.MethodDelete) default: w.WriteHeader(http.StatusTeapot) t.Fatalf("unhandled request path: %s", r.URL.Path) @@ -294,6 +306,8 @@ func TestInvalidation(t *testing.T) { } } +const scrollID = "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFkJUT0Z5bFUtUXRXM3NTYno0dkM2MlEAAAAAAABnRBY5OUxYalAwUFFoS1NfLV9lWjlSYTRn" + func sourcemapSearchResponseBody(ids []metadata) []byte { m := make([]map[string]interface{}, 0, len(ids)) for _, id := range ids { @@ -318,6 +332,7 @@ }, "hits": m, }, + "_scroll_id": scrollID, } data, err := json.Marshal(result)