From 9b444b57afcc72d5820e303f047a965366168a7e Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 23 Jan 2026 17:59:41 -0800 Subject: [PATCH] tsdb: Add StaleHead and GC for stale series in the Head block Signed-off-by: Ganesh Vernekar --- tsdb/head.go | 256 ++++++++++++++++++++++++++++++++++++++++++++-- tsdb/head_read.go | 107 +++++++++++++++++++ tsdb/head_test.go | 2 +- 3 files changed, 355 insertions(+), 10 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index 4410da407e..3d700944d9 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1203,6 +1203,36 @@ func (h *Head) truncateMemory(mint int64) (err error) { return h.truncateSeriesAndChunkDiskMapper("truncateMemory") } +// truncateStaleSeries removes the provided series as long as they are still stale. +func (h *Head) truncateStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) error { + h.chunkSnapshotMtx.Lock() + defer h.chunkSnapshotMtx.Unlock() + + if h.MinTime() >= maxt { + return nil + } + + h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt) + + deleted := h.gcStaleSeries(seriesRefs, maxt) + + // Record these stale series refs in the WAL so that we can ignore them during replay. + if h.wal != nil { + stones := make([]tombstones.Stone, 0, len(seriesRefs)) + for ref := range deleted { + stones = append(stones, tombstones.Stone{ + Ref: ref, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}}, + }) + } + var enc record.Encoder + if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { + return err + } + } + return nil +} + // WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying. // The query timeout limits the max wait time of this function implicitly. // The mint is inclusive and maxt is the truncation time hence exclusive. @@ -1556,6 +1586,53 @@ func (h *RangeHead) String() string { return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime()) } +// StaleHead allows querying the stale series in the Head via an IndexReader, ChunkReader and tombstones.Reader. +// Used only for compactions. +type StaleHead struct { + RangeHead + staleSeriesRefs []storage.SeriesRef +} + +// NewStaleHead returns a *StaleHead. +func NewStaleHead(head *Head, mint, maxt int64, staleSeriesRefs []storage.SeriesRef) *StaleHead { + return &StaleHead{ + RangeHead: RangeHead{ + head: head, + mint: mint, + maxt: maxt, + }, + staleSeriesRefs: staleSeriesRefs, + } +} + +func (h *StaleHead) Index() (_ IndexReader, err error) { + return h.head.staleIndex(h.mint, h.maxt, h.staleSeriesRefs) +} + +func (h *StaleHead) NumSeries() uint64 { + return h.head.NumStaleSeries() +} + +var staleHeadULID = ulid.MustParse("0000000000XXXXXXXSTALEHEAD") + +func (h *StaleHead) Meta() BlockMeta { + return BlockMeta{ + MinTime: h.MinTime(), + MaxTime: h.MaxTime(), + ULID: staleHeadULID, + Stats: BlockStats{ + NumSeries: h.NumSeries(), + }, + } +} + +// String returns a human-readable representation of the stale head. It's important to +// keep this function in order to avoid the struct dump when the head is stringified in +// errors or logs. +func (h *StaleHead) String() string { + return fmt.Sprintf("stale head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime()) +} + // Delete all samples in the range of [mint, maxt] for series that satisfy the given // label matchers.
func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error { @@ -1625,13 +1702,14 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) { // Drop old chunks and remember series IDs and hashes if they can be // deleted entirely. - deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef, &h.numStaleSeries) + deleted, affected, chunksRemoved, staleSeriesDeleted, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef) seriesRemoved := len(deleted) h.metrics.seriesRemoved.Add(float64(seriesRemoved)) h.metrics.chunksRemoved.Add(float64(chunksRemoved)) h.metrics.chunks.Sub(float64(chunksRemoved)) h.numSeries.Sub(uint64(seriesRemoved)) + h.numStaleSeries.Sub(uint64(staleSeriesDeleted)) // Remove deleted series IDs from the postings lists. h.postings.Delete(deleted, affected) @@ -1948,13 +2026,14 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st // but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct // and there's no easy way to cast maps. // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series. -func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, numStaleSeries *atomic.Uint64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) { +func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _, _ int, _, _ int64, minMmapFile int) { var ( - deleted = map[storage.SeriesRef]struct{}{} - affected = map[labels.Label]struct{}{} - rmChunks = 0 - actualMint int64 = math.MaxInt64 - minOOOTime int64 = math.MaxInt64 + deleted = map[storage.SeriesRef]struct{}{} + affected = map[labels.Label]struct{}{} + rmChunks = 0 + staleSeriesDeleted = 0 + actualMint int64 = math.MaxInt64 + minOOOTime int64 = math.MaxInt64 ) minMmapFile = math.MaxInt32 @@ -2009,7 +2088,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n if value.IsStaleNaN(series.lastValue) || (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) || (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) { - numStaleSeries.Dec() + staleSeriesDeleted++ } deleted[storage.SeriesRef(series.ref)] = struct{}{} @@ -2025,7 +2104,166 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n actualMint = mint } - return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile + return deleted, affected, rmChunks, staleSeriesDeleted, actualMint, minOOOTime, minMmapFile +} + +// gcStaleSeries removes all the provided series as long as they are still stale +// and the series maxt is <= the given max. +// The returned references are the series that got deleted. +func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) map[storage.SeriesRef]struct{} { + // Drop old chunks and remember series IDs and hashes if they can be + // deleted entirely. 
+ deleted, affected, chunksRemoved := h.series.gcStaleSeries(seriesRefs, maxt) + seriesRemoved := len(deleted) + + h.metrics.seriesRemoved.Add(float64(seriesRemoved)) + h.metrics.chunksRemoved.Add(float64(chunksRemoved)) + h.metrics.chunks.Sub(float64(chunksRemoved)) + h.numSeries.Sub(uint64(seriesRemoved)) + h.numStaleSeries.Sub(uint64(seriesRemoved)) + + // Remove deleted series IDs from the postings lists. + h.postings.Delete(deleted, affected) + + // Remove tombstones referring to the deleted series. + h.tombstones.DeleteTombstones(deleted) + + if h.wal != nil { + _, last, _ := wlog.Segments(h.wal.Dir()) + h.walExpiriesMtx.Lock() + // Keep series records until we're past segment 'last' + // because the WAL will still have samples records with + // this ref ID. If we didn't keep these series records then + // on start up when we replay the WAL, or any other code + // that reads the WAL, wouldn't be able to use those + // samples since we would have no labels for that ref ID. + for ref := range deleted { + h.walExpiries[chunks.HeadSeriesRef(ref)] = int64(last) + } + h.walExpiriesMtx.Unlock() + } + + return deleted +} + +// deleteSeriesByID deletes the series with the given reference. +// Only used for WAL replay. +func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) { + var ( + deleted = map[storage.SeriesRef]struct{}{} + affected = map[labels.Label]struct{}{} + staleSeriesDeleted = 0 + chunksRemoved = 0 + ) + + for _, ref := range refs { + refShard := int(ref) & (h.series.size - 1) + h.series.locks[refShard].Lock() + + // Copying getByID here to avoid locking and unlocking twice. + series := h.series.series[refShard][ref] + if series == nil { + h.series.locks[refShard].Unlock() + continue + } + + if value.IsStaleNaN(series.lastValue) || + (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) || + (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) { + staleSeriesDeleted++ + } + + hash := series.lset.Hash() + hashShard := int(hash) & (h.series.size - 1) + + chunksRemoved += len(series.mmappedChunks) + if series.headChunks != nil { + chunksRemoved += series.headChunks.len() + } + + deleted[storage.SeriesRef(series.ref)] = struct{}{} + series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} }) + h.series.hashes[hashShard].del(hash, series.ref) + delete(h.series.series[refShard], series.ref) + + h.series.locks[refShard].Unlock() + } + + h.metrics.seriesRemoved.Add(float64(len(deleted))) + h.metrics.chunksRemoved.Add(float64(chunksRemoved)) + h.metrics.chunks.Sub(float64(chunksRemoved)) + h.numSeries.Sub(uint64(len(deleted))) + h.numStaleSeries.Sub(uint64(staleSeriesDeleted)) + + // Remove deleted series IDs from the postings lists. + h.postings.Delete(deleted, affected) + + // Remove tombstones referring to the deleted series. + h.tombstones.DeleteTombstones(deleted) +} + +// gcStaleSeries removes all the stale series provided that they are still stale +// and the series maxt is <= the given max. 
+func (s *stripeSeries) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) { + var ( + deleted = map[storage.SeriesRef]struct{}{} + affected = map[labels.Label]struct{}{} + rmChunks = 0 + ) + + staleSeriesMap := map[storage.SeriesRef]struct{}{} + for _, ref := range seriesRefs { + staleSeriesMap[ref] = struct{}{} + } + + check := func(hashShard int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) { + if _, exists := staleSeriesMap[storage.SeriesRef(series.ref)]; !exists { + // This series was not compacted. Skip it. + return + } + + series.Lock() + defer series.Unlock() + + if series.maxTime() > maxt { + return + } + + // Check if the series is still stale. + isStale := value.IsStaleNaN(series.lastValue) || + (series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) || + (series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) + + if !isStale { + return + } + + if series.headChunks != nil { + rmChunks += series.headChunks.len() + } + rmChunks += len(series.mmappedChunks) + + // The series is gone entirely. We need to keep the series lock + // and make sure we have acquired the stripe locks for hash and ID of the + // series alike. + // If we don't hold them all, there's a very small chance that a series receives + // samples again while we are half-way into deleting it. + refShard := int(series.ref) & (s.size - 1) + if hashShard != refShard { + s.locks[refShard].Lock() + defer s.locks[refShard].Unlock() + } + + deleted[storage.SeriesRef(series.ref)] = struct{}{} + series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} }) + s.hashes[hashShard].del(hash, series.ref) + delete(s.series[refShard], series.ref) + deletedForCallback[series.ref] = series.lset // OK to access lset; series is locked at the top of this function. + } + + s.iterForDeletion(check) + + return deleted, affected, rmChunks } // The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each. diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 924b04bf0a..f0a1331fbb 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" @@ -201,6 +202,112 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB return nil } +func (h *Head) staleIndex(mint, maxt int64, staleSeriesRefs []storage.SeriesRef) (*headStaleIndexReader, error) { + return &headStaleIndexReader{ + headIndexReader: h.indexRange(mint, maxt), + staleSeriesRefs: staleSeriesRefs, + }, nil +} + +// headStaleIndexReader gives the stale series that have no out-of-order data. +// This is only used for stale series compaction at the moment, which will only ask for all +// the series during compaction. So to make that efficient, this index reader requires the +// pre-calculated list of stale series refs that can be returned without re-reading the Head. +type headStaleIndexReader struct { + *headIndexReader + staleSeriesRefs []storage.SeriesRef +} + +func (h *headStaleIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) { + // If all postings are requested, return the precalculated list.
+ k, v := index.AllPostingsKey() + if len(h.staleSeriesRefs) > 0 && name == k && len(values) == 1 && values[0] == v { + return index.NewListPostings(h.staleSeriesRefs), nil + } + seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.Postings(ctx, name, values...)) + if err != nil { + return index.ErrPostings(err), err + } + return index.NewListPostings(seriesRefs), nil +} + +func (h *headStaleIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings { + // Unused for compaction, so we don't need to optimise. + seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForLabelMatching(ctx, name, match)) + if err != nil { + return index.ErrPostings(err) + } + return index.NewListPostings(seriesRefs) +} + +func (h *headStaleIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings { + // Unused for compaction, so we don't need to optimise. + seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForAllLabelValues(ctx, name)) + if err != nil { + return index.ErrPostings(err) + } + return index.NewListPostings(seriesRefs) +} + +// filterStaleSeriesAndSortPostings returns the stale series references from the given postings +// that also do not have any out-of-order data. +func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.SeriesRef, error) { + series := make([]*memSeries, 0, 1024) + + notFoundSeriesCount := 0 + for p.Next() { + s := h.series.getByID(chunks.HeadSeriesRef(p.At())) + if s == nil { + notFoundSeriesCount++ + continue + } + + s.Lock() + if s.ooo != nil { + // Has out-of-order data; skip it because we cannot determine if a series + // is stale when it's getting out-of-order data. + s.Unlock() + continue + } + + if value.IsStaleNaN(s.lastValue) || + (s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) || + (s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum)) { + series = append(series, s) + } + s.Unlock() + } + if notFoundSeriesCount > 0 { + h.logger.Debug("Looked up stale series not found", "count", notFoundSeriesCount) + } + if err := p.Err(); err != nil { + return nil, fmt.Errorf("expand postings: %w", err) + } + + slices.SortFunc(series, func(a, b *memSeries) int { + return labels.Compare(a.labels(), b.labels()) + }) + + refs := make([]storage.SeriesRef, 0, len(series)) + for _, p := range series { + refs = append(refs, storage.SeriesRef(p.ref)) + } + return refs, nil +} + +// SortedPostings returns the postings as it is because we expect any postings obtained via +// headStaleIndexReader to be already sorted. +func (*headStaleIndexReader) SortedPostings(p index.Postings) index.Postings { + // All the postings function above already give the sorted list of postings. + return p +} + +// SortedStaleSeriesRefsNoOOOData returns all the series refs of the stale series that do not have any out-of-order data. +func (h *Head) SortedStaleSeriesRefsNoOOOData(ctx context.Context) ([]storage.SeriesRef, error) { + k, v := index.AllPostingsKey() + return h.filterStaleSeriesAndSortPostings(h.postings.Postings(ctx, k, v)) +} + func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta { for i, c := range s.mmappedChunks { // Do not expose chunks that are outside of the specified range. 
diff --git a/tsdb/head_test.go b/tsdb/head_test.go index e2b87b6f3f..493f938860 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -6519,7 +6519,7 @@ func TestStripeSeries_gc(t *testing.T) { s, ms1, ms2 := stripeSeriesWithCollidingSeries(t) hash := ms1.lset.Hash() - s.gc(0, 0, nil) + s.gc(0, 0) // Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series got := s.getByHash(hash, ms1.lset)
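Note (not part of the patch): the sketch below shows one way the pieces added above might be wired together on a stale-series compaction path. The compactStaleSeriesSketch helper and its compact callback are hypothetical stand-ins for the real compactor wiring inside package tsdb; only SortedStaleSeriesRefsNoOOOData, NewStaleHead, and truncateStaleSeries come from this change, and the exact mint/maxt handling is an assumption.

// Hypothetical helper inside package tsdb (usual package imports assumed).
// compact stands in for whatever writes a block from a BlockReader.
func compactStaleSeriesSketch(ctx context.Context, head *Head, compact func(BlockReader) error) error {
	// 1. Collect the sorted refs of series whose last sample is a stale marker
	//    and that have no out-of-order data.
	refs, err := head.SortedStaleSeriesRefsNoOOOData(ctx)
	if err != nil {
		return err
	}
	if len(refs) == 0 {
		return nil
	}
	mint, maxt := head.MinTime(), head.MaxTime()

	// 2. Expose only those series to the compactor via a StaleHead, whose
	//    IndexReader serves the precomputed postings.
	if err := compact(NewStaleHead(head, mint, maxt, refs)); err != nil {
		return err
	}

	// 3. Drop the compacted series from the Head (each one is re-checked for
	//    staleness) and log tombstones so WAL replay can skip them.
	return head.truncateStaleSeries(refs, maxt)
}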
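The staleness predicate repeated in stripeSeries.gc, Head.deleteSeriesByID, stripeSeries.gcStaleSeries, and filterStaleSeriesAndSortPostings boils down to: the last ingested value of the series is a stale marker. For reference, here is that condition written as a helper; an isStale method like this does not exist in the patch, and callers would need to hold the series lock, as the call sites above do.

// Hypothetical helper, not part of the patch; caller must hold s.Lock().
// A memSeries counts as stale when its most recent sample is a stale NaN:
// the float lastValue, or the Sum of the last (float) histogram sample.
func (s *memSeries) isStale() bool {
	return value.IsStaleNaN(s.lastValue) ||
		(s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) ||
		(s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum))
}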