Skip to content

Commit b6c53fb

Browse files
author
annie-mac
committed
refactor
1 parent 7a1a1eb commit b6c53fb

10 files changed

+170
-148
lines changed

sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/change_feed_iterable.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,16 @@ async def _fetch_next(self, *args): # pylint: disable=unused-argument
114114

115115
async def _initialize_change_feed_fetcher(self):
116116
change_feed_state_context = self._options.pop("changeFeedStateContext")
117-
conn_properties = await change_feed_state_context.pop("containerProperties")
117+
conn_properties = await self._options.pop("containerProperties")
118118
if is_key_exists_and_not_none(change_feed_state_context, "partitionKey"):
119119
change_feed_state_context["partitionKey"] = await change_feed_state_context.pop("partitionKey")
120-
121-
pk_properties = conn_properties.get("partitionKey")
122-
partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"])
120+
pk_properties = conn_properties.get("partitionKey")
121+
partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"])
122+
change_feed_state_context["partitionKeyFeedRange"] =\
123+
partition_key_definition._get_epk_range_for_partition_key(change_feed_state_context["partitionKey"])
123124

124125
change_feed_state =\
125-
ChangeFeedState.from_json(self._collection_link, conn_properties["_rid"], partition_key_definition, change_feed_state_context)
126+
ChangeFeedState.from_json(self._collection_link, conn_properties["_rid"], change_feed_state_context)
126127
self._options["changeFeedState"] = change_feed_state
127128

