-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PYTHON-4542 Improved sessions API #2335
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 9 commits
ebb1d62
e090035
de89b23
85add85
be283e8
89641f5
657fcc2
8e008ed
0f9e93a
3c68a70
71eca08
55e833f
c36e9df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -139,6 +139,7 @@ | |
import time | ||
import uuid | ||
from collections.abc import Mapping as _Mapping | ||
from contextvars import ContextVar | ||
from typing import ( | ||
TYPE_CHECKING, | ||
Any, | ||
|
@@ -204,6 +205,7 @@ def __init__( | |
causal_consistency: Optional[bool] = None, | ||
default_transaction_options: Optional[TransactionOptions] = None, | ||
snapshot: Optional[bool] = False, | ||
bind: Optional[bool] = False, | ||
) -> None: | ||
if snapshot: | ||
if causal_consistency: | ||
|
@@ -222,6 +224,7 @@ def __init__( | |
) | ||
self._default_transaction_options = default_transaction_options | ||
self._snapshot = snapshot | ||
self._bind = bind | ||
|
||
@property | ||
def causal_consistency(self) -> bool: | ||
|
@@ -545,9 +548,12 @@ def _check_ended(self) -> None: | |
raise InvalidOperation("Cannot use ended session") | ||
|
||
async def __aenter__(self) -> AsyncClientSession: | ||
self._token = _SESSION.set(self) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should only do this if the bind option is set to True. |
||
return self | ||
|
||
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: | ||
if self._token: | ||
_SESSION.reset(self._token) | ||
await self._end_session(lock=True) | ||
|
||
@property | ||
|
@@ -1065,6 +1071,9 @@ def __copy__(self) -> NoReturn: | |
raise TypeError("A AsyncClientSession cannot be copied, create a new session instead") | ||
|
||
|
||
_SESSION: ContextVar[Optional[AsyncClientSession]] = ContextVar("SESSION", default=None) | ||
|
||
|
||
class _EmptyServerSession: | ||
__slots__ = "dirty", "started_retryable_write" | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,7 +65,7 @@ | |
from pymongo.asynchronous import client_session, database, uri_parser | ||
from pymongo.asynchronous.change_stream import AsyncChangeStream, AsyncClusterChangeStream | ||
from pymongo.asynchronous.client_bulk import _AsyncClientBulk | ||
from pymongo.asynchronous.client_session import _EmptyServerSession | ||
from pymongo.asynchronous.client_session import _SESSION, _EmptyServerSession | ||
from pymongo.asynchronous.command_cursor import AsyncCommandCursor | ||
from pymongo.asynchronous.settings import TopologySettings | ||
from pymongo.asynchronous.topology import Topology, _ErrorContext | ||
|
@@ -1355,13 +1355,18 @@ def _close_cursor_soon( | |
def _start_session(self, implicit: bool, **kwargs: Any) -> AsyncClientSession: | ||
server_session = _EmptyServerSession() | ||
opts = client_session.SessionOptions(**kwargs) | ||
return client_session.AsyncClientSession(self, server_session, opts, implicit) | ||
bind = opts._bind | ||
session = client_session.AsyncClientSession(self, server_session, opts, implicit) | ||
if bind: | ||
_SESSION.set(session) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to be removed. We should only bind in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in 3c68a70 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ShaneHarvey OK to resolve this one? |
||
return session | ||
|
||
def start_session( | ||
self, | ||
causal_consistency: Optional[bool] = None, | ||
default_transaction_options: Optional[client_session.TransactionOptions] = None, | ||
snapshot: Optional[bool] = False, | ||
bind: Optional[bool] = False, | ||
) -> client_session.AsyncClientSession: | ||
"""Start a logical session. | ||
|
||
|
@@ -1384,6 +1389,7 @@ def start_session( | |
causal_consistency=causal_consistency, | ||
default_transaction_options=default_transaction_options, | ||
snapshot=snapshot, | ||
bind=bind, | ||
) | ||
|
||
def _ensure_session( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have to bind/unbind the session in
ClientSession.__enter__/__exit__
. That way the stack of sessions is managed correctly (ie we call_SESSION.reset(token)
). Think about how nested cases will work: