DRA e2e: make driver deployment possible in Go unit tests
This leverages ktesting as a wrapper around Ginkgo and testing.T to make all of the helper code that is needed to deploy a DRA driver available to Go unit tests and thus to integration tests.

How to proceed with unifying helper code for integration and E2E testing is still open. This is just a minimal first step in that direction. Ideally, such code should live in separate packages where usage of Ginkgo, e2e/framework and gomega.Expect/Eventually/Consistently is forbidden.

While at it, the builder gets extended to make cleanup optional. This will be needed for upgrade/downgrade testing with sub-tests.
Parent: 65ef31973c
Commit: 7c7b1e1018
10 changed files with 612 additions and 613 deletions
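To illustrate the intended usage (this example is not part of the commit): a minimal sketch of how the refactored helpers could be driven from a plain Go test. The dra package name, the ktesting.Init entry point and the kubelet root directory value are assumptions for illustration; a real test additionally has to wire the TContext up with cluster access and a test namespace, which the sketch glosses over.

package dra_test

import (
	"testing"

	"k8s.io/dynamic-resource-allocation/resourceslice"
	"k8s.io/kubernetes/test/e2e/dra"
	"k8s.io/kubernetes/test/utils/ktesting"
)

func TestDriverDeployment(t *testing.T) {
	// Assumed entry point: returns a ktesting.TContext bound to t. For this
	// sketch it must also provide Client(), RESTConfig() and Namespace().
	tCtx := ktesting.Init(t)

	// Select nodes and deploy the test driver without any Ginkgo machinery.
	nodes := dra.NewNodesNow(tCtx, 1, 4)
	driver := dra.NewDriverInstance(tCtx)
	// "/var/lib/kubelet" is an assumed kubelet root dir; an empty
	// DriverResources map is used for brevity.
	driver.Run(tCtx, "/var/lib/kubelet", nodes, map[string]resourceslice.DriverResources{})

	// The builder creates the device class in setUp and registers cleanup via
	// tCtx.CleanupCtx. SkipCleanup suppresses the per-object cleanup that
	// Create would otherwise register, e.g. for upgrade/downgrade sub-tests.
	b := dra.NewBuilderNow(tCtx, driver)
	b.SkipCleanup = true
	pod := b.Pod()
	b.Create(tCtx, pod)
	b.DeletePodAndWaitForNotFound(tCtx, pod)
}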
File diff suppressed because it is too large
@@ -42,8 +42,10 @@ import (
draclient "k8s.io/dynamic-resource-allocation/client"
"k8s.io/dynamic-resource-allocation/resourceslice"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/e2e/dra/test-driver/app"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/utils/ktesting"
admissionapi "k8s.io/pod-security-admission/api"
"k8s.io/utils/ptr"
)

@@ -64,21 +66,22 @@ func (b *Builder) ExtendedResourceName(i int) string {
}
}

// Builder contains a running counter to make objects unique within thir
// Builder contains a running counter to make objects unique within their
// namespace.
type Builder struct {
f *framework.Framework
namespace string
driver *Driver
UseExtendedResourceName bool

podCounter int
claimCounter int
ClassParameters string // JSON
SkipCleanup bool
}

// ClassName returns the default device class name.
func (b *Builder) ClassName() string {
return b.f.UniqueName + b.driver.NameSuffix + "-class"
return b.namespace + b.driver.NameSuffix + "-class"
}

// SingletonIndex causes Builder.Class and ExtendedResourceName to create a

@@ -243,7 +246,7 @@ func (b *Builder) Pod() *v1.Pod {
//
// It is tempting to use `terminationGraceperiodSeconds: 0`, but that is a very bad
// idea because it removes the pod before the kubelet had a chance to react (https://github.com/kubernetes/kubernetes/issues/120671).
pod := e2epod.MakePod(b.f.Namespace.Name, nil, nil, admissionapi.LevelRestricted, "" /* no command = pause */)
pod := e2epod.MakePod(b.namespace, nil, nil, admissionapi.LevelRestricted, "" /* no command = pause */)
pod.Labels = make(map[string]string)
pod.Spec.RestartPolicy = v1.RestartPolicyNever
pod.GenerateName = ""

@@ -338,100 +341,108 @@ func (b *Builder) PodExternalMultiple() *v1.Pod {
}

// Create takes a bunch of objects and calls their Create function.
func (b *Builder) Create(ctx context.Context, objs ...klog.KMetadata) []klog.KMetadata {
func (b *Builder) Create(tCtx ktesting.TContext, objs ...klog.KMetadata) []klog.KMetadata {
tCtx.Helper()
cleanupCtx := tCtx.CleanupCtx
if b.SkipCleanup {
cleanupCtx = func(func(tCtx ktesting.TContext)) {}
}

var createdObjs []klog.KMetadata
for _, obj := range objs {
ginkgo.By(fmt.Sprintf("creating %T %s", obj, obj.GetName()))
tCtx.Logf("Creating %T %s", obj, obj.GetName())
var err error
var createdObj klog.KMetadata
switch obj := obj.(type) {
case *resourceapi.DeviceClass:
createdObj, err = b.ClientV1().DeviceClasses().Create(ctx, obj, metav1.CreateOptions{})
ginkgo.DeferCleanup(func(ctx context.Context) {
err := b.ClientV1().DeviceClasses().Delete(ctx, createdObj.GetName(), metav1.DeleteOptions{})
framework.ExpectNoError(err, "delete device class")
createdObj, err = b.ClientV1(tCtx).DeviceClasses().Create(tCtx, obj, metav1.CreateOptions{})
cleanupCtx(func(tCtx ktesting.TContext) {
err := b.ClientV1(tCtx).DeviceClasses().Delete(tCtx, createdObj.GetName(), metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete device class")
})
case *v1.Pod:
createdObj, err = b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().CoreV1().Pods(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *v1.ResourceQuota:
createdObj, err = b.f.ClientSet.CoreV1().ResourceQuotas(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().CoreV1().ResourceQuotas(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *v1.ConfigMap:
createdObj, err = b.f.ClientSet.CoreV1().ConfigMaps(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().CoreV1().ConfigMaps(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourceapi.ResourceClaim:
createdObj, err = b.ClientV1().ResourceClaims(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = b.ClientV1(tCtx).ResourceClaims(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourcev1beta1.ResourceClaim:
createdObj, err = b.f.ClientSet.ResourceV1beta1().ResourceClaims(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().ResourceV1beta1().ResourceClaims(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourcev1beta2.ResourceClaim:
createdObj, err = b.f.ClientSet.ResourceV1beta2().ResourceClaims(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().ResourceV1beta2().ResourceClaims(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourceapi.ResourceClaimTemplate:
createdObj, err = b.ClientV1().ResourceClaimTemplates(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = b.ClientV1(tCtx).ResourceClaimTemplates(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourcev1beta1.ResourceClaimTemplate:
createdObj, err = b.f.ClientSet.ResourceV1beta1().ResourceClaimTemplates(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().ResourceV1beta1().ResourceClaimTemplates(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourcev1beta2.ResourceClaimTemplate:
createdObj, err = b.f.ClientSet.ResourceV1beta2().ResourceClaimTemplates(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().ResourceV1beta2().ResourceClaimTemplates(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
case *resourceapi.ResourceSlice:
createdObj, err = b.ClientV1().ResourceSlices().Create(ctx, obj, metav1.CreateOptions{})
ginkgo.DeferCleanup(func(ctx context.Context) {
err := b.ClientV1().ResourceSlices().Delete(ctx, createdObj.GetName(), metav1.DeleteOptions{})
framework.ExpectNoError(err, "delete node resource slice")
createdObj, err = b.ClientV1(tCtx).ResourceSlices().Create(tCtx, obj, metav1.CreateOptions{})
cleanupCtx(func(tCtx ktesting.TContext) {
err := b.ClientV1(tCtx).ResourceSlices().Delete(tCtx, createdObj.GetName(), metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete node resource slice")
})
case *resourcealphaapi.DeviceTaintRule:
createdObj, err = b.f.ClientSet.ResourceV1alpha3().DeviceTaintRules().Create(ctx, obj, metav1.CreateOptions{})
ginkgo.DeferCleanup(func(ctx context.Context) {
err := b.f.ClientSet.ResourceV1alpha3().DeviceTaintRules().Delete(ctx, createdObj.GetName(), metav1.DeleteOptions{})
framework.ExpectNoError(err, "delete DeviceTaintRule")
createdObj, err = tCtx.Client().ResourceV1alpha3().DeviceTaintRules().Create(tCtx, obj, metav1.CreateOptions{})
cleanupCtx(func(tCtx ktesting.TContext) {
err := tCtx.Client().ResourceV1alpha3().DeviceTaintRules().Delete(tCtx, createdObj.GetName(), metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete DeviceTaintRule")
})
case *appsv1.DaemonSet:
createdObj, err = b.f.ClientSet.AppsV1().DaemonSets(b.f.Namespace.Name).Create(ctx, obj, metav1.CreateOptions{})
createdObj, err = tCtx.Client().AppsV1().DaemonSets(b.namespace).Create(tCtx, obj, metav1.CreateOptions{})
// Cleanup not really needed, but speeds up namespace shutdown.
ginkgo.DeferCleanup(func(ctx context.Context) {
err := b.f.ClientSet.AppsV1().DaemonSets(b.f.Namespace.Name).Delete(ctx, obj.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err, "delete daemonset")
cleanupCtx(func(tCtx ktesting.TContext) {
err := tCtx.Client().AppsV1().DaemonSets(b.namespace).Delete(tCtx, obj.Name, metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete daemonset")
})
default:
framework.Fail(fmt.Sprintf("internal error, unsupported type %T", obj), 1)
tCtx.Fatalf("internal error, unsupported type %T", obj)
}
framework.ExpectNoErrorWithOffset(1, err, "create %T", obj)
tCtx.ExpectNoError(err, "create %T", obj)
createdObjs = append(createdObjs, createdObj)
}
return createdObjs
}

func (b *Builder) DeletePodAndWaitForNotFound(ctx context.Context, pod *v1.Pod) {
err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})
framework.ExpectNoErrorWithOffset(1, err, "delete %T", pod)
err = e2epod.WaitForPodNotFoundInNamespace(ctx, b.f.ClientSet, pod.Name, pod.Namespace, b.f.Timeouts.PodDelete)
framework.ExpectNoErrorWithOffset(1, err, "terminate %T", pod)
func (b *Builder) DeletePodAndWaitForNotFound(tCtx ktesting.TContext, pod *v1.Pod) {
tCtx.Helper()
err := tCtx.Client().CoreV1().Pods(b.namespace).Delete(tCtx, pod.Name, metav1.DeleteOptions{})
tCtx.ExpectNoError(err, "delete %T", pod)
/* TODO: add timeouts to TContext? */
err = e2epod.WaitForPodNotFoundInNamespace(tCtx, tCtx.Client(), pod.Name, pod.Namespace, 5*time.Minute /* former b.f.Timeouts.PodDelete */)
tCtx.ExpectNoError(err, "terminate %T", pod)
}

// TestPod runs pod and checks if container logs contain expected environment variables
func (b *Builder) TestPod(ctx context.Context, f *framework.Framework, pod *v1.Pod, env ...string) {
ginkgo.GinkgoHelper()
func (b *Builder) TestPod(tCtx ktesting.TContext, pod *v1.Pod, env ...string) {
tCtx.Helper()

if !b.driver.WithKubelet {
// Less testing when we cannot rely on the kubelet to actually run the pod.
err := e2epod.WaitForPodScheduled(ctx, f.ClientSet, pod.Namespace, pod.Name)
framework.ExpectNoError(err, "schedule pod")
err := e2epod.WaitForPodScheduled(tCtx, tCtx.Client(), pod.Namespace, pod.Name)
tCtx.ExpectNoError(err, "schedule pod")
return
}

err := e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)
framework.ExpectNoError(err, "start pod")
err := e2epod.WaitForPodRunningInNamespace(tCtx, tCtx.Client(), pod)
tCtx.ExpectNoError(err, "start pod")

if len(env) == 0 {
_, env = b.ParametersEnv()
}
for _, container := range pod.Spec.Containers {
TestContainerEnv(ctx, f, pod, container.Name, false, env...)
TestContainerEnv(tCtx, pod, container.Name, false, env...)
}
}

// envLineRE matches env output with variables set by test/e2e/dra/test-driver.
var envLineRE = regexp.MustCompile(`^(?:admin|user|claim)_[a-zA-Z0-9_]*=.*$`)

func TestContainerEnv(ctx context.Context, f *framework.Framework, pod *v1.Pod, containerName string, fullMatch bool, env ...string) {
ginkgo.GinkgoHelper()
stdout, stderr, err := e2epod.ExecWithOptionsContext(ctx, f, e2epod.ExecOptions{
func TestContainerEnv(tCtx ktesting.TContext, pod *v1.Pod, containerName string, fullMatch bool, env ...string) {
tCtx.Helper()
stdout, stderr, err := e2epod.ExecWithOptionsTCtx(tCtx, e2epod.ExecOptions{
Command: []string{"env"},
Namespace: pod.Namespace,
PodName: pod.Name,

@@ -440,8 +451,8 @@ func TestContainerEnv(ctx context.Context, f *framework.Framework, pod *v1.Pod,
CaptureStderr: true,
Quiet: true,
})
framework.ExpectNoError(err, fmt.Sprintf("get env output for container %s", containerName))
gomega.Expect(stderr).To(gomega.BeEmpty(), fmt.Sprintf("env stderr for container %s", containerName))
tCtx.ExpectNoError(err, fmt.Sprintf("get env output for container %s", containerName))
tCtx.Expect(stderr).To(gomega.BeEmpty(), fmt.Sprintf("env stderr for container %s", containerName))
if fullMatch {
// Find all env variables set by the test driver.
var actualEnv, expectEnv []string

@@ -455,91 +466,96 @@ func TestContainerEnv(ctx context.Context, f *framework.Framework, pod *v1.Pod,
}
sort.Strings(actualEnv)
sort.Strings(expectEnv)
gomega.Expect(actualEnv).To(gomega.Equal(expectEnv), fmt.Sprintf("container %s env output:\n%s", containerName, stdout))
tCtx.Expect(actualEnv).To(gomega.Equal(expectEnv), fmt.Sprintf("container %s env output:\n%s", containerName, stdout))
} else {
for i := 0; i < len(env); i += 2 {
envStr := fmt.Sprintf("%s=%s\n", env[i], env[i+1])
gomega.Expect(stdout).To(gomega.ContainSubstring(envStr), fmt.Sprintf("container %s env variables", containerName))
tCtx.Expect(stdout).To(gomega.ContainSubstring(envStr), fmt.Sprintf("container %s env variables", containerName))
}
}
}

func NewBuilder(f *framework.Framework, driver *Driver) *Builder {
b := &Builder{f: f, driver: driver}
ginkgo.BeforeEach(b.setUp)
b := &Builder{driver: driver}
ginkgo.BeforeEach(func() {
b.setUp(f.TContext(context.Background()))
})
return b
}

func NewBuilderNow(ctx context.Context, f *framework.Framework, driver *Driver) *Builder {
b := &Builder{f: f, driver: driver}
b.setUp(ctx)
func NewBuilderNow(tCtx ktesting.TContext, driver *Driver) *Builder {
b := &Builder{driver: driver}
b.setUp(tCtx)
return b
}

func (b *Builder) setUp(ctx context.Context) {
func (b *Builder) setUp(tCtx ktesting.TContext) {
b.namespace = tCtx.Namespace()
b.podCounter = 0
b.claimCounter = 0
b.Create(ctx, b.Class(0))
ginkgo.DeferCleanup(b.tearDown)
b.Create(tCtx, b.Class(0))
tCtx.CleanupCtx(b.tearDown)
}

// ClientV1 returns a wrapper for client-go which provides the V1 API on top of whatever is enabled in the cluster.
func (b *Builder) ClientV1() cgoresource.ResourceV1Interface {
return draclient.New(b.f.ClientSet)
func (b *Builder) ClientV1(tCtx ktesting.TContext) cgoresource.ResourceV1Interface {
return draclient.New(tCtx.Client())
}

func (b *Builder) tearDown(ctx context.Context) {
func (b *Builder) tearDown(tCtx ktesting.TContext) {
client := b.ClientV1(tCtx)

// Before we allow the namespace and all objects in it do be deleted by
// the framework, we must ensure that test pods and the claims that
// they use are deleted. Otherwise the driver might get deleted first,
// in which case deleting the claims won't work anymore.
ginkgo.By("delete pods and claims")
pods, err := b.listTestPods(ctx)
framework.ExpectNoError(err, "list pods")
tCtx.Log("delete pods and claims")
pods, err := b.listTestPods(tCtx)
tCtx.ExpectNoError(err, "list pods")
for _, pod := range pods {
if pod.DeletionTimestamp != nil {
continue
}
ginkgo.By(fmt.Sprintf("deleting %T %s", &pod, klog.KObj(&pod)))
err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{})
tCtx.Logf("Deleting %T %s", &pod, klog.KObj(&pod))
err := tCtx.Client().CoreV1().Pods(b.namespace).Delete(tCtx, pod.Name, metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) {
framework.ExpectNoError(err, "delete pod")
tCtx.ExpectNoError(err, "delete pod")
}
}
gomega.Eventually(func() ([]v1.Pod, error) {
return b.listTestPods(ctx)
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) []v1.Pod {
pods, err := b.listTestPods(tCtx)
tCtx.ExpectNoError(err)
return pods
}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "remaining pods despite deletion")

claims, err := b.ClientV1().ResourceClaims(b.f.Namespace.Name).List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err, "get resource claims")
claims, err := b.ClientV1(tCtx).ResourceClaims(b.namespace).List(tCtx, metav1.ListOptions{})
tCtx.ExpectNoError(err, "get resource claims")
for _, claim := range claims.Items {
if claim.DeletionTimestamp != nil {
continue
}
ginkgo.By(fmt.Sprintf("deleting %T %s", &claim, klog.KObj(&claim)))
err := b.ClientV1().ResourceClaims(b.f.Namespace.Name).Delete(ctx, claim.Name, metav1.DeleteOptions{})
tCtx.Logf("Deleting %T %s", &claim, klog.KObj(&claim))
err := client.ResourceClaims(b.namespace).Delete(tCtx, claim.Name, metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) {
framework.ExpectNoError(err, "delete claim")
tCtx.ExpectNoError(err, "delete claim")
}
}

for host, plugin := range b.driver.Nodes {
ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host))
gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host)
tCtx.Logf("Waiting for resources on %s to be unprepared", host)
ktesting.Eventually(tCtx, func(ktesting.TContext) []app.ClaimID { return plugin.GetPreparedResources() }).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host)
}

ginkgo.By("waiting for claims to be deallocated and deleted")
gomega.Eventually(func() ([]resourceapi.ResourceClaim, error) {
claims, err := b.ClientV1().ResourceClaims(b.f.Namespace.Name).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
return claims.Items, nil
tCtx.Log("waiting for claims to be deallocated and deleted")
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) []resourceapi.ResourceClaim {
claims, err := client.ResourceClaims(tCtx.Namespace()).List(tCtx, metav1.ListOptions{})
tCtx.ExpectNoError(err)
return claims.Items
}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "claims in the namespaces")
}

func (b *Builder) listTestPods(ctx context.Context) ([]v1.Pod, error) {
pods, err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).List(ctx, metav1.ListOptions{})
func (b *Builder) listTestPods(tCtx ktesting.TContext) ([]v1.Pod, error) {
pods, err := tCtx.Client().CoreV1().Pods(b.namespace).List(tCtx, metav1.ListOptions{})
if err != nil {
return nil, err
}
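Side note on the ktesting.Eventually calls introduced in the tearDown hunks above: unlike gomega.Eventually, the polled callback receives a ktesting.TContext and reports errors through it instead of returning them alongside the value. A minimal sketch of that pattern, with a hypothetical listPods helper used only for illustration:

import (
	"time"

	"github.com/onsi/gomega"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/test/utils/ktesting"
)

// listPods is a hypothetical helper; any failure is reported via tCtx.
func listPods(tCtx ktesting.TContext, namespace string) []v1.Pod {
	pods, err := tCtx.Client().CoreV1().Pods(namespace).List(tCtx, metav1.ListOptions{})
	tCtx.ExpectNoError(err, "list pods")
	return pods.Items
}

// waitForNoPods polls until the namespace has no pods left or the timeout expires.
func waitForNoPods(tCtx ktesting.TContext, namespace string) {
	ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) []v1.Pod {
		return listPods(tCtx, namespace)
	}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "pods left in namespace")
}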
@@ -75,6 +75,7 @@ import (
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
"k8s.io/kubernetes/test/e2e/storage/utils"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"

@@ -102,30 +103,31 @@ type Nodes struct {
func NewNodes(f *framework.Framework, minNodes, maxNodes int) *Nodes {
nodes := &Nodes{}
ginkgo.BeforeEach(func(ctx context.Context) {
nodes.init(ctx, f, minNodes, maxNodes)
nodes.init(f.TContext(ctx), minNodes, maxNodes)
})
return nodes
}

// NewNodesNow is a variant of NewNodes which can be used inside a ginkgo.It.
func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) *Nodes {
// NewNodesNow is a variant of NewNodes which can be used inside a ginkgo.It
// or a Go unit test.
func NewNodesNow(tCtx ktesting.TContext, minNodes, maxNodes int) *Nodes {
nodes := &Nodes{}
nodes.init(ctx, f, minNodes, maxNodes)
nodes.init(tCtx, minNodes, maxNodes)
return nodes
}

func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) {
nodes.tempDir = ginkgo.GinkgoT().TempDir()
func (nodes *Nodes) init(tCtx ktesting.TContext, minNodes, maxNodes int) {
nodes.tempDir = tCtx.TempDir()

ginkgo.By("selecting nodes")
tCtx.Log("selecting nodes")
// The kubelet plugin is harder. We deploy the builtin manifest
// after patching in the driver name and all nodes on which we
// want the plugin to run.
//
// Only a subset of the nodes are picked to avoid causing
// unnecessary load on a big cluster.
nodeList, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
framework.ExpectNoError(err, "get nodes")
nodeList, err := e2enode.GetBoundedReadySchedulableNodes(tCtx, tCtx.Client(), maxNodes)
tCtx.ExpectNoError(err, "get nodes")
numNodes := int32(len(nodeList.Items))
if int(numNodes) < minNodes {
e2eskipper.Skipf("%d ready nodes required, only have %d", minNodes+nodes.NumReservedNodes, numNodes)

@@ -139,17 +141,18 @@ func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes,
nodes.NodeNames = append(nodes.NodeNames, node.Name)
}
sort.Strings(nodes.NodeNames)
framework.Logf("testing on nodes %v", nodes.NodeNames)
tCtx.Logf("testing on nodes %v", nodes.NodeNames)

// Watch claims in the namespace. This is useful for monitoring a test
// and enables additional sanity checks.
resourceClaimLogger := klog.LoggerWithName(klog.FromContext(ctx), "ResourceClaimListWatch")
resourceClaimLogger := klog.LoggerWithName(klog.FromContext(tCtx), "ResourceClaimListWatch")
var resourceClaimWatchCounter atomic.Int32
resourceClient := draclient.New(f.ClientSet)
resourceClient := draclient.New(tCtx.Client())
claimInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
slices, err := resourceClient.ResourceClaims(f.Namespace.Name).List(ctx, options)
tCtx := tCtx.WithContext(ctx)
slices, err := resourceClient.ResourceClaims(tCtx.Namespace()).List(tCtx, options)
if err == nil {
resourceClaimLogger.Info("Listed ResourceClaims", "resourceAPI", resourceClient.CurrentAPI(), "numClaims", len(slices.Items), "listMeta", slices.ListMeta)
} else {

@@ -158,7 +161,8 @@ func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes,
return slices, err
},
WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
w, err := resourceClient.ResourceClaims(f.Namespace.Name).Watch(ctx, options)
tCtx := tCtx.WithContext(ctx)
w, err := resourceClient.ResourceClaims(tCtx.Namespace()).Watch(tCtx, options)
if err == nil {
resourceClaimLogger.Info("Started watching ResourceClaims", "resourceAPI", resourceClient.CurrentAPI())
wrapper := newWatchWrapper(klog.LoggerWithName(resourceClaimLogger, fmt.Sprintf("%d", resourceClaimWatchCounter.Load())), w)

@@ -179,26 +183,23 @@ func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes,
)
cancelCtx, cancel := context.WithCancelCause(context.Background())
var wg sync.WaitGroup
ginkgo.DeferCleanup(func() {
tCtx.Cleanup(func() {
cancel(errors.New("test has completed"))
wg.Wait()
})
_, err = claimInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
claim := obj.(*resourceapi.ResourceClaim)
resourceClaimLogger.Info("New claim", "claim", format.Object(claim, 0))
validateClaim(claim)
validateClaim(tCtx, claim)
},
UpdateFunc: func(oldObj, newObj any) {
defer ginkgo.GinkgoRecover()
oldClaim := oldObj.(*resourceapi.ResourceClaim)
newClaim := newObj.(*resourceapi.ResourceClaim)
resourceClaimLogger.Info("Updated claim", "newClaim", format.Object(newClaim, 0), "diff", cmp.Diff(oldClaim, newClaim))
validateClaim(newClaim)
validateClaim(tCtx, newClaim)
},
DeleteFunc: func(obj any) {
defer ginkgo.GinkgoRecover()
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}

@@ -206,11 +207,11 @@ func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes,
resourceClaimLogger.Info("Deleted claim", "claim", format.Object(claim, 0))
},
})
framework.ExpectNoError(err, "AddEventHandler")
tCtx.ExpectNoError(err, "AddEventHandler")
wg.Add(1)
go func() {
defer wg.Done()
claimInformer.Run(cancelCtx.Done())
claimInformer.RunWithContext(cancelCtx)
}()
}

@@ -251,13 +252,13 @@ func (w *watchWrapper) ResultChan() <-chan watch.Event {
return w.resultChan
}

func validateClaim(claim *resourceapi.ResourceClaim) {
func validateClaim(tCtx ktesting.TContext, claim *resourceapi.ResourceClaim) {
// The apiserver doesn't enforce that a claim always has a finalizer
// while being allocated. This is a convention that whoever allocates a
// claim has to follow to prevent using a claim that is at risk of
// being deleted.
if claim.Status.Allocation != nil && len(claim.Finalizers) == 0 {
framework.Failf("Invalid claim: allocated without any finalizer:\n%s", format.Object(claim, 1))
tCtx.Errorf("Invalid claim: allocated without any finalizer:\n%s", format.Object(claim, 1))
}
}

@@ -282,23 +283,24 @@ type driverResourcesMutatorFunc func(map[string]resourceslice.DriverResources)
//
// Call this outside of ginkgo.It, then use the instance inside ginkgo.It.
func NewDriver(f *framework.Framework, nodes *Nodes, driverResourcesGenerator driverResourcesGenFunc, driverResourcesMutators ...driverResourcesMutatorFunc) *Driver {
d := NewDriverInstance(f)
d := NewDriverInstance(nil)

ginkgo.BeforeEach(func() {
tCtx := f.TContext(context.Background())
d.initName(tCtx)
driverResources := driverResourcesGenerator(nodes)
for _, mutator := range driverResourcesMutators {
mutator(driverResources)
}
d.Run(nodes, driverResources)
d.Run(tCtx, framework.TestContext.KubeletRootDir, nodes, driverResources)
})
return d
}

// NewDriverInstance is a variant of NewDriver where the driver is inactive and must
// be started explicitly with Run. May be used inside ginkgo.It.
func NewDriverInstance(f *framework.Framework) *Driver {
// be started explicitly with Run. May be used inside ginkgo.It or a Go unit test.
func NewDriverInstance(tCtx ktesting.TContext) *Driver {
d := &Driver{
f: f,
fail: map[MethodInstance]bool{},
callCounts: map[MethodInstance]int64{},
// By default, test with all gRPC APIs.

@@ -310,24 +312,30 @@ func NewDriverInstance(f *framework.Framework) *Driver {
WithKubelet: true,
ExpectResourceSliceRemoval: true,
}
d.initName()
if tCtx != nil {
d.initName(tCtx)
}
return d
}

// ClientV1 returns a wrapper for client-go which provides the V1 API on top of whatever is enabled in the cluster.
func (d *Driver) ClientV1() cgoresource.ResourceV1Interface {
return draclient.New(d.f.ClientSet)
func (d *Driver) ClientV1(tCtx ktesting.TContext) cgoresource.ResourceV1Interface {
return draclient.New(tCtx.Client())
}

func (d *Driver) Run(nodes *Nodes, driverResources map[string]resourceslice.DriverResources) {
d.SetUp(nodes, driverResources)
ginkgo.DeferCleanup(d.TearDown)
func (d *Driver) Run(tCtx ktesting.TContext, kubeletRootDir string, nodes *Nodes, driverResources map[string]resourceslice.DriverResources) {
d.SetUp(tCtx, kubeletRootDir, nodes, driverResources)
tCtx.CleanupCtx(d.TearDown)
}

// NewGetSlices generates a function for gomega.Eventually/Consistently which
// NewGetSlices generates a function for ktesting.Eventually/Consistently which
// returns the ResourceSliceList.
func (d *Driver) NewGetSlices() framework.GetFunc[*resourceapi.ResourceSliceList] {
return framework.ListObjects(d.ClientV1().ResourceSlices().List, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
func (d *Driver) NewGetSlices() func(tCtx ktesting.TContext) *resourceapi.ResourceSliceList {
return func(tCtx ktesting.TContext) *resourceapi.ResourceSliceList {
slices, err := framework.ListObjects(d.ClientV1(tCtx).ResourceSlices().List, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})(tCtx)
tCtx.ExpectNoError(err, "list ResourceSlices")
return slices
}
}

type MethodInstance struct {

@@ -336,9 +344,7 @@ type MethodInstance struct {
}

type Driver struct {
f *framework.Framework
ctx context.Context
cleanup []func(context.Context) // executed first-in-first-out
cleanup []func(ktesting.TContext) // executed first-in-first-out
wg sync.WaitGroup
serviceAccountName string
@@ -388,39 +394,37 @@ type KubeletPlugin struct {
ClientSet kubernetes.Interface
}

func (d *Driver) initName() {
d.Name = d.f.UniqueName + d.NameSuffix + ".k8s.io"
func (d *Driver) initName(tCtx ktesting.TContext) {
d.Name = tCtx.Namespace() + d.NameSuffix + ".k8s.io"
}

func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.DriverResources) {
d.initName()
ginkgo.By(fmt.Sprintf("deploying driver %s on nodes %v", d.Name, nodes.NodeNames))
func (d *Driver) SetUp(tCtx ktesting.TContext, kubeletRootDir string, nodes *Nodes, driverResources map[string]resourceslice.DriverResources) {
tCtx.Logf("deploying driver %s on nodes %v", d.Name, nodes.NodeNames)
d.Nodes = make(map[string]KubeletPlugin)

ctx, cancel := context.WithCancel(context.Background())
logger := klog.FromContext(ctx)
tCtx = tCtx.WithCancel()
logger := klog.FromContext(tCtx)
logger = klog.LoggerWithValues(logger, "driverName", d.Name)
if d.InstanceSuffix != "" {
instance, _ := strings.CutPrefix(d.InstanceSuffix, "-")
logger = klog.LoggerWithValues(logger, "instance", instance)
}
ctx = klog.NewContext(ctx, logger)
d.ctx = ctx
d.cleanup = append(d.cleanup, func(context.Context) { cancel() })
tCtx = tCtx.WithLogger(logger)
d.cleanup = append(d.cleanup, func(ktesting.TContext) { tCtx.Cancel("cleaning up test") })

// After shutdown, check that all ResourceSlices were removed, either by the kubelet
// or our own test code. This runs last because it gets registered first.
if d.ExpectResourceSliceRemoval {
ginkgo.DeferCleanup(d.IsGone)
tCtx.CleanupCtx(d.IsGone)
}

driverResource, useMultiHostDriverResources := driverResources[multiHostDriverResources]
if useMultiHostDriverResources || !d.WithKubelet {
// We have to remove ResourceSlices ourselves.
// Otherwise the kubelet does it after unregistering the driver.
ginkgo.DeferCleanup(func(ctx context.Context) {
err := d.f.ClientSet.ResourceV1().ResourceSlices().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
framework.ExpectNoError(err, "delete ResourceSlices of the driver")
tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
err := tCtx.Client().ResourceV1().ResourceSlices().DeleteCollection(tCtx, metav1.DeleteOptions{}, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
tCtx.ExpectNoError(err, "delete ResourceSlices of the driver")
})
}

@@ -445,8 +449,8 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
Devices: slice.Devices,
},
}
_, err := d.f.ClientSet.ResourceV1().ResourceSlices().Create(ctx, resourceSlice, metav1.CreateOptions{})
framework.ExpectNoError(err)
_, err := tCtx.Client().ResourceV1().ResourceSlices().Create(tCtx, resourceSlice, metav1.CreateOptions{})
tCtx.ExpectNoError(err)
}
}
}

@@ -460,9 +464,9 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
// Create service account and corresponding RBAC rules.
d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + d.InstanceSuffix + "-service-account"
content := example.PluginPermissions
content = strings.ReplaceAll(content, "dra-kubelet-plugin-namespace", d.f.Namespace.Name)
content = strings.ReplaceAll(content, "dra-kubelet-plugin-namespace", tCtx.Namespace())
content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name+d.InstanceSuffix)
d.createFromYAML(ctx, []byte(content), d.f.Namespace.Name)
d.createFromYAML(tCtx, []byte(content), tCtx.Namespace())

// Using a ReplicaSet instead of a DaemonSet has the advantage that we can control
// the lifecycle explicitly, in particular run two pods per node long enough to

@@ -470,10 +474,10 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
instanceKey := "app.kubernetes.io/instance"
rsName := ""
numNodes := int32(len(nodes.NodeNames))
pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name)
registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
pluginDataDirectoryPath := path.Join(kubeletRootDir, "plugins", d.Name)
registrarDirectoryPath := path.Join(kubeletRootDir, "plugins_registry")
instanceName := d.Name + d.InstanceSuffix
err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
err := utils.CreateFromManifestsTCtx(tCtx, func(item interface{}) error {
switch item := item.(type) {
case *appsv1.ReplicaSet:
item.Name += d.NameSuffix + d.InstanceSuffix

@@ -514,21 +518,21 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
}
return nil
}, manifests...)
framework.ExpectNoError(err, "deploy kubelet plugin replicaset")
tCtx.ExpectNoError(err, "deploy kubelet plugin replicaset")

rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{})
framework.ExpectNoError(err, "get replicaset")
rs, err := tCtx.Client().AppsV1().ReplicaSets(tCtx.Namespace()).Get(tCtx, rsName, metav1.GetOptions{})
tCtx.ExpectNoError(err, "get replicaset")

// Wait for all pods to be running.
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, numNodes); err != nil {
framework.ExpectNoError(err, "all kubelet plugin proxies running")
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(tCtx, tCtx.Client(), rs, numNodes); err != nil {
tCtx.ExpectNoError(err, "all kubelet plugin proxies running")
}
requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{instanceName})
framework.ExpectNoError(err, "create label selector requirement")
tCtx.ExpectNoError(err, "create label selector requirement")
selector := labels.NewSelector().Add(*requirement)
pods, err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
framework.ExpectNoError(err, "list proxy pods")
gomega.Expect(numNodes).To(gomega.Equal(int32(len(pods.Items))), "number of proxy pods")
pods, err := tCtx.Client().CoreV1().Pods(tCtx.Namespace()).List(tCtx, metav1.ListOptions{LabelSelector: selector.String()})
tCtx.ExpectNoError(err, "list proxy pods")
tCtx.Expect(numNodes).To(gomega.Equal(int32(len(pods.Items))), "number of proxy pods")
sort.Slice(pods.Items, func(i, j int) bool {
return pods.Items[i].Spec.NodeName < pods.Items[j].Spec.NodeName
})

@@ -550,13 +554,13 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
// https://github.com/kubernetes/kubernetes/pull/124711).
//
// Here we merely use impersonation, which is faster.
driverClient := d.ImpersonateKubeletPlugin(&pod)
driverClient := d.ImpersonateKubeletPlugin(tCtx, &pod)

logger := klog.LoggerWithValues(klog.LoggerWithName(logger, "kubelet-plugin"), "node", pod.Spec.NodeName, "pod", klog.KObj(&pod))
loggerCtx := klog.NewContext(ctx, logger)
loggerCtx := klog.NewContext(tCtx, logger)
fileOps := app.FileOperations{
Create: func(name string, content []byte) error {
klog.Background().Info("creating CDI file", "node", nodename, "filename", name, "content", string(content))
logger.Info("creating CDI file", "node", nodename, "filename", name, "content", string(content))
if d.IsLocal {
// Name starts with /cdi, which is how it is mapped in the container.
// Here we need it under /var/run.

@@ -570,19 +574,21 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
}
return nil
}
return d.createFile(&pod, name, content)
return d.createFile(tCtx, &pod, name, content)
},
Remove: func(name string) error {
klog.Background().Info("deleting CDI file", "node", nodename, "filename", name)
logger.Info("deleting CDI file", "node", nodename, "filename", name)
if d.IsLocal {
name = path.Join("/var/run", name)
return os.Remove(name)
}
return d.removeFile(&pod, name)
return d.removeFile(tCtx, &pod, name)
},
HandleError: func(ctx context.Context, err error, msg string) {
// Record a failure, but don't kill the background goroutine.
// TODO: add to TContext or do it in Error/Assert/etc?
defer ginkgo.GinkgoRecover()

// During tests when canceling the context it is possible to get all kinds of
// follow-up errors for that, like:
// processing ResourceSlice objects: retrieve node "127.0.0.1": client rate limiter Wait returned an error: context canceled

@@ -592,7 +598,7 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
// treat errors as failures which definitely shouldn't occur:
var droppedFields *resourceslice.DroppedFieldsError
if errors.As(err, &droppedFields) {
framework.Failf("driver %s: %v", d.Name, err)
tCtx.Errorf("driver %s: %v", d.Name, err)
}
},
}

@@ -628,25 +634,25 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
kubeletplugin.FlockDirectoryPath(nodes.tempDir),

kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath),
kubeletplugin.PluginListener(d.listen(&pod, &listenerPort)),
kubeletplugin.PluginListener(d.listen(tCtx, &pod, &listenerPort)),

kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath),
kubeletplugin.RegistrarListener(d.listen(&pod, &listenerPort)),
kubeletplugin.RegistrarListener(d.listen(tCtx, &pod, &listenerPort)),
)
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
d.cleanup = append(d.cleanup, func(ctx context.Context) {
tCtx.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
d.cleanup = append(d.cleanup, func(tCtx ktesting.TContext) {
// Depends on cancel being called first.
plugin.Stop()

// Also explicitly stop all pods.
ginkgo.By("scaling down driver proxy pods for " + d.Name)
rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{})
framework.ExpectNoError(err, "get ReplicaSet for driver "+d.Name)
tCtx.Log("scaling down driver proxy pods for", d.Name)
rs, err := tCtx.Client().AppsV1().ReplicaSets(tCtx.Namespace()).Get(tCtx, rsName, metav1.GetOptions{})
tCtx.ExpectNoError(err, "get ReplicaSet for driver "+d.Name)
rs.Spec.Replicas = ptr.To(int32(0))
rs, err = d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Update(ctx, rs, metav1.UpdateOptions{})
framework.ExpectNoError(err, "scale down ReplicaSet for driver "+d.Name)
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, 0); err != nil {
framework.ExpectNoError(err, "all kubelet plugin proxies stopped")
rs, err = tCtx.Client().AppsV1().ReplicaSets(tCtx.Namespace()).Update(tCtx, rs, metav1.UpdateOptions{})
tCtx.ExpectNoError(err, "scale down ReplicaSet for driver "+d.Name)
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(tCtx, tCtx.Client(), rs, 0); err != nil {
tCtx.ExpectNoError(err, "all kubelet plugin proxies stopped")
}
})
d.Nodes[nodename] = KubeletPlugin{ExamplePlugin: plugin, ClientSet: driverClient}
@@ -657,8 +663,8 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
}

// Wait for registration.
ginkgo.By("wait for plugin registration")
gomega.Eventually(func() map[string][]app.GRPCCall {
tCtx.Log("wait for plugin registration")
ktesting.Eventually(tCtx, func(tCtx ktesting.TContext) map[string][]app.GRPCCall {
notRegistered := make(map[string][]app.GRPCCall)
for nodename, plugin := range d.Nodes {
calls := plugin.GetGRPCCalls()

@@ -670,8 +676,8 @@ func (d *Driver) SetUp(nodes *Nodes, driverResources map[string]resourceslice.Dr
}).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "hosts where the plugin has not been registered yet")
}

func (d *Driver) ImpersonateKubeletPlugin(pod *v1.Pod) kubernetes.Interface {
ginkgo.GinkgoHelper()
func (d *Driver) ImpersonateKubeletPlugin(tCtx ktesting.TContext, pod *v1.Pod) kubernetes.Interface {
tCtx.Helper()
driverUserInfo := (&serviceaccount.ServiceAccountInfo{
Name: d.serviceAccountName,
Namespace: pod.Namespace,

@@ -679,36 +685,36 @@ func (d *Driver) ImpersonateKubeletPlugin(pod *v1.Pod) kubernetes.Interface {
PodName: pod.Name,
PodUID: string(pod.UID),
}).UserInfo()
driverClientConfig := d.f.ClientConfig()
driverClientConfig := tCtx.RESTConfig()
driverClientConfig.Impersonate = rest.ImpersonationConfig{
UserName: driverUserInfo.GetName(),
Groups: driverUserInfo.GetGroups(),
Extra: driverUserInfo.GetExtra(),
}
driverClient, err := kubernetes.NewForConfig(driverClientConfig)
framework.ExpectNoError(err, "create client for driver")
tCtx.ExpectNoError(err, "create client for driver")
return driverClient
}

func (d *Driver) createFile(pod *v1.Pod, name string, content []byte) error {
func (d *Driver) createFile(tCtx ktesting.TContext, pod *v1.Pod, name string, content []byte) error {
buffer := bytes.NewBuffer(content)
// Writing the content can be slow. Better create a temporary file and
// move it to the final destination once it is complete.
tmpName := name + ".tmp"
if err := d.podIO(pod).CreateFile(tmpName, buffer); err != nil {
_ = d.podIO(pod).RemoveAll(tmpName)
if err := d.podIO(tCtx, pod).CreateFile(tmpName, buffer); err != nil {
_ = d.podIO(tCtx, pod).RemoveAll(tmpName)
return err
}
return d.podIO(pod).Rename(tmpName, name)
return d.podIO(tCtx, pod).Rename(tmpName, name)
}

func (d *Driver) removeFile(pod *v1.Pod, name string) error {
return d.podIO(pod).RemoveAll(name)
func (d *Driver) removeFile(tCtx ktesting.TContext, pod *v1.Pod, name string) error {
return d.podIO(tCtx, pod).RemoveAll(name)
}

func (d *Driver) createFromYAML(ctx context.Context, content []byte, namespace string) {
func (d *Driver) createFromYAML(tCtx ktesting.TContext, content []byte, namespace string) {
// Not caching the discovery result isn't very efficient, but good enough.
discoveryCache := memory.NewMemCacheClient(d.f.ClientSet.Discovery())
discoveryCache := memory.NewMemCacheClient(tCtx.Client().Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryCache)

for _, content := range bytes.Split(content, []byte("---\n")) {

@@ -717,16 +723,16 @@ func (d *Driver) createFromYAML(ctx context.Context, content []byte, namespace s
}

var obj *unstructured.Unstructured
framework.ExpectNoError(yaml.UnmarshalStrict(content, &obj), fmt.Sprintf("Full YAML:\n%s\n", string(content)))
tCtx.ExpectNoError(yaml.UnmarshalStrict(content, &obj), fmt.Sprintf("Full YAML:\n%s\n", string(content)))

gv, err := schema.ParseGroupVersion(obj.GetAPIVersion())
framework.ExpectNoError(err, fmt.Sprintf("extract group+version from object %q", klog.KObj(obj)))
tCtx.ExpectNoError(err, fmt.Sprintf("extract group+version from object %q", klog.KObj(obj)))
gk := schema.GroupKind{Group: gv.Group, Kind: obj.GetKind()}

mapping, err := restMapper.RESTMapping(gk, gv.Version)
framework.ExpectNoError(err, fmt.Sprintf("map %q to resource", gk))
tCtx.ExpectNoError(err, fmt.Sprintf("map %q to resource", gk))

resourceClient := d.f.DynamicClient.Resource(mapping.Resource)
resourceClient := tCtx.Dynamic().Resource(mapping.Resource)
options := metav1.CreateOptions{
// If the YAML input is invalid, then we want the
// apiserver to tell us via an error. This can

@@ -736,31 +742,31 @@ func (d *Driver) createFromYAML(ctx context.Context, content []byte, namespace s
}
switch mapping.Scope.Name() {
case meta.RESTScopeNameRoot:
_, err = resourceClient.Create(ctx, obj, options)
_, err = resourceClient.Create(tCtx, obj, options)
case meta.RESTScopeNameNamespace:
if namespace == "" {
framework.Failf("need namespace for object type %s", gk)
tCtx.Fatalf("need namespace for object type %s", gk)
}
_, err = resourceClient.Namespace(namespace).Create(ctx, obj, options)
_, err = resourceClient.Namespace(namespace).Create(tCtx, obj, options)
}
framework.ExpectNoError(err, "create object")
ginkgo.DeferCleanup(func(ctx context.Context) {
tCtx.ExpectNoError(err, "create object")
tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
del := resourceClient.Delete
if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
del = resourceClient.Namespace(namespace).Delete
}
err := del(ctx, obj.GetName(), metav1.DeleteOptions{})
err := del(tCtx, obj.GetName(), metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) {
framework.ExpectNoError(err, fmt.Sprintf("deleting %s.%s %s", obj.GetKind(), obj.GetAPIVersion(), klog.KObj(obj)))
tCtx.ExpectNoError(err, fmt.Sprintf("deleting %s.%s %s", obj.GetKind(), obj.GetAPIVersion(), klog.KObj(obj)))
}
})
}
}

func (d *Driver) podIO(pod *v1.Pod) proxy.PodDirIO {
logger := klog.Background()
func (d *Driver) podIO(tCtx ktesting.TContext, pod *v1.Pod) proxy.PodDirIO {
logger := tCtx.Logger()
return proxy.PodDirIO{
F: d.f,
TCtx: tCtx,
Namespace: pod.Namespace,
PodName: pod.Name,
ContainerName: pod.Spec.Containers[0].Name,

@@ -775,7 +781,7 @@ var errListenerDone = errors.New("listener is shutting down")
// listen returns the function which the kubeletplugin helper needs to open a listening socket.
// For that it spins up hostpathplugin in the pod for the desired node
// and connects to hostpathplugin via port forwarding.
func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endpoint string) (net.Listener, error) {
func (d *Driver) listen(tCtx ktesting.TContext, pod *v1.Pod, port *int32) func(ctx context.Context, endpoint string) (net.Listener, error) {
return func(ctx context.Context, endpoint string) (l net.Listener, e error) {
// No need create sockets, the kubelet is not expected to use them.
if !d.WithKubelet {

@@ -801,9 +807,9 @@ func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endp
ctx = klog.NewContext(ctx, logger)

// Start hostpathplugin in proxy mode and keep it running until the listener gets closed.
req := d.f.ClientSet.CoreV1().RESTClient().Post().
req := tCtx.Client().CoreV1().RESTClient().Post().
Resource("pods").
Namespace(d.f.Namespace.Name).
Namespace(tCtx.Namespace()).
Name(pod.Name).
SubResource("exec").
VersionedParams(&v1.PodExecOptions{

@@ -838,7 +844,7 @@ func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endp
runHostpathPlugin := func(ctx context.Context) (bool, error) {
// errors.Is(err, listenerDoneErr) would be nicer, but we don't get
// that error from remotecommand. Instead forgo logging when we already shut down.
if err := execute(ctx, req.URL(), d.f.ClientConfig(), 5); err != nil && ctx.Err() == nil {
if err := execute(ctx, req.URL(), tCtx.RESTConfig(), 5); err != nil && ctx.Err() == nil {
klog.FromContext(ctx).V(5).Info("execution failed, will retry", "err", err)
}
// There is no reason to stop except for context cancellation =>

@@ -848,9 +854,9 @@ func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endp
_ = delayFn.Until(cmdCtx, true /* immediate */, true /* sliding */, runHostpathPlugin)

// Killing hostpathplugin does not remove the socket. Need to do that manually.
req := d.f.ClientSet.CoreV1().RESTClient().Post().
req := tCtx.Client().CoreV1().RESTClient().Post().
Resource("pods").
Namespace(d.f.Namespace.Name).
Namespace(tCtx.Namespace()).
Name(pod.Name).
SubResource("exec").
VersionedParams(&v1.PodExecOptions{

@@ -865,7 +871,7 @@ func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endp
}, scheme.ParameterCodec)
cleanupLogger := klog.LoggerWithName(logger, "cleanup")
cleanupCtx := klog.NewContext(ctx, cleanupLogger)
if err := execute(cleanupCtx, req.URL(), d.f.ClientConfig(), 0); err != nil {
if err := execute(cleanupCtx, req.URL(), tCtx.RESTConfig(), 0); err != nil {
cleanupLogger.Error(err, "Socket removal failed")
}
}()

@@ -881,12 +887,12 @@ func (d *Driver) listen(pod *v1.Pod, port *int32) func(ctx context.Context, endp
}

addr := proxy.Addr{
Namespace: d.f.Namespace.Name,
Namespace: tCtx.Namespace(),
PodName: pod.Name,
ContainerName: pod.Spec.Containers[0].Name,
Port: int(port),
}
listener, err := proxy.Listen(ctx, d.f.ClientSet, d.f.ClientConfig(), addr)
listener, err := proxy.Listen(ctx, tCtx.Client(), tCtx.RESTConfig(), addr)
if err != nil {
return nil, fmt.Errorf("listen for connections from %+v: %w", addr, err)
}

@@ -973,9 +979,9 @@ func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter {
return writer
}

func (d *Driver) TearDown(ctx context.Context) {
func (d *Driver) TearDown(tCtx ktesting.TContext) {
for _, c := range d.cleanup {
c(ctx)
c(tCtx)
}
d.cleanup = nil
d.wg.Wait()

@@ -987,9 +993,9 @@ func (d *Driver) TearDown(ctx context.Context) {
// because of the delay in the kubelet.
//
// Only use this in tests where kubelet support for DRA is guaranteed.
func (d *Driver) IsGone(ctx context.Context) {
ginkgo.By(fmt.Sprintf("Waiting for ResourceSlices of driver %s to be removed...", d.Name))
gomega.Eventually(ctx, d.NewGetSlices()).WithTimeout(2 * time.Minute).Should(gomega.HaveField("Items", gomega.BeEmpty()))
func (d *Driver) IsGone(tCtx ktesting.TContext) {
tCtx.Logf("Waiting for ResourceSlices of driver %s to be removed...", d.Name)
ktesting.Eventually(tCtx, d.NewGetSlices()).WithTimeout(2 * time.Minute).Should(gomega.HaveField("Items", gomega.BeEmpty()))
}

func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
@@ -29,10 +29,10 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/httpstream"
 	"k8s.io/client-go/kubernetes/scheme"
-	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/remotecommand"
 	clientexec "k8s.io/client-go/util/exec"
 	"k8s.io/kubernetes/test/e2e/framework"
+	"k8s.io/kubernetes/test/utils/ktesting"
 
 	"github.com/onsi/gomega"
 )

@@ -59,14 +59,18 @@ func ExecWithOptions(f *framework.Framework, options ExecOptions) (string, strin
 }
 
 func ExecWithOptionsContext(ctx context.Context, f *framework.Framework, options ExecOptions) (string, string, error) {
+	return ExecWithOptionsTCtx(f.TContext(ctx), options)
+}
+
+func ExecWithOptionsTCtx(tCtx ktesting.TContext, options ExecOptions) (string, string, error) {
 	if !options.Quiet {
-		framework.Logf("ExecWithOptions %+v", options)
+		tCtx.Logf("ExecWithOptions %+v", options)
 	}
 
 	const tty = false
 
-	framework.Logf("ExecWithOptions: Clientset creation")
-	req := f.ClientSet.CoreV1().RESTClient().Post().
+	tCtx.Logf("ExecWithOptions: Clientset creation")
+	req := tCtx.Client().CoreV1().RESTClient().Post().
 		Resource("pods").
 		Name(options.PodName).
 		Namespace(options.Namespace).

@@ -81,8 +85,8 @@ func ExecWithOptionsContext(ctx context.Context, f *framework.Framework, options
 	}, scheme.ParameterCodec)
 
 	var stdout, stderr bytes.Buffer
-	framework.Logf("ExecWithOptions: execute(%s)", req.URL())
-	err := execute(ctx, req.URL(), f.ClientConfig(), options.Stdin, &stdout, &stderr, tty)
+	tCtx.Logf("ExecWithOptions: execute(%s)", req.URL())
+	err := execute(tCtx, req.URL(), options.Stdin, &stdout, &stderr, tty)
 
 	if options.PreserveWhitespace {
 		return stdout.String(), stderr.String(), err

@@ -182,7 +186,8 @@ func VerifyExecInPodFail(ctx context.Context, f *framework.Framework, pod *v1.Po
 		return fmt.Errorf("%q should fail with exit code %d, but exit without error", shExec, exitCode)
 }
 
-func execute(ctx context.Context, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
+func execute(tCtx ktesting.TContext, url *url.URL, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
+	config := tCtx.RESTConfig()
 	// WebSocketExecutor executor is default
 	// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
 	websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())

@@ -205,7 +210,7 @@ func execute(ctx context.Context, url *url.URL, config *restclient.Config, stdin
 		return err
 	}
 
-	return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
+	return exec.StreamWithContext(tCtx, remotecommand.StreamOptions{
 		Stdin:  stdin,
 		Stdout: stdout,
 		Stderr: stderr,
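For illustration, a caller that already holds a TContext could use the new variant roughly as below; the pod and container names are placeholders, and any ExecOptions fields beyond those visible in these hunks are assumptions rather than part of this diff.

package example

import (
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	"k8s.io/kubernetes/test/utils/ktesting"
)

// readOSRelease sketches a call to the new TContext-based exec helper.
func readOSRelease(tCtx ktesting.TContext) string {
	stdout, stderr, err := e2epod.ExecWithOptionsTCtx(tCtx, e2epod.ExecOptions{
		Command:       []string{"cat", "/etc/os-release"},
		Namespace:     tCtx.Namespace(),
		PodName:       "test-pod", // placeholder
		ContainerName: "busybox",  // placeholder
	})
	tCtx.ExpectNoError(err, "exec in pod")
	tCtx.Logf("stderr: %q", stderr)
	return stdout
}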
@@ -657,7 +657,7 @@ func (m *mockCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework)
 		VolumeMountGroupRequired: m.enableVolumeMountGroup,
 		EnableTopology:           m.enableTopology,
 		IO: proxy.PodDirIO{
-			F:             f,
+			TCtx:          f.TContext(ctx),
 			Namespace:     m.driverNamespace.Name,
 			PodName:       podname,
 			ContainerName: "busybox",
@@ -22,13 +22,13 @@ import (
 	"io"
 
 	"k8s.io/klog/v2"
-	"k8s.io/kubernetes/test/e2e/framework"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	"k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service"
+	"k8s.io/kubernetes/test/utils/ktesting"
 )
 
 type PodDirIO struct {
-	F             *framework.Framework
+	TCtx          ktesting.TContext
 	Namespace     string
 	PodName       string
 	ContainerName string

@@ -105,7 +105,7 @@ func (p PodDirIO) RemoveAll(path string) error {
 }
 
 func (p PodDirIO) execute(command []string, stdin io.Reader) (string, string, error) {
-	stdout, stderr, err := e2epod.ExecWithOptions(p.F, e2epod.ExecOptions{
+	stdout, stderr, err := e2epod.ExecWithOptionsTCtx(p.TCtx, e2epod.ExecOptions{
 		Command:       command,
 		Namespace:     p.Namespace,
 		PodName:       p.PodName,
@@ -23,13 +23,12 @@ import (
 	"errors"
 	"fmt"
 
-	"github.com/onsi/ginkgo/v2"
-
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
 	storagev1 "k8s.io/api/storage/v1"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"

@@ -39,6 +38,7 @@ import (
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
 	imageutils "k8s.io/kubernetes/test/utils/image"
+	"k8s.io/kubernetes/test/utils/ktesting"
 )
 
 // LoadFromManifests loads .yaml or .json manifest files and returns
@@ -121,31 +121,31 @@ func visitManifests(cb func([]byte) error, files ...string) error {
 // - only some common items are supported, unknown ones trigger an error
 // - only the latest stable API version for each item is supported
 func PatchItems(f *framework.Framework, driverNamespace *v1.Namespace, items ...interface{}) error {
+	tCtx := f.TContext(context.Background())
+	if driverNamespace != nil {
+		tCtx = tCtx.WithNamespace(driverNamespace.Name)
+	}
+	return PatchItemsTCtx(tCtx, items...)
+}
+
+// PatchItemsTCtx is a variant of PatchItems where all parameters, including
+// the namespace, are passed through a TContext.
+func PatchItemsTCtx(tCtx ktesting.TContext, items ...interface{}) error {
 	for _, item := range items {
 		// Uncomment when debugging the loading and patching of items.
 		// Logf("patching original content of %T:\n%s", item, PrettyPrint(item))
-		if err := patchItemRecursively(f, driverNamespace, item); err != nil {
+		if err := patchItemRecursively(tCtx, item); err != nil {
 			return err
 		}
 	}
 	return nil
 }
 
-// CreateItems creates the items. Each of them must be an API object
+// createItems creates the items. Each of them must be an API object
 // of a type that is registered in Factory.
 //
-// It returns either a cleanup function or an error, but never both.
-//
-// Cleaning up after a test can be triggered in two ways:
-// - the test invokes the returned cleanup function,
-//   usually in an AfterEach
-// - the test suite terminates, potentially after
-//   skipping the test's AfterEach (https://github.com/onsi/ginkgo/issues/222)
-//
-// PatchItems has the some limitations as LoadFromManifests:
-// - only some common items are supported, unknown ones trigger an error
-// - only the latest stable API version for each item is supported
-func CreateItems(ctx context.Context, f *framework.Framework, ns *v1.Namespace, items ...interface{}) error {
+// Object get deleted automatically during test cleanup.
+func createItems(tCtx ktesting.TContext, items ...interface{}) error {
 	var result error
 	for _, item := range items {
 		// Each factory knows which item(s) it supports, so try each one.
@@ -153,11 +153,17 @@ func CreateItems(ctx context.Context, f *framework.Framework, ns *v1.Namespace,
 		description := describeItem(item)
 		// Uncomment this line to get a full dump of the entire item.
 		// description = fmt.Sprintf("%s:\n%s", description, PrettyPrint(item))
-		framework.Logf("creating %s", description)
+		tCtx.Logf("creating %s", description)
 		for _, factory := range factories {
-			destructor, err := factory.Create(ctx, f, ns, item)
+			destructor, err := factory.Create(tCtx, item)
 			if destructor != nil {
-				ginkgo.DeferCleanup(framework.IgnoreNotFound(destructor), framework.AnnotatedLocation(fmt.Sprintf("deleting %s", description)))
+				tCtx.CleanupCtx(func(tCtx ktesting.TContext) {
+					err := destructor(tCtx)
+					if apierrors.IsNotFound(err) {
+						return
+					}
+					tCtx.ExpectNoError(err, fmt.Sprintf("deleting %s", description))
+				})
 			}
 			if err == nil {
 				done = true
@@ -178,13 +184,25 @@ func CreateItems(ctx context.Context, f *framework.Framework, ns *v1.Namespace,
 
 // CreateFromManifests is a combination of LoadFromManifests,
 // PatchItems, patching with an optional custom function,
-// and CreateItems.
+// and creating the resulting items.
+//
+// Objects get deleted automatically during test cleanup.
 func CreateFromManifests(ctx context.Context, f *framework.Framework, driverNamespace *v1.Namespace, patch func(item interface{}) error, files ...string) error {
+	tCtx := f.TContext(ctx)
+	if driverNamespace != nil {
+		tCtx = tCtx.WithNamespace(driverNamespace.Name)
+	}
+	return CreateFromManifestsTCtx(tCtx, patch, files...)
+}
+
+// CreateFromManifestsTCtx is a variant of CreateFromManifests where all parameters, including
+// the driver namespace, are passed through a TContext. It is therefore usable from Go unit tests.
+func CreateFromManifestsTCtx(tCtx ktesting.TContext, patch func(item interface{}) error, files ...string) error {
 	items, err := LoadFromManifests(files...)
 	if err != nil {
 		return fmt.Errorf("CreateFromManifests: %w", err)
 	}
-	if err := PatchItems(f, driverNamespace, items...); err != nil {
+	if err := PatchItemsTCtx(tCtx, items...); err != nil {
 		return err
 	}
 	if patch != nil {

@@ -194,7 +212,7 @@ func CreateFromManifests(ctx context.Context, f *framework.Framework, driverName
 			}
 		}
 	}
-	return CreateItems(ctx, f, driverNamespace, items...)
+	return createItems(tCtx, items...)
 }
 
 // What is a subset of metav1.TypeMeta which (in contrast to
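A hedged usage sketch of the new TContext-only entry point: only CreateFromManifestsTCtx and WithNamespace come from this diff, while the package alias, its import path, and the manifest path are assumptions for illustration.

package example

import (
	storageutils "k8s.io/kubernetes/test/e2e/storage/utils" // import path assumed
	"k8s.io/kubernetes/test/utils/ktesting"
)

// deployManifests shows how a Go unit or integration test could deploy
// manifests now that the target namespace travels inside the TContext.
func deployManifests(tCtx ktesting.TContext, driverNamespace string) {
	tCtx = tCtx.WithNamespace(driverNamespace)
	err := storageutils.CreateFromManifestsTCtx(tCtx, nil, /* no extra patching */
		"test/e2e/testing-manifests/storage-csi/some-driver.yaml") // hypothetical manifest path
	tCtx.ExpectNoError(err, "create items from manifests")
}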
@@ -234,7 +252,7 @@ type ItemFactory interface {
 	// error or a cleanup function for the created item.
 	// If the item is of an unsupported type, it must return
 	// an error that has errorItemNotSupported as cause.
-	Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, item interface{}) (func(ctx context.Context) error, error)
+	Create(tCtx ktesting.TContext, item interface{}) (func(ctx context.Context) error, error)
 }
 
 // describeItem always returns a string that describes the item,
@@ -271,82 +289,79 @@ var factories = map[What]ItemFactory{
 	{"CustomResourceDefinition"}: &customResourceDefinitionFactory{},
 }
 
-// PatchName makes the name of some item unique by appending the
+// patchName makes the name of some item unique by appending the
 // generated unique name.
-func PatchName(f *framework.Framework, item *string) {
+func patchName(uniqueName string, item *string) {
 	if *item != "" {
-		*item = *item + "-" + f.UniqueName
+		*item = *item + "-" + uniqueName
 	}
 }
 
-// PatchNamespace moves the item into the test's namespace. Not
+// patchNamespace moves the item into the test's namespace. Not
 // all items can be namespaced. For those, the name also needs to be
 // patched.
-func PatchNamespace(f *framework.Framework, driverNamespace *v1.Namespace, item *string) {
-	if driverNamespace != nil {
-		*item = driverNamespace.GetName()
-		return
-	}
-
-	if f.Namespace != nil {
-		*item = f.Namespace.GetName()
+func patchNamespace(tCtx ktesting.TContext, item *string) {
+	namespace := tCtx.Namespace()
+	if namespace != "" {
+		*item = namespace
 	}
 }
 
-func patchItemRecursively(f *framework.Framework, driverNamespace *v1.Namespace, item interface{}) error {
+func patchItemRecursively(tCtx ktesting.TContext, item interface{}) error {
+	uniqueName := tCtx.Namespace()
 	switch item := item.(type) {
 	case *rbacv1.Subject:
-		PatchNamespace(f, driverNamespace, &item.Namespace)
+		patchNamespace(tCtx, &item.Namespace)
 	case *rbacv1.RoleRef:
 		// TODO: avoid hard-coding this special name. Perhaps add a Framework.PredefinedRoles
 		// which contains all role names that are defined cluster-wide before the test starts?
 		// All those names are exempt from renaming. That list could be populated by querying
 		// and get extended by tests.
 		if item.Name != "e2e-test-privileged-psp" {
-			PatchName(f, &item.Name)
+			patchName(uniqueName, &item.Name)
 		}
 	case *rbacv1.ClusterRole:
-		PatchName(f, &item.Name)
+		patchName(uniqueName, &item.Name)
 	case *rbacv1.Role:
-		PatchNamespace(f, driverNamespace, &item.Namespace)
+		patchNamespace(tCtx, &item.Namespace)
 		// Roles are namespaced, but because for RoleRef above we don't
 		// know whether the referenced role is a ClusterRole or Role
 		// and therefore always renames, we have to do the same here.
-		PatchName(f, &item.Name)
+		patchName(uniqueName, &item.Name)
 	case *storagev1.StorageClass:
-		PatchName(f, &item.Name)
+		patchName(uniqueName, &item.Name)
 	case *storagev1.VolumeAttributesClass:
-		PatchName(f, &item.Name)
+		patchName(uniqueName, &item.Name)
 	case *storagev1.CSIDriver:
-		PatchName(f, &item.Name)
+		patchName(uniqueName, &item.Name)
 	case *v1.ServiceAccount:
-		PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
+		patchNamespace(tCtx, &item.ObjectMeta.Namespace)
 	case *v1.Secret:
-		PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
+		patchNamespace(tCtx, &item.ObjectMeta.Namespace)
 	case *rbacv1.ClusterRoleBinding:
-		PatchName(f, &item.Name)
+		patchName(uniqueName, &item.Name)
 		for i := range item.Subjects {
-			if err := patchItemRecursively(f, driverNamespace, &item.Subjects[i]); err != nil {
-				return fmt.Errorf("%T: %w", f, err)
+			if err := patchItemRecursively(tCtx, &item.Subjects[i]); err != nil {
+				return fmt.Errorf("%T: %w", &item.Subjects[i], err)
 			}
 		}
-		if err := patchItemRecursively(f, driverNamespace, &item.RoleRef); err != nil {
-			return fmt.Errorf("%T: %w", f, err)
+		if err := patchItemRecursively(tCtx, &item.RoleRef); err != nil {
+			return fmt.Errorf("%T: %w", &item.RoleRef, err)
 		}
 	case *rbacv1.RoleBinding:
-		PatchNamespace(f, driverNamespace, &item.Namespace)
+		patchNamespace(tCtx, &item.Namespace)
 		for i := range item.Subjects {
-			if err := patchItemRecursively(f, driverNamespace, &item.Subjects[i]); err != nil {
-				return fmt.Errorf("%T: %w", f, err)
+			if err := patchItemRecursively(tCtx, &item.Subjects[i]); err != nil {
+				return fmt.Errorf("%T: %w", &item.Subjects[i], err)
 			}
 		}
-		if err := patchItemRecursively(f, driverNamespace, &item.RoleRef); err != nil {
-			return fmt.Errorf("%T: %w", f, err)
+		if err := patchItemRecursively(tCtx, &item.RoleRef); err != nil {
+			return fmt.Errorf("%T: %w", &item.RoleRef, err)
 		}
 	case *v1.Service:
-		PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
+		patchNamespace(tCtx, &item.ObjectMeta.Namespace)
 	case *appsv1.StatefulSet:
-		PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
+		patchNamespace(tCtx, &item.ObjectMeta.Namespace)
 		if err := patchContainerImages(item.Spec.Template.Spec.Containers); err != nil {
 			return err
 		}
@@ -354,7 +369,7 @@ func patchItemRecursively(f *framework.Framework, driverNamespace *v1.Namespace,
 			return err
 		}
 	case *appsv1.Deployment:
-		PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
+		patchNamespace(tCtx, &item.ObjectMeta.Namespace)
 		if err := patchContainerImages(item.Spec.Template.Spec.Containers); err != nil {
 			return err
 		}

@@ -362,7 +377,7 @@ func patchItemRecursively(f *framework.Framework, driverNamespace *v1.Namespace,
 			return err
 		}
 	case *appsv1.DaemonSet:
-		PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
+		patchNamespace(tCtx, &item.ObjectMeta.Namespace)
 		if err := patchContainerImages(item.Spec.Template.Spec.Containers); err != nil {
 			return err
 		}

@@ -370,7 +385,7 @@ func patchItemRecursively(f *framework.Framework, driverNamespace *v1.Namespace,
 			return err
 		}
 	case *appsv1.ReplicaSet:
-		PatchNamespace(f, driverNamespace, &item.ObjectMeta.Namespace)
+		patchNamespace(tCtx, &item.ObjectMeta.Namespace)
 		if err := patchContainerImages(item.Spec.Template.Spec.Containers); err != nil {
 			return err
 		}
@@ -396,13 +411,13 @@ func (f *serviceAccountFactory) New() runtime.Object {
 	return &v1.ServiceAccount{}
 }
 
-func (*serviceAccountFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*serviceAccountFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*v1.ServiceAccount)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
-	client := f.ClientSet.CoreV1().ServiceAccounts(ns.Name)
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().CoreV1().ServiceAccounts(tCtx.Namespace())
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create ServiceAccount: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -416,15 +431,15 @@ func (f *clusterRoleFactory) New() runtime.Object {
 	return &rbacv1.ClusterRole{}
 }
 
-func (*clusterRoleFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*clusterRoleFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*rbacv1.ClusterRole)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	framework.Logf("Define cluster role %v", item.GetName())
-	client := f.ClientSet.RbacV1().ClusterRoles()
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	tCtx.Logf("define cluster role %v", item.GetName())
+	client := tCtx.Client().RbacV1().ClusterRoles()
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create ClusterRole: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -438,14 +453,14 @@ func (f *clusterRoleBindingFactory) New() runtime.Object {
 	return &rbacv1.ClusterRoleBinding{}
 }
 
-func (*clusterRoleBindingFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*clusterRoleBindingFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*rbacv1.ClusterRoleBinding)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.RbacV1().ClusterRoleBindings()
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().RbacV1().ClusterRoleBindings()
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create ClusterRoleBinding: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -459,14 +474,14 @@ func (f *roleFactory) New() runtime.Object {
 	return &rbacv1.Role{}
 }
 
-func (*roleFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*roleFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*rbacv1.Role)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.RbacV1().Roles(ns.Name)
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().RbacV1().Roles(tCtx.Namespace())
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create Role: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -480,14 +495,14 @@ func (f *roleBindingFactory) New() runtime.Object {
 	return &rbacv1.RoleBinding{}
 }
 
-func (*roleBindingFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*roleBindingFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*rbacv1.RoleBinding)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.RbacV1().RoleBindings(ns.Name)
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().RbacV1().RoleBindings(tCtx.Namespace())
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create RoleBinding: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -501,14 +516,14 @@ func (f *serviceFactory) New() runtime.Object {
 	return &v1.Service{}
 }
 
-func (*serviceFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*serviceFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*v1.Service)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.CoreV1().Services(ns.Name)
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().CoreV1().Services(tCtx.Namespace())
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create Service: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -522,14 +537,14 @@ func (f *statefulSetFactory) New() runtime.Object {
 	return &appsv1.StatefulSet{}
 }
 
-func (*statefulSetFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*statefulSetFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*appsv1.StatefulSet)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.AppsV1().StatefulSets(ns.Name)
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().AppsV1().StatefulSets(tCtx.Namespace())
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create StatefulSet: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -543,14 +558,14 @@ func (f *deploymentFactory) New() runtime.Object {
 	return &appsv1.Deployment{}
 }
 
-func (*deploymentFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*deploymentFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*appsv1.Deployment)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.AppsV1().Deployments(ns.Name)
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().AppsV1().Deployments(tCtx.Namespace())
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create Deployment: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -564,14 +579,14 @@ func (f *daemonSetFactory) New() runtime.Object {
 	return &appsv1.DaemonSet{}
 }
 
-func (*daemonSetFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*daemonSetFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*appsv1.DaemonSet)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.AppsV1().DaemonSets(ns.Name)
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().AppsV1().DaemonSets(tCtx.Namespace())
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create DaemonSet: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -585,14 +600,14 @@ func (f *replicaSetFactory) New() runtime.Object {
 	return &appsv1.ReplicaSet{}
 }
 
-func (*replicaSetFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*replicaSetFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*appsv1.ReplicaSet)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.AppsV1().ReplicaSets(ns.Name)
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().AppsV1().ReplicaSets(tCtx.Namespace())
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create ReplicaSet: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -606,14 +621,14 @@ func (f *storageClassFactory) New() runtime.Object {
 	return &storagev1.StorageClass{}
 }
 
-func (*storageClassFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*storageClassFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*storagev1.StorageClass)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.StorageV1().StorageClasses()
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().StorageV1().StorageClasses()
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create StorageClass: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -627,14 +642,14 @@ func (f *volumeAttributesClassFactory) New() runtime.Object {
 	return &storagev1.VolumeAttributesClass{}
 }
 
-func (*volumeAttributesClassFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*volumeAttributesClassFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*storagev1.VolumeAttributesClass)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.StorageV1().VolumeAttributesClasses()
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().StorageV1().VolumeAttributesClasses()
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create VolumeAttributesClass: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -648,14 +663,14 @@ func (f *csiDriverFactory) New() runtime.Object {
 	return &storagev1.CSIDriver{}
 }
 
-func (*csiDriverFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*csiDriverFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*storagev1.CSIDriver)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.StorageV1().CSIDrivers()
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().StorageV1().CSIDrivers()
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create CSIDriver: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -669,14 +684,14 @@ func (f *secretFactory) New() runtime.Object {
 	return &v1.Secret{}
 }
 
-func (*secretFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*secretFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	item, ok := i.(*v1.Secret)
 	if !ok {
 		return nil, errorItemNotSupported
 	}
 
-	client := f.ClientSet.CoreV1().Secrets(ns.Name)
-	if _, err := client.Create(ctx, item, metav1.CreateOptions{}); err != nil {
+	client := tCtx.Client().CoreV1().Secrets(tCtx.Namespace())
+	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create Secret: %w", err)
 	}
 	return func(ctx context.Context) error {

@@ -690,7 +705,7 @@ func (f *customResourceDefinitionFactory) New() runtime.Object {
 	return &apiextensionsv1.CustomResourceDefinition{}
 }
 
-func (*customResourceDefinitionFactory) Create(ctx context.Context, f *framework.Framework, ns *v1.Namespace, i interface{}) (func(ctx context.Context) error, error) {
+func (*customResourceDefinitionFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
 	var err error
 	unstructCRD := &unstructured.Unstructured{}
 	gvr := schema.GroupVersionResource{Group: "apiextensions.k8s.io", Version: "v1", Resource: "customresourcedefinitions"}

@@ -705,11 +720,11 @@ func (*customResourceDefinitionFactory) Create(ctx context.Context, f *framework
 		return nil, err
 	}
 
-	if _, err = f.DynamicClient.Resource(gvr).Create(ctx, unstructCRD, metav1.CreateOptions{}); err != nil {
+	if _, err = tCtx.Dynamic().Resource(gvr).Create(tCtx, unstructCRD, metav1.CreateOptions{}); err != nil {
 		return nil, fmt.Errorf("create CustomResourceDefinition: %w", err)
 	}
 	return func(ctx context.Context) error {
-		return f.DynamicClient.Resource(gvr).Delete(ctx, item.GetName(), metav1.DeleteOptions{})
+		return tCtx.Dynamic().Resource(gvr).Delete(ctx, item.GetName(), metav1.DeleteOptions{})
 	}, nil
 }
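All factories now follow the same pattern: take the TContext, derive client and namespace from it, and return a plain cleanup function. A hypothetical additional factory written against the new signature would look roughly like this; the ConfigMap factory is not part of this commit and is shown only as a sketch.

package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"

	"k8s.io/kubernetes/test/utils/ktesting"
)

// configMapFactory is a hypothetical ItemFactory implementation that mirrors
// the factories above under the new TContext-based Create signature.
type configMapFactory struct{}

func (f *configMapFactory) New() runtime.Object {
	return &v1.ConfigMap{}
}

func (*configMapFactory) Create(tCtx ktesting.TContext, i interface{}) (func(ctx context.Context) error, error) {
	item, ok := i.(*v1.ConfigMap)
	if !ok {
		return nil, fmt.Errorf("unsupported item type %T", i) // the real factories return errorItemNotSupported
	}
	client := tCtx.Client().CoreV1().ConfigMaps(tCtx.Namespace())
	if _, err := client.Create(tCtx, item, metav1.CreateOptions{}); err != nil {
		return nil, fmt.Errorf("create ConfigMap: %w", err)
	}
	return func(ctx context.Context) error {
		return client.Delete(ctx, item.GetName(), metav1.DeleteOptions{})
	}, nil
}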
@@ -32,7 +32,7 @@ func coreDRA(tCtx ktesting.TContext, f *framework.Framework, b *drautils.Builder
 	claim := b.ExternalClaim()
 	pod := b.PodExternal()
 	b.Create(tCtx, claim, pod)
-	b.TestPod(tCtx, f, pod)
+	b.TestPod(tCtx, pod)
 
 	return func(tCtx ktesting.TContext) step3Func {
 		// Remove pod prepared in step 1.

@@ -45,7 +45,7 @@ func coreDRA(tCtx ktesting.TContext, f *framework.Framework, b *drautils.Builder
 		pod = b.PodExternal()
 		pod.Spec.ResourceClaims[0].ResourceClaimName = &claim.Name
 		b.Create(tCtx, claim, pod)
-		b.TestPod(tCtx, f, pod)
+		b.TestPod(tCtx, pod)
 
 		return func(tCtx ktesting.TContext) {
 			// We need to clean up explicitly because the normal
@@ -1,89 +0,0 @@
-/*
-Copyright 2022 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package e2edra
-
-import (
-	"context"
-	"fmt"
-	"os"
-	"strings"
-	"time"
-
-	"github.com/onsi/ginkgo/v2"
-	ginkgotypes "github.com/onsi/ginkgo/v2/types"
-
-	"k8s.io/kubernetes/test/utils/ktesting"
-)
-
-var (
-	TimeNow = time.Now    // Can be stubbed out for testing.
-	Pid     = os.Getpid() // Can be stubbed out for testing.
-)
-
-// TODO: replace with helper code from https://github.com/kubernetes/kubernetes/pull/122481 should that get merged - or vice versa.
-type ginkgoTB struct {
-	ktesting.TB
-}
-
-var _ ktesting.ContextTB = &ginkgoTB{}
-
-func GinkgoContextTB() ktesting.ContextTB {
-	return &ginkgoTB{
-		TB: ginkgo.GinkgoT(),
-	}
-}
-
-// CleanupCtx implements [ktesting.ContextTB.CleanupCtx]. It's identical to
-// ginkgo.DeferCleanup.
-func (g *ginkgoTB) CleanupCtx(cb func(context.Context)) {
-	ginkgo.GinkgoHelper()
-	ginkgo.DeferCleanup(cb)
-}
-
-// Log overrides the implementation from Ginkgo to ensure consistent output.
-func (g *ginkgoTB) Log(args ...any) {
-	log(1, fmt.Sprint(args...))
-}
-
-// Logf overrides the implementation from Ginkgo to ensure consistent output.
-func (g *ginkgoTB) Logf(format string, args ...any) {
-	log(1, fmt.Sprintf(format, args...))
-}
-
-// log re-implements klog.Info: same header, but stack unwinding
-// with support for ginkgo.GinkgoWriter and skipping stack levels.
-func log(offset int, msg string) {
-	now := TimeNow()
-	file, line := unwind(offset + 1)
-	if file == "" {
-		file = "???"
-		line = 1
-	} else if slash := strings.LastIndex(file, "/"); slash >= 0 {
-		file = file[slash+1:]
-	}
-	_, month, day := now.Date()
-	hour, minute, second := now.Clock()
-	header := fmt.Sprintf("I%02d%02d %02d:%02d:%02d.%06d %d %s:%d]",
-		month, day, hour, minute, second, now.Nanosecond()/1000, Pid, file, line)
-
-	_, _ = fmt.Fprintln(ginkgo.GinkgoWriter, header, msg)
-}
-
-func unwind(skip int) (string, int) {
-	location := ginkgotypes.NewCodeLocation(skip + 1)
-	return location.FileName, location.LineNumber
-}
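The custom Ginkgo adapter above is no longer needed: Ginkgo specs derive their TContext from the framework instance instead, the same pattern the other hunks in this commit use. An illustrative sketch, assuming f is the usual *framework.Framework provided by the suite:

package example

import (
	"context"

	"github.com/onsi/ginkgo/v2"

	"k8s.io/kubernetes/test/e2e/framework"
)

// registerSpec sketches how a Ginkgo spec obtains a ktesting.TContext now
// that GinkgoContextTB is gone: via f.TContext(ctx), as used elsewhere in
// this diff.
func registerSpec(f *framework.Framework) {
	ginkgo.It("derives a TContext from the framework", func(ctx context.Context) {
		tCtx := f.TContext(ctx)
		tCtx.Logf("running in namespace %s", tCtx.Namespace())
	})
}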
@@ -266,15 +266,15 @@ var _ = ginkgo.Describe("DRA upgrade/downgrade", func() {
 		tCtx = ktesting.Begin(tCtx, fmt.Sprintf("v%d.%d", major, previousMinor))
 
 		tCtx.ExpectNoError(e2enode.WaitForAllNodesSchedulable(tCtx, tCtx.Client(), f.Timeouts.NodeSchedulable), "wait for all nodes to be schedulable")
-		nodes := drautils.NewNodesNow(tCtx, f, 1, 1)
+		nodes := drautils.NewNodesNow(tCtx, 1, 1)
 
 		// Opening sockets locally avoids intermittent errors and delays caused by proxying through the restarted apiserver.
 		// We could speed up testing by shortening the sync delay in the ResourceSlice controller, but let's better
 		// test the defaults.
-		driver := drautils.NewDriverInstance(f)
+		driver := drautils.NewDriverInstance(tCtx)
 		driver.IsLocal = true
-		driver.Run(nodes, drautils.DriverResourcesNow(nodes, 8))
-		b := drautils.NewBuilderNow(ctx, f, driver)
+		driver.Run(tCtx, nodes, drautils.DriverResourcesNow(nodes, 8))
+		b := drautils.NewBuilderNow(tCtx, driver)
 
 		tCtx = ktesting.End(tCtx)

@@ -305,7 +305,7 @@ var _ = ginkgo.Describe("DRA upgrade/downgrade", func() {
 		// The kubelet wipes all ResourceSlices on a restart because it doesn't know which drivers were running.
 		// Wait for the ResourceSlice controller in the driver to notice and recreate the ResourceSlices.
 		tCtx = ktesting.Begin(tCtx, "wait for ResourceSlices")
-		gomega.Eventually(ctx, driver.NewGetSlices()).WithTimeout(5 * time.Minute).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames))))
+		ktesting.Eventually(tCtx, driver.NewGetSlices()).WithTimeout(5 * time.Minute).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames))))
 		tCtx = ktesting.End(tCtx)
 
 		steps3 := make(map[string]step3Func, len(subTests))
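To make the new control flow in these specs easier to follow, here is the bring-up sequence in isolation: everything is driven by the TContext. Only the drautils import path is an assumption; the helpers themselves appear in the hunks above.

package example

import (
	drautils "k8s.io/kubernetes/test/e2e/dra/utils" // import path assumed
	"k8s.io/kubernetes/test/utils/ktesting"
)

// bringUpDriver mirrors the sequence used above: pick nodes, start a local
// driver instance, publish its resources, and hand a Builder to sub-tests.
func bringUpDriver(tCtx ktesting.TContext) *drautils.Builder {
	nodes := drautils.NewNodesNow(tCtx, 1, 1)

	driver := drautils.NewDriverInstance(tCtx)
	driver.IsLocal = true // avoid proxying through an apiserver that restarts during the test
	driver.Run(tCtx, nodes, drautils.DriverResourcesNow(nodes, 8))

	return drautils.NewBuilderNow(tCtx, driver)
}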