Skip to content

Commit

Permalink
Merge pull request kubernetes#74863 from gnufied/csi-volume-expansion
Browse files Browse the repository at this point in the history
CSI volume expansion
  • Loading branch information
k8s-ci-robot authored Mar 8, 2019
2 parents f229aa0 + 1bd9ed0 commit 3624c74
Show file tree
Hide file tree
Showing 32 changed files with 1,813 additions and 403 deletions.
4 changes: 2 additions & 2 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ const (
// Ability to expand persistent volumes' file system without unmounting volumes.
ExpandInUsePersistentVolumes utilfeature.Feature = "ExpandInUsePersistentVolumes"

// owner: @gnufied
// alpha: v1.14
// Ability to expand CSI volumes
ExpandCSIVolumes utilfeature.Feature = "ExpandCSIVolumes"

// owner: @verb
// alpha: v1.10
//
Expand Down Expand Up @@ -450,6 +455,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
QOSReserved: {Default: false, PreRelease: utilfeature.Alpha},
ExpandPersistentVolumes: {Default: true, PreRelease: utilfeature.Beta},
ExpandInUsePersistentVolumes: {Default: false, PreRelease: utilfeature.Alpha},
ExpandCSIVolumes: {Default: false, PreRelease: utilfeature.Alpha},
AttachVolumeLimit: {Default: true, PreRelease: utilfeature.Beta},
CPUManager: {Default: true, PreRelease: utilfeature.Beta},
CPUCFSQuotaPeriod: {Default: false, PreRelease: utilfeature.Alpha},
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/volumemanager/cache/actual_state_of_world.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (asw *actualStateOfWorld) MarkFSResizeRequired(
}

volumePlugin, err :=
asw.volumePluginMgr.FindExpandablePluginBySpec(podObj.volumeSpec)
asw.volumePluginMgr.FindNodeExpandablePluginBySpec(podObj.volumeSpec)
if err != nil || volumePlugin == nil {
// Log and continue processing
klog.Errorf(
Expand Down
11 changes: 7 additions & 4 deletions pkg/volume/awsebs/aws_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,15 @@ func (plugin *awsElasticBlockStorePlugin) ExpandVolumeDevice(
return awsVolume.ResizeDisk(volumeID, oldSize, newSize)
}

func (plugin *awsElasticBlockStorePlugin) ExpandFS(spec *volume.Spec, devicePath, deviceMountPath string, _, _ resource.Quantity) error {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), devicePath, deviceMountPath)
return err
func (plugin *awsElasticBlockStorePlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
if err != nil {
return false, err
}
return true, nil
}

var _ volume.FSResizableVolumePlugin = &awsElasticBlockStorePlugin{}
var _ volume.NodeExpandableVolumePlugin = &awsElasticBlockStorePlugin{}
var _ volume.ExpandableVolumePlugin = &awsElasticBlockStorePlugin{}
var _ volume.VolumePluginWithAttachLimits = &awsElasticBlockStorePlugin{}

Expand Down
11 changes: 7 additions & 4 deletions pkg/volume/azure_dd/azure_dd.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,15 @@ func (plugin *azureDataDiskPlugin) ExpandVolumeDevice(
return diskController.ResizeDisk(spec.PersistentVolume.Spec.AzureDisk.DataDiskURI, oldSize, newSize)
}

func (plugin *azureDataDiskPlugin) ExpandFS(spec *volume.Spec, devicePath, deviceMountPath string, _, _ resource.Quantity) error {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), devicePath, deviceMountPath)
return err
func (plugin *azureDataDiskPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
if err != nil {
return false, err
}
return true, nil
}

var _ volume.FSResizableVolumePlugin = &azureDataDiskPlugin{}
var _ volume.NodeExpandableVolumePlugin = &azureDataDiskPlugin{}

func (plugin *azureDataDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter(plugin.GetPluginName())
Expand Down
11 changes: 7 additions & 4 deletions pkg/volume/cinder/cinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,15 @@ func (plugin *cinderPlugin) ExpandVolumeDevice(spec *volume.Spec, newSize resour
return expandedSize, nil
}

func (plugin *cinderPlugin) ExpandFS(spec *volume.Spec, devicePath, deviceMountPath string, _, _ resource.Quantity) error {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), devicePath, deviceMountPath)
return err
func (plugin *cinderPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
_, err := util.GenericResizeFS(plugin.host, plugin.GetPluginName(), resizeOptions.DevicePath, resizeOptions.DeviceMountPath)
if err != nil {
return false, err
}
return true, nil
}

var _ volume.FSResizableVolumePlugin = &cinderPlugin{}
var _ volume.NodeExpandableVolumePlugin = &cinderPlugin{}

func (plugin *cinderPlugin) RequiresFSResize() bool {
return true
Expand Down
3 changes: 3 additions & 0 deletions pkg/volume/csi/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"csi_mounter.go",
"csi_plugin.go",
"csi_util.go",
"expander.go",
],
importpath = "k8s.io/kubernetes/pkg/volume/csi",
visibility = ["//visibility:public"],
Expand All @@ -22,6 +23,7 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
Expand Down Expand Up @@ -50,6 +52,7 @@ go_test(
"csi_drivers_store_test.go",
"csi_mounter_test.go",
"csi_plugin_test.go",
"expander_test.go",
],
embed = [":go_default_library"],
deps = [
Expand Down
73 changes: 73 additions & 0 deletions pkg/volume/csi/csi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -55,6 +56,7 @@ type csiClient interface {
fsType string,
mountOptions []string,
) error
NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error)
NodeUnpublishVolume(
ctx context.Context,
volID string,
Expand All @@ -71,6 +73,7 @@ type csiClient interface {
) error
NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error
NodeSupportsStageUnstage(ctx context.Context) (bool, error)
NodeSupportsNodeExpand(ctx context.Context) (bool, error)
}

// Strongly typed address
Expand Down Expand Up @@ -304,6 +307,41 @@ func (c *csiDriverClient) NodePublishVolume(

}

func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, volumeID, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
if c.nodeV1ClientCreator == nil {
return newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
}

if volumeID == "" {
return newSize, errors.New("missing volume id")
}
if volumePath == "" {
return newSize, errors.New("missing volume path")
}

if newSize.Value() < 0 {
return newSize, errors.New("size can not be less than 0")
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return newSize, err
}
defer closer.Close()

req := &csipbv1.NodeExpandVolumeRequest{
VolumeId: volumeID,
VolumePath: volumePath,
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
}
resp, err := nodeClient.NodeExpandVolume(ctx, req)
if err != nil {
return newSize, err
}
updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
return *updatedQuantity, nil
}

