2026-03-17 08:25:52 -04:00
|
|
|
/*
|
|
|
|
|
Copyright The Kubernetes Authors.
|
|
|
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
|
limitations under the License.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package resourcepoolstatusrequest
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"sort"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
resourcev1alpha3 "k8s.io/api/resource/v1alpha3"
|
|
|
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
|
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
|
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
2026-03-18 15:46:53 -04:00
|
|
|
resourcev1informers "k8s.io/client-go/informers/resource/v1"
|
|
|
|
|
resourcev1alpha3informers "k8s.io/client-go/informers/resource/v1alpha3"
|
2026-03-17 08:25:52 -04:00
|
|
|
clientset "k8s.io/client-go/kubernetes"
|
|
|
|
|
resourcev1listers "k8s.io/client-go/listers/resource/v1"
|
|
|
|
|
resourcev1alpha3listers "k8s.io/client-go/listers/resource/v1alpha3"
|
|
|
|
|
"k8s.io/client-go/tools/cache"
|
|
|
|
|
"k8s.io/client-go/util/workqueue"
|
|
|
|
|
"k8s.io/klog/v2"
|
|
|
|
|
"k8s.io/kubernetes/pkg/controller/resourcepoolstatusrequest/metrics"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
// ControllerName is the name of this controller.
|
|
|
|
|
ControllerName = "resourcepoolstatusrequest-controller"
|
|
|
|
|
|
|
|
|
|
// maxRetries is the number of times a request will be retried before it is dropped.
|
|
|
|
|
maxRetries = 5
|
|
|
|
|
|
|
|
|
|
// cleanupPollingInterval is how often the controller checks for expired requests.
|
|
|
|
|
cleanupPollingInterval = 10 * time.Minute
|
|
|
|
|
|
|
|
|
|
// completedRequestTTL is how long a completed request (with Complete or Failed condition)
|
|
|
|
|
// is retained before being automatically deleted.
|
|
|
|
|
completedRequestTTL = 1 * time.Hour
|
|
|
|
|
|
|
|
|
|
// pendingRequestTTL is how long a pending request (without status)
|
|
|
|
|
// is retained before being automatically deleted. This handles stuck requests.
|
|
|
|
|
pendingRequestTTL = 24 * time.Hour
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Controller manages ResourcePoolStatusRequest processing.
|
|
|
|
|
type Controller struct {
|
|
|
|
|
client clientset.Interface
|
|
|
|
|
|
|
|
|
|
// requestLister can list/get ResourcePoolStatusRequests from the shared informer's store
|
|
|
|
|
requestLister resourcev1alpha3listers.ResourcePoolStatusRequestLister
|
|
|
|
|
|
|
|
|
|
// sliceLister can list/get ResourceSlices from the shared informer's store
|
|
|
|
|
sliceLister resourcev1listers.ResourceSliceLister
|
|
|
|
|
|
|
|
|
|
// claimLister can list/get ResourceClaims from the shared informer's store
|
|
|
|
|
claimLister resourcev1listers.ResourceClaimLister
|
|
|
|
|
|
|
|
|
|
// requestSynced returns true if the ResourcePoolStatusRequest store has been synced
|
|
|
|
|
requestSynced cache.InformerSynced
|
|
|
|
|
|
|
|
|
|
// sliceSynced returns true if the ResourceSlice store has been synced
|
|
|
|
|
sliceSynced cache.InformerSynced
|
|
|
|
|
|
|
|
|
|
// claimSynced returns true if the ResourceClaim store has been synced
|
|
|
|
|
claimSynced cache.InformerSynced
|
|
|
|
|
|
|
|
|
|
// workqueue is a rate limited work queue for processing ResourcePoolStatusRequests
|
|
|
|
|
workqueue workqueue.TypedRateLimitingInterface[string]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewController creates a new ResourcePoolStatusRequest controller.
|
|
|
|
|
func NewController(
|
|
|
|
|
ctx context.Context,
|
|
|
|
|
client clientset.Interface,
|
2026-03-18 15:46:53 -04:00
|
|
|
requestInformer resourcev1alpha3informers.ResourcePoolStatusRequestInformer,
|
|
|
|
|
sliceInformer resourcev1informers.ResourceSliceInformer,
|
|
|
|
|
claimInformer resourcev1informers.ResourceClaimInformer,
|
2026-03-17 08:25:52 -04:00
|
|
|
) (*Controller, error) {
|
|
|
|
|
logger := klog.FromContext(ctx)
|
|
|
|
|
|
|
|
|
|
c := &Controller{
|
|
|
|
|
client: client,
|
|
|
|
|
requestLister: requestInformer.Lister(),
|
|
|
|
|
sliceLister: sliceInformer.Lister(),
|
|
|
|
|
claimLister: claimInformer.Lister(),
|
|
|
|
|
requestSynced: requestInformer.Informer().HasSynced,
|
|
|
|
|
sliceSynced: sliceInformer.Informer().HasSynced,
|
|
|
|
|
claimSynced: claimInformer.Informer().HasSynced,
|
|
|
|
|
workqueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Register metrics
|
|
|
|
|
metrics.Register()
|
|
|
|
|
|
|
|
|
|
// Set up event handlers for ResourcePoolStatusRequests
|
|
|
|
|
_, err := requestInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
|
|
|
AddFunc: func(obj interface{}) {
|
|
|
|
|
c.enqueueRequest(logger, obj)
|
|
|
|
|
},
|
|
|
|
|
UpdateFunc: func(old, new interface{}) {
|
2026-03-18 15:46:53 -04:00
|
|
|
c.enqueueRequest(logger, new)
|
2026-03-17 08:25:52 -04:00
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("failed to add request event handler: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.Info("ResourcePoolStatusRequest controller initialized")
|
|
|
|
|
return c, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Run starts the controller workers.
|
|
|
|
|
func (c *Controller) Run(ctx context.Context, workers int) {
|
|
|
|
|
defer utilruntime.HandleCrash()
|
|
|
|
|
defer c.workqueue.ShutDown()
|
|
|
|
|
|
|
|
|
|
logger := klog.FromContext(ctx)
|
|
|
|
|
logger.Info("Starting ResourcePoolStatusRequest controller")
|
|
|
|
|
|
|
|
|
|
// Wait for the caches to be synced before starting workers
|
|
|
|
|
logger.Info("Waiting for informer caches to sync")
|
|
|
|
|
if !cache.WaitForCacheSync(ctx.Done(), c.requestSynced, c.sliceSynced, c.claimSynced) {
|
|
|
|
|
logger.Error(nil, "Failed to wait for caches to sync")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.Info("Starting workers", "count", workers)
|
|
|
|
|
for range workers {
|
|
|
|
|
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Start the cleanup goroutine for TTL-based garbage collection
|
|
|
|
|
go wait.UntilWithContext(ctx, c.cleanupExpiredRequests, cleanupPollingInterval)
|
|
|
|
|
|
|
|
|
|
<-ctx.Done()
|
|
|
|
|
logger.Info("Shutting down ResourcePoolStatusRequest controller")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// runWorker is a long-running function that will continually call the
|
|
|
|
|
// processNextWorkItem function in order to read and process a message on the workqueue.
|
|
|
|
|
func (c *Controller) runWorker(ctx context.Context) {
|
|
|
|
|
for c.processNextWorkItem(ctx) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// processNextWorkItem will read a single work item off the workqueue and
|
|
|
|
|
// attempt to process it.
|
|
|
|
|
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
|
|
|
|
|
key, shutdown := c.workqueue.Get()
|
|
|
|
|
if shutdown {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
defer c.workqueue.Done(key)
|
|
|
|
|
|
|
|
|
|
logger := klog.FromContext(ctx)
|
|
|
|
|
err := c.syncRequest(ctx, key)
|
|
|
|
|
if err == nil {
|
|
|
|
|
c.workqueue.Forget(key)
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if c.workqueue.NumRequeues(key) < maxRetries {
|
|
|
|
|
logger.Error(err, "Error syncing request, requeuing", "request", key)
|
|
|
|
|
c.workqueue.AddRateLimited(key)
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.Error(err, "Dropping request out of the queue after max retries", "request", key, "retries", maxRetries)
|
|
|
|
|
c.workqueue.Forget(key)
|
|
|
|
|
utilruntime.HandleError(err)
|
|
|
|
|
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// syncRequest processes a single ResourcePoolStatusRequest.
|
|
|
|
|
func (c *Controller) syncRequest(ctx context.Context, key string) error {
|
|
|
|
|
logger := klog.FromContext(ctx)
|
|
|
|
|
startTime := time.Now()
|
|
|
|
|
var driverName string
|
|
|
|
|
var recordDuration bool
|
|
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
|
if driverName != "" && recordDuration {
|
|
|
|
|
metrics.RequestProcessingDuration.WithLabelValues(driverName).Observe(time.Since(startTime).Seconds())
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Get the request
|
|
|
|
|
request, err := c.requestLister.Get(key)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// Request was deleted, nothing to do
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
driverName = request.Spec.Driver
|
|
|
|
|
|
|
|
|
|
// Skip if already processed (status is set)
|
|
|
|
|
if request.Status != nil {
|
|
|
|
|
logger.V(4).Info("Request already processed, skipping", "request", key)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.V(2).Info("Processing ResourcePoolStatusRequest", "request", key, "driver", request.Spec.Driver)
|
|
|
|
|
|
|
|
|
|
// Calculate the pool status on-demand from listers
|
|
|
|
|
status := c.calculatePoolStatus(ctx, request)
|
|
|
|
|
|
|
|
|
|
// Requeue if pools have validation errors and retries remain — gives drivers
|
|
|
|
|
// time to publish remaining slices before we finalize the response.
|
|
|
|
|
hasIncomplete := false
|
|
|
|
|
for _, p := range status.Pools {
|
|
|
|
|
if p.ValidationError != nil {
|
|
|
|
|
hasIncomplete = true
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-03-18 15:46:53 -04:00
|
|
|
if hasIncomplete {
|
2026-03-17 08:25:52 -04:00
|
|
|
return fmt.Errorf("incomplete pools detected, requeueing")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update the request status
|
|
|
|
|
requestCopy := request.DeepCopy()
|
|
|
|
|
requestCopy.Status = &status
|
|
|
|
|
|
|
|
|
|
_, err = c.client.ResourceV1alpha3().ResourcePoolStatusRequests().UpdateStatus(ctx, requestCopy, metav1.UpdateOptions{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
recordDuration = true
|
|
|
|
|
metrics.RequestProcessingErrors.WithLabelValues(driverName).Inc()
|
|
|
|
|
metrics.RequestsProcessed.WithLabelValues(driverName).Inc()
|
|
|
|
|
return fmt.Errorf("failed to update status for request %s: %w", key, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
recordDuration = true
|
|
|
|
|
metrics.RequestsProcessed.WithLabelValues(driverName).Inc()
|
|
|
|
|
logger.V(2).Info("Successfully processed ResourcePoolStatusRequest", "request", key, "poolCount", len(status.Pools))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// calculatePoolStatus computes the pool status on-demand by reading directly
|
|
|
|
|
// from the shared informer listers. No caches are maintained between requests.
|
|
|
|
|
func (c *Controller) calculatePoolStatus(ctx context.Context, request *resourcev1alpha3.ResourcePoolStatusRequest) resourcev1alpha3.ResourcePoolStatusRequestStatus {
|
|
|
|
|
logger := klog.FromContext(ctx)
|
|
|
|
|
|
|
|
|
|
driver := request.Spec.Driver
|
|
|
|
|
var poolNameFilter string
|
|
|
|
|
if request.Spec.PoolName != nil {
|
|
|
|
|
poolNameFilter = *request.Spec.PoolName
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Step 1: Aggregate pool data from ResourceSlices.
|
|
|
|
|
// Uses two passes: first finds the max generation per pool, then only
|
|
|
|
|
// counts slices at that generation (older-generation slices are ignored
|
|
|
|
|
// per KEP-5677).
|
|
|
|
|
type poolInfo struct {
|
|
|
|
|
driver string
|
|
|
|
|
poolName string
|
|
|
|
|
nodeName string
|
|
|
|
|
nodeNameMixed bool // true when slices have different NodeNames
|
|
|
|
|
totalDevices int32
|
|
|
|
|
sliceCount int32
|
|
|
|
|
expectedSliceCount int64
|
|
|
|
|
generation int64
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
slices, err := c.sliceLister.List(labels.Everything())
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error(err, "Failed to list ResourceSlices")
|
|
|
|
|
return errorStatus("Failed to list ResourceSlices: " + err.Error())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Pass 1: Find max generation per pool
|
|
|
|
|
maxGeneration := make(map[string]int64)
|
|
|
|
|
for _, slice := range slices {
|
|
|
|
|
if slice.Spec.Driver != driver {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
slicePoolName := slice.Spec.Pool.Name
|
|
|
|
|
if poolNameFilter != "" && slicePoolName != poolNameFilter {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
key := slice.Spec.Driver + "/" + slicePoolName
|
|
|
|
|
if gen, exists := maxGeneration[key]; !exists || slice.Spec.Pool.Generation > gen {
|
|
|
|
|
maxGeneration[key] = slice.Spec.Pool.Generation
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Pass 2: Aggregate only slices at max generation
|
|
|
|
|
poolData := make(map[string]*poolInfo)
|
|
|
|
|
for _, slice := range slices {
|
|
|
|
|
if slice.Spec.Driver != driver {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
slicePoolName := slice.Spec.Pool.Name
|
|
|
|
|
if poolNameFilter != "" && slicePoolName != poolNameFilter {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
key := slice.Spec.Driver + "/" + slicePoolName
|
|
|
|
|
|
|
|
|
|
// Skip slices from older generations
|
|
|
|
|
if slice.Spec.Pool.Generation < maxGeneration[key] {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
deviceCount := int32(len(slice.Spec.Devices))
|
|
|
|
|
info, exists := poolData[key]
|
|
|
|
|
if !exists {
|
|
|
|
|
var nodeName string
|
|
|
|
|
if slice.Spec.NodeName != nil {
|
|
|
|
|
nodeName = *slice.Spec.NodeName
|
|
|
|
|
}
|
|
|
|
|
poolData[key] = &poolInfo{
|
|
|
|
|
driver: slice.Spec.Driver,
|
|
|
|
|
poolName: slicePoolName,
|
|
|
|
|
nodeName: nodeName,
|
|
|
|
|
totalDevices: deviceCount,
|
|
|
|
|
sliceCount: 1,
|
|
|
|
|
expectedSliceCount: slice.Spec.Pool.ResourceSliceCount,
|
|
|
|
|
generation: maxGeneration[key],
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
info.totalDevices += deviceCount
|
|
|
|
|
info.sliceCount++
|
|
|
|
|
// Check NodeName consistency across slices
|
|
|
|
|
sliceNodeName := ""
|
|
|
|
|
if slice.Spec.NodeName != nil {
|
|
|
|
|
sliceNodeName = *slice.Spec.NodeName
|
|
|
|
|
}
|
|
|
|
|
if sliceNodeName != info.nodeName {
|
|
|
|
|
info.nodeNameMixed = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Step 2: Count allocations from ResourceClaims
|
|
|
|
|
allocationData := make(map[string]int32)
|
|
|
|
|
|
|
|
|
|
claims, err := c.claimLister.List(labels.Everything())
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error(err, "Failed to list ResourceClaims")
|
|
|
|
|
return errorStatus("Failed to list ResourceClaims: " + err.Error())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, claim := range claims {
|
|
|
|
|
if claim.Status.Allocation == nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
for _, result := range claim.Status.Allocation.Devices.Results {
|
|
|
|
|
key := result.Driver + "/" + result.Pool
|
|
|
|
|
allocationData[key]++
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Step 3: Build pool status list, marking incomplete pools with a validation error
|
|
|
|
|
var pools []resourcev1alpha3.PoolStatus
|
|
|
|
|
for key, info := range poolData {
|
|
|
|
|
pool := resourcev1alpha3.PoolStatus{
|
|
|
|
|
Driver: info.driver,
|
|
|
|
|
PoolName: info.poolName,
|
|
|
|
|
Generation: info.generation,
|
|
|
|
|
}
|
|
|
|
|
if info.nodeName != "" && !info.nodeNameMixed {
|
|
|
|
|
nodeName := info.nodeName
|
|
|
|
|
pool.NodeName = &nodeName
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if int64(info.sliceCount) < info.expectedSliceCount {
|
|
|
|
|
// Incomplete pool: set validation error, leave device counts and slice count nil
|
|
|
|
|
errMsg := fmt.Sprintf("pool %s/%s is incomplete: observed %d/%d slices at generation %d",
|
|
|
|
|
info.driver, info.poolName, info.sliceCount, info.expectedSliceCount, info.generation)
|
2026-03-17 23:17:28 -04:00
|
|
|
// Truncate to 256 bytes to stay within the API field's +k8s:maxBytes=256 limit.
|
|
|
|
|
if len(errMsg) > 256 {
|
|
|
|
|
errMsg = errMsg[:256]
|
|
|
|
|
}
|
2026-03-17 08:25:52 -04:00
|
|
|
pool.ValidationError = &errMsg
|
|
|
|
|
} else {
|
|
|
|
|
// Complete pool: populate device counts and slice count
|
|
|
|
|
allocatedDevices := allocationData[key]
|
|
|
|
|
// UnavailableDevices is currently always 0 in Alpha because the controller
|
|
|
|
|
// does not yet inspect device conditions/taints. This will be computed from
|
|
|
|
|
// real data when device health tracking is wired in.
|
|
|
|
|
unavailDevices := int32(0)
|
|
|
|
|
availableDevices := max(0, info.totalDevices-allocatedDevices-unavailDevices)
|
|
|
|
|
|
|
|
|
|
totalDevices := info.totalDevices
|
|
|
|
|
allocDevices := allocatedDevices
|
|
|
|
|
availDevices := availableDevices
|
|
|
|
|
sliceCount := info.sliceCount
|
|
|
|
|
pool.ResourceSliceCount = &sliceCount
|
|
|
|
|
pool.TotalDevices = &totalDevices
|
|
|
|
|
pool.AllocatedDevices = &allocDevices
|
|
|
|
|
pool.AvailableDevices = &availDevices
|
|
|
|
|
pool.UnavailableDevices = &unavailDevices
|
|
|
|
|
}
|
|
|
|
|
pools = append(pools, pool)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Sort pools by driver, then pool name
|
|
|
|
|
sort.Slice(pools, func(i, j int) bool {
|
|
|
|
|
if pools[i].Driver != pools[j].Driver {
|
|
|
|
|
return pools[i].Driver < pools[j].Driver
|
|
|
|
|
}
|
|
|
|
|
return pools[i].PoolName < pools[j].PoolName
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
poolCount := int32(len(pools))
|
|
|
|
|
|
|
|
|
|
// Apply limit if specified
|
|
|
|
|
limit := resourcev1alpha3.ResourcePoolStatusRequestLimitDefault
|
|
|
|
|
if request.Spec.Limit != nil {
|
|
|
|
|
limit = *request.Spec.Limit
|
|
|
|
|
}
|
|
|
|
|
if int32(len(pools)) > limit {
|
|
|
|
|
pools = pools[:limit]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Count incomplete pools for condition message
|
|
|
|
|
incompletePoolCount := 0
|
|
|
|
|
for _, p := range pools {
|
|
|
|
|
if p.ValidationError != nil {
|
|
|
|
|
incompletePoolCount++
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
conditionMessage := fmt.Sprintf("Calculated status for %d pools", len(pools))
|
|
|
|
|
if incompletePoolCount > 0 {
|
|
|
|
|
conditionMessage = fmt.Sprintf("Calculated status for %d pools (%d incomplete)", len(pools), incompletePoolCount)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
now := metav1.Now()
|
|
|
|
|
status := resourcev1alpha3.ResourcePoolStatusRequestStatus{
|
|
|
|
|
PoolCount: &poolCount,
|
|
|
|
|
Pools: pools,
|
|
|
|
|
Conditions: []metav1.Condition{
|
|
|
|
|
{
|
|
|
|
|
Type: resourcev1alpha3.ResourcePoolStatusRequestConditionComplete,
|
|
|
|
|
Status: metav1.ConditionTrue,
|
|
|
|
|
LastTransitionTime: now,
|
|
|
|
|
Reason: "CalculationComplete",
|
|
|
|
|
Message: conditionMessage,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.V(4).Info("Calculated pool status",
|
|
|
|
|
"request", request.Name,
|
|
|
|
|
"poolCount", poolCount,
|
|
|
|
|
"returned", len(pools))
|
|
|
|
|
|
|
|
|
|
return status
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// errorStatus returns a status indicating a processing failure.
|
|
|
|
|
func errorStatus(message string) resourcev1alpha3.ResourcePoolStatusRequestStatus {
|
|
|
|
|
now := metav1.Now()
|
|
|
|
|
zero := int32(0)
|
|
|
|
|
return resourcev1alpha3.ResourcePoolStatusRequestStatus{
|
|
|
|
|
PoolCount: &zero,
|
|
|
|
|
Conditions: []metav1.Condition{
|
|
|
|
|
{
|
|
|
|
|
Type: resourcev1alpha3.ResourcePoolStatusRequestConditionFailed,
|
|
|
|
|
Status: metav1.ConditionTrue,
|
|
|
|
|
LastTransitionTime: now,
|
|
|
|
|
Reason: "CalculationFailed",
|
|
|
|
|
Message: message,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// enqueueRequest adds a request to the workqueue.
|
|
|
|
|
func (c *Controller) enqueueRequest(logger klog.Logger, obj interface{}) {
|
|
|
|
|
request, ok := obj.(*resourcev1alpha3.ResourcePoolStatusRequest)
|
|
|
|
|
if !ok {
|
|
|
|
|
logger.Error(nil, "Failed to cast object to ResourcePoolStatusRequest")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Skip if already processed (status is set)
|
|
|
|
|
if request.Status != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.workqueue.Add(request.Name)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// cleanupExpiredRequests deletes ResourcePoolStatusRequests that have exceeded their TTL.
|
|
|
|
|
// Completed requests (with status set) are deleted after completedRequestTTL.
|
|
|
|
|
// Pending requests (without status) are deleted after pendingRequestTTL.
|
|
|
|
|
func (c *Controller) cleanupExpiredRequests(ctx context.Context) {
|
|
|
|
|
logger := klog.FromContext(ctx)
|
|
|
|
|
|
|
|
|
|
requests, err := c.requestLister.List(labels.Everything())
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error(err, "Failed to list ResourcePoolStatusRequests for cleanup")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var deletedCount int
|
|
|
|
|
for _, request := range requests {
|
|
|
|
|
if c.shouldDeleteRequest(request) {
|
|
|
|
|
if err := c.client.ResourceV1alpha3().ResourcePoolStatusRequests().Delete(ctx, request.Name, metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &request.UID}}); err != nil {
|
|
|
|
|
// Ignore NotFound errors - the request may have been deleted by another process
|
|
|
|
|
if !apierrors.IsNotFound(err) {
|
|
|
|
|
logger.Error(err, "Failed to delete expired ResourcePoolStatusRequest", "request", request.Name)
|
|
|
|
|
}
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
deletedCount++
|
|
|
|
|
logger.V(2).Info("Deleted expired ResourcePoolStatusRequest", "request", request.Name)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if deletedCount > 0 {
|
|
|
|
|
logger.Info("Cleanup completed", "deletedCount", deletedCount)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// shouldDeleteRequest determines if a request should be deleted based on TTL.
|
|
|
|
|
func (c *Controller) shouldDeleteRequest(request *resourcev1alpha3.ResourcePoolStatusRequest) bool {
|
|
|
|
|
if request.Status != nil {
|
|
|
|
|
// Completed/failed request: check against completedRequestTTL using condition time
|
|
|
|
|
for _, cond := range request.Status.Conditions {
|
|
|
|
|
if cond.Type == resourcev1alpha3.ResourcePoolStatusRequestConditionComplete ||
|
|
|
|
|
cond.Type == resourcev1alpha3.ResourcePoolStatusRequestConditionFailed {
|
|
|
|
|
return isOlderThan(cond.LastTransitionTime.Time, completedRequestTTL)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// Status set but no Complete/Failed condition (edge case); use creation time
|
|
|
|
|
return isOlderThan(request.CreationTimestamp.Time, completedRequestTTL)
|
|
|
|
|
}
|
|
|
|
|
// Pending request: check against pendingRequestTTL based on creation time
|
|
|
|
|
return isOlderThan(request.CreationTimestamp.Time, pendingRequestTTL)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// isOlderThan checks if the given time is older than the specified duration.
|
|
|
|
|
func isOlderThan(t time.Time, d time.Duration) bool {
|
|
|
|
|
return !t.IsZero() && time.Since(t) > d
|
|
|
|
|
}
|