Merge pull request #138541 from ahrtr/automated-cherry-pick-of-#138403-upstream-release-1.33

Automated cherry pick of #138403: kubeadm: Evaluate etcd cluster health using quorum
This commit is contained in:
Kubernetes Prow Robot 2026-04-23 18:46:48 +05:30 committed by GitHub
commit 32137bf43b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 221 additions and 20 deletions

View file

@ -23,6 +23,7 @@ import (
"net"
"net/url"
"path/filepath"
"slices"
"strconv"
"strings"
"time"
@ -36,6 +37,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
@ -101,6 +103,13 @@ type Client struct {
listMembersFunc func(timeout time.Duration) (*clientv3.MemberListResponse, error)
}
type etcdMemberStatus struct {
ep string
status *clientv3.StatusResponse
// err is any error encountered while communicating with the etcd server.
err error
}
// New creates a new EtcdCluster client
func New(endpoints []string, ca, cert, key string) (*Client, error) {
client := Client{Endpoints: endpoints}
@ -535,7 +544,7 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem
if !isLearner {
// Add the new member client address to the list of endpoints
c.Endpoints = append(c.Endpoints, GetClientURLByIP(parsedPeerAddrs.Hostname()))
c.addEndpoint(GetClientURLByIP(parsedPeerAddrs.Hostname()))
}
return ret, nil
@ -617,11 +626,12 @@ func (c *Client) MemberPromote(learnerID uint64) error {
// 2. context deadline exceeded
// 3. peer URLs already exists
// Once the client provides a way to check if the etcd learner is ready to promote, the retry logic can be revisited.
var promoteResp *clientv3.MemberPromoteResponse
err = wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().EtcdAPICall.Duration,
true, func(_ context.Context) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()
_, err = cli.MemberPromote(ctx, learnerID)
promoteResp, err = cli.MemberPromote(ctx, learnerID)
if err == nil {
klog.V(1).Infof("[etcd] The learner was promoted as a voting member: %s", learnerIDUint)
return true, nil
@ -633,18 +643,52 @@ func (c *Client) MemberPromote(learnerID uint64) error {
if err != nil {
return lastError
}
for _, m := range promoteResp.Members {
if m.ID == learnerID {
parsedPeerAddrs, err := url.Parse(m.PeerURLs[0])
if err != nil {
return errors.Wrapf(err, "error parsing peer address %s", m.PeerURLs[0])
}
c.addEndpoint(GetClientURLByIP(parsedPeerAddrs.Hostname()))
break
}
}
return nil
}
func (c *Client) addEndpoint(ep string) {
if slices.Contains(c.Endpoints, ep) {
return
}
c.Endpoints = append(c.Endpoints, ep)
}
// CheckClusterHealth returns nil for status Up or error for status Down
func (c *Client) CheckClusterHealth() error {
_, err := c.getClusterStatus()
_, ok, err := c.getClusterStatus()
if err != nil {
klog.V(1).Infof("[etcd] cluster has quorum: %t; some members are not healthy: %v\n", ok, err)
}
if ok {
return nil
}
return err
}
// getClusterStatus returns nil for status Up (along with endpoint status response map) or error for status Down
func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) {
clusterStatus := make(map[string]*clientv3.StatusResponse)
// getClusterStatus checks the health of the cluster members and returns
// their individual status map, whether cluster quorum is satisfied, and any
// aggregated member errors.
//
// The boolean result is true when a majority of members are healthy
// (healthyCount > totalCount/2).
//
// A member is considered unhealthy if its status request failed or if the
// reported status contains health errors.
func (c *Client) getClusterStatus() (map[string]*etcdMemberStatus, bool, error) {
// Step 1: get the cluster status first
clusterStatus := make(map[string]*etcdMemberStatus)
for _, ep := range c.Endpoints {
// Gets the member status
var lastError error
@ -653,6 +697,7 @@ func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error)
true, func(_ context.Context) (bool, error) {
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
klog.V(5).Infof("Failed to create etcd client with %v: %v", c.Endpoints, err)
lastError = err
return false, nil
}
@ -669,15 +714,33 @@ func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error)
return false, nil
})
if err != nil {
return nil, lastError
clusterStatus[ep] = &etcdMemberStatus{ep: ep, err: lastError}
} else {
clusterStatus[ep] = &etcdMemberStatus{ep: ep, status: resp}
}
clusterStatus[ep] = resp
}
return clusterStatus, nil
// Step 2: evaluate the cluster status
totalCount, healthyCount := len(clusterStatus), 0
var memberErrs []error
for ep, epStatus := range clusterStatus {
if epStatus.err != nil {
memberErrs = append(memberErrs, errors.Wrapf(epStatus.err, "the status of member %s is not available", ep))
continue
}
if len(epStatus.status.Errors) > 0 {
memberErrs = append(memberErrs, errors.Errorf("member %s is not healthy: %s", ep, strings.Join(epStatus.status.Errors, ",")))
continue
}
healthyCount++
}
err := utilerrors.NewAggregate(memberErrs)
return clusterStatus, healthyCount > totalCount/2, err
}
// WaitForClusterAvailable returns true if all endpoints in the cluster are available after retry attempts, an error is returned otherwise
// WaitForClusterAvailable returns true if the etcd cluster is healthy after retry attempts, otherwise returns an error.
func (c *Client) WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error) {
for i := 0; i < retries; i++ {
if i > 0 {
@ -685,17 +748,13 @@ func (c *Client) WaitForClusterAvailable(retries int, retryInterval time.Duratio
time.Sleep(retryInterval)
}
klog.V(2).Infof("[etcd] attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries)
_, err := c.getClusterStatus()
_, ok, err := c.getClusterStatus()
if err != nil {
switch err {
case context.DeadlineExceeded:
klog.V(1).Infof("[etcd] Attempt timed out")
default:
klog.V(1).Infof("[etcd] Attempt failed with error: %v\n", err)
}
continue
klog.V(1).Infof("[etcd] cluster has quorum: %t; some members are not healthy: %v\n", ok, err)
}
if ok {
return true, nil
}
return true, nil
}
return false, errors.New("timeout waiting for etcd cluster to be available")
}

View file

@ -29,6 +29,7 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
@ -823,3 +824,144 @@ func TestGetMemberStatus(t *testing.T) {
})
}
}
type fakeEtcdClientWithStatusResponse struct {
fakeEtcdClient
statusResponses map[string]*clientv3.StatusResponse
statusRequestErrors map[string]error
}
// Status gets the status of the endpoint.
func (f *fakeEtcdClientWithStatusResponse) Status(_ context.Context, ep string) (*clientv3.StatusResponse, error) {
if f.statusRequestErrors != nil {
if _, ok := f.statusRequestErrors[ep]; ok {
return nil, f.statusRequestErrors[ep]
}
}
return f.statusResponses[ep], nil
}
func TestEvaluateClusterStatus(t *testing.T) {
testCases := []struct {
name string
Endpoints []string
newEtcdClient func(endpoints []string) (etcdClient, error)
wantClusterHealthy bool
wantMemberErrors bool
}{
{
name: "all the three members are healthy",
Endpoints: []string{"https://192.168.10.100:2379", "https://192.168.10.200:2379", "https://192.168.10.300:2379"},
newEtcdClient: func(endpoints []string) (etcdClient, error) {
f := &fakeEtcdClientWithStatusResponse{
statusResponses: map[string]*clientv3.StatusResponse{
"https://192.168.10.100:2379": {},
"https://192.168.10.200:2379": {},
"https://192.168.10.300:2379": {},
},
}
return f, nil
},
wantClusterHealthy: true,
wantMemberErrors: false,
},
{
name: "one out of three members has errors",
Endpoints: []string{"https://192.168.10.100:2379", "https://192.168.10.200:2379", "https://192.168.10.300:2379"},
newEtcdClient: func(endpoints []string) (etcdClient, error) {
f := &fakeEtcdClientWithStatusResponse{
statusResponses: map[string]*clientv3.StatusResponse{
"https://192.168.10.100:2379": {},
"https://192.168.10.200:2379": {Errors: []string{"etcdserver: mvcc: database space exceeded"}},
"https://192.168.10.300:2379": {},
},
}
return f, nil
},
wantClusterHealthy: true,
wantMemberErrors: true,
},
{
name: "one out of three members is unreachable",
Endpoints: []string{"https://192.168.10.100:2379", "https://192.168.10.200:2379", "https://192.168.10.300:2379"},
newEtcdClient: func(endpoints []string) (etcdClient, error) {
f := &fakeEtcdClientWithStatusResponse{
statusResponses: map[string]*clientv3.StatusResponse{
"https://192.168.10.100:2379": {},
"https://192.168.10.200:2379": {},
"https://192.168.10.300:2379": {},
},
statusRequestErrors: map[string]error{
"https://192.168.10.200:2379": errors.New("context deadline exceeded"),
},
}
return f, nil
},
wantClusterHealthy: true,
wantMemberErrors: true,
},
{
name: "two out of three members has errors",
Endpoints: []string{"https://192.168.10.100:2379", "https://192.168.10.200:2379", "https://192.168.10.300:2379"},
newEtcdClient: func(endpoints []string) (etcdClient, error) {
f := &fakeEtcdClientWithStatusResponse{
statusResponses: map[string]*clientv3.StatusResponse{
"https://192.168.10.100:2379": {},
"https://192.168.10.200:2379": {Errors: []string{"etcdserver: mvcc: database space exceeded"}},
"https://192.168.10.300:2379": {Errors: []string{"etcdserver: mvcc: data corrupted"}},
},
}
return f, nil
},
wantClusterHealthy: false,
wantMemberErrors: true,
},
{
name: "two out of three members are unreachable",
Endpoints: []string{"https://192.168.10.100:2379", "https://192.168.10.200:2379", "https://192.168.10.300:2379"},
newEtcdClient: func(endpoints []string) (etcdClient, error) {
f := &fakeEtcdClientWithStatusResponse{
statusResponses: map[string]*clientv3.StatusResponse{
"https://192.168.10.100:2379": {},
"https://192.168.10.200:2379": {},
"https://192.168.10.300:2379": {},
},
statusRequestErrors: map[string]error{
"https://192.168.10.200:2379": errors.New("context deadline exceeded"),
"https://192.168.10.300:2379": errors.New("context deadline exceeded"),
},
}
return f, nil
},
wantClusterHealthy: false,
wantMemberErrors: true,
},
}
// Temporarily reduce the etcd API call timeout from 2 minutes to 1 second.
oldActiveTimeout := kubeadmapi.GetActiveTimeouts()
newActiveTimeout := oldActiveTimeout.DeepCopy()
newActiveTimeout.EtcdAPICall = &metav1.Duration{Duration: 1 * time.Second}
kubeadmapi.SetActiveTimeouts(newActiveTimeout)
defer func() {
kubeadmapi.SetActiveTimeouts(oldActiveTimeout)
}()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := &Client{
Endpoints: tc.Endpoints,
newEtcdClient: tc.newEtcdClient,
}
_, gotClusterHealthy, err := c.getClusterStatus()
if gotClusterHealthy != tc.wantClusterHealthy {
t.Errorf("gotClusterHealthy = %t, want = %t", gotClusterHealthy, tc.wantClusterHealthy)
}
if tc.wantMemberErrors != (err != nil) {
t.Errorf("gotMemberErrors = %v, wantMemberErrors = %t", err, tc.wantMemberErrors)
}
})
}
}