Skip to content

Commit

Permalink
Revert "check volume directories instead of mounts for cleanupOrphane…
Browse files Browse the repository at this point in the history
…dPodDirs"
  • Loading branch information
msau42 authored Nov 20, 2020
1 parent 7d72ddc commit 25edb8b
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 50 deletions.
4 changes: 2 additions & 2 deletions pkg/kubelet/kubelet_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bo
return false
}

if kl.podVolumePathsExistInCacheOrDisk(pod.UID) && !kl.keepTerminatedPodVolumes {
if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
// We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
klog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
return false
Expand Down Expand Up @@ -1962,7 +1962,7 @@ func (kl *Kubelet) cleanupOrphanedPodCgroups(pcm cm.PodContainerManager, cgroupP
// parent croup. If the volumes still exist, reduce the cpu shares for any
// process in the cgroup to the minimum value while we wait. if the kubelet
// is configured to keep terminated volumes, we will delete the cgroup and not block.
if podVolumesExist := kl.podVolumePathsExistInCacheOrDisk(uid); podVolumesExist && !kl.keepTerminatedPodVolumes {
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist && !kl.keepTerminatedPodVolumes {
klog.V(3).Infof("Orphaned pod %q found, but volumes not yet removed. Reducing cpu to minimum", uid)
if err := pcm.ReduceCPULimits(val); err != nil {
klog.Warningf("Failed to reduce cpu time for pod %q pending volume cleanup due to %v", uid, err)
Expand Down
42 changes: 16 additions & 26 deletions pkg/kubelet/kubelet_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ func (kl *Kubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume
return volumesToReturn, len(volumesToReturn) > 0
}

// podMountedVolumesExistInCacheOrDisk checks with the volume manager and returns true any of the
// podVolumesExist checks with the volume manager and returns true any of the
// pods for the specified volume are mounted.
func (kl *Kubelet) podMountedVolumesExistInCacheOrDisk(podUID types.UID) bool {
func (kl *Kubelet) podVolumesExist(podUID types.UID) bool {
if mountedVolumes :=
kl.volumeManager.GetMountedVolumesForPod(
volumetypes.UniquePodName(podUID)); len(mountedVolumes) > 0 {
return true
}
// TODO: This checks pod volume paths and whether they are mounted. If checking returns error, podMountedVolumesExistInCacheOrDisk will return true
// TODO: This checks pod volume paths and whether they are mounted. If checking returns error, podVolumesExist will return true
// which means we consider volumes might exist and requires further checking.
// There are some volume plugins such as flexvolume might not have mounts. See issue #61229
volumePaths, err := kl.getMountedVolumePathListFromDisk(podUID)
Expand All @@ -73,27 +73,6 @@ func (kl *Kubelet) podMountedVolumesExistInCacheOrDisk(podUID types.UID) bool {
return false
}

// podVolumePathsExistInCacheOrDisk checks with the volume manager and returns true any of the
// volumes for the specified pod are mounted or any of the volume paths of the specified pod exist.
func (kl *Kubelet) podVolumePathsExistInCacheOrDisk(podUID types.UID) bool {
if mountedVolumes :=
kl.volumeManager.GetMountedVolumesForPod(
volumetypes.UniquePodName(podUID)); len(mountedVolumes) > 0 {
return true
}

volumePaths, err := kl.getPodVolumePathListFromDisk(podUID)
if err != nil {
klog.Errorf("pod %q found, but error %v occurred during checking volume dirs from disk", podUID, err)
return true
}
if len(volumePaths) > 0 {
klog.V(4).Infof("pod %q found, but volume paths are still present on disk %v", podUID, volumePaths)
return true
}
return false
}

// newVolumeMounterFromPlugins attempts to find a plugin by volume spec, pod
// and volume options and then creates a Mounter.
// Returns a valid mounter or an error.
Expand Down Expand Up @@ -134,12 +113,23 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*v1.Pod, runningPods []*kubecon
continue
}
// If volumes have not been unmounted/detached, do not delete directory.
// If there are still volume directories, do not delete directory
// Doing so may result in corruption of data.
if kl.podVolumePathsExistInCacheOrDisk(uid) {
// TODO: getMountedVolumePathListFromDisk() call may be redundant with
// kl.getPodVolumePathListFromDisk(). Can this be cleaned up?
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist {
klog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up", uid)
continue
}
// If there are still volume directories, do not delete directory
volumePaths, err := kl.getPodVolumePathListFromDisk(uid)
if err != nil {
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but error %v occurred during reading volume dir from disk", uid, err))
continue
}
if len(volumePaths) > 0 {
orphanVolumeErrors = append(orphanVolumeErrors, fmt.Errorf("orphaned pod %q found, but volume paths are still present on disk", uid))
continue
}

// If there are any volume-subpaths, do not cleanup directories
volumeSubpathExists, err := kl.podVolumeSubpathsDirExists(uid)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/kubelet_volumes_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestPodVolumesExistWithMount(t *testing.T) {
}
}

exist := kubelet.podMountedVolumesExistInCacheOrDisk(poduid)
exist := kubelet.podVolumesExist(poduid)
if tc.expected != exist {
t.Errorf("%s failed: expected %t, got %t", name, tc.expected, exist)
}
Expand Down
22 changes: 1 addition & 21 deletions pkg/kubelet/kubelet_volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
"os"
"path"
"path/filepath"
)

func TestListVolumesForPod(t *testing.T) {
Expand Down Expand Up @@ -200,27 +196,11 @@ func TestPodVolumesExist(t *testing.T) {
}

for _, pod := range pods {
podVolumesExist := kubelet.podMountedVolumesExistInCacheOrDisk(pod.UID)
podVolumesExist := kubelet.podVolumesExist(pod.UID)
assert.True(t, podVolumesExist, "pod %q", pod.UID)
}
}

func TestPodVolumePathsExist(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
pod := podWithUIDNameNs(uuid.NewUUID(), "pod1", "ns")

// To pass volumeManager GetMountedVolumesForPod control create pod volume data directory directly
podVolumeDir := kubelet.getPodVolumeDir(pod.UID, "volumePlugin1", "volume1")
podDataDir := filepath.Join(podVolumeDir, "data1")
os.MkdirAll(path.Dir(podDataDir), 0755)
defer os.RemoveAll(path.Dir(kubelet.getPodDir(pod.UID)))

podVolumePathsExist := kubelet.podVolumePathsExistInCacheOrDisk(pod.UID)
assert.True(t, podVolumePathsExist, "pod %q", pod.UID)
}

func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
Expand Down

0 comments on commit 25edb8b

Please sign in to comment.