This commit is contained in:
Bartlomiej Plotka 2026-02-03 17:17:17 +01:00 committed by GitHub
commit c241f0ac76
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 6985 additions and 4 deletions

View file

@ -75,7 +75,7 @@ func (e *BinaryExpr) Pretty(level int) string {
returnBool = " bool"
}
matching := e.getMatchingStr()
matching := e.GetMatchingStr()
return fmt.Sprintf("%s\n%s%s%s%s\n%s", e.LHS.Pretty(level+1), indent(level), e.Op, returnBool, matching, e.RHS.Pretty(level+1))
}

View file

@ -137,15 +137,15 @@ func (node *BinaryExpr) returnBool() string {
}
func (node *BinaryExpr) String() string {
matching := node.getMatchingStr()
matching := node.GetMatchingStr()
return node.LHS.String() + " " + node.Op.String() + node.returnBool() + matching + " " + node.RHS.String()
}
func (node *BinaryExpr) ShortString() string {
return node.Op.String() + node.returnBool() + node.getMatchingStr()
return node.Op.String() + node.returnBool() + node.GetMatchingStr()
}
func (node *BinaryExpr) getMatchingStr() string {
func (node *BinaryExpr) GetMatchingStr() string {
matching := ""
var b bytes.Buffer
vm := node.VectorMatching

223
promql/pipe.go Normal file
View file

@ -0,0 +1,223 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promql
import (
"bytes"
"fmt"
"maps"
"slices"
"strconv"
"strings"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
)
// addVar interns value into vars and returns the variable name bound to it.
// Equal values share a single variable; an unseen value is assigned the next
// sequential "xN" name and recorded in vars.
func addVar(vars map[string]string, value string) (varName string) {
	// TODO: Naive, fix.
	// Best-effort unwrapping: drop one leading "(" and, if present, one
	// trailing ")".
	if inner, found := strings.CutPrefix(value, "("); found {
		value = strings.TrimSuffix(inner, ")")
	}
	// TODO: This is naive. We can do more with partial searches and "compacting" of variables
	// e.g. when adding looping_time{group_name="realtime",location="us-east1"}[2m]
	// to a vars with looping_time{group_name="realtime",location="us-east1"}[2m] | rate | sum | histogram_sum
	// we could have simpler variables.
	for existing, stored := range vars {
		if stored == value {
			return existing
		}
	}
	varName = fmt.Sprintf("x%d", len(vars)+1)
	vars[varName] = value
	return varName
}
// ToPiped transforms a standard PromQL query string into the piped syntax.
// If rendering hoisted any sub-expressions into variables, the result is
// wrapped in a "let <name> = <expr> ... in <body>" prelude, with variables
// listed in sorted-name order for determinism.
func ToPiped(query string) (string, error) {
	expr, err := parser.ParseExpr(query)
	if err != nil {
		return "", err
	}
	vars := map[string]string{}
	body := bytes.NewBuffer(nil)
	printPipedWithVars(expr, body, vars)
	if len(vars) == 0 {
		// No hoisted variables; the rendered expression stands alone.
		return body.String(), nil
	}
	out := bytes.NewBuffer(nil)
	out.WriteString("let\n")
	for _, name := range slices.Sorted(maps.Keys(vars)) {
		out.WriteString(" " + name + " = " + vars[name] + "\n")
	}
	out.WriteString("in ")
	out.Write(body.Bytes())
	return out.String(), nil
}
// stringPipedWithVars renders node in piped syntax, accumulating any hoisted
// sub-expressions into vars, and returns the rendered text.
func stringPipedWithVars(node parser.Node, vars map[string]string) string {
	var buf bytes.Buffer
	printPipedWithVars(node, &buf, vars)
	return buf.String()
}
// writeLabels writes ss to b as a comma-separated list, quoting any entry
// that is not a valid legacy metric/label name.
func writeLabels(b *bytes.Buffer, ss []string) {
	for i, name := range ss {
		if i != 0 {
			b.WriteString(", ")
		}
		if model.LegacyValidation.IsValidMetricName(name) {
			b.WriteString(name)
			continue
		}
		// Quote in place without an intermediate allocation.
		b.Write(strconv.AppendQuote(b.AvailableBuffer(), name))
	}
}
// printPipedWithVars writes the piped-syntax rendering of node to b.
// Sub-expressions that must become standalone operands (e.g. both sides of a
// vector-vector binary operation, or a second complex call argument) are
// hoisted into vars via addVar and referenced by their generated "xN" names.
// Panics on node types it does not know how to render.
func printPipedWithVars(node parser.Node, b *bytes.Buffer, vars map[string]string) {
	switch n := node.(type) {
	case *parser.EvalStmt:
		// Unwrap the statement and render its expression.
		printPipedWithVars(n.Expr, b, vars)
	case parser.Expressions:
		for _, e := range n {
			printPipedWithVars(e, b, vars)
		}
	case *parser.AggregateExpr:
		// Render as "<inner> | <op>[(param)] [by/without (labels)]".
		b.WriteString(stringPipedWithVars(n.Expr, vars))
		b.WriteString(" | ")
		b.WriteString(n.Op.String())
		if n.Op.IsAggregatorWithParam() {
			b.WriteString("(")
			b.WriteString(n.Param.String())
			b.WriteString(")")
		}
		switch {
		case n.Without:
			b.WriteString(" without (")
			writeLabels(b, n.Grouping)
			b.WriteString(") ")
		case len(n.Grouping) > 0:
			b.WriteString(" by (")
			writeLabels(b, n.Grouping)
			b.WriteString(") ")
		}
	case *parser.BinaryExpr:
		var lhs, rhs string
		switch {
		case n.LHS.Type() == parser.ValueTypeScalar && n.RHS.Type() == parser.ValueTypeScalar:
			// Two scalars.
			lhs = n.LHS.String()
			rhs = n.RHS.String()
		case n.LHS.Type() != parser.ValueTypeScalar && n.RHS.Type() != parser.ValueTypeScalar:
			// Vector op vector: each side must be a standalone operand. If
			// rendering a side created no new variables, hoist that side's
			// whole rendering into a variable so the binary op reads "x1 * x2".
			pre := len(vars)
			lhs = stringPipedWithVars(n.LHS, vars)
			diff := len(vars) - pre
			// This is hacky, might be not very true for nested things.
			if diff == 0 {
				lhs = addVar(vars, lhs)
			}
			pre = len(vars)
			rhs = stringPipedWithVars(n.RHS, vars)
			diff = len(vars) - pre
			// This is hacky, might be not very true for nested things.
			if diff == 0 {
				rhs = addVar(vars, rhs)
			}
		case n.LHS.Type() == parser.ValueTypeScalar:
			// With pipe syntax we organize simpler form to the right.
			// NOTE: this swaps operand order; presumably acceptable only for
			// the operators appearing in the test corpus — confirm for
			// non-commutative ops like "-" and "/".
			lhs = stringPipedWithVars(n.RHS, vars)
			rhs = n.LHS.String()
		case n.RHS.Type() == parser.ValueTypeScalar:
			// With pipe syntax we organize simpler form to the right.
			lhs = stringPipedWithVars(n.LHS, vars)
			rhs = n.RHS.String()
		}
		b.WriteString(lhs)
		b.WriteString(" ")
		b.WriteString(n.Op.String())
		if n.ReturnBool {
			b.WriteString(" bool")
		}
		b.WriteString(n.GetMatchingStr())
		b.WriteString(" ")
		b.WriteString(rhs)
	case *parser.Call:
		var (
			// args collects scalar/string arguments rendered verbatim.
			args = bytes.NewBuffer(nil)
			// lhs holds the latest complex (vector-like) argument, which
			// becomes the piped input: "<lhs> | func(args)".
			lhs string
		)
		if len(n.Args) > 0 {
			for _, e := range n.Args {
				if e.Type() == parser.ValueTypeScalar || e.Type() == parser.ValueTypeString {
					if args.Len() > 0 {
						args.WriteString(", ")
					}
					args.WriteString(e.String())
					continue
				}
				if lhs != "" {
					// More than one complex arg (e.g. info function).
					// TODO: This is YOLO, one could think if there's a more readable way..
					if args.Len() > 0 {
						args.WriteString(", ")
					}
					args.WriteString(addVar(vars, lhs))
				}
				lhs = stringPipedWithVars(e, vars)
			}
		}
		if lhs != "" {
			b.WriteString(lhs)
			b.WriteString(" | ")
		}
		b.WriteString(n.Func.Name)
		if args.Len() > 0 {
			b.WriteString("(")
			b.WriteString(args.String())
			b.WriteString(")")
		}
	case *parser.SubqueryExpr:
		// NOTE(review): only the inner expression is rendered; the subquery
		// range/step ("[5m:1m]") is dropped here — confirm this is intended.
		b.WriteString(stringPipedWithVars(n.Expr, vars))
	case *parser.ParenExpr:
		b.WriteString("(")
		b.WriteString(stringPipedWithVars(n.Expr, vars))
		b.WriteString(")")
	case *parser.UnaryExpr:
		// The remaining node kinds fall back to their standard PromQL string form.
		b.WriteString(n.String())
	case *parser.MatrixSelector:
		b.WriteString(n.String())
	case *parser.StepInvariantExpr:
		b.WriteString(n.String())
	case *parser.NumberLiteral, *parser.StringLiteral, *parser.VectorSelector:
		b.WriteString(n.String())
	default:
		panic(fmt.Errorf("promql.printPiped: unhandled node type %T", node))
	}
}

View file

@ -0,0 +1,136 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build ignore
package main
import (
"bytes"
"embed"
"go/format"
"io/fs"
"iter"
"log"
"maps"
"os"
"regexp"
"slices"
"strings"
"text/template"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
)
// testsFs embeds the promqltest data files so the generator can locate the
// eval queries regardless of working directory.
//go:embed promqltest/testdata
var testsFs embed.FS
// querySeq returns an iterator that yields all query strings found in the test file content.
func findQueries(testFile string) iter.Seq[string] {
evalRegex := regexp.MustCompile(`^(\s*eval\s+(?:instant at \d+m?|range from \d+m? to \d+m? step \d+m?) )(.*)$`)
return func(yield func(string) bool) {
lines := strings.Split(testFile, "\n")
for _, line := range lines {
matches := evalRegex.FindStringSubmatch(line)
// Group 2 is the query part
if len(matches) == 3 {
if !yield(matches[2]) {
return
}
}
}
}
}
// queryCase describes one generated translation case: the test file the
// query was found in, the original query, and its piped rendering(s).
type queryCase struct {
	Source, Query, Piped, SinglePiped string
}
// main scans every embedded promqltest data file for eval queries, converts
// each to piped syntax (best effort: conversion errors are embedded verbatim
// so a human can fix them by hand), and regenerates pipe_cases_test.go.
func main() {
	parser.EnableExperimentalFunctions = true
	parser.ExperimentalDurationExpr = true
	parser.EnableExtendedRangeSelectors = true

	files, err := fs.Glob(testsFs, "promqltest/testdata/*.test")
	if err != nil {
		log.Fatal(err)
	}
	// Keyed by query to deduplicate across files; a later file overwrites the
	// recorded Source for a repeated query.
	queries := map[string]queryCase{}
	for _, fn := range files {
		content, err := fs.ReadFile(testsFs, fn)
		if err != nil {
			log.Fatal(err)
		}
		for query := range findQueries(string(content)) {
			piped, err := promql.ToPiped(query)
			if err != nil {
				// Best effort, caller will fix it.
				piped = err.Error()
			}
			queries[query] = queryCase{Source: fn, Query: query, Piped: piped}
		}
	}
	// Deterministic output order: by source file, then by query text.
	sortedQueries := slices.SortedFunc(maps.Values(queries), func(a, b queryCase) int {
		if c := strings.Compare(a.Source, b.Source); c != 0 {
			return c
		}
		return strings.Compare(a.Query, b.Query)
	})
	// Generate code.
	var buf bytes.Buffer
	err = testFile.Execute(&buf, struct {
		Queries []queryCase
	}{
		Queries: sortedQueries,
	})
	if err != nil {
		log.Fatalf("executing template: %v", err)
	}
	// Format it.
	result, err := format.Source(buf.Bytes())
	if err != nil {
		// Typo fixed: was "formating".
		log.Fatalf("formatting code: %v", err)
	}
	// Write it to a file.
	if err := os.WriteFile("pipe_cases_test.go", result, 0o644); err != nil {
		log.Fatalf("writing file: %v", err)
	}
}
// testFile is the template for the generated pipe_cases_test.go: a pipedCase
// slice literal with one entry per query, each using backtick-quoted raw
// strings (hence the concatenated "`" pieces below). The output is passed
// through go/format before being written, so template whitespace is cosmetic.
var testFile = template.Must(template.New("testFile").Parse(`// Code generated by pipe_cases_test.gen.go; DO NOT EDIT.
//go:generate go run pipe_cases_test.gen.go
package promql_test
type pipedCase struct {
query, piped string
}
var pipedCases = []pipedCase{
{{ range .Queries -}}
{
// Source: {{ .Source }}
query: ` + "`" + `{{ .Query }}` + "`" + `,
piped: ` + "`" + `{{ .Piped }}` + "`" + `,
},
{{end}}
}
`))

6545
promql/pipe_cases_test.go Normal file

File diff suppressed because it is too large Load diff

77
promql/pipe_test.go Normal file
View file

@ -0,0 +1,77 @@
// Copyright 2025 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package promql_test
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
)
// TestToPiped checks the PromQL→piped translation against both the
// hand-written cases and the generated corpus in pipe_cases_test.go.
// The two previously duplicated loops are merged into one.
func TestToPiped(t *testing.T) {
	t.Cleanup(func() {
		parser.EnableExperimentalFunctions = false
		parser.ExperimentalDurationExpr = false
		parser.EnableExtendedRangeSelectors = false
	})
	parser.EnableExperimentalFunctions = true
	parser.ExperimentalDurationExpr = true
	parser.EnableExtendedRangeSelectors = true

	for _, cases := range [][]pipedCase{manualPipedCases, pipedCases} {
		for _, tc := range cases {
			t.Run(tc.query, func(t *testing.T) {
				got, err := promql.ToPiped(tc.query)
				require.NoError(t, err)
				require.Equal(t, tc.piped, got)
			})
		}
	}
}
// manualPipedCases are hand-written translation cases complementing the
// generated corpus; they exercise variable hoisting ("let ... in") and
// aggregation pipelines. Raw-string contents are behavior-significant and
// must not be reformatted.
var manualPipedCases = []pipedCase{
	{
		query: `(
kube_deployment_status_replicas_available{namespace='monitoring',cluster!~'o11y-.+'} > 1
)
*
histogram_sum(sum(rate(looping_time{location='us-east1', group_name='realtime'} [2m])))
/
(
histogram_sum(sum(rate(looping_time{location='us-east1', group_name='realtime'} [2m]))) +
(
histogram_sum(sum(rate(sleeping_time{location='us-east1', group_name='realtime'} [2m]))) OR on() vector(0)
)
)`,
		piped: `let
x1 = kube_deployment_status_replicas_available{cluster!~"o11y-.+",namespace="monitoring"} > 1
x2 = looping_time{group_name="realtime",location="us-east1"}[2m] | rate | sum | histogram_sum
x3 = sleeping_time{group_name="realtime",location="us-east1"}[2m] | rate | sum | histogram_sum
x4 = vector(0)
in x1 * x2 / (x2 + (x3 or on () x4))`,
	},
	{
		query: `avg(sum by (group) (http_requests{job="api-server"}))`,
		piped: `http_requests{job="api-server"} | sum by (group) | avg`,
	},
}