Add stale_series_compaction_threshold config file option
Some checks failed
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
parent 4f3de8da29
commit 3e4a094dbb

3 changed files with 48 additions and 1 deletion
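A rough sketch of how the new option might be set in prometheus.yml, assuming it lives under the storage.tsdb block alongside out_of_order_time_window as the TSDBConfig change below suggests (the 0.5 value is purely illustrative):

storage:
  tsdb:
    # Run stale-series compaction as soon as 50% of head series are stale.
    stale_series_compaction_threshold: 0.5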
3 cmd/prometheus/main.go

@@ -692,6 +692,7 @@ func main() {
 		}
 		if cfgFile.StorageConfig.TSDBConfig != nil {
 			cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
+			cfg.tsdb.StaleSeriesCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold
 			if cfgFile.StorageConfig.TSDBConfig.Retention != nil {
 				if cfgFile.StorageConfig.TSDBConfig.Retention.Time > 0 {
 					cfg.tsdb.RetentionDuration = cfgFile.StorageConfig.TSDBConfig.Retention.Time

@@ -1943,6 +1944,7 @@ type tsdbOptions struct {
 	UseUncachedIO                  bool
 	BlockCompactionExcludeFunc     tsdb.BlockExcludeFilterFunc
 	BlockReloadInterval            model.Duration
+	StaleSeriesCompactionThreshold float64
 }
 
 func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {

@@ -1969,6 +1971,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
 		BlockCompactionExcludeFunc:     opts.BlockCompactionExcludeFunc,
 		BlockReloadInterval:            time.Duration(opts.BlockReloadInterval),
 		FeatureRegistry:                features.DefaultRegistry,
+		StaleSeriesCompactionThreshold: opts.StaleSeriesCompactionThreshold,
 	}
 }
4 config/config.go

@@ -1107,6 +1107,10 @@ type TSDBConfig struct {
 	// This should not be used directly and must be converted into OutOfOrderTimeWindow.
 	OutOfOrderTimeWindowFlag model.Duration `yaml:"out_of_order_time_window,omitempty"`
 
+	// StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
+	// the in-memory Head block. If the % of stale series crosses this threshold, stale series compaction is run immediately.
+	StaleSeriesCompactionThreshold float64 `yaml:"stale_series_compaction_threshold,omitempty"`
+
 	Retention *TSDBRetentionConfig `yaml:"retention,omitempty"`
 }
42 tsdb/db.go
@@ -100,6 +100,10 @@ func DefaultOptions() *Options {
 
 // Options of the DB storage.
 type Options struct {
+	// staleSeriesCompactionThreshold is same as below option with same name, but is atomic so that we can do live updates without locks.
+	// This is the one that must be used by the code.
+	staleSeriesCompactionThreshold atomic.Float64
+
 	// Segments (wal files) max size.
 	// WALSegmentSize = 0, segment size is default size.
 	// WALSegmentSize > 0, segment size is WALSegmentSize.

@@ -245,6 +249,10 @@ type Options struct {
 
 	// FeatureRegistry is used to register TSDB features.
 	FeatureRegistry features.Collector
+
+	// StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
+	// the in-memory Head block. If the % of stale series crosses this threshold, stale series compaction is run immediately.
+	StaleSeriesCompactionThreshold float64
 }
 
 type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)

@@ -305,6 +313,10 @@ type DB struct {
 	// out-of-order compaction and vertical queries.
 	oooWasEnabled atomic.Bool
 
+	// lastHeadCompactionTime is the last wall clock time when the head block compaction was started,
+	// irrespective of success or failure. This does not include out-of-order compaction and stale series compaction.
+	lastHeadCompactionTime time.Time
+
 	writeNotified wlog.WriteNotified
 
 	registerer prometheus.Registerer

@@ -857,6 +869,8 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
 		// configured maximum block duration.
 		rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)
 	}
+
+	opts.staleSeriesCompactionThreshold.Store(opts.StaleSeriesCompactionThreshold)
 	return opts, rngs
 }
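The exported StaleSeriesCompactionThreshold field is only a carrier for the configured value: validateOpts copies it into the unexported atomic staleSeriesCompactionThreshold, which is what the run loop and ApplyConfig read and write, so the threshold can be updated live without taking a lock.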
@@ -1151,6 +1165,28 @@ func (db *DB) run(ctx context.Context) {
 			}
 			// We attempt mmapping of head chunks regularly.
 			db.head.mmapHeadChunks()
+
+			numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
+			staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
+			if db.autoCompact && db.opts.staleSeriesCompactionThreshold.Load() > 0 &&
+				staleSeriesRatio >= db.opts.staleSeriesCompactionThreshold.Load() {
+				nextCompactionIsSoon := false
+				if !db.lastHeadCompactionTime.IsZero() {
+					compactionInterval := time.Duration(db.head.chunkRange.Load()) * time.Millisecond
+					nextEstimatedCompactionTime := db.lastHeadCompactionTime.Add(compactionInterval)
+					if time.Now().Add(10 * time.Minute).After(nextEstimatedCompactionTime) {
+						// Next compaction is starting within next 10 mins.
+						nextCompactionIsSoon = true
+					}
+				}
+
+				if !nextCompactionIsSoon {
+					if err := db.CompactStaleHead(); err != nil {
+						db.logger.Error("immediate stale series compaction failed", "err", err)
+					}
+				}
+			}
+
 		case <-db.compactc:
 			db.metrics.compactionsTriggered.Inc()
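As a concrete (hypothetical) illustration: with the threshold set to 0.5 and a head holding 100,000 series of which 60,000 are stale, the ratio is 0.6 and the check fires; the DB then calls CompactStaleHead immediately, unless the next regular head compaction (estimated as the last compaction start time plus one chunk range) is expected to begin within the next 10 minutes, in which case it waits for that compaction instead.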
@@ -1203,7 +1239,7 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
 	oooTimeWindow := int64(0)
 	if conf.StorageConfig.TSDBConfig != nil {
 		oooTimeWindow = conf.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
-
+		db.opts.staleSeriesCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold)
 		// Update retention configuration if provided.
 		if conf.StorageConfig.TSDBConfig.Retention != nil {
 			db.retentionMtx.Lock()

@@ -1217,6 +1253,8 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
 			}
 			db.retentionMtx.Unlock()
 		}
+	} else {
+		db.opts.staleSeriesCompactionThreshold.Store(0)
 	}
 	if oooTimeWindow < 0 {
 		oooTimeWindow = 0
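Because ApplyConfig stores straight into the atomic field, a runtime configuration reload can raise or lower the threshold without a restart, and dropping the storage.tsdb block from the config resets it to 0, which disables the immediate stale-series compaction trigger.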
@@ -1560,6 +1598,8 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
 // compactHead compacts the given RangeHead.
 // The db.cmtx should be held before calling this method.
 func (db *DB) compactHead(head *RangeHead) error {
+	db.lastHeadCompactionTime = time.Now()
+
 	uids, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil)
 	if err != nil {
 		return fmt.Errorf("persist head block: %w", err)
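Setting db.lastHeadCompactionTime at the very start of compactHead, before the write begins and regardless of whether it later fails, means the run loop's estimate of the next scheduled compaction is always based on when the last regular head compaction started.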