From f1f3a08ba737cba6304252d56a55b3216b672b80 Mon Sep 17 00:00:00 2001
From: Maciej Wyrzuc
Date: Thu, 11 Dec 2025 09:41:12 +0000
Subject: [PATCH] Put pods preempted in WaitOnPermit in backoff queue

---
 .../framework/preemption/preemption.go        |  12 +-
 pkg/scheduler/framework/runtime/framework.go  |   5 +-
 .../framework/runtime/waiting_pods_map.go     |  25 ++-
 .../runtime/waiting_pods_map_test.go          | 103 ++++++++++
 .../kube-scheduler/framework/interface.go     |   5 +-
 .../scheduler/preemption/preemption_test.go   | 186 ++++++++++++++++++
 6 files changed, 324 insertions(+), 12 deletions(-)
 create mode 100644 pkg/scheduler/framework/runtime/waiting_pods_map_test.go

diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go
index 08819574be0..bc53ac2fe1d 100644
--- a/pkg/scheduler/framework/preemption/preemption.go
+++ b/pkg/scheduler/framework/preemption/preemption.go
@@ -177,12 +177,16 @@ func NewEvaluator(pluginName string, fh fwk.Handle, i Interface, enableAsyncPree
 	ev.PreemptPod = func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error {
 		logger := klog.FromContext(ctx)
 
-		// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
+		skipAPICall := false
+		// If the victim is a WaitingPod, try to preempt it without a delete call (victim will go back to backoff queue).
 		// Otherwise we should delete the victim.
 		if waitingPod := ev.Handler.GetWaitingPod(victim.UID); waitingPod != nil {
-			waitingPod.Reject(pluginName, "preempted")
-			logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name())
-		} else {
+			if waitingPod.Preempt(pluginName, "preempted") {
+				logger.V(2).Info("Preemptor pod preempted a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name())
+				skipAPICall = true
+			}
+		}
+		if !skipAPICall {
 			condition := &v1.PodCondition{
 				Type:               v1.DisruptionTarget,
 				ObservedGeneration: apipod.CalculatePodConditionObservedGeneration(&victim.Status, victim.Generation, v1.DisruptionTarget),
diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go
index 350c8e0c7fe..2b139c29bb4 100644
--- a/pkg/scheduler/framework/runtime/framework.go
+++ b/pkg/scheduler/framework/runtime/framework.go
@@ -1791,11 +1791,10 @@ func (f *frameworkImpl) GetWaitingPod(uid types.UID) fwk.WaitingPod {
 }
 
 // RejectWaitingPod rejects a WaitingPod given its UID.
-// The returned value indicates if the given pod is waiting or not.
+// The returned value indicates if the rejection was successful.
 func (f *frameworkImpl) RejectWaitingPod(uid types.UID) bool {
 	if waitingPod := f.waitingPods.get(uid); waitingPod != nil {
-		waitingPod.Reject("", "removed")
-		return true
+		return waitingPod.Reject("", "removed")
 	}
 	return false
 }
diff --git a/pkg/scheduler/framework/runtime/waiting_pods_map.go b/pkg/scheduler/framework/runtime/waiting_pods_map.go
index dc054ee7f5d..4bc2bbeb9fb 100644
--- a/pkg/scheduler/framework/runtime/waiting_pods_map.go
+++ b/pkg/scheduler/framework/runtime/waiting_pods_map.go
@@ -75,6 +75,7 @@ type waitingPod struct {
 	pendingPlugins map[string]*time.Timer
 	s              chan *fwk.Status
 	mu             sync.RWMutex
+	done           bool
 }
 
 var _ fwk.WaitingPod = &waitingPod{}
@@ -141,15 +142,26 @@ func (w *waitingPod) Allow(pluginName string) {
 	}
 
 	// The select clause works as a non-blocking send.
-	// If there is no receiver, it's a no-op (default case).
+	// If there is no room in the buffer, it's a no-op (default case).
 	select {
 	case w.s <- fwk.NewStatus(fwk.Success, ""):
 	default:
 	}
+	w.done = true
 }
 
 // Reject declares the waiting pod unschedulable.
-func (w *waitingPod) Reject(pluginName, msg string) {
+func (w *waitingPod) Reject(pluginName, msg string) bool {
+	return w.stopWithStatus(fwk.Unschedulable, pluginName, msg)
+}
+
+// Preempt declares the waiting pod preempted. Unlike Reject, it does not mark the pod as unschedulable,
+// allowing it to be rescheduled.
+func (w *waitingPod) Preempt(pluginName, msg string) bool {
+	return w.stopWithStatus(fwk.Error, pluginName, msg)
+}
+
+func (w *waitingPod) stopWithStatus(status fwk.Code, pluginName, msg string) bool {
 	w.mu.RLock()
 	defer w.mu.RUnlock()
 	for _, timer := range w.pendingPlugins {
@@ -157,9 +169,14 @@ func (w *waitingPod) Reject(pluginName, msg string) {
 	}
 
 	// The select clause works as a non-blocking send.
-	// If there is no receiver, it's a no-op (default case).
+	// If there is no room in the buffer, it's a no-op (default case).
 	select {
-	case w.s <- fwk.NewStatus(fwk.Unschedulable, msg).WithPlugin(pluginName):
+	case w.s <- fwk.NewStatus(status, msg).WithPlugin(pluginName):
 	default:
 	}
+	if w.done {
+		return false
+	}
+	w.done = true
+	return true
 }
diff --git a/pkg/scheduler/framework/runtime/waiting_pods_map_test.go b/pkg/scheduler/framework/runtime/waiting_pods_map_test.go
new file mode 100644
index 00000000000..8e49952468d
--- /dev/null
+++ b/pkg/scheduler/framework/runtime/waiting_pods_map_test.go
@@ -0,0 +1,103 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package runtime
+
+import (
+	"testing"
+	"time"
+
+	st "k8s.io/kubernetes/pkg/scheduler/testing"
+)
+
+func TestWaitingPodReturnFalseOnAllowed(t *testing.T) {
+	tests := []struct {
+		name       string
+		action     func(wp *waitingPod) bool
+		actionName string
+	}{
+		{
+			name: "Preempt returns false on allowed pod",
+			action: func(wp *waitingPod) bool {
+				return wp.Preempt("preemption-plugin", "preempted")
+			},
+			actionName: "Preempt",
+		},
+		{
+			name: "Reject returns false on allowed pod",
+			action: func(wp *waitingPod) bool {
+				return wp.Reject("reject-plugin", "rejected")
+			},
+			actionName: "Reject",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			pod := st.MakePod().Name("test-pod").UID("test-uid").Obj()
+			wp := newWaitingPod(pod, map[string]time.Duration{"plugin1": 10 * time.Second})
+
+			// 1. Simulate Allow from a plugin.
+			wp.Allow("plugin1")
+
+			// 2. Perform the action under test.
+			if success := tt.action(wp); success {
+				t.Errorf("Expected %s to return false (failed), but it returned true (success)", tt.actionName)
+			}
+
+			// 3. Check what signal the pod received (should be Success from Allow)
+			select {
+			case status := <-wp.s:
+				if !status.IsSuccess() {
+					t.Errorf("Expected Pod to stay Allowed (Success), but got status: %v", status)
+				}
+			default:
+				t.Fatal("No status received")
+			}
+		})
+	}
+}
+
+func TestWaitingPodMultipleActions(t *testing.T) {
+	pod := st.MakePod().Name("test-pod").UID("test-uid").Obj()
+	wp := newWaitingPod(pod, map[string]time.Duration{"plugin1": 10 * time.Second})
+	startWaitOnPermit := make(chan struct{})
+	endWaitOnPermit := make(chan struct{})
+
+	// Simulate WaitOnPermit.
+	go func() {
+		close(startWaitOnPermit)
+		<-wp.s
+		close(endWaitOnPermit)
+	}()
+
+	<-startWaitOnPermit
+	// 1. Simulate Allow from a plugin; it should be consumed by WaitOnPermit.
+	wp.Allow("plugin1")
+	<-endWaitOnPermit
+
+	// 2. Simulate Reject from a plugin.
+	res := wp.Reject("plugin2", "rejected")
+	if res {
+		t.Fatalf("Expected reject to fail, but it succeeded")
+	}
+
+	// 3. Simulate Preempt from a plugin.
+	res = wp.Preempt("preemption-plugin", "preempted")
+	if res {
+		t.Fatalf("Expected preempt to fail, but it succeeded")
+	}
+}
diff --git a/staging/src/k8s.io/kube-scheduler/framework/interface.go b/staging/src/k8s.io/kube-scheduler/framework/interface.go
index dff535cb4c6..d32edd34832 100644
--- a/staging/src/k8s.io/kube-scheduler/framework/interface.go
+++ b/staging/src/k8s.io/kube-scheduler/framework/interface.go
@@ -326,7 +326,10 @@ type WaitingPod interface {
 	// to unblock the pod.
 	Allow(pluginName string)
 	// Reject declares the waiting pod unschedulable.
-	Reject(pluginName, msg string)
+	Reject(pluginName, msg string) bool
+	// Preempt declares the waiting pod preempted. Unlike Reject, it does not mark the pod as unschedulable,
+	// allowing it to be rescheduled.
+	Preempt(pluginName, msg string) bool
 }
 
 // PreFilterResult wraps needed info for scheduler framework to act upon PreFilter phase.
diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go
index ff14d11dacc..8a3b621a35a 100644
--- a/test/integration/scheduler/preemption/preemption_test.go
+++ b/test/integration/scheduler/preemption/preemption_test.go
@@ -28,6 +28,7 @@ import (
 	"time"
 
 	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -1619,3 +1620,188 @@ var _ fwk.PreFilterPlugin = &reservingPlugin{}
 var _ fwk.FilterPlugin = &reservingPlugin{}
 var _ fwk.PreFilterExtensions = &reservingPlugin{}
 var _ fwk.ReservePlugin = &reservingPlugin{}
+
+type blockedPod struct {
+	blocked chan struct{}
+}
+
+// blockingPermitPlugin is a Permit plugin that signals when a designated pod reaches Permit and then holds it in WaitOnPermit.
+type blockingPermitPlugin struct {
+	podsToBlock map[string]*blockedPod
+}
+
+const blockingPermitPluginName = "blocking-permit-plugin"
+
+var _ fwk.PermitPlugin = &blockingPermitPlugin{}
+
+func newBlockingPermitPlugin(_ context.Context, _ runtime.Object, h fwk.Handle) fwk.Plugin {
+	return &blockingPermitPlugin{
+		podsToBlock: make(map[string]*blockedPod),
+	}
+}
+
+func (pl *blockingPermitPlugin) Name() string {
+	return blockingPermitPluginName
+}
+
+func (pl *blockingPermitPlugin) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
+	if p, ok := pl.podsToBlock[pod.Name]; ok {
+		p.blocked <- struct{}{}
+		delete(pl.podsToBlock, pod.Name)
+		return fwk.NewStatus(fwk.Wait, "waiting"), time.Minute
+	}
+	return nil, 0
+}
+
+func TestPreemptionRespectsWaitingPod(t *testing.T) {
+	// 1. Create a "blocking" permit plugin that signals when the victim reaches Permit and then holds it in WaitOnPermit.
+	// 2. Create a big node on which the low-priority pod will be scheduled.
+	// 3. Schedule a low-priority pod (victim) that hits this plugin (after being selected to run on the big node).
+	// 4. While the victim is blocked in WaitOnPermit, add a smaller node on which the victim should be rescheduled.
+	// 5. Schedule a high-priority pod (preemptor) that can only fit on the big node.
+	// 6. The high-priority pod should be scheduled on the big node and the victim should be preempted.
+	// 7. The victim should be rescheduled on the smaller node.
+
+	// Create a node with enough resources for only one of the two pods.
+	nodeRes := map[v1.ResourceName]string{
+		v1.ResourceCPU:    "2",
+		v1.ResourceMemory: "2Gi",
+	}
+	node := st.MakeNode().Name("big-node").Capacity(nodeRes).Obj()
+
+	victim := st.MakePod().Name("victim").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1", v1.ResourceMemory: "1Gi"}).Obj()
+	// Preemptor requires more resources than the small node has.
+	preemptor := st.MakePod().Name("preemptor").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1.5", v1.ResourceMemory: "1.5Gi"}).Obj()
+
+	// Register the blocking plugin.
+	var plugin *blockingPermitPlugin
+	registry := make(frameworkruntime.Registry)
+	err := registry.Register(blockingPermitPluginName, func(ctx context.Context, obj runtime.Object, fh fwk.Handle) (fwk.Plugin, error) {
+		pl := newBlockingPermitPlugin(ctx, obj, fh)
+		plugin = pl.(*blockingPermitPlugin)
+		return pl, nil
+	})
+	if err != nil {
+		t.Fatalf("Error registering plugin: %v", err)
+	}
+
+	cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
+		Profiles: []configv1.KubeSchedulerProfile{{
+			SchedulerName: ptr.To(v1.DefaultSchedulerName),
+			Plugins: &configv1.Plugins{
+				Permit: configv1.PluginSet{
+					Enabled: []configv1.Plugin{
+						{Name: blockingPermitPluginName},
+					},
+				},
+			},
+		}},
+	})
+
+	testCtx := testutils.InitTestSchedulerWithOptions(t,
+		testutils.InitTestAPIServer(t, "preemption-waiting", nil),
+		0,
+		scheduler.WithProfiles(cfg.Profiles...),
+		scheduler.WithFrameworkOutOfTreeRegistry(registry))
+	testutils.SyncSchedulerInformerFactory(testCtx)
+	go testCtx.Scheduler.Run(testCtx.Ctx)
+
+	victimToBlock := &blockedPod{
+		blocked: make(chan struct{}),
+	}
+	plugin.podsToBlock[victim.Name] = victimToBlock
+
+	cs := testCtx.ClientSet
+
+	if _, err := createNode(cs, node); err != nil {
+		t.Fatalf("Error creating node: %v", err)
+	}
+
+	t.Logf("Creating victim pod")
+	victim, err = cs.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, victim, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Error creating victim: %v", err)
+	}
+
+	t.Logf("Waiting for victim to reach WaitOnPermit")
+	select {
+	case <-victimToBlock.blocked:
+		t.Logf("Victim reached WaitOnPermit")
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Fatalf("Timed out waiting for victim to reach WaitOnPermit")
+	}
+
+	smallNodeRes := map[v1.ResourceName]string{
+		v1.ResourceCPU:    "1",
+		v1.ResourceMemory: "1Gi",
+	}
+	smallNode := st.MakeNode().Name("small-node").Capacity(smallNodeRes).Obj()
+	if _, err := createNode(cs, smallNode); err != nil {
+		t.Fatalf("Error creating node: %v", err)
+	}
+
+	t.Logf("Creating preemptor pod")
+	_, err = cs.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, preemptor, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Error creating preemptor: %v", err)
+	}
+
+	// The preemptor should eventually preempt the victim and be scheduled.
+	// Since the victim is in WaitOnPermit, the preemptor's preemption logic (PostFilter) should find it.
+	// It should call PreemptPod() on the waiting victim.
+	// Preempt() signals an Error status to WaitOnPermit, so the victim's scheduling cycle fails without marking it unschedulable.
+	// The victim should NOT be deleted from the API server.
+	// Instead, the victim should go to the backoff queue and eventually get rescheduled.
+ t.Logf("Waiting for preemptor to be scheduled") + err = wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, 15*time.Second, false, func(ctx context.Context) (bool, error) { + // Ensure that victim is not deleted + _, err := cs.CoreV1().Pods(testCtx.NS.Name).Get(ctx, victim.Name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return false, fmt.Errorf("victim pod was deleted") + } + return false, err + } + // Check if preemptor was scheduled + p, err := cs.CoreV1().Pods(testCtx.NS.Name).Get(ctx, preemptor.Name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return false, fmt.Errorf("preemptor pod was deleted") + } + return false, err + } + return p.Spec.NodeName != "", nil + }) + if err != nil { + t.Fatalf("Failed waiting for preemptor validation: %v", err) + } + + t.Logf("waiting for victim to be rescheduled") + err = wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, 15*time.Second, false, func(ctx context.Context) (bool, error) { + v, err := cs.CoreV1().Pods(testCtx.NS.Name).Get(ctx, victim.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + return v.Spec.NodeName != "", nil + }) + if err != nil { + t.Fatalf("Failed waiting for victim validation: %v", err) + } + + // Check that preemptor and victim are scheduled on expected nodes: victim on a small node and preemptor on a big node. + v, err := cs.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, victim.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Error getting victim: %v", err) + } + if v.Spec.NodeName != "small-node" { + t.Fatalf("Victim should be scheduled on small-node, but was scheduled on %s", v.Spec.NodeName) + } + + p, err := cs.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, preemptor.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Error getting preemptor: %v", err) + } + if p.Spec.NodeName != "big-node" { + t.Fatalf("Preemptor should be scheduled on big-node, but was scheduled on %s", p.Spec.NodeName) + } +}