diff --git a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go deleted file mode 100644 index 883b8d3142..0000000000 --- a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go +++ /dev/null @@ -1,244 +0,0 @@ -// Copyright The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// TODO(krajorama): rename this package to otlpappender or similar, as it is -// not specific to Prometheus remote write anymore. -// Note otlptranslator is already used by prometheus/otlptranslator repo. -package prometheusremotewrite - -import ( - "errors" - "fmt" - "log/slog" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/metadata" - "github.com/prometheus/prometheus/storage" -) - -// Metadata extends metadata.Metadata with the metric family name. -// OTLP calculates the metric family name for all metrics and uses -// it for generating summary, histogram series by adding the magic -// suffixes. The metric family name is passed down to the appender -// in case the storage needs it for metadata updates. -// Known user is Mimir that implements /api/v1/metadata and uses -// Remote-Write 1.0 for this. Might be removed later if no longer -// needed by any downstream project. -type Metadata struct { - metadata.Metadata - MetricFamilyName string -} - -// CombinedAppender is similar to storage.Appender, but combines updates to -// metadata, created timestamps, exemplars and samples into a single call. -type CombinedAppender interface { - // AppendSample appends a sample and related exemplars, metadata, and - // created timestamp to the storage. - AppendSample(ls labels.Labels, meta Metadata, st, t int64, v float64, es []exemplar.Exemplar) error - // AppendHistogram appends a histogram and related exemplars, metadata, and - // created timestamp to the storage. - AppendHistogram(ls labels.Labels, meta Metadata, st, t int64, h *histogram.Histogram, es []exemplar.Exemplar) error -} - -// CombinedAppenderMetrics is for the metrics observed by the -// combinedAppender implementation. -type CombinedAppenderMetrics struct { - samplesAppendedWithoutMetadata prometheus.Counter - outOfOrderExemplars prometheus.Counter -} - -func NewCombinedAppenderMetrics(reg prometheus.Registerer) CombinedAppenderMetrics { - return CombinedAppenderMetrics{ - samplesAppendedWithoutMetadata: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Namespace: "prometheus", - Subsystem: "api", - Name: "otlp_appended_samples_without_metadata_total", - Help: "The total number of samples ingested from OTLP without corresponding metadata.", - }), - outOfOrderExemplars: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Namespace: "prometheus", - Subsystem: "api", - Name: "otlp_out_of_order_exemplars_total", - Help: "The total number of received OTLP exemplars which were rejected because they were out of order.", - }), - } -} - -// NewCombinedAppender creates a combined appender that sets start times and -// updates metadata for each series only once, and appends samples and -// exemplars for each call. -func NewCombinedAppender(app storage.Appender, logger *slog.Logger, ingestSTZeroSample, appendMetadata bool, metrics CombinedAppenderMetrics) CombinedAppender { - return &combinedAppender{ - app: app, - logger: logger, - ingestSTZeroSample: ingestSTZeroSample, - appendMetadata: appendMetadata, - refs: make(map[uint64]seriesRef), - samplesAppendedWithoutMetadata: metrics.samplesAppendedWithoutMetadata, - outOfOrderExemplars: metrics.outOfOrderExemplars, - } -} - -type seriesRef struct { - ref storage.SeriesRef - st int64 - ls labels.Labels - meta metadata.Metadata -} - -type combinedAppender struct { - app storage.Appender - logger *slog.Logger - samplesAppendedWithoutMetadata prometheus.Counter - outOfOrderExemplars prometheus.Counter - ingestSTZeroSample bool - appendMetadata bool - // Used to ensure we only update metadata and created timestamps once, and to share storage.SeriesRefs. - // To detect hash collision it also stores the labels. - // There is no overflow/conflict list, the TSDB will handle that part. - refs map[uint64]seriesRef -} - -func (b *combinedAppender) AppendSample(ls labels.Labels, meta Metadata, st, t int64, v float64, es []exemplar.Exemplar) (err error) { - return b.appendFloatOrHistogram(ls, meta.Metadata, st, t, v, nil, es) -} - -func (b *combinedAppender) AppendHistogram(ls labels.Labels, meta Metadata, st, t int64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) { - if h == nil { - // Sanity check, we should never get here with a nil histogram. - b.logger.Error("Received nil histogram in CombinedAppender.AppendHistogram", "series", ls.String()) - return errors.New("internal error, attempted to append nil histogram") - } - return b.appendFloatOrHistogram(ls, meta.Metadata, st, t, 0, h, es) -} - -func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadata.Metadata, st, t int64, v float64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) { - hash := ls.Hash() - series, exists := b.refs[hash] - ref := series.ref - if exists && !labels.Equal(series.ls, ls) { - // Hash collision. The series reference we stored is pointing to a - // different series so we cannot use it, we need to reset the - // reference and cache. - // Note: we don't need to keep track of conflicts here, - // the TSDB will handle that part when we pass 0 reference. - exists = false - ref = 0 - } - updateRefs := !exists || series.st != st - if updateRefs && st != 0 && st < t && b.ingestSTZeroSample { - var newRef storage.SeriesRef - if h != nil { - newRef, err = b.app.AppendHistogramSTZeroSample(ref, ls, t, st, h, nil) - } else { - newRef, err = b.app.AppendSTZeroSample(ref, ls, t, st) - } - if err != nil { - if !errors.Is(err, storage.ErrOutOfOrderST) && !errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { - // Even for the first sample OOO is a common scenario because - // we can't tell if a ST was already ingested in a previous request. - // We ignore the error. - // ErrDuplicateSampleForTimestamp is also a common scenario because - // unknown start times in Opentelemetry are indicated by setting - // the start time to the same as the first sample time. - // https://opentelemetry.io/docs/specs/otel/metrics/data-model/#cumulative-streams-handling-unknown-start-time - b.logger.Warn("Error when appending ST from OTLP", "err", err, "series", ls.String(), "start_timestamp", st, "timestamp", t, "sample_type", sampleType(h)) - } - } else { - // We only use the returned reference on success as otherwise an - // error of ST append could invalidate the series reference. - ref = newRef - } - } - { - var newRef storage.SeriesRef - if h != nil { - newRef, err = b.app.AppendHistogram(ref, ls, t, h, nil) - } else { - newRef, err = b.app.Append(ref, ls, t, v) - } - if err != nil { - // Although Append does not currently return ErrDuplicateSampleForTimestamp there is - // a note indicating its inclusion in the future. - if errors.Is(err, storage.ErrOutOfOrderSample) || - errors.Is(err, storage.ErrOutOfBounds) || - errors.Is(err, storage.ErrDuplicateSampleForTimestamp) { - b.logger.Error("Error when appending sample from OTLP", "err", err.Error(), "series", ls.String(), "timestamp", t, "sample_type", sampleType(h)) - } - } else { - // If the append was successful, we can use the returned reference. - ref = newRef - } - } - - if ref == 0 { - // We cannot update metadata or add exemplars on non existent series. - return err - } - - metadataChanged := exists && (series.meta.Help != meta.Help || series.meta.Type != meta.Type || series.meta.Unit != meta.Unit) - - // Update cache if references changed or metadata changed. - if updateRefs || metadataChanged { - b.refs[hash] = seriesRef{ - ref: ref, - st: st, - ls: ls, - meta: meta, - } - } - - // Update metadata in storage if enabled and needed. - if b.appendMetadata && (!exists || metadataChanged) { - // Only update metadata in WAL if the metadata-wal-records feature is enabled. - // Without this feature, metadata is not persisted to WAL. - _, err := b.app.UpdateMetadata(ref, ls, meta) - if err != nil { - b.samplesAppendedWithoutMetadata.Add(1) - b.logger.Warn("Error while updating metadata from OTLP", "err", err) - } - } - - b.appendExemplars(ref, ls, es) - - return err -} - -func sampleType(h *histogram.Histogram) string { - if h == nil { - return "float" - } - return "histogram" -} - -func (b *combinedAppender) appendExemplars(ref storage.SeriesRef, ls labels.Labels, es []exemplar.Exemplar) storage.SeriesRef { - var err error - for _, e := range es { - if ref, err = b.app.AppendExemplar(ref, ls, e); err != nil { - switch { - case errors.Is(err, storage.ErrOutOfOrderExemplar): - b.outOfOrderExemplars.Add(1) - b.logger.Debug("Out of order exemplar from OTLP", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e)) - default: - // Since exemplar storage is still experimental, we don't fail the request on ingestion errors - b.logger.Debug("Error while adding exemplar from OTLP", "series", ls.String(), "exemplar", fmt.Sprintf("%+v", e), "err", err) - } - } - } - return ref -} diff --git a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go index a1a17fe82b..69d11ed6bd 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender_test.go @@ -14,31 +14,22 @@ package prometheusremotewrite import ( - "bytes" - "context" "errors" - "fmt" - "math" "testing" - "time" "github.com/google/go-cmp/cmp" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/prometheus/common/promslog" - "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/testutil" ) +// TODO(bwplotka): Move to teststorage.Appendable. This require slight refactor of tests and I couldn't do this before +// switching to AppenderV2 (I would need to adjust AppenderV1 mock exemplar flow which is pointless since we don't plan +// to use it). For now keeping tests diff small for confidence. type mockCombinedAppender struct { pendingSamples []combinedSample pendingHistograms []combinedHistogram @@ -67,30 +58,29 @@ type combinedHistogram struct { es []exemplar.Exemplar } -func (m *mockCombinedAppender) AppendSample(ls labels.Labels, meta Metadata, st, t int64, v float64, es []exemplar.Exemplar) error { +func (m *mockCombinedAppender) Append(_ storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, _ *histogram.FloatHistogram, opts storage.AOptions) (_ storage.SeriesRef, err error) { + if h != nil { + m.pendingHistograms = append(m.pendingHistograms, combinedHistogram{ + metricFamilyName: opts.MetricFamilyName, + ls: ls, + meta: opts.Metadata, + t: t, + st: st, + h: h, + es: opts.Exemplars, + }) + return 0, nil + } m.pendingSamples = append(m.pendingSamples, combinedSample{ - metricFamilyName: meta.MetricFamilyName, + metricFamilyName: opts.MetricFamilyName, ls: ls, - meta: meta.Metadata, + meta: opts.Metadata, t: t, st: st, v: v, - es: es, + es: opts.Exemplars, }) - return nil -} - -func (m *mockCombinedAppender) AppendHistogram(ls labels.Labels, meta Metadata, st, t int64, h *histogram.Histogram, es []exemplar.Exemplar) error { - m.pendingHistograms = append(m.pendingHistograms, combinedHistogram{ - metricFamilyName: meta.MetricFamilyName, - ls: ls, - meta: meta.Metadata, - t: t, - st: st, - h: h, - es: es, - }) - return nil + return 0, nil } func (m *mockCombinedAppender) Commit() error { @@ -101,837 +91,10 @@ func (m *mockCombinedAppender) Commit() error { return nil } +func (*mockCombinedAppender) Rollback() error { + return errors.New("not implemented") +} + func requireEqual(t testing.TB, expected, actual any, msgAndArgs ...any) { testutil.RequireEqualWithOptions(t, expected, actual, []cmp.Option{cmp.AllowUnexported(combinedSample{}, combinedHistogram{})}, msgAndArgs...) } - -// TestCombinedAppenderOnTSDB runs some basic tests on a real TSDB to check -// that the combinedAppender works on a real TSDB. -func TestCombinedAppenderOnTSDB(t *testing.T) { - t.Run("ingestSTZeroSample=false", func(t *testing.T) { testCombinedAppenderOnTSDB(t, false) }) - - t.Run("ingestSTZeroSample=true", func(t *testing.T) { testCombinedAppenderOnTSDB(t, true) }) -} - -func testCombinedAppenderOnTSDB(t *testing.T, ingestSTZeroSample bool) { - t.Helper() - - now := time.Now() - - testExemplars := []exemplar.Exemplar{ - { - Labels: labels.FromStrings("tracid", "122"), - Value: 1337, - }, - { - Labels: labels.FromStrings("tracid", "132"), - Value: 7777, - }, - } - expectedExemplars := []exemplar.QueryResult{ - { - SeriesLabels: labels.FromStrings( - model.MetricNameLabel, "test_bytes_total", - "foo", "bar", - ), - Exemplars: testExemplars, - }, - } - - seriesLabels := labels.FromStrings( - model.MetricNameLabel, "test_bytes_total", - "foo", "bar", - ) - floatMetadata := Metadata{ - Metadata: metadata.Metadata{ - Type: model.MetricTypeCounter, - Unit: "bytes", - Help: "some help", - }, - MetricFamilyName: "test_bytes_total", - } - - histogramMetadata := Metadata{ - Metadata: metadata.Metadata{ - Type: model.MetricTypeHistogram, - Unit: "bytes", - Help: "some help", - }, - MetricFamilyName: "test_bytes", - } - - testCases := map[string]struct { - appendFunc func(*testing.T, CombinedAppender) - extraAppendFunc func(*testing.T, CombinedAppender) - expectedSamples []sample - expectedExemplars []exemplar.QueryResult - expectedLogsForST []string - }{ - "single float sample, zero ST": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, 0, now.UnixMilli(), 42.0, testExemplars)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - f: 42.0, - }, - }, - expectedExemplars: expectedExemplars, - }, - "single float sample, very old ST": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, 1, now.UnixMilli(), 42.0, nil)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - f: 42.0, - }, - }, - expectedLogsForST: []string{ - "Error when appending ST from OTLP", - "out of bound", - }, - }, - "single float sample, normal ST": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.Add(-2*time.Minute).UnixMilli(), now.UnixMilli(), 42.0, nil)) - }, - expectedSamples: []sample{ - { - stZero: true, - t: now.Add(-2 * time.Minute).UnixMilli(), - }, - { - t: now.UnixMilli(), - f: 42.0, - }, - }, - }, - "single float sample, ST same time as sample": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.UnixMilli(), 42.0, nil)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - f: 42.0, - }, - }, - }, - "two float samples in different messages, ST same time as first sample": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.UnixMilli(), 42.0, nil)) - }, - extraAppendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.Add(time.Second).UnixMilli(), 43.0, nil)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - f: 42.0, - }, - { - t: now.Add(time.Second).UnixMilli(), - f: 43.0, - }, - }, - }, - "single float sample, ST in the future of the sample": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), 42.0, nil)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - f: 42.0, - }, - }, - }, - "single histogram sample, zero ST": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, 0, now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), testExemplars)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - h: tsdbutil.GenerateTestHistogram(42), - }, - }, - expectedExemplars: expectedExemplars, - }, - "single histogram sample, very old ST": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, 1, now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - h: tsdbutil.GenerateTestHistogram(42), - }, - }, - expectedLogsForST: []string{ - "Error when appending ST from OTLP", - "out of bound", - }, - }, - "single histogram sample, normal ST": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, now.Add(-2*time.Minute).UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)) - }, - expectedSamples: []sample{ - { - stZero: true, - t: now.Add(-2 * time.Minute).UnixMilli(), - h: &histogram.Histogram{}, - }, - { - t: now.UnixMilli(), - h: tsdbutil.GenerateTestHistogram(42), - }, - }, - }, - "single histogram sample, ST same time as sample": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, now.UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - h: tsdbutil.GenerateTestHistogram(42), - }, - }, - }, - "two histogram samples in different messages, ST same time as first sample": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)) - }, - extraAppendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.Add(time.Second).UnixMilli(), tsdbutil.GenerateTestHistogram(43), nil)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - h: tsdbutil.GenerateTestHistogram(42), - }, - { - t: now.Add(time.Second).UnixMilli(), - h: tsdbutil.GenerateTestHistogram(43), - }, - }, - }, - "single histogram sample, ST in the future of the sample": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - h: tsdbutil.GenerateTestHistogram(42), - }, - }, - }, - "multiple float samples": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, 0, now.UnixMilli(), 42.0, nil)) - require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, 0, now.Add(15*time.Second).UnixMilli(), 62.0, nil)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - f: 42.0, - }, - { - t: now.Add(15 * time.Second).UnixMilli(), - f: 62.0, - }, - }, - }, - "multiple histogram samples": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, 0, now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil)) - require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, 0, now.Add(15*time.Second).UnixMilli(), tsdbutil.GenerateTestHistogram(62), nil)) - }, - expectedSamples: []sample{ - { - t: now.UnixMilli(), - h: tsdbutil.GenerateTestHistogram(42), - }, - { - t: now.Add(15 * time.Second).UnixMilli(), - h: tsdbutil.GenerateTestHistogram(62), - }, - }, - }, - "float samples with ST changing": { - appendFunc: func(t *testing.T, app CombinedAppender) { - require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.Add(-4*time.Second).UnixMilli(), now.Add(-3*time.Second).UnixMilli(), 42.0, nil)) - require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.Add(-1*time.Second).UnixMilli(), now.UnixMilli(), 62.0, nil)) - }, - expectedSamples: []sample{ - { - stZero: true, - t: now.Add(-4 * time.Second).UnixMilli(), - }, - { - t: now.Add(-3 * time.Second).UnixMilli(), - f: 42.0, - }, - { - stZero: true, - t: now.Add(-1 * time.Second).UnixMilli(), - }, - { - t: now.UnixMilli(), - f: 62.0, - }, - }, - }, - } - - for name, tc := range testCases { - t.Run(name, func(t *testing.T) { - var expectedLogs []string - if ingestSTZeroSample { - expectedLogs = append(expectedLogs, tc.expectedLogsForST...) - } - - dir := t.TempDir() - opts := tsdb.DefaultOptions() - opts.EnableExemplarStorage = true - opts.MaxExemplars = 100 - db, err := tsdb.Open(dir, promslog.NewNopLogger(), prometheus.NewRegistry(), opts, nil) - require.NoError(t, err) - - t.Cleanup(func() { db.Close() }) - - var output bytes.Buffer - logger := promslog.New(&promslog.Config{Writer: &output}) - - ctx := context.Background() - reg := prometheus.NewRegistry() - cappMetrics := NewCombinedAppenderMetrics(reg) - app := db.Appender(ctx) - capp := NewCombinedAppender(app, logger, ingestSTZeroSample, false, cappMetrics) - tc.appendFunc(t, capp) - require.NoError(t, app.Commit()) - - if tc.extraAppendFunc != nil { - app = db.Appender(ctx) - capp = NewCombinedAppender(app, logger, ingestSTZeroSample, false, cappMetrics) - tc.extraAppendFunc(t, capp) - require.NoError(t, app.Commit()) - } - - if len(expectedLogs) > 0 { - for _, expectedLog := range expectedLogs { - require.Contains(t, output.String(), expectedLog) - } - } else { - require.Empty(t, output.String(), "unexpected log output") - } - - q, err := db.Querier(int64(math.MinInt64), int64(math.MaxInt64)) - require.NoError(t, err) - - ss := q.Select(ctx, false, &storage.SelectHints{ - Start: int64(math.MinInt64), - End: int64(math.MaxInt64), - }, labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_bytes_total")) - - require.NoError(t, ss.Err()) - - require.True(t, ss.Next()) - series := ss.At() - it := series.Iterator(nil) - for i, sample := range tc.expectedSamples { - if !ingestSTZeroSample && sample.stZero { - continue - } - if sample.h == nil { - require.Equal(t, chunkenc.ValFloat, it.Next()) - ts, v := it.At() - require.Equal(t, sample.t, ts, "sample ts %d", i) - require.Equal(t, sample.f, v, "sample v %d", i) - } else { - require.Equal(t, chunkenc.ValHistogram, it.Next()) - ts, h := it.AtHistogram(nil) - require.Equal(t, sample.t, ts, "sample ts %d", i) - require.Equal(t, sample.h.Count, h.Count, "sample v %d", i) - } - } - require.False(t, ss.Next()) - - eq, err := db.ExemplarQuerier(ctx) - require.NoError(t, err) - exResult, err := eq.Select(int64(math.MinInt64), int64(math.MaxInt64), []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_bytes_total")}) - require.NoError(t, err) - if tc.expectedExemplars == nil { - tc.expectedExemplars = []exemplar.QueryResult{} - } - require.Equal(t, tc.expectedExemplars, exResult) - }) - } -} - -type sample struct { - stZero bool - - t int64 - f float64 - h *histogram.Histogram -} - -// TestCombinedAppenderSeriesRefs checks that the combined appender -// correctly uses and updates the series references in the internal map. -func TestCombinedAppenderSeriesRefs(t *testing.T) { - seriesLabels := labels.FromStrings( - model.MetricNameLabel, "test_bytes_total", - "foo", "bar", - ) - - floatMetadata := Metadata{ - Metadata: metadata.Metadata{ - Type: model.MetricTypeCounter, - Unit: "bytes", - Help: "some help", - }, - MetricFamilyName: "test_bytes_total", - } - - t.Run("happy case with ST zero, reference is passed and reused", func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil)) - - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 3, 4, 62.0, []exemplar.Exemplar{ - { - Labels: labels.FromStrings("tracid", "122"), - Value: 1337, - }, - })) - - require.Len(t, app.records, 5) - requireEqualOpAndRef(t, "AppendSTZeroSample", 0, app.records[0]) - ref := app.records[0].outRef - require.NotZero(t, ref) - requireEqualOpAndRef(t, "Append", ref, app.records[1]) - requireEqualOpAndRef(t, "AppendSTZeroSample", ref, app.records[2]) - requireEqualOpAndRef(t, "Append", ref, app.records[3]) - requireEqualOpAndRef(t, "AppendExemplar", ref, app.records[4]) - }) - - t.Run("error on second ST ingest doesn't update the reference", func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil)) - - app.appendSTZeroSampleError = errors.New("test error") - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 3, 4, 62.0, nil)) - - require.Len(t, app.records, 4) - requireEqualOpAndRef(t, "AppendSTZeroSample", 0, app.records[0]) - ref := app.records[0].outRef - require.NotZero(t, ref) - requireEqualOpAndRef(t, "Append", ref, app.records[1]) - requireEqualOpAndRef(t, "AppendSTZeroSample", ref, app.records[2]) - require.Zero(t, app.records[2].outRef, "the second AppendSTZeroSample returned 0") - requireEqualOpAndRef(t, "Append", ref, app.records[3]) - }) - - t.Run("metadata, exemplars are not updated if append failed", func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - app.appendError = errors.New("test error") - require.Error(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 0, 1, 42.0, []exemplar.Exemplar{ - { - Labels: labels.FromStrings("tracid", "122"), - Value: 1337, - }, - })) - - require.Len(t, app.records, 1) - require.Equal(t, appenderRecord{ - op: "Append", - ls: labels.FromStrings(model.MetricNameLabel, "test_bytes_total", "foo", "bar"), - }, app.records[0]) - }) - - t.Run("metadata, exemplars are updated if append failed but reference is valid", func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - - newMetadata := floatMetadata - newMetadata.Help = "some other help" - - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil)) - app.appendError = errors.New("test error") - require.Error(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 4, 62.0, []exemplar.Exemplar{ - { - Labels: labels.FromStrings("tracid", "122"), - Value: 1337, - }, - })) - - require.Len(t, app.records, 7) - requireEqualOpAndRef(t, "AppendSTZeroSample", 0, app.records[0]) - ref := app.records[0].outRef - require.NotZero(t, ref) - requireEqualOpAndRef(t, "Append", ref, app.records[1]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2]) - requireEqualOpAndRef(t, "AppendSTZeroSample", ref, app.records[3]) - requireEqualOpAndRef(t, "Append", ref, app.records[4]) - require.Zero(t, app.records[4].outRef, "the second Append returned 0") - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[5]) - requireEqualOpAndRef(t, "AppendExemplar", ref, app.records[6]) - }) - - t.Run("simulate conflict with existing series", func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - - ls := labels.FromStrings( - model.MetricNameLabel, "test_bytes_total", - "foo", "bar", - ) - - require.NoError(t, capp.AppendSample(ls, floatMetadata, 1, 2, 42.0, nil)) - - hash := ls.Hash() - cappImpl := capp.(*combinedAppender) - series := cappImpl.refs[hash] - series.ls = labels.FromStrings( - model.MetricNameLabel, "test_bytes_total", - "foo", "club", - ) - // The hash and ref remain the same, but we altered the labels. - // This simulates a conflict with an existing series. - cappImpl.refs[hash] = series - - require.NoError(t, capp.AppendSample(ls, floatMetadata, 3, 4, 62.0, []exemplar.Exemplar{ - { - Labels: labels.FromStrings("tracid", "122"), - Value: 1337, - }, - })) - - require.Len(t, app.records, 5) - requireEqualOpAndRef(t, "AppendSTZeroSample", 0, app.records[0]) - ref := app.records[0].outRef - require.NotZero(t, ref) - requireEqualOpAndRef(t, "Append", ref, app.records[1]) - requireEqualOpAndRef(t, "AppendSTZeroSample", 0, app.records[2]) - newRef := app.records[2].outRef - require.NotEqual(t, ref, newRef, "the second AppendSTZeroSample returned a different reference") - requireEqualOpAndRef(t, "Append", newRef, app.records[3]) - requireEqualOpAndRef(t, "AppendExemplar", newRef, app.records[4]) - }) - - t.Run("check that invoking AppendHistogram returns an error for nil histogram", func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, false, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - - ls := labels.FromStrings( - model.MetricNameLabel, "test_bytes_total", - "foo", "bar", - ) - err := capp.AppendHistogram(ls, Metadata{}, 4, 2, nil, nil) - require.Error(t, err) - }) - - for _, appendMetadata := range []bool{false, true} { - t.Run(fmt.Sprintf("appendMetadata=%t", appendMetadata), func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, appendMetadata, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), floatMetadata, 1, 2, 42.0, nil)) - - if appendMetadata { - require.Len(t, app.records, 3) - requireEqualOp(t, "AppendSTZeroSample", app.records[0]) - requireEqualOp(t, "Append", app.records[1]) - requireEqualOp(t, "UpdateMetadata", app.records[2]) - } else { - require.Len(t, app.records, 2) - requireEqualOp(t, "AppendSTZeroSample", app.records[0]) - requireEqualOp(t, "Append", app.records[1]) - } - }) - } -} - -// TestCombinedAppenderMetadataChanges verifies that UpdateMetadata is called -// when metadata fields change (help, unit, or type). -func TestCombinedAppenderMetadataChanges(t *testing.T) { - seriesLabels := labels.FromStrings( - model.MetricNameLabel, "test_metric", - "foo", "bar", - ) - - baseMetadata := Metadata{ - Metadata: metadata.Metadata{ - Type: model.MetricTypeCounter, - Unit: "bytes", - Help: "original help", - }, - MetricFamilyName: "test_metric", - } - - tests := []struct { - name string - modifyMetadata func(Metadata) Metadata - }{ - { - name: "help changes", - modifyMetadata: func(m Metadata) Metadata { - m.Help = "new help text" - return m - }, - }, - { - name: "unit changes", - modifyMetadata: func(m Metadata) Metadata { - m.Unit = "seconds" - return m - }, - }, - { - name: "type changes", - modifyMetadata: func(m Metadata) Metadata { - m.Type = model.MetricTypeGauge - return m - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, true, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - - newMetadata := tt.modifyMetadata(baseMetadata) - - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), baseMetadata, 1, 2, 42.0, nil)) - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 4, 62.0, nil)) - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), newMetadata, 3, 5, 162.0, nil)) - - // Verify expected operations. - require.Len(t, app.records, 7) - requireEqualOpAndRef(t, "AppendSTZeroSample", 0, app.records[0]) - ref := app.records[0].outRef - require.NotZero(t, ref) - requireEqualOpAndRef(t, "Append", ref, app.records[1]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[2]) - requireEqualOpAndRef(t, "AppendSTZeroSample", ref, app.records[3]) - requireEqualOpAndRef(t, "Append", ref, app.records[4]) - requireEqualOpAndRef(t, "UpdateMetadata", ref, app.records[5]) - requireEqualOpAndRef(t, "Append", ref, app.records[6]) - }) - } -} - -func requireEqualOp(t *testing.T, expectedOp string, actual appenderRecord) { - t.Helper() - require.Equal(t, expectedOp, actual.op) -} - -func requireEqualOpAndRef(t *testing.T, expectedOp string, expectedRef storage.SeriesRef, actual appenderRecord) { - t.Helper() - require.Equal(t, expectedOp, actual.op) - require.Equal(t, expectedRef, actual.ref) -} - -type appenderRecord struct { - op string - ref storage.SeriesRef - outRef storage.SeriesRef - ls labels.Labels -} - -type appenderRecorder struct { - refcount uint64 - records []appenderRecord - - appendError error - appendSTZeroSampleError error - appendHistogramError error - appendHistogramSTZeroSampleError error - updateMetadataError error - appendExemplarError error -} - -var _ storage.Appender = &appenderRecorder{} - -func (a *appenderRecorder) setOutRef(ref storage.SeriesRef) { - if len(a.records) == 0 { - return - } - a.records[len(a.records)-1].outRef = ref -} - -func (a *appenderRecorder) newRef() storage.SeriesRef { - a.refcount++ - return storage.SeriesRef(a.refcount) -} - -func (a *appenderRecorder) Append(ref storage.SeriesRef, ls labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { - a.records = append(a.records, appenderRecord{op: "Append", ref: ref, ls: ls}) - if a.appendError != nil { - return 0, a.appendError - } - if ref == 0 { - ref = a.newRef() - } - a.setOutRef(ref) - return ref, nil -} - -func (a *appenderRecorder) AppendSTZeroSample(ref storage.SeriesRef, ls labels.Labels, _, _ int64) (storage.SeriesRef, error) { - a.records = append(a.records, appenderRecord{op: "AppendSTZeroSample", ref: ref, ls: ls}) - if a.appendSTZeroSampleError != nil { - return 0, a.appendSTZeroSampleError - } - if ref == 0 { - ref = a.newRef() - } - a.setOutRef(ref) - return ref, nil -} - -func (a *appenderRecorder) AppendHistogram(ref storage.SeriesRef, ls labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { - a.records = append(a.records, appenderRecord{op: "AppendHistogram", ref: ref, ls: ls}) - if a.appendHistogramError != nil { - return 0, a.appendHistogramError - } - if ref == 0 { - ref = a.newRef() - } - a.setOutRef(ref) - return ref, nil -} - -func (a *appenderRecorder) AppendHistogramSTZeroSample(ref storage.SeriesRef, ls labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { - a.records = append(a.records, appenderRecord{op: "AppendHistogramSTZeroSample", ref: ref, ls: ls}) - if a.appendHistogramSTZeroSampleError != nil { - return 0, a.appendHistogramSTZeroSampleError - } - if ref == 0 { - ref = a.newRef() - } - a.setOutRef(ref) - return ref, nil -} - -func (a *appenderRecorder) UpdateMetadata(ref storage.SeriesRef, ls labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { - a.records = append(a.records, appenderRecord{op: "UpdateMetadata", ref: ref, ls: ls}) - if a.updateMetadataError != nil { - return 0, a.updateMetadataError - } - a.setOutRef(ref) - return ref, nil -} - -func (a *appenderRecorder) AppendExemplar(ref storage.SeriesRef, ls labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) { - a.records = append(a.records, appenderRecord{op: "AppendExemplar", ref: ref, ls: ls}) - if a.appendExemplarError != nil { - return 0, a.appendExemplarError - } - a.setOutRef(ref) - return ref, nil -} - -func (a *appenderRecorder) Commit() error { - a.records = append(a.records, appenderRecord{op: "Commit"}) - return nil -} - -func (a *appenderRecorder) Rollback() error { - a.records = append(a.records, appenderRecord{op: "Rollback"}) - return nil -} - -func (*appenderRecorder) SetOptions(_ *storage.AppendOptions) { - panic("not implemented") -} - -func TestMetadataChangedLogic(t *testing.T) { - seriesLabels := labels.FromStrings(model.MetricNameLabel, "test_metric", "foo", "bar") - baseMetadata := Metadata{ - Metadata: metadata.Metadata{Type: model.MetricTypeCounter, Unit: "bytes", Help: "original"}, - MetricFamilyName: "test_metric", - } - - tests := []struct { - name string - appendMetadata bool - modifyMetadata func(Metadata) Metadata - expectWALCall bool - verifyCached func(*testing.T, metadata.Metadata) - }{ - { - name: "appendMetadata=false, no change", - appendMetadata: false, - modifyMetadata: func(m Metadata) Metadata { return m }, - expectWALCall: false, - verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "original", m.Help) }, - }, - { - name: "appendMetadata=false, help changes - cache updated, no WAL", - appendMetadata: false, - modifyMetadata: func(m Metadata) Metadata { m.Help = "changed"; return m }, - expectWALCall: false, - verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "changed", m.Help) }, - }, - { - name: "appendMetadata=true, help changes - cache and WAL updated", - appendMetadata: true, - modifyMetadata: func(m Metadata) Metadata { m.Help = "changed"; return m }, - expectWALCall: true, - verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "changed", m.Help) }, - }, - { - name: "appendMetadata=true, unit changes", - appendMetadata: true, - modifyMetadata: func(m Metadata) Metadata { m.Unit = "seconds"; return m }, - expectWALCall: true, - verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, "seconds", m.Unit) }, - }, - { - name: "appendMetadata=true, type changes", - appendMetadata: true, - modifyMetadata: func(m Metadata) Metadata { m.Type = model.MetricTypeGauge; return m }, - expectWALCall: true, - verifyCached: func(t *testing.T, m metadata.Metadata) { require.Equal(t, model.MetricTypeGauge, m.Type) }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - app := &appenderRecorder{} - capp := NewCombinedAppender(app, promslog.NewNopLogger(), true, tt.appendMetadata, NewCombinedAppenderMetrics(prometheus.NewRegistry())) - - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), baseMetadata, 1, 2, 42.0, nil)) - - modifiedMetadata := tt.modifyMetadata(baseMetadata) - app.records = nil - require.NoError(t, capp.AppendSample(seriesLabels.Copy(), modifiedMetadata, 1, 3, 43.0, nil)) - - hash := seriesLabels.Hash() - cached, exists := capp.(*combinedAppender).refs[hash] - require.True(t, exists) - tt.verifyCached(t, cached.meta) - - updateMetadataCalled := false - for _, record := range app.records { - if record.op == "UpdateMetadata" { - updateMetadataCalled = true - break - } - } - require.Equal(t, tt.expectWALCall, updateMetadataCalled) - }) - } -} diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go index 669e10e0a7..fc82a5674f 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -40,6 +40,7 @@ import ( "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/storage" ) const ( @@ -73,8 +74,13 @@ var reservedLabelNames = []string{ // if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. // // This function requires for cached resource and scope labels to be set up first. -func (c *PrometheusConverter) createAttributes(attributes pcommon.Map, settings Settings, - ignoreAttrs []string, logOnOverwrite bool, meta Metadata, extras ...string, +func (c *PrometheusConverter) createAttributes( + attributes pcommon.Map, + settings Settings, + ignoreAttrs []string, + logOnOverwrite bool, + meta metadata.Metadata, + extras ...string, ) (labels.Labels, error) { if c.resourceLabels == nil { return labels.EmptyLabels(), errors.New("createAttributes called without initializing resource context") @@ -210,8 +216,11 @@ func aggregationTemporality(metric pmetric.Metric) (pmetric.AggregationTemporali // with the user defined bucket boundaries of non-exponential OTel histograms. // However, work is under way to resolve this shortcoming through a feature called native histograms custom buckets: // https://github.com/prometheus/prometheus/issues/13485. -func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice, - settings Settings, meta Metadata, +func (c *PrometheusConverter) addHistogramDataPoints( + ctx context.Context, + dataPoints pmetric.HistogramDataPointSlice, + settings Settings, + appOpts storage.AOptions, ) error { for x := 0; x < dataPoints.Len(); x++ { if err := c.everyN.checkContext(ctx); err != nil { @@ -221,13 +230,11 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) startTimestamp := convertTimeStamp(pt.StartTimestamp()) - baseLabels, err := c.createAttributes(pt.Attributes(), settings, reservedLabelNames, false, meta) + baseLabels, err := c.createAttributes(pt.Attributes(), settings, reservedLabelNames, false, appOpts.Metadata) if err != nil { return err } - baseName := meta.MetricFamilyName - // If the sum is unset, it indicates the _sum metric point should be // omitted if pt.HasSum() { @@ -236,9 +243,8 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo if pt.Flags().NoRecordedValue() { val = math.Float64frombits(value.StaleNaN) } - - sumlabels := c.addLabels(baseName+sumStr, baseLabels) - if err := c.appender.AppendSample(sumlabels, meta, startTimestamp, timestamp, val, nil); err != nil { + sumLabels := c.addLabels(appOpts.MetricFamilyName+sumStr, baseLabels) + if _, err := c.appender.Append(0, sumLabels, startTimestamp, timestamp, val, nil, nil, appOpts); err != nil { return err } } @@ -248,9 +254,8 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo if pt.Flags().NoRecordedValue() { val = math.Float64frombits(value.StaleNaN) } - - countlabels := c.addLabels(baseName+countStr, baseLabels) - if err := c.appender.AppendSample(countlabels, meta, startTimestamp, timestamp, val, nil); err != nil { + countLabels := c.addLabels(appOpts.MetricFamilyName+countStr, baseLabels) + if _, err := c.appender.Append(0, countLabels, startTimestamp, timestamp, val, nil, nil, appOpts); err != nil { return err } exemplars, err := c.getPromExemplars(ctx, pt.Exemplars()) @@ -273,32 +278,35 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo // Find exemplars that belong to this bucket. Both exemplars and // buckets are sorted in ascending order. - var currentBucketExemplars []exemplar.Exemplar + appOpts.Exemplars = appOpts.Exemplars[:0] for ; nextExemplarIdx < len(exemplars); nextExemplarIdx++ { ex := exemplars[nextExemplarIdx] if ex.Value > bound { // This exemplar belongs in a higher bucket. break } - currentBucketExemplars = append(currentBucketExemplars, ex) + appOpts.Exemplars = append(appOpts.Exemplars, ex) } val := float64(cumulativeCount) if pt.Flags().NoRecordedValue() { val = math.Float64frombits(value.StaleNaN) } boundStr := strconv.FormatFloat(bound, 'f', -1, 64) - labels := c.addLabels(baseName+bucketStr, baseLabels, leStr, boundStr) - if err := c.appender.AppendSample(labels, meta, startTimestamp, timestamp, val, currentBucketExemplars); err != nil { + bucketLabels := c.addLabels(appOpts.MetricFamilyName+bucketStr, baseLabels, leStr, boundStr) + if _, err := c.appender.Append(0, bucketLabels, startTimestamp, timestamp, val, nil, nil, appOpts); err != nil { return err } } - // add le=+Inf bucket + + appOpts.Exemplars = exemplars[nextExemplarIdx:] + // add + // le=+Inf bucket val = float64(pt.Count()) if pt.Flags().NoRecordedValue() { val = math.Float64frombits(value.StaleNaN) } - infLabels := c.addLabels(baseName+bucketStr, baseLabels, leStr, pInfStr) - if err := c.appender.AppendSample(infLabels, meta, startTimestamp, timestamp, val, exemplars[nextExemplarIdx:]); err != nil { + infLabels := c.addLabels(appOpts.MetricFamilyName+bucketStr, baseLabels, leStr, pInfStr) + if _, err := c.appender.Append(0, infLabels, startTimestamp, timestamp, val, nil, nil, appOpts); err != nil { return err } } @@ -412,8 +420,11 @@ func findMinAndMaxTimestamps(metric pmetric.Metric, minTimestamp, maxTimestamp p return minTimestamp, maxTimestamp } -func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoints pmetric.SummaryDataPointSlice, - settings Settings, meta Metadata, +func (c *PrometheusConverter) addSummaryDataPoints( + ctx context.Context, + dataPoints pmetric.SummaryDataPointSlice, + settings Settings, + appOpts storage.AOptions, ) error { for x := 0; x < dataPoints.Len(); x++ { if err := c.everyN.checkContext(ctx); err != nil { @@ -423,21 +434,18 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) startTimestamp := convertTimeStamp(pt.StartTimestamp()) - baseLabels, err := c.createAttributes(pt.Attributes(), settings, reservedLabelNames, false, meta) + baseLabels, err := c.createAttributes(pt.Attributes(), settings, reservedLabelNames, false, appOpts.Metadata) if err != nil { return err } - baseName := meta.MetricFamilyName - // treat sum as a sample in an individual TimeSeries val := pt.Sum() if pt.Flags().NoRecordedValue() { val = math.Float64frombits(value.StaleNaN) } - // sum and count of the summary should append suffix to baseName - sumlabels := c.addLabels(baseName+sumStr, baseLabels) - if err := c.appender.AppendSample(sumlabels, meta, startTimestamp, timestamp, val, nil); err != nil { + sumLabels := c.addLabels(appOpts.MetricFamilyName+sumStr, baseLabels) + if _, err := c.appender.Append(0, sumLabels, startTimestamp, timestamp, val, nil, nil, appOpts); err != nil { return err } @@ -446,8 +454,8 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin if pt.Flags().NoRecordedValue() { val = math.Float64frombits(value.StaleNaN) } - countlabels := c.addLabels(baseName+countStr, baseLabels) - if err := c.appender.AppendSample(countlabels, meta, startTimestamp, timestamp, val, nil); err != nil { + countLabels := c.addLabels(appOpts.MetricFamilyName+countStr, baseLabels) + if _, err := c.appender.Append(0, countLabels, startTimestamp, timestamp, val, nil, nil, appOpts); err != nil { return err } @@ -459,8 +467,8 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin val = math.Float64frombits(value.StaleNaN) } percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64) - qtlabels := c.addLabels(baseName, baseLabels, quantileStr, percentileStr) - if err := c.appender.AppendSample(qtlabels, meta, startTimestamp, timestamp, val, nil); err != nil { + qtlabels := c.addLabels(appOpts.MetricFamilyName, baseLabels, quantileStr, percentileStr) + if _, err := c.appender.Append(0, qtlabels, startTimestamp, timestamp, val, nil, nil, appOpts); err != nil { return err } } @@ -518,7 +526,7 @@ func (c *PrometheusConverter) addResourceTargetInfo(resource pcommon.Resource, s // Do not pass identifying attributes as ignoreAttrs below. identifyingAttrs = nil } - meta := Metadata{ + appOpts := storage.AOptions{ Metadata: metadata.Metadata{ Type: model.MetricTypeGauge, Help: "Target metadata", @@ -530,7 +538,7 @@ func (c *PrometheusConverter) addResourceTargetInfo(resource pcommon.Resource, s // Temporarily clear scope labels for this call. savedScopeLabels := c.scopeLabels c.scopeLabels = nil - lbls, err := c.createAttributes(attributes, settings, identifyingAttrs, false, Metadata{}, model.MetricNameLabel, name) + lbls, err := c.createAttributes(attributes, settings, identifyingAttrs, false, metadata.Metadata{}, model.MetricNameLabel, name) c.scopeLabels = savedScopeLabels if err != nil { return err @@ -573,7 +581,8 @@ func (c *PrometheusConverter) addResourceTargetInfo(resource pcommon.Resource, s } c.seenTargetInfo[key] = struct{}{} - if err := c.appender.AppendSample(lbls, meta, 0, timestampMs, float64(1), nil); err != nil { + _, err = c.appender.Append(0, lbls, 0, timestampMs, 1.0, nil, nil, appOpts) + if err != nil { return err } } @@ -589,7 +598,8 @@ func (c *PrometheusConverter) addResourceTargetInfo(resource pcommon.Resource, s } c.seenTargetInfo[key] = struct{}{} - return c.appender.AppendSample(lbls, meta, 0, finalTimestampMs, float64(1), nil) + _, err = c.appender.Append(0, lbls, 0, finalTimestampMs, 1.0, nil, nil, appOpts) + return err } // convertTimeStamp converts OTLP timestamp in ns to timestamp in ms. diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go index b86b8cb3ea..3b5a1c4b34 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/testutil" ) @@ -430,7 +431,7 @@ func TestPrometheusConverter_createAttributes(t *testing.T) { require.NoError(t, c.setResourceContext(testResource, settings)) require.NoError(t, c.setScopeContext(tc.scope, settings)) - lbls, err := c.createAttributes(testAttrs, settings, tc.ignoreAttrs, false, Metadata{}, model.MetricNameLabel, "test_metric") + lbls, err := c.createAttributes(testAttrs, settings, tc.ignoreAttrs, false, metadata.Metadata{}, model.MetricNameLabel, "test_metric") require.NoError(t, err) testutil.RequireEqual(t, tc.expectedLabels, lbls) @@ -462,7 +463,7 @@ func TestPrometheusConverter_createAttributes(t *testing.T) { settings, reservedLabelNames, true, - Metadata{}, + metadata.Metadata{}, model.MetricNameLabel, "correct_metric_name", ) require.NoError(t, err) @@ -508,7 +509,7 @@ func TestPrometheusConverter_createAttributes(t *testing.T) { settings, reservedLabelNames, true, - Metadata{Metadata: metadata.Metadata{Type: model.MetricTypeGauge, Unit: "seconds"}}, + metadata.Metadata{Type: model.MetricTypeGauge, Unit: "seconds"}, model.MetricNameLabel, "test_metric", ) require.NoError(t, err) @@ -775,7 +776,7 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) { context.Background(), metric.Summary().DataPoints(), settings, - Metadata{ + storage.AOptions{ MetricFamilyName: metric.Name(), }, ) @@ -942,7 +943,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) { context.Background(), metric.Histogram().DataPoints(), settings, - Metadata{ + storage.AOptions{ MetricFamilyName: metric.Name(), }, ) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go index e2537b5cec..31c16b1c10 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" ) @@ -33,8 +34,12 @@ const defaultZeroThreshold = 1e-128 // addExponentialHistogramDataPoints adds OTel exponential histogram data points to the corresponding time series // as native histogram samples. -func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Context, dataPoints pmetric.ExponentialHistogramDataPointSlice, - settings Settings, temporality pmetric.AggregationTemporality, meta Metadata, +func (c *PrometheusConverter) addExponentialHistogramDataPoints( + ctx context.Context, + dataPoints pmetric.ExponentialHistogramDataPointSlice, + settings Settings, + temporality pmetric.AggregationTemporality, + appOpts storage.AOptions, ) (annotations.Annotations, error) { var annots annotations.Annotations for x := 0; x < dataPoints.Len(); x++ { @@ -55,9 +60,9 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont settings, reservedLabelNames, true, - meta, + appOpts.Metadata, model.MetricNameLabel, - meta.MetricFamilyName, + appOpts.MetricFamilyName, ) if err != nil { return annots, err @@ -68,8 +73,10 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont if err != nil { return annots, err } - // OTel exponential histograms are always Int Histograms. - if err = c.appender.AppendHistogram(lbls, meta, st, ts, hp, exemplars); err != nil { + + appOpts.Exemplars = exemplars + // OTel exponential histograms are always integer histograms. + if _, err = c.appender.Append(0, lbls, st, ts, 0, hp, nil, appOpts); err != nil { return annots, err } } @@ -248,8 +255,12 @@ func convertBucketsLayout(bucketCounts []uint64, offset, scaleDown int32, adjust return spans, deltas } -func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice, - settings Settings, temporality pmetric.AggregationTemporality, meta Metadata, +func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints( + ctx context.Context, + dataPoints pmetric.HistogramDataPointSlice, + settings Settings, + temporality pmetric.AggregationTemporality, + appOpts storage.AOptions, ) (annotations.Annotations, error) { var annots annotations.Annotations @@ -271,9 +282,9 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co settings, reservedLabelNames, true, - meta, + appOpts.Metadata, model.MetricNameLabel, - meta.MetricFamilyName, + appOpts.MetricFamilyName, ) if err != nil { return annots, err @@ -284,7 +295,9 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co if err != nil { return annots, err } - if err = c.appender.AppendHistogram(lbls, meta, st, ts, hp, exemplars); err != nil { + + appOpts.Exemplars = exemplars + if _, err = c.appender.Append(0, lbls, st, ts, 0, hp, nil, appOpts); err != nil { return annots, err } } diff --git a/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go b/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go index f55aef2f36..58d7c4e835 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/histograms_test.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" ) type expectedBucketLayout struct { @@ -875,7 +876,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) { metric.ExponentialHistogram().DataPoints(), settings, pmetric.AggregationTemporalityCumulative, - Metadata{ + storage.AOptions{ MetricFamilyName: name, }, ) @@ -1354,7 +1355,7 @@ func TestPrometheusConverter_addCustomBucketsHistogramDataPoints(t *testing.T) { metric.Histogram().DataPoints(), settings, pmetric.AggregationTemporalityCumulative, - Metadata{ + storage.AOptions{ MetricFamilyName: name, }, ) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index 81e99a2f50..600282af6f 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" ) @@ -85,7 +86,7 @@ type PrometheusConverter struct { everyN everyNTimes scratchBuilder labels.ScratchBuilder builder *labels.Builder - appender CombinedAppender + appender storage.AppenderV2 // seenTargetInfo tracks target_info samples within a batch to prevent duplicates. seenTargetInfo map[targetInfoKey]struct{} @@ -105,7 +106,7 @@ type targetInfoKey struct { timestamp int64 } -func NewPrometheusConverter(appender CombinedAppender) *PrometheusConverter { +func NewPrometheusConverter(appender storage.AppenderV2) *PrometheusConverter { return &PrometheusConverter{ scratchBuilder: labels.NewScratchBuilder(0), builder: labels.NewBuilder(labels.EmptyLabels()), @@ -170,7 +171,7 @@ func newScopeFromScopeMetrics(scopeMetrics pmetric.ScopeMetrics) scope { } } -// FromMetrics converts pmetric.Metrics to Prometheus remote write format. +// FromMetrics appends pmetric.Metrics to storage.AppenderV2. func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metrics, settings Settings) (annots annotations.Annotations, errs error) { namer := otlptranslator.MetricNamer{ Namespace: settings.Namespace, @@ -236,7 +237,8 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric errs = errors.Join(errs, err) continue } - meta := Metadata{ + + appOpts := storage.AOptions{ Metadata: metadata.Metadata{ Type: otelMetricTypeToPromMetricType(metric), Unit: unitNamer.Build(metric.Unit()), @@ -254,7 +256,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric errs = errors.Join(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - if err := c.addGaugeNumberDataPoints(ctx, dataPoints, settings, meta); err != nil { + if err := c.addGaugeNumberDataPoints(ctx, dataPoints, settings, appOpts); err != nil { errs = errors.Join(errs, err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return annots, errs @@ -266,7 +268,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric errs = errors.Join(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - if err := c.addSumNumberDataPoints(ctx, dataPoints, settings, meta); err != nil { + if err := c.addSumNumberDataPoints(ctx, dataPoints, settings, appOpts); err != nil { errs = errors.Join(errs, err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return annots, errs @@ -280,7 +282,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric } if settings.ConvertHistogramsToNHCB { ws, err := c.addCustomBucketsHistogramDataPoints( - ctx, dataPoints, settings, temporality, meta, + ctx, dataPoints, settings, temporality, appOpts, ) annots.Merge(ws) if err != nil { @@ -290,7 +292,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric } } } else { - if err := c.addHistogramDataPoints(ctx, dataPoints, settings, meta); err != nil { + if err := c.addHistogramDataPoints(ctx, dataPoints, settings, appOpts); err != nil { errs = errors.Join(errs, err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return annots, errs @@ -308,7 +310,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric dataPoints, settings, temporality, - meta, + appOpts, ) annots.Merge(ws) if err != nil { @@ -323,7 +325,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric errs = errors.Join(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name())) break } - if err := c.addSummaryDataPoints(ctx, dataPoints, settings, meta); err != nil { + if err := c.addSummaryDataPoints(ctx, dataPoints, settings, appOpts); err != nil { errs = errors.Join(errs, err) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return annots, errs diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go index f90051e84d..8ac860a291 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw_test.go @@ -22,9 +22,7 @@ import ( "testing" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/prometheus/common/promslog" "github.com/prometheus/otlptranslator" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" @@ -32,7 +30,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" @@ -1239,54 +1236,57 @@ func createOTelEmptyMetricForTranslator(name string) pmetric.Metric { return m } +// Recommended CLI invocation(s): +/* + export bench=fromMetrics && go test ./storage/remote/otlptranslator/prometheusremotewrite/... \ + -run '^$' -bench '^BenchmarkPrometheusConverter_FromMetrics' \ + -benchtime 1s -count 6 -cpu 2 -timeout 999m -benchmem \ + | tee ${bench}.txt +*/ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { for _, resourceAttributeCount := range []int{0, 5, 50} { b.Run(fmt.Sprintf("resource attribute count: %v", resourceAttributeCount), func(b *testing.B) { - for _, histogramCount := range []int{0, 1000} { - b.Run(fmt.Sprintf("histogram count: %v", histogramCount), func(b *testing.B) { - nonHistogramCounts := []int{0, 1000} + for _, metricCount := range []struct { + histogramCount int + nonHistogramCount int + }{ + {histogramCount: 0, nonHistogramCount: 1000}, + {histogramCount: 1000, nonHistogramCount: 0}, + {histogramCount: 1000, nonHistogramCount: 1000}, + } { + b.Run(fmt.Sprintf("histogram count: %v/non-histogram count: %v", metricCount.histogramCount, metricCount.nonHistogramCount), func(b *testing.B) { + for _, labelsPerMetric := range []int{2, 20} { + b.Run(fmt.Sprintf("labels per metric: %v", labelsPerMetric), func(b *testing.B) { + for _, exemplarsPerSeries := range []int{0, 5, 10} { + b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) { + settings := Settings{} + payload, _ := createExportRequest( + resourceAttributeCount, + metricCount.histogramCount, + metricCount.nonHistogramCount, + labelsPerMetric, + exemplarsPerSeries, + settings, + pmetric.AggregationTemporalityCumulative, + ) - if resourceAttributeCount == 0 && histogramCount == 0 { - // Don't bother running a scenario where we'll generate no series. - nonHistogramCounts = []int{1000} - } + b.ResetTimer() + for b.Loop() { + app := &noOpAppender{} + converter := NewPrometheusConverter(app) + annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings) + require.NoError(b, err) + require.Empty(b, annots) - for _, nonHistogramCount := range nonHistogramCounts { - b.Run(fmt.Sprintf("non-histogram count: %v", nonHistogramCount), func(b *testing.B) { - for _, labelsPerMetric := range []int{2, 20} { - b.Run(fmt.Sprintf("labels per metric: %v", labelsPerMetric), func(b *testing.B) { - for _, exemplarsPerSeries := range []int{0, 5, 10} { - b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) { - settings := Settings{} - payload, _ := createExportRequest( - resourceAttributeCount, - histogramCount, - nonHistogramCount, - labelsPerMetric, - exemplarsPerSeries, - settings, - pmetric.AggregationTemporalityCumulative, - ) - appMetrics := NewCombinedAppenderMetrics(prometheus.NewRegistry()) - noOpLogger := promslog.NewNopLogger() - b.ResetTimer() - - for b.Loop() { - app := &noOpAppender{} - mockAppender := NewCombinedAppender(app, noOpLogger, false, true, appMetrics) - converter := NewPrometheusConverter(mockAppender) - annots, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings) - require.NoError(b, err) - require.Empty(b, annots) - if histogramCount+nonHistogramCount > 0 { - require.Positive(b, app.samples+app.histograms) - require.Positive(b, app.metadata) - } else { - require.Zero(b, app.samples+app.histograms) - require.Zero(b, app.metadata) - } - } - }) + // TODO(bwplotka): This should be tested somewhere else, otherwise we benchmark + // mock too. + if metricCount.histogramCount+metricCount.nonHistogramCount > 0 { + require.Positive(b, app.samples+app.histograms) + require.Positive(b, app.metadata) + } else { + require.Zero(b, app.samples+app.histograms) + require.Zero(b, app.metadata) + } } }) } @@ -1304,35 +1304,20 @@ type noOpAppender struct { metadata int } -var _ storage.Appender = &noOpAppender{} +var _ storage.AppenderV2 = &noOpAppender{} -func (a *noOpAppender) Append(_ storage.SeriesRef, _ labels.Labels, _ int64, _ float64) (storage.SeriesRef, error) { +func (a *noOpAppender) Append(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ float64, h *histogram.Histogram, _ *histogram.FloatHistogram, opts storage.AOptions) (_ storage.SeriesRef, err error) { + if !opts.Metadata.IsEmpty() { + a.metadata++ + } + if h != nil { + a.histograms++ + return 1, nil + } a.samples++ return 1, nil } -func (*noOpAppender) AppendSTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { - return 1, nil -} - -func (a *noOpAppender) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { - a.histograms++ - return 1, nil -} - -func (*noOpAppender) AppendHistogramSTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { - return 1, nil -} - -func (a *noOpAppender) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { - a.metadata++ - return 1, nil -} - -func (*noOpAppender) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _ exemplar.Exemplar) (storage.SeriesRef, error) { - return 1, nil -} - func (*noOpAppender) Commit() error { return nil } @@ -1341,10 +1326,6 @@ func (*noOpAppender) Rollback() error { return nil } -func (*noOpAppender) SetOptions(_ *storage.AppendOptions) { - panic("not implemented") -} - type wantPrometheusMetric struct { name string familyName string @@ -1677,15 +1658,12 @@ func BenchmarkFromMetrics_LabelCaching_MultipleDatapointsPerResource(b *testing. labelsPerMetric, scopeAttributeCount, ) - appMetrics := NewCombinedAppenderMetrics(prometheus.NewRegistry()) - noOpLogger := promslog.NewNopLogger() b.ReportAllocs() b.ResetTimer() for b.Loop() { app := &noOpAppender{} - mockAppender := NewCombinedAppender(app, noOpLogger, false, false, appMetrics) - converter := NewPrometheusConverter(mockAppender) + converter := NewPrometheusConverter(app) _, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings) require.NoError(b, err) } @@ -1709,15 +1687,12 @@ func BenchmarkFromMetrics_LabelCaching_RepeatedLabelNames(b *testing.B) { datapoints, labelsPerDatapoint, ) - appMetrics := NewCombinedAppenderMetrics(prometheus.NewRegistry()) - noOpLogger := promslog.NewNopLogger() b.ReportAllocs() b.ResetTimer() for b.Loop() { app := &noOpAppender{} - mockAppender := NewCombinedAppender(app, noOpLogger, false, false, appMetrics) - converter := NewPrometheusConverter(mockAppender) + converter := NewPrometheusConverter(app) _, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings) require.NoError(b, err) } @@ -1747,15 +1722,12 @@ func BenchmarkFromMetrics_LabelCaching_ScopeMetadata(b *testing.B) { labelsPerMetric, scopeAttrs, ) - appMetrics := NewCombinedAppenderMetrics(prometheus.NewRegistry()) - noOpLogger := promslog.NewNopLogger() b.ReportAllocs() b.ResetTimer() for b.Loop() { app := &noOpAppender{} - mockAppender := NewCombinedAppender(app, noOpLogger, false, false, appMetrics) - converter := NewPrometheusConverter(mockAppender) + converter := NewPrometheusConverter(app) _, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings) require.NoError(b, err) } @@ -1786,15 +1758,12 @@ func BenchmarkFromMetrics_LabelCaching_MultipleResources(b *testing.B) { metricsPerResource, labelsPerMetric, ) - appMetrics := NewCombinedAppenderMetrics(prometheus.NewRegistry()) - noOpLogger := promslog.NewNopLogger() b.ReportAllocs() b.ResetTimer() for b.Loop() { app := &noOpAppender{} - mockAppender := NewCombinedAppender(app, noOpLogger, false, false, appMetrics) - converter := NewPrometheusConverter(mockAppender) + converter := NewPrometheusConverter(app) _, err := converter.FromMetrics(context.Background(), payload.Metrics(), settings) require.NoError(b, err) } diff --git a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go index e681bb352b..3c74ec9382 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points.go @@ -24,10 +24,14 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/storage" ) -func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice, - settings Settings, meta Metadata, +func (c *PrometheusConverter) addGaugeNumberDataPoints( + ctx context.Context, + dataPoints pmetric.NumberDataPointSlice, + settings Settings, + appOpts storage.AOptions, ) error { for x := 0; x < dataPoints.Len(); x++ { if err := c.everyN.checkContext(ctx); err != nil { @@ -40,9 +44,9 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data settings, reservedLabelNames, true, - meta, + appOpts.Metadata, model.MetricNameLabel, - meta.MetricFamilyName, + appOpts.MetricFamilyName, ) if err != nil { return err @@ -59,7 +63,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data } ts := convertTimeStamp(pt.Timestamp()) st := convertTimeStamp(pt.StartTimestamp()) - if err := c.appender.AppendSample(labels, meta, st, ts, val, nil); err != nil { + if _, err = c.appender.Append(0, labels, st, ts, val, nil, nil, appOpts); err != nil { return err } } @@ -67,8 +71,11 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data return nil } -func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice, - settings Settings, meta Metadata, +func (c *PrometheusConverter) addSumNumberDataPoints( + ctx context.Context, + dataPoints pmetric.NumberDataPointSlice, + settings Settings, + appOpts storage.AOptions, ) error { for x := 0; x < dataPoints.Len(); x++ { if err := c.everyN.checkContext(ctx); err != nil { @@ -81,9 +88,9 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo settings, reservedLabelNames, true, - meta, + appOpts.Metadata, model.MetricNameLabel, - meta.MetricFamilyName, + appOpts.MetricFamilyName, ) if err != nil { return err @@ -104,7 +111,9 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo if err != nil { return err } - if err := c.appender.AppendSample(lbls, meta, st, ts, val, exemplars); err != nil { + + appOpts.Exemplars = exemplars + if _, err = c.appender.Append(0, lbls, st, ts, val, nil, nil, appOpts); err != nil { return err } } diff --git a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go index 58a27c12e1..67961a2943 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/number_data_points_test.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" ) func TestPrometheusConverter_addGaugeNumberDataPoints(t *testing.T) { @@ -127,7 +128,7 @@ func TestPrometheusConverter_addGaugeNumberDataPoints(t *testing.T) { context.Background(), metric.Gauge().DataPoints(), settings, - Metadata{ + storage.AOptions{ MetricFamilyName: metric.Name(), }, ) @@ -361,7 +362,7 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) { context.Background(), metric.Sum().DataPoints(), settings, - Metadata{ + storage.AOptions{ MetricFamilyName: metric.Name(), }, ) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index a72712a535..3dac96f6a0 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -537,3 +537,27 @@ func (app *remoteWriteAppender) AppendExemplar(ref storage.SeriesRef, l labels.L } return ref, nil } + +type remoteWriteAppenderV2 struct { + storage.AppenderV2 + + maxTime int64 +} + +func (app *remoteWriteAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) { + if t > app.maxTime { + return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds) + } + + if h != nil && histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > histogram.ExponentialSchemaMax { + if err := h.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + return 0, err + } + } + if fh != nil && histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > histogram.ExponentialSchemaMax { + if err := fh.ReduceResolution(histogram.ExponentialSchemaMax); err != nil { + return 0, err + } + } + return app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts) +} diff --git a/storage/remote/write_otlp_handler.go b/storage/remote/write_otlp_handler.go index 489a7b574a..b3fde1c19c 100644 --- a/storage/remote/write_otlp_handler.go +++ b/storage/remote/write_otlp_handler.go @@ -23,6 +23,7 @@ import ( deltatocumulative "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" @@ -30,6 +31,8 @@ import ( "go.opentelemetry.io/otel/metric/noop" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/storage" otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" @@ -47,16 +50,11 @@ type OTLPOptions struct { LookbackDelta time.Duration // Add type and unit labels to the metrics. EnableTypeAndUnitLabels bool - // IngestSTZeroSample enables writing zero samples based on the start time - // of metrics. - IngestSTZeroSample bool - // AppendMetadata enables writing metadata to WAL when metadata-wal-records feature is enabled. - AppendMetadata bool } // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // writes them to the provided appendable. -func NewOTLPWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.Appendable, configFunc func() config.Config, opts OTLPOptions) http.Handler { +func NewOTLPWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.AppendableV2, configFunc func() config.Config, opts OTLPOptions) http.Handler { if opts.NativeDelta && opts.ConvertDelta { // This should be validated when iterating through feature flags, so not expected to fail here. panic("cannot enable native delta ingestion and delta2cumulative conversion at the same time") @@ -64,15 +62,11 @@ func NewOTLPWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appenda ex := &rwExporter{ logger: logger, - appendable: appendable, + appendable: newOTLPInstrumentedAppendable(reg, appendable), config: configFunc, allowDeltaTemporality: opts.NativeDelta, lookbackDelta: opts.LookbackDelta, - ingestSTZeroSample: opts.IngestSTZeroSample, enableTypeAndUnitLabels: opts.EnableTypeAndUnitLabels, - appendMetadata: opts.AppendMetadata, - // Register metrics. - metrics: otlptranslator.NewCombinedAppenderMetrics(reg), } wh := &otlpWriteHandler{logger: logger, defaultConsumer: ex} @@ -107,26 +101,20 @@ func NewOTLPWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appenda type rwExporter struct { logger *slog.Logger - appendable storage.Appendable + appendable storage.AppendableV2 config func() config.Config allowDeltaTemporality bool lookbackDelta time.Duration - ingestSTZeroSample bool enableTypeAndUnitLabels bool - appendMetadata bool - - // Metrics. - metrics otlptranslator.CombinedAppenderMetrics } func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { otlpCfg := rw.config().OTLPConfig - app := &remoteWriteAppender{ - Appender: rw.appendable.Appender(ctx), - maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), + app := &remoteWriteAppenderV2{ + AppenderV2: rw.appendable.AppenderV2(ctx), + maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)), } - combinedAppender := otlptranslator.NewCombinedAppender(app, rw.logger, rw.ingestSTZeroSample, rw.appendMetadata, rw.metrics) - converter := otlptranslator.NewPrometheusConverter(combinedAppender) + converter := otlptranslator.NewPrometheusConverter(app) annots, err := converter.FromMetrics(ctx, md, otlptranslator.Settings{ AddMetricSuffixes: otlpCfg.TranslationStrategy.ShouldAddSuffixes(), AllowUTF8: !otlpCfg.TranslationStrategy.ShouldEscape(), @@ -225,3 +213,62 @@ func hasDelta(md pmetric.Metrics) bool { } return false } + +type otlpInstrumentedAppendable struct { + storage.AppendableV2 + + samplesAppendedWithoutMetadata prometheus.Counter + outOfOrderExemplars prometheus.Counter +} + +func newOTLPInstrumentedAppendable(reg prometheus.Registerer, app storage.AppendableV2) *otlpInstrumentedAppendable { + return &otlpInstrumentedAppendable{ + AppendableV2: app, + samplesAppendedWithoutMetadata: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "api", + Name: "otlp_appended_samples_without_metadata_total", + Help: "The total number of samples ingested from OTLP without corresponding metadata.", + }), + outOfOrderExemplars: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "api", + Name: "otlp_out_of_order_exemplars_total", + Help: "The total number of received OTLP exemplars which were rejected because they were out of order.", + }), + } +} + +func (a *otlpInstrumentedAppendable) AppenderV2(ctx context.Context) storage.AppenderV2 { + return &otlpInstrumentedAppender{ + AppenderV2: a.AppendableV2.AppenderV2(ctx), + + samplesAppendedWithoutMetadata: a.samplesAppendedWithoutMetadata, + outOfOrderExemplars: a.outOfOrderExemplars, + } +} + +type otlpInstrumentedAppender struct { + storage.AppenderV2 + + samplesAppendedWithoutMetadata prometheus.Counter + outOfOrderExemplars prometheus.Counter +} + +func (app *otlpInstrumentedAppender) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) { + ref, err := app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts) + if err != nil { + var partialErr *storage.AppendPartialError + partialErr, err = partialErr.Handle(err) + if err != nil { + // Not a partial error. + return ref, err + } + app.outOfOrderExemplars.Add(float64(len(partialErr.ExemplarErrors))) + return ref, err + } + if opts.Metadata.IsEmpty() { + app.samplesAppendedWithoutMetadata.Inc() + } + return ref, nil +} diff --git a/storage/remote/write_otlp_handler_test.go b/storage/remote/write_otlp_handler_test.go index 57c0b2ab22..005c807428 100644 --- a/storage/remote/write_otlp_handler_test.go +++ b/storage/remote/write_otlp_handler_test.go @@ -48,6 +48,7 @@ type sample = teststorage.Sample func TestOTLPWriteHandler(t *testing.T) { ts := time.Now() + st := ts.Add(-1 * time.Millisecond) // Expected samples passed via OTLP request without details (labels for now) that // depend on translation or type and unit labels options. @@ -55,7 +56,7 @@ func TestOTLPWriteHandler(t *testing.T) { return []sample{ { M: metadata.Metadata{Type: model.MetricTypeCounter, Unit: "bytes", Help: "test-counter-description"}, - V: 10.0, T: timestamp.FromTime(ts), ES: []exemplar.Exemplar{ + V: 10.0, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), ES: []exemplar.Exemplar{ { Labels: labels.FromStrings("span_id", "0001020304050607", "trace_id", "000102030405060708090a0b0c0d0e0f"), Value: 10, Ts: timestamp.FromTime(ts), HasTs: true, @@ -64,43 +65,43 @@ func TestOTLPWriteHandler(t *testing.T) { }, { M: metadata.Metadata{Type: model.MetricTypeGauge, Unit: "bytes", Help: "test-gauge-description"}, - V: 10.0, T: timestamp.FromTime(ts), + V: 10.0, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), }, { M: metadata.Metadata{Type: model.MetricTypeHistogram, Unit: "bytes", Help: "test-histogram-description"}, - V: 30.0, T: timestamp.FromTime(ts), + V: 30.0, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), }, { M: metadata.Metadata{Type: model.MetricTypeHistogram, Unit: "bytes", Help: "test-histogram-description"}, - V: 12.0, T: timestamp.FromTime(ts), + V: 12.0, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), }, { M: metadata.Metadata{Type: model.MetricTypeHistogram, Unit: "bytes", Help: "test-histogram-description"}, - V: 2.0, T: timestamp.FromTime(ts), + V: 2.0, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), }, { M: metadata.Metadata{Type: model.MetricTypeHistogram, Unit: "bytes", Help: "test-histogram-description"}, - V: 4.0, T: timestamp.FromTime(ts), + V: 4.0, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), }, { M: metadata.Metadata{Type: model.MetricTypeHistogram, Unit: "bytes", Help: "test-histogram-description"}, - V: 6.0, T: timestamp.FromTime(ts), + V: 6.0, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), }, { M: metadata.Metadata{Type: model.MetricTypeHistogram, Unit: "bytes", Help: "test-histogram-description"}, - V: 8.0, T: timestamp.FromTime(ts), + V: 8.0, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), }, { M: metadata.Metadata{Type: model.MetricTypeHistogram, Unit: "bytes", Help: "test-histogram-description"}, - V: 10.0, T: timestamp.FromTime(ts), + V: 10.0, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), }, { M: metadata.Metadata{Type: model.MetricTypeHistogram, Unit: "bytes", Help: "test-histogram-description"}, - V: 12.0, T: timestamp.FromTime(ts), + V: 12.0, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), }, { M: metadata.Metadata{Type: model.MetricTypeHistogram, Unit: "bytes", Help: "test-histogram-description"}, - V: 12.0, T: timestamp.FromTime(ts), + V: 12.0, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), }, { M: metadata.Metadata{Type: model.MetricTypeHistogram, Unit: "bytes", Help: "test-exponential-histogram-description"}, @@ -112,7 +113,7 @@ func TestOTLPWriteHandler(t *testing.T) { ZeroCount: 2, PositiveSpans: []histogram.Span{{Offset: 1, Length: 5}}, PositiveBuckets: []int64{2, 0, 0, 0, 0}, - }, T: timestamp.FromTime(ts), + }, ST: timestamp.FromTime(st), T: timestamp.FromTime(ts), }, { M: metadata.Metadata{Type: model.MetricTypeGauge, Unit: "", Help: "Target metadata"}, V: 1, T: timestamp.FromTime(ts), @@ -120,34 +121,32 @@ func TestOTLPWriteHandler(t *testing.T) { } } - exportRequest := generateOTLPWriteRequest(ts, time.Time{}) + exportRequest := generateOTLPWriteRequest(ts, st) for _, testCase := range []struct { - name string - otlpCfg config.OTLPConfig - typeAndUnitLabels bool - // NOTE: This is a slice of samples, not []labels.Labels because metric family detail will be added once - // OTLP handler moves to AppenderV2. - expectedLabels []sample + name string + otlpCfg config.OTLPConfig + typeAndUnitLabels bool + expectedLabelsAndMFs []sample }{ { name: "NoTranslation/NoTypeAndUnitLabels", otlpCfg: config.OTLPConfig{ TranslationStrategy: otlptranslator.NoTranslation, }, - expectedLabels: []sample{ - {L: labels.FromStrings(model.MetricNameLabel, "test.counter", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.gauge", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_sum", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_count", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "0")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "1")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "2")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "3")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "4")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "5")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "+Inf")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.exponential.histogram", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "target_info", "host.name", "test-host", "instance", "test-instance", "job", "test-service")}, + expectedLabelsAndMFs: []sample{ + {MF: "test.counter", L: labels.FromStrings(model.MetricNameLabel, "test.counter", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.gauge", L: labels.FromStrings(model.MetricNameLabel, "test.gauge", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_sum", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_count", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "0")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "1")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "2")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "3")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "4")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "5")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "+Inf")}, + {MF: "test.exponential.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.exponential.histogram", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "target_info", L: labels.FromStrings(model.MetricNameLabel, "target_info", "host.name", "test-host", "instance", "test-instance", "job", "test-service")}, }, }, { @@ -156,20 +155,20 @@ func TestOTLPWriteHandler(t *testing.T) { TranslationStrategy: otlptranslator.NoTranslation, }, typeAndUnitLabels: true, - expectedLabels: []sample{ - {L: labels.FromStrings(model.MetricNameLabel, "test.counter", "__type__", "counter", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.gauge", "__type__", "gauge", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_sum", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_count", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "0")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "1")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "2")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "3")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "4")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "5")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "+Inf")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.exponential.histogram", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "target_info", "host.name", "test-host", "instance", "test-instance", "job", "test-service")}, + expectedLabelsAndMFs: []sample{ + {MF: "test.counter", L: labels.FromStrings(model.MetricNameLabel, "test.counter", "__type__", "counter", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.gauge", L: labels.FromStrings(model.MetricNameLabel, "test.gauge", "__type__", "gauge", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_sum", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_count", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "0")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "1")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "2")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "3")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "4")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "5")}, + {MF: "test.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "+Inf")}, + {MF: "test.exponential.histogram", L: labels.FromStrings(model.MetricNameLabel, "test.exponential.histogram", "__type__", "histogram", "__unit__", "bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "target_info", L: labels.FromStrings(model.MetricNameLabel, "target_info", "host.name", "test-host", "instance", "test-instance", "job", "test-service")}, }, }, // For the following cases, skip type and unit cases, it has nothing todo with translation. @@ -178,20 +177,20 @@ func TestOTLPWriteHandler(t *testing.T) { otlpCfg: config.OTLPConfig{ TranslationStrategy: otlptranslator.UnderscoreEscapingWithSuffixes, }, - expectedLabels: []sample{ - {L: labels.FromStrings(model.MetricNameLabel, "test_counter_bytes_total", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_gauge_bytes", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_sum", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_count", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "0")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "1")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "2")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "3")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "4")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "5")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "+Inf")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_exponential_histogram_bytes", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "target_info", "host_name", "test-host", "instance", "test-instance", "job", "test-service")}, + expectedLabelsAndMFs: []sample{ + {MF: "test_counter_bytes_total", L: labels.FromStrings(model.MetricNameLabel, "test_counter_bytes_total", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test_gauge_bytes", L: labels.FromStrings(model.MetricNameLabel, "test_gauge_bytes", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test_histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_sum", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test_histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_count", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test_histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "0")}, + {MF: "test_histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "1")}, + {MF: "test_histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "2")}, + {MF: "test_histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "3")}, + {MF: "test_histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "4")}, + {MF: "test_histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "5")}, + {MF: "test_histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bytes_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "+Inf")}, + {MF: "test_exponential_histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test_exponential_histogram_bytes", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "target_info", L: labels.FromStrings(model.MetricNameLabel, "target_info", "host_name", "test-host", "instance", "test-instance", "job", "test-service")}, }, }, { @@ -199,20 +198,20 @@ func TestOTLPWriteHandler(t *testing.T) { otlpCfg: config.OTLPConfig{ TranslationStrategy: otlptranslator.UnderscoreEscapingWithoutSuffixes, }, - expectedLabels: []sample{ - {L: labels.FromStrings(model.MetricNameLabel, "test_counter", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_gauge", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_sum", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_count", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "0")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "1")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "2")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "3")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "4")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "5")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "+Inf")}, - {L: labels.FromStrings(model.MetricNameLabel, "test_exponential_histogram", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "target_info", "host_name", "test-host", "instance", "test-instance", "job", "test-service")}, + expectedLabelsAndMFs: []sample{ + {MF: "test_counter", L: labels.FromStrings(model.MetricNameLabel, "test_counter", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test_gauge", L: labels.FromStrings(model.MetricNameLabel, "test_gauge", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test_histogram", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_sum", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test_histogram", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_count", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test_histogram", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "0")}, + {MF: "test_histogram", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "1")}, + {MF: "test_histogram", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "2")}, + {MF: "test_histogram", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "3")}, + {MF: "test_histogram", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "4")}, + {MF: "test_histogram", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "5")}, + {MF: "test_histogram", L: labels.FromStrings(model.MetricNameLabel, "test_histogram_bucket", "foo_bar", "baz", "instance", "test-instance", "job", "test-service", "le", "+Inf")}, + {MF: "test_exponential_histogram", L: labels.FromStrings(model.MetricNameLabel, "test_exponential_histogram", "foo_bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "target_info", L: labels.FromStrings(model.MetricNameLabel, "target_info", "host_name", "test-host", "instance", "test-instance", "job", "test-service")}, }, }, { @@ -220,34 +219,33 @@ func TestOTLPWriteHandler(t *testing.T) { otlpCfg: config.OTLPConfig{ TranslationStrategy: otlptranslator.NoUTF8EscapingWithSuffixes, }, - expectedLabels: []sample{ + expectedLabelsAndMFs: []sample{ // TODO: Counter MF name looks likea bug. Uncovered in unrelated refactor. fix it. - {L: labels.FromStrings(model.MetricNameLabel, "test.counter_bytes_total", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.gauge_bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_sum", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_count", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "0")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "1")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "2")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "3")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "4")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "5")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "+Inf")}, - {L: labels.FromStrings(model.MetricNameLabel, "test.exponential.histogram_bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, - {L: labels.FromStrings(model.MetricNameLabel, "target_info", "host.name", "test-host", "instance", "test-instance", "job", "test-service")}, + {MF: "test.counter_bytes_total", L: labels.FromStrings(model.MetricNameLabel, "test.counter_bytes_total", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.gauge_bytes", L: labels.FromStrings(model.MetricNameLabel, "test.gauge_bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_sum", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_count", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "test.histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "0")}, + {MF: "test.histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "1")}, + {MF: "test.histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "2")}, + {MF: "test.histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "3")}, + {MF: "test.histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "4")}, + {MF: "test.histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "5")}, + {MF: "test.histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test.histogram_bytes_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "+Inf")}, + {MF: "test.exponential.histogram_bytes", L: labels.FromStrings(model.MetricNameLabel, "test.exponential.histogram_bytes", "foo.bar", "baz", "instance", "test-instance", "job", "test-service")}, + {MF: "target_info", L: labels.FromStrings(model.MetricNameLabel, "target_info", "host.name", "test-host", "instance", "test-instance", "job", "test-service")}, }, }, } { t.Run(testCase.name, func(t *testing.T) { otlpOpts := OTLPOptions{ EnableTypeAndUnitLabels: testCase.typeAndUnitLabels, - AppendMetadata: true, } appendable := handleOTLP(t, exportRequest, testCase.otlpCfg, otlpOpts) // Compile final expected samples. expectedSamples := expectedSamplesWithoutLabelsFn() - for i, s := range testCase.expectedLabels { + for i, s := range testCase.expectedLabelsAndMFs { expectedSamples[i].L = s.L expectedSamples[i].MF = s.MF } @@ -256,204 +254,6 @@ func TestOTLPWriteHandler(t *testing.T) { } } -// Check that start time is ingested if ingestSTZeroSample is enabled -// and the start time is actually set (non-zero). -// TODO(bwplotka): This test is still using old mockAppender. Keeping like this as this test -// will be removed when OTLP handling switches to AppenderV2. -func TestOTLPWriteHandler_StartTime(t *testing.T) { - timestamp := time.Now() - startTime := timestamp.Add(-1 * time.Millisecond) - var zeroTime time.Time - - expectedSamples := []mockSample{ - { - l: labels.FromStrings(model.MetricNameLabel, "test.counter", "foo.bar", "baz", "instance", "test-instance", "job", "test-service"), - t: timestamp.UnixMilli(), - v: 10.0, - }, - { - l: labels.FromStrings(model.MetricNameLabel, "test.gauge", "foo.bar", "baz", "instance", "test-instance", "job", "test-service"), - t: timestamp.UnixMilli(), - v: 10.0, - }, - { - l: labels.FromStrings(model.MetricNameLabel, "test.histogram_sum", "foo.bar", "baz", "instance", "test-instance", "job", "test-service"), - t: timestamp.UnixMilli(), - v: 30.0, - }, - { - l: labels.FromStrings(model.MetricNameLabel, "test.histogram_count", "foo.bar", "baz", "instance", "test-instance", "job", "test-service"), - t: timestamp.UnixMilli(), - v: 12.0, - }, - { - l: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "0"), - t: timestamp.UnixMilli(), - v: 2.0, - }, - { - l: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "1"), - t: timestamp.UnixMilli(), - v: 4.0, - }, - { - l: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "2"), - t: timestamp.UnixMilli(), - v: 6.0, - }, - { - l: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "3"), - t: timestamp.UnixMilli(), - v: 8.0, - }, - { - l: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "4"), - t: timestamp.UnixMilli(), - v: 10.0, - }, - { - l: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "5"), - t: timestamp.UnixMilli(), - v: 12.0, - }, - { - l: labels.FromStrings(model.MetricNameLabel, "test.histogram_bucket", "foo.bar", "baz", "instance", "test-instance", "job", "test-service", "le", "+Inf"), - t: timestamp.UnixMilli(), - v: 12.0, - }, - { - l: labels.FromStrings(model.MetricNameLabel, "target_info", "host.name", "test-host", "instance", "test-instance", "job", "test-service"), - t: timestamp.UnixMilli(), - v: 1.0, - }, - } - expectedHistograms := []mockHistogram{ - { - l: labels.FromStrings(model.MetricNameLabel, "test.exponential.histogram", "foo.bar", "baz", "instance", "test-instance", "job", "test-service"), - t: timestamp.UnixMilli(), - h: &histogram.Histogram{ - Schema: 2, - ZeroThreshold: 1e-128, - ZeroCount: 2, - Count: 10, - Sum: 30, - PositiveSpans: []histogram.Span{{Offset: 1, Length: 5}}, - PositiveBuckets: []int64{2, 0, 0, 0, 0}, - }, - }, - } - - expectedSamplesWithSTZero := make([]mockSample, 0, len(expectedSamples)*2-1) // All samples will get ST zero, except target_info. - for _, s := range expectedSamples { - if s.l.Get(model.MetricNameLabel) != "target_info" { - expectedSamplesWithSTZero = append(expectedSamplesWithSTZero, mockSample{ - l: s.l.Copy(), - t: startTime.UnixMilli(), - v: 0, - }) - } - expectedSamplesWithSTZero = append(expectedSamplesWithSTZero, s) - } - expectedHistogramsWithSTZero := make([]mockHistogram, 0, len(expectedHistograms)*2) - for _, s := range expectedHistograms { - if s.l.Get(model.MetricNameLabel) != "target_info" { - expectedHistogramsWithSTZero = append(expectedHistogramsWithSTZero, mockHistogram{ - l: s.l.Copy(), - t: startTime.UnixMilli(), - h: &histogram.Histogram{}, - }) - } - expectedHistogramsWithSTZero = append(expectedHistogramsWithSTZero, s) - } - - for _, testCase := range []struct { - name string - otlpOpts OTLPOptions - startTime time.Time - expectSTZero bool - expectedSamples []mockSample - expectedHistograms []mockHistogram - }{ - { - name: "IngestSTZero=false/startTime=0", - otlpOpts: OTLPOptions{ - IngestSTZeroSample: false, - }, - startTime: zeroTime, - expectedSamples: expectedSamples, - expectedHistograms: expectedHistograms, - }, - { - name: "IngestSTZero=true/startTime=0", - otlpOpts: OTLPOptions{ - IngestSTZeroSample: true, - }, - startTime: zeroTime, - expectedSamples: expectedSamples, - expectedHistograms: expectedHistograms, - }, - { - name: "IngestSTZero=false/startTime=ts-1ms", - otlpOpts: OTLPOptions{ - IngestSTZeroSample: false, - }, - startTime: startTime, - expectedSamples: expectedSamples, - expectedHistograms: expectedHistograms, - }, - { - name: "IngestSTZero=true/startTime=ts-1ms", - otlpOpts: OTLPOptions{ - IngestSTZeroSample: true, - }, - startTime: startTime, - expectedSamples: expectedSamplesWithSTZero, - expectedHistograms: expectedHistogramsWithSTZero, - }, - } { - t.Run(testCase.name, func(t *testing.T) { - exportRequest := generateOTLPWriteRequest(timestamp, testCase.startTime) - - buf, err := exportRequest.MarshalProto() - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - require.NoError(t, err) - req.Header.Set("Content-Type", "application/x-protobuf") - - log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelWarn})) - appendable := &mockAppendable{} - handler := NewOTLPWriteHandler(log, nil, appendable, func() config.Config { - return config.Config{ - OTLPConfig: config.OTLPConfig{ - TranslationStrategy: otlptranslator.NoTranslation, - }, - } - }, testCase.otlpOpts) - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - require.Equal(t, http.StatusOK, resp.StatusCode) - - for i, expect := range testCase.expectedSamples { - actual := appendable.samples[i] - require.True(t, labels.Equal(expect.l, actual.l), "sample labels,pos=%v", i) - require.Equal(t, expect.t, actual.t, "sample timestamp,pos=%v", i) - require.Equal(t, expect.v, actual.v, "sample value,pos=%v", i) - } - for i, expect := range testCase.expectedHistograms { - actual := appendable.histograms[i] - require.True(t, labels.Equal(expect.l, actual.l), "histogram labels,pos=%v", i) - require.Equal(t, expect.t, actual.t, "histogram timestamp,pos=%v", i) - require.True(t, expect.h.Equals(actual.h), "histogram value,pos=%v", i) - } - require.Len(t, appendable.samples, len(testCase.expectedSamples)) - require.Len(t, appendable.histograms, len(testCase.expectedHistograms)) - }) - } -} - func handleOTLP(t *testing.T, exportRequest pmetricotlp.ExportRequest, otlpCfg config.OTLPConfig, otlpOpts OTLPOptions) *teststorage.Appendable { t.Helper() @@ -608,9 +408,9 @@ func TestOTLPDelta(t *testing.T) { } want := []sample{ - {T: milli(0), L: ls, V: 0}, // +0 - {T: milli(1), L: ls, V: 1}, // +1 - {T: milli(2), L: ls, V: 3}, // +2 + {MF: "some_delta_total", M: metadata.Metadata{Type: model.MetricTypeGauge}, T: milli(0), L: ls, V: 0}, // +0 + {MF: "some_delta_total", M: metadata.Metadata{Type: model.MetricTypeGauge}, T: milli(1), L: ls, V: 1}, // +1 + {MF: "some_delta_total", M: metadata.Metadata{Type: model.MetricTypeGauge}, T: milli(2), L: ls, V: 3}, // +2 } if diff := cmp.Diff(want, appendable.ResultSamples(), cmp.Exporter(func(reflect.Type) bool { return true })); diff != "" { t.Fatal(diff) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 225ef6911d..8f2c848710 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -265,7 +265,7 @@ type API struct { func NewAPI( qe promql.QueryEngine, q storage.SampleAndChunkQueryable, - ap storage.Appendable, + ap storage.Appendable, apV2 storage.AppendableV2, eq storage.ExemplarQueryable, spsr func(context.Context) ScrapePoolsRetriever, tr func(context.Context) TargetRetriever, @@ -342,7 +342,7 @@ func NewAPI( a.statsRenderer = statsRenderer } - if ap == nil && (rwEnabled || otlpEnabled) { + if (ap == nil || apV2 == nil) && (rwEnabled || otlpEnabled) { panic("remote write or otlp write enabled, but no appender passed in.") } @@ -350,13 +350,11 @@ func NewAPI( a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs, stZeroIngestionEnabled, enableTypeAndUnitLabels, appendMetadata) } if otlpEnabled { - a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{ + a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, apV2, configFunc, remote.OTLPOptions{ ConvertDelta: otlpDeltaToCumulative, NativeDelta: otlpNativeDeltaIngestion, LookbackDelta: lookbackDelta, - IngestSTZeroSample: stZeroIngestionEnabled, EnableTypeAndUnitLabels: enableTypeAndUnitLabels, - AppendMetadata: appendMetadata, }) } diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index 850bedef17..6e123ac51c 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -134,7 +134,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable, overri api := NewAPI( engine, q, - nil, + nil, nil, nil, func(context.Context) ScrapePoolsRetriever { return &DummyScrapePoolsRetriever{} }, func(context.Context) TargetRetriever { return &DummyTargetRetriever{} }, diff --git a/web/api/v1/test_helpers.go b/web/api/v1/test_helpers.go index 2662b0c84b..2f84cd22d2 100644 --- a/web/api/v1/test_helpers.go +++ b/web/api/v1/test_helpers.go @@ -33,7 +33,7 @@ func newTestAPI(t *testing.T, cfg testhelpers.APIConfig) *testhelpers.APIWrapper api := NewAPI( params.QueryEngine, params.Queryable, - nil, // appendable + nil, nil, // appendables params.ExemplarQueryable, func(ctx context.Context) ScrapePoolsRetriever { return adaptScrapePoolsRetriever(params.ScrapePoolsRetriever(ctx)) diff --git a/web/web.go b/web/web.go index cb9258d87f..5d44cedd97 100644 --- a/web/web.go +++ b/web/web.go @@ -356,9 +356,12 @@ func New(logger *slog.Logger, o *Options) *Handler { factoryAr := func(context.Context) api_v1.AlertmanagerRetriever { return h.notifier } FactoryRr := func(context.Context) api_v1.RulesRetriever { return h.ruleManager } - var app storage.Appendable + var ( + app storage.Appendable + appV2 storage.AppendableV2 + ) if o.EnableRemoteWriteReceiver || o.EnableOTLPWriteReceiver { - app = h.storage + app, appV2 = h.storage, h.storage } version := "" @@ -366,7 +369,7 @@ func New(logger *slog.Logger, o *Options) *Handler { version = o.Version.Version } - h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, app, h.exemplarStorage, factorySPr, factoryTr, factoryAr, + h.apiV1 = api_v1.NewAPI(h.queryEngine, h.storage, app, appV2, h.exemplarStorage, factorySPr, factoryTr, factoryAr, func() config.Config { h.mtx.RLock() defer h.mtx.RUnlock()