Mirror of https://github.com/prometheus/prometheus.git
notifier: fix flaky TestStop_DrainingEnabled and TestStop_DrainingDisabled race conditions (#17938)
Fix flaky TestStop_DrainingEnabled and TestStop_DrainingDisabled tests. The tests used real HTTP servers and real time, making them susceptible to race conditions and timing-dependent failures. The solution is to convert both tests to use synctest for deterministic fake time.

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
parent
00a7faa2e3
commit
020a0b30a0
1 changed file with 112 additions and 130 deletions
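For background on the fix: testing/synctest (stable as of Go 1.25) runs the test body in an isolated "bubble" with a fake clock. time.Sleep and timer-based timeouts advance the fake clock as soon as every goroutine in the bubble is blocked, so a one-second timeout elapses instantly and deterministically instead of depending on the wall clock. A minimal sketch of the pattern, separate from the diff below (the test name and channel are illustrative, not from this commit):

package notifier

import (
	"testing"
	"testing/synctest"
	"time"
)

func TestFakeTimeSketch(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		done := make(chan struct{})
		go func() {
			// Fake time: this does not block the test for a real second.
			time.Sleep(time.Second)
			close(done)
		}()
		// Advances the bubble's fake clock once all goroutines are blocked.
		time.Sleep(time.Second)
		// Block until every other goroutine in the bubble is durably
		// blocked or has exited.
		synctest.Wait()
		<-done
	})
}

synctest.Test also waits for every goroutine started inside the bubble to exit before returning, which is why the converted tests below can launch the manager with a bare go m.Run(nil).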
@@ -831,171 +831,153 @@ func TestHangingNotifier(t *testing.T) {
 }
 
 func TestStop_DrainingDisabled(t *testing.T) {
-	releaseReceiver := make(chan struct{})
-	receiverReceivedRequest := make(chan struct{}, 2)
-	alertsReceived := atomic.NewInt64(0)
+	synctest.Test(t, func(t *testing.T) {
+		const alertmanagerURL = "http://alertmanager:9093/api/v2/alerts"
 
-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		// Let the test know we've received a request.
-		receiverReceivedRequest <- struct{}{}
+		handlerStarted := make(chan struct{})
+		alertsReceived := atomic.NewInt64(0)
 
-		var alerts []*Alert
+		// Fake Do function that simulates a hanging alertmanager that times out.
+		fakeDo := func(ctx context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
+			var alerts []*Alert
+			b, err := io.ReadAll(req.Body)
+			if err != nil {
+				return nil, fmt.Errorf("read request body: %w", err)
+			}
+			if err := json.Unmarshal(b, &alerts); err != nil {
+				return nil, fmt.Errorf("unmarshal request body: %w", err)
+			}
+			alertsReceived.Add(int64(len(alerts)))
 
-		b, err := io.ReadAll(r.Body)
-		require.NoError(t, err)
+			// Signal arrival, then block until context times out.
+			handlerStarted <- struct{}{}
+			<-ctx.Done()
 
-		err = json.Unmarshal(b, &alerts)
-		require.NoError(t, err)
+			return nil, ctx.Err()
+		}
 
-		alertsReceived.Add(int64(len(alerts)))
+		reg := prometheus.NewRegistry()
+		m := NewManager(
+			&Options{
+				QueueCapacity:   10,
+				DrainOnShutdown: false,
+				Registerer:      reg,
+				Do:              fakeDo,
+			},
+			model.UTF8Validation,
+			nil,
+		)
 
-		// Wait for the test to release us.
-		<-releaseReceiver
+		m.alertmanagers = make(map[string]*alertmanagerSet)
 
-		w.WriteHeader(http.StatusOK)
-	}))
-	defer func() {
-		server.Close()
-	}()
+		am1Cfg := config.DefaultAlertmanagerConfig
+		am1Cfg.Timeout = model.Duration(time.Second)
+		m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, alertmanagerURL)
 
-	reg := prometheus.NewRegistry()
-	m := NewManager(
-		&Options{
-			QueueCapacity:   10,
-			DrainOnShutdown: false,
-			Registerer:      reg,
-		},
-		model.UTF8Validation,
-		nil,
-	)
+		for _, ams := range m.alertmanagers {
+			ams.startSendLoops(ams.ams)
+		}
 
-	m.alertmanagers = make(map[string]*alertmanagerSet)
+		// This will be waited on automatically when synctest.Test exits.
+		go m.Run(nil)
 
-	am1Cfg := config.DefaultAlertmanagerConfig
-	am1Cfg.Timeout = model.Duration(time.Second)
-	m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, server.URL)
+		// Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later.
+		m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})
 
-	for _, ams := range m.alertmanagers {
-		ams.startSendLoops(ams.ams)
-	}
+		// Wait for receiver to get the request.
+		<-handlerStarted
 
-	notificationManagerStopped := make(chan struct{})
+		m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
 
-	go func() {
-		defer close(notificationManagerStopped)
-		m.Run(nil)
-	}()
+		// Stop the notification manager, then advance time to trigger the request timeout.
+		m.Stop()
+		time.Sleep(time.Second)
 
-	// Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later.
-	m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})
+		// Allow goroutines to finish.
+		synctest.Wait()
 
-	select {
-	case <-receiverReceivedRequest:
-		// Nothing more to do.
-	case <-time.After(time.Second):
-		require.FailNow(t, "gave up waiting for receiver to receive notification of first alert")
-	}
-
-	m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
-
-	// Stop the notification manager, pause to allow the shutdown to be observed, and then allow the receiver to proceed.
-	m.Stop()
-	time.Sleep(time.Second)
-	close(releaseReceiver)
-
-	// Wait for the notification manager to stop and confirm only the first notification was sent.
-	// The second notification should be dropped.
-	select {
-	case <-notificationManagerStopped:
-		// Nothing more to do.
-	case <-time.After(time.Second):
-		require.FailNow(t, "gave up waiting for notification manager to stop")
-	}
-
-	require.Equal(t, int64(1), alertsReceived.Load())
+		// Confirm only the first notification was sent. The second notification should be dropped.
+		require.Equal(t, int64(1), alertsReceived.Load())
+	})
 }
 
 func TestStop_DrainingEnabled(t *testing.T) {
-	releaseReceiver := make(chan struct{})
-	receiverReceivedRequest := make(chan struct{}, 2)
-	alertsReceived := atomic.NewInt64(0)
+	synctest.Test(t, func(t *testing.T) {
+		const alertmanagerURL = "http://alertmanager:9093/api/v2/alerts"
 
-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		var alerts []*Alert
+		handlerStarted := make(chan struct{}, 1)
+		alertsReceived := atomic.NewInt64(0)
 
-		// Let the test know we've received a request.
-		receiverReceivedRequest <- struct{}{}
+		// Fake Do function that simulates alertmanager responding slowly but successfully.
+		fakeDo := func(_ context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
+			var alerts []*Alert
+			b, err := io.ReadAll(req.Body)
+			if err != nil {
+				return nil, fmt.Errorf("read request body: %w", err)
+			}
+			if err := json.Unmarshal(b, &alerts); err != nil {
+				return nil, fmt.Errorf("unmarshal request body: %w", err)
+			}
+			alertsReceived.Add(int64(len(alerts)))
 
-		b, err := io.ReadAll(r.Body)
-		require.NoError(t, err)
+			// Signal arrival.
+			handlerStarted <- struct{}{}
 
-		err = json.Unmarshal(b, &alerts)
-		require.NoError(t, err)
+			// Block to allow for alert-2 to be queued while this request is in-flight.
+			time.Sleep(100 * time.Millisecond)
 
-		alertsReceived.Add(int64(len(alerts)))
+			return &http.Response{
+				StatusCode: http.StatusOK,
+				Body:       io.NopCloser(bytes.NewBuffer(nil)),
+			}, nil
+		}
 
-		// Wait for the test to release us.
-		<-releaseReceiver
+		reg := prometheus.NewRegistry()
+		m := NewManager(
+			&Options{
+				QueueCapacity:   10,
+				DrainOnShutdown: true,
+				Registerer:      reg,
+				Do:              fakeDo,
+			},
+			model.UTF8Validation,
+			nil,
+		)
 
-		w.WriteHeader(http.StatusOK)
-	}))
-	defer func() {
-		server.Close()
-	}()
+		m.alertmanagers = make(map[string]*alertmanagerSet)
 
-	reg := prometheus.NewRegistry()
-	m := NewManager(
-		&Options{
-			QueueCapacity:   10,
-			DrainOnShutdown: true,
-			Registerer:      reg,
-		},
-		model.UTF8Validation,
-		nil,
-	)
+		am1Cfg := config.DefaultAlertmanagerConfig
+		am1Cfg.Timeout = model.Duration(time.Second)
+		m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, alertmanagerURL)
 
-	m.alertmanagers = make(map[string]*alertmanagerSet)
+		for _, ams := range m.alertmanagers {
+			ams.startSendLoops(ams.ams)
+		}
 
-	am1Cfg := config.DefaultAlertmanagerConfig
-	am1Cfg.Timeout = model.Duration(time.Second)
-	m.alertmanagers["1"] = newTestAlertmanagerSet(&am1Cfg, nil, m.opts, m.metrics, server.URL)
+		go m.Run(nil)
 
-	for _, ams := range m.alertmanagers {
-		ams.startSendLoops(ams.ams)
-	}
+		// Queue two alerts. The first should be immediately sent to the receiver.
+		m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})
 
-	notificationManagerStopped := make(chan struct{})
+		// Wait for receiver to get the first request.
+		<-handlerStarted
 
-	go func() {
-		defer close(notificationManagerStopped)
-		m.Run(nil)
-	}()
+		// Send second alert while first is still being processed (fakeDo has 100ms delay).
+		m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
 
-	// Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later.
-	m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})
+		// Stop the notification manager. With DrainOnShutdown=true, this should wait
+		// for the queue to drain, ensuring both alerts are sent.
+		m.Stop()
 
-	select {
-	case <-receiverReceivedRequest:
-		// Nothing more to do.
-	case <-time.After(time.Second):
-		require.FailNow(t, "gave up waiting for receiver to receive notification of first alert")
-	}
+		// Advance time so in-flight requests complete.
+		time.Sleep(time.Second)
 
-	m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
+		// Allow goroutines to finish.
+		synctest.Wait()
 
-	// Stop the notification manager and allow the receiver to proceed.
-	m.Stop()
-	close(releaseReceiver)
-
-	// Wait for the notification manager to stop and confirm both notifications were sent.
-	select {
-	case <-notificationManagerStopped:
-		// Nothing more to do.
-	case <-time.After(200 * time.Millisecond):
-		require.FailNow(t, "gave up waiting for notification manager to stop")
-	}
-
-	require.Equal(t, int64(2), alertsReceived.Load())
+		// Confirm both notifications were sent.
+		require.Equal(t, int64(2), alertsReceived.Load())
+	})
 }
 
 // TestQueuesDrainingOnApplyConfig ensures that when an alertmanagerSet disappears after an ApplyConfig(), its
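The diff's other key change is replacing the real httptest server with a request executor injected through the notifier Options (visible as Do: fakeDo above). A generic sketch of that injection pattern, with hypothetical names (doFunc, newFakeDo) that are not part of the Prometheus code:

package notifier

import (
	"bytes"
	"context"
	"io"
	"net/http"
)

// doFunc mirrors the shape of the Options.Do hook shown in the diff: an
// http.Client-style request executor that tests can swap for a fake.
type doFunc func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)

// newFakeDo returns a doFunc that records each request body and answers
// 200 OK without opening a socket (hypothetical helper, for illustration).
func newFakeDo(recorded *[][]byte) doFunc {
	return func(_ context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
		b, err := io.ReadAll(req.Body)
		if err != nil {
			return nil, err
		}
		*recorded = append(*recorded, b)
		return &http.Response{
			StatusCode: http.StatusOK,
			Body:       io.NopCloser(bytes.NewReader(nil)),
		}, nil
	}
}

Because such a fake never touches the network, it composes cleanly with synctest's bubble: a time.Sleep inside the fake (as in TestStop_DrainingEnabled) consumes only fake time, and <-ctx.Done() (as in TestStop_DrainingDisabled) fires as soon as the fake clock passes the request timeout.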