diff --git a/hack/golangci-hints.yaml b/hack/golangci-hints.yaml index ac703ae327e..7e397d61a32 100644 --- a/hack/golangci-hints.yaml +++ b/hack/golangci-hints.yaml @@ -235,6 +235,7 @@ linters: contextual k8s.io/kubernetes/pkg/kubelet/preemption/.* contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.* contextual k8s.io/kubernetes/pkg/kubelet/util/.* + contextual k8s.io/kubernetes/pkg/kubelet/logs/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift diff --git a/hack/golangci.yaml b/hack/golangci.yaml index 70acdd9aaa7..3b3670afbea 100644 --- a/hack/golangci.yaml +++ b/hack/golangci.yaml @@ -249,6 +249,7 @@ linters: contextual k8s.io/kubernetes/pkg/kubelet/preemption/.* contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.* contextual k8s.io/kubernetes/pkg/kubelet/util/.* + contextual k8s.io/kubernetes/pkg/kubelet/logs/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift diff --git a/hack/logcheck.conf b/hack/logcheck.conf index 33382f5fa60..d45b4d47cd6 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -81,6 +81,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/pod/.* contextual k8s.io/kubernetes/pkg/kubelet/preemption/.* contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.* contextual k8s.io/kubernetes/pkg/kubelet/util/.* +contextual k8s.io/kubernetes/pkg/kubelet/logs/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 4a642966ede..a0b60958304 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1715,7 +1715,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { // container log manager must start after container runtime is up to retrieve information from container runtime // and inform container to reopen log file after log rotation. - kl.containerLogManager.Start() + kl.containerLogManager.Start(ctx) // Adding Registration Callback function for CSI Driver kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler)) // Adding Registration Callback function for DRA Plugin and Device Plugin diff --git a/pkg/kubelet/logs/container_log_manager.go b/pkg/kubelet/logs/container_log_manager.go index a3e5f46d9c6..9e93fb195b7 100644 --- a/pkg/kubelet/logs/container_log_manager.go +++ b/pkg/kubelet/logs/container_log_manager.go @@ -54,7 +54,7 @@ const ( // Implementation is thread-safe. type ContainerLogManager interface { // Start container log manager. - Start() + Start(ctx context.Context) // Clean removes all logs of specified container. Clean(ctx context.Context, containerID string) error } @@ -141,9 +141,9 @@ func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterfa } // Start the container log manager. -func (c *containerLogManager) Start() { - ctx := context.Background() - klog.InfoS("Initializing container log rotate workers", "workers", c.maxWorkers, "monitorPeriod", c.monitoringPeriod) +func (c *containerLogManager) Start(ctx context.Context) { + logger := klog.FromContext(ctx) + logger.Info("Initializing container log rotate workers", "workers", c.maxWorkers, "monitorPeriod", c.monitoringPeriod) for i := 0; i < c.maxWorkers; i++ { worker := i + 1 go c.processQueueItems(ctx, worker) @@ -151,7 +151,7 @@ func (c *containerLogManager) Start() { // Start a goroutine periodically does container log rotation. go wait.Forever(func() { if err := c.rotateLogs(ctx); err != nil { - klog.ErrorS(err, "Failed to rotate container logs") + logger.Error(err, "Failed to rotate container logs") } }, c.monitoringPeriod.Duration) } @@ -183,16 +183,19 @@ func (c *containerLogManager) Clean(ctx context.Context, containerID string) err } func (c *containerLogManager) processQueueItems(ctx context.Context, worker int) { - klog.V(4).InfoS("Starting container log rotation worker", "workerID", worker) + logger := klog.FromContext(ctx) + logger.V(4).Info("Starting container log rotation worker", "workerID", worker) for c.processContainer(ctx, worker) { } - klog.V(4).InfoS("Terminating container log rotation worker", "workerID", worker) + logger.V(4).Info("Terminating container log rotation worker", "workerID", worker) } func (c *containerLogManager) rotateLogs(ctx context.Context) error { + logger := klog.FromContext(ctx) c.mutex.Lock() defer c.mutex.Unlock() - klog.V(4).InfoS("Starting container log rotation sequence") + logger.V(4).Info("Starting container log rotation sequence") + // TODO(#59998): Use kubelet pod cache. containers, err := c.runtimeService.ListContainers(ctx, &runtimeapi.ContainerFilter{}) if err != nil { return fmt.Errorf("failed to list containers: %w", err) @@ -204,8 +207,8 @@ func (c *containerLogManager) rotateLogs(ctx context.Context) error { continue } // Doing this to avoid additional overhead with logging of label like arguments that can prove costly - if v := klog.V(4); v.Enabled() { - klog.V(4).InfoS("Adding new entry to the queue for processing", "id", container.GetId(), "name", container.Metadata.GetName(), "labels", container.GetLabels()) + if v := logger.V(4); v.Enabled() { + logger.V(4).Info("Adding new entry to the queue for processing", "id", container.GetId(), "name", container.Metadata.GetName(), "labels", container.GetLabels()) } c.queue.Add(container.GetId()) } @@ -224,14 +227,14 @@ func (c *containerLogManager) processContainer(ctx context.Context, worker int) // Always default the return to true to keep the processing of Queue ongoing ok = true id := key - + logger := klog.FromContext(ctx) resp, err := c.runtimeService.ContainerStatus(ctx, id, false) if err != nil { - klog.ErrorS(err, "Failed to get container status", "worker", worker, "containerID", id) + logger.Error(err, "Failed to get container status", "worker", worker, "containerID", id) return } if resp.GetStatus() == nil { - klog.ErrorS(err, "Container status is nil", "worker", worker, "containerID", id) + logger.Error(err, "Container status is nil", "worker", worker, "containerID", id) return } path := resp.GetStatus().GetLogPath() @@ -239,28 +242,28 @@ func (c *containerLogManager) processContainer(ctx context.Context, worker int) if err != nil { if !os.IsNotExist(err) { - klog.ErrorS(err, "Failed to stat container log", "worker", worker, "containerID", id, "path", path) + logger.Error(err, "Failed to stat container log", "worker", worker, "containerID", id, "path", path) return } if err = c.runtimeService.ReopenContainerLog(ctx, id); err != nil { - klog.ErrorS(err, "Container log doesn't exist, reopen container log failed", "worker", worker, "containerID", id, "path", path) + logger.Error(err, "Container log doesn't exist, reopen container log failed", "worker", worker, "containerID", id, "path", path) return } info, err = c.osInterface.Stat(path) if err != nil { - klog.ErrorS(err, "Failed to stat container log after reopen", "worker", worker, "containerID", id, "path", path) + logger.Error(err, "Failed to stat container log after reopen", "worker", worker, "containerID", id, "path", path) return } } if info.Size() < c.policy.MaxSize { - klog.V(7).InfoS("log file doesn't need to be rotated", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize) + logger.V(7).Info("log file doesn't need to be rotated", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize) return } if err := c.rotateLog(ctx, id, path); err != nil { - klog.ErrorS(err, "Failed to rotate log for container", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize) + logger.Error(err, "Failed to rotate log for container", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize) return } return @@ -411,6 +414,7 @@ func (c *containerLogManager) compressLog(log string) error { // rotateLatestLog rotates latest log without compression, so that container can still write // and fluentd can finish reading. func (c *containerLogManager) rotateLatestLog(ctx context.Context, id, log string) error { + logger := klog.FromContext(ctx) timestamp := c.clock.Now().Format(timestampFormat) rotated := fmt.Sprintf("%s.%s", log, timestamp) if err := c.osInterface.Rename(log, rotated); err != nil { @@ -424,7 +428,7 @@ func (c *containerLogManager) rotateLatestLog(ctx context.Context, id, log strin // This shouldn't happen. // Report an error if this happens, because we will lose original // log. - klog.ErrorS(renameErr, "Failed to rename rotated log", "rotatedLog", rotated, "newLog", log, "containerID", id) + logger.Error(renameErr, "Failed to rename rotated log", "rotatedLog", rotated, "newLog", log, "containerID", id) } return fmt.Errorf("failed to reopen container log %q: %w", id, err) } diff --git a/pkg/kubelet/logs/container_log_manager_stub.go b/pkg/kubelet/logs/container_log_manager_stub.go index f0a2ef9fdf2..80d684224bc 100644 --- a/pkg/kubelet/logs/container_log_manager_stub.go +++ b/pkg/kubelet/logs/container_log_manager_stub.go @@ -20,7 +20,7 @@ import "context" type containerLogManagerStub struct{} -func (*containerLogManagerStub) Start() {} +func (*containerLogManagerStub) Start(ctx context.Context) {} func (*containerLogManagerStub) Clean(ctx context.Context, containerID string) error { return nil diff --git a/pkg/kubelet/logs/container_log_manager_test.go b/pkg/kubelet/logs/container_log_manager_test.go index 3760d1c727f..dbb69b828b0 100644 --- a/pkg/kubelet/logs/container_log_manager_test.go +++ b/pkg/kubelet/logs/container_log_manager_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/test/utils/ktesting" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" critest "k8s.io/cri-api/pkg/apis/testing" @@ -77,7 +78,7 @@ func TestGetAllLogs(t *testing.T) { } func TestRotateLogs(t *testing.T) { - ctx := context.Background() + tCtx := ktesting.Init(t) dir, err := os.MkdirTemp("", "test-rotate-logs") require.NoError(t, err) defer os.RemoveAll(dir) @@ -160,12 +161,12 @@ func TestRotateLogs(t *testing.T) { f.SetFakeContainers(testContainers) // Push the items into the queue for before starting the worker to avoid issue with the queue being empty. - require.NoError(t, c.rotateLogs(ctx)) + require.NoError(t, c.rotateLogs(tCtx)) // Start a routine that can monitor the queue and shutdown the queue to trigger the retrun from the processQueueItems // Keeping the monitor duration smaller in order to keep the unwanted delay in the test to a minimal. go func() { - pollTimeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + pollTimeoutCtx, cancel := context.WithTimeout(tCtx, 10*time.Second) defer cancel() err = wait.PollUntilContextCancel(pollTimeoutCtx, 5*time.Millisecond, false, func(ctx context.Context) (done bool, err error) { return c.queue.Len() == 0, nil @@ -176,7 +177,7 @@ func TestRotateLogs(t *testing.T) { c.queue.ShutDown() }() // This is a blocking call. But the above routine takes care of ensuring that this is terminated once the queue is shutdown - c.processQueueItems(ctx, 1) + c.processQueueItems(tCtx, 1) timestamp := now.Format(timestampFormat) logs, err := os.ReadDir(dir) @@ -190,7 +191,7 @@ func TestRotateLogs(t *testing.T) { } func TestClean(t *testing.T) { - ctx := context.Background() + tCtx := ktesting.Init(t) dir, err := os.MkdirTemp("", "test-clean") require.NoError(t, err) defer os.RemoveAll(dir) @@ -256,7 +257,7 @@ func TestClean(t *testing.T) { } f.SetFakeContainers(testContainers) - err = c.Clean(ctx, "container-3") + err = c.Clean(tCtx, "container-3") require.NoError(t, err) logs, err := os.ReadDir(dir) @@ -387,7 +388,7 @@ func TestCompressLog(t *testing.T) { } func TestRotateLatestLog(t *testing.T) { - ctx := context.Background() + tCtx := ktesting.Init(t) dir, err := os.MkdirTemp("", "test-rotate-latest-log") require.NoError(t, err) defer os.RemoveAll(dir) @@ -438,7 +439,7 @@ func TestRotateLatestLog(t *testing.T) { defer testFile.Close() testLog := testFile.Name() rotatedLog := fmt.Sprintf("%s.%s", testLog, now.Format(timestampFormat)) - err = c.rotateLatestLog(ctx, "test-id", testLog) + err = c.rotateLatestLog(tCtx, "test-id", testLog) assert.Equal(t, test.expectError, err != nil) _, err = os.Stat(testLog) assert.Equal(t, test.expectOriginal, err == nil) diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go index 9c9c6fe8818..552cfb2b160 100644 --- a/pkg/kubelet/util/manager/watch_based_manager_test.go +++ b/pkg/kubelet/util/manager/watch_based_manager_test.go @@ -275,7 +275,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) { fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil)) fakeClock := testingclock.NewFakeClock(time.Now()) - store := newSecretCache(fakeClient, fakeClock, time.Minute) + store := newSecretCache(tCtx, fakeClient, fakeClock, time.Minute) key := objectKey{namespace: "ns", name: "name"} itemExists := func(_ context.Context) (bool, error) {