diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 06d5540380..e4f15f5cb8 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -692,6 +692,7 @@ func main() {
 	}
 	if cfgFile.StorageConfig.TSDBConfig != nil {
 		cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
+		cfg.tsdb.StaleSeriesCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold
 		if cfgFile.StorageConfig.TSDBConfig.Retention != nil {
 			if cfgFile.StorageConfig.TSDBConfig.Retention.Time > 0 {
 				cfg.tsdb.RetentionDuration = cfgFile.StorageConfig.TSDBConfig.Retention.Time
@@ -1943,6 +1944,7 @@ type tsdbOptions struct {
 	UseUncachedIO                  bool
 	BlockCompactionExcludeFunc     tsdb.BlockExcludeFilterFunc
 	BlockReloadInterval            model.Duration
+	StaleSeriesCompactionThreshold float64
 }
 
 func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
@@ -1969,6 +1971,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
 		BlockCompactionExcludeFunc:     opts.BlockCompactionExcludeFunc,
 		BlockReloadInterval:            time.Duration(opts.BlockReloadInterval),
 		FeatureRegistry:                features.DefaultRegistry,
+		StaleSeriesCompactionThreshold: opts.StaleSeriesCompactionThreshold,
 	}
 }
diff --git a/config/config.go b/config/config.go
index 0b9b059ab2..d721d7fb86 100644
--- a/config/config.go
+++ b/config/config.go
@@ -1107,6 +1107,10 @@ type TSDBConfig struct {
 	// This should not be used directly and must be converted into OutOfOrderTimeWindow.
 	OutOfOrderTimeWindowFlag model.Duration `yaml:"out_of_order_time_window,omitempty"`
 
+	// StaleSeriesCompactionThreshold is a number between 0.0 and 1.0 indicating the fraction of stale series in
+	// the in-memory Head block. If the fraction of stale series crosses this threshold, a stale series compaction is run immediately.
+	StaleSeriesCompactionThreshold float64 `yaml:"stale_series_compaction_threshold,omitempty"`
+
 	Retention *TSDBRetentionConfig `yaml:"retention,omitempty"`
 }
 
diff --git a/tsdb/db.go b/tsdb/db.go
index e3b00a2d11..1dd524a76a 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -100,6 +100,10 @@ func DefaultOptions() *Options {
 
 // Options of the DB storage.
 type Options struct {
+	// staleSeriesCompactionThreshold mirrors the exported StaleSeriesCompactionThreshold option below, but is
+	// atomic so that it can be updated live without locks. The code must read the threshold from this field.
+	staleSeriesCompactionThreshold atomic.Float64
+
 	// Segments (wal files) max size.
 	// WALSegmentSize = 0, segment size is default size.
 	// WALSegmentSize > 0, segment size is WALSegmentSize.
@@ -245,6 +249,10 @@ type Options struct {
 
 	// FeatureRegistry is used to register TSDB features.
 	FeatureRegistry features.Collector
+
+	// StaleSeriesCompactionThreshold is a number between 0.0 and 1.0 indicating the fraction of stale series in
+	// the in-memory Head block. If the fraction of stale series crosses this threshold, a stale series compaction is run immediately.
+	StaleSeriesCompactionThreshold float64
 }
 
 type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
@@ -305,6 +313,10 @@ type DB struct {
 	// out-of-order compaction and vertical queries.
 	oooWasEnabled atomic.Bool
 
+	// lastHeadCompactionTime is the wall clock time at which the last head block compaction started,
+	// irrespective of success or failure. It does not cover out-of-order or stale series compactions.
+	lastHeadCompactionTime time.Time
+
 	writeNotified wlog.WriteNotified
 
 	registerer prometheus.Registerer
@@ -857,6 +869,8 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
 		// configured maximum block duration.
 		rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)
 	}
+
+	opts.staleSeriesCompactionThreshold.Store(opts.StaleSeriesCompactionThreshold)
 	return opts, rngs
 }
 
@@ -1151,6 +1165,28 @@ func (db *DB) run(ctx context.Context) {
 			}
 			// We attempt mmapping of head chunks regularly.
 			db.head.mmapHeadChunks()
+
+			numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
+			staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
+			if db.autoCompact && db.opts.staleSeriesCompactionThreshold.Load() > 0 &&
+				staleSeriesRatio >= db.opts.staleSeriesCompactionThreshold.Load() {
+				nextCompactionIsSoon := false
+				if !db.lastHeadCompactionTime.IsZero() {
+					compactionInterval := time.Duration(db.head.chunkRange.Load()) * time.Millisecond
+					nextEstimatedCompactionTime := db.lastHeadCompactionTime.Add(compactionInterval)
+					if time.Now().Add(10 * time.Minute).After(nextEstimatedCompactionTime) {
+						// The next regular head compaction is due within the next 10 minutes.
+						nextCompactionIsSoon = true
+					}
+				}
+
+				if !nextCompactionIsSoon {
+					if err := db.CompactStaleHead(); err != nil {
+						db.logger.Error("immediate stale series compaction failed", "err", err)
+					}
+				}
+			}
+
 		case <-db.compactc:
 			db.metrics.compactionsTriggered.Inc()
 
@@ -1203,7 +1239,7 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
 	oooTimeWindow := int64(0)
 	if conf.StorageConfig.TSDBConfig != nil {
 		oooTimeWindow = conf.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
-
+		db.opts.staleSeriesCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold)
 		// Update retention configuration if provided.
 		if conf.StorageConfig.TSDBConfig.Retention != nil {
 			db.retentionMtx.Lock()
@@ -1217,6 +1253,8 @@
 			}
 			db.retentionMtx.Unlock()
 		}
+	} else {
+		db.opts.staleSeriesCompactionThreshold.Store(0)
 	}
 	if oooTimeWindow < 0 {
 		oooTimeWindow = 0
@@ -1560,6 +1598,8 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
 // compactHead compacts the given RangeHead.
 // The db.cmtx should be held before calling this method.
 func (db *DB) compactHead(head *RangeHead) error {
+	db.lastHeadCompactionTime = time.Now()
+
 	uids, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil)
 	if err != nil {
 		return fmt.Errorf("persist head block: %w", err)
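
Besides `prometheus.yml` (`storage.tsdb.stale_series_compaction_threshold`), the new option is reachable by anyone embedding TSDB: set the exported field on `tsdb.Options` before `Open`, and `validateOpts` copies it into the internal atomic field. A minimal sketch under those assumptions; the 0.5 threshold and the `data` directory are illustrative choices, not defaults introduced by this change:

```go
package main

import (
	"log/slog"
	"os"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	opts := tsdb.DefaultOptions()
	// Run a stale series compaction as soon as at least half of the series
	// in the Head are stale; 0 (the zero value) leaves the feature disabled.
	opts.StaleSeriesCompactionThreshold = 0.5

	db, err := tsdb.Open("data", logger, prometheus.NewRegistry(), opts, tsdb.NewDBStats())
	if err != nil {
		logger.Error("opening tsdb", "err", err)
		os.Exit(1)
	}
	defer db.Close()
}
```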
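The throttling decision in `run()` is easy to lose inside the select loop, so here it is restated as a standalone function with worked examples: the head's chunk range (2h by default) stands in for the regular head compaction interval, and an immediate stale series compaction is skipped when the next regular compaction is expected within 10 minutes anyway. The helper name and the numbers below are illustrative only, not part of this change:

```go
package main

import (
	"fmt"
	"time"
)

// nextCompactionIsSoon mirrors the check added to (*DB).run: given the wall
// clock time at which the last head compaction started and the head's chunk
// range in milliseconds, report whether the next regular compaction is
// expected within the next 10 minutes.
func nextCompactionIsSoon(lastHeadCompaction time.Time, chunkRangeMillis int64, now time.Time) bool {
	if lastHeadCompaction.IsZero() {
		// No head compaction has started yet, so there is nothing to wait for.
		return false
	}
	compactionInterval := time.Duration(chunkRangeMillis) * time.Millisecond
	nextEstimated := lastHeadCompaction.Add(compactionInterval)
	return now.Add(10 * time.Minute).After(nextEstimated)
}

func main() {
	now := time.Now()
	chunkRange := int64(2 * time.Hour / time.Millisecond) // default head chunk range

	// Last compaction started 1h55m ago: the next one is due in ~5m,
	// so an immediate stale series compaction would be skipped.
	fmt.Println(nextCompactionIsSoon(now.Add(-115*time.Minute), chunkRange, now)) // true

	// Last compaction started 1h ago: the next one is ~1h away,
	// so stale series would be compacted right away.
	fmt.Println(nextCompactionIsSoon(now.Add(-time.Hour), chunkRange, now)) // false
}
```

Skipping inside that 10-minute window avoids paying for a stale-only compaction moments before the regular head compaction would have truncated those series anyway.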
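`ApplyConfig` makes the threshold hot-reloadable: every reload stores the new value into the atomic field, and removing the `storage`/`tsdb` block resets it to 0, i.e. disabled. A sketch of both paths against an open `*tsdb.DB`; the helpers are hypothetical, and a real reload passes the full parsed configuration rather than a minimal literal like this:

```go
package example

import (
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/tsdb"
)

// setStaleSeriesThreshold is a hypothetical helper: pushing a config through
// ApplyConfig stores the new threshold into the atomic field that run()
// reads, so the change takes effect on the next tick without a restart.
func setStaleSeriesThreshold(db *tsdb.DB, threshold float64) error {
	return db.ApplyConfig(&config.Config{
		StorageConfig: config.StorageConfig{
			TSDBConfig: &config.TSDBConfig{
				StaleSeriesCompactionThreshold: threshold,
			},
		},
	})
}

// disableStaleSeriesCompaction exercises the new else branch: a config
// without a storage/tsdb block resets the threshold to 0 (disabled).
func disableStaleSeriesCompaction(db *tsdb.DB) error {
	return db.ApplyConfig(&config.Config{})
}
```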