mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
Merge branch 'main' into codesome/stale-series-compaction
This commit is contained in:
commit
e5d7d2b1b1
18 changed files with 282 additions and 125 deletions
|
|
@ -876,7 +876,7 @@ func main() {
|
|||
&cfg.scrape,
|
||||
logger.With("component", "scrape manager"),
|
||||
logging.NewJSONFileLogger,
|
||||
fanoutStorage,
|
||||
fanoutStorage, nil, // TODO(bwplotka): Switch to AppendableV2.
|
||||
prometheus.DefaultRegisterer,
|
||||
)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -734,7 +734,6 @@ func TestTSDBDumpCommand(t *testing.T) {
|
|||
load 1m
|
||||
metric{foo="bar"} 1 2 3
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
for _, c := range []struct {
|
||||
name string
|
||||
|
|
|
|||
|
|
@ -97,7 +97,6 @@ func TestTSDBDump(t *testing.T) {
|
|||
heavy_metric{foo="bar"} 5 4 3 2 1
|
||||
heavy_metric{foo="foo"} 5 4 3 2 1
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
@ -196,7 +195,6 @@ func TestTSDBDumpOpenMetrics(t *testing.T) {
|
|||
my_counter{foo="bar", baz="abc"} 1 2 3 4 5
|
||||
my_gauge{bar="foo", abc="baz"} 9 8 0 4 7
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
|
|||
|
|
@ -338,7 +338,7 @@ func BenchmarkRangeQuery(b *testing.B) {
|
|||
})
|
||||
stor := teststorage.New(b)
|
||||
stor.DisableCompactions() // Don't want auto-compaction disrupting timings.
|
||||
defer stor.Close()
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
|
@ -383,7 +383,6 @@ func BenchmarkRangeQuery(b *testing.B) {
|
|||
func BenchmarkJoinQuery(b *testing.B) {
|
||||
stor := teststorage.New(b)
|
||||
stor.DisableCompactions() // Don't want auto-compaction disrupting timings.
|
||||
defer stor.Close()
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
|
|
@ -445,7 +444,6 @@ func BenchmarkJoinQuery(b *testing.B) {
|
|||
|
||||
func BenchmarkNativeHistograms(b *testing.B) {
|
||||
testStorage := teststorage.New(b)
|
||||
defer testStorage.Close()
|
||||
|
||||
app := testStorage.Appender(context.TODO())
|
||||
if err := generateNativeHistogramSeries(app, 3000); err != nil {
|
||||
|
|
@ -523,7 +521,6 @@ func BenchmarkNativeHistograms(b *testing.B) {
|
|||
|
||||
func BenchmarkNativeHistogramsCustomBuckets(b *testing.B) {
|
||||
testStorage := teststorage.New(b)
|
||||
defer testStorage.Close()
|
||||
|
||||
app := testStorage.Appender(context.TODO())
|
||||
if err := generateNativeHistogramCustomBucketsSeries(app, 3000); err != nil {
|
||||
|
|
@ -594,7 +591,6 @@ func BenchmarkNativeHistogramsCustomBuckets(b *testing.B) {
|
|||
func BenchmarkInfoFunction(b *testing.B) {
|
||||
// Initialize test storage and generate test series data.
|
||||
testStorage := teststorage.New(b)
|
||||
defer testStorage.Close()
|
||||
|
||||
start := time.Unix(0, 0)
|
||||
end := start.Add(2 * time.Hour)
|
||||
|
|
|
|||
|
|
@ -676,7 +676,6 @@ func TestEngineEvalStmtTimestamps(t *testing.T) {
|
|||
load 10s
|
||||
metric 1 2
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
cases := []struct {
|
||||
Query string
|
||||
|
|
@ -789,7 +788,6 @@ load 10s
|
|||
metricWith3SampleEvery10Seconds{a="3",b="2"} 1+1x100
|
||||
metricWith1HistogramEvery10Seconds {{schema:1 count:5 sum:20 buckets:[1 2 1 1]}}+{{schema:1 count:10 sum:5 buckets:[1 2 3 4]}}x100
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
cases := []struct {
|
||||
Query string
|
||||
|
|
@ -1339,7 +1337,6 @@ load 10s
|
|||
bigmetric{a="1"} 1+1x100
|
||||
bigmetric{a="2"} 1+1x100
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
// These test cases should be touching the limit exactly (hence no exceeding).
|
||||
// Exceeding the limit will be tested by doing -1 to the MaxSamples.
|
||||
|
|
@ -1523,7 +1520,6 @@ func TestExtendedRangeSelectors(t *testing.T) {
|
|||
withreset 1+1x4 1+1x5
|
||||
notregular 0 5 100 2 8
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
tc := []struct {
|
||||
query string
|
||||
|
|
@ -1677,7 +1673,6 @@ load 10s
|
|||
load 1ms
|
||||
metric_ms 0+1x10000
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
lbls1 := labels.FromStrings("__name__", "metric", "job", "1")
|
||||
lbls2 := labels.FromStrings("__name__", "metric", "job", "2")
|
||||
|
|
@ -2283,7 +2278,6 @@ func TestSubquerySelector(t *testing.T) {
|
|||
t.Run("", func(t *testing.T) {
|
||||
engine := newTestEngine(t)
|
||||
storage := promqltest.LoadedStorage(t, tst.loadString)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
for _, c := range tst.cases {
|
||||
t.Run(c.Query, func(t *testing.T) {
|
||||
|
|
@ -3410,7 +3404,6 @@ metric 0 1 2
|
|||
t.Run(c.name, func(t *testing.T) {
|
||||
engine := promqltest.NewTestEngine(t, false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery)
|
||||
storage := promqltest.LoadedStorage(t, load)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
opts := promql.NewPrometheusQueryOpts(false, c.queryLookback)
|
||||
qry, err := engine.NewInstantQuery(context.Background(), storage, opts, query, c.ts)
|
||||
|
|
@ -3444,7 +3437,7 @@ func TestHistogramCopyFromIteratorRegression(t *testing.T) {
|
|||
histogram {{sum:4 count:4 buckets:[2 2]}} {{sum:6 count:6 buckets:[3 3]}} {{sum:1 count:1 buckets:[1]}}
|
||||
`
|
||||
storage := promqltest.LoadedStorage(t, load)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
engine := promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery)
|
||||
|
||||
verify := func(t *testing.T, qry promql.Query, expected []histogram.FloatHistogram) {
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ func TestDeriv(t *testing.T) {
|
|||
// This requires more precision than the usual test system offers,
|
||||
// so we test it by hand.
|
||||
storage := teststorage.New(t)
|
||||
defer storage.Close()
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ func TestEvaluations(t *testing.T) {
|
|||
// Run a lot of queries at the same time, to check for race conditions.
|
||||
func TestConcurrentRangeQueries(t *testing.T) {
|
||||
stor := teststorage.New(t)
|
||||
defer stor.Close()
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
|
|
|||
|
|
@ -158,7 +158,6 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
|
|||
load 1m
|
||||
http_requests{job="app-server", instance="0"} 75 85 70 70 stale
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
expr, err := parser.ParseExpr(`http_requests < 100`)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -264,7 +263,6 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
|
|||
load 1m
|
||||
http_requests{job="app-server", instance="0"} 75 85 70 70
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
expr, err := parser.ParseExpr(`http_requests < 100`)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -359,7 +357,6 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
|
|||
load 1m
|
||||
http_requests{job="app-server", instance="0"} 75 85 70 70
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
expr, err := parser.ParseExpr(`http_requests < 100`)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -454,7 +451,6 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {
|
|||
load 1m
|
||||
http_requests{job="app-server", instance="0"} 75 85 70 70
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
expr, err := parser.ParseExpr(`http_requests < 100`)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -510,7 +506,6 @@ func TestAlertingRuleQueryInTemplate(t *testing.T) {
|
|||
load 1m
|
||||
http_requests{job="app-server", instance="0"} 70 85 70 70
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
expr, err := parser.ParseExpr(`sum(http_requests) < 100`)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -584,7 +579,6 @@ func BenchmarkAlertingRuleAtomicField(b *testing.B) {
|
|||
|
||||
func TestAlertingRuleDuplicate(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
defer storage.Close()
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
|
|
@ -621,7 +615,6 @@ func TestAlertingRuleLimit(t *testing.T) {
|
|||
metric{label="1"} 1
|
||||
metric{label="2"} 1
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
tests := []struct {
|
||||
limit int
|
||||
|
|
@ -805,7 +798,6 @@ func TestKeepFiringFor(t *testing.T) {
|
|||
load 1m
|
||||
http_requests{job="app-server", instance="0"} 75 85 70 70 10x5
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
expr, err := parser.ParseExpr(`http_requests > 50`)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -916,7 +908,6 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
|
|||
load 1m
|
||||
http_requests{job="app-server", instance="0"} 75 10x10
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
expr, err := parser.ParseExpr(`http_requests > 50`)
|
||||
require.NoError(t, err)
|
||||
|
|
|
|||
|
|
@ -62,7 +62,6 @@ func TestAlertingRule(t *testing.T) {
|
|||
http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 95 105 105 95 85
|
||||
http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 80 90 100 110 120 130 140
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -205,7 +204,6 @@ func TestForStateAddSamples(t *testing.T) {
|
|||
http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 95 105 105 95 85
|
||||
http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 80 90 100 110 120 130 140
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -367,7 +365,6 @@ func TestForStateRestore(t *testing.T) {
|
|||
http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 50 0 0 25 0 0 40 0 120
|
||||
http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 125 90 60 0 0 25 0 0 40 0 130
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -538,7 +535,7 @@ func TestForStateRestore(t *testing.T) {
|
|||
func TestStaleness(t *testing.T) {
|
||||
for _, queryOffset := range []time.Duration{0, time.Minute} {
|
||||
st := teststorage.New(t)
|
||||
defer st.Close()
|
||||
|
||||
engineOpts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
|
@ -726,7 +723,7 @@ func TestCopyState(t *testing.T) {
|
|||
|
||||
func TestDeletedRuleMarkedStale(t *testing.T) {
|
||||
st := teststorage.New(t)
|
||||
defer st.Close()
|
||||
|
||||
oldGroup := &Group{
|
||||
rules: []Rule{
|
||||
NewRecordingRule("rule1", nil, labels.FromStrings("l1", "v1")),
|
||||
|
|
@ -772,7 +769,7 @@ func TestUpdate(t *testing.T) {
|
|||
"test": labels.FromStrings("name", "value"),
|
||||
}
|
||||
st := teststorage.New(t)
|
||||
defer st.Close()
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
|
@ -910,7 +907,7 @@ func reloadAndValidate(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File,
|
|||
|
||||
func TestNotify(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
defer storage.Close()
|
||||
|
||||
engineOpts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
|
@ -984,7 +981,7 @@ func TestMetricsUpdate(t *testing.T) {
|
|||
}
|
||||
|
||||
storage := teststorage.New(t)
|
||||
defer storage.Close()
|
||||
|
||||
registry := prometheus.NewRegistry()
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
|
|
@ -1057,7 +1054,7 @@ func TestGroupStalenessOnRemoval(t *testing.T) {
|
|||
sameFiles := []string{"fixtures/rules2_copy.yaml"}
|
||||
|
||||
storage := teststorage.New(t)
|
||||
defer storage.Close()
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
|
@ -1135,7 +1132,7 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) {
|
|||
files := []string{"fixtures/rules2.yaml"}
|
||||
|
||||
storage := teststorage.New(t)
|
||||
defer storage.Close()
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
|
@ -1205,7 +1202,7 @@ func TestRuleMovedBetweenGroups(t *testing.T) {
|
|||
storage := teststorage.New(t, func(opt *tsdb.Options) {
|
||||
opt.OutOfOrderTimeWindow = 600000
|
||||
})
|
||||
defer storage.Close()
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
|
@ -1287,7 +1284,7 @@ func TestGroupHasAlertingRules(t *testing.T) {
|
|||
|
||||
func TestRuleHealthUpdates(t *testing.T) {
|
||||
st := teststorage.New(t)
|
||||
defer st.Close()
|
||||
|
||||
engineOpts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
|
@ -1348,7 +1345,6 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) {
|
|||
load 5m
|
||||
http_requests{instance="0"} 75 85 50 0 0 25 0 0 40 0 120
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -1463,7 +1459,6 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) {
|
|||
|
||||
func TestNativeHistogramsInRecordingRules(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
// Add some histograms.
|
||||
db := storage.DB
|
||||
|
|
@ -1525,9 +1520,6 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
|
|||
|
||||
func TestManager_LoadGroups_ShouldCheckWhetherEachRuleHasDependentsAndDependencies(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, storage.Close())
|
||||
})
|
||||
|
||||
ruleManager := NewManager(&ManagerOptions{
|
||||
Context: context.Background(),
|
||||
|
|
@ -2021,7 +2013,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
t.Run("synchronous evaluation with independent rules", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
inflightQueries := atomic.Int32{}
|
||||
maxInflight := atomic.Int32{}
|
||||
|
||||
|
|
@ -2060,7 +2052,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
inflightQueries := atomic.Int32{}
|
||||
maxInflight := atomic.Int32{}
|
||||
|
||||
|
|
@ -2099,7 +2091,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
inflightQueries := atomic.Int32{}
|
||||
maxInflight := atomic.Int32{}
|
||||
|
||||
|
|
@ -2144,7 +2136,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
inflightQueries := atomic.Int32{}
|
||||
maxInflight := atomic.Int32{}
|
||||
|
||||
|
|
@ -2192,7 +2184,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
inflightQueries := atomic.Int32{}
|
||||
maxInflight := atomic.Int32{}
|
||||
|
||||
|
|
@ -2231,7 +2223,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
inflightQueries := atomic.Int32{}
|
||||
maxInflight := atomic.Int32{}
|
||||
|
||||
|
|
@ -2277,7 +2269,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
t.Run("attempted asynchronous evaluation of chained rules", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
inflightQueries := atomic.Int32{}
|
||||
maxInflight := atomic.Int32{}
|
||||
|
||||
|
|
@ -2325,7 +2317,7 @@ func TestAsyncRuleEvaluation(t *testing.T) {
|
|||
func TestNewRuleGroupRestoration(t *testing.T) {
|
||||
t.Parallel()
|
||||
store := teststorage.New(t)
|
||||
t.Cleanup(func() { store.Close() })
|
||||
|
||||
var (
|
||||
inflightQueries atomic.Int32
|
||||
maxInflight atomic.Int32
|
||||
|
|
@ -2389,7 +2381,7 @@ func TestNewRuleGroupRestoration(t *testing.T) {
|
|||
func TestNewRuleGroupRestorationWithRestoreNewGroupOption(t *testing.T) {
|
||||
t.Parallel()
|
||||
store := teststorage.New(t)
|
||||
t.Cleanup(func() { store.Close() })
|
||||
|
||||
var (
|
||||
inflightQueries atomic.Int32
|
||||
maxInflight atomic.Int32
|
||||
|
|
@ -2459,7 +2451,6 @@ func TestNewRuleGroupRestorationWithRestoreNewGroupOption(t *testing.T) {
|
|||
|
||||
func TestBoundedRuleEvalConcurrency(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
var (
|
||||
inflightQueries atomic.Int32
|
||||
|
|
@ -2514,7 +2505,6 @@ func TestUpdateWhenStopped(t *testing.T) {
|
|||
|
||||
func TestGroup_Eval_RaceConditionOnStoppingGroupEvaluationWhileRulesAreEvaluatedConcurrently(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
var (
|
||||
inflightQueries atomic.Int32
|
||||
|
|
@ -2733,7 +2723,6 @@ func TestRuleDependencyController_AnalyseRules(t *testing.T) {
|
|||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
ruleManager := NewManager(&ManagerOptions{
|
||||
Context: context.Background(),
|
||||
|
|
@ -2762,7 +2751,6 @@ func TestRuleDependencyController_AnalyseRules(t *testing.T) {
|
|||
|
||||
func BenchmarkRuleDependencyController_AnalyseRules(b *testing.B) {
|
||||
storage := teststorage.New(b)
|
||||
b.Cleanup(func() { storage.Close() })
|
||||
|
||||
ruleManager := NewManager(&ManagerOptions{
|
||||
Context: context.Background(),
|
||||
|
|
|
|||
|
|
@ -121,7 +121,6 @@ func setUpRuleEvalTest(t testing.TB) *teststorage.TestStorage {
|
|||
|
||||
func TestRuleEval(t *testing.T) {
|
||||
storage := setUpRuleEvalTest(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
ng := testEngine(t)
|
||||
for _, scenario := range ruleEvalTestScenarios {
|
||||
|
|
@ -158,7 +157,6 @@ func BenchmarkRuleEval(b *testing.B) {
|
|||
// TestRuleEvalDuplicate tests for duplicate labels in recorded metrics, see #5529.
|
||||
func TestRuleEvalDuplicate(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
defer storage.Close()
|
||||
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
|
|
@ -185,7 +183,6 @@ func TestRecordingRuleLimit(t *testing.T) {
|
|||
metric{label="1"} 1
|
||||
metric{label="2"} 1
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
tests := []struct {
|
||||
limit int
|
||||
|
|
|
|||
|
|
@ -39,14 +39,32 @@ import (
|
|||
"github.com/prometheus/prometheus/util/pool"
|
||||
)
|
||||
|
||||
// NewManager is the Manager constructor using Appendable.
|
||||
func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), appendable storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
|
||||
// NewManager is the Manager constructor using storage.Appendable or storage.AppendableV2.
|
||||
//
|
||||
// If unsure which one to use/implement, implement AppendableV2 as it significantly simplifies implementation and allows more
|
||||
// (passing ST, always-on metadata, exemplars per sample).
|
||||
//
|
||||
// NewManager returns error if both appendable and appendableV2 are specified.
|
||||
//
|
||||
// Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// storage.Appendable will be removed soon (ETA: Q2 2026).
|
||||
func NewManager(
|
||||
o *Options,
|
||||
logger *slog.Logger,
|
||||
newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error),
|
||||
appendable storage.Appendable,
|
||||
appendableV2 storage.AppendableV2,
|
||||
registerer prometheus.Registerer,
|
||||
) (*Manager, error) {
|
||||
if o == nil {
|
||||
o = &Options{}
|
||||
}
|
||||
if logger == nil {
|
||||
logger = promslog.NewNopLogger()
|
||||
}
|
||||
if appendable != nil && appendableV2 != nil {
|
||||
return nil, errors.New("scrape.NewManager: appendable and appendableV2 cannot be provided at the same time")
|
||||
}
|
||||
|
||||
sm, err := newScrapeMetrics(registerer)
|
||||
if err != nil {
|
||||
|
|
@ -55,6 +73,7 @@ func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(str
|
|||
|
||||
m := &Manager{
|
||||
appendable: appendable,
|
||||
appendableV2: appendableV2,
|
||||
opts: o,
|
||||
logger: logger,
|
||||
newScrapeFailureLogger: newScrapeFailureLogger,
|
||||
|
|
|
|||
|
|
@ -522,7 +522,7 @@ scrape_configs:
|
|||
)
|
||||
|
||||
opts := Options{}
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry)
|
||||
require.NoError(t, err)
|
||||
newLoop := func(scrapeLoopOptions) loop {
|
||||
ch <- struct{}{}
|
||||
|
|
@ -578,7 +578,7 @@ scrape_configs:
|
|||
func TestManagerTargetsUpdates(t *testing.T) {
|
||||
opts := Options{}
|
||||
testRegistry := prometheus.NewRegistry()
|
||||
m, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
||||
m, err := NewManager(&opts, nil, nil, nil, nil, testRegistry)
|
||||
require.NoError(t, err)
|
||||
|
||||
ts := make(chan map[string][]*targetgroup.Group)
|
||||
|
|
@ -631,7 +631,7 @@ global:
|
|||
|
||||
opts := Options{}
|
||||
testRegistry := prometheus.NewRegistry()
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Load the first config.
|
||||
|
|
@ -701,7 +701,7 @@ scrape_configs:
|
|||
}
|
||||
|
||||
opts := Options{}
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry)
|
||||
scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry)
|
||||
require.NoError(t, err)
|
||||
|
||||
reload(scrapeManager, cfg1)
|
||||
|
|
@ -735,6 +735,8 @@ func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server
|
|||
}
|
||||
|
||||
// TestManagerSTZeroIngestion tests scrape manager for various ST cases.
|
||||
// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's
|
||||
// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample.
|
||||
func TestManagerSTZeroIngestion(t *testing.T) {
|
||||
t.Parallel()
|
||||
const (
|
||||
|
|
@ -766,7 +768,7 @@ func TestManagerSTZeroIngestion(t *testing.T) {
|
|||
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
|
||||
EnableStartTimestampZeroIngestion: testSTZeroIngest,
|
||||
skipOffsetting: true,
|
||||
}, app)
|
||||
}, app, nil)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
server := setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded)
|
||||
|
|
@ -905,6 +907,8 @@ func generateTestHistogram(i int) *dto.Histogram {
|
|||
return h
|
||||
}
|
||||
|
||||
// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's
|
||||
// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample.
|
||||
func TestManagerSTZeroIngestionHistogram(t *testing.T) {
|
||||
t.Parallel()
|
||||
const mName = "expected_histogram"
|
||||
|
|
@ -950,7 +954,7 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) {
|
|||
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
|
||||
EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion,
|
||||
skipOffsetting: true,
|
||||
}, app)
|
||||
}, app, nil)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
once := sync.Once{}
|
||||
|
|
@ -1030,7 +1034,7 @@ func TestUnregisterMetrics(t *testing.T) {
|
|||
// Check that all metrics can be unregistered, allowing a second manager to be created.
|
||||
for range 2 {
|
||||
opts := Options{}
|
||||
manager, err := NewManager(&opts, nil, nil, nil, reg)
|
||||
manager, err := NewManager(&opts, nil, nil, nil, nil, reg)
|
||||
require.NotNil(t, manager)
|
||||
require.NoError(t, err)
|
||||
// Unregister all metrics.
|
||||
|
|
@ -1043,6 +1047,9 @@ func TestUnregisterMetrics(t *testing.T) {
|
|||
// This test addresses issue #17216 by ensuring the previously blocking check has been removed.
|
||||
// The test verifies that the presence of exemplars in the input does not cause errors,
|
||||
// although exemplars are not preserved during NHCB conversion (as documented below).
|
||||
//
|
||||
// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's
|
||||
// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample.
|
||||
func TestNHCBAndSTZeroIngestion(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
@ -1059,7 +1066,7 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) {
|
|||
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
|
||||
EnableStartTimestampZeroIngestion: true,
|
||||
skipOffsetting: true,
|
||||
}, app)
|
||||
}, app, nil)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
once := sync.Once{}
|
||||
|
|
@ -1153,16 +1160,13 @@ func applyConfig(
|
|||
require.NoError(t, discoveryManager.ApplyConfig(c))
|
||||
}
|
||||
|
||||
func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.Appendable) (*discovery.Manager, *Manager) {
|
||||
func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.Appendable, appV2 storage.AppendableV2) (*discovery.Manager, *Manager) {
|
||||
t.Helper()
|
||||
|
||||
if opts == nil {
|
||||
opts = &Options{}
|
||||
}
|
||||
opts.DiscoveryReloadInterval = model.Duration(100 * time.Millisecond)
|
||||
if app == nil {
|
||||
app = teststorage.NewAppendable()
|
||||
}
|
||||
|
||||
reg := prometheus.NewRegistry()
|
||||
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
|
||||
|
|
@ -1178,7 +1182,7 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A
|
|||
opts,
|
||||
nil,
|
||||
nil,
|
||||
app,
|
||||
app, appV2,
|
||||
prometheus.NewRegistry(),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
|
@ -1251,7 +1255,7 @@ scrape_configs:
|
|||
- files: ['%s']
|
||||
`
|
||||
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
applyConfig(
|
||||
|
|
@ -1350,7 +1354,7 @@ scrape_configs:
|
|||
file_sd_configs:
|
||||
- files: ['%s', '%s']
|
||||
`
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
applyConfig(
|
||||
|
|
@ -1409,7 +1413,7 @@ scrape_configs:
|
|||
file_sd_configs:
|
||||
- files: ['%s']
|
||||
`
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
applyConfig(
|
||||
|
|
@ -1475,7 +1479,7 @@ scrape_configs:
|
|||
- targets: ['%s']
|
||||
`
|
||||
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil)
|
||||
discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil)
|
||||
defer scrapeManager.Stop()
|
||||
|
||||
// Apply the initial config with an existing file
|
||||
|
|
@ -1559,7 +1563,7 @@ scrape_configs:
|
|||
|
||||
cfg := loadConfiguration(t, cfgText)
|
||||
|
||||
m, err := NewManager(&Options{}, nil, nil, teststorage.NewAppendable(), prometheus.NewRegistry())
|
||||
m, err := NewManager(&Options{}, nil, nil, nil, nil, prometheus.NewRegistry())
|
||||
require.NoError(t, err)
|
||||
defer m.Stop()
|
||||
require.NoError(t, m.ApplyConfig(cfg))
|
||||
|
|
|
|||
|
|
@ -131,7 +131,6 @@ func testStorageHandlesOutOfOrderTimestamps(t *testing.T, appV2 bool) {
|
|||
// Test with default OutOfOrderTimeWindow (0)
|
||||
t.Run("Out-Of-Order Sample Disabled", func(t *testing.T) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
runScrapeLoopTest(t, appV2, s, false)
|
||||
})
|
||||
|
||||
|
|
@ -140,7 +139,6 @@ func testStorageHandlesOutOfOrderTimestamps(t *testing.T, appV2 bool) {
|
|||
s := teststorage.New(t, func(opt *tsdb.Options) {
|
||||
opt.OutOfOrderTimeWindow = 600000
|
||||
})
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
runScrapeLoopTest(t, appV2, s, true)
|
||||
})
|
||||
|
|
@ -1610,7 +1608,6 @@ func benchScrapeLoopAppend(
|
|||
opt.MaxExemplars = 1e5
|
||||
}
|
||||
})
|
||||
b.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
sl, _ := newTestScrapeLoop(b, withAppendable(s, appV2), func(sl *scrapeLoop) {
|
||||
sl.appendMetadataToWAL = appendMetadataToWAL
|
||||
|
|
@ -1697,7 +1694,6 @@ func BenchmarkScrapeLoopScrapeAndReport(b *testing.B) {
|
|||
parsableText := readTextParseTestMetrics(b)
|
||||
|
||||
s := teststorage.New(b)
|
||||
b.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
sl, scraper := newTestScrapeLoop(b, withAppendable(s, appV2), func(sl *scrapeLoop) {
|
||||
sl.fallbackScrapeProtocol = "application/openmetrics-text"
|
||||
|
|
@ -1730,7 +1726,6 @@ func testSetOptionsHandlingStaleness(t *testing.T, appV2 bool) {
|
|||
s := teststorage.New(t, func(opt *tsdb.Options) {
|
||||
opt.OutOfOrderTimeWindow = 600000
|
||||
})
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
signal := make(chan struct{}, 1)
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
|
|
@ -2001,7 +1996,6 @@ func TestScrapeLoopCache(t *testing.T) {
|
|||
|
||||
func testScrapeLoopCache(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
signal := make(chan struct{}, 1)
|
||||
|
||||
|
|
@ -2071,7 +2065,6 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
|
|||
|
||||
func testScrapeLoopCacheMemoryExhaustionProtection(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
signal := make(chan struct{}, 1)
|
||||
|
||||
|
|
@ -3881,7 +3874,6 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
|
|||
|
||||
func testScrapeLoopRespectTimestamps(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
appTest := teststorage.NewAppendable().Then(s)
|
||||
sl, _ := newTestScrapeLoop(t, withAppendable(appTest, appV2))
|
||||
|
|
@ -3910,7 +3902,6 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
|
|||
|
||||
func testScrapeLoopDiscardTimestamps(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
appTest := teststorage.NewAppendable().Then(s)
|
||||
sl, _ := newTestScrapeLoop(t, withAppendable(appTest, appV2), func(sl *scrapeLoop) {
|
||||
|
|
@ -3941,7 +3932,6 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
|
|||
|
||||
func testScrapeLoopDiscardDuplicateLabels(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
appTest := teststorage.NewAppendable().Then(s)
|
||||
sl, _ := newTestScrapeLoop(t, withAppendable(appTest, appV2))
|
||||
|
|
@ -3983,7 +3973,6 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
|
|||
|
||||
func testScrapeLoopDiscardUnnamedMetrics(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
appTest := teststorage.NewAppendable().Then(s)
|
||||
sl, _ := newTestScrapeLoop(t, withAppendable(appTest, appV2), func(sl *scrapeLoop) {
|
||||
|
|
@ -4274,7 +4263,6 @@ func TestScrapeAddFast(t *testing.T) {
|
|||
|
||||
func testScrapeAddFast(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
sl, _ := newTestScrapeLoop(t, withAppendable(s, appV2))
|
||||
|
||||
|
|
@ -4357,7 +4345,6 @@ func TestScrapeReportSingleAppender(t *testing.T) {
|
|||
func testScrapeReportSingleAppender(t *testing.T, appV2 bool) {
|
||||
t.Parallel()
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
signal := make(chan struct{}, 1)
|
||||
|
||||
|
|
@ -4417,7 +4404,6 @@ func TestScrapeReportLimit(t *testing.T) {
|
|||
|
||||
func testScrapeReportLimit(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
cfg := &config.ScrapeConfig{
|
||||
JobName: "test",
|
||||
|
|
@ -4480,7 +4466,6 @@ func TestScrapeUTF8(t *testing.T) {
|
|||
|
||||
func testScrapeUTF8(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
cfg := &config.ScrapeConfig{
|
||||
JobName: "test",
|
||||
|
|
@ -4678,7 +4663,6 @@ func TestLeQuantileReLabel(t *testing.T) {
|
|||
|
||||
func testLeQuantileReLabel(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
cfg := &config.ScrapeConfig{
|
||||
JobName: "test",
|
||||
|
|
@ -5205,7 +5189,6 @@ metric: <
|
|||
t.Run(fmt.Sprintf("%s with %s", name, metricsTextName), func(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
sl, _ := newTestScrapeLoop(t, withAppendable(s, appV2), func(sl *scrapeLoop) {
|
||||
sl.alwaysScrapeClassicHist = tc.alwaysScrapeClassicHistograms
|
||||
|
|
@ -5293,7 +5276,6 @@ func TestTypeUnitReLabel(t *testing.T) {
|
|||
|
||||
func testTypeUnitReLabel(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
cfg := &config.ScrapeConfig{
|
||||
JobName: "test",
|
||||
|
|
@ -5438,7 +5420,6 @@ func TestScrapeLoopCompression(t *testing.T) {
|
|||
|
||||
func testScrapeLoopCompression(t *testing.T, appV2 bool) {
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
|
||||
metricsText := makeTestGauges(10)
|
||||
|
||||
|
|
@ -5768,17 +5749,12 @@ scrape_configs:
|
|||
`, minBucketFactor, strings.ReplaceAll(metricsServer.URL, "http://", ""))
|
||||
|
||||
s := teststorage.New(t)
|
||||
t.Cleanup(func() { _ = s.Close() })
|
||||
reg := prometheus.NewRegistry()
|
||||
|
||||
mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond)}, nil, nil, s, reg)
|
||||
sa := selectAppendable(s, appV2)
|
||||
mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond)}, nil, nil, sa.V1(), sa.V2(), reg)
|
||||
require.NoError(t, err)
|
||||
|
||||
if appV2 {
|
||||
mng.appendableV2 = s
|
||||
mng.appendable = nil
|
||||
}
|
||||
|
||||
cfg, err := config.Load(configStr, promslog.NewNopLogger())
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, mng.ApplyConfig(cfg))
|
||||
|
|
@ -6464,7 +6440,6 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) {
|
|||
require.NoError(t, err)
|
||||
|
||||
s := teststorage.New(t)
|
||||
defer s.Close()
|
||||
|
||||
cfg := &config.ScrapeConfig{
|
||||
JobName: "test",
|
||||
|
|
|
|||
|
|
@ -39,7 +39,6 @@ func TestFanout_SelectSorted(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
|
||||
priStorage := teststorage.New(t)
|
||||
defer priStorage.Close()
|
||||
app1 := priStorage.Appender(ctx)
|
||||
app1.Append(0, inputLabel, 0, 0)
|
||||
inputTotalSize++
|
||||
|
|
@ -51,7 +50,6 @@ func TestFanout_SelectSorted(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
remoteStorage1 := teststorage.New(t)
|
||||
defer remoteStorage1.Close()
|
||||
app2 := remoteStorage1.Appender(ctx)
|
||||
app2.Append(0, inputLabel, 3000, 3)
|
||||
inputTotalSize++
|
||||
|
|
@ -63,7 +61,6 @@ func TestFanout_SelectSorted(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
remoteStorage2 := teststorage.New(t)
|
||||
defer remoteStorage2.Close()
|
||||
|
||||
app3 := remoteStorage2.Appender(ctx)
|
||||
app3.Append(0, inputLabel, 6000, 6)
|
||||
|
|
@ -142,7 +139,6 @@ func TestFanout_SelectSorted_AppenderV2(t *testing.T) {
|
|||
inputTotalSize := 0
|
||||
|
||||
priStorage := teststorage.New(t)
|
||||
defer priStorage.Close()
|
||||
app1 := priStorage.AppenderV2(t.Context())
|
||||
_, err := app1.Append(0, inputLabel, 0, 0, 0, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
|
|
@ -156,7 +152,6 @@ func TestFanout_SelectSorted_AppenderV2(t *testing.T) {
|
|||
require.NoError(t, app1.Commit())
|
||||
|
||||
remoteStorage1 := teststorage.New(t)
|
||||
defer remoteStorage1.Close()
|
||||
app2 := remoteStorage1.AppenderV2(t.Context())
|
||||
_, err = app2.Append(0, inputLabel, 0, 3000, 3, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
|
|
@ -170,8 +165,6 @@ func TestFanout_SelectSorted_AppenderV2(t *testing.T) {
|
|||
require.NoError(t, app2.Commit())
|
||||
|
||||
remoteStorage2 := teststorage.New(t)
|
||||
defer remoteStorage2.Close()
|
||||
|
||||
app3 := remoteStorage2.AppenderV2(t.Context())
|
||||
_, err = app3.Append(0, inputLabel, 0, 6000, 6, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
|
|
@ -246,7 +239,6 @@ func TestFanout_SelectSorted_AppenderV2(t *testing.T) {
|
|||
|
||||
func TestFanoutErrors(t *testing.T) {
|
||||
workingStorage := teststorage.New(t)
|
||||
defer workingStorage.Close()
|
||||
|
||||
cases := []struct {
|
||||
primary storage.Storage
|
||||
|
|
|
|||
|
|
@ -4111,10 +4111,18 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
|
|||
// Make sure counter resets hints are non-zero, so we can detect ST histogram samples.
|
||||
testHistogram := tsdbutil.GenerateTestHistogram(1)
|
||||
testHistogram.CounterResetHint = histogram.NotCounterReset
|
||||
|
||||
testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1)
|
||||
testFloatHistogram.CounterResetHint = histogram.NotCounterReset
|
||||
|
||||
testNHCB := tsdbutil.GenerateTestCustomBucketsHistogram(1)
|
||||
testNHCB.CounterResetHint = histogram.NotCounterReset
|
||||
|
||||
testFloatNHCB := tsdbutil.GenerateTestCustomBucketsFloatHistogram(1)
|
||||
testFloatNHCB.CounterResetHint = histogram.NotCounterReset
|
||||
|
||||
// TODO(beorn7): Once issue #15346 is fixed, the CounterResetHint of the
|
||||
// following two zero histograms should be histogram.CounterReset.
|
||||
// following zero histograms should be histogram.CounterReset.
|
||||
testZeroHistogram := &histogram.Histogram{
|
||||
Schema: testHistogram.Schema,
|
||||
ZeroThreshold: testHistogram.ZeroThreshold,
|
||||
|
|
@ -4131,6 +4139,19 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
|
|||
PositiveBuckets: []float64{0, 0, 0, 0},
|
||||
NegativeBuckets: []float64{0, 0, 0, 0},
|
||||
}
|
||||
testZeroNHCB := &histogram.Histogram{
|
||||
Schema: testNHCB.Schema,
|
||||
PositiveSpans: testNHCB.PositiveSpans,
|
||||
PositiveBuckets: []int64{0, 0, 0, 0},
|
||||
CustomValues: testNHCB.CustomValues,
|
||||
}
|
||||
testZeroFloatNHCB := &histogram.FloatHistogram{
|
||||
Schema: testFloatNHCB.Schema,
|
||||
PositiveSpans: testFloatNHCB.PositiveSpans,
|
||||
PositiveBuckets: []float64{0, 0, 0, 0},
|
||||
CustomValues: testFloatNHCB.CustomValues,
|
||||
}
|
||||
|
||||
type appendableSamples struct {
|
||||
ts int64
|
||||
fSample float64
|
||||
|
|
@ -4183,6 +4204,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
|
|||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "In order ct+normal sample/NHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, h: testNHCB, st: 1},
|
||||
{ts: 101, h: testNHCB, st: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, h: testZeroNHCB},
|
||||
sample{t: 100, h: testNHCB},
|
||||
sample{t: 101, h: testNHCB},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "In order ct+normal sample/floatNHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, fh: testFloatNHCB, st: 1},
|
||||
{ts: 101, fh: testFloatNHCB, st: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, fh: testZeroFloatNHCB},
|
||||
sample{t: 100, fh: testFloatNHCB},
|
||||
sample{t: 101, fh: testFloatNHCB},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "Consecutive appends with same st ignore st/floatSample",
|
||||
appendableSamples: []appendableSamples{
|
||||
|
|
@ -4223,6 +4272,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
|
|||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "Consecutive appends with same st ignore st/NHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, h: testNHCB, st: 1},
|
||||
{ts: 101, h: testNHCB, st: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, h: testZeroNHCB},
|
||||
sample{t: 100, h: testNHCB},
|
||||
sample{t: 101, h: testNHCB},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "Consecutive appends with same st ignore st/floatNHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, fh: testFloatNHCB, st: 1},
|
||||
{ts: 101, fh: testFloatNHCB, st: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, fh: testZeroFloatNHCB},
|
||||
sample{t: 100, fh: testFloatNHCB},
|
||||
sample{t: 101, fh: testFloatNHCB},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "Consecutive appends with newer st do not ignore st/floatSample",
|
||||
appendableSamples: []appendableSamples{
|
||||
|
|
@ -4262,6 +4339,32 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
|
|||
sample{t: 102, fh: testFloatHistogram},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Consecutive appends with newer st do not ignore st/NHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, h: testNHCB, st: 1},
|
||||
{ts: 102, h: testNHCB, st: 101},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
sample{t: 1, h: testZeroNHCB},
|
||||
sample{t: 100, h: testNHCB},
|
||||
sample{t: 101, h: testZeroNHCB},
|
||||
sample{t: 102, h: testNHCB},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Consecutive appends with newer st do not ignore st/floatNHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, fh: testFloatNHCB, st: 1},
|
||||
{ts: 102, fh: testFloatNHCB, st: 101},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
sample{t: 1, fh: testZeroFloatNHCB},
|
||||
sample{t: 100, fh: testFloatNHCB},
|
||||
sample{t: 101, fh: testZeroFloatNHCB},
|
||||
sample{t: 102, fh: testFloatNHCB},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ST equals to previous sample timestamp is ignored/floatSample",
|
||||
appendableSamples: []appendableSamples{
|
||||
|
|
@ -4302,6 +4405,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
|
|||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "ST equals to previous sample timestamp is ignored/NHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, h: testNHCB, st: 1},
|
||||
{ts: 101, h: testNHCB, st: 100},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, h: testZeroNHCB},
|
||||
sample{t: 100, h: testNHCB},
|
||||
sample{t: 101, h: testNHCB},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "ST equals to previous sample timestamp is ignored/floatNHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, fh: testFloatNHCB, st: 1},
|
||||
{ts: 101, fh: testFloatNHCB, st: 100},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, fh: testZeroFloatNHCB},
|
||||
sample{t: 100, fh: testFloatNHCB},
|
||||
sample{t: 101, fh: testFloatNHCB},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "ST lower than minValidTime/float",
|
||||
appendableSamples: []appendableSamples{
|
||||
|
|
@ -4349,6 +4480,40 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
|
|||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "ST lower than minValidTime/NHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, h: testNHCB, st: -1},
|
||||
},
|
||||
// ST results ErrOutOfBounds, but ST append is best effort, so
|
||||
// ST should be ignored, but sample appended.
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
// NOTE: Without ST, on query, first histogram sample will get
|
||||
// CounterReset adjusted to 0.
|
||||
firstSample := testNHCB.Copy()
|
||||
firstSample.CounterResetHint = histogram.UnknownCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 100, h: firstSample},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "ST lower than minValidTime/floatNHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, fh: testFloatNHCB, st: -1},
|
||||
},
|
||||
// ST results ErrOutOfBounds, but ST append is best effort, so
|
||||
// ST should be ignored, but sample appended.
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
// NOTE: Without ST, on query, first histogram sample will get
|
||||
// CounterReset adjusted to 0.
|
||||
firstSample := testFloatNHCB.Copy()
|
||||
firstSample.CounterResetHint = histogram.UnknownCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 100, fh: firstSample},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "ST duplicates an existing sample/float",
|
||||
appendableSamples: []appendableSamples{
|
||||
|
|
@ -4402,6 +4567,44 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) {
|
|||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "ST duplicates an existing sample/NHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, h: testNHCB},
|
||||
{ts: 200, h: testNHCB, st: 100},
|
||||
},
|
||||
// ST results ErrDuplicateSampleForTimestamp, but ST append is best effort, so
|
||||
// ST should be ignored, but sample appended.
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
// NOTE: Without ST, on query, first histogram sample will get
|
||||
// CounterReset adjusted to 0.
|
||||
firstSample := testNHCB.Copy()
|
||||
firstSample.CounterResetHint = histogram.UnknownCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 100, h: firstSample},
|
||||
sample{t: 200, h: testNHCB},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
{
|
||||
name: "ST duplicates an existing sample/floatNHCB",
|
||||
appendableSamples: []appendableSamples{
|
||||
{ts: 100, fh: testFloatNHCB},
|
||||
{ts: 200, fh: testFloatNHCB, st: 100},
|
||||
},
|
||||
// ST results ErrDuplicateSampleForTimestamp, but ST append is best effort, so
|
||||
// ST should ignored, but sample appended.
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
// NOTE: Without ST, on query, first histogram sample will get
|
||||
// CounterReset adjusted to 0.
|
||||
firstSample := testFloatNHCB.Copy()
|
||||
firstSample.CounterResetHint = histogram.UnknownCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 100, fh: firstSample},
|
||||
sample{t: 200, fh: testFloatNHCB},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
|
||||
|
|
|
|||
|
|
@ -32,14 +32,22 @@ type Option func(opt *tsdb.Options)
|
|||
|
||||
// New returns a new TestStorage for testing purposes
|
||||
// that removes all associated files on closing.
|
||||
//
|
||||
// Caller does not need to close the TestStorage after use, it's deferred via t.Cleanup.
|
||||
func New(t testing.TB, o ...Option) *TestStorage {
|
||||
s, err := NewWithError(o...)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
_ = s.Close() // Ignore errors, as it could be a double close.
|
||||
})
|
||||
return s
|
||||
}
|
||||
|
||||
// NewWithError returns a new TestStorage for user facing tests, which reports
|
||||
// errors directly.
|
||||
//
|
||||
// It's a caller responsibility to close the TestStorage after use.
|
||||
func NewWithError(o ...Option) (*TestStorage, error) {
|
||||
// Tests just load data for a series sequentially. Thus we
|
||||
// need a long appendable window.
|
||||
|
|
|
|||
|
|
@ -324,7 +324,6 @@ func (m *rulesRetrieverMock) CreateRuleGroups() {
|
|||
m.CreateAlertingRules()
|
||||
arules := m.AlertingRules()
|
||||
storage := teststorage.New(m.testing)
|
||||
defer storage.Close()
|
||||
|
||||
engineOpts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
|
|
@ -414,7 +413,6 @@ func TestEndpoints(t *testing.T) {
|
|||
test_metric5{"host.name"="localhost"} 1+0x100
|
||||
test_metric5{"junk\n{},=: chars"="bar"} 1+0x100
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
start := time.Unix(0, 0)
|
||||
exemplars := []exemplar.QueryResult{
|
||||
|
|
@ -575,7 +573,7 @@ func TestGetSeries(t *testing.T) {
|
|||
test_metric2{foo="boo", xyz="qwerty"} 1+0x100
|
||||
test_metric2{foo="baz", abc="qwerty"} 1+0x100
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
api := &API{
|
||||
Queryable: storage,
|
||||
}
|
||||
|
|
@ -682,7 +680,6 @@ func TestQueryExemplars(t *testing.T) {
|
|||
test_metric4{foo="boo", dup="1"} 1+0x100
|
||||
test_metric4{foo="boo"} 1+0x100
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
api := &API{
|
||||
Queryable: storage,
|
||||
|
|
@ -798,7 +795,7 @@ func TestLabelNames(t *testing.T) {
|
|||
test_metric2{foo="boo", xyz="qwerty"} 1+0x100
|
||||
test_metric2{foo="baz", abc="qwerty"} 1+0x100
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
api := &API{
|
||||
Queryable: storage,
|
||||
}
|
||||
|
|
@ -901,7 +898,6 @@ func (testStats) Builtin() (_ stats.BuiltinStats) {
|
|||
|
||||
func TestStats(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
api := &API{
|
||||
Queryable: storage,
|
||||
|
|
|
|||
|
|
@ -212,7 +212,6 @@ func TestFederation(t *testing.T) {
|
|||
test_metric_stale 1+10x99 stale
|
||||
test_metric_old 1+10x98
|
||||
`)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
h := &Handler{
|
||||
localStorage: &dbAdapter{storage.DB},
|
||||
|
|
@ -303,7 +302,6 @@ func normalizeBody(body *bytes.Buffer) string {
|
|||
|
||||
func TestFederationWithNativeHistograms(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
t.Cleanup(func() { storage.Close() })
|
||||
|
||||
var expVec promql.Vector
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue