From 9657c23c374db182dfefc1d4eb007fa44d4d711a Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 2 Feb 2026 07:04:30 +0000 Subject: [PATCH] storage: optimized fanoutAppenderV2 (#17976) * storage: add BenchmarkFanoutAppenderV2 Signed-off-by: bwplotka * fix: optimized fanoutAppenderV2 Signed-off-by: bwplotka * optimized more Signed-off-by: bwplotka --------- Signed-off-by: bwplotka --- storage/fanout.go | 16 ++++---- storage/fanout_test.go | 39 +++++++++++++++++++ storage/interface_append.go | 35 +++++++++++++---- util/teststorage/appender.go | 74 +++++++++++++++++++++--------------- 4 files changed, 119 insertions(+), 45 deletions(-) diff --git a/storage/fanout.go b/storage/fanout.go index 9baa31d9af..21f5f715e4 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -300,20 +300,22 @@ type fanoutAppenderV2 struct { } func (f *fanoutAppenderV2) Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AOptions) (SeriesRef, error) { + var partialErr *AppendPartialError + ref, err := f.primary.Append(ref, l, st, t, v, h, fh, opts) - var partialErr AppendPartialError - if partialErr.Handle(err) != nil { + partialErr, err = partialErr.Handle(err) + if err != nil { return ref, err } for _, appender := range f.secondaries { - if _, err := appender.Append(ref, l, st, t, v, h, fh, opts); err != nil { - if partialErr.Handle(err) != nil { - return ref, err - } + _, serr := appender.Append(ref, l, st, t, v, h, fh, opts) + partialErr, serr = partialErr.Handle(serr) + if serr != nil { + return ref, serr } } - return ref, partialErr.ErrOrNil() + return ref, partialErr.ToError() } func (f *fanoutAppenderV2) Commit() (err error) { diff --git a/storage/fanout_test.go b/storage/fanout_test.go index 948934d041..027511aa3a 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -16,6 +16,7 @@ package storage_test import ( "context" "errors" + "strconv" "testing" "github.com/prometheus/common/model" @@ -563,3 +564,41 @@ func TestFanoutAppenderV2(t *testing.T) { }) } } + +// Recommended CLI invocation: +/* + export bench=fanoutAppender && go test ./storage/... \ + -run '^$' -bench '^BenchmarkFanoutAppenderV2' \ + -benchtime 2s -count 6 -cpu 2 -timeout 999m \ + | tee ${bench}.txt +*/ +func BenchmarkFanoutAppenderV2(b *testing.B) { + ex := []exemplar.Exemplar{{Value: 1}} + + var series []labels.Labels + for i := range 1000 { + series = append(series, labels.FromStrings(model.MetricNameLabel, "metric1", "i", strconv.Itoa(i))) + } + for _, tt := range fanoutAppenderTestCases(nil) { + // Turn our mock appender into ~noop for no allocs. + tt.primary.SkipRecording(true) + tt.secondary.SkipRecording(true) + + b.Run(tt.name, func(b *testing.B) { + f := storage.NewFanout(nil, mockStorage{appV2: tt.primary}, mockStorage{appV2: tt.secondary}) + + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + app := f.AppenderV2(b.Context()) + for _, s := range series { + // Purposefully skip errors as we want to benchmark error cases too (majority of the fanout logic). + _, _ = app.Append(0, s, 0, 0, 1, nil, nil, storage.AOptions{ + Exemplars: ex, + }) + } + require.NoError(b, app.Rollback()) + } + }) + } +} diff --git a/storage/interface_append.go b/storage/interface_append.go index aa4ae84152..b5ee4b49c8 100644 --- a/storage/interface_append.go +++ b/storage/interface_append.go @@ -90,6 +90,10 @@ type AppendPartialError struct { // Error returns combined error string. func (e *AppendPartialError) Error() string { + if e == nil { + return "" + } + errs := errors.Join(e.ExemplarErrors...) if errs == nil { return "" @@ -97,29 +101,46 @@ func (e *AppendPartialError) Error() string { return errs.Error() } -// ErrOrNil returns AppendPartialError as error, returning nil +// ToError returns AppendPartialError as error, returning nil // if there are no errors. -func (e *AppendPartialError) ErrOrNil() error { - if len(e.ExemplarErrors) == 0 { +func (e *AppendPartialError) ToError() error { + if e == nil { return nil } return e } +// Is implements method that's expected by errors.Is. +func (*AppendPartialError) Is(target error) bool { + // This does not need to handle wrapped errors as AppendPartialError.Is should be used + // via errors.Is. + _, ok := target.(*AppendPartialError) + return ok +} + // Handle handles the given err that may be an AppendPartialError. // If the err is nil or not an AppendPartialError it returns err. // Otherwise, partial errors are aggregated. -func (e *AppendPartialError) Handle(err error) error { +func (e *AppendPartialError) Handle(err error) (*AppendPartialError, error) { if err == nil { - return nil + return e, nil } + // Fast, alloc-free path first for non-partial error cases. + if !errors.Is(err, e) { + return e, err + } var pErr *AppendPartialError if !errors.As(err, &pErr) { - return err + return e, err + } + + if e == nil { + // Lazy allocation. + e = &AppendPartialError{} } e.ExemplarErrors = append(e.ExemplarErrors, pErr.ExemplarErrors...) - return nil + return e, nil } var _ error = &AppendPartialError{} diff --git a/util/teststorage/appender.go b/util/teststorage/appender.go index dc0825f98f..d2d550be2e 100644 --- a/util/teststorage/appender.go +++ b/util/teststorage/appender.go @@ -185,6 +185,7 @@ type Appendable struct { appendErrFn func(ls labels.Labels) error // If non-nil, inject appender error on every Append, AppendHistogram and ST zero calls. appendExemplarsError error // If non-nil, inject exemplar error. commitErr error // If non-nil, inject commit error. + skipRecording bool // If true, Appendable won't record samples, useful for benchmarks. mtx sync.Mutex openAppenders atomic.Int32 // Guard against multi-appender use. @@ -222,6 +223,13 @@ func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendEx return a } +// SkipRecording enables or disables recording appended samples. +// If skipped, Appendable allocs less, but Result*() methods will give always empty results. This is useful for benchmarking. +func (a *Appendable) SkipRecording(skipRecording bool) *Appendable { + a.skipRecording = skipRecording + return a +} + // PendingSamples returns pending samples (samples appended without commit). func (a *Appendable) PendingSamples() []Sample { a.mtx.Lock() @@ -335,8 +343,10 @@ func (a *baseAppender) Commit() error { } a.a.mtx.Lock() - a.a.resultSamples = append(a.a.resultSamples, a.a.pendingSamples...) - a.a.pendingSamples = a.a.pendingSamples[:0] + if !a.a.skipRecording { + a.a.resultSamples = append(a.a.resultSamples, a.a.pendingSamples...) + a.a.pendingSamples = a.a.pendingSamples[:0] + } a.err = errClosedAppender a.a.mtx.Unlock() @@ -353,8 +363,10 @@ func (a *baseAppender) Rollback() error { defer a.a.openAppenders.Dec() a.a.mtx.Lock() - a.a.rolledbackSamples = append(a.a.rolledbackSamples, a.a.pendingSamples...) - a.a.pendingSamples = a.a.pendingSamples[:0] + if !a.a.skipRecording { + a.a.rolledbackSamples = append(a.a.rolledbackSamples, a.a.pendingSamples...) + a.a.pendingSamples = a.a.pendingSamples[:0] + } a.err = errClosedAppender a.a.mtx.Unlock() @@ -548,37 +560,37 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64 } } - var ( - es []exemplar.Exemplar - partialErr error - ) + var partialErr error + if !a.a.skipRecording { + var es []exemplar.Exemplar - if len(opts.Exemplars) > 0 { - if a.a.appendExemplarsError != nil { - var exErrs []error - for range opts.Exemplars { - exErrs = append(exErrs, a.a.appendExemplarsError) + if len(opts.Exemplars) > 0 { + if a.a.appendExemplarsError != nil { + var exErrs []error + for range opts.Exemplars { + exErrs = append(exErrs, a.a.appendExemplarsError) + } + if len(exErrs) > 0 { + partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs} + } + } else { + // As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse. + es = make([]exemplar.Exemplar, len(opts.Exemplars)) + copy(es, opts.Exemplars) } - if len(exErrs) > 0 { - partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs} - } - } else { - // As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse. - es = make([]exemplar.Exemplar, len(opts.Exemplars)) - copy(es, opts.Exemplars) } - } - a.a.mtx.Lock() - a.a.pendingSamples = append(a.a.pendingSamples, Sample{ - MF: opts.MetricFamilyName, - M: opts.Metadata, - L: ls, - ST: st, T: t, - V: v, H: h, FH: fh, - ES: es, - }) - a.a.mtx.Unlock() + a.a.mtx.Lock() + a.a.pendingSamples = append(a.a.pendingSamples, Sample{ + MF: opts.MetricFamilyName, + M: opts.Metadata, + L: ls, + ST: st, T: t, + V: v, H: h, FH: fh, + ES: es, + }) + a.a.mtx.Unlock() + } if a.next != nil { ref, err = a.next.Append(ref, ls, st, t, v, h, fh, opts)