diff --git a/hack/ginkgo-e2e.sh b/hack/ginkgo-e2e.sh index 1a4f97f18ab..21848dcd3c0 100755 --- a/hack/ginkgo-e2e.sh +++ b/hack/ginkgo-e2e.sh @@ -213,6 +213,13 @@ fi # is not used. suite_args+=(--report-complete-ginkgo --report-complete-junit) +# Additional e2e.test arguments. Split into individual arguments at spaces. +# For more complex arguments pass additional arguments to the script. +if [[ -n "${KUBE_E2E_TEST_ARGS:-}" ]]; then + # shellcheck disable=SC2206 # Splitting at word boundaries is intentional here. + suite_args+=(${KUBE_E2E_TEST_ARGS}) +fi + # When SIGTERM doesn't reach the E2E test suite binaries, ginkgo will exit # without collecting information from about the currently running and # potentially stuck tests. This seems to happen when Prow shuts down a test diff --git a/test/e2e/dra/kind.yaml b/test/e2e/dra/kind.yaml index 15ae095c200..9b8cde1372a 100644 --- a/test/e2e/dra/kind.yaml +++ b/test/e2e/dra/kind.yaml @@ -12,6 +12,11 @@ nodes: - role: worker - role: worker kubeadmConfigPatches: + - | + kind: KubeletConfiguration + apiVersion: kubelet.config.k8s.io/v1beta1 + enableSystemLogHandler: true + enableSystemLogQuery: true # v1beta4 for the future (v1.35.0+ ?) # https://github.com/kubernetes-sigs/kind/issues/3847 # TODO: drop v1beta3 when kind makes the switch @@ -78,3 +83,4 @@ kubeadmConfigPatches: # --config <(cat test/e2e/dra/kind.yaml; echo " : true") featureGates: DynamicResourceAllocation: true + NodeLogQuery: true diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 87dcfef2197..88179d101f8 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -55,6 +55,7 @@ import ( _ "k8s.io/kubernetes/test/e2e/dra" _ "k8s.io/kubernetes/test/e2e/instrumentation" _ "k8s.io/kubernetes/test/e2e/invariants" + "k8s.io/kubernetes/test/e2e/invariants/logcheck" _ "k8s.io/kubernetes/test/e2e/kubectl" _ "k8s.io/kubernetes/test/e2e/lifecycle" _ "k8s.io/kubernetes/test/e2e/lifecycle/bootstrap" @@ -78,6 +79,7 @@ func handleFlags() { config.CopyFlags(config.Flags, flag.CommandLine) framework.RegisterCommonFlags(flag.CommandLine) framework.RegisterClusterFlags(flag.CommandLine) + logcheck.RegisterFlags(flag.CommandLine) flag.Parse() } diff --git a/test/e2e/invariants/logcheck/logcheck.go b/test/e2e/invariants/logcheck/logcheck.go new file mode 100644 index 00000000000..dcdaca9a03f --- /dev/null +++ b/test/e2e/invariants/logcheck/logcheck.go @@ -0,0 +1,667 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package logcheck extends a Ginkgo test suite with optional monitoring +// of system log files. Right now it can detect DATA RACE problems. +// +// This may get expanded in the future, for example to check also +// for unhandled and potentially unexpected errors (https://github.com/kubernetes/kubernetes/issues/122005) +// or log spam (https://github.com/kubernetes/kubernetes/issues/109297). 
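+//
+// As a sketch of how a job could opt in, the KUBE_E2E_TEST_ARGS hook added to
+// hack/ginkgo-e2e.sh above can forward the flags registered by this package to
+// e2e.test. The node regexp below is only an example; it assumes a cluster whose
+// binaries were built with race detection and whose kubelets have NodeLogQuery
+// enabled (see the kind.yaml change):
+//
+//	KUBE_E2E_TEST_ARGS='--logcheck-data-races --logcheck-nodes-regexp=worker' \
+//	    hack/ginkgo-e2e.sh
+//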
+// +// Because it grabs log output as it is produced, it is possible to dump *all* +// data into $ARTIFACTS/system-logs, in contrast to other approaches which +// potentially only capture the tail of the output (log rotation). +// +// The additional check(s) must be opt-in because they are not portable. +// The package registers additional command line flags for that. +// Dedicated jobs are necessary which enable them. +// +// For DATA RACE, the binaries in the cluster must be compiled with race +// detection (https://github.com/kubernetes/kubernetes/pull/133834/files). +// It's also necessary to specify which containers to check. Other output +// (e.g. kubelet in a kind cluster) currently cannot be checked. +// +// In ginkgo.ReportBeforeSuite, the main Ginkgo worker process starts +// monitoring log output: +// +// - The podlogs package watches pods in namespaces of interest (by default, +// any that have "system" in their name). For each container in those pods, +// a logOutputChecker instance is created and is passed log output lines +// via Write. podlogs streams output, similar to "kubectl logs -f". +// +// - Kubelet log output is retrieved periodically via the node log query +// feature. Start and end times of each chunk are chosen so that there +// are no gaps. The output is stripped of additional journald line +// headers and then passed to one logOutputChecker per node. +// +// - logOutputChecker detects DATA RACE reports based on their prefix and suffix lines. +// +// - Log entries emitted during monitoring are buffered. +// +// In ginkgo.ReportAfterSuite, the log output is emitted as stdout of the check +// together with a summary of what was checked without detecting problems. +// DATA RACEs and unexpected log errors are turned into a failure. +package logcheck + +import ( + "bufio" + "context" + "errors" + "flag" + "fmt" + "io" + "maps" + "os" + "path" + "regexp" + "slices" + "strconv" + "strings" + "sync" + "time" + + "github.com/onsi/ginkgo/v2" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + "k8s.io/klog/v2/textlogger" + "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/e2e/storage/podlogs" +) + +const ( + // defaultNamespacesRE is the default regular expression for which namespaces need to be checked, + // if checking is enabled: anything which contains the word "system". + // + // If we had an API for identifying "system" namespaces, we could use that, but we + // don't have that yet (https://github.com/kubernetes/enhancements/issues/5708). + defaultNamespacesRE = `system` + + // defaultNodesRE is the default regular expression for which nodes + // should have their kubelet log output checked via the log query + // feature (https://kubernetes.io/docs/concepts/cluster-administration/system-logs/#log-query). + defaultNodesRE = `.*` + + dataRaceStart = "WARNING: DATA RACE\n" + dataRaceEnd = "==================\n" + maxBacktraceLines = 20 +) + +var ( + enabledLogChecks = logCheck{ + namespaces: regexpValue{ + re: regexp.MustCompile(defaultNamespacesRE), + }, + nodes: regexpValue{ + re: regexp.MustCompile(defaultNodesRE), + }, + } + + lc *logChecker + lcLogger klog.Logger +) + +// logCheck determines what gets checked in log output. +type logCheck struct { + // namespaces contains a regular expression which determines which namespaces need to be checked. + // All containers in those namespaces are checked. + // If the regular expression is nil, the default is used. 
namespaces regexpValue + + // nodes contains a regular expression which determines which nodes need to be checked + // by retrieving the kubelet log via the log query feature (https://kubernetes.io/docs/concepts/cluster-administration/system-logs/#log-query). + nodes regexpValue + + // dataRaces enables checking for "DATA RACE" reports. + dataRaces bool + // More checks may get added in the future. +} + +// any returns true if any log output check is enabled. +func (c logCheck) any() bool { + // Needs to be extended when adding new checks. + return c.dataRaces +} + +// regexpValue implements flag.Value for a regular expression. +type regexpValue struct { + re *regexp.Regexp +} + +var _ flag.Value = &regexpValue{} + +func (r *regexpValue) String() string { return r.re.String() } +func (r *regexpValue) Set(expr string) error { + re, err := regexp.Compile(expr) + if err != nil { + // This already starts with "error parsing regexp" and + // the caller adds the expression string, + // so no need to wrap the error here. + return err + } + r.re = re + return nil +} + +// RegisterFlags adds command line flags for configuring the package to the given flag set. +// They have "logcheck" as prefix. +func RegisterFlags(fs *flag.FlagSet) { + fs.BoolVar(&enabledLogChecks.dataRaces, "logcheck-data-races", false, "enables checking logs for DATA RACE warnings") + fs.Var(&enabledLogChecks.namespaces, "logcheck-namespaces-regexp", "all namespaces matching this regular expression get checked") + fs.Var(&enabledLogChecks.nodes, "logcheck-nodes-regexp", "all kubelets on nodes matching this regular expression get checked") +} + +var _ = ginkgo.ReportBeforeSuite(func(ctx ginkgo.SpecContext, report ginkgo.Report) { + if report.SuiteConfig.DryRun { + return + } + + // This is only reached in the main process. + initialize() +}) + +// SIG Testing runs this as a service for the SIGs which own the code. +var _ = ginkgo.ReportAfterSuite("[sig-testing] Log Check", func(ctx ginkgo.SpecContext, report ginkgo.Report) { + if report.SuiteConfig.DryRun { + return + } + + // This is reached only in the main process after the test run has completed. + finalize() +}) + +// initialize gets called once before tests start to run. +// It sets up monitoring of system component log output, if requested. +func initialize() { + if !enabledLogChecks.any() { + return + } + + config, err := framework.LoadConfig() + framework.ExpectNoError(err, "loading client config") + client, err := kubernetes.NewForConfig(config) + framework.ExpectNoError(err, "creating client") + + ctx, l, err := newLogChecker(context.Background(), client, enabledLogChecks, framework.TestContext.ReportDir) + framework.ExpectNoError(err, "set up log checker") + + lc = l + lcLogger = klog.FromContext(ctx) + lc.start(ctx, podlogs.CopyAllLogs, kubeletLogQuery) +} + +// finalize gets called once after tests have run, in the process where initialize was also called. +func finalize() { + if lc == nil { + return + } + + failure, stdout := lc.stop(lcLogger) + _, _ = ginkgo.GinkgoWriter.Write([]byte(stdout)) + if failure != "" { + // Reports as post-suite failure. + ginkgo.Fail(failure) + } +} + +func newLogChecker(ctx context.Context, client kubernetes.Interface, check logCheck, logDir string) (context.Context, *logChecker, error) { + var logFile string + + if logDir != "" { + // Redirect log output. Also, check for errors when we stop, those mustn't go unnoticed in a CI job.
logFile = path.Join(logDir, "system-logs.log") + output, err := os.Create(logFile) + if err != nil { + return ctx, nil, fmt.Errorf("create log file: %w", err) + } + // Allow increasing verbosity via the command line, but always use at + // least 5 when writing into a file - we can afford that. + vflag := flag.CommandLine.Lookup("v") + v, _ := strconv.Atoi(vflag.Value.String()) + if v < 5 { + v = 5 + } + + logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(output), textlogger.Verbosity((int(v))))) + ctx = klog.NewContext(ctx, logger) + } + if check.namespaces.re == nil { + check.namespaces.re = regexp.MustCompile(defaultNamespacesRE) + } + if check.nodes.re == nil { + check.nodes.re = regexp.MustCompile(defaultNodesRE) + } + + ctx, cancel := context.WithCancelCause(ctx) + return ctx, &logChecker{ + wg: newWaitGroup(), + client: client, + cancel: cancel, + check: check, + logDir: logDir, + logFile: logFile, + dataRaces: make(map[string][][]string), + }, nil +} + +type logChecker struct { + client kubernetes.Interface + cancel func(err error) + check logCheck + logDir string + logFile string + + // wg tracks background activity. + wg *waitGroup + + // mutex protects all following fields. + mutex sync.Mutex + + // All data races detected so far, indexed by "<namespace>/<pod>/<container>" + // or "kubelet/<node>". + // + // The last entry is initially empty while waiting for the next data race. + // + // Only entities for which at least some output was received get added here, + // therefore also the ": okay" part of the report only appears for those. + dataRaces map[string][][]string +} + +// stop cancels pod monitoring, waits until that is shut down, and then produces text for a failure message (ideally empty) and stdout. +func (l *logChecker) stop(logger klog.Logger) (failure, stdout string) { + logger.V(4).Info("Asking log monitors to stop") + l.cancel(errors.New("asked to stop")) + + // Wait for completion. + l.wg.wait() + + // Now we can proceed and produce the report. + l.mutex.Lock() + defer l.mutex.Unlock() + + var failureBuffer strings.Builder + var stdoutBuffer strings.Builder + if l.logFile != "" { + logData, err := os.ReadFile(l.logFile) + if err != nil { + stdoutBuffer.WriteString(fmt.Sprintf("Reading %s failed: %v", l.logFile, err)) + } else { + stdoutBuffer.Write(logData) + } + + // Find all error log lines. A bit crude (doesn't handle multi-line output), but good enough + // because the full log is available. + errorLogs := regexp.MustCompile(`(?m)^E.*\n`).FindAllString(string(logData), -1) + if len(errorLogs) > 0 { + failureBuffer.WriteString("Unexpected errors during log data collection (see stdout for full log):\n\n") + for _, errorLog := range errorLogs { + // Indented to make it a verbatim block in Markdown. + failureBuffer.WriteString(" ") + failureBuffer.WriteString(errorLog) + } + } + } + + keys := slices.AppendSeq([]string(nil), maps.Keys(l.dataRaces)) + slices.Sort(keys) + for _, k := range keys { + races := l.dataRaces[k] + buffer := &failureBuffer + if len(races) == 0 { + buffer = &stdoutBuffer + } + if buffer.Len() > 0 { + buffer.WriteString("\n") + } + buffer.WriteString("#### " + k + "\n") + if len(races) == 0 { + buffer.WriteString("\nOkay.\n") + continue + } + for _, race := range races { + indent := " " + buffer.WriteString("\n") + if len(races) > 1 { + // Format as bullet-point list. + buffer.WriteString("- DATA RACE:\n \n") + // This also shifts the text block to the right. + indent += " " + } else { + // Single line of intro text, then the text block.
buffer.WriteString("DATA RACE:\n\n") + } + // Indented lines are stack backtraces. Those can be long, + // so collect them as we iterate over lines and truncate in the middle. + var backtrace []string + dumpBacktrace := func() { + if len(backtrace) == 0 { + return + } + + if len(backtrace) > maxBacktraceLines { + head := backtrace[0 : maxBacktraceLines/2] + tail := backtrace[len(backtrace)-maxBacktraceLines/2:] + + backtrace = slices.Clone(head) + backtrace = append(backtrace, " ...\n") + backtrace = append(backtrace, tail...) + } + + for _, line := range backtrace { + buffer.WriteString(indent) + buffer.WriteString(line) + } + + backtrace = nil + } + for _, line := range race { + if !strings.HasPrefix(line, " ") { + // Non-backtrace line => flush and write the line. + dumpBacktrace() + buffer.WriteString(indent) + buffer.WriteString(line) + continue + } + backtrace = append(backtrace, line) + } + dumpBacktrace() + } + } + + return failureBuffer.String(), stdoutBuffer.String() +} + +func (l *logChecker) start(ctx context.Context, startCopyAllLogs func(ctx context.Context, cs kubernetes.Interface, ns string, to podlogs.LogOutput) error, startNodeLog func(ctx context.Context, cs kubernetes.Interface, wg *waitGroup, nodeName string) io.Reader) { + if !l.check.any() { + return + } + + logger := klog.FromContext(ctx) + + // Figure out which namespace(s) to watch. + namespaces, err := l.client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + if err != nil { + logger.Error(err, "Failed to list namespaces") + return + } + + // Same for nodes. + nodes, err := l.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + logger.Error(err, "Failed to list nodes") + return + } + + for _, namespace := range namespaces.Items { + if !l.check.namespaces.re.MatchString(namespace.Name) { + continue + } + to := podlogs.LogOutput{ + StatusWriter: &statusWriter{logger: logger, namespace: &namespace}, + LogPathPrefix: path.Join(l.logDir, namespace.Name) + "/", + LogOpen: func(podName, containerName string) io.Writer { + return l.logOpen(logger, namespace.Name, podName, containerName) + }, + } + + logger.Info("Watching", "namespace", klog.KObj(&namespace)) + if err := startCopyAllLogs(ctx, l.client, namespace.Name, to); err != nil { + logger.Error(err, "Log output collection failed", "namespace", klog.KObj(&namespace)) + } + } + + for _, node := range nodes.Items { + if !l.check.nodes.re.MatchString(node.Name) { + continue + } + + logger.Info("Watching", "node", klog.KObj(&node)) + kubeletLog := startNodeLog(ctx, l.client, l.wg, node.Name) + l.wg.goIfNotShuttingDown(nil, func() { + scanner := bufio.NewScanner(kubeletLog) + writer := logOutputChecker{ + logger: logger, + k: "kubelet/" + node.Name, + l: l, + } + for scanner.Scan() { + // We need to strip whatever headers might have been added by the log storage + // that was queried via the log query feature. We don't exactly know what that might be, + // so let's use a regexp that matches all known line headers. + // + // Unknown lines are passed through, which is okayish (they get treated like + // unknown, raw output from the kubelet).
+ line := scanner.Text() + line = logQueryLineHeaderRE.ReplaceAllString(line, "") + _, _ = writer.Write([]byte(line + "\n")) + } + if err := scanner.Err(); err != nil { + logger.Error(err, "Reading kubelet log failed", "node", klog.KObj(&node)) + } + }) + } +} + +// journaldHeader matches journald lines containing output from the kubelet: +// +// Jan 06 15:20:26.641748 kind-worker2 kubelet[311]: I0106 15:20:26.641743 311 labels.go:289] ... +// +// We also get messages from systemd itself. Those are not matched: +// +// Jan 07 08:35:52.139136 kind-worker3 systemd[1]: Started kubelet.service - kubelet: The Kubernetes Node Agent. +const journaldHeader = `... .. ..:..:......... \S+ kubelet\[\d+\]: ` + +// logQueryLineHeaderRE combines all supported log query data formats. +// Currently only journald is supported. +var logQueryLineHeaderRE = regexp.MustCompile(`^(?:` + journaldHeader + `)`) + +func (l *logChecker) logOpen(logger klog.Logger, names ...string) io.Writer { + if !l.check.dataRaces { + return nil + } + + if !l.wg.add(1) { + return io.Discard + } + + k := strings.Join(names, "/") + logger.Info("Starting to check for data races", "container", k) + return &logOutputChecker{logger: logger, k: k, l: l} +} + +type statusWriter struct { + logger klog.Logger + namespace *v1.Namespace +} + +// Write gets called with text that describes problems encountered while monitoring pods and their output. +func (s *statusWriter) Write(data []byte) (int, error) { + msg := string(data) + msg = strings.TrimRight(msg, "\n") + if strings.HasPrefix(msg, "WARNING:") { + // Warnings are informational, podlogs recovers from problems (like "connection refused" + // under load, "pod not found" when a pod terminates while trying to read log output). + // They don't need to be treated as a logcheck failure when looking at logs later. + s.logger.Info("PodLogs status", "namespace", klog.KObj(s.namespace), "msg", msg) + } else { + s.logger.Error(nil, "PodLogs status", "namespace", klog.KObj(s.namespace), "msg", msg) + } + return len(msg), nil +} + +// logOutputChecker receives log output for one component and detects embedded DATA RACE reports. +type logOutputChecker struct { + logger klog.Logger + k string + l *logChecker + inDataRace bool +} + +var ( + _ io.Writer = &logOutputChecker{} + _ io.Closer = &logOutputChecker{} +) + +// Write gets called for each line of output received from a container or kubelet instance. +// The line ends with a newline. +func (p *logOutputChecker) Write(l []byte) (int, error) { + line := string(l) + + p.l.mutex.Lock() + defer p.l.mutex.Unlock() + + races := p.l.dataRaces[p.k] + switch { + case p.inDataRace && line != dataRaceEnd: + races[len(races)-1] = append(races[len(races)-1], line) + case p.inDataRace && line == dataRaceEnd: + // Stop collecting data race lines. + p.inDataRace = false + p.logger.Info("Completed data race", "container", p.k, "count", len(races), "dataRace", strings.Join(races[len(races)-1], "")) + case !p.inDataRace && line == dataRaceStart: + // Start collecting data race lines. + p.inDataRace = true + races = append(races, nil) + p.logger.Info("Started new data race", "container", p.k, "count", len(races)) + default: + // Some other log output. + } + p.l.dataRaces[p.k] = races + + return len(l), nil +} + +// Close gets called once all output is processed. 
+func (p *logOutputChecker) Close() error { + p.logger.Info("Done checking for data races", "container", p.k) + p.l.wg.done() + return nil +} + +// kubeletLogQuery sets up repeatedly querying the log of kubelet on the given node +// via the log query feature. +// +// All background goroutines stop when the context gets canceled and are added +// to the given WaitGroup before kubeletLogQuery returns. The reader will +// stop providing data when that happens. +// +// The lines in the data written to the reader may contain journald headers or other, +// platform specific headers (this is not specified for the log query feature). +func kubeletLogQuery(ctx context.Context, cs kubernetes.Interface, wg *waitGroup, nodeName string) io.Reader { + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "KubeletLogQuery") + logger = klog.LoggerWithValues(logger, "node", klog.KRef("", nodeName)) + reader, writer := io.Pipe() + + wg.goIfNotShuttingDown(func() { + _ = writer.Close() + }, func() { + logger.V(4).Info("Started") + defer func() { + logger.V(4).Info("Stopped", "reason", context.Cause(ctx)) + }() + + // How long to wait between queries is a compromise between "too long" (= too much data) + // and "too short" (= too much overhead because of frequent queries). It's not clear + // where the sweet spot is. With the current default, there were 12 calls for one worker node + // during a ~1h pull-kubernetes-e2e-kind-alpha-beta-features run, with an average result + // size of ~16MB. + // + // All log checking activities share the same client instance. It's created specifically + // for that purpose, so client-side throttling does not affect other tests. + ticker := time.NewTicker(300 * time.Second) + defer ticker.Stop() + + var since time.Time + for { + select { + case <-ctx.Done(): + logger.V(4).Info("Asked to stop, will query log one last time", "reason", context.Cause(ctx)) + case <-ticker.C: + logger.V(6).Info("Starting periodic log query") + } + + // Query once also when asked to stop via cancelation, to get the tail of the output. + // We use a timeout to prevent blocking forever here. + err := func() error { + ctx := context.WithoutCancel(ctx) + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + now := time.Now() + untilTime := now.Format(time.RFC3339) + sinceTime := "" + if !since.IsZero() { + sinceTime = since.Format(time.RFC3339) + } + // The next query will use the current end time as start time. Both start and end + // are inclusive, so unfortunately there is a risk that we get the same log + // output for that time stamp twice. There's no good way to avoid that. + // Let's hope it's rare and doesn't matter when it occurs. Missing + // output would be worse because then the end marker of a DATA RACE + // report could be missed. + since = now + req := cs.CoreV1().RESTClient().Post(). + Resource("nodes"). + Name(nodeName). + SubResource("proxy"). + Suffix("logs"). + Param("query", "kubelet"). + Param("untilTime", untilTime) + if sinceTime != "" { + req = req.Param("sinceTime", sinceTime) + } + data, err := req.DoRaw(ctx) + if loggerV := logger.V(4); loggerV.Enabled() { + head := string(data) + if len(head) > 30 { + head = head[:30] + head += "..." + } + loggerV.Info("Queried log", "sinceTime", sinceTime, "endTime", now, "len", len(data), "data", head, "err", err) + } + + // Let's process whatever data we have. The exact result of the query is a bit + // underspecified. This is based on observed behavior in Kubernetes 1.35. 
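+
+ // For manual debugging, roughly the same query can be issued with kubectl
+ // (the node name and the RFC3339 time stamp below are placeholders):
+ //
+ //   kubectl get --raw "/api/v1/nodes/<node>/proxy/logs/?query=kubelet&sinceTime=2025-01-06T15:00:00Z"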
+ + // HTML seems to be what the kubelet responds when the feature is disabled?! + // May or may not be intentional according to a Slack discussion, + // to be clarified in https://github.com/kubernetes/kubernetes/issues/136275. + if strings.HasPrefix(string(data), "") { + return fmt.Errorf("unexpected result of log query (feature disabled?): %q", string(data)) + } + + // Skip the special string that is used for "no data available". + if string(data) != "-- No entries --\n" { + _, _ = writer.Write(data) + } + return err + }() + + if err != nil { + logger.Error(err, "Log query failed") + return + } + + if ctx.Err() != nil { + return + } + } + }) + return reader +} diff --git a/test/e2e/invariants/logcheck/logcheck_test.go b/test/e2e/invariants/logcheck/logcheck_test.go new file mode 100644 index 00000000000..34510129fb6 --- /dev/null +++ b/test/e2e/invariants/logcheck/logcheck_test.go @@ -0,0 +1,1088 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logcheck + +import ( + "context" + "errors" + "fmt" + "io" + "regexp" + "strings" + "testing" + "testing/synctest" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/test/e2e/storage/podlogs" + "k8s.io/kubernetes/test/utils/ktesting" +) + +type containerLogs struct { + namespace, pod, container, output string +} + +const ( + logHeader = `E0901 16:50:24.914187 1 reflector.go:418] "The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking" err="the server could not find the requested resource" +E0901 16:50:24.918970 1 reflector.go:203] "Failed to watch" err="failed to list *v1.PartialObjectMetadata: the server could not find the requested resource" logger="UnhandledError" reflector="k8s.io/client-go/metadata/metadatainformer/informer.go:138" type="*v1.PartialObjectMetadata" +I0901 16:55:24.615827 1 pathrecorder.go:243] healthz: "/healthz" satisfied by exact match +` + logTail = `I0901 16:55:24.619018 1 httplog.go:134] "HTTP" verb="GET" URI="/healthz" latency="3.382408ms" userAgent="kube-probe/1.35+" audit-ID="" srcIP="127.0.0.1:34558" resp=200 +E0901 17:10:42.624619 1 reflector.go:418] "The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking" err="the server could not find the requested resource" +E0901 17:10:42.630037 1 reflector.go:203] "Failed to watch" err="failed to list *v1.PartialObjectMetadata: the server could not find the requested resource" logger="UnhandledError" reflector="k8s.io/client-go/metadata/metadatainformer/informer.go:138" type="*v1.PartialObjectMetadata" +` +) + +func TestCheckLogs(t *testing.T) { + nothing := regexpValue{regexp.MustCompile(`^$`)} + + for name, tc := range map[string]struct 
{ + logs []containerLogs + numWorkers int + kubeletLogs map[string]string + check logCheck + podLogsError error + expectFailure string + expectStdout string + }{ + "empty": {}, + "data-race": { + logs: []containerLogs{{"kube-system", "pod", "container", `================== +WARNING: DATA RACE +Write at ... +Goroutine created at: + scheduleHandler() +================== +`}}, + check: logCheck{dataRaces: true}, + expectFailure: `#### kube-system/pod/container + +DATA RACE: + + Write at ... + Goroutine created at: + scheduleHandler() +`, + expectStdout: `] "Watching" namespace="kube-system" +] "Starting to check for data races" container="kube-system/pod/container" +] "Started new data race" container="kube-system/pod/container" count=1 +] "Completed data race" container="kube-system/pod/container" count=1 dataRace=< + Write at ... + Goroutine created at: + scheduleHandler() + > +] "Done checking for data races" container="kube-system/pod/container" +`, + }, + "disabled-data-race": { + logs: []containerLogs{{"kube-system", "pod", "container", `================== +WARNING: DATA RACE +Write at ... +Goroutine created at: + scheduleHandler() +================== +`}}, + check: logCheck{dataRaces: false}, + }, + "ignored-data-race-in-default-namespace": { + logs: []containerLogs{{"default", "pod", "container", `================== +WARNING: DATA RACE +Write at ... +Goroutine created at: + scheduleHandler() +================== +`}}, + check: logCheck{dataRaces: true}, + expectStdout: `] "Watching" namespace="kube-system" +`, + }, + "ignored-data-race-because-of-namespace-config": { + logs: []containerLogs{{"kube-system", "pod", "container", `================== +WARNING: DATA RACE +Write at ... +Goroutine created at: + scheduleHandler() +================== +`}}, + check: logCheck{dataRaces: true, namespaces: regexpValue{regexp.MustCompile("other")}}, + expectStdout: `] "Watching" namespace="other-namespace" +] "Watching" namespace="yet-another-namespace" +`, + }, + "start-error": { + logs: []containerLogs{{"kube-system", "pod", "container", ``}}, + check: logCheck{dataRaces: true}, + podLogsError: errors.New("fake pod logs error"), + expectStdout: `] "Watching" namespace="kube-system" +] "Log output collection failed" err="fake pod logs error" namespace="kube-system" +`, + expectFailure: `Unexpected errors during log data collection (see stdout for full log): + + ] "Log output collection failed" err="fake pod logs error" namespace="kube-system" +`, + }, + "log-error": { + logs: []containerLogs{{"kube-system", "pod", "container", ``}}, + check: logCheck{dataRaces: true}, + expectStdout: `] "Watching" namespace="kube-system" +] "PodLogs status" namespace="kube-system" msg="ERROR: fake error: no log output" +`, + expectFailure: `Unexpected errors during log data collection (see stdout for full log): + + ] "PodLogs status" namespace="kube-system" msg="ERROR: fake error: no log output" +`, + }, + "two-data-races": { + logs: []containerLogs{{"kube-system", "pod", "container", `================== +WARNING: DATA RACE +Write at ... +Goroutine created at: + scheduleHandler() +================== +================== +WARNING: DATA RACE +Write at ... +Goroutine created at: + otherScheduleHandler() +================== +`}}, + check: logCheck{dataRaces: true}, + expectFailure: `#### kube-system/pod/container + +- DATA RACE: + + Write at ... + Goroutine created at: + scheduleHandler() + +- DATA RACE: + + Write at ... 
+ Goroutine created at: + otherScheduleHandler() +`, + expectStdout: `] "Watching" namespace="kube-system" +] "Starting to check for data races" container="kube-system/pod/container" +] "Started new data race" container="kube-system/pod/container" count=1 +] "Completed data race" container="kube-system/pod/container" count=1 dataRace=< + Write at ... + Goroutine created at: + scheduleHandler() + > +] "Started new data race" container="kube-system/pod/container" count=2 +] "Completed data race" container="kube-system/pod/container" count=2 dataRace=< + Write at ... + Goroutine created at: + otherScheduleHandler() + > +] "Done checking for data races" container="kube-system/pod/container" +`, + }, + "both": { + logs: []containerLogs{{"kube-system", "pod", "container", logHeader + `================== +WARNING: DATA RACE +Write at ... +Goroutine created at: + scheduleHandler() +================== +` + logTail}}, + check: logCheck{dataRaces: true}, + expectFailure: `#### kube-system/pod/container + +DATA RACE: + + Write at ... + Goroutine created at: + scheduleHandler() +`, + expectStdout: `] "Watching" namespace="kube-system" +] "Starting to check for data races" container="kube-system/pod/container" +] "Started new data race" container="kube-system/pod/container" count=1 +] "Completed data race" container="kube-system/pod/container" count=1 dataRace=< + Write at ... + Goroutine created at: + scheduleHandler() + > +] "Done checking for data races" container="kube-system/pod/container" +`, + }, + "two-containers": { + logs: []containerLogs{ + {"kube-system", "pod", "container1", logHeader}, + {"kube-system", "pod", "container2", `================== +WARNING: DATA RACE +Write at ... +Goroutine created at: + scheduleHandler() +================== +`}, + }, + check: logCheck{dataRaces: true}, + expectStdout: `] "Watching" namespace="kube-system" +] "Starting to check for data races" container="kube-system/pod/container1" +] "Done checking for data races" container="kube-system/pod/container1" +] "Starting to check for data races" container="kube-system/pod/container2" +] "Started new data race" container="kube-system/pod/container2" count=1 +] "Completed data race" container="kube-system/pod/container2" count=1 dataRace=< + Write at ... + Goroutine created at: + scheduleHandler() + > +] "Done checking for data races" container="kube-system/pod/container2" + +#### kube-system/pod/container1 + +Okay. +`, + expectFailure: `#### kube-system/pod/container2 + +DATA RACE: + + Write at ... + Goroutine created at: + scheduleHandler() +`, + }, + "two-pods": { + logs: []containerLogs{ + {"kube-system", "pod1", "container", logHeader}, + {"kube-system", "pod2", "container", `================== +WARNING: DATA RACE +Write at ... +Goroutine created at: + scheduleHandler() +================== +`}, + }, + check: logCheck{dataRaces: true}, + expectStdout: `] "Watching" namespace="kube-system" +] "Starting to check for data races" container="kube-system/pod1/container" +] "Done checking for data races" container="kube-system/pod1/container" +] "Starting to check for data races" container="kube-system/pod2/container" +] "Started new data race" container="kube-system/pod2/container" count=1 +] "Completed data race" container="kube-system/pod2/container" count=1 dataRace=< + Write at ... + Goroutine created at: + scheduleHandler() + > +] "Done checking for data races" container="kube-system/pod2/container" + +#### kube-system/pod1/container + +Okay. 
+`, + expectFailure: `#### kube-system/pod2/container + +DATA RACE: + + Write at ... + Goroutine created at: + scheduleHandler() +`, + }, + "kubelet-all-nodes": { + numWorkers: 3, + kubeletLogs: map[string]string{ + "worker1": `Some other output.... +... more output. +`, + "worker2": `================== +WARNING: DATA RACE +Write at ... +Goroutine created at: + handlePod2() +================== +`, + }, + check: logCheck{dataRaces: true, namespaces: nothing}, + expectStdout: `] "Watching" node="worker0" +] "Watching" node="worker1" +] "Watching" node="worker2" +] "Started new data race" container="kubelet/worker2" count=1 +] "Completed data race" container="kubelet/worker2" count=1 dataRace=< + Write at ... + Goroutine created at: + handlePod2() + > + +#### kubelet/worker1 + +Okay. +`, + expectFailure: `#### kubelet/worker2 + +DATA RACE: + + Write at ... + Goroutine created at: + handlePod2() +`, + }, + "kubelet-no-nodes": { + numWorkers: 3, + kubeletLogs: map[string]string{ + "worker1": `Some other output.... +... more output. +`, + "worker2": `================== +WARNING: DATA RACE +Write at ... +Goroutine created at: + handlePod2() +================== +`, + }, + check: logCheck{dataRaces: true, namespaces: nothing, nodes: nothing}, + }, + "kubelet-long": { + numWorkers: 3, + kubeletLogs: map[string]string{ + // This is a real example. The race detector seems to replicate stack entries, + // so we should better truncate in the middle to keep failure messages short. + "worker2": `================== +WARNING: DATA RACE +Write at 0x00c0010def18 by goroutine 285: + k8s.io/kubernetes/pkg/kubelet/status.normalizeStatus.func1() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:1193 +0x1ee + k8s.io/kubernetes/pkg/kubelet/status.normalizeStatus() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:1209 +0x175 + k8s.io/kubernetes/pkg/kubelet/status.(*manager).updateStatusInternal() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:838 +0x9dc + k8s.io/kubernetes/pkg/kubelet/status.(*manager).SetContainerReadiness() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:501 +0x20cb + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2650 +0x25a5 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).updateCache() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:83 +0x3e + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + 
k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + 
k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + 
k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).Run() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:1896 +0xfea + k8s.io/kubernetes/cmd/kubelet/app.startKubelet.gowrap1() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1264 +0x50 +Previous read at 0x00c0010def18 by goroutine 537: + k8s.io/apimachinery/pkg/apis/meta/v1.(*Time).MarshalJSON() + :1 +0x44 + encoding/json.marshalerEncoder() + encoding/json/encode.go:483 +0x13c + encoding/json.structEncoder.encode() + encoding/json/encode.go:758 +0x3c7 + encoding/json.structEncoder.encode-fm() + :1 +0xe4 + encoding/json.structEncoder.encode() + encoding/json/encode.go:758 +0x3c7 + encoding/json.structEncoder.encode-fm() + :1 +0xe4 + encoding/json.(*encodeState).reflectValue() + encoding/json/encode.go:367 +0x83 + encoding/json.(*encodeState).marshal() + encoding/json/encode.go:343 +0xdb + encoding/json.Marshal() + encoding/json/encode.go:209 +0x11e + k8s.io/kubernetes/pkg/util/pod.preparePatchBytesForPodStatus() + k8s.io/kubernetes/pkg/util/pod/pod.go:58 +0x2d3 + k8s.io/kubernetes/pkg/util/pod.PatchPodStatus() + 
k8s.io/kubernetes/pkg/util/pod/pod.go:35 +0x12b + k8s.io/kubernetes/pkg/kubelet/status.(*manager).syncPod() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:1064 +0xada + k8s.io/kubernetes/pkg/kubelet/status.(*manager).syncBatch() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:1025 +0x199 + k8s.io/kubernetes/pkg/kubelet/status.(*manager).Start.func1() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:260 +0x1a4 + k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1() + k8s.io/apimachinery/pkg/util/wait/backoff.go:233 +0x2e + k8s.io/apimachinery/pkg/util/wait.BackoffUntilWithContext.func1() + k8s.io/apimachinery/pkg/util/wait/backoff.go:255 +0x98 + k8s.io/apimachinery/pkg/util/wait.BackoffUntilWithContext() + k8s.io/apimachinery/pkg/util/wait/backoff.go:256 +0xed + k8s.io/apimachinery/pkg/util/wait.BackoffUntil() + k8s.io/apimachinery/pkg/util/wait/backoff.go:233 +0x8a + k8s.io/apimachinery/pkg/util/wait.JitterUntil() + k8s.io/apimachinery/pkg/util/wait/backoff.go:210 +0xfb + k8s.io/apimachinery/pkg/util/wait.Until() + k8s.io/apimachinery/pkg/util/wait/backoff.go:163 +0x50 + k8s.io/apimachinery/pkg/util/wait.Forever() + k8s.io/apimachinery/pkg/util/wait/wait.go:80 +0x2a + k8s.io/kubernetes/pkg/kubelet/status.(*manager).Start.gowrap1() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:255 +0x17 +Goroutine 285 (running) created at: + k8s.io/kubernetes/cmd/kubelet/app.startKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1264 +0x14e + k8s.io/kubernetes/cmd/kubelet/app.RunKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1256 +0x7d7 + k8s.io/kubernetes/cmd/kubelet/app.createAndInitKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1286 +0x5fd + k8s.io/kubernetes/cmd/kubelet/app.RunKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1235 +0x588 + k8s.io/kubernetes/cmd/kubelet/app.run() + k8s.io/kubernetes/cmd/kubelet/app/server.go:901 +0x3c3e + k8s.io/kubernetes/cmd/kubelet/app.getCgroupDriverFromCRI() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1382 +0x116 + k8s.io/cri-client/pkg.(*remoteRuntimeService).RuntimeConfig() + k8s.io/cri-client/pkg/remote_runtime.go:926 +0x144 + k8s.io/kubernetes/cmd/kubelet/app.getCgroupDriverFromCRI() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1382 +0x116 + k8s.io/cri-client/pkg.(*remoteRuntimeService).RuntimeConfig() + k8s.io/cri-client/pkg/remote_runtime.go:926 +0x144 + k8s.io/kubernetes/cmd/kubelet/app.getCgroupDriverFromCRI() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1382 +0x116 + k8s.io/kubernetes/cmd/kubelet/app.run() + k8s.io/kubernetes/cmd/kubelet/app/server.go:732 +0x10f9 + k8s.io/kubernetes/pkg/kubelet.PreInitRuntimeService() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:411 +0x384 + k8s.io/cri-client/pkg.NewRemoteRuntimeService() + k8s.io/cri-client/pkg/remote_runtime.go:134 +0x118a + k8s.io/kubernetes/pkg/kubelet.PreInitRuntimeService() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:408 +0x28f + k8s.io/kubernetes/cmd/kubelet/app.run() + k8s.io/kubernetes/cmd/kubelet/app/server.go:727 +0x10cb + k8s.io/kubernetes/cmd/kubelet/app.Run() + k8s.io/kubernetes/cmd/kubelet/app/server.go:532 +0x4b9 + k8s.io/kubernetes/cmd/kubelet/app.NewKubeletCommand.func1() + k8s.io/kubernetes/cmd/kubelet/app/server.go:295 +0x1837 + github.com/spf13/cobra.(*Command).execute() + github.com/spf13/cobra@v1.10.0/command.go:1015 +0x113b + github.com/spf13/cobra.(*Command).ExecuteC() + github.com/spf13/cobra@v1.10.0/command.go:1148 +0x797 + github.com/spf13/cobra.(*Command).Execute() + 
github.com/spf13/cobra@v1.10.0/command.go:1071 +0x4d0 + k8s.io/component-base/cli.run() + k8s.io/component-base/cli/run.go:146 +0x4d1 + k8s.io/component-base/cli.Run() + k8s.io/component-base/cli/run.go:44 +0x3b + main.main() + k8s.io/kubernetes/cmd/kubelet/kubelet.go:56 +0x2f +Goroutine 537 (running) created at: + k8s.io/kubernetes/pkg/kubelet/status.(*manager).Start() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:255 +0x27e + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).Run() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:1877 +0xdde + k8s.io/kubernetes/cmd/kubelet/app.startKubelet.gowrap1() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1264 +0x50 +================== +`, + }, + check: logCheck{dataRaces: true, namespaces: nothing}, + expectStdout: `] "Watching" node="worker0" +] "Watching" node="worker1" +] "Watching" node="worker2" +] "Started new data race" container="kubelet/worker2" count=1 +] "Completed data race" container="kubelet/worker2" count=1 dataRace=< + Write at 0x00c0010def18 by goroutine 285: + k8s.io/kubernetes/pkg/kubelet/status.normalizeStatus.func1() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:1193 +0x1ee + k8s.io/kubernetes/pkg/kubelet/status.normalizeStatus() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:1209 +0x175 + k8s.io/kubernetes/pkg/kubelet/status.(*manager).updateStatusInternal() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:838 +0x9dc + k8s.io/kubernetes/pkg/kubelet/status.(*manager).SetContainerReadiness() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:501 +0x20cb + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2650 +0x25a5 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).updateCache() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:83 +0x3e + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + 
k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 
+0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + 
k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet/container.(*runtimeCache).ForceUpdateIfOlder() + k8s.io/kubernetes/pkg/kubelet/container/runtime_cache.go:77 +0x130 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).Run() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:1896 +0xfea + k8s.io/kubernetes/cmd/kubelet/app.startKubelet.gowrap1() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1264 +0x50 + Previous read at 0x00c0010def18 by goroutine 537: + k8s.io/apimachinery/pkg/apis/meta/v1.(*Time).MarshalJSON() + :1 +0x44 + encoding/json.marshalerEncoder() + encoding/json/encode.go:483 +0x13c + encoding/json.structEncoder.encode() + encoding/json/encode.go:758 +0x3c7 + encoding/json.structEncoder.encode-fm() + :1 +0xe4 + encoding/json.structEncoder.encode() + encoding/json/encode.go:758 +0x3c7 + encoding/json.structEncoder.encode-fm() + :1 +0xe4 + encoding/json.(*encodeState).reflectValue() + encoding/json/encode.go:367 +0x83 + encoding/json.(*encodeState).marshal() + encoding/json/encode.go:343 +0xdb + encoding/json.Marshal() + encoding/json/encode.go:209 +0x11e + k8s.io/kubernetes/pkg/util/pod.preparePatchBytesForPodStatus() + k8s.io/kubernetes/pkg/util/pod/pod.go:58 +0x2d3 + k8s.io/kubernetes/pkg/util/pod.PatchPodStatus() + k8s.io/kubernetes/pkg/util/pod/pod.go:35 +0x12b + k8s.io/kubernetes/pkg/kubelet/status.(*manager).syncPod() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:1064 +0xada + k8s.io/kubernetes/pkg/kubelet/status.(*manager).syncBatch() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:1025 +0x199 + k8s.io/kubernetes/pkg/kubelet/status.(*manager).Start.func1() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:260 +0x1a4 + k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1() + k8s.io/apimachinery/pkg/util/wait/backoff.go:233 +0x2e + k8s.io/apimachinery/pkg/util/wait.BackoffUntilWithContext.func1() + 
k8s.io/apimachinery/pkg/util/wait/backoff.go:255 +0x98 + k8s.io/apimachinery/pkg/util/wait.BackoffUntilWithContext() + k8s.io/apimachinery/pkg/util/wait/backoff.go:256 +0xed + k8s.io/apimachinery/pkg/util/wait.BackoffUntil() + k8s.io/apimachinery/pkg/util/wait/backoff.go:233 +0x8a + k8s.io/apimachinery/pkg/util/wait.JitterUntil() + k8s.io/apimachinery/pkg/util/wait/backoff.go:210 +0xfb + k8s.io/apimachinery/pkg/util/wait.Until() + k8s.io/apimachinery/pkg/util/wait/backoff.go:163 +0x50 + k8s.io/apimachinery/pkg/util/wait.Forever() + k8s.io/apimachinery/pkg/util/wait/wait.go:80 +0x2a + k8s.io/kubernetes/pkg/kubelet/status.(*manager).Start.gowrap1() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:255 +0x17 + Goroutine 285 (running) created at: + k8s.io/kubernetes/cmd/kubelet/app.startKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1264 +0x14e + k8s.io/kubernetes/cmd/kubelet/app.RunKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1256 +0x7d7 + k8s.io/kubernetes/cmd/kubelet/app.createAndInitKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1286 +0x5fd + k8s.io/kubernetes/cmd/kubelet/app.RunKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1235 +0x588 + k8s.io/kubernetes/cmd/kubelet/app.run() + k8s.io/kubernetes/cmd/kubelet/app/server.go:901 +0x3c3e + k8s.io/kubernetes/cmd/kubelet/app.getCgroupDriverFromCRI() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1382 +0x116 + k8s.io/cri-client/pkg.(*remoteRuntimeService).RuntimeConfig() + k8s.io/cri-client/pkg/remote_runtime.go:926 +0x144 + k8s.io/kubernetes/cmd/kubelet/app.getCgroupDriverFromCRI() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1382 +0x116 + k8s.io/cri-client/pkg.(*remoteRuntimeService).RuntimeConfig() + k8s.io/cri-client/pkg/remote_runtime.go:926 +0x144 + k8s.io/kubernetes/cmd/kubelet/app.getCgroupDriverFromCRI() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1382 +0x116 + k8s.io/kubernetes/cmd/kubelet/app.run() + k8s.io/kubernetes/cmd/kubelet/app/server.go:732 +0x10f9 + k8s.io/kubernetes/pkg/kubelet.PreInitRuntimeService() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:411 +0x384 + k8s.io/cri-client/pkg.NewRemoteRuntimeService() + k8s.io/cri-client/pkg/remote_runtime.go:134 +0x118a + k8s.io/kubernetes/pkg/kubelet.PreInitRuntimeService() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:408 +0x28f + k8s.io/kubernetes/cmd/kubelet/app.run() + k8s.io/kubernetes/cmd/kubelet/app/server.go:727 +0x10cb + k8s.io/kubernetes/cmd/kubelet/app.Run() + k8s.io/kubernetes/cmd/kubelet/app/server.go:532 +0x4b9 + k8s.io/kubernetes/cmd/kubelet/app.NewKubeletCommand.func1() + k8s.io/kubernetes/cmd/kubelet/app/server.go:295 +0x1837 + github.com/spf13/cobra.(*Command).execute() + github.com/spf13/cobra@v1.10.0/command.go:1015 +0x113b + github.com/spf13/cobra.(*Command).ExecuteC() + github.com/spf13/cobra@v1.10.0/command.go:1148 +0x797 + github.com/spf13/cobra.(*Command).Execute() + github.com/spf13/cobra@v1.10.0/command.go:1071 +0x4d0 + k8s.io/component-base/cli.run() + k8s.io/component-base/cli/run.go:146 +0x4d1 + k8s.io/component-base/cli.Run() + k8s.io/component-base/cli/run.go:44 +0x3b + main.main() + k8s.io/kubernetes/cmd/kubelet/kubelet.go:56 +0x2f + Goroutine 537 (running) created at: + k8s.io/kubernetes/pkg/kubelet/status.(*manager).Start() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:255 +0x27e + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).Run() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:1877 +0xdde + k8s.io/kubernetes/cmd/kubelet/app.startKubelet.gowrap1() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1264 +0x50 + > 
+`, + expectFailure: `#### kubelet/worker2 + +DATA RACE: + + Write at 0x00c0010def18 by goroutine 285: + k8s.io/kubernetes/pkg/kubelet/status.normalizeStatus.func1() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:1193 +0x1ee + k8s.io/kubernetes/pkg/kubelet/status.normalizeStatus() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:1209 +0x175 + k8s.io/kubernetes/pkg/kubelet/status.(*manager).updateStatusInternal() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:838 +0x9dc + k8s.io/kubernetes/pkg/kubelet/status.(*manager).SetContainerReadiness() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:501 +0x20cb + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2650 +0x25a5 + ... + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).HandlePodCleanups() + k8s.io/kubernetes/pkg/kubelet/kubelet_pods.go:1263 +0x66f + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoopIteration() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2690 +0x29b2 + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).syncLoop() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:2542 +0x51d + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).Run() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:1896 +0xfea + k8s.io/kubernetes/cmd/kubelet/app.startKubelet.gowrap1() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1264 +0x50 + Previous read at 0x00c0010def18 by goroutine 537: + k8s.io/apimachinery/pkg/apis/meta/v1.(*Time).MarshalJSON() + :1 +0x44 + encoding/json.marshalerEncoder() + encoding/json/encode.go:483 +0x13c + encoding/json.structEncoder.encode() + encoding/json/encode.go:758 +0x3c7 + encoding/json.structEncoder.encode-fm() + :1 +0xe4 + encoding/json.structEncoder.encode() + encoding/json/encode.go:758 +0x3c7 + ... + k8s.io/apimachinery/pkg/util/wait.BackoffUntil() + k8s.io/apimachinery/pkg/util/wait/backoff.go:233 +0x8a + k8s.io/apimachinery/pkg/util/wait.JitterUntil() + k8s.io/apimachinery/pkg/util/wait/backoff.go:210 +0xfb + k8s.io/apimachinery/pkg/util/wait.Until() + k8s.io/apimachinery/pkg/util/wait/backoff.go:163 +0x50 + k8s.io/apimachinery/pkg/util/wait.Forever() + k8s.io/apimachinery/pkg/util/wait/wait.go:80 +0x2a + k8s.io/kubernetes/pkg/kubelet/status.(*manager).Start.gowrap1() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:255 +0x17 + Goroutine 285 (running) created at: + k8s.io/kubernetes/cmd/kubelet/app.startKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1264 +0x14e + k8s.io/kubernetes/cmd/kubelet/app.RunKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1256 +0x7d7 + k8s.io/kubernetes/cmd/kubelet/app.createAndInitKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1286 +0x5fd + k8s.io/kubernetes/cmd/kubelet/app.RunKubelet() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1235 +0x588 + k8s.io/kubernetes/cmd/kubelet/app.run() + k8s.io/kubernetes/cmd/kubelet/app/server.go:901 +0x3c3e + ... 
+ github.com/spf13/cobra.(*Command).ExecuteC() + github.com/spf13/cobra@v1.10.0/command.go:1148 +0x797 + github.com/spf13/cobra.(*Command).Execute() + github.com/spf13/cobra@v1.10.0/command.go:1071 +0x4d0 + k8s.io/component-base/cli.run() + k8s.io/component-base/cli/run.go:146 +0x4d1 + k8s.io/component-base/cli.Run() + k8s.io/component-base/cli/run.go:44 +0x3b + main.main() + k8s.io/kubernetes/cmd/kubelet/kubelet.go:56 +0x2f + Goroutine 537 (running) created at: + k8s.io/kubernetes/pkg/kubelet/status.(*manager).Start() + k8s.io/kubernetes/pkg/kubelet/status/status_manager.go:255 +0x27e + k8s.io/kubernetes/pkg/kubelet.(*Kubelet).Run() + k8s.io/kubernetes/pkg/kubelet/kubelet.go:1877 +0xdde + k8s.io/kubernetes/cmd/kubelet/app.startKubelet.gowrap1() + k8s.io/kubernetes/cmd/kubelet/app/server.go:1264 +0x50 +`, + }, + } { + tCtx := ktesting.Init(t) + tCtx.SyncTest(name, func(tCtx ktesting.TContext) { + var objs []runtime.Object + for _, name := range []string{"kube-system", "default", "other-namespace", "yet-another-namespace"} { + objs = append(objs, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}}) + } + for i := range tc.numWorkers { + objs = append(objs, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("worker%d", i)}}) + } + client := fake.NewClientset(objs...) + tmpDir := tCtx.TempDir() + ctx, lc, err := newLogChecker(tCtx, client, tc.check, tmpDir) + require.NoError(tCtx, err) + startPodLogs := func(ctx context.Context, cs kubernetes.Interface, ns string, to podlogs.LogOutput) error { + if tc.podLogsError != nil { + return tc.podLogsError + } + + go func() { + for _, log := range tc.logs { + if log.namespace != ns { + continue + } + if log.output == "" { + _, _ = to.StatusWriter.Write([]byte("ERROR: fake error: no log output\n")) + continue + } + writer := to.LogOpen(log.pod, log.container) + if writer == nil { + continue + } + for _, line := range strings.Split(log.output, "\n") { + _, _ = writer.Write([]byte(line + "\n")) + } + if closer, ok := writer.(io.Closer); ok { + _ = closer.Close() + } + } + }() + + return nil + } + startNodeLog := func(ctx context.Context, cs kubernetes.Interface, wg *waitGroup, nodeName string) io.Reader { + return strings.NewReader(tc.kubeletLogs[nodeName]) + } + lc.start(ctx, startPodLogs, startNodeLog) + + // Wait for goroutines to spin up. + // lc.stop() alone is not enough because it races + // with adding more background goroutines. + synctest.Wait() + + actualFailure, actualStdout := lc.stop(tCtx.Logger()) + actualStdout = logHeaderRE.ReplaceAllString(actualStdout, "$1]") + assert.Equal(tCtx, tc.expectStdout, actualStdout, "report") + actualFailure = logHeaderRE.ReplaceAllString(actualFailure, "$1]") + assert.Equal(tCtx, tc.expectFailure, actualFailure, "failure message") + }) + } +} + +var logHeaderRE = regexp.MustCompile(`(?m)^(\s*).*logcheck.go:[[:digit:]]+\]`) diff --git a/test/e2e/invariants/logcheck/waitgroup.go b/test/e2e/invariants/logcheck/waitgroup.go new file mode 100644 index 00000000000..d9413f673d4 --- /dev/null +++ b/test/e2e/invariants/logcheck/waitgroup.go @@ -0,0 +1,91 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logcheck + +import ( + "sync" +) + +// waitGroup, in contrast to sync.WaitGroup, supports races between +// go and wait. Once wait is called, no new goroutines are started. +type waitGroup struct { + mutex sync.Mutex + cond *sync.Cond + + shuttingDown bool + running int +} + +func (wg *waitGroup) wait() { + wg.mutex.Lock() + defer wg.mutex.Unlock() + + wg.shuttingDown = true + for wg.running > 0 { + wg.cond.Wait() + } +} + +func (wg *waitGroup) add(n int) bool { + wg.mutex.Lock() + defer wg.mutex.Unlock() + + if wg.shuttingDown { + return false + } + + wg.running += n + return true +} + +func (wg *waitGroup) done() { + wg.mutex.Lock() + defer wg.mutex.Unlock() + + wg.running-- + wg.cond.Broadcast() +} + +// goIfNotShuttingDown executes the callback in a goroutine if the wait group +// is not already shutting down. It always calls the cleanup callback exactly once, either way. +func (wg *waitGroup) goIfNotShuttingDown(cleanup, cb func()) { + wg.mutex.Lock() + defer wg.mutex.Unlock() + + if cleanup == nil { + cleanup = func() {} + } + + if wg.shuttingDown { + // Clean up directly. + cleanup() + return + } + + wg.running++ + go func() { + defer wg.done() + defer cleanup() + cb() + }() +} + +func newWaitGroup() *waitGroup { + var wg waitGroup + wg.cond = sync.NewCond(&wg.mutex) + return &wg +} diff --git a/test/e2e/storage/podlogs/podlogs.go b/test/e2e/storage/podlogs/podlogs.go index da1c35fcc00..7002bb9ef44 100644 --- a/test/e2e/storage/podlogs/podlogs.go +++ b/test/e2e/storage/podlogs/podlogs.go @@ -41,20 +41,36 @@ import ( ) // LogOutput determines where output from CopyAllLogs goes. +// +// Error messages about receiving log output are kept +// separate from the log output and optionally go to StatusWriter. +// +// The log output can go to one or more possible destinations. type LogOutput struct { - // If not nil, errors will be logged here. + // If not nil, errors encountered will be logged here. StatusWriter io.Writer - // If not nil, all output goes to this writer with "<pod>/<container>:" as prefix. + // If not nil, all container output goes to this writer with "<pod>/<container>:" as prefix. LogWriter io.Writer // Base directory for one log file per container. - // The full path of each log file will be <log path prefix><pod name>-<container name>.log. + // The full path of each log file will be <log path prefix><pod name>-<container name>.log, + // if not empty. LogPathPrefix string + + // LogOpen, if not nil, gets called whenever log output watching starts for + // a certain container. Returning nil means that the output can be discarded + // unless it gets written elsewhere. + // + // The container's stdout and stderr output get written to the returned writer. + // The writer gets closed once all output is processed if the writer implements io.Closer. + // Each write is a single line, including a newline. + // Write errors are ignored. + LogOpen func(podName, containerName string) io.Writer } // Matches harmless errors from pkg/kubelet/kubelet_pods.go.
-var expectedErrors = regexp.MustCompile(`container .* in pod .* is (terminated|waiting to start|not available)|the server could not find the requested resource`) +var expectedErrors = regexp.MustCompile(`container .* in pod .* is (terminated|waiting to start|not available)|the server could not find the requested resource|context canceled`) // CopyAllLogs is basically CopyPodLogs for all current or future pods in the given namespace ns func CopyAllLogs(ctx context.Context, cs clientset.Interface, ns string, to LogOutput) error { @@ -92,7 +108,7 @@ func CopyPodLogs(ctx context.Context, cs clientset.Interface, ns, podName string FieldSelector: fmt.Sprintf("metadata.name=%s", podName), } } - watcher, err := cs.CoreV1().Pods(ns).Watch(context.TODO(), options) + watcher, err := cs.CoreV1().Pods(ns).Watch(ctx, options) if err != nil { return fmt.Errorf("cannot create Pod event watcher: %w", err) @@ -104,15 +120,17 @@ func CopyPodLogs(ctx context.Context, cs clientset.Interface, ns, podName string active := map[string]bool{} // Key is pod/container/container-id, true if we have ever started to capture its output. started := map[string]bool{} + // Key is pod/container/container-id, value is the time stamp of the last log line that has been seen. + latest := map[string]*meta.Time{} check := func() { m.Lock() defer m.Unlock() - pods, err := cs.CoreV1().Pods(ns).List(context.TODO(), options) + pods, err := cs.CoreV1().Pods(ns).List(ctx, options) if err != nil { - if to.StatusWriter != nil { - fmt.Fprintf(to.StatusWriter, "ERROR: get pod list in %s: %s\n", ns, err) + if ctx.Err() == nil && to.StatusWriter != nil { + _, _ = fmt.Fprintf(to.StatusWriter, "ERROR: get pod list in %s: %s\n", ns, err) } return } @@ -139,41 +157,28 @@ func CopyPodLogs(ctx context.Context, cs clientset.Interface, ns, podName string pod.Status.ContainerStatuses[i].State.Terminated == nil) { continue } - readCloser, err := logsForPod(ctx, cs, ns, pod.ObjectMeta.Name, - &v1.PodLogOptions{ - Container: c.Name, - Follow: true, - }) - if err != nil { - // We do get "normal" errors here, like trying to read too early. - // We can ignore those. - if to.StatusWriter != nil && - expectedErrors.FindStringIndex(err.Error()) == nil { - fmt.Fprintf(to.StatusWriter, "WARNING: pod log: %s: %s\n", name, err) - } - continue - } // Determine where we write. If this fails, we intentionally return without clearing // the active[name] flag, which prevents trying over and over again to // create the output file. - var out io.Writer - var closer io.Closer + var logWithPrefix, logWithoutPrefix, output io.Writer var prefix string + podHandled := false if to.LogWriter != nil { - out = to.LogWriter + podHandled = true + logWithPrefix = to.LogWriter nodeName := pod.Spec.NodeName if len(nodeName) > 10 { nodeName = nodeName[0:4] + ".."
+ nodeName[len(nodeName)-4:] } prefix = name + "@" + nodeName + ": " - } else { - var err error + } + if to.LogPathPrefix != "" { + podHandled = true filename := to.LogPathPrefix + pod.ObjectMeta.Name + "-" + c.Name + ".log" - err = os.MkdirAll(path.Dir(filename), 0755) - if err != nil { + if err := os.MkdirAll(path.Dir(filename), 0755); err != nil { if to.StatusWriter != nil { - fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create directory for %s: %s\n", filename, err) + _, _ = fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create directory for %s: %s\n", filename, err) } return } @@ -182,26 +187,66 @@ func CopyPodLogs(ctx context.Context, cs clientset.Interface, ns, podName string file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { if to.StatusWriter != nil { - fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create file %s: %s\n", filename, err) + _, _ = fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create file %s: %s\n", filename, err) } return } - closer = file - out = file + logWithoutPrefix = file } - go func() { - if closer != nil { - defer closer.Close() + if to.LogOpen != nil { + if writer := to.LogOpen(pod.Name, c.Name); writer != nil { + podHandled = true + output = writer } + } + + if !podHandled { + // No-one is interested in this pod, don't bother... + continue + } + + closeOutput := func() { + // Execute all of these, even if one of them panics. + defer maybeClose(logWithPrefix) + defer maybeClose(logWithoutPrefix) + defer maybeClose(output) + } + + sinceTime := latest[id] + readCloser, err := logsForPod(ctx, cs, ns, pod.ObjectMeta.Name, + &v1.PodLogOptions{ + Container: c.Name, + Follow: true, + Timestamps: true, + SinceTime: sinceTime, + }) + if err != nil { + closeOutput() + + // We do get "normal" errors here, like trying to read too early. + // We can ignore those. + if to.StatusWriter != nil && + expectedErrors.FindStringIndex(err.Error()) == nil { + _, _ = fmt.Fprintf(to.StatusWriter, "WARNING: pod log: %s: %s\n", name, err) + } + continue + } + + active[name] = true + started[id] = true + + go func() { + defer closeOutput() first := true defer func() { m.Lock() // If we never printed anything, then also skip the final message. if !first { - if prefix != "" { - fmt.Fprintf(out, "%s==== end of pod log ====\n", prefix) - } else { - fmt.Fprintf(out, "==== end of pod log for container %s ====\n", name) + if logWithPrefix != nil { + _, _ = fmt.Fprintf(logWithPrefix, "%s==== end of pod log at %s ====\n", prefix, latest[id]) + } + if logWithoutPrefix != nil { + _, _ = fmt.Fprintf(logWithoutPrefix, "==== end of pod log for container %s at %s ====\n", name, latest[id]) } } active[name] = false @@ -209,6 +254,7 @@ func CopyPodLogs(ctx context.Context, cs clientset.Interface, ns, podName string readCloser.Close() }() scanner := bufio.NewScanner(readCloser) + var latestTS time.Time for scanner.Scan() { line := scanner.Text() // Filter out the expected "end of stream" error message, @@ -222,30 +268,62 @@ func CopyPodLogs(ctx context.Context, cs clientset.Interface, ns, podName string // Because the same log might be written to multiple times // in different test instances, log an extra line to separate them. // Also provides some useful extra information.
- if prefix == "" { - fmt.Fprintf(out, "==== start of pod log for container %s ====\n", name) - } else { - fmt.Fprintf(out, "%s==== start of pod log ====\n", prefix) + since := "(initial part)" + if sinceTime != nil { + since = fmt.Sprintf("since %s", sinceTime) + } + if logWithPrefix != nil { + _, _ = fmt.Fprintf(logWithPrefix, "%s==== start of pod log %s ====\n", prefix, since) + } + if logWithoutPrefix != nil { + _, _ = fmt.Fprintf(logWithoutPrefix, "==== start of pod log for container %s %s ====\n", name, since) } first = false } - fmt.Fprintf(out, "%s%s\n", prefix, line) + index := strings.Index(line, " ") + if index > 0 { + ts, err := time.Parse(time.RFC3339, line[:index]) + if err == nil { + latestTS = ts + // Log output typically has its own log header with a time stamp, + // so let's strip the PodLogOptions time stamp. + line = line[index+1:] + } + } + if logWithPrefix != nil { + _, _ = fmt.Fprintf(logWithPrefix, "%s%s\n", prefix, line) + } + if logWithoutPrefix != nil { + _, _ = fmt.Fprintln(logWithoutPrefix, line) + } + if output != nil { + _, _ = output.Write([]byte(line + "\n")) + } } } + + if !latestTS.IsZero() { + m.Lock() + defer m.Unlock() + latest[id] = &meta.Time{Time: latestTS} + } }() - active[name] = true - started[id] = true } } } // Watch events to see whether we can start logging - // and log interesting ones. + // and log interesting ones. Also check periodically, + // in case of failures which are not followed by + // some pod change. + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() check() for { select { case <-watcher.ResultChan(): check() + case <-ticker.C: case <-ctx.Done(): return } @@ -255,6 +333,12 @@ func CopyPodLogs(ctx context.Context, cs clientset.Interface, ns, podName string return nil } +func maybeClose(writer io.Writer) { + if closer, ok := writer.(io.Closer); ok { + _ = closer.Close() + } +} + // logsForPod starts reading the logs for a certain pod. If the pod has more than one // container, opts.Container must be set. Reading stops when the context is done. // The stream includes formatted error messages and ends with @@ -276,7 +360,7 @@ func WatchPods(ctx context.Context, cs clientset.Interface, ns string, to io.Wri } }() - pods, err := cs.CoreV1().Pods(ns).Watch(context.Background(), meta.ListOptions{}) + pods, err := cs.CoreV1().Pods(ns).Watch(ctx, meta.ListOptions{}) if err != nil { return fmt.Errorf("cannot create Pod watcher: %w", err) } @@ -286,7 +370,7 @@ func WatchPods(ctx context.Context, cs clientset.Interface, ns string, to io.Wri } }() - events, err := cs.CoreV1().Events(ns).Watch(context.Background(), meta.ListOptions{}) + events, err := cs.CoreV1().Events(ns).Watch(ctx, meta.ListOptions{}) if err != nil { return fmt.Errorf("cannot create Event watcher: %w", err) }
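
A minimal sketch of how a caller might consume the new LogOutput.LogOpen hook introduced in podlogs above, similar in spirit to what logcheck does. This is not part of the patch; the package name, lineScanner type, and watchSystemPods helper are hypothetical and only illustrate the assumed contract that each Write receives exactly one newline-terminated line.

package logcheckexample

import (
	"context"
	"fmt"
	"io"
	"os"
	"strings"

	"k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/storage/podlogs"
)

// lineScanner is a hypothetical io.Writer. Per the LogOpen contract, every
// Write delivers one complete log line including its trailing newline,
// so the scanner can match against whole lines.
type lineScanner struct {
	container string
}

func (s *lineScanner) Write(p []byte) (int, error) {
	if strings.Contains(string(p), "WARNING: DATA RACE") {
		// A real checker would buffer the whole report; here we only flag it.
		fmt.Fprintf(os.Stderr, "%s: data race report started\n", s.container)
	}
	// Write errors are ignored by the caller, so always report success.
	return len(p), nil
}

// watchSystemPods streams the logs of all current and future pods in the
// namespace and attaches one scanner per container. Status messages about
// log retrieval itself go to stderr via StatusWriter.
func watchSystemPods(ctx context.Context, cs kubernetes.Interface, namespace string) error {
	return podlogs.CopyAllLogs(ctx, cs, namespace, podlogs.LogOutput{
		StatusWriter: os.Stderr,
		LogOpen: func(podName, containerName string) io.Writer {
			return &lineScanner{container: podName + "/" + containerName}
		},
	})
}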