Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.17] Instrumentation: fix log trace inconsistent status code with timeout check when writing the response (backport #15123) #15165

Merged
merged 1 commit into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion internal/beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ func agentConfigHandler(
func apmMiddleware(m map[request.ResultID]*monitoring.Int) []middleware.Middleware {
return []middleware.Middleware{
middleware.LogMiddleware(),
middleware.TimeoutMiddleware(),
middleware.RecoverPanicMiddleware(),
middleware.MonitoringMiddleware(m, nil),
}
Expand Down
45 changes: 0 additions & 45 deletions internal/beater/middleware/timeout_middleware.go

This file was deleted.

71 changes: 0 additions & 71 deletions internal/beater/middleware/timeout_middleware_test.go

This file was deleted.

11 changes: 11 additions & 0 deletions internal/beater/request/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package request
import (
"compress/gzip"
"compress/zlib"
"context"
"encoding/json"
"errors"
"io"
"net/http"
"net/netip"
Expand All @@ -42,6 +44,7 @@ const (

var (
// mimeTypesJSON groups the mimeTypeAny and mimeTypeApplicationJSON media types.
mimeTypesJSON = []string{mimeTypeAny, mimeTypeApplicationJSON}
// errTimeout is the error (and response body text) that WriteResult
// substitutes into the result when the request context has been canceled
// or has exceeded its deadline before the response is written.
errTimeout = errors.New("request timed out")
)

type zlibReadCloseResetter interface {
Expand Down Expand Up @@ -189,6 +192,14 @@ func (c *Context) WriteResult() {
}
c.writeAttempts++

// Before writing the result, check whether the request context was
// canceled or its deadline was exceeded (e.g. the client disconnected).
// If so, override the result with the timeout error.
if err := c.Request.Context().Err(); errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
c.Result.SetDefault(IDResponseErrorsTimeout)
c.Result.Err = errTimeout
c.Result.Body = errTimeout.Error()
}

c.ResponseWriter.Header().Set(headers.XContentTypeOptions, "nosniff")

body := c.Result.Body
Expand Down
93 changes: 93 additions & 0 deletions internal/beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
Expand All @@ -39,6 +40,8 @@ import (
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -48,13 +51,17 @@ import (
_ "github.com/elastic/beats/v7/libbeat/outputs/console"
_ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
agentconfig "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/beater"
"github.com/elastic/apm-server/internal/beater/api"
"github.com/elastic/apm-server/internal/beater/api/intake"
"github.com/elastic/apm-server/internal/beater/beatertest"
"github.com/elastic/apm-server/internal/beater/config"
"github.com/elastic/apm-server/internal/beater/monitoringtest"
"github.com/elastic/apm-server/internal/beater/request"
)

func TestServerOk(t *testing.T) {
Expand Down Expand Up @@ -587,6 +594,92 @@ func TestWrapServer(t *testing.T) {
require.Equal(t, "true", out["labels"].(map[string]any)["wrapped_reporter"])
}

// TestWrapServerAPMInstrumentationTimeout verifies that, with self
// instrumentation enabled, a client that disconnects in the middle of an
// intake v2 request results in consistent reporting across the trace event,
// the error log and the monitoring counters: all of them must report a
// timeout with HTTP 503 (Service Unavailable).
func TestWrapServerAPMInstrumentationTimeout(t *testing.T) {
	// Override ELASTIC_APM_API_REQUEST_TIME to 10ms instead of
	// the default 10s to speed up this test time.
	t.Setenv("ELASTIC_APM_API_REQUEST_TIME", "10ms")

	// Enable self instrumentation, simulate a client disconnecting when sending intakev2 request
	// Check that tracer records the correct http status code
	found := make(chan struct{})
	reqCtx, reqCancel := context.WithCancel(context.Background())
	// Cancelling more than once is a no-op; this only guarantees the context
	// is released if the test bails out before the batch processor cancels it.
	defer reqCancel()

	escfg, _ := beatertest.ElasticsearchOutputConfig(t)
	// The log assertions below read from the observer output, so a failed
	// logger setup must stop the test instead of being silently ignored.
	require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput()))
	srv := beatertest.NewServer(t, beatertest.WithConfig(escfg, agentconfig.MustNewConfigFrom(
		map[string]any{
			"instrumentation.enabled": true,
		})), beatertest.WithWrapServer(
		func(args beater.ServerParams, runServer beater.RunServerFunc) (beater.ServerParams, beater.RunServerFunc, error) {
			args.BatchProcessor = modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
				// The service name is set to "1234_service-12a3" in the testData file
				if len(*batch) > 0 && (*batch)[0].Service.Name == "1234_service-12a3" {
					// Simulate a client disconnecting by cancelling the context
					reqCancel()
					// Wait for the client disconnection to be acknowledged by http server
					<-ctx.Done()
					assert.ErrorIs(t, ctx.Err(), context.Canceled)
					return errors.New("foobar")
				}
				for _, i := range *batch {
					// Perform assertions on the event sent by the apmgorilla tracer
					if i.Transaction.Id != "" && i.Transaction.Name == "POST /intake/v2/events" {
						assert.Equal(t, "HTTP 5xx", i.Transaction.Result)
						assert.Equal(t, http.StatusServiceUnavailable, int(i.Http.Response.StatusCode))
						close(found)
					}
				}
				return nil
			})
			return args, runServer, nil
		},
	))

	monitoringtest.ClearRegistry(intake.MonitoringMap)

	req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, srv.URL+api.IntakePath, bytes.NewReader(testData))
	require.NoError(t, err)
	req.Header.Add("Content-Type", "application/x-ndjson")
	resp, err := srv.Client.Do(req)
	require.ErrorIs(t, err, context.Canceled)
	require.Nil(t, resp)

	select {
	case <-time.After(time.Second): // go apm agent takes time to send trace events
		assert.Fail(t, "timeout waiting for trace doc")
	case <-found:
		// Have to wait a bit here to avoid racing on the order of metrics middleware and the batch processor from above.
		time.Sleep(10 * time.Millisecond)
	}

	// Assert that exactly one error-level log entry was recorded, and that it
	// carries the timeout message together with the 503 status code.
	logs := logp.ObserverLogs().Filter(func(l observer.LoggedEntry) bool {
		return l.Level == zapcore.ErrorLevel
	}).AllUntimed()
	// require (not assert): indexing logs[0] below would panic on an empty slice.
	require.Len(t, logs, 1)
	assert.Equal(t, "request timed out", logs[0].Message)
	for _, f := range logs[0].Context {
		switch f.Key {
		case "http.response.status_code":
			assert.Equal(t, http.StatusServiceUnavailable, int(f.Integer))
		case "error.message":
			assert.Equal(t, "request timed out", f.String)
		}
	}
	// Assert that metrics have expected response values reported.
	equal, result := monitoringtest.CompareMonitoringInt(map[request.ResultID]int{
		request.IDRequestCount:          2,
		request.IDResponseCount:         2,
		request.IDResponseErrorsCount:   1,
		request.IDResponseValidCount:    1,
		request.IDResponseErrorsTimeout: 1, // test data POST /intake/v2/events
		request.IDResponseValidAccepted: 1, // self-instrumentation
	}, intake.MonitoringMap)
	assert.True(t, equal, result)
}

var testData = func() []byte {
b, err := os.ReadFile("../../testdata/intake-v2/transactions.ndjson")
if err != nil {
Expand Down
Loading