add integration tests for pod replacement during scaling and Recreate/RollingUpdate strategies
Filip Křepinský 2025-10-29 21:49:03 +01:00
parent 3daf280c46
commit 9b80964efd
2 changed files with 523 additions and 0 deletions
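
For context, the two rollout strategies exercised by these tests replace pods differently: Recreate scales the old ReplicaSet to zero and waits for its pods to finish terminating before creating the new ReplicaSet, while RollingUpdate surges the new ReplicaSet up while old pods may still be terminating, and distributes mid-rollout scaling proportionally. A minimal sketch of the strategy configuration involved (using the upstream k8s.io/api/apps/v1 types; the two helper functions are illustrative only and not part of this commit):

package deployment

import (
    apps "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
    "k8s.io/utils/ptr"
)

// recreateStrategy creates new pods only after all old pods are gone.
func recreateStrategy() apps.DeploymentStrategy {
    return apps.DeploymentStrategy{Type: apps.RecreateDeploymentStrategyType}
}

// rollingUpdateStrategy allows up to 20 surge pods during a rollout and never
// removes an old pod before its replacement is available.
func rollingUpdateStrategy() apps.DeploymentStrategy {
    return apps.DeploymentStrategy{
        Type: apps.RollingUpdateDeploymentStrategyType,
        RollingUpdate: &apps.RollingUpdateDeployment{
            MaxSurge:       ptr.To(intstr.FromInt32(20)),
            MaxUnavailable: ptr.To(intstr.FromInt32(0)),
        },
    }
}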

@@ -19,9 +19,13 @@ package deployment
import (
    "context"
    "fmt"
    "math"
    "reflect"
    "strings"
    "testing"

    "github.com/google/go-cmp/cmp"
    apps "k8s.io/api/apps/v1"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -1403,3 +1407,425 @@ func TestTerminatingReplicasDeploymentStatus(t *testing.T) {
        t.Fatal(err)
    }
}
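
// TestRecreateDeploymentForPodReplacement verifies that a Recreate rollout
// waits for all old pods to finish terminating before creating the new
// ReplicaSet, and that .status.terminatingReplicas is only reported when the
// DeploymentReplicaSetTerminatingReplicas feature gate is enabled.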
func TestRecreateDeploymentForPodReplacement(t *testing.T) {
    tests := []struct {
        name                                                   string
        enableDeploymentReplicaSetTerminatingReplicas          bool
        expectedReplicasAfterOldRSScaleDown                    int32
        expectedTerminatingReplicasAfterOldRSScaleDown         *int32
        expectedReplicasAfterNewRS                             int32
        expectedTerminatingReplicasAfterNewRS                  *int32
        expectedReplicasAfterInFlightPodTermination            int32
        expectedTerminatingReplicasAfterInFlightPodTermination *int32
        expectedReplicasAfterInFlightScaleUp                   int32
        expectedTerminatingReplicasAfterInFlightScaleUp        *int32
        expectedReplicasAfterInFlightScaleDown                 int32
        expectedTerminatingReplicasAfterInFlightScaleDown      *int32
        expectedReplicasForDeploymentComplete                  int32
        expectedTerminatingReplicasForDeploymentComplete       *int32
    }{
        {
            name: "recreate should wait for terminating pods to complete in a new rollout with DeploymentReplicaSetTerminatingReplicas=false",
            enableDeploymentReplicaSetTerminatingReplicas:          false,
            expectedReplicasAfterOldRSScaleDown:                    0,
            expectedTerminatingReplicasAfterOldRSScaleDown:         nil, // terminating replicas are not counted when the gate is disabled, so all expectedTerminating* fields are nil
            expectedReplicasAfterNewRS:                             6,
            expectedTerminatingReplicasAfterNewRS:                  nil,
            expectedReplicasAfterInFlightPodTermination:            6, // 1 pod terminated
            expectedTerminatingReplicasAfterInFlightPodTermination: nil,
            expectedReplicasAfterInFlightScaleUp:                   7, // +1 scale up
            expectedTerminatingReplicasAfterInFlightScaleUp:        nil,
            expectedReplicasAfterInFlightScaleDown:                 5, // -2 scale down
            expectedTerminatingReplicasAfterInFlightScaleDown:      nil,
            expectedReplicasForDeploymentComplete:                  5,
            expectedTerminatingReplicasForDeploymentComplete:       nil,
        },
        {
            name: "recreate should wait for terminating pods to complete in a new rollout with DeploymentReplicaSetTerminatingReplicas=true",
            enableDeploymentReplicaSetTerminatingReplicas:          true,
            expectedReplicasAfterOldRSScaleDown:                    0,
            expectedTerminatingReplicasAfterOldRSScaleDown:         ptr.To[int32](6),
            expectedReplicasAfterNewRS:                             6,
            expectedTerminatingReplicasAfterNewRS:                  ptr.To[int32](0),
            expectedReplicasAfterInFlightPodTermination:            6, // 1 pod terminated
            expectedTerminatingReplicasAfterInFlightPodTermination: ptr.To[int32](1),
            expectedReplicasAfterInFlightScaleUp:                   7, // +1 scale up
            expectedTerminatingReplicasAfterInFlightScaleUp:        ptr.To[int32](1),
            expectedReplicasAfterInFlightScaleDown:                 5, // -2 scale down
            expectedTerminatingReplicasAfterInFlightScaleDown:      ptr.To[int32](3),
            expectedReplicasForDeploymentComplete:                  5,
            expectedTerminatingReplicasForDeploymentComplete:       ptr.To[int32](3),
        },
    }
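    // With the gate enabled, .status.terminatingReplicas counts pods that have
    // a deletionTimestamp: 6 while the old ReplicaSet scales down, 1 after the
    // in-flight pod termination, and 3 once the -2 scale down adds two more
    // terminating pods on top of it.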
    for _, test := range tests {
        t.Run(test.name, func(t *testing.T) {
            featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentReplicaSetTerminatingReplicas, test.enableDeploymentReplicaSetTerminatingReplicas)
            _, ctx := ktesting.NewTestContext(t)
            ctx, cancel := context.WithCancel(ctx)
            defer cancel()

            closeFn, rm, dc, informers, c := dcSetup(ctx, t)
            defer closeFn()
            name := "test-recreate-deployment-pod-replacement"
            ns := framework.CreateNamespaceOrDie(c, name, t)
            defer framework.DeleteNamespaceOrDie(c, ns, t)

            // Start informer and controllers
            stopControllers := runControllersAndInformers(t, rm, dc, informers)
            defer stopControllers()

            deploymentName := "deployment"
            replicas := int32(6)
            tester := &deploymentTester{t: t, c: c, deployment: newDeployment(deploymentName, ns.Name, replicas)}
            tester.deployment.Spec.Strategy.Type = apps.RecreateDeploymentStrategyType
            tester.deployment.Spec.Strategy.RollingUpdate = nil
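            // Bind pods to a nonexistent node and use a long grace period so
            // that deleted pods stay in a terminating state long enough for
            // the test to observe them.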
            tester.deployment.Spec.Template.Spec.NodeName = "fake-node"
            tester.deployment.Spec.Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(300))

            var err error
            tester.deployment, err = c.AppsV1().Deployments(ns.Name).Create(context.TODO(), tester.deployment, metav1.CreateOptions{})
            if err != nil {
                t.Fatalf("failed to create deployment %q: %v", deploymentName, err)
            }

            // Ensure the deployment completes while marking its pods as ready simultaneously
            if err := tester.waitForDeploymentCompleteAndMarkPodsReady(); err != nil {
                t.Fatal(err)
            }

            // Record the current replica set before starting a new rollout
            firstRS, err := tester.expectNewReplicaSet()
            if err != nil {
                t.Fatal(err)
            }

            // Trigger a new rollout
            tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
                update.Spec.Template.Spec.Containers[0].Env = append(update.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "deploy2", Value: "true"})
            })
            if err != nil {
                t.Fatalf("failed updating deployment %q: %v", deploymentName, err)
            }

            // Wait for the old replica set of the 1st rollout to be scaled down to 0 replicas first
            firstRS, err = c.AppsV1().ReplicaSets(ns.Name).Get(context.TODO(), firstRS.Name, metav1.GetOptions{})
            if err != nil {
                t.Fatalf("failed to get replicaset %q: %v", firstRS.Name, err)
            }
            firstRS.Spec.Replicas = ptr.To[int32](0)
            if err = tester.waitRSStable(firstRS); err != nil {
                t.Fatal(err)
            }

            // Verify all replicas fields of DeploymentStatus have the desired counts after the scale-down phase
            expectedReplicas := test.expectedReplicasAfterOldRSScaleDown
            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasAfterOldRSScaleDown); err != nil {
                t.Fatal(err)
            }

            // Verify that the new rollout won't create a new replica set until the old pods terminate
            if err := tester.expectNoNewReplicaSet(); err != nil {
                t.Fatal(err)
            }

            // Remove terminating pods of the old RS and skip their graceful termination
            if err := tester.removeRSPods(ctx, firstRS, math.MaxInt, true, 0); err != nil {
                t.Fatal(err)
            }

            // Verify all replicas fields of DeploymentStatus have the desired counts after the new RS creation
            expectedReplicas = test.expectedReplicasAfterNewRS
            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasAfterNewRS); err != nil {
                t.Fatal(err)
            }

            // Verify that the new rollout created a new replica set
            secondRS, err := tester.expectNewReplicaSet()
            if err != nil {
                t.Fatal(err)
            }

            // Start terminating 1 pod
            err = tester.removeRSPods(ctx, secondRS, 1, false, 300)
            if err != nil {
                t.Fatal(err)
            }

            // Verify all replicas fields of DeploymentStatus have the desired counts after the in-flight pod termination
            expectedReplicas = test.expectedReplicasAfterInFlightPodTermination
            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasAfterInFlightPodTermination); err != nil {
                t.Fatal(err)
            }

            // Scale up during the deployment rollout
            tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
                update.Spec.Replicas = ptr.To[int32](7)
            })
            if err != nil {
                t.Fatalf("failed to update deployment %q: %v", deploymentName, err)
            }

            // Verify all replicas fields of DeploymentStatus have the desired counts after the in-flight scale up
            expectedReplicas = test.expectedReplicasAfterInFlightScaleUp
            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasAfterInFlightScaleUp); err != nil {
                t.Fatal(err)
            }

            // Scale down during the deployment rollout
            tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
                update.Spec.Replicas = ptr.To[int32](5)
            })
            if err != nil {
                t.Fatalf("failed to update/scale deployment %q: %v", deploymentName, err)
            }

            // Verify all replicas fields of DeploymentStatus have the desired counts after the in-flight scale down
            expectedReplicas = test.expectedReplicasAfterInFlightScaleDown
            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasAfterInFlightScaleDown); err != nil {
                t.Fatal(err)
            }

            // Verify all replicas fields of DeploymentStatus have the desired counts before the deployment is completed
            expectedReplicas = test.expectedReplicasForDeploymentComplete
            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, 0, 0, expectedReplicas, test.expectedTerminatingReplicasForDeploymentComplete); err != nil {
                t.Fatal(err)
            }

            // Ensure the new deployment completes while marking its pods as ready simultaneously
            if err = tester.waitForDeploymentCompleteAndMarkPodsReady(); err != nil {
                t.Fatal(err)
            }

            // Verify all replicas fields of DeploymentStatus have the desired counts after the deployment is completed
            expectedReplicas = test.expectedReplicasForDeploymentComplete
            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, expectedReplicas, expectedReplicas, 0, test.expectedTerminatingReplicasForDeploymentComplete); err != nil {
                t.Fatal(err)
            }

            // Remove terminating pods of the old RS (if there are any) and skip their graceful termination
            if err := tester.removeRSPods(ctx, firstRS, math.MaxInt, true, 0); err != nil {
                t.Fatal(err)
            }
            // Remove terminating pods of the new RS and skip their graceful termination
            if err := tester.removeRSPods(ctx, secondRS, math.MaxInt, true, 0); err != nil {
                t.Fatal(err)
            }

            // Verify all replicas fields of DeploymentStatus have the desired counts after the deployment is completed and old pods have terminated
            expectedReplicas = test.expectedReplicasForDeploymentComplete
            var expectedFinalTerminatingReplicas *int32
            if test.enableDeploymentReplicaSetTerminatingReplicas {
                expectedFinalTerminatingReplicas = ptr.To[int32](0)
            }
            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicas, expectedReplicas, expectedReplicas, expectedReplicas, 0, expectedFinalTerminatingReplicas); err != nil {
                t.Fatal(err)
            }
        })
    }
}
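
// TestRollingUpdateAndProportionalScalingForDeploymentPodReplacement verifies
// that a RollingUpdate rollout does not wait for terminating pods, and that an
// in-flight scale up is distributed proportionally between the old and new
// replica sets.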
func TestRollingUpdateAndProportionalScalingForDeploymentPodReplacement(t *testing.T) {
    tests := []struct {
        name                                             string
        enableDeploymentReplicaSetTerminatingReplicas    bool
        terminatingReplicasFirstRS                       int32
        terminatingReplicasSecondRS                      int32
        expectedFirstRSReplicasDuringNewRollout          int32
        expectedSecondRSReplicasDuringNewRollout         int32
        expectedTerminatingReplicasDuringNewRollout      *int32
        expectedFirstRSReplicasAfterInFlightScaleUp      int32
        expectedSecondRSReplicasAfterInFlightScaleUp     int32
        expectedTerminatingReplicasDuringInFlightScaleUp *int32
        expectedFirstRSAnnotationsAfterInFlightScaleUp   map[string]string
        expectedSecondRSAnnotationsAfterInFlightScaleUp  map[string]string
    }{
        // starts with 100 replicas + 20 maxSurge
        {
            name: "rolling update should not wait for terminating pods with DeploymentReplicaSetTerminatingReplicas=false",
            enableDeploymentReplicaSetTerminatingReplicas:    false,
            expectedFirstRSReplicasDuringNewRollout:          100,
            expectedSecondRSReplicasDuringNewRollout:         20,
            expectedTerminatingReplicasDuringNewRollout:      nil,
            expectedFirstRSReplicasAfterInFlightScaleUp:      117,
            expectedSecondRSReplicasAfterInFlightScaleUp:     23,
            expectedTerminatingReplicasDuringInFlightScaleUp: nil,
            expectedFirstRSAnnotationsAfterInFlightScaleUp: map[string]string{
                deploymentutil.DesiredReplicasAnnotation: "120",
                deploymentutil.MaxReplicasAnnotation:     "140",
                deploymentutil.RevisionAnnotation:        "1",
            },
            expectedSecondRSAnnotationsAfterInFlightScaleUp: map[string]string{
                deploymentutil.DesiredReplicasAnnotation: "120",
                deploymentutil.MaxReplicasAnnotation:     "140",
                deploymentutil.RevisionAnnotation:        "2",
            },
        },
        {
            name: "rolling update and scaling should not wait for terminating pods with DeploymentReplicaSetTerminatingReplicas=true",
            enableDeploymentReplicaSetTerminatingReplicas:    true,
            terminatingReplicasFirstRS:                       15,
            terminatingReplicasSecondRS:                      1,
            expectedFirstRSReplicasDuringNewRollout:          100,
            expectedSecondRSReplicasDuringNewRollout:         20,
            expectedTerminatingReplicasDuringNewRollout:      ptr.To[int32](15),
            expectedFirstRSReplicasAfterInFlightScaleUp:      117,
            expectedSecondRSReplicasAfterInFlightScaleUp:     23,
            expectedTerminatingReplicasDuringInFlightScaleUp: ptr.To[int32](16),
            expectedFirstRSAnnotationsAfterInFlightScaleUp: map[string]string{
                deploymentutil.DesiredReplicasAnnotation: "120",
                deploymentutil.MaxReplicasAnnotation:     "140",
                deploymentutil.RevisionAnnotation:        "1",
            },
            expectedSecondRSAnnotationsAfterInFlightScaleUp: map[string]string{
                deploymentutil.DesiredReplicasAnnotation: "120",
                deploymentutil.MaxReplicasAnnotation:     "140",
                deploymentutil.RevisionAnnotation:        "2",
            },
        },
    }
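    // Proportional scaling arithmetic behind the expected counts above: with
    // 100 desired replicas and maxSurge=20 the rollout runs 120 pods; scaling
    // to 120 desired (140 pods max) adds 20 more, split proportionally by RS
    // size: firstRS 100 -> 117, secondRS 20 -> 23.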
    for _, test := range tests {
        t.Run(test.name, func(t *testing.T) {
            featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DeploymentReplicaSetTerminatingReplicas, test.enableDeploymentReplicaSetTerminatingReplicas)
            _, ctx := ktesting.NewTestContext(t)
            ctx, cancel := context.WithCancel(ctx)
            defer cancel()

            closeFn, rm, dc, informers, c := dcSetup(ctx, t)
            defer closeFn()
            name := "test-proportional-scaling"
            ns := framework.CreateNamespaceOrDie(c, name, t)
            defer framework.DeleteNamespaceOrDie(c, ns, t)

            // Start informer and controllers
            stopControllers := runControllersAndInformers(t, rm, dc, informers)
            defer stopControllers()

            deploymentName := "deployment"
            replicas := int32(100)
            maxSurge := int32(20)
            tester := &deploymentTester{t: t, c: c, deployment: newDeployment(deploymentName, ns.Name, replicas)}
            tester.deployment.Spec.Strategy.RollingUpdate.MaxSurge = ptr.To(intstr.FromInt32(maxSurge))
            tester.deployment.Spec.Strategy.RollingUpdate.MaxUnavailable = ptr.To(intstr.FromInt32(0))
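            // As in the Recreate test above, pods are bound to a nonexistent
            // node with a long grace period so deleted pods linger in a
            // terminating state.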
            tester.deployment.Spec.Template.Spec.NodeName = "fake-node"
            tester.deployment.Spec.Template.Spec.TerminationGracePeriodSeconds = ptr.To(int64(300))

            var err error
            tester.deployment, err = c.AppsV1().Deployments(ns.Name).Create(context.TODO(), tester.deployment, metav1.CreateOptions{})
            if err != nil {
                t.Fatalf("failed to create deployment %q: %v", deploymentName, err)
            }

            // Ensure the deployment completes while marking its pods as ready simultaneously
            if err := tester.waitForDeploymentCompleteAndMarkPodsReady(); err != nil {
                t.Fatal(err)
            }

            // Record the current replica set before starting a new rollout
            firstRS, err := tester.expectNewReplicaSet()
            if err != nil {
                t.Fatal(err)
            }

            // Start terminating some replicas
            err = tester.removeRSPods(ctx, firstRS, int(test.terminatingReplicasFirstRS), false, 300)
            if err != nil {
                t.Fatal(err)
            }

            // Ensure the deployment completes while marking its pods as ready simultaneously
            if err := tester.waitForDeploymentCompleteAndMarkPodsReady(); err != nil {
                t.Fatal(err)
            }

            // Trigger a new rollout
            tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
                update.Spec.Template.Spec.Containers[0].Env = append(update.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "deploy2", Value: "true"})
            })
            if err != nil {
                t.Fatalf("failed updating deployment %q: %v", deploymentName, err)
            }

            expectedReplicasDuringNewRollout := test.expectedFirstRSReplicasDuringNewRollout + test.expectedSecondRSReplicasDuringNewRollout
            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicasDuringNewRollout, test.expectedSecondRSReplicasDuringNewRollout, test.expectedFirstRSReplicasDuringNewRollout, test.expectedFirstRSReplicasDuringNewRollout, test.expectedSecondRSReplicasDuringNewRollout, test.expectedTerminatingReplicasDuringNewRollout); err != nil {
                t.Fatal(err)
            }

            // Verify that the new rollout created a new replica set
            secondRS, err := tester.expectNewReplicaSet()
            if err != nil {
                t.Fatal(err)
            }

            // Start terminating additional replicas
            err = tester.removeRSPods(ctx, secondRS, int(test.terminatingReplicasSecondRS), false, 300)
            if err != nil {
                t.Fatal(err)
            }
            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicasDuringNewRollout, test.expectedSecondRSReplicasDuringNewRollout, test.expectedFirstRSReplicasDuringNewRollout, test.expectedFirstRSReplicasDuringNewRollout, test.expectedSecondRSReplicasDuringNewRollout, test.expectedTerminatingReplicasDuringInFlightScaleUp); err != nil {
                t.Fatal(err)
            }

            // Scale up during the deployment rollout
            tester.deployment, err = tester.updateDeployment(func(update *apps.Deployment) {
                update.Spec.Replicas = ptr.To[int32](120)
            })
            if err != nil {
                t.Fatalf("failed to update/scale deployment %q: %v", deploymentName, err)
            }

            expectedReplicasDuringInFlightScaleUp := test.expectedFirstRSReplicasAfterInFlightScaleUp + test.expectedSecondRSReplicasAfterInFlightScaleUp
            expectedSurgeReplicas := expectedReplicasDuringInFlightScaleUp - test.expectedFirstRSReplicasDuringNewRollout
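            // Only the original first-RS pods have been marked ready, so every
            // pod above that count (the surge) is expected to be reported as
            // unavailable.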
            if err = tester.waitForDeploymentStatusReplicasFields(ctx, expectedReplicasDuringInFlightScaleUp, test.expectedSecondRSReplicasAfterInFlightScaleUp, test.expectedFirstRSReplicasDuringNewRollout, test.expectedFirstRSReplicasDuringNewRollout, expectedSurgeReplicas, test.expectedTerminatingReplicasDuringInFlightScaleUp); err != nil {
                t.Fatal(err)
            }

            // Check pod count and annotations for all replica sets
            firstRS, err = c.AppsV1().ReplicaSets(ns.Name).Get(context.TODO(), firstRS.Name, metav1.GetOptions{})
            if err != nil {
                t.Fatalf("failed to get replicaset %q: %v", firstRS.Name, err)
            }
            if *(firstRS.Spec.Replicas) != test.expectedFirstRSReplicasAfterInFlightScaleUp {
                t.Fatalf("unexpected first RS .spec.replicas: expect %d, got %d", test.expectedFirstRSReplicasAfterInFlightScaleUp, *(firstRS.Spec.Replicas))
            }
            if !reflect.DeepEqual(test.expectedFirstRSAnnotationsAfterInFlightScaleUp, firstRS.Annotations) {
                t.Fatalf("unexpected %q replica set annotations: %s", firstRS.Name, cmp.Diff(test.expectedFirstRSAnnotationsAfterInFlightScaleUp, firstRS.Annotations))
            }

            secondRS, err = c.AppsV1().ReplicaSets(ns.Name).Get(context.TODO(), secondRS.Name, metav1.GetOptions{})
            if err != nil {
                t.Fatalf("failed to get replicaset %q: %v", secondRS.Name, err)
            }
            if *(secondRS.Spec.Replicas) != test.expectedSecondRSReplicasAfterInFlightScaleUp {
                t.Fatalf("unexpected second RS .spec.replicas: expect %d, got %d", test.expectedSecondRSReplicasAfterInFlightScaleUp, *(secondRS.Spec.Replicas))
            }
            if !reflect.DeepEqual(test.expectedSecondRSAnnotationsAfterInFlightScaleUp, secondRS.Annotations) {
                t.Fatalf("unexpected %q replica set annotations: %s", secondRS.Name, cmp.Diff(test.expectedSecondRSAnnotationsAfterInFlightScaleUp, secondRS.Annotations))
            }

            // Ensure the deployment completes while marking its pods as ready and removing terminated pods simultaneously
            if err := tester.waitForDeploymentCompleteAndMarkPodsReadyAndRemoveTerminated(ctx); err != nil {
                t.Fatal(err)
            }

            // All replica sets' annotations should be up-to-date in the end
            rss := []*apps.ReplicaSet{firstRS, secondRS}
            for idx, curRS := range rss {
                curRS, err = c.AppsV1().ReplicaSets(ns.Name).Get(context.TODO(), curRS.Name, metav1.GetOptions{})
                if err != nil {
                    t.Fatalf("failed to get replicaset when checking desired replicas annotation: %v", err)
                }
                expectedFinalAnnotations := map[string]string{
                    deploymentutil.DesiredReplicasAnnotation: "120",
                    deploymentutil.MaxReplicasAnnotation:     "140",
                    deploymentutil.RevisionAnnotation:        fmt.Sprintf("%d", idx+1),
                }
                if !reflect.DeepEqual(expectedFinalAnnotations, curRS.Annotations) {
                    t.Fatalf("unexpected %q replica set annotations: %s", curRS.Name, cmp.Diff(expectedFinalAnnotations, curRS.Annotations))
                }
            }
        })
    }
}

@@ -19,6 +19,7 @@ package deployment
import (
    "context"
    "fmt"
    "math"
    "sync"
    "testing"
    "time"
@@ -220,6 +221,37 @@ func (d *deploymentTester) markUpdatedPodsReady(wg *sync.WaitGroup) {
    }
}

// removeTerminatedPods manually removes terminated Deployment pods,
// until the deployment is complete
func (d *deploymentTester) removeTerminatedPods(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    err := wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, true, func(ctx context.Context) (bool, error) {
        // We're done when the deployment is complete
        if completed, err := d.deploymentComplete(); err != nil {
            return false, err
        } else if completed {
            return true, nil
        }
        // Otherwise, remove the terminating pods of every replica set owned by the deployment
        rsList, err := deploymentutil.ListReplicaSets(d.deployment, deploymentutil.RsListFromClient(d.c.AppsV1()))
        if err != nil {
            d.t.Log(err)
            return false, nil
        }
        for _, rs := range rsList {
            if err := d.removeRSPods(ctx, rs, math.MaxInt, true, 0); err != nil {
                d.t.Log(err)
                return false, nil
            }
        }
        return false, nil
    })
    if err != nil {
        d.t.Errorf("failed to remove terminated Deployment pods: %v", err)
    }
}

func (d *deploymentTester) deploymentComplete() (bool, error) {
    latest, err := d.c.AppsV1().Deployments(d.deployment.Namespace).Get(context.TODO(), d.deployment.Name, metav1.GetOptions{})
    if err != nil {
@@ -283,6 +315,30 @@ func (d *deploymentTester) waitForDeploymentCompleteAndMarkPodsReady() error {
    return nil
}

// waitForDeploymentCompleteAndMarkPodsReadyAndRemoveTerminated waits for the Deployment to complete
// while marking updated Deployment pods as ready and removing terminated pods at the same time.
func (d *deploymentTester) waitForDeploymentCompleteAndMarkPodsReadyAndRemoveTerminated(ctx context.Context) error {
    var wg sync.WaitGroup

    // Manually mark updated Deployment pods as ready in a separate goroutine
    wg.Add(1)
    go d.markUpdatedPodsReady(&wg)

    // Manually remove terminated Deployment pods in a separate goroutine
    wg.Add(1)
    go d.removeTerminatedPods(ctx, &wg)

    // Wait for the Deployment status to complete using a soft check, while Deployment pods are becoming ready and terminated pods are being removed
    err := d.waitForDeploymentComplete()
    if err != nil {
        return fmt.Errorf("failed to wait for Deployment status %s: %w", d.deployment.Name, err)
    }

    // Wait for the goroutines to finish
    wg.Wait()

    return nil
}

func (d *deploymentTester) updateDeployment(applyUpdate testutil.UpdateDeploymentFunc) (*apps.Deployment, error) {
    return testutil.UpdateDeploymentWithRetries(d.c, d.deployment.Namespace, d.deployment.Name, applyUpdate, d.t.Logf, pollInterval, pollTimeout)
}
@@ -432,6 +488,20 @@ func (d *deploymentTester) markUpdatedPodsReadyWithoutComplete() error {
    return nil
}

// waitForDeploymentStatusReplicasFields waits until all replicas fields of DeploymentStatus
// (replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas and
// terminatingReplicas) reach the given counts.
func (d *deploymentTester) waitForDeploymentStatusReplicasFields(ctx context.Context, replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas int32, terminatingReplicas *int32) error {
    var lastErr error
    err := wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, true, func(_ context.Context) (bool, error) {
        // Do not pass ctx to checkDeploymentStatusReplicasFields (Deployments().Get) so we always obtain the latest error and not the one from the exceeded deadline
        lastErr = d.checkDeploymentStatusReplicasFields(replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas, terminatingReplicas)
        return lastErr == nil, nil
    })
    if err != nil {
        return fmt.Errorf("%w: %w", lastErr, err)
    }
    return nil
}

// checkDeploymentStatusReplicasFields verifies that all replicas fields of DeploymentStatus have the desired counts.
// It immediately returns an error when a non-matching replicas field is found.
func (d *deploymentTester) checkDeploymentStatusReplicasFields(replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas int32, terminatingReplicas *int32) error {
@@ -456,6 +526,33 @@ func (d *deploymentTester) checkDeploymentStatusReplicasFields(replicas, updated
    }
    if !ptr.Equal(deployment.Status.TerminatingReplicas, terminatingReplicas) {
        return fmt.Errorf("unexpected .terminatingReplicas: expect %v, got %v", ptr.Deref(terminatingReplicas, -1), ptr.Deref(deployment.Status.TerminatingReplicas, -1))
    }
    return nil
}
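
// removeRSPods deletes up to count pods that belong to the given replica set.
// When targetOnlyTerminating is true, only pods that are already terminating
// (i.e. have a non-nil deletionTimestamp) are deleted. gracePeriodSeconds is
// passed to the delete call; 0 skips graceful termination entirely.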
func (d *deploymentTester) removeRSPods(ctx context.Context, replicaset *apps.ReplicaSet, count int, targetOnlyTerminating bool, gracePeriodSeconds int64) error {
    selector, err := metav1.LabelSelectorAsSelector(replicaset.Spec.Selector)
    if err != nil {
        return fmt.Errorf("could not parse a selector for a replica set %q: %w", replicaset.Name, err)
    }
    pods, err := d.c.CoreV1().Pods(replicaset.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
    if err != nil {
        return fmt.Errorf("failed to get pods for a replica set %q: %w", replicaset.Name, err)
    }
    for i, pod := range pods.Items {
        if i >= count {
            break
        }
        if targetOnlyTerminating && pod.DeletionTimestamp == nil {
            continue
        }
        err := d.c.CoreV1().Pods(replicaset.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: ptr.To(gracePeriodSeconds)})
        if err != nil {
            return fmt.Errorf("failed to delete pod %q for a replica set %q: %w", pod.Name, replicaset.Name, err)
        }
    }
    return nil
}