mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
fix: fix OOO appendFloat; reuse logic between v1 and v2
Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
9c7561fb6a
commit
09cb9bbd77
2 changed files with 6 additions and 30 deletions
|
|
@ -451,23 +451,7 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
|
|||
// we do not need to check for the difference between "unknown
|
||||
// series" and "known series with stNone".
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
|
||||
// to skip that sample from the WAL and write only in the WBL.
|
||||
isOOO, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
|
||||
if err == nil {
|
||||
if isOOO && a.hints != nil && a.hints.DiscardOutOfOrder {
|
||||
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
|
||||
return 0, storage.ErrOutOfOrderSample
|
||||
}
|
||||
s.pendingCommit = true
|
||||
}
|
||||
if delta > 0 {
|
||||
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
|
||||
}
|
||||
if err != nil {
|
||||
if err := a.appendFloat(s, t, v, a.hints != nil && a.hints.DiscardOutOfOrder); err != nil {
|
||||
switch {
|
||||
case errors.Is(err, storage.ErrOutOfOrderSample):
|
||||
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
|
||||
|
|
@ -483,14 +467,6 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
|
|||
if t > a.maxt {
|
||||
a.maxt = t
|
||||
}
|
||||
|
||||
b := a.getCurrentBatch(stFloat, s.ref)
|
||||
b.floats = append(b.floats, record.RefSample{
|
||||
Ref: s.ref,
|
||||
T: t,
|
||||
V: v,
|
||||
})
|
||||
b.floatSeries = append(b.floatSeries, s)
|
||||
return storage.SeriesRef(s.ref), nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -232,16 +232,16 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
|
|||
return storage.SeriesRef(s.ref), partialErr
|
||||
}
|
||||
|
||||
func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejectOOO bool) error {
|
||||
func (a *headAppenderBase) appendFloat(s *memSeries, t int64, v float64, fastRejectOOO bool) error {
|
||||
s.Lock()
|
||||
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
|
||||
// to skip that sample from the WAL and write only in the WBL.
|
||||
isOOO, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
|
||||
if isOOO && fastRejectOOO {
|
||||
s.Unlock()
|
||||
return storage.ErrOutOfOrderSample
|
||||
}
|
||||
if err == nil {
|
||||
if isOOO && fastRejectOOO {
|
||||
s.Unlock()
|
||||
return storage.ErrOutOfOrderSample
|
||||
}
|
||||
s.pendingCommit = true
|
||||
}
|
||||
s.Unlock()
|
||||
|
|
|
|||
Loading…
Reference in a new issue