Skip to content

Commit 18981ce

Browse files
simorenohannatischgahl-levytjprescott
authored
[Cosmos] split proof queries sync client (#22237)
* initial commit * Client Constructor (#20310) * Removed some stuff * Looking at constructors * Updated request * Added client close * working client creation Co-authored-by: simorenoh <[email protected]> * read database database read works, but ignored exception is returned: Fatal error on SSL transport NoneType has no attribute 'send' (_loop._proactor.send) RuntimeError: Event loop is closed Unclosed connector/ connection * Update simon_testfile.py * with coroutine Added methods needed to use async with when initializing client, but logs output "Exception ignored... Runtime Error: Event loop is closed" * Update simon_testfile.py * small changes * async with returns no exceptions * async read container * async item read * cleaning up * create item/ database methods * item delete working * docs replace functionality missing upsert and other resources * upsert functionality missing read_all_items and both query methods for container class * missing query methods * CRUD for udf, sproc, triggers * initial query logic + container methods * missing some execution logic and tests * oops * fully working queries * small fix to query_items() also fixed README and added examples_async * Update _cosmos_client_connection_async.py * Update _cosmos_client_connection.py * documentation update * updated MIT dates and get_user_client() description * Update CHANGELOG.md * Delete simon_testfile.py * leftover retry utility * Update README.md * docs and removed six package * changes based on comments still missing discussion resolution on SSL verification and tests for async functionality under test module (apart from samples which are basically end to end tests) * small change in type hints * updated readme * fixes based on conversations * added missing type comments * update changelog for ci pipeline * added typehints, moved params into keywords, added decorators, made _connection_policy private * changes based on sync with central sdk * remove is_system_key from scripts (only used in execute_sproc) is_system_key verifies that an empty partition key is properly dealt with if ['partitionKey']['systemKey'] exists in the container options - however, we do not allow containers to be created with empty partition key values in the python sdk, so the functionality is needless * Revert "remove is_system_key from scripts (only used in execute_sproc)" Reverting last commit, will find way to init is_system_key for now * async script proxy using composition * pylint * capitalized constants * Apply suggestions from code review Clarifying comments for README Co-authored-by: Gahl Levy <[email protected]> * closing python code snippet * last doc updates * Update sdk/cosmos/azure-cosmos/CHANGELOG.md Co-authored-by: Simon Moreno <[email protected]> * version update * cosmos updates for release * current state gone_retry_policy might end up being unneccesary, based on what we feel is best from an arch standpoint * working split proof, need to remove prints * improving comments and removing print statements * removed last prints and used constants * Update CHANGELOG.md * small fixes based on comments * addressed more comments * added test, made slight changes * rename test and small changes * pylint * pylintpylintpylint * moved partition_range_gone check to exceptions since makes more sense * re use code Co-authored-by: annatisch <[email protected]> Co-authored-by: Gahl Levy <[email protected]> Co-authored-by: Travis Prescott <[email protected]>
1 parent f94809d commit 18981ce

10 files changed

+246
-7
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
This version and all future versions will require Python 3.6+. Python 2.7 is no longer supported.
66
We will also be removing support for Python 3.6 and will only support Python 3.7+ starting December 2022.
77

8+
#### Features Added
9+
- Added support for split-proof queries for the sync client
10+
811
#### Other Changes
912
- Added async user agent for async client
1013

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2556,6 +2556,11 @@ def _retrieve_partition_key(self, partition_key_parts, document, is_system_key):
25562556

25572557
return partitionKey
25582558

2559+
def refresh_routing_map_provider(self):
2560+
# re-initializes the routing map provider, effectively refreshing the current partition key range cache
2561+
self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)
2562+
2563+
25592564
def _UpdateSessionIfRequired(self, request_headers, response_result, response_headers):
25602565
"""
25612566
Updates session if necessary.

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@
2525

2626
from collections import deque
2727
import copy
28-
from .. import _retry_utility
29-
from .. import http_constants
28+
from .. import _retry_utility, http_constants
3029

3130
# pylint: disable=protected-access
3231

@@ -120,6 +119,7 @@ def _fetch_items_helper_no_retries(self, fetch_function):
120119
self._has_started = True
121120
new_options = copy.deepcopy(self._options)
122121
new_options["continuation"] = self._continuation
122+
123123
(fetched_items, response_headers) = fetch_function(new_options)
124124
continuation_key = http_constants.HttpHeaders.Continuation
125125
# Use Etag as continuation token for change feed queries.

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from azure.cosmos._execution_context.base_execution_context import _QueryExecutionContextBase
2727
from azure.cosmos._execution_context import document_producer
2828
from azure.cosmos._routing import routing_range
29+
from azure.cosmos import exceptions
2930

3031
# pylint: disable=protected-access
3132

@@ -78,8 +79,8 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i
7879
else:
7980
self._document_producer_comparator = document_producer._PartitionKeyRangeDocumentProduerComparator()
8081

81-
# will be a list of (parition_min, partition_max) tuples
82-
targetPartitionRanges = self._get_target_parition_key_range()
82+
# will be a list of (partition_min, partition_max) tuples
83+
targetPartitionRanges = self._get_target_partition_key_range()
8384

8485
targetPartitionQueryExecutionContextList = []
8586
for partitionTargetRange in targetPartitionRanges:
@@ -91,14 +92,20 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i
9192
self._orderByPQ = _MultiExecutionContextAggregator.PriorityQueue()
9293

9394
for targetQueryExContext in targetPartitionQueryExecutionContextList:
94-
9595
try:
9696
# TODO: we can also use more_itertools.peekable to be more python friendly
9797
targetQueryExContext.peek()
9898
# if there are matching results in the target ex range add it to the priority queue
9999

100100
self._orderByPQ.push(targetQueryExContext)
101101

102+
except exceptions.CosmosHttpResponseError as e:
103+
if exceptions.partition_range_is_gone(e):
104+
# repairing document producer context on partition split
105+
self._repair_document_producer()
106+
else:
107+
raise
108+
102109
except StopIteration:
103110
continue
104111

@@ -129,6 +136,36 @@ def fetch_next_block(self):
129136

130137
raise NotImplementedError("You should use pipeline's fetch_next_block.")
131138

139+
def _repair_document_producer(self):
140+
"""Repairs the document producer context by using the re-initialized routing map provider in the client,
141+
which loads in a refreshed partition key range cache to re-create the partition key ranges.
142+
After loading this new cache, the document producers get re-created with the new valid ranges.
143+
"""
144+
# refresh the routing provider to get the newly initialized one post-refresh
145+
self._routing_provider = self._client._routing_map_provider
146+
# will be a list of (partition_min, partition_max) tuples
147+
targetPartitionRanges = self._get_target_partition_key_range()
148+
149+
targetPartitionQueryExecutionContextList = []
150+
for partitionTargetRange in targetPartitionRanges:
151+
# create and add the child execution context for the target range
152+
targetPartitionQueryExecutionContextList.append(
153+
self._createTargetPartitionQueryExecutionContext(partitionTargetRange)
154+
)
155+
156+
self._orderByPQ = _MultiExecutionContextAggregator.PriorityQueue()
157+
158+
for targetQueryExContext in targetPartitionQueryExecutionContextList:
159+
try:
160+
# TODO: we can also use more_itertools.peekable to be more python friendly
161+
targetQueryExContext.peek()
162+
# if there are matching results in the target ex range add it to the priority queue
163+
164+
self._orderByPQ.push(targetQueryExContext)
165+
166+
except StopIteration:
167+
continue
168+
132169
def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range):
133170

134171
rewritten_query = self._partitioned_query_ex_info.get_rewritten_query()
@@ -151,7 +188,7 @@ def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range
151188
self._options,
152189
)
153190

154-
def _get_target_parition_key_range(self):
191+
def _get_target_partition_key_range(self):
155192

156193
query_ranges = self._partitioned_query_ex_info.get_query_ranges()
157194
return self._routing_provider.get_overlapping_ranges(
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# The MIT License (MIT)
2+
# Copyright (c) 2021 Microsoft Corporation
3+
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
11+
# The above copyright notice and this permission notice shall be included in all
12+
# copies or substantial portions of the Software.
13+
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
# SOFTWARE.
21+
22+
"""Internal class for connection reset retry policy implementation in the Azure
23+
Cosmos database service.
24+
"""
25+
26+
27+
# pylint: disable=protected-access
28+
29+
30+
class PartitionKeyRangeGoneRetryPolicy(object):
31+
32+
def __init__(self, client, *args):
33+
self.retry_after_in_milliseconds = 1000
34+
self.refresh_partition_key_range_cache = True
35+
self.args = args
36+
self.client = client
37+
self.exception = None
38+
39+
def ShouldRetry(self, exception):
40+
"""Returns true if should retry based on the passed-in exception.
41+
42+
:param (exceptions.CosmosHttpResponseError instance) exception:
43+
:rtype: boolean
44+
45+
"""
46+
self.exception = exception # needed for pylint
47+
if self.refresh_partition_key_range_cache:
48+
# refresh routing_map_provider to refresh partition key range cache
49+
# make refresh_partition_key_range_cache False to skip this check on subsequent Gone exceptions
50+
self.client.refresh_routing_map_provider()
51+
self.refresh_partition_key_range_cache = False
52+
# return False to raise error to multi_execution_aggregator and repair document producer context
53+
return False

sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
from . import _resource_throttle_retry_policy
3333
from . import _default_retry_policy
3434
from . import _session_retry_policy
35+
from . import _gone_retry_policy
3536
from .http_constants import HttpHeaders, StatusCodes, SubStatusCodes
3637

38+
3739
# pylint: disable=protected-access
3840

3941

@@ -65,6 +67,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
6567
sessionRetry_policy = _session_retry_policy._SessionRetryPolicy(
6668
client.connection_policy.EnableEndpointDiscovery, global_endpoint_manager, *args
6769
)
70+
partition_key_range_gone_retry_policy = _gone_retry_policy.PartitionKeyRangeGoneRetryPolicy(client, *args)
71+
6872
while True:
6973
try:
7074
client_timeout = kwargs.get('timeout')
@@ -97,6 +101,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
97101
and e.sub_status == SubStatusCodes.READ_SESSION_NOTAVAILABLE
98102
):
99103
retry_policy = sessionRetry_policy
104+
elif exceptions.partition_range_is_gone(e):
105+
retry_policy = partition_key_range_gone_retry_policy
100106
else:
101107
retry_policy = defaultRetry_policy
102108

sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def get_overlapping_ranges(self, collection_link, partition_key_ranges):
5555
overlapping partition key ranges.
5656
5757
:param str collection_link: The name of the collection.
58-
:param list partition_key_range: List of partition key range.
58+
:param list partition_key_ranges: List of partition key range.
5959
:return: List of overlapping partition key ranges.
6060
:rtype: list
6161
"""

sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,10 @@ def __init__(self, **kwargs):
7373
self.response = None
7474
self.history = None
7575
super(CosmosClientTimeoutError, self).__init__(message, **kwargs)
76+
77+
78+
def partition_range_is_gone(e):
79+
if (e.status_code == http_constants.StatusCodes.GONE
80+
and e.sub_status == http_constants.SubStatusCodes.PARTITION_KEY_RANGE_GONE):
81+
return True
82+
return False

sdk/cosmos/azure-cosmos/test/test_config.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,16 @@ def create_database_if_not_exist(cls, client):
8282
cls.IS_MULTIMASTER_ENABLED = client.get_database_account()._EnableMultipleWritableLocations
8383
return cls.TEST_DATABASE
8484

85+
@classmethod
86+
def create_database_if_not_exist_with_throughput(cls, client, throughput):
87+
# type: (CosmosClient) -> Database
88+
if cls.TEST_DATABASE is not None:
89+
return cls.TEST_DATABASE
90+
cls.try_delete_database(client)
91+
cls.TEST_DATABASE = client.create_database(id=cls.TEST_DATABASE_ID, offer_throughput=throughput)
92+
cls.IS_MULTIMASTER_ENABLED = client.get_database_account()._EnableMultipleWritableLocations
93+
return cls.TEST_DATABASE
94+
8595
@classmethod
8696
def try_delete_database(cls, client):
8797
# type: (CosmosClient) -> None
@@ -119,6 +129,17 @@ def create_multi_partition_collection_with_custom_pk_if_not_exist(cls, client):
119129
cls.remove_all_documents(cls.TEST_COLLECTION_MULTI_PARTITION_WITH_CUSTOM_PK, True)
120130
return cls.TEST_COLLECTION_MULTI_PARTITION_WITH_CUSTOM_PK
121131

132+
@classmethod
133+
def create_collection_no_custom_throughput(cls, client):
134+
# type: (CosmosClient) -> Container
135+
database = cls.create_database_if_not_exist(client)
136+
collection_id = cls.TEST_COLLECTION_SINGLE_PARTITION_ID
137+
138+
document_collection = database.create_container(
139+
id=collection_id,
140+
partition_key=PartitionKey(path="/id"))
141+
return document_collection
142+
122143
@classmethod
123144
def create_collection_with_required_throughput(cls, client, throughput, use_custom_partition_key):
124145
# type: (CosmosClient, int, boolean) -> Container
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# The MIT License (MIT)
2+
# Copyright (c) 2021 Microsoft Corporation
3+
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
11+
# The above copyright notice and this permission notice shall be included in all
12+
# copies or substantial portions of the Software.
13+
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
# SOFTWARE.
21+
22+
import unittest
23+
24+
import azure.cosmos.cosmos_client as cosmos_client
25+
import pytest
26+
import time
27+
import random
28+
import uuid
29+
import test_config
30+
31+
# This test class serves to test partition splits within the query context
32+
33+
pytestmark = pytest.mark.cosmosEmulator
34+
35+
36+
@pytest.mark.usefixtures("teardown")
37+
class TestPartitionSplitQuery(unittest.TestCase):
38+
configs = test_config._test_config
39+
host = configs.host
40+
masterKey = configs.masterKey
41+
throughput = 400
42+
43+
@classmethod
44+
def setUpClass(cls):
45+
cls.client = cosmos_client.CosmosClient(cls.host, cls.masterKey)
46+
cls.database = test_config._test_config.create_database_if_not_exist_with_throughput(cls.client, cls.throughput)
47+
cls.container = test_config._test_config.create_collection_no_custom_throughput(cls.client)
48+
49+
def test_partition_split_query(self):
50+
for i in range(1000):
51+
body = self.get_test_item()
52+
self.container.create_item(body=body)
53+
54+
print("created items, changing offer to 11k and starting queries")
55+
self.database.replace_throughput(11000)
56+
offer_time = time.time()
57+
print("changed offer to 11k")
58+
print("--------------------------------")
59+
print("now starting queries")
60+
61+
self.run_queries(self.container, 1000) # initial check for queries before partition split
62+
print("initial check succeeded, now reading offer until replacing is done")
63+
offer = self.database.read_offer()
64+
while True:
65+
if offer.properties['content'].get('isOfferReplacePending', False):
66+
time.sleep(10)
67+
offer = self.database.read_offer()
68+
else:
69+
print("offer replaced successfully, took around {} seconds".format(time.time() - offer_time))
70+
self.run_queries(self.container, 1000) # check queries work post partition split
71+
print("test over")
72+
self.assertTrue(offer.offer_throughput > self.throughput)
73+
self.client.delete_database(self.configs.TEST_DATABASE_ID)
74+
return
75+
76+
def run_queries(self, container, iterations):
77+
ret_list = list()
78+
for i in range(iterations):
79+
curr = str(random.randint(0, 10))
80+
query = 'SELECT * FROM c WHERE c.attr1=' + curr + ' order by c.attr1'
81+
qlist = list(container.query_items(query=query, enable_cross_partition_query=True))
82+
ret_list.append((curr, qlist))
83+
for ret in ret_list:
84+
curr = ret[0]
85+
if len(ret[1]) != 0:
86+
for results in ret[1]:
87+
attr_number = results['attr1']
88+
assert str(attr_number) == curr # verify that all results match their randomly generated attributes
89+
print("validation succeeded for all query results")
90+
91+
def get_test_item(self):
92+
async_item = {
93+
'id': 'Async_' + str(uuid.uuid4()),
94+
'address': {
95+
'state': 'WA',
96+
'city': 'Redmond',
97+
'street': '1 Microsoft Way'
98+
},
99+
'test_object': True,
100+
'lastName': 'Smith',
101+
'attr1': random.randint(0, 10)
102+
}
103+
return async_item
104+
105+
106+
if __name__ == "__main__":
107+
unittest.main()

0 commit comments

Comments
 (0)