Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance and efficiency improvements in daemon/server mode #1154

Merged
merged 13 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
98 changes: 30 additions & 68 deletions cmd/multus-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"syscall"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"

"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging"
Expand Down Expand Up @@ -58,9 +57,6 @@ func main() {
os.Exit(4)
}

configWatcherDoneChannel := make(chan struct{})
serverDoneChannel := make(chan struct{})
multusConfigFile := ""
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -87,65 +83,42 @@ func main() {
logging.Verbosef("Readiness Indicator file check done!")
}

if err := startMultusDaemon(ctx, daemonConf, serverDoneChannel); err != nil {
logging.Panicf("failed start the multus thick-plugin listener: %v", err)
os.Exit(3)
}

// Wait until daemon ready
logging.Verbosef("API readiness check")
if waitUntilAPIReady(daemonConf.SocketDir) != nil {
logging.Panicf("failed to ready multus-daemon socket: %v", err)
os.Exit(1)
}
logging.Verbosef("API readiness check done!")

// Generate multus CNI config from current CNI config
var configManager *config.Manager
var ignoreReadinessIndicator bool
if multusConf.MultusConfigFile == "auto" {
if multusConf.CNIVersion == "" {
_ = logging.Errorf("the CNI version is a mandatory parameter when the '-multus-config-file=auto' option is used")
}

var configManager *config.Manager
if multusConf.MultusMasterCni == "" {
configManager, err = config.NewManager(*multusConf, multusConf.MultusAutoconfigDir, multusConf.ForceCNIVersion)
} else {
configManager, err = config.NewManagerWithExplicitPrimaryCNIPlugin(
*multusConf, multusConf.MultusAutoconfigDir, multusConf.MultusMasterCni, multusConf.ForceCNIVersion)
}
// Generate multus CNI config from current CNI config
configManager, err = config.NewManager(*multusConf)
if err != nil {
_ = logging.Errorf("failed to create the configuration manager for the primary CNI plugin: %v", err)
os.Exit(2)
}

if multusConf.OverrideNetworkName {
if err := configManager.OverrideNetworkName(); err != nil {
_ = logging.Errorf("could not override the network name: %v", err)
}
}

generatedMultusConfig, err := configManager.GenerateConfig()
if err != nil {
_ = logging.Errorf("failed to generated the multus configuration: %v", err)
}
logging.Verbosef("Generated MultusCNI config: %s", generatedMultusConfig)

multusConfigFile, err = configManager.PersistMultusConfig(generatedMultusConfig)
if err != nil {
_ = logging.Errorf("failed to persist the multus configuration: %v", err)
}

go func(ctx context.Context, doneChannel chan<- struct{}) {
if err := configManager.MonitorPluginConfiguration(ctx, doneChannel); err != nil {
_ = logging.Errorf("error watching file: %v", err)
}
}(ctx, configWatcherDoneChannel)
// ConfigManager watches the readiness indicator file (if configured)
// and exits the daemon when that is removed. The CNIServer does
// not need to re-do that check every CNI operation
ignoreReadinessIndicator = true
} else {
if err := copyUserProvidedConfig(multusConf.MultusConfigFile, multusConf.CniConfigDir); err != nil {
logging.Errorf("failed to copy the user provided configuration %s: %v", multusConf.MultusConfigFile, err)
}
}

if err := startMultusDaemon(ctx, daemonConf, ignoreReadinessIndicator); err != nil {
logging.Panicf("failed start the multus thick-plugin listener: %v", err)
os.Exit(3)
}

// Wait until daemon ready
logging.Verbosef("API readiness check")
if waitUntilAPIReady(daemonConf.SocketDir) != nil {
logging.Panicf("failed to ready multus-daemon socket: %v", err)
os.Exit(1)
}
logging.Verbosef("API readiness check done!")

signalCh := make(chan os.Signal, 16)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
Expand All @@ -156,15 +129,11 @@ func main() {
}()

var wg sync.WaitGroup
if multusConf.MultusConfigFile == "auto" {
wg.Add(1)
go func() {
<-configWatcherDoneChannel
logging.Verbosef("ConfigWatcher done")
logging.Verbosef("Delete old config @ %v", multusConfigFile)
os.Remove(multusConfigFile)
wg.Done()
}()
if configManager != nil {
if err := configManager.Start(ctx, &wg); err != nil {
_ = logging.Errorf("failed to start config manager: %v", err)
os.Exit(3)
}
}

wg.Wait()
Expand All @@ -181,7 +150,7 @@ func waitUntilAPIReady(socketPath string) error {
})
}

