Skip to content

Commit b7f6afc

Browse files
simorenohannatischgahl-levytjprescott
authored
[Cosmos] split proof queries async client (#22261)
* initial commit * Client Constructor (#20310) * Removed some stuff * Looking at constructors * Updated request * Added client close * working client creation Co-authored-by: simorenoh <[email protected]> * read database database read works, but ignored exception is returned: Fatal error on SSL transport NoneType has no attribute 'send' (_loop._proactor.send) RuntimeError: Event loop is closed Unclosed connector/ connection * Update simon_testfile.py * with coroutine Added methods needed to use async with when initializing client, but logs output "Exception ignored... Runtime Error: Event loop is closed" * Update simon_testfile.py * small changes * async with returns no exceptions * async read container * async item read * cleaning up * create item/ database methods * item delete working * docs replace functionality missing upsert and other resources * upsert functionality missing read_all_items and both query methods for container class * missing query methods * CRUD for udf, sproc, triggers * initial query logic + container methods * missing some execution logic and tests * oops * fully working queries * small fix to query_items() also fixed README and added examples_async * Update _cosmos_client_connection_async.py * Update _cosmos_client_connection.py * documentation update * updated MIT dates and get_user_client() description * Update CHANGELOG.md * Delete simon_testfile.py * leftover retry utility * Update README.md * docs and removed six package * changes based on comments still missing discussion resolution on SSL verification and tests for async functionality under test module (apart from samples which are basically end to end tests) * small change in type hints * updated readme * fixes based on conversations * added missing type comments * update changelog for ci pipeline * added typehints, moved params into keywords, added decorators, made _connection_policy private * changes based on sync with central sdk * remove is_system_key from scripts (only used in execute_sproc) is_system_key verifies that an empty partition key is properly dealt with if ['partitionKey']['systemKey'] exists in the container options - however, we do not allow containers to be created with empty partition key values in the python sdk, so the functionality is needless * Revert "remove is_system_key from scripts (only used in execute_sproc)" Reverting last commit, will find way to init is_system_key for now * async script proxy using composition * pylint * capitalized constants * Apply suggestions from code review Clarifying comments for README Co-authored-by: Gahl Levy <[email protected]> * closing python code snippet * last doc updates * Update sdk/cosmos/azure-cosmos/CHANGELOG.md Co-authored-by: Simon Moreno <[email protected]> * version update * cosmos updates for release * working split proof for async client, need to remove prints() and make better comments * remove print statements and improve comments * Update CHANGELOG.md * pylint * address Annie's comments on sync split proof * parity with sync client * async comparing of document producers * removed unneeded logic/imports, made compares async for pylint attempt * spelling mistake and making private * making sync client private too * Update CHANGELOG.md * Delete test_axq.py Co-authored-by: annatisch <[email protected]> Co-authored-by: Gahl Levy <[email protected]> Co-authored-by: Travis Prescott <[email protected]>
1 parent ead9b83 commit b7f6afc

13 files changed

+155
-37
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22

33
### 4.3.0b3 (Unreleased)
44

5+
#### Features Added
6+
- Added support for split-proof queries for the async client
7+
58
### Bugs fixed
69
- Default consistency level for the sync and async clients is no longer "Session" and will instead be set to the
710
consistency level of the user's cosmos account setting on initialization if not passed during client initialization.
8-
This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency will need to pass it explicitly if their account consistency is different than `Session`. Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
11+
This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency will need to pass it explicitly if their account consistency is different than `Session`.
12+
Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
913

1014
### 4.3.0b2 (2022-01-25)
1115

@@ -19,7 +23,8 @@ We will also be removing support for Python 3.6 and will only support Python 3.7
1923
- Added async user agent for async client
2024

2125
### 4.3.0b1 (2021-12-14)
22-
**New features**
26+
27+
#### Features Added
2328
- Added language native async i/o client
2429

2530
### 4.2.0 (2020-10-08)
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# The MIT License (MIT)
2+
# Copyright (c) 2022 Microsoft Corporation
3+
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
11+
# The above copyright notice and this permission notice shall be included in all
12+
# copies or substantial portions of the Software.
13+
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20+
# SOFTWARE.
21+
22+
async def heap_push(heap, item, document_producer_comparator):
23+
"""Push item onto heap, maintaining the heap invariant."""
24+
heap.append(item)
25+
await _sift_down(heap, document_producer_comparator, 0, len(heap) - 1)
26+
27+
28+
async def heap_pop(heap, document_producer_comparator):
29+
"""Pop the smallest item off the heap, maintaining the heap invariant."""
30+
last_elt = heap.pop() # raises appropriate IndexError if heap is empty
31+
if heap:
32+
return_item = heap[0]
33+
heap[0] = last_elt
34+
await _sift_up(heap, document_producer_comparator, 0)
35+
return return_item
36+
return last_elt
37+
38+
39+
async def _sift_down(heap, document_producer_comparator, start_pos, pos):
40+
new_item = heap[pos]
41+
# Follow the path to the root, moving parents down until finding a place
42+
# new_item fits.
43+
while pos > start_pos:
44+
parent_pos = (pos - 1) >> 1
45+
parent = heap[parent_pos]
46+
if await document_producer_comparator.compare(new_item, parent) < 0:
47+
# if new_item < parent:
48+
heap[pos] = parent
49+
pos = parent_pos
50+
continue
51+
break
52+
heap[pos] = new_item
53+
54+
55+
async def _sift_up(heap, document_producer_comparator, pos):
56+
end_pos = len(heap)
57+
start_pos = pos
58+
new_item = heap[pos]
59+
# Bubble up the smaller child until hitting a leaf.
60+
child_pos = 2 * pos + 1 # leftmost child position
61+
while child_pos < end_pos:
62+
# Set child_pos to index of smaller child.
63+
right_pos = child_pos + 1
64+
# if right_pos < end_pos and not heap[child_pos] < heap[right_pos]:
65+
if right_pos < end_pos and not await document_producer_comparator.compare(heap[child_pos], heap[right_pos]) < 0:
66+
child_pos = right_pos
67+
# Move the smaller child up.
68+
heap[pos] = heap[child_pos]
69+
pos = child_pos
70+
child_pos = 2 * pos + 1
71+
# The leaf at pos is empty now. Put new_item there, and bubble it up
72+
# to its final resting place (by sifting its parents down).
73+
heap[pos] = new_item
74+
await _sift_down(heap, document_producer_comparator, start_pos, pos)

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ async def _fetch_items_helper_no_retries(self, fetch_function):
121121
self._has_started = True
122122
new_options = copy.deepcopy(self._options)
123123
new_options["continuation"] = self._continuation
124+
124125
(fetched_items, response_headers) = await fetch_function(new_options)
125126
continuation_key = http_constants.HttpHeaders.Continuation
126127
# Use Etag as continuation token for change feed queries.

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/document_producer.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,6 @@ async def fetch_fn(options):
6262

6363
self._ex_context = _DefaultQueryExecutionContext(client, self._options, fetch_fn)
6464

65-
def __lt__(self, other):
66-
return self._doc_producer_comp.compare(self, other) < 0
67-
6865
async def __aiter__(self):
6966
return self
7067

@@ -104,14 +101,13 @@ async def peek(self):
104101
return self._cur_item
105102

106103

107-
108104
def _compare_helper(a, b):
109105
if a is None and b is None:
110106
return 0
111107
return (a > b) - (a < b)
112108

113109

114-
class _PartitionKeyRangeDocumentProduerComparator(object):
110+
class _PartitionKeyRangeDocumentProducerComparator(object):
115111
"""
116112
Provides a Comparator for document producers using the min value of the
117113
corresponding target partition.
@@ -120,7 +116,7 @@ class _PartitionKeyRangeDocumentProduerComparator(object):
120116
def __init__(self):
121117
pass
122118

123-
def compare(self, doc_producer1, doc_producer2): # pylint: disable=no-self-use
119+
async def compare(self, doc_producer1, doc_producer2): # pylint: disable=no-self-use
124120
return _compare_helper(
125121
doc_producer1.get_target_range()["minInclusive"], doc_producer2.get_target_range()["minInclusive"]
126122
)
@@ -179,7 +175,7 @@ def getTypeStr(orderby_item):
179175
raise TypeError("unknown type" + str(val))
180176

181177
@staticmethod
182-
def compare(orderby_item1, orderby_item2):
178+
async def compare(orderby_item1, orderby_item2):
183179
"""Compare two orderby item pairs.
184180
185181
:param dict orderby_item1:
@@ -213,7 +209,7 @@ def _peek_order_by_items(peek_result):
213209
return peek_result["orderByItems"]
214210

215211

216-
class _OrderByDocumentProducerComparator(_PartitionKeyRangeDocumentProduerComparator):
212+
class _OrderByDocumentProducerComparator(_PartitionKeyRangeDocumentProducerComparator):
217213
"""Provide a Comparator for document producers which respects orderby sort order.
218214
"""
219215

@@ -229,7 +225,7 @@ def __init__(self, sort_order): # pylint: disable=super-init-not-called
229225
"""
230226
self._sort_order = sort_order
231227

232-
def compare(self, doc_producer1, doc_producer2):
228+
async def compare(self, doc_producer1, doc_producer2):
233229
"""Compares the given two instances of DocumentProducers.
234230
235231
Based on the orderby query items and whether the sort order is Ascending
@@ -238,29 +234,29 @@ def compare(self, doc_producer1, doc_producer2):
238234
If the peek results are equal based on the sort order, this comparator
239235
compares the target partition key range of the two DocumentProducers.
240236
241-
:param _DocumentProducer doc_producers1: first instance
242-
:param _DocumentProducer doc_producers2: first instance
237+
:param _DocumentProducer doc_producer1: first instance
238+
:param _DocumentProducer doc_producer2: first instance
243239
:return:
244240
Integer value of compare result.
245-
positive integer if doc_producers1 > doc_producers2
246-
negative integer if doc_producers1 < doc_producers2
241+
positive integer if doc_producer1 > doc_producer2
242+
negative integer if doc_producer1 < doc_producer2
247243
:rtype: int
248244
"""
249245

250-
res1 = _peek_order_by_items(doc_producer1.peek())
251-
res2 = _peek_order_by_items(doc_producer2.peek())
246+
res1 = _peek_order_by_items(await doc_producer1.peek())
247+
res2 = _peek_order_by_items(await doc_producer2.peek())
252248

253249
self._validate_orderby_items(res1, res2)
254250

255251
for i, (elt1, elt2) in enumerate(zip(res1, res2)):
256-
res = _OrderByHelper.compare(elt1, elt2)
252+
res = await _OrderByHelper.compare(elt1, elt2)
257253
if res != 0:
258254
if self._sort_order[i] == "Ascending":
259255
return res
260256
if self._sort_order[i] == "Descending":
261257
return -res
262258

263-
return _PartitionKeyRangeDocumentProduerComparator.compare(self, doc_producer1, doc_producer2)
259+
return await _PartitionKeyRangeDocumentProducerComparator.compare(self, doc_producer1, doc_producer2)
264260

265261
def _validate_orderby_items(self, res1, res2):
266262
if len(res1) != len(res2):

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/execution_dispatcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ async def __anext__(self):
7070
query_to_use = self._query if self._query is not None else "Select * from root r"
7171
query_execution_info = _PartitionedQueryExecutionInfo(await self._client._GetQueryPlanThroughGateway
7272
(query_to_use, self._resource_link))
73-
self._execution_context = self._create_pipelined_execution_context(query_execution_info)
73+
self._execution_context = await self._create_pipelined_execution_context(query_execution_info)
7474
else:
7575
raise e
7676

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/multi_execution_aggregator.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
"""Internal class for multi execution context aggregator implementation in the Azure Cosmos database service.
2323
"""
2424

25-
import heapq
2625
from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase
27-
from azure.cosmos._execution_context.aio import document_producer
26+
from azure.cosmos._execution_context.aio import document_producer, _queue_async_helper
2827
from azure.cosmos._routing import routing_range
28+
from azure.cosmos import exceptions
2929

3030
# pylint: disable=protected-access
3131

@@ -50,11 +50,11 @@ class PriorityQueue:
5050
def __init__(self):
5151
self._heap = []
5252

53-
def pop(self):
54-
return heapq.heappop(self._heap)
53+
async def pop_async(self, document_producer_comparator):
54+
return await _queue_async_helper.heap_pop(self._heap, document_producer_comparator)
5555

56-
def push(self, item):
57-
heapq.heappush(self._heap, item)
56+
async def push_async(self, item, document_producer_comparator):
57+
await _queue_async_helper.heap_push(self._heap, item, document_producer_comparator)
5858

5959
def peek(self):
6060
return self._heap[0]
@@ -76,7 +76,7 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i
7676
if self._sort_orders:
7777
self._document_producer_comparator = document_producer._OrderByDocumentProducerComparator(self._sort_orders)
7878
else:
79-
self._document_producer_comparator = document_producer._PartitionKeyRangeDocumentProduerComparator()
79+
self._document_producer_comparator = document_producer._PartitionKeyRangeDocumentProducerComparator()
8080

8181
self._orderByPQ = _MultiExecutionContextAggregator.PriorityQueue()
8282

@@ -89,13 +89,13 @@ async def __anext__(self):
8989
"""
9090
if self._orderByPQ.size() > 0:
9191

92-
targetRangeExContext = self._orderByPQ.pop()
92+
targetRangeExContext = await self._orderByPQ.pop_async(self._document_producer_comparator)
9393
res = await targetRangeExContext.__anext__()
9494

9595
try:
9696
# TODO: we can also use more_itertools.peekable to be more python friendly
9797
await targetRangeExContext.peek()
98-
self._orderByPQ.push(targetRangeExContext)
98+
await self._orderByPQ.push_async(targetRangeExContext, self._document_producer_comparator)
9999

100100
except StopAsyncIteration:
101101
pass
@@ -107,6 +107,33 @@ async def fetch_next_block(self):
107107

108108
raise NotImplementedError("You should use pipeline's fetch_next_block.")
109109

110+
async def _repair_document_producer(self):
111+
"""Repairs the document producer context by using the re-initialized routing map provider in the client,
112+
which loads in a refreshed partition key range cache to re-create the partition key ranges.
113+
After loading this new cache, the document producers get re-created with the new valid ranges.
114+
"""
115+
# refresh the routing provider to get the newly initialized one post-refresh
116+
self._routing_provider = self._client._routing_map_provider
117+
# will be a list of (partition_min, partition_max) tuples
118+
targetPartitionRanges = await self._get_target_partition_key_range()
119+
120+
targetPartitionQueryExecutionContextList = []
121+
for partitionTargetRange in targetPartitionRanges:
122+
# create and add the child execution context for the target range
123+
targetPartitionQueryExecutionContextList.append(
124+
self._createTargetPartitionQueryExecutionContext(partitionTargetRange)
125+
)
126+
127+
for targetQueryExContext in targetPartitionQueryExecutionContextList:
128+
try:
129+
# TODO: we can also use more_itertools.peekable to be more python friendly
130+
await targetQueryExContext.peek()
131+
# if there are matching results in the target ex range add it to the priority queue
132+
await self._orderByPQ.push_async(targetQueryExContext, self._document_producer_comparator)
133+
134+
except StopAsyncIteration:
135+
continue
136+
110137
def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range):
111138

112139
rewritten_query = self._partitioned_query_ex_info.get_rewritten_query()
@@ -148,13 +175,19 @@ async def _configure_partition_ranges(self):
148175
)
149176

