26
26
import copy
27
27
import json
28
28
from abc import ABC , abstractmethod
29
+ from typing import Dict , Any , List
29
30
30
31
from azure .cosmos import http_constants , exceptions
31
- from azure .cosmos ._change_feed .aio .change_feed_state import ChangeFeedStateV1 , ChangeFeedStateV2
32
+ from azure .cosmos ._change_feed .change_feed_start_from import ChangeFeedStartFromPointInTime
33
+ from azure .cosmos ._change_feed .change_feed_state import ChangeFeedStateV1 , ChangeFeedStateV2
32
34
from azure .cosmos .aio import _retry_utility_async
33
35
from azure .cosmos .exceptions import CosmosHttpResponseError
34
36
37
+ # pylint: disable=protected-access
35
38
36
39
class ChangeFeedFetcher (ABC ):
37
40
@@ -49,16 +52,16 @@ def __init__(
49
52
self ,
50
53
client ,
51
54
resource_link : str ,
52
- feed_options : dict [str , any ],
55
+ feed_options : Dict [str , Any ],
53
56
fetch_function ):
54
57
55
58
self ._client = client
56
59
self ._feed_options = feed_options
57
60
58
61
self ._change_feed_state = self ._feed_options .pop ("changeFeedState" )
59
62
if not isinstance (self ._change_feed_state , ChangeFeedStateV1 ):
60
- raise ValueError (f"ChangeFeedFetcherV1 can not handle change feed state version { type ( self . _change_feed_state ) } " )
61
- self ._change_feed_state . __class__ = ChangeFeedStateV1
63
+ raise ValueError (f"ChangeFeedFetcherV1 can not handle change feed state version"
64
+ f" { type ( self ._change_feed_state ) } " )
62
65
63
66
self ._resource_link = resource_link
64
67
self ._fetch_function = fetch_function
@@ -74,24 +77,27 @@ async def callback():
74
77
75
78
return await _retry_utility_async .ExecuteAsync (self ._client , self ._client ._global_endpoint_manager , callback )
76
79
77
- async def fetch_change_feed_items (self , fetch_function ) -> list [ dict [str , any ]]:
80
+ async def fetch_change_feed_items (self , fetch_function ) -> List [ Dict [str , Any ]]:
78
81
new_options = copy .deepcopy (self ._feed_options )
79
82
new_options ["changeFeedState" ] = self ._change_feed_state
80
83
81
84
self ._change_feed_state .populate_feed_options (new_options )
82
- is_s_time_first_fetch = True
85
+ is_s_time_first_fetch = self . _change_feed_state . _continuation is None
83
86
while True :
84
87
(fetched_items , response_headers ) = await fetch_function (new_options )
85
88
continuation_key = http_constants .HttpHeaders .ETag
86
89
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
87
90
# there are any items in the response or not.
88
- # For start time however we get no initial results, so we need to pass continuation token? Is this true?
89
91
self ._change_feed_state .apply_server_response_continuation (
90
92
response_headers .get (continuation_key ))
91
93
92
94
if fetched_items :
93
95
break
94
- elif is_s_time_first_fetch :
96
+
97
+ # When processing from a point in time, no initial results are returned,
98
+ # so we retry with the new continuation token.
99
+ if (isinstance (self ._change_feed_state ._change_feed_start_from , ChangeFeedStartFromPointInTime )
100
+ and is_s_time_first_fetch ):
95
101
is_s_time_first_fetch = False
96
102
else :
97
103
break
@@ -106,16 +112,15 @@ def __init__(
106
112
self ,
107
113
client ,
108
114
resource_link : str ,
109
- feed_options : dict [str , any ],
115
+ feed_options : Dict [str , Any ],
110
116
fetch_function ):
111
117
112
118
self ._client = client
113
119
self ._feed_options = feed_options
114
120
115
- self ._change_feed_state = self ._feed_options .pop ("changeFeedState" )
121
+ self ._change_feed_state : ChangeFeedStateV2 = self ._feed_options .pop ("changeFeedState" )
116
122
if not isinstance (self ._change_feed_state , ChangeFeedStateV2 ):
117
123
raise ValueError (f"ChangeFeedFetcherV2 can not handle change feed state version { type (self ._change_feed_state )} " )
118
- self ._change_feed_state .__class__ = ChangeFeedStateV2
119
124
120
125
self ._resource_link = resource_link
121
126
self ._fetch_function = fetch_function
@@ -131,17 +136,22 @@ async def callback():
131
136
return await self .fetch_change_feed_items (self ._fetch_function )
132
137
133
138
try :
134
- return await _retry_utility_async .ExecuteAsync (self ._client , self ._client ._global_endpoint_manager , callback )
139
+ return await _retry_utility_async .ExecuteAsync (
140
+ self ._client ,
141
+ self ._client ._global_endpoint_manager ,
142
+ callback )
135
143
except CosmosHttpResponseError as e :
136
144
if exceptions ._partition_range_is_gone (e ) or exceptions ._is_partition_split_or_merge (e ):
137
145
# refresh change feed state
138
- await self ._change_feed_state .handle_feed_range_gone (self ._client ._routing_map_provider , self ._resource_link )
146
+ await self ._change_feed_state .handle_feed_range_gone_async (
147
+ self ._client ._routing_map_provider ,
148
+ self ._resource_link )
139
149
else :
140
150
raise e
141
151
142
152
return await self .fetch_next_block ()
143
153
144
- async def fetch_change_feed_items (self , fetch_function ) -> list [ dict [str , any ]]:
154
+ async def fetch_change_feed_items (self , fetch_function ) -> List [ Dict [str , Any ]]:
145
155
new_options = copy .deepcopy (self ._feed_options )
146
156
new_options ["changeFeedState" ] = self ._change_feed_state
147
157
@@ -154,19 +164,33 @@ async def fetch_change_feed_items(self, fetch_function) -> list[dict[str, any]]:
154
164
continuation_key = http_constants .HttpHeaders .ETag
155
165
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
156
166
# there are any items in the response or not.
157
- # For start time however we get no initial results, so we need to pass continuation token? Is this true?
158
167
if fetched_items :
159
168
self ._change_feed_state .apply_server_response_continuation (
160
169
response_headers .get (continuation_key ))
161
170
response_headers [continuation_key ] = self ._get_base64_encoded_continuation ()
162
171
break
163
- else :
172
+
173
+ # When no items are returned, we decide whether to retry based on:
174
+ # 1. When processing from point in time, there will be no initial results being returned,
175
+ # so we will retry with the new continuation token
176
+ # 2. if the feed range of the changeFeedState spans multiple physical partitions
177
+ # then we will read from the next feed range until we have looped through all physical partitions
164
178
self ._change_feed_state .apply_not_modified_response ()
165
179
self ._change_feed_state .apply_server_response_continuation (
166
180
response_headers .get (continuation_key ))
167
- response_headers [continuation_key ] = self ._get_base64_encoded_continuation ()
168
- should_retry = self ._change_feed_state .should_retry_on_not_modified_response () or is_s_time_first_fetch
169
- is_s_time_first_fetch = False
181
+
182
+ # TODO: can this part of the logic be simplified?
183
+ if (isinstance (self ._change_feed_state ._change_feed_start_from , ChangeFeedStartFromPointInTime )
184
+ and is_s_time_first_fetch ):
185
+ response_headers [continuation_key ] = self ._get_base64_encoded_continuation ()
186
+ is_s_time_first_fetch = False
187
+ should_retry = True
188
+ else :
189
+ self ._change_feed_state ._continuation ._move_to_next_token ()
190
+ response_headers [continuation_key ] = self ._get_base64_encoded_continuation ()
191
+ should_retry = self ._change_feed_state .should_retry_on_not_modified_response ()
192
+ is_s_time_first_fetch = False
193
+
170
194
if not should_retry :
171
195
break
172
196
0 commit comments