Pass threshold to collected metrics in scheduler perf.

With this change, every collected metric for which a threshold is defined gets an additional "<DataBucket>Threshold" data point (for example "AverageThreshold"), which should then be visible in perf-dash.
This commit is contained in:
Maciej Wyrzuc 2025-12-08 16:20:08 +00:00
parent f4eedc41b8
commit 264f403ffb
2 changed files with 328 additions and 11 deletions

View file

@ -1472,27 +1472,34 @@ func valueWithinThreshold(value, threshold float64, expectLower bool) bool {
return value > threshold
}
func compareMetricWithThreshold(items []DataItem, threshold float64, metricSelector thresholdMetricSelector) error {
// applyThreshold records the threshold on every data item selected by metricSelector and
// verifies that the item's value in metricSelector.DataBucket is within that threshold.
//
// A threshold of 0 disables the feature: nothing is recorded and nil is returned.
// An item is selected when its "Metric" label equals metricSelector.Name and
// labelsMatch accepts its labels against metricSelector.Labels. For each selected item
// the threshold is stored in item.Data under the key DataBucket+"Threshold".
//
// A selected item that has no value for DataBucket, or whose value is not within the
// threshold (direction given by metricSelector.ExpectLower), contributes one error.
// All errors are combined with errors.Join, so the result is nil only when every
// selected item passes.
func applyThreshold(items []DataItem, threshold float64, metricSelector thresholdMetricSelector) error {
if threshold == 0 {
return nil
}
dataBucket := metricSelector.DataBucket
var errs []error
for _, item := range items {
if item.Labels["Metric"] != metricSelector.Name || !labelsMatch(item.Labels, metricSelector.Labels) {
continue
}
// Record the threshold next to the measured value. item is a copy of the slice
// element, but Data is a map, so the write is visible through the caller's slice.
// The threshold is recorded before the bucket lookup, i.e. also for items that
// turn out to have no value for dataBucket.
// NOTE(review): writing to a nil map panics in Go — confirm that every collector
// allocates Data before returning its items.
thresholdItemName := dataBucket + "Threshold"
item.Data[thresholdItemName] = threshold
dataItem, ok := item.Data[dataBucket]
if !ok {
return fmt.Errorf("%s: no data present for %q metric %q bucket", item.Labels["Name"], metricSelector.Name, dataBucket)
errs = append(errs, fmt.Errorf("%s: no data present for %q metric %q bucket", item.Labels["Name"], metricSelector.Name, dataBucket))
continue
}
if !valueWithinThreshold(dataItem, threshold, metricSelector.ExpectLower) {
if metricSelector.ExpectLower {
return fmt.Errorf("%s: expected %q %q to be lower: got %f, want %f", item.Labels["Name"], metricSelector.Name, dataBucket, dataItem, threshold)
errs = append(errs, fmt.Errorf("%s: expected %q %q to be lower: got %f, want %f", item.Labels["Name"], metricSelector.Name, dataBucket, dataItem, threshold))
} else {
errs = append(errs, fmt.Errorf("%s: expected %q %q to be higher: got %f, want %f", item.Labels["Name"], metricSelector.Name, dataBucket, dataItem, threshold))
}
return fmt.Errorf("%s: expected %q %q to be higher: got %f, want %f", item.Labels["Name"], metricSelector.Name, dataBucket, dataItem, threshold)
}
}
return nil
// errors.Join returns nil when errs is empty, so a run without failures yields nil.
return errors.Join(errs...)
}
func checkEmptyInFlightEvents() error {
@ -1554,7 +1561,7 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex
for _, collector := range collectors {
items := collector.collect()
dataItems = append(dataItems, items...)
err := compareMetricWithThreshold(items, threshold, tms)
err := applyThreshold(items, threshold, tms)
if err != nil {
tCtx.Errorf("op %d: %s", opIndex, err)
}
@ -2026,7 +2033,8 @@ type testDataCollector interface {
collect() []DataItem
}
func getTestDataCollectors(podInformer coreinformers.PodInformer, name string, namespaces []string, labelSelector map[string]string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
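// getTestDataCollectors is declared as a variable rather than a function so that unit tests can temporarily replace it with a mock.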
var getTestDataCollectors = func(podInformer coreinformers.PodInformer, name string, namespaces []string, labelSelector map[string]string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
if mcc == nil {
mcc = &defaultMetricsCollectorConfig
}

View file

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/component-base/featuregate"
testutils "k8s.io/kubernetes/test/utils"
@ -497,13 +498,14 @@ func TestFeatureGatesMerge(t *testing.T) {
}
}
func TestCompareMetricWithThreshold(t *testing.T) {
func TestApplyThreshold(t *testing.T) {
tests := []struct {
name string
items []DataItem
threshold float64
selector thresholdMetricSelector
wantErr bool
errCount int
}{
{
name: "no items, should pass",
@ -672,18 +674,325 @@ func TestCompareMetricWithThreshold(t *testing.T) {
},
wantErr: true,
},
{
name: "multiple items failing threshold, should return joined error",
items: []DataItem{
{
Labels: map[string]string{"Metric": "Throughput", "ID": "1"},
Data: map[string]float64{"Average": 10},
},
{
Labels: map[string]string{"Metric": "Throughput", "ID": "2"},
Data: map[string]float64{"Average": 20},
},
},
threshold: 50,
selector: thresholdMetricSelector{
Name: "Throughput",
DataBucket: "Average",
},
wantErr: true,
errCount: 2,
},
{
name: "multiple items failing threshold (ExpectLower=true), should return joined error",
items: []DataItem{
{
Labels: map[string]string{"Metric": "Throughput", "ID": "1"},
Data: map[string]float64{"Average": 65},
},
{
Labels: map[string]string{"Metric": "Throughput", "ID": "2"},
Data: map[string]float64{"Average": 75},
},
},
threshold: 50,
selector: thresholdMetricSelector{
Name: "Throughput",
DataBucket: "Average",
ExpectLower: true,
},
wantErr: true,
errCount: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := compareMetricWithThreshold(tt.items, tt.threshold, tt.selector)
err := applyThreshold(tt.items, tt.threshold, tt.selector)
if err != nil {
if !tt.wantErr {
t.Errorf("Expected no error in compareMetricWithThreshold, but got: %v", err)
t.Errorf("Expected no error in applyThreshold, but got: %v", err)
}
if tt.errCount > 0 {
if u, ok := err.(interface{ Unwrap() []error }); ok {
if len(u.Unwrap()) != tt.errCount {
t.Errorf("Expected %d errors, got %d", tt.errCount, len(u.Unwrap()))
}
} else {
t.Errorf("Expected joined error with %d errors, got type %T: %v", tt.errCount, err, err)
}
}
} else {
if tt.wantErr {
t.Errorf("Expected error %v in compareMetricWithThreshold, but got nil", tt.wantErr)
t.Errorf("Expected error %v in applyThreshold, but got nil", tt.wantErr)
}
}
})
}
}
// mockDataCollector is a test double for a data collector: it performs no
// collection of its own and simply hands back a fixed, preconfigured set of
// data items.
type mockDataCollector struct {
	// dataItems is returned verbatim by collect.
	dataItems []DataItem
}

// init is a no-op that always succeeds.
func (m *mockDataCollector) init() error {
	return nil
}

// run is a no-op; the supplied test context is ignored.
func (m *mockDataCollector) run(_ ktesting.TContext) {
}

// collect returns the data items the mock was constructed with. The slice is
// returned as-is, not copied.
func (m *mockDataCollector) collect() []DataItem {
	return m.dataItems
}
func TestMetricThreshold(t *testing.T) {
testCases := []struct {
name string
thresholdValue float64
dataItems []DataItem
thresholdMetricSelector *thresholdMetricSelector
expectCollectionFailure bool
expectedDataItemsWithThresholdIndices []int
expectedThresholdName string
}{
{
name: "value is above threshold, no error",
thresholdValue: 100,
dataItems: []DataItem{
{
Data: map[string]float64{
"Average": 150,
},
Labels: map[string]string{
"Metric": "throughput",
},
},
},
thresholdMetricSelector: &thresholdMetricSelector{
Name: "throughput",
DataBucket: "Average",
},
expectedDataItemsWithThresholdIndices: []int{0},
expectedThresholdName: "AverageThreshold",
},
{
name: "value is below threshold, expect error",
thresholdValue: 100,
dataItems: []DataItem{
{
Data: map[string]float64{
"Average": 70,
"Max": 90,
},
Labels: map[string]string{
"Metric": "throughput",
},
},
},
thresholdMetricSelector: &thresholdMetricSelector{
Name: "throughput",
DataBucket: "Max",
},
expectCollectionFailure: true,
expectedDataItemsWithThresholdIndices: []int{0},
expectedThresholdName: "MaxThreshold",
},
{
name: "no error if the labels do not match",
thresholdValue: 100,
dataItems: []DataItem{
{
Data: map[string]float64{
"Average": 70,
},
Labels: map[string]string{
"Metric": "throughput",
"label": "value",
},
},
},
thresholdMetricSelector: &thresholdMetricSelector{
Name: "throughput",
DataBucket: "Average",
Labels: map[string]string{
"label": "value2",
},
},
expectedDataItemsWithThresholdIndices: []int{},
expectedThresholdName: "AverageThreshold",
},
{
name: "out of multiple data items only matching are selected",
thresholdValue: 100,
dataItems: []DataItem{
{
Data: map[string]float64{
"Average": 70,
},
Labels: map[string]string{
"Metric": "throughput",
"label": "value",
},
},
{
Data: map[string]float64{
"Average": 150,
},
Labels: map[string]string{
"Metric": "throughput",
"label": "value2",
},
},
},
thresholdMetricSelector: &thresholdMetricSelector{
Name: "throughput",
DataBucket: "Average",
Labels: map[string]string{
"label": "value2",
},
},
expectedDataItemsWithThresholdIndices: []int{1},
expectedThresholdName: "AverageThreshold",
},
{
name: "threshold value is added for all matching entries",
thresholdValue: 100,
dataItems: []DataItem{
{
Data: map[string]float64{
"Average": 130,
},
Labels: map[string]string{
"Metric": "throughput",
"label": "value",
},
},
{
Data: map[string]float64{
"Average": 150,
},
Labels: map[string]string{
"Metric": "throughput",
"label": "value2",
},
},
},
thresholdMetricSelector: &thresholdMetricSelector{
Name: "throughput",
DataBucket: "Average",
},
expectedDataItemsWithThresholdIndices: []int{0, 1},
expectedThresholdName: "AverageThreshold",
},
{
name: "threshold value is added for all matching entries even with error",
thresholdValue: 100,
dataItems: []DataItem{
{
Data: map[string]float64{
"Average": 70,
},
Labels: map[string]string{
"Metric": "throughput",
"label": "value",
},
},
{
Data: map[string]float64{
"Average": 80,
},
Labels: map[string]string{
"Metric": "throughput",
"label": "value2",
},
},
{
Data: map[string]float64{
"Average": 130,
},
Labels: map[string]string{
"Metric": "throughput",
"label": "value3",
},
},
},
thresholdMetricSelector: &thresholdMetricSelector{
Name: "throughput",
DataBucket: "Average",
},
expectCollectionFailure: true,
expectedDataItemsWithThresholdIndices: []int{0, 1, 2},
expectedThresholdName: "AverageThreshold",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, tCtx := ktesting.NewTestContext(t)
var capturedErr error
capturingCtx, finalize := tCtx.WithError(&capturedErr)
defer finalize()
originalGetTestDataCollectors := getTestDataCollectors
defer func() { getTestDataCollectors = originalGetTestDataCollectors }()
getTestDataCollectors = func(_ coreinformers.PodInformer, _ string, _ []string, _ map[string]string, _ *metricsCollectorConfig, _ float64) []testDataCollector {
return []testDataCollector{&mockDataCollector{dataItems: tc.dataItems}}
}
workload := &workload{
Name: "some/workload",
Threshold: thresholds{
valuesByTopic: map[string]float64{"example": tc.thresholdValue},
},
ThresholdMetricSelector: tc.thresholdMetricSelector,
}
exec := &WorkloadExecutor{
topicName: "example",
testCase: &testCase{},
tCtx: capturingCtx,
numPodsScheduledPerNamespace: make(map[string]int),
workload: workload,
}
start := &startCollectingMetricsOp{
Opcode: startCollectingMetricsOpcode,
Name: "test-collection",
Namespaces: []string{"test-namespaces"},
}
err := exec.runOp(start, 0)
if err != nil {
t.Fatalf("Failed to start metric collection")
}
stop := &stopCollectingMetricsOp{Opcode: stopCollectingMetricsOpcode}
err = exec.runOp(stop, 0)
if err != nil {
t.Fatalf("Failed to stop metric collection")
}
if tc.expectCollectionFailure != capturingCtx.Failed() {
t.Fatalf("expectCollectionFailure=%v but got %v", tc.expectCollectionFailure, capturingCtx.Failed())
}
for _, idx := range tc.expectedDataItemsWithThresholdIndices {
if idx >= len(exec.dataItems) {
t.Fatalf("expectedDataItemsWithThresholdIndex out of data items range")
}
if _, ok := exec.dataItems[idx].Data[tc.expectedThresholdName]; !ok {
t.Fatalf("expected data item at index=%d to have %s field", idx, tc.expectedThresholdName)
}
}
})