refactor(tsdb/agent)[PART3]: add AppenderV2 support to agent
Some checks failed
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-12-12 11:12:28 +00:00
parent 763b935b45
commit ad367b504b
8 changed files with 455 additions and 1717 deletions

View file

@ -265,6 +265,7 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
case "created-timestamp-zero-ingestion":
c.scrape.EnableStartTimestampZeroIngestion = true
c.web.STZeroIngestionEnabled = true
c.agent.EnableSTAsZeroSample = true
// Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers.
config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
config.DefaultGlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols
@ -1409,6 +1410,7 @@ func main() {
"MinWALTime", cfg.agent.MinWALTime,
"MaxWALTime", cfg.agent.MaxWALTime,
"OutOfOrderTimeWindow", cfg.agent.OutOfOrderTimeWindow,
"EnableSTAsZeroSample", cfg.agent.EnableSTAsZeroSample,
)
localStorage.Set(db, 0)
@ -1947,7 +1949,8 @@ type agentOptions struct {
TruncateFrequency model.Duration
MinWALTime, MaxWALTime model.Duration
NoLockfile bool
OutOfOrderTimeWindow int64
OutOfOrderTimeWindow int64 // TODO(bwplotka): Unused option, fix it or remove.
EnableSTAsZeroSample bool
}
func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Options {
@ -1963,6 +1966,7 @@ func (opts agentOptions) ToAgentOptions(outOfOrderTimeWindow int64) agent.Option
MaxWALTime: durationToInt64Millis(time.Duration(opts.MaxWALTime)),
NoLockfile: opts.NoLockfile,
OutOfOrderTimeWindow: outOfOrderTimeWindow,
EnableSTAsZeroSample: opts.EnableSTAsZeroSample,
}
}

View file

@ -274,8 +274,8 @@ type AppendOptions struct {
//
// Operations on the Appender interface are not goroutine-safe.
//
// The order of samples appended via the Appender is preserved within each
// series. I.e. samples are not reordered per timestamp, or by float/histogram
// The order of samples appended via the Appender is preserved within each series.
// I.e. timestamp order within batch is not validated, samples are not reordered per timestamp or by float/histogram
// type.
//
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).

View file

@ -103,8 +103,8 @@ var _ error = &AppendPartialError{}
//
// Operations on the AppenderV2 interface are not goroutine-safe.
//
// The order of samples appended via the AppenderV2 is preserved within each
// series. I.e. samples are not reordered per timestamp, or by float/histogram
// The order of samples appended via the AppenderV2 is preserved within each series.
// I.e. timestamp order within batch is not validated, samples are not reordered per timestamp or by float/histogram
// type.
type AppenderV2 interface {
AppenderTransaction

View file

@ -84,6 +84,15 @@ type Options struct {
// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
OutOfOrderTimeWindow int64
// EnableSTAsZeroSample represents 'created-timestamp-zero-ingestion' feature flag.
// If true, ST, if non-empty and earlier than sample timestamp, will be stored
// as a zero sample before the actual sample.
//
// The zero sample is best-effort, only debug log on failure is emitted.
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
EnableSTAsZeroSample bool
}
// DefaultOptions used for the WAL storage. They are reasonable for setups using
@ -233,8 +242,9 @@ type DB struct {
wal *wlog.WL
locker *tsdbutil.DirLocker
appenderPool sync.Pool
bufPool sync.Pool
appenderPool sync.Pool
appenderV2Pool sync.Pool
bufPool sync.Pool
// These pools are only used during WAL replay and are reset at the end.
// NOTE: Adjust resetWALReplayResources() upon changes to the pools.
@ -303,12 +313,26 @@ func Open(l *slog.Logger, reg prometheus.Registerer, rs *remote.Storage, dir str
db.appenderPool.New = func() any {
return &appender{
DB: db,
pendingSeries: make([]record.RefSeries, 0, 100),
pendingSamples: make([]record.RefSample, 0, 100),
pendingHistograms: make([]record.RefHistogramSample, 0, 100),
pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
pendingExamplars: make([]record.RefExemplar, 0, 10),
appenderBase: appenderBase{
DB: db,
pendingSeries: make([]record.RefSeries, 0, 100),
pendingSamples: make([]record.RefSample, 0, 100),
pendingHistograms: make([]record.RefHistogramSample, 0, 100),
pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
pendingExamplars: make([]record.RefExemplar, 0, 10),
},
}
}
db.appenderV2Pool.New = func() any {
return &appenderV2{
appenderBase: appenderBase{
DB: db,
pendingSeries: make([]record.RefSeries, 0, 100),
pendingSamples: make([]record.RefSample, 0, 100),
pendingHistograms: make([]record.RefHistogramSample, 0, 100),
pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
pendingExamplars: make([]record.RefExemplar, 0, 10),
},
}
}
@ -777,9 +801,8 @@ func (db *DB) Close() error {
return tsdb_errors.NewMulti(db.locker.Release(), db.wal.Close()).Err()
}
type appender struct {
type appenderBase struct {
*DB
hints *storage.AppendOptions
pendingSeries []record.RefSeries
pendingSamples []record.RefSample
@ -800,6 +823,12 @@ type appender struct {
floatHistogramSeries []*memSeries
}
type appender struct {
appenderBase
hints *storage.AppendOptions
}
func (a *appender) SetOptions(opts *storage.AppendOptions) {
a.hints = opts
}
@ -853,7 +882,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
return storage.SeriesRef(series.ref), nil
}
func (a *appender) getOrCreate(l labels.Labels) (series *memSeries, created bool) {
func (a *appenderBase) getOrCreate(l labels.Labels) (series *memSeries, created bool) {
hash := l.Hash()
series = a.series.GetByHash(hash, l)
@ -879,47 +908,53 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exem
// Ensure no empty labels have gotten through.
e.Labels = e.Labels.WithoutEmpty()
if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup {
return 0, fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidExemplar)
}
// Exemplar label length does not include chars involved in text rendering such as quotes
// equals sign, or commas. See definition of const ExemplarMaxLabelLength.
labelSetLen := 0
err := e.Labels.Validate(func(l labels.Label) error {
labelSetLen += utf8.RuneCountInString(l.Name)
labelSetLen += utf8.RuneCountInString(l.Value)
if labelSetLen > exemplar.ExemplarMaxLabelSetLength {
return storage.ErrExemplarLabelLength
if err := a.validateExemplar(s.ref, e); err != nil {
if errors.Is(err, storage.ErrDuplicateExemplar) {
// Duplicate, don't return an error but don't accept the exemplar.
return 0, nil
}
return nil
})
if err != nil {
return 0, err
}
// Check for duplicate vs last stored exemplar for this series, and discard those.
// Otherwise, record the current exemplar as the latest.
// Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here.
prevExemplar := a.series.GetLatestExemplar(s.ref)
if prevExemplar != nil && prevExemplar.Equals(e) {
// Duplicate, don't return an error but don't accept the exemplar.
return 0, nil
}
a.series.SetLatestExemplar(s.ref, &e)
a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{
Ref: s.ref,
T: e.Ts,
V: e.Value,
Labels: e.Labels,
})
a.metrics.totalAppendedExemplars.Inc()
return storage.SeriesRef(s.ref), nil
}
func (a *appenderBase) validateExemplar(ref chunks.HeadSeriesRef, e exemplar.Exemplar) error {
if lbl, dup := e.Labels.HasDuplicateLabelNames(); dup {
return fmt.Errorf(`label name "%s" is not unique: %w`, lbl, tsdb.ErrInvalidExemplar)
}
// Exemplar label length does not include chars involved in text rendering such as quotes
// equals sign, or commas. See definition of const ExemplarMaxLabelLength.
labelSetLen := 0
if err := e.Labels.Validate(func(l labels.Label) error {
labelSetLen += utf8.RuneCountInString(l.Name)
labelSetLen += utf8.RuneCountInString(l.Value)
if labelSetLen > exemplar.ExemplarMaxLabelSetLength {
return storage.ErrExemplarLabelLength
}
return nil
}); err != nil {
return err
}
// Check for duplicate vs last stored exemplar for this series, and discard those.
// Otherwise, record the current exemplar as the latest.
// Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here.
prevExemplar := a.series.GetLatestExemplar(ref)
if prevExemplar != nil && prevExemplar.Equals(e) {
return storage.ErrDuplicateExemplar
}
return nil
}
func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
if err := h.Validate(); err != nil {
@ -1046,6 +1081,9 @@ func (a *appender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.L
// discard the sample if it's out of order.
return 0, storage.ErrOutOfOrderST
}
// NOTE(bwplotka): This is a bug, as we "commit" pending sample TS as the WAL last TS. It was likely done
// to satisfy incorrect TestDBStartTimestampSamplesIngestion test. We are leaving it as-is given the planned removal
// of AppenderV1 as per https://github.com/prometheus/prometheus/issues/17632.
series.lastTs = st
switch {
@ -1110,6 +1148,9 @@ func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t,
// discard the sample if it's out of order.
return 0, storage.ErrOutOfOrderST
}
// NOTE(bwplotka): This is a bug, as we "commit" pending sample TS as the WAL last TS. It was likely done
// to satisfy incorrect TestDBStartTimestampSamplesIngestion test. We are leaving it as-is given the planned removal
// of AppenderV1 as per https://github.com/prometheus/prometheus/issues/17632.
series.lastTs = st
// NOTE: always modify pendingSamples and sampleSeries together.
@ -1126,7 +1167,7 @@ func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, t,
}
// Commit submits the collected samples and purges the batch.
func (a *appender) Commit() error {
func (a *appenderBase) Commit() error {
if err := a.log(); err != nil {
return err
}
@ -1141,7 +1182,7 @@ func (a *appender) Commit() error {
}
// log logs all pending data to the WAL.
func (a *appender) log() error {
func (a *appenderBase) log() error {
a.mtx.RLock()
defer a.mtx.RUnlock()
@ -1235,7 +1276,7 @@ func (a *appender) log() error {
}
// clearData clears all pending data.
func (a *appender) clearData() {
func (a *appenderBase) clearData() {
a.pendingSeries = a.pendingSeries[:0]
a.pendingSamples = a.pendingSamples[:0]
a.pendingHistograms = a.pendingHistograms[:0]
@ -1246,7 +1287,7 @@ func (a *appender) clearData() {
a.floatHistogramSeries = a.floatHistogramSeries[:0]
}
func (a *appender) Rollback() error {
func (a *appenderBase) Rollback() error {
// Series are created in-memory regardless of rollback. This means we must
// log them to the WAL, otherwise subsequent commits may reference a series
// which was never written to the WAL.
@ -1260,7 +1301,7 @@ func (a *appender) Rollback() error {
}
// logSeries logs only pending series records to the WAL.
func (a *appender) logSeries() error {
func (a *appenderBase) logSeries() error {
a.mtx.RLock()
defer a.mtx.RUnlock()
@ -1283,7 +1324,7 @@ func (a *appender) logSeries() error {
// minValidTime returns the minimum timestamp that a sample can have
// and is needed for preventing underflow.
func (a *appender) minValidTime(lastTs int64) int64 {
func (a *appenderBase) minValidTime(lastTs int64) int64 {
if lastTs < math.MinInt64+a.opts.OutOfOrderTimeWindow {
return math.MinInt64
}

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -24,6 +24,7 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
@ -1142,6 +1143,10 @@ type walSample struct {
ref storage.SeriesRef
}
// NOTE(bwplotka): This test is testing behaviour of storage.Appender interface against its invariants (see
// storage.Appender comment) around validation of the order of samples within a single Appender. This results
// in a slight bug in AppendSTZero* methods. We are leaving it as-is given the planned removal of AppenderV1 as
// per https://github.com/prometheus/prometheus/issues/17632.
func TestDBStartTimestampSamplesIngestion(t *testing.T) {
t.Parallel()
@ -1154,7 +1159,7 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
expectsError bool
}
testHistogram := tsdbutil.GenerateTestHistograms(1)[0]
testHistograms := tsdbutil.GenerateTestHistograms(2)
zeroHistogram := &histogram.Histogram{}
lbls := labelsForTest(t.Name(), 1)
@ -1163,7 +1168,7 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
testCases := []struct {
name string
inputSamples []appendableSample
expectedSamples []*walSample
expectedSamples []walSample
expectedSeriesCount int
}{
{
@ -1172,10 +1177,10 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
{t: 100, st: 1, v: 10, lbls: defLbls},
{t: 101, st: 1, v: 10, lbls: defLbls},
},
expectedSamples: []*walSample{
{t: 1, f: 0, lbls: defLbls},
{t: 100, f: 10, lbls: defLbls},
{t: 101, f: 10, lbls: defLbls},
expectedSamples: []walSample{
{t: 1, f: 0, lbls: defLbls, ref: 1},
{t: 100, f: 10, lbls: defLbls, ref: 1},
{t: 101, f: 10, lbls: defLbls, ref: 1},
},
},
{
@ -1190,15 +1195,15 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
{
t: 300,
st: 230,
h: testHistogram,
h: testHistograms[0],
lbls: defLbls,
},
},
expectedSamples: []*walSample{
{t: 30, f: 0, lbls: defLbls},
{t: 100, f: 20, lbls: defLbls},
{t: 230, h: zeroHistogram, lbls: defLbls},
{t: 300, h: testHistogram, lbls: defLbls},
expectedSamples: []walSample{
{t: 30, f: 0, lbls: defLbls, ref: 1},
{t: 100, f: 20, lbls: defLbls, ref: 1},
{t: 230, h: zeroHistogram, lbls: defLbls, ref: 1},
{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
},
expectedSeriesCount: 1,
},
@ -1217,27 +1222,27 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
// invalid ST histogram
t: 300,
st: 300,
h: testHistogram,
h: testHistograms[0],
lbls: defLbls,
expectsError: true,
},
},
expectedSamples: []*walSample{
{t: 100, f: 10, lbls: defLbls},
{t: 300, h: testHistogram, lbls: defLbls},
expectedSamples: []walSample{
{t: 100, f: 10, lbls: defLbls, ref: 1},
{t: 300, h: testHistograms[0], lbls: defLbls, ref: 1},
},
expectedSeriesCount: 0,
},
{
name: "In order ct+normal sample/histogram",
inputSamples: []appendableSample{
{t: 100, h: testHistogram, st: 1, lbls: defLbls},
{t: 101, h: testHistogram, st: 1, lbls: defLbls},
{t: 100, h: testHistograms[0], st: 1, lbls: defLbls},
{t: 101, h: testHistograms[1], st: 1, lbls: defLbls},
},
expectedSamples: []*walSample{
{t: 1, h: &histogram.Histogram{}},
{t: 100, h: testHistogram},
{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.NotCounterReset}},
expectedSamples: []walSample{
{t: 1, h: &histogram.Histogram{}, lbls: defLbls, ref: 1},
{t: 100, h: testHistograms[0], lbls: defLbls, ref: 1},
{t: 101, h: testHistograms[1], lbls: defLbls, ref: 1},
},
},
{
@ -1248,12 +1253,12 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
{t: 180_000, st: 40_000, v: 10, lbls: defLbls},
{t: 50_000, st: 40_000, v: 10, lbls: defLbls},
},
expectedSamples: []*walSample{
{t: 40_000, f: 0, lbls: defLbls},
{t: 50_000, f: 10, lbls: defLbls},
{t: 60_000, f: 10, lbls: defLbls},
{t: 120_000, f: 10, lbls: defLbls},
{t: 180_000, f: 10, lbls: defLbls},
expectedSamples: []walSample{
{t: 40_000, f: 0, lbls: defLbls, ref: 1},
{t: 60_000, f: 10, lbls: defLbls, ref: 1},
{t: 120_000, f: 10, lbls: defLbls, ref: 1},
{t: 180_000, f: 10, lbls: defLbls, ref: 1},
{t: 50_000, f: 10, lbls: defLbls, ref: 1}, // OOO sample.
},
},
}
@ -1294,7 +1299,7 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
// Close the DB to ensure all data is flushed to the WAL
require.NoError(t, s.Close())
// Check that we dont have any OOO samples in the WAL by checking metrics
// Check that we don't have any OOO samples in the WAL by checking metrics
families, err := reg.Gather()
require.NoError(t, err, "failed to gather metrics")
for _, f := range families {
@ -1303,26 +1308,13 @@ func TestDBStartTimestampSamplesIngestion(t *testing.T) {
}
}
outputSamples := readWALSamples(t, s.wal.Dir())
require.Len(t, outputSamples, len(tc.expectedSamples), "Expected %d samples", len(tc.expectedSamples))
for i, expectedSample := range tc.expectedSamples {
for _, sample := range outputSamples {
if sample.t == expectedSample.t && sample.lbls.String() == expectedSample.lbls.String() {
if expectedSample.h != nil {
require.Equal(t, expectedSample.h, sample.h, "histogram value mismatch (sample index %d)", i)
} else {
require.Equal(t, expectedSample.f, sample.f, "value mismatch (sample index %d)", i)
}
}
}
}
got := readWALSamples(t, s.wal.Dir())
testutil.RequireEqualWithOptions(t, tc.expectedSamples, got, cmp.Options{cmp.AllowUnexported(walSample{})})
})
}
}
func readWALSamples(t *testing.T, walDir string) []*walSample {
func readWALSamples(t *testing.T, walDir string) []walSample {
t.Helper()
sr, err := wlog.NewSegmentsReader(walDir)
require.NoError(t, err)
@ -1339,7 +1331,7 @@ func readWALSamples(t *testing.T, walDir string) []*walSample {
histograms []record.RefHistogramSample
lastSeries record.RefSeries
outputSamples = make([]*walSample, 0)
outputSamples = make([]walSample, 0)
)
for r.Next() {
@ -1353,7 +1345,7 @@ func readWALSamples(t *testing.T, walDir string) []*walSample {
samples, err = dec.Samples(rec, samples[:0])
require.NoError(t, err)
for _, s := range samples {
outputSamples = append(outputSamples, &walSample{
outputSamples = append(outputSamples, walSample{
t: s.T,
f: s.V,
lbls: lastSeries.Labels.Copy(),
@ -1364,7 +1356,7 @@ func readWALSamples(t *testing.T, walDir string) []*walSample {
histograms, err = dec.HistogramSamples(rec, histograms[:0])
require.NoError(t, err)
for _, h := range histograms {
outputSamples = append(outputSamples, &walSample{
outputSamples = append(outputSamples, walSample{
t: h.T,
h: h.H,
lbls: lastSeries.Labels.Copy(),
@ -1373,14 +1365,14 @@ func readWALSamples(t *testing.T, walDir string) []*walSample {
}
}
}
return outputSamples
}
func BenchmarkCreateSeries(b *testing.B) {
func BenchmarkGetOrCreate(b *testing.B) {
s := createTestAgentDB(b, nil, DefaultOptions())
defer s.Close()
// NOTE: This benchmarks appenderBase, so it does not matter if it's V1 or V2.
app := s.Appender(context.Background()).(*appender)
lbls := make([]labels.Labels, b.N)

View file

@ -167,11 +167,11 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
// an optimization for the more likely case.
switch a.typesInBatch[s.ref] {
case stHistogram, stCustomBucketHistogram:
return a.Append(ref, ls, st, t, 0, &histogram.Histogram{Sum: v}, nil, storage.AOptions{
return a.Append(storage.SeriesRef(s.ref), ls, st, t, 0, &histogram.Histogram{Sum: v}, nil, storage.AOptions{
RejectOutOfOrder: opts.RejectOutOfOrder,
})
case stFloatHistogram, stCustomBucketFloatHistogram:
return a.Append(ref, ls, st, t, 0, nil, &histogram.FloatHistogram{Sum: v}, storage.AOptions{
return a.Append(storage.SeriesRef(s.ref), ls, st, t, 0, nil, &histogram.FloatHistogram{Sum: v}, storage.AOptions{
RejectOutOfOrder: opts.RejectOutOfOrder,
})
}
@ -202,7 +202,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
if isStale {
// For stale values we never attempt to process metadata/exemplars, claim the success.
return ref, nil
return storage.SeriesRef(s.ref), nil
}
// Append exemplars if any and if storage was configured for it.
@ -324,6 +324,7 @@ func (a *headAppenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemp
if !errors.Is(err, storage.ErrDuplicateExemplar) && !errors.Is(err, storage.ErrExemplarsDisabled) {
// Except duplicates, return partial errors.
errs = append(errs, err)
continue
}
if !errors.Is(err, storage.ErrOutOfOrderExemplar) {
a.head.logger.Debug("Error while adding an exemplar on AppendSample", "exemplars", fmt.Sprintf("%+v", e), "err", e)