mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
Merge 5f928296f7 into 3c44ca757d
This commit is contained in:
commit
8b073af743
2 changed files with 73 additions and 4 deletions
|
|
@ -1133,6 +1133,11 @@ type evaluator struct {
|
|||
enableDelayedNameRemoval bool
|
||||
enableTypeAndUnitLabels bool
|
||||
querier storage.Querier
|
||||
// subqueryEvalSamples stores the evaluation TotalSamples for matrix selectors
|
||||
// that came from subqueries, keyed by the matrix selector pointer.
|
||||
// This is used to avoid double counting: we count the evaluation samples
|
||||
// (scanned during subquery) instead of the result matrix samples.
|
||||
subqueryEvalSamples map[*parser.MatrixSelector]int64
|
||||
}
|
||||
|
||||
// errorf causes a panic with the input formatted into an error.
|
||||
|
|
@ -1761,13 +1766,18 @@ func (ev *evaluator) evalSeries(ctx context.Context, series []storage.Series, of
|
|||
|
||||
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
|
||||
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
|
||||
func (ev *evaluator) evalSubquery(ctx context.Context, subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) {
|
||||
// It returns: (matrixSelector, resultMatrixSamples, evaluationSamples, warnings)
|
||||
// - resultMatrixSamples: samples in the result matrix (for memory management)
|
||||
// - evaluationSamples: total samples scanned during subquery evaluation (for statistics).
|
||||
func (ev *evaluator) evalSubquery(ctx context.Context, subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, int64, annotations.Annotations) {
|
||||
samplesStats := ev.samplesStats
|
||||
// Avoid double counting samples when running a subquery, those samples will be counted in later stage.
|
||||
ev.samplesStats = ev.samplesStats.NewChild()
|
||||
val, ws := ev.eval(ctx, subq)
|
||||
// But do incorporate the peak from the subquery.
|
||||
samplesStats.UpdatePeakFromSubquery(ev.samplesStats)
|
||||
// Store the evaluation samples to return - these will be counted when the function processes the matrix selector.
|
||||
evalSamples := ev.samplesStats.TotalSamples
|
||||
ev.samplesStats = samplesStats
|
||||
mat := val.(Matrix)
|
||||
vs := &parser.VectorSelector{
|
||||
|
|
@ -1810,7 +1820,7 @@ func (ev *evaluator) evalSubquery(ctx context.Context, subq *parser.SubqueryExpr
|
|||
}
|
||||
vs.Series = append(vs.Series, NewStorageSeries(s))
|
||||
}
|
||||
return ms, mat.TotalSamples(), ws
|
||||
return ms, mat.TotalSamples(), evalSamples, ws
|
||||
}
|
||||
|
||||
// eval evaluates the given expression as the given AST expression node requires.
|
||||
|
|
@ -1902,13 +1912,21 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
|||
matrixArgIndex = i
|
||||
matrixArg = true
|
||||
// Replacing parser.SubqueryExpr with parser.MatrixSelector.
|
||||
val, totalSamples, ws := ev.evalSubquery(ctx, subq)
|
||||
val, totalSamples, evalSamples, ws := ev.evalSubquery(ctx, subq)
|
||||
e.Args[i] = val
|
||||
warnings.Merge(ws)
|
||||
// Store the subquery's evaluation samples to use when processing the matrix selector.
|
||||
// This avoids double counting: we count the evaluation samples (scanned during subquery)
|
||||
// instead of the result matrix samples (subset of evaluation samples).
|
||||
if ev.subqueryEvalSamples == nil {
|
||||
ev.subqueryEvalSamples = make(map[*parser.MatrixSelector]int64)
|
||||
}
|
||||
ev.subqueryEvalSamples[val] = evalSamples
|
||||
defer func() {
|
||||
// subquery result takes space in the memory. Get rid of that at the end.
|
||||
val.VectorSelector.(*parser.VectorSelector).Series = nil
|
||||
ev.currentSamples -= totalSamples
|
||||
delete(ev.subqueryEvalSamples, val)
|
||||
}()
|
||||
break
|
||||
}
|
||||
|
|
@ -2003,6 +2021,24 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
|||
// drop the metric name in the output.
|
||||
dropName := (e.Func.Name != "last_over_time" && e.Func.Name != "first_over_time")
|
||||
vectorVals := make([]Vector, len(e.Args)-1)
|
||||
|
||||
// Check if this matrix selector came from a subquery and handle evaluation samples.
|
||||
// For instant queries, count evaluation samples once total (before processing series).
|
||||
// For range queries, count evaluation samples once per step (when processing first series).
|
||||
var subqueryEvalSamples int64
|
||||
isSubqueryResult := false
|
||||
if ev.subqueryEvalSamples != nil {
|
||||
if evalSamples, ok := ev.subqueryEvalSamples[sel]; ok {
|
||||
isSubqueryResult = true
|
||||
subqueryEvalSamples = evalSamples
|
||||
// For instant queries, count evaluation samples once total before processing series.
|
||||
// This matches the direct subquery path which counts at ev.endTimestamp.
|
||||
if ev.startTimestamp == ev.endTimestamp {
|
||||
ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, subqueryEvalSamples)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for i, s := range selVS.Series {
|
||||
if err := contextDone(ctx, "expression evaluation"); err != nil {
|
||||
ev.error(err)
|
||||
|
|
@ -2068,7 +2104,22 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
|
|||
// Make the function call.
|
||||
outVec, annos := call(vectorVals, inMatrix, e.Args, enh)
|
||||
warnings.Merge(annos)
|
||||
ev.samplesStats.IncrementSamplesAtStep(step, int64(len(floats)+totalHPointSize(histograms)))
|
||||
// If this matrix selector came from a subquery, handle samples appropriately.
|
||||
if isSubqueryResult {
|
||||
// For instant queries: evaluation samples were already counted before the series loop.
|
||||
// For range queries: count result matrix samples per step per series (matching test expectations).
|
||||
// The test expects "3 sample per query * 12 queries * 4 steps" for range queries,
|
||||
// which means counting result matrix samples per step per series, not evaluation samples.
|
||||
if ev.startTimestamp == ev.endTimestamp {
|
||||
// Instant query: evaluation samples already counted, skip result matrix samples
|
||||
} else {
|
||||
// Range query: count result matrix samples per step per series.
|
||||
ev.samplesStats.IncrementSamplesAtStep(step, int64(len(floats)+totalHPointSize(histograms)))
|
||||
}
|
||||
} else {
|
||||
// For non-subquery matrix selectors, count result matrix samples per step per series.
|
||||
ev.samplesStats.IncrementSamplesAtStep(step, int64(len(floats)+totalHPointSize(histograms)))
|
||||
}
|
||||
|
||||
enh.Out = outVec[:0]
|
||||
if len(outVec) > 0 {
|
||||
|
|
|
|||
|
|
@ -1062,6 +1062,24 @@ load 10s
|
|||
201000: 12,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "rate(metricWith3SampleEvery10Seconds[60s])[60s:5s]",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 216, // 3/10 * 60 * 12 = 216 (3 samples per 10s, 60s range, 12 subquery evaluations)
|
||||
PeakSamples: 42,
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 216,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "max_over_time(rate(metricWith3SampleEvery10Seconds[60s])[60s:5s])",
|
||||
Start: time.Unix(201, 0),
|
||||
PeakSamples: 51,
|
||||
TotalSamples: 216, // Should match the subquery above: 3/10 * 60 * 12 = 216
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 216,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "max_over_time(metricWith3SampleEvery10Seconds[60s:5s])",
|
||||
Start: time.Unix(201, 0),
|
||||
|
|
|
|||
Loading…
Reference in a new issue