Merge pull request #135948 from pohly/dra-scheduler-resource-plugin-unit-test-fix

DRA extended resources: fix flake in unit tests
Kubernetes Prow Robot, 2025-12-30 16:12:35 +05:30, committed by GitHub
commit 3226fe520d
4 changed files with 149 additions and 150 deletions
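The fix converts these scheduler unit tests from hand-rolled contexts (klog's ktesting.NewTestContext plus context.WithCancel) to the TContext helper in k8s.io/kubernetes/test/utils/ktesting, and runs the flake-prone cases inside a Go testing/synctest bubble via SyncTest. A minimal sketch of the resulting test skeleton, using only helpers that appear in this diff (TestExample, testExample and the case name are placeholders, not part of the change):

import (
    "testing"

    utilfeature "k8s.io/apiserver/pkg/util/feature"
    featuregatetesting "k8s.io/component-base/featuregate/testing"
    "k8s.io/kubernetes/pkg/features"
    "k8s.io/kubernetes/test/utils/ktesting"
)

func TestExample(t *testing.T) { testExample(ktesting.Init(t)) }

func testExample(tCtx ktesting.TContext) {
    tCtx.SyncTest("some case", func(tCtx ktesting.TContext) {
        // TContext combines context.Context and testing.TB with helpers such as
        // Logger, Cancel, Cleanup and ExpectNoError, all used in the files below.
        // SyncTest runs this body inside a synctest bubble, so every goroutine
        // started here must be shut down before the body returns.
        featuregatetesting.SetFeatureGateDuringTest(tCtx, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, true)
        // ... build the plugin under test, report failures via tCtx.Errorf/Fatalf ...
    })
}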

View file

@@ -17,7 +17,6 @@ limitations under the License.
package noderesources
import (
"context"
"testing"
"github.com/google/go-cmp/cmp"
@@ -26,7 +25,6 @@ import (
apiruntime "k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -36,9 +34,13 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestNodeResourcesBalancedAllocation(t *testing.T) {
testNodeResourcesBalancedAllocation(ktesting.Init(t))
}
func testNodeResourcesBalancedAllocation(tCtx ktesting.TContext) {
cpuAndMemoryAndGPU := v1.PodSpec{
Containers: []v1.Container{
{
@@ -363,45 +365,43 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, test.draObjects != nil)
tCtx.SyncTest(test.name, func(tCtx ktesting.TContext) {
featuregatetesting.SetFeatureGateDuringTest(tCtx, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, test.draObjects != nil)
snapshot := cache.NewSnapshot(test.pods, test.nodes)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
p, _ := NewBalancedAllocation(ctx, &test.args, fh, feature.Features{
fh, _ := runtime.NewFramework(tCtx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
defer func() {
tCtx.Cancel("test has completed")
runtime.WaitForShutdown(fh)
}()
p, _ := NewBalancedAllocation(tCtx, &test.args, fh, feature.Features{
EnableDRAExtendedResource: test.draObjects != nil,
})
draManager, err := newTestDRAManager(t, ctx, logger, test.draObjects...)
if err != nil {
t.Fatalf("failed to create test DRA manager: %v", err)
}
draManager := newTestDRAManager(tCtx, test.draObjects...)
p.(*BalancedAllocation).draManager = draManager
state := framework.NewCycleState()
if test.runPreScore {
status := p.(fwk.PreScorePlugin).PreScore(ctx, state, test.pod, tf.BuildNodeInfos(test.nodes))
status := p.(fwk.PreScorePlugin).PreScore(tCtx, state, test.pod, tf.BuildNodeInfos(test.nodes))
if status.Code() != test.wantPreScoreStatusCode {
t.Errorf("unexpected status code, want: %v, got: %v", test.wantPreScoreStatusCode, status.Code())
tCtx.Errorf("unexpected status code, want: %v, got: %v", test.wantPreScoreStatusCode, status.Code())
}
if status.Code() == fwk.Skip {
t.Log("skipping score test as PreScore returned skip")
tCtx.Log("skipping score test as PreScore returned skip")
return
}
}
for i := range test.nodes {
nodeInfo, err := snapshot.Get(test.nodes[i].Name)
if err != nil {
t.Errorf("failed to get node %q from snapshot: %v", test.nodes[i].Name, err)
tCtx.Errorf("failed to get node %q from snapshot: %v", test.nodes[i].Name, err)
}
hostResult, status := p.(fwk.ScorePlugin).Score(ctx, state, test.pod, nodeInfo)
hostResult, status := p.(fwk.ScorePlugin).Score(tCtx, state, test.pod, nodeInfo)
if !status.IsSuccess() {
t.Errorf("Score is expected to return success, but didn't. Got status: %v", status)
tCtx.Errorf("Score is expected to return success, but didn't. Got status: %v", status)
}
if diff := cmp.Diff(test.expectedList[i].Score, hostResult); diff != "" {
t.Errorf("unexpected score for host %v (-want,+got):\n%s", test.nodes[i].Name, diff)
tCtx.Errorf("unexpected score for host %v (-want,+got):\n%s", test.nodes[i].Name, diff)
}
}
})

View file

@@ -17,9 +17,9 @@ limitations under the License.
package noderesources
import (
"context"
"fmt"
"testing"
"testing/synctest"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
@ -35,8 +35,6 @@ import (
"k8s.io/client-go/kubernetes/fake"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/dynamic-resource-allocation/deviceclass/extendedresourcecache"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -49,6 +47,7 @@ import (
st "k8s.io/kubernetes/pkg/scheduler/testing"
tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -151,7 +150,8 @@ func newPodLevelResourcesPod(pod *v1.Pod, podResources v1.ResourceRequirements)
return pod
}
func TestEnoughRequests(t *testing.T) {
func TestEnoughRequests(t *testing.T) { testEnoughRequests(ktesting.Init(t)) }
func testEnoughRequests(tCtx ktesting.TContext) {
enoughPodsTests := []struct {
pod *v1.Pod
nodeInfo *framework.NodeInfo
@@ -706,9 +706,9 @@ func TestEnoughRequests(t *testing.T) {
}
for _, test := range enoughPodsTests {
t.Run(test.name, func(t *testing.T) {
tCtx.SyncTest(test.name, func(tCtx ktesting.TContext) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, test.draExtendedResourceEnabled)
featuregatetesting.SetFeatureGateDuringTest(tCtx, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, test.draExtendedResourceEnabled)
node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5), Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}}
test.nodeInfo.SetNode(&node)
@@ -716,43 +716,47 @@ func TestEnoughRequests(t *testing.T) {
test.args.ScoringStrategy = defaultScoringStrategy
}
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
client := fake.NewSimpleClientset(deviceClassWithExtendResourceName)
informerFactory := informers.NewSharedInformerFactory(client, 0)
claimsCache := assumecache.NewAssumeCache(logger, informerFactory.Resource().V1().ResourceClaims().Informer(), "resource claim", "", nil)
draManager := dynamicresources.NewDRAManager(ctx, claimsCache, nil, informerFactory)
claimsCache := assumecache.NewAssumeCache(tCtx.Logger(), informerFactory.Resource().V1().ResourceClaims().Informer(), "resource claim", "", nil)
draManager := dynamicresources.NewDRAManager(tCtx, claimsCache, nil, informerFactory)
if test.draExtendedResourceEnabled {
cache := draManager.DeviceClassResolver().(*extendedresourcecache.ExtendedResourceCache)
if _, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(cache); err != nil {
logger.Error(err, "failed to add device class informer event handler")
}
handle, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(cache)
tCtx.ExpectNoError(err, "add device class informer event handler")
tCtx.Cleanup(func() {
_ = informerFactory.Resource().V1().DeviceClasses().Informer().RemoveEventHandler(handle)
})
}
informerFactory.Start(ctx.Done())
t.Cleanup(func() {
informerFactory.Start(tCtx.Done())
tCtx.Cleanup(func() {
tCtx.Cancel("test has completed")
// Now we can wait for all goroutines to stop.
informerFactory.Shutdown()
})
informerFactory.WaitForCacheSync(ctx.Done())
informerFactory.WaitForCacheSync(tCtx.Done())
// Wait for event delivery.
synctest.Wait()
runOpts := []runtime.Option{
runtime.WithSharedDRAManager(draManager),
}
fh, _ := runtime.NewFramework(ctx, nil, nil, runOpts...)
p, err := NewFit(ctx, &test.args, fh, plfeature.Features{EnablePodLevelResources: test.podLevelResourcesEnabled, EnableDRAExtendedResource: test.draExtendedResourceEnabled})
if err != nil {
t.Fatal(err)
}
fh, _ := runtime.NewFramework(tCtx, nil, nil, runOpts...)
defer func() {
tCtx.Cancel("test has completed")
runtime.WaitForShutdown(fh)
}()
p, err := NewFit(tCtx, &test.args, fh, plfeature.Features{EnablePodLevelResources: test.podLevelResourcesEnabled, EnableDRAExtendedResource: test.draExtendedResourceEnabled})
tCtx.ExpectNoError(err, "create fit plugin")
cycleState := framework.NewCycleState()
_, preFilterStatus := p.(fwk.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod, nil)
_, preFilterStatus := p.(fwk.PreFilterPlugin).PreFilter(tCtx, cycleState, test.pod, nil)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
tCtx.Errorf("prefilter failed with status: %v", preFilterStatus)
}
gotStatus := p.(fwk.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo)
gotStatus := p.(fwk.FilterPlugin).Filter(tCtx, cycleState, test.pod, test.nodeInfo)
if diff := cmp.Diff(test.wantStatus, gotStatus); diff != "" {
t.Errorf("status does not match (-want,+got):\n%s", diff)
tCtx.Errorf("status does not match (-want,+got):\n%s", diff)
}
opts := ResourceRequestsOptions{EnablePodLevelResources: test.podLevelResourcesEnabled, EnableDRAExtendedResource: test.draExtendedResourceEnabled}
@@ -763,33 +767,30 @@ func TestEnoughRequests(t *testing.T) {
}
gotInsufficientResources := fitsRequest(state, test.nodeInfo, p.(*Fit).ignoredResources, p.(*Fit).ignoredResourceGroups, testDRAManager, opts)
if diff := cmp.Diff(test.wantInsufficientResources, gotInsufficientResources); diff != "" {
t.Errorf("insufficient resources do not match (-want,+got):\n%s", diff)
tCtx.Errorf("insufficient resources do not match (-want,+got):\n%s", diff)
}
})
}
}
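The synctest.Wait() call added in this hunk is what removes the timing dependency: WaitForCacheSync only guarantees that the informer stores are filled, while registered handlers (here the ExtendedResourceCache on the DeviceClasses informer) may still be draining queued notifications. Inside a synctest bubble, synctest.Wait blocks until every goroutine in the bubble is durably blocked, so all pending events have been delivered before the assertions run. A self-contained sketch of just that sequencing (TestEventDelivery is a hypothetical name; the setup mirrors testEnoughRequests above):

import (
    "testing"
    "testing/synctest"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/kubernetes/test/utils/ktesting"
)

func TestEventDelivery(t *testing.T) {
    tCtx := ktesting.Init(t)
    tCtx.SyncTest("event delivery", func(tCtx ktesting.TContext) {
        client := fake.NewSimpleClientset()
        informerFactory := informers.NewSharedInformerFactory(client, 0)
        // ... AddEventHandler calls go here, as testEnoughRequests registers the
        // ExtendedResourceCache on the DeviceClasses informer ...
        informerFactory.Start(tCtx.Done())
        tCtx.Cleanup(func() {
            tCtx.Cancel("test has completed")
            informerFactory.Shutdown()
        })
        informerFactory.WaitForCacheSync(tCtx.Done())
        // Caches are synced, but handler goroutines may still be running; park
        // until everything in the bubble is durably blocked.
        synctest.Wait()
        // ... assertions that depend on the delivered events ...
    })
}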
func TestPreFilterDisabled(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
func TestPreFilterDisabled(t *testing.T) { testPreFilterDisabled(ktesting.Init(t)) }
func testPreFilterDisabled(tCtx ktesting.TContext) {
pod := &v1.Pod{}
nodeInfo := framework.NewNodeInfo()
node := v1.Node{}
nodeInfo.SetNode(&node)
p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{})
if err != nil {
t.Fatal(err)
}
p, err := NewFit(tCtx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{})
tCtx.ExpectNoError(err, "create fit plugin")
cycleState := framework.NewCycleState()
gotStatus := p.(fwk.FilterPlugin).Filter(ctx, cycleState, pod, nodeInfo)
gotStatus := p.(fwk.FilterPlugin).Filter(tCtx, cycleState, pod, nodeInfo)
wantStatus := fwk.AsStatus(fwk.ErrNotFound)
if diff := cmp.Diff(wantStatus, gotStatus); diff != "" {
t.Errorf("status does not match (-want,+got):\n%s", diff)
tCtx.Errorf("status does not match (-want,+got):\n%s", diff)
}
}
func TestNotEnoughRequests(t *testing.T) {
func TestNotEnoughRequests(t *testing.T) { testNotEnoughRequests(ktesting.Init(t)) }
func testNotEnoughRequests(tCtx ktesting.TContext) {
notEnoughPodsTests := []struct {
pod *v1.Pod
nodeInfo *framework.NodeInfo
@@ -823,33 +824,29 @@ func TestNotEnoughRequests(t *testing.T) {
},
}
for _, test := range notEnoughPodsTests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tCtx.Run(test.name, func(tCtx ktesting.TContext) {
node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1, 0, 0, 0)}}
test.nodeInfo.SetNode(&node)
p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{})
if err != nil {
t.Fatal(err)
}
p, err := NewFit(tCtx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{})
tCtx.ExpectNoError(err, "create fit plugin")
cycleState := framework.NewCycleState()
_, preFilterStatus := p.(fwk.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod, nil)
_, preFilterStatus := p.(fwk.PreFilterPlugin).PreFilter(tCtx, cycleState, test.pod, nil)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
tCtx.Errorf("prefilter failed with status: %v", preFilterStatus)
}
gotStatus := p.(fwk.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo)
gotStatus := p.(fwk.FilterPlugin).Filter(tCtx, cycleState, test.pod, test.nodeInfo)
if diff := cmp.Diff(test.wantStatus, gotStatus); diff != "" {
t.Errorf("status does not match (-want,+got):\n%s", diff)
tCtx.Errorf("status does not match (-want,+got):\n%s", diff)
}
})
}
}
func TestStorageRequests(t *testing.T) {
func TestStorageRequests(t *testing.T) { testStorageRequests(ktesting.Init(t)) }
func testStorageRequests(tCtx ktesting.TContext) {
storagePodsTests := []struct {
pod *v1.Pod
nodeInfo *framework.NodeInfo
@@ -884,33 +881,29 @@ func TestStorageRequests(t *testing.T) {
}
for _, test := range storagePodsTests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tCtx.Run(test.name, func(tCtx ktesting.TContext) {
node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 32, 5, 20, 5), Allocatable: makeAllocatableResources(10, 20, 32, 5, 20, 5)}}
test.nodeInfo.SetNode(&node)
p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{})
if err != nil {
t.Fatal(err)
}
p, err := NewFit(tCtx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{})
tCtx.ExpectNoError(err, "create fit plugin")
cycleState := framework.NewCycleState()
_, preFilterStatus := p.(fwk.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod, nil)
_, preFilterStatus := p.(fwk.PreFilterPlugin).PreFilter(tCtx, cycleState, test.pod, nil)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
tCtx.Errorf("prefilter failed with status: %v", preFilterStatus)
}
gotStatus := p.(fwk.FilterPlugin).Filter(ctx, cycleState, test.pod, test.nodeInfo)
gotStatus := p.(fwk.FilterPlugin).Filter(tCtx, cycleState, test.pod, test.nodeInfo)
if diff := cmp.Diff(test.wantStatus, gotStatus); diff != "" {
t.Errorf("status does not match (-want,+got):\n%s", diff)
tCtx.Errorf("status does not match (-want,+got):\n%s", diff)
}
})
}
}
func TestRestartableInitContainers(t *testing.T) {
func TestRestartableInitContainers(t *testing.T) { testRestartableInitContainers(ktesting.Init(t)) }
func testRestartableInitContainers(tCtx ktesting.TContext) {
newPod := func() *v1.Pod {
return &v1.Pod{
Spec: v1.PodSpec{
@@ -993,30 +986,25 @@ func TestRestartableInitContainers(t *testing.T) {
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tCtx.Run(test.name, func(tCtx ktesting.TContext) {
node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(2, 0, 1, 0, 0, 0)}}
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(&node)
p, err := NewFit(ctx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{EnableSidecarContainers: test.enableSidecarContainers})
if err != nil {
t.Fatal(err)
}
p, err := NewFit(tCtx, &config.NodeResourcesFitArgs{ScoringStrategy: defaultScoringStrategy}, nil, plfeature.Features{EnableSidecarContainers: test.enableSidecarContainers})
tCtx.ExpectNoError(err, "create fit plugin")
cycleState := framework.NewCycleState()
_, preFilterStatus := p.(fwk.PreFilterPlugin).PreFilter(ctx, cycleState, test.pod, nil)
_, preFilterStatus := p.(fwk.PreFilterPlugin).PreFilter(tCtx, cycleState, test.pod, nil)
if diff := cmp.Diff(test.wantPreFilterStatus, preFilterStatus); diff != "" {
t.Error("prefilter status does not match (-expected +actual):\n", diff)
tCtx.Error("prefilter status does not match (-expected +actual):\n", diff)
}
if !preFilterStatus.IsSuccess() {
return
}
filterStatus := p.(fwk.FilterPlugin).Filter(ctx, cycleState, test.pod, nodeInfo)
filterStatus := p.(fwk.FilterPlugin).Filter(tCtx, cycleState, test.pod, nodeInfo)
if diff := cmp.Diff(test.wantFilterStatus, filterStatus); diff != "" {
t.Error("filter status does not match (-expected +actual):\n", diff)
tCtx.Error("filter status does not match (-expected +actual):\n", diff)
}
})
}
@@ -1024,6 +1012,9 @@ func TestRestartableInitContainers(t *testing.T) {
}
func TestFitScore(t *testing.T) {
testFitScore(ktesting.Init(t))
}
func testFitScore(tCtx ktesting.TContext) {
tests := []struct {
name string
requestedPod *v1.Pod
@@ -1300,52 +1291,49 @@ func TestFitScore(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, test.draObjects != nil)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tCtx.SyncTest(test.name, func(tCtx ktesting.TContext) {
featuregatetesting.SetFeatureGateDuringTest(tCtx, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, test.draObjects != nil)
state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
fh, _ := runtime.NewFramework(tCtx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
defer func() {
tCtx.Cancel("test has completed")
runtime.WaitForShutdown(fh)
}()
args := test.nodeResourcesFitArgs
p, err := NewFit(ctx, &args, fh, plfeature.Features{
p, err := NewFit(tCtx, &args, fh, plfeature.Features{
EnableDRAExtendedResource: test.draObjects != nil,
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
tCtx.Fatalf("unexpected error: %v", err)
}
if test.draObjects != nil {
draManager, err := newTestDRAManager(t, ctx, logger, test.draObjects...)
if err != nil {
t.Fatalf("failed to create test DRA manager: %v", err)
}
draManager := newTestDRAManager(tCtx, test.draObjects...)
p.(*Fit).draManager = draManager
}
var gotPriorities fwk.NodeScoreList
for _, n := range test.nodes {
if test.runPreScore {
status := p.(fwk.PreScorePlugin).PreScore(ctx, state, test.requestedPod, tf.BuildNodeInfos(test.nodes))
status := p.(fwk.PreScorePlugin).PreScore(tCtx, state, test.requestedPod, tf.BuildNodeInfos(test.nodes))
if !status.IsSuccess() {
t.Errorf("PreScore is expected to return success, but didn't. Got status: %v", status)
tCtx.Errorf("PreScore is expected to return success, but didn't. Got status: %v", status)
}
}
nodeInfo, err := snapshot.Get(n.Name)
if err != nil {
t.Errorf("failed to get node %q from snapshot: %v", n.Name, err)
tCtx.Errorf("failed to get node %q from snapshot: %v", n.Name, err)
}
score, status := p.(fwk.ScorePlugin).Score(ctx, state, test.requestedPod, nodeInfo)
score, status := p.(fwk.ScorePlugin).Score(tCtx, state, test.requestedPod, nodeInfo)
if !status.IsSuccess() {
t.Errorf("Score is expected to return success, but didn't. Got status: %v", status)
tCtx.Errorf("Score is expected to return success, but didn't. Got status: %v", status)
}
gotPriorities = append(gotPriorities, fwk.NodeScore{Name: n.Name, Score: score})
}
if diff := cmp.Diff(test.expectedPriorities, gotPriorities); diff != "" {
t.Errorf("expected:\n\t%+v,\ngot:\n\t%+v", test.expectedPriorities, gotPriorities)
tCtx.Errorf("expected:\n\t%+v,\ngot:\n\t%+v", test.expectedPriorities, gotPriorities)
}
})
}
@@ -1440,8 +1428,6 @@ func BenchmarkTestFitScore(b *testing.B) {
for _, test := range tests {
b.Run(test.name, func(b *testing.B) {
_, ctx := ktesting.NewTestContext(b)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
existingPods := []*v1.Pod{
st.MakePod().Node("node1").Req(map[v1.ResourceName]string{"cpu": "2000", "memory": "4000"}).Obj(),
}

View file

@@ -17,8 +17,6 @@ limitations under the License.
package noderesources
import (
"context"
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
@@ -37,8 +35,6 @@ import (
"k8s.io/dynamic-resource-allocation/deviceclass/extendedresourcecache"
"k8s.io/dynamic-resource-allocation/resourceslice/tracker"
"k8s.io/dynamic-resource-allocation/structured"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -47,6 +43,7 @@ import (
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -265,6 +262,9 @@ func TestResourceAllocationScorerCalculateRequests(t *testing.T) {
}
func TestCalculateResourceAllocatableRequest(t *testing.T) {
testCalculateResourceAllocatableRequest(ktesting.Init(t))
}
func testCalculateResourceAllocatableRequest(tCtx ktesting.TContext) {
// Initialize test variables
nodeName := "resource-node"
driverName := "test-driver"
@@ -552,18 +552,11 @@ func TestCalculateResourceAllocatableRequest(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
tCtx.SyncTest(name, func(tCtx ktesting.TContext) {
// Setup environment, create required objects
logger, tCtx := ktesting.NewTestContext(t)
tCtx, cancel := context.WithCancel(tCtx)
defer cancel()
featuregatetesting.SetFeatureGateDuringTest(tCtx, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, tc.enableDRAExtendedResource)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, tc.enableDRAExtendedResource)
draManager, err := newTestDRAManager(t, tCtx, logger, tc.objects...)
if err != nil {
t.Fatalf("failed to create fake DRA manager: %v", err)
}
draManager := newTestDRAManager(tCtx, tc.objects...)
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(tc.node)
@@ -582,24 +575,26 @@ func TestCalculateResourceAllocatableRequest(t *testing.T) {
var status *fwk.Status
draPreScoreState, status = getDRAPreScoredParams(draManager, []config.ResourceSpec{{Name: string(tc.extendedResource)}})
if status != nil {
t.Fatalf("getting DRA pre-scored params failed: %v", status)
tCtx.Fatalf("getting DRA pre-scored params failed: %v", status)
}
}
// Test calculateResourceAllocatableRequest API
allocatable, requested := scorer.calculateResourceAllocatableRequest(tCtx, nodeInfo, tc.extendedResource, tc.podRequest, draPreScoreState)
if !cmp.Equal(allocatable, tc.expectedAllocatable) {
t.Errorf("Expected allocatable=%v, but got allocatable=%v", tc.expectedAllocatable, allocatable)
tCtx.Errorf("Expected allocatable=%v, but got allocatable=%v", tc.expectedAllocatable, allocatable)
}
if !cmp.Equal(requested, tc.expectedRequested) {
t.Errorf("Expected requested=%v, but got requested=%v", tc.expectedRequested, requested)
tCtx.Errorf("Expected requested=%v, but got requested=%v", tc.expectedRequested, requested)
}
})
}
}
// newTestDRAManager creates a DefaultDRAManager for testing purposes
func newTestDRAManager(t *testing.T, ctx context.Context, logger klog.Logger, objects ...apiruntime.Object) (*dynamicresources.DefaultDRAManager, error) {
// newTestDRAManager creates a DefaultDRAManager for testing purposes.
// Only usable in a synctest bubble.
func newTestDRAManager(tCtx ktesting.TContext, objects ...apiruntime.Object) *dynamicresources.DefaultDRAManager {
tCtx = ktesting.WithCancel(tCtx)
client := fake.NewClientset(objects...)
informerFactory := informers.NewSharedInformerFactory(client, 0)
resourceSliceTrackerOpts := tracker.Options{
@@ -608,14 +603,12 @@ func newTestDRAManager(t *testing.T, ctx context.Context, logger klog.Logger, ob
ClassInformer: informerFactory.Resource().V1().DeviceClasses(),
KubeClient: client,
}
resourceSliceTracker, err := tracker.StartTracker(ctx, resourceSliceTrackerOpts)
if err != nil {
return nil, fmt.Errorf("couldn't start resource slice tracker: %w", err)
}
resourceSliceTracker, err := tracker.StartTracker(tCtx, resourceSliceTrackerOpts)
tCtx.ExpectNoError(err, "couldn't start resource slice tracker")
draManager := dynamicresources.NewDRAManager(
ctx,
tCtx,
assumecache.NewAssumeCache(
logger,
tCtx.Logger(),
informerFactory.Resource().V1().ResourceClaims().Informer(),
"resource claim",
"",
@@ -624,18 +617,25 @@ func newTestDRAManager(t *testing.T, ctx context.Context, logger klog.Logger, ob
informerFactory)
cache := draManager.DeviceClassResolver().(*extendedresourcecache.ExtendedResourceCache)
if _, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(cache); err != nil {
return nil, fmt.Errorf("failed to add device class informer event handler: %w", err)
}
handle, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(cache)
tCtx.ExpectNoError(err, "add device class informer event handler")
tCtx.Cleanup(func() {
_ = informerFactory.Resource().V1().DeviceClasses().Informer().RemoveEventHandler(handle)
})
informerFactory.Start(ctx.Done())
t.Cleanup(func() {
informerFactory.Start(tCtx.Done())
tCtx.Cleanup(func() {
tCtx.Cancel("test has completed")
// Now we can wait for all goroutines to stop.
informerFactory.Shutdown()
})
informerFactory.WaitForCacheSync(ctx.Done())
informerFactory.WaitForCacheSync(tCtx.Done())
return draManager, nil
// Wait for full initialization of manager, including
// processing of all informer events.
tCtx.Wait()
return draManager
}
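With the new signature the helper owns its whole lifecycle: failures go through tCtx.ExpectNoError, cleanup callbacks unhook the event handler and shut the informers down after cancellation, and it returns only once tCtx.Wait() has let the DeviceClasses informer feed every event into the ExtendedResourceCache. Callers inside a SyncTest bubble therefore reduce to an excerpt like this (plugin stands for the Fit or BalancedAllocation instance under test, draObjects for the test case's API objects; both are placeholders):

tCtx.SyncTest(test.name, func(tCtx ktesting.TContext) {
    // No error handling needed here; the helper fails the subtest itself.
    draManager := newTestDRAManager(tCtx, draObjects...)
    // Inject the fully initialized fake manager; extended-resource lookups
    // by the plugin are deterministic from this point on.
    plugin.draManager = draManager
    // ... PreScore/Score or PreFilter/Filter assertions ...
})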
// getCachedDeviceMatch checks the cache for a DeviceMatches result

View file

@@ -303,6 +303,10 @@ func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
var _ framework.Framework = &frameworkImpl{}
// NewFramework initializes plugins given the configuration and the registry.
//
// It creates background goroutines (for example, via defaultFrameworkOptions -> metrics.NewMetricsAsyncRecorder)
// which continue running until the context gets canceled. WaitForShutdown can be used to block
// until they have terminated.
func NewFramework(ctx context.Context, r Registry, profile *config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
options := defaultFrameworkOptions(ctx.Done())
for _, opt := range opts {
@@ -451,6 +455,15 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
return f, nil
}
// WaitForShutdown waits for completion of all background goroutines of a framework
// instance created by NewFramework. The context given to NewFramework must be canceled
// to stop those background goroutines.
func WaitForShutdown(f framework.Framework) {
if f.(*frameworkImpl).metricsRecorder != nil {
<-f.(*frameworkImpl).metricsRecorder.IsStoppedCh
}
}
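WaitForShutdown pairs with cancellation of the context that NewFramework received: cancel first, then wait, which is the ordering used by the deferred blocks in the tests above. A short sketch of that sequence wrapped into a helper (newFrameworkForTest is hypothetical; the APIs are the ones added or used in this PR):

import (
    "k8s.io/kubernetes/pkg/scheduler/framework"
    "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    "k8s.io/kubernetes/test/utils/ktesting"
)

// newFrameworkForTest builds a framework against the test context and makes sure
// its background goroutines cannot outlive the test (or escape a synctest bubble).
func newFrameworkForTest(tCtx ktesting.TContext, opts ...runtime.Option) framework.Framework {
    fh, err := runtime.NewFramework(tCtx, nil, nil, opts...)
    tCtx.ExpectNoError(err, "create framework")
    tCtx.Cleanup(func() {
        // Canceling the context tells the goroutines started by NewFramework
        // (for example the metrics async recorder) to stop ...
        tCtx.Cancel("test has completed")
        // ... and WaitForShutdown blocks until they have actually terminated.
        runtime.WaitForShutdown(fh)
    })
    return fh
}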
// setInstrumentedPlugins initializes instrumented plugins from current plugins that frameworkImpl has.
func (f *frameworkImpl) setInstrumentedPlugins() {
// Cache metric streams for prefilter and filter plugins.