2016-07-11 07:23:53 -04:00
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
2017-08-08 15:55:57 -04:00
package util
2016-07-11 07:23:53 -04:00
import (
2017-08-08 15:55:57 -04:00
"errors"
2016-07-11 07:23:53 -04:00
"fmt"
"strings"
2017-08-08 15:55:57 -04:00
apierrors "k8s.io/apimachinery/pkg/api/errors"
2017-01-21 22:36:02 -05:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2017-01-19 09:50:16 -05:00
"k8s.io/apimachinery/pkg/fields"
2017-01-11 09:09:48 -05:00
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2017-01-24 09:11:51 -05:00
"k8s.io/client-go/tools/cache"
2017-01-30 13:39:54 -05:00
"k8s.io/client-go/tools/record"
2017-02-13 05:48:34 -05:00
2017-06-22 13:25:57 -04:00
"k8s.io/api/core/v1"
2017-06-23 16:56:37 -04:00
clientset "k8s.io/client-go/kubernetes"
extensionslisters "k8s.io/client-go/listers/extensions/v1beta1"
2017-06-22 14:24:23 -04:00
"k8s.io/kubernetes/pkg/api"
2016-07-11 07:23:53 -04:00
"k8s.io/kubernetes/pkg/cloudprovider"
2017-04-04 09:35:44 -04:00
"k8s.io/kubernetes/pkg/controller"
2016-07-11 07:23:53 -04:00
"k8s.io/kubernetes/pkg/kubelet/util/format"
2017-04-04 09:35:44 -04:00
nodepkg "k8s.io/kubernetes/pkg/util/node"
2016-10-22 12:49:18 -04:00
utilversion "k8s.io/kubernetes/pkg/util/version"
2016-07-11 07:23:53 -04:00
"github.com/golang/glog"
)
2017-08-08 15:55:57 -04:00
var (
2017-08-08 20:04:50 -04:00
// ErrCloudInstance occurs when the cloud provider does not support
// the Instances API.
2017-08-08 15:55:57 -04:00
ErrCloudInstance = errors . New ( "cloud provider doesn't support instances" )
2017-08-08 20:04:50 -04:00
// podStatusReconciliationVersion is the the minimum kubelet version
// for which the nodecontroller can safely flip pod.Status to
// NotReady.
2017-08-08 15:55:57 -04:00
podStatusReconciliationVersion = utilversion . MustParseSemantic ( "v1.2.0" )
)
2017-08-08 20:04:50 -04:00
// DeletePods will delete all pods from master running on given node,
// and return true if any pods were deleted, or were found pending
// deletion.
2017-08-08 15:55:57 -04:00
func DeletePods ( kubeClient clientset . Interface , recorder record . EventRecorder , nodeName , nodeUID string , daemonStore extensionslisters . DaemonSetLister ) ( bool , error ) {
2016-07-11 07:23:53 -04:00
remaining := false
2016-11-18 15:50:17 -05:00
selector := fields . OneTermEqualSelector ( api . PodHostField , nodeName ) . String ( )
2017-01-21 22:36:02 -05:00
options := metav1 . ListOptions { FieldSelector : selector }
pods , err := kubeClient . Core ( ) . Pods ( metav1 . NamespaceAll ) . List ( options )
2016-11-01 15:59:06 -04:00
var updateErrList [ ] error
2016-07-11 07:23:53 -04:00
if err != nil {
return remaining , err
}
if len ( pods . Items ) > 0 {
2017-08-08 15:55:57 -04:00
RecordNodeEvent ( recorder , nodeName , nodeUID , v1 . EventTypeNormal , "DeletingAllPods" , fmt . Sprintf ( "Deleting all Pods from Node %v." , nodeName ) )
2016-07-11 07:23:53 -04:00
}
for _ , pod := range pods . Items {
// Defensive check, also needed for tests.
if pod . Spec . NodeName != nodeName {
continue
}
2016-11-01 15:59:06 -04:00
// Set reason and message in the pod object.
2017-08-08 15:55:57 -04:00
if _ , err = SetPodTerminationReason ( kubeClient , & pod , nodeName ) ; err != nil {
if apierrors . IsConflict ( err ) {
2016-11-01 15:59:06 -04:00
updateErrList = append ( updateErrList ,
fmt . Errorf ( "update status failed for pod %q: %v" , format . Pod ( & pod ) , err ) )
continue
}
}
2016-08-15 10:21:47 -04:00
// if the pod has already been marked for deletion, we still return true that there are remaining pods.
2016-07-11 07:23:53 -04:00
if pod . DeletionGracePeriodSeconds != nil {
2016-08-15 10:21:47 -04:00
remaining = true
2016-07-11 07:23:53 -04:00
continue
}
// if the pod is managed by a daemonset, ignore it
_ , err := daemonStore . GetPodDaemonSets ( & pod )
if err == nil { // No error means at least one daemonset was found
continue
}
2017-03-07 04:29:57 -05:00
glog . V ( 2 ) . Infof ( "Starting deletion of pod %v/%v" , pod . Namespace , pod . Name )
2016-11-18 15:50:17 -05:00
recorder . Eventf ( & pod , v1 . EventTypeNormal , "NodeControllerEviction" , "Marking for deletion Pod %s from Node %s" , pod . Name , nodeName )
2016-07-11 07:23:53 -04:00
if err := kubeClient . Core ( ) . Pods ( pod . Namespace ) . Delete ( pod . Name , nil ) ; err != nil {
return false , err
}
remaining = true
}
2016-11-01 15:59:06 -04:00
if len ( updateErrList ) > 0 {
return false , utilerrors . NewAggregate ( updateErrList )
}
2016-07-11 07:23:53 -04:00
return remaining , nil
}
2017-08-08 20:04:50 -04:00
// SetPodTerminationReason attempts to set a reason and message in the
// pod status, updates it in the apiserver, and returns an error if it
// encounters one.
2017-08-08 15:55:57 -04:00
func SetPodTerminationReason ( kubeClient clientset . Interface , pod * v1 . Pod , nodeName string ) ( * v1 . Pod , error ) {
2017-04-04 09:35:44 -04:00
if pod . Status . Reason == nodepkg . NodeUnreachablePodReason {
2016-11-01 15:59:06 -04:00
return pod , nil
}
2017-04-04 09:35:44 -04:00
pod . Status . Reason = nodepkg . NodeUnreachablePodReason
pod . Status . Message = fmt . Sprintf ( nodepkg . NodeUnreachablePodMessage , nodeName , pod . Name )
2016-11-01 15:59:06 -04:00
2016-11-18 15:50:17 -05:00
var updatedPod * v1 . Pod
2016-11-01 15:59:06 -04:00
var err error
if updatedPod , err = kubeClient . Core ( ) . Pods ( pod . Namespace ) . UpdateStatus ( pod ) ; err != nil {
return nil , err
}
return updatedPod , nil
}
2017-08-08 20:04:50 -04:00
// ForcefullyDeletePod deletes the pod immediately.
2017-08-08 15:55:57 -04:00
func ForcefullyDeletePod ( c clientset . Interface , pod * v1 . Pod ) error {
2016-07-11 07:23:53 -04:00
var zero int64
2016-11-16 09:00:01 -05:00
glog . Infof ( "NodeController is force deleting Pod: %v:%v" , pod . Namespace , pod . Name )
2017-01-24 10:38:21 -05:00
err := c . Core ( ) . Pods ( pod . Namespace ) . Delete ( pod . Name , & metav1 . DeleteOptions { GracePeriodSeconds : & zero } )
2016-07-11 07:23:53 -04:00
if err == nil {
glog . V ( 4 ) . Infof ( "forceful deletion of %s succeeded" , pod . Name )
}
return err
}
2017-08-08 20:04:50 -04:00
// ForcefullyDeleteNode deletes the node immediately. The pods on the
// node are cleaned up by the podGC.
2017-08-08 15:55:57 -04:00
func ForcefullyDeleteNode ( kubeClient clientset . Interface , nodeName string ) error {
2016-07-11 07:23:53 -04:00
if err := kubeClient . Core ( ) . Nodes ( ) . Delete ( nodeName , nil ) ; err != nil {
return fmt . Errorf ( "unable to delete node %q: %v" , nodeName , err )
}
return nil
}
2017-08-08 20:04:50 -04:00
// MarkAllPodsNotReady updates ready status of all pods running on
// given node from master return true if success
2017-08-08 15:55:57 -04:00
func MarkAllPodsNotReady ( kubeClient clientset . Interface , node * v1 . Node ) error {
2016-08-17 18:33:35 -04:00
// Don't set pods to NotReady if the kubelet is running a version that
// doesn't understand how to correct readiness.
// TODO: Remove this check when we no longer guarantee backward compatibility
// with node versions < 1.2.0.
2017-08-08 15:55:57 -04:00
if NodeRunningOutdatedKubelet ( node ) {
2016-08-17 18:33:35 -04:00
return nil
}
nodeName := node . Name
2016-07-11 07:23:53 -04:00
glog . V ( 2 ) . Infof ( "Update ready status of pods on node [%v]" , nodeName )
2017-01-21 22:36:02 -05:00
opts := metav1 . ListOptions { FieldSelector : fields . OneTermEqualSelector ( api . PodHostField , nodeName ) . String ( ) }
pods , err := kubeClient . Core ( ) . Pods ( metav1 . NamespaceAll ) . List ( opts )
2016-07-11 07:23:53 -04:00
if err != nil {
return err
}
errMsg := [ ] string { }
for _ , pod := range pods . Items {
// Defensive check, also needed for tests.
if pod . Spec . NodeName != nodeName {
continue
}
for i , cond := range pod . Status . Conditions {
2016-11-18 15:50:17 -05:00
if cond . Type == v1 . PodReady {
pod . Status . Conditions [ i ] . Status = v1 . ConditionFalse
2016-07-11 07:23:53 -04:00
glog . V ( 2 ) . Infof ( "Updating ready status of pod %v to false" , pod . Name )
_ , err := kubeClient . Core ( ) . Pods ( pod . Namespace ) . UpdateStatus ( & pod )
if err != nil {
glog . Warningf ( "Failed to update status for pod %q: %v" , format . Pod ( & pod ) , err )
errMsg = append ( errMsg , fmt . Sprintf ( "%v" , err ) )
}
break
}
}
}
if len ( errMsg ) == 0 {
return nil
}
return fmt . Errorf ( "%v" , strings . Join ( errMsg , "; " ) )
}
2017-08-08 15:55:57 -04:00
// NodeRunningOutdatedKubelet returns true if the kubeletVersion reported
2016-08-17 18:33:35 -04:00
// in the nodeInfo of the given node is "outdated", meaning < 1.2.0.
// Older versions were inflexible and modifying pod.Status directly through
// the apiserver would result in unexpected outcomes.
2017-08-08 15:55:57 -04:00
func NodeRunningOutdatedKubelet ( node * v1 . Node ) bool {
2016-10-22 12:49:18 -04:00
v , err := utilversion . ParseSemantic ( node . Status . NodeInfo . KubeletVersion )
2016-08-17 18:33:35 -04:00
if err != nil {
glog . Errorf ( "couldn't parse version %q of node %v" , node . Status . NodeInfo . KubeletVersion , err )
return true
}
2016-10-22 12:49:18 -04:00
if v . LessThan ( podStatusReconciliationVersion ) {
2016-08-17 18:33:35 -04:00
glog . Infof ( "Node %v running kubelet at (%v) which is less than the minimum version that allows nodecontroller to mark pods NotReady (%v)." , node . Name , v , podStatusReconciliationVersion )
return true
}
return false
}
2017-08-08 20:04:50 -04:00
// NodeExistsInCloudProvider returns true if the node exists in the
// cloud provider.
2017-08-08 15:55:57 -04:00
func NodeExistsInCloudProvider ( cloud cloudprovider . Interface , nodeName types . NodeName ) ( bool , error ) {
2016-07-11 07:23:53 -04:00
instances , ok := cloud . Instances ( )
if ! ok {
return false , fmt . Errorf ( "%v" , ErrCloudInstance )
}
if _ , err := instances . ExternalID ( nodeName ) ; err != nil {
if err == cloudprovider . InstanceNotFound {
return false , nil
}
return false , err
}
return true , nil
}
2017-08-08 20:04:50 -04:00
// RecordNodeEvent records a event related to a node.
2017-08-08 15:55:57 -04:00
func RecordNodeEvent ( recorder record . EventRecorder , nodeName , nodeUID , eventtype , reason , event string ) {
2017-07-15 01:25:54 -04:00
ref := & v1 . ObjectReference {
2016-07-11 07:23:53 -04:00
Kind : "Node" ,
Name : nodeName ,
2016-08-13 21:41:20 -04:00
UID : types . UID ( nodeUID ) ,
2016-07-11 07:23:53 -04:00
Namespace : "" ,
}
glog . V ( 2 ) . Infof ( "Recording %s event message for node %s" , event , nodeName )
recorder . Eventf ( ref , eventtype , reason , "Node %s event: %s" , nodeName , event )
}
2017-08-08 20:04:50 -04:00
// RecordNodeStatusChange records a event related to a node status change.
func RecordNodeStatusChange ( recorder record . EventRecorder , node * v1 . Node , newStatus string ) {
2017-07-15 01:25:54 -04:00
ref := & v1 . ObjectReference {
2016-07-11 07:23:53 -04:00
Kind : "Node" ,
Name : node . Name ,
2016-08-13 21:41:20 -04:00
UID : node . UID ,
2016-07-11 07:23:53 -04:00
Namespace : "" ,
}
2017-08-08 20:04:50 -04:00
glog . V ( 2 ) . Infof ( "Recording status change %s event message for node %s" , newStatus , node . Name )
2016-07-11 07:23:53 -04:00
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
2017-08-08 20:04:50 -04:00
recorder . Eventf ( ref , v1 . EventTypeNormal , newStatus , "Node %s status is now: %s" , node . Name , newStatus )
2016-07-11 07:23:53 -04:00
}
2017-04-04 09:35:44 -04:00
2017-08-08 20:04:50 -04:00
// SwapNodeControllerTaint returns true in case of success and false
// otherwise.
2017-07-19 11:51:19 -04:00
func SwapNodeControllerTaint ( kubeClient clientset . Interface , taintsToAdd , taintsToRemove [ ] * v1 . Taint , node * v1 . Node ) bool {
for _ , taintToAdd := range taintsToAdd {
taintToAdd . TimeAdded = metav1 . Now ( )
}
err := controller . AddOrUpdateTaintOnNode ( kubeClient , node . Name , taintsToAdd ... )
2017-04-04 09:35:44 -04:00
if err != nil {
utilruntime . HandleError (
fmt . Errorf (
2017-07-19 11:51:19 -04:00
"unable to taint %+v unresponsive Node %q: %v" ,
taintsToAdd ,
2017-04-04 09:35:44 -04:00
node . Name ,
err ) )
return false
}
2017-07-19 11:51:19 -04:00
glog . V ( 4 ) . Infof ( "Added %+v Taint to Node %v" , taintsToAdd , node . Name )
2017-04-04 09:35:44 -04:00
2017-07-19 11:51:19 -04:00
err = controller . RemoveTaintOffNode ( kubeClient , node . Name , node , taintsToRemove ... )
2017-04-04 09:35:44 -04:00
if err != nil {
utilruntime . HandleError (
fmt . Errorf (
2017-07-19 11:51:19 -04:00
"unable to remove %+v unneeded taint from unresponsive Node %q: %v" ,
taintsToRemove ,
2017-04-04 09:35:44 -04:00
node . Name ,
err ) )
return false
}
2017-07-19 11:51:19 -04:00
glog . V ( 4 ) . Infof ( "Made sure that Node %+v has no %v Taint" , node . Name , taintsToRemove )
2017-04-04 09:35:44 -04:00
return true
}
2017-05-05 06:01:08 -04:00
2017-08-08 20:04:50 -04:00
// CreateAddNodeHandler creates an add node handler.
2017-08-08 15:55:57 -04:00
func CreateAddNodeHandler ( f func ( node * v1 . Node ) error ) func ( obj interface { } ) {
2017-05-05 06:01:08 -04:00
return func ( originalObj interface { } ) {
2017-08-15 08:14:21 -04:00
node := originalObj . ( * v1 . Node ) . DeepCopy ( )
2017-05-05 06:01:08 -04:00
if err := f ( node ) ; err != nil {
utilruntime . HandleError ( fmt . Errorf ( "Error while processing Node Delete: %v" , err ) )
}
}
}
2017-08-08 20:04:50 -04:00
// CreateUpdateNodeHandler creates a node update handler.
2017-08-08 15:55:57 -04:00
func CreateUpdateNodeHandler ( f func ( oldNode , newNode * v1 . Node ) error ) func ( oldObj , newObj interface { } ) {
2017-05-05 06:01:08 -04:00
return func ( origOldObj , origNewObj interface { } ) {
2017-08-15 08:14:21 -04:00
node := origNewObj . ( * v1 . Node ) . DeepCopy ( )
prevNode := origOldObj . ( * v1 . Node ) . DeepCopy ( )
2017-05-05 06:01:08 -04:00
if err := f ( prevNode , node ) ; err != nil {
utilruntime . HandleError ( fmt . Errorf ( "Error while processing Node Add/Delete: %v" , err ) )
}
}
}
2017-08-08 20:04:50 -04:00
// CreateDeleteNodeHandler creates a delete node handler.
2017-08-08 15:55:57 -04:00
func CreateDeleteNodeHandler ( f func ( node * v1 . Node ) error ) func ( obj interface { } ) {
2017-05-05 06:01:08 -04:00
return func ( originalObj interface { } ) {
2017-08-15 08:14:21 -04:00
originalNode , isNode := originalObj . ( * v1 . Node )
2017-05-05 06:01:08 -04:00
// We can get DeletedFinalStateUnknown instead of *v1.Node here and
// we need to handle that correctly. #34692
if ! isNode {
2017-08-15 08:14:21 -04:00
deletedState , ok := originalObj . ( cache . DeletedFinalStateUnknown )
2017-05-05 06:01:08 -04:00
if ! ok {
2017-08-15 08:14:21 -04:00
glog . Errorf ( "Received unexpected object: %v" , originalObj )
2017-05-05 06:01:08 -04:00
return
}
2017-08-15 08:14:21 -04:00
originalNode , ok = deletedState . Obj . ( * v1 . Node )
2017-05-05 06:01:08 -04:00
if ! ok {
glog . Errorf ( "DeletedFinalStateUnknown contained non-Node object: %v" , deletedState . Obj )
return
}
}
2017-08-15 08:14:21 -04:00
node := originalNode . DeepCopy ( )
2017-05-05 06:01:08 -04:00
if err := f ( node ) ; err != nil {
utilruntime . HandleError ( fmt . Errorf ( "Error while processing Node Add/Delete: %v" , err ) )
}
}
}