mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
Merge pull request #17835 from prometheus/bwplotka/a2-storage-support
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
Some checks failed
buf.build / lint and publish (push) Has been cancelled
CI / Go tests (push) Has been cancelled
CI / More Go tests (push) Has been cancelled
CI / Go tests with previous Go version (push) Has been cancelled
CI / UI tests (push) Has been cancelled
CI / Go tests on Windows (push) Has been cancelled
CI / Mixins tests (push) Has been cancelled
CI / Build Prometheus for common architectures (push) Has been cancelled
CI / Build Prometheus for all architectures (push) Has been cancelled
CI / Check generated parser (push) Has been cancelled
CI / golangci-lint (push) Has been cancelled
CI / fuzzing (push) Has been cancelled
CI / codeql (push) Has been cancelled
Scorecards supply-chain security / Scorecards analysis (push) Has been cancelled
CI / Report status of build Prometheus for all architectures (push) Has been cancelled
CI / Publish main branch artifacts (push) Has been cancelled
CI / Publish release artefacts (push) Has been cancelled
CI / Publish UI on npm Registry (push) Has been cancelled
feat(storage)[PART4b]: add AppenderV2 to the rest of storage.Storage implementations + mock exemplar fix
This commit is contained in:
commit
c4b0da94db
9 changed files with 535 additions and 37 deletions
|
|
@ -1746,6 +1746,14 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
|
|||
return notReadyAppender{}
|
||||
}
|
||||
|
||||
// AppenderV2 implements the Storage interface.
|
||||
func (s *readyStorage) AppenderV2(ctx context.Context) storage.AppenderV2 {
|
||||
if x := s.get(); x != nil {
|
||||
return x.AppenderV2(ctx)
|
||||
}
|
||||
return notReadyAppenderV2{}
|
||||
}
|
||||
|
||||
type notReadyAppender struct{}
|
||||
|
||||
// SetOptions does nothing in this appender implementation.
|
||||
|
|
@ -1779,6 +1787,15 @@ func (notReadyAppender) Commit() error { return tsdb.ErrNotReady }
|
|||
|
||||
func (notReadyAppender) Rollback() error { return tsdb.ErrNotReady }
|
||||
|
||||
type notReadyAppenderV2 struct{}
|
||||
|
||||
func (notReadyAppenderV2) Append(storage.SeriesRef, labels.Labels, int64, int64, float64, *histogram.Histogram, *histogram.FloatHistogram, storage.AOptions) (storage.SeriesRef, error) {
|
||||
return 0, tsdb.ErrNotReady
|
||||
}
|
||||
func (notReadyAppenderV2) Commit() error { return tsdb.ErrNotReady }
|
||||
|
||||
func (notReadyAppenderV2) Rollback() error { return tsdb.ErrNotReady }
|
||||
|
||||
// Close implements the Storage interface.
|
||||
func (s *readyStorage) Close() error {
|
||||
if x := s.get(); x != nil {
|
||||
|
|
|
|||
|
|
@ -136,6 +136,19 @@ func (f *fanout) Appender(ctx context.Context) Appender {
|
|||
}
|
||||
}
|
||||
|
||||
func (f *fanout) AppenderV2(ctx context.Context) AppenderV2 {
|
||||
primary := f.primary.AppenderV2(ctx)
|
||||
secondaries := make([]AppenderV2, 0, len(f.secondaries))
|
||||
for _, storage := range f.secondaries {
|
||||
secondaries = append(secondaries, storage.AppenderV2(ctx))
|
||||
}
|
||||
return &fanoutAppenderV2{
|
||||
logger: f.logger,
|
||||
primary: primary,
|
||||
secondaries: secondaries,
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the storage and all its underlying resources.
|
||||
func (f *fanout) Close() error {
|
||||
errs := []error{
|
||||
|
|
@ -276,5 +289,59 @@ func (f *fanoutAppender) Rollback() (err error) {
|
|||
f.logger.Error("Squashed rollback error on rollback", "err", rollbackErr)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
type fanoutAppenderV2 struct {
|
||||
logger *slog.Logger
|
||||
|
||||
primary AppenderV2
|
||||
secondaries []AppenderV2
|
||||
}
|
||||
|
||||
func (f *fanoutAppenderV2) Append(ref SeriesRef, l labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AOptions) (SeriesRef, error) {
|
||||
ref, err := f.primary.Append(ref, l, st, t, v, h, fh, opts)
|
||||
var partialErr AppendPartialError
|
||||
if partialErr.Handle(err) != nil {
|
||||
return ref, err
|
||||
}
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
if _, err := appender.Append(ref, l, st, t, v, h, fh, opts); err != nil {
|
||||
if partialErr.Handle(err) != nil {
|
||||
return ref, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return ref, partialErr.ErrOrNil()
|
||||
}
|
||||
|
||||
func (f *fanoutAppenderV2) Commit() (err error) {
|
||||
err = f.primary.Commit()
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
if err == nil {
|
||||
err = appender.Commit()
|
||||
} else {
|
||||
if rollbackErr := appender.Rollback(); rollbackErr != nil {
|
||||
f.logger.Error("Squashed rollback error on commit", "err", rollbackErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *fanoutAppenderV2) Rollback() (err error) {
|
||||
err = f.primary.Rollback()
|
||||
|
||||
for _, appender := range f.secondaries {
|
||||
rollbackErr := appender.Rollback()
|
||||
switch {
|
||||
case err == nil:
|
||||
err = rollbackErr
|
||||
case rollbackErr != nil:
|
||||
f.logger.Error("Squashed rollback error on rollback", "err", rollbackErr)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,11 +21,14 @@ import (
|
|||
"github.com/prometheus/common/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
"github.com/prometheus/prometheus/util/teststorage"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
func TestFanout_SelectSorted(t *testing.T) {
|
||||
|
|
@ -132,6 +135,115 @@ func TestFanout_SelectSorted(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestFanout_SelectSorted_AppenderV2(t *testing.T) {
|
||||
inputLabel := labels.FromStrings(model.MetricNameLabel, "a")
|
||||
outputLabel := labels.FromStrings(model.MetricNameLabel, "a")
|
||||
|
||||
inputTotalSize := 0
|
||||
|
||||
priStorage := teststorage.New(t)
|
||||
defer priStorage.Close()
|
||||
app1 := priStorage.AppenderV2(t.Context())
|
||||
_, err := app1.Append(0, inputLabel, 0, 0, 0, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
inputTotalSize++
|
||||
_, err = app1.Append(0, inputLabel, 0, 1000, 1, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
inputTotalSize++
|
||||
_, err = app1.Append(0, inputLabel, 0, 2000, 2, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
inputTotalSize++
|
||||
require.NoError(t, app1.Commit())
|
||||
|
||||
remoteStorage1 := teststorage.New(t)
|
||||
defer remoteStorage1.Close()
|
||||
app2 := remoteStorage1.AppenderV2(t.Context())
|
||||
_, err = app2.Append(0, inputLabel, 0, 3000, 3, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
inputTotalSize++
|
||||
_, err = app2.Append(0, inputLabel, 0, 4000, 4, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
inputTotalSize++
|
||||
_, err = app2.Append(0, inputLabel, 0, 5000, 5, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
inputTotalSize++
|
||||
require.NoError(t, app2.Commit())
|
||||
|
||||
remoteStorage2 := teststorage.New(t)
|
||||
defer remoteStorage2.Close()
|
||||
|
||||
app3 := remoteStorage2.AppenderV2(t.Context())
|
||||
_, err = app3.Append(0, inputLabel, 0, 6000, 6, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
inputTotalSize++
|
||||
_, err = app3.Append(0, inputLabel, 0, 7000, 7, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
inputTotalSize++
|
||||
_, err = app3.Append(0, inputLabel, 0, 8000, 8, nil, nil, storage.AOptions{})
|
||||
require.NoError(t, err)
|
||||
inputTotalSize++
|
||||
|
||||
require.NoError(t, app3.Commit())
|
||||
|
||||
fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2)
|
||||
|
||||
t.Run("querier", func(t *testing.T) {
|
||||
querier, err := fanoutStorage.Querier(0, 8000)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
|
||||
require.NoError(t, err)
|
||||
|
||||
seriesSet := querier.Select(t.Context(), true, nil, matcher)
|
||||
|
||||
result := make(map[int64]float64)
|
||||
var labelsResult labels.Labels
|
||||
var iterator chunkenc.Iterator
|
||||
for seriesSet.Next() {
|
||||
series := seriesSet.At()
|
||||
seriesLabels := series.Labels()
|
||||
labelsResult = seriesLabels
|
||||
iterator := series.Iterator(iterator)
|
||||
for iterator.Next() == chunkenc.ValFloat {
|
||||
timestamp, value := iterator.At()
|
||||
result[timestamp] = value
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, labelsResult, outputLabel)
|
||||
require.Len(t, result, inputTotalSize)
|
||||
})
|
||||
t.Run("chunk querier", func(t *testing.T) {
|
||||
querier, err := fanoutStorage.ChunkQuerier(0, 8000)
|
||||
require.NoError(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
|
||||
require.NoError(t, err)
|
||||
|
||||
seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(t.Context(), true, nil, matcher))
|
||||
|
||||
result := make(map[int64]float64)
|
||||
var labelsResult labels.Labels
|
||||
var iterator chunkenc.Iterator
|
||||
for seriesSet.Next() {
|
||||
series := seriesSet.At()
|
||||
seriesLabels := series.Labels()
|
||||
labelsResult = seriesLabels
|
||||
iterator := series.Iterator(iterator)
|
||||
for iterator.Next() == chunkenc.ValFloat {
|
||||
timestamp, value := iterator.At()
|
||||
result[timestamp] = value
|
||||
}
|
||||
}
|
||||
|
||||
require.NoError(t, seriesSet.Err())
|
||||
require.Equal(t, labelsResult, outputLabel)
|
||||
require.Len(t, result, inputTotalSize)
|
||||
})
|
||||
}
|
||||
|
||||
func TestFanoutErrors(t *testing.T) {
|
||||
workingStorage := teststorage.New(t)
|
||||
defer workingStorage.Close()
|
||||
|
|
@ -224,9 +336,10 @@ type errChunkQuerier struct{ errQuerier }
|
|||
func (errStorage) ChunkQuerier(_, _ int64) (storage.ChunkQuerier, error) {
|
||||
return errChunkQuerier{}, nil
|
||||
}
|
||||
func (errStorage) Appender(context.Context) storage.Appender { return nil }
|
||||
func (errStorage) StartTime() (int64, error) { return 0, nil }
|
||||
func (errStorage) Close() error { return nil }
|
||||
func (errStorage) Appender(context.Context) storage.Appender { return nil }
|
||||
func (errStorage) AppenderV2(context.Context) storage.AppenderV2 { return nil }
|
||||
func (errStorage) StartTime() (int64, error) { return 0, nil }
|
||||
func (errStorage) Close() error { return nil }
|
||||
|
||||
func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
|
||||
return storage.ErrSeriesSet(errSelect)
|
||||
|
|
@ -245,3 +358,216 @@ func (errQuerier) Close() error { return nil }
|
|||
func (errChunkQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
return storage.ErrChunkSeriesSet(errSelect)
|
||||
}
|
||||
|
||||
type mockStorage struct {
|
||||
app storage.Appendable
|
||||
appV2 storage.AppendableV2
|
||||
storage.Storage
|
||||
}
|
||||
|
||||
func (m mockStorage) Appender(ctx context.Context) storage.Appender {
|
||||
return m.app.Appender(ctx)
|
||||
}
|
||||
|
||||
func (m mockStorage) AppenderV2(ctx context.Context) storage.AppenderV2 {
|
||||
return m.appV2.AppenderV2(ctx)
|
||||
}
|
||||
|
||||
type sample = teststorage.Sample
|
||||
|
||||
func withoutExemplars(s []sample) (ret []sample) {
|
||||
ret = make([]sample, len(s))
|
||||
copy(ret, s)
|
||||
for i := range ret {
|
||||
ret[i].ES = nil
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
type fanoutAppenderTestCase struct {
|
||||
name string
|
||||
primary *teststorage.Appendable
|
||||
secondary *teststorage.Appendable
|
||||
|
||||
expectAppendErr bool
|
||||
expectExemplarError bool
|
||||
expectCommitError bool
|
||||
|
||||
expectPrimarySamples []sample
|
||||
expectSecondarySamples []sample
|
||||
}
|
||||
|
||||
func fanoutAppenderTestCases(expected []sample) []fanoutAppenderTestCase {
|
||||
appErr := errors.New("append test error")
|
||||
exErr := errors.New("exemplar test error")
|
||||
commitErr := errors.New("commit test error")
|
||||
|
||||
return []fanoutAppenderTestCase{
|
||||
{
|
||||
name: "both works",
|
||||
primary: teststorage.NewAppendable(),
|
||||
secondary: teststorage.NewAppendable(),
|
||||
|
||||
expectPrimarySamples: expected,
|
||||
expectSecondarySamples: expected,
|
||||
},
|
||||
{
|
||||
name: "primary errors",
|
||||
primary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return appErr }, exErr, commitErr),
|
||||
secondary: teststorage.NewAppendable(),
|
||||
|
||||
expectAppendErr: true,
|
||||
expectExemplarError: true,
|
||||
expectCommitError: true,
|
||||
},
|
||||
{
|
||||
name: "exemplar errors",
|
||||
primary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return nil }, exErr, nil),
|
||||
secondary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return nil }, exErr, nil),
|
||||
|
||||
expectAppendErr: false,
|
||||
expectExemplarError: true,
|
||||
expectCommitError: false,
|
||||
|
||||
expectPrimarySamples: withoutExemplars(expected),
|
||||
expectSecondarySamples: withoutExemplars(expected),
|
||||
},
|
||||
{
|
||||
name: "secondary errors",
|
||||
primary: teststorage.NewAppendable(),
|
||||
secondary: teststorage.NewAppendable().WithErrs(func(labels.Labels) error { return appErr }, exErr, commitErr),
|
||||
|
||||
expectAppendErr: true,
|
||||
expectExemplarError: true,
|
||||
expectCommitError: true,
|
||||
|
||||
expectPrimarySamples: expected,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestFanoutAppender(t *testing.T) {
|
||||
h := tsdbutil.GenerateTestHistogram(0)
|
||||
fh := tsdbutil.GenerateTestFloatHistogram(0)
|
||||
ex := exemplar.Exemplar{Value: 1}
|
||||
|
||||
expected := []sample{
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "metric1"), V: 1, ES: []exemplar.Exemplar{ex}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "metric2"), T: 1, H: h},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "metric3"), T: 2, FH: fh},
|
||||
}
|
||||
for _, tt := range fanoutAppenderTestCases(expected) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
f := storage.NewFanout(nil, mockStorage{app: tt.primary}, mockStorage{app: tt.secondary})
|
||||
|
||||
app := f.Appender(t.Context())
|
||||
ref, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric1"), 0, 1)
|
||||
if tt.expectAppendErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
_, err = app.AppendExemplar(ref, labels.FromStrings(model.MetricNameLabel, "metric1"), ex)
|
||||
if tt.expectExemplarError {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
_, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "metric2"), 1, h, nil)
|
||||
if tt.expectAppendErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
_, err = app.AppendHistogram(0, labels.FromStrings(model.MetricNameLabel, "metric3"), 2, nil, fh)
|
||||
if tt.expectAppendErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
err = app.Commit()
|
||||
if tt.expectCommitError {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.Nil(t, tt.primary.PendingSamples())
|
||||
testutil.RequireEqual(t, tt.expectPrimarySamples, tt.primary.ResultSamples())
|
||||
require.Nil(t, tt.primary.RolledbackSamples())
|
||||
|
||||
require.Nil(t, tt.secondary.PendingSamples())
|
||||
testutil.RequireEqual(t, tt.expectSecondarySamples, tt.secondary.ResultSamples())
|
||||
require.Nil(t, tt.secondary.RolledbackSamples())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFanoutAppenderV2(t *testing.T) {
|
||||
h := tsdbutil.GenerateTestHistogram(0)
|
||||
fh := tsdbutil.GenerateTestFloatHistogram(0)
|
||||
ex := exemplar.Exemplar{Value: 1}
|
||||
|
||||
expected := []sample{
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "metric1"), ST: -1, V: 1, ES: []exemplar.Exemplar{ex}},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "metric2"), ST: -2, T: 1, H: h},
|
||||
{L: labels.FromStrings(model.MetricNameLabel, "metric3"), ST: -3, T: 2, FH: fh},
|
||||
}
|
||||
|
||||
for _, tt := range fanoutAppenderTestCases(expected) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
f := storage.NewFanout(nil, mockStorage{appV2: tt.primary}, mockStorage{appV2: tt.secondary})
|
||||
|
||||
app := f.AppenderV2(t.Context())
|
||||
_, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric1"), -1, 0, 1, nil, nil, storage.AOptions{
|
||||
Exemplars: []exemplar.Exemplar{ex},
|
||||
})
|
||||
switch {
|
||||
case tt.expectAppendErr:
|
||||
require.Error(t, err)
|
||||
case tt.expectExemplarError:
|
||||
var pErr *storage.AppendPartialError
|
||||
require.ErrorAs(t, err, &pErr)
|
||||
// One for primary, one for secondary.
|
||||
// This is because in V2 flow we must append sample even when first append partially failed with exemplars.
|
||||
// Filtering out exemplars is neither feasible, nor important.
|
||||
require.Len(t, pErr.ExemplarErrors, 2)
|
||||
default:
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
_, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric2"), -2, 1, 0, h, nil, storage.AOptions{})
|
||||
if tt.expectAppendErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
_, err = app.Append(0, labels.FromStrings(model.MetricNameLabel, "metric3"), -3, 2, 0, nil, fh, storage.AOptions{})
|
||||
if tt.expectAppendErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
err = app.Commit()
|
||||
if tt.expectCommitError {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.Nil(t, tt.primary.PendingSamples())
|
||||
testutil.RequireEqual(t, tt.expectPrimarySamples, tt.primary.ResultSamples())
|
||||
require.Nil(t, tt.primary.RolledbackSamples())
|
||||
|
||||
require.Nil(t, tt.secondary.PendingSamples())
|
||||
testutil.RequireEqual(t, tt.expectSecondarySamples, tt.secondary.ResultSamples())
|
||||
require.Nil(t, tt.secondary.RolledbackSamples())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,7 +61,8 @@ type SeriesRef uint64
|
|||
|
||||
// Appendable allows creating Appender.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// Appendable will be removed soon (ETA: Q2 2026).
|
||||
type Appendable interface {
|
||||
// Appender returns a new appender for the storage.
|
||||
//
|
||||
|
|
@ -77,10 +78,16 @@ type SampleAndChunkQueryable interface {
|
|||
}
|
||||
|
||||
// Storage ingests and manages samples, along with various indexes. All methods
|
||||
// are goroutine-safe. Storage implements storage.Appender.
|
||||
// are goroutine-safe.
|
||||
type Storage interface {
|
||||
SampleAndChunkQueryable
|
||||
|
||||
// Appendable allows appending to storage.
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// Appendable will be removed soon (ETA: Q2 2026).
|
||||
Appendable
|
||||
// AppendableV2 allows appending to storage.
|
||||
AppendableV2
|
||||
|
||||
// StartTime returns the oldest timestamp stored in the storage.
|
||||
StartTime() (int64, error)
|
||||
|
|
@ -261,7 +268,8 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) {
|
|||
|
||||
// AppendOptions provides options for implementations of the Appender interface.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// AppendOptions will be removed soon (ETA: Q2 2026).
|
||||
type AppendOptions struct {
|
||||
// DiscardOutOfOrder tells implementation that this append should not be out
|
||||
// of order. An OOO append MUST be rejected with storage.ErrOutOfOrderSample
|
||||
|
|
@ -278,7 +286,8 @@ type AppendOptions struct {
|
|||
// I.e. timestamp order within batch is not validated, samples are not reordered per timestamp or by float/histogram
|
||||
// type.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// Appender will be removed soon (ETA: Q2 2026).
|
||||
type Appender interface {
|
||||
AppenderTransaction
|
||||
|
||||
|
|
@ -315,7 +324,8 @@ type GetRef interface {
|
|||
// ExemplarAppender provides an interface for adding samples to exemplar storage, which
|
||||
// within Prometheus is in-memory only.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// ExemplarAppender will be removed soon (ETA: Q2 2026).
|
||||
type ExemplarAppender interface {
|
||||
// AppendExemplar adds an exemplar for the given series labels.
|
||||
// An optional reference number can be provided to accelerate calls.
|
||||
|
|
@ -333,7 +343,8 @@ type ExemplarAppender interface {
|
|||
|
||||
// HistogramAppender provides an interface for appending histograms to the storage.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// HistogramAppender will be removed soon (ETA: Q2 2026).
|
||||
type HistogramAppender interface {
|
||||
// AppendHistogram adds a histogram for the given series labels. An
|
||||
// optional reference number can be provided to accelerate calls. A
|
||||
|
|
@ -365,7 +376,8 @@ type HistogramAppender interface {
|
|||
|
||||
// MetadataUpdater provides an interface for associating metadata to stored series.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// MetadataUpdater will be removed soon (ETA: Q2 2026).
|
||||
type MetadataUpdater interface {
|
||||
// UpdateMetadata updates a metadata entry for the given series and labels.
|
||||
// A series reference number is returned which can be used to modify the
|
||||
|
|
@ -379,7 +391,8 @@ type MetadataUpdater interface {
|
|||
|
||||
// StartTimestampAppender provides an interface for appending ST to storage.
|
||||
//
|
||||
// WARNING: Work AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
|
||||
// WARNING(bwplotka): Switch to AppendableV2 is in progress (https://github.com/prometheus/prometheus/issues/17632).
|
||||
// StartTimestampAppender will be removed soon (ETA: Q2 2026).
|
||||
type StartTimestampAppender interface {
|
||||
// AppendSTZeroSample adds synthetic zero sample for the given st timestamp,
|
||||
// which will be associated with given series, labels and the incoming
|
||||
|
|
|
|||
|
|
@ -69,6 +69,7 @@ type AppendV2Options struct {
|
|||
// Exemplars (optional) attached to the appended sample.
|
||||
// Exemplar slice MUST be sorted by Exemplar.TS.
|
||||
// Exemplar slice is unsafe for reuse.
|
||||
// Duplicate exemplars errors MUST be ignored by implementations.
|
||||
Exemplars []exemplar.Exemplar
|
||||
|
||||
// RejectOutOfOrder tells implementation that this append should not be out
|
||||
|
|
@ -96,6 +97,31 @@ func (e *AppendPartialError) Error() string {
|
|||
return errs.Error()
|
||||
}
|
||||
|
||||
// ErrOrNil returns AppendPartialError as error, returning nil
|
||||
// if there are no errors.
|
||||
func (e *AppendPartialError) ErrOrNil() error {
|
||||
if len(e.ExemplarErrors) == 0 {
|
||||
return nil
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
// Handle handles the given err that may be an AppendPartialError.
|
||||
// If the err is nil or not an AppendPartialError it returns err.
|
||||
// Otherwise, partial errors are aggregated.
|
||||
func (e *AppendPartialError) Handle(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var pErr *AppendPartialError
|
||||
if !errors.As(err, &pErr) {
|
||||
return err
|
||||
}
|
||||
e.ExemplarErrors = append(e.ExemplarErrors, pErr.ExemplarErrors...)
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ error = &AppendPartialError{}
|
||||
|
||||
// AppenderV2 provides appends against a storage for all types of samples.
|
||||
|
|
|
|||
|
|
@ -63,6 +63,8 @@ type Storage struct {
|
|||
localStartTimeCallback startTimeCallback
|
||||
}
|
||||
|
||||
var _ storage.Storage = &Storage{}
|
||||
|
||||
// NewStorage returns a remote.Storage.
|
||||
func NewStorage(l *slog.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, enableTypeAndUnitLabels bool) *Storage {
|
||||
if l == nil {
|
||||
|
|
@ -193,6 +195,11 @@ func (s *Storage) Appender(ctx context.Context) storage.Appender {
|
|||
return s.rws.Appender(ctx)
|
||||
}
|
||||
|
||||
// AppenderV2 implements storage.Storage.
|
||||
func (s *Storage) AppenderV2(ctx context.Context) storage.AppenderV2 {
|
||||
return s.rws.AppenderV2(ctx)
|
||||
}
|
||||
|
||||
// LowestSentTimestamp returns the lowest sent timestamp across all queues.
|
||||
func (s *Storage) LowestSentTimestamp() int64 {
|
||||
return s.rws.LowestSentTimestamp()
|
||||
|
|
|
|||
|
|
@ -238,8 +238,20 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
|
|||
// Appender implements storage.Storage.
|
||||
func (rws *WriteStorage) Appender(context.Context) storage.Appender {
|
||||
return ×tampTracker{
|
||||
writeStorage: rws,
|
||||
highestRecvTimestamp: rws.highestTimestamp,
|
||||
baseTimestampTracker: baseTimestampTracker{
|
||||
writeStorage: rws,
|
||||
highestRecvTimestamp: rws.highestTimestamp,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// AppenderV2 implements storage.Storage.
|
||||
func (rws *WriteStorage) AppenderV2(context.Context) storage.AppenderV2 {
|
||||
return ×tampTrackerV2{
|
||||
baseTimestampTracker: baseTimestampTracker{
|
||||
writeStorage: rws,
|
||||
highestRecvTimestamp: rws.highestTimestamp,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -282,9 +294,9 @@ func (rws *WriteStorage) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type timestampTracker struct {
|
||||
writeStorage *WriteStorage
|
||||
appendOptions *storage.AppendOptions
|
||||
type baseTimestampTracker struct {
|
||||
writeStorage *WriteStorage
|
||||
|
||||
samples int64
|
||||
exemplars int64
|
||||
histograms int64
|
||||
|
|
@ -292,6 +304,12 @@ type timestampTracker struct {
|
|||
highestRecvTimestamp *maxTimestamp
|
||||
}
|
||||
|
||||
type timestampTracker struct {
|
||||
baseTimestampTracker
|
||||
|
||||
appendOptions *storage.AppendOptions
|
||||
}
|
||||
|
||||
func (t *timestampTracker) SetOptions(opts *storage.AppendOptions) {
|
||||
t.appendOptions = opts
|
||||
}
|
||||
|
|
@ -345,7 +363,7 @@ func (*timestampTracker) UpdateMetadata(storage.SeriesRef, labels.Labels, metada
|
|||
}
|
||||
|
||||
// Commit implements storage.Appender.
|
||||
func (t *timestampTracker) Commit() error {
|
||||
func (t *baseTimestampTracker) Commit() error {
|
||||
t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms)
|
||||
|
||||
samplesIn.Add(float64(t.samples))
|
||||
|
|
@ -356,6 +374,25 @@ func (t *timestampTracker) Commit() error {
|
|||
}
|
||||
|
||||
// Rollback implements storage.Appender.
|
||||
func (*timestampTracker) Rollback() error {
|
||||
func (*baseTimestampTracker) Rollback() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type timestampTrackerV2 struct {
|
||||
baseTimestampTracker
|
||||
}
|
||||
|
||||
// Append implements storage.AppenderV2.
|
||||
func (t *timestampTrackerV2) Append(ref storage.SeriesRef, _ labels.Labels, _, ts int64, _ float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
|
||||
switch {
|
||||
case fh != nil, h != nil:
|
||||
t.histograms++
|
||||
default:
|
||||
t.samples++
|
||||
}
|
||||
if ts > t.highestTimestamp {
|
||||
t.highestTimestamp = ts
|
||||
}
|
||||
t.exemplars += int64(len(opts.Exemplars))
|
||||
return ref, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -323,6 +323,7 @@ func (a *headAppenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemp
|
|||
if err := a.head.exemplars.ValidateExemplar(s.labels(), e); err != nil {
|
||||
if !errors.Is(err, storage.ErrDuplicateExemplar) && !errors.Is(err, storage.ErrExemplarsDisabled) {
|
||||
// Except duplicates, return partial errors.
|
||||
// TODO(bwplotka): Add exemplar info into error.
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
|
|
|
|||
|
|
@ -374,7 +374,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
|
|||
// with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632
|
||||
i := len(a.a.pendingSamples) - 1
|
||||
for ; i >= 0; i-- { // Attach exemplars to the last matching sample.
|
||||
if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) {
|
||||
if labels.Equal(l, a.a.pendingSamples[i].L) {
|
||||
a.a.pendingSamples[i].ES = append(a.a.pendingSamples[i].ES, e)
|
||||
appended = true
|
||||
break
|
||||
|
|
@ -415,7 +415,7 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta
|
|||
// with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632
|
||||
i := len(a.a.pendingSamples) - 1
|
||||
for ; i >= 0; i-- { // Attach metadata to the last matching sample.
|
||||
if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) {
|
||||
if labels.Equal(l, a.a.pendingSamples[i].L) {
|
||||
a.a.pendingSamples[i].M = m
|
||||
updated = true
|
||||
break
|
||||
|
|
@ -464,13 +464,28 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64
|
|||
}
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
var es []exemplar.Exemplar
|
||||
var (
|
||||
es []exemplar.Exemplar
|
||||
partialErr error
|
||||
)
|
||||
|
||||
if len(opts.Exemplars) > 0 {
|
||||
// As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse.
|
||||
es = make([]exemplar.Exemplar, len(opts.Exemplars))
|
||||
copy(es, opts.Exemplars)
|
||||
if a.a.appendExemplarsError != nil {
|
||||
var exErrs []error
|
||||
for range opts.Exemplars {
|
||||
exErrs = append(exErrs, a.a.appendExemplarsError)
|
||||
}
|
||||
if len(exErrs) > 0 {
|
||||
partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs}
|
||||
}
|
||||
} else {
|
||||
// As per AppenderV2 interface, opts.Exemplar slice is unsafe for reuse.
|
||||
es = make([]exemplar.Exemplar, len(opts.Exemplars))
|
||||
copy(es, opts.Exemplars)
|
||||
}
|
||||
}
|
||||
|
||||
a.a.mtx.Lock()
|
||||
a.a.pendingSamples = append(a.a.pendingSamples, Sample{
|
||||
MF: opts.MetricFamilyName,
|
||||
M: opts.Metadata,
|
||||
|
|
@ -481,17 +496,6 @@ func (a *appenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64
|
|||
})
|
||||
a.a.mtx.Unlock()
|
||||
|
||||
var partialErr error
|
||||
if a.a.appendExemplarsError != nil {
|
||||
var exErrs []error
|
||||
for range opts.Exemplars {
|
||||
exErrs = append(exErrs, a.a.appendExemplarsError)
|
||||
}
|
||||
if len(exErrs) > 0 {
|
||||
partialErr = &storage.AppendPartialError{ExemplarErrors: exErrs}
|
||||
}
|
||||
}
|
||||
|
||||
if a.next != nil {
|
||||
ref, err = a.next.Append(ref, ls, st, t, v, h, fh, opts)
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Reference in a new issue