Merge pull request #135719 from Argh4k/waiting-pod-integration-test

Put pods preempted in WaitOnPermit to backoff queue
This commit is contained in:
Kubernetes Prow Robot 2026-01-30 23:36:24 +05:30 committed by GitHub
commit 49fe2ecce1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 324 additions and 12 deletions

View file

@ -177,12 +177,16 @@ func NewEvaluator(pluginName string, fh fwk.Handle, i Interface, enableAsyncPree
ev.PreemptPod = func(ctx context.Context, c Candidate, preemptor, victim *v1.Pod, pluginName string) error {
logger := klog.FromContext(ctx)
// If the victim is a WaitingPod, send a reject message to the PermitPlugin.
skipAPICall := false
// If the victim is a WaitingPod, try to preempt it without a delete call (victim will go back to backoff queue).
// Otherwise we should delete the victim.
if waitingPod := ev.Handler.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject(pluginName, "preempted")
logger.V(2).Info("Preemptor pod rejected a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name())
} else {
if waitingPod.Preempt(pluginName, "preempted") {
logger.V(2).Info("Preemptor pod preempted a waiting pod", "preemptor", klog.KObj(preemptor), "waitingPod", klog.KObj(victim), "node", c.Name())
skipAPICall = true
}
}
if !skipAPICall {
condition := &v1.PodCondition{
Type: v1.DisruptionTarget,
ObservedGeneration: apipod.CalculatePodConditionObservedGeneration(&victim.Status, victim.Generation, v1.DisruptionTarget),

View file

@ -1863,11 +1863,10 @@ func (f *frameworkImpl) GetWaitingPod(uid types.UID) fwk.WaitingPod {
}
// RejectWaitingPod rejects a WaitingPod given its UID.
// The returned value indicates if the rejection was successful.
func (f *frameworkImpl) RejectWaitingPod(uid types.UID) bool {
	if waitingPod := f.waitingPods.get(uid); waitingPod != nil {
		// Reject reports false when the pod's fate was already decided
		// (allowed/rejected/preempted earlier), so forward its result
		// to the caller unchanged.
		return waitingPod.Reject("", "removed")
	}
	// No pod with this UID is currently waiting; nothing to reject.
	return false
}

View file

@ -75,6 +75,7 @@ type waitingPod struct {
pendingPlugins map[string]*time.Timer
s chan *fwk.Status
mu sync.RWMutex
done bool
}
var _ fwk.WaitingPod = &waitingPod{}
@ -141,15 +142,26 @@ func (w *waitingPod) Allow(pluginName string) {
}
// The select clause works as a non-blocking send.
// If there is no receiver, it's a no-op (default case).
// If there is no place in the buffer, it's a no-op (default case).
select {
case w.s <- fwk.NewStatus(fwk.Success, ""):
default:
}
w.done = true
}
// Reject declares the waiting pod unschedulable.
func (w *waitingPod) Reject(pluginName, msg string) {
func (w *waitingPod) Reject(pluginName, msg string) bool {
return w.stopWithStatus(fwk.Unschedulable, pluginName, msg)
}
// Preempt declares the waiting pod is preempted. Compared to reject it does not mark the pod as unschedulable,
// allowing it to be rescheduled.
func (w *waitingPod) Preempt(pluginName, msg string) bool {
	// fwk.Error (rather than fwk.Unschedulable) lets the pod go back to the
	// backoff queue instead of being marked unschedulable.
	preempted := w.stopWithStatus(fwk.Error, pluginName, msg)
	return preempted
}
func (w *waitingPod) stopWithStatus(status fwk.Code, pluginName, msg string) bool {
w.mu.RLock()
defer w.mu.RUnlock()
for _, timer := range w.pendingPlugins {
@ -157,9 +169,14 @@ func (w *waitingPod) Reject(pluginName, msg string) {
}
// The select clause works as a non-blocking send.
// If there is no receiver, it's a no-op (default case).
// If there is no place in the buffer, it's a no-op (default case).
select {
case w.s <- fwk.NewStatus(fwk.Unschedulable, msg).WithPlugin(pluginName):
case w.s <- fwk.NewStatus(status, msg).WithPlugin(pluginName):
default:
}
if w.done {
return false
}
w.done = true
return true
}

View file

@ -0,0 +1,103 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runtime
import (
"testing"
"time"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
// TestWaitingPodReturnFalseOnAllowed verifies that once a waiting pod has
// been allowed, subsequent Preempt/Reject calls report failure (return
// false) and the Success status from Allow is the one left on the channel.
func TestWaitingPodReturnFalseOnAllowed(t *testing.T) {
	cases := []struct {
		name       string
		actionName string
		action     func(wp *waitingPod) bool
	}{
		{
			name:       "Preempt returns false on allowed pod",
			actionName: "Preempt",
			action: func(wp *waitingPod) bool {
				return wp.Preempt("preemption-plugin", "preempted")
			},
		},
		{
			name:       "Reject returns false on allowed pod",
			actionName: "Reject",
			action: func(wp *waitingPod) bool {
				return wp.Reject("reject-plugin", "rejected")
			},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			testPod := st.MakePod().Name("test-pod").UID("test-uid").Obj()
			wp := newWaitingPod(testPod, map[string]time.Duration{"plugin1": 10 * time.Second})
			// 1. Simulate Allow from Plugin
			wp.Allow("plugin1")
			// 2. Perform Action
			if tc.action(wp) {
				t.Errorf("Expected %s to return false (failed), but it returned true (success)", tc.actionName)
			}
			// 3. Check what signal the pod received (should be Success from Allow)
			select {
			case status := <-wp.s:
				if !status.IsSuccess() {
					t.Errorf("Expected Pod to stay Allowed (Success), but got status: %v", status)
				}
			default:
				t.Fatal("No status received")
			}
		})
	}
}
// TestWaitingPodMultipleActions verifies that after a waiting pod has been
// allowed and its status consumed by a WaitOnPermit-style receiver, later
// Reject and Preempt calls both report failure (return false).
func TestWaitingPodMultipleActions(t *testing.T) {
	pod := st.MakePod().Name("test-pod").UID("test-uid").Obj()
	wp := newWaitingPod(pod, map[string]time.Duration{"plugin1": 10 * time.Second})
	startWaitOnPermit := make(chan struct{})
	endWaitOnPermit := make(chan struct{})
	// Simulate WaitOnPermit
	go func() {
		close(startWaitOnPermit)
		<-wp.s
		close(endWaitOnPermit)
	}()
	// NOTE(review): startWaitOnPermit closes before the goroutine actually
	// blocks on <-wp.s; Allow may run before the receive, which is fine only
	// because the send into w.s is non-blocking/buffered — confirm.
	<-startWaitOnPermit
	// 1. Simulate Allow from Plugin, it should be consumed by WaitOnPermit
	wp.Allow("plugin1")
	<-endWaitOnPermit
	// 2. Simulate Rejection from Plugin
	// The pod's fate was already decided by Allow, so this must fail even
	// though the status channel is empty again.
	res := wp.Reject("plugin2", "rejected")
	if res {
		t.Fatalf("Expected reject to fail, but it succeeded")
	}
	// 3. Simulate Preempt from Plugin
	res = wp.Preempt("preemption-plugin", "preempted")
	if res {
		t.Fatalf("Expected preempt to fail, but it succeeded")
	}
}

View file

@ -326,7 +326,10 @@ type WaitingPod interface {
// to unblock the pod.
Allow(pluginName string)
// Reject declares the waiting pod unschedulable.
Reject(pluginName, msg string)
Reject(pluginName, msg string) bool
// Preempt preempts the waiting pod. Compared to reject it does not mark the pod as unschedulable,
// allowing it to be rescheduled.
Preempt(pluginName, msg string) bool
}
// PreFilterResult wraps needed info for scheduler framework to act upon PreFilter phase.

View file

@ -28,6 +28,7 @@ import (
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -1619,3 +1620,188 @@ var _ fwk.PreFilterPlugin = &reservingPlugin{}
var _ fwk.FilterPlugin = &reservingPlugin{}
var _ fwk.PreFilterExtensions = &reservingPlugin{}
var _ fwk.ReservePlugin = &reservingPlugin{}
// blockedPod carries the signaling channel used to detect that a specific
// pod has entered this plugin's Permit phase.
type blockedPod struct {
	// blocked receives one message when Permit is invoked for the pod.
	blocked chan struct{}
}

// blockingPermitPlugin is a Permit plugin that blocks until a signal is received.
type blockingPermitPlugin struct {
	// podsToBlock maps pod name -> signal holder; an entry is removed the
	// first time Permit sees its pod.
	// NOTE(review): this map is read and mutated without a lock from the
	// scheduler's Permit path — safe only while a single pod is registered
	// at a time; confirm if the test ever blocks multiple pods concurrently.
	podsToBlock map[string]*blockedPod
}

const blockingPermitPluginName = "blocking-permit-plugin"

var _ fwk.PermitPlugin = &blockingPermitPlugin{}

// newBlockingPermitPlugin constructs the plugin with an empty block table.
func newBlockingPermitPlugin(_ context.Context, _ runtime.Object, h fwk.Handle) fwk.Plugin {
	return &blockingPermitPlugin{
		podsToBlock: make(map[string]*blockedPod),
	}
}

// Name returns the plugin's registered name.
func (pl *blockingPermitPlugin) Name() string {
	return blockingPermitPluginName
}

// Permit signals on a registered pod's channel, deregisters it, and parks it
// in WaitOnPermit for up to a minute; unregistered pods pass through (nil
// status, zero timeout).
func (pl *blockingPermitPlugin) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
	if p, ok := pl.podsToBlock[pod.Name]; ok {
		// Unbuffered send: blocks here until the test acknowledges that the
		// pod reached the Permit phase.
		p.blocked <- struct{}{}
		delete(pl.podsToBlock, pod.Name)
		return fwk.NewStatus(fwk.Wait, "waiting"), time.Minute
	}
	return nil, 0
}
// TestPreemptionRespectsWaitingPod is an integration test for preempting a
// pod that is parked in WaitOnPermit: the victim must NOT be deleted via the
// API server; instead it goes back to the scheduling queue and is
// rescheduled onto another node.
func TestPreemptionRespectsWaitingPod(t *testing.T) {
	// 1. Create a "blocking" permit plugin that signals when it's running and waits for a specific close.
	// 2. Create a big node on which low-priority pod will be scheduled.
	// 3. Schedule a low-priority pod (victim) that hits this plugin (after being selected to run on a big node).
	// 4. While victim is blocked in WaitOnPermit, add a smaller node on which the victim should be rescheduled.
	// 5. Schedule a high-priority pod (preemptor), that can only fit on big node.
	// 6. High-priority pod should be scheduled on a big node and victim should be preempted.
	// 7. Victim should be rescheduled on a smaller node.
	// Create a node with resources for only one pod.
	nodeRes := map[v1.ResourceName]string{
		v1.ResourceCPU:    "2",
		v1.ResourceMemory: "2Gi",
	}
	node := st.MakeNode().Name("big-node").Capacity(nodeRes).Obj()
	victim := st.MakePod().Name("victim").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1", v1.ResourceMemory: "1Gi"}).Obj()
	// Preemptor requires more resources than the small node has.
	preemptor := st.MakePod().Name("preemptor").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1.5", v1.ResourceMemory: "1.5Gi"}).Obj()
	// Register the blocking plugin
	var plugin *blockingPermitPlugin
	registry := make(frameworkruntime.Registry)
	// Capture the plugin instance so the test can register pods to block.
	err := registry.Register(blockingPermitPluginName, func(ctx context.Context, obj runtime.Object, fh fwk.Handle) (fwk.Plugin, error) {
		pl := newBlockingPermitPlugin(ctx, obj, fh)
		plugin = pl.(*blockingPermitPlugin)
		return pl, nil
	})
	if err != nil {
		t.Fatalf("Error registering plugin: %v", err)
	}
	cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
		Profiles: []configv1.KubeSchedulerProfile{{
			SchedulerName: ptr.To(v1.DefaultSchedulerName),
			Plugins: &configv1.Plugins{
				Permit: configv1.PluginSet{
					Enabled: []configv1.Plugin{
						{Name: blockingPermitPluginName},
					},
				},
			},
		}},
	})
	testCtx := testutils.InitTestSchedulerWithOptions(t,
		testutils.InitTestAPIServer(t, "preemption-waiting", nil),
		0,
		scheduler.WithProfiles(cfg.Profiles...),
		scheduler.WithFrameworkOutOfTreeRegistry(registry))
	testutils.SyncSchedulerInformerFactory(testCtx)
	go testCtx.Scheduler.Run(testCtx.Ctx)
	// Register the victim BEFORE it is created so Permit is guaranteed to
	// find it in podsToBlock.
	victimToBlock := &blockedPod{
		blocked: make(chan struct{}),
	}
	plugin.podsToBlock[victim.Name] = victimToBlock
	cs := testCtx.ClientSet
	if _, err := createNode(cs, node); err != nil {
		t.Fatalf("Error creating node: %v", err)
	}
	t.Logf("Creating victim pod")
	victim, err = cs.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, victim, metav1.CreateOptions{})
	if err != nil {
		t.Fatalf("Error creating victim: %v", err)
	}
	t.Logf("Waiting for victim to reach WaitOnPermit")
	select {
	case <-victimToBlock.blocked:
		t.Logf("Victim reached WaitOnPermit")
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("Timed out waiting for victim to reach WaitOnPermit")
	}
	// The small node can host the victim (1 CPU / 1Gi) but not the
	// preemptor (1.5 CPU / 1.5Gi).
	smallNodeRes := map[v1.ResourceName]string{
		v1.ResourceCPU:    "1",
		v1.ResourceMemory: "1Gi",
	}
	smallNode := st.MakeNode().Name("small-node").Capacity(smallNodeRes).Obj()
	if _, err := createNode(cs, smallNode); err != nil {
		t.Fatalf("Error creating node: %v", err)
	}
	t.Logf("Creating preemptor pod")
	_, err = cs.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, preemptor, metav1.CreateOptions{})
	if err != nil {
		t.Fatalf("Error creating preemptor: %v", err)
	}
	// Preemptor should eventually be scheduled or cause victim preemption.
	// Since victim is in WaitingOnPermit, Preemptor's preemption logic (PostFilter) should find it.
	// It should call PreemptPod() on waiting victim.
	// The plugin returns error on preemption, so the victim scheduling fails.
	// The victim should NOT be deleted from API server.
	// Instead the victim should go to the backoff queue and get rescheduled eventually.
	t.Logf("Waiting for preemptor to be scheduled")
	err = wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, 15*time.Second, false, func(ctx context.Context) (bool, error) {
		// Ensure that victim is not deleted
		_, err := cs.CoreV1().Pods(testCtx.NS.Name).Get(ctx, victim.Name, metav1.GetOptions{})
		if err != nil {
			if apierrors.IsNotFound(err) {
				return false, fmt.Errorf("victim pod was deleted")
			}
			return false, err
		}
		// Check if preemptor was scheduled
		p, err := cs.CoreV1().Pods(testCtx.NS.Name).Get(ctx, preemptor.Name, metav1.GetOptions{})
		if err != nil {
			if apierrors.IsNotFound(err) {
				return false, fmt.Errorf("preemptor pod was deleted")
			}
			return false, err
		}
		return p.Spec.NodeName != "", nil
	})
	if err != nil {
		t.Fatalf("Failed waiting for preemptor validation: %v", err)
	}
	t.Logf("waiting for victim to be rescheduled")
	err = wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, 15*time.Second, false, func(ctx context.Context) (bool, error) {
		v, err := cs.CoreV1().Pods(testCtx.NS.Name).Get(ctx, victim.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		return v.Spec.NodeName != "", nil
	})
	if err != nil {
		t.Fatalf("Failed waiting for victim validation: %v", err)
	}
	// Check that preemptor and victim are scheduled on expected nodes: victim on a small node and preemptor on a big node.
	v, err := cs.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, victim.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Error getting victim: %v", err)
	}
	if v.Spec.NodeName != "small-node" {
		t.Fatalf("Victim should be scheduled on small-node, but was scheduled on %s", v.Spec.NodeName)
	}
	p, err := cs.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, preemptor.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Error getting preemptor: %v", err)
	}
	if p.Spec.NodeName != "big-node" {
		t.Fatalf("Preemptor should be scheduled on big-node, but was scheduled on %s", p.Spec.NodeName)
	}
}