mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
promql: Fix collision error with delayed name removal for non-overlapping series
Signed-off-by: Julien Pivotto <291750+roidelapluie@users.noreply.github.com>
This commit is contained in:
parent
c2b86775b6
commit
6efbb873c7
2 changed files with 104 additions and 5 deletions
|
|
@ -1175,7 +1175,7 @@ func (ev *evaluator) Eval(ctx context.Context, expr parser.Expr) (v parser.Value
|
|||
|
||||
v, ws = ev.eval(ctx, expr)
|
||||
if ev.enableDelayedNameRemoval {
|
||||
ev.cleanupMetricLabels(v)
|
||||
v = ev.cleanupMetricLabels(v)
|
||||
}
|
||||
return v, ws, nil
|
||||
}
|
||||
|
|
@ -3832,7 +3832,7 @@ func (*evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping []str
|
|||
return enh.Out, nil
|
||||
}
|
||||
|
||||
func (ev *evaluator) cleanupMetricLabels(v parser.Value) {
|
||||
func (ev *evaluator) cleanupMetricLabels(v parser.Value) parser.Value {
|
||||
if v.Type() == parser.ValueTypeMatrix {
|
||||
mat := v.(Matrix)
|
||||
for i := range mat {
|
||||
|
|
@ -3840,9 +3840,7 @@ func (ev *evaluator) cleanupMetricLabels(v parser.Value) {
|
|||
mat[i].Metric = mat[i].Metric.DropReserved(schema.IsMetadataLabel)
|
||||
}
|
||||
}
|
||||
if mat.ContainsSameLabelset() {
|
||||
ev.errorf("vector cannot contain metrics with the same labelset")
|
||||
}
|
||||
return ev.mergeSeriesWithSameLabelset(mat)
|
||||
} else if v.Type() == parser.ValueTypeVector {
|
||||
vec := v.(Vector)
|
||||
for i := range vec {
|
||||
|
|
@ -3853,7 +3851,98 @@ func (ev *evaluator) cleanupMetricLabels(v parser.Value) {
|
|||
if vec.ContainsSameLabelset() {
|
||||
ev.errorf("vector cannot contain metrics with the same labelset")
|
||||
}
|
||||
return vec
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
// mergeSeriesWithSameLabelset merges series in a matrix that have the same labelset
|
||||
// after __name__ label removal. This happens when delayed name removal is enabled and
|
||||
// operations like OR combine series that originally had different names but end up
|
||||
// with the same labelset after dropping the name. If series with the same labelset
|
||||
// have overlapping timestamps, an error is returned.
|
||||
func (ev *evaluator) mergeSeriesWithSameLabelset(mat Matrix) Matrix {
|
||||
if len(mat) <= 1 {
|
||||
return mat
|
||||
}
|
||||
|
||||
// Group series by their labelset hash.
|
||||
seriesByHash := make(map[uint64][]int)
|
||||
for i := range mat {
|
||||
hash := mat[i].Metric.Hash()
|
||||
seriesByHash[hash] = append(seriesByHash[hash], i)
|
||||
}
|
||||
|
||||
// Check if any merging is needed.
|
||||
needsMerge := false
|
||||
for _, indices := range seriesByHash {
|
||||
if len(indices) > 1 {
|
||||
needsMerge = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !needsMerge {
|
||||
return mat
|
||||
}
|
||||
|
||||
// Merge series with the same labelset.
|
||||
merged := make(Matrix, 0, len(seriesByHash))
|
||||
for _, indices := range seriesByHash {
|
||||
base := mat[indices[0]]
|
||||
|
||||
if len(indices) == 1 {
|
||||
// No collision, add as-is.
|
||||
merged = append(merged, base)
|
||||
continue
|
||||
}
|
||||
|
||||
// Multiple series with the same labelset - check for overlaps and merge.
|
||||
// Build a set of timestamps to detect overlaps.
|
||||
timestamps := make(map[int64]struct{}, len(base.Floats)+len(base.Histograms))
|
||||
for _, p := range base.Floats {
|
||||
timestamps[p.T] = struct{}{}
|
||||
}
|
||||
for _, p := range base.Histograms {
|
||||
timestamps[p.T] = struct{}{}
|
||||
}
|
||||
|
||||
// Merge remaining series, checking for timestamp overlaps.
|
||||
for _, idx := range indices[1:] {
|
||||
series := mat[idx]
|
||||
|
||||
// Check floats for overlaps.
|
||||
for _, p := range series.Floats {
|
||||
if _, exists := timestamps[p.T]; exists {
|
||||
ev.errorf("vector cannot contain metrics with the same labelset")
|
||||
}
|
||||
timestamps[p.T] = struct{}{}
|
||||
}
|
||||
// Check histograms for overlaps.
|
||||
for _, p := range series.Histograms {
|
||||
if _, exists := timestamps[p.T]; exists {
|
||||
ev.errorf("vector cannot contain metrics with the same labelset")
|
||||
}
|
||||
timestamps[p.T] = struct{}{}
|
||||
}
|
||||
|
||||
// No overlaps, merge the samples.
|
||||
base.Floats = append(base.Floats, series.Floats...)
|
||||
base.Histograms = append(base.Histograms, series.Histograms...)
|
||||
}
|
||||
|
||||
// Sort merged samples by timestamp.
|
||||
sort.Slice(base.Floats, func(i, j int) bool {
|
||||
return base.Floats[i].T < base.Floats[j].T
|
||||
})
|
||||
sort.Slice(base.Histograms, func(i, j int) bool {
|
||||
return base.Histograms[i].T < base.Histograms[j].T
|
||||
})
|
||||
|
||||
merged = append(merged, base)
|
||||
}
|
||||
|
||||
return merged
|
||||
}
|
||||
|
||||
func addToSeries(ss *Series, ts int64, f float64, h *histogram.FloatHistogram, numSteps int) {
|
||||
|
|
|
|||
|
|
@ -126,3 +126,13 @@ eval instant at 10m sum by (__name__) (metric_total{env="3"} or rate(metric_tota
|
|||
# Same as above, but with reversed order.
|
||||
eval instant at 10m sum by (__name__) (rate(metric_total{env="3"}[5m]) or metric_total{env="1"})
|
||||
metric_total 10
|
||||
|
||||
clear
|
||||
|
||||
# Test delayed name removal with range queries and OR operator.
|
||||
load 10m
|
||||
metric_a 1 _
|
||||
metric_b 3 4
|
||||
|
||||
eval range from 0 to 20m step 10m -metric_a or -metric_b
|
||||
{} -1 -4 _
|
||||
|
|
|
|||
Loading…
Reference in a new issue