diff --git a/pkg/provider/kubernetes/crd/fixtures/tcp/with_node_port_service_lb_local_traffic_policy.yml b/pkg/provider/kubernetes/crd/fixtures/tcp/with_node_port_service_lb_local_traffic_policy.yml new file mode 100644 index 000000000..9668bab8c --- /dev/null +++ b/pkg/provider/kubernetes/crd/fixtures/tcp/with_node_port_service_lb_local_traffic_policy.yml @@ -0,0 +1,92 @@ +apiVersion: traefik.io/v1alpha1 +kind: IngressRouteTCP +metadata: + name: test.route + namespace: default + +spec: + entryPoints: + - foo + + routes: + - match: HostSNI(`foo.com`) + services: + - name: nodeport-svc-tcp-local + port: 8000 + nodePortLB: true + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-1 +status: + addresses: + - type: InternalIP + address: 172.16.4.4 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-2 +status: + addresses: + - type: InternalIP + address: 172.16.4.5 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-3 +status: + addresses: + - type: InternalIP + address: 172.16.4.6 + +--- +apiVersion: v1 +kind: Service +metadata: + name: nodeport-svc-tcp-local + namespace: default + +spec: + ports: + - name: tcp + port: 8000 + nodePort: 32456 + type: NodePort + externalTrafficPolicy: Local + clusterIP: 10.10.0.1 + +--- +kind: EndpointSlice +apiVersion: discovery.k8s.io/v1 +metadata: + name: nodeport-svc-tcp-local-abc + namespace: default + labels: + kubernetes.io/service-name: nodeport-svc-tcp-local + +addressType: IPv4 +ports: + - name: tcp + port: 8000 +endpoints: + - addresses: + - 10.10.0.10 + conditions: + ready: true + nodeName: traefik-node-1 + - addresses: + - 10.10.0.11 + conditions: + ready: true + nodeName: traefik-node-2 + - addresses: + - 10.10.0.12 + conditions: + ready: false + nodeName: traefik-node-3 diff --git a/pkg/provider/kubernetes/crd/fixtures/udp/with_node_port_service_lb_local_traffic_policy.yml b/pkg/provider/kubernetes/crd/fixtures/udp/with_node_port_service_lb_local_traffic_policy.yml new file mode 100644 index 000000000..87dee0240 --- /dev/null +++ b/pkg/provider/kubernetes/crd/fixtures/udp/with_node_port_service_lb_local_traffic_policy.yml @@ -0,0 +1,91 @@ +apiVersion: traefik.io/v1alpha1 +kind: IngressRouteUDP +metadata: + name: test.route + namespace: default + +spec: + entryPoints: + - foo + + routes: + - services: + - name: nodeport-svc-udp-local + port: 8000 + nodePortLB: true + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-1 +status: + addresses: + - type: InternalIP + address: 172.16.4.4 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-2 +status: + addresses: + - type: InternalIP + address: 172.16.4.5 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-3 +status: + addresses: + - type: InternalIP + address: 172.16.4.6 + +--- +apiVersion: v1 +kind: Service +metadata: + name: nodeport-svc-udp-local + namespace: default + +spec: + ports: + - name: udp + port: 8000 + nodePort: 32456 + type: NodePort + externalTrafficPolicy: Local + clusterIP: 10.10.0.1 + +--- +kind: EndpointSlice +apiVersion: discovery.k8s.io/v1 +metadata: + name: nodeport-svc-udp-local-abc + namespace: default + labels: + kubernetes.io/service-name: nodeport-svc-udp-local + +addressType: IPv4 +ports: + - name: udp + port: 8000 +endpoints: + - addresses: + - 10.10.0.10 + conditions: + ready: true + nodeName: traefik-node-1 + - addresses: + - 10.10.0.11 + conditions: + ready: true + nodeName: traefik-node-2 + - addresses: + - 10.10.0.12 + conditions: + ready: false + nodeName: traefik-node-3 diff --git a/pkg/provider/kubernetes/crd/fixtures/with_node_port_lb_local_traffic_policy.yml b/pkg/provider/kubernetes/crd/fixtures/with_node_port_lb_local_traffic_policy.yml new file mode 100644 index 000000000..4a6393958 --- /dev/null +++ b/pkg/provider/kubernetes/crd/fixtures/with_node_port_lb_local_traffic_policy.yml @@ -0,0 +1,93 @@ +apiVersion: traefik.io/v1alpha1 +kind: IngressRoute +metadata: + name: test.route + namespace: default + +spec: + entryPoints: + - foo + + routes: + - match: Host(`foo.com`) + kind: Rule + services: + - name: nodeport-svc-local + port: 80 + nodePortLB: true + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-1 +status: + addresses: + - type: InternalIP + address: 172.16.4.4 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-2 +status: + addresses: + - type: InternalIP + address: 172.16.4.5 + +--- +kind: Node +apiVersion: v1 +metadata: + name: traefik-node-3 +status: + addresses: + - type: InternalIP + address: 172.16.4.6 + +--- +apiVersion: v1 +kind: Service +metadata: + name: nodeport-svc-local + namespace: default + +spec: + ports: + - name: web + port: 80 + nodePort: 32456 + type: NodePort + externalTrafficPolicy: Local + clusterIP: 10.10.0.1 + +--- +kind: EndpointSlice +apiVersion: discovery.k8s.io/v1 +metadata: + name: nodeport-svc-local-abc + namespace: default + labels: + kubernetes.io/service-name: nodeport-svc-local + +addressType: IPv4 +ports: + - name: web + port: 80 +endpoints: + - addresses: + - 10.10.0.10 + conditions: + ready: true + nodeName: traefik-node-1 + - addresses: + - 10.10.0.11 + conditions: + ready: true + nodeName: traefik-node-2 + - addresses: + - 10.10.0.12 + conditions: + ready: false + nodeName: traefik-node-3 diff --git a/pkg/provider/kubernetes/crd/kubernetes_http.go b/pkg/provider/kubernetes/crd/kubernetes_http.go index d9b3845b4..04651fa35 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_http.go +++ b/pkg/provider/kubernetes/crd/kubernetes_http.go @@ -598,11 +598,11 @@ func (c configBuilder) loadServers(parentNamespace string, svc traefikv1alpha1.L return nil, errors.New("nodes lookup is disabled") } - nodes, nodesExists, nodesErr := c.client.GetNodes() + allnodes, nodesExists, nodesErr := c.client.GetNodes() if nodesErr != nil { return nil, nodesErr } - if !nodesExists || len(nodes) == 0 { + if !nodesExists || len(allnodes) == 0 { return nil, fmt.Errorf("nodes not found for NodePort service %s/%s", namespace, sanitizedName) } @@ -611,7 +611,33 @@ func (c configBuilder) loadServers(parentNamespace string, svc traefikv1alpha1.L return nil, err } - for _, node := range nodes { + filteredNodes := allnodes + // if service traffic policy is Local, then only use the nodes where pod is running + if service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal { + filteredNodes = make([]*corev1.Node, 0) + endpointSlices, err := c.client.GetEndpointSlicesForService(namespace, sanitizedName) + if err != nil { + return nil, fmt.Errorf("getting endpointslices: %w", err) + } + + for _, endpointSlice := range endpointSlices { + for _, endpoint := range endpointSlice.Endpoints { + // if endpoint is ready, take the node name filter the nodes + if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { + continue + } + // filter nodes by node name + for _, node := range allnodes { + if node.Name == *endpoint.NodeName { + filteredNodes = append(filteredNodes, node) + break + } + } + } + } + } + + for _, node := range filteredNodes { for _, addr := range node.Status.Addresses { if addr.Type == corev1.NodeInternalIP { hostPort := net.JoinHostPort(addr.Address, strconv.Itoa(int(svcPort.NodePort))) diff --git a/pkg/provider/kubernetes/crd/kubernetes_tcp.go b/pkg/provider/kubernetes/crd/kubernetes_tcp.go index 77a363606..c7b2f2808 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_tcp.go +++ b/pkg/provider/kubernetes/crd/kubernetes_tcp.go @@ -243,16 +243,41 @@ func (p *Provider) loadTCPServers(client Client, namespace string, svc traefikv1 return nil, errors.New("nodes lookup is disabled") } - nodes, nodesExists, nodesErr := client.GetNodes() + allnodes, nodesExists, nodesErr := client.GetNodes() if nodesErr != nil { return nil, nodesErr } - if !nodesExists || len(nodes) == 0 { + if !nodesExists || len(allnodes) == 0 { return nil, fmt.Errorf("nodes not found for NodePort service %s/%s", svc.Namespace, svc.Name) } + filteredNodes := allnodes + // if service traffic policy is Local, then only use the nodes where pod is running + if service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal { + filteredNodes = make([]*corev1.Node, 0) + endpointSlices, err := client.GetEndpointSlicesForService(namespace, svc.Name) + if err != nil { + return nil, fmt.Errorf("getting endpointslices: %w", err) + } - for _, node := range nodes { + for _, endpointSlice := range endpointSlices { + for _, endpoint := range endpointSlice.Endpoints { + // if endpoint is ready, take the node name filter the nodes + if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { + continue + } + // filter nodes by node name + for _, node := range allnodes { + if node.Name == *endpoint.NodeName { + filteredNodes = append(filteredNodes, node) + break + } + } + } + } + } + + for _, node := range filteredNodes { for _, addr := range node.Status.Addresses { if addr.Type == corev1.NodeInternalIP { servers = append(servers, dynamic.TCPServer{ diff --git a/pkg/provider/kubernetes/crd/kubernetes_test.go b/pkg/provider/kubernetes/crd/kubernetes_test.go index 64b922509..e8aea3e4f 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_test.go +++ b/pkg/provider/kubernetes/crd/kubernetes_test.go @@ -8941,6 +8941,96 @@ func TestNodePortLB(t *testing.T) { TLS: &dynamic.TLSConfiguration{}, }, }, + { + desc: "HTTP with node port LB and Local external traffic policy", + paths: []string{"services.yml", "with_node_port_lb_local_traffic_policy.yml"}, + expected: &dynamic.Configuration{ + UDP: &dynamic.UDPConfiguration{ + Routers: map[string]*dynamic.UDPRouter{}, + Services: map[string]*dynamic.UDPService{}, + }, + TCP: &dynamic.TCPConfiguration{ + ServersTransports: map[string]*dynamic.TCPServersTransport{}, + Routers: map[string]*dynamic.TCPRouter{}, + Middlewares: map[string]*dynamic.TCPMiddleware{}, + Services: map[string]*dynamic.TCPService{}, + }, + HTTP: &dynamic.HTTPConfiguration{ + ServersTransports: map[string]*dynamic.ServersTransport{}, + Routers: map[string]*dynamic.Router{ + "default-test-route-6f97418635c7e18853da": { + EntryPoints: []string{"foo"}, + Service: "default-test-route-6f97418635c7e18853da", + Rule: "Host(`foo.com`)", + Priority: 0, + }, + }, + Middlewares: map[string]*dynamic.Middleware{}, + Services: map[string]*dynamic.Service{ + "default-test-route-6f97418635c7e18853da": { + LoadBalancer: &dynamic.ServersLoadBalancer{ + Strategy: dynamic.BalancerStrategyWRR, + ResponseForwarding: &dynamic.ResponseForwarding{FlushInterval: dynamic.DefaultFlushInterval}, + Servers: []dynamic.Server{ + { + URL: "http://172.16.4.4:32456", + }, + { + URL: "http://172.16.4.5:32456", + }, + }, + PassHostHeader: pointer(true), + }, + }, + }, + }, + TLS: &dynamic.TLSConfiguration{}, + }, + }, + { + desc: "TCP with node port LB and Local external traffic policy", + paths: []string{"tcp/services.yml", "tcp/with_node_port_service_lb_local_traffic_policy.yml"}, + expected: &dynamic.Configuration{ + UDP: &dynamic.UDPConfiguration{ + Routers: map[string]*dynamic.UDPRouter{}, + Services: map[string]*dynamic.UDPService{}, + }, + HTTP: &dynamic.HTTPConfiguration{ + ServersTransports: map[string]*dynamic.ServersTransport{}, + Routers: map[string]*dynamic.Router{}, + Middlewares: map[string]*dynamic.Middleware{}, + Services: map[string]*dynamic.Service{}, + }, + TCP: &dynamic.TCPConfiguration{ + ServersTransports: map[string]*dynamic.TCPServersTransport{}, + Routers: map[string]*dynamic.TCPRouter{ + "default-test.route-fdd3e9338e47a45efefc": { + EntryPoints: []string{"foo"}, + Service: "default-test.route-fdd3e9338e47a45efefc", + Rule: "HostSNI(`foo.com`)", + }, + }, + Middlewares: map[string]*dynamic.TCPMiddleware{}, + Services: map[string]*dynamic.TCPService{ + "default-test.route-fdd3e9338e47a45efefc": { + LoadBalancer: &dynamic.TCPServersLoadBalancer{ + Servers: []dynamic.TCPServer{ + { + Address: "172.16.4.4:32456", + Port: "", + }, + { + Address: "172.16.4.5:32456", + Port: "", + }, + }, + }, + }, + }, + }, + TLS: &dynamic.TLSConfiguration{}, + }, + }, { desc: "TCP with native Service LB, cluster scope resources disabled", @@ -8972,6 +9062,49 @@ func TestNodePortLB(t *testing.T) { TLS: &dynamic.TLSConfiguration{}, }, }, + { + desc: "UDP with node port LB and Local external traffic policy", + paths: []string{"udp/services.yml", "udp/with_node_port_service_lb_local_traffic_policy.yml"}, + expected: &dynamic.Configuration{ + UDP: &dynamic.UDPConfiguration{ + Routers: map[string]*dynamic.UDPRouter{ + "default-test.route-0": { + EntryPoints: []string{"foo"}, + Service: "default-test.route-0", + }, + }, + Services: map[string]*dynamic.UDPService{ + "default-test.route-0": { + LoadBalancer: &dynamic.UDPServersLoadBalancer{ + Servers: []dynamic.UDPServer{ + { + Address: "172.16.4.4:32456", + Port: "", + }, + { + Address: "172.16.4.5:32456", + Port: "", + }, + }, + }, + }, + }, + }, + HTTP: &dynamic.HTTPConfiguration{ + ServersTransports: map[string]*dynamic.ServersTransport{}, + Routers: map[string]*dynamic.Router{}, + Middlewares: map[string]*dynamic.Middleware{}, + Services: map[string]*dynamic.Service{}, + }, + TCP: &dynamic.TCPConfiguration{ + ServersTransports: map[string]*dynamic.TCPServersTransport{}, + Routers: map[string]*dynamic.TCPRouter{}, + Middlewares: map[string]*dynamic.TCPMiddleware{}, + Services: map[string]*dynamic.TCPService{}, + }, + TLS: &dynamic.TLSConfiguration{}, + }, + }, { desc: "UDP with native Service LB, cluster scope resources disabled", paths: []string{"udp/services.yml", "udp/with_node_port_service_lb.yml"}, diff --git a/pkg/provider/kubernetes/crd/kubernetes_udp.go b/pkg/provider/kubernetes/crd/kubernetes_udp.go index 719348b06..bd47838bb 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_udp.go +++ b/pkg/provider/kubernetes/crd/kubernetes_udp.go @@ -131,16 +131,42 @@ func (p *Provider) loadUDPServers(client Client, namespace string, svc traefikv1 return nil, errors.New("nodes lookup is disabled") } - nodes, nodesExists, nodesErr := client.GetNodes() + allnodes, nodesExists, nodesErr := client.GetNodes() if nodesErr != nil { return nil, nodesErr } - if !nodesExists || len(nodes) == 0 { + if !nodesExists || len(allnodes) == 0 { return nil, fmt.Errorf("nodes not found for NodePort service %s/%s", svc.Namespace, svc.Name) } - for _, node := range nodes { + filteredNodes := allnodes + // if service traffic policy is Local, then only use the nodes where pod is running + if service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal { + filteredNodes = make([]*corev1.Node, 0) + endpointSlices, err := client.GetEndpointSlicesForService(namespace, svc.Name) + if err != nil { + return nil, fmt.Errorf("getting endpointslices: %w", err) + } + + for _, endpointSlice := range endpointSlices { + for _, endpoint := range endpointSlice.Endpoints { + // if endpoint is ready, take the node name filter the nodes + if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { + continue + } + // filter nodes by node name + for _, node := range allnodes { + if node.Name == *endpoint.NodeName { + filteredNodes = append(filteredNodes, node) + break + } + } + } + } + } + + for _, node := range filteredNodes { for _, addr := range node.Status.Addresses { if addr.Type == corev1.NodeInternalIP { servers = append(servers, dynamic.UDPServer{