tsdb: Add StaleHead and GC for stale series in the Head block
Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
parent bec70227f1
commit 9b444b57af
3 changed files with 355 additions and 10 deletions
tsdb/head.go (256 changed lines)
@@ -1203,6 +1203,36 @@ func (h *Head) truncateMemory(mint int64) (err error) {
 	return h.truncateSeriesAndChunkDiskMapper("truncateMemory")
 }
 
+// truncateStaleSeries removes the provided series as long as they are still stale.
+func (h *Head) truncateStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) error {
+	h.chunkSnapshotMtx.Lock()
+	defer h.chunkSnapshotMtx.Unlock()
+
+	if h.MinTime() >= maxt {
+		return nil
+	}
+
+	h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt)
+
+	deleted := h.gcStaleSeries(seriesRefs, maxt)
+
+	// Record these stale series refs in the WAL so that we can ignore them during replay.
+	if h.wal != nil {
+		stones := make([]tombstones.Stone, 0, len(seriesRefs))
+		for ref := range deleted {
+			stones = append(stones, tombstones.Stone{
+				Ref:       ref,
+				Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}},
+			})
+		}
+		var enc record.Encoder
+		if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
 // The query timeout limits the max wait time of this function implicitly.
 // The mint is inclusive and maxt is the truncation time hence exclusive.
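A note on the expected call path: the refs passed here are gathered up front via SortedStaleSeriesRefsNoOOOData (added later in this commit), written out through the new StaleHead, and only then dropped from memory. A minimal sketch of such a driver under those assumptions; compactStaleHead and the elided block write are illustrative, not part of this commit:

    // Hypothetical, package-internal driver built only from helpers this
    // commit adds; the block write in the middle is elided.
    func compactStaleHead(ctx context.Context, h *Head, maxt int64) error {
        refs, err := h.SortedStaleSeriesRefsNoOOOData(ctx)
        if err != nil {
            return err
        }
        // ... compact NewStaleHead(h, h.MinTime(), maxt, refs) into a block ...

        // Then drop the same series from the Head, provided they are still stale.
        return h.truncateStaleSeries(refs, maxt)
    }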
@@ -1556,6 +1586,53 @@ func (h *RangeHead) String() string {
 	return fmt.Sprintf("range head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
 }
 
+// StaleHead allows querying the stale series in the Head via an IndexReader, ChunkReader and tombstones.Reader.
+// Used only for compactions.
+type StaleHead struct {
+	RangeHead
+	staleSeriesRefs []storage.SeriesRef
+}
+
+// NewStaleHead returns a *StaleHead.
+func NewStaleHead(head *Head, mint, maxt int64, staleSeriesRefs []storage.SeriesRef) *StaleHead {
+	return &StaleHead{
+		RangeHead: RangeHead{
+			head: head,
+			mint: mint,
+			maxt: maxt,
+		},
+		staleSeriesRefs: staleSeriesRefs,
+	}
+}
+
+func (h *StaleHead) Index() (_ IndexReader, err error) {
+	return h.head.staleIndex(h.mint, h.maxt, h.staleSeriesRefs)
+}
+
+func (h *StaleHead) NumSeries() uint64 {
+	return h.head.NumStaleSeries()
+}
+
+var staleHeadULID = ulid.MustParse("0000000000XXXXXXXSTALEHEAD")
+
+func (h *StaleHead) Meta() BlockMeta {
+	return BlockMeta{
+		MinTime: h.MinTime(),
+		MaxTime: h.MaxTime(),
+		ULID:    staleHeadULID,
+		Stats: BlockStats{
+			NumSeries: h.NumSeries(),
+		},
+	}
+}
+
+// String returns a human-readable representation of the stale head. It's important to
+// keep this function in order to avoid the struct dump when the head is stringified in
+// errors or logs.
+func (h *StaleHead) String() string {
+	return fmt.Sprintf("stale head (mint: %d, maxt: %d)", h.MinTime(), h.MaxTime())
+}
+
 // Delete all samples in the range of [mint, maxt] for series that satisfy the given
 // label matchers.
 func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
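Because StaleHead embeds RangeHead, it satisfies the same BlockReader surface the compactor already consumes, with staleHeadULID playing the same sentinel role as the existing rangeHeadULID. A hedged sketch of a compactor call site (the variables in scope and the exact LeveledCompactor.Write signature are assumptions, not part of this diff):

    // dir, compactor, head, maxt and refs are assumed to be in scope.
    sh := NewStaleHead(head, head.MinTime(), maxt, refs)
    ulids, err := compactor.Write(dir, sh, sh.MinTime(), sh.MaxTime(), nil)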
@@ -1625,13 +1702,14 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
 
 	// Drop old chunks and remember series IDs and hashes if they can be
 	// deleted entirely.
-	deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef, &h.numStaleSeries)
+	deleted, affected, chunksRemoved, staleSeriesDeleted, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
 	seriesRemoved := len(deleted)
 
 	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
 	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
 	h.metrics.chunks.Sub(float64(chunksRemoved))
 	h.numSeries.Sub(uint64(seriesRemoved))
+	h.numStaleSeries.Sub(uint64(staleSeriesDeleted))
 
 	// Remove deleted series IDs from the postings lists.
 	h.postings.Delete(deleted, affected)
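The net effect of this hunk: the stale-series count is no longer decremented through an atomic pointer threaded into stripeSeries.gc; instead gc returns a plain staleSeriesDeleted counter and the Head adjusts numStaleSeries in one place. One invariant a test could pin down (testify style, as used elsewhere in the package; the wiring is assumed):

    // Every stale series is also a live series, so after any gc pass:
    require.LessOrEqual(t, head.NumStaleSeries(), head.NumSeries())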
@@ -1948,13 +2026,14 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
 // but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
 // and there's no easy way to cast maps.
 // minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
-func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, numStaleSeries *atomic.Uint64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
+func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _, _ int, _, _ int64, minMmapFile int) {
 	var (
-		deleted    = map[storage.SeriesRef]struct{}{}
-		affected   = map[labels.Label]struct{}{}
-		rmChunks   = 0
-		actualMint int64 = math.MaxInt64
-		minOOOTime int64 = math.MaxInt64
+		deleted            = map[storage.SeriesRef]struct{}{}
+		affected           = map[labels.Label]struct{}{}
+		rmChunks           = 0
+		staleSeriesDeleted = 0
+		actualMint         int64 = math.MaxInt64
+		minOOOTime         int64 = math.MaxInt64
 	)
 	minMmapFile = math.MaxInt32
 
@@ -2009,7 +2088,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n
 			if value.IsStaleNaN(series.lastValue) ||
 				(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
 				(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
-				numStaleSeries.Dec()
+				staleSeriesDeleted++
 			}
 
 			deleted[storage.SeriesRef(series.ref)] = struct{}{}
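The staleness predicate used here, a stale NaN in the last float, histogram, or float-histogram sample, recurs verbatim in three places in this commit; a follow-up could factor it out. A sketch of such a helper (isStale is an assumed name, not in the diff):

    // isStale reports whether the series' last sample is a stale marker.
    // Caller must hold the series lock.
    func isStale(s *memSeries) bool {
        return value.IsStaleNaN(s.lastValue) ||
            (s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) ||
            (s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum))
    }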
@@ -2025,7 +2104,166 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, n
 		actualMint = mint
 	}
 
-	return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile
+	return deleted, affected, rmChunks, staleSeriesDeleted, actualMint, minOOOTime, minMmapFile
 }
 
+// gcStaleSeries removes the provided series as long as they are still stale
+// and the series maxt is <= the given maxt.
+// The returned references are the series that got deleted.
+func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) map[storage.SeriesRef]struct{} {
+	// Drop old chunks and remember series IDs and hashes if they can be
+	// deleted entirely.
+	deleted, affected, chunksRemoved := h.series.gcStaleSeries(seriesRefs, maxt)
+	seriesRemoved := len(deleted)
+
+	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
+	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
+	h.metrics.chunks.Sub(float64(chunksRemoved))
+	h.numSeries.Sub(uint64(seriesRemoved))
+	h.numStaleSeries.Sub(uint64(seriesRemoved))
+
+	// Remove deleted series IDs from the postings lists.
+	h.postings.Delete(deleted, affected)
+
+	// Remove tombstones referring to the deleted series.
+	h.tombstones.DeleteTombstones(deleted)
+
+	if h.wal != nil {
+		_, last, _ := wlog.Segments(h.wal.Dir())
+		h.walExpiriesMtx.Lock()
+		// Keep series records until we're past segment 'last'
+		// because the WAL will still have sample records with
+		// this ref ID. If we didn't keep these series records, then
+		// WAL replay on startup, or any other code that reads the
+		// WAL, wouldn't be able to use those samples since we would
+		// have no labels for that ref ID.
+		for ref := range deleted {
+			h.walExpiries[chunks.HeadSeriesRef(ref)] = int64(last)
+		}
+		h.walExpiriesMtx.Unlock()
+	}
+
+	return deleted
+}
+
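The walExpiries entries written above keep a deleted series' WAL record usable until the WAL moves past segment last. A hedged sketch of the consuming side (keepSeries is an assumed helper modeled on this bookkeeping; the actual checkpoint code may differ):

    // keepSeries reports whether the series record for ref must be kept
    // when checkpointing WAL segments up to and including seg.
    func (h *Head) keepSeries(ref chunks.HeadSeriesRef, seg int) bool {
        h.walExpiriesMtx.Lock()
        defer h.walExpiriesMtx.Unlock()
        expiry, ok := h.walExpiries[ref]
        return ok && int64(seg) <= expiry
    }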
+// deleteSeriesByID deletes the series with the given references.
+// Only used for WAL replay.
+func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
+	var (
+		deleted            = map[storage.SeriesRef]struct{}{}
+		affected           = map[labels.Label]struct{}{}
+		staleSeriesDeleted = 0
+		chunksRemoved      = 0
+	)
+
+	for _, ref := range refs {
+		refShard := int(ref) & (h.series.size - 1)
+		h.series.locks[refShard].Lock()
+
+		// Copying getByID here to avoid locking and unlocking twice.
+		series := h.series.series[refShard][ref]
+		if series == nil {
+			h.series.locks[refShard].Unlock()
+			continue
+		}
+
+		if value.IsStaleNaN(series.lastValue) ||
+			(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
+			(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
+			staleSeriesDeleted++
+		}
+
+		hash := series.lset.Hash()
+		hashShard := int(hash) & (h.series.size - 1)
+
+		chunksRemoved += len(series.mmappedChunks)
+		if series.headChunks != nil {
+			chunksRemoved += series.headChunks.len()
+		}
+
+		deleted[storage.SeriesRef(series.ref)] = struct{}{}
+		series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
+		h.series.hashes[hashShard].del(hash, series.ref)
+		delete(h.series.series[refShard], series.ref)
+
+		h.series.locks[refShard].Unlock()
+	}
+
+	h.metrics.seriesRemoved.Add(float64(len(deleted)))
+	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
+	h.metrics.chunks.Sub(float64(chunksRemoved))
+	h.numSeries.Sub(uint64(len(deleted)))
+	h.numStaleSeries.Sub(uint64(staleSeriesDeleted))
+
+	// Remove deleted series IDs from the postings lists.
+	h.postings.Delete(deleted, affected)
+
+	// Remove tombstones referring to the deleted series.
+	h.tombstones.DeleteTombstones(deleted)
+}
+
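deleteSeriesByID above is the replay-side counterpart of the tombstone records logged in truncateStaleSeries: a tombstone covering the whole time range marks a series that was garbage-collected as stale. A sketch of that replay wiring (the handler name and its placement in the replay loop are assumptions):

    func handleStaleSeriesTombstones(h *Head, stones []tombstones.Stone) {
        refs := make([]chunks.HeadSeriesRef, 0, len(stones))
        for _, st := range stones {
            for _, iv := range st.Intervals {
                // Full-range tombstones are the marker written by truncateStaleSeries.
                if iv.Mint == math.MinInt64 && iv.Maxt == math.MaxInt64 {
                    refs = append(refs, chunks.HeadSeriesRef(st.Ref))
                    break
                }
            }
        }
        h.deleteSeriesByID(refs)
    }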
+// gcStaleSeries removes the given series from the stripeSeries, provided they
+// are still stale and the series maxt is <= the given maxt.
+func (s *stripeSeries) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
+	var (
+		deleted  = map[storage.SeriesRef]struct{}{}
+		affected = map[labels.Label]struct{}{}
+		rmChunks = 0
+	)
+
+	staleSeriesMap := map[storage.SeriesRef]struct{}{}
+	for _, ref := range seriesRefs {
+		staleSeriesMap[ref] = struct{}{}
+	}
+
+	check := func(hashShard int, hash uint64, series *memSeries, deletedForCallback map[chunks.HeadSeriesRef]labels.Labels) {
+		if _, exists := staleSeriesMap[storage.SeriesRef(series.ref)]; !exists {
+			// This series was not compacted. Skip it.
+			return
+		}
+
+		series.Lock()
+		defer series.Unlock()
+
+		if series.maxTime() > maxt {
+			return
+		}
+
+		// Check if the series is still stale.
+		isStale := value.IsStaleNaN(series.lastValue) ||
+			(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
+			(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum))
+
+		if !isStale {
+			return
+		}
+
+		if series.headChunks != nil {
+			rmChunks += series.headChunks.len()
+		}
+		rmChunks += len(series.mmappedChunks)
+
+		// The series is gone entirely. We need to keep the series lock
+		// and make sure we have acquired the stripe locks for hash and ID of the
+		// series alike.
+		// If we don't hold them all, there's a very small chance that a series receives
+		// samples again while we are half-way into deleting it.
+		refShard := int(series.ref) & (s.size - 1)
+		if hashShard != refShard {
+			s.locks[refShard].Lock()
+			defer s.locks[refShard].Unlock()
+		}
+
+		deleted[storage.SeriesRef(series.ref)] = struct{}{}
+		series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
+		s.hashes[hashShard].del(hash, series.ref)
+		delete(s.series[refShard], series.ref)
+		deletedForCallback[series.ref] = series.lset // OK to access lset; series is locked at the top of this function.
+	}
+
+	s.iterForDeletion(check)
+
+	return deleted, affected, rmChunks
+}
+
 // The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each.
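The contract of stripeSeries.gcStaleSeries in test form: a ref is removed only if it was in the provided set, its maxTime is at or below maxt, and it is still stale. A minimal sketch reusing the package's existing test helper (assuming, as in the test at the bottom of this commit, that only ordinary non-stale samples were appended):

    s, ms1, _ := stripeSeriesWithCollidingSeries(t)
    refs := []storage.SeriesRef{storage.SeriesRef(ms1.ref)}
    deleted, _, _ := s.gcStaleSeries(refs, math.MaxInt64)
    // No stale NaN was appended, so nothing should be deleted; appending
    // a stale marker first would flip this assertion.
    _, ok := deleted[storage.SeriesRef(ms1.ref)]
    require.False(t, ok)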

tsdb/head_read.go
@@ -22,6 +22,7 @@ import (
 	"sync"
 
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/model/value"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
@@ -201,6 +202,112 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchB
 	return nil
 }
 
+func (h *Head) staleIndex(mint, maxt int64, staleSeriesRefs []storage.SeriesRef) (*headStaleIndexReader, error) {
+	return &headStaleIndexReader{
+		headIndexReader: h.indexRange(mint, maxt),
+		staleSeriesRefs: staleSeriesRefs,
+	}, nil
+}
+
+// headStaleIndexReader gives the stale series that have no out-of-order data.
+// This is only used for stale series compaction at the moment, which will only ask for all
+// the series during compaction. So to make that efficient, this index reader requires the
+// pre-calculated list of stale series refs that can be returned without re-reading the Head.
+type headStaleIndexReader struct {
+	*headIndexReader
+	staleSeriesRefs []storage.SeriesRef
+}
+
+func (h *headStaleIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error) {
+	// If all postings are requested, return the precalculated list.
+	k, v := index.AllPostingsKey()
+	if len(h.staleSeriesRefs) > 0 && name == k && len(values) == 1 && values[0] == v {
+		return index.NewListPostings(h.staleSeriesRefs), nil
+	}
+	seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.Postings(ctx, name, values...))
+	if err != nil {
+		return index.ErrPostings(err), err
+	}
+	return index.NewListPostings(seriesRefs), nil
+}
+
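The fast path above matters because stale-series compaction only ever requests the all-postings key; every other postings call falls back to filtering and sorting live postings. Roughly how the compactor-facing call resolves (staleIndexReader is an assumed variable; ExpandPostings is the existing index helper):

    k, v := index.AllPostingsKey() // the empty name/value pair
    p, err := staleIndexReader.Postings(ctx, k, v)
    if err != nil {
        return err
    }
    // p iterates the precomputed, label-sorted stale refs without
    // touching the Head's postings index again.
    refs, err := index.ExpandPostings(p)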
+func (h *headStaleIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings {
+	// Unused for compaction, so we don't need to optimise.
+	seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForLabelMatching(ctx, name, match))
+	if err != nil {
+		return index.ErrPostings(err)
+	}
+	return index.NewListPostings(seriesRefs)
+}
+
+func (h *headStaleIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
+	// Unused for compaction, so we don't need to optimise.
+	seriesRefs, err := h.head.filterStaleSeriesAndSortPostings(h.head.postings.PostingsForAllLabelValues(ctx, name))
+	if err != nil {
+		return index.ErrPostings(err)
+	}
+	return index.NewListPostings(seriesRefs)
+}
+
+// filterStaleSeriesAndSortPostings returns the stale series references from the given postings
+// that also do not have any out-of-order data.
+func (h *Head) filterStaleSeriesAndSortPostings(p index.Postings) ([]storage.SeriesRef, error) {
+	series := make([]*memSeries, 0, 1024)
+
+	notFoundSeriesCount := 0
+	for p.Next() {
+		s := h.series.getByID(chunks.HeadSeriesRef(p.At()))
+		if s == nil {
+			notFoundSeriesCount++
+			continue
+		}
+
+		s.Lock()
+		if s.ooo != nil {
+			// Has out-of-order data; skip it because we cannot determine if a series
+			// is stale when it's getting out-of-order data.
+			s.Unlock()
+			continue
+		}
+
+		if value.IsStaleNaN(s.lastValue) ||
+			(s.lastHistogramValue != nil && value.IsStaleNaN(s.lastHistogramValue.Sum)) ||
+			(s.lastFloatHistogramValue != nil && value.IsStaleNaN(s.lastFloatHistogramValue.Sum)) {
+			series = append(series, s)
+		}
+		s.Unlock()
+	}
+	if notFoundSeriesCount > 0 {
+		h.logger.Debug("Looked up stale series not found", "count", notFoundSeriesCount)
+	}
+	if err := p.Err(); err != nil {
+		return nil, fmt.Errorf("expand postings: %w", err)
+	}
+
+	slices.SortFunc(series, func(a, b *memSeries) int {
+		return labels.Compare(a.labels(), b.labels())
+	})
+
+	refs := make([]storage.SeriesRef, 0, len(series))
+	for _, p := range series {
+		refs = append(refs, storage.SeriesRef(p.ref))
+	}
+	return refs, nil
+}
+
+// SortedPostings returns the postings as-is because we expect any postings obtained via
+// headStaleIndexReader to be already sorted.
+func (*headStaleIndexReader) SortedPostings(p index.Postings) index.Postings {
+	// All the postings functions above already give the sorted list of postings.
+	return p
+}
+
+// SortedStaleSeriesRefsNoOOOData returns all the series refs of the stale series that do not have any out-of-order data.
+func (h *Head) SortedStaleSeriesRefsNoOOOData(ctx context.Context) ([]storage.SeriesRef, error) {
+	k, v := index.AllPostingsKey()
+	return h.filterStaleSeriesAndSortPostings(h.postings.Postings(ctx, k, v))
+}
+
 func appendSeriesChunks(s *memSeries, mint, maxt int64, chks []chunks.Meta) []chunks.Meta {
 	for i, c := range s.mmappedChunks {
 		// Do not expose chunks that are outside of the specified range.

tsdb/head_test.go
@@ -6519,7 +6519,7 @@ func TestStripeSeries_gc(t *testing.T) {
 	s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
 	hash := ms1.lset.Hash()
 
-	s.gc(0, 0, nil)
+	s.gc(0, 0)
 
 	// Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series
 	got := s.getByHash(hash, ms1.lset)