Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/google.golang.org/grpc: security rule passlist support #2589

Merged
merged 3 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions contrib/google.golang.org/grpc/appsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@ import (
)

// UnaryHandler wrapper to use when AppSec is enabled to monitor its execution.
func appsecUnaryHandlerMiddleware(span ddtrace.Span, handler grpc.UnaryHandler) grpc.UnaryHandler {
func appsecUnaryHandlerMiddleware(method string, span ddtrace.Span, handler grpc.UnaryHandler) grpc.UnaryHandler {
trace.SetAppSecEnabledTags(span)
return func(ctx context.Context, req interface{}) (interface{}, error) {
var err error
var blocked bool
md, _ := metadata.FromIncomingContext(ctx)
clientIP := setClientIP(ctx, span, md)
args := types.HandlerOperationArgs{Metadata: md, ClientIP: clientIP}
args := types.HandlerOperationArgs{
Method: method,
Metadata: md,
ClientIP: clientIP,
}
ctx, op := grpcsec.StartHandlerOperation(ctx, args, nil, func(op *types.HandlerOperation) {
dyngo.OnData(op, func(a *sharedsec.Action) {
code, e := a.GRPC()(md)
Expand Down Expand Up @@ -67,7 +71,7 @@ func appsecUnaryHandlerMiddleware(span ddtrace.Span, handler grpc.UnaryHandler)
}

// StreamHandler wrapper to use when AppSec is enabled to monitor its execution.
func appsecStreamHandlerMiddleware(span ddtrace.Span, handler grpc.StreamHandler) grpc.StreamHandler {
func appsecStreamHandlerMiddleware(method string, span ddtrace.Span, handler grpc.StreamHandler) grpc.StreamHandler {
trace.SetAppSecEnabledTags(span)
return func(srv interface{}, stream grpc.ServerStream) error {
var err error
Expand All @@ -77,7 +81,12 @@ func appsecStreamHandlerMiddleware(span ddtrace.Span, handler grpc.StreamHandler
clientIP := setClientIP(ctx, span, md)
grpctrace.SetRequestMetadataTags(span, md)

ctx, op := grpcsec.StartHandlerOperation(ctx, types.HandlerOperationArgs{Metadata: md, ClientIP: clientIP}, nil, func(op *types.HandlerOperation) {
args := types.HandlerOperationArgs{
Method: method,
Metadata: md,
ClientIP: clientIP,
}
ctx, op := grpcsec.StartHandlerOperation(ctx, args, nil, func(op *types.HandlerOperation) {
dyngo.OnData(op, func(a *sharedsec.Action) {
code, e := a.GRPC()(md)
blocked = a.Blocking()
Expand Down
81 changes: 81 additions & 0 deletions contrib/google.golang.org/grpc/appsec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,87 @@ func TestUserBlocking(t *testing.T) {
})
}

func TestPasslist(t *testing.T) {
// This custom rule file includes two rules detecting the same sec event, a grpc metadata value containing "zouzou",
Julio-Guerra marked this conversation as resolved.
Show resolved Hide resolved
// but only one of them is passlisted (custom-1 is passlisted, custom-2 is not and must trigger).
t.Setenv("DD_APPSEC_RULES", "../../../internal/appsec/testdata/passlist.json")

appsec.Start()
defer appsec.Stop()
if !appsec.Enabled() {
t.Skip("appsec disabled")
}

setup := func() (FixtureClient, mocktracer.Tracer, func()) {
rig, err := newRig(false)
require.NoError(t, err)

mt := mocktracer.Start()

return rig.client, mt, func() {
rig.Close()
mt.Stop()
}
}

t.Run("unary", func(t *testing.T) {
client, mt, cleanup := setup()
defer cleanup()

// Send the payload triggering the sec event thanks to the "zouzou" value in the RPC metadata
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("dd-canary", "zouzou"))
res, err := client.Ping(ctx, &FixtureRequest{Name: "hello"})

// Check that the handler was properly called
require.NoError(t, err)
require.Equal(t, "passed", res.Message)

finished := mt.FinishedSpans()
require.Len(t, finished, 1)

// The service entry span must include the sec event
event, _ := finished[0].Tag("_dd.appsec.json").(string)
require.NotNil(t, event)
require.NotContains(t, event, "custom-1") // custom-1 is in the passlist of this gRPC method
require.Contains(t, event, "custom-2") // custom-2 is not passlisted and must trigger an event
})

t.Run("stream", func(t *testing.T) {
client, mt, cleanup := setup()
defer cleanup()

// Open the steam triggering the sec event thanks to the "zouzou" value in the RPC metadata
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("dd-canary", "zouzou"))
stream, err := client.StreamPing(ctx)
require.NoError(t, err)

// Send some messages
for i := 0; i < 5; i++ {
err = stream.Send(&FixtureRequest{Name: "hello"})
require.NoError(t, err)

// Check that the handler was properly called
res, err := stream.Recv()
require.Equal(t, "passed", res.Message)
require.NoError(t, err)
}

err = stream.CloseSend()
require.NoError(t, err)
// Flush the spans
stream.Recv()

finished := mt.FinishedSpans()
require.Len(t, finished, 12)

// The service entry span must include the sec event
event := finished[len(finished)-1].Tag("_dd.appsec.json")
require.NotNil(t, event, "the _dd.appsec.json tag was not found")
require.NotContains(t, event, "custom-1") // custom-1 is in the passlist of this gRPC method
require.Contains(t, event, "custom-2") // custom-2 is not passlisted and must trigger an event
})
}

func newAppsecRig(traceClient bool, interceptorOpts ...Option) (*appsecRig, error) {
interceptorOpts = append([]InterceptorOption{WithServiceName("grpc")}, interceptorOpts...)

Expand Down
4 changes: 2 additions & 2 deletions contrib/google.golang.org/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
}
defer func() { finishWithError(span, err, cfg) }()
if appsec.Enabled() {
handler = appsecStreamHandlerMiddleware(span, handler)
handler = appsecStreamHandlerMiddleware(info.FullMethod, span, handler)
}
}

Expand Down Expand Up @@ -155,7 +155,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
withMetadataTags(ctx, cfg, span)
withRequestTags(cfg, req, span)
if appsec.Enabled() {
handler = appsecUnaryHandlerMiddleware(span, handler)
handler = appsecUnaryHandlerMiddleware(info.FullMethod, span, handler)
}
resp, err := handler(ctx, req)
finishWithError(span, err, cfg)
Expand Down
13 changes: 12 additions & 1 deletion internal/appsec/emitter/grpcsec/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,22 @@ type (
trace.TagsHolder
trace.SecurityEventsHolder
}

// HandlerOperationArgs is the grpc handler arguments.
HandlerOperationArgs struct {
// Message received by the gRPC handler.
// Method is the gRPC method name.
// Corresponds to the address `grpc.server.method`.
Method string

// RPC metadata received by the gRPC handler.
// Corresponds to the address `grpc.server.request.metadata`.
Metadata map[string][]string

// ClientIP is the IP address of the client that initiated the gRPC request.
// Corresponds to the address `http.client_ip`.
ClientIP netip.Addr
}

// HandlerOperationRes is the grpc handler results. Empty as of today.
HandlerOperationRes struct{}

Expand All @@ -51,9 +60,11 @@ type (
ReceiveOperation struct {
dyngo.Operation
}

// ReceiveOperationArgs is the gRPC handler receive operation arguments
// Empty as of today.
ReceiveOperationArgs struct{}

// ReceiveOperationRes is the gRPC handler receive operation results which
// contains the message the gRPC handler received.
ReceiveOperationRes struct {
Expand Down
102 changes: 59 additions & 43 deletions internal/appsec/listener/grpcsec/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/DataDog/appsec-internal-go/limiter"
waf "github.com/DataDog/go-libddwaf/v2"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/config"
"gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo"
Expand All @@ -26,18 +27,20 @@ import (

// gRPC rule addresses currently supported by the WAF
const (
GRPCServerRequestMessage = "grpc.server.request.message"
GRPCServerRequestMetadata = "grpc.server.request.metadata"
HTTPClientIPAddr = httpsec.HTTPClientIPAddr
UserIDAddr = httpsec.UserIDAddr
GRPCServerMethodAddr = "grpc.server.method"
GRPCServerRequestMessageAddr = "grpc.server.request.message"
GRPCServerRequestMetadataAddr = "grpc.server.request.metadata"
HTTPClientIPAddr = httpsec.HTTPClientIPAddr
UserIDAddr = httpsec.UserIDAddr
)

// List of gRPC rule addresses currently supported by the WAF
var supportedAddresses = listener.AddressSet{
GRPCServerRequestMessage: {},
GRPCServerRequestMetadata: {},
HTTPClientIPAddr: {},
UserIDAddr: {},
GRPCServerMethodAddr: {},
GRPCServerRequestMessageAddr: {},
GRPCServerRequestMetadataAddr: {},
HTTPClientIPAddr: {},
UserIDAddr: {},
}

// Install registers the gRPC WAF Event Listener on the given root operation.
Expand Down Expand Up @@ -80,8 +83,8 @@ func newWafEventListener(wafHandle *waf.Handle, actions sharedsec.Actions, cfg *
}
}

// NewWAFEventListener returns the WAF event listener to register in order
// to enable it.
// NewWAFEventListener returns the WAF event listener to register in order to enable it, listening to gRPC handler
// events.
func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types.HandlerOperationArgs) {
// Limit the maximum number of security events, as a streaming RPC could
// receive unlimited number of messages where we could find security events
Expand All @@ -100,35 +103,36 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types
return
}

// OnUserIDOperationStart happens when appsec.SetUser() is called. We run the WAF and apply actions to
// see if the associated user should be blocked. Since we don't control the execution flow in this case
// (SetUser is SDK), we delegate the responsibility of interrupting the handler to the user.
dyngo.On(op, func(userIDOp *sharedsec.UserIDOperation, args sharedsec.UserIDOperationArgs) {
values := make(map[string]any, 1)
for addr := range l.addresses {
if addr == UserIDAddr {
values[UserIDAddr] = args.UserID
// Listen to the UserID address if the WAF rules are using it
if l.isSecAddressListened(UserIDAddr) {
Julio-Guerra marked this conversation as resolved.
Show resolved Hide resolved
// UserIDOperation happens when appsec.SetUser() is called. We run the WAF and apply actions to
// see if the associated user should be blocked. Since we don't control the execution flow in this case
// (SetUser is SDK), we delegate the responsibility of interrupting the handler to the user.
dyngo.On(op, func(userIDOp *sharedsec.UserIDOperation, args sharedsec.UserIDOperationArgs) {
values := map[string]any{
UserIDAddr: args.UserID,
}
}
wafResult := shared.RunWAF(wafCtx, waf.RunAddressData{Persistent: values}, l.config.WAFTimeout)
if wafResult.HasActions() || wafResult.HasEvents() {
for _, id := range wafResult.Actions {
if a, ok := l.actions[id]; ok && a.Blocking() {
code, err := a.GRPC()(map[string][]string{})
dyngo.EmitData(userIDOp, types.NewMonitoringError(err.Error(), code))
wafResult := shared.RunWAF(wafCtx, waf.RunAddressData{Persistent: values}, l.config.WAFTimeout)
if wafResult.HasActions() || wafResult.HasEvents() {
for _, id := range wafResult.Actions {
if a, ok := l.actions[id]; ok && a.Blocking() {
code, err := a.GRPC()(map[string][]string{})
dyngo.EmitData(userIDOp, types.NewMonitoringError(err.Error(), code))
}
}
shared.AddSecurityEvents(op, l.limiter, wafResult.Events)
log.Debug("appsec: WAF detected an authenticated user attack: %s", args.UserID)
}
shared.AddSecurityEvents(op, l.limiter, wafResult.Events)
log.Debug("appsec: WAF detected an authenticated user attack: %s", args.UserID)
}
})
})
}

// The same address is used for gRPC and http when it comes to client ip
values := make(map[string]any, 1)
for addr := range l.addresses {
if addr == HTTPClientIPAddr && handlerArgs.ClientIP.IsValid() {
values[HTTPClientIPAddr] = handlerArgs.ClientIP.String()
}
values := make(map[string]any, 2) // 2 because the method and client ip addresses are commonly present in the rules
if l.isSecAddressListened(GRPCServerMethodAddr) {
// Note that this address is passed asap for the passlist, which are created per grpc method
values[GRPCServerMethodAddr] = handlerArgs.Method
}
if l.isSecAddressListened(HTTPClientIPAddr) && handlerArgs.ClientIP.IsValid() {
values[HTTPClientIPAddr] = handlerArgs.ClientIP.String()
}

wafResult := shared.RunWAF(wafCtx, waf.RunAddressData{Persistent: values}, l.config.WAFTimeout)
Expand All @@ -142,6 +146,7 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types
}
}

// When the gRPC handler receives a message
dyngo.OnFinish(op, func(_ types.ReceiveOperation, res types.ReceiveOperationRes) {
if nbEvents.Load() == maxWAFEventsPerRequest {
logOnce.Do(func() {
Expand All @@ -150,16 +155,21 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types
return
}

// Run the WAF on the rule addresses available in the args
// Note that we don't check if the address is present in the rules
// as we only support one at the moment, so this callback cannot be
// set when the address is not present.
values := waf.RunAddressData{
Ephemeral: map[string]any{GRPCServerRequestMessage: res.Message},
// Run the WAF on the rule addresses available and listened to by the sec rules
var values waf.RunAddressData
// Add the gRPC message to the values if the WAF rules are using it.
// Note that it is an ephemeral address as they can happen more than once per RPC.
if l.isSecAddressListened(GRPCServerRequestMessageAddr) {
values.Ephemeral = map[string]any{GRPCServerRequestMessageAddr: res.Message}
}
if md := handlerArgs.Metadata; len(md) > 0 {
values.Persistent = map[string]any{GRPCServerRequestMetadata: md}

// Add the metadata to the values if the WAF rules are using it.
if l.isSecAddressListened(GRPCServerRequestMetadataAddr) {
if md := handlerArgs.Metadata; len(md) > 0 {
values.Persistent = map[string]any{GRPCServerRequestMetadataAddr: md}
}
}

// Run the WAF, ignoring the returned actions - if any - since blocking after the request handler's
// response is not supported at the moment.
wafResult := shared.RunWAF(wafCtx, values, l.config.WAFTimeout)
Expand All @@ -173,6 +183,7 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types
}
})

// When the gRPC handler finishes
dyngo.OnFinish(op, func(op *types.HandlerOperation, _ types.HandlerOperationRes) {
defer wafCtx.Close()
overallRuntimeNs, internalRuntimeNs := wafCtx.TotalRuntime()
Expand All @@ -187,3 +198,8 @@ func (l *wafEventListener) onEvent(op *types.HandlerOperation, handlerArgs types
shared.AddSecurityEvents(op, l.limiter, events)
})
}

func (l *wafEventListener) isSecAddressListened(addr string) bool {
_, listened := l.addresses[addr]
return listened
}
Loading
Loading