kubelet: migrate module logs to contextual logging

Signed-off-by: Omer Aplatony <omerap12@gmail.com>
Co-authored-by: Ed Bartosh <eduard.bartosh@intel.com>
Omer Aplatony 2025-02-19 18:31:05 +00:00 committed by Ed Bartosh
parent 9c1cf79d74
commit d75d4860e7
8 changed files with 38 additions and 30 deletions
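The pattern applied throughout this commit: instead of logging through the global klog functions (klog.InfoS, klog.ErrorS), each function retrieves a logger from its context via klog.FromContext and logs through that, so names and key/value pairs attached by the caller propagate into every message. A minimal before/after sketch (the function and its arguments are illustrative, not taken from this commit):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

// before: always logs through the global klog logger
func rotateOld() {
	klog.InfoS("Rotating logs", "workers", 4)
}

// after: logs through whatever logger the caller stored in ctx
func rotateNew(ctx context.Context) {
	logger := klog.FromContext(ctx)
	logger.Info("Rotating logs", "workers", 4)
}

func main() {
	rotateNew(context.Background()) // FromContext falls back to the global logger here
}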


@@ -235,6 +235,7 @@ linters:
 contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
 contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.*
 contextual k8s.io/kubernetes/pkg/kubelet/util/.*
+contextual k8s.io/kubernetes/pkg/kubelet/logs/.*
 # As long as contextual logging is alpha or beta, all WithName, WithValues,
 # NewContext calls have to go through klog. Once it is GA, we can lift


@@ -249,6 +249,7 @@ linters:
 contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
 contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.*
 contextual k8s.io/kubernetes/pkg/kubelet/util/.*
+contextual k8s.io/kubernetes/pkg/kubelet/logs/.*
 # As long as contextual logging is alpha or beta, all WithName, WithValues,
 # NewContext calls have to go through klog. Once it is GA, we can lift


@@ -81,6 +81,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/pod/.*
 contextual k8s.io/kubernetes/pkg/kubelet/preemption/.*
 contextual k8s.io/kubernetes/pkg/kubelet/volumemanager/.*
 contextual k8s.io/kubernetes/pkg/kubelet/util/.*
+contextual k8s.io/kubernetes/pkg/kubelet/logs/.*
 # As long as contextual logging is alpha or beta, all WithName, WithValues,
 # NewContext calls have to go through klog. Once it is GA, we can lift
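The three hunks above add pkg/kubelet/logs to the set of paths where the logcheck linter enforces contextual logging. As far as I understand the check, a direct call to the global klog functions in that package would now be flagged, while the contextual form passes; roughly (hypothetical snippet, using the same imports as the sketch above):

func start(ctx context.Context) {
	klog.InfoS("Starting worker", "workerID", 1) // flagged: direct global klog call
	logger := klog.FromContext(ctx)
	logger.Info("Starting worker", "workerID", 1) // accepted: contextual call
}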


@@ -1715,7 +1715,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
 	// container log manager must start after container runtime is up to retrieve information from container runtime
 	// and inform container to reopen log file after log rotation.
-	kl.containerLogManager.Start()
+	kl.containerLogManager.Start(ctx)
 	// Adding Registration Callback function for CSI Driver
 	kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
 	// Adding Registration Callback function for DRA Plugin and Device Plugin


@@ -54,7 +54,7 @@ const (
 // Implementation is thread-safe.
 type ContainerLogManager interface {
 	// Start container log manager.
-	Start()
+	Start(ctx context.Context)
 	// Clean removes all logs of specified container.
 	Clean(ctx context.Context, containerID string) error
 }
@@ -141,9 +141,9 @@ func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface
 }

 // Start the container log manager.
-func (c *containerLogManager) Start() {
-	ctx := context.Background()
-	klog.InfoS("Initializing container log rotate workers", "workers", c.maxWorkers, "monitorPeriod", c.monitoringPeriod)
+func (c *containerLogManager) Start(ctx context.Context) {
+	logger := klog.FromContext(ctx)
+	logger.Info("Initializing container log rotate workers", "workers", c.maxWorkers, "monitorPeriod", c.monitoringPeriod)
 	for i := 0; i < c.maxWorkers; i++ {
 		worker := i + 1
 		go c.processQueueItems(ctx, worker)
@@ -151,7 +151,7 @@ func (c *containerLogManager) Start() {
 	// Start a goroutine that periodically does container log rotation.
 	go wait.Forever(func() {
 		if err := c.rotateLogs(ctx); err != nil {
-			klog.ErrorS(err, "Failed to rotate container logs")
+			logger.Error(err, "Failed to rotate container logs")
 		}
 	}, c.monitoringPeriod.Duration)
 }
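With Start taking a context, the caller now decides which logger the manager uses. A sketch of how the kubelet call site could attach a component name before starting (the WithName value is hypothetical, not something this commit adds):

logger := klog.LoggerWithName(klog.FromContext(ctx), "containerLogManager") // hypothetical name
ctx = klog.NewContext(ctx, logger) // store it so FromContext inside Start finds it
kl.containerLogManager.Start(ctx)

Note that the rotation loop itself still runs under wait.Forever, so the context supplies the logger but does not yet bound the goroutine's lifetime.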
@@ -183,16 +183,19 @@ func (c *containerLogManager) Clean(ctx context.Context, containerID string) error {
 }

 func (c *containerLogManager) processQueueItems(ctx context.Context, worker int) {
-	klog.V(4).InfoS("Starting container log rotation worker", "workerID", worker)
+	logger := klog.FromContext(ctx)
+	logger.V(4).Info("Starting container log rotation worker", "workerID", worker)
 	for c.processContainer(ctx, worker) {
 	}
-	klog.V(4).InfoS("Terminating container log rotation worker", "workerID", worker)
+	logger.V(4).Info("Terminating container log rotation worker", "workerID", worker)
 }

 func (c *containerLogManager) rotateLogs(ctx context.Context) error {
+	logger := klog.FromContext(ctx)
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
-	klog.V(4).InfoS("Starting container log rotation sequence")
+	logger.V(4).Info("Starting container log rotation sequence")
 	// TODO(#59998): Use kubelet pod cache.
 	containers, err := c.runtimeService.ListContainers(ctx, &runtimeapi.ContainerFilter{})
 	if err != nil {
 		return fmt.Errorf("failed to list containers: %w", err)
@@ -204,8 +207,8 @@ func (c *containerLogManager) rotateLogs(ctx context.Context) error {
 			continue
 		}
 		// Doing this to avoid additional overhead with logging of label-like arguments that can prove costly
-		if v := klog.V(4); v.Enabled() {
-			klog.V(4).InfoS("Adding new entry to the queue for processing", "id", container.GetId(), "name", container.Metadata.GetName(), "labels", container.GetLabels())
+		if v := logger.V(4); v.Enabled() {
+			logger.V(4).Info("Adding new entry to the queue for processing", "id", container.GetId(), "name", container.Metadata.GetName(), "labels", container.GetLabels())
 		}
 		c.queue.Add(container.GetId())
 	}
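The Enabled() guard in this hunk keeps its shape across the migration: logger.V(4) returns a logger that knows whether verbosity 4 is active, and checking Enabled() first means container.GetLabels() and the key/value slice are only built when the message will actually be emitted. A minimal sketch of the same pattern (expensiveLabels is a hypothetical stand-in):

if v := logger.V(4); v.Enabled() {
	// only pay for label marshalling when -v=4 (or higher) is active
	v.Info("Adding new entry to the queue for processing", "labels", expensiveLabels())
}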
@@ -224,14 +227,14 @@ func (c *containerLogManager) processContainer(ctx context.Context, worker int)
 	// Always default the return to true to keep the processing of Queue ongoing
 	ok = true
 	id := key
+	logger := klog.FromContext(ctx)
 	resp, err := c.runtimeService.ContainerStatus(ctx, id, false)
 	if err != nil {
-		klog.ErrorS(err, "Failed to get container status", "worker", worker, "containerID", id)
+		logger.Error(err, "Failed to get container status", "worker", worker, "containerID", id)
 		return
 	}
 	if resp.GetStatus() == nil {
-		klog.ErrorS(err, "Container status is nil", "worker", worker, "containerID", id)
+		logger.Error(err, "Container status is nil", "worker", worker, "containerID", id)
 		return
 	}
 	path := resp.GetStatus().GetLogPath()
@@ -239,28 +242,28 @@ func (c *containerLogManager) processContainer(ctx context.Context, worker int)
 	if err != nil {
 		if !os.IsNotExist(err) {
-			klog.ErrorS(err, "Failed to stat container log", "worker", worker, "containerID", id, "path", path)
+			logger.Error(err, "Failed to stat container log", "worker", worker, "containerID", id, "path", path)
 			return
 		}
 		if err = c.runtimeService.ReopenContainerLog(ctx, id); err != nil {
-			klog.ErrorS(err, "Container log doesn't exist, reopen container log failed", "worker", worker, "containerID", id, "path", path)
+			logger.Error(err, "Container log doesn't exist, reopen container log failed", "worker", worker, "containerID", id, "path", path)
 			return
 		}
 		info, err = c.osInterface.Stat(path)
 		if err != nil {
-			klog.ErrorS(err, "Failed to stat container log after reopen", "worker", worker, "containerID", id, "path", path)
+			logger.Error(err, "Failed to stat container log after reopen", "worker", worker, "containerID", id, "path", path)
 			return
 		}
 	}
 	if info.Size() < c.policy.MaxSize {
-		klog.V(7).InfoS("log file doesn't need to be rotated", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
+		logger.V(7).Info("log file doesn't need to be rotated", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
 		return
 	}
 	if err := c.rotateLog(ctx, id, path); err != nil {
-		klog.ErrorS(err, "Failed to rotate log for container", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
+		logger.Error(err, "Failed to rotate log for container", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
 		return
 	}
 	return
@@ -411,6 +414,7 @@ func (c *containerLogManager) compressLog(log string) error {
 // rotateLatestLog rotates latest log without compression, so that container can still write
 // and fluentd can finish reading.
 func (c *containerLogManager) rotateLatestLog(ctx context.Context, id, log string) error {
+	logger := klog.FromContext(ctx)
 	timestamp := c.clock.Now().Format(timestampFormat)
 	rotated := fmt.Sprintf("%s.%s", log, timestamp)
 	if err := c.osInterface.Rename(log, rotated); err != nil {
@@ -424,7 +428,7 @@ func (c *containerLogManager) rotateLatestLog(ctx context.Context, id, log string) error {
 			// This shouldn't happen.
 			// Report an error if this happens, because we will lose original
 			// log.
-			klog.ErrorS(renameErr, "Failed to rename rotated log", "rotatedLog", rotated, "newLog", log, "containerID", id)
+			logger.Error(renameErr, "Failed to rename rotated log", "rotatedLog", rotated, "newLog", log, "containerID", id)
 		}
 		return fmt.Errorf("failed to reopen container log %q: %w", id, err)
 	}


@@ -20,7 +20,7 @@ import "context"
 type containerLogManagerStub struct{}

-func (*containerLogManagerStub) Start() {}
+func (*containerLogManagerStub) Start(ctx context.Context) {}

 func (*containerLogManagerStub) Clean(ctx context.Context, containerID string) error {
 	return nil


@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/test/utils/ktesting"

 	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
 	critest "k8s.io/cri-api/pkg/apis/testing"
@@ -77,7 +78,7 @@ func TestGetAllLogs(t *testing.T) {
 }

 func TestRotateLogs(t *testing.T) {
-	ctx := context.Background()
+	tCtx := ktesting.Init(t)
 	dir, err := os.MkdirTemp("", "test-rotate-logs")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
@@ -160,12 +161,12 @@ func TestRotateLogs(t *testing.T) {
 	f.SetFakeContainers(testContainers)
 	// Push the items into the queue before starting the worker to avoid issues with the queue being empty.
-	require.NoError(t, c.rotateLogs(ctx))
+	require.NoError(t, c.rotateLogs(tCtx))
 	// Start a routine that can monitor the queue and shut it down to trigger the return from processQueueItems.
 	// Keeping the monitor duration small in order to keep unwanted delay in the test to a minimum.
 	go func() {
-		pollTimeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+		pollTimeoutCtx, cancel := context.WithTimeout(tCtx, 10*time.Second)
 		defer cancel()
 		err = wait.PollUntilContextCancel(pollTimeoutCtx, 5*time.Millisecond, false, func(ctx context.Context) (done bool, err error) {
 			return c.queue.Len() == 0, nil
@@ -176,7 +177,7 @@ func TestRotateLogs(t *testing.T) {
 		c.queue.ShutDown()
 	}()
 	// This is a blocking call. But the above routine takes care of ensuring that this is terminated once the queue is shut down.
-	c.processQueueItems(ctx, 1)
+	c.processQueueItems(tCtx, 1)

 	timestamp := now.Format(timestampFormat)
 	logs, err := os.ReadDir(dir)
@@ -190,7 +191,7 @@ func TestRotateLogs(t *testing.T) {
 }

 func TestClean(t *testing.T) {
-	ctx := context.Background()
+	tCtx := ktesting.Init(t)
 	dir, err := os.MkdirTemp("", "test-clean")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
@@ -256,7 +257,7 @@ func TestClean(t *testing.T) {
 	}
 	f.SetFakeContainers(testContainers)

-	err = c.Clean(ctx, "container-3")
+	err = c.Clean(tCtx, "container-3")
 	require.NoError(t, err)

 	logs, err := os.ReadDir(dir)
@@ -387,7 +388,7 @@ func TestCompressLog(t *testing.T) {
 }

 func TestRotateLatestLog(t *testing.T) {
-	ctx := context.Background()
+	tCtx := ktesting.Init(t)
 	dir, err := os.MkdirTemp("", "test-rotate-latest-log")
 	require.NoError(t, err)
 	defer os.RemoveAll(dir)
@@ -438,7 +439,7 @@ func TestRotateLatestLog(t *testing.T) {
 			defer testFile.Close()
 			testLog := testFile.Name()
 			rotatedLog := fmt.Sprintf("%s.%s", testLog, now.Format(timestampFormat))
-			err = c.rotateLatestLog(ctx, "test-id", testLog)
+			err = c.rotateLatestLog(tCtx, "test-id", testLog)
 			assert.Equal(t, test.expectError, err != nil)
 			_, err = os.Stat(testLog)
 			assert.Equal(t, test.expectOriginal, err == nil)
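For context on the test-side change: ktesting.Init(t) returns a value that implements context.Context, carries a logger that writes through the testing.T, and is canceled when the test finishes, so it can replace context.Background() wherever the code under test expects a context. A minimal sketch, assuming only that doWork is a hypothetical helper accepting a context:

package kubelet_test

import (
	"context"
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/kubernetes/test/utils/ktesting"
)

func doWork(ctx context.Context) {
	klog.FromContext(ctx).Info("doing work") // ends up in the test output via t.Log
}

func TestSomething(t *testing.T) {
	tCtx := ktesting.Init(t) // a context.Context tied to this test's lifetime
	doWork(tCtx)
}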


@@ -275,7 +275,7 @@ func TestImmutableSecretStopsTheReflector(t *testing.T) {
 	fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
 	fakeClock := testingclock.NewFakeClock(time.Now())
-	store := newSecretCache(fakeClient, fakeClock, time.Minute)
+	store := newSecretCache(tCtx, fakeClient, fakeClock, time.Minute)
 	key := objectKey{namespace: "ns", name: "name"}

 	itemExists := func(_ context.Context) (bool, error) {