diff --git a/pkg/storage/unified/migrations/migrator_test.go b/pkg/storage/unified/migrations/migrator_test.go index cfd909a9255..7c085d36746 100644 --- a/pkg/storage/unified/migrations/migrator_test.go +++ b/pkg/storage/unified/migrations/migrator_test.go @@ -267,6 +267,9 @@ func runMigrationTestSuite(t *testing.T, testCases []resourceMigratorTestCase) { t.Logf("Verifying migrations are correctly registered") verifyRegisteredMigrations(t, helper, false, false) + + t.Logf("Verifying key_path is populated in resource_history after bulkimport") + verifyKeyPathPopulated(t, helper) }) } @@ -336,3 +339,39 @@ func verifyResource(t *testing.T, client *apis.K8sResourceClient, uid string, sh require.Error(t, err) } } + +// verifyKeyPathPopulated verifies that all rows in resource_history have a non-empty key_path. +// This is important because bulkimport must populate key_path for indexing/searching to work. +func verifyKeyPathPopulated(t *testing.T, helper *apis.K8sTestHelper) { + t.Helper() + + query := "SELECT COUNT(*) FROM resource_history WHERE key_path = ''" + rows, err := helper.GetEnv().SQLStore.GetEngine().DB().Query(query) + require.NoError(t, err) + defer func() { + require.NoError(t, rows.Close()) + }() + + var emptyKeyPathCount int + require.True(t, rows.Next(), "expected at least one row from COUNT query") + require.NoError(t, rows.Scan(&emptyKeyPathCount)) + require.NoError(t, rows.Err()) + + require.Equal(t, 0, emptyKeyPathCount, "found %d rows in resource_history with empty key_path", emptyKeyPathCount) + + // Also verify that there are actually some rows with key_path populated + queryTotal := "SELECT COUNT(*) FROM resource_history WHERE key_path != ''" + rowsTotal, err := helper.GetEnv().SQLStore.GetEngine().DB().Query(queryTotal) + require.NoError(t, err) + defer func() { + require.NoError(t, rowsTotal.Close()) + }() + + var populatedKeyPathCount int + require.True(t, rowsTotal.Next(), "expected at least one row from COUNT query") + require.NoError(t, rowsTotal.Scan(&populatedKeyPathCount)) + require.NoError(t, rowsTotal.Err()) + + t.Logf("Verified %d rows in resource_history have populated key_path", populatedKeyPathCount) + require.Greater(t, populatedKeyPathCount, 0, "expected at least one row in resource_history with populated key_path") +} diff --git a/pkg/storage/unified/sql/backend_bulk.go b/pkg/storage/unified/sql/backend_bulk.go index dec154b79ec..94b13ff5a9c 100644 --- a/pkg/storage/unified/sql/backend_bulk.go +++ b/pkg/storage/unified/sql/backend_bulk.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/sql/db" "github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl" "github.com/grafana/grafana/pkg/storage/unified/sql/dbutil" + "github.com/grafana/grafana/pkg/storage/unified/sql/rvmanager" "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" ) @@ -105,6 +106,25 @@ func (x *bulkLock) Active() bool { return len(x.running) > 0 } +// buildKeyPath constructs the key_path for a bulk import entry. +// The format matches the key_path used in normal write operations. +func buildKeyPath(key *resourcepb.ResourceKey, rv int64, action resourcepb.BulkRequest_Action, folder string) string { + var actionStr string + switch action { + case resourcepb.BulkRequest_ADDED: + actionStr = "created" + case resourcepb.BulkRequest_MODIFIED: + actionStr = "updated" + case resourcepb.BulkRequest_DELETED: + actionStr = "deleted" + default: + actionStr = fmt.Sprintf("%d", action) + } + snowflakeRV := rvmanager.SnowflakeFromRV(rv) + return fmt.Sprintf("unified/data/%s/%s/%s/%s/%d~%s~%s", + key.Group, key.Resource, key.Namespace, key.Name, snowflakeRV, actionStr, folder) +} + func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse { if b.disableStorageServices { return &resourcepb.BulkResponse{ @@ -256,6 +276,12 @@ func (b *backend) processBulkWithTx(ctx context.Context, tx db.Tx, setting resou continue } + // Compute RV first so we can use it for key_path + resourceVersion := rv.next(obj) + + // Build key_path for indexing/searching + keyPath := buildKeyPath(req.Key, resourceVersion, req.Action, req.Folder) + // Write the event to history if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{ SQLTemplate: sqltemplate.New(b.dialect), @@ -267,7 +293,8 @@ func (b *backend) processBulkWithTx(ctx context.Context, tx db.Tx, setting resou }, Folder: req.Folder, GUID: uuid.New().String(), - ResourceVersion: rv.next(obj), + ResourceVersion: resourceVersion, + KeyPath: keyPath, }); err != nil { return rollbackWithError(fmt.Errorf("insert into resource history: %w", err)) }