DRA e2e: make driver deployment possible in Go unit tests

This leverages ktesting as wrapper around Ginkgo and testing.T to make all
helper code that is needed to deploy a DRA driver available to Go unit
tests and thus integration tests.

How to proceed with unifying helper code for integration and E2E testing is
open. This is just a minimal first step in that direction. Ideally, such
code should be in separate packages where usage of Ginkgo, e2e/framework
and gomega.Expect/Eventually/Consistently are forbidden.

While at it, the builder gets extended to make cleanup optional.
This will be needed for upgrade/downgrade testing with sub-tests.
This commit is contained in:
Patrick Ohly 2025-12-10 08:18:33 +01:00
parent 65ef31973c
commit 7c7b1e1018
10 changed files with 612 additions and 613 deletions

File diff suppressed because it is too large Load diff

View file

@ -42,8 +42,10 @@ import (
draclient "k8s.io/dynamic-resource-allocation/client"
"k8s.io/dynamic-resource-allocation/resourceslice"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/utils/ktesting"
admissionapi "k8s.io/pod-security-admission/api"
"k8s.io/utils/ptr"
)
@ -64,21 +66,22 @@ func (b *Builder) ExtendedResourceName(i int) string {
}
}
// Builder contains a running counter to make objects unique within thir
// Builder contains a running counter to make objects unique within their
// namespace.
type Builder struct {
f *framework.Framework
namespace string
driver *Driver
UseExtendedResourceName bool
podCounter int
claimCounter int
ClassParameters string // JSON
SkipCleanup bool
}
// ClassName returns the default device class name.
func (b *Builder) ClassName() string {
return b.f.UniqueName + b.driver.NameSuffix + "-class"
return b.namespace + b.driver.NameSuffix + "-class"
}
// SingletonIndex causes Builder.Class and ExtendedResourceName to create a
@ -243,7 +246,7 @@ func (b *Builder) Pod() *v1.Pod {
//
// It is tempting to use `terminationGraceperiodSeconds: 0`, but that is a very bad
// idea because it removes the pod before the kubelet had a chance to react (https://github.com/kubernetes/kubernetes/issues/120671).
pod := e2epod.MakePod(b.f.Namespace.Name, nil, nil, admissionapi.LevelRestricted, "" /* no command = pause */)
pod := e2epod.MakePod(b.namespace, nil, nil, admissionapi.LevelRestricted, "" /* no command = pause */)
pod.Labels = make(map[string]string)
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.GenerateName = ""
@ -338,100 +341,108 @@ func (b *Builder) PodExternalMultiple() *v1.Pod {
}
// Create takes a bunch of objects and calls their Create function.
func (b *Builder) Create(ctx context.Context, objs ...klog.KMetadata) []klog.KMetadata {
func (b *Builder) Create(tCtx ktesting.TContext, objs ...klog.KMetadata) []klog.KMetadata {
tCtx.Helper()
cleanupCtx := tCtx.CleanupCtx
if b.SkipCleanup {
cleanupCtx = func(func(tCtx ktesting.TContext)) {}
}
var createdObjs []klog.KMetadata
for _, obj := range objs {
ginkgo.By(fmt.Sprintf("creating %T %s", obj, obj.GetName()))
tCtx.Logf("Creating %T %s", obj, obj.GetName())
var err error
var createdObj klog.KMetadata
switch obj := obj.(type) {
case *resourceapi.DeviceClass:
createdObj, err = b.ClientV1().DeviceClasses().Create(ctx, obj, metav1.CreateOptions{})
ginkgo.DeferCleanup(func(ctx context.Context) {
err := b.ClientV1().DeviceClasses().Delete(ctx, createdObj.GetName(), metav1.DeleteOptions{})
framework.ExpectNoError(err, "delete device class")
createdObj, err = b.ClientV1(tCtx).DeviceClasses().Create(tCtx, obj, metav1.CreateOptions{})
cleanupCtx(func(tCtx ktesting.TContext) {
err := b.ClientV1(tCtx).DeviceClasses().Delete(tCtx, createdObj.GetName(), metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete device class")
})
case *v1.Pod:
createdObj, err = b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().CoreV1().Pods(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *v1.ResourceQuota:
createdObj, err = b.f.ClientSet.CoreV1().ResourceQuotas(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().CoreV1().ResourceQuotas(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *v1.ConfigMap:
createdObj, err = b.f.ClientSet.CoreV1().ConfigMaps(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().CoreV1().ConfigMaps(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourceapi.ResourceClaim:
createdObj, err = b.ClientV1().ResourceClaims(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = b.ClientV1(tCtx).ResourceClaims(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourcev1beta1.ResourceClaim:
createdObj, err = b.f.ClientSet.ResourceV1beta1().ResourceClaims(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().ResourceV1beta1().ResourceClaims(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourcev1beta2.ResourceClaim:
createdObj, err = b.f.ClientSet.ResourceV1beta2().ResourceClaims(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().ResourceV1beta2().ResourceClaims(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourceapi.ResourceClaimTemplate:
createdObj, err = b.ClientV1().ResourceClaimTemplates(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = b.ClientV1(tCtx).ResourceClaimTemplates(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourcev1beta1.ResourceClaimTemplate:
createdObj, err = b.f.ClientSet.ResourceV1beta1().ResourceClaimTemplates(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().ResourceV1beta1().ResourceClaimTemplates(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourcev1beta2.ResourceClaimTemplate:
createdObj, err = b.f.ClientSet.ResourceV1beta2().ResourceClaimTemplates(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().ResourceV1beta2().ResourceClaimTemplates(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourceapi.ResourceSlice:
createdObj, err = b.ClientV1().ResourceSlices().Create(ctx, obj, metav1.CreateOptions{})
ginkgo.DeferCleanup(func(ctx context.Context) {
err := b.ClientV1().ResourceSlices().Delete(ctx, createdObj.GetName(), metav1.DeleteOptions{})
framework.ExpectNoError(err, "delete node resource slice")
createdObj, err = b.ClientV1(tCtx).ResourceSlices().Create(tCtx, obj, metav1.CreateOptions{})
cleanupCtx(func(tCtx ktesting.TContext) {
err := b.ClientV1(tCtx).ResourceSlices().Delete(tCtx, createdObj.GetName(), metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete node resource slice")
})
case *resourcealphaapi.DeviceTaintRule:
createdObj, err = b.f.ClientSet.ResourceV1alpha3().DeviceTaintRules().Create(ctx, obj, metav1.CreateOptions{})
ginkgo.DeferCleanup(func(ctx context.Context) {
err := b.f.ClientSet.ResourceV1alpha3().DeviceTaintRules().Delete(ctx, createdObj.GetName(), metav1.DeleteOptions{})
framework.ExpectNoError(err, "delete DeviceTaintRule")
createdObj, err = tCtx.Client().ResourceV1alpha3().DeviceTaintRules().Create(tCtx, obj, metav1.CreateOptions{})
cleanupCtx(func(tCtx ktesting.TContext) {
err := tCtx.Client().ResourceV1alpha3().DeviceTaintRules().Delete(tCtx, createdObj.GetName(), metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete DeviceTaintRule")
})
case *appsv1.DaemonSet:
createdObj, err = b.f.ClientSet.AppsV1().DaemonSets(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().AppsV1().DaemonSets(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
// Cleanup not really needed, but speeds up namespace shutdown.
ginkgo.DeferCleanup(func(ctx context.Context) {
err := b.f.ClientSet.AppsV1().DaemonSets(b.f.Namespace.Name).Delete(ctx, obj.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err, "delete daemonset")
cleanupCtx(func(tCtx ktesting.TContext) {
err := tCtx.Client().AppsV1().DaemonSets(b.namespace).Delete(tCtx, obj.Name, metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete daemonset")
})
default:
framework.Fail(fmt.Sprintf("internal error, unsupported type %T", obj), 1)
tCtx.Fatalf("internal error, unsupported type %T", obj)
}
framework.ExpectNoErrorWithOffset(1, err, "create %T", obj)
tCtx.ExpectNoError(err, "create %T", obj)
createdObjs = append(createdObjs, createdObj)
}
return createdObjs
}
func (b *Builder) DeletePodAndWaitForNotFound(ctx context.Context, pod *v1.Pod) {
err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})
framework.ExpectNoErrorWithOffset(1, err, "delete %T", pod)
err = e2epod.WaitForPodNotFoundInNamespace(ctx, b.f.ClientSet, pod.Name, pod.Namespace, b.f.Timeouts.PodDelete)
framework.ExpectNoErrorWithOffset(1, err, "terminate %T", pod)
func (b *Builder) DeletePodAndWaitForNotFound(tCtx ktesting.TContext, pod *v1.Pod) {
tCtx.Helper()
err := tCtx.Client().CoreV1().Pods(b.namespace).Delete(tCtx, pod.Name, metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete %T", pod)
/* TODO: add timeouts to TContext? */
err = e2epod.WaitForPodNotFoundInNamespace(tCtx, tCtx.Client(), pod.Name, pod.Namespace, 5*time.Minute /* former b.f.Timeouts.PodDelete */)
tCtx.ExpectNoError(err, "terminate %T", pod)
}
// TestPod runs pod and checks if container logs contain expected environment variables
func (b *Builder) TestPod(ctx context.Context, f *framework.Framework, pod *v1.Pod, env ...string) {
ginkgo.GinkgoHelper()
func (b *Builder) TestPod(tCtx ktesting.TContext, pod *v1.Pod, env ...string) {
tCtx.Helper()
if !b.driver.WithKubelet {
// Less testing when we cannot rely on the kubelet to actually run the pod.
err := e2epod.WaitForPodScheduled(ctx, f.ClientSet, pod.Namespace, pod.Name)
framework.ExpectNoError(err, "schedule pod")
err := e2epod.WaitForPodScheduled(tCtx, tCtx.Client(), pod.Namespace, pod.Name)
tCtx.ExpectNoError(err, "schedule pod")
return
}
err := e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)
framework.ExpectNoError(err, "start pod")
err := e2epod.WaitForPodRunningInNamespace(tCtx, tCtx.Client(), pod)
tCtx.ExpectNoError(err, "start pod")
if len(env) == 0 {
_, env = b.ParametersEnv()
}
for _, container := range pod.Spec.Containers {
TestContainerEnv(ctx, f, pod, container.Name, false, env...)
TestContainerEnv(tCtx, pod, container.Name, false, env...)
}
}
// envLineRE matches env output with variables set by test/e2e/dra/test-driver.
var envLineRE = regexp.MustCompile(`^(?:admin|user|claim)_[a-zA-Z0-9_]*=.*$`)
func TestContainerEnv(ctx context.Context, f *framework.Framework, pod *v1.Pod, containerName string, fullMatch bool, env ...string) {
ginkgo.GinkgoHelper()
stdout, stderr, err := e2epod.ExecWithOptionsContext(ctx, f, e2epod.ExecOptions{
func TestContainerEnv(tCtx ktesting.TContext, pod *v1.Pod, containerName string, fullMatch bool, env ...string) {
tCtx.Helper()
stdout, stderr, err := e2epod.ExecWithOptionsTCtx(tCtx, e2epod.ExecOptions{
Command: []string{"env"},
Namespace: pod.Namespace,
PodName: pod.Name,
@ -440,8 +451,8 @@ func TestContainerEnv(ctx context.Context, f *framework.Framework, pod *v1.Pod,
CaptureStderr: true,
Quiet: true,
})
framework.ExpectNoError(err, fmt.Sprintf("get env output for container %s", containerName))
gomega.Expect(stderr).To(gomega.BeEmpty(), fmt.Sprintf("env stderr for container %s", containerName))
tCtx.ExpectNoError(err, fmt.Sprintf("get env output for container %s", containerName))
tCtx.Expect(stderr).To(gomega.BeEmpty(), fmt.Sprintf("env stderr for container %s", containerName))
if fullMatch {
// Find all env variables set by the test driver.
var actualEnv, expectEnv []string
@ -455,91 +466,96 @@ func TestContainerEnv(ctx context.Context, f *framework.Framework, pod *v1.Pod,
}
sort.Strings(actualEnv)
sort.Strings(expectEnv)
gomega.Expect(actualEnv).To(gomega.Equal(expectEnv), fmt.Sprintf("container %s env output:\n%s", containerName, stdout))
tCtx.Expect(actualEnv).To(gomega.Equal(expectEnv), fmt.Sprintf("container %s env output:\n%s", containerName, stdout))
} else {
for i := 0; i < len(env); i += 2 {
envStr := fmt.Sprintf("%s=%s\n", env[i], env[i+1])
gomega.Expect(stdout).To(gomega.ContainSubstring(envStr), fmt.Sprintf("container %s env variables", containerName))
tCtx.Expect(stdout).To(gomega.ContainSubstring(envStr), fmt.Sprintf("container %s env variables", containerName))
}
}
}
func NewBuilder(f *framework.Framework, driver *Driver) *Builder {
b := &Builder{f: f, driver: driver}
ginkgo.BeforeEach(b.setUp)
b := &Builder{driver: driver}
ginkgo.BeforeEach(func() {
b.setUp(f.TContext(context.Background()))
})
return b
}
func NewBuilderNow(ctx context.Context, f *framework.Framework, driver *Driver) *Builder {
b := &Builder{f: f, driver: driver}
b.setUp(ctx)
func NewBuilderNow(tCtx ktesting.TContext, driver *Driver) *Builder {
b := &Builder{driver: driver}
b.setUp(tCtx)
return b
}
func (b *Builder) setUp(ctx context.Context) {
func (b *Builder) setUp(tCtx ktesting.TContext) {
b.namespace = tCtx.Namespace()
b.podCounter = 0
b.claimCounter = 0
b.Create(ctx, b.Class(0))
ginkgo.DeferCleanup(b.tearDown)
b.Create(tCtx, b.Class(0))
tCtx.CleanupCtx(b.tearDown)
}
// ClientV1 returns a wrapper for client-go which provides the V1 API on top of whatever is enabled in the cluster.
func (b *Builder) ClientV1() cgoresource.ResourceV1Interface {
return draclient.New(b.f.ClientSet)
func (b *Builder) ClientV1(tCtx ktesting.TContext) cgoresource.ResourceV1Interface {
return draclient.New(tCtx.Client())
}
func (b *Builder) tearDown(ctx context.Context) {
func (b *Builder) tearDown(tCtx ktesting.TContext) {
client := b.ClientV1(tCtx)
// Before we allow the namespace and all objects in it do be deleted by
// the framework, we must ensure that test pods and the claims that
// they use are deleted. Otherwise the driver might get deleted first,
// in which case deleting the claims won't work anymore.
ginkgo.By("delete pods and claims")
pods, err := b.listTestPods(ctx)
framework.ExpectNoError(err, "list pods")
tCtx.Log("delete pods and claims")
pods, err := b.listTestPods(tCtx)
tCtx.ExpectNoError(err, "list pods")
for _, pod := range pods {
if pod.DeletionTimestamp != nil {
continue
}
ginkgo.By(fmt.Sprintf("deleting %T %s", &pod, klog.KObj(&pod)))
err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})
tCtx.Logf("Deleting %T %s", &pod, klog.KObj(&pod))
err := tCtx.Client().CoreV1().Pods(b.namespace).Delete(tCtx, pod.Name, metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) {
framework.ExpectNoError(err, "delete pod")
tCtx.ExpectNoError(err, "delete pod")
}
}
gomega.Eventually(func() ([]v1.Pod, error) {
return b.listTestPods(ctx)
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) []v1.Pod {
pods, err := b.listTestPods(tCtx)
tCtx.ExpectNoError(err)
return pods
}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "remaining pods despite deletion")
claims, err := b.ClientV1().ResourceClaims(b.f.Namespace.Name).List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err, "get resource claims")
claims, err := b.ClientV1(tCtx).ResourceClaims(b.namespace).List(tCtx, metav1.ListOptions{})
tCtx.ExpectNoError(err, "get resource claims")
for _, claim := range claims.Items {
if claim.DeletionTimestamp != nil {
continue
}
ginkgo.By(fmt.Sprintf("deleting %T %s", &claim, klog.KObj(&claim)))
err := b.ClientV1().ResourceClaims(b.f.Namespace.Name).Delete(ctx, claim.Name, metav1.DeleteOptions{})
tCtx.Logf("Deleting %T %s", &claim, klog.KObj(&claim))
err := client.ResourceClaims(b.namespace).Delete(tCtx, claim.Name, metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) {
framework.ExpectNoError(err, "delete claim")
tCtx.ExpectNoError(err, "delete claim")
}
}
for host, plugin := range b.driver.Nodes {
ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host))
gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host)
tCtx.Logf("Waiting for resources on %s to be unprepared", host)
ktesting.Eventually(tCtx, func(ktesting.TContext) []app.ClaimID { return plugin.GetPreparedResources() }).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host)
}
ginkgo.By("waiting for claims to be deallocated and deleted")
gomega.Eventually(func() ([]resourceapi.ResourceClaim, error) {
claims, err := b.ClientV1().ResourceClaims(b.f.Namespace.Name).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
return claims.Items, nil
tCtx.Log("waiting for claims to be deallocated and deleted")
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) []resourceapi.ResourceClaim {
claims, err := client.ResourceClaims(tCtx.Namespace()).List(tCtx, metav1.ListOptions{})
tCtx.ExpectNoError(err)
return claims.Items
}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "claims in the namespaces")
}
func (b *Builder) listTestPods(ctx context.Context) ([]v1.Pod, error) {
pods, err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).List(ctx, metav1.ListOptions{})
func (b *Builder) listTestPods(tCtx ktesting.TContext) ([]v1.Pod, error) {
pods, err := tCtx.Client().CoreV1().Pods(b.namespace).List(tCtx, metav1.ListOptions{})
if err != nil {
return nil, err
}

View file

@ -75,6 +75,7 @@ import (
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
"k8s.io/kubernetes/test/e2e/storage/utils"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"
@ -102,30 +103,31 @@ type Nodes struct {
func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
nodes := &Nodes{}
ginkgo.BeforeEach(func(ctx context.Context) {
nodes.init(ctx, f, minNodes, maxNodes)
nodes.init(f.TContext(ctx), minNodes, maxNodes)
})
return nodes
}
// NewNodesNow is a variant of NewNodes which can be used inside a ginkgo.It.
func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) *Nodes {
// NewNodesNow is a variant of NewNodes which can be used inside a ginkgo.It
// or a Go unit test.
func NewNodesNow(tCtx ktesting.TContext, minNodes, maxNodes int) *Nodes {
nodes := &Nodes{}
nodes.init(ctx, f, minNodes, maxNodes)
nodes.init(tCtx, minNodes, maxNodes)
return nodes
}
func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) {
nodes.tempDir = ginkgo.GinkgoT().TempDir()
func (nodes *Nodes) init(tCtx ktesting.TContext, minNodes, maxNodes int) {
nodes.tempDir = tCtx.TempDir()
ginkgo.By("selecting nodes")
tCtx.Log("selecting nodes")
// The kubelet plugin is harder. We deploy the builtin manifest
// after patching in the driver name and all nodes on which we
// want the plugin to run.
//
// Only a subset of the nodes are picked to avoid causing
// unnecessary load on a big cluster.
nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
framework.ExpectNoError(err, "get nodes")
nodeList, err := e2enode.GetBoundedReadySchedulableNodes(tCtx, tCtx.Client(), maxNodes)
tCtx.ExpectNoError(err, "get nodes")
numNodes := int32(len(nodeList.Items))
if int(numNodes) < minNodes {
e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes+nodes.NumReservedNodes, numNodes)
@ -139,17 +141,18 @@ func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes,
nodes.NodeNames = append(nodes.NodeNames, node.Name)
}
sort.Strings(nodes.NodeNames)
framework.Logf("testing on nodes %v", nodes.NodeNames)
tCtx.Logf("testing on nodes %v", nodes.NodeNames)
// Watch claims in the namespace. This is useful for monitoring a test
// and enables additional sanity checks.
resourceClaimLogger := klog.LoggerWithName(klog.FromContext(ctx), "ResourceClaimListWatch")
resourceClaimLogger := klog.LoggerWithName(klog.FromContext(tCtx), "ResourceClaimListWatch")
var resourceClaimWatchCounter atomic.Int32
resourceClient := draclient.New(f.ClientSet)
resourceClient := draclient.New(tCtx.Client())
claimInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
slices, err := resourceClient.ResourceClaims(f.Namespace.Name).List(ctx, options)
tCtx := tCtx.WithContext(ctx)
slices, err := resourceClient.ResourceClaims(tCtx.Namespace()).List(tCtx, options)
if err == nil {
resourceClaimLogger.Info("Listed ResourceClaims", "resourceAPI", resourceClient.CurrentAPI(), "numClaims", len(slices.Items), "listMeta", slices.ListMeta)
} else {
@ -158,7 +161,8 @@ func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes,
return slices, err
},
WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
w, err := resourceClient.ResourceClaims(f.Namespace.Name).Watch(ctx, options)
tCtx := tCtx.WithContext(ctx)
w, err := resourceClient.ResourceClaims(tCtx.Namespace()).Watch(tCtx, options)
if err == nil {
resourceClaimLogger.Info("Started watching ResourceClaims", "resourceAPI", resourceClient.CurrentAPI())
wrapper := newWatchWrapper(klog.LoggerWithName(resourceClaimLogger, fmt.Sprintf("%d", resourceClaimWatchCounter.Load())), w)
@ -179,26 +183,23 @@ func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes,
)
cancelCtx, cancel := context.WithCancelCause(context.Background())
var wg sync.WaitGroup
ginkgo.DeferCleanup(func() {
tCtx.Cleanup(func() {
cancel(errors.New("test has completed"))
wg.Wait()
})
_, err = claimInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
claim := obj.(*resourceapi.ResourceClaim)
resourceClaimLogger.Info("New claim", "claim", format.Object(claim, 0))
validateClaim(claim)
validateClaim(tCtx, claim)
},
UpdateFunc: func(oldObj, newObj any) {
defer ginkgo.GinkgoRecover()
oldClaim := oldObj.(*resourceapi.ResourceClaim)
newClaim := newObj.(*resourceapi.ResourceClaim)
resourceClaimLogger.Info("Updated claim", "newClaim", format.Object(newClaim, 0), "diff", cmp.Diff(oldClaim, newClaim))
validateClaim(newClaim)
validateClaim(tCtx, newClaim)
},
DeleteFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
@ -206,11 +207,11 @@ func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes,
resourceClaimLogger.Info("Deleted claim", "claim", format.Object(claim, 0))
},
})
framework.ExpectNoError(err, "AddEventHandler")
tCtx.ExpectNoError(err, "AddEventHandler")
wg.Add(1)
go func() {
defer wg.Done()
claimInformer.Run(cancelCtx.Done())
claimInformer.RunWithContext(cancelCtx)
}()
}
@ -251,13 +252,13 @@ func (w *watchWrapper) ResultChan() <-chan watch.Event {
return w.resultChan
}
func validateClaim(claim *resourceapi.ResourceClaim) {
func validateClaim(tCtx ktesting.TContext, claim *resourceapi.ResourceClaim) {
// The apiserver doesn't enforce that a claim always has a finalizer
// while being allocated. This is a convention that whoever allocates a
// claim has to follow to prevent using a claim that is at risk of
// being deleted.
if claim.Status.Allocation != nil && len(claim.Finalizers) == 0 {
framework.Failf("Invalid claim: allocated without any finalizer:\n%s", format.Object(claim, 1))
tCtx.Errorf("Invalid claim: allocated without any finalizer:\n%s", format.Object(claim, 1))
}
}
@ -282,23 +283,24 @@ type driverResourcesMutatorFunc func(map[string]resourceslice.DriverResources)
//
// Call this outside of ginkgo.It, then use the instance inside ginkgo.It.
func NewDriver(f *framework.Framework, nodes *Nodes, driverResourcesGenerator driverResourcesGenFunc, driverResourcesMutators ...driverResourcesMutatorFunc) *Driver {
d := NewDriverInstance(f)
d := NewDriverInstance(nil)
ginkgo.BeforeEach(func() {
tCtx := f.TContext(context.Background())
d.initName(tCtx)
driverResources := driverResourcesGenerator(nodes)
for _, mutator := range driverResourcesMutators {
mutator(driverResources)
}
d.Run(nodes, driverResources)
d.Run(tCtx, framework.TestContext.KubeletRootDir, nodes, driverResources)
})
return d
}
// NewDriverInstance is a variant of NewDriver where the driver is inactive and must
// be started explicitly with Run. May be used inside ginkgo.It.
func NewDriverInstance(f *framework.Framework) *Driver {
// be started explicitly with Run. May be used inside ginkgo.It or a Go unit test.
func NewDriverInstance(tCtx ktesting.TContext) *Driver {
d := &Driver{
f: f,
fail: map[MethodInstance]bool{},
callCounts: map[MethodInstance]int64{},
// By default, test with all gRPC APIs.
@ -310,24 +312,30 @@ func NewDriverInstance(f *framework.Framework) *Driver {
WithKubelet: true,
ExpectResourceSliceRemoval: true,
}
d.initName()
if tCtx != nil {
d.initName(tCtx)
}
return d
}
// ClientV1 returns a wrapper for client-go which provides the V1 API on top of whatever is enabled in the cluster.
func (d *Driver) ClientV1() cgoresource.ResourceV1Interface {
return draclient.New(d.f.ClientSet)
func (d *Driver) ClientV1(tCtx ktesting.TContext) cgoresource.ResourceV1Interface {
return draclient.New(tCtx.Client())
}
func (d *Driver) Run(nodes *Nodes, driverResources map[string]resourceslice.DriverResources) {
d.SetUp(nodes, driverResources)
ginkgo.DeferCleanup(d.TearDown)
func (d *Driver) Run(tCtx ktesting.TContext, kubeletRootDir string, nodes *Nodes, driverResources map[string]resourceslice.DriverResources) {
d.SetUp(tCtx, kubeletRootDir, nodes, driverResources)
tCtx.CleanupCtx(d.TearDown)
}
// NewGetSlices generates a function for gomega.Eventually/Consistently which
// NewGetSlices generates a function for ktesting.Eventually/Consistently which
// returns the ResourceSliceList.
func (d *Driver) NewGetSlices() framework.GetFunc[*resourceapi.ResourceSliceList] {
return framework.ListObjects(d.ClientV1().ResourceSlices().List, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
func (d *Driver) NewGetSlices() func(tCtx ktesting.TContext) *resourceapi.ResourceSliceList {
return func(tCtx ktesting.TContext) *resourceapi.ResourceSliceList {
slices, err := framework.ListObjects(d.ClientV1(tCtx).ResourceSlices().List, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})(tCtx)
tCtx.ExpectNoError(err, "list ResourceSlices")
return slices
}
}
type MethodInstance struct {
@ -336,9 +344,7 @@ type MethodInstance struct {
}
type Driver struct {
f *framework.Framework
ctx context.Context
cleanup []func(context.Context) // executed first-in-first-out
cleanup []func(ktesting.TContext) // executed first-in-first-out
wg sync.WaitGroup
serviceAccountName string
@ -388,39 +394,37 @@ type KubeletPlugin struct {
ClientSet kubernetes.Interface
}
func (d *Driver) initName() {
d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
func (d *Driver) initName(tCtx ktesting.TContext) {
d.Name = tCtx.Namespace() + d.NameSuffix + ".k8s.io"
}
func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.DriverResources) {
d.initName()
ginkgo.By(fmt.Sprintf("deploying driver %s on nodes %v", d.Name, nodes.NodeNames))
func (d *Driver) SetUp(tCtx ktesting.TContext, kubeletRootDir string, nodes *Nodes, driverResources map[string]resourceslice.DriverResources) {
tCtx.Logf("deploying driver %s on nodes %v", d.Name, nodes.NodeNames)
d.Nodes = make(map[string]KubeletPlugin)
ctx, cancel := context.WithCancel(context.Background())
logger := klog.FromContext(ctx)
tCtx = tCtx.WithCancel()
logger := klog.FromContext(tCtx)
logger = klog.LoggerWithValues(logger, "driverName", d.Name)
if d.InstanceSuffix != "" {
instance, _ := strings.CutPrefix(d.InstanceSuffix, "-")
logger = klog.LoggerWithValues(logger, "instance", instance)
}
ctx = klog.NewContext(ctx, logger)
d.ctx = ctx
d.cleanup = append(d.cleanup, func(context.Context) { cancel() })
tCtx = tCtx.WithLogger(logger)
d.cleanup = append(d.cleanup, func(ktesting.TContext) { tCtx.Cancel("cleaning up test") })
// After shutdown, check that all ResourceSlices were removed, either by the kubelet
// or our own test code. This runs last because it gets registered first.
if d.ExpectResourceSliceRemoval {
ginkgo.DeferCleanup(d.IsGone)
tCtx.CleanupCtx(d.IsGone)
}
driverResource, useMultiHostDriverResources := driverResources[multiHostDriverResources]
if useMultiHostDriverResources || !d.WithKubelet {
// We have to remove ResourceSlices ourselves.
// Otherwise the kubelet does it after unregistering the driver.
ginkgo.DeferCleanup(func(ctx context.Context) {
err := d.f.ClientSet.ResourceV1().ResourceSlices().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
framework.ExpectNoError(err, "delete ResourceSlices of the driver")
tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
err := tCtx.Client().ResourceV1().ResourceSlices().DeleteCollection(tCtx, metav1.DeleteOptions{}, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
tCtx.ExpectNoError(err, "delete ResourceSlices of the driver")
})
}
@ -445,8 +449,8 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
Devices: slice.Devices,
},
}
_, err := d.f.ClientSet.ResourceV1().ResourceSlices().Create(ctx, resourceSlice, metav1.CreateOptions{})
framework.ExpectNoError(err)
_, err := tCtx.Client().ResourceV1().ResourceSlices().Create(tCtx, resourceSlice, metav1.CreateOptions{})
tCtx.ExpectNoError(err)
}
}
}
@ -460,9 +464,9 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
// Create service account and corresponding RBAC rules.
d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + d.InstanceSuffix + "-service-account"
content := example.PluginPermissions
content = strings.ReplaceAll(content, "dra-kubelet-plugin-namespace", d.f.Namespace.Name)
content = strings.ReplaceAll(content, "dra-kubelet-plugin-namespace", tCtx.Namespace())
content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name+d.InstanceSuffix)
d.createFromYAML(ctx, []byte(content), d.f.Namespace.Name)
d.createFromYAML(tCtx, []byte(content), tCtx.Namespace())
// Using a ReplicaSet instead of a DaemonSet has the advantage that we can control
// the lifecycle explicitly, in particular run two pods per node long enough to
@ -470,10 +474,10 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
instanceKey := "app.kubernetes.io/instance"
rsName := ""
numNodes := int32(len(nodes.NodeNames))
pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name)
registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
pluginDataDirectoryPath := path.Join(kubeletRootDir, "plugins", d.Name)
registrarDirectoryPath := path.Join(kubeletRootDir, "plugins_registry")
instanceName := d.Name + d.InstanceSuffix
err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
err := utils.CreateFromManifestsTCtx(tCtx, func(item interface{}) error {
switch item := item.(type) {
case *appsv1.ReplicaSet:
item.Name += d.NameSuffix + d.InstanceSuffix
@ -514,21 +518,21 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
}
return nil
}, manifests...)
framework.ExpectNoError(err, "deploy kubelet plugin replicaset")
tCtx.ExpectNoError(err, "deploy kubelet plugin replicaset")
rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{})
framework.ExpectNoError(err, "get replicaset")
rs, err := tCtx.Client().AppsV1().ReplicaSets(tCtx.Namespace()).Get(tCtx, rsName, metav1.GetOptions{})
tCtx.ExpectNoError(err, "get replicaset")
// Wait for all pods to be running.
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, numNodes); err != nil {
framework.ExpectNoError(err, "all kubelet plugin proxies running")
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(tCtx, tCtx.Client(), rs, numNodes); err != nil {
tCtx.ExpectNoError(err, "all kubelet plugin proxies running")
}
requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{instanceName})
framework.ExpectNoError(err, "create label selector requirement")
tCtx.ExpectNoError(err, "create label selector requirement")
selector := labels.NewSelector().Add(*requirement)
pods, err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
framework.ExpectNoError(err, "list proxy pods")
gomega.Expect(numNodes).To(gomega.Equal(int32(len(pods.Items))), "number of proxy pods")
pods, err := tCtx.Client().CoreV1().Pods(tCtx.Namespace()).List(tCtx, metav1.ListOptions{LabelSelector: selector.String()})
tCtx.ExpectNoError(err, "list proxy pods")
tCtx.Expect(numNodes).To(gomega.Equal(int32(len(pods.Items))), "number of proxy pods")
sort.Slice(pods.Items, func(i, j int) bool {
return pods.Items[i].Spec.NodeName < pods.Items[j].Spec.NodeName
})
@ -550,13 +554,13 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
// https://github.com/kubernetes/kubernetes/pull/124711).
//
// Here we merely use impersonation, which is faster.
driverClient := d.ImpersonateKubeletPlugin(&pod)
driverClient := d.ImpersonateKubeletPlugin(tCtx, &pod)
logger := klog.LoggerWithValues(klog.LoggerWithName(logger, "kubelet-plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
loggerCtx := klog.NewContext(ctx, logger)
loggerCtx := klog.NewContext(tCtx, logger)
fileOps := app.FileOperations{
Create: func(name string, content []byte) error {
klog.Background().Info("creating CDI file", "node", nodename, "filename", name, "content", string(content))
logger.Info("creating CDI file", "node", nodename, "filename", name, "content", string(content))
if d.IsLocal {
// Name starts with /cdi, which is how it is mapped in the container.
// Here we need it under /var/run.
@ -570,19 +574,21 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
}
return nil
}
return d.createFile(&pod, name, content)
return d.createFile(tCtx, &pod, name, content)
},
Remove: func(name string) error {
klog.Background().Info("deleting CDI file", "node", nodename, "filename", name)
logger.Info("deleting CDI file", "node", nodename, "filename", name)
if d.IsLocal {
name = path.Join("/var/run", name)
return os.Remove(name)
}
return d.removeFile(&pod, name)
return d.removeFile(tCtx, &pod, name)
},
HandleError: func(ctx context.Context, err error, msg string) {
// Record a failure, but don't kill the background goroutine.
// TODO: add to TContext or do it in Error/Assert/etc?
defer ginkgo.GinkgoRecover()
// During tests when canceling the context it is possible to get all kinds of
// follow-up errors for that, like:
// processing ResourceSlice objects: retrieve node "127.0.0.1": client rate limiter Wait returned an error: context canceled
@ -592,7 +598,7 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
// treat errors as failures which definitely shouldn't occur:
var droppedFields *resourceslice.DroppedFieldsError
if errors.As(err, &droppedFields) {
framework.Failf("driver %s: %v", d.Name, err)
tCtx.Errorf("driver %s: %v", d.Name, err)
}
},
}
@ -628,25 +634,25 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
kubeletplugin.FlockDirectoryPath(nodes.tempDir),
kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath),
kubeletplugin.PluginListener(d.listen(&pod, &listenerPort)),
kubeletplugin.PluginListener(d.listen(tCtx, &pod, &listenerPort)),
kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath),
kubeletplugin.RegistrarListener(d.listen(&pod, &listenerPort)),
kubeletplugin.RegistrarListener(d.listen(tCtx, &pod, &listenerPort)),
)
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
d.cleanup = append(d.cleanup, func(ctx context.Context) {
tCtx.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
d.cleanup = append(d.cleanup, func(tCtx ktesting.TContext) {
// Depends on cancel being called first.
plugin.Stop()
// Also explicitly stop all pods.
ginkgo.By("scaling down driver proxy pods for " + d.Name)
rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{})
framework.ExpectNoError(err, "get ReplicaSet for driver "+d.Name)
tCtx.Log("scaling down driver proxy pods for", d.Name)
rs, err := tCtx.Client().AppsV1().ReplicaSets(tCtx.Namespace()).Get(tCtx, rsName, metav1.GetOptions{})
tCtx.ExpectNoError(err, "get ReplicaSet for driver "+d.Name)
rs.Spec.Replicas = ptr.To(int32(0))
rs, err = d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Update(ctx, rs, metav1.UpdateOptions{})
framework.ExpectNoError(err, "scale down ReplicaSet for driver "+d.Name)
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, 0); err != nil {
framework.ExpectNoError(err, "all kubelet plugin proxies stopped")
rs, err = tCtx.Client().AppsV1().ReplicaSets(tCtx.Namespace()).Update(tCtx, rs, metav1.UpdateOptions{})
tCtx.ExpectNoError(err, "scale down ReplicaSet for driver "+d.Name)
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(tCtx, tCtx.Client(), rs, 0); err != nil {
tCtx.ExpectNoError(err, "all kubelet plugin proxies stopped")
}
})
d.Nodes[nodename] = KubeletPlugin{ExamplePlugin: plugin, ClientSet: driverClient}
@ -657,8 +663,8 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
}
// Wait for registration.
ginkgo.By("wait for plugin registration")
gomega.Eventually(func() map[string][]app.GRPCCall {
tCtx.Log("wait for plugin registration")
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) map[string][]app.GRPCCall {
notRegistered := make(map[string][]app.GRPCCall)
for nodename, plugin := range d.Nodes {
calls := plugin.GetGRPCCalls()
@ -670,8 +676,8 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "hosts where the plugin has not been registered yet")
}
func (d *Driver) ImpersonateKubeletPlugin(pod *v1.Pod) kubernetes.Interface {
ginkgo.GinkgoHelper()
func (d *Driver) ImpersonateKubeletPlugin(tCtx ktesting.TContext, pod *v1.Pod) kubernetes.Interface {
tCtx.Helper()
driverUserInfo := (&serviceaccount.ServiceAccountInfo{
Name: d.serviceAccountName,
Namespace: pod.Namespace,
@ -679,36 +685,36 @@ func (d *Driver) ImpersonateKubeletPlugin(pod *v1.Pod) kubernetes.Interface {
PodName: pod.Name,
PodUID: string(pod.UID),
}).UserInfo()
driverClientConfig := d.f.ClientConfig()
driverClientConfig := tCtx.RESTConfig()
driverClientConfig.Impersonate = rest.ImpersonationConfig{
UserName: driverUserInfo.GetName(),
Groups: driverUserInfo.GetGroups(),
Extra: driverUserInfo.GetExtra(),
}
driverClient, err := kubernetes.NewForConfig(driverClientConfig)
framework.ExpectNoError(err, "create client for driver")
tCtx.ExpectNoError(err, "create client for driver")
return driverClient
}
func (d *Driver) createFile(pod *v1.Pod, name string, content []byte) error {
func (d *Driver) createFile(tCtx ktesting.TContext, pod *v1.Pod, name string, content []byte) error {
buffer := bytes.NewBuffer(content)
// Writing the content can be slow. Better create a temporary file and
// move it to the final destination once it is complete.
tmpName := name + ".tmp"
if err := d.podIO(pod).CreateFile(tmpName, buffer); err != nil {
_ = d.podIO(pod).RemoveAll(tmpName)
if err := d.podIO(tCtx, pod).CreateFile(tmpName, buffer); err != nil {
_ = d.podIO(tCtx, pod).RemoveAll(tmpName)
return err
}
return d.podIO(pod).Rename(tmpName, name)
return d.podIO(tCtx, pod).Rename(tmpName, name)
}
func (d *Driver) removeFile(pod *v1.Pod, name string) error {
return d.podIO(pod).RemoveAll(name)
func (d *Driver) removeFile(tCtx ktesting.TContext, pod *v1.Pod, name string) error {
return d.podIO(tCtx, pod).RemoveAll(name)
}
func (d *Driver) createFromYAML(ctx context.Context, content []byte, namespace string) {
func (d *Driver) createFromYAML(tCtx ktesting.TContext, content []byte, namespace string) {
// Not caching the discovery result isn't very efficient, but good enough.
discoveryCache := memory.NewMemCacheClient(d.f.ClientSet.Discovery())
discoveryCache := memory.NewMemCacheClient(tCtx.Client().Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryCache)
for _, content := range bytes.Split(content, []byte("---\n")) {
@ -717,16 +723,16 @@ func (d *Driver) createFromYAML(ctx context.Context, content []byte, namespace s
}
var obj *unstructured.Unstructured
framework.ExpectNoError(yaml.UnmarshalStrict(content, &obj), fmt.Sprintf("Full YAML:\n%s\n", string(content)))
tCtx.ExpectNoError(yaml.UnmarshalStrict(content, &obj), fmt.Sprintf("Full YAML:\n%s\n", string(content)))
gv, err := schema.ParseGroupVersion(obj.GetAPIVersion())
framework.ExpectNoError(err, fmt.Sprintf("extract group+version from object %q", klog.KObj(obj)))
tCtx.ExpectNoError(err, fmt.Sprintf("extract group+version from object %q", klog.KObj(obj)))
gk := schema.GroupKind{Group: gv.Group, Kind: obj.GetKind()}
mapping, err := restMapper.RESTMapping(gk, gv.Version)
framework.ExpectNoError(err, fmt.Sprintf("map %q to resource", gk))
tCtx.ExpectNoError(err, fmt.Sprintf("map %q to resource", gk))
resourceClient := d.f.DynamicClient.Resource(mapping.Resource)
resourceClient := tCtx.Dynamic().Resource(mapping.Resource)
options := metav1.CreateOptions{
// If the YAML input is invalid, then we want the
// apiserver to tell us via an error. This can
@ -736,31 +742,31 @@ func (d *Driver) createFromYAML(ctx context.Context, content []byte, namespace s
}
switch mapping.Scope.Name() {
case meta.RESTScopeNameRoot:
_, err = resourceClient.Create(ctx, obj, options)
_, err = resourceClient.Create(tCtx, obj, options)
case meta.RESTScopeNameNamespace:
if namespace == "" {
framework.Failf("need namespace for object type %s", gk)
tCtx.Fatalf("need namespace for object type %s", gk)
}
_, err = resourceClient.Namespace(namespace).Create(ctx, obj, options)
_, err = resourceClient.Namespace(namespace).Create(tCtx, obj, options)
}
framework.ExpectNoError(err, "create object")
ginkgo.DeferCleanup(func(ctx context.Context) {
tCtx.ExpectNoError(err, "create object")
tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
del := resourceClient.Delete
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
del = resourceClient.Namespace(namespace).Delete
}
err := del(ctx, obj.GetName(), metav1.DeleteOptions{})
err := del(tCtx, obj.GetName(), metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) {
framework.ExpectNoError(err, fmt.Sprintf("deleting %s.%s %s", obj.GetKind(), obj.GetAPIVersion(), klog.KObj(obj)))
tCtx.ExpectNoError(err, fmt.Sprintf("deleting %s.%s %s", obj.GetKind(), obj.GetAPIVersion(), klog.KObj(obj)))
}
})
}
}
func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO {
logger := klog.Background()
func (d *Driver) podIO(tCtx ktesting.TContext, pod *v1.Pod) proxy.PodDirIO {
logger := tCtx.Logger()
return proxy.PodDirIO{
F: d.f,
TCtx: tCtx,
Namespace: pod.Namespace,
PodName: pod.Name,
ContainerName: pod.Spec.Containers[0].Name,
@ -775,7 +781,7 @@ var errListenerDone = errors.New("listener is shutting down")
// listen returns the function which the kubeletplugin helper needs to open a listening socket.
// For that it spins up hostpathplugin in the pod for the desired node
// and connects to hostpathplugin via port forwarding.
func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endpoint string) (net.Listener, error) {
func (d *Driver) listen(tCtx ktesting.TContext, pod *v1.Pod, port *int32) func(ctx context.Context, endpoint string) (net.Listener, error) {
return func(ctx context.Context, endpoint string) (l net.Listener, e error) {
// No need create sockets, the kubelet is not expected to use them.
if !d.WithKubelet {
@ -801,9 +807,9 @@ func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endp
ctx = klog.NewContext(ctx, logger)
// Start hostpathplugin in proxy mode and keep it running until the listener gets closed.
req := d.f.ClientSet.CoreV1().RESTClient().Post().
req := tCtx.Client().CoreV1().RESTClient().Post().
Resource("pods").
Namespace(d.f.Namespace.Name).
Namespace(tCtx.Namespace()).
Name(pod.Name).
SubResource("exec").
VersionedParams(&v1.PodExecOptions{
@ -838,7 +844,7 @@ func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endp
runHostpathPlugin := func(ctx context.Context) (bool, error) {
// errors.Is(err, listenerDoneErr) would be nicer, but we don't get
// that error from remotecommand. Instead forgo logging when we already shut down.
if err := execute(ctx, req.URL(), d.f.ClientConfig(), 5); err != nil && ctx.Err() == nil {
if err := execute(ctx, req.URL(), tCtx.RESTConfig(), 5); err != nil && ctx.Err() == nil {
klog.FromContext(ctx).V(5).Info("execution failed, will retry", "err", err)
}
// There is no reason to stop except for context cancellation =>
@ -848,9 +854,9 @@ func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endp
_ = delayFn.Until(cmdCtx, true /* immediate */, true /* sliding */, runHostpathPlugin)
// Killing hostpathplugin does not remove the socket. Need to do that manually.
req := d.f.ClientSet.CoreV1().RESTClient().Post().
req := tCtx.Client().CoreV1().RESTClient().Post().
Resource("pods").
Namespace(d.f.Namespace.Name).
Namespace(tCtx.Namespace()).
Name(pod.Name).
SubResource("exec").
VersionedParams(&v1.PodExecOptions{
@ -865,7 +871,7 @@ func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endp
}, scheme.ParameterCodec)
cleanupLogger := klog.LoggerWithName(logger, "cleanup")
cleanupCtx := klog.NewContext(ctx, cleanupLogger)
if err := execute(cleanupCtx, req.URL(), d.f.ClientConfig(), 0); err != nil {
if err := execute(cleanupCtx, req.URL(), tCtx.RESTConfig(), 0); err != nil {
cleanupLogger.Error(err, "Socket removal failed")
}
}()
@ -881,12 +887,12 @@ func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endp
}
addr := proxy.Addr{
Namespace: d.f.Namespace.Name,
Namespace: tCtx.Namespace(),
PodName: pod.Name,
ContainerName: pod.Spec.Containers[0].Name,
Port: int(port),
}
listener, err := proxy.Listen(ctx, d.f.ClientSet, d.f.ClientConfig(), addr)
listener, err := proxy.Listen(ctx, tCtx.Client(), tCtx.RESTConfig(), addr)
if err != nil {
return nil, fmt.Errorf("listen for connections from %+v: %w", addr, err)
}
@ -973,9 +979,9 @@ func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter {
return writer
}
func (d *Driver) TearDown(ctx context.Context) {
func (d *Driver) TearDown(tCtx ktesting.TContext) {
for _, c := range d.cleanup {
c(ctx)
c(tCtx)
}
d.cleanup = nil
d.wg.Wait()
@ -987,9 +993,9 @@ func (d *Driver) TearDown(ctx context.Context) {
// because of the delay in the kubelet.
//
// Only use this in tests where kubelet support for DRA is guaranteed.
func (d *Driver) IsGone(ctx context.Context) {
ginkgo.By(fmt.Sprintf("Waiting for ResourceSlices of driver %s to be removed...", d.Name))
gomega.Eventually(ctx, d.NewGetSlices()).WithTimeout(2 * time.Minute).Should(gomega.HaveField("Items", gomega.BeEmpty()))
func (d *Driver) IsGone(tCtx ktesting.TContext) {
tCtx.Logf("Waiting for ResourceSlices of driver %s to be removed...", d.Name)
ktesting.Eventually(tCtx, d.NewGetSlices()).WithTimeout(2 * time.Minute).Should(gomega.HaveField("Items", gomega.BeEmpty()))
}
func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {

View file

@ -29,10 +29,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
clientexec "k8s.io/client-go/util/exec"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/utils/ktesting"
"github.com/onsi/gomega"
)
@ -59,14 +59,18 @@ func ExecWithOptions(f *framework.Framework, options ExecOptions) (string, strin
}
func ExecWithOptionsContext(ctx context.Context, f *framework.Framework, options ExecOptions) (string, string, error) {
return ExecWithOptionsTCtx(f.TContext(ctx), options)
}
func ExecWithOptionsTCtx(tCtx ktesting.TContext, options ExecOptions) (string, string, error) {
if !options.Quiet {
framework.Logf("ExecWithOptions %+v", options)
tCtx.Logf("ExecWithOptions %+v", options)
}
const tty = false
framework.Logf("ExecWithOptions: Clientset creation")
req := f.ClientSet.CoreV1().RESTClient().Post().
tCtx.Logf("ExecWithOptions: Clientset creation")
req := tCtx.Client().CoreV1().RESTClient().Post().
Resource("pods").
Name(options.PodName).
Namespace(options.Namespace).
@ -81,8 +85,8 @@ func ExecWithOptionsContext(ctx context.Context, f *framework.Framework, options
}, scheme.ParameterCodec)
var stdout, stderr bytes.Buffer
framework.Logf("ExecWithOptions: execute(%s)", req.URL())
err := execute(ctx, req.URL(), f.ClientConfig(), options.Stdin, &stdout, &stderr, tty)
tCtx.Logf("ExecWithOptions: execute(%s)", req.URL())
err := execute(tCtx, req.URL(), options.Stdin, &stdout, &stderr, tty)
if options.PreserveWhitespace {
return stdout.String(), stderr.String(), err
@ -182,7 +186,8 @@ func VerifyExecInPodFail(ctx context.Context, f *framework.Framework, pod *v1.Po
return fmt.Errorf("%q should fail with exit code %d, but exit without error", shExec, exitCode)
}
func execute(ctx context.Context, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
func execute(tCtx ktesting.TContext, url *url.URL, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
config := tCtx.RESTConfig()
// WebSocketExecutor executor is default
// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
@ -205,7 +210,7 @@ func execute(ctx context.Context, url *url.URL, config *restclient.Config, stdin
return err
}
return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
return exec.StreamWithContext(tCtx, remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,

View file

@ -657,7 +657,7 @@ func (m *mockCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework)
VolumeMountGroupRequired: m.enableVolumeMountGroup,
EnableTopology: m.enableTopology,
IO: proxy.PodDirIO{
F: f,
TCtx: f.TContext(ctx),
Namespace: m.driverNamespace.Name,
PodName: podname,
ContainerName: "busybox",

View file

@ -22,13 +22,13 @@ import (
"io"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service"
"k8s.io/kubernetes/test/utils/ktesting"
)
type PodDirIO struct {
F *framework.Framework
TCtx ktesting.TContext
Namespace string
PodName string
ContainerName string
@ -105,7 +105,7 @@ func (p PodDirIO) RemoveAll(path string) error {
}
func (p PodDirIO) execute(command []string, stdin io.Reader) (string, string, error) {
stdout, stderr, err := e2epod.ExecWithOptions(p.F, e2epod.ExecOptions{
stdout, stderr, err := e2epod.ExecWithOptionsTCtx(p.TCtx, e2epod.ExecOptions{
Command: command,
Namespace: p.Namespace,
PodName: p.PodName,

View file

@ -23,13 +23,12 @@ import (
"errors"
"fmt"
"github.com/onsi/ginkgo/v2"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
storagev1 "k8s.io/api/storage/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
@ -39,6 +38,7 @@ import (
"k8s.io/kubernetes/test/e2e/framework"
e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
imageutils "k8s.io/kubernetes/test/utils/image"
"k8s.io/kubernetes/test/utils/ktesting"
)
// LoadFromManifests loads .yaml or .json manifest files and returns
@ -121,31 +121,31 @@ func visitManifests(cb func([]byte) error, files ...string) error {
// - only some common items are supported, unknown ones trigger an error
// - only the latest stable API version for each item is supported
func PatchItems(f *framework.Framework, driverNamespace *v1.Namespace, items ...interface{}) error {
tCtx := f.TContext(context.Background())
if driverNamespace != nil {
tCtx = tCtx.WithNamespace(driverNamespace.Name)
}
return PatchItemsTCtx(tCtx, items...)
}
// PatchItemsTCtx is a variant of PatchItems where all parameters, including
// the namespace, are passed through a TContext.
func PatchItemsTCtx(tCtx ktesting.TContext, items ...interface{}) error {
for _, item := range items {
// Uncomment when debugging the loading and patching of items.
// Logf("patching original content of %T:\n%s", item, PrettyPrint(item))
if err := patchItemRecursively(f, driverNamespace, item); err != nil {
if err := patchItemRecursively(tCtx, item); err != nil {
return err
}
}
return nil
}
// CreateItems creates the items. Each of them must be an API object
// createItems creates the items. Each of them must be an API object
// of a type that is registered in Factory.
//
// It returns either a cleanup function or an error, but never both.
//
// Cleaning up after a test can be triggered in two ways:
// - the test invokes the returned cleanup function,
// usually in an AfterEach
// - the test suite terminates, potentially after
// skipping the test's AfterEach (https://github.com/onsi/ginkgo/issues/222)
//
// PatchItems has the some limitations as LoadFromManifests:
// - only some common items are supported, unknown ones trigger an error
// - only the latest stable API version for each item is supported
func CreateItems(ctx context.Context, f *framework.Framework, ns *v1.Namespace, items ...interface{}) error {
// Object get deleted automatically during test cleanup.
func createItems(tCtx ktesting.TContext, items ...interface{}) error {
var result error
for _, item := range items {
// Each factory knows which item(s) it supports, so try each one.
@ -153,11 +153,17 @@ func CreateItems(ctx context.Context, f *framework.Framework, ns *v1.Namespace,
description := describeItem(item)
// Uncomment this line to get a full dump of the entire item.
// description = fmt.Sprintf("%s:\n%s", description, PrettyPrint(item))
framework.Logf("creating %s", description)
tCtx.Logf("creating %s", description)
for _, factory := range factories {
destructor, err := factory.Create(ctx, f, ns, item)
destructor, err := factory.Create(tCtx, item)
if destructor != nil {
ginkgo.DeferCleanup(framework.IgnoreNotFound(destructor), framework.AnnotatedLocation(fmt.Sprintf("deleting %s", description)))
tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
err := destructor(tCtx)
if apierrors.IsNotFound(err) {
return
}
tCtx.ExpectNoError(err, fmt.Sprintf("deleting %s", description))
})
}
if err == nil {
done = true
@ -178,13 +184,25 @@ func CreateItems(ctx context.Context, f *framework.Framework, ns *v1.Namespace,
// CreateFromManifests is a combination of LoadFromManifests,
// PatchItems, patching with an optional custom function,
// and CreateItems.
// and creating the resulting items.
//
// Objects get deleted automatically during test cleanup.
func CreateFromManifests(ctx context.Context, f *framework.Framework, driverNamespace *v1.Namespace, patch func(item interface{}) error, files ...string) error {
tCtx := f.TContext(ctx)
if driverNamespace != nil {
tCtx = tCtx.WithNamespace(driverNamespace.Name)
}
return CreateFromManifestsTCtx(tCtx, patch, files...)
}
// CreateFromManifestsTCtx is a variant of CreateFromManifests where all parameters, including
// the driver namespace, are passed through a TContext. It is therefore usable from Go unit tests.
func CreateFromManifestsTCtx(tCtx ktesting.TContext, patch func(item interface{}) error, files ...string) error {
items, err := LoadFromManifests(files...)
if err != nil {
return fmt.Errorf("CreateFromManifests: %w", err)
}
if err := PatchItems(f, driverNamespace, items...); err != nil {
if err := PatchItemsTCtx(tCtx, items...); err != nil {
return err
}
if patch != nil {
@ -194,7 +212,7 @@ func CreateFromManifests(ctx context.Context, f *framework.Framework, driverName
}
}
}
return CreateItems(ctx, f, driverNamespace, items...)
return createItems(tCtx, items...)
}
// What is a subset of metav1.TypeMeta which (in contrast to
@ -234,7 +252,7 @@ type ItemFactory interface {
// error or a cleanup function for the created item.
// If the item is of an unsupported type, it must return
// an error that has errorItemNotSupported as cause.
Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, item interface{}) (func(ctx context.Context) error, error)
Create(tCtx ktesting.TContext, item interface{}) (func(ctx context.Context) error, error)
}
// describeItem always returns a string that describes the item,
@ -271,82 +289,79 @@ var factories = map[What]ItemFactory{
{"CustomResourceDefinition"}: &customResourceDefinitionFactory{},
}
// PatchName makes the name of some item unique by appending the
// patchName makes the name of some item unique by appending the
// generated unique name.
func PatchName(f *framework.Framework, item *string) {
func patchName(uniqueName string, item *string) {
if *item != "" {
*item = *item + "-" + f.UniqueName
*item = *item + "-" + uniqueName
}
}
// PatchNamespace moves the item into the test's namespace. Not
// patchNamespace moves the item into the test's namespace. Not
// all items can be namespaced. For those, the name also needs to be
// patched.
func PatchNamespace(f *framework.Framework, driverNamespace *v1.Namespace, item *string) {
if driverNamespace != nil {
*item = driverNamespace.GetName()
return
}
if f.Namespace != nil {
*item = f.Namespace.GetName()
func patchNamespace(tCtx ktesting.TContext, item *string) {
namespace := tCtx.Namespace()
if namespace != "" {
*item = namespace
}
}
func patchItemRecursively(f *framework.Framework, driverNamespace *v1.Namespace, item interface{}) error {
func patchItemRecursively(tCtx ktesting.TContext, item interface{}) error {
uniqueName := tCtx.Namespace()
switch item := item.(type) {
case *rbacv1.Subject:
PatchNamespace(f, driverNamespace, &item.Namespace)
patchNamespace(tCtx, &item.Namespace)
case *rbacv1.RoleRef:
// TODO: avoid hard-coding this special name. Perhaps add a Framework.PredefinedRoles
// which contains all role names that are defined cluster-wide before the test starts?
// All those names are exempt from renaming. That list could be populated by querying
// and get extended by tests.
if item.Name != "e2e-test-privileged-psp" {
PatchName(f, &item.Name)
patchName(uniqueName, &item.Name)
}
case *rbacv1.ClusterRole:
PatchName(f, &item.Name)
patchName(uniqueName, &item.Name)
case *rbacv1.Role:
PatchNamespace(f, driverNamespace, &item.Namespace)
patchNamespace(tCtx, &item.Namespace)
// Roles are namespaced, but because for RoleRef above we don't
// know whether the referenced role is a ClusterRole or Role
// and therefore always renames, we have to do the same here.
PatchName(f, &item.Name)
patchName(uniqueName, &item.Name)
case *storagev1.StorageClass:
PatchName(f, &item.Name)
patchName(uniqueName, &item.Name)
case *storagev1.VolumeAttributesClass:
PatchName(f, &item.Name)
patchName(uniqueName, &item.Name)
case *storagev1.CSIDriver:
PatchName(f, &item.Name)
patchName(uniqueName, &item.Name)
case *v1.ServiceAccount:
PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
patchNamespace(tCtx, &item.ObjectMeta.Namespace)
case *v1.Secret:
PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
patchNamespace(tCtx, &item.ObjectMeta.Namespace)
case *rbacv1.ClusterRoleBinding:
PatchName(f, &item.Name)
patchName(uniqueName, &item.Name)
for i := range item.Subjects {
if err := patchItemRecursively(f, driverNamespace, &item.Subjects[i]); err != nil {
return fmt.Errorf("%T: %w", f, err)
if err := patchItemRecursively(tCtx, &item.Subjects[i]); err != nil {
return fmt.Errorf("%T: %w", &item.Subjects[i], err)
}
}
if err := patchItemRecursively(f, driverNamespace, &item.RoleRef); err != nil {
return fmt.Errorf("%T: %w", f, err)
if err := patchItemRecursively(tCtx, &item.RoleRef); err != nil {
return fmt.Errorf("%T: %w", &item.RoleRef, err)
}
case *rbacv1.RoleBinding:
PatchNamespace(f, driverNamespace, &item.Namespace)
patchNamespace(tCtx, &item.Namespace)
for i := range item.Subjects {
if err := patchItemRecursively(f, driverNamespace, &item.Subjects[i]); err != nil {
return fmt.Errorf("%T: %w", f, err)
if err := patchItemRecursively(tCtx, &item.Subjects[i]); err != nil {
return fmt.Errorf("%T: %w", &item.Subjects[i], err)
}
}
if err := patchItemRecursively(f, driverNamespace, &item.RoleRef); err != nil {
return fmt.Errorf("%T: %w", f, err)
if err := patchItemRecursively(tCtx, &item.RoleRef); err != nil {
return fmt.Errorf("%T: %w", &item.RoleRef, err)
}
case *v1.Service:
PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
patchNamespace(tCtx, &item.ObjectMeta.Namespace)
case *appsv1.StatefulSet:
PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
patchNamespace(tCtx, &item.ObjectMeta.Namespace)
if err := patchContainerImages(item.Spec.Template.Spec.Containers); err != nil {
return err
}
@ -354,7 +369,7 @@ func patchItemRecursively(f *framework.Framework, driverNamespace *v1.Namespace,
return err
}
case *appsv1.Deployment:
PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
patchNamespace(tCtx, &item.ObjectMeta.Namespace)
if err := patchContainerImages(item.Spec.Template.Spec.Containers); err != nil {
return err
}
@ -362,7 +377,7 @@ func patchItemRecursively(f *framework.Framework, driverNamespace *v1.Namespace,
return err
}
case *appsv1.DaemonSet:
PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
patchNamespace(tCtx, &item.ObjectMeta.Namespace)
if err := patchContainerImages(item.Spec.Template.Spec.Containers); err != nil {
return err
}
@ -370,7 +385,7 @@ func patchItemRecursively(f *framework.Framework, driverNamespace *v1.Namespace,
return err
}
case *appsv1.ReplicaSet:
PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
patchNamespace(tCtx, &item.ObjectMeta.Namespace)
if err := patchContainerImages(item.Spec.Template.Spec.Containers); err != nil {
return err
}
@ -396,13 +411,13 @@ func (f *serviceAccountFactory) New() runtime.Object {
return &v1.ServiceAccount{}
}
func (*serviceAccountFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*serviceAccountFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*v1.ServiceAccount)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.CoreV1().ServiceAccounts(ns.Name)
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().CoreV1().ServiceAccounts(tCtx.Namespace())
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create ServiceAccount: %w", err)
}
return func(ctx context.Context) error {
@ -416,15 +431,15 @@ func (f *clusterRoleFactory) New() runtime.Object {
return &rbacv1.ClusterRole{}
}
func (*clusterRoleFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*clusterRoleFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*rbacv1.ClusterRole)
if !ok {
return nil, errorItemNotSupported
}
framework.Logf("Define cluster role %v", item.GetName())
client := f.ClientSet.RbacV1().ClusterRoles()
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
tCtx.Logf("define cluster role %v", item.GetName())
client := tCtx.Client().RbacV1().ClusterRoles()
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create ClusterRole: %w", err)
}
return func(ctx context.Context) error {
@ -438,14 +453,14 @@ func (f *clusterRoleBindingFactory) New() runtime.Object {
return &rbacv1.ClusterRoleBinding{}
}
func (*clusterRoleBindingFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*clusterRoleBindingFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*rbacv1.ClusterRoleBinding)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.RbacV1().ClusterRoleBindings()
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().RbacV1().ClusterRoleBindings()
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create ClusterRoleBinding: %w", err)
}
return func(ctx context.Context) error {
@ -459,14 +474,14 @@ func (f *roleFactory) New() runtime.Object {
return &rbacv1.Role{}
}
func (*roleFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*roleFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*rbacv1.Role)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.RbacV1().Roles(ns.Name)
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().RbacV1().Roles(tCtx.Namespace())
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create Role: %w", err)
}
return func(ctx context.Context) error {
@ -480,14 +495,14 @@ func (f *roleBindingFactory) New() runtime.Object {
return &rbacv1.RoleBinding{}
}
func (*roleBindingFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*roleBindingFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*rbacv1.RoleBinding)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.RbacV1().RoleBindings(ns.Name)
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().RbacV1().RoleBindings(tCtx.Namespace())
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create RoleBinding: %w", err)
}
return func(ctx context.Context) error {
@ -501,14 +516,14 @@ func (f *serviceFactory) New() runtime.Object {
return &v1.Service{}
}
func (*serviceFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*serviceFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*v1.Service)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.CoreV1().Services(ns.Name)
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().CoreV1().Services(tCtx.Namespace())
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create Service: %w", err)
}
return func(ctx context.Context) error {
@ -522,14 +537,14 @@ func (f *statefulSetFactory) New() runtime.Object {
return &appsv1.StatefulSet{}
}
func (*statefulSetFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*statefulSetFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*appsv1.StatefulSet)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.AppsV1().StatefulSets(ns.Name)
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().AppsV1().StatefulSets(tCtx.Namespace())
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create StatefulSet: %w", err)
}
return func(ctx context.Context) error {
@ -543,14 +558,14 @@ func (f *deploymentFactory) New() runtime.Object {
return &appsv1.Deployment{}
}
func (*deploymentFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*deploymentFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*appsv1.Deployment)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.AppsV1().Deployments(ns.Name)
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().AppsV1().Deployments(tCtx.Namespace())
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create Deployment: %w", err)
}
return func(ctx context.Context) error {
@ -564,14 +579,14 @@ func (f *daemonSetFactory) New() runtime.Object {
return &appsv1.DaemonSet{}
}
func (*daemonSetFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*daemonSetFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*appsv1.DaemonSet)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.AppsV1().DaemonSets(ns.Name)
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().AppsV1().DaemonSets(tCtx.Namespace())
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create DaemonSet: %w", err)
}
return func(ctx context.Context) error {
@ -585,14 +600,14 @@ func (f *replicaSetFactory) New() runtime.Object {
return &appsv1.ReplicaSet{}
}
func (*replicaSetFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*replicaSetFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*appsv1.ReplicaSet)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.AppsV1().ReplicaSets(ns.Name)
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().AppsV1().ReplicaSets(tCtx.Namespace())
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create ReplicaSet: %w", err)
}
return func(ctx context.Context) error {
@ -606,14 +621,14 @@ func (f *storageClassFactory) New() runtime.Object {
return &storagev1.StorageClass{}
}
func (*storageClassFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*storageClassFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*storagev1.StorageClass)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.StorageV1().StorageClasses()
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().StorageV1().StorageClasses()
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create StorageClass: %w", err)
}
return func(ctx context.Context) error {
@ -627,14 +642,14 @@ func (f *volumeAttributesClassFactory) New() runtime.Object {
return &storagev1.VolumeAttributesClass{}
}
func (*volumeAttributesClassFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*volumeAttributesClassFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*storagev1.VolumeAttributesClass)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.StorageV1().VolumeAttributesClasses()
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().StorageV1().VolumeAttributesClasses()
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create VolumeAttributesClass: %w", err)
}
return func(ctx context.Context) error {
@ -648,14 +663,14 @@ func (f *csiDriverFactory) New() runtime.Object {
return &storagev1.CSIDriver{}
}
func (*csiDriverFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*csiDriverFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*storagev1.CSIDriver)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.StorageV1().CSIDrivers()
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().StorageV1().CSIDrivers()
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create CSIDriver: %w", err)
}
return func(ctx context.Context) error {
@ -669,14 +684,14 @@ func (f *secretFactory) New() runtime.Object {
return &v1.Secret{}
}
func (*secretFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*secretFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
item, ok := i.(*v1.Secret)
if !ok {
return nil, errorItemNotSupported
}
client := f.ClientSet.CoreV1().Secrets(ns.Name)
if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
client := tCtx.Client().CoreV1().Secrets(tCtx.Namespace())
if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create Secret: %w", err)
}
return func(ctx context.Context) error {
@ -690,7 +705,7 @@ func (f *customResourceDefinitionFactory) New() runtime.Object {
return &apiextensionsv1.CustomResourceDefinition{}
}
func (*customResourceDefinitionFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
func (*customResourceDefinitionFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
var err error
unstructCRD := &unstructured.Unstructured{}
gvr := schema.GroupVersionResource{Group: "apiextensions.k8s.io", Version: "v1", Resource: "customresourcedefinitions"}
@ -705,11 +720,11 @@ func (*customResourceDefinitionFactory) Create(ctx context.Context, f *framework
return nil, err
}
if _, err = f.DynamicClient.Resource(gvr).Create(ctx, unstructCRD, metav1.CreateOptions{}); err != nil {
if _, err = tCtx.Dynamic().Resource(gvr).Create(tCtx, unstructCRD, metav1.CreateOptions{}); err != nil {
return nil, fmt.Errorf("create CustomResourceDefinition: %w", err)
}
return func(ctx context.Context) error {
return f.DynamicClient.Resource(gvr).Delete(ctx, item.GetName(), metav1.DeleteOptions{})
return tCtx.Dynamic().Resource(gvr).Delete(ctx, item.GetName(), metav1.DeleteOptions{})
}, nil
}

View file

@ -32,7 +32,7 @@ func coreDRA(tCtx ktesting.TContext, f *framework.Framework, b *drautils.Builder
claim := b.ExternalClaim()
pod := b.PodExternal()
b.Create(tCtx, claim, pod)
b.TestPod(tCtx, f, pod)
b.TestPod(tCtx, pod)
return func(tCtx ktesting.TContext) step3Func {
// Remove pod prepared in step 1.
@ -45,7 +45,7 @@ func coreDRA(tCtx ktesting.TContext, f *framework.Framework, b *drautils.Builder
pod = b.PodExternal()
pod.Spec.ResourceClaims[0].ResourceClaimName = &claim.Name
b.Create(tCtx, claim, pod)
b.TestPod(tCtx, f, pod)
b.TestPod(tCtx, pod)
return func(tCtx ktesting.TContext) {
// We need to clean up explicitly because the normal

View file

@ -1,89 +0,0 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2edra
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/onsi/ginkgo/v2"
ginkgotypes "github.com/onsi/ginkgo/v2/types"
"k8s.io/kubernetes/test/utils/ktesting"
)
var (
TimeNow = time.Now // Can be stubbed out for testing.
Pid = os.Getpid() // Can be stubbed out for testing.
)
// TODO: replace with helper code from https://github.com/kubernetes/kubernetes/pull/122481 should that get merged - or vice versa.
type ginkgoTB struct {
ktesting.TB
}
var _ ktesting.ContextTB = &ginkgoTB{}
func GinkgoContextTB() ktesting.ContextTB {
return &ginkgoTB{
TB: ginkgo.GinkgoT(),
}
}
// CleanupCtx implements [ktesting.ContextTB.CleanupCtx]. It's identical to
// ginkgo.DeferCleanup.
func (g *ginkgoTB) CleanupCtx(cb func(context.Context)) {
ginkgo.GinkgoHelper()
ginkgo.DeferCleanup(cb)
}
// Log overrides the implementation from Ginkgo to ensure consistent output.
func (g *ginkgoTB) Log(args ...any) {
log(1, fmt.Sprint(args...))
}
// Logf overrides the implementation from Ginkgo to ensure consistent output.
func (g *ginkgoTB) Logf(format string, args ...any) {
log(1, fmt.Sprintf(format, args...))
}
// log re-implements klog.Info: same header, but stack unwinding
// with support for ginkgo.GinkgoWriter and skipping stack levels.
func log(offset int, msg string) {
now := TimeNow()
file, line := unwind(offset + 1)
if file == "" {
file = "???"
line = 1
} else if slash := strings.LastIndex(file, "/"); slash >= 0 {
file = file[slash+1:]
}
_, month, day := now.Date()
hour, minute, second := now.Clock()
header := fmt.Sprintf("I%02d%02d %02d:%02d:%02d.%06d %d %s:%d]",
month, day, hour, minute, second, now.Nanosecond()/1000, Pid, file, line)
_, _ = fmt.Fprintln(ginkgo.GinkgoWriter, header, msg)
}
func unwind(skip int) (string, int) {
location := ginkgotypes.NewCodeLocation(skip + 1)
return location.FileName, location.LineNumber
}

View file

@ -266,15 +266,15 @@ var _ = ginkgo.Describe("DRA upgrade/downgrade", func() {
tCtx = ktesting.Begin(tCtx, fmt.Sprintf("v%d.%d", major, previousMinor))
tCtx.ExpectNoError(e2enode.WaitForAllNodesSchedulable(tCtx, tCtx.Client(), f.Timeouts.NodeSchedulable), "wait for all nodes to be schedulable")
nodes := drautils.NewNodesNow(tCtx, f, 1, 1)
nodes := drautils.NewNodesNow(tCtx, 1, 1)
// Opening sockets locally avoids intermittent errors and delays caused by proxying through the restarted apiserver.
// We could speed up testing by shortening the sync delay in the ResourceSlice controller, but let's better
// test the defaults.
driver := drautils.NewDriverInstance(f)
driver := drautils.NewDriverInstance(tCtx)
driver.IsLocal = true
driver.Run(nodes, drautils.DriverResourcesNow(nodes, 8))
b := drautils.NewBuilderNow(ctx, f, driver)
driver.Run(tCtx, nodes, drautils.DriverResourcesNow(nodes, 8))
b := drautils.NewBuilderNow(tCtx, driver)
tCtx = ktesting.End(tCtx)
@ -305,7 +305,7 @@ var _ = ginkgo.Describe("DRA upgrade/downgrade", func() {
// The kubelet wipes all ResourceSlices on a restart because it doesn't know which drivers were running.
// Wait for the ResourceSlice controller in the driver to notice and recreate the ResourceSlices.
tCtx = ktesting.Begin(tCtx, "wait for ResourceSlices")
gomega.Eventually(ctx, driver.NewGetSlices()).WithTimeout(5 * time.Minute).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames))))
ktesting.Eventually(tCtx, driver.NewGetSlices()).WithTimeout(5 * time.Minute).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames))))
tCtx = ktesting.End(tCtx)
steps3 := make(map[string]step3Func, len(subTests))