Skip to content

Commit

Permalink
some fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandr Zimin <[email protected]>
  • Loading branch information
AleksZimin committed Mar 13, 2024
1 parent b27a802 commit 50dc08e
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 131 deletions.
64 changes: 29 additions & 35 deletions images/agent/pkg/controller/lvm_volume_group_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func RunLVMVolumeGroupWatcherController(
}

createFunc := func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
log.Info(fmt.Sprintf("[RunLVMVolumeGroupController] event create LVMVolumeGroup, name: %s", e.Object.GetName()))
log.Info(fmt.Sprintf("[RunLVMVolumeGroupController] Get event CREATE for resource LVMVolumeGroup, name: %s", e.Object.GetName()))

request := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: e.Object.GetNamespace(), Name: e.Object.GetName()}}
shouldRequeue, err := ReconcileLVMVG(ctx, metrics, e.Object.GetName(), e.Object.GetNamespace(), cfg.NodeName, log, cl)
Expand All @@ -92,7 +92,7 @@ func RunLVMVolumeGroupWatcherController(
}

updateFunc := func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
log.Info(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] update LVMVolumeGroupn, name: %s", e.ObjectNew.GetName()))
log.Info(fmt.Sprintf("[RunLVMVolumeGroupWatcherController] Get event UPDATE for resource LVMVolumeGroup, name: %s", e.ObjectNew.GetName()))

newLVG, ok := e.ObjectNew.(*v1alpha1.LvmVolumeGroup)
if !ok {
Expand Down Expand Up @@ -206,33 +206,26 @@ func ReconcileLVMVG(
return true, err
}

validation, status, err := ValidateLVMGroup(ctx, cl, metrics, lvg, objectNameSpace, nodeName)

if status.Health == NonOperational {
health := status.Health
var message string
if err != nil {
message = err.Error()
}

log.Error(err, fmt.Sprintf("[ReconcileLVMVG] ValidateLVMGroup, resource name: %s, message: %s", lvg.Name, message))
err = updateLVMVolumeGroupHealthStatus(ctx, cl, metrics, lvg.Name, lvg.Namespace, message, health)
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error update LVMVolumeGroup %s", lvg.Name))
return true, err
}
}
isOwnedByNode, status, err := CheckLVMVGNodeOwnership(ctx, cl, metrics, lvg, objectNameSpace, nodeName)

if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] validationLVMGroup failed, resource name: %s", lvg.Name))
return false, err
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error CheckLVMVGNodeOwnership, resource name: %s", lvg.Name))
if status.Health == NonOperational {
health := status.Health
message := status.Message
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] ValidateLVMGroup, resource name: %s, health: %s, phase: %s, message: %s", lvg.Name, health, status.Phase, message))
err = updateLVMVolumeGroupHealthStatus(ctx, cl, metrics, lvg.Name, lvg.Namespace, message, health)
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error update LVMVolumeGroup %s", lvg.Name))
return true, err
}
}
return true, err
}

if validation == false {
err = errors.New("resource validation failed")
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] validation failed for resource, name: %s", lvg.Name))
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] status.Message = %s", status.Message))
return false, err
if !isOwnedByNode {
log.Debug(fmt.Sprintf("[ReconcileLVMVG] resource is not owned by node, name: %s, skip it", lvg.Name))
return false, nil
}

log.Info("[ReconcileLVMVG] validation passed")
Expand Down Expand Up @@ -272,14 +265,14 @@ func ReconcileLVMVG(
}
log.Info(fmt.Sprintf(`[ReconcileLVMVG] event was created for resource, name: %s`, lvg.Name))

existVG, err := ExistVG(lvg.Spec.ActualVGNameOnTheNode, log, metrics)
isVgExist, vg, err := GetVGFromNode(lvg.Spec.ActualVGNameOnTheNode, log, metrics)
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error ExistVG, name: %s", lvg.Spec.ActualVGNameOnTheNode))
return true, err
}
if existVG {
log.Debug("[ReconcileLVMVG] tries to update ")
updated, err := UpdateLVMVolumeGroupTagsName(log, metrics, lvg)
if isVgExist {
log.Debug("[ReconcileLVMVG] start UpdateLVMVolumeGroupTagsName for r " + lvg.Name)
updated, err := UpdateLVMVolumeGroupTagsName(log, metrics, vg, lvg)
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] unable to update VG tags on VG, name: %s", lvg.Spec.ActualVGNameOnTheNode))
return true, err
Expand All @@ -292,16 +285,16 @@ func ReconcileLVMVG(
}

log.Info("[ReconcileLVMVG] validation and choosing the type of operation")
extendPVs, shrinkPVs, err := ValidateTypeLVMGroup(ctx, cl, metrics, lvg, log)
extendPVs, shrinkPVs, err := ValidateOperationTypeLVMGroup(ctx, cl, metrics, lvg, log)
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error ValidateTypeLVMGroup, name: %s", lvg.Name))
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error ValidateOperationTypeLVMGroup, name: %s", lvg.Name))
return true, err
}

