@@ -91,7 +91,7 @@ def unmarshal_event(self, data, return_type):
91
91
except ValueError :
92
92
return data
93
93
js ['raw_object' ] = js ['object' ]
94
- if return_type :
94
+ if return_type and js [ 'type' ] != 'ERROR' :
95
95
obj = SimpleNamespace (data = json .dumps (js ['raw_object' ]))
96
96
js ['object' ] = self ._api_client .deserialize (obj , return_type )
97
97
if hasattr (js ['object' ], 'metadata' ):
@@ -107,6 +107,14 @@ def unmarshal_event(self, data, return_type):
107
107
def stream (self , func , * args , ** kwargs ):
108
108
"""Watch an API resource and stream the result back via a generator.
109
109
110
+ Note that watching an API resource can expire. The method tries to
111
+ resume automatically from the last result, but if that last result
112
+ is too old as well, an `ApiException` exception will be thrown with
113
+ ``code`` 410. In that case you have to recover yourself, probably
114
+ by listing the API resource to obtain the latest state and then
115
+ watching from that state on by setting ``resource_version`` to
116
+ one returned from listing.
117
+
110
118
:param func: The API function pointer. Any parameter to the function
111
119
can be passed after this parameter.
112
120
@@ -138,11 +146,26 @@ def stream(self, func, *args, **kwargs):
138
146
self .resource_version = kwargs ['resource_version' ]
139
147
140
148
timeouts = ('timeout_seconds' in kwargs )
149
+ retry_after_410 = False
141
150
while True :
142
151
resp = func (* args , ** kwargs )
143
152
try :
144
153
for line in iter_resp_lines (resp ):
145
- yield self .unmarshal_event (line , return_type )
154
+ event = self .unmarshal_event (line , return_type )
155
+ if isinstance (event , dict ) and event ['type' ] == 'ERROR' :
156
+ obj = event ['raw_object' ]
157
+ # Current request expired, let's retry,
158
+ # but only if we have not already retried.
159
+ if not retry_after_410 and obj ['code' ] == 410 :
160
+ retry_after_410 = True
161
+ break
162
+ else :
163
+ reason = "%s: %s" % (obj ['reason' ], obj ['message' ])
164
+ raise client .rest .ApiException (status = obj ['code' ],
165
+ reason = reason )
166
+ else :
167
+ retry_after_410 = False
168
+ yield event
146
169
if self ._stop :
147
170
break
148
171
finally :
0 commit comments