Merge pull request #136583 from michaelasp/watchListRVEnforcement

Ensure resource version callbacks aren't called preemptively in reflector
This commit is contained in:
Kubernetes Prow Robot 2026-01-29 06:53:48 +05:30 committed by GitHub
commit ebc5660fa5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 124 additions and 19 deletions

View file

@ -520,6 +520,8 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
// if w is already initialized, it must be past any synthetic non-rv-ordered added events
propagateRVFromStart := true
if w == nil {
timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
@ -532,6 +534,11 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
if options.ResourceVersion == "" || options.ResourceVersion == "0" {
// if we're starting the watch at a resource version that will get synthetic ADDED events in non-rv order,
// wait until we're through that set of events before propagating the RV
propagateRVFromStart = false
}
w, err = r.listerWatcher.WatchWithContext(ctx, options)
if err != nil {
@ -548,7 +555,25 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
}
}
err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
func(rv string, eventReceivedBesidesAdded bool) {
// We update the resource version in the store only if we have received at least one event that is
// not an added event, or if the resource version has been set previously. This is because we can
// encounter 2 scenarios:
// 1. The watch is started from a resource version specified by the LastSyncResourceVersion field.
// In this case, we can update the resource version in the store without worrying about it being
// out of order since we will not receive any synthetic added events for resources that may be
// out of order.
// 2. The watch is started when the LastSyncResourceVersion field is empty. In this case, we may not
// update the LastSyncResourceVersion until we receive at least one event that is not an added
// event, since that is the only way to ensure that the watch has exited the initial list phase.
if propagateRVFromStart || eventReceivedBesidesAdded {
r.setLastSyncResourceVersion(rv)
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(rv)
}
}
},
r.clock, resyncerrc)
// handleWatch always stops the watcher. So we don't need to here.
// Just set it to nil to trigger a retry on the next loop.
@ -775,7 +800,11 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
return nil, err
}
watchListBookmarkReceived, err := handleListWatch(ctx, start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
func(rv string) { resourceVersion = rv },
func(rv string, eventReceivedBesidesAdded bool) {
if eventReceivedBesidesAdded {
resourceVersion = rv
}
},
r.clock, make(chan error))
if err != nil {
w.Stop() // stop and retry with clean state
@ -832,7 +861,7 @@ func handleListWatch(
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
setLastSyncResourceVersion func(string, bool),
clock clock.Clock,
errCh chan error,
) (bool, error) {
@ -853,7 +882,7 @@ func handleWatch(
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
setLastSyncResourceVersion func(string, bool),
clock clock.Clock,
errCh chan error,
) error {
@ -881,12 +910,13 @@ func handleAnyWatch(
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
setLastSyncResourceVersion func(string, bool),
exitOnWatchListBookmarkReceived bool,
clock clock.Clock,
errCh chan error,
) (bool, error) {
watchListBookmarkReceived := false
eventReceivedBesidesAdded := false
eventCount := 0
logger := klog.FromContext(ctx)
initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived)
@ -946,6 +976,7 @@ loop:
utilruntime.HandleErrorWithContext(ctx, err, "Unable to add watch event object to store", "reflector", name, "object", event.Object)
}
case watch.Modified:
eventReceivedBesidesAdded = true
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Unable to update watch event object to store", "reflector", name, "object", event.Object)
@ -954,22 +985,22 @@ loop:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
eventReceivedBesidesAdded = true
err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Unable to delete watch event object from store", "reflector", name, "object", event.Object)
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
eventReceivedBesidesAdded = true
if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
watchListBookmarkReceived = true
}
default:
utilruntime.HandleErrorWithContext(ctx, err, "Unknown watch event", "reflector", name, "event", event)
}
setLastSyncResourceVersion(resourceVersion)
if rvu, ok := store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(resourceVersion)
}
// when eventReceivedBesidesAdded is true, that indicates we are definitely past any initial synthetic Added events
setLastSyncResourceVersion(resourceVersion, eventReceivedBesidesAdded)
eventCount++
if exitOnWatchListBookmarkReceived && watchListBookmarkReceived {
stopWatcher = false

View file

@ -235,7 +235,7 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
return resultCh
},
}
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, func(rv string, _ bool) { g.setLastSyncResourceVersion(rv) }, g.clock, nevererrc)
require.Equal(t, err, errorStopRequested)
// Ensure handleWatch calls ResultChan and Stop
assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
@ -268,7 +268,7 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
return resultCh
},
}
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, func(rv string, _ bool) { g.setLastSyncResourceVersion(rv) }, g.clock, nevererrc)
require.Equal(t, err, errorStopRequested)
// Ensure handleWatch calls ResultChan and Stop
assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
@ -294,7 +294,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
}
// Simulate the result channel being closed by the producer before handleWatch is called.
close(resultCh)
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, func(rv string, _ bool) { g.setLastSyncResourceVersion(rv) }, g.clock, nevererrc)
require.Equal(t, &VeryShortWatchError{Name: g.name}, err)
// Ensure handleWatch calls ResultChan and Stop
assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
@ -325,7 +325,7 @@ func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
return resultCh
},
}
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, func(rv string, _ bool) { g.setLastSyncResourceVersion(rv) }, g.clock, nevererrc)
require.Equal(t, &VeryShortWatchError{Name: g.name}, err)
// Ensure handleWatch calls ResultChan and Stop
assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
@ -338,7 +338,7 @@ func TestReflectorWatchHandler(t *testing.T) {
// watching after all the events have been consumed.
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
setLastSyncResourceVersion := func(rv string) {
setLastSyncResourceVersion := func(rv string, _ bool) {
g.setLastSyncResourceVersion(rv)
if rv == "32" {
cancel(errors.New("LastSyncResourceVersion is 32"))
@ -398,7 +398,7 @@ func TestReflectorStopWatch(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
cancel(errors.New("don't run"))
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, func(rv string, _ bool) { g.setLastSyncResourceVersion(rv) }, g.clock, nevererrc)
require.Equal(t, err, errorStopRequested)
}

View file

@ -168,6 +168,68 @@ func TestInitialEventsEndBookmarkTicker(t *testing.T) {
})
}
// TestWatchListResourceVersion feeds a fixed sequence of watch events into
// handleListWatch and checks which resource version ends up recorded by a
// callback that ignores every update until the handler reports that an event
// other than Added has been received.
func TestWatchListResourceVersion(t *testing.T) {
	testCases := []struct {
		name   string
		events []watch.Event
		wantRV string
	}{
		{
			// No events are sent before the context is cancelled.
			name:   "empty resource version",
			events: []watch.Event{},
			wantRV: "",
		},
		{
			// Only an Added event is sent; the recorded version is expected to stay empty.
			name:   "empty resource version without bookmark event",
			events: []watch.Event{{Type: watch.Added, Object: makePod("p1", "1")}},
			wantRV: "",
		},
		{
			// An Added event followed by a bookmark carrying the initial-events
			// annotation; the bookmark's resource version is expected to be recorded.
			name: "non empty resource version with bookmark event and initial events annotation",
			events: []watch.Event{
				{Type: watch.Added, Object: makePod("p1", "1")},
				{
					Type: watch.Bookmark,
					Object: &v1.Pod{
						ObjectMeta: metav1.ObjectMeta{
							ResourceVersion: "2",
							Annotations:     map[string]string{metav1.InitialEventsAnnotationKey: "true"},
						},
					},
				},
			},
			wantRV: "2",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Take per-subtest copies so the producer goroutine below never
			// touches the loop variable.
			events, wantRV := tc.events, tc.wantRV

			_, ctx := ktesting.NewTestContext(t)
			clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
			lw, s, r, ctx, cancel := testData(ctx)

			// Producer: push every event through the fake watcher, then cancel
			// the context so the handler stops if it has not returned already.
			go func() {
				for i := range events {
					lw.fakeWatcher.Action(events[i].Type, events[i].Object)
				}
				cancel(errors.New("done"))
			}()

			var recordedRV string
			recordRV := func(rv string, eventReceivedBesidesAdded bool) {
				if !eventReceivedBesidesAdded {
					return
				}
				recordedRV = rv
			}

			_, err := handleListWatch(ctx, time.Now(), lw, s, r.expectedType, r.expectedGVK, r.name, r.typeDescription, recordRV, r.clock, nevererrc)
			switch {
			case err == nil, errors.Is(err, errorStopRequested):
				// Both a clean return and a stop request are acceptable outcomes.
			default:
				t.Errorf("expected errorStopRequested, got %v", err)
			}
			require.Equal(t, wantRV, recordedRV)
		})
	}
}
func TestWatchList(t *testing.T) {
scenarios := []struct {
name string
@ -632,7 +694,7 @@ func testData(ctx context.Context) (*fakeListWatcher, Store, *Reflector, context
s := NewStore(MetaNamespaceKeyFunc)
lw := &fakeListWatcher{
fakeWatcher: watch.NewFake(),
stop: func() {
stopFunc: func() {
cancel(errors.New("time to stop"))
},
}
@ -648,7 +710,7 @@ type fakeListWatcher struct {
watchCounter int
closeAfterWatchRequests int
closeAfterListRequests int
stop func()
stopFunc func()
requestOptions []metav1.ListOptions
@ -660,7 +722,7 @@ func (lw *fakeListWatcher) List(options metav1.ListOptions) (runtime.Object, err
lw.listCounter++
lw.requestOptions = append(lw.requestOptions, options)
if lw.listCounter == lw.closeAfterListRequests {
lw.stop()
lw.stopFunc()
}
if lw.customListResponse != nil {
return lw.customListResponse, nil
@ -672,7 +734,7 @@ func (lw *fakeListWatcher) Watch(options metav1.ListOptions) (watch.Interface, e
lw.watchCounter++
lw.requestOptions = append(lw.requestOptions, options)
if lw.watchCounter == lw.closeAfterWatchRequests {
lw.stop()
lw.stopFunc()
}
if lw.watchOptionsPredicate != nil {
if err := lw.watchOptionsPredicate(options); err != nil {
@ -690,3 +752,15 @@ func (lw *fakeListWatcher) StopAndRecreateWatch() {
lw.fakeWatcher.Stop()
lw.fakeWatcher = watch.NewFake()
}
// Stop stops the fake watcher currently held by lw. lw.lock is held for the
// duration of the call and released on return.
// NOTE(review): together with ResultChan this presumably lets a
// *fakeListWatcher be passed where a watch.Interface is expected — confirm
// against the interface definition.
func (lw *fakeListWatcher) Stop() {
lw.lock.Lock()
defer lw.lock.Unlock()
lw.fakeWatcher.Stop()
}
// ResultChan returns the event channel of the fake watcher currently held by
// lw. The fakeWatcher field is read while holding lw.lock; the returned
// channel itself is used by the caller after the lock has been released.
func (lw *fakeListWatcher) ResultChan() <-chan watch.Event {
lw.lock.Lock()
defer lw.lock.Unlock()
return lw.fakeWatcher.ResultChan()
}