Split agent into register, clusterstatus, controller
* register init container
* clusterstatus ticker
* bundledeployment controller
* add init container for registration
* no leader election for bundledeployment controller
Mario Manno committed Sep 29, 2023
1 parent 333301d commit c652c1e
Showing 26 changed files with 1,445 additions and 1,003 deletions.
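
For orientation only (this is a sketch, not code from this commit): the chart below runs the same binary in three roles, "fleetagent register" in an init container, "fleetagent clusterstatus" in a dedicated container, and plain "fleetagent" for the bundledeployment controller. A minimal example of how such subcommands could be wired into one cobra root command follows; only NewClusterStatus appears in this diff, while the root command and the register constructor name are assumptions.

// Sketch only, not part of this commit: wiring the split agent roles into one binary.
package main

import (
	"os"

	"github.com/spf13/cobra"

	// NewClusterStatus is added in this commit (internal/cmd/agent/clusterstatus.go);
	// the root command below and the register constructor are assumptions.
	agent "github.com/rancher/fleet/internal/cmd/agent"
)

func main() {
	root := &cobra.Command{
		Use:   "fleetagent",
		Short: "Fleet agent: bundledeployment controller by default, register/clusterstatus as subcommands",
	}
	root.AddCommand(agent.NewClusterStatus()) // "fleetagent clusterstatus" container
	// A register subcommand for the init container would be added the same way,
	// e.g. root.AddCommand(agent.NewRegister()) -- hypothetical constructor name.

	if err := root.Execute(); err != nil {
		os.Exit(1)
	}
}
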
51 changes: 50 additions & 1 deletion charts/fleet-agent/templates/deployment.yaml
@@ -1,5 +1,5 @@
apiVersion: apps/v1
kind: Deployment
kind: StatefulSet
metadata:
name: fleet-agent
spec:
@@ -11,8 +11,56 @@ spec:
labels:
app: fleet-agent
spec:
initContainers:
- env:
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: '{{ template "system_default_registry" . }}{{.Values.image.repository}}:{{.Values.image.tag}}'
name: fleet-agent
command:
- fleetagent
- register
{{- if .Values.debug }}
- --debug
- --debug-level
- {{ quote .Values.debugLevel }}
{{- else }}
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
privileged: false
capabilities:
drop:
- ALL
{{- end }}
containers:
- env:
# missing AGENT_SCOPE
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: '{{ template "system_default_registry" . }}{{.Values.image.repository}}:{{.Values.image.tag}}'
name: fleet-agent
command:
- fleetagent
{{- if .Values.debug }}
- --debug
- --debug-level
- {{ quote .Values.debugLevel }}
{{- else }}
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
privileged: false
capabilities:
drop:
- ALL
{{- end }}
- env:
# missing CHECKIN_INTERVAL
- name: NAMESPACE
valueFrom:
fieldRef:
@@ -21,6 +69,7 @@ spec:
name: fleet-agent
command:
- fleetagent
- clusterstatus
{{- if .Values.debug }}
- --debug
- --debug-level
2 changes: 1 addition & 1 deletion charts/fleet/templates/deployment.yaml
@@ -1,5 +1,5 @@
apiVersion: apps/v1
kind: Deployment
kind: StatefulSet
metadata:
name: fleet-controller
spec:
4 changes: 2 additions & 2 deletions dev/setup-fleet
@@ -21,9 +21,9 @@ helm -n cattle-fleet-system upgrade --install --create-namespace --wait --reset-
--set debug=true --set debugLevel=99 fleet charts/fleet

# wait for controller and agent rollout
kubectl -n cattle-fleet-system rollout status deploy/fleet-controller
kubectl -n cattle-fleet-system rollout status statefulset/fleet-controller
{ grep -E -q -m 1 "fleet-agent-local.*1/1"; kill $!; } < <(kubectl get bundles -n fleet-local -w)
kubectl -n cattle-fleet-system rollout status deploy/fleet-agent
kubectl -n cattle-fleet-system rollout status statefulset/fleet-agent

# label local cluster
kubectl patch clusters.fleet.cattle.io -n fleet-local local --type=json -p '[{"op": "add", "path": "/metadata/labels/management.cattle.io~1cluster-display-name", "value": "local" }]'
4 changes: 2 additions & 2 deletions integrationtests/agent/suite_test.go
@@ -10,7 +10,6 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/rancher/fleet/integrationtests/utils"
"github.com/rancher/fleet/internal/cmd/agent/controllers/bundledeployment"
"github.com/rancher/fleet/internal/cmd/agent/deployer"
"github.com/rancher/fleet/internal/cmd/agent/trigger"
"github.com/rancher/fleet/internal/helmdeployer"
@@ -157,7 +156,8 @@ func registerBundleDeploymentController(cfg *rest.Config, namespace string, look
helmDeployer,
wranglerApply)

bundledeployment.Register(ctx, trig, mapper, dyn, deployManager, factory.Fleet().V1alpha1().BundleDeployment())
// TODO fix this
// bundledeployment.Register(ctx, trig, mapper, dyn, deployManager, factory.Fleet().V1alpha1().BundleDeployment())

err = factory.Start(ctx, 50)
Expect(err).ToNot(HaveOccurred())
179 changes: 179 additions & 0 deletions internal/cmd/agent/clusterstatus.go
@@ -0,0 +1,179 @@
package agent

import (
"context"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
memory "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"

"github.com/rancher/fleet/internal/cmd/agent/clusterstatus"
"github.com/rancher/fleet/internal/cmd/agent/register"
"github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1"
"github.com/rancher/fleet/pkg/durations"
"github.com/rancher/fleet/pkg/generated/controllers/fleet.cattle.io"

cache2 "github.com/rancher/lasso/pkg/cache"
"github.com/rancher/lasso/pkg/client"
"github.com/rancher/lasso/pkg/controller"
"github.com/rancher/lasso/pkg/mapper"
command "github.com/rancher/wrangler-cli"
"github.com/rancher/wrangler/pkg/generated/controllers/core"
"github.com/rancher/wrangler/pkg/kubeconfig"
"github.com/rancher/wrangler/pkg/ratelimit"
"github.com/rancher/wrangler/pkg/ticker"
)

func NewClusterStatus() *cobra.Command {
cmd := command.Command(&ClusterStatus{}, cobra.Command{
Use: "clusterstatus [flags]",
Short: "Continuously report resource status to the upstream cluster",
})
command.AddDebug(cmd, &debugConfig)
return cmd
}

type ClusterStatus struct {
UpstreamOptions
CheckinInterval string `usage:"How often to post cluster status" env:"CHECKIN_INTERVAL"`
}

func (cs *ClusterStatus) Run(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

var err error
var checkinInterval time.Duration
if cs.CheckinInterval != "" {
checkinInterval, err = time.ParseDuration(cs.CheckinInterval)
if err != nil {
return err
}
}

clientConfig := kubeconfig.GetNonInteractiveClientConfig(cs.Kubeconfig)
kc, err := clientConfig.ClientConfig()
if err != nil {
logrus.Fatal(err)
}

// try to claim leadership lease without rate limiting
localConfig := rest.CopyConfig(kc)
localConfig.RateLimiter = ratelimit.None

// cannot start without kubeconfig for upstream cluster
agentInfo, err := register.Get(ctx, cs.Namespace, localConfig)
if err != nil {
logrus.Fatal(err)
}

// set up factory for upstream cluster
fleetNamespace, _, err := agentInfo.ClientConfig.Namespace()
if err != nil {
logrus.Fatal(err)
}

fleetRESTConfig, err := agentInfo.ClientConfig.ClientConfig()
if err != nil {
logrus.Fatal(err)
}

// now we have both configs
fleetMapper, mapper, _, err := newMappers(ctx, fleetRESTConfig, clientConfig)
if err != nil {
logrus.Fatal(err)
}

fleetFactory, err := newSharedControllerFactory(fleetRESTConfig, fleetMapper, fleetNamespace)
if err != nil {
logrus.Fatal(err)
}

fleet, err := fleet.NewFactoryFromConfigWithOptions(fleetRESTConfig, &fleet.FactoryOptions{
SharedControllerFactory: fleetFactory,
})
if err != nil {
logrus.Fatal(err)
}

// set up factory for local cluster
localFactory, err := newSharedControllerFactory(localConfig, mapper, "")
if err != nil {
logrus.Fatal(err)
}

core, err := core.NewFactoryFromConfigWithOptions(localConfig, &core.FactoryOptions{
SharedControllerFactory: localFactory,
})
if err != nil {
logrus.Fatal(err)
}

clusterstatus.Start(ctx,
cs.Namespace, // appCtx.AgentNamespace // #2 // namespace
agentInfo.ClusterNamespace, // appCtx.ClusterNamespace // #3 // agentInfo.ClusterNamespace
agentInfo.ClusterName,
checkinInterval,
core.Core().V1().Node().Cache(),
fleet.Fleet().V1alpha1().Cluster(),
)

<-cmd.Context().Done()
return nil
}

func newSharedControllerFactory(config *rest.Config, mapper meta.RESTMapper, namespace string) (controller.SharedControllerFactory, error) {
cf, err := client.NewSharedClientFactory(config, &client.SharedClientFactoryOptions{
Mapper: mapper,
})
if err != nil {
return nil, err
}

cacheFactory := cache2.NewSharedCachedFactory(cf, &cache2.SharedCacheFactoryOptions{
DefaultNamespace: namespace,
DefaultResync: durations.DefaultResyncAgent,
})
slowRateLimiter := workqueue.NewItemExponentialFailureRateLimiter(durations.SlowFailureRateLimiterBase, durations.SlowFailureRateLimiterMax)
return controller.NewSharedControllerFactory(cacheFactory, &controller.SharedControllerFactoryOptions{
KindRateLimiter: map[schema.GroupVersionKind]workqueue.RateLimiter{
v1alpha1.SchemeGroupVersion.WithKind("BundleDeployment"): slowRateLimiter,
},
}), nil
}

func newMappers(ctx context.Context, fleetRESTConfig *rest.Config, clientconfig clientcmd.ClientConfig) (meta.RESTMapper, meta.RESTMapper, discovery.CachedDiscoveryInterface, error) {
fleetMapper, err := mapper.New(fleetRESTConfig)
if err != nil {
return nil, nil, nil, err
}

client, err := clientconfig.ClientConfig()
if err != nil {
return nil, nil, nil, err
}
client.RateLimiter = ratelimit.None

d, err := discovery.NewDiscoveryClientForConfig(client)
if err != nil {
return nil, nil, nil, err
}
discovery := memory.NewMemCacheClient(d)
mapper := restmapper.NewDeferredDiscoveryRESTMapper(discovery)

go func() {
for range ticker.Context(ctx, 30*time.Second) {
discovery.Invalidate()
mapper.Reset()
}
}()

return fleetMapper, mapper, discovery, nil
}
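
For reference (an illustration, not code from this commit): the new CheckinInterval option is read from the CHECKIN_INTERVAL environment variable, which the fleet-agent chart above does not set yet (see the "# missing CHECKIN_INTERVAL" comment), and it is parsed with time.ParseDuration, so it accepts Go duration strings:

// Illustration only: values accepted by CHECKIN_INTERVAL.
package main

import (
	"fmt"
	"time"
)

func main() {
	for _, v := range []string{"30s", "15m", "1h30m"} {
		d, err := time.ParseDuration(v)
		fmt.Println(v, d, err) // 30s 30s <nil>; 15m 15m0s <nil>; 1h30m 1h30m0s <nil>
	}
}

An invalid value makes the clusterstatus command return the parse error on startup; an empty value leaves the interval at zero, in which case the handler presumably falls back to a default.
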
@@ -1,5 +1,5 @@
// Package cluster updates the cluster.fleet.cattle.io status in the upstream cluster with the current node status.
package cluster
// Package clusterstatus updates the cluster.fleet.cattle.io status in the upstream cluster with the current node status.
package clusterstatus

import (
"context"
@@ -32,7 +32,7 @@ type handler struct {
reported fleet.AgentStatus
}

func Register(ctx context.Context,
func Start(ctx context.Context,
agentNamespace string,
clusterNamespace string,
clusterName string,