mirror of
https://github.com/prometheus/prometheus.git
synced 2026-02-03 20:39:32 -05:00
Merge a65d075691 into 7769495a4a
This commit is contained in:
commit
4f2e5222d7
9 changed files with 1678 additions and 13 deletions
1
cmd/prometheus/testdata/features.json
vendored
1
cmd/prometheus/testdata/features.json
vendored
|
|
@ -196,6 +196,7 @@
|
|||
"lightsail": true,
|
||||
"linode": true,
|
||||
"marathon": true,
|
||||
"msk": true,
|
||||
"nerve": true,
|
||||
"nomad": true,
|
||||
"openstack": true,
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ const (
|
|||
RoleEC2 Role = "ec2"
|
||||
RoleECS Role = "ecs"
|
||||
RoleLightsail Role = "lightsail"
|
||||
RoleMSK Role = "msk"
|
||||
)
|
||||
|
||||
// UnmarshalYAML implements the yaml.Unmarshaler interface.
|
||||
|
|
@ -51,7 +52,7 @@ func (c *Role) UnmarshalYAML(unmarshal func(any) error) error {
|
|||
return err
|
||||
}
|
||||
switch *c {
|
||||
case RoleEC2, RoleECS, RoleLightsail:
|
||||
case RoleEC2, RoleECS, RoleLightsail, RoleMSK:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("unknown AWS SD role %q", *c)
|
||||
|
|
@ -78,13 +79,14 @@ type SDConfig struct {
|
|||
// ec2 specific
|
||||
Filters []*EC2Filter `yaml:"filters,omitempty"`
|
||||
|
||||
// ecs specific
|
||||
// ecs, msk specific
|
||||
Clusters []string `yaml:"clusters,omitempty"`
|
||||
|
||||
// Embedded sub-configs (internal use only, not serialized)
|
||||
*EC2SDConfig `yaml:"-"`
|
||||
*ECSSDConfig `yaml:"-"`
|
||||
*LightsailSDConfig `yaml:"-"`
|
||||
*MSKSDConfig `yaml:"-"`
|
||||
}
|
||||
|
||||
// UnmarshalYAML implements the yaml.Unmarshaler interface for SDConfig.
|
||||
|
|
@ -195,6 +197,39 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(any) error) error {
|
|||
if c.RefreshInterval != 0 {
|
||||
c.LightsailSDConfig.RefreshInterval = c.RefreshInterval
|
||||
}
|
||||
case RoleMSK:
|
||||
if c.MSKSDConfig == nil {
|
||||
mskConfig := DefaultMSKSDConfig
|
||||
c.MSKSDConfig = &mskConfig
|
||||
}
|
||||
c.MSKSDConfig.HTTPClientConfig = c.HTTPClientConfig
|
||||
if c.Region != "" {
|
||||
c.MSKSDConfig.Region = c.Region
|
||||
}
|
||||
if c.Endpoint != "" {
|
||||
c.MSKSDConfig.Endpoint = c.Endpoint
|
||||
}
|
||||
if c.AccessKey != "" {
|
||||
c.MSKSDConfig.AccessKey = c.AccessKey
|
||||
}
|
||||
if c.SecretKey != "" {
|
||||
c.MSKSDConfig.SecretKey = c.SecretKey
|
||||
}
|
||||
if c.Profile != "" {
|
||||
c.MSKSDConfig.Profile = c.Profile
|
||||
}
|
||||
if c.RoleARN != "" {
|
||||
c.MSKSDConfig.RoleARN = c.RoleARN
|
||||
}
|
||||
if c.Port != 0 {
|
||||
c.MSKSDConfig.Port = c.Port
|
||||
}
|
||||
if c.RefreshInterval != 0 {
|
||||
c.MSKSDConfig.RefreshInterval = c.RefreshInterval
|
||||
}
|
||||
if c.Clusters != nil {
|
||||
c.MSKSDConfig.Clusters = c.Clusters
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unknown AWS SD role %q", c.Role)
|
||||
}
|
||||
|
|
@ -226,6 +261,9 @@ func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Di
|
|||
case RoleLightsail:
|
||||
opts.Metrics = &lightsailMetrics{refreshMetrics: awsMetrics.refreshMetrics}
|
||||
return NewLightsailDiscovery(c.LightsailSDConfig, opts)
|
||||
case RoleMSK:
|
||||
opts.Metrics = &mskMetrics{refreshMetrics: awsMetrics.refreshMetrics}
|
||||
return NewMSKDiscovery(c.MSKSDConfig, opts)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown AWS SD role %q", c.Role)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -272,6 +272,32 @@ func TestMultipleSDConfigsDoNotShareState(t *testing.T) {
|
|||
"LightsailSDConfig objects should not share the same memory address")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "MSKMultipleJobsDifferentPorts",
|
||||
yaml: `
|
||||
- role: msk
|
||||
region: ap-south-1
|
||||
port: 6060
|
||||
clusters: ["cluster-1"]
|
||||
- role: msk
|
||||
region: ap-south-1
|
||||
port: 6061
|
||||
clusters: ["cluster-2"]`,
|
||||
validateFunc: func(t *testing.T, cfg1, cfg2 *SDConfig) {
|
||||
require.Equal(t, RoleMSK, cfg1.Role)
|
||||
require.Equal(t, RoleMSK, cfg2.Role)
|
||||
require.NotNil(t, cfg1.MSKSDConfig)
|
||||
require.NotNil(t, cfg2.MSKSDConfig)
|
||||
|
||||
require.Equal(t, 6060, cfg1.MSKSDConfig.Port)
|
||||
require.Equal(t, []string{"cluster-1"}, cfg1.MSKSDConfig.Clusters)
|
||||
require.Equal(t, 6061, cfg2.MSKSDConfig.Port)
|
||||
require.Equal(t, []string{"cluster-2"}, cfg2.MSKSDConfig.Clusters)
|
||||
|
||||
require.NotSame(t, cfg1.MSKSDConfig, cfg2.MSKSDConfig,
|
||||
"MSKSDConfig objects should not share the same memory address")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
|
|
|||
32
discovery/aws/metrics_msk.go
Normal file
32
discovery/aws/metrics_msk.go
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
// Copyright 2015 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package aws
|
||||
|
||||
import (
|
||||
"github.com/prometheus/prometheus/discovery"
|
||||
)
|
||||
|
||||
type mskMetrics struct {
|
||||
refreshMetrics discovery.RefreshMetricsInstantiator
|
||||
}
|
||||
|
||||
var _ discovery.DiscovererMetrics = (*mskMetrics)(nil)
|
||||
|
||||
// Register implements discovery.DiscovererMetrics.
|
||||
func (*mskMetrics) Register() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Unregister implements discovery.DiscovererMetrics.
|
||||
func (*mskMetrics) Unregister() {}
|
||||
463
discovery/aws/msk.go
Normal file
463
discovery/aws/msk.go
Normal file
|
|
@ -0,0 +1,463 @@
|
|||
// Copyright The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package aws
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
awsConfig "github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
|
||||
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
|
||||
"github.com/aws/aws-sdk-go-v2/service/kafka"
|
||||
"github.com/aws/aws-sdk-go-v2/service/kafka/types"
|
||||
"github.com/aws/aws-sdk-go-v2/service/sts"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/config"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/common/promslog"
|
||||
|
||||
"github.com/prometheus/prometheus/discovery"
|
||||
"github.com/prometheus/prometheus/discovery/refresh"
|
||||
"github.com/prometheus/prometheus/discovery/targetgroup"
|
||||
"github.com/prometheus/prometheus/util/strutil"
|
||||
)
|
||||
|
||||
// NodeType names the kind of MSK node a target was discovered from. Its value
// is exported verbatim in the node type meta label.
type NodeType string

// Node types distinguished by the MSK discovery.
const (
	// NodeTypeBroker marks a node that carries broker information.
	NodeTypeBroker = NodeType("BROKER")
	// NodeTypeController marks a node that carries controller information.
	NodeTypeController = NodeType("CONTROLLER")
)
|
||||
|
||||
const (
|
||||
mskLabel = model.MetaLabelPrefix + "msk_"
|
||||
|
||||
// Cluster labels.
|
||||
mskLabelCluster = mskLabel + "cluster_"
|
||||
mskLabelClusterName = mskLabelCluster + "name"
|
||||
mskLabelClusterARN = mskLabelCluster + "arn"
|
||||
mskLabelClusterState = mskLabelCluster + "state"
|
||||
mskLabelClusterType = mskLabelCluster + "type"
|
||||
mskLabelClusterVersion = mskLabelCluster + "version"
|
||||
mskLabelClusterJmxExporterEnabled = mskLabelCluster + "jmx_exporter_enabled"
|
||||
mskLabelClusterConfigurationARN = mskLabelCluster + "configuration_arn"
|
||||
mskLabelClusterConfigurationRevision = mskLabelCluster + "configuration_revision"
|
||||
mskLabelClusterKafkaVersion = mskLabelCluster + "kafka_version"
|
||||
mskLabelClusterTags = mskLabelCluster + "tag_"
|
||||
|
||||
// Node labels.
|
||||
mskLabelNode = mskLabel + "node_"
|
||||
mskLabelNodeType = mskLabelNode + "type"
|
||||
mskLabelNodeARN = mskLabelNode + "arn"
|
||||
mskLabelNodeAddedTime = mskLabelNode + "added_time"
|
||||
mskLabelNodeInstanceType = mskLabelNode + "instance_type"
|
||||
mskLabelNodeAttachedENI = mskLabelNode + "attached_eni"
|
||||
|
||||
// Broker labels.
|
||||
mskLabelBroker = mskLabel + "broker_"
|
||||
mskLabelBrokerEndpointIndex = mskLabelBroker + "endpoint_index"
|
||||
mskLabelBrokerID = mskLabelBroker + "id"
|
||||
mskLabelBrokerClientSubnet = mskLabelBroker + "client_subnet"
|
||||
mskLabelBrokerClientVPCIP = mskLabelBroker + "client_vpc_ip"
|
||||
mskLabelBrokerNodeExporterEnabled = mskLabelBroker + "node_exporter_enabled"
|
||||
|
||||
// Controller labels.
|
||||
mskLabelController = mskLabel + "controller_"
|
||||
mskLabelControllerEndpointIndex = mskLabelController + "endpoint_index"
|
||||
)
|
||||
|
||||
// DefaultMSKSDConfig is the default MSK SD configuration.
|
||||
var DefaultMSKSDConfig = MSKSDConfig{
|
||||
Port: 80,
|
||||
RefreshInterval: model.Duration(60 * time.Second),
|
||||
HTTPClientConfig: config.DefaultHTTPClientConfig,
|
||||
}
|
||||
|
||||
func init() {
|
||||
discovery.RegisterConfig(&MSKSDConfig{})
|
||||
}
|
||||
|
||||
// MSKSDConfig is the configuration for MSK based service discovery.
|
||||
type MSKSDConfig struct {
|
||||
Region string `yaml:"region"`
|
||||
Endpoint string `yaml:"endpoint"`
|
||||
AccessKey string `yaml:"access_key,omitempty"`
|
||||
SecretKey config.Secret `yaml:"secret_key,omitempty"`
|
||||
Profile string `yaml:"profile,omitempty"`
|
||||
RoleARN string `yaml:"role_arn,omitempty"`
|
||||
Clusters []string `yaml:"clusters,omitempty"`
|
||||
Port int `yaml:"port"`
|
||||
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
|
||||
|
||||
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
|
||||
}
|
||||
|
||||
// NewDiscovererMetrics implements discovery.Config.
|
||||
func (*MSKSDConfig) NewDiscovererMetrics(_ prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
|
||||
return &mskMetrics{
|
||||
refreshMetrics: rmi,
|
||||
}
|
||||
}
|
||||
|
||||
// Name returns the name of the MSK Config.
|
||||
func (*MSKSDConfig) Name() string { return "msk" }
|
||||
|
||||
// NewDiscoverer returns a Discoverer for the MSK Config.
|
||||
func (c *MSKSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
|
||||
return NewMSKDiscovery(c, opts)
|
||||
}
|
||||
|
||||
// UnmarshalYAML implements the yaml.Unmarshaler interface for the MSK Config.
|
||||
func (c *MSKSDConfig) UnmarshalYAML(unmarshal func(any) error) error {
|
||||
*c = DefaultMSKSDConfig
|
||||
type plain MSKSDConfig
|
||||
err := unmarshal((*plain)(c))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.Region == "" {
|
||||
cfg, err := awsConfig.LoadDefaultConfig(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if cfg.Region != "" {
|
||||
// If the region is already set in the config, use it (env vars).
|
||||
c.Region = cfg.Region
|
||||
}
|
||||
|
||||
if c.Region == "" {
|
||||
// Try to get the region from IMDS.
|
||||
imdsClient := imds.NewFromConfig(cfg)
|
||||
region, err := imdsClient.GetRegion(context.Background(), &imds.GetRegionInput{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.Region = region.Region
|
||||
}
|
||||
}
|
||||
|
||||
if c.Region == "" {
|
||||
return errors.New("MSK SD configuration requires a region")
|
||||
}
|
||||
|
||||
return c.HTTPClientConfig.Validate()
|
||||
}
|
||||
|
||||
type mskClient interface {
|
||||
DescribeClusterV2(context.Context, *kafka.DescribeClusterV2Input, ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error)
|
||||
ListClustersV2(context.Context, *kafka.ListClustersV2Input, ...func(*kafka.Options)) (*kafka.ListClustersV2Output, error)
|
||||
ListNodes(context.Context, *kafka.ListNodesInput, ...func(*kafka.Options)) (*kafka.ListNodesOutput, error)
|
||||
}
|
||||
|
||||
// MSKDiscovery periodically performs MSK-SD requests. It implements
|
||||
// the Discoverer interface.
|
||||
type MSKDiscovery struct {
|
||||
*refresh.Discovery
|
||||
logger *slog.Logger
|
||||
cfg *MSKSDConfig
|
||||
msk mskClient
|
||||
}
|
||||
|
||||
// NewMSKDiscovery returns a new MSKDiscovery which periodically refreshes its targets.
|
||||
func NewMSKDiscovery(conf *MSKSDConfig, opts discovery.DiscovererOptions) (*MSKDiscovery, error) {
|
||||
m, ok := opts.Metrics.(*mskMetrics)
|
||||
if !ok {
|
||||
return nil, errors.New("invalid discovery metrics type")
|
||||
}
|
||||
|
||||
if opts.Logger == nil {
|
||||
opts.Logger = promslog.NewNopLogger()
|
||||
}
|
||||
d := &MSKDiscovery{
|
||||
logger: opts.Logger,
|
||||
cfg: conf,
|
||||
}
|
||||
d.Discovery = refresh.NewDiscovery(
|
||||
refresh.Options{
|
||||
Logger: opts.Logger,
|
||||
Mech: "msk",
|
||||
Interval: time.Duration(d.cfg.RefreshInterval),
|
||||
RefreshF: d.refresh,
|
||||
MetricsInstantiator: m.refreshMetrics,
|
||||
},
|
||||
)
|
||||
return d, nil
|
||||
}
|
||||
|
||||
func (d *MSKDiscovery) initMskClient(ctx context.Context) error {
|
||||
if d.msk != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if d.cfg.Region == "" {
|
||||
return errors.New("region must be set for MSK service discovery")
|
||||
}
|
||||
|
||||
// Build the HTTP client from the provided HTTPClientConfig.
|
||||
client, err := config.NewClientFromConfig(d.cfg.HTTPClientConfig, "msk_sd")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Build the AWS config with the provided region.
|
||||
var configOptions []func(*awsConfig.LoadOptions) error
|
||||
configOptions = append(configOptions, awsConfig.WithRegion(d.cfg.Region))
|
||||
configOptions = append(configOptions, awsConfig.WithHTTPClient(client))
|
||||
|
||||
// Only set static credentials if both access key and secret key are provided
|
||||
// Otherwise, let AWS SDK use its default credential chain
|
||||
if d.cfg.AccessKey != "" && d.cfg.SecretKey != "" {
|
||||
credProvider := credentials.NewStaticCredentialsProvider(d.cfg.AccessKey, string(d.cfg.SecretKey), "")
|
||||
configOptions = append(configOptions, awsConfig.WithCredentialsProvider(credProvider))
|
||||
}
|
||||
|
||||
if d.cfg.Profile != "" {
|
||||
configOptions = append(configOptions, awsConfig.WithSharedConfigProfile(d.cfg.Profile))
|
||||
}
|
||||
|
||||
cfg, err := awsConfig.LoadDefaultConfig(ctx, configOptions...)
|
||||
if err != nil {
|
||||
d.logger.Error("Failed to create AWS config", "error", err)
|
||||
return fmt.Errorf("could not create aws config: %w", err)
|
||||
}
|
||||
|
||||
// If the role ARN is set, assume the role to get credentials and set the credentials provider in the config.
|
||||
if d.cfg.RoleARN != "" {
|
||||
assumeProvider := stscreds.NewAssumeRoleProvider(sts.NewFromConfig(cfg), d.cfg.RoleARN)
|
||||
cfg.Credentials = aws.NewCredentialsCache(assumeProvider)
|
||||
}
|
||||
|
||||
d.msk = kafka.NewFromConfig(cfg, func(options *kafka.Options) {
|
||||
if d.cfg.Endpoint != "" {
|
||||
options.BaseEndpoint = &d.cfg.Endpoint
|
||||
}
|
||||
options.HTTPClient = client
|
||||
})
|
||||
|
||||
// Test credentials by making a simple API call
|
||||
testCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err = d.msk.ListClustersV2(testCtx, &kafka.ListClustersV2Input{})
|
||||
if err != nil {
|
||||
d.logger.Error("Failed to test MSK credentials", "error", err)
|
||||
return fmt.Errorf("MSK credential test failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *MSKDiscovery) describeClusters(ctx context.Context, clusterARNs []string) ([]types.Cluster, error) {
|
||||
var (
|
||||
clusters []types.Cluster
|
||||
wg sync.WaitGroup
|
||||
mu sync.Mutex
|
||||
errs []error
|
||||
)
|
||||
for _, clusterARN := range clusterARNs {
|
||||
wg.Add(1)
|
||||
go func(clusterARN string) {
|
||||
defer wg.Done()
|
||||
cluster, err := d.msk.DescribeClusterV2(ctx, &kafka.DescribeClusterV2Input{
|
||||
ClusterArn: aws.String(clusterARN),
|
||||
})
|
||||
if err != nil {
|
||||
mu.Lock()
|
||||
errs = append(errs, fmt.Errorf("could not describe cluster %v: %w", clusterARN, err))
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
mu.Lock()
|
||||
clusters = append(clusters, *cluster.ClusterInfo)
|
||||
mu.Unlock()
|
||||
}(clusterARN)
|
||||
}
|
||||
wg.Wait()
|
||||
if len(errs) > 0 {
|
||||
return nil, fmt.Errorf("errors occurred while describing clusters: %v", errs)
|
||||
}
|
||||
|
||||
return clusters, nil
|
||||
}
|
||||
|
||||
func (d *MSKDiscovery) listClusters(ctx context.Context) ([]types.Cluster, error) {
|
||||
var (
|
||||
clusters []types.Cluster
|
||||
nextToken *string
|
||||
)
|
||||
for {
|
||||
listClustersInput := kafka.ListClustersV2Input{
|
||||
ClusterTypeFilter: aws.String("PROVISIONED"),
|
||||
MaxResults: aws.Int32(100),
|
||||
NextToken: nextToken,
|
||||
}
|
||||
|
||||
resp, err := d.msk.ListClustersV2(ctx, &listClustersInput)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not list clusters: %w", err)
|
||||
}
|
||||
|
||||
clusters = append(clusters, resp.ClusterInfoList...)
|
||||
if resp.NextToken == nil {
|
||||
break
|
||||
}
|
||||
nextToken = resp.NextToken
|
||||
}
|
||||
|
||||
return clusters, nil
|
||||
}
|
||||
|
||||
func (d *MSKDiscovery) listNodes(ctx context.Context, clusterARN string) ([]types.NodeInfo, error) {
|
||||
var (
|
||||
nodes []types.NodeInfo
|
||||
nextToken *string
|
||||
)
|
||||
for {
|
||||
resp, err := d.msk.ListNodes(ctx, &kafka.ListNodesInput{
|
||||
ClusterArn: aws.String(clusterARN),
|
||||
MaxResults: aws.Int32(100),
|
||||
NextToken: nextToken,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not list nodes for cluster %v: %w", clusterARN, err)
|
||||
}
|
||||
|
||||
nodes = append(nodes, resp.NodeInfoList...)
|
||||
if resp.NextToken == nil {
|
||||
break
|
||||
}
|
||||
nextToken = resp.NextToken
|
||||
}
|
||||
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
func (d *MSKDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
|
||||
err := d.initMskClient(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tg := &targetgroup.Group{
|
||||
Source: d.cfg.Region,
|
||||
}
|
||||
|
||||
var clusters []types.Cluster
|
||||
if len(d.cfg.Clusters) > 0 {
|
||||
clusters, err = d.describeClusters(ctx, d.cfg.Clusters)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
clusters, err = d.listClusters(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
targetsMu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
for _, cluster := range clusters {
|
||||
wg.Add(1)
|
||||
go func(cluster types.Cluster) {
|
||||
defer wg.Done()
|
||||
|
||||
nodes, err := d.listNodes(ctx, aws.ToString(cluster.ClusterArn))
|
||||
if err != nil {
|
||||
d.logger.Error("Failed to list nodes", "cluster", aws.ToString(cluster.ClusterName), "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
labels := model.LabelSet{
|
||||
mskLabelClusterName: model.LabelValue(aws.ToString(cluster.ClusterName)),
|
||||
mskLabelClusterARN: model.LabelValue(aws.ToString(cluster.ClusterArn)),
|
||||
mskLabelClusterState: model.LabelValue(string(cluster.State)),
|
||||
mskLabelClusterType: model.LabelValue(string(cluster.ClusterType)),
|
||||
mskLabelClusterVersion: model.LabelValue(aws.ToString(cluster.CurrentVersion)),
|
||||
mskLabelNodeARN: model.LabelValue(aws.ToString(node.NodeARN)),
|
||||
mskLabelNodeAddedTime: model.LabelValue(aws.ToString(node.AddedToClusterTime)),
|
||||
mskLabelNodeInstanceType: model.LabelValue(aws.ToString(node.InstanceType)),
|
||||
mskLabelClusterJmxExporterEnabled: model.LabelValue(strconv.FormatBool(*cluster.Provisioned.OpenMonitoring.Prometheus.JmxExporter.EnabledInBroker)),
|
||||
mskLabelClusterConfigurationARN: model.LabelValue(aws.ToString(cluster.Provisioned.CurrentBrokerSoftwareInfo.ConfigurationArn)),
|
||||
mskLabelClusterConfigurationRevision: model.LabelValue(strconv.FormatInt(*cluster.Provisioned.CurrentBrokerSoftwareInfo.ConfigurationRevision, 10)),
|
||||
mskLabelClusterKafkaVersion: model.LabelValue(aws.ToString(cluster.Provisioned.CurrentBrokerSoftwareInfo.KafkaVersion)),
|
||||
}
|
||||
|
||||
for key, value := range cluster.Tags {
|
||||
labels[model.LabelName(mskLabelClusterTags+strutil.SanitizeLabelName(key))] = model.LabelValue(value)
|
||||
}
|
||||
|
||||
switch nodeType(node) {
|
||||
case NodeTypeBroker:
|
||||
labels[mskLabelNodeType] = model.LabelValue(NodeTypeBroker)
|
||||
labels[mskLabelNodeAttachedENI] = model.LabelValue(aws.ToString(node.BrokerNodeInfo.AttachedENIId))
|
||||
labels[mskLabelBrokerID] = model.LabelValue(fmt.Sprintf("%.0f", aws.ToFloat64(node.BrokerNodeInfo.BrokerId)))
|
||||
labels[mskLabelBrokerClientSubnet] = model.LabelValue(aws.ToString(node.BrokerNodeInfo.ClientSubnet))
|
||||
labels[mskLabelBrokerClientVPCIP] = model.LabelValue(aws.ToString(node.BrokerNodeInfo.ClientVpcIpAddress))
|
||||
labels[mskLabelBrokerNodeExporterEnabled] = model.LabelValue(strconv.FormatBool(*cluster.Provisioned.OpenMonitoring.Prometheus.NodeExporter.EnabledInBroker))
|
||||
|
||||
for idx, endpoint := range node.BrokerNodeInfo.Endpoints {
|
||||
endpointLabels := labels.Clone()
|
||||
endpointLabels[mskLabelBrokerEndpointIndex] = model.LabelValue(strconv.Itoa(idx))
|
||||
endpointLabels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(endpoint, strconv.Itoa(d.cfg.Port)))
|
||||
|
||||
targetsMu.Lock()
|
||||
tg.Targets = append(tg.Targets, endpointLabels)
|
||||
targetsMu.Unlock()
|
||||
}
|
||||
|
||||
case NodeTypeController:
|
||||
labels[mskLabelNodeType] = model.LabelValue(NodeTypeController)
|
||||
|
||||
for idx, endpoint := range node.ControllerNodeInfo.Endpoints {
|
||||
endpointLabels := labels.Clone()
|
||||
endpointLabels[mskLabelControllerEndpointIndex] = model.LabelValue(strconv.Itoa(idx))
|
||||
endpointLabels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(endpoint, strconv.Itoa(d.cfg.Port)))
|
||||
|
||||
targetsMu.Lock()
|
||||
tg.Targets = append(tg.Targets, endpointLabels)
|
||||
targetsMu.Unlock()
|
||||
}
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
}(cluster)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
return []*targetgroup.Group{tg}, nil
|
||||
}
|
||||
|
||||
func nodeType(node types.NodeInfo) NodeType {
|
||||
if node.BrokerNodeInfo != nil {
|
||||
return NodeTypeBroker
|
||||
} else if node.ControllerNodeInfo != nil {
|
||||
return NodeTypeController
|
||||
}
|
||||
return ""
|
||||
}
|
||||
1057
discovery/aws/msk_test.go
Normal file
1057
discovery/aws/msk_test.go
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -984,11 +984,56 @@ The following meta labels are available on targets during [relabeling](#relabel_
|
|||
* `__meta_ecs_tag_task_<tagkey>`: each task tag value, keyed by tag name
|
||||
* `__meta_ecs_tag_ec2_<tagkey>`: each EC2 instance tag value, keyed by tag name (EC2 launch type only)
|
||||
|
||||
#### `msk`
|
||||
|
||||
The `msk` role discovers targets from AWS MSK (Managed Streaming for Apache Kafka) provisioned clusters.
|
||||
|
||||
**Important**: This service discovery only works with **provisioned clusters**. Serverless clusters are not supported as they do not expose individual broker nodes.
|
||||
|
||||
Discovery includes:
|
||||
- **Broker nodes**: Kafka broker instances (supports both ZooKeeper-based and KRaft-based clusters)
|
||||
- **KRaft Controller nodes**: Controller instances (KRaft-based clusters only)
|
||||
|
||||
Note: ZooKeeper nodes are not discoverable via the MSK API and therefore never appear as targets. For monitoring the discovered nodes, MSK provides:
|
||||
- **JMX Exporter**: Available on both broker and KRaft controller nodes (when enabled)
|
||||
- **Node Exporter**: Available on broker nodes only (when enabled)
|
||||
|
||||
The IAM credentials used must have the following permissions to discover
|
||||
scrape targets:
|
||||
|
||||
- `kafka:DescribeClusterV2`
|
||||
- `kafka:ListClustersV2`
|
||||
- `kafka:ListNodes`
|
||||
|
||||
The following meta labels are available on targets during [relabeling](#relabel_config):
|
||||
|
||||
* `__meta_msk_cluster_name`: the name of the MSK cluster
|
||||
* `__meta_msk_cluster_arn`: the ARN of the MSK cluster
|
||||
* `__meta_msk_cluster_state`: the state of the MSK cluster (e.g., ACTIVE, CREATING, DELETING)
|
||||
* `__meta_msk_cluster_type`: the type of the MSK cluster (e.g., PROVISIONED, SERVERLESS)
|
||||
* `__meta_msk_cluster_version`: the current version of the MSK cluster
|
||||
* `__meta_msk_cluster_kafka_version`: the Kafka version running on the cluster
|
||||
* `__meta_msk_cluster_jmx_exporter_enabled`: whether JMX exporter is enabled on the cluster
|
||||
* `__meta_msk_cluster_configuration_arn`: the ARN of the MSK configuration
|
||||
* `__meta_msk_cluster_configuration_revision`: the revision of the MSK configuration
|
||||
* `__meta_msk_cluster_tag_<tagkey>`: each cluster tag value, keyed by tag name
|
||||
* `__meta_msk_node_type`: the type of the node (BROKER or CONTROLLER)
|
||||
* `__meta_msk_node_arn`: the ARN of the node
|
||||
* `__meta_msk_node_added_time`: the time the node was added to the cluster
|
||||
* `__meta_msk_node_instance_type`: the instance type of the node
|
||||
* `__meta_msk_node_attached_eni`: the ID of the attached ENI (broker nodes only)
|
||||
* `__meta_msk_broker_id`: the broker ID (broker nodes only)
|
||||
* `__meta_msk_broker_endpoint_index`: the index of the broker endpoint (broker nodes only)
|
||||
* `__meta_msk_broker_client_subnet`: the client subnet of the broker (broker nodes only)
|
||||
* `__meta_msk_broker_client_vpc_ip`: the VPC IP address of the broker (broker nodes only)
|
||||
* `__meta_msk_broker_node_exporter_enabled`: whether node exporter is enabled on brokers (broker nodes only)
|
||||
* `__meta_msk_controller_endpoint_index`: the index of the controller endpoint (controller nodes only)
|
||||
|
||||
See below for the configuration options for AWS discovery:
|
||||
|
||||
```yaml
|
||||
# The AWS role to use for service discovery.
|
||||
# Must be one of: ec2, lightsail, or ecs.
|
||||
# Must be one of: ec2, lightsail, ecs, or msk.
|
||||
role: <string>
|
||||
|
||||
# The AWS region. If blank, the region from the instance metadata is used.
|
||||
|
|
@ -1024,7 +1069,7 @@ filters:
|
|||
[ - name: <string>
|
||||
values: <string>, [...] ]
|
||||
|
||||
# List of ECS cluster ARNs to discover (ecs role only). If empty, all clusters in the region are discovered.
|
||||
# List of ECS or MSK cluster ARNs (ecs and msk roles only) to discover. If empty, all clusters in the region are discovered.
|
||||
# This can significantly improve performance when you only need to monitor specific clusters.
|
||||
[ clusters: [<string>, ...] ]
|
||||
|
||||
|
|
|
|||
7
go.mod
7
go.mod
|
|
@ -11,11 +11,12 @@ require (
|
|||
github.com/KimMachineGun/automemlimit v0.7.5
|
||||
github.com/alecthomas/kingpin/v2 v2.4.0
|
||||
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b
|
||||
github.com/aws/aws-sdk-go-v2 v1.41.0
|
||||
github.com/aws/aws-sdk-go-v2 v1.41.1
|
||||
github.com/aws/aws-sdk-go-v2/config v1.32.6
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.19.6
|
||||
github.com/aws/aws-sdk-go-v2/service/ec2 v1.279.0
|
||||
github.com/aws/aws-sdk-go-v2/service/ecs v1.70.0
|
||||
github.com/aws/aws-sdk-go-v2/service/kafka v1.46.7
|
||||
github.com/aws/aws-sdk-go-v2/service/lightsail v1.50.10
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.41.5
|
||||
github.com/aws/smithy-go v1.24.0
|
||||
|
|
@ -137,8 +138,8 @@ require (
|
|||
github.com/Microsoft/go-winio v0.6.1 // indirect
|
||||
github.com/armon/go-metrics v0.4.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16 // indirect
|
||||
|
|
|
|||
14
go.sum
14
go.sum
|
|
@ -47,18 +47,18 @@ github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJ
|
|||
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
|
||||
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||
github.com/aws/aws-sdk-go-v2 v1.41.0 h1:tNvqh1s+v0vFYdA1xq0aOJH+Y5cRyZ5upu6roPgPKd4=
|
||||
github.com/aws/aws-sdk-go-v2 v1.41.0/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0=
|
||||
github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU=
|
||||
github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.32.6 h1:hFLBGUKjmLAekvi1evLi5hVvFQtSo3GYwi+Bx4lpJf8=
|
||||
github.com/aws/aws-sdk-go-v2/config v1.32.6/go.mod h1:lcUL/gcd8WyjCrMnxez5OXkO3/rwcNmvfno62tnXNcI=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.19.6 h1:F9vWao2TwjV2MyiyVS+duza0NIRtAslgLUM0vTA1ZaE=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.19.6/go.mod h1:SgHzKjEVsdQr6Opor0ihgWtkWdfRAIwxYzSJ8O85VHY=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16 h1:80+uETIWS1BqjnN9uJ0dBUaETh+P1XwFy5vwHwK5r9k=
|
||||
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16/go.mod h1:wOOsYuxYuB/7FlnVtzeBYRcjSRtQpAW0hCP7tIULMwo=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16 h1:rgGwPzb82iBYSvHMHXc8h9mRoOUBZIGFgKb9qniaZZc=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16/go.mod h1:L/UxsGeKpGoIj6DxfhOWHWQ/kGKcd4I1VncE4++IyKA=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16 h1:1jtGzuV7c82xnqOVfx2F0xmJcOw5374L7N6juGW6x6U=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16/go.mod h1:M2E5OQf+XLe+SZGmmpaI2yy+J326aFf6/+54PoxSANc=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 h1:xOLELNKGp2vsiteLsvLPwxC+mYmO6OZ8PYgiuPJzF8U=
|
||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17/go.mod h1:5M5CI3D12dNOtH3/mk6minaRwI2/37ifCURZISxA/IQ=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 h1:WWLqlh79iO48yLkj1v3ISRNiv+3KdQoZ6JWyfcsyQik=
|
||||
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17/go.mod h1:EhG22vHRrvF8oXSTYStZhJc1aUgKtnJe+aOiFEV90cM=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk=
|
||||
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc=
|
||||
github.com/aws/aws-sdk-go-v2/service/ec2 v1.279.0 h1:o7eJKe6VYAnqERPlLAvDW5VKXV6eTKv1oxTpMoDP378=
|
||||
|
|
@ -69,6 +69,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 h1:0ryTNEd
|
|||
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4/go.mod h1:HQ4qwNZh32C3CBeO6iJLQlgtMzqeG17ziAA/3KDJFow=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16 h1:oHjJHeUy0ImIV0bsrX0X91GkV5nJAyv1l1CC9lnO0TI=
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16/go.mod h1:iRSNGgOYmiYwSCXxXaKb9HfOEj40+oTKn8pTxMlYkRM=
|
||||
github.com/aws/aws-sdk-go-v2/service/kafka v1.46.7 h1:0jDb9b505gbCmtjH1RT7kx8hDbVDzOhnTeZm7dzskpQ=
|
||||
github.com/aws/aws-sdk-go-v2/service/kafka v1.46.7/go.mod h1:tWnHS64fg5ydLHivFlCAtEh/1iMNzr56QsH3F+UTwD4=
|
||||
github.com/aws/aws-sdk-go-v2/service/lightsail v1.50.10 h1:MQuZZ6Tq1qQabPlkVxrCMdyVl70Ogl4AERZKo+y9Wzo=
|
||||
github.com/aws/aws-sdk-go-v2/service/lightsail v1.50.10/go.mod h1:U5C3JME1ibKESmpzBAqlRpTYZfVbTqrb5ICJm+sVVd8=
|
||||
github.com/aws/aws-sdk-go-v2/service/signin v1.0.4 h1:HpI7aMmJ+mm1wkSHIA2t5EaFFv5EFYXePW30p1EIrbQ=
|
||||
|
|
|
|||
Loading…
Reference in a new issue