2016-08-15 23:04:56 -04:00
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
2017-04-07 15:07:03 -04:00
package sync
2016-08-15 23:04:56 -04:00
import (
2016-11-05 05:08:08 -04:00
"fmt"
2016-08-15 23:04:56 -04:00
"time"
2017-07-15 01:25:54 -04:00
"k8s.io/api/core/v1"
2017-01-13 12:48:50 -05:00
"k8s.io/apimachinery/pkg/api/errors"
2017-01-21 22:36:02 -05:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2017-01-11 09:09:48 -05:00
pkgruntime "k8s.io/apimachinery/pkg/runtime"
2017-05-15 14:19:34 -04:00
"k8s.io/apimachinery/pkg/util/runtime"
2017-04-26 21:02:42 -04:00
"k8s.io/apimachinery/pkg/util/wait"
2017-01-11 09:09:48 -05:00
"k8s.io/apimachinery/pkg/watch"
2017-06-23 16:56:37 -04:00
kubeclientset "k8s.io/client-go/kubernetes"
2017-03-14 15:10:05 -04:00
restclient "k8s.io/client-go/rest"
2017-01-24 09:11:51 -05:00
"k8s.io/client-go/tools/cache"
2017-01-30 13:39:54 -05:00
"k8s.io/client-go/tools/record"
2017-01-23 13:37:22 -05:00
"k8s.io/client-go/util/flowcontrol"
2017-04-26 21:02:42 -04:00
"k8s.io/client-go/util/workqueue"
2016-11-30 02:27:27 -05:00
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
2016-12-14 12:57:24 -05:00
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
2017-04-07 00:09:56 -04:00
"k8s.io/kubernetes/federation/pkg/federatedtypes"
2016-08-15 23:04:56 -04:00
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
2017-01-19 14:02:13 -05:00
"k8s.io/kubernetes/federation/pkg/federation-controller/util/clusterselector"
2016-11-05 05:08:08 -04:00
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
2016-08-30 11:30:29 -04:00
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
2016-08-15 23:04:56 -04:00
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/controller"
"github.com/golang/glog"
)
const (
// allClustersKey is the key used with the cluster deliverer when scheduling
// a reconciliation of every federated resource after a cluster becomes
// available or unavailable.
allClustersKey = "ALL_CLUSTERS"
)
2017-03-22 14:52:50 -04:00
// FederationSyncController synchronizes the state of a federated type
// to clusters that are members of the federation.
type FederationSyncController struct {
// For triggering reconciliation of a single resource. This is
// used when there is an add/update/delete operation on a resource
// in either federated API server or in some member of the
// federation.
deliverer * util . DelayingDeliverer
// For triggering reconciliation of all target resources. This is
// used when a new cluster becomes available.
2016-08-15 23:04:56 -04:00
clusterDeliverer * util . DelayingDeliverer
2017-03-22 14:52:50 -04:00
// Contains resources present in members of federation.
informer util . FederatedInformer
2016-08-15 23:04:56 -04:00
// For updating members of federation.
2017-03-22 14:52:50 -04:00
updater util . FederatedUpdater
// Definitions of resources that should be federated.
store cache . Store
// Informer controller for resources that should be federated.
controller cache . Controller
2016-08-15 23:04:56 -04:00
2017-04-26 21:02:42 -04:00
// Work queue allowing parallel processing of resources
workQueue workqueue . Interface
2017-03-22 14:52:50 -04:00
// Backoff manager
backoff * flowcontrol . Backoff
2016-08-15 23:04:56 -04:00
2016-08-30 11:30:29 -04:00
// For events
eventRecorder record . EventRecorder
2016-11-05 05:08:08 -04:00
deletionHelper * deletionhelper . DeletionHelper
2017-05-26 15:23:16 -04:00
reviewDelay time . Duration
clusterAvailableDelay time . Duration
clusterUnavailableDelay time . Duration
smallDelay time . Duration
updateTimeout time . Duration
2017-02-05 23:28:20 -05:00
2017-04-07 00:09:56 -04:00
adapter federatedtypes . FederatedTypeAdapter
2016-08-15 23:04:56 -04:00
}
2017-04-06 22:57:31 -04:00
// StartFederationSyncController starts a new sync controller for a type adapter
2017-07-25 11:40:26 -04:00
func StartFederationSyncController ( kind string , adapterFactory federatedtypes . AdapterFactory , config * restclient . Config , stopChan <- chan struct { } , minimizeLatency bool , adapterSpecificArgs map [ string ] interface { } ) {
2017-05-02 03:05:24 -04:00
restclient . AddUserAgent ( config , fmt . Sprintf ( "federation-%s-controller" , kind ) )
2017-03-14 15:10:05 -04:00
client := federationclientset . NewForConfigOrDie ( config )
2017-07-25 11:40:26 -04:00
adapter := adapterFactory ( client , config , adapterSpecificArgs )
2017-03-22 14:52:50 -04:00
controller := newFederationSyncController ( client , adapter )
2017-03-14 15:10:05 -04:00
if minimizeLatency {
controller . minimizeLatency ( )
}
2017-04-06 22:57:31 -04:00
glog . Infof ( fmt . Sprintf ( "Starting federated sync controller for %s resources" , kind ) )
2017-03-14 15:10:05 -04:00
controller . Run ( stopChan )
}
2017-03-22 14:52:50 -04:00
// newFederationSyncController returns a new sync controller for the given client and type adapter
2017-04-07 00:09:56 -04:00
func newFederationSyncController ( client federationclientset . Interface , adapter federatedtypes . FederatedTypeAdapter ) * FederationSyncController {
2016-08-30 11:30:29 -04:00
broadcaster := record . NewBroadcaster ( )
broadcaster . StartRecordingToSink ( eventsink . NewFederatedEventSink ( client ) )
2017-07-15 01:25:54 -04:00
recorder := broadcaster . NewRecorder ( api . Scheme , v1 . EventSource { Component : fmt . Sprintf ( "federation-%v-controller" , adapter . Kind ( ) ) } )
2016-08-30 11:30:29 -04:00
2017-03-22 14:52:50 -04:00
s := & FederationSyncController {
2017-05-26 15:23:16 -04:00
reviewDelay : time . Second * 10 ,
clusterAvailableDelay : time . Second * 20 ,
clusterUnavailableDelay : time . Second * 60 ,
smallDelay : time . Second * 3 ,
updateTimeout : time . Second * 30 ,
workQueue : workqueue . New ( ) ,
backoff : flowcontrol . NewBackOff ( 5 * time . Second , time . Minute ) ,
eventRecorder : recorder ,
adapter : adapter ,
2016-08-15 23:04:56 -04:00
}
2016-09-08 07:45:31 -04:00
// Build delivereres for triggering reconciliations.
2017-03-22 14:52:50 -04:00
s . deliverer = util . NewDelayingDeliverer ( )
s . clusterDeliverer = util . NewDelayingDeliverer ( )
2016-08-15 23:04:56 -04:00
2017-03-22 14:52:50 -04:00
// Start informer in federated API servers on the resource type that should be federated.
s . store , s . controller = cache . NewInformer (
2016-08-15 23:04:56 -04:00
& cache . ListWatch {
2017-01-21 22:36:02 -05:00
ListFunc : func ( options metav1 . ListOptions ) ( pkgruntime . Object , error ) {
2017-02-05 23:28:20 -05:00
return adapter . FedList ( metav1 . NamespaceAll , options )
2016-08-15 23:04:56 -04:00
} ,
2017-01-21 22:36:02 -05:00
WatchFunc : func ( options metav1 . ListOptions ) ( watch . Interface , error ) {
2017-02-05 23:28:20 -05:00
return adapter . FedWatch ( metav1 . NamespaceAll , options )
2016-08-15 23:04:56 -04:00
} ,
} ,
2017-02-05 23:28:20 -05:00
adapter . ObjectType ( ) ,
2016-08-15 23:04:56 -04:00
controller . NoResyncPeriodFunc ( ) ,
2017-03-22 14:52:50 -04:00
util . NewTriggerOnAllChanges ( func ( obj pkgruntime . Object ) { s . deliverObj ( obj , 0 , false ) } ) )
2016-08-15 23:04:56 -04:00
2017-03-22 14:52:50 -04:00
// Federated informer on the resource type in members of federation.
s . informer = util . NewFederatedInformer (
2016-08-15 23:04:56 -04:00
client ,
2017-01-12 08:45:53 -05:00
func ( cluster * federationapi . Cluster , targetClient kubeclientset . Interface ) ( cache . Store , cache . Controller ) {
2016-09-14 14:35:38 -04:00
return cache . NewInformer (
2016-08-15 23:04:56 -04:00
& cache . ListWatch {
2017-01-21 22:36:02 -05:00
ListFunc : func ( options metav1 . ListOptions ) ( pkgruntime . Object , error ) {
2017-02-05 23:28:20 -05:00
return adapter . ClusterList ( targetClient , metav1 . NamespaceAll , options )
2016-08-15 23:04:56 -04:00
} ,
2017-01-21 22:36:02 -05:00
WatchFunc : func ( options metav1 . ListOptions ) ( watch . Interface , error ) {
2017-02-05 23:28:20 -05:00
return adapter . ClusterWatch ( targetClient , metav1 . NamespaceAll , options )
2016-08-15 23:04:56 -04:00
} ,
} ,
2017-02-05 23:28:20 -05:00
adapter . ObjectType ( ) ,
2016-08-15 23:04:56 -04:00
controller . NoResyncPeriodFunc ( ) ,
2016-09-08 07:45:31 -04:00
// Trigger reconciliation whenever something in federated cluster is changed. In most cases it
2017-03-22 14:52:50 -04:00
// would be just confirmation that some operation on the target resource type had succeeded.
2016-08-23 05:45:01 -04:00
util . NewTriggerOnAllChanges (
2016-11-30 02:27:27 -05:00
func ( obj pkgruntime . Object ) {
2017-03-22 14:52:50 -04:00
s . deliverObj ( obj , s . reviewDelay , false )
2016-08-15 23:04:56 -04:00
} ,
) )
} ,
& util . ClusterLifecycleHandlerFuncs {
2016-11-30 02:27:27 -05:00
ClusterAvailable : func ( cluster * federationapi . Cluster ) {
2017-03-22 14:52:50 -04:00
// When new cluster becomes available process all the target resources again.
s . clusterDeliverer . DeliverAt ( allClustersKey , nil , time . Now ( ) . Add ( s . clusterAvailableDelay ) )
2016-08-15 23:04:56 -04:00
} ,
2017-05-26 15:23:16 -04:00
// When a cluster becomes unavailable process all the target resources again.
ClusterUnavailable : func ( cluster * federationapi . Cluster , _ [ ] interface { } ) {
s . clusterDeliverer . DeliverAt ( allClustersKey , nil , time . Now ( ) . Add ( s . clusterUnavailableDelay ) )
} ,
2016-08-15 23:04:56 -04:00
} ,
)
// Federated updeater along with Create/Update/Delete operations.
2017-05-04 16:37:33 -04:00
s . updater = util . NewFederatedUpdater ( s . informer , adapter . Kind ( ) , s . updateTimeout , s . eventRecorder ,
2016-11-30 02:27:27 -05:00
func ( client kubeclientset . Interface , obj pkgruntime . Object ) error {
2017-02-05 23:28:20 -05:00
_ , err := adapter . ClusterCreate ( client , obj )
2016-08-15 23:04:56 -04:00
return err
} ,
2016-11-30 02:27:27 -05:00
func ( client kubeclientset . Interface , obj pkgruntime . Object ) error {
2017-02-05 23:28:20 -05:00
_ , err := adapter . ClusterUpdate ( client , obj )
2016-08-15 23:04:56 -04:00
return err
} ,
2016-11-30 02:27:27 -05:00
func ( client kubeclientset . Interface , obj pkgruntime . Object ) error {
2017-06-21 21:47:26 -04:00
qualifiedName := adapter . QualifiedName ( obj )
2017-04-04 17:19:48 -04:00
orphanDependents := false
2017-06-21 21:47:26 -04:00
err := adapter . ClusterDelete ( client , qualifiedName , & metav1 . DeleteOptions { OrphanDependents : & orphanDependents } )
2016-08-15 23:04:56 -04:00
return err
} )
2016-11-05 05:08:08 -04:00
2017-03-22 14:52:50 -04:00
s . deletionHelper = deletionhelper . NewDeletionHelper (
2017-04-05 05:20:00 -04:00
s . updateObject ,
2016-11-05 05:08:08 -04:00
// objNameFunc
2016-11-30 02:27:27 -05:00
func ( obj pkgruntime . Object ) string {
2017-06-21 21:47:26 -04:00
return adapter . QualifiedName ( obj ) . String ( )
2016-11-05 05:08:08 -04:00
} ,
2017-03-22 14:52:50 -04:00
s . informer ,
s . updater ,
2016-11-05 05:08:08 -04:00
)
2017-03-22 14:52:50 -04:00
return s
2016-08-15 23:04:56 -04:00
}
2017-03-14 15:10:05 -04:00
// minimizeLatency reduces delays and timeouts to make the controller more responsive (useful for testing).
2017-03-22 14:52:50 -04:00
func ( s * FederationSyncController ) minimizeLatency ( ) {
s . clusterAvailableDelay = time . Second
2017-05-26 15:23:16 -04:00
s . clusterUnavailableDelay = time . Second
2017-03-22 14:52:50 -04:00
s . reviewDelay = 50 * time . Millisecond
s . smallDelay = 20 * time . Millisecond
s . updateTimeout = 5 * time . Second
2017-03-14 15:10:05 -04:00
}
2017-04-05 05:20:00 -04:00
// Sends the given updated object to apiserver.
func ( s * FederationSyncController ) updateObject ( obj pkgruntime . Object ) ( pkgruntime . Object , error ) {
return s . adapter . FedUpdate ( obj )
2016-11-05 05:08:08 -04:00
}
2017-03-22 14:52:50 -04:00
func ( s * FederationSyncController ) Run ( stopChan <- chan struct { } ) {
go s . controller . Run ( stopChan )
s . informer . Start ( )
s . deliverer . StartWithHandler ( func ( item * util . DelayingDelivererItem ) {
2017-04-26 21:02:42 -04:00
s . workQueue . Add ( item )
2016-08-15 23:04:56 -04:00
} )
2017-03-22 14:52:50 -04:00
s . clusterDeliverer . StartWithHandler ( func ( _ * util . DelayingDelivererItem ) {
s . reconcileOnClusterChange ( )
2016-08-15 23:04:56 -04:00
} )
2017-04-26 21:02:42 -04:00
// TODO: Allow multiple workers.
go wait . Until ( s . worker , time . Second , stopChan )
2017-03-22 14:52:50 -04:00
util . StartBackoffGC ( s . backoff , stopChan )
2017-05-11 14:33:36 -04:00
// Ensure all goroutines are cleaned up when the stop channel closes
go func ( ) {
<- stopChan
s . informer . Stop ( )
s . workQueue . ShutDown ( )
s . deliverer . Stop ( )
s . clusterDeliverer . Stop ( )
} ( )
2016-08-15 23:04:56 -04:00
}
2017-04-26 21:02:42 -04:00
// reconciliationStatus is the outcome of a reconciliation attempt. The worker
// uses it to decide whether, and after which delay, a resource is queued again.
type reconciliationStatus int
const (
// statusAllOK: nothing further to do; the worker does not re-queue the resource.
statusAllOK reconciliationStatus = iota
// statusNeedsRecheck: the worker re-queues the resource after reviewDelay.
statusNeedsRecheck
// statusError: the worker re-queues the resource as a failure, which adds backoff.
statusError
// statusNotSynced: the stores were not synced; the worker re-queues the
// resource after clusterAvailableDelay.
statusNotSynced
)
func ( s * FederationSyncController ) worker ( ) {
for {
obj , quit := s . workQueue . Get ( )
if quit {
return
}
item := obj . ( * util . DelayingDelivererItem )
2017-06-21 21:47:26 -04:00
qualifiedName := item . Value . ( * federatedtypes . QualifiedName )
status := s . reconcile ( * qualifiedName )
2017-04-26 21:02:42 -04:00
s . workQueue . Done ( item )
switch status {
case statusAllOK :
break
case statusError :
2017-06-21 21:47:26 -04:00
s . deliver ( * qualifiedName , 0 , true )
2017-04-26 21:02:42 -04:00
case statusNeedsRecheck :
2017-06-21 21:47:26 -04:00
s . deliver ( * qualifiedName , s . reviewDelay , false )
2017-04-26 21:02:42 -04:00
case statusNotSynced :
2017-06-21 21:47:26 -04:00
s . deliver ( * qualifiedName , s . clusterAvailableDelay , false )
2017-04-26 21:02:42 -04:00
}
}
}
2017-03-22 14:52:50 -04:00
func ( s * FederationSyncController ) deliverObj ( obj pkgruntime . Object , delay time . Duration , failed bool ) {
2017-06-21 21:47:26 -04:00
qualifiedName := s . adapter . QualifiedName ( obj )
s . deliver ( qualifiedName , delay , failed )
2016-08-15 23:04:56 -04:00
}
// Adds backoff to delay if this delivery is related to some failure. Resets backoff if there was no failure.
2017-06-21 21:47:26 -04:00
func ( s * FederationSyncController ) deliver ( qualifiedName federatedtypes . QualifiedName , delay time . Duration , failed bool ) {
key := qualifiedName . String ( )
2016-08-15 23:04:56 -04:00
if failed {
2017-03-22 14:52:50 -04:00
s . backoff . Next ( key , time . Now ( ) )
delay = delay + s . backoff . Get ( key )
2016-08-15 23:04:56 -04:00
} else {
2017-03-22 14:52:50 -04:00
s . backoff . Reset ( key )
2016-08-15 23:04:56 -04:00
}
2017-06-21 21:47:26 -04:00
s . deliverer . DeliverAfter ( key , & qualifiedName , delay )
2016-08-15 23:04:56 -04:00
}
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
2016-09-08 07:45:31 -04:00
// synced with the corresponding api server.
2017-03-22 14:52:50 -04:00
func ( s * FederationSyncController ) isSynced ( ) bool {
if ! s . informer . ClustersSynced ( ) {
2016-08-15 23:04:56 -04:00
glog . V ( 2 ) . Infof ( "Cluster list not synced" )
return false
}
2017-03-22 14:52:50 -04:00
clusters , err := s . informer . GetReadyClusters ( )
2016-08-15 23:04:56 -04:00
if err != nil {
2017-05-15 14:19:34 -04:00
runtime . HandleError ( fmt . Errorf ( "Failed to get ready clusters: %v" , err ) )
2016-08-15 23:04:56 -04:00
return false
}
2017-03-22 14:52:50 -04:00
if ! s . informer . GetTargetStore ( ) . ClustersSynced ( clusters ) {
2016-08-15 23:04:56 -04:00
return false
}
return true
}
2017-03-22 14:52:50 -04:00
// The function triggers reconciliation of all target federated resources.
func ( s * FederationSyncController ) reconcileOnClusterChange ( ) {
if ! s . isSynced ( ) {
s . clusterDeliverer . DeliverAt ( allClustersKey , nil , time . Now ( ) . Add ( s . clusterAvailableDelay ) )
2016-08-15 23:04:56 -04:00
}
2017-03-22 14:52:50 -04:00
for _ , obj := range s . store . List ( ) {
2017-06-21 21:47:26 -04:00
qualifiedName := s . adapter . QualifiedName ( obj . ( pkgruntime . Object ) )
s . deliver ( qualifiedName , s . smallDelay , false )
2016-08-15 23:04:56 -04:00
}
}
2017-06-21 21:47:26 -04:00
func ( s * FederationSyncController ) reconcile ( qualifiedName federatedtypes . QualifiedName ) reconciliationStatus {
2017-03-22 14:52:50 -04:00
if ! s . isSynced ( ) {
2017-04-26 21:02:42 -04:00
return statusNotSynced
2016-08-15 23:04:56 -04:00
}
2017-03-22 14:52:50 -04:00
kind := s . adapter . Kind ( )
2017-06-21 21:47:26 -04:00
key := qualifiedName . String ( )
2017-05-11 14:33:36 -04:00
2017-05-26 15:23:16 -04:00
glog . V ( 4 ) . Infof ( "Starting to reconcile %v %v" , kind , key )
startTime := time . Now ( )
defer glog . V ( 4 ) . Infof ( "Finished reconciling %v %v (duration: %v)" , kind , key , time . Now ( ) . Sub ( startTime ) )
2017-05-11 14:33:36 -04:00
obj , err := s . objFromCache ( kind , key )
2016-08-15 23:04:56 -04:00
if err != nil {
2017-04-26 21:02:42 -04:00
return statusError
2016-08-15 23:04:56 -04:00
}
2017-05-11 14:33:36 -04:00
if obj == nil {
2017-04-26 21:02:42 -04:00
return statusAllOK
2016-08-15 23:04:56 -04:00
}
2016-11-05 05:08:08 -04:00
2017-03-22 14:52:50 -04:00
meta := s . adapter . ObjectMeta ( obj )
2017-02-05 23:28:20 -05:00
if meta . DeletionTimestamp != nil {
2017-06-21 21:47:26 -04:00
err := s . delete ( obj , kind , qualifiedName )
2017-05-11 14:33:36 -04:00
if err != nil {
msg := "Failed to delete %s %q: %v"
2017-06-21 21:47:26 -04:00
args := [ ] interface { } { kind , qualifiedName , err }
2017-05-15 14:19:34 -04:00
runtime . HandleError ( fmt . Errorf ( msg , args ... ) )
2017-05-11 14:33:36 -04:00
s . eventRecorder . Eventf ( obj , api . EventTypeWarning , "DeleteFailed" , msg , args ... )
2017-04-26 21:02:42 -04:00
return statusError
2016-11-05 05:08:08 -04:00
}
2017-04-26 21:02:42 -04:00
return statusAllOK
2016-11-05 05:08:08 -04:00
}
2017-05-04 17:40:43 -04:00
glog . V ( 3 ) . Infof ( "Ensuring finalizers exist on %s %q" , kind , key )
2017-03-22 14:52:50 -04:00
obj , err = s . deletionHelper . EnsureFinalizers ( obj )
2016-11-05 05:08:08 -04:00
if err != nil {
2017-05-15 14:19:34 -04:00
runtime . HandleError ( fmt . Errorf ( "Failed to ensure finalizers for %s %q: %v" , kind , key , err ) )
2017-04-26 21:02:42 -04:00
return statusError
2016-11-05 05:08:08 -04:00
}
2017-06-28 10:11:04 -04:00
operationsAccessor := func ( adapter federatedtypes . FederatedTypeAdapter , selectedClusters [ ] * federationapi . Cluster , unselectedClusters [ ] * federationapi . Cluster , obj pkgruntime . Object , schedulingInfo interface { } ) ( [ ] util . FederatedOperation , error ) {
2017-05-26 15:04:13 -04:00
operations , err := clusterOperations ( adapter , selectedClusters , unselectedClusters , obj , key , schedulingInfo , func ( clusterName string ) ( interface { } , bool , error ) {
2017-05-04 17:40:43 -04:00
return s . informer . GetTargetStore ( ) . GetByKey ( clusterName , key )
2017-05-22 18:09:29 -04:00
} )
2017-01-19 14:02:13 -05:00
if err != nil {
s . eventRecorder . Eventf ( obj , api . EventTypeWarning , "FedClusterOperationsError" , "Error obtaining sync operations for %s: %s error: %s" , kind , key , err . Error ( ) )
}
return operations , err
2016-08-15 23:04:56 -04:00
}
2017-01-19 14:02:13 -05:00
2017-05-04 17:40:43 -04:00
return syncToClusters (
s . informer . GetReadyClusters ,
operationsAccessor ,
2017-05-22 18:09:29 -04:00
selectedClusters ,
2017-05-04 17:40:43 -04:00
s . updater . Update ,
s . adapter ,
2017-05-26 15:04:13 -04:00
s . informer ,
2017-05-04 17:40:43 -04:00
obj ,
)
2016-08-15 23:04:56 -04:00
}
2016-11-05 05:08:08 -04:00
2017-05-11 14:33:36 -04:00
func ( s * FederationSyncController ) objFromCache ( kind , key string ) ( pkgruntime . Object , error ) {
cachedObj , exist , err := s . store . GetByKey ( key )
if err != nil {
2017-05-15 14:19:34 -04:00
wrappedErr := fmt . Errorf ( "Failed to query %s store for %q: %v" , kind , key , err )
runtime . HandleError ( wrappedErr )
return nil , err
2017-05-11 14:33:36 -04:00
}
if ! exist {
return nil , nil
}
// Create a copy before modifying the resource to prevent racing with other readers.
copiedObj , err := api . Scheme . DeepCopy ( cachedObj )
if err != nil {
2017-05-15 14:19:34 -04:00
wrappedErr := fmt . Errorf ( "Error in retrieving %s %q from store: %v" , kind , key , err )
runtime . HandleError ( wrappedErr )
return nil , err
2017-05-11 14:33:36 -04:00
}
if ! s . adapter . IsExpectedType ( copiedObj ) {
2017-05-15 14:19:34 -04:00
err = fmt . Errorf ( "Object is not the expected type: %v" , copiedObj )
runtime . HandleError ( err )
return nil , err
2017-05-11 14:33:36 -04:00
}
return copiedObj . ( pkgruntime . Object ) , nil
}
2017-03-22 14:52:50 -04:00
// delete deletes the given resource or returns error if the deletion was not complete.
2017-06-21 21:47:26 -04:00
func ( s * FederationSyncController ) delete ( obj pkgruntime . Object , kind string , qualifiedName federatedtypes . QualifiedName ) error {
glog . V ( 3 ) . Infof ( "Handling deletion of %s %q" , kind , qualifiedName )
2017-06-22 00:45:26 -04:00
// Perform pre-deletion cleanup for the namespace adapter
namespaceAdapter , ok := s . adapter . ( * federatedtypes . NamespaceAdapter )
if ok {
var err error
obj , err = namespaceAdapter . CleanUpNamespace ( obj , s . eventRecorder )
if err != nil {
return err
}
}
2017-03-22 14:52:50 -04:00
_ , err := s . deletionHelper . HandleObjectInUnderlyingClusters ( obj )
2016-11-05 05:08:08 -04:00
if err != nil {
return err
}
2017-06-21 21:47:26 -04:00
err = s . adapter . FedDelete ( qualifiedName , nil )
2016-11-05 05:08:08 -04:00
if err != nil {
// Its all good if the error is not found error. That means it is deleted already and we do not have to do anything.
2017-03-22 14:52:50 -04:00
// This is expected when we are processing an update as a result of finalizer deletion.
// The process that deleted the last finalizer is also going to delete the resource and we do not have to do anything.
2016-11-05 05:08:08 -04:00
if ! errors . IsNotFound ( err ) {
2017-05-11 14:33:36 -04:00
return err
2016-11-05 05:08:08 -04:00
}
}
return nil
}
2017-05-11 14:33:36 -04:00
2017-05-04 17:40:43 -04:00
type clustersAccessorFunc func ( ) ( [ ] * federationapi . Cluster , error )
2017-06-28 10:11:04 -04:00
type operationsFunc func ( federatedtypes . FederatedTypeAdapter , [ ] * federationapi . Cluster , [ ] * federationapi . Cluster , pkgruntime . Object , interface { } ) ( [ ] util . FederatedOperation , error )
2017-05-22 18:09:29 -04:00
type clusterSelectorFunc func ( * metav1 . ObjectMeta , func ( map [ string ] string , map [ string ] string ) ( bool , error ) , [ ] * federationapi . Cluster ) ( [ ] * federationapi . Cluster , [ ] * federationapi . Cluster , error )
2017-05-04 17:40:43 -04:00
type executionFunc func ( [ ] util . FederatedOperation ) error
// syncToClusters ensures that the state of the given object is synchronized to member clusters.
2017-05-26 15:04:13 -04:00
func syncToClusters ( clustersAccessor clustersAccessorFunc , operationsAccessor operationsFunc , selector clusterSelectorFunc , execute executionFunc , adapter federatedtypes . FederatedTypeAdapter , informer util . FederatedInformer , obj pkgruntime . Object ) reconciliationStatus {
2017-05-04 17:40:43 -04:00
kind := adapter . Kind ( )
key := federatedtypes . ObjectKey ( adapter , obj )
glog . V ( 3 ) . Infof ( "Syncing %s %q in underlying clusters" , kind , key )
clusters , err := clustersAccessor ( )
if err != nil {
2017-05-15 14:19:34 -04:00
runtime . HandleError ( fmt . Errorf ( "Failed to get cluster list: %v" , err ) )
2017-05-04 17:40:43 -04:00
return statusNotSynced
}
2017-05-22 18:09:29 -04:00
selectedClusters , unselectedClusters , err := selector ( adapter . ObjectMeta ( obj ) , clusterselector . SendToCluster , clusters )
if err != nil {
return statusError
}
2017-06-28 10:11:04 -04:00
var schedulingInfo interface { }
2017-05-26 15:04:13 -04:00
if adapter . IsSchedulingAdapter ( ) {
schedulingAdapter , ok := adapter . ( federatedtypes . SchedulingAdapter )
if ! ok {
glog . Fatalf ( "Adapter for kind %q does not properly implement SchedulingAdapter." , kind )
}
schedulingInfo , err = schedulingAdapter . GetSchedule ( obj , key , selectedClusters , informer )
if err != nil {
runtime . HandleError ( fmt . Errorf ( "adapter.GetSchedule() failed on adapter for %s %q: %v" , kind , key , err ) )
return statusError
}
}
operations , err := operationsAccessor ( adapter , selectedClusters , unselectedClusters , obj , schedulingInfo )
2017-05-04 17:40:43 -04:00
if err != nil {
return statusError
}
2017-05-26 15:04:13 -04:00
if adapter . IsSchedulingAdapter ( ) {
schedulingAdapter , ok := adapter . ( federatedtypes . SchedulingAdapter )
if ! ok {
glog . Fatalf ( "Adapter for kind %q does not properly implement SchedulingAdapter." , kind )
}
2016-12-05 13:59:40 -05:00
err = schedulingAdapter . UpdateFederatedStatus ( obj , schedulingInfo )
2017-05-26 15:04:13 -04:00
if err != nil {
runtime . HandleError ( fmt . Errorf ( "adapter.UpdateFinished() failed on adapter for %s %q: %v" , kind , key , err ) )
return statusError
}
}
2017-05-04 17:40:43 -04:00
if len ( operations ) == 0 {
return statusAllOK
}
err = execute ( operations )
if err != nil {
2017-05-15 14:19:34 -04:00
runtime . HandleError ( fmt . Errorf ( "Failed to execute updates for %s %q: %v" , kind , key , err ) )
2017-05-04 17:40:43 -04:00
return statusError
}
2017-05-26 15:04:13 -04:00
// Everything is in order but let's be double sure
2017-05-04 17:40:43 -04:00
return statusNeedsRecheck
}
2017-05-22 18:09:29 -04:00
// selectedClusters partitions clusters into two slices: the clusters for
// which selector (called with the cluster's labels and the object's
// annotations) returns true, and the remaining clusters. The first selector
// error aborts the partitioning and is returned with nil slices.
func selectedClusters(objMeta *metav1.ObjectMeta, selector func(map[string]string, map[string]string) (bool, error), clusters []*federationapi.Cluster) ([]*federationapi.Cluster, []*federationapi.Cluster, error) {
	matched := make([]*federationapi.Cluster, 0)
	unmatched := make([]*federationapi.Cluster, 0)

	for _, cluster := range clusters {
		send, err := selector(cluster.Labels, objMeta.Annotations)
		if err != nil {
			return nil, nil, err
		}
		if send {
			matched = append(matched, cluster)
			continue
		}
		unmatched = append(unmatched, cluster)
	}

	return matched, unmatched, nil
}
2017-05-04 17:40:43 -04:00
// clusterObjectAccessorFunc looks up the object being synchronized in the
// named cluster. It returns the object, whether it was found, and an error.
type clusterObjectAccessorFunc func ( clusterName string ) ( interface { } , bool , error )
2017-05-11 14:33:36 -04:00
// clusterOperations returns the list of operations needed to synchronize the state of the given object to the provided clusters
2017-06-28 10:11:04 -04:00
func clusterOperations ( adapter federatedtypes . FederatedTypeAdapter , selectedClusters [ ] * federationapi . Cluster , unselectedClusters [ ] * federationapi . Cluster , obj pkgruntime . Object , key string , schedulingInfo interface { } , accessor clusterObjectAccessorFunc ) ( [ ] util . FederatedOperation , error ) {
2017-05-11 14:33:36 -04:00
operations := make ( [ ] util . FederatedOperation , 0 )
2017-01-19 14:02:13 -05:00
2017-05-26 15:04:13 -04:00
kind := adapter . Kind ( )
2017-05-22 18:09:29 -04:00
for _ , cluster := range selectedClusters {
2017-05-26 15:04:13 -04:00
// The data should not be modified.
desiredObj := adapter . Copy ( obj )
2017-05-11 14:33:36 -04:00
clusterObj , found , err := accessor ( cluster . Name )
if err != nil {
2017-05-26 15:04:13 -04:00
wrappedErr := fmt . Errorf ( "Failed to get %s %q from cluster %q: %v" , kind , key , cluster . Name , err )
2017-05-15 14:19:34 -04:00
runtime . HandleError ( wrappedErr )
return nil , wrappedErr
2017-05-11 14:33:36 -04:00
}
2017-01-19 14:02:13 -05:00
2016-12-05 13:59:40 -05:00
var scheduleAction federatedtypes . ScheduleAction = federatedtypes . ActionAdd
2017-05-26 15:04:13 -04:00
if adapter . IsSchedulingAdapter ( ) {
schedulingAdapter , ok := adapter . ( federatedtypes . SchedulingAdapter )
if ! ok {
err = fmt . Errorf ( "adapter for kind %s does not properly implement SchedulingAdapter." , kind )
glog . Fatalf ( "Error: %v" , err )
}
var clusterTypedObj pkgruntime . Object = nil
if clusterObj != nil {
clusterTypedObj = clusterObj . ( pkgruntime . Object )
}
2016-12-05 13:59:40 -05:00
desiredObj , scheduleAction , err = schedulingAdapter . ScheduleObject ( cluster , clusterTypedObj , desiredObj , schedulingInfo )
2017-05-26 15:04:13 -04:00
if err != nil {
runtime . HandleError ( err )
return nil , err
}
}
2017-05-11 14:33:36 -04:00
var operationType util . FederatedOperationType = ""
2017-05-22 18:09:29 -04:00
if found {
2016-12-05 13:59:40 -05:00
if scheduleAction == federatedtypes . ActionDelete {
operationType = util . OperationTypeDelete
} else {
clusterObj := clusterObj . ( pkgruntime . Object )
if ! adapter . Equivalent ( desiredObj , clusterObj ) {
operationType = util . OperationTypeUpdate
}
2017-05-11 14:33:36 -04:00
}
2016-12-05 13:59:40 -05:00
} else if scheduleAction == federatedtypes . ActionAdd {
2017-05-11 14:33:36 -04:00
operationType = util . OperationTypeAdd
}
2017-01-19 14:02:13 -05:00
2017-05-11 14:33:36 -04:00
if len ( operationType ) > 0 {
operations = append ( operations , util . FederatedOperation {
Type : operationType ,
Obj : desiredObj ,
ClusterName : cluster . Name ,
Key : key ,
} )
}
}
2017-05-22 18:09:29 -04:00
for _ , cluster := range unselectedClusters {
2017-05-26 15:04:13 -04:00
clusterObj , found , err := accessor ( cluster . Name )
2017-05-22 18:09:29 -04:00
if err != nil {
2017-05-26 15:04:13 -04:00
wrappedErr := fmt . Errorf ( "Failed to get %s %q from cluster %q: %v" , kind , key , cluster . Name , err )
2017-05-22 18:09:29 -04:00
runtime . HandleError ( wrappedErr )
return nil , wrappedErr
}
if found {
operations = append ( operations , util . FederatedOperation {
Type : util . OperationTypeDelete ,
2017-05-26 15:04:13 -04:00
Obj : clusterObj . ( pkgruntime . Object ) ,
2017-05-22 18:09:29 -04:00
ClusterName : cluster . Name ,
Key : key ,
} )
}
}
2017-05-11 14:33:36 -04:00
return operations , nil
}