From 1e67dbfefbb249836fe7d2f7016fffb50c05b38b Mon Sep 17 00:00:00 2001 From: Jefftree Date: Thu, 29 Jan 2026 17:07:14 -0500 Subject: [PATCH 1/2] Add ctx to endpointslicemirroring controller --- .../endpointslicemirroring_controller.go | 29 ++++++++++--------- .../endpointslicemirroring/reconciler.go | 17 ++++++----- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go index 9f943e6719f..14ff2ecb55a 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go @@ -240,7 +240,7 @@ func (c *Controller) Run(ctx context.Context, workers int) { for i := 0; i < workers; i++ { wg.Go(func() { - wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done()) + wait.UntilWithContext(ctx, c.worker, c.workerLoopPeriod) }) } <-ctx.Done() @@ -250,20 +250,20 @@ func (c *Controller) Run(ctx context.Context, workers int) { // marks them done. You may run as many of these in parallel as you wish; the // workqueue guarantees that they will not end up processing the same service // at the same time -func (c *Controller) worker(logger klog.Logger) { - for c.processNextWorkItem(logger) { +func (c *Controller) worker(ctx context.Context) { + for c.processNextWorkItem(ctx) { } } -func (c *Controller) processNextWorkItem(logger klog.Logger) bool { +func (c *Controller) processNextWorkItem(ctx context.Context) bool { cKey, quit := c.queue.Get() if quit { return false } defer c.queue.Done(cKey) - err := c.syncEndpoints(logger, cKey) - c.handleErr(logger, err, cKey) + err := c.syncEndpoints(ctx, cKey) + c.handleErr(klog.FromContext(ctx), err, cKey) return true } @@ -285,7 +285,8 @@ func (c *Controller) handleErr(logger klog.Logger, err error, key string) { utilruntime.HandleError(err) } -func (c *Controller) syncEndpoints(logger klog.Logger, key string) error { +func (c *Controller) syncEndpoints(ctx context.Context, key string) error { + logger := klog.FromContext(ctx) startTime := time.Now() defer func() { syncDuration := float64(time.Since(startTime).Milliseconds()) / 1000 @@ -305,7 +306,7 @@ func (c *Controller) syncEndpoints(logger klog.Logger, key string) error { if apierrors.IsNotFound(err) { logger.V(4).Info("Endpoints not found, cleaning up any mirrored EndpointSlices", "endpoints", klog.KRef(namespace, name)) c.endpointSliceTracker.DeleteService(namespace, name) - return c.deleteMirroredSlices(namespace, name) + return c.deleteMirroredSlices(ctx, namespace, name) } return err } @@ -313,7 +314,7 @@ func (c *Controller) syncEndpoints(logger klog.Logger, key string) error { if !c.shouldMirror(endpoints) { logger.V(4).Info("Endpoints should not be mirrored, cleaning up any mirrored EndpointSlices", "endpoints", klog.KRef(namespace, name)) c.endpointSliceTracker.DeleteService(namespace, name) - return c.deleteMirroredSlices(namespace, name) + return c.deleteMirroredSlices(ctx, namespace, name) } svc, err := c.serviceLister.Services(namespace).Get(name) @@ -321,7 +322,7 @@ func (c *Controller) syncEndpoints(logger klog.Logger, key string) error { if apierrors.IsNotFound(err) { logger.V(4).Info("Service not found, cleaning up any mirrored EndpointSlices", "service", klog.KRef(namespace, name)) c.endpointSliceTracker.DeleteService(namespace, name) - return c.deleteMirroredSlices(namespace, name) + return c.deleteMirroredSlices(ctx, namespace, name) } return err } @@ -330,7 +331,7 @@ func (c *Controller) syncEndpoints(logger klog.Logger, key string) error { if svc.Spec.Selector != nil { logger.V(4).Info("Service now has selector, cleaning up any mirrored EndpointSlices", "service", klog.KRef(namespace, name)) c.endpointSliceTracker.DeleteService(namespace, name) - return c.deleteMirroredSlices(namespace, name) + return c.deleteMirroredSlices(ctx, namespace, name) } endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name) @@ -342,7 +343,7 @@ func (c *Controller) syncEndpoints(logger klog.Logger, key string) error { return endpointslicepkg.NewStaleInformerCache("EndpointSlice informer cache is out of date") } - err = c.reconciler.reconcile(logger, endpoints, endpointSlices) + err = c.reconciler.reconcile(ctx, endpoints, endpointSlices) if err != nil { return err } @@ -529,14 +530,14 @@ func (c *Controller) queueEndpointsForEndpointSlice(endpointSlice *discovery.End // deleteMirroredSlices will delete and EndpointSlices that have been mirrored // for Endpoints with this namespace and name. -func (c *Controller) deleteMirroredSlices(namespace, name string) error { +func (c *Controller) deleteMirroredSlices(ctx context.Context, namespace, name string) error { endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name) if err != nil { return err } c.endpointSliceTracker.DeleteService(namespace, name) - return c.reconciler.deleteEndpoints(namespace, name, endpointSlices) + return c.reconciler.deleteEndpoints(ctx, namespace, name, endpointSlices) } // endpointSlicesMirroredForService returns the EndpointSlices that have been diff --git a/pkg/controller/endpointslicemirroring/reconciler.go b/pkg/controller/endpointslicemirroring/reconciler.go index 03044376e61..6a2ffd08706 100644 --- a/pkg/controller/endpointslicemirroring/reconciler.go +++ b/pkg/controller/endpointslicemirroring/reconciler.go @@ -61,7 +61,8 @@ type reconciler struct { // reconcile takes an Endpoints resource and ensures that corresponding // EndpointSlices exist. It creates, updates, or deletes EndpointSlices to // ensure the desired set of addresses are represented by EndpointSlices. -func (r *reconciler) reconcile(logger klog.Logger, endpoints *corev1.Endpoints, existingSlices []*discovery.EndpointSlice) error { +func (r *reconciler) reconcile(ctx context.Context, endpoints *corev1.Endpoints, existingSlices []*discovery.EndpointSlice) error { + logger := klog.FromContext(ctx) // Calculate desired state. d := newDesiredCalc() @@ -167,7 +168,7 @@ func (r *reconciler) reconcile(logger klog.Logger, endpoints *corev1.Endpoints, endpointsNN := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace} r.metricsCache.UpdateEndpointPortCache(endpointsNN, epMetrics) - return r.finalize(endpoints, slices) + return r.finalize(ctx, endpoints, slices) } // reconcileByPortMapping compares the endpoints found in existing slices with @@ -237,7 +238,7 @@ func (r *reconciler) reconcileByPortMapping( } // finalize creates, updates, and deletes slices as specified -func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction) error { +func (r *reconciler) finalize(ctx context.Context, endpoints *corev1.Endpoints, slices slicesByAction) error { // If there are slices to create and delete, recycle the slices marked for // deletion by replacing creates with updates of slices that would otherwise // be deleted. @@ -249,7 +250,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction // being deleted. if endpoints.DeletionTimestamp == nil { for _, endpointSlice := range slices.toCreate { - createdSlice, err := epsClient.Create(context.TODO(), endpointSlice, metav1.CreateOptions{}) + createdSlice, err := epsClient.Create(ctx, endpointSlice, metav1.CreateOptions{}) if err != nil { // If the namespace is terminating, creates will continue to fail. Simply drop the item. if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) { @@ -263,7 +264,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction } for _, endpointSlice := range slices.toUpdate { - updatedSlice, err := epsClient.Update(context.TODO(), endpointSlice, metav1.UpdateOptions{}) + updatedSlice, err := epsClient.Update(ctx, endpointSlice, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("failed to update %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err) } @@ -272,7 +273,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction } for _, endpointSlice := range slices.toDelete { - err := epsClient.Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{}) + err := epsClient.Delete(ctx, endpointSlice.Name, metav1.DeleteOptions{}) if err != nil { return fmt.Errorf("failed to delete %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err) } @@ -285,11 +286,11 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction // deleteEndpoints deletes any associated EndpointSlices and cleans up any // Endpoints references from the metricsCache. -func (r *reconciler) deleteEndpoints(namespace, name string, endpointSlices []*discovery.EndpointSlice) error { +func (r *reconciler) deleteEndpoints(ctx context.Context, namespace, name string, endpointSlices []*discovery.EndpointSlice) error { r.metricsCache.DeleteEndpoints(types.NamespacedName{Namespace: namespace, Name: name}) var errs []error for _, endpointSlice := range endpointSlices { - err := r.client.DiscoveryV1().EndpointSlices(namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{}) + err := r.client.DiscoveryV1().EndpointSlices(namespace).Delete(ctx, endpointSlice.Name, metav1.DeleteOptions{}) if err != nil { errs = append(errs, err) } From 063caad801f30ad4584a8aed398b3ed826128692 Mon Sep 17 00:00:00 2001 From: Jefftree Date: Thu, 29 Jan 2026 17:55:08 -0500 Subject: [PATCH 2/2] Fix unit tests --- .../endpointslicemirroring_controller_test.go | 4 ++-- pkg/controller/endpointslicemirroring/reconciler_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go index 4846bcb76ce..07d48f5ea5d 100644 --- a/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go +++ b/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller_test.go @@ -245,8 +245,8 @@ func TestSyncEndpoints(t *testing.T) { } } - logger, _ := ktesting.NewTestContext(t) - err := esController.syncEndpoints(logger, fmt.Sprintf("%s/%s", namespace, endpointsName)) + _, syncCtx := ktesting.NewTestContext(t) + err := esController.syncEndpoints(syncCtx, fmt.Sprintf("%s/%s", namespace, endpointsName)) if err != nil { t.Fatalf("Unexpected error from syncEndpoints: %v", err) } diff --git a/pkg/controller/endpointslicemirroring/reconciler_test.go b/pkg/controller/endpointslicemirroring/reconciler_test.go index e7fc8ffce54..e5c86b3d331 100644 --- a/pkg/controller/endpointslicemirroring/reconciler_test.go +++ b/pkg/controller/endpointslicemirroring/reconciler_test.go @@ -1316,8 +1316,8 @@ func fetchEndpointSlices(t *testing.T, client *fake.Clientset, namespace string) func reconcileHelper(t *testing.T, r *reconciler, endpoints *corev1.Endpoints, existingSlices []*discovery.EndpointSlice) { t.Helper() - logger, _ := ktesting.NewTestContext(t) - err := r.reconcile(logger, endpoints, existingSlices) + _, ctx := ktesting.NewTestContext(t) + err := r.reconcile(ctx, endpoints, existingSlices) if err != nil { t.Fatalf("Expected no error reconciling Endpoint Slices, got: %v", err) }