Skip to content

Commit 2950e20

Browse files
author
annie-mac
committed
merge from main and resolve conflicts
1 parent 682d021 commit 2950e20

27 files changed

+3177
-746
lines changed

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

+2-17
Original file line numberDiff line numberDiff line change
@@ -284,23 +284,8 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
284284
if options.get("disableRUPerMinuteUsage"):
285285
headers[http_constants.HttpHeaders.DisableRUPerMinuteUsage] = options["disableRUPerMinuteUsage"]
286286

287-
if options.get("changeFeed") is True:
288-
# On REST level, change feed is using IfNoneMatch/ETag instead of continuation.
289-
if_none_match_value = None
290-
if options.get("continuation"):
291-
if_none_match_value = options["continuation"]
292-
elif options.get("isStartFromBeginning") and not options["isStartFromBeginning"]:
293-
if_none_match_value = "*"
294-
elif options.get("startTime"):
295-
start_time = options.get("startTime")
296-
headers[http_constants.HttpHeaders.IfModified_since] = start_time
297-
if if_none_match_value:
298-
headers[http_constants.HttpHeaders.IfNoneMatch] = if_none_match_value
299-
300-
headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue
301-
else:
302-
if options.get("continuation"):
303-
headers[http_constants.HttpHeaders.Continuation] = options["continuation"]
287+
if options.get("continuation"):
288+
headers[http_constants.HttpHeaders.Continuation] = options["continuation"]
304289

