From 7f2f96375daaf2c153aa27fad7cfe1f0239cb4d3 Mon Sep 17 00:00:00 2001 From: Yaroslav Borbat <86148689+yaroslavborbat@users.noreply.github.com> Date: Wed, 31 Jul 2024 14:59:32 +0300 Subject: [PATCH] feat(vm-route-forge): impl route reconciliation (#242) Signed-off-by: yaroslavborbat Signed-off-by: Yaroslav Borbat <86148689+yaroslavborbat@users.noreply.github.com> Co-authored-by: Ivan Mikheykin --- CONTRIBUTING.md | 2 +- Taskfile.yaml | 6 +- .../{vmi-router => vm-route-forge}/.gitignore | 0 .../{vmi-router => vm-route-forge}/README.md | 11 +- .../Taskfile.yaml | 0 .../cmd/vm-route-forge/app/options/options.go | 80 +++++ .../cmd/vm-route-forge/app/root.go | 203 ++++++++++++ .../vm-route-forge/cmd/vm-route-forge/main.go | 32 ++ images/{vmi-router => vm-route-forge}/go.mod | 2 +- images/{vmi-router => vm-route-forge}/go.sum | 0 images/vm-route-forge/internal/cache/cache.go | 90 ++++++ .../internal/controller/route/host.go | 123 ++++++++ .../controller/route/route_controller.go | 291 ++++++++++++++++++ .../internal/informer/informer.go | 51 +++ .../internal}/netlinkmanager/manager.go | 97 +++--- .../internal}/netlinkwrap/funcs.go | 0 .../internal}/netlinkwrap/funcs_linux.go | 0 .../internal}/netlinkwrap/funcs_others.go | 0 .../internal}/netutil/cidr.go | 0 .../vm-route-forge/internal/server/healthz.go | 29 ++ images/vm-route-forge/internal/server/http.go | 68 ++++ .../vm-route-forge/internal/server/readyz.go | 60 ++++ .../internal/server/runnable.go | 73 +++++ .../vm-route-forge/internal/server/server.go | 143 +++++++++ .../werf.inc.yaml | 16 +- images/vmi-router/controllers/suite_test.go | 69 ----- .../controllers/vmirouter_controller.go | 55 ---- .../controllers/vmirouter_reconciler.go | 96 ------ images/vmi-router/main.go | 215 ------------- openapi/config-values.yaml | 2 +- openapi/doc-ru-config-values.yaml | 2 +- .../daemonset.yaml | 28 +- .../rbac-for-us.yaml | 24 +- 33 files changed, 1353 insertions(+), 515 deletions(-) rename images/{vmi-router => 
vm-route-forge}/.gitignore (100%) rename images/{vmi-router => vm-route-forge}/README.md (71%) rename images/{vmi-router => vm-route-forge}/Taskfile.yaml (100%) create mode 100644 images/vm-route-forge/cmd/vm-route-forge/app/options/options.go create mode 100644 images/vm-route-forge/cmd/vm-route-forge/app/root.go create mode 100644 images/vm-route-forge/cmd/vm-route-forge/main.go rename images/{vmi-router => vm-route-forge}/go.mod (99%) rename images/{vmi-router => vm-route-forge}/go.sum (100%) create mode 100644 images/vm-route-forge/internal/cache/cache.go create mode 100644 images/vm-route-forge/internal/controller/route/host.go create mode 100644 images/vm-route-forge/internal/controller/route/route_controller.go create mode 100644 images/vm-route-forge/internal/informer/informer.go rename images/{vmi-router => vm-route-forge/internal}/netlinkmanager/manager.go (74%) rename images/{vmi-router => vm-route-forge/internal}/netlinkwrap/funcs.go (100%) rename images/{vmi-router => vm-route-forge/internal}/netlinkwrap/funcs_linux.go (100%) rename images/{vmi-router => vm-route-forge/internal}/netlinkwrap/funcs_others.go (100%) rename images/{vmi-router => vm-route-forge/internal}/netutil/cidr.go (100%) create mode 100644 images/vm-route-forge/internal/server/healthz.go create mode 100644 images/vm-route-forge/internal/server/http.go create mode 100644 images/vm-route-forge/internal/server/readyz.go create mode 100644 images/vm-route-forge/internal/server/runnable.go create mode 100644 images/vm-route-forge/internal/server/server.go rename images/{vmi-router => vm-route-forge}/werf.inc.yaml (65%) delete mode 100644 images/vmi-router/controllers/suite_test.go delete mode 100644 images/vmi-router/controllers/vmirouter_controller.go delete mode 100644 images/vmi-router/controllers/vmirouter_reconciler.go delete mode 100644 images/vmi-router/main.go rename templates/kubevirt/{vmi-router => vm-route-forge}/daemonset.yaml (73%) rename templates/kubevirt/{vmi-router => 
vm-route-forge}/rbac-for-us.yaml (55%) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 69ed0f2bd..74ba63641 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -100,7 +100,7 @@ Supported scopes are the following: # Core mechanisms and low-level system functionalities. - core - api-service - - vmi-router + - vm-route-forge - kubevirt - kube-api-rewriter - cdi diff --git a/Taskfile.yaml b/Taskfile.yaml index ce94cf26f..fd08da473 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -5,9 +5,9 @@ version: "3" silent: true includes: - vmi-router: - taskfile: ./images/vmi-router - dir: ./images/vmi-router + vm-route-forge: + taskfile: ./images/vm-route-forge + dir: ./images/vm-route-forge virtualization-controller: taskfile: ./images/virtualization-artifact dir: ./images/virtualization-artifact diff --git a/images/vmi-router/.gitignore b/images/vm-route-forge/.gitignore similarity index 100% rename from images/vmi-router/.gitignore rename to images/vm-route-forge/.gitignore diff --git a/images/vmi-router/README.md b/images/vm-route-forge/README.md similarity index 71% rename from images/vmi-router/README.md rename to images/vm-route-forge/README.md index 63f3c6d82..2e9beda76 100644 --- a/images/vmi-router/README.md +++ b/images/vm-route-forge/README.md @@ -1,6 +1,4 @@ -## vmi-router - -> **NOTE:** Not an accurate name, should be 'cilium-route-updater'. +## vm-route-forge This controller watches for VirtualMachines in virtualization.deckhouse.io group and updates routes in table 1490 to route traffic between VMs via Cilium agents. @@ -23,7 +21,7 @@ Use --cidr flags to specify CIDRs to limit managed IPs. Controller will update r Example: ``` -vmi-router --cidr 10.2.0.0/24 --cidr 10.2.1.0/24 --cidr 10.2.2.0/24 +vm-route-forge --cidr 10.2.0.0/24 --cidr 10.2.1.0/24 --cidr 10.2.2.0/24 ``` Controller will update route for VM with IP 10.2.1.32, but will ignore VM with IP 10.2.4.5. 
@@ -32,10 +30,9 @@ Controller will update route for VM with IP 10.2.1.32, but will ignore VM with I Use --dry-run flag to enable non destructive mode. The controller will not actually delete or replace rules and routes, only log these actions. -#### Metrics and healthz addresses +#### Healthz addresses -Controller can't predict used ports when starting in host network mode. So, be default, metrics and healthz are started on random free ports. Use flags to specify these addresses: +Controller can't predict used ports when starting in host network mode. So, by default, the healthz endpoint is started on a random free port. Use the following flag to specify its address: -`--metrics-bind-address` - set port for /metrics endpoint, e.g. `--metrics-bind-address=:9250` `--health-probe-bind-address` - set port for /healthz endpoint, e.g. `--health-probe-bind-address=:9321` diff --git a/images/vmi-router/Taskfile.yaml b/images/vm-route-forge/Taskfile.yaml similarity index 100% rename from images/vmi-router/Taskfile.yaml rename to images/vm-route-forge/Taskfile.yaml diff --git a/images/vm-route-forge/cmd/vm-route-forge/app/options/options.go b/images/vm-route-forge/cmd/vm-route-forge/app/options/options.go new file mode 100644 index 000000000..b75c2d1cd --- /dev/null +++ b/images/vm-route-forge/cmd/vm-route-forge/app/options/options.go @@ -0,0 +1,80 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package options + +import ( + "os" + "strconv" + + "github.com/spf13/pflag" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + "vm-route-forge/internal/netutil" +) + +type Options struct { + ZapOptions zap.Options + + Verbosity int + Cidrs netutil.CIDRSet + DryRun bool + ProbeAddr string + NodeName string + TableID string +} + +const ( + flagCidr, flagCidrShort = "cidr", "c" + flagDryRun, flagDryRunShort = "dry-run", "d" + flagProbeAddr = "health-probe-bind-address" + flagVerbosity, flagVerbosityShort = "verbosity", "v" + flagNodeName, flagNodeNameShort = "nodeName", "n" + flagTableId, flagTableIdShort = "tableId", "t" + defaultVerbosity = 1 + + VerbosityEnv = "VERBOSITY" + NodeNameEnv = "NODE_NAME" + TableIDEnv = "TABLE_ID" +) + +func NewOptions() Options { + return Options{ + ZapOptions: zap.Options{ + Development: true, + }, + } +} + +func (o *Options) Flags(fs *pflag.FlagSet) { + fs.StringSliceVarP((*[]string)(&o.Cidrs), flagCidr, flagCidrShort, []string{}, "CIDRs enabled to route (multiple flags allowed)") + fs.BoolVarP(&o.DryRun, flagDryRun, flagDryRunShort, false, "Don't perform any changes on the node.") + fs.StringVar(&o.ProbeAddr, flagProbeAddr, ":0", "The address the probe endpoint binds to.") + fs.StringVarP(&o.NodeName, flagNodeName, flagNodeNameShort, os.Getenv(NodeNameEnv), "The name of the node.") + fs.StringVarP(&o.TableID, flagTableId, flagTableIdShort, os.Getenv(TableIDEnv), "The id of the table.") + fs.IntVarP(&o.Verbosity, flagVerbosity, flagVerbosityShort, getDefaultVerbosity(), "Verbosity of output") +} + +func getDefaultVerbosity() int { + if v, ok := os.LookupEnv(VerbosityEnv); ok { + verbosity, err := strconv.Atoi(v) + if err != nil { + return defaultVerbosity + } + return verbosity + } + return defaultVerbosity +} diff --git a/images/vm-route-forge/cmd/vm-route-forge/app/root.go b/images/vm-route-forge/cmd/vm-route-forge/app/root.go new file mode 100644 index 000000000..e9a7e62c9 --- /dev/null +++ 
b/images/vm-route-forge/cmd/vm-route-forge/app/root.go @@ -0,0 +1,203 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "context" + "fmt" + "net" + "strconv" + + "github.com/spf13/cobra" + "go.uber.org/zap/zapcore" + "k8s.io/client-go/kubernetes" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client/config" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + + "vm-route-forge/cmd/vm-route-forge/app/options" + "vm-route-forge/internal/cache" + "vm-route-forge/internal/controller/route" + "vm-route-forge/internal/informer" + "vm-route-forge/internal/netlinkmanager" + "vm-route-forge/internal/server" +) + +const long = ` + _ __ +__ ___ __ ___ _ __ ___ _ _| |_ ___ / _| ___ _ __ __ _ ___ +\ \ / / '_ ` + "`" + ` _ \ _____| '__/ _ \| | | | __/ _ \_____| |_ / _ \| '__/ _` + "`" + ` |/ _ \ + \ V /| | | | | |_____| | | (_) | |_| | || __/_____| _| (_) | | | (_| | __/ + \_/ |_| |_| |_| |_| \___/ \__,_|\__\___| |_| \___/|_| \__, |\___| + |___/ +Managing virtual machine routes +` + +const ( + appName = "vm-route-forge" + // The count of workers that will be started for the route controller. + // We are currently supporting only one worker. 
+ countWorkersRouteController = 1 +) + +var ( + log = ctrl.Log.WithName(appName) +) + +func NewVmRouteForgeCommand() *cobra.Command { + opts := options.NewOptions() + cmd := &cobra.Command{ + Short: "Managing virtual machine routes", + Long: long, + RunE: func(c *cobra.Command, args []string) error { + return run(opts) + }, + } + opts.Flags(cmd.Flags()) + return cmd +} + +func setupLogger(verbosity int) { + debug := false + if verbosity > 1 { + debug = true + } + + // The logger instantiated here can be changed to any logger + // implementing the logr.Logger interface. This logger will + // be propagated through the whole operator, generating + // uniform and structured logs. + logf.SetLogger(zap.New(zap.Level(zapcore.Level(-1*verbosity)), zap.UseDevMode(debug))) +} + +func run(opts options.Options) error { + setupLogger(opts.Verbosity) + var parsedCIDRs []*net.IPNet + for _, cidr := range opts.Cidrs { + _, parsedCIDR, err := net.ParseCIDR(cidr) + if err != nil || parsedCIDR == nil { + log.Error(err, "failed to parse passed CIDRs") + return err + } + parsedCIDRs = append(parsedCIDRs, parsedCIDR) + } + log.Info(fmt.Sprintf("Got CIDRs to manage: %+v", opts.Cidrs)) + + if opts.DryRun { + log.Info("Dry run mode is enabled, will not change network rules and routes") + } + + tableID := netlinkmanager.DefaultCiliumRouteTable + tableIDStr := opts.TableID + if tableIDStr != "" { + tableId, err := strconv.ParseInt(tableIDStr, 10, 32) + if err != nil { + log.Error(err, "failed to parse Cilium table id, should be integer") + return err + } + tableID = int(tableId) + } + log.Info(fmt.Sprintf("Use cilium route table id %d", tableID)) + + // Load configuration to connect to Kubernetes API Server. 
+ kubeCfg, err := config.GetConfig() + if err != nil { + log.Error(err, "Failed to load Kubernetes config") + return err + } + kubeClient, err := kubernetes.NewForConfig(kubeCfg) + if err != nil { + log.Error(err, "Failed to create Kubernetes client") + return err + } + + ctx := signals.SetupSignalHandler() + + vmSharedInformerFactory, err := informer.VirtualizationInformerFactory(kubeCfg) + if err != nil { + log.Error(err, "Failed to create virtualization shared factory") + return err + } + go vmSharedInformerFactory.Virtualization().V1alpha2().VirtualMachines().Informer().Run(ctx.Done()) + + ciliumSharedInformerFactory, err := informer.CiliumInformerFactory(kubeCfg) + if err != nil { + log.Error(err, "Failed to create cilium shared factory") + return err + } + go ciliumSharedInformerFactory.Cilium().V2().CiliumNodes().Informer().Run(ctx.Done()) + + sharedCache := cache.NewCache() + + netMgr := netlinkmanager.New(vmSharedInformerFactory.Virtualization().V1alpha2().VirtualMachines(), + ciliumSharedInformerFactory.Cilium().V2().CiliumNodes(), + sharedCache, + log, + tableID, + parsedCIDRs, + opts.DryRun, + ) + + err = startupSync(ctx, netMgr) + if err != nil { + log.Error(err, "Failed to run pre sync") + return err + } + routeCtrl, err := route.NewRouteController( + vmSharedInformerFactory.Virtualization().V1alpha2().VirtualMachines(), + ciliumSharedInformerFactory.Cilium().V2().CiliumNodes(), + netMgr, + sharedCache, + parsedCIDRs, + log, + ) + if err != nil { + log.Error(err, "Failed to create route controller") + return err + } + go routeCtrl.Run(ctx, countWorkersRouteController) + + srv, err := server.NewServer( + kubeClient, + server.Options{HealthProbeBindAddress: opts.ProbeAddr}, + log, + ) + if err != nil { + log.Error(err, "Failed to create server") + return err + } + return srv.Run(ctx) +} + +func startupSync(ctx context.Context, mgr *netlinkmanager.Manager) error { + log.Info("Synchronize route rules at start") + err := mgr.SyncRules() + if err != nil { + 
log.Error(err, fmt.Sprintf("failed to synchronize routing rules ar start")) + return err + } + + log.Info("Synchronize VM routes at start") + err = mgr.SyncRoutes(ctx) + if err != nil { + log.Error(err, fmt.Sprintf("failed to synchronize VM routes at start")) + return err + } + return nil +} diff --git a/images/vm-route-forge/cmd/vm-route-forge/main.go b/images/vm-route-forge/cmd/vm-route-forge/main.go new file mode 100644 index 000000000..32c49d221 --- /dev/null +++ b/images/vm-route-forge/cmd/vm-route-forge/main.go @@ -0,0 +1,32 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "fmt" + "os" + + "vm-route-forge/cmd/vm-route-forge/app" +) + +func main() { + vmRouteForge := app.NewVmRouteForgeCommand() + if err := vmRouteForge.Execute(); err != nil { + fmt.Fprintf(os.Stderr, "vm-route-forge: %s\n", err) + os.Exit(1) + } +} diff --git a/images/vmi-router/go.mod b/images/vm-route-forge/go.mod similarity index 99% rename from images/vmi-router/go.mod rename to images/vm-route-forge/go.mod index eebee70f6..0b7d9a7e6 100644 --- a/images/vmi-router/go.mod +++ b/images/vm-route-forge/go.mod @@ -1,4 +1,4 @@ -module vmi-router +module vm-route-forge go 1.21.0 diff --git a/images/vmi-router/go.sum b/images/vm-route-forge/go.sum similarity index 100% rename from images/vmi-router/go.sum rename to images/vm-route-forge/go.sum diff --git a/images/vm-route-forge/internal/cache/cache.go b/images/vm-route-forge/internal/cache/cache.go new file mode 100644 index 000000000..ecd72ec76 --- /dev/null +++ b/images/vm-route-forge/internal/cache/cache.go @@ -0,0 +1,90 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cache + +import ( + "sync" + + "k8s.io/apimachinery/pkg/types" +) + +type Cache interface { + GetAddresses(k types.NamespacedName) (Addresses, bool) + GetName(ip string) (types.NamespacedName, bool) + Set(k types.NamespacedName, addrs Addresses) + DeleteByKey(k types.NamespacedName) + DeleteByIP(ip string) +} + +func NewCache() Cache { + return &defaultCache{ + vmAddr: make(map[types.NamespacedName]Addresses), + addrVm: make(map[string]types.NamespacedName), + } +} + +type defaultCache struct { + mu sync.RWMutex + vmAddr map[types.NamespacedName]Addresses + addrVm map[string]types.NamespacedName +} + +func (c *defaultCache) GetAddresses(k types.NamespacedName) (Addresses, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + res, ok := c.vmAddr[k] + return res, ok +} + +func (c *defaultCache) GetName(ip string) (types.NamespacedName, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + res, ok := c.addrVm[ip] + return res, ok +} + +func (c *defaultCache) Set(k types.NamespacedName, addrs Addresses) { + c.mu.Lock() + defer c.mu.Unlock() + c.vmAddr[k] = addrs + c.addrVm[addrs.VMIP] = k +} + +func (c *defaultCache) DeleteByKey(k types.NamespacedName) { + c.mu.Lock() + defer c.mu.Unlock() + addrs, ok := c.vmAddr[k] + if ok { + delete(c.addrVm, addrs.VMIP) + } + delete(c.vmAddr, k) +} + +func (c *defaultCache) DeleteByIP(ip string) { + c.mu.Lock() + defer c.mu.Unlock() + k, ok := c.addrVm[ip] + if ok { + delete(c.vmAddr, k) + } + delete(c.addrVm, ip) +} + +type Addresses struct { + NodeIP string + VMIP string +} diff --git a/images/vm-route-forge/internal/controller/route/host.go b/images/vm-route-forge/internal/controller/route/host.go new file mode 100644 index 000000000..68f7bf7f9 --- /dev/null +++ b/images/vm-route-forge/internal/controller/route/host.go @@ -0,0 +1,123 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package route + +import ( + "context" + "fmt" + "net" + + "github.com/go-logr/logr" + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + + "vm-route-forge/internal/cache" +) + +func NewHostController(queue workqueue.RateLimitingInterface, cidrs []*net.IPNet, cache cache.Cache, log logr.Logger) *HostRouteController { + return &HostRouteController{ + queue: queue, + cidrs: cidrs, + cache: cache, + log: log, + } +} + +type HostRouteController struct { + queue workqueue.RateLimitingInterface + cidrs []*net.IPNet + cache cache.Cache + log logr.Logger +} + +func (r *HostRouteController) Run(ctx context.Context) error { + ch := make(chan netlink.RouteUpdate) + if err := netlink.RouteSubscribe(ch, ctx.Done()); err != nil { + return fmt.Errorf("failed to subscribe to route updates: %w", err) + } + for ru := range ch { + if err := r.sync(ru); err != nil { + r.log.Error(err, "failed to sync route update") + } + } + return nil +} + +// The cache is the source of truth. +// It contains all relevant information about the cluster, +// including the name and namespace of the virtual machine, its IP and the IP of its node. +// We monitor updates in the routes and if we find a mismatch with the cache, +// we put the virtual machine in the queue for processing.
+func (r *HostRouteController) sync(ru netlink.RouteUpdate) error { + dst := ru.Dst + if dst == nil { + return nil + } + isManaged, err := r.isManagedIP(dst.IP) + if err != nil { + return err + } + if !isManaged { + return nil + } + src := ru.Src + + key, found := r.cache.GetName(dst.IP.String()) + switch ru.Type { + case unix.RTM_NEWROUTE: + // if the route was added but not added to cache, then do nothing, because we can't get name of vm. + if !found { + break + } + // if the route has been added, but there is no addresses in the cache, then add the VM to the queue. + addrs, found := r.cache.GetAddresses(key) + if !found { + r.enqueueKey(key) + break + } + // if the route was added, but the addresses from the cache and from the route do not match, then add the VM to the queue. + if addrs.NodeIP != src.String() || addrs.VMIP != dst.String() { + r.enqueueKey(key) + } + // if the route was deleted but not deleted from the cache, then add the VM to the queue. + case unix.RTM_DELROUTE: + if found { + r.enqueueKey(key) + } + } + return nil +} + +func (r *HostRouteController) isManagedIP(ip net.IP) (bool, error) { + if len(ip) == 0 { + return false, fmt.Errorf("invalid IP address %s", ip) + } + + for _, cidr := range r.cidrs { + if cidr.Contains(ip) { + return true, nil + } + } + + return false, nil +} + +func (r *HostRouteController) enqueueKey(key types.NamespacedName) { + r.queue.Add(key.String()) +} diff --git a/images/vm-route-forge/internal/controller/route/route_controller.go b/images/vm-route-forge/internal/controller/route/route_controller.go new file mode 100644 index 000000000..067e0e088 --- /dev/null +++ b/images/vm-route-forge/internal/controller/route/route_controller.go @@ -0,0 +1,291 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package route + +import ( + "context" + "fmt" + "net" + "strings" + "time" + + ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + ciliumv2Informers "github.com/cilium/cilium/pkg/k8s/client/informers/externalversions/cilium.io/v2" + "github.com/cilium/cilium/pkg/node/addressing" + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + virtinformers "github.com/deckhouse/virtualization/api/client/generated/informers/externalversions/core/v1alpha2" + virtlisters "github.com/deckhouse/virtualization/api/client/generated/listers/core/v1alpha2" + "github.com/deckhouse/virtualization/api/core/v1alpha2" + cache2 "vm-route-forge/internal/cache" + "vm-route-forge/internal/netlinkmanager" +) + +const controllerName = "routeController" + +var ( + KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc +) + +type Runnable interface { + Run(ctx context.Context) error +} + +func NewRouteController( + vmInformer virtinformers.VirtualMachineInformer, + cnInformer ciliumv2Informers.CiliumNodeInformer, + netlinkMgr *netlinkmanager.Manager, + sharedCache cache2.Cache, + cidrs []*net.IPNet, + logger logr.Logger, +) (*Controller, error) { + + queue := workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: controllerName}) + log := logger.WithValues("controller", controllerName) + routeController := 
&Controller{ + queue: queue, + hostReconciler: NewHostController(queue, cidrs, sharedCache, log), + cache: sharedCache, + netlinkMgr: netlinkMgr, + log: log, + } + + _, err := vmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: routeController.addVirtualMachine, + DeleteFunc: routeController.deleteVirtualMachine, + UpdateFunc: routeController.updateVirtualMachine, + }) + if err != nil { + return nil, err + } + _, err = cnInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: routeController.addCiliumNode, + DeleteFunc: routeController.deleteCiliumNode, + UpdateFunc: routeController.updateCiliumNode, + }) + if err != nil { + return nil, err + } + routeController.vmIndexer = vmInformer.Informer().GetIndexer() + routeController.vmLister = vmInformer.Lister() + routeController.hasSynced = func() bool { + return vmInformer.Informer().HasSynced() && cnInformer.Informer().HasSynced() + } + + return routeController, nil +} + +type Controller struct { + vmIndexer cache.Indexer + vmLister virtlisters.VirtualMachineLister + hostReconciler Runnable + hasSynced cache.InformerSynced + queue workqueue.RateLimitingInterface + cache cache2.Cache + netlinkMgr *netlinkmanager.Manager + log logr.Logger +} + +func (c *Controller) addVirtualMachine(obj interface{}) { + vm, ok := obj.(*v1alpha2.VirtualMachine) + if !ok { + return + } + c.enqueueVirtualMachine(vm) +} + +func (c *Controller) deleteVirtualMachine(obj interface{}) { + vm, ok := obj.(*v1alpha2.VirtualMachine) + if !ok { + return + } + c.enqueueVirtualMachine(vm) +} +func (c *Controller) updateVirtualMachine(oldObj interface{}, newObj interface{}) { + oldVm, ok := oldObj.(*v1alpha2.VirtualMachine) + if !ok { + return + } + newVm, ok := newObj.(*v1alpha2.VirtualMachine) + if !ok { + return + } + if oldVm.Status.IPAddress != newVm.Status.IPAddress || oldVm.Status.Node != newVm.Status.Node { + c.enqueueVirtualMachine(newVm) + } +} + +func (c *Controller) addCiliumNode(_ 
interface{}) { + // Do nothing +} + +func (c *Controller) deleteCiliumNode(obj interface{}) { + node, ok := obj.(*ciliumv2.CiliumNode) + if !ok { + return + } + vms, err := c.vmLister.List(labels.Everything()) + if err != nil { + c.log.Error(err, "failed to list virtual machines") + return + } + + for _, vm := range vms { + if vm.Status.Node == node.Name { + c.enqueueVirtualMachine(vm) + } + } + +} +func (c *Controller) updateCiliumNode(oldObj interface{}, newObj interface{}) { + oldNode, ok := oldObj.(*ciliumv2.CiliumNode) + if !ok { + return + } + newNode, ok := newObj.(*ciliumv2.CiliumNode) + if !ok { + return + } + + oldIP := c.getCiliumInternalIP(oldNode) + newIP := c.getCiliumInternalIP(newNode) + + if oldIP == newIP { + return + } + vms, err := c.vmLister.List(labels.Everything()) + if err != nil { + c.log.Error(err, "failed to list virtual machines") + return + } + + for _, vm := range vms { + if vm.Status.Node == newNode.Name { + c.enqueueVirtualMachine(vm) + } + } +} + +func (c *Controller) getCiliumInternalIP(node *ciliumv2.CiliumNode) string { + for _, addr := range node.Spec.Addresses { + if addr.Type == addressing.NodeCiliumInternalIP { + return addr.IP + } + } + return "" +} + +func (c *Controller) enqueueVirtualMachine(vm *v1alpha2.VirtualMachine) { + key, err := KeyFunc(vm) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %w", vm, err)) + return + } + c.queue.Add(key) +} + +func (c *Controller) Run(ctx context.Context, workers int) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + c.log.Info("Starting route controller") + defer c.log.Info("Shutting down route controller") + + if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), c.hasSynced) { + c.log.Error(fmt.Errorf("cache is not synced"), "Controller will be stopped", "controller", controllerName) + return + } + + c.log.Info("Starting workers of route controller") + for i := 0; i < workers; i++ { + go wait.UntilWithContext(ctx, 
c.worker, time.Second) + } + c.log.Info("Starting localhost route controller") + errCh := make(chan error) + go func() { + errCh <- c.hostReconciler.Run(ctx) + }() + + for { + select { + case err := <-errCh: + if err != nil { + c.log.Error(err, "host reconciliation failed") + } + return + case <-ctx.Done(): + return + } + } +} + +func (c *Controller) worker(ctx context.Context) { + workFunc := func(ctx context.Context) bool { + key, quit := c.queue.Get() + if quit { + return true + } + defer c.queue.Done(key) + + if err := c.sync(ctx, key.(string)); err != nil { + c.log.Error(err, fmt.Sprintf("re-enqueuing VirtualMachine %v", key)) + c.queue.AddRateLimited(key) + } else { + c.log.Info(fmt.Sprintf("processed VirtualMachine %v", key)) + c.queue.Forget(key) + } + return false + } + for { + quit := workFunc(ctx) + + if quit { + return + } + } +} + +func (c *Controller) sync(ctx context.Context, key string) error { + obj, exists, err := c.vmIndexer.GetByKey(key) + if err != nil { + return err + } + ns, name, _ := strings.Cut(key, "/") + k := types.NamespacedName{Name: name, Namespace: ns} + if !exists { + c.netlinkMgr.DeleteRoute(k, "") + return nil + } + originalVM := obj.(*v1alpha2.VirtualMachine) + vm := originalVM.DeepCopy() + log := c.log.WithValues("virtualmachine", key) + log.Info("Started processing vm") + + if vm.GetDeletionTimestamp() != nil { + c.netlinkMgr.DeleteRoute(k, vm.Status.IPAddress) + return nil + } + + c.netlinkMgr.UpdateRoute(ctx, vm) + return nil +} diff --git a/images/vm-route-forge/internal/informer/informer.go b/images/vm-route-forge/internal/informer/informer.go new file mode 100644 index 000000000..3898d1f98 --- /dev/null +++ b/images/vm-route-forge/internal/informer/informer.go @@ -0,0 +1,51 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informer + +import ( + "fmt" + + ciliumClient "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned" + ciliumInformers "github.com/cilium/cilium/pkg/k8s/client/informers/externalversions" + "k8s.io/client-go/rest" + + virtClient "github.com/deckhouse/virtualization/api/client/generated/clientset/versioned" + virtInformers "github.com/deckhouse/virtualization/api/client/generated/informers/externalversions" +) + +const ( + // we should never need to resync, since we're not worried about missing events, + // and resync is actually for regular interval-based reconciliation these days, + // so set the default resync interval to 0 + defaultResync = 0 +) + +func VirtualizationInformerFactory(rest *rest.Config) (virtInformers.SharedInformerFactory, error) { + client, err := virtClient.NewForConfig(rest) + if err != nil { + return nil, fmt.Errorf("unable to construct lister client: %w", err) + } + return virtInformers.NewSharedInformerFactory(client, defaultResync), nil +} + +func CiliumInformerFactory(restConfig *rest.Config) (ciliumInformers.SharedInformerFactory, error) { + client, err := ciliumClient.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("unable to create Cilium client: %w", err) + } + return ciliumInformers.NewSharedInformerFactory(client, defaultResync), nil +} diff --git a/images/vmi-router/netlinkmanager/manager.go b/images/vm-route-forge/internal/netlinkmanager/manager.go similarity index 74% rename from images/vmi-router/netlinkmanager/manager.go rename to images/vm-route-forge/internal/netlinkmanager/manager.go index 
ead3697e2..b09b0cd3a 100644 --- a/images/vmi-router/netlinkmanager/manager.go +++ b/images/vm-route-forge/internal/netlinkmanager/manager.go @@ -22,50 +22,69 @@ import ( "fmt" "net" "os" - "sync" ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + ciliumv2Informers "github.com/cilium/cilium/pkg/k8s/client/informers/externalversions/cilium.io/v2" "github.com/cilium/cilium/pkg/node/addressing" "github.com/go-logr/logr" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/client-go/tools/cache" + + virtinformers "github.com/deckhouse/virtualization/api/client/generated/informers/externalversions/core/v1alpha2" + virtlisters "github.com/deckhouse/virtualization/api/client/generated/listers/core/v1alpha2" + cache2 "vm-route-forge/internal/cache" virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" - "vmi-router/netlinkwrap" - "vmi-router/netutil" + netlinkwrap2 "vm-route-forge/internal/netlinkwrap" + "vm-route-forge/internal/netutil" ) const ( CiliumIfaceName = "cilium_host" DefaultCiliumRouteTable = 1490 LocalRouteTable = 255 + netlinkManager = "netlinkManager" ) type Manager struct { - client client.Client + vmLister virtlisters.VirtualMachineLister + cnIndexer cache.Indexer + hasSynced cache.InformerSynced log logr.Logger - nlWrapper *netlinkwrap.Funcs + nlWrapper *netlinkwrap2.Funcs tableId int cidrs []*net.IPNet nodeName string - vmIPs map[types.NamespacedName]string - vmIPsLock sync.RWMutex + cache cache2.Cache } -func New(client client.Client, log logr.Logger, tableId int, cidrs []*net.IPNet, dryRun bool) *Manager { - nlWrapper := netlinkwrap.NewFuncs() +func New(vmInformer virtinformers.VirtualMachineInformer, + cnInformer ciliumv2Informers.CiliumNodeInformer, + cache cache2.Cache, + log logr.Logger, + tableId int, + cidrs []*net.IPNet, + dryRun bool, +) *Manager { + nlWrapper := netlinkwrap2.NewFuncs() if dryRun { 
- nlWrapper = netlinkwrap.DryRunFuncs() + nlWrapper = netlinkwrap2.DryRunFuncs() } return &Manager{ - client: client, - log: log, + //client: client, + vmLister: vmInformer.Lister(), + cnIndexer: cnInformer.Informer().GetIndexer(), + hasSynced: func() bool { + return vmInformer.Informer().HasSynced() && cnInformer.Informer().HasSynced() + }, + log: log.WithValues("manager", netlinkManager), tableId: tableId, cidrs: cidrs, nlWrapper: nlWrapper, - vmIPs: make(map[types.NamespacedName]string), + cache: cache, } } @@ -73,7 +92,7 @@ func New(client client.Client, log logr.Logger, tableId int, cidrs []*net.IPNet, // Also, it removes existing rules for previously configured CIDRs. func (m *Manager) SyncRules() error { // Get rules state. - rules, err := m.nlWrapper.RuleListFiltered(netlinkwrap.FAMILY_ALL, &netlink.Rule{Table: m.tableId}, netlink.RT_FILTER_TABLE) + rules, err := m.nlWrapper.RuleListFiltered(netlinkwrap2.FAMILY_ALL, &netlink.Rule{Table: m.tableId}, netlink.RT_FILTER_TABLE) if err != nil { return fmt.Errorf("failed to list rules: %v", err) } @@ -118,8 +137,10 @@ func (m *Manager) SyncRules() error { func (m *Manager) SyncRoutes(ctx context.Context) error { // List all Virtual Machines to collect all IPs on this Node. - vmList := &virtv2.VirtualMachineList{} - err := m.client.List(ctx, vmList) + if !cache.WaitForNamedCacheSync(netlinkManager, ctx.Done(), m.hasSynced) { + return fmt.Errorf("cache is not synced") + } + vms, err := m.vmLister.List(labels.Everything()) if err != nil { return fmt.Errorf("list VirtualMachines: %w", err) } @@ -127,7 +148,7 @@ func (m *Manager) SyncRoutes(ctx context.Context) error { vmIPsIdx := make(map[string]struct{}) // Collect managed IPs from all VirtualMachines in the cluster. - for _, vm := range vmList.Items { + for _, vm := range vms { vmIP := vm.Status.IPAddress if vmIP == "" { continue @@ -145,7 +166,7 @@ func (m *Manager) SyncRoutes(ctx context.Context) error { } // Remove routes unknown for vm IPs. 
- nodeRoutes, err := m.nlWrapper.RouteListFiltered(netlinkwrap.FAMILY_ALL, &netlink.Route{Table: m.tableId}, netlink.RT_FILTER_TABLE) + nodeRoutes, err := m.nlWrapper.RouteListFiltered(netlinkwrap2.FAMILY_ALL, &netlink.Route{Table: m.tableId}, netlink.RT_FILTER_TABLE) if err != nil { return fmt.Errorf("failed to list node routes: %v", err) } @@ -219,19 +240,21 @@ func (m *Manager) UpdateRoute(ctx context.Context, vm *virtv2.VirtualMachine) { // Save IP to the in-memory cache to restore IP later. vmKey := types.NamespacedName{Name: vm.GetName(), Namespace: vm.GetNamespace()} - m.vmIPsLock.Lock() - m.vmIPs[vmKey] = vmIP - m.vmIPsLock.Unlock() // Retrieve a Cilium Node by VMs node name. - ciliumNode := &ciliumv2.CiliumNode{} - err = m.client.Get(ctx, types.NamespacedName{Namespace: "", Name: vm.Status.Node}, ciliumNode) + var ciliumNode *ciliumv2.CiliumNode + + obj, exists, err := m.cnIndexer.GetByKey(vm.Status.Node) if err != nil { - m.log.Error(err, "failed to get cilium node for vmi") + m.log.Error(err, "failed to get cilium node for vm") + } + if exists { + ciliumNode = obj.(*ciliumv2.CiliumNode) } + nodeIP := getCiliumInternalIPAddress(ciliumNode) if nodeIP == "" { - m.log.Error(nil, "CiliumNode has no %s specified\n", addressing.NodeCiliumInternalIP) + m.log.Error(fmt.Errorf("nodeIP is empty"), fmt.Sprintf("CiliumNode has no %s specified", addressing.NodeCiliumInternalIP)) return } nodeIPx := net.ParseIP(nodeIP) @@ -239,6 +262,7 @@ func (m *Manager) UpdateRoute(ctx context.Context, vm *virtv2.VirtualMachine) { m.log.Error(fmt.Errorf(nodeIP), "failed to parse IP address") return } + m.cache.Set(vmKey, cache2.Addresses{VMIP: vmIP, NodeIP: nodeIP}) // Get route for specific nodeIP and create similar for our Virtual Machine. 
routes, err := m.nlWrapper.RouteGet(nodeIPx) @@ -263,13 +287,18 @@ func (m *Manager) UpdateRoute(ctx context.Context, vm *virtv2.VirtualMachine) { route.Table = m.tableId route.Type = 1 - if err := m.nlWrapper.RouteReplace(&route); err != nil { - m.log.Error(err, fmt.Sprintf("failed to update route '%s' to '%s' for VM %s/%s", fmtRoute(origRoute), fmtRoute(route), vm.GetNamespace(), vm.GetName())) + if err = m.nlWrapper.RouteReplace(&route); err != nil { + m.log.Error(err, fmt.Sprintf("failed to update route %q to %q for VM %s/%s", fmtRoute(origRoute), fmtRoute(route), vm.GetNamespace(), vm.GetName())) + // The route was not installed: stop here instead of falling through to the success message below. + return } - m.log.Info(fmt.Sprintf("route '%s' updated for VM %s/%s", fmtRoute(route), vm.GetNamespace(), vm.GetName())) + m.log.Info(fmt.Sprintf("route %q updated for VM %s/%s", fmtRoute(route), vm.GetNamespace(), vm.GetName())) } func getCiliumInternalIPAddress(node *ciliumv2.CiliumNode) string { + if node == nil { + return "" + } for _, address := range node.Spec.Addresses { if address.Type == addressing.NodeCiliumInternalIP { return address.IP @@ -281,9 +308,9 @@ func getCiliumInternalIPAddress(node *ciliumv2.CiliumNode) string { func (m *Manager) DeleteRoute(vmKey types.NamespacedName, vmIP string) { if vmIP == "" { // Try to recover IP from the cache. - m.vmIPsLock.RLock() - vmIP = m.vmIPs[vmKey] - m.vmIPsLock.RUnlock() + if addr, found := m.cache.GetAddresses(vmKey); found { + vmIP = addr.VMIP + } } if vmIP == "" { m.log.Info(fmt.Sprintf("Can't retrieve IP for VM %q, it may lead to stale routes.", vmKey.String())) @@ -292,7 +319,7 @@ func (m *Manager) DeleteRoute(vmKey types.NamespacedName, vmIP string) { // Prepare ip with the mask to use as the route destination.
vmIPWithNetmask := netutil.AppendHostNetmask(vmIP) - _, vmRouteDst, err := net.ParseCIDR(netutil.AppendHostNetmask(vmIP)) + _, vmRouteDst, err := net.ParseCIDR(vmIPWithNetmask) if err != nil { m.log.Error(err, fmt.Sprintf("failed to parse IP with netmask %s for VM %q", vmIPWithNetmask, vmKey.String())) return @@ -309,9 +336,7 @@ func (m *Manager) DeleteRoute(vmKey types.NamespacedName, vmIP string) { m.log.Info(fmt.Sprintf("route %s deleted for VM %q", fmtRoute(route), vmKey)) // Delete IP from the cache. - m.vmIPsLock.Lock() - delete(m.vmIPs, vmKey) - m.vmIPsLock.Unlock() + m.cache.DeleteByKey(vmKey) } func fmtRoute(route netlink.Route) string { diff --git a/images/vmi-router/netlinkwrap/funcs.go b/images/vm-route-forge/internal/netlinkwrap/funcs.go similarity index 100% rename from images/vmi-router/netlinkwrap/funcs.go rename to images/vm-route-forge/internal/netlinkwrap/funcs.go diff --git a/images/vmi-router/netlinkwrap/funcs_linux.go b/images/vm-route-forge/internal/netlinkwrap/funcs_linux.go similarity index 100% rename from images/vmi-router/netlinkwrap/funcs_linux.go rename to images/vm-route-forge/internal/netlinkwrap/funcs_linux.go diff --git a/images/vmi-router/netlinkwrap/funcs_others.go b/images/vm-route-forge/internal/netlinkwrap/funcs_others.go similarity index 100% rename from images/vmi-router/netlinkwrap/funcs_others.go rename to images/vm-route-forge/internal/netlinkwrap/funcs_others.go diff --git a/images/vmi-router/netutil/cidr.go b/images/vm-route-forge/internal/netutil/cidr.go similarity index 100% rename from images/vmi-router/netutil/cidr.go rename to images/vm-route-forge/internal/netutil/cidr.go diff --git a/images/vm-route-forge/internal/server/healthz.go b/images/vm-route-forge/internal/server/healthz.go new file mode 100644 index 000000000..008fca9ae --- /dev/null +++ b/images/vm-route-forge/internal/server/healthz.go @@ -0,0 +1,29 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you 
may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package server
+
+import "net/http"
+
+// getHealthzHandler returns the handler used for the liveness probe: the
+// handler stored in s.healthzHandler when one is set, otherwise a built-in
+// handler that answers every request with status 200 and the body "ok".
+func (s *Server) getHealthzHandler() http.Handler {
+	if custom := s.healthzHandler; custom != nil {
+		return custom
+	}
+
+	// Built-in fallback: the process is considered alive as long as it can
+	// answer HTTP requests at all.
+	var fallback http.HandlerFunc = func(rw http.ResponseWriter, _ *http.Request) {
+		rw.WriteHeader(http.StatusOK)
+		rw.Write([]byte("ok"))
+	}
+	return fallback
+}
diff --git a/images/vm-route-forge/internal/server/http.go b/images/vm-route-forge/internal/server/http.go new file mode 100644 index 000000000..fe24f1716 --- /dev/null +++ b/images/vm-route-forge/internal/server/http.go @@ -0,0 +1,68 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/
+
+package server
+
+import (
+	"context"
+	"errors"
+	"net"
+	"net/http"
+	"time"
+
+	"github.com/go-logr/logr"
+)
+
+// NewHTTPServer returns an http.Server that serves handler, with the header
+// size limit and the idle / read-header timeouts preset. No address is set;
+// the caller is expected to serve it on its own listener.
+func NewHTTPServer(handler http.Handler) *http.Server {
+	srv := new(http.Server)
+	srv.Handler = handler
+	srv.MaxHeaderBytes = 1 << 20
+	srv.IdleTimeout = 90 * time.Second
+	srv.ReadHeaderTimeout = 32 * time.Second
+	return srv
+}
+
+// httpServer couples an http.Server with the listener it serves on and
+// exposes the pair as a Runnable.
+type httpServer struct {
+	// name is only used as a logging value.
+	name string
+	// gracefulShutdownTimeout bounds the Shutdown call made once the run
+	// context is done.
+	gracefulShutdownTimeout time.Duration
+	server                  *http.Server
+	listener                net.Listener
+	log                     logr.Logger
+}
+
+// Run serves HTTP on s.listener until ctx is done, then shuts the server down
+// with a deadline of s.gracefulShutdownTimeout.
+//
+// It returns the error reported by Serve, except for http.ErrServerClosed,
+// which is the expected outcome of a shutdown: in that case Run waits for the
+// shutdown goroutine to finish and returns nil.
+func (s *httpServer) Run(ctx context.Context) error {
+	logger := s.log.WithValues("httpServerName", s.name, "addr", s.listener.Addr())
+
+	// stopped is closed once the shutdown triggered by ctx has completed.
+	stopped := make(chan struct{})
+	go func() {
+		<-ctx.Done()
+		logger.Info("shutting down server")
+
+		// ctx is already done here, so the shutdown deadline has to be derived
+		// from a fresh context.
+		deadlineCtx, release := context.WithTimeout(context.Background(), s.gracefulShutdownTimeout)
+		defer release()
+
+		shutdownErr := s.server.Shutdown(deadlineCtx)
+		if shutdownErr != nil {
+			logger.Error(shutdownErr, "error shutting down server")
+		}
+		close(stopped)
+	}()
+
+	logger.Info("starting server")
+	serveErr := s.server.Serve(s.listener)
+	if serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) {
+		return serveErr
+	}
+
+	<-stopped
+	return nil
+}
diff --git a/images/vm-route-forge/internal/server/readyz.go b/images/vm-route-forge/internal/server/readyz.go new file mode 100644 index 000000000..30545d25d --- /dev/null +++ b/images/vm-route-forge/internal/server/readyz.go @@ -0,0 +1,60 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/
+
+package server
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+)
+
+// getReadyzHandler returns the handler used for the readiness probe: the
+// handler stored in s.readyzHandler when one is set, otherwise a built-in
+// handler that checks connectivity to the Kubernetes API server.
+//
+// The built-in handler requests "/version" through s.client and replies with
+// a JSON document:
+//   - 200 {"connectivity": "ok", "version": <decoded /version body>} on success;
+//   - 500 {"connectivity": "failed", "error": <message>} when the request
+//     fails or its body is not valid JSON.
+func (s *Server) getReadyzHandler() http.Handler {
+	if s.readyzHandler != nil {
+		return s.readyzHandler
+	}
+	// reply sends payload as a JSON response with the given status code.
+	reply := func(w http.ResponseWriter, status int, payload map[string]interface{}) {
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(status)
+		// The status line has already been sent, so an encoding or write
+		// failure can no longer be reported to the client.
+		_ = json.NewEncoder(w).Encode(payload)
+	}
+	unhealthy := func(err error, w http.ResponseWriter) {
+		reply(w, http.StatusInternalServerError, map[string]interface{}{"connectivity": "failed", "error": fmt.Sprintf("%v", err)})
+	}
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Tie the API call to the probe request: when the prober gives up and
+		// the request context is cancelled, the call is cancelled as well
+		// instead of lingering in the background.
+		body, err := s.client.
+			CoreV1().
+			RESTClient().
+			Get().
+			AbsPath("/version").
+			Do(r.Context()).
+			Raw()
+		if err != nil {
+			unhealthy(err, w)
+			return
+		}
+		var version interface{}
+		if err = json.Unmarshal(body, &version); err != nil {
+			unhealthy(err, w)
+			return
+		}
+		reply(w, http.StatusOK, map[string]interface{}{"connectivity": "ok", "version": version})
+	})
+}
diff --git a/images/vm-route-forge/internal/server/runnable.go b/images/vm-route-forge/internal/server/runnable.go new file mode 100644 index 000000000..eebfaaa44 --- /dev/null +++ b/images/vm-route-forge/internal/server/runnable.go @@ -0,0 +1,73 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/
+
+package server
+
+import (
+	"context"
+	"errors"
+	"sync"
+)
+
+// Runnable is a unit of work that runs until it finishes on its own or ctx is
+// done, and reports how it ended.
+type Runnable interface {
+	Run(ctx context.Context) error
+}
+
+// newRunnableGroup returns an empty group.
+func newRunnableGroup() *runnableGroup {
+	return &runnableGroup{runnable: []Runnable{}}
+}
+
+// runnableGroup runs a set of Runnables together: they are all started at
+// once and, as soon as any of them returns, the context shared by the others
+// is cancelled.
+type runnableGroup struct {
+	runnable []Runnable
+	// startOnce and err make Run execute the group a single time and hand the
+	// same result to every caller.
+	startOnce sync.Once
+	err       error
+}
+
+// Add registers one more Runnable with the group.
+func (r *runnableGroup) Add(runnable Runnable) {
+	r.runnable = append(r.runnable, runnable)
+}
+
+// Run executes the group on the first call and returns its result. Later
+// calls do not start anything; they return the result of the first run.
+func (r *runnableGroup) Run(ctx context.Context) error {
+	r.startOnce.Do(func() { r.err = r.run(ctx) })
+	return r.err
+}
+
+// run starts every registered Runnable in its own goroutine, waits for all of
+// them to return and reports their errors joined with errors.Join (nil when
+// none failed). The first Runnable to return, with or without an error,
+// cancels the context passed to the rest.
+func (r *runnableGroup) run(ctx context.Context) error {
+	groupCtx, stop := context.WithCancel(ctx)
+	defer stop()
+
+	var (
+		pending sync.WaitGroup
+		errsMu  sync.Mutex // guards joined
+		joined  error
+	)
+
+	pending.Add(len(r.runnable))
+	for _, member := range r.runnable {
+		go func(member Runnable) {
+			// Deferred calls run last-in first-out: the group context is
+			// cancelled before this goroutine is reported as finished.
+			defer pending.Done()
+			defer stop()
+
+			runErr := member.Run(groupCtx)
+			if runErr == nil {
+				return
+			}
+			errsMu.Lock()
+			defer errsMu.Unlock()
+			joined = errors.Join(joined, runErr)
+		}(member)
+	}
+
+	pending.Wait()
+	return joined
+}
diff --git a/images/vm-route-forge/internal/server/server.go b/images/vm-route-forge/internal/server/server.go new file mode 100644 index 000000000..008523d51 --- /dev/null +++ b/images/vm-route-forge/internal/server/server.go @@ -0,0 +1,143 @@ +/* +Copyright 2024 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/
+
+package server
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"net/http"
+	"time"
+
+	"github.com/go-logr/logr"
+	"k8s.io/client-go/kubernetes"
+)
+
+const (
+	defaultGracefulShutdownPeriod = 30 * time.Second
+	defaultReadinessEndpoint      = "/readyz"
+	defaultLivenessEndpoint       = "/healthz"
+)
+
+// Server runs a group of Runnables and, when a health probe listener is
+// configured, an HTTP server exposing the readiness and liveness endpoints.
+type Server struct {
+	runnableGroup           *runnableGroup
+	gracefulShutdownTimeout time.Duration
+	// healthProbeListener is nil when the health probe server is disabled.
+	healthProbeListener net.Listener
+	// readyzHandler and healthzHandler are optional overrides; when nil the
+	// built-in handlers from getReadyzHandler / getHealthzHandler are used.
+	readyzHandler          http.Handler
+	healthzHandler         http.Handler
+	readinessEndpointRoute string
+	livenessEndpointRoute  string
+
+	client kubernetes.Interface
+	log    logr.Logger
+}
+
+// Run registers the health probe server (if a listener is configured) and
+// then runs every registered Runnable until they have all returned. It
+// returns the joined errors of the Runnables, or nil.
+func (s *Server) Run(ctx context.Context) error {
+	if s.healthProbeListener != nil {
+		if err := s.addHealthProbeServer(); err != nil {
+			return fmt.Errorf("failed to add health probe server: %w", err)
+		}
+	}
+	return s.runnableGroup.run(ctx)
+}
+
+// addHealthProbeServer builds the probe HTTP server on s.healthProbeListener
+// and adds it to the runnable group.
+func (s *Server) addHealthProbeServer() error {
+	mux := http.NewServeMux()
+	srv := NewHTTPServer(mux)
+
+	// register mounts h on route and on route+"/", stripping the route prefix
+	// before the request reaches h.
+	register := func(route string, h http.Handler) {
+		h = http.StripPrefix(route, h)
+		mux.Handle(route, h)
+		// Append '/' suffix to handle subpaths
+		mux.Handle(route+"/", h)
+	}
+
+	// Both endpoints are always registered. getReadyzHandler and
+	// getHealthzHandler fall back to built-in handlers when no custom handler
+	// was supplied; guarding the registration with a nil check on the custom
+	// handlers would leave the endpoints unserved (404) in that case.
+	register(s.readinessEndpointRoute, s.getReadyzHandler())
+	register(s.livenessEndpointRoute, s.getHealthzHandler())
+
+	s.Add(&httpServer{
+		name:                    "health",
+		gracefulShutdownTimeout: s.gracefulShutdownTimeout,
+		listener:                s.healthProbeListener,
+		server:                  srv,
+		log:                     s.log,
+	})
+	return nil
+}
+
+// Add registers r to be started by Run.
+func (s *Server) Add(r Runnable) {
+	s.runnableGroup.Add(r)
+}
+
+// Options configures NewServer. Zero values select the defaults applied by
+// setOptionsDefault.
+type Options struct {
+	// HealthProbeBindAddress is the TCP address of the probe server; "" or
+	// "0" disables it.
+	HealthProbeBindAddress string
+	// ReadinessEndpointRoute defaults to "/readyz".
+	ReadinessEndpointRoute string
+	// LivenessEndpointRoute defaults to "/healthz".
+	LivenessEndpointRoute string
+	// GracefulShutdownTimeout defaults to 30 seconds when nil.
+	GracefulShutdownTimeout *time.Duration
+	// ReadyzHandler and HealthzHandler optionally replace the built-in probe
+	// handlers.
+	ReadyzHandler  http.Handler
+	HealthzHandler http.Handler
+}
+
+// setOptionsDefault returns options with every unset field that has a default
+// filled in.
+func setOptionsDefault(options Options) Options {
+	if options.GracefulShutdownTimeout == nil {
+		gracefulShutdownTimeout := defaultGracefulShutdownPeriod
+		options.GracefulShutdownTimeout = &gracefulShutdownTimeout
+	}
+	if options.ReadinessEndpointRoute == "" {
+		options.ReadinessEndpointRoute = defaultReadinessEndpoint
+	}
+	if options.LivenessEndpointRoute == "" {
+		options.LivenessEndpointRoute = defaultLivenessEndpoint
+	}
+	return options
+}
+
+// NewServer creates a Server from options. When a health probe address is
+// configured the listener is opened immediately, so an invalid or busy
+// address is reported here rather than in Run.
+func NewServer(client kubernetes.Interface, options Options, log logr.Logger) (*Server, error) {
+	options = setOptionsDefault(options)
+
+	// Create health probes listener. This will throw an error if the bind
+	// address is invalid or already in use.
+	healthProbeListener, err := defaultHealthProbeListener(options.HealthProbeBindAddress)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Server{
+		healthProbeListener:     healthProbeListener,
+		gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
+		readinessEndpointRoute:  options.ReadinessEndpointRoute,
+		livenessEndpointRoute:   options.LivenessEndpointRoute,
+		runnableGroup:           newRunnableGroup(),
+		readyzHandler:           options.ReadyzHandler,
+		healthzHandler:          options.HealthzHandler,
+		client:                  client,
+		log:                     log,
+	}, nil
+}
+
+// defaultHealthProbeListener opens a TCP listener on addr. It returns a nil
+// listener and a nil error when addr is "" or "0", which disables the probe
+// server.
+func defaultHealthProbeListener(addr string) (net.Listener, error) {
+	if addr == "" || addr == "0" {
+		return nil, nil
+	}
+
+	ln, err := net.Listen("tcp", addr)
+	if err != nil {
+		return nil, fmt.Errorf("error listening on %s: %w", addr, err)
+	}
+	return ln, nil
+}
diff --git a/images/vmi-router/werf.inc.yaml b/images/vm-route-forge/werf.inc.yaml similarity index 65% rename from images/vmi-router/werf.inc.yaml rename to images/vm-route-forge/werf.inc.yaml index 0574a7ae7..3ab5bfcd3 100644 --- a/images/vmi-router/werf.inc.yaml +++ b/images/vm-route-forge/werf.inc.yaml @@ -4,7 +4,7 @@ final: false from: {{ .Images.BASE_GOLANG_21_BOOKWORM }} git: - add: /images/{{ $.ImageName }} - to: /app/images/vmi-router + to:
/app/images/vm-route-forge stageDependencies: install: - go.mod @@ -21,23 +21,23 @@ mount: to: /go/pkg shell: install: - - cd /app/images/vmi-router + - cd /app/images/vm-route-forge - go mod download setup: - - cd /app/images/vmi-router + - cd /app/images/vm-route-forge - export GOOS=linux - export GOARCH=amd64 - export CGO_ENABLED=0 - - go build -v -a -o vmi-router main.go + - go build -v -a -o vm-route-forge cmd/vm-route-forge/main.go --- image: {{ $.ImageName }} from: {{ .Images.BASE_SCRATCH }} import: -- image: vmi-router-builder - add: /app/images/vmi-router/vmi-router - to: /app/vmi-router +- image: vm-route-forge-builder + add: /app/images/vm-route-forge/vm-route-forge + to: /app/vm-route-forge after: install docker: USER: "65532:65532" WORKDIR: "/app" - ENTRYPOINT: ["/app/vmi-router"] + ENTRYPOINT: ["/app/vm-route-forge"] diff --git a/images/vmi-router/controllers/suite_test.go b/images/vmi-router/controllers/suite_test.go deleted file mode 100644 index a597d46d3..000000000 --- a/images/vmi-router/controllers/suite_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2023 Flant JSC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controllers - -import ( - "path/filepath" - "testing" - - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" -) - -// These tests use Ginkgo (BDD-style Go testing framework). Refer to -// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. - -var cfg *rest.Config -var k8sClient client.Client -var testEnv *envtest.Environment - -func TestAPIs(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Controller Suite") -} - -var _ = BeforeSuite(func() { - logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) - - By("bootstrapping test environment") - testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "crds")}, - ErrorIfCRDPathMissing: true, - } - - var err error - // cfg is defined in this file globally. - cfg, err = testEnv.Start() - Expect(err).NotTo(HaveOccurred()) - Expect(cfg).NotTo(BeNil()) - - k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) - Expect(err).NotTo(HaveOccurred()) - Expect(k8sClient).NotTo(BeNil()) -}) - -var _ = AfterSuite(func() { - By("tearing down the test environment") - err := testEnv.Stop() - Expect(err).NotTo(HaveOccurred()) -}) diff --git a/images/vmi-router/controllers/vmirouter_controller.go b/images/vmi-router/controllers/vmirouter_controller.go deleted file mode 100644 index 09dff309a..000000000 --- a/images/vmi-router/controllers/vmirouter_controller.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -Copyright 2023 Flant JSC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controllers - -import ( - "github.com/go-logr/logr" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/manager" - "vmi-router/netlinkmanager" -) - -const controllerName = "vm-cilium-router" - -func NewVMRouterController( - mgr manager.Manager, - log logr.Logger, - netlinkMgr *netlinkmanager.Manager, -) error { - reconciler := &VMRouterReconciler{ - client: mgr.GetClient(), - cache: mgr.GetCache(), - recorder: mgr.GetEventRecorderFor(controllerName), - scheme: mgr.GetScheme(), - log: log.WithName(controllerName), - netlinkMgr: netlinkMgr, - } - - // Add new controller to manager. - ctrl, err := controller.New(controllerName, mgr, controller.Options{Reconciler: reconciler}) - if err != nil { - return err - } - - // Add watches to controller. - if err = reconciler.SetupWatches(mgr, ctrl); err != nil { - return err - } - - log.Info("Initialized VMI Router controller") - return nil -} diff --git a/images/vmi-router/controllers/vmirouter_reconciler.go b/images/vmi-router/controllers/vmirouter_reconciler.go deleted file mode 100644 index 3ce44cd8b..000000000 --- a/images/vmi-router/controllers/vmirouter_reconciler.go +++ /dev/null @@ -1,96 +0,0 @@ -/* -Copyright 2024 Flant JSC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controllers - -import ( - "context" - "fmt" - - "github.com/go-logr/logr" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" - - virtv2 "github.com/deckhouse/virtualization/api/core/v1alpha2" - "vmi-router/netlinkmanager" -) - -type VMRouterReconciler struct { - client client.Client - cache cache.Cache - recorder record.EventRecorder - scheme *runtime.Scheme - log logr.Logger - netlinkMgr *netlinkmanager.Manager -} - -func (r *VMRouterReconciler) SetupWatches(mgr manager.Manager, ctr controller.Controller) error { - if err := ctr.Watch(source.Kind(mgr.GetCache(), &virtv2.VirtualMachine{}), &handler.EnqueueRequestForObject{}, - predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { - r.log.V(4).Info(fmt.Sprintf("Got CREATE event for VM %s/%s", e.Object.GetNamespace(), e.Object.GetName())) - return true - }, - DeleteFunc: func(e event.DeleteEvent) bool { - r.log.V(4).Info(fmt.Sprintf("Got DELETE event for VM %s/%s", e.Object.GetNamespace(), e.Object.GetName())) - return true - }, - UpdateFunc: func(e event.UpdateEvent) bool { - r.log.V(4).Info(fmt.Sprintf("Got UPDATE 
event for VM %s/%s", e.ObjectNew.GetNamespace(), e.ObjectNew.GetName())) - return true - }, - }, - ); err != nil { - return fmt.Errorf("error setting watch on VM: %w", err) - } - - return nil -} - -func (r *VMRouterReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - r.log.V(4).Info(fmt.Sprintf("Got reconcile request for %s", req.String())) - - // Start with retrieving affected VMI. - vm := &virtv2.VirtualMachine{} - err := r.client.Get(ctx, req.NamespacedName, vm) - if err != nil { - if k8serrors.IsNotFound(err) { - r.netlinkMgr.DeleteRoute(req.NamespacedName, "") - return reconcile.Result{}, nil - } - r.log.Error(err, fmt.Sprintf("fail to retrieve vm/%s", req.String())) - return reconcile.Result{}, err - } - - // Delete route on VM deletion. - if vm.GetDeletionTimestamp() != nil { - r.netlinkMgr.DeleteRoute(req.NamespacedName, vm.Status.IPAddress) - return reconcile.Result{}, nil - } - - r.netlinkMgr.UpdateRoute(ctx, vm) - return reconcile.Result{}, nil -} diff --git a/images/vmi-router/main.go b/images/vmi-router/main.go deleted file mode 100644 index c68153b58..000000000 --- a/images/vmi-router/main.go +++ /dev/null @@ -1,215 +0,0 @@ -/* -Copyright 2023,2024 Flant JSC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package main - -import ( - "context" - "flag" - "fmt" - "net" - "os" - "strconv" - - ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" - virtv1alpha2 "github.com/deckhouse/virtualization/api/core/v1alpha2" - "go.uber.org/zap/zapcore" - "k8s.io/apimachinery/pkg/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client/config" - "sigs.k8s.io/controller-runtime/pkg/healthz" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/manager/signals" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - - "vmi-router/controllers" - "vmi-router/netlinkmanager" - "vmi-router/netutil" -) - -const ( - defaultVerbosity = "1" - appName = "vmi-router" - NodeNameEnv = "NODE_NAME" - CiliumRouteTableIdEnv = "CILIUM_ROUTE_TABLE_ID" -) - -var ( - log = ctrl.Log.WithName(appName) - nodeName = os.Getenv(NodeNameEnv) - resourcesSchemeFuncs = []func(*runtime.Scheme) error{ - clientgoscheme.AddToScheme, - ciliumv2.AddToScheme, - virtv1alpha2.AddToScheme, - } -) - -func setupLogger() { - verbose := defaultVerbosity - if verboseEnvVarVal := os.Getenv("VERBOSITY"); verboseEnvVarVal != "" { - verbose = verboseEnvVarVal - } - // visit actual flags passed in and if passed check -v and set verbose - if fv := flag.Lookup("v"); fv != nil { - verbose = fv.Value.String() - } - if verbose == defaultVerbosity { - log.V(1).Info(fmt.Sprintf("Note: increase the -v level in the controller deployment for more detailed logging, eg. -v=%d or -v=%d\n", 2, 3)) - } - verbosityLevel, err := strconv.Atoi(verbose) - debug := false - if err == nil && verbosityLevel > 1 { - debug = true - } - - // The logger instantiated here can be changed to any logger - // implementing the logr.Logger interface. 
This logger will - // be propagated through the whole operator, generating - // uniform and structured logs. - logf.SetLogger(zap.New(zap.Level(zapcore.Level(-1*verbosityLevel)), zap.UseDevMode(debug))) -} - -func main() { - var cidrs netutil.CIDRSet - var dryRun bool - var metricsAddr string - var probeAddr string - flag.Var(&cidrs, "cidr", "CIDRs enabled to route (multiple flags allowed)") - flag.BoolVar(&dryRun, "dry-run", false, "Don't perform any changes on the node.") - flag.StringVar(&metricsAddr, "metrics-bind-address", ":0", "The address the metric endpoint binds to.") - flag.StringVar(&probeAddr, "health-probe-bind-address", ":0", "The address the probe endpoint binds to.") - opts := zap.Options{ - Development: true, - } - opts.BindFlags(flag.CommandLine) - flag.Parse() - - setupLogger() - - var parsedCIDRs []*net.IPNet - for _, cidr := range cidrs { - _, parsedCIDR, err := net.ParseCIDR(cidr) - if err != nil || parsedCIDR == nil { - log.Error(err, "failed to parse passed CIDRs") - os.Exit(1) - } - parsedCIDRs = append(parsedCIDRs, parsedCIDR) - } - log.Info(fmt.Sprintf("Got CIDRs to manage: %+v", cidrs)) - - if dryRun { - log.Info("Dry run mode is enabled, will not change network rules and routes") - } - - ciliumTableId := netlinkmanager.DefaultCiliumRouteTable - ciliumTableIdStr := os.Getenv(CiliumRouteTableIdEnv) - if ciliumTableIdStr != "" { - tableId, err := strconv.ParseInt(ciliumTableIdStr, 10, 32) - if err != nil { - log.Error(err, "failed to parse Cilium table id, should be integer") - os.Exit(1) - } - ciliumTableId = int(tableId) - } - log.Info(fmt.Sprintf("Use cilium route table id %d", ciliumTableId)) - - // Load configuration to connect to Kubernetes API Server. - kubeCfg, err := config.GetConfig() - if err != nil { - log.Error(err, "Failed to load Kubernetes config") - os.Exit(1) - } - - // Setup scheme for all used resources (needed for controller-runtime). 
- scheme := runtime.NewScheme() - for _, f := range resourcesSchemeFuncs { - err = f(scheme) - if err != nil { - log.Error(err, "Failed to add to scheme") - os.Exit(1) - } - } - - // This controller watches resources in all namespaces without leader election. - // Start metrics and health probe listeners on random ports as hostNetwork is used. - managerOpts := manager.Options{ - LeaderElection: false, - Scheme: scheme, - Metrics: metricsserver.Options{ - BindAddress: metricsAddr, - }, - HealthProbeBindAddress: probeAddr, - } - - mgr, err := ctrl.NewManager(kubeCfg, managerOpts) - if err != nil { - log.Error(err, "Unable to create manager") - os.Exit(1) - } - - // Setup context to gracefully handle termination. - ctx := signals.SetupSignalHandler() - - // Create netlink manager. - netlinkMgr := netlinkmanager.New(mgr.GetClient(), log, ciliumTableId, parsedCIDRs, dryRun) - - // Setup main controller with its dependencies. - if err = controllers.NewVMRouterController(mgr, log, netlinkMgr); err != nil { - log.Error(err, "Unable to add vmi router controller to manager") - os.Exit(1) - } - - // Init rules and cleanup unused routes at start. 
- err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { - log.Info("Synchronize route rules at start") - err := netlinkMgr.SyncRules() - if err != nil { - log.Error(err, fmt.Sprintf("failed to synchronize routing rules ar start")) - return err - } - - log.Info("Synchronize VM routes at start") - err = netlinkMgr.SyncRoutes(ctx) - if err != nil { - log.Error(err, fmt.Sprintf("failed to synchronize VM routes at start")) - return err - } - - return nil - })) - if err != nil { - log.Error(err, "Add routes synchronizer") - os.Exit(1) - } - - if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { - log.Error(err, "Unable to set up health check") - os.Exit(1) - } - if err = mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { - log.Error(err, "Unable to set up ready check") - os.Exit(1) - } - - log.Info("Starting manager") - if err = mgr.Start(ctx); err != nil { - log.Error(err, "Unable to start manager") - os.Exit(1) - } -} diff --git a/openapi/config-values.yaml b/openapi/config-values.yaml index f0b0aec7d..cb50ef136 100644 --- a/openapi/config-values.yaml +++ b/openapi/config-values.yaml @@ -193,7 +193,7 @@ properties: - `virtualization-api` - `virtualization-controller` - `kube-api-proxy` - - `vmi-router` + - `vm-route-forge` enum: - "debug" - "info" diff --git a/openapi/doc-ru-config-values.yaml b/openapi/doc-ru-config-values.yaml index ccad5f064..a9775f688 100644 --- a/openapi/doc-ru-config-values.yaml +++ b/openapi/doc-ru-config-values.yaml @@ -100,4 +100,4 @@ properties: - `virtualization-api` - `virtualization-controller` - `kube-api-proxy` - - `vmi-router` + - `vm-route-forge` diff --git a/templates/kubevirt/vmi-router/daemonset.yaml b/templates/kubevirt/vm-route-forge/daemonset.yaml similarity index 73% rename from templates/kubevirt/vmi-router/daemonset.yaml rename to templates/kubevirt/vm-route-forge/daemonset.yaml index f28aaebed..f1eece68d 100644 --- a/templates/kubevirt/vmi-router/daemonset.yaml +++ 
b/templates/kubevirt/vm-route-forge/daemonset.yaml @@ -1,4 +1,4 @@ -{{- define "vmi_router_resources" }} +{{- define "vm-route-forge_resources" }} cpu: 10m memory: 25Mi {{- end }} @@ -8,21 +8,21 @@ memory: 25Mi apiVersion: autoscaling.k8s.io/v1 kind: VerticalPodAutoscaler metadata: - name: vmi-router + name: vm-route-forge namespace: d8-{{ .Chart.Name }} - {{- include "helm_lib_module_labels" (list . (dict "app" "vmi-router" "workload-resource-policy.deckhouse.io" "every-node")) | nindent 2 }} + {{- include "helm_lib_module_labels" (list . (dict "app" "vm-route-forge" "workload-resource-policy.deckhouse.io" "every-node")) | nindent 2 }} spec: targetRef: apiVersion: "apps/v1" kind: DaemonSet - name: vmi-router + name: vm-route-forge updatePolicy: updateMode: "Auto" resourcePolicy: containerPolicies: - - containerName: vmi-router + - containerName: vm-route-forge minAllowed: - {{- include "vmi_router_resources" . | nindent 8 }} + {{- include "vm-route-forge_resources" . | nindent 8 }} maxAllowed: cpu: 20m memory: 25Mi @@ -31,34 +31,34 @@ spec: apiVersion: apps/v1 kind: DaemonSet metadata: - name: vmi-router + name: vm-route-forge namespace: d8-{{ .Chart.Name }} {{- include "helm_lib_module_labels" (list .) | nindent 2 }} spec: selector: matchLabels: - app: vmi-router + app: vm-route-forge template: metadata: labels: - app: vmi-router + app: vm-route-forge spec: {{- include "helm_lib_priority_class" (tuple . "system-cluster-critical") | nindent 6 }} {{- include "helm_lib_tolerations" (tuple . "any-node") | nindent 6 }} {{- include "helm_lib_module_pod_security_context_run_as_user_root" . | nindent 6 }} imagePullSecrets: - - name: virtualization-module-registry - serviceAccountName: vmi-router + - name: virtualization-module-registry + serviceAccountName: vm-route-forge hostNetwork: true dnsPolicy: ClusterFirstWithHostNet containers: - - name: vmi-router + - name: vm-route-forge args: {{- range .Values.virtualization.virtualMachineCIDRs }} - --cidr={{ . 
}} {{- end }} {{- include "helm_lib_module_container_security_context_privileged" . | nindent 10 }} - image: {{ include "helm_lib_module_image" (list . "vmiRouter") }} + image: {{ include "helm_lib_module_image" (list . "vmRouteForge") }} imagePullPolicy: "IfNotPresent" env: - name: NODE_NAME @@ -73,6 +73,6 @@ spec: requests: {{- include "helm_lib_module_ephemeral_storage_only_logs" . | nindent 14 }} {{- if not ( .Values.global.enabledModules | has "vertical-pod-autoscaler-crd") }} - {{- include "vmi_router_resources" . | nindent 14 }} + {{- include "vm-route-forge_resources" . | nindent 14 }} {{- end }} {{- end }} diff --git a/templates/kubevirt/vmi-router/rbac-for-us.yaml b/templates/kubevirt/vm-route-forge/rbac-for-us.yaml similarity index 55% rename from templates/kubevirt/vmi-router/rbac-for-us.yaml rename to templates/kubevirt/vm-route-forge/rbac-for-us.yaml index 4e5ad8411..4257521c4 100644 --- a/templates/kubevirt/vmi-router/rbac-for-us.yaml +++ b/templates/kubevirt/vm-route-forge/rbac-for-us.yaml @@ -3,15 +3,15 @@ apiVersion: v1 kind: ServiceAccount metadata: - name: vmi-router + name: vm-route-forge namespace: d8-{{ .Chart.Name }} - {{- include "helm_lib_module_labels" (list . (dict "app" "vmi-router")) | nindent 2 }} + {{- include "helm_lib_module_labels" (list . (dict "app" "vm-route-forge")) | nindent 2 }} --- kind: ClusterRole apiVersion: rbac.authorization.k8s.io/v1 metadata: - name: d8:{{ .Chart.Name }}:vmi-router - {{- include "helm_lib_module_labels" (list . (dict "app" "vmi-router")) | nindent 2 }} + name: d8:{{ .Chart.Name }}:vm-route-forge + {{- include "helm_lib_module_labels" (list . 
(dict "app" "vm-route-forge")) | nindent 2 }} rules: - apiGroups: - virtualization.deckhouse.io @@ -29,18 +29,26 @@ rules: - list - get - watch + - apiGroups: + - "" + resources: + - nodes + verbs: + - list + - get + - watch --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: - name: d8:{{ .Chart.Name }}:vmi-router - {{- include "helm_lib_module_labels" (list . (dict "app" "vmi-router")) | nindent 2 }} + name: d8:{{ .Chart.Name }}:vm-route-forge + {{- include "helm_lib_module_labels" (list . (dict "app" "vm-route-forge")) | nindent 2 }} subjects: - kind: ServiceAccount - name: vmi-router + name: vm-route-forge namespace: d8-{{ .Chart.Name }} roleRef: kind: ClusterRole - name: d8:{{ .Chart.Name }}:vmi-router + name: d8:{{ .Chart.Name }}:vm-route-forge apiGroup: rbac.authorization.k8s.io {{- end }}