diff --git a/pkg/storage/unified/resource/continue.go b/pkg/storage/unified/resource/continue.go index 32d960e785a..ddf19449866 100644 --- a/pkg/storage/unified/resource/continue.go +++ b/pkg/storage/unified/resource/continue.go @@ -18,6 +18,10 @@ type ContinueToken struct { ResourceVersion int64 `json:"v"` // SortAscending indicates the sort order (used by list history). SortAscending bool `json:"s,omitempty"` + // SearchAfter is a []string of sort values for search pagination. + SearchAfter []string `json:"sa,omitempty"` + // SearchBefore is a []string of sort values for search pagination. + SearchBefore []string `json:"sb,omitempty"` } func (c ContinueToken) String() string { @@ -39,3 +43,8 @@ func GetContinueToken(token string) (*ContinueToken, error) { return t, nil } + +// NewSearchContinueToken encodes SearchAfter values into a continue token string. +func NewSearchContinueToken(searchAfter []string, rv int64) (string, error) { + return ContinueToken{SearchAfter: searchAfter, ResourceVersion: rv}.String(), nil +} diff --git a/pkg/storage/unified/resource/list_with_field_selectors.go b/pkg/storage/unified/resource/list_with_field_selectors.go new file mode 100644 index 00000000000..6a40595c20e --- /dev/null +++ b/pkg/storage/unified/resource/list_with_field_selectors.go @@ -0,0 +1,114 @@ +package resource + +import ( + "context" + "net/http" + + "github.com/grafana/grafana/pkg/storage/unified/resourcepb" +) + +func (s *server) listWithFieldSelectors(ctx context.Context, req *resourcepb.ListRequest) (*resourcepb.ListResponse, error) { + if req.Options.Key.Namespace == "" { + return &resourcepb.ListResponse{ + Error: NewBadRequestError("namespace must be specified for list with filter"), + }, nil + } + + for _, v := range req.Options.Fields { + v.Key = SEARCH_SELECTABLE_FIELDS_PREFIX + v.Key + } + + srq := &resourcepb.ResourceSearchRequest{ + Options: req.Options, + Limit: req.Limit, + } + + var listRv int64 + if req.NextPageToken != "" { + token, err := GetContinueToken(req.NextPageToken) + if err != nil { + return &resourcepb.ListResponse{ + Error: NewBadRequestError("invalid continue token"), + }, nil + } + listRv = token.ResourceVersion + srq.SearchAfter = token.SearchAfter + srq.SearchBefore = token.SearchBefore + } + + searchResp, err := s.searchClient.Search(ctx, srq) + if err != nil { + return nil, err + } + + // If it's the first page, set the listRv to the search response RV + if listRv <= 0 { + listRv = searchResp.ResourceVersion + } + + pageBytes := 0 + rsp := &resourcepb.ListResponse{ + ResourceVersion: listRv, + } + + // Using searchResp.GetResults().GetRows() will not panic if anything is nil on the path. + for _, row := range searchResp.GetResults().GetRows() { + // TODO: use batch reads + // The Read() will also handle permission checks here + val, err := s.Read(ctx, &resourcepb.ReadRequest{ + Key: row.Key, + ResourceVersion: row.ResourceVersion, + }) + if err != nil { + return &resourcepb.ListResponse{Error: AsErrorResult(err)}, nil + } + if val.Error != nil { + if val.Error.Code == http.StatusForbidden { + continue + } + return &resourcepb.ListResponse{Error: val.Error}, nil + } + pageBytes += len(val.Value) + rsp.Items = append(rsp.Items, &resourcepb.ResourceWrapper{ + Value: val.Value, + ResourceVersion: val.ResourceVersion, + }) + if (req.Limit > 0 && len(rsp.Items) >= int(req.Limit)) || pageBytes >= s.maxPageSizeBytes { + token, err := NewSearchContinueToken(row.GetSortFields(), listRv) + if err != nil { + return &resourcepb.ListResponse{ + Error: NewBadRequestError("invalid continue token"), + }, nil + } + rsp.NextPageToken = token + return rsp, nil + } + } + + return rsp, nil +} + +func filterFieldSelectors(req *resourcepb.ListRequest) *resourcepb.ListRequest { + fields := make([]*resourcepb.Requirement, 0, len(req.Options.Fields)) + for _, f := range req.Options.Fields { + if (f.Operator != "=" && f.Operator != "==") || f.Key == "metadata.namespace" { + continue + } + fields = append(fields, f) + } + req.Options.Fields = fields + + return req +} + +func (s *server) useFieldSelectorSearch(req *resourcepb.ListRequest) bool { + if s.searchClient == nil || req.Source != resourcepb.ListRequest_STORE || len(req.Options.Fields) == 0 { + return false + } + + if req.VersionMatchV2 == resourcepb.ResourceVersionMatchV2_Exact || req.VersionMatchV2 == resourcepb.ResourceVersionMatchV2_NotOlderThan { + return false + } + + return true +} diff --git a/pkg/storage/unified/resource/list_with_field_selectors_test.go b/pkg/storage/unified/resource/list_with_field_selectors_test.go new file mode 100644 index 00000000000..c0492daca2f --- /dev/null +++ b/pkg/storage/unified/resource/list_with_field_selectors_test.go @@ -0,0 +1,434 @@ +package resource + +import ( + "context" + "iter" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + claims "github.com/grafana/authlib/types" + "github.com/grafana/grafana/pkg/apimachinery/identity" + "github.com/grafana/grafana/pkg/storage/unified/resourcepb" + "github.com/grafana/grafana/pkg/util/scheduler" +) + +func TestUseFieldSelectorSearch(t *testing.T) { + tests := map[string]struct { + disableSearch bool + req *resourcepb.ListRequest + expectedAllowed bool + }{ + "false when no search client": { + disableSearch: true, + req: &resourcepb.ListRequest{ + Source: resourcepb.ListRequest_STORE, + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{{Key: "spec.foo"}}, + }, + }, + expectedAllowed: false, + }, + "false when source is not store": { + req: &resourcepb.ListRequest{ + Source: resourcepb.ListRequest_HISTORY, + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{{Key: "spec.foo"}}, + }, + }, + expectedAllowed: false, + }, + "false when no field selectors": { + req: &resourcepb.ListRequest{ + Source: resourcepb.ListRequest_STORE, + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + }, + }, + expectedAllowed: false, + }, + "false when version match exact": { + req: &resourcepb.ListRequest{ + Source: resourcepb.ListRequest_STORE, + VersionMatchV2: resourcepb.ResourceVersionMatchV2_Exact, + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{{Key: "spec.foo"}}, + }, + }, + expectedAllowed: false, + }, + "false when version match not older than": { + req: &resourcepb.ListRequest{ + Source: resourcepb.ListRequest_STORE, + VersionMatchV2: resourcepb.ResourceVersionMatchV2_NotOlderThan, + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{{Key: "spec.foo"}}, + }, + }, + expectedAllowed: false, + }, + "true when store, fields, and search client": { + req: &resourcepb.ListRequest{ + Source: resourcepb.ListRequest_STORE, + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{{Key: "spec.foo"}}, + }, + }, + expectedAllowed: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + s := &server{} + if !tc.disableSearch { + s.searchClient = &stubSearchClient{} + } + + require.Equal(t, tc.expectedAllowed, s.useFieldSelectorSearch(tc.req)) + }) + } +} + +func TestFilterFieldSelectors(t *testing.T) { + tests := map[string]struct { + req *resourcepb.ListRequest + wantFieldKeys []string + }{ + "removes metadata.namespace and keep valid field": { + req: &resourcepb.ListRequest{ + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{ + {Key: "metadata.namespace", Operator: "=", Values: []string{"ns"}}, + {Key: "spec.foo", Operator: "="}, + }, + }, + }, + wantFieldKeys: []string{"spec.foo"}, + }, + "removes multiple unsupported fields": { + req: &resourcepb.ListRequest{ + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{ + {Key: "metadata.namespace", Operator: "=", Values: []string{"ns", "other"}}, + {Key: "spec.foo", Operator: "!="}, + }, + }, + }, + wantFieldKeys: []string{}, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + out := filterFieldSelectors(tc.req) + + gotKeys := make([]string, 0, len(out.Options.Fields)) + for _, f := range out.Options.Fields { + gotKeys = append(gotKeys, f.Key) + } + require.Equal(t, tc.wantFieldKeys, gotKeys) + }) + } +} + +func TestListWithFieldSelectors(t *testing.T) { + searchServerRv := int64(100) + + t.Run("a single page result will have index rv and no next page token", func(t *testing.T) { + ctx := identity.WithServiceIdentityContext(context.Background(), 1) + searchClient := &stubSearchClient{ + resp: &resourcepb.ResourceSearchResponse{ + ResourceVersion: searchServerRv, + Results: &resourcepb.ResourceTable{ + Rows: []*resourcepb.ResourceTableRow{ + { + Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}, + ResourceVersion: 1, + SortFields: []string{"s1"}, + }, + }, + }, + }, + } + s := createTestServer(searchClient, 1024) + req := &resourcepb.ListRequest{ + Limit: 10, + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{{Key: "spec.foo"}}, + }, + } + + resp, err := s.listWithFieldSelectors(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp) + require.Len(t, resp.Items, 1) + require.Equal(t, searchServerRv, resp.ResourceVersion) + require.Empty(t, resp.NextPageToken) + }) + + t.Run("skips results when Read returns forbidden", func(t *testing.T) { + ctx := identity.WithServiceIdentityContext(context.Background(), 1) + searchClient := &stubSearchClient{ + resp: &resourcepb.ResourceSearchResponse{ + ResourceVersion: searchServerRv, + Results: &resourcepb.ResourceTable{ + Rows: []*resourcepb.ResourceTableRow{ + { + Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}, + ResourceVersion: 1, + SortFields: []string{"s1"}, + }, + { + Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}, + ResourceVersion: 2, + SortFields: []string{"s2"}, + }, + }, + }, + }, + } + s := &server{ + searchClient: searchClient, + backend: &fakeBackend{forbidden: map[string]struct{}{"a": {}}}, + access: claims.FixedAccessClient(true), + queue: scheduler.NewNoopQueue(), + queueConfig: QueueConfig{Timeout: time.Second, MinBackoff: time.Millisecond, MaxBackoff: time.Millisecond, MaxRetries: 1}, + maxPageSizeBytes: 1024, + } + req := &resourcepb.ListRequest{ + Limit: 10, + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{{Key: "spec.foo"}}, + }, + } + + resp, err := s.listWithFieldSelectors(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp) + require.Len(t, resp.Items, 1) + require.Equal(t, searchServerRv, resp.ResourceVersion) + }) + + t.Run("first page of paginated result will have next page token set and correct number of results", func(t *testing.T) { + ctx := identity.WithServiceIdentityContext(context.Background(), 1) + searchClient := &stubSearchClient{ + resp: &resourcepb.ResourceSearchResponse{ + ResourceVersion: searchServerRv, + Results: &resourcepb.ResourceTable{ + Rows: []*resourcepb.ResourceTableRow{ + { + Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}, + ResourceVersion: 1, + SortFields: []string{"s1"}, + }, + { + Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}, + ResourceVersion: 2, + SortFields: []string{"s2"}, + }, + }, + }, + }, + } + s := createTestServer(searchClient, 1024) + req := &resourcepb.ListRequest{ + Limit: 1, + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{{Key: "spec.foo"}}, + }, + } + + resp, err := s.listWithFieldSelectors(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, searchServerRv, resp.ResourceVersion) + require.Len(t, resp.Items, 1) + require.NotEmpty(t, resp.NextPageToken) + token, err := GetContinueToken(resp.NextPageToken) + require.NoError(t, err) + require.NotNil(t, token) + require.Equal(t, []string{"s1"}, token.SearchAfter) + require.Equal(t, searchServerRv, token.ResourceVersion) + }) + + t.Run("can handle pagination when list request has a token present", func(t *testing.T) { + ctx := identity.WithServiceIdentityContext(context.Background(), 1) + continueToken, err := NewSearchContinueToken([]string{"s1"}, searchServerRv) + require.NoError(t, err) + + searchClient := &stubSearchClient{ + resp: &resourcepb.ResourceSearchResponse{ + ResourceVersion: searchServerRv, + Results: &resourcepb.ResourceTable{ + Rows: []*resourcepb.ResourceTableRow{ + { + Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}, + ResourceVersion: 2, + SortFields: []string{"s2"}, + }, + { + Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "c"}, + ResourceVersion: 2, + SortFields: []string{"s3"}, + }, + }, + }, + }, + } + s := createTestServer(searchClient, 1024) + req := &resourcepb.ListRequest{ + Limit: 1, + NextPageToken: continueToken, + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{{Key: "spec.foo"}}, + }, + } + + resp, err := s.listWithFieldSelectors(ctx, req) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, searchServerRv, resp.ResourceVersion) + require.Len(t, resp.Items, 1) + require.NotEmpty(t, resp.NextPageToken) + + parsedToken, err := GetContinueToken(continueToken) + require.NoError(t, err) + require.NotNil(t, searchClient.last) + require.Equal(t, parsedToken.SearchAfter, searchClient.last.SearchAfter) + require.Equal(t, parsedToken.SearchBefore, searchClient.last.SearchBefore) + + token, err := GetContinueToken(resp.NextPageToken) + require.NoError(t, err) + require.NotNil(t, token) + require.Equal(t, []string{"s2"}, token.SearchAfter) + require.Equal(t, searchServerRv, token.ResourceVersion) + }) + + t.Run("will paginate when max page size bytes is reached", func(t *testing.T) { + ctx := identity.WithServiceIdentityContext(context.Background(), 1) + searchClient := &stubSearchClient{ + resp: &resourcepb.ResourceSearchResponse{ + ResourceVersion: searchServerRv, + Results: &resourcepb.ResourceTable{ + Rows: []*resourcepb.ResourceTableRow{ + { + Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "a"}, + ResourceVersion: 1, + SortFields: []string{"s1"}, + }, + { + Key: &resourcepb.ResourceKey{Namespace: "ns", Group: "g", Resource: "r", Name: "b"}, + ResourceVersion: 2, + SortFields: []string{"s2"}, + }, + }, + }, + }, + } + s := createTestServer(searchClient, 5) + req := &resourcepb.ListRequest{ + Limit: 10, + Options: &resourcepb.ListOptions{ + Key: &resourcepb.ResourceKey{Namespace: "ns"}, + Fields: []*resourcepb.Requirement{{Key: "spec.foo"}}, + }, + } + + resp, err := s.listWithFieldSelectors(ctx, req) + + require.NoError(t, err) + require.NotNil(t, resp) + require.Len(t, resp.Items, 1) + require.Equal(t, searchServerRv, resp.ResourceVersion) + require.NotEmpty(t, resp.NextPageToken) + + parsedToken, err := GetContinueToken(resp.NextPageToken) + require.NoError(t, err) + require.Equal(t, []string{"s1"}, parsedToken.SearchAfter) + require.Equal(t, searchServerRv, parsedToken.ResourceVersion) + }) +} + +func createTestServer(searchClient resourcepb.ResourceIndexClient, maxPageSizeBytes int) *server { + return &server{ + searchClient: searchClient, + backend: &fakeBackend{}, + access: claims.FixedAccessClient(true), + queue: scheduler.NewNoopQueue(), + queueConfig: QueueConfig{Timeout: time.Second, MinBackoff: time.Millisecond, MaxBackoff: time.Millisecond, MaxRetries: 1}, + maxPageSizeBytes: maxPageSizeBytes, + } +} + +type stubSearchClient struct { + resp *resourcepb.ResourceSearchResponse + err error + last *resourcepb.ResourceSearchRequest +} + +func (s *stubSearchClient) Search(_ context.Context, req *resourcepb.ResourceSearchRequest, _ ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) { + s.last = req + return s.resp, s.err +} + +func (*stubSearchClient) GetStats(_ context.Context, _ *resourcepb.ResourceStatsRequest, _ ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) { + return nil, nil +} + +func (*stubSearchClient) RebuildIndexes(_ context.Context, _ *resourcepb.RebuildIndexesRequest, _ ...grpc.CallOption) (*resourcepb.RebuildIndexesResponse, error) { + return nil, nil +} + +type fakeBackend struct { + forbidden map[string]struct{} +} + +func (*fakeBackend) WriteEvent(context.Context, WriteEvent) (int64, error) { return 0, nil } +func (b *fakeBackend) ReadResource(_ context.Context, req *resourcepb.ReadRequest) *BackendReadResponse { + if b != nil && b.forbidden != nil { + if _, ok := b.forbidden[req.Key.Name]; ok { + return &BackendReadResponse{ + Key: req.Key, + Error: &resourcepb.ErrorResult{Code: http.StatusForbidden}, + } + } + } + return &BackendReadResponse{ + Key: req.Key, + ResourceVersion: req.ResourceVersion, + Value: []byte("value"), + } +} +func (*fakeBackend) ListIterator(context.Context, *resourcepb.ListRequest, func(ListIterator) error) (int64, error) { + return 0, nil +} +func (*fakeBackend) ListHistory(context.Context, *resourcepb.ListRequest, func(ListIterator) error) (int64, error) { + return 0, nil +} +func (*fakeBackend) ListModifiedSince(context.Context, NamespacedResource, int64) (int64, iter.Seq2[*ModifiedResource, error]) { + return 0, func(func(*ModifiedResource, error) bool) {} +} +func (*fakeBackend) WatchWriteEvents(context.Context) (<-chan *WrittenEvent, error) { + return nil, nil +} +func (*fakeBackend) GetResourceStats(context.Context, NamespacedResource, int) ([]ResourceStats, error) { + return nil, nil +} +func (*fakeBackend) GetResourceLastImportTimes(context.Context) iter.Seq2[ResourceLastImportTime, error] { + return func(func(ResourceLastImportTime, error) bool) {} +} diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 16737cf5653..8651c3feb13 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -380,6 +380,7 @@ func NewResourceServer(opts ResourceServerOptions) (*server, error) { queueConfig: opts.QOSConfig, overridesService: opts.OverridesService, storageEnabled: true, + searchClient: opts.SearchClient, artificialSuccessfulWriteDelay: opts.Search.IndexMinUpdateInterval, } @@ -432,6 +433,7 @@ type server struct { blob BlobSupport secure secrets.InlineSecureValueSupport search *searchServer + searchClient resourcepb.ResourceIndexClient diagnostics resourcepb.DiagnosticsServer access claims.AccessClient writeHooks WriteAccessHooks @@ -1101,10 +1103,16 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour if req.Limit < 1 { req.Limit = 500 // default max 500 items in a page } - maxPageBytes := s.maxPageSizeBytes pageBytes := 0 rsp := &resourcepb.ListResponse{} + req = filterFieldSelectors(req) + if s.useFieldSelectorSearch(req) { + // If we get here, we're doing list with selectable fields. Let's do search instead, since + // we index all selectable fields, and fetch resulting documents one by one. + return s.listWithFieldSelectors(ctx, req) + } + key := req.Options.Key //nolint:staticcheck // SA1019: Compile is deprecated but BatchCheck is not yet fully implemented checker, _, err := s.access.Compile(ctx, user, claims.ListRequest{ @@ -1157,7 +1165,7 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour pageBytes += len(item.Value) rsp.Items = append(rsp.Items, item) - if (req.Limit > 0 && len(rsp.Items) >= int(req.Limit)) || pageBytes >= maxPageBytes { + if (req.Limit > 0 && len(rsp.Items) >= int(req.Limit)) || pageBytes >= s.maxPageSizeBytes { t := iter.ContinueToken() if iter.Next() { rsp.NextPageToken = t