func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, done chan struct{}) error {
func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, ignoreReadinessIndicator bool) error {
if user, err := user.Current(); err != nil || user.Uid != "0" {
return fmt.Errorf("failed to run multus-daemon with root: %v, now running in uid: %s", err, user.Uid)
}
Expand All @@ -190,7 +159,7 @@ func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf,
return fmt.Errorf("failed to prepare the cni-socket for communicating with the shim: %w", err)
}

server, err := srv.NewCNIServer(daemonConfig, daemonConfig.ConfigFileContents)
server, err := srv.NewCNIServer(daemonConfig, daemonConfig.ConfigFileContents, ignoreReadinessIndicator)
if err != nil {
return fmt.Errorf("failed to create the server: %v", err)
}
Expand All @@ -208,15 +177,8 @@ func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf,
return fmt.Errorf("failed to start the CNI server using socket %s. Reason: %+v", api.SocketPath(daemonConfig.SocketDir), err)
}

server.SetKeepAlivesEnabled(false)
go func() {
utilwait.UntilWithContext(ctx, func(ctx context.Context) {
logging.Debugf("open for business")
if err := server.Serve(l); err != nil {
utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err))
}
}, 0)
}()
server.Start(ctx, l)

go func() {
<-ctx.Done()
server.Shutdown(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions cmd/multus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func main() {

skel.PluginMain(
func(args *skel.CmdArgs) error {
result, err := multus.CmdAdd(args, nil, nil)
result, err := multus.CmdAdd(args, nil, nil, nil)
if err != nil {
return err
}
Expand All @@ -54,6 +54,6 @@ func main() {
func(args *skel.CmdArgs) error {
return multus.CmdCheck(args, nil, nil)
},
func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil) },
func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil, nil) },
cniversion.All, "meta-plugin that delegates to other CNI plugins")
}
6 changes: 6 additions & 0 deletions deployments/multus-daemonset-thick.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ rules:
- pods/status
verbs:
- get
- list
- update
- apiGroups:
- ""
Expand Down Expand Up @@ -183,6 +184,11 @@ spec:
- name: hostroot
mountPath: /hostroot
mountPropagation: HostToContainer
env:
- name: MULTUS_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
initContainers:
- name: install-multus-binary
image: ghcr.io/k8snetworkplumbingwg/multus-cni:snapshot-thick
Expand Down
6 changes: 6 additions & 0 deletions e2e/templates/multus-daemonset-thick.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ rules:
- pods/status
verbs:
- get
- list
- update
- apiGroups:
- ""
Expand Down Expand Up @@ -157,6 +158,11 @@ spec:
- name: multus-daemon-config
mountPath: /etc/cni/net.d/multus.d
readOnly: true
env:
- name: MULTUS_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
initContainers:
- name: install-multus-shim
image: localhost:5000/multus:e2e
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
k8s.io/api v0.22.8
k8s.io/apimachinery v0.22.8
k8s.io/client-go v0.22.8
k8s.io/client-go v1.5.2
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.60.1 // indirect
k8s.io/kube-openapi v0.0.0-20220413171646-5e7f5fdc6da6 // indirect
Expand Down
19 changes: 12 additions & 7 deletions pkg/k8sclient/k8sclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,14 @@ func TryLoadPodDelegates(pod *v1.Pod, conf *types.NetConf, clientInfo *ClientInf
// InClusterK8sClient returns the `k8s.ClientInfo` struct to use to connect to
// the k8s API.
func InClusterK8sClient() (*ClientInfo, error) {
config, err := rest.InClusterConfig()
clientInfo, err := GetK8sClient("", nil)
if err != nil {
return nil, err
}

logging.Debugf("InClusterK8sClient: in cluster config: %+v", config)
return NewClientInfo(config)
if clientInfo == nil {
return nil, fmt.Errorf("failed to create in-cluster kube client")
}
return clientInfo, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@s1061123 just want to make sure this approach works with your incoming changes?

}

// GetK8sClient gets client info from kubeconfig
Expand Down Expand Up @@ -440,13 +441,17 @@ func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error
config.ContentType = "application/vnd.kubernetes.protobuf"
// Set the config timeout to one minute.
config.Timeout = time.Minute
// Allow multus (especially in server mode) to make more concurrent requests
// to reduce client-side throttling
config.QPS = 50
config.Burst = 50

return NewClientInfo(config)
return newClientInfo(config)
}

// NewClientInfo returns a `ClientInfo` from a configuration created from an
// newClientInfo returns a `ClientInfo` from a configuration created from an
// existing kubeconfig file.
func NewClientInfo(config *rest.Config) (*ClientInfo, error) {
func newClientInfo(config *rest.Config) (*ClientInfo, error) {
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
Expand Down
Loading