Skip to content

Commit

Permalink
add topology awareness for node plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit Roushan committed Aug 31, 2021
1 parent e59580d commit 9d5246f
Show file tree
Hide file tree
Showing 13 changed files with 166 additions and 5 deletions.
12 changes: 11 additions & 1 deletion src/csi/driver/driver.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
package driver

import (
"strings"
"utils/k8sutils"
)

type Driver struct {
name string
version string
useMultiPath bool
isNeedMultiPath bool
k8sUtils k8sutils.Interface
nodeName string
}

func NewDriver(name, version string, useMultiPath, isNeedMultiPath bool) *Driver {
func NewDriver(name, version string, useMultiPath, isNeedMultiPath bool,
k8sUtils k8sutils.Interface, nodeName string) *Driver {
return &Driver{
name: name,
version: version,
useMultiPath: useMultiPath,
isNeedMultiPath: isNeedMultiPath,
k8sUtils: k8sUtils,
nodeName: strings.TrimSpace(nodeName),
}
}
7 changes: 7 additions & 0 deletions src/csi/driver/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ func (d *Driver) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCa
},
},
},
&csi.PluginCapability{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
},
},
},
},
}, nil
}
Expand Down
18 changes: 17 additions & 1 deletion src/csi/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,26 @@ func (d *Driver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (
log.Errorf("Marshal node info of %s error: %v", nodeBytes, err)
return nil, status.Error(codes.Internal, err.Error())
}

log.Infof("Get NodeId %s", nodeBytes)

if d.nodeName == "" {
return &csi.NodeGetInfoResponse{
NodeId: string(nodeBytes),
}, nil
}

// Get topology info from Node labels
topology, err := d.k8sUtils.GetNodeTopology(d.nodeName)
if err != nil {
log.Errorln(err)
return nil, status.Error(codes.Internal, err.Error())
}

return &csi.NodeGetInfoResponse{
NodeId: string(nodeBytes),
AccessibleTopology: &csi.Topology{
Segments: topology,
},
}, nil
}

Expand Down
24 changes: 21 additions & 3 deletions src/csi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"runtime/debug"
"time"
"utils"
"utils/k8sutils"
"utils/log"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand All @@ -30,6 +31,8 @@ const (

csiVersion = "2.2.13"
defaultDriverName = "csi.huawei.com"

nodeNameEnv = "CSI_NODENAME"
)

var (
Expand All @@ -54,6 +57,12 @@ var (
volumeUseMultiPath = flag.Bool("volume-use-multipath",
true,
"Whether to use multipath when attach block volume")
kubeconfig = flag.String("kubeconfig",
"",
"absolute path to the kubeconfig file")
nodeName = flag.String("nodename",
os.Getenv(nodeNameEnv),
"absolute path to the kubeconfig file")

config CSIConfig
secret CSISecret
Expand All @@ -64,7 +73,7 @@ type CSIConfig struct {
}

type CSISecret struct {
Secrets map[string]interface{} `json:"secrets"`
Secrets map[string]interface{} `json:"secrets"`
}

func init() {
Expand Down Expand Up @@ -97,6 +106,10 @@ func init() {

_ = mergeData(config, secret)

if "" == *nodeName {
logrus.Warning("Node name is empty. Topology aware volume provisioning feature may not behave normal")
}

if *containerized {
*controllerFlagFile = ""
}
Expand Down Expand Up @@ -199,10 +212,15 @@ func main() {
log.Fatalf("Listen on %s error: %v", *endpoint, err)
}

k8sUtils, err := k8sutils.NewK8SUtils(*kubeconfig)
if err != nil {
log.Fatalf("Kubernetes client initialization failed %v", err)
}

isNeedMultiPath := utils.NeedMultiPath(config.Backends)
d := driver.NewDriver(*driverName, csiVersion, *volumeUseMultiPath, isNeedMultiPath)
server := grpc.NewServer()
d := driver.NewDriver(*driverName, csiVersion, *volumeUseMultiPath, isNeedMultiPath, k8sUtils, *nodeName)

server := grpc.NewServer()
csi.RegisterIdentityServer(server, d)
csi.RegisterControllerServer(server, d)
csi.RegisterNodeServer(server, d)
Expand Down
75 changes: 75 additions & 0 deletions src/utils/k8sutils/k8sutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package k8sutils

import (
"errors"
"fmt"
"regexp"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

const (
topologyRegx = "topology.kubernetes.io/*"
)

type Interface interface {
GetNodeTopology(nodeName string) (map[string]string, error)
}

type KubeClient struct {
clientSet *kubernetes.Clientset
}

func NewK8SUtils(kubeConfig string) (Interface, error) {
var clientset *kubernetes.Clientset

if kubeConfig != "" {
config, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
if err != nil {
return nil, err
}

clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
} else {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}

clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
}

return &KubeClient{
clientSet: clientset,
}, nil
}

func (k *KubeClient) GetNodeTopology(nodeName string) (map[string]string, error) {
k8sNode, err := k.getNode(nodeName)
if err != nil {
return nil, errors.New(fmt.Sprintf("Failed to get node topology with error: %v", err))
}

topology := make(map[string]string)
for key, value := range k8sNode.Labels {
if match, err := regexp.MatchString(topologyRegx, key); err == nil && match {
topology[key] = value
}
}

return topology, nil
}

func (k *KubeClient) getNode(nodeName string) (*corev1.Node, error) {
return k.clientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
}
5 changes: 5 additions & 0 deletions yamls/deploy/huawei-csi-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ spec:
env:
- name: CSI_ENDPOINT
value: /var/lib/csi/sockets/pluginproxy/csi.sock
- name: CSI_NODENAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
imagePullPolicy: "IfNotPresent"
volumeMounts:
- name: socket-dir
Expand Down
5 changes: 5 additions & 0 deletions yamls/deploy/huawei-csi-multi-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ spec:
env:
- name: CSI_ENDPOINT
value: /var/lib/csi/sockets/pluginproxy/csi.sock
- name: CSI_NODENAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
imagePullPolicy: "IfNotPresent"
volumeMounts:
- name: socket-dir
Expand Down
6 changes: 6 additions & 0 deletions yamls/deploy/huawei-csi-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ spec:
- "--containerized"
- "--driver-name=csi.huawei.com"
- "--volume-use-multipath=true"
env:
- name: CSI_NODENAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
securityContext:
privileged: true
capabilities:
Expand Down
3 changes: 3 additions & 0 deletions yamls/deploy/huawei-csi-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ rules:
- apiGroups: [""]
resources: ["events"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get"]

---
kind: ClusterRoleBinding
Expand Down
5 changes: 5 additions & 0 deletions yamls/deploy/huawei-csi-resize-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ spec:
env:
- name: CSI_ENDPOINT
value: /var/lib/csi/sockets/pluginproxy/csi.sock
- name: CSI_NODENAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
imagePullPolicy: "IfNotPresent"
volumeMounts:
- name: socket-dir
Expand Down
3 changes: 3 additions & 0 deletions yamls/deploy/huawei-csi-resize-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ rules:
- apiGroups: [""]
resources: ["events"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get"]

---
kind: ClusterRoleBinding
Expand Down
5 changes: 5 additions & 0 deletions yamls/deploy/huawei-csi-resize-snapshot-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ spec:
env:
- name: CSI_ENDPOINT
value: /var/lib/csi/sockets/pluginproxy/csi.sock
- name: CSI_NODENAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
imagePullPolicy: "IfNotPresent"
volumeMounts:
- name: socket-dir
Expand Down
3 changes: 3 additions & 0 deletions yamls/deploy/huawei-csi-resize-snapshot-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ rules:
- apiGroups: [""]
resources: ["events"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get"]

---
kind: ClusterRoleBinding
Expand Down

0 comments on commit 9d5246f

Please sign in to comment.