mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (push) Waiting to run
CI / Build Prometheus for all architectures (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run
refactor(appenderV2)[PART5a]: add AppendableV2 support to scrape loop + tests
416 lines
14 KiB
Go
416 lines
14 KiB
Go
// Copyright The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package scrape
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"slices"
|
|
"time"
|
|
|
|
"github.com/prometheus/common/model"
|
|
|
|
"github.com/prometheus/prometheus/model/exemplar"
|
|
"github.com/prometheus/prometheus/model/histogram"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
"github.com/prometheus/prometheus/model/textparse"
|
|
"github.com/prometheus/prometheus/model/timestamp"
|
|
"github.com/prometheus/prometheus/model/value"
|
|
"github.com/prometheus/prometheus/storage"
|
|
)
|
|
|
|
// appenderWithLimits returns an appender with additional validation.
|
|
func appenderV2WithLimits(app storage.AppenderV2, sampleLimit, bucketLimit int, maxSchema int32) storage.AppenderV2 {
|
|
app = &timeLimitAppenderV2{
|
|
AppenderV2: app,
|
|
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
|
}
|
|
|
|
// The sampleLimit is applied after metrics are potentially dropped via relabeling.
|
|
if sampleLimit > 0 {
|
|
app = &limitAppenderV2{
|
|
AppenderV2: app,
|
|
limit: sampleLimit,
|
|
}
|
|
}
|
|
|
|
if bucketLimit > 0 {
|
|
app = &bucketLimitAppenderV2{
|
|
AppenderV2: app,
|
|
limit: bucketLimit,
|
|
}
|
|
}
|
|
|
|
if maxSchema < histogram.ExponentialSchemaMax {
|
|
app = &maxSchemaAppenderV2{
|
|
AppenderV2: app,
|
|
maxSchema: maxSchema,
|
|
}
|
|
}
|
|
|
|
return app
|
|
}
|
|
|
|
func (sl *scrapeLoop) updateStaleMarkersV2(app storage.AppenderV2, defTime int64) (err error) {
|
|
sl.cache.forEachStale(func(ref storage.SeriesRef, lset labels.Labels) bool {
|
|
// Series no longer exposed, mark it stale.
|
|
_, err = app.Append(ref, lset, 0, defTime, math.Float64frombits(value.StaleNaN), nil, nil, storage.AOptions{RejectOutOfOrder: true})
|
|
switch {
|
|
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
|
|
// Do not count these in logging, as this is expected if a target
|
|
// goes away and comes back again with a new scrape loop.
|
|
err = nil
|
|
}
|
|
return err == nil
|
|
})
|
|
return err
|
|
}
|
|
|
|
// scrapeLoopAppenderV2 pairs a scrapeLoop with a storage.AppenderV2. Both are
// embedded, so the loop's fields (cache, limits, logger, ...) and the
// appender's methods (e.g. Append) are promoted onto this type. Its append and
// addReportSample methods write samples through the embedded AppenderV2.
type scrapeLoopAppenderV2 struct {
	*scrapeLoop

	storage.AppenderV2
}

// Compile-time check that scrapeLoopAppenderV2 implements scrapeLoopAppendAdapter.
var _ scrapeLoopAppendAdapter = &scrapeLoopAppenderV2{}
|
|
|
|
// append parses the scrape body b with a textparse parser chosen from
// contentType (falling back to sl.fallbackScrapeProtocol). It appends every
// float sample and histogram through the embedded AppenderV2, wrapped by
// appenderV2WithLimits. Samples without an honored explicit timestamp use ts.
//
// TYPE/HELP/UNIT entries update metadata in the scrape cache. When
// sl.appendMetadataToWAL is set, that metadata and the metric family name are
// attached to each append. After a successful parse, StaleNaN markers are
// appended for series that disappeared since the previous scrape.
//
// Return values:
//   - total: sample/histogram entries read from the body, including ones
//     later dropped by relabeling;
//   - added: entries that were not dropped by relabeling, including
//     duplicates and samples rejected with errors checkAddError tolerates;
//   - seriesAdded: previously uncached series that were appended and cached
//     while no sample or bucket limit error had been recorded;
//   - err: the error that aborted the loop, otherwise the sample-limit error,
//     otherwise the bucket-limit error, otherwise any stale-marker error.
//
// An empty body only writes stale markers (through the unwrapped appender) and
// swaps the cache without flushing it. If no parser can be created, the error
// is logged and returned without touching the cache. For a non-empty body, the
// cache is flushed and swapped only when the function returns with err == nil.
func (sl *scrapeLoopAppenderV2) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
	defTime := timestamp.FromTime(ts)

	if len(b) == 0 {
		// Empty scrape. Just update the stale markers and swap the cache (but don't flush it).
		err = sl.updateStaleMarkersV2(sl.AppenderV2, defTime)
		sl.cache.iterDone(false)
		return total, added, seriesAdded, err
	}

	p, err := textparse.New(b, contentType, sl.symbolTable, textparse.ParserOptions{
		EnableTypeAndUnitLabels:                 sl.enableTypeAndUnitLabels,
		IgnoreNativeHistograms:                  !sl.enableNativeHistogramScraping,
		ConvertClassicHistogramsToNHCB:          sl.convertClassicHistToNHCB,
		KeepClassicOnClassicAndNativeHistograms: sl.alwaysScrapeClassicHist,
		OpenMetricsSkipSTSeries:                 sl.enableSTZeroIngestion,
		FallbackContentType:                     sl.fallbackScrapeProtocol,
	})
	if p == nil {
		sl.l.Error(
			"Failed to determine correct type of scrape target.",
			"content_type", contentType,
			"fallback_media_type", sl.fallbackScrapeProtocol,
			"err", err,
		)
		return total, added, seriesAdded, err
	}
	// A parser was returned together with an error: textparse fell back to
	// another format. Log it and keep going with the fallback parser.
	if err != nil {
		sl.l.Debug(
			"Invalid content type on scrape, using fallback setting.",
			"content_type", contentType,
			"fallback_media_type", sl.fallbackScrapeProtocol,
			"err", err,
		)
	}
	var (
		appErrs        = appendErrors{}
		sampleLimitErr error
		bucketLimitErr error
		lset           labels.Labels     // Escapes to heap so hoisted out of loop.
		e              exemplar.Exemplar // Escapes to heap so hoisted out of loop.
		lastMeta       *metaEntry
		lastMFName     []byte
	)

	exemplars := make([]exemplar.Exemplar, 0, 1)

	// Take an appender with limits.
	app := appenderV2WithLimits(sl.AppenderV2, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)

	// err is the named result, so this observes the final error value.
	defer func() {
		if err != nil {
			return
		}
		// Flush and swap the cache as the scrape was non-empty.
		sl.cache.iterDone(true)
	}()

loop:
	for {
		var (
			et                       textparse.Entry
			sampleAdded, isHistogram bool
			met                      []byte
			parsedTimestamp          *int64
			val                      float64
			h                        *histogram.Histogram
			fh                       *histogram.FloatHistogram
		)
		if et, err = p.Next(); err != nil {
			// io.EOF is the normal end of the body, not an error.
			if errors.Is(err, io.EOF) {
				err = nil
			}
			break
		}
		switch et {
		// TODO(bwplotka): Consider changing the parser to return metadata all at once instead of type, help and unit separately, ideally on `Series()`/`Histogram()`;
		// otherwise we can expose metadata without series on the metadata API.
		case textparse.EntryType:
			// TODO(bwplotka): Build the meta entry directly instead of locking and updating the map. This will
			// allow properly updating metadata when, e.g., a unit was added and then removed.
			lastMFName, lastMeta = sl.cache.setType(p.Type())
			continue
		case textparse.EntryHelp:
			lastMFName, lastMeta = sl.cache.setHelp(p.Help())
			continue
		case textparse.EntryUnit:
			lastMFName, lastMeta = sl.cache.setUnit(p.Unit())
			continue
		case textparse.EntryComment:
			continue
		case textparse.EntryHistogram:
			isHistogram = true
		default:
		}
		total++

		t := defTime
		if isHistogram {
			met, parsedTimestamp, h, fh = p.Histogram()
		} else {
			met, parsedTimestamp, val = p.Series()
		}
		// Exposed timestamps are ignored unless sl.honorTimestamps is set;
		// in that case t stays at defTime.
		if !sl.honorTimestamps {
			parsedTimestamp = nil
		}
		if parsedTimestamp != nil {
			t = *parsedTimestamp
		}

		// Series previously dropped by relabeling are skipped without re-running relabeling.
		if sl.cache.getDropped(met) {
			continue
		}
		ce, seriesCached, seriesAlreadyScraped := sl.cache.get(met)
		var (
			ref  storage.SeriesRef
			hash uint64
		)

		if seriesCached {
			ref = ce.ref
			lset = ce.lset
			hash = ce.hash
		} else {
			p.Labels(&lset)
			// Hash the label set as it is seen locally on the target. Then add target
			// labels, apply relabeling and store the final label set.
			hash = lset.Hash()

			lset = sl.sampleMutator(lset)

			// The label set may be set to empty to indicate dropping.
			if lset.IsEmpty() {
				sl.cache.addDropped(met)
				continue
			}

			if !lset.Has(model.MetricNameLabel) {
				err = errNameLabelMandatory
				break loop
			}
			if !lset.IsValid(sl.validationScheme) {
				err = fmt.Errorf("invalid metric name or label names: %s", lset.String())
				break loop
			}

			// If any label limit is exceeded the scrape should fail.
			if err = verifyLabelLimits(lset, sl.labelLimits); err != nil {
				sl.metrics.targetScrapePoolExceededLabelLimits.Inc()
				break loop
			}
		}

		exemplars = exemplars[:0] // Reset and reuse the exemplar slice.

		if seriesAlreadyScraped && parsedTimestamp == nil {
			// The same series already appeared earlier in this scrape without an
			// explicit timestamp. Don't append; let checkAddError account for it
			// as a duplicate.
			err = storage.ErrDuplicateSampleForTimestamp
		} else {
			// Double check we don't append float 0 for the
			// histogram case where the parser returns bad data.
			// This can only happen when the parser has a bug.
			if isHistogram && h == nil && fh == nil {
				err = fmt.Errorf("parser returned nil histogram/float histogram for a histogram entry type for %v series; parser bug; aborting", lset.String())
				break loop
			}

			st := int64(0)
			if sl.enableSTZeroIngestion {
				// p.StartTimestamp() tends to be expensive (e.g. OM1). Do it only if we care.
				st = p.StartTimestamp()
			}

			for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) {
				if !e.HasTs {
					if isHistogram {
						// We drop exemplars for native histograms if they don't have a timestamp.
						// Missing timestamps are deliberately not supported as we want to start
						// enforcing timestamps for exemplars as otherwise proper deduplication
						// is inefficient and purely based on heuristics: we cannot distinguish
						// between repeated exemplars and new instances with the same values.
						// This is done silently without logs as it is not an error but out of spec.
						// This does not affect classic histograms so that behaviour is unchanged.
						e = exemplar.Exemplar{} // Reset for the next fetch.
						continue
					}
					// Exemplars of non-histogram entries without a timestamp inherit the sample timestamp.
					e.Ts = t
				}
				exemplars = append(exemplars, e)
				e = exemplar.Exemplar{} // Reset for the next fetch.
			}

			// Prepare append call.
			appOpts := storage.AOptions{}
			if len(exemplars) > 0 {
				// Sort so that checking for duplicates / out of order is more efficient during validation.
				slices.SortFunc(exemplars, exemplar.Compare)
				appOpts.Exemplars = exemplars
			}

			// The metadata path mimics the scrape appender V1 flow. Once only the v2 flow
			// remains, we should rename the "appendMetadataToWAL" flag to "passMetadata", because for the v2 flow
			// the metadata storage detail is behind the appendableV2 contract. V2 also means we always pass the metadata;
			// we don't check if it changed (that code can be removed).
			//
			// Long term, we should always attach the metadata without any flag. Unfortunately, because of limitations
			// of the text format and OpenMetrics 1.0 (hopefully fixed in OpenMetrics 2.0), there are edge cases around unknown
			// metadata + suffixes that are expensive (isSeriesPartOfFamily) or in some cases impossible to detect. For this
			// reason, the appender V2 scrape flow with metadata (appendMetadataToWAL=true) might take ~3% more CPU in our benchmarks.
			//
			// TODO(https://github.com/prometheus/prometheus/issues/17900): Optimize this, notably move this check to parsers that require it (ensuring the parser
			// interface always yields correct metadata), and deliver OpenMetrics 2.0, which removes suffixes.
			if sl.appendMetadataToWAL && lastMeta != nil {
				// In most cases we can trust that the current series/histogram matches lastMeta and lastMFName.
				// However, optional TYPE etc. metadata and broken OM text can break this; detect those cases here.
				if !isSeriesPartOfFamily(lset.Get(model.MetricNameLabel), lastMFName, lastMeta.Type) {
					lastMeta = nil // Don't pass knowingly broken metadata, now, nor on the next line.
				}
				if lastMeta != nil {
					// Metric family name has the same source as metadata.
					appOpts.MetricFamilyName = yoloString(lastMFName)
					appOpts.Metadata = lastMeta.Metadata
				}
			}

			// Append sample to the storage.
			ref, err = app.Append(ref, lset, st, t, val, h, fh, appOpts)
		}
		// checkAddError classifies err: errors it tolerates come back as nil
		// (possibly recording sample/bucket limit errors and counters in
		// appErrs); anything it returns aborts the scrape.
		sampleAdded, err = sl.checkAddError(met, exemplars, err, &sampleLimitErr, &bucketLimitErr, &appErrs)
		if err != nil {
			if !errors.Is(err, storage.ErrNotFound) {
				sl.l.Debug("Unexpected error", "series", string(met), "err", err)
			}
			break loop
		}
		if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
			sl.cache.trackStaleness(ce.ref, ce)
		}

		// If the series wasn't cached (is new, not seen on the previous scrape) we need to add it to the scrape cache.
		// But we only do this for series that were appended to TSDB without errors.
		// If a series was new, but we didn't append it due to sample_limit or other errors then we don't need
		// it in the scrape cache because we don't need to emit StaleNaNs for it when it disappears.
		if !seriesCached && sampleAdded {
			ce = sl.cache.addRef(met, ref, lset, hash)
			if ce != nil && (parsedTimestamp == nil || sl.trackTimestampsStaleness) {
				// Bypass staleness logic if there is an explicit timestamp.
				// But make sure we only do this if we have a cache entry (ce) for our series.
				sl.cache.trackStaleness(ref, ce)
			}
			if sampleLimitErr == nil && bucketLimitErr == nil {
				seriesAdded++
			}
		}

		// Increment added even if there's an error so we correctly report the
		// number of samples remaining after relabeling.
		// We still report duplicated samples here since this number should be the exact number
		// of time series exposed on a scrape after relabelling.
		added++
	}
	if sampleLimitErr != nil {
		if err == nil {
			err = sampleLimitErr
		}
		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
		sl.metrics.targetScrapeSampleLimit.Inc()
	}
	if bucketLimitErr != nil {
		if err == nil {
			err = bucketLimitErr // If sample limit is hit, that error takes precedence.
		}
		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
		sl.metrics.targetScrapeNativeHistogramBucketLimit.Inc()
	}
	if appErrs.numOutOfOrder > 0 {
		sl.l.Warn("Error on ingesting out-of-order samples", "num_dropped", appErrs.numOutOfOrder)
	}
	if appErrs.numDuplicates > 0 {
		sl.l.Warn("Error on ingesting samples with different value but same timestamp", "num_dropped", appErrs.numDuplicates)
	}
	if appErrs.numOutOfBounds > 0 {
		sl.l.Warn("Error on ingesting samples that are too old or are too far into the future", "num_dropped", appErrs.numOutOfBounds)
	}
	if appErrs.numExemplarOutOfOrder > 0 {
		sl.l.Warn("Error on ingesting out-of-order exemplars", "num_dropped", appErrs.numExemplarOutOfOrder)
	}
	// Stale markers are only written for scrapes that completed without error.
	if err == nil {
		err = sl.updateStaleMarkersV2(app, defTime)
	}
	return total, added, seriesAdded, err
}
|
|
|
|
func (sl *scrapeLoopAppenderV2) addReportSample(s reportSample, t int64, v float64, b *labels.Builder, rejectOOO bool) (err error) {
|
|
ce, ok, _ := sl.cache.get(s.name)
|
|
var ref storage.SeriesRef
|
|
var lset labels.Labels
|
|
if ok {
|
|
ref = ce.ref
|
|
lset = ce.lset
|
|
} else {
|
|
// The constants are suffixed with the invalid \xff unicode rune to avoid collisions
|
|
// with scraped metrics in the cache.
|
|
// We have to drop it when building the actual metric.
|
|
b.Reset(labels.EmptyLabels())
|
|
b.Set(model.MetricNameLabel, string(s.name[:len(s.name)-1]))
|
|
lset = sl.reportSampleMutator(b.Labels())
|
|
}
|
|
|
|
ref, err = sl.Append(ref, lset, 0, t, v, nil, nil, storage.AOptions{
|
|
MetricFamilyName: yoloString(s.name),
|
|
Metadata: s.Metadata,
|
|
RejectOutOfOrder: rejectOOO,
|
|
})
|
|
switch {
|
|
case err == nil:
|
|
if !ok {
|
|
sl.cache.addRef(s.name, ref, lset, lset.Hash())
|
|
}
|
|
return nil
|
|
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
|
|
// Do not log here, as this is expected if a target goes away and comes back
|
|
// again with a new scrape loop.
|
|
return nil
|
|
default:
|
|
return err
|
|
}
|
|
}
|