Skip to content

Commit

Permalink
Merge branch '8.15' into updatecli_8.15_updatecli-bump-elastic-stack-…
Browse files Browse the repository at this point in the history
…snapshot-8.15
  • Loading branch information
mergify[bot] authored Sep 9, 2024
2 parents c6c087b + 91bef75 commit fc58c6f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 32 deletions.
46 changes: 22 additions & 24 deletions internal/agentcfg/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,40 +251,38 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {

func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID string) (cacheResult, error) {
var result cacheResult
var err error
var resp *esapi.Response

if scrollID == "" {
resp, err := esapi.SearchRequest{
switch scrollID {
case "":
resp, err = esapi.SearchRequest{
Index: []string{ElasticsearchIndexName},
Size: &f.searchSize,
Scroll: f.cacheDuration,
}.Do(ctx, f.client)
if err != nil {
return result, err
}
defer resp.Body.Close()

if resp.StatusCode >= http.StatusBadRequest {
// Elasticsearch returns 401 on unauthorized requests and 403 on insufficient permission
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
f.invalidESCfg.Store(true)
}
bodyBytes, err := io.ReadAll(resp.Body)
if err == nil {
f.logger.Debugf("refresh cache elasticsearch returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
return result, fmt.Errorf("refresh cache elasticsearch returned status %d", resp.StatusCode)
}
return result, json.NewDecoder(resp.Body).Decode(&result)
default:
resp, err = esapi.ScrollRequest{
ScrollID: scrollID,
Scroll: f.cacheDuration,
}.Do(ctx, f.client)
}

resp, err := esapi.ScrollRequest{
ScrollID: result.ScrollID,
Scroll: f.cacheDuration,
}.Do(ctx, f.client)
if err != nil {
return result, err
}
defer resp.Body.Close()

if resp.StatusCode >= http.StatusBadRequest {
// Elasticsearch returns 401 on unauthorized requests and 403 on insufficient permission
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
f.invalidESCfg.Store(true)
}
bodyBytes, err := io.ReadAll(resp.Body)
if err == nil {
f.logger.Debugf("refresh cache elasticsearch returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
return result, fmt.Errorf("refresh cache elasticsearch returned status %d", resp.StatusCode)
}
return result, json.NewDecoder(resp.Body).Decode(&result)
}

Expand Down
29 changes: 21 additions & 8 deletions internal/agentcfg/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package agentcfg
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -39,11 +38,10 @@ var sampleHits = []map[string]interface{}{
{"_id": "hvKmzYQBfJ4l0GgqXgJt", "_index": ".apm-agent-configuration", "_score": 1, "_source": map[string]interface{}{"@timestamp": 1.669897543277e+12, "applied_by_agent": false, "etag": "2da2f86251165ccced5c5e41100a216b0c880db4", "service": map[string]interface{}{"name": "second"}, "settings": map[string]interface{}{"sanitize_field_names": "foo,bar,baz", "transaction_sample_rate": "0.1"}}},
}

func newMockElasticsearchClient(t testing.TB, statusCode int, responseFunc func(io.Writer)) *elasticsearch.Client {
func newMockElasticsearchClient(t testing.TB, handler func(http.ResponseWriter, *http.Request)) *elasticsearch.Client {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Elastic-Product", "Elasticsearch")
w.WriteHeader(statusCode)
responseFunc(w)
handler(w, r)
}))
t.Cleanup(srv.Close)
config := elasticsearch.DefaultConfig()
Expand Down Expand Up @@ -80,7 +78,15 @@ func newElasticsearchFetcher(

i := 0

fetcher := NewElasticsearchFetcher(newMockElasticsearchClient(t, 200, func(w io.Writer) {
fetcher := NewElasticsearchFetcher(newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/_search/scroll":
scrollID := r.URL.Query().Get("scroll_id")
assert.Equal(t, respTmpl["_scroll_id"], scrollID)
case "/.apm-agent-configuration/_search":
default:
assert.Failf(t, "unexpected path", "path: %s", r.URL.Path)
}
if i < len(hits) {
respTmpl["hits"].(map[string]interface{})["hits"] = hits[i : i+searchSize]
} else {
Expand All @@ -89,6 +95,7 @@ func newElasticsearchFetcher(

b, err := json.Marshal(respTmpl)
require.NoError(t, err)
w.WriteHeader(200)
w.Write(b)
i += searchSize
}), time.Second, nil, rt)
Expand Down Expand Up @@ -173,7 +180,9 @@ func TestFetchUseFallback(t *testing.T) {
return Result{}, nil
})
fetcher := NewElasticsearchFetcher(
newMockElasticsearchClient(t, 404, func(w io.Writer) {}),
newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
}),
time.Second,
fallbackFetcher,
apmtest.NewRecordingTracer().Tracer,
Expand All @@ -186,7 +195,9 @@ func TestFetchUseFallback(t *testing.T) {

func TestFetchNoFallbackInvalidESCfg(t *testing.T) {
fetcher := NewElasticsearchFetcher(
newMockElasticsearchClient(t, 401, func(w io.Writer) {}),
newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(401)
}),
time.Second,
nil,
apmtest.NewRecordingTracer().Tracer,
Expand All @@ -200,7 +211,9 @@ func TestFetchNoFallbackInvalidESCfg(t *testing.T) {

func TestFetchNoFallback(t *testing.T) {
fetcher := NewElasticsearchFetcher(
newMockElasticsearchClient(t, 500, func(w io.Writer) {}),
newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(500)
}),
time.Second,
nil,
apmtest.NewRecordingTracer().Tracer,
Expand Down

0 comments on commit fc58c6f

Please sign in to comment.