Skip to content

Commit 5357357

Browse files
author
OpenShift Bot
authored
Merge pull request #13082 from ncdc/sync-endpoints-before-leader-election
Merged by openshift-bot
2 parents d757d20 + d960c59 commit 5357357

File tree

1 file changed

+37
-13
lines changed

1 file changed

+37
-13
lines changed

pkg/util/leaderlease/leaderlease.go

+37-13
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ type Leaser interface {
2929
// Etcd takes and holds a leader lease until it can no longer confirm it owns
3030
// the lease, then returns.
3131
type Etcd struct {
32-
client etcdclient.KeysAPI
33-
key string
34-
value string
35-
ttl uint64
32+
client etcdclient.Client
33+
keysClient etcdclient.KeysAPI
34+
key string
35+
value string
36+
ttl uint64
3637

3738
// the fraction of the ttl to wait before trying to renew - for instance, 0.75 with TTL 20
3839
// will wait 15 seconds before attempting to renew the lease, then retry over the next 5
@@ -51,10 +52,11 @@ type Etcd struct {
5152
// client takes it.
5253
func NewEtcd(client etcdclient.Client, key, value string, ttl uint64) Leaser {
5354
return &Etcd{
54-
client: etcdclient.NewKeysAPI(client),
55-
key: key,
56-
value: value,
57-
ttl: ttl,
55+
client: client,
56+
keysClient: etcdclient.NewKeysAPI(client),
57+
key: key,
58+
value: value,
59+
ttl: ttl,
5860

5961
waitFraction: 0.66,
6062
pauseInterval: time.Second,
@@ -63,8 +65,30 @@ func NewEtcd(client etcdclient.Client, key, value string, ttl uint64) Leaser {
6365
}
6466
}
6567

68+
const autoSyncInterval = 10 * time.Second // interval handed to e.client.AutoSync by AcquireAndHold
69+
6670
// AcquireAndHold implements an acquire and release of a lease.
6771
func (e *Etcd) AcquireAndHold(notify chan error) {
72+
ctx, cancel := context.WithCancel(context.Background())
73+
defer cancel()
74+
75+
go func() {
76+
// Because the call to e.keysClient.Set in tryAcquire is using PrevNoExist, etcd considers this
77+
// to be a "one-shot" attempt, meaning that if the connection attempt to one of the etcd cluster
78+
// members fails, it will not fail over to any of the other cluster members. By contrast,
79+
// e.client.AutoSync is not a one-shot call: it will try to contact each cluster member
80+
// until it succeeds. Assuming it does, the client's list of endpoints is updated, and any
81+
// unavailable members are removed from the list.
82+
for {
83+
err := e.client.AutoSync(ctx, autoSyncInterval)
84+
if err == context.DeadlineExceeded || err == context.Canceled {
85+
break
86+
}
87+
utilruntime.HandleError(err)
88+
time.Sleep(e.pauseInterval)
89+
}
90+
}()
91+
6892
for {
6993
ok, ttl, index, err := e.tryAcquire()
7094
if err != nil {
@@ -96,7 +120,7 @@ func (e *Etcd) AcquireAndHold(notify chan error) {
96120
func (e *Etcd) tryAcquire() (ok bool, ttl uint64, nextIndex uint64, err error) {
97121
ttl = e.ttl
98122

99-
resp, err := e.client.Set(
123+
resp, err := e.keysClient.Set(
100124
context.Background(),
101125
e.key,
102126
e.value,
@@ -116,7 +140,7 @@ func (e *Etcd) tryAcquire() (ok bool, ttl uint64, nextIndex uint64, err error) {
116140
return false, 0, 0, fmt.Errorf("unable to check lease %s: %v", e.key, err)
117141
}
118142

119-
latest, err := e.client.Get(context.Background(), e.key, nil)
143+
latest, err := e.keysClient.Get(context.Background(), e.key, nil)
120144
if err != nil {
121145
return false, 0, 0, fmt.Errorf("unable to retrieve lease %s: %v", e.key, err)
122146
}
@@ -144,7 +168,7 @@ func (e *Etcd) tryAcquire() (ok bool, ttl uint64, nextIndex uint64, err error) {
144168
// Release tries to delete the leader lock.
145169
func (e *Etcd) Release() {
146170
for i := 0; i < e.maxRetries; i++ {
147-
_, err := e.client.Delete(context.Background(), e.key, &etcdclient.DeleteOptions{PrevValue: e.value})
171+
_, err := e.keysClient.Delete(context.Background(), e.key, &etcdclient.DeleteOptions{PrevValue: e.value})
148172
if err == nil {
149173
break
150174
}
@@ -197,7 +221,7 @@ func (e *Etcd) tryHold(ttl, index uint64) error {
197221
case <-time.After(after):
198222
err := wait.Poll(interval, last, func() (bool, error) {
199223
glog.V(4).Infof("Renewing lease %s at %d", e.key, index-1)
200-
resp, err := e.client.Set(context.Background(), e.key, e.value,
224+
resp, err := e.keysClient.Set(context.Background(), e.key, e.value,
201225
&etcdclient.SetOptions{
202226
TTL: time.Duration(e.ttl) * time.Second,
203227
PrevValue: e.value,
@@ -264,7 +288,7 @@ func (e *Etcd) waitExpiration(held bool, from uint64, stop chan struct{}) (bool,
264288
default:
265289
}
266290
glog.V(5).Infof("watching for expiration of lease %s from %d", e.key, from)
267-
w := e.client.Watcher(e.key, &etcdclient.WatcherOptions{AfterIndex: from - 1})
291+
w := e.keysClient.Watcher(e.key, &etcdclient.WatcherOptions{AfterIndex: from - 1})
268292
resp, err := w.Next(context.Background())
269293
if err != nil {
270294
return false, etcdIndexFor(err, from), err

0 commit comments

Comments
 (0)