mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
storage: optimized fanoutAppenderV2 (#17976)
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
* storage: add BenchmarkFanoutAppenderV2 Signed-off-by: bwplotka <bwplotka@gmail.com> * fix: optimized fanoutAppenderV2 Signed-off-by: bwplotka <bwplotka@gmail.com> * optimized more Signed-off-by: bwplotka <bwplotka@gmail.com> --------- Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
d9db76631d
commit
9657c23c37
4 changed files with 119 additions and 45 deletions
|
|
@ -300,20 +300,22 @@ type fanoutAppenderV2 struct {
|
|||
}
|
||||
|
||||
func (f *fanoutAppenderV2) Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AOptions) (SeriesRef, error) {
|
||||
var partialErr *AppendPartialError
|
||||
|
||||
ref, err := f.primary.Append(ref, l, st, t, v, h, fh, opts)
|
||||
var partialErr AppendPartialError
|
||||
if partialErr.Handle(err) != nil {
|
||||
partialErr, err = partialErr.Handle(err)
|
||||
if err != nil {
|
||||
return ref, err
|
||||
}
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
if _, err := appender.Append(ref, l, st, t, v, h, fh, opts); err != nil {
|
||||
if partialErr.Handle(err) != nil {
|
||||
return ref, err
|
||||
}
|
||||
_, serr := appender.Append(ref, l, st, t, v, h, fh, opts)
|
||||
partialErr, serr = partialErr.Handle(serr)
|
||||
if serr != nil {
|
||||
return ref, serr
|
||||
}
|
||||
}
|
||||
return ref, partialErr.ErrOrNil()
|
||||
return ref, partialErr.ToError()
|
||||
}
|
||||
|
||||
func (f *fanoutAppenderV2) Commit() (err error) {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ package storage_test
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
|
|
@ -563,3 +564,41 @@ func TestFanoutAppenderV2(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Recommended CLI invocation:
|
||||
/*
|
||||
export bench=fanoutAppender && go test ./storage/... \
|
||||
-run '^$' -bench '^BenchmarkFanoutAppenderV2' \
|
||||
-benchtime 2s -count 6 -cpu 2 -timeout 999m \
|
||||
| tee ${bench}.txt
|
||||
*/
|
||||
func BenchmarkFanoutAppenderV2(b *testing.B) {
|
||||
ex := []exemplar.Exemplar{{Value: 1}}
|
||||
|
||||
var series []labels.Labels
|
||||
for i := range 1000 {
|
||||
series = append(series, labels.FromStrings(model.MetricNameLabel, "metric1", "i", strconv.Itoa(i)))
|
||||
}
|
||||
for _, tt := range fanoutAppenderTestCases(nil) {
|
||||
// Turn our mock appender into ~noop for no allocs.
|
||||
tt.primary.SkipRecording(true)
|
||||
tt.secondary.SkipRecording(true)
|
||||
|
||||
b.Run(tt.name, func(b *testing.B) {
|
||||
f := storage.NewFanout(nil, mockStorage{appV2: tt.primary}, mockStorage{appV2: tt.secondary})
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for b.Loop() {
|
||||
app := f.AppenderV2(b.Context())
|
||||
for _, s := range series {
|
||||
// Purposefully skip errors as we want to benchmark error cases too (majority of the fanout logic).
|
||||
_, _ = app.Append(0, s, 0, 0, 1, nil, nil, storage.AOptions{
|
||||
Exemplars: ex,
|
||||
})
|
||||
}
|
||||
require.NoError(b, app.Rollback())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -90,6 +90,10 @@ type AppendPartialError struct {
|
|||
|
||||
// Error returns combined error string.
|
||||
func (e *AppendPartialError) Error() string {
|
||||
if e == nil {
|
||||
return "<nil>"
|
||||
}
|
||||
|
||||
errs := errors.Join(e.ExemplarErrors...)
|
||||
if errs == nil {
|
||||
return ""
|
||||
|
|
@ -97,29 +101,46 @@ func (e *AppendPartialError) Error() string {
|
|||
return errs.Error()
|
||||
}
|
||||
|
||||
// ErrOrNil returns AppendPartialError as error, returning nil
|
||||
// ToError returns AppendPartialError as error, returning nil
|
||||
// if there are no errors.
|
||||
func (e *AppendPartialError) ErrOrNil() error {
|
||||
if len(e.ExemplarErrors) == 0 {
|
||||
func (e *AppendPartialError) ToError() error {
|
||||
if e == nil {
|
||||
return nil
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
// Is implements method that's expected by errors.Is.
|
||||
func (*AppendPartialError) Is(target error) bool {
|
||||
// This does not need to handle wrapped errors as AppendPartialError.Is should be used
|
||||
// via errors.Is.
|
||||
_, ok := target.(*AppendPartialError)
|
||||
return ok
|
||||
}
|
||||
|
||||
// Handle handles the given err that may be an AppendPartialError.
|
||||
// If the err is nil or not an AppendPartialError it returns err.
|
||||
// Otherwise, partial errors are aggregated.
|
||||
func (e *AppendPartialError) Handle(err error) error {
|
||||
func (e *AppendPartialError) Handle(err error) (*AppendPartialError, error) {
|
||||
if err == nil {
|
||||
return nil
|
||||
return e, nil
|
||||
}
|
||||
|
||||
// Fast, alloc-free path first for non-partial error cases.
|
||||
if !errors.Is(err, e) {
|
||||
return e, err
|
||||
}
|
||||
var pErr *AppendPartialError
|
||||
if !errors.As(err, &pErr) {
|
||||
return err
|
||||
return e, err
|
||||
}
|
||||
|
||||
if e == nil {
|
||||
// Lazy allocation.
|
||||
e = &AppendPartialError{}
|
||||
}
|
||||
e.ExemplarErrors = append(e.ExemplarErrors, pErr.ExemplarErrors...)
|
||||
return nil
|
||||
return e, nil
|
||||
}
|
||||
|
||||
var _ error = &AppendPartialError{}
|
||||
|
|
|
|||
|
|
@ -185,6 +185,7 @@ type Appendable struct {
|
|||
appendErrFn func(ls labels.Labels) error // If non-nil, inject appender error on every Append, AppendHistogram and ST zero calls.
|
||||
appendExemplarsError error // If non-nil, inject exemplar error.
|
||||
commitErr error // If non-nil, inject commit error.
|
||||
skipRecording bool // If true, Appendable won't record samples, useful for benchmarks.
|
||||
|
||||
mtx sync.Mutex
|
||||
openAppenders atomic.Int32 // Guard against multi-appender use.
|
||||
|
|
@ -222,6 +223,13 @@ func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendEx
|
|||
return a
|
||||
}
|
||||
|
||||
// SkipRecording enables or disables recording appended samples.
|
||||
// If skipped, Appendable allocs less, but Result*() methods will give always empty results. This is useful for benchmarking.
|
||||
func (a *Appendable) SkipRecording(skipRecording bool) *Appendable {
|
||||
a.skipRecording = skipRecording
|
||||
return a
|
||||
}
|
||||
|
||||
// PendingSamples returns pending samples (samples appended without commit).
|
||||
func (a *Appendable) PendingSamples() []Sample {
|
||||
a.mtx.Lock()
|
||||
|
|
@ -335,8 +343,10 @@ func (a *baseAppender) Commit() error {
|
|||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.resultSamples = append(a.a.resultSamples, a.a.pendingSamples...)
|
||||
a.a.pendingSamples = a.a.pendingSamples[:0]
|
||||
if !a.a.skipRecording {
|
||||
a.a.resultSamples = append(a.a.resultSamples, a.a.pendingSamples...)
|
||||
a.a.pendingSamples = a.a.pendingSamples[:0]
|
||||
}
|
||||
a.err = errClosedAppender
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
|
|
@ -353,8 +363,10 @@ func (a *baseAppender) Rollback() error {
|
|||
defer a.a.openAppenders.Dec()
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.rolledbackSamples = append(a.a.rolledbackSamples, a.a.pendingSamples...)
|
||||
a.a.pendingSamples = a.a.pendingSamples[:0]
|
||||
if !a.a.skipRecording {
|
||||
a.a.rolledbackSamples = append(a.a.rolledbackSamples, a.a.pendingSamples...)
|
||||
a.a.pendingSamples = a.a.pendingSamples[:0]
|
||||
}
|
||||
a.err = errClosedAppender
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
|
|
@ -548,37 +560,37 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64
|
|||
}
|
||||
}
|
||||
|
||||
var (
|
||||
es []exemplar.Exemplar
|
||||
partialErr error
|
||||
)
|
||||
var partialErr error
|
||||
if !a.a.skipRecording {
|
||||
var es []exemplar.Exemplar
|
||||
|
||||
if len(opts.Exemplars) > 0 {
|
||||
if a.a.appendExemplarsError != nil {
|
||||
var exErrs []error
|
||||
for range opts.Exemplars {
|
||||
exErrs = append(exErrs, a.a.appendExemplarsError)
|
||||
if len(opts.Exemplars) > 0 {
|
||||
if a.a.appendExemplarsError != nil {
|
||||
var exErrs []error
|
||||
for range opts.Exemplars {
|
||||
exErrs = append(exErrs, a.a.appendExemplarsError)
|
||||
}
|
||||
if len(exErrs) > 0 {
|
||||
partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs}
|
||||
}
|
||||
} else {
|
||||
// As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse.
|
||||
es = make([]exemplar.Exemplar, len(opts.Exemplars))
|
||||
copy(es, opts.Exemplars)
|
||||
}
|
||||
if len(exErrs) > 0 {
|
||||
partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs}
|
||||
}
|
||||
} else {
|
||||
// As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse.
|
||||
es = make([]exemplar.Exemplar, len(opts.Exemplars))
|
||||
copy(es, opts.Exemplars)
|
||||
}
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.pendingSamples = append(a.a.pendingSamples, Sample{
|
||||
MF: opts.MetricFamilyName,
|
||||
M: opts.Metadata,
|
||||
L: ls,
|
||||
ST: st, T: t,
|
||||
V: v, H: h, FH: fh,
|
||||
ES: es,
|
||||
})
|
||||
a.a.mtx.Unlock()
|
||||
a.a.mtx.Lock()
|
||||
a.a.pendingSamples = append(a.a.pendingSamples, Sample{
|
||||
MF: opts.MetricFamilyName,
|
||||
M: opts.Metadata,
|
||||
L: ls,
|
||||
ST: st, T: t,
|
||||
V: v, H: h, FH: fh,
|
||||
ES: es,
|
||||
})
|
||||
a.a.mtx.Unlock()
|
||||
}
|
||||
|
||||
if a.next != nil {
|
||||
ref, err = a.next.Append(ref, ls, st, t, v, h, fh, opts)
|
||||
|
|
|
|||
Loading…
Reference in a new issue