From 4fbd5c1ed4ff16b880afe6d9ddb2be34b6a35c45 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Sat, 6 Dec 2025 00:13:56 +0000 Subject: [PATCH] Move embed into separate package from executor Better isolates the K3s implementation from the interface, and aligns the package path with other projects executors. This should also remove the indirect flannel dep from other projects that don't use the embedded executor. Signed-off-by: Brad Davidson (cherry picked from commit c3ca02aa758773aa2dea46665973f8022675f988) Signed-off-by: Brad Davidson --- cmd/server/main.go | 2 + main.go | 2 + pkg/daemons/executor/executor.go | 54 +++++++++++++++++++ pkg/etcd/etcd.go | 6 +-- .../executor => executor/embed}/embed.go | 54 ++++++++++++++++--- .../embed}/embed_linux.go | 5 +- .../embed}/embed_windows.go | 7 +-- .../executor => executor/embed/etcd}/etcd.go | 44 +++------------ tests/mock/executor_helpers.go | 42 ++++++++++++--- 9 files changed, 153 insertions(+), 63 deletions(-) rename pkg/{daemons/executor => executor/embed}/embed.go (88%) rename pkg/{daemons/executor => executor/embed}/embed_linux.go (74%) rename pkg/{daemons/executor => executor/embed}/embed_windows.go (95%) rename pkg/{daemons/executor => executor/embed/etcd}/etcd.go (54%) diff --git a/cmd/server/main.go b/cmd/server/main.go index 434b24450fb..97b0449a1bf 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -25,6 +25,8 @@ import ( "github.com/sirupsen/logrus" "github.com/urfave/cli/v2" crictl2 "sigs.k8s.io/cri-tools/cmd/crictl" + + _ "github.com/k3s-io/k3s/pkg/executor/embed" ) func init() { diff --git a/main.go b/main.go index 1502284d582..12123ec2028 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,8 @@ import ( "github.com/k3s-io/k3s/pkg/configfilearg" "github.com/sirupsen/logrus" "github.com/urfave/cli/v2" + + _ "github.com/k3s-io/k3s/pkg/executor/embed" ) func main() { diff --git a/pkg/daemons/executor/executor.go b/pkg/daemons/executor/executor.go index 2fa402379e2..7d092e744d5 100644 --- a/pkg/daemons/executor/executor.go +++ b/pkg/daemons/executor/executor.go @@ -2,6 +2,7 @@ package executor import ( "context" + "errors" "net/http" "os" "path/filepath" @@ -19,6 +20,8 @@ import ( var ( executor Executor + + ErrNotInitialized = errors.New("executor not initialized") ) // TestFunc is the signature of a function that returns nil error when the component is ready. @@ -152,54 +155,93 @@ func Set(driver Executor) { } func Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error { + if executor == nil { + return ErrNotInitialized + } return executor.Bootstrap(ctx, nodeConfig, cfg) } func Kubelet(ctx context.Context, args []string) error { + if executor == nil { + return ErrNotInitialized + } return executor.Kubelet(ctx, args) } func KubeProxy(ctx context.Context, args []string) error { + if executor == nil { + return ErrNotInitialized + } return executor.KubeProxy(ctx, args) } func APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) { + if executor == nil { + return nil, nil, ErrNotInitialized + } return executor.APIServerHandlers(ctx) } func APIServer(ctx context.Context, args []string) error { + if executor == nil { + return ErrNotInitialized + } return executor.APIServer(ctx, args) } func Scheduler(ctx context.Context, nodeReady <-chan struct{}, args []string) error { + if executor == nil { + return ErrNotInitialized + } return executor.Scheduler(ctx, nodeReady, args) } func ControllerManager(ctx context.Context, args []string) error { + if executor == nil { + return ErrNotInitialized + } return executor.ControllerManager(ctx, args) } func CurrentETCDOptions() (InitialOptions, error) { + if executor == nil { + return InitialOptions{}, ErrNotInitialized + } return executor.CurrentETCDOptions() } func ETCD(ctx context.Context, wg *sync.WaitGroup, args *ETCDConfig, extraArgs []string, test TestFunc) error { + if executor == nil { + return ErrNotInitialized + } return executor.ETCD(ctx, wg, args, extraArgs, test) } func CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error { + if executor == nil { + return ErrNotInitialized + } return executor.CloudControllerManager(ctx, ccmRBACReady, args) } func Containerd(ctx context.Context, config *daemonconfig.Node) error { + if executor == nil { + return ErrNotInitialized + } return executor.Containerd(ctx, config) } func Docker(ctx context.Context, config *daemonconfig.Node) error { + if executor == nil { + return ErrNotInitialized + } return executor.Docker(ctx, config) } func CRI(ctx context.Context, config *daemonconfig.Node) error { + if executor == nil { + return ErrNotInitialized + } return executor.CRI(ctx, config) } @@ -208,18 +250,30 @@ func CNI(ctx context.Context, wg *sync.WaitGroup, config *daemonconfig.Node) err } func APIServerReadyChan() <-chan struct{} { + if executor == nil { + return nil + } return executor.APIServerReadyChan() } func ETCDReadyChan() <-chan struct{} { + if executor == nil { + return nil + } return executor.ETCDReadyChan() } func CRIReadyChan() <-chan struct{} { + if executor == nil { + return nil + } return executor.CRIReadyChan() } func IsSelfHosted() bool { + if executor == nil { + return false + } return executor.IsSelfHosted() } diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 92f0cf2a5a9..51fc2f5847a 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -28,6 +28,7 @@ import ( "github.com/k3s-io/k3s/pkg/daemons/executor" "github.com/k3s-io/k3s/pkg/etcd/s3" "github.com/k3s-io/k3s/pkg/etcd/snapshot" + embedded "github.com/k3s-io/k3s/pkg/executor/embed/etcd" "github.com/k3s-io/k3s/pkg/server/auth" "github.com/k3s-io/k3s/pkg/signals" "github.com/k3s-io/k3s/pkg/util" @@ -1125,8 +1126,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context, wg *sync.WaitGroup) e return err } - embedded := executor.Embedded{} - return embedded.ETCD(ctx, wg, &executor.ETCDConfig{ + return embedded.StartETCD(ctx, wg, &executor.ETCDConfig{ InitialOptions: executor.InitialOptions{AdvertisePeerURL: peerURL}, DataDir: tmpDataDir, ForceNewCluster: true, @@ -1146,7 +1146,7 @@ func (e *ETCD) StartEmbeddedTemporary(ctx context.Context, wg *sync.WaitGroup) e }, ExperimentalInitialCorruptCheck: true, ExperimentalWatchProgressNotifyInterval: e.config.Datastore.NotifyInterval, - }, append(e.config.ExtraEtcdArgs, "--max-snapshots=0", "--max-wals=0"), e.Test) + }, append(e.config.ExtraEtcdArgs, "--max-snapshots=0", "--max-wals=0")) } func addPort(address string, offset int) (string, error) { diff --git a/pkg/daemons/executor/embed.go b/pkg/executor/embed/embed.go similarity index 88% rename from pkg/daemons/executor/embed.go rename to pkg/executor/embed/embed.go index 0c7f39cfecd..df655754186 100644 --- a/pkg/daemons/executor/embed.go +++ b/pkg/executor/embed/embed.go @@ -1,7 +1,7 @@ //go:build !no_embedded_executor // +build !no_embedded_executor -package executor +package embed import ( "context" @@ -24,6 +24,8 @@ import ( "github.com/k3s-io/k3s/pkg/agent/netpol" "github.com/k3s-io/k3s/pkg/cli/cmds" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/daemons/executor" + "github.com/k3s-io/k3s/pkg/executor/embed/etcd" "github.com/k3s-io/k3s/pkg/signals" "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" @@ -52,7 +54,17 @@ import ( var once sync.Once func init() { - executor = &Embedded{} + executor.Set(&Embedded{}) +} + +// explicit type check +var _ executor.Executor = &Embedded{} + +type Embedded struct { + apiServerReady <-chan struct{} + etcdReady chan struct{} + criReady chan struct{} + nodeConfig *daemonconfig.Node } func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error { @@ -312,24 +324,50 @@ func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan return nil } -func (e *Embedded) CurrentETCDOptions() (InitialOptions, error) { - return InitialOptions{}, nil +func (e *Embedded) CurrentETCDOptions() (executor.InitialOptions, error) { + return executor.InitialOptions{}, nil +} + +func (e *Embedded) ETCD(ctx context.Context, wg *sync.WaitGroup, args *executor.ETCDConfig, extraArgs []string, test executor.TestFunc) error { + // Start a goroutine to call the provided test function until it returns true. + // The test function is reponsible for ensuring that the etcd server is up + // and ready to accept client requests. + if e.etcdReady != nil { + go func() { + for { + if err := test(ctx, true); err != nil { + logrus.Infof("Failed to test etcd connection: %v", err) + } else { + logrus.Info("Connection to etcd is ready") + close(e.etcdReady) + return + } + + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return + } + } + }() + } + return etcd.StartETCD(ctx, wg, args, extraArgs) } func (e *Embedded) Containerd(ctx context.Context, cfg *daemonconfig.Node) error { - return CloseIfNilErr(containerd.Run(ctx, cfg), e.criReady) + return executor.CloseIfNilErr(containerd.Run(ctx, cfg), e.criReady) } func (e *Embedded) Docker(ctx context.Context, cfg *daemonconfig.Node) error { - return CloseIfNilErr(cridockerd.Run(ctx, cfg), e.criReady) + return executor.CloseIfNilErr(cridockerd.Run(ctx, cfg), e.criReady) } func (e *Embedded) CRI(ctx context.Context, cfg *daemonconfig.Node) error { // agentless sets cri socket path to /dev/null to indicate no CRI is needed if cfg.ContainerRuntimeEndpoint != "/dev/null" { - return CloseIfNilErr(cri.WaitForService(ctx, cfg.ContainerRuntimeEndpoint, "CRI"), e.criReady) + return executor.CloseIfNilErr(cri.WaitForService(ctx, cfg.ContainerRuntimeEndpoint, "CRI"), e.criReady) } - return CloseIfNilErr(nil, e.criReady) + return executor.CloseIfNilErr(nil, e.criReady) } func (e *Embedded) CNI(ctx context.Context, wg *sync.WaitGroup, cfg *daemonconfig.Node) error { diff --git a/pkg/daemons/executor/embed_linux.go b/pkg/executor/embed/embed_linux.go similarity index 74% rename from pkg/daemons/executor/embed_linux.go rename to pkg/executor/embed/embed_linux.go index 0b9f4d5e78f..a41449e32a1 100644 --- a/pkg/daemons/executor/embed_linux.go +++ b/pkg/executor/embed/embed_linux.go @@ -1,13 +1,10 @@ //go:build linux && !no_embedded_executor // +build linux,!no_embedded_executor -package executor +package embed import ( daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" - - // registering k3s cloud provider - _ "github.com/k3s-io/k3s/pkg/cloudprovider" ) func platformKubeProxyArgs(nodeConfig *daemonconfig.Node) map[string]string { diff --git a/pkg/daemons/executor/embed_windows.go b/pkg/executor/embed/embed_windows.go similarity index 95% rename from pkg/daemons/executor/embed_windows.go rename to pkg/executor/embed/embed_windows.go index e65913477c8..cdd5e09d323 100644 --- a/pkg/daemons/executor/embed_windows.go +++ b/pkg/executor/embed/embed_windows.go @@ -1,7 +1,7 @@ //go:build windows && !no_embedded_executor // +build windows,!no_embedded_executor -package executor +package embed import ( "encoding/json" @@ -11,11 +11,8 @@ import ( "time" "github.com/Microsoft/hcsshim" - "github.com/sirupsen/logrus" - - // registering k3s cloud provider - _ "github.com/k3s-io/k3s/pkg/cloudprovider" daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/sirupsen/logrus" ) const ( diff --git a/pkg/daemons/executor/etcd.go b/pkg/executor/embed/etcd/etcd.go similarity index 54% rename from pkg/daemons/executor/etcd.go rename to pkg/executor/embed/etcd/etcd.go index c63890d080f..06f506a7f59 100644 --- a/pkg/daemons/executor/etcd.go +++ b/pkg/executor/embed/etcd/etcd.go @@ -1,4 +1,4 @@ -package executor +package etcd import ( "context" @@ -6,49 +6,19 @@ import ( "os" "path/filepath" "sync" - "time" - daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/daemons/executor" "github.com/k3s-io/k3s/pkg/version" "github.com/sirupsen/logrus" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" ) -// Embedded is defined here so that we can use embedded.ETCD even when the rest -// of the embedded execututor is disabled by build flags -type Embedded struct { - apiServerReady <-chan struct{} - etcdReady chan struct{} - criReady chan struct{} - nodeConfig *daemonconfig.Node -} - -func (e *Embedded) ETCD(ctx context.Context, wg *sync.WaitGroup, args *ETCDConfig, extraArgs []string, test TestFunc) error { - // An unbootstrapped executor is used to start up a temporary embedded etcd when reconciling. - // This temporary executor doesn't have any ready channels set up, so don't bother testing. - if e.etcdReady != nil { - go func() { - for { - if err := test(ctx, true); err != nil { - logrus.Infof("Failed to test etcd connection: %v", err) - } else { - logrus.Info("Connection to etcd is ready") - close(e.etcdReady) - return - } - - select { - case <-time.After(5 * time.Second): - case <-ctx.Done(): - return - } - } - }() - } - - // nil args indicates a no-op start; all we need to do is wait for the test - // func to indicate readiness and close the channel. +// StartETCD runs an embedded etcd server instance with the provided config. +// This function will return if the server has been successfully started. +// The server will continue to run until the context is cancelled or some internal error occurs. +func StartETCD(ctx context.Context, wg *sync.WaitGroup, args *executor.ETCDConfig, extraArgs []string) error { + // nil args indicates a no-op start if args == nil { return nil } diff --git a/tests/mock/executor_helpers.go b/tests/mock/executor_helpers.go index 9c3b0112d41..d26a7b5a4dd 100644 --- a/tests/mock/executor_helpers.go +++ b/tests/mock/executor_helpers.go @@ -1,9 +1,14 @@ package mock import ( + "context" + sync "sync" "testing" + cmds "github.com/k3s-io/k3s/pkg/cli/cmds" + daemonconfig "github.com/k3s-io/k3s/pkg/daemons/config" executor "github.com/k3s-io/k3s/pkg/daemons/executor" + "github.com/k3s-io/k3s/pkg/executor/embed/etcd" "go.uber.org/mock/gomock" ) @@ -13,13 +18,13 @@ import ( func NewExecutorWithEmbeddedETCD(t *testing.T) *Executor { mockController := gomock.NewController(t) mockExecutor := NewExecutor(mockController) + fake := &fakeExecutor{} - embed := &executor.Embedded{} - mockExecutor.EXPECT().Bootstrap(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(embed.Bootstrap) - mockExecutor.EXPECT().CurrentETCDOptions().AnyTimes().DoAndReturn(embed.CurrentETCDOptions) - mockExecutor.EXPECT().ETCD(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(embed.ETCD) - mockExecutor.EXPECT().ETCDReadyChan().AnyTimes().DoAndReturn(embed.ETCDReadyChan) - mockExecutor.EXPECT().IsSelfHosted().AnyTimes().DoAndReturn(embed.IsSelfHosted) + mockExecutor.EXPECT().Bootstrap(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(fake.Bootstrap) + mockExecutor.EXPECT().CurrentETCDOptions().AnyTimes().DoAndReturn(fake.CurrentETCDOptions) + mockExecutor.EXPECT().ETCD(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(fake.ETCD) + mockExecutor.EXPECT().ETCDReadyChan().AnyTimes().DoAndReturn(fake.ETCDReadyChan) + mockExecutor.EXPECT().IsSelfHosted().AnyTimes().DoAndReturn(fake.IsSelfHosted) closedChannel := func() <-chan struct{} { c := make(chan struct{}) @@ -33,3 +38,28 @@ func NewExecutorWithEmbeddedETCD(t *testing.T) *Executor { return mockExecutor } + +type fakeExecutor struct { + etcdReady chan struct{} +} + +func (f *fakeExecutor) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error { + f.etcdReady = make(chan struct{}) + return nil +} + +func (f *fakeExecutor) CurrentETCDOptions() (executor.InitialOptions, error) { + return executor.InitialOptions{}, nil +} + +func (f *fakeExecutor) ETCD(ctx context.Context, wg *sync.WaitGroup, args *executor.ETCDConfig, extraArgs []string, test executor.TestFunc) error { + return etcd.StartETCD(ctx, wg, args, extraArgs) +} + +func (f *fakeExecutor) ETCDReadyChan() <-chan struct{} { + return f.etcdReady +} + +func (f *fakeExecutor) IsSelfHosted() bool { + return true +}