Skip to content

Commit cfb30e9

Browse files
committed
PYTHON-1362 - Add find/aggregate_raw_batches()
Rename find_raw to find_raw_batches, and add aggregate_raw_batches. Rename RawBSONCursor and RawBSONCommandCursor to RawBatchCursor and RawBatchCommandCursor.
1 parent 4957589 commit cfb30e9

File tree

8 files changed

+304
-128
lines changed

8 files changed

+304
-128
lines changed

Diff for: doc/api/pymongo/cursor.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,4 @@
2424

2525
.. automethod:: __getitem__
2626

27-
.. autoclass:: pymongo.cursor.RawBSONCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, modifiers=None, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None)
27+
.. autoclass:: pymongo.cursor.RawBatchCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, modifiers=None, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None)

Diff for: doc/changelog.rst

+6
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ This version drops support for MongoDB versions older than 2.6. If connecting to
88
a MongoDB 2.4 server or older, PyMongo now throws a
99
:exc:`~pymongo.errors.ConfigurationError`.
1010

11+
Highlights include:
12+
13+
- New methods :meth:`~pymongo.collection.Collection.find_raw_batches` and
14+
:meth:`~pymongo.collection.Collection.aggregate_raw_batches` for use with
15+
external libraries that can parse raw batches of BSON data.
16+
1117
Changes in Version 3.5.1
1218
------------------------
1319

Diff for: pymongo/collection.py

+92-61
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
helpers,
3131
message)
3232
from pymongo.bulk import BulkOperationBuilder, _Bulk
33-
from pymongo.command_cursor import CommandCursor
33+
from pymongo.command_cursor import CommandCursor, RawBatchCommandCursor
3434
from pymongo.collation import validate_collation_or_none
35-
from pymongo.cursor import Cursor, RawBSONCursor
35+
from pymongo.cursor import Cursor, RawBatchCursor
3636
from pymongo.errors import ConfigurationError, InvalidName, OperationFailure
3737
from pymongo.helpers import _check_write_command_response
3838
from pymongo.helpers import _UNICODE_REPLACE_CODEC_OPTIONS
@@ -1278,25 +1278,25 @@ def find(self, *args, **kwargs):
12781278
"""
12791279
return Cursor(self, *args, **kwargs)
12801280

1281-
def find_raw(self, *args, **kwargs):
1281+
def find_raw_batches(self, *args, **kwargs):
12821282
"""Query the database and retrieve batches of raw BSON.
12831283
12841284
Takes the same parameters as :meth:`find` but returns a
1285-
:class:`~pymongo.cursor.RawBSONCursor`.
1285+
:class:`~pymongo.cursor.RawBatchCursor`.
12861286
12871287
This example demonstrates how to work with raw batches, but in practice
12881288
raw batches should be passed to an external library that can decode
12891289
BSON into another data type, rather than used with PyMongo's
12901290
:mod:`bson` module.
12911291
12921292
>>> import bson
1293-
>>> cursor = db.test.find_raw()
1293+
>>> cursor = db.test.find_raw_batches()
12941294
>>> for batch in cursor:
12951295
... print(bson.decode_all(batch))
12961296
12971297
.. versionadded:: 3.6
12981298
"""
1299-
return RawBSONCursor(self, *args, **kwargs)
1299+
return RawBatchCursor(self, *args, **kwargs)
13001300

13011301
def parallel_scan(self, num_cursors, **kwargs):
13021302
"""Scan this entire collection in parallel.
@@ -1821,6 +1821,69 @@ def options(self):
18211821

18221822
return options
18231823

1824+
def _aggregate(self, pipeline, cursor_class, first_batch_size, **kwargs):
1825+
if not isinstance(pipeline, list):
1826+
raise TypeError("pipeline must be a list")
1827+
1828+
if "explain" in kwargs:
1829+
raise ConfigurationError("The explain option is not supported. "
1830+
"Use Database.command instead.")
1831+
collation = validate_collation_or_none(kwargs.pop('collation', None))
1832+
1833+
cmd = SON([("aggregate", self.__name),
1834+
("pipeline", pipeline)])
1835+
1836+
# Remove things that are not command options.
1837+
batch_size = common.validate_non_negative_integer_or_none(
1838+
"batchSize", kwargs.pop("batchSize", None))
1839+
use_cursor = common.validate_boolean(
1840+
"useCursor", kwargs.pop("useCursor", True))
1841+
# If the server does not support the "cursor" option we
1842+
# ignore useCursor and batchSize.
1843+
with self._socket_for_reads() as (sock_info, slave_ok):
1844+
if sock_info.max_wire_version > 0:
1845+
if use_cursor:
1846+
if "cursor" not in kwargs:
1847+
kwargs["cursor"] = {}
1848+
if first_batch_size is not None:
1849+
kwargs["cursor"]["batchSize"] = first_batch_size
1850+
1851+
dollar_out = pipeline and '$out' in pipeline[-1]
1852+
if (sock_info.max_wire_version >= 5 and dollar_out and
1853+
self.write_concern):
1854+
cmd['writeConcern'] = self.write_concern.document
1855+
1856+
cmd.update(kwargs)
1857+
1858+
# Apply this Collection's read concern if $out is not in the
1859+
# pipeline.
1860+
if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd:
1861+
if dollar_out:
1862+
result = self._command(sock_info, cmd, slave_ok,
1863+
parse_write_concern_error=True,
1864+
collation=collation)
1865+
else:
1866+
result = self._command(sock_info, cmd, slave_ok,
1867+
read_concern=self.read_concern,
1868+
collation=collation)
1869+
else:
1870+
result = self._command(sock_info, cmd, slave_ok,
1871+
parse_write_concern_error=dollar_out,
1872+
collation=collation)
1873+
1874+
if "cursor" in result:
1875+
cursor = result["cursor"]
1876+
else:
1877+
# Pre-MongoDB 2.6. Fake a cursor.
1878+
cursor = {
1879+
"id": 0,
1880+
"firstBatch": result["result"],
1881+
"ns": self.full_name,
1882+
}
1883+
1884+
return cursor_class(
1885+
self, cursor, sock_info.address).batch_size(batch_size or 0)
1886+
18241887
def aggregate(self, pipeline, **kwargs):
18251888
"""Perform an aggregation using the aggregation framework on this
18261889
collection.
@@ -1892,66 +1955,34 @@ def aggregate(self, pipeline, **kwargs):
18921955
.. _aggregate command:
18931956
http://docs.mongodb.org/manual/applications/aggregation
18941957
"""
1895-
if not isinstance(pipeline, list):
1896-
raise TypeError("pipeline must be a list")
1897-
1898-
if "explain" in kwargs:
1899-
raise ConfigurationError("The explain option is not supported. "
1900-
"Use Database.command instead.")
1901-
collation = validate_collation_or_none(kwargs.pop('collation', None))
1958+
return self._aggregate(pipeline,
1959+
CommandCursor,
1960+
kwargs.get('batchSize'),
1961+
**kwargs)
19021962

1903-
cmd = SON([("aggregate", self.__name),
1904-
("pipeline", pipeline)])
1905-
1906-
# Remove things that are not command options.
1907-
batch_size = common.validate_positive_integer_or_none(
1908-
"batchSize", kwargs.pop("batchSize", None))
1909-
use_cursor = common.validate_boolean(
1910-
"useCursor", kwargs.pop("useCursor", True))
1911-
# If the server does not support the "cursor" option we
1912-
# ignore useCursor and batchSize.
1913-
with self._socket_for_reads() as (sock_info, slave_ok):
1914-
if sock_info.max_wire_version > 0:
1915-
if use_cursor:
1916-
if "cursor" not in kwargs:
1917-
kwargs["cursor"] = {}
1918-
if batch_size is not None:
1919-
kwargs["cursor"]["batchSize"] = batch_size
1963+
def aggregate_raw_batches(self, pipeline, **kwargs):
1964+
"""Perform an aggregation and retrieve batches of raw BSON.
19201965
1921-
dollar_out = pipeline and '$out' in pipeline[-1]
1922-
if (sock_info.max_wire_version >= 5 and dollar_out and
1923-
self.write_concern):
1924-
cmd['writeConcern'] = self.write_concern.document
1966+
Takes the same parameters as :meth:`aggregate` but returns a
1967+
:class:`~pymongo.command_cursor.RawBatchCommandCursor`.
19251968
1926-
cmd.update(kwargs)
1969+
This example demonstrates how to work with raw batches, but in practice
1970+
raw batches should be passed to an external library that can decode
1971+
BSON into another data type, rather than used with PyMongo's
1972+
:mod:`bson` module.
19271973
1928-
# Apply this Collection's read concern if $out is not in the
1929-
# pipeline.
1930-
if sock_info.max_wire_version >= 4 and 'readConcern' not in cmd:
1931-
if dollar_out:
1932-
result = self._command(sock_info, cmd, slave_ok,
1933-
parse_write_concern_error=True,
1934-
collation=collation)
1935-
else:
1936-
result = self._command(sock_info, cmd, slave_ok,
1937-
read_concern=self.read_concern,
1938-
collation=collation)
1939-
else:
1940-
result = self._command(sock_info, cmd, slave_ok,
1941-
parse_write_concern_error=dollar_out,
1942-
collation=collation)
1974+
>>> import bson
1975+
>>> cursor = db.test.aggregate_raw_batches([
1976+
... {'$project': {'x': {'$multiply': [2, '$x']}}}])
1977+
>>> for batch in cursor:
1978+
... print(bson.decode_all(batch))
19431979
1944-
if "cursor" in result:
1945-
cursor = result["cursor"]
1946-
else:
1947-
# Pre-MongoDB 2.6. Fake a cursor.
1948-
cursor = {
1949-
"id": 0,
1950-
"firstBatch": result["result"],
1951-
"ns": self.full_name,
1952-
}
1953-
return CommandCursor(
1954-
self, cursor, sock_info.address).batch_size(batch_size or 0)
1980+
.. versionadded:: 3.6
1981+
"""
1982+
return self._aggregate(pipeline,
1983+
RawBatchCommandCursor,
1984+
0,
1985+
**kwargs)
19551986

19561987
def group(self, key, condition, initial, reduce, finalize=None, **kwargs):
19571988
"""Perform a query similar to an SQL *group by* operation.

Diff for: pymongo/command_cursor.py

+54-15
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,29 @@
2020

2121
from bson.py3compat import integer_types
2222
from pymongo import helpers
23-
from pymongo.errors import AutoReconnect, NotMasterError, OperationFailure
24-
from pymongo.message import _CursorAddress, _GetMore, _convert_exception
23+
from pymongo.errors import (AutoReconnect,
24+
InvalidOperation,
25+
NotMasterError,
26+
OperationFailure)
27+
from pymongo.message import (_convert_exception,
28+
_CursorAddress,
29+
_GetMore,
30+
_RawBatchGetMore)
2531

2632

2733
class CommandCursor(object):
28-
"""A cursor / iterator over command cursors.
29-
"""
34+
"""A cursor / iterator over command cursors."""
35+
_getmore_class = _GetMore
3036

3137
def __init__(self, collection, cursor_info, address, retrieved=0):
3238
"""Create a new command cursor.
39+
40+
The parameter 'retrieved' is unused.
3341
"""
3442
self.__collection = collection
3543
self.__id = cursor_info['id']
3644
self.__address = address
3745
self.__data = deque(cursor_info['firstBatch'])
38-
self.__retrieved = retrieved
3946
self.__batch_size = 0
4047
self.__killed = (self.__id == 0)
4148

@@ -115,9 +122,9 @@ def __send_message(self, operation):
115122
if publish:
116123
start = datetime.datetime.now()
117124
try:
118-
doc = helpers._unpack_response(response.data,
119-
self.__id,
120-
self.__collection.codec_options)
125+
doc = self._unpack_response(response.data,
126+
self.__id,
127+
self.__collection.codec_options)
121128
if from_command:
122129
helpers._check_command_response(doc['data'][0])
123130

@@ -154,11 +161,9 @@ def __send_message(self, operation):
154161
cursor = doc['data'][0]['cursor']
155162
documents = cursor['nextBatch']
156163
self.__id = cursor['id']
157-
self.__retrieved += len(documents)
158164
else:
159165
documents = doc["data"]
160166
self.__id = doc["cursor_id"]
161-
self.__retrieved += doc["number_returned"]
162167

163168
if publish:
164169
duration = (datetime.datetime.now() - start) + cmd_duration
@@ -174,6 +179,8 @@ def __send_message(self, operation):
174179
self.__killed = True
175180
self.__data = deque(documents)
176181

182+
def _unpack_response(self, response, cursor_id, codec_options):
183+
return helpers._unpack_response(response, cursor_id, codec_options)
177184

178185
def _refresh(self):
179186
"""Refreshes the cursor with more data from the server.
@@ -188,17 +195,21 @@ def _refresh(self):
188195
if self.__id: # Get More
189196
dbname, collname = self.__ns.split('.', 1)
190197
self.__send_message(
191-
_GetMore(dbname,
192-
collname,
193-
self.__batch_size,
194-
self.__id,
195-
self.__collection.codec_options))
198+
self._getmore_class(dbname,
199+
collname,
200+
self.__batch_size,
201+
self.__id,
202+
self.__collection.codec_options))
196203

197204
else: # Cursor id is zero nothing else to return
198205
self.__killed = True
199206

200207
return len(self.__data)
201208

209+
@property
210+
def _collection(self):
211+
return self.__collection
212+
202213
@property
203214
def alive(self):
204215
"""Does this cursor have the potential to return more data?
@@ -247,3 +258,31 @@ def __enter__(self):
247258

248259
def __exit__(self, exc_type, exc_val, exc_tb):
249260
self.close()
261+
262+
263+
class RawBatchCommandCursor(CommandCursor):
264+
_getmore_class = _RawBatchGetMore
265+
266+
def __init__(self, collection, cursor_info, address, retrieved=0):
267+
"""Create a new cursor / iterator over raw batches of BSON data.
268+
269+
Should not be called directly by application developers -
270+
see :meth:`~pymongo.collection.Collection.aggregate_raw_batches`
271+
instead.
272+
273+
.. mongodoc:: cursors
274+
"""
275+
assert not cursor_info.get('firstBatch')
276+
super(RawBatchCommandCursor, self).__init__(
277+
collection, cursor_info, address, retrieved)
278+
279+
db = self._collection.database
280+
if db.outgoing_manipulators or db.outgoing_copying_manipulators:
281+
raise InvalidOperation("Raw batches are not compatible with"
282+
" SON manipulators.")
283+
284+
def _unpack_response(self, response, cursor_id, codec_options):
285+
return helpers._raw_response(response, cursor_id)
286+
287+
def __getitem__(self, index):
288+
raise InvalidOperation("Cannot call __getitem__ on RawBatchCursor")

0 commit comments

Comments
 (0)