305290
if options.get("populatePartitionKeyRangeStatistics"):
306291
headers[http_constants.HttpHeaders.PopulatePartitionKeyRangeStatistics] = options[
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# The MIT License (MIT)
2+
# Copyright (c) 2014 Microsoft Corporation
3+
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
11+
# The above copyright notice and this permission notice shall be included in all
12+
# copies or substantial portions of the Software.
13+
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
# SOFTWARE.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# The MIT License (MIT)
2+
# Copyright (c) 2014 Microsoft Corporation
3+
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
11+
# The above copyright notice and this permission notice shall be included in all
12+
# copies or substantial portions of the Software.
13+
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
# SOFTWARE.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
# The MIT License (MIT)
2+
# Copyright (c) 2014 Microsoft Corporation
3+
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
11+
# The above copyright notice and this permission notice shall be included in all
12+
# copies or substantial portions of the Software.
13+
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
# SOFTWARE.
21+
22+
"""Internal class for processing change feed implementation in the Azure Cosmos
23+
database service.
24+
"""
25+
import base64
26+
import copy
27+
import json
28+
from abc import ABC, abstractmethod
29+
30+
from azure.cosmos import http_constants, exceptions
31+
from azure.cosmos._change_feed.aio.change_feed_state import ChangeFeedStateV1, ChangeFeedStateV2
32+
from azure.cosmos.aio import _retry_utility_async
33+
from azure.cosmos.exceptions import CosmosHttpResponseError
34+
35+
36+
class ChangeFeedFetcher(ABC):
37+
38+
@abstractmethod
39+
async def fetch_next_block(self):
40+
pass
41+
42+
class ChangeFeedFetcherV1(ChangeFeedFetcher):
43+
"""Internal class for change feed fetch v1 implementation.
44+
This is used when partition key range id is used or when the supplied continuation token is in just simple etag.
45+
Please note v1 does not support split or merge.
46+
47+
"""
48+
def __init__(
49+
self,
50+
client,
51+
resource_link: str,
52+
feed_options: dict[str, any],
53+
fetch_function):
54+
55+
self._client = client
56+
self._feed_options = feed_options
57+
58+
self._change_feed_state = self._feed_options.pop("changeFeedState")
59+
if not isinstance(self._change_feed_state, ChangeFeedStateV1):
60+
raise ValueError(f"ChangeFeedFetcherV1 can not handle change feed state version {type(self._change_feed_state)}")
61+
self._change_feed_state.__class__ = ChangeFeedStateV1
62+
63+
self._resource_link = resource_link
64+
self._fetch_function = fetch_function
65+
66+
async def fetch_next_block(self):
67+
"""Returns a block of results.
68+
69+
:return: List of results.
70+
:rtype: list
71+
"""
72+
async def callback():
73+
return await self.fetch_change_feed_items(self._fetch_function)
74+
75+
return await _retry_utility_async.ExecuteAsync(self._client, self._client._global_endpoint_manager, callback)
76+
77+
async def fetch_change_feed_items(self, fetch_function) -> list[dict[str, any]]:
78+
new_options = copy.deepcopy(self._feed_options)
79+
new_options["changeFeedState"] = self._change_feed_state
80+
81+
self._change_feed_state.populate_feed_options(new_options)
82+
is_s_time_first_fetch = True
83+
while True:
84+
(fetched_items, response_headers) = await fetch_function(new_options)
85+
continuation_key = http_constants.HttpHeaders.ETag
86+
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
87+
# there is any items in the response or not.
88+
# For start time however we get no initial results, so we need to pass continuation token? Is this true?
89+
self._change_feed_state.apply_server_response_continuation(
90+
response_headers.get(continuation_key))
91+
92+
if fetched_items:
93+
break
94+
elif is_s_time_first_fetch:
95+
is_s_time_first_fetch = False
96+
else:
97+
break
98+
return fetched_items
99+
100+
101+
class ChangeFeedFetcherV2(object):
102+
"""Internal class for change feed fetch v2 implementation.
103+
"""
104+
105+
def __init__(
106+
self,
107+
client,
108+
resource_link: str,
109+
feed_options: dict[str, any],
110+
fetch_function):
111+
112+
self._client = client
113+
self._feed_options = feed_options
114+
115+
self._change_feed_state = self._feed_options.pop("changeFeedState")
116+
if not isinstance(self._change_feed_state, ChangeFeedStateV2):
117+
raise ValueError(f"ChangeFeedFetcherV2 can not handle change feed state version {type(self._change_feed_state)}")
118+
self._change_feed_state.__class__ = ChangeFeedStateV2
119+
120+
self._resource_link = resource_link
121+
self._fetch_function = fetch_function
122+
123+
async def fetch_next_block(self):
124+
"""Returns a block of results.
125+
126+
:return: List of results.
127+
:rtype: list
128+
"""
129+
130+
async def callback():
131+
return await self.fetch_change_feed_items(self._fetch_function)
132+
133+
try:
134+
return await _retry_utility_async.ExecuteAsync(self._client, self._client._global_endpoint_manager, callback)
135+
except CosmosHttpResponseError as e:
136+
if exceptions._partition_range_is_gone(e) or exceptions._is_partition_split_or_merge(e):
137+
# refresh change feed state
138+
await self._change_feed_state.handle_feed_range_gone(self._client._routing_map_provider, self._resource_link)
139+
else:
140+
raise e
141+
142+
return await self.fetch_next_block()
143+
144+
async def fetch_change_feed_items(self, fetch_function) -> list[dict[str, any]]:
145+
new_options = copy.deepcopy(self._feed_options)
146+
new_options["changeFeedState"] = self._change_feed_state
147+
148+
self._change_feed_state.populate_feed_options(new_options)
149+
150+
is_s_time_first_fetch = True
151+
while True:
152+
(fetched_items, response_headers) = await fetch_function(new_options)
153+
154+
continuation_key = http_constants.HttpHeaders.ETag
155+
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
156+
# there is any items in the response or not.
157+
# For start time however we get no initial results, so we need to pass continuation token? Is this true?
158+
if fetched_items:
159+
self._change_feed_state.apply_server_response_continuation(
160+
response_headers.get(continuation_key))
161+
response_headers[continuation_key] = self._get_base64_encoded_continuation()
162+
break
163+
else:
164+
self._change_feed_state.apply_not_modified_response()
165+
self._change_feed_state.apply_server_response_continuation(
166+
response_headers.get(continuation_key))
167+
response_headers[continuation_key] = self._get_base64_encoded_continuation()
168+
should_retry = self._change_feed_state.should_retry_on_not_modified_response() or is_s_time_first_fetch
169+
is_s_time_first_fetch = False
170+
if not should_retry:
171+
break
172+
173+
return fetched_items
174+
175+
def _get_base64_encoded_continuation(self) -> str:
176+
continuation_json = json.dumps(self._change_feed_state.to_dict())
177+
json_bytes = continuation_json.encode('utf-8')
178+
# Encode the bytes to a Base64 string
179+
base64_bytes = base64.b64encode(json_bytes)
180+
# Convert the Base64 bytes to a string
181+
return base64_bytes.decode('utf-8')
182+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# The MIT License (MIT)
2+
# Copyright (c) 2014 Microsoft Corporation
3+
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
11+
# The above copyright notice and this permission notice shall be included in all
12+
# copies or substantial portions of the Software.
13+
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
# SOFTWARE.
21+
22+
"""Iterable change feed results in the Azure Cosmos database service.
23+
"""
24+
from azure.core.async_paging import AsyncPageIterator
25+
26+
from azure.cosmos._change_feed.aio.change_feed_fetcher import ChangeFeedFetcherV1, ChangeFeedFetcherV2
27+
from azure.cosmos._change_feed.aio.change_feed_state import ChangeFeedStateV1, ChangeFeedState
28+
from azure.cosmos._utils import is_base64_encoded
29+
30+
31+
class ChangeFeedIterable(AsyncPageIterator):
32+
"""Represents an iterable object of the change feed results.
33+
34+
ChangeFeedIterable is a wrapper for change feed execution.
35+
"""
36+
37+
def __init__(
38+
self,
39+
client,
40+
options,
41+
fetch_function=None,
42+
collection_link=None,
43+
continuation_token=None,
44+
):
45+
"""Instantiates a ChangeFeedIterable for non-client side partitioning queries.
46+
47+
ChangeFeedFetcher will be used as the internal query execution
48+
context.
49+
50+
:param CosmosClient client: Instance of document client.
51+
:param dict options: The request options for the request.
52+
:param method fetch_function:
53+
54+
"""
55+
self._client = client
56+
self.retry_options = client.connection_policy.RetryOptions
57+
self._options = options
58+
self._fetch_function = fetch_function
59+
self._collection_link = collection_link
60+
61+
change_feed_state = self._options.get("changeFeedState")
62+
if not change_feed_state:
63+
raise ValueError("Missing changeFeedState in feed options")
64+
65+
if isinstance(change_feed_state, ChangeFeedStateV1):
66+
if continuation_token:
67+
if is_base64_encoded(continuation_token):
68+
raise ValueError("Incompatible continuation token")
69+
else:
70+
change_feed_state.apply_server_response_continuation(continuation_token)
71+
72+
self._change_feed_fetcher = ChangeFeedFetcherV1(
73+
self._client,
74+
self._collection_link,
75+
self._options,
76+
fetch_function
77+
)
78+
else:
79+
if continuation_token:
80+
if not is_base64_encoded(continuation_token):
81+
raise ValueError("Incompatible continuation token")
82+
83+
effective_change_feed_context = {"continuationFeedRange": continuation_token}
84+
effective_change_feed_state = ChangeFeedState.from_json(change_feed_state.container_rid, effective_change_feed_context)
85+
# replace with the effective change feed state
86+
self._options["continuationFeedRange"] = effective_change_feed_state
87+
88+
self._change_feed_fetcher = ChangeFeedFetcherV2(
89+
self._client,
90+
self._collection_link,
91+
self._options,
92+
fetch_function
93+
)
94+
super(ChangeFeedIterable, self).__init__(self._fetch_next, self._unpack, continuation_token=continuation_token)
95+
96+
async def _unpack(self, block):
97+
continuation = None
98+
if self._client.last_response_headers:
99+
continuation = self._client.last_response_headers.get('etag')
100+
101+
if block:
102+
self._did_a_call_already = False
103+
return continuation, block
104+
105+
async def _fetch_next(self, *args): # pylint: disable=unused-argument
106+
"""Return a block of results with respecting retry policy.
107+
108+
This method only exists for backward compatibility reasons. (Because
109+
QueryIterable has exposed fetch_next_block api).
110+
111+
:param Any args:
112+
:return: List of results.
113+
:rtype: list
114+
"""
115+
block = await self._change_feed_fetcher.fetch_next_block()
116+
if not block:
117+
raise StopAsyncIteration
118+
return block

0 commit comments

Comments
 (0)