Skip to content

Commit

Permalink
backend: WIP on new list messages RPC handler
Browse files Browse the repository at this point in the history
  • Loading branch information
bojand committed Aug 4, 2023
1 parent e136229 commit e832750
Show file tree
Hide file tree
Showing 15 changed files with 448 additions and 67 deletions.
6 changes: 6 additions & 0 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ module github.com/redpanda-data/console/backend
go 1.20

require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.31.0-20230721003620-2341cbb21958.1
connectrpc.com/connect v1.11.0
connectrpc.com/grpcreflect v1.2.0
github.com/basgys/goxml2json v1.1.0
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869
github.com/bufbuild/protovalidate-go v0.2.1
github.com/cloudhut/common v0.10.0
github.com/cloudhut/connect-client v0.0.0-20230417124247-963e5bcdfee7
github.com/docker/docker v24.0.4+incompatible
Expand Down Expand Up @@ -55,6 +57,7 @@ require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20230717121422-5aa5874ade95 // indirect
github.com/acomagu/bufpipe v1.0.4 // indirect
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-simplejson v0.5.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand All @@ -74,6 +77,7 @@ require (
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/cel-go v0.17.1 // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand Down Expand Up @@ -125,6 +129,7 @@ require (
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/twmb/tlscfg v1.2.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
Expand All @@ -136,6 +141,7 @@ require (
golang.org/x/term v0.10.0 // indirect
golang.org/x/tools v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230717213848-3f92550aa753 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/grpc v1.56.2 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
Expand Down
13 changes: 13 additions & 0 deletions backend/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.31.0-20230721003620-2341cbb21958.1 h1:mnhf3O5uBs95ngTaQbGZfAnoZC0lM6yWkpdgjtqPbNE=
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.31.0-20230721003620-2341cbb21958.1/go.mod h1:xafc+XIsTxTy76GJQ1TKgvJWsSugFBqMaN27WhUblew=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -60,6 +62,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 h1:goHVqTbFX3AIo0tzGr14pgfAW2ZfPChKO21Z9MGf/gk=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
Expand Down Expand Up @@ -87,6 +91,8 @@ github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkN
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bufbuild/protovalidate-go v0.2.1 h1:pJr07sYhliyfj/STAM7hU4J3FKpVeLVKvOBmOTN8j+s=
github.com/bufbuild/protovalidate-go v0.2.1/go.mod h1:e7XXDtlxj5vlEyAgsrxpzayp4cEMKCSSb8ZCkin+MVA=
github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
Expand Down Expand Up @@ -154,6 +160,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
Expand Down Expand Up @@ -231,6 +238,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/cel-go v0.17.1 h1:s2151PDGy/eqpCI80/8dl4VL3xTkqI/YubXLXCFw0mw=
github.com/google/cel-go v0.17.1/go.mod h1:HXZKzB0LXqer5lHHgfWAnlYwJaQBDKMjxjulNQzhwhY=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down Expand Up @@ -537,6 +546,8 @@ github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs=
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down Expand Up @@ -967,6 +978,8 @@ google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto v0.0.0-20230717213848-3f92550aa753 h1:+VoAg+OKmWaommL56xmZSE2sUK8A7m6SUO7X89F2tbw=
google.golang.org/genproto v0.0.0-20230717213848-3f92550aa753/go.mod h1:iqkVr8IRpZ53gx1dEnWlCUIEwDWqWARWrbzpasaTNYM=
google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130 h1:XVeBY8d/FaK4848myy41HBqnDwvxeV3zMZhwN1TvAMU=
google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:mPBs5jNgx2GuQGvFwUvVKqtn6HsUw9nP64BedgvqEsQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230706204954-ccb25ca9f130 h1:2FZP5XuJY9zQyGM5N0rtovnoXjiMUEIUMvw0m9wlpLc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:8mL13HKkDa+IuJ8yruA3ci0q+0vsUz4m//+ottjwS5o=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
Expand Down
43 changes: 42 additions & 1 deletion backend/pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@
package api

import (
"context"
"errors"
"io/fs"
"math"

connect_go "connectrpc.com/connect"
"connectrpc.com/grpcreflect"
"github.com/bufbuild/protovalidate-go"
"github.com/cloudhut/common/logging"
"github.com/cloudhut/common/rest"
"go.uber.org/zap"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/protobuf/reflect/protoreflect"

"github.com/redpanda-data/console/backend/pkg/config"
"github.com/redpanda-data/console/backend/pkg/connect"
Expand Down Expand Up @@ -124,8 +129,21 @@ func (api *API) Start() {

mux := api.routes()

v, err := protovalidate.New()
if err != nil {
api.Logger.Fatal("failed to create proto validator", zap.Error(err))
}

interceptors := []connect_go.Interceptor{}

// we want the actual request validation after all authorization and permission checks
interceptors = append(interceptors, NewRequestValidationInterceptor(api.Logger, v))

// Connect service(s)
mux.Mount(consolev1alphaconnect.NewConsoleServiceHandler(api))
mux.Mount(consolev1alphaconnect.NewConsoleServiceHandler(
api,
connect_go.WithInterceptors(interceptors...),
))

// Connect reflection
reflector := grpcreflect.NewStaticReflector(consolev1alphaconnect.ConsoleServiceName)
Expand All @@ -146,3 +164,26 @@ func (api *API) Start() {
api.Logger.Fatal("REST Server returned an error", zap.Error(err))
}
}

func NewRequestValidationInterceptor(logger *zap.Logger, validator *protovalidate.Validator) connect_go.UnaryInterceptorFunc {
interceptor := func(next connect_go.UnaryFunc) connect_go.UnaryFunc {
return connect_go.UnaryFunc(func(
ctx context.Context,
req connect_go.AnyRequest,
) (connect_go.AnyResponse, error) {
msg, ok := req.Any().(protoreflect.ProtoMessage)
if !ok {
return nil, connect_go.NewError(connect_go.CodeInvalidArgument, errors.New("request is not a protocol buffer message"))
}

err := validator.Validate(msg)
if err != nil {
return nil, err
}

return next(ctx, req)
})
}

return connect_go.UnaryInterceptorFunc(interceptor)
}
68 changes: 65 additions & 3 deletions backend/pkg/api/handle_topic_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,69 @@ func (api *API) handleGetMessages() http.HandlerFunc {
}

// ListMessages consumes a Kafka topic and streams the Kafka records back.
func (api *API) ListMessages(context.Context, *connect.Request[v1alpha.ListMessagesRequest], *connect.ServerStream[v1alpha.ListMessagesResponse]) error {
api.Logger.Info("ListMessages handler")
return fmt.Errorf("not implemented yet")
func (api *API) ListMessages(ctx context.Context, req *connect.Request[v1alpha.ListMessagesRequest], stream *connect.ServerStream[v1alpha.ListMessagesResponse]) error {
logger := api.Logger

logger.Info("ListMessages handler")

ctx, cancel := context.WithCancel(ctx)
defer cancel()

lmq := ListMessagesRequest{
TopicName: req.Msg.GetTopic(),
StartOffset: req.Msg.GetStartOffset(),
StartTimestamp: req.Msg.GetStartTimestamp(),
PartitionID: req.Msg.GetPartitionId(),
MaxResults: int(req.Msg.GetMaxResults()),
FilterInterpreterCode: req.Msg.GetFilterInterpreterCode(),
Enterprise: req.Msg.GetEnterprise(),
}

// Check if logged in user is allowed to list messages for the given request
canViewMessages, restErr := api.Hooks.Authorization.CanViewTopicMessages(ctx, &lmq)
if restErr != nil || !canViewMessages {
return connect.NewError(connect.CodePermissionDenied, restErr.Err)
}

if len(lmq.FilterInterpreterCode) > 0 {
canUseMessageSearchFilters, restErr := api.Hooks.Authorization.CanUseMessageSearchFilters(ctx, &lmq)
if restErr != nil || !canUseMessageSearchFilters {
return connect.NewError(connect.CodePermissionDenied, restErr.Err)
}
}

interpreterCode, _ := lmq.DecodeInterpreterCode() // Error has been checked in validation function

// Request messages from kafka and return them once we got all the messages or the context is done
listReq := console.ListMessageRequest{
TopicName: lmq.TopicName,
PartitionID: lmq.PartitionID,
StartOffset: lmq.StartOffset,
StartTimestamp: lmq.StartTimestamp,
MessageCount: lmq.MaxResults,
FilterInterpreterCode: interpreterCode,
}
// api.Hooks.Authorization.PrintListMessagesAuditLog(r, &listReq)

// Use 30min duration if we want to search a whole topic or forward messages as they arrive
duration := 45 * time.Second
if listReq.FilterInterpreterCode != "" || listReq.StartOffset == console.StartOffsetNewest {
duration = 30 * time.Minute
}

childCtx, cancel := context.WithTimeout(ctx, duration)
defer cancel()

progress := &streamProgressReporter{
ctx: childCtx,
logger: api.Logger,
request: &listReq,
stream: stream,
statsMutex: &sync.RWMutex{},
messagesConsumed: 0,
bytesConsumed: 0,
}
progress.Start()

return api.ConsoleSvc.ListMessages(childCtx, listReq, progress)
}
90 changes: 90 additions & 0 deletions backend/pkg/api/stream_progress_reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2022 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package api

import (
"context"
"sync"
"time"

"connectrpc.com/connect"
"go.uber.org/zap"

"github.com/redpanda-data/console/backend/pkg/console"
"github.com/redpanda-data/console/backend/pkg/kafka"
v1alpha "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/console/v1alpha"
)

// streamProgressReporter is in charge of sending status updates and messages regularly to the frontend.
type streamProgressReporter struct {
ctx context.Context
logger *zap.Logger
request *console.ListMessageRequest
stream *connect.ServerStream[v1alpha.ListMessagesResponse]

statsMutex *sync.RWMutex
messagesConsumed int64
bytesConsumed int64
}

func (p *streamProgressReporter) Start() {
// If search is disabled do not report progress regularly as each consumed message will be sent through the socket
// anyways
if p.request.FilterInterpreterCode == "" {
return
}

// Report the current progress every second to the user. If there's a search request which has to browse a whole
// topic it may take some time until there are messages. This go routine is in charge of keeping the user up to
// date about the progress Kowl made streaming the topic
go func() {
for {
select {
case <-p.ctx.Done():
return
default:
p.reportProgress()
}
time.Sleep(1 * time.Second)
}
}()
}

func (p *streamProgressReporter) reportProgress() {
}

func (p *streamProgressReporter) OnPhase(name string) {
}

func (p *streamProgressReporter) OnMessageConsumed(size int64) {
}

func (p *streamProgressReporter) OnMessage(message *kafka.TopicMessage) {
p.stream.Send(&v1alpha.ListMessagesResponse{
Value: &v1alpha.KafkaRecordPayload{
OriginalPayload: message.Value.Payload.Payload,
PayloadSize: int32(message.Value.Size),
DeserializedPayload: message.Value.Payload.Payload,
IsPayloadTooLarge: false, // TODO check for size
},
Key: &v1alpha.KafkaRecordPayload{
OriginalPayload: message.Key.Payload.Payload,
PayloadSize: int32(message.Key.Size),
DeserializedPayload: message.Key.Payload.Payload,
IsPayloadTooLarge: false, // TODO check for size
},
})
}

func (p *streamProgressReporter) OnComplete(elapsedMs int64, isCancelled bool) {
}

func (p *streamProgressReporter) OnError(message string) {
}
File renamed without changes.
File renamed without changes.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e832750

Please sign in to comment.