tests(teststorage, api_test): Fix leaking readers; kill fake exemplar storage (#17906)

* tests(teststorage): Fix leaking readers; use TSDB exemplar storage instead of fake

Signed-off-by: bwplotka <bwplotka@gmail.com>

* switched to v1 exemplar flow for now

Signed-off-by: bwplotka <bwplotka@gmail.com>

---------

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2026-01-27 10:07:56 +00:00 committed by GitHub
parent d25c8476e7
commit aa0f00efdf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 60 additions and 80 deletions

View file

@ -19,12 +19,8 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
)
@ -57,6 +53,10 @@ func NewWithError(o ...Option) (*TestStorage, error) {
opts.RetentionDuration = 0
opts.OutOfOrderTimeWindow = 0
// Enable exemplars storage by default.
opts.EnableExemplarStorage = true
opts.MaxExemplars = 1e5
for _, opt := range o {
opt(opts)
}
@ -70,20 +70,12 @@ func NewWithError(o ...Option) (*TestStorage, error) {
if err != nil {
return nil, fmt.Errorf("opening test storage: %w", err)
}
reg := prometheus.NewRegistry()
eMetrics := tsdb.NewExemplarMetrics(reg)
es, err := tsdb.NewCircularExemplarStorage(10, eMetrics, opts.OutOfOrderTimeWindow)
if err != nil {
return nil, fmt.Errorf("opening test exemplar storage: %w", err)
}
return &TestStorage{DB: db, exemplarStorage: es, dir: dir}, nil
return &TestStorage{DB: db, dir: dir}, nil
}
type TestStorage struct {
*tsdb.DB
exemplarStorage tsdb.ExemplarStorage
dir string
dir string
}
func (s TestStorage) Close() error {
@ -92,15 +84,3 @@ func (s TestStorage) Close() error {
}
return os.RemoveAll(s.dir)
}
func (s TestStorage) ExemplarAppender() storage.ExemplarAppender {
return s
}
func (s TestStorage) ExemplarQueryable() storage.ExemplarQueryable {
return s.exemplarStorage
}
func (s TestStorage) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
return ref, s.exemplarStorage.AddExemplar(l, e)
}

View file

