From 264f403ffbe3817c832c36e6064697daeeb8093b Mon Sep 17 00:00:00 2001 From: Maciej Wyrzuc Date: Mon, 8 Dec 2025 16:20:08 +0000 Subject: [PATCH] Pass threshold to collected metrics in scheduler perf. With this change, the metrics for which the threshold is defined will have a "Threshold" data point that should be visible in perf dash. --- .../scheduler_perf/scheduler_perf.go | 22 +- .../scheduler_perf/scheduler_perf_test.go | 317 +++++++++++++++++- 2 files changed, 328 insertions(+), 11 deletions(-) diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 9666fafa031..2729f132f44 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -1472,27 +1472,34 @@ func valueWithinThreshold(value, threshold float64, expectLower bool) bool { return value > threshold } -func compareMetricWithThreshold(items []DataItem, threshold float64, metricSelector thresholdMetricSelector) error { +// applyThreshold adds the threshold to the data item with the metric specified via metricSelector and verifies that +// this metric's value is within the threshold. 
+func applyThreshold(items []DataItem, threshold float64, metricSelector thresholdMetricSelector) error { if threshold == 0 { return nil } dataBucket := metricSelector.DataBucket + var errs []error for _, item := range items { if item.Labels["Metric"] != metricSelector.Name || !labelsMatch(item.Labels, metricSelector.Labels) { continue } + thresholdItemName := dataBucket + "Threshold" + item.Data[thresholdItemName] = threshold dataItem, ok := item.Data[dataBucket] if !ok { - return fmt.Errorf("%s: no data present for %q metric %q bucket", item.Labels["Name"], metricSelector.Name, dataBucket) + errs = append(errs, fmt.Errorf("%s: no data present for %q metric %q bucket", item.Labels["Name"], metricSelector.Name, dataBucket)) + continue } if !valueWithinThreshold(dataItem, threshold, metricSelector.ExpectLower) { if metricSelector.ExpectLower { - return fmt.Errorf("%s: expected %q %q to be lower: got %f, want %f", item.Labels["Name"], metricSelector.Name, dataBucket, dataItem, threshold) + errs = append(errs, fmt.Errorf("%s: expected %q %q to be lower: got %f, want %f", item.Labels["Name"], metricSelector.Name, dataBucket, dataItem, threshold)) + } else { + errs = append(errs, fmt.Errorf("%s: expected %q %q to be higher: got %f, want %f", item.Labels["Name"], metricSelector.Name, dataBucket, dataItem, threshold)) } - return fmt.Errorf("%s: expected %q %q to be higher: got %f, want %f", item.Labels["Name"], metricSelector.Name, dataBucket, dataItem, threshold) } } - return nil + return errors.Join(errs...) } func checkEmptyInFlightEvents() error { @@ -1554,7 +1561,7 @@ func stopCollectingMetrics(tCtx ktesting.TContext, collectorCtx ktesting.TContex for _, collector := range collectors { items := collector.collect() dataItems = append(dataItems, items...) 
- err := compareMetricWithThreshold(items, threshold, tms) + err := applyThreshold(items, threshold, tms) if err != nil { tCtx.Errorf("op %d: %s", opIndex, err) } @@ -2026,7 +2033,8 @@ type testDataCollector interface { collect() []DataItem } -func getTestDataCollectors(podInformer coreinformers.PodInformer, name string, namespaces []string, labelSelector map[string]string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector { +// var for mocking in tests. +var getTestDataCollectors = func(podInformer coreinformers.PodInformer, name string, namespaces []string, labelSelector map[string]string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector { if mcc == nil { mcc = &defaultMetricsCollectorConfig } diff --git a/test/integration/scheduler_perf/scheduler_perf_test.go b/test/integration/scheduler_perf/scheduler_perf_test.go index 92c448a25e5..a85ea3a7fbe 100644 --- a/test/integration/scheduler_perf/scheduler_perf_test.go +++ b/test/integration/scheduler_perf/scheduler_perf_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/fake" "k8s.io/component-base/featuregate" testutils "k8s.io/kubernetes/test/utils" @@ -497,13 +498,14 @@ func TestFeatureGatesMerge(t *testing.T) { } } -func TestCompareMetricWithThreshold(t *testing.T) { +func TestApplyThreshold(t *testing.T) { tests := []struct { name string items []DataItem threshold float64 selector thresholdMetricSelector wantErr bool + errCount int }{ { name: "no items, should pass", @@ -672,18 +674,325 @@ func TestCompareMetricWithThreshold(t *testing.T) { }, wantErr: true, }, + { + name: "multiple items failing threshold, should return joined error", + items: []DataItem{ + { + Labels: map[string]string{"Metric": "Throughput", "ID": "1"}, + Data: map[string]float64{"Average": 10}, + }, 
+ { + Labels: map[string]string{"Metric": "Throughput", "ID": "2"}, + Data: map[string]float64{"Average": 20}, + }, + }, + threshold: 50, + selector: thresholdMetricSelector{ + Name: "Throughput", + DataBucket: "Average", + }, + wantErr: true, + errCount: 2, + }, + { + name: "multiple items failing threshold (ExpectLower=true), should return joined error", + items: []DataItem{ + { + Labels: map[string]string{"Metric": "Throughput", "ID": "1"}, + Data: map[string]float64{"Average": 65}, + }, + { + Labels: map[string]string{"Metric": "Throughput", "ID": "2"}, + Data: map[string]float64{"Average": 75}, + }, + }, + threshold: 50, + selector: thresholdMetricSelector{ + Name: "Throughput", + DataBucket: "Average", + ExpectLower: true, + }, + wantErr: true, + errCount: 2, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := compareMetricWithThreshold(tt.items, tt.threshold, tt.selector) + err := applyThreshold(tt.items, tt.threshold, tt.selector) if err != nil { if !tt.wantErr { - t.Errorf("Expected no error in compareMetricWithThreshold, but got: %v", err) + t.Errorf("Expected no error in applyThreshold, but got: %v", err) + } + if tt.errCount > 0 { + if u, ok := err.(interface{ Unwrap() []error }); ok { + if len(u.Unwrap()) != tt.errCount { + t.Errorf("Expected %d errors, got %d", tt.errCount, len(u.Unwrap())) + } + } else { + t.Errorf("Expected joined error with %d errors, got type %T: %v", tt.errCount, err, err) + } } } else { if tt.wantErr { - t.Errorf("Expected error %v in compareMetricWithThreshold, but got nil", tt.wantErr) + t.Errorf("Expected error %v in applyThreshold, but got nil", tt.wantErr) + } + } + }) + } +} + +// mockDataCollector always returns the same data items, to be used for mocking data collector in unit tests. +type mockDataCollector struct { + dataItems []DataItem +} + +// init does nothing. +func (mc *mockDataCollector) init() error { + return nil +} + +// run does nothing. 
+func (mc *mockDataCollector) run(_ ktesting.TContext) {} + +// collect always returns DataItems defined in the collector. +func (mc *mockDataCollector) collect() []DataItem { + return mc.dataItems +} + +func TestMetricThreshold(t *testing.T) { + testCases := []struct { + name string + thresholdValue float64 + dataItems []DataItem + thresholdMetricSelector *thresholdMetricSelector + expectCollectionFailure bool + expectedDataItemsWithThresholdIndices []int + expectedThresholdName string + }{ + { + name: "value is above threshold, no error", + thresholdValue: 100, + dataItems: []DataItem{ + { + Data: map[string]float64{ + "Average": 150, + }, + Labels: map[string]string{ + "Metric": "throughput", + }, + }, + }, + thresholdMetricSelector: &thresholdMetricSelector{ + Name: "throughput", + DataBucket: "Average", + }, + expectedDataItemsWithThresholdIndices: []int{0}, + expectedThresholdName: "AverageThreshold", + }, + { + name: "value is below threshold, expect error", + thresholdValue: 100, + dataItems: []DataItem{ + { + Data: map[string]float64{ + "Average": 70, + "Max": 90, + }, + Labels: map[string]string{ + "Metric": "throughput", + }, + }, + }, + thresholdMetricSelector: &thresholdMetricSelector{ + Name: "throughput", + DataBucket: "Max", + }, + expectCollectionFailure: true, + expectedDataItemsWithThresholdIndices: []int{0}, + expectedThresholdName: "MaxThreshold", + }, + { + name: "no error if the labels do not match", + thresholdValue: 100, + dataItems: []DataItem{ + { + Data: map[string]float64{ + "Average": 70, + }, + Labels: map[string]string{ + "Metric": "throughput", + "label": "value", + }, + }, + }, + thresholdMetricSelector: &thresholdMetricSelector{ + Name: "throughput", + DataBucket: "Average", + Labels: map[string]string{ + "label": "value2", + }, + }, + expectedDataItemsWithThresholdIndices: []int{}, + expectedThresholdName: "AverageThreshold", + }, + { + name: "out of multiple data items only matching are selected", + thresholdValue: 100, + 
dataItems: []DataItem{ + { + Data: map[string]float64{ + "Average": 70, + }, + Labels: map[string]string{ + "Metric": "throughput", + "label": "value", + }, + }, + { + Data: map[string]float64{ + "Average": 150, + }, + Labels: map[string]string{ + "Metric": "throughput", + "label": "value2", + }, + }, + }, + thresholdMetricSelector: &thresholdMetricSelector{ + Name: "throughput", + DataBucket: "Average", + Labels: map[string]string{ + "label": "value2", + }, + }, + expectedDataItemsWithThresholdIndices: []int{1}, + expectedThresholdName: "AverageThreshold", + }, + { + name: "threshold value is added for all matching entries", + thresholdValue: 100, + dataItems: []DataItem{ + { + Data: map[string]float64{ + "Average": 130, + }, + Labels: map[string]string{ + "Metric": "throughput", + "label": "value", + }, + }, + { + Data: map[string]float64{ + "Average": 150, + }, + Labels: map[string]string{ + "Metric": "throughput", + "label": "value2", + }, + }, + }, + thresholdMetricSelector: &thresholdMetricSelector{ + Name: "throughput", + DataBucket: "Average", + }, + expectedDataItemsWithThresholdIndices: []int{0, 1}, + expectedThresholdName: "AverageThreshold", + }, + { + name: "threshold value is added for all matching entries even with error", + thresholdValue: 100, + dataItems: []DataItem{ + { + Data: map[string]float64{ + "Average": 70, + }, + Labels: map[string]string{ + "Metric": "throughput", + "label": "value", + }, + }, + { + Data: map[string]float64{ + "Average": 80, + }, + Labels: map[string]string{ + "Metric": "throughput", + "label": "value2", + }, + }, + { + Data: map[string]float64{ + "Average": 130, + }, + Labels: map[string]string{ + "Metric": "throughput", + "label": "value3", + }, + }, + }, + thresholdMetricSelector: &thresholdMetricSelector{ + Name: "throughput", + DataBucket: "Average", + }, + expectCollectionFailure: true, + expectedDataItemsWithThresholdIndices: []int{0, 1, 2}, + expectedThresholdName: "AverageThreshold", + }, + } + + for _, tc := 
range testCases { + t.Run(tc.name, func(t *testing.T) { + _, tCtx := ktesting.NewTestContext(t) + var capturedErr error + capturingCtx, finalize := tCtx.WithError(&capturedErr) + defer finalize() + + originalGetTestDataCollectors := getTestDataCollectors + defer func() { getTestDataCollectors = originalGetTestDataCollectors }() + getTestDataCollectors = func(_ coreinformers.PodInformer, _ string, _ []string, _ map[string]string, _ *metricsCollectorConfig, _ float64) []testDataCollector { + return []testDataCollector{&mockDataCollector{dataItems: tc.dataItems}} + } + + workload := &workload{ + Name: "some/workload", + Threshold: thresholds{ + valuesByTopic: map[string]float64{"example": tc.thresholdValue}, + }, + ThresholdMetricSelector: tc.thresholdMetricSelector, + } + exec := &WorkloadExecutor{ + topicName: "example", + testCase: &testCase{}, + tCtx: capturingCtx, + numPodsScheduledPerNamespace: make(map[string]int), + workload: workload, + } + + start := &startCollectingMetricsOp{ + Opcode: startCollectingMetricsOpcode, + Name: "test-collection", + Namespaces: []string{"test-namespaces"}, + } + err := exec.runOp(start, 0) + if err != nil { + t.Fatalf("Failed to start metric collection") + } + stop := &stopCollectingMetricsOp{Opcode: stopCollectingMetricsOpcode} + err = exec.runOp(stop, 0) + if err != nil { + t.Fatalf("Failed to stop metric collection") + } + + if tc.expectCollectionFailure != capturingCtx.Failed() { + t.Fatalf("expectCollectionFailure=%v but got %v", tc.expectCollectionFailure, capturingCtx.Failed()) + } + for _, idx := range tc.expectedDataItemsWithThresholdIndices { + if idx >= len(exec.dataItems) { + t.Fatalf("expectedDataItemsWithThresholdIndex out of data items range") + } + if _, ok := exec.dataItems[idx].Data[tc.expectedThresholdName]; !ok { + t.Fatalf("expected data item at index=%d to have %s field", idx, tc.expectedThresholdName) } } })