mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-04-26 00:29:44 -04:00
controller/bootstrap: Improve goroutine mgmt
Make sure all threads are terminated when Run returns.
This commit is contained in:
parent
f0ed028e75
commit
3a50f28ff8
2 changed files with 25 additions and 10 deletions
|
|
@ -19,6 +19,7 @@ package bootstrap
|
|||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
|
|
@ -155,19 +156,26 @@ func NewSigner(cl clientset.Interface, secrets informers.SecretInformer, configM
|
|||
|
||||
// Run runs controller loops and returns when they are done
|
||||
func (e *Signer) Run(ctx context.Context) {
|
||||
// Shut down queues
|
||||
defer utilruntime.HandleCrash()
|
||||
defer e.syncQueue.ShutDown()
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.V(5).Info("Starting")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
logger.V(1).Info("Shutting down")
|
||||
e.syncQueue.ShutDown()
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
if !cache.WaitForNamedCacheSyncWithContext(ctx, e.configMapSynced, e.secretSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.V(5).Info("Starting workers")
|
||||
go wait.UntilWithContext(ctx, e.serviceConfigMapQueue, 0)
|
||||
wg.Go(func() {
|
||||
wait.UntilWithContext(ctx, e.serviceConfigMapQueue, 0)
|
||||
})
|
||||
<-ctx.Done()
|
||||
logger.V(1).Info("Shutting down")
|
||||
}
|
||||
|
||||
func (e *Signer) pokeConfigMapSync() {
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package bootstrap
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
|
|
@ -111,18 +112,24 @@ func NewTokenCleaner(cl clientset.Interface, secrets coreinformers.SecretInforme
|
|||
// Run runs controller loops and returns when they are done
|
||||
func (tc *TokenCleaner) Run(ctx context.Context) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer tc.queue.ShutDown()
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.Info("Starting token cleaner controller")
|
||||
defer logger.Info("Shutting down token cleaner controller")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
logger.Info("Shutting down token cleaner controller")
|
||||
tc.queue.ShutDown()
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
if !cache.WaitForNamedCacheSyncWithContext(ctx, tc.secretSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
go wait.UntilWithContext(ctx, tc.worker, 10*time.Second)
|
||||
|
||||
wg.Go(func() {
|
||||
wait.UntilWithContext(ctx, tc.worker, 10*time.Second)
|
||||
})
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue