These bugs were discovered accidentally through code analysis:
- https://app.devin.ai/review/prometheus/prometheus/pull/16355

Upon further inspection and analysis, three potential bugs were found:
1. sendLoops could keep running if the corresponding Alertmanager changed position in the config.
2. Multiple configs with the same hash would share sendLoops, leaving some sets without sendLoops.
3. sendLoops could keep running if the config hash changed.

Tests added:
- `TestApplyConfigSendLoopsNotStoppedOnKeyChange`: verifies sendLoops keep working when keys swap (no fix needed).
- `TestApplyConfigDuplicateHashSharesSendLoops`: verifies sendLoops are independent across configs with duplicate hashes (bug fixed).
- `TestApplyConfigHashChangeLeaksSendLoops`: verifies sendLoops are cleaned up when the hash changes (bug fixed).

Signed-off-by: Siavash Safi <siavash@cloudflare.com>
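For context, a minimal sketch of how this manager could be driven from the outside, using only the exported API defined in this file (NewManager, ApplyConfig, Run, Send, Stop). The empty config, the `targetSetUpdates` channel, the option values, and the `HighErrorRate` alert are placeholders; in Prometheus proper these come from config loading, service discovery, and the rule engine.

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/notifier"
)

func main() {
	logger := promslog.NewNopLogger()

	// Construct the manager with illustrative option values.
	mgr := notifier.NewManager(&notifier.Options{
		QueueCapacity: 10000,
		Registerer:    prometheus.DefaultRegisterer,
	}, model.UTF8Validation, logger)

	// In Prometheus proper the config comes from the loaded YAML;
	// an empty config stands in for it here.
	if err := mgr.ApplyConfig(&config.Config{}); err != nil {
		logger.Error("applying notifier config failed", "err", err)
	}

	// Run blocks until Stop is called; updates on the channel would come
	// from service discovery in the real wiring.
	targetSetUpdates := make(chan map[string][]*targetgroup.Group)
	go mgr.Run(targetSetUpdates)

	// Queue an alert (Alert is defined elsewhere in the notifier package);
	// with no Alertmanagers configured it is simply dropped.
	mgr.Send(&notifier.Alert{
		Labels: labels.FromStrings("alertname", "HighErrorRate"),
	})

	mgr.Stop()
}
```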
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package notifier

import (
	"context"
	"fmt"
	"log/slog"
	"net/http"
	"net/url"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/promslog"
	"github.com/prometheus/common/version"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/relabel"
)

const (
	// DefaultMaxBatchSize is the default maximum number of alerts to send in a single request to the alertmanager.
	DefaultMaxBatchSize = 256

	contentTypeJSON = "application/json"
)

// String constants for instrumentation.
const (
	namespace         = "prometheus"
	subsystem         = "notifications"
	alertmanagerLabel = "alertmanager"
)

var userAgent = version.PrometheusUserAgent()

// Manager is responsible for dispatching alert notifications to an
// alert manager service.
type Manager struct {
	opts *Options

	metrics *alertMetrics

	mtx sync.RWMutex

	stopOnce      *sync.Once
	stopRequested chan struct{}

	alertmanagers map[string]*alertmanagerSet
	logger        *slog.Logger
}

// Options are the configurable parameters of a Manager.
type Options struct {
	QueueCapacity   int
	DrainOnShutdown bool
	ExternalLabels  labels.Labels
	RelabelConfigs  []*relabel.Config
	// Used for sending HTTP requests to the Alertmanager.
	Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)

	Registerer prometheus.Registerer

	// MaxBatchSize determines the maximum number of alerts to send in a single request to the alertmanager.
	MaxBatchSize int
}

func do(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
	if client == nil {
		client = http.DefaultClient
	}
	return client.Do(req.WithContext(ctx))
}

// NewManager is the manager constructor.
func NewManager(o *Options, nameValidationScheme model.ValidationScheme, logger *slog.Logger) *Manager {
	if o.Do == nil {
		o.Do = do
	}
	// Set default MaxBatchSize if not provided.
	if o.MaxBatchSize <= 0 {
		o.MaxBatchSize = DefaultMaxBatchSize
	}
	if logger == nil {
		logger = promslog.NewNopLogger()
	}

	for _, rc := range o.RelabelConfigs {
		switch rc.NameValidationScheme {
		case model.LegacyValidation, model.UTF8Validation:
		default:
			rc.NameValidationScheme = nameValidationScheme
		}
	}

	n := &Manager{
		stopRequested: make(chan struct{}),
		stopOnce:      &sync.Once{},
		opts:          o,
		logger:        logger,
	}

	alertmanagersDiscoveredFunc := func() float64 { return float64(len(n.Alertmanagers())) }

	n.metrics = newAlertMetrics(o.Registerer, alertmanagersDiscoveredFunc)
	n.metrics.queueCapacity.Set(float64(o.QueueCapacity))

	return n
}

// ApplyConfig updates the status state as the new config requires.
func (n *Manager) ApplyConfig(conf *config.Config) error {
	n.mtx.Lock()
	defer n.mtx.Unlock()

	n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels
	n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
	for i, rc := range n.opts.RelabelConfigs {
		switch rc.NameValidationScheme {
		case model.LegacyValidation, model.UTF8Validation:
		default:
			n.opts.RelabelConfigs[i].NameValidationScheme = conf.GlobalConfig.MetricNameValidationScheme
		}
	}

	amSets := make(map[string]*alertmanagerSet)
	// configToAlertmanagers maps alertmanager sets for each unique AlertmanagerConfig,
	// helping to avoid dropping known alertmanagers and re-use them without waiting for SD updates when applying the config.
	configToAlertmanagers := make(map[string]*alertmanagerSet, len(n.alertmanagers))
	for _, oldAmSet := range n.alertmanagers {
		hash, err := oldAmSet.configHash()
		if err != nil {
			return err
		}
		configToAlertmanagers[hash] = oldAmSet
	}

	for k, cfg := range conf.AlertingConfig.AlertmanagerConfigs.ToMap() {
		ams, err := newAlertmanagerSet(cfg, n.opts, n.logger, n.metrics)
		if err != nil {
			return err
		}

		hash, err := ams.configHash()
		if err != nil {
			return err
		}

		if oldAmSet, ok := configToAlertmanagers[hash]; ok {
			ams.ams = oldAmSet.ams
			ams.droppedAms = oldAmSet.droppedAms
			// Only transfer sendLoops to the first new config with this hash.
			// Subsequent configs with the same hash should not share the sendLoops
			// map reference, as that would cause shared mutable state between
			// alertmanagerSets (cleanup in one would affect the other).
			oldAmSet.mtx.Lock()
			if oldAmSet.sendLoops != nil {
				ams.mtx.Lock()
				ams.sendLoops = oldAmSet.sendLoops
				oldAmSet.sendLoops = nil
				ams.mtx.Unlock()
			}
			oldAmSet.mtx.Unlock()
		}

		amSets[k] = ams
	}

	// Clean up sendLoops that weren't transferred to new config.
	// This happens when: (1) key was removed, or (2) key exists but hash changed.
	// After the transfer loop above, any oldAmSet with non-nil sendLoops
	// had its sendLoops NOT transferred (since we set it to nil on transfer).
	for _, oldAmSet := range n.alertmanagers {
		oldAmSet.mtx.Lock()
		if oldAmSet.sendLoops != nil {
			oldAmSet.cleanSendLoops(oldAmSet.ams...)
		}
		oldAmSet.mtx.Unlock()
	}

	n.alertmanagers = amSets

	return nil
}

// Run dispatches notifications continuously, returning once Stop has been called and all
// pending notifications have been drained from the queue (if draining is enabled).
//
// Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other.
// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
	n.targetUpdateLoop(tsets)

	n.mtx.Lock()
	defer n.mtx.Unlock()
	for _, ams := range n.alertmanagers {
		ams.mtx.Lock()
		ams.cleanSendLoops(ams.ams...)
		ams.mtx.Unlock()
	}
}

// targetUpdateLoop receives updates of target groups and triggers a reload.
func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group) {
	for {
		// If we've been asked to stop, that takes priority over processing any further target group updates.
		select {
		case <-n.stopRequested:
			return
		default:
			select {
			case <-n.stopRequested:
				return
			case ts, ok := <-tsets:
				if !ok {
					break
				}
				n.reload(ts)
			}
		}
	}
}

func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
	n.mtx.Lock()
	defer n.mtx.Unlock()

	for id, tgroup := range tgs {
		am, ok := n.alertmanagers[id]
		if !ok {
			n.logger.Error("couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id))
			continue
		}
		am.sync(tgroup)
	}
}

// Send queues the given notification requests for processing.
// Panics if called on a manager that is not running.
func (n *Manager) Send(alerts ...*Alert) {
	// If we've been asked to stop, that takes priority over accepting new alerts.
	select {
	case <-n.stopRequested:
		return
	default:
	}

	n.mtx.RLock()
	defer n.mtx.RUnlock()

	alerts = relabelAlerts(n.opts.RelabelConfigs, n.opts.ExternalLabels, alerts)
	if len(alerts) == 0 {
		return
	}

	for _, ams := range n.alertmanagers {
		ams.send(alerts...)
	}
}

// Alertmanagers returns a slice of Alertmanager URLs.
func (n *Manager) Alertmanagers() []*url.URL {
	n.mtx.RLock()
	amSets := n.alertmanagers
	n.mtx.RUnlock()

	var res []*url.URL

	for _, ams := range amSets {
		ams.mtx.RLock()
		for _, am := range ams.ams {
			res = append(res, am.url())
		}
		ams.mtx.RUnlock()
	}

	return res
}

// DroppedAlertmanagers returns a slice of dropped Alertmanager URLs.
func (n *Manager) DroppedAlertmanagers() []*url.URL {
	n.mtx.RLock()
	amSets := n.alertmanagers
	n.mtx.RUnlock()

	var res []*url.URL

	for _, ams := range amSets {
		ams.mtx.RLock()
		for _, dam := range ams.droppedAms {
			res = append(res, dam.url())
		}
		ams.mtx.RUnlock()
	}

	return res
}

// Stop signals the notification manager to shut down and immediately returns.
//
// Run will return once the notification manager has successfully shut down.
//
// The manager will optionally drain send loops before shutting down.
//
// Stop is safe to call multiple times.
func (n *Manager) Stop() {
	n.logger.Info("Stopping notification manager...")

	n.stopOnce.Do(func() {
		close(n.stopRequested)
	})
}