18
18
from kubernetes import client
19
19
20
20
PYDOC_RETURN_LABEL = ":return:"
21
+ PYDOC_FOLLOW_PARAM = ":param bool follow:"
21
22
22
23
# Removing this suffix from return type name should give us event's object
23
24
# type. e.g., if list_namespaces() returns "NamespaceList" type,
@@ -63,7 +64,7 @@ def __init__(self, return_type=None):
63
64
self ._raw_return_type = return_type
64
65
self ._stop = False
65
66
self ._api_client = client .ApiClient ()
66
- self .resource_version = 0
67
+ self .resource_version = None
67
68
68
69
def stop (self ):
69
70
self ._stop = True
@@ -76,8 +77,17 @@ def get_return_type(self, func):
76
77
return return_type [:- len (TYPE_LIST_SUFFIX )]
77
78
return return_type
78
79
80
+ def get_watch_argument_name (self , func ):
81
+ if PYDOC_FOLLOW_PARAM in pydoc .getdoc (func ):
82
+ return 'follow'
83
+ else :
84
+ return 'watch'
85
+
79
86
def unmarshal_event (self , data , return_type ):
80
- js = json .loads (data )
87
+ try :
88
+ js = json .loads (data )
89
+ except ValueError :
90
+ return data
81
91
js ['raw_object' ] = js ['object' ]
82
92
if return_type :
83
93
obj = SimpleNamespace (data = json .dumps (js ['raw_object' ]))
@@ -120,7 +130,7 @@ def stream(self, func, *args, **kwargs):
120
130
121
131
self ._stop = False
122
132
return_type = self .get_return_type (func )
123
- kwargs ['watch' ] = True
133
+ kwargs [self . get_watch_argument_name ( func ) ] = True
124
134
kwargs ['_preload_content' ] = False
125
135
126
136
timeouts = ('timeout_seconds' in kwargs )
@@ -132,9 +142,12 @@ def stream(self, func, *args, **kwargs):
132
142
if self ._stop :
133
143
break
134
144
finally :
135
- kwargs ['resource_version' ] = self .resource_version
136
145
resp .close ()
137
146
resp .release_conn ()
147
+ if self .resource_version is not None :
148
+ kwargs ['resource_version' ] = self .resource_version
149
+ else :
150
+ break
138
151
139
152
if timeouts or self ._stop :
140
153
break
0 commit comments