tsdb: Add unit tests for stale series compaction

Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
Ganesh Vernekar 2026-01-23 18:07:34 -08:00
parent 43dc23afe7
commit 4f3de8da29
2 changed files with 251 additions and 3 deletions

View file

@ -447,7 +447,17 @@ func (e errChunksIterator) Err() error { return e.err }
// ExpandSamples drains iter and returns every sample it yields as a slice.
// newSampleFn is an optional sample constructor, handy when the result has to
// be compared against slices built from a different sample implementation;
// passing nil selects the sample type from this package.
// Float samples whose value is NaN are reported with the value -42 instead.
func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
	// Name the flag so the call site documents which mode is selected.
	const replaceNaN = true
	return expandSamples(iter, replaceNaN, newSampleFn)
}
// ExpandSamplesWithoutReplacingNaNs behaves the same as ExpandSamples, except
// that float samples with a NaN value are returned unchanged rather than being
// substituted with another value.
func ExpandSamplesWithoutReplacingNaNs(iter chunkenc.Iterator, newSampleFn func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
	// Name the flag so the call site documents which mode is selected.
	const replaceNaN = false
	return expandSamples(iter, replaceNaN, newSampleFn)
}
func expandSamples(iter chunkenc.Iterator, replaceNaN bool, newSampleFn func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
if newSampleFn == nil {
newSampleFn = func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample {
switch {
@ -470,7 +480,7 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(st, t int64, f float
t, f := iter.At()
st := iter.AtST()
// NaNs can't be compared normally, so substitute for another value.
if math.IsNaN(f) {
if replaceNaN && math.IsNaN(f) {
f = -42
}
result = append(result, newSampleFn(st, t, f, nil, nil))

View file

@ -52,6 +52,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
@ -145,6 +146,16 @@ func TestDBClose_AfterClose(t *testing.T) {
// query selects the series matching matchers from q and expands all of their
// samples. It delegates to queryHelper with NaN replacement enabled.
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
	const withNaNReplacement = true
	return queryHelper(t, q, withNaNReplacement, matchers...)
}
// queryWithoutReplacingNaNs selects the series matching matchers from q and
// expands all of their samples. It delegates to queryHelper with NaN
// replacement disabled, so NaN float values (stale markers included) come back
// untouched.
func queryWithoutReplacingNaNs(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
	const withNaNReplacement = false
	return queryHelper(t, q, withNaNReplacement, matchers...)
}
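// queryHelper runs a matcher query against the querier and fully expands its data.
// When withNaNReplacement is true the samples are expanded with storage.ExpandSamples;
// otherwise storage.ExpandSamplesWithoutReplacingNaNs is used, which leaves float NaN values unchanged.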
func queryHelper(t testing.TB, q storage.Querier, withNaNReplacement bool, matchers ...*labels.Matcher) map[string][]chunks.Sample {
ss := q.Select(context.Background(), false, nil, matchers...)
defer func() {
require.NoError(t, q.Close())
@ -156,7 +167,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
series := ss.At()
it = series.Iterator(it)
samples, err := storage.ExpandSamples(it, newSample)
var samples []chunks.Sample
var err error
if withNaNReplacement {
samples, err = storage.ExpandSamples(it, newSample)
} else {
samples, err = storage.ExpandSamplesWithoutReplacingNaNs(it, newSample)
}
require.NoError(t, err)
require.NoError(t, it.Err())
@ -2610,7 +2627,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
db.DisableCompactions()
app := db.Appender(ctx)
maxt = 1000
for i := 0; i < maxt; i++ {
for i := range maxt {
_, err := app.Append(0, labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
require.NoError(t, err)
}
@ -9323,3 +9340,224 @@ func TestBlockReloadInterval(t *testing.T) {
})
}
}
// TestStaleSeriesCompaction checks DB.CompactStaleHead end to end.
//
// It appends float, histogram and float histogram series in three groups:
// series that never go stale, series that go stale at t=200, and series that
// span the 1000 block boundary before going stale at t=1200. After
// CompactStaleHead it asserts that:
//   - only the non-stale series remain in the head,
//   - two blocks ([0, 1000) and [1000, 2000)) exist and are flagged as coming
//     from stale series,
//   - the blocks contain exactly the samples of the stale series, ending in a
//     stale marker,
//   - the head content is the same after closing and reopening the DB.
func TestStaleSeriesCompaction(t *testing.T) {
	opts := DefaultOptions()
	opts.MinBlockDuration = 1000
	opts.MaxBlockDuration = 1000
	db := newTestDB(t, withOpts(opts))
	db.DisableCompactions()
	t.Cleanup(func() {
		// db is reassigned when the DB is reopened at the end of the test;
		// the closure closes whichever instance is current at cleanup time.
		require.NoError(t, db.Close())
	})

	var (
		nonStaleSeries, staleSeries,
		nonStaleHist, staleHist,
		nonStaleFHist, staleFHist,
		staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary []labels.Labels
		numSeriesPerCategory = 1
	)
	// Every category gets its own numeric name range so that names never collide.
	for i := range numSeriesPerCategory {
		nonStaleSeries = append(nonStaleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 1000+i)))
		nonStaleHist = append(nonStaleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 2000+i)))
		nonStaleFHist = append(nonStaleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 3000+i)))
		staleSeries = append(staleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 4000+i)))
		staleHist = append(staleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 5000+i)))
		staleFHist = append(staleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 6000+i)))
		staleSeriesCrossingBoundary = append(staleSeriesCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 7000+i)))
		staleHistCrossingBoundary = append(staleHistCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 8000+i)))
		staleFHistCrossingBoundary = append(staleFHistCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 9000+i)))
	}

	var (
		v      = 10.0
		staleV = math.Float64frombits(value.StaleNaN)
		h      = tsdbutil.GenerateTestHistograms(1)[0]
		fh     = tsdbutil.GenerateTestFloatHistograms(1)[0]
		// Histogram samples carry the stale marker in their Sum field.
		staleH  = &histogram.Histogram{Sum: staleV}
		staleFH = &histogram.FloatHistogram{Sum: staleV}
	)

	// addNormalSamples appends one regular sample at ts to the i-th series of
	// each of the three slices and commits them in a single appender.
	addNormalSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
		app := db.Appender(context.Background())
		for i := range len(floatSeries) {
			_, err := app.Append(0, floatSeries[i], ts, v)
			require.NoError(t, err)
			_, err = app.AppendHistogram(0, histSeries[i], ts, h, nil)
			require.NoError(t, err)
			_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, fh)
			require.NoError(t, err)
		}
		require.NoError(t, app.Commit())
	}
	// addStaleSamples is the counterpart of addNormalSamples that appends
	// stale markers instead of regular values.
	addStaleSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
		app := db.Appender(context.Background())
		for i := range len(floatSeries) {
			_, err := app.Append(0, floatSeries[i], ts, staleV)
			require.NoError(t, err)
			_, err = app.AppendHistogram(0, histSeries[i], ts, staleH, nil)
			require.NoError(t, err)
			_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, staleFH)
			require.NoError(t, err)
		}
		require.NoError(t, app.Commit())
	}

	// Normal sample for all.
	addNormalSamples(100, nonStaleSeries, nonStaleHist, nonStaleFHist)
	addNormalSamples(100, staleSeries, staleHist, staleFHist)

	// Stale sample for the stale series. Normal sample for the non-stale series.
	addNormalSamples(200, nonStaleSeries, nonStaleHist, nonStaleFHist)
	addStaleSamples(200, staleSeries, staleHist, staleFHist)

	// Normal samples for the non-stale series later.
	addNormalSamples(300, nonStaleSeries, nonStaleHist, nonStaleFHist)

	require.Equal(t, uint64(6*numSeriesPerCategory), db.Head().NumSeries())
	require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumStaleSeries())

	// Series that cross the block boundary and then go stale.
	addNormalSamples(300, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
	addNormalSamples(700, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
	addNormalSamples(1100, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
	addStaleSamples(1200, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)

	require.NoError(t, db.CompactStaleHead())

	require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumSeries())
	require.Equal(t, uint64(0), db.Head().NumStaleSeries())

	require.Len(t, db.Blocks(), 2)
	m := db.Blocks()[0].Meta()
	require.Equal(t, int64(0), m.MinTime)
	require.Equal(t, int64(1000), m.MaxTime)
	require.Truef(t, m.Compaction.FromStaleSeries(), "stale series info not found in block meta")

	m = db.Blocks()[1].Meta()
	require.Equal(t, int64(1000), m.MinTime)
	require.Equal(t, int64(2000), m.MaxTime)
	require.Truef(t, m.Compaction.FromStaleSeries(), "stale series info not found in block meta")

	// To make sure that Head is not truncated based on stale series block.
	require.NoError(t, db.reload())

	// Histogram samples after the first one of a series are expected back
	// with the NotCounterReset hint.
	nonFirstH := h.Copy()
	nonFirstH.CounterResetHint = histogram.NotCounterReset
	nonFirstFH := fh.Copy()
	nonFirstFH.CounterResetHint = histogram.NotCounterReset

	// Verify head block.
	verifyHeadBlock := func() {
		// Scale with numSeriesPerCategory like the other count assertions.
		require.Equal(t, uint64(3*numSeriesPerCategory), db.head.NumSeries())
		require.Equal(t, uint64(0), db.head.NumStaleSeries())

		expHeadQuery := make(map[string][]chunks.Sample)
		for i := range numSeriesPerCategory {
			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleSeries[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, f: v}, sample{t: 200, f: v}, sample{t: 300, f: v},
			}
			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleHist[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, h: h}, sample{t: 200, h: nonFirstH}, sample{t: 300, h: nonFirstH},
			}
			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleFHist[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, fh: fh}, sample{t: 200, fh: nonFirstFH}, sample{t: 300, fh: nonFirstFH},
			}
		}

		querier, err := NewBlockQuerier(NewRangeHead(db.head, 0, 300), 0, 300)
		require.NoError(t, err)
		// query closes the querier (and checks the Close error) before it
		// returns, so no additional cleanup is registered here.
		seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
		require.Equal(t, expHeadQuery, seriesSet)
	}

	verifyHeadBlock()

	// Verify blocks from stale series.
	{
		expBlockQuery := make(map[string][]chunks.Sample)
		for i := range numSeriesPerCategory {
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeries[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, f: v}, sample{t: 200, f: staleV},
			}
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHist[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, h: h}, sample{t: 200, h: staleH},
			}
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleFHist[i].Get("name"))] = []chunks.Sample{
				sample{t: 100, fh: fh}, sample{t: 200, fh: staleFH},
			}
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeriesCrossingBoundary[i].Get("name"))] = []chunks.Sample{
				sample{t: 300, f: v}, sample{t: 700, f: v}, sample{t: 1100, f: v}, sample{t: 1200, f: staleV},
			}
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHistCrossingBoundary[i].Get("name"))] = []chunks.Sample{
				sample{t: 300, h: h}, sample{t: 700, h: nonFirstH}, sample{t: 1100, h: h}, sample{t: 1200, h: staleH},
			}
			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleFHistCrossingBoundary[i].Get("name"))] = []chunks.Sample{
				sample{t: 300, fh: fh}, sample{t: 700, fh: nonFirstFH}, sample{t: 1100, fh: fh}, sample{t: 1200, fh: staleFH},
			}
		}

		// queryWithoutReplacingNaNs closes each querier before it returns.
		querier, err := NewBlockQuerier(db.Blocks()[0], 0, 1000)
		require.NoError(t, err)
		seriesSet := queryWithoutReplacingNaNs(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))

		querier, err = NewBlockQuerier(db.Blocks()[1], 1000, 2000)
		require.NoError(t, err)
		seriesSet2 := queryWithoutReplacingNaNs(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))

		// Stitch the per-block results together so that series spanning both
		// blocks can be compared as a whole.
		for key, blockSamples := range seriesSet2 {
			seriesSet[key] = append(seriesSet[key], blockSamples...)
		}

		require.Len(t, seriesSet, len(expBlockQuery))

		// Compare all the samples except the stale value that needs special handling.
		for _, category := range [][]labels.Labels{
			staleSeries, staleHist, staleFHist,
			staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary,
		} {
			for i := range numSeriesPerCategory {
				seriesKey := fmt.Sprintf(`{name="%s"}`, category[i].Get("name"))
				samples := expBlockQuery[seriesKey]
				actSamples, exists := seriesSet[seriesKey]
				require.Truef(t, exists, "series not found in result %s", seriesKey)
				require.Len(t, actSamples, len(samples))

				for j := range len(samples) - 1 {
					require.Equal(t, samples[j], actSamples[j])
				}

				// The last sample is the stale marker. NaN never compares
				// equal to itself, so it is checked via value.IsStaleNaN.
				l := len(samples) - 1
				require.Equal(t, samples[l].T(), actSamples[l].T())
				switch {
				case value.IsStaleNaN(samples[l].F()):
					require.True(t, value.IsStaleNaN(actSamples[l].F()))
				case samples[l].H() != nil:
					require.True(t, value.IsStaleNaN(actSamples[l].H().Sum))
				default:
					require.True(t, value.IsStaleNaN(actSamples[l].FH().Sum))
				}
			}
		}
	}

	{
		// Restart DB and verify that stale series were discarded from WAL replay.
		require.NoError(t, db.Close())
		var err error
		db, err = Open(db.Dir(), db.logger, db.registerer, db.opts, nil)
		require.NoError(t, err)

		verifyHeadBlock()
	}
}