DRA e2e+integration: test ResourceSlice controller

The "create 100 slices" E2E sometimes flaked with timeouts (e.g. 95 out of 100
slices created). It created too much load for an E2E test.

The same test now uses ktesting as its API, which makes it possible to run it as
an integration test with the original 100 slices and as an E2E test with a more
moderate 10 slices.
Patrick Ohly 2025-12-11 12:43:47 +01:00
parent 047682908d
commit c47ad64820
3 changed files with 142 additions and 97 deletions
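
For orientation, here is a sketch of the two call sites, excerpted from the diff below; f, tCtx, and createTestNamespace come from the surrounding e2e and integration suites and are not defined here, so this is not a self-contained program.

    // E2E flavor: moderate load of 10 slices; f is the Ginkgo framework
    // object of the surrounding DRA e2e suite.
    f.It("creates slices", func(ctx context.Context) {
        dratest.TestCreateResourceSlices(f.TContext(ctx), 10)
    })

    // Integration flavor: the original 100 slices; tCtx and
    // createTestNamespace come from the integration suite's setup.
    tCtx.Run("ResourceSliceController", func(tCtx ktesting.TContext) {
        namespace := createTestNamespace(tCtx, nil)
        tCtx = tCtx.WithNamespace(namespace)
        TestCreateResourceSlices(tCtx, 100)
    })

Because the helper only needs a ktesting.TContext, each caller picks the slice count that fits its environment.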

@@ -41,10 +41,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation"
applyv1 "k8s.io/client-go/applyconfigurations/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/resourceslice"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/events"
@@ -56,6 +54,7 @@ import (
e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
e2eevents "k8s.io/kubernetes/test/e2e/framework/events"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
dratest "k8s.io/kubernetes/test/integration/dra"
"k8s.io/kubernetes/test/utils/ktesting"
admissionapi "k8s.io/pod-security-admission/api"
"k8s.io/utils/ptr"
@@ -2682,106 +2681,13 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() {
})
ginkgo.Context("ResourceSlice Controller", func() {
// This is a stress test for creating many large slices.
// This is a test for creating some large slices.
// Each slice is as large as API limits allow.
//
// Could become a conformance test because it only depends
// on the apiserver.
f.It("creates slices", func(ctx context.Context) {
// Define desired resource slices.
driverName := f.Namespace.Name
numSlices := 100
devicePrefix := "dev-"
domainSuffix := ".example.com"
poolName := "network-attached"
domain := strings.Repeat("x", 63 /* TODO(pohly): add to API */ -len(domainSuffix)) + domainSuffix
stringValue := strings.Repeat("v", resourceapi.DeviceAttributeMaxValueLength)
pool := resourceslice.Pool{
Slices: make([]resourceslice.Slice, numSlices),
}
numDevices := 0
for i := 0; i < numSlices; i++ {
devices := make([]resourceapi.Device, resourceapi.ResourceSliceMaxDevices)
for e := 0; e < resourceapi.ResourceSliceMaxDevices; e++ {
device := resourceapi.Device{
Name: devicePrefix + strings.Repeat("x", validation.DNS1035LabelMaxLength-len(devicePrefix)-6) + fmt.Sprintf("%06d", numDevices),
Attributes: make(map[resourceapi.QualifiedName]resourceapi.DeviceAttribute, resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice),
}
numDevices++
for j := 0; j < resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice; j++ {
name := resourceapi.QualifiedName(domain + "/" + strings.Repeat("x", resourceapi.DeviceMaxIDLength-4) + fmt.Sprintf("%04d", j))
device.Attributes[name] = resourceapi.DeviceAttribute{
StringValue: &stringValue,
}
}
devices[e] = device
}
pool.Slices[i].Devices = devices
}
resources := &resourceslice.DriverResources{
Pools: map[string]resourceslice.Pool{poolName: pool},
}
listSlices := framework.ListObjects(f.ClientSet.ResourceV1().ResourceSlices().List, metav1.ListOptions{
FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + driverName,
})
ginkgo.By("Creating slices")
mutationCacheTTL := 10 * time.Second
controller, err := resourceslice.StartController(ctx, resourceslice.Options{
DriverName: driverName,
KubeClient: f.ClientSet,
Resources: resources,
MutationCacheTTL: &mutationCacheTTL,
})
framework.ExpectNoError(err, "start controller")
ginkgo.DeferCleanup(func(ctx context.Context) {
controller.Stop()
gomega.Eventually(ctx, func(ctx context.Context) (*resourceapi.ResourceSliceList, error) {
err := f.ClientSet.ResourceV1().ResourceSlices().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + driverName,
})
if err != nil {
return nil, fmt.Errorf("delete slices: %w", err)
}
return listSlices(ctx)
}).Should(gomega.HaveField("Items", gomega.BeEmpty()))
})
// Eventually we should have all desired slices.
gomega.Eventually(ctx, listSlices).WithTimeout(3 * time.Minute).Should(gomega.HaveField("Items", gomega.HaveLen(numSlices)))
// Verify state.
expectSlices, err := listSlices(ctx)
framework.ExpectNoError(err)
gomega.Expect(expectSlices.Items).ShouldNot(gomega.BeEmpty())
framework.Logf("Protobuf size of one slice is %d bytes = %d KB.", expectSlices.Items[0].Size(), expectSlices.Items[0].Size()/1024)
gomega.Expect(expectSlices.Items[0].Size()).Should(gomega.BeNumerically(">=", 600*1024), "ResourceSlice size")
gomega.Expect(expectSlices.Items[0].Size()).Should(gomega.BeNumerically("<", 1024*1024), "ResourceSlice size")
expectStats := resourceslice.Stats{NumCreates: int64(numSlices)}
gomega.Expect(controller.GetStats()).Should(gomega.Equal(expectStats))
// No further changes expected now, after checking again.
gomega.Consistently(ctx, controller.GetStats).WithTimeout(2 * mutationCacheTTL).Should(gomega.Equal(expectStats))
// Ask the controller to delete all slices except for one empty slice.
ginkgo.By("Deleting slices")
resources = resources.DeepCopy()
resources.Pools[poolName] = resourceslice.Pool{Slices: []resourceslice.Slice{{}}}
controller.Update(resources)
// One empty slice should remain, after removing the full ones and adding the empty one.
emptySlice := gomega.HaveField("Spec.Devices", gomega.BeEmpty())
gomega.Eventually(ctx, listSlices).WithTimeout(2 * time.Minute).Should(gomega.HaveField("Items", gomega.HaveExactElements(emptySlice)))
expectStats = resourceslice.Stats{NumCreates: int64(numSlices) + 1, NumDeletes: int64(numSlices)}
// There is a window of time where the ResourceSlice exists and is
// returned in a list but before that ResourceSlice is accounted for
// in the controller's stats, consisting mostly of network latency
// between this test process and the API server. Wait for the stats
// to converge before asserting there are no further changes.
gomega.Eventually(ctx, controller.GetStats).WithTimeout(30 * time.Second).Should(gomega.Equal(expectStats))
gomega.Consistently(ctx, controller.GetStats).WithTimeout(2 * mutationCacheTTL).Should(gomega.Equal(expectStats))
dratest.TestCreateResourceSlices(f.TContext(ctx), 10)
})
})

@@ -169,6 +169,11 @@ func TestDRA(t *testing.T) {
tCtx.Run("ExtendedResource", func(tCtx ktesting.TContext) { testExtendedResource(tCtx, false) })
tCtx.Run("ResourceClaimDeviceStatus", func(tCtx ktesting.TContext) { testResourceClaimDeviceStatus(tCtx, false) })
tCtx.Run("DeviceBindingConditions", func(tCtx ktesting.TContext) { testDeviceBindingConditions(tCtx, false) })
tCtx.Run("ResourceSliceController", func(tCtx ktesting.TContext) {
namespace := createTestNamespace(tCtx, nil)
tCtx = tCtx.WithNamespace(namespace)
TestCreateResourceSlices(tCtx, 100)
})
},
},
"v1beta1": {

@@ -0,0 +1,134 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dra
import (
"fmt"
"strings"
"time"
"github.com/onsi/gomega"
resourceapi "k8s.io/api/resource/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/dynamic-resource-allocation/resourceslice"
"k8s.io/kubernetes/test/utils/ktesting"
)
// TestCreateResourceSlices uses the ResourceSlice controller to create slices.
// It runs both as an integration test and as an E2E test, with a different number of slices.
func TestCreateResourceSlices(tCtx ktesting.TContext, numSlices int) {
// Define desired resource slices.
namespace := tCtx.Namespace()
driverName := namespace
devicePrefix := "dev-"
domainSuffix := ".example.com"
poolName := "network-attached"
domain := strings.Repeat("x", resourceapi.DeviceMaxDomainLength-len(domainSuffix)) + domainSuffix
stringValue := strings.Repeat("v", resourceapi.DeviceAttributeMaxValueLength)
pool := resourceslice.Pool{
Slices: make([]resourceslice.Slice, numSlices),
}
numDevices := 0
for i := 0; i < numSlices; i++ {
devices := make([]resourceapi.Device, resourceapi.ResourceSliceMaxDevices)
for e := 0; e < resourceapi.ResourceSliceMaxDevices; e++ {
device := resourceapi.Device{
Name: devicePrefix + strings.Repeat("x", validation.DNS1035LabelMaxLength-len(devicePrefix)-6) + fmt.Sprintf("%06d", numDevices),
Attributes: make(map[resourceapi.QualifiedName]resourceapi.DeviceAttribute, resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice),
}
numDevices++
for j := 0; j < resourceapi.ResourceSliceMaxAttributesAndCapacitiesPerDevice; j++ {
name := resourceapi.QualifiedName(domain + "/" + strings.Repeat("x", resourceapi.DeviceMaxIDLength-4) + fmt.Sprintf("%04d", j))
device.Attributes[name] = resourceapi.DeviceAttribute{
StringValue: &stringValue,
}
}
devices[e] = device
}
pool.Slices[i].Devices = devices
}
resources := &resourceslice.DriverResources{
Pools: map[string]resourceslice.Pool{poolName: pool},
}
listSlices := func(tCtx ktesting.TContext) *resourceapi.ResourceSliceList {
tCtx.Helper()
// TODO: replicate framework.ListObjects/Get/etc. with ktesting
slices, err := tCtx.Client().ResourceV1().ResourceSlices().List(tCtx, metav1.ListOptions{
FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + driverName,
})
tCtx.ExpectNoError(err, "list slices")
return slices
}
tCtx.Log("Creating slices")
mutationCacheTTL := 10 * time.Second
controller, err := resourceslice.StartController(tCtx, resourceslice.Options{
DriverName: driverName,
KubeClient: tCtx.Client(),
Resources: resources,
MutationCacheTTL: &mutationCacheTTL,
})
tCtx.ExpectNoError(err, "start controller")
tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
controller.Stop()
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) *resourceapi.ResourceSliceList {
err := tCtx.Client().ResourceV1().ResourceSlices().DeleteCollection(tCtx, metav1.DeleteOptions{}, metav1.ListOptions{
FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + driverName,
})
tCtx.ExpectNoError(err, "delete slices")
return listSlices(tCtx)
}).Should(gomega.HaveField("Items", gomega.BeEmpty()))
})
// Eventually we should have all desired slices.
ktesting.Eventually(tCtx, listSlices).WithTimeout(3 * time.Minute).Should(gomega.HaveField("Items", gomega.HaveLen(numSlices)))
// Verify state.
expectSlices := listSlices(tCtx)
tCtx.Assert(expectSlices.Items).ShouldNot(gomega.BeEmpty())
tCtx.Logf("Protobuf size of one slice is %d bytes = %d KB.", expectSlices.Items[0].Size(), expectSlices.Items[0].Size()/1024)
tCtx.Assert(expectSlices.Items[0].Size()).Should(gomega.BeNumerically(">=", 600*1024), "ResourceSlice size")
tCtx.Assert(expectSlices.Items[0].Size()).Should(gomega.BeNumerically("<", 1024*1024), "ResourceSlice size")
expectStats := resourceslice.Stats{NumCreates: int64(numSlices)}
tCtx.Assert(controller.GetStats()).Should(gomega.Equal(expectStats))
// No further changes expected now, after checking again.
getStats := func(tCtx ktesting.TContext) resourceslice.Stats { return controller.GetStats() }
ktesting.Consistently(tCtx, getStats).WithTimeout(2 * mutationCacheTTL).Should(gomega.Equal(expectStats))
// Ask the controller to delete all slices except for one empty slice.
tCtx.Log("Deleting slices")
resources = resources.DeepCopy()
resources.Pools[poolName] = resourceslice.Pool{Slices: []resourceslice.Slice{{}}}
controller.Update(resources)
// One empty slice should remain, after removing the full ones and adding the empty one.
emptySlice := gomega.HaveField("Spec.Devices", gomega.BeEmpty())
ktesting.Eventually(tCtx, listSlices).WithTimeout(2 * time.Minute).Should(gomega.HaveField("Items", gomega.HaveExactElements(emptySlice)))
expectStats = resourceslice.Stats{NumCreates: int64(numSlices) + 1, NumDeletes: int64(numSlices)}
// There is a window of time where the ResourceSlice exists and is
// returned in a list but before that ResourceSlice is accounted for
// in the controller's stats, consisting mostly of network latency
// between this test process and the API server. Wait for the stats
// to converge before asserting there are no further changes.
ktesting.Eventually(tCtx, getStats).WithTimeout(30 * time.Second).Should(gomega.Equal(expectStats))
ktesting.Consistently(tCtx, getStats).WithTimeout(2 * mutationCacheTTL).Should(gomega.Equal(expectStats))
}