Merge pull request #136603 from 249043822/br003

Fix: NewUnmounter always returns an error when deleting a pod after node reboot
Kubernetes Prow Robot 2026-02-03 11:12:26 +05:30 committed by GitHub
commit 5655a0c19d
6 changed files with 267 additions and 1 deletion
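For context, the failure mode in the title can be pictured with a short, self-contained sketch. It is hypothetical, not the real csi package code: after a node reboot, kubelet reconstructs volume state from the vol_data.json file in the mount directory, and if a failed NodePublish already cleaned that directory up, the data file is gone, so every NewUnmounter attempt fails and pod deletion never completes.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// loadVolumeData is a hypothetical stand-in for the unmounter's first step:
// reading vol_data.json to reconstruct the volume. If a failed NodePublish
// already removed the mount dir (and the data file with it), this fails on
// every retry and pod cleanup never completes.
func loadVolumeData(volPath string) ([]byte, error) {
	dataFile := filepath.Join(volPath, "vol_data.json")
	data, err := os.ReadFile(dataFile)
	if err != nil {
		return nil, fmt.Errorf("unmounter failed to load volume data file [%s]: %w", dataFile, err)
	}
	return data, nil
}

func main() {
	// After a reboot plus a failed NodePublish cleanup, the file is gone:
	if _, err := loadVolumeData(filepath.Join(os.TempDir(), "missing-volume")); err != nil {
		fmt.Println(err)
	}
}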

View file

@@ -18,6 +18,7 @@ package reconciler
import (
"context"
"errors"
"fmt"
"hash/fnv"
"path/filepath"
@@ -2008,6 +2009,27 @@ func waitForMount(
}
}
func waitForUnmount(
t *testing.T,
volumeName v1.UniqueVolumeName,
podName types.UniquePodName,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
testOperationBackOffDuration,
func() (bool, error) {
if asw.PodHasMountedVolumes(podName) {
return false, nil
}
return true, nil
},
)
if err != nil {
t.Fatalf("Timed out waiting for pod %q to be unmount from volume %q.", podName, volumeName)
}
}
func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
testOperationBackOffDuration,
@@ -2432,3 +2454,145 @@ func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) {
}
}
}
func TestReconstructedVolumeShouldUnmountSucceedAfterSetupFailed(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
// Arrange
volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
// Inject a fake SetUp error
fakePlugin.SetUpHook = func(plugin volume.VolumePlugin, mounterArgs volume.MounterArgs) error {
if !mounterArgs.ReconstructedVolume {
// Simulate already-cleaned-up volume data files when the volume is not reconstructed
plugin.(*volumetesting.FakeVolumePlugin).NewUnmounterError = errors.New("unmounter failed to load volume data file")
}
return errors.New("csiRPCError")
}
seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
kubeClient := createTestClient()
fakeRecorder := &record.FakeRecorder{}
fakeHandler := volumetesting.NewBlockVolumePathHandler()
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
volumePluginMgr,
fakeRecorder,
fakeHandler))
rc := NewReconciler(
kubeClient,
true, /* controllerAttachDetachEnabled */
reconcilerLoopSleepDuration,
waitForAttachTimeout,
nodeName,
dsw,
asw,
hasAddedPods,
oex,
mount.NewFakeMounter(nil),
hostutil.NewFakeHostUtil(nil),
volumePluginMgr,
kubeletPodsDir)
reconciler := rc.(*reconciler)
mode := v1.PersistentVolumeFilesystem
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "pv",
UID: "pvuid",
},
Spec: v1.PersistentVolumeSpec{
ClaimRef: &v1.ObjectReference{Name: "pvc"},
Capacity: v1.ResourceList{
v1.ResourceStorage: resource.MustParse("1G"),
},
VolumeMode: &mode,
},
}
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: "pvc",
UID: "pvcuid",
},
Spec: v1.PersistentVolumeClaimSpec{
Resources: v1.VolumeResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse("1G"),
},
},
VolumeName: "pv",
VolumeMode: &mode,
},
Status: v1.PersistentVolumeClaimStatus{
Capacity: v1.ResourceList{
v1.ResourceStorage: resource.MustParse("1G"),
},
},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: v1.PodSpec{
Volumes: []v1.Volume{
{
Name: "volume-name",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
},
},
},
},
},
}
volumeSpec := &volume.Spec{PersistentVolume: pv}
podName := util.GetUniquePodName(pod)
generatedVolumeName, err := dsw.AddPodToVolume(logger,
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil)
// Assert
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
err = asw.AddAttachUncertainReconstructedVolume(logger, generatedVolumeName, volumeSpec, "" /* nodeName */, "fake/device/path")
if err != nil {
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
mounter, err := fakePlugin.NewMounter(volumeSpec, pod)
if err != nil {
t.Fatalf("NewMounter failed. Expected: <no error> Actual: <%v>", err)
}
mapper, err := fakePlugin.NewBlockVolumeMapper(volumeSpec, pod)
if err != nil {
t.Fatalf("NewBlockVolumeMapper failed. Expected: <no error> Actual: <%v>", err)
}
markVolumeOpts := operationexecutor.MarkVolumeOpts{
PodName: podName,
PodUID: pod.UID,
VolumeName: generatedVolumeName,
Mounter: mounter,
BlockVolumeMapper: mapper,
VolumeSpec: volumeSpec,
VolumeMountState: operationexecutor.VolumeMountUncertain,
}
_, err = asw.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}
// Act: first reconcile to trigger mounting of the reconstructed volume
reconciler.reconcile(ctx)
waitForUncertainPodMount(t, generatedVolumeName, podName, asw)
// Simulate pod removal
dsw.DeletePodFromVolume(podName, generatedVolumeName)
// Act: second reconcile to trigger unmount of the reconstructed volume after the pod was removed
reconciler.reconcile(ctx)
// Assert: the volume unmount succeeded
waitForUnmount(t, generatedVolumeName, podName, asw)
}

View file

@@ -314,7 +314,7 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
if csiRPCError != nil {
// If operation finished with error then we can remove the mount directory.
-if volumetypes.IsOperationFinishedError(csiRPCError) {
+if volumetypes.IsOperationFinishedError(csiRPCError) && !mounterArgs.ReconstructedVolume {
if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
}
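The one-line change above is the heart of the fix. Distilled into a standalone sketch (the helper name is hypothetical): removing the mount dir also deletes vol_data.json, so cleanup is only safe when the NodePublish error is final and the volume was freshly mounted rather than reconstructed after a kubelet restart.

// shouldRemoveMountDir is a hypothetical distillation of the guard above.
// For a reconstructed volume the data file must survive so that a later
// NewUnmounter call can still load it and tear the volume down.
func shouldRemoveMountDir(operationFinished, reconstructedVolume bool) bool {
	return operationFinished && !reconstructedVolume
}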

View file

@@ -28,6 +28,8 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
authenticationv1 "k8s.io/api/authentication/v1"
corev1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
@@ -1169,6 +1171,91 @@ func TestMounterSetUpWithFSGroup(t *testing.T) {
}
}
func TestMounterSetUpFWithNodePublishFinalError(t *testing.T) {
testCases := []struct {
name string
podUID types.UID
options []string
spec func(string, []string) *volume.Spec
reconstructedVolume bool
}{
{
name: "setup with reconstructed volume",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
spec: func(fsType string, options []string) *volume.Spec {
pvSrc := makeTestPV("pv1", 20, testDriver, "vol1")
pvSrc.Spec.CSI.FSType = fsType
pvSrc.Spec.MountOptions = options
return volume.NewSpecFromPersistentVolume(pvSrc, false)
},
reconstructedVolume: true,
},
{
name: "setup with new volume",
podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
spec: func(fsType string, options []string) *volume.Spec {
pvSrc := makeTestPV("pv1", 20, testDriver, "vol1")
pvSrc.Spec.CSI.FSType = fsType
pvSrc.Spec.MountOptions = options
return volume.NewSpecFromPersistentVolume(pvSrc, false)
},
reconstructedVolume: false,
},
}
for _, tc := range testCases {
volumeLifecycleModes := []storage.VolumeLifecycleMode{
storage.VolumeLifecyclePersistent,
}
driver := getTestCSIDriver(testDriver, nil, nil, volumeLifecycleModes)
fakeClient := fakeclient.NewClientset(driver)
plug, tmpDir := newTestPlugin(t, fakeClient)
defer func() {
_ = os.RemoveAll(tmpDir)
}()
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
t.Run(tc.name, func(t *testing.T) {
mounter, err := plug.NewMounter(
tc.spec("zfs", tc.options),
&corev1.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}},
)
if mounter == nil || err != nil {
t.Fatal("failed to create CSI mounter")
}
csiMounter := mounter.(*csiMountMgr)
csiMounter.csiClient = setupClient(t, true)
attachID := getAttachmentName(csiMounter.volumeID, string(csiMounter.driverName), string(plug.host.GetNodeName()))
attachment := makeTestAttachment(attachID, "test-node", csiMounter.spec.Name())
_, err = csiMounter.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, meta.CreateOptions{})
if err != nil {
t.Fatalf("failed to setup VolumeAttachment: %v", err)
}
csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.SetNextError(status.Errorf(codes.InvalidArgument, "mount failed"))
// Mounter.SetUp()
if err := csiMounter.SetUp(volume.MounterArgs{ReconstructedVolume: tc.reconstructedVolume}); err == nil {
t.Fatalf("mounter.Setup expected err but succeed")
}
mountPath := csiMounter.GetPath()
volPath := filepath.Dir(mountPath)
dataFile := filepath.Join(volPath, volDataFileName)
if tc.reconstructedVolume {
if _, err := os.Stat(dataFile); os.IsNotExist(err) {
t.Errorf("volume file [%s] expects to be exists, but removed", dataFile)
}
return
}
if _, err := os.Stat(dataFile); err == nil {
t.Errorf("volume file [%s] expects to be removed, but exists", dataFile)
}
})
}
}
func TestUnmounterTeardown(t *testing.T) {
plug, tmpDir := newTestPlugin(t, nil)
defer os.RemoveAll(tmpDir)

View file

@@ -199,6 +199,10 @@ type FakeVolumePlugin struct {
// Add callbacks as needed
WaitForAttachHook func(spec *volume.Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error)
UnmountDeviceHook func(globalMountPath string) error
SetUpHook func(plugin volume.VolumePlugin, mounterArgs volume.MounterArgs) error
// Inject an error to be returned by NewUnmounter
NewUnmounterError error
Mounters []*FakeVolume
Unmounters []*FakeVolume
@@ -226,12 +230,14 @@ func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume {
defer volume.Unlock()
volume.WaitForAttachHook = plugin.WaitForAttachHook
volume.UnmountDeviceHook = plugin.UnmountDeviceHook
volume.SetUpHook = plugin.SetUpHook
return volume
}
}
volume := &FakeVolume{
WaitForAttachHook: plugin.WaitForAttachHook,
UnmountDeviceHook: plugin.UnmountDeviceHook,
SetUpHook: plugin.SetUpHook,
}
volume.VolumesAttached = make(map[string]sets.Set[string])
volume.DeviceMountState = make(map[string]string)
@@ -321,6 +327,9 @@ func (plugin *FakeVolumePlugin) GetMounters() (Mounters []*FakeVolume) {
func (plugin *FakeVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
plugin.Lock()
defer plugin.Unlock()
if plugin.NewUnmounterError != nil {
return nil, plugin.NewUnmounterError
}
fakeVolume := plugin.getFakeVolume(&plugin.Unmounters)
fakeVolume.Lock()
defer fakeVolume.Unlock()
@@ -689,6 +698,7 @@ type FakeVolume struct {
// Add callbacks as needed
WaitForAttachHook func(spec *volume.Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error)
UnmountDeviceHook func(globalMountPath string) error
SetUpHook func(plugin volume.VolumePlugin, mounterArgs volume.MounterArgs) error
SetUpCallCount int
TearDownCallCount int
@@ -738,6 +748,9 @@ func (fv *FakeVolume) SetUp(mounterArgs volume.MounterArgs) error {
defer fv.Unlock()
err := fv.setupInternal(mounterArgs)
fv.SetUpCallCount++
if fv.SetUpHook != nil {
return fv.SetUpHook(fv.Plugin, mounterArgs)
}
return err
}
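Together, the two new test knobs simulate the post-reboot state directly. A usage sketch, assuming a *FakeVolumePlugin named plugin and a *testing.T named t as in the tests above:

// With NewUnmounterError injected, the fake plugin surfaces the error
// instead of returning an unmounter, which is exactly the condition the
// reconciler must tolerate for reconstructed volumes.
plugin.NewUnmounterError = errors.New("unmounter failed to load volume data file")
if _, err := plugin.NewUnmounter("vol1", "poduid"); err == nil {
	t.Fatal("expected NewUnmounter to fail")
}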

View file

@@ -586,6 +586,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
FSGroupChangePolicy: fsGroupChangePolicy,
Recorder: og.recorder,
SELinuxLabel: volumeToMount.SELinuxLabel,
ReconstructedVolume: actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName),
})
// Update actual state of world
markOpts := MarkVolumeOpts{
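The generator learns whether a volume was reconstructed by asking the actual state of world. A minimal sketch of that bookkeeping, with hypothetical types that simplify the cache package and assume the v1 and types imports used elsewhere in this commit:

// Reconstruction marks the (volume, pod) pair in the actual state of world;
// GenerateMountVolumeFunc then forwards that bit to the plugin via
// MounterArgs.ReconstructedVolume.
type podVolume struct {
	volumeName v1.UniqueVolumeName
	podName    types.UniquePodName
}

type reconstructedSet map[podVolume]bool

func (s reconstructedSet) IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName types.UniquePodName) bool {
	return s[podVolume{volumeName: volumeName, podName: podName}]
}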

View file

@@ -138,6 +138,7 @@ type MounterArgs struct {
// Optional interface that will be used to change the ownership of the volume, if specified.
// mainly used by unit tests
VolumeOwnershipApplicator VolumeOwnershipChanger
ReconstructedVolume bool
}
type VolumeOwnershipChanger interface {
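Because the new field defaults to false, only the reconstruction path needs to set it and every other SetUp caller is unaffected. A hedged usage sketch, where mounter is any volume.Mounter and the error handling is illustrative:

// Setting ReconstructedVolume tells the CSI mounter to keep vol_data.json
// in place even when NodePublish fails, so a later NewUnmounter still works.
args := volume.MounterArgs{
	ReconstructedVolume: true,
}
if err := mounter.SetUp(args); err != nil {
	// The volume data file is still on disk, so unmount can proceed later.
	klog.ErrorS(err, "SetUp failed for reconstructed volume")
}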