Skip to content

Commit 3feeece

Browse files
committed
add initial CSI leader election library
1 parent 5f02458 commit 3feeece

File tree

2 files changed

+243
-0
lines changed

2 files changed

+243
-0
lines changed

Diff for: leaderelection/leader_election.go

+187
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"os"
23+
"regexp"
24+
"time"
25+
26+
"k8s.io/api/core/v1"
27+
"k8s.io/client-go/kubernetes"
28+
"k8s.io/client-go/kubernetes/scheme"
29+
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
30+
"k8s.io/client-go/tools/leaderelection"
31+
"k8s.io/client-go/tools/leaderelection/resourcelock"
32+
"k8s.io/client-go/tools/record"
33+
"k8s.io/klog"
34+
)
35+
36+
const (
37+
defaultLeaseDuration = 15 * time.Second
38+
defaultRenewDeadline = 10 * time.Second
39+
defaultRetryPeriod = 5 * time.Second
40+
)
41+
42+
// leaderElection is a convenience wrapper around client-go's leader election library.
43+
type leaderElection struct {
44+
runFunc func(ctx context.Context)
45+
46+
// the lockName identifies the leader election config and should be shared across all members
47+
lockName string
48+
// the identity is the unique identity of the currently running member
49+
identity string
50+
// the namespace to store the lock resource
51+
namespace string
52+
// resourceLock defines the type of leaderelection that should be used
53+
// valid options are resourcelock.LeasesResourceLock, resourcelock.EndpointsResourceLock,
54+
// and resourcelock.ConfigMapsResourceLock
55+
resourceLock string
56+
57+
leaseDuration time.Duration
58+
renewDeadline time.Duration
59+
retryPeriod time.Duration
60+
61+
clientset kubernetes.Interface
62+
}
63+
64+
// NewLeaderElection returns the default & preferred leader election type
65+
func NewLeaderElection(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
66+
return NewLeaderElectionWithLeases(clientset, lockName, lockNamespace, runFunc)
67+
}
68+
69+
// NewLeaderElectionWithLeases returns an implementation of leader election using Leases
70+
func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
71+
return &leaderElection{
72+
runFunc: runFunc,
73+
lockName: lockName,
74+
namespace: lockNamespace,
75+
resourceLock: resourcelock.LeasesResourceLock,
76+
leaseDuration: defaultLeaseDuration,
77+
renewDeadline: defaultRenewDeadline,
78+
retryPeriod: defaultRetryPeriod,
79+
clientset: clientset,
80+
}
81+
}
82+
83+
// NewLeaderElectionWithEndpoints returns an implementation of leader election using Endpoints
84+
func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
85+
return &leaderElection{
86+
runFunc: runFunc,
87+
lockName: lockName,
88+
namespace: lockNamespace,
89+
resourceLock: resourcelock.EndpointsResourceLock,
90+
leaseDuration: defaultLeaseDuration,
91+
renewDeadline: defaultRenewDeadline,
92+
retryPeriod: defaultRetryPeriod,
93+
clientset: clientset,
94+
}
95+
}
96+
97+
// NewLeaderElectionWithConfigMaps returns an implementation of leader election using ConfigMaps
98+
func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName, lockNamespace string, runFunc func(ctx context.Context)) *leaderElection {
99+
return &leaderElection{
100+
runFunc: runFunc,
101+
lockName: lockName,
102+
namespace: lockNamespace,
103+
resourceLock: resourcelock.ConfigMapsResourceLock,
104+
leaseDuration: defaultLeaseDuration,
105+
renewDeadline: defaultRenewDeadline,
106+
retryPeriod: defaultRetryPeriod,
107+
clientset: clientset,
108+
}
109+
}
110+
111+
func (l *leaderElection) WithIdentity(identity string) {
112+
l.identity = identity
113+
}
114+
115+
func (l *leaderElection) WithLeaseDuration(leaseDuration time.Duration) {
116+
l.leaseDuration = leaseDuration
117+
}
118+
119+
func (l *leaderElection) WithRenewDeadline(renewDeadline time.Duration) {
120+
l.renewDeadline = renewDeadline
121+
}
122+
123+
func (l *leaderElection) WithRetryPeriod(retryPeriod time.Duration) {
124+
l.retryPeriod = retryPeriod
125+
}
126+
127+
func (l *leaderElection) Run() error {
128+
if l.identity == "" {
129+
id, err := defaultLeaderElectionIdentity()
130+
if err != nil {
131+
return fmt.Errorf("error getting the default leader identity: %v", err)
132+
}
133+
134+
l.identity = id
135+
}
136+
137+
broadcaster := record.NewBroadcaster()
138+
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: l.clientset.CoreV1().Events(l.namespace)})
139+
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("%s/%s", l.lockName, string(l.identity))})
140+
141+
rlConfig := resourcelock.ResourceLockConfig{
142+
Identity: sanitizeName(l.identity),
143+
EventRecorder: eventRecorder,
144+
}
145+
146+
lock, err := resourcelock.New(l.resourceLock, l.namespace, sanitizeName(l.lockName), l.clientset.CoreV1(), l.clientset.CoordinationV1(), rlConfig)
147+
if err != nil {
148+
return err
149+
}
150+
151+
leaderConfig := leaderelection.LeaderElectionConfig{
152+
Lock: lock,
153+
LeaseDuration: l.leaseDuration,
154+
RenewDeadline: l.renewDeadline,
155+
RetryPeriod: l.retryPeriod,
156+
Callbacks: leaderelection.LeaderCallbacks{
157+
OnStartedLeading: func(ctx context.Context) {
158+
klog.V(2).Info("became leader, starting")
159+
l.runFunc(ctx)
160+
},
161+
OnStoppedLeading: func() {
162+
klog.Fatal("stopped leading")
163+
},
164+
OnNewLeader: func(identity string) {
165+
klog.V(3).Infof("new leader detected, current leader: %s", identity)
166+
},
167+
},
168+
}
169+
170+
leaderelection.RunOrDie(context.TODO(), leaderConfig)
171+
return nil // should never reach here
172+
}
173+
174+
func defaultLeaderElectionIdentity() (string, error) {
175+
return os.Hostname()
176+
}
177+
178+
// sanitizeName sanitizes the provided string so it can be consumed by leader election library
179+
func sanitizeName(name string) string {
180+
re := regexp.MustCompile("[^a-zA-Z0-9-]")
181+
name = re.ReplaceAllString(name, "-")
182+
if name[len(name)-1] == '-' {
183+
// name must not end with '-'
184+
name = name + "X"
185+
}
186+
return name
187+
}

Diff for: leaderelection/leader_election_test.go

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package leaderelection
18+
19+
import (
20+
"testing"
21+
)
22+
23+
func Test_sanitizeName(t *testing.T) {
24+
tests := []struct {
25+
name string
26+
input string
27+
output string
28+
}{
29+
{
30+
"requires no change",
31+
"test-driver",
32+
"test-driver",
33+
},
34+
{
35+
"has characters that should be replaced",
36+
"test!driver/foo",
37+
"test-driver-foo",
38+
},
39+
{
40+
"has trailing space",
41+
"driver\\",
42+
"driver-X",
43+
},
44+
}
45+
46+
for _, test := range tests {
47+
t.Run(test.name, func(t *testing.T) {
48+
output := sanitizeName(test.input)
49+
if output != test.output {
50+
t.Logf("expected name: %q", test.output)
51+
t.Logf("actual name: %q", output)
52+
t.Errorf("unexpected santized name")
53+
}
54+
})
55+
}
56+
}

0 commit comments

Comments
 (0)