Skip to content

RUST-1373 Update unified test format runner to support SDAM integration tests #712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ authors = [
"Kaitlin Mahar <[email protected]>",
]
description = "The official MongoDB driver for Rust"
edition = "2018"
edition = "2021"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Disjoint capture in closures feature of the 2021 edition helped make using lifetimes in the runner a lot easier, so I bumped it here. Now that the MSRV is updated to the minimum for this edition, this shouldn't be a problem.

keywords = ["mongo", "mongodb", "database", "bson", "nosql"]
categories = ["asynchronous", "database", "web-programming"]
repository = "https://github.com/mongodb/mongo-rust-driver"
Expand Down Expand Up @@ -163,6 +163,7 @@ function_name = "0.2.1"
futures = "0.3"
home = "0.5"
pretty_assertions = "1.1.0"
serde = { version = "*", features = ["rc"] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little surprised a bare * is allowed; this means "whatever version is used by the main dependencies block, but also with the rc feature"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, or more like the dev dependency doesn't place any additional bounds on the version.

got this trick from: https://stackoverflow.com/questions/27872009/how-do-i-use-a-feature-of-a-dependency-only-for-testing

serde_json = "1.0.64"
semver = "1.0.0"
time = "0.3.9"
Expand Down
10 changes: 10 additions & 0 deletions src/bson_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ pub(crate) fn serialize_error_as_string<S: Serializer>(
serializer.serialize_str(&val.to_string())
}

/// Serializes a Result, serializing the error value as a string if present.
pub(crate) fn serialize_result_error_as_string<S: Serializer, T: Serialize>(
val: &Result<T>,
serializer: S,
) -> std::result::Result<S::Ok, S::Error> {
val.as_ref()
.map_err(|e| e.to_string())
.serialize(serializer)
}

#[cfg(test)]
mod test {
use crate::bson_util::num_decimal_digits;
Expand Down
5 changes: 5 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,11 @@ impl Client {
})
.ok()
}

#[cfg(test)]
pub(crate) fn topology(&self) -> &Topology {
&self.inner.topology
}
}

#[cfg(feature = "csfle")]
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/test/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct TestFile {
#[serde(default)]
pub ignore: Vec<String>,
pub fail_point: Option<Document>,
pub run_on: Option<Vec<RunOn>>,
pub(crate) run_on: Option<Vec<RunOn>>,
}

#[derive(Debug, Deserialize)]
Expand Down
40 changes: 24 additions & 16 deletions src/sdam/description/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use serde::{Deserialize, Serialize};

use crate::{
bson::{oid::ObjectId, DateTime},
bson_util,
client::ClusterTime,
error::{ErrorKind, Result},
hello::HelloReply,
options::ServerAddress,
selection_criteria::TagSet,
Expand Down Expand Up @@ -106,7 +108,8 @@ pub(crate) struct ServerDescription {
// allows us to ensure that only valid states are possible (e.g. preventing that both an error
// and a reply are present) while still making it easy to define helper methods on
// ServerDescription for information we need from the hello reply by propagating with `?`.
pub(crate) reply: Result<Option<HelloReply>, String>,
#[serde(serialize_with = "bson_util::serialize_result_error_as_string")]
pub(crate) reply: Result<Option<HelloReply>>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that we store String here instead of the actual error is kind of a historical quirk at this point. In order to implement the full error matching for the test runner, I needed to make this a regular Result. IMO this is an improvement anyways.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why were we storing it to begin with, and why do we need to preserve the string behavior in the serialize implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was originally converted to being String to preserve the Clone implementation for ServerDescription back when we were pondering removing Clone from Error. We ended up keeping Error Clone, so we didn't actually need to make this change, but it was kept in since the code was already written and as "a mild perf improvement".

See #301 for the full context.

As far as preserving the serialize implementation, I just kept it as-is to avoid breaking any existing tests that might be relying on it. I think workload executor uses it, but I'm not sure. cc @isabelatkinson

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at least for the workload executor it doesn't really matter what the format is. all of the serialized stuff just gets dumped into JSON logs

}

impl PartialEq for ServerDescription {
Expand All @@ -122,17 +125,22 @@ impl PartialEq for ServerDescription {

self_response == other_response
}
(Err(self_err), Err(other_err)) => self_err == other_err,
(Err(self_err), Err(other_err)) => {
match (self_err.kind.as_ref(), other_err.kind.as_ref()) {
(
ErrorKind::Command(self_command_err),
ErrorKind::Command(other_command_err),
) => self_command_err.code == other_command_err.code,
_ => self_err.to_string() == other_err.to_string(),
}
}
_ => false,
}
}
}

