fix(unified): populate key_path during bulkimport (#117320)

* fix(unified): populate key_path during bulkimport

Rows in resource_history were being created with empty key_path values
during bulkimport (migration of dashboards/folders). This broke
functionality that depends on key_path for indexing/searching.

The fix adds a buildKeyPath helper function that constructs the key_path
in the same format as normal write operations, and updates processBulkWithTx
to set the KeyPath field when inserting into resource_history.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test(unified): add verification for key_path population after bulkimport

Adds a verifyKeyPathPopulated function to the migration integration tests
that verifies no rows in resource_history have empty key_path values
after bulkimport completes.

Also changes the actionStr fallback from "unknown" to the numeric action
value for better debugging.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Rafael Bortolon Paulovic 2026-02-03 16:51:15 +01:00 committed by GitHub
parent 60309200e9
commit e34341472c
2 changed files with 67 additions and 1 deletion

@@ -267,6 +267,9 @@ func runMigrationTestSuite(t *testing.T, testCases []resourceMigratorTestCase) {
			t.Logf("Verifying migrations are correctly registered")
			verifyRegisteredMigrations(t, helper, false, false)

			t.Logf("Verifying key_path is populated in resource_history after bulkimport")
			verifyKeyPathPopulated(t, helper)
		})
	}

@@ -336,3 +339,39 @@ func verifyResource(t *testing.T, client *apis.K8sResourceClient, uid string, sh
		require.Error(t, err)
	}
}

// verifyKeyPathPopulated verifies that all rows in resource_history have a non-empty key_path.
// This is important because bulkimport must populate key_path for indexing/searching to work.
func verifyKeyPathPopulated(t *testing.T, helper *apis.K8sTestHelper) {
	t.Helper()

	query := "SELECT COUNT(*) FROM resource_history WHERE key_path = ''"
	rows, err := helper.GetEnv().SQLStore.GetEngine().DB().Query(query)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, rows.Close())
	}()

	var emptyKeyPathCount int
	require.True(t, rows.Next(), "expected at least one row from COUNT query")
	require.NoError(t, rows.Scan(&emptyKeyPathCount))
	require.NoError(t, rows.Err())
	require.Equal(t, 0, emptyKeyPathCount, "found %d rows in resource_history with empty key_path", emptyKeyPathCount)

	// Also verify that there are actually some rows with key_path populated
	queryTotal := "SELECT COUNT(*) FROM resource_history WHERE key_path != ''"
	rowsTotal, err := helper.GetEnv().SQLStore.GetEngine().DB().Query(queryTotal)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, rowsTotal.Close())
	}()

	var populatedKeyPathCount int
	require.True(t, rowsTotal.Next(), "expected at least one row from COUNT query")
	require.NoError(t, rowsTotal.Scan(&populatedKeyPathCount))
	require.NoError(t, rowsTotal.Err())

	t.Logf("Verified %d rows in resource_history have populated key_path", populatedKeyPathCount)
	require.Greater(t, populatedKeyPathCount, 0, "expected at least one row in resource_history with populated key_path")
}
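
For reference, the two COUNT queries above could be collapsed into a single pass over resource_history. The sketch below is not part of this commit: it assumes a plain *sql.DB handle and a made-up countKeyPaths helper, and uses COALESCE so both sums stay at 0 even when the table is empty.

	package migration_test // hypothetical package, for illustration only

	import "database/sql"

	// countKeyPaths returns the number of resource_history rows with an empty
	// key_path and the number with a populated key_path using a single query.
	func countKeyPaths(db *sql.DB) (emptyCount, populatedCount int, err error) {
		const q = `
			SELECT
				COALESCE(SUM(CASE WHEN key_path = ''  THEN 1 ELSE 0 END), 0),
				COALESCE(SUM(CASE WHEN key_path <> '' THEN 1 ELSE 0 END), 0)
			FROM resource_history`
		err = db.QueryRow(q).Scan(&emptyCount, &populatedCount)
		return emptyCount, populatedCount, err
	}

The single-query form trades two round trips for one, but the committed version keeps the two failure messages separate, which is arguably clearer in a test.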

@@ -24,6 +24,7 @@ import (
	"github.com/grafana/grafana/pkg/storage/unified/sql/db"
	"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
	"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
	"github.com/grafana/grafana/pkg/storage/unified/sql/rvmanager"
	"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)

@@ -105,6 +106,25 @@ func (x *bulkLock) Active() bool {
	return len(x.running) > 0
}

// buildKeyPath constructs the key_path for a bulk import entry.
// The format matches the key_path used in normal write operations.
func buildKeyPath(key *resourcepb.ResourceKey, rv int64, action resourcepb.BulkRequest_Action, folder string) string {
	var actionStr string
	switch action {
	case resourcepb.BulkRequest_ADDED:
		actionStr = "created"
	case resourcepb.BulkRequest_MODIFIED:
		actionStr = "updated"
	case resourcepb.BulkRequest_DELETED:
		actionStr = "deleted"
	default:
		actionStr = fmt.Sprintf("%d", action)
	}

	snowflakeRV := rvmanager.SnowflakeFromRV(rv)
	return fmt.Sprintf("unified/data/%s/%s/%s/%s/%d~%s~%s",
		key.Group, key.Resource, key.Namespace, key.Name, snowflakeRV, actionStr, folder)
}
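
To make the resulting layout concrete, here is an illustration with made-up inputs; it is not part of the diff. The path segments are group, resource, namespace, and name, followed by the snowflake-encoded resource version, the action string, and the folder. The numeric segment depends on rvmanager.SnowflakeFromRV, so it is shown as a placeholder below.

	// Illustration only; the key and values are hypothetical.
	key := &resourcepb.ResourceKey{
		Group:     "dashboard.grafana.app",
		Resource:  "dashboards",
		Namespace: "default",
		Name:      "my-dashboard",
	}
	p := buildKeyPath(key, 1700000000000001, resourcepb.BulkRequest_ADDED, "general")
	// p == "unified/data/dashboard.grafana.app/dashboards/default/my-dashboard/<snowflake>~created~general"
	// where <snowflake> is whatever rvmanager.SnowflakeFromRV returns for the given resource version.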

func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse {
	if b.disableStorageServices {
		return &resourcepb.BulkResponse{

@@ -256,6 +276,12 @@ func (b *backend) processBulkWithTx(ctx context.Context, tx db.Tx, setting resou
			continue
		}

		// Compute RV first so we can use it for key_path
		resourceVersion := rv.next(obj)

		// Build key_path for indexing/searching
		keyPath := buildKeyPath(req.Key, resourceVersion, req.Action, req.Folder)

		// Write the event to history
		if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
			SQLTemplate: sqltemplate.New(b.dialect),

@@ -267,7 +293,8 @@ func (b *backend) processBulkWithTx(ctx context.Context, tx db.Tx, setting resou
			},
			Folder: req.Folder,
			GUID: uuid.New().String(),
			ResourceVersion: rv.next(obj),
			ResourceVersion: resourceVersion,
			KeyPath: keyPath,
		}); err != nil {
			return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
		}