From 92c332bd56dc7b63aaa00b486a08b20a50549950 Mon Sep 17 00:00:00 2001 From: Devesh Kumar Date: Mon, 8 Sep 2025 22:26:55 +0200 Subject: [PATCH 1/2] feat(kubernetes): add support for ServiceExternalTrafficPolicyTypeLocal - Implement node filtering for Local external traffic policy in HTTP, TCP, and UDP services - Filter nodes to only include those with ready endpoints when externalTrafficPolicy is Local - Add EndpointSlice integration to determine node readiness for Local traffic policy - Support multi-protocol NodePort services with Local external traffic policy - Add comprehensive test cases covering HTTP, TCP, and UDP scenarios - Ensure non-ready endpoints are excluded from load balancer configuration This enables proper load balancing behavior for Kubernetes services with externalTrafficPolicy set to Local, ensuring traffic is only routed to nodes that have ready endpoints for the service. --- ...e_port_service_lb_local_traffic_policy.yml | 92 ++++++++++++ ...e_port_service_lb_local_traffic_policy.yml | 91 ++++++++++++ ...with_node_port_lb_local_traffic_policy.yml | 93 ++++++++++++ .../kubernetes/crd/kubernetes_http.go | 33 ++++- pkg/provider/kubernetes/crd/kubernetes_tcp.go | 32 ++++- .../kubernetes/crd/kubernetes_test.go | 133 ++++++++++++++++++ pkg/provider/kubernetes/crd/kubernetes_udp.go | 33 ++++- 7 files changed, 498 insertions(+), 9 deletions(-) create mode 100644 pkg/provider/kubernetes/crd/fixtures/tcp/with_node_port_service_lb_local_traffic_policy.yml create mode 100644 pkg/provider/kubernetes/crd/fixtures/udp/with_node_port_service_lb_local_traffic_policy.yml create mode 100644 pkg/provider/kubernetes/crd/fixtures/with_node_port_lb_local_traffic_policy.yml diff --git a/pkg/provider/kubernetes/crd/fixtures/tcp/with_node_port_service_lb_local_traffic_policy.yml b/pkg/provider/kubernetes/crd/fixtures/tcp/with_node_port_service_lb_local_traffic_policy.yml new file mode 100644 index 000000000..9668bab8c --- /dev/null +++ b/pkg/provider/kubernetes/crd/fixtures/tcp/with_node_port_service_lb_local_traffic_policy.yml @@ -0,0 +1,92 @@ +apiVersion: traefik.io/v1alpha1 +kind: IngressRouteTCP +metadata: + name: test.route + namespace: default + +spec: + entryPoints: + - foo + + routes: + - match: HostSNI(`foo.com`) + services: + - name: nodeport-svc-tcp-local + port: 8000 + nodePortLB: true + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-1 +status: + addresses: + - type: InternalIP + address: 172.16.4.4 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-2 +status: + addresses: + - type: InternalIP + address: 172.16.4.5 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-3 +status: + addresses: + - type: InternalIP + address: 172.16.4.6 + +--- +apiVersion: v1 +kind: Service +metadata: + name: nodeport-svc-tcp-local + namespace: default + +spec: + ports: + - name: tcp + port: 8000 + nodePort: 32456 + type: NodePort + externalTrafficPolicy: Local + clusterIP: 10.10.0.1 + +--- +kind: EndpointSlice +apiVersion: discovery.k8s.io/v1 +metadata: + name: nodeport-svc-tcp-local-abc + namespace: default + labels: + kubernetes.io/service-name: nodeport-svc-tcp-local + +addressType: IPv4 +ports: + - name: tcp + port: 8000 +endpoints: + - addresses: + - 10.10.0.10 + conditions: + ready: true + nodeName: traefik-node-1 + - addresses: + - 10.10.0.11 + conditions: + ready: true + nodeName: traefik-node-2 + - addresses: + - 10.10.0.12 + conditions: + ready: false + nodeName: traefik-node-3 diff --git a/pkg/provider/kubernetes/crd/fixtures/udp/with_node_port_service_lb_local_traffic_policy.yml b/pkg/provider/kubernetes/crd/fixtures/udp/with_node_port_service_lb_local_traffic_policy.yml new file mode 100644 index 000000000..87dee0240 --- /dev/null +++ b/pkg/provider/kubernetes/crd/fixtures/udp/with_node_port_service_lb_local_traffic_policy.yml @@ -0,0 +1,91 @@ +apiVersion: traefik.io/v1alpha1 +kind: IngressRouteUDP +metadata: + name: test.route + namespace: default + +spec: + entryPoints: + - foo + + routes: + - services: + - name: nodeport-svc-udp-local + port: 8000 + nodePortLB: true + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-1 +status: + addresses: + - type: InternalIP + address: 172.16.4.4 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-2 +status: + addresses: + - type: InternalIP + address: 172.16.4.5 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-3 +status: + addresses: + - type: InternalIP + address: 172.16.4.6 + +--- +apiVersion: v1 +kind: Service +metadata: + name: nodeport-svc-udp-local + namespace: default + +spec: + ports: + - name: udp + port: 8000 + nodePort: 32456 + type: NodePort + externalTrafficPolicy: Local + clusterIP: 10.10.0.1 + +--- +kind: EndpointSlice +apiVersion: discovery.k8s.io/v1 +metadata: + name: nodeport-svc-udp-local-abc + namespace: default + labels: + kubernetes.io/service-name: nodeport-svc-udp-local + +addressType: IPv4 +ports: + - name: udp + port: 8000 +endpoints: + - addresses: + - 10.10.0.10 + conditions: + ready: true + nodeName: traefik-node-1 + - addresses: + - 10.10.0.11 + conditions: + ready: true + nodeName: traefik-node-2 + - addresses: + - 10.10.0.12 + conditions: + ready: false + nodeName: traefik-node-3 diff --git a/pkg/provider/kubernetes/crd/fixtures/with_node_port_lb_local_traffic_policy.yml b/pkg/provider/kubernetes/crd/fixtures/with_node_port_lb_local_traffic_policy.yml new file mode 100644 index 000000000..4a6393958 --- /dev/null +++ b/pkg/provider/kubernetes/crd/fixtures/with_node_port_lb_local_traffic_policy.yml @@ -0,0 +1,93 @@ +apiVersion: traefik.io/v1alpha1 +kind: IngressRoute +metadata: + name: test.route + namespace: default + +spec: + entryPoints: + - foo + + routes: + - match: Host(`foo.com`) + kind: Rule + services: + - name: nodeport-svc-local + port: 80 + nodePortLB: true + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-1 +status: + addresses: + - type: InternalIP + address: 172.16.4.4 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-2 +status: + addresses: + - type: InternalIP + address: 172.16.4.5 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-3 +status: + addresses: + - type: InternalIP + address: 172.16.4.6 + +--- +apiVersion: v1 +kind: Service +metadata: + name: nodeport-svc-local + namespace: default + +spec: + ports: + - name: web + port: 80 + nodePort: 32456 + type: NodePort + externalTrafficPolicy: Local + clusterIP: 10.10.0.1 + +--- +kind: EndpointSlice +apiVersion: discovery.k8s.io/v1 +metadata: + name: nodeport-svc-local-abc + namespace: default + labels: + kubernetes.io/service-name: nodeport-svc-local + +addressType: IPv4 +ports: + - name: web + port: 80 +endpoints: + - addresses: + - 10.10.0.10 + conditions: + ready: true + nodeName: traefik-node-1 + - addresses: + - 10.10.0.11 + conditions: + ready: true + nodeName: traefik-node-2 + - addresses: + - 10.10.0.12 + conditions: + ready: false + nodeName: traefik-node-3 diff --git a/pkg/provider/kubernetes/crd/kubernetes_http.go b/pkg/provider/kubernetes/crd/kubernetes_http.go index 26f8b1bfd..9217a4192 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_http.go +++ b/pkg/provider/kubernetes/crd/kubernetes_http.go @@ -586,11 +586,11 @@ func (c configBuilder) loadServers(parentNamespace string, svc traefikv1alpha1.L return nil, errors.New("nodes lookup is disabled") } - nodes, nodesExists, nodesErr := c.client.GetNodes() + allnodes, nodesExists, nodesErr := c.client.GetNodes() if nodesErr != nil { return nil, nodesErr } - if !nodesExists || len(nodes) == 0 { + if !nodesExists || len(allnodes) == 0 { return nil, fmt.Errorf("nodes not found for NodePort service %s/%s", namespace, sanitizedName) } @@ -599,7 +599,34 @@ func (c configBuilder) loadServers(parentNamespace string, svc traefikv1alpha1.L return nil, err } - for _, node := range nodes { + filteredNodes := allnodes + // if service traffic policy is Local, then only use the nodes where pod is running + if service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal { + filteredNodes = make([]*corev1.Node, 0) + endpointSlices, err := c.client.GetEndpointSlicesForService(namespace, sanitizedName) + if err != nil { + return nil, fmt.Errorf("getting endpointslices: %w", err) + } + + for _, endpointSlice := range endpointSlices { + for _, endpoint := range endpointSlice.Endpoints { + // if endpoint is ready, take the node name filter the nodes + if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { + continue + } + // filter nodes by node name + for _, node := range allnodes { + if node.Name == *endpoint.NodeName { + filteredNodes = append(filteredNodes, node) + break + } + } + } + + } + } + + for _, node := range filteredNodes { for _, addr := range node.Status.Addresses { if addr.Type == corev1.NodeInternalIP { hostPort := net.JoinHostPort(addr.Address, strconv.Itoa(int(svcPort.NodePort))) diff --git a/pkg/provider/kubernetes/crd/kubernetes_tcp.go b/pkg/provider/kubernetes/crd/kubernetes_tcp.go index bbe48f638..9f503b72c 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_tcp.go +++ b/pkg/provider/kubernetes/crd/kubernetes_tcp.go @@ -239,16 +239,42 @@ func (p *Provider) loadTCPServers(client Client, namespace string, svc traefikv1 return nil, errors.New("nodes lookup is disabled") } - nodes, nodesExists, nodesErr := client.GetNodes() + allnodes, nodesExists, nodesErr := client.GetNodes() if nodesErr != nil { return nil, nodesErr } - if !nodesExists || len(nodes) == 0 { + if !nodesExists || len(allnodes) == 0 { return nil, fmt.Errorf("nodes not found for NodePort service %s/%s", svc.Namespace, svc.Name) } + filteredNodes := allnodes + // if service traffic policy is Local, then only use the nodes where pod is running + if service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal { + filteredNodes = make([]*corev1.Node, 0) + endpointSlices, err := client.GetEndpointSlicesForService(namespace, svc.Name) + if err != nil { + return nil, fmt.Errorf("getting endpointslices: %w", err) + } - for _, node := range nodes { + for _, endpointSlice := range endpointSlices { + for _, endpoint := range endpointSlice.Endpoints { + // if endpoint is ready, take the node name filter the nodes + if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { + continue + } + // filter nodes by node name + for _, node := range allnodes { + if node.Name == *endpoint.NodeName { + filteredNodes = append(filteredNodes, node) + break + } + } + } + + } + } + + for _, node := range filteredNodes { for _, addr := range node.Status.Addresses { if addr.Type == corev1.NodeInternalIP { servers = append(servers, dynamic.TCPServer{ diff --git a/pkg/provider/kubernetes/crd/kubernetes_test.go b/pkg/provider/kubernetes/crd/kubernetes_test.go index eac9b5fc1..82e8fa1c1 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_test.go +++ b/pkg/provider/kubernetes/crd/kubernetes_test.go @@ -8471,6 +8471,96 @@ func TestNodePortLB(t *testing.T) { TLS: &dynamic.TLSConfiguration{}, }, }, + { + desc: "HTTP with node port LB and Local external traffic policy", + paths: []string{"services.yml", "with_node_port_lb_local_traffic_policy.yml"}, + expected: &dynamic.Configuration{ + UDP: &dynamic.UDPConfiguration{ + Routers: map[string]*dynamic.UDPRouter{}, + Services: map[string]*dynamic.UDPService{}, + }, + TCP: &dynamic.TCPConfiguration{ + ServersTransports: map[string]*dynamic.TCPServersTransport{}, + Routers: map[string]*dynamic.TCPRouter{}, + Middlewares: map[string]*dynamic.TCPMiddleware{}, + Services: map[string]*dynamic.TCPService{}, + }, + HTTP: &dynamic.HTTPConfiguration{ + ServersTransports: map[string]*dynamic.ServersTransport{}, + Routers: map[string]*dynamic.Router{ + "default-test-route-6f97418635c7e18853da": { + EntryPoints: []string{"foo"}, + Service: "default-test-route-6f97418635c7e18853da", + Rule: "Host(`foo.com`)", + Priority: 0, + }, + }, + Middlewares: map[string]*dynamic.Middleware{}, + Services: map[string]*dynamic.Service{ + "default-test-route-6f97418635c7e18853da": { + LoadBalancer: &dynamic.ServersLoadBalancer{ + Strategy: dynamic.BalancerStrategyWRR, + ResponseForwarding: &dynamic.ResponseForwarding{FlushInterval: dynamic.DefaultFlushInterval}, + Servers: []dynamic.Server{ + { + URL: "http://172.16.4.4:32456", + }, + { + URL: "http://172.16.4.5:32456", + }, + }, + PassHostHeader: pointer(true), + }, + }, + }, + }, + TLS: &dynamic.TLSConfiguration{}, + }, + }, + { + desc: "TCP with node port LB and Local external traffic policy", + paths: []string{"tcp/services.yml", "tcp/with_node_port_service_lb_local_traffic_policy.yml"}, + expected: &dynamic.Configuration{ + UDP: &dynamic.UDPConfiguration{ + Routers: map[string]*dynamic.UDPRouter{}, + Services: map[string]*dynamic.UDPService{}, + }, + HTTP: &dynamic.HTTPConfiguration{ + ServersTransports: map[string]*dynamic.ServersTransport{}, + Routers: map[string]*dynamic.Router{}, + Middlewares: map[string]*dynamic.Middleware{}, + Services: map[string]*dynamic.Service{}, + }, + TCP: &dynamic.TCPConfiguration{ + ServersTransports: map[string]*dynamic.TCPServersTransport{}, + Routers: map[string]*dynamic.TCPRouter{ + "default-test.route-fdd3e9338e47a45efefc": { + EntryPoints: []string{"foo"}, + Service: "default-test.route-fdd3e9338e47a45efefc", + Rule: "HostSNI(`foo.com`)", + }, + }, + Middlewares: map[string]*dynamic.TCPMiddleware{}, + Services: map[string]*dynamic.TCPService{ + "default-test.route-fdd3e9338e47a45efefc": { + LoadBalancer: &dynamic.TCPServersLoadBalancer{ + Servers: []dynamic.TCPServer{ + { + Address: "172.16.4.4:32456", + Port: "", + }, + { + Address: "172.16.4.5:32456", + Port: "", + }, + }, + }, + }, + }, + }, + TLS: &dynamic.TLSConfiguration{}, + }, + }, { desc: "TCP with native Service LB, cluster scope resources disabled", @@ -8502,6 +8592,49 @@ func TestNodePortLB(t *testing.T) { TLS: &dynamic.TLSConfiguration{}, }, }, + { + desc: "UDP with node port LB and Local external traffic policy", + paths: []string{"udp/services.yml", "udp/with_node_port_service_lb_local_traffic_policy.yml"}, + expected: &dynamic.Configuration{ + UDP: &dynamic.UDPConfiguration{ + Routers: map[string]*dynamic.UDPRouter{ + "default-test.route-0": { + EntryPoints: []string{"foo"}, + Service: "default-test.route-0", + }, + }, + Services: map[string]*dynamic.UDPService{ + "default-test.route-0": { + LoadBalancer: &dynamic.UDPServersLoadBalancer{ + Servers: []dynamic.UDPServer{ + { + Address: "172.16.4.4:32456", + Port: "", + }, + { + Address: "172.16.4.5:32456", + Port: "", + }, + }, + }, + }, + }, + }, + HTTP: &dynamic.HTTPConfiguration{ + ServersTransports: map[string]*dynamic.ServersTransport{}, + Routers: map[string]*dynamic.Router{}, + Middlewares: map[string]*dynamic.Middleware{}, + Services: map[string]*dynamic.Service{}, + }, + TCP: &dynamic.TCPConfiguration{ + ServersTransports: map[string]*dynamic.TCPServersTransport{}, + Routers: map[string]*dynamic.TCPRouter{}, + Middlewares: map[string]*dynamic.TCPMiddleware{}, + Services: map[string]*dynamic.TCPService{}, + }, + TLS: &dynamic.TLSConfiguration{}, + }, + }, { desc: "UDP with native Service LB, cluster scope resources disabled", paths: []string{"udp/services.yml", "udp/with_node_port_service_lb.yml"}, diff --git a/pkg/provider/kubernetes/crd/kubernetes_udp.go b/pkg/provider/kubernetes/crd/kubernetes_udp.go index 5b0e01e78..a28656ba4 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_udp.go +++ b/pkg/provider/kubernetes/crd/kubernetes_udp.go @@ -127,16 +127,43 @@ func (p *Provider) loadUDPServers(client Client, namespace string, svc traefikv1 return nil, errors.New("nodes lookup is disabled") } - nodes, nodesExists, nodesErr := client.GetNodes() + allnodes, nodesExists, nodesErr := client.GetNodes() if nodesErr != nil { return nil, nodesErr } - if !nodesExists || len(nodes) == 0 { + if !nodesExists || len(allnodes) == 0 { return nil, fmt.Errorf("nodes not found for NodePort service %s/%s", svc.Namespace, svc.Name) } - for _, node := range nodes { + filteredNodes := allnodes + // if service traffic policy is Local, then only use the nodes where pod is running + if service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal { + filteredNodes = make([]*corev1.Node, 0) + endpointSlices, err := client.GetEndpointSlicesForService(namespace, svc.Name) + if err != nil { + return nil, fmt.Errorf("getting endpointslices: %w", err) + } + + for _, endpointSlice := range endpointSlices { + for _, endpoint := range endpointSlice.Endpoints { + // if endpoint is ready, take the node name filter the nodes + if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { + continue + } + // filter nodes by node name + for _, node := range allnodes { + if node.Name == *endpoint.NodeName { + filteredNodes = append(filteredNodes, node) + break + } + } + } + + } + } + + for _, node := range filteredNodes { for _, addr := range node.Status.Addresses { if addr.Type == corev1.NodeInternalIP { servers = append(servers, dynamic.UDPServer{ From 2ee813a7f4562d1a5996c7cd72a54dfef32fc847 Mon Sep 17 00:00:00 2001 From: Devesh Kumar Date: Sat, 20 Sep 2025 09:42:35 +0200 Subject: [PATCH 2/2] fix: fixed lint issues --- pkg/provider/kubernetes/crd/kubernetes_http.go | 1 - pkg/provider/kubernetes/crd/kubernetes_tcp.go | 1 - pkg/provider/kubernetes/crd/kubernetes_udp.go | 1 - 3 files changed, 3 deletions(-) diff --git a/pkg/provider/kubernetes/crd/kubernetes_http.go b/pkg/provider/kubernetes/crd/kubernetes_http.go index 9217a4192..cf1414408 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_http.go +++ b/pkg/provider/kubernetes/crd/kubernetes_http.go @@ -622,7 +622,6 @@ func (c configBuilder) loadServers(parentNamespace string, svc traefikv1alpha1.L } } } - } } diff --git a/pkg/provider/kubernetes/crd/kubernetes_tcp.go b/pkg/provider/kubernetes/crd/kubernetes_tcp.go index 9f503b72c..940997f39 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_tcp.go +++ b/pkg/provider/kubernetes/crd/kubernetes_tcp.go @@ -270,7 +270,6 @@ func (p *Provider) loadTCPServers(client Client, namespace string, svc traefikv1 } } } - } } diff --git a/pkg/provider/kubernetes/crd/kubernetes_udp.go b/pkg/provider/kubernetes/crd/kubernetes_udp.go index a28656ba4..5e0a09abb 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_udp.go +++ b/pkg/provider/kubernetes/crd/kubernetes_udp.go @@ -159,7 +159,6 @@ func (p *Provider) loadUDPServers(client Client, namespace string, svc traefikv1 } } } - } }