Skip to content

Commit 9cb95fe

Browse files
RUST-52 Implement Sessions API (#304)
1 parent 57beb1f commit 9cb95fe

40 files changed

+3031
-1372
lines changed

Diff for: src/client/executor.rs

+16-17
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,27 @@ impl Client {
3636
///
3737
/// Server selection will performed using the criteria specified on the operation, if any, and
3838
/// an implicit session will be created if the operation and write concern are compatible with
39-
/// sessions.
40-
pub(crate) async fn execute_operation<T: Operation>(&self, op: T) -> Result<T::O> {
39+
/// sessions and an explicit session is not provided.
40+
pub(crate) async fn execute_operation<T: Operation>(
41+
&self,
42+
op: T,
43+
session: impl Into<Option<&mut ClientSession>>,
44+
) -> Result<T::O> {
4145
// TODO RUST-9: allow unacknowledged write concerns
4246
if !op.is_acknowledged() {
4347
return Err(ErrorKind::ArgumentError {
4448
message: "Unacknowledged write concerns are not supported".to_string(),
4549
}
4650
.into());
4751
}
48-
let mut implicit_session = self.start_implicit_session(&op).await?;
49-
self.execute_operation_with_retry(op, implicit_session.as_mut())
50-
.await
52+
match session.into() {
53+
Some(session) => self.execute_operation_with_retry(op, Some(session)).await,
54+
None => {
55+
let mut implicit_session = self.start_implicit_session(&op).await?;
56+
self.execute_operation_with_retry(op, implicit_session.as_mut())
57+
.await
58+
}
59+
}
5160
}
5261

5362
/// Execute the given operation, returning the implicit session created for it if one was.
@@ -63,16 +72,6 @@ impl Client {
6372
.map(|result| (result, implicit_session))
6473
}
6574

66-
/// Execute the given operation with the given session.
67-
/// Server selection will performed using the criteria specified on the operation, if any.
68-
pub(crate) async fn execute_operation_with_session<T: Operation>(
69-
&self,
70-
op: T,
71-
session: &mut ClientSession,
72-
) -> Result<T::O> {
73-
self.execute_operation_with_retry(op, Some(session)).await
74-
}
75-
7675
/// Selects a server and executes the given operation on it, optionally using a provided
7776
/// session. Retries the operation upon failure if retryability is supported.
7877
async fn execute_operation_with_retry<T: Operation>(
@@ -324,7 +323,7 @@ impl Client {
324323
SessionSupportStatus::Supported {
325324
logical_session_timeout,
326325
} if op.supports_sessions() && op.is_acknowledged() => Ok(Some(
327-
self.start_implicit_session_with_timeout(logical_session_timeout)
326+
self.start_session_with_timeout(logical_session_timeout, None, true)
328327
.await,
329328
)),
330329
_ => Ok(None),
@@ -334,7 +333,7 @@ impl Client {
334333
/// Gets whether the topology supports sessions, and if so, returns the topology's logical
335334
/// session timeout. If it has yet to be determined if the topology supports sessions, this
336335
/// method will perform a server selection that will force that determination to be made.
337-
async fn get_session_support_status(&self) -> Result<SessionSupportStatus> {
336+
pub(crate) async fn get_session_support_status(&self) -> Result<SessionSupportStatus> {
338337
let initial_status = self.inner.topology.session_support_status().await;
339338

340339
// Need to guarantee that we're connected to at least one server that can determine if

Diff for: src/client/mod.rs

+24-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
pub mod auth;
22
mod executor;
33
pub mod options;
4-
mod session;
4+
pub mod session;
55

66
use std::{sync::Arc, time::Duration};
77

@@ -23,10 +23,12 @@ use crate::{
2323
ListDatabasesOptions,
2424
ReadPreference,
2525
SelectionCriteria,
26+
SessionOptions,
2627
},
2728
sdam::{SelectedServer, SessionSupportStatus, Topology},
29+
ClientSession,
2830
};
29-
pub(crate) use session::{ClientSession, ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
31+
pub(crate) use session::{ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
3032
use session::{ServerSession, ServerSessionPool};
3133

3234
const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
@@ -161,7 +163,7 @@ impl Client {
161163
options: impl Into<Option<ListDatabasesOptions>>,
162164
) -> Result<Vec<Document>> {
163165
let op = ListDatabases::new(filter.into(), false, options.into());
164-
self.execute_operation(op).await
166+
self.execute_operation(op, None).await
165167
}
166168

167169
/// Gets the names of the databases present in the cluster the Client is connected to.
@@ -171,7 +173,7 @@ impl Client {
171173
options: impl Into<Option<ListDatabasesOptions>>,
172174
) -> Result<Vec<String>> {
173175
let op = ListDatabases::new(filter.into(), true, options.into());
174-
match self.execute_operation(op).await {
176+
match self.execute_operation(op, None).await {
175177
Ok(databases) => databases
176178
.into_iter()
177179
.map(|doc| {
@@ -189,6 +191,18 @@ impl Client {
189191
}
190192
}
191193

194+
/// Starts a new `ClientSession`.
195+
pub async fn start_session(&self, options: Option<SessionOptions>) -> Result<ClientSession> {
196+
match self.get_session_support_status().await? {
197+
SessionSupportStatus::Supported {
198+
logical_session_timeout,
199+
} => Ok(self
200+
.start_session_with_timeout(logical_session_timeout, options, false)
201+
.await),
202+
_ => Err(ErrorKind::SessionsNotSupported.into()),
203+
}
204+
}
205+
192206
/// Check in a server session to the server session pool.
193207
/// If the session is expired or dirty, or the topology no longer supports sessions, the session
194208
/// will be discarded.
@@ -210,16 +224,20 @@ impl Client {
210224
/// This method will attempt to re-use server sessions from the pool which are not about to
211225
/// expire according to the provided logical session timeout. If no such sessions are
212226
/// available, a new one will be created.
213-
pub(crate) async fn start_implicit_session_with_timeout(
227+
pub(crate) async fn start_session_with_timeout(
214228
&self,
215229
logical_session_timeout: Duration,
230+
options: Option<SessionOptions>,
231+
is_implicit: bool,
216232
) -> ClientSession {
217-
ClientSession::new_implicit(
233+
ClientSession::new(
218234
self.inner
219235
.session_pool
220236
.check_out(logical_session_timeout)
221237
.await,
222238
self.clone(),
239+
options,
240+
is_implicit,
223241
)
224242
}
225243

Diff for: src/client/options/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -2057,3 +2057,8 @@ mod tests {
20572057
);
20582058
}
20592059
}
2060+
2061+
/// Contains the options that can be used to create a new
2062+
/// [`ClientSession`](../struct.ClientSession.html).
2063+
#[derive(Clone, Debug, Deserialize, TypedBuilder)]
2064+
pub struct SessionOptions {}

Diff for: src/client/session/cluster_time.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::bson::{Document, Timestamp};
1010
#[derive(Debug, Deserialize, Clone, Serialize, Derivative)]
1111
#[derivative(PartialEq, Eq)]
1212
#[serde(rename_all = "camelCase")]
13-
pub(crate) struct ClusterTime {
13+
pub struct ClusterTime {
1414
cluster_time: Timestamp,
1515

1616
#[derivative(PartialEq = "ignore")]

Diff for: src/client/session/mod.rs

+36-9
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use uuid::Uuid;
1313

1414
use crate::{
1515
bson::{doc, spec::BinarySubtype, Binary, Bson, Document},
16+
options::SessionOptions,
1617
Client,
1718
RUNTIME,
1819
};
@@ -28,29 +29,44 @@ lazy_static! {
2829
};
2930
}
3031

31-
/// Session to be used with client operations. This acts as a handle to a server session.
32-
/// This keeps the details of how server sessions are pooled opaque to users.
32+
/// A MongoDB client session. This struct represents a logical session used for ordering sequential
33+
/// operations. To create a `ClientSession`, call `start_session` on a `Client`.
34+
///
35+
/// `ClientSession` instances are not thread safe or fork safe. They can only be used by one thread
36+
/// or process at a time.
3337
#[derive(Debug)]
34-
pub(crate) struct ClientSession {
38+
pub struct ClientSession {
3539
cluster_time: Option<ClusterTime>,
3640
server_session: ServerSession,
3741
client: Client,
3842
is_implicit: bool,
43+
options: Option<SessionOptions>,
3944
}
4045

4146
impl ClientSession {
4247
/// Creates a new `ClientSession` wrapping the provided server session.
43-
pub(crate) fn new_implicit(server_session: ServerSession, client: Client) -> Self {
48+
pub(crate) fn new(
49+
server_session: ServerSession,
50+
client: Client,
51+
options: Option<SessionOptions>,
52+
is_implicit: bool,
53+
) -> Self {
4454
Self {
4555
client,
4656
server_session,
4757
cluster_time: None,
48-
is_implicit: true,
58+
is_implicit,
59+
options,
4960
}
5061
}
5162

63+
/// The client used to create this session.
64+
pub fn client(&self) -> Client {
65+
self.client.clone()
66+
}
67+
5268
/// The id of this session.
53-
pub(crate) fn id(&self) -> &Document {
69+
pub fn id(&self) -> &Document {
5470
&self.server_session.id
5571
}
5672

@@ -61,13 +77,18 @@ impl ClientSession {
6177

6278
/// The highest seen cluster time this session has seen so far.
6379
/// This will be `None` if this session has not been used in an operation yet.
64-
pub(crate) fn cluster_time(&self) -> Option<&ClusterTime> {
80+
pub fn cluster_time(&self) -> Option<&ClusterTime> {
6581
self.cluster_time.as_ref()
6682
}
6783

84+
/// The options used to create this session.
85+
pub fn options(&self) -> Option<&SessionOptions> {
86+
self.options.as_ref()
87+
}
88+
6889
/// Set the cluster time to the provided one if it is greater than this session's highest seen
6990
/// cluster time or if this session's cluster time is `None`.
70-
pub(crate) fn advance_cluster_time(&mut self, to: &ClusterTime) {
91+
pub fn advance_cluster_time(&mut self, to: &ClusterTime) {
7192
if self.cluster_time().map(|ct| ct < to).unwrap_or(true) {
7293
self.cluster_time = Some(to.clone());
7394
}
@@ -89,6 +110,12 @@ impl ClientSession {
89110
self.server_session.txn_number += 1;
90111
self.server_session.txn_number
91112
}
113+
114+
/// Whether this session is dirty.
115+
#[cfg(test)]
116+
pub(crate) fn is_dirty(&self) -> bool {
117+
self.server_session.dirty
118+
}
92119
}
93120

94121
impl Drop for ClientSession {
@@ -109,7 +136,7 @@ impl Drop for ClientSession {
109136

110137
/// Client side abstraction of a server session. These are pooled and may be associated with
111138
/// multiple `ClientSession`s over the course of their lifetime.
112-
#[derive(Debug)]
139+
#[derive(Clone, Debug)]
113140
pub(crate) struct ServerSession {
114141
/// The id of the server session to which this corresponds.
115142
id: Document,

Diff for: src/client/session/test.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,8 @@ async fn pool_is_lifo() {
202202
return;
203203
}
204204

205-
let timeout = Duration::from_secs(60 * 60);
206-
207-
let a = client.start_implicit_session_with_timeout(timeout).await;
208-
let b = client.start_implicit_session_with_timeout(timeout).await;
205+
let a = client.start_session(None).await.unwrap();
206+
let b = client.start_session(None).await.unwrap();
209207

210208
let a_id = a.id().clone();
211209
let b_id = b.id().clone();
@@ -218,10 +216,10 @@ async fn pool_is_lifo() {
218216
drop(b);
219217
RUNTIME.delay_for(Duration::from_millis(250)).await;
220218

221-
let s1 = client.start_implicit_session_with_timeout(timeout).await;
219+
let s1 = client.start_session(None).await.unwrap();
222220
assert_eq!(s1.id(), &b_id);
223221

224-
let s2 = client.start_implicit_session_with_timeout(timeout).await;
222+
let s2 = client.start_session(None).await.unwrap();
225223
assert_eq!(s2.id(), &a_id);
226224
}
227225

Diff for: src/cmap/conn/command.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ use super::wire::Message;
44
use crate::{
55
bson::{Bson, Document},
66
bson_util,
7-
client::{options::ServerApi, ClientSession, ClusterTime},
7+
client::{options::ServerApi, ClusterTime},
88
error::{CommandError, ErrorKind, Result},
99
options::StreamAddress,
1010
selection_criteria::ReadPreference,
11+
ClientSession,
1112
};
1213

1314
/// `Command` is a driver side abstraction of a server command containing all the information

0 commit comments

Comments
 (0)