if err == nil && extendPVs == nil && shrinkPVs == nil {
log.Warning("[ReconcileLVMVG] ValidateTypeLVMGroup FAIL")
//todo retry and send message
}
// if err == nil && extendPVs == nil && shrinkPVs == nil {
// log.Warning(fmt.Sprintf("[ReconcileLVMVG] ValidateOperationTypeLVMGroup FAIL for resource %s", lvg.Name))
// //todo retry and send message
// }

log.Debug("----- extendPVs list -----")
for _, pvExt := range extendPVs {
Expand Down Expand Up @@ -449,6 +442,7 @@ func ReconcileLVMVG(
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] error CreateEventLVMVolumeGroup, resource name: %s", lvg.Name))
}

log.Debug("[ReconcileLVMVG] Start CreateVGComplex function for resource " + lvg.Name)
err := CreateVGComplex(ctx, cl, metrics, lvg, log)
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] unable to CreateVGComplex for resource, name: %s", lvg.Name))
Expand Down
105 changes: 28 additions & 77 deletions images/agent/pkg/controller/lvm_volume_group_watcher_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func getBlockDevice(ctx context.Context, cl client.Client, metrics monitoring.Me
return obj, nil
}

func ValidateLVMGroup(ctx context.Context, cl client.Client, metrics monitoring.Metrics, lvmVolumeGroup *v1alpha1.LvmVolumeGroup, namespace, nodeName string) (bool, *StatusLVMVolumeGroup, error) {
func CheckLVMVGNodeOwnership(ctx context.Context, cl client.Client, metrics monitoring.Metrics, lvmVolumeGroup *v1alpha1.LvmVolumeGroup, namespace, nodeName string) (bool, *StatusLVMVolumeGroup, error) {
status := StatusLVMVolumeGroup{}
if lvmVolumeGroup == nil {
return false, nil, errors.New("lvmVolumeGroup is nil")
Expand Down Expand Up @@ -143,38 +143,14 @@ func ValidateLVMGroup(ctx context.Context, cl client.Client, metrics monitoring.
}

if membership == 0 {
status.Health = NonOperational
status.Phase = Failed
status.Message = "no selected block devices are from the current node for local LVMVolumeGroup"
return false, &status, nil
}
}

// if lvmVolumeGroup.Spec.Type == Shared {
// if len(lvmVolumeGroup.Spec.BlockDeviceNames) != 1 {
// status.Health = NonOperational
// status.Phase = Failed
// status.Message = "several block devices are selected for the shared LVMVolumeGroup"
// return false, &status, errors.New(status.Message)
// }

// singleBD := lvmVolumeGroup.Spec.BlockDeviceNames[0]
// bd, err := getBlockDevice(ctx, cl, metrics, namespace, singleBD)
// if err != nil {
// status.Health = NonOperational
// status.Phase = Failed
// status.Message = "selected unknown block device for the shared LVMVolumeGroup"
// return false, &status, err
// }

// if bd.Status.NodeName == nodeName {
// return true, &status, nil
// }
// }
return false, &status, nil
}

func ValidateTypeLVMGroup(ctx context.Context, cl client.Client, metrics monitoring.Metrics, lvmVolumeGroup *v1alpha1.LvmVolumeGroup, l logger.Logger) (extendPV, shrinkPV []string, err error) {
func ValidateOperationTypeLVMGroup(ctx context.Context, cl client.Client, metrics monitoring.Metrics, lvmVolumeGroup *v1alpha1.LvmVolumeGroup, l logger.Logger) (extendPV, shrinkPV []string, err error) {
pvs, cmdStr, _, err := utils.GetAllPVs()
l.Debug(fmt.Sprintf("GetAllPVs exec cmd: %s", cmdStr))
if err != nil {
Expand All @@ -188,36 +164,28 @@ func ValidateTypeLVMGroup(ctx context.Context, cl client.Client, metrics monitor
}

if dev.Status.Consumable == true {
extendPV = append(extendPV, dev.Status.Path)
isReallyConsumable := true
for _, pv := range pvs {
if pv.PVName == dev.Status.Path {
if pv.VGName == lvmVolumeGroup.Spec.ActualVGNameOnTheNode {
isReallyConsumable = false
}
}
}
if isReallyConsumable {
extendPV = append(extendPV, dev.Status.Path)
}

continue
}

if dev.Status.ActualVGNameOnTheNode != lvmVolumeGroup.Spec.ActualVGNameOnTheNode && (len(dev.Status.VGUuid) != 0) {
return nil, nil, nil
// validation fail, send message => LVG ?
err = fmt.Errorf("block device %s is already in use by another VG: %s with uuid %s. Our VG: %s with uuid %s", devName, dev.Status.ActualVGNameOnTheNode, dev.Status.VGUuid, lvmVolumeGroup.Spec.ActualVGNameOnTheNode, dev.Status.VGUuid)
return nil, nil, err
}
// TODO: realisation of shrinkPV
}

var flag bool

for _, pv := range pvs {
if pv.VGName == lvmVolumeGroup.Spec.ActualVGNameOnTheNode {
flag = false
for _, devName := range lvmVolumeGroup.Spec.BlockDeviceNames {
dev, err := getBlockDevice(ctx, cl, metrics, lvmVolumeGroup.Namespace, devName)
if err != nil {
return nil, nil, err
}

if pv.PVUuid == dev.Status.PVUuid {
flag = true
}
}
}
if !flag && pv.VGName == lvmVolumeGroup.Spec.ActualVGNameOnTheNode {
shrinkPV = append(shrinkPV, pv.PVName)
}
}
return extendPV, shrinkPV, nil
}

Expand Down Expand Up @@ -338,24 +306,25 @@ func DeleteVG(vgName string, log logger.Logger, metrics monitoring.Metrics) erro
return nil
}

func ExistVG(vgName string, log logger.Logger, metrics monitoring.Metrics) (bool, error) {
func GetVGFromNode(vgName string, log logger.Logger, metrics monitoring.Metrics) (bool, internal.VGData, error) {
start := time.Now()
vg, command, _, err := utils.GetAllVGs()
var vg internal.VGData
vgs, command, _, err := utils.GetAllVGs()
metrics.UtilsCommandsDuration(LVMVolumeGroupWatcherCtrlName, "vgs").Observe(metrics.GetEstimatedTimeInSeconds(start))
metrics.UtilsCommandsExecutionCount(LVMVolumeGroupWatcherCtrlName, "vgs").Inc()
log.Debug(command)
if err != nil {
metrics.UtilsCommandsErrorsCount(LVMVolumeGroupWatcherCtrlName, "vgs").Inc()
log.Error(err, " error CreateEventLVMVolumeGroup")
return false, err
return false, vg, err
}

for _, v := range vg {
if v.VGName == vgName {
return true, nil
for _, vg := range vgs {
if vg.VGName == vgName {
return true, vg, nil
}
}
return false, nil
return false, vg, nil
}

func ValidateConsumableDevices(ctx context.Context, cl client.Client, metrics monitoring.Metrics, group *v1alpha1.LvmVolumeGroup) (bool, error) {
Expand Down Expand Up @@ -480,31 +449,13 @@ func CreateVGComplex(ctx context.Context, cl client.Client, metrics monitoring.M
return nil
}

func UpdateLVMVolumeGroupTagsName(log logger.Logger, metrics monitoring.Metrics, lvg *v1alpha1.LvmVolumeGroup) (bool, error) {
func UpdateLVMVolumeGroupTagsName(log logger.Logger, metrics monitoring.Metrics, vg internal.VGData, lvg *v1alpha1.LvmVolumeGroup) (bool, error) {
const tag = "storage.deckhouse.io/lvmVolumeGroupName"

start := time.Now()
vgs, cmd, _, err := utils.GetAllVGs()
metrics.UtilsCommandsDuration(LVMVolumeGroupWatcherCtrlName, "vgs").Observe(metrics.GetEstimatedTimeInSeconds(start))
metrics.UtilsCommandsExecutionCount(LVMVolumeGroupWatcherCtrlName, "vgs").Inc()
log.Debug(fmt.Sprintf("[ReconcileLVMVG] exec cmd: %s", cmd))
if err != nil {
log.Error(err, fmt.Sprintf("[ReconcileLVMVG] unable to get VG by resource, name: %s", lvg.Name))
metrics.UtilsCommandsErrorsCount(LVMVolumeGroupWatcherCtrlName, "vgs").Inc()
return false, err
}

var vg internal.VGData
for _, v := range vgs {
if v.VGName == lvg.Spec.ActualVGNameOnTheNode {
vg = v
}
}

found, tagName := CheckTag(vg.VGTags)
if found && lvg.Name != tagName {
start = time.Now()
cmd, err = utils.VGChangeDelTag(vg.VGName, fmt.Sprintf("%s=%s", tag, tagName))
start := time.Now()
cmd, err := utils.VGChangeDelTag(vg.VGName, fmt.Sprintf("%s=%s", tag, tagName))
metrics.UtilsCommandsDuration(LVMVolumeGroupWatcherCtrlName, "vgchange").Observe(metrics.GetEstimatedTimeInSeconds(start))
metrics.UtilsCommandsExecutionCount(LVMVolumeGroupWatcherCtrlName, "vgchange").Inc()
log.Debug(fmt.Sprintf("[UpdateLVMVolumeGroupTagsName] exec cmd: %s", cmd))
Expand Down
23 changes: 12 additions & 11 deletions images/agent/pkg/controller/lvm_volume_group_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package controller

import (
"context"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sds-node-configurator/api/v1alpha1"
"sds-node-configurator/pkg/monitoring"
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
Expand Down Expand Up @@ -318,7 +319,7 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
})

t.Run("ValidateLVMGroup_lvg_is_nil_returns_error", func(t *testing.T) {
valid, obj, err := ValidateLVMGroup(ctx, cl, metrics, nil, "test_ns", "test_node")
valid, obj, err := CheckLVMVGNodeOwnership(ctx, cl, metrics, nil, "test_ns", "test_node")
assert.False(t, valid)
assert.Nil(t, obj)
assert.EqualError(t, err, "lvmVolumeGroup is nil")
Expand Down Expand Up @@ -350,7 +351,7 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
}()
}

valid, status, err := ValidateLVMGroup(ctx, cl, metrics, testObj, namespace, "test_node")
valid, status, err := CheckLVMVGNodeOwnership(ctx, cl, metrics, testObj, namespace, "test_node")
assert.False(t, valid)
if assert.NotNil(t, status) {
assert.Equal(t, NonOperational, status.Health)
Expand Down Expand Up @@ -431,7 +432,7 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
}()
}

valid, status, err := ValidateLVMGroup(ctx, cl, metrics, testLvg, namespace, testNode)
valid, status, err := CheckLVMVGNodeOwnership(ctx, cl, metrics, testLvg, namespace, testNode)
assert.False(t, valid)
if assert.NotNil(t, status) {
assert.Equal(t, NonOperational, status.Health)
Expand Down Expand Up @@ -513,7 +514,7 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
}()
}

valid, status, err := ValidateLVMGroup(ctx, cl, metrics, testLvg, namespace, "another-node")
valid, status, err := CheckLVMVGNodeOwnership(ctx, cl, metrics, testLvg, namespace, "another-node")
assert.False(t, valid)
if assert.NotNil(t, status) {
assert.Equal(t, NonOperational, status.Health)
Expand Down Expand Up @@ -595,7 +596,7 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
}()
}

valid, status, err := ValidateLVMGroup(ctx, cl, metrics, testLvg, namespace, testNode)
valid, status, err := CheckLVMVGNodeOwnership(ctx, cl, metrics, testLvg, namespace, testNode)
assert.True(t, valid)
if assert.NotNil(t, status) {
assert.Equal(t, "", status.Health)
Expand Down Expand Up @@ -677,7 +678,7 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
}()
}

valid, status, err := ValidateLVMGroup(ctx, cl, metrics, testLvg, namespace, testNode)
valid, status, err := CheckLVMVGNodeOwnership(ctx, cl, metrics, testLvg, namespace, testNode)
assert.False(t, valid)
if assert.NotNil(t, status) {
assert.Equal(t, NonOperational, status.Health)
Expand Down Expand Up @@ -749,7 +750,7 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
}()
}

valid, status, err := ValidateLVMGroup(ctx, cl, metrics, testLvg, namespace, testNode)
valid, status, err := CheckLVMVGNodeOwnership(ctx, cl, metrics, testLvg, namespace, testNode)
assert.False(t, valid)
if assert.NotNil(t, status) {
assert.Equal(t, NonOperational, status.Health)
Expand Down Expand Up @@ -821,7 +822,7 @@ func TestLVMVolumeGroupWatcherCtrl(t *testing.T) {
}()
}

valid, status, err := ValidateLVMGroup(ctx, cl, metrics, testLvg, namespace, testNode)
valid, status, err := CheckLVMVGNodeOwnership(ctx, cl, metrics, testLvg, namespace, testNode)
assert.True(t, valid)
if assert.NotNil(t, status) {
assert.Equal(t, "", status.Health)
Expand Down
Loading

0 comments on commit 50dc08e

Please sign in to comment.