Skip to content

Commit eb8c46c

Browse files
committed
add initial CSI leader election library
1 parent 690c21a commit eb8c46c

File tree

2 files changed

+205
-0
lines changed

2 files changed

+205
-0
lines changed

Diff for: leaderelection/leader_election.go

+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"os"
23+
"regexp"
24+
"time"
25+
26+
"k8s.io/api/core/v1"
27+
"k8s.io/client-go/kubernetes"
28+
"k8s.io/client-go/kubernetes/scheme"
29+
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
30+
"k8s.io/client-go/tools/leaderelection"
31+
"k8s.io/client-go/tools/leaderelection/resourcelock"
32+
"k8s.io/client-go/tools/record"
33+
"k8s.io/klog"
34+
)
35+
36+
const (
37+
defaultLeaseDuration = 15 * time.Second
38+
defaultRenewDeadline = 10 * time.Second
39+
defaultRetryPeriod = 5 * time.Second
40+
)
41+
42+
// leaderElection is a convenience wrapper around client-go's leader election library.
43+
type leaderElection struct {
44+
runFunc func(ctx context.Context)
45+
46+
// the lockName identifies the leader election config and should be shared across all members
47+
lockName string
48+
// the identity is the unique identity of the currently running member
49+
identity string
50+
// the namespace to store the lock resource
51+
namespace string
52+
53+
leaseDuration time.Duration
54+
renewDeadline time.Duration
55+
retryPeriod time.Duration
56+
57+
clientset kubernetes.Interface
58+
}
59+
60+
// NewLeaderElection returns the default & preferred leader election type
61+
func NewLeaderElection(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
62+
return &leaderElection{
63+
runFunc: runFunc,
64+
lockName: lockName,
65+
namespace: lockNamespace,
66+
leaseDuration: defaultLeaseDuration,
67+
renewDeadline: defaultRenewDeadline,
68+
retryPeriod: defaultRetryPeriod,
69+
clientset: clientset,
70+
}
71+
}
72+
73+
func (l *leaderElection) WithIdentity(identity string) {
74+
l.identity = identity
75+
}
76+
77+
func (l *leaderElection) WithLeaseDuration(leaseDuration time.Duration) {
78+
l.leaseDuration = leaseDuration
79+
}
80+
81+
func (l *leaderElection) WithRenewDeadline(renewDeadline time.Duration) {
82+
l.renewDeadline = renewDeadline
83+
}
84+
85+
func (l *leaderElection) WithRetryPeriod(retryPeriod time.Duration) {
86+
l.retryPeriod = retryPeriod
87+
}
88+
89+
func (l *leaderElection) Run() error {
90+
if l.identity == "" {
91+
id, err := defaultLeaderElectionIdentity()
92+
if err != nil {
93+
return fmt.Errorf("error getting the default leader identity: %v", err)
94+
}
95+
96+
l.identity = id
97+
}
98+
99+
broadcaster := record.NewBroadcaster()
100+
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)})
101+
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))})
102+
103+
rlConfig := resourcelock.ResourceLockConfig{
104+
Identity: sanitizeName(l.identity),
105+
EventRecorder: eventRecorder,
106+
}
107+
108+
lock, err := resourcelock.New(resourcelock.LeasesResourceLock, l.namespace, sanitizeName(l.lockName), l.clientset.CoreV1(), l.clientset.CoordinationV1(), rlConfig)
109+
if err != nil {
110+
return err
111+
}
112+
113+
leaderConfig := leaderelection.LeaderElectionConfig{
114+
Lock: lock,
115+
LeaseDuration: l.leaseDuration,
116+
RenewDeadline: l.renewDeadline,
117+
RetryPeriod: l.retryPeriod,
118+
Callbacks: leaderelection.LeaderCallbacks{
119+
OnStartedLeading: func(ctx context.Context) {
120+
klog.V(2).Info("became leader, starting")
121+
l.runFunc(ctx)
122+
},
123+
OnStoppedLeading: func() {
124+
klog.Fatal("stopped leading")
125+
},
126+
OnNewLeader: func(identity string) {
127+
klog.V(3).Infof("new leader detected, current leader: %s", identity)
128+
},
129+
},
130+
}
131+
132+
leaderelection.RunOrDie(context.TODO(), leaderConfig)
133+
return nil // should never reach here
134+
}
135+
136+
func defaultLeaderElectionIdentity() (string, error) {
137+
return os.Hostname()
138+
}
139+
140+
// sanitizeName sanitizes the provided string so it can be consumed by leader election library
141+
func sanitizeName(name string) string {
142+
re := regexp.MustCompile("[^a-zA-Z0-9-]")
143+
name = re.ReplaceAllString(name, "-")
144+
if name[len(name)-1] == '-' {
145+
// name must not end with '-'
146+
name = name + "X"
147+
}
148+
return name
149+
}

Diff for: leaderelection/leader_election_test.go

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"testing"
21+
)
22+
23+
func Test_sanitizeName(t *testing.T) {
24+
tests := []struct {
25+
name string
26+
input string
27+
output string
28+
}{
29+
{
30+
"requires no change",
31+
"test-driver",
32+
"test-driver",
33+
},
34+
{
35+
"has characters that should be replaced",
36+
"test!driver/foo",
37+
"test-driver-foo",
38+
},
39+
{
40+
"has trailing space",
41+
"driver\\",
42+
"driver-X",
43+
},
44+
}
45+
46+
for _, test := range tests {
47+
t.Run(test.name, func(t *testing.T) {
48+
output := sanitizeName(test.input)
49+
if output != test.output {
50+
t.Logf("expected name: %q", test.output)
51+
t.Logf("actual name: %q", output)
52+
t.Errorf("unexpected santized name")
53+
}
54+
})
55+
}
56+
}

0 commit comments

Comments
 (0)