150177
for targetQueryExContext in targetPartitionQueryExecutionContextList:
151-
152178
try:
153179
# TODO: we can also use more_itertools.peekable to be more python friendly
154180
await targetQueryExContext.peek()
155181
# if there are matching results in the target ex range add it to the priority queue
156182

157-
self._orderByPQ.push(targetQueryExContext)
183+
await self._orderByPQ.push_async(targetQueryExContext, self._document_producer_comparator)
184+
185+
except exceptions.CosmosHttpResponseError as e:
186+
if exceptions._partition_range_is_gone(e):
187+
# repairing document producer context on partition split
188+
await self._repair_document_producer()
189+
else:
190+
raise
158191

159192
except StopAsyncIteration:
160193
continue

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i
100100
self._orderByPQ.push(targetQueryExContext)
101101

102102
except exceptions.CosmosHttpResponseError as e:
103-
if exceptions.partition_range_is_gone(e):
103+
if exceptions._partition_range_is_gone(e):
104104
# repairing document producer context on partition split
105105
self._repair_document_producer()
106106
else:

sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
101101
and e.sub_status == SubStatusCodes.READ_SESSION_NOTAVAILABLE
102102
):
103103
retry_policy = sessionRetry_policy
104-
elif exceptions.partition_range_is_gone(e):
104+
elif exceptions._partition_range_is_gone(e):
105105
retry_policy = partition_key_range_gone_retry_policy
106106
else:
107107
retry_policy = defaultRetry_policy

sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ async def get_overlapping_ranges(self, collection_link, partition_key_ranges):
5454
overlapping partition key ranges.
5555
5656
:param str collection_link: The name of the collection.
57-
:param list partition_key_range: List of partition key range.
57+
:param list partition_key_ranges: List of partition key range.
5858
:return: List of overlapping partition key ranges.
5959
:rtype: list
6060
"""
@@ -127,7 +127,7 @@ def _subtract_range(r, partition_key_range):
127127

128128
class SmartRoutingMapProvider(PartitionKeyRangeCache):
129129
"""
130-
Efficiently uses PartitionKeyRangeCach and minimizes the unnecessary
130+
Efficiently uses PartitionKeyRangeCache and minimizes the unnecessary
131131
invocation of CollectionRoutingMap.get_overlapping_ranges()
132132
"""
133133

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2477,6 +2477,10 @@ def _retrieve_partition_key(self, partition_key_parts, document, is_system_key):
24772477

24782478
return partitionKey
24792479

2480+
def refresh_routing_map_provider(self):
2481+
# re-initializes the routing map provider, effectively refreshing the current partition key range cache
2482+
self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)
2483+
24802484
async def _GetQueryPlanThroughGateway(self, query, resource_link, **kwargs):
24812485
supported_query_features = (documents._QueryFeature.Aggregate + "," +
24822486
documents._QueryFeature.CompositeAggregate + "," +

0 commit comments

Comments
 (0)