mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
tsdb: Clear stale series from the Head during WAL replay
Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
parent
43e69388df
commit
43dc23afe7
1 changed files with 31 additions and 4 deletions
|
|
@ -308,7 +308,21 @@ Outer:
|
|||
}
|
||||
h.wlReplaySamplesPool.Put(v)
|
||||
case []tombstones.Stone:
|
||||
// Tombstone records will be fairly rare, so not trying to optimise the allocations here.
|
||||
deleteSeriesShards := make([][]chunks.HeadSeriesRef, concurrency)
|
||||
for _, s := range v {
|
||||
if len(s.Intervals) == 1 && s.Intervals[0].Mint == math.MinInt64 && s.Intervals[0].Maxt == math.MaxInt64 {
|
||||
// This series was fully deleted at this point. This record is only done for stale series at the moment.
|
||||
mod := uint64(s.Ref) % uint64(concurrency)
|
||||
deleteSeriesShards[mod] = append(deleteSeriesShards[mod], chunks.HeadSeriesRef(s.Ref))
|
||||
|
||||
// If the series is with a different reference, try deleting that.
|
||||
if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok {
|
||||
mod := uint64(r) % uint64(concurrency)
|
||||
deleteSeriesShards[mod] = append(deleteSeriesShards[mod], r)
|
||||
}
|
||||
continue
|
||||
}
|
||||
for _, itv := range s.Intervals {
|
||||
if itv.Maxt < h.minValidTime.Load() {
|
||||
continue
|
||||
|
|
@ -326,6 +340,14 @@ Outer:
|
|||
h.tombstones.AddInterval(s.Ref, itv)
|
||||
}
|
||||
}
|
||||
|
||||
for i := range concurrency {
|
||||
if len(deleteSeriesShards[i]) > 0 {
|
||||
processors[i].input <- walSubsetProcessorInputItem{deletedSeriesRefs: deleteSeriesShards[i]}
|
||||
deleteSeriesShards[i] = nil
|
||||
}
|
||||
}
|
||||
|
||||
h.wlReplaytStonesPool.Put(v)
|
||||
case []record.RefExemplar:
|
||||
for _, e := range v {
|
||||
|
|
@ -558,10 +580,11 @@ type walSubsetProcessor struct {
|
|||
}
|
||||
|
||||
type walSubsetProcessorInputItem struct {
|
||||
samples []record.RefSample
|
||||
histogramSamples []histogramRecord
|
||||
existingSeries *memSeries
|
||||
walSeriesRef chunks.HeadSeriesRef
|
||||
samples []record.RefSample
|
||||
histogramSamples []histogramRecord
|
||||
existingSeries *memSeries
|
||||
walSeriesRef chunks.HeadSeriesRef
|
||||
deletedSeriesRefs []chunks.HeadSeriesRef
|
||||
}
|
||||
|
||||
func (wp *walSubsetProcessor) setup() {
|
||||
|
|
@ -712,6 +735,10 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
|||
case wp.histogramsOutput <- in.histogramSamples:
|
||||
default:
|
||||
}
|
||||
|
||||
if len(in.deletedSeriesRefs) > 0 {
|
||||
h.deleteSeriesByID(in.deletedSeriesRefs)
|
||||
}
|
||||
}
|
||||
h.updateMinMaxTime(mint, maxt)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue