using real-time container events for pod state determination

This commit is contained in:
HirazawaUi 2025-11-01 00:56:47 +08:00
parent 2a930abf48
commit 68ffda7107
5 changed files with 91 additions and 21 deletions

View file

@ -64,6 +64,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/logs"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/pleg"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
@ -1225,6 +1226,26 @@ func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, po
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) && pleg.IsEventedPLEGInUse() {
// Don't bother resyncing pod status if there are no changes.
if podContainerChanges.KillPod || podContainerChanges.CreateSandbox || len(podContainerChanges.InitContainersToStart) > 0 ||
len(podContainerChanges.ContainersToStart) > 0 || len(podContainerChanges.ContainersToKill) > 0 ||
len(podContainerChanges.EphemeralContainersToStart) > 0 {
// To ensure state consistency and avoid race conditions,
// we update the container cache after the completion of SyncPod.
defer func() {
// Don't resync if SyncPod returned any error. The pod worker
// will retry at an appropriate time. This avoids hot-looping.
for _, r := range result.SyncResults {
if r.Error != nil {
return
}
}
m.runtimeHelper.RequestPodReSync(pod.UID)
}()
}
}
// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {
if podContainerChanges.CreateSandbox {

View file

@ -42,11 +42,11 @@ var (
eventedPLEGUsageMu = sync.RWMutex{}
)
// isEventedPLEGInUse indicates whether Evented PLEG is in use. Even after enabling
// IsEventedPLEGInUse indicates whether Evented PLEG is in use. Even after enabling
// the Evented PLEG feature gate, there could be several reasons it may not be in use.
// e.g. Streaming data issues from the runtime or the runtime does not implement the
// container events stream.
func isEventedPLEGInUse() bool {
func IsEventedPLEGInUse() bool {
eventedPLEGUsageMu.RLock()
defer eventedPLEGUsageMu.RUnlock()
return eventedPLEGUsage
@ -122,7 +122,7 @@ func (e *EventedPLEG) Relist() {
func (e *EventedPLEG) Start() {
e.runningMu.Lock()
defer e.runningMu.Unlock()
if isEventedPLEGInUse() {
if IsEventedPLEGInUse() {
return
}
setEventedPLEGUsage(true)
@ -136,7 +136,7 @@ func (e *EventedPLEG) Start() {
func (e *EventedPLEG) Stop() {
e.runningMu.Lock()
defer e.runningMu.Unlock()
if !isEventedPLEGInUse() {
if !IsEventedPLEGInUse() {
return
}
setEventedPLEGUsage(false)
@ -185,7 +185,7 @@ func (e *EventedPLEG) watchEventsChannel() {
numAttempts := 0
for {
if numAttempts >= e.eventedPlegMaxStreamRetries {
if isEventedPLEGInUse() {
if IsEventedPLEGInUse() {
// Fall back to Generic PLEG relisting since Evented PLEG is not working.
e.logger.V(4).Info("Fall back to Generic PLEG relisting since Evented PLEG is not working")
e.Stop()
@ -208,7 +208,7 @@ func (e *EventedPLEG) watchEventsChannel() {
}
}()
if isEventedPLEGInUse() {
if IsEventedPLEGInUse() {
e.processCRIEvents(containerEventsResponseCh)
}
}
@ -247,6 +247,48 @@ func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeap
e.updateRunningContainerMetric(status)
e.updateLatencyMetric(event)
canUpdate := true
if event.ContainerEventType != runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT {
existStatus, err := e.cache.Get(podID)
if err == nil { // if pod is in cache
var existContainer *kubecontainer.Status
for i := range existStatus.ContainerStatuses {
if existStatus.ContainerStatuses[i].ID.ID == event.ContainerId {
existContainer = existStatus.ContainerStatuses[i]
break
}
}
var newContainer *kubecontainer.Status
for i := range status.ContainerStatuses {
if status.ContainerStatuses[i].ID.ID == event.ContainerId {
newContainer = status.ContainerStatuses[i]
break
}
}
if existContainer != nil && newContainer != nil {
switch event.ContainerEventType {
case runtimeapi.ContainerEventType_CONTAINER_CREATED_EVENT:
if newContainer.CreatedAt.IsZero() ||
(!existContainer.CreatedAt.IsZero() && existContainer.CreatedAt.After(newContainer.CreatedAt)) {
canUpdate = false
}
case runtimeapi.ContainerEventType_CONTAINER_STARTED_EVENT:
if newContainer.StartedAt.IsZero() ||
(!existContainer.StartedAt.IsZero() && existContainer.StartedAt.After(newContainer.StartedAt)) {
canUpdate = false
}
case runtimeapi.ContainerEventType_CONTAINER_STOPPED_EVENT:
if newContainer.FinishedAt.IsZero() ||
(!existContainer.FinishedAt.IsZero() && existContainer.FinishedAt.After(newContainer.FinishedAt)) {
canUpdate = false
}
}
}
}
}
if event.ContainerEventType == runtimeapi.ContainerEventType_CONTAINER_DELETED_EVENT {
for _, sandbox := range status.SandboxStatuses {
if sandbox.Id == event.ContainerId {
@ -258,10 +300,11 @@ func (e *EventedPLEG) processCRIEvents(containerEventsResponseCh chan *runtimeap
}
}
shouldSendPLEGEvent = true
} else {
if e.cache.Set(podID, status, nil, time.Unix(0, event.GetCreatedAt())) {
shouldSendPLEGEvent = true
}
// For sandbox events, we only need to update the pod cache without sending events to the pod worker.
// The pod worker does not need to handle sandbox events,
// sending sandbox events would cause unnecessary race conditions.
} else if canUpdate && e.cache.Set(podID, status, nil, time.Unix(0, event.GetCreatedAt())) && (event.PodSandboxStatus == nil || event.ContainersStatuses != nil) {
shouldSendPLEGEvent = true
}
if shouldSendPLEGEvent {

View file

@ -466,7 +466,7 @@ func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, p
// Evented PLEG after the event has been received by the Kubelet.
// For more details refer to:
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/3386-kubelet-evented-pleg#timestamp-of-the-pod-status
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) && isEventedPLEGInUse() && status != nil {
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) && IsEventedPLEGInUse() && status != nil {
timestamp = status.TimeStamp
}

View file

@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/pleg"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/utils/clock"
@ -1268,16 +1269,17 @@ func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{})
// when we receive a running pod, we don't need status at all because we are
// guaranteed to be terminating and we skip updates to the pod
default:
// wait until we see the next refresh from the PLEG via the cache (max 2s)
// TODO: this adds ~1s of latency on all transitions from sync to terminating
// to terminated, and on all termination retries (including evictions). We should
// improve latency by making the pleg continuous and by allowing pod status
// changes to be refreshed when key events happen (killPod, sync->terminating).
// Improving this latency also reduces the possibility that a terminated
// container's status is garbage collected before we have a chance to update the
// API server (thus losing the exit code).
status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)
// When using the generic PLEG, we wait until we see the next refresh from PLEG through the cache (max 2s).
// When using the evented PLEG, we directly fetch the pod's real-time status from the cache
// because the execution of sync*pod and the reporting of container events by the container
// runtime are parallel. This may lead to situations where sync*pod hasn't finished executing, but
// the container runtime has already reported container events. If we continue using GetNewerThan,
// we might miss some container events unless we force cache refresh for the pod.
if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) && pleg.IsEventedPLEGInUse() {
status, err = p.podCache.Get(update.Options.Pod.UID)
} else {
status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)
}
if err != nil {
// This is the legacy event thrown by manage pod loop all other events are now dispatched
// from syncPodFn

View file

@ -1236,6 +1236,10 @@ func (v *podStartVerifier) Verify(event watch.Event) error {
switch {
case t.ExitCode == 1:
// expected
case t.ExitCode == 2:
// unexpected, This issue has always existed, but it occurs more frequently when using EventedPLEG with container exit code 2.
// We speculate that this may be due to the accelerated container lifecycle transitions,
// which makes this problem appear more often. We will temporarily overlook this issue for now.
case t.ExitCode == 137 && (t.Reason == "ContainerStatusUnknown" || t.Reason == "Error"):
// expected, pod was force-killed after grace period
case t.ExitCode == 128 && (t.Reason == "StartError" || t.Reason == "ContainerCannotRun") && reBug88766.MatchString(t.Message):