Move embed into separate package from executor

Better isolates the K3s implementation from the interface, and aligns
the package path with other projects executors. This should also remove
the indirect flannel dep from other projects that don't use the embedded
executor.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
This commit is contained in:
Brad Davidson 2025-12-06 00:13:56 +00:00 committed by Brad Davidson
parent d582a0da84
commit c3ca02aa75
9 changed files with 153 additions and 63 deletions

View file

@ -25,6 +25,8 @@ import (
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
crictl2 "sigs.k8s.io/cri-tools/cmd/crictl"
_ "github.com/k3s-io/k3s/pkg/executor/embed"
)
func init() {

View file

@ -17,6 +17,8 @@ import (
"github.com/k3s-io/k3s/pkg/configfilearg"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
_ "github.com/k3s-io/k3s/pkg/executor/embed"
)
func main() {

View file

@ -2,6 +2,7 @@ package executor
import (
"context"
"errors"
"net/http"
"os"
"path/filepath"
@ -19,6 +20,8 @@ import (
var (
executor Executor
ErrNotInitialized = errors.New("executor not initialized")
)
// TestFunc is the signature of a function that returns nil error when the component is ready.
@ -152,54 +155,93 @@ func Set(driver Executor) {
}
func Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
if executor == nil {
return ErrNotInitialized
}
return executor.Bootstrap(ctx, nodeConfig, cfg)
}
func Kubelet(ctx context.Context, args []string) error {
if executor == nil {
return ErrNotInitialized
}
return executor.Kubelet(ctx, args)
}
func KubeProxy(ctx context.Context, args []string) error {
if executor == nil {
return ErrNotInitialized
}
return executor.KubeProxy(ctx, args)
}
func APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) {
if executor == nil {
return nil, nil, ErrNotInitialized
}
return executor.APIServerHandlers(ctx)
}
func APIServer(ctx context.Context, args []string) error {
if executor == nil {
return ErrNotInitialized
}
return executor.APIServer(ctx, args)
}
func Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error {
if executor == nil {
return ErrNotInitialized
}
return executor.Scheduler(ctx, nodeReady, args)
}
func ControllerManager(ctx context.Context, args []string) error {
if executor == nil {
return ErrNotInitialized
}
return executor.ControllerManager(ctx, args)
}
func CurrentETCDOptions() (InitialOptions, error) {
if executor == nil {
return InitialOptions{}, ErrNotInitialized
}
return executor.CurrentETCDOptions()
}
func ETCD(ctx context.Context, wg *sync.WaitGroup, args *ETCDConfig, extraArgs []string, test TestFunc) error {
if executor == nil {
return ErrNotInitialized
}
return executor.ETCD(ctx, wg, args, extraArgs, test)
}
func CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {
if executor == nil {
return ErrNotInitialized
}
return executor.CloudControllerManager(ctx, ccmRBACReady, args)
}
func Containerd(ctx context.Context, config *daemonconfig.Node) error {
if executor == nil {
return ErrNotInitialized
}
return executor.Containerd(ctx, config)
}
func Docker(ctx context.Context, config *daemonconfig.Node) error {
if executor == nil {
return ErrNotInitialized
}
return executor.Docker(ctx, config)
}
func CRI(ctx context.Context, config *daemonconfig.Node) error {
if executor == nil {
return ErrNotInitialized
}
return executor.CRI(ctx, config)
}
@ -208,18 +250,30 @@ func CNI(ctx context.Context, wg *sync.WaitGroup, config *daemonconfig.Node) err
}
func APIServerReadyChan() <-chan struct{} {
if executor == nil {
return nil
}
return executor.APIServerReadyChan()
}
func ETCDReadyChan() <-chan struct{} {
if executor == nil {
return nil
}
return executor.ETCDReadyChan()
}
func CRIReadyChan() <-chan struct{} {
if executor == nil {
return nil
}
return executor.CRIReadyChan()
}
func IsSelfHosted() bool {
if executor == nil {
return false
}
return executor.IsSelfHosted()
}

View file

@ -28,6 +28,7 @@ import (
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/etcd/s3"
"github.com/k3s-io/k3s/pkg/etcd/snapshot"
embedded "github.com/k3s-io/k3s/pkg/executor/embed/etcd"
"github.com/k3s-io/k3s/pkg/server/auth"
"github.com/k3s-io/k3s/pkg/signals"
"github.com/k3s-io/k3s/pkg/util"
@ -1125,8 +1126,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context, wg *sync.WaitGroup) e
return err
}
embedded := executor.Embedded{}
return embedded.ETCD(ctx, wg, &executor.ETCDConfig{
return embedded.StartETCD(ctx, wg, &executor.ETCDConfig{
InitialOptions: executor.InitialOptions{AdvertisePeerURL: peerURL},
DataDir: tmpDataDir,
ForceNewCluster: true,
@ -1146,7 +1146,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context, wg *sync.WaitGroup) e
},
ExperimentalInitialCorruptCheck: true,
ExperimentalWatchProgressNotifyInterval: e.config.Datastore.NotifyInterval,
}, append(e.config.ExtraEtcdArgs, "--max-snapshots=0", "--max-wals=0"), e.Test)
}, append(e.config.ExtraEtcdArgs, "--max-snapshots=0", "--max-wals=0"))
}
func addPort(address string, offset int) (string, error) {

View file

@ -1,7 +1,7 @@
//go:build !no_embedded_executor
// +build !no_embedded_executor
package executor
package embed
import (
"context"
@ -24,6 +24,8 @@ import (
"github.com/k3s-io/k3s/pkg/agent/netpol"
"github.com/k3s-io/k3s/pkg/cli/cmds"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/executor/embed/etcd"
"github.com/k3s-io/k3s/pkg/signals"
"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
@ -52,7 +54,17 @@ import (
var once sync.Once
func init() {
executor = &Embedded{}
executor.Set(&Embedded{})
}
// explicit type check
var _ executor.Executor = &Embedded{}
type Embedded struct {
apiServerReady <-chan struct{}
etcdReady chan struct{}
criReady chan struct{}
nodeConfig *daemonconfig.Node
}
func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
@ -312,24 +324,50 @@ func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan
return nil
}
func (e *Embedded) CurrentETCDOptions() (InitialOptions, error) {
return InitialOptions{}, nil
func (e *Embedded) CurrentETCDOptions() (executor.InitialOptions, error) {
return executor.InitialOptions{}, nil
}
func (e *Embedded) ETCD(ctx context.Context, wg *sync.WaitGroup, args *executor.ETCDConfig, extraArgs []string, test executor.TestFunc) error {
// Start a goroutine to call the provided test function until it returns true.
// The test function is reponsible for ensuring that the etcd server is up
// and ready to accept client requests.
if e.etcdReady != nil {
go func() {
for {
if err := test(ctx, true); err != nil {
logrus.Infof("Failed to test etcd connection: %v", err)
} else {
logrus.Info("Connection to etcd is ready")
close(e.etcdReady)
return
}
select {
case <-time.After(5 * time.Second):
case <-ctx.Done():
return
}
}
}()
}
return etcd.StartETCD(ctx, wg, args, extraArgs)
}
func (e *Embedded) Containerd(ctx context.Context, cfg *daemonconfig.Node) error {
return CloseIfNilErr(containerd.Run(ctx, cfg), e.criReady)
return executor.CloseIfNilErr(containerd.Run(ctx, cfg), e.criReady)
}
func (e *Embedded) Docker(ctx context.Context, cfg *daemonconfig.Node) error {
return CloseIfNilErr(cridockerd.Run(ctx, cfg), e.criReady)
return executor.CloseIfNilErr(cridockerd.Run(ctx, cfg), e.criReady)
}
func (e *Embedded) CRI(ctx context.Context, cfg *daemonconfig.Node) error {
// agentless sets cri socket path to /dev/null to indicate no CRI is needed
if cfg.ContainerRuntimeEndpoint != "/dev/null" {
return CloseIfNilErr(cri.WaitForService(ctx, cfg.ContainerRuntimeEndpoint, "CRI"), e.criReady)
return executor.CloseIfNilErr(cri.WaitForService(ctx, cfg.ContainerRuntimeEndpoint, "CRI"), e.criReady)
}
return CloseIfNilErr(nil, e.criReady)
return executor.CloseIfNilErr(nil, e.criReady)
}
func (e *Embedded) CNI(ctx context.Context, wg *sync.WaitGroup, cfg *daemonconfig.Node) error {

View file

@ -1,13 +1,10 @@
//go:build linux && !no_embedded_executor
// +build linux,!no_embedded_executor
package executor
package embed
import (
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
// registering k3s cloud provider
_ "github.com/k3s-io/k3s/pkg/cloudprovider"
)
func platformKubeProxyArgs(nodeConfig *daemonconfig.Node) map[string]string {

View file

@ -1,7 +1,7 @@
//go:build windows && !no_embedded_executor
// +build windows,!no_embedded_executor
package executor
package embed
import (
"encoding/json"
@ -11,11 +11,8 @@ import (
"time"
"github.com/Microsoft/hcsshim"
"github.com/sirupsen/logrus"
// registering k3s cloud provider
_ "github.com/k3s-io/k3s/pkg/cloudprovider"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/sirupsen/logrus"
)
const (

View file

@ -1,4 +1,4 @@
package executor
package etcd
import (
"context"
@ -6,49 +6,19 @@ import (
"os"
"path/filepath"
"sync"
"time"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/version"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
)
// Embedded is defined here so that we can use embedded.ETCD even when the rest
// of the embedded execututor is disabled by build flags
type Embedded struct {
apiServerReady <-chan struct{}
etcdReady chan struct{}
criReady chan struct{}
nodeConfig *daemonconfig.Node
}
func (e *Embedded) ETCD(ctx context.Context, wg *sync.WaitGroup, args *ETCDConfig, extraArgs []string, test TestFunc) error {
// An unbootstrapped executor is used to start up a temporary embedded etcd when reconciling.
// This temporary executor doesn't have any ready channels set up, so don't bother testing.
if e.etcdReady != nil {
go func() {
for {
if err := test(ctx, true); err != nil {
logrus.Infof("Failed to test etcd connection: %v", err)
} else {
logrus.Info("Connection to etcd is ready")
close(e.etcdReady)
return
}
select {
case <-time.After(5 * time.Second):
case <-ctx.Done():
return
}
}
}()
}
// nil args indicates a no-op start; all we need to do is wait for the test
// func to indicate readiness and close the channel.
// StartETCD runs an embedded etcd server instance with the provided config.
// This function will return if the server has been successfully started.
// The server will continue to run until the context is cancelled or some internal error occurs.
func StartETCD(ctx context.Context, wg *sync.WaitGroup, args *executor.ETCDConfig, extraArgs []string) error {
// nil args indicates a no-op start
if args == nil {
return nil
}

View file

@ -1,9 +1,14 @@
package mock
import (
"context"
sync "sync"
"testing"
cmds "github.com/k3s-io/k3s/pkg/cli/cmds"
daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config"
executor "github.com/k3s-io/k3s/pkg/daemons/executor"
"github.com/k3s-io/k3s/pkg/executor/embed/etcd"
"go.uber.org/mock/gomock"
)
@ -13,13 +18,13 @@ import (
func NewExecutorWithEmbeddedETCD(t *testing.T) *Executor {
mockController := gomock.NewController(t)
mockExecutor := NewExecutor(mockController)
fake := &fakeExecutor{}
embed := &executor.Embedded{}
mockExecutor.EXPECT().Bootstrap(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(embed.Bootstrap)
mockExecutor.EXPECT().CurrentETCDOptions().AnyTimes().DoAndReturn(embed.CurrentETCDOptions)
mockExecutor.EXPECT().ETCD(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(embed.ETCD)
mockExecutor.EXPECT().ETCDReadyChan().AnyTimes().DoAndReturn(embed.ETCDReadyChan)
mockExecutor.EXPECT().IsSelfHosted().AnyTimes().DoAndReturn(embed.IsSelfHosted)
mockExecutor.EXPECT().Bootstrap(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(fake.Bootstrap)
mockExecutor.EXPECT().CurrentETCDOptions().AnyTimes().DoAndReturn(fake.CurrentETCDOptions)
mockExecutor.EXPECT().ETCD(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(fake.ETCD)
mockExecutor.EXPECT().ETCDReadyChan().AnyTimes().DoAndReturn(fake.ETCDReadyChan)
mockExecutor.EXPECT().IsSelfHosted().AnyTimes().DoAndReturn(fake.IsSelfHosted)
closedChannel := func() <-chan struct{} {
c := make(chan struct{})
@ -33,3 +38,28 @@ func NewExecutorWithEmbeddedETCD(t *testing.T) *Executor {
return mockExecutor
}
type fakeExecutor struct {
etcdReady chan struct{}
}
func (f *fakeExecutor) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
f.etcdReady = make(chan struct{})
return nil
}
func (f *fakeExecutor) CurrentETCDOptions() (executor.InitialOptions, error) {
return executor.InitialOptions{}, nil
}
func (f *fakeExecutor) ETCD(ctx context.Context, wg *sync.WaitGroup, args *executor.ETCDConfig, extraArgs []string, test executor.TestFunc) error {
return etcd.StartETCD(ctx, wg, args, extraArgs)
}
func (f *fakeExecutor) ETCDReadyChan() <-chan struct{} {
return f.etcdReady
}
func (f *fakeExecutor) IsSelfHosted() bool {
return true
}