diff --git a/watch/watch.py b/watch/watch.py index 3058ed9a..b432778e 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -151,7 +151,9 @@ def stream(self, func, *args, **kwargs): if 'resource_version' in kwargs: self.resource_version = kwargs['resource_version'] - timeouts = ('timeout_seconds' in kwargs) + # Do not attempt retries if user specifies a timeout. + # We want to ensure we are returning within that timeout. + disable_retries = ('timeout_seconds' in kwargs) retry_after_410 = False while True: resp = func(*args, **kwargs) @@ -164,9 +166,9 @@ def stream(self, func, *args, **kwargs): if isinstance(event, dict) \ and event['type'] == 'ERROR': obj = event['raw_object'] - # Current request expired, let's retry, + # Current request expired, let's retry, (if enabled) # but only if we have not already retried. - if not retry_after_410 and \ + if not disable_retries and not retry_after_410 and \ obj['code'] == HTTP_STATUS_GONE: retry_after_410 = True break @@ -190,5 +192,5 @@ def stream(self, func, *args, **kwargs): else: self._stop = True - if timeouts or self._stop: + if self._stop or disable_retries: break diff --git a/watch/watch_test.py b/watch/watch_test.py index b8cefd20..32cf6334 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -287,15 +287,65 @@ def test_watch_with_error_event(self): fake_api = Mock() fake_api.get_thing = Mock(return_value=fake_resp) + w = Watch() + # No events are generated when no initial resourceVersion is passed + # No retry is attempted either, preventing an ApiException + assert not list(w.stream(fake_api.get_thing)) + + fake_api.get_thing.assert_called_once_with( + _preload_content=False, watch=True) + fake_resp.read_chunked.assert_called_once_with(decode_content=False) + fake_resp.close.assert_called_once() + fake_resp.release_conn.assert_called_once() + + def test_watch_retries_on_error_event(self): + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.read_chunked = Mock( + return_value=[ + '{"type": "ERROR", "object": {"code": 410, ' + '"reason": "Gone", "message": "error message"}}\n']) + + fake_api = Mock() + fake_api.get_thing = Mock(return_value=fake_resp) + w = Watch() try: - for _ in w.stream(fake_api.get_thing): + for _ in w.stream(fake_api.get_thing, resource_version=0): + self.fail(self, "Should fail with ApiException.") + except client.rest.ApiException: + pass + + # Two calls should be expected during a retry + fake_api.get_thing.assert_has_calls( + [call(resource_version=0, _preload_content=False, watch=True)] * 2) + fake_resp.read_chunked.assert_has_calls( + [call(decode_content=False)] * 2) + assert fake_resp.close.call_count == 2 + assert fake_resp.release_conn.call_count == 2 + + def test_watch_with_error_event_and_timeout_param(self): + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.read_chunked = Mock( + return_value=[ + '{"type": "ERROR", "object": {"code": 410, ' + '"reason": "Gone", "message": "error message"}}\n']) + + fake_api = Mock() + fake_api.get_thing = Mock(return_value=fake_resp) + + w = Watch() + try: + for _ in w.stream(fake_api.get_thing, timeout_seconds=10): self.fail(self, "Should fail with ApiException.") except client.rest.ApiException: pass fake_api.get_thing.assert_called_once_with( - _preload_content=False, watch=True) + _preload_content=False, watch=True, timeout_seconds=10) fake_resp.read_chunked.assert_called_once_with(decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once()