From c7785a79faf07ff9380b24fdb953cfca10a04f08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ro=C5=BCek?= Date: Wed, 30 Oct 2024 12:20:42 +0100 Subject: [PATCH] Add deployments, statefulSets, replicaSets to .workloads Helm chart values MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jakub Rożek --- CHANGELOG.yml | 6 + charts/telepresence/README.md | 3 + charts/telepresence/templates/_helpers.tpl | 2 +- charts/telepresence/templates/deployment.yaml | 23 ++- charts/telepresence/values.yaml | 6 + cmd/traffic/cmd/manager/cluster/info.go | 2 + .../cmd/manager/managerutil/argorollouts.go | 9 -- .../cmd/manager/managerutil/envconfig.go | 23 ++- .../cmd/manager/managerutil/envconfig_test.go | 6 +- .../cmd/manager/mutator/agent_injector.go | 17 ++- .../manager/mutator/agent_injector_test.go | 16 ++- cmd/traffic/cmd/manager/mutator/watcher.go | 54 ++++--- cmd/traffic/cmd/manager/service.go | 17 ++- cmd/traffic/cmd/manager/state/state.go | 2 +- .../workload_configuration_test.go | 135 ++++++++++++++++++ pkg/client/userd/trafficmgr/session.go | 34 +++-- pkg/workload/watcher.go | 122 +++++++++------- rpc/manager/manager.proto | 4 +- 18 files changed, 375 insertions(+), 106 deletions(-) delete mode 100644 cmd/traffic/cmd/manager/managerutil/argorollouts.go create mode 100644 integration_test/workload_configuration_test.go diff --git a/CHANGELOG.yml b/CHANGELOG.yml index af23adc0ab..3d0d4ba580 100644 --- a/CHANGELOG.yml +++ b/CHANGELOG.yml @@ -78,6 +78,12 @@ items: body: >- The OSS code-base will no longer report usage data to the proprietary collector at Ambassador Labs. The actual calls to the collector remain, but will be no-ops unless a proper collector client is installed using an extension point. + - type: feature + title: Add deployments, statefulSets, replicaSets to workloads Helm chart value + body: >- + The Helm chart value workloads now supports the kinds deployments.enabled, statefulSets.enabled, and replicaSets.enabled. + By default, all three are enabled, but can be disabled by setting the corresponding value to false. + When disabled, the traffic-manager will ignore workloads of a corresponding kind, and Telepresence will not be able to intercept them. - version: 2.20.2 date: 2024-10-21 notes: diff --git a/charts/telepresence/README.md b/charts/telepresence/README.md index 94f1a9d616..14a55b7677 100644 --- a/charts/telepresence/README.md +++ b/charts/telepresence/README.md @@ -102,6 +102,9 @@ The following tables lists the configurable parameters of the Telepresence chart | client.routing.allowConflictingSubnets | Allow the specified subnets to be routed even if they conflict with other routes on the local machine. | `[]` | | client.dns.excludeSuffixes | Suffixes for which the client DNS resolver will always fail (or fallback in case of the overriding resolver) | `[".com", ".io", ".net", ".org", ".ru"]` | | client.dns.includeSuffixes | Suffixes for which the client DNS resolver will always attempt to do a lookup. Includes have higher priority than excludes. | `[]` | +| workloads.deployments.enabled | Enable/Disable the support for Deployments. | `true` | +| workloads.replicaSets.enabled | Enable/Disable the support for ReplicaSets. | `true` | +| workloads.statefulSets.enabled | Enable/Disable the support for StatefulSets. | `true` | | workloads.argoRollouts.enabled | Enable/Disable the argo-rollouts integration. | `false` | ### RBAC diff --git a/charts/telepresence/templates/_helpers.tpl b/charts/telepresence/templates/_helpers.tpl index da3ce3b5bf..97e10c6282 100644 --- a/charts/telepresence/templates/_helpers.tpl +++ b/charts/telepresence/templates/_helpers.tpl @@ -88,7 +88,7 @@ RBAC rules required to create an intercept in a namespace; excludes any rules th - apiGroups: ["apps"] resources: ["deployments", "replicasets", "statefulsets"] verbs: ["get", "watch", "list"] -{{- if .Values.workloads.argoRollouts.enabled }} +{{- if and .Values.workloads .Values.workloads.argoRollouts .Values.workloads.argoRollouts.enabled }} - apiGroups: ["argoproj.io"] resources: ["rollouts"] verbs: ["get", "watch", "list"] diff --git a/charts/telepresence/templates/deployment.yaml b/charts/telepresence/templates/deployment.yaml index 676a587efe..defea30663 100644 --- a/charts/telepresence/templates/deployment.yaml +++ b/charts/telepresence/templates/deployment.yaml @@ -81,9 +81,26 @@ spec: value: {{ .grpc.maxReceiveSize }} {{- end }} {{- end }} - {{- if .workloads.argoRollouts }} - - name: ARGO_ROLLOUTS_ENABLED - value: {{ .workloads.argoRollouts.enabled | quote }} + {{- if .workloads }} + {{- with .workloads }} + - name: ENABLED_WORKLOAD_KINDS + value: >- + {{- if or (not .deployments) .deployments.enabled }} + deployment + {{- end }} + {{- if or (not .statefulSets) .statefulSets.enabled }} + statefulset + {{- end }} + {{- if or (not .replicaSets) .replicaSets.enabled }} + replicaset + {{- end }} + {{- if and .argoRollouts .argoRollouts.enabled }} + rollout + {{- end }} + {{- end }} + {{- else }} + - name: ENABLED_WORKLOAD_KINDS + value: deployment statefulset rollout {{- end }} {{- if .agentInjector.enabled }} {{- /* diff --git a/charts/telepresence/values.yaml b/charts/telepresence/values.yaml index dd85245a98..23c6e22ead 100644 --- a/charts/telepresence/values.yaml +++ b/charts/telepresence/values.yaml @@ -347,6 +347,12 @@ client: # Controls which workload kinds are recognized by Telepresence workloads: + deployments: + enabled: true + replicaSets: + enabled: true + statefulSets: + enabled: true argoRollouts: enabled: false diff --git a/cmd/traffic/cmd/manager/cluster/info.go b/cmd/traffic/cmd/manager/cluster/info.go index c76b22d7de..487bfab04a 100644 --- a/cmd/traffic/cmd/manager/cluster/info.go +++ b/cmd/traffic/cmd/manager/cluster/info.go @@ -129,6 +129,8 @@ func NewInfo(ctx context.Context) Info { } } + dlog.Infof(ctx, "Enabled support for the following workload kinds: %v", env.EnabledWorkloadKinds) + // make an attempt to create a service with ClusterIP that is out of range and then // check the error message for the correct range as suggested tin the second answer here: // https://stackoverflow.com/questions/44190607/how-do-you-find-the-cluster-service-cidr-of-a-kubernetes-cluster diff --git a/cmd/traffic/cmd/manager/managerutil/argorollouts.go b/cmd/traffic/cmd/manager/managerutil/argorollouts.go deleted file mode 100644 index da55341274..0000000000 --- a/cmd/traffic/cmd/manager/managerutil/argorollouts.go +++ /dev/null @@ -1,9 +0,0 @@ -package managerutil - -import ( - "context" -) - -func ArgoRolloutsEnabled(ctx context.Context) bool { - return GetEnv(ctx).ArgoRolloutsEnabled -} diff --git a/cmd/traffic/cmd/manager/managerutil/envconfig.go b/cmd/traffic/cmd/manager/managerutil/envconfig.go index 77cc0906e3..dc3d151f5e 100644 --- a/cmd/traffic/cmd/manager/managerutil/envconfig.go +++ b/cmd/traffic/cmd/manager/managerutil/envconfig.go @@ -2,6 +2,7 @@ package managerutil import ( "context" + "fmt" "net/netip" "reflect" "strconv" @@ -18,6 +19,7 @@ import ( "github.com/datawire/k8sapi/pkg/k8sapi" "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" "github.com/telepresenceio/telepresence/v2/pkg/agentmap" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) // Env is the traffic-manager's environment. It does not define any defaults because all @@ -70,7 +72,7 @@ type Env struct { ClientDnsIncludeSuffixes []string `env:"CLIENT_DNS_INCLUDE_SUFFIXES, parser=split-trim, default="` ClientConnectionTTL time.Duration `env:"CLIENT_CONNECTION_TTL, parser=time.ParseDuration"` - ArgoRolloutsEnabled bool `env:"ARGO_ROLLOUTS_ENABLED, parser=bool, default=false"` + EnabledWorkloadKinds []workload.WorkloadKind `env:"ENABLED_WORKLOAD_KINDS, parser=split-trim, default=deployment statefulset replicaset"` // For testing only CompatibilityVersion *semver.Version `env:"COMPATIBILITY_VERSION, parser=version, default="` @@ -256,6 +258,25 @@ func fieldTypeHandlers() map[reflect.Type]envconfig.FieldTypeHandler { }, Setter: func(dst reflect.Value, src any) { dst.Set(reflect.ValueOf(src.(*semver.Version))) }, } + fhs[reflect.TypeOf([]workload.WorkloadKind{})] = envconfig.FieldTypeHandler{ + Parsers: map[string]func(string) (any, error){ + "split-trim": func(str string) (any, error) { //nolint:unparam // API requirement + if len(str) == 0 { + return nil, nil + } + ss := strings.Split(str, " ") + ks := make([]workload.WorkloadKind, len(ss)) + for i, s := range ss { + ks[i] = workload.WorkloadKind(s) + if !ks[i].IsValid() { + return nil, fmt.Errorf("invalid workload kind: %q", s) + } + } + return ks, nil + }, + }, + Setter: func(dst reflect.Value, src interface{}) { dst.Set(reflect.ValueOf(src.([]workload.WorkloadKind))) }, + } return fhs } diff --git a/cmd/traffic/cmd/manager/managerutil/envconfig_test.go b/cmd/traffic/cmd/manager/managerutil/envconfig_test.go index 7dd401d4af..527d63b22f 100644 --- a/cmd/traffic/cmd/manager/managerutil/envconfig_test.go +++ b/cmd/traffic/cmd/manager/managerutil/envconfig_test.go @@ -13,6 +13,7 @@ import ( "github.com/datawire/k8sapi/pkg/k8sapi" "github.com/telepresenceio/telepresence/v2/cmd/traffic/cmd/manager/managerutil" "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) func TestEnvconfig(t *testing.T) { @@ -53,6 +54,7 @@ func TestEnvconfig(t *testing.T) { PodCIDRStrategy: "auto", PodIP: netip.AddrFrom4([4]byte{203, 0, 113, 18}), ServerPort: 8081, + EnabledWorkloadKinds: []workload.WorkloadKind{workload.DeploymentWorkloadKind, workload.StatefulSetWorkloadKind, workload.ReplicaSetWorkloadKind}, } testcases := map[string]struct { @@ -65,12 +67,10 @@ func TestEnvconfig(t *testing.T) { }, "simple": { Input: map[string]string{ - "AGENT_REGISTRY": "ghcr.io/telepresenceio", - "ARGO_ROLLOUTS_ENABLED": "true", + "AGENT_REGISTRY": "ghcr.io/telepresenceio", }, Output: func(e *managerutil.Env) { e.AgentRegistry = "ghcr.io/telepresenceio" - e.ArgoRolloutsEnabled = true }, }, "complex": { diff --git a/cmd/traffic/cmd/manager/mutator/agent_injector.go b/cmd/traffic/cmd/manager/mutator/agent_injector.go index a47de6517d..20f128eaa0 100644 --- a/cmd/traffic/cmd/manager/mutator/agent_injector.go +++ b/cmd/traffic/cmd/manager/mutator/agent_injector.go @@ -28,6 +28,7 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/agentmap" "github.com/telepresenceio/telepresence/v2/pkg/maps" "github.com/telepresenceio/telepresence/v2/pkg/tracing" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) var podResource = meta.GroupVersionResource{Version: "v1", Group: "", Resource: "pods"} //nolint:gochecknoglobals // constant @@ -145,9 +146,19 @@ func (a *agentInjector) Inject(ctx context.Context, req *admission.AdmissionRequ return nil, nil } - supportedKinds := []string{"Deployment", "ReplicaSet", "StatefulSet"} - if managerutil.ArgoRolloutsEnabled(ctx) { - supportedKinds = append(supportedKinds, "Rollout") + enabledWorkloads := managerutil.GetEnv(ctx).EnabledWorkloadKinds + supportedKinds := make([]string, len(enabledWorkloads)) + for i, wlKind := range enabledWorkloads { + switch wlKind { + case workload.DeploymentWorkloadKind: + supportedKinds[i] = "Deployment" + case workload.ReplicaSetWorkloadKind: + supportedKinds[i] = "ReplicaSet" + case workload.StatefulSetWorkloadKind: + supportedKinds[i] = "StatefulSet" + case workload.RolloutWorkloadKind: + supportedKinds[i] = "Rollout" + } } wl, err := agentmap.FindOwnerWorkload(ctx, k8sapi.Pod(pod), supportedKinds) if err != nil { diff --git a/cmd/traffic/cmd/manager/mutator/agent_injector_test.go b/cmd/traffic/cmd/manager/mutator/agent_injector_test.go index 755d450246..d7b45c0c3a 100644 --- a/cmd/traffic/cmd/manager/mutator/agent_injector_test.go +++ b/cmd/traffic/cmd/manager/mutator/agent_injector_test.go @@ -31,6 +31,7 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/agentconfig" "github.com/telepresenceio/telepresence/v2/pkg/agentmap" "github.com/telepresenceio/telepresence/v2/pkg/informer" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) const serviceAccountMountPath = "/var/run/secrets/kubernetes.io/serviceaccount" @@ -1907,9 +1908,18 @@ func toAdmissionRequest(resource meta.GroupVersionResource, object any) *admissi } func generateForPod(t *testing.T, ctx context.Context, pod *core.Pod, gc agentmap.GeneratorConfig) (agentconfig.SidecarExt, error) { - supportedKinds := []string{"Deployment", "ReplicaSet", "StatefulSet"} - if managerutil.ArgoRolloutsEnabled(ctx) { - supportedKinds = append(supportedKinds, "Rollout") + supportedKinds := make([]string, 0, 4) + for _, wlKind := range managerutil.GetEnv(ctx).EnabledWorkloadKinds { + switch wlKind { + case workload.DeploymentWorkloadKind: + supportedKinds = append(supportedKinds, "Deployment") + case workload.ReplicaSetWorkloadKind: + supportedKinds = append(supportedKinds, "ReplicaSet") + case workload.StatefulSetWorkloadKind: + supportedKinds = append(supportedKinds, "StatefulSet") + case workload.RolloutWorkloadKind: + supportedKinds = append(supportedKinds, "Rollout") + } } wl, err := agentmap.FindOwnerWorkload(ctx, k8sapi.Pod(pod), supportedKinds) if err != nil { diff --git a/cmd/traffic/cmd/manager/mutator/watcher.go b/cmd/traffic/cmd/manager/mutator/watcher.go index 77b94a1f03..70bda356b2 100644 --- a/cmd/traffic/cmd/manager/mutator/watcher.go +++ b/cmd/traffic/cmd/manager/mutator/watcher.go @@ -560,19 +560,25 @@ func (c *configWatcher) StartWatchers(ctx context.Context) error { return err } } - for _, si := range c.dps { - if err := c.watchWorkloads(ctx, si); err != nil { - return err + if c.dps != nil { + for _, si := range c.dps { + if err := c.watchWorkloads(ctx, si); err != nil { + return err + } } } - for _, si := range c.rss { - if err := c.watchWorkloads(ctx, si); err != nil { - return err + if c.rss != nil { + for _, si := range c.rss { + if err := c.watchWorkloads(ctx, si); err != nil { + return err + } } } - for _, si := range c.sss { - if err := c.watchWorkloads(ctx, si); err != nil { - return err + if c.sss != nil { + for _, si := range c.sss { + if err := c.watchWorkloads(ctx, si); err != nil { + return err + } } } if c.rls != nil { @@ -834,22 +840,36 @@ func (c *configWatcher) Start(ctx context.Context) { c.svs = make([]cache.SharedIndexInformer, len(nss)) c.cms = make([]cache.SharedIndexInformer, len(nss)) - c.dps = make([]cache.SharedIndexInformer, len(nss)) - c.rss = make([]cache.SharedIndexInformer, len(nss)) - c.sss = make([]cache.SharedIndexInformer, len(nss)) + for _, wlKind := range env.EnabledWorkloadKinds { + switch wlKind { + case workload.DeploymentWorkloadKind: + c.dps = make([]cache.SharedIndexInformer, len(nss)) + case workload.ReplicaSetWorkloadKind: + c.rss = make([]cache.SharedIndexInformer, len(nss)) + case workload.StatefulSetWorkloadKind: + c.sss = make([]cache.SharedIndexInformer, len(nss)) + case workload.RolloutWorkloadKind: + c.rls = make([]cache.SharedIndexInformer, len(nss)) + } + } for i, ns := range nss { c.cms[i] = c.startConfigMap(ctx, ns) c.svs[i] = c.startServices(ctx, ns) - c.dps[i] = workload.StartDeployments(ctx, ns) - c.rss[i] = workload.StartReplicaSets(ctx, ns) - c.sss[i] = workload.StartStatefulSets(ctx, ns) + if c.dps != nil { + c.dps[i] = workload.StartDeployments(ctx, ns) + } + if c.rss != nil { + c.rss[i] = workload.StartReplicaSets(ctx, ns) + } + if c.sss != nil { + c.sss[i] = workload.StartStatefulSets(ctx, ns) + } c.startPods(ctx, ns) kf := informer.GetK8sFactory(ctx, ns) kf.Start(ctx.Done()) kf.WaitForCacheSync(ctx.Done()) } - if managerutil.ArgoRolloutsEnabled(ctx) { - c.rls = make([]cache.SharedIndexInformer, len(nss)) + if c.rls != nil { for i, ns := range nss { c.rls[i] = workload.StartRollouts(ctx, ns) rf := informer.GetArgoRolloutsFactory(ctx, ns) diff --git a/cmd/traffic/cmd/manager/service.go b/cmd/traffic/cmd/manager/service.go index f2b09f1d72..cc4ce73b34 100644 --- a/cmd/traffic/cmd/manager/service.go +++ b/cmd/traffic/cmd/manager/service.go @@ -32,6 +32,7 @@ import ( "github.com/telepresenceio/telepresence/v2/pkg/tracing" "github.com/telepresenceio/telepresence/v2/pkg/tunnel" "github.com/telepresenceio/telepresence/v2/pkg/version" + "github.com/telepresenceio/telepresence/v2/pkg/workload" ) // Clock is the mechanism used by the Manager state to get the current time. @@ -583,9 +584,19 @@ func (s *service) GetKnownWorkloadKinds(ctx context.Context, request *rpc.Sessio } ctx = managerutil.WithSessionInfo(ctx, request) dlog.Debugf(ctx, "GetKnownWorkloadKinds called") - kinds := []rpc.WorkloadInfo_Kind{rpc.WorkloadInfo_DEPLOYMENT, rpc.WorkloadInfo_REPLICASET, rpc.WorkloadInfo_STATEFULSET} - if managerutil.ArgoRolloutsEnabled(ctx) { - kinds = append(kinds, rpc.WorkloadInfo_ROLLOUT) + enabledWorkloadKinds := managerutil.GetEnv(ctx).EnabledWorkloadKinds + kinds := make([]rpc.WorkloadInfo_Kind, len(enabledWorkloadKinds)) + for i, wlKind := range enabledWorkloadKinds { + switch wlKind { + case workload.DeploymentWorkloadKind: + kinds[i] = rpc.WorkloadInfo_DEPLOYMENT + case workload.ReplicaSetWorkloadKind: + kinds[i] = rpc.WorkloadInfo_REPLICASET + case workload.StatefulSetWorkloadKind: + kinds[i] = rpc.WorkloadInfo_STATEFULSET + case workload.RolloutWorkloadKind: + kinds[i] = rpc.WorkloadInfo_ROLLOUT + } } return &rpc.KnownWorkloadKinds{Kinds: kinds}, nil } diff --git a/cmd/traffic/cmd/manager/state/state.go b/cmd/traffic/cmd/manager/state/state.go index e245401118..c75f0ecd7f 100644 --- a/cmd/traffic/cmd/manager/state/state.go +++ b/cmd/traffic/cmd/manager/state/state.go @@ -495,7 +495,7 @@ func (s *state) WatchWorkloads(ctx context.Context, sessionID string) (ch <-chan } ns := client.Namespace ww, _ := s.workloadWatchers.LoadOrCompute(ns, func() (ww workload.Watcher) { - ww, err = workload.NewWatcher(s.backgroundCtx, ns, managerutil.ArgoRolloutsEnabled(ctx)) + ww, err = workload.NewWatcher(s.backgroundCtx, ns, managerutil.GetEnv(ctx).EnabledWorkloadKinds) return ww }) if err != nil { diff --git a/integration_test/workload_configuration_test.go b/integration_test/workload_configuration_test.go new file mode 100644 index 0000000000..599e957ae9 --- /dev/null +++ b/integration_test/workload_configuration_test.go @@ -0,0 +1,135 @@ +package integration_test + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/telepresenceio/telepresence/v2/integration_test/itest" +) + +type workloadConfigurationSuite struct { + itest.Suite + itest.NamespacePair +} + +func (s *workloadConfigurationSuite) SuiteName() string { + return "WorkloadConfiguration" +} + +func init() { + itest.AddTrafficManagerSuite("-workload-configuration", func(h itest.NamespacePair) itest.TestingSuite { + return &workloadConfigurationSuite{Suite: itest.Suite{Harness: h}, NamespacePair: h} + }) +} + +func (s *workloadConfigurationSuite) disabledWorkloadKind(tp, wl string) { + ctx := s.Context() + require := s.Require() + + s.ApplyApp(ctx, wl, strings.ToLower(tp)+"/"+wl) + defer s.DeleteSvcAndWorkload(ctx, strings.ToLower(tp), wl) + + defer s.uninstallAgents(ctx, wl) + + s.TelepresenceConnect(ctx) + defer itest.TelepresenceDisconnectOk(ctx) + + // give it time for the workload to be detected (if it was going to be) + time.Sleep(6 * time.Second) + + list := itest.TelepresenceOk(ctx, "list") + require.Equal("No Workloads (Deployments, StatefulSets, ReplicaSets, or Rollouts)", list) + + _, stderr, err := itest.Telepresence(ctx, "intercept", wl) + require.Error(err) + require.Contains(stderr, fmt.Sprintf("connector.CreateIntercept: workload \"%s.%s\" not found", wl, s.NamespacePair.AppNamespace())) +} + +func (s *workloadConfigurationSuite) uninstallAgents(ctx context.Context, wl string) { + dfltCtx := itest.WithUser(ctx, "default") + itest.TelepresenceOk(dfltCtx, "connect", "--namespace", s.AppNamespace(), "--manager-namespace", s.ManagerNamespace()) + itest.TelepresenceOk(dfltCtx, "uninstall", "--agent", wl) + itest.TelepresenceDisconnectOk(dfltCtx) +} + +func (s *workloadConfigurationSuite) Test_DisabledReplicaSet() { + s.TelepresenceHelmInstallOK(s.Context(), true, "--set", "workloads.replicaSets.enabled=false") + defer s.TelepresenceHelmInstallOK(s.Context(), true, "--set", "workloads.replicaSets.enabled=true") + s.disabledWorkloadKind("ReplicaSet", "rs-echo") +} + +func (s *workloadConfigurationSuite) Test_DisabledStatefulSet() { + s.TelepresenceHelmInstallOK(s.Context(), true, "--set", "workloads.statefulSets.enabled=false") + defer s.TelepresenceHelmInstallOK(s.Context(), true, "--set", "workloads.statefulSets.enabled=true") + s.disabledWorkloadKind("StatefulSet", "ss-echo") +} + +func (s *workloadConfigurationSuite) Test_InterceptsDeploymentWithDisabledReplicaSets() { + ctx := s.Context() + require := s.Require() + + wl, tp := "echo-one", "Deployment" + s.ApplyApp(ctx, wl, strings.ToLower(tp)+"/"+wl) + defer s.DeleteSvcAndWorkload(ctx, strings.ToLower(tp), wl) + + s.TelepresenceHelmInstallOK(ctx, true, "--set", "workloads.replicaSets.enabled=false") + defer s.TelepresenceHelmInstallOK(ctx, true, "--set", "workloads.replicaSets.enabled=true") + + defer s.uninstallAgents(ctx, wl) + + s.TelepresenceConnect(ctx) + defer itest.TelepresenceDisconnectOk(ctx) + + require.Eventually( + func() bool { + stdout, _, err := itest.Telepresence(ctx, "list") + return err == nil && strings.Contains(stdout, fmt.Sprintf("%s: ready to intercept", wl)) + }, + 6*time.Second, // waitFor + 2*time.Second, // polling interval + ) + + stdout := itest.TelepresenceOk(ctx, "intercept", wl) + require.Contains(stdout, fmt.Sprintf("Using %s %s", tp, wl)) + + stdout = itest.TelepresenceOk(ctx, "list", "--intercepts") + require.Contains(stdout, fmt.Sprintf("%s: intercepted", wl)) + itest.TelepresenceOk(ctx, "leave", wl) +} + +func (s *workloadConfigurationSuite) Test_InterceptsReplicaSetWithDisabledDeployments() { + ctx := s.Context() + require := s.Require() + + wl, tp := "echo-one", "Deployment" + s.ApplyApp(ctx, wl, strings.ToLower(tp)+"/"+wl) + defer s.DeleteSvcAndWorkload(ctx, strings.ToLower(tp), wl) + + interceptableWl := s.KubectlOk(ctx, "get", "replicasets", "-l", fmt.Sprintf("app=%s", wl), "-o", "jsonpath={.items[*].metadata.name}") + + s.TelepresenceHelmInstallOK(ctx, true, "--set", "workloads.deployments.enabled=false") + defer s.TelepresenceHelmInstallOK(ctx, true, "--set", "workloads.deployments.enabled=true") + + defer s.uninstallAgents(ctx, interceptableWl) + + s.TelepresenceConnect(ctx) + defer itest.TelepresenceDisconnectOk(ctx) + + require.Eventually( + func() bool { + stdout, _, err := itest.Telepresence(ctx, "list") + return err == nil && strings.Contains(stdout, fmt.Sprintf("%s: ready to intercept", interceptableWl)) + }, + 6*time.Second, // waitFor + 2*time.Second, // polling interval + ) + + stdout := itest.TelepresenceOk(ctx, "intercept", interceptableWl) + require.Contains(stdout, fmt.Sprintf("Using %s %s", "ReplicaSet", interceptableWl)) + + stdout = itest.TelepresenceOk(ctx, "list", "--intercepts") + require.Contains(stdout, fmt.Sprintf("%s: intercepted", interceptableWl)) + itest.TelepresenceOk(ctx, "leave", interceptableWl) +} diff --git a/pkg/client/userd/trafficmgr/session.go b/pkg/client/userd/trafficmgr/session.go index d7cae2dc49..15ed7d83c6 100644 --- a/pkg/client/userd/trafficmgr/session.go +++ b/pkg/client/userd/trafficmgr/session.go @@ -1111,22 +1111,32 @@ func (s *session) localWorkloadsWatcher(ctx context.Context, namespace string, s ctx = informer.WithFactory(ctx, namespace) fc = informer.GetFactory(ctx, namespace) } - workload.StartDeployments(ctx, namespace) - workload.StartReplicaSets(ctx, namespace) - workload.StartStatefulSets(ctx, namespace) - kf := fc.GetK8sInformerFactory() - kf.Start(ctx.Done()) - rolloutsEnabled := slices.Index(knownWorkloadKinds.Kinds, manager.WorkloadInfo_ROLLOUT) >= 0 - if rolloutsEnabled { - workload.StartRollouts(ctx, namespace) - af := fc.GetArgoRolloutsInformerFactory() - af.Start(ctx.Done()) + enabledWorkloadKinds := make([]workload.WorkloadKind, len(knownWorkloadKinds.Kinds)) + for i, kind := range knownWorkloadKinds.Kinds { + switch kind { + case manager.WorkloadInfo_DEPLOYMENT: + workload.StartDeployments(ctx, namespace) + enabledWorkloadKinds[i] = workload.DeploymentWorkloadKind + case manager.WorkloadInfo_REPLICASET: + enabledWorkloadKinds[i] = workload.ReplicaSetWorkloadKind + workload.StartReplicaSets(ctx, namespace) + case manager.WorkloadInfo_STATEFULSET: + enabledWorkloadKinds[i] = workload.StatefulSetWorkloadKind + workload.StartStatefulSets(ctx, namespace) + case manager.WorkloadInfo_ROLLOUT: + enabledWorkloadKinds[i] = workload.RolloutWorkloadKind + workload.StartRollouts(ctx, namespace) + af := fc.GetArgoRolloutsInformerFactory() + af.Start(ctx.Done()) + } } - ww, err := workload.NewWatcher(ctx, namespace, rolloutsEnabled) + kf := fc.GetK8sInformerFactory() + kf.Start(ctx.Done()) + + ww, err := workload.NewWatcher(ctx, namespace, enabledWorkloadKinds) if err != nil { - workload.StartRollouts(ctx, namespace) return err } kf.WaitForCacheSync(ctx.Done()) diff --git a/pkg/workload/watcher.go b/pkg/workload/watcher.go index 5d8fc173cc..c0e63fe511 100644 --- a/pkg/workload/watcher.go +++ b/pkg/workload/watcher.go @@ -3,6 +3,7 @@ package workload import ( "context" "math" + "slices" "sync" "time" @@ -34,6 +35,19 @@ type WorkloadEvent struct { Workload k8sapi.Workload } +type WorkloadKind string + +const ( + DeploymentWorkloadKind WorkloadKind = "deployment" + StatefulSetWorkloadKind WorkloadKind = "statefulset" + ReplicaSetWorkloadKind WorkloadKind = "replicaset" + RolloutWorkloadKind WorkloadKind = "rollout" +) + +func (w *WorkloadKind) IsValid() bool { + return w != nil && slices.Contains([]WorkloadKind{DeploymentWorkloadKind, StatefulSetWorkloadKind, ReplicaSetWorkloadKind, RolloutWorkloadKind}, *w) +} + func (e EventType) String() string { switch e { case EventTypeAdd: @@ -53,17 +67,17 @@ type Watcher interface { type watcher struct { sync.Mutex - namespace string - subscriptions map[uuid.UUID]chan<- []WorkloadEvent - timer *time.Timer - events []WorkloadEvent - rolloutsEnabled bool + namespace string + subscriptions map[uuid.UUID]chan<- []WorkloadEvent + timer *time.Timer + events []WorkloadEvent + enabledWorkloadKinds []WorkloadKind } -func NewWatcher(ctx context.Context, ns string, rolloutsEnabled bool) (Watcher, error) { +func NewWatcher(ctx context.Context, ns string, enabledWorkloadKinds []WorkloadKind) (Watcher, error) { w := new(watcher) w.namespace = ns - w.rolloutsEnabled = rolloutsEnabled + w.enabledWorkloadKinds = enabledWorkloadKinds w.subscriptions = make(map[uuid.UUID]chan<- []WorkloadEvent) w.timer = time.AfterFunc(time.Duration(math.MaxInt64), func() { w.Lock() @@ -92,14 +106,17 @@ func NewWatcher(ctx context.Context, ns string, rolloutsEnabled bool) (Watcher, return w, nil } -func hasValidReplicasetOwner(wl k8sapi.Workload, rolloutsEnabled bool) bool { +func hasValidReplicasetOwner(wl k8sapi.Workload, enabledWorkloadKinds []WorkloadKind) bool { for _, ref := range wl.GetOwnerReferences() { if ref.Controller != nil && *ref.Controller { switch ref.Kind { case "Deployment": - return true + if slices.Contains(enabledWorkloadKinds, DeploymentWorkloadKind) { + return true + } + case "Rollout": - if rolloutsEnabled { + if slices.Contains(enabledWorkloadKinds, RolloutWorkloadKind) { return true } } @@ -120,41 +137,47 @@ func (w *watcher) Subscribe(ctx context.Context) <-chan []WorkloadEvent { kf := informer.GetFactory(ctx, w.namespace) ai := kf.GetK8sInformerFactory().Apps().V1() dlog.Debugf(ctx, "workload.Watcher producing initial events for namespace %s", w.namespace) - if dps, err := ai.Deployments().Lister().Deployments(w.namespace).List(labels.Everything()); err == nil { - for _, obj := range dps { - if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) && !trafficManagerSelector.Matches(labels.Set(obj.Labels)) { - initialEvents = append(initialEvents, WorkloadEvent{ - Type: EventTypeAdd, - Workload: wl, - }) + if slices.Contains(w.enabledWorkloadKinds, DeploymentWorkloadKind) { + if dps, err := ai.Deployments().Lister().Deployments(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range dps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.enabledWorkloadKinds) && !trafficManagerSelector.Matches(labels.Set(obj.Labels)) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } } } } - if rps, err := ai.ReplicaSets().Lister().ReplicaSets(w.namespace).List(labels.Everything()); err == nil { - for _, obj := range rps { - if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { - initialEvents = append(initialEvents, WorkloadEvent{ - Type: EventTypeAdd, - Workload: wl, - }) + if slices.Contains(w.enabledWorkloadKinds, ReplicaSetWorkloadKind) { + if rps, err := ai.ReplicaSets().Lister().ReplicaSets(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range rps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.enabledWorkloadKinds) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } } } } - if sps, err := ai.StatefulSets().Lister().StatefulSets(w.namespace).List(labels.Everything()); err == nil { - for _, obj := range sps { - if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { - initialEvents = append(initialEvents, WorkloadEvent{ - Type: EventTypeAdd, - Workload: wl, - }) + if slices.Contains(w.enabledWorkloadKinds, StatefulSetWorkloadKind) { + if sps, err := ai.StatefulSets().Lister().StatefulSets(w.namespace).List(labels.Everything()); err == nil { + for _, obj := range sps { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.enabledWorkloadKinds) { + initialEvents = append(initialEvents, WorkloadEvent{ + Type: EventTypeAdd, + Workload: wl, + }) + } } } } - if w.rolloutsEnabled { + if slices.Contains(w.enabledWorkloadKinds, RolloutWorkloadKind) { ri := kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() if sps, err := ri.Rollouts().Lister().Rollouts(w.namespace).List(labels.Everything()); err == nil { for _, obj := range sps { - if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.rolloutsEnabled) { + if wl, ok := FromAny(obj); ok && !hasValidReplicasetOwner(wl, w.enabledWorkloadKinds) { initialEvents = append(initialEvents, WorkloadEvent{ Type: EventTypeAdd, Workload: wl, @@ -244,24 +267,27 @@ func (w *watcher) watch(ix cache.SharedIndexInformer, ns string, hasValidControl func (w *watcher) addEventHandler(ctx context.Context, ns string) error { kf := informer.GetFactory(ctx, ns) hvc := func(wl k8sapi.Workload) bool { - return hasValidReplicasetOwner(wl, w.rolloutsEnabled) + return hasValidReplicasetOwner(wl, w.enabledWorkloadKinds) } ai := kf.GetK8sInformerFactory().Apps().V1() - if err := w.watch(ai.Deployments().Informer(), ns, hvc); err != nil { - return err - } - if err := w.watch(ai.ReplicaSets().Informer(), ns, hvc); err != nil { - return err - } - if err := w.watch(ai.StatefulSets().Informer(), ns, hvc); err != nil { - return err - } - if !w.rolloutsEnabled { - dlog.Infof(ctx, "Argo Rollouts is disabled, Argo Rollouts will not be watched") - } else { - ri := kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() - if err := w.watch(ri.Rollouts().Informer(), ns, hvc); err != nil { + for _, wlKind := range w.enabledWorkloadKinds { + var ssi cache.SharedIndexInformer + switch wlKind { + case DeploymentWorkloadKind: + ssi = ai.Deployments().Informer() + case ReplicaSetWorkloadKind: + ssi = ai.ReplicaSets().Informer() + case StatefulSetWorkloadKind: + ssi = ai.StatefulSets().Informer() + case RolloutWorkloadKind: + ri := kf.GetArgoRolloutsInformerFactory().Argoproj().V1alpha1() + ssi = ri.Rollouts().Informer() + default: + continue + } + + if err := w.watch(ssi, ns, hvc); err != nil { return err } } diff --git a/rpc/manager/manager.proto b/rpc/manager/manager.proto index 61a382d6a7..40fbd38397 100644 --- a/rpc/manager/manager.proto +++ b/rpc/manager/manager.proto @@ -779,8 +779,8 @@ service Manager { rpc ReviewIntercept(ReviewInterceptRequest) returns (google.protobuf.Empty); // GetKnownWorkloadKinds returns the known workload kinds - // that the manager can handle. This base set should always include Deployment, StatefulSet, and ReplicaSet, - // and it may include Rollout (Argo Rollouts) if the support for it is enabled. + // that the manager can handle. This set may include Deployment, StatefulSet, ReplicaSet, Rollout (Argo Rollouts) + // as configured in the manager's Helm values. rpc GetKnownWorkloadKinds(SessionInfo) returns (KnownWorkloadKinds); // LookupDNS performs a DNS lookup in the cluster. If the caller has intercepts