Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Commit 3c30a30

Browse files
committed
fix watching with a specified resource version
The watch code reset the version to the last found in the response. When you first list existing objects and then start watching from that resource version the existing versions are older than the version you wanted and the watch starts from the wrong version after the first restart. This leads to for example already deleted objects ending in the stream again. Fix this by setting the minimum resource version to reset from to the input resource version. As long as k8s returns all objects in order in the watch this should work. We cannot use the integer value of the resource version to order it as one should be treat the value as opaque. Closes kubernetes-client/python#700
1 parent 5c242ea commit 3c30a30

File tree

2 files changed

+74
-1
lines changed

2 files changed

+74
-1
lines changed

Diff for: watch/watch.py

+2
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ def stream(self, func, *args, **kwargs):
122122
return_type = self.get_return_type(func)
123123
kwargs['watch'] = True
124124
kwargs['_preload_content'] = False
125+
if 'resource_version' in kwargs:
126+
self.resource_version = kwargs['resource_version']
125127

126128
timeouts = ('timeout_seconds' in kwargs)
127129
while True:

Diff for: watch/watch_test.py

+72-1
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@
1414

1515
import unittest
1616

17-
from mock import Mock
17+
from mock import Mock, call
1818

1919
from .watch import Watch
2020

2121

2222
class WatchTests(unittest.TestCase):
23+
def setUp(self):
24+
# counter for a test that needs test global state
25+
self.callcount = 0
2326

2427
def test_watch_with_decode(self):
2528
fake_resp = Mock()
@@ -62,6 +65,74 @@ def test_watch_with_decode(self):
6265
fake_resp.close.assert_called_once()
6366
fake_resp.release_conn.assert_called_once()
6467

68+
def test_watch_resource_version_set(self):
69+
# https://github.com/kubernetes-client/python/issues/700
70+
# ensure watching from a resource version does reset to resource
71+
# version 0 after k8s resets the watch connection
72+
fake_resp = Mock()
73+
fake_resp.close = Mock()
74+
fake_resp.release_conn = Mock()
75+
values = [
76+
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
77+
'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
78+
'{"type": "ADDED", "object": {"metadata": {"name": "test2",'
79+
'"resourceVersion": "2"}, "spec": {}, "sta',
80+
'tus": {}}}\n'
81+
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
82+
'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n'
83+
]
84+
# return nothing on the first call and values on the second
85+
# this emulates a watch from a rv that returns nothing in the first k8s
86+
# watch reset and values later
87+
88+
def get_values(*args, **kwargs):
89+
self.callcount += 1
90+
if self.callcount == 1:
91+
return []
92+
else:
93+
return values
94+
95+
fake_resp.read_chunked = Mock(
96+
side_effect=get_values)
97+
98+
fake_api = Mock()
99+
fake_api.get_namespaces = Mock(return_value=fake_resp)
100+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
101+
102+
w = Watch()
103+
# ensure we keep our requested resource version or the version latest
104+
# returned version when the existing versions are older than the
105+
# requested version
106+
# needed for the list existing objects, then watch from there use case
107+
calls = []
108+
109+
iterations = 2
110+
# first two calls must use the passed rv, the first call is a
111+
# "reset" and does not actually return anything
112+
# the second call must use the same rv but will return values
113+
# (with a wrong rv but a real cluster would behave correctly)
114+
# calls following that will use the rv from those returned values
115+
calls.append(call(_preload_content=False, watch=True,
116+
resource_version="5"))
117+
calls.append(call(_preload_content=False, watch=True,
118+
resource_version="5"))
119+
for i in range(iterations):
120+
# ideally we want 5 here but as rv must be treated as an
121+
# opaque value we cannot interpret it and order it so rely
122+
# on k8s returning the events completely and in order
123+
calls.append(call(_preload_content=False, watch=True,
124+
resource_version="3"))
125+
126+
for c, e in enumerate(w.stream(fake_api.get_namespaces,
127+
resource_version="5")):
128+
if c == len(values) * iterations:
129+
w.stop()
130+
131+
# check calls are in the list, gives good error output
132+
fake_api.get_namespaces.assert_has_calls(calls)
133+
# more strict test with worse error message
134+
self.assertEqual(fake_api.get_namespaces.mock_calls, calls)
135+
65136
def test_watch_stream_twice(self):
66137
w = Watch(float)
67138
for step in ['first', 'second']:

0 commit comments

Comments
 (0)