128129
if isinstance(change_feed_state, ChangeFeedStateV1):
@@ -158,5 +159,3 @@ def _validate_change_feed_state_context(self, change_feed_state_context: dict[st
158159
if count > 1:
159160
raise ValueError(
160161
"partition_key_range_id, partition_key, feed_range are exclusive parameters, please only set one of them")
161-
162-

sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/change_feed_state.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from abc import ABC, abstractmethod
3030
from typing import Optional, Union, List, Any
3131

32-
from azure.cosmos import http_constants, PartitionKey
32+
from azure.cosmos import http_constants
3333
from azure.cosmos._change_feed.aio.change_feed_start_from import ChangeFeedStartFromETagAndFeedRange, \
3434
ChangeFeedStartFromInternal
3535
from azure.cosmos._change_feed.aio.composite_continuation_token import CompositeContinuationToken
@@ -64,7 +64,6 @@ def apply_server_response_continuation(self, continuation: str) -> None:
6464
def from_json(
6565
container_link: str,
6666
container_rid: str,
67-
partition_key_definition: PartitionKey,
6867
data: dict[str, Any]):
6968
if is_key_exists_and_not_none(data, "partitionKeyRangeId") or is_key_exists_and_not_none(data, "continuationPkRangeId"):
7069
return ChangeFeedStateV1.from_json(container_link, container_rid, data)
@@ -77,11 +76,11 @@ def from_json(
7776
if version is None:
7877
raise ValueError("Invalid base64 encoded continuation string [Missing version]")
7978
elif version == "V2":
80-
return ChangeFeedStateV2.from_continuation(container_link, container_rid, partition_key_definition, continuation_json)
79+
return ChangeFeedStateV2.from_continuation(container_link, container_rid, continuation_json)
8180
else:
8281
raise ValueError("Invalid base64 encoded continuation string [Invalid version]")
8382
# when there is no continuation token, by default construct ChangeFeedStateV2
84-
return ChangeFeedStateV2.from_initial_state(container_link, container_rid, partition_key_definition, data)
83+
return ChangeFeedStateV2.from_initial_state(container_link, container_rid, data)
8584

8685
class ChangeFeedStateV1(ChangeFeedState):
8786
"""Change feed state v1 implementation. This is used when partition key range id is used or the continuation is just simple _etag
@@ -151,7 +150,6 @@ def __init__(
151150
self,
152151
container_link: str,
153152
container_rid: str,
154-
partition_key_definition: PartitionKey,
155153
feed_range: FeedRange,
156154
change_feed_start_from: ChangeFeedStartFromInternal,
157155
continuation: Optional[FeedRangeCompositeContinuation] = None):
@@ -163,9 +161,10 @@ def __init__(
163161
self._continuation = continuation
164162
if self._continuation is None:
165163
composite_continuation_token_queue = collections.deque()
166-
composite_continuation_token_queue.append(CompositeContinuationToken(
167-
self._feed_range.get_normalized_range(partition_key_definition),
168-
None))
164+
composite_continuation_token_queue.append(
165+
CompositeContinuationToken(
166+
self._feed_range.get_normalized_range(),
167+
None))
169168
self._continuation =\
170169
FeedRangeCompositeContinuation(self._container_rid, self._feed_range, composite_continuation_token_queue)
171170

@@ -241,7 +240,6 @@ def from_continuation(
241240
cls,
242241
container_link: str,
243242
container_rid: str,
244-
partition_key_definition: PartitionKey,
245243
continuation_json: dict[str, Any]) -> 'ChangeFeedStateV2':
246244

247245
container_rid_from_continuation = continuation_json.get(ChangeFeedStateV2.container_rid_property_name)
@@ -262,8 +260,7 @@ def from_continuation(
262260
return ChangeFeedStateV2(
263261
container_link=container_link,
264262
container_rid=container_rid,
265-
partition_key_definition=partition_key_definition,
266-
feed_range=continuation.feed_range,
263+
feed_range=continuation._feed_range,
267264
change_feed_start_from=change_feed_start_from,
268265
continuation=continuation)
269266

@@ -272,15 +269,17 @@ def from_initial_state(
272269
cls,
273270
container_link: str,
274271
collection_rid: str,
275-
partition_key_definition: PartitionKey,
276272
data: dict[str, Any]) -> 'ChangeFeedStateV2':
277273

278274
if is_key_exists_and_not_none(data, "feedRange"):
279275
feed_range_str = base64.b64decode(data["feedRange"]).decode('utf-8')
280276
feed_range_json = json.loads(feed_range_str)
281277
feed_range = FeedRangeEpk(Range.ParseFromDict(feed_range_json))
282278
elif is_key_exists_and_not_none(data, "partitionKey"):
283-
feed_range = FeedRangePartitionKey(data["partitionKey"])
279+
if is_key_exists_and_not_none(data, "partitionKeyFeedRange"):
280+
feed_range = FeedRangePartitionKey(data["partitionKey"], data["partitionKeyFeedRange"])
281+
else:
282+
raise ValueError("partitionKey is in the changeFeedStateContext, but missing partitionKeyFeedRange")
284283
else:
285284
# default to full range
286285
feed_range = FeedRangeEpk(
@@ -294,7 +293,6 @@ def from_initial_state(
294293
return cls(
295294
container_link=container_link,
296295
container_rid=collection_rid,
297-
partition_key_definition=partition_key_definition,
298296
feed_range=feed_range,
299297
change_feed_start_from=change_feed_start_from,
300298
continuation=None)

sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/aio/feed_range_composite_continuation_token.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,6 @@ def __init__(
5656
def current_token(self):
5757
return self._current_token
5858

59-
def get_feed_range(self) -> FeedRange:
60-
if isinstance(self._feed_range, FeedRangeEpk):
61-
return FeedRangeEpk(self.current_token.feed_range)
62-
else:
63-
return self._feed_range
64-
6559
def to_dict(self) -> dict[str, Any]:
6660
json_data = {
6761
self._version_property_name: "v2",
@@ -93,16 +87,17 @@ def from_json(cls, data) -> 'FeedRangeCompositeContinuation':
9387

9488
# parsing feed range
9589
if is_key_exists_and_not_none(data, FeedRangeEpk.type_property_name):
96-
feed_range = FeedRangeEpk.from_json({ FeedRangeEpk.type_property_name: data[FeedRangeEpk.type_property_name] })
90+
feed_range = FeedRangeEpk.from_json(data)
9791
elif is_key_exists_and_not_none(data, FeedRangePartitionKey.type_property_name):
98-
feed_range = FeedRangePartitionKey.from_json({ FeedRangePartitionKey.type_property_name: data[FeedRangePartitionKey.type_property_name] })
92+
feed_range =\
93+
FeedRangePartitionKey.from_json(data, continuation[0].feed_range)
9994
else:
10095
raise ValueError("Invalid feed range composite continuation token [Missing feed range scope]")
10196

10297
return cls(container_rid=container_rid, feed_range=feed_range, continuation=deque(continuation))
10398

10499
async def handle_feed_range_gone(self, routing_provider: SmartRoutingMapProvider, collection_link: str) -> None:
105-
overlapping_ranges = await routing_provider.get_overlapping_ranges(collection_link, self._current_token.feed_range)
100+
overlapping_ranges = await routing_provider.get_overlapping_ranges(collection_link, [self._current_token.feed_range])
106101

107102
if len(overlapping_ranges) == 1:
108103
# merge,reusing the existing the feedRange and continuationToken

sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_fetcher.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ def __init__(
115115
self._change_feed_state = self._feed_options.pop("changeFeedState")
116116
if not isinstance(self._change_feed_state, ChangeFeedStateV2):
117117
raise ValueError(f"ChangeFeedFetcherV2 can not handle change feed state version {type(self._change_feed_state)}")
118-
self._change_feed_state.__class__ = ChangeFeedStateV2
119118

120119
self._resource_link = resource_link
121120
self._fetch_function = fetch_function

sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_iterable.py

Lines changed: 70 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
from azure.cosmos._change_feed.change_feed_fetcher import ChangeFeedFetcherV1, ChangeFeedFetcherV2
2828
from azure.cosmos._change_feed.change_feed_state import ChangeFeedStateV1, ChangeFeedState
29-
from azure.cosmos._utils import is_base64_encoded
29+
from azure.cosmos._utils import is_base64_encoded, is_key_exists_and_not_none
3030

3131

3232
class ChangeFeedIterable(PageIterator):
@@ -57,40 +57,29 @@ def __init__(
5757
self._options = options
5858
self._fetch_function = fetch_function
5959
self._collection_link = collection_link
60+
self._change_feed_fetcher = None
6061

61-
change_feed_state = self._options.get("changeFeedState")
62-
if not change_feed_state:
63-
raise ValueError("Missing changeFeedState in feed options")
62+
if not is_key_exists_and_not_none(self._options, "changeFeedStateContext"):
63+
raise ValueError("Missing changeFeedStateContext in feed options")
6464

65-
if isinstance(change_feed_state, ChangeFeedStateV1):
66-
if continuation_token:
67-
if is_base64_encoded(continuation_token):
68-
raise ValueError("Incompatible continuation token")
69-
else:
70-
change_feed_state.apply_server_response_continuation(continuation_token)
65+
change_feed_state_context = self._options.pop("changeFeedStateContext")
66+
continuation = continuation_token if continuation_token is not None else change_feed_state_context.pop("continuation", None)
7167

72-
self._change_feed_fetcher = ChangeFeedFetcherV1(
73-
self._client,
74-
self._collection_link,
75-
self._options,
76-
fetch_function
77-
)
78-
else:
79-
if continuation_token:
80-
if not is_base64_encoded(continuation_token):
81-
raise ValueError("Incompatible continuation token")
68+
# analysis and validate continuation token
69+
# there are two types of continuation token we support currently:
70+
# v1 version: the continuation token would just be the _etag,
71+
# which is being returned when customer is using partition_key_range_id,
72+
# which is under deprecation and does not support split/merge
73+
# v2 version: the continuation token will be base64 encoded composition token which includes full change feed state
74+
if continuation is not None:
75+
if is_base64_encoded(continuation):
76+
change_feed_state_context["continuationFeedRange"] = continuation
77+
else:
78+
change_feed_state_context["continuationPkRangeId"] = continuation
8279

83-
effective_change_feed_context = {"continuationFeedRange": continuation_token}
84-
effective_change_feed_state = ChangeFeedState.from_json(change_feed_state.container_rid, effective_change_feed_context)
85-
# replace with the effective change feed state
86-
self._options["continuationFeedRange"] = effective_change_feed_state
80+
self._validate_change_feed_state_context(change_feed_state_context)
81+
self._options["changeFeedStateContext"] = change_feed_state_context
8782

88-
self._change_feed_fetcher = ChangeFeedFetcherV2(
89-
self._client,
90-
self._collection_link,
91-
self._options,
92-
fetch_function
93-
)
9483
super(ChangeFeedIterable, self).__init__(self._fetch_next, self._unpack, continuation_token=continuation_token)
9584

9685
def _unpack(self, block):
@@ -112,7 +101,58 @@ def _fetch_next(self, *args): # pylint: disable=unused-argument
112101
:return: List of results.
113102
:rtype: list
114103
"""
104+
105+
if self._change_feed_fetcher is None:
106+
self._initialize_change_feed_fetcher()
107+
115108
block = self._change_feed_fetcher.fetch_next_block()
116109
if not block:
117110
raise StopIteration
118111
return block
112+
113+
def _initialize_change_feed_fetcher(self):
114+
change_feed_state_context = self._options.pop("changeFeedStateContext")
115+
change_feed_state = \
116+
ChangeFeedState.from_json(
117+
self._collection_link,
118+
self._options.get("containerRID"),
119+
change_feed_state_context)
120+
121+
self._options["changeFeedState"] = change_feed_state
122+
123+
if isinstance(change_feed_state, ChangeFeedStateV1):
124+
self._change_feed_fetcher = ChangeFeedFetcherV1(
125+
self._client,
126+
self._collection_link,
127+
self._options,
128+
self._fetch_function
129+
)
130+
else:
131+
self._change_feed_fetcher = ChangeFeedFetcherV2(
132+
self._client,
133+
self._collection_link,
134+
self._options,
135+
self._fetch_function
136+
)
137+
138+
def _validate_change_feed_state_context(self, change_feed_state_context: dict[str, any]) -> None:
139+
140+
if is_key_exists_and_not_none(change_feed_state_context, "continuationPkRangeId"):
141+
# if continuation token is in v1 format, throw exception if feed_range is set
142+
if is_key_exists_and_not_none(change_feed_state_context, "feedRange"):
143+
raise ValueError("feed_range and continuation are incompatible")
144+
elif is_key_exists_and_not_none(change_feed_state_context, "continuationFeedRange"):
145+
# if continuation token is in v2 format, since the token itself contains the full change feed state
146+
# so we will ignore other parameters (including incompatible parameters) if they passed in
147+
pass
148+
else:
149+
# validation when no continuation is passed
150+
exclusive_keys = ["partitionKeyRangeId", "partitionKey", "feedRange"]
151+
count = sum(1 for key in exclusive_keys if
152+
key in change_feed_state_context and change_feed_state_context[key] is not None)
153+
if count > 1:
154+
raise ValueError(
155+
"partition_key_range_id, partition_key, feed_range are exclusive parameters, please only set one of them")
156+
157+
158+

0 commit comments

Comments
 (0)