This commit is contained in:
yonizxz 2026-02-03 17:00:39 -08:00 committed by GitHub
commit bb1919017f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 45 additions and 7 deletions

View file

@ -64235,8 +64235,16 @@ func schema_controllers_node_config_v1alpha1_NodeControllerConfiguration(ref com
Format: "int32",
},
},
"ConcurrentNodeStatusUpdates": {
SchemaProps: spec.SchemaProps{
Description: "ConcurrentNodeStatusUpdates is the number of workers concurrently updating node statues",
Default: 0,
Type: []string{"integer"},
Format: "int32",
},
},
},
Required: []string{"ConcurrentNodeSyncs"},
Required: []string{"ConcurrentNodeSyncs", "ConcurrentNodeStatusUpdates"},
},
},
}

View file

@ -48,6 +48,7 @@ func startCloudNodeController(ctx context.Context, initContext ControllerInitCon
cloud,
completedConfig.ComponentConfig.NodeStatusUpdateFrequency.Duration,
completedConfig.ComponentConfig.NodeController.ConcurrentNodeSyncs,
completedConfig.ComponentConfig.NodeController.ConcurrentNodeStatusUpdates,
)
if err != nil {
klog.Warningf("failed to start cloud node controller: %s", err)

View file

@ -21,4 +21,7 @@ type NodeControllerConfiguration struct {
// ConcurrentNodeSyncs is the number of workers
// concurrently synchronizing nodes
ConcurrentNodeSyncs int32
// ConcurrentNodeStatusUpdates is the number of workers
// concurrently updating node statues
ConcurrentNodeStatusUpdates int32
}

View file

@ -21,4 +21,7 @@ type NodeControllerConfiguration struct {
// ConcurrentNodeSyncs is the number of workers
// concurrently synchronizing nodes
ConcurrentNodeSyncs int32
// ConcurrentNodeStatusUpdates is the number of workers
// concurrently updating node statues
ConcurrentNodeStatusUpdates int32
}

View file

@ -49,10 +49,12 @@ func RegisterConversions(s *runtime.Scheme) error {
func autoConvert_v1alpha1_NodeControllerConfiguration_To_config_NodeControllerConfiguration(in *NodeControllerConfiguration, out *config.NodeControllerConfiguration, s conversion.Scope) error {
out.ConcurrentNodeSyncs = in.ConcurrentNodeSyncs
out.ConcurrentNodeStatusUpdates = in.ConcurrentNodeStatusUpdates
return nil
}
func autoConvert_config_NodeControllerConfiguration_To_v1alpha1_NodeControllerConfiguration(in *config.NodeControllerConfiguration, out *NodeControllerConfiguration, s conversion.Scope) error {
out.ConcurrentNodeSyncs = in.ConcurrentNodeSyncs
out.ConcurrentNodeStatusUpdates = in.ConcurrentNodeStatusUpdates
return nil
}

View file

@ -106,6 +106,7 @@ type CloudNodeController struct {
nodeStatusUpdateFrequency time.Duration
workerCount int32
statusUpdateWorkerCount int32
nodesLister corelisters.NodeLister
nodesSynced cache.InformerSynced
@ -118,7 +119,8 @@ func NewCloudNodeController(
kubeClient clientset.Interface,
cloud cloudprovider.Interface,
nodeStatusUpdateFrequency time.Duration,
workerCount int32) (*CloudNodeController, error) {
workerCount,
statusUpdateWorkerCount int32) (*CloudNodeController, error) {
_, instancesSupported := cloud.Instances()
_, instancesV2Supported := cloud.InstancesV2()
@ -132,6 +134,7 @@ func NewCloudNodeController(
cloud: cloud,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
workerCount: workerCount,
statusUpdateWorkerCount: statusUpdateWorkerCount,
nodesLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(
@ -289,7 +292,7 @@ func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error {
cnc.updateNodeAddress(ctx, node, instanceMetadata)
}
workqueue.ParallelizeUntil(ctx, int(cnc.workerCount), len(nodes), updateNodeFunc)
workqueue.ParallelizeUntil(ctx, int(cnc.statusUpdateWorkerCount), len(nodes), updateNodeFunc)
return nil
}

View file

@ -2787,7 +2787,7 @@ func TestUpdateNodeStatus(t *testing.T) {
cloud: fakeCloud,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
nodeStatusUpdateFrequency: 1 * time.Second,
workerCount: test.workers,
statusUpdateWorkerCount: test.workers,
}
for _, n := range generateNodes(test.nodes) {

View file

@ -36,6 +36,7 @@ func (o *NodeControllerOptions) AddFlags(fs *pflag.FlagSet) {
}
fs.Int32Var(&o.ConcurrentNodeSyncs, "concurrent-node-syncs", o.ConcurrentNodeSyncs, "Number of workers concurrently synchronizing nodes.")
fs.Int32Var(&o.ConcurrentNodeStatusUpdates, "concurrent-node-status-updates", 0, "Number of workers concurrently updating node statuses.")
}
// ApplyTo fills up ServiceController config with options.
@ -45,6 +46,13 @@ func (o *NodeControllerOptions) ApplyTo(cfg *nodeconfig.NodeControllerConfigurat
}
cfg.ConcurrentNodeSyncs = o.ConcurrentNodeSyncs
// Concurrent node status updates used to be derived from the concurrent-node-syncs flag,
// so for backwards compatibility it will default to concurrent-node-syncs if concurrent-node-status-updates is not specified
if o.ConcurrentNodeStatusUpdates > 0 {
cfg.ConcurrentNodeStatusUpdates = o.ConcurrentNodeStatusUpdates
} else {
cfg.ConcurrentNodeStatusUpdates = o.ConcurrentNodeSyncs
}
return nil
}

View file

@ -242,7 +242,9 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, allControllers
// sync back to component config
// TODO: find more elegant way than syncing back the values.
c.ComponentConfig.NodeStatusUpdateFrequency = o.NodeStatusUpdateFrequency
c.ComponentConfig.NodeController.ConcurrentNodeSyncs = o.NodeController.ConcurrentNodeSyncs
if err = o.NodeController.ApplyTo(&c.ComponentConfig.NodeController); err != nil {
return err
}
return nil
}

View file

@ -423,7 +423,11 @@ func TestCreateConfig(t *testing.T) {
ServiceController: serviceconfig.ServiceControllerConfiguration{
ConcurrentServiceSyncs: 1,
},
NodeController: nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 1},
NodeController: nodeconfig.NodeControllerConfiguration{
ConcurrentNodeSyncs: 1,
// ConcurrentNodeStatusUpdates should default to the value of ConcurrentNodeSyncs only at the stage of config creation
ConcurrentNodeStatusUpdates: 1,
},
NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Minute},
Webhook: cpconfig.WebhookConfiguration{
Webhooks: []string{"foo", "bar", "-baz"},
@ -491,6 +495,7 @@ func TestCreateConfigWithoutWebHooks(t *testing.T) {
"--controller-start-interval=2m",
"--controllers=foo,bar",
"--concurrent-node-syncs=1",
"--concurrent-node-status-updates=2",
"--http2-max-streams-per-connection=47",
"--kube-api-burst=101",
"--kube-api-content-type=application/vnd.kubernetes.protobuf",
@ -564,7 +569,10 @@ func TestCreateConfigWithoutWebHooks(t *testing.T) {
ServiceController: serviceconfig.ServiceControllerConfiguration{
ConcurrentServiceSyncs: 1,
},
NodeController: nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 1},
NodeController: nodeconfig.NodeControllerConfiguration{
ConcurrentNodeSyncs: 1,
ConcurrentNodeStatusUpdates: 2,
},
NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Minute},
Webhook: cpconfig.WebhookConfiguration{},
},