Raft config refactor for mount entry size limit (#25992)

* CE parts for mount-namespace entry limit

* Remove redundant code from refactor

* Add doc comment note about ent-only use of interface

* Add CHANGELOG
This commit is contained in:
Paul Banks 2024-03-19 17:28:23 +00:00 committed by GitHub
parent 012c3422f8
commit 3a2a922b26
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 851 additions and 504 deletions

7
changelog/25992.txt Normal file
View file

@ -0,0 +1,7 @@
```release-note:improvement
storage/raft (enterprise): add support for separate entry size limit for mount
and namespace table paths in storage to allow increased mount table size without
allowing other user storage entries to become larger.
```

320
physical/raft/config.go Normal file
View file

@ -0,0 +1,320 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package raft
import (
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"time"
bolt "github.com/hashicorp-forge/bbolt"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/parseutil"
"github.com/hashicorp/go-uuid"
goversion "github.com/hashicorp/go-version"
autopilot "github.com/hashicorp/raft-autopilot"
etcdbolt "go.etcd.io/bbolt"
)
type RaftBackendConfig struct {
Path string
NodeId string
ApplyDelay time.Duration
RaftWal bool
RaftLogVerifierEnabled bool
RaftLogVerificationInterval time.Duration
SnapshotDelay time.Duration
MaxEntrySize uint64
MaxBatchEntries int
MaxBatchSize int
AutopilotReconcileInterval time.Duration
AutopilotUpdateInterval time.Duration
RetryJoin string
// Enterprise only
RaftNonVoter bool
MaxMountAndNamespaceTableEntrySize uint64
AutopilotUpgradeVersion string
AutopilotRedundancyZone string
}
func parseRaftBackendConfig(conf map[string]string, logger log.Logger) (*RaftBackendConfig, error) {
c := &RaftBackendConfig{}
c.Path = conf["path"]
envPath := os.Getenv(EnvVaultRaftPath)
if envPath != "" {
c.Path = envPath
}
if c.Path == "" {
return nil, fmt.Errorf("'path' must be set")
}
c.NodeId = conf["node_id"]
envNodeId := os.Getenv(EnvVaultRaftNodeID)
if envNodeId != "" {
c.NodeId = envNodeId
}
if c.NodeId == "" {
localIDRaw, err := os.ReadFile(filepath.Join(c.Path, "node-id"))
if err == nil && len(localIDRaw) > 0 {
c.NodeId = string(localIDRaw)
}
if err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, err
}
}
if c.NodeId == "" {
id, err := uuid.GenerateUUID()
if err != nil {
return nil, err
}
if err = os.WriteFile(filepath.Join(c.Path, "node-id"), []byte(id), 0o600); err != nil {
return nil, err
}
c.NodeId = id
}
if delayRaw, ok := conf["apply_delay"]; ok {
delay, err := parseutil.ParseDurationSecond(delayRaw)
if err != nil {
return nil, fmt.Errorf("apply_delay does not parse as a duration: %w", err)
}
c.ApplyDelay = delay
}
if walRaw, ok := conf["raft_wal"]; ok {
useRaftWal, err := strconv.ParseBool(walRaw)
if err != nil {
return nil, fmt.Errorf("raft_wal does not parse as a boolean: %w", err)
}
c.RaftWal = useRaftWal
}
if rlveRaw, ok := conf["raft_log_verifier_enabled"]; ok {
rlve, err := strconv.ParseBool(rlveRaw)
if err != nil {
return nil, fmt.Errorf("raft_log_verifier_enabled does not parse as a boolean: %w", err)
}
c.RaftLogVerifierEnabled = rlve
c.RaftLogVerificationInterval = defaultRaftLogVerificationInterval
if rlviRaw, ok := conf["raft_log_verification_interval"]; ok {
rlvi, err := parseutil.ParseDurationSecond(rlviRaw)
if err != nil {
return nil, fmt.Errorf("raft_log_verification_interval does not parse as a duration: %w", err)
}
// Make sure our interval is capped to a reasonable value, so e.g. people don't use 0s or 1s
if rlvi >= minimumRaftLogVerificationInterval {
c.RaftLogVerificationInterval = rlvi
} else {
logger.Warn("raft_log_verification_interval is less than the minimum allowed, using default instead",
"given", rlveRaw,
"minimum", minimumRaftLogVerificationInterval,
"default", defaultRaftLogVerificationInterval)
}
}
}
if delayRaw, ok := conf["snapshot_delay"]; ok {
delay, err := parseutil.ParseDurationSecond(delayRaw)
if err != nil {
return nil, fmt.Errorf("snapshot_delay does not parse as a duration: %w", err)
}
c.SnapshotDelay = delay
}
c.MaxEntrySize = defaultMaxEntrySize
if maxEntrySizeCfg := conf["max_entry_size"]; len(maxEntrySizeCfg) != 0 {
i, err := strconv.Atoi(maxEntrySizeCfg)
if err != nil {
return nil, fmt.Errorf("failed to parse 'max_entry_size': %w", err)
}
c.MaxEntrySize = uint64(i)
}
c.MaxMountAndNamespaceTableEntrySize = c.MaxEntrySize
if maxMNTEntrySize := conf["max_mount_and_namespace_table_entry_size"]; len(maxMNTEntrySize) != 0 {
i, err := strconv.Atoi(maxMNTEntrySize)
if err != nil {
return nil, fmt.Errorf("failed to parse 'max_mount_and_namespace_table_entry_size': %w", err)
}
if i < 1024 {
return nil, fmt.Errorf("'max_mount_and_namespace_table_entry_size' must be at least 1024 bytes")
}
if i > 10_485_760 {
return nil, fmt.Errorf("'max_mount_and_namespace_table_entry_size' must be at most 10,485,760 bytes (10MiB)")
}
c.MaxMountAndNamespaceTableEntrySize = uint64(i)
emitEntWarning(logger, "max_mount_and_namespace_table_entry_size")
}
c.MaxBatchEntries, c.MaxBatchSize = batchLimitsFromEnv(logger)
if interval := conf["autopilot_reconcile_interval"]; interval != "" {
interval, err := parseutil.ParseDurationSecond(interval)
if err != nil {
return nil, fmt.Errorf("autopilot_reconcile_interval does not parse as a duration: %w", err)
}
c.AutopilotReconcileInterval = interval
}
if interval := conf["autopilot_update_interval"]; interval != "" {
interval, err := parseutil.ParseDurationSecond(interval)
if err != nil {
return nil, fmt.Errorf("autopilot_update_interval does not parse as a duration: %w", err)
}
c.AutopilotUpdateInterval = interval
}
effectiveReconcileInterval := autopilot.DefaultReconcileInterval
effectiveUpdateInterval := autopilot.DefaultUpdateInterval
if c.AutopilotReconcileInterval != 0 {
effectiveReconcileInterval = c.AutopilotReconcileInterval
}
if c.AutopilotUpdateInterval != 0 {
effectiveUpdateInterval = c.AutopilotUpdateInterval
}
if effectiveReconcileInterval < effectiveUpdateInterval {
return nil, fmt.Errorf("autopilot_reconcile_interval (%v) should be larger than autopilot_update_interval (%v)", effectiveReconcileInterval, effectiveUpdateInterval)
}
if uv, ok := conf["autopilot_upgrade_version"]; ok && uv != "" {
_, err := goversion.NewVersion(uv)
if err != nil {
return nil, fmt.Errorf("autopilot_upgrade_version does not parse as a semantic version: %w", err)
}
c.AutopilotUpgradeVersion = uv
}
if c.AutopilotUpgradeVersion != "" {
emitEntWarning(logger, "autopilot_upgrade_version")
}
// Note: historically we've never parsed retry_join here because we have to
// wait until we have leader TLS info before we can work out the final retry
// join parameters. That happens in JoinConfig. So right now nothing uses
// c.RetryJoin because it's not available at that point. But I think it's less
// surprising that if the field is present in the returned struct, that it
// should actually be populated and makes tests of this function less confusing
// too.
c.RetryJoin = conf["retry_join"]
c.RaftNonVoter = false
if v := os.Getenv(EnvVaultRaftNonVoter); v != "" {
// Consistent with handling of other raft boolean env vars
// VAULT_RAFT_AUTOPILOT_DISABLE and VAULT_RAFT_FREELIST_SYNC
c.RaftNonVoter = true
} else if v, ok := conf[raftNonVoterConfigKey]; ok {
nonVoter, err := strconv.ParseBool(v)
if err != nil {
return nil, fmt.Errorf("failed to parse %s config value %q as a boolean: %w", raftNonVoterConfigKey, v, err)
}
c.RaftNonVoter = nonVoter
}
if c.RaftNonVoter && c.RetryJoin == "" {
return nil, fmt.Errorf("setting %s to true is only valid if at least one retry_join stanza is specified", raftNonVoterConfigKey)
}
if c.RaftNonVoter {
emitEntWarning(logger, raftNonVoterConfigKey)
}
c.AutopilotRedundancyZone = conf["autopilot_redundancy_zone"]
if c.AutopilotRedundancyZone == "" {
emitEntWarning(logger, "autopilot_redundancy_zone")
}
return c, nil
}
// boltOptions returns a bolt.Options struct, suitable for passing to
// bolt.Open(), pre-configured with all of our preferred defaults.
func boltOptions(path string) *bolt.Options {
o := &bolt.Options{
Timeout: 1 * time.Second,
FreelistType: bolt.FreelistMapType,
NoFreelistSync: true,
MmapFlags: getMmapFlags(path),
}
if os.Getenv("VAULT_RAFT_FREELIST_TYPE") == "array" {
o.FreelistType = bolt.FreelistArrayType
}
if os.Getenv("VAULT_RAFT_FREELIST_SYNC") != "" {
o.NoFreelistSync = false
}
// By default, we want to set InitialMmapSize to 100GB, but only on 64bit platforms.
// Otherwise, we set it to whatever the value of VAULT_RAFT_INITIAL_MMAP_SIZE
// is, assuming it can be parsed as an int. Bolt itself sets this to 0 by default,
// so if users are wanting to turn this off, they can also set it to 0. Setting it
// to a negative value is the same as not setting it at all.
if os.Getenv("VAULT_RAFT_INITIAL_MMAP_SIZE") == "" {
o.InitialMmapSize = initialMmapSize
} else {
imms, err := strconv.Atoi(os.Getenv("VAULT_RAFT_INITIAL_MMAP_SIZE"))
// If there's an error here, it means they passed something that's not convertible to
// a number. Rather than fail startup, just ignore it.
if err == nil && imms > 0 {
o.InitialMmapSize = imms
}
}
return o
}
func etcdboltOptions(path string) *etcdbolt.Options {
o := &etcdbolt.Options{
Timeout: 1 * time.Second,
FreelistType: etcdbolt.FreelistMapType,
NoFreelistSync: true,
MmapFlags: getMmapFlags(path),
}
if os.Getenv("VAULT_RAFT_FREELIST_TYPE") == "array" {
o.FreelistType = etcdbolt.FreelistArrayType
}
if os.Getenv("VAULT_RAFT_FREELIST_SYNC") != "" {
o.NoFreelistSync = false
}
// By default, we want to set InitialMmapSize to 100GB, but only on 64bit platforms.
// Otherwise, we set it to whatever the value of VAULT_RAFT_INITIAL_MMAP_SIZE
// is, assuming it can be parsed as an int. Bolt itself sets this to 0 by default,
// so if users are wanting to turn this off, they can also set it to 0. Setting it
// to a negative value is the same as not setting it at all.
if os.Getenv("VAULT_RAFT_INITIAL_MMAP_SIZE") == "" {
o.InitialMmapSize = initialMmapSize
} else {
imms, err := strconv.Atoi(os.Getenv("VAULT_RAFT_INITIAL_MMAP_SIZE"))
// If there's an error here, it means they passed something that's not convertible to
// a number. Rather than fail startup, just ignore it.
if err == nil && imms > 0 {
o.InitialMmapSize = imms
}
}
return o
}