@ -187,15 +187,12 @@ func (testTargetRetriever) ScrapePoolConfig(_ string) (*config.ScrapeConfig, err
func (t *testTargetRetriever) SetMetadataStoreForTargets(identifier string, metadata scrape.MetricMetadataStore) error {
targets, ok := t.activeTargets[identifier]
if !ok {
return errors.New("targets not found")
return fmt.Errorf("no active target for %v", identifier)
}
for _, at := range targets {
at.SetMetadataStore(metadata)
}
return nil
}
@ -323,7 +320,8 @@ func (m *rulesRetrieverMock) CreateAlertingRules() {
func (m *rulesRetrieverMock) CreateRuleGroups() {
m.CreateAlertingRules()
arules := m.AlertingRules()
storage := teststorage.New(m.testing)
// Create separate storage for recordings to not pollute the main one.
s := teststorage.New(m.testing)
engineOpts := promql.EngineOpts{
Logger: nil,
@ -333,8 +331,8 @@ func (m *rulesRetrieverMock) CreateRuleGroups() {
}
engine := promqltest.NewTestEngineWithOpts(m.testing, engineOpts)
opts := &rules.ManagerOptions{
QueryFunc: rules.EngineQueryFunc(engine, storage),
Appendable: storage,
QueryFunc: rules.EngineQueryFunc(engine, s),
Appendable: s,
Context: context.Background(),
Logger: promslog.NewNopLogger(),
NotifyFunc: func(context.Context, string, ...*rules.Alert) {},
@ -399,8 +397,23 @@ var sampleFlagMap = map[string]string{
"flag2": "value2",
}
func appendExemplars(t testing.TB, s storage.Storage, ex []exemplar.QueryResult) {
t.Helper()
// TODO(bwplotka): Use AppenderV2.AppendExemplar per series flow
// once its implemented: https://github.com/prometheus/prometheus/issues/17632#issuecomment-3759315095
app := s.Appender(t.Context())
for _, ed := range ex {
for _, e := range ed.Exemplars {
_, err := app.AppendExemplar(0, ed.SeriesLabels, e)
require.NoError(t, err)
}
}
require.NoError(t, app.Commit())
}
func TestEndpoints(t *testing.T) {
storage := promqltest.LoadedStorage(t, `
s := promqltest.LoadedStorage(t, `
load 1m
test_metric1{foo="bar"} 0+100x100
test_metric1{foo="boo"} 1+0x100
@ -414,6 +427,7 @@ func TestEndpoints(t *testing.T) {
test_metric5{"junk\n{},=: chars"="bar"} 1+0x100
`)
// Add exemplar testdata here, given promqltest does not support exemplars.
start := time.Unix(0, 0)
exemplars := []exemplar.QueryResult{
{
@ -457,15 +471,10 @@ func TestEndpoints(t *testing.T) {
},
},
}
for _, ed := range exemplars {
_, err := storage.AppendExemplar(0, ed.SeriesLabels, ed.Exemplars[0])
require.NoError(t, err, "failed to add exemplar: %+v", ed.Exemplars[0])
}
appendExemplars(t, s, exemplars)
now := time.Now()
ng := testEngine(t)
t.Run("local", func(t *testing.T) {
algr := rulesRetrieverMock{testing: t}
@ -478,9 +487,9 @@ func TestEndpoints(t *testing.T) {
testTargetRetriever := setupTestTargetRetriever(t)
api := &API{
Queryable: storage,
Queryable: s,
QueryEngine: ng,
ExemplarQueryable: storage.ExemplarQueryable(),
ExemplarQueryable: s,
targetRetriever: testTargetRetriever.toFactory(),
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
flagsMap: sampleFlagMap,
@ -489,14 +498,14 @@ func TestEndpoints(t *testing.T) {
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
rulesRetriever: algr.toFactory(),
}
testEndpoints(t, api, testTargetRetriever, storage, true)
testEndpoints(t, api, testTargetRetriever, true)
})
// Run all the API tests against an API that is wired to forward queries via
// the remote read client to a test server, which in turn sends them to the
// data from the test storage.
t.Run("remote", func(t *testing.T) {
server := setupRemote(storage)
server := setupRemote(s)
defer server.Close()
u, err := url.Parse(server.URL)
@ -518,6 +527,7 @@ func TestEndpoints(t *testing.T) {
remote := remote.NewStorage(promslog.New(&promslogConfig), prometheus.DefaultRegisterer, func() (int64, error) {
return 0, nil
}, dbDir, 1*time.Second, nil, false)
t.Cleanup(func() { _ = remote.Close() })
err = remote.ApplyConfig(&config.Config{
RemoteReadConfigs: []*config.RemoteReadConfig{
@ -543,7 +553,7 @@ func TestEndpoints(t *testing.T) {
api := &API{
Queryable: remote,
QueryEngine: ng,
ExemplarQueryable: storage.ExemplarQueryable(),
ExemplarQueryable: s,
targetRetriever: testTargetRetriever.toFactory(),
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
flagsMap: sampleFlagMap,
@ -552,7 +562,7 @@ func TestEndpoints(t *testing.T) {
ready: func(f http.HandlerFunc) http.HandlerFunc { return f },
rulesRetriever: algr.toFactory(),
}
testEndpoints(t, api, testTargetRetriever, storage, false)
testEndpoints(t, api, testTargetRetriever, false)
})
}
@ -565,7 +575,7 @@ func (b byLabels) Less(i, j int) bool { return labels.Compare(b[i], b[j]) < 0 }
func TestGetSeries(t *testing.T) {
// TestEndpoints doesn't have enough label names to test api.labelNames
// endpoint properly. Hence we test it separately.
storage := promqltest.LoadedStorage(t, `
s := promqltest.LoadedStorage(t, `
load 1m
test_metric1{foo1="bar", baz="abc"} 0+100x100
test_metric1{foo2="boo"} 1+0x100
@ -575,7 +585,7 @@ func TestGetSeries(t *testing.T) {
`)
api := &API{
Queryable: storage,
Queryable: s,
}
request := func(method string, matchers ...string) (*http.Request, error) {
u, err := url.Parse("http://example.com")
@ -669,7 +679,7 @@ func TestGetSeries(t *testing.T) {
func TestQueryExemplars(t *testing.T) {
start := time.Unix(0, 0)
storage := promqltest.LoadedStorage(t, `
s := promqltest.LoadedStorage(t, `
load 1m
test_metric1{foo="bar"} 0+100x100
test_metric1{foo="boo"} 1+0x100
@ -682,9 +692,9 @@ func TestQueryExemplars(t *testing.T) {
`)
api := &API{
Queryable: storage,
Queryable: s,
QueryEngine: testEngine(t),
ExemplarQueryable: storage.ExemplarQueryable(),
ExemplarQueryable: s,
}
request := func(method string, qs url.Values) (*http.Request, error) {
@ -762,15 +772,10 @@ func TestQueryExemplars(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
es := storage
es := s
ctx := context.Background()
for _, te := range tc.exemplars {
for _, e := range te.Exemplars {
_, err := es.AppendExemplar(0, te.SeriesLabels, e)
require.NoError(t, err)
}
}
appendExemplars(t, es, tc.exemplars)
req, err := request(http.MethodGet, tc.query)
require.NoError(t, err)
@ -787,7 +792,7 @@ func TestQueryExemplars(t *testing.T) {
func TestLabelNames(t *testing.T) {
// TestEndpoints doesn't have enough label names to test api.labelNames
// endpoint properly. Hence we test it separately.
storage := promqltest.LoadedStorage(t, `
s := promqltest.LoadedStorage(t, `
load 1m
test_metric1{foo1="bar", baz="abc"} 0+100x100
test_metric1{foo2="boo"} 1+0x100
@ -797,7 +802,7 @@ func TestLabelNames(t *testing.T) {
`)
api := &API{
Queryable: storage,
Queryable: s,
}
request := func(method, limit string, matchers ...string) (*http.Request, error) {
u, err := url.Parse("http://example.com")
@ -897,10 +902,10 @@ func (testStats) Builtin() (_ stats.BuiltinStats) {
}
func TestStats(t *testing.T) {
storage := teststorage.New(t)
s := teststorage.New(t)
api := &API{
Queryable: storage,
Queryable: s,
QueryEngine: testEngine(t),
now: func() time.Time {
return time.Unix(123, 0)
@ -1115,7 +1120,7 @@ func setupRemote(s storage.Storage) *httptest.Server {
return httptest.NewServer(handler)
}
func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.ExemplarStorage, testLabelAPI bool) {
func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, testLabelAPI bool) {
start := time.Unix(0, 0)
type targetMetadata struct {
@ -1135,7 +1140,6 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
errType errorType
sorter func(any)
metadata []targetMetadata
exemplars []exemplar.QueryResult
zeroFunc func(any)
}
@ -2043,8 +2047,8 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
},
sorter: func(m any) {
sort.Slice(m.([]metricMetadata), func(i, j int) bool {
s := m.([]metricMetadata)
return s[i].MetricFamily < s[j].MetricFamily
mm := m.([]metricMetadata)
return mm[i].MetricFamily < mm[j].MetricFamily
})
},
},
@ -3758,17 +3762,16 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, es storage.E
tr.ResetMetadataStore()
for _, tm := range test.metadata {
tr.SetMetadataStoreForTargets(tm.identifier, &testMetaStore{Metadata: tm.metadata})
}
for _, te := range test.exemplars {
for _, e := range te.Exemplars {
_, err := es.AppendExemplar(0, te.SeriesLabels, e)
require.NoError(t, err)
}
// TODO: Check error and fixed broken test/bug.
// TestEndpoints/local/run_60_metricMetadata_"limit=1&limit_per_metric=1"/GET fails if we check the error.
_ = tr.SetMetadataStoreForTargets(tm.identifier, &testMetaStore{Metadata: tm.metadata})
}
res := test.endpoint(req.WithContext(ctx))
if res.finalizer != nil {
// Finalizers were added to ensure closed readers on API panics, ensure they are closed here too.
res.finalizer()
}
assertAPIError(t, res.err, test.errType)
if test.sorter != nil {
@ -4766,13 +4769,10 @@ func TestExtractQueryOpts(t *testing.T) {
// Test query timeout parameter.
func TestQueryTimeout(t *testing.T) {
storage := promqltest.LoadedStorage(t, `
s := promqltest.LoadedStorage(t, `
load 1m
test_metric1{foo="bar"} 0+100x100
`)
t.Cleanup(func() {
_ = storage.Close()
})
now := time.Now()
@ -4792,9 +4792,9 @@ func TestQueryTimeout(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
engine := &fakeEngine{}
api := &API{
Queryable: storage,
Queryable: s,
QueryEngine: engine,
ExemplarQueryable: storage.ExemplarQueryable(),
ExemplarQueryable: s,
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
flagsMap: sampleFlagMap,
now: func() time.Time { return now },