Skip to content

Commit 1cc9145

Browse files
xinlian12annie-mac
authored and committed
addFeedRangesAndUseFeedRangeInQueryChangeFeed (Azure#37687)
* Add getFeedRanges API * Add feedRange support in query changeFeed Co-authored-by: annie-mac <[email protected]>
1 parent f737640 commit 1cc9145

36 files changed

+3230
-793
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#### Features Added
66
* Added Retry Policy for Container Recreate in the Python SDK. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)
77
* Added option to disable write payload on writes. See [PR 37365](https://github.com/Azure/azure-sdk-for-python/pull/37365)
8+
* Added get feed ranges API. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)
9+
* Added feed range support in `query_items_change_feed`. See [PR 37687](https://github.com/Azure/azure-sdk-for-python/pull/37687)
810

911
#### Breaking Changes
1012

sdk/cosmos/azure-cosmos/azure/cosmos/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
)
4343
from .partition_key import PartitionKey
4444
from .permission import Permission
45+
from ._feed_range import FeedRange
4546

4647
__all__ = (
4748
"CosmosClient",
@@ -64,5 +65,6 @@
6465
"TriggerType",
6566
"ConnectionRetryPolicy",
6667
"ThroughputProperties",
68+
"FeedRange"
6769
)
6870
__version__ = VERSION

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

+2-17
Original file line numberDiff line numberDiff line change
@@ -284,23 +284,8 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
284284
if options.get("disableRUPerMinuteUsage"):
285285
headers[http_constants.HttpHeaders.DisableRUPerMinuteUsage] = options["disableRUPerMinuteUsage"]
286286

287-
if options.get("changeFeed") is True:
288-
# On REST level, change feed is using IfNoneMatch/ETag instead of continuation.
289-
if_none_match_value = None
290-
if options.get("continuation"):
291-
if_none_match_value = options["continuation"]
292-
elif options.get("isStartFromBeginning") and not options["isStartFromBeginning"]:
293-
if_none_match_value = "*"
294-
elif options.get("startTime"):
295-
start_time = options.get("startTime")
296-
headers[http_constants.HttpHeaders.IfModified_since] = start_time
297-
if if_none_match_value:
298-
headers[http_constants.HttpHeaders.IfNoneMatch] = if_none_match_value
299-
300-
headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue
301-
else:
302-
if options.get("continuation"):
303-
headers[http_constants.HttpHeaders.Continuation] = options["continuation"]
287+
if options.get("continuation"):
288+
headers[http_constants.HttpHeaders.Continuation] = options["continuation"]
304289

305290
if options.get("populatePartitionKeyRangeStatistics"):
306291
headers[http_constants.HttpHeaders.PopulatePartitionKeyRangeStatistics] = options[
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# The MIT License (MIT)
2+
# Copyright (c) 2014 Microsoft Corporation
3+
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
11+
# The above copyright notice and this permission notice shall be included in all
12+
# copies or substantial portions of the Software.
13+
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
# SOFTWARE.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# The MIT License (MIT)
2+
# Copyright (c) 2014 Microsoft Corporation
3+
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
11+
# The above copyright notice and this permission notice shall be included in all
12+
# copies or substantial portions of the Software.
13+
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
# SOFTWARE.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
# The MIT License (MIT)
2+
# Copyright (c) 2014 Microsoft Corporation
3+
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
11+
# The above copyright notice and this permission notice shall be included in all
12+
# copies or substantial portions of the Software.
13+
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
# SOFTWARE.
21+
22+
"""Internal class for processing change feed implementation in the Azure Cosmos
23+
database service.
24+
"""
25+
import base64
26+
import json
27+
from abc import ABC, abstractmethod
28+
from typing import Dict, Any, List, Callable, Tuple, Awaitable, cast
29+
30+
from azure.cosmos import http_constants, exceptions
31+
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromType
32+
from azure.cosmos._change_feed.change_feed_state import ChangeFeedStateV2, ChangeFeedStateVersion
33+
from azure.cosmos.aio import _retry_utility_async
34+
from azure.cosmos.exceptions import CosmosHttpResponseError
35+
36+
# pylint: disable=protected-access
37+
38+
class ChangeFeedFetcher(ABC):
    """Abstract interface for change feed fetchers.

    Concrete fetchers implement :meth:`fetch_next_block` to return one block of
    change feed results per call.
    """

    @abstractmethod
    async def fetch_next_block(self) -> List[Dict[str, Any]]:
        """Returns a block of results.

        :return: List of results.
        :rtype: list
        """
43+
44+
class ChangeFeedFetcherV1(ChangeFeedFetcher):
    """Internal change feed fetcher for version 1 change feed states.

    Used when a partition key range id is supplied, or when the supplied
    continuation token is just a simple etag.
    Please note v1 does not support split or merge.
    """

    def __init__(
            self,
            client,
            resource_link: str,
            feed_options: Dict[str, Any],
            fetch_function: Callable[[Dict[str, Any]], Awaitable[Tuple[List[Dict[str, Any]], Dict[str, Any]]]]
    ) -> None:
        """Capture the client, options and fetch callable, and validate the state version.

        :param client: Client object; its ``_global_endpoint_manager`` is used when fetching.
        :param str resource_link: Link of the resource being read.
        :param feed_options: Feed options; the ``"changeFeedState"`` entry is popped from it here.
        :param fetch_function: Awaitable callable taking the feed options and returning
            a ``(items, response_headers)`` tuple.
        :raises ValueError: If the popped change feed state is not a V1 state.
        """
        self._client = client
        self._feed_options = feed_options

        # The state is taken out of the options here and put back on every fetch.
        state = self._feed_options.pop("changeFeedState")
        self._change_feed_state = state
        if state.version != ChangeFeedStateVersion.V1:
            raise ValueError(f"ChangeFeedFetcherV1 can not handle change feed state version"
                             f" {type(state)}")

        self._resource_link = resource_link
        self._fetch_function = fetch_function

    async def fetch_next_block(self) -> List[Dict[str, Any]]:
        """Returns a block of results.

        The fetch is executed through the async retry utility.

        :return: List of results.
        :rtype: list
        """
        async def fetch_once():
            return await self.fetch_change_feed_items()

        endpoint_manager = self._client._global_endpoint_manager
        return await _retry_utility_async.ExecuteAsync(self._client, endpoint_manager, fetch_once)

    async def fetch_change_feed_items(self) -> List[Dict[str, Any]]:
        """Fetch change feed items, retrying once for a first point-in-time fetch that is empty.

        :return: The fetched items; may be empty.
        :rtype: list
        """
        state = self._change_feed_state
        self._feed_options["changeFeedState"] = state
        state.populate_feed_options(self._feed_options)

        # Only a fetch that starts without any continuation is eligible for the extra retry below.
        retry_allowed = state._continuation is None
        while True:
            items, headers = await self._fetch_function(self._feed_options)
            etag_header = http_constants.HttpHeaders.ETag
            # In change feed queries the continuation token is always populated, so whether there
            # is more to read is decided by whether the response carried any items.
            state.apply_server_response_continuation(
                cast(str, headers.get(etag_header)),
                bool(items))

            if items:
                return items

            # When processing from a point in time, no initial results are returned,
            # so fetch once more with the new continuation token.
            from_point_in_time = (
                state._change_feed_start_from.version == ChangeFeedStartFromType.POINT_IN_TIME)
            if not (from_point_in_time and retry_allowed):
                return items
            retry_allowed = False
105+
106+
107+
class ChangeFeedFetcherV2(object):
    """Internal class for change feed fetch v2 implementation.

    Unlike a plain fetch, :meth:`fetch_next_block` reacts to "partition range gone"
    and "partition split or merge" errors by refreshing the change feed state and
    fetching again.
    """

    def __init__(
            self,
            client,
            resource_link: str,
            feed_options: Dict[str, Any],
            fetch_function: Callable[[Dict[str, Any]], Awaitable[Tuple[List[Dict[str, Any]], Dict[str, Any]]]]
    ) -> None:
        """Capture the client, options and fetch callable, and validate the state version.

        :param client: Client object; its ``_global_endpoint_manager`` and
            ``_routing_map_provider`` are used when fetching.
        :param str resource_link: Link of the resource being read.
        :param feed_options: Feed options; the ``"changeFeedState"`` entry is popped from it here.
        :param fetch_function: Awaitable callable taking the feed options and returning
            a ``(items, response_headers)`` tuple.
        :raises ValueError: If the popped change feed state is not a V2 state.
        """
        self._client = client
        self._feed_options = feed_options

        # The state is taken out of the options here and put back on every fetch.
        self._change_feed_state: ChangeFeedStateV2 = self._feed_options.pop("changeFeedState")
        if self._change_feed_state.version != ChangeFeedStateVersion.V2:
            # Report the offending version value itself; its type is always the same enum class.
            raise ValueError(f"ChangeFeedFetcherV2 can not handle change feed state version "
                             f"{self._change_feed_state.version}")

        self._resource_link = resource_link
        self._fetch_function = fetch_function

    async def fetch_next_block(self) -> List[Dict[str, Any]]:
        """Returns a block of results.

        The fetch is executed through the async retry utility. If it fails because the
        partition range is gone or the partition was split or merged, the change feed
        state is refreshed and the fetch is attempted again.

        :return: List of results.
        :rtype: list
        :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: For any other HTTP failure.
        """

        async def callback():
            return await self.fetch_change_feed_items()

        # Loop rather than recurse, so repeated splits/merges do not grow the call stack.
        while True:
            try:
                return await _retry_utility_async.ExecuteAsync(
                    self._client,
                    self._client._global_endpoint_manager,
                    callback)
            except CosmosHttpResponseError as e:
                if not (exceptions._partition_range_is_gone(e) or exceptions._is_partition_split_or_merge(e)):
                    raise
                # refresh change feed state, then go around the loop to fetch again
                await self._change_feed_state.handle_feed_range_gone_async(
                    self._client._routing_map_provider,
                    self._resource_link)

    async def fetch_change_feed_items(self) -> List[Dict[str, Any]]:
        """Fetch change feed items, moving through the state's continuation tokens as needed.

        After each response, the ETag entry of the response headers is overwritten with
        the base64-encoded change feed state.

        :return: The fetched items; may be empty.
        :rtype: list
        """
        self._feed_options["changeFeedState"] = self._change_feed_state

        self._change_feed_state.populate_feed_options(self._feed_options)

        is_s_time_first_fetch = True
        while True:
            (fetched_items, response_headers) = await self._fetch_function(self._feed_options)

            continuation_key = http_constants.HttpHeaders.ETag
            # In change feed queries, the continuation token is always populated. The hasNext() test is whether
            # there are any items in the response or not.
            self._change_feed_state.apply_server_response_continuation(
                cast(str, response_headers.get(continuation_key)),
                bool(fetched_items))

            if fetched_items:
                self._change_feed_state._continuation._move_to_next_token()
                response_headers[continuation_key] = self._get_base64_encoded_continuation()
                break

            # When there are no items being returned, we will decide to retry based on:
            # 1. When processing from point in time, there will be no initial results being returned,
            #    so we will retry with the new continuation token
            # 2. If the feed range of the changeFeedState spans multiple physical partitions
            #    then we will read from the next feed range until we have looped through all physical partitions
            if (self._change_feed_state._change_feed_start_from.version == ChangeFeedStartFromType.POINT_IN_TIME
                    and is_s_time_first_fetch):
                response_headers[continuation_key] = self._get_base64_encoded_continuation()
                is_s_time_first_fetch = False
                should_retry = True
            else:
                self._change_feed_state._continuation._move_to_next_token()
                response_headers[continuation_key] = self._get_base64_encoded_continuation()
                should_retry = self._change_feed_state.should_retry_on_not_modified_response()
                is_s_time_first_fetch = False

            if not should_retry:
                break

        return fetched_items

    def _get_base64_encoded_continuation(self) -> str:
        """Serialize the current change feed state into a base64 string.

        The state's ``to_dict()`` result is dumped to JSON, UTF-8 encoded and base64 encoded.

        :return: The base64 text of the JSON-serialized change feed state.
        :rtype: str
        """
        continuation_json = json.dumps(self._change_feed_state.to_dict())
        json_bytes = continuation_json.encode('utf-8')
        # Encode the bytes to a Base64 string
        base64_bytes = base64.b64encode(json_bytes)
        # Convert the Base64 bytes to a string
        return base64_bytes.decode('utf-8')

0 commit comments

Comments
 (0)