impl ServerDescription {
pub(crate) fn new(
mut address: ServerAddress,
hello_reply: Option<Result<HelloReply, String>>,
) -> Self {
pub(crate) fn new(mut address: ServerAddress, hello_reply: Option<Result<HelloReply>>) -> Self {
address = ServerAddress::Tcp {
host: address.host().to_lowercase(),
port: address.port(),
Expand Down Expand Up @@ -231,7 +239,7 @@ impl ServerDescription {
None
}

pub(crate) fn set_name(&self) -> Result<Option<String>, String> {
pub(crate) fn set_name(&self) -> Result<Option<String>> {
let set_name = self
.reply
.as_ref()
Expand All @@ -241,7 +249,7 @@ impl ServerDescription {
Ok(set_name)
}

pub(crate) fn known_hosts(&self) -> Result<impl Iterator<Item = &String>, String> {
pub(crate) fn known_hosts(&self) -> Result<impl Iterator<Item = &String>> {
let known_hosts = self
.reply
.as_ref()
Expand All @@ -262,7 +270,7 @@ impl ServerDescription {
Ok(known_hosts.into_iter().flatten())
}

pub(crate) fn invalid_me(&self) -> Result<bool, String> {
pub(crate) fn invalid_me(&self) -> Result<bool> {
if let Some(ref reply) = self.reply.as_ref().map_err(Clone::clone)? {
if let Some(ref me) = reply.command_response.me {
return Ok(&self.address.to_string() != me);
Expand All @@ -272,7 +280,7 @@ impl ServerDescription {
Ok(false)
}

pub(crate) fn set_version(&self) -> Result<Option<i32>, String> {
pub(crate) fn set_version(&self) -> Result<Option<i32>> {
let me = self
.reply
.as_ref()
Expand All @@ -282,7 +290,7 @@ impl ServerDescription {
Ok(me)
}

pub(crate) fn election_id(&self) -> Result<Option<ObjectId>, String> {
pub(crate) fn election_id(&self) -> Result<Option<ObjectId>> {
let me = self
.reply
.as_ref()
Expand All @@ -293,7 +301,7 @@ impl ServerDescription {
}

#[cfg(test)]
pub(crate) fn min_wire_version(&self) -> Result<Option<i32>, String> {
pub(crate) fn min_wire_version(&self) -> Result<Option<i32>> {
let me = self
.reply
.as_ref()
Expand All @@ -303,7 +311,7 @@ impl ServerDescription {
Ok(me)
}

pub(crate) fn max_wire_version(&self) -> Result<Option<i32>, String> {
pub(crate) fn max_wire_version(&self) -> Result<Option<i32>> {
let me = self
.reply
.as_ref()
Expand All @@ -313,7 +321,7 @@ impl ServerDescription {
Ok(me)
}

pub(crate) fn last_write_date(&self) -> Result<Option<DateTime>, String> {
pub(crate) fn last_write_date(&self) -> Result<Option<DateTime>> {
match self.reply {
Ok(None) => Ok(None),
Ok(Some(ref reply)) => Ok(reply
Expand All @@ -325,7 +333,7 @@ impl ServerDescription {
}
}

pub(crate) fn logical_session_timeout(&self) -> Result<Option<Duration>, String> {
pub(crate) fn logical_session_timeout(&self) -> Result<Option<Duration>> {
match self.reply {
Ok(None) => Ok(None),
Ok(Some(ref reply)) => Ok(reply
Expand All @@ -336,7 +344,7 @@ impl ServerDescription {
}
}

pub(crate) fn cluster_time(&self) -> Result<Option<ClusterTime>, String> {
pub(crate) fn cluster_time(&self) -> Result<Option<ClusterTime>> {
match self.reply {
Ok(None) => Ok(None),
Ok(Some(ref reply)) => Ok(reply.cluster_time.clone()),
Expand Down
45 changes: 16 additions & 29 deletions src/sdam/description/topology/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
bson::oid::ObjectId,
client::ClusterTime,
cmap::Command,
error::{Error, Result},
options::{ClientOptions, ServerAddress},
sdam::{
description::server::{ServerDescription, ServerType},
Expand Down Expand Up @@ -460,10 +461,7 @@ impl TopologyDescription {

/// Update the topology based on the new information about the topology contained by the
/// ServerDescription.
pub(crate) fn update(
&mut self,
mut server_description: ServerDescription,
) -> Result<(), String> {
pub(crate) fn update(&mut self, mut server_description: ServerDescription) -> Result<()> {
// Ignore updates from servers not currently in the cluster.
if !self.servers.contains_key(&server_description.address) {
return Ok(());
Expand Down Expand Up @@ -516,10 +514,7 @@ impl TopologyDescription {
}

/// Update the Unknown topology description based on the server description.
fn update_unknown_topology(
&mut self,
server_description: ServerDescription,
) -> Result<(), String> {
fn update_unknown_topology(&mut self, server_description: ServerDescription) -> Result<()> {
match server_description.server_type {
ServerType::Unknown | ServerType::RsGhost => {}
ServerType::Standalone => {
Expand All @@ -535,7 +530,7 @@ impl TopologyDescription {
self.update_rs_without_primary_server(server_description)?;
}
ServerType::LoadBalancer => {
return Err("cannot transition to a load balancer".to_string())
return Err(Error::internal("cannot transition to a load balancer"))
}
}

Expand All @@ -556,7 +551,7 @@ impl TopologyDescription {
fn update_replica_set_no_primary_topology(
&mut self,
server_description: ServerDescription,
) -> Result<(), String> {
) -> Result<()> {
match server_description.server_type {
ServerType::Unknown | ServerType::RsGhost => {}
ServerType::Standalone | ServerType::Mongos => {
Expand All @@ -570,7 +565,7 @@ impl TopologyDescription {
self.update_rs_without_primary_server(server_description)?;
}
ServerType::LoadBalancer => {
return Err("cannot transition to a load balancer".to_string())
return Err(Error::internal("cannot transition to a load balancer"))
}
}

Expand All @@ -581,7 +576,7 @@ impl TopologyDescription {
fn update_replica_set_with_primary_topology(
&mut self,
server_description: ServerDescription,
) -> Result<(), String> {
) -> Result<()> {
match server_description.server_type {
ServerType::Unknown | ServerType::RsGhost => {
self.record_primary_state();
Expand All @@ -595,7 +590,7 @@ impl TopologyDescription {
self.update_rs_with_primary_from_member(server_description)?;
}
ServerType::LoadBalancer => {
return Err("cannot transition to a load balancer".to_string())
return Err(Error::internal("cannot transition to a load balancer"));
}
}

Expand All @@ -616,7 +611,7 @@ impl TopologyDescription {
fn update_rs_without_primary_server(
&mut self,
server_description: ServerDescription,
) -> Result<(), String> {
) -> Result<()> {
if self.set_name.is_none() {
self.set_name = server_description.set_name()?;
} else if self.set_name != server_description.set_name()? {
Expand All @@ -639,7 +634,7 @@ impl TopologyDescription {
fn update_rs_with_primary_from_member(
&mut self,
server_description: ServerDescription,
) -> Result<(), String> {
) -> Result<()> {
if self.set_name != server_description.set_name()? {
self.servers.remove(&server_description.address);
self.record_primary_state();
Expand All @@ -661,7 +656,7 @@ impl TopologyDescription {
fn update_rs_from_primary_server(
&mut self,
server_description: ServerDescription,
) -> Result<(), String> {
) -> Result<()> {
if self.set_name.is_none() {
self.set_name = server_description.set_name()?;
} else if self.set_name != server_description.set_name()? {
Expand Down Expand Up @@ -750,13 +745,8 @@ impl TopologyDescription {
}

/// Create a new ServerDescription for each address and add it to the topology.
fn add_new_servers<'a>(
&mut self,
servers: impl Iterator<Item = &'a String>,
) -> Result<(), String> {
let servers: Result<Vec<_>, String> = servers
.map(|server| ServerAddress::parse(server).map_err(|e| e.to_string()))
.collect();
fn add_new_servers<'a>(&mut self, servers: impl Iterator<Item = &'a String>) -> Result<()> {
let servers: Result<Vec<_>> = servers.map(ServerAddress::parse).collect();

self.add_new_servers_from_addresses(servers?.iter());
Ok(())
Expand Down Expand Up @@ -856,16 +846,13 @@ pub(crate) struct TopologyDescriptionDiff<'a> {
}

fn verify_max_staleness(max_staleness: Option<Duration>) -> crate::error::Result<()> {
verify_max_staleness_inner(max_staleness)
.map_err(|s| crate::error::ErrorKind::InvalidArgument { message: s }.into())
}

fn verify_max_staleness_inner(max_staleness: Option<Duration>) -> std::result::Result<(), String> {
if max_staleness
.map(|staleness| staleness > Duration::from_secs(0) && staleness < Duration::from_secs(90))
.unwrap_or(false)
{
return Err("max staleness cannot be both positive and below 90 seconds".into());
return Err(Error::invalid_argument(
"max staleness cannot be both positive and below 90 seconds",
));
}

Ok(())
Expand Down
5 changes: 5 additions & 0 deletions src/sdam/description/topology/server_selection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ impl TopologyDescription {
.filter(move |server| types.contains(&server.server_type))
}

#[cfg(test)]
pub(crate) fn primary(&self) -> Option<&ServerDescription> {
self.servers_with_type(&[ServerType::RsPrimary]).next()
}

fn suitable_servers_in_replica_set<'a>(
&self,
read_preference: &'a ReadPreference,
Expand Down
6 changes: 6 additions & 0 deletions src/sdam/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::Serialize;
pub use crate::sdam::description::{server::ServerType, topology::TopologyType};
use crate::{
bson::DateTime,
error::Error,
hello::HelloCommandResponse,
options::ServerAddress,
sdam::ServerDescription,
Expand Down Expand Up @@ -100,6 +101,11 @@ impl<'a> ServerInfo<'a> {
pub fn tags(&self) -> Option<&TagSet> {
self.command_response_getter(|r| r.tags.as_ref())
}

/// Gets the error this server encountered, if any.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we make this a little more descriptive to clarify it's whatever error caused the server state to change and not just any error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

pub fn error(&self) -> Option<&Error> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed RUST-1432 for this change.

self.description.reply.as_ref().err()
}
}

impl<'a> fmt::Debug for ServerInfo<'a> {
Expand Down
Loading