Skip to content

Commit fdbfa47

Browse files
author
vadym1226
committed
Add flag to enable keep the watch action working all the time
Fixes issue: kubernetes-client/python#124
1 parent 0a7e19a commit fdbfa47

File tree

2 files changed

+47
-10
lines changed

2 files changed

+47
-10
lines changed

watch/watch.py

+17-10
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,14 @@ def unmarshal_event(self, data, return_type):
8383
js['object'] = self._api_client.deserialize(obj, return_type)
8484
return js
8585

86-
def stream(self, func, *args, **kwargs):
86+
def stream(self, func, keep=False, *args, **kwargs):
8787
"""Watch an API resource and stream the result back via a generator.
8888
8989
:param func: The API function pointer. Any parameter to the function
9090
can be passed after this parameter.
9191
92+
:param keep: Flag to keep the watch work all the time.
93+
9294
:return: Event object with these keys:
9395
'type': The type of event such as "ADDED", "DELETED", etc.
9496
'raw_object': a dict representing the watched object.
@@ -113,12 +115,17 @@ def stream(self, func, *args, **kwargs):
113115
return_type = self.get_return_type(func)
114116
kwargs['watch'] = True
115117
kwargs['_preload_content'] = False
116-
resp = func(*args, **kwargs)
117-
try:
118-
for line in iter_resp_lines(resp):
119-
yield self.unmarshal_event(line, return_type)
120-
if self._stop:
121-
break
122-
finally:
123-
resp.close()
124-
resp.release_conn()
118+
119+
while True:
120+
resp = func(*args, **kwargs)
121+
try:
122+
for line in iter_resp_lines(resp):
123+
yield self.unmarshal_event(line, return_type)
124+
if self._stop:
125+
break
126+
finally:
127+
resp.close()
128+
resp.release_conn()
129+
130+
if not keep or self._stop:
131+
break

watch/watch_test.py

+30
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,36 @@ def test_watch_stream_twice(self):
8585
fake_resp.close.assert_called_once()
8686
fake_resp.release_conn.assert_called_once()
8787

88+
def test_watch_stream_keep(self):
89+
w = Watch(float)
90+
91+
fake_resp = Mock()
92+
fake_resp.close = Mock()
93+
fake_resp.release_conn = Mock()
94+
fake_resp.read_chunked = Mock(
95+
return_value=['{"type": "ADDED", "object": 1}\n'])
96+
97+
fake_api = Mock()
98+
fake_api.get_namespaces = Mock(return_value=fake_resp)
99+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
100+
101+
count = 0
102+
for e in w.stream(fake_api.get_namespaces):
103+
count = count + 1
104+
105+
self.assertEqual(count, 1)
106+
107+
for e in w.stream(fake_api.get_namespaces, True):
108+
count = count + 1
109+
if count == 2:
110+
w.stop()
111+
112+
self.assertEqual(count, 2)
113+
self.assertEqual(fake_api.get_namespaces.call_count, 2)
114+
self.assertEqual(fake_resp.read_chunked.call_count, 2)
115+
self.assertEqual(fake_resp.close.call_count, 2)
116+
self.assertEqual(fake_resp.release_conn.call_count, 2)
117+
88118
def test_unmarshal_with_float_object(self):
89119
w = Watch()
90120
event = w.unmarshal_event('{"type": "ADDED", "object": 1}', 'float')

0 commit comments

Comments
 (0)