func (c *csiDriverClient) nodePublishVolumeV1(
ctx context.Context,
volID string,
Expand Down Expand Up @@ -624,6 +662,41 @@ func (c *csiDriverClient) nodeUnstageVolumeV0(ctx context.Context, volID, stagin
return err
}

func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if Node has EXPAND_VOLUME capability"))

if c.nodeV1ClientCreator != nil {
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
if err != nil {
return false, err
}
defer closer.Close()

req := &csipbv1.NodeGetCapabilitiesRequest{}
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
}

capabilities := resp.GetCapabilities()

nodeExpandSet := false
if capabilities == nil {
return false, nil
}
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME {
nodeExpandSet = true
}
}
return nodeExpandSet, nil
} else if c.nodeV0ClientCreator != nil {
return false, nil
}
return false, fmt.Errorf("failed to call NodeSupportsNodeExpand. Both nodeV1ClientCreator and nodeV0ClientCreator are nil")

}

func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if NodeSupportsStageUnstage"))

Expand Down
105 changes: 105 additions & 0 deletions pkg/volume/csi/csi_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/volume/csi/fake"
)

Expand All @@ -40,6 +41,13 @@ func newFakeCsiDriverClient(t *testing.T, stagingCapable bool) *fakeCsiDriverCli
}
}

func newFakeCsiDriverClientWithExpansion(t *testing.T, stagingCapable bool, expansionSet bool) *fakeCsiDriverClient {
return &fakeCsiDriverClient{
t: t,
nodeClient: fake.NewNodeClientWithExpansion(stagingCapable, expansionSet),
}
}

func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) (
nodeID string,
maxVolumePerNode int64,
Expand Down Expand Up @@ -144,6 +152,28 @@ func (c *fakeCsiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stag
return err
}

func (c *fakeCsiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
c.t.Log("calling fake.NodeSupportsNodeExpand...")
req := &csipbv1.NodeGetCapabilitiesRequest{}

resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return false, err
}

capabilities := resp.GetCapabilities()

if capabilities == nil {
return false, nil
}
for _, capability := range capabilities {
if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME {
return true, nil
}
}
return false, nil
}

func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
c.t.Log("calling fake.NodeGetCapabilities for NodeSupportsStageUnstage...")
req := &csipbv1.NodeGetCapabilitiesRequest{}
Expand All @@ -166,10 +196,29 @@ func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (boo
return stageUnstageSet, nil
}

func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, volumeid, volumePath string, newSize resource.Quantity) (resource.Quantity, error) {
c.t.Log("calling fake.NodeExpandVolume")
req := &csipbv1.NodeExpandVolumeRequest{
VolumeId: volumeid,
VolumePath: volumePath,
CapacityRange: &csipbv1.CapacityRange{RequiredBytes: newSize.Value()},
}
resp, err := c.nodeClient.NodeExpandVolume(ctx, req)
if err != nil {
return newSize, err
}
updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
return *updatedQuantity, nil
}

func setupClient(t *testing.T, stageUnstageSet bool) csiClient {
return newFakeCsiDriverClient(t, stageUnstageSet)
}

func setupClientWithExpansion(t *testing.T, stageUnstageSet bool, expansionSet bool) csiClient {
return newFakeCsiDriverClientWithExpansion(t, stageUnstageSet, expansionSet)
}

func checkErr(t *testing.T, expectedAnError bool, actualError error) {
t.Helper()

Expand Down Expand Up @@ -415,3 +464,59 @@ func TestClientNodeUnstageVolume(t *testing.T) {
}
}
}

func TestNodeExpandVolume(t *testing.T) {
testCases := []struct {
name string
volID string
volumePath string
newSize resource.Quantity
mustFail bool
err error
}{
{
name: "with all correct values",
volID: "vol-abcde",
volumePath: "/foo/bar",
newSize: resource.MustParse("10Gi"),
mustFail: false,
},
{
name: "with missing volume-id",
volumePath: "/foo/bar",
newSize: resource.MustParse("10Gi"),
mustFail: true,
},
{
name: "with missing volume path",
volID: "vol-1234",
newSize: resource.MustParse("10Gi"),
mustFail: true,
},
{
name: "with invalid quantity",
volID: "vol-1234",
volumePath: "/foo/bar",
newSize: *resource.NewQuantity(-10, resource.DecimalSI),
mustFail: true,
},
}

for _, tc := range testCases {
t.Logf("Running test cases : %s", tc.name)
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
},
}
_, err := client.NodeExpandVolume(context.Background(), tc.volID, tc.volumePath, tc.newSize)
checkErr(t, tc.mustFail, err)
if !tc.mustFail {
fakeCloser.Check()
}
}
}
Loading

0 comments on commit 3624c74

Please sign in to comment.