View file

@ -0,0 +1,399 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package raft
import (
"bytes"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)
func TestRaft_ParseConfig(t *testing.T) {
// Note some of these can be parallel tests but since we need to setEnv in
// some we can't make them all parallel so it's don inside the loop. We assume
// if a case doesn't set anything on the Env it's safe to run in parallel.
tcs := []struct {
name string
conf map[string]string
env map[string]string
wantMutation func(cfg *RaftBackendConfig)
wantErr string
wantWarns []string
}{
// RAFT WAL --------------------------------------------------------------
{
name: "WAL backend junk",
conf: map[string]string{
"raft_wal": "notabooleanlol",
},
wantErr: "does not parse as a boolean",
},
{
name: "WAL verifier junk",
conf: map[string]string{
"raft_wal": "true",
"raft_log_verifier_enabled": "notabooleanlol",
},
wantErr: "does not parse as a boolean",
},
{
name: "WAL verifier interval, zero",
conf: map[string]string{
"raft_log_verifier_enabled": "true",
"raft_log_verification_interval": "0s",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.RaftLogVerifierEnabled = true
cfg.RaftLogVerificationInterval = defaultRaftLogVerificationInterval
},
},
{
name: "WAL verifier interval, one",
conf: map[string]string{
"raft_log_verifier_enabled": "true",
"raft_log_verification_interval": "0s",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.RaftLogVerifierEnabled = true
// Below min so should get default
cfg.RaftLogVerificationInterval = defaultRaftLogVerificationInterval
},
},
{
name: "WAL verifier interval, nothing",
conf: map[string]string{
"raft_log_verifier_enabled": "true",
"raft_log_verification_interval": "",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.RaftLogVerifierEnabled = true
cfg.RaftLogVerificationInterval = defaultRaftLogVerificationInterval
},
},
{
name: "WAL verifier interval, valid",
conf: map[string]string{
"raft_log_verifier_enabled": "true",
"raft_log_verification_interval": "75s",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.RaftLogVerifierEnabled = true
cfg.RaftLogVerificationInterval = 75 * time.Second
},
},
{
name: "WAL verifier interval, junk",
conf: map[string]string{
"raft_log_verifier_enabled": "true",
"raft_log_verification_interval": "notaduration",
},
wantErr: "does not parse as a duration",
},
// AUTOPILOT Upgrades ----------------------------------------------------
{
name: "Autopilot upgrade version, junk",
conf: map[string]string{
"autopilot_upgrade_version": "hahano",
},
wantErr: "does not parse",
},
// Non-voter config ------------------------------------------------------
{
name: "non-voter, no retry-join, valid false",
conf: map[string]string{
raftNonVoterConfigKey: "false",
},
wantMutation: func(cfg *RaftBackendConfig) {
// Should be default
},
wantWarns: []string{"is configuration for a Vault Enterprise feature and has been ignored"},
},
{
name: "non-voter, retry-join, valid false",
conf: map[string]string{
"retry_join": "not-empty",
raftNonVoterConfigKey: "false",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.RetryJoin = "not-empty"
},
},
{
name: "non-voter, no retry-join, valid true",
conf: map[string]string{
raftNonVoterConfigKey: "true",
},
wantErr: "only valid if at least one retry_join stanza is specified",
},
{
name: "non-voter, retry-join, valid true",
conf: map[string]string{
"retry_join": "not-empty",
raftNonVoterConfigKey: "true",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.RetryJoin = "not-empty"
cfg.RaftNonVoter = true
},
},
{
name: "non-voter, no retry-join, invalid empty",
conf: map[string]string{
raftNonVoterConfigKey: "",
},
wantErr: "failed to parse retry_join_as_non_voter",
},
{
name: "non-voter, retry-join, invalid empty",
conf: map[string]string{
"retry_join": "not-empty",
raftNonVoterConfigKey: "",
},
wantErr: "failed to parse retry_join_as_non_voter",
},
{
name: "non-voter, no retry-join, invalid truthy",
conf: map[string]string{
raftNonVoterConfigKey: "no",
},
wantErr: "failed to parse retry_join_as_non_voter",
},
{
name: "non-voter, retry-join, invalid truthy",
conf: map[string]string{
"retry_join": "not-empty",
raftNonVoterConfigKey: "no",
},
wantErr: "failed to parse retry_join_as_non_voter",
},
{
name: "non-voter, no retry-join, invalid",
conf: map[string]string{
raftNonVoterConfigKey: "totallywrong",
},
wantErr: "failed to parse retry_join_as_non_voter",
},
{
name: "non-voter, retry-join, invalid",
conf: map[string]string{
"retry_join": "not-empty",
raftNonVoterConfigKey: "totallywrong",
},
wantErr: "failed to parse retry_join_as_non_voter",
},
{
// Note for historical reasons we treat any non-empty value as true in ENV
// vars.
name: "non-voter, no retry-join, valid env false",
env: map[string]string{
EnvVaultRaftNonVoter: "false",
},
wantErr: "only valid if at least one retry_join stanza is specified",
},
{
name: "non-voter, retry-join, valid env false",
env: map[string]string{
EnvVaultRaftNonVoter: "false",
},
conf: map[string]string{
"retry_join": "not-empty",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.RetryJoin = "not-empty"
cfg.RaftNonVoter = true // Any non-empty value is true
},
},
{
name: "non-voter, no retry-join, valid env true",
env: map[string]string{
EnvVaultRaftNonVoter: "true",
},
wantErr: "only valid if at least one retry_join stanza is specified",
},
{
name: "non-voter, retry-join, valid env true",
env: map[string]string{
EnvVaultRaftNonVoter: "true",
},
conf: map[string]string{
"retry_join": "not-empty",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.RetryJoin = "not-empty"
cfg.RaftNonVoter = true
},
},
{
name: "non-voter, no retry-join, valid env not-boolean",
env: map[string]string{
EnvVaultRaftNonVoter: "anything",
},
wantErr: "only valid if at least one retry_join stanza is specified",
},
{
name: "non-voter, retry-join, valid env not-boolean",
env: map[string]string{
EnvVaultRaftNonVoter: "anything",
},
conf: map[string]string{
"retry_join": "not-empty",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.RetryJoin = "not-empty"
cfg.RaftNonVoter = true
},
},
{
name: "non-voter, no retry-join, valid env empty",
env: map[string]string{
EnvVaultRaftNonVoter: "",
},
wantMutation: func(cfg *RaftBackendConfig) {
// Default
},
},
{
name: "non-voter, retry-join, valid env empty",
env: map[string]string{
EnvVaultRaftNonVoter: "",
},
conf: map[string]string{
"retry_join": "not-empty",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.RetryJoin = "not-empty"
},
},
{
name: "non-voter, no retry-join, both set env preferred",
env: map[string]string{
EnvVaultRaftNonVoter: "true",
},
conf: map[string]string{
raftNonVoterConfigKey: "false",
},
wantErr: "only valid if at least one retry_join stanza is specified",
},
{
name: "non-voter, retry-join, both set env preferred",
env: map[string]string{
EnvVaultRaftNonVoter: "true",
},
conf: map[string]string{
"retry_join": "not-empty",
raftNonVoterConfigKey: "false",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.RetryJoin = "not-empty"
cfg.RaftNonVoter = true // Env should win
},
},
// Entry Size Limits -----------------------------------------------------
{
name: "entry size, happy path",
conf: map[string]string{
"max_entry_size": "123456",
"max_mount_and_namespace_table_entry_size": "654321",
},
wantMutation: func(cfg *RaftBackendConfig) {
cfg.MaxEntrySize = 123456
cfg.MaxMountAndNamespaceTableEntrySize = 654321
},
},
{
name: "entry size, junk entry size",
conf: map[string]string{
"max_entry_size": "sadfsaf",
"max_mount_and_namespace_table_entry_size": "654321",
},
wantErr: "failed to parse 'max_entry_size'",
},
{
name: "entry size, junk mount entry size",
conf: map[string]string{
"max_entry_size": "123456",
"max_mount_and_namespace_table_entry_size": "1MiB",
},
wantErr: "failed to parse 'max_mount_and_namespace_table_entry_size'",
},
{
name: "entry size, way too small mount entry size",
conf: map[string]string{
"max_mount_and_namespace_table_entry_size": "1",
},
wantErr: "'max_mount_and_namespace_table_entry_size' must be at least 1024 bytes",
},
{
name: "entry size, way too big mount entry size",
conf: map[string]string{
"max_mount_and_namespace_table_entry_size": "20000000",
},
wantErr: "'max_mount_and_namespace_table_entry_size' must be at most 10,485,760 bytes (10MiB)",
},
}
// Set a nodeid and path to remove noise from all the test cases.
baseConf := map[string]string{
"node_id": "abc123",
"path": "/dummy/path",
}
for _, tc := range tcs {
tc := tc
t.Run(tc.name, func(t *testing.T) {
if len(tc.env) == 0 {
// Only run in parallel if there are no env vars to set.
t.Parallel()
}
var logs bytes.Buffer
logger := hclog.New(&hclog.LoggerOptions{
Level: hclog.Warn,
Output: &logs,
})
if tc.conf == nil {
tc.conf = make(map[string]string)
}
for k, v := range baseConf {
if _, ok := tc.conf[k]; !ok {
tc.conf[k] = v
}
}
// Make a default-valued config to compare against later. Note we do this
// before setting ENV as that would could change behavior!
wantCfg, err := parseRaftBackendConfig(baseConf, hclog.NewNullLogger())
require.NoError(t, err)
for k, v := range tc.env {
t.Setenv(k, v)
}
cfg, err := parseRaftBackendConfig(tc.conf, logger)
if tc.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErr)
return
}
tc.wantMutation(wantCfg)
require.Equal(t, wantCfg, cfg)
allLogs := logs.String()
for _, warn := range tc.wantWarns {
require.Contains(t, allLogs, warn)
}
})
}
}

