Skip to content

Commit f1d47ed

Browse files
authored
Add leaderelection module (#347)
* Add leaderelection module. Add leaderelection module, based off of the leaderelection module in kubernetes-client/python. The module has been altered slightly to support asyncio. Fixes #297
* Add leaselock. Add leaselock, derived from [1]. [1]: https://github.com/kubernetes-client/python/pull/2314/files
* Switch callbacks to coroutines. Instead of passing Callable[...,Coroutine] functions as callbacks, pass Coroutines directly. This allows the caller to supply arbitrary context associated with the Coroutine, such as the ApiClient used to establish the leader election.
* Logging fixes
* Remove misplaced basicConfig
* Use %-style string interpolation
* Revise log levels
* Accept both coroutines and coroutine functions
1 parent 3ab6408 commit f1d47ed

File tree

9 files changed

+1070
-0
lines changed

9 files changed

+1070
-0
lines changed

examples/leaderelection.py

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import logging
17+
import os
18+
import uuid
19+
20+
from kubernetes_asyncio import config
21+
from kubernetes_asyncio.client import api_client
22+
from kubernetes_asyncio.leaderelection import electionconfig, leaderelection
23+
from kubernetes_asyncio.leaderelection.resourcelock.leaselock import LeaseLock
24+
25+
# Configure the root logger at INFO level for this example script.
logging.basicConfig(level=logging.INFO)
26+
27+
28+
async def main():
    """Run one round of leader election using a Lease-based lock."""
    # Authenticate using the kubeconfig file named by $KUBECONFIG
    kubeconfig_path = os.environ.get("KUBECONFIG", "")
    await config.load_kube_config(config_file=kubeconfig_path)

    # --- Parameters required from the user ---

    # A unique identifier for this candidate
    candidate_id = uuid.uuid4()

    # Name and Kubernetes namespace of the lock object to be created
    lock_name = "examplepython"
    lock_namespace = "default"

    # Timing settings handed to the election config below
    timings = {
        "lease_duration": 17,
        "renew_deadline": 15,
        "retry_period": 5,
    }

    # Runs once this candidate has been elected leader. The election runs it
    # as a task and cancels that task when the held leader lock is lost.
    async def example_start_func():
        try:
            print("I am leader")
        except asyncio.CancelledError:
            print(
                "Start function cancelled - lost leader election after becoming leader"
            )

    # Runs after the lease could not be renewed. This callback is optional:
    # onstopped_leading defaults to None in electionconfig.Config.
    async def example_end_func():
        print("I am no longer leader")

    async with api_client.ApiClient() as apic:
        # A legacy ConfigMapLock is also available
        lock = LeaseLock(lock_name, lock_namespace, candidate_id, apic)

        # Coroutines are also accepted in place of coroutine functions, to
        # facilitate providing context (e.g. passing apic)
        leader_election_config = electionconfig.Config(
            lock,
            onstarted_leading=example_start_func,
            onstopped_leading=example_end_func,
            **timings,
        )

        # Enter leader election
        election = leaderelection.LeaderElection(leader_election_config)
        await election.run()

        # User can choose to do another round of election or simply exit
        print("Exited leader election")
78+
79+
80+
# Script entry point: run the example coroutine on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())

kubernetes_asyncio/leaderelection/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from collections.abc import Callable, Coroutine # noqa:F401
16+
17+
18+
class Config:
    """Validated settings for a leader-election candidate.

    The constructor validates its arguments and raises ValueError as soon as
    one of them is rejected.

    onstarted_leading and onstopped_leading accept either coroutines or
    coroutine functions. Coroutines facilitate passing context, but coroutine
    functions can be simpler when passing context is not required.

    One example of when passing context is helpful is sharing the ApiClient
    used by the leader election, which can then be used for subsequent
    Kubernetes API operations upon onstarted_leading or onstopped_leading.

    Attributes set on success: jitter_factor, lock, lease_duration,
    renew_deadline, retry_period, onstarted_leading, onstopped_leading.
    """

    def __init__(
        self,
        lock,
        lease_duration,
        renew_deadline,
        retry_period,
        onstarted_leading,  # type: Coroutine | Callable[[], Coroutine]
        onstopped_leading=None,  # type: Coroutine | Callable[[], Coroutine] | None
    ):
        """Validate and store the election settings.

        Args:
            lock: The resource lock object; must not be None.
            lease_duration: Must exceed renew_deadline and be at least 1.
            renew_deadline: Must exceed retry_period * jitter_factor and be
                at least 1.
            retry_period: Must be at least 1.
            onstarted_leading: Coroutine or coroutine function; required.
            onstopped_leading: Optional coroutine or coroutine function.

        Raises:
            ValueError: If any of the constraints above is violated.
        """
        self.jitter_factor = 1.2

        if lock is None:
            raise ValueError("lock cannot be None")
        self.lock = lock

        # Relative ordering is checked before the absolute lower bounds.
        if lease_duration <= renew_deadline:
            raise ValueError("lease_duration must be greater than renew_deadline")

        if renew_deadline <= self.jitter_factor * retry_period:
            raise ValueError(
                "renew_deadline must be greater than retry_period*jitter_factor"
            )

        # A value of exactly 1 is accepted, so the messages say "at least one".
        if lease_duration < 1:
            raise ValueError("lease_duration must be at least one")

        if renew_deadline < 1:
            raise ValueError("renew_deadline must be at least one")

        if retry_period < 1:
            raise ValueError("retry_period must be at least one")

        self.lease_duration = lease_duration
        self.renew_deadline = renew_deadline
        self.retry_period = retry_period

        if onstarted_leading is None:
            raise ValueError("callback onstarted_leading cannot be None")
        self.onstarted_leading = onstarted_leading

        self.onstopped_leading = onstopped_leading
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
# Copyright 2021 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import datetime
17+
import inspect
18+
import json
19+
import logging
20+
import sys
21+
import time
22+
from http import HTTPStatus
23+
24+
from .leaderelectionrecord import LeaderElectionRecord
25+
26+
"""
27+
This package implements leader election using an annotation in a Kubernetes
28+
object. The onstarted_leading coroutine is run as a task, which is cancelled if
29+
the leader lock is obtained and then lost.
30+
31+
At first all candidates are considered followers. The one to create a lock or
32+
update an existing lock first becomes the leader and remains so until it fails
33+
to renew its lease.
34+
"""
35+
36+
37+
class LeaderElection:
    """Drive one candidate through acquiring and renewing a leader lock.

    ``run`` polls the lock until this candidate holds it, starts the
    configured ``onstarted_leading`` coroutine as a task, keeps renewing the
    lease, and cancels that task once renewal stops.
    """

    def __init__(self, election_config):
        """Store the election configuration and reset the observed state.

        Calls ``sys.exit`` (raising SystemExit) when ``election_config`` is
        None.
        """
        if election_config is None:
            sys.exit("argument config not passed")

        # Latest record observed in the created lock object
        self.observed_record = None

        # The configuration set for this candidate
        self.election_config = election_config

        # Latest update time of the lock
        self.observed_time_milliseconds = 0

    # Point of entry to Leader election
    async def run(self):
        """Acquire the lock, lead until the lease cannot be renewed, then stop.

        The ``onstarted_leading`` task is cancelled when the renew loop ends,
        whether it returned normally, raised, or was itself cancelled. The
        ``onstopped_leading`` callback, if configured, is awaited only after
        the renew loop returns normally.
        """
        # Try to create/ acquire a lock
        if await self.acquire():
            logging.info(
                "%s successfully acquired lease", self.election_config.lock.identity
            )

            onstarted_leading_coroutine = (
                self.election_config.onstarted_leading
                if inspect.iscoroutine(self.election_config.onstarted_leading)
                else self.election_config.onstarted_leading()
            )

            task = asyncio.create_task(onstarted_leading_coroutine)

            try:
                await self.renew_loop()
            finally:
                # Leader lock lost (or renewal aborted by an error or by
                # cancellation of run itself) - cancel the onstarted_leading
                # coroutine if it's still running. This permits
                # onstarted_leading to clean up state that might not be
                # accessible to onstopped_leading, and avoids leaving the
                # task running while the lease is no longer being renewed.
                task.cancel()

            # Failed to update lease, run onstopped_leading callback. This is
            # preserved in order to continue to provide an interface similar to
            # the one provided by `kubernetes-client/python`.
            if self.election_config.onstopped_leading is not None:
                await (
                    self.election_config.onstopped_leading
                    if inspect.iscoroutine(self.election_config.onstopped_leading)
                    else self.election_config.onstopped_leading()
                )

    async def acquire(self):
        """Retry every ``retry_period`` until the lock is obtained; return True."""
        # Follower
        logging.debug("%s is a follower", self.election_config.lock.identity)
        retry_period = self.election_config.retry_period

        while True:
            succeeded = await self.try_acquire_or_renew()

            if succeeded:
                return True

            await asyncio.sleep(retry_period)

    async def renew_loop(self):
        """Keep renewing the lease; return once a renewal round fails.

        Each round retries until it succeeds or ``renew_deadline`` has
        elapsed. The deadline is measured on the monotonic clock so that
        system clock adjustments cannot shorten or extend it.
        """
        # Leader
        logging.debug(
            "Leader has entered renew loop and will try to update lease continuously"
        )

        retry_period = self.election_config.retry_period
        renew_deadline = self.election_config.renew_deadline

        while True:
            deadline = time.monotonic() + renew_deadline
            succeeded = False

            while time.monotonic() < deadline:
                succeeded = await self.try_acquire_or_renew()

                if succeeded:
                    break
                await asyncio.sleep(retry_period)

            if not succeeded:
                # failed to renew, return
                return

            await asyncio.sleep(retry_period)

    @staticmethod
    def _error_code(error):
        """Return the "code" field of an error's JSON body, or None if unreadable."""
        try:
            return json.loads(error.body)["code"]
        except (TypeError, ValueError, KeyError):
            return None

    async def try_acquire_or_renew(self):
        """Make one attempt to create, take over, or renew the lock.

        Returns:
            True if this candidate created or updated the lock, False if the
            lock could not be read, created or updated, or if it is held by
            another candidate whose lease has not yet expired.
        """
        now_timestamp = time.time()
        now = datetime.datetime.fromtimestamp(now_timestamp)

        # Check if lock is created
        lock_status, old_election_record = await self.election_config.lock.get(
            self.election_config.lock.name, self.election_config.lock.namespace
        )

        # create a default Election record for this candidate
        leader_election_record = LeaderElectionRecord(
            self.election_config.lock.identity,
            str(self.election_config.lease_duration),
            str(now),
            str(now),
        )

        # A lock is not created with that name, try to create one
        if not lock_status:
            # On failure the second value returned by lock.get is the error.
            # An error body that is missing, not JSON, or lacks "code" is
            # treated like any other retrieval failure instead of raising.
            if self._error_code(old_election_record) != HTTPStatus.NOT_FOUND:
                logging.error(
                    "Error retrieving resource lock %s as %s",
                    self.election_config.lock.name,
                    old_election_record.reason,
                )
                return False

            logging.debug(
                "%s is trying to create a lock",
                leader_election_record.holder_identity,
            )
            create_status = await self.election_config.lock.create(
                name=self.election_config.lock.name,
                namespace=self.election_config.lock.namespace,
                election_record=leader_election_record,
            )

            if not create_status:
                logging.error(
                    "%s failed to create lock", leader_election_record.holder_identity
                )
                return False

            self.observed_record = leader_election_record
            self.observed_time_milliseconds = int(time.time() * 1000)
            return True

        # A lock exists with that name
        # Validate old_election_record
        if old_election_record is None:
            # try to update lock with proper election record
            return await self.update_lock(leader_election_record)

        if (
            old_election_record.holder_identity is None
            or old_election_record.lease_duration is None
            or old_election_record.acquire_time is None
            or old_election_record.renew_time is None
        ):
            # try to update lock with proper election record
            return await self.update_lock(leader_election_record)

        # Report transitions
        if (
            self.observed_record
            and self.observed_record.holder_identity
            != old_election_record.holder_identity
        ):
            logging.debug(
                "Leader has switched to %s", old_election_record.holder_identity
            )

        # Remember when the stored record was last seen to change; lease
        # expiry below is measured from this local observation time.
        if (
            self.observed_record is None
            or old_election_record.__dict__ != self.observed_record.__dict__
        ):
            self.observed_record = old_election_record
            self.observed_time_milliseconds = int(time.time() * 1000)

        # If This candidate is not the leader and lease duration is yet to finish
        if (
            self.election_config.lock.identity != self.observed_record.holder_identity
            and self.observed_time_milliseconds
            + self.election_config.lease_duration * 1000
            > int(now_timestamp * 1000)
        ):
            logging.debug(
                "Yet to finish lease_duration, lease held by %s and has not expired",
                old_election_record.holder_identity,
            )
            return False

        # If this candidate is the Leader
        if self.election_config.lock.identity == self.observed_record.holder_identity:
            # Leader updates renewTime, but keeps acquire_time unchanged
            leader_election_record.acquire_time = self.observed_record.acquire_time

        return await self.update_lock(leader_election_record)

    async def update_lock(self, leader_election_record):
        """Write ``leader_election_record`` to the lock object.

        Returns:
            True if the update succeeded (the record is then stored as the
            observed record), False otherwise.
        """
        # Update object with latest election record
        update_status = await self.election_config.lock.update(
            self.election_config.lock.name,
            self.election_config.lock.namespace,
            leader_election_record,
        )

        if not update_status:
            logging.warning(
                "%s failed to acquire lease", leader_election_record.holder_identity
            )
            return False

        self.observed_record = leader_election_record
        self.observed_time_milliseconds = int(time.time() * 1000)
        logging.debug(
            "Leader %s has successfully updated lease",
            leader_election_record.holder_identity,
        )
        return True

0 commit comments

Comments
 (0)