prometheus/storage/remote/write_otlp_handler.go
refactor: switch OTLP handler to AppendableV2 (#17996)

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"time"

	deltatocumulative "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/processor"
	"go.opentelemetry.io/otel/metric/noop"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/storage"
	otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
)

type OTLPOptions struct {
	// Convert delta samples to their cumulative equivalent by aggregating in-memory.
	ConvertDelta bool
	// Store the raw delta samples as metrics with unknown type (we don't have a proper type for delta yet, therefore
	// marking the metric type as unknown for now).
	// We're in an early phase of implementing delta support (proposal: https://github.com/prometheus/proposals/pull/48/).
	NativeDelta bool
	// LookbackDelta is the query lookback delta.
	// Used to calculate the target_info sample timestamp interval.
	LookbackDelta time.Duration
	// Add type and unit labels to the metrics.
	EnableTypeAndUnitLabels bool
}
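
// A hedged illustration of option combinations (mirrors the validation in NewOTLPWriteHandler
// below; not part of the original file):
//
//	_ = OTLPOptions{NativeDelta: true, EnableTypeAndUnitLabels: true} // valid combination
//	_ = OTLPOptions{ConvertDelta: true, NativeDelta: true}           // NewOTLPWriteHandler panics on this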

// NewOTLPWriteHandler creates an http.Handler that accepts OTLP write requests and
// writes them to the provided appendable.
func NewOTLPWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appendable storage.AppendableV2, configFunc func() config.Config, opts OTLPOptions) http.Handler {
	if opts.NativeDelta && opts.ConvertDelta {
		// This should be validated when iterating through feature flags, so it is not expected to fail here.
		panic("cannot enable native delta ingestion and delta2cumulative conversion at the same time")
	}

	ex := &rwExporter{
		logger:                  logger,
		appendable:              newOTLPInstrumentedAppendable(reg, appendable),
		config:                  configFunc,
		allowDeltaTemporality:   opts.NativeDelta,
		lookbackDelta:           opts.LookbackDelta,
		enableTypeAndUnitLabels: opts.EnableTypeAndUnitLabels,
	}
	wh := &otlpWriteHandler{logger: logger, defaultConsumer: ex}

	if opts.ConvertDelta {
		fac := deltatocumulative.NewFactory()
		set := processor.Settings{
			ID:                component.NewID(fac.Type()),
			TelemetrySettings: component.TelemetrySettings{MeterProvider: noop.NewMeterProvider()},
		}
		d2c, err := fac.CreateMetrics(context.Background(), set, fac.CreateDefaultConfig(), wh.defaultConsumer)
		if err != nil {
			// fac.CreateMetrics directly calls [deltatocumulativeprocessor.createMetricsProcessor],
			// which only errors if:
			//   - cfg.(type) != *Config
			//   - telemetry.New fails due to bad set.TelemetrySettings
			//
			// Neither can be the case here, as we pass a valid *Config and valid TelemetrySettings,
			// so we assume this error never occurs. If it does, our assumptions are broken, in which
			// case a panic seems acceptable.
			panic(fmt.Errorf("failed to create metrics processor: %w", err))
		}
		if err := d2c.Start(context.Background(), nil); err != nil {
			// deltatocumulative does not error on start; see above for the panic reasoning.
			panic(err)
		}
		wh.d2cConsumer = d2c
	}
	return wh
}
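
// A minimal wiring sketch, assuming a hypothetical "tsdbAppendable" (any storage.AppendableV2)
// and "cfgFunc" (a func() config.Config); the endpoint path is illustrative and may differ per setup:
//
//	handler := NewOTLPWriteHandler(logger, prometheus.DefaultRegisterer, tsdbAppendable, cfgFunc,
//		OTLPOptions{ConvertDelta: true, LookbackDelta: 5 * time.Minute})
//	mux := http.NewServeMux()
//	mux.Handle("/api/v1/otlp/v1/metrics", handler)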

type rwExporter struct {
	logger                  *slog.Logger
	appendable              storage.AppendableV2
	config                  func() config.Config
	allowDeltaTemporality   bool
	lookbackDelta           time.Duration
	enableTypeAndUnitLabels bool
}

func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) (err error) {
	otlpCfg := rw.config().OTLPConfig
	app := &remoteWriteAppenderV2{
		AppenderV2: rw.appendable.AppenderV2(ctx),
		maxTime:    timestamp.FromTime(time.Now().Add(maxAheadTime)),
	}

	converter := otlptranslator.NewPrometheusConverter(app)
	annots, err := converter.FromMetrics(ctx, md, otlptranslator.Settings{
		AddMetricSuffixes:                    otlpCfg.TranslationStrategy.ShouldAddSuffixes(),
		AllowUTF8:                            !otlpCfg.TranslationStrategy.ShouldEscape(),
		PromoteResourceAttributes:            otlptranslator.NewPromoteResourceAttributes(otlpCfg),
		KeepIdentifyingResourceAttributes:    otlpCfg.KeepIdentifyingResourceAttributes,
		ConvertHistogramsToNHCB:              otlpCfg.ConvertHistogramsToNHCB,
		PromoteScopeMetadata:                 otlpCfg.PromoteScopeMetadata,
		AllowDeltaTemporality:                rw.allowDeltaTemporality,
		LookbackDelta:                        rw.lookbackDelta,
		EnableTypeAndUnitLabels:              rw.enableTypeAndUnitLabels,
		LabelNameUnderscoreSanitization:      otlpCfg.LabelNameUnderscoreSanitization,
		LabelNamePreserveMultipleUnderscores: otlpCfg.LabelNamePreserveMultipleUnderscores,
	})
	// The named return value is required: the deferred function below assigns to err so that
	// a failed Commit is reported to the caller.
	defer func() {
		if err != nil {
			_ = app.Rollback()
			return
		}
		err = app.Commit()
	}()

	ws, _ := annots.AsStrings("", 0, 0)
	if len(ws) > 0 {
		rw.logger.Warn("Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
	}
	return err
}

func (*rwExporter) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{MutatesData: false}
}

type otlpWriteHandler struct {
	logger *slog.Logger

	defaultConsumer consumer.Metrics // stores deltas as-is
	d2cConsumer     consumer.Metrics // converts deltas to cumulative
}

func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	req, err := DecodeOTLPWriteRequest(r)
	if err != nil {
		h.logger.Error("Error decoding OTLP write request", "err", err.Error())
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	md := req.Metrics()
	// If deltatocumulative conversion is enabled AND delta samples exist, use the slower conversion path.
	// While deltatocumulative can also accept cumulative metrics (and then just forwards them as-is), it currently
	// holds a sync.Mutex when entering ConsumeMetrics. This is slow and not necessary when ingesting cumulative metrics.
	if h.d2cConsumer != nil && hasDelta(md) {
		err = h.d2cConsumer.ConsumeMetrics(r.Context(), md)
	} else {
		// Otherwise use the default consumer (alongside cumulative samples, this will accept delta samples and write
		// them as-is if native delta support is enabled).
		err = h.defaultConsumer.ConsumeMetrics(r.Context(), md)
	}

	switch {
	case err == nil:
	case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrOutOfBounds), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
		// Indicate an out-of-order sample as a bad request to prevent retries.
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	default:
		h.logger.Error("Error appending remote write", "err", err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)
}
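
// A hedged client-side sketch of a request hitting this handler (endpoint URL and error handling
// are illustrative assumptions; pmetricotlp comes from go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp
// and is not imported by this file). A 400 response signals a non-retryable data error, 500 a retryable one.
//
//	exportReq := pmetricotlp.NewExportRequestFromMetrics(md)
//	body, _ := exportReq.MarshalProto()
//	resp, _ := http.Post(endpoint, "application/x-protobuf", bytes.NewReader(body))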

func hasDelta(md pmetric.Metrics) bool {
	for i := range md.ResourceMetrics().Len() {
		sms := md.ResourceMetrics().At(i).ScopeMetrics()
		for i := range sms.Len() {
			ms := sms.At(i).Metrics()
			for i := range ms.Len() {
				temporality := pmetric.AggregationTemporalityUnspecified
				m := ms.At(i)
				switch m.Type() {
				case pmetric.MetricTypeSum:
					temporality = m.Sum().AggregationTemporality()
				case pmetric.MetricTypeExponentialHistogram:
					temporality = m.ExponentialHistogram().AggregationTemporality()
				case pmetric.MetricTypeHistogram:
					temporality = m.Histogram().AggregationTemporality()
				}
				if temporality == pmetric.AggregationTemporalityDelta {
					return true
				}
			}
		}
	}
	return false
}

type otlpInstrumentedAppendable struct {
	storage.AppendableV2

	samplesAppendedWithoutMetadata prometheus.Counter
	outOfOrderExemplars            prometheus.Counter
}

// newOTLPInstrumentedAppendable instruments some OTLP metrics per append and
// handles partial errors, so the caller does not need to.
func newOTLPInstrumentedAppendable(reg prometheus.Registerer, app storage.AppendableV2) *otlpInstrumentedAppendable {
	return &otlpInstrumentedAppendable{
		AppendableV2: app,
		samplesAppendedWithoutMetadata: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Namespace: "prometheus",
			Subsystem: "api",
			Name:      "otlp_appended_samples_without_metadata_total",
			Help:      "The total number of samples ingested from OTLP without corresponding metadata.",
		}),
		outOfOrderExemplars: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Namespace: "prometheus",
			Subsystem: "api",
			Name:      "otlp_out_of_order_exemplars_total",
			Help:      "The total number of received OTLP exemplars which were rejected because they were out of order.",
		}),
	}
}

func (a *otlpInstrumentedAppendable) AppenderV2(ctx context.Context) storage.AppenderV2 {
	return &otlpInstrumentedAppender{
		AppenderV2:                     a.AppendableV2.AppenderV2(ctx),
		samplesAppendedWithoutMetadata: a.samplesAppendedWithoutMetadata,
		outOfOrderExemplars:            a.outOfOrderExemplars,
	}
}

type otlpInstrumentedAppender struct {
	storage.AppenderV2

	samplesAppendedWithoutMetadata prometheus.Counter
	outOfOrderExemplars            prometheus.Counter
}

func (app *otlpInstrumentedAppender) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
	ref, err := app.AppenderV2.Append(ref, ls, st, t, v, h, fh, opts)
	if err != nil {
		var partialErr *storage.AppendPartialError
		partialErr, hErr := partialErr.Handle(err)
		if hErr != nil {
			// Not a partial error, return err.
			return 0, err
		}
		app.outOfOrderExemplars.Add(float64(len(partialErr.ExemplarErrors)))
		// Hide the partial error, as the OTLP converter does not handle it.
	}
	if opts.Metadata.IsEmpty() {
		app.samplesAppendedWithoutMetadata.Inc()
	}
	return ref, nil
}