From 0ca4e949c1a204793d99a8dd8ecacc3404c0946c Mon Sep 17 00:00:00 2001 From: Romain Marcadier Date: Tue, 17 Sep 2024 15:30:21 +0200 Subject: [PATCH] contrib/grpc: attempt to fix flaky tests - AppSec tests are occasionally receiving an unexpected `io.EOF` error from a gRPC stream send, which according to the documentation happens if the error does not originate from the client (and in those cases, blocking causes the error server-side) - Some tests were locally flaking due to expecting a certain count of security events being discovered, but this sometimes was not met due to the WAF timeout (this likely would not happen in CI, where we seront a higher-than-default timeout via environment) - Some other (non-AppSec) tests fail (possibly due to AppSec being there), but would not reproduce locally... so this PR replaces some calls to `require` with calls to `assert` to allow collection of more information about what's happened; and adds some new `assert` checks at key locations where errors could have been silently ignored previously. --- contrib/google.golang.org/grpc/appsec_test.go | 76 ++++++++++--------- contrib/google.golang.org/grpc/grpc_test.go | 32 ++++---- internal/appsec/config/config.go | 9 +++ 3 files changed, 66 insertions(+), 51 deletions(-) diff --git a/contrib/google.golang.org/grpc/appsec_test.go b/contrib/google.golang.org/grpc/appsec_test.go index 911efa7c5d..882fb60d18 100644 --- a/contrib/google.golang.org/grpc/appsec_test.go +++ b/contrib/google.golang.org/grpc/appsec_test.go @@ -9,13 +9,17 @@ import ( "context" "encoding/json" "fmt" + "io" "net" "testing" + "time" pappsec "gopkg.in/DataDog/dd-trace-go.v1/appsec" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec" + appsecConfig "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/config" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -24,26 +28,26 @@ import ( ) func TestAppSec(t *testing.T) { - appsec.Start() + appsec.Start(appsecConfig.WithWAFTimeout(time.Hour /* functionally unlimited */)) defer appsec.Stop() if !appsec.Enabled() { t.Skip("appsec disabled") } - setup := func() (FixtureClient, mocktracer.Tracer, func()) { - rig, err := newAppsecRig(false) + setup := func(t *testing.T) (FixtureClient, mocktracer.Tracer, func()) { + rig, err := newAppsecRig(t, false) require.NoError(t, err) mt := mocktracer.Start() return rig.client, mt, func() { - rig.Close() + assert.NoError(t, rig.Close()) mt.Stop() } } t.Run("unary", func(t *testing.T) { - client, mt, cleanup := setup() + client, mt, cleanup := setup(t) defer cleanup() // Send a XSS attack in the payload along with the canary value in the RPC metadata @@ -64,7 +68,7 @@ func TestAppSec(t *testing.T) { }) t.Run("stream", func(t *testing.T) { - client, mt, cleanup := setup() + client, mt, cleanup := setup(t) defer cleanup() // Send a XSS attack in the payload along with the canary value in the RPC metadata @@ -73,8 +77,9 @@ func TestAppSec(t *testing.T) { require.NoError(t, err) // Send a XSS attack - err = stream.Send(&FixtureRequest{Name: ""}) - require.NoError(t, err) + if err := stream.Send(&FixtureRequest{Name: ""}); err != io.EOF { + require.NoError(t, err) + } // Check that the handler was properly called res, err := stream.Recv() @@ -83,8 +88,9 @@ func TestAppSec(t *testing.T) { for i := 0; i < 5; i++ { // Fire multiple times, each time should result in a detected event // Send a SQLi attack - err = stream.Send(&FixtureRequest{Name: fmt.Sprintf("-%[1]d' and %[1]d=%[1]d union select * from users--", i)}) - require.NoError(t, err) + if err := stream.Send(&FixtureRequest{Name: fmt.Sprintf("-%[1]d' and %[1]d=%[1]d union select * from users--", i)}); err != io.EOF { + require.NoError(t, err) + } // Check that the handler was properly called res, err = stream.Recv() @@ -121,9 +127,9 @@ func TestAppSec(t *testing.T) { histogram[tr.Rule.ID]++ } - require.EqualValues(t, 1, histogram["crs-941-180"]) // XSS attack attempt - require.EqualValues(t, 5, histogram["crs-942-270"]) // SQL-injection attack attempt - require.EqualValues(t, 1, histogram["ua0-600-55x"]) // canary rule attack attempt + assert.EqualValues(t, 1, histogram["crs-941-180"]) // XSS attack attempt + assert.EqualValues(t, 5, histogram["crs-942-270"]) // SQL-injection attack attempt + assert.EqualValues(t, 1, histogram["ua0-600-55x"]) // canary rule attack attempt require.Len(t, histogram, 3) }) @@ -139,13 +145,13 @@ func TestBlocking(t *testing.T) { } setup := func() (FixtureClient, mocktracer.Tracer, func()) { - rig, err := newAppsecRig(false) + rig, err := newAppsecRig(t, false) require.NoError(t, err) mt := mocktracer.Start() return rig.client, mt, func() { - rig.Close() + assert.NoError(t, rig.Close()) mt.Stop() } } @@ -183,7 +189,7 @@ func TestBlocking(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { // Helper assertion function to run for the unary and stream tests - assert := func(t *testing.T, do func(client FixtureClient)) { + withClient := func(t *testing.T, do func(client FixtureClient)) { client, mt, cleanup := setup() defer cleanup() @@ -204,7 +210,7 @@ func TestBlocking(t *testing.T) { } t.Run("unary", func(t *testing.T) { - assert(t, func(client FixtureClient) { + withClient(t, func(client FixtureClient) { ctx := metadata.NewOutgoingContext(context.Background(), tc.md) reply, err := client.Ping(ctx, &FixtureRequest{Name: tc.message}) require.Nil(t, reply) @@ -213,19 +219,18 @@ func TestBlocking(t *testing.T) { }) t.Run("stream", func(t *testing.T) { - assert(t, func(client FixtureClient) { + withClient(t, func(client FixtureClient) { ctx := metadata.NewOutgoingContext(context.Background(), tc.md) // Open the stream stream, err := client.StreamPing(ctx) require.NoError(t, err) - defer func() { - require.NoError(t, stream.CloseSend()) - }() + defer func() { assert.NoError(t, stream.CloseSend()) }() // Send a message - err = stream.Send(&FixtureRequest{Name: tc.message}) - require.NoError(t, err) + if err := stream.Send(&FixtureRequest{Name: tc.message}); err != io.EOF { + require.NoError(t, err) + } // Receive a message reply, err := stream.Recv() @@ -249,20 +254,20 @@ func TestPasslist(t *testing.T) { t.Skip("appsec disabled") } - setup := func() (FixtureClient, mocktracer.Tracer, func()) { - rig, err := newAppsecRig(false) + setup := func(t *testing.T) (FixtureClient, mocktracer.Tracer, func()) { + rig, err := newAppsecRig(t, false) require.NoError(t, err) mt := mocktracer.Start() return rig.client, mt, func() { - rig.Close() + assert.NoError(t, rig.Close()) mt.Stop() } } t.Run("unary", func(t *testing.T) { - client, mt, cleanup := setup() + client, mt, cleanup := setup(t) defer cleanup() // Send the payload triggering the sec event thanks to the "zouzou" value in the RPC metadata @@ -284,7 +289,7 @@ func TestPasslist(t *testing.T) { }) t.Run("stream", func(t *testing.T) { - client, mt, cleanup := setup() + client, mt, cleanup := setup(t) defer cleanup() // Open the steam triggering the sec event thanks to the "zouzou" value in the RPC metadata @@ -294,8 +299,9 @@ func TestPasslist(t *testing.T) { // Send some messages for i := 0; i < 5; i++ { - err = stream.Send(&FixtureRequest{Name: "hello"}) - require.NoError(t, err) + if err := stream.Send(&FixtureRequest{Name: "hello"}); err != io.EOF { + require.NoError(t, err) + } // Check that the handler was properly called res, err := stream.Recv() @@ -319,7 +325,7 @@ func TestPasslist(t *testing.T) { }) } -func newAppsecRig(traceClient bool, interceptorOpts ...Option) (*appsecRig, error) { +func newAppsecRig(t *testing.T, traceClient bool, interceptorOpts ...Option) (*appsecRig, error) { interceptorOpts = append([]InterceptorOption{WithServiceName("grpc")}, interceptorOpts...) server := grpc.NewServer( @@ -336,7 +342,7 @@ func newAppsecRig(traceClient bool, interceptorOpts ...Option) (*appsecRig, erro } _, port, _ := net.SplitHostPort(li.Addr().String()) // start our test fixtureServer. - go server.Serve(li) + go func() { assert.NoError(t, server.Serve(li)) }() opts := []grpc.DialOption{grpc.WithInsecure()} if traceClient { @@ -370,9 +376,9 @@ type appsecRig struct { client FixtureClient } -func (r *appsecRig) Close() { - r.server.Stop() - r.conn.Close() +func (r *appsecRig) Close() error { + defer r.server.GracefulStop() + return r.conn.Close() } type appsecFixtureServer struct { diff --git a/contrib/google.golang.org/grpc/grpc_test.go b/contrib/google.golang.org/grpc/grpc_test.go index 9d3313b8ee..2534630689 100644 --- a/contrib/google.golang.org/grpc/grpc_test.go +++ b/contrib/google.golang.org/grpc/grpc_test.go @@ -64,7 +64,7 @@ func TestUnary(t *testing.T) { t.Run(name, func(t *testing.T) { rig, err := newRig(true, WithServiceName("grpc"), WithRequestTags()) require.NoError(t, err, "error setting up rig") - defer rig.Close() + defer func() { assert.NoError(rig.Close()) }() client := rig.client mt := mocktracer.Start() @@ -226,7 +226,7 @@ func TestStreaming(t *testing.T) { rig, err := newRig(true, WithServiceName("grpc")) require.NoError(t, err, "error setting up rig") - defer rig.Close() + defer func() { assert.NoError(t, rig.Close()) }() span, ctx := tracer.StartSpanFromContext(context.Background(), "a", tracer.ServiceName("b"), @@ -251,7 +251,7 @@ func TestStreaming(t *testing.T) { rig, err := newRig(true, WithServiceName("grpc"), WithStreamMessages(false)) require.NoError(t, err, "error setting up rig") - defer rig.Close() + defer func() { assert.NoError(t, rig.Close()) }() span, ctx := tracer.StartSpanFromContext(context.Background(), "a", tracer.ServiceName("b"), @@ -276,7 +276,7 @@ func TestStreaming(t *testing.T) { rig, err := newRig(true, WithServiceName("grpc"), WithStreamCalls(false)) require.NoError(t, err, "error setting up rig") - defer rig.Close() + defer func() { assert.NoError(t, rig.Close()) }() span, ctx := tracer.StartSpanFromContext(context.Background(), "a", tracer.ServiceName("b"), @@ -318,7 +318,7 @@ func TestSpanTree(t *testing.T) { rig, err := newRig(true, WithServiceName("grpc")) require.NoError(t, err, "error setting up rig") - defer rig.Close() + defer func() { assert.NoError(rig.Close()) }() { // Unary Ping rpc leading to trace: @@ -353,7 +353,7 @@ func TestSpanTree(t *testing.T) { rig, err := newRig(true, WithServiceName("grpc"), WithRequestTags(), WithMetadataTags()) require.NoError(t, err, "error setting up rig") - defer rig.Close() + defer func() { assert.NoError(rig.Close()) }() client := rig.client { @@ -438,7 +438,7 @@ func TestPass(t *testing.T) { rig, err := newRig(false, WithServiceName("grpc")) require.NoError(t, err, "error setting up rig") - defer rig.Close() + defer func() { assert.NoError(rig.Close()) }() client := rig.client ctx := context.Background() @@ -472,7 +472,7 @@ func TestPreservesMetadata(t *testing.T) { if err != nil { t.Fatalf("error setting up rig: %s", err) } - defer rig.Close() + defer func() { assert.NoError(t, rig.Close()) }() ctx := context.Background() ctx = metadata.AppendToOutgoingContext(ctx, "test-key", "test-value") @@ -500,7 +500,7 @@ func TestStreamSendsErrorCode(t *testing.T) { rig, err := newRig(true) require.NoError(t, err, "error setting up rig") - defer rig.Close() + defer func() { assert.NoError(t, rig.Close()) }() ctx := context.Background() @@ -529,7 +529,7 @@ func TestStreamSendsErrorCode(t *testing.T) { containsErrorCode = true } } - assert.True(t, containsErrorCode, "at least one span should contain error code") + assert.True(t, containsErrorCode, "at least one span should contain error code, the spans were:\n%v", spans) // ensure that last span contains error code also gotLastSpanCode := spans[len(spans)-1].Tag(tagCode) @@ -603,9 +603,9 @@ type rig struct { client FixtureClient } -func (r *rig) Close() { - r.server.Stop() - r.conn.Close() +func (r *rig) Close() error { + defer r.server.GracefulStop() + return r.conn.Close() } func newRigWithInterceptors( @@ -669,7 +669,7 @@ func TestAnalyticsSettings(t *testing.T) { if err != nil { t.Fatalf("error setting up rig: %s", err) } - defer rig.Close() + defer func() { assert.NoError(t, rig.Close()) }() client := rig.client resp, err := client.Ping(context.Background(), &FixtureRequest{Name: "pass"}) @@ -1145,7 +1145,7 @@ func getGenSpansFn(traceClient, traceServer bool) namingschematest.GenSpansFn { } rig, err := newRigWithInterceptors(serverInterceptors, clientInterceptors) require.NoError(t, err) - defer rig.Close() + defer func() { assert.NoError(t, rig.Close()) }() _, err = rig.client.Ping(context.Background(), &FixtureRequest{Name: "pass"}) require.NoError(t, err) @@ -1312,7 +1312,7 @@ func TestIssue2050(t *testing.T) { } rig, err := newRigWithInterceptors(serverInterceptors, clientInterceptors) require.NoError(t, err) - defer rig.Close() + defer func() { assert.NoError(t, rig.Close()) }() // call tracer.Start after integration is initialized, to reproduce the issue tracer.Start(tracer.WithHTTPClient(httpClient)) diff --git a/internal/appsec/config/config.go b/internal/appsec/config/config.go index e2a0b7736a..88016c5e39 100644 --- a/internal/appsec/config/config.go +++ b/internal/appsec/config/config.go @@ -77,6 +77,15 @@ func WithRCConfig(cfg remoteconfig.ClientConfig) StartOption { } } +// WithWAFTimeout sets the AppSec WAF timeout to the specified cfg. This is primarily used for testing purposes. The +// recommended way to specify the WAF timeout for production workloads is by specifying the DD_APPSEC_WAF_TIMEOUT +// environment variable. +func WithWAFTimeout(timeout time.Duration) StartOption { + return func(c *Config) { + c.WAFTimeout = timeout + } +} + // IsEnabled returns true when appsec is enabled by the environment variable DD_APPSEC_ENABLED (as of strconv's boolean // parsing rules). When false, it also returns whether the env var was actually set or not. // In case of a parsing error, it returns a detailed error.