prometheus/notifier/sendloop.go
Siavash Safi a89c665f47
feat(notifier): independent alertmanager sendloops (#16355)
* notifier: unit test for dropping throughput on stuck AM

Ref: https://github.com/prometheus/prometheus/issues/7676

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
Signed-off-by: Siavash Safi <siavash@cloudflare.com>

* chore(notifier): remove year from copyrights

Signed-off-by: Siavash Safi <siavash@cloudflare.com>

* feat(notifier): independent alertmanager sendloops

Independent Alertmanager sendloops avoid queue overflow when one or more
Alertmanager instances are unavailable, which could otherwise result in
lost alert notifications.
The sendloops are managed per AlertmanagerSet and are dynamically
added/removed on service discovery updates or configuration reload.

The following metrics now include an extra dimension for the alertmanager label:
- prometheus_notifications_dropped_total
- prometheus_notifications_queue_capacity
- prometheus_notifications_queue_length

This change also includes the test from #14099

Closes #7676

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
Signed-off-by: Siavash Safi <siavash@cloudflare.com>

---------

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
Signed-off-by: Siavash Safi <siavash@cloudflare.com>
Signed-off-by: machine424 <ayoubmrini424@gmail.com>
Co-authored-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
2026-01-20 10:33:07 +01:00


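To illustrate the extra alertmanager label dimension described above, here is a minimal, hypothetical helper. It is not part of the file below; it assumes the notifier package's alertMetrics type and the client_golang testutil package:

// queueLengthFor reads the current value of
// prometheus_notifications_queue_length for one Alertmanager. With
// independent send loops there is one such series per Alertmanager URL,
// e.g. {alertmanager="http://am-1:9093/api/v2/alerts"} (illustrative value).
// Assumes: import "github.com/prometheus/client_golang/prometheus/testutil".
func queueLengthFor(metrics *alertMetrics, alertmanagerURL string) float64 {
	return testutil.ToFloat64(metrics.queueLength.WithLabelValues(alertmanagerURL))
}
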
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notifier

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"sync"
	"time"

	"github.com/prometheus/prometheus/config"
)

// sendLoop queues and sends alert notifications to a single Alertmanager
// instance, independently of the send loops for other instances.
type sendLoop struct {
	alertmanagerURL string
	cfg             *config.AlertmanagerConfig
	client          *http.Client
	opts            *Options
	metrics         *alertMetrics

	mtx      sync.RWMutex
	queue    []*Alert
	hasWork  chan struct{}
	stopped  chan struct{}
	stopOnce sync.Once

	logger *slog.Logger
}

// newSendLoop returns a sendLoop for a single Alertmanager instance and
// initializes its per-instance metric series.
func newSendLoop(
	alertmanagerURL string,
	client *http.Client,
	cfg *config.AlertmanagerConfig,
	opts *Options,
	logger *slog.Logger,
	metrics *alertMetrics,
) *sendLoop {
	// This initializes the counters for the AM to 0 and creates the
	// per-instance queue length gauge.
	metrics.dropped.WithLabelValues(alertmanagerURL)
	metrics.errors.WithLabelValues(alertmanagerURL)
	metrics.sent.WithLabelValues(alertmanagerURL)
	metrics.queueLength.WithLabelValues(alertmanagerURL)

	return &sendLoop{
		alertmanagerURL: alertmanagerURL,
		client:          client,
		cfg:             cfg,
		opts:            opts,
		logger:          logger,
		metrics:         metrics,
		queue:           make([]*Alert, 0, opts.QueueCapacity),
		hasWork:         make(chan struct{}, 1),
		stopped:         make(chan struct{}),
	}
}

// add queues alerts for delivery to this Alertmanager. If the queue would
// overflow, the oldest alerts are dropped in favor of the new ones. Alerts
// added after stop() has been called are discarded.
func (s *sendLoop) add(alerts ...*Alert) {
	select {
	case <-s.stopped:
		return
	default:
	}

	s.mtx.Lock()
	defer s.mtx.Unlock()

	var dropped int

	// Queue capacity should be significantly larger than a single alert
	// batch could be.
	if d := len(alerts) - s.opts.QueueCapacity; d > 0 {
		s.logger.Warn("Alert batch larger than queue capacity, dropping alerts", "count", d)
		dropped += d
		alerts = alerts[d:]
	}

	// If the queue is full, remove the oldest alerts in favor
	// of newer ones.
	if d := (len(s.queue) + len(alerts)) - s.opts.QueueCapacity; d > 0 {
		s.logger.Warn("Alert notification queue full, dropping alerts", "count", d)
		dropped += d
		s.queue = s.queue[d:]
	}
	s.queue = append(s.queue, alerts...)

	// Notify sending goroutine that there are alerts to be processed.
	// If we cannot send on the channel, it means the signal already exists
	// and has not been consumed yet.
	s.notifyWork()

	s.metrics.queueLength.WithLabelValues(s.alertmanagerURL).Set(float64(len(s.queue)))
	if dropped > 0 {
		s.metrics.dropped.WithLabelValues(s.alertmanagerURL).Add(float64(dropped))
	}
}

// notifyWork signals the send loop goroutine that the queue has work,
// unless a signal is already pending or the loop has been stopped.
func (s *sendLoop) notifyWork() {
	select {
	case <-s.stopped:
		return
	case s.hasWork <- struct{}{}:
	default:
	}
}

// stop shuts the send loop down, optionally draining the queue first, and
// removes this Alertmanager's metric series. It is safe to call multiple times.
func (s *sendLoop) stop() {
	s.stopOnce.Do(func() {
		s.logger.Debug("Stopping send loop")
		close(s.stopped)

		if s.opts.DrainOnShutdown {
			s.drainQueue()
		} else {
			ql := s.queueLen()
			s.logger.Warn("Alert notification queue not drained on shutdown, dropping alerts", "count", ql)
			s.metrics.dropped.WithLabelValues(s.alertmanagerURL).Add(float64(ql))
		}

		s.metrics.latencySummary.DeleteLabelValues(s.alertmanagerURL)
		s.metrics.latencyHistogram.DeleteLabelValues(s.alertmanagerURL)
		s.metrics.sent.DeleteLabelValues(s.alertmanagerURL)
		s.metrics.dropped.DeleteLabelValues(s.alertmanagerURL)
		s.metrics.errors.DeleteLabelValues(s.alertmanagerURL)
		s.metrics.queueLength.DeleteLabelValues(s.alertmanagerURL)
	})
}

// drainQueue keeps sending batches until the queue is empty.
func (s *sendLoop) drainQueue() {
	for s.queueLen() > 0 {
		s.sendOneBatch()
	}
}

// queueLen returns the current number of queued alerts.
func (s *sendLoop) queueLen() int {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	return len(s.queue)
}

// nextBatch removes up to MaxBatchSize alerts from the head of the queue
// and returns them as the next batch to send.
func (s *sendLoop) nextBatch() []*Alert {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	var alerts []*Alert

	if maxBatchSize := s.opts.MaxBatchSize; len(s.queue) > maxBatchSize {
		alerts = append(make([]*Alert, 0, maxBatchSize), s.queue[:maxBatchSize]...)
		s.queue = s.queue[maxBatchSize:]
	} else {
		alerts = append(make([]*Alert, 0, len(s.queue)), s.queue...)
		s.queue = s.queue[:0]
	}
	s.metrics.queueLength.WithLabelValues(s.alertmanagerURL).Set(float64(len(s.queue)))

	return alerts
}

// sendOneBatch sends a single batch and accounts for dropped alerts on failure.
func (s *sendLoop) sendOneBatch() {
	alerts := s.nextBatch()
	if !s.sendAll(alerts) {
		s.metrics.dropped.WithLabelValues(s.alertmanagerURL).Add(float64(len(alerts)))
	}
}

// loop continuously consumes the notifications queue and sends alerts to
// the Alertmanager.
func (s *sendLoop) loop() {
	s.logger.Debug("Starting send loop")

	for {
		// If we've been asked to stop, that takes priority over sending any further notifications.
		select {
		case <-s.stopped:
			return
		default:
			select {
			case <-s.stopped:
				return
			case <-s.hasWork:
				s.sendOneBatch()
				// If the queue still has items left, kick off the next iteration.
				if s.queueLen() > 0 {
					s.notifyWork()
				}
			}
		}
	}
}

// sendAll encodes the alerts for the configured Alertmanager API version and
// sends them in a single request. It returns true if the batch was delivered
// (or empty).
func (s *sendLoop) sendAll(alerts []*Alert) bool {
	if len(alerts) == 0 {
		return true
	}

	begin := time.Now()

	var payload []byte
	var err error
	switch s.cfg.APIVersion {
	case config.AlertmanagerAPIVersionV2:
		openAPIAlerts := alertsToOpenAPIAlerts(alerts)
		payload, err = json.Marshal(openAPIAlerts)
		if err != nil {
			s.logger.Error("Encoding alerts for Alertmanager API v2 failed", "err", err)
			return false
		}
	default:
		s.logger.Error(
			fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", s.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions),
			"err", err,
		)
		return false
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.cfg.Timeout))
	defer cancel()

	if err := s.sendOne(ctx, s.client, s.alertmanagerURL, payload); err != nil {
		s.logger.Error("Error sending alerts", "count", len(alerts), "err", err)
		s.metrics.errors.WithLabelValues(s.alertmanagerURL).Add(float64(len(alerts)))
		return false
	}

	durationSeconds := time.Since(begin).Seconds()
	s.metrics.latencySummary.WithLabelValues(s.alertmanagerURL).Observe(durationSeconds)
	s.metrics.latencyHistogram.WithLabelValues(s.alertmanagerURL).Observe(durationSeconds)
	s.metrics.sent.WithLabelValues(s.alertmanagerURL).Add(float64(len(alerts)))

	return true
}

// sendOne POSTs the JSON payload to a single Alertmanager URL and treats any
// 2xx response status as success.
func (s *sendLoop) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(b))
	if err != nil {
		return err
	}
	req.Header.Set("User-Agent", userAgent)
	req.Header.Set("Content-Type", contentTypeJSON)
	resp, err := s.opts.Do(ctx, c, req)
	if err != nil {
		return err
	}
	defer func() {
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()

	// Any HTTP status 2xx is OK.
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("bad response status %s", resp.Status)
	}
	return nil
}
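
A rough usage sketch follows. The wiring is hypothetical (in Prometheus, send loops are created and stopped per AlertmanagerSet elsewhere in the notifier package), but the calls match the API defined in this file:

// runOneSendLoop is illustrative only and not part of this file.
func runOneSendLoop(
	url string,
	client *http.Client,
	cfg *config.AlertmanagerConfig,
	opts *Options,
	logger *slog.Logger,
	metrics *alertMetrics,
	alerts ...*Alert,
) {
	s := newSendLoop(url, client, cfg, opts, logger, metrics)

	// Each Alertmanager gets its own goroutine and queue, so a stuck or
	// unreachable instance no longer backs up deliveries to the others.
	go s.loop()

	// Enqueue alerts; loop() drains them in batches of opts.MaxBatchSize.
	s.add(alerts...)

	// stop() drains the queue if opts.DrainOnShutdown is set, otherwise it
	// drops the remainder, then removes this Alertmanager's metric series.
	s.stop()
}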