From 2852c9c431dd0920d60641ffd023c4ab263aa0cb Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 17 Oct 2025 10:26:46 +0100 Subject: [PATCH 1/2] [REFACTOR] TSDB: Simplify series creation Refactor the code so that everything proceeds linearly. Also renamed `getOrSet` to `setUnlessAlreadySet` to emphasise that the caller is expecting it not to be set. Signed-off-by: Bryan Boreham --- tsdb/head.go | 48 ++++++++++++++--------------------------------- tsdb/head_test.go | 10 ++-------- 2 files changed, 16 insertions(+), 42 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index 4e77314b02..a02b05f95d 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1753,17 +1753,17 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels, pendingCommit bool) } func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) { - s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { - shardHash := uint64(0) - if h.opts.EnableSharding { - shardHash = labels.StableHash(lset) - } - - return newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled, pendingCommit) - }) - if err != nil { - return nil, false, err + if preCreationErr := h.series.seriesLifecycleCallback.PreCreation(lset); preCreationErr != nil { + return nil, false, preCreationErr } + + shardHash := uint64(0) + if h.opts.EnableSharding { + shardHash = labels.StableHash(lset) + } + optimisticallyCreatedSeries := newMemSeries(lset, id, shardHash, h.opts.IsolationDisabled, pendingCommit) + + s, created := h.series.setUnlessAlreadySet(hash, lset, optimisticallyCreatedSeries) if !created { return s, false, nil } @@ -2061,43 +2061,23 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries { return series } -func (s *stripeSeries) getOrSet(hash uint64, lset labels.Labels, createSeries func() *memSeries) (*memSeries, bool, error) { - // PreCreation is called here to avoid calling it inside the lock. - // It is not necessary to call it just before creating a series, - // rather it gives a 'hint' whether to create a series or not. - preCreationErr := s.seriesLifecycleCallback.PreCreation(lset) - - // Create the series, unless the PreCreation() callback as failed. - // If failed, we'll not allow to create a new series anyway. - var series *memSeries - if preCreationErr == nil { - series = createSeries() - } - +func (s *stripeSeries) setUnlessAlreadySet(hash uint64, lset labels.Labels, series *memSeries) (*memSeries, bool) { i := hash & uint64(s.size-1) s.locks[i].Lock() - if prev := s.hashes[i].get(hash, lset); prev != nil { s.locks[i].Unlock() - return prev, false, nil - } - if preCreationErr == nil { - s.hashes[i].set(hash, series) + return prev, false } + s.hashes[i].set(hash, series) s.locks[i].Unlock() - if preCreationErr != nil { - // The callback prevented creation of series. - return nil, false, preCreationErr - } - i = uint64(series.ref) & uint64(s.size-1) s.locks[i].Lock() s.series[i][series.ref] = series s.locks[i].Unlock() - return series, true, nil + return series, true } func (s *stripeSeries) postCreation(lset labels.Labels) { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index d32e632074..8565ac3bbe 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -6628,18 +6628,12 @@ func stripeSeriesWithCollidingSeries(t *testing.T) (*stripeSeries, *memSeries, * hash := lbls1.Hash() s := newStripeSeries(1, noopSeriesLifecycleCallback{}) - got, created, err := s.getOrSet(hash, lbls1, func() *memSeries { - return &ms1 - }) - require.NoError(t, err) + got, created := s.setUnlessAlreadySet(hash, lbls1, &ms1) require.True(t, created) require.Same(t, &ms1, got) // Add a conflicting series - got, created, err = s.getOrSet(hash, lbls2, func() *memSeries { - return &ms2 - }) - require.NoError(t, err) + got, created = s.setUnlessAlreadySet(hash, lbls2, &ms2) require.True(t, created) require.Same(t, &ms2, got) From 42b52ecc4be1392a54653a38698909eda03a9a40 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 17 Oct 2025 11:04:34 +0100 Subject: [PATCH 2/2] TSDB: Allocate series ID after seriesLifecycleCallback This callback is not used by Prometheus, but in downstream projects it is wasteful to allocate an ID only to abandon it. Remove lengthy commment which I feel is distracting from the flow. Signed-off-by: Bryan Boreham --- tsdb/head.go | 15 +++++++-------- tsdb/head_test.go | 2 +- tsdb/head_wal.go | 4 ++-- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index a02b05f95d..2c71977b1a 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1738,24 +1738,23 @@ func (*Head) String() string { } func (h *Head) getOrCreate(hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) { - // Just using `getOrCreateWithID` below would be semantically sufficient, but we'd create - // a new series on every sample inserted via Add(), which causes allocations - // and makes our series IDs rather random and harder to compress in postings. s := h.series.getByHash(hash, lset) if s != nil { return s, false, nil } - // Optimistically assume that we are the first one to create the series. - id := chunks.HeadSeriesRef(h.lastSeriesID.Inc()) - - return h.getOrCreateWithID(id, hash, lset, pendingCommit) + return h.getOrCreateWithOptionalID(0, hash, lset, pendingCommit) } -func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) { +// If id is zero, one will be allocated. +func (h *Head) getOrCreateWithOptionalID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels, pendingCommit bool) (*memSeries, bool, error) { if preCreationErr := h.series.seriesLifecycleCallback.PreCreation(lset); preCreationErr != nil { return nil, false, preCreationErr } + if id == 0 { + // Note this id is wasted in the case where a concurrent operation creates the same series first. + id = chunks.HeadSeriesRef(h.lastSeriesID.Inc()) + } shardHash := uint64(0) if h.opts.EnableSharding { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 8565ac3bbe..44daa7cddc 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1152,7 +1152,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { { name: "keep series still in the head", prepare: func(t *testing.T, h *Head) { - _, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false) + _, _, err := h.getOrCreateWithOptionalID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false) require.NoError(t, err) }, expected: true, diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 3c5390cab4..cb7397e502 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -255,7 +255,7 @@ Outer: switch v := d.(type) { case []record.RefSeries: for _, walSeries := range v { - mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels, false) + mSeries, created, err := h.getOrCreateWithOptionalID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels, false) if err != nil { seriesCreationErr = err break Outer @@ -1590,7 +1590,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie localRefSeries := shardedRefSeries[idx] for csr := range rc { - series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset, false) + series, _, err := h.getOrCreateWithOptionalID(csr.ref, csr.lset.Hash(), csr.lset, false) if err != nil { errChan <- err return