@@ -38,8 +38,7 @@ import (
38
38
"github.com/pkg/errors"
39
39
"github.com/spf13/viper"
40
40
"golang.org/x/sync/errgroup"
41
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42
- "k8s.io/apimachinery/pkg/labels"
41
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
43
42
"k8s.io/apimachinery/pkg/util/wait"
44
43
"k8s.io/client-go/kubernetes"
45
44
kconst "k8s.io/kubernetes/cmd/kubeadm/app/constants"
@@ -66,7 +65,6 @@ const (
66
65
defaultCNIConfigPath = "/etc/cni/net.d/k8s.conf"
67
66
kubeletServiceFile = "/lib/systemd/system/kubelet.service"
68
67
kubeletSystemdConfFile = "/etc/systemd/system/kubelet.service.d/10-kubeadm.conf"
69
- AllPods = "ALL_PODS"
70
68
)
71
69
72
70
const (
@@ -96,22 +94,6 @@ var KubeadmExtraArgsWhitelist = map[int][]string{
96
94
},
97
95
}
98
96
99
- type pod struct {
100
- // Human friendly name
101
- name string
102
- key string
103
- value string
104
- }
105
-
106
- // PodsByLayer are queries we run when health checking, sorted roughly by dependency layer
107
- var PodsByLayer = []pod {
108
- {"proxy" , "k8s-app" , "kube-proxy" },
109
- {"etcd" , "component" , "etcd" },
110
- {"scheduler" , "component" , "kube-scheduler" },
111
- {"controller" , "component" , "kube-controller-manager" },
112
- {"dns" , "k8s-app" , "kube-dns" },
113
- }
114
-
115
97
// yamlConfigPath is the path to the kubeadm configuration
116
98
var yamlConfigPath = path .Join (vmpath .GuestEphemeralDir , "kubeadm.yaml" )
117
99
@@ -166,12 +148,13 @@ func (k *Bootstrapper) GetAPIServerStatus(ip net.IP, apiserverPort int) (string,
166
148
}
167
149
client := & http.Client {Transport : tr }
168
150
resp , err := client .Get (url )
169
- glog .Infof ("%s response: %v %+v" , url , err , resp )
170
151
// Connection refused, usually.
171
152
if err != nil {
153
+ glog .Warningf ("%s response: %v %+v" , url , err , resp )
172
154
return state .Stopped .String (), nil
173
155
}
174
156
if resp .StatusCode != http .StatusOK {
157
+ glog .Warningf ("%s response: %v %+v" , url , err , resp )
175
158
return state .Error .String (), nil
176
159
}
177
160
return state .Running .String (), nil
@@ -354,7 +337,6 @@ func addAddons(files *[]assets.CopyableFile, data interface{}) error {
354
337
355
338
// client returns a Kubernetes client to use to speak to a kubeadm launched apiserver
356
339
func (k * Bootstrapper ) client (k8s config.KubernetesConfig ) (* kubernetes.Clientset , error ) {
357
- // Catch case if WaitForPods was called with a stale ~/.kube/config
358
340
config , err := kapi .ClientConfig (k .contextName )
359
341
if err != nil {
360
342
return nil , errors .Wrap (err , "client config" )
@@ -369,67 +351,104 @@ func (k *Bootstrapper) client(k8s config.KubernetesConfig) (*kubernetes.Clientse
369
351
return kubernetes .NewForConfig (config )
370
352
}
371
353
372
- // WaitForPods blocks until pods specified in podsToWaitFor appear to be healthy.
373
- func (k * Bootstrapper ) WaitForPods (k8s config.KubernetesConfig , timeout time.Duration , podsToWaitFor []string ) error {
374
- // Do not wait for "k8s-app" pods in the case of CNI, as they are managed
375
- // by a CNI plugin which is usually started after minikube has been brought
376
- // up. Otherwise, minikube won't start, as "k8s-app" pods are not ready.
377
- componentsOnly := k8s .NetworkPlugin == "cni"
378
- out .T (out .WaitingPods , "Waiting for:" )
379
-
380
- // Wait until the apiserver can answer queries properly. We don't care if the apiserver
381
- // pod shows up as registered, but need the webserver for all subsequent queries.
382
-
383
- if shouldWaitForPod ("apiserver" , podsToWaitFor ) {
384
- out .String (" apiserver" )
385
- if err := k .waitForAPIServer (k8s ); err != nil {
386
- return errors .Wrap (err , "waiting for apiserver" )
354
+ func (k * Bootstrapper ) waitForAPIServerProcess (start time.Time , timeout time.Duration ) error {
355
+ glog .Infof ("waiting for apiserver process to appear ..." )
356
+ err := wait .PollImmediate (time .Second * 1 , timeout , func () (bool , error ) {
357
+ if time .Since (start ) > timeout {
358
+ return false , fmt .Errorf ("cluster wait timed out during process check" )
387
359
}
360
+ rr , ierr := k .c .RunCmd (exec .Command ("sudo" , "pgrep" , "kube-apiserver" ))
361
+ if ierr != nil {
362
+ glog .Warningf ("pgrep apiserver: %v cmd: %s" , ierr , rr .Command ())
363
+ return false , nil
364
+ }
365
+ return true , nil
366
+ })
367
+ if err != nil {
368
+ return fmt .Errorf ("apiserver process never appeared" )
388
369
}
370
+ glog .Infof ("duration metric: took %s to wait for apiserver process to appear ..." , time .Since (start ))
371
+ return nil
372
+ }
373
+
374
+ func (k * Bootstrapper ) waitForAPIServerHealthz (start time.Time , k8s config.KubernetesConfig , timeout time.Duration ) error {
375
+ glog .Infof ("waiting for apiserver healthz status ..." )
376
+ hStart := time .Now ()
377
+ healthz := func () (bool , error ) {
378
+ if time .Since (start ) > timeout {
379
+ return false , fmt .Errorf ("cluster wait timed out during healthz check" )
380
+ }
389
381
382
+ status , err := k .GetAPIServerStatus (net .ParseIP (k8s .NodeIP ), k8s .NodePort )
383
+ if err != nil {
384
+ glog .Warningf ("status: %v" , err )
385
+ return false , nil
386
+ }
387
+ if status != "Running" {
388
+ return false , nil
389
+ }
390
+ return true , nil
391
+ }
392
+
393
+ if err := wait .PollImmediate (kconst .APICallRetryInterval , kconst .DefaultControlPlaneTimeout , healthz ); err != nil {
394
+ return fmt .Errorf ("apiserver healthz never reported healthy" )
395
+ }
396
+ glog .Infof ("duration metric: took %s to wait for apiserver healthz status ..." , time .Since (hStart ))
397
+ return nil
398
+ }
399
+
400
+ func (k * Bootstrapper ) waitForSystemPods (start time.Time , k8s config.KubernetesConfig , timeout time.Duration ) error {
401
+ glog .Infof ("waiting for kube-system pods to appear ..." )
402
+ pStart := time .Now ()
390
403
client , err := k .client (k8s )
391
404
if err != nil {
392
405
return errors .Wrap (err , "client" )
393
406
}
394
407
395
- for _ , p := range PodsByLayer {
396
- if componentsOnly && p .key != "component" { // skip component check if network plugin is cni
397
- continue
408
+ podStart := time.Time {}
409
+ podList := func () (bool , error ) {
410
+ if time .Since (start ) > timeout {
411
+ return false , fmt .Errorf ("cluster wait timed out during pod check" )
412
+ }
413
+ // Wait for any system pod, as waiting for apiserver may block until etcd
414
+ pods , err := client .CoreV1 ().Pods ("kube-system" ).List (meta.ListOptions {})
415
+ if len (pods .Items ) < 2 {
416
+ podStart = time.Time {}
417
+ return false , nil
418
+ }
419
+ if err != nil {
420
+ podStart = time.Time {}
421
+ return false , nil
398
422
}
399
- if ! shouldWaitForPod ( p . name , podsToWaitFor ) {
400
- continue
423
+ if podStart . IsZero ( ) {
424
+ podStart = time . Now ()
401
425
}
402
- out .String (" %s" , p .name )
403
- selector := labels .SelectorFromSet (labels .Set (map [string ]string {p .key : p .value }))
404
- if err := kapi .WaitForPodsWithLabelRunning (client , "kube-system" , selector , timeout ); err != nil {
405
- return errors .Wrap (err , fmt .Sprintf ("waiting for %s=%s" , p .key , p .value ))
426
+
427
+ glog .Infof ("%d kube-system pods found since %s" , len (pods .Items ), podStart )
428
+ if time .Since (podStart ) > 2 * kconst .APICallRetryInterval {
429
+ glog .Infof ("stability requirement met, returning" )
430
+ return true , nil
406
431
}
432
+ return false , nil
433
+ }
434
+ if err = wait .PollImmediate (kconst .APICallRetryInterval , kconst .DefaultControlPlaneTimeout , podList ); err != nil {
435
+ return fmt .Errorf ("apiserver never returned a pod list" )
407
436
}
408
- out . Ln ( "" )
437
+ glog . Infof ( "duration metric: took %s to wait for pod list to return data ..." , time . Since ( pStart ) )
409
438
return nil
410
439
}
411
440
412
- // shouldWaitForPod returns true if:
413
- // 1. podsToWaitFor is nil
414
- // 2. name is in podsToWaitFor
415
- // 3. ALL_PODS is in podsToWaitFor
416
- // else, return false
417
- func shouldWaitForPod (name string , podsToWaitFor []string ) bool {
418
- if podsToWaitFor == nil {
419
- return true
420
- }
421
- if len (podsToWaitFor ) == 0 {
422
- return false
423
- }
424
- for _ , p := range podsToWaitFor {
425
- if p == AllPods {
426
- return true
427
- }
428
- if p == name {
429
- return true
430
- }
441
+ // WaitForCluster blocks until the cluster appears to be healthy
442
+ func (k * Bootstrapper ) WaitForCluster (k8s config.KubernetesConfig , timeout time.Duration ) error {
443
+ start := time .Now ()
444
+ out .T (out .Waiting , "Waiting for cluster to come online ..." )
445
+ if err := k .waitForAPIServerProcess (start , timeout ); err != nil {
446
+ return err
431
447
}
432
- return false
448
+ if err := k .waitForAPIServerHealthz (start , k8s , timeout ); err != nil {
449
+ return err
450
+ }
451
+ return k .waitForSystemPods (start , k8s , timeout )
433
452
}
434
453
435
454
// RestartCluster restarts the Kubernetes cluster configured by kubeadm
@@ -472,11 +491,15 @@ func (k *Bootstrapper) RestartCluster(k8s config.KubernetesConfig) error {
472
491
}
473
492
}
474
493
475
- if err := k .waitForAPIServer (k8s ); err != nil {
476
- return errors .Wrap (err , "waiting for apiserver" )
494
+ // We must ensure that the apiserver is healthy before proceeding
495
+ if err := k .waitForAPIServerHealthz (time .Now (), k8s , kconst .DefaultControlPlaneTimeout ); err != nil {
496
+ return errors .Wrap (err , "apiserver healthz" )
497
+ }
498
+ if err := k .waitForSystemPods (time .Now (), k8s , kconst .DefaultControlPlaneTimeout ); err != nil {
499
+ return errors .Wrap (err , "system pods" )
477
500
}
478
501
479
- // restart the proxy and coredns
502
+ // Explicitly re-enable kubeadm addons (proxy, coredns) so that they will check for IP or configuration changes.
480
503
if rr , err := k .c .RunCmd (exec .Command ("/bin/bash" , "-c" , fmt .Sprintf ("%s phase addon all --config %s" , baseCmd , yamlConfigPath ))); err != nil {
481
504
return errors .Wrapf (err , fmt .Sprintf ("addon phase cmd:%q" , rr .Command ()))
482
505
}
@@ -487,63 +510,6 @@ func (k *Bootstrapper) RestartCluster(k8s config.KubernetesConfig) error {
487
510
return nil
488
511
}
489
512
490
- // waitForAPIServer waits for the apiserver to start up
491
- func (k * Bootstrapper ) waitForAPIServer (k8s config.KubernetesConfig ) error {
492
- start := time .Now ()
493
- defer func () {
494
- glog .Infof ("duration metric: took %s to wait for apiserver status ..." , time .Since (start ))
495
- }()
496
-
497
- glog .Infof ("Waiting for apiserver process ..." )
498
- // To give a better error message, first check for process existence via ssh
499
- // Needs minutes in case the image isn't cached (such as with v1.10.x)
500
- err := wait .PollImmediate (time .Millisecond * 300 , time .Minute * 3 , func () (bool , error ) {
501
- rr , ierr := k .c .RunCmd (exec .Command ("sudo" , "pgrep" , "kube-apiserver" ))
502
- if ierr != nil {
503
- glog .Warningf ("pgrep apiserver: %v cmd: %s" , ierr , rr .Command ())
504
- return false , nil
505
- }
506
- return true , nil
507
- })
508
- if err != nil {
509
- return fmt .Errorf ("apiserver process never appeared" )
510
- }
511
-
512
- glog .Infof ("Waiting for apiserver to port healthy status ..." )
513
- var client * kubernetes.Clientset
514
- f := func () (bool , error ) {
515
- status , err := k .GetAPIServerStatus (net .ParseIP (k8s .NodeIP ), k8s .NodePort )
516
- glog .Infof ("apiserver status: %s, err: %v" , status , err )
517
- if err != nil {
518
- glog .Warningf ("status: %v" , err )
519
- return false , nil
520
- }
521
- if status != "Running" {
522
- return false , nil
523
- }
524
- // Make sure apiserver pod is retrievable
525
- if client == nil {
526
- // We only want to get the clientset once, because this line takes ~1 second to complete
527
- client , err = k .client (k8s )
528
- if err != nil {
529
- glog .Warningf ("get kubernetes client: %v" , err )
530
- return false , nil
531
- }
532
- }
533
-
534
- _ , err = client .CoreV1 ().Pods ("kube-system" ).Get ("kube-apiserver-minikube" , metav1.GetOptions {})
535
- if err != nil {
536
- return false , nil
537
- }
538
-
539
- return true , nil
540
- // TODO: Check apiserver/kubelet logs for fatal errors so that users don't
541
- // need to wait minutes to find out their flag didn't work.
542
- }
543
- err = wait .PollImmediate (kconst .APICallRetryInterval , 2 * kconst .DefaultControlPlaneTimeout , f )
544
- return err
545
- }
546
-
547
513
// DeleteCluster removes the components that were started earlier
548
514
func (k * Bootstrapper ) DeleteCluster (k8s config.KubernetesConfig ) error {
549
515
version , err := parseKubernetesVersion (k8s .KubernetesVersion )
0 commit comments