Skip to content

Commit

Permalink
Merge pull request #10 from silentred/main
Browse files Browse the repository at this point in the history
setup watches and indexes to PlugableReconciler
  • Loading branch information
silentred authored Dec 13, 2023
2 parents 847dec1 + b196c8b commit 75a260f
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 128 deletions.
44 changes: 23 additions & 21 deletions cmd/controller/antplugins/localstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strconv"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil"
Expand Down Expand Up @@ -119,32 +118,35 @@ func (r *ReportLocalStoragePlugin) Reconcile(ctx *plugin.Context) (result plugin

// After remote volume is scheduled, report the local storage size to Node
if isVolume && volume != nil && volume.Spec.TargetNodeId != "" && !volume.IsLocal() {
var localStorePct int
var volInState *v1.AntstorVolume
node, err := stateObj.GetNodeByNodeID(volume.Spec.TargetNodeId)
var node *state.Node
node, err = stateObj.GetNodeByNodeID(volume.Spec.TargetNodeId)
if err != nil {
log.Error(err, "find node failed")
return plugin.Result{Error: err}
}
sp := node.Pool

for _, item := range r.ReportLocalConfigs {
selector, err := metav1.LabelSelectorAsSelector(&item.LabelSelector)
if err != nil {
log.Error(err, "LabelSelectorAsSelector failed", "selector", item.LabelSelector)
continue
}
if selector.Matches(labels.Set(sp.Spec.NodeInfo.Labels)) && item.EnableDefault {
localStorePct = item.DefaultLocalStoragePct
log.Info("matched local-storage percentage", "pct", localStorePct)
var sp = node.Pool

/*
var localStorePct int
var volInState *v1.AntstorVolume
for _, item := range r.ReportLocalConfigs {
selector, err := metav1.LabelSelectorAsSelector(&item.LabelSelector)
if err != nil {
log.Error(err, "LabelSelectorAsSelector failed", "selector", item.LabelSelector)
continue
}
if selector.Matches(labels.Set(sp.Spec.NodeInfo.Labels)) && item.EnableDefault {
localStorePct = item.DefaultLocalStoragePct
log.Info("matched local-storage percentage", "pct", localStorePct)
}
}
}
volInState, err = node.GetVolumeByID(volume.Spec.Uuid)
if err == nil {
log.Info("copy volume into state")
*volInState = *volume
}
volInState, err = node.GetVolumeByID(volume.Spec.Uuid)
if err == nil {
log.Info("copy volume into state")
*volInState = *volume
}
*/

var expectLocalSize = CalculateLocalStorageCapacity(node)
var localSizeStr = strconv.Itoa(int(expectLocalSize))
Expand Down
15 changes: 9 additions & 6 deletions cmd/controller/antplugins/patchpv.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func (p *PatchPVPlugin) Reconcile(ctx *plugin.Context) (result plugin.Result) {
return plugin.Result{}
}

log.Info("running PatchPVPlugin")
// get pv name from label
var pvName string
if val, has := volume.Labels[v1.VolumePVNameLabelKey]; has {
Expand All @@ -53,11 +52,15 @@ func (p *PatchPVPlugin) Reconcile(ctx *plugin.Context) (result plugin.Result) {
pvName = volume.Name
}

err = p.PvUtil.SetTargetNodeName(pvName, volume.Spec.TargetNodeId)
if err != nil {
log.Error(err, "updating PV label failed")
return plugin.Result{
Error: err,
if volume.Spec.TargetNodeId != "" {
log.Info("patching PV", "nodeId", volume.Spec.TargetNodeId, "pvName", pvName)

err = p.PvUtil.SetTargetNodeName(pvName, volume.Spec.TargetNodeId)
if err != nil {
log.Error(err, "updating PV label failed")
return plugin.Result{
Error: err,
}
}
}

Expand Down
30 changes: 15 additions & 15 deletions pkg/controller/manager/controllers/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,16 @@ import (
"code.alipay.com/dbplatform/node-disk-controller/pkg/agent"
v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/handler"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
hostnvme "code.alipay.com/dbplatform/node-disk-controller/pkg/host-nvme"
"code.alipay.com/dbplatform/node-disk-controller/pkg/util"
"code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
"code.alipay.com/dbplatform/node-disk-controller/pkg/version"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
cligoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
rt "sigs.k8s.io/controller-runtime"
)
Expand Down Expand Up @@ -173,18 +170,21 @@ func (o *OperatorOption) Run() {

ctx := rt.SetupSignalHandler()

// create NodeInformer to sync nodes to cache
nodeInformer, err := mgr.GetCache().GetInformer(ctx, &corev1.Node{})
if err != nil {
klog.Fatal(err)
}
nodeHandler := &handler.NodeEventHandler{
Cfg: cfg,
}
nodeInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: nodeHandler.FilterObject,
Handler: nodeHandler,
})
/*
// create NodeInformer to sync nodes to cache
// moved to pool reconciler
nodeInformer, err := mgr.GetCache().GetInformer(ctx, &corev1.Node{})
if err != nil {
klog.Fatal(err)
}
nodeHandler := &handler.NodeEventHandler{
Cfg: cfg,
}
nodeInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: nodeHandler.FilterObject,
Handler: nodeHandler,
})
*/

go func() {
klog.Info("manager start working")
Expand Down
69 changes: 56 additions & 13 deletions pkg/controller/manager/controllers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,29 @@ import (

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
rt "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/source"

v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/handler"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/plugin"
sched "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/scheduler"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
"code.alipay.com/dbplatform/node-disk-controller/pkg/generated/clientset/versioned"
"code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
rt "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

type B64EncodedMysqlDSN string
Expand Down Expand Up @@ -100,7 +104,15 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
PoolUtil: poolUtil,
KubeCli: kubeClient,
},
WatchType: &v1.StoragePool{},
ForType: &v1.StoragePool{},
Watches: []reconciler.WatchObject{
{
Source: &source.Kind{Type: &corev1.Node{}},
EventHandler: &handler.NodeEventHandler{
Cfg: req.ControllerConfig,
},
},
},
}
if err = poolReconciler.SetupWithManager(mgr); err != nil {
klog.Error(err, "unable to create controller StoragePoolReconciler")
Expand All @@ -123,7 +135,38 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
AntstoreCli: antstorCli,
Scheduler: scheduler,
},
WatchType: &v1.AntstorVolume{},
ForType: &v1.AntstorVolume{},
Watches: []reconciler.WatchObject{
{
Source: &source.Kind{Type: &v1.AntstorVolume{}},
EventHandler: &handler.VolumeEventHandler{
State: stateObj,
}},
},
Indexes: []reconciler.IndexObject{
{
Obj: &v1.AntstorVolume{},
Field: v1.IndexKeyUUID,
ExtractValue: func(rawObj client.Object) []string {
// grab the volume, extract the uuid
if vol, ok := rawObj.(*v1.AntstorVolume); ok {
return []string{vol.Spec.Uuid}
}
return nil
},
},
{
Obj: &v1.AntstorVolume{},
Field: v1.IndexKeyTargetNodeID,
ExtractValue: func(rawObj client.Object) []string {
// grab the volume, extract the targetNodeId
if vol, ok := rawObj.(*v1.AntstorVolume); ok {
return []string{vol.Spec.TargetNodeId}
}
return nil
},
},
},
}
if err = volReconciler.SetupWithManager(mgr); err != nil {
klog.Error(err, "unable to create controller VolumeReconciler")
Expand All @@ -145,7 +188,7 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
Scheduler: scheduler,
State: stateObj,
},
WatchType: &v1.AntstorVolumeGroup{},
ForType: &v1.AntstorVolumeGroup{},
}
if err = volGroupReconciler.SetupWithManager(mgr); err != nil {
klog.Error(err, "unable to create controller VolumeGroupReconciler")
Expand All @@ -165,7 +208,7 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
MainHandler: &reconciler.AntstorDataControlReconcileHandler{
Client: mgr.GetClient(),
},
WatchType: &v1.AntstorDataControl{},
ForType: &v1.AntstorDataControl{},
}
if err = dataControlReconciler.SetupWithManager(mgr); err != nil {
klog.Error(err, "unable to create controller AntstorDataControlReconciler")
Expand Down
106 changes: 83 additions & 23 deletions pkg/controller/manager/reconciler/handler/node_handler.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,84 @@
package handler

import (
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
"code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
)

var (
_ cache.ResourceEventHandler = &NodeEventHandler{}
_ handler.EventHandler = &NodeEventHandler{}
)

type NodeEventHandler struct {
Cfg config.Config
}

func (e *NodeEventHandler) FilterObject(obj interface{}) bool {
/*
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return false
}
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return false
}
*/

if len(e.Cfg.Scheduler.NodeCacheSelector) == 0 {
return true
// Create implements EventHandler.
func (e *NodeEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if !validateNode(&e.Cfg, evt.Object) {
return
}

// Node name is same with StoragePool name
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: v1.DefaultNamespace,
Name: evt.Object.GetName(),
}})
}

// Update implements EventHandler.
func (e *NodeEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if !validateNode(&e.Cfg, evt.ObjectNew) {
return
}

if node, ok := obj.(*corev1.Node); ok {
selector := labels.SelectorFromSet(labels.Set(e.Cfg.Scheduler.NodeCacheSelector))
if selector.Matches(labels.Set(node.Labels)) {
klog.Info("matched node to cache: ", node.Name)
return true
}
// Node name is same with StoragePool name
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: v1.DefaultNamespace,
Name: evt.ObjectNew.GetName(),
}})
}

// Delete implements EventHandler.
func (e *NodeEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
if !validateNode(&e.Cfg, evt.Object) {
return
}

// Node name is same with StoragePool name
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: v1.DefaultNamespace,
Name: evt.Object.GetName(),
}})
}

// Generic implements EventHandler.
func (e *NodeEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
if !validateNode(&e.Cfg, evt.Object) {
return
}

// Node name is same with StoragePool name
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: v1.DefaultNamespace,
Name: evt.Object.GetName(),
}})
}

func (e *NodeEventHandler) FilterObject(obj interface{}) bool {
if metaObj, ok := obj.(client.Object); ok {
return validateNode(&e.Cfg, metaObj)
}

return false
Expand Down Expand Up @@ -70,3 +110,23 @@ func (e *NodeEventHandler) OnDelete(obj interface{}) {
}
klog.Infof("delete Node %s", key)
}

func validateNode(cfg *config.Config, obj client.Object) bool {
if obj == nil {
klog.Error(nil, "NodeEvent received with nil object")
return false
}

// no config of NodeCacheSelector means appcet all nodes events
if len(cfg.Scheduler.NodeCacheSelector) == 0 {
return true
}

selector := labels.SelectorFromSet(labels.Set(cfg.Scheduler.NodeCacheSelector))
if selector.Matches(labels.Set(obj.GetLabels())) {
klog.Info("matched node to cache: ", obj.GetName())
return true
}

return false
}
Loading

0 comments on commit 75a260f

Please sign in to comment.