diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 873d5e0bbef..49ff4c5bdb8 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -51,6 +51,7 @@ import ( "k8s.io/client-go/metadata/metadatainformer" restclient "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" certutil "k8s.io/client-go/util/cert" @@ -261,6 +262,7 @@ func Run(ctx context.Context, c *config.CompletedConfig) error { // Start the informers. stopCh := ctx.Done() controllerContext.InformerFactory.Start(stopCh) + defer controllerContext.InformerFactory.Shutdown() controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh) close(controllerContext.InformersStarted) @@ -486,7 +488,12 @@ func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, roo return ControllerContext{}, fmt.Errorf("failed to create Kubernetes client for %q: %w", "shared-informers", err) } - sharedInformers := informers.NewSharedInformerFactoryWithOptions(versionedClient, ResyncPeriod(s)(), informers.WithTransform(trim)) + informerName, err := cache.NewInformerName("kube-controller-manager") + if err != nil { + return ControllerContext{}, fmt.Errorf("failed to create informer name: %w", err) + } + + sharedInformers := informers.NewSharedInformerFactoryWithOptions(versionedClient, ResyncPeriod(s)(), informers.WithTransform(trim), informers.WithInformerName(informerName)) metadataConfig, err := rootClientBuilder.Config("metadata-informers") if err != nil { diff --git a/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/metrics.go b/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/metrics.go index 43574ca9a39..a307a1ab3ec 100644 --- a/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/metrics.go +++ b/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/metrics.go @@ -17,6 +17,7 @@ limitations under the License. package clientgo import ( + _ "k8s.io/component-base/metrics/prometheus/clientgo/fifo" // load fifo metrics _ "k8s.io/component-base/metrics/prometheus/clientgo/leaderelection" // load leaderelection metrics _ "k8s.io/component-base/metrics/prometheus/restclient" // load restclient metrics _ "k8s.io/component-base/metrics/prometheus/workqueue" // load the workqueue metrics diff --git a/test/integration/serving/serving_test.go b/test/integration/serving/serving_test.go index 12d9a35f734..69cdcaa016f 100644 --- a/test/integration/serving/serving_test.go +++ b/test/integration/serving/serving_test.go @@ -35,6 +35,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" flagzv1alpha1 "k8s.io/apiserver/pkg/server/flagz/api/v1alpha1" "k8s.io/apiserver/pkg/server/statusz/api/v1alpha1" + "k8s.io/client-go/tools/cache" "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/options" @@ -43,6 +44,9 @@ import ( cloudctrlmgrtesting "k8s.io/cloud-provider/app/testing" "k8s.io/cloud-provider/fake" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/component-base/metrics/legacyregistry" + "k8s.io/component-base/metrics/prometheus/clientgo/fifo" + "k8s.io/component-base/metrics/testutil" zpagesfeatures "k8s.io/component-base/zpages/features" "k8s.io/klog/v2/ktesting" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" @@ -630,3 +634,151 @@ users: }) } } + +func TestKubeControllerManagerInformerMetrics(t *testing.T) { + // authenticate to apiserver via bearer token + token := "flwqkenfjasasdfmwerasd" // Fake token for testing. + tokenFile, err := os.CreateTemp("", "kubeconfig") + if err != nil { + t.Fatal(err) + } + if _, err = fmt.Fprintf(tokenFile, ` +%s,system:kube-controller-manager,system:kube-controller-manager,"" +`, token); err != nil { + t.Fatal(err) + } + if err = tokenFile.Close(); err != nil { + t.Fatal(err) + } + + // start apiserver + server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{ + "--token-auth-file", tokenFile.Name(), + "--authorization-mode", "RBAC", + }, framework.SharedEtcd()) + defer server.TearDownFn() + + // create kubeconfig for the apiserver + apiserverConfig, err := os.CreateTemp("", "kubeconfig") + if err != nil { + t.Fatal(err) + } + if _, err = fmt.Fprintf(apiserverConfig, ` +apiVersion: v1 +kind: Config +clusters: +- cluster: + server: %s + certificate-authority: %s + name: integration +contexts: +- context: + cluster: integration + user: controller-manager + name: default-context +current-context: default-context +users: +- name: controller-manager + user: + token: %s +`, server.ClientConfig.Host, server.ServerOpts.SecureServing.ServerCert.CertKey.CertFile, token); err != nil { + t.Fatal(err) + } + if err = apiserverConfig.Close(); err != nil { + t.Fatal(err) + } + + legacyregistry.Reset() + cache.ResetInformerNamesForTesting() + fifo.Register() + _, ctx := ktesting.NewTestContext(t) + flags := []string{ + "--authentication-skip-lookup", + "--authentication-kubeconfig", apiserverConfig.Name(), + "--authorization-kubeconfig", apiserverConfig.Name(), + "--authorization-always-allow-paths", "/metrics", + "--kubeconfig", apiserverConfig.Name(), + "--leader-elect=false", + // Enable a minimal set of controllers to ensure informers are started and metrics are produced. + "--controllers=garbagecollector", + } + secureOptions, secureInfo, tearDownFn, err := kubeControllerManagerTester{}.StartTestServer(t, ctx, flags) + if tearDownFn != nil { + defer tearDownFn() + } + if err != nil { + t.Fatalf("StartTestServer() error = %v", err) + } + if secureInfo == nil { + t.Fatalf("SecureServing not enabled") + } + _, port, err := net.SplitHostPort(secureInfo.Listener.Addr().String()) + if err != nil { + t.Fatalf("could not get host and port from %s : %v", secureInfo.Listener.Addr().String(), err) + } + metricsURL := fmt.Sprintf("https://127.0.0.1:%s/metrics", port) + + // read self-signed server cert disk + pool := x509.NewCertPool() + serverCertPath := path.Join(secureOptions.ServerCert.CertDirectory, secureOptions.ServerCert.PairName+".crt") + serverCert, err := os.ReadFile(serverCertPath) + if err != nil { + t.Fatalf("Failed to read component server cert %q: %v", serverCertPath, err) + } + pool.AppendCertsFromPEM(serverCert) + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: pool, + }, + } + client := &http.Client{Transport: tr} + + req, err := http.NewRequest(http.MethodGet, metricsURL, nil) + if err != nil { + t.Fatal(err) + } + req.Header.Add("Authorization", fmt.Sprintf("Token %s", token)) + r, err := client.Do(req) + if err != nil { + t.Fatalf("failed to GET /metrics: %v", err) + } + + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatalf("failed to read response body: %v", err) + } + if err = r.Body.Close(); err != nil { + t.Fatalf("failed to close response body: %v", err) + } + + if r.StatusCode != http.StatusOK { + t.Fatalf("want status %d, got %d: %s", http.StatusOK, r.StatusCode, string(body)) + } + + // Parse metrics using testutil + metrics := testutil.NewMetrics() + if err := testutil.ParseMetrics(string(body), &metrics); err != nil { + t.Fatalf("failed to parse metrics: %v", err) + } + + // Verify that informer_queued_items metric exists + wantMetricName := "informer_queued_items" + samples, found := metrics[wantMetricName] + if !found { + t.Fatalf("expected metrics to contain %q, but it was not found", wantMetricName) + } + + // Verify that at least one sample has the kube-controller-manager name label + wantLabelValue := testutil.LabelValue("kube-controller-manager") + foundKCM := false + for _, sample := range samples { + if nameLabel, ok := sample.Metric["name"]; ok && nameLabel == wantLabelValue { + foundKCM = true + t.Logf("Found expected metric: %s", sample.String()) + break + } + } + if !foundKCM { + t.Errorf("expected to find informer_queued_items metric with name=\"kube-controller-manager\" label") + } +}