@@ -63,6 +63,7 @@ def __init__(self, return_type=None):
63
63
self ._raw_return_type = return_type
64
64
self ._stop = False
65
65
self ._api_client = client .ApiClient ()
66
+ self .resource_version = 0
66
67
67
68
def stop (self ):
68
69
self ._stop = True
@@ -81,16 +82,16 @@ def unmarshal_event(self, data, return_type):
81
82
if return_type :
82
83
obj = SimpleNamespace (data = json .dumps (js ['raw_object' ]))
83
84
js ['object' ] = self ._api_client .deserialize (obj , return_type )
85
+ if hasattr (js ['object' ],'metadata' ):
86
+ self .resource_version = js ['object' ].metadata .resource_version
84
87
return js
85
88
86
- def stream (self , func , keep = False , * args , ** kwargs ):
89
+ def stream (self , func , * args , ** kwargs ):
87
90
"""Watch an API resource and stream the result back via a generator.
88
91
89
92
:param func: The API function pointer. Any parameter to the function
90
93
can be passed after this parameter.
91
94
92
- :param keep: Flag to keep the watch work all the time.
93
-
94
95
:return: Event object with these keys:
95
96
'type': The type of event such as "ADDED", "DELETED", etc.
96
97
'raw_object': a dict representing the watched object.
@@ -116,16 +117,19 @@ def stream(self, func, keep=False, *args, **kwargs):
116
117
kwargs ['watch' ] = True
117
118
kwargs ['_preload_content' ] = False
118
119
120
+ timeouts = ('timeout_seconds' in kwargs )
119
121
while True :
120
122
resp = func (* args , ** kwargs )
123
+ resource_version = 0
121
124
try :
122
125
for line in iter_resp_lines (resp ):
123
126
yield self .unmarshal_event (line , return_type )
124
127
if self ._stop :
125
128
break
126
129
finally :
130
+ kwargs ['resource_version' ] = self .resource_version
127
131
resp .close ()
128
132
resp .release_conn ()
129
133
130
- if not keep or self ._stop :
134
+ if timeouts or self ._stop :
131
135
break
0 commit comments