mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-02-03 20:40:26 -05:00
Merge pull request #136638 from Jefftree/kcm-context
Add ctx to endpointslicemirroring controller
This commit is contained in:
commit
07a697046f
4 changed files with 28 additions and 26 deletions
|
|
@ -240,7 +240,7 @@ func (c *Controller) Run(ctx context.Context, workers int) {
|
|||
|
||||
for i := 0; i < workers; i++ {
|
||||
wg.Go(func() {
|
||||
wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
|
||||
wait.UntilWithContext(ctx, c.worker, c.workerLoopPeriod)
|
||||
})
|
||||
}
|
||||
<-ctx.Done()
|
||||
|
|
@ -250,20 +250,20 @@ func (c *Controller) Run(ctx context.Context, workers int) {
|
|||
// marks them done. You may run as many of these in parallel as you wish; the
|
||||
// workqueue guarantees that they will not end up processing the same service
|
||||
// at the same time
|
||||
func (c *Controller) worker(logger klog.Logger) {
|
||||
for c.processNextWorkItem(logger) {
|
||||
func (c *Controller) worker(ctx context.Context) {
|
||||
for c.processNextWorkItem(ctx) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
|
||||
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
|
||||
cKey, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(cKey)
|
||||
|
||||
err := c.syncEndpoints(logger, cKey)
|
||||
c.handleErr(logger, err, cKey)
|
||||
err := c.syncEndpoints(ctx, cKey)
|
||||
c.handleErr(klog.FromContext(ctx), err, cKey)
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
@ -285,7 +285,8 @@ func (c *Controller) handleErr(logger klog.Logger, err error, key string) {
|
|||
utilruntime.HandleError(err)
|
||||
}
|
||||
|
||||
func (c *Controller) syncEndpoints(logger klog.Logger, key string) error {
|
||||
func (c *Controller) syncEndpoints(ctx context.Context, key string) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
syncDuration := float64(time.Since(startTime).Milliseconds()) / 1000
|
||||
|
|
@ -305,7 +306,7 @@ func (c *Controller) syncEndpoints(logger klog.Logger, key string) error {
|
|||
if apierrors.IsNotFound(err) {
|
||||
logger.V(4).Info("Endpoints not found, cleaning up any mirrored EndpointSlices", "endpoints", klog.KRef(namespace, name))
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.deleteMirroredSlices(namespace, name)
|
||||
return c.deleteMirroredSlices(ctx, namespace, name)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
@ -313,7 +314,7 @@ func (c *Controller) syncEndpoints(logger klog.Logger, key string) error {
|
|||
if !c.shouldMirror(endpoints) {
|
||||
logger.V(4).Info("Endpoints should not be mirrored, cleaning up any mirrored EndpointSlices", "endpoints", klog.KRef(namespace, name))
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.deleteMirroredSlices(namespace, name)
|
||||
return c.deleteMirroredSlices(ctx, namespace, name)
|
||||
}
|
||||
|
||||
svc, err := c.serviceLister.Services(namespace).Get(name)
|
||||
|
|
@ -321,7 +322,7 @@ func (c *Controller) syncEndpoints(logger klog.Logger, key string) error {
|
|||
if apierrors.IsNotFound(err) {
|
||||
logger.V(4).Info("Service not found, cleaning up any mirrored EndpointSlices", "service", klog.KRef(namespace, name))
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.deleteMirroredSlices(namespace, name)
|
||||
return c.deleteMirroredSlices(ctx, namespace, name)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
@ -330,7 +331,7 @@ func (c *Controller) syncEndpoints(logger klog.Logger, key string) error {
|
|||
if svc.Spec.Selector != nil {
|
||||
logger.V(4).Info("Service now has selector, cleaning up any mirrored EndpointSlices", "service", klog.KRef(namespace, name))
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.deleteMirroredSlices(namespace, name)
|
||||
return c.deleteMirroredSlices(ctx, namespace, name)
|
||||
}
|
||||
|
||||
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
|
||||
|
|
@ -342,7 +343,7 @@ func (c *Controller) syncEndpoints(logger klog.Logger, key string) error {
|
|||
return endpointslicepkg.NewStaleInformerCache("EndpointSlice informer cache is out of date")
|
||||
}
|
||||
|
||||
err = c.reconciler.reconcile(logger, endpoints, endpointSlices)
|
||||
err = c.reconciler.reconcile(ctx, endpoints, endpointSlices)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -529,14 +530,14 @@ func (c *Controller) queueEndpointsForEndpointSlice(endpointSlice *discovery.End
|
|||
|
||||
// deleteMirroredSlices will delete and EndpointSlices that have been mirrored
|
||||
// for Endpoints with this namespace and name.
|
||||
func (c *Controller) deleteMirroredSlices(namespace, name string) error {
|
||||
func (c *Controller) deleteMirroredSlices(ctx context.Context, namespace, name string) error {
|
||||
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
|
||||
return c.reconciler.deleteEndpoints(ctx, namespace, name, endpointSlices)
|
||||
}
|
||||
|
||||
// endpointSlicesMirroredForService returns the EndpointSlices that have been
|
||||
|
|
|
|||
|
|
@ -245,8 +245,8 @@ func TestSyncEndpoints(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
err := esController.syncEndpoints(logger, fmt.Sprintf("%s/%s", namespace, endpointsName))
|
||||
_, syncCtx := ktesting.NewTestContext(t)
|
||||
err := esController.syncEndpoints(syncCtx, fmt.Sprintf("%s/%s", namespace, endpointsName))
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error from syncEndpoints: %v", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,7 +61,8 @@ type reconciler struct {
|
|||
// reconcile takes an Endpoints resource and ensures that corresponding
|
||||
// EndpointSlices exist. It creates, updates, or deletes EndpointSlices to
|
||||
// ensure the desired set of addresses are represented by EndpointSlices.
|
||||
func (r *reconciler) reconcile(logger klog.Logger, endpoints *corev1.Endpoints, existingSlices []*discovery.EndpointSlice) error {
|
||||
func (r *reconciler) reconcile(ctx context.Context, endpoints *corev1.Endpoints, existingSlices []*discovery.EndpointSlice) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
// Calculate desired state.
|
||||
d := newDesiredCalc()
|
||||
|
||||
|
|
@ -167,7 +168,7 @@ func (r *reconciler) reconcile(logger klog.Logger, endpoints *corev1.Endpoints,
|
|||
endpointsNN := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}
|
||||
r.metricsCache.UpdateEndpointPortCache(endpointsNN, epMetrics)
|
||||
|
||||
return r.finalize(endpoints, slices)
|
||||
return r.finalize(ctx, endpoints, slices)
|
||||
}
|
||||
|
||||
// reconcileByPortMapping compares the endpoints found in existing slices with
|
||||
|
|
@ -237,7 +238,7 @@ func (r *reconciler) reconcileByPortMapping(
|
|||
}
|
||||
|
||||
// finalize creates, updates, and deletes slices as specified
|
||||
func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction) error {
|
||||
func (r *reconciler) finalize(ctx context.Context, endpoints *corev1.Endpoints, slices slicesByAction) error {
|
||||
// If there are slices to create and delete, recycle the slices marked for
|
||||
// deletion by replacing creates with updates of slices that would otherwise
|
||||
// be deleted.
|
||||
|
|
@ -249,7 +250,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
|
|||
// being deleted.
|
||||
if endpoints.DeletionTimestamp == nil {
|
||||
for _, endpointSlice := range slices.toCreate {
|
||||
createdSlice, err := epsClient.Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
|
||||
createdSlice, err := epsClient.Create(ctx, endpointSlice, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
// If the namespace is terminating, creates will continue to fail. Simply drop the item.
|
||||
if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
|
||||
|
|
@ -263,7 +264,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
|
|||
}
|
||||
|
||||
for _, endpointSlice := range slices.toUpdate {
|
||||
updatedSlice, err := epsClient.Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
|
||||
updatedSlice, err := epsClient.Update(ctx, endpointSlice, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err)
|
||||
}
|
||||
|
|
@ -272,7 +273,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
|
|||
}
|
||||
|
||||
for _, endpointSlice := range slices.toDelete {
|
||||
err := epsClient.Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
|
||||
err := epsClient.Delete(ctx, endpointSlice.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err)
|
||||
}
|
||||
|
|
@ -285,11 +286,11 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
|
|||
|
||||
// deleteEndpoints deletes any associated EndpointSlices and cleans up any
|
||||
// Endpoints references from the metricsCache.
|
||||
func (r *reconciler) deleteEndpoints(namespace, name string, endpointSlices []*discovery.EndpointSlice) error {
|
||||
func (r *reconciler) deleteEndpoints(ctx context.Context, namespace, name string, endpointSlices []*discovery.EndpointSlice) error {
|
||||
r.metricsCache.DeleteEndpoints(types.NamespacedName{Namespace: namespace, Name: name})
|
||||
var errs []error
|
||||
for _, endpointSlice := range endpointSlices {
|
||||
err := r.client.DiscoveryV1().EndpointSlices(namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
|
||||
err := r.client.DiscoveryV1().EndpointSlices(namespace).Delete(ctx, endpointSlice.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1316,8 +1316,8 @@ func fetchEndpointSlices(t *testing.T, client *fake.Clientset, namespace string)
|
|||
|
||||
func reconcileHelper(t *testing.T, r *reconciler, endpoints *corev1.Endpoints, existingSlices []*discovery.EndpointSlice) {
|
||||
t.Helper()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
err := r.reconcile(logger, endpoints, existingSlices)
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
err := r.reconcile(ctx, endpoints, existingSlices)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error reconciling Endpoint Slices, got: %v", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue