Add the ability for the replicaset controller to read its own writes

This commit is contained in:
Michael Aspinwall 2026-02-24 00:26:11 +00:00
parent c6d1649721
commit f18f0df7fe
6 changed files with 119 additions and 12 deletions

View file

@ -22,19 +22,34 @@ import (
const ReplicaSetControllerSubsystem = "replicaset_controller"
var SortingDeletionAgeRatio = metrics.NewHistogram(
&metrics.HistogramOpts{
Subsystem: ReplicaSetControllerSubsystem,
Name: "sorting_deletion_age_ratio",
Help: "The ratio of chosen deleted pod's ages to the current youngest pod's age (at the time). Should be <2. " +
"The intent of this metric is to measure the rough efficacy of the LogarithmicScaleDown feature gate's effect on " +
"the sorting (and deletion) of pods when a replicaset scales down. This only considers Ready pods when calculating and reporting.",
Buckets: metrics.ExponentialBuckets(0.25, 2, 6),
StabilityLevel: metrics.ALPHA,
},
var (
SortingDeletionAgeRatio = metrics.NewHistogram(
&metrics.HistogramOpts{
Subsystem: ReplicaSetControllerSubsystem,
Name: "sorting_deletion_age_ratio",
Help: "The ratio of chosen deleted pod's ages to the current youngest pod's age (at the time). Should be <2. " +
"The intent of this metric is to measure the rough efficacy of the LogarithmicScaleDown feature gate's effect on " +
"the sorting (and deletion) of pods when a replicaset scales down. This only considers Ready pods when calculating and reporting.",
Buckets: metrics.ExponentialBuckets(0.25, 2, 6),
StabilityLevel: metrics.ALPHA,
},
)
ReplicaSetRequeueSkips = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: ReplicaSetControllerSubsystem,
Name: "stale_sync_skips_total",
Help: "Total number of ReplicaSet syncs skipped due to a stale watch cache.",
StabilityLevel: metrics.ALPHA,
},
[]string{"group", "resource"},
)
)
// Register registers ReplicaSet controller metrics.
func Register(registrationFunc func(metrics.Registerable) error) error {
return registrationFunc(SortingDeletionAgeRatio)
if err := registrationFunc(SortingDeletionAgeRatio); err != nil {
return err
}
return registrationFunc(ReplicaSetRequeueSkips)
}

View file

@ -29,6 +29,7 @@ package replicaset
import (
"context"
"errors"
"fmt"
"reflect"
"sort"
@ -61,6 +62,7 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/replicaset/metrics"
consistencyutil "k8s.io/kubernetes/pkg/controller/util/consistency"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
@ -79,6 +81,17 @@ const (
controllerUIDIndex = "controllerUID"
)
var (
replicaSetGroupResource = schema.GroupResource{
Group: "apps",
Resource: "replicasets",
}
podGroupResource = schema.GroupResource{
Group: "",
Resource: "pods",
}
)
// ReplicaSetController is responsible for synchronizing ReplicaSet objects stored
// in the system with actual running pods.
type ReplicaSetController struct {
@ -120,6 +133,8 @@ type ReplicaSetController struct {
clock clock.PassiveClock
consistencyStore consistencyutil.ConsistencyStore
// Controller specific features; see ReplicaSetControllerFeatures for details.
controllerFeatures ReplicaSetControllerFeatures
}
@ -143,6 +158,32 @@ func NewReplicaSetController(ctx context.Context, rsInformer appsinformers.Repli
if err := metrics.Register(legacyregistry.Register); err != nil {
logger.Error(err, "unable to register metrics")
}
var consistencyStore consistencyutil.ConsistencyStore
var podWriteCallback func(pod *v1.Pod, rs *metav1.OwnerReference)
if utilfeature.DefaultFeatureGate.Enabled(features.StaleControllerConsistencyReplicaSet) {
consistencyStore = consistencyutil.NewConsistencyStore(map[schema.GroupResource]consistencyutil.LastSyncRVGetter{
podGroupResource: podInformer.Informer().GetStore(),
replicaSetGroupResource: rsInformer.Informer().GetStore(),
})
podWriteCallback = func(pod *v1.Pod, rs *metav1.OwnerReference) {
if rs == nil {
return
}
consistencyStore.WroteAt(
types.NamespacedName{
Namespace: pod.Namespace,
Name: rs.Name,
},
rs.UID,
podGroupResource,
pod.ResourceVersion,
)
}
} else {
consistencyStore = consistencyutil.NewNoopConsistencyStore()
}
return NewBaseController(logger, rsInformer, podInformer, kubeClient, burstReplicas,
apps.SchemeGroupVersion.WithKind("ReplicaSet"),
"replicaset_controller",
@ -150,16 +191,18 @@ func NewReplicaSetController(ctx context.Context, rsInformer appsinformers.Repli
controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
OnWrite: podWriteCallback,
},
eventBroadcaster,
DefaultReplicaSetControllerFeatures(),
consistencyStore,
)
}
// NewBaseController is the implementation of NewReplicaSetController with additional injected
// parameters so that it can also serve as the implementation of NewReplicationController.
func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster, controllerFeatures ReplicaSetControllerFeatures) *ReplicaSetController {
gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster, controllerFeatures ReplicaSetControllerFeatures, consistencyStore consistencyutil.ConsistencyStore) *ReplicaSetController {
rsc := &ReplicaSetController{
GroupVersionKind: gvk,
@ -174,6 +217,7 @@ func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetIn
),
clock: clock.RealClock{},
controllerFeatures: controllerFeatures,
consistencyStore: consistencyStore,
}
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -405,6 +449,10 @@ func (rsc *ReplicaSetController) deleteRS(logger klog.Logger, obj interface{}) {
logger.V(4).Info("Deleting", "replicaSet", klog.KObj(rs))
rsc.consistencyStore.Clear(types.NamespacedName{
Namespace: rs.Namespace,
Name: rs.Name,
}, rs.UID)
// Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean
rsc.expectations.DeleteExpectations(logger, key)
@ -715,9 +763,22 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
if err != nil {
return err
}
rsNamespacedName := types.NamespacedName{Namespace: namespace, Name: name}
if err := rsc.consistencyStore.EnsureReady(rsNamespacedName); err != nil {
var consistencyErr *consistencyutil.ConsistencyError
if errors.As(err, &consistencyErr) {
metrics.ReplicaSetRequeueSkips.WithLabelValues(
consistencyErr.GroupResource.Group,
consistencyErr.GroupResource.Resource,
).Inc()
}
return err
}
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if apierrors.IsNotFound(err) {
logger.V(4).Info("deleted", "kind", rsc.Kind, "key", key)
rsc.consistencyStore.Clear(rsNamespacedName, "")
rsc.expectations.DeleteExpectations(logger, key)
return nil
}
@ -769,6 +830,12 @@ func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string)
// Returning an error causes a requeue without forcing a hotloop
return err
}
rsc.consistencyStore.WroteAt(
types.NamespacedName{Name: rs.Name, Namespace: rs.Namespace},
rs.UID,
replicaSetGroupResource,
updatedRS.ResourceVersion,
)
if manageReplicasErr != nil {
return manageReplicasErr
}

View file

@ -36,6 +36,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/replicaset"
consistencyutil "k8s.io/kubernetes/pkg/controller/util/consistency"
)
const (
@ -68,6 +69,10 @@ func NewReplicationManager(ctx context.Context, podInformer coreinformers.PodInf
// ReplicaSets do support this field, which is then propagated to Deployments for higher-level features.
EnableStatusTerminatingReplicas: false,
},
// TODO: Replication controller does not currently support stale controller consistency.
// In order to support stale controller consistency, we would need to parameterize the metrics
// and resource types passed to the consistency store.
consistencyutil.NewNoopConsistencyStore(),
),
}
}

View file

@ -941,6 +941,13 @@ const (
// prior to running a reconcile on the same object.
StaleControllerConsistencyJob featuregate.Feature = "StaleControllerConsistencyJob"
// owner: @michaelasp
// kep: http://kep.k8s.io/5647
//
// Introduces the ability for the ReplicaSet controller to be able to read its writes
// prior to running a reconcile on the same object.
StaleControllerConsistencyReplicaSet featuregate.Feature = "StaleControllerConsistencyReplicaSet"
// owner: @liggitt
//
// Mitigates spurious statefulset rollouts due to controller revision comparison mismatches
@ -1791,6 +1798,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta},
},
StaleControllerConsistencyReplicaSet: {
{Version: version.MustParse("1.36"), Default: true, PreRelease: featuregate.Beta},
},
StatefulSetSemanticRevisionComparison: {
// This is a mitigation for a 1.34 regression due to serialization differences that cannot be feature-gated,
// so this mitigation should not auto-disable even if emulating versions prior to 1.34 with --emulation-version.
@ -2416,6 +2427,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
StaleControllerConsistencyJob: {featuregate.Feature(clientfeatures.AtomicFIFO)},
StaleControllerConsistencyReplicaSet: {featuregate.Feature(clientfeatures.AtomicFIFO)},
StatefulSetSemanticRevisionComparison: {},
StorageCapacityScoring: {},

View file

@ -180,6 +180,7 @@
| SizeBasedListCostEstimate | :ballot_box_with_check:&nbsp;1.34+ | | | 1.34 | | | | [code](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbSizeBasedListCostEstimate%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StaleControllerConsistencyDaemonSet | :ballot_box_with_check:&nbsp;1.36+ | | | 1.36 | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyDaemonSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StaleControllerConsistencyJob | :ballot_box_with_check:&nbsp;1.36+ | | | 1.36 | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyJob%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyJob%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StaleControllerConsistencyReplicaSet | :ballot_box_with_check:&nbsp;1.36+ | | | 1.36 | | | AtomicFIFO | [code](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyReplicaSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStaleControllerConsistencyReplicaSet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StatefulSetSemanticRevisionComparison | :ballot_box_with_check:&nbsp;1.0+ | | | 1.0 | | | | [code](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStatefulSetSemanticRevisionComparison%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StorageCapacityScoring | | | 1.33 | | | | | [code](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageCapacityScoring%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
| StorageNamespaceIndex | :ballot_box_with_check:&nbsp;1.30+ | | | 1.301.32 | | 1.33 | | [code](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbStorageNamespaceIndex%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |

View file

@ -1737,6 +1737,12 @@
lockToDefault: false
preRelease: Beta
version: "1.36"
- name: StaleControllerConsistencyReplicaSet
versionedSpecs:
- default: true
lockToDefault: false
preRelease: Beta
version: "1.36"
- name: StatefulSetSemanticRevisionComparison
versionedSpecs:
- default: true