k3s/pkg/util/api.go
Brad Davidson 4974fc7c24 Use sync.WaitGroup to avoid exiting before components have shut down
Currently only waits on etcd and kine, as other components
are stateless and do not need to shut down cleanly.

Terminal but non-fatal errors now request shutdown via context
cancellation, instead of just logging a fatal error.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
2025-09-17 09:37:08 -07:00

267 lines
9.2 KiB
Go

package util
import (
"context"
"errors"
"net"
"net/http"
"os"
"strconv"
"time"
"github.com/k3s-io/k3s/pkg/signals"
pkgerrors "github.com/pkg/errors"
"github.com/rancher/wrangler/v3/pkg/merr"
"github.com/rancher/wrangler/v3/pkg/schemes"
"github.com/sirupsen/logrus"
authorizationv1 "k8s.io/api/authorization/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
authorizationv1client "k8s.io/client-go/kubernetes/typed/authorization/v1"
coregetter "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
)
// This sets a default duration to wait for the apiserver to become ready. This is primarily used to
// block startup of agent supervisor controllers until the apiserver is ready to serve requests, in the
// same way that the apiReady channel is used in the server packages, so it can be fairly long. It must
// be at least long enough for downstream projects like RKE2 to start the apiserver in the background.
const DefaultAPIServerReadyTimeout = 15 * time.Minute
func GetAddresses(endpoint *v1.Endpoints) []string {
serverAddresses := []string{}
if endpoint == nil {
return serverAddresses
}
for _, subset := range endpoint.Subsets {
var port string
if len(subset.Ports) > 0 {
port = strconv.Itoa(int(subset.Ports[0].Port))
}
if port == "" {
port = "443"
}
for _, address := range subset.Addresses {
serverAddresses = append(serverAddresses, net.JoinHostPort(address.IP, port))
}
}
return serverAddresses
}
func GetAddressesFromSlices(slices ...discoveryv1.EndpointSlice) []string {
serverAddresses := []string{}
for _, slice := range slices {
var port string
if len(slice.Ports) > 0 && slice.Ports[0].Port != nil {
port = strconv.Itoa(int(*slice.Ports[0].Port))
}
if port == "" {
port = "443"
}
for _, endpoint := range slice.Endpoints {
if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready == true {
for _, address := range endpoint.Addresses {
serverAddresses = append(serverAddresses, net.JoinHostPort(address, port))
}
}
}
}
return serverAddresses
}
// WaitForAPIServerReady waits for the API server's /readyz endpoint to report "ok" with timeout.
// This is modified from WaitForAPIServer from the Kubernetes controller-manager app, but checks the
// readyz endpoint instead of the deprecated healthz endpoint, and supports context.
func WaitForAPIServerReady(ctx context.Context, kubeconfigPath string, timeout time.Duration) error {
lastErr := errors.New("API server not polled")
restConfig, err := GetRESTConfig(kubeconfigPath)
if err != nil {
return err
}
// Probe apiserver readiness with a 15 second timeout
// https://github.com/kubernetes/kubernetes/blob/v1.24.0/cmd/kubeadm/app/util/staticpod/utils.go#L252
restConfig.Timeout = time.Second * 15
// By default, idle connections to the apiserver are returned to a global pool
// between requests. Explicitly flag this client's request for closure so that
// we re-dial through the loadbalancer in case the endpoints have changed.
restConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return roundTripFunc(func(req *http.Request) (*http.Response, error) {
req.Close = true
return rt.RoundTrip(req)
})
})
restConfig = dynamic.ConfigFor(restConfig)
restConfig.GroupVersion = &schema.GroupVersion{}
restClient, err := rest.RESTClientFor(restConfig)
if err != nil {
return err
}
err = wait.PollUntilContextTimeout(ctx, time.Second*2, timeout, true, func(ctx context.Context) (bool, error) {
// DoRaw returns an error if the response code is < 200 OK or > 206 Partial Content
if _, err := restClient.Get().AbsPath("/readyz").Param("verbose", "").DoRaw(ctx); err != nil {
if err.Error() != lastErr.Error() {
logrus.Infof("Polling for API server readiness: GET /readyz failed: %v", err)
} else {
logrus.Debug("Polling for API server readiness: GET /readyz failed: status unchanged")
}
lastErr = err
return false, nil
}
return true, nil
})
if err != nil {
return merr.NewErrors(err, lastErr)
}
return nil
}
// APIServerReadyChan wraps WaitForAPIServerReady, returning a channel that
// is closed when the apiserver is ready. If the apiserver does not become
// ready within the expected duration, a fatal error is raised.
func APIServerReadyChan(ctx context.Context, kubeConfig string, timeout time.Duration) <-chan struct{} {
ready := make(chan struct{})
go func() {
if err := WaitForAPIServerReady(ctx, kubeConfig, timeout); err != nil {
signals.RequestShutdown(pkgerrors.WithMessage(err, "failed to wait for API server to become ready"))
return
}
close(ready)
}()
return ready
}
type genericAccessReviewRequest func(context.Context) (*authorizationv1.SubjectAccessReviewStatus, error)
// WaitForRBACReady polls an AccessReview request until it returns an allowed response. If the user
// and group are empty, it uses SelfSubjectAccessReview, otherwise SubjectAccessReview is used. It
// will return an error if the timeout expires, or nil if the SubjectAccessReviewStatus indicates
// the access would be allowed.
func WaitForRBACReady(ctx context.Context, kubeconfigPath string, timeout time.Duration, ra authorizationv1.ResourceAttributes, user string, groups ...string) error {
var lastErr error
restConfig, err := GetRESTConfig(kubeconfigPath)
if err != nil {
return err
}
authClient, err := authorizationv1client.NewForConfig(restConfig)
if err != nil {
return err
}
var reviewFunc genericAccessReviewRequest
if len(user) == 0 && len(groups) == 0 {
reviewFunc = selfSubjectAccessReview(authClient, ra)
} else {
reviewFunc = subjectAccessReview(authClient, ra, user, groups)
}
err = wait.PollUntilContextTimeout(ctx, time.Second, timeout, true, func(ctx context.Context) (bool, error) {
status, rerr := reviewFunc(ctx)
if rerr != nil {
lastErr = rerr
return false, nil
}
if status.Allowed {
return true, nil
}
lastErr = errors.New(status.Reason)
return false, nil
})
if err != nil {
return merr.NewErrors(err, lastErr)
}
return nil
}
// CheckRBAC performs a single SelfSubjectAccessReview or SubjectAccessReview, returning a
// boolean indicating whether or not the requested access would be allowed. This is basically
// `kubectl auth can-i`.
func CheckRBAC(ctx context.Context, kubeconfigPath string, ra authorizationv1.ResourceAttributes, user string, groups ...string) (bool, error) {
restConfig, err := GetRESTConfig(kubeconfigPath)
if err != nil {
return false, err
}
authClient, err := authorizationv1client.NewForConfig(restConfig)
if err != nil {
return false, err
}
var reviewFunc genericAccessReviewRequest
if len(user) == 0 && len(groups) == 0 {
reviewFunc = selfSubjectAccessReview(authClient, ra)
} else {
reviewFunc = subjectAccessReview(authClient, ra, user, groups)
}
status, err := reviewFunc(ctx)
if err != nil {
return false, err
}
return status.Allowed, nil
}
// selfSubjectAccessReview returns a function that makes SelfSubjectAccessReview requests using the
// provided client and attributes, returning a status or error.
func selfSubjectAccessReview(authClient *authorizationv1client.AuthorizationV1Client, ra authorizationv1.ResourceAttributes) genericAccessReviewRequest {
return func(ctx context.Context) (*authorizationv1.SubjectAccessReviewStatus, error) {
r, err := authClient.SelfSubjectAccessReviews().Create(ctx, &authorizationv1.SelfSubjectAccessReview{
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
ResourceAttributes: &ra,
},
}, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return &r.Status, nil
}
}
// subjectAccessReview returns a function that makes SubjectAccessReview requests using the
// provided client, attributes, user, and group, returning a status or error.
func subjectAccessReview(authClient *authorizationv1client.AuthorizationV1Client, ra authorizationv1.ResourceAttributes, user string, groups []string) genericAccessReviewRequest {
return func(ctx context.Context) (*authorizationv1.SubjectAccessReviewStatus, error) {
r, err := authClient.SubjectAccessReviews().Create(ctx, &authorizationv1.SubjectAccessReview{
Spec: authorizationv1.SubjectAccessReviewSpec{
ResourceAttributes: &ra,
User: user,
Groups: groups,
},
}, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return &r.Status, nil
}
}
func BuildControllerEventRecorder(k8s clientset.Interface, controllerName, namespace string) record.EventRecorder {
logrus.Infof("Creating %s event broadcaster", controllerName)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&coregetter.EventSinkImpl{Interface: k8s.CoreV1().Events(namespace)})
nodeName := os.Getenv("NODE_NAME")
return eventBroadcaster.NewRecorder(schemes.All, v1.EventSource{Component: controllerName, Host: nodeName})
}
type roundTripFunc func(req *http.Request) (*http.Response, error)
func (w roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return w(req)
}