[local] Split support for ephemeral and persistent volumes
Running TopoLVM for local persistent data the way we run the local volume provisioner would be a pain in terms of scheduling (how would we make sure a pod using shared local data on a node can always come back to that node?). As a result, we will keep the current setup with the local volume provisioner for persistent storage, while leveraging TopoLVM for ephemeral storage on the node. This way, we won't have to deal with TopoLVM scheduling issues.
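For background, generic ephemeral volumes have a different shape in the pod spec than plain PVC references, and that difference is exactly what the new code in transform_local_data.go keys on. Below is a minimal sketch of the two shapes, modeled on the test helper in this commit; the helper function names are illustrative, not part of the change:

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/utils/pointer"
)

// persistentLocalVolume references a pre-provisioned PVC. The data outlives
// the pod, so the pod must land back on the node that holds it; this case
// stays with the local volume provisioner.
func persistentLocalVolume(claimName string) corev1.Volume {
	return corev1.Volume{
		Name: claimName,
		VolumeSource: corev1.VolumeSource{
			PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
				ClaimName: claimName,
			},
		},
	}
}

// ephemeralLocalVolume embeds a claim template. The PVC is created and
// deleted with the pod, so TopoLVM can provision it on whichever node the
// pod lands on, and no come-back-to-the-same-node scheduling problem arises.
func ephemeralLocalVolume(name string, size resource.Quantity) corev1.Volume {
	return corev1.Volume{
		Name: name,
		VolumeSource: corev1.VolumeSource{
			Ephemeral: &corev1.EphemeralVolumeSource{
				VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
					Spec: corev1.PersistentVolumeClaimSpec{
						StorageClassName: pointer.String("ephemeral-local-data"),
						Resources: corev1.ResourceRequirements{
							Requests: corev1.ResourceList{
								corev1.ResourceStorage: size,
							},
						},
					},
				},
			},
		},
	}
}
```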
Fricounet committed Jun 18, 2024
1 parent db15117 commit 411e2cd
Showing 4 changed files with 98 additions and 41 deletions.
8 changes: 6 additions & 2 deletions cluster-autoscaler/processors/datadog/common/common.go
@@ -32,6 +32,10 @@ const (
 	// nodes offering local storage, and currently injected as requests on
 	// Pending pods having a PVC for local-data volumes.
 	DatadogLocalDataResource apiv1.ResourceName = "storageclass/local-data"
+	// DatadogEphemeralLocalDataResource is a virtual resource placed on new or future
+	// nodes offering ephemeral local storage, and currently injected as requests on
+	// Pending pods having an ephemeral PVC for local-data volumes.
+	DatadogEphemeralLocalDataResource apiv1.ResourceName = "storageclass/ephemeral-local-data"
 
 	// DatadogLocalStorageProvisionerLabel is indicating which technology will be used to provide local storage
 	DatadogLocalStorageProvisionerLabel = "nodegroups.datadoghq.com/local-storage-provisioner"
@@ -84,8 +88,8 @@ func SetNodeLocalDataResource(nodeInfo *schedulerframework.NodeInfo) {
 	capacity := node.Labels[DatadogInitialStorageCapacityLabel]
 	capacityResource, err := resource.ParseQuantity(capacity)
 	if err == nil {
-		node.Status.Capacity[DatadogLocalDataResource] = capacityResource.DeepCopy()
-		node.Status.Allocatable[DatadogLocalDataResource] = capacityResource.DeepCopy()
+		node.Status.Capacity[DatadogEphemeralLocalDataResource] = capacityResource.DeepCopy()
+		node.Status.Allocatable[DatadogEphemeralLocalDataResource] = capacityResource.DeepCopy()
 	} else {
 		klog.Warningf("failed to attach capacity information (%s) to node (%s): %v", capacity, node.Name, err)
 	}
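The pairing of node-side capacity (set above from the DatadogInitialStorageCapacityLabel) with pod-side requests (injected by the pods processor further down) is what lets the autoscaler's scheduling simulation bin-pack local storage like any other scalar resource. A rough sketch of the node-side check, where all names are illustrative rather than code from this patch:

```go
package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

const ephemeralLocalData apiv1.ResourceName = "storageclass/ephemeral-local-data"

// fitsEphemeralStorage mimics what the scheduler simulation ends up doing:
// a pending pod whose ephemeral claim was rewritten into a request for the
// virtual resource only fits nodes whose allocatable covers that request.
func fitsEphemeralStorage(node *apiv1.Node, request resource.Quantity) bool {
	allocatable, ok := node.Status.Allocatable[ephemeralLocalData]
	if !ok {
		// Node does not advertise TopoLVM-backed ephemeral local storage.
		return false
	}
	return allocatable.Cmp(request) >= 0
}
```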
26 changes: 24 additions & 2 deletions cluster-autoscaler/processors/datadog/common/common_test.go
@@ -112,6 +112,13 @@ func TestSetNodeLocalDataResource(t *testing.T) {
 	assert.True(t, ok)
 	assert.Equal(t, niValue, int64(1))
 
+	// Only DatadogLocalDataResource should be set
+	_, ok = ni.Node().Status.Allocatable[DatadogEphemeralLocalDataResource]
+	assert.False(t, ok)
+
+	_, ok = ni.Allocatable.ScalarResources[DatadogEphemeralLocalDataResource]
+	assert.False(t, ok)
+
 	assert.Equal(t, len(ni.Pods), 2)
 }
 
@@ -129,13 +136,20 @@ func TestSetNodeResourceFromTopolvm(t *testing.T) {
 
 	SetNodeLocalDataResource(ni)
 
-	nodeValue, ok := ni.Node().Status.Allocatable[DatadogLocalDataResource]
+	nodeValue, ok := ni.Node().Status.Allocatable[DatadogEphemeralLocalDataResource]
 	assert.True(t, ok)
 	assert.Equal(t, nodeValue.String(), resource.NewQuantity(hundredGB, resource.BinarySI).String())
 
-	niValue, ok := ni.Allocatable.ScalarResources[DatadogLocalDataResource]
+	niValue, ok := ni.Allocatable.ScalarResources[DatadogEphemeralLocalDataResource]
 	assert.True(t, ok)
 	assert.Equal(t, niValue, hundredGB)
+
+	// Only DatadogEphemeralLocalDataResource should be set
+	_, ok = ni.Node().Status.Allocatable[DatadogLocalDataResource]
+	assert.False(t, ok)
+
+	_, ok = ni.Allocatable.ScalarResources[DatadogLocalDataResource]
+	assert.False(t, ok)
 }
 
 func TestShouldNotSetResourcesWithMissingLabel(t *testing.T) {
@@ -157,4 +171,12 @@ func TestShouldNotSetResourcesWithMissingLabel(t *testing.T) {
 
 	_, ok = ni.Allocatable.ScalarResources[DatadogLocalDataResource]
 	assert.False(t, ok)
+
+	_, ok = ni.Node().Status.Allocatable[DatadogEphemeralLocalDataResource]
+	assert.False(t, ok)
+	_, ok = ni.Node().Status.Capacity[DatadogEphemeralLocalDataResource]
+	assert.False(t, ok)
+
+	_, ok = ni.Allocatable.ScalarResources[DatadogEphemeralLocalDataResource]
+	assert.False(t, ok)
 }
41 changes: 24 additions & 17 deletions cluster-autoscaler/processors/datadog/pods/transform_local_data.go
@@ -56,6 +56,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/processors/datadog/common"
 
 	apiv1 "k8s.io/api/core/v1"
+	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/fields"
 	client "k8s.io/client-go/kubernetes"
@@ -66,7 +67,7 @@ import (
 
 const (
 	storageClassNameLocal   = "local-data"
-	storageClassNameTopolvm = "dynamic-local-data"
+	storageClassNameTopolvm = "ephemeral-local-data"
 )
 
 type transformLocalData struct {
@@ -95,19 +96,25 @@ func (p *transformLocalData) Process(ctx *context.AutoscalingContext, pods []*ap
 	for _, po := range pods {
 		var volumes []apiv1.Volume
 		for _, vol := range po.Spec.Volumes {
-			if vol.PersistentVolumeClaim == nil {
-				volumes = append(volumes, vol)
-				continue
-			}
-			pvc, err := p.pvcLister.PersistentVolumeClaims(po.Namespace).Get(vol.PersistentVolumeClaim.ClaimName)
-			if err != nil {
-				if !apierrors.IsNotFound(err) {
-					klog.Warningf("failed to fetch pvc for %s/%s: %v", po.GetNamespace(), po.GetName(), err)
-				}
-				volumes = append(volumes, vol)
-				continue
-			}
-			if !isSpecialPVCStorageClass(*pvc.Spec.StorageClassName) {
+			var pvcSpec *apiv1.PersistentVolumeClaimSpec
+			if vol.PersistentVolumeClaim != nil {
+				pvc, err := p.pvcLister.PersistentVolumeClaims(po.Namespace).Get(vol.PersistentVolumeClaim.ClaimName)
+				if err != nil {
+					if !apierrors.IsNotFound(err) {
+						klog.Warningf("failed to fetch pvc for %s/%s: %v", po.GetNamespace(), po.GetName(), err)
+					}
+					volumes = append(volumes, vol)
+					continue
+				}
+				pvcSpec = &pvc.Spec
+			} else if vol.Ephemeral != nil {
+				pvcSpec = &vol.Ephemeral.VolumeClaimTemplate.Spec
+			} else {
+				volumes = append(volumes, vol)
+				continue
+			}
+
+			if !isSpecialPVCStorageClass(*pvcSpec.StorageClassName) {
 				volumes = append(volumes, vol)
 				continue
 			}
@@ -118,15 +125,15 @@ func (p *transformLocalData) Process(ctx *context.AutoscalingContext, pods []*ap
 			if len(po.Spec.Containers[0].Resources.Limits) == 0 {
 				po.Spec.Containers[0].Resources.Limits = apiv1.ResourceList{}
 			}
-			if len(pvc.Spec.Resources.Requests) == 0 {
-				pvc.Spec.Resources.Requests = apiv1.ResourceList{}
+			if len(pvcSpec.Resources.Requests) == 0 {
+				pvcSpec.Resources.Requests = apiv1.ResourceList{}
 			}
 
-			switch *pvc.Spec.StorageClassName {
+			switch *pvcSpec.StorageClassName {
 			case storageClassNameTopolvm:
-				if storage, ok := pvc.Spec.Resources.Requests["storage"]; ok {
-					po.Spec.Containers[0].Resources.Requests[common.DatadogLocalDataResource] = storage.DeepCopy()
-					po.Spec.Containers[0].Resources.Limits[common.DatadogLocalDataResource] = storage.DeepCopy()
+				if storage, ok := pvcSpec.Resources.Requests[corev1.ResourceStorage]; ok {
+					po.Spec.Containers[0].Resources.Requests[common.DatadogEphemeralLocalDataResource] = storage.DeepCopy()
+					po.Spec.Containers[0].Resources.Limits[common.DatadogEphemeralLocalDataResource] = storage.DeepCopy()
 				} else {
 					klog.Warningf("ignoring pvc as it does not have storage request information")
 					volumes = append(volumes, vol)
@@ -135,7 +142,7 @@ func (p *transformLocalData) Process(ctx *context.AutoscalingContext, pods []*ap
 				po.Spec.Containers[0].Resources.Requests[common.DatadogLocalDataResource] = common.DatadogLocalDataQuantity.DeepCopy()
 				po.Spec.Containers[0].Resources.Limits[common.DatadogLocalDataResource] = common.DatadogLocalDataQuantity.DeepCopy()
 			default:
-				klog.Warningf("this should never be reached. pvc storage class (%s) cannot be used for scaling on pod: %s", *pvc.Spec.StorageClassName, po.Name)
+				klog.Warningf("this should never be reached. pvc storage class (%s) cannot be used for scaling on pod: %s", *pvcSpec.StorageClassName, po.Name)
 				volumes = append(volumes, vol)
 			}
 		}
64 changes: 44 additions & 20 deletions cluster-autoscaler/processors/datadog/pods/transform_local_data_test.go
@@ -18,6 +18,7 @@ package pods
 
 import (
 	"fmt"
+	"strings"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -30,6 +31,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/processors/datadog/common"
 	v1lister "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/utils/pointer"
 )
 
 var (
@@ -39,14 +41,17 @@ var (
 	testLdResources = corev1.ResourceList{
 		common.DatadogLocalDataResource: common.DatadogLocalDataQuantity.DeepCopy(),
 	}
+	test100GResource     = resource.MustParse("100Gi")
+	testTopolvmResources = corev1.ResourceList{
+		common.DatadogEphemeralLocalDataResource: test100GResource,
+	}
+	testMixedResources = corev1.ResourceList{
+		common.DatadogLocalDataResource:          common.DatadogLocalDataQuantity.DeepCopy(),
+		common.DatadogEphemeralLocalDataResource: test100GResource,
+	}
 )
 
 func TestTransformLocalDataProcess(t *testing.T) {
-	test100GResource, _ := resource.ParseQuantity("100Gi")
-	testTopolvmResources := corev1.ResourceList{
-		common.DatadogLocalDataResource: test100GResource,
-	}
-
 	tests := []struct {
 		name string
 		pods []*corev1.Pod
@@ -100,23 +105,21 @@ func TestTransformLocalDataProcess(t *testing.T) {
 		},
 
 		{
-			"topolvm provisioner is using proper storage capacity value",
-			[]*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1", "pvc-2")},
+			"topolvm handles ephemeral volumes",
+			[]*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1", "ephemeral-pvc-2")},
 			[]*corev1.PersistentVolumeClaim{
 				buildPVC("pvc-1", testRemoteClass),
-				buildPVCWithStorage("pvc-2", storageClassNameTopolvm, "100Gi"),
 			},
 			[]*corev1.Pod{buildPod("pod1", testTopolvmResources, testTopolvmResources, "pvc-1")},
 		},
 
 		{
-			"one pvc will override the other",
-			[]*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "pvc-1", "pvc-2")},
+			"ephemeral and persistent volumes are handled separately",
+			[]*corev1.Pod{buildPod("pod1", testEmptyResources, testEmptyResources, "ephemeral-pvc-1", "pvc-2")},
 			[]*corev1.PersistentVolumeClaim{
-				buildPVCWithStorage("pvc-1", storageClassNameTopolvm, "100Gi"),
 				buildPVC("pvc-2", storageClassNameLocal),
 			},
-			[]*corev1.Pod{buildPod("pod1", testLdResources, testLdResources)},
+			[]*corev1.Pod{buildPod("pod1", testMixedResources, testMixedResources)},
 		},
 	}
 
@@ -167,15 +170,36 @@ func buildPod(name string, requests, limits corev1.ResourceList, claimNames ...s
 	}
 
 	for _, name := range claimNames {
-		pod.Spec.Volumes = append(pod.Spec.Volumes,
-			corev1.Volume{
-				Name: name,
-				VolumeSource: corev1.VolumeSource{
-					PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
-						ClaimName: name,
-					},
-				},
-			})
+		if strings.Contains(name, "ephemeral") {
+			pod.Spec.Volumes = append(pod.Spec.Volumes,
+				corev1.Volume{
+					Name: name,
+					VolumeSource: corev1.VolumeSource{
+						Ephemeral: &corev1.EphemeralVolumeSource{
+							VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
+								Spec: corev1.PersistentVolumeClaimSpec{
+									StorageClassName: pointer.String(storageClassNameTopolvm),
+									Resources: corev1.ResourceRequirements{
+										Requests: corev1.ResourceList{
+											corev1.ResourceStorage: resource.MustParse("100Gi"),
+										},
+									},
+								},
+							},
+						},
+					},
+				})
+		} else {
+			pod.Spec.Volumes = append(pod.Spec.Volumes,
+				corev1.Volume{
+					Name: name,
+					VolumeSource: corev1.VolumeSource{
+						PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
+							ClaimName: name,
+						},
+					},
+				})
+		}
 	}
 
 	return pod
