mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
tsdb: Add stale series compaction support in the DB
Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
parent
9b444b57af
commit
43e69388df
3 changed files with 66 additions and 1 deletions
|
|
@ -228,6 +228,18 @@ func (bm *BlockMetaCompaction) FromOutOfOrder() bool {
|
|||
return slices.Contains(bm.Hints, CompactionHintFromOutOfOrder)
|
||||
}
|
||||
|
||||
func (bm *BlockMetaCompaction) SetStaleSeries() {
|
||||
if bm.FromStaleSeries() {
|
||||
return
|
||||
}
|
||||
bm.Hints = append(bm.Hints, CompactionHintFromStaleSeries)
|
||||
slices.Sort(bm.Hints)
|
||||
}
|
||||
|
||||
func (bm *BlockMetaCompaction) FromStaleSeries() bool {
|
||||
return slices.Contains(bm.Hints, CompactionHintFromStaleSeries)
|
||||
}
|
||||
|
||||
const (
|
||||
indexFilename = "index"
|
||||
metaFilename = "meta.json"
|
||||
|
|
@ -236,6 +248,10 @@ const (
|
|||
// CompactionHintFromOutOfOrder is a hint noting that the block
|
||||
// was created from out-of-order chunks.
|
||||
CompactionHintFromOutOfOrder = "from-out-of-order"
|
||||
|
||||
// CompactionHintFromStaleSeries is a hint noting that the block
|
||||
// was created from stale series.
|
||||
CompactionHintFromStaleSeries = "from-stale-series"
|
||||
)
|
||||
|
||||
// chunkDir returns the directory inside dir that holds a block's chunk files.
func chunkDir(dir string) string {
	return filepath.Join(dir, "chunks")
}
|
||||
|
|
|
|||
|
|
@ -598,6 +598,9 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, b
|
|||
if base.Compaction.FromOutOfOrder() {
|
||||
meta.Compaction.SetOutOfOrder()
|
||||
}
|
||||
if base.Compaction.FromStaleSeries() {
|
||||
meta.Compaction.SetStaleSeries()
|
||||
}
|
||||
}
|
||||
|
||||
err := c.write(dest, meta, DefaultBlockPopulator{}, b)
|
||||
|
|
|
|||
48
tsdb/db.go
48
tsdb/db.go
|
|
@ -1583,6 +1583,52 @@ func (db *DB) compactHead(head *RangeHead) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// CompactStaleHead compacts the stale series currently in the Head into
// one or more persistent blocks (one per head chunk range), then truncates
// those series from the Head and rebuilds its symbol table.
//
// It takes db.cmtx for the whole operation, so it is mutually exclusive
// with the other compaction entry points on DB.
func (db *DB) CompactStaleHead() error {
	db.cmtx.Lock()
	defer db.cmtx.Unlock()

	db.logger.Info("Starting stale series compaction")
	start := time.Now()

	// We get the stale series reference first because this list can change during the compaction below.
	// It is more efficient and easier to provide an index interface for the stale series when we have a static list.
	staleSeriesRefs, err := db.head.SortedStaleSeriesRefsNoOOOData(context.Background())
	if err != nil {
		return err
	}
	// Shared meta for every block written below; the stale-series hint marks
	// the resulting blocks so other code paths (e.g. inOrderBlocksMaxTime)
	// can recognize them.
	meta := &BlockMeta{}
	meta.Compaction.SetStaleSeries()
	// Align the starting mint down to a ChunkRange boundary; maxt is the
	// current head max time.
	// NOTE(review): the loop start uses db.head.opts.ChunkRange while the
	// step uses db.head.chunkRange.Load() — presumably these are the same
	// value; confirm they cannot diverge.
	mint, maxt := db.head.opts.ChunkRange*(db.head.MinTime()/db.head.opts.ChunkRange), db.head.MaxTime()
	for ; mint < maxt; mint += db.head.chunkRange.Load() {
		// View of the head restricted to the stale series within this range.
		staleHead := NewStaleHead(db.Head(), mint, mint+db.head.chunkRange.Load()-1, staleSeriesRefs)

		uids, err := db.compactor.Write(db.dir, staleHead, staleHead.MinTime(), staleHead.BlockMaxTime(), meta)
		if err != nil {
			return fmt.Errorf("persist stale head: %w", err)
		}

		// NOTE(review): "max_time" here logs the overall loop bound maxt,
		// not this block's upper bound — verify this is intended.
		db.logger.Info("Stale series block created", "ulids", fmt.Sprintf("%v", uids), "min_time", mint, "max_time", maxt)

		if err := db.reloadBlocks(); err != nil {
			// Reload failed: best-effort cleanup of the block(s) we just
			// wrote so they are not picked up half-registered later.
			errs := []error{fmt.Errorf("reloadBlocks blocks: %w", err)}
			for _, uid := range uids {
				if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
					errs = append(errs, fmt.Errorf("delete persisted stale head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll))
				}
			}
			return errors.Join(errs...)
		}
	}

	// All stale data up to maxt is now persisted; drop it from the head.
	if err := db.head.truncateStaleSeries(staleSeriesRefs, maxt); err != nil {
		return fmt.Errorf("head truncate: %w", err)
	}
	// Truncation leaves dangling symbols behind; rebuild the table to
	// release that memory.
	db.head.RebuildSymbolTable(db.logger)

	db.logger.Info("Ending stale series compaction", "num_series", meta.Stats.NumSeries, "duration", time.Since(start))
	return nil
}
|
||||
|
||||
// compactBlocks compacts all the eligible on-disk blocks.
|
||||
// The db.cmtx should be held before calling this method.
|
||||
func (db *DB) compactBlocks() (err error) {
|
||||
|
|
@ -2042,7 +2088,7 @@ func (db *DB) inOrderBlocksMaxTime() (maxt int64, ok bool) {
|
|||
maxt, ok = int64(math.MinInt64), false
|
||||
// If blocks are overlapping, last block might not have the max time. So check all blocks.
|
||||
for _, b := range db.Blocks() {
|
||||
if !b.meta.Compaction.FromOutOfOrder() && b.meta.MaxTime > maxt {
|
||||
if !b.meta.Compaction.FromOutOfOrder() && !b.meta.Compaction.FromStaleSeries() && b.meta.MaxTime > maxt {
|
||||
ok = true
|
||||
maxt = b.meta.MaxTime
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue