2019-03-27 18:21:32 -04:00
/ *
Copyright 2019 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
2020-06-19 05:05:45 -04:00
package runtime
2019-03-27 18:21:32 -04:00
import (
2019-07-16 09:19:20 -04:00
"context"
2019-03-27 18:21:32 -04:00
"fmt"
2019-09-26 21:15:32 -04:00
"reflect"
2020-11-10 19:41:18 -05:00
"sort"
2019-05-10 09:05:59 -04:00
"time"
2019-03-27 18:21:32 -04:00
2020-02-15 00:48:45 -05:00
v1 "k8s.io/api/core/v1"
2019-03-27 18:21:32 -04:00
"k8s.io/apimachinery/pkg/runtime"
2019-05-10 09:05:59 -04:00
"k8s.io/apimachinery/pkg/types"
2019-09-26 21:15:32 -04:00
"k8s.io/apimachinery/pkg/util/sets"
2019-10-09 02:49:56 -04:00
"k8s.io/client-go/informers"
2019-08-19 05:46:15 -04:00
clientset "k8s.io/client-go/kubernetes"
2021-03-29 15:27:27 -04:00
restclient "k8s.io/client-go/rest"
2020-06-10 18:51:32 -04:00
"k8s.io/client-go/tools/events"
2020-12-19 11:18:40 -05:00
"k8s.io/component-helpers/scheduling/corev1"
2020-04-17 15:25:06 -04:00
"k8s.io/klog/v2"
2019-05-04 06:29:30 -04:00
"k8s.io/kubernetes/pkg/scheduler/apis/config"
2020-10-09 10:41:44 -04:00
"k8s.io/kubernetes/pkg/scheduler/framework"
2021-10-20 13:35:12 -04:00
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
2019-10-04 10:25:17 -04:00
"k8s.io/kubernetes/pkg/scheduler/metrics"
2019-03-27 18:21:32 -04:00
)
2019-10-04 17:40:21 -04:00
const (
	// Filter is the name of the filter extension point.
	Filter = "Filter"

	// maxTimeout is the maximum timeout a permit plugin can return.
	maxTimeout = 15 * time.Minute

	// Names of the remaining extension points. Within this file they are
	// passed as label values when recording extension point and plugin
	// durations.
	preFilter                   = "PreFilter"
	preFilterExtensionAddPod    = "PreFilterExtensionAddPod"
	preFilterExtensionRemovePod = "PreFilterExtensionRemovePod"
	postFilter                  = "PostFilter"
	preScore                    = "PreScore"
	score                       = "Score"
	scoreExtensionNormalize     = "ScoreExtensionNormalize"
	preBind                     = "PreBind"
	bind                        = "Bind"
	postBind                    = "PostBind"
	reserve                     = "Reserve"
	unreserve                   = "Unreserve"
	permit                      = "Permit"
)
2021-01-29 01:29:10 -05:00
// allClusterEvents holds one event with ActionType All for each of the Pod,
// Node, CSINode, PersistentVolume, PersistentVolumeClaim and StorageClass
// resources. fillEventToPluginMap registers this list for plugins that do not
// implement framework.EnqueueExtensions.
var allClusterEvents = []framework.ClusterEvent{
	framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All},
	framework.ClusterEvent{Resource: framework.Node, ActionType: framework.All},
	framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.All},
	framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.All},
	framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.All},
	framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.All},
}
2020-06-19 05:05:45 -04:00
// frameworkImpl is the component responsible for initializing and running scheduler
2019-03-27 18:21:32 -04:00
// plugins.
2020-06-19 05:05:45 -04:00
type frameworkImpl struct {
2021-03-29 04:34:10 -04:00
registry Registry
snapshotSharedLister framework . SharedLister
waitingPods * waitingPodsMap
scorePluginWeight map [ string ] int
queueSortPlugins [ ] framework . QueueSortPlugin
preFilterPlugins [ ] framework . PreFilterPlugin
filterPlugins [ ] framework . FilterPlugin
postFilterPlugins [ ] framework . PostFilterPlugin
preScorePlugins [ ] framework . PreScorePlugin
scorePlugins [ ] framework . ScorePlugin
reservePlugins [ ] framework . ReservePlugin
preBindPlugins [ ] framework . PreBindPlugin
bindPlugins [ ] framework . BindPlugin
postBindPlugins [ ] framework . PostBindPlugin
permitPlugins [ ] framework . PermitPlugin
2019-08-19 05:46:15 -04:00
2019-10-09 02:49:56 -04:00
clientSet clientset . Interface
2021-03-29 15:27:27 -04:00
kubeConfig * restclient . Config
2020-06-10 18:51:32 -04:00
eventRecorder events . EventRecorder
2019-10-09 02:49:56 -04:00
informerFactory informers . SharedInformerFactory
2019-10-29 10:54:02 -04:00
metricsRecorder * metricsRecorder
2020-06-17 17:49:30 -04:00
profileName string
2019-12-20 16:50:17 -05:00
2021-02-26 11:44:05 -05:00
extenders [ ] framework . Extender
framework . PodNominator
2020-05-18 13:29:08 -04:00
2021-03-01 09:20:18 -05:00
parallelizer parallelize . Parallelizer
2019-12-20 16:50:17 -05:00
// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
// after the first failure.
runAllFilters bool
2019-03-27 18:21:32 -04:00
}
2019-10-04 17:40:21 -04:00
// extensionPoint encapsulates desired and applied set of plugins at a specific extension
// point. This is used to simplify iterating over all extension points supported by the
2020-06-19 05:05:45 -04:00
// frameworkImpl.
2019-10-04 17:40:21 -04:00
type extensionPoint struct {
// the set of plugins to be configured at this extension point.
2021-06-10 08:45:49 -04:00
plugins * config . PluginSet
2019-10-04 17:40:21 -04:00
// a pointer to the slice storing plugins implementations that will run at this
2019-12-12 02:44:37 -05:00
// extension point.
2019-10-04 17:40:21 -04:00
slicePtr interface { }
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) getExtensionPoints ( plugins * config . Plugins ) [ ] extensionPoint {
2019-10-04 17:40:21 -04:00
return [ ] extensionPoint {
2021-06-10 08:45:49 -04:00
{ & plugins . PreFilter , & f . preFilterPlugins } ,
{ & plugins . Filter , & f . filterPlugins } ,
{ & plugins . PostFilter , & f . postFilterPlugins } ,
{ & plugins . Reserve , & f . reservePlugins } ,
{ & plugins . PreScore , & f . preScorePlugins } ,
{ & plugins . Score , & f . scorePlugins } ,
{ & plugins . PreBind , & f . preBindPlugins } ,
{ & plugins . Bind , & f . bindPlugins } ,
{ & plugins . PostBind , & f . postBindPlugins } ,
{ & plugins . Permit , & f . permitPlugins } ,
{ & plugins . QueueSort , & f . queueSortPlugins } ,
2019-10-04 17:40:21 -04:00
}
}
2019-05-10 09:05:59 -04:00
2021-02-26 11:44:05 -05:00
// Extenders returns the extenders stored on the frameworkImpl, i.e. the
// slice that was supplied through WithExtenders (nil if none was given).
func (f *frameworkImpl) Extenders() []framework.Extender {
	registered := f.extenders
	return registered
}
2019-08-19 05:46:15 -04:00
type frameworkOptions struct {
2021-03-10 11:08:05 -05:00
componentConfigVersion string
clientSet clientset . Interface
kubeConfig * restclient . Config
eventRecorder events . EventRecorder
informerFactory informers . SharedInformerFactory
snapshotSharedLister framework . SharedLister
metricsRecorder * metricsRecorder
podNominator framework . PodNominator
extenders [ ] framework . Extender
runAllFilters bool
captureProfile CaptureProfile
clusterEventMap map [ framework . ClusterEvent ] sets . String
parallelizer parallelize . Parallelizer
2019-08-19 05:46:15 -04:00
}
2020-06-19 05:05:45 -04:00
// Option for the frameworkImpl.
2019-08-19 05:46:15 -04:00
type Option func ( * frameworkOptions )
2021-03-10 11:08:05 -05:00
// WithComponentConfigVersion sets the component config version to the
// KubeSchedulerConfiguration version used. The string should be the full
// scheme group/version of the external type we converted from (for example
// "kubescheduler.config.k8s.io/v1beta2").
func WithComponentConfigVersion(componentConfigVersion string) Option {
	return func(opts *frameworkOptions) {
		opts.componentConfigVersion = componentConfigVersion
	}
}
2020-06-19 05:05:45 -04:00
// WithClientSet sets clientSet for the scheduling frameworkImpl.
2019-08-19 05:46:15 -04:00
func WithClientSet ( clientSet clientset . Interface ) Option {
return func ( o * frameworkOptions ) {
o . clientSet = clientSet
}
}
2021-03-29 15:27:27 -04:00
// WithKubeConfig returns an Option that sets the kubeConfig used by the
// scheduling frameworkImpl.
func WithKubeConfig(kubeConfig *restclient.Config) Option {
	return func(opts *frameworkOptions) {
		opts.kubeConfig = kubeConfig
	}
}
2020-06-19 05:05:45 -04:00
// WithEventRecorder sets clientSet for the scheduling frameworkImpl.
2020-06-10 18:51:32 -04:00
func WithEventRecorder ( recorder events . EventRecorder ) Option {
return func ( o * frameworkOptions ) {
o . eventRecorder = recorder
}
}
2020-06-19 05:05:45 -04:00
// WithInformerFactory sets informer factory for the scheduling frameworkImpl.
2019-10-09 02:49:56 -04:00
func WithInformerFactory ( informerFactory informers . SharedInformerFactory ) Option {
return func ( o * frameworkOptions ) {
o . informerFactory = informerFactory
}
}
2019-11-05 21:25:07 -05:00
// WithSnapshotSharedLister sets the SharedLister of the snapshot.
2020-06-19 05:05:45 -04:00
func WithSnapshotSharedLister ( snapshotSharedLister framework . SharedLister ) Option {
2019-10-24 18:30:21 -04:00
return func ( o * frameworkOptions ) {
2019-11-05 21:25:07 -05:00
o . snapshotSharedLister = snapshotSharedLister
2019-10-24 18:30:21 -04:00
}
}
2019-12-20 16:50:17 -05:00
// WithRunAllFilters returns an Option that sets the runAllFilters flag. When
// the flag is true, RunFilterPlugins accumulates all failure Statuses instead
// of returning after the first one.
func WithRunAllFilters(runAllFilters bool) Option {
	return func(opts *frameworkOptions) {
		opts.runAllFilters = runAllFilters
	}
}
2020-06-19 05:05:45 -04:00
// WithPodNominator sets podNominator for the scheduling frameworkImpl.
func WithPodNominator ( nominator framework . PodNominator ) Option {
2020-05-18 13:29:08 -04:00
return func ( o * frameworkOptions ) {
o . podNominator = nominator
}
}
2020-06-19 05:05:45 -04:00
// WithExtenders sets extenders for the scheduling frameworkImpl.
func WithExtenders ( extenders [ ] framework . Extender ) Option {
2020-04-23 20:08:59 -04:00
return func ( o * frameworkOptions ) {
o . extenders = extenders
}
}
2021-03-01 09:20:18 -05:00
// WithParallelism returns an Option that sets the parallelizer of the
// scheduling frameworkImpl to a new Parallelizer built from parallelism. The
// Parallelizer is constructed when the Option is applied, not when
// WithParallelism is called.
func WithParallelism(parallelism int) Option {
	return func(opts *frameworkOptions) {
		opts.parallelizer = parallelize.NewParallelizer(parallelism)
	}
}
2020-11-10 19:41:18 -05:00
// CaptureProfile is a callback that receives a finalized profile. NewFramework
// invokes it, if configured, after all plugins have been set up successfully.
type CaptureProfile func(config.KubeSchedulerProfile)
// WithCaptureProfile returns an Option that sets the callback used to capture
// the finalized profile.
func WithCaptureProfile(c CaptureProfile) Option {
	return func(opts *frameworkOptions) {
		opts.captureProfile = c
	}
}
2021-01-22 01:40:06 -05:00
func defaultFrameworkOptions ( ) frameworkOptions {
return frameworkOptions {
metricsRecorder : newMetricsRecorder ( 1000 , time . Second ) ,
2021-01-29 01:29:10 -05:00
clusterEventMap : make ( map [ framework . ClusterEvent ] sets . String ) ,
2021-03-01 09:20:18 -05:00
parallelizer : parallelize . NewParallelizer ( parallelize . DefaultParallelism ) ,
2021-01-29 01:29:10 -05:00
}
}
// WithClusterEventMap sets clusterEventMap for the scheduling frameworkImpl.
func WithClusterEventMap ( m map [ framework . ClusterEvent ] sets . String ) Option {
return func ( o * frameworkOptions ) {
o . clusterEventMap = m
2021-01-22 01:40:06 -05:00
}
2019-10-29 10:54:02 -04:00
}
2019-08-19 05:46:15 -04:00
2020-06-19 05:05:45 -04:00
// Compile-time check that *frameworkImpl satisfies framework.Framework.
var _ framework.Framework = (*frameworkImpl)(nil)
2019-03-27 18:21:32 -04:00
// NewFramework initializes plugins given the configuration and the registry.
2021-03-03 16:44:25 -05:00
func NewFramework ( r Registry , profile * config . KubeSchedulerProfile , opts ... Option ) ( framework . Framework , error ) {
2021-01-22 01:40:06 -05:00
options := defaultFrameworkOptions ( )
2019-08-19 05:46:15 -04:00
for _ , opt := range opts {
opt ( & options )
}
2020-06-19 05:05:45 -04:00
f := & frameworkImpl {
2021-03-29 04:34:10 -04:00
registry : r ,
snapshotSharedLister : options . snapshotSharedLister ,
scorePluginWeight : make ( map [ string ] int ) ,
waitingPods : newWaitingPodsMap ( ) ,
clientSet : options . clientSet ,
kubeConfig : options . kubeConfig ,
eventRecorder : options . eventRecorder ,
informerFactory : options . informerFactory ,
metricsRecorder : options . metricsRecorder ,
runAllFilters : options . runAllFilters ,
extenders : options . extenders ,
PodNominator : options . podNominator ,
parallelizer : options . parallelizer ,
2020-04-23 20:08:59 -04:00
}
2021-03-03 16:44:25 -05:00
if profile == nil {
return f , nil
}
f . profileName = profile . SchedulerName
if profile . Plugins == nil {
2019-05-04 06:29:30 -04:00
return f , nil
}
// get needed plugins from config
2021-03-03 16:44:25 -05:00
pg := f . pluginsNeeded ( profile . Plugins )
2019-03-27 18:21:32 -04:00
2021-03-03 16:44:25 -05:00
pluginConfig := make ( map [ string ] runtime . Object , len ( profile . PluginConfig ) )
for i := range profile . PluginConfig {
name := profile . PluginConfig [ i ] . Name
2020-03-05 13:52:40 -05:00
if _ , ok := pluginConfig [ name ] ; ok {
return nil , fmt . Errorf ( "repeated config for plugin %s" , name )
}
2021-03-03 16:44:25 -05:00
pluginConfig [ name ] = profile . PluginConfig [ i ] . Args
2019-10-04 17:40:21 -04:00
}
2020-11-10 19:41:18 -05:00
outputProfile := config . KubeSchedulerProfile {
SchedulerName : f . profileName ,
2021-03-03 16:44:25 -05:00
Plugins : profile . Plugins ,
2020-11-10 19:41:18 -05:00
PluginConfig : make ( [ ] config . PluginConfig , 0 , len ( pg ) ) ,
}
2019-10-04 17:40:21 -04:00
2020-06-19 05:05:45 -04:00
pluginsMap := make ( map [ string ] framework . Plugin )
2019-03-27 18:21:32 -04:00
for name , factory := range r {
2019-10-04 17:40:21 -04:00
// initialize only needed plugins.
2019-05-04 06:29:30 -04:00
if _ , ok := pg [ name ] ; ! ok {
continue
}
2021-06-10 08:45:49 -04:00
args := pluginConfig [ name ]
2020-11-10 19:41:18 -05:00
if args != nil {
outputProfile . PluginConfig = append ( outputProfile . PluginConfig , config . PluginConfig {
Name : name ,
Args : args ,
} )
}
2020-04-30 18:11:37 -04:00
p , err := factory ( args , f )
2019-03-27 18:21:32 -04:00
if err != nil {
2020-09-24 09:10:42 -04:00
return nil , fmt . Errorf ( "initializing plugin %q: %w" , name , err )
2019-03-27 18:21:32 -04:00
}
2019-07-16 09:19:20 -04:00
pluginsMap [ name ] = p
2021-01-29 01:29:10 -05:00
// Update ClusterEventMap in place.
fillEventToPluginMap ( p , options . clusterEventMap )
2019-05-04 06:29:30 -04:00
}
2019-03-27 18:21:32 -04:00
2021-10-11 13:47:23 -04:00
// initialize plugins per individual extension points
2021-03-03 16:44:25 -05:00
for _ , e := range f . getExtensionPoints ( profile . Plugins ) {
2021-06-10 08:45:49 -04:00
if err := updatePluginList ( e . slicePtr , * e . plugins , pluginsMap ) ; err != nil {
2019-10-04 17:40:21 -04:00
return nil , err
}
2019-09-26 21:15:32 -04:00
}
2021-10-11 13:47:23 -04:00
// initialize multiPoint plugins to their expanded extension points
if len ( profile . Plugins . MultiPoint . Enabled ) > 0 {
if err := f . expandMultiPointPlugins ( profile , pluginsMap ) ; err != nil {
return nil , err
}
}
if len ( f . queueSortPlugins ) != 1 {
return nil , fmt . Errorf ( "one queue sort plugin required for profile with scheduler name %q" , profile . SchedulerName )
}
if err := getScoreWeights ( f , pluginsMap , append ( profile . Plugins . Score . Enabled , profile . Plugins . MultiPoint . Enabled ... ) ) ; err != nil {
return nil , err
}
2019-10-04 17:40:21 -04:00
// Verifying the score weights again since Plugin.Name() could return a different
// value from the one used in the configuration.
2019-09-26 21:15:32 -04:00
for _ , scorePlugin := range f . scorePlugins {
2021-03-29 04:34:10 -04:00
if f . scorePluginWeight [ scorePlugin . Name ( ) ] == 0 {
2019-09-26 21:15:32 -04:00
return nil , fmt . Errorf ( "score plugin %q is not configured with weight" , scorePlugin . Name ( ) )
2019-05-10 09:05:59 -04:00
}
2019-03-27 18:21:32 -04:00
}
2019-05-04 06:29:30 -04:00
2020-01-15 15:26:22 -05:00
if len ( f . queueSortPlugins ) == 0 {
return nil , fmt . Errorf ( "no queue sort plugin is enabled" )
}
2019-09-26 21:15:32 -04:00
if len ( f . queueSortPlugins ) > 1 {
return nil , fmt . Errorf ( "only one queue sort plugin can be enabled" )
}
2020-01-15 11:26:35 -05:00
if len ( f . bindPlugins ) == 0 {
return nil , fmt . Errorf ( "at least one bind plugin is needed" )
}
2019-09-26 21:15:32 -04:00
2020-11-10 19:41:18 -05:00
if options . captureProfile != nil {
if len ( outputProfile . PluginConfig ) != 0 {
sort . Slice ( outputProfile . PluginConfig , func ( i , j int ) bool {
return outputProfile . PluginConfig [ i ] . Name < outputProfile . PluginConfig [ j ] . Name
} )
} else {
outputProfile . PluginConfig = nil
}
options . captureProfile ( outputProfile )
}
2019-03-27 18:21:32 -04:00
return f , nil
}
2021-10-11 13:47:23 -04:00
// getScoreWeights records the weight of every Score plugin listed in plugins
// into f.scorePluginWeight and makes sure that, between MultiPoint-Score
// plugin weights and individual Score plugin weights, there is not an
// overflow of MaxTotalScore.
//
// plugins is expected to list the individually enabled Score plugins first,
// followed by the MultiPoint plugins: the first weight seen for a name wins,
// so an individual Score weight takes precedence over a MultiPoint one.
// Entries whose implementation is not a Score plugin are skipped. A weight of
// zero is replaced by 1.
//
// It returns an error if a listed plugin is missing from pluginsMap or if the
// accumulated weight*MaxNodeScore would exceed MaxTotalScore.
func getScoreWeights(f *frameworkImpl, pluginsMap map[string]framework.Plugin, plugins []config.Plugin) error {
	var totalPriority int64
	// Interface type that a plugin must implement to count as a Score plugin.
	scorePluginType := reflect.TypeOf(f.scorePlugins).Elem()
	for _, e := range plugins {
		pg, ok := pluginsMap[e.Name]
		if !ok || pg == nil {
			// reflect.TypeOf(nil) is nil and calling Implements on it would
			// panic, so report the bad configuration instead.
			return fmt.Errorf("%s %q does not exist", scorePluginType.Name(), e.Name)
		}
		if !reflect.TypeOf(pg).Implements(scorePluginType) {
			continue
		}

		// We append MultiPoint plugins to the list of Score plugins. So if this plugin has already been
		// encountered, let the individual Score weight take precedence.
		if _, seen := f.scorePluginWeight[e.Name]; seen {
			continue
		}

		// a weight of zero is not permitted, plugins can be disabled explicitly
		// when configured.
		weight := int(e.Weight)
		if weight == 0 {
			weight = 1
		}
		f.scorePluginWeight[e.Name] = weight

		// Checks totalPriority against MaxTotalScore to avoid overflow
		contribution := int64(weight) * framework.MaxNodeScore
		if contribution > framework.MaxTotalScore-totalPriority {
			return fmt.Errorf("total score of Score plugins could overflow")
		}
		totalPriority += contribution
	}
	return nil
}
// expandMultiPointPlugins appends every plugin enabled through
// profile.Plugins.MultiPoint to each extension point slice whose interface
// the plugin implements.
//
// For a given extension point a MultiPoint plugin is skipped when the
// extension point disables all plugins ("*"), disables that plugin by name,
// or already enables it explicitly. An error is returned when a MultiPoint
// plugin is missing from pluginsMap or is listed more than once.
func (f *frameworkImpl) expandMultiPointPlugins(profile *config.KubeSchedulerProfile, pluginsMap map[string]framework.Plugin) error {
	// initialize MultiPoint plugins
	for _, point := range f.getExtensionPoints(profile.Plugins) {
		target := reflect.ValueOf(point.slicePtr).Elem()
		ifaceType := target.Type().Elem()

		// Names already registered via the regular extension point config;
		// used to tell explicit overrides apart from double registration.
		explicitlyEnabled := sets.NewString()
		for i := range point.plugins.Enabled {
			explicitlyEnabled.Insert(point.plugins.Enabled[i].Name)
		}
		explicitlyDisabled := sets.NewString()
		for i := range point.plugins.Disabled {
			explicitlyDisabled.Insert(point.plugins.Disabled[i].Name)
		}
		if explicitlyDisabled.Has("*") {
			klog.V(4).InfoS("all plugins disabled for extension point, skipping MultiPoint expansion", "extension", ifaceType)
			continue
		}

		// Names added through MultiPoint for this extension point are tracked
		// separately from the explicitly enabled ones.
		viaMultiPoint := sets.NewString()
		for _, mp := range profile.Plugins.MultiPoint.Enabled {
			impl, found := pluginsMap[mp.Name]
			if !found {
				return fmt.Errorf("%s %q does not exist", ifaceType.Name(), mp.Name)
			}

			switch {
			case !reflect.TypeOf(impl).Implements(ifaceType):
				// The plugin does not take part in this extension point.
				continue
			case explicitlyDisabled.Has(mp.Name):
				// A MultiPoint plugin can still be disabled for specific
				// extension points.
				klog.V(4).InfoS("plugin disabled for extension point", "plugin", mp.Name, "extension", ifaceType)
				continue
			case explicitlyEnabled.Has(mp.Name):
				// The explicit setting wins; discard the MultiPoint value.
				// This maintains expected behavior for overriding default plugins
				// (see https://github.com/kubernetes/kubernetes/pull/99582).
				klog.InfoS("MultiPoint plugin is explicitly re-configured; overriding", "plugin", mp.Name)
				continue
			case viaMultiPoint.Has(mp.Name):
				// Listed twice in MultiPoint: a configuration error.
				return fmt.Errorf("plugin %q already registered as %q", mp.Name, ifaceType.Name())
			}

			viaMultiPoint.Insert(mp.Name)
			target.Set(reflect.Append(target, reflect.ValueOf(impl)))
		}
	}
	return nil
}
2021-01-29 01:29:10 -05:00
// fillEventToPluginMap records in eventToPlugins which cluster events the
// plugin p is registered for.
//
// A plugin that does not implement framework.EnqueueExtensions is registered
// for allClusterEvents, to ensure backward compatibility. A plugin that
// implements it but returns no events is registered for nothing, so a pod
// failed by that plugin cannot be moved by any regular cluster event.
func fillEventToPluginMap(p framework.Plugin, eventToPlugins map[framework.ClusterEvent]sets.String) {
	enqueueExt, implemented := p.(framework.EnqueueExtensions)
	if !implemented {
		registerClusterEvents(p.Name(), eventToPlugins, allClusterEvents)
		return
	}

	if wanted := enqueueExt.EventsToRegister(); len(wanted) != 0 {
		// The most common case: the plugin declares the events it cares about.
		registerClusterEvents(p.Name(), eventToPlugins, wanted)
	} else {
		// Rare: EnqueueExtensions is implemented but yields no events.
		klog.InfoS("Plugin's EventsToRegister() returned nil", "plugin", p.Name())
	}
}
// registerClusterEvents adds name to the plugin set of every event in evts,
// creating the set for an event the first time it is seen.
func registerClusterEvents(name string, eventToPlugins map[framework.ClusterEvent]sets.String, evts []framework.ClusterEvent) {
	for i := range evts {
		evt := evts[i]
		registered := eventToPlugins[evt]
		if registered != nil {
			registered.Insert(name)
			continue
		}
		eventToPlugins[evt] = sets.NewString(name)
	}
}
2021-02-10 10:45:59 -05:00
func updatePluginList ( pluginList interface { } , pluginSet config . PluginSet , pluginsMap map [ string ] framework . Plugin ) error {
2019-10-04 12:40:21 -04:00
plugins := reflect . ValueOf ( pluginList ) . Elem ( )
pluginType := plugins . Type ( ) . Elem ( )
2019-09-26 21:15:32 -04:00
set := sets . NewString ( )
for _ , ep := range pluginSet . Enabled {
pg , ok := pluginsMap [ ep . Name ]
if ! ok {
2019-10-04 17:40:21 -04:00
return fmt . Errorf ( "%s %q does not exist" , pluginType . Name ( ) , ep . Name )
2019-09-26 21:15:32 -04:00
}
if ! reflect . TypeOf ( pg ) . Implements ( pluginType ) {
2019-10-04 17:40:21 -04:00
return fmt . Errorf ( "plugin %q does not extend %s plugin" , ep . Name , pluginType . Name ( ) )
2019-09-26 21:15:32 -04:00
}
if set . Has ( ep . Name ) {
2019-10-04 17:40:21 -04:00
return fmt . Errorf ( "plugin %q already registered as %q" , ep . Name , pluginType . Name ( ) )
2019-09-26 21:15:32 -04:00
}
set . Insert ( ep . Name )
newPlugins := reflect . Append ( plugins , reflect . ValueOf ( pg ) )
plugins . Set ( newPlugins )
}
return nil
}
2019-05-06 21:03:00 -04:00
// QueueSortFunc returns the function to sort pods in scheduling queue
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) QueueSortFunc ( ) framework . LessFunc {
2020-01-15 15:26:22 -05:00
if f == nil {
2020-06-19 05:05:45 -04:00
// If frameworkImpl is nil, simply keep their order unchanged.
2020-01-15 15:26:22 -05:00
// NOTE: this is primarily for tests.
2020-06-19 05:05:45 -04:00
return func ( _ , _ * framework . QueuedPodInfo ) bool { return false }
2020-01-15 15:26:22 -05:00
}
2019-05-06 21:03:00 -04:00
if len ( f . queueSortPlugins ) == 0 {
2020-06-19 05:05:45 -04:00
panic ( "No QueueSort plugin is registered in the frameworkImpl." )
2019-05-06 21:03:00 -04:00
}
// Only one QueueSort plugin can be enabled.
return f . queueSortPlugins [ 0 ] . Less
}
2019-08-22 13:56:28 -04:00
// RunPreFilterPlugins runs the set of configured PreFilter plugins. It returns
2019-06-10 17:01:50 -04:00
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) RunPreFilterPlugins ( ctx context . Context , state * framework . CycleState , pod * v1 . Pod ) ( status * framework . Status ) {
2020-01-08 23:23:05 -05:00
startTime := time . Now ( )
defer func ( ) {
2020-06-17 17:49:30 -04:00
metrics . FrameworkExtensionPointDuration . WithLabelValues ( preFilter , status . Code ( ) . String ( ) , f . profileName ) . Observe ( metrics . SinceInSeconds ( startTime ) )
2020-01-08 23:23:05 -05:00
} ( )
2019-08-22 13:56:28 -04:00
for _ , pl := range f . preFilterPlugins {
2019-10-29 10:54:02 -04:00
status = f . runPreFilterPlugin ( ctx , pl , state , pod )
2019-06-10 17:01:50 -04:00
if ! status . IsSuccess ( ) {
2021-01-13 19:39:55 -05:00
status . SetFailedPlugin ( pl . Name ( ) )
2019-08-27 15:16:07 -04:00
if status . IsUnschedulable ( ) {
2020-07-04 11:43:12 -04:00
return status
2019-06-10 17:01:50 -04:00
}
2021-01-31 04:17:03 -05:00
return framework . AsStatus ( fmt . Errorf ( "running PreFilter plugin %q: %w" , pl . Name ( ) , status . AsError ( ) ) ) . WithFailedPlugin ( pl . Name ( ) )
2019-06-10 17:01:50 -04:00
}
}
2019-07-30 23:21:26 -04:00
2019-06-10 17:01:50 -04:00
return nil
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) runPreFilterPlugin ( ctx context . Context , pl framework . PreFilterPlugin , state * framework . CycleState , pod * v1 . Pod ) * framework . Status {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2019-10-29 10:54:02 -04:00
return pl . PreFilter ( ctx , state , pod )
}
startTime := time . Now ( )
status := pl . PreFilter ( ctx , state , pod )
f . metricsRecorder . observePluginDurationAsync ( preFilter , pl . Name ( ) , status , metrics . SinceInSeconds ( startTime ) )
return status
}
2019-10-01 11:59:48 -04:00
// RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
2019-09-24 12:39:27 -04:00
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) RunPreFilterExtensionAddPod (
2019-08-28 07:12:02 -04:00
ctx context . Context ,
2020-06-19 05:05:45 -04:00
state * framework . CycleState ,
2019-08-28 07:12:02 -04:00
podToSchedule * v1 . Pod ,
2020-12-30 03:42:06 -05:00
podInfoToAdd * framework . PodInfo ,
2020-06-19 05:05:45 -04:00
nodeInfo * framework . NodeInfo ,
) ( status * framework . Status ) {
2019-09-24 12:39:27 -04:00
for _ , pl := range f . preFilterPlugins {
2019-10-04 10:59:13 -04:00
if pl . PreFilterExtensions ( ) == nil {
2019-10-01 11:59:48 -04:00
continue
}
2020-12-30 03:42:06 -05:00
status = f . runPreFilterExtensionAddPod ( ctx , pl , state , podToSchedule , podInfoToAdd , nodeInfo )
2019-10-29 10:54:02 -04:00
if ! status . IsSuccess ( ) {
2020-09-24 09:10:42 -04:00
err := status . AsError ( )
klog . ErrorS ( err , "Failed running AddPod on PreFilter plugin" , "plugin" , pl . Name ( ) , "pod" , klog . KObj ( podToSchedule ) )
return framework . AsStatus ( fmt . Errorf ( "running AddPod on PreFilter plugin %q: %w" , pl . Name ( ) , err ) )
2019-09-24 12:39:27 -04:00
}
}
return nil
}
2020-12-30 03:42:06 -05:00
func ( f * frameworkImpl ) runPreFilterExtensionAddPod ( ctx context . Context , pl framework . PreFilterPlugin , state * framework . CycleState , podToSchedule * v1 . Pod , podInfoToAdd * framework . PodInfo , nodeInfo * framework . NodeInfo ) * framework . Status {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2020-12-30 03:42:06 -05:00
return pl . PreFilterExtensions ( ) . AddPod ( ctx , state , podToSchedule , podInfoToAdd , nodeInfo )
2019-10-29 10:54:02 -04:00
}
startTime := time . Now ( )
2020-12-30 03:42:06 -05:00
status := pl . PreFilterExtensions ( ) . AddPod ( ctx , state , podToSchedule , podInfoToAdd , nodeInfo )
2019-10-29 10:54:02 -04:00
f . metricsRecorder . observePluginDurationAsync ( preFilterExtensionAddPod , pl . Name ( ) , status , metrics . SinceInSeconds ( startTime ) )
return status
}
2019-10-01 11:59:48 -04:00
// RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
2019-09-24 12:39:27 -04:00
// PreFilter plugins. It returns directly if any of the plugins return any
// status other than Success.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) RunPreFilterExtensionRemovePod (
2019-08-28 07:12:02 -04:00
ctx context . Context ,
2020-06-19 05:05:45 -04:00
state * framework . CycleState ,
2019-08-28 07:12:02 -04:00
podToSchedule * v1 . Pod ,
2020-12-30 03:42:06 -05:00
podInfoToRemove * framework . PodInfo ,
2020-06-19 05:05:45 -04:00
nodeInfo * framework . NodeInfo ,
) ( status * framework . Status ) {
2019-09-24 12:39:27 -04:00
for _ , pl := range f . preFilterPlugins {
2019-10-04 10:59:13 -04:00
if pl . PreFilterExtensions ( ) == nil {
2019-10-01 11:59:48 -04:00
continue
}
2020-12-30 03:42:06 -05:00
status = f . runPreFilterExtensionRemovePod ( ctx , pl , state , podToSchedule , podInfoToRemove , nodeInfo )
2019-10-29 10:54:02 -04:00
if ! status . IsSuccess ( ) {
2020-09-24 09:10:42 -04:00
err := status . AsError ( )
klog . ErrorS ( err , "Failed running RemovePod on PreFilter plugin" , "plugin" , pl . Name ( ) , "pod" , klog . KObj ( podToSchedule ) )
return framework . AsStatus ( fmt . Errorf ( "running RemovePod on PreFilter plugin %q: %w" , pl . Name ( ) , err ) )
2019-09-24 12:39:27 -04:00
}
}
return nil
}
2020-12-30 03:42:06 -05:00
func ( f * frameworkImpl ) runPreFilterExtensionRemovePod ( ctx context . Context , pl framework . PreFilterPlugin , state * framework . CycleState , podToSchedule * v1 . Pod , podInfoToRemove * framework . PodInfo , nodeInfo * framework . NodeInfo ) * framework . Status {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2020-12-30 03:42:06 -05:00
return pl . PreFilterExtensions ( ) . RemovePod ( ctx , state , podToSchedule , podInfoToRemove , nodeInfo )
2019-10-29 10:54:02 -04:00
}
startTime := time . Now ( )
2020-12-30 03:42:06 -05:00
status := pl . PreFilterExtensions ( ) . RemovePod ( ctx , state , podToSchedule , podInfoToRemove , nodeInfo )
2019-10-29 10:54:02 -04:00
f . metricsRecorder . observePluginDurationAsync ( preFilterExtensionRemovePod , pl . Name ( ) , status , metrics . SinceInSeconds ( startTime ) )
return status
}
2019-05-29 05:02:53 -04:00
// RunFilterPlugins runs the set of configured Filter plugins for pod on
// the given node. If any of these plugins doesn't return "Success", the
// given node is not suitable for running pod.
// Meanwhile, the failure message and status are set for the given node.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) RunFilterPlugins (
2019-08-28 07:12:02 -04:00
ctx context . Context ,
2020-06-19 05:05:45 -04:00
state * framework . CycleState ,
2019-08-28 07:12:02 -04:00
pod * v1 . Pod ,
2020-06-19 05:05:45 -04:00
nodeInfo * framework . NodeInfo ,
) framework . PluginToStatus {
statuses := make ( framework . PluginToStatus )
2019-07-31 03:27:03 -04:00
for _ , pl := range f . filterPlugins {
2019-12-20 16:50:17 -05:00
pluginStatus := f . runFilterPlugin ( ctx , pl , state , pod , nodeInfo )
if ! pluginStatus . IsSuccess ( ) {
if ! pluginStatus . IsUnschedulable ( ) {
// Filter plugins are not supposed to return any status other than
// Success or Unschedulable.
2021-01-13 19:39:55 -05:00
errStatus := framework . AsStatus ( fmt . Errorf ( "running %q filter plugin: %w" , pl . Name ( ) , pluginStatus . AsError ( ) ) ) . WithFailedPlugin ( pl . Name ( ) )
2020-06-19 05:05:45 -04:00
return map [ string ] * framework . Status { pl . Name ( ) : errStatus }
2019-12-20 16:50:17 -05:00
}
2021-01-13 19:39:55 -05:00
pluginStatus . SetFailedPlugin ( pl . Name ( ) )
2020-01-08 13:26:10 -05:00
statuses [ pl . Name ( ) ] = pluginStatus
2019-12-20 16:50:17 -05:00
if ! f . runAllFilters {
// Exit early if we don't need to run all filters.
2020-01-08 13:26:10 -05:00
return statuses
2019-12-20 16:50:17 -05:00
}
2019-05-29 05:02:53 -04:00
}
}
2020-01-08 13:26:10 -05:00
return statuses
2019-05-29 05:02:53 -04:00
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) runFilterPlugin ( ctx context . Context , pl framework . FilterPlugin , state * framework . CycleState , pod * v1 . Pod , nodeInfo * framework . NodeInfo ) * framework . Status {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2019-10-29 10:54:02 -04:00
return pl . Filter ( ctx , state , pod , nodeInfo )
}
startTime := time . Now ( )
status := pl . Filter ( ctx , state , pod , nodeInfo )
2020-01-22 10:51:39 -05:00
f . metricsRecorder . observePluginDurationAsync ( Filter , pl . Name ( ) , status , metrics . SinceInSeconds ( startTime ) )
2019-10-29 10:54:02 -04:00
return status
}
2020-06-05 16:02:45 -04:00
// RunPostFilterPlugins runs the set of configured PostFilter plugins until the first
// Success or Error is met, otherwise continues to execute all plugins.
2020-06-22 20:22:27 -04:00
func ( f * frameworkImpl ) RunPostFilterPlugins ( ctx context . Context , state * framework . CycleState , pod * v1 . Pod , filteredNodeStatusMap framework . NodeToStatusMap ) ( _ * framework . PostFilterResult , status * framework . Status ) {
startTime := time . Now ( )
defer func ( ) {
2020-06-17 17:49:30 -04:00
metrics . FrameworkExtensionPointDuration . WithLabelValues ( postFilter , status . Code ( ) . String ( ) , f . profileName ) . Observe ( metrics . SinceInSeconds ( startTime ) )
2020-06-22 20:22:27 -04:00
} ( )
2020-06-19 05:05:45 -04:00
statuses := make ( framework . PluginToStatus )
2020-06-05 16:02:45 -04:00
for _ , pl := range f . postFilterPlugins {
r , s := f . runPostFilterPlugin ( ctx , pl , state , pod , filteredNodeStatusMap )
if s . IsSuccess ( ) {
return r , s
} else if ! s . IsUnschedulable ( ) {
// Any status other than Success or Unschedulable is Error.
2021-01-26 00:48:58 -05:00
return nil , framework . AsStatus ( s . AsError ( ) )
2020-06-05 16:02:45 -04:00
}
statuses [ pl . Name ( ) ] = s
}
2020-06-22 20:22:27 -04:00
2020-06-05 16:02:45 -04:00
return nil , statuses . Merge ( )
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) runPostFilterPlugin ( ctx context . Context , pl framework . PostFilterPlugin , state * framework . CycleState , pod * v1 . Pod , filteredNodeStatusMap framework . NodeToStatusMap ) ( * framework . PostFilterResult , * framework . Status ) {
2020-06-05 16:02:45 -04:00
if ! state . ShouldRecordPluginMetrics ( ) {
return pl . PostFilter ( ctx , state , pod , filteredNodeStatusMap )
}
startTime := time . Now ( )
r , s := pl . PostFilter ( ctx , state , pod , filteredNodeStatusMap )
f . metricsRecorder . observePluginDurationAsync ( postFilter , pl . Name ( ) , s , metrics . SinceInSeconds ( startTime ) )
return r , s
}
2020-12-19 11:18:40 -05:00
// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins
// for nominated pod on the given node.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is
// schedulable on the node with all the existing pods on the node plus higher
// and equal priority pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption
// and add the nominated pods. Removal of the victims is done by
// SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
// NodeInfo before calling this function.
func ( f * frameworkImpl ) RunFilterPluginsWithNominatedPods ( ctx context . Context , state * framework . CycleState , pod * v1 . Pod , info * framework . NodeInfo ) * framework . Status {
var status * framework . Status
podsAdded := false
// We run filters twice in some cases. If the node has greater or equal priority
// nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
// If all filters succeed in this pass, we run them again when these
// nominated pods are not added. This second pass is necessary because some
// filters such as inter-pod affinity may not pass without the nominated pods.
// If there are no nominated pods for the node or if the first run of the
// filters fail, we don't run the second pass.
// We consider only equal or higher priority pods in the first pass, because
// those are the current "pod" must yield to them and not take a space opened
// for running them. It is ok if the current "pod" take resources freed for
// lower priority pods.
// Requiring that the new pod is schedulable in both circumstances ensures that
// we are making a conservative decision: filters like resources and inter-pod
// anti-affinity are more likely to fail when the nominated pods are treated
// as running, while filters like pod affinity are more likely to fail when
// the nominated pods are treated as not running. We can't just assume the
// nominated pods are running because they are not running right now and in fact,
// they may end up getting scheduled to a different node.
for i := 0 ; i < 2 ; i ++ {
stateToUse := state
nodeInfoToUse := info
if i == 0 {
var err error
2021-02-26 11:44:05 -05:00
podsAdded , stateToUse , nodeInfoToUse , err = addNominatedPods ( ctx , f , pod , state , info )
2020-12-19 11:18:40 -05:00
if err != nil {
2021-01-26 00:48:58 -05:00
return framework . AsStatus ( err )
2020-12-19 11:18:40 -05:00
}
} else if ! podsAdded || ! status . IsSuccess ( ) {
break
}
2021-02-26 11:44:05 -05:00
statusMap := f . RunFilterPlugins ( ctx , stateToUse , pod , nodeInfoToUse )
2020-12-19 11:18:40 -05:00
status = statusMap . Merge ( )
if ! status . IsSuccess ( ) && ! status . IsUnschedulable ( ) {
return status
}
}
return status
}
// addNominatedPods adds pods with equal or greater priority which are nominated
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
// 3) augmented nodeInfo.
2021-02-26 11:44:05 -05:00
func addNominatedPods ( ctx context . Context , fh framework . Handle , pod * v1 . Pod , state * framework . CycleState , nodeInfo * framework . NodeInfo ) ( bool , * framework . CycleState , * framework . NodeInfo , error ) {
if fh == nil || nodeInfo . Node ( ) == nil {
2020-12-19 11:18:40 -05:00
// This may happen only in tests.
return false , state , nodeInfo , nil
}
2021-02-26 11:44:05 -05:00
nominatedPodInfos := fh . NominatedPodsForNode ( nodeInfo . Node ( ) . Name )
2021-02-22 09:00:23 -05:00
if len ( nominatedPodInfos ) == 0 {
2020-12-19 11:18:40 -05:00
return false , state , nodeInfo , nil
}
nodeInfoOut := nodeInfo . Clone ( )
stateOut := state . Clone ( )
podsAdded := false
2021-02-22 09:00:23 -05:00
for _ , pi := range nominatedPodInfos {
if corev1 . PodPriority ( pi . Pod ) >= corev1 . PodPriority ( pod ) && pi . Pod . UID != pod . UID {
nodeInfoOut . AddPodInfo ( pi )
2021-02-26 11:44:05 -05:00
status := fh . RunPreFilterExtensionAddPod ( ctx , stateOut , pod , pi , nodeInfoOut )
2020-12-19 11:18:40 -05:00
if ! status . IsSuccess ( ) {
return false , state , nodeInfo , status . AsError ( )
}
podsAdded = true
}
}
return podsAdded , stateOut , nodeInfoOut , nil
}
2020-02-02 05:07:01 -05:00
// RunPreScorePlugins runs the set of configured pre-score plugins. If any
2020-02-15 00:48:45 -05:00
// of these plugins returns any status other than "Success", the given pod is rejected.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) RunPreScorePlugins (
2019-08-28 07:12:02 -04:00
ctx context . Context ,
2020-06-19 05:05:45 -04:00
state * framework . CycleState ,
2019-07-30 23:21:26 -04:00
pod * v1 . Pod ,
nodes [ ] * v1 . Node ,
2020-06-19 05:05:45 -04:00
) ( status * framework . Status ) {
2020-01-08 23:23:05 -05:00
startTime := time . Now ( )
defer func ( ) {
2020-06-17 17:49:30 -04:00
metrics . FrameworkExtensionPointDuration . WithLabelValues ( preScore , status . Code ( ) . String ( ) , f . profileName ) . Observe ( metrics . SinceInSeconds ( startTime ) )
2020-01-08 23:23:05 -05:00
} ( )
2020-02-02 05:07:01 -05:00
for _ , pl := range f . preScorePlugins {
2020-02-15 00:48:45 -05:00
status = f . runPreScorePlugin ( ctx , pl , state , pod , nodes )
2019-07-30 23:21:26 -04:00
if ! status . IsSuccess ( ) {
2021-01-31 04:17:03 -05:00
return framework . AsStatus ( fmt . Errorf ( "running PreScore plugin %q: %w" , pl . Name ( ) , status . AsError ( ) ) )
2019-07-30 23:21:26 -04:00
}
}
return nil
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) runPreScorePlugin ( ctx context . Context , pl framework . PreScorePlugin , state * framework . CycleState , pod * v1 . Pod , nodes [ ] * v1 . Node ) * framework . Status {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2020-02-15 00:48:45 -05:00
return pl . PreScore ( ctx , state , pod , nodes )
2019-10-29 10:54:02 -04:00
}
startTime := time . Now ( )
2020-02-15 00:48:45 -05:00
status := pl . PreScore ( ctx , state , pod , nodes )
2020-02-02 05:07:01 -05:00
f . metricsRecorder . observePluginDurationAsync ( preScore , pl . Name ( ) , status , metrics . SinceInSeconds ( startTime ) )
2019-10-29 10:54:02 -04:00
return status
}
2019-08-02 07:24:55 -04:00
// RunScorePlugins runs the set of configured scoring plugins. It returns a list that
2019-07-16 09:19:20 -04:00
// stores for each scoring plugin name the corresponding NodeScoreList(s).
// It also returns *Status, which is set to non-success if any of the plugins returns
// a non-success status.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) RunScorePlugins ( ctx context . Context , state * framework . CycleState , pod * v1 . Pod , nodes [ ] * v1 . Node ) ( ps framework . PluginToNodeScores , status * framework . Status ) {
2020-01-08 23:23:05 -05:00
startTime := time . Now ( )
defer func ( ) {
2020-06-17 17:49:30 -04:00
metrics . FrameworkExtensionPointDuration . WithLabelValues ( score , status . Code ( ) . String ( ) , f . profileName ) . Observe ( metrics . SinceInSeconds ( startTime ) )
2020-01-08 23:23:05 -05:00
} ( )
2020-06-19 05:05:45 -04:00
pluginToNodeScores := make ( framework . PluginToNodeScores , len ( f . scorePlugins ) )
2019-07-16 09:19:20 -04:00
for _ , pl := range f . scorePlugins {
2020-06-19 05:05:45 -04:00
pluginToNodeScores [ pl . Name ( ) ] = make ( framework . NodeScoreList , len ( nodes ) )
2019-07-16 09:19:20 -04:00
}
2019-08-28 07:12:02 -04:00
ctx , cancel := context . WithCancel ( ctx )
2020-03-31 22:34:13 -04:00
errCh := parallelize . NewErrorChannel ( )
2019-08-19 16:08:14 -04:00
// Run Score method for each node in parallel.
2021-03-01 09:20:18 -05:00
f . Parallelizer ( ) . Until ( ctx , len ( nodes ) , func ( index int ) {
2019-07-16 09:19:20 -04:00
for _ , pl := range f . scorePlugins {
2019-08-02 07:24:55 -04:00
nodeName := nodes [ index ] . Name
2019-10-29 10:54:02 -04:00
s , status := f . runScorePlugin ( ctx , pl , state , pod , nodeName )
2019-07-16 09:19:20 -04:00
if ! status . IsSuccess ( ) {
2020-09-24 09:10:42 -04:00
err := fmt . Errorf ( "plugin %q failed with: %w" , pl . Name ( ) , status . AsError ( ) )
errCh . SendErrorWithCancel ( err , cancel )
2019-07-16 09:19:20 -04:00
return
}
2020-06-19 05:05:45 -04:00
pluginToNodeScores [ pl . Name ( ) ] [ index ] = framework . NodeScore {
2019-08-02 07:24:55 -04:00
Name : nodeName ,
2020-08-04 02:46:23 -04:00
Score : s ,
2019-08-02 07:24:55 -04:00
}
2019-07-16 09:19:20 -04:00
}
} )
2019-07-16 10:00:42 -04:00
if err := errCh . ReceiveError ( ) ; err != nil {
2020-09-24 09:10:42 -04:00
return nil , framework . AsStatus ( fmt . Errorf ( "running Score plugins: %w" , err ) )
2019-07-16 09:19:20 -04:00
}
2019-09-23 23:44:50 -04:00
// Run NormalizeScore method for each ScorePlugin in parallel.
2021-03-01 09:20:18 -05:00
f . Parallelizer ( ) . Until ( ctx , len ( f . scorePlugins ) , func ( index int ) {
2019-09-23 23:44:50 -04:00
pl := f . scorePlugins [ index ]
2019-08-19 16:08:14 -04:00
nodeScoreList := pluginToNodeScores [ pl . Name ( ) ]
2019-10-04 10:59:13 -04:00
if pl . ScoreExtensions ( ) == nil {
2019-10-01 11:59:48 -04:00
return
}
2019-10-29 10:54:02 -04:00
status := f . runScoreExtension ( ctx , pl , state , pod , nodeScoreList )
2019-10-04 10:25:17 -04:00
if ! status . IsSuccess ( ) {
2020-09-24 09:10:42 -04:00
err := fmt . Errorf ( "plugin %q failed with: %w" , pl . Name ( ) , status . AsError ( ) )
2019-07-29 11:45:01 -04:00
errCh . SendErrorWithCancel ( err , cancel )
2019-08-01 07:59:15 -04:00
return
2019-07-29 11:45:01 -04:00
}
} )
if err := errCh . ReceiveError ( ) ; err != nil {
2020-09-24 09:10:42 -04:00
return nil , framework . AsStatus ( fmt . Errorf ( "running Normalize on Score plugins: %w" , err ) )
2019-07-29 11:45:01 -04:00
}
2019-08-19 16:08:14 -04:00
// Apply score defaultWeights for each ScorePlugin in parallel.
2021-03-01 09:20:18 -05:00
f . Parallelizer ( ) . Until ( ctx , len ( f . scorePlugins ) , func ( index int ) {
2019-07-29 11:45:01 -04:00
pl := f . scorePlugins [ index ]
// Score plugins' weight has been checked when they are initialized.
2021-03-29 04:34:10 -04:00
weight := f . scorePluginWeight [ pl . Name ( ) ]
2019-08-19 16:08:14 -04:00
nodeScoreList := pluginToNodeScores [ pl . Name ( ) ]
2019-08-09 05:03:55 -04:00
for i , nodeScore := range nodeScoreList {
// return error if score plugin returns invalid score.
2020-08-04 02:46:23 -04:00
if nodeScore . Score > framework . MaxNodeScore || nodeScore . Score < framework . MinNodeScore {
2020-09-24 09:10:42 -04:00
err := fmt . Errorf ( "plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing" , pl . Name ( ) , nodeScore . Score , framework . MinNodeScore , framework . MaxNodeScore )
2019-08-09 05:03:55 -04:00
errCh . SendErrorWithCancel ( err , cancel )
return
}
2019-09-27 14:23:29 -04:00
nodeScoreList [ i ] . Score = nodeScore . Score * int64 ( weight )
2019-07-29 11:45:01 -04:00
}
} )
if err := errCh . ReceiveError ( ) ; err != nil {
2020-09-24 09:10:42 -04:00
return nil , framework . AsStatus ( fmt . Errorf ( "applying score defaultWeights on Score plugins: %w" , err ) )
2019-07-29 11:45:01 -04:00
}
2019-08-19 16:08:14 -04:00
return pluginToNodeScores , nil
2019-07-29 11:45:01 -04:00
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) runScorePlugin ( ctx context . Context , pl framework . ScorePlugin , state * framework . CycleState , pod * v1 . Pod , nodeName string ) ( int64 , * framework . Status ) {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2019-10-29 10:54:02 -04:00
return pl . Score ( ctx , state , pod , nodeName )
}
startTime := time . Now ( )
s , status := pl . Score ( ctx , state , pod , nodeName )
f . metricsRecorder . observePluginDurationAsync ( score , pl . Name ( ) , status , metrics . SinceInSeconds ( startTime ) )
return s , status
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) runScoreExtension ( ctx context . Context , pl framework . ScorePlugin , state * framework . CycleState , pod * v1 . Pod , nodeScoreList framework . NodeScoreList ) * framework . Status {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2019-10-29 10:54:02 -04:00
return pl . ScoreExtensions ( ) . NormalizeScore ( ctx , state , pod , nodeScoreList )
}
startTime := time . Now ( )
status := pl . ScoreExtensions ( ) . NormalizeScore ( ctx , state , pod , nodeScoreList )
f . metricsRecorder . observePluginDurationAsync ( scoreExtensionNormalize , pl . Name ( ) , status , metrics . SinceInSeconds ( startTime ) )
return status
}
2019-08-22 14:07:28 -04:00
// RunPreBindPlugins runs the set of configured prebind plugins. It returns a
2019-03-27 18:21:32 -04:00
// failure (bool) if any of the plugins returns an error. It also returns an
// error containing the rejection message or the error occurred in the plugin.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) RunPreBindPlugins ( ctx context . Context , state * framework . CycleState , pod * v1 . Pod , nodeName string ) ( status * framework . Status ) {
2020-01-08 23:23:05 -05:00
startTime := time . Now ( )
defer func ( ) {
2020-06-17 17:49:30 -04:00
metrics . FrameworkExtensionPointDuration . WithLabelValues ( preBind , status . Code ( ) . String ( ) , f . profileName ) . Observe ( metrics . SinceInSeconds ( startTime ) )
2020-01-08 23:23:05 -05:00
} ( )
2019-08-22 14:07:28 -04:00
for _ , pl := range f . preBindPlugins {
2019-10-29 10:54:02 -04:00
status = f . runPreBindPlugin ( ctx , pl , state , pod , nodeName )
2019-03-27 18:21:32 -04:00
if ! status . IsSuccess ( ) {
2020-09-24 09:10:42 -04:00
err := status . AsError ( )
klog . ErrorS ( err , "Failed running PreBind plugin" , "plugin" , pl . Name ( ) , "pod" , klog . KObj ( pod ) )
return framework . AsStatus ( fmt . Errorf ( "running PreBind plugin %q: %w" , pl . Name ( ) , err ) )
2019-03-27 18:21:32 -04:00
}
}
return nil
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) runPreBindPlugin ( ctx context . Context , pl framework . PreBindPlugin , state * framework . CycleState , pod * v1 . Pod , nodeName string ) * framework . Status {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2019-10-29 10:54:02 -04:00
return pl . PreBind ( ctx , state , pod , nodeName )
}
startTime := time . Now ( )
status := pl . PreBind ( ctx , state , pod , nodeName )
f . metricsRecorder . observePluginDurationAsync ( preBind , pl . Name ( ) , status , metrics . SinceInSeconds ( startTime ) )
return status
}
2019-06-23 12:42:28 -04:00
// RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) RunBindPlugins ( ctx context . Context , state * framework . CycleState , pod * v1 . Pod , nodeName string ) ( status * framework . Status ) {
2020-01-08 23:23:05 -05:00
startTime := time . Now ( )
defer func ( ) {
2020-06-17 17:49:30 -04:00
metrics . FrameworkExtensionPointDuration . WithLabelValues ( bind , status . Code ( ) . String ( ) , f . profileName ) . Observe ( metrics . SinceInSeconds ( startTime ) )
2020-01-08 23:23:05 -05:00
} ( )
2019-06-23 12:42:28 -04:00
if len ( f . bindPlugins ) == 0 {
2020-06-19 05:05:45 -04:00
return framework . NewStatus ( framework . Skip , "" )
2019-06-23 12:42:28 -04:00
}
for _ , bp := range f . bindPlugins {
2019-10-29 10:54:02 -04:00
status = f . runBindPlugin ( ctx , bp , state , pod , nodeName )
2020-06-19 05:05:45 -04:00
if status != nil && status . Code ( ) == framework . Skip {
2019-06-23 12:42:28 -04:00
continue
}
if ! status . IsSuccess ( ) {
2020-09-24 09:10:42 -04:00
err := status . AsError ( )
klog . ErrorS ( err , "Failed running Bind plugin" , "plugin" , bp . Name ( ) , "pod" , klog . KObj ( pod ) )
return framework . AsStatus ( fmt . Errorf ( "running Bind plugin %q: %w" , bp . Name ( ) , err ) )
2019-06-23 12:42:28 -04:00
}
return status
}
return status
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) runBindPlugin ( ctx context . Context , bp framework . BindPlugin , state * framework . CycleState , pod * v1 . Pod , nodeName string ) * framework . Status {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2019-10-29 10:54:02 -04:00
return bp . Bind ( ctx , state , pod , nodeName )
}
2019-10-04 10:25:17 -04:00
startTime := time . Now ( )
2019-10-29 10:54:02 -04:00
status := bp . Bind ( ctx , state , pod , nodeName )
f . metricsRecorder . observePluginDurationAsync ( bind , bp . Name ( ) , status , metrics . SinceInSeconds ( startTime ) )
return status
}
// RunPostBindPlugins runs the set of configured postbind plugins.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) RunPostBindPlugins ( ctx context . Context , state * framework . CycleState , pod * v1 . Pod , nodeName string ) {
2020-01-08 23:23:05 -05:00
startTime := time . Now ( )
defer func ( ) {
2020-06-17 17:49:30 -04:00
metrics . FrameworkExtensionPointDuration . WithLabelValues ( postBind , framework . Success . String ( ) , f . profileName ) . Observe ( metrics . SinceInSeconds ( startTime ) )
2020-01-08 23:23:05 -05:00
} ( )
2019-08-22 14:02:26 -04:00
for _ , pl := range f . postBindPlugins {
2019-10-29 10:54:02 -04:00
f . runPostBindPlugin ( ctx , pl , state , pod , nodeName )
}
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) runPostBindPlugin ( ctx context . Context , pl framework . PostBindPlugin , state * framework . CycleState , pod * v1 . Pod , nodeName string ) {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2019-08-28 07:12:02 -04:00
pl . PostBind ( ctx , state , pod , nodeName )
2019-10-29 10:54:02 -04:00
return
2019-05-08 07:55:39 -04:00
}
2019-10-29 10:54:02 -04:00
startTime := time . Now ( )
pl . PostBind ( ctx , state , pod , nodeName )
f . metricsRecorder . observePluginDurationAsync ( postBind , pl . Name ( ) , nil , metrics . SinceInSeconds ( startTime ) )
2019-05-08 07:55:39 -04:00
}
2020-06-15 17:52:54 -04:00
// RunReservePluginsReserve runs the Reserve method in the set of configured
// reserve plugins. If any of these plugins returns an error, it does not
// continue running the remaining ones and returns the error. In such a case,
2020-06-22 14:38:29 -04:00
// the pod will not be scheduled and the caller will be expected to call
// RunReservePluginsUnreserve.
2020-06-15 17:52:54 -04:00
func ( f * frameworkImpl ) RunReservePluginsReserve ( ctx context . Context , state * framework . CycleState , pod * v1 . Pod , nodeName string ) ( status * framework . Status ) {
2020-01-08 23:23:05 -05:00
startTime := time . Now ( )
defer func ( ) {
2020-06-17 17:49:30 -04:00
metrics . FrameworkExtensionPointDuration . WithLabelValues ( reserve , status . Code ( ) . String ( ) , f . profileName ) . Observe ( metrics . SinceInSeconds ( startTime ) )
2020-01-08 23:23:05 -05:00
} ( )
2019-03-27 18:21:32 -04:00
for _ , pl := range f . reservePlugins {
2020-06-15 17:52:54 -04:00
status = f . runReservePluginReserve ( ctx , pl , state , pod , nodeName )
2019-03-27 18:21:32 -04:00
if ! status . IsSuccess ( ) {
2020-09-24 09:10:42 -04:00
err := status . AsError ( )
klog . ErrorS ( err , "Failed running Reserve plugin" , "plugin" , pl . Name ( ) , "pod" , klog . KObj ( pod ) )
return framework . AsStatus ( fmt . Errorf ( "running Reserve plugin %q: %w" , pl . Name ( ) , err ) )
2019-03-27 18:21:32 -04:00
}
}
return nil
}
2020-06-15 17:52:54 -04:00
func ( f * frameworkImpl ) runReservePluginReserve ( ctx context . Context , pl framework . ReservePlugin , state * framework . CycleState , pod * v1 . Pod , nodeName string ) * framework . Status {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2019-10-29 10:54:02 -04:00
return pl . Reserve ( ctx , state , pod , nodeName )
}
2019-10-04 10:25:17 -04:00
startTime := time . Now ( )
2019-10-29 10:54:02 -04:00
status := pl . Reserve ( ctx , state , pod , nodeName )
f . metricsRecorder . observePluginDurationAsync ( reserve , pl . Name ( ) , status , metrics . SinceInSeconds ( startTime ) )
return status
}
2020-06-15 17:52:54 -04:00
// RunReservePluginsUnreserve runs the Unreserve method in the set of
// configured reserve plugins.
func ( f * frameworkImpl ) RunReservePluginsUnreserve ( ctx context . Context , state * framework . CycleState , pod * v1 . Pod , nodeName string ) {
2020-01-08 23:23:05 -05:00
startTime := time . Now ( )
defer func ( ) {
2020-06-17 17:49:30 -04:00
metrics . FrameworkExtensionPointDuration . WithLabelValues ( unreserve , framework . Success . String ( ) , f . profileName ) . Observe ( metrics . SinceInSeconds ( startTime ) )
2020-01-08 23:23:05 -05:00
} ( )
2020-06-15 17:52:54 -04:00
// Execute the Unreserve operation of each reserve plugin in the
// *reverse* order in which the Reserve operation was executed.
for i := len ( f . reservePlugins ) - 1 ; i >= 0 ; i -- {
f . runReservePluginUnreserve ( ctx , f . reservePlugins [ i ] , state , pod , nodeName )
2019-10-29 10:54:02 -04:00
}
}
2020-06-15 17:52:54 -04:00
func ( f * frameworkImpl ) runReservePluginUnreserve ( ctx context . Context , pl framework . ReservePlugin , state * framework . CycleState , pod * v1 . Pod , nodeName string ) {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2019-08-28 07:12:02 -04:00
pl . Unreserve ( ctx , state , pod , nodeName )
2019-10-29 10:54:02 -04:00
return
2019-05-04 23:16:14 -04:00
}
2019-10-29 10:54:02 -04:00
startTime := time . Now ( )
pl . Unreserve ( ctx , state , pod , nodeName )
f . metricsRecorder . observePluginDurationAsync ( unreserve , pl . Name ( ) , nil , metrics . SinceInSeconds ( startTime ) )
2019-05-04 23:16:14 -04:00
}
2019-05-10 09:05:59 -04:00
// RunPermitPlugins runs the set of configured permit plugins. If any of these
// plugins returns a status other than "Success" or "Wait", it does not continue
// running the remaining plugins and returns an error. Otherwise, if any of the
2020-02-15 19:28:43 -05:00
// plugins returns "Wait", then this function will create and add waiting pod
// to a map of currently waiting pods and return status with "Wait" code.
// Pod will remain waiting pod for the minimum duration returned by the permit plugins.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) RunPermitPlugins ( ctx context . Context , state * framework . CycleState , pod * v1 . Pod , nodeName string ) ( status * framework . Status ) {
2020-01-08 23:23:05 -05:00
startTime := time . Now ( )
defer func ( ) {
2020-06-17 17:49:30 -04:00
metrics . FrameworkExtensionPointDuration . WithLabelValues ( permit , status . Code ( ) . String ( ) , f . profileName ) . Observe ( metrics . SinceInSeconds ( startTime ) )
2020-01-08 23:23:05 -05:00
} ( )
2019-10-10 21:56:13 -04:00
pluginsWaitTime := make ( map [ string ] time . Duration )
2020-06-19 05:05:45 -04:00
statusCode := framework . Success
2019-05-10 09:05:59 -04:00
for _ , pl := range f . permitPlugins {
2019-10-29 10:54:02 -04:00
status , timeout := f . runPermitPlugin ( ctx , pl , state , pod , nodeName )
2019-05-10 09:05:59 -04:00
if ! status . IsSuccess ( ) {
2019-08-27 15:16:07 -04:00
if status . IsUnschedulable ( ) {
2021-02-20 20:56:46 -05:00
klog . V ( 4 ) . InfoS ( "Pod rejected by permit plugin" , "pod" , klog . KObj ( pod ) , "plugin" , pl . Name ( ) , "status" , status . Message ( ) )
2021-01-13 19:39:55 -05:00
status . SetFailedPlugin ( pl . Name ( ) )
return status
2019-05-10 09:05:59 -04:00
}
2020-06-19 05:05:45 -04:00
if status . Code ( ) == framework . Wait {
2019-10-10 21:56:13 -04:00
// Not allowed to be greater than maxTimeout.
if timeout > maxTimeout {
timeout = maxTimeout
2019-05-10 09:05:59 -04:00
}
2019-10-10 21:56:13 -04:00
pluginsWaitTime [ pl . Name ( ) ] = timeout
2020-06-19 05:05:45 -04:00
statusCode = framework . Wait
2019-05-10 09:05:59 -04:00
} else {
2020-09-24 09:10:42 -04:00
err := status . AsError ( )
klog . ErrorS ( err , "Failed running Permit plugin" , "plugin" , pl . Name ( ) , "pod" , klog . KObj ( pod ) )
2021-01-13 19:39:55 -05:00
return framework . AsStatus ( fmt . Errorf ( "running Permit plugin %q: %w" , pl . Name ( ) , err ) ) . WithFailedPlugin ( pl . Name ( ) )
2019-05-10 09:05:59 -04:00
}
}
}
2020-06-19 05:05:45 -04:00
if statusCode == framework . Wait {
2020-02-15 19:28:43 -05:00
waitingPod := newWaitingPod ( pod , pluginsWaitTime )
f . waitingPods . add ( waitingPod )
msg := fmt . Sprintf ( "one or more plugins asked to wait and no plugin rejected pod %q" , pod . Name )
2021-02-20 20:56:46 -05:00
klog . V ( 4 ) . InfoS ( "One or more plugins asked to wait and no plugin rejected pod" , "pod" , klog . KObj ( pod ) )
2020-06-19 05:05:45 -04:00
return framework . NewStatus ( framework . Wait , msg )
2019-05-10 09:05:59 -04:00
}
return nil
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) runPermitPlugin ( ctx context . Context , pl framework . PermitPlugin , state * framework . CycleState , pod * v1 . Pod , nodeName string ) ( * framework . Status , time . Duration ) {
2020-01-08 23:23:05 -05:00
if ! state . ShouldRecordPluginMetrics ( ) {
2019-10-29 10:54:02 -04:00
return pl . Permit ( ctx , state , pod , nodeName )
}
startTime := time . Now ( )
status , timeout := pl . Permit ( ctx , state , pod , nodeName )
f . metricsRecorder . observePluginDurationAsync ( permit , pl . Name ( ) , status , metrics . SinceInSeconds ( startTime ) )
return status , timeout
}
2020-02-15 19:28:43 -05:00
// WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
2021-01-13 19:39:55 -05:00
func ( f * frameworkImpl ) WaitOnPermit ( ctx context . Context , pod * v1 . Pod ) * framework . Status {
2020-02-15 19:28:43 -05:00
waitingPod := f . waitingPods . get ( pod . UID )
if waitingPod == nil {
return nil
}
defer f . waitingPods . remove ( pod . UID )
2021-02-20 20:56:46 -05:00
klog . V ( 4 ) . InfoS ( "Pod waiting on permit" , "pod" , klog . KObj ( pod ) )
2020-02-15 19:28:43 -05:00
startTime := time . Now ( )
s := <- waitingPod . s
metrics . PermitWaitDuration . WithLabelValues ( s . Code ( ) . String ( ) ) . Observe ( metrics . SinceInSeconds ( startTime ) )
if ! s . IsSuccess ( ) {
if s . IsUnschedulable ( ) {
2021-02-20 20:56:46 -05:00
klog . V ( 4 ) . InfoS ( "Pod rejected while waiting on permit" , "pod" , klog . KObj ( pod ) , "status" , s . Message ( ) )
2021-01-13 19:39:55 -05:00
s . SetFailedPlugin ( s . FailedPlugin ( ) )
return s
2020-02-15 19:28:43 -05:00
}
2020-09-24 09:10:42 -04:00
err := s . AsError ( )
klog . ErrorS ( err , "Failed waiting on permit for pod" , "pod" , klog . KObj ( pod ) )
2021-01-13 19:39:55 -05:00
return framework . AsStatus ( fmt . Errorf ( "waiting on permit for pod: %w" , err ) ) . WithFailedPlugin ( s . FailedPlugin ( ) )
2020-02-15 19:28:43 -05:00
}
return nil
}
2019-10-24 18:30:21 -04:00
// SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo
// snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains
// unchanged until a pod finishes "Reserve". There is no guarantee that the information
// remains unchanged after "Reserve".
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) SnapshotSharedLister ( ) framework . SharedLister {
2019-11-05 21:25:07 -05:00
return f . snapshotSharedLister
2019-03-27 18:21:32 -04:00
}
2019-05-10 09:05:59 -04:00
// IterateOverWaitingPods hands callback to the iterate method of the
// framework's waiting pods collection.
// NOTE(review): the original comment says a read lock is acquired while
// iterating over the WaitingPods map; that locking lives inside iterate, which
// is not shown here — confirm.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) IterateOverWaitingPods ( callback func ( framework . WaitingPod ) ) {
2019-05-10 09:05:59 -04:00
f . waitingPods . iterate ( callback )
}
// GetWaitingPod returns a reference to a WaitingPod given its UID.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) GetWaitingPod ( uid types . UID ) framework . WaitingPod {
2020-02-15 19:28:43 -05:00
if wp := f . waitingPods . get ( uid ) ; wp != nil {
return wp
}
return nil // Returning nil instead of *waitingPod(nil).
2019-05-10 09:05:59 -04:00
}
2019-05-04 06:29:30 -04:00
2019-11-05 09:04:44 -05:00
// RejectWaitingPod rejects a WaitingPod given its UID.
2021-07-19 18:46:55 -04:00
// The returned value indicates if the given pod is waiting or not.
func ( f * frameworkImpl ) RejectWaitingPod ( uid types . UID ) bool {
if waitingPod := f . waitingPods . get ( uid ) ; waitingPod != nil {
2021-01-13 19:39:55 -05:00
waitingPod . Reject ( "" , "removed" )
2021-07-19 18:46:55 -04:00
return true
2019-11-05 09:04:44 -05:00
}
2021-07-19 18:46:55 -04:00
return false
2019-11-05 09:04:44 -05:00
}
2019-10-16 21:24:33 -04:00
// HasFilterPlugins returns true if at least one filter plugin is defined.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) HasFilterPlugins ( ) bool {
2019-10-16 21:24:33 -04:00
return len ( f . filterPlugins ) > 0
}
2020-06-22 20:22:27 -04:00
// HasPostFilterPlugins reports whether at least one postFilter plugin is defined.
func (f *frameworkImpl) HasPostFilterPlugins() bool {
	return len(f.postFilterPlugins) != 0
}
2019-10-29 05:25:35 -04:00
// HasScorePlugins returns true if at least one score plugin is defined.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) HasScorePlugins ( ) bool {
2019-10-29 05:25:35 -04:00
return len ( f . scorePlugins ) > 0
}
2019-10-04 17:40:21 -04:00
// ListPlugins returns a map of extension point name to plugin names configured at each extension
2020-05-26 12:00:22 -04:00
// point. Returns nil if no plugins where configured.
2021-06-10 08:45:49 -04:00
func ( f * frameworkImpl ) ListPlugins ( ) * config . Plugins {
m := config . Plugins { }
2019-10-05 20:31:51 -04:00
2021-06-10 08:45:49 -04:00
for _ , e := range f . getExtensionPoints ( & m ) {
2019-10-05 22:30:50 -04:00
plugins := reflect . ValueOf ( e . slicePtr ) . Elem ( )
2019-10-05 20:31:51 -04:00
extName := plugins . Type ( ) . Elem ( ) . Name ( )
var cfgs [ ] config . Plugin
2019-10-04 17:40:21 -04:00
for i := 0 ; i < plugins . Len ( ) ; i ++ {
2020-06-19 05:05:45 -04:00
name := plugins . Index ( i ) . Interface ( ) . ( framework . Plugin ) . Name ( )
2019-10-05 20:31:51 -04:00
p := config . Plugin { Name : name }
if extName == "ScorePlugin" {
// Weights apply only to score plugins.
2021-03-29 04:34:10 -04:00
p . Weight = int32 ( f . scorePluginWeight [ name ] )
2019-10-05 20:31:51 -04:00
}
cfgs = append ( cfgs , p )
2019-10-04 17:40:21 -04:00
}
2019-10-05 20:31:51 -04:00
if len ( cfgs ) > 0 {
2021-06-10 08:45:49 -04:00
e . plugins . Enabled = cfgs
2019-10-04 17:40:21 -04:00
}
2019-05-04 06:29:30 -04:00
}
2021-06-10 08:45:49 -04:00
return & m
2019-05-04 06:29:30 -04:00
}
2019-08-19 05:46:15 -04:00
// ClientSet returns a kubernetes clientset.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) ClientSet ( ) clientset . Interface {
2019-08-19 05:46:15 -04:00
return f . clientSet
}
2021-03-29 15:27:27 -04:00
// KubeConfig exposes the REST client configuration stored on the framework.
func (f *frameworkImpl) KubeConfig() *restclient.Config {
	return f.kubeConfig
}
2020-06-10 18:51:32 -04:00
// EventRecorder returns an event recorder.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) EventRecorder ( ) events . EventRecorder {
2020-06-10 18:51:32 -04:00
return f . eventRecorder
}
2019-10-09 02:49:56 -04:00
// SharedInformerFactory returns a shared informer factory.
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) SharedInformerFactory ( ) informers . SharedInformerFactory {
2019-10-09 02:49:56 -04:00
return f . informerFactory
}
2020-06-19 05:05:45 -04:00
func ( f * frameworkImpl ) pluginsNeeded ( plugins * config . Plugins ) map [ string ] config . Plugin {
2019-10-05 20:31:51 -04:00
pgMap := make ( map [ string ] config . Plugin )
2019-05-04 06:29:30 -04:00
if plugins == nil {
return pgMap
}
2021-06-10 08:45:49 -04:00
find := func ( pgs * config . PluginSet ) {
2019-05-04 06:29:30 -04:00
for _ , pg := range pgs . Enabled {
2019-07-16 09:19:20 -04:00
pgMap [ pg . Name ] = pg
2019-05-04 06:29:30 -04:00
}
}
2019-10-04 17:40:21 -04:00
for _ , e := range f . getExtensionPoints ( plugins ) {
find ( e . plugins )
}
2021-10-11 13:47:23 -04:00
// Parse MultiPoint separately since they are not returned by f.getExtensionPoints()
find ( & plugins . MultiPoint )
2019-05-04 06:29:30 -04:00
return pgMap
}
2020-06-05 16:02:45 -04:00
2020-10-18 22:43:50 -04:00
// ProfileName exposes the name of the profile this framework was built for.
func (f *frameworkImpl) ProfileName() string {
	return f.profileName
}
2021-03-01 09:20:18 -05:00
// Parallelizer exposes the parallelizer stored on the framework, which holds
// the parallelism setting used by the scheduler.
func (f *frameworkImpl) Parallelizer() parallelize.Parallelizer {
	return f.parallelizer
}