package util import ( "context" "net" "net/http" "os" "strconv" "time" "github.com/k3s-io/k3s/pkg/signals" "github.com/k3s-io/k3s/pkg/util/errors" "github.com/rancher/wrangler/v3/pkg/schemes" "github.com/sirupsen/logrus" authorizationv1 "k8s.io/api/authorization/v1" v1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" authorizationv1client "k8s.io/client-go/kubernetes/typed/authorization/v1" coregetter "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" ) // This sets a default duration to wait for the apiserver to become ready. This is primarily used to // block startup of agent supervisor controllers until the apiserver is ready to serve requests, in the // same way that the apiReady channel is used in the server packages, so it can be fairly long. It must // be at least long enough for downstream projects like RKE2 to start the apiserver in the background. const DefaultAPIServerReadyTimeout = 15 * time.Minute func GetAddresses(endpoint *v1.Endpoints) []string { serverAddresses := []string{} if endpoint == nil { return serverAddresses } for _, subset := range endpoint.Subsets { var port string if len(subset.Ports) > 0 { port = strconv.Itoa(int(subset.Ports[0].Port)) } if port == "" { port = "443" } for _, address := range subset.Addresses { serverAddresses = append(serverAddresses, net.JoinHostPort(address.IP, port)) } } return serverAddresses } func GetAddressesFromSlices(slices ...discoveryv1.EndpointSlice) []string { serverAddresses := []string{} for _, slice := range slices { var port string if len(slice.Ports) > 0 && slice.Ports[0].Port != nil { port = strconv.Itoa(int(*slice.Ports[0].Port)) } if port == "" { port = "443" } for _, endpoint := range slice.Endpoints { if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready { for _, address := range endpoint.Addresses { serverAddresses = append(serverAddresses, net.JoinHostPort(address, port)) } } } } return serverAddresses } // WaitForAPIServerReady waits for the API server's /readyz endpoint to report "ok" with timeout. // This is modified from WaitForAPIServer from the Kubernetes controller-manager app, but checks the // readyz endpoint instead of the deprecated healthz endpoint, and supports context. func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout time.Duration) error { lastErr := errors.New("API server not polled") restConfig, err := GetRESTConfig(kubeconfigPath) if err != nil { return err } // Probe apiserver readiness with a 15 second timeout // https://github.com/kubernetes/kubernetes/blob/v1.24.0/cmd/kubeadm/app/util/staticpod/utils.go#L252 restConfig.Timeout = time.Second * 15 // By default, idle connections to the apiserver are returned to a global pool // between requests. Explicitly flag this client's request for closure so that // we re-dial through the loadbalancer in case the endpoints have changed. restConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper { return roundTripFunc(func(req *http.Request) (*http.Response, error) { req.Close = true return rt.RoundTrip(req) }) }) restConfig = dynamic.ConfigFor(restConfig) restConfig.GroupVersion = &schema.GroupVersion{} restClient, err := rest.RESTClientFor(restConfig) if err != nil { return err } err = wait.PollUntilContextTimeout(ctx, time.Second*2, timeout, true, func(ctx context.Context) (bool, error) { // DoRaw returns an error if the response code is < 200 OK or > 206 Partial Content if _, err := restClient.Get().AbsPath("/readyz").Param("verbose", "").DoRaw(ctx); err != nil { if err.Error() != lastErr.Error() { logrus.Infof("Polling for API server readiness: GET /readyz failed: %v", err) } else { logrus.Debug("Polling for API server readiness: GET /readyz failed: status unchanged") } lastErr = err return false, nil } return true, nil }) if err != nil { return errors.Join(err, lastErr) } return nil } // APIServerReadyChan wraps WaitForAPIServerReady, returning a channel that // is closed when the apiserver is ready. If the apiserver does not become // ready within the expected duration, a fatal error is raised. func APIServerReadyChan(ctx context.Context, kubeConfig string, timeout time.Duration) <-chan struct{} { ready := make(chan struct{}) go func() { if err := WaitForAPIServerReady(ctx, kubeConfig, timeout); err != nil { signals.RequestShutdown(errors.WithMessage(err, "failed to wait for API server to become ready")) return } close(ready) }() return ready } type genericAccessReviewRequest func(context.Context) (*authorizationv1.SubjectAccessReviewStatus, error) // WaitForRBACReady polls an AccessReview request until it returns an allowed response. If the user // and group are empty, it uses SelfSubjectAccessReview, otherwise SubjectAccessReview is used. It // will return an error if the timeout expires, or nil if the SubjectAccessReviewStatus indicates // the access would be allowed. func WaitForRBACReady(ctx context.Context, kubeconfigPath string, timeout time.Duration, ra authorizationv1.ResourceAttributes, user string, groups ...string) error { var lastErr error restConfig, err := GetRESTConfig(kubeconfigPath) if err != nil { return err } authClient, err := authorizationv1client.NewForConfig(restConfig) if err != nil { return err } var reviewFunc genericAccessReviewRequest if len(user) == 0 && len(groups) == 0 { reviewFunc = selfSubjectAccessReview(authClient, ra) } else { reviewFunc = subjectAccessReview(authClient, ra, user, groups) } err = wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) { status, rerr := reviewFunc(ctx) if rerr != nil { lastErr = rerr return false, nil } if status.Allowed { return true, nil } lastErr = errors.New(status.Reason) return false, nil }) if err != nil { return errors.Join(err, lastErr) } return nil } // CheckRBAC performs a single SelfSubjectAccessReview or SubjectAccessReview, returning a // boolean indicating whether or not the requested access would be allowed. This is basically // `kubectl auth can-i`. func CheckRBAC(ctx context.Context, kubeconfigPath string, ra authorizationv1.ResourceAttributes, user string, groups ...string) (bool, error) { restConfig, err := GetRESTConfig(kubeconfigPath) if err != nil { return false, err } authClient, err := authorizationv1client.NewForConfig(restConfig) if err != nil { return false, err } var reviewFunc genericAccessReviewRequest if len(user) == 0 && len(groups) == 0 { reviewFunc = selfSubjectAccessReview(authClient, ra) } else { reviewFunc = subjectAccessReview(authClient, ra, user, groups) } status, err := reviewFunc(ctx) if err != nil { return false, err } return status.Allowed, nil } // selfSubjectAccessReview returns a function that makes SelfSubjectAccessReview requests using the // provided client and attributes, returning a status or error. func selfSubjectAccessReview(authClient *authorizationv1client.AuthorizationV1Client, ra authorizationv1.ResourceAttributes) genericAccessReviewRequest { return func(ctx context.Context) (*authorizationv1.SubjectAccessReviewStatus, error) { r, err := authClient.SelfSubjectAccessReviews().Create(ctx, &authorizationv1.SelfSubjectAccessReview{ Spec: authorizationv1.SelfSubjectAccessReviewSpec{ ResourceAttributes: &ra, }, }, metav1.CreateOptions{}) if err != nil { return nil, err } return &r.Status, nil } } // subjectAccessReview returns a function that makes SubjectAccessReview requests using the // provided client, attributes, user, and group, returning a status or error. func subjectAccessReview(authClient *authorizationv1client.AuthorizationV1Client, ra authorizationv1.ResourceAttributes, user string, groups []string) genericAccessReviewRequest { return func(ctx context.Context) (*authorizationv1.SubjectAccessReviewStatus, error) { r, err := authClient.SubjectAccessReviews().Create(ctx, &authorizationv1.SubjectAccessReview{ Spec: authorizationv1.SubjectAccessReviewSpec{ ResourceAttributes: &ra, User: user, Groups: groups, }, }, metav1.CreateOptions{}) if err != nil { return nil, err } return &r.Status, nil } } func BuildControllerEventRecorder(ctx context.Context, k8s clientset.Interface, controllerName, namespace string) record.EventRecorder { logrus.Infof("Creating %s event broadcaster", controllerName) eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) eventBroadcaster.StartStructuredLogging(0) eventBroadcaster.StartRecordingToSink(&coregetter.EventSinkImpl{Interface: k8s.CoreV1().Events(namespace)}) nodeName := os.Getenv("NODE_NAME") return eventBroadcaster.NewRecorder(schemes.All, v1.EventSource{Component: controllerName, Host: nodeName}) } type roundTripFunc func(req *http.Request) (*http.Response, error) func (w roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { return w(req) }