Mirror of https://github.com/prometheus/prometheus.git, synced 2026-02-03 20:39:32 -05:00
Add retry support for HTTP service discovery
Implements configurable retry logic for HTTP service discovery with exponential backoff, following the same pattern as remote write.

Configuration:
- min_backoff: minimum backoff duration for retries (default: 0, disabled)
- Retries are disabled by default (backward compatible)
- When enabled, uses exponential backoff (doubling on each retry)
- refresh_interval acts as both the maximum backoff and the total timeout
- Retries only occur for retryable errors (5xx, 429, network timeouts)

Example configuration:

```yaml
http_sd_configs:
  - url: http://example.com/targets.json
    refresh_interval: 30s
    min_backoff: 1s  # Set to 0 or omit to disable retries
```

Implementation details:
- Simple configuration with a single min_backoff parameter
- Aligned with the remote write retry pattern for consistency
- Total timeout enforced via context.WithTimeout(refresh_interval)
- HTTP requests use the timeout context to prevent hanging
- Non-retryable errors (4xx, parsing errors) fail immediately
- Context cancellation stops retries gracefully

Metrics:
- prometheus_sd_http_retries_total: counter of retry attempts by attempt number
- prometheus_sd_http_failures_total: counter of final failures

Error classification:
- Retryable: HTTP 5xx, HTTP 429, network timeouts, temporary errors
- Non-retryable: HTTP 4xx, parsing errors, context cancellation

The implementation ensures Refresh() always returns within refresh_interval, preventing deadlocks even with persistent failures or slow connections.

Fixes #17137

Signed-off-by: fancivez <fancivez@gmail.com>
This commit is contained in:
parent 9e4d23ddaf
commit fec0f10b59

4 changed files with 579 additions and 10 deletions
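An illustrative aside, not part of this commit: a minimal, standalone sketch of the backoff schedule described above, using the values from the example configuration (min_backoff: 1s, refresh_interval: 30s). The doubling-and-cap rule and the total timeout come from the commit message; everything else is made up for the example.

```go
package main

import (
    "fmt"
    "time"
)

func main() {
    minBackoff := 1 * time.Second       // min_backoff from the example config
    refreshInterval := 30 * time.Second // also the total timeout for Refresh()

    elapsed := time.Duration(0)
    backoff := minBackoff
    for attempt := 1; elapsed < refreshInterval; attempt++ {
        fmt.Printf("attempt %d failed -> wait %v (elapsed before wait: %v)\n", attempt, backoff, elapsed)
        elapsed += backoff
        // Double the backoff, capped at refresh_interval, as the commit describes.
        backoff *= 2
        if backoff > refreshInterval {
            backoff = refreshInterval
        }
    }
    // Ignoring request time, this prints waits of 1s, 2s, 4s, 8s and 16s;
    // in practice the 30s total timeout cuts the last wait short.
}
```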
discovery/http/http.go
@@ -43,6 +43,7 @@ var (
     DefaultSDConfig = SDConfig{
         HTTPClientConfig: config.DefaultHTTPClientConfig,
         RefreshInterval:  model.Duration(60 * time.Second),
+        MinBackoff:       model.Duration(0), // No retries by default
     }
     userAgent        = version.PrometheusUserAgent()
     matchContentType = regexp.MustCompile(`^(?i:application\/json(;\s*charset=("utf-8"|utf-8))?)$`)
@@ -57,6 +58,11 @@ type SDConfig struct {
     HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
     RefreshInterval  model.Duration          `yaml:"refresh_interval,omitempty"`
     URL              string                  `yaml:"url"`
+    // MinBackoff is the minimum backoff duration for retries.
+    // Set to 0 to disable retries (default behavior).
+    // Retries use exponential backoff, doubling on each attempt,
+    // with refresh_interval as the hard upper limit.
+    MinBackoff model.Duration `yaml:"min_backoff,omitempty"`
 }

 // NewDiscovererMetrics implements discovery.Config.
@@ -98,6 +104,15 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(any) error) error {
     if parsedURL.Host == "" {
         return errors.New("host is missing in URL")
     }
+
+    // Validate min_backoff
+    if c.MinBackoff < 0 {
+        return errors.New("min_backoff must not be negative")
+    }
+    if c.MinBackoff > c.RefreshInterval {
+        return errors.New("min_backoff must not be greater than refresh_interval")
+    }

     return c.HTTPClientConfig.Validate()
 }
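For illustration only, not part of the diff: a sketch of how the validation above would surface to a user, assuming this change is merged. The package alias httpsd and the sample values are made up for the example.

```go
package main

import (
    "fmt"

    "gopkg.in/yaml.v2"

    httpsd "github.com/prometheus/prometheus/discovery/http"
)

func main() {
    // min_backoff larger than refresh_interval should be rejected by UnmarshalYAML.
    cfgYAML := []byte(`
url: http://example.com/targets.json
refresh_interval: 30s
min_backoff: 60s
`)
    var cfg httpsd.SDConfig
    if err := yaml.Unmarshal(cfgYAML, &cfg); err != nil {
        // Expected: "min_backoff must not be greater than refresh_interval".
        fmt.Println("config rejected:", err)
    }
}
```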
@@ -110,8 +125,10 @@ type Discovery struct {
     url             string
     client          *http.Client
     refreshInterval time.Duration
+    minBackoff      time.Duration
     tgLastLength    int
     metrics         *httpMetrics
+    logger          *slog.Logger
 }

 // NewDiscovery returns a new HTTP discovery for the given config.
@@ -135,7 +152,9 @@ func NewDiscovery(conf *SDConfig, logger *slog.Logger, clientOpts []config.HTTPC
         url:             conf.URL,
         client:          client,
         refreshInterval: time.Duration(conf.RefreshInterval), // Stored to be sent as headers.
+        minBackoff:      time.Duration(conf.MinBackoff),
         metrics:         m,
+        logger:          logger,
     }

     d.Discovery = refresh.NewDiscovery(
@@ -150,7 +169,18 @@ func NewDiscovery(conf *SDConfig, logger *slog.Logger, clientOpts []config.HTTPC
     return d, nil
 }

-func (d *Discovery) Refresh(ctx context.Context) ([]*targetgroup.Group, error) {
+// retryableHTTPError wraps an HTTP status code error that should be retried.
+type retryableHTTPError struct {
+    statusCode int
+    status     string
+}
+
+func (e *retryableHTTPError) Error() string {
+    return fmt.Sprintf("server returned HTTP status %s", e.status)
+}
+
+// doRefresh performs a single HTTP SD refresh attempt without retry logic.
+func (d *Discovery) doRefresh(ctx context.Context) ([]*targetgroup.Group, error) {
     req, err := http.NewRequest(http.MethodGet, d.url, nil)
     if err != nil {
         return nil, err
@@ -161,7 +191,6 @@ func (d *Discovery) Refresh(ctx context.Context) ([]*targetgroup.Group, error) {

     resp, err := d.client.Do(req.WithContext(ctx))
     if err != nil {
-        d.metrics.failuresCount.Inc()
         return nil, err
     }
     defer func() {
@@ -170,33 +199,34 @@
     }()

     if resp.StatusCode != http.StatusOK {
-        d.metrics.failuresCount.Inc()
+        // Wrap retryable status codes in retryableHTTPError
+        if isRetryableStatusCode(resp.StatusCode) {
+            return nil, &retryableHTTPError{
+                statusCode: resp.StatusCode,
+                status:     resp.Status,
+            }
+        }
         return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
     }

     if !matchContentType.MatchString(strings.TrimSpace(resp.Header.Get("Content-Type"))) {
-        d.metrics.failuresCount.Inc()
         return nil, fmt.Errorf("unsupported content type %q", resp.Header.Get("Content-Type"))
     }

     b, err := io.ReadAll(resp.Body)
     if err != nil {
-        d.metrics.failuresCount.Inc()
         return nil, err
     }

     var targetGroups []*targetgroup.Group

     if err := json.Unmarshal(b, &targetGroups); err != nil {
-        d.metrics.failuresCount.Inc()
         return nil, err
     }

     for i, tg := range targetGroups {
         if tg == nil {
-            d.metrics.failuresCount.Inc()
-            err = errors.New("nil target group item found")
-            return nil, err
+            return nil, errors.New("nil target group item found")
         }

         tg.Source = urlSource(d.url, i)
@@ -216,7 +246,125 @@
     return targetGroups, nil
 }

+// Refresh performs HTTP SD refresh with retry logic.
+func (d *Discovery) Refresh(ctx context.Context) ([]*targetgroup.Group, error) {
+    // If min_backoff is 0, retries are disabled
+    if d.minBackoff == 0 {
+        tgs, err := d.doRefresh(ctx)
+        if err != nil {
+            d.metrics.failuresCount.Inc()
+        }
+        return tgs, err
+    }
+
+    // Retry logic enabled - use refresh_interval as total timeout
+    retryCtx, cancel := context.WithTimeout(ctx, d.refreshInterval)
+    defer cancel()
+
+    var lastErr error
+    backoff := d.minBackoff
+    attempt := 0
+
+    for {
+        attempt++
+
+        // Attempt refresh - use retryCtx so HTTP request respects total timeout
+        tgs, err := d.doRefresh(retryCtx)
+        if err == nil {
+            // Success
+            if attempt > 1 {
+                d.logger.Info("HTTP SD refresh succeeded after retry",
+                    "attempt", attempt,
+                )
+            }
+            return tgs, nil
+        }
+
+        lastErr = err
+
+        // Check if timeout/cancellation occurred during request
+        if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
+            d.metrics.failuresCount.Inc()
+            if errors.Is(err, context.DeadlineExceeded) {
+                return nil, fmt.Errorf("HTTP SD refresh failed after %d attempts within %v timeout: %w", attempt, d.refreshInterval, lastErr)
+            }
+            return nil, lastErr
+        }
+
+        // Check if we should retry
+        shouldRetry := false
+        var httpErr *retryableHTTPError
+        if errors.As(err, &httpErr) {
+            shouldRetry = true
+        } else {
+            shouldRetry = isRetryableError(err)
+        }
+
+        // If error is not retryable, stop immediately
+        if !shouldRetry {
+            d.metrics.failuresCount.Inc()
+            return nil, lastErr
+        }
+
+        // Log retry decision
+        d.logger.Warn("HTTP SD refresh failed, will retry",
+            "attempt", attempt,
+            "backoff", backoff,
+            "err", err,
+        )
+        d.metrics.retriesCount.WithLabelValues(strconv.Itoa(attempt)).Inc()
+
+        // Wait before retrying with backoff
+        select {
+        case <-time.After(backoff):
+            // Double the backoff for next attempt, capped at refresh_interval
+            backoff *= 2
+            if backoff > d.refreshInterval {
+                backoff = d.refreshInterval
+            }
+        case <-retryCtx.Done():
+            // Timeout or cancellation during backoff
+            d.metrics.failuresCount.Inc()
+            if errors.Is(retryCtx.Err(), context.DeadlineExceeded) {
+                return nil, fmt.Errorf("HTTP SD refresh failed after %d attempts within %v timeout: %w", attempt, d.refreshInterval, lastErr)
+            }
+            return nil, retryCtx.Err()
+        }
+    }
+}
+
 // urlSource returns a source ID for the i-th target group per URL.
 func urlSource(url string, i int) string {
     return fmt.Sprintf("%s:%d", url, i)
 }
+
+// isRetryableStatusCode returns true if the HTTP status code indicates a retryable error.
+func isRetryableStatusCode(statusCode int) bool {
+    // Retry on 5xx server errors and 429 rate limit
+    return statusCode >= 500 || statusCode == http.StatusTooManyRequests
+}
+
+// isRetryableError returns true if the error is a network error that should be retried.
+func isRetryableError(err error) bool {
+    // Do not retry on context cancellation or deadline exceeded
+    if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+        return false
+    }
+
+    // Check for url.Error which wraps network errors
+    var urlErr *url.Error
+    if errors.As(err, &urlErr) {
+        // Timeout errors are retryable
+        if urlErr.Timeout() {
+            return true
+        }
+        // Temporary network errors are retryable
+        type temporary interface{ Temporary() bool }
+        if te, ok := urlErr.Err.(temporary); ok {
+            return te.Temporary()
+        }
+    }
+
+    // Do not retry on other errors (e.g., HTTP 4xx, parsing errors, etc.)
+    return false
+}
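An aside, not part of the diff: a minimal sketch of an endpoint that doRefresh above would accept. It serves the application/json content type matched by matchContentType and a JSON array of target groups; the path, port and sample target are invented for the example.

```go
package main

import (
    "log"
    "net/http"
)

func main() {
    // Serve a single target group in the HTTP SD JSON format.
    http.HandleFunc("/targets.json", func(w http.ResponseWriter, _ *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        _, _ = w.Write([]byte(`[{"targets": ["127.0.0.1:9100"], "labels": {"env": "demo"}}]`))
    })
    log.Fatal(http.ListenAndServe(":8080", nil))
}
```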
discovery/http/http_test.go
@@ -27,6 +27,7 @@ import (
     "github.com/prometheus/common/model"
     "github.com/prometheus/common/promslog"
     "github.com/stretchr/testify/require"
+    "gopkg.in/yaml.v2"

     "github.com/prometheus/prometheus/discovery"
     "github.com/prometheus/prometheus/discovery/targetgroup"
@@ -210,6 +211,418 @@ func TestContentTypeRegex(t *testing.T) {
     }
 }

+func TestRetryOnRetryableErrors(t *testing.T) {
+    var attemptCount int
+    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+        attemptCount++
+        if attemptCount < 3 {
+            // Return retryable error for first 2 attempts
+            w.WriteHeader(http.StatusInternalServerError)
+            return
+        }
+        // Succeed on 3rd attempt
+        w.Header().Set("Content-Type", "application/json")
+        fmt.Fprintln(w, `[{"labels": {"k": "v"}, "targets": ["127.0.0.1"]}]`)
+    }))
+    t.Cleanup(ts.Close)
+
+    cfg := SDConfig{
+        HTTPClientConfig: config.DefaultHTTPClientConfig,
+        URL:              ts.URL,
+        RefreshInterval:  model.Duration(30 * time.Second),
+        MinBackoff:       model.Duration(10 * time.Millisecond),
+    }
+
+    reg := prometheus.NewRegistry()
+    refreshMetrics := discovery.NewRefreshMetrics(reg)
+    defer refreshMetrics.Unregister()
+    metrics := cfg.NewDiscovererMetrics(reg, refreshMetrics)
+    require.NoError(t, metrics.Register())
+    defer metrics.Unregister()
+
+    d, err := NewDiscovery(&cfg, promslog.NewNopLogger(), nil, metrics)
+    require.NoError(t, err)
+
+    ctx := context.Background()
+    tgs, err := d.Refresh(ctx)
+    require.NoError(t, err)
+    require.Equal(t, 3, attemptCount)
+    require.Len(t, tgs, 1)
+
+    // Verify retry metrics were incremented
+    httpMetrics := metrics.(*httpMetrics)
+    retryMetric, err := httpMetrics.retriesCount.GetMetricWithLabelValues("1")
+    require.NoError(t, err)
+    var m dto.Metric
+    require.NoError(t, retryMetric.Write(&m))
+    require.Equal(t, 1.0, *m.Counter.Value)
+
+    retryMetric2, err := httpMetrics.retriesCount.GetMetricWithLabelValues("2")
+    require.NoError(t, err)
+    require.NoError(t, retryMetric2.Write(&m))
+    require.Equal(t, 1.0, *m.Counter.Value)
+}
+
+func TestRetryExhaustion(t *testing.T) {
+    var attemptCount int
+    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+        attemptCount++
+        w.WriteHeader(http.StatusInternalServerError)
+    }))
+    t.Cleanup(ts.Close)
+
+    cfg := SDConfig{
+        HTTPClientConfig: config.DefaultHTTPClientConfig,
+        URL:              ts.URL,
+        RefreshInterval:  model.Duration(200 * time.Millisecond),
+        MinBackoff:       model.Duration(10 * time.Millisecond),
+    }
+
+    reg := prometheus.NewRegistry()
+    refreshMetrics := discovery.NewRefreshMetrics(reg)
+    defer refreshMetrics.Unregister()
+    metrics := cfg.NewDiscovererMetrics(reg, refreshMetrics)
+    require.NoError(t, metrics.Register())
+    defer metrics.Unregister()
+
+    d, err := NewDiscovery(&cfg, promslog.NewNopLogger(), nil, metrics)
+    require.NoError(t, err)
+
+    ctx := context.Background()
+    _, err = d.Refresh(ctx)
+    require.Error(t, err)
+    // Should timeout after refresh_interval (200ms)
+    require.Contains(t, err.Error(), "timeout")
+    // Should have made multiple attempts within the timeout
+    require.Greater(t, attemptCount, 1)
+    require.Equal(t, 1.0, getFailureCount(d.metrics.failuresCount))
+}
+
+func TestNoRetryOnNonRetryableErrors(t *testing.T) {
+    var attemptCount int
+    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+        attemptCount++
+        w.WriteHeader(http.StatusBadRequest) // 4xx is not retryable
+    }))
+    t.Cleanup(ts.Close)
+
+    cfg := SDConfig{
+        HTTPClientConfig: config.DefaultHTTPClientConfig,
+        URL:              ts.URL,
+        RefreshInterval:  model.Duration(30 * time.Second),
+        MinBackoff:       model.Duration(10 * time.Millisecond),
+    }
+
+    reg := prometheus.NewRegistry()
+    refreshMetrics := discovery.NewRefreshMetrics(reg)
+    defer refreshMetrics.Unregister()
+    metrics := cfg.NewDiscovererMetrics(reg, refreshMetrics)
+    require.NoError(t, metrics.Register())
+    defer metrics.Unregister()
+
+    d, err := NewDiscovery(&cfg, promslog.NewNopLogger(), nil, metrics)
+    require.NoError(t, err)
+
+    ctx := context.Background()
+    _, err = d.Refresh(ctx)
+    require.Error(t, err)
+    require.EqualError(t, err, "server returned HTTP status 400 Bad Request")
+    require.Equal(t, 1, attemptCount) // Should only attempt once
+    require.Equal(t, 1.0, getFailureCount(d.metrics.failuresCount))
+}
+
+func TestRetryOn429RateLimit(t *testing.T) {
+    var attemptCount int
+    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+        attemptCount++
+        if attemptCount < 2 {
+            w.WriteHeader(http.StatusTooManyRequests)
+            return
+        }
+        w.Header().Set("Content-Type", "application/json")
+        fmt.Fprintln(w, `[{"labels": {"k": "v"}, "targets": ["127.0.0.1"]}]`)
+    }))
+    t.Cleanup(ts.Close)
+
+    cfg := SDConfig{
+        HTTPClientConfig: config.DefaultHTTPClientConfig,
+        URL:              ts.URL,
+        RefreshInterval:  model.Duration(30 * time.Second),
+        MinBackoff:       model.Duration(10 * time.Millisecond),
+    }
+
+    reg := prometheus.NewRegistry()
+    refreshMetrics := discovery.NewRefreshMetrics(reg)
+    defer refreshMetrics.Unregister()
+    metrics := cfg.NewDiscovererMetrics(reg, refreshMetrics)
+    require.NoError(t, metrics.Register())
+    defer metrics.Unregister()
+
+    d, err := NewDiscovery(&cfg, promslog.NewNopLogger(), nil, metrics)
+    require.NoError(t, err)
+
+    ctx := context.Background()
+    tgs, err := d.Refresh(ctx)
+    require.NoError(t, err)
+    require.Equal(t, 2, attemptCount)
+    require.Len(t, tgs, 1)
+}
+
+func TestRetryContextCancellation(t *testing.T) {
+    var attemptCount int
+    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+        attemptCount++
+        w.WriteHeader(http.StatusInternalServerError)
+    }))
+    t.Cleanup(ts.Close)
+
+    cfg := SDConfig{
+        HTTPClientConfig: config.DefaultHTTPClientConfig,
+        URL:              ts.URL,
+        RefreshInterval:  model.Duration(30 * time.Second),
+        MinBackoff:       model.Duration(100 * time.Millisecond),
+    }
+
+    reg := prometheus.NewRegistry()
+    refreshMetrics := discovery.NewRefreshMetrics(reg)
+    defer refreshMetrics.Unregister()
+    metrics := cfg.NewDiscovererMetrics(reg, refreshMetrics)
+    require.NoError(t, metrics.Register())
+    defer metrics.Unregister()
+
+    d, err := NewDiscovery(&cfg, promslog.NewNopLogger(), nil, metrics)
+    require.NoError(t, err)
+
+    ctx, cancel := context.WithCancel(context.Background())
+    // Cancel context after a short delay
+    go func() {
+        time.Sleep(150 * time.Millisecond)
+        cancel()
+    }()
+
+    _, err = d.Refresh(ctx)
+    require.Error(t, err)
+    require.ErrorIs(t, err, context.Canceled)
+    // Should have attempted at least once
+    require.Positive(t, attemptCount)
+}
+
+func TestSlowHTTPRequestTimeout(t *testing.T) {
+    requestStarted := make(chan struct{})
+    ts := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
+        close(requestStarted)
+        // Simulate a slow/hanging request by waiting for context cancellation
+        <-r.Context().Done()
+        // Request was cancelled by timeout
+    }))
+    t.Cleanup(ts.Close)
+
+    cfg := SDConfig{
+        HTTPClientConfig: config.DefaultHTTPClientConfig,
+        URL:              ts.URL,
+        RefreshInterval:  model.Duration(300 * time.Millisecond),
+        MinBackoff:       model.Duration(50 * time.Millisecond),
+    }
+
+    reg := prometheus.NewRegistry()
+    refreshMetrics := discovery.NewRefreshMetrics(reg)
+    defer refreshMetrics.Unregister()
+    metrics := cfg.NewDiscovererMetrics(reg, refreshMetrics)
+    require.NoError(t, metrics.Register())
+    defer metrics.Unregister()
+
+    d, err := NewDiscovery(&cfg, promslog.NewNopLogger(), nil, metrics)
+    require.NoError(t, err)
+
+    ctx := context.Background()
+    start := time.Now()
+    _, err = d.Refresh(ctx)
+    elapsed := time.Since(start)
+
+    require.Error(t, err)
+    // Should timeout after refresh_interval (300ms)
+    require.Greater(t, elapsed, 250*time.Millisecond, "should wait at least close to refresh_interval")
+    require.Less(t, elapsed, 500*time.Millisecond, "should not wait much longer than refresh_interval")
+    require.Contains(t, err.Error(), "timeout")
+    // Verify request was started
+    select {
+    case <-requestStarted:
+    default:
+        t.Fatal("HTTP request was never started")
+    }
+    require.Equal(t, 1.0, getFailureCount(d.metrics.failuresCount))
+}
+
+func TestDefaultMinBackoff(t *testing.T) {
+    // Test that default min_backoff is 0 (retries disabled)
+    require.Equal(t, model.Duration(0), DefaultSDConfig.MinBackoff)
+}
+
+func TestMinBackoffValidation(t *testing.T) {
+    tests := []struct {
+        name        string
+        yaml        string
+        expectedErr string
+    }{
+        {
+            name: "valid config with min_backoff",
+            yaml: `
+url: http://example.com
+refresh_interval: 60s
+min_backoff: 1s
+`,
+            expectedErr: "",
+        },
+        {
+            name: "min_backoff zero disables retries",
+            yaml: `
+url: http://example.com
+refresh_interval: 60s
+min_backoff: 0
+`,
+            expectedErr: "",
+        },
+        {
+            name: "negative min_backoff is invalid",
+            yaml: `
+url: http://example.com
+refresh_interval: 60s
+min_backoff: -1s
+`,
+            expectedErr: "not a valid duration string",
+        },
+        {
+            name: "min_backoff greater than refresh_interval",
+            yaml: `
+url: http://example.com
+refresh_interval: 30s
+min_backoff: 60s
+`,
+            expectedErr: "min_backoff must not be greater than refresh_interval",
+        },
+        {
+            name: "missing URL",
+            yaml: `
+refresh_interval: 60s
+`,
+            expectedErr: "URL is missing",
+        },
+        {
+            name: "invalid URL scheme",
+            yaml: `
+url: ftp://example.com
+refresh_interval: 60s
+`,
+            expectedErr: "URL scheme must be 'http' or 'https'",
+        },
+        {
+            name: "missing host in URL",
+            yaml: `
+url: http://
+refresh_interval: 60s
+`,
+            expectedErr: "host is missing in URL",
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            var cfg SDConfig
+            err := yaml.Unmarshal([]byte(tt.yaml), &cfg)
+
+            if tt.expectedErr != "" {
+                require.Error(t, err)
+                require.Contains(t, err.Error(), tt.expectedErr)
+            } else {
+                require.NoError(t, err)
+            }
+        })
+    }
+}
+
+func TestNoRetryWhenMinBackoffIsZero(t *testing.T) {
+    var attemptCount int
+    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+        attemptCount++
+        w.WriteHeader(http.StatusInternalServerError)
+    }))
+    t.Cleanup(ts.Close)
+
+    cfg := SDConfig{
+        HTTPClientConfig: config.DefaultHTTPClientConfig,
+        URL:              ts.URL,
+        RefreshInterval:  model.Duration(30 * time.Second),
+        MinBackoff:       model.Duration(0), // Retries disabled
+    }
+
+    reg := prometheus.NewRegistry()
+    refreshMetrics := discovery.NewRefreshMetrics(reg)
+    defer refreshMetrics.Unregister()
+    metrics := cfg.NewDiscovererMetrics(reg, refreshMetrics)
+    require.NoError(t, metrics.Register())
+    defer metrics.Unregister()
+
+    d, err := NewDiscovery(&cfg, promslog.NewNopLogger(), nil, metrics)
+    require.NoError(t, err)
+
+    ctx := context.Background()
+    _, err = d.Refresh(ctx)
+    require.Error(t, err)
+    require.Equal(t, 1, attemptCount) // Should only attempt once
+    require.Equal(t, 1.0, getFailureCount(d.metrics.failuresCount))
+}
+
+func TestUnmarshalYAML(t *testing.T) {
+    tests := []struct {
+        name     string
+        yaml     string
+        validate func(*testing.T, SDConfig, error)
+    }{
+        {
+            name: "apply defaults when min_backoff not specified",
+            yaml: `
+url: http://example.com
+refresh_interval: 60s
+`,
+            validate: func(t *testing.T, cfg SDConfig, err error) {
+                require.NoError(t, err)
+                require.Equal(t, model.Duration(0), cfg.MinBackoff)
+            },
+        },
+        {
+            name: "preserve user-specified min_backoff",
+            yaml: `
+url: http://example.com
+refresh_interval: 60s
+min_backoff: 5s
+`,
+            validate: func(t *testing.T, cfg SDConfig, err error) {
+                require.NoError(t, err)
+                require.Equal(t, model.Duration(5*time.Second), cfg.MinBackoff)
+            },
+        },
+        {
+            name: "https URL is valid",
+            yaml: `
+url: https://example.com
+refresh_interval: 60s
+`,
+            validate: func(t *testing.T, cfg SDConfig, err error) {
+                require.NoError(t, err)
+                require.Equal(t, "https://example.com", cfg.URL)
+            },
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            var cfg SDConfig
+            err := yaml.Unmarshal([]byte(tt.yaml), &cfg)
+            tt.validate(t, cfg, err)
+        })
+    }
+}
+
 func TestSourceDisappeared(t *testing.T) {
     var stubResponse string
     ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
discovery/http/metrics.go
@@ -25,6 +25,7 @@ type httpMetrics struct {
     refreshMetrics discovery.RefreshMetricsInstantiator

     failuresCount prometheus.Counter
+    retriesCount  *prometheus.CounterVec

     metricRegisterer discovery.MetricRegisterer
 }
@@ -37,10 +38,17 @@ func newDiscovererMetrics(reg prometheus.Registerer, rmi discovery.RefreshMetric
             Name: "prometheus_sd_http_failures_total",
             Help: "Number of HTTP service discovery refresh failures.",
         }),
+        retriesCount: prometheus.NewCounterVec(
+            prometheus.CounterOpts{
+                Name: "prometheus_sd_http_retries_total",
+                Help: "Number of HTTP service discovery retries by attempt number.",
+            },
+            []string{"attempt"}),
     }

     m.metricRegisterer = discovery.NewMetricRegisterer(reg, []prometheus.Collector{
         m.failuresCount,
+        m.retriesCount,
     })

     return m
go.mod
@@ -230,7 +230,7 @@ require (
     gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
     gopkg.in/inf.v0 v0.9.1 // indirect
     gopkg.in/ini.v1 v1.67.0 // indirect
-    gopkg.in/yaml.v2 v2.4.0 // indirect
+    gopkg.in/yaml.v2 v2.4.0
     gotest.tools/v3 v3.0.3 // indirect
     k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b // indirect
     k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect