add tests to verify error aggregation for storage GetList

This commit is contained in:
Abu Kashem 2025-09-25 12:29:06 -04:00 committed by Abu Kashem
parent 288a241ce0
commit d6315f6fb0
5 changed files with 493 additions and 12 deletions

View file

@ -82,14 +82,18 @@ type corruptObjErrAggregator struct {
}
func (a *corruptObjErrAggregator) Append(key string, err error) bool {
if len(a.errs) >= a.maxCount {
// add a sentinel error to indicate there are more
a.errs = append(a.errs, errTooMany)
if a.abortErr != nil || len(a.errs) >= a.maxCount {
return true
}
var corruptObjErr *corruptObjectError
if errors.As(err, &corruptObjErr) {
a.errs = append(a.errs, storage.NewCorruptObjError(key, corruptObjErr))
if len(a.errs) >= a.maxCount {
// add a sentinel error to indicate there are more
a.errs = append(a.errs, errTooMany)
return true
}
return false
}

View file

@ -190,6 +190,23 @@ func TestKeySchema(t *testing.T) {
storagetesting.RunTestKeySchema(ctx, t, store)
}
// TestGetListWithErrorAggregation runs the shared GetList error aggregation
// suite from the storagetesting package against this store implementation.
func TestGetListWithErrorAggregation(t *testing.T) {
	// newStore builds a fresh store for a single test case of the suite.
	newStore := func(t *testing.T) (context.Context, *storagetesting.ErrorAggregatorFactory, storagetesting.InterfaceWithCorruptTransformer) {
		ctx, s, _ := testSetup(t)

		// when the feature gate is on, the store handed to the suite is the
		// one wrapped by NewStoreWithUnsafeCorruptObjectDeletion
		var wrapped storage.Interface = s
		if utilfeature.DefaultFeatureGate.Enabled(features.AllowUnsafeMalformedObjectDeletion) {
			wrapped = NewStoreWithUnsafeCorruptObjectDeletion(wrapped, s.groupResource)
		}

		// swap the error aggregator factory of the store for a recording
		// one, this lets the suite inspect every value GetList passes to
		// the original aggregator.
		recording := storagetesting.WrapListErrorAggregatorFactory(s.listErrAggrFactory)
		s.listErrAggrFactory = recording.WithRecorder()

		return ctx, recording, &storeWithCorruptedTransformer{Interface: wrapped, store: s}
	}

	storagetesting.RunTestGetListWithErrorAggregation(t, newStore)
}
// storeWithPrefixTransformer embeds *store, so it exposes everything the
// embedded store exposes.
// NOTE(review): judging by the name, it presumably adds a hook for tests that
// need to swap in a prefix transformer; its methods are outside this view,
// confirm against them.
type storeWithPrefixTransformer struct {
*store
}
@ -210,20 +227,32 @@ type corruptedTransformer struct {
}
func (f *corruptedTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, stale bool, err error) {
return nil, true, &corruptObjectError{err: fmt.Errorf("bits flipped"), errType: untransformable}
out, stale, err = f.Transformer.TransformFromStorage(ctx, data, dataCtx)
switch {
case err != nil: // unexpected error
return out, stale, err
case strings.Contains(string(data), storagetesting.CorruptErrKey):
return out, stale, &corruptObjectError{err: fmt.Errorf("bits flipped"), errType: untransformable}
case strings.Contains(string(data), storagetesting.UnexpectedErrKey):
return out, stale, fmt.Errorf("bits flipped")
}
return out, stale, err
}
type storeWithCorruptedTransformer struct {
*store
storage.Interface
// we need the original *store instance to mutate the transformer
store *store
}
func (s *storeWithCorruptedTransformer) CorruptTransformer() func() {
ct := &corruptedTransformer{Transformer: s.transformer}
s.transformer = ct
s.watcher.transformer = ct
ct := &corruptedTransformer{Transformer: s.store.transformer}
s.store.transformer = ct
s.store.watcher.transformer = ct
return func() {
s.transformer = ct.Transformer
s.watcher.transformer = ct.Transformer
s.store.transformer = ct.Transformer
s.store.watcher.transformer = ct.Transformer
}
}

View file

@ -114,7 +114,7 @@ func TestProgressNotify(t *testing.T) {
func TestWatchWithUnsafeDelete(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AllowUnsafeMalformedObjectDeletion, true)
ctx, store, _ := testSetup(t)
storagetesting.RunTestWatchWithUnsafeDelete(ctx, t, &storeWithCorruptedTransformer{store})
storagetesting.RunTestWatchWithUnsafeDelete(ctx, t, &storeWithCorruptedTransformer{Interface: store, store: store})
}
// TestWatchDispatchBookmarkEvents makes sure that

View file

@ -44,6 +44,7 @@ import (
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/ptr"
)
@ -2839,6 +2840,444 @@ func RunTestGetListRecursivePrefix(ctx context.Context, t *testing.T, store stor
}
}
// WrapListErrorAggregatorFactory returns an ErrorAggregatorFactory that holds
// on to the given list error aggregator factory, so a test can record the
// values GetList passes to the error aggregator.
func WrapListErrorAggregatorFactory(factory func() storage.ListItemErrors) *ErrorAggregatorFactory {
	wrapped := new(ErrorAggregatorFactory)
	wrapped.factory = factory
	return wrapped
}
// ErrorAggregatorFactory wraps the actual error aggregation factory in use by
// the store implementation so we can record the values GetList passes to its
// error aggregator.
type ErrorAggregatorFactory struct {
// this is the original error aggregation factory in use by the store implementation
factory func() storage.ListItemErrors
// every time GetList invokes the factory, we wrap the aggregator it returns
// with a recorder; the most recent recorder is stored here so the test can
// access the values recorded.
// TODO: this forces the test to be serial, but we are okay for now since this
// is used by a single test that calls GetList serially. In the future we could
// change the signature of the factory to take the request context object, this
// will allow the test to store its recorder (ListErrorAggregator) into the
// request Context object, to enable multiple tests to run concurrently.
recorder *listErrorAggregatorRecorder
}
// WithRecorder returns a factory function suitable for installing on the
// store. On every invocation it calls the original factory, wraps the
// aggregator it yields in a new recorder, remembers that recorder on f
// (replacing any previous one) and returns it.
func (f *ErrorAggregatorFactory) WithRecorder() func() storage.ListItemErrors {
	return func() storage.ListItemErrors {
		rec := &listErrorAggregatorRecorder{ListItemErrors: f.factory()}
		f.recorder = rec
		return rec
	}
}
// listErrorAggregatorRecorder wraps the actual ListErrorAggregator in use by
// the store and records the values GetList passes to the error aggregator.
type listErrorAggregatorRecorder struct {
// the ListItemErrors we are going to observe; it is embedded, so any
// method not overridden by the recorder goes straight to it
storage.ListItemErrors
// Keep track of the values GetList passes to the aggregator; errs[i] and
// keys[i] come from the same Append call
errs []error
keys []string
}
// Append records the key and the error it is given, then hands both to the
// wrapped ListItemErrors and returns whatever the wrapped aggregator returns.
func (r *listErrorAggregatorRecorder) Append(key string, err error) bool {
	r.keys, r.errs = append(r.keys, key), append(r.errs, err)
	return r.ListItemErrors.Append(key, err)
}
// Annotation keys a test puts on an object to mark how that object should
// fail once the test swaps in its corrupting transformer.
const (
// CorruptErrKey: if this annotation is present, the object is marked to
// become corrupt
CorruptErrKey = "testing.transformer.k8s.io/corrupt-error"
// UnexpectedErrKey: if this annotation is present, the object is marked
// to fail with an unexpected non-corrupt error
UnexpectedErrKey = "testing.transformer.k8s.io/unexpected-error"
)
// RunTestGetListWithErrorAggregation tests aggregation of errors while the
// list operation is in progress.
//
// newStoreFn is invoked once per test case, after the
// AllowUnsafeMalformedObjectDeletion feature gate has been set for that case.
// It returns the request context, the factory whose recorder captures the
// values GetList passes to the error aggregator, and the store under test.
//
// Every test case follows the same steps:
//  1. create n Pod objects, annotating the ones the case wants to fail
//  2. list them once and expect all n objects back without an error
//  3. swap in the corrupting transformer
//  4. list again and expect an error
//  5. hand the recorder, the (partial) list and the error to the verifier
//
// Whenever a verifier is about to index into a slice or dereference a pointer
// it has just checked, the check uses t.Fatalf, so an unmet expectation is
// reported as a test failure rather than a panic.
func RunTestGetListWithErrorAggregation(t *testing.T, newStoreFn func(*testing.T) (context.Context, *ErrorAggregatorFactory, InterfaceWithCorruptTransformer)) {
	prefix := "/pods/ns/"
	// the test creates n objects and assigns each object unique id i ie. 1 <= i <= n
	objNameFn := func(id int) string {
		// pad with leading zeros so that for i, j where i < j, object
		// foo-{j} follows object foo-{i} in lexicographical order
		return fmt.Sprintf("foo-%06d", id)
	}
	keyFn := func(id int) string {
		return fmt.Sprintf("%s%s", prefix, objNameFn(id))
	}

	tests := []struct {
		name           string
		featureEnabled bool
		// number of Pod objects to be created when the test starts, the
		// objects are named as "foo-{i}" where 1 <= i <= n
		n int
		// the function decides whether the object represented
		// by the given id should be marked to become corrupt or
		// fail to transform with an unexpected error
		corrupter func(id int) string
		// verifies the result from GetList
		//  recorder: records the values GetList passes to the error aggregator
		//  list: result of GetList operation is saved into list
		//  err: error returned from GetList
		verifier func(t *testing.T, _ *listErrorAggregatorRecorder, list *example.PodList, err error)
	}{
		{
			name: "feature disabled, should maintain backward compatibility",
			// when the feature is disabled, we should maintain
			// backward compatibility, which is to abort on the first error
			featureEnabled: false,
			// we initially create n=7 objects with ids {1, 2, 3 ... 7}, they are put in the following disjoint sets:
			//  - good: {1, 3, 5, 7}, these objects will never become corrupt
			//  - corrupt: {2, 4, 6}, these objects are marked to become corrupt
			n: 7,
			corrupter: func(i int) string {
				if i%2 == 0 {
					return CorruptErrKey
				}
				return ""
			},
			// the following sequence of events are expected to occur in order while retrieving the n objects:
			//
			//   -- |- 1: no error, successfully decoded
			//      |- 2: yields an expected corruptObjErr, GetList aborts immediately
			//
			//  a) GetList encounters corruptObjErr while retrieving {2} and immediately aborts
			//  b) GetList successfully decodes {1}
			verifier: func(t *testing.T, recorder *listErrorAggregatorRecorder, list *example.PodList, err error) {
				// a) the error returned from GetList should not be wrapped
				// nolint:errorlint // the aggregator should return the error as is
				intErr, ok := err.(storage.InternalError)
				if !ok {
					t.Fatalf("expected the error to be %T, but got: %#v", storage.InternalError{}, err)
				}
				if want, got := fmt.Sprintf(`unable to transform key "%s"`, keyFn(2)), intErr.Error(); !strings.HasPrefix(got, want) {
					t.Errorf("expected the error to start with %q, but got: %v", want, got)
				}
				// the error observed by the recorder should be
				// the same error returned by GetList
				if want, got := 1, len(recorder.errs); want != got {
					t.Fatalf("expected exactly %d error(s), but got: %d", want, got)
				}
				// nolint:errorlint // the aggregator in use should return
				// the error as is, so the test is asserting with identity.
				if want, got := err, recorder.errs[0]; want != got {
					t.Errorf("expected the aggregator to return the original error as is: %v, but got: %v", want, got)
				}

				// b) GetList successfully decodes {1}
				// the first item is indexed right below, so a length
				// mismatch must stop the test here
				if want, got := 1, len(list.Items); want != got {
					t.Fatalf("expected the list to have %d item(s), but got: %d", want, got)
				}
				if want, got := objNameFn(1), list.Items[0].Name; want != got {
					t.Errorf("expected an object name of %q, but got: %q", want, got)
				}
			},
		},
		{
			name:           "feature enabled, first error is an unexpected error, no aggregation expected",
			featureEnabled: true,
			// we initially create n=7 objects with ids {1, 2, 3 ... 7}, they are put in the following disjoint sets:
			//  - good: {1, 3, 5, 7}, these objects will never become corrupt
			//  - unexpected: {2}, this object is marked to yield an unexpected error (not corruptObjErr)
			//  - corrupt: {4, 6}, these objects are marked to become corrupt
			n: 7,
			corrupter: func(i int) string {
				switch {
				case i == 2:
					return UnexpectedErrKey
				case i%2 == 0:
					return CorruptErrKey
				default:
					return ""
				}
			},
			// the following sequence of events are expected to occur in order while retrieving the n objects:
			//
			//   -- |- 1: no error, successfully decoded
			//      |- 2: unexpected error, GetList aborts
			//
			//  a) GetList encounters an unexpected (not corruptObjErr)
			//     error while retrieving {2} and immediately aborts
			//  b) GetList successfully decodes {1}
			verifier: func(t *testing.T, recorder *listErrorAggregatorRecorder, list *example.PodList, err error) {
				// a) the error observed by the recorder should be the same
				// error returned by GetList, and it should not be wrapped
				if want, got := 1, len(recorder.errs); want != got {
					t.Fatalf("expected exactly %d error(s), but got: %d", want, got)
				}
				// nolint:errorlint // the aggregator in use should return
				// the error as is, so the test is asserting with identity.
				if want, got := err, recorder.errs[0]; want != got {
					t.Errorf("expected GetList to return the original error as is: %v, but got: %v", want, got)
				}

				// b) GetList should successfully decode object 1 from the good set
				if want, got := 1, len(list.Items); want != got {
					t.Fatalf("expected the list to have %d item(s), but got: %d", want, got)
				}
				if want, got := objNameFn(1), list.Items[0].Name; want != got {
					t.Errorf("expected an object name of %q, but got: %q", want, got)
				}
			},
		},
		{
			name:           "feature enabled, should aggregate corrupt object errors",
			featureEnabled: true,
			// we initially create n=7 objects with ids {1, 2, 3 ... 7}, they are put in the following disjoint sets:
			//  - good: {1, 3, 5, 7}, these objects will never become corrupt
			//  - corrupt: {2, 4, 6}, these objects are marked to become corrupt
			n: 7,
			corrupter: func(i int) string {
				if i%2 == 0 {
					return CorruptErrKey
				}
				return ""
			},
			// while retrieving the n objects, we expect the following from GetList:
			//  a) GetList encounters corruptObjErr while retrieving objects in the corrupt set
			//  b) GetList successfully decodes object(s) in the good set
			verifier: func(t *testing.T, _ *listErrorAggregatorRecorder, list *example.PodList, listErr error) {
				// the error returned from GetList should be an API status object
				var statusGot apierrors.APIStatus
				if !errors.As(listErr, &statusGot) {
					t.Fatalf("expected an API status error object, but got: %v", listErr)
				}
				details := statusGot.Status().Details
				corrupt := []string{keyFn(2), keyFn(4), keyFn(6)}
				// a) all the corruptObjErr errors should be aggregated
				if details == nil || len(details.Causes) != len(corrupt) {
					t.Fatalf("expected the API status to include the corrupt object errors, but got: %v", details)
				}
				for i, key := range corrupt {
					if want, got := key, details.Causes[i].Field; want != got {
						t.Errorf("expected an object name of %q, but got: %q", want, got)
					}
				}

				// b) GetList should successfully decode all objects in the good set
				good := []int{1, 3, 5, 7}
				if want, got := len(good), len(list.Items); want != got {
					t.Fatalf("expected the list to have %d item(s), but got: %d", want, got)
				}
				for i, id := range good {
					if want, got := objNameFn(id), list.Items[i].Name; want != got {
						t.Errorf("expected an object name of %q, but got: %q", want, got)
					}
				}
			},
		},
		{
			name:           "feature enabled, aggregation should abort as soon as it encounters an unexpected error",
			featureEnabled: true,
			// we initially create n=7 objects with ids {1, 2, 3 ... 7}, they are put in the following disjoint sets:
			//  - good: {1, 3, 5, 7}, these objects will never become corrupt
			//  - unexpected: {4}, this object is marked to yield an unexpected error (not corruptObjErr)
			//  - corrupt: {2, 6}, these objects are marked to become corrupt
			n: 7,
			corrupter: func(i int) string {
				switch {
				case i == 4:
					return UnexpectedErrKey
				case i%2 == 0:
					return CorruptErrKey
				default:
					return ""
				}
			},
			// the following sequence of events are expected to occur in order while retrieving the n objects:
			//
			//   --|- 1: no error, successfully decoded
			//     |- 2: yields an expected corruptObjErr, GetList aggregates this error
			//     |- 3: no error, successfully decoded
			//     |- 4: unexpected error, GetList aborts
			//
			//  a) GetList encounters a corruptObjErr error while retrieving {2} in the corrupt set
			//  b) GetList encounters an unexpected (not corruptObjErr) error while
			//     retrieving {4} in the unexpected set, and immediately aborts
			//  c) GetList successfully decodes {1,3} in the good set before it aborts
			verifier: func(t *testing.T, recorder *listErrorAggregatorRecorder, list *example.PodList, err error) {
				// the error returned from GetList should be an API status object
				var statusGot apierrors.APIStatus
				if !errors.As(err, &statusGot) {
					t.Fatalf("expected an API status error object, but got: %v", err)
				}
				details := statusGot.Status().Details
				// a) only the corruptObjErr from object {2} should be aggregated
				// the first cause is read right below, so stop here if
				// the details are missing or have an unexpected size
				if details == nil || len(details.Causes) != 1 {
					t.Fatalf("expected the API status to include the corrupt object error aggregated, but got: %v", details)
				}
				if want, got := keyFn(2), details.Causes[0].Field; want != got {
					t.Errorf("expected an object name of %q, but got: %q", want, got)
				}

				// b) the recorder should observe the unexpected error as well
				if want, got := 2, len(recorder.errs); want != got {
					t.Fatalf("expected GetList to pass the unexpected error to the aggregator, want %d error(s), but got: %d", want, got)
				}

				// c) verify that GetList successfully decodes
				ids := []int{1, 3}
				// the items are indexed in the loop below, so a length
				// mismatch must stop the test here
				if want, got := len(ids), len(list.Items); want != got {
					t.Fatalf("expected GetList to successfully decode %d object(s), but got: %d", want, got)
				}
				for i, id := range ids {
					if want, got := objNameFn(id), list.Items[i].Name; want != got {
						t.Errorf("expected an object name of %q, but got: %q", want, got)
					}
				}
			},
		},
		{
			name:           "feature enabled, error aggregation should not exceed the maximum limit",
			featureEnabled: true,
			// aggregation limit is currently hard coded to 100
			// we initially create n=210 objects with ids {1, 2, 3 ... 210}, they are put in the following disjoint sets:
			//  - good: {1, 3, 5 ... 195, 197, 199, ... 207, 209}, these 105 objects will never become corrupt
			//  - corrupt: {2, 4, 6 ... 196, 198, 200, ... 208, 210}, these 105 objects are marked to become corrupt
			n: 210,
			corrupter: func(i int) string {
				if i%2 == 0 {
					return CorruptErrKey
				}
				return ""
			},
			// while listing the n objects, we expect the following:
			//  a) GetList continues to aggregate the corruptObjErr errors
			//     until it reaches the maximum limit, and then it immediately aborts
			//  b) GetList successfully decodes the first 100 objects in the good set into list
			verifier: func(t *testing.T, recorder *listErrorAggregatorRecorder, list *example.PodList, err error) {
				// a) the recorder should observe exactly N errors, N=limit
				limit := 100
				if want, got := limit, len(recorder.errs); want != got {
					t.Errorf("expected GetList to aggregate exactly %d error(s), but got: %d", want, got)
				}

				var statusGot apierrors.APIStatus
				if !errors.As(err, &statusGot) {
					t.Fatalf("expected an API status error object, but got: %v", err)
				}
				details := statusGot.Status().Details
				// the nil check is kept apart from the length check so the
				// failure message never dereferences a nil details
				if details == nil {
					t.Fatalf("expected the API status to include details, but got none")
				}
				// the (limit+1)th entry should be the sentinel error
				if want, got := limit+1, len(details.Causes); want != got {
					t.Fatalf("expected GetList to append the sentinel error to the list, want: %d, but got: %d", want, got)
				}
				want := metav1.StatusCause{
					Type:    metav1.CauseTypeTooMany,
					Message: "too many errors, the list is truncated",
				}
				if got := details.Causes[limit]; !cmp.Equal(want, got) {
					t.Errorf("expected the sentinel error, diff: %s", cmp.Diff(want, got))
				}

				// b) before the limit is hit and GetList aborts, it successfully decodes 100 objects
				if want, got := 100, len(list.Items); want != got {
					t.Errorf("expected GetList to successfully decode %d object(s), but got: %d", want, got)
				}
			},
		},
		{
			name:           "feature enabled, there are exactly 100 (error aggregation limit) corrupt errors",
			featureEnabled: true,
			// aggregation limit is currently hard coded to 100
			// we initially create n=100 objects with ids {1, 2, 3 ... 100}, they are put in the following disjoint sets:
			//  - good: {}, all the objects are marked to become corrupt
			//  - corrupt: {1, 2, 3 ... 99, 100}, these objects are marked to become corrupt
			n:         100,
			corrupter: func(i int) string { return CorruptErrKey },
			// while listing the n objects, we expect the following:
			//  a) GetList continues to aggregate the corruptObjErr errors
			//     until it reaches the maximum limit, and then it immediately aborts
			//  b) GetList successfully decodes zero objects
			verifier: func(t *testing.T, recorder *listErrorAggregatorRecorder, list *example.PodList, err error) {
				// a) the recorder should observe the corrupt errors
				limit := 100
				if want, got := limit, len(recorder.errs); want != got {
					t.Errorf("expected GetList to aggregate exactly %d error(s), but got: %d", want, got)
				}

				var statusGot apierrors.APIStatus
				if !errors.As(err, &statusGot) {
					t.Fatalf("expected an API status error object, but got: %v", err)
				}
				details := statusGot.Status().Details
				// the nil check is kept apart from the length check so the
				// failure message never dereferences a nil details
				if details == nil {
					t.Fatalf("expected the API status to include details, but got none")
				}
				// the (limit+1)th entry should be the sentinel error, it is
				// indexed right below, so a length mismatch must stop the test
				if want, got := limit+1, len(details.Causes); want != got {
					t.Fatalf("expected GetList to append the sentinel error to the list, want: %d, but got: %d", want, got)
				}
				want := metav1.StatusCause{
					Type:    metav1.CauseTypeTooMany,
					Message: "too many errors, the list is truncated",
				}
				if got := details.Causes[limit]; !cmp.Equal(want, got) {
					t.Errorf("expected the sentinel error, diff: %s", cmp.Diff(want, got))
				}

				// b) before the limit is hit and GetList aborts, it successfully decodes 0 objects
				if want, got := 0, len(list.Items); want != got {
					t.Errorf("expected GetList to successfully decode %d object(s), but got: %d", want, got)
				}
			},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.AllowUnsafeMalformedObjectDeletion, test.featureEnabled)
			ctx, factory, store := newStoreFn(t)

			// Step 1: add N objects to the store foo-{1 ... n}
			// we construct the names of the objects in a way that ensures
			// that the order of creation is also the lexicographical
			// order by which etcd List operation returns the objects
			for i := 1; i <= test.n; i++ {
				obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{
					Name:      objNameFn(i),
					Namespace: "ns",
				}}
				// add the annotation that will mark the object to become corrupt
				if marker := test.corrupter(i); len(marker) > 0 {
					obj.Annotations = map[string]string{
						marker: "",
					}
				}
				testPropagateStore(ctx, t, store, obj)
			}

			// step 2: list the N objects, we expect no error
			out := &example.PodList{}
			storageOpts := storage.ListOptions{
				Predicate: storage.Everything,
				Recursive: true,
			}
			err := store.GetList(ctx, prefix, storageOpts, out)
			if err != nil {
				t.Fatalf("GetList failed with unexpected error: %v", err)
			}
			if want, got := test.n, len(out.Items); want != got {
				t.Fatalf("Expected length: %d, but got: %d", want, got)
			}

			// step 3: change the transformer so the marked objects appear corrupt
			revertTransformer := store.CorruptTransformer()
			defer revertTransformer()

			// step 4: invoke GetList again, this time it should encounter the corrupt object(s)
			out = &example.PodList{}
			err = store.GetList(ctx, prefix, storageOpts, out)
			if err == nil {
				t.Fatalf("Expected GetList to return error")
			}

			// step 5: verify what we expect from GetList
			test.verifier(t, factory.recorder, out, err)
		})
	}
}
// CallsValidation is the signature of the callback RunTestListContinuation
// accepts as its validation argument; it is handed the test handle together
// with a page size and an estimated count of processed objects.
type CallsValidation func(t *testing.T, pageSize, estimatedProcessedObjects uint64)
func RunTestListContinuation(ctx context.Context, t *testing.T, store storage.Interface, validation CallsValidation) {

View file

@ -414,7 +414,16 @@ func RunTestWatchError(ctx context.Context, t *testing.T, store InterfaceWithPre
}
func RunTestWatchWithUnsafeDelete(ctx context.Context, t *testing.T, store InterfaceWithCorruptTransformer) {
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}
obj := &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "test-ns",
// this annotation marks the object to become corrupt
Annotations: map[string]string{
CorruptErrKey: "",
},
},
}
key := computePodKey(obj)
out := &example.Pod{}