From 662298ab63f999cf90094a02461ff66baed77ac5 Mon Sep 17 00:00:00 2001 From: Aaron Hoffer Date: Thu, 22 Sep 2022 11:55:11 -0700 Subject: [PATCH 1/3] Allow retries for statuses other than 429 in bulk streaming Closes https://github.com/elastic/elasticsearch-py/issues/1004. This updates https://github.com/elastic/elasticsearch-py/pull/1005 to work for both the async and sync client as well as adding tests. --- elasticsearch/_async/helpers.py | 35 ++++++++++----- elasticsearch/helpers/actions.py | 35 ++++++++++----- .../test_async/test_server/test_helpers.py | 44 +++++++++++++++++++ .../test_server/test_helpers.py | 44 +++++++++++++++++++ 4 files changed, 138 insertions(+), 20 deletions(-) diff --git a/elasticsearch/_async/helpers.py b/elasticsearch/_async/helpers.py index 7c8c8af37..6f6c1e557 100644 --- a/elasticsearch/_async/helpers.py +++ b/elasticsearch/_async/helpers.py @@ -158,6 +158,12 @@ async def azip( pass +def _retry_for_status(status: int) -> bool: + if status == 429: + return True + return False + + async def async_streaming_bulk( client: AsyncElasticsearch, actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]], @@ -167,6 +173,7 @@ async def async_streaming_bulk( expand_action_callback: Callable[ [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY ] = expand_action, + retry_for_status_callback: Callable[[int], bool] = _retry_for_status, raise_on_exception: bool = True, max_retries: int = 0, initial_backoff: float = 2, @@ -185,10 +192,11 @@ async def async_streaming_bulk( entire input is consumed and sent. If you specify ``max_retries`` it will also retry any documents that were - rejected with a ``429`` status code. To do this it will wait (**by calling - asyncio.sleep**) for ``initial_backoff`` seconds and then, - every subsequent rejection for the same chunk, for double the time every - time up to ``max_backoff`` seconds. + rejected with a ``429`` status code. Use ``retry_for_status_callback`` to + configure which status codes will be retried. To do this it will wait + (**by calling time.sleep which will block**) for ``initial_backoff`` seconds + and then, every subsequent rejection for the same chunk, for double the time + every time up to ``max_backoff`` seconds. :arg client: instance of :class:`~elasticsearch.AsyncElasticsearch` to use :arg actions: iterable or async iterable containing the actions to be executed @@ -201,8 +209,12 @@ async def async_streaming_bulk( :arg expand_action_callback: callback executed on each action passed in, should return a tuple containing the action line and the data line (`None` if data line should be omitted). + :arg retry_for_status_callback: callback executed on each item's status, + should return a True if the status require a retry and False if not. + (if `None` is specified only status 429 will retry). :arg max_retries: maximum number of times a document will be retried when - ``429`` is received, set to 0 (default) for no retries on ``429`` + retry_for_status_callback (defaulting to ``429``) is received, + set to 0 (default) for no retries on retry_for_status_callback :arg initial_backoff: number of seconds we should wait before the first retry. 
Any subsequent retries will be powers of ``initial_backoff * 2**retry_number`` @@ -267,11 +279,11 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]: if not ok: action, info = info.popitem() - # retry if retries enabled, we get 429, and we are not - # in the last attempt + # retry if retries enabled, we are not in the last attempt, + # and retry_for_status_callback is true (defaulting to 429) if ( max_retries - and info["status"] == 429 + and retry_for_status_callback(info["status"]) and (attempt + 1) <= max_retries ): # _process_bulk_chunk expects strings so we need to @@ -284,8 +296,11 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]: yield ok, info except ApiError as e: - # suppress 429 errors since we will retry them - if attempt == max_retries or e.status_code != 429: + # suppress any status which retry_for_status_callback is true (defaulting to 429) + # since we will retry them + if attempt == max_retries or not retry_for_status_callback( + e.status_code + ): raise else: if not to_retry: diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index a016c998b..b26002b8a 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -355,6 +355,12 @@ def _process_bulk_chunk( yield from gen +def _retry_for_status(status: int) -> bool: + if status == 429: + return True + return False + + def streaming_bulk( client: Elasticsearch, actions: Iterable[_TYPE_BULK_ACTION], @@ -364,6 +370,7 @@ def streaming_bulk( expand_action_callback: Callable[ [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY ] = expand_action, + retry_for_status_callback: Callable[[int], bool] = _retry_for_status, raise_on_exception: bool = True, max_retries: int = 0, initial_backoff: float = 2, @@ -382,10 +389,11 @@ def streaming_bulk( entire input is consumed and sent. If you specify ``max_retries`` it will also retry any documents that were - rejected with a ``429`` status code. To do this it will wait (**by calling - time.sleep which will block**) for ``initial_backoff`` seconds and then, - every subsequent rejection for the same chunk, for double the time every - time up to ``max_backoff`` seconds. + rejected with a ``429`` status code. Use ``retry_for_status_callback`` to + configure which status codes will be retried. To do this it will wait + (**by calling time.sleep which will block**) for ``initial_backoff`` seconds + and then, every subsequent rejection for the same chunk, for double the time + every time up to ``max_backoff`` seconds. :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use :arg actions: iterable containing the actions to be executed @@ -398,8 +406,12 @@ def streaming_bulk( :arg expand_action_callback: callback executed on each action passed in, should return a tuple containing the action line and the data line (`None` if data line should be omitted). + :arg retry_for_status_callback: callback executed on each item's status, + should return a True if the status require a retry and False if not. + (if `None` is specified only status 429 will retry). :arg max_retries: maximum number of times a document will be retried when - ``429`` is received, set to 0 (default) for no retries on ``429`` + retry_for_status_callback (defaulting to ``429``) is received, + set to 0 (default) for no retries on retry_for_status_callback :arg initial_backoff: number of seconds we should wait before the first retry. 
Any subsequent retries will be powers of ``initial_backoff * 2**retry_number`` @@ -451,11 +463,11 @@ def streaming_bulk( if not ok: action, info = info.popitem() - # retry if retries enabled, we get 429, and we are not - # in the last attempt + # retry if retries enabled, we are not in the last attempt, + # and retry_for_status_callback is true (defaulting to 429) if ( max_retries - and info["status"] == 429 + and retry_for_status_callback(info["status"]) and (attempt + 1) <= max_retries ): # _process_bulk_chunk expects bytes so we need to @@ -468,8 +480,11 @@ def streaming_bulk( yield ok, info except ApiError as e: - # suppress 429 errors since we will retry them - if attempt == max_retries or e.status_code != 429: + # suppress any status which retry_for_status_callback is true (defaulting to 429) + # since we will retry them + if attempt == max_retries or not retry_for_status_callback( + e.status_code + ): raise else: if not to_retry: diff --git a/test_elasticsearch/test_async/test_server/test_helpers.py b/test_elasticsearch/test_async/test_server/test_helpers.py index ecce0dffc..9ef35ad10 100644 --- a/test_elasticsearch/test_async/test_server/test_helpers.py +++ b/test_elasticsearch/test_async/test_server/test_helpers.py @@ -292,6 +292,50 @@ async def streaming_bulk(): await streaming_bulk() assert 4 == failing_client._called + async def test_connection_timeout_is_retried_with_retry_status_callback( + self, async_client + ): + failing_client = FailingBulkClient( + async_client, + fail_with=ApiError( + message="Connection timed out!", + body={}, + meta=ApiResponseMeta( + status=522, headers={}, http_version="1.1", duration=0, node=None + ), + ), + ) + docs = [ + {"_index": "i", "_id": 47, "f": "v"}, + {"_index": "i", "_id": 45, "f": "v"}, + {"_index": "i", "_id": 42, "f": "v"}, + ] + + def _retry_for_connection_timeout(status): + if status == 522: + return True + return False + + results = [ + x + async for x in helpers.async_streaming_bulk( + failing_client, + docs, + raise_on_exception=False, + raise_on_error=False, + chunk_size=1, + retry_for_status_callback=_retry_for_connection_timeout, + max_retries=1, + initial_backoff=0, + ) + ] + assert 3 == len(results) + assert [True, True, True] == [r[0] for r in results] + await async_client.indices.refresh(index="i") + res = await async_client.search(index="i") + assert {"value": 3, "relation": "eq"} == res["hits"]["total"] + assert 4 == failing_client._called + class TestBulk(object): async def test_bulk_works_with_single_item(self, async_client): diff --git a/test_elasticsearch/test_server/test_helpers.py b/test_elasticsearch/test_server/test_helpers.py index a74aa205c..364043728 100644 --- a/test_elasticsearch/test_server/test_helpers.py +++ b/test_elasticsearch/test_server/test_helpers.py @@ -288,6 +288,50 @@ def streaming_bulk(): assert 4 == failing_client._called +def test_connection_timeout_is_retried_with_retry_status_callback(sync_client): + failing_client = FailingBulkClient( + sync_client, + fail_with=ApiError( + message="Connection timed out!", + body={}, + meta=ApiResponseMeta( + status=522, headers={}, http_version="1.1", duration=0, node=None + ), + ), + ) + docs = [ + {"_index": "i", "_id": 47, "f": "v"}, + {"_index": "i", "_id": 45, "f": "v"}, + {"_index": "i", "_id": 42, "f": "v"}, + ] + + def _retry_for_connection_timeout(status): + if status == 522: + return True + return False + + results = list( + helpers.streaming_bulk( + failing_client, + docs, + index="i", + raise_on_exception=False, + raise_on_error=False, + 
chunk_size=1, + retry_for_status_callback=_retry_for_connection_timeout, + max_retries=1, + initial_backoff=0, + ) + ) + assert 3 == len(results) + print(results) + assert [True, True, True] == [r[0] for r in results] + sync_client.indices.refresh(index="i") + res = sync_client.search(index="i") + assert {"value": 3, "relation": "eq"} == res["hits"]["total"] + assert 4 == failing_client._called + + def test_bulk_works_with_single_item(sync_client): docs = [{"answer": 42, "_id": 1}] success, failed = helpers.bulk(sync_client, docs, index="test-index", refresh=True) From 1c2800f5e9c6549df25eb8025fcd312b9889f992 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Tue, 12 Nov 2024 13:19:19 +0400 Subject: [PATCH 2/3] Use enum instead of function for status retries --- elasticsearch/_async/helpers.py | 30 ++++++++----------- elasticsearch/helpers/actions.py | 30 ++++++++----------- .../test_async/test_server/test_helpers.py | 7 +---- .../test_server/test_helpers.py | 7 +---- 4 files changed, 26 insertions(+), 48 deletions(-) diff --git a/elasticsearch/_async/helpers.py b/elasticsearch/_async/helpers.py index 2c54bacdb..f1d7c8eb1 100644 --- a/elasticsearch/_async/helpers.py +++ b/elasticsearch/_async/helpers.py @@ -158,12 +158,6 @@ async def azip( pass -def _retry_for_status(status: int) -> bool: - if status == 429: - return True - return False - - async def async_streaming_bulk( client: AsyncElasticsearch, actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]], @@ -173,13 +167,13 @@ async def async_streaming_bulk( expand_action_callback: Callable[ [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY ] = expand_action, - retry_for_status_callback: Callable[[int], bool] = _retry_for_status, raise_on_exception: bool = True, max_retries: int = 0, initial_backoff: float = 2, max_backoff: float = 600, yield_ok: bool = True, ignore_status: Union[int, Collection[int]] = (), + retry_on_status: Union[int, Collection[int]] = (429,), *args: Any, **kwargs: Any, ) -> AsyncIterable[Tuple[bool, Dict[str, Any]]]: @@ -191,7 +185,7 @@ async def async_streaming_bulk( entire input is consumed and sent. If you specify ``max_retries`` it will also retry any documents that were - rejected with a ``429`` status code. Use ``retry_for_status_callback`` to + rejected with a ``429`` status code. Use ``retry_on_status`` to configure which status codes will be retried. To do this it will wait (**by calling time.sleep which will block**) for ``initial_backoff`` seconds and then, every subsequent rejection for the same chunk, for double the time @@ -208,12 +202,11 @@ async def async_streaming_bulk( :arg expand_action_callback: callback executed on each action passed in, should return a tuple containing the action line and the data line (`None` if data line should be omitted). - :arg retry_for_status_callback: callback executed on each item's status, - should return a True if the status require a retry and False if not. + :arg retry_on_status: HTTP status code that will trigger a retry. (if `None` is specified only status 429 will retry). :arg max_retries: maximum number of times a document will be retried when - retry_for_status_callback (defaulting to ``429``) is received, - set to 0 (default) for no retries on retry_for_status_callback + retry_on_status (defaulting to ``429``) is received, + set to 0 (default) for no retries :arg initial_backoff: number of seconds we should wait before the first retry. 
Any subsequent retries will be powers of ``initial_backoff * 2**retry_number`` @@ -225,6 +218,9 @@ async def async_streaming_bulk( client = client.options() client._client_meta = (("h", "bp"),) + if isinstance(retry_on_status, int): + retry_on_status = (retry_on_status,) + async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]: async for item in aiter(actions): yield expand_action_callback(item) @@ -277,10 +273,10 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]: if not ok: action, info = info.popitem() # retry if retries enabled, we are not in the last attempt, - # and retry_for_status_callback is true (defaulting to 429) + # and status not in retry_on_status (defaulting to 429) if ( max_retries - and retry_for_status_callback(info["status"]) + and info["status"] in retry_on_status and (attempt + 1) <= max_retries ): # _process_bulk_chunk expects strings so we need to @@ -293,11 +289,9 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]: yield ok, info except ApiError as e: - # suppress any status which retry_for_status_callback is true (defaulting to 429) + # suppress any status in retry_on_status (429 by default) # since we will retry them - if attempt == max_retries or not retry_for_status_callback( - e.status_code - ): + if attempt == max_retries or e.status_code not in retry_on_status: raise else: if not to_retry: diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index 5f6049e7e..a2e8771a5 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -359,12 +359,6 @@ def _process_bulk_chunk( yield from gen -def _retry_for_status(status: int) -> bool: - if status == 429: - return True - return False - - def streaming_bulk( client: Elasticsearch, actions: Iterable[_TYPE_BULK_ACTION], @@ -374,13 +368,13 @@ def streaming_bulk( expand_action_callback: Callable[ [_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY ] = expand_action, - retry_for_status_callback: Callable[[int], bool] = _retry_for_status, raise_on_exception: bool = True, max_retries: int = 0, initial_backoff: float = 2, max_backoff: float = 600, yield_ok: bool = True, ignore_status: Union[int, Collection[int]] = (), + retry_on_status: Union[int, Collection[int]] = (429,), span_name: str = "helpers.streaming_bulk", *args: Any, **kwargs: Any, @@ -393,7 +387,7 @@ def streaming_bulk( entire input is consumed and sent. If you specify ``max_retries`` it will also retry any documents that were - rejected with a ``429`` status code. Use ``retry_for_status_callback`` to + rejected with a ``429`` status code. Use ``retry_on_status`` to configure which status codes will be retried. To do this it will wait (**by calling time.sleep which will block**) for ``initial_backoff`` seconds and then, every subsequent rejection for the same chunk, for double the time @@ -410,12 +404,11 @@ def streaming_bulk( :arg expand_action_callback: callback executed on each action passed in, should return a tuple containing the action line and the data line (`None` if data line should be omitted). - :arg retry_for_status_callback: callback executed on each item's status, - should return a True if the status require a retry and False if not. + :arg retry_on_status: HTTP status code that will trigger a retry. (if `None` is specified only status 429 will retry). 
:arg max_retries: maximum number of times a document will be retried when - retry_for_status_callback (defaulting to ``429``) is received, - set to 0 (default) for no retries on retry_for_status_callback + retry_on_status (defaulting to ``429``) is received, + set to 0 (default) for no retries :arg initial_backoff: number of seconds we should wait before the first retry. Any subsequent retries will be powers of ``initial_backoff * 2**retry_number`` @@ -427,6 +420,9 @@ def streaming_bulk( client = client.options() client._client_meta = (("h", "bp"),) + if isinstance(retry_on_status, int): + retry_on_status = (retry_on_status,) + serializer = client.transport.serializers.get_serializer("application/json") bulk_data: List[ @@ -471,10 +467,10 @@ def streaming_bulk( if not ok: action, info = info.popitem() # retry if retries enabled, we are not in the last attempt, - # and retry_for_status_callback is true (defaulting to 429) + # and status not in retry_on_status (defaulting to 429) if ( max_retries - and retry_for_status_callback(info["status"]) + and info["status"] in retry_on_status and (attempt + 1) <= max_retries ): # _process_bulk_chunk expects bytes so we need to @@ -487,11 +483,9 @@ def streaming_bulk( yield ok, info except ApiError as e: - # suppress any status which retry_for_status_callback is true (defaulting to 429) + # suppress any status in retry_on_status (429 by default) # since we will retry them - if attempt == max_retries or not retry_for_status_callback( - e.status_code - ): + if attempt == max_retries or e.status_code not in retry_on_status: raise else: if not to_retry: diff --git a/test_elasticsearch/test_async/test_server/test_helpers.py b/test_elasticsearch/test_async/test_server/test_helpers.py index 5c9646cf8..0bb781304 100644 --- a/test_elasticsearch/test_async/test_server/test_helpers.py +++ b/test_elasticsearch/test_async/test_server/test_helpers.py @@ -312,11 +312,6 @@ async def test_connection_timeout_is_retried_with_retry_status_callback( {"_index": "i", "_id": 42, "f": "v"}, ] - def _retry_for_connection_timeout(status): - if status == 522: - return True - return False - results = [ x async for x in helpers.async_streaming_bulk( @@ -325,7 +320,7 @@ def _retry_for_connection_timeout(status): raise_on_exception=False, raise_on_error=False, chunk_size=1, - retry_for_status_callback=_retry_for_connection_timeout, + retry_on_status=522, max_retries=1, initial_backoff=0, ) diff --git a/test_elasticsearch/test_server/test_helpers.py b/test_elasticsearch/test_server/test_helpers.py index d3dac2d2b..6ed43e2af 100644 --- a/test_elasticsearch/test_server/test_helpers.py +++ b/test_elasticsearch/test_server/test_helpers.py @@ -305,11 +305,6 @@ def test_connection_timeout_is_retried_with_retry_status_callback(sync_client): {"_index": "i", "_id": 42, "f": "v"}, ] - def _retry_for_connection_timeout(status): - if status == 522: - return True - return False - results = list( helpers.streaming_bulk( failing_client, @@ -318,7 +313,7 @@ def _retry_for_connection_timeout(status): raise_on_exception=False, raise_on_error=False, chunk_size=1, - retry_for_status_callback=_retry_for_connection_timeout, + retry_on_status=522, max_retries=1, initial_backoff=0, ) From 36dd1052ba012f6d1d06f92105c11417f9ee8bd5 Mon Sep 17 00:00:00 2001 From: Quentin Pradet Date: Tue, 12 Nov 2024 15:32:15 +0400 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Miguel Grinberg --- elasticsearch/_async/helpers.py | 4 ++-- elasticsearch/helpers/actions.py | 2 +- 2 files changed, 3 
insertions(+), 3 deletions(-) diff --git a/elasticsearch/_async/helpers.py b/elasticsearch/_async/helpers.py index f1d7c8eb1..1bc339917 100644 --- a/elasticsearch/_async/helpers.py +++ b/elasticsearch/_async/helpers.py @@ -187,7 +187,7 @@ async def async_streaming_bulk( If you specify ``max_retries`` it will also retry any documents that were rejected with a ``429`` status code. Use ``retry_on_status`` to configure which status codes will be retried. To do this it will wait - (**by calling time.sleep which will block**) for ``initial_backoff`` seconds + (**by calling asyncio.sleep which will block**) for ``initial_backoff`` seconds and then, every subsequent rejection for the same chunk, for double the time every time up to ``max_backoff`` seconds. @@ -273,7 +273,7 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]: if not ok: action, info = info.popitem() # retry if retries enabled, we are not in the last attempt, - # and status not in retry_on_status (defaulting to 429) + # and status in retry_on_status (defaulting to 429) if ( max_retries and info["status"] in retry_on_status diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index a2e8771a5..687bf4b84 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -467,7 +467,7 @@ def streaming_bulk( if not ok: action, info = info.popitem() # retry if retries enabled, we are not in the last attempt, - # and status not in retry_on_status (defaulting to 429) + # and status in retry_on_status (defaulting to 429) if ( max_retries and info["status"] in retry_on_status
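
The sketch below is not part of the patch series; it illustrates how a caller might use the `retry_on_status` parameter that patches 2-3 above add to `helpers.streaming_bulk`, which accepts either a single status code or a collection and defaults to `(429,)`. The cluster URL, index name, documents, and the extra status codes chosen here are illustrative assumptions, not taken from the patches.

# Usage sketch (assumes a reachable cluster at localhost:9200 and an
# arbitrary index name; both are illustrative, not part of the patch).
from elasticsearch import Elasticsearch, helpers

client = Elasticsearch("http://localhost:9200")

# A generator of bulk actions; streaming_bulk consumes it lazily.
docs = ({"_index": "my-index", "_id": i, "value": i} for i in range(100))

for ok, item in helpers.streaming_bulk(
    client,
    docs,
    chunk_size=50,
    max_retries=3,                    # retries only happen when max_retries > 0
    retry_on_status=(429, 502, 503),  # statuses to retry; defaults to (429,)
    initial_backoff=2,                # first wait in seconds, doubled per retry
    max_backoff=600,                  # cap on the per-chunk backoff
    raise_on_error=False,             # yield failed items instead of raising
):
    if not ok:
        print("failed:", item)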