Merge pull request #17677 from prometheus/bwplotka/a2-agent

refactor(tsdb/agent)[PART3]: add AppenderV2 support to agent
Bartlomiej Plotka 2025-12-17 14:27:51 +00:00 committed by GitHub
commit 1c0537dc02
8 changed files with 1552 additions and 183 deletions


@ -265,6 +265,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "created-timestamp-zero-ingestion":
c.scrape.EnableStartTimestampZeroIngestion = true
c.web.STZeroIngestionEnabled = true
c.agent.EnableSTAsZeroSample = true
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
@ -1417,6 +1418,7 @@ func main() {
"MinWALTime", cfg.agent.MinWALTime,
"MaxWALTime", cfg.agent.MaxWALTime,
"OutOfOrderTimeWindow", cfg.agent.OutOfOrderTimeWindow,
"EnableSTAsZeroSample", cfg.agent.EnableSTAsZeroSample,
)
localStorage.Set(db, 0)
@ -1957,7 +1959,8 @@ type agentOptions struct {
TruncateFrequency model.Duration
MinWALTime, MaxWALTime model.Duration
NoLockfile bool
OutOfOrderTimeWindow int64
OutOfOrderTimeWindow int64 // TODO(bwplotka): Unused option, fix it or remove.
EnableSTAsZeroSample bool
}
func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Options {
@ -1973,6 +1976,7 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option
MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
NoLockfile: opts.NoLockfile,
OutOfOrderTimeWindow: outOfOrderTimeWindow,
EnableSTAsZeroSample: opts.EnableSTAsZeroSample,
}
}
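
For orientation, a minimal sketch of what the flag wiring above amounts to when configuring the agent programmatically; it assumes agent.DefaultOptions returns *agent.Options carrying the new field, as referenced later in this diff:

package example

import "github.com/prometheus/prometheus/tsdb/agent"

// Hedged sketch: --enable-feature=created-timestamp-zero-ingestion in agent
// mode effectively sets this one option before the DB is opened.
func agentOptionsWithSTZero() *agent.Options {
	opts := agent.DefaultOptions()
	opts.EnableSTAsZeroSample = true // what the feature flag toggles above
	return opts
}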


@ -274,8 +274,8 @@ type AppendOptions struct {
//
// Operations on the Appender interface are not goroutine-safe.
//
// The order of samples appended via the Appender is preserved within each
// series. I.e. samples are not reordered per timestamp, or by float/histogram
// The order of samples appended via the Appender is preserved within each series.
// I.e. timestamp order within a batch is not validated; samples are not reordered by timestamp or by float/histogram
// type.
//
// WARNING: Work on AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
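
A hedged illustration of the invariant above, using the V1 Append signature visible later in this diff: out-of-order timestamps within one batch are accepted as appended; any rejection is the storage's own policy, not reordering or validation by the Appender.

package example

import (
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// Sketch: samples are logged in append order within a batch; timestamps are
// not validated or re-sorted by the Appender itself.
func appendOutOfOrderWithinBatch(app storage.Appender, lbls labels.Labels) error {
	ref, err := app.Append(0, lbls, 2000, 1.0) // t=2000 appended first
	if err != nil {
		return err
	}
	if _, err := app.Append(ref, lbls, 1000, 2.0); err != nil { // t=1000 second
		return err
	}
	return app.Commit() // order preserved as appended
}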


@ -103,8 +103,8 @@ var _ error = &AppendPartialError{}
//
// Operations on the AppenderV2 interface are not goroutine-safe.
//
// The order of samples appended via the AppenderV2 is preserved within each
// series. I.e. samples are not reordered per timestamp, or by float/histogram
// The order of samples appended via the AppenderV2 is preserved within each series.
// I.e. timestamp order within a batch is not validated; samples are not reordered by timestamp or by float/histogram
// type.
type AppenderV2 interface {
AppenderTransaction


@ -84,6 +84,15 @@ type Options struct {
// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
OutOfOrderTimeWindow int64
// EnableSTAsZeroSample represents the 'created-timestamp-zero-ingestion' feature flag.
// If true, a non-zero ST that is earlier than the sample timestamp will be
// stored as a zero sample before the actual sample.
//
// The zero sample is best-effort; on failure only a debug log is emitted.
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
EnableSTAsZeroSample bool
}
// DefaultOptions used for the WAL storage. They are reasonable for setups using
@ -233,8 +242,9 @@ type DB struct {
wal *wlog.WL
locker *tsdbutil.DirLocker
appenderPool sync.Pool
bufPool sync.Pool
appenderPool sync.Pool
appenderV2Pool sync.Pool
bufPool sync.Pool
// These pools are only used during WAL replay and are reset at the end.
// NOTE: Adjust resetWALReplayResources() upon changes to the pools.
@ -303,12 +313,26 @@ func Open(l *slog.Logger, reg prometheus.Registerer, rs *remote.Storage, dir str
db.appenderPool.New = func() any {
return &appender{
DB: db,
pendingSeries: make([]record.RefSeries, 0, 100),
pendingSamples: make([]record.RefSample, 0, 100),
pendingHistograms: make([]record.RefHistogramSample, 0, 100),
pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
pendingExamplars: make([]record.RefExemplar, 0, 10),
appenderBase: appenderBase{
DB: db,
pendingSeries: make([]record.RefSeries, 0, 100),
pendingSamples: make([]record.RefSample, 0, 100),
pendingHistograms: make([]record.RefHistogramSample, 0, 100),
pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
pendingExamplars: make([]record.RefExemplar, 0, 10),
},
}
}
db.appenderV2Pool.New = func() any {
return &appenderV2{
appenderBase: appenderBase{
DB: db,
pendingSeries: make([]record.RefSeries, 0, 100),
pendingSamples: make([]record.RefSample, 0, 100),
pendingHistograms: make([]record.RefHistogramSample, 0, 100),
pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
pendingExamplars: make([]record.RefExemplar, 0, 10),
},
}
}
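
The appender pools follow the usual sync.Pool recycling pattern; a minimal generic sketch below (the Put side is not visible in this hunk, so its exact placement is an assumption):

package example

import "sync"

type batch struct{ pending []int }

var pool = sync.Pool{
	New: func() any { return &batch{pending: make([]int, 0, 100)} },
}

// Sketch: Get a pre-allocated value, use it, truncate its buffers (like
// clearData() later in this diff) and Put it back so the capacity is reused.
func useBatch() {
	b := pool.Get().(*batch)
	b.pending = append(b.pending, 1, 2, 3)
	b.pending = b.pending[:0]
	pool.Put(b)
}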
@ -777,9 +801,8 @@ func (db *DB) Close() error {
return tsdb_errors.NewMulti(db.locker.Release(), db.wal.Close()).Err()
}
type appender struct {
type appenderBase struct {
*DB
hints *storage.AppendOptions
pendingSeries []record.RefSeries
pendingSamples []record.RefSample
@ -800,6 +823,12 @@ type appender struct {
floatHistogramSeries []*memSeries
}
type appender struct {
appenderBase
hints *storage.AppendOptions
}
func (a *appender) SetOptions(opts *storage.AppendOptions) {
a.hints = opts
}
@ -810,26 +839,10 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
series := a.series.GetByID(headRef)
if series == nil {
// Ensure no empty or duplicate labels have gotten through. This mirrors the
// equivalent validation code in the TSDB's headAppender.
l = l.WithoutEmpty()
if l.IsEmpty() {
return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
}
if lbl, dup := l.HasDuplicateLabelNames(); dup {
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample)
}
var created bool
series, created = a.getOrCreate(l)
if created {
a.pendingSeries = append(a.pendingSeries, record.RefSeries{
Ref: series.ref,
Labels: l,
})
a.metrics.numActiveSeries.Inc()
var err error
series, err = a.getOrCreate(l)
if err != nil {
return 0, err
}
}
@ -853,18 +866,35 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
return storage.SeriesRef(series.ref), nil
}
func (a *appender) getOrCreate(l labels.Labels) (series *memSeries, created bool) {
func (a *appenderBase) getOrCreate(l labels.Labels) (series *memSeries, err error) {
// Ensure no empty or duplicate labels have gotten through. This mirrors the
// equivalent validation code in the TSDB's headAppender.
l = l.WithoutEmpty()
if l.IsEmpty() {
return nil, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
}
if lbl, dup := l.HasDuplicateLabelNames(); dup {
return nil, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample)
}
hash := l.Hash()
series = a.series.GetByHash(hash, l)
if series != nil {
return series, false
return series, nil
}
ref := chunks.HeadSeriesRef(a.nextRef.Inc())
series = &memSeries{ref: ref, lset: l, lastTs: math.MinInt64}
a.series.Set(hash, series)
return series, true
a.pendingSeries = append(a.pendingSeries, record.RefSeries{
Ref: series.ref,
Labels: l,
})
a.metrics.numActiveSeries.Inc()
return series, nil
}
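
With validation consolidated in getOrCreate, every append path now rejects empty or duplicate labelsets the same way; a hedged sketch of how a caller can detect that error class:

package example

import (
	"errors"
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
)

// Sketch: invalid labelsets surface as errors wrapping tsdb.ErrInvalidSample.
func appendChecked(app storage.Appender, l labels.Labels, t int64, v float64) error {
	if _, err := app.Append(0, l, t, v); err != nil {
		if errors.Is(err, tsdb.ErrInvalidSample) {
			return fmt.Errorf("rejected labelset %s: %w", l, err)
		}
		return err
	}
	return nil
}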
func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
@ -879,47 +909,53 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exem
// Ensure no empty labels have gotten through.
e.Labels = e.Labels.WithoutEmpty()
if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup {
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidExemplar)
}
// Exemplar label length does not include chars involved in text rendering such as quotes
// equals sign, or commas. See definition of const ExemplarMaxLabelLength.
labelSetLen := 0
err := e.Labels.Validate(func(l labels.Label) error {
labelSetLen += utf8.RuneCountInString(l.Name)
labelSetLen += utf8.RuneCountInString(l.Value)
if labelSetLen > exemplar.ExemplarMaxLabelSetLength {
return storage.ErrExemplarLabelLength
if err := a.validateExemplar(s.ref, e); err != nil {
if errors.Is(err, storage.ErrDuplicateExemplar) {
// Duplicate, don't return an error but don't accept the exemplar.
return 0, nil
}
return nil
})
if err != nil {
return 0, err
}
// Check for duplicate vs last stored exemplar for this series, and discard those.
// Otherwise, record the current exemplar as the latest.
// Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here.
prevExemplar := a.series.GetLatestExemplar(s.ref)
if prevExemplar != nil && prevExemplar.Equals(e) {
// Duplicate, don't return an error but don't accept the exemplar.
return 0, nil
}
a.series.SetLatestExemplar(s.ref, &e)
a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{
Ref: s.ref,
T: e.Ts,
V: e.Value,
Labels: e.Labels,
})
a.metrics.totalAppendedExemplars.Inc()
return storage.SeriesRef(s.ref), nil
}
func (a *appenderBase) validateExemplar(ref chunks.HeadSeriesRef, e exemplar.Exemplar) error {
if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup {
return fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidExemplar)
}
// Exemplar label length does not include chars involved in text rendering such as
// quotes, equals signs, or commas. See definition of const ExemplarMaxLabelLength.
labelSetLen := 0
if err := e.Labels.Validate(func(l labels.Label) error {
labelSetLen += utf8.RuneCountInString(l.Name)
labelSetLen += utf8.RuneCountInString(l.Value)
if labelSetLen > exemplar.ExemplarMaxLabelSetLength {
return storage.ErrExemplarLabelLength
}
return nil
}); err != nil {
return err
}
// Check for duplicate vs last stored exemplar for this series, and discard those.
// Otherwise, record the current exemplar as the latest.
// Prometheus' TSDB returns 0 when encountering duplicates, so callers do the same here.
prevExemplar := a.series.GetLatestExemplar(ref)
if prevExemplar != nil && prevExemplar.Equals(e) {
return storage.ErrDuplicateExemplar
}
return nil
}
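
The length check above counts only the runes of label names and values, per the comment; a hedged sketch of the same computation in isolation:

package example

import (
	"unicode/utf8"

	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/labels"
)

// Sketch: rendering characters (quotes, '=', ',') are excluded; only name and
// value runes count against exemplar.ExemplarMaxLabelSetLength.
func exemplarLabelSetLen(ls labels.Labels) (n int, ok bool) {
	ls.Range(func(l labels.Label) {
		n += utf8.RuneCountInString(l.Name) + utf8.RuneCountInString(l.Value)
	})
	return n, n <= exemplar.ExemplarMaxLabelSetLength
}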
func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
if err := h.Validate(); err != nil {
@ -938,26 +974,10 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
series := a.series.GetByID(headRef)
if series == nil {
// Ensure no empty or duplicate labels have gotten through. This mirrors the
// equivalent validation code in the TSDB's headAppender.
l = l.WithoutEmpty()
if l.IsEmpty() {
return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
}
if lbl, dup := l.HasDuplicateLabelNames(); dup {
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample)
}
var created bool
series, created = a.getOrCreate(l)
if created {
a.pendingSeries = append(a.pendingSeries, record.RefSeries{
Ref: series.ref,
Labels: l,
})
a.metrics.numActiveSeries.Inc()
var err error
series, err = a.getOrCreate(l)
if err != nil {
return 0, err
}
}
@ -1014,24 +1034,10 @@ func (a *appender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.L
series := a.series.GetByID(chunks.HeadSeriesRef(ref))
if series == nil {
// Ensure no empty labels have gotten through.
l = l.WithoutEmpty()
if l.IsEmpty() {
return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
}
if lbl, dup := l.HasDuplicateLabelNames(); dup {
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample)
}
var created bool
series, created = a.getOrCreate(l)
if created {
a.pendingSeries = append(a.pendingSeries, record.RefSeries{
Ref: series.ref,
Labels: l,
})
a.metrics.numActiveSeries.Inc()
var err error
series, err = a.getOrCreate(l)
if err != nil {
return 0, err
}
}
@ -1046,6 +1052,9 @@ func (a *appender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.L
// discard the sample if it's out of order.
return 0, storage.ErrOutOfOrderST
}
// NOTE(bwplotka): This is a bug, as we "commit" pending sample TS as the WAL last TS. It was likely done
// to satisfy the incorrect TestDBStartTimestampSamplesIngestion test. We are leaving it as-is given the planned removal
// of AppenderV1 as per https://github.com/prometheus/prometheus/issues/17632.
series.lastTs = st
switch {
@ -1077,25 +1086,11 @@ func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t,
series := a.series.GetByID(chunks.HeadSeriesRef(ref))
if series == nil {
l = l.WithoutEmpty()
if l.IsEmpty() {
return 0, fmt.Errorf("empty labelset: %w", tsdb.ErrInvalidSample)
var err error
series, err = a.getOrCreate(l)
if err != nil {
return 0, err
}
if lbl, dup := l.HasDuplicateLabelNames(); dup {
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidSample)
}
newSeries, created := a.getOrCreate(l)
if created {
a.pendingSeries = append(a.pendingSeries, record.RefSeries{
Ref: newSeries.ref,
Labels: l,
})
a.metrics.numActiveSeries.Inc()
}
series = newSeries
}
series.Lock()
@ -1110,6 +1105,9 @@ func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t,
// discard the sample if it's out of order.
return 0, storage.ErrOutOfOrderST
}
// NOTE(bwplotka): This is a bug, as we "commit" pending sample TS as the WAL last TS. It was likely done
// to satisfy the incorrect TestDBStartTimestampSamplesIngestion test. We are leaving it as-is given the planned removal
// of AppenderV1 as per https://github.com/prometheus/prometheus/issues/17632.
series.lastTs = st
// NOTE: always modify pendingSamples and sampleSeries together.
@ -1126,7 +1124,7 @@ func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t,
}
// Commit submits the collected samples and purges the batch.
func (a *appender) Commit() error {
func (a *appenderBase) Commit() error {
if err := a.log(); err != nil {
return err
}
@ -1141,7 +1139,7 @@ func (a *appender) Commit() error {
}
// log logs all pending data to the WAL.
func (a *appender) log() error {
func (a *appenderBase) log() error {
a.mtx.RLock()
defer a.mtx.RUnlock()
@ -1235,7 +1233,7 @@ func (a *appender) log() error {
}
// clearData clears all pending data.
func (a *appender) clearData() {
func (a *appenderBase) clearData() {
a.pendingSeries = a.pendingSeries[:0]
a.pendingSamples = a.pendingSamples[:0]
a.pendingHistograms = a.pendingHistograms[:0]
@ -1246,7 +1244,7 @@ func (a *appender) clearData() {
a.floatHistogramSeries = a.floatHistogramSeries[:0]
}
func (a *appender) Rollback() error {
func (a *appenderBase) Rollback() error {
// Series are created in-memory regardless of rollback. This means we must
// log them to the WAL, otherwise subsequent commits may reference a series
// which was never written to the WAL.
@ -1260,7 +1258,7 @@ func (a *appender) Rollback() error {
}
// logSeries logs only pending series records to the WAL.
func (a *appender) logSeries() error {
func (a *appenderBase) logSeries() error {
a.mtx.RLock()
defer a.mtx.RUnlock()
@ -1283,7 +1281,7 @@ func (a *appender) logSeries() error {
// minValidTime returns the minimum timestamp that a sample can have
// and is needed for preventing underflow.
func (a *appender) minValidTime(lastTs int64) int64 {
func (a *appenderBase) minValidTime(lastTs int64) int64 {
if lastTs < math.MinInt64+a.opts.OutOfOrderTimeWindow {
return math.MinInt64
}

tsdb/agent/db_append_v2.go (new file, 208 lines)

@ -0,0 +1,208 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agent
import (
"context"
"errors"
"fmt"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
)
// AppenderV2 implements storage.AppenderV2.
func (db *DB) AppenderV2(context.Context) storage.AppenderV2 {
return db.appenderV2Pool.Get().(storage.AppenderV2)
}
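
A hedged usage sketch for the new entry point, based on the Append signature below and assuming AppenderTransaction provides Commit as in the V1 path:

package example

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/agent"
)

// Sketch: append one float sample with a start timestamp (ST). With
// EnableSTAsZeroSample set, a best-effort zero sample at t=st may be
// injected before the real sample (see bestEffortAppendSTZeroSample below).
func appendWithST(db *agent.DB, lbls labels.Labels) error {
	app := db.AppenderV2(context.Background())
	if _, err := app.Append(0, lbls, 40_000, 60_000, 10, nil, nil, storage.AOptions{}); err != nil {
		return err
	}
	return app.Commit()
}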
type appenderV2 struct {
appenderBase
}
// Append appends a pending sample to the agent's DB.
// TODO: Wire metadata in the Agent's appender.
func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
var (
// Avoid shadowing err variables for reliability.
valErr, partialErr error
sampleMetricType = sampleMetricTypeFloat
isStale bool
)
// Fail fast on incorrect histograms.
switch {
case fh != nil:
sampleMetricType = sampleMetricTypeHistogram
valErr = fh.Validate()
case h != nil:
sampleMetricType = sampleMetricTypeHistogram
valErr = h.Validate()
}
if valErr != nil {
return 0, valErr
}
// Series references and chunk references are identical in agent mode.
s := a.series.GetByID(chunks.HeadSeriesRef(ref))
if s == nil {
var err error
s, err = a.getOrCreate(ls)
if err != nil {
return 0, err
}
}
s.Lock()
lastTS := s.lastTs
s.Unlock()
// TODO(bwplotka): Handle ST natively (as per PROM-60).
if a.opts.EnableSTAsZeroSample && st != 0 {
a.bestEffortAppendSTZeroSample(s, ls, lastTS, st, t, h, fh)
}
if t <= a.minValidTime(lastTS) {
a.metrics.totalOutOfOrderSamples.Inc()
return 0, storage.ErrOutOfOrderSample
}
switch {
case fh != nil:
isStale = value.IsStaleNaN(fh.Sum)
// NOTE: always modify pendingFloatHistograms and floatHistogramSeries together
a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{
Ref: s.ref,
T: t,
FH: fh,
})
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
case h != nil:
isStale = value.IsStaleNaN(h.Sum)
// NOTE: always modify pendingHistograms and histogramSeries together
a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{
Ref: s.ref,
T: t,
H: h,
})
a.histogramSeries = append(a.histogramSeries, s)
default:
isStale = value.IsStaleNaN(v)
// NOTE: always modify pendingSamples and sampleSeries together.
a.pendingSamples = append(a.pendingSamples, record.RefSample{
Ref: s.ref,
T: t,
V: v,
})
a.sampleSeries = append(a.sampleSeries, s)
}
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricType).Inc()
if isStale {
// For stale values we never attempt to process metadata/exemplars; claim success.
return storage.SeriesRef(s.ref), nil
}
// Append exemplars if any and if storage was configured for it.
// TODO(bwplotka): Agent does not have an equivalent of a.head.opts.EnableExemplarStorage && a.head.opts.MaxExemplars.Load() > 0 ?
if len(opts.Exemplars) > 0 {
// Currently only exemplars can return partial errors.
partialErr = a.appendExemplars(s, opts.Exemplars)
}
return storage.SeriesRef(s.ref), partialErr
}
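
Since only exemplars can fail partially here, callers that care can unwrap the partial error while keeping the accepted sample; a hedged sketch:

package example

import (
	"errors"
	"log/slog"

	"github.com/prometheus/prometheus/storage"
)

// Sketch: a *storage.AppendPartialError means the sample was appended but
// some exemplars were rejected; the sample itself need not be retried.
func handleAppendErr(err error) error {
	var perr *storage.AppendPartialError
	if errors.As(err, &perr) {
		for _, eerr := range perr.ExemplarErrors {
			slog.Debug("exemplar rejected", "err", eerr)
		}
		return nil
	}
	return err
}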
func (a *appenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemplar) error {
var errs []error
for _, e := range exemplar {
// Ensure no empty labels have gotten through.
e.Labels = e.Labels.WithoutEmpty()
if err := a.validateExemplar(s.ref, e); err != nil {
if !errors.Is(err, storage.ErrDuplicateExemplar) {
// Except for duplicates, return partial errors.
errs = append(errs, err)
continue
}
if !errors.Is(err, storage.ErrOutOfOrderExemplar) {
a.logger.Debug("Error while adding an exemplar on AppendSample", "exemplars", fmt.Sprintf("%+v", e), "err", err)
}
continue
}
a.series.SetLatestExemplar(s.ref, &e)
a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{
Ref: s.ref,
T: e.Ts,
V: e.Value,
Labels: e.Labels,
})
a.metrics.totalAppendedExemplars.Inc()
}
if len(errs) > 0 {
return &storage.AppendPartialError{ExemplarErrors: errs}
}
return nil
}
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
//
// ST is an experimental feature; we don't fail the append on errors, just debug-log them.
func (a *appenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.Labels, lastTS, st, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) {
// NOTE: Use ls instead of s.lset to avoid locking memSeries. Using s.ref is acceptable without locking.
if st >= t {
a.logger.Debug("Error when appending ST", "series", ls.String(), "st", st, "t", t, "err", storage.ErrSTNewerThanSample)
return
}
if st <= lastTS {
a.logger.Debug("Error when appending ST", "series", ls.String(), "st", st, "t", t, "err", storage.ErrOutOfOrderST)
return
}
switch {
case fh != nil:
zeroFloatHistogram := &histogram.FloatHistogram{
// The STZeroSample represents a counter reset by definition.
CounterResetHint: histogram.CounterReset,
// Replicate other fields to avoid needless chunk creation.
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
CustomValues: fh.CustomValues,
}
a.pendingFloatHistograms = append(a.pendingFloatHistograms, record.RefFloatHistogramSample{Ref: s.ref, T: st, FH: zeroFloatHistogram})
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
case h != nil:
zeroHistogram := &histogram.Histogram{
// The STZeroSample represents a counter reset by definition.
CounterResetHint: histogram.CounterReset,
// Replicate other fields to avoid needless chunk creation.
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
CustomValues: h.CustomValues,
}
a.pendingHistograms = append(a.pendingHistograms, record.RefHistogramSample{Ref: s.ref, T: st, H: zeroHistogram})
a.histogramSeries = append(a.histogramSeries, s)
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeHistogram).Inc()
default:
a.pendingSamples = append(a.pendingSamples, record.RefSample{Ref: s.ref, T: st, V: 0})
a.sampleSeries = append(a.sampleSeries, s)
a.metrics.totalAppendedSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
}
}
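
A worked example of what the float branch above queues, consistent with the test expectations later in this diff (st=40_000 earlier than t=60_000 and newer than lastTS):

package example

import (
	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/record"
)

// Sketch: Append(ref, ls, st=40_000, t=60_000, v=10, ...) results in two
// pending WAL samples for the series, the injected zero sample first.
func expectedRecords(ref chunks.HeadSeriesRef) []record.RefSample {
	return []record.RefSample{
		{Ref: ref, T: 40_000, V: 0},  // best-effort ST zero sample
		{Ref: ref, T: 60_000, V: 10}, // the actual sample
	}
}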

File diff suppressed because it is too large.


@ -24,6 +24,7 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
@ -1142,6 +1143,10 @@ type walSample struct {
ref storage.SeriesRef
}
// NOTE(bwplotka): This test exercises the behaviour of the storage.Appender interface against its invariants (see
// the storage.Appender comment) around the ordering of samples within a single Appender. This surfaces a slight bug
// in the AppendSTZero* methods. We are leaving it as-is given the planned removal of AppenderV1 as
// per https://github.com/prometheus/prometheus/issues/17632.
func TestDBStartTimestampSamplesIngestion(t *testing.T) {
t.Parallel()
@ -1154,7 +1159,7 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
expectsError bool
}
testHistogram := tsdbutil.GenerateTestHistograms(1)[0]
testHistograms := tsdbutil.GenerateTestHistograms(2)
zeroHistogram := &histogram.Histogram{}
lbls := labelsForTest(t.Name(), 1)
@ -1163,7 +1168,7 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
testCases := []struct {
name string
inputSamples []appendableSample
expectedSamples []*walSample
expectedSamples []walSample
expectedSeriesCount int
}{
{
@ -1172,10 +1177,10 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
{t: 100, st: 1, v: 10, lbls: defLbls},
{t: 101, st: 1, v: 10, lbls: defLbls},
},
expectedSamples: []*walSample{
{t: 1, f: 0, lbls: defLbls},
{t: 100, f: 10, lbls: defLbls},
{t: 101, f: 10, lbls: defLbls},
expectedSamples: []walSample{
{t: 1, f: 0, lbls: defLbls, ref: 1},
{t: 100, f: 10, lbls: defLbls, ref: 1},
{t: 101, f: 10, lbls: defLbls, ref: 1},
},
},
{
@ -1190,15 +1195,15 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
{
t: 300,
st: 230,
h: testHistogram,
h: testHistograms[0],
lbls: defLbls,
},
},
expectedSamples: []*walSample{
{t: 30, f: 0, lbls: defLbls},
{t: 100, f: 20, lbls: defLbls},
{t: 230, h: zeroHistogram, lbls: defLbls},
{t: 300, h: testHistogram, lbls: defLbls},
expectedSamples: []walSample{
{t: 30, f: 0, lbls: defLbls, ref: 1},
{t: 100, f: 20, lbls: defLbls, ref: 1},
{t: 230, h: zeroHistogram, lbls: defLbls, ref: 1},
{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
},
expectedSeriesCount: 1,
},
@ -1217,27 +1222,27 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
// invalid ST histogram
t: 300,
st: 300,
h: testHistogram,
h: testHistograms[0],
lbls: defLbls,
expectsError: true,
},
},
expectedSamples: []*walSample{
{t: 100, f: 10, lbls: defLbls},
{t: 300, h: testHistogram, lbls: defLbls},
expectedSamples: []walSample{
{t: 100, f: 10, lbls: defLbls, ref: 1},
{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
},
expectedSeriesCount: 0,
},
{
name: "In order ct+normal sample/histogram",
inputSamples: []appendableSample{
{t: 100, h: testHistogram, st: 1, lbls: defLbls},
{t: 101, h: testHistogram, st: 1, lbls: defLbls},
{t: 100, h: testHistograms[0], st: 1, lbls: defLbls},
{t: 101, h: testHistograms[1], st: 1, lbls: defLbls},
},
expectedSamples: []*walSample{
{t: 1, h: &histogram.Histogram{}},
{t: 100, h: testHistogram},
{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.NotCounterReset}},
expectedSamples: []walSample{
{t: 1, h: &histogram.Histogram{}, lbls: defLbls, ref: 1},
{t: 100, h: testHistograms[0], lbls: defLbls, ref: 1},
{t: 101, h: testHistograms[1], lbls: defLbls, ref: 1},
},
},
{
@ -1248,12 +1253,12 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
{t: 180_000, st: 40_000, v: 10, lbls: defLbls},
{t: 50_000, st: 40_000, v: 10, lbls: defLbls},
},
expectedSamples: []*walSample{
{t: 40_000, f: 0, lbls: defLbls},
{t: 50_000, f: 10, lbls: defLbls},
{t: 60_000, f: 10, lbls: defLbls},
{t: 120_000, f: 10, lbls: defLbls},
{t: 180_000, f: 10, lbls: defLbls},
expectedSamples: []walSample{
{t: 40_000, f: 0, lbls: defLbls, ref: 1},
{t: 60_000, f: 10, lbls: defLbls, ref: 1},
{t: 120_000, f: 10, lbls: defLbls, ref: 1},
{t: 180_000, f: 10, lbls: defLbls, ref: 1},
{t: 50_000, f: 10, lbls: defLbls, ref: 1}, // OOO sample.
},
},
}
@ -1294,7 +1299,7 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
// Close the DB to ensure all data is flushed to the WAL
require.NoError(t, s.Close())
// Check that we dont have any OOO samples in the WAL by checking metrics
// Check that we don't have any OOO samples in the WAL by checking metrics
families, err := reg.Gather()
require.NoError(t, err, "failed to gather metrics")
for _, f := range families {
@ -1303,26 +1308,13 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
}
}
outputSamples := readWALSamples(t, s.wal.Dir())
require.Len(t, outputSamples, len(tc.expectedSamples), "Expected %d samples", len(tc.expectedSamples))
for i, expectedSample := range tc.expectedSamples {
for _, sample := range outputSamples {
if sample.t == expectedSample.t && sample.lbls.String() == expectedSample.lbls.String() {
if expectedSample.h != nil {
require.Equal(t, expectedSample.h, sample.h, "histogram value mismatch (sample index %d)", i)
} else {
require.Equal(t, expectedSample.f, sample.f, "value mismatch (sample index %d)", i)
}
}
}
}
got := readWALSamples(t, s.wal.Dir())
testutil.RequireEqualWithOptions(t, tc.expectedSamples, got, cmp.Options{cmp.AllowUnexported(walSample{})})
})
}
}
func readWALSamples(t *testing.T, walDir string) []*walSample {
func readWALSamples(t *testing.T, walDir string) []walSample {
t.Helper()
sr, err := wlog.NewSegmentsReader(walDir)
require.NoError(t, err)
@ -1339,7 +1331,7 @@ func readWALSamples(t *testing.T, walDir string) []*walSample {
histograms []record.RefHistogramSample
lastSeries record.RefSeries
outputSamples = make([]*walSample, 0)
outputSamples = make([]walSample, 0)
)
for r.Next() {
@ -1353,7 +1345,7 @@ func readWALSamples(t *testing.T, walDir string) []*walSample {
samples, err = dec.Samples(rec, samples[:0])
require.NoError(t, err)
for _, s := range samples {
outputSamples = append(outputSamples, &walSample{
outputSamples = append(outputSamples, walSample{
t: s.T,
f: s.V,
lbls: lastSeries.Labels.Copy(),
@ -1364,7 +1356,7 @@ func readWALSamples(t *testing.T, walDir string) []*walSample {
histograms, err = dec.HistogramSamples(rec, histograms[:0])
require.NoError(t, err)
for _, h := range histograms {
outputSamples = append(outputSamples, &walSample{
outputSamples = append(outputSamples, walSample{
t: h.T,
h: h.H,
lbls: lastSeries.Labels.Copy(),
@ -1373,14 +1365,14 @@ func readWALSamples(t *testing.T, walDir string) []*walSample {
}
}
}
return outputSamples
}
func BenchmarkCreateSeries(b *testing.B) {
func BenchmarkGetOrCreate(b *testing.B) {
s := createTestAgentDB(b, nil, DefaultOptions())
defer s.Close()
// NOTE: This benchmarks appenderBase, so it does not matter if it's V1 or V2.
app := s.Appender(context.Background()).(*appender)
lbls := make([]labels.Labels, b.N)


@ -145,7 +145,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
// TODO(bwplotka): Handle ST natively (as per PROM-60).
if a.head.opts.EnableSTAsZeroSample && st != 0 {
a.bestEffortAppendSTZeroSample(s, st, t, h, fh)
a.bestEffortAppendSTZeroSample(s, ls, st, t, h, fh)
}
switch {
@ -167,11 +167,11 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
// an optimization for the more likely case.
switch a.typesInBatch[s.ref] {
case stHistogram, stCustomBucketHistogram:
return a.Append(ref, ls, st, t, 0, &histogram.Histogram{Sum: v}, nil, storage.AOptions{
return a.Append(storage.SeriesRef(s.ref), ls, st, t, 0, &histogram.Histogram{Sum: v}, nil, storage.AOptions{
RejectOutOfOrder: opts.RejectOutOfOrder,
})
case stFloatHistogram, stCustomBucketFloatHistogram:
return a.Append(ref, ls, st, t, 0, nil, &histogram.FloatHistogram{Sum: v}, storage.AOptions{
return a.Append(storage.SeriesRef(s.ref), ls, st, t, 0, nil, &histogram.FloatHistogram{Sum: v}, storage.AOptions{
RejectOutOfOrder: opts.RejectOutOfOrder,
})
}
@ -202,7 +202,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
if isStale {
// For stale values we never attempt to process metadata/exemplars, claim the success.
return ref, nil
return storage.SeriesRef(s.ref), nil
}
// Append exemplars if any and if storage was configured for it.
@ -324,6 +324,7 @@ func (a *headAppenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemp
if !errors.Is(err, storage.ErrDuplicateExemplar) && !errors.Is(err, storage.ErrExemplarsDisabled) {
// Except for duplicates, return partial errors.
errs = append(errs, err)
continue
}
if !errors.Is(err, storage.ErrOutOfOrderExemplar) {
a.head.logger.Debug("Error while adding an exemplar on AppendSample", "exemplars", fmt.Sprintf("%+v", e), "err", err)
@ -343,13 +344,14 @@ func (a *headAppenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemp
// is implemented.
//
// ST is an experimental feature; we don't fail the append on errors, just debug-log them.
func (a *headAppenderV2) bestEffortAppendSTZeroSample(s *memSeries, st, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) {
func (a *headAppenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.Labels, st, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) {
// NOTE: Use ls instead of s.lset to avoid locking memSeries. Using s.ref is acceptable without locking.
if st >= t {
a.head.logger.Debug("Error when appending ST", "series", s.lset.String(), "st", st, "t", t, "err", storage.ErrSTNewerThanSample)
a.head.logger.Debug("Error when appending ST", "series", ls.String(), "st", st, "t", t, "err", storage.ErrSTNewerThanSample)
return
}
if st < a.minValidTime {
a.head.logger.Debug("Error when appending ST", "series", s.lset.String(), "st", st, "t", t, "err", storage.ErrOutOfBounds)
a.head.logger.Debug("Error when appending ST", "series", ls.String(), "st", st, "t", t, "err", storage.ErrOutOfBounds)
return
}