From 6c584477e1f21c4f03732837567ab6b7aa3d1829 Mon Sep 17 00:00:00 2001 From: Franco Posa Date: Tue, 21 Nov 2023 02:18:28 -0600 Subject: [PATCH] vendor dskit: update httpgrpc utils import paths (#3157) * vendor dskit: update httpgrpc utils import paths * fix httpgrpc WriteError usage * update dskit dep in serverless go mod --- cmd/tempo-serverless/cloud-run/go.mod | 2 +- cmd/tempo-serverless/cloud-run/go.sum | 4 +- cmd/tempo-serverless/lambda/go.mod | 2 +- cmd/tempo-serverless/lambda/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- modules/frontend/handler.go | 7 +- modules/frontend/transport/roundtripper.go | 3 +- .../grafana/dskit/concurrency/worker.go | 38 +++++ .../grafana/dskit/httpgrpc/httpgrpc.go | 99 ++++++++++-- .../grafana/dskit/httpgrpc/server/server.go | 84 ++-------- .../dskit/kv/memberlist/memberlist_client.go | 146 ++++++++++++++---- vendor/github.com/grafana/dskit/ring/batch.go | 78 ++++++---- vendor/modules.txt | 2 +- 14 files changed, 315 insertions(+), 160 deletions(-) create mode 100644 vendor/github.com/grafana/dskit/concurrency/worker.go diff --git a/cmd/tempo-serverless/cloud-run/go.mod b/cmd/tempo-serverless/cloud-run/go.mod index 34dc5477fe9..71316e0fe40 100644 --- a/cmd/tempo-serverless/cloud-run/go.mod +++ b/cmd/tempo-serverless/cloud-run/go.mod @@ -62,7 +62,7 @@ require ( github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/mux v1.8.0 // indirect - github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 // indirect + github.com/grafana/dskit v0.0.0-20231120170505-765e343eda4f // indirect github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/cmd/tempo-serverless/cloud-run/go.sum b/cmd/tempo-serverless/cloud-run/go.sum index 41309eead35..f552a5c0abf 100644 --- a/cmd/tempo-serverless/cloud-run/go.sum +++ b/cmd/tempo-serverless/cloud-run/go.sum @@ -275,8 +275,8 @@ github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 h1:uphjpuVv+q/D9vdBfQyGBueeZlwgep7CFS7ik+YP/Xo= -github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= +github.com/grafana/dskit v0.0.0-20231120170505-765e343eda4f h1:gyojr97YeWZ70pKNakWv5/tKwBHuLy3icnIeCo9gQr4= +github.com/grafana/dskit v0.0.0-20231120170505-765e343eda4f/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPftShATouOrBVy6GaTTjgQd/VfNiZp/VXQ= github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= diff --git a/cmd/tempo-serverless/lambda/go.mod b/cmd/tempo-serverless/lambda/go.mod index 8ca67df3202..f3af86739d7 100644 --- a/cmd/tempo-serverless/lambda/go.mod +++ b/cmd/tempo-serverless/lambda/go.mod @@ -65,7 +65,7 @@ require ( github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/mux v1.8.0 // indirect - github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 // indirect + github.com/grafana/dskit v0.0.0-20231120170505-765e343eda4f // indirect github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/cmd/tempo-serverless/lambda/go.sum b/cmd/tempo-serverless/lambda/go.sum index ee021d8b9da..899ce990b39 100644 --- a/cmd/tempo-serverless/lambda/go.sum +++ b/cmd/tempo-serverless/lambda/go.sum @@ -279,8 +279,8 @@ github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 h1:uphjpuVv+q/D9vdBfQyGBueeZlwgep7CFS7ik+YP/Xo= -github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= +github.com/grafana/dskit v0.0.0-20231120170505-765e343eda4f h1:gyojr97YeWZ70pKNakWv5/tKwBHuLy3icnIeCo9gQr4= +github.com/grafana/dskit v0.0.0-20231120170505-765e343eda4f/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPftShATouOrBVy6GaTTjgQd/VfNiZp/VXQ= github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= diff --git a/go.mod b/go.mod index 553391d6eb8..b068bcf627c 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/google/go-cmp v0.5.9 github.com/google/uuid v1.3.1 github.com/gorilla/mux v1.8.0 - github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 + github.com/grafana/dskit v0.0.0-20231120170505-765e343eda4f github.com/grafana/e2e v0.1.1 github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/hashicorp/go-hclog v1.5.0 diff --git a/go.sum b/go.sum index 6329fc56892..c314d317f39 100644 --- a/go.sum +++ b/go.sum @@ -511,8 +511,8 @@ github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+ github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 h1:uphjpuVv+q/D9vdBfQyGBueeZlwgep7CFS7ik+YP/Xo= -github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= +github.com/grafana/dskit v0.0.0-20231120170505-765e343eda4f h1:gyojr97YeWZ70pKNakWv5/tKwBHuLy3icnIeCo9gQr4= +github.com/grafana/dskit v0.0.0-20231120170505-765e343eda4f/go.mod h1:8dsy5tQOkeNQyjXpm5mQsbCu3H5uzeBD35MzRQFznKU= github.com/grafana/e2e v0.1.1 h1:/b6xcv5BtoBnx8cZnCiey9DbjEc8z7gXHO5edoeRYxc= github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1ATaE= github.com/grafana/gomemcache v0.0.0-20231023152154-6947259a0586 h1:/of8Z8taCPftShATouOrBVy6GaTTjgQd/VfNiZp/VXQ= diff --git a/modules/frontend/handler.go b/modules/frontend/handler.go index 9678552be7e..2a06011e5e7 100644 --- a/modules/frontend/handler.go +++ b/modules/frontend/handler.go @@ -11,7 +11,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" - "github.com/grafana/dskit/httpgrpc/server" "github.com/grafana/dskit/user" "github.com/opentracing/opentracing-go" @@ -151,8 +150,8 @@ func copyHeader(dst, src http.Header) { } } -// writeError handles writing errors to the http.ResponseWriter. It uses weavework common -// server.WriteError() to handle httpgrc errors. The handler handles all incoming HTTP requests +// writeError handles writing errors to the http.ResponseWriter. It uses dskit's +// httpgrpc.WriteError() to handle httpgrc errors. The handler handles all incoming HTTP requests // to the query frontend which then distributes them via httpgrpc to the queriers. As a result // httpgrpc errors can bubble up to here and should be translated to http errors. It returns // httpgrpc error. @@ -164,7 +163,7 @@ func writeError(w http.ResponseWriter, err error) error { } else if isRequestBodyTooLarge(err) { err = errRequestEntityTooLarge } - server.WriteError(w, err) + httpgrpc.WriteError(w, err) return err } diff --git a/modules/frontend/transport/roundtripper.go b/modules/frontend/transport/roundtripper.go index 8d899364955..c8fe2db1489 100644 --- a/modules/frontend/transport/roundtripper.go +++ b/modules/frontend/transport/roundtripper.go @@ -7,7 +7,6 @@ import ( "net/http" "github.com/grafana/dskit/httpgrpc" - "github.com/grafana/dskit/httpgrpc/server" ) // GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages. @@ -34,7 +33,7 @@ func (b *buffer) Bytes() []byte { } func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) { - req, err := server.HTTPRequest(r) + req, err := httpgrpc.FromHTTPRequest(r) if err != nil { return nil, err } diff --git a/vendor/github.com/grafana/dskit/concurrency/worker.go b/vendor/github.com/grafana/dskit/concurrency/worker.go new file mode 100644 index 00000000000..f40f0334800 --- /dev/null +++ b/vendor/github.com/grafana/dskit/concurrency/worker.go @@ -0,0 +1,38 @@ +package concurrency + +// NewReusableGoroutinesPool creates a new worker pool with the given size. +// These workers will run the workloads passed through Go() calls. +// If all workers are busy, Go() will spawn a new goroutine to run the workload. +func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool { + p := &ReusableGoroutinesPool{ + jobs: make(chan func()), + } + for i := 0; i < size; i++ { + go func() { + for f := range p.jobs { + f() + } + }() + } + return p +} + +type ReusableGoroutinesPool struct { + jobs chan func() +} + +// Go will run the given function in a worker of the pool. +// If all workers are busy, Go() will spawn a new goroutine to run the workload. +func (p *ReusableGoroutinesPool) Go(f func()) { + select { + case p.jobs <- f: + default: + go f() + } +} + +// Close stops the workers of the pool. +// No new Do() calls should be performed after calling Close(). +// Close does NOT wait for all jobs to finish, it is the caller's responsibility to ensure that in the provided workloads. +// Close is intended to be used in tests to ensure that no goroutines are leaked. +func (p *ReusableGoroutinesPool) Close() { close(p.jobs) } diff --git a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go index 213c261b700..e1f044d8650 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/httpgrpc.go @@ -5,8 +5,11 @@ package httpgrpc import ( + "bytes" "context" "fmt" + "io" + "net/http" "github.com/go-kit/log/level" spb "github.com/gogo/googleapis/google/rpc" @@ -15,10 +18,92 @@ import ( "google.golang.org/grpc/metadata" "github.com/grafana/dskit/grpcutil" - "github.com/grafana/dskit/log" ) +const ( + MetadataMethod = "httpgrpc-method" + MetadataURL = "httpgrpc-url" +) + +// AppendRequestMetadataToContext appends metadata of HTTPRequest into gRPC metadata. +func AppendRequestMetadataToContext(ctx context.Context, req *HTTPRequest) context.Context { + return metadata.AppendToOutgoingContext(ctx, + MetadataMethod, req.Method, + MetadataURL, req.Url) +} + +type nopCloser struct { + *bytes.Buffer +} + +func (nopCloser) Close() error { return nil } + +// BytesBuffer returns the underlaying `bytes.buffer` used to build this io.ReadCloser. +func (n nopCloser) BytesBuffer() *bytes.Buffer { return n.Buffer } + +// FromHTTPRequest converts an ordinary http.Request into an httpgrpc.HTTPRequest +func FromHTTPRequest(r *http.Request) (*HTTPRequest, error) { + body, err := io.ReadAll(r.Body) + if err != nil { + return nil, err + } + return &HTTPRequest{ + Method: r.Method, + Url: r.RequestURI, + Body: body, + Headers: FromHeader(r.Header), + }, nil +} + +// ToHTTPRequest converts httpgrpc.HTTPRequest to http.Request. +func ToHTTPRequest(ctx context.Context, r *HTTPRequest) (*http.Request, error) { + req, err := http.NewRequest(r.Method, r.Url, nopCloser{Buffer: bytes.NewBuffer(r.Body)}) + if err != nil { + return nil, err + } + ToHeader(r.Headers, req.Header) + req = req.WithContext(ctx) + req.RequestURI = r.Url + req.ContentLength = int64(len(r.Body)) + return req, nil +} + +// WriteResponse converts an httpgrpc response to an HTTP one +func WriteResponse(w http.ResponseWriter, resp *HTTPResponse) error { + ToHeader(resp.Headers, w.Header()) + w.WriteHeader(int(resp.Code)) + _, err := w.Write(resp.Body) + return err +} + +// WriteError converts an httpgrpc error to an HTTP one +func WriteError(w http.ResponseWriter, err error) { + resp, ok := HTTPResponseFromError(err) + if ok { + _ = WriteResponse(w, resp) + } else { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +func ToHeader(hs []*Header, header http.Header) { + for _, h := range hs { + header[h.Key] = h.Values + } +} + +func FromHeader(hs http.Header) []*Header { + result := make([]*Header, 0, len(hs)) + for k, vs := range hs { + result = append(result, &Header{ + Key: k, + Values: vs, + }) + } + return result +} + // Errorf returns a HTTP gRPC error than is correctly forwarded over // gRPC, and can eventually be converted back to a HTTP response with // HTTPResponseFromError. @@ -63,15 +148,3 @@ func HTTPResponseFromError(err error) (*HTTPResponse, bool) { return &resp, true } - -const ( - MetadataMethod = "httpgrpc-method" - MetadataURL = "httpgrpc-url" -) - -// AppendRequestMetadataToContext appends metadata of HTTPRequest into gRPC metadata. -func AppendRequestMetadataToContext(ctx context.Context, req *HTTPRequest) context.Context { - return metadata.AppendToOutgoingContext(ctx, - MetadataMethod, req.Method, - MetadataURL, req.Url) -} diff --git a/vendor/github.com/grafana/dskit/httpgrpc/server/server.go b/vendor/github.com/grafana/dskit/httpgrpc/server/server.go index 35656c94342..c642f7fa13f 100644 --- a/vendor/github.com/grafana/dskit/httpgrpc/server/server.go +++ b/vendor/github.com/grafana/dskit/httpgrpc/server/server.go @@ -5,10 +5,8 @@ package server import ( - "bytes" "context" "fmt" - "io" "net" "net/http" "net/http/httptest" @@ -47,37 +45,31 @@ func NewServer(handler http.Handler) *Server { } } -type nopCloser struct { - *bytes.Buffer -} - -func (nopCloser) Close() error { return nil } - -// BytesBuffer returns the underlaying `bytes.buffer` used to build this io.ReadCloser. -func (n nopCloser) BytesBuffer() *bytes.Buffer { return n.Buffer } - // Handle implements HTTPServer. func (s Server) Handle(ctx context.Context, r *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { - req, err := http.NewRequest(r.Method, r.Url, nopCloser{Buffer: bytes.NewBuffer(r.Body)}) + req, err := httpgrpc.ToHTTPRequest(ctx, r) if err != nil { return nil, err } - toHeader(r.Headers, req.Header) - req = req.WithContext(ctx) - req.RequestURI = r.Url - req.ContentLength = int64(len(r.Body)) recorder := httptest.NewRecorder() s.handler.ServeHTTP(recorder, req) header := recorder.Header() + + doNotLogError := false + if _, ok := header[DoNotLogErrorHeaderKey]; ok { + doNotLogError = true + header.Del(DoNotLogErrorHeaderKey) // remove before converting to httpgrpc resp + } + resp := &httpgrpc.HTTPResponse{ Code: int32(recorder.Code), - Headers: fromHeader(header), + Headers: httpgrpc.FromHeader(header), Body: recorder.Body.Bytes(), } if recorder.Code/100 == 5 { err := httpgrpc.ErrorFromHTTPResponse(resp) - if _, ok := header[DoNotLogErrorHeaderKey]; ok { + if doNotLogError { err = middleware.DoNotLogError{Err: err} } return nil, err @@ -165,38 +157,6 @@ func NewClient(address string) (*Client, error) { }, nil } -// HTTPRequest wraps an ordinary HTTPRequest with a gRPC one -func HTTPRequest(r *http.Request) (*httpgrpc.HTTPRequest, error) { - body, err := io.ReadAll(r.Body) - if err != nil { - return nil, err - } - return &httpgrpc.HTTPRequest{ - Method: r.Method, - Url: r.RequestURI, - Body: body, - Headers: fromHeader(r.Header), - }, nil -} - -// WriteResponse converts an httpgrpc response to an HTTP one -func WriteResponse(w http.ResponseWriter, resp *httpgrpc.HTTPResponse) error { - toHeader(resp.Headers, w.Header()) - w.WriteHeader(int(resp.Code)) - _, err := w.Write(resp.Body) - return err -} - -// WriteError converts an httpgrpc error to an HTTP one -func WriteError(w http.ResponseWriter, err error) { - resp, ok := httpgrpc.HTTPResponseFromError(err) - if ok { - _ = WriteResponse(w, resp) - } else { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - // ServeHTTP implements http.Handler func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { if tracer := opentracing.GlobalTracer(); tracer != nil { @@ -207,7 +167,7 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - req, err := HTTPRequest(r) + req, err := httpgrpc.FromHTTPRequest(r) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -224,28 +184,8 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - if err := WriteResponse(w, resp); err != nil { + if err := httpgrpc.WriteResponse(w, resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } - -func toHeader(hs []*httpgrpc.Header, header http.Header) { - for _, h := range hs { - header[h.Key] = h.Values - } -} - -func fromHeader(hs http.Header) []*httpgrpc.Header { - result := make([]*httpgrpc.Header, 0, len(hs)) - for k, vs := range hs { - if k == DoNotLogErrorHeaderKey { - continue - } - result = append(result, &httpgrpc.Header{ - Key: k, - Values: vs, - }) - } - return result -} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index 30a27531fd0..693964b5ad0 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -222,7 +222,7 @@ func generateRandomSuffix(logger log.Logger) string { // If joining of the cluster if configured, it is done in Running state, and if join fails and Abort flag is set, service // fails. type KV struct { - services.Service + services.NamedService cfg KVConfig logger log.Logger @@ -374,7 +374,8 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer mlkv.codecs[c.CodecID()] = c } - mlkv.Service = services.NewBasicService(mlkv.starting, mlkv.running, mlkv.stopping) + mlkv.NamedService = services.NewBasicService(mlkv.starting, mlkv.running, mlkv.stopping).WithName("memberlist_kv") + return mlkv } @@ -485,17 +486,17 @@ func (m *KV) running(ctx context.Context) error { tickerChan = t.C } + logger := log.With(m.logger, "phase", "periodic_rejoin") for { select { case <-tickerChan: - members := m.discoverMembers(ctx, m.cfg.JoinMembers) - - reached, err := m.memberlist.Join(members) + const numAttempts = 1 // don't retry if resolution fails, we will try again next time + reached, err := m.joinMembersWithRetries(ctx, numAttempts, logger) if err == nil { - level.Info(m.logger).Log("msg", "re-joined memberlist cluster", "reached_nodes", reached) + level.Info(logger).Log("msg", "re-joined memberlist cluster", "reached_nodes", reached) } else { // Don't report error from rejoin, otherwise KV service would be stopped completely. - level.Warn(m.logger).Log("msg", "re-joining memberlist cluster failed", "err", err) + level.Warn(logger).Log("msg", "re-joining memberlist cluster failed", "err", err, "next_try_in", m.cfg.RejoinInterval) } case <-ctx.Done(): @@ -540,7 +541,7 @@ func (m *KV) fastJoinMembersOnStartup(ctx context.Context) { level.Info(m.logger).Log("msg", "memberlist fast-join starting", "nodes_found", len(nodes), "to_join", toJoin) totalJoined := 0 - for toJoin > 0 && len(nodes) > 0 { + for toJoin > 0 && len(nodes) > 0 && ctx.Err() == nil { reached, err := m.memberlist.Join(nodes[0:1]) // Try to join single node only. if err != nil { level.Debug(m.logger).Log("msg", "fast-joining node failed", "node", nodes[0], "err", err) @@ -568,41 +569,122 @@ func (m *KV) joinMembersOnStartup(ctx context.Context) bool { return true } + logger := log.With(m.logger, "phase", "startup") + level.Info(logger).Log("msg", "joining memberlist cluster", "join_members", strings.Join(m.cfg.JoinMembers, ",")) startTime := time.Now() + reached, err := m.joinMembersWithRetries(ctx, m.cfg.MaxJoinRetries, logger) + if err != nil { + level.Error(logger).Log("msg", "joining memberlist cluster failed", "err", err, "elapsed_time", time.Since(startTime)) + return false + } + level.Info(logger).Log("msg", "joining memberlist cluster succeeded", "reached_nodes", reached, "elapsed_time", time.Since(startTime)) + return true +} - level.Info(m.logger).Log("msg", "joining memberlist cluster", "join_members", strings.Join(m.cfg.JoinMembers, ",")) - - cfg := backoff.Config{ - MinBackoff: m.cfg.MinJoinBackoff, - MaxBackoff: m.cfg.MaxJoinBackoff, - MaxRetries: m.cfg.MaxJoinRetries, +// joinMembersWithRetries joins m.cfg.JoinMembers 100 at a time. After each batch of 100 it rediscoveres the members. +// This helps when the list of members is big and by the time we reach the end the originally resolved addresses may be obsolete. +// joinMembersWithRetries returns an error iff it couldn't successfully join any node OR the context was cancelled. +func (m *KV) joinMembersWithRetries(ctx context.Context, numAttempts int, logger log.Logger) (int, error) { + var ( + cfg = backoff.Config{ + MinBackoff: m.cfg.MinJoinBackoff, + MaxBackoff: m.cfg.MaxJoinBackoff, + MaxRetries: numAttempts, + } + boff = backoff.New(ctx, cfg) + err error + successfullyJoined = 0 + ) + + for ; boff.Ongoing(); boff.Wait() { + successfullyJoined, err = m.joinMembersInBatches(ctx) + if successfullyJoined > 0 { + // If there are _some_ successful joins, then we can consider the join done. + // Mimicking the Join semantics we return an error only when we couldn't join any node at all + err = nil + break + } + level.Warn(logger).Log("msg", "joining memberlist cluster", "attempts", boff.NumRetries()+1, "max_attempts", numAttempts, "err", err) + } + if err == nil && boff.Err() != nil { + err = fmt.Errorf("joining memberlist: %w", boff.Err()) } - boff := backoff.New(ctx, cfg) - var lastErr error + return successfullyJoined, err +} - for boff.Ongoing() { - // We rejoin all nodes, including those that were joined during "fast-join". - // This is harmless and simpler. - nodes := m.discoverMembers(ctx, m.cfg.JoinMembers) +// joinMembersInBatches joins m.cfg.JoinMembers and re-resolves the address of m.cfg.JoinMembers after joining 100 nodes. +// joinMembersInBatches returns the number of nodes joined. joinMembersInBatches returns an error only when the +// number of joined nodes is 0. +func (m *KV) joinMembersInBatches(ctx context.Context) (int, error) { + const batchSize = 100 + var ( + attemptedNodes = make(map[string]bool) + successfullyJoined = 0 + lastErr error + batch = make([]string, batchSize) + nodes []string + ) + for moreAvailableNodes := true; ctx.Err() == nil && moreAvailableNodes; { + // Rediscover nodes and try to join a subset of them with each batch. + // When the list of nodes is large by the time we reach the end of the list some of the + // IPs can be unreachable. + newlyResolved := m.discoverMembers(ctx, m.cfg.JoinMembers) + if len(newlyResolved) > 0 { + // If the resolution fails we keep using the nodes list from the last resolution. + // If that failed too, then we fail the join attempt. + nodes = newlyResolved + } - if len(nodes) > 0 { - reached, err := m.memberlist.Join(nodes) // err is only returned if reached==0. - if err == nil { - level.Info(m.logger).Log("msg", "joining memberlist cluster succeeded", "reached_nodes", reached, "elapsed_time", time.Since(startTime)) - return true + // Prepare batch + batch = batch[:0] + moreAvailableNodes = false + for _, n := range nodes { + if attemptedNodes[n] { + continue } - level.Warn(m.logger).Log("msg", "joining memberlist cluster: failed to reach any nodes", "retries", boff.NumRetries(), "err", err) - lastErr = err - } else { - level.Warn(m.logger).Log("msg", "joining memberlist cluster: found no nodes to join", "retries", boff.NumRetries()) + if len(batch) >= batchSize { + moreAvailableNodes = true + break + } + batch = append(batch, n) + attemptedNodes[n] = true } - boff.Wait() + // Join batch + joinedInBatch, err := m.joinMembersBatch(ctx, batch) + if err != nil { + lastErr = err + } + successfullyJoined += joinedInBatch + } + if successfullyJoined > 0 { + return successfullyJoined, nil + } + if successfullyJoined == 0 && lastErr == nil { + return 0, errors.New("found no nodes to join") } + return 0, lastErr +} - level.Error(m.logger).Log("msg", "joining memberlist cluster failed", "last_error", lastErr, "elapsed_time", time.Since(startTime)) - return false +// joinMembersBatch returns an error only if it couldn't successfully join any nodes or if ctx is cancelled. +func (m *KV) joinMembersBatch(ctx context.Context, nodes []string) (successfullyJoined int, lastErr error) { + for nodeIdx := range nodes { + if ctx.Err() != nil { + return successfullyJoined, fmt.Errorf("joining batch: %w", context.Cause(ctx)) + } + // Attempt to join a single node. + // The cost of calling Join shouldn't be different between passing all nodes in one invocation versus passing a single node per invocation. + reached, err := m.memberlist.Join(nodes[nodeIdx : nodeIdx+1]) + successfullyJoined += reached + if err != nil { + lastErr = err + } + } + if successfullyJoined > 0 { + lastErr = nil + } + return successfullyJoined, lastErr } // Provides a dns-based member disovery to join a memberlist cluster w/o knowning members' addresses upfront. diff --git a/vendor/github.com/grafana/dskit/ring/batch.go b/vendor/github.com/grafana/dskit/ring/batch.go index 11e511e0c62..5acd8fd0086 100644 --- a/vendor/github.com/grafana/dskit/ring/batch.go +++ b/vendor/github.com/grafana/dskit/ring/batch.go @@ -49,32 +49,56 @@ func isHTTPStatus4xx(err error) bool { return code/100 == 4 } -// DoBatch is a special case of DoBatchWithClientError where errors -// containing HTTP status code 4xx are treated as client errors. +// DoBatch is a deprecated version of DoBatchWithOptions where grpc errors containing status codes 4xx are treated as client errors. +// Deprecated. Use DoBatchWithOptions instead. func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(InstanceDesc, []int) error, cleanup func()) error { - return DoBatchWithClientError(ctx, op, r, keys, callback, cleanup, isHTTPStatus4xx) + return DoBatchWithOptions(ctx, op, r, keys, callback, DoBatchOptions{ + Cleanup: cleanup, + IsClientError: isHTTPStatus4xx, + }) } -// DoBatchWithClientError request against a set of keys in the ring, -// handling replication and failures. For example if we want to write -// N items where they may all hit different instances, and we want them -// all replicated R ways with quorum writes, we track the relationship -// between batch RPCs and the items within them. -// -// callback() is passed the instance to target, and the indexes of the keys -// to send to that instance. -// -// cleanup() is always called, either on an error before starting the batches -// or after they all finish. +// DoBatchOptions defines options for the DoBatchWithOptions call. +// Zero value options are valid, as well as individual zero valued fields. +type DoBatchOptions struct { + // Cleanup is always called, either on an error before starting the batches or after they are all finished. + // If nil, a noop will be called. + Cleanup func() + + // IsClientError classifies errors returned by `callback()` into client or server errors. + // See `batchTracker.record()` function for details about how errors are combined into final error returned by DoBatchWithClientError. + // If nil, a default implementation is used that classifies grpc errors containing status codes 4xx as client errors. + IsClientError func(error) bool + + // Go will be used to spawn the callback goroutines, and can be used to use a worker pool like concurrency.ReusableGoroutinesPool. + Go func(func()) +} + +func (o *DoBatchOptions) replaceZeroValuesWithDefaults() { + if o.Cleanup == nil { + o.Cleanup = func() {} + } + if o.IsClientError == nil { + o.IsClientError = isHTTPStatus4xx + } + if o.Go == nil { + o.Go = func(f func()) { go f() } + } +} + +// DoBatchWithOptions request against a set of keys in the ring, handling replication and failures. +// For example if we want to write N items where they may all hit different instances, +// and we want them all replicated R ways with quorum writes, +// we track the relationship between batch RPCs and the items within them. // -// isClientError() classifies errors returned by `callback()` into client or -// server errors. See `batchTracker.record()` function for details about how -// errors are combined into final error returned by DoBatchWithClientError. +// See comments on DoBatchOptions for available options for this call. // // Not implemented as a method on Ring, so we can test separately. -func DoBatchWithClientError(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(InstanceDesc, []int) error, cleanup func(), isClientError func(error) bool) error { +func DoBatchWithOptions(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(InstanceDesc, []int) error, o DoBatchOptions) error { + o.replaceZeroValuesWithDefaults() + if r.InstancesCount() <= 0 { - cleanup() + o.Cleanup() return fmt.Errorf("DoBatch: InstancesCount <= 0") } expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.InstancesCount() @@ -89,7 +113,7 @@ func DoBatchWithClientError(ctx context.Context, op Operation, r ReadRing, keys for i, key := range keys { replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0]) if err != nil { - cleanup() + o.Cleanup() return err } itemTrackers[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors @@ -120,19 +144,19 @@ func DoBatchWithClientError(ctx context.Context, op Operation, r ReadRing, keys wg.Add(len(instances)) for _, i := range instances { - go func(i instance) { + i := i + o.Go(func() { err := callback(i.desc, i.indexes) - tracker.record(i.itemTrackers, err, isClientError) + tracker.record(i.itemTrackers, err, o.IsClientError) wg.Done() - }(i) + }) } // Perform cleanup at the end. - go func() { + o.Go(func() { wg.Wait() - - cleanup() - }() + o.Cleanup() + }) select { case err := <-tracker.err: diff --git a/vendor/modules.txt b/vendor/modules.txt index ada12eca287..dfd223e0078 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -475,7 +475,7 @@ github.com/gorilla/mux # github.com/gorilla/websocket v1.5.0 ## explicit; go 1.12 github.com/gorilla/websocket -# github.com/grafana/dskit v0.0.0-20231113165309-40e91edb6190 +# github.com/grafana/dskit v0.0.0-20231120170505-765e343eda4f ## explicit; go 1.20 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency