This commit is contained in:
Devesh Kumar 2026-01-30 10:54:21 +01:00 committed by GitHub
commit b189bab1a2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 495 additions and 9 deletions

View file

@ -0,0 +1,92 @@
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
name: test.route
namespace: default
spec:
entryPoints:
- foo
routes:
- match: HostSNI(`foo.com`)
services:
- name: nodeport-svc-tcp-local
port: 8000
nodePortLB: true
---
kind: Node
apiVersion: v1
metadata:
name: traefik-node-1
status:
addresses:
- type: InternalIP
address: 172.16.4.4
---
kind: Node
apiVersion: v1
metadata:
name: traefik-node-2
status:
addresses:
- type: InternalIP
address: 172.16.4.5
---
kind: Node
apiVersion: v1
metadata:
name: traefik-node-3
status:
addresses:
- type: InternalIP
address: 172.16.4.6
---
apiVersion: v1
kind: Service
metadata:
name: nodeport-svc-tcp-local
namespace: default
spec:
ports:
- name: tcp
port: 8000
nodePort: 32456
type: NodePort
externalTrafficPolicy: Local
clusterIP: 10.10.0.1
---
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: nodeport-svc-tcp-local-abc
namespace: default
labels:
kubernetes.io/service-name: nodeport-svc-tcp-local
addressType: IPv4
ports:
- name: tcp
port: 8000
endpoints:
- addresses:
- 10.10.0.10
conditions:
ready: true
nodeName: traefik-node-1
- addresses:
- 10.10.0.11
conditions:
ready: true
nodeName: traefik-node-2
- addresses:
- 10.10.0.12
conditions:
ready: false
nodeName: traefik-node-3

View file

@ -0,0 +1,91 @@
apiVersion: traefik.io/v1alpha1
kind: IngressRouteUDP
metadata:
name: test.route
namespace: default
spec:
entryPoints:
- foo
routes:
- services:
- name: nodeport-svc-udp-local
port: 8000
nodePortLB: true
---
kind: Node
apiVersion: v1
metadata:
name: traefik-node-1
status:
addresses:
- type: InternalIP
address: 172.16.4.4
---
kind: Node
apiVersion: v1
metadata:
name: traefik-node-2
status:
addresses:
- type: InternalIP
address: 172.16.4.5
---
kind: Node
apiVersion: v1
metadata:
name: traefik-node-3
status:
addresses:
- type: InternalIP
address: 172.16.4.6
---
apiVersion: v1
kind: Service
metadata:
name: nodeport-svc-udp-local
namespace: default
spec:
ports:
- name: udp
port: 8000
nodePort: 32456
type: NodePort
externalTrafficPolicy: Local
clusterIP: 10.10.0.1
---
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: nodeport-svc-udp-local-abc
namespace: default
labels:
kubernetes.io/service-name: nodeport-svc-udp-local
addressType: IPv4
ports:
- name: udp
port: 8000
endpoints:
- addresses:
- 10.10.0.10
conditions:
ready: true
nodeName: traefik-node-1
- addresses:
- 10.10.0.11
conditions:
ready: true
nodeName: traefik-node-2
- addresses:
- 10.10.0.12
conditions:
ready: false
nodeName: traefik-node-3

View file

@ -0,0 +1,93 @@
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
name: test.route
namespace: default
spec:
entryPoints:
- foo
routes:
- match: Host(`foo.com`)
kind: Rule
services:
- name: nodeport-svc-local
port: 80
nodePortLB: true
---
kind: Node
apiVersion: v1
metadata:
name: traefik-node-1
status:
addresses:
- type: InternalIP
address: 172.16.4.4
---
kind: Node
apiVersion: v1
metadata:
name: traefik-node-2
status:
addresses:
- type: InternalIP
address: 172.16.4.5
---
kind: Node
apiVersion: v1
metadata:
name: traefik-node-3
status:
addresses:
- type: InternalIP
address: 172.16.4.6
---
apiVersion: v1
kind: Service
metadata:
name: nodeport-svc-local
namespace: default
spec:
ports:
- name: web
port: 80
nodePort: 32456
type: NodePort
externalTrafficPolicy: Local
clusterIP: 10.10.0.1
---
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: nodeport-svc-local-abc
namespace: default
labels:
kubernetes.io/service-name: nodeport-svc-local
addressType: IPv4
ports:
- name: web
port: 80
endpoints:
- addresses:
- 10.10.0.10
conditions:
ready: true
nodeName: traefik-node-1
- addresses:
- 10.10.0.11
conditions:
ready: true
nodeName: traefik-node-2
- addresses:
- 10.10.0.12
conditions:
ready: false
nodeName: traefik-node-3

View file

@ -598,11 +598,11 @@ func (c configBuilder) loadServers(parentNamespace string, svc traefikv1alpha1.L
return nil, errors.New("nodes lookup is disabled")
}
nodes, nodesExists, nodesErr := c.client.GetNodes()
allnodes, nodesExists, nodesErr := c.client.GetNodes()
if nodesErr != nil {
return nil, nodesErr
}
if !nodesExists || len(nodes) == 0 {
if !nodesExists || len(allnodes) == 0 {
return nil, fmt.Errorf("nodes not found for NodePort service %s/%s", namespace, sanitizedName)
}
@ -611,7 +611,33 @@ func (c configBuilder) loadServers(parentNamespace string, svc traefikv1alpha1.L
return nil, err
}
for _, node := range nodes {
filteredNodes := allnodes
// if service traffic policy is Local, then only use the nodes where pod is running
if service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal {
filteredNodes = make([]*corev1.Node, 0)
endpointSlices, err := c.client.GetEndpointSlicesForService(namespace, sanitizedName)
if err != nil {
return nil, fmt.Errorf("getting endpointslices: %w", err)
}
for _, endpointSlice := range endpointSlices {
for _, endpoint := range endpointSlice.Endpoints {
// if endpoint is ready, take the node name filter the nodes
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
continue
}
// filter nodes by node name
for _, node := range allnodes {
if node.Name == *endpoint.NodeName {
filteredNodes = append(filteredNodes, node)
break
}
}
}
}
}
for _, node := range filteredNodes {
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
hostPort := net.JoinHostPort(addr.Address, strconv.Itoa(int(svcPort.NodePort)))

View file

@ -243,16 +243,41 @@ func (p *Provider) loadTCPServers(client Client, namespace string, svc traefikv1
return nil, errors.New("nodes lookup is disabled")
}
nodes, nodesExists, nodesErr := client.GetNodes()
allnodes, nodesExists, nodesErr := client.GetNodes()
if nodesErr != nil {
return nil, nodesErr
}
if !nodesExists || len(nodes) == 0 {
if !nodesExists || len(allnodes) == 0 {
return nil, fmt.Errorf("nodes not found for NodePort service %s/%s", svc.Namespace, svc.Name)
}
filteredNodes := allnodes
// if service traffic policy is Local, then only use the nodes where pod is running
if service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal {
filteredNodes = make([]*corev1.Node, 0)
endpointSlices, err := client.GetEndpointSlicesForService(namespace, svc.Name)
if err != nil {
return nil, fmt.Errorf("getting endpointslices: %w", err)
}
for _, node := range nodes {
for _, endpointSlice := range endpointSlices {
for _, endpoint := range endpointSlice.Endpoints {
// if endpoint is ready, take the node name filter the nodes
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
continue
}
// filter nodes by node name
for _, node := range allnodes {
if node.Name == *endpoint.NodeName {
filteredNodes = append(filteredNodes, node)
break
}
}
}
}
}
for _, node := range filteredNodes {
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
servers = append(servers, dynamic.TCPServer{

View file

@ -8941,6 +8941,96 @@ func TestNodePortLB(t *testing.T) {
TLS: &dynamic.TLSConfiguration{},
},
},
{
desc: "HTTP with node port LB and Local external traffic policy",
paths: []string{"services.yml", "with_node_port_lb_local_traffic_policy.yml"},
expected: &dynamic.Configuration{
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
TCP: &dynamic.TCPConfiguration{
ServersTransports: map[string]*dynamic.TCPServersTransport{},
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
},
HTTP: &dynamic.HTTPConfiguration{
ServersTransports: map[string]*dynamic.ServersTransport{},
Routers: map[string]*dynamic.Router{
"default-test-route-6f97418635c7e18853da": {
EntryPoints: []string{"foo"},
Service: "default-test-route-6f97418635c7e18853da",
Rule: "Host(`foo.com`)",
Priority: 0,
},
},
Middlewares: map[string]*dynamic.Middleware{},
Services: map[string]*dynamic.Service{
"default-test-route-6f97418635c7e18853da": {
LoadBalancer: &dynamic.ServersLoadBalancer{
Strategy: dynamic.BalancerStrategyWRR,
ResponseForwarding: &dynamic.ResponseForwarding{FlushInterval: dynamic.DefaultFlushInterval},
Servers: []dynamic.Server{
{
URL: "http://172.16.4.4:32456",
},
{
URL: "http://172.16.4.5:32456",
},
},
PassHostHeader: pointer(true),
},
},
},
},
TLS: &dynamic.TLSConfiguration{},
},
},
{
desc: "TCP with node port LB and Local external traffic policy",
paths: []string{"tcp/services.yml", "tcp/with_node_port_service_lb_local_traffic_policy.yml"},
expected: &dynamic.Configuration{
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
HTTP: &dynamic.HTTPConfiguration{
ServersTransports: map[string]*dynamic.ServersTransport{},
Routers: map[string]*dynamic.Router{},
Middlewares: map[string]*dynamic.Middleware{},
Services: map[string]*dynamic.Service{},
},
TCP: &dynamic.TCPConfiguration{
ServersTransports: map[string]*dynamic.TCPServersTransport{},
Routers: map[string]*dynamic.TCPRouter{
"default-test.route-fdd3e9338e47a45efefc": {
EntryPoints: []string{"foo"},
Service: "default-test.route-fdd3e9338e47a45efefc",
Rule: "HostSNI(`foo.com`)",
},
},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{
"default-test.route-fdd3e9338e47a45efefc": {
LoadBalancer: &dynamic.TCPServersLoadBalancer{
Servers: []dynamic.TCPServer{
{
Address: "172.16.4.4:32456",
Port: "",
},
{
Address: "172.16.4.5:32456",
Port: "",
},
},
},
},
},
},
TLS: &dynamic.TLSConfiguration{},
},
},
{
desc: "TCP with native Service LB, cluster scope resources disabled",
@ -8972,6 +9062,49 @@ func TestNodePortLB(t *testing.T) {
TLS: &dynamic.TLSConfiguration{},
},
},
{
desc: "UDP with node port LB and Local external traffic policy",
paths: []string{"udp/services.yml", "udp/with_node_port_service_lb_local_traffic_policy.yml"},
expected: &dynamic.Configuration{
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{
"default-test.route-0": {
EntryPoints: []string{"foo"},
Service: "default-test.route-0",
},
},
Services: map[string]*dynamic.UDPService{
"default-test.route-0": {
LoadBalancer: &dynamic.UDPServersLoadBalancer{
Servers: []dynamic.UDPServer{
{
Address: "172.16.4.4:32456",
Port: "",
},
{
Address: "172.16.4.5:32456",
Port: "",
},
},
},
},
},
},
HTTP: &dynamic.HTTPConfiguration{
ServersTransports: map[string]*dynamic.ServersTransport{},
Routers: map[string]*dynamic.Router{},
Middlewares: map[string]*dynamic.Middleware{},
Services: map[string]*dynamic.Service{},
},
TCP: &dynamic.TCPConfiguration{
ServersTransports: map[string]*dynamic.TCPServersTransport{},
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
},
TLS: &dynamic.TLSConfiguration{},
},
},
{
desc: "UDP with native Service LB, cluster scope resources disabled",
paths: []string{"udp/services.yml", "udp/with_node_port_service_lb.yml"},

View file

@ -131,16 +131,42 @@ func (p *Provider) loadUDPServers(client Client, namespace string, svc traefikv1
return nil, errors.New("nodes lookup is disabled")
}
nodes, nodesExists, nodesErr := client.GetNodes()
allnodes, nodesExists, nodesErr := client.GetNodes()
if nodesErr != nil {
return nil, nodesErr
}
if !nodesExists || len(nodes) == 0 {
if !nodesExists || len(allnodes) == 0 {
return nil, fmt.Errorf("nodes not found for NodePort service %s/%s", svc.Namespace, svc.Name)
}
for _, node := range nodes {
filteredNodes := allnodes
// if service traffic policy is Local, then only use the nodes where pod is running
if service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal {
filteredNodes = make([]*corev1.Node, 0)
endpointSlices, err := client.GetEndpointSlicesForService(namespace, svc.Name)
if err != nil {
return nil, fmt.Errorf("getting endpointslices: %w", err)
}
for _, endpointSlice := range endpointSlices {
for _, endpoint := range endpointSlice.Endpoints {
// if endpoint is ready, take the node name filter the nodes
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
continue
}
// filter nodes by node name
for _, node := range allnodes {
if node.Name == *endpoint.NodeName {
filteredNodes = append(filteredNodes, node)
break
}
}
}
}
}
for _, node := range filteredNodes {
for _, addr := range node.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
servers = append(servers, dynamic.UDPServer{