mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-02-03 20:40:26 -05:00
etcd: use Unix Domain socket for testserver
Choosing a port in advance is racy. A better solution is to use a Unix Domain socket in the per-etcd-instance data directory. Then the name can be determined in advance and there's no risk of conflicts with other etcd instances. With unix:// for the endpoint, we have to be a bit more careful about passing a TLS config to the etcd client library because for unix://, in contrast to http://, it tries to use an incomplete config which then fails to establish the connection.
This commit is contained in:
parent
047e4c8e56
commit
8672956f73
4 changed files with 67 additions and 73 deletions
|
|
@ -22,6 +22,7 @@ import (
|
|||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
"crypto/rsa"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"encoding/pem"
|
||||
|
|
@ -501,26 +502,9 @@ func StartTestServer(t ktesting.TB, instanceOptions *TestServerInstanceOptions,
|
|||
return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err)
|
||||
}
|
||||
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: storageConfig.Transport.CertFile,
|
||||
KeyFile: storageConfig.Transport.KeyFile,
|
||||
TrustedCAFile: storageConfig.Transport.TrustedCAFile,
|
||||
}
|
||||
tlsConfig, err := tlsInfo.ClientConfig()
|
||||
etcdClient, _, err := GetEtcdClients(storageConfig.Transport)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
etcdConfig := clientv3.Config{
|
||||
Endpoints: storageConfig.Transport.ServerList,
|
||||
DialTimeout: 20 * time.Second,
|
||||
DialOptions: []grpc.DialOption{
|
||||
grpc.WithBlock(), // block until the underlying connection is up
|
||||
},
|
||||
TLS: tlsConfig,
|
||||
}
|
||||
etcdClient, err := clientv3.New(etcdConfig)
|
||||
if err != nil {
|
||||
return result, err
|
||||
return result, fmt.Errorf("create etcd client: %w", err)
|
||||
}
|
||||
|
||||
// from here the caller must call tearDown
|
||||
|
|
@ -538,6 +522,45 @@ func StartTestServer(t ktesting.TB, instanceOptions *TestServerInstanceOptions,
|
|||
return result, nil
|
||||
}
|
||||
|
||||
// GetEtcdClients returns an initialized etcd clientv3.Client and clientv3.KV.
|
||||
func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, clientv3.KV, error) {
|
||||
// clientv3.New ignores an invalid TLS config for http://, but not for unix:// (https://github.com/etcd-io/etcd/blob/5a8fba466087686fc15815f5bc041fb7eb1f23ea/client/v3/internal/endpoint/endpoint.go#L61-L66).
|
||||
// To support unix://, we must not set Config.TLS unless we really have
|
||||
// transport security.
|
||||
var tlsConfig *tls.Config
|
||||
if config.CertFile != "" ||
|
||||
config.KeyFile != "" ||
|
||||
config.TrustedCAFile != "" {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: config.CertFile,
|
||||
KeyFile: config.KeyFile,
|
||||
TrustedCAFile: config.TrustedCAFile,
|
||||
}
|
||||
|
||||
var err error
|
||||
tlsConfig, err = tlsInfo.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: config.ServerList,
|
||||
DialTimeout: 20 * time.Second,
|
||||
DialOptions: []grpc.DialOption{
|
||||
grpc.WithBlock(), // block until the underlying connection is up
|
||||
},
|
||||
TLS: tlsConfig,
|
||||
}
|
||||
|
||||
c, err := clientv3.New(cfg)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return c, clientv3.NewKV(c), nil
|
||||
}
|
||||
|
||||
// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
|
||||
func StartTestServerOrDie(t testing.TB, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
|
||||
result, err := StartTestServer(t, instanceOptions, flags, storageConfig)
|
||||
|
|
|
|||
|
|
@ -86,6 +86,12 @@ func DefaultEtcdOptions() *options.EtcdOptions {
|
|||
}
|
||||
|
||||
// SharedEtcd creates a storage config for a shared etcd instance, with a unique prefix.
|
||||
//
|
||||
// The transport CertFile/KeyFile/TrustedCAFile will be empty for insecure connections.
|
||||
// In that case, *no* TLS config should be used because etcd would try to use
|
||||
// it for Unix Domain sockets (https://github.com/etcd-io/etcd/blob/5a8fba466087686fc15815f5bc041fb7eb1f23ea/client/v3/internal/endpoint/endpoint.go#L61-L66)
|
||||
// and fail to connect because the TLS config is insufficient. It works
|
||||
// for TCP because http disables using TLS.
|
||||
func SharedEtcd() *storagebackend.Config {
|
||||
cfg := storagebackend.NewDefaultConfig(path.Join(uuid.New().String(), "registry"), nil)
|
||||
cfg.Transport.ServerList = []string{GetEtcdURL()}
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import (
|
|||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
|
@ -51,18 +52,6 @@ func getEtcdPath() (string, error) {
|
|||
return exec.LookPath("etcd")
|
||||
}
|
||||
|
||||
// getAvailablePort returns a TCP port that is available for binding.
|
||||
func getAvailablePort() (int, error) {
|
||||
l, err := net.Listen("tcp", ":0")
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("could not bind to a port: %v", err)
|
||||
}
|
||||
// It is possible but unlikely that someone else will bind this port before we
|
||||
// get a chance to use it.
|
||||
defer l.Close()
|
||||
return l.Addr().(*net.TCPAddr).Port, nil
|
||||
}
|
||||
|
||||
// startEtcd executes an etcd instance. The returned function will signal the
|
||||
// etcd process and wait for it to exit.
|
||||
func startEtcd(output io.Writer, forceCreate bool) (func(), error) {
|
||||
|
|
@ -105,28 +94,30 @@ func RunCustomEtcd(dataDir string, customFlags []string, output io.Writer) (url
|
|||
fmt.Fprint(os.Stderr, installEtcd)
|
||||
return "", nil, fmt.Errorf("could not find etcd in PATH: %v", err)
|
||||
}
|
||||
etcdPort, err := getAvailablePort()
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("could not get a port: %v", err)
|
||||
}
|
||||
customURL := fmt.Sprintf("http://127.0.0.1:%d", etcdPort)
|
||||
|
||||
klog.Infof("starting etcd on %s", customURL)
|
||||
|
||||
etcdDataDir, err := os.MkdirTemp(os.TempDir(), dataDir)
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("unable to make temp etcd data dir %s: %v", dataDir, err)
|
||||
}
|
||||
klog.Infof("storing etcd data in: %v", etcdDataDir)
|
||||
etcdSocketPath := path.Join(etcdDataDir, "etcd.sock")
|
||||
customURL := "unix://" + etcdSocketPath
|
||||
|
||||
klog.V(2).InfoS("starting etcd", "url", customURL, "dataDir", etcdDataDir)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
args := []string{
|
||||
"--data-dir",
|
||||
etcdDataDir,
|
||||
"--listen-client-urls",
|
||||
customURL,
|
||||
// This should be how clients connect to etcd, but https://github.com/etcd-io/etcd/pull/12469
|
||||
// apparently was incomplete: trying to pass a Unix Domain URL here is rejected by ectd 3.15.13 with
|
||||
// --advertise-client-urls "unix:///tmp/etcd.sock" must be "host:port" (missing port in address)
|
||||
//
|
||||
// We don't need to advertise the correct address. To prevent connecting to the default URL
|
||||
// in the unlikely case that something does use this URL after all, an invalid URL is set here.
|
||||
"--advertise-client-urls",
|
||||
customURL,
|
||||
"http://127.0.0.111:0",
|
||||
// With :0 we let the kernel pick a unique port. We don't care which port this will be,
|
||||
// no other peer is going to connect.
|
||||
"--listen-peer-urls",
|
||||
"http://127.0.0.1:0",
|
||||
"-log-level",
|
||||
|
|
@ -176,7 +167,7 @@ func RunCustomEtcd(dataDir string, customFlags []string, output io.Writer) (url
|
|||
const pollCount = int32(300)
|
||||
|
||||
for i <= pollCount {
|
||||
conn, err := net.DialTimeout("tcp", strings.TrimPrefix(customURL, "http://"), 1*time.Second)
|
||||
conn, err := net.DialTimeout("unix", etcdSocketPath, 1*time.Second)
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
break
|
||||
|
|
|
|||
|
|
@ -21,16 +21,15 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
coreclient "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/transport"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||
)
|
||||
|
||||
// DeletePodOrErrorf deletes a pod or fails with a call to t.Errorf.
|
||||
|
|
@ -67,32 +66,7 @@ func WaitForPodToDisappear(podClient coreclient.PodInterface, podName string, in
|
|||
})
|
||||
}
|
||||
|
||||
// GetEtcdClients returns an initialized clientv3.Client and clientv3.KV.
|
||||
// GetEtcdClients returns an initialized etcd clientv3.Client and clientv3.KV.
|
||||
func GetEtcdClients(config storagebackend.TransportConfig) (*clientv3.Client, clientv3.KV, error) {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: config.CertFile,
|
||||
KeyFile: config.KeyFile,
|
||||
TrustedCAFile: config.TrustedCAFile,
|
||||
}
|
||||
|
||||
tlsConfig, err := tlsInfo.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: config.ServerList,
|
||||
DialTimeout: 20 * time.Second,
|
||||
DialOptions: []grpc.DialOption{
|
||||
grpc.WithBlock(), // block until the underlying connection is up
|
||||
},
|
||||
TLS: tlsConfig,
|
||||
}
|
||||
|
||||
c, err := clientv3.New(cfg)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return c, clientv3.NewKV(c), nil
|
||||
return kubeapiservertesting.GetEtcdClients(config)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue