Merge pull request #136522 from hoteye/kubelet-contextual-core

kubelet: migrate core sync path to contextual logging
This commit is contained in:
Kubernetes Prow Robot 2026-02-03 02:18:26 +05:30 committed by GitHub
commit 2ad7178741
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 220 additions and 196 deletions

View file

@ -1278,7 +1278,7 @@ func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps
func startKubelet(ctx context.Context, k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
go k.Run(podCfg.Updates())
go k.Run(ctx, podCfg.Updates())
// start the kubelet server
if enableServer {
@ -1333,7 +1333,7 @@ func createAndInitKubelet(
k.BirthCry()
k.StartGarbageCollection()
k.StartGarbageCollection(ctx)
return k, nil
}

View file

@ -50,7 +50,7 @@ type HandlerRunner interface {
// able to get necessary informations like the RunContainerOptions, DNS settings, Host IP.
type RuntimeHelper interface {
GenerateRunContainerOptions(ctx context.Context, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string, imageVolumes ImageVolumes) (contOpts *RunContainerOptions, cleanupAction func(), err error)
GetPodDNS(pod *v1.Pod) (dnsConfig *runtimeapi.DNSConfig, err error)
GetPodDNS(ctx context.Context, pod *v1.Pod) (dnsConfig *runtimeapi.DNSConfig, err error)
// GetPodCgroupParent returns the CgroupName identifier, and its literal cgroupfs form on the host
// of a pod.
GetPodCgroupParent(pod *v1.Pod) string

View file

@ -55,7 +55,7 @@ func (f *FakeRuntimeHelper) GetPodCgroupParent(pod *v1.Pod) string {
return ""
}
func (f *FakeRuntimeHelper) GetPodDNS(pod *v1.Pod) (*runtimeapi.DNSConfig, error) {
func (f *FakeRuntimeHelper) GetPodDNS(_ context.Context, pod *v1.Pod) (*runtimeapi.DNSConfig, error) {
return &runtimeapi.DNSConfig{
Servers: f.DNSServers,
Searches: f.DNSSearches,

View file

@ -281,11 +281,11 @@ func getContainerEtcHostsPath() string {
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
HandlePodAdditions(pods []*v1.Pod)
HandlePodUpdates(logger klog.Logger, pods []*v1.Pod)
HandlePodRemoves(pods []*v1.Pod)
HandlePodReconcile(pods []*v1.Pod)
HandlePodSyncs(pods []*v1.Pod)
HandlePodAdditions(ctx context.Context, pods []*v1.Pod)
HandlePodUpdates(ctx context.Context, pods []*v1.Pod)
HandlePodRemoves(ctx context.Context, pods []*v1.Pod)
HandlePodReconcile(ctx context.Context, pods []*v1.Pod)
HandlePodSyncs(ctx context.Context, pods []*v1.Pod)
HandlePodCleanups(ctx context.Context) error
}
@ -296,11 +296,11 @@ type Option func(*Kubelet)
type Bootstrap interface {
GetConfiguration() kubeletconfiginternal.KubeletConfiguration
BirthCry()
StartGarbageCollection()
StartGarbageCollection(ctx context.Context)
ListenAndServe(ctx context.Context, kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider)
ListenAndServeReadOnly(ctx context.Context, address net.IP, port uint, tp trace.TracerProvider)
ListenAndServePodResources(ctx context.Context)
Run(<-chan kubetypes.PodUpdate)
Run(ctx context.Context, updates <-chan kubetypes.PodUpdate)
}
// Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
@ -362,7 +362,8 @@ func newCrashLoopBackOff(kubeCfg *kubeletconfiginternal.KubeletConfiguration) (t
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
func makePodSourceConfig(ctx context.Context, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
logger := klog.FromContext(ctx)
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
@ -375,23 +376,20 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
// source of all configuration
cfg := config.NewPodConfig(kubeDeps.Recorder, kubeDeps.PodStartupLatencyTracker)
// TODO: it needs to be replaced by a proper context in the future
ctx := context.TODO()
logger := klog.FromContext(ctx)
// define file config source
if kubeCfg.StaticPodPath != "" {
klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
logger.Info("Adding static pod path", "path", kubeCfg.StaticPodPath)
config.NewSourceFile(logger, kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
}
// define url config source
if kubeCfg.StaticPodURL != "" {
klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
logger.Info("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
config.NewSourceURL(logger, kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
}
if kubeDeps.KubeClient != nil {
klog.InfoS("Adding apiserver pod source")
logger.Info("Adding apiserver pod source")
config.NewSourceApiserver(logger, kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
}
return cfg, nil
@ -479,18 +477,18 @@ func NewMainKubelet(ctx context.Context,
return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
}
kubeInformers.Start(wait.NeverStop)
klog.InfoS("Attempting to sync node with API server")
logger.Info("Attempting to sync node with API server")
} else {
// we don't have a client to sync!
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeLister = corelisters.NewNodeLister(nodeIndexer)
nodeHasSynced = func() bool { return true }
klog.InfoS("Kubelet is running in standalone mode, will skip API server sync")
logger.Info("Kubelet is running in standalone mode, will skip API server sync")
}
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)
kubeDeps.PodConfig, err = makePodSourceConfig(ctx, kubeCfg, kubeDeps, nodeName, nodeHasSynced)
if err != nil {
return nil, err
}
@ -562,10 +560,10 @@ func NewMainKubelet(ctx context.Context,
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletInUserNamespace) {
// oomwatcher.NewWatcher returns "open /dev/kmsg: operation not permitted" error,
// when running in a user namespace with sysctl value `kernel.dmesg_restrict=1`.
klog.V(2).InfoS("Failed to create an oomWatcher (running in UserNS, ignoring)", "err", err)
logger.V(2).Info("Failed to create an oomWatcher (running in UserNS, ignoring)", "err", err)
oomWatcher = nil
} else {
klog.ErrorS(err, "Failed to create an oomWatcher (running in UserNS, Hint: enable KubeletInUserNamespace feature flag to ignore the error)")
logger.Error(err, "Failed to create an oomWatcher (running in UserNS, Hint: enable KubeletInUserNamespace feature flag to ignore the error)")
return nil, err
}
} else {
@ -577,7 +575,7 @@ func NewMainKubelet(ctx context.Context,
for _, ipEntry := range kubeCfg.ClusterDNS {
ip := netutils.ParseIPSloppy(ipEntry)
if ip == nil {
klog.InfoS("Invalid clusterDNS IP", "IP", ipEntry)
logger.Info("Invalid clusterDNS IP", "IP", ipEntry)
} else {
clusterDNS = append(clusterDNS, ip)
}
@ -694,7 +692,7 @@ func NewMainKubelet(ctx context.Context,
klet.containerManager.GetNodeConfig(),
klet.containerManager.GetNodeAllocatableAbsolute(),
klet.statusManager,
func(pod *v1.Pod) { klet.HandlePodSyncs([]*v1.Pod{pod}) },
func(pod *v1.Pod) { klet.HandlePodSyncs(ctx, []*v1.Pod{pod}) },
klet.GetActivePods,
klet.podManager.GetPodByUID,
klet.sourcesReady,
@ -880,7 +878,7 @@ func NewMainKubelet(ctx context.Context,
klet.runtimeState.addHealthCheck("EventedPLEG", klet.eventedPleg.Healthy)
}
if _, err := klet.updatePodCIDR(ctx, kubeCfg.PodCIDR); err != nil {
klog.ErrorS(err, "Pod CIDR update failed")
logger.Error(err, "Pod CIDR update failed")
}
// setup containerGC
@ -947,9 +945,9 @@ func NewMainKubelet(ctx context.Context,
var clusterTrustBundleManager clustertrustbundle.Manager = &clustertrustbundle.NoopManager{}
if kubeDeps.KubeClient != nil && utilfeature.DefaultFeatureGate.Enabled(features.ClusterTrustBundleProjection) {
clusterTrustBundleManager = clustertrustbundle.NewLazyInformerManager(ctx, kubeDeps.KubeClient, 2*int(kubeCfg.MaxPods))
klog.InfoS("ClusterTrustBundle informer will be started eventually once a trust bundle is requested")
logger.Info("ClusterTrustBundle informer will be started eventually once a trust bundle is requested")
} else {
klog.InfoS("Not starting ClusterTrustBundle informer because we are in static kubelet mode or the ClusterTrustBundleProjection featuregate is disabled")
logger.Info("Not starting ClusterTrustBundle informer because we are in static kubelet mode or the ClusterTrustBundleProjection featuregate is disabled")
}
if kubeDeps.KubeClient != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodCertificateRequest) {
@ -976,7 +974,7 @@ func NewMainKubelet(ctx context.Context,
metrics.RegisterCollectors(collectors.PodCertificateCollectorFor(podCertificateManager))
} else {
klet.podCertificateManager = &podcertificate.NoOpManager{}
klog.InfoS("Not starting PodCertificateRequest manager because we are in static kubelet mode or the PodCertificateProjection feature gate is disabled")
logger.Info("Not starting PodCertificateRequest manager because we are in static kubelet mode or the PodCertificateProjection feature gate is disabled")
}
// NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
@ -1580,7 +1578,7 @@ func (kl *Kubelet) RlimitStats() (*statsapi.RlimitStats, error) {
// 4. the pod-resources directory
// 5. the checkpoint directory
// 6. the pod logs root directory
func (kl *Kubelet) setupDataDirs() error {
func (kl *Kubelet) setupDataDirs(logger klog.Logger) error {
if cleanedRoot := filepath.Clean(kl.rootDirectory); cleanedRoot != kl.rootDirectory {
return fmt.Errorf("rootDirectory not in canonical form: expected %s, was %s", cleanedRoot, kl.rootDirectory)
}
@ -1615,23 +1613,23 @@ func (kl *Kubelet) setupDataDirs() error {
if selinux.GetEnabled() {
err := selinux.SetFileLabel(pluginRegistrationDir, kubeletconfig.KubeletPluginsDirSELinuxLabel)
if err != nil {
klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugin registration dir", "path", pluginRegistrationDir, "err", err)
logger.Info("Unprivileged containerized plugins might not work, could not set selinux context on plugin registration dir", "path", pluginRegistrationDir, "err", err)
}
err = selinux.SetFileLabel(pluginsDir, kubeletconfig.KubeletPluginsDirSELinuxLabel)
if err != nil {
klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugins dir", "path", pluginsDir, "err", err)
logger.Info("Unprivileged containerized plugins might not work, could not set selinux context on plugins dir", "path", pluginsDir, "err", err)
}
}
return nil
}
// StartGarbageCollection starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
func (kl *Kubelet) StartGarbageCollection(ctx context.Context) {
logger := klog.FromContext(ctx)
loggedContainerGCFailure := false
go wait.Until(func() {
ctx := context.Background()
if err := kl.containerGC.GarbageCollect(ctx); err != nil {
klog.ErrorS(err, "Container garbage collection failed")
logger.Error(err, "Container garbage collection failed")
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
loggedContainerGCFailure = true
} else {
@ -1641,28 +1639,27 @@ func (kl *Kubelet) StartGarbageCollection() {
loggedContainerGCFailure = false
}
klog.V(vLevel).InfoS("Container garbage collection succeeded")
logger.V(int(vLevel)).Info("Container garbage collection succeeded")
}
}, ContainerGCPeriod, wait.NeverStop)
// when the high threshold is set to 100, and the max age is 0 (or the max age feature is disabled)
// stub the image GC manager
if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 && kl.kubeletConfiguration.ImageMaximumGCAge.Duration == 0 {
klog.V(2).InfoS("ImageGCHighThresholdPercent is set 100 and ImageMaximumGCAge is 0, Disable image GC")
logger.V(2).Info("ImageGCHighThresholdPercent is set 100 and ImageMaximumGCAge is 0, Disable image GC")
return
}
prevImageGCFailed := false
beganGC := time.Now()
go wait.Until(func() {
ctx := context.Background()
if err := kl.imageManager.GarbageCollect(ctx, beganGC); err != nil {
if prevImageGCFailed {
klog.ErrorS(err, "Image garbage collection failed multiple times in a row")
logger.Error(err, "Image garbage collection failed multiple times in a row")
// Only create an event for repeated failures
kl.recorder.Event(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
} else {
klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
logger.Error(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
}
prevImageGCFailed = true
} else {
@ -1672,7 +1669,7 @@ func (kl *Kubelet) StartGarbageCollection() {
prevImageGCFailed = false
}
klog.V(vLevel).InfoS("Image garbage collection succeeded")
logger.V(int(vLevel)).Info("Image garbage collection succeeded")
}
}, ImageGCPeriod, wait.NeverStop)
}
@ -1680,6 +1677,7 @@ func (kl *Kubelet) StartGarbageCollection() {
// initializeModules will initialize internal modules that do not require the container runtime to be up.
// Note that the modules here must not depend on modules that are not initialized here.
func (kl *Kubelet) initializeModules(ctx context.Context) error {
logger := klog.FromContext(ctx)
// Prometheus metrics.
metrics.Register()
metrics.RegisterCollectors(
@ -1690,7 +1688,7 @@ func (kl *Kubelet) initializeModules(ctx context.Context) error {
servermetrics.Register()
// Setup filesystem directories.
if err := kl.setupDataDirs(); err != nil {
if err := kl.setupDataDirs(logger); err != nil {
return err
}
@ -1732,9 +1730,10 @@ func (kl *Kubelet) initializeModules(ctx context.Context) error {
// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
func (kl *Kubelet) initializeRuntimeDependentModules(ctx context.Context) {
logger := klog.FromContext(ctx)
if err := kl.cadvisor.Start(); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.ErrorS(err, "Failed to start cAdvisor")
logger.Error(err, "Failed to start cAdvisor")
os.Exit(1)
}
@ -1745,13 +1744,13 @@ func (kl *Kubelet) initializeRuntimeDependentModules(ctx context.Context) {
node, err := kl.getNodeAnyWay(ctx)
if err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.ErrorS(err, "Kubelet failed to get node info")
logger.Error(err, "Kubelet failed to get node info")
os.Exit(1)
}
// containerManager must start after cAdvisor because it needs filesystem capacity information
if err := kl.containerManager.Start(ctx, node, kl.GetActivePods, kl.getNodeAnyWay, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.ErrorS(err, "Failed to start ContainerManager")
logger.Error(err, "Failed to start ContainerManager")
os.Exit(1)
}
// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
@ -1769,19 +1768,19 @@ func (kl *Kubelet) initializeRuntimeDependentModules(ctx context.Context) {
}
// Start the plugin manager
klog.V(4).InfoS("Starting plugin manager")
logger.V(4).Info("Starting plugin manager")
go kl.pluginManager.Run(ctx, kl.sourcesReady, wait.NeverStop)
err = kl.shutdownManager.Start()
if err != nil {
// The shutdown manager is not critical for kubelet, so log failure, but don't block Kubelet startup if there was a failure starting it.
klog.ErrorS(err, "Failed to start node shutdown manager")
logger.Error(err, "Failed to start node shutdown manager")
}
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
ctx := context.Background()
func (kl *Kubelet) Run(ctx context.Context, updates <-chan kubetypes.PodUpdate) {
logger := klog.FromContext(ctx)
if kl.logServer == nil {
file := http.FileServer(http.Dir(nodeLogDir))
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLogQuery) && kl.kubeletConfiguration.EnableSystemLogQuery {
@ -1818,17 +1817,17 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
}
}
if kl.kubeClient == nil {
klog.InfoS("No API server defined - no node status update will be sent")
logger.Info("No API server defined - no node status update will be sent")
}
if err := kl.initializeModules(ctx); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.ErrorS(err, "Failed to initialize internal modules")
logger.Error(err, "Failed to initialize internal modules")
os.Exit(1)
}
if err := kl.cgroupVersionCheck(); err != nil {
klog.V(2).InfoS("Warning: cgroup check", "error", err)
logger.V(2).Info("Warning: cgroup check", "error", err)
}
// Start the allocation manager
@ -1872,7 +1871,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Set up iptables util rules
if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
kl.initNetworkUtil(logger)
}
// Start component sync loops.
@ -2098,12 +2097,10 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
}
if !podKilled || !runOnce {
if !pcm.Exists(pod) {
// TODO: Pass logger from context once contextual logging migration is complete
if err := kl.containerManager.UpdateQOSCgroups(logger); err != nil {
logger.V(2).Info("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err)
}
// TODO: Pass logger from context once contextual logging migration is complete
if err := pcm.EnsureExists(klog.TODO(), pod); err != nil {
if err := pcm.EnsureExists(logger, pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
}
@ -2121,7 +2118,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
logger.Error(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
return false, err
}
@ -2129,13 +2126,13 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
var volumeAttachLimitErr *volumemanager.VolumeAttachLimitExceededError
if errors.As(err, &volumeAttachLimitErr) {
kl.rejectPod(pod, volumemanager.VolumeAttachmentLimitExceededReason, volumeAttachLimitErr.Error())
kl.rejectPod(ctx, pod, volumemanager.VolumeAttachmentLimitExceededReason, volumeAttachLimitErr.Error())
recordAdmissionRejection(volumemanager.VolumeAttachmentLimitExceededReason)
return true, nil
}
if !wait.Interrupted(err) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
logger.Error(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
}
return false, err
}
@ -2188,13 +2185,12 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
// during eviction). This method is not guaranteed to be called if a pod is force deleted from the
// configuration and the kubelet is restarted - SyncTerminatingRuntimePod handles those orphaned
// pods.
func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) (err error) {
func (kl *Kubelet) SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) (err error) {
// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
// Currently, using that context causes test failures.
ctx := context.Background()
logger := klog.FromContext(ctx)
klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
ctx = klog.NewContext(context.TODO(), logger)
logger.V(4).Info("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
ctx, otelSpan := kl.tracer.Start(ctx, "syncTerminatingPod", trace.WithAttributes(
semconv.K8SPodUIDKey.String(string(pod.UID)),
@ -2208,7 +2204,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
otelSpan.SetStatus(codes.Error, err.Error())
}
otelSpan.End()
klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(4).Info("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
}()
apiPodStatus := kl.generateAPIPodStatus(ctx, pod, podStatus, false)
@ -2218,9 +2214,9 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
kl.statusManager.SetPodStatus(logger, pod, apiPodStatus)
if gracePeriod != nil {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
logger.V(4).Info("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
} else {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
logger.V(4).Info("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
}
kl.probeManager.StopLivenessAndStartup(pod)
@ -2246,7 +2242,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
// cache immediately
stoppedPodStatus, err := kl.containerRuntime.GetPodStatus(ctx, pod.UID, pod.Name, pod.Namespace)
if err != nil {
klog.ErrorS(err, "Unable to read pod status prior to final pod termination", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.Error(err, "Unable to read pod status prior to final pod termination", "pod", klog.KObj(pod), "podUID", pod.UID)
return err
}
preserveDataFromBeforeStopping(stoppedPodStatus, podStatus)
@ -2258,19 +2254,19 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
FinishedAt string
}
var containers []container
klogV := klog.V(4)
klogVEnabled := klogV.Enabled()
loggerV := logger.V(4)
loggerVEnabled := loggerV.Enabled()
for _, s := range stoppedPodStatus.ContainerStatuses {
if s.State == kubecontainer.ContainerStateRunning {
runningContainers = append(runningContainers, s.ID.String())
}
if klogVEnabled {
if loggerVEnabled {
containers = append(containers, container{Name: s.Name, State: string(s.State), ExitCode: s.ExitCode, FinishedAt: s.FinishedAt.UTC().Format(time.RFC3339Nano)})
}
}
if klogVEnabled {
if loggerVEnabled {
sort.Slice(containers, func(i, j int) bool { return containers[i].Name < containers[j].Name })
klog.V(4).InfoS("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", containers)
logger.V(4).Info("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", containers)
}
if len(runningContainers) > 0 {
return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers)
@ -2293,7 +2289,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
kl.statusManager.SetPodStatus(logger, pod, apiPodStatus)
// we have successfully stopped all containers, the pod is terminating, our status is "done"
klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(4).Info("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}
@ -2314,16 +2310,17 @@ func preserveDataFromBeforeStopping(stoppedPodStatus, podStatus *kubecontainer.P
// that the remnant of the running pod is terminated and allow garbage collection to proceed. We do
// not update the status of the pod because with the source of configuration removed, we have no
// place to send that status.
func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kubecontainer.Pod) error {
func (kl *Kubelet) SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error {
// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
// Currently, using that context causes test failures.
ctx := context.Background()
logger := klog.FromContext(ctx)
ctx = klog.NewContext(context.TODO(), logger)
pod := runningPod.ToAPIPod()
klog.V(4).InfoS("SyncTerminatingRuntimePod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatingRuntimePod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(4).Info("SyncTerminatingRuntimePod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer logger.V(4).Info("SyncTerminatingRuntimePod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
// we kill the pod directly since we have lost all other information about the pod.
klog.V(4).InfoS("Orphaned running pod terminating without grace period", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(4).Info("Orphaned running pod terminating without grace period", "pod", klog.KObj(pod), "podUID", pod.UID)
// TODO: this should probably be zero, to bypass any waiting (needs fixes in container runtime)
gracePeriod := int64(1)
if err := kl.killPod(ctx, pod, *runningPod, &gracePeriod); err != nil {
@ -2332,7 +2329,7 @@ func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kube
utilruntime.HandleError(err)
return err
}
klog.V(4).InfoS("Pod termination stopped all running orphaned containers", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(4).Info("Pod termination stopped all running orphaned containers", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}
@ -2354,8 +2351,8 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
))
logger := klog.FromContext(ctx)
defer otelSpan.End()
klog.V(4).InfoS("SyncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer klog.V(4).InfoS("SyncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(4).Info("SyncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
defer logger.V(4).Info("SyncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
// generate the final status of the pod
// TODO: should we simply fold this into TerminatePod? that would give a single pod update
@ -2368,20 +2365,20 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
if err := kl.volumeManager.WaitForUnmount(ctx, pod); err != nil {
return err
}
klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(4).Info("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
// This waiting loop relies on the background cleanup which starts after pod workers respond
// true for ShouldPodRuntimeBeRemoved, which happens after `SyncTerminatingPod` is completed.
if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
volumesExist := kl.podVolumesExist(logger, pod.UID)
if volumesExist {
klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(3).Info("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID)
}
return !volumesExist, nil
}); err != nil {
return err
}
klog.V(3).InfoS("Pod termination cleaned up volume paths", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(3).Info("Pod termination cleaned up volume paths", "pod", klog.KObj(pod), "podUID", pod.UID)
// After volume unmount is complete, let the secret and configmap managers know we're done with this pod
if kl.secretManager != nil {
@ -2402,14 +2399,14 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus
if err := pcm.Destroy(logger, name); err != nil {
return err
}
klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(4).Info("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
}
kl.usernsManager.Release(logger, pod.UID)
// mark the final pod status
kl.statusManager.TerminatePod(logger, pod)
klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(4).Info("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID)
return nil
}
@ -2456,7 +2453,7 @@ func (kl *Kubelet) getPodsToSync() []*v1.Pod {
//
// deletePod returns an error if not all sources are ready or the pod is not
// found in the runtime cache.
func (kl *Kubelet) deletePod(pod *v1.Pod) error {
func (kl *Kubelet) deletePod(logger klog.Logger, pod *v1.Pod) error {
if pod == nil {
return fmt.Errorf("deletePod does not allow nil pod")
}
@ -2465,7 +2462,7 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
// for sources that haven't reported yet.
return fmt.Errorf("skipping delete because sources aren't ready yet")
}
klog.V(3).InfoS("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
logger.V(3).Info("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
@ -2476,9 +2473,10 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
// rejectPod records an event about the pod with the given reason and message,
// and updates the pod to the failed phase in the status manager.
func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
func (kl *Kubelet) rejectPod(ctx context.Context, pod *v1.Pod, reason, message string) {
logger := klog.FromContext(ctx)
kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
kl.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{
kl.statusManager.SetPodStatus(logger, pod, v1.PodStatus{
QOSClass: v1qos.GetPodQOS(pod), // keep it as is
Phase: v1.PodFailed,
Reason: reason,
@ -2600,20 +2598,20 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
handler.HandlePodAdditions(ctx, u.Pods)
case kubetypes.UPDATE:
logger.V(2).Info("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodUpdates(logger, u.Pods)
handler.HandlePodUpdates(ctx, u.Pods)
case kubetypes.REMOVE:
logger.V(2).Info("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodRemoves(u.Pods)
handler.HandlePodRemoves(ctx, u.Pods)
case kubetypes.RECONCILE:
logger.V(4).Info("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodReconcile(u.Pods)
handler.HandlePodReconcile(ctx, u.Pods)
case kubetypes.DELETE:
logger.V(2).Info("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(logger, u.Pods)
handler.HandlePodUpdates(ctx, u.Pods)
default:
logger.Error(nil, "Invalid operation type received", "operation", u.Op)
}
@ -2624,7 +2622,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
logger.V(2).Info("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
handler.HandlePodSyncs(ctx, []*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
logger.V(4).Info("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
@ -2643,10 +2641,10 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
break
}
logger.V(4).Info("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjSlice(podsToSync))
handler.HandlePodSyncs(podsToSync)
handler.HandlePodSyncs(ctx, podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
handleProbeSync(ctx, kl, update, handler, "liveness", "unhealthy")
}
case update := <-kl.readinessManager.Updates():
ready := update.Result == proberesults.Success
@ -2656,7 +2654,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
if ready {
status = "ready"
}
handleProbeSync(kl, update, handler, "readiness", status)
handleProbeSync(ctx, kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(logger, update.PodUID, update.ContainerID, started)
@ -2665,7 +2663,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
if started {
status = "started"
}
handleProbeSync(kl, update, handler, "startup", status)
handleProbeSync(ctx, kl, update, handler, "startup", status)
case update := <-kl.containerManager.Updates():
pods := []*v1.Pod{}
for _, p := range update.PodUIDs {
@ -2680,7 +2678,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
if len(pods) > 0 {
// Updating the pod by syncing it again
// We do not apply the optimization by updating the status directly, but can do it later
handler.HandlePodSyncs(pods)
handler.HandlePodSyncs(ctx, pods)
}
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
@ -2703,22 +2701,24 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety
return true
}
func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
func handleProbeSync(ctx context.Context, kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
logger := klog.FromContext(ctx)
// We should not use the pod from manager, because it is never updated after initialization.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
klog.V(4).InfoS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update)
logger.V(4).Info("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update)
return
}
klog.V(1).InfoS("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod))
handler.HandlePodSyncs([]*v1.Pod{pod})
logger.V(1).Info("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod))
handler.HandlePodSyncs(ctx, []*v1.Pod{pod})
}
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
func (kl *Kubelet) HandlePodAdditions(ctx context.Context, pods []*v1.Pod) {
start := kl.clock.Now()
logger := klog.FromContext(ctx)
sort.Sort(sliceutils.PodsByCreationTime(pods))
var pendingResizes []types.UID
for _, pod := range pods {
@ -2728,12 +2728,12 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
kl.podCertificateManager.TrackPod(context.TODO(), pod)
kl.podCertificateManager.TrackPod(ctx, pod)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
@ -2757,7 +2757,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
if ok, reason, message := kl.allocationManager.AddPod(kl.GetActivePods(), pod); !ok {
kl.rejectPod(pod, reason, message)
kl.rejectPod(ctx, pod, reason, message)
// We avoid recording the metric in canAdmitPod because it's called
// repeatedly during a resize, which would inflate the metric.
// Instead, we record the metric here in HandlePodAdditions for new pods
@ -2796,8 +2796,9 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
// HandlePodUpdates is the callback in the SyncHandler interface for pods
// being updated from a config source.
func (kl *Kubelet) HandlePodUpdates(logger klog.Logger, pods []*v1.Pod) {
func (kl *Kubelet) HandlePodUpdates(ctx context.Context, pods []*v1.Pod) {
start := kl.clock.Now()
logger := klog.FromContext(ctx)
for _, pod := range pods {
oldPod, _ := kl.podManager.GetPodByUID(pod.UID)
kl.podManager.UpdatePod(pod)
@ -2944,17 +2945,18 @@ func resizeOperationForResources(new, old *resource.Quantity) string {
// HandlePodRemoves is the callback in the SyncHandler interface for pods
// being removed from a config source.
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
func (kl *Kubelet) HandlePodRemoves(ctx context.Context, pods []*v1.Pod) {
start := kl.clock.Now()
logger := klog.FromContext(ctx)
for _, pod := range pods {
kl.podCertificateManager.ForgetPod(context.TODO(), pod)
kl.podCertificateManager.ForgetPod(ctx, pod)
kl.podManager.RemovePod(pod)
kl.allocationManager.RemovePod(pod.UID)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
@ -2968,8 +2970,8 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
// Deletion is allowed to fail because the periodic cleanup routine
// will trigger deletion again.
if err := kl.deletePod(pod); err != nil {
klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
if err := kl.deletePod(logger, pod); err != nil {
logger.V(2).Info("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
}
}
@ -2981,8 +2983,9 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
// HandlePodReconcile is the callback in the SyncHandler interface for pods
// that should be reconciled. Pods are reconciled when only the status of the
// pod is updated in the API.
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
func (kl *Kubelet) HandlePodReconcile(ctx context.Context, pods []*v1.Pod) {
start := kl.clock.Now()
logger := klog.FromContext(ctx)
retryPendingResizes := false
hasPendingResizes := kl.allocationManager.HasPendingResizes()
for _, pod := range pods {
@ -2994,7 +2997,7 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
// Static pods should be reconciled the same way as regular pods
@ -3061,19 +3064,20 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
// HandlePodSyncs is the callback in the syncHandler interface for pods
// that should be dispatched to pod workers for sync.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
func (kl *Kubelet) HandlePodSyncs(ctx context.Context, pods []*v1.Pod) {
start := kl.clock.Now()
logger := klog.FromContext(ctx)
for _, pod := range pods {
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
continue
}
// Syncing a mirror pod is a programmer error since the intent of sync is to
// batch notify all pending work. We should make it impossible to double sync,
// but for now log a programmer error to prevent accidental introduction.
klog.V(3).InfoS("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID)
logger.V(3).Info("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
@ -3200,7 +3204,7 @@ func (pp *kubeletPodsProvider) GetPodByName(namespace, name string) (*v1.Pod, bo
func (kl *Kubelet) ListenAndServePodResources(ctx context.Context) {
endpoint, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
if err != nil {
klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err)
klog.FromContext(ctx).V(2).Info("Failed to get local endpoint for PodResources endpoint", "err", err)
return
}
@ -3319,6 +3323,7 @@ func (kl *Kubelet) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) e
// Ensure Mirror Pod for Static Pod exists and matches the current pod definition.
// The function logs and ignores any errors.
func (kl *Kubelet) tryReconcileMirrorPods(ctx context.Context, staticPod, mirrorPod *v1.Pod) {
logger := klog.FromContext(ctx)
if !kubetypes.IsStaticPod(staticPod) {
return
}
@ -3327,26 +3332,26 @@ func (kl *Kubelet) tryReconcileMirrorPods(ctx context.Context, staticPod, mirror
if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, staticPod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(mirrorPod), "podUID", mirrorPod.UID)
logger.Info("Trying to delete pod", "pod", klog.KObj(mirrorPod), "podUID", mirrorPod.UID)
podFullName := kubecontainer.GetPodFullName(staticPod)
if ok, err := kl.mirrorPodClient.DeleteMirrorPod(ctx, podFullName, &mirrorPod.UID); err != nil {
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
logger.Error(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
} else if ok {
deleted = ok
klog.InfoS("Deleted mirror pod as it didn't match the static Pod", "pod", klog.KObj(mirrorPod))
logger.Info("Deleted mirror pod as it didn't match the static Pod", "pod", klog.KObj(mirrorPod))
}
}
}
if mirrorPod == nil || deleted {
node, err := kl.GetNode(ctx)
if err != nil {
klog.ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
logger.Error(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else if node.DeletionTimestamp != nil {
klog.InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
logger.Info("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(staticPod))
logger.Info("Creating a mirror pod for static pod", "pod", klog.KObj(staticPod))
if err := kl.mirrorPodClient.CreateMirrorPod(ctx, staticPod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(staticPod))
logger.Error(err, "Failed creating a mirror pod", "pod", klog.KObj(staticPod))
}
}
}
@ -3354,16 +3359,17 @@ func (kl *Kubelet) tryReconcileMirrorPods(ctx context.Context, staticPod, mirror
// Ensure Mirror Pod for Static Pod exists as soon as node is registered.
func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) {
logger := klog.FromContext(ctx)
if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
_, err := kl.GetNode(ctx)
if err == nil {
return true, nil
}
klog.V(4).ErrorS(err, "Unable to register mirror pod because node is not registered yet", "node", klog.KRef("", string(kl.nodeName)))
logger.Error(err, "Unable to register mirror pod because node is not registered yet", "node", klog.KRef("", string(kl.nodeName)))
return false, nil
}); err != nil {
klog.V(4).ErrorS(err, "Failed to wait until node is registered", "node", klog.KRef("", string(kl.nodeName)))
logger.Error(err, "Failed to wait until node is registered", "node", klog.KRef("", string(kl.nodeName)))
}
staticPodToMirrorPodMap := kl.podManager.GetStaticPodToMirrorPodMap()

View file

@ -28,6 +28,7 @@ import (
// updatePodCIDR updates the pod CIDR in the runtime state if it is different
// from the current CIDR. Return true if pod CIDR is actually changed.
func (kl *Kubelet) updatePodCIDR(ctx context.Context, cidr string) (bool, error) {
logger := klog.FromContext(ctx)
kl.updatePodCIDRMux.Lock()
defer kl.updatePodCIDRMux.Unlock()
@ -44,7 +45,7 @@ func (kl *Kubelet) updatePodCIDR(ctx context.Context, cidr string) (bool, error)
// But it is better to be on the safe side to still return true here.
return true, fmt.Errorf("failed to update pod CIDR: %v", err)
}
klog.InfoS("Updating Pod CIDR", "originalPodCIDR", podCIDR, "newPodCIDR", cidr)
logger.Info("Updating Pod CIDR", "originalPodCIDR", podCIDR, "newPodCIDR", cidr)
kl.runtimeState.setPodCIDR(cidr)
return true, nil
}
@ -52,8 +53,6 @@ func (kl *Kubelet) updatePodCIDR(ctx context.Context, cidr string) (bool, error)
// GetPodDNS returns DNS settings for the pod.
// This function is defined in kubecontainer.RuntimeHelper interface so we
// have to implement it.
func (kl *Kubelet) GetPodDNS(pod *v1.Pod) (*runtimeapi.DNSConfig, error) {
// Use context.TODO() because we currently do not have a proper context to pass in.
// Replace this with an appropriate context when refactoring this function to accept a context parameter.
return kl.dnsConfigurer.GetPodDNS(context.TODO(), pod)
func (kl *Kubelet) GetPodDNS(ctx context.Context, pod *v1.Pod) (*runtimeapi.DNSConfig, error) {
return kl.dnsConfigurer.GetPodDNS(ctx, pod)
}

View file

@ -35,7 +35,7 @@ const (
KubeFirewallChain utiliptables.Chain = "KUBE-FIREWALL"
)
func (kl *Kubelet) initNetworkUtil() {
func (kl *Kubelet) initNetworkUtil(logger klog.Logger) {
iptClients := utiliptables.NewBestEffort()
if len(iptClients) == 0 {
// We don't log this as an error because kubelet itself doesn't need any
@ -43,33 +43,33 @@ func (kl *Kubelet) initNetworkUtil() {
// and because we *expect* this to fail on hosts where only nftables is
// supported (in which case there can't be any other components using
// iptables that would need these rules anyway).
klog.InfoS("No iptables support on this system; not creating the KUBE-IPTABLES-HINT chain")
logger.Info("No iptables support on this system; not creating the KUBE-IPTABLES-HINT chain")
return
}
for family := range iptClients {
iptClient := iptClients[family]
if kl.syncIPTablesRules(iptClient) {
klog.InfoS("Initialized iptables rules.", "protocol", iptClient.Protocol())
if kl.syncIPTablesRules(logger, iptClient) {
logger.Info("Initialized iptables rules.", "protocol", iptClient.Protocol())
go iptClient.Monitor(
utiliptables.Chain("KUBE-KUBELET-CANARY"),
[]utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
func() { kl.syncIPTablesRules(iptClient) },
func() { kl.syncIPTablesRules(logger, iptClient) },
1*time.Minute, wait.NeverStop,
)
} else {
klog.InfoS("Failed to initialize iptables rules; some functionality may be missing.", "protocol", iptClient.Protocol())
logger.Info("Failed to initialize iptables rules; some functionality may be missing.", "protocol", iptClient.Protocol())
}
}
}
// syncIPTablesRules ensures the KUBE-IPTABLES-HINT chain exists, and the martian packet
// protection rule is installed.
func (kl *Kubelet) syncIPTablesRules(iptClient utiliptables.Interface) bool {
func (kl *Kubelet) syncIPTablesRules(logger klog.Logger, iptClient utiliptables.Interface) bool {
// Create hint chain so other components can see whether we are using iptables-legacy
// or iptables-nft.
if _, err := iptClient.EnsureChain(utiliptables.TableMangle, KubeIPTablesHintChain); err != nil {
klog.ErrorS(err, "Failed to ensure that iptables hint chain exists")
logger.Error(err, "Failed to ensure that iptables hint chain exists")
return false
}
@ -83,16 +83,16 @@ func (kl *Kubelet) syncIPTablesRules(iptClient utiliptables.Interface) bool {
// created by kube-proxy.
if _, err := iptClient.EnsureChain(utiliptables.TableFilter, KubeFirewallChain); err != nil {
klog.ErrorS(err, "Failed to ensure that filter table KUBE-FIREWALL chain exists")
logger.Error(err, "Failed to ensure that filter table KUBE-FIREWALL chain exists")
return false
}
if _, err := iptClient.EnsureRule(utiliptables.Prepend, utiliptables.TableFilter, utiliptables.ChainOutput, "-j", string(KubeFirewallChain)); err != nil {
klog.ErrorS(err, "Failed to ensure that OUTPUT chain jumps to KUBE-FIREWALL")
logger.Error(err, "Failed to ensure that OUTPUT chain jumps to KUBE-FIREWALL")
return false
}
if _, err := iptClient.EnsureRule(utiliptables.Prepend, utiliptables.TableFilter, utiliptables.ChainInput, "-j", string(KubeFirewallChain)); err != nil {
klog.ErrorS(err, "Failed to ensure that INPUT chain jumps to KUBE-FIREWALL")
logger.Error(err, "Failed to ensure that INPUT chain jumps to KUBE-FIREWALL")
return false
}
@ -109,7 +109,7 @@ func (kl *Kubelet) syncIPTablesRules(iptClient utiliptables.Interface) bool {
"-m", "conntrack",
"!", "--ctstate", "RELATED,ESTABLISHED,DNAT",
"-j", "DROP"); err != nil {
klog.ErrorS(err, "Failed to ensure rule to drop invalid localhost packets in filter table KUBE-FIREWALL chain")
logger.Error(err, "Failed to ensure rule to drop invalid localhost packets in filter table KUBE-FIREWALL chain")
return false
}
}

View file

@ -18,5 +18,7 @@ limitations under the License.
package kubelet
import "k8s.io/klog/v2"
// Do nothing.
func (kl *Kubelet) initNetworkUtil() {}
func (kl *Kubelet) initNetworkUtil(logger klog.Logger) {}

View file

@ -49,6 +49,7 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/testutil"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubelet/pkg/cri/streaming/portforward"
"k8s.io/kubelet/pkg/cri/streaming/remotecommand"
_ "k8s.io/kubernetes/pkg/apis/core/install"
@ -7475,7 +7476,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) {
if !ok {
t.Fatalf("unable to reject pod by UID %v", reject.uid)
}
kl.rejectPod(pod, reject.reason, reject.message)
kl.rejectPod(tCtx, pod, reject.reason, reject.message)
}
if err := kl.HandlePodCleanups(tCtx); (err != nil) != tt.wantErr {
@ -7516,7 +7517,8 @@ func testMetric(t *testing.T, metricName string, expectedMetric string) {
}
func TestGetNonExistentImagePullSecret(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
_, tCtx := ktesting.NewTestContext(t)
logger := klog.FromContext(tCtx)
secrets := make([]*v1.Secret, 0)
fakeRecorder := record.NewFakeRecorder(1)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)

View file

@ -285,7 +285,7 @@ func newTestKubeletWithImageList(
},
}
kubelet.recorder = fakeRecorder
if err := kubelet.setupDataDirs(); err != nil {
if err := kubelet.setupDataDirs(logger); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}
kubelet.daemonEndpoints = &v1.NodeDaemonEndpoints{}
@ -335,7 +335,7 @@ func newTestKubeletWithImageList(
kubelet.containerManager.GetNodeConfig(),
kubelet.containerManager.GetNodeAllocatableAbsolute(),
kubelet.statusManager,
func(pod *v1.Pod) { kubelet.HandlePodSyncs([]*v1.Pod{pod}) },
func(pod *v1.Pod) { kubelet.HandlePodSyncs(tCtx, []*v1.Pod{pod}) },
kubelet.GetActivePods,
kubelet.podManager.GetPodByUID,
config.NewSourcesReady(func(_ sets.Set[string]) bool { return enableResizing }),
@ -504,6 +504,7 @@ func TestSyncLoopAbort(t *testing.T) {
}
func TestSyncPodsStartPod(t *testing.T) {
ctx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@ -516,7 +517,7 @@ func TestSyncPodsStartPod(t *testing.T) {
}),
}
kubelet.podManager.SetPods(pods)
kubelet.HandlePodSyncs(pods)
kubelet.HandlePodSyncs(ctx, pods)
fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)})
}
@ -727,7 +728,7 @@ func TestDispatchWorkOfActivePod(t *testing.T) {
}
func TestHandlePodCleanups(t *testing.T) {
ctx := context.Background()
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
@ -746,7 +747,7 @@ func TestHandlePodCleanups(t *testing.T) {
}
kubelet := testKubelet.kubelet
kubelet.HandlePodCleanups(ctx)
require.NoError(t, kubelet.HandlePodCleanups(tCtx))
// assert that unwanted pods were queued to kill
if actual, expected := kubelet.podWorkers.(*fakePodWorkers).triggeredDeletion, []types.UID{"12345678"}; !reflect.DeepEqual(actual, expected) {
@ -756,6 +757,7 @@ func TestHandlePodCleanups(t *testing.T) {
}
func TestVolumeAttachLimitExceededCleanup(t *testing.T) {
ctx := ktesting.Init(t)
const podCount = 500
tk := newTestKubelet(t, true /* controller-attach-detach enabled */)
defer tk.Cleanup()
@ -784,9 +786,7 @@ func TestVolumeAttachLimitExceededCleanup(t *testing.T) {
pods, _ := newTestPodsWithResources(podCount)
kl.podManager.SetPods(pods)
kl.HandlePodSyncs(pods)
ctx := context.Background()
kl.HandlePodSyncs(ctx, pods)
// all pods must reach a terminal, Failed state due to VolumeAttachmentLimitExceeded.
if err := wait.PollUntilContextTimeout(
@ -858,6 +858,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
ctx := ktesting.Init(t)
ready := false
@ -884,7 +885,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) {
kubelet := testKubelet.kubelet
kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.Set[string]) bool { return ready })
kubelet.HandlePodRemoves(pods)
kubelet.HandlePodRemoves(ctx, pods)
time.Sleep(2 * time.Second)
// Sources are not ready yet. Don't remove any pods.
@ -893,7 +894,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) {
}
ready = true
kubelet.HandlePodRemoves(pods)
kubelet.HandlePodRemoves(ctx, pods)
time.Sleep(2 * time.Second)
// Sources are ready. Remove unwanted pods.
@ -928,6 +929,7 @@ func checkPodStatus(t *testing.T, kl *Kubelet, pod *v1.Pod, phase v1.PodPhase) {
// Tests that we handle port conflicts correctly by setting the failed status in status map.
func TestHandlePortConflicts(t *testing.T) {
ctx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -969,7 +971,7 @@ func TestHandlePortConflicts(t *testing.T) {
pods[1].UID: true,
}
kl.HandlePodAdditions(pods)
kl.HandlePodAdditions(ctx, pods)
// Check pod status stored in the status map.
checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
@ -978,6 +980,7 @@ func TestHandlePortConflicts(t *testing.T) {
// Tests that we handle host name conflicts correctly by setting the failed status in status map.
func TestHandleHostNameConflicts(t *testing.T) {
ctx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1012,7 +1015,7 @@ func TestHandleHostNameConflicts(t *testing.T) {
notfittingPod := pods[0]
fittingPod := pods[1]
kl.HandlePodAdditions(pods)
kl.HandlePodAdditions(ctx, pods)
// Check pod status stored in the status map.
checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
@ -1021,6 +1024,7 @@ func TestHandleHostNameConflicts(t *testing.T) {
// Tests that we handle not matching labels selector correctly by setting the failed status in status map.
func TestHandleNodeSelector(t *testing.T) {
ctx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1054,7 +1058,7 @@ func TestHandleNodeSelector(t *testing.T) {
notfittingPod := pods[0]
fittingPod := pods[1]
kl.HandlePodAdditions(pods)
kl.HandlePodAdditions(ctx, pods)
// Check pod status stored in the status map.
checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
@ -1091,6 +1095,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1118,7 +1123,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) {
pod := podWithUIDNameNsSpec("123456789", "podA", "foo", v1.PodSpec{NodeSelector: test.podSelector})
kl.HandlePodAdditions([]*v1.Pod{pod})
kl.HandlePodAdditions(ctx, []*v1.Pod{pod})
// Check pod status stored in the status map.
checkPodStatus(t, kl, pod, test.podStatus)
@ -1128,6 +1133,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) {
// Tests that we handle exceeded resources correctly by setting the failed status in status map.
func TestHandleMemExceeded(t *testing.T) {
ctx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1173,7 +1179,7 @@ func TestHandleMemExceeded(t *testing.T) {
pods[1].UID: true,
}
kl.HandlePodAdditions(pods)
kl.HandlePodAdditions(ctx, pods)
// Check pod status stored in the status map.
checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
@ -1183,6 +1189,7 @@ func TestHandleMemExceeded(t *testing.T) {
// Tests that we handle result of interface UpdatePluginResources correctly
// by setting corresponding status in status map.
func TestHandlePluginResources(t *testing.T) {
ctx := ktesting.Init(t)
testKubelet := newTestKubeletExcludeAdmitHandlers(t, false /* controllerAttachDetachEnabled */, false /*enableResizing*/)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1304,7 +1311,7 @@ func TestHandlePluginResources(t *testing.T) {
missingPod := podWithUIDNameNsSpec("3", "missingpod", "foo", missingPodSpec)
failedPod := podWithUIDNameNsSpec("4", "failedpod", "foo", failedPodSpec)
kl.HandlePodAdditions([]*v1.Pod{fittingPod, emptyPod, missingPod, failedPod})
kl.HandlePodAdditions(ctx, []*v1.Pod{fittingPod, emptyPod, missingPod, failedPod})
// Check pod status stored in the status map.
checkPodStatus(t, kl, fittingPod, v1.PodPending)
@ -1315,7 +1322,7 @@ func TestHandlePluginResources(t *testing.T) {
// TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal.
func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
ctx := context.Background()
ctx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
@ -1326,7 +1333,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
}
podToTest := pods[1]
// Run once to populate the status map.
kl.HandlePodAdditions(pods)
kl.HandlePodAdditions(ctx, pods)
if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found {
t.Fatalf("expected to have status cached for pod2")
}
@ -1504,6 +1511,7 @@ func TestCreateMirrorPod(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
@ -1513,7 +1521,7 @@ func TestCreateMirrorPod(t *testing.T) {
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
pods := []*v1.Pod{pod}
kl.podManager.SetPods(pods)
isTerminal, err := kl.SyncPod(context.Background(), tt.updateType, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err := kl.SyncPod(tCtx, tt.updateType, pod, nil, &kubecontainer.PodStatus{})
assert.NoError(t, err)
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
@ -1526,6 +1534,7 @@ func TestCreateMirrorPod(t *testing.T) {
}
func TestDeleteOutdatedMirrorPod(t *testing.T) {
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
@ -1549,7 +1558,7 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
pods := []*v1.Pod{pod, mirrorPod}
kl.podManager.SetPods(pods)
isTerminal, err := kl.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
isTerminal, err := kl.SyncPod(tCtx, kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
assert.NoError(t, err)
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
@ -1562,7 +1571,7 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
}
func TestDeleteOrphanedMirrorPods(t *testing.T) {
ctx := context.Background()
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
@ -1612,7 +1621,7 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
}
// Sync with an empty pod list to delete all mirror pods.
kl.HandlePodCleanups(ctx)
require.NoError(t, kl.HandlePodCleanups(tCtx))
assert.Empty(t, manager.GetPods(), "Expected no mirror pods")
for i, pod := range orphanPods {
name := kubecontainer.GetPodFullName(pod)
@ -1631,6 +1640,7 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
}
func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
tCtx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet
@ -1646,7 +1656,7 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
})
kubelet.podManager.SetPods([]*v1.Pod{pod})
isTerminal, err := kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err := kubelet.SyncPod(tCtx, kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
assert.Error(t, err, "expected pod with hostNetwork=false to fail when network in error")
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
@ -1654,7 +1664,7 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource
pod.Spec.HostNetwork = true
isTerminal, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
isTerminal, err = kubelet.SyncPod(tCtx, kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
assert.NoError(t, err, "expected pod with hostNetwork=true to succeed when network in error")
if isTerminal {
t.Fatalf("pod should not be terminal: %#v", pod)
@ -1787,13 +1797,13 @@ func TestCheckpointContainer(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
tCtx := ktesting.Init(t)
options := &runtimeapi.CheckpointContainerRequest{}
if test.checkpointLocation != "" {
options.Location = test.checkpointLocation
}
status := kubelet.CheckpointContainer(
ctx,
tCtx,
fakePod.Pod.ID,
fmt.Sprintf(
"%s_%s",
@ -1827,7 +1837,7 @@ func TestCheckpointContainer(t *testing.T) {
}
func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ctx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
fakeRuntime := testKubelet.fakeRuntime
@ -1868,7 +1878,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
}
// Let the pod worker sets the status to fail after this sync.
kubelet.HandlePodUpdates(logger, pods)
kubelet.HandlePodUpdates(ctx, pods)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
assert.True(t, found, "expected to found status for pod %q", pods[0].UID)
assert.Equal(t, v1.PodFailed, status.Phase)
@ -1877,7 +1887,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
}
func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ctx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
fakeRuntime := testKubelet.fakeRuntime
@ -1919,7 +1929,7 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
}
kubelet.podManager.SetPods(pods)
kubelet.HandlePodUpdates(logger, pods)
kubelet.HandlePodUpdates(ctx, pods)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
assert.True(t, found, "expected to found status for pod %q", pods[0].UID)
assert.NotEqual(t, v1.PodFailed, status.Phase)
@ -1943,7 +1953,7 @@ func podWithUIDNameNsSpec(uid types.UID, name, namespace string, spec v1.PodSpec
}
func TestDeletePodDirsForDeletedPods(t *testing.T) {
ctx := context.Background()
ctx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -1954,7 +1964,7 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) {
kl.podManager.SetPods(pods)
// Sync to create pod directories.
kl.HandlePodSyncs(kl.podManager.GetPods())
kl.HandlePodSyncs(ctx, kl.podManager.GetPods())
for i := range pods {
assert.True(t, dirExists(kl.getPodDir(pods[i].UID)), "Expected directory to exist for pod %d", i)
}
@ -1967,12 +1977,12 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) {
}
func syncAndVerifyPodDir(t *testing.T, testKubelet *TestKubelet, pods []*v1.Pod, podsToCheck []*v1.Pod, shouldExist bool) {
ctx := context.Background()
ctx := ktesting.Init(t)
t.Helper()
kl := testKubelet.kubelet
kl.podManager.SetPods(pods)
kl.HandlePodSyncs(pods)
kl.HandlePodSyncs(ctx, pods)
kl.HandlePodCleanups(ctx)
for i, pod := range podsToCheck {
exist := dirExists(kl.getPodDir(pod.UID))
@ -2706,6 +2716,7 @@ func (a *testPodAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecyc
// Test verifies that the kubelet invokes an admission handler during HandlePodAdditions.
func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
ctx := ktesting.Init(t)
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
@ -2742,7 +2753,7 @@ func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
kl.allocationManager.AddPodAdmitHandlers(lifecycle.PodAdmitHandlers{&testPodAdmitHandler{podsToReject: podsToReject}})
kl.HandlePodAdditions(pods)
kl.HandlePodAdditions(ctx, pods)
// Check pod status stored in the status map.
checkPodStatus(t, kl, podToReject, v1.PodFailed)
@ -2945,6 +2956,7 @@ func TestPodResourceAllocationReset(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := ktesting.Init(t)
if tc.existingPodAllocation != nil {
// when kubelet restarts, AllocatedResources has already existed before adding pod
err := kubelet.allocationManager.SetAllocatedResources(tc.existingPodAllocation)
@ -2952,7 +2964,7 @@ func TestPodResourceAllocationReset(t *testing.T) {
t.Fatalf("failed to set pod allocation: %v", err)
}
}
kubelet.HandlePodAdditions([]*v1.Pod{tc.pod})
kubelet.HandlePodAdditions(ctx, []*v1.Pod{tc.pod})
allocatedResources, found := kubelet.allocationManager.GetContainerResourceAllocation(tc.pod.UID, tc.pod.Spec.Containers[0].Name)
if !found {
@ -3259,7 +3271,7 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s
}
func createRemoteRuntimeService(endpoint string, t *testing.T, tp oteltrace.TracerProvider) internalapi.RuntimeService {
logger := klog.Background()
logger, _ := ktesting.NewTestContext(t)
runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, tp, &logger)
require.NoError(t, err)
return runtimeService
@ -3355,7 +3367,8 @@ func TestNewMainKubeletStandAlone(t *testing.T) {
assert.NotNil(t, testMainKubelet, "testMainKubelet should not be nil")
testMainKubelet.BirthCry()
testMainKubelet.StartGarbageCollection()
ctx := ktesting.Init(t)
testMainKubelet.StartGarbageCollection(ctx)
// Nil pointer panic can be reproduced if configmap manager is not nil.
// See https://github.com/kubernetes/kubernetes/issues/113492
// pod := &v1.Pod{
@ -3422,7 +3435,7 @@ func TestSyncPodSpans(t *testing.T) {
fakeRuntime.ImageService.SetFakeImageSize(100)
fakeRuntime.ImageService.SetFakeImages([]string{"test:latest"})
logger := klog.Background()
logger := klog.FromContext(tCtx)
imageSvc, err := remote.NewRemoteImageService(endpoint, 15*time.Second, tp, &logger)
assert.NoError(t, err)
@ -3939,7 +3952,7 @@ func TestSyncPodWithErrorsDuringInPlacePodResize(t *testing.T) {
}
func TestHandlePodUpdates_RecordContainerRequestedResizes(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ctx := ktesting.Init(t)
metrics.Register()
metrics.ContainerRequestedResizes.Reset()
@ -4521,7 +4534,7 @@ func TestHandlePodUpdates_RecordContainerRequestedResizes(t *testing.T) {
kubelet.podManager.AddPod(initialPod)
require.NoError(t, kubelet.allocationManager.SetAllocatedResources(initialPod))
kubelet.HandlePodUpdates(logger, []*v1.Pod{updatedPod})
kubelet.HandlePodUpdates(ctx, []*v1.Pod{updatedPod})
tc.updateExpectedFunc(&expectedMetrics)
@ -4584,6 +4597,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) {
t.Skip("InPlacePodVerticalScaling is not currently supported for Windows")
}
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)
ctx := ktesting.Init(t)
testKubelet := newTestKubeletExcludeAdmitHandlers(t, false /* controllerAttachDetachEnabled */, true /*enableResizing*/)
defer testKubelet.Cleanup()
@ -4707,7 +4721,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) {
kubelet.allocationManager.PushPendingResize(pendingResizeDesired.UID)
kubelet.statusManager.ClearPodResizePendingCondition(pendingResizeDesired.UID)
kubelet.HandlePodReconcile([]*v1.Pod{tc.newPod})
kubelet.HandlePodReconcile(ctx, []*v1.Pod{tc.newPod})
require.Equal(t, tc.shouldRetryPendingResize, kubelet.statusManager.IsPodResizeDeferred(pendingResizeDesired.UID))
kubelet.allocationManager.RemovePod(pendingResizeDesired.UID)
@ -4718,7 +4732,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) {
}
func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
ctx := ktesting.Init(t)
cpu1000m := resource.MustParse("1")
mem1000M := resource.MustParse("1Gi")
cpu2000m := resource.MustParse("2")
@ -4840,6 +4854,7 @@ func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
logger := klog.FromContext(ctx)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeDeclaredFeatures, tc.featureGateEnabled)
testKubelet := newTestKubelet(t, false)
@ -4861,8 +4876,8 @@ func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) {
kubelet.podManager.SetPods([]*v1.Pod{tc.oldPod})
}
kubelet.statusManager.SetPodStatus(klog.TODO(), tc.newPod, v1.PodStatus{Phase: v1.PodRunning})
kubelet.HandlePodUpdates(logger, []*v1.Pod{tc.newPod})
kubelet.statusManager.SetPodStatus(logger, tc.newPod, v1.PodStatus{Phase: v1.PodRunning})
kubelet.HandlePodUpdates(ctx, []*v1.Pod{tc.newPod})
if tc.expectEvent {
select {
case event := <-recorder.Events:

View file

@ -92,7 +92,7 @@ func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(ctx context.Context
Annotations: newPodAnnotations(pod),
}
dnsConfig, err := m.runtimeHelper.GetPodDNS(pod)
dnsConfig, err := m.runtimeHelper.GetPodDNS(ctx, pod)
if err != nil {
return nil, err
}