From 43dc23afe77db51070736eafb98fa87cd7ebd707 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 23 Jan 2026 18:02:45 -0800 Subject: [PATCH] tsdb: Clear stale series from the Head during WAL replay Signed-off-by: Ganesh Vernekar --- tsdb/head_wal.go | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index bbcad9d855..b323f0dbf6 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -308,7 +308,21 @@ Outer: } h.wlReplaySamplesPool.Put(v) case []tombstones.Stone: + // Tombstone records will be fairly rare, so not trying to optimise the allocations here. + deleteSeriesShards := make([][]chunks.HeadSeriesRef, concurrency) for _, s := range v { + if len(s.Intervals) == 1 && s.Intervals[0].Mint == math.MinInt64 && s.Intervals[0].Maxt == math.MaxInt64 { + // This series was fully deleted at this point. This record is only done for stale series at the moment. + mod := uint64(s.Ref) % uint64(concurrency) + deleteSeriesShards[mod] = append(deleteSeriesShards[mod], chunks.HeadSeriesRef(s.Ref)) + + // If the series is with a different reference, try deleting that. + if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok { + mod := uint64(r) % uint64(concurrency) + deleteSeriesShards[mod] = append(deleteSeriesShards[mod], r) + } + continue + } for _, itv := range s.Intervals { if itv.Maxt < h.minValidTime.Load() { continue @@ -326,6 +340,14 @@ Outer: h.tombstones.AddInterval(s.Ref, itv) } } + + for i := range concurrency { + if len(deleteSeriesShards[i]) > 0 { + processors[i].input <- walSubsetProcessorInputItem{deletedSeriesRefs: deleteSeriesShards[i]} + deleteSeriesShards[i] = nil + } + } + h.wlReplaytStonesPool.Put(v) case []record.RefExemplar: for _, e := range v { @@ -558,10 +580,11 @@ type walSubsetProcessor struct { } type walSubsetProcessorInputItem struct { - samples []record.RefSample - histogramSamples []histogramRecord - existingSeries *memSeries - walSeriesRef chunks.HeadSeriesRef + samples []record.RefSample + histogramSamples []histogramRecord + existingSeries *memSeries + walSeriesRef chunks.HeadSeriesRef + deletedSeriesRefs []chunks.HeadSeriesRef } func (wp *walSubsetProcessor) setup() { @@ -712,6 +735,10 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp case wp.histogramsOutput <- in.histogramSamples: default: } + + if len(in.deletedSeriesRefs) > 0 { + h.deleteSeriesByID(in.deletedSeriesRefs) + } } h.updateMinMaxTime(mint, maxt)