diff --git a/notifier/manager_test.go b/notifier/manager_test.go index ba1d578d99..d7108c1628 100644 --- a/notifier/manager_test.go +++ b/notifier/manager_test.go @@ -831,171 +831,153 @@ func TestHangingNotifier(t *testing.T) { } func TestStop_DrainingDisabled(t *testing.T) { - releaseReceiver := make(chan struct{}) - receiverReceivedRequest := make(chan struct{}, 2) - alertsReceived := atomic.NewInt64(0) + synctest.Test(t, func(t *testing.T) { + const alertmanagerURL = "http://alertmanager:9093/api/v2/alerts" - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Let the test know we've received a request. - receiverReceivedRequest <- struct{}{} + handlerStarted := make(chan struct{}) + alertsReceived := atomic.NewInt64(0) - var alerts []*Alert + // Fake Do function that simulates a hanging alertmanager that times out. + fakeDo := func(ctx context.Context, _ *http.Client, req *http.Request) (*http.Response, error) { + var alerts []*Alert + b, err := io.ReadAll(req.Body) + if err != nil { + return nil, fmt.Errorf("read request body: %w", err) + } + if err := json.Unmarshal(b, &alerts); err != nil { + return nil, fmt.Errorf("unmarshal request body: %w", err) + } + alertsReceived.Add(int64(len(alerts))) - b, err := io.ReadAll(r.Body) - require.NoError(t, err) + // Signal arrival, then block until context times out. + handlerStarted <- struct{}{} + <-ctx.Done() - err = json.Unmarshal(b, &alerts) - require.NoError(t, err) + return nil, ctx.Err() + } - alertsReceived.Add(int64(len(alerts))) + reg := prometheus.NewRegistry() + m := NewManager( + &Options{ + QueueCapacity: 10, + DrainOnShutdown: false, + Registerer: reg, + Do: fakeDo, + }, + model.UTF8Validation, + nil, + ) - // Wait for the test to release us. - <-releaseReceiver + m.alertmanagers = make(map[string]*alertmanagerSet) - w.WriteHeader(http.StatusOK) - })) - defer func() { - server.Close() - }() + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, alertmanagerURL) - reg := prometheus.NewRegistry() - m := NewManager( - &Options{ - QueueCapacity: 10, - DrainOnShutdown: false, - Registerer: reg, - }, - model.UTF8Validation, - nil, - ) + for _, ams := range m.alertmanagers { + ams.startSendLoops(ams.ams) + } - m.alertmanagers = make(map[string]*alertmanagerSet) + // This will be waited on automatically when synctest.Test exits. + go m.Run(nil) - am1Cfg := config.DefaultAlertmanagerConfig - am1Cfg.Timeout = model.Duration(time.Second) - m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, server.URL) + // Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later. + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")}) - for _, ams := range m.alertmanagers { - ams.startSendLoops(ams.ams) - } + // Wait for receiver to get the request. + <-handlerStarted - notificationManagerStopped := make(chan struct{}) + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")}) - go func() { - defer close(notificationManagerStopped) - m.Run(nil) - }() + // Stop the notification manager, then advance time to trigger the request timeout. + m.Stop() + time.Sleep(time.Second) - // Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later. - m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")}) + // Allow goroutines to finish. + synctest.Wait() - select { - case <-receiverReceivedRequest: - // Nothing more to do. - case <-time.After(time.Second): - require.FailNow(t, "gave up waiting for receiver to receive notification of first alert") - } - - m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")}) - - // Stop the notification manager, pause to allow the shutdown to be observed, and then allow the receiver to proceed. - m.Stop() - time.Sleep(time.Second) - close(releaseReceiver) - - // Wait for the notification manager to stop and confirm only the first notification was sent. - // The second notification should be dropped. - select { - case <-notificationManagerStopped: - // Nothing more to do. - case <-time.After(time.Second): - require.FailNow(t, "gave up waiting for notification manager to stop") - } - - require.Equal(t, int64(1), alertsReceived.Load()) + // Confirm only the first notification was sent. The second notification should be dropped. + require.Equal(t, int64(1), alertsReceived.Load()) + }) } func TestStop_DrainingEnabled(t *testing.T) { - releaseReceiver := make(chan struct{}) - receiverReceivedRequest := make(chan struct{}, 2) - alertsReceived := atomic.NewInt64(0) + synctest.Test(t, func(t *testing.T) { + const alertmanagerURL = "http://alertmanager:9093/api/v2/alerts" - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - var alerts []*Alert + handlerStarted := make(chan struct{}, 1) + alertsReceived := atomic.NewInt64(0) - // Let the test know we've received a request. - receiverReceivedRequest <- struct{}{} + // Fake Do function that simulates alertmanager responding slowly but successfully. + fakeDo := func(_ context.Context, _ *http.Client, req *http.Request) (*http.Response, error) { + var alerts []*Alert + b, err := io.ReadAll(req.Body) + if err != nil { + return nil, fmt.Errorf("read request body: %w", err) + } + if err := json.Unmarshal(b, &alerts); err != nil { + return nil, fmt.Errorf("unmarshal request body: %w", err) + } + alertsReceived.Add(int64(len(alerts))) - b, err := io.ReadAll(r.Body) - require.NoError(t, err) + // Signal arrival. + handlerStarted <- struct{}{} - err = json.Unmarshal(b, &alerts) - require.NoError(t, err) + // Block to allow for alert-2 to be queued while this request is in-flight. + time.Sleep(100 * time.Millisecond) - alertsReceived.Add(int64(len(alerts))) + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBuffer(nil)), + }, nil + } - // Wait for the test to release us. - <-releaseReceiver + reg := prometheus.NewRegistry() + m := NewManager( + &Options{ + QueueCapacity: 10, + DrainOnShutdown: true, + Registerer: reg, + Do: fakeDo, + }, + model.UTF8Validation, + nil, + ) - w.WriteHeader(http.StatusOK) - })) - defer func() { - server.Close() - }() + m.alertmanagers = make(map[string]*alertmanagerSet) - reg := prometheus.NewRegistry() - m := NewManager( - &Options{ - QueueCapacity: 10, - DrainOnShutdown: true, - Registerer: reg, - }, - model.UTF8Validation, - nil, - ) + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, alertmanagerURL) - m.alertmanagers = make(map[string]*alertmanagerSet) + for _, ams := range m.alertmanagers { + ams.startSendLoops(ams.ams) + } - am1Cfg := config.DefaultAlertmanagerConfig - am1Cfg.Timeout = model.Duration(time.Second) - m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, server.URL) + go m.Run(nil) - for _, ams := range m.alertmanagers { - ams.startSendLoops(ams.ams) - } + // Queue two alerts. The first should be immediately sent to the receiver. + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")}) - notificationManagerStopped := make(chan struct{}) + // Wait for receiver to get the first request. + <-handlerStarted - go func() { - defer close(notificationManagerStopped) - m.Run(nil) - }() + // Send second alert while first is still being processed (fakeDo has 100ms delay). + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")}) - // Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later. - m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")}) + // Stop the notification manager. With DrainOnShutdown=true, this should wait + // for the queue to drain, ensuring both alerts are sent. + m.Stop() - select { - case <-receiverReceivedRequest: - // Nothing more to do. - case <-time.After(time.Second): - require.FailNow(t, "gave up waiting for receiver to receive notification of first alert") - } + // Advance time so in-flight requests complete. + time.Sleep(time.Second) - m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")}) + // Allow goroutines to finish. + synctest.Wait() - // Stop the notification manager and allow the receiver to proceed. - m.Stop() - close(releaseReceiver) - - // Wait for the notification manager to stop and confirm both notifications were sent. - select { - case <-notificationManagerStopped: - // Nothing more to do. - case <-time.After(200 * time.Millisecond): - require.FailNow(t, "gave up waiting for notification manager to stop") - } - - require.Equal(t, int64(2), alertsReceived.Load()) + // Confirm both notifications were sent. + require.Equal(t, int64(2), alertsReceived.Load()) + }) } // TestQueuesDrainingOnApplyConfig ensures that when an alertmanagerSet disappears after an ApplyConfig(), its