Skip to content

Commit

Permalink
backend: increase SR client timeout and limit concurrency when gettin… (
Browse files Browse the repository at this point in the history
#1030)

* backend: increase SR client timeout and limit concurrency when getting all schemas and subjects

* backend: use errgroup with limit in GetSchemasIndividually() and use singleflight for tryCreateProtoRegistry()

* backend: set schema registry client timeout to 10s
  • Loading branch information
bojand authored Jan 25, 2024
1 parent 99100c3 commit 6c8f916
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 25 deletions.
27 changes: 22 additions & 5 deletions backend/pkg/proto/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"strings"
"sync"
"time"

//nolint:staticcheck // Switching to the google golang protojson comes with a few breaking changes.
"github.com/golang/protobuf/jsonpb"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/jhump/protoreflect/dynamic/msgregistry"
"github.com/twmb/franz-go/pkg/sr"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
"google.golang.org/protobuf/runtime/protoiface"

"github.com/redpanda-data/console/backend/pkg/config"
Expand Down Expand Up @@ -64,6 +66,8 @@ type Service struct {

registryMutex sync.RWMutex
registry *msgregistry.MessageRegistry

sfGroup singleflight.Group
}

// NewService creates a new proto.Service.
Expand Down Expand Up @@ -419,13 +423,23 @@ func (*Service) decodeConfluentBinaryWrapper(payload []byte) (*confluentEnvelope
}

func (s *Service) tryCreateProtoRegistry() {
err := s.createProtoRegistry(context.Background())
if err != nil {
s.logger.Error("failed to update proto registry", zap.Error(err))
}
// since this is triggered on proto schema registry refresh interval,
// as well as when git or file system is updated
// lets protect against too aggressive refresh interval
// or multiple concurrent triggers
s.sfGroup.Do("tryCreateProtoRegistry", func() (any, error) {
err := s.createProtoRegistry(context.Background())
if err != nil {
s.logger.Error("failed to update proto registry", zap.Error(err))
}

return nil, nil
})
}

func (s *Service) createProtoRegistry(ctx context.Context) error {
startTime := time.Now()

files := make(map[string]filesystem.File)

if s.gitSvc != nil {
Expand Down Expand Up @@ -503,10 +517,13 @@ func (s *Service) createProtoRegistry(ctx context.Context) error {
}
}

totalDuration := time.Since(startTime)

s.logger.Info("checked whether all mapped proto types also exist in the local registry",
zap.Int("types_found", foundTypes),
zap.Int("types_missing", missingTypes),
zap.Int("registered_types", len(fileDescriptors)))
zap.Int("registered_types", len(fileDescriptors)),
zap.Duration("operation_duration", totalDuration))

return nil
}
Expand Down
41 changes: 21 additions & 20 deletions backend/pkg/schema/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"net/http"
"os"
"strconv"
"sync"
"time"

"github.com/go-resty/resty/v2"
"golang.org/x/sync/errgroup"

"github.com/redpanda-data/console/backend/pkg/config"
)
Expand Down Expand Up @@ -52,7 +54,7 @@ func newClient(cfg config.Schema) (*Client, error) {
SetHeader("Accept", "application/vnd.schemaregistry.v1+json").
SetHeader("Content-Type", "application/vnd.schemaregistry.v1+json").
SetError(&RestError{}).
SetTimeout(5 * time.Second)
SetTimeout(10 * time.Second)

// Configure credentials
if cfg.Username != "" {
Expand Down Expand Up @@ -691,30 +693,29 @@ func (c *Client) GetSchemasIndividually(ctx context.Context, showSoftDeleted boo
return nil, fmt.Errorf("failed to get subjects to fetch schemas for: %w", err)
}

type chRes struct {
schemaRes []SchemaVersionedResponse
err error
}
ch := make(chan chRes, len(subjectsRes.Subjects))
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // limit max concurrency to 10 requests at a time

schemas := make([]SchemaVersionedResponse, 0, len(subjectsRes.Subjects))
mutex := sync.Mutex{}

// Describe all subjects concurrently one by one
for _, subject := range subjectsRes.Subjects {
go func(s string) {
srRes, err := c.GetSchemasBySubject(ctx, s, showSoftDeleted)
ch <- chRes{
schemaRes: srRes,
err: err,
subject := subject
g.Go(func() error {
srRes, err := c.GetSchemasBySubject(ctx, subject, showSoftDeleted)
if err != nil {
return err
}
}(subject)

mutex.Lock()
schemas = append(schemas, srRes...)
mutex.Unlock()
return err
})
}

schemas := make([]SchemaVersionedResponse, 0)
for i := 0; i < cap(ch); i++ {
res := <-ch
if res.err != nil {
return nil, fmt.Errorf("failed to fetch at least one schema: %w", res.err)
}
schemas = append(schemas, res.schemaRes...)
if err := g.Wait(); err != nil {
return nil, err
}

return schemas, nil
Expand Down

0 comments on commit 6c8f916

Please sign in to comment.