diff --git a/storage/series.go b/storage/series.go
index ebc5a16c07..bf6df7db3e 100644
--- a/storage/series.go
+++ b/storage/series.go
@@ -447,7 +447,17 @@ func (e errChunksIterator) Err() error { return e.err }
 // ExpandSamples iterates over all samples in the iterator, buffering all in slice.
 // Optionally it takes samples constructor, useful when you want to compare sample slices with different
 // sample implementations. if nil, sample type from this package will be used.
+// For float samples, NaN values are replaced with -42, since NaNs never compare equal.
 func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
+	return expandSamples(iter, true, newSampleFn)
+}
+
+// ExpandSamplesWithoutReplacingNaNs is the same as ExpandSamples, but it leaves float NaN sample values untouched.
+func ExpandSamplesWithoutReplacingNaNs(iter chunkenc.Iterator, newSampleFn func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
+	return expandSamples(iter, false, newSampleFn)
+}
+
+func expandSamples(iter chunkenc.Iterator, replaceNaN bool, newSampleFn func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample) ([]chunks.Sample, error) {
 	if newSampleFn == nil {
 		newSampleFn = func(st, t int64, f float64, h *histogram.Histogram, fh *histogram.FloatHistogram) chunks.Sample {
 			switch {
@@ -470,7 +480,7 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(st, t int64, f float
 			t, f := iter.At()
 			st := iter.AtST()
 			// NaNs can't be compared normally, so substitute for another value.
-			if math.IsNaN(f) {
+			if replaceNaN && math.IsNaN(f) {
 				f = -42
 			}
 			result = append(result, newSampleFn(st, t, f, nil, nil))
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index 5e57982b5d..2dbcb11645 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -52,6 +52,7 @@ import (
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/metadata"
+	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/prompb"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/storage/remote"
@@ -145,6 +146,16 @@ func TestDBClose_AfterClose(t *testing.T) {

 // query runs a matcher query against the querier and fully expands its data.
 func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
+	return queryHelper(t, q, true, matchers...)
+}
+
+// queryWithoutReplacingNaNs is like query, but it does not replace NaN sample values while expanding the data.
+func queryWithoutReplacingNaNs(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
+	return queryHelper(t, q, false, matchers...)
+}
+
+// queryHelper runs a matcher query against the querier and fully expands its data, optionally replacing NaN sample values.
+func queryHelper(t testing.TB, q storage.Querier, withNaNReplacement bool, matchers ...*labels.Matcher) map[string][]chunks.Sample {
 	ss := q.Select(context.Background(), false, nil, matchers...)
 	defer func() {
 		require.NoError(t, q.Close())
@@ -156,7 +167,13 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str
 		series := ss.At()
 		it = series.Iterator(it)

-		samples, err := storage.ExpandSamples(it, newSample)
+		var samples []chunks.Sample
+		var err error
+		if withNaNReplacement {
+			samples, err = storage.ExpandSamples(it, newSample)
+		} else {
+			samples, err = storage.ExpandSamplesWithoutReplacingNaNs(it, newSample)
+		}
 		require.NoError(t, err)
 		require.NoError(t, it.Err())
@@ -2610,7 +2627,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
 	db.DisableCompactions()
 	app := db.Appender(ctx)
 	maxt = 1000
-	for i := 0; i < maxt; i++ {
+	for i := range maxt {
 		_, err := app.Append(0, labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
 		require.NoError(t, err)
 	}
@@ -9323,3 +9340,224 @@ func TestBlockReloadInterval(t *testing.T) {
 		})
 	}
 }
+
+func TestStaleSeriesCompaction(t *testing.T) {
+	opts := DefaultOptions()
+	opts.MinBlockDuration = 1000
+	opts.MaxBlockDuration = 1000
+	db := newTestDB(t, withOpts(opts))
+	db.DisableCompactions()
+	t.Cleanup(func() {
+		require.NoError(t, db.Close())
+	})
+
+	var (
+		nonStaleSeries, staleSeries,
+		nonStaleHist, staleHist,
+		nonStaleFHist, staleFHist,
+		staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary []labels.Labels
+		numSeriesPerCategory = 1
+	)
+	for i := range numSeriesPerCategory {
+		nonStaleSeries = append(nonStaleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 1000+i)))
+		nonStaleHist = append(nonStaleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 2000+i)))
+		nonStaleFHist = append(nonStaleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 3000+i)))
+
+		staleSeries = append(staleSeries, labels.FromStrings("name", fmt.Sprintf("series%d", 4000+i)))
+		staleHist = append(staleHist, labels.FromStrings("name", fmt.Sprintf("series%d", 5000+i)))
+		staleFHist = append(staleFHist, labels.FromStrings("name", fmt.Sprintf("series%d", 6000+i)))
+
+		staleSeriesCrossingBoundary = append(staleSeriesCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 7000+i)))
+		staleHistCrossingBoundary = append(staleHistCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 8000+i)))
+		staleFHistCrossingBoundary = append(staleFHistCrossingBoundary, labels.FromStrings("name", fmt.Sprintf("series%d", 9000+i)))
+	}
+
+	var (
+		v       = 10.0
+		staleV  = math.Float64frombits(value.StaleNaN)
+		h       = tsdbutil.GenerateTestHistograms(1)[0]
+		fh      = tsdbutil.GenerateTestFloatHistograms(1)[0]
+		staleH  = &histogram.Histogram{Sum: staleV}
+		staleFH = &histogram.FloatHistogram{Sum: staleV}
+	)
+
+	addNormalSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
+		app := db.Appender(context.Background())
+		for i := range floatSeries {
+			_, err := app.Append(0, floatSeries[i], ts, v)
+			require.NoError(t, err)
+			_, err = app.AppendHistogram(0, histSeries[i], ts, h, nil)
+			require.NoError(t, err)
+			_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, fh)
+			require.NoError(t, err)
+		}
+		require.NoError(t, app.Commit())
+	}
+	addStaleSamples := func(ts int64, floatSeries, histSeries, floatHistSeries []labels.Labels) {
+		app := db.Appender(context.Background())
+		for i := range floatSeries {
+			_, err := app.Append(0, floatSeries[i], ts, staleV)
+			require.NoError(t, err)
+			_, err = app.AppendHistogram(0, histSeries[i], ts, staleH, nil)
+			require.NoError(t, err)
+			_, err = app.AppendHistogram(0, floatHistSeries[i], ts, nil, staleFH)
+			require.NoError(t, err)
+		}
+		require.NoError(t, app.Commit())
+	}
+
+	// Normal samples for all series.
+	addNormalSamples(100, nonStaleSeries, nonStaleHist, nonStaleFHist)
+	addNormalSamples(100, staleSeries, staleHist, staleFHist)
+
+	// Stale samples for the stale series, normal samples for the non-stale series.
+	addNormalSamples(200, nonStaleSeries, nonStaleHist, nonStaleFHist)
+	addStaleSamples(200, staleSeries, staleHist, staleFHist)
+
+	// More normal samples for the non-stale series later on.
+	addNormalSamples(300, nonStaleSeries, nonStaleHist, nonStaleFHist)
+
+	require.Equal(t, uint64(6*numSeriesPerCategory), db.Head().NumSeries())
+	require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumStaleSeries())
+
+	// Series that cross the block boundary and then go stale.
+	addNormalSamples(300, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
+	addNormalSamples(700, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
+	addNormalSamples(1100, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
+	addStaleSamples(1200, staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary)
+
+	require.NoError(t, db.CompactStaleHead())
+
+	require.Equal(t, uint64(3*numSeriesPerCategory), db.Head().NumSeries())
+	require.Equal(t, uint64(0), db.Head().NumStaleSeries())
+
+	require.Len(t, db.Blocks(), 2)
+	m := db.Blocks()[0].Meta()
+	require.Equal(t, int64(0), m.MinTime)
+	require.Equal(t, int64(1000), m.MaxTime)
+	require.Truef(t, m.Compaction.FromStaleSeries(), "stale series info not found in block meta")
+	m = db.Blocks()[1].Meta()
+	require.Equal(t, int64(1000), m.MinTime)
+	require.Equal(t, int64(2000), m.MaxTime)
+	require.Truef(t, m.Compaction.FromStaleSeries(), "stale series info not found in block meta")
+
+	// Reload to make sure that the Head is not truncated based on the stale series blocks.
+	require.NoError(t, db.reload())
+
+	nonFirstH := h.Copy()
+	nonFirstH.CounterResetHint = histogram.NotCounterReset
+	nonFirstFH := fh.Copy()
+	nonFirstFH.CounterResetHint = histogram.NotCounterReset
+
+	// Verify the head block.
+	verifyHeadBlock := func() {
+		require.Equal(t, uint64(3*numSeriesPerCategory), db.head.NumSeries())
+		require.Equal(t, uint64(0), db.head.NumStaleSeries())
+
+		expHeadQuery := make(map[string][]chunks.Sample)
+		for i := range numSeriesPerCategory {
+			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleSeries[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, f: v}, sample{t: 200, f: v}, sample{t: 300, f: v},
+			}
+			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleHist[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, h: h}, sample{t: 200, h: nonFirstH}, sample{t: 300, h: nonFirstH},
+			}
+			expHeadQuery[fmt.Sprintf(`{name="%s"}`, nonStaleFHist[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, fh: fh}, sample{t: 200, fh: nonFirstFH}, sample{t: 300, fh: nonFirstFH},
+			}
+		}
+
+		querier, err := NewBlockQuerier(NewRangeHead(db.head, 0, 300), 0, 300)
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			querier.Close()
+		})
+		seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
+		require.Equal(t, expHeadQuery, seriesSet)
+	}
+
+	verifyHeadBlock()
+
+	// Verify the blocks created from stale series.
+	{
+		expBlockQuery := make(map[string][]chunks.Sample)
+		for i := range numSeriesPerCategory {
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeries[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, f: v}, sample{t: 200, f: staleV},
+			}
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHist[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, h: h}, sample{t: 200, h: staleH},
+			}
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleFHist[i].Get("name"))] = []chunks.Sample{
+				sample{t: 100, fh: fh}, sample{t: 200, fh: staleFH},
+			}
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleSeriesCrossingBoundary[i].Get("name"))] = []chunks.Sample{
+				sample{t: 300, f: v}, sample{t: 700, f: v}, sample{t: 1100, f: v}, sample{t: 1200, f: staleV},
+			}
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleHistCrossingBoundary[i].Get("name"))] = []chunks.Sample{
+				sample{t: 300, h: h}, sample{t: 700, h: nonFirstH}, sample{t: 1100, h: h}, sample{t: 1200, h: staleH},
+			}
+			expBlockQuery[fmt.Sprintf(`{name="%s"}`, staleFHistCrossingBoundary[i].Get("name"))] = []chunks.Sample{
+				sample{t: 300, fh: fh}, sample{t: 700, fh: nonFirstFH}, sample{t: 1100, fh: fh}, sample{t: 1200, fh: staleFH},
+			}
+		}
+
+		querier, err := NewBlockQuerier(db.Blocks()[0], 0, 1000)
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			querier.Close()
+		})
+		seriesSet := queryWithoutReplacingNaNs(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
+
+		querier, err = NewBlockQuerier(db.Blocks()[1], 1000, 2000)
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			querier.Close()
+		})
+		seriesSet2 := queryWithoutReplacingNaNs(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "name", "series.*"))
+		for k, v := range seriesSet2 {
+			seriesSet[k] = append(seriesSet[k], v...)
+		}
+
+		require.Len(t, seriesSet, len(expBlockQuery))
+
+		// Compare all the samples except the trailing stale sample, which needs special handling.
+		for _, category := range [][]labels.Labels{
+			staleSeries, staleHist, staleFHist,
+			staleSeriesCrossingBoundary, staleHistCrossingBoundary, staleFHistCrossingBoundary,
+		} {
+			for i := range numSeriesPerCategory {
+				seriesKey := fmt.Sprintf(`{name="%s"}`, category[i].Get("name"))
+				samples := expBlockQuery[seriesKey]
+				actSamples, exists := seriesSet[seriesKey]
+				require.Truef(t, exists, "series not found in result %s", seriesKey)
+				require.Len(t, actSamples, len(samples))
+
+				for i := range len(samples) - 1 {
+					require.Equal(t, samples[i], actSamples[i])
+				}
+
+				// NaNs never compare equal, so check the stale marker's bit pattern explicitly.
+				l := len(samples) - 1
+				require.Equal(t, samples[l].T(), actSamples[l].T())
+				switch {
+				case value.IsStaleNaN(samples[l].F()):
+					require.True(t, value.IsStaleNaN(actSamples[l].F()))
+				case samples[l].H() != nil:
+					require.True(t, value.IsStaleNaN(actSamples[l].H().Sum))
+				default:
+					require.True(t, value.IsStaleNaN(actSamples[l].FH().Sum))
+				}
+			}
+		}
+	}
+
+	{
+		// Restart the DB and verify that the stale series were discarded during WAL replay.
+		require.NoError(t, db.Close())
+		var err error
+		db, err = Open(db.Dir(), db.logger, db.registerer, db.opts, nil)
+		require.NoError(t, err)
+
+		verifyHeadBlock()
+	}
+}
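
Context for the storage/series.go change above: ExpandSamples rewrites every float NaN to -42 so that expanded slices can be compared with require.Equal, but that rewrite also destroys the StaleNaN bit pattern that stale markers are encoded with. Below is a minimal sketch (not part of the diff) of the situation that needs the new ExpandSamplesWithoutReplacingNaNs variant; the helper name and test placement are assumptions:

package tsdb_test

import (
	"testing"

	"github.com/prometheus/prometheus/model/value"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"

	"github.com/stretchr/testify/require"
)

// lastSampleIsStale reports whether the final float sample produced by the
// iterator is a stale marker. Expanding via storage.ExpandSamples would mask
// this, because it rewrites every NaN (including StaleNaN) to -42.
func lastSampleIsStale(t *testing.T, it chunkenc.Iterator) bool {
	samples, err := storage.ExpandSamplesWithoutReplacingNaNs(it, nil)
	require.NoError(t, err)
	if len(samples) == 0 {
		return false
	}
	// value.IsStaleNaN matches the exact StaleNaN bit pattern; an ordinary
	// NaN or a regular float value returns false.
	return value.IsStaleNaN(samples[len(samples)-1].F())
}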
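
Similarly, a sketch of the block-meta invariant that TestStaleSeriesCompaction asserts: every block produced by CompactStaleHead carries a stale-series flag in its compaction meta. CompactStaleHead and Compaction.FromStaleSeries come from the rest of this PR rather than this diff; the helper below is hypothetical and would live in the tsdb package:

package tsdb

// staleBlockRanges returns the [mint, maxt) ranges of all blocks flagged as
// having been compacted from stale head series. For the test above, this
// should be exactly {0, 1000} and {1000, 2000}.
func staleBlockRanges(db *DB) [][2]int64 {
	var ranges [][2]int64
	for _, b := range db.Blocks() {
		m := b.Meta()
		if m.Compaction.FromStaleSeries() {
			ranges = append(ranges, [2]int64{m.MinTime, m.MaxTime})
		}
	}
	return ranges
}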