refactor: eliminate per-series nativeMeta to fix ingester OOMs

Remove the per-series nativeMeta struct (kindMetaEntry slice, stableHash
cache, GetKindMeta/SetKindMeta methods) from memSeries, saving ~360 bytes
per OTLP series (~10GB for 30M series).

Resource/scope metadata is now committed directly to the shared MemStore
via new CommitResourceToStore/CommitScopeToStore functions, which use
SetVersionedWithDiff to atomically check-and-commit. The filter and commit
steps are merged into commitAndFilterResources/commitAndFilterScopes,
which write to MemStore before WAL logging and skip WAL writes when content
is unchanged (detected by comparing version counts before/after commit).

WAL replay also commits directly to MemStore instead of going through
per-series storage.

Committing to MemStore before WAL is safe because MemStore is in-memory
only and rebuilt from WAL on crash recovery.

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
Arve Knudsen 2026-03-08 09:06:39 +01:00
parent 49398f9d51
commit ceaa69d40f
6 changed files with 210 additions and 388 deletions

View file

@ -1846,124 +1846,21 @@ func (h *Head) Tombstones() (tombstones.Reader, error) {
return h.tombstones, nil
}
// updateSharedMetadata updates the head's shared metadata store after a
// series' kind metadata has been modified. The series lock must be held.
func (h *Head) updateSharedMetadata(s *memSeries, kind seriesmetadata.KindDescriptor) {
if h.seriesMeta == nil {
return
}
v, ok := kind.CollectFromSeries(s)
if !ok {
return
}
hash := s.getStableHash()
// updateMetaStripes updates the ref↔hash bidirectional mappings in the
// metadata stripe tables. These mappings are used by LabelsForHash queries
// and GC cleanup. The series lock need NOT be held since ref and hash are
// passed as arguments.
func (h *Head) updateMetaStripes(ref chunks.HeadSeriesRef, hash uint64) {
mask := uint64(len(h.metaRefStripes) - 1)
refShard := &h.metaRefStripes[uint64(s.ref)&mask]
refShard := &h.metaRefStripes[uint64(ref)&mask]
refShard.Lock()
refShard.refToHash[s.ref] = hash
refShard.refToHash[ref] = hash
refShard.Unlock()
hashShard := &h.metaHashStripes[hash&mask]
hashShard.Lock()
hashShard.hashToRef[hash] = s.ref
hashShard.hashToRef[hash] = ref
hashShard.Unlock()
// For resource kind: use SetVersionedWithDiff to get old/new in a single
// lock acquisition (avoids 3 separate MemStore locks: get, set, get).
if kind.ID() == seriesmetadata.KindResource {
vr := v.(*seriesmetadata.VersionedResource)
oldVR, newVR := h.seriesMeta.ResourceStore().SetVersionedWithDiff(hash, vr)
h.seriesMeta.UpdateResourceAttrIndex(hash, oldVR, newVR)
return
}
// Other kinds: standard path.
store := h.seriesMeta.StoreForKind(kind.ID())
kind.SetVersioned(store, hash, v)
}
// updateSharedResourceMetadata is the type-safe hot-path version of
// updateSharedMetadata for resource kind, avoiding interface{} boxing.
func (h *Head) updateSharedResourceMetadata(s *memSeries) {
if h.seriesMeta == nil {
return
}
vr, ok := seriesmetadata.CollectResourceDirect(s)
if !ok {
return
}
hash := s.getStableHash()
mask := uint64(len(h.metaRefStripes) - 1)
refShard := &h.metaRefStripes[uint64(s.ref)&mask]
refShard.Lock()
refShard.refToHash[s.ref] = hash
refShard.Unlock()
hashShard := &h.metaHashStripes[hash&mask]
hashShard.Lock()
hashShard.hashToRef[hash] = s.ref
hashShard.Unlock()
oldVR, newVR := h.seriesMeta.ResourceStore().SetVersionedWithDiff(hash, vr)
h.seriesMeta.UpdateResourceAttrIndex(hash, oldVR, newVR)
}
// updateSharedScopeMetadata is the type-safe hot-path version of
// updateSharedMetadata for scope kind, avoiding interface{} boxing.
func (h *Head) updateSharedScopeMetadata(s *memSeries) {
if h.seriesMeta == nil {
return
}
vs, ok := seriesmetadata.CollectScopeDirect(s)
if !ok {
return
}
hash := s.getStableHash()
mask := uint64(len(h.metaRefStripes) - 1)
refShard := &h.metaRefStripes[uint64(s.ref)&mask]
refShard.Lock()
refShard.refToHash[s.ref] = hash
refShard.Unlock()
hashShard := &h.metaHashStripes[hash&mask]
hashShard.Lock()
hashShard.hashToRef[hash] = s.ref
hashShard.Unlock()
h.seriesMeta.ScopeStore().SetVersioned(hash, vs)
}
// internSeriesResource replaces the deep-copied maps/slices on the latest per-series
// ResourceVersion with thin copies sharing canonical pointers from the MemStore
// content table. This deduplicates memory when many series share the same resource.
func (h *Head) internSeriesResource(s *memSeries) {
if h.seriesMeta == nil {
return
}
vr, ok := seriesmetadata.CollectResourceDirect(s)
if !ok || len(vr.Versions) == 0 {
return
}
last := len(vr.Versions) - 1
vr.Versions[last] = h.seriesMeta.ResourceStore().InternVersion(vr.Versions[last])
}
// internSeriesScope replaces the deep-copied maps/slices on the latest per-series
// ScopeVersion with thin copies sharing canonical pointers from the MemStore
// content table.
func (h *Head) internSeriesScope(s *memSeries) {
if h.seriesMeta == nil {
return
}
vs, ok := seriesmetadata.CollectScopeDirect(s)
if !ok || len(vs.Versions) == 0 {
return
}
last := len(vs.Versions) - 1
vs.Versions[last] = h.seriesMeta.ScopeStore().InternVersion(vs.Versions[last])
}
// cleanupSharedMetadata removes metadata for deleted series from the shared store.
@ -2791,23 +2688,6 @@ func (s sample) Copy() chunks.Sample {
return c
}
// kindMetaEntry stores a single kind's metadata on a memSeries.
type kindMetaEntry struct {
kind seriesmetadata.KindID
data any // *Versioned[V] for the appropriate V
}
// nativeMeta holds OTel native metadata state for a memSeries.
// Allocated lazily on first SetKindMeta call; nil when native metadata
// is not in use, saving 24 bytes per series (slice header + stableHash)
// compared to storing the fields inline on memSeries.
type nativeMeta struct {
// stableHash caches labels.StableHash(lset). Computed lazily on first
// metadata commit (lset is immutable so the hash never changes).
stableHash uint64
kindMeta []kindMetaEntry
}
// memSeries is the in-memory representation of a series. None of its methods
// are goroutine safe and it is the caller's responsibility to lock it.
type memSeries struct {
@ -2823,11 +2703,6 @@ type memSeries struct {
meta *metadata.Metadata
// nativeMeta holds OTel native metadata (stableHash cache + per-kind versioned data).
// nil when native metadata is not in use, saving 24 bytes per series.
// Allocated lazily on first SetKindMeta call.
nativeMeta *nativeMeta
lset labels.Labels // Locking required with -tags dedupelabels, not otherwise.
// Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
@ -2892,41 +2767,6 @@ func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64,
return s
}
// GetKindMeta returns the metadata for a kind, or nil/false if not set.
func (s *memSeries) GetKindMeta(id seriesmetadata.KindID) (any, bool) {
if s.nativeMeta == nil {
return nil, false
}
for _, e := range s.nativeMeta.kindMeta {
if e.kind == id {
return e.data, true
}
}
return nil, false
}
// SetKindMeta sets the metadata for a kind.
func (s *memSeries) SetKindMeta(id seriesmetadata.KindID, v any) {
if s.nativeMeta == nil {
s.nativeMeta = &nativeMeta{}
}
for i, e := range s.nativeMeta.kindMeta {
if e.kind == id {
s.nativeMeta.kindMeta[i].data = v
return
}
}
s.nativeMeta.kindMeta = append(s.nativeMeta.kindMeta, kindMetaEntry{kind: id, data: v})
}
// getStableHash returns the cached labels.StableHash, computing it on first call.
// Caller must hold s.Lock() and s.nativeMeta must be non-nil (guaranteed after SetKindMeta).
func (s *memSeries) getStableHash() uint64 {
if s.nativeMeta.stableHash == 0 {
s.nativeMeta.stableHash = labels.StableHash(s.lset)
}
return s.nativeMeta.stableHash
}
func (s *memSeries) minTime() int64 {
if len(s.mmappedChunks) > 0 {

View file

@ -18,7 +18,6 @@ import (
"errors"
"fmt"
"log/slog"
"maps"
"math"
"time"
@ -1834,24 +1833,6 @@ func commitMetadata(b *appendBatch) {
}
}
// commitResources commits the resource updates for each series in the provided batch.
func (a *headAppenderBase) commitResources(b *appendBatch) {
for i, r := range b.resources {
s := b.resourceSeries[i]
s.Lock()
seriesmetadata.CommitResourceDirect(s, seriesmetadata.ResourceCommitData{
Identifying: r.Identifying,
Descriptive: r.Descriptive,
Entities: refResourceEntitiesToCommitData(r.Entities),
MinTime: r.MinTime,
MaxTime: r.MaxTime,
})
a.head.updateSharedResourceMetadata(s)
a.head.internSeriesResource(s)
s.Unlock()
}
}
// refResourceEntitiesToCommitData converts WAL record entities to ResourceEntityData.
func refResourceEntitiesToCommitData(entities []record.RefResourceEntity) []seriesmetadata.ResourceEntityData {
result := make([]seriesmetadata.ResourceEntityData, len(entities))
@ -1865,80 +1846,39 @@ func refResourceEntitiesToCommitData(entities []record.RefResourceEntity) []seri
return result
}
// commitScopes commits the scope updates for each series in the provided batch.
func (a *headAppenderBase) commitScopes(b *appendBatch) {
for i, sc := range b.scopes {
s := b.scopeSeries[i]
s.Lock()
seriesmetadata.CommitScopeDirect(s, seriesmetadata.ScopeCommitData{
Name: sc.Name,
Version: sc.Version,
SchemaURL: sc.SchemaURL,
Attrs: sc.Attrs,
MinTime: sc.MinTime,
MaxTime: sc.MaxTime,
})
a.head.updateSharedScopeMetadata(s)
a.head.internSeriesScope(s)
s.Unlock()
// commitAndFilterResources commits resource data directly to the shared MemStore
// (bypassing per-series storage) and filters out entries where the content was
// unchanged — those need no WAL write. Returns the number of entries filtered out.
func (a *headAppenderBase) commitAndFilterResources(b *appendBatch) int {
if a.head.seriesMeta == nil {
return 0
}
}
// resourceContentUnchanged checks whether the incoming WAL resource record
// has the same content as the current stored ResourceVersion, ignoring time range.
func resourceContentUnchanged(cur *seriesmetadata.ResourceVersion, r record.RefResource) bool {
if !maps.Equal(cur.Identifying, r.Identifying) ||
!maps.Equal(cur.Descriptive, r.Descriptive) ||
len(cur.Entities) != len(r.Entities) {
return false
}
// Stored entities are sorted by Type; incoming may not be — pairwise match.
for _, ce := range cur.Entities {
found := false
for _, ie := range r.Entities {
if ce.Type == ie.Type && maps.Equal(ce.ID, ie.ID) && maps.Equal(ce.Description, ie.Description) {
found = true
break
}
}
if !found {
return false
}
}
return true
}
// scopeContentUnchanged checks whether the incoming WAL scope record
// has the same content as the current stored ScopeVersion, ignoring time range.
func scopeContentUnchanged(cur *seriesmetadata.ScopeVersion, sc record.RefScope) bool {
return cur.Name == sc.Name && cur.Version == sc.Version &&
cur.SchemaURL == sc.SchemaURL && maps.Equal(cur.Attrs, sc.Attrs)
}
// filterUnchangedResources removes entries from b.resources where the content
// is identical to what's already stored on the series. For unchanged entries,
// the time range is extended in-place and the shared metadata store is updated.
// Returns the number of entries filtered out.
func (a *headAppenderBase) filterUnchangedResources(b *appendBatch) int {
store := a.head.seriesMeta.ResourceStore()
n := 0
for i, r := range b.resources {
s := b.resourceSeries[i]
s.Lock()
changed := true
if vr, ok := seriesmetadata.CollectResourceDirect(s); ok && len(vr.Versions) > 0 {
cur := vr.Versions[len(vr.Versions)-1]
if resourceContentUnchanged(cur, r) {
cur.UpdateTimeRange(r.MinTime, r.MaxTime)
a.head.updateSharedResourceMetadata(s)
changed = false
}
}
hash := labels.StableHash(s.lset)
ref := s.ref
s.Unlock()
if changed {
b.resources[n] = b.resources[i]
b.resourceSeries[n] = b.resourceSeries[i]
n++
oldVR, newVR := seriesmetadata.CommitResourceToStore(store, hash, seriesmetadata.ResourceCommitData{
Identifying: r.Identifying,
Descriptive: r.Descriptive,
Entities: refResourceEntitiesToCommitData(r.Entities),
MinTime: r.MinTime,
MaxTime: r.MaxTime,
})
a.head.updateMetaStripes(ref, hash)
a.head.seriesMeta.UpdateResourceAttrIndex(hash, oldVR, newVR)
// If version count unchanged, content was identical — skip WAL write.
if oldVR != nil && len(oldVR.Versions) == len(newVR.Versions) {
continue
}
b.resources[n] = b.resources[i]
b.resourceSeries[n] = b.resourceSeries[i]
n++
}
filtered := len(b.resources) - n
b.resources = b.resources[:n]
@ -1946,30 +1886,39 @@ func (a *headAppenderBase) filterUnchangedResources(b *appendBatch) int {
return filtered
}
// filterUnchangedScopes removes entries from b.scopes where the content
// is identical to what's already stored on the series. For unchanged entries,
// the time range is extended in-place and the shared metadata store is updated.
// Returns the number of entries filtered out.
func (a *headAppenderBase) filterUnchangedScopes(b *appendBatch) int {
// commitAndFilterScopes commits scope data directly to the shared MemStore
// (bypassing per-series storage) and filters out entries where the content was
// unchanged. Returns the number of entries filtered out.
func (a *headAppenderBase) commitAndFilterScopes(b *appendBatch) int {
if a.head.seriesMeta == nil {
return 0
}
store := a.head.seriesMeta.ScopeStore()
n := 0
for i, sc := range b.scopes {
s := b.scopeSeries[i]
s.Lock()
changed := true
if vs, ok := seriesmetadata.CollectScopeDirect(s); ok && len(vs.Versions) > 0 {
cur := vs.Versions[len(vs.Versions)-1]
if scopeContentUnchanged(cur, sc) {
cur.UpdateTimeRange(sc.MinTime, sc.MaxTime)
a.head.updateSharedScopeMetadata(s)
changed = false
}
}
hash := labels.StableHash(s.lset)
ref := s.ref
s.Unlock()
if changed {
b.scopes[n] = b.scopes[i]
b.scopeSeries[n] = b.scopeSeries[i]
n++
oldVS, newVS := seriesmetadata.CommitScopeToStore(store, hash, seriesmetadata.ScopeCommitData{
Name: sc.Name,
Version: sc.Version,
SchemaURL: sc.SchemaURL,
Attrs: sc.Attrs,
MinTime: sc.MinTime,
MaxTime: sc.MaxTime,
})
a.head.updateMetaStripes(ref, hash)
// If version count unchanged, content was identical — skip WAL write.
if oldVS != nil && len(oldVS.Versions) == len(newVS.Versions) {
continue
}
b.scopes[n] = b.scopes[i]
b.scopeSeries[n] = b.scopeSeries[i]
n++
}
filtered := len(b.scopes) - n
b.scopes = b.scopes[:n]
@ -2007,16 +1956,17 @@ func (a *headAppenderBase) Commit() (err error) {
a.closed = true
}()
// Count total resource/scope updates before filtering, then filter
// unchanged entries to avoid unnecessary WAL writes. Unchanged entries
// have their time range extended in-place under the series lock.
// Count total resource/scope updates before commit+filter, then commit
// directly to the shared MemStore and filter out unchanged entries to
// avoid unnecessary WAL writes. Committing to MemStore before WAL is
// safe: MemStore is rebuilt from WAL on crash.
var resourcesTotal, scopesTotal int
var resourcesFiltered, scopesFiltered int
for _, b := range a.batches {
resourcesTotal += len(b.resources)
scopesTotal += len(b.scopes)
resourcesFiltered += a.filterUnchangedResources(b)
scopesFiltered += a.filterUnchangedScopes(b)
resourcesFiltered += a.commitAndFilterResources(b)
scopesFiltered += a.commitAndFilterScopes(b)
}
if err := a.log(); err != nil {
@ -2068,8 +2018,6 @@ func (a *headAppenderBase) Commit() (err error) {
a.commitHistograms(b, acc)
a.commitFloatHistograms(b, acc)
commitMetadata(b)
a.commitResources(b)
a.commitScopes(b)
}
// Unmark all series as pending commit after all samples have been committed.
a.unmarkCreatedSeriesAsPendingCommit()

View file

@ -7892,6 +7892,7 @@ func TestWALReplayRaceWithStaleSeriesCompaction(t *testing.T) {
func TestResourceAndScopeWALReplay(t *testing.T) {
dir := t.TempDir()
opts := newTestHeadDefaultOptions(1000, false)
opts.EnableNativeMetadata = true
opts.ChunkDirRoot = dir
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
@ -7923,16 +7924,12 @@ func TestResourceAndScopeWALReplay(t *testing.T) {
require.NoError(t, appErr)
require.NoError(t, app.Commit())
// Verify the series has resource set.
s := head.series.getByID(chunks.HeadSeriesRef(ref))
require.NotNil(t, s)
s.Lock()
res, resOK := s.GetKindMeta(seriesmetadata.KindResource)
require.True(t, resOK)
vr := res.(*seriesmetadata.VersionedResource)
// Verify resource in shared MemStore.
hash := labels.StableHash(lset)
vr, vrOK := head.seriesMeta.ResourceStore().GetVersioned(hash)
require.True(t, vrOK)
require.Len(t, vr.Versions, 1)
require.Equal(t, "frontend", vr.Versions[0].Identifying["service.name"])
s.Unlock()
// Close the head and replay WAL into a new head using the same dir.
require.NoError(t, head.Close())
@ -7945,21 +7942,18 @@ func TestResourceAndScopeWALReplay(t *testing.T) {
t.Cleanup(func() { _ = head2.Close() })
require.NoError(t, head2.Init(0))
// The replayed head should have the resource.
s2 := head2.series.getByID(chunks.HeadSeriesRef(ref))
require.NotNil(t, s2)
s2.Lock()
res2, res2OK := s2.GetKindMeta(seriesmetadata.KindResource)
require.True(t, res2OK, "resource should survive WAL replay")
vr2 := res2.(*seriesmetadata.VersionedResource)
// The replayed head should have the resource in its shared store.
vr2, vr2OK := head2.seriesMeta.ResourceStore().GetVersioned(hash)
require.True(t, vr2OK, "resource should survive WAL replay")
require.Len(t, vr2.Versions, 1)
require.Equal(t, "frontend", vr2.Versions[0].Identifying["service.name"])
require.Equal(t, "node-1", vr2.Versions[0].Descriptive["host.name"])
s2.Unlock()
}
func TestResourceAndScopeRollback(t *testing.T) {
head, _ := newTestHead(t, 1000, compression.None, false)
opts := newTestHeadDefaultOptions(1000, false)
opts.EnableNativeMetadata = true
head, _ := newTestHeadWithOptions(t, compression.None, opts)
require.NoError(t, head.Init(0))
ctx := context.Background()
@ -7984,17 +7978,16 @@ func TestResourceAndScopeRollback(t *testing.T) {
require.NoError(t, err)
require.NoError(t, app.Rollback())
// The series should NOT have a resource after rollback.
s := head.series.getByID(chunks.HeadSeriesRef(ref))
require.NotNil(t, s)
s.Lock()
_, resOK := s.GetKindMeta(seriesmetadata.KindResource)
// The shared store should NOT have a resource after rollback.
hash := labels.StableHash(lset)
_, resOK := head.seriesMeta.ResourceStore().GetVersioned(hash)
require.False(t, resOK, "resource must not be set after rollback")
s.Unlock()
}
func TestResourceDedupInV1Appender(t *testing.T) {
head, _ := newTestHead(t, 1000, compression.None, false)
opts := newTestHeadDefaultOptions(1000, false)
opts.EnableNativeMetadata = true
head, _ := newTestHeadWithOptions(t, compression.None, opts)
require.NoError(t, head.Init(0))
ctx := context.Background()
@ -8027,19 +8020,17 @@ func TestResourceDedupInV1Appender(t *testing.T) {
require.NoError(t, app.Commit())
// Only the first resource update should have been applied.
s := head.series.getByID(chunks.HeadSeriesRef(ref))
require.NotNil(t, s)
s.Lock()
res, resOK := s.GetKindMeta(seriesmetadata.KindResource)
require.True(t, resOK)
vr := res.(*seriesmetadata.VersionedResource)
hash := labels.StableHash(lset)
vr, vrOK := head.seriesMeta.ResourceStore().GetVersioned(hash)
require.True(t, vrOK)
require.Len(t, vr.Versions, 1)
require.Equal(t, "v1", vr.Versions[0].Identifying["service.name"])
s.Unlock()
}
func TestResourceAndScopeWALReplayWithScope(t *testing.T) {
head, w := newTestHead(t, 1000, compression.None, false)
opts := newTestHeadDefaultOptions(1000, false)
opts.EnableNativeMetadata = true
head, w := newTestHeadWithOptions(t, compression.None, opts)
// Manually populate the WAL with series + resource + scope records.
populateTestWL(t, w, []any{
[]record.RefSeries{
@ -8072,23 +8063,22 @@ func TestResourceAndScopeWALReplayWithScope(t *testing.T) {
require.NoError(t, head.Init(0))
// Compute the labels hash for series ref=1.
s := head.series.getByID(1)
require.NotNil(t, s)
s.Lock()
defer s.Unlock()
hash := labels.StableHash(s.lset)
s.Unlock()
// Resource should be replayed.
res, resOK := s.GetKindMeta(seriesmetadata.KindResource)
require.True(t, resOK)
vr := res.(*seriesmetadata.VersionedResource)
// Resource should be replayed in shared store.
vr, vrOK := head.seriesMeta.ResourceStore().GetVersioned(hash)
require.True(t, vrOK)
require.Len(t, vr.Versions, 1)
require.Equal(t, "frontend", vr.Versions[0].Identifying["service.name"])
// Scope should be replayed.
sc, scOK := s.GetKindMeta(seriesmetadata.KindScope)
require.True(t, scOK)
vs := sc.(*seriesmetadata.VersionedScope)
// Scope should be replayed in shared store.
vs, vsOK := head.seriesMeta.ScopeStore().GetVersioned(hash)
require.True(t, vsOK)
require.Len(t, vs.Versions, 1)
require.Equal(t, "go.opentelemetry.io/instrumentation", vs.Versions[0].Name)
require.Equal(t, "0.42.0", vs.Versions[0].Version)
@ -8101,6 +8091,7 @@ func TestResourceAndScopeWALReplayWithScope(t *testing.T) {
func TestResourceAndScopeWALFilterUnchanged(t *testing.T) {
dir := t.TempDir()
opts := newTestHeadDefaultOptions(1000, false)
opts.EnableNativeMetadata = true
opts.ChunkDirRoot = dir
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.None)
@ -8126,15 +8117,11 @@ func TestResourceAndScopeWALFilterUnchanged(t *testing.T) {
require.NoError(t, appErr)
require.NoError(t, app.Commit())
// Verify resource exists.
s := head.series.getByID(chunks.HeadSeriesRef(ref))
require.NotNil(t, s)
s.Lock()
res, resOK := s.GetKindMeta(seriesmetadata.KindResource)
require.True(t, resOK)
vr := res.(*seriesmetadata.VersionedResource)
// Verify resource exists in shared store.
hash := labels.StableHash(lset)
vr, vrOK := head.seriesMeta.ResourceStore().GetVersioned(hash)
require.True(t, vrOK)
require.Len(t, vr.Versions, 1)
s.Unlock()
// No WAL filtering yet on first commit (nothing stored before).
require.Equal(t, 0.0, prom_testutil.ToFloat64(head.metrics.resourceUpdatesWALFiltered))
@ -8158,13 +8145,10 @@ func TestResourceAndScopeWALFilterUnchanged(t *testing.T) {
require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.resourceUpdatesCommitted))
// Time range should have been extended in-place.
s.Lock()
res, _ = s.GetKindMeta(seriesmetadata.KindResource)
vr = res.(*seriesmetadata.VersionedResource)
vr, _ = head.seriesMeta.ResourceStore().GetVersioned(hash)
require.Len(t, vr.Versions, 1)
require.Equal(t, int64(100), vr.Versions[0].MinTime)
require.Equal(t, int64(200), vr.Versions[0].MaxTime)
s.Unlock()
// 3. Close head, replay WAL into new head. The filtered entry should
// not be in the WAL, but the resource should still exist from the first record.
@ -8178,16 +8162,11 @@ func TestResourceAndScopeWALFilterUnchanged(t *testing.T) {
t.Cleanup(func() { _ = head2.Close() })
require.NoError(t, head2.Init(0))
s2 := head2.series.getByID(chunks.HeadSeriesRef(ref))
require.NotNil(t, s2)
s2.Lock()
res2, res2OK := s2.GetKindMeta(seriesmetadata.KindResource)
require.True(t, res2OK)
vr2 := res2.(*seriesmetadata.VersionedResource)
vr2, vr2OK := head2.seriesMeta.ResourceStore().GetVersioned(hash)
require.True(t, vr2OK)
require.Len(t, vr2.Versions, 1)
require.Equal(t, "frontend", vr2.Versions[0].Identifying["service.name"])
require.Equal(t, "node-1", vr2.Versions[0].Descriptive["host.name"])
s2.Unlock()
// 4. Append sample + DIFFERENT resource → Commit.
app = head2.Appender(ctx)
@ -8205,12 +8184,9 @@ func TestResourceAndScopeWALFilterUnchanged(t *testing.T) {
// Changed content should NOT be filtered.
require.Equal(t, 0.0, prom_testutil.ToFloat64(head2.metrics.resourceUpdatesWALFiltered))
// Two resource versions should exist.
s2.Lock()
res2, _ = s2.GetKindMeta(seriesmetadata.KindResource)
vr2 = res2.(*seriesmetadata.VersionedResource)
// Two resource versions should exist in shared store.
vr2, _ = head2.seriesMeta.ResourceStore().GetVersioned(hash)
require.Len(t, vr2.Versions, 2)
require.Equal(t, "frontend", vr2.Versions[0].Identifying["service.name"])
require.Equal(t, "backend", vr2.Versions[1].Identifying["service.name"])
s2.Unlock()
}

View file

@ -489,54 +489,63 @@ Outer:
clear(v) // Zero out to avoid retaining metadata strings.
h.wlReplayMetadataPool.Put(v[:0])
case []record.RefResource:
resKind, _ := seriesmetadata.KindByID(seriesmetadata.KindResource)
for _, r := range v {
if ref, ok := multiRef[r.Ref]; ok {
r.Ref = ref
if h.seriesMeta != nil {
store := h.seriesMeta.ResourceStore()
for _, r := range v {
if ref, ok := multiRef[r.Ref]; ok {
r.Ref = ref
}
s := h.series.getByID(r.Ref)
if s == nil {
unknownResourceRefs.Inc()
missingSeries[r.Ref] = struct{}{}
continue
}
s.Lock()
hash := labels.StableHash(s.lset)
ref := s.ref
s.Unlock()
oldVR, newVR := seriesmetadata.CommitResourceToStore(store, hash, seriesmetadata.ResourceCommitData{
Identifying: r.Identifying,
Descriptive: r.Descriptive,
Entities: refResourceEntitiesToCommitData(r.Entities),
MinTime: r.MinTime,
MaxTime: r.MaxTime,
})
h.updateMetaStripes(ref, hash)
h.seriesMeta.UpdateResourceAttrIndex(hash, oldVR, newVR)
}
s := h.series.getByID(r.Ref)
if s == nil {
unknownResourceRefs.Inc()
missingSeries[r.Ref] = struct{}{}
continue
}
s.Lock()
resKind.CommitToSeries(s, seriesmetadata.ResourceCommitData{
Identifying: r.Identifying,
Descriptive: r.Descriptive,
Entities: refResourceEntitiesToCommitData(r.Entities),
MinTime: r.MinTime,
MaxTime: r.MaxTime,
})
h.updateSharedMetadata(s, resKind)
h.internSeriesResource(s)
s.Unlock()
}
h.wlReplayResourcesPool.Put(v)
case []record.RefScope:
scopeKind, _ := seriesmetadata.KindByID(seriesmetadata.KindScope)
for _, sc := range v {
if ref, ok := multiRef[sc.Ref]; ok {
sc.Ref = ref
if h.seriesMeta != nil {
store := h.seriesMeta.ScopeStore()
for _, sc := range v {
if ref, ok := multiRef[sc.Ref]; ok {
sc.Ref = ref
}
s := h.series.getByID(sc.Ref)
if s == nil {
unknownScopeRefs.Inc()
missingSeries[sc.Ref] = struct{}{}
continue
}
s.Lock()
hash := labels.StableHash(s.lset)
ref := s.ref
s.Unlock()
seriesmetadata.CommitScopeToStore(store, hash, seriesmetadata.ScopeCommitData{
Name: sc.Name,
Version: sc.Version,
SchemaURL: sc.SchemaURL,
Attrs: sc.Attrs,
MinTime: sc.MinTime,
MaxTime: sc.MaxTime,
})
h.updateMetaStripes(ref, hash)
}
s := h.series.getByID(sc.Ref)
if s == nil {
unknownScopeRefs.Inc()
missingSeries[sc.Ref] = struct{}{}
continue
}
s.Lock()
scopeKind.CommitToSeries(s, seriesmetadata.ScopeCommitData{
Name: sc.Name,
Version: sc.Version,
SchemaURL: sc.SchemaURL,
Attrs: sc.Attrs,
MinTime: sc.MinTime,
MaxTime: sc.MaxTime,
})
h.updateSharedMetadata(s, scopeKind)
h.internSeriesScope(s)
s.Unlock()
}
h.wlReplayScopesPool.Put(v)
default:

View file

@ -175,6 +175,38 @@ func CommitResourceDirect(accessor kindMetaAccessor, rcd ResourceCommitData) {
}
}
// CommitResourceToStore builds a ResourceVersion from ResourceCommitData and
// commits it directly to the MemStore via SetVersionedWithDiff, bypassing
// per-series storage entirely. Returns the old and new versioned state for
// index updates. When old and new have the same number of versions, the
// content was unchanged (only a time range extension) and no WAL write is needed.
func CommitResourceToStore(store *MemStore[*ResourceVersion], labelsHash uint64, rcd ResourceCommitData) (old, cur *VersionedResource) {
entities := make([]*Entity, len(rcd.Entities))
for j, e := range rcd.Entities {
entityType := e.Type
if entityType == "" {
entityType = EntityTypeResource
}
entities[j] = &Entity{
Type: entityType,
ID: maps.Clone(e.ID),
Description: maps.Clone(e.Description),
}
}
slices.SortFunc(entities, func(a, b *Entity) int {
return cmp.Compare(a.Type, b.Type)
})
rv := &ResourceVersion{
Identifying: maps.Clone(rcd.Identifying),
Descriptive: maps.Clone(rcd.Descriptive),
Entities: entities,
MinTime: rcd.MinTime,
MaxTime: rcd.MaxTime,
}
return store.SetVersionedWithDiff(labelsHash, &Versioned[*ResourceVersion]{Versions: []*ResourceVersion{rv}})
}
// CollectResourceDirect is the hot-path equivalent of CollectFromSeries
// for resources, avoiding interface{} boxing on the return path.
func CollectResourceDirect(accessor kindMetaAccessor) (*VersionedResource, bool) {

View file

@ -157,6 +157,23 @@ func CommitScopeDirect(accessor kindMetaAccessor, scd ScopeCommitData) {
}
}
// CommitScopeToStore builds a ScopeVersion from ScopeCommitData and
// commits it directly to the MemStore via SetVersionedWithDiff, bypassing
// per-series storage entirely. Returns the old and new versioned state.
// When old and new have the same number of versions, the content was
// unchanged (only a time range extension) and no WAL write is needed.
func CommitScopeToStore(store *MemStore[*ScopeVersion], labelsHash uint64, scd ScopeCommitData) (old, cur *VersionedScope) {
sv := &ScopeVersion{
Name: scd.Name,
Version: scd.Version,
SchemaURL: scd.SchemaURL,
Attrs: maps.Clone(scd.Attrs),
MinTime: scd.MinTime,
MaxTime: scd.MaxTime,
}
return store.SetVersionedWithDiff(labelsHash, &Versioned[*ScopeVersion]{Versions: []*ScopeVersion{sv}})
}
// CollectScopeDirect is the hot-path equivalent of CollectFromSeries
// for scopes, avoiding interface{} boxing on the return path.
func CollectScopeDirect(accessor kindMetaAccessor) (*VersionedScope, bool) {