View file

@ -26,8 +26,6 @@ import (
"github.com/hashicorp/go-raftchunking"
"github.com/hashicorp/go-secure-stdlib/parseutil"
"github.com/hashicorp/go-secure-stdlib/tlsutil"
"github.com/hashicorp/go-uuid"
goversion "github.com/hashicorp/go-version"
"github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot"
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
@ -80,11 +78,12 @@ var getMmapFlags = func(string) int { return 0 }
// Verify RaftBackend satisfies the correct interfaces
var (
_ physical.Backend = (*RaftBackend)(nil)
_ physical.Transactional = (*RaftBackend)(nil)
_ physical.TransactionalLimits = (*RaftBackend)(nil)
_ physical.HABackend = (*RaftBackend)(nil)
_ physical.Lock = (*RaftLock)(nil)
_ physical.Backend = (*RaftBackend)(nil)
_ physical.Transactional = (*RaftBackend)(nil)
_ physical.TransactionalLimits = (*RaftBackend)(nil)
_ physical.HABackend = (*RaftBackend)(nil)
_ physical.MountTableLimitingBackend = (*RaftBackend)(nil)
_ physical.Lock = (*RaftLock)(nil)
)
var (
@ -179,6 +178,13 @@ type RaftBackend struct {
// performance.
maxEntrySize uint64
// maxMountAndNamespaceEntrySize imposes a size limit (in bytes) on a raft
// entry (put or transaction) for paths related to mount table and namespace
// metadata. The Raft storage doesn't "know" about these paths but Vault can
// call RegisterMountTablePath to inform it so that it can apply this
// alternate limit if one is configured.
maxMountAndNamespaceEntrySize uint64
// maxBatchEntries is the number of operation entries in each batch. It is set
// by default to a value we've tested to work well but may be overridden by
// Environment variable VAULT_RAFT_MAX_BATCH_ENTRIES.
@ -241,6 +247,10 @@ type RaftBackend struct {
// it writes checkpoints.
raftLogVerifierEnabled bool
raftLogVerificationInterval time.Duration
// specialPathLimits is a map of special paths to their configured entrySize
// limits.
specialPathLimits map[string]uint64
}
// LeaderJoinInfo contains information required by a node to join itself as a
@ -298,25 +308,6 @@ type LeaderJoinInfo struct {
TLSConfig *tls.Config `json:"-"`
}
type RaftBackendConfig struct {
Path string
NodeId string
ApplyDelay time.Duration
RaftWal bool
RaftLogVerifierEnabled bool
RaftLogVerificationInterval time.Duration
SnapshotDelay time.Duration
MaxEntrySize uint64
MaxBatchEntries int
MaxBatchSize int
AutopilotReconcileInterval time.Duration
AutopilotUpdateInterval time.Duration
AutopilotUpgradeVersion string
AutopilotRedundancyZone string
RaftNonVoter bool
RetryJoin string
}
// JoinConfig returns a list of information about possible leader nodes that
// this node can join as a follower
func (b *RaftBackend) JoinConfig() ([]*LeaderJoinInfo, error) {
@ -568,32 +559,48 @@ func NewRaftBackend(conf map[string]string, logger log.Logger) (physical.Backend
}
return &RaftBackend{
logger: logger,
fsm: fsm,
raftInitCh: make(chan struct{}),
conf: conf,
logStore: logStore,
stableStore: stableStore,
snapStore: snapStore,
closers: closers,
dataDir: backendConfig.Path,
localID: backendConfig.NodeId,
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
maxEntrySize: backendConfig.MaxEntrySize,
maxBatchEntries: backendConfig.MaxBatchEntries,
maxBatchSize: backendConfig.MaxBatchSize,
followerHeartbeatTicker: time.NewTicker(time.Second),
autopilotReconcileInterval: backendConfig.AutopilotReconcileInterval,
autopilotUpdateInterval: backendConfig.AutopilotUpdateInterval,
redundancyZone: backendConfig.AutopilotRedundancyZone,
nonVoter: backendConfig.RaftNonVoter,
upgradeVersion: backendConfig.AutopilotUpgradeVersion,
failGetInTxn: new(uint32),
raftLogVerifierEnabled: backendConfig.RaftLogVerifierEnabled,
raftLogVerificationInterval: backendConfig.RaftLogVerificationInterval,
logger: logger,
fsm: fsm,
raftInitCh: make(chan struct{}),
conf: conf,
logStore: logStore,
stableStore: stableStore,
snapStore: snapStore,
closers: closers,
dataDir: backendConfig.Path,
localID: backendConfig.NodeId,
permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
maxEntrySize: backendConfig.MaxEntrySize,
maxMountAndNamespaceEntrySize: backendConfig.MaxMountAndNamespaceTableEntrySize,
maxBatchEntries: backendConfig.MaxBatchEntries,
maxBatchSize: backendConfig.MaxBatchSize,
followerHeartbeatTicker: time.NewTicker(time.Second),
autopilotReconcileInterval: backendConfig.AutopilotReconcileInterval,
autopilotUpdateInterval: backendConfig.AutopilotUpdateInterval,
redundancyZone: backendConfig.AutopilotRedundancyZone,
nonVoter: backendConfig.RaftNonVoter,
upgradeVersion: backendConfig.AutopilotUpgradeVersion,
failGetInTxn: new(uint32),
raftLogVerifierEnabled: backendConfig.RaftLogVerifierEnabled,
raftLogVerificationInterval: backendConfig.RaftLogVerificationInterval,
}, nil
}
// RegisterMountTablePath informs the Backend that the given path represents
// part of the mount tables or related metadata. This allows the backend to
// apply different limits for this entry if configured to do so.
func (b *RaftBackend) RegisterMountTablePath(path string) {
// We don't need to lock here because this is only called during startup
if b.maxMountAndNamespaceEntrySize > 0 {
// Set up the limit for this special path.
if b.specialPathLimits == nil {
b.specialPathLimits = make(map[string]uint64)
}
b.specialPathLimits[path] = b.maxMountAndNamespaceEntrySize
}
}
type snapshotStoreDelay struct {
logger log.Logger
wrapped raft.SnapshotStore
@ -1717,9 +1724,10 @@ func (b *RaftBackend) Get(ctx context.Context, path string) (*physical.Entry, er
entry, err := b.fsm.Get(ctx, path)
if entry != nil {
valueLen := len(entry.Value)
if uint64(valueLen) > b.maxEntrySize {
b.logger.Warn("retrieved entry value is too large, has raft's max_entry_size been reduced?",
"size", valueLen, "max_entry_size", b.maxEntrySize)
maxSize := b.entrySizeLimitForPath(path)
if uint64(valueLen) > maxSize {
b.logger.Warn("retrieved entry value is too large, has raft's max_entry_size or max_mount_and_namespace_table_entry_size been reduced?",
"size", valueLen, "max_size", maxSize)
}
}
@ -1847,6 +1855,31 @@ func (b *RaftBackend) TransactionLimits() (int, int) {
return b.maxBatchEntries, b.maxBatchSize
}
// validateCommandEntrySizes is a helper to check the size of each transaction
// value isn't larger than is allowed. It must take into account the path in
// case any special limits have been set for mount table paths. Finally it
// returns the largest limit it needed to use so that calling code can size the
// overall log entry check correctly. In other words if max_entry_size is 1MB
// and max_mount_and_namespace_table_entry_size is 2MB, we check each key
// against the right limit and then return 1MB unless there is at least one
// mount table key being written in which case we allow the larger limit of 2MB.
func (b *RaftBackend) validateCommandEntrySizes(command *LogData) (uint64, error) {
largestEntryLimit := b.maxEntrySize
for _, op := range command.Operations {
if op.OpType == putOp {
entrySize := b.entrySizeLimitForPath(op.Key)
if len(op.Value) > int(entrySize) {
return 0, fmt.Errorf("%s, max value size for key %s is %d, got %d", physical.ErrValueTooLarge, op.Key, entrySize, len(op.Value))
}
if entrySize > largestEntryLimit {
largestEntryLimit = entrySize
}
}
}
return largestEntryLimit, nil
}
// applyLog will take a given log command and apply it to the raft log. applyLog
// doesn't return until the log has been applied to a quorum of servers and is
// persisted to the local FSM. Caller should hold the backend's read lock.
@ -1858,14 +1891,19 @@ func (b *RaftBackend) applyLog(ctx context.Context, command *LogData) error {
return err
}
totalLogSizeLimit, err := b.validateCommandEntrySizes(command)
if err != nil {
return err
}
commandBytes, err := proto.Marshal(command)
if err != nil {
return err
}
cmdSize := len(commandBytes)
if uint64(cmdSize) > b.maxEntrySize {
return fmt.Errorf("%s; got %d bytes, max: %d bytes", physical.ErrValueTooLarge, cmdSize, b.maxEntrySize)
if uint64(cmdSize) > totalLogSizeLimit {
return fmt.Errorf("%s; got %d bytes, max: %d bytes", physical.ErrValueTooLarge, cmdSize, totalLogSizeLimit)
}
defer metrics.AddSample([]string{"raft-storage", "entry_size"}, float32(cmdSize))
@ -2120,248 +2158,6 @@ func fileExists(name string) (bool, error) {
return false, err
}
func parseRaftBackendConfig(conf map[string]string, logger log.Logger) (*RaftBackendConfig, error) {
c := &RaftBackendConfig{}
c.Path = conf["path"]
envPath := os.Getenv(EnvVaultRaftPath)
if envPath != "" {
c.Path = envPath
}
if c.Path == "" {
return nil, fmt.Errorf("'path' must be set")
}
c.NodeId = conf["node_id"]
envNodeId := os.Getenv(EnvVaultRaftNodeID)
if envNodeId != "" {
c.NodeId = envNodeId
}
if c.NodeId == "" {
localIDRaw, err := os.ReadFile(filepath.Join(c.Path, "node-id"))
if err == nil && len(localIDRaw) > 0 {
c.NodeId = string(localIDRaw)
}
if err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, err
}
}
if c.NodeId == "" {
id, err := uuid.GenerateUUID()
if err != nil {
return nil, err
}
if err = os.WriteFile(filepath.Join(c.Path, "node-id"), []byte(id), 0o600); err != nil {
return nil, err
}
c.NodeId = id
}
if delayRaw, ok := conf["apply_delay"]; ok {
delay, err := parseutil.ParseDurationSecond(delayRaw)
if err != nil {
return nil, fmt.Errorf("apply_delay does not parse as a duration: %w", err)
}
c.ApplyDelay = delay
}
if walRaw, ok := conf["raft_wal"]; ok {
useRaftWal, err := strconv.ParseBool(walRaw)
if err != nil {
return nil, fmt.Errorf("raft_wal does not parse as a boolean: %w", err)
}
c.RaftWal = useRaftWal
}
if rlveRaw, ok := conf["raft_log_verifier_enabled"]; ok {
rlve, err := strconv.ParseBool(rlveRaw)
if err != nil {
return nil, fmt.Errorf("raft_log_verifier_enabled does not parse as a boolean: %w", err)
}
c.RaftLogVerifierEnabled = rlve
c.RaftLogVerificationInterval = defaultRaftLogVerificationInterval
if rlviRaw, ok := conf["raft_log_verification_interval"]; ok {
rlvi, err := parseutil.ParseDurationSecond(rlviRaw)
if err != nil {
return nil, fmt.Errorf("raft_log_verification_interval does not parse as a duration: %w", err)
}
// Make sure our interval is capped to a reasonable value, so e.g. people don't use 0s or 1s
if rlvi >= minimumRaftLogVerificationInterval {
c.RaftLogVerificationInterval = rlvi
} else {
logger.Warn("raft_log_verification_interval is less than the minimum allowed, using default instead",
"given", rlveRaw,
"minimum", minimumRaftLogVerificationInterval,
"default", defaultRaftLogVerificationInterval)
}
}
}
if delayRaw, ok := conf["snapshot_delay"]; ok {
delay, err := parseutil.ParseDurationSecond(delayRaw)
if err != nil {
return nil, fmt.Errorf("snapshot_delay does not parse as a duration: %w", err)
}
c.SnapshotDelay = delay
}
c.MaxEntrySize = defaultMaxEntrySize
if maxEntrySizeCfg := conf["max_entry_size"]; len(maxEntrySizeCfg) != 0 {
i, err := strconv.Atoi(maxEntrySizeCfg)
if err != nil {
return nil, fmt.Errorf("failed to parse 'max_entry_size': %w", err)
}
c.MaxEntrySize = uint64(i)
}
c.MaxBatchEntries, c.MaxBatchSize = batchLimitsFromEnv(logger)
if interval := conf["autopilot_reconcile_interval"]; interval != "" {
interval, err := parseutil.ParseDurationSecond(interval)
if err != nil {
return nil, fmt.Errorf("autopilot_reconcile_interval does not parse as a duration: %w", err)
}
c.AutopilotReconcileInterval = interval
}
if interval := conf["autopilot_update_interval"]; interval != "" {
interval, err := parseutil.ParseDurationSecond(interval)
if err != nil {
return nil, fmt.Errorf("autopilot_update_interval does not parse as a duration: %w", err)
}
c.AutopilotUpdateInterval = interval
}
effectiveReconcileInterval := autopilot.DefaultReconcileInterval
effectiveUpdateInterval := autopilot.DefaultUpdateInterval
if c.AutopilotReconcileInterval != 0 {
effectiveReconcileInterval = c.AutopilotReconcileInterval
}
if c.AutopilotUpdateInterval != 0 {
effectiveUpdateInterval = c.AutopilotUpdateInterval
}
if effectiveReconcileInterval < effectiveUpdateInterval {
return nil, fmt.Errorf("autopilot_reconcile_interval (%v) should be larger than autopilot_update_interval (%v)", effectiveReconcileInterval, effectiveUpdateInterval)
}
if uv, ok := conf["autopilot_upgrade_version"]; ok && uv != "" {
_, err := goversion.NewVersion(uv)
if err != nil {
return nil, fmt.Errorf("autopilot_upgrade_version does not parse as a semantic version: %w", err)
}
c.AutopilotUpgradeVersion = uv
}
c.RaftNonVoter = false
if v := os.Getenv(EnvVaultRaftNonVoter); v != "" {
// Consistent with handling of other raft boolean env vars
// VAULT_RAFT_AUTOPILOT_DISABLE and VAULT_RAFT_FREELIST_SYNC
c.RaftNonVoter = true
} else if v, ok := conf[raftNonVoterConfigKey]; ok {
nonVoter, err := strconv.ParseBool(v)
if err != nil {
return nil, fmt.Errorf("failed to parse %s config value %q as a boolean: %w", raftNonVoterConfigKey, v, err)
}
c.RaftNonVoter = nonVoter
}
if c.RaftNonVoter && conf["retry_join"] == "" {
return nil, fmt.Errorf("setting %s to true is only valid if at least one retry_join stanza is specified", raftNonVoterConfigKey)
}
c.AutopilotRedundancyZone = conf["autopilot_redundancy_zone"]
return c, nil
}
// boltOptions returns a bolt.Options struct, suitable for passing to
// bolt.Open(), pre-configured with all of our preferred defaults.
func boltOptions(path string) *bolt.Options {
o := &bolt.Options{
Timeout: 1 * time.Second,
FreelistType: bolt.FreelistMapType,
NoFreelistSync: true,
MmapFlags: getMmapFlags(path),
}
if os.Getenv("VAULT_RAFT_FREELIST_TYPE") == "array" {
o.FreelistType = bolt.FreelistArrayType
}
if os.Getenv("VAULT_RAFT_FREELIST_SYNC") != "" {
o.NoFreelistSync = false
}
// By default, we want to set InitialMmapSize to 100GB, but only on 64bit platforms.
// Otherwise, we set it to whatever the value of VAULT_RAFT_INITIAL_MMAP_SIZE
// is, assuming it can be parsed as an int. Bolt itself sets this to 0 by default,
// so if users are wanting to turn this off, they can also set it to 0. Setting it
// to a negative value is the same as not setting it at all.
if os.Getenv("VAULT_RAFT_INITIAL_MMAP_SIZE") == "" {
o.InitialMmapSize = initialMmapSize
} else {
imms, err := strconv.Atoi(os.Getenv("VAULT_RAFT_INITIAL_MMAP_SIZE"))
// If there's an error here, it means they passed something that's not convertible to
// a number. Rather than fail startup, just ignore it.
if err == nil && imms > 0 {
o.InitialMmapSize = imms
}
}
return o
}
func etcdboltOptions(path string) *etcdbolt.Options {
o := &etcdbolt.Options{
Timeout: 1 * time.Second,
FreelistType: etcdbolt.FreelistMapType,
NoFreelistSync: true,
MmapFlags: getMmapFlags(path),
}
if os.Getenv("VAULT_RAFT_FREELIST_TYPE") == "array" {
o.FreelistType = etcdbolt.FreelistArrayType
}
if os.Getenv("VAULT_RAFT_FREELIST_SYNC") != "" {
o.NoFreelistSync = false
}
// By default, we want to set InitialMmapSize to 100GB, but only on 64bit platforms.
// Otherwise, we set it to whatever the value of VAULT_RAFT_INITIAL_MMAP_SIZE
// is, assuming it can be parsed as an int. Bolt itself sets this to 0 by default,
// so if users are wanting to turn this off, they can also set it to 0. Setting it
// to a negative value is the same as not setting it at all.
if os.Getenv("VAULT_RAFT_INITIAL_MMAP_SIZE") == "" {
o.InitialMmapSize = initialMmapSize
} else {
imms, err := strconv.Atoi(os.Getenv("VAULT_RAFT_INITIAL_MMAP_SIZE"))
// If there's an error here, it means they passed something that's not convertible to
// a number. Rather than fail startup, just ignore it.
if err == nil && imms > 0 {
o.InitialMmapSize = imms
}
}
return o
}
func isRaftLogVerifyCheckpoint(l *raft.Log) bool {
if !bytes.Equal(l.Data, []byte{byte(verifierCheckpointOp)}) {
return false

View file

@ -23,7 +23,6 @@ import (
bolt "github.com/hashicorp-forge/bbolt"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/base62"
"github.com/hashicorp/go-secure-stdlib/parseutil"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
@ -285,209 +284,6 @@ func TestRaft_VerifierEnabled(t *testing.T) {
})
}
// TestRaft_ParseRaftWalBackend ensures that the raft_wal config option parses correctly and returns an error if not
func TestRaft_ParseRaftWalBackend(t *testing.T) {
raftDir := t.TempDir()
conf := map[string]string{
"path": raftDir,
"node_id": "abc123",
"raft_wal": "notabooleanlol",
}
_, err := NewRaftBackend(conf, hclog.NewNullLogger())
if err == nil {
t.Fatal("expected an error but got none")
}
if !strings.Contains(err.Error(), "does not parse as a boolean") {
t.Fatal("expected an error about parsing config keys but got none")
}
}
// TestRaft_ParseRaftWalVerifierEnabled checks to make sure we error correctly if raft_log_verifier_enabled is not a boolean
func TestRaft_ParseRaftWalVerifierEnabled(t *testing.T) {
raftDir := t.TempDir()
conf := map[string]string{
"path": raftDir,
"node_id": "abc123",
"raft_wal": "true",
"raft_log_verifier_enabled": "notabooleanlol",
}
_, err := NewRaftBackend(conf, hclog.NewNullLogger())
if err == nil {
t.Fatal("expected an error but got none")
}
if !strings.Contains(err.Error(), "does not parse as a boolean") {
t.Fatal("expected an error about parsing config keys but got none")
}
}
// TestRaft_ParseRaftWalVerifierInterval checks to make sure we handle various intervals correctly and have a default
func TestRaft_ParseRaftWalVerifierInterval(t *testing.T) {
testCases := []struct {
name string
givenInterval string
expectedInterval string
shouldError bool
}{
{
"zero",
"0s",
defaultRaftLogVerificationInterval.String(),
false,
},
{
"one",
"1s",
defaultRaftLogVerificationInterval.String(),
false,
},
{
"nothing",
"",
defaultRaftLogVerificationInterval.String(),
false,
},
{
"default",
"60s",
defaultRaftLogVerificationInterval.String(),
false,
},
{
"more than the default",
"75s",
"75s",
false,
},
{
"obviously wrong",
"notadurationlol",
"",
true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
raftDir := t.TempDir()
conf := map[string]string{
"path": raftDir,
"node_id": "abc123",
"raft_wal": "true",
"raft_log_verifier_enabled": "true",
"raft_log_verification_interval": tc.givenInterval,
}
rbRaw, err := NewRaftBackend(conf, hclog.NewNullLogger())
if tc.shouldError {
if err == nil {
t.Fatal("expected an error but got none")
}
// return early, since we got the error we wanted
return
}
if !tc.shouldError && err != nil {
t.Fatal(err)
}
rb := rbRaw.(*RaftBackend)
parsedExpectedInterval, err := parseutil.ParseDurationSecond(tc.expectedInterval)
if err != nil {
t.Fatal(err)
}
if parsedExpectedInterval != rb.verificationInterval() {
t.Fatal("expected intervals to match but they didn't")
}
})
}
}
// TestRaft_ParseAutopilotUpgradeVersion tests that autopilot_upgrade_version parses correctly and returns an error if not
func TestRaft_ParseAutopilotUpgradeVersion(t *testing.T) {
raftDir := t.TempDir()
conf := map[string]string{
"path": raftDir,
"node_id": "abc123",
"autopilot_upgrade_version": "hahano",
}
_, err := NewRaftBackend(conf, hclog.NewNullLogger())
if err == nil {
t.Fatal("expected an error but got none")
}
if !strings.Contains(err.Error(), "does not parse") {
t.Fatal("expected an error about unparseable versions but got none")
}
}
func TestRaft_ParseNonVoter(t *testing.T) {
p := func(s string) *string {
return &s
}
for _, retryJoinConf := range []string{"", "not-empty"} {
t.Run(retryJoinConf, func(t *testing.T) {
for name, tc := range map[string]struct {
envValue *string
configValue *string
expectNonVoter bool
invalidNonVoterValue bool
}{
"valid false": {nil, p("false"), false, false},
"valid true": {nil, p("true"), true, false},
"invalid empty": {nil, p(""), false, true},
"invalid truthy": {nil, p("no"), false, true},
"invalid": {nil, p("totallywrong"), false, true},
"valid env false": {p("false"), nil, true, false},
"valid env true": {p("true"), nil, true, false},
"valid env not boolean": {p("anything"), nil, true, false},
"valid env empty": {p(""), nil, false, false},
"neither set, default false": {nil, nil, false, false},
"both set, env preferred": {p("true"), p("false"), true, false},
} {
t.Run(name, func(t *testing.T) {
if tc.envValue != nil {
t.Setenv(EnvVaultRaftNonVoter, *tc.envValue)
}
raftDir := t.TempDir()
conf := map[string]string{
"path": raftDir,
"node_id": "abc123",
"retry_join": retryJoinConf,
}
if tc.configValue != nil {
conf[raftNonVoterConfigKey] = *tc.configValue
}
backend, err := NewRaftBackend(conf, hclog.NewNullLogger())
switch {
case tc.invalidNonVoterValue || (retryJoinConf == "" && tc.expectNonVoter):
if err == nil {
t.Fatal("expected an error but got none")
}
default:
if err != nil {
t.Fatalf("expected no error but got: %s", err)
}
raftBackend := backend.(*RaftBackend)
if tc.expectNonVoter != raftBackend.NonVoter() {
t.Fatalf("expected %s %v but got %v", raftNonVoterConfigKey, tc.expectNonVoter, raftBackend.NonVoter())
}
}
})
}
})
}
}
func TestRaft_Backend_LargeKey(t *testing.T) {
t.Parallel()

View file

@ -9,6 +9,7 @@ import (
"context"
"errors"
"github.com/hashicorp/go-hclog"
autopilot "github.com/hashicorp/raft-autopilot"
)
@ -23,6 +24,10 @@ func (b *RaftBackend) AddNonVotingPeer(ctx context.Context, peerID, clusterAddr
return errors.New("adding non voting peer is not allowed")
}
func (b *RaftBackend) entrySizeLimitForPath(path string) uint64 {
return b.maxEntrySize
}
func autopilotToAPIServerEnterprise(_ *autopilot.Server, _ *AutopilotServer) error {
return nil
}
@ -42,3 +47,7 @@ func (d *Delegate) autopilotServerExt(_ *FollowerState) interface{} {
func (d *Delegate) meta(_ *FollowerState) map[string]string {
return nil
}
func emitEntWarning(logger hclog.Logger, field string) {
logger.Warn("%s is configuration for a Vault Enterprise feature and has been ignored.", field)
}

View file

@ -14,18 +14,25 @@ import (
)
func GetRaft(t testing.TB, bootstrap bool, noStoreState bool) (*RaftBackend, string) {
return getRaftInternal(t, bootstrap, defaultRaftConfig(t, bootstrap, noStoreState), nil)
return getRaftInternal(t, bootstrap, defaultRaftConfig(t, bootstrap, noStoreState), nil, nil)
}
func GetRaftWithConfig(t testing.TB, bootstrap bool, noStoreState bool, conf map[string]string) (*RaftBackend, string) {
defaultConf := defaultRaftConfig(t, bootstrap, noStoreState)
conf["path"] = defaultConf["path"]
conf["doNotStoreLatestState"] = defaultConf["doNotStoreLatestState"]
return getRaftInternal(t, bootstrap, conf, nil)
return getRaftInternal(t, bootstrap, conf, nil, nil)
}
func GetRaftWithConfigAndInitFn(t testing.TB, bootstrap bool, noStoreState bool, conf map[string]string, initFn func(b *RaftBackend)) (*RaftBackend, string) {
defaultConf := defaultRaftConfig(t, bootstrap, noStoreState)
conf["path"] = defaultConf["path"]
conf["doNotStoreLatestState"] = defaultConf["doNotStoreLatestState"]
return getRaftInternal(t, bootstrap, conf, nil, initFn)
}
func GetRaftWithLogOutput(t testing.TB, bootstrap bool, noStoreState bool, logOutput io.Writer) (*RaftBackend, string) {
return getRaftInternal(t, bootstrap, defaultRaftConfig(t, bootstrap, noStoreState), logOutput)
return getRaftInternal(t, bootstrap, defaultRaftConfig(t, bootstrap, noStoreState), logOutput, nil)
}
func defaultRaftConfig(t testing.TB, bootstrap bool, noStoreState bool) map[string]string {
@ -44,7 +51,7 @@ func defaultRaftConfig(t testing.TB, bootstrap bool, noStoreState bool) map[stri
return conf
}
func getRaftInternal(t testing.TB, bootstrap bool, conf map[string]string, logOutput io.Writer) (*RaftBackend, string) {
func getRaftInternal(t testing.TB, bootstrap bool, conf map[string]string, logOutput io.Writer, initFn func(b *RaftBackend)) (*RaftBackend, string) {
id, err := uuid.GenerateUUID()
if err != nil {
t.Fatal(err)
@ -63,6 +70,9 @@ func getRaftInternal(t testing.TB, bootstrap bool, conf map[string]string, logOu
t.Fatal(err)
}
backend := backendRaw.(*RaftBackend)
if initFn != nil {
initFn(backend)
}
if bootstrap {
err = backend.Bootstrap([]Peer{

View file

@ -139,6 +139,16 @@ type RedirectDetect interface {
DetectHostAddr() (string, error)
}
// MountTableLimitingBackend is an optional interface a Backend can implement
// that allows it to support different entry size limits for mount-table-related
// paths. It will only be called in Vault Enterprise.
type MountTableLimitingBackend interface {
// RegisterMountTablePath informs the Backend that the given path represents
// part of the mount tables or related metadata. This allows the backend to
// apply different limits for this entry if configured to do so.
RegisterMountTablePath(path string)
}
type Lock interface {
// Lock is used to acquire the given lock
// The stopCh is optional and if closed should interrupt the lock