diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 53584085ec..e4f15f5cb8 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -876,7 +876,7 @@ func main() { &cfg.scrape, logger.With("component", "scrape manager"), logging.NewJSONFileLogger, - fanoutStorage, + fanoutStorage, nil, // TODO(bwplotka): Switch to AppendableV2. prometheus.DefaultRegisterer, ) if err != nil { diff --git a/cmd/promtool/main_test.go b/cmd/promtool/main_test.go index 4f4ca3de71..9e6e7268f7 100644 --- a/cmd/promtool/main_test.go +++ b/cmd/promtool/main_test.go @@ -734,7 +734,6 @@ func TestTSDBDumpCommand(t *testing.T) { load 1m metric{foo="bar"} 1 2 3 `) - t.Cleanup(func() { storage.Close() }) for _, c := range []struct { name string diff --git a/cmd/promtool/tsdb_test.go b/cmd/promtool/tsdb_test.go index 3a2a5aff72..859c521d64 100644 --- a/cmd/promtool/tsdb_test.go +++ b/cmd/promtool/tsdb_test.go @@ -97,7 +97,6 @@ func TestTSDBDump(t *testing.T) { heavy_metric{foo="bar"} 5 4 3 2 1 heavy_metric{foo="foo"} 5 4 3 2 1 `) - t.Cleanup(func() { storage.Close() }) tests := []struct { name string @@ -196,7 +195,6 @@ func TestTSDBDumpOpenMetrics(t *testing.T) { my_counter{foo="bar", baz="abc"} 1 2 3 4 5 my_gauge{bar="foo", abc="baz"} 9 8 0 4 7 `) - t.Cleanup(func() { storage.Close() }) tests := []struct { name string diff --git a/promql/bench_test.go b/promql/bench_test.go index f647b03600..2e70718b3b 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -338,7 +338,7 @@ func BenchmarkRangeQuery(b *testing.B) { }) stor := teststorage.New(b) stor.DisableCompactions() // Don't want auto-compaction disrupting timings. - defer stor.Close() + opts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -383,7 +383,6 @@ func BenchmarkRangeQuery(b *testing.B) { func BenchmarkJoinQuery(b *testing.B) { stor := teststorage.New(b) stor.DisableCompactions() // Don't want auto-compaction disrupting timings. - defer stor.Close() opts := promql.EngineOpts{ Logger: nil, @@ -445,7 +444,6 @@ func BenchmarkJoinQuery(b *testing.B) { func BenchmarkNativeHistograms(b *testing.B) { testStorage := teststorage.New(b) - defer testStorage.Close() app := testStorage.Appender(context.TODO()) if err := generateNativeHistogramSeries(app, 3000); err != nil { @@ -523,7 +521,6 @@ func BenchmarkNativeHistograms(b *testing.B) { func BenchmarkNativeHistogramsCustomBuckets(b *testing.B) { testStorage := teststorage.New(b) - defer testStorage.Close() app := testStorage.Appender(context.TODO()) if err := generateNativeHistogramCustomBucketsSeries(app, 3000); err != nil { @@ -594,7 +591,6 @@ func BenchmarkNativeHistogramsCustomBuckets(b *testing.B) { func BenchmarkInfoFunction(b *testing.B) { // Initialize test storage and generate test series data. testStorage := teststorage.New(b) - defer testStorage.Close() start := time.Unix(0, 0) end := start.Add(2 * time.Hour) diff --git a/promql/engine_test.go b/promql/engine_test.go index 0eff93af4c..ca1d5471c1 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -676,7 +676,6 @@ func TestEngineEvalStmtTimestamps(t *testing.T) { load 10s metric 1 2 `) - t.Cleanup(func() { storage.Close() }) cases := []struct { Query string @@ -789,7 +788,6 @@ load 10s metricWith3SampleEvery10Seconds{a="3",b="2"} 1+1x100 metricWith1HistogramEvery10Seconds {{schema:1 count:5 sum:20 buckets:[1 2 1 1]}}+{{schema:1 count:10 sum:5 buckets:[1 2 3 4]}}x100 `) - t.Cleanup(func() { storage.Close() }) cases := []struct { Query string @@ -1339,7 +1337,6 @@ load 10s bigmetric{a="1"} 1+1x100 bigmetric{a="2"} 1+1x100 `) - t.Cleanup(func() { storage.Close() }) // These test cases should be touching the limit exactly (hence no exceeding). // Exceeding the limit will be tested by doing -1 to the MaxSamples. @@ -1523,7 +1520,6 @@ func TestExtendedRangeSelectors(t *testing.T) { withreset 1+1x4 1+1x5 notregular 0 5 100 2 8 `) - t.Cleanup(func() { storage.Close() }) tc := []struct { query string @@ -1677,7 +1673,6 @@ load 10s load 1ms metric_ms 0+1x10000 `) - t.Cleanup(func() { storage.Close() }) lbls1 := labels.FromStrings("__name__", "metric", "job", "1") lbls2 := labels.FromStrings("__name__", "metric", "job", "2") @@ -2283,7 +2278,6 @@ func TestSubquerySelector(t *testing.T) { t.Run("", func(t *testing.T) { engine := newTestEngine(t) storage := promqltest.LoadedStorage(t, tst.loadString) - t.Cleanup(func() { storage.Close() }) for _, c := range tst.cases { t.Run(c.Query, func(t *testing.T) { @@ -3410,7 +3404,6 @@ metric 0 1 2 t.Run(c.name, func(t *testing.T) { engine := promqltest.NewTestEngine(t, false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery) storage := promqltest.LoadedStorage(t, load) - t.Cleanup(func() { storage.Close() }) opts := promql.NewPrometheusQueryOpts(false, c.queryLookback) qry, err := engine.NewInstantQuery(context.Background(), storage, opts, query, c.ts) @@ -3444,7 +3437,7 @@ func TestHistogramCopyFromIteratorRegression(t *testing.T) { histogram {{sum:4 count:4 buckets:[2 2]}} {{sum:6 count:6 buckets:[3 3]}} {{sum:1 count:1 buckets:[1]}} ` storage := promqltest.LoadedStorage(t, load) - t.Cleanup(func() { storage.Close() }) + engine := promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery) verify := func(t *testing.T, qry promql.Query, expected []histogram.FloatHistogram) { diff --git a/promql/functions_test.go b/promql/functions_test.go index 2566843092..023417bfc2 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -33,7 +33,7 @@ func TestDeriv(t *testing.T) { // This requires more precision than the usual test system offers, // so we test it by hand. storage := teststorage.New(t) - defer storage.Close() + opts := promql.EngineOpts{ Logger: nil, Reg: nil, diff --git a/promql/promql_test.go b/promql/promql_test.go index fc13f7e64f..a6bc437b6b 100644 --- a/promql/promql_test.go +++ b/promql/promql_test.go @@ -39,7 +39,7 @@ func TestEvaluations(t *testing.T) { // Run a lot of queries at the same time, to check for race conditions. func TestConcurrentRangeQueries(t *testing.T) { stor := teststorage.New(t) - defer stor.Close() + opts := promql.EngineOpts{ Logger: nil, Reg: nil, diff --git a/rules/alerting_test.go b/rules/alerting_test.go index caf32e6472..ec53d9086b 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -158,7 +158,6 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) { load 1m http_requests{job="app-server", instance="0"} 75 85 70 70 stale `) - t.Cleanup(func() { storage.Close() }) expr, err := parser.ParseExpr(`http_requests < 100`) require.NoError(t, err) @@ -264,7 +263,6 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { load 1m http_requests{job="app-server", instance="0"} 75 85 70 70 `) - t.Cleanup(func() { storage.Close() }) expr, err := parser.ParseExpr(`http_requests < 100`) require.NoError(t, err) @@ -359,7 +357,6 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { load 1m http_requests{job="app-server", instance="0"} 75 85 70 70 `) - t.Cleanup(func() { storage.Close() }) expr, err := parser.ParseExpr(`http_requests < 100`) require.NoError(t, err) @@ -454,7 +451,6 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) { load 1m http_requests{job="app-server", instance="0"} 75 85 70 70 `) - t.Cleanup(func() { storage.Close() }) expr, err := parser.ParseExpr(`http_requests < 100`) require.NoError(t, err) @@ -510,7 +506,6 @@ func TestAlertingRuleQueryInTemplate(t *testing.T) { load 1m http_requests{job="app-server", instance="0"} 70 85 70 70 `) - t.Cleanup(func() { storage.Close() }) expr, err := parser.ParseExpr(`sum(http_requests) < 100`) require.NoError(t, err) @@ -584,7 +579,6 @@ func BenchmarkAlertingRuleAtomicField(b *testing.B) { func TestAlertingRuleDuplicate(t *testing.T) { storage := teststorage.New(t) - defer storage.Close() opts := promql.EngineOpts{ Logger: nil, @@ -621,7 +615,6 @@ func TestAlertingRuleLimit(t *testing.T) { metric{label="1"} 1 metric{label="2"} 1 `) - t.Cleanup(func() { storage.Close() }) tests := []struct { limit int @@ -805,7 +798,6 @@ func TestKeepFiringFor(t *testing.T) { load 1m http_requests{job="app-server", instance="0"} 75 85 70 70 10x5 `) - t.Cleanup(func() { storage.Close() }) expr, err := parser.ParseExpr(`http_requests > 50`) require.NoError(t, err) @@ -916,7 +908,6 @@ func TestPendingAndKeepFiringFor(t *testing.T) { load 1m http_requests{job="app-server", instance="0"} 75 10x10 `) - t.Cleanup(func() { storage.Close() }) expr, err := parser.ParseExpr(`http_requests > 50`) require.NoError(t, err) diff --git a/rules/manager_test.go b/rules/manager_test.go index a716304b7a..3fcb90808e 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -62,7 +62,6 @@ func TestAlertingRule(t *testing.T) { http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 95 105 105 95 85 http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 80 90 100 110 120 130 140 `) - t.Cleanup(func() { storage.Close() }) expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) require.NoError(t, err) @@ -205,7 +204,6 @@ func TestForStateAddSamples(t *testing.T) { http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 95 105 105 95 85 http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 80 90 100 110 120 130 140 `) - t.Cleanup(func() { storage.Close() }) expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) require.NoError(t, err) @@ -367,7 +365,6 @@ func TestForStateRestore(t *testing.T) { http_requests{job="app-server", instance="0", group="canary", severity="overwrite-me"} 75 85 50 0 0 25 0 0 40 0 120 http_requests{job="app-server", instance="1", group="canary", severity="overwrite-me"} 125 90 60 0 0 25 0 0 40 0 130 `) - t.Cleanup(func() { storage.Close() }) expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) require.NoError(t, err) @@ -538,7 +535,7 @@ func TestForStateRestore(t *testing.T) { func TestStaleness(t *testing.T) { for _, queryOffset := range []time.Duration{0, time.Minute} { st := teststorage.New(t) - defer st.Close() + engineOpts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -726,7 +723,7 @@ func TestCopyState(t *testing.T) { func TestDeletedRuleMarkedStale(t *testing.T) { st := teststorage.New(t) - defer st.Close() + oldGroup := &Group{ rules: []Rule{ NewRecordingRule("rule1", nil, labels.FromStrings("l1", "v1")), @@ -772,7 +769,7 @@ func TestUpdate(t *testing.T) { "test": labels.FromStrings("name", "value"), } st := teststorage.New(t) - defer st.Close() + opts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -910,7 +907,7 @@ func reloadAndValidate(rgs *rulefmt.RuleGroups, t *testing.T, tmpFile *os.File, func TestNotify(t *testing.T) { storage := teststorage.New(t) - defer storage.Close() + engineOpts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -984,7 +981,7 @@ func TestMetricsUpdate(t *testing.T) { } storage := teststorage.New(t) - defer storage.Close() + registry := prometheus.NewRegistry() opts := promql.EngineOpts{ Logger: nil, @@ -1057,7 +1054,7 @@ func TestGroupStalenessOnRemoval(t *testing.T) { sameFiles := []string{"fixtures/rules2_copy.yaml"} storage := teststorage.New(t) - defer storage.Close() + opts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -1135,7 +1132,7 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) { files := []string{"fixtures/rules2.yaml"} storage := teststorage.New(t) - defer storage.Close() + opts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -1205,7 +1202,7 @@ func TestRuleMovedBetweenGroups(t *testing.T) { storage := teststorage.New(t, func(opt *tsdb.Options) { opt.OutOfOrderTimeWindow = 600000 }) - defer storage.Close() + opts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -1287,7 +1284,7 @@ func TestGroupHasAlertingRules(t *testing.T) { func TestRuleHealthUpdates(t *testing.T) { st := teststorage.New(t) - defer st.Close() + engineOpts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -1348,7 +1345,6 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) { load 5m http_requests{instance="0"} 75 85 50 0 0 25 0 0 40 0 120 `) - t.Cleanup(func() { storage.Close() }) expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) require.NoError(t, err) @@ -1463,7 +1459,6 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) { func TestNativeHistogramsInRecordingRules(t *testing.T) { storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) // Add some histograms. db := storage.DB @@ -1525,9 +1520,6 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { func TestManager_LoadGroups_ShouldCheckWhetherEachRuleHasDependentsAndDependencies(t *testing.T) { storage := teststorage.New(t) - t.Cleanup(func() { - require.NoError(t, storage.Close()) - }) ruleManager := NewManager(&ManagerOptions{ Context: context.Background(), @@ -2021,7 +2013,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { t.Run("synchronous evaluation with independent rules", func(t *testing.T) { t.Parallel() storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) + inflightQueries := atomic.Int32{} maxInflight := atomic.Int32{} @@ -2060,7 +2052,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) { t.Parallel() storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) + inflightQueries := atomic.Int32{} maxInflight := atomic.Int32{} @@ -2099,7 +2091,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) { t.Parallel() storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) + inflightQueries := atomic.Int32{} maxInflight := atomic.Int32{} @@ -2144,7 +2136,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) { t.Parallel() storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) + inflightQueries := atomic.Int32{} maxInflight := atomic.Int32{} @@ -2192,7 +2184,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { t.Run("asynchronous evaluation of independent rules, with indeterminate. Should be synchronous", func(t *testing.T) { t.Parallel() storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) + inflightQueries := atomic.Int32{} maxInflight := atomic.Int32{} @@ -2231,7 +2223,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { t.Run("asynchronous evaluation of rules that benefit from reordering", func(t *testing.T) { t.Parallel() storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) + inflightQueries := atomic.Int32{} maxInflight := atomic.Int32{} @@ -2277,7 +2269,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { t.Run("attempted asynchronous evaluation of chained rules", func(t *testing.T) { t.Parallel() storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) + inflightQueries := atomic.Int32{} maxInflight := atomic.Int32{} @@ -2325,7 +2317,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { func TestNewRuleGroupRestoration(t *testing.T) { t.Parallel() store := teststorage.New(t) - t.Cleanup(func() { store.Close() }) + var ( inflightQueries atomic.Int32 maxInflight atomic.Int32 @@ -2389,7 +2381,7 @@ func TestNewRuleGroupRestoration(t *testing.T) { func TestNewRuleGroupRestorationWithRestoreNewGroupOption(t *testing.T) { t.Parallel() store := teststorage.New(t) - t.Cleanup(func() { store.Close() }) + var ( inflightQueries atomic.Int32 maxInflight atomic.Int32 @@ -2459,7 +2451,6 @@ func TestNewRuleGroupRestorationWithRestoreNewGroupOption(t *testing.T) { func TestBoundedRuleEvalConcurrency(t *testing.T) { storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) var ( inflightQueries atomic.Int32 @@ -2514,7 +2505,6 @@ func TestUpdateWhenStopped(t *testing.T) { func TestGroup_Eval_RaceConditionOnStoppingGroupEvaluationWhileRulesAreEvaluatedConcurrently(t *testing.T) { storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) var ( inflightQueries atomic.Int32 @@ -2733,7 +2723,6 @@ func TestRuleDependencyController_AnalyseRules(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) ruleManager := NewManager(&ManagerOptions{ Context: context.Background(), @@ -2762,7 +2751,6 @@ func TestRuleDependencyController_AnalyseRules(t *testing.T) { func BenchmarkRuleDependencyController_AnalyseRules(b *testing.B) { storage := teststorage.New(b) - b.Cleanup(func() { storage.Close() }) ruleManager := NewManager(&ManagerOptions{ Context: context.Background(), diff --git a/rules/recording_test.go b/rules/recording_test.go index 29208b6392..3a8bb9c2ff 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -121,7 +121,6 @@ func setUpRuleEvalTest(t testing.TB) *teststorage.TestStorage { func TestRuleEval(t *testing.T) { storage := setUpRuleEvalTest(t) - t.Cleanup(func() { storage.Close() }) ng := testEngine(t) for _, scenario := range ruleEvalTestScenarios { @@ -158,7 +157,6 @@ func BenchmarkRuleEval(b *testing.B) { // TestRuleEvalDuplicate tests for duplicate labels in recorded metrics, see #5529. func TestRuleEvalDuplicate(t *testing.T) { storage := teststorage.New(t) - defer storage.Close() opts := promql.EngineOpts{ Logger: nil, @@ -185,7 +183,6 @@ func TestRecordingRuleLimit(t *testing.T) { metric{label="1"} 1 metric{label="2"} 1 `) - t.Cleanup(func() { storage.Close() }) tests := []struct { limit int diff --git a/scrape/manager.go b/scrape/manager.go index ef226ad507..aafd8c1931 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -39,14 +39,32 @@ import ( "github.com/prometheus/prometheus/util/pool" ) -// NewManager is the Manager constructor using Appendable. -func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), appendable storage.Appendable, registerer prometheus.Registerer) (*Manager, error) { +// NewManager is the Manager constructor using storage.Appendable or storage.AppendableV2. +// +// If unsure which one to use/implement, implement AppendableV2 as it significantly simplifies implementation and allows more +// (passing ST, always-on metadata, exemplars per sample). +// +// NewManager returns error if both appendable and appendableV2 are specified. +// +// Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632). +// storage.Appendable will be removed soon (ETA: Q2 2026). +func NewManager( + o *Options, + logger *slog.Logger, + newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), + appendable storage.Appendable, + appendableV2 storage.AppendableV2, + registerer prometheus.Registerer, +) (*Manager, error) { if o == nil { o = &Options{} } if logger == nil { logger = promslog.NewNopLogger() } + if appendable != nil && appendableV2 != nil { + return nil, errors.New("scrape.NewManager: appendable and appendableV2 cannot be provided at the same time") + } sm, err := newScrapeMetrics(registerer) if err != nil { @@ -55,6 +73,7 @@ func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(str m := &Manager{ appendable: appendable, + appendableV2: appendableV2, opts: o, logger: logger, newScrapeFailureLogger: newScrapeFailureLogger, diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 8b289cb7e2..17152e8eb1 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -522,7 +522,7 @@ scrape_configs: ) opts := Options{} - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry) require.NoError(t, err) newLoop := func(scrapeLoopOptions) loop { ch <- struct{}{} @@ -578,7 +578,7 @@ scrape_configs: func TestManagerTargetsUpdates(t *testing.T) { opts := Options{} testRegistry := prometheus.NewRegistry() - m, err := NewManager(&opts, nil, nil, nil, testRegistry) + m, err := NewManager(&opts, nil, nil, nil, nil, testRegistry) require.NoError(t, err) ts := make(chan map[string][]*targetgroup.Group) @@ -631,7 +631,7 @@ global: opts := Options{} testRegistry := prometheus.NewRegistry() - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry) require.NoError(t, err) // Load the first config. @@ -701,7 +701,7 @@ scrape_configs: } opts := Options{} - scrapeManager, err := NewManager(&opts, nil, nil, nil, testRegistry) + scrapeManager, err := NewManager(&opts, nil, nil, nil, nil, testRegistry) require.NoError(t, err) reload(scrapeManager, cfg1) @@ -735,6 +735,8 @@ func setupTestServer(t *testing.T, typ string, toWrite []byte) *httptest.Server } // TestManagerSTZeroIngestion tests scrape manager for various ST cases. +// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's +// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample. func TestManagerSTZeroIngestion(t *testing.T) { t.Parallel() const ( @@ -766,7 +768,7 @@ func TestManagerSTZeroIngestion(t *testing.T) { discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: testSTZeroIngest, skipOffsetting: true, - }, app) + }, app, nil) defer scrapeManager.Stop() server := setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded) @@ -905,6 +907,8 @@ func generateTestHistogram(i int) *dto.Histogram { return h } +// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's +// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample. func TestManagerSTZeroIngestionHistogram(t *testing.T) { t.Parallel() const mName = "expected_histogram" @@ -950,7 +954,7 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) { discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion, skipOffsetting: true, - }, app) + }, app, nil) defer scrapeManager.Stop() once := sync.Once{} @@ -1030,7 +1034,7 @@ func TestUnregisterMetrics(t *testing.T) { // Check that all metrics can be unregistered, allowing a second manager to be created. for range 2 { opts := Options{} - manager, err := NewManager(&opts, nil, nil, nil, reg) + manager, err := NewManager(&opts, nil, nil, nil, nil, reg) require.NotNil(t, manager) require.NoError(t, err) // Unregister all metrics. @@ -1043,6 +1047,9 @@ func TestUnregisterMetrics(t *testing.T) { // This test addresses issue #17216 by ensuring the previously blocking check has been removed. // The test verifies that the presence of exemplars in the input does not cause errors, // although exemplars are not preserved during NHCB conversion (as documented below). +// +// NOTE(bwplotka): There is no AppenderV2 test for this STZeroIngestion feature as in V2 flow it's +// moved to AppenderV2 implementation (e.g. storage) and it's tested there, e.g. tsdb.TestHeadAppenderV2_Append_EnableSTAsZeroSample. func TestNHCBAndSTZeroIngestion(t *testing.T) { t.Parallel() @@ -1059,7 +1066,7 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) { discoveryManager, scrapeManager := runManagers(t, ctx, &Options{ EnableStartTimestampZeroIngestion: true, skipOffsetting: true, - }, app) + }, app, nil) defer scrapeManager.Stop() once := sync.Once{} @@ -1153,16 +1160,13 @@ func applyConfig( require.NoError(t, discoveryManager.ApplyConfig(c)) } -func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.Appendable) (*discovery.Manager, *Manager) { +func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.Appendable, appV2 storage.AppendableV2) (*discovery.Manager, *Manager) { t.Helper() if opts == nil { opts = &Options{} } opts.DiscoveryReloadInterval = model.Duration(100 * time.Millisecond) - if app == nil { - app = teststorage.NewAppendable() - } reg := prometheus.NewRegistry() sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg)) @@ -1178,7 +1182,7 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A opts, nil, nil, - app, + app, appV2, prometheus.NewRegistry(), ) require.NoError(t, err) @@ -1251,7 +1255,7 @@ scrape_configs: - files: ['%s'] ` - discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil) + discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil) defer scrapeManager.Stop() applyConfig( @@ -1350,7 +1354,7 @@ scrape_configs: file_sd_configs: - files: ['%s', '%s'] ` - discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil) + discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil) defer scrapeManager.Stop() applyConfig( @@ -1409,7 +1413,7 @@ scrape_configs: file_sd_configs: - files: ['%s'] ` - discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil) + discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil) defer scrapeManager.Stop() applyConfig( @@ -1475,7 +1479,7 @@ scrape_configs: - targets: ['%s'] ` - discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil) + discoveryManager, scrapeManager := runManagers(t, ctx, nil, nil, nil) defer scrapeManager.Stop() // Apply the initial config with an existing file @@ -1559,7 +1563,7 @@ scrape_configs: cfg := loadConfiguration(t, cfgText) - m, err := NewManager(&Options{}, nil, nil, teststorage.NewAppendable(), prometheus.NewRegistry()) + m, err := NewManager(&Options{}, nil, nil, nil, nil, prometheus.NewRegistry()) require.NoError(t, err) defer m.Stop() require.NoError(t, m.ApplyConfig(cfg)) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 9c12a31ab3..f9a0834bd1 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -131,7 +131,6 @@ func testStorageHandlesOutOfOrderTimestamps(t *testing.T, appV2 bool) { // Test with default OutOfOrderTimeWindow (0) t.Run("Out-Of-Order Sample Disabled", func(t *testing.T) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) runScrapeLoopTest(t, appV2, s, false) }) @@ -140,7 +139,6 @@ func testStorageHandlesOutOfOrderTimestamps(t *testing.T, appV2 bool) { s := teststorage.New(t, func(opt *tsdb.Options) { opt.OutOfOrderTimeWindow = 600000 }) - t.Cleanup(func() { _ = s.Close() }) runScrapeLoopTest(t, appV2, s, true) }) @@ -1610,7 +1608,6 @@ func benchScrapeLoopAppend( opt.MaxExemplars = 1e5 } }) - b.Cleanup(func() { _ = s.Close() }) sl, _ := newTestScrapeLoop(b, withAppendable(s, appV2), func(sl *scrapeLoop) { sl.appendMetadataToWAL = appendMetadataToWAL @@ -1697,7 +1694,6 @@ func BenchmarkScrapeLoopScrapeAndReport(b *testing.B) { parsableText := readTextParseTestMetrics(b) s := teststorage.New(b) - b.Cleanup(func() { _ = s.Close() }) sl, scraper := newTestScrapeLoop(b, withAppendable(s, appV2), func(sl *scrapeLoop) { sl.fallbackScrapeProtocol = "application/openmetrics-text" @@ -1730,7 +1726,6 @@ func testSetOptionsHandlingStaleness(t *testing.T, appV2 bool) { s := teststorage.New(t, func(opt *tsdb.Options) { opt.OutOfOrderTimeWindow = 600000 }) - t.Cleanup(func() { _ = s.Close() }) signal := make(chan struct{}, 1) ctx, cancel := context.WithCancel(t.Context()) @@ -2001,7 +1996,6 @@ func TestScrapeLoopCache(t *testing.T) { func testScrapeLoopCache(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) signal := make(chan struct{}, 1) @@ -2071,7 +2065,6 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) { func testScrapeLoopCacheMemoryExhaustionProtection(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) signal := make(chan struct{}, 1) @@ -3881,7 +3874,6 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) { func testScrapeLoopRespectTimestamps(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) appTest := teststorage.NewAppendable().Then(s) sl, _ := newTestScrapeLoop(t, withAppendable(appTest, appV2)) @@ -3910,7 +3902,6 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) { func testScrapeLoopDiscardTimestamps(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) appTest := teststorage.NewAppendable().Then(s) sl, _ := newTestScrapeLoop(t, withAppendable(appTest, appV2), func(sl *scrapeLoop) { @@ -3941,7 +3932,6 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { func testScrapeLoopDiscardDuplicateLabels(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) appTest := teststorage.NewAppendable().Then(s) sl, _ := newTestScrapeLoop(t, withAppendable(appTest, appV2)) @@ -3983,7 +3973,6 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { func testScrapeLoopDiscardUnnamedMetrics(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) appTest := teststorage.NewAppendable().Then(s) sl, _ := newTestScrapeLoop(t, withAppendable(appTest, appV2), func(sl *scrapeLoop) { @@ -4274,7 +4263,6 @@ func TestScrapeAddFast(t *testing.T) { func testScrapeAddFast(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) sl, _ := newTestScrapeLoop(t, withAppendable(s, appV2)) @@ -4357,7 +4345,6 @@ func TestScrapeReportSingleAppender(t *testing.T) { func testScrapeReportSingleAppender(t *testing.T, appV2 bool) { t.Parallel() s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) signal := make(chan struct{}, 1) @@ -4417,7 +4404,6 @@ func TestScrapeReportLimit(t *testing.T) { func testScrapeReportLimit(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) cfg := &config.ScrapeConfig{ JobName: "test", @@ -4480,7 +4466,6 @@ func TestScrapeUTF8(t *testing.T) { func testScrapeUTF8(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) cfg := &config.ScrapeConfig{ JobName: "test", @@ -4678,7 +4663,6 @@ func TestLeQuantileReLabel(t *testing.T) { func testLeQuantileReLabel(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) cfg := &config.ScrapeConfig{ JobName: "test", @@ -5205,7 +5189,6 @@ metric: < t.Run(fmt.Sprintf("%s with %s", name, metricsTextName), func(t *testing.T) { t.Parallel() s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) sl, _ := newTestScrapeLoop(t, withAppendable(s, appV2), func(sl *scrapeLoop) { sl.alwaysScrapeClassicHist = tc.alwaysScrapeClassicHistograms @@ -5293,7 +5276,6 @@ func TestTypeUnitReLabel(t *testing.T) { func testTypeUnitReLabel(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) cfg := &config.ScrapeConfig{ JobName: "test", @@ -5438,7 +5420,6 @@ func TestScrapeLoopCompression(t *testing.T) { func testScrapeLoopCompression(t *testing.T, appV2 bool) { s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) metricsText := makeTestGauges(10) @@ -5768,17 +5749,12 @@ scrape_configs: `, minBucketFactor, strings.ReplaceAll(metricsServer.URL, "http://", "")) s := teststorage.New(t) - t.Cleanup(func() { _ = s.Close() }) reg := prometheus.NewRegistry() - mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond)}, nil, nil, s, reg) + sa := selectAppendable(s, appV2) + mng, err := NewManager(&Options{DiscoveryReloadInterval: model.Duration(10 * time.Millisecond)}, nil, nil, sa.V1(), sa.V2(), reg) require.NoError(t, err) - if appV2 { - mng.appendableV2 = s - mng.appendable = nil - } - cfg, err := config.Load(configStr, promslog.NewNopLogger()) require.NoError(t, err) require.NoError(t, mng.ApplyConfig(cfg)) @@ -6464,7 +6440,6 @@ func testNewScrapeLoopHonorLabelsWiring(t *testing.T, appV2 bool) { require.NoError(t, err) s := teststorage.New(t) - defer s.Close() cfg := &config.ScrapeConfig{ JobName: "test", diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 25f61341cd..948934d041 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -39,7 +39,6 @@ func TestFanout_SelectSorted(t *testing.T) { ctx := context.Background() priStorage := teststorage.New(t) - defer priStorage.Close() app1 := priStorage.Appender(ctx) app1.Append(0, inputLabel, 0, 0) inputTotalSize++ @@ -51,7 +50,6 @@ func TestFanout_SelectSorted(t *testing.T) { require.NoError(t, err) remoteStorage1 := teststorage.New(t) - defer remoteStorage1.Close() app2 := remoteStorage1.Appender(ctx) app2.Append(0, inputLabel, 3000, 3) inputTotalSize++ @@ -63,7 +61,6 @@ func TestFanout_SelectSorted(t *testing.T) { require.NoError(t, err) remoteStorage2 := teststorage.New(t) - defer remoteStorage2.Close() app3 := remoteStorage2.Appender(ctx) app3.Append(0, inputLabel, 6000, 6) @@ -142,7 +139,6 @@ func TestFanout_SelectSorted_AppenderV2(t *testing.T) { inputTotalSize := 0 priStorage := teststorage.New(t) - defer priStorage.Close() app1 := priStorage.AppenderV2(t.Context()) _, err := app1.Append(0, inputLabel, 0, 0, 0, nil, nil, storage.AOptions{}) require.NoError(t, err) @@ -156,7 +152,6 @@ func TestFanout_SelectSorted_AppenderV2(t *testing.T) { require.NoError(t, app1.Commit()) remoteStorage1 := teststorage.New(t) - defer remoteStorage1.Close() app2 := remoteStorage1.AppenderV2(t.Context()) _, err = app2.Append(0, inputLabel, 0, 3000, 3, nil, nil, storage.AOptions{}) require.NoError(t, err) @@ -170,8 +165,6 @@ func TestFanout_SelectSorted_AppenderV2(t *testing.T) { require.NoError(t, app2.Commit()) remoteStorage2 := teststorage.New(t) - defer remoteStorage2.Close() - app3 := remoteStorage2.AppenderV2(t.Context()) _, err = app3.Append(0, inputLabel, 0, 6000, 6, nil, nil, storage.AOptions{}) require.NoError(t, err) @@ -246,7 +239,6 @@ func TestFanout_SelectSorted_AppenderV2(t *testing.T) { func TestFanoutErrors(t *testing.T) { workingStorage := teststorage.New(t) - defer workingStorage.Close() cases := []struct { primary storage.Storage diff --git a/tsdb/head_append_v2_test.go b/tsdb/head_append_v2_test.go index 91f6ba81cc..20401c16fe 100644 --- a/tsdb/head_append_v2_test.go +++ b/tsdb/head_append_v2_test.go @@ -4111,10 +4111,18 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { // Make sure counter resets hints are non-zero, so we can detect ST histogram samples. testHistogram := tsdbutil.GenerateTestHistogram(1) testHistogram.CounterResetHint = histogram.NotCounterReset + testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1) testFloatHistogram.CounterResetHint = histogram.NotCounterReset + + testNHCB := tsdbutil.GenerateTestCustomBucketsHistogram(1) + testNHCB.CounterResetHint = histogram.NotCounterReset + + testFloatNHCB := tsdbutil.GenerateTestCustomBucketsFloatHistogram(1) + testFloatNHCB.CounterResetHint = histogram.NotCounterReset + // TODO(beorn7): Once issue #15346 is fixed, the CounterResetHint of the - // following two zero histograms should be histogram.CounterReset. + // following zero histograms should be histogram.CounterReset. testZeroHistogram := &histogram.Histogram{ Schema: testHistogram.Schema, ZeroThreshold: testHistogram.ZeroThreshold, @@ -4131,6 +4139,19 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { PositiveBuckets: []float64{0, 0, 0, 0}, NegativeBuckets: []float64{0, 0, 0, 0}, } + testZeroNHCB := &histogram.Histogram{ + Schema: testNHCB.Schema, + PositiveSpans: testNHCB.PositiveSpans, + PositiveBuckets: []int64{0, 0, 0, 0}, + CustomValues: testNHCB.CustomValues, + } + testZeroFloatNHCB := &histogram.FloatHistogram{ + Schema: testFloatNHCB.Schema, + PositiveSpans: testFloatNHCB.PositiveSpans, + PositiveBuckets: []float64{0, 0, 0, 0}, + CustomValues: testFloatNHCB.CustomValues, + } + type appendableSamples struct { ts int64 fSample float64 @@ -4183,6 +4204,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { } }(), }, + { + name: "In order ct+normal sample/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB, st: 1}, + {ts: 101, h: testNHCB, st: 1}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, h: testZeroNHCB}, + sample{t: 100, h: testNHCB}, + sample{t: 101, h: testNHCB}, + } + }(), + }, + { + name: "In order ct+normal sample/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB, st: 1}, + {ts: 101, fh: testFloatNHCB, st: 1}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, fh: testZeroFloatNHCB}, + sample{t: 100, fh: testFloatNHCB}, + sample{t: 101, fh: testFloatNHCB}, + } + }(), + }, { name: "Consecutive appends with same st ignore st/floatSample", appendableSamples: []appendableSamples{ @@ -4223,6 +4272,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { } }(), }, + { + name: "Consecutive appends with same st ignore st/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB, st: 1}, + {ts: 101, h: testNHCB, st: 1}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, h: testZeroNHCB}, + sample{t: 100, h: testNHCB}, + sample{t: 101, h: testNHCB}, + } + }(), + }, + { + name: "Consecutive appends with same st ignore st/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB, st: 1}, + {ts: 101, fh: testFloatNHCB, st: 1}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, fh: testZeroFloatNHCB}, + sample{t: 100, fh: testFloatNHCB}, + sample{t: 101, fh: testFloatNHCB}, + } + }(), + }, { name: "Consecutive appends with newer st do not ignore st/floatSample", appendableSamples: []appendableSamples{ @@ -4262,6 +4339,32 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { sample{t: 102, fh: testFloatHistogram}, }, }, + { + name: "Consecutive appends with newer st do not ignore st/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB, st: 1}, + {ts: 102, h: testNHCB, st: 101}, + }, + expectedSamples: []chunks.Sample{ + sample{t: 1, h: testZeroNHCB}, + sample{t: 100, h: testNHCB}, + sample{t: 101, h: testZeroNHCB}, + sample{t: 102, h: testNHCB}, + }, + }, + { + name: "Consecutive appends with newer st do not ignore st/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB, st: 1}, + {ts: 102, fh: testFloatNHCB, st: 101}, + }, + expectedSamples: []chunks.Sample{ + sample{t: 1, fh: testZeroFloatNHCB}, + sample{t: 100, fh: testFloatNHCB}, + sample{t: 101, fh: testZeroFloatNHCB}, + sample{t: 102, fh: testFloatNHCB}, + }, + }, { name: "ST equals to previous sample timestamp is ignored/floatSample", appendableSamples: []appendableSamples{ @@ -4302,6 +4405,34 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { } }(), }, + { + name: "ST equals to previous sample timestamp is ignored/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB, st: 1}, + {ts: 101, h: testNHCB, st: 100}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, h: testZeroNHCB}, + sample{t: 100, h: testNHCB}, + sample{t: 101, h: testNHCB}, + } + }(), + }, + { + name: "ST equals to previous sample timestamp is ignored/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB, st: 1}, + {ts: 101, fh: testFloatNHCB, st: 100}, + }, + expectedSamples: func() []chunks.Sample { + return []chunks.Sample{ + sample{t: 1, fh: testZeroFloatNHCB}, + sample{t: 100, fh: testFloatNHCB}, + sample{t: 101, fh: testFloatNHCB}, + } + }(), + }, { name: "ST lower than minValidTime/float", appendableSamples: []appendableSamples{ @@ -4349,6 +4480,40 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { } }(), }, + { + name: "ST lower than minValidTime/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB, st: -1}, + }, + // ST results ErrOutOfBounds, but ST append is best effort, so + // ST should be ignored, but sample appended. + expectedSamples: func() []chunks.Sample { + // NOTE: Without ST, on query, first histogram sample will get + // CounterReset adjusted to 0. + firstSample := testNHCB.Copy() + firstSample.CounterResetHint = histogram.UnknownCounterReset + return []chunks.Sample{ + sample{t: 100, h: firstSample}, + } + }(), + }, + { + name: "ST lower than minValidTime/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB, st: -1}, + }, + // ST results ErrOutOfBounds, but ST append is best effort, so + // ST should be ignored, but sample appended. + expectedSamples: func() []chunks.Sample { + // NOTE: Without ST, on query, first histogram sample will get + // CounterReset adjusted to 0. + firstSample := testFloatNHCB.Copy() + firstSample.CounterResetHint = histogram.UnknownCounterReset + return []chunks.Sample{ + sample{t: 100, fh: firstSample}, + } + }(), + }, { name: "ST duplicates an existing sample/float", appendableSamples: []appendableSamples{ @@ -4402,6 +4567,44 @@ func TestHeadAppenderV2_Append_EnableSTAsZeroSample(t *testing.T) { } }(), }, + { + name: "ST duplicates an existing sample/NHCB", + appendableSamples: []appendableSamples{ + {ts: 100, h: testNHCB}, + {ts: 200, h: testNHCB, st: 100}, + }, + // ST results ErrDuplicateSampleForTimestamp, but ST append is best effort, so + // ST should be ignored, but sample appended. + expectedSamples: func() []chunks.Sample { + // NOTE: Without ST, on query, first histogram sample will get + // CounterReset adjusted to 0. + firstSample := testNHCB.Copy() + firstSample.CounterResetHint = histogram.UnknownCounterReset + return []chunks.Sample{ + sample{t: 100, h: firstSample}, + sample{t: 200, h: testNHCB}, + } + }(), + }, + { + name: "ST duplicates an existing sample/floatNHCB", + appendableSamples: []appendableSamples{ + {ts: 100, fh: testFloatNHCB}, + {ts: 200, fh: testFloatNHCB, st: 100}, + }, + // ST results ErrDuplicateSampleForTimestamp, but ST append is best effort, so + // ST should ignored, but sample appended. + expectedSamples: func() []chunks.Sample { + // NOTE: Without ST, on query, first histogram sample will get + // CounterReset adjusted to 0. + firstSample := testFloatNHCB.Copy() + firstSample.CounterResetHint = histogram.UnknownCounterReset + return []chunks.Sample{ + sample{t: 100, fh: firstSample}, + sample{t: 200, fh: testFloatNHCB}, + } + }(), + }, } { t.Run(tc.name, func(t *testing.T) { opts := newTestHeadDefaultOptions(DefaultBlockDuration, false) diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go index dd83ff8763..055bf3ff22 100644 --- a/util/teststorage/storage.go +++ b/util/teststorage/storage.go @@ -32,14 +32,22 @@ type Option func(opt *tsdb.Options) // New returns a new TestStorage for testing purposes // that removes all associated files on closing. +// +// Caller does not need to close the TestStorage after use, it's deferred via t.Cleanup. func New(t testing.TB, o ...Option) *TestStorage { s, err := NewWithError(o...) require.NoError(t, err) + + t.Cleanup(func() { + _ = s.Close() // Ignore errors, as it could be a double close. + }) return s } // NewWithError returns a new TestStorage for user facing tests, which reports // errors directly. +// +// It's a caller responsibility to close the TestStorage after use. func NewWithError(o ...Option) (*TestStorage, error) { // Tests just load data for a series sequentially. Thus we // need a long appendable window. diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 39c1fa6080..87fe756544 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -324,7 +324,6 @@ func (m *rulesRetrieverMock) CreateRuleGroups() { m.CreateAlertingRules() arules := m.AlertingRules() storage := teststorage.New(m.testing) - defer storage.Close() engineOpts := promql.EngineOpts{ Logger: nil, @@ -414,7 +413,6 @@ func TestEndpoints(t *testing.T) { test_metric5{"host.name"="localhost"} 1+0x100 test_metric5{"junk\n{},=: chars"="bar"} 1+0x100 `) - t.Cleanup(func() { storage.Close() }) start := time.Unix(0, 0) exemplars := []exemplar.QueryResult{ @@ -575,7 +573,7 @@ func TestGetSeries(t *testing.T) { test_metric2{foo="boo", xyz="qwerty"} 1+0x100 test_metric2{foo="baz", abc="qwerty"} 1+0x100 `) - t.Cleanup(func() { storage.Close() }) + api := &API{ Queryable: storage, } @@ -682,7 +680,6 @@ func TestQueryExemplars(t *testing.T) { test_metric4{foo="boo", dup="1"} 1+0x100 test_metric4{foo="boo"} 1+0x100 `) - t.Cleanup(func() { storage.Close() }) api := &API{ Queryable: storage, @@ -798,7 +795,7 @@ func TestLabelNames(t *testing.T) { test_metric2{foo="boo", xyz="qwerty"} 1+0x100 test_metric2{foo="baz", abc="qwerty"} 1+0x100 `) - t.Cleanup(func() { storage.Close() }) + api := &API{ Queryable: storage, } @@ -901,7 +898,6 @@ func (testStats) Builtin() (_ stats.BuiltinStats) { func TestStats(t *testing.T) { storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) api := &API{ Queryable: storage, diff --git a/web/federate_test.go b/web/federate_test.go index 932639e2e6..8e0a15d57b 100644 --- a/web/federate_test.go +++ b/web/federate_test.go @@ -212,7 +212,6 @@ func TestFederation(t *testing.T) { test_metric_stale 1+10x99 stale test_metric_old 1+10x98 `) - t.Cleanup(func() { storage.Close() }) h := &Handler{ localStorage: &dbAdapter{storage.DB}, @@ -303,7 +302,6 @@ func normalizeBody(body *bytes.Buffer) string { func TestFederationWithNativeHistograms(t *testing.T) { storage := teststorage.New(t) - t.Cleanup(func() { storage.Close() }) var expVec promql.Vector