diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
index ec907e886bb..79932984fba 100644
--- a/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
+++ b/pkg/kubelet/volumemanager/reconciler/reconciler_test.go
@@ -18,6 +18,7 @@ package reconciler
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"hash/fnv"
 	"path/filepath"
@@ -2008,6 +2009,27 @@ func waitForMount(
 	}
 }
 
+func waitForUnmount(
+	t *testing.T,
+	volumeName v1.UniqueVolumeName,
+	podName types.UniquePodName,
+	asw cache.ActualStateOfWorld) {
+	err := retryWithExponentialBackOff(
+		testOperationBackOffDuration,
+		func() (bool, error) {
+			if asw.PodHasMountedVolumes(podName) {
+				return false, nil
+			}
+
+			return true, nil
+		},
+	)
+
+	if err != nil {
+		t.Fatalf("Timed out waiting for pod %q to be unmounted from volume %q.", podName, volumeName)
+	}
+}
+
 func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
 	err := retryWithExponentialBackOff(
 		testOperationBackOffDuration,
@@ -2432,3 +2454,145 @@ func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) {
 		}
 	}
 }
+
+func TestReconstructedVolumeShouldUnmountSucceedAfterSetupFailed(t *testing.T) {
+	logger, ctx := ktesting.NewTestContext(t)
+	// Arrange
+	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
+	// Fake a SetUp error.
+	fakePlugin.SetUpHook = func(plugin volume.VolumePlugin, mounterArgs volume.MounterArgs) error {
+		if !mounterArgs.ReconstructedVolume {
+			// Mock already-cleaned-up volume files when this is not a reconstructed volume.
+			plugin.(*volumetesting.FakeVolumePlugin).NewUnmounterError = errors.New("unmounter failed to load volume data file")
+		}
+		return errors.New("csiRPCError")
+	}
+	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
+	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
+	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
+	kubeClient := createTestClient()
+	fakeRecorder := &record.FakeRecorder{}
+	fakeHandler := volumetesting.NewBlockVolumePathHandler()
+	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
+		kubeClient,
+		volumePluginMgr,
+		fakeRecorder,
+		fakeHandler))
+	rc := NewReconciler(
+		kubeClient,
+		true, /* controllerAttachDetachEnabled */
+		reconcilerLoopSleepDuration,
+		waitForAttachTimeout,
+		nodeName,
+		dsw,
+		asw,
+		hasAddedPods,
+		oex,
+		mount.NewFakeMounter(nil),
+		hostutil.NewFakeHostUtil(nil),
+		volumePluginMgr,
+		kubeletPodsDir)
+	reconciler := rc.(*reconciler)
+
+	mode := v1.PersistentVolumeFilesystem
+	pv := &v1.PersistentVolume{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "pv",
+			UID:  "pvuid",
+		},
+		Spec: v1.PersistentVolumeSpec{
+			ClaimRef: &v1.ObjectReference{Name: "pvc"},
+			Capacity: v1.ResourceList{
+				v1.ResourceStorage: resource.MustParse("1G"),
+			},
+			VolumeMode: &mode,
+		},
+	}
+	pvc := &v1.PersistentVolumeClaim{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "pvc",
+			UID:  "pvcuid",
+		},
+		Spec: v1.PersistentVolumeClaimSpec{
+			Resources: v1.VolumeResourceRequirements{
+				Requests: v1.ResourceList{
+					v1.ResourceStorage: resource.MustParse("1G"),
+				},
+			},
+			VolumeName: "pv",
+			VolumeMode: &mode,
+		},
+		Status: v1.PersistentVolumeClaimStatus{
+			Capacity: v1.ResourceList{
+				v1.ResourceStorage: resource.MustParse("1G"),
+			},
+		},
+	}
+	pod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "pod1",
+			UID:  "pod1uid",
+		},
+		Spec: v1.PodSpec{
+			Volumes: []v1.Volume{
+				{
+					Name: "volume-name",
+					VolumeSource: v1.VolumeSource{
+						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
+							ClaimName: pvc.Name,
+						},
+					},
+				},
+			},
+		},
+	}
+
+	volumeSpec := &volume.Spec{PersistentVolume: pv}
+	podName := util.GetUniquePodName(pod)
+	generatedVolumeName, err := dsw.AddPodToVolume(logger,
+		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil)
+
+	// Assert
+	if err != nil {
+		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
+	}
+	err = asw.AddAttachUncertainReconstructedVolume(logger, generatedVolumeName, volumeSpec, "" /* nodeName */, "fake/device/path")
+	if err != nil {
+		t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	mounter, err := fakePlugin.NewMounter(volumeSpec, pod)
+	if err != nil {
+		t.Fatalf("NewMounter failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	mapper, err := fakePlugin.NewBlockVolumeMapper(volumeSpec, pod)
+	if err != nil {
+		t.Fatalf("NewBlockVolumeMapper failed. Expected: <no error> Actual: <%v>", err)
+	}
+	markVolumeOpts := operationexecutor.MarkVolumeOpts{
+		PodName:           podName,
+		PodUID:            pod.UID,
+		VolumeName:        generatedVolumeName,
+		Mounter:           mounter,
+		BlockVolumeMapper: mapper,
+		VolumeSpec:        volumeSpec,
+		VolumeMountState:  operationexecutor.VolumeMountUncertain,
+	}
+	_, err = asw.CheckAndMarkVolumeAsUncertainViaReconstruction(markVolumeOpts)
+	if err != nil {
+		t.Fatalf("CheckAndMarkVolumeAsUncertainViaReconstruction failed. Expected: <no error> Actual: <%v>", err)
+	}
+
+	// Act: first reconcile to trigger mounting of the reconstructed volume.
+	reconciler.reconcile(ctx)
+	waitForUncertainPodMount(t, generatedVolumeName, podName, asw)
+
+	// Mock removing the pod.
+	dsw.DeletePodFromVolume(podName, generatedVolumeName)
+	// Act: second reconcile to trigger unmounting of the reconstructed volume after the pod was removed.
+	reconciler.reconcile(ctx)
+
+	// Assert: the volume unmount succeeds.
+	waitForUnmount(t, generatedVolumeName, podName, asw)
+}
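For context, the waitFor* helpers above poll the actual state of world until a condition holds. A minimal sketch of how such a helper can be built on wait.ExponentialBackoff from k8s.io/apimachinery — the concrete backoff values are assumptions, and the file's real retryWithExponentialBackOff may differ:

package reconciler

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// retryWithExponentialBackOffSketch polls condition until it reports done,
// growing the delay between attempts, and returns an error once the attempts
// are exhausted (which the helpers above surface via t.Fatalf).
func retryWithExponentialBackOffSketch(initialDuration time.Duration, condition wait.ConditionFunc) error {
	backoff := wait.Backoff{
		Duration: initialDuration, // delay before the first retry
		Factor:   3,               // multiply the delay after each attempt (assumed value)
		Jitter:   0,
		Steps:    6, // give up after this many attempts (assumed value)
	}
	return wait.ExponentialBackoff(backoff, condition)
}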
diff --git a/pkg/volume/csi/csi_mounter.go b/pkg/volume/csi/csi_mounter.go
index 63ac8341074..9f8e64ccf32 100644
--- a/pkg/volume/csi/csi_mounter.go
+++ b/pkg/volume/csi/csi_mounter.go
@@ -314,7 +314,7 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
 
 	if csiRPCError != nil {
 		// If operation finished with error then we can remove the mount directory.
-		if volumetypes.IsOperationFinishedError(csiRPCError) {
+		if volumetypes.IsOperationFinishedError(csiRPCError) && !mounterArgs.ReconstructedVolume {
 			if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
 				klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
 			}
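The one-line guard above is the heart of the fix: a final NodePublish error used to trigger removeMountDir unconditionally, which also deletes the volume data file (vol_data.json) next to the mount path. For a volume reconstructed after a kubelet restart, that file is what NewUnmounter rebuilds an unmounter from, so removing it left the volume impossible to unmount. A standalone sketch of the decision — shouldCleanUpMountDir is a hypothetical helper, not part of the csi package:

package csisketch

import (
	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)

// shouldCleanUpMountDir mirrors the guard in SetUpAt above (sketch only).
func shouldCleanUpMountDir(csiRPCError error, reconstructed bool) bool {
	// An "unfinished" error means the driver may still be working on the
	// publish call, so the directory must not be touched yet.
	if !volumetypes.IsOperationFinishedError(csiRPCError) {
		return false
	}
	// For reconstructed volumes, keep the directory: it still holds the
	// vol_data.json needed to build an unmounter during cleanup.
	return !reconstructed
}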
diff --git a/pkg/volume/csi/csi_mounter_test.go b/pkg/volume/csi/csi_mounter_test.go
index 62aecb809f0..1bf5bbd69a4 100644
--- a/pkg/volume/csi/csi_mounter_test.go
+++ b/pkg/volume/csi/csi_mounter_test.go
@@ -28,6 +28,8 @@ import (
 	"time"
 
 	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	authenticationv1 "k8s.io/api/authentication/v1"
 	corev1 "k8s.io/api/core/v1"
 	storage "k8s.io/api/storage/v1"
@@ -1169,6 +1171,91 @@ func TestMounterSetUpWithFSGroup(t *testing.T) {
 	}
 }
 
+func TestMounterSetUpWithNodePublishFinalError(t *testing.T) {
+	testCases := []struct {
+		name                string
+		podUID              types.UID
+		options             []string
+		spec                func(string, []string) *volume.Spec
+		reconstructedVolume bool
+	}{
+		{
+			name:   "setup with reconstructed volume",
+			podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
+			spec: func(fsType string, options []string) *volume.Spec {
+				pvSrc := makeTestPV("pv1", 20, testDriver, "vol1")
+				pvSrc.Spec.CSI.FSType = fsType
+				pvSrc.Spec.MountOptions = options
+				return volume.NewSpecFromPersistentVolume(pvSrc, false)
+			},
+			reconstructedVolume: true,
+		},
+		{
+			name:   "setup with new volume",
+			podUID: types.UID(fmt.Sprintf("%08X", rand.Uint64())),
+			spec: func(fsType string, options []string) *volume.Spec {
+				pvSrc := makeTestPV("pv1", 20, testDriver, "vol1")
+				pvSrc.Spec.CSI.FSType = fsType
+				pvSrc.Spec.MountOptions = options
+				return volume.NewSpecFromPersistentVolume(pvSrc, false)
+			},
+			reconstructedVolume: false,
+		},
+	}
+
+	for _, tc := range testCases {
+		volumeLifecycleModes := []storage.VolumeLifecycleMode{
+			storage.VolumeLifecyclePersistent,
+		}
+		driver := getTestCSIDriver(testDriver, nil, nil, volumeLifecycleModes)
+		fakeClient := fakeclient.NewClientset(driver)
+		plug, tmpDir := newTestPlugin(t, fakeClient)
+		defer func() {
+			_ = os.RemoveAll(tmpDir)
+		}()
+		registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
+		t.Run(tc.name, func(t *testing.T) {
+			mounter, err := plug.NewMounter(
+				tc.spec("zfs", tc.options),
+				&corev1.Pod{ObjectMeta: meta.ObjectMeta{UID: tc.podUID, Namespace: testns}},
+			)
+			if mounter == nil || err != nil {
+				t.Fatal("failed to create CSI mounter")
+			}
+
+			csiMounter := mounter.(*csiMountMgr)
+			csiMounter.csiClient = setupClient(t, true)
+
+			attachID := getAttachmentName(csiMounter.volumeID, string(csiMounter.driverName), string(plug.host.GetNodeName()))
+			attachment := makeTestAttachment(attachID, "test-node", csiMounter.spec.Name())
+			_, err = csiMounter.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, meta.CreateOptions{})
+			if err != nil {
+				t.Fatalf("failed to setup VolumeAttachment: %v", err)
+			}
+
+			csiMounter.csiClient.(*fakeCsiDriverClient).nodeClient.SetNextError(status.Errorf(codes.InvalidArgument, "mount failed"))
+
+			// Mounter.SetUp()
+			if err := csiMounter.SetUp(volume.MounterArgs{ReconstructedVolume: tc.reconstructedVolume}); err == nil {
+				t.Fatalf("mounter.SetUp expected an error but succeeded")
+			}
+
+			mountPath := csiMounter.GetPath()
+			volPath := filepath.Dir(mountPath)
+			dataFile := filepath.Join(volPath, volDataFileName)
+			if tc.reconstructedVolume {
+				if _, err := os.Stat(dataFile); os.IsNotExist(err) {
+					t.Errorf("volume data file [%s] was expected to exist, but it was removed", dataFile)
+				}
+				return
+			}
+			if _, err := os.Stat(dataFile); err == nil {
+				t.Errorf("volume data file [%s] was expected to be removed, but it still exists", dataFile)
+			}
+		})
+	}
+}
+
 func TestUnmounterTeardown(t *testing.T) {
 	plug, tmpDir := newTestPlugin(t, nil)
 	defer os.RemoveAll(tmpDir)
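Why the test injects codes.InvalidArgument: gRPC status codes split into "the operation may still be in progress driver-side" and final errors, and only final errors are safe to clean up after. A hedged sketch of that classification, modeled on the isFinalError convention used elsewhere in the CSI volume code — the exact set of codes below is an assumption, not the package's actual implementation:

package csisketch

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isFinalErrorSketch reports whether a NodePublish error is final,
// i.e. the driver is definitely no longer working on the call.
func isFinalErrorSketch(err error) bool {
	st, ok := status.FromError(err)
	if !ok {
		// Not a gRPC status error: assume the operation finished.
		return true
	}
	switch st.Code() {
	case codes.Canceled, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Unavailable, codes.Aborted:
		// The call may still be executing driver-side: not final.
		return false
	default:
		// e.g. codes.InvalidArgument, as used in the test above: final.
		return true
	}
}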
+ t.Errorf("volume file [%s] expects to be exists, but removed", dataFile) + } + return + } + if _, err := os.Stat(dataFile); err == nil { + t.Errorf("volume file [%s] expects to be removed, but exists", dataFile) + } + }) + } +} + func TestUnmounterTeardown(t *testing.T) { plug, tmpDir := newTestPlugin(t, nil) defer os.RemoveAll(tmpDir) diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index e96df4eb0f9..dba600bd25c 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -199,6 +199,10 @@ type FakeVolumePlugin struct { // Add callbacks as needed WaitForAttachHook func(spec *volume.Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) UnmountDeviceHook func(globalMountPath string) error + SetUpHook func(plugin volume.VolumePlugin, mounterArgs volume.MounterArgs) error + + // Inject error to NewUnmounterError + NewUnmounterError error Mounters []*FakeVolume Unmounters []*FakeVolume @@ -226,12 +230,14 @@ func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume { defer volume.Unlock() volume.WaitForAttachHook = plugin.WaitForAttachHook volume.UnmountDeviceHook = plugin.UnmountDeviceHook + volume.SetUpHook = plugin.SetUpHook return volume } } volume := &FakeVolume{ WaitForAttachHook: plugin.WaitForAttachHook, UnmountDeviceHook: plugin.UnmountDeviceHook, + SetUpHook: plugin.SetUpHook, } volume.VolumesAttached = make(map[string]sets.Set[string]) volume.DeviceMountState = make(map[string]string) @@ -321,6 +327,9 @@ func (plugin *FakeVolumePlugin) GetMounters() (Mounters []*FakeVolume) { func (plugin *FakeVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) { plugin.Lock() defer plugin.Unlock() + if plugin.NewUnmounterError != nil { + return nil, plugin.NewUnmounterError + } fakeVolume := plugin.getFakeVolume(&plugin.Unmounters) fakeVolume.Lock() defer fakeVolume.Unlock() @@ -689,6 +698,7 @@ type FakeVolume struct { // Add callbacks as needed WaitForAttachHook func(spec *volume.Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) UnmountDeviceHook func(globalMountPath string) error + SetUpHook func(plugin volume.VolumePlugin, mounterArgs volume.MounterArgs) error SetUpCallCount int TearDownCallCount int @@ -738,6 +748,9 @@ func (fv *FakeVolume) SetUp(mounterArgs volume.MounterArgs) error { defer fv.Unlock() err := fv.setupInternal(mounterArgs) fv.SetUpCallCount++ + if fv.SetUpHook != nil { + return fv.SetUpHook(fv.Plugin, mounterArgs) + } return err } diff --git a/pkg/volume/util/operationexecutor/operation_generator.go b/pkg/volume/util/operationexecutor/operation_generator.go index 2707f703448..85965b9d8d3 100644 --- a/pkg/volume/util/operationexecutor/operation_generator.go +++ b/pkg/volume/util/operationexecutor/operation_generator.go @@ -586,6 +586,7 @@ func (og *operationGenerator) GenerateMountVolumeFunc( FSGroupChangePolicy: fsGroupChangePolicy, Recorder: og.recorder, SELinuxLabel: volumeToMount.SELinuxLabel, + ReconstructedVolume: actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName), }) // Update actual state of world markOpts := MarkVolumeOpts{ diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index 4a6fc55e29b..152c7bc68ef 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -138,6 +138,7 @@ type MounterArgs struct { // Optional interface that will be used to change the ownership of the volume, if specified. 
diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go
index 4a6fc55e29b..152c7bc68ef 100644
--- a/pkg/volume/volume.go
+++ b/pkg/volume/volume.go
@@ -138,6 +138,7 @@ type MounterArgs struct {
 	// Optional interface that will be used to change the ownership of the volume, if specified.
 	// mainly used by unit tests
 	VolumeOwnershipApplicator VolumeOwnershipChanger
+	ReconstructedVolume       bool
 }
 
 type VolumeOwnershipChanger interface {
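To illustrate how a plugin's mount path might consume the new MounterArgs field, here is a hedged sketch; exampleMounter, publish, and cleanupAfterFailedPublish are hypothetical, and only the ReconstructedVolume field comes from this diff:

package pluginsketch

import (
	volume "k8s.io/kubernetes/pkg/volume"
)

// exampleMounter is a hypothetical plugin mounter; publish and
// cleanupAfterFailedPublish stand in for driver-specific steps.
type exampleMounter struct {
	publish                   func(dir string) error
	cleanupAfterFailedPublish func(dir string)
}

// SetUpAt honors MounterArgs.ReconstructedVolume on failure (sketch only).
func (m *exampleMounter) SetUpAt(dir string, args volume.MounterArgs) error {
	if err := m.publish(dir); err != nil {
		// Keep on-disk state for reconstructed volumes: a later unmount still
		// needs it (e.g. CSI's vol_data.json) to build an unmounter.
		if !args.ReconstructedVolume {
			m.cleanupAfterFailedPublish(dir)
		}
		return err
	}
	return nil
}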