Skip to content

Commit d364a6f

Browse files
authored
Merge pull request kubernetes-sigs#118 from pusher/leaderelection
Add leader election to controller manager
2 parents 9f3a942 + a58fb18 commit d364a6f

File tree

13 files changed

+1045
-75
lines changed

13 files changed

+1045
-75
lines changed

Gopkg.lock

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/leaderelection/doc.go

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
/*
18+
Package leaderelection contains a constructors for a leader election resource lock
19+
*/
20+
package leaderelection

pkg/leaderelection/fake/doc.go

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
/*
18+
Package fake mocks a resource lock for testing purposes.
19+
Always returns leadership.
20+
*/
21+
package fake
+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package fake
18+
19+
import (
20+
"os"
21+
"time"
22+
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/apimachinery/pkg/util/uuid"
25+
"k8s.io/client-go/rest"
26+
"k8s.io/client-go/tools/leaderelection/resourcelock"
27+
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
28+
"sigs.k8s.io/controller-runtime/pkg/recorder"
29+
)
30+
31+
// NewResourceLock creates a new ResourceLock for use in testing
32+
// leader election.
33+
func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
34+
// Leader id, needs to be unique
35+
id, err := os.Hostname()
36+
if err != nil {
37+
return nil, err
38+
}
39+
id = id + "_" + string(uuid.NewUUID())
40+
41+
return &ResourceLock{
42+
id: id,
43+
record: resourcelock.LeaderElectionRecord{
44+
HolderIdentity: id,
45+
LeaseDurationSeconds: 15,
46+
AcquireTime: metav1.NewTime(time.Now()),
47+
RenewTime: metav1.NewTime(time.Now().Add(15 * time.Second)),
48+
LeaderTransitions: 1,
49+
},
50+
}, nil
51+
}
52+
53+
// ResourceLock implements the ResourceLockInterface.
54+
// By default returns that the current identity holds the lock.
55+
type ResourceLock struct {
56+
id string
57+
record resourcelock.LeaderElectionRecord
58+
}
59+
60+
// Get implements the ResourceLockInterface.
61+
func (f *ResourceLock) Get() (*resourcelock.LeaderElectionRecord, error) {
62+
return &f.record, nil
63+
}
64+
65+
// Create implements the ResourceLockInterface.
66+
func (f *ResourceLock) Create(ler resourcelock.LeaderElectionRecord) error {
67+
f.record = ler
68+
return nil
69+
}
70+
71+
// Update implements the ResourceLockInterface.
72+
func (f *ResourceLock) Update(ler resourcelock.LeaderElectionRecord) error {
73+
f.record = ler
74+
return nil
75+
}
76+
77+
// RecordEvent implements the ResourceLockInterface.
78+
func (f *ResourceLock) RecordEvent(s string) {
79+
return
80+
}
81+
82+
// Identity implements the ResourceLockInterface.
83+
func (f *ResourceLock) Identity() string {
84+
return f.id
85+
}
86+
87+
// Describe implements the ResourceLockInterface.
88+
func (f *ResourceLock) Describe() string {
89+
return f.id
90+
}

