From f4f129a279bd975611f3a3bfa95ff55dcb941da1 Mon Sep 17 00:00:00 2001 From: Simon Delicata Date: Wed, 28 Jan 2026 11:44:05 +0100 Subject: [PATCH] Add configuration transformer mechanism to the ConfigurationWatcher --- pkg/server/configurationwatcher.go | 45 +++++++------ pkg/server/configurationwatcher_test.go | 84 +++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 18 deletions(-) diff --git a/pkg/server/configurationwatcher.go b/pkg/server/configurationwatcher.go index 8142f8359..747f476f6 100644 --- a/pkg/server/configurationwatcher.go +++ b/pkg/server/configurationwatcher.go @@ -28,6 +28,8 @@ type ConfigurationWatcher struct { requiredProvider string configurationListeners []func(dynamic.Configuration) + configurationTransformers []func(context.Context, dynamic.Configurations) dynamic.Configurations + routinesPool *safe.Pool } @@ -63,12 +65,14 @@ func (c *ConfigurationWatcher) Stop() { // AddListener adds a new listener function used when new configuration is provided. func (c *ConfigurationWatcher) AddListener(listener func(dynamic.Configuration)) { - if c.configurationListeners == nil { - c.configurationListeners = make([]func(dynamic.Configuration), 0) - } c.configurationListeners = append(c.configurationListeners, listener) } +// AddTransformer registers a function to modify configurations before they are applied. +func (c *ConfigurationWatcher) AddTransformer(transformer func(context.Context, dynamic.Configurations) dynamic.Configurations) { + c.configurationTransformers = append(c.configurationTransformers, transformer) +} + func (c *ConfigurationWatcher) startProviderAggregator() { log.Info().Msgf("Starting provider aggregator %T", c.providerAggregator) @@ -81,22 +85,24 @@ func (c *ConfigurationWatcher) startProviderAggregator() { } // receiveConfigurations receives configuration changes from the providers. -// The configuration message then gets passed along a series of check, notably +// The configuration message then gets passed along a series of checks, notably // to verify that, for a given provider, the configuration that was just received // is at least different from the previously received one. -// The full set of configurations is then sent to the throttling goroutine, -// (throttleAndApplyConfigurations) via a RingChannel, which ensures that we can -// constantly send in a non-blocking way to the throttling goroutine the last -// global state we are aware of. +// The full set of configurations is then sent to applyConfigurations +// via a channel in a non-blocking manner, ensuring the latest global state +// is always available for processing. func (c *ConfigurationWatcher) receiveConfigurations(ctx context.Context) { newConfigurations := make(dynamic.Configurations) + transformedConfigurations := make(dynamic.Configurations) + var output chan dynamic.Configurations + for { select { case <-ctx.Done(): return - // DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs - case output <- newConfigurations.DeepCopy(): + // DeepCopy is necessary because transformedConfigurations gets modified later by the consumer of c.newConfigs. + case output <- transformedConfigurations.DeepCopy(): output = nil default: @@ -123,28 +129,31 @@ func (c *ConfigurationWatcher) receiveConfigurations(ctx context.Context) { logConfiguration(logger, configMsg) if reflect.DeepEqual(newConfigurations[configMsg.ProviderName], configMsg.Configuration) { - // no change, do nothing + // no change, do nothing. logger.Debug().Msg("Skipping unchanged configuration") continue } newConfigurations[configMsg.ProviderName] = configMsg.Configuration.DeepCopy() + transformedConfigurations = newConfigurations + for _, transform := range c.configurationTransformers { + transformedConfigurations = transform(logger.WithContext(ctx), transformedConfigurations.DeepCopy()) + } + output = c.newConfigs - // DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs - case output <- newConfigurations.DeepCopy(): + // DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs. + case output <- transformedConfigurations.DeepCopy(): output = nil } } } } -// applyConfigurations blocks on a RingChannel that receives the new -// set of configurations that is compiled and sent by receiveConfigurations as soon -// as a provider change occurs. If the new set is different from the previous set -// that had been applied, the new set is applied, and we sleep for a while before -// listening on the channel again. +// applyConfigurations receives the full set of configurations from +// receiveConfigurations and applies them if they differ from the previous set. +// It waits for the required provider's configuration before applying any configs. func (c *ConfigurationWatcher) applyConfigurations(ctx context.Context) { var lastConfigurations dynamic.Configurations for { diff --git a/pkg/server/configurationwatcher_test.go b/pkg/server/configurationwatcher_test.go index 66812feac..40df7b5aa 100644 --- a/pkg/server/configurationwatcher_test.go +++ b/pkg/server/configurationwatcher_test.go @@ -908,3 +908,87 @@ func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) { assert.Equal(t, 1, publishedConfigCount) } + +func TestConfigurationWatcher_MultipleTransformers(t *testing.T) { + routinesPool := safe.NewPool(t.Context()) + t.Cleanup(routinesPool.Stop) + + pvd := &mockProvider{ + messages: []dynamic.Message{{ + ProviderName: "mock", + Configuration: &dynamic.Configuration{ + HTTP: th.BuildConfiguration( + th.WithRouters( + th.WithRouter("original", + th.WithEntryPoints("e"), + th.WithServiceName("scv"))), + ), + }, + }}, + } + + watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "") + + var callOrder []string + + var callCount1, callCount2 int + + watcher.AddTransformer(func(_ context.Context, configs dynamic.Configurations) dynamic.Configurations { + callCount1++ + + callOrder = append(callOrder, "transformer1") + + for _, config := range configs { + if config != nil && config.HTTP != nil { + config.HTTP.Routers["from-transformer1"] = &dynamic.Router{ + EntryPoints: []string{"e"}, + Service: "svc1", + } + } + } + + return configs + }) + + watcher.AddTransformer(func(_ context.Context, configs dynamic.Configurations) dynamic.Configurations { + callCount2++ + + callOrder = append(callOrder, "transformer2") + + // Verify that transformer1's changes are visible. + for _, config := range configs { + if config != nil && config.HTTP != nil { + assert.Contains(t, config.HTTP.Routers, "from-transformer1") + config.HTTP.Routers["from-transformer2"] = &dynamic.Router{ + EntryPoints: []string{"e"}, + Service: "svc2", + } + } + } + + return configs + }) + + run := make(chan struct{}) + + watcher.AddListener(func(conf dynamic.Configuration) { + assert.NotNil(t, conf.HTTP) + assert.Contains(t, conf.HTTP.Routers, "original@mock") + assert.Contains(t, conf.HTTP.Routers, "from-transformer1@mock") + assert.Contains(t, conf.HTTP.Routers, "from-transformer2@mock") + close(run) + }) + + watcher.Start() + t.Cleanup(watcher.Stop) + + select { + case <-run: + case <-time.After(5 * time.Second): + t.Fatal("Timeout waiting for configuration") + } + + assert.Equal(t, []string{"transformer1", "transformer2"}, callOrder) + assert.Equal(t, 1, callCount1) + assert.Equal(t, 1, callCount2) +}