From a65d0756917f169de4b9212f74959f69ac2b57e4 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 17 Nov 2025 15:59:40 +0100 Subject: [PATCH] prw2: Move Remote Write 2.0 CT to be per Sample; Rename to ST (start timestamp) (#17411) Relates to https://github.com/prometheus/prometheus/issues/16944#issuecomment-3164760343 Signed-off-by: bwplotka Signed-off-by: matt-gp --- cmd/prometheus/testdata/features.json | 1 + discovery/aws/aws.go | 42 +- discovery/aws/aws_test.go | 26 + discovery/aws/metrics_msk.go | 32 + discovery/aws/msk.go | 463 +++++++++++ discovery/aws/msk_test.go | 1057 +++++++++++++++++++++++++ docs/configuration/configuration.md | 49 +- go.mod | 7 +- go.sum | 14 +- 9 files changed, 1678 insertions(+), 13 deletions(-) create mode 100644 discovery/aws/metrics_msk.go create mode 100644 discovery/aws/msk.go create mode 100644 discovery/aws/msk_test.go diff --git a/cmd/prometheus/testdata/features.json b/cmd/prometheus/testdata/features.json index 4c893daae2..b8645ecf01 100644 --- a/cmd/prometheus/testdata/features.json +++ b/cmd/prometheus/testdata/features.json @@ -194,6 +194,7 @@ "lightsail": true, "linode": true, "marathon": true, + "msk": true, "nerve": true, "nomad": true, "openstack": true, diff --git a/discovery/aws/aws.go b/discovery/aws/aws.go index be6b4dabbe..9db87965bb 100644 --- a/discovery/aws/aws.go +++ b/discovery/aws/aws.go @@ -43,6 +43,7 @@ const ( RoleEC2 Role = "ec2" RoleECS Role = "ecs" RoleLightsail Role = "lightsail" + RoleMSK Role = "msk" ) // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -51,7 +52,7 @@ func (c *Role) UnmarshalYAML(unmarshal func(any) error) error { return err } switch *c { - case RoleEC2, RoleECS, RoleLightsail: + case RoleEC2, RoleECS, RoleLightsail, RoleMSK: return nil default: return fmt.Errorf("unknown AWS SD role %q", *c) @@ -78,13 +79,14 @@ type SDConfig struct { // ec2 specific Filters []*EC2Filter `yaml:"filters,omitempty"` - // ecs specific + // ecs, msk specific Clusters []string `yaml:"clusters,omitempty"` // Embedded sub-configs (internal use only, not serialized) *EC2SDConfig `yaml:"-"` *ECSSDConfig `yaml:"-"` *LightsailSDConfig `yaml:"-"` + *MSKSDConfig `yaml:"-"` } // UnmarshalYAML implements the yaml.Unmarshaler interface for SDConfig. @@ -195,6 +197,39 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(any) error) error { if c.RefreshInterval != 0 { c.LightsailSDConfig.RefreshInterval = c.RefreshInterval } + case RoleMSK: + if c.MSKSDConfig == nil { + mskConfig := DefaultMSKSDConfig + c.MSKSDConfig = &mskConfig + } + c.MSKSDConfig.HTTPClientConfig = c.HTTPClientConfig + if c.Region != "" { + c.MSKSDConfig.Region = c.Region + } + if c.Endpoint != "" { + c.MSKSDConfig.Endpoint = c.Endpoint + } + if c.AccessKey != "" { + c.MSKSDConfig.AccessKey = c.AccessKey + } + if c.SecretKey != "" { + c.MSKSDConfig.SecretKey = c.SecretKey + } + if c.Profile != "" { + c.MSKSDConfig.Profile = c.Profile + } + if c.RoleARN != "" { + c.MSKSDConfig.RoleARN = c.RoleARN + } + if c.Port != 0 { + c.MSKSDConfig.Port = c.Port + } + if c.RefreshInterval != 0 { + c.MSKSDConfig.RefreshInterval = c.RefreshInterval + } + if c.Clusters != nil { + c.MSKSDConfig.Clusters = c.Clusters + } default: return fmt.Errorf("unknown AWS SD role %q", c.Role) } @@ -226,6 +261,9 @@ func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Di case RoleLightsail: opts.Metrics = &lightsailMetrics{refreshMetrics: awsMetrics.refreshMetrics} return NewLightsailDiscovery(c.LightsailSDConfig, opts) + case RoleMSK: + opts.Metrics = &mskMetrics{refreshMetrics: awsMetrics.refreshMetrics} + return NewMSKDiscovery(c.MSKSDConfig, opts) default: return nil, fmt.Errorf("unknown AWS SD role %q", c.Role) } diff --git a/discovery/aws/aws_test.go b/discovery/aws/aws_test.go index dc1f2044ec..b47a6cd92c 100644 --- a/discovery/aws/aws_test.go +++ b/discovery/aws/aws_test.go @@ -272,6 +272,32 @@ func TestMultipleSDConfigsDoNotShareState(t *testing.T) { "LightsailSDConfig objects should not share the same memory address") }, }, + { + name: "MSKMultipleJobsDifferentPorts", + yaml: ` +- role: msk + region: ap-south-1 + port: 6060 + clusters: ["cluster-1"] +- role: msk + region: ap-south-1 + port: 6061 + clusters: ["cluster-2"]`, + validateFunc: func(t *testing.T, cfg1, cfg2 *SDConfig) { + require.Equal(t, RoleMSK, cfg1.Role) + require.Equal(t, RoleMSK, cfg2.Role) + require.NotNil(t, cfg1.MSKSDConfig) + require.NotNil(t, cfg2.MSKSDConfig) + + require.Equal(t, 6060, cfg1.MSKSDConfig.Port) + require.Equal(t, []string{"cluster-1"}, cfg1.MSKSDConfig.Clusters) + require.Equal(t, 6061, cfg2.MSKSDConfig.Port) + require.Equal(t, []string{"cluster-2"}, cfg2.MSKSDConfig.Clusters) + + require.NotSame(t, cfg1.MSKSDConfig, cfg2.MSKSDConfig, + "MSKSDConfig objects should not share the same memory address") + }, + }, } for _, tt := range tests { diff --git a/discovery/aws/metrics_msk.go b/discovery/aws/metrics_msk.go new file mode 100644 index 0000000000..fc69f57aa1 --- /dev/null +++ b/discovery/aws/metrics_msk.go @@ -0,0 +1,32 @@ +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aws + +import ( + "github.com/prometheus/prometheus/discovery" +) + +type mskMetrics struct { + refreshMetrics discovery.RefreshMetricsInstantiator +} + +var _ discovery.DiscovererMetrics = (*mskMetrics)(nil) + +// Register implements discovery.DiscovererMetrics. +func (*mskMetrics) Register() error { + return nil +} + +// Unregister implements discovery.DiscovererMetrics. +func (*mskMetrics) Unregister() {} diff --git a/discovery/aws/msk.go b/discovery/aws/msk.go new file mode 100644 index 0000000000..2a2b240d49 --- /dev/null +++ b/discovery/aws/msk.go @@ -0,0 +1,463 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aws + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net" + "strconv" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + awsConfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/credentials/stscreds" + "github.com/aws/aws-sdk-go-v2/feature/ec2/imds" + "github.com/aws/aws-sdk-go-v2/service/kafka" + "github.com/aws/aws-sdk-go-v2/service/kafka/types" + "github.com/aws/aws-sdk-go-v2/service/sts" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/common/promslog" + + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/discovery/refresh" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/util/strutil" +) + +type NodeType string + +const ( + NodeTypeBroker NodeType = "BROKER" + NodeTypeController NodeType = "CONTROLLER" +) + +const ( + mskLabel = model.MetaLabelPrefix + "msk_" + + // Cluster labels. + mskLabelCluster = mskLabel + "cluster_" + mskLabelClusterName = mskLabelCluster + "name" + mskLabelClusterARN = mskLabelCluster + "arn" + mskLabelClusterState = mskLabelCluster + "state" + mskLabelClusterType = mskLabelCluster + "type" + mskLabelClusterVersion = mskLabelCluster + "version" + mskLabelClusterJmxExporterEnabled = mskLabelCluster + "jmx_exporter_enabled" + mskLabelClusterConfigurationARN = mskLabelCluster + "configuration_arn" + mskLabelClusterConfigurationRevision = mskLabelCluster + "configuration_revision" + mskLabelClusterKafkaVersion = mskLabelCluster + "kafka_version" + mskLabelClusterTags = mskLabelCluster + "tag_" + + // Node labels. + mskLabelNode = mskLabel + "node_" + mskLabelNodeType = mskLabelNode + "type" + mskLabelNodeARN = mskLabelNode + "arn" + mskLabelNodeAddedTime = mskLabelNode + "added_time" + mskLabelNodeInstanceType = mskLabelNode + "instance_type" + mskLabelNodeAttachedENI = mskLabelNode + "attached_eni" + + // Broker labels. + mskLabelBroker = mskLabel + "broker_" + mskLabelBrokerEndpointIndex = mskLabelBroker + "endpoint_index" + mskLabelBrokerID = mskLabelBroker + "id" + mskLabelBrokerClientSubnet = mskLabelBroker + "client_subnet" + mskLabelBrokerClientVPCIP = mskLabelBroker + "client_vpc_ip" + mskLabelBrokerNodeExporterEnabled = mskLabelBroker + "node_exporter_enabled" + + // Controller labels. + mskLabelController = mskLabel + "controller_" + mskLabelControllerEndpointIndex = mskLabelController + "endpoint_index" +) + +// DefaultMSKSDConfig is the default MSK SD configuration. +var DefaultMSKSDConfig = MSKSDConfig{ + Port: 80, + RefreshInterval: model.Duration(60 * time.Second), + HTTPClientConfig: config.DefaultHTTPClientConfig, +} + +func init() { + discovery.RegisterConfig(&MSKSDConfig{}) +} + +// MSKSDConfig is the configuration for MSK based service discovery. +type MSKSDConfig struct { + Region string `yaml:"region"` + Endpoint string `yaml:"endpoint"` + AccessKey string `yaml:"access_key,omitempty"` + SecretKey config.Secret `yaml:"secret_key,omitempty"` + Profile string `yaml:"profile,omitempty"` + RoleARN string `yaml:"role_arn,omitempty"` + Clusters []string `yaml:"clusters,omitempty"` + Port int `yaml:"port"` + RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` + + HTTPClientConfig config.HTTPClientConfig `yaml:",inline"` +} + +// NewDiscovererMetrics implements discovery.Config. +func (*MSKSDConfig) NewDiscovererMetrics(_ prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics { + return &mskMetrics{ + refreshMetrics: rmi, + } +} + +// Name returns the name of the MSK Config. +func (*MSKSDConfig) Name() string { return "msk" } + +// NewDiscoverer returns a Discoverer for the MSK Config. +func (c *MSKSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { + return NewMSKDiscovery(c, opts) +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface for the MSK Config. +func (c *MSKSDConfig) UnmarshalYAML(unmarshal func(any) error) error { + *c = DefaultMSKSDConfig + type plain MSKSDConfig + err := unmarshal((*plain)(c)) + if err != nil { + return err + } + + if c.Region == "" { + cfg, err := awsConfig.LoadDefaultConfig(context.Background()) + if err != nil { + return err + } + if cfg.Region != "" { + // If the region is already set in the config, use it (env vars). + c.Region = cfg.Region + } + + if c.Region == "" { + // Try to get the region from IMDS. + imdsClient := imds.NewFromConfig(cfg) + region, err := imdsClient.GetRegion(context.Background(), &imds.GetRegionInput{}) + if err != nil { + return err + } + c.Region = region.Region + } + } + + if c.Region == "" { + return errors.New("MSK SD configuration requires a region") + } + + return c.HTTPClientConfig.Validate() +} + +type mskClient interface { + DescribeClusterV2(context.Context, *kafka.DescribeClusterV2Input, ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error) + ListClustersV2(context.Context, *kafka.ListClustersV2Input, ...func(*kafka.Options)) (*kafka.ListClustersV2Output, error) + ListNodes(context.Context, *kafka.ListNodesInput, ...func(*kafka.Options)) (*kafka.ListNodesOutput, error) +} + +// MSKDiscovery periodically performs MSK-SD requests. It implements +// the Discoverer interface. +type MSKDiscovery struct { + *refresh.Discovery + logger *slog.Logger + cfg *MSKSDConfig + msk mskClient +} + +// NewMSKDiscovery returns a new MSKDiscovery which periodically refreshes its targets. +func NewMSKDiscovery(conf *MSKSDConfig, opts discovery.DiscovererOptions) (*MSKDiscovery, error) { + m, ok := opts.Metrics.(*mskMetrics) + if !ok { + return nil, errors.New("invalid discovery metrics type") + } + + if opts.Logger == nil { + opts.Logger = promslog.NewNopLogger() + } + d := &MSKDiscovery{ + logger: opts.Logger, + cfg: conf, + } + d.Discovery = refresh.NewDiscovery( + refresh.Options{ + Logger: opts.Logger, + Mech: "msk", + Interval: time.Duration(d.cfg.RefreshInterval), + RefreshF: d.refresh, + MetricsInstantiator: m.refreshMetrics, + }, + ) + return d, nil +} + +func (d *MSKDiscovery) initMskClient(ctx context.Context) error { + if d.msk != nil { + return nil + } + + if d.cfg.Region == "" { + return errors.New("region must be set for MSK service discovery") + } + + // Build the HTTP client from the provided HTTPClientConfig. + client, err := config.NewClientFromConfig(d.cfg.HTTPClientConfig, "msk_sd") + if err != nil { + return err + } + + // Build the AWS config with the provided region. + var configOptions []func(*awsConfig.LoadOptions) error + configOptions = append(configOptions, awsConfig.WithRegion(d.cfg.Region)) + configOptions = append(configOptions, awsConfig.WithHTTPClient(client)) + + // Only set static credentials if both access key and secret key are provided + // Otherwise, let AWS SDK use its default credential chain + if d.cfg.AccessKey != "" && d.cfg.SecretKey != "" { + credProvider := credentials.NewStaticCredentialsProvider(d.cfg.AccessKey, string(d.cfg.SecretKey), "") + configOptions = append(configOptions, awsConfig.WithCredentialsProvider(credProvider)) + } + + if d.cfg.Profile != "" { + configOptions = append(configOptions, awsConfig.WithSharedConfigProfile(d.cfg.Profile)) + } + + cfg, err := awsConfig.LoadDefaultConfig(ctx, configOptions...) + if err != nil { + d.logger.Error("Failed to create AWS config", "error", err) + return fmt.Errorf("could not create aws config: %w", err) + } + + // If the role ARN is set, assume the role to get credentials and set the credentials provider in the config. + if d.cfg.RoleARN != "" { + assumeProvider := stscreds.NewAssumeRoleProvider(sts.NewFromConfig(cfg), d.cfg.RoleARN) + cfg.Credentials = aws.NewCredentialsCache(assumeProvider) + } + + d.msk = kafka.NewFromConfig(cfg, func(options *kafka.Options) { + if d.cfg.Endpoint != "" { + options.BaseEndpoint = &d.cfg.Endpoint + } + options.HTTPClient = client + }) + + // Test credentials by making a simple API call + testCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + _, err = d.msk.ListClustersV2(testCtx, &kafka.ListClustersV2Input{}) + if err != nil { + d.logger.Error("Failed to test MSK credentials", "error", err) + return fmt.Errorf("MSK credential test failed: %w", err) + } + + return nil +} + +func (d *MSKDiscovery) describeClusters(ctx context.Context, clusterARNs []string) ([]types.Cluster, error) { + var ( + clusters []types.Cluster + wg sync.WaitGroup + mu sync.Mutex + errs []error + ) + for _, clusterARN := range clusterARNs { + wg.Add(1) + go func(clusterARN string) { + defer wg.Done() + cluster, err := d.msk.DescribeClusterV2(ctx, &kafka.DescribeClusterV2Input{ + ClusterArn: aws.String(clusterARN), + }) + if err != nil { + mu.Lock() + errs = append(errs, fmt.Errorf("could not describe cluster %v: %w", clusterARN, err)) + mu.Unlock() + return + } + mu.Lock() + clusters = append(clusters, *cluster.ClusterInfo) + mu.Unlock() + }(clusterARN) + } + wg.Wait() + if len(errs) > 0 { + return nil, fmt.Errorf("errors occurred while describing clusters: %v", errs) + } + + return clusters, nil +} + +func (d *MSKDiscovery) listClusters(ctx context.Context) ([]types.Cluster, error) { + var ( + clusters []types.Cluster + nextToken *string + ) + for { + listClustersInput := kafka.ListClustersV2Input{ + ClusterTypeFilter: aws.String("PROVISIONED"), + MaxResults: aws.Int32(100), + NextToken: nextToken, + } + + resp, err := d.msk.ListClustersV2(ctx, &listClustersInput) + if err != nil { + return nil, fmt.Errorf("could not list clusters: %w", err) + } + + clusters = append(clusters, resp.ClusterInfoList...) + if resp.NextToken == nil { + break + } + nextToken = resp.NextToken + } + + return clusters, nil +} + +func (d *MSKDiscovery) listNodes(ctx context.Context, clusterARN string) ([]types.NodeInfo, error) { + var ( + nodes []types.NodeInfo + nextToken *string + ) + for { + resp, err := d.msk.ListNodes(ctx, &kafka.ListNodesInput{ + ClusterArn: aws.String(clusterARN), + MaxResults: aws.Int32(100), + NextToken: nextToken, + }) + if err != nil { + return nil, fmt.Errorf("could not list nodes for cluster %v: %w", clusterARN, err) + } + + nodes = append(nodes, resp.NodeInfoList...) + if resp.NextToken == nil { + break + } + nextToken = resp.NextToken + } + + return nodes, nil +} + +func (d *MSKDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { + err := d.initMskClient(ctx) + if err != nil { + return nil, err + } + + tg := &targetgroup.Group{ + Source: d.cfg.Region, + } + + var clusters []types.Cluster + if len(d.cfg.Clusters) > 0 { + clusters, err = d.describeClusters(ctx, d.cfg.Clusters) + if err != nil { + return nil, err + } + } else { + clusters, err = d.listClusters(ctx) + if err != nil { + return nil, err + } + } + + var ( + targetsMu sync.Mutex + wg sync.WaitGroup + ) + for _, cluster := range clusters { + wg.Add(1) + go func(cluster types.Cluster) { + defer wg.Done() + + nodes, err := d.listNodes(ctx, aws.ToString(cluster.ClusterArn)) + if err != nil { + d.logger.Error("Failed to list nodes", "cluster", aws.ToString(cluster.ClusterName), "error", err) + return + } + + for _, node := range nodes { + labels := model.LabelSet{ + mskLabelClusterName: model.LabelValue(aws.ToString(cluster.ClusterName)), + mskLabelClusterARN: model.LabelValue(aws.ToString(cluster.ClusterArn)), + mskLabelClusterState: model.LabelValue(string(cluster.State)), + mskLabelClusterType: model.LabelValue(string(cluster.ClusterType)), + mskLabelClusterVersion: model.LabelValue(aws.ToString(cluster.CurrentVersion)), + mskLabelNodeARN: model.LabelValue(aws.ToString(node.NodeARN)), + mskLabelNodeAddedTime: model.LabelValue(aws.ToString(node.AddedToClusterTime)), + mskLabelNodeInstanceType: model.LabelValue(aws.ToString(node.InstanceType)), + mskLabelClusterJmxExporterEnabled: model.LabelValue(strconv.FormatBool(*cluster.Provisioned.OpenMonitoring.Prometheus.JmxExporter.EnabledInBroker)), + mskLabelClusterConfigurationARN: model.LabelValue(aws.ToString(cluster.Provisioned.CurrentBrokerSoftwareInfo.ConfigurationArn)), + mskLabelClusterConfigurationRevision: model.LabelValue(strconv.FormatInt(*cluster.Provisioned.CurrentBrokerSoftwareInfo.ConfigurationRevision, 10)), + mskLabelClusterKafkaVersion: model.LabelValue(aws.ToString(cluster.Provisioned.CurrentBrokerSoftwareInfo.KafkaVersion)), + } + + for key, value := range cluster.Tags { + labels[model.LabelName(mskLabelClusterTags+strutil.SanitizeLabelName(key))] = model.LabelValue(value) + } + + switch nodeType(node) { + case NodeTypeBroker: + labels[mskLabelNodeType] = model.LabelValue(NodeTypeBroker) + labels[mskLabelNodeAttachedENI] = model.LabelValue(aws.ToString(node.BrokerNodeInfo.AttachedENIId)) + labels[mskLabelBrokerID] = model.LabelValue(fmt.Sprintf("%.0f", aws.ToFloat64(node.BrokerNodeInfo.BrokerId))) + labels[mskLabelBrokerClientSubnet] = model.LabelValue(aws.ToString(node.BrokerNodeInfo.ClientSubnet)) + labels[mskLabelBrokerClientVPCIP] = model.LabelValue(aws.ToString(node.BrokerNodeInfo.ClientVpcIpAddress)) + labels[mskLabelBrokerNodeExporterEnabled] = model.LabelValue(strconv.FormatBool(*cluster.Provisioned.OpenMonitoring.Prometheus.NodeExporter.EnabledInBroker)) + + for idx, endpoint := range node.BrokerNodeInfo.Endpoints { + endpointLabels := labels.Clone() + endpointLabels[mskLabelBrokerEndpointIndex] = model.LabelValue(strconv.Itoa(idx)) + endpointLabels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(endpoint, strconv.Itoa(d.cfg.Port))) + + targetsMu.Lock() + tg.Targets = append(tg.Targets, endpointLabels) + targetsMu.Unlock() + } + + case NodeTypeController: + labels[mskLabelNodeType] = model.LabelValue(NodeTypeController) + + for idx, endpoint := range node.ControllerNodeInfo.Endpoints { + endpointLabels := labels.Clone() + endpointLabels[mskLabelControllerEndpointIndex] = model.LabelValue(strconv.Itoa(idx)) + endpointLabels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(endpoint, strconv.Itoa(d.cfg.Port))) + + targetsMu.Lock() + tg.Targets = append(tg.Targets, endpointLabels) + targetsMu.Unlock() + } + default: + continue + } + } + }(cluster) + } + wg.Wait() + + return []*targetgroup.Group{tg}, nil +} + +func nodeType(node types.NodeInfo) NodeType { + if node.BrokerNodeInfo != nil { + return NodeTypeBroker + } else if node.ControllerNodeInfo != nil { + return NodeTypeController + } + return "" +} diff --git a/discovery/aws/msk_test.go b/discovery/aws/msk_test.go new file mode 100644 index 0000000000..31744221ef --- /dev/null +++ b/discovery/aws/msk_test.go @@ -0,0 +1,1057 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aws + +import ( + "context" + "fmt" + "sort" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/kafka" + "github.com/aws/aws-sdk-go-v2/service/kafka/types" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +// Struct for test data. +type mskDataStore struct { + region string + clusters []types.Cluster + nodes map[string][]types.NodeInfo // keyed by cluster ARN +} + +func TestMSKDiscoveryListClusters(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + mskData *mskDataStore + expected []types.Cluster + }{ + { + name: "MultipleClusters", + mskData: &mskDataStore{ + region: "us-west-2", + clusters: []types.Cluster{ + { + ClusterName: strptr("test-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + }, + { + ClusterName: strptr("prod-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/prod-cluster/def-456"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + }, + }, + }, + expected: []types.Cluster{ + { + ClusterName: strptr("test-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + }, + { + ClusterName: strptr("prod-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/prod-cluster/def-456"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + }, + }, + }, + { + name: "SingleCluster", + mskData: &mskDataStore{ + region: "us-east-1", + clusters: []types.Cluster{ + { + ClusterName: strptr("single-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-east-1:123456789012:cluster/single-cluster/xyz-789"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + }, + }, + }, + expected: []types.Cluster{ + { + ClusterName: strptr("single-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-east-1:123456789012:cluster/single-cluster/xyz-789"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + }, + }, + }, + { + name: "NoClusters", + mskData: &mskDataStore{ + region: "us-east-1", + clusters: []types.Cluster{}, + }, + expected: nil, + }, + } { + t.Run(tt.name, func(t *testing.T) { + client := newMockMSKClient(tt.mskData) + + d := &MSKDiscovery{ + msk: client, + cfg: &MSKSDConfig{ + Region: tt.mskData.region, + }, + } + + clusters, err := d.listClusters(ctx) + require.NoError(t, err) + require.Equal(t, tt.expected, clusters) + }) + } +} + +func TestMSKDiscoveryDescribeClusters(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + mskData *mskDataStore + clusterARNs []string + expected []types.Cluster + }{ + { + name: "SingleCluster", + mskData: &mskDataStore{ + region: "us-west-2", + clusters: []types.Cluster{ + { + ClusterName: strptr("test-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + CurrentVersion: strptr("1.2.3"), + Tags: map[string]string{ + "Environment": "production", + "Team": "platform", + }, + }, + }, + }, + clusterARNs: []string{"arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"}, + expected: []types.Cluster{ + { + ClusterName: strptr("test-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + CurrentVersion: strptr("1.2.3"), + Tags: map[string]string{ + "Environment": "production", + "Team": "platform", + }, + }, + }, + }, + { + name: "MultipleClusters", + mskData: &mskDataStore{ + region: "us-east-1", + clusters: []types.Cluster{ + { + ClusterName: strptr("cluster-1"), + ClusterArn: strptr("arn:aws:kafka:us-east-1:123456789012:cluster/cluster-1/xyz-789"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + }, + { + ClusterName: strptr("cluster-2"), + ClusterArn: strptr("arn:aws:kafka:us-east-1:123456789012:cluster/cluster-2/def-456"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + Tags: map[string]string{ + "Stage": "prod", + }, + }, + }, + }, + clusterARNs: []string{ + "arn:aws:kafka:us-east-1:123456789012:cluster/cluster-1/xyz-789", + "arn:aws:kafka:us-east-1:123456789012:cluster/cluster-2/def-456", + }, + expected: []types.Cluster{ + { + ClusterName: strptr("cluster-1"), + ClusterArn: strptr("arn:aws:kafka:us-east-1:123456789012:cluster/cluster-1/xyz-789"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + }, + { + ClusterName: strptr("cluster-2"), + ClusterArn: strptr("arn:aws:kafka:us-east-1:123456789012:cluster/cluster-2/def-456"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + Tags: map[string]string{ + "Stage": "prod", + }, + }, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + client := newMockMSKClient(tt.mskData) + + d := &MSKDiscovery{ + msk: client, + cfg: &MSKSDConfig{ + Region: tt.mskData.region, + }, + } + + clusters, err := d.describeClusters(ctx, tt.clusterARNs) + require.NoError(t, err) + + // Sort clusters by ARN to handle non-deterministic ordering from goroutines + sort.Slice(clusters, func(i, j int) bool { + return aws.ToString(clusters[i].ClusterArn) < aws.ToString(clusters[j].ClusterArn) + }) + sort.Slice(tt.expected, func(i, j int) bool { + return aws.ToString(tt.expected[i].ClusterArn) < aws.ToString(tt.expected[j].ClusterArn) + }) + + require.Equal(t, tt.expected, clusters) + }) + } +} + +func TestMSKDiscoveryListNodes(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + mskData *mskDataStore + clusterARN string + expected []types.NodeInfo + }{ + { + name: "ClusterWithBrokers", + mskData: &mskDataStore{ + region: "us-west-2", + nodes: map[string][]types.NodeInfo{ + "arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123": { + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"), + AddedToClusterTime: strptr("2023-01-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(1), + ClientSubnet: strptr("subnet-12345"), + ClientVpcIpAddress: strptr("10.0.1.100"), + Endpoints: []string{"b-1.test-cluster.abc123.kafka.us-west-2.amazonaws.com"}, + AttachedENIId: strptr("eni-12345"), + }, + }, + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-2"), + AddedToClusterTime: strptr("2023-01-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(2), + ClientSubnet: strptr("subnet-67890"), + ClientVpcIpAddress: strptr("10.0.1.101"), + Endpoints: []string{"b-2.test-cluster.abc123.kafka.us-west-2.amazonaws.com"}, + AttachedENIId: strptr("eni-67890"), + }, + }, + }, + }, + }, + clusterARN: "arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123", + expected: []types.NodeInfo{ + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"), + AddedToClusterTime: strptr("2023-01-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(1), + ClientSubnet: strptr("subnet-12345"), + ClientVpcIpAddress: strptr("10.0.1.100"), + Endpoints: []string{"b-1.test-cluster.abc123.kafka.us-west-2.amazonaws.com"}, + AttachedENIId: strptr("eni-12345"), + }, + }, + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-2"), + AddedToClusterTime: strptr("2023-01-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(2), + ClientSubnet: strptr("subnet-67890"), + ClientVpcIpAddress: strptr("10.0.1.101"), + Endpoints: []string{"b-2.test-cluster.abc123.kafka.us-west-2.amazonaws.com"}, + AttachedENIId: strptr("eni-67890"), + }, + }, + }, + }, + { + name: "ClusterWithNoNodes", + mskData: &mskDataStore{ + region: "us-west-2", + nodes: map[string][]types.NodeInfo{ + "arn:aws:kafka:us-west-2:123456789012:cluster/empty-cluster/xyz-789": {}, + }, + }, + clusterARN: "arn:aws:kafka:us-west-2:123456789012:cluster/empty-cluster/xyz-789", + expected: nil, + }, + } { + t.Run(tt.name, func(t *testing.T) { + client := newMockMSKClient(tt.mskData) + + d := &MSKDiscovery{ + msk: client, + cfg: &MSKSDConfig{ + Region: tt.mskData.region, + }, + } + + nodes, err := d.listNodes(ctx, tt.clusterARN) + require.NoError(t, err) + require.Equal(t, tt.expected, nodes) + }) + } +} + +func TestMSKDiscoveryRefresh(t *testing.T) { + ctx := context.Background() + + tests := []struct { + name string + mskData *mskDataStore + config *MSKSDConfig + expected []*targetgroup.Group + }{ + { + name: "ClusterWithBrokersUsingClustersConfig", + mskData: &mskDataStore{ + region: "us-west-2", + clusters: []types.Cluster{ + { + ClusterName: strptr("test-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + CurrentVersion: strptr("1.2.3"), + Tags: map[string]string{ + "Environment": "production", + "Team": "platform", + }, + Provisioned: &types.Provisioned{ + CurrentBrokerSoftwareInfo: &types.BrokerSoftwareInfo{ + ConfigurationArn: strptr("arn:aws:kafka:us-west-2:123456789012:configuration/my-config/abc-123"), + ConfigurationRevision: aws.Int64(1), + KafkaVersion: strptr("2.8.1"), + }, + OpenMonitoring: &types.OpenMonitoringInfo{ + Prometheus: &types.PrometheusInfo{ + JmxExporter: &types.JmxExporterInfo{ + EnabledInBroker: aws.Bool(true), + }, + NodeExporter: &types.NodeExporterInfo{ + EnabledInBroker: aws.Bool(true), + }, + }, + }, + }, + }, + }, + nodes: map[string][]types.NodeInfo{ + "arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123": { + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"), + AddedToClusterTime: strptr("2023-01-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(1), + ClientSubnet: strptr("subnet-12345"), + ClientVpcIpAddress: strptr("10.0.1.100"), + Endpoints: []string{"b-1.test-cluster.abc123.kafka.us-west-2.amazonaws.com"}, + AttachedENIId: strptr("eni-12345"), + }, + }, + }, + }, + }, + config: &MSKSDConfig{ + Region: "us-west-2", + Port: 80, + Clusters: []string{"arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"}, + }, + expected: []*targetgroup.Group{ + { + Source: "us-west-2", + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("b-1.test-cluster.abc123.kafka.us-west-2.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("test-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("1.2.3"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:configuration/my-config/abc-123"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("1"), + "__meta_msk_cluster_kafka_version": model.LabelValue("2.8.1"), + "__meta_msk_cluster_tag_Environment": model.LabelValue("production"), + "__meta_msk_cluster_tag_Team": model.LabelValue("platform"), + "__meta_msk_node_type": model.LabelValue("BROKER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:node/broker-1"), + "__meta_msk_node_added_time": model.LabelValue("2023-01-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.large"), + "__meta_msk_node_attached_eni": model.LabelValue("eni-12345"), + "__meta_msk_broker_id": model.LabelValue("1"), + "__meta_msk_broker_client_subnet": model.LabelValue("subnet-12345"), + "__meta_msk_broker_client_vpc_ip": model.LabelValue("10.0.1.100"), + "__meta_msk_broker_node_exporter_enabled": model.LabelValue("true"), + "__meta_msk_broker_endpoint_index": model.LabelValue("0"), + }, + }, + }, + }, + }, + { + name: "NoClustersWithEmptyClustersConfig", + mskData: &mskDataStore{ + region: "us-east-1", + clusters: []types.Cluster{}, + }, + config: &MSKSDConfig{ + Region: "us-east-1", + Port: 80, + Clusters: []string{}, // Empty clusters list uses listClusters + }, + expected: []*targetgroup.Group{ + { + Source: "us-east-1", + }, + }, + }, + { + name: "ClusterWithBrokersUsingListClusters", + mskData: &mskDataStore{ + region: "us-west-2", + clusters: []types.Cluster{ + { + ClusterName: strptr("auto-discovered-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/auto-discovered-cluster/xyz-123"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + CurrentVersion: strptr("1.0.0"), + Provisioned: &types.Provisioned{ + CurrentBrokerSoftwareInfo: &types.BrokerSoftwareInfo{ + ConfigurationArn: strptr("arn:aws:kafka:us-west-2:123456789012:configuration/config/xyz"), + ConfigurationRevision: aws.Int64(1), + KafkaVersion: strptr("3.3.1"), + }, + OpenMonitoring: &types.OpenMonitoringInfo{ + Prometheus: &types.PrometheusInfo{ + JmxExporter: &types.JmxExporterInfo{ + EnabledInBroker: aws.Bool(true), + }, + NodeExporter: &types.NodeExporterInfo{ + EnabledInBroker: aws.Bool(true), + }, + }, + }, + }, + }, + }, + nodes: map[string][]types.NodeInfo{ + "arn:aws:kafka:us-west-2:123456789012:cluster/auto-discovered-cluster/xyz-123": { + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-auto"), + AddedToClusterTime: strptr("2023-01-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(1), + ClientSubnet: strptr("subnet-auto"), + ClientVpcIpAddress: strptr("10.0.1.200"), + Endpoints: []string{"b-auto.cluster.kafka.us-west-2.amazonaws.com"}, + AttachedENIId: strptr("eni-auto"), + }, + }, + }, + }, + }, + config: &MSKSDConfig{ + Region: "us-west-2", + Port: 80, + Clusters: nil, // nil clusters list uses listClusters (backward compatibility) + }, + expected: []*targetgroup.Group{ + { + Source: "us-west-2", + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("b-auto.cluster.kafka.us-west-2.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("auto-discovered-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:cluster/auto-discovered-cluster/xyz-123"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("1.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:configuration/config/xyz"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("1"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.3.1"), + "__meta_msk_node_type": model.LabelValue("BROKER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:node/broker-auto"), + "__meta_msk_node_added_time": model.LabelValue("2023-01-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.large"), + "__meta_msk_node_attached_eni": model.LabelValue("eni-auto"), + "__meta_msk_broker_id": model.LabelValue("1"), + "__meta_msk_broker_client_subnet": model.LabelValue("subnet-auto"), + "__meta_msk_broker_client_vpc_ip": model.LabelValue("10.0.1.200"), + "__meta_msk_broker_node_exporter_enabled": model.LabelValue("true"), + "__meta_msk_broker_endpoint_index": model.LabelValue("0"), + }, + }, + }, + }, + }, + { + name: "ClusterWithBrokersAndControllersUsingClustersConfig", + mskData: &mskDataStore{ + region: "us-west-2", + clusters: []types.Cluster{ + { + ClusterName: strptr("kraft-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/kraft-cluster/xyz-789"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + CurrentVersion: strptr("1.0.0"), + Tags: map[string]string{ + "Type": "kraft", + }, + Provisioned: &types.Provisioned{ + CurrentBrokerSoftwareInfo: &types.BrokerSoftwareInfo{ + ConfigurationArn: strptr("arn:aws:kafka:us-west-2:123456789012:configuration/config/xyz"), + ConfigurationRevision: aws.Int64(2), + KafkaVersion: strptr("3.3.1"), + }, + OpenMonitoring: &types.OpenMonitoringInfo{ + Prometheus: &types.PrometheusInfo{ + JmxExporter: &types.JmxExporterInfo{ + EnabledInBroker: aws.Bool(true), + }, + NodeExporter: &types.NodeExporterInfo{ + EnabledInBroker: aws.Bool(false), + }, + }, + }, + }, + }, + }, + nodes: map[string][]types.NodeInfo{ + "arn:aws:kafka:us-west-2:123456789012:cluster/kraft-cluster/xyz-789": { + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"), + AddedToClusterTime: strptr("2023-06-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(1), + ClientSubnet: strptr("subnet-abc123"), + ClientVpcIpAddress: strptr("10.0.2.100"), + Endpoints: []string{"b-1.kraft-cluster.xyz789.kafka.us-west-2.amazonaws.com"}, + AttachedENIId: strptr("eni-broker-1"), + }, + }, + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-2"), + AddedToClusterTime: strptr("2023-06-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(2), + ClientSubnet: strptr("subnet-abc124"), + ClientVpcIpAddress: strptr("10.0.2.101"), + Endpoints: []string{"b-2.kraft-cluster.xyz789.kafka.us-west-2.amazonaws.com"}, + AttachedENIId: strptr("eni-broker-2"), + }, + }, + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/controller-1"), + AddedToClusterTime: strptr("2023-06-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + ControllerNodeInfo: &types.ControllerNodeInfo{ + Endpoints: []string{"c-1.kraft-cluster.xyz789.kafka.us-west-2.amazonaws.com"}, + }, + }, + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/controller-2"), + AddedToClusterTime: strptr("2023-06-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + ControllerNodeInfo: &types.ControllerNodeInfo{ + Endpoints: []string{"c-2.kraft-cluster.xyz789.kafka.us-west-2.amazonaws.com"}, + }, + }, + }, + }, + }, + config: &MSKSDConfig{ + Region: "us-west-2", + Port: 80, + Clusters: []string{"arn:aws:kafka:us-west-2:123456789012:cluster/kraft-cluster/xyz-789"}, + }, + expected: []*targetgroup.Group{ + { + Source: "us-west-2", + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("b-1.kraft-cluster.xyz789.kafka.us-west-2.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("kraft-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:cluster/kraft-cluster/xyz-789"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("1.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:configuration/config/xyz"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("2"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.3.1"), + "__meta_msk_cluster_tag_Type": model.LabelValue("kraft"), + "__meta_msk_node_type": model.LabelValue("BROKER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:node/broker-1"), + "__meta_msk_node_added_time": model.LabelValue("2023-06-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.large"), + "__meta_msk_node_attached_eni": model.LabelValue("eni-broker-1"), + "__meta_msk_broker_id": model.LabelValue("1"), + "__meta_msk_broker_client_subnet": model.LabelValue("subnet-abc123"), + "__meta_msk_broker_client_vpc_ip": model.LabelValue("10.0.2.100"), + "__meta_msk_broker_node_exporter_enabled": model.LabelValue("false"), + "__meta_msk_broker_endpoint_index": model.LabelValue("0"), + }, + { + model.AddressLabel: model.LabelValue("b-2.kraft-cluster.xyz789.kafka.us-west-2.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("kraft-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:cluster/kraft-cluster/xyz-789"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("1.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:configuration/config/xyz"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("2"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.3.1"), + "__meta_msk_cluster_tag_Type": model.LabelValue("kraft"), + "__meta_msk_node_type": model.LabelValue("BROKER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:node/broker-2"), + "__meta_msk_node_added_time": model.LabelValue("2023-06-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.large"), + "__meta_msk_node_attached_eni": model.LabelValue("eni-broker-2"), + "__meta_msk_broker_id": model.LabelValue("2"), + "__meta_msk_broker_client_subnet": model.LabelValue("subnet-abc124"), + "__meta_msk_broker_client_vpc_ip": model.LabelValue("10.0.2.101"), + "__meta_msk_broker_node_exporter_enabled": model.LabelValue("false"), + "__meta_msk_broker_endpoint_index": model.LabelValue("0"), + }, + { + model.AddressLabel: model.LabelValue("c-1.kraft-cluster.xyz789.kafka.us-west-2.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("kraft-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:cluster/kraft-cluster/xyz-789"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("1.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:configuration/config/xyz"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("2"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.3.1"), + "__meta_msk_cluster_tag_Type": model.LabelValue("kraft"), + "__meta_msk_node_type": model.LabelValue("CONTROLLER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:node/controller-1"), + "__meta_msk_node_added_time": model.LabelValue("2023-06-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.large"), + "__meta_msk_controller_endpoint_index": model.LabelValue("0"), + }, + { + model.AddressLabel: model.LabelValue("c-2.kraft-cluster.xyz789.kafka.us-west-2.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("kraft-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:cluster/kraft-cluster/xyz-789"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("1.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:configuration/config/xyz"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("2"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.3.1"), + "__meta_msk_cluster_tag_Type": model.LabelValue("kraft"), + "__meta_msk_node_type": model.LabelValue("CONTROLLER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-west-2:123456789012:node/controller-2"), + "__meta_msk_node_added_time": model.LabelValue("2023-06-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.large"), + "__meta_msk_controller_endpoint_index": model.LabelValue("0"), + }, + }, + }, + }, + }, + { + name: "NodesWithMultipleEndpointsUsingClustersConfig", + mskData: &mskDataStore{ + region: "us-east-1", + clusters: []types.Cluster{ + { + ClusterName: strptr("multi-endpoint-cluster"), + ClusterArn: strptr("arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"), + State: types.ClusterStateActive, + ClusterType: types.ClusterTypeProvisioned, + CurrentVersion: strptr("2.0.0"), + Provisioned: &types.Provisioned{ + CurrentBrokerSoftwareInfo: &types.BrokerSoftwareInfo{ + ConfigurationArn: strptr("arn:aws:kafka:us-east-1:123456789012:configuration/config/abc"), + ConfigurationRevision: aws.Int64(1), + KafkaVersion: strptr("3.4.0"), + }, + OpenMonitoring: &types.OpenMonitoringInfo{ + Prometheus: &types.PrometheusInfo{ + JmxExporter: &types.JmxExporterInfo{ + EnabledInBroker: aws.Bool(true), + }, + NodeExporter: &types.NodeExporterInfo{ + EnabledInBroker: aws.Bool(true), + }, + }, + }, + }, + }, + }, + nodes: map[string][]types.NodeInfo{ + "arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999": { + { + NodeARN: strptr("arn:aws:kafka:us-east-1:123456789012:node/broker-multi"), + AddedToClusterTime: strptr("2023-08-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.xlarge"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(3), + ClientSubnet: strptr("subnet-multi-1"), + ClientVpcIpAddress: strptr("10.0.3.50"), + // Multiple endpoints for this broker + Endpoints: []string{"b-3-1.cluster.kafka.us-east-1.amazonaws.com", "b-3-2.cluster.kafka.us-east-1.amazonaws.com", "b-3-3.cluster.kafka.us-east-1.amazonaws.com"}, + AttachedENIId: strptr("eni-multi-broker"), + }, + }, + { + NodeARN: strptr("arn:aws:kafka:us-east-1:123456789012:node/controller-multi"), + AddedToClusterTime: strptr("2023-08-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + ControllerNodeInfo: &types.ControllerNodeInfo{ + // Multiple endpoints for this controller + Endpoints: []string{"c-1-1.cluster.kafka.us-east-1.amazonaws.com", "c-1-2.cluster.kafka.us-east-1.amazonaws.com", "c-1-3.cluster.kafka.us-east-1.amazonaws.com", "c-1-4.cluster.kafka.us-east-1.amazonaws.com"}, + }, + }, + }, + }, + }, + config: &MSKSDConfig{ + Region: "us-east-1", + Port: 80, + Clusters: []string{"arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"}, + }, + expected: []*targetgroup.Group{ + { + Source: "us-east-1", + Targets: []model.LabelSet{ + // Broker with 3 endpoints - creates 3 targets with different endpoint indices + { + model.AddressLabel: model.LabelValue("b-3-1.cluster.kafka.us-east-1.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("multi-endpoint-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("2.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:configuration/config/abc"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("1"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.4.0"), + "__meta_msk_node_type": model.LabelValue("BROKER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:node/broker-multi"), + "__meta_msk_node_added_time": model.LabelValue("2023-08-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.xlarge"), + "__meta_msk_node_attached_eni": model.LabelValue("eni-multi-broker"), + "__meta_msk_broker_id": model.LabelValue("3"), + "__meta_msk_broker_client_subnet": model.LabelValue("subnet-multi-1"), + "__meta_msk_broker_client_vpc_ip": model.LabelValue("10.0.3.50"), + "__meta_msk_broker_node_exporter_enabled": model.LabelValue("true"), + "__meta_msk_broker_endpoint_index": model.LabelValue("0"), + }, + { + model.AddressLabel: model.LabelValue("b-3-2.cluster.kafka.us-east-1.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("multi-endpoint-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("2.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:configuration/config/abc"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("1"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.4.0"), + "__meta_msk_node_type": model.LabelValue("BROKER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:node/broker-multi"), + "__meta_msk_node_added_time": model.LabelValue("2023-08-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.xlarge"), + "__meta_msk_node_attached_eni": model.LabelValue("eni-multi-broker"), + "__meta_msk_broker_id": model.LabelValue("3"), + "__meta_msk_broker_client_subnet": model.LabelValue("subnet-multi-1"), + "__meta_msk_broker_client_vpc_ip": model.LabelValue("10.0.3.50"), + "__meta_msk_broker_node_exporter_enabled": model.LabelValue("true"), + "__meta_msk_broker_endpoint_index": model.LabelValue("1"), + }, + { + model.AddressLabel: model.LabelValue("b-3-3.cluster.kafka.us-east-1.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("multi-endpoint-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("2.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:configuration/config/abc"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("1"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.4.0"), + "__meta_msk_node_type": model.LabelValue("BROKER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:node/broker-multi"), + "__meta_msk_node_added_time": model.LabelValue("2023-08-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.xlarge"), + "__meta_msk_node_attached_eni": model.LabelValue("eni-multi-broker"), + "__meta_msk_broker_id": model.LabelValue("3"), + "__meta_msk_broker_client_subnet": model.LabelValue("subnet-multi-1"), + "__meta_msk_broker_client_vpc_ip": model.LabelValue("10.0.3.50"), + "__meta_msk_broker_node_exporter_enabled": model.LabelValue("true"), + "__meta_msk_broker_endpoint_index": model.LabelValue("2"), + }, + // Controller with 4 endpoints - creates 4 targets with different endpoint indices + { + model.AddressLabel: model.LabelValue("c-1-1.cluster.kafka.us-east-1.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("multi-endpoint-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("2.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:configuration/config/abc"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("1"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.4.0"), + "__meta_msk_node_type": model.LabelValue("CONTROLLER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:node/controller-multi"), + "__meta_msk_node_added_time": model.LabelValue("2023-08-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.large"), + "__meta_msk_controller_endpoint_index": model.LabelValue("0"), + }, + { + model.AddressLabel: model.LabelValue("c-1-2.cluster.kafka.us-east-1.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("multi-endpoint-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("2.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:configuration/config/abc"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("1"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.4.0"), + "__meta_msk_node_type": model.LabelValue("CONTROLLER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:node/controller-multi"), + "__meta_msk_node_added_time": model.LabelValue("2023-08-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.large"), + "__meta_msk_controller_endpoint_index": model.LabelValue("1"), + }, + { + model.AddressLabel: model.LabelValue("c-1-3.cluster.kafka.us-east-1.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("multi-endpoint-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("2.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:configuration/config/abc"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("1"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.4.0"), + "__meta_msk_node_type": model.LabelValue("CONTROLLER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:node/controller-multi"), + "__meta_msk_node_added_time": model.LabelValue("2023-08-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.large"), + "__meta_msk_controller_endpoint_index": model.LabelValue("2"), + }, + { + model.AddressLabel: model.LabelValue("c-1-4.cluster.kafka.us-east-1.amazonaws.com:80"), + "__meta_msk_cluster_name": model.LabelValue("multi-endpoint-cluster"), + "__meta_msk_cluster_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"), + "__meta_msk_cluster_state": model.LabelValue("ACTIVE"), + "__meta_msk_cluster_type": model.LabelValue("PROVISIONED"), + "__meta_msk_cluster_version": model.LabelValue("2.0.0"), + "__meta_msk_cluster_jmx_exporter_enabled": model.LabelValue("true"), + "__meta_msk_cluster_configuration_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:configuration/config/abc"), + "__meta_msk_cluster_configuration_revision": model.LabelValue("1"), + "__meta_msk_cluster_kafka_version": model.LabelValue("3.4.0"), + "__meta_msk_node_type": model.LabelValue("CONTROLLER"), + "__meta_msk_node_arn": model.LabelValue("arn:aws:kafka:us-east-1:123456789012:node/controller-multi"), + "__meta_msk_node_added_time": model.LabelValue("2023-08-01T00:00:00Z"), + "__meta_msk_node_instance_type": model.LabelValue("kafka.m5.large"), + "__meta_msk_controller_endpoint_index": model.LabelValue("3"), + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client := newMockMSKClient(tt.mskData) + + config := tt.config + if config == nil { + // Default config for backward compatibility + config = &MSKSDConfig{ + Region: tt.mskData.region, + Port: 80, + } + } + + d := &MSKDiscovery{ + msk: client, + cfg: config, + } + + groups, err := d.refresh(ctx) + require.NoError(t, err) + + // Sort targets within each group by address to handle non-deterministic ordering from goroutines + for _, group := range groups { + if len(group.Targets) > 0 { + sort.Slice(group.Targets, func(i, j int) bool { + return string(group.Targets[i][model.AddressLabel]) < string(group.Targets[j][model.AddressLabel]) + }) + } + } + for _, group := range tt.expected { + if len(group.Targets) > 0 { + sort.Slice(group.Targets, func(i, j int) bool { + return string(group.Targets[i][model.AddressLabel]) < string(group.Targets[j][model.AddressLabel]) + }) + } + } + + require.Equal(t, tt.expected, groups) + }) + } +} + +func TestNodeType(t *testing.T) { + tests := []struct { + name string + node types.NodeInfo + expected NodeType + }{ + { + name: "BrokerNode", + node: types.NodeInfo{ + BrokerNodeInfo: &types.BrokerNodeInfo{}, + }, + expected: NodeTypeBroker, + }, + { + name: "ControllerNode", + node: types.NodeInfo{ + ControllerNodeInfo: &types.ControllerNodeInfo{}, + }, + expected: NodeTypeController, + }, + { + name: "UnknownNode", + node: types.NodeInfo{}, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := nodeType(tt.node) + require.Equal(t, tt.expected, result) + }) + } +} + +// MSK client mock. +type mockMSKClient struct { + mskData mskDataStore +} + +func newMockMSKClient(mskData *mskDataStore) *mockMSKClient { + return &mockMSKClient{ + mskData: *mskData, + } +} + +func (m *mockMSKClient) DescribeClusterV2(_ context.Context, input *kafka.DescribeClusterV2Input, _ ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error) { + inputARN := aws.ToString(input.ClusterArn) + for i := range m.mskData.clusters { + cluster := &m.mskData.clusters[i] + if aws.ToString(cluster.ClusterArn) == inputARN { + return &kafka.DescribeClusterV2Output{ + ClusterInfo: cluster, + }, nil + } + } + + return nil, fmt.Errorf("cluster not found: %s", inputARN) +} + +func (m *mockMSKClient) ListClustersV2(_ context.Context, input *kafka.ListClustersV2Input, _ ...func(*kafka.Options)) (*kafka.ListClustersV2Output, error) { + var clusters []types.Cluster + + for _, cluster := range m.mskData.clusters { + // Apply cluster name filter if specified + if input.ClusterNameFilter != nil && *input.ClusterNameFilter != "" { + if cluster.ClusterName != nil && *cluster.ClusterName != *input.ClusterNameFilter { + continue + } + } + + // Apply cluster type filter if specified + if input.ClusterTypeFilter != nil && *input.ClusterTypeFilter != "" { + if string(cluster.ClusterType) != *input.ClusterTypeFilter { + continue + } + } + + clusters = append(clusters, cluster) + } + + return &kafka.ListClustersV2Output{ + ClusterInfoList: clusters, + }, nil +} + +func (m *mockMSKClient) ListNodes(_ context.Context, input *kafka.ListNodesInput, _ ...func(*kafka.Options)) (*kafka.ListNodesOutput, error) { + clusterARN := aws.ToString(input.ClusterArn) + nodes, exists := m.mskData.nodes[clusterARN] + if !exists { + return &kafka.ListNodesOutput{ + NodeInfoList: nil, + }, nil + } + + return &kafka.ListNodesOutput{ + NodeInfoList: nodes, + }, nil +} diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 1f2f9931e8..bac04efb53 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -984,11 +984,56 @@ The following meta labels are available on targets during [relabeling](#relabel_ * `__meta_ecs_tag_task_`: each task tag value, keyed by tag name * `__meta_ecs_tag_ec2_`: each EC2 instance tag value, keyed by tag name (EC2 launch type only) +#### `msk` + +The `msk` role discovers targets from AWS MSK (Managed Streaming for Apache Kafka) provisioned clusters. + +**Important**: This service discovery only works with **provisioned clusters**. Serverless clusters are not supported as they do not expose individual broker nodes. + +Discovery includes: +- **Broker nodes**: Kafka broker instances (supports both ZooKeeper-based and KRaft-based clusters) +- **KRaft Controller nodes**: Controller instances (KRaft-based clusters only) + +Note: ZooKeeper nodes are not discoverable via the MSK API. For monitoring, MSK provides: +- **JMX Exporter**: Available on both broker and KRaft controller nodes (when enabled) +- **Node Exporter**: Available on broker nodes only (when enabled) + +The IAM credentials used must have the following permissions to discover +scrape targets: + +- `kafka:DescribeClusterV2` +- `kafka:ListClustersV2` +- `kafka:ListNodes` + +The following meta labels are available on targets during [relabeling](#relabel_config): + +* `__meta_msk_cluster_name`: the name of the MSK cluster +* `__meta_msk_cluster_arn`: the ARN of the MSK cluster +* `__meta_msk_cluster_state`: the state of the MSK cluster (e.g., ACTIVE, CREATING, DELETING) +* `__meta_msk_cluster_type`: the type of the MSK cluster (e.g., PROVISIONED, SERVERLESS) +* `__meta_msk_cluster_version`: the current version of the MSK cluster +* `__meta_msk_cluster_kafka_version`: the Kafka version running on the cluster +* `__meta_msk_cluster_jmx_exporter_enabled`: whether JMX exporter is enabled on the cluster +* `__meta_msk_cluster_configuration_arn`: the ARN of the MSK configuration +* `__meta_msk_cluster_configuration_revision`: the revision of the MSK configuration +* `__meta_msk_cluster_tag_`: each cluster tag value, keyed by tag name +* `__meta_msk_node_type`: the type of the node (BROKER or CONTROLLER) +* `__meta_msk_node_arn`: the ARN of the node +* `__meta_msk_node_added_time`: the time the node was added to the cluster +* `__meta_msk_node_instance_type`: the instance type of the node +* `__meta_msk_node_attached_eni`: the ID of the attached ENI +* `__meta_msk_broker_id`: the broker ID (broker nodes only) +* `__meta_msk_broker_endpoint_index`: the index of the broker endpoint (broker nodes only) +* `__meta_msk_broker_client_subnet`: the client subnet of the broker (broker nodes only) +* `__meta_msk_broker_client_vpc_ip`: the VPC IP address of the broker (broker nodes only) +* `__meta_msk_broker_node_exporter_enabled`: whether node exporter is enabled on brokers (broker nodes only) +* `__meta_msk_controller_endpoint_index`: the index of the controller endpoint (controller nodes only) + See below for the configuration options for AWS discovery: ```yaml # The AWS role to use for service discovery. -# Must be one of: ec2, lightsail, or ecs. +# Must be one of: ec2, lightsail, ecs, or msk. role: # The AWS region. If blank, the region from the instance metadata is used. @@ -1024,7 +1069,7 @@ filters: [ - name: values: , [...] ] -# List of ECS cluster ARNs to discover (ecs role only). If empty, all clusters in the region are discovered. +# List of ECS or MSK cluster ARNs (ecs and msk roles only) to discover. If empty, all clusters in the region are discovered. # This can significantly improve performance when you only need to monitor specific clusters. [ clusters: [, ...] ] diff --git a/go.mod b/go.mod index afc3f2740d..231d7af077 100644 --- a/go.mod +++ b/go.mod @@ -11,11 +11,12 @@ require ( github.com/KimMachineGun/automemlimit v0.7.5 github.com/alecthomas/kingpin/v2 v2.4.0 github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b - github.com/aws/aws-sdk-go-v2 v1.41.0 + github.com/aws/aws-sdk-go-v2 v1.41.1 github.com/aws/aws-sdk-go-v2/config v1.32.6 github.com/aws/aws-sdk-go-v2/credentials v1.19.6 github.com/aws/aws-sdk-go-v2/service/ec2 v1.279.0 github.com/aws/aws-sdk-go-v2/service/ecs v1.70.0 + github.com/aws/aws-sdk-go-v2/service/kafka v1.46.7 github.com/aws/aws-sdk-go-v2/service/lightsail v1.50.10 github.com/aws/aws-sdk-go-v2/service/sts v1.41.5 github.com/aws/smithy-go v1.24.0 @@ -127,8 +128,8 @@ require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16 - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16 // indirect diff --git a/go.sum b/go.sum index 6ac2105275..a979afa128 100644 --- a/go.sum +++ b/go.sum @@ -47,18 +47,18 @@ github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJ github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= -github.com/aws/aws-sdk-go-v2 v1.41.0 h1:tNvqh1s+v0vFYdA1xq0aOJH+Y5cRyZ5upu6roPgPKd4= -github.com/aws/aws-sdk-go-v2 v1.41.0/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0= +github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU= +github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0= github.com/aws/aws-sdk-go-v2/config v1.32.6 h1:hFLBGUKjmLAekvi1evLi5hVvFQtSo3GYwi+Bx4lpJf8= github.com/aws/aws-sdk-go-v2/config v1.32.6/go.mod h1:lcUL/gcd8WyjCrMnxez5OXkO3/rwcNmvfno62tnXNcI= github.com/aws/aws-sdk-go-v2/credentials v1.19.6 h1:F9vWao2TwjV2MyiyVS+duza0NIRtAslgLUM0vTA1ZaE= github.com/aws/aws-sdk-go-v2/credentials v1.19.6/go.mod h1:SgHzKjEVsdQr6Opor0ihgWtkWdfRAIwxYzSJ8O85VHY= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16 h1:80+uETIWS1BqjnN9uJ0dBUaETh+P1XwFy5vwHwK5r9k= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.16/go.mod h1:wOOsYuxYuB/7FlnVtzeBYRcjSRtQpAW0hCP7tIULMwo= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16 h1:rgGwPzb82iBYSvHMHXc8h9mRoOUBZIGFgKb9qniaZZc= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.16/go.mod h1:L/UxsGeKpGoIj6DxfhOWHWQ/kGKcd4I1VncE4++IyKA= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16 h1:1jtGzuV7c82xnqOVfx2F0xmJcOw5374L7N6juGW6x6U= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.16/go.mod h1:M2E5OQf+XLe+SZGmmpaI2yy+J326aFf6/+54PoxSANc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 h1:xOLELNKGp2vsiteLsvLPwxC+mYmO6OZ8PYgiuPJzF8U= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17/go.mod h1:5M5CI3D12dNOtH3/mk6minaRwI2/37ifCURZISxA/IQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 h1:WWLqlh79iO48yLkj1v3ISRNiv+3KdQoZ6JWyfcsyQik= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17/go.mod h1:EhG22vHRrvF8oXSTYStZhJc1aUgKtnJe+aOiFEV90cM= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= github.com/aws/aws-sdk-go-v2/service/ec2 v1.279.0 h1:o7eJKe6VYAnqERPlLAvDW5VKXV6eTKv1oxTpMoDP378= @@ -69,6 +69,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 h1:0ryTNEd github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4/go.mod h1:HQ4qwNZh32C3CBeO6iJLQlgtMzqeG17ziAA/3KDJFow= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16 h1:oHjJHeUy0ImIV0bsrX0X91GkV5nJAyv1l1CC9lnO0TI= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.16/go.mod h1:iRSNGgOYmiYwSCXxXaKb9HfOEj40+oTKn8pTxMlYkRM= +github.com/aws/aws-sdk-go-v2/service/kafka v1.46.7 h1:0jDb9b505gbCmtjH1RT7kx8hDbVDzOhnTeZm7dzskpQ= +github.com/aws/aws-sdk-go-v2/service/kafka v1.46.7/go.mod h1:tWnHS64fg5ydLHivFlCAtEh/1iMNzr56QsH3F+UTwD4= github.com/aws/aws-sdk-go-v2/service/lightsail v1.50.10 h1:MQuZZ6Tq1qQabPlkVxrCMdyVl70Ogl4AERZKo+y9Wzo= github.com/aws/aws-sdk-go-v2/service/lightsail v1.50.10/go.mod h1:U5C3JME1ibKESmpzBAqlRpTYZfVbTqrb5ICJm+sVVd8= github.com/aws/aws-sdk-go-v2/service/signin v1.0.4 h1:HpI7aMmJ+mm1wkSHIA2t5EaFFv5EFYXePW30p1EIrbQ=