diff --git a/tsdb/head.go b/tsdb/head.go
index f08453526b..69bc589469 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1846,124 +1846,21 @@ func (h *Head) Tombstones() (tombstones.Reader, error) {
     return h.tombstones, nil
 }

-// updateSharedMetadata updates the head's shared metadata store after a
-// series' kind metadata has been modified. The series lock must be held.
-func (h *Head) updateSharedMetadata(s *memSeries, kind seriesmetadata.KindDescriptor) {
-    if h.seriesMeta == nil {
-        return
-    }
-    v, ok := kind.CollectFromSeries(s)
-    if !ok {
-        return
-    }
-    hash := s.getStableHash()
-
+// updateMetaStripes updates the ref↔hash bidirectional mappings in the
+// metadata stripe tables. These mappings are used by LabelsForHash queries
+// and GC cleanup. The series lock need NOT be held since ref and hash are
+// passed as arguments.
+func (h *Head) updateMetaStripes(ref chunks.HeadSeriesRef, hash uint64) {
     mask := uint64(len(h.metaRefStripes) - 1)
-    refShard := &h.metaRefStripes[uint64(s.ref)&mask]
+    refShard := &h.metaRefStripes[uint64(ref)&mask]
     refShard.Lock()
-    refShard.refToHash[s.ref] = hash
+    refShard.refToHash[ref] = hash
     refShard.Unlock()

     hashShard := &h.metaHashStripes[hash&mask]
     hashShard.Lock()
-    hashShard.hashToRef[hash] = s.ref
+    hashShard.hashToRef[hash] = ref
     hashShard.Unlock()
-
-    // For resource kind: use SetVersionedWithDiff to get old/new in a single
-    // lock acquisition (avoids 3 separate MemStore locks: get, set, get).
-    if kind.ID() == seriesmetadata.KindResource {
-        vr := v.(*seriesmetadata.VersionedResource)
-        oldVR, newVR := h.seriesMeta.ResourceStore().SetVersionedWithDiff(hash, vr)
-        h.seriesMeta.UpdateResourceAttrIndex(hash, oldVR, newVR)
-        return
-    }
-
-    // Other kinds: standard path.
-    store := h.seriesMeta.StoreForKind(kind.ID())
-    kind.SetVersioned(store, hash, v)
-}
-
-// updateSharedResourceMetadata is the type-safe hot-path version of
-// updateSharedMetadata for resource kind, avoiding interface{} boxing.
-func (h *Head) updateSharedResourceMetadata(s *memSeries) {
-    if h.seriesMeta == nil {
-        return
-    }
-    vr, ok := seriesmetadata.CollectResourceDirect(s)
-    if !ok {
-        return
-    }
-    hash := s.getStableHash()
-
-    mask := uint64(len(h.metaRefStripes) - 1)
-    refShard := &h.metaRefStripes[uint64(s.ref)&mask]
-    refShard.Lock()
-    refShard.refToHash[s.ref] = hash
-    refShard.Unlock()
-
-    hashShard := &h.metaHashStripes[hash&mask]
-    hashShard.Lock()
-    hashShard.hashToRef[hash] = s.ref
-    hashShard.Unlock()
-
-    oldVR, newVR := h.seriesMeta.ResourceStore().SetVersionedWithDiff(hash, vr)
-    h.seriesMeta.UpdateResourceAttrIndex(hash, oldVR, newVR)
-}
-
-// updateSharedScopeMetadata is the type-safe hot-path version of
-// updateSharedMetadata for scope kind, avoiding interface{} boxing.
-func (h *Head) updateSharedScopeMetadata(s *memSeries) {
-    if h.seriesMeta == nil {
-        return
-    }
-    vs, ok := seriesmetadata.CollectScopeDirect(s)
-    if !ok {
-        return
-    }
-    hash := s.getStableHash()
-
-    mask := uint64(len(h.metaRefStripes) - 1)
-    refShard := &h.metaRefStripes[uint64(s.ref)&mask]
-    refShard.Lock()
-    refShard.refToHash[s.ref] = hash
-    refShard.Unlock()
-
-    hashShard := &h.metaHashStripes[hash&mask]
-    hashShard.Lock()
-    hashShard.hashToRef[hash] = s.ref
-    hashShard.Unlock()
-
-    h.seriesMeta.ScopeStore().SetVersioned(hash, vs)
-}
-
-// internSeriesResource replaces the deep-copied maps/slices on the latest per-series
-// ResourceVersion with thin copies sharing canonical pointers from the MemStore
-// content table.
-// This deduplicates memory when many series share the same resource.
-func (h *Head) internSeriesResource(s *memSeries) {
-    if h.seriesMeta == nil {
-        return
-    }
-    vr, ok := seriesmetadata.CollectResourceDirect(s)
-    if !ok || len(vr.Versions) == 0 {
-        return
-    }
-    last := len(vr.Versions) - 1
-    vr.Versions[last] = h.seriesMeta.ResourceStore().InternVersion(vr.Versions[last])
-}
-
-// internSeriesScope replaces the deep-copied maps/slices on the latest per-series
-// ScopeVersion with thin copies sharing canonical pointers from the MemStore
-// content table.
-func (h *Head) internSeriesScope(s *memSeries) {
-    if h.seriesMeta == nil {
-        return
-    }
-    vs, ok := seriesmetadata.CollectScopeDirect(s)
-    if !ok || len(vs.Versions) == 0 {
-        return
-    }
-    last := len(vs.Versions) - 1
-    vs.Versions[last] = h.seriesMeta.ScopeStore().InternVersion(vs.Versions[last])
 }

 // cleanupSharedMetadata removes metadata for deleted series from the shared store.
@@ -2791,23 +2688,6 @@ func (s sample) Copy() chunks.Sample {
     return c
 }

-// kindMetaEntry stores a single kind's metadata on a memSeries.
-type kindMetaEntry struct {
-    kind seriesmetadata.KindID
-    data any // *Versioned[V] for the appropriate V
-}
-
-// nativeMeta holds OTel native metadata state for a memSeries.
-// Allocated lazily on first SetKindMeta call; nil when native metadata
-// is not in use, saving 24 bytes per series (slice header + stableHash)
-// compared to storing the fields inline on memSeries.
-type nativeMeta struct {
-    // stableHash caches labels.StableHash(lset). Computed lazily on first
-    // metadata commit (lset is immutable so the hash never changes).
-    stableHash uint64
-    kindMeta   []kindMetaEntry
-}
-
 // memSeries is the in-memory representation of a series. None of its methods
 // are goroutine safe and it is the caller's responsibility to lock it.
 type memSeries struct {
@@ -2823,11 +2703,6 @@ type memSeries struct {

     meta *metadata.Metadata

-    // nativeMeta holds OTel native metadata (stableHash cache + per-kind versioned data).
-    // nil when native metadata is not in use, saving 24 bytes per series.
-    // Allocated lazily on first SetKindMeta call.
-    nativeMeta *nativeMeta
-
     lset labels.Labels // Locking required with -tags dedupelabels, not otherwise.

     // Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
@@ -2892,41 +2767,6 @@ func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64,
     return s
 }

-// GetKindMeta returns the metadata for a kind, or nil/false if not set.
-func (s *memSeries) GetKindMeta(id seriesmetadata.KindID) (any, bool) {
-    if s.nativeMeta == nil {
-        return nil, false
-    }
-    for _, e := range s.nativeMeta.kindMeta {
-        if e.kind == id {
-            return e.data, true
-        }
-    }
-    return nil, false
-}
-
-// SetKindMeta sets the metadata for a kind.
-func (s *memSeries) SetKindMeta(id seriesmetadata.KindID, v any) {
-    if s.nativeMeta == nil {
-        s.nativeMeta = &nativeMeta{}
-    }
-    for i, e := range s.nativeMeta.kindMeta {
-        if e.kind == id {
-            s.nativeMeta.kindMeta[i].data = v
-            return
-        }
-    }
-    s.nativeMeta.kindMeta = append(s.nativeMeta.kindMeta, kindMetaEntry{kind: id, data: v})
-}
-
-// getStableHash returns the cached labels.StableHash, computing it on first call.
-// Caller must hold s.Lock() and s.nativeMeta must be non-nil (guaranteed after SetKindMeta).
-func (s *memSeries) getStableHash() uint64 {
-    if s.nativeMeta.stableHash == 0 {
-        s.nativeMeta.stableHash = labels.StableHash(s.lset)
-    }
-    return s.nativeMeta.stableHash
-}

 func (s *memSeries) minTime() int64 {
     if len(s.mmappedChunks) > 0 {
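A note on the stripe tables that updateMetaStripes writes: they follow the standard lock-striping pattern, where a power-of-two number of shards each guard their own map and a key is routed to its shard by masking. A minimal self-contained sketch of the pattern, with illustrative names only (stripe, table); the actual fields in this patch are Head.metaRefStripes and Head.metaHashStripes:

package stripes

import "sync"

// stripe guards one shard of the map.
type stripe struct {
    sync.Mutex
    m map[uint64]uint64
}

// table shards a uint64-to-uint64 map across n stripes so that writers
// touching different keys rarely contend on the same lock.
type table struct{ stripes []stripe }

// newTable allocates n stripes; n must be a power of two so that
// key&(n-1) is equivalent to key%n, but cheaper.
func newTable(n int) *table {
    t := &table{stripes: make([]stripe, n)}
    for i := range t.stripes {
        t.stripes[i].m = make(map[uint64]uint64)
    }
    return t
}

// set stores k=v under the lock of the stripe that k maps into.
func (t *table) set(k, v uint64) {
    s := &t.stripes[k&uint64(len(t.stripes)-1)]
    s.Lock()
    s.m[k] = v
    s.Unlock()
}

Because updateMetaStripes receives ref and hash as plain arguments, callers can release the series lock before the stripe locks are taken, keeping the two lock domains disjoint.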
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index 95af1f6c9a..050586e0e1 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -18,7 +18,6 @@ import (
     "errors"
     "fmt"
     "log/slog"
-    "maps"
     "math"
     "time"

@@ -1834,24 +1833,6 @@ func commitMetadata(b *appendBatch) {
     }
 }

-// commitResources commits the resource updates for each series in the provided batch.
-func (a *headAppenderBase) commitResources(b *appendBatch) {
-    for i, r := range b.resources {
-        s := b.resourceSeries[i]
-        s.Lock()
-        seriesmetadata.CommitResourceDirect(s, seriesmetadata.ResourceCommitData{
-            Identifying: r.Identifying,
-            Descriptive: r.Descriptive,
-            Entities:    refResourceEntitiesToCommitData(r.Entities),
-            MinTime:     r.MinTime,
-            MaxTime:     r.MaxTime,
-        })
-        a.head.updateSharedResourceMetadata(s)
-        a.head.internSeriesResource(s)
-        s.Unlock()
-    }
-}
-
 // refResourceEntitiesToCommitData converts WAL record entities to ResourceEntityData.
 func refResourceEntitiesToCommitData(entities []record.RefResourceEntity) []seriesmetadata.ResourceEntityData {
     result := make([]seriesmetadata.ResourceEntityData, len(entities))
@@ -1865,80 +1846,39 @@ func refResourceEntitiesToCommitData(entities []record.RefResourceEntity) []seri
     return result
 }

-// commitScopes commits the scope updates for each series in the provided batch.
-func (a *headAppenderBase) commitScopes(b *appendBatch) {
-    for i, sc := range b.scopes {
-        s := b.scopeSeries[i]
-        s.Lock()
-        seriesmetadata.CommitScopeDirect(s, seriesmetadata.ScopeCommitData{
-            Name:      sc.Name,
-            Version:   sc.Version,
-            SchemaURL: sc.SchemaURL,
-            Attrs:     sc.Attrs,
-            MinTime:   sc.MinTime,
-            MaxTime:   sc.MaxTime,
-        })
-        a.head.updateSharedScopeMetadata(s)
-        a.head.internSeriesScope(s)
-        s.Unlock()
+// commitAndFilterResources commits resource data directly to the shared MemStore
+// (bypassing per-series storage) and filters out entries where the content was
+// unchanged — those need no WAL write. Returns the number of entries filtered out.
+func (a *headAppenderBase) commitAndFilterResources(b *appendBatch) int {
+    if a.head.seriesMeta == nil {
+        return 0
     }
-}
-
-// resourceContentUnchanged checks whether the incoming WAL resource record
-// has the same content as the current stored ResourceVersion, ignoring time range.
-func resourceContentUnchanged(cur *seriesmetadata.ResourceVersion, r record.RefResource) bool {
-    if !maps.Equal(cur.Identifying, r.Identifying) ||
-        !maps.Equal(cur.Descriptive, r.Descriptive) ||
-        len(cur.Entities) != len(r.Entities) {
-        return false
-    }
-    // Stored entities are sorted by Type; incoming may not be — pairwise match.
-    for _, ce := range cur.Entities {
-        found := false
-        for _, ie := range r.Entities {
-            if ce.Type == ie.Type && maps.Equal(ce.ID, ie.ID) && maps.Equal(ce.Description, ie.Description) {
-                found = true
-                break
-            }
-        }
-        if !found {
-            return false
-        }
-    }
-    return true
-}
-
-// scopeContentUnchanged checks whether the incoming WAL scope record
-// has the same content as the current stored ScopeVersion, ignoring time range.
-func scopeContentUnchanged(cur *seriesmetadata.ScopeVersion, sc record.RefScope) bool {
-    return cur.Name == sc.Name && cur.Version == sc.Version &&
-        cur.SchemaURL == sc.SchemaURL && maps.Equal(cur.Attrs, sc.Attrs)
-}
-
-// filterUnchangedResources removes entries from b.resources where the content
-// is identical to what's already stored on the series. For unchanged entries,
-// the time range is extended in-place and the shared metadata store is updated.
-// Returns the number of entries filtered out.
-func (a *headAppenderBase) filterUnchangedResources(b *appendBatch) int {
+    store := a.head.seriesMeta.ResourceStore()
     n := 0
     for i, r := range b.resources {
         s := b.resourceSeries[i]
         s.Lock()
-        changed := true
-        if vr, ok := seriesmetadata.CollectResourceDirect(s); ok && len(vr.Versions) > 0 {
-            cur := vr.Versions[len(vr.Versions)-1]
-            if resourceContentUnchanged(cur, r) {
-                cur.UpdateTimeRange(r.MinTime, r.MaxTime)
-                a.head.updateSharedResourceMetadata(s)
-                changed = false
-            }
-        }
+        hash := labels.StableHash(s.lset)
+        ref := s.ref
         s.Unlock()
-        if changed {
-            b.resources[n] = b.resources[i]
-            b.resourceSeries[n] = b.resourceSeries[i]
-            n++
+
+        oldVR, newVR := seriesmetadata.CommitResourceToStore(store, hash, seriesmetadata.ResourceCommitData{
+            Identifying: r.Identifying,
+            Descriptive: r.Descriptive,
+            Entities:    refResourceEntitiesToCommitData(r.Entities),
+            MinTime:     r.MinTime,
+            MaxTime:     r.MaxTime,
+        })
+        a.head.updateMetaStripes(ref, hash)
+        a.head.seriesMeta.UpdateResourceAttrIndex(hash, oldVR, newVR)
+
+        // If version count unchanged, content was identical — skip WAL write.
+        if oldVR != nil && len(oldVR.Versions) == len(newVR.Versions) {
+            continue
         }
+        b.resources[n] = b.resources[i]
+        b.resourceSeries[n] = b.resourceSeries[i]
+        n++
     }
     filtered := len(b.resources) - n
     b.resources = b.resources[:n]
@@ -1946,30 +1886,39 @@ func (a *headAppenderBase) filterUnchangedResources(b *appendBatch) int {
     return filtered
 }

-// filterUnchangedScopes removes entries from b.scopes where the content
-// is identical to what's already stored on the series. For unchanged entries,
-// the time range is extended in-place and the shared metadata store is updated.
-// Returns the number of entries filtered out.
-func (a *headAppenderBase) filterUnchangedScopes(b *appendBatch) int {
+// commitAndFilterScopes commits scope data directly to the shared MemStore
+// (bypassing per-series storage) and filters out entries where the content was
+// unchanged. Returns the number of entries filtered out.
+func (a *headAppenderBase) commitAndFilterScopes(b *appendBatch) int {
+    if a.head.seriesMeta == nil {
+        return 0
+    }
+    store := a.head.seriesMeta.ScopeStore()
     n := 0
     for i, sc := range b.scopes {
         s := b.scopeSeries[i]
         s.Lock()
-        changed := true
-        if vs, ok := seriesmetadata.CollectScopeDirect(s); ok && len(vs.Versions) > 0 {
-            cur := vs.Versions[len(vs.Versions)-1]
-            if scopeContentUnchanged(cur, sc) {
-                cur.UpdateTimeRange(sc.MinTime, sc.MaxTime)
-                a.head.updateSharedScopeMetadata(s)
-                changed = false
-            }
-        }
+        hash := labels.StableHash(s.lset)
+        ref := s.ref
         s.Unlock()
-        if changed {
-            b.scopes[n] = b.scopes[i]
-            b.scopeSeries[n] = b.scopeSeries[i]
-            n++
+
+        oldVS, newVS := seriesmetadata.CommitScopeToStore(store, hash, seriesmetadata.ScopeCommitData{
+            Name:      sc.Name,
+            Version:   sc.Version,
+            SchemaURL: sc.SchemaURL,
+            Attrs:     sc.Attrs,
+            MinTime:   sc.MinTime,
+            MaxTime:   sc.MaxTime,
+        })
+        a.head.updateMetaStripes(ref, hash)
+
+        // If version count unchanged, content was identical — skip WAL write.
+        if oldVS != nil && len(oldVS.Versions) == len(newVS.Versions) {
+            continue
         }
+        b.scopes[n] = b.scopes[i]
+        b.scopeSeries[n] = b.scopeSeries[i]
+        n++
     }
     filtered := len(b.scopes) - n
     b.scopes = b.scopes[:n]
@@ -2007,16 +1956,17 @@ func (a *headAppenderBase) Commit() (err error) {
         a.closed = true
     }()

-    // Count total resource/scope updates before filtering, then filter
-    // unchanged entries to avoid unnecessary WAL writes. Unchanged entries
-    // have their time range extended in-place under the series lock.
+    // Count total resource/scope updates before commit+filter, then commit
+    // directly to the shared MemStore and filter out unchanged entries to
+    // avoid unnecessary WAL writes. Committing to MemStore before WAL is
+    // safe: MemStore is rebuilt from WAL on crash.
     var resourcesTotal, scopesTotal int
     var resourcesFiltered, scopesFiltered int
     for _, b := range a.batches {
         resourcesTotal += len(b.resources)
         scopesTotal += len(b.scopes)
-        resourcesFiltered += a.filterUnchangedResources(b)
-        scopesFiltered += a.filterUnchangedScopes(b)
+        resourcesFiltered += a.commitAndFilterResources(b)
+        scopesFiltered += a.commitAndFilterScopes(b)
     }

     if err := a.log(); err != nil {
@@ -2068,8 +2018,6 @@ func (a *headAppenderBase) Commit() (err error) {
         a.commitHistograms(b, acc)
         a.commitFloatHistograms(b, acc)
         commitMetadata(b)
-        a.commitResources(b)
-        a.commitScopes(b)
     }
     // Unmark all series as pending commit after all samples have been committed.
     a.unmarkCreatedSeriesAsPendingCommit()
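Both commitAndFilterResources and commitAndFilterScopes above compact their batch slice in place: unchanged entries are skipped, changed ones are copied toward the front, and the slice is truncated once at the end. The same idiom as a standalone generic sketch (filterChanged is a hypothetical helper, not part of this patch):

// filterChanged keeps only entries for which changed reports true, reusing
// the backing array, and returns how many entries were dropped.
func filterChanged[E any](entries []E, changed func(E) bool) ([]E, int) {
    n := 0
    for i := range entries {
        if !changed(entries[i]) {
            continue // unchanged: no WAL write needed, drop it
        }
        entries[n] = entries[i]
        n++
    }
    return entries[:n], len(entries) - n
}

Filtering is safe for content durability because an identical earlier record is already in the WAL; only the time-range extension of an unchanged entry is at risk across a crash, a trade-off the design appears to accept.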
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 2c03a55cec..4007b71be1 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -7892,6 +7892,7 @@ func TestWALReplayRaceWithStaleSeriesCompaction(t *testing.T) {
 func TestResourceAndScopeWALReplay(t *testing.T) {
     dir := t.TempDir()
     opts := newTestHeadDefaultOptions(1000, false)
+    opts.EnableNativeMetadata = true
     opts.ChunkDirRoot = dir

     wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
@@ -7923,16 +7924,12 @@ func TestResourceAndScopeWALReplay(t *testing.T) {
     require.NoError(t, appErr)
     require.NoError(t, app.Commit())

-    // Verify the series has resource set.
-    s := head.series.getByID(chunks.HeadSeriesRef(ref))
-    require.NotNil(t, s)
-    s.Lock()
-    res, resOK := s.GetKindMeta(seriesmetadata.KindResource)
-    require.True(t, resOK)
-    vr := res.(*seriesmetadata.VersionedResource)
+    // Verify resource in shared MemStore.
+    hash := labels.StableHash(lset)
+    vr, vrOK := head.seriesMeta.ResourceStore().GetVersioned(hash)
+    require.True(t, vrOK)
     require.Len(t, vr.Versions, 1)
     require.Equal(t, "frontend", vr.Versions[0].Identifying["service.name"])
-    s.Unlock()

     // Close the head and replay WAL into a new head using the same dir.
     require.NoError(t, head.Close())
@@ -7945,21 +7942,18 @@ func TestResourceAndScopeWALReplay(t *testing.T) {
     t.Cleanup(func() { _ = head2.Close() })
     require.NoError(t, head2.Init(0))

-    // The replayed head should have the resource.
-    s2 := head2.series.getByID(chunks.HeadSeriesRef(ref))
-    require.NotNil(t, s2)
-    s2.Lock()
-    res2, res2OK := s2.GetKindMeta(seriesmetadata.KindResource)
-    require.True(t, res2OK, "resource should survive WAL replay")
-    vr2 := res2.(*seriesmetadata.VersionedResource)
+    // The replayed head should have the resource in its shared store.
+    vr2, vr2OK := head2.seriesMeta.ResourceStore().GetVersioned(hash)
+    require.True(t, vr2OK, "resource should survive WAL replay")
     require.Len(t, vr2.Versions, 1)
     require.Equal(t, "frontend", vr2.Versions[0].Identifying["service.name"])
     require.Equal(t, "node-1", vr2.Versions[0].Descriptive["host.name"])
-    s2.Unlock()
 }

 func TestResourceAndScopeRollback(t *testing.T) {
-    head, _ := newTestHead(t, 1000, compression.None, false)
+    opts := newTestHeadDefaultOptions(1000, false)
+    opts.EnableNativeMetadata = true
+    head, _ := newTestHeadWithOptions(t, compression.None, opts)
     require.NoError(t, head.Init(0))

     ctx := context.Background()
@@ -7984,17 +7978,16 @@ func TestResourceAndScopeRollback(t *testing.T) {
     require.NoError(t, err)
     require.NoError(t, app.Rollback())

-    // The series should NOT have a resource after rollback.
-    s := head.series.getByID(chunks.HeadSeriesRef(ref))
-    require.NotNil(t, s)
-    s.Lock()
-    _, resOK := s.GetKindMeta(seriesmetadata.KindResource)
+    // The shared store should NOT have a resource after rollback.
+    hash := labels.StableHash(lset)
+    _, resOK := head.seriesMeta.ResourceStore().GetVersioned(hash)
     require.False(t, resOK, "resource must not be set after rollback")
-    s.Unlock()
 }

 func TestResourceDedupInV1Appender(t *testing.T) {
-    head, _ := newTestHead(t, 1000, compression.None, false)
+    opts := newTestHeadDefaultOptions(1000, false)
+    opts.EnableNativeMetadata = true
+    head, _ := newTestHeadWithOptions(t, compression.None, opts)
     require.NoError(t, head.Init(0))

     ctx := context.Background()
@@ -8027,19 +8020,17 @@ func TestResourceDedupInV1Appender(t *testing.T) {
     require.NoError(t, app.Commit())

     // Only the first resource update should have been applied.
-    s := head.series.getByID(chunks.HeadSeriesRef(ref))
-    require.NotNil(t, s)
-    s.Lock()
-    res, resOK := s.GetKindMeta(seriesmetadata.KindResource)
-    require.True(t, resOK)
-    vr := res.(*seriesmetadata.VersionedResource)
+    hash := labels.StableHash(lset)
+    vr, vrOK := head.seriesMeta.ResourceStore().GetVersioned(hash)
+    require.True(t, vrOK)
     require.Len(t, vr.Versions, 1)
     require.Equal(t, "v1", vr.Versions[0].Identifying["service.name"])
-    s.Unlock()
 }

 func TestResourceAndScopeWALReplayWithScope(t *testing.T) {
-    head, w := newTestHead(t, 1000, compression.None, false)
+    opts := newTestHeadDefaultOptions(1000, false)
+    opts.EnableNativeMetadata = true
+    head, w := newTestHeadWithOptions(t, compression.None, opts)

     // Manually populate the WAL with series + resource + scope records.
     populateTestWL(t, w, []any{
         []record.RefSeries{
@@ -8072,23 +8063,22 @@ func TestResourceAndScopeWALReplayWithScope(t *testing.T) {

     require.NoError(t, head.Init(0))

+    // Compute the labels hash for series ref=1.
     s := head.series.getByID(1)
     require.NotNil(t, s)
     s.Lock()
-    defer s.Unlock()
+    hash := labels.StableHash(s.lset)
+    s.Unlock()

-    // Resource should be replayed.
-    res, resOK := s.GetKindMeta(seriesmetadata.KindResource)
-    require.True(t, resOK)
-    vr := res.(*seriesmetadata.VersionedResource)
+    // Resource should be replayed in shared store.
+    vr, vrOK := head.seriesMeta.ResourceStore().GetVersioned(hash)
+    require.True(t, vrOK)
     require.Len(t, vr.Versions, 1)
     require.Equal(t, "frontend", vr.Versions[0].Identifying["service.name"])

-    // Scope should be replayed.
-    sc, scOK := s.GetKindMeta(seriesmetadata.KindScope)
-    require.True(t, scOK)
-    vs := sc.(*seriesmetadata.VersionedScope)
+    // Scope should be replayed in shared store.
+    vs, vsOK := head.seriesMeta.ScopeStore().GetVersioned(hash)
+    require.True(t, vsOK)
     require.Len(t, vs.Versions, 1)
     require.Equal(t, "go.opentelemetry.io/instrumentation", vs.Versions[0].Name)
     require.Equal(t, "0.42.0", vs.Versions[0].Version)
@@ -8101,6 +8091,7 @@
 func TestResourceAndScopeWALFilterUnchanged(t *testing.T) {
     dir := t.TempDir()
     opts := newTestHeadDefaultOptions(1000, false)
+    opts.EnableNativeMetadata = true
     opts.ChunkDirRoot = dir

     wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
@@ -8126,15 +8117,11 @@ func TestResourceAndScopeWALFilterUnchanged(t *testing.T) {
     require.NoError(t, appErr)
     require.NoError(t, app.Commit())

-    // Verify resource exists.
-    s := head.series.getByID(chunks.HeadSeriesRef(ref))
-    require.NotNil(t, s)
-    s.Lock()
-    res, resOK := s.GetKindMeta(seriesmetadata.KindResource)
-    require.True(t, resOK)
-    vr := res.(*seriesmetadata.VersionedResource)
+    // Verify resource exists in shared store.
+    hash := labels.StableHash(lset)
+    vr, vrOK := head.seriesMeta.ResourceStore().GetVersioned(hash)
+    require.True(t, vrOK)
     require.Len(t, vr.Versions, 1)
-    s.Unlock()

     // No WAL filtering yet on first commit (nothing stored before).
     require.Equal(t, 0.0, prom_testutil.ToFloat64(head.metrics.resourceUpdatesWALFiltered))
@@ -8158,13 +8145,10 @@
     require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.resourceUpdatesCommitted))

     // Time range should have been extended in-place.
-    s.Lock()
-    res, _ = s.GetKindMeta(seriesmetadata.KindResource)
-    vr = res.(*seriesmetadata.VersionedResource)
+    vr, _ = head.seriesMeta.ResourceStore().GetVersioned(hash)
     require.Len(t, vr.Versions, 1)
     require.Equal(t, int64(100), vr.Versions[0].MinTime)
     require.Equal(t, int64(200), vr.Versions[0].MaxTime)
-    s.Unlock()

     // 3. Close head, replay WAL into new head. The filtered entry should
     // not be in the WAL, but the resource should still exist from the first record.
@@ -8178,16 +8162,11 @@
     t.Cleanup(func() { _ = head2.Close() })
     require.NoError(t, head2.Init(0))

-    s2 := head2.series.getByID(chunks.HeadSeriesRef(ref))
-    require.NotNil(t, s2)
-    s2.Lock()
-    res2, res2OK := s2.GetKindMeta(seriesmetadata.KindResource)
-    require.True(t, res2OK)
-    vr2 := res2.(*seriesmetadata.VersionedResource)
+    vr2, vr2OK := head2.seriesMeta.ResourceStore().GetVersioned(hash)
+    require.True(t, vr2OK)
     require.Len(t, vr2.Versions, 1)
     require.Equal(t, "frontend", vr2.Versions[0].Identifying["service.name"])
     require.Equal(t, "node-1", vr2.Versions[0].Descriptive["host.name"])
-    s2.Unlock()

     // 4. Append sample + DIFFERENT resource → Commit.
     app = head2.Appender(ctx)
@@ -8205,12 +8184,9 @@
     // Changed content should NOT be filtered.
     require.Equal(t, 0.0, prom_testutil.ToFloat64(head2.metrics.resourceUpdatesWALFiltered))

-    // Two resource versions should exist.
-    s2.Lock()
-    res2, _ = s2.GetKindMeta(seriesmetadata.KindResource)
-    vr2 = res2.(*seriesmetadata.VersionedResource)
+    // Two resource versions should exist in shared store.
+    vr2, _ = head2.seriesMeta.ResourceStore().GetVersioned(hash)
     require.Len(t, vr2.Versions, 2)
     require.Equal(t, "frontend", vr2.Versions[0].Identifying["service.name"])
     require.Equal(t, "backend", vr2.Versions[1].Identifying["service.name"])
-    s2.Unlock()
 }
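Every test above switches from reading metadata off the memSeries to a lookup in the shared store, keyed by the stable hash of the label set rather than by series ref. Factored into a helper, the lookup pattern would look roughly like this (resourceForSeries is hypothetical, written only to make the keying explicit):

// resourceForSeries fetches the shared resource metadata for a label set.
// The store is keyed by labels.StableHash, not by series ref, so identical
// label sets behind different refs share one entry.
func resourceForSeries(h *Head, lset labels.Labels) (*seriesmetadata.VersionedResource, bool) {
    if h.seriesMeta == nil {
        return nil, false // native metadata disabled
    }
    return h.seriesMeta.ResourceStore().GetVersioned(labels.StableHash(lset))
}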
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index 783ce3c9d9..747ec7a71c 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -489,54 +489,63 @@ Outer:
             clear(v) // Zero out to avoid retaining metadata strings.
             h.wlReplayMetadataPool.Put(v[:0])
         case []record.RefResource:
-            resKind, _ := seriesmetadata.KindByID(seriesmetadata.KindResource)
-            for _, r := range v {
-                if ref, ok := multiRef[r.Ref]; ok {
-                    r.Ref = ref
+            if h.seriesMeta != nil {
+                store := h.seriesMeta.ResourceStore()
+                for _, r := range v {
+                    if ref, ok := multiRef[r.Ref]; ok {
+                        r.Ref = ref
+                    }
+                    s := h.series.getByID(r.Ref)
+                    if s == nil {
+                        unknownResourceRefs.Inc()
+                        missingSeries[r.Ref] = struct{}{}
+                        continue
+                    }
+                    s.Lock()
+                    hash := labels.StableHash(s.lset)
+                    ref := s.ref
+                    s.Unlock()
+
+                    oldVR, newVR := seriesmetadata.CommitResourceToStore(store, hash, seriesmetadata.ResourceCommitData{
+                        Identifying: r.Identifying,
+                        Descriptive: r.Descriptive,
+                        Entities:    refResourceEntitiesToCommitData(r.Entities),
+                        MinTime:     r.MinTime,
+                        MaxTime:     r.MaxTime,
+                    })
+                    h.updateMetaStripes(ref, hash)
+                    h.seriesMeta.UpdateResourceAttrIndex(hash, oldVR, newVR)
                 }
-                s := h.series.getByID(r.Ref)
-                if s == nil {
-                    unknownResourceRefs.Inc()
-                    missingSeries[r.Ref] = struct{}{}
-                    continue
-                }
-                s.Lock()
-                resKind.CommitToSeries(s, seriesmetadata.ResourceCommitData{
-                    Identifying: r.Identifying,
-                    Descriptive: r.Descriptive,
-                    Entities:    refResourceEntitiesToCommitData(r.Entities),
-                    MinTime:     r.MinTime,
-                    MaxTime:     r.MaxTime,
-                })
-                h.updateSharedMetadata(s, resKind)
-                h.internSeriesResource(s)
-                s.Unlock()
             }
             h.wlReplayResourcesPool.Put(v)
         case []record.RefScope:
-            scopeKind, _ := seriesmetadata.KindByID(seriesmetadata.KindScope)
-            for _, sc := range v {
-                if ref, ok := multiRef[sc.Ref]; ok {
-                    sc.Ref = ref
+            if h.seriesMeta != nil {
+                store := h.seriesMeta.ScopeStore()
+                for _, sc := range v {
+                    if ref, ok := multiRef[sc.Ref]; ok {
+                        sc.Ref = ref
+                    }
+                    s := h.series.getByID(sc.Ref)
+                    if s == nil {
+                        unknownScopeRefs.Inc()
+                        missingSeries[sc.Ref] = struct{}{}
+                        continue
+                    }
+                    s.Lock()
+                    hash := labels.StableHash(s.lset)
+                    ref := s.ref
+                    s.Unlock()
+
+                    seriesmetadata.CommitScopeToStore(store, hash, seriesmetadata.ScopeCommitData{
+                        Name:      sc.Name,
+                        Version:   sc.Version,
+                        SchemaURL: sc.SchemaURL,
+                        Attrs:     sc.Attrs,
+                        MinTime:   sc.MinTime,
+                        MaxTime:   sc.MaxTime,
+                    })
+                    h.updateMetaStripes(ref, hash)
                 }
-                s := h.series.getByID(sc.Ref)
-                if s == nil {
-                    unknownScopeRefs.Inc()
-                    missingSeries[sc.Ref] = struct{}{}
-                    continue
-                }
-                s.Lock()
-                scopeKind.CommitToSeries(s, seriesmetadata.ScopeCommitData{
-                    Name:      sc.Name,
-                    Version:   sc.Version,
-                    SchemaURL: sc.SchemaURL,
-                    Attrs:     sc.Attrs,
-                    MinTime:   sc.MinTime,
-                    MaxTime:   sc.MaxTime,
-                })
-                h.updateSharedMetadata(s, scopeKind)
-                h.internSeriesScope(s)
-                s.Unlock()
             }
             h.wlReplayScopesPool.Put(v)
         default:
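Both replay branches repeat the same sequence: hold the series lock just long enough to read the label hash and the ref, then commit to the store without the series lock held. Condensed into a hypothetical helper (snapshotSeriesKey does not exist in this patch):

// snapshotSeriesKey copies the two values the metadata path needs from a
// series while briefly holding its lock; everything afterwards (store
// commit, stripe update) runs without touching the series lock.
func snapshotSeriesKey(s *memSeries) (ref chunks.HeadSeriesRef, hash uint64) {
    s.Lock()
    defer s.Unlock()
    return s.ref, labels.StableHash(s.lset)
}

Note that replay now recomputes labels.StableHash per record instead of using the removed per-series cache; off the hot path that trade appears deliberate.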
diff --git a/tsdb/seriesmetadata/resource_kind.go b/tsdb/seriesmetadata/resource_kind.go
index 28cbf47a3a..ae870295c9 100644
--- a/tsdb/seriesmetadata/resource_kind.go
+++ b/tsdb/seriesmetadata/resource_kind.go
@@ -175,6 +175,38 @@ func CommitResourceDirect(accessor kindMetaAccessor, rcd ResourceCommitData) {
     }
 }

+// CommitResourceToStore builds a ResourceVersion from ResourceCommitData and
+// commits it directly to the MemStore via SetVersionedWithDiff, bypassing
+// per-series storage entirely. Returns the old and new versioned state for
+// index updates. When old and new have the same number of versions, the
+// content was unchanged (only a time range extension) and no WAL write is needed.
+func CommitResourceToStore(store *MemStore[*ResourceVersion], labelsHash uint64, rcd ResourceCommitData) (old, cur *VersionedResource) {
+    entities := make([]*Entity, len(rcd.Entities))
+    for j, e := range rcd.Entities {
+        entityType := e.Type
+        if entityType == "" {
+            entityType = EntityTypeResource
+        }
+        entities[j] = &Entity{
+            Type:        entityType,
+            ID:          maps.Clone(e.ID),
+            Description: maps.Clone(e.Description),
+        }
+    }
+    slices.SortFunc(entities, func(a, b *Entity) int {
+        return cmp.Compare(a.Type, b.Type)
+    })
+
+    rv := &ResourceVersion{
+        Identifying: maps.Clone(rcd.Identifying),
+        Descriptive: maps.Clone(rcd.Descriptive),
+        Entities:    entities,
+        MinTime:     rcd.MinTime,
+        MaxTime:     rcd.MaxTime,
+    }
+    return store.SetVersionedWithDiff(labelsHash, &Versioned[*ResourceVersion]{Versions: []*ResourceVersion{rv}})
+}
+
 // CollectResourceDirect is the hot-path equivalent of CollectFromSeries
 // for resources, avoiding interface{} boxing on the return path.
 func CollectResourceDirect(accessor kindMetaAccessor) (*VersionedResource, bool) {
diff --git a/tsdb/seriesmetadata/scope_kind.go b/tsdb/seriesmetadata/scope_kind.go
index 68f4a6afbe..d18e5dbe2e 100644
--- a/tsdb/seriesmetadata/scope_kind.go
+++ b/tsdb/seriesmetadata/scope_kind.go
@@ -157,6 +157,23 @@ func CommitScopeDirect(accessor kindMetaAccessor, scd ScopeCommitData) {
     }
 }

+// CommitScopeToStore builds a ScopeVersion from ScopeCommitData and
+// commits it directly to the MemStore via SetVersionedWithDiff, bypassing
+// per-series storage entirely. Returns the old and new versioned state.
+// When old and new have the same number of versions, the content was
+// unchanged (only a time range extension) and no WAL write is needed.
+func CommitScopeToStore(store *MemStore[*ScopeVersion], labelsHash uint64, scd ScopeCommitData) (old, cur *VersionedScope) {
+    sv := &ScopeVersion{
+        Name:      scd.Name,
+        Version:   scd.Version,
+        SchemaURL: scd.SchemaURL,
+        Attrs:     maps.Clone(scd.Attrs),
+        MinTime:   scd.MinTime,
+        MaxTime:   scd.MaxTime,
+    }
+    return store.SetVersionedWithDiff(labelsHash, &Versioned[*ScopeVersion]{Versions: []*ScopeVersion{sv}})
+}
+
 // CollectScopeDirect is the hot-path equivalent of CollectFromSeries
 // for scopes, avoiding interface{} boxing on the return path.
 func CollectScopeDirect(accessor kindMetaAccessor) (*VersionedScope, bool) {
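The version-count check that both commit helpers document depends on one property of SetVersionedWithDiff: it appends a new version only when the incoming content differs from the latest stored one, and otherwise just widens the time range. A caller-side sketch of that contract (needsWALWrite is a hypothetical wrapper, not part of this patch):

// needsWALWrite commits a scope and reports whether the commit introduced a
// new version (content changed) rather than merely extending the time range.
func needsWALWrite(store *seriesmetadata.MemStore[*seriesmetadata.ScopeVersion], hash uint64, data seriesmetadata.ScopeCommitData) bool {
    old, cur := seriesmetadata.CommitScopeToStore(store, hash, data)
    return old == nil || len(old.Versions) != len(cur.Versions)
}

The first commit for a hash has old == nil, so it is always WAL-logged, which matches the test expectation that nothing is filtered on the first commit.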