@@ -63,6 +63,7 @@ def __init__(self, return_type=None):
63
63
self ._raw_return_type = return_type
64
64
self ._stop = False
65
65
self ._api_client = client .ApiClient ()
66
+ self .resource_version = 0
66
67
67
68
def stop (self ):
68
69
self ._stop = True
@@ -81,6 +82,8 @@ def unmarshal_event(self, data, return_type):
81
82
if return_type :
82
83
obj = SimpleNamespace (data = json .dumps (js ['raw_object' ]))
83
84
js ['object' ] = self ._api_client .deserialize (obj , return_type )
85
+ if hasattr (js ['object' ], 'metadata' ):
86
+ self .resource_version = js ['object' ].metadata .resource_version
84
87
return js
85
88
86
89
def stream (self , func , * args , ** kwargs ):
@@ -113,12 +116,19 @@ def stream(self, func, *args, **kwargs):
113
116
return_type = self .get_return_type (func )
114
117
kwargs ['watch' ] = True
115
118
kwargs ['_preload_content' ] = False
116
- resp = func (* args , ** kwargs )
117
- try :
118
- for line in iter_resp_lines (resp ):
119
- yield self .unmarshal_event (line , return_type )
120
- if self ._stop :
121
- break
122
- finally :
123
- resp .close ()
124
- resp .release_conn ()
119
+
120
+ timeouts = ('timeout_seconds' in kwargs )
121
+ while True :
122
+ resp = func (* args , ** kwargs )
123
+ try :
124
+ for line in iter_resp_lines (resp ):
125
+ yield self .unmarshal_event (line , return_type )
126
+ if self ._stop :
127
+ break
128
+ finally :
129
+ kwargs ['resource_version' ] = self .resource_version
130
+ resp .close ()
131
+ resp .release_conn ()
132
+
133
+ if timeouts or self ._stop :
134
+ break
0 commit comments