|
| 1 | +package servingcert |
| 2 | + |
| 3 | +import ( |
| 4 | + "fmt" |
| 5 | + "time" |
| 6 | + |
| 7 | + "github.com/golang/glog" |
| 8 | + |
| 9 | + kapi "k8s.io/kubernetes/pkg/api" |
| 10 | + "k8s.io/kubernetes/pkg/client/cache" |
| 11 | + kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" |
| 12 | + "k8s.io/kubernetes/pkg/controller" |
| 13 | + "k8s.io/kubernetes/pkg/controller/framework" |
| 14 | + "k8s.io/kubernetes/pkg/runtime" |
| 15 | + utilruntime "k8s.io/kubernetes/pkg/util/runtime" |
| 16 | + "k8s.io/kubernetes/pkg/util/sets" |
| 17 | + "k8s.io/kubernetes/pkg/util/wait" |
| 18 | + "k8s.io/kubernetes/pkg/util/workqueue" |
| 19 | + "k8s.io/kubernetes/pkg/watch" |
| 20 | + |
| 21 | + "github.com/openshift/origin/pkg/cmd/server/crypto" |
| 22 | +) |
| 23 | + |
| 24 | +// ServiceServingCertUpdateController is responsible for synchronizing Service objects stored |
| 25 | +// in the system with actual running replica sets and pods. |
| 26 | +type ServiceServingCertUpdateController struct { |
| 27 | + secretClient kcoreclient.SecretsGetter |
| 28 | + |
| 29 | + // Services that need to be checked |
| 30 | + queue workqueue.RateLimitingInterface |
| 31 | + |
| 32 | + serviceCache cache.Store |
| 33 | + serviceController *framework.Controller |
| 34 | + serviceHasSynced informerSynced |
| 35 | + |
| 36 | + secretCache cache.Store |
| 37 | + secretController *framework.Controller |
| 38 | + secretHasSynced informerSynced |
| 39 | + |
| 40 | + ca *crypto.CA |
| 41 | + publicCert string |
| 42 | + dnsSuffix string |
| 43 | + // minTimeLeftForCert is how much time is remaining for the serving cert before regenerating it. |
| 44 | + minTimeLeftForCert time.Duration |
| 45 | + |
| 46 | + // syncHandler does the work. It's factored out for unit testing |
| 47 | + syncHandler func(serviceKey string) error |
| 48 | +} |
| 49 | + |
| 50 | +// NewServiceServingCertUpdateController creates a new ServiceServingCertUpdateController. |
| 51 | +// TODO this should accept a shared informer |
| 52 | +func NewServiceServingCertUpdateController(serviceClient kcoreclient.ServicesGetter, secretClient kcoreclient.SecretsGetter, ca *crypto.CA, dnsSuffix string, resyncInterval time.Duration) *ServiceServingCertUpdateController { |
| 53 | + sc := &ServiceServingCertUpdateController{ |
| 54 | + secretClient: secretClient, |
| 55 | + |
| 56 | + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), |
| 57 | + |
| 58 | + ca: ca, |
| 59 | + dnsSuffix: dnsSuffix, |
| 60 | + // TODO base the expiry time on a percentage of the time for the lifespan of the cert |
| 61 | + minTimeLeftForCert: 1 * time.Hour, |
| 62 | + } |
| 63 | + |
| 64 | + sc.serviceCache, sc.serviceController = framework.NewInformer( |
| 65 | + &cache.ListWatch{ |
| 66 | + ListFunc: func(options kapi.ListOptions) (runtime.Object, error) { |
| 67 | + return serviceClient.Services(kapi.NamespaceAll).List(options) |
| 68 | + }, |
| 69 | + WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) { |
| 70 | + return serviceClient.Services(kapi.NamespaceAll).Watch(options) |
| 71 | + }, |
| 72 | + }, |
| 73 | + &kapi.Service{}, |
| 74 | + resyncInterval, |
| 75 | + framework.ResourceEventHandlerFuncs{}, |
| 76 | + ) |
| 77 | + sc.serviceHasSynced = sc.serviceController.HasSynced |
| 78 | + |
| 79 | + sc.secretCache, sc.secretController = framework.NewInformer( |
| 80 | + &cache.ListWatch{ |
| 81 | + ListFunc: func(options kapi.ListOptions) (runtime.Object, error) { |
| 82 | + return sc.secretClient.Secrets(kapi.NamespaceAll).List(options) |
| 83 | + }, |
| 84 | + WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) { |
| 85 | + return sc.secretClient.Secrets(kapi.NamespaceAll).Watch(options) |
| 86 | + }, |
| 87 | + }, |
| 88 | + &kapi.Secret{}, |
| 89 | + resyncInterval, |
| 90 | + framework.ResourceEventHandlerFuncs{ |
| 91 | + AddFunc: sc.addSecret, |
| 92 | + UpdateFunc: sc.updateSecret, |
| 93 | + }, |
| 94 | + ) |
| 95 | + sc.secretHasSynced = sc.secretController.HasSynced |
| 96 | + |
| 97 | + sc.syncHandler = sc.syncSecret |
| 98 | + |
| 99 | + return sc |
| 100 | +} |
| 101 | + |
| 102 | +// Run begins watching and syncing. |
| 103 | +func (sc *ServiceServingCertUpdateController) Run(workers int, stopCh <-chan struct{}) { |
| 104 | + defer utilruntime.HandleCrash() |
| 105 | + defer glog.Infof("Shutting down service signing cert update controller") |
| 106 | + defer sc.queue.ShutDown() |
| 107 | + |
| 108 | + glog.Infof("starting service signing cert update controller") |
| 109 | + go sc.serviceController.Run(stopCh) |
| 110 | + go sc.secretController.Run(stopCh) |
| 111 | + |
| 112 | + if !waitForCacheSync(stopCh, sc.serviceHasSynced, sc.secretHasSynced) { |
| 113 | + return |
| 114 | + } |
| 115 | + |
| 116 | + for i := 0; i < workers; i++ { |
| 117 | + go wait.Until(sc.runWorker, time.Second, stopCh) |
| 118 | + } |
| 119 | + |
| 120 | + <-stopCh |
| 121 | +} |
| 122 | + |
| 123 | +// TODO this is all in the kube library after the 1.5 rebase |
| 124 | + |
| 125 | +// informerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. |
| 126 | +type informerSynced func() bool |
| 127 | + |
| 128 | +// syncedPollPeriod controls how often you look at the status of your sync funcs |
| 129 | +const syncedPollPeriod = 100 * time.Millisecond |
| 130 | + |
| 131 | +func waitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...informerSynced) bool { |
| 132 | + err := wait.PollUntil(syncedPollPeriod, |
| 133 | + func() (bool, error) { |
| 134 | + for _, syncFunc := range cacheSyncs { |
| 135 | + if !syncFunc() { |
| 136 | + return false, nil |
| 137 | + } |
| 138 | + } |
| 139 | + return true, nil |
| 140 | + }, |
| 141 | + stopCh) |
| 142 | + if err != nil { |
| 143 | + glog.V(2).Infof("stop requested") |
| 144 | + return false |
| 145 | + } |
| 146 | + |
| 147 | + glog.V(4).Infof("caches populated") |
| 148 | + return true |
| 149 | +} |
| 150 | + |
| 151 | +func (sc *ServiceServingCertUpdateController) enqueueSecret(obj interface{}) { |
| 152 | + key, err := controller.KeyFunc(obj) |
| 153 | + if err != nil { |
| 154 | + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) |
| 155 | + return |
| 156 | + } |
| 157 | + |
| 158 | + sc.queue.Add(key) |
| 159 | +} |
| 160 | + |
| 161 | +func (sc *ServiceServingCertUpdateController) addSecret(obj interface{}) { |
| 162 | + secret := obj.(*kapi.Secret) |
| 163 | + if len(secret.Annotations[ServiceNameAnnotation]) == 0 { |
| 164 | + return |
| 165 | + } |
| 166 | + |
| 167 | + glog.V(4).Infof("adding %s", secret.Name) |
| 168 | + sc.enqueueSecret(secret) |
| 169 | +} |
| 170 | + |
| 171 | +func (sc *ServiceServingCertUpdateController) updateSecret(old, cur interface{}) { |
| 172 | + secret := cur.(*kapi.Secret) |
| 173 | + if len(secret.Annotations[ServiceNameAnnotation]) == 0 { |
| 174 | + // if the current doesn't have a service name, check the old |
| 175 | + secret = old.(*kapi.Secret) |
| 176 | + if len(secret.Annotations[ServiceNameAnnotation]) == 0 { |
| 177 | + return |
| 178 | + } |
| 179 | + } |
| 180 | + |
| 181 | + glog.V(4).Infof("updating %s", secret.Name) |
| 182 | + sc.enqueueSecret(secret) |
| 183 | +} |
| 184 | + |
| 185 | +func (sc *ServiceServingCertUpdateController) runWorker() { |
| 186 | + for sc.processNextWorkItem() { |
| 187 | + } |
| 188 | +} |
| 189 | + |
| 190 | +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. |
| 191 | +func (sc *ServiceServingCertUpdateController) processNextWorkItem() bool { |
| 192 | + key, quit := sc.queue.Get() |
| 193 | + if quit { |
| 194 | + return false |
| 195 | + } |
| 196 | + defer sc.queue.Done(key) |
| 197 | + |
| 198 | + err := sc.syncHandler(key.(string)) |
| 199 | + if err == nil { |
| 200 | + sc.queue.Forget(key) |
| 201 | + return true |
| 202 | + } |
| 203 | + |
| 204 | + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) |
| 205 | + sc.queue.AddRateLimited(key) |
| 206 | + |
| 207 | + return true |
| 208 | +} |
| 209 | + |
| 210 | +// syncSecret will sync the service with the given key. |
| 211 | +// This function is not meant to be invoked concurrently with the same key. |
| 212 | +func (sc *ServiceServingCertUpdateController) syncSecret(key string) error { |
| 213 | + obj, exists, err := sc.secretCache.GetByKey(key) |
| 214 | + if err != nil { |
| 215 | + glog.V(4).Infof("Unable to retrieve service %v from store: %v", key, err) |
| 216 | + return err |
| 217 | + } |
| 218 | + if !exists { |
| 219 | + glog.V(4).Infof("Secret has been deleted %v", key) |
| 220 | + return nil |
| 221 | + } |
| 222 | + |
| 223 | + if !sc.requiresRegeneration(obj.(*kapi.Secret)) { |
| 224 | + return nil |
| 225 | + } |
| 226 | + |
| 227 | + // make a copy to avoid mutating cache state |
| 228 | + t, err := kapi.Scheme.DeepCopy(obj) |
| 229 | + if err != nil { |
| 230 | + return err |
| 231 | + } |
| 232 | + secret := t.(*kapi.Secret) |
| 233 | + |
| 234 | + dnsName := secret.Annotations[ServiceNameAnnotation] + "." + secret.Namespace + ".svc" |
| 235 | + fqDNSName := dnsName + "." + sc.dnsSuffix |
| 236 | + servingCert, err := sc.ca.MakeServerCert(sets.NewString(dnsName, fqDNSName)) |
| 237 | + if err != nil { |
| 238 | + return err |
| 239 | + } |
| 240 | + secret.Annotations[ServingCertExpiryAnnotation] = servingCert.Certs[0].NotAfter.Format(time.RFC3339) |
| 241 | + secret.Data[kapi.TLSCertKey], secret.Data[kapi.TLSPrivateKeyKey], err = servingCert.GetPEMBytes() |
| 242 | + if err != nil { |
| 243 | + return err |
| 244 | + } |
| 245 | + |
| 246 | + _, err = sc.secretClient.Secrets(secret.Namespace).Update(secret) |
| 247 | + return err |
| 248 | +} |
| 249 | + |
| 250 | +func (sc *ServiceServingCertUpdateController) requiresRegeneration(secret *kapi.Secret) bool { |
| 251 | + serviceName := secret.Annotations[ServiceNameAnnotation] |
| 252 | + if len(serviceName) == 0 { |
| 253 | + return false |
| 254 | + } |
| 255 | + |
| 256 | + serviceObj, exists, err := sc.serviceCache.GetByKey(secret.Namespace + "/" + serviceName) |
| 257 | + if err != nil { |
| 258 | + return false |
| 259 | + } |
| 260 | + if !exists { |
| 261 | + return false |
| 262 | + } |
| 263 | + |
| 264 | + service := serviceObj.(*kapi.Service) |
| 265 | + if secret.Annotations[ServiceUIDAnnotation] != string(service.UID) { |
| 266 | + return false |
| 267 | + } |
| 268 | + |
| 269 | + // if we don't have the annotation for expiry, just go ahead and regenerate. It's easier than writing a |
| 270 | + // secondary logic flow that creates the expiry dates |
| 271 | + expiryString, ok := secret.Annotations[ServingCertExpiryAnnotation] |
| 272 | + if !ok { |
| 273 | + return true |
| 274 | + } |
| 275 | + expiry, err := time.Parse(time.RFC3339, expiryString) |
| 276 | + if err != nil { |
| 277 | + return true |
| 278 | + } |
| 279 | + |
| 280 | + if time.Now().Add(sc.minTimeLeftForCert).After(expiry) { |
| 281 | + return true |
| 282 | + } |
| 283 | + |
| 284 | + return false |
| 285 | +} |
0 commit comments