Skip to content

RUST-52 Implement Sessions API #304

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,27 @@ impl Client {
///
/// Server selection will performed using the criteria specified on the operation, if any, and
/// an implicit session will be created if the operation and write concern are compatible with
/// sessions.
pub(crate) async fn execute_operation<T: Operation>(&self, op: T) -> Result<T::O> {
/// sessions and an explicit session is not provided.
pub(crate) async fn execute_operation<T: Operation>(
&self,
op: T,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<T::O> {
// TODO RUST-9: allow unacknowledged write concerns
if !op.is_acknowledged() {
return Err(ErrorKind::ArgumentError {
message: "Unacknowledged write concerns are not supported".to_string(),
}
.into());
}
let mut implicit_session = self.start_implicit_session(&op).await?;
self.execute_operation_with_retry(op, implicit_session.as_mut())
.await
match session.into() {
Some(session) => self.execute_operation_with_retry(op, Some(session)).await,
None => {
let mut implicit_session = self.start_implicit_session(&op).await?;
self.execute_operation_with_retry(op, implicit_session.as_mut())
.await
}
}
}

/// Execute the given operation, returning the implicit session created for it if one was.
Expand All @@ -63,16 +72,6 @@ impl Client {
.map(|result| (result, implicit_session))
}

/// Execute the given operation with the given session.
/// Server selection will performed using the criteria specified on the operation, if any.
pub(crate) async fn execute_operation_with_session<T: Operation>(
&self,
op: T,
session: &mut ClientSession,
) -> Result<T::O> {
self.execute_operation_with_retry(op, Some(session)).await
}

/// Selects a server and executes the given operation on it, optionally using a provided
/// session. Retries the operation upon failure if retryability is supported.
async fn execute_operation_with_retry<T: Operation>(
Expand Down Expand Up @@ -324,7 +323,7 @@ impl Client {
SessionSupportStatus::Supported {
logical_session_timeout,
} if op.supports_sessions() && op.is_acknowledged() => Ok(Some(
self.start_implicit_session_with_timeout(logical_session_timeout)
self.start_session_with_timeout(logical_session_timeout, None, true)
.await,
)),
_ => Ok(None),
Expand All @@ -334,7 +333,7 @@ impl Client {
/// Gets whether the topology supports sessions, and if so, returns the topology's logical
/// session timeout. If it has yet to be determined if the topology supports sessions, this
/// method will perform a server selection that will force that determination to be made.
async fn get_session_support_status(&self) -> Result<SessionSupportStatus> {
pub(crate) async fn get_session_support_status(&self) -> Result<SessionSupportStatus> {
let initial_status = self.inner.topology.session_support_status().await;

// Need to guarantee that we're connected to at least one server that can determine if
Expand Down
30 changes: 24 additions & 6 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod auth;
mod executor;
pub mod options;
mod session;
pub mod session;

use std::{sync::Arc, time::Duration};

Expand All @@ -23,10 +23,12 @@ use crate::{
ListDatabasesOptions,
ReadPreference,
SelectionCriteria,
SessionOptions,
},
sdam::{SelectedServer, SessionSupportStatus, Topology},
ClientSession,
};
pub(crate) use session::{ClientSession, ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
pub(crate) use session::{ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
use session::{ServerSession, ServerSessionPool};

const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -161,7 +163,7 @@ impl Client {
options: impl Into<Option<ListDatabasesOptions>>,
) -> Result<Vec<Document>> {
let op = ListDatabases::new(filter.into(), false, options.into());
self.execute_operation(op).await
self.execute_operation(op, None).await
}

/// Gets the names of the databases present in the cluster the Client is connected to.
Expand All @@ -171,7 +173,7 @@ impl Client {
options: impl Into<Option<ListDatabasesOptions>>,
) -> Result<Vec<String>> {
let op = ListDatabases::new(filter.into(), true, options.into());
match self.execute_operation(op).await {
match self.execute_operation(op, None).await {
Ok(databases) => databases
.into_iter()
.map(|doc| {
Expand All @@ -189,6 +191,18 @@ impl Client {
}
}

/// Starts a new `ClientSession`.
pub async fn start_session(&self, options: Option<SessionOptions>) -> Result<ClientSession> {
match self.get_session_support_status().await? {
SessionSupportStatus::Supported {
logical_session_timeout,
} => Ok(self
.start_session_with_timeout(logical_session_timeout, options, false)
.await),
_ => Err(ErrorKind::SessionsNotSupported.into()),
}
}

/// Check in a server session to the server session pool.
/// If the session is expired or dirty, or the topology no longer supports sessions, the session
/// will be discarded.
Expand All @@ -210,16 +224,20 @@ impl Client {
/// This method will attempt to re-use server sessions from the pool which are not about to
/// expire according to the provided logical session timeout. If no such sessions are
/// available, a new one will be created.
pub(crate) async fn start_implicit_session_with_timeout(
pub(crate) async fn start_session_with_timeout(
&self,
logical_session_timeout: Duration,
options: Option<SessionOptions>,
is_implicit: bool,
) -> ClientSession {
ClientSession::new_implicit(
ClientSession::new(
self.inner
.session_pool
.check_out(logical_session_timeout)
.await,
self.clone(),
options,
is_implicit,
)
}

Expand Down
5 changes: 5 additions & 0 deletions src/client/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2057,3 +2057,8 @@ mod tests {
);
}
}

/// Contains the options that can be used to create a new
/// [`ClientSession`](../struct.ClientSession.html).
#[derive(Clone, Debug, Deserialize, TypedBuilder)]
pub struct SessionOptions {}
2 changes: 1 addition & 1 deletion src/client/session/cluster_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::bson::{Document, Timestamp};
#[derive(Debug, Deserialize, Clone, Serialize, Derivative)]
#[derivative(PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ClusterTime {
pub struct ClusterTime {
cluster_time: Timestamp,

#[derivative(PartialEq = "ignore")]
Expand Down
45 changes: 36 additions & 9 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use uuid::Uuid;

use crate::{
bson::{doc, spec::BinarySubtype, Binary, Bson, Document},
options::SessionOptions,
Client,
RUNTIME,
};
Expand All @@ -28,29 +29,44 @@ lazy_static! {
};
}

/// Session to be used with client operations. This acts as a handle to a server session.
/// This keeps the details of how server sessions are pooled opaque to users.
/// A MongoDB client session. This struct represents a logical session used for ordering sequential
/// operations. To create a `ClientSession`, call `start_session` on a `Client`.
///
/// `ClientSession` instances are not thread safe or fork safe. They can only be used by one thread
/// or process at a time.
#[derive(Debug)]
pub(crate) struct ClientSession {
pub struct ClientSession {
cluster_time: Option<ClusterTime>,
server_session: ServerSession,
client: Client,
is_implicit: bool,
options: Option<SessionOptions>,
}

impl ClientSession {
/// Creates a new `ClientSession` wrapping the provided server session.
pub(crate) fn new_implicit(server_session: ServerSession, client: Client) -> Self {
pub(crate) fn new(
server_session: ServerSession,
client: Client,
options: Option<SessionOptions>,
is_implicit: bool,
) -> Self {
Self {
client,
server_session,
cluster_time: None,
is_implicit: true,
is_implicit,
options,
}
}

/// The client used to create this session.
pub fn client(&self) -> Client {
self.client.clone()
}

/// The id of this session.
pub(crate) fn id(&self) -> &Document {
pub fn id(&self) -> &Document {
&self.server_session.id
}

Expand All @@ -61,13 +77,18 @@ impl ClientSession {

/// The highest seen cluster time this session has seen so far.
/// This will be `None` if this session has not been used in an operation yet.
pub(crate) fn cluster_time(&self) -> Option<&ClusterTime> {
pub fn cluster_time(&self) -> Option<&ClusterTime> {
self.cluster_time.as_ref()
}

/// The options used to create this session.
pub fn options(&self) -> Option<&SessionOptions> {
self.options.as_ref()
}

/// Set the cluster time to the provided one if it is greater than this session's highest seen
/// cluster time or if this session's cluster time is `None`.
pub(crate) fn advance_cluster_time(&mut self, to: &ClusterTime) {
pub fn advance_cluster_time(&mut self, to: &ClusterTime) {
if self.cluster_time().map(|ct| ct < to).unwrap_or(true) {
self.cluster_time = Some(to.clone());
}
Expand All @@ -89,6 +110,12 @@ impl ClientSession {
self.server_session.txn_number += 1;
self.server_session.txn_number
}

/// Whether this session is dirty.
#[cfg(test)]
pub(crate) fn is_dirty(&self) -> bool {
self.server_session.dirty
}
}

impl Drop for ClientSession {
Expand All @@ -109,7 +136,7 @@ impl Drop for ClientSession {

/// Client side abstraction of a server session. These are pooled and may be associated with
/// multiple `ClientSession`s over the course of their lifetime.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub(crate) struct ServerSession {
/// The id of the server session to which this corresponds.
id: Document,
Expand Down
10 changes: 4 additions & 6 deletions src/client/session/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,8 @@ async fn pool_is_lifo() {
return;
}

let timeout = Duration::from_secs(60 * 60);

let a = client.start_implicit_session_with_timeout(timeout).await;
let b = client.start_implicit_session_with_timeout(timeout).await;
let a = client.start_session(None).await.unwrap();
let b = client.start_session(None).await.unwrap();

let a_id = a.id().clone();
let b_id = b.id().clone();
Expand All @@ -218,10 +216,10 @@ async fn pool_is_lifo() {
drop(b);
RUNTIME.delay_for(Duration::from_millis(250)).await;

let s1 = client.start_implicit_session_with_timeout(timeout).await;
let s1 = client.start_session(None).await.unwrap();
assert_eq!(s1.id(), &b_id);

let s2 = client.start_implicit_session_with_timeout(timeout).await;
let s2 = client.start_session(None).await.unwrap();
assert_eq!(s2.id(), &a_id);
}

Expand Down
3 changes: 2 additions & 1 deletion src/cmap/conn/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use super::wire::Message;
use crate::{
bson::{Bson, Document},
bson_util,
client::{options::ServerApi, ClientSession, ClusterTime},
client::{options::ServerApi, ClusterTime},
error::{CommandError, ErrorKind, Result},
options::StreamAddress,
selection_criteria::ReadPreference,
ClientSession,
};

/// `Command` is a driver side abstraction of a server command containing all the information
Expand Down
Loading