This commit is contained in:
Will Bollock 2026-02-03 12:33:24 +01:00 committed by GitHub
commit 117ea050e6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 145 additions and 55 deletions

View file

@ -867,19 +867,20 @@ func main() {
os.Exit(1)
}
sdMetrics, err := discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
sdMetrics, refreshMetrics, err := discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer)
if err != nil {
logger.Error("failed to register service discovery metrics", "err", err)
os.Exit(1)
}
discoveryManagerScrape = discovery.NewManager(ctxScrape, logger.With("component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("scrape"), discovery.FeatureRegistry(features.DefaultRegistry))
discoveryManagerScrape = discovery.NewManager(ctxScrape, logger.With("component", "discovery manager scrape"), prometheus.DefaultRegisterer, sdMetrics, refreshMetrics, discovery.Name("scrape"), discovery.FeatureRegistry(features.DefaultRegistry))
if discoveryManagerScrape == nil {
logger.Error("failed to create a discovery manager scrape")
os.Exit(1)
}
discoveryManagerNotify = discovery.NewManager(ctxNotify, logger.With("component", "discovery manager notify"), prometheus.DefaultRegisterer, sdMetrics, discovery.Name("notify"), discovery.FeatureRegistry(features.DefaultRegistry))
discoveryManagerNotify = discovery.NewManager(ctxNotify, logger.With("component", "discovery manager notify"), prometheus.DefaultRegisterer, sdMetrics, refreshMetrics, discovery.Name("notify"), discovery.FeatureRegistry(features.DefaultRegistry))
if discoveryManagerNotify == nil {
logger.Error("failed to create a discovery manager notify")
os.Exit(1)

View file

@ -83,6 +83,7 @@ type RefreshMetricsInstantiator interface {
type RefreshMetricsManager interface {
DiscovererMetrics
RefreshMetricsInstantiator
// DeleteLabelValues removes the refresh metrics recorded for the given
// SD mechanism and config. The discovery manager passes the scrape job
// name as config when a job's provider or subscription goes away.
DeleteLabelValues(mech, config string)
}
// A Config provides the configuration and constructor for a Discoverer.

View file

@ -70,34 +70,34 @@ func (p *Provider) Config() any {
// CreateAndRegisterSDMetrics registers the metrics needed for SD mechanisms.
// Does not register the metrics for the Discovery Manager.
// It returns the per-mechanism metrics together with the refresh metrics
// manager they were registered with, so that callers can pass both to NewManager.
// On failure both returned values are nil and the error is wrapped.
// TODO(ptodev): Add ability to unregister the metrics?
func CreateAndRegisterSDMetrics(reg prometheus.Registerer) (map[string]DiscovererMetrics, error) {
func CreateAndRegisterSDMetrics(reg prometheus.Registerer) (map[string]DiscovererMetrics, RefreshMetricsManager, error) {
// Some SD mechanisms use the "refresh" package, which has its own metrics.
refreshSdMetrics := NewRefreshMetrics(reg)
// Register the metrics specific for each SD mechanism, and the ones for the refresh package.
sdMetrics, err := RegisterSDMetrics(reg, refreshSdMetrics)
if err != nil {
return nil, fmt.Errorf("failed to register service discovery metrics: %w", err)
return nil, nil, fmt.Errorf("failed to register service discovery metrics: %w", err)
}
return sdMetrics, nil
return sdMetrics, refreshSdMetrics, nil
}
// NewManager is the Discovery Manager constructor.
func NewManager(ctx context.Context, logger *slog.Logger, registerer prometheus.Registerer, sdMetrics map[string]DiscovererMetrics, options ...func(*Manager)) *Manager {
func NewManager(ctx context.Context, logger *slog.Logger, registerer prometheus.Registerer, sdMetrics map[string]DiscovererMetrics, refreshMetrics RefreshMetricsManager, options ...func(*Manager)) *Manager {
if logger == nil {
logger = promslog.NewNopLogger()
}
mgr := &Manager{
logger: logger,
syncCh: make(chan map[string][]*targetgroup.Group),
targets: make(map[poolKey]map[string]*targetgroup.Group),
ctx: ctx,
updatert: 5 * time.Second,
triggerSend: make(chan struct{}, 1),
registerer: registerer,
sdMetrics: sdMetrics,
logger: logger,
syncCh: make(chan map[string][]*targetgroup.Group),
targets: make(map[poolKey]map[string]*targetgroup.Group),
ctx: ctx,
updatert: 5 * time.Second,
triggerSend: make(chan struct{}, 1),
registerer: registerer,
sdMetrics: sdMetrics,
refreshMetrics: refreshMetrics,
}
for _, option := range options {
option(mgr)
@ -190,8 +190,9 @@ type Manager struct {
// A registerer for all service discovery metrics.
registerer prometheus.Registerer
metrics *Metrics
sdMetrics map[string]DiscovererMetrics
metrics *Metrics
sdMetrics map[string]DiscovererMetrics
refreshMetrics RefreshMetricsManager
// featureRegistry is used to track which service discovery providers are configured.
featureRegistry features.Collector
@ -251,6 +252,21 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
prov.cancel()
prov.mu.RUnlock()
// Clean up the refresh metrics associated with this cancelled provider (each sub is a scrape job name).
m.targetsMtx.Lock()
for s := range prov.subs {
// Also clean up discovered targets metric. targetsMtx lock needed for safe access to m.targets.
delete(m.targets, poolKey{s, prov.name})
m.metrics.DiscoveredTargets.DeleteLabelValues(s)
if m.refreshMetrics != nil {
if cfg, ok := prov.config.(Config); ok {
m.refreshMetrics.DeleteLabelValues(cfg.Name(), s)
}
}
}
m.targetsMtx.Unlock()
continue
}
prov.mu.RUnlock()
@ -266,7 +282,15 @@ func (m *Manager) ApplyConfig(cfg map[string]Configs) error {
// Remove obsolete subs' targets.
if _, ok := prov.newSubs[s]; !ok {
delete(m.targets, poolKey{s, prov.name})
m.metrics.DiscoveredTargets.DeleteLabelValues(m.name, s)
m.metrics.DiscoveredTargets.DeleteLabelValues(s)
// Also clean up refresh metrics for subs that are being removed from a provider that is still running.
if m.refreshMetrics != nil {
cfg, ok := prov.config.(Config)
if ok {
m.refreshMetrics.DeleteLabelValues(cfg.Name(), s)
}
}
}
}
// Set metrics and targets for new subs.

View file

@ -673,9 +673,9 @@ func TestTargetUpdatesOrder(t *testing.T) {
defer cancel()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
@ -786,9 +786,9 @@ func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
@ -822,9 +822,9 @@ func TestTargetSetTargetGroupsPresentOnConfigRename(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
@ -861,9 +861,9 @@ func TestTargetSetTargetGroupsPresentOnConfigDuplicateAndDeleteOriginal(t *testi
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
@ -903,9 +903,9 @@ func TestTargetSetTargetGroupsPresentOnConfigChange(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
@ -970,9 +970,9 @@ func TestTargetSetRecreatesTargetGroupsOnConfigChange(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
@ -1013,9 +1013,9 @@ func TestDiscovererConfigs(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
@ -1049,9 +1049,9 @@ func TestTargetSetRecreatesEmptyStaticConfigs(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
@ -1090,9 +1090,9 @@ func TestIdenticalConfigurationsAreCoalesced(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, nil, reg, sdMetrics)
discoveryManager := NewManager(ctx, nil, reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
@ -1128,9 +1128,9 @@ func TestApplyConfigDoesNotModifyStaticTargets(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
@ -1188,9 +1188,9 @@ func TestGaugeFailedConfigs(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
@ -1344,9 +1344,9 @@ func TestCoordinationWithReceiver(t *testing.T) {
defer cancel()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
mgr := NewManager(ctx, nil, reg, sdMetrics)
mgr := NewManager(ctx, nil, reg, sdMetrics, refreshMetrics)
require.NotNil(t, mgr)
mgr.updatert = updateDelay
go mgr.Run()
@ -1438,9 +1438,9 @@ func TestTargetSetTargetGroupsUpdateDuringApplyConfig(t *testing.T) {
ctx := t.Context()
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond
go discoveryManager.Run()
@ -1537,7 +1537,7 @@ func TestUnregisterMetrics(t *testing.T) {
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
// discoveryManager will be nil if there was an error configuring metrics.
require.NotNil(t, discoveryManager)
// Unregister all metrics.
@ -1550,14 +1550,64 @@ func TestUnregisterMetrics(t *testing.T) {
}
}
// TestMetricsCleanupAfterConfigReload verifies that the refresh and
// discovered-targets metrics of a scrape job are deleted once a config reload
// removes that job, while the metrics of the remaining job stay in place.
func TestMetricsCleanupAfterConfigReload(t *testing.T) {
	ctx := t.Context()

	reg := prometheus.NewRegistry()
	refreshMetrics, sdMetrics := NewTestMetrics(t, reg)

	discoveryManager := NewManager(ctx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
	require.NotNil(t, discoveryManager)
	discoveryManager.updatert = 100 * time.Millisecond
	go discoveryManager.Run()

	c := map[string]Configs{
		"prometheus": {
			staticConfig("foo:9090", "bar:9090"),
		},
		"other": {
			staticConfig("baz:9090"),
		},
	}
	// Check the error so a rejected config fails the test here instead of
	// surfacing as a hang on the SyncCh receive below.
	require.NoError(t, discoveryManager.ApplyConfig(c))
	<-discoveryManager.SyncCh()

	// Manually instantiate refresh metrics for both jobs to make them visible
	// in the registry.
	refreshMetrics.Instantiate("static", "prometheus").Failures.Add(0)
	refreshMetrics.Instantiate("static", "other").Failures.Add(0)

	// Before the reload there is one series per job for each metric.
	count, err := client_testutil.GatherAndCount(reg, "prometheus_sd_discovered_targets")
	require.NoError(t, err)
	require.Equal(t, 2, count)

	count, err = client_testutil.GatherAndCount(reg, "prometheus_sd_refresh_failures_total")
	require.NoError(t, err)
	require.Equal(t, 2, count)

	// Simulate a config refresh that drops the "prometheus" job.
	delete(c, "prometheus")
	require.NoError(t, discoveryManager.ApplyConfig(c))
	<-discoveryManager.SyncCh()

	// Ensure we still have metrics for the remaining provider, and only for it.
	count, err = client_testutil.GatherAndCount(reg, "prometheus_sd_discovered_targets")
	require.NoError(t, err)
	require.Equal(t, 1, count)

	count, err = client_testutil.GatherAndCount(reg, "prometheus_sd_refresh_failures_total")
	require.NoError(t, err)
	require.Equal(t, 1, count)
}
// Calling ApplyConfig() that removes providers at the same time as shutting down
// the manager should not hang.
func TestConfigReloadAndShutdownRace(t *testing.T) {
reg := prometheus.NewRegistry()
_, sdMetrics := NewTestMetrics(t, reg)
refreshMetrics, sdMetrics := NewTestMetrics(t, reg)
mgrCtx, mgrCancel := context.WithCancel(context.Background())
discoveryManager := NewManager(mgrCtx, promslog.NewNopLogger(), reg, sdMetrics)
discoveryManager := NewManager(mgrCtx, promslog.NewNopLogger(), reg, sdMetrics, refreshMetrics)
require.NotNil(t, discoveryManager)
discoveryManager.updatert = 100 * time.Millisecond

View file

@ -88,3 +88,12 @@ func (m *RefreshMetricsVecs) Register() error {
func (m *RefreshMetricsVecs) Unregister() {
// Unregistration is delegated to the metricRegisterer held by this struct.
m.metricRegisterer.UnregisterMetrics()
}
// DeleteLabelValues deletes the refresh metrics for a specific mechanism and config,
// i.e. the matching entries in both failuresVec and durationVec.
// Call it when a scrape job is removed so that its refresh metrics are removed too.
func (m *RefreshMetricsVecs) DeleteLabelValues(mech, config string) {
// DeleteLabelValues is used over UnregisterMetrics to only delete metrics for a specific
// mechanism and config combination.
m.failuresVec.DeleteLabelValues(mech, config)
m.durationVec.DeleteLabelValues(mech, config)
}

View file

@ -41,7 +41,8 @@ Prometheus caches target lists. If an error occurs while fetching an updated
targets list, Prometheus keeps using the current targets list. The targets list
is not saved across restart. The `prometheus_sd_refresh_failures_total` counter
metric tracks the number of refresh failures and the `prometheus_sd_refresh_duration_seconds`
bucket can be used to track HTTP SD refresh attempts or performance.
bucket can be used to track HTTP SD refresh attempts or performance. These metrics are
removed when the corresponding scrape job is removed by a Prometheus configuration reload.
The whole list of targets must be returned on every scrape. There is no support
for incremental updates. A Prometheus instance does not send its hostname and it

View file

@ -281,7 +281,7 @@ func main() {
os.Exit(1)
}
sdAdapter := adapter.NewAdapter(ctx, *outputFile, "exampleSD", disc, logger, metrics, reg)
sdAdapter := adapter.NewAdapter(ctx, *outputFile, "exampleSD", disc, logger, metrics, refreshMetrics, reg)
sdAdapter.Run()
<-ctx.Done()

View file

@ -162,12 +162,12 @@ func (a *Adapter) Run() {
}
// NewAdapter creates a new instance of Adapter.
func NewAdapter(ctx context.Context, file, name string, d discovery.Discoverer, logger *slog.Logger, sdMetrics map[string]discovery.DiscovererMetrics, registerer prometheus.Registerer) *Adapter {
func NewAdapter(ctx context.Context, file, name string, d discovery.Discoverer, logger *slog.Logger, sdMetrics map[string]discovery.DiscovererMetrics, refreshMetrics discovery.RefreshMetricsManager, registerer prometheus.Registerer) *Adapter {
return &Adapter{
ctx: ctx,
disc: d,
groups: make(map[string]*customSD),
manager: discovery.NewManager(ctx, logger, registerer, sdMetrics),
manager: discovery.NewManager(ctx, logger, registerer, sdMetrics, refreshMetrics),
output: file,
name: name,
logger: logger,

View file

@ -235,6 +235,6 @@ func TestWriteOutput(t *testing.T) {
sdMetrics, err := discovery.RegisterSDMetrics(reg, refreshMetrics)
require.NoError(t, err)
adapter := NewAdapter(ctx, tmpfile.Name(), "test_sd", nil, nil, sdMetrics, reg)
adapter := NewAdapter(ctx, tmpfile.Name(), "test_sd", nil, nil, sdMetrics, refreshMetrics, reg)
require.NoError(t, adapter.writeOutput())
}

View file

@ -745,13 +745,15 @@ func TestHangingNotifier(t *testing.T) {
ctx, cancelSdManager := context.WithCancel(t.Context())
defer cancelSdManager()
reg := prometheus.NewRegistry()
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
refreshMetrics := discovery.NewRefreshMetrics(reg)
sdMetrics, err := discovery.RegisterSDMetrics(reg, refreshMetrics)
require.NoError(t, err)
sdManager := discovery.NewManager(
ctx,
promslog.NewNopLogger(),
reg,
sdMetrics,
refreshMetrics,
discovery.Name("sd-manager"),
discovery.Updatert(sdUpdatert),
)

View file

@ -1169,13 +1169,15 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A
opts.DiscoveryReloadInterval = model.Duration(100 * time.Millisecond)
reg := prometheus.NewRegistry()
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
refreshMetrics := discovery.NewRefreshMetrics(reg)
sdMetrics, err := discovery.RegisterSDMetrics(reg, refreshMetrics)
require.NoError(t, err)
discoveryManager := discovery.NewManager(
ctx,
promslog.NewNopLogger(),
reg,
sdMetrics,
refreshMetrics,
discovery.Updatert(100*time.Millisecond),
)
scrapeManager, err := NewManager(