From f69474f242b43cde2cf45e6f49db7f7175bafb2b Mon Sep 17 00:00:00 2001 From: Zach Leslie Date: Tue, 2 May 2023 17:42:41 +0000 Subject: [PATCH] Add support for IPv6 (#1555) * Add support for IPv6 ring address detection * Update dskit dependency after inet6 merge Signed-off-by: Zach Leslie * Add withInet6 jsonnet method to microservices Signed-off-by: Zach Leslie * Update serverless vendor Signed-off-by: Zach Leslie * Update jsonnet overrides for ipv6 testing Signed-off-by: Zach Leslie * Drop unnecessary address spec Signed-off-by: Zach Leslie * Include configuration docs Signed-off-by: Zach Leslie * Update changelog --------- Signed-off-by: Zach Leslie --- CHANGELOG.md | 1 + cmd/tempo-serverless/cloud-run/go.mod | 4 +- cmd/tempo-serverless/cloud-run/go.sum | 8 +- cmd/tempo-serverless/lambda/go.mod | 4 +- cmd/tempo-serverless/lambda/go.sum | 8 +- docs/sources/tempo/configuration/ipv6.md | 89 +++++++++ go.mod | 6 +- go.sum | 12 +- modules/compactor/compactor_ring.go | 1 + modules/compactor/config.go | 9 +- modules/generator/generator_ring.go | 10 +- .../jsonnet/microservices/common.libsonnet | 50 +++++ pkg/cache/memcached_client.go | 4 +- pkg/util/net.go | 62 ------- .../grafana/dskit/crypto/tls/tls.go | 48 ++++- .../grafana/dskit/grpcclient/grpcclient.go | 5 +- .../github.com/grafana/dskit/kv/etcd/mock.go | 10 +- .../dskit/kv/memberlist/tcp_transport.go | 26 ++- .../grafana/dskit/limiter/rate_limiter.go | 9 + .../github.com/grafana/dskit/loser/loser.go | 174 ++++++++++++++++++ .../grafana/dskit/netutil/netutil.go | 164 +++++++++++++++++ vendor/github.com/grafana/dskit/ring/http.go | 2 +- .../grafana/dskit/ring/lifecycler.go | 35 +++- vendor/github.com/grafana/dskit/ring/model.go | 57 +----- vendor/github.com/grafana/dskit/ring/ring.go | 67 ++++++- vendor/github.com/grafana/dskit/ring/util.go | 60 +----- .../grafana/dskit/services/basic_service.go | 26 +-- .../grafana/dskit/services/service.go | 23 ++- .../grafana/dskit/tenant/resolver.go | 2 + .../github.com/grafana/dskit/tenant/tenant.go | 1 + .../grafana/gomemcache/memcache/memcache.go | 131 ++++++++++++- vendor/golang.org/x/exp/slices/slices.go | 45 ++++- vendor/golang.org/x/exp/slices/sort.go | 51 +++-- vendor/modules.txt | 7 +- 34 files changed, 912 insertions(+), 299 deletions(-) create mode 100644 docs/sources/tempo/configuration/ipv6.md delete mode 100644 pkg/util/net.go create mode 100644 vendor/github.com/grafana/dskit/loser/loser.go diff --git a/CHANGELOG.md b/CHANGELOG.md index bfdaa5807c8..867936c4d3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * [ENHANCEMENT] Add override to limit number of blocks inspected in tag value search [#2358](https://github.com/grafana/tempo/pull/2358) (@mapno) * [ENHANCEMENT] Add synchronous read mode to vParquet and vParquet2 optionally enabled by env vars [#2165](https://github.com/grafana/tempo/pull/2165) (@mdisibio) * [ENHANCEMENT] Add option to override metrics-generator ring port [#2399](https://github.com/grafana/tempo/pull/2399) (@mdisibio) +* [ENHANCEMENT] Add support for IPv6 [#1555](https://github.com/grafana/tempo/pull/1555) (@zalegrala) * [BUGFIX] tempodb integer divide by zero error [#2167](https://github.com/grafana/tempo/issues/2167) (@kroksys) * [CHANGE] **Breaking Change** Rename s3.insecure_skip_verify [#???](https://github.com/grafana/tempo/pull/???) (@zalegrala) ```yaml diff --git a/cmd/tempo-serverless/cloud-run/go.mod b/cmd/tempo-serverless/cloud-run/go.mod index 4595dfa6459..86e90dbc63d 100644 --- a/cmd/tempo-serverless/cloud-run/go.mod +++ b/cmd/tempo-serverless/cloud-run/go.mod @@ -55,8 +55,8 @@ require ( github.com/googleapis/gax-go/v2 v2.7.0 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/mux v1.8.0 // indirect - github.com/grafana/dskit v0.0.0-20230202132725-6043e861a8e2 // indirect - github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f // indirect + github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1 // indirect + github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect diff --git a/cmd/tempo-serverless/cloud-run/go.sum b/cmd/tempo-serverless/cloud-run/go.sum index 205e09ba924..1cbe3fe5ef8 100644 --- a/cmd/tempo-serverless/cloud-run/go.sum +++ b/cmd/tempo-serverless/cloud-run/go.sum @@ -300,10 +300,10 @@ github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/grafana/dskit v0.0.0-20230202132725-6043e861a8e2 h1:NRaJoA3dr/JE6jFJi6RFAyHxCouTL7idPe+Y14EBsAQ= -github.com/grafana/dskit v0.0.0-20230202132725-6043e861a8e2/go.mod h1:ulYLLoSd71AWIjxgifLO86Lndx82Yj+IcV+fFnh8tkI= -github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f h1:ANwIMe7kOiMNTK88tusoNDb840pWVskI4rCrdoMv5i0= -github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= +github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1 h1:sMRNbvPdgXIBh3LMJeZ6ykxZE7kchj8vkvmjBypu43M= +github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1/go.mod h1:31wpEibXmd1yC7sUBw1ilN9dhWatwQwbcOAbZGtTr/M= +github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 h1:WB3bGH2f1UN6jkd6uAEWfHB8OD7dKJ0v2Oo6SNfhpfQ= +github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= diff --git a/cmd/tempo-serverless/lambda/go.mod b/cmd/tempo-serverless/lambda/go.mod index 2ab45cd4ad3..dc35c836bcd 100644 --- a/cmd/tempo-serverless/lambda/go.mod +++ b/cmd/tempo-serverless/lambda/go.mod @@ -58,8 +58,8 @@ require ( github.com/googleapis/gax-go/v2 v2.7.0 // indirect github.com/gorilla/handlers v1.5.1 // indirect github.com/gorilla/mux v1.8.0 // indirect - github.com/grafana/dskit v0.0.0-20230202132725-6043e861a8e2 // indirect - github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f // indirect + github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1 // indirect + github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect diff --git a/cmd/tempo-serverless/lambda/go.sum b/cmd/tempo-serverless/lambda/go.sum index d5dcfab6ba9..d01f56e0f9c 100644 --- a/cmd/tempo-serverless/lambda/go.sum +++ b/cmd/tempo-serverless/lambda/go.sum @@ -304,10 +304,10 @@ github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/grafana/dskit v0.0.0-20230202132725-6043e861a8e2 h1:NRaJoA3dr/JE6jFJi6RFAyHxCouTL7idPe+Y14EBsAQ= -github.com/grafana/dskit v0.0.0-20230202132725-6043e861a8e2/go.mod h1:ulYLLoSd71AWIjxgifLO86Lndx82Yj+IcV+fFnh8tkI= -github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f h1:ANwIMe7kOiMNTK88tusoNDb840pWVskI4rCrdoMv5i0= -github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= +github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1 h1:sMRNbvPdgXIBh3LMJeZ6ykxZE7kchj8vkvmjBypu43M= +github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1/go.mod h1:31wpEibXmd1yC7sUBw1ilN9dhWatwQwbcOAbZGtTr/M= +github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 h1:WB3bGH2f1UN6jkd6uAEWfHB8OD7dKJ0v2Oo6SNfhpfQ= +github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= diff --git a/docs/sources/tempo/configuration/ipv6.md b/docs/sources/tempo/configuration/ipv6.md new file mode 100644 index 00000000000..8a02a2a8194 --- /dev/null +++ b/docs/sources/tempo/configuration/ipv6.md @@ -0,0 +1,89 @@ +--- +title: Configure IPv6 +weight: 37 +--- + +# Configure IPv6 + +Tempo can be configured to communicate between the components using Internet Protocol Version 6, or IPv6. + +> Note: In order to support this support this configuration, the underlying infrastructure must support this address family. This configuration may be used in a single-stack IPv6 environment, or in a dual-stack environment where both IPv6 and IPv4 are present. In a dual-stack scenario, only one address family may be configured at a time, and all components must be configured for that address family. + +## Protocol configuration + +This sample listen configuration will allow the gRPC and HTTP servers to listen on IPv6, and configure the various memberlist components to enable IPv6. + +```yaml +memberlist: + bind_addr: + - '::' + bind_port: 7946 + +compactor: + ring: + kvstore: + store: memberlist + enable_inet6: true + +metrics_generator: + ring: + enable_inet6: true + +ingester: + lifecycler: + address: '::' + enable_inet6: true + +server: + grpc_listen_address: '[::0]' + grpc_listen_port: 9095 + http_listen_address: '[::0]' + http_listen_port: 3200 +``` + +## Kubernetes service configuration + +Each service fronting the workloads will need to be configured with with `spec.ipFamilies` and `spec.ipFamilyPolicy` set. See this `compactor` example. + +```yaml +apiVersion: v1 +kind: Service +metadata: + labels: + name: compactor + name: compactor + namespace: trace +spec: + clusterIP: fccb::31a7 + clusterIPs: + - fccb::31a7 + internalTrafficPolicy: Cluster + ipFamilies: + - IPv6 + ipFamilyPolicy: SingleStack + ports: + - name: compactor-http-metrics + port: 3200 + protocol: TCP + targetPort: 3200 + selector: + app: compactor + name: compactor + sessionAffinity: None + type: ClusterIP +``` + +You can check the listening service from within a pod. + +```sh +❯ k exec -it compactor-55c778b8d9-2kch2 -- sh +/ # apk add iproute2 +OK: 12 MiB in 27 packages +/ # ss -ltn -f inet +State Recv-Q Send-Q Local Address:Port Peer Address:Port Process +/ # ss -ltn -f inet6 +State Recv-Q Send-Q Local Address:Port Peer Address:Port Process +LISTEN 0 4096 *:7946 *:* +LISTEN 0 4096 *:9095 *:* +LISTEN 0 4096 *:3200 *:* +``` diff --git a/go.mod b/go.mod index b147a1b8964..863a0b4dd11 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/google/go-cmp v0.5.9 github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 - github.com/grafana/dskit v0.0.0-20230202132725-6043e861a8e2 + github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1 github.com/grafana/e2e v0.1.1-0.20221018202458-cffd2bb71c7b github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/hashicorp/go-hclog v1.4.0 @@ -101,11 +101,11 @@ require ( require ( github.com/Azure/go-autorest/autorest v0.11.28 github.com/googleapis/gax-go/v2 v2.7.0 - github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f + github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 go.opentelemetry.io/collector/exporter v0.74.0 go.opentelemetry.io/collector/receiver v0.74.0 - golang.org/x/exp v0.0.0-20221002003631-540bb7301a08 + golang.org/x/exp v0.0.0-20230321023759-10a507213a29 ) require ( diff --git a/go.sum b/go.sum index 02c01063d5c..3a6cb3621b0 100644 --- a/go.sum +++ b/go.sum @@ -480,12 +480,12 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7 github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/grafana/dskit v0.0.0-20230202132725-6043e861a8e2 h1:NRaJoA3dr/JE6jFJi6RFAyHxCouTL7idPe+Y14EBsAQ= -github.com/grafana/dskit v0.0.0-20230202132725-6043e861a8e2/go.mod h1:ulYLLoSd71AWIjxgifLO86Lndx82Yj+IcV+fFnh8tkI= +github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1 h1:sMRNbvPdgXIBh3LMJeZ6ykxZE7kchj8vkvmjBypu43M= +github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1/go.mod h1:31wpEibXmd1yC7sUBw1ilN9dhWatwQwbcOAbZGtTr/M= github.com/grafana/e2e v0.1.1-0.20221018202458-cffd2bb71c7b h1:Ha+kSIoTutf4ytlVw/SaEclDUloYx0+FXDKJWKhNbE4= github.com/grafana/e2e v0.1.1-0.20221018202458-cffd2bb71c7b/go.mod h1:3UsooRp7yW5/NJQBlXcTsAHOoykEhNUYXkQ3r6ehEEY= -github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f h1:ANwIMe7kOiMNTK88tusoNDb840pWVskI4rCrdoMv5i0= -github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= +github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 h1:WB3bGH2f1UN6jkd6uAEWfHB8OD7dKJ0v2Oo6SNfhpfQ= +github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU= github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91 h1:/NipyHnOmvRsVzj81j2qE0VxsvsqhOB0f4vJIhk2qCQ= github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6 h1:A3dhViTeFDSQcGOXuUi6ukCQSMyDtDISBp2z6OOo2YM= @@ -1118,8 +1118,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20221002003631-540bb7301a08 h1:LtBIgSqNhkuC9gA3BFjGy5obHQT1lnmNsMDFSqWzQ5w= -golang.org/x/exp v0.0.0-20221002003631-540bb7301a08/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= +golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= +golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= diff --git a/modules/compactor/compactor_ring.go b/modules/compactor/compactor_ring.go index d0b88674c75..0688d95cdf7 100644 --- a/modules/compactor/compactor_ring.go +++ b/modules/compactor/compactor_ring.go @@ -31,6 +31,7 @@ type RingConfig struct { InstanceInterfaceNames []string `yaml:"instance_interface_names"` InstancePort int `yaml:"instance_port" doc:"hidden"` InstanceAddr string `yaml:"instance_addr" doc:"hidden"` + EnableInet6 bool `yaml:"enable_inet6"` // Injected internally ListenPort int `yaml:"-"` diff --git a/modules/compactor/config.go b/modules/compactor/config.go index 6a3fd525432..9a514369164 100644 --- a/modules/compactor/config.go +++ b/modules/compactor/config.go @@ -2,7 +2,8 @@ package compactor import ( "flag" - "fmt" + "net" + "strconv" "time" "github.com/go-kit/log" @@ -43,16 +44,18 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) } func toBasicLifecyclerConfig(cfg RingConfig, logger log.Logger) (ring.BasicLifecyclerConfig, error) { - instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, logger) + instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, logger, cfg.EnableInet6) if err != nil { return ring.BasicLifecyclerConfig{}, err } instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort) + instanceAddrPort := net.JoinHostPort(instanceAddr, strconv.Itoa(instancePort)) + return ring.BasicLifecyclerConfig{ ID: cfg.InstanceID, - Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort), + Addr: instanceAddrPort, HeartbeatPeriod: cfg.HeartbeatPeriod, NumTokens: ringNumTokens, }, nil diff --git a/modules/generator/generator_ring.go b/modules/generator/generator_ring.go index 3bfd91c3a0c..b48a87d65f3 100644 --- a/modules/generator/generator_ring.go +++ b/modules/generator/generator_ring.go @@ -2,8 +2,9 @@ package generator import ( "flag" - "fmt" + "net" "os" + "strconv" "time" "github.com/go-kit/log/level" @@ -23,6 +24,7 @@ type RingConfig struct { InstanceInterfaceNames []string `yaml:"instance_interface_names"` InstanceAddr string `yaml:"instance_addr"` InstancePort int `yaml:"instance_port"` + EnableInet6 bool `yaml:"enable_inet6"` // Injected internally ListenPort int `yaml:"-"` @@ -57,7 +59,7 @@ func (cfg *RingConfig) ToRingConfig() ring.Config { } func (cfg *RingConfig) toLifecyclerConfig() (ring.BasicLifecyclerConfig, error) { - instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, log.Logger) + instanceAddr, err := ring.GetInstanceAddr(cfg.InstanceAddr, cfg.InstanceInterfaceNames, log.Logger, cfg.EnableInet6) if err != nil { level.Error(log.Logger).Log("msg", "failed to get instance address", "err", err) return ring.BasicLifecyclerConfig{}, err @@ -65,9 +67,11 @@ func (cfg *RingConfig) toLifecyclerConfig() (ring.BasicLifecyclerConfig, error) instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort) + instanceAddrPort := net.JoinHostPort(instanceAddr, strconv.Itoa(instancePort)) + return ring.BasicLifecyclerConfig{ ID: cfg.InstanceID, - Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort), + Addr: instanceAddrPort, HeartbeatPeriod: cfg.HeartbeatPeriod, NumTokens: ringNumTokens, }, nil diff --git a/operations/jsonnet/microservices/common.libsonnet b/operations/jsonnet/microservices/common.libsonnet index df83eb199be..428c9f2a4f7 100644 --- a/operations/jsonnet/microservices/common.libsonnet +++ b/operations/jsonnet/microservices/common.libsonnet @@ -2,6 +2,7 @@ util+:: { local k = import 'ksonnet-util/kausal.libsonnet', local container = k.core.v1.container, + local service = k.core.v1.service, withResources(resources):: k.util.resourcesRequests(resources.requests.cpu, resources.requests.memory) + @@ -12,5 +13,54 @@ container.mixin.readinessProbe.httpGet.withPort($._config.port) + container.mixin.readinessProbe.withInitialDelaySeconds(15) + container.mixin.readinessProbe.withTimeoutSeconds(1), + + withInet6():: { + tempo_compactor_service+: + service.mixin.spec.withIpFamilies(['IPv6']), + tempo_distributor_service+: + service.mixin.spec.withIpFamilies(['IPv6']), + tempo_ingester_service+: + service.mixin.spec.withIpFamilies(['IPv6']), + tempo_querier_service+: + service.mixin.spec.withIpFamilies(['IPv6']), + tempo_query_frontend_service+: + service.mixin.spec.withIpFamilies(['IPv6']), + tempo_query_frontend_discovery_service+: + service.mixin.spec.withIpFamilies(['IPv6']), + tempo_metrics_generator_service+: + service.mixin.spec.withIpFamilies(['IPv6']), + gossip_ring_service+: + service.mixin.spec.withIpFamilies(['IPv6']), + ingest_service+: + service.mixin.spec.withIpFamilies(['IPv6']), + memcached+: { + service+: + service.mixin.spec.withIpFamilies(['IPv6']), + }, + tempo_config+:: { + server+: { + http_listen_address: '[::0]', + grpc_listen_address: '[::0]', + }, + ingester+: { + lifecycler+: { + enable_inet6: true, + }, + }, + memberlist+: { + bind_addr: ['::'], + }, + compactor+: { + ring+: { + enable_inet6: true, + }, + }, + metrics_generator+: { + ring+: { + enable_inet6: true, + }, + }, + }, + }, }, } diff --git a/pkg/cache/memcached_client.go b/pkg/cache/memcached_client.go index 62d3aedda4e..fed1607c918 100644 --- a/pkg/cache/memcached_client.go +++ b/pkg/cache/memcached_client.go @@ -3,9 +3,9 @@ package cache import ( "context" "flag" - "fmt" "net" "sort" + "strconv" "strings" "sync" "time" @@ -253,7 +253,7 @@ func (c *memcachedClient) updateMemcacheServers() error { return err } for _, srv := range addrs { - servers = append(servers, fmt.Sprintf("%s:%d", srv.Target, srv.Port)) + servers = append(servers, net.JoinHostPort(srv.Target, strconv.Itoa(int(srv.Port)))) } } diff --git a/pkg/util/net.go b/pkg/util/net.go deleted file mode 100644 index a238dd3d0c2..00000000000 --- a/pkg/util/net.go +++ /dev/null @@ -1,62 +0,0 @@ -package util - -import ( - "fmt" - "net" - "strings" - - "github.com/go-kit/log/level" - - util_log "github.com/grafana/tempo/pkg/util/log" -) - -// GetFirstAddressOf returns the first IPv4 address of the supplied interface names, omitting any 169.254.x.x automatic private IPs if possible. -func GetFirstAddressOf(names []string) (string, error) { - var ipAddr string - for _, name := range names { - inf, err := net.InterfaceByName(name) - if err != nil { - level.Warn(util_log.Logger).Log("msg", "error getting interface", "inf", name, "err", err) - continue - } - addrs, err := inf.Addrs() - if err != nil { - level.Warn(util_log.Logger).Log("msg", "error getting addresses for interface", "inf", name, "err", err) - continue - } - if len(addrs) <= 0 { - level.Warn(util_log.Logger).Log("msg", "no addresses found for interface", "inf", name, "err", err) - continue - } - if ip := filterIPs(addrs); ip != "" { - ipAddr = ip - } - if strings.HasPrefix(ipAddr, `169.254.`) || ipAddr == "" { - continue - } - return ipAddr, nil - } - if ipAddr == "" { - return "", fmt.Errorf("No address found for %s", names) - } - if strings.HasPrefix(ipAddr, `169.254.`) { - level.Warn(util_log.Logger).Log("msg", "using automatic private ip", "address", ipAddr) - } - return ipAddr, nil -} - -// filterIPs attempts to return the first non automatic private IP (APIPA / 169.254.x.x) if possible, only returning APIPA if available and no other valid IP is found. -func filterIPs(addrs []net.Addr) string { - var ipAddr string - for _, addr := range addrs { - if v, ok := addr.(*net.IPNet); ok { - if ip := v.IP.To4(); ip != nil { - ipAddr = v.IP.String() - if !strings.HasPrefix(ipAddr, `169.254.`) { - return ipAddr - } - } - } - } - return ipAddr -} diff --git a/vendor/github.com/grafana/dskit/crypto/tls/tls.go b/vendor/github.com/grafana/dskit/crypto/tls/tls.go index ba4a0da9c9f..7ed818f399a 100644 --- a/vendor/github.com/grafana/dskit/crypto/tls/tls.go +++ b/vendor/github.com/grafana/dskit/crypto/tls/tls.go @@ -8,13 +8,22 @@ import ( "os" "strings" - "google.golang.org/grpc/credentials/insecure" - "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" ) +type SecretReader interface { + ReadSecret(path string) ([]byte, error) +} + +type fileReader struct{} + +func (f *fileReader) ReadSecret(path string) ([]byte, error) { + return os.ReadFile(path) +} + // ClientConfig is the config for client TLS. type ClientConfig struct { CertPath string `yaml:"tls_cert_path" category:"advanced"` @@ -24,6 +33,8 @@ type ClientConfig struct { InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify" category:"advanced"` CipherSuites string `yaml:"tls_cipher_suites" category:"advanced" doc:"description_method=GetTLSCipherSuitesLongDescription"` MinVersion string `yaml:"tls_min_version" category:"advanced"` + + Reader SecretReader `yaml:"-"` } var ( @@ -40,9 +51,12 @@ var ( // RegisterFlagsWithPrefix registers flags with prefix. func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.StringVar(&cfg.CertPath, prefix+".tls-cert-path", "", "Path to the client certificate file, which will be used for authenticating with the server. Also requires the key path to be configured.") - f.StringVar(&cfg.KeyPath, prefix+".tls-key-path", "", "Path to the key file for the client certificate. Also requires the client certificate to be configured.") - f.StringVar(&cfg.CAPath, prefix+".tls-ca-path", "", "Path to the CA certificates file to validate server certificate against. If not set, the host's root CA certificates are used.") + // Trim any trailing "." since we include our own here + prefix = strings.TrimRight(prefix, ".") + + f.StringVar(&cfg.CertPath, prefix+".tls-cert-path", "", "Path to the client certificate, which will be used for authenticating with the server. Also requires the key path to be configured.") + f.StringVar(&cfg.KeyPath, prefix+".tls-key-path", "", "Path to the key for the client certificate. Also requires the client certificate to be configured.") + f.StringVar(&cfg.CAPath, prefix+".tls-ca-path", "", "Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.") f.StringVar(&cfg.ServerName, prefix+".tls-server-name", "", "Override the expected name on the server certificate.") f.BoolVar(&cfg.InsecureSkipVerify, prefix+".tls-insecure-skip-verify", false, "Skip validating server certificate.") f.StringVar(&cfg.CipherSuites, prefix+".tls-cipher-suites", "", cfg.GetTLSCipherSuitesShortDescription()) @@ -76,10 +90,16 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { ServerName: cfg.ServerName, } - // read ca certificates + // If Reader interface not provided, default to reading from File + reader := cfg.Reader + if reader == nil { + reader = &fileReader{} + } + + // Read CA Certificates if cfg.CAPath != "" { var caCertPool *x509.CertPool - caCert, err := os.ReadFile(cfg.CAPath) + caCert, err := reader.ReadSecret(cfg.CAPath) if err != nil { return nil, errors.Wrapf(err, "error loading ca cert: %s", cfg.CAPath) } @@ -89,7 +109,7 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { config.RootCAs = caCertPool } - // read client certificate + // Read Client Certificate if cfg.CertPath != "" || cfg.KeyPath != "" { if cfg.CertPath == "" { return nil, errCertMissing @@ -97,7 +117,17 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { if cfg.KeyPath == "" { return nil, errKeyMissing } - clientCert, err := tls.LoadX509KeyPair(cfg.CertPath, cfg.KeyPath) + + cert, err := reader.ReadSecret(cfg.CertPath) + if err != nil { + return nil, errors.Wrapf(err, "error loading client cert: %s", cfg.CertPath) + } + key, err := reader.ReadSecret(cfg.KeyPath) + if err != nil { + return nil, errors.Wrapf(err, "error loading client key: %s", cfg.KeyPath) + } + + clientCert, err := tls.X509KeyPair(cert, key) if err != nil { return nil, errors.Wrapf(err, "failed to load TLS certificate %s,%s", cfg.CertPath, cfg.KeyPath) } diff --git a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go index 284b64a3175..9e6ef613926 100644 --- a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go +++ b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go @@ -5,7 +5,6 @@ import ( "time" "github.com/go-kit/log" - middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/encoding/gzip" @@ -92,8 +91,8 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep return append( opts, grpc.WithDefaultCallOptions(cfg.CallOptions()...), - grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(unaryClientInterceptors...)), - grpc.WithStreamInterceptor(middleware.ChainStreamClient(streamClientInterceptors...)), + grpc.WithChainUnaryInterceptor(unaryClientInterceptors...), + grpc.WithChainStreamInterceptor(streamClientInterceptors...), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: time.Second * 20, Timeout: time.Second * 10, diff --git a/vendor/github.com/grafana/dskit/kv/etcd/mock.go b/vendor/github.com/grafana/dskit/kv/etcd/mock.go index b7ee2764558..6349cee1c28 100644 --- a/vendor/github.com/grafana/dskit/kv/etcd/mock.go +++ b/vendor/github.com/grafana/dskit/kv/etcd/mock.go @@ -62,11 +62,11 @@ func newMockKV() *mockKV { // // Known limitations: // -// * Compact is not implemented and will panic -// * RequestProgress is not implemented and will panic -// * Only exact and prefix matching is supported for Get, Put, and Delete -// * There may be inconsistencies with how various version numbers are adjusted -// but none that are exposed by kv.Client unit tests +// - Compact is not implemented and will panic +// - RequestProgress is not implemented and will panic +// - Only exact and prefix matching is supported for Get, Put, and Delete +// - There may be inconsistencies with how various version numbers are adjusted +// but none that are exposed by kv.Client unit tests type mockKV struct { // Key-value pairs created by put calls or transactions values map[string]mvccpb.KeyValue diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go index f0b8846fed5..78fe1963230 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go @@ -14,7 +14,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/hashicorp/go-sockaddr" + sockaddr "github.com/hashicorp/go-sockaddr" "github.com/hashicorp/memberlist" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -23,6 +23,7 @@ import ( dstls "github.com/grafana/dskit/crypto/tls" "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/netutil" ) type messageType uint8 @@ -34,6 +35,7 @@ const ( ) const zeroZeroZeroZero = "0.0.0.0" +const colonColon = "::" // TCPTransportConfig is a configuration structure for creating new TCPTransport. type TCPTransportConfig struct { @@ -358,13 +360,11 @@ func (t *TCPTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, err return nil, 0, fmt.Errorf("failed to parse advertise address %q", ip) } - // Ensure IPv4 conversion if necessary. - if ip4 := advertiseAddr.To4(); ip4 != nil { - advertiseAddr = ip4 - } advertisePort = port } else { - if t.cfg.BindAddrs[0] == zeroZeroZeroZero { + + switch t.cfg.BindAddrs[0] { + case zeroZeroZeroZero: // Otherwise, if we're not bound to a specific IP, let's // use a suitable private IP address. var err error @@ -378,9 +378,19 @@ func (t *TCPTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, err advertiseAddr = net.ParseIP(ip) if advertiseAddr == nil { - return nil, 0, fmt.Errorf("failed to parse advertise address: %q", ip) + return nil, 0, fmt.Errorf("failed to parse advertise address %q", ip) } - } else { + case colonColon: + inet6Ip, err := netutil.GetFirstAddressOf(nil, t.logger, true) + if err != nil { + return nil, 0, fmt.Errorf("failed to get private inet6 address: %w", err) + } + + advertiseAddr = net.ParseIP(inet6Ip) + if advertiseAddr == nil { + return nil, 0, fmt.Errorf("failed to parse inet6 advertise address %q", ip) + } + default: // Use the IP that we're bound to, based on the first // TCP listener, which we already ensure is there. advertiseAddr = t.tcpListeners[0].Addr().(*net.TCPAddr).IP diff --git a/vendor/github.com/grafana/dskit/limiter/rate_limiter.go b/vendor/github.com/grafana/dskit/limiter/rate_limiter.go index 48fc6a42623..d14a267e3ae 100644 --- a/vendor/github.com/grafana/dskit/limiter/rate_limiter.go +++ b/vendor/github.com/grafana/dskit/limiter/rate_limiter.go @@ -1,6 +1,7 @@ package limiter import ( + "context" "sync" "time" @@ -47,6 +48,14 @@ func (l *RateLimiter) AllowN(now time.Time, tenantID string, n int) bool { return l.getTenantLimiter(now, tenantID).AllowN(now, n) } +// WaitN blocks until n events are allowed to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (l *RateLimiter) WaitN(ctx context.Context, tenantID string, n int) error { + return l.getTenantLimiter(time.Now(), tenantID).WaitN(ctx, n) +} + // Limit returns the currently configured maximum overall tokens rate. func (l *RateLimiter) Limit(now time.Time, tenantID string) float64 { return float64(l.getTenantLimiter(now, tenantID).Limit()) diff --git a/vendor/github.com/grafana/dskit/loser/loser.go b/vendor/github.com/grafana/dskit/loser/loser.go new file mode 100644 index 00000000000..c331eacd0c5 --- /dev/null +++ b/vendor/github.com/grafana/dskit/loser/loser.go @@ -0,0 +1,174 @@ +// Loser tree, from https://en.wikipedia.org/wiki/K-way_merge_algorithm#Tournament_Tree + +package loser + +import "golang.org/x/exp/constraints" + +func New[E constraints.Ordered](lists [][]E, maxVal E) *Tree[E] { + nLists := len(lists) + t := Tree[E]{ + maxVal: maxVal, + nodes: make([]node[E], nLists*2), + } + for i, s := range lists { + t.nodes[i+nLists].items = s + t.moveNext(i + nLists) // Must call Next on each item so that At() has a value. + } + if nLists > 0 { + t.nodes[0].index = -1 // flag to be initialized on first call to Next(). + } + return &t +} + +// A loser tree is a binary tree laid out such that nodes N and N+1 have parent N/2. +// We store M leaf nodes in positions M...2M-1, and M-1 internal nodes in positions 1..M-1. +// Node 0 is a special node, containing the winner of the contest. +type Tree[E constraints.Ordered] struct { + maxVal E + nodes []node[E] +} + +type node[E constraints.Ordered] struct { + index int // This is the loser for all nodes except the 0th, where it is the winner. + value E // Value copied from the loser node, or winner for node 0. + items []E // Only populated for leaf nodes. +} + +func (t *Tree[E]) moveNext(index int) bool { + n := &t.nodes[index] + if len(n.items) > 0 { + n.value = n.items[0] + n.items = n.items[1:] + return true + } + n.value = t.maxVal + n.index = -1 + return false +} + +func (t *Tree[E]) Winner() E { + return t.nodes[t.nodes[0].index].value +} + +func (t *Tree[E]) Next() bool { + if len(t.nodes) == 0 { + return false + } + if t.nodes[0].index == -1 { // If tree has not been initialized yet, do that. + t.initialize() + return t.nodes[t.nodes[0].index].index != -1 + } + if t.nodes[t.nodes[0].index].index == -1 { // already exhausted + return false + } + if t.moveNext(t.nodes[0].index) { + t.replayGames(t.nodes[0].index) + } else { + t.sequenceEnded(t.nodes[0].index) + } + return t.nodes[t.nodes[0].index].index != -1 +} + +func (t *Tree[E]) initialize() { + winners := make([]int, len(t.nodes)) + // Initialize leaf nodes as winners to start. + for i := len(t.nodes) / 2; i < len(t.nodes); i++ { + winners[i] = i + } + for i := len(t.nodes) - 2; i > 0; i -= 2 { + // At each stage the winners play each other, and we record the loser in the node. + loser, winner := t.playGame(winners[i], winners[i+1]) + p := parent(i) + t.nodes[p].index = loser + t.nodes[p].value = t.nodes[loser].value + winners[p] = winner + } + t.nodes[0].index = winners[1] + t.nodes[0].value = t.nodes[winners[1]].value +} + +// Starting at pos, which is a winner, re-consider all values up to the root. +func (t *Tree[E]) replayGames(pos int) { + // At the start, pos is a leaf node, and is the winner at that level. + n := parent(pos) + for n != 0 { + // If n.value < pos.value then pos loses. + // If they are equal, pos wins because n could be a sequence that ended, with value maxval. + if t.nodes[n].value < t.nodes[pos].value { + loser := pos + // Record pos as the loser here, and the old loser is the new winner. + pos = t.nodes[n].index + t.nodes[n].index = loser + t.nodes[n].value = t.nodes[loser].value + } + n = parent(n) + } + // pos is now the winner; store it in node 0. + t.nodes[0].index = pos + t.nodes[0].value = t.nodes[pos].value +} + +func (t *Tree[E]) sequenceEnded(pos int) { + // Find the first active sequence which used to lose to it. + n := parent(pos) + for n != 0 && t.nodes[t.nodes[n].index].index == -1 { + n = parent(n) + } + if n == 0 { + // There are no active sequences left + t.nodes[0].index = pos + t.nodes[0].value = t.maxVal + return + } + + // Record pos as the loser here, and the old loser is the new winner. + loser := pos + winner := t.nodes[n].index + t.nodes[n].index = loser + t.nodes[n].value = t.nodes[loser].value + t.replayGames(winner) +} + +func (t *Tree[E]) playGame(a, b int) (loser, winner int) { + if t.nodes[a].value < t.nodes[b].value { + return b, a + } + return a, b +} + +func parent(i int) int { return i / 2 } + +// Add a new list to the merge set +func (t *Tree[E]) Push(list []E) { + // First, see if we can replace one that was previously finished. + for newPos := len(t.nodes) / 2; newPos < len(t.nodes); newPos++ { + if t.nodes[newPos].index == -1 { + t.nodes[newPos].index = newPos + t.nodes[newPos].items = list + t.moveNext(newPos) + t.nodes[0].index = -1 // flag for re-initialize on next call to Next() + return + } + } + // We need to expand the tree. Pick the next biggest power of 2 to amortise resizing cost. + size := 1 + for ; size <= len(t.nodes)/2; size *= 2 { + } + newPos := size + len(t.nodes)/2 + newNodes := make([]node[E], size*2) + // Copy data over and fix up the indexes. + for i, n := range t.nodes[len(t.nodes)/2:] { + newNodes[i+size] = n + newNodes[i+size].index = i + size + } + t.nodes = newNodes + t.nodes[newPos].index = newPos + t.nodes[newPos].items = list + // Mark all the empty nodes we have added as finished. + for i := newPos + 1; i < len(t.nodes); i++ { + t.nodes[i].index = -1 + t.nodes[i].value = t.maxVal + } + t.moveNext(newPos) + t.nodes[0].index = -1 // flag for re-initialize on next call to Next() +} diff --git a/vendor/github.com/grafana/dskit/netutil/netutil.go b/vendor/github.com/grafana/dskit/netutil/netutil.go index 3803c0df711..2711a0f79d0 100644 --- a/vendor/github.com/grafana/dskit/netutil/netutil.go +++ b/vendor/github.com/grafana/dskit/netutil/netutil.go @@ -1,10 +1,13 @@ package netutil import ( + "fmt" "net" + "net/netip" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/pkg/errors" ) var ( @@ -54,3 +57,164 @@ func privateNetworkInterfaces(all []net.Interface, fallback []string, logger log } return privInts } + +// GetFirstAddressOf returns the first IPv4/IPV6 address of the supplied interface names, omitting any link-local addresses. +func GetFirstAddressOf(names []string, logger log.Logger, enableInet6 bool) (string, error) { + return getFirstAddressOf(names, logger, getInterfaceAddresses, enableInet6) +} + +// NetworkInterfaceAddressGetter matches the signature of net.InterfaceByName() to allow for test mocks. +type NetworkInterfaceAddressGetter func(name string) ([]netip.Addr, error) + +// getFirstAddressOf returns the first IPv4/IPV6 address of the supplied interface names, omitting any link-local addresses. +func getFirstAddressOf(names []string, logger log.Logger, interfaceAddrsFunc NetworkInterfaceAddressGetter, enableInet6 bool) (string, error) { + var ipAddr netip.Addr + + // When passing an empty list of interface names, we select all interfaces. + if len(names) == 0 { + infs, err := net.Interfaces() + if err != nil { + return "", fmt.Errorf("failed to get interface list and no interface names supplied: %w", err) + } + names = make([]string, len(infs)) + for i, v := range infs { + names[i] = v.Name + } + } + + level.Debug(logger).Log("msg", "looking for addresses", "inf", fmt.Sprintf("%s", names), "inet6enabled", enableInet6) + + for _, name := range names { + addrs, err := interfaceAddrsFunc(name) + if err != nil { + level.Warn(logger).Log("msg", "error getting addresses for interface", "inf", name, "err", err) + continue + } + + if len(addrs) <= 0 { + level.Warn(logger).Log("msg", "no addresses found for interface", "inf", name, "err", err) + continue + } + if ip := filterBestIP(addrs, enableInet6); ip.IsValid() { + // Select the best between what we've received + ipAddr = filterBestIP([]netip.Addr{ip, ipAddr}, enableInet6) + } + level.Debug(logger).Log("msg", "detected highest quality address", "ipAddr", ipAddr.String(), "inf", name) + if ipAddr.IsLinkLocalUnicast() || !ipAddr.IsValid() { + level.Debug(logger).Log("msg", "ignoring address", "ipAddr", ipAddr.String(), "inf", name) + continue + } + + if enableInet6 { + if ipAddr.Is6() { + return ipAddr.String(), nil + } + continue + } + + return ipAddr.String(), nil + } + + level.Debug(logger).Log("msg", "detected IP address after looking up all configured interface names", "ipAddr", ipAddr.String()) + if !ipAddr.IsValid() { + return "", fmt.Errorf("no useable address found for interfaces %s", names) + } + if ipAddr.IsLinkLocalUnicast() { + level.Warn(logger).Log("msg", "using link-local address", "address", ipAddr.String()) + } + return ipAddr.String(), nil +} + +// getInterfaceAddresses is the standard approach to collecting []net.Addr from a network interface by name. +func getInterfaceAddresses(name string) ([]netip.Addr, error) { + inf, err := net.InterfaceByName(name) + if err != nil { + return nil, err + } + + addrs, err := inf.Addrs() + if err != nil { + return nil, err + } + + // Using netip.Addr to allow for easier and consistent address parsing. + // Without this, the net.ParseCIDR() that we might like to use in a test does + // not have the same net.Addr implementation that we get from calling + // interface.Addrs() as above. Here we normalize on netip.Addr. + netaddrs := make([]netip.Addr, len(addrs)) + for i, a := range addrs { + prefix, err := netip.ParsePrefix(a.String()) + if err != nil { + return nil, errors.Wrap(err, "failed to parse netip.Prefix") + } + netaddrs[i] = prefix.Addr() + } + + return netaddrs, nil +} + +// filterBestIP returns an opinionated "best" address from a list of addresses. +// A high quality address is one that is considered routable, and not in the link-local address space. +// A low quality address is a link-local address. +// When an IPv6 is enabled using enableInet6, an IPv6 will be preferred over an equivalent quality IPv4 address, +// otherwise IPv6 addresses are guaranteed to not be returned from this function. +// Loopback addresses are never selected. +func filterBestIP(addrs []netip.Addr, enableInet6 bool) netip.Addr { + var invalid, inet4Addr, inet6Addr netip.Addr + + for _, addr := range addrs { + if addr.IsLoopback() || !addr.IsValid() { + continue + } + + if addr.Is6() && !enableInet6 { + continue + } + + if addr.Is4() { + // If we have already been set, can we improve on the quality? + if inet4Addr.IsValid() { + if inet4Addr.IsLinkLocalUnicast() && !addr.IsLinkLocalUnicast() { + inet4Addr = addr + } + continue + } + inet4Addr = addr + } + + if addr.Is6() { + // If we have already been set, can we improve on the quality? + if inet6Addr.IsValid() { + if inet6Addr.IsLinkLocalUnicast() && !addr.IsLinkLocalUnicast() { + inet6Addr = addr + } + continue + } + inet6Addr = addr + } + } + + // If both address families have been set, compare. + if inet4Addr.IsValid() && inet6Addr.IsValid() { + if inet4Addr.IsLinkLocalUnicast() && !inet6Addr.IsLinkLocalUnicast() { + return inet6Addr + } + if inet6Addr.IsLinkLocalUnicast() && !inet4Addr.IsLinkLocalUnicast() { + return inet4Addr + } + if enableInet6 { + return inet6Addr + } + return inet4Addr + } + + if inet4Addr.IsValid() { + return inet4Addr + } + + if inet6Addr.IsValid() { + return inet6Addr + } + + return invalid +} diff --git a/vendor/github.com/grafana/dskit/ring/http.go b/vendor/github.com/grafana/dskit/ring/http.go index 6521ca205ec..e70b3e6f0a1 100644 --- a/vendor/github.com/grafana/dskit/ring/http.go +++ b/vendor/github.com/grafana/dskit/ring/http.go @@ -93,7 +93,7 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - ownedTokens := ringDesc.countTokens() + ownedTokens := ringDesc.CountTokens() var ingesterIDs []string for id := range ringDesc.Ingesters { diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index dfef6afb642..7f19868e130 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -4,9 +4,11 @@ import ( "context" "flag" "fmt" + "net" "net/http" "os" "sort" + strconv "strconv" "sync" "time" @@ -34,6 +36,7 @@ type LifecyclerConfig struct { JoinAfter time.Duration `yaml:"join_after" category:"advanced"` MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"` InfNames []string `yaml:"interface_names" doc:"default=[]"` + EnableInet6 bool `yaml:"enable_inet6" category:"advanced"` // FinalSleep's default value can be overridden by // setting it before calling RegisterFlags or RegisterFlagsWithPrefix. @@ -91,6 +94,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone where this instance is running.") f.BoolVar(&cfg.UnregisterOnShutdown, prefix+"unregister-on-shutdown", true, "Unregister from the ring upon clean shutdown. It can be useful to disable for rolling restarts with consistent naming in conjunction with -distributor.extend-writes=false.") f.BoolVar(&cfg.ReadinessCheckRingHealth, prefix+"readiness-check-ring-health", true, "When enabled the readiness probe succeeds only after all instances are ACTIVE and healthy in the ring, otherwise only the instance itself is checked. This option should be disabled if in your cluster multiple instances can be rolled out simultaneously, otherwise rolling updates may be slowed down.") + f.BoolVar(&cfg.EnableInet6, prefix+"enable-inet6", false, "Enable IPv6 support. Required to make use of IP addresses from IPv6 interfaces.") } // Lifecycler is responsible for managing the lifecycle of entries in the ring. @@ -130,6 +134,8 @@ type Lifecycler struct { // Keeps stats updated at every heartbeat period countersLock sync.RWMutex healthyInstancesCount int + instancesCount int + instancesInZoneCount int zonesCount int lifecyclerMetrics *LifecyclerMetrics @@ -138,7 +144,7 @@ type Lifecycler struct { // NewLifecycler creates new Lifecycler. It must be started via StartAsync. func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool, logger log.Logger, reg prometheus.Registerer) (*Lifecycler, error) { - addr, err := GetInstanceAddr(cfg.Addr, cfg.InfNames, logger) + addr, err := GetInstanceAddr(cfg.Addr, cfg.InfNames, logger, cfg.EnableInet6) if err != nil { return nil, err } @@ -165,7 +171,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa cfg: cfg, flushTransferer: flushTransferer, KVStore: store, - Addr: fmt.Sprintf("%s:%d", addr, port), + Addr: net.JoinHostPort(addr, strconv.Itoa(port)), ID: cfg.ID, RingName: ringName, RingKey: ringKey, @@ -383,6 +389,23 @@ func (i *Lifecycler) HealthyInstancesCount() int { return i.healthyInstancesCount } +// InstancesCount returns the total number of instances in the ring, updated during the last heartbeat period. +func (i *Lifecycler) InstancesCount() int { + i.countersLock.RLock() + defer i.countersLock.RUnlock() + + return i.instancesCount +} + +// InstancesInZoneCount returns the number of instances in the ring that are registered in +// this lifecycler's zone, updated during the last heartbeat period. +func (i *Lifecycler) InstancesInZoneCount() int { + i.countersLock.RLock() + defer i.countersLock.RUnlock() + + return i.instancesInZoneCount +} + // ZonesCount returns the number of zones for which there's at least 1 instance registered // in the ring. func (i *Lifecycler) ZonesCount() int { @@ -795,13 +818,15 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error func (i *Lifecycler) updateCounters(ringDesc *Desc) { healthyInstancesCount := 0 - zones := map[string]struct{}{} + instancesCount := 0 + zones := map[string]int{} if ringDesc != nil { now := time.Now() for _, ingester := range ringDesc.Ingesters { - zones[ingester.Zone] = struct{}{} + zones[ingester.Zone]++ + instancesCount++ // Count the number of healthy instances for Write operation. if ingester.IsHealthy(Write, i.cfg.RingConfig.HeartbeatTimeout, now) { @@ -813,6 +838,8 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { // Update counters i.countersLock.Lock() i.healthyInstancesCount = healthyInstancesCount + i.instancesCount = instancesCount + i.instancesInZoneCount = zones[i.cfg.Zone] i.zonesCount = len(zones) i.countersLock.Unlock() } diff --git a/vendor/github.com/grafana/dskit/ring/model.go b/vendor/github.com/grafana/dskit/ring/model.go index fd58e534cb7..658160b881b 100644 --- a/vendor/github.com/grafana/dskit/ring/model.go +++ b/vendor/github.com/grafana/dskit/ring/model.go @@ -1,12 +1,14 @@ package ring import ( - "container/heap" "fmt" + "math" "sort" "sync" "time" + "github.com/grafana/dskit/loser" + "github.com/gogo/protobuf/proto" "github.com/grafana/dskit/kv/codec" @@ -590,68 +592,21 @@ func GetOrCreateRingDesc(d interface{}) *Desc { return d.(*Desc) } -// TokensHeap is an heap data structure used to merge multiple lists -// of sorted tokens into a single one. -type TokensHeap [][]uint32 - -func (h TokensHeap) Len() int { - return len(h) -} - -func (h TokensHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] -} - -func (h TokensHeap) Less(i, j int) bool { - return h[i][0] < h[j][0] -} - -func (h *TokensHeap) Push(x interface{}) { - *h = append(*h, x.([]uint32)) -} - -func (h *TokensHeap) Pop() interface{} { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} - // MergeTokens takes in input multiple lists of tokens and returns a single list // containing all tokens merged and sorted. Each input single list is required // to have tokens already sorted. func MergeTokens(instances [][]uint32) []uint32 { numTokens := 0 - // Build the heap. - h := make(TokensHeap, 0, len(instances)) for _, tokens := range instances { - if len(tokens) == 0 { - continue - } - - // We can safely append the input slice because elements inside are never shuffled. - h = append(h, tokens) numTokens += len(tokens) } - heap.Init(&h) + tree := loser.New(instances, math.MaxUint32) out := make([]uint32, 0, numTokens) - for h.Len() > 0 { - // The minimum element in the tree is the root, at index 0. - lowest := h[0] - out = append(out, lowest[0]) - - if len(lowest) > 1 { - // Remove the first token from the lowest because we popped it - // and then fix the heap to keep it sorted. - h[0] = h[0][1:] - heap.Fix(&h, 0) - } else { - heap.Remove(&h, 0) - } + for tree.Next() { + out = append(out, tree.Winner()) } return out diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index 493e6ac14dd..6466231207d 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -6,7 +6,6 @@ import ( "context" "flag" "fmt" - "math" "math/rand" "net/http" @@ -507,10 +506,10 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro }, nil } -// countTokens returns the number tokens within the range for each instance. -func (r *Desc) countTokens() map[string]uint32 { +// CountTokens returns the number tokens within the range for each instance. +func (r *Desc) CountTokens() map[string]uint32 { var ( - owned = map[string]uint32{} + owned = make(map[string]uint32, len(r.Ingesters)) ringTokens = r.GetTokens() ringInstanceByToken = r.getTokensInfo() ) @@ -519,10 +518,11 @@ func (r *Desc) countTokens() map[string]uint32 { var diff uint32 // Compute how many tokens are within the range. - if i+1 == len(ringTokens) { - diff = (math.MaxUint32 - token) + ringTokens[0] + if i == 0 { + lastToken := ringTokens[len(ringTokens)-1] + diff = token + (math.MaxUint32 - lastToken) } else { - diff = ringTokens[i+1] - token + diff = token - ringTokens[i-1] } info := ringInstanceByToken[token] @@ -731,12 +731,13 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur // Build a read-only ring for the shard. shardDesc := &Desc{Ingesters: shard} shardTokensByZone := shardDesc.getTokensByZone() + shardTokens := mergeTokenGroups(shardTokensByZone) return &Ring{ cfg: r.cfg, strategy: r.strategy, ringDesc: shardDesc, - ringTokens: shardDesc.GetTokens(), + ringTokens: shardTokens, ringTokensByZone: shardTokensByZone, ringZones: getZones(shardTokensByZone), @@ -752,6 +753,56 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur } } +// mergeTokenGroups returns a sorted list of all tokens in each entry in groupsByName. +// Each element of groupsByName is assumed to already be sorted. +func mergeTokenGroups(groupsByName map[string][]uint32) []uint32 { + tokenCount := 0 + groupsByIndex := make([][]uint32, 0, len(groupsByName)) + nextIndex := make([]int, 0, len(groupsByName)) + + for _, group := range groupsByName { + // If there's only one group, there's nothing to merge. + if len(groupsByName) == 1 { + return group + } + + tokenCount += len(group) + groupsByIndex = append(groupsByIndex, group) + nextIndex = append(nextIndex, 0) + } + + merged := make([]uint32, 0, tokenCount) + + for i := 0; i < tokenCount; i++ { + haveSeenGroupWithRemainingToken := false + lowestToken := uint32(0) + lowestTokenGroupIndex := 0 + + for groupIndex, group := range groupsByIndex { + nextIndexInGroup := nextIndex[groupIndex] + + if nextIndexInGroup >= len(group) { + continue + } + + if group[nextIndexInGroup] < lowestToken || !haveSeenGroupWithRemainingToken { + lowestToken = group[nextIndexInGroup] + lowestTokenGroupIndex = groupIndex + haveSeenGroupWithRemainingToken = true + } + } + + if !haveSeenGroupWithRemainingToken { + return merged + } + + merged = append(merged, lowestToken) + nextIndex[lowestTokenGroupIndex]++ + } + + return merged +} + // GetInstanceState returns the current state of an instance or an error if the // instance does not exist in the ring. func (r *Ring) GetInstanceState(instanceID string) (InstanceState, error) { diff --git a/vendor/github.com/grafana/dskit/ring/util.go b/vendor/github.com/grafana/dskit/ring/util.go index 17eb39833d5..889fbe5ba7c 100644 --- a/vendor/github.com/grafana/dskit/ring/util.go +++ b/vendor/github.com/grafana/dskit/ring/util.go @@ -2,17 +2,14 @@ package ring import ( "context" - "fmt" "math/rand" - "net" "sort" - "strings" "time" "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/netutil" ) // GenerateTokens make numTokens unique random tokens, none of which clash @@ -50,12 +47,12 @@ func GenerateTokens(numTokens int, takenTokens []uint32) []uint32 { // GetInstanceAddr returns the address to use to register the instance // in the ring. -func GetInstanceAddr(configAddr string, netInterfaces []string, logger log.Logger) (string, error) { +func GetInstanceAddr(configAddr string, netInterfaces []string, logger log.Logger, enableInet6 bool) (string, error) { if configAddr != "" { return configAddr, nil } - addr, err := getFirstAddressOf(netInterfaces, logger) + addr, err := netutil.GetFirstAddressOf(netInterfaces, logger, enableInet6) if err != nil { return "", err } @@ -171,54 +168,3 @@ func searchToken(tokens []uint32, key uint32) int { } return i } - -// GetFirstAddressOf returns the first IPv4 address of the supplied interface names, omitting any 169.254.x.x automatic private IPs if possible. -func getFirstAddressOf(names []string, logger log.Logger) (string, error) { - var ipAddr string - for _, name := range names { - inf, err := net.InterfaceByName(name) - if err != nil { - level.Warn(logger).Log("msg", "error getting interface", "inf", name, "err", err) - continue - } - addrs, err := inf.Addrs() - if err != nil { - level.Warn(logger).Log("msg", "error getting addresses for interface", "inf", name, "err", err) - continue - } - if len(addrs) <= 0 { - level.Warn(logger).Log("msg", "no addresses found for interface", "inf", name, "err", err) - continue - } - if ip := filterIPs(addrs); ip != "" { - ipAddr = ip - } - if strings.HasPrefix(ipAddr, `169.254.`) || ipAddr == "" { - continue - } - return ipAddr, nil - } - if ipAddr == "" { - return "", fmt.Errorf("No address found for %s", names) - } - if strings.HasPrefix(ipAddr, `169.254.`) { - level.Warn(logger).Log("msg", "using automatic private ip", "address", ipAddr) - } - return ipAddr, nil -} - -// filterIPs attempts to return the first non automatic private IP (APIPA / 169.254.x.x) if possible, only returning APIPA if available and no other valid IP is found. -func filterIPs(addrs []net.Addr) string { - var ipAddr string - for _, addr := range addrs { - if v, ok := addr.(*net.IPNet); ok { - if ip := v.IP.To4(); ip != nil { - ipAddr = v.IP.String() - if !strings.HasPrefix(ipAddr, `169.254.`) { - return ipAddr - } - } - } - } - return ipAddr -} diff --git a/vendor/github.com/grafana/dskit/services/basic_service.go b/vendor/github.com/grafana/dskit/services/basic_service.go index 6ced33aabf9..185a4a10a23 100644 --- a/vendor/github.com/grafana/dskit/services/basic_service.go +++ b/vendor/github.com/grafana/dskit/services/basic_service.go @@ -255,20 +255,20 @@ func (b *BasicService) StopAsync() { // // Example: // -// func (s *exampleService) Send(msg string) bool { -// ctx := s.ServiceContext() -// if ctx == nil { -// // Service is not yet started -// return false -// } -// select { -// case s.ch <- msg: -// return true -// case <-ctx.Done(): -// // Service is not running anymore. -// return false -// } +// func (s *exampleService) Send(msg string) bool { +// ctx := s.ServiceContext() +// if ctx == nil { +// // Service is not yet started +// return false // } +// select { +// case s.ch <- msg: +// return true +// case <-ctx.Done(): +// // Service is not running anymore. +// return false +// } +// } // // This is not part of Service interface, and clients of the Service should not use it. func (b *BasicService) ServiceContext() context.Context { diff --git a/vendor/github.com/grafana/dskit/services/service.go b/vendor/github.com/grafana/dskit/services/service.go index 6170951a106..e3de5a09cf6 100644 --- a/vendor/github.com/grafana/dskit/services/service.go +++ b/vendor/github.com/grafana/dskit/services/service.go @@ -41,18 +41,17 @@ func (s State) String() string { // // State diagram for the service: // -// ┌────────────────────────────────────────────────────────────────────┐ -// │ │ -// │ ▼ -// ┌─────┐ ┌──────────┐ ┌─────────┐ ┌──────────┐ ┌────────────┐ -// │ New │─────▶│ Starting │─────▶│ Running │────▶│ Stopping │───┬─▶│ Terminated │ -// └─────┘ └──────────┘ └─────────┘ └──────────┘ │ └────────────┘ -// │ │ -// │ │ -// │ │ ┌────────┐ -// └──────────────────────────────────────────┴──▶│ Failed │ -// └────────┘ -// +// ┌────────────────────────────────────────────────────────────────────┐ +// │ │ +// │ ▼ +// ┌─────┐ ┌──────────┐ ┌─────────┐ ┌──────────┐ ┌────────────┐ +// │ New │─────▶│ Starting │─────▶│ Running │────▶│ Stopping │───┬─▶│ Terminated │ +// └─────┘ └──────────┘ └─────────┘ └──────────┘ │ └────────────┘ +// │ │ +// │ │ +// │ │ ┌────────┐ +// └──────────────────────────────────────────┴──▶│ Failed │ +// └────────┘ type Service interface { // StartAsync starts Service asynchronously. Service must be in New State, otherwise error is returned. // Context is used as a parent context for service own context. diff --git a/vendor/github.com/grafana/dskit/tenant/resolver.go b/vendor/github.com/grafana/dskit/tenant/resolver.go index 607a290e5d8..f0fd8abfea7 100644 --- a/vendor/github.com/grafana/dskit/tenant/resolver.go +++ b/vendor/github.com/grafana/dskit/tenant/resolver.go @@ -22,6 +22,7 @@ func WithDefaultResolver(r Resolver) { // supplied or user.ErrTooManyOrgIDs if there are multiple tenant IDs present. // // ignore stutter warning +// //nolint:revive func TenantID(ctx context.Context) (string, error) { return defaultResolver.TenantID(ctx) @@ -32,6 +33,7 @@ func TenantID(ctx context.Context) (string, error) { // NormalizeTenantIDs). // // ignore stutter warning +// //nolint:revive func TenantIDs(ctx context.Context) ([]string, error) { return defaultResolver.TenantIDs(ctx) diff --git a/vendor/github.com/grafana/dskit/tenant/tenant.go b/vendor/github.com/grafana/dskit/tenant/tenant.go index 99c1cc4a7d4..c7c772648c5 100644 --- a/vendor/github.com/grafana/dskit/tenant/tenant.go +++ b/vendor/github.com/grafana/dskit/tenant/tenant.go @@ -99,6 +99,7 @@ func isSupported(c rune) bool { // TenantIDsFromOrgID extracts different tenants from an orgID string value // // ignore stutter warning +// //nolint:revive func TenantIDsFromOrgID(orgID string) ([]string, error) { return TenantIDs(user.InjectOrgID(context.TODO(), orgID)) diff --git a/vendor/github.com/grafana/gomemcache/memcache/memcache.go b/vendor/github.com/grafana/gomemcache/memcache/memcache.go index 61fda56e81a..0855df163e1 100644 --- a/vendor/github.com/grafana/gomemcache/memcache/memcache.go +++ b/vendor/github.com/grafana/gomemcache/memcache/memcache.go @@ -23,8 +23,8 @@ import ( "errors" "fmt" "io" + "math" "net" - "strconv" "strings" "sync" @@ -70,6 +70,15 @@ const ( // DefaultMaxIdleConns is the default maximum number of idle connections // kept for any single address. DefaultMaxIdleConns = 2 + + // releaseIdleConnsCheckFrequency is how frequently to check if there are idle + // connections to release, in order to honor the configured min conns headroom. + releaseIdleConnsCheckFrequency = time.Minute + + // defaultRecentlyUsedConnsThreshold is the default grace period given to an + // idle connection to consider it "recently used". The default value has been + // set equal to the default TCP TIME_WAIT timeout in linux. + defaultRecentlyUsedConnsThreshold = 2 * time.Minute ) const buffered = 8 // arbitrary buffered channel size, for readability @@ -126,7 +135,11 @@ func New(server ...string) *Client { // NewFromSelector returns a new Client using the provided ServerSelector. func NewFromSelector(ss ServerSelector) *Client { - return &Client{selector: ss} + c := &Client{selector: ss, closed: make(chan struct{})} + + go c.releaseIdleConnectionsUntilClosed() + + return c } // Client is a memcache client. @@ -134,10 +147,23 @@ func NewFromSelector(ss ServerSelector) *Client { type Client struct { // DialTimeout specifies a custom dialer used to dial new connections to a server. DialTimeout func(network, address string, timeout time.Duration) (net.Conn, error) + // Timeout specifies the socket read/write timeout. // If zero, DefaultTimeout is used. Timeout time.Duration + // ConnectTimeout specifies the timeout for new connections. + // If zero, DefaultTimeout is used. + ConnectTimeout time.Duration + + // MinIdleConnsHeadroomPercentage specifies the percentage of minimum number of idle connections + // that should be kept open, compared to the number of free but recently used connections. + // If there are idle connections but none of them has been recently used, then all idle + // connections get closed. + // + // If the configured value is negative, idle connections are never closed. + MinIdleConnsHeadroomPercentage float64 + // MaxIdleConns specifies the maximum number of idle connections that will // be maintained per address. If less than one, DefaultMaxIdleConns will be // used. @@ -146,6 +172,18 @@ type Client struct { // be set to a number higher than your peak parallel requests. MaxIdleConns int + // recentlyUsedConnsThreshold is the default grace period given to an + // idle connection to consider it "recently used". Recently used connections + // are never closed even if idle. + // + // This field is used for testing. + recentlyUsedConnsThreshold time.Duration + + // closed channel gets closed once the Client.Close() is called. Once closed, + // resources should be released. + closed chan struct{} + closeOnce sync.Once + selector ServerSelector lk sync.Mutex @@ -179,6 +217,10 @@ type conn struct { rw *bufio.ReadWriter addr net.Addr c *Client + + // The timestamp since when this connection is idle. This is used to close + // idle connections. + idleSince time.Time } // release returns this connection back to the client's free pool @@ -213,6 +255,8 @@ func (c *Client) putFreeConn(addr net.Addr, cn *conn) { cn.nc.Close() return } + + cn.idleSince = time.Now() c.freeconn[addr.String()] = append(freelist, cn) } @@ -226,6 +270,9 @@ func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) { if !ok || len(freelist) == 0 { return nil, false } + + // Pop the connection from the end of the list. This way we prefer to always reuse + // the same connections, so that the min idle connections logic is effective. cn = freelist[len(freelist)-1] c.freeconn[addr.String()] = freelist[:len(freelist)-1] return cn, true @@ -238,6 +285,13 @@ func (c *Client) netTimeout() time.Duration { return DefaultTimeout } +func (c *Client) connectTimeout() time.Duration { + if c.ConnectTimeout != 0 { + return c.ConnectTimeout + } + return DefaultTimeout +} + func (c *Client) maxIdleConns() int { if c.MaxIdleConns > 0 { return c.MaxIdleConns @@ -245,6 +299,77 @@ func (c *Client) maxIdleConns() int { return DefaultMaxIdleConns } +func (c *Client) Close() { + c.closeOnce.Do(func() { + close(c.closed) + }) +} + +func (c *Client) releaseIdleConnectionsUntilClosed() { + for { + select { + case <-time.After(releaseIdleConnsCheckFrequency): + c.releaseIdleConnections() + case <-c.closed: + return + } + } +} + +func (c *Client) releaseIdleConnections() { + var toClose []io.Closer + + // Nothing to do if min idle connections headroom is disabled (negative value). + minIdleHeadroomPercentage := c.MinIdleConnsHeadroomPercentage + if minIdleHeadroomPercentage < 0 { + return + } + + // Get the recently used connections threshold, falling back to the default one. + recentlyUsedThreshold := c.recentlyUsedConnsThreshold + if recentlyUsedThreshold == 0 { + recentlyUsedThreshold = defaultRecentlyUsedConnsThreshold + } + + c.lk.Lock() + + for addr, freeConnections := range c.freeconn { + numIdle := 0 + + // Count the number of idle connections. Since the least used connections are at the beginning + // of the list, we can stop searching as soon as we find a non-idle connection. + for _, freeConn := range freeConnections { + if time.Since(freeConn.idleSince) < recentlyUsedThreshold { + break + } + + numIdle++ + } + + // Compute the number of connections to close. It keeps a number of idle connections equal to + // the configured headroom. + numRecentlyUsed := len(freeConnections) - numIdle + numIdleToKeep := int(math.Max(0, math.Ceil(float64(numRecentlyUsed)*minIdleHeadroomPercentage/100))) + numIdleToClose := numIdle - numIdleToKeep + if numIdleToClose <= 0 { + continue + } + + // Close idle connections. + for i := 0; i < numIdleToClose; i++ { + toClose = append(toClose, freeConnections[i].nc) + } + c.freeconn[addr] = c.freeconn[addr][numIdleToClose:] + } + + // Release the lock and then close the connections. + c.lk.Unlock() + + for _, freeConn := range toClose { + freeConn.Close() + } +} + // ConnectTimeoutError is the error type used when it takes // too long to connect to the desired host. This level of // detail can generally be ignored. @@ -261,7 +386,7 @@ func (c *Client) dial(addr net.Addr) (net.Conn, error) { if dialTimeout == nil { dialTimeout = net.DialTimeout } - nc, err := dialTimeout(addr.Network(), addr.String(), c.netTimeout()) + nc, err := dialTimeout(addr.Network(), addr.String(), c.connectTimeout()) if err == nil { return nc, nil } diff --git a/vendor/golang.org/x/exp/slices/slices.go b/vendor/golang.org/x/exp/slices/slices.go index a17b3cf6912..cff0cd49ecf 100644 --- a/vendor/golang.org/x/exp/slices/slices.go +++ b/vendor/golang.org/x/exp/slices/slices.go @@ -128,6 +128,12 @@ func Contains[E comparable](s []E, v E) bool { return Index(s, v) >= 0 } +// ContainsFunc reports whether at least one +// element e of s satisfies f(e). +func ContainsFunc[E any](s []E, f func(E) bool) bool { + return IndexFunc(s, f) >= 0 +} + // Insert inserts the values v... into s at index i, // returning the modified slice. // In the returned slice r, r[i] == v[0]. @@ -162,6 +168,24 @@ func Delete[S ~[]E, E any](s S, i, j int) S { return append(s[:i], s[j:]...) } +// Replace replaces the elements s[i:j] by the given v, and returns the +// modified slice. Replace panics if s[i:j] is not a valid slice of s. +func Replace[S ~[]E, E any](s S, i, j int, v ...E) S { + _ = s[i:j] // verify that i:j is a valid subslice + tot := len(s[:i]) + len(v) + len(s[j:]) + if tot <= cap(s) { + s2 := s[:tot] + copy(s2[i+len(v):], s[j:]) + copy(s2[i:], v) + return s2 + } + s2 := make(S, tot) + copy(s2, s[:i]) + copy(s2[i:], v) + copy(s2[i+len(v):], s[j:]) + return s2 +} + // Clone returns a copy of the slice. // The elements are copied using assignment, so this is a shallow clone. func Clone[S ~[]E, E any](s S) S { @@ -175,8 +199,11 @@ func Clone[S ~[]E, E any](s S) S { // Compact replaces consecutive runs of equal elements with a single copy. // This is like the uniq command found on Unix. // Compact modifies the contents of the slice s; it does not create a new slice. +// When Compact discards m elements in total, it might not modify the elements +// s[len(s)-m:len(s)]. If those elements contain pointers you might consider +// zeroing those elements so that objects they reference can be garbage collected. func Compact[S ~[]E, E comparable](s S) S { - if len(s) == 0 { + if len(s) < 2 { return s } i := 1 @@ -193,7 +220,7 @@ func Compact[S ~[]E, E comparable](s S) S { // CompactFunc is like Compact but uses a comparison function. func CompactFunc[S ~[]E, E any](s S, eq func(E, E) bool) S { - if len(s) == 0 { + if len(s) < 2 { return s } i := 1 @@ -210,11 +237,19 @@ func CompactFunc[S ~[]E, E any](s S, eq func(E, E) bool) S { // Grow increases the slice's capacity, if necessary, to guarantee space for // another n elements. After Grow(n), at least n elements can be appended -// to the slice without another allocation. Grow may modify elements of the -// slice between the length and the capacity. If n is negative or too large to +// to the slice without another allocation. If n is negative or too large to // allocate the memory, Grow panics. func Grow[S ~[]E, E any](s S, n int) S { - return append(s, make(S, n)...)[:len(s)] + if n < 0 { + panic("cannot be negative") + } + if n -= cap(s) - len(s); n > 0 { + // TODO(https://go.dev/issue/53888): Make using []E instead of S + // to workaround a compiler bug where the runtime.growslice optimization + // does not take effect. Revert when the compiler is fixed. + s = append([]E(s)[:cap(s)], make([]E, n)...)[:len(s)] + } + return s } // Clip removes unused capacity from the slice, returning s[:len(s):len(s)]. diff --git a/vendor/golang.org/x/exp/slices/sort.go b/vendor/golang.org/x/exp/slices/sort.go index c22e74bd102..f14f40da712 100644 --- a/vendor/golang.org/x/exp/slices/sort.go +++ b/vendor/golang.org/x/exp/slices/sort.go @@ -30,7 +30,7 @@ func SortFunc[E any](x []E, less func(a, b E) bool) { pdqsortLessFunc(x, 0, n, bits.Len(uint(n)), less) } -// SortStable sorts the slice x while keeping the original order of equal +// SortStableFunc sorts the slice x while keeping the original order of equal // elements, using less to compare elements. func SortStableFunc[E any](x []E, less func(a, b E) bool) { stableLessFunc(x, len(x), less) @@ -62,15 +62,22 @@ func IsSortedFunc[E any](x []E, less func(a, b E) bool) bool { // sort order; it also returns a bool saying whether the target is really found // in the slice. The slice must be sorted in increasing order. func BinarySearch[E constraints.Ordered](x []E, target E) (int, bool) { - // search returns the leftmost position where f returns true, or len(x) if f - // returns false for all x. This is the insertion position for target in x, - // and could point to an element that's either == target or not. - pos := search(len(x), func(i int) bool { return x[i] >= target }) - if pos >= len(x) || x[pos] != target { - return pos, false - } else { - return pos, true + // Inlining is faster than calling BinarySearchFunc with a lambda. + n := len(x) + // Define x[-1] < target and x[n] >= target. + // Invariant: x[i-1] < target, x[j] >= target. + i, j := 0, n + for i < j { + h := int(uint(i+j) >> 1) // avoid overflow when computing h + // i ≤ h < j + if x[h] < target { + i = h + 1 // preserves x[i-1] < target + } else { + j = h // preserves x[j] >= target + } } + // i == j, x[i-1] < target, and x[j] (= x[i]) >= target => answer is i. + return i, i < n && x[i] == target } // BinarySearchFunc works like BinarySearch, but uses a custom comparison @@ -78,30 +85,22 @@ func BinarySearch[E constraints.Ordered](x []E, target E) (int, bool) { // defined by cmp. cmp(a, b) is expected to return an integer comparing the two // parameters: 0 if a == b, a negative number if a < b and a positive number if // a > b. -func BinarySearchFunc[E any](x []E, target E, cmp func(E, E) int) (int, bool) { - pos := search(len(x), func(i int) bool { return cmp(x[i], target) >= 0 }) - if pos >= len(x) || cmp(x[pos], target) != 0 { - return pos, false - } else { - return pos, true - } -} - -func search(n int, f func(int) bool) int { - // Define f(-1) == false and f(n) == true. - // Invariant: f(i-1) == false, f(j) == true. +func BinarySearchFunc[E, T any](x []E, target T, cmp func(E, T) int) (int, bool) { + n := len(x) + // Define cmp(x[-1], target) < 0 and cmp(x[n], target) >= 0 . + // Invariant: cmp(x[i - 1], target) < 0, cmp(x[j], target) >= 0. i, j := 0, n for i < j { h := int(uint(i+j) >> 1) // avoid overflow when computing h // i ≤ h < j - if !f(h) { - i = h + 1 // preserves f(i-1) == false + if cmp(x[h], target) < 0 { + i = h + 1 // preserves cmp(x[i - 1], target) < 0 } else { - j = h // preserves f(j) == true + j = h // preserves cmp(x[j], target) >= 0 } } - // i == j, f(i-1) == false, and f(j) (= f(i)) == true => answer is i. - return i + // i == j, cmp(x[i-1], target) < 0, and cmp(x[j], target) (= cmp(x[i], target)) >= 0 => answer is i. + return i, i < n && cmp(x[i], target) == 0 } type sortedHint int // hint for pdqsort when choosing the pivot diff --git a/vendor/modules.txt b/vendor/modules.txt index d917b28a692..83cee866280 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -350,7 +350,7 @@ github.com/gorilla/handlers # github.com/gorilla/mux v1.8.0 ## explicit; go 1.12 github.com/gorilla/mux -# github.com/grafana/dskit v0.0.0-20230202132725-6043e861a8e2 +# github.com/grafana/dskit v0.0.0-20230417151531-1725bcc1e9a1 ## explicit; go 1.18 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency @@ -370,6 +370,7 @@ github.com/grafana/dskit/kv/consul github.com/grafana/dskit/kv/etcd github.com/grafana/dskit/kv/memberlist github.com/grafana/dskit/limiter +github.com/grafana/dskit/loser github.com/grafana/dskit/modules github.com/grafana/dskit/multierror github.com/grafana/dskit/netutil @@ -385,7 +386,7 @@ github.com/grafana/dskit/tenant github.com/grafana/e2e github.com/grafana/e2e/db github.com/grafana/e2e/images -# github.com/grafana/gomemcache v0.0.0-20230105173749-11f792309e1f +# github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9 ## explicit; go 1.18 github.com/grafana/gomemcache/memcache # github.com/grafana/regexp v0.0.0-20221005093135-b4c2bcb0a4b6 @@ -1317,7 +1318,7 @@ golang.org/x/crypto/md4 golang.org/x/crypto/pbkdf2 golang.org/x/crypto/pkcs12 golang.org/x/crypto/pkcs12/internal/rc2 -# golang.org/x/exp v0.0.0-20221002003631-540bb7301a08 +# golang.org/x/exp v0.0.0-20230321023759-10a507213a29 ## explicit; go 1.18 golang.org/x/exp/constraints golang.org/x/exp/slices