pkg/leaderelection/leader_election.go

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"fmt"
21+
"io/ioutil"
22+
"os"
23+
24+
"k8s.io/apimachinery/pkg/util/uuid"
25+
"k8s.io/client-go/kubernetes"
26+
"k8s.io/client-go/rest"
27+
"k8s.io/client-go/tools/leaderelection/resourcelock"
28+
"sigs.k8s.io/controller-runtime/pkg/recorder"
29+
)
30+
31+
const inClusterNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
32+
33+
// Options provides the required configuration to create a new resource lock
34+
type Options struct {
35+
// LeaderElection determines whether or not to use leader election when
36+
// starting the manager.
37+
LeaderElection bool
38+
39+
// LeaderElectionNamespace determines the namespace in which the leader
40+
// election configmap will be created.
41+
LeaderElectionNamespace string
42+
43+
// LeaderElectionID determines the name of the configmap that leader election
44+
// will use for holding the leader lock.
45+
LeaderElectionID string
46+
}
47+
48+
// NewResourceLock creates a new config map resource lock for use in a leader
49+
// election loop
50+
func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, options Options) (resourcelock.Interface, error) {
51+
if !options.LeaderElection {
52+
return nil, nil
53+
}
54+
55+
// Default the LeaderElectionID
56+
if options.LeaderElectionID == "" {
57+
options.LeaderElectionID = "controller-leader-election-helper"
58+
}
59+
60+
// Default the namespace (if running in cluster)
61+
if options.LeaderElectionNamespace == "" {
62+
var err error
63+
options.LeaderElectionNamespace, err = getInClusterNamespace()
64+
if err != nil {
65+
return nil, fmt.Errorf("unable to find leader election namespace: %v", err)
66+
}
67+
}
68+
69+
// Leader id, needs to be unique
70+
id, err := os.Hostname()
71+
if err != nil {
72+
return nil, err
73+
}
74+
id = id + "_" + string(uuid.NewUUID())
75+
76+
// Construct client for leader election
77+
client, err := kubernetes.NewForConfig(config)
78+
if err != nil {
79+
return nil, err
80+
}
81+
82+
// TODO(JoelSpeed): switch to leaderelection object in 1.12
83+
return resourcelock.New(resourcelock.ConfigMapsResourceLock,
84+
options.LeaderElectionNamespace,
85+
options.LeaderElectionID,
86+
client.CoreV1(),
87+
resourcelock.ResourceLockConfig{
88+
Identity: id,
89+
EventRecorder: recorderProvider.GetEventRecorderFor(id),
90+
})
91+
}
92+
93+
func getInClusterNamespace() (string, error) {
94+
// Check whether the namespace file exists.
95+
// If not, we are not running in cluster so can't guess the namespace.
96+
_, err := os.Stat(inClusterNamespacePath)
97+
if os.IsNotExist(err) {
98+
return "", fmt.Errorf("not running in-cluster, please specify LeaderElectionNamespace")
99+
} else if err != nil {
100+
return "", fmt.Errorf("error checking namespace file: %v", err)
101+
}
102+
103+
// Load the namespace file and return itss content
104+
namespace, err := ioutil.ReadFile(inClusterNamespacePath)
105+
if err != nil {
106+
return "", fmt.Errorf("error reading namespace file: %v", err)
107+
}
108+
return string(namespace), nil
109+
}

pkg/manager/internal.go

+54-4
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@ limitations under the License.
1717
package manager
1818

1919
import (
20+
"fmt"
2021
"sync"
22+
"time"
2123

2224
"k8s.io/apimachinery/pkg/runtime"
2325
"k8s.io/client-go/rest"
26+
"k8s.io/client-go/tools/leaderelection"
27+
"k8s.io/client-go/tools/leaderelection/resourcelock"
2428
"k8s.io/client-go/tools/record"
2529
"sigs.k8s.io/controller-runtime/pkg/cache"
2630
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -56,6 +60,9 @@ type controllerManager struct {
5660
// (and EventHandlers, Sources and Predicates).
5761
recorderProvider recorder.Provider
5862

63+
// resourceLock
64+
resourceLock resourcelock.Interface
65+
5966
mu sync.Mutex
6067
started bool
6168
errChan chan error
@@ -133,6 +140,52 @@ func (cm *controllerManager) GetRecorder(name string) record.EventRecorder {
133140
}
134141

135142
func (cm *controllerManager) Start(stop <-chan struct{}) error {
143+
if cm.resourceLock == nil {
144+
go cm.start(stop)
145+
select {
146+
case <-stop:
147+
// we are done
148+
return nil
149+
case err := <-cm.errChan:
150+
// Error starting a controller
151+
return err
152+
}
153+
}
154+
155+
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
156+
Lock: cm.resourceLock,
157+
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
158+
// TODO(joelspeed): These timings should be configurable
159+
LeaseDuration: 15 * time.Second,
160+
RenewDeadline: 10 * time.Second,
161+
RetryPeriod: 2 * time.Second,
162+
Callbacks: leaderelection.LeaderCallbacks{
163+
OnStartedLeading: cm.start,
164+
OnStoppedLeading: func() {
165+
// Most implementations of leader election log.Fatal() here.
166+
// Since Start is wrapped in log.Fatal when called, we can just return
167+
// an error here which will cause the program to exit.
168+
cm.errChan <- fmt.Errorf("leader election lost")
169+
},
170+
},
171+
})
172+
if err != nil {
173+
return err
174+
}
175+
176+
go l.Run()
177+
178+
select {
179+
case <-stop:
180+
// We are done
181+
return nil
182+
case err := <-cm.errChan:
183+
// Error starting a controller
184+
return err
185+
}
186+
}
187+
188+
func (cm *controllerManager) start(stop <-chan struct{}) {
136189
func() {
137190
cm.mu.Lock()
138191
defer cm.mu.Unlock()
@@ -169,9 +222,6 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
169222
select {
170223
case <-stop:
171224
// We are done
172-
return nil
173-
case err := <-cm.errChan:
174-
// Error starting a controller
175-
return err
225+
return
176226
}
177227
}

0 commit comments

Comments
 (0)