@@ -63,6 +63,7 @@ def __init__(self, return_type=None):
63
63
self ._raw_return_type = return_type
64
64
self ._stop = False
65
65
self ._api_client = client .ApiClient ()
66
+ self .resource_version = 0
66
67
67
68
def stop (self ):
68
69
self ._stop = True
@@ -81,16 +82,16 @@ def unmarshal_event(self, data, return_type):
81
82
if return_type :
82
83
obj = SimpleNamespace (data = json .dumps (js ['raw_object' ]))
83
84
js ['object' ] = self ._api_client .deserialize (obj , return_type )
85
+ if hasattr (js ['object' ],'metadata' ):
86
+ self .resource_version = js ['object' ].metadata .resource_version
84
87
return js
85
88
86
- def stream (self , func , keep = False , * args , ** kwargs ):
89
+ def stream (self , func , * args , ** kwargs ):
87
90
"""Watch an API resource and stream the result back via a generator.
88
91
89
92
:param func: The API function pointer. Any parameter to the function
90
93
can be passed after this parameter.
91
94
92
- :param keep: Flag to keep the watch work all the time.
93
-
94
95
:return: Event object with these keys:
95
96
'type': The type of event such as "ADDED", "DELETED", etc.
96
97
'raw_object': a dict representing the watched object.
@@ -116,6 +117,7 @@ def stream(self, func, keep=False, *args, **kwargs):
116
117
kwargs ['watch' ] = True
117
118
kwargs ['_preload_content' ] = False
118
119
120
+ timeouts = ('timeout_seconds' in kwargs )
119
121
while True :
120
122
resp = func (* args , ** kwargs )
121
123
try :
@@ -124,8 +126,9 @@ def stream(self, func, keep=False, *args, **kwargs):
124
126
if self ._stop :
125
127
break
126
128
finally :
129
+ kwargs ['resource_version' ] = self .resource_version
127
130
resp .close ()
128
131
resp .release_conn ()
129
132
130
- if not keep or self ._stop :
133
+ if timeouts or self ._stop :
131
134
break
0 commit comments