From 43e69388df9ffc6aaa5f390df810da0d2d8e1088 Mon Sep 17 00:00:00 2001
From: Ganesh Vernekar
Date: Fri, 23 Jan 2026 18:00:08 -0800
Subject: [PATCH] tsdb: Add stale series compaction support in the DB

Signed-off-by: Ganesh Vernekar
---
 tsdb/block.go   | 16 ++++++++++++++++
 tsdb/compact.go |  3 +++
 tsdb/db.go      | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/tsdb/block.go b/tsdb/block.go
index 3f089b9da7..92638df164 100644
--- a/tsdb/block.go
+++ b/tsdb/block.go
@@ -228,6 +228,18 @@ func (bm *BlockMetaCompaction) FromOutOfOrder() bool {
 	return slices.Contains(bm.Hints, CompactionHintFromOutOfOrder)
 }
 
+func (bm *BlockMetaCompaction) SetStaleSeries() {
+	if bm.FromStaleSeries() {
+		return
+	}
+	bm.Hints = append(bm.Hints, CompactionHintFromStaleSeries)
+	slices.Sort(bm.Hints)
+}
+
+func (bm *BlockMetaCompaction) FromStaleSeries() bool {
+	return slices.Contains(bm.Hints, CompactionHintFromStaleSeries)
+}
+
 const (
 	indexFilename = "index"
 	metaFilename  = "meta.json"
@@ -236,6 +248,10 @@ const (
 	// CompactionHintFromOutOfOrder is a hint noting that the block
 	// was created from out-of-order chunks.
 	CompactionHintFromOutOfOrder = "from-out-of-order"
+
+	// CompactionHintFromStaleSeries is a hint noting that the block
+	// was created from stale series.
+	CompactionHintFromStaleSeries = "from-stale-series"
 )
 
 func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
diff --git a/tsdb/compact.go b/tsdb/compact.go
index 7c21cbcc13..35e0a5b1fd 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -598,6 +598,9 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) ([]ulid.ULID, error) {
 		if base.Compaction.FromOutOfOrder() {
 			meta.Compaction.SetOutOfOrder()
 		}
+		if base.Compaction.FromStaleSeries() {
+			meta.Compaction.SetStaleSeries()
+		}
 	}
 
 	err := c.write(dest, meta, DefaultBlockPopulator{}, b)
diff --git a/tsdb/db.go b/tsdb/db.go
index 3f8bf16209..e3b00a2d11 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -1583,6 +1583,55 @@ func (db *DB) compactHead(head *RangeHead) error {
 	return nil
 }
 
+// CompactStaleHead compacts all the stale series in the Head into persistent
+// blocks and then truncates those series from the Head.
+func (db *DB) CompactStaleHead() error {
+	db.cmtx.Lock()
+	defer db.cmtx.Unlock()
+
+	db.logger.Info("Starting stale series compaction")
+	start := time.Now()
+
+	// Get the stale series references first because the set can change while the compaction below runs.
+	// A static list is also more efficient and easier to serve through an index interface.
+	staleSeriesRefs, err := db.head.SortedStaleSeriesRefsNoOOOData(context.Background())
+	if err != nil {
+		return err
+	}
+	meta := &BlockMeta{}
+	meta.Compaction.SetStaleSeries()
+	// Align the start time down to a multiple of the chunk range.
+	mint, maxt := db.head.opts.ChunkRange*(db.head.MinTime()/db.head.opts.ChunkRange), db.head.MaxTime()
+	for ; mint < maxt; mint += db.head.chunkRange.Load() {
+		staleHead := NewStaleHead(db.Head(), mint, mint+db.head.chunkRange.Load()-1, staleSeriesRefs)
+
+		uids, err := db.compactor.Write(db.dir, staleHead, staleHead.MinTime(), staleHead.BlockMaxTime(), meta)
+		if err != nil {
+			return fmt.Errorf("persist stale head: %w", err)
+		}
+
+		db.logger.Info("Stale series block created", "ulids", fmt.Sprintf("%v", uids), "min_time", staleHead.MinTime(), "max_time", staleHead.BlockMaxTime())
+
+		if err := db.reloadBlocks(); err != nil {
+			errs := []error{fmt.Errorf("reloadBlocks blocks: %w", err)}
+			for _, uid := range uids {
+				if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
+					errs = append(errs, fmt.Errorf("delete persisted stale head block after failed db reloadBlocks: %s: %w", uid, errRemoveAll))
+				}
+			}
+			return errors.Join(errs...)
+		}
+	}
+
+	if err := db.head.truncateStaleSeries(staleSeriesRefs, maxt); err != nil {
+		return fmt.Errorf("head truncate: %w", err)
+	}
+	db.head.RebuildSymbolTable(db.logger)
+
+	db.logger.Info("Stale series compaction completed", "num_series", len(staleSeriesRefs), "duration", time.Since(start))
+	return nil
+}
+
 // compactBlocks compacts all the eligible on-disk blocks.
 // The db.cmtx should be held before calling this method.
 func (db *DB) compactBlocks() (err error) {
@@ -2042,7 +2091,7 @@ func (db *DB) inOrderBlocksMaxTime() (maxt int64, ok bool) {
 	maxt, ok = int64(math.MinInt64), false
 	// If blocks are overlapping, last block might not have the max time. So check all blocks.
 	for _, b := range db.Blocks() {
-		if !b.meta.Compaction.FromOutOfOrder() && b.meta.MaxTime > maxt {
+		if !b.meta.Compaction.FromOutOfOrder() && !b.meta.Compaction.FromStaleSeries() && b.meta.MaxTime > maxt {
 			ok = true
 			maxt = b.meta.MaxTime
 		}
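
As a usage illustration: CompactStaleHead acquires db.cmtx itself, so a caller only needs to invoke it and cannot race the regular compaction path. The sketch below shows one hypothetical way to drive it periodically; runStaleSeriesCompaction, the one-hour interval, and the stop channel are assumptions for illustration, not part of this patch.

package tsdb

import "time"

// runStaleSeriesCompaction is a hypothetical driver, not part of this patch.
// It periodically compacts stale series out of the Head until stopc closes.
func runStaleSeriesCompaction(db *DB, stopc <-chan struct{}) {
	ticker := time.NewTicker(time.Hour) // assumed interval
	defer ticker.Stop()
	for {
		select {
		case <-stopc:
			return
		case <-ticker.C:
			// CompactStaleHead takes db.cmtx, so it never overlaps with
			// head or block compactions triggered elsewhere.
			if err := db.CompactStaleHead(); err != nil {
				db.logger.Error("stale series compaction failed", "err", err)
			}
		}
	}
}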
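
The hint helpers in tsdb/block.go mirror the existing out-of-order pair: SetStaleSeries is idempotent and keeps Hints sorted, and FromStaleSeries reports whether the hint is present. A minimal test sketch (assumed, not part of this patch) of that contract:

package tsdb

import "testing"

// TestSetStaleSeriesIdempotent is an assumed test, not part of this patch.
func TestSetStaleSeriesIdempotent(t *testing.T) {
	var bm BlockMetaCompaction
	bm.SetStaleSeries()
	bm.SetStaleSeries() // the second call must not duplicate the hint
	if !bm.FromStaleSeries() {
		t.Fatal("expected FromStaleSeries to report the hint")
	}
	if got := len(bm.Hints); got != 1 {
		t.Fatalf("expected exactly 1 hint, got %d", got)
	}
}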