Skip to content

Commit ddd598e

Browse files
author
annie-mac
committed
clean up change feed logic from query pipeline
1 parent cdb6d52 commit ddd598e

File tree

4 files changed

+2
-25
lines changed

4 files changed

+2
-25
lines changed

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1162,7 +1162,6 @@ def _QueryChangeFeed(
11621162
options = {}
11631163
else:
11641164
options = dict(options)
1165-
options["changeFeed"] = True
11661165

11671166
resource_key_map = {"Documents": "docs"}
11681167

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ def __init__(self, client, options):
4444
"""
4545
self._client = client
4646
self._options = options
47-
self._is_change_feed = "changeFeed" in options and options["changeFeed"] is True
4847
self._continuation = self._get_initial_continuation()
4948
self._has_started = False
5049
self._has_finished = False
@@ -117,10 +116,6 @@ async def _fetch_items_helper_no_retries(self, fetch_function):
117116
fetched_items = []
118117
new_options = copy.deepcopy(self._options)
119118
while self._continuation or not self._has_started:
120-
# Check if this is first fetch for read from specific time change feed.
121-
# For read specific time the first fetch will return empty even if we have more pages.
122-
is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started
123-
124119
new_options["continuation"] = self._continuation
125120

126121
response_headers = {}
@@ -129,13 +124,7 @@ async def _fetch_items_helper_no_retries(self, fetch_function):
129124
self._has_started = True
130125

131126
continuation_key = http_constants.HttpHeaders.Continuation
132-
# Use Etag as continuation token for change feed queries.
133-
if self._is_change_feed:
134-
continuation_key = http_constants.HttpHeaders.ETag
135-
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
136-
# there is any items in the response or not.
137-
# No initial fetch for start time change feed, so we need to pass continuation token for first fetch
138-
if not self._is_change_feed or fetched_items or is_s_time_first_fetch:
127+
if fetched_items:
139128
self._continuation = response_headers.get(continuation_key)
140129
else:
141130
self._continuation = None

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ def __init__(self, client, options):
4242
"""
4343
self._client = client
4444
self._options = options
45-
self._is_change_feed = "changeFeed" in options and options["changeFeed"] is True
4645
self._continuation = self._get_initial_continuation()
4746
self._has_started = False
4847
self._has_finished = False
@@ -115,9 +114,6 @@ def _fetch_items_helper_no_retries(self, fetch_function):
115114
fetched_items = []
116115
new_options = copy.deepcopy(self._options)
117116
while self._continuation or not self._has_started:
118-
# Check if this is first fetch for read from specific time change feed.
119-
# For read specific time the first fetch will return empty even if we have more pages.
120-
is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started
121117
if not self._has_started:
122118
self._has_started = True
123119
new_options["continuation"] = self._continuation
@@ -126,13 +122,7 @@ def _fetch_items_helper_no_retries(self, fetch_function):
126122
(fetched_items, response_headers) = fetch_function(new_options)
127123

128124
continuation_key = http_constants.HttpHeaders.Continuation
129-
# Use Etag as continuation token for change feed queries.
130-
if self._is_change_feed:
131-
continuation_key = http_constants.HttpHeaders.ETag
132-
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
133-
# there is any items in the response or not.
134-
# For start time however we get no initial results, so we need to pass continuation token
135-
if not self._is_change_feed or fetched_items or is_s_time_first_fetch:
125+
if fetched_items:
136126
self._continuation = response_headers.get(continuation_key)
137127
else:
138128
self._continuation = None

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2277,7 +2277,6 @@ def _QueryChangeFeed(
22772277
options = {}
22782278
else:
22792279
options = dict(options)
2280-
options["changeFeed"] = True
22812280

22822281
resource_key_map = {"Documents": "docs"}
22832282

0 commit comments

Comments
 (0)