fix: clear scroll after completing scroll requests (#14551)
* fix: clear scroll after completing scroll requests

clear the search context using a clear scroll request
for sourcemap and agentcfg search scroll requests

* lint: fix linter issues

* fix: clear scroll on error too

* test: validate scrollID when clearing

* test: increase sourcemap test coverage to use scroll requests

* feat: log failed clear scroll requests

* Update elasticsearch.go

* Update metadata_fetcher.go
kruskall authored Nov 14, 2024
1 parent e0d7e32 commit 9487f8d
Showing 4 changed files with 62 additions and 2 deletions.
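
The pattern behind the fix: both the agentcfg and sourcemap fetchers page through results with the Elasticsearch scroll API, and before this commit the server-side search context was simply left to expire. A minimal sketch of the lifecycle the commit enforces, written against the go-elasticsearch v8 esapi package; the client wiring, index name, and page decoding are illustrative stand-ins, not apm-server code:

```go
package main

import (
	"context"
	"encoding/json"
	"time"

	elasticsearch "github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
)

// scrollAll pages through every hit of a scroll search and releases the
// server-side search context on every exit path, which is the property
// this commit adds to refreshCache and sync.
func scrollAll(ctx context.Context, es *elasticsearch.Client, index string) error {
	resp, err := esapi.SearchRequest{
		Index:  []string{index},
		Scroll: time.Minute, // keep the search context alive between pages
	}.Do(ctx, es)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// page mirrors the two response fields the loop needs.
	var page struct {
		ScrollID string `json:"_scroll_id"`
		Hits     struct {
			Hits []json.RawMessage `json:"hits"`
		} `json:"hits"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
		return err
	}
	scrollID := page.ScrollID

	for len(page.Hits.Hits) > 0 {
		next, err := esapi.ScrollRequest{ScrollID: scrollID, Scroll: time.Minute}.Do(ctx, es)
		if err != nil {
			clearScroll(ctx, es, scrollID) // error path: still release the context
			return err
		}
		page.Hits.Hits = nil
		err = json.NewDecoder(next.Body).Decode(&page)
		next.Body.Close()
		if err != nil {
			clearScroll(ctx, es, scrollID)
			return err
		}
		scrollID = page.ScrollID
	}

	clearScroll(ctx, es, scrollID) // happy path: release once the last page is read
	return nil
}

// clearScroll is best-effort: a failure to free the context is not worth
// failing the refresh over, so the fetchers below only log it.
func clearScroll(ctx context.Context, es *elasticsearch.Client, scrollID string) {
	resp, err := esapi.ClearScrollRequest{ScrollID: []string{scrollID}}.Do(ctx, es)
	if err != nil {
		return
	}
	resp.Body.Close()
}
```

The diffs below add exactly those two call sites, one on the error path and one after the loop, to each fetcher.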
19 changes: 19 additions & 0 deletions internal/agentcfg/elasticsearch.go
@@ -222,6 +222,7 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
 	for {
 		result, err := f.singlePageRefresh(ctx, scrollID)
 		if err != nil {
+			f.clearScroll(ctx, scrollID)
 			return err
 		}
 
@@ -240,6 +241,8 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
 		}
 	}
 
+	f.clearScroll(ctx, scrollID)
+
 	f.mu.Lock()
 	f.cache = buffer
 	f.mu.Unlock()
@@ -249,6 +252,22 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {
 	return nil
 }
 
+func (f *ElasticsearchFetcher) clearScroll(ctx context.Context, scrollID string) {
+	resp, err := esapi.ClearScrollRequest{
+		ScrollID: []string{scrollID},
+	}.Do(ctx, f.client)
+	if err != nil {
+		f.logger.Warnf("failed to clear scroll: %v", err)
+		return
+	}
+
+	if resp.IsError() {
+		f.logger.Warnf("clearscroll request returned error: %s", resp.Status())
+	}
+
+	resp.Body.Close()
+}
+
 func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID string) (cacheResult, error) {
 	var result cacheResult
 	var err error
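Context for why the clear matters: every open scroll pins a search context on the Elasticsearch side until its timeout elapses, and the number of simultaneously open contexts is capped by search.max_open_scroll_contexts (500 by default), so a fetcher that refreshes on a timer and never clears can eventually exhaust the limit.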
11 changes: 9 additions & 2 deletions internal/agentcfg/elasticsearch_test.go
@@ -22,6 +22,7 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

@@ -79,6 +80,11 @@ func newElasticsearchFetcher(
 	i := 0
 
 	fetcher := NewElasticsearchFetcher(newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
+		if r.Method == http.MethodDelete && strings.HasPrefix(r.URL.Path, "/_search/scroll") {
+			scrollID := strings.TrimPrefix(r.URL.Path, "/_search/scroll/")
+			assert.Equal(t, respTmpl["_scroll_id"], scrollID)
+			return
+		}
 		switch r.URL.Path {
 		case "/_search/scroll":
 			scrollID := r.URL.Query().Get("scroll_id")
@@ -117,14 +123,15 @@ func TestRun(t *testing.T) {
 	assert.Eventually(t, func() bool {
 		rt.Tracer.Flush(nil)
 		payloads := rt.Payloads()
-		return len(payloads.Transactions) == 1 && len(payloads.Spans) == 3
+		return len(payloads.Transactions) == 1 && len(payloads.Spans) == 4
 	}, 10*time.Second, 10*time.Millisecond)
 
 	payloads := rt.Payloads()
 	assert.Equal(t, "ElasticsearchFetcher.refresh", payloads.Transactions[0].Name)
 	assert.Equal(t, "Elasticsearch: POST .apm-agent-configuration/_search", payloads.Spans[0].Name)
 	assert.Equal(t, "Elasticsearch: POST _search/scroll", payloads.Spans[1].Name)
-	assert.Equal(t, "ElasticsearchFetcher.refreshCache", payloads.Spans[2].Name)
+	assert.Equal(t, "Elasticsearch: DELETE _search/scroll/", payloads.Spans[2].Name[:37]) // trim scrollID
+	assert.Equal(t, "ElasticsearchFetcher.refreshCache", payloads.Spans[3].Name)
 }

func TestFetch(t *testing.T) {
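A detail of go-elasticsearch worth noting for the test above: when esapi.ClearScrollRequest carries a scroll ID, the client issues DELETE /_search/scroll/{id} with the ID in the URL path, which is why the mock matches on a path prefix rather than a query parameter. The same stub, extracted as a standalone sketch; the JSON bodies are trimmed-down placeholders rather than real fixtures:

```go
package main

import (
	"net/http"
	"net/http/httptest"
	"strings"
)

// newMockES answers the three request shapes the fetcher produces:
// the initial search, the follow-up scroll page, and the clear scroll.
func newMockES(scrollID string) *httptest.Server {
	page := `{"_scroll_id":"` + scrollID + `","hits":{"hits":[]}}`
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// v8 clients reject responses missing this product header.
		w.Header().Set("X-Elastic-Product", "Elasticsearch")
		switch {
		case r.Method == http.MethodDelete && strings.HasPrefix(r.URL.Path, "/_search/scroll/"):
			// clear scroll: the scroll ID travels in the path, so just acknowledge it.
			w.Write([]byte(`{"succeeded":true,"num_freed":1}`))
		case r.URL.Path == "/_search/scroll":
			// next scroll page: empty hits end the caller's loop.
			w.Write([]byte(page))
		default:
			// initial search: hand back the scroll ID to continue with.
			w.Write([]byte(page))
		}
	}))
}
```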
19 changes: 19 additions & 0 deletions internal/sourcemap/metadata_fetcher.go
@@ -159,6 +159,7 @@ func (s *MetadataESFetcher) sync(ctx context.Context) error {
 	for {
 		result, err = s.scrollsearch(ctx, scrollID, sourcemaps)
 		if err != nil {
+			s.clearScroll(ctx, scrollID)
 			if e := apm.CaptureError(ctx, err); e != nil {
 				e.Send()
 			}
@@ -179,10 +180,28 @@ func (s *MetadataESFetcher) sync(ctx context.Context) error {
 		}
 	}
 
+	s.clearScroll(ctx, scrollID)
+
 	s.update(ctx, sourcemaps)
 	return nil
 }
 
+func (s *MetadataESFetcher) clearScroll(ctx context.Context, scrollID string) {
+	resp, err := esapi.ClearScrollRequest{
+		ScrollID: []string{scrollID},
+	}.Do(ctx, s.esClient)
+	if err != nil {
+		s.logger.Warnf("failed to clear scroll: %v", err)
+		return
+	}
+
+	if resp.IsError() {
+		s.logger.Warnf("clearscroll request returned error: %s", resp.Status())
+	}
+
+	resp.Body.Close()
+}
+
 func (s *MetadataESFetcher) update(ctx context.Context, sourcemaps map[identifier]string) {
 	span := apm.TransactionFromContext(ctx).StartSpan("MetadataESFetcher.update", "", nil)
 	defer span.End()
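After this change both fetchers carry a byte-for-byte identical clearScroll helper. Since esapi request types only need the esapi.Transport interface that their .Do method already accepts, the duplication could be factored out; a sketch, with a hypothetical warnLogger interface standing in for the fetchers' logger field:

```go
package main

import (
	"context"

	"github.com/elastic/go-elasticsearch/v8/esapi"
)

// warnLogger is the minimal logging surface the helper needs; the name is
// hypothetical, not an apm-server type.
type warnLogger interface {
	Warnf(format string, args ...interface{})
}

// clearScroll frees a server-side scroll context, logging rather than
// returning failures, matching the behaviour added in this commit.
func clearScroll(ctx context.Context, client esapi.Transport, logger warnLogger, scrollID string) {
	resp, err := esapi.ClearScrollRequest{
		ScrollID: []string{scrollID},
	}.Do(ctx, client)
	if err != nil {
		logger.Warnf("failed to clear scroll: %v", err)
		return
	}
	defer resp.Body.Close()

	if resp.IsError() {
		logger.Warnf("clearscroll request returned error: %s", resp.Status())
	}
}
```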
15 changes: 15 additions & 0 deletions internal/sourcemap/metadata_fetcher_test.go
@@ -103,6 +103,12 @@ func TestMetadataFetcher(t *testing.T) {
 			require.NotNil(t, tc.searchReponse, "nil searchReponse, possible unexpected request")
 			// search request from the metadata fetcher
 			tc.searchReponse(w, r)
+		case "/_search/scroll":
+			scrollIDValue := r.URL.Query().Get("scroll_id")
+			assert.Equal(t, scrollID, scrollIDValue)
+			w.Write([]byte(`{}`))
+		case "/_search/scroll/" + scrollID:
+			assert.Equal(t, r.Method, http.MethodDelete)
 		default:
 			w.WriteHeader(http.StatusTeapot)
 			t.Fatalf("unhandled request path: %s", r.URL.Path)
@@ -234,6 +240,12 @@ func TestInvalidation(t *testing.T) {
case "/.apm-source-map/_search":
// search request from the metadata fetcher
tc.searchReponse(w, r)
case "/_search/scroll":
scrollIDValue := r.URL.Query().Get("scroll_id")
assert.Equal(t, scrollID, scrollIDValue)
w.Write([]byte(`{}`))
case "/_search/scroll/" + scrollID:
assert.Equal(t, r.Method, http.MethodDelete)
default:
w.WriteHeader(http.StatusTeapot)
t.Fatalf("unhandled request path: %s", r.URL.Path)
@@ -294,6 +306,8 @@ func TestInvalidation(t *testing.T) {
 	}
 }
 
+const scrollID = "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFkJUT0Z5bFUtUXRXM3NTYno0dkM2MlEAAAAAAABnRBY5OUxYalAwUFFoS1NfLV9lWjlSYTRn"
+
 func sourcemapSearchResponseBody(ids []metadata) []byte {
 	m := make([]map[string]interface{}, 0, len(ids))
 	for _, id := range ids {
@@ -318,6 +332,7 @@ func sourcemapSearchResponseBody(ids []metadata) []byte {
 			},
 			"hits": m,
 		},
+		"_scroll_id": scrollID,
 	}
 
 	data, err := json.Marshal(result)
