diff --git a/src/client/options/resolver_config.rs b/src/client/options/resolver_config.rs index 5bb43b7e8..67d556c2a 100644 --- a/src/client/options/resolver_config.rs +++ b/src/client/options/resolver_config.rs @@ -13,7 +13,7 @@ impl ResolverConfig { /// Creates a default configuration, using 1.1.1.1, 1.0.0.1 and 2606:4700:4700::1111, /// 2606:4700:4700::1001 (thank you, Cloudflare). /// - /// Please see: https://www.cloudflare.com/dns/ + /// Please see: pub fn cloudflare() -> Self { ResolverConfig { inner: TrustDnsResolverConfig::cloudflare(), @@ -34,7 +34,7 @@ impl ResolverConfig { /// Creates a configuration, using 9.9.9.9, 149.112.112.112 and 2620:fe::fe, 2620:fe::fe:9, the /// “secure” variants of the quad9 settings (thank you, Quad9). /// - /// Please see: https://www.quad9.net/faq/ + /// Please see: pub fn quad9() -> Self { ResolverConfig { inner: TrustDnsResolverConfig::quad9(), diff --git a/src/client/session/mod.rs b/src/client/session/mod.rs index 644315f0e..1287e0e2a 100644 --- a/src/client/session/mod.rs +++ b/src/client/session/mod.rs @@ -37,6 +37,67 @@ lazy_static! { /// /// `ClientSession` instances are not thread safe or fork safe. They can only be used by one thread /// or process at a time. +/// +/// ## Transactions +/// Transactions are used to execute a series of operations across multiple documents and +/// collections atomically. For more information about when and how to use transactions in MongoDB, +/// see the [manual](https://docs.mongodb.com/manual/core/transactions/). +/// +/// Replica set transactions are supported on MongoDB 4.0+. Transactions are associated with a +/// `ClientSession`. To begin a transaction, call [`ClientSession::start_transaction`] on a +/// `ClientSession`. The `ClientSession` must be passed to operations to be executed within the +/// transaction. +/// +/// ```rust +/// use mongodb::{ +/// bson::doc, +/// error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT}, +/// options::{Acknowledgment, ReadConcern, TransactionOptions, WriteConcern}, +/// # Client, +/// ClientSession, +/// Collection, +/// }; +/// +/// # async fn do_stuff() -> Result<()> { +/// # let client = Client::with_uri_str("mongodb://example.com").await?; +/// # let coll = client.database("foo").collection("bar"); +/// let mut session = client.start_session(None).await?; +/// let options = TransactionOptions::builder() +/// .read_concern(ReadConcern::majority()) +/// .write_concern(WriteConcern::builder().w(Acknowledgment::Majority).build()) +/// .build(); +/// session.start_transaction(options).await?; +/// // A "TransientTransactionError" label indicates that the entire transaction can be retried +/// // with a reasonable expectation that it will succeed. +/// while let Err(error) = execute_transaction(&coll, &mut session).await { +/// if !error.contains_label(TRANSIENT_TRANSACTION_ERROR) { +/// break; +/// } +/// } +/// # Ok(()) +/// # } +/// +/// async fn execute_transaction(coll: &Collection, session: &mut ClientSession) -> Result<()> { +/// coll.insert_one_with_session(doc! { "x": 1 }, None, session).await?; +/// coll.delete_one_with_session(doc! { "y": 2 }, None, session).await?; +/// // An "UnknownTransactionCommitResult" label indicates that it is unknown whether the +/// // commit has satisfied the write concern associated with the transaction. If an error +/// // with this label is returned, it is safe to retry the commit until the write concern is +/// // satisfied or an error without the label is returned. +/// loop { +/// let result = session.commit_transaction().await; +/// if let Err(ref error) = result { +/// if error.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) { +/// continue; +/// } +/// } +/// result? +/// } +/// } +/// ``` +// TODO RUST-122 Remove this note and adjust the above description to indicate that sharded +// transactions are supported on 4.2+ +/// Note: the driver does not currently support transactions on sharded clusters. #[derive(Clone, Debug)] pub struct ClientSession { cluster_time: Option, @@ -193,6 +254,10 @@ impl ClientSession { /// be passed into each operation within the transaction; otherwise, the operation will be /// executed outside of the transaction. /// + /// Errors returned from operations executed within a transaction may include a + /// [`crate::error::TRANSIENT_TRANSACTION_ERROR`] label. This label indicates that the entire + /// transaction can be retried with a reasonable expectation that it will succeed. + /// /// Transactions are supported on MongoDB 4.0+. The Rust driver currently only supports /// transactions on replica sets. /// @@ -276,6 +341,13 @@ impl ClientSession { /// Commits the transaction that is currently active on this session. /// + /// + /// This method may return an error with a [`crate::error::UNKNOWN_TRANSACTION_COMMIT_RESULT`] + /// label. This label indicates that it is unknown whether the commit has satisfied the write + /// concern associated with the transaction. If an error with this label is returned, it is + /// safe to retry the commit until the write concern is satisfied or an error without the label + /// is returned. + /// /// ```rust /// # use mongodb::{bson::{doc, Document}, error::Result, Client, ClientSession}; /// # @@ -344,14 +416,14 @@ impl ClientSession { /// # let coll = client.database("foo").collection::("bar"); /// # let mut session = client.start_session(None).await?; /// session.start_transaction(None).await?; - /// match execute_transaction(coll, &mut session).await { + /// match execute_transaction(&coll, &mut session).await { /// Ok(_) => session.commit_transaction().await?, /// Err(_) => session.abort_transaction().await?, /// } /// # Ok(()) /// # } /// - /// async fn execute_transaction(coll: Collection, session: &mut ClientSession) -> Result<()> { + /// async fn execute_transaction(coll: &Collection, session: &mut ClientSession) -> Result<()> { /// coll.insert_one_with_session(doc! { "x": 1 }, None, session).await?; /// coll.delete_one_with_session(doc! { "y": 2 }, None, session).await?; /// Ok(()) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 3c5af3ba7..733934585 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -146,7 +146,7 @@ impl AsyncRuntime { } /// Create a new `Interval` that yields with interval of `duration`. - /// See: https://docs.rs/tokio/latest/tokio/time/fn.interval.html + /// See: pub(crate) fn interval(self, duration: Duration) -> Interval { match self { #[cfg(feature = "tokio-runtime")] diff --git a/src/test/spec/initial_dns_seedlist_discovery.rs b/src/test/spec/initial_dns_seedlist_discovery.rs index 7c9d5f6be..f35d3dd2a 100644 --- a/src/test/spec/initial_dns_seedlist_discovery.rs +++ b/src/test/spec/initial_dns_seedlist_discovery.rs @@ -44,7 +44,7 @@ async fn run() { // "encoded-userinfo-and-db.json" specifies a database name with a question mark which is // disallowed on Windows. See - // https://docs.mongodb.com/manual/reference/limits/#restrictions-on-db-names + // if let Some(ref mut options) = test_file.parsed_options { if options.db.as_deref() == Some("mydb?") && cfg!(target_os = "windows") { options.db = Some("mydb".to_string()); diff --git a/src/test/util/failpoint.rs b/src/test/util/failpoint.rs index 5cc83a4ae..6368cb015 100644 --- a/src/test/util/failpoint.rs +++ b/src/test/util/failpoint.rs @@ -23,7 +23,7 @@ impl FailPoint { } /// Create a failCommand failpoint. - /// See https://github.com/mongodb/mongo/wiki/The-%22failCommand%22-fail-point for more info. + /// See for more info. pub fn fail_command( fail_commands: &[&str], mode: FailPointMode, diff --git a/tests/transaction_examples.rs b/tests/transaction_examples.rs new file mode 100644 index 000000000..aeef81192 --- /dev/null +++ b/tests/transaction_examples.rs @@ -0,0 +1,74 @@ +#![allow(dead_code)] +#![cfg(not(feature = "sync"))] + +// START TRANSACTIONS EXAMPLE +use mongodb::{ + bson::{doc, Document}, + error::{Result, TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT}, + options::{Acknowledgment, ReadConcern, TransactionOptions, WriteConcern}, + ClientSession, +}; + +async fn update_employee_info(session: &mut ClientSession) -> Result<()> { + let transaction_options = TransactionOptions::builder() + .read_concern(ReadConcern::snapshot()) + .write_concern(WriteConcern::builder().w(Acknowledgment::Majority).build()) + .build(); + session.start_transaction(transaction_options).await?; + + execute_transaction_with_retry(session).await +} + +async fn execute_transaction_with_retry(session: &mut ClientSession) -> Result<()> { + while let Err(err) = execute_employee_info_transaction(session).await { + println!("Transaction aborted. Error returned during transaction."); + if err.contains_label(TRANSIENT_TRANSACTION_ERROR) { + println!("Encountered TransientTransactionError, retrying transaction."); + continue; + } else { + return Err(err); + } + } + Ok(()) +} + +async fn execute_employee_info_transaction(session: &mut ClientSession) -> Result<()> { + let client = session.client(); + let employees = client.database("hr").collection::("employees"); + let events = client + .database("reporting") + .collection::("events"); + + employees + .update_one_with_session( + doc! { "employee": 3 }, + doc! { "$set": { "status": "Inactive" } }, + None, + session, + ) + .await?; + events + .insert_one_with_session( + doc! { "employee": 3, "status": { "new": "Inactive", "old": "Active" } }, + None, + session, + ) + .await?; + + commit_with_retry(session).await +} + +async fn commit_with_retry(session: &mut ClientSession) -> Result<()> { + while let Err(err) = session.commit_transaction().await { + if err.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) { + println!("Encountered UnknownTransactionCommitResult, retrying commit operation."); + continue; + } else { + println!("Encountered non-retryable error during commit."); + return Err(err); + } + } + println!("Transaction committed."); + Ok(()) +} +// END TRANSACTIONS EXAMPLE