From f32d58c577f826fdbd8701cc135722c227062729 Mon Sep 17 00:00:00 2001
From: Michael
Date: Thu, 29 Jan 2026 11:42:06 +0100
Subject: [PATCH] Fix flaky unit tests

---
 pkg/healthcheck/healthcheck_tcp_test.go      |  85 +++++++++-----
 pkg/healthcheck/healthcheck_test.go          |  37 ++++--
 pkg/healthcheck/mock_test.go                 |  15 +++
 .../kubernetes/ingress/kubernetes_test.go    |   2 +-
 .../loadbalancer/leasttime/leasttime_test.go | 108 +++++++++---------
 5 files changed, 153 insertions(+), 94 deletions(-)

diff --git a/pkg/healthcheck/healthcheck_tcp_test.go b/pkg/healthcheck/healthcheck_tcp_test.go
index 70f91e8a8..4f80fd017 100644
--- a/pkg/healthcheck/healthcheck_tcp_test.go
+++ b/pkg/healthcheck/healthcheck_tcp_test.go
@@ -466,42 +466,52 @@ func TestServiceTCPHealthChecker_Launch(t *testing.T) {
 				},
 			}
 
-			lb := &testLoadBalancer{}
+			// Create load balancer with event channel for synchronization.
+			lb := &testLoadBalancer{
+				RWMutex: &sync.RWMutex{},
+				eventCh: make(chan struct{}, len(test.server.StatusSequence)+5),
+			}
 
 			serviceInfo := &truntime.TCPServiceInfo{}
 			service := NewServiceTCPHealthChecker(ctx, test.config, lb, serviceInfo, targets, "serviceName")
 			go service.Launch(ctx)
 
-			// How much time to wait for the health check to actually complete.
-			deadline := time.Now().Add(200 * time.Millisecond)
-			// TLS handshake can take much longer.
+			// Timeout for each event - TLS handshake can take longer.
+			eventTimeout := 500 * time.Millisecond
 			if test.server.TLS {
-				deadline = time.Now().Add(1000 * time.Millisecond)
+				eventTimeout = 2 * time.Second
 			}
 
-			// Wait for all health checks to complete deterministically
+			// Wait for health check events using channel synchronization.
+			// Iterate over StatusSequence to release each connection via Next().
 			for i := range test.server.StatusSequence {
 				test.server.Next()
 
-				initialUpserted := lb.numUpsertedServers
-				initialRemoved := lb.numRemovedServers
-
-				for time.Now().Before(deadline) {
-					time.Sleep(5 * time.Millisecond)
-					if lb.numUpsertedServers > initialUpserted || lb.numRemovedServers > initialRemoved {
-						// Stop the health checker immediately after the last expected sequence completes
-						// to prevent extra health checks from firing and modifying the counters.
-						if i == len(test.server.StatusSequence)-1 {
-							cancel()
-						}
-						break
+				select {
+				case <-lb.eventCh:
+					// Event received.
+					// On the last iteration, stop the health checker immediately
+					// to prevent extra checks from modifying the counters.
+					if i == len(test.server.StatusSequence)-1 {
+						test.server.Close()
+						cancel()
 					}
+				case <-time.After(eventTimeout):
+					t.Fatalf("timeout waiting for health check event %d/%d", i+1, len(test.server.StatusSequence))
 				}
 			}
 
-			assert.Equal(t, test.expNumRemovedServers, lb.numRemovedServers, "removed servers")
-			assert.Equal(t, test.expNumUpsertedServers, lb.numUpsertedServers, "upserted servers")
+			// Small delay to let goroutines clean up.
+			time.Sleep(10 * time.Millisecond)
+
+			lb.RLock()
+			removedServers := lb.numRemovedServers
+			upsertedServers := lb.numUpsertedServers
+			lb.RUnlock()
+
+			assert.Equal(t, test.expNumRemovedServers, removedServers, "removed servers")
+			assert.Equal(t, test.expNumUpsertedServers, upsertedServers, "upserted servers")
 			assert.Equal(t, map[string]string{test.server.Addr.String(): test.targetStatus}, serviceInfo.GetAllStatus())
 		})
 	}
 }
@@ -597,6 +607,8 @@ type sequencedTCPServer struct {
 	StatusSequence []tcpMockSequence
 	TLS            bool
 	release        chan struct{}
+	mu             sync.Mutex
+	listener       net.Listener
 }
 
 func newTCPServer(t *testing.T, tlsEnabled bool, statusSequence ...tcpMockSequence) *sequencedTCPServer {
@@ -624,17 +636,28 @@ func (s *sequencedTCPServer) Next() {
 	s.release <- struct{}{}
 }
 
+func (s *sequencedTCPServer) Close() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.listener != nil {
+		s.listener.Close()
+		s.listener = nil
+	}
+}
+
 func (s *sequencedTCPServer) Start(t *testing.T) {
 	t.Helper()
 
 	go func() {
-		var listener net.Listener
-
 		for _, seq := range s.StatusSequence {
 			<-s.release
-			if listener != nil {
-				listener.Close()
+
+			s.mu.Lock()
+			if s.listener != nil {
+				s.listener.Close()
+				s.listener = nil
 			}
+			s.mu.Unlock()
 
 			if !seq.accept {
 				continue
@@ -643,7 +666,7 @@ func (s *sequencedTCPServer) Start(t *testing.T) {
 			lis, err := net.ListenTCP("tcp", s.Addr)
 			require.NoError(t, err)
 
-			listener = lis
+			var listener net.Listener = lis
 
 			if s.TLS {
 				cert, err := tls.X509KeyPair(localhostCert, localhostKey)
@@ -670,8 +693,18 @@ func (s *sequencedTCPServer) Start(t *testing.T) {
 				)
 			}
 
+			s.mu.Lock()
+			s.listener = listener
+			s.mu.Unlock()
+
 			conn, err := listener.Accept()
-			require.NoError(t, err)
+			if err != nil {
+				// Listener was closed during shutdown - this is expected behavior.
+				if strings.Contains(err.Error(), "use of closed network connection") {
+					return
+				}
+				require.NoError(t, err)
+			}
 			t.Cleanup(func() {
 				_ = conn.Close()
 			})
diff --git a/pkg/healthcheck/healthcheck_test.go b/pkg/healthcheck/healthcheck_test.go
index cf2bdf2a4..077a01349 100644
--- a/pkg/healthcheck/healthcheck_test.go
+++ b/pkg/healthcheck/healthcheck_test.go
@@ -418,7 +418,12 @@ func TestServiceHealthChecker_Launch(t *testing.T) {
 
 			targetURL, timeout := test.server.Start(t, cancel)
 
-			lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}}
+			// Create load balancer with event channel for synchronization.
+			expectedEvents := test.expNumRemovedServers + test.expNumUpsertedServers
+			lb := &testLoadBalancer{
+				RWMutex: &sync.RWMutex{},
+				eventCh: make(chan struct{}, expectedEvents+5),
+			}
 
 			config := &dynamic.ServerHealthCheck{
 				Mode: test.mode,
@@ -441,18 +446,30 @@ func TestServiceHealthChecker_Launch(t *testing.T) {
 				wg.Done()
 			}()
 
-			select {
-			case <-time.After(timeout):
-				t.Fatal("test did not complete in time")
-			case <-ctx.Done():
-				wg.Wait()
+			// Wait for expected health check events using channel synchronization.
+			for i := range expectedEvents {
+				select {
+				case <-lb.eventCh:
+					// Event received.
+					// On the last event, cancel to prevent extra health checks.
+					if i == expectedEvents-1 {
+						cancel()
+					}
+				case <-time.After(timeout):
+					t.Fatalf("timeout waiting for health check event %d/%d", i+1, expectedEvents)
+				}
 			}
 
-			lb.Lock()
-			defer lb.Unlock()
+			// Wait for the health checker goroutine to exit before making assertions.
+			wg.Wait()
 
-			assert.Equal(t, test.expNumRemovedServers, lb.numRemovedServers, "removed servers")
-			assert.Equal(t, test.expNumUpsertedServers, lb.numUpsertedServers, "upserted servers")
+			lb.RLock()
+			removedServers := lb.numRemovedServers
+			upsertedServers := lb.numUpsertedServers
+			lb.RUnlock()
+
+			assert.Equal(t, test.expNumRemovedServers, removedServers, "removed servers")
+			assert.Equal(t, test.expNumUpsertedServers, upsertedServers, "upserted servers")
 			assert.InDelta(t, test.expGaugeValue, gauge.GaugeValue, delta, "ServerUp Gauge")
 			assert.Equal(t, []string{"service", "foobar", "url", targetURL.String()}, gauge.LastLabelValues)
 			assert.Equal(t, map[string]string{targetURL.String(): test.targetStatus}, serviceInfo.GetAllStatus())
diff --git a/pkg/healthcheck/mock_test.go b/pkg/healthcheck/mock_test.go
index 0adcf1b25..cea0f670c 100644
--- a/pkg/healthcheck/mock_test.go
+++ b/pkg/healthcheck/mock_test.go
@@ -168,14 +168,29 @@ type testLoadBalancer struct {
 
 	numRemovedServers  int
 	numUpsertedServers int
+
+	// eventCh is used to signal when a status change occurs, allowing tests
+	// to synchronize with health check events deterministically.
+	eventCh chan struct{}
 }
 
 func (lb *testLoadBalancer) SetStatus(ctx context.Context, childName string, up bool) {
+	lb.Lock()
 	if up {
 		lb.numUpsertedServers++
 	} else {
 		lb.numRemovedServers++
 	}
+	lb.Unlock()
+
+	// Signal the event if a listener is registered.
+	if lb.eventCh != nil {
+		select {
+		case lb.eventCh <- struct{}{}:
+		default:
+			// Don't block if channel is full or no listener.
+		}
+	}
 }
 
 type MetricsMock struct {
diff --git a/pkg/provider/kubernetes/ingress/kubernetes_test.go b/pkg/provider/kubernetes/ingress/kubernetes_test.go
index c67e84e57..b7aac6014 100644
--- a/pkg/provider/kubernetes/ingress/kubernetes_test.go
+++ b/pkg/provider/kubernetes/ingress/kubernetes_test.go
@@ -2357,7 +2357,7 @@ func TestIngressEndpointPublishedService(t *testing.T) {
 			ingress, err := kubeClient.NetworkingV1().Ingresses(metav1.NamespaceDefault).Get(t.Context(), "foo", metav1.GetOptions{})
 			require.NoError(t, err)
 
-			assert.Equal(t, test.expected, ingress.Status.LoadBalancer.Ingress)
+			assert.ElementsMatch(t, test.expected, ingress.Status.LoadBalancer.Ingress)
 		})
 	}
 }
diff --git a/pkg/server/service/loadbalancer/leasttime/leasttime_test.go b/pkg/server/service/loadbalancer/leasttime/leasttime_test.go
index d444b314f..7c199eefc 100644
--- a/pkg/server/service/loadbalancer/leasttime/leasttime_test.go
+++ b/pkg/server/service/loadbalancer/leasttime/leasttime_test.go
@@ -800,53 +800,47 @@ func TestScoreCalculationWithWeights(t *testing.T) {
 }
 
 // TestScoreCalculationWithInflight tests that inflight requests are considered in score calculation.
+// Uses direct manipulation of response times and nextServer() for deterministic results.
 func TestScoreCalculationWithInflight(t *testing.T) {
 	balancer := New(nil, false)
 
-	// We'll manually control the inflight counters to test the score calculation.
-	// Add two servers with same response time.
+	// Add two servers with dummy handlers (we test selection logic directly).
 	balancer.Add("server1", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		time.Sleep(10 * time.Millisecond)
-		rw.Header().Set("server", "server1")
 		rw.WriteHeader(http.StatusOK)
-		httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
 	}), pointer(1), false)
 
 	balancer.Add("server2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		time.Sleep(10 * time.Millisecond)
-		rw.Header().Set("server", "server2")
 		rw.WriteHeader(http.StatusOK)
-		httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
 	}), pointer(1), false)
 
-	// Build up response time averages for both servers.
-	for range 2 {
-		recorder := httptest.NewRecorder()
-		balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+	// Pre-fill response times directly (10ms average for both servers).
+	for _, h := range balancer.handlers {
+		for i := range sampleSize {
+			h.responseTimes[i] = 10.0
+		}
+		h.responseTimeSum = 10.0 * sampleSize
+		h.sampleCount = sampleSize
 	}
 
-	// Now manually set server1 to have high inflight count.
+	// Set server1 to have high inflight count.
 	balancer.handlers[0].inflightCount.Store(5)
 
 	// Make requests - they should prefer server2 because:
 	// Score for server1: (10 × (1 + 5)) / 1 = 60
 	// Score for server2: (10 × (1 + 0)) / 1 = 10
-	recorder := &responseRecorder{save: map[string]int{}}
+	counts := map[string]int{"server1": 0, "server2": 0}
 	for range 5 {
-		// Manually increment to simulate the ServeHTTP behavior.
-		server, _ := balancer.nextServer()
+		server, err := balancer.nextServer()
+		assert.NoError(t, err)
+		counts[server.name]++
+		// Simulate ServeHTTP incrementing inflight count.
 		server.inflightCount.Add(1)
-
-		if server.name == "server1" {
-			recorder.save["server1"]++
-		} else {
-			recorder.save["server2"]++
-		}
 	}
 
-	// Server2 should get all requests
-	assert.Equal(t, 5, recorder.save["server2"])
-	assert.Zero(t, recorder.save["server1"])
+	// Server2 should get all requests since its score (10-50) is always less than server1's (60).
+	// After each selection, server2's inflight grows: scores are 10, 20, 30, 40, 50 vs 60.
+	assert.Equal(t, 5, counts["server2"])
+	assert.Zero(t, counts["server1"])
 }
 
 // TestScoreCalculationColdStart tests that new servers (0ms avg) get fair selection
@@ -930,28 +924,20 @@ func TestFastServerGetsMoreTraffic(t *testing.T) {
 // TestTrafficShiftsWhenPerformanceDegrades verifies that the load balancer
 // adapts to changing server performance by shifting traffic away from degraded servers.
 // This tests the adaptive behavior - the core value proposition of least-time load balancing.
+// Uses nextServer() directly to avoid timing variations and ensure deterministic results.
 func TestTrafficShiftsWhenPerformanceDegrades(t *testing.T) {
 	balancer := New(nil, false)
 
-	// Use atomic to dynamically control server1's response time.
-	server1Delay := atomic.Int64{}
-	server1Delay.Store(5) // Start with 5ms
-
+	// Add two servers with dummy handlers (we'll test selection logic directly).
 	balancer.Add("server1", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		time.Sleep(time.Duration(server1Delay.Load()) * time.Millisecond)
-		rw.Header().Set("server", "server1")
 		rw.WriteHeader(http.StatusOK)
-		httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
 	}), pointer(1), false)
 
 	balancer.Add("server2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		time.Sleep(5 * time.Millisecond) // Static 5ms
-		rw.Header().Set("server", "server2")
 		rw.WriteHeader(http.StatusOK)
-		httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
 	}), pointer(1), false)
 
-	// Pre-fill ring buffers to eliminate cold start effects and ensure deterministic equal performance state.
+	// Pre-fill ring buffers with equal response times (5ms each).
 	for _, h := range balancer.handlers {
 		for i := range sampleSize {
 			h.responseTimes[i] = 5.0
@@ -960,35 +946,43 @@ func TestTrafficShiftsWhenPerformanceDegrades(t *testing.T) {
 		h.sampleCount = sampleSize
 	}
 
-	// Phase 1: Both servers perform equally (5ms each).
-	recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+	// Phase 1: Both servers have equal performance (5ms each).
+	// With WRR tie-breaking, traffic should be distributed evenly.
+	counts := map[string]int{"server1": 0, "server2": 0}
 	for range 50 {
-		balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+		server, err := balancer.nextServer()
+		assert.NoError(t, err)
+		counts[server.name]++
 	}
 
-	// With equal performance and pre-filled buffers, distribution should be balanced via WRR tie-breaking.
-	total := recorder.save["server1"] + recorder.save["server2"]
+	total := counts["server1"] + counts["server2"]
 	assert.Equal(t, 50, total)
-	assert.InDelta(t, 25, recorder.save["server1"], 10) // 25 ± 10 requests
-	assert.InDelta(t, 25, recorder.save["server2"], 10) // 25 ± 10 requests
+	assert.InDelta(t, 25, counts["server1"], 1) // Deterministic WRR: 25 ± 1
+	assert.InDelta(t, 25, counts["server2"], 1) // Deterministic WRR: 25 ± 1
 
-	// Phase 2: server1 degrades (simulating GC pause, CPU spike, or network latency).
-	server1Delay.Store(50) // Now 50ms (10x slower) - dramatic degradation for reliable detection
-
-	// Make more requests to shift the moving average.
-	// Ring buffer has 100 samples, need significant new samples to shift average.
-	// server1's average will climb from ~5ms toward 50ms.
-	recorder2 := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
-	for range 60 {
-		balancer.ServeHTTP(recorder2, httptest.NewRequest(http.MethodGet, "/", nil))
+	// Phase 2: Simulate server1 degradation by directly updating its ring buffer.
+	// Set server1's average response time to 50ms (10x slower than server2's 5ms).
+	for _, h := range balancer.handlers {
+		if h.name == "server1" {
+			for i := range sampleSize {
+				h.responseTimes[i] = 50.0
+			}
+			h.responseTimeSum = 50.0 * sampleSize
+		}
 	}
 
-	// server2 should get significantly more traffic
-	// With 10x performance difference, server2 should dominate.
-	total2 := recorder2.save["server1"] + recorder2.save["server2"]
+	// With 10x performance difference, server2 should get significantly more traffic.
+	counts2 := map[string]int{"server1": 0, "server2": 0}
+	for range 60 {
+		server, err := balancer.nextServer()
+		assert.NoError(t, err)
+		counts2[server.name]++
+	}
+
+	total2 := counts2["server1"] + counts2["server2"]
 	assert.Equal(t, 60, total2)
-	assert.Greater(t, recorder2.save["server2"], 35) // At least ~60% (35/60)
-	assert.Less(t, recorder2.save["server1"], 25) // At most ~40% (25/60)
+	assert.Greater(t, counts2["server2"], 35) // At least ~60% (35/60)
+	assert.Less(t, counts2["server1"], 25) // At most ~40% (25/60)
 }
 
 // TestMultipleServersWithSameScore tests WRR tie-breaking when multiple servers have identical scores.
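
Note on the synchronization pattern used throughout the health check tests above: the mock signals every status change into a buffered channel, and the test blocks on that channel with a per-event timeout instead of sleeping and polling counters. A minimal, self-contained sketch of the same idea outside the Traefik test suite; the statusRecorder type and the simulated checker goroutine are hypothetical and for illustration only:

package main

import (
	"fmt"
	"sync"
	"time"
)

// statusRecorder is a hypothetical stand-in for the patch's testLoadBalancer:
// it counts status changes and signals each one on a buffered channel so a
// test can wait for events instead of polling with time.Sleep.
type statusRecorder struct {
	mu      sync.Mutex
	ups     int
	downs   int
	eventCh chan struct{}
}

func (r *statusRecorder) SetStatus(up bool) {
	r.mu.Lock()
	if up {
		r.ups++
	} else {
		r.downs++
	}
	r.mu.Unlock()

	// Non-blocking send: never stall the caller if nobody is listening.
	select {
	case r.eventCh <- struct{}{}:
	default:
	}
}

func main() {
	rec := &statusRecorder{eventCh: make(chan struct{}, 8)}

	// Simulate a health checker goroutine emitting three status changes.
	go func() {
		for _, up := range []bool{true, false, true} {
			time.Sleep(10 * time.Millisecond)
			rec.SetStatus(up)
		}
	}()

	// Deterministic wait: one receive per expected event, each with a timeout,
	// mirroring the select on lb.eventCh added in the tests above.
	for i := 0; i < 3; i++ {
		select {
		case <-rec.eventCh:
		case <-time.After(time.Second):
			fmt.Printf("timeout waiting for event %d/3\n", i+1)
			return
		}
	}

	rec.mu.Lock()
	fmt.Printf("ups=%d downs=%d\n", rec.ups, rec.downs)
	rec.mu.Unlock()
}

The non-blocking send is the same guard the patch adds in SetStatus (select with a default case), so the component under test never stalls when no test is draining the channel.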
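
The score arithmetic asserted in TestScoreCalculationWithInflight can also be checked in isolation. A small sketch, assuming only the formula quoted in the test comments, average response time × (1 + in-flight requests) / weight; the score helper below is illustrative and not the balancer's actual implementation:

package main

import "fmt"

// score mirrors the formula quoted in the test comments:
// (average response time × (1 + in-flight requests)) / weight; lower wins.
func score(avgMs float64, inflight int, weight float64) float64 {
	return avgMs * (1 + float64(inflight)) / weight
}

func main() {
	// Values from TestScoreCalculationWithInflight: both servers average 10ms,
	// weight 1, server1 has 5 requests in flight, server2 has none.
	fmt.Println(score(10, 5, 1)) // 60 -> server1 is penalized
	fmt.Println(score(10, 0, 1)) // 10 -> server2 is selected
}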