Skip to content

PYTHON-2243 Raise informative error message when attempting a GridFS operation in a transaction #454

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ Version 3.11 adds support for MongoDB 4.4. Highlights include:
:meth:`~pymongo.database.Database.command` to run the ``reIndex`` command
instead.

Unavoidable breaking changes:

- :class:`~gridfs.GridFSBucket` and :class:`~gridfs.GridFS` do not support
multi-document transactions. Running a GridFS operation in a transaction
now always raises the following error:
``InvalidOperation: GridFS does not support multi-document transactions``

.. _validate command: https://docs.mongodb.com/manual/reference/command/validate/

Issues Resolved
Expand Down
25 changes: 18 additions & 7 deletions gridfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
GridOut,
GridOutCursor,
DEFAULT_CHUNK_SIZE,
_clear_entity_type_registry)
_clear_entity_type_registry,
_disallow_transactions)
from pymongo import (ASCENDING,
DESCENDING)
from pymongo.common import UNAUTHORIZED_CODES, validate_string
Expand All @@ -50,6 +51,10 @@ def __init__(self, database, collection="fs", disable_md5=False):
computed for uploaded files. Useful in environments where MD5
cannot be used for regulatory or other reasons. Defaults to False.

.. versionchanged:: 3.11
Running a GridFS operation in a transaction now always raises an
error. GridFS does not support multi-document transactions.

.. versionchanged:: 3.1
Indexes are only ensured on the first write to the DB.

Expand All @@ -68,7 +73,6 @@ def __init__(self, database, collection="fs", disable_md5=False):
raise ConfigurationError('database must use '
'acknowledged write_concern')

self.__database = database
self.__collection = database[collection]
self.__files = self.__collection.files
self.__chunks = self.__collection.chunks
Expand All @@ -88,8 +92,6 @@ def new_file(self, **kwargs):
:Parameters:
- `**kwargs` (optional): keyword arguments for file creation
"""
# No need for __ensure_index_files_id() here; GridIn ensures
# the (files_id, n) index when needed.
return GridIn(
self.__collection, disable_md5=self.__disable_md5, **kwargs)

Expand Down Expand Up @@ -192,6 +194,7 @@ def get_version(self, filename=None, version=-1, session=None, **kwargs):
if filename is not None:
query["filename"] = filename

_disallow_transactions(session)
cursor = self.__files.find(query, session=session)
if version < 0:
skip = abs(version) - 1
Expand Down Expand Up @@ -249,6 +252,7 @@ def delete(self, file_id, session=None):
.. versionchanged:: 3.1
``delete`` no longer ensures indexes.
"""
_disallow_transactions(session)
self.__files.delete_one({"_id": file_id}, session=session)
self.__chunks.delete_many({"files_id": file_id}, session=session)

Expand All @@ -266,6 +270,7 @@ def list(self, session=None):
.. versionchanged:: 3.1
``list`` no longer ensures indexes.
"""
_disallow_transactions(session)
# With an index, distinct includes documents with no filename
# as None.
return [
Expand Down Expand Up @@ -299,6 +304,7 @@ def find_one(self, filter=None, session=None, *args, **kwargs):
if filter is not None and not isinstance(filter, abc.Mapping):
filter = {"_id": filter}

_disallow_transactions(session)
for f in self.find(filter, *args, session=session, **kwargs):
return f

Expand Down Expand Up @@ -403,6 +409,7 @@ def exists(self, document_or_id=None, session=None, **kwargs):
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
_disallow_transactions(session)
if kwargs:
f = self.__files.find_one(kwargs, ["_id"], session=session)
else:
Expand Down Expand Up @@ -439,6 +446,10 @@ def __init__(self, db, bucket_name="fs",
computed for uploaded files. Useful in environments where MD5
cannot be used for regulatory or other reasons. Defaults to False.

.. versionchanged:: 3.11
Running a GridFS operation in a transaction now always raises an
error. GridFSBucket does not support multi-document transactions.

.. versionadded:: 3.1

.. mongodoc:: gridfs
Expand All @@ -452,7 +463,6 @@ def __init__(self, db, bucket_name="fs",
if not wtc.acknowledged:
raise ConfigurationError('write concern must be acknowledged')

self._db = db
self._bucket_name = bucket_name
self._collection = db[bucket_name]
self._disable_md5 = disable_md5
Expand Down Expand Up @@ -746,6 +756,7 @@ def delete(self, file_id, session=None):
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
_disallow_transactions(session)
res = self._files.delete_one({"_id": file_id}, session=session)
self._chunks.delete_many({"files_id": file_id}, session=session)
if not res.deleted_count:
Expand Down Expand Up @@ -839,9 +850,8 @@ def open_download_stream_by_name(self, filename, revision=-1, session=None):
Added ``session`` parameter.
"""
validate_string("filename", filename)

query = {"filename": filename}

_disallow_transactions(session)
cursor = self._files.find(query, session=session)
if revision < 0:
skip = abs(revision) - 1
Expand Down Expand Up @@ -922,6 +932,7 @@ def rename(self, file_id, new_filename, session=None):
.. versionchanged:: 3.6
Added ``session`` parameter.
"""
_disallow_transactions(session)
result = self._files.update_one({"_id": file_id},
{"$set": {"filename": new_filename}},
session=session)
Expand Down
14 changes: 14 additions & 0 deletions gridfs/grid_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from pymongo.errors import (ConfigurationError,
CursorNotFound,
DuplicateKeyError,
InvalidOperation,
OperationFailure)
from pymongo.read_preferences import ReadPreference

Expand Down Expand Up @@ -105,6 +106,12 @@ def _clear_entity_type_registry(entity, **kwargs):
return entity.with_options(codec_options=codecopts, **kwargs)


def _disallow_transactions(session):
if session and session.in_transaction:
raise InvalidOperation(
'GridFS does not support multi-document transactions')


class GridIn(object):
"""Class to write data to GridFS.
"""
Expand Down Expand Up @@ -168,6 +175,7 @@ def __init__(
if not root_collection.write_concern.acknowledged:
raise ConfigurationError('root_collection must use '
'acknowledged write_concern')
_disallow_transactions(session)

# Handle alternative naming
if "content_type" in kwargs:
Expand Down Expand Up @@ -207,6 +215,7 @@ def __create_index(self, collection, index_key, unique):

def __ensure_indexes(self):
if not object.__getattribute__(self, "_ensured_index"):
_disallow_transactions(self._session)
self.__create_index(self._coll.files, _F_INDEX, False)
self.__create_index(self._coll.chunks, _C_INDEX, True)
object.__setattr__(self, "_ensured_index", True)
Expand Down Expand Up @@ -456,6 +465,7 @@ def __init__(self, root_collection, file_id=None, file_document=None,
if not isinstance(root_collection, Collection):
raise TypeError("root_collection must be an "
"instance of Collection")
_disallow_transactions(session)

root_collection = _clear_entity_type_registry(root_collection)

Expand Down Expand Up @@ -483,6 +493,7 @@ def __init__(self, root_collection, file_id=None, file_document=None,

def _ensure_file(self):
if not self._file:
_disallow_transactions(self._session)
self._file = self.__files.find_one({"_id": self.__file_id},
session=self._session)
if not self._file:
Expand Down Expand Up @@ -718,6 +729,7 @@ def _create_cursor(self):
filter = {"files_id": self._id}
if self._next_chunk > 0:
filter["n"] = {"$gte": self._next_chunk}
_disallow_transactions(self._session)
self._cursor = self._chunks.find(filter, sort=[("n", 1)],
session=self._session)

Expand Down Expand Up @@ -810,6 +822,7 @@ def __init__(self, collection, filter=None, skip=0, limit=0,

.. mongodoc:: cursors
"""
_disallow_transactions(session)
collection = _clear_entity_type_registry(collection)

# Hold on to the base "fs" collection to create GridOut objects later.
Expand All @@ -823,6 +836,7 @@ def __init__(self, collection, filter=None, skip=0, limit=0,
def next(self):
"""Get next GridOut object from cursor.
"""
_disallow_transactions(self.session)
# Work around "super is not iterable" issue in Python 3.x
next_file = super(GridOutCursor, self).next()
return GridOut(self.__root_collection, file_document=next_file,
Expand Down
50 changes: 48 additions & 2 deletions test/test_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@

sys.path[0:0] = [""]

from bson.py3compat import StringIO

from pymongo import client_session, WriteConcern
from pymongo.client_session import TransactionOptions
from pymongo.errors import (CollectionInvalid,
ConfigurationError,
ConnectionFailure,
InvalidOperation,
OperationFailure)
from pymongo.operations import IndexModel, InsertOne
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference

from gridfs import GridFS, GridFSBucket

from test import unittest, client_context
from test.utils import (rs_client, single_client,
wait_until, OvertCommandListener,
Expand Down Expand Up @@ -215,8 +220,7 @@ def test_unpin_for_non_transaction_operation(self):
@client_context.require_transactions
@client_context.require_version_min(4, 3, 4)
def test_create_collection(self):
client = rs_client()
self.addCleanup(client.close)
client = client_context.client
db = client.pymongo_test
coll = db.test_create_collection
self.addCleanup(coll.drop)
Expand All @@ -241,6 +245,48 @@ def create_and_insert(session):
db.create_collection(coll.name, session=s)
self.assertEqual(ctx.exception.code, 48) # NamespaceExists

@client_context.require_transactions
def test_gridfs_does_not_support_transactions(self):
client = client_context.client
db = client.pymongo_test
gfs = GridFS(db)
bucket = GridFSBucket(db)

def gridfs_find(*args, **kwargs):
return gfs.find(*args, **kwargs).next()

def gridfs_open_upload_stream(*args, **kwargs):
bucket.open_upload_stream(*args, **kwargs).write(b'1')

gridfs_ops = [
(gfs.put, (b'123',)),
(gfs.get, (1,)),
(gfs.get_version, ('name',)),
(gfs.get_last_version, ('name',)),
(gfs.delete, (1, )),
(gfs.list, ()),
(gfs.find_one, ()),
(gridfs_find, ()),
(gfs.exists, ()),
(gridfs_open_upload_stream, ('name',)),
(bucket.upload_from_stream, ('name', b'data',)),
(bucket.download_to_stream, (1, StringIO(),)),
(bucket.download_to_stream_by_name, ('name', StringIO(),)),
(bucket.delete, (1,)),
(bucket.find, ()),
(bucket.open_download_stream, (1,)),
(bucket.open_download_stream_by_name, ('name',)),
(bucket.rename, (1, 'new-name',)),
]

with client.start_session() as s, s.start_transaction():
for op, args in gridfs_ops:
with self.assertRaisesRegex(
InvalidOperation,
'GridFS does not support multi-document transactions',
):
op(*args, session=s)


class PatchSessionTimeout(object):
"""Patches the client_session's with_transaction timeout for testing."""
Expand Down