diff --git a/README.md b/README.md
index 66fe00c..5ffb61d 100644
--- a/README.md
+++ b/README.md
@@ -97,3 +97,10 @@ However, it is important to note that LiteIO does not currently support data rep
 - [x] Disk-Agent exposes metric service
 - [ ] SPDK volume replica
+
+
+## Contact
+
+WeChat group QR code
+
+![Wechat Group](doc/image/wechat_group.JPG)
\ No newline at end of file
diff --git a/cmd/controller/antplugins/localstorage.go b/cmd/controller/antplugins/localstorage.go
index 1aef29f..cf3ba19 100644
--- a/cmd/controller/antplugins/localstorage.go
+++ b/cmd/controller/antplugins/localstorage.go
@@ -5,7 +5,9 @@ import (
     "fmt"
     "strconv"
 
+    corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "sigs.k8s.io/controller-runtime/pkg/client"
 
     v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
     "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil"
@@ -41,6 +43,7 @@ func NewReportLocalStoragePlugin(h *controllers.PluginHandle) (p plugin.Plugin,
     }
 
     p = &ReportLocalStoragePlugin{
+        Client:             h.Client,
         NodeUpdater:        kubeutil.NewKubeNodeInfoGetter(h.Req.KubeCli),
         PoolUtil:           kubeutil.NewStoragePoolUtil(h.Client),
         ReportLocalConfigs: pluginCfg.DefaultLocalSpaceRules,
@@ -50,7 +53,7 @@ func NewReportLocalStoragePlugin(h *controllers.PluginHandle) (p plugin.Plugin,
 
 // ReportLocalStoragePlugin is a AntstorVolume plugin.
 type ReportLocalStoragePlugin struct {
-    // NodeGetter kubeutil.NodeInfoGetterIface
+    Client      client.Client
     NodeUpdater kubeutil.NodeUpdaterIface
     PoolUtil    kubeutil.StoragePoolUpdater
 
@@ -96,18 +99,44 @@ func (r *ReportLocalStoragePlugin) Reconcile(ctx *plugin.Context) (result plugin
 
     // report the local storage when the StoragePool is created in the first place.
     if isPool && pool != nil {
-        totalBs := pool.GetAvailableBytes()
-        if _, has := pool.Labels[v1.PoolLocalStorageBytesKey]; !has {
-            log.Info("update node/status capacity", "local-storage", totalBs)
-            // update Pool Label "obnvmf/node-local-storage-size" = totalBs
-            err = r.PoolUtil.SavePoolLocalStorageMark(pool, uint64(totalBs))
+        var (
+            localBS      uint64
+            node         corev1.Node
+            snode        *state.Node
+            hasNodeRes   bool
+            hasPoolLabel bool
+        )
+
+        // calculate local storage
+        snode, err = stateObj.GetNodeByNodeID(pool.Name)
+        if err != nil {
+            log.Error(err, "find node failed")
+            return plugin.Result{Error: err}
+        }
+        localBS = CalculateLocalStorageCapacity(snode)
+
+        // get node
+        err = r.Client.Get(ctx.ReqCtx.Ctx, client.ObjectKey{Name: pool.Name}, &node)
+        if err != nil {
+            log.Error(err, "getting Node failed")
+            return plugin.Result{Error: err}
+        }
+
+        _, hasNodeRes = node.Status.Allocatable[kubeutil.SdsLocalStorageResourceKey]
+        _, hasPoolLabel = pool.Labels[v1.PoolLocalStorageBytesKey]
+        log.Info("check pool PoolLocalStorageBytesKey and node SdsLocalStorageResourceKey", "nodeResource", hasNodeRes, "hasPoolLabel", hasPoolLabel)
+
+        if !hasPoolLabel || !hasNodeRes {
+            log.Info("update node/status capacity", "local-storage", localBS)
+            // update Pool Label "obnvmf/local-storage-bytes" = localBS
+            err = r.PoolUtil.SavePoolLocalStorageMark(pool, localBS)
             if err != nil {
                 log.Error(err, "SavePoolLocalStorageMark failed")
                 return plugin.Result{Error: err}
             }
             // update node/status capacity = totalBs
-            _, err = r.NodeUpdater.ReportLocalDiskResource(pool.Name, uint64(totalBs))
+            _, err = r.NodeUpdater.ReportLocalDiskResource(pool.Name, localBS)
             if err != nil {
                 log.Error(err, "ReportLocalDiskResource failed")
                 return plugin.Result{Error: err}
@@ -126,28 +155,6 @@ func (r *ReportLocalStoragePlugin) Reconcile(ctx *plugin.Context) (result plugin
 
     }
     var sp = node.Pool
-    /*
-        var localStorePct int
-        var volInState *v1.AntstorVolume
-        for _, item := range r.ReportLocalConfigs {
-            selector, err := metav1.LabelSelectorAsSelector(&item.LabelSelector)
-            if err != nil {
-                log.Error(err, "LabelSelectorAsSelector failed", "selector", item.LabelSelector)
-                continue
-            }
-            if selector.Matches(labels.Set(sp.Spec.NodeInfo.Labels)) && item.EnableDefault {
-                localStorePct = item.DefaultLocalStoragePct
-                log.Info("matched local-storage percentage", "pct", localStorePct)
-            }
-        }
-
-        volInState, err = node.GetVolumeByID(volume.Spec.Uuid)
-        if err == nil {
-            log.Info("copy volume into state")
-            *volInState = *volume
-        }
-    */
-
     var expectLocalSize = CalculateLocalStorageCapacity(node)
     var localSizeStr = strconv.Itoa(int(expectLocalSize))
     log.Info("compare local storage size", "in label", sp.Labels[v1.PoolLocalStorageBytesKey], "expect", localSizeStr, "delTS", volume.DeletionTimestamp)
diff --git a/doc/image/wechat_group.JPG b/doc/image/wechat_group.JPG
new file mode 100644
index 0000000..adf3cb2
Binary files /dev/null and b/doc/image/wechat_group.JPG differ
diff --git a/hack/deploy/lvm/050-configmap.yaml b/hack/deploy/lvm/050-configmap.yaml
index c70e611..d501efa 100644
--- a/hack/deploy/lvm/050-configmap.yaml
+++ b/hack/deploy/lvm/050-configmap.yaml
@@ -25,6 +25,9 @@ data:
       nodeTaints:
       - key: node.sigma.ali/lifecycle
        operator: Exists
+      #nodeReservations:
+      #- id: obnvmf/app-vol
+      #  size: 107374182400 # 100Gi
     pluginConfigs:
       defaultLocalSpaceRules:
       - enableDefault: true
diff --git a/pkg/controller/manager/config/config.go b/pkg/controller/manager/config/config.go
index 825ef9e..5fe774b 100644
--- a/pkg/controller/manager/config/config.go
+++ b/pkg/controller/manager/config/config.go
@@ -30,6 +30,13 @@ type SchedulerConfig struct {
     NodeCacheSelector map[string]string `json:"nodeCacheSelector" yaml:"nodeCacheSelector"`
     // MinLocalStoragePct defines the minimun percentage of local storage to be reserved on one node.
     MinLocalStoragePct int `json:"minLocalStoragePct" yaml:"minLocalStoragePct"`
+    // NodeReservations defines the reservations on each node
+    NodeReservations []NodeReservation `json:"nodeReservations" yaml:"nodeReservations"`
+}
+
+type NodeReservation struct {
+    ID   string `json:"id" yaml:"id"`
+    Size int64  `json:"size" yaml:"size"`
 }
 
 type NoScheduleConfig struct {
diff --git a/pkg/controller/manager/controllers/manager.go b/pkg/controller/manager/controllers/manager.go
index 067864b..fd33eb5 100644
--- a/pkg/controller/manager/controllers/manager.go
+++ b/pkg/controller/manager/controllers/manager.go
@@ -100,6 +100,7 @@ func NewAndInitControllerManager(req NewManagerRequest) manager.Manager {
         Concurrency: 4,
         MainHandler: &reconciler.StoragePoolReconcileHandler{
             Client:   mgr.GetClient(),
+            Cfg:      req.ControllerConfig,
             State:    stateObj,
             PoolUtil: poolUtil,
             KubeCli:  kubeClient,
diff --git a/pkg/controller/manager/reconciler/pool_reconciler.go b/pkg/controller/manager/reconciler/pool_reconciler.go
index 061b090..e545de8 100644
--- a/pkg/controller/manager/reconciler/pool_reconciler.go
+++ b/pkg/controller/manager/reconciler/pool_reconciler.go
@@ -20,6 +20,7 @@ import (
 
     v1 "code.alipay.com/dbplatform/node-disk-controller/pkg/api/volume.antstor.alipay.com/v1"
     "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/kubeutil"
+    "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/config"
     "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/reconciler/plugin"
     "code.alipay.com/dbplatform/node-disk-controller/pkg/controller/manager/state"
     "code.alipay.com/dbplatform/node-disk-controller/pkg/util/misc"
@@ -39,6 +40,7 @@ var (
 
 type StoragePoolReconcileHandler struct {
     client.Client
+    Cfg      config.Config
     State    state.StateIface
     PoolUtil kubeutil.StoragePoolUpdater
     KubeCli  kubernetes.Interface
@@ -227,6 +229,7 @@ func (r *StoragePoolReconcileHandler) handleDeletion(pCtx *plugin.Context) (resu
 func (r *StoragePoolReconcileHandler) saveToState(sp *v1.StoragePool, log logr.Logger) (result plugin.Result) {
     var patch = client.MergeFrom(sp.DeepCopy())
     var err error
+    var node *state.Node
 
     r.State.SetStoragePool(sp)
 
@@ -244,6 +247,19 @@ func (r *StoragePoolReconcileHandler) saveToState(sp *v1.StoragePool, log logr.L
         }
     }
 
+    // try to add reservation by config
+    node, err = r.State.GetNodeByNodeID(sp.Name)
+    if err != nil {
+        log.Error(err, "GetNodeByNodeID error")
+    }
+    for _, item := range r.Cfg.Scheduler.NodeReservations {
+        if node != nil {
+            if _, has := node.GetReservation(item.ID); !has {
+                node.Reserve(state.NewReservation(item.ID, item.Size))
+            }
+        }
+    }
+
     return plugin.Result{}
 }
 
diff --git a/pkg/controller/manager/scheduler/filter/basic.go b/pkg/controller/manager/scheduler/filter/basic.go
index 9c9956e..f9cad92 100644
--- a/pkg/controller/manager/scheduler/filter/basic.go
+++ b/pkg/controller/manager/scheduler/filter/basic.go
@@ -21,6 +21,11 @@ func BasicFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume) b
     )
 
     // check if err is nil
+    // if the volume matches a reservation, skip the following checks
+    if pass, hasErr := matchReservationFilter(ctx, n, vol); hasErr || pass {
+        return pass
+    }
+
     // consider Pool FreeSpace
     var freeRes = n.GetFreeResourceNonLock()
     var freeDisk = freeRes[v1.ResourceDiskPoolByte]
@@ -100,3 +105,23 @@ func BasicFilterFunc(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume) b
 
     return true
 }
+
+func matchReservationFilter(ctx *FilterContext, n *state.Node, vol *v1.AntstorVolume) (pass, hasError bool) {
+    if resvId, has := vol.Annotations[v1.ReservationIDKey]; has {
+        free := n.FreeResource.Storage()
+        if free.CmpInt64(0) < 0 {
+            ctx.Error.AddReason(ReasonPoolFreeSize)
+            return false, true
+        }
+
+        if r, has := n.GetReservation(resvId); has {
+            if r.Size() < int64(vol.Spec.SizeByte) {
+                ctx.Error.AddReason(ReasonReservationSize)
+                return false, true
+            }
+            return true, false
+        }
+    }
+
+    return false, false
+}
diff --git a/pkg/controller/manager/scheduler/filter/error.go b/pkg/controller/manager/scheduler/filter/error.go
index dc2d24a..3954036 100644
--- a/pkg/controller/manager/scheduler/filter/error.go
+++ b/pkg/controller/manager/scheduler/filter/error.go
@@ -16,6 +16,7 @@ const (
     ReasonNodeAffinity      = "NodeAffinity"
     ReasonPoolAffinity      = "PoolAffinity"
     ReasonPoolUnschedulable = "PoolUnschedulable"
+    ReasonReservationSize   = "ReservationTooSmall"
 
     NoStoragePoolAvailable = "NoStoragePoolAvailable"
     //
diff --git a/pkg/controller/manager/state/http.go b/pkg/controller/manager/state/http.go
index f1ac7c8..564b8f4 100644
--- a/pkg/controller/manager/state/http.go
+++ b/pkg/controller/manager/state/http.go
@@ -13,20 +13,31 @@ type NodeStateAPI struct {
     KernelLVM *v1.KernelLVM   `json:"kernelLVM,omitempty"`
     SpdkLVS   *v1.SpdkLVStore `json:"spdkLVS,omitempty"`
     // Volumes breif info
-    Volumes []VolumeBreif `json:"volumes"`
-    // FreeSize of the pool
-    FreeSize int64 `json:"freeSize"`
+    Volumes []VolumeBrief `json:"volumes"`
+    // VgFreeSize of the pool
+    VgFreeSize int64 `json:"vgFreeSize"`
+    // MemFreeSize is the free size tracked in controller memory
+    MemFreeSize int64 `json:"memFreeSize"`
+    // MemFreeSizeStr is the readable free size tracked in controller memory
+    MemFreeSizeStr string `json:"memFreeSizeStr"`
     // Conditions of the pool status
     Conditions map[v1.PoolConditionType]v1.ConditionStatus `json:"conditions"`
+    // Reservations on the node
+    Reservations []ReservationBrief `json:"reservations"`
 }
 
-type VolumeBreif struct {
+type VolumeBrief struct {
     Namespace  string `json:"ns"`
     Name       string `json:"name"`
     DataHolder string `json:"dataHolder"`
     Size       int64  `json:"size"`
 }
 
+type ReservationBrief struct {
+    ID   string `json:"id"`
+    Size int64  `json:"size"`
+}
+
 func NewStateHandler(s StateIface) *StateHandler {
     return &StateHandler{state: s}
 }
@@ -49,11 +60,14 @@ func (h *StateHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request)
     }
 
     var api = NodeStateAPI{
-        Name:       spName,
-        PoolLabels: node.Pool.Labels,
-        KernelLVM:  &node.Pool.Spec.KernelLVM,
-        SpdkLVS:    &node.Pool.Spec.SpdkLVStore,
-        FreeSize:   node.Pool.Status.VGFreeSize.Value(),
+        Name:           spName,
+        PoolLabels:     node.Pool.Labels,
+        KernelLVM:      &node.Pool.Spec.KernelLVM,
+        SpdkLVS:        &node.Pool.Spec.SpdkLVStore,
+        VgFreeSize:     node.Pool.Status.VGFreeSize.Value(),
+        MemFreeSize:    int64(node.FreeResource.Storage().AsApproximateFloat64()),
+        MemFreeSizeStr: node.FreeResource.Storage().String(),
+
         Conditions: make(map[v1.PoolConditionType]v1.ConditionStatus),
     }
 
@@ -62,7 +76,7 @@ func (h *StateHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request)
     }
 
     for _, vol := range node.Volumes {
-        api.Volumes = append(api.Volumes, VolumeBreif{
+        api.Volumes = append(api.Volumes, VolumeBrief{
             Namespace: vol.Namespace,
             Name:      vol.Name,
             Size:      int64(vol.Spec.SizeByte),
@@ -70,6 +84,15 @@ func (h *StateHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request)
         })
     }
 
+    if node.resvSet != nil {
+        for _, resv := range node.resvSet.Items() {
+            api.Reservations = append(api.Reservations, ReservationBrief{
+                ID:   resv.ID(),
+                Size: resv.Size(),
+            })
+        }
+    }
+
     bs, err := json.Marshal(api)
     if err != nil {
         writer.Write([]byte(err.Error()))
diff --git a/pkg/controller/manager/state/node.go b/pkg/controller/manager/state/node.go
index 5295f52..131b9d7 100644
--- a/pkg/controller/manager/state/node.go
+++ b/pkg/controller/manager/state/node.go
@@ -63,6 +63,13 @@ func (n *Node) AddVolume(vol *v1.AntstorVolume) (err error) {
     defer n.volLock.Unlock()
 
     var nodeID = n.Info.ID
+    var duplicate bool
+
+    // delete reservation if volume has reservation id
+    if resvID := getVolumeReservationID(vol); resvID != "" {
+        n.resvSet.Unreserve(resvID)
+    }
+
     // check duplicate
     for _, item := range n.Volumes {
         if item.Name == vol.Name {
@@ -77,19 +84,16 @@ func (n *Node) AddVolume(vol *v1.AntstorVolume) (err error) {
             // save the newer volume
             *item = *vol.DeepCopy()
             klog.Infof("vol %s already in node %s. type and sizes equal to each other", vol.Name, nodeID)
-            return
+            duplicate = true
         }
     }
 
-    // delete reservation if volume has reservation id
-    if resvID := getVolumeReservationID(vol); resvID != "" {
-        n.resvSet.Unreserve(resvID)
+    if !duplicate {
+        // volume reside on Node
+        vol.Spec.TargetNodeId = n.Pool.Spec.NodeInfo.ID
+        n.Volumes = append(n.Volumes, vol)
     }
 
-    n.Volumes = append(n.Volumes, vol)
-    // volume reside on Node
-    vol.Spec.TargetNodeId = n.Pool.Spec.NodeInfo.ID
-
     // update free resource
     n.FreeResource = n.GetFreeResourceNonLock()
 
@@ -231,6 +235,25 @@ func (n *Node) GetFreeResourceNonLock() (free corev1.ResourceList) {
 
 // Reserve storage resource for Node
 func (n *Node) Reserve(r ReservationIface) {
+    // if the volume is already bound, skip the reservation.
+    var resvID = r.ID()
+    for _, vol := range n.Volumes {
+        if resvID == getVolumeReservationID(vol) {
+            return
+        }
+    }
+
+    // check free resource
+    if free := n.FreeResource.Storage(); free != nil {
+        if free.CmpInt64(r.Size()) < 0 {
+            klog.Errorf("node %s does not have enough disk pool space for reservation %s", n.Info.ID, resvID)
+            return
+        }
+    }
+
+    n.volLock.Lock()
+    defer n.volLock.Unlock()
+
     n.resvSet.Reserve(r)
     // update free resource
     n.FreeResource = n.GetFreeResourceNonLock()
@@ -238,7 +261,14 @@ func (n *Node) Reserve(r ReservationIface) {
 
 // Unreserve storage resource
 func (n *Node) Unreserve(id string) {
+    n.volLock.Lock()
+    defer n.volLock.Unlock()
+
     n.resvSet.Unreserve(id)
     // update free resource
     n.FreeResource = n.GetFreeResourceNonLock()
 }
+
+func (n *Node) GetReservation(id string) (r ReservationIface, has bool) {
+    return n.resvSet.GetById(id)
+}
diff --git a/pkg/controller/manager/state/reservation.go b/pkg/controller/manager/state/reservation.go
index 2456905..9fd7e47 100644
--- a/pkg/controller/manager/state/reservation.go
+++ b/pkg/controller/manager/state/reservation.go
@@ -13,6 +13,7 @@ type ReservationSetIface interface {
     Reserve(r ReservationIface)
     Unreserve(id string)
     Items() (list []ReservationIface)
+    GetById(id string) (r ReservationIface, has bool)
 }
 
 type ReservationIface interface {
@@ -57,12 +58,32 @@ func (rs *reservationSet) Items() (list []ReservationIface) {
     return list
 }
 
+func (rs *reservationSet) GetById(id string) (r ReservationIface, has bool) {
+    rs.lock.Lock()
+    defer rs.lock.Unlock()
+
+    for _, item := range rs.rMap {
+        if item.ID() == id {
+            return item, true
+        }
+    }
+
+    return nil, false
+}
+
 type reservation struct {
     id             string
     namespacedName string
     sizeByte       int64
 }
 
+func NewReservation(id string, size int64) ReservationIface {
+    return &reservation{
+        id:       id,
+        sizeByte: size,
+    }
+}
+
 func NewPvcReservation(pvc *corev1.PersistentVolumeClaim) ReservationIface {
     if pvc.DeletionTimestamp != nil {
         return nil
diff --git a/pkg/controller/manager/state/state_test.go b/pkg/controller/manager/state/state_test.go
index 40e25fa..7374aab 100644
--- a/pkg/controller/manager/state/state_test.go
+++ b/pkg/controller/manager/state/state_test.go
@@ -102,3 +102,48 @@ func TestCompareError(t *testing.T) {
     err := newNotFoundNodeError("test")
     assert.True(t, IsNotFoundNodeError(err))
 }
+
+func TestMinusResource(t *testing.T) {
+    q := resource.NewQuantity(0, resource.BinarySI)
+    assert.Zero(t, q.CmpInt64(0))
+
+    q.Sub(resource.MustParse("1Mi"))
+    assert.True(t, q.CmpInt64(0) == -1)
+
+    t.Log(q.AsInt64())
+}
+
+func TestReservation(t *testing.T) {
+    pool := v1.StoragePool{
+        ObjectMeta: metav1.ObjectMeta{
+            Namespace: v1.DefaultNamespace,
+            Name:      "node1",
+            Labels:    map[string]string{},
+        },
+        Spec: v1.StoragePoolSpec{
+            NodeInfo: v1.NodeInfo{
+                ID: "node1",
+            },
+            KernelLVM: v1.KernelLVM{
+                Bytes: 38654705664, // 36864 MiB
+                ReservedLVol: []v1.KernelLVol{
+                    {
+                        Name:     "reserved-lv",
+                        SizeByte: 1024 * 1024 * 100, // 100MiB
+                    },
+                },
+            },
+        },
+        Status: v1.StoragePoolStatus{
+            Capacity: corev1.ResourceList{
+                v1.ResourceDiskPoolByte: resource.MustParse("36864Mi"),
+            },
+        },
+    }
+    node := NewNode(&pool)
+
+    node.Reserve(NewReservation("resv-id", 1024*1024*100))
+
+    t.Log(node.FreeResource.Storage().String())
+
+}
diff --git a/pkg/util/lvm/cmd.go b/pkg/util/lvm/cmd.go
index 2bb8342..5c6ebda 100644
--- a/pkg/util/lvm/cmd.go
+++ b/pkg/util/lvm/cmd.go
@@ -18,7 +18,7 @@ const (
     LvDeviceOpen    = "open"
     LvDeviceNotOpen = ""
 
-    pvLostErrStr = "not found or rejected by a filter"
+    pvLostErrStr = "Couldn't find device"
 )
 
 var (