Mirror of https://github.com/hashicorp/vault.git (synced 2026-02-03 20:40:45 -05:00)
VAULT-31755: Add removed and HA health to the sys/health endpoint (#28991)
* logic
* actually got test working
* heartbeat health test
* fix healthy definition and add changelog
* fix test condition
* actually fix test condition
* Update vault/testing.go (Co-authored-by: Kuba Wieczorek <kuba.wieczorek@hashicorp.com>)
* close body

Co-authored-by: Kuba Wieczorek <kuba.wieczorek@hashicorp.com>
parent 826d2be5b3
commit 73bf3ebc7c

12 changed files with 360 additions and 31 deletions
@@ -25,6 +25,8 @@ func (c *Sys) HealthWithContext(ctx context.Context) (*HealthResponse, error) {
```go
	r.Params.Add("standbycode", "299")
	r.Params.Add("drsecondarycode", "299")
	r.Params.Add("performancestandbycode", "299")
	r.Params.Add("removedcode", "299")
	r.Params.Add("haunhealthycode", "299")

	resp, err := c.c.rawRequestWithContext(ctx, r)
	if err != nil {
```

@@ -38,19 +40,22 @@ func (c *Sys) HealthWithContext(ctx context.Context) (*HealthResponse, error) {
```go
}

type HealthResponse struct {
	Initialized bool `json:"initialized"`
	Sealed bool `json:"sealed"`
	Standby bool `json:"standby"`
	PerformanceStandby bool `json:"performance_standby"`
	ReplicationPerformanceMode string `json:"replication_performance_mode"`
	ReplicationDRMode string `json:"replication_dr_mode"`
	ServerTimeUTC int64 `json:"server_time_utc"`
	Version string `json:"version"`
	ClusterName string `json:"cluster_name,omitempty"`
	ClusterID string `json:"cluster_id,omitempty"`
	LastWAL uint64 `json:"last_wal,omitempty"`
	Enterprise bool `json:"enterprise"`
	EchoDurationMillis int64 `json:"echo_duration_ms"`
	ClockSkewMillis int64 `json:"clock_skew_ms"`
	ReplicationPrimaryCanaryAgeMillis int64 `json:"replication_primary_canary_age_ms"`
	RemovedFromCluster *bool `json:"removed_from_cluster,omitempty"`
	HAConnectionHealthy *bool `json:"ha_connection_healthy,omitempty"`
	LastRequestForwardingHeartbeatMillis int64 `json:"last_request_forwarding_heartbeat_ms,omitempty"`
}
```
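For orientation, here is a minimal sketch (not part of the commit) of how a client could surface the new fields through the Go api package. The address and token come from the usual environment handling in api.DefaultConfig; the printing is purely illustrative.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/vault/api"
)

func main() {
	// Assumes VAULT_ADDR and VAULT_TOKEN are set in the environment.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Sys().Health() sets the query parameters shown above, so non-active
	// states (standby, removed, HA-unhealthy, ...) still decode into the struct.
	health, err := client.Sys().Health()
	if err != nil {
		log.Fatal(err)
	}

	// The new fields are pointers, so "not reported" is distinguishable from false.
	if health.RemovedFromCluster != nil {
		fmt.Println("removed from cluster:", *health.RemovedFromCluster)
	}
	if health.HAConnectionHealthy != nil {
		fmt.Printf("HA healthy: %v (last heartbeat %d ms ago)\n",
			*health.HAConnectionHealthy, health.LastRequestForwardingHeartbeatMillis)
	}
}
```

Because all three new fields are pointers or `omitempty`, servers that do not report them simply leave them nil or zero.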
changelog/28991.txt (new file, 6 lines)
@@ -0,0 +1,6 @@
```release-note:change
api: Add to sys/health whether the node has been removed from the HA cluster. If the node has been removed, return code 530 by default or the value of the `removedcode` query parameter.
```
```release-note:change
api: Add to sys/health whether the standby node has been able to successfully send heartbeats to the active node and the time in milliseconds since the last heartbeat. If the standby has been unable to send a heartbeat, return code 474 by default or the value of the `haunhealthycode` query parameter.
```
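To make the status-code mapping in these notes concrete, here is a small, hedged sketch that probes the endpoint over plain HTTP. The listener address is a placeholder; the 530/474 defaults and the removedcode/haunhealthycode overrides are the ones described above.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Placeholder address; point this at a real Vault listener.
	addr := "http://127.0.0.1:8200"

	// Without overrides, a removed node answers 530 and an HA-unhealthy
	// standby answers 474 (per the release notes above).
	resp, err := http.Get(addr + "/v1/sys/health?standbyok=true&perfstandbyok=true")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case 530:
		fmt.Println("node has been removed from the HA cluster")
	case 474:
		fmt.Println("standby cannot heartbeat to the active node")
	default:
		fmt.Println("status:", resp.StatusCode)
	}

	// The defaults can be remapped, e.g. ?removedcode=299&haunhealthycode=299,
	// if a load balancer only distinguishes 2xx from non-2xx responses.
}
```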
@@ -158,6 +158,20 @@ func getSysHealth(core *vault.Core, r *http.Request) (int, *HealthResponse, error) {
```go
		perfStandbyCode = code
	}

	haUnhealthyCode := 474
	if code, found, ok := fetchStatusCode(r, "haunhealthycode"); !ok {
		return http.StatusBadRequest, nil, nil
	} else if found {
		haUnhealthyCode = code
	}

	removedCode := 530
	if code, found, ok := fetchStatusCode(r, "removedcode"); !ok {
		return http.StatusBadRequest, nil, nil
	} else if found {
		removedCode = code
	}

	ctx := context.Background()

	// Check system status
```
@@ -175,13 +189,21 @@ func getSysHealth(core *vault.Core, r *http.Request) (int, *HealthResponse, error) {
```go
		return http.StatusInternalServerError, nil, err
	}

	removed, shouldIncludeRemoved := core.IsRemovedFromCluster()

	haHealthy, lastHeartbeat := core.GetHAHeartbeatHealth()

	// Determine the status code
	code := activeCode
	switch {
	case !init:
		code = uninitCode
	case removed:
		code = removedCode
	case sealed:
		code = sealedCode
	case !haHealthy && lastHeartbeat != nil:
		code = haUnhealthyCode
	case replicationState.HasState(consts.ReplicationDRSecondary):
		code = drSecondaryCode
	case perfStandby:
```
@@ -233,6 +255,15 @@ func getSysHealth(core *vault.Core, r *http.Request) (int, *HealthResponse, error) {
```go
		return http.StatusInternalServerError, nil, err
	}

	if shouldIncludeRemoved {
		body.RemovedFromCluster = &removed
	}

	if lastHeartbeat != nil {
		body.LastRequestForwardingHeartbeatMillis = lastHeartbeat.Milliseconds()
		body.HAConnectionHealthy = &haHealthy
	}

	if licenseState != nil {
		body.License = &HealthResponseLicense{
			State: licenseState.State,
```
@@ -257,20 +288,23 @@ type HealthResponseLicense struct {
```go
}

type HealthResponse struct {
	Initialized bool `json:"initialized"`
	Sealed bool `json:"sealed"`
	Standby bool `json:"standby"`
	PerformanceStandby bool `json:"performance_standby"`
	ReplicationPerformanceMode string `json:"replication_performance_mode"`
	ReplicationDRMode string `json:"replication_dr_mode"`
	ServerTimeUTC int64 `json:"server_time_utc"`
	Version string `json:"version"`
	Enterprise bool `json:"enterprise"`
	ClusterName string `json:"cluster_name,omitempty"`
	ClusterID string `json:"cluster_id,omitempty"`
	LastWAL uint64 `json:"last_wal,omitempty"`
	License *HealthResponseLicense `json:"license,omitempty"`
	EchoDurationMillis int64 `json:"echo_duration_ms"`
	ClockSkewMillis int64 `json:"clock_skew_ms"`
	ReplicationPrimaryCanaryAgeMillis int64 `json:"replication_primary_canary_age_ms"`
	RemovedFromCluster *bool `json:"removed_from_cluster,omitempty"`
	HAConnectionHealthy *bool `json:"ha_connection_healthy,omitempty"`
	LastRequestForwardingHeartbeatMillis int64 `json:"last_request_forwarding_heartbeat_ms,omitempty"`
}
```
@@ -15,6 +15,7 @@ import (
```go
	"github.com/hashicorp/vault/helper/constants"
	"github.com/hashicorp/vault/sdk/helper/consts"
	"github.com/hashicorp/vault/vault"
	"github.com/stretchr/testify/require"
)

func TestSysHealth_get(t *testing.T) {
```
@@ -215,3 +216,29 @@ func TestSysHealth_head(t *testing.T) {
```go
		}
	}
}

// TestSysHealth_Removed checks that a removed node returns a 530 and sets
// removed from cluster to be true. The test also checks that the removedcode
// query parameter is respected.
func TestSysHealth_Removed(t *testing.T) {
	core, err := vault.TestCoreWithMockRemovableNodeHABackend(t, true)
	require.NoError(t, err)
	vault.TestCoreInit(t, core)
	ln, addr := TestServer(t, core)
	defer ln.Close()
	raw, err := http.Get(addr + "/v1/sys/health")
	require.NoError(t, err)
	testResponseStatus(t, raw, 530)
	healthResp := HealthResponse{}
	testResponseBody(t, raw, &healthResp)
	require.NotNil(t, healthResp.RemovedFromCluster)
	require.True(t, *healthResp.RemovedFromCluster)

	raw, err = http.Get(addr + "/v1/sys/health?removedcode=299")
	require.NoError(t, err)
	testResponseStatus(t, raw, 299)
	secondHealthResp := HealthResponse{}
	testResponseBody(t, raw, &secondHealthResp)
	require.NotNil(t, secondHealthResp.RemovedFromCluster)
	require.True(t, *secondHealthResp.RemovedFromCluster)
}
```
@@ -116,6 +116,24 @@ func (l *InmemLayer) Listeners() []NetworkListener {
```go
	return []NetworkListener{l.listener}
}

// Partition forces the inmem layer to disconnect itself from peers and prevents
// creating new connections. The returned function will add all peers back
// and re-enable connections.
func (l *InmemLayer) Partition() (unpartition func()) {
	l.l.Lock()
	peersCopy := make([]*InmemLayer, 0, len(l.peers))
	for _, peer := range l.peers {
		peersCopy = append(peersCopy, peer)
	}
	l.l.Unlock()
	l.DisconnectAll()
	return func() {
		for _, peer := range peersCopy {
			l.Connect(peer)
		}
	}
}

// Dial implements NetworkLayer.
func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Config) (*tls.Conn, error) {
	l.l.Lock()
```
@@ -529,6 +529,8 @@ type Core struct {
```go
	rpcClientConn *grpc.ClientConn
	// The grpc forwarding client
	rpcForwardingClient *forwardingClient
	// The time of the last successful request forwarding heartbeat
	rpcLastSuccessfulHeartbeat *atomic.Value
	// The UUID used to hold the leader lock. Only set on active node
	leaderUUID string
```
@@ -1092,6 +1094,7 @@ func CreateCore(conf *CoreConfig) (*Core, error) {
```go
		echoDuration: uberAtomic.NewDuration(0),
		activeNodeClockSkewMillis: uberAtomic.NewInt64(0),
		periodicLeaderRefreshInterval: conf.PeriodicLeaderRefreshInterval,
		rpcLastSuccessfulHeartbeat: new(atomic.Value),
	}

	c.standbyStopCh.Store(make(chan struct{}))
```
@@ -31,8 +31,10 @@ import (
```go
	vaulthttp "github.com/hashicorp/vault/http"
	"github.com/hashicorp/vault/internalshared/configutil"
	"github.com/hashicorp/vault/physical/raft"
	"github.com/hashicorp/vault/sdk/helper/jsonutil"
	"github.com/hashicorp/vault/sdk/logical"
	"github.com/hashicorp/vault/vault"
	"github.com/hashicorp/vault/vault/cluster"
	vaultseal "github.com/hashicorp/vault/vault/seal"
	"github.com/stretchr/testify/require"
	"golang.org/x/net/http2"
```
@@ -1414,3 +1416,126 @@ func TestRaftCluster_Removed_RaftConfig(t *testing.T) {
```go
	})
	require.Eventually(t, follower.Sealed, 10*time.Second, 500*time.Millisecond)
}

// TestSysHealth_Raft creates a raft cluster and verifies that the health status
// is OK for a healthy follower. The test partitions one of the nodes so that it
// can't send request forwarding RPCs. The test verifies that the status
// endpoint shows that HA isn't healthy. Finally, the test removes the
// partitioned follower and unpartitions it. The follower will learn that it has
// been removed, and should return the removed status.
func TestSysHealth_Raft(t *testing.T) {
	parseHealthBody := func(t *testing.T, resp *api.Response) *vaulthttp.HealthResponse {
		t.Helper()
		health := vaulthttp.HealthResponse{}
		defer resp.Body.Close()
		require.NoError(t, jsonutil.DecodeJSONFromReader(resp.Body, &health))
		return &health
	}

	opts := &vault.TestClusterOptions{
		HandlerFunc: vaulthttp.Handler,
		NumCores: 3,
		InmemClusterLayers: true,
	}
	heartbeat := 500 * time.Millisecond
	teststorage.RaftBackendSetup(nil, opts)
	conf := &vault.CoreConfig{
		ClusterHeartbeatInterval: heartbeat,
	}
	vaultCluster := vault.NewTestCluster(t, conf, opts)
	defer vaultCluster.Cleanup()
	testhelpers.WaitForActiveNodeAndStandbys(t, vaultCluster)
	followerClient := vaultCluster.Cores[1].Client

	t.Run("healthy", func(t *testing.T) {
		resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
			"perfstandbyok": {"true"},
			"standbyok": {"true"},
		})
		require.NoError(t, err)
		require.Equal(t, resp.StatusCode, 200)
		r := parseHealthBody(t, resp)
		require.False(t, *r.RemovedFromCluster)
		require.True(t, *r.HAConnectionHealthy)
		require.Less(t, r.LastRequestForwardingHeartbeatMillis, 2*heartbeat.Milliseconds())
	})
	nl := vaultCluster.Cores[1].NetworkLayer()
	inmem, ok := nl.(*cluster.InmemLayer)
	require.True(t, ok)
	unpartition := inmem.Partition()

	t.Run("partition", func(t *testing.T) {
		time.Sleep(2 * heartbeat)
		var erroredResponse *api.Response
		// the node isn't able to send/receive heartbeats, so it will have
		// haunhealthy status.
		testhelpers.RetryUntil(t, 3*time.Second, func() error {
			resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
				"perfstandbyok": {"true"},
				"standbyok": {"true"},
			})
			if err == nil {
				if resp != nil && resp.Body != nil {
					resp.Body.Close()
				}
				return errors.New("expected error")
			}
			if resp.StatusCode != 474 {
				resp.Body.Close()
				return fmt.Errorf("status code %d", resp.StatusCode)
			}
			erroredResponse = resp
			return nil
		})
		r := parseHealthBody(t, erroredResponse)
		require.False(t, *r.RemovedFromCluster)
		require.False(t, *r.HAConnectionHealthy)
		require.Greater(t, r.LastRequestForwardingHeartbeatMillis, 2*heartbeat.Milliseconds())

		// ensure haunhealthycode is respected
		resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
			"perfstandbyok": {"true"},
			"standbyok": {"true"},
			"haunhealthycode": {"299"},
		})
		require.NoError(t, err)
		require.Equal(t, 299, resp.StatusCode)
		resp.Body.Close()
	})

	t.Run("remove and unpartition", func(t *testing.T) {
		leaderClient := vaultCluster.Cores[0].Client
		_, err := leaderClient.Logical().Write("sys/storage/raft/remove-peer", map[string]interface{}{
			"server_id": vaultCluster.Cores[1].NodeID,
		})
		require.NoError(t, err)
		unpartition()

		var erroredResponse *api.Response

		// now that the node can connect again, it will start getting the removed
		// error when trying to connect. The code should be removed, and the ha
		// connection will be nil because there is no ha connection
		testhelpers.RetryUntil(t, 10*time.Second, func() error {
			resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
				"perfstandbyok": {"true"},
				"standbyok": {"true"},
			})
			if err == nil {
				if resp != nil && resp.Body != nil {
					resp.Body.Close()
				}
				return fmt.Errorf("expected error")
			}
			if resp.StatusCode != 530 {
				resp.Body.Close()
				return fmt.Errorf("status code %d", resp.StatusCode)
			}
			erroredResponse = resp
			return nil
		})
		r := parseHealthBody(t, erroredResponse)
		require.True(t, *r.RemovedFromCluster)
		require.Nil(t, r.HAConnectionHealthy)
	})
}
```
vault/ha.go (23 lines changed)

@@ -46,6 +46,8 @@ const (
```go
	// leaderPrefixCleanDelay is how long to wait between deletions
	// of orphaned leader keys, to prevent slamming the backend.
	leaderPrefixCleanDelay = 200 * time.Millisecond

	haAllowedMissedHeartbeats = 2
)

func init() {
```

@@ -1236,3 +1238,24 @@ func (c *Core) getRemovableHABackend() physical.RemovableNodeHABackend {
```go

	return haBackend
}

// GetHAHeartbeatHealth reports whether the node's last successful request
// forwarding heartbeat happened within the allowed number of heartbeat
// intervals (haAllowedMissedHeartbeats). If the node's request forwarding
// clients were cleared (due to the node being sealed or finding a new leader),
// or the node is uninitialized, healthy will be false and sinceLastHeartbeat
// will be nil.
func (c *Core) GetHAHeartbeatHealth() (healthy bool, sinceLastHeartbeat *time.Duration) {
	heartbeat := c.rpcLastSuccessfulHeartbeat.Load()
	if heartbeat == nil {
		return false, nil
	}
	lastHeartbeat := heartbeat.(time.Time)
	if lastHeartbeat.IsZero() {
		return false, nil
	}
	diff := time.Now().Sub(lastHeartbeat)
	heartbeatInterval := c.clusterHeartbeatInterval
	if heartbeatInterval <= 0 {
		heartbeatInterval = 5 * time.Second
	}
	return diff < heartbeatInterval*haAllowedMissedHeartbeats, &diff
}
```
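As a worked example of the threshold (not part of the change): with the 5s fallback interval shown above and haAllowedMissedHeartbeats = 2, a standby whose last successful heartbeat is more than 10 seconds old reports unhealthy. The hypothetical helper below sketches how a caller such as getSysHealth interprets the returned pair; the package and function names are illustrative only.

```go
package healthexample

import (
	"fmt"

	"github.com/hashicorp/vault/vault"
)

// describeHAHealth is a hypothetical helper showing how the pair returned by
// GetHAHeartbeatHealth maps onto the sys/health behavior in this change.
func describeHAHealth(c *vault.Core) string {
	healthy, since := c.GetHAHeartbeatHealth()
	switch {
	case since == nil:
		// No heartbeat recorded (never forwarded, sealed, or leader changed):
		// getSysHealth omits ha_connection_healthy from the response entirely.
		return "no HA heartbeat recorded"
	case !healthy:
		// Older than haAllowedMissedHeartbeats * clusterHeartbeatInterval:
		// getSysHealth answers with haunhealthycode, 474 by default.
		return fmt.Sprintf("HA unhealthy, last heartbeat %dms ago", since.Milliseconds())
	default:
		return fmt.Sprintf("HA healthy, last heartbeat %dms ago", since.Milliseconds())
	}
}
```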
@@ -10,6 +10,8 @@ import (
```go
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// TestGrabLockOrStop is a non-deterministic test to detect deadlocks in the
```
@@ -85,3 +87,76 @@ func TestGrabLockOrStop(t *testing.T) {
```go
	}
	workerWg.Wait()
}

// TestGetHAHeartbeatHealth checks that heartbeat health is correctly determined
// for a variety of scenarios
func TestGetHAHeartbeatHealth(t *testing.T) {
	now := time.Now().UTC()
	oldLastHeartbeat := now.Add(-1 * time.Hour)
	futureHeartbeat := now.Add(10 * time.Second)
	zeroHeartbeat := time.Time{}
	testCases := []struct {
		name string
		lastHeartbeat *time.Time
		heartbeatInterval time.Duration
		wantHealthy bool
	}{
		{
			name: "old heartbeat",
			lastHeartbeat: &oldLastHeartbeat,
			heartbeatInterval: 5 * time.Second,
			wantHealthy: false,
		},
		{
			name: "no heartbeat",
			lastHeartbeat: nil,
			heartbeatInterval: 5 * time.Second,
			wantHealthy: false,
		},
		{
			name: "recent heartbeat",
			lastHeartbeat: &now,
			heartbeatInterval: 20 * time.Second,
			wantHealthy: true,
		},
		{
			name: "recent heartbeat, empty interval",
			lastHeartbeat: &futureHeartbeat,
			heartbeatInterval: 0,
			wantHealthy: true,
		},
		{
			name: "old heartbeat, empty interval",
			lastHeartbeat: &oldLastHeartbeat,
			heartbeatInterval: 0,
			wantHealthy: false,
		},
		{
			name: "zero value heartbeat",
			lastHeartbeat: &zeroHeartbeat,
			heartbeatInterval: 5 * time.Second,
			wantHealthy: false,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			v := new(atomic.Value)
			if tc.lastHeartbeat != nil {
				v.Store(*tc.lastHeartbeat)
			}
			c := &Core{
				rpcLastSuccessfulHeartbeat: v,
				clusterHeartbeatInterval: tc.heartbeatInterval,
			}

			now := time.Now()
			gotHealthy, gotLastHeartbeat := c.GetHAHeartbeatHealth()
			require.Equal(t, tc.wantHealthy, gotHealthy)
			if tc.lastHeartbeat != nil && !tc.lastHeartbeat.IsZero() {
				require.InDelta(t, now.Sub(*tc.lastHeartbeat).Milliseconds(), gotLastHeartbeat.Milliseconds(), float64(3*time.Second.Milliseconds()))
			} else {
				require.Nil(t, gotLastHeartbeat)
			}
		})
	}
}
```
@@ -442,6 +442,7 @@ func (c *Core) clearForwardingClients() {
```go
		clusterListener.RemoveClient(consts.RequestForwardingALPN)
	}
	c.clusterLeaderParams.Store((*ClusterLeaderParams)(nil))
	c.rpcLastSuccessfulHeartbeat.Store(time.Time{})
}

// ForwardRequest forwards a given request to the active node and returns the
```
@@ -201,6 +201,7 @@ func (c *forwardingClient) startHeartbeat() {
```go
			c.core.logger.Debug("forwarding: error sending echo request to active node", "error", err)
			return
		}
		c.core.rpcLastSuccessfulHeartbeat.Store(now)
		if resp == nil {
			c.core.logger.Debug("forwarding: empty echo response from active node")
			return
```

@@ -214,6 +215,9 @@ func (c *forwardingClient) startHeartbeat() {
```go
			atomic.StoreUint32(c.core.activeNodeReplicationState, resp.ReplicationState)
		}
	}

	// store a value before the first tick to indicate that we've started
	// sending heartbeats
	c.core.rpcLastSuccessfulHeartbeat.Store(time.Now())
	tick()

	for {
```
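Together with clearForwardingClients above, the heartbeat timestamp moves through three states that GetHAHeartbeatHealth distinguishes: never stored, explicitly zeroed, and recently updated. A standalone sketch of the same atomic.Value pattern, illustrative only and not code from the commit:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var last atomic.Value // holds time.Time, like Core.rpcLastSuccessfulHeartbeat

	// 1. Never stored: Load returns nil (the node never started forwarding heartbeats).
	fmt.Println(last.Load() == nil) // true

	// 2. Cleared: clearForwardingClients stores the zero time.Time,
	//    which GetHAHeartbeatHealth treats the same as "no heartbeat".
	last.Store(time.Time{})
	fmt.Println(last.Load().(time.Time).IsZero()) // true

	// 3. Heartbeating: startHeartbeat stores time.Now() before the first tick
	//    and after every successful echo; health is judged against this value.
	last.Store(time.Now())
	fmt.Println(time.Since(last.Load().(time.Time)) < 10*time.Second) // true
}
```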
@@ -977,6 +977,14 @@ func (c *TestClusterCore) ClusterListener() *cluster.Listener {
```go
	return c.getClusterListener()
}

// NetworkLayer returns the network layer for the cluster core. This can be used
// in conjunction with the cluster.InmemLayer to disconnect specific nodes from
// the cluster when we need to simulate abrupt node failure or a network
// partition in NewTestCluster tests.
func (c *TestClusterCore) NetworkLayer() cluster.NetworkLayer {
	return c.Core.clusterNetworkLayer
}

func (c *TestCluster) Cleanup() {
	c.Logger.Info("cleaning up vault cluster")
	if tl, ok := c.Logger.(*corehelpers.TestLogger); ok {
```