k3s/pkg/cluster/cluster.go
Brad Davidson 3acf8db8f2 Update packages to remove dep on archived github.com/pkg/errors
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
2026-03-09 16:09:01 -07:00

222 lines
7.9 KiB
Go

package cluster
import (
"context"
"net"
"net/url"
"strings"
"sync"
"time"
"github.com/k3s-io/k3s/pkg/cli/cmds"
"github.com/k3s-io/k3s/pkg/clientaccess"
"github.com/k3s-io/k3s/pkg/cluster/managed"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/etcd"
"github.com/k3s-io/k3s/pkg/metrics"
"github.com/k3s-io/k3s/pkg/signals"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/util/errors"
"github.com/k3s-io/kine/pkg/endpoint"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
utilsnet "k8s.io/utils/net"
)
type Cluster struct {
clientAccessInfo *clientaccess.Info
config *config.Control
managedDB managed.Driver
joining bool
storageRunning bool
saveBootstrap bool
cnFilterFunc func(...string) []string
}
// ListenAndServe creates the dynamic tls listener, registers http request
// handlers, and starts the supervisor API server loop.
func (c *Cluster) ListenAndServe(ctx context.Context) error {
// Set up the dynamiclistener and http request handlers
return c.initClusterAndHTTPS(ctx)
}
// Start handles writing/reading bootstrap data. If embedded etcd is in use,
// a secondary call to Cluster.save is made.
func (c *Cluster) Start(ctx context.Context, wg *sync.WaitGroup) error {
// if etcd is disabled or we're using kine, perform a no-op start of etcd
// to close the etcd ready channel. When etcd is in use, this is handled by
// c.start() -> c.managedDB.Start() -> etcd.Start() -> executor.ETCD()
if c.config.DisableETCD {
return executor.ETCD(ctx, wg, nil, nil, func(ctx context.Context, _ bool) error { return c.managedDB.Test(ctx, false) })
}
if c.managedDB == nil {
if err := executor.ETCD(ctx, wg, nil, nil, func(context.Context, bool) error { return nil }); err != nil {
return err
}
}
// start managed etcd database; when kine is in use this is a no-op.
if err := c.start(ctx, wg); err != nil {
return errors.WithMessage(err, "start managed database")
}
// set c.config.Datastore and c.config.Runtime.EtcdConfig with values
// necessary to build etcd clients, and start kine listener if necessary.
if err := c.startStorage(ctx, false); err != nil {
return err
}
// if necessary, store bootstrap data to datastore. saveBootstrap is only set
// when using kine, so this can be done before the ready channel has been closed.
if c.saveBootstrap {
if err := Save(ctx, c.config, false); err != nil {
return err
}
}
if c.managedDB != nil {
go func() {
for {
select {
case <-executor.ETCDReadyChan():
// always save to managed etcd, to ensure that any file modified locally are in sync with the datastore.
// this will fail if multiple keys exist, to prevent nodes from running with different bootstrap data.
if err := Save(ctx, c.config, false); err != nil && !errors.Is(err, context.Canceled) {
signals.RequestShutdown(errors.WithMessage(err, "failed to save bootstrap data"))
return
}
if !c.config.EtcdDisableSnapshots {
// do an initial reconcile of snapshots with a fast retry until it succeeds
wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
if err := c.managedDB.ReconcileSnapshotData(ctx); err != nil {
logrus.Errorf("Failed to record snapshots for cluster: %v", err)
return false, nil
}
return true, nil
})
// continue reconciling snapshots in the background at the configured interval.
// the interval is jittered by 5% to avoid all nodes reconciling at the same time.
wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
if err := c.managedDB.ReconcileSnapshotData(ctx); err != nil {
logrus.Errorf("Failed to record snapshots for cluster: %v", err)
}
}, c.config.EtcdSnapshotReconcile.Duration, 0.05, false)
}
return
case <-ctx.Done():
return
}
}
}()
}
return nil
}
// startEtcdProxy starts an etcd load-balancer proxy, for control-plane-only nodes
// without a local datastore.
func (c *Cluster) startEtcdProxy(ctx context.Context) error {
defaultURL, err := url.Parse(c.config.JoinURL)
if err != nil {
return err
}
_, nodeIPs, err := util.GetHostnameAndIPs(cmds.AgentConfig.NodeName, cmds.AgentConfig.NodeIP.Value())
if err != nil {
errors.WithMessage(err, "failed to get node name and addresses")
}
defaultURL.Host = net.JoinHostPort(defaultURL.Hostname(), "2379")
etcdProxy, err := etcd.NewETCDProxy(ctx, c.config.SupervisorPort, c.config.DataDir, defaultURL.String(), utilsnet.IsIPv6(nodeIPs[0]))
if err != nil {
return err
}
// immediately update the load balancer with all etcd addresses
// from /db/info, for a current list of etcd cluster member client URLs.
// client URLs are a full URI, but the proxy only wants host:port
if clientURLs, _, err := etcd.ClientURLs(ctx, c.clientAccessInfo, c.config.PrivateIP); err != nil || len(clientURLs) == 0 {
logrus.Warnf("Failed to get etcd ClientURLs: %v", err)
} else {
for i, c := range clientURLs {
u, err := url.Parse(c)
if err != nil {
return errors.WithMessage(err, "failed to parse etcd ClientURL")
}
clientURLs[i] = u.Host
}
etcdProxy.Update(clientURLs)
}
// start periodic endpoint sync goroutine
c.setupEtcdProxy(ctx, etcdProxy)
// remove etcd member if it exists
if err := c.managedDB.RemoveSelf(ctx); err != nil {
logrus.Warnf("Failed to remove this node from etcd members: %v", err)
}
c.config.Runtime.EtcdConfig.Endpoints = strings.Split(c.config.Datastore.Endpoint, ",")
c.config.Runtime.EtcdConfig.TLSConfig = c.config.Datastore.BackendTLSConfig
return nil
}
// startStorage starts the kine listener and configures the endpoints, if necessary.
// This calls into the kine endpoint code, which sets up the database client
// and unix domain socket listener if using an external database. In the case of an etcd
// backend it just returns the user-provided etcd endpoints and tls config.
func (c *Cluster) startStorage(ctx context.Context, bootstrap bool) error {
if c.storageRunning {
return nil
}
// the datastore is started multiple times when TLS is enabled.
// ensure that storage is no longer marked as running after its context is cancelled.
go func() {
<-ctx.Done()
c.storageRunning = false
}()
c.storageRunning = true
if !bootstrap {
// only register metrics when not bootstrapping, to prevent
// multiple datastore metrics from being registered.
c.config.Datastore.MetricsRegisterer = metrics.DefaultRegisterer
// set the tls config for the kine storage
c.config.Datastore.ServerTLSConfig.CAFile = c.config.Runtime.ETCDServerCA
c.config.Datastore.ServerTLSConfig.CertFile = c.config.Runtime.ServerETCDCert
c.config.Datastore.ServerTLSConfig.KeyFile = c.config.Runtime.ServerETCDKey
}
// start listening on the kine socket as an etcd endpoint, or return the external etcd endpoints
etcdConfig, err := endpoint.Listen(ctx, c.config.Datastore)
if err != nil {
return errors.WithMessage(err, "creating storage endpoint")
}
// Persist the returned etcd configuration. We decide if we're doing leader election for embedded controllers
// based on what the kine wrapper tells us about the datastore. Single-node datastores like sqlite don't require
// leader election, while basically all others (etcd, external database, etc) do since they allow multiple servers.
c.config.Runtime.EtcdConfig = etcdConfig
// after the bootstrap we need to set the args for api-server with kine in unixs or just set the
// values if the datastoreTLS is not enabled
if !bootstrap || !c.config.KineTLS {
c.config.Datastore.BackendTLSConfig = etcdConfig.TLSConfig
c.config.Datastore.Endpoint = strings.Join(etcdConfig.Endpoints, ",")
c.config.NoLeaderElect = !etcdConfig.LeaderElect
}
return nil
}
// New creates an initial cluster using the provided configuration.
func New(config *config.Control) *Cluster {
return &Cluster{
config: config,
}
}