Skip to content

Commit d010f2e

Browse files
authored
Merge pull request kubernetes-client#2317 from anvil-works/allow_watch_bookmarks
Add support for allowWatchBookmarks to the dynamic client
2 parents 50771fd + 0945c8b commit d010f2e

File tree

2 files changed

+84
-2
lines changed

2 files changed

+84
-2
lines changed

examples/watch/watch_recovery.py

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# Copyright 2025 The Kubernetes Authors.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Uses watch to print a stream of Pod events from the default namespace.
17+
The allow_watch_bookmarks flag is set to True, so the API server can send
18+
BOOKMARK events.
19+
20+
If the connection to the API server is lost, the script will reconnect and
21+
resume watching from the most recently received resource version.
22+
23+
For more information, see:
24+
- https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
25+
- https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-watch
26+
"""
27+
28+
import urllib3
29+
30+
from kubernetes import config
31+
from kubernetes.client import api_client
32+
from kubernetes.client.exceptions import ApiException
33+
from kubernetes.dynamic.client import DynamicClient
34+
35+
NAMESPACE = "default"
36+
37+
38+
def main():
39+
# Configs can be set in Configuration class directly or using helper
40+
# utility. If no argument provided, the config will be loaded from
41+
# default location.
42+
config.load_kube_config()
43+
client = DynamicClient(api_client.ApiClient())
44+
api = client.resources.get(api_version="v1", kind="Pod")
45+
46+
# Setting resource_version=None means the server will send synthetic
47+
# ADDED events for all resources that exist when the watch starts.
48+
resource_version = None
49+
while True:
50+
try:
51+
for event in api.watch(
52+
namespace=NAMESPACE,
53+
resource_version=resource_version,
54+
allow_watch_bookmarks=True,
55+
):
56+
# Remember the last resourceVersion we saw, so we can resume
57+
# watching from there if the connection is lost.
58+
resource_version = event['object'].metadata.resourceVersion
59+
60+
print("Event: %s %s %s" % (
61+
resource_version,
62+
event['type'],
63+
event['object'].metadata.name,
64+
))
65+
66+
except ApiException as err:
67+
if err.status == 410:
68+
print("ERROR: The requested resource version is no longer available.")
69+
resource_version = None
70+
else:
71+
raise
72+
73+
except urllib3.exceptions.ProtocolError:
74+
print("Lost connection to the k8s API server. Reconnecting...")
75+
76+
77+
if __name__ == "__main__":
78+
main()

kubernetes/base/dynamic/client.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ def server_side_apply(self, resource, body=None, name=None, namespace=None, forc
163163

164164
return self.request('patch', path, body=body, force_conflicts=force_conflicts, **kwargs)
165165

166-
def watch(self, resource, namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None, watcher=None):
166+
def watch(self, resource, namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None, watcher=None, allow_watch_bookmarks=None):
167167
"""
168168
Stream events for a resource from the Kubernetes API
169169
@@ -176,6 +176,7 @@ def watch(self, resource, namespace=None, name=None, label_selector=None, field_
176176
a resource_version greater than this value will be returned
177177
:param timeout: The amount of time in seconds to wait before terminating the stream
178178
:param watcher: The Watcher object that will be used to stream the resource
179+
:param allow_watch_bookmarks: Ask the API server to send BOOKMARK events
179180
180181
:return: Event object with these keys:
181182
'type': The type of event such as "ADDED", "DELETED", etc.
@@ -206,7 +207,8 @@ def watch(self, resource, namespace=None, name=None, label_selector=None, field_
206207
label_selector=label_selector,
207208
resource_version=resource_version,
208209
serialize=False,
209-
timeout_seconds=timeout
210+
timeout_seconds=timeout,
211+
allow_watch_bookmarks=allow_watch_bookmarks,
210212
):
211213
event['object'] = ResourceInstance(resource, event['object'])
212214
yield event
@@ -248,6 +250,8 @@ def request(self, method, path, body=None, **params):
248250
query_params.append(('fieldManager', params['field_manager']))
249251
if params.get('force_conflicts') is not None:
250252
query_params.append(('force', params['force_conflicts']))
253+
if params.get('allow_watch_bookmarks') is not None:
254+
query_params.append(('allowWatchBookmarks', params['allow_watch_bookmarks']))
251255

252256
header_params = params.get('header_params', {})
253257
form_params = []

0 commit comments

Comments
 (0)