From ade3f08eca384af91661f889c038707f21fd48d3 Mon Sep 17 00:00:00 2001
From: Arve Knudsen
Date: Tue, 27 Jan 2026 17:06:46 +0100
Subject: [PATCH] notifier: fix flaky TestHangingNotifier race condition
 (#17934)

* notifier: fix flaky TestHangingNotifier race condition

Make deterministic through `synctest.Test()`.

---------

Signed-off-by: Arve Knudsen
---
 notifier/manager_test.go | 241 +++++++++++++++++++--------------------
 1 file changed, 118 insertions(+), 123 deletions(-)

diff --git a/notifier/manager_test.go b/notifier/manager_test.go
index ed224462ff..ba1d578d99 100644
--- a/notifier/manager_test.go
+++ b/notifier/manager_test.go
@@ -14,6 +14,7 @@ package notifier
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
@@ -23,6 +24,7 @@ import (
 	"net/http/httptest"
 	"net/url"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
@@ -41,6 +43,7 @@ import (
 	"github.com/prometheus/prometheus/discovery/targetgroup"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/relabel"
+	"github.com/prometheus/prometheus/util/testutil/synctest"
 )
 
 func alertsEqual(a, b []*Alert) error {
@@ -698,141 +701,133 @@ func makeInputTargetGroup() *targetgroup.Group {
 // queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676.
 // and https://github.com/prometheus/prometheus/issues/8768.
 func TestHangingNotifier(t *testing.T) {
-	const (
-		batches     = 100
-		alertsCount = DefaultMaxBatchSize * batches
-	)
-
-	var (
-		sendTimeout = 100 * time.Millisecond
-		sdUpdatert  = sendTimeout / 2
-
-		done = make(chan struct{})
-	)
-
-	// Set up a faulty Alertmanager.
-	var faultyCalled atomic.Bool
-	faultyServer := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
-		faultyCalled.Store(true)
-		select {
-		case <-done:
-		case <-time.After(time.Hour):
-		}
-	}))
-	defer func() {
-		close(done)
-	}()
-
-	faultyURL, err := url.Parse(faultyServer.URL)
-	require.NoError(t, err)
-	faultyURL.Path = "/api/v2/alerts"
-
-	// Set up a functional Alertmanager.
-	var functionalCalled atomic.Bool
-	functionalServer := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
-		functionalCalled.Store(true)
-	}))
-	defer functionalServer.Close()
-	functionalURL, err := url.Parse(functionalServer.URL)
-	require.NoError(t, err)
-	functionalURL.Path = "/api/v2/alerts"
-
-	// Initialize the discovery manager
-	// This is relevant as the updates aren't sent continually in real life, but only each updatert.
-	// The old implementation of TestHangingNotifier didn't take that into account.
-	ctx, cancelSdManager := context.WithCancel(t.Context())
-	defer cancelSdManager()
-	reg := prometheus.NewRegistry()
-	sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
-	require.NoError(t, err)
-	sdManager := discovery.NewManager(
-		ctx,
-		promslog.NewNopLogger(),
-		reg,
-		sdMetrics,
-		discovery.Name("sd-manager"),
-		discovery.Updatert(sdUpdatert),
-	)
-	go sdManager.Run()
-
-	// Set up the notifier with both faulty and functional Alertmanagers.
-	notifier := NewManager(
-		&Options{
-			QueueCapacity: alertsCount,
-			Registerer:    reg,
-		},
-		model.UTF8Validation,
-		nil,
-	)
-	notifier.alertmanagers = make(map[string]*alertmanagerSet)
-	amCfg := config.DefaultAlertmanagerConfig
-	amCfg.Timeout = model.Duration(sendTimeout)
-	notifier.alertmanagers["config-0"] = newTestAlertmanagerSet(&amCfg, nil, notifier.opts, notifier.metrics, faultyURL.String(), functionalURL.String())
-
-	for _, ams := range notifier.alertmanagers {
-		ams.startSendLoops(ams.ams)
-	}
-
-	go notifier.Run(sdManager.SyncCh())
-	defer notifier.Stop()
-
-	require.Len(t, notifier.Alertmanagers(), 2)
-
-	// Enqueue the alerts.
-	var alerts []*Alert
-	for i := range make([]struct{}, alertsCount) {
-		alerts = append(alerts, &Alert{
-			Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
-		})
-	}
-	notifier.Send(alerts...)
-
-	// Wait for the Alertmanagers to start receiving alerts.
-	// 10*sdUpdatert is used as an arbitrary timeout here.
-	timeout := time.After(10 * sdUpdatert)
-loop1:
-	for {
-		select {
-		case <-timeout:
-			t.Fatalf("Timeout waiting for the alertmanagers to be reached for the first time.")
-		default:
-			if faultyCalled.Load() && functionalCalled.Load() {
-				break loop1
-			}
-		}
-	}
-
-	// Request to remove the faulty Alertmanager.
-	c := map[string]discovery.Configs{
-		"config-0": {
-			discovery.StaticConfig{
-				&targetgroup.Group{
-					Targets: []model.LabelSet{
-						{
-							model.AddressLabel: model.LabelValue(functionalURL.Host),
-						},
-					},
-				},
-			},
-		},
-	}
-	require.NoError(t, sdManager.ApplyConfig(c))
-
-	timeout = time.After(batches * sendTimeout)
-loop2:
-	for {
-		select {
-		case <-timeout:
-			t.Fatalf("Timeout, the faulty alertmanager not removed on time.")
-		default:
-			// The faulty alertmanager was dropped.
-			if len(notifier.Alertmanagers()) == 1 {
-				// The notifier should not wait until the alerts queue of the functional am is empty to apply the discovery changes.
-				require.NotZero(t, notifier.alertmanagers["config-0"].sendLoops[functionalURL.String()].queueLen())
-				break loop2
-			}
-		}
-	}
+	synctest.Test(t, func(t *testing.T) {
+		const (
+			batches     = 100
+			alertsCount = DefaultMaxBatchSize * batches
+
+			faultyURL     = "http://faulty:9093/api/v2/alerts"
+			functionalURL = "http://functional:9093/api/v2/alerts"
+		)
+
+		var (
+			sendTimeout = 100 * time.Millisecond
+			sdUpdatert  = sendTimeout / 2
+		)
+
+		// Track which alertmanagers have been called.
+		var faultyCalled, functionalCalled atomic.Bool
+
+		// Fake Do function that simulates alertmanager behavior in-process.
+		// This runs within the synctest bubble, so time.Sleep uses fake time.
+		fakeDo := func(ctx context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
+			url := req.URL.String()
+			if strings.Contains(url, "faulty") {
+				faultyCalled.Store(true)
+				// Faulty alertmanager hangs until context is canceled (by timeout).
+				<-ctx.Done()
+				return nil, ctx.Err()
+			}
+			// Functional alertmanager responds successfully.
+			// Sleep simulates network latency that real HTTP would have—without it,
+			// the queue drains instantly and the final queueLen() assertion fails.
+			functionalCalled.Store(true)
+			time.Sleep(sendTimeout / 2)
+			return &http.Response{
+				StatusCode: http.StatusOK,
+				Body:       io.NopCloser(bytes.NewBuffer(nil)),
+			}, nil
+		}
+
+		// Initialize the discovery manager
+		// This is relevant as the updates aren't sent continually in real life, but only each updatert.
+		// The old implementation of TestHangingNotifier didn't take that into account.
+		ctx, cancelSdManager := context.WithCancel(t.Context())
+		defer cancelSdManager()
+		reg := prometheus.NewRegistry()
+		sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
+		require.NoError(t, err)
+		sdManager := discovery.NewManager(
+			ctx,
+			promslog.NewNopLogger(),
+			reg,
+			sdMetrics,
+			discovery.Name("sd-manager"),
+			discovery.Updatert(sdUpdatert),
+		)
+		go sdManager.Run()
+
+		// Set up the notifier with both faulty and functional Alertmanagers.
+		notifier := NewManager(
+			&Options{
+				QueueCapacity: alertsCount,
+				Registerer:    reg,
+				Do:            fakeDo,
+			},
+			model.UTF8Validation,
+			nil,
+		)
+
+		notifier.alertmanagers = make(map[string]*alertmanagerSet)
+		amCfg := config.DefaultAlertmanagerConfig
+		amCfg.Timeout = model.Duration(sendTimeout)
+		notifier.alertmanagers["config-0"] = newTestAlertmanagerSet(&amCfg, nil, notifier.opts, notifier.metrics, faultyURL, functionalURL)
+
+		for _, ams := range notifier.alertmanagers {
+			ams.startSendLoops(ams.ams)
+		}
+
+		go notifier.Run(sdManager.SyncCh())
+		t.Cleanup(func() {
+			notifier.Stop()
+			// Advance time so in-flight request timeouts fire.
+			time.Sleep(sendTimeout * 2)
+		})
+
+		require.Len(t, notifier.Alertmanagers(), 2)
+
+		// Enqueue the alerts.
+		var alerts []*Alert
+		for i := range make([]struct{}, alertsCount) {
+			alerts = append(alerts, &Alert{
+				Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
+			})
+		}
+		notifier.Send(alerts...)
+
+		// Wait for the Alertmanagers to start receiving alerts.
+		// Use a polling loop since we need to wait for goroutines to process.
+		for !faultyCalled.Load() || !functionalCalled.Load() {
+			time.Sleep(sdUpdatert)
+			synctest.Wait()
+		}
+
+		// Request to remove the faulty Alertmanager.
+		c := map[string]discovery.Configs{
+			"config-0": {
+				discovery.StaticConfig{
+					&targetgroup.Group{
+						Targets: []model.LabelSet{
+							{
+								model.AddressLabel: "functional:9093",
+							},
+						},
+					},
+				},
+			},
+		}
+		require.NoError(t, sdManager.ApplyConfig(c))
+
+		// Wait for the discovery update to be processed.
+		// Advance time to trigger the discovery manager's update interval.
+		// The faulty alertmanager should be dropped without waiting for its queue to drain.
+		for len(notifier.Alertmanagers()) != 1 {
+			time.Sleep(sdUpdatert)
+			synctest.Wait()
+		}
+
+		// The notifier should not wait until the alerts queue of the functional am is empty to apply the discovery changes.
+		require.NotZero(t, notifier.alertmanagers["config-0"].sendLoops[functionalURL].queueLen())
+	})
 }
 
 func TestStop_DrainingDisabled(t *testing.T) {
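The new test leans on Go's synctest "bubbles": inside `synctest.Test()` all goroutines of the test share a fake clock, `time.Sleep` lets that clock advance once every goroutine in the bubble is durably blocked, and `synctest.Wait()` returns only after the bubble's background goroutines have finished their current work or blocked again. That is why the polling loops above can replace the old wall-clock timeouts without being flaky. Below is a minimal, self-contained sketch of the same pattern using the standard library's `testing/synctest` package (Go 1.25). It is illustrative only: the test name, goroutine, and durations are invented here, and the patch itself imports Prometheus's `util/testutil/synctest` package (presumably a thin wrapper over the standard library) rather than `testing/synctest` directly.

package notifier_example_test

import (
	"sync/atomic"
	"testing"
	"testing/synctest" // Go 1.25 standard library; the patch uses Prometheus's wrapper instead.
	"time"
)

// TestSynctestPattern is a hypothetical example, not part of the patch.
// It shows the two properties the rewritten TestHangingNotifier relies on:
// fake time inside the bubble and synctest.Wait for deterministic polling.
func TestSynctestPattern(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		var called atomic.Bool

		// Background goroutine standing in for a slow dependency.
		// The 50ms sleep consumes fake time, not wall-clock time.
		go func() {
			time.Sleep(50 * time.Millisecond)
			called.Store(true)
		}()

		// Deterministic polling: sleeping blocks this goroutine so the
		// bubble's fake clock can advance, and synctest.Wait blocks until
		// the background goroutine has either exited or durably blocked,
		// so the check below never races with it.
		for !called.Load() {
			time.Sleep(10 * time.Millisecond)
			synctest.Wait()
		}
	})
}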