From f684d90648ad848f7cf02ca2af591af86aca4803 Mon Sep 17 00:00:00 2001 From: hoteye Date: Sat, 24 Jan 2026 11:44:37 +0800 Subject: [PATCH] kubelet: migrate core sync path to contextual logging --- cmd/kubelet/app/server.go | 4 +- pkg/kubelet/container/helpers.go | 2 +- .../container/testing/fake_runtime_helper.go | 2 +- pkg/kubelet/kubelet.go | 258 +++++++++--------- pkg/kubelet/kubelet_network.go | 9 +- pkg/kubelet/kubelet_network_linux.go | 24 +- pkg/kubelet/kubelet_network_others.go | 4 +- pkg/kubelet/kubelet_pods_test.go | 6 +- pkg/kubelet/kubelet_test.go | 105 ++++--- .../kuberuntime/kuberuntime_sandbox.go | 2 +- 10 files changed, 220 insertions(+), 196 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 638cacc7699..a2dab27e588 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -1278,7 +1278,7 @@ func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps func startKubelet(ctx context.Context, k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) { // start the kubelet - go k.Run(podCfg.Updates()) + go k.Run(ctx, podCfg.Updates()) // start the kubelet server if enableServer { @@ -1333,7 +1333,7 @@ func createAndInitKubelet( k.BirthCry() - k.StartGarbageCollection() + k.StartGarbageCollection(ctx) return k, nil } diff --git a/pkg/kubelet/container/helpers.go b/pkg/kubelet/container/helpers.go index 8f495f99eb8..59f470c597c 100644 --- a/pkg/kubelet/container/helpers.go +++ b/pkg/kubelet/container/helpers.go @@ -50,7 +50,7 @@ type HandlerRunner interface { // able to get necessary informations like the RunContainerOptions, DNS settings, Host IP. type RuntimeHelper interface { GenerateRunContainerOptions(ctx context.Context, pod *v1.Pod, container *v1.Container, podIP string, podIPs []string, imageVolumes ImageVolumes) (contOpts *RunContainerOptions, cleanupAction func(), err error) - GetPodDNS(pod *v1.Pod) (dnsConfig *runtimeapi.DNSConfig, err error) + GetPodDNS(ctx context.Context, pod *v1.Pod) (dnsConfig *runtimeapi.DNSConfig, err error) // GetPodCgroupParent returns the CgroupName identifier, and its literal cgroupfs form on the host // of a pod. GetPodCgroupParent(pod *v1.Pod) string diff --git a/pkg/kubelet/container/testing/fake_runtime_helper.go b/pkg/kubelet/container/testing/fake_runtime_helper.go index 0246e3ca6b6..d7da9e33b2f 100644 --- a/pkg/kubelet/container/testing/fake_runtime_helper.go +++ b/pkg/kubelet/container/testing/fake_runtime_helper.go @@ -55,7 +55,7 @@ func (f *FakeRuntimeHelper) GetPodCgroupParent(pod *v1.Pod) string { return "" } -func (f *FakeRuntimeHelper) GetPodDNS(pod *v1.Pod) (*runtimeapi.DNSConfig, error) { +func (f *FakeRuntimeHelper) GetPodDNS(_ context.Context, pod *v1.Pod) (*runtimeapi.DNSConfig, error) { return &runtimeapi.DNSConfig{ Servers: f.DNSServers, Searches: f.DNSSearches, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 8eea28106af..fc83e5449eb 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -281,11 +281,11 @@ func getContainerEtcHostsPath() string { // SyncHandler is an interface implemented by Kubelet, for testability type SyncHandler interface { - HandlePodAdditions(pods []*v1.Pod) - HandlePodUpdates(logger klog.Logger, pods []*v1.Pod) - HandlePodRemoves(pods []*v1.Pod) - HandlePodReconcile(pods []*v1.Pod) - HandlePodSyncs(pods []*v1.Pod) + HandlePodAdditions(ctx context.Context, pods []*v1.Pod) + HandlePodUpdates(ctx context.Context, pods []*v1.Pod) + HandlePodRemoves(ctx context.Context, pods []*v1.Pod) + HandlePodReconcile(ctx context.Context, pods []*v1.Pod) + HandlePodSyncs(ctx context.Context, pods []*v1.Pod) HandlePodCleanups(ctx context.Context) error } @@ -296,11 +296,11 @@ type Option func(*Kubelet) type Bootstrap interface { GetConfiguration() kubeletconfiginternal.KubeletConfiguration BirthCry() - StartGarbageCollection() + StartGarbageCollection(ctx context.Context) ListenAndServe(ctx context.Context, kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface, tp trace.TracerProvider) ListenAndServeReadOnly(ctx context.Context, address net.IP, port uint, tp trace.TracerProvider) ListenAndServePodResources(ctx context.Context) - Run(<-chan kubetypes.PodUpdate) + Run(ctx context.Context, updates <-chan kubetypes.PodUpdate) } // Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed @@ -362,7 +362,8 @@ func newCrashLoopBackOff(kubeCfg *kubeletconfiginternal.KubeletConfiguration) (t // makePodSourceConfig creates a config.PodConfig from the given // KubeletConfiguration or returns an error. -func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) { +func makePodSourceConfig(ctx context.Context, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) { + logger := klog.FromContext(ctx) manifestURLHeader := make(http.Header) if len(kubeCfg.StaticPodURLHeader) > 0 { for k, v := range kubeCfg.StaticPodURLHeader { @@ -375,23 +376,20 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku // source of all configuration cfg := config.NewPodConfig(kubeDeps.Recorder, kubeDeps.PodStartupLatencyTracker) - // TODO: it needs to be replaced by a proper context in the future - ctx := context.TODO() - logger := klog.FromContext(ctx) // define file config source if kubeCfg.StaticPodPath != "" { - klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath) + logger.Info("Adding static pod path", "path", kubeCfg.StaticPodPath) config.NewSourceFile(logger, kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource)) } // define url config source if kubeCfg.StaticPodURL != "" { - klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader) + logger.Info("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader) config.NewSourceURL(logger, kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource)) } if kubeDeps.KubeClient != nil { - klog.InfoS("Adding apiserver pod source") + logger.Info("Adding apiserver pod source") config.NewSourceApiserver(logger, kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource)) } return cfg, nil @@ -479,18 +477,18 @@ func NewMainKubelet(ctx context.Context, return kubeInformers.Core().V1().Nodes().Informer().HasSynced() } kubeInformers.Start(wait.NeverStop) - klog.InfoS("Attempting to sync node with API server") + logger.Info("Attempting to sync node with API server") } else { // we don't have a client to sync! nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) nodeLister = corelisters.NewNodeLister(nodeIndexer) nodeHasSynced = func() bool { return true } - klog.InfoS("Kubelet is running in standalone mode, will skip API server sync") + logger.Info("Kubelet is running in standalone mode, will skip API server sync") } if kubeDeps.PodConfig == nil { var err error - kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced) + kubeDeps.PodConfig, err = makePodSourceConfig(ctx, kubeCfg, kubeDeps, nodeName, nodeHasSynced) if err != nil { return nil, err } @@ -562,10 +560,10 @@ func NewMainKubelet(ctx context.Context, if utilfeature.DefaultFeatureGate.Enabled(features.KubeletInUserNamespace) { // oomwatcher.NewWatcher returns "open /dev/kmsg: operation not permitted" error, // when running in a user namespace with sysctl value `kernel.dmesg_restrict=1`. - klog.V(2).InfoS("Failed to create an oomWatcher (running in UserNS, ignoring)", "err", err) + logger.V(2).Info("Failed to create an oomWatcher (running in UserNS, ignoring)", "err", err) oomWatcher = nil } else { - klog.ErrorS(err, "Failed to create an oomWatcher (running in UserNS, Hint: enable KubeletInUserNamespace feature flag to ignore the error)") + logger.Error(err, "Failed to create an oomWatcher (running in UserNS, Hint: enable KubeletInUserNamespace feature flag to ignore the error)") return nil, err } } else { @@ -577,7 +575,7 @@ func NewMainKubelet(ctx context.Context, for _, ipEntry := range kubeCfg.ClusterDNS { ip := netutils.ParseIPSloppy(ipEntry) if ip == nil { - klog.InfoS("Invalid clusterDNS IP", "IP", ipEntry) + logger.Info("Invalid clusterDNS IP", "IP", ipEntry) } else { clusterDNS = append(clusterDNS, ip) } @@ -694,7 +692,7 @@ func NewMainKubelet(ctx context.Context, klet.containerManager.GetNodeConfig(), klet.containerManager.GetNodeAllocatableAbsolute(), klet.statusManager, - func(pod *v1.Pod) { klet.HandlePodSyncs([]*v1.Pod{pod}) }, + func(pod *v1.Pod) { klet.HandlePodSyncs(ctx, []*v1.Pod{pod}) }, klet.GetActivePods, klet.podManager.GetPodByUID, klet.sourcesReady, @@ -880,7 +878,7 @@ func NewMainKubelet(ctx context.Context, klet.runtimeState.addHealthCheck("EventedPLEG", klet.eventedPleg.Healthy) } if _, err := klet.updatePodCIDR(ctx, kubeCfg.PodCIDR); err != nil { - klog.ErrorS(err, "Pod CIDR update failed") + logger.Error(err, "Pod CIDR update failed") } // setup containerGC @@ -947,9 +945,9 @@ func NewMainKubelet(ctx context.Context, var clusterTrustBundleManager clustertrustbundle.Manager = &clustertrustbundle.NoopManager{} if kubeDeps.KubeClient != nil && utilfeature.DefaultFeatureGate.Enabled(features.ClusterTrustBundleProjection) { clusterTrustBundleManager = clustertrustbundle.NewLazyInformerManager(ctx, kubeDeps.KubeClient, 2*int(kubeCfg.MaxPods)) - klog.InfoS("ClusterTrustBundle informer will be started eventually once a trust bundle is requested") + logger.Info("ClusterTrustBundle informer will be started eventually once a trust bundle is requested") } else { - klog.InfoS("Not starting ClusterTrustBundle informer because we are in static kubelet mode or the ClusterTrustBundleProjection featuregate is disabled") + logger.Info("Not starting ClusterTrustBundle informer because we are in static kubelet mode or the ClusterTrustBundleProjection featuregate is disabled") } if kubeDeps.KubeClient != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodCertificateRequest) { @@ -976,7 +974,7 @@ func NewMainKubelet(ctx context.Context, metrics.RegisterCollectors(collectors.PodCertificateCollectorFor(podCertificateManager)) } else { klet.podCertificateManager = &podcertificate.NoOpManager{} - klog.InfoS("Not starting PodCertificateRequest manager because we are in static kubelet mode or the PodCertificateProjection feature gate is disabled") + logger.Info("Not starting PodCertificateRequest manager because we are in static kubelet mode or the PodCertificateProjection feature gate is disabled") } // NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init) @@ -1580,7 +1578,7 @@ func (kl *Kubelet) RlimitStats() (*statsapi.RlimitStats, error) { // 4. the pod-resources directory // 5. the checkpoint directory // 6. the pod logs root directory -func (kl *Kubelet) setupDataDirs() error { +func (kl *Kubelet) setupDataDirs(logger klog.Logger) error { if cleanedRoot := filepath.Clean(kl.rootDirectory); cleanedRoot != kl.rootDirectory { return fmt.Errorf("rootDirectory not in canonical form: expected %s, was %s", cleanedRoot, kl.rootDirectory) } @@ -1615,23 +1613,23 @@ func (kl *Kubelet) setupDataDirs() error { if selinux.GetEnabled() { err := selinux.SetFileLabel(pluginRegistrationDir, kubeletconfig.KubeletPluginsDirSELinuxLabel) if err != nil { - klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugin registration dir", "path", pluginRegistrationDir, "err", err) + logger.Info("Unprivileged containerized plugins might not work, could not set selinux context on plugin registration dir", "path", pluginRegistrationDir, "err", err) } err = selinux.SetFileLabel(pluginsDir, kubeletconfig.KubeletPluginsDirSELinuxLabel) if err != nil { - klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugins dir", "path", pluginsDir, "err", err) + logger.Info("Unprivileged containerized plugins might not work, could not set selinux context on plugins dir", "path", pluginsDir, "err", err) } } return nil } // StartGarbageCollection starts garbage collection threads. -func (kl *Kubelet) StartGarbageCollection() { +func (kl *Kubelet) StartGarbageCollection(ctx context.Context) { + logger := klog.FromContext(ctx) loggedContainerGCFailure := false go wait.Until(func() { - ctx := context.Background() if err := kl.containerGC.GarbageCollect(ctx); err != nil { - klog.ErrorS(err, "Container garbage collection failed") + logger.Error(err, "Container garbage collection failed") kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error()) loggedContainerGCFailure = true } else { @@ -1641,28 +1639,27 @@ func (kl *Kubelet) StartGarbageCollection() { loggedContainerGCFailure = false } - klog.V(vLevel).InfoS("Container garbage collection succeeded") + logger.V(int(vLevel)).Info("Container garbage collection succeeded") } }, ContainerGCPeriod, wait.NeverStop) // when the high threshold is set to 100, and the max age is 0 (or the max age feature is disabled) // stub the image GC manager if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 && kl.kubeletConfiguration.ImageMaximumGCAge.Duration == 0 { - klog.V(2).InfoS("ImageGCHighThresholdPercent is set 100 and ImageMaximumGCAge is 0, Disable image GC") + logger.V(2).Info("ImageGCHighThresholdPercent is set 100 and ImageMaximumGCAge is 0, Disable image GC") return } prevImageGCFailed := false beganGC := time.Now() go wait.Until(func() { - ctx := context.Background() if err := kl.imageManager.GarbageCollect(ctx, beganGC); err != nil { if prevImageGCFailed { - klog.ErrorS(err, "Image garbage collection failed multiple times in a row") + logger.Error(err, "Image garbage collection failed multiple times in a row") // Only create an event for repeated failures kl.recorder.Event(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error()) } else { - klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet") + logger.Error(err, "Image garbage collection failed once. Stats initialization may not have completed yet") } prevImageGCFailed = true } else { @@ -1672,7 +1669,7 @@ func (kl *Kubelet) StartGarbageCollection() { prevImageGCFailed = false } - klog.V(vLevel).InfoS("Image garbage collection succeeded") + logger.V(int(vLevel)).Info("Image garbage collection succeeded") } }, ImageGCPeriod, wait.NeverStop) } @@ -1680,6 +1677,7 @@ func (kl *Kubelet) StartGarbageCollection() { // initializeModules will initialize internal modules that do not require the container runtime to be up. // Note that the modules here must not depend on modules that are not initialized here. func (kl *Kubelet) initializeModules(ctx context.Context) error { + logger := klog.FromContext(ctx) // Prometheus metrics. metrics.Register() metrics.RegisterCollectors( @@ -1690,7 +1688,7 @@ func (kl *Kubelet) initializeModules(ctx context.Context) error { servermetrics.Register() // Setup filesystem directories. - if err := kl.setupDataDirs(); err != nil { + if err := kl.setupDataDirs(logger); err != nil { return err } @@ -1732,9 +1730,10 @@ func (kl *Kubelet) initializeModules(ctx context.Context) error { // initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up. func (kl *Kubelet) initializeRuntimeDependentModules(ctx context.Context) { + logger := klog.FromContext(ctx) if err := kl.cadvisor.Start(); err != nil { // Fail kubelet and rely on the babysitter to retry starting kubelet. - klog.ErrorS(err, "Failed to start cAdvisor") + logger.Error(err, "Failed to start cAdvisor") os.Exit(1) } @@ -1745,13 +1744,13 @@ func (kl *Kubelet) initializeRuntimeDependentModules(ctx context.Context) { node, err := kl.getNodeAnyWay(ctx) if err != nil { // Fail kubelet and rely on the babysitter to retry starting kubelet. - klog.ErrorS(err, "Kubelet failed to get node info") + logger.Error(err, "Kubelet failed to get node info") os.Exit(1) } // containerManager must start after cAdvisor because it needs filesystem capacity information if err := kl.containerManager.Start(ctx, node, kl.GetActivePods, kl.getNodeAnyWay, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil { // Fail kubelet and rely on the babysitter to retry starting kubelet. - klog.ErrorS(err, "Failed to start ContainerManager") + logger.Error(err, "Failed to start ContainerManager") os.Exit(1) } // eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs @@ -1769,19 +1768,19 @@ func (kl *Kubelet) initializeRuntimeDependentModules(ctx context.Context) { } // Start the plugin manager - klog.V(4).InfoS("Starting plugin manager") + logger.V(4).Info("Starting plugin manager") go kl.pluginManager.Run(ctx, kl.sourcesReady, wait.NeverStop) err = kl.shutdownManager.Start() if err != nil { // The shutdown manager is not critical for kubelet, so log failure, but don't block Kubelet startup if there was a failure starting it. - klog.ErrorS(err, "Failed to start node shutdown manager") + logger.Error(err, "Failed to start node shutdown manager") } } // Run starts the kubelet reacting to config updates -func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { - ctx := context.Background() +func (kl *Kubelet) Run(ctx context.Context, updates <-chan kubetypes.PodUpdate) { + logger := klog.FromContext(ctx) if kl.logServer == nil { file := http.FileServer(http.Dir(nodeLogDir)) if utilfeature.DefaultFeatureGate.Enabled(features.NodeLogQuery) && kl.kubeletConfiguration.EnableSystemLogQuery { @@ -1818,17 +1817,17 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { } } if kl.kubeClient == nil { - klog.InfoS("No API server defined - no node status update will be sent") + logger.Info("No API server defined - no node status update will be sent") } if err := kl.initializeModules(ctx); err != nil { kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error()) - klog.ErrorS(err, "Failed to initialize internal modules") + logger.Error(err, "Failed to initialize internal modules") os.Exit(1) } if err := kl.cgroupVersionCheck(); err != nil { - klog.V(2).InfoS("Warning: cgroup check", "error", err) + logger.V(2).Info("Warning: cgroup check", "error", err) } // Start the allocation manager @@ -1872,7 +1871,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { // Set up iptables util rules if kl.makeIPTablesUtilChains { - kl.initNetworkUtil() + kl.initNetworkUtil(logger) } // Start component sync loops. @@ -2098,12 +2097,10 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType } if !podKilled || !runOnce { if !pcm.Exists(pod) { - // TODO: Pass logger from context once contextual logging migration is complete if err := kl.containerManager.UpdateQOSCgroups(logger); err != nil { logger.V(2).Info("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err) } - // TODO: Pass logger from context once contextual logging migration is complete - if err := pcm.EnsureExists(klog.TODO(), pod); err != nil { + if err := pcm.EnsureExists(logger, pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err) return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err) } @@ -2121,7 +2118,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType // Make data directories for the pod if err := kl.makePodDataDirs(pod); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err) - klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod)) + logger.Error(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod)) return false, err } @@ -2129,13 +2126,13 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil { var volumeAttachLimitErr *volumemanager.VolumeAttachLimitExceededError if errors.As(err, &volumeAttachLimitErr) { - kl.rejectPod(pod, volumemanager.VolumeAttachmentLimitExceededReason, volumeAttachLimitErr.Error()) + kl.rejectPod(ctx, pod, volumemanager.VolumeAttachmentLimitExceededReason, volumeAttachLimitErr.Error()) recordAdmissionRejection(volumemanager.VolumeAttachmentLimitExceededReason) return true, nil } if !wait.Interrupted(err) { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err) - klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod)) + logger.Error(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod)) } return false, err } @@ -2188,13 +2185,12 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType // during eviction). This method is not guaranteed to be called if a pod is force deleted from the // configuration and the kubelet is restarted - SyncTerminatingRuntimePod handles those orphaned // pods. -func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) (err error) { +func (kl *Kubelet) SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) (err error) { // TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker. // Currently, using that context causes test failures. - ctx := context.Background() - logger := klog.FromContext(ctx) - klog.V(4).InfoS("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) + ctx = klog.NewContext(context.TODO(), logger) + logger.V(4).Info("SyncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) ctx, otelSpan := kl.tracer.Start(ctx, "syncTerminatingPod", trace.WithAttributes( semconv.K8SPodUIDKey.String(string(pod.UID)), @@ -2208,7 +2204,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus otelSpan.SetStatus(codes.Error, err.Error()) } otelSpan.End() - klog.V(4).InfoS("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(4).Info("SyncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) }() apiPodStatus := kl.generateAPIPodStatus(ctx, pod, podStatus, false) @@ -2218,9 +2214,9 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus kl.statusManager.SetPodStatus(logger, pod, apiPodStatus) if gracePeriod != nil { - klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod) + logger.V(4).Info("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod) } else { - klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil) + logger.V(4).Info("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil) } kl.probeManager.StopLivenessAndStartup(pod) @@ -2246,7 +2242,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus // cache immediately stoppedPodStatus, err := kl.containerRuntime.GetPodStatus(ctx, pod.UID, pod.Name, pod.Namespace) if err != nil { - klog.ErrorS(err, "Unable to read pod status prior to final pod termination", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.Error(err, "Unable to read pod status prior to final pod termination", "pod", klog.KObj(pod), "podUID", pod.UID) return err } preserveDataFromBeforeStopping(stoppedPodStatus, podStatus) @@ -2258,19 +2254,19 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus FinishedAt string } var containers []container - klogV := klog.V(4) - klogVEnabled := klogV.Enabled() + loggerV := logger.V(4) + loggerVEnabled := loggerV.Enabled() for _, s := range stoppedPodStatus.ContainerStatuses { if s.State == kubecontainer.ContainerStateRunning { runningContainers = append(runningContainers, s.ID.String()) } - if klogVEnabled { + if loggerVEnabled { containers = append(containers, container{Name: s.Name, State: string(s.State), ExitCode: s.ExitCode, FinishedAt: s.FinishedAt.UTC().Format(time.RFC3339Nano)}) } } - if klogVEnabled { + if loggerVEnabled { sort.Slice(containers, func(i, j int) bool { return containers[i].Name < containers[j].Name }) - klog.V(4).InfoS("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", containers) + logger.V(4).Info("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", containers) } if len(runningContainers) > 0 { return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers) @@ -2293,7 +2289,7 @@ func (kl *Kubelet) SyncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus kl.statusManager.SetPodStatus(logger, pod, apiPodStatus) // we have successfully stopped all containers, the pod is terminating, our status is "done" - klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(4).Info("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID) return nil } @@ -2314,16 +2310,17 @@ func preserveDataFromBeforeStopping(stoppedPodStatus, podStatus *kubecontainer.P // that the remnant of the running pod is terminated and allow garbage collection to proceed. We do // not update the status of the pod because with the source of configuration removed, we have no // place to send that status. -func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kubecontainer.Pod) error { +func (kl *Kubelet) SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error { // TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker. // Currently, using that context causes test failures. - ctx := context.Background() + logger := klog.FromContext(ctx) + ctx = klog.NewContext(context.TODO(), logger) pod := runningPod.ToAPIPod() - klog.V(4).InfoS("SyncTerminatingRuntimePod enter", "pod", klog.KObj(pod), "podUID", pod.UID) - defer klog.V(4).InfoS("SyncTerminatingRuntimePod exit", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(4).Info("SyncTerminatingRuntimePod enter", "pod", klog.KObj(pod), "podUID", pod.UID) + defer logger.V(4).Info("SyncTerminatingRuntimePod exit", "pod", klog.KObj(pod), "podUID", pod.UID) // we kill the pod directly since we have lost all other information about the pod. - klog.V(4).InfoS("Orphaned running pod terminating without grace period", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(4).Info("Orphaned running pod terminating without grace period", "pod", klog.KObj(pod), "podUID", pod.UID) // TODO: this should probably be zero, to bypass any waiting (needs fixes in container runtime) gracePeriod := int64(1) if err := kl.killPod(ctx, pod, *runningPod, &gracePeriod); err != nil { @@ -2332,7 +2329,7 @@ func (kl *Kubelet) SyncTerminatingRuntimePod(_ context.Context, runningPod *kube utilruntime.HandleError(err) return err } - klog.V(4).InfoS("Pod termination stopped all running orphaned containers", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(4).Info("Pod termination stopped all running orphaned containers", "pod", klog.KObj(pod), "podUID", pod.UID) return nil } @@ -2354,8 +2351,8 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus )) logger := klog.FromContext(ctx) defer otelSpan.End() - klog.V(4).InfoS("SyncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) - defer klog.V(4).InfoS("SyncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(4).Info("SyncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) + defer logger.V(4).Info("SyncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) // generate the final status of the pod // TODO: should we simply fold this into TerminatePod? that would give a single pod update @@ -2368,20 +2365,20 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus if err := kl.volumeManager.WaitForUnmount(ctx, pod); err != nil { return err } - klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(4).Info("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID) // This waiting loop relies on the background cleanup which starts after pod workers respond // true for ShouldPodRuntimeBeRemoved, which happens after `SyncTerminatingPod` is completed. if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { volumesExist := kl.podVolumesExist(logger, pod.UID) if volumesExist { - klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(3).Info("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod), "podUID", pod.UID) } return !volumesExist, nil }); err != nil { return err } - klog.V(3).InfoS("Pod termination cleaned up volume paths", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(3).Info("Pod termination cleaned up volume paths", "pod", klog.KObj(pod), "podUID", pod.UID) // After volume unmount is complete, let the secret and configmap managers know we're done with this pod if kl.secretManager != nil { @@ -2402,14 +2399,14 @@ func (kl *Kubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus if err := pcm.Destroy(logger, name); err != nil { return err } - klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(4).Info("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID) } kl.usernsManager.Release(logger, pod.UID) // mark the final pod status kl.statusManager.TerminatePod(logger, pod) - klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(4).Info("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID) return nil } @@ -2456,7 +2453,7 @@ func (kl *Kubelet) getPodsToSync() []*v1.Pod { // // deletePod returns an error if not all sources are ready or the pod is not // found in the runtime cache. -func (kl *Kubelet) deletePod(pod *v1.Pod) error { +func (kl *Kubelet) deletePod(logger klog.Logger, pod *v1.Pod) error { if pod == nil { return fmt.Errorf("deletePod does not allow nil pod") } @@ -2465,7 +2462,7 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error { // for sources that haven't reported yet. return fmt.Errorf("skipping delete because sources aren't ready yet") } - klog.V(3).InfoS("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID) + logger.V(3).Info("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID) kl.podWorkers.UpdatePod(UpdatePodOptions{ Pod: pod, UpdateType: kubetypes.SyncPodKill, @@ -2476,9 +2473,10 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error { // rejectPod records an event about the pod with the given reason and message, // and updates the pod to the failed phase in the status manager. -func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) { +func (kl *Kubelet) rejectPod(ctx context.Context, pod *v1.Pod, reason, message string) { + logger := klog.FromContext(ctx) kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message) - kl.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ + kl.statusManager.SetPodStatus(logger, pod, v1.PodStatus{ QOSClass: v1qos.GetPodQOS(pod), // keep it as is Phase: v1.PodFailed, Reason: reason, @@ -2600,20 +2598,20 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety // ADD as if they are new pods. These pods will then go through the // admission process and *may* be rejected. This can be resolved // once we have checkpointing. - handler.HandlePodAdditions(u.Pods) + handler.HandlePodAdditions(ctx, u.Pods) case kubetypes.UPDATE: logger.V(2).Info("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods)) - handler.HandlePodUpdates(logger, u.Pods) + handler.HandlePodUpdates(ctx, u.Pods) case kubetypes.REMOVE: logger.V(2).Info("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods)) - handler.HandlePodRemoves(u.Pods) + handler.HandlePodRemoves(ctx, u.Pods) case kubetypes.RECONCILE: logger.V(4).Info("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods)) - handler.HandlePodReconcile(u.Pods) + handler.HandlePodReconcile(ctx, u.Pods) case kubetypes.DELETE: logger.V(2).Info("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods)) // DELETE is treated as a UPDATE because of graceful deletion. - handler.HandlePodUpdates(logger, u.Pods) + handler.HandlePodUpdates(ctx, u.Pods) default: logger.Error(nil, "Invalid operation type received", "operation", u.Op) } @@ -2624,7 +2622,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety // PLEG event for a pod; sync it. if pod, ok := kl.podManager.GetPodByUID(e.ID); ok { logger.V(2).Info("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e) - handler.HandlePodSyncs([]*v1.Pod{pod}) + handler.HandlePodSyncs(ctx, []*v1.Pod{pod}) } else { // If the pod no longer exists, ignore the event. logger.V(4).Info("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e) @@ -2643,10 +2641,10 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety break } logger.V(4).Info("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjSlice(podsToSync)) - handler.HandlePodSyncs(podsToSync) + handler.HandlePodSyncs(ctx, podsToSync) case update := <-kl.livenessManager.Updates(): if update.Result == proberesults.Failure { - handleProbeSync(kl, update, handler, "liveness", "unhealthy") + handleProbeSync(ctx, kl, update, handler, "liveness", "unhealthy") } case update := <-kl.readinessManager.Updates(): ready := update.Result == proberesults.Success @@ -2656,7 +2654,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety if ready { status = "ready" } - handleProbeSync(kl, update, handler, "readiness", status) + handleProbeSync(ctx, kl, update, handler, "readiness", status) case update := <-kl.startupManager.Updates(): started := update.Result == proberesults.Success kl.statusManager.SetContainerStartup(logger, update.PodUID, update.ContainerID, started) @@ -2665,7 +2663,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety if started { status = "started" } - handleProbeSync(kl, update, handler, "startup", status) + handleProbeSync(ctx, kl, update, handler, "startup", status) case update := <-kl.containerManager.Updates(): pods := []*v1.Pod{} for _, p := range update.PodUIDs { @@ -2680,7 +2678,7 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety if len(pods) > 0 { // Updating the pod by syncing it again // We do not apply the optimization by updating the status directly, but can do it later - handler.HandlePodSyncs(pods) + handler.HandlePodSyncs(ctx, pods) } case <-housekeepingCh: if !kl.sourcesReady.AllReady() { @@ -2703,22 +2701,24 @@ func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubety return true } -func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) { +func handleProbeSync(ctx context.Context, kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) { + logger := klog.FromContext(ctx) // We should not use the pod from manager, because it is never updated after initialization. pod, ok := kl.podManager.GetPodByUID(update.PodUID) if !ok { // If the pod no longer exists, ignore the update. - klog.V(4).InfoS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update) + logger.V(4).Info("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update) return } - klog.V(1).InfoS("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod)) - handler.HandlePodSyncs([]*v1.Pod{pod}) + logger.V(1).Info("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod)) + handler.HandlePodSyncs(ctx, []*v1.Pod{pod}) } // HandlePodAdditions is the callback in SyncHandler for pods being added from // a config source. -func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { +func (kl *Kubelet) HandlePodAdditions(ctx context.Context, pods []*v1.Pod) { start := kl.clock.Now() + logger := klog.FromContext(ctx) sort.Sort(sliceutils.PodsByCreationTime(pods)) var pendingResizes []types.UID for _, pod := range pods { @@ -2728,12 +2728,12 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { // the apiserver and no action (other than cleanup) is required. kl.podManager.AddPod(pod) - kl.podCertificateManager.TrackPod(context.TODO(), pod) + kl.podCertificateManager.TrackPod(ctx, pod) pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) if wasMirror { if pod == nil { - klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) continue } kl.podWorkers.UpdatePod(UpdatePodOptions{ @@ -2757,7 +2757,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { // We failed pods that we rejected, so activePods include all admitted // pods that are alive. if ok, reason, message := kl.allocationManager.AddPod(kl.GetActivePods(), pod); !ok { - kl.rejectPod(pod, reason, message) + kl.rejectPod(ctx, pod, reason, message) // We avoid recording the metric in canAdmitPod because it's called // repeatedly during a resize, which would inflate the metric. // Instead, we record the metric here in HandlePodAdditions for new pods @@ -2796,8 +2796,9 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { // HandlePodUpdates is the callback in the SyncHandler interface for pods // being updated from a config source. -func (kl *Kubelet) HandlePodUpdates(logger klog.Logger, pods []*v1.Pod) { +func (kl *Kubelet) HandlePodUpdates(ctx context.Context, pods []*v1.Pod) { start := kl.clock.Now() + logger := klog.FromContext(ctx) for _, pod := range pods { oldPod, _ := kl.podManager.GetPodByUID(pod.UID) kl.podManager.UpdatePod(pod) @@ -2944,17 +2945,18 @@ func resizeOperationForResources(new, old *resource.Quantity) string { // HandlePodRemoves is the callback in the SyncHandler interface for pods // being removed from a config source. -func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { +func (kl *Kubelet) HandlePodRemoves(ctx context.Context, pods []*v1.Pod) { start := kl.clock.Now() + logger := klog.FromContext(ctx) for _, pod := range pods { - kl.podCertificateManager.ForgetPod(context.TODO(), pod) + kl.podCertificateManager.ForgetPod(ctx, pod) kl.podManager.RemovePod(pod) kl.allocationManager.RemovePod(pod.UID) pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) if wasMirror { if pod == nil { - klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) continue } kl.podWorkers.UpdatePod(UpdatePodOptions{ @@ -2968,8 +2970,8 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { // Deletion is allowed to fail because the periodic cleanup routine // will trigger deletion again. - if err := kl.deletePod(pod); err != nil { - klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err) + if err := kl.deletePod(logger, pod); err != nil { + logger.V(2).Info("Failed to delete pod", "pod", klog.KObj(pod), "err", err) } } @@ -2981,8 +2983,9 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { // HandlePodReconcile is the callback in the SyncHandler interface for pods // that should be reconciled. Pods are reconciled when only the status of the // pod is updated in the API. -func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { +func (kl *Kubelet) HandlePodReconcile(ctx context.Context, pods []*v1.Pod) { start := kl.clock.Now() + logger := klog.FromContext(ctx) retryPendingResizes := false hasPendingResizes := kl.allocationManager.HasPendingResizes() for _, pod := range pods { @@ -2994,7 +2997,7 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) if wasMirror { if pod == nil { - klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) continue } // Static pods should be reconciled the same way as regular pods @@ -3061,19 +3064,20 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { // HandlePodSyncs is the callback in the syncHandler interface for pods // that should be dispatched to pod workers for sync. -func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) { +func (kl *Kubelet) HandlePodSyncs(ctx context.Context, pods []*v1.Pod) { start := kl.clock.Now() + logger := klog.FromContext(ctx) for _, pod := range pods { pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) if wasMirror { if pod == nil { - klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + logger.V(2).Info("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) continue } // Syncing a mirror pod is a programmer error since the intent of sync is to // batch notify all pending work. We should make it impossible to double sync, // but for now log a programmer error to prevent accidental introduction. - klog.V(3).InfoS("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID) + logger.V(3).Info("Programmer error, HandlePodSyncs does not expect to receive mirror pods", "podUID", pod.UID, "mirrorPodUID", mirrorPod.UID) continue } kl.podWorkers.UpdatePod(UpdatePodOptions{ @@ -3200,7 +3204,7 @@ func (pp *kubeletPodsProvider) GetPodByName(namespace, name string) (*v1.Pod, bo func (kl *Kubelet) ListenAndServePodResources(ctx context.Context) { endpoint, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket) if err != nil { - klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err) + klog.FromContext(ctx).V(2).Info("Failed to get local endpoint for PodResources endpoint", "err", err) return } @@ -3319,6 +3323,7 @@ func (kl *Kubelet) UnprepareDynamicResources(ctx context.Context, pod *v1.Pod) e // Ensure Mirror Pod for Static Pod exists and matches the current pod definition. // The function logs and ignores any errors. func (kl *Kubelet) tryReconcileMirrorPods(ctx context.Context, staticPod, mirrorPod *v1.Pod) { + logger := klog.FromContext(ctx) if !kubetypes.IsStaticPod(staticPod) { return } @@ -3327,26 +3332,26 @@ func (kl *Kubelet) tryReconcileMirrorPods(ctx context.Context, staticPod, mirror if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, staticPod) { // The mirror pod is semantically different from the static pod. Remove // it. The mirror pod will get recreated later. - klog.InfoS("Trying to delete pod", "pod", klog.KObj(mirrorPod), "podUID", mirrorPod.UID) + logger.Info("Trying to delete pod", "pod", klog.KObj(mirrorPod), "podUID", mirrorPod.UID) podFullName := kubecontainer.GetPodFullName(staticPod) if ok, err := kl.mirrorPodClient.DeleteMirrorPod(ctx, podFullName, &mirrorPod.UID); err != nil { - klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod)) + logger.Error(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod)) } else if ok { deleted = ok - klog.InfoS("Deleted mirror pod as it didn't match the static Pod", "pod", klog.KObj(mirrorPod)) + logger.Info("Deleted mirror pod as it didn't match the static Pod", "pod", klog.KObj(mirrorPod)) } } } if mirrorPod == nil || deleted { node, err := kl.GetNode(ctx) if err != nil { - klog.ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName))) + logger.Error(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName))) } else if node.DeletionTimestamp != nil { - klog.InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName))) + logger.Info("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName))) } else { - klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(staticPod)) + logger.Info("Creating a mirror pod for static pod", "pod", klog.KObj(staticPod)) if err := kl.mirrorPodClient.CreateMirrorPod(ctx, staticPod); err != nil { - klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(staticPod)) + logger.Error(err, "Failed creating a mirror pod", "pod", klog.KObj(staticPod)) } } } @@ -3354,16 +3359,17 @@ func (kl *Kubelet) tryReconcileMirrorPods(ctx context.Context, staticPod, mirror // Ensure Mirror Pod for Static Pod exists as soon as node is registered. func (kl *Kubelet) fastStaticPodsRegistration(ctx context.Context) { + logger := klog.FromContext(ctx) if err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) { _, err := kl.GetNode(ctx) if err == nil { return true, nil } - klog.V(4).ErrorS(err, "Unable to register mirror pod because node is not registered yet", "node", klog.KRef("", string(kl.nodeName))) + logger.Error(err, "Unable to register mirror pod because node is not registered yet", "node", klog.KRef("", string(kl.nodeName))) return false, nil }); err != nil { - klog.V(4).ErrorS(err, "Failed to wait until node is registered", "node", klog.KRef("", string(kl.nodeName))) + logger.Error(err, "Failed to wait until node is registered", "node", klog.KRef("", string(kl.nodeName))) } staticPodToMirrorPodMap := kl.podManager.GetStaticPodToMirrorPodMap() diff --git a/pkg/kubelet/kubelet_network.go b/pkg/kubelet/kubelet_network.go index 45b208f960a..b6f0aadaa2d 100644 --- a/pkg/kubelet/kubelet_network.go +++ b/pkg/kubelet/kubelet_network.go @@ -28,6 +28,7 @@ import ( // updatePodCIDR updates the pod CIDR in the runtime state if it is different // from the current CIDR. Return true if pod CIDR is actually changed. func (kl *Kubelet) updatePodCIDR(ctx context.Context, cidr string) (bool, error) { + logger := klog.FromContext(ctx) kl.updatePodCIDRMux.Lock() defer kl.updatePodCIDRMux.Unlock() @@ -44,7 +45,7 @@ func (kl *Kubelet) updatePodCIDR(ctx context.Context, cidr string) (bool, error) // But it is better to be on the safe side to still return true here. return true, fmt.Errorf("failed to update pod CIDR: %v", err) } - klog.InfoS("Updating Pod CIDR", "originalPodCIDR", podCIDR, "newPodCIDR", cidr) + logger.Info("Updating Pod CIDR", "originalPodCIDR", podCIDR, "newPodCIDR", cidr) kl.runtimeState.setPodCIDR(cidr) return true, nil } @@ -52,8 +53,6 @@ func (kl *Kubelet) updatePodCIDR(ctx context.Context, cidr string) (bool, error) // GetPodDNS returns DNS settings for the pod. // This function is defined in kubecontainer.RuntimeHelper interface so we // have to implement it. -func (kl *Kubelet) GetPodDNS(pod *v1.Pod) (*runtimeapi.DNSConfig, error) { - // Use context.TODO() because we currently do not have a proper context to pass in. - // Replace this with an appropriate context when refactoring this function to accept a context parameter. - return kl.dnsConfigurer.GetPodDNS(context.TODO(), pod) +func (kl *Kubelet) GetPodDNS(ctx context.Context, pod *v1.Pod) (*runtimeapi.DNSConfig, error) { + return kl.dnsConfigurer.GetPodDNS(ctx, pod) } diff --git a/pkg/kubelet/kubelet_network_linux.go b/pkg/kubelet/kubelet_network_linux.go index b454a3e58f5..b330cce559d 100644 --- a/pkg/kubelet/kubelet_network_linux.go +++ b/pkg/kubelet/kubelet_network_linux.go @@ -35,7 +35,7 @@ const ( KubeFirewallChain utiliptables.Chain = "KUBE-FIREWALL" ) -func (kl *Kubelet) initNetworkUtil() { +func (kl *Kubelet) initNetworkUtil(logger klog.Logger) { iptClients := utiliptables.NewBestEffort() if len(iptClients) == 0 { // We don't log this as an error because kubelet itself doesn't need any @@ -43,33 +43,33 @@ func (kl *Kubelet) initNetworkUtil() { // and because we *expect* this to fail on hosts where only nftables is // supported (in which case there can't be any other components using // iptables that would need these rules anyway). - klog.InfoS("No iptables support on this system; not creating the KUBE-IPTABLES-HINT chain") + logger.Info("No iptables support on this system; not creating the KUBE-IPTABLES-HINT chain") return } for family := range iptClients { iptClient := iptClients[family] - if kl.syncIPTablesRules(iptClient) { - klog.InfoS("Initialized iptables rules.", "protocol", iptClient.Protocol()) + if kl.syncIPTablesRules(logger, iptClient) { + logger.Info("Initialized iptables rules.", "protocol", iptClient.Protocol()) go iptClient.Monitor( utiliptables.Chain("KUBE-KUBELET-CANARY"), []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter}, - func() { kl.syncIPTablesRules(iptClient) }, + func() { kl.syncIPTablesRules(logger, iptClient) }, 1*time.Minute, wait.NeverStop, ) } else { - klog.InfoS("Failed to initialize iptables rules; some functionality may be missing.", "protocol", iptClient.Protocol()) + logger.Info("Failed to initialize iptables rules; some functionality may be missing.", "protocol", iptClient.Protocol()) } } } // syncIPTablesRules ensures the KUBE-IPTABLES-HINT chain exists, and the martian packet // protection rule is installed. -func (kl *Kubelet) syncIPTablesRules(iptClient utiliptables.Interface) bool { +func (kl *Kubelet) syncIPTablesRules(logger klog.Logger, iptClient utiliptables.Interface) bool { // Create hint chain so other components can see whether we are using iptables-legacy // or iptables-nft. if _, err := iptClient.EnsureChain(utiliptables.TableMangle, KubeIPTablesHintChain); err != nil { - klog.ErrorS(err, "Failed to ensure that iptables hint chain exists") + logger.Error(err, "Failed to ensure that iptables hint chain exists") return false } @@ -83,16 +83,16 @@ func (kl *Kubelet) syncIPTablesRules(iptClient utiliptables.Interface) bool { // created by kube-proxy. if _, err := iptClient.EnsureChain(utiliptables.TableFilter, KubeFirewallChain); err != nil { - klog.ErrorS(err, "Failed to ensure that filter table KUBE-FIREWALL chain exists") + logger.Error(err, "Failed to ensure that filter table KUBE-FIREWALL chain exists") return false } if _, err := iptClient.EnsureRule(utiliptables.Prepend, utiliptables.TableFilter, utiliptables.ChainOutput, "-j", string(KubeFirewallChain)); err != nil { - klog.ErrorS(err, "Failed to ensure that OUTPUT chain jumps to KUBE-FIREWALL") + logger.Error(err, "Failed to ensure that OUTPUT chain jumps to KUBE-FIREWALL") return false } if _, err := iptClient.EnsureRule(utiliptables.Prepend, utiliptables.TableFilter, utiliptables.ChainInput, "-j", string(KubeFirewallChain)); err != nil { - klog.ErrorS(err, "Failed to ensure that INPUT chain jumps to KUBE-FIREWALL") + logger.Error(err, "Failed to ensure that INPUT chain jumps to KUBE-FIREWALL") return false } @@ -109,7 +109,7 @@ func (kl *Kubelet) syncIPTablesRules(iptClient utiliptables.Interface) bool { "-m", "conntrack", "!", "--ctstate", "RELATED,ESTABLISHED,DNAT", "-j", "DROP"); err != nil { - klog.ErrorS(err, "Failed to ensure rule to drop invalid localhost packets in filter table KUBE-FIREWALL chain") + logger.Error(err, "Failed to ensure rule to drop invalid localhost packets in filter table KUBE-FIREWALL chain") return false } } diff --git a/pkg/kubelet/kubelet_network_others.go b/pkg/kubelet/kubelet_network_others.go index bbca343daef..0901debe844 100644 --- a/pkg/kubelet/kubelet_network_others.go +++ b/pkg/kubelet/kubelet_network_others.go @@ -18,5 +18,7 @@ limitations under the License. package kubelet +import "k8s.io/klog/v2" + // Do nothing. -func (kl *Kubelet) initNetworkUtil() {} +func (kl *Kubelet) initNetworkUtil(logger klog.Logger) {} diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index 9e4fb47f0b8..119270572ba 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -49,6 +49,7 @@ import ( featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-base/metrics/testutil" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/klog/v2" "k8s.io/kubelet/pkg/cri/streaming/portforward" "k8s.io/kubelet/pkg/cri/streaming/remotecommand" _ "k8s.io/kubernetes/pkg/apis/core/install" @@ -7475,7 +7476,7 @@ func TestKubelet_HandlePodCleanups(t *testing.T) { if !ok { t.Fatalf("unable to reject pod by UID %v", reject.uid) } - kl.rejectPod(pod, reject.reason, reject.message) + kl.rejectPod(tCtx, pod, reject.reason, reject.message) } if err := kl.HandlePodCleanups(tCtx); (err != nil) != tt.wantErr { @@ -7516,7 +7517,8 @@ func testMetric(t *testing.T, metricName string, expectedMetric string) { } func TestGetNonExistentImagePullSecret(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + _, tCtx := ktesting.NewTestContext(t) + logger := klog.FromContext(tCtx) secrets := make([]*v1.Secret, 0) fakeRecorder := record.NewFakeRecorder(1) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 57f19e00d0c..38890c487c7 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -285,7 +285,7 @@ func newTestKubeletWithImageList( }, } kubelet.recorder = fakeRecorder - if err := kubelet.setupDataDirs(); err != nil { + if err := kubelet.setupDataDirs(logger); err != nil { t.Fatalf("can't initialize kubelet data dirs: %v", err) } kubelet.daemonEndpoints = &v1.NodeDaemonEndpoints{} @@ -335,7 +335,7 @@ func newTestKubeletWithImageList( kubelet.containerManager.GetNodeConfig(), kubelet.containerManager.GetNodeAllocatableAbsolute(), kubelet.statusManager, - func(pod *v1.Pod) { kubelet.HandlePodSyncs([]*v1.Pod{pod}) }, + func(pod *v1.Pod) { kubelet.HandlePodSyncs(tCtx, []*v1.Pod{pod}) }, kubelet.GetActivePods, kubelet.podManager.GetPodByUID, config.NewSourcesReady(func(_ sets.Set[string]) bool { return enableResizing }), @@ -504,6 +504,7 @@ func TestSyncLoopAbort(t *testing.T) { } func TestSyncPodsStartPod(t *testing.T) { + ctx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet @@ -516,7 +517,7 @@ func TestSyncPodsStartPod(t *testing.T) { }), } kubelet.podManager.SetPods(pods) - kubelet.HandlePodSyncs(pods) + kubelet.HandlePodSyncs(ctx, pods) fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)}) } @@ -727,7 +728,7 @@ func TestDispatchWorkOfActivePod(t *testing.T) { } func TestHandlePodCleanups(t *testing.T) { - ctx := context.Background() + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() @@ -746,7 +747,7 @@ func TestHandlePodCleanups(t *testing.T) { } kubelet := testKubelet.kubelet - kubelet.HandlePodCleanups(ctx) + require.NoError(t, kubelet.HandlePodCleanups(tCtx)) // assert that unwanted pods were queued to kill if actual, expected := kubelet.podWorkers.(*fakePodWorkers).triggeredDeletion, []types.UID{"12345678"}; !reflect.DeepEqual(actual, expected) { @@ -756,6 +757,7 @@ func TestHandlePodCleanups(t *testing.T) { } func TestVolumeAttachLimitExceededCleanup(t *testing.T) { + ctx := ktesting.Init(t) const podCount = 500 tk := newTestKubelet(t, true /* controller-attach-detach enabled */) defer tk.Cleanup() @@ -784,9 +786,7 @@ func TestVolumeAttachLimitExceededCleanup(t *testing.T) { pods, _ := newTestPodsWithResources(podCount) kl.podManager.SetPods(pods) - kl.HandlePodSyncs(pods) - - ctx := context.Background() + kl.HandlePodSyncs(ctx, pods) // all pods must reach a terminal, Failed state due to VolumeAttachmentLimitExceeded. if err := wait.PollUntilContextTimeout( @@ -858,6 +858,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") } + ctx := ktesting.Init(t) ready := false @@ -884,7 +885,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) { kubelet := testKubelet.kubelet kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.Set[string]) bool { return ready }) - kubelet.HandlePodRemoves(pods) + kubelet.HandlePodRemoves(ctx, pods) time.Sleep(2 * time.Second) // Sources are not ready yet. Don't remove any pods. @@ -893,7 +894,7 @@ func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) { } ready = true - kubelet.HandlePodRemoves(pods) + kubelet.HandlePodRemoves(ctx, pods) time.Sleep(2 * time.Second) // Sources are ready. Remove unwanted pods. @@ -928,6 +929,7 @@ func checkPodStatus(t *testing.T, kl *Kubelet, pod *v1.Pod, phase v1.PodPhase) { // Tests that we handle port conflicts correctly by setting the failed status in status map. func TestHandlePortConflicts(t *testing.T) { + ctx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -969,7 +971,7 @@ func TestHandlePortConflicts(t *testing.T) { pods[1].UID: true, } - kl.HandlePodAdditions(pods) + kl.HandlePodAdditions(ctx, pods) // Check pod status stored in the status map. checkPodStatus(t, kl, notfittingPod, v1.PodFailed) @@ -978,6 +980,7 @@ func TestHandlePortConflicts(t *testing.T) { // Tests that we handle host name conflicts correctly by setting the failed status in status map. func TestHandleHostNameConflicts(t *testing.T) { + ctx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1012,7 +1015,7 @@ func TestHandleHostNameConflicts(t *testing.T) { notfittingPod := pods[0] fittingPod := pods[1] - kl.HandlePodAdditions(pods) + kl.HandlePodAdditions(ctx, pods) // Check pod status stored in the status map. checkPodStatus(t, kl, notfittingPod, v1.PodFailed) @@ -1021,6 +1024,7 @@ func TestHandleHostNameConflicts(t *testing.T) { // Tests that we handle not matching labels selector correctly by setting the failed status in status map. func TestHandleNodeSelector(t *testing.T) { + ctx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1054,7 +1058,7 @@ func TestHandleNodeSelector(t *testing.T) { notfittingPod := pods[0] fittingPod := pods[1] - kl.HandlePodAdditions(pods) + kl.HandlePodAdditions(ctx, pods) // Check pod status stored in the status map. checkPodStatus(t, kl, notfittingPod, v1.PodFailed) @@ -1091,6 +1095,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + ctx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1118,7 +1123,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) { pod := podWithUIDNameNsSpec("123456789", "podA", "foo", v1.PodSpec{NodeSelector: test.podSelector}) - kl.HandlePodAdditions([]*v1.Pod{pod}) + kl.HandlePodAdditions(ctx, []*v1.Pod{pod}) // Check pod status stored in the status map. checkPodStatus(t, kl, pod, test.podStatus) @@ -1128,6 +1133,7 @@ func TestHandleNodeSelectorBasedOnOS(t *testing.T) { // Tests that we handle exceeded resources correctly by setting the failed status in status map. func TestHandleMemExceeded(t *testing.T) { + ctx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1173,7 +1179,7 @@ func TestHandleMemExceeded(t *testing.T) { pods[1].UID: true, } - kl.HandlePodAdditions(pods) + kl.HandlePodAdditions(ctx, pods) // Check pod status stored in the status map. checkPodStatus(t, kl, notfittingPod, v1.PodFailed) @@ -1183,6 +1189,7 @@ func TestHandleMemExceeded(t *testing.T) { // Tests that we handle result of interface UpdatePluginResources correctly // by setting corresponding status in status map. func TestHandlePluginResources(t *testing.T) { + ctx := ktesting.Init(t) testKubelet := newTestKubeletExcludeAdmitHandlers(t, false /* controllerAttachDetachEnabled */, false /*enableResizing*/) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1304,7 +1311,7 @@ func TestHandlePluginResources(t *testing.T) { missingPod := podWithUIDNameNsSpec("3", "missingpod", "foo", missingPodSpec) failedPod := podWithUIDNameNsSpec("4", "failedpod", "foo", failedPodSpec) - kl.HandlePodAdditions([]*v1.Pod{fittingPod, emptyPod, missingPod, failedPod}) + kl.HandlePodAdditions(ctx, []*v1.Pod{fittingPod, emptyPod, missingPod, failedPod}) // Check pod status stored in the status map. checkPodStatus(t, kl, fittingPod, v1.PodPending) @@ -1315,7 +1322,7 @@ func TestHandlePluginResources(t *testing.T) { // TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal. func TestPurgingObsoleteStatusMapEntries(t *testing.T) { - ctx := context.Background() + ctx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() @@ -1326,7 +1333,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) { } podToTest := pods[1] // Run once to populate the status map. - kl.HandlePodAdditions(pods) + kl.HandlePodAdditions(ctx, pods) if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found { t.Fatalf("expected to have status cached for pod2") } @@ -1504,6 +1511,7 @@ func TestCreateMirrorPod(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() @@ -1513,7 +1521,7 @@ func TestCreateMirrorPod(t *testing.T) { pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file" pods := []*v1.Pod{pod} kl.podManager.SetPods(pods) - isTerminal, err := kl.SyncPod(context.Background(), tt.updateType, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err := kl.SyncPod(tCtx, tt.updateType, pod, nil, &kubecontainer.PodStatus{}) assert.NoError(t, err) if isTerminal { t.Fatalf("pod should not be terminal: %#v", pod) @@ -1526,6 +1534,7 @@ func TestCreateMirrorPod(t *testing.T) { } func TestDeleteOutdatedMirrorPod(t *testing.T) { + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() @@ -1549,7 +1558,7 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) { pods := []*v1.Pod{pod, mirrorPod} kl.podManager.SetPods(pods) - isTerminal, err := kl.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{}) + isTerminal, err := kl.SyncPod(tCtx, kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{}) assert.NoError(t, err) if isTerminal { t.Fatalf("pod should not be terminal: %#v", pod) @@ -1562,7 +1571,7 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) { } func TestDeleteOrphanedMirrorPods(t *testing.T) { - ctx := context.Background() + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() @@ -1612,7 +1621,7 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) { } // Sync with an empty pod list to delete all mirror pods. - kl.HandlePodCleanups(ctx) + require.NoError(t, kl.HandlePodCleanups(tCtx)) assert.Empty(t, manager.GetPods(), "Expected no mirror pods") for i, pod := range orphanPods { name := kubecontainer.GetPodFullName(pod) @@ -1631,6 +1640,7 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) { } func TestNetworkErrorsWithoutHostNetwork(t *testing.T) { + tCtx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet @@ -1646,7 +1656,7 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) { }) kubelet.podManager.SetPods([]*v1.Pod{pod}) - isTerminal, err := kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err := kubelet.SyncPod(tCtx, kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) assert.Error(t, err, "expected pod with hostNetwork=false to fail when network in error") if isTerminal { t.Fatalf("pod should not be terminal: %#v", pod) @@ -1654,7 +1664,7 @@ func TestNetworkErrorsWithoutHostNetwork(t *testing.T) { pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource pod.Spec.HostNetwork = true - isTerminal, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) + isTerminal, err = kubelet.SyncPod(tCtx, kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{}) assert.NoError(t, err, "expected pod with hostNetwork=true to succeed when network in error") if isTerminal { t.Fatalf("pod should not be terminal: %#v", pod) @@ -1787,13 +1797,13 @@ func TestCheckpointContainer(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx := context.Background() + tCtx := ktesting.Init(t) options := &runtimeapi.CheckpointContainerRequest{} if test.checkpointLocation != "" { options.Location = test.checkpointLocation } status := kubelet.CheckpointContainer( - ctx, + tCtx, fakePod.Pod.ID, fmt.Sprintf( "%s_%s", @@ -1827,7 +1837,7 @@ func TestCheckpointContainer(t *testing.T) { } func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + ctx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() fakeRuntime := testKubelet.fakeRuntime @@ -1868,7 +1878,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { } // Let the pod worker sets the status to fail after this sync. - kubelet.HandlePodUpdates(logger, pods) + kubelet.HandlePodUpdates(ctx, pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) assert.True(t, found, "expected to found status for pod %q", pods[0].UID) assert.Equal(t, v1.PodFailed, status.Phase) @@ -1877,7 +1887,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { } func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + ctx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() fakeRuntime := testKubelet.fakeRuntime @@ -1919,7 +1929,7 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { } kubelet.podManager.SetPods(pods) - kubelet.HandlePodUpdates(logger, pods) + kubelet.HandlePodUpdates(ctx, pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) assert.True(t, found, "expected to found status for pod %q", pods[0].UID) assert.NotEqual(t, v1.PodFailed, status.Phase) @@ -1943,7 +1953,7 @@ func podWithUIDNameNsSpec(uid types.UID, name, namespace string, spec v1.PodSpec } func TestDeletePodDirsForDeletedPods(t *testing.T) { - ctx := context.Background() + ctx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -1954,7 +1964,7 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) { kl.podManager.SetPods(pods) // Sync to create pod directories. - kl.HandlePodSyncs(kl.podManager.GetPods()) + kl.HandlePodSyncs(ctx, kl.podManager.GetPods()) for i := range pods { assert.True(t, dirExists(kl.getPodDir(pods[i].UID)), "Expected directory to exist for pod %d", i) } @@ -1967,12 +1977,12 @@ func TestDeletePodDirsForDeletedPods(t *testing.T) { } func syncAndVerifyPodDir(t *testing.T, testKubelet *TestKubelet, pods []*v1.Pod, podsToCheck []*v1.Pod, shouldExist bool) { - ctx := context.Background() + ctx := ktesting.Init(t) t.Helper() kl := testKubelet.kubelet kl.podManager.SetPods(pods) - kl.HandlePodSyncs(pods) + kl.HandlePodSyncs(ctx, pods) kl.HandlePodCleanups(ctx) for i, pod := range podsToCheck { exist := dirExists(kl.getPodDir(pod.UID)) @@ -2706,6 +2716,7 @@ func (a *testPodAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecyc // Test verifies that the kubelet invokes an admission handler during HandlePodAdditions. func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) { + ctx := ktesting.Init(t) testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kl := testKubelet.kubelet @@ -2742,7 +2753,7 @@ func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) { kl.allocationManager.AddPodAdmitHandlers(lifecycle.PodAdmitHandlers{&testPodAdmitHandler{podsToReject: podsToReject}}) - kl.HandlePodAdditions(pods) + kl.HandlePodAdditions(ctx, pods) // Check pod status stored in the status map. checkPodStatus(t, kl, podToReject, v1.PodFailed) @@ -2945,6 +2956,7 @@ func TestPodResourceAllocationReset(t *testing.T) { } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + ctx := ktesting.Init(t) if tc.existingPodAllocation != nil { // when kubelet restarts, AllocatedResources has already existed before adding pod err := kubelet.allocationManager.SetAllocatedResources(tc.existingPodAllocation) @@ -2952,7 +2964,7 @@ func TestPodResourceAllocationReset(t *testing.T) { t.Fatalf("failed to set pod allocation: %v", err) } } - kubelet.HandlePodAdditions([]*v1.Pod{tc.pod}) + kubelet.HandlePodAdditions(ctx, []*v1.Pod{tc.pod}) allocatedResources, found := kubelet.allocationManager.GetContainerResourceAllocation(tc.pod.UID, tc.pod.Spec.Containers[0].Name) if !found { @@ -3259,7 +3271,7 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s } func createRemoteRuntimeService(endpoint string, t *testing.T, tp oteltrace.TracerProvider) internalapi.RuntimeService { - logger := klog.Background() + logger, _ := ktesting.NewTestContext(t) runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, tp, &logger) require.NoError(t, err) return runtimeService @@ -3355,7 +3367,8 @@ func TestNewMainKubeletStandAlone(t *testing.T) { assert.NotNil(t, testMainKubelet, "testMainKubelet should not be nil") testMainKubelet.BirthCry() - testMainKubelet.StartGarbageCollection() + ctx := ktesting.Init(t) + testMainKubelet.StartGarbageCollection(ctx) // Nil pointer panic can be reproduced if configmap manager is not nil. // See https://github.com/kubernetes/kubernetes/issues/113492 // pod := &v1.Pod{ @@ -3422,7 +3435,7 @@ func TestSyncPodSpans(t *testing.T) { fakeRuntime.ImageService.SetFakeImageSize(100) fakeRuntime.ImageService.SetFakeImages([]string{"test:latest"}) - logger := klog.Background() + logger := klog.FromContext(tCtx) imageSvc, err := remote.NewRemoteImageService(endpoint, 15*time.Second, tp, &logger) assert.NoError(t, err) @@ -3939,7 +3952,7 @@ func TestSyncPodWithErrorsDuringInPlacePodResize(t *testing.T) { } func TestHandlePodUpdates_RecordContainerRequestedResizes(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + ctx := ktesting.Init(t) metrics.Register() metrics.ContainerRequestedResizes.Reset() @@ -4521,7 +4534,7 @@ func TestHandlePodUpdates_RecordContainerRequestedResizes(t *testing.T) { kubelet.podManager.AddPod(initialPod) require.NoError(t, kubelet.allocationManager.SetAllocatedResources(initialPod)) - kubelet.HandlePodUpdates(logger, []*v1.Pod{updatedPod}) + kubelet.HandlePodUpdates(ctx, []*v1.Pod{updatedPod}) tc.updateExpectedFunc(&expectedMetrics) @@ -4584,6 +4597,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) { t.Skip("InPlacePodVerticalScaling is not currently supported for Windows") } featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true) + ctx := ktesting.Init(t) testKubelet := newTestKubeletExcludeAdmitHandlers(t, false /* controllerAttachDetachEnabled */, true /*enableResizing*/) defer testKubelet.Cleanup() @@ -4707,7 +4721,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) { kubelet.allocationManager.PushPendingResize(pendingResizeDesired.UID) kubelet.statusManager.ClearPodResizePendingCondition(pendingResizeDesired.UID) - kubelet.HandlePodReconcile([]*v1.Pod{tc.newPod}) + kubelet.HandlePodReconcile(ctx, []*v1.Pod{tc.newPod}) require.Equal(t, tc.shouldRetryPendingResize, kubelet.statusManager.IsPodResizeDeferred(pendingResizeDesired.UID)) kubelet.allocationManager.RemovePod(pendingResizeDesired.UID) @@ -4718,7 +4732,7 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) { } func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) { - logger, _ := ktesting.NewTestContext(t) + ctx := ktesting.Init(t) cpu1000m := resource.MustParse("1") mem1000M := resource.MustParse("1Gi") cpu2000m := resource.MustParse("2") @@ -4840,6 +4854,7 @@ func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + logger := klog.FromContext(ctx) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeDeclaredFeatures, tc.featureGateEnabled) testKubelet := newTestKubelet(t, false) @@ -4861,8 +4876,8 @@ func TestSyncPodNodeDeclaredFeaturesUpdate(t *testing.T) { kubelet.podManager.SetPods([]*v1.Pod{tc.oldPod}) } - kubelet.statusManager.SetPodStatus(klog.TODO(), tc.newPod, v1.PodStatus{Phase: v1.PodRunning}) - kubelet.HandlePodUpdates(logger, []*v1.Pod{tc.newPod}) + kubelet.statusManager.SetPodStatus(logger, tc.newPod, v1.PodStatus{Phase: v1.PodRunning}) + kubelet.HandlePodUpdates(ctx, []*v1.Pod{tc.newPod}) if tc.expectEvent { select { case event := <-recorder.Events: diff --git a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go index 4713ba8157c..0f569ebfeba 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go @@ -92,7 +92,7 @@ func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(ctx context.Context Annotations: newPodAnnotations(pod), } - dnsConfig, err := m.runtimeHelper.GetPodDNS(pod) + dnsConfig, err := m.runtimeHelper.GetPodDNS(ctx, pod) if err != nil { return nil, err }