diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index 3d3a8dadf1a..4df094aae15 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -23,6 +23,7 @@ import ( "net" "net/url" "path/filepath" + "slices" "strconv" "strings" "time" @@ -36,6 +37,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" @@ -101,6 +103,13 @@ type Client struct { listMembersFunc func(timeout time.Duration) (*clientv3.MemberListResponse, error) } +type etcdMemberStatus struct { + ep string + status *clientv3.StatusResponse + // err is any error encountered while communicating with the etcd server. + err error +} + // New creates a new EtcdCluster client func New(endpoints []string, ca, cert, key string) (*Client, error) { client := Client{Endpoints: endpoints} @@ -535,7 +544,7 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem if !isLearner { // Add the new member client address to the list of endpoints - c.Endpoints = append(c.Endpoints, GetClientURLByIP(parsedPeerAddrs.Hostname())) + c.addEndpoint(GetClientURLByIP(parsedPeerAddrs.Hostname())) } return ret, nil @@ -617,11 +626,12 @@ func (c *Client) MemberPromote(learnerID uint64) error { // 2. context deadline exceeded // 3. peer URLs already exists // Once the client provides a way to check if the etcd learner is ready to promote, the retry logic can be revisited. + var promoteResp *clientv3.MemberPromoteResponse err = wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().EtcdAPICall.Duration, true, func(_ context.Context) (bool, error) { ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) defer cancel() - _, err = cli.MemberPromote(ctx, learnerID) + promoteResp, err = cli.MemberPromote(ctx, learnerID) if err == nil { klog.V(1).Infof("[etcd] The learner was promoted as a voting member: %s", learnerIDUint) return true, nil @@ -633,18 +643,52 @@ func (c *Client) MemberPromote(learnerID uint64) error { if err != nil { return lastError } + + for _, m := range promoteResp.Members { + if m.ID == learnerID { + parsedPeerAddrs, err := url.Parse(m.PeerURLs[0]) + if err != nil { + return errors.Wrapf(err, "error parsing peer address %s", m.PeerURLs[0]) + } + c.addEndpoint(GetClientURLByIP(parsedPeerAddrs.Hostname())) + break + } + } + return nil } +func (c *Client) addEndpoint(ep string) { + if slices.Contains(c.Endpoints, ep) { + return + } + c.Endpoints = append(c.Endpoints, ep) +} + // CheckClusterHealth returns nil for status Up or error for status Down func (c *Client) CheckClusterHealth() error { - _, err := c.getClusterStatus() + _, ok, err := c.getClusterStatus() + if err != nil { + klog.V(1).Infof("[etcd] cluster has quorum: %t; some members are not healthy: %v\n", ok, err) + } + if ok { + return nil + } return err } -// getClusterStatus returns nil for status Up (along with endpoint status response map) or error for status Down -func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) { - clusterStatus := make(map[string]*clientv3.StatusResponse) +// getClusterStatus checks the health of the cluster members and returns +// their individual status map, whether cluster quorum is satisfied, and any +// aggregated member errors. +// +// The boolean result is true when a majority of members are healthy +// (healthyCount > totalCount/2). +// +// A member is considered unhealthy if its status request failed or if the +// reported status contains health errors. +func (c *Client) getClusterStatus() (map[string]*etcdMemberStatus, bool, error) { + // Step 1: get the cluster status first + clusterStatus := make(map[string]*etcdMemberStatus) for _, ep := range c.Endpoints { // Gets the member status var lastError error @@ -653,6 +697,7 @@ func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) true, func(_ context.Context) (bool, error) { cli, err := c.newEtcdClient(c.Endpoints) if err != nil { + klog.V(5).Infof("Failed to create etcd client with %v: %v", c.Endpoints, err) lastError = err return false, nil } @@ -669,15 +714,33 @@ func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) return false, nil }) if err != nil { - return nil, lastError + clusterStatus[ep] = &etcdMemberStatus{ep: ep, err: lastError} + } else { + clusterStatus[ep] = &etcdMemberStatus{ep: ep, status: resp} } - - clusterStatus[ep] = resp } - return clusterStatus, nil + + // Step 2: evaluate the cluster status + totalCount, healthyCount := len(clusterStatus), 0 + var memberErrs []error + + for ep, epStatus := range clusterStatus { + if epStatus.err != nil { + memberErrs = append(memberErrs, errors.Wrapf(epStatus.err, "the status of member %s is not available", ep)) + continue + } + if len(epStatus.status.Errors) > 0 { + memberErrs = append(memberErrs, errors.Errorf("member %s is not healthy: %s", ep, strings.Join(epStatus.status.Errors, ","))) + continue + } + healthyCount++ + } + + err := utilerrors.NewAggregate(memberErrs) + return clusterStatus, healthyCount > totalCount/2, err } -// WaitForClusterAvailable returns true if all endpoints in the cluster are available after retry attempts, an error is returned otherwise +// WaitForClusterAvailable returns true if the etcd cluster is healthy after retry attempts, otherwise returns an error. func (c *Client) WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error) { for i := 0; i < retries; i++ { if i > 0 { @@ -685,17 +748,13 @@ func (c *Client) WaitForClusterAvailable(retries int, retryInterval time.Duratio time.Sleep(retryInterval) } klog.V(2).Infof("[etcd] attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries) - _, err := c.getClusterStatus() + _, ok, err := c.getClusterStatus() if err != nil { - switch err { - case context.DeadlineExceeded: - klog.V(1).Infof("[etcd] Attempt timed out") - default: - klog.V(1).Infof("[etcd] Attempt failed with error: %v\n", err) - } - continue + klog.V(1).Infof("[etcd] cluster has quorum: %t; some members are not healthy: %v\n", ok, err) + } + if ok { + return true, nil } - return true, nil } return false, errors.New("timeout waiting for etcd cluster to be available") } diff --git a/cmd/kubeadm/app/util/etcd/etcd_test.go b/cmd/kubeadm/app/util/etcd/etcd_test.go index d88968f6f91..5776e19f535 100644 --- a/cmd/kubeadm/app/util/etcd/etcd_test.go +++ b/cmd/kubeadm/app/util/etcd/etcd_test.go @@ -29,6 +29,7 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" clientsetfake "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" @@ -823,3 +824,144 @@ func TestGetMemberStatus(t *testing.T) { }) } } + +type fakeEtcdClientWithStatusResponse struct { + fakeEtcdClient + statusResponses map[string]*clientv3.StatusResponse + statusRequestErrors map[string]error +} + +// Status gets the status of the endpoint. +func (f *fakeEtcdClientWithStatusResponse) Status(_ context.Context, ep string) (*clientv3.StatusResponse, error) { + if f.statusRequestErrors != nil { + if _, ok := f.statusRequestErrors[ep]; ok { + return nil, f.statusRequestErrors[ep] + } + } + return f.statusResponses[ep], nil +} + +func TestEvaluateClusterStatus(t *testing.T) { + testCases := []struct { + name string + Endpoints []string + newEtcdClient func(endpoints []string) (etcdClient, error) + wantClusterHealthy bool + wantMemberErrors bool + }{ + { + name: "all the three members are healthy", + Endpoints: []string{"https://192.168.10.100:2379", "https://192.168.10.200:2379", "https://192.168.10.300:2379"}, + newEtcdClient: func(endpoints []string) (etcdClient, error) { + f := &fakeEtcdClientWithStatusResponse{ + statusResponses: map[string]*clientv3.StatusResponse{ + "https://192.168.10.100:2379": {}, + "https://192.168.10.200:2379": {}, + "https://192.168.10.300:2379": {}, + }, + } + return f, nil + }, + wantClusterHealthy: true, + wantMemberErrors: false, + }, + { + name: "one out of three members has errors", + Endpoints: []string{"https://192.168.10.100:2379", "https://192.168.10.200:2379", "https://192.168.10.300:2379"}, + newEtcdClient: func(endpoints []string) (etcdClient, error) { + f := &fakeEtcdClientWithStatusResponse{ + statusResponses: map[string]*clientv3.StatusResponse{ + "https://192.168.10.100:2379": {}, + "https://192.168.10.200:2379": {Errors: []string{"etcdserver: mvcc: database space exceeded"}}, + "https://192.168.10.300:2379": {}, + }, + } + return f, nil + }, + wantClusterHealthy: true, + wantMemberErrors: true, + }, + { + name: "one out of three members is unreachable", + Endpoints: []string{"https://192.168.10.100:2379", "https://192.168.10.200:2379", "https://192.168.10.300:2379"}, + newEtcdClient: func(endpoints []string) (etcdClient, error) { + f := &fakeEtcdClientWithStatusResponse{ + statusResponses: map[string]*clientv3.StatusResponse{ + "https://192.168.10.100:2379": {}, + "https://192.168.10.200:2379": {}, + "https://192.168.10.300:2379": {}, + }, + statusRequestErrors: map[string]error{ + "https://192.168.10.200:2379": errors.New("context deadline exceeded"), + }, + } + return f, nil + }, + wantClusterHealthy: true, + wantMemberErrors: true, + }, + { + name: "two out of three members has errors", + Endpoints: []string{"https://192.168.10.100:2379", "https://192.168.10.200:2379", "https://192.168.10.300:2379"}, + newEtcdClient: func(endpoints []string) (etcdClient, error) { + f := &fakeEtcdClientWithStatusResponse{ + statusResponses: map[string]*clientv3.StatusResponse{ + "https://192.168.10.100:2379": {}, + "https://192.168.10.200:2379": {Errors: []string{"etcdserver: mvcc: database space exceeded"}}, + "https://192.168.10.300:2379": {Errors: []string{"etcdserver: mvcc: data corrupted"}}, + }, + } + return f, nil + }, + wantClusterHealthy: false, + wantMemberErrors: true, + }, + { + name: "two out of three members are unreachable", + Endpoints: []string{"https://192.168.10.100:2379", "https://192.168.10.200:2379", "https://192.168.10.300:2379"}, + newEtcdClient: func(endpoints []string) (etcdClient, error) { + f := &fakeEtcdClientWithStatusResponse{ + statusResponses: map[string]*clientv3.StatusResponse{ + "https://192.168.10.100:2379": {}, + "https://192.168.10.200:2379": {}, + "https://192.168.10.300:2379": {}, + }, + statusRequestErrors: map[string]error{ + "https://192.168.10.200:2379": errors.New("context deadline exceeded"), + "https://192.168.10.300:2379": errors.New("context deadline exceeded"), + }, + } + return f, nil + }, + wantClusterHealthy: false, + wantMemberErrors: true, + }, + } + + // Temporarily reduce the etcd API call timeout from 2 minutes to 1 second. + oldActiveTimeout := kubeadmapi.GetActiveTimeouts() + newActiveTimeout := oldActiveTimeout.DeepCopy() + newActiveTimeout.EtcdAPICall = &metav1.Duration{Duration: 1 * time.Second} + kubeadmapi.SetActiveTimeouts(newActiveTimeout) + defer func() { + kubeadmapi.SetActiveTimeouts(oldActiveTimeout) + }() + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + c := &Client{ + Endpoints: tc.Endpoints, + newEtcdClient: tc.newEtcdClient, + } + _, gotClusterHealthy, err := c.getClusterStatus() + + if gotClusterHealthy != tc.wantClusterHealthy { + t.Errorf("gotClusterHealthy = %t, want = %t", gotClusterHealthy, tc.wantClusterHealthy) + } + + if tc.wantMemberErrors != (err != nil) { + t.Errorf("gotMemberErrors = %v, wantMemberErrors = %t", err, tc.wantMemberErrors) + } + }) + } +}