
Commit f9b8039

Merge pull request #5348 from tstromberg/ncontext
Integration de-flake: expand lock scopes, sync clock at creation
2 parents df9d811 + 1d346a0 commit f9b8039

File tree

7 files changed, +81 −42 lines


pkg/minikube/bootstrapper/certs.go (+19 −14)

@@ -57,8 +57,26 @@ var (
 
 // SetupCerts gets the generated credentials required to talk to the APIServer.
 func SetupCerts(cmd command.Runner, k8s config.KubernetesConfig) error {
+	// WARNING: This function was not designed for multiple profiles, so it is VERY racey:
+	//
+	// It updates a shared certificate file and uploads it to the apiserver before launch.
+	//
+	// If another process updates the shared certificate, it's invalid.
+	// TODO: Instead of racey manipulation of a shared certificate, use per-profile certs
+	spec := mutex.Spec{
+		Name:  "setupCerts",
+		Clock: clock.WallClock,
+		Delay: 15 * time.Second,
+	}
+	glog.Infof("acquiring lock: %+v", spec)
+	releaser, err := mutex.Acquire(spec)
+	if err != nil {
+		return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
+	}
+	defer releaser.Release()
+
 	localPath := constants.GetMinipath()
-	glog.Infof("Setting up certificates for IP: %s\n", k8s.NodeIP)
+	glog.Infof("Setting up %s for IP: %s\n", localPath, k8s.NodeIP)
 
 	if err := generateCerts(k8s); err != nil {
 		return errors.Wrap(err, "Error generating certs")

@@ -126,19 +144,6 @@ func SetupCerts(cmd command.Runner, k8s config.KubernetesConfig) error {
 }
 
 func generateCerts(k8s config.KubernetesConfig) error {
-	// TODO: Instead of racey manipulation of a shared certificate, use per-profile certs
-	spec := mutex.Spec{
-		Name:  "generateCerts",
-		Clock: clock.WallClock,
-		Delay: 10 * time.Second,
-	}
-	glog.Infof("acquiring lock: %+v", spec)
-	releaser, err := mutex.Acquire(spec)
-	if err != nil {
-		return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
-	}
-	defer releaser.Release()
-
 	serviceIP, err := util.GetServiceClusterIP(k8s.ServiceCIDR)
 	if err != nil {
 		return errors.Wrap(err, "getting service cluster ip")
pkg/minikube/bootstrapper/kubeadm/kubeadm.go (+39 −15)

@@ -228,6 +228,12 @@ func (k *Bootstrapper) createCompatSymlinks() error {
 
 // StartCluster starts the cluster
 func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
+	start := time.Now()
+	glog.Infof("StartCluster: %+v", k8s)
+	defer func() {
+		glog.Infof("StartCluster complete in %s", time.Since(start))
+	}()
+
 	version, err := parseKubernetesVersion(k8s.KubernetesVersion)
 	if err != nil {
 		return errors.Wrap(err, "parsing kubernetes version")

@@ -266,7 +272,15 @@ func (k *Bootstrapper) StartCluster(k8s config.KubernetesConfig) error {
 
 	glog.Infof("Configuring cluster permissions ...")
 
-	if err := retry.Expo(elevateKubeSystemPrivileges, time.Millisecond*500, 60*time.Second); err != nil {
+	elevate := func() error {
+		client, err := k.client(k8s)
+		if err != nil {
+			return err
+		}
+		return elevateKubeSystemPrivileges(client)
+	}
+
+	if err := retry.Expo(elevate, time.Millisecond*500, 120*time.Second); err != nil {
 		return errors.Wrap(err, "timed out waiting to elevate kube-system RBAC privileges")
 	}
 

@@ -326,6 +340,23 @@ func addAddons(files *[]assets.CopyableFile, data interface{}) error {
 	return nil
 }
 
+// client returns a Kubernetes client to use to speak to a kubeadm launched apiserver
+func (k *Bootstrapper) client(k8s config.KubernetesConfig) (*kubernetes.Clientset, error) {
+	// Catch case if WaitCluster was called with a stale ~/.kube/config
+	config, err := kapi.ClientConfig(k.contextName)
+	if err != nil {
+		return nil, errors.Wrap(err, "client config")
+	}
+
+	endpoint := fmt.Sprintf("https://%s:%d", k8s.NodeIP, k8s.NodePort)
+	if config.Host != endpoint {
+		glog.Errorf("Overriding stale ClientConfig host %s with %s", config.Host, endpoint)
+		config.Host = endpoint
+	}
+
+	return kubernetes.NewForConfig(config)
+}
+
 // WaitCluster blocks until Kubernetes appears to be healthy.
 func (k *Bootstrapper) WaitCluster(k8s config.KubernetesConfig, timeout time.Duration) error {
 	// Do not wait for "k8s-app" pods in the case of CNI, as they are managed

@@ -341,22 +372,11 @@ func (k *Bootstrapper) WaitCluster(k8s config.KubernetesConfig, timeout time.Dur
 		return errors.Wrap(err, "waiting for apiserver")
 	}
 
-	// Catch case if WaitCluster was called with a stale ~/.kube/config
-	config, err := kapi.ClientConfig(k.contextName)
+	client, err := k.client(k8s)
 	if err != nil {
-		return errors.Wrap(err, "client config")
-	}
-
-	endpoint := fmt.Sprintf("https://%s:%d", k8s.NodeIP, k8s.NodePort)
-	if config.Host != endpoint {
-		glog.Errorf("Overriding stale ClientConfig host %s with %s", config.Host, endpoint)
-		config.Host = endpoint
+		return errors.Wrap(err, "client")
 	}
 
-	client, err := kubernetes.NewForConfig(config)
-	if err != nil {
-		return errors.Wrap(err, "k8s client")
-	}
 	for _, p := range PodsByLayer {
 		if componentsOnly && p.key != "component" { // skip component check if network plugin is cni
 			continue

@@ -458,8 +478,12 @@ func (k *Bootstrapper) waitForAPIServer(k8s config.KubernetesConfig) error {
 			return false, nil
 		}
 		return true, nil
+
+		// TODO: Check apiserver/kubelet logs for fatal errors so that users don't
+		// need to wait minutes to find out their flag didn't work.
+
 	}
-	err = wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, f)
+	err = wait.PollImmediate(kconst.APICallRetryInterval, 2*kconst.DefaultControlPlaneTimeout, f)
 	return err
 }
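
The key de-flake in StartCluster is that the RBAC elevation closure now rebuilds its apiserver client on every retry attempt, so one stale or failed client no longer dooms the remaining attempts, and the overall deadline doubles to 120s. A rough sketch of that closure-per-attempt pattern using minikube's retry.Expo; the fake client and its failure schedule are invented for illustration:

package main

import (
	"fmt"
	"time"

	"k8s.io/minikube/pkg/util/retry"
)

// fakeClient stands in for *kubernetes.Clientset in this sketch.
type fakeClient struct{}

var attempts int

// newClient simulates (*Bootstrapper).client: early in cluster startup it
// fails, then succeeds once the apiserver becomes reachable.
func newClient() (*fakeClient, error) {
	attempts++
	if attempts < 3 {
		return nil, fmt.Errorf("apiserver not reachable yet (attempt %d)", attempts)
	}
	return &fakeClient{}, nil
}

func main() {
	// Rebuilding the client inside the closure means each retry gets a
	// fresh connection instead of reusing one that failed to initialize.
	elevate := func() error {
		client, err := newClient()
		if err != nil {
			return err
		}
		_ = client // a real caller would now create the ClusterRoleBinding
		fmt.Println("RBAC privileges elevated")
		return nil
	}

	// Exponential backoff starting at 500ms, giving up after 120s total.
	if err := retry.Expo(elevate, 500*time.Millisecond, 120*time.Second); err != nil {
		fmt.Println("timed out:", err)
	}
}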

pkg/minikube/bootstrapper/kubeadm/util.go (+3 −10)

@@ -24,8 +24,7 @@ import (
 	"github.com/pkg/errors"
 	rbac "k8s.io/api/rbac/v1beta1"
 	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/minikube/pkg/minikube/constants"
-	"k8s.io/minikube/pkg/minikube/service"
+	"k8s.io/client-go/kubernetes"
 	"k8s.io/minikube/pkg/util/retry"
 )
 

@@ -35,13 +34,8 @@ const (
 
 // elevateKubeSystemPrivileges gives the kube-system service account
 // cluster admin privileges to work with RBAC.
-func elevateKubeSystemPrivileges() error {
+func elevateKubeSystemPrivileges(client kubernetes.Interface) error {
 	start := time.Now()
-	k8s := service.K8s
-	client, err := k8s.GetClientset(constants.DefaultK8sClientTimeout)
-	if err != nil {
-		return errors.Wrap(err, "getting clientset")
-	}
 	clusterRoleBinding := &rbac.ClusterRoleBinding{
 		ObjectMeta: meta.ObjectMeta{
 			Name: rbacName,

@@ -63,8 +57,7 @@ func elevateKubeSystemPrivileges() error {
 		glog.Infof("Role binding %s already exists. Skipping creation.", rbacName)
 		return nil
 	}
-	_, err = client.RbacV1beta1().ClusterRoleBindings().Create(clusterRoleBinding)
-	if err != nil {
+	if _, err := client.RbacV1beta1().ClusterRoleBindings().Create(clusterRoleBinding); err != nil {
 		netErr, ok := err.(net.Error)
 		if ok && netErr.Timeout() {
 			return &retry.RetriableError{Err: errors.Wrap(err, "creating clusterrolebinding")}
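
Accepting kubernetes.Interface instead of constructing a concrete clientset internally also makes the function unit-testable. A sketch of such a test, assuming the fake clientset and the pre-context Create/Get signatures of the client-go release minikube pinned at the time; this test is not part of the commit:

package kubeadm

import (
	"testing"

	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestElevateKubeSystemPrivileges(t *testing.T) {
	// The in-memory fake satisfies kubernetes.Interface, so no live
	// apiserver is required to exercise the RBAC logic.
	client := fake.NewSimpleClientset()
	if err := elevateKubeSystemPrivileges(client); err != nil {
		t.Fatalf("elevateKubeSystemPrivileges: %v", err)
	}
	if _, err := client.RbacV1beta1().ClusterRoleBindings().Get(rbacName, meta.GetOptions{}); err != nil {
		t.Fatalf("expected clusterrolebinding %q: %v", rbacName, err)
	}
}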

pkg/minikube/cluster/cluster.go (+5 −0)

@@ -463,6 +463,11 @@ func createHost(api libmachine.API, config cfg.MachineConfig) (*host.Host, error
 
 	if !localDriver(config.VMDriver) {
 		showRemoteOsRelease(h.Driver)
+		// Ensure that even new VM's have proper time synchronization up front
+		// It's 2019, and I can't believe I am still dealing with time desync as a problem.
+		if err := ensureSyncedGuestClock(h); err != nil {
+			return h, err
+		}
 	} else {
 		showLocalOsRelease()
 	}
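
The body of ensureSyncedGuestClock is not shown in this diff; conceptually it compares the guest clock against the host and resets it over SSH when the drift is too large. A hypothetical sketch under that assumption, using libmachine's RunSSHCommand; the two-second tolerance and the date -s approach are guesses for illustration, not the commit's actual code:

package cluster

import (
	"fmt"
	"math"
	"strconv"
	"strings"
	"time"

	"github.com/docker/machine/libmachine/host"
	"github.com/pkg/errors"
)

// maxClockDesyncSeconds is an assumed tolerance for this sketch.
const maxClockDesyncSeconds = 2.0

// ensureSyncedGuestClock resets the guest clock to host time when the
// observed drift exceeds the tolerance.
func ensureSyncedGuestClock(h *host.Host) error {
	out, err := h.RunSSHCommand("date +%s")
	if err != nil {
		return errors.Wrap(err, "querying guest clock")
	}
	secs, err := strconv.ParseInt(strings.TrimSpace(out), 10, 64)
	if err != nil {
		return errors.Wrapf(err, "parsing guest clock: %q", out)
	}
	drift := time.Since(time.Unix(secs, 0)).Seconds()
	if math.Abs(drift) > maxClockDesyncSeconds {
		// Force the guest to the host's current UNIX time.
		cmd := fmt.Sprintf("sudo date -s @%d", time.Now().Unix())
		if _, err := h.RunSSHCommand(cmd); err != nil {
			return errors.Wrap(err, "setting guest clock")
		}
	}
	return nil
}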

pkg/minikube/cluster/cluster_test.go (+2 −1)

@@ -76,6 +76,7 @@ func TestCreateHost(t *testing.T) {
 	if exists {
 		t.Fatal("Machine already exists.")
 	}
+
 	_, err := createHost(api, defaultMachineConfig)
 	if err != nil {
 		t.Fatalf("Error creating host: %v", err)

@@ -215,7 +216,7 @@ func TestStartHostConfig(t *testing.T) {
 	provision.SetDetector(md)
 
 	config := config.MachineConfig{
-		VMDriver:   constants.DefaultVMDriver,
+		VMDriver:   constants.DriverMock,
 		DockerEnv:  []string{"FOO=BAR"},
 		DockerOpt:  []string{"param=value"},
 		Downloader: MockDownloader{},

pkg/minikube/config/profile.go (+1 −1)

@@ -62,8 +62,8 @@ func CreateProfile(name string, cfg *Config, miniHome ...string) error {
 	if err != nil {
 		return err
 	}
-	glog.Infof("Saving config:\n%s", data)
 	path := profileFilePath(name, miniHome...)
+	glog.Infof("Saving config to %s ...", path)
 	if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
 		return err
 	}

pkg/minikube/kubeconfig/settings.go (+12 −1)

@@ -19,8 +19,11 @@ package kubeconfig
 import (
 	"io/ioutil"
 	"sync/atomic"
+	"time"
 
 	"github.com/golang/glog"
+	"github.com/juju/clock"
+	"github.com/juju/mutex"
 	"github.com/pkg/errors"
 	"k8s.io/client-go/tools/clientcmd/api"
 )

@@ -116,9 +119,17 @@ func PopulateFromSettings(cfg *Settings, apiCfg *api.Config) error {
 // activeContext is true when minikube is the CurrentContext
 // If no CurrentContext is set, the given name will be used.
 func Update(kcs *Settings) error {
-	glog.Infoln("Using kubeconfig: ", kcs.filePath())
+	// Add a lock around both the read, update, and write operations
+	spec := mutex.Spec{Name: "kubeconfigUpdate", Clock: clock.WallClock, Delay: 10 * time.Second}
+	glog.Infof("acquiring lock: %+v", spec)
+	releaser, err := mutex.Acquire(spec)
+	if err != nil {
+		return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
+	}
+	defer releaser.Release()
 
 	// read existing config or create new if does not exist
+	glog.Infoln("Updating kubeconfig: ", kcs.filePath())
 	kcfg, err := readOrNew(kcs.filePath())
 	if err != nil {
 		return err
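
The lock in Update covers the read, the mutation, and the write of the kubeconfig as one unit, which is what stops parallel minikube runs from dropping each other's contexts. A sketch of that locked read-modify-write shape using client-go's clientcmd helpers; updateKubeconfig and the mutate callback are illustrative, not minikube's actual API:

package main

import (
	"time"

	"github.com/golang/glog"
	"github.com/juju/clock"
	"github.com/juju/mutex"
	"github.com/pkg/errors"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/clientcmd/api"
)

// updateKubeconfig is an illustrative stand-in for Update: the read, the
// mutation, and the write all happen while the named lock is held.
func updateKubeconfig(path string, mutate func(*api.Config)) error {
	spec := mutex.Spec{Name: "kubeconfigUpdate", Clock: clock.WallClock, Delay: 10 * time.Second}
	glog.Infof("acquiring lock: %+v", spec)
	releaser, err := mutex.Acquire(spec)
	if err != nil {
		return errors.Wrapf(err, "unable to acquire lock for %+v", spec)
	}
	defer releaser.Release()

	// Read under the lock so we observe the previous writer's changes.
	// (Unlike minikube's readOrNew, LoadFromFile requires the file to exist.)
	cfg, err := clientcmd.LoadFromFile(path)
	if err != nil {
		return errors.Wrap(err, "loading kubeconfig")
	}
	mutate(cfg)
	// Write before releasing so no concurrent update can be lost.
	return clientcmd.WriteToFile(*cfg, path)
}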
