mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
notifier: fix flaky TestHangingNotifier race condition (#17934)
* notifier: fix flaky TestHangingNotifier race condition Make deterministic through `synctest.Test()`. --------- Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
parent
304dcdf695
commit
ade3f08eca
1 changed file with 118 additions and 123 deletions
|
|
@@ -14,6 +14,7 @@
|
|||
package notifier
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
|
@@ -23,6 +24,7 @@ import (
|
|||
"net/http/httptest"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -41,6 +43,7 @@ import (
|
|||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/model/relabel"
|
||||
"github.com/prometheus/prometheus/util/testutil/synctest"
|
||||
)
|
||||
|
||||
func alertsEqual(a, b []*Alert) error {
|
||||
|
|
@@ -698,141 +701,133 @@ func makeInputTargetGroup() *targetgroup.Group {
|
|||
// queued alerts. This test reproduces the issue described in https://github.com/prometheus/prometheus/issues/13676
|
||||
// and https://github.com/prometheus/prometheus/issues/8768.
|
||||
func TestHangingNotifier(t *testing.T) {
|
||||
const (
|
||||
batches = 100
|
||||
alertsCount = DefaultMaxBatchSize * batches
|
||||
)
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
const (
|
||||
batches = 100
|
||||
alertsCount = DefaultMaxBatchSize * batches
|
||||
|
||||
var (
|
||||
sendTimeout = 100 * time.Millisecond
|
||||
sdUpdatert = sendTimeout / 2
|
||||
faultyURL = "http://faulty:9093/api/v2/alerts"
|
||||
functionalURL = "http://functional:9093/api/v2/alerts"
|
||||
)
|
||||
|
||||
done = make(chan struct{})
|
||||
)
|
||||
var (
|
||||
sendTimeout = 100 * time.Millisecond
|
||||
sdUpdatert = sendTimeout / 2
|
||||
)
|
||||
|
||||
// Set up a faulty Alertmanager.
|
||||
var faultyCalled atomic.Bool
|
||||
faultyServer := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
|
||||
faultyCalled.Store(true)
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Hour):
|
||||
}
|
||||
}))
|
||||
defer func() {
|
||||
close(done)
|
||||
}()
|
||||
// Track which alertmanagers have been called.
|
||||
var faultyCalled, functionalCalled atomic.Bool
|
||||
|
||||
faultyURL, err := url.Parse(faultyServer.URL)
|
||||
require.NoError(t, err)
|
||||
faultyURL.Path = "/api/v2/alerts"
|
||||
|
||||
// Set up a functional Alertmanager.
|
||||
var functionalCalled atomic.Bool
|
||||
functionalServer := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
|
||||
functionalCalled.Store(true)
|
||||
}))
|
||||
defer functionalServer.Close()
|
||||
functionalURL, err := url.Parse(functionalServer.URL)
|
||||
require.NoError(t, err)
|
||||
functionalURL.Path = "/api/v2/alerts"
|
||||
|
||||
// Initialize the discovery manager
|
||||
// This is relevant as the updates aren't sent continually in real life, but only each updatert.
|
||||
// The old implementation of TestHangingNotifier didn't take that into account.
|
||||
ctx, cancelSdManager := context.WithCancel(t.Context())
|
||||
defer cancelSdManager()
|
||||
reg := prometheus.NewRegistry()
|
||||
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
|
||||
require.NoError(t, err)
|
||||
sdManager := discovery.NewManager(
|
||||
ctx,
|
||||
promslog.NewNopLogger(),
|
||||
reg,
|
||||
sdMetrics,
|
||||
discovery.Name("sd-manager"),
|
||||
discovery.Updatert(sdUpdatert),
|
||||
)
|
||||
go sdManager.Run()
|
||||
|
||||
// Set up the notifier with both faulty and functional Alertmanagers.
|
||||
notifier := NewManager(
|
||||
&Options{
|
||||
QueueCapacity: alertsCount,
|
||||
Registerer: reg,
|
||||
},
|
||||
model.UTF8Validation,
|
||||
nil,
|
||||
)
|
||||
notifier.alertmanagers = make(map[string]*alertmanagerSet)
|
||||
amCfg := config.DefaultAlertmanagerConfig
|
||||
amCfg.Timeout = model.Duration(sendTimeout)
|
||||
notifier.alertmanagers["config-0"] = newTestAlertmanagerSet(&amCfg, nil, notifier.opts, notifier.metrics, faultyURL.String(), functionalURL.String())
|
||||
|
||||
for _, ams := range notifier.alertmanagers {
|
||||
ams.startSendLoops(ams.ams)
|
||||
}
|
||||
|
||||
go notifier.Run(sdManager.SyncCh())
|
||||
defer notifier.Stop()
|
||||
|
||||
require.Len(t, notifier.Alertmanagers(), 2)
|
||||
|
||||
// Enqueue the alerts.
|
||||
var alerts []*Alert
|
||||
for i := range make([]struct{}, alertsCount) {
|
||||
alerts = append(alerts, &Alert{
|
||||
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
|
||||
})
|
||||
}
|
||||
notifier.Send(alerts...)
|
||||
|
||||
// Wait for the Alertmanagers to start receiving alerts.
|
||||
// 10*sdUpdatert is used as an arbitrary timeout here.
|
||||
timeout := time.After(10 * sdUpdatert)
|
||||
loop1:
|
||||
for {
|
||||
select {
|
||||
case <-timeout:
|
||||
t.Fatalf("Timeout waiting for the alertmanagers to be reached for the first time.")
|
||||
default:
|
||||
if faultyCalled.Load() && functionalCalled.Load() {
|
||||
break loop1
|
||||
// Fake Do function that simulates alertmanager behavior in-process.
|
||||
// This runs within the synctest bubble, so time.Sleep uses fake time.
|
||||
fakeDo := func(ctx context.Context, _ *http.Client, req *http.Request) (*http.Response, error) {
|
||||
url := req.URL.String()
|
||||
if strings.Contains(url, "faulty") {
|
||||
faultyCalled.Store(true)
|
||||
// Faulty alertmanager hangs until context is canceled (by timeout).
|
||||
<-ctx.Done()
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
// Functional alertmanager responds successfully.
|
||||
// Sleep simulates network latency that real HTTP would have—without it,
|
||||
// the queue drains instantly and the final queueLen() assertion fails.
|
||||
functionalCalled.Store(true)
|
||||
time.Sleep(sendTimeout / 2)
|
||||
return &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(bytes.NewBuffer(nil)),
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Request to remove the faulty Alertmanager.
|
||||
c := map[string]discovery.Configs{
|
||||
"config-0": {
|
||||
discovery.StaticConfig{
|
||||
&targetgroup.Group{
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
model.AddressLabel: model.LabelValue(functionalURL.Host),
|
||||
// Initialize the discovery manager
|
||||
// This is relevant as the updates aren't sent continually in real life, but only each updatert.
|
||||
// The old implementation of TestHangingNotifier didn't take that into account.
|
||||
ctx, cancelSdManager := context.WithCancel(t.Context())
|
||||
defer cancelSdManager()
|
||||
reg := prometheus.NewRegistry()
|
||||
sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg))
|
||||
require.NoError(t, err)
|
||||
sdManager := discovery.NewManager(
|
||||
ctx,
|
||||
promslog.NewNopLogger(),
|
||||
reg,
|
||||
sdMetrics,
|
||||
discovery.Name("sd-manager"),
|
||||
discovery.Updatert(sdUpdatert),
|
||||
)
|
||||
go sdManager.Run()
|
||||
|
||||
// Set up the notifier with both faulty and functional Alertmanagers.
|
||||
notifier := NewManager(
|
||||
&Options{
|
||||
QueueCapacity: alertsCount,
|
||||
Registerer: reg,
|
||||
Do: fakeDo,
|
||||
},
|
||||
model.UTF8Validation,
|
||||
nil,
|
||||
)
|
||||
|
||||
notifier.alertmanagers = make(map[string]*alertmanagerSet)
|
||||
amCfg := config.DefaultAlertmanagerConfig
|
||||
amCfg.Timeout = model.Duration(sendTimeout)
|
||||
notifier.alertmanagers["config-0"] = newTestAlertmanagerSet(&amCfg, nil, notifier.opts, notifier.metrics, faultyURL, functionalURL)
|
||||
|
||||
for _, ams := range notifier.alertmanagers {
|
||||
ams.startSendLoops(ams.ams)
|
||||
}
|
||||
|
||||
go notifier.Run(sdManager.SyncCh())
|
||||
t.Cleanup(func() {
|
||||
notifier.Stop()
|
||||
// Advance time so in-flight request timeouts fire.
|
||||
time.Sleep(sendTimeout * 2)
|
||||
})
|
||||
|
||||
require.Len(t, notifier.Alertmanagers(), 2)
|
||||
|
||||
// Enqueue the alerts.
|
||||
var alerts []*Alert
|
||||
for i := range make([]struct{}, alertsCount) {
|
||||
alerts = append(alerts, &Alert{
|
||||
Labels: labels.FromStrings("alertname", strconv.Itoa(i)),
|
||||
})
|
||||
}
|
||||
notifier.Send(alerts...)
|
||||
|
||||
// Wait for the Alertmanagers to start receiving alerts.
|
||||
// Use a polling loop since we need to wait for goroutines to process.
|
||||
for !faultyCalled.Load() || !functionalCalled.Load() {
|
||||
time.Sleep(sdUpdatert)
|
||||
synctest.Wait()
|
||||
}
|
||||
|
||||
// Request to remove the faulty Alertmanager.
|
||||
c := map[string]discovery.Configs{
|
||||
"config-0": {
|
||||
discovery.StaticConfig{
|
||||
&targetgroup.Group{
|
||||
Targets: []model.LabelSet{
|
||||
{
|
||||
model.AddressLabel: "functional:9093",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
require.NoError(t, sdManager.ApplyConfig(c))
|
||||
|
||||
timeout = time.After(batches * sendTimeout)
|
||||
loop2:
|
||||
for {
|
||||
select {
|
||||
case <-timeout:
|
||||
t.Fatalf("Timeout, the faulty alertmanager not removed on time.")
|
||||
default:
|
||||
// The faulty alertmanager was dropped.
|
||||
if len(notifier.Alertmanagers()) == 1 {
|
||||
// The notifier should not wait until the alerts queue of the functional am is empty to apply the discovery changes.
|
||||
require.NotZero(t, notifier.alertmanagers["config-0"].sendLoops[functionalURL.String()].queueLen())
|
||||
break loop2
|
||||
}
|
||||
}
|
||||
}
|
||||
require.NoError(t, sdManager.ApplyConfig(c))
|
||||
|
||||
// Wait for the discovery update to be processed.
|
||||
// Advance time to trigger the discovery manager's update interval.
|
||||
// The faulty alertmanager should be dropped without waiting for its queue to drain.
|
||||
for len(notifier.Alertmanagers()) != 1 {
|
||||
time.Sleep(sdUpdatert)
|
||||
synctest.Wait()
|
||||
}
|
||||
// The notifier should not wait until the alerts queue of the functional am is empty to apply the discovery changes.
|
||||
require.NotZero(t, notifier.alertmanagers["config-0"].sendLoops[functionalURL].queueLen())
|
||||
})
|
||||
}
|
||||
|
||||
func TestStop_DrainingDisabled(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue