Skip to content

Commit

Permalink
backend: add config to enable/disable connectrpc endpoints (#955)
Browse files Browse the repository at this point in the history
* backend: add config to enable/disable connectrpc endpoints
  • Loading branch information
weeco authored Dec 7, 2023
1 parent cd96afc commit 7587a84
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 2 deletions.
104 changes: 104 additions & 0 deletions backend/pkg/api/connect/interceptor/endpoint_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package interceptor

import (
"context"
"errors"

"connectrpc.com/connect"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"go.uber.org/zap"

apierrors "github.com/redpanda-data/console/backend/pkg/api/connect/errors"
"github.com/redpanda-data/console/backend/pkg/config"
v1alpha1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1alpha1"
)

// EndpointCheckInterceptor checks whether incoming requests on the given endpoint
// should pass or not. An endpoint can be enabled or disabled via the Console config.
type EndpointCheckInterceptor struct {
cfg *config.ConsoleAPI
logger *zap.Logger
}

// NewEndpointCheckInterceptor creates a new EndpointCheckInterceptor.
func NewEndpointCheckInterceptor(cfg *config.ConsoleAPI, logger *zap.Logger) *EndpointCheckInterceptor {
return &EndpointCheckInterceptor{
cfg: cfg,
logger: logger,
}
}

// WrapUnary creates an interceptor to validate Connect requests.
func (in *EndpointCheckInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
procedure := req.Spec().Procedure

// For HTTP paths invoked via gRPC gateway this is expected not to be set.
// Let's try to retrieve it with gRPC gateway's runtime pkg.
if procedure == "" {
path, ok := runtime.RPCMethod(ctx)
if !ok {
return nil, apierrors.NewConnectError(
connect.CodeInternal,
errors.New("failed to extract procedure name"),
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_CONSOLE_ERROR.String()))
}
procedure = path
}

if procedure == "" {
return nil, apierrors.NewConnectError(
connect.CodeInternal,
errors.New("failed to retrieve procedure name"),
apierrors.NewErrorInfo(v1alpha1.Reason_REASON_CONSOLE_ERROR.String()))
}

notEnabledError := apierrors.NewConnectError(
connect.CodeUnimplemented,
errors.New("this endpoint has not been enabled"),
apierrors.NewErrorInfo(
v1alpha1.Reason_REASON_FEATURE_NOT_CONFIGURED.String(),
apierrors.KeyVal{
Key: "requested_procedure",
Value: procedure,
},
))

if !in.cfg.Enabled {
return nil, notEnabledError
}

// Check wildcard that allows all procedures first
if in.cfg.AllowsAllProcedures {
return next(ctx, req)
}

_, isAllowed := in.cfg.EnabledProceduresMap[procedure]
if !isAllowed {
return nil, notEnabledError
}

return next(ctx, req)
}
}

// WrapStreamingClient is the middleware handler for bidirectional requests from
// the client perspective.
func (*EndpointCheckInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
return next
}

// WrapStreamingHandler is the middleware handler for bidirectional requests from
// the server handling perspective.
func (*EndpointCheckInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
return next
}
1 change: 1 addition & 0 deletions backend/pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (api *API) setupConnectWithGRPCGateway(r chi.Router) {
// Base baseInterceptors configured in OSS.
baseInterceptors := []connect.Interceptor{
interceptor.NewRequestValidationInterceptor(v, api.Logger.Named("validator")),
interceptor.NewEndpointCheckInterceptor(&api.Cfg.Console.API, api.Logger.Named("endpoint_checker")),
}

// Setup gRPC-Gateway
Expand Down
1 change: 1 addition & 0 deletions backend/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func LoadConfig(logger *zap.Logger) (Config, error) {
}

unmarshalCfg.DecoderConfig.ErrorUnused = false
unmarshalCfg.DecoderConfig.ZeroFields = true // Empty default slices/maps if a value is configured
err = k.UnmarshalWithConf("", &cfg, unmarshalCfg)
if err != nil {
return Config{}, err
Expand Down
9 changes: 7 additions & 2 deletions backend/pkg/config/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ type Console struct {
// implementation that satisfies the Console interface.
Enabled bool `yaml:"enabled"`
TopicDocumentation ConsoleTopicDocumentation `yaml:"topicDocumentation"`
API ConsoleAPI `yaml:"api"`
}

// SetDefaults for Console configs.
func (c *Console) SetDefaults() {
c.Enabled = true
c.TopicDocumentation.SetDefaults()
c.API.SetDefaults()
}

// RegisterFlags for sensitive Console configurations.
Expand All @@ -36,10 +38,13 @@ func (c *Console) RegisterFlags(f *flag.FlagSet) {

// Validate Console configurations.
func (c *Console) Validate() error {
err := c.TopicDocumentation.Validate()
if err != nil {
if err := c.TopicDocumentation.Validate(); err != nil {
return fmt.Errorf("failed to validate topic documentation config: %w", err)
}

if err := c.API.Validate(); err != nil {
return fmt.Errorf("failed to validate API config: %w", err)
}

return nil
}
73 changes: 73 additions & 0 deletions backend/pkg/config/console_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package config

import (
"fmt"
"strings"
)

// ConsoleAPI declares the configuration properties for managing the
// connect/grpc/grpc-gateway API endpoints.
type ConsoleAPI struct {
// Enabled determines whether any of the connect/grpc/grpc-gateway endpoints
// will be mounted to the server.
Enabled bool `yaml:"enabled"`

// EnabledProcedures is a list of procedure names that shall be allowed.
// If a procedure is called that is not on this list a descriptive error
// will be returned. A procedure name has the following format, regardless
// whether it's called via connect, gRPC or the HTTP interface:
// "/redpanda.api.dataplane.v1alpha1.UserService/ListUsers".
// You can use "*" to enable all procedures.
EnabledProcedures []string `yaml:"enabledProcedures"`

// ProceduresByKey is the procedures slice in a map, where the key is the procedure.
// This allows a more efficient look-up when comparing the requested procedure against
// the enabled procedures.
EnabledProceduresMap map[string]any

// AllowsAllProcedures is calculated after parsing the config. It is set to true if
// at least one string of EnabledProcedures contains the wildcard ("*").
AllowsAllProcedures bool
}

// Validate configuration options for the Console topic documentation feature.
func (c *ConsoleAPI) Validate() error {
if !c.Enabled {
return nil
}

for _, p := range c.EnabledProcedures {
if p == "*" {
continue
}
if !strings.HasPrefix(p, "/") {
return fmt.Errorf("every procedure must start with a slash. Entry %q does not start with a slash", p)
}
}

// Post-processing; config is loaded at this point
c.EnabledProceduresMap = make(map[string]any, len(c.EnabledProcedures))
for _, p := range c.EnabledProcedures {
c.EnabledProceduresMap[p] = struct{}{}
if p == "*" {
c.AllowsAllProcedures = true
}
}

return nil
}

// SetDefaults for ConsoleAPI.
func (c *ConsoleAPI) SetDefaults() {
c.Enabled = true
c.EnabledProcedures = []string{"*"}
}

0 comments on commit 7587a84

Please sign in to comment.