From fd142dc68f159f91544b46c7f0d874f2ed1bb69f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Thu, 8 Oct 2020 20:40:27 +0200 Subject: [PATCH 01/13] Fixes logging of target names with dashes (#7281) * Fixes logging of target names with dashes There was a bug in tracing-core which resulted in not supporting dashes in target names. This was fixed upstream. Besides that a test was added to ensure that we don't break this again. * Extend test --- Cargo.lock | 19 ++--- client/cli/src/lib.rs | 73 ++++++++++++++++++++ client/tracing/Cargo.toml | 6 +- primitives/io/Cargo.toml | 2 +- primitives/runtime-interface/test/Cargo.toml | 2 +- primitives/tracing/Cargo.toml | 22 +++--- 6 files changed, 100 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cb1afaae13..9e2b1ff575 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9443,12 +9443,13 @@ checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" [[package]] name = "tracing" -version = "0.1.19" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d79ca061b032d6ce30c660fded31189ca0b9922bf483cd70759f13a2d86786c" +checksum = "b0987850db3733619253fe60e17cb59b82d37c7e6c0236bb81e4d6b87c879f27" dependencies = [ "cfg-if", "log", + "pin-project-lite", "tracing-attributes", "tracing-core", ] @@ -9466,9 +9467,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.16" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bcf46c1f1f06aeea2d6b81f3c863d0930a596c86ad1920d4e5bad6dd1d7119a" +checksum = "f50de3927f93d202783f4513cda820ab47ef17f624b03c096e86ef00c67e6b5f" dependencies = [ "lazy_static", ] @@ -9486,9 +9487,9 @@ dependencies = [ [[package]] name = "tracing-serde" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79" +checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b" dependencies = [ "serde", "tracing-core", @@ -9496,9 +9497,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.2.10" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b33f8b2ef2ab0c3778c12646d9c42a24f7772bee4cdafc72199644a9f58fdc" +checksum = "4ef0a5e15477aa303afbfac3a44cba9b6430fdaad52423b1e6c0dbbe28c3eedd" dependencies = [ "ansi_term 0.12.1", "chrono", @@ -9509,6 +9510,8 @@ dependencies = [ "serde_json", "sharded-slab", "smallvec 1.4.1", + "thread_local", + "tracing", "tracing-core", "tracing-log", "tracing-serde", diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index f16d02cab5..6e9a1b52f7 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -299,3 +299,76 @@ pub fn init_logger( } Ok(()) } +mod tests { + use super::*; + use tracing::{metadata::Kind, subscriber::Interest, Callsite, Level, Metadata}; + use std::{process::Command, env}; + + #[test] + fn test_logger_filters() { + let test_pattern = "afg=debug,sync=trace,client=warn,telemetry,something-with-dash=error"; + init_logger(&test_pattern, Default::default(), Default::default()).unwrap(); + + tracing::dispatcher::get_default(|dispatcher| { + let test_filter = |target, level| { + struct DummyCallSite; + impl Callsite for DummyCallSite { + fn set_interest(&self, _: Interest) {} + fn metadata(&self) -> &Metadata<'_> { + unreachable!(); + } + } + + let metadata = tracing::metadata!( + name: "", + target: target, + level: level, 
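+					// Only the target and the level matter for the filter checks in this
+					// test; the callsite and the field set below are just placeholders.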
+ fields: &[], + callsite: &DummyCallSite, + kind: Kind::SPAN, + ); + + dispatcher.enabled(&metadata) + }; + + assert!(test_filter("afg", Level::INFO)); + assert!(test_filter("afg", Level::DEBUG)); + assert!(!test_filter("afg", Level::TRACE)); + + assert!(test_filter("sync", Level::TRACE)); + assert!(test_filter("client", Level::WARN)); + + assert!(test_filter("telemetry", Level::TRACE)); + assert!(test_filter("something-with-dash", Level::ERROR)); + }); + } + + const EXPECTED_LOG_MESSAGE: &'static str = "yeah logging works as expected"; + + #[test] + fn dash_in_target_name_works() { + let executable = env::current_exe().unwrap(); + let output = Command::new(executable) + .env("ENABLE_LOGGING", "1") + .args(&["--nocapture", "log_something_with_dash_target_name"]) + .output() + .unwrap(); + + let output = String::from_utf8(output.stderr).unwrap(); + assert!(output.contains(EXPECTED_LOG_MESSAGE)); + } + + /// This is no actual test, it will be used by the `dash_in_target_name_works` test. + /// The given test will call the test executable to only execute this test that + /// will only print `EXPECTED_LOG_MESSAGE` through logging while using a target + /// name that contains a dash. This ensures that targets names with dashes work. + #[test] + fn log_something_with_dash_target_name() { + if env::var("ENABLE_LOGGING").is_ok() { + let test_pattern = "test-target=info"; + init_logger(&test_pattern, Default::default(), Default::default()).unwrap(); + + log::info!(target: "test-target", "{}", EXPECTED_LOG_MESSAGE); + } + } +} diff --git a/client/tracing/Cargo.toml b/client/tracing/Cargo.toml index 37b8cb4792..35db326c94 100644 --- a/client/tracing/Cargo.toml +++ b/client/tracing/Cargo.toml @@ -20,8 +20,8 @@ rustc-hash = "1.1.0" serde = "1.0.101" serde_json = "1.0.41" slog = { version = "2.5.2", features = ["nested-values"] } -tracing = "0.1.19" -tracing-core = "0.1.13" -tracing-subscriber = "0.2.10" +tracing = "0.1.21" +tracing-core = "0.1.17" +tracing-subscriber = "0.2.13" sp-tracing = { version = "2.0.0", path = "../../primitives/tracing" } sc-telemetry = { version = "2.0.0", path = "../telemetry" } diff --git a/primitives/io/Cargo.toml b/primitives/io/Cargo.toml index 70a78f99d5..d2dd0a7c64 100644 --- a/primitives/io/Cargo.toml +++ b/primitives/io/Cargo.toml @@ -30,7 +30,7 @@ log = { version = "0.4.8", optional = true } futures = { version = "0.3.1", features = ["thread-pool"], optional = true } parking_lot = { version = "0.10.0", optional = true } tracing = { version = "0.1.19", default-features = false } -tracing-core = { version = "0.1.15", default-features = false} +tracing-core = { version = "0.1.17", default-features = false} [features] default = ["std"] diff --git a/primitives/runtime-interface/test/Cargo.toml b/primitives/runtime-interface/test/Cargo.toml index 0fd61f7bce..d802f9cb6b 100644 --- a/primitives/runtime-interface/test/Cargo.toml +++ b/primitives/runtime-interface/test/Cargo.toml @@ -21,4 +21,4 @@ sp-runtime = { version = "2.0.0", path = "../../runtime" } sp-core = { version = "2.0.0", path = "../../core" } sp-io = { version = "2.0.0", path = "../../io" } tracing = "0.1.19" -tracing-core = "0.1.15" +tracing-core = "0.1.17" diff --git a/primitives/tracing/Cargo.toml b/primitives/tracing/Cargo.toml index a1d89af58c..1000952b39 100644 --- a/primitives/tracing/Cargo.toml +++ b/primitives/tracing/Cargo.toml @@ -20,23 +20,23 @@ targets = ["x86_64-unknown-linux-gnu", "wasm32-unknown-unknown"] [dependencies] sp-std = { version = "2.0.0", path = "../std", default-features = false} 
 codec = { version = "1.3.1", package = "parity-scale-codec", default-features = false, features = ["derive"]}
-tracing = { version = "0.1.19", default-features = false }
-tracing-core = { version = "0.1.16", default-features = false }
+tracing = { version = "0.1.21", default-features = false }
+tracing-core = { version = "0.1.17", default-features = false }
 log = { version = "0.4.8", optional = true }
 tracing-subscriber = { version = "0.2.10", optional = true, features = ["tracing-log"] }

 [features]
 default = [ "std" ]
 with-tracing = [
-	"codec/derive",
-	"codec/full",
+	"codec/derive",
+	"codec/full",
 ]
 std = [
-	"with-tracing",
-	"tracing/std",
-	"tracing-core/std",
-	"codec/std",
-	"sp-std/std",
-	"log",
-	"tracing-subscriber",
+	"with-tracing",
+	"tracing/std",
+	"tracing-core/std",
+	"codec/std",
+	"sp-std/std",
+	"log",
+	"tracing-subscriber",
 ]

From 273005f578e476c7ea3331e029ecf63e49f71960 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andr=C3=A9=20Silva?= <123550+andresilva@users.noreply.github.com>
Date: Wed, 30 Sep 2020 21:12:29 +0100
Subject: [PATCH 02/13] client: fix log filters (#7241)

* client: fix multiple logger filters

* client: add test for log filters setup
---
 client/cli/src/lib.rs | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs
index 6e9a1b52f7..07c54d7a7e 100644
--- a/client/cli/src/lib.rs
+++ b/client/cli/src/lib.rs
@@ -43,7 +43,7 @@ use structopt::{
 	clap::{self, AppSettings},
 	StructOpt,
 };
-use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::{filter::Directive, layer::SubscriberExt};

 /// Substrate client CLI
 ///
@@ -234,6 +234,13 @@ pub fn init_logger(
 	tracing_receiver: sc_tracing::TracingReceiver,
 	tracing_targets: Option<String>,
 ) -> std::result::Result<(), String> {
+	fn parse_directives(dirs: impl AsRef<str>) -> Vec<Directive> {
+		dirs.as_ref()
+			.split(',')
+			.filter_map(|s| s.parse().ok())
+			.collect()
+	}
+
 	if let Err(e) = tracing_log::LogTracer::init() {
 		return Err(format!(
 			"Registering Substrate logger failed: {:}!", e
@@ -257,7 +264,7 @@ pub fn init_logger(
 	if lvl != "" {
 		// We're not sure if log or tracing is available at this moment, so silently ignore the
 		// parse error.
-		if let Ok(directive) = lvl.parse() {
+		for directive in parse_directives(lvl) {
 			env_filter = env_filter.add_directive(directive);
 		}
 	}
@@ -266,7 +273,7 @@ pub fn init_logger(
 	if pattern != "" {
 		// We're not sure if log or tracing is available at this moment, so silently ignore the
 		// parse error.
-		if let Ok(directive) = pattern.parse() {
+		for directive in parse_directives(pattern) {
 			env_filter = env_filter.add_directive(directive);
 		}
 	}

From ba93b85c6bf2687695e91105bd867c1d388922c5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bastian=20K=C3=B6cher?=
Date: Tue, 20 Oct 2020 12:51:51 +0200
Subject: [PATCH 03/13] Fix logging from inside the WASM runtime (#7355)

* Fix logging from inside the WASM runtime

When using `RuntimeLogger` to log something from the runtime, we didn't
set any logging level. So, we actually did not log anything from the
runtime as logging is disabled by default. This PR fixes that by
setting the logging level to `TRACE`.
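For background, the `log` crate gates every logging macro on a global max
level that defaults to `Off`, so registering a logger alone is not enough.
A minimal sketch of the behaviour (illustration only, not part of this
patch; `StderrLogger` is a made-up stand-in for `RuntimeLogger`):

    use log::{Log, Metadata, Record, LevelFilter};

    struct StderrLogger;

    impl Log for StderrLogger {
        fn enabled(&self, _: &Metadata) -> bool { true }
        fn log(&self, record: &Record) { eprintln!("{}", record.args()); }
        fn flush(&self) {}
    }

    fn main() {
        static LOGGER: StderrLogger = StderrLogger;
        let _ = log::set_logger(&LOGGER);
        log::info!("dropped: the max level is still LevelFilter::Off");
        log::set_max_level(LevelFilter::Trace);
        log::info!("printed: the level check now passes");
    }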
It also adds a test to ensure this does not break again ;)

* Update frame/support/src/debug.rs
---
 Cargo.lock                    |  2 ++
 frame/support/Cargo.toml      |  2 ++
 frame/support/src/debug.rs    | 49 ++++++++++++++++++++++++++++++++++-
 primitives/tracing/src/lib.rs |  4 ++-
 test-utils/runtime/src/lib.rs | 14 ++++++++++
 5 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9e2b1ff575..b3bc394d61 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1592,6 +1592,7 @@ dependencies = [
  "pretty_assertions",
  "serde",
  "smallvec 1.4.1",
+ "sp-api",
  "sp-arithmetic",
  "sp-core",
  "sp-inherents",
@@ -1600,6 +1601,7 @@ dependencies = [
  "sp-state-machine",
  "sp-std",
  "sp-tracing",
+ "substrate-test-runtime-client",
 ]

diff --git a/frame/support/Cargo.toml b/frame/support/Cargo.toml
index d082b71826..95e5dbabc5 100644
--- a/frame/support/Cargo.toml
+++ b/frame/support/Cargo.toml
@@ -36,6 +36,8 @@ smallvec = "1.4.1"
 pretty_assertions = "0.6.1"
 frame-system = { version = "2.0.0", path = "../system" }
 parity-util-mem = { version = "0.7.0", features = ["primitive-types"] }
+substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" }
+sp-api = { version = "2.0.0", default-features = false, path = "../../primitives/api" }

 [features]
 default = ["std"]
diff --git a/frame/support/src/debug.rs b/frame/support/src/debug.rs
index 86b40f1664..04f5c529f0 100644
--- a/frame/support/src/debug.rs
+++ b/frame/support/src/debug.rs
@@ -170,8 +170,16 @@ impl RuntimeLogger {
 	/// This is a no-op when running natively (`std`).
 	#[cfg(not(feature = "std"))]
 	pub fn init() {
-		static LOGGER: RuntimeLogger = RuntimeLogger;;
+		static LOGGER: RuntimeLogger = RuntimeLogger;
 		let _ = log::set_logger(&LOGGER);
+
+		// Set max level to `TRACE` to ensure we propagate
+		// all log entries to the native side that will do the
+		// final filtering on what should be printed.
+		//
+		// If we don't set any level, logging is disabled
+		// completely.
+		log::set_max_level(log::LevelFilter::Trace);
 	}
 }

@@ -198,3 +206,42 @@ impl log::Log for RuntimeLogger {

 	fn flush(&self) {}
 }
+
+#[cfg(test)]
+mod tests {
+	use substrate_test_runtime_client::{
+		ExecutionStrategy, TestClientBuilderExt, DefaultTestClientBuilderExt,
+		TestClientBuilder, runtime::TestAPI,
+	};
+	use sp_api::ProvideRuntimeApi;
+	use sp_runtime::generic::BlockId;
+
+	#[test]
+	fn ensure_runtime_logger_works() {
+		let executable = std::env::current_exe().unwrap();
+		let output = std::process::Command::new(executable)
+			.env("RUN_TEST", "1")
+			.env("RUST_LOG", "trace")
+			.args(&["--nocapture", "ensure_runtime_logger_works_implementation"])
+			.output()
+			.unwrap();
+
+		let output = dbg!(String::from_utf8(output.stderr).unwrap());
+		assert!(output.contains("Hey I'm runtime"));
+	}
+
+	/// This is not an actual test. It will be called by `ensure_runtime_logger_works`
+	/// to check that the runtime can print from the wasm side using the
+	/// `RuntimeLogger`.
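+	/// It only does something when the outer test sets the `RUN_TEST`
+	/// environment variable.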
+ #[test] + fn ensure_runtime_logger_works_implementation() { + if std::env::var("RUN_TEST").is_ok() { + sp_tracing::try_init_simple(); + + let client = TestClientBuilder::new().set_execution_strategy(ExecutionStrategy::AlwaysWasm).build(); + let runtime_api = client.runtime_api(); + let block_id = BlockId::Number(0); + runtime_api.do_trace_log(&block_id).expect("Logging should not fail"); + } + } +} diff --git a/primitives/tracing/src/lib.rs b/primitives/tracing/src/lib.rs index fb074d5579..cb67d8a0c5 100644 --- a/primitives/tracing/src/lib.rs +++ b/primitives/tracing/src/lib.rs @@ -107,7 +107,9 @@ pub use crate::types::{ /// Ignores any error. Useful for testing. #[cfg(feature = "std")] pub fn try_init_simple() { - let _ = tracing_subscriber::fmt().with_writer(std::io::stderr).try_init(); + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_writer(std::io::stderr).try_init(); } #[cfg(feature = "std")] diff --git a/test-utils/runtime/src/lib.rs b/test-utils/runtime/src/lib.rs index 5ab4d99dee..e772a28ee3 100644 --- a/test-utils/runtime/src/lib.rs +++ b/test-utils/runtime/src/lib.rs @@ -340,6 +340,8 @@ cfg_if! { /// Test that ensures that we can call a function that takes multiple /// arguments. fn test_multiple_arguments(data: Vec, other: Vec, num: u32); + /// Traces log "Hey I'm runtime." + fn do_trace_log(); } } } else { @@ -391,6 +393,8 @@ cfg_if! { /// Test that ensures that we can call a function that takes multiple /// arguments. fn test_multiple_arguments(data: Vec, other: Vec, num: u32); + /// Traces log "Hey I'm runtime." + fn do_trace_log(); } } } @@ -698,6 +702,11 @@ cfg_if! { assert_eq!(&data[..], &other[..]); assert_eq!(data.len(), num as usize); } + + fn do_trace_log() { + frame_support::debug::RuntimeLogger::init(); + frame_support::debug::trace!("Hey I'm runtime"); + } } impl sp_consensus_aura::AuraApi for Runtime { @@ -944,6 +953,11 @@ cfg_if! 
{
 			assert_eq!(&data[..], &other[..]);
 			assert_eq!(data.len(), num as usize);
 		}
+
+			fn do_trace_log() {
+				frame_support::debug::RuntimeLogger::init();
+				frame_support::debug::trace!("Hey I'm runtime");
+			}
 		}

 		impl sp_consensus_aura::AuraApi<Block, AuraId> for Runtime {

From 3d8df3446cac980a4b79f1a1dfe29c63ef8ce766 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Thu, 29 Oct 2020 11:47:58 +0100
Subject: [PATCH 04/13] Print an error if an unregistered notifications
 protocol is used (#7457)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Print an error if an unregistered notifications protocol is used

* Print an error if an unregistered notifications protocol is used

* Update client/network/src/service.rs

Co-authored-by: Bastian Köcher
---
 client/network/src/service.rs | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/client/network/src/service.rs b/client/network/src/service.rs
index 59f55f01a4..b5dc7ac830 100644
--- a/client/network/src/service.rs
+++ b/client/network/src/service.rs
@@ -662,6 +662,11 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
 		if let Some(protocol_name) = protocol_name {
 			sink.send_sync_notification(protocol_name, message);
 		} else {
+			log::error!(
+				target: "sub-libp2p",
+				"Attempted to send notification on unknown protocol: {:?}",
+				engine_id,
+			);
 			return;
 		}

From 3918e91d42f00d4559c5dc32b0d9334cfc75b878 Mon Sep 17 00:00:00 2001
From: Kian Paimani <5588131+kianenigma@users.noreply.github.com>
Date: Sat, 24 Oct 2020 13:52:59 +0200
Subject: [PATCH 05/13] Fix wrong outgoing calculation in election (#7384)

* Fix wrong outgoing calculation in election

* Add test.

* Lil bit better naming.
---
 frame/elections-phragmen/src/lib.rs | 94 ++++++++++++++++++++---------
 1 file changed, 65 insertions(+), 29 deletions(-)

diff --git a/frame/elections-phragmen/src/lib.rs b/frame/elections-phragmen/src/lib.rs
index dd816033ae..7891853530 100644
--- a/frame/elections-phragmen/src/lib.rs
+++ b/frame/elections-phragmen/src/lib.rs
@@ -891,16 +891,18 @@ impl<T: Trait> Module<T> {
 			num_to_elect,
 			0,
 			candidates,
-			voters_and_votes,
-		);
-
-		if let Some(ElectionResult { winners, assignments }) = maybe_phragmen_result {
-			let old_members_ids = <Members<T>>::take().into_iter()
+			voters_and_votes.clone(),
+			None,
+		).map(|ElectionResult { winners, assignments: _ }| {
+			// this is already sorted by id.
+			let old_members_ids_sorted = <Members<T>>::take().into_iter()
 				.map(|(m, _)| m)
 				.collect::<Vec<T::AccountId>>();
-			let old_runners_up_ids = <RunnersUp<T>>::take().into_iter()
+			// this one needs a sort by id.
+			let mut old_runners_up_ids_sorted = <RunnersUp<T>>::take().into_iter()
 				.map(|(r, _)| r)
 				.collect::<Vec<T::AccountId>>();
+			old_runners_up_ids_sorted.sort();

 			// filter out those who had literally no votes at all.
 			// NOTE: the need to do this is because all candidates, even those who have no

 			// split new set into winners and runners up.
 			let split_point = desired_seats.min(new_set_with_stake.len());
-			let mut new_members = (&new_set_with_stake[..split_point]).to_vec();
+			let mut new_members_sorted_by_id = (&new_set_with_stake[..split_point]).to_vec();

 			// save the runners up as-is. They are sorted based on desirability.
 			// save the members, sorted based on account id.
-			new_members.sort_by(|i, j| i.0.cmp(&j.0));
+			new_members_sorted_by_id.sort_by(|i, j| i.0.cmp(&j.0));

 			// Now we select a prime member using a [Borda count](https://en.wikipedia.org/wiki/Borda_count).
 			// We weigh everyone's vote for that new member by a multiplier based on the order
 			// of the votes. i.e.
the first person a voter votes for gets a 16x multiplier,
 			// the next person gets a 15x multiplier, and so on... (assuming `MAXIMUM_VOTE` = 16)
-			let mut prime_votes: Vec<_> = new_members.iter().map(|c| (&c.0, BalanceOf::<T>::zero())).collect();
+			let mut prime_votes: Vec<_> = new_members_sorted_by_id.iter().map(|c| (&c.0, BalanceOf::<T>::zero())).collect();
 			for (_, stake, votes) in voters_and_stakes.into_iter() {
 				for (vote_multiplier, who) in votes.iter()
 					.enumerate()

 			// the person with the "highest" account id based on the sort above.
 			let prime = prime_votes.into_iter().max_by_key(|x| x.1).map(|x| x.0.clone());

-			// new_members_ids is sorted by account id.
-			let new_members_ids = new_members
+			// new_members_sorted_by_id is sorted by account id.
+			let new_members_ids_sorted = new_members_sorted_by_id
 				.iter()
 				.map(|(m, _)| m.clone())
 				.collect::<Vec<T::AccountId>>();
-			let new_runners_up = &new_set_with_stake[split_point..]
+			let new_runners_up_sorted_by_rank = &new_set_with_stake[split_point..]
 				.into_iter()
 				.cloned()
 				.rev()
 				.collect::<Vec<(T::AccountId, BalanceOf<T>)>>();
 			// new_runners_up remains sorted by desirability.
-			let new_runners_up_ids = new_runners_up
+			let mut new_runners_up_ids_sorted = new_runners_up_sorted_by_rank
 				.iter()
 				.map(|(r, _)| r.clone())
 				.collect::<Vec<T::AccountId>>();
+			new_runners_up_ids_sorted.sort();

 			// report member changes. We compute diff because we need the outgoing list.
 			let (incoming, outgoing) = T::ChangeMembers::compute_members_diff(
-				&new_members_ids,
-				&old_members_ids,
+				&new_members_ids_sorted,
+				&old_members_ids_sorted,
 			);
 			T::ChangeMembers::change_members_sorted(
 				&incoming,
 				&outgoing,
-				&new_members_ids,
+				&new_members_ids_sorted,
 			);
 			T::ChangeMembers::set_prime(prime);

-			// outgoing candidates lose their bond.
+			// outgoing members lose their bond.
 			let mut to_burn_bond = outgoing.to_vec();

 			// compute the outgoing of runners up as well and append them to the `to_burn_bond`
 			{
 				let (_, outgoing) = T::ChangeMembers::compute_members_diff(
-					&new_runners_up_ids,
-					&old_runners_up_ids,
+					&new_runners_up_ids_sorted,
+					&old_runners_up_ids_sorted,
 				);
+				// none of the ones computed to be outgoing must still be in the list.
+				debug_assert!(outgoing.iter().all(|o| !new_runners_up_ids_sorted.contains(o)));
 				to_burn_bond.extend(outgoing);
 			}

 			// Burn loser bond. members list is sorted. O(NLogM) (N candidates, M members)
-			// runner up list is not sorted. O(K*N) given K runner ups. Overall: O(NLogM + N*K)
+			// runner up list is also sorted. O(NLogK) given K runner ups. Overall: O(NLogM + NLogK)
 			// both the member and runner counts are bounded.
 			exposed_candidates.into_iter().for_each(|c| {
 				// any candidate who is not a member and not a runner up.
-				if new_members.binary_search_by_key(&c, |(m, _)| m.clone()).is_err()
-					&& !new_runners_up_ids.contains(&c)
+				if
+					new_members_ids_sorted.binary_search(&c).is_err() &&
+					new_runners_up_ids_sorted.binary_search(&c).is_err()
 				{
 					let (imbalance, _) = T::Currency::slash_reserved(&c, T::CandidacyBond::get());
 					T::LoserCandidate::on_unbalanced(imbalance);
 				}
 			});

 			to_burn_bond.into_iter().for_each(|x| {
 				let (imbalance, _) = T::Currency::slash_reserved(&x, T::CandidacyBond::get());
 				T::LoserCandidate::on_unbalanced(imbalance);
 			});

-			<Members<T>>::put(&new_members);
-			<RunnersUp<T>>::put(new_runners_up);
+			<Members<T>>::put(&new_members_sorted_by_id);
+			<RunnersUp<T>>::put(new_runners_up_sorted_by_rank);

-			Self::deposit_event(RawEvent::NewTerm(new_members.clone().to_vec()));
-		} else {
-			Self::deposit_event(RawEvent::EmptyTerm);
-		}
+			Self::deposit_event(RawEvent::NewTerm(new_members_sorted_by_id.clone().to_vec()));

 			// clean candidates.
>::kill(); @@ -1297,7 +1300,6 @@ mod tests { self.genesis_members = members; self } - #[cfg(feature = "runtime-benchmarks")] pub fn desired_members(mut self, count: u32) -> Self { self.desired_members = count; self @@ -2818,4 +2820,38 @@ mod tests { assert!(Elections::candidates().is_empty()); }) } + + #[test] + fn unsorted_runners_up_are_detected() { + ExtBuilder::default().desired_runners_up(2).desired_members(1).build_and_execute(|| { + assert_ok!(submit_candidacy(Origin::signed(5))); + assert_ok!(submit_candidacy(Origin::signed(4))); + assert_ok!(submit_candidacy(Origin::signed(3))); + + + assert_ok!(vote(Origin::signed(5), vec![5], 50)); + assert_ok!(vote(Origin::signed(4), vec![4], 5)); + assert_ok!(vote(Origin::signed(3), vec![3], 15)); + + System::set_block_number(5); + Elections::end_block(System::block_number()); + + assert_eq!(Elections::members_ids(), vec![5]); + assert_eq!(Elections::runners_up_ids(), vec![4, 3]); + + assert_ok!(submit_candidacy(Origin::signed(2))); + assert_ok!(vote(Origin::signed(2), vec![2], 10)); + + System::set_block_number(10); + Elections::end_block(System::block_number()); + + assert_eq!(Elections::members_ids(), vec![5]); + assert_eq!(Elections::runners_up_ids(), vec![2, 3]); + + // 4 is outgoing runner-up. Slash candidacy bond. + assert_eq!(balances(&4), (35, 2)); + // 3 stays. + assert_eq!(balances(&3), (25, 5)); + }) + } } From 863398a58dc791ea021021dd01ad7cd2aa7e2e06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 14 Oct 2020 11:44:42 +0100 Subject: [PATCH 06/13] grandpa: fix tests --- client/finality-grandpa/src/authorities.rs | 139 ++++++++++++++++++--- 1 file changed, 120 insertions(+), 19 deletions(-) diff --git a/client/finality-grandpa/src/authorities.rs b/client/finality-grandpa/src/authorities.rs index 7a064d7a62..b57e0347fb 100644 --- a/client/finality-grandpa/src/authorities.rs +++ b/client/finality-grandpa/src/authorities.rs @@ -937,12 +937,7 @@ mod tests { None, ); - // throw a standard change into the mix to prove that it's discarded - // for being on the same fork. - // - // NOTE: after https://github.com/paritytech/substrate/issues/1861 - // this should still be rejected based on the "span" rule -- it overlaps - // with another change on the same fork. + // there can only be one pending forced change per fork let change_c = PendingChange { next_authorities: set_b.clone(), delay: 3, @@ -951,11 +946,12 @@ mod tests { delay_kind: DelayKind::Best { median_last_finalized: 0 }, }; - let is_descendent_of_a = is_descendent_of(|base: &&str, _| { - base.starts_with("hash_a") - }); + let is_descendent_of_a = is_descendent_of(|base: &&str, _| base.starts_with("hash_a")); - assert!(authorities.add_pending_change(change_c, &is_descendent_of_a).is_err()); + assert!(matches!( + authorities.add_pending_change(change_c, &is_descendent_of_a), + Err(Error::MultiplePendingForcedAuthoritySetChanges) + )); // too early. assert!( @@ -966,22 +962,27 @@ mod tests { // too late. assert!( - authorities.apply_forced_changes("hash_a16", 16, &static_is_descendent_of(true), false) + authorities + .apply_forced_changes("hash_a16", 16, &is_descendent_of_a, false) .unwrap() .is_none() ); - // on time -- chooses the right change. + // on time -- chooses the right change for this fork. 
assert_eq!( - authorities.apply_forced_changes("hash_a15", 15, &is_descendent_of_a, false) + authorities + .apply_forced_changes("hash_a15", 15, &is_descendent_of_a, false) .unwrap() .unwrap(), - (42, AuthoritySet { - current_authorities: set_a, - set_id: 1, - pending_standard_changes: ForkTree::new(), - pending_forced_changes: Vec::new(), - }), + ( + 42, + AuthoritySet { + current_authorities: set_a, + set_id: 1, + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), + }, + ) ); } @@ -1022,6 +1023,106 @@ mod tests { ); } + #[test] + fn forced_changes_blocked_by_standard_changes() { + let set_a = vec![(AuthorityId::from_slice(&[1; 32]), 1)]; + + let mut authorities = AuthoritySet { + current_authorities: set_a.clone(), + set_id: 0, + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), + }; + + // effective at #15 + let change_a = PendingChange { + next_authorities: set_a.clone(), + delay: 5, + canon_height: 10, + canon_hash: "hash_a", + delay_kind: DelayKind::Finalized, + }; + + // effective #20 + let change_b = PendingChange { + next_authorities: set_a.clone(), + delay: 0, + canon_height: 20, + canon_hash: "hash_b", + delay_kind: DelayKind::Finalized, + }; + + // effective at #35 + let change_c = PendingChange { + next_authorities: set_a.clone(), + delay: 5, + canon_height: 30, + canon_hash: "hash_c", + delay_kind: DelayKind::Finalized, + }; + + // add some pending standard changes all on the same fork + authorities.add_pending_change(change_a, &static_is_descendent_of(true)).unwrap(); + authorities.add_pending_change(change_b, &static_is_descendent_of(true)).unwrap(); + authorities.add_pending_change(change_c, &static_is_descendent_of(true)).unwrap(); + + // effective at #45 + let change_d = PendingChange { + next_authorities: set_a.clone(), + delay: 5, + canon_height: 40, + canon_hash: "hash_d", + delay_kind: DelayKind::Best { + median_last_finalized: 31, + }, + }; + + // now add a forced change on the same fork + authorities.add_pending_change(change_d, &static_is_descendent_of(true)).unwrap(); + + // the forced change cannot be applied since the pending changes it depends on + // have not been applied yet. + assert!(matches!( + authorities.apply_forced_changes("hash_d45", 45, &static_is_descendent_of(true), false), + Err(Error::ForcedAuthoritySetChangeDependencyUnsatisfied(15)) + )); + + // we apply the first pending standard change at #15 + authorities + .apply_standard_changes("hash_a15", 15, &static_is_descendent_of(true), false) + .unwrap(); + + // but the forced change still depends on the next standard change + assert!(matches!( + authorities.apply_forced_changes("hash_d", 45, &static_is_descendent_of(true), false), + Err(Error::ForcedAuthoritySetChangeDependencyUnsatisfied(20)) + )); + + // we apply the pending standard change at #20 + authorities + .apply_standard_changes("hash_b", 20, &static_is_descendent_of(true), false) + .unwrap(); + + // afterwards the forced change at #45 can already be applied since it signals + // that finality stalled at #31, and the next pending standard change is effective + // at #35. 
subsequent forced changes on the same branch must be kept
+		assert_eq!(
+			authorities
+				.apply_forced_changes("hash_d", 45, &static_is_descendent_of(true), false)
+				.unwrap()
+				.unwrap(),
+			(
+				31,
+				AuthoritySet {
+					current_authorities: set_a.clone(),
+					set_id: 3,
+					pending_standard_changes: ForkTree::new(),
+					pending_forced_changes: Vec::new(),
+				}
+			),
+		);
+	}
+
 	#[test]
 	fn next_change_works() {
 		let current_authorities = vec![(AuthorityId::from_slice(&[1; 32]), 1)];

From 1630f3d9fb09cab8bd8ae6c0bc7cd690a9947ced Mon Sep 17 00:00:00 2001
From: Max Inden
Date: Tue, 20 Oct 2020 11:23:27 +0200
Subject: [PATCH 07/13] *: Bump async-std to v1.6.5 (#7306)

* *: Bump async-std to v1.6.5

Prevent users from using v1.6.4 which faces issues receiving incoming
TCP connections. See https://github.com/async-rs/async-std/issues/888
for details.

* client/network/src/gossip: Use channel instead of condvar

`async_std::sync::Condvar::wait_timeout` uses
`gloo_timers::callback::Timeout` when compiled for
`wasm32-unknown-unknown`. This timeout implementation does not fulfill
the requirement of being `Send`.

Instead of using a `Condvar` use a `futures::channel::mpsc` to signal
progress from the `QueuedSender` to the background `Future`.

* client/network/Cargo.toml: Remove async-std unstable feature

* client/network/src/gossip: Forward all queued messages

* client/network/gossip: Have QueuedSender methods take &mut self

* client/network/gossip: Move queue_size_limit into QueuedSender

The `queue_size_limit` field is only accessed by `QueuedSender`, thus
there is no need to share it between the background future and the
`QueuedSender`.

* client/network/gossip: Rename background task to future

To be a bit picky, the background task is not a task in the sense of an
asynchronous task, but rather a background future in the sense of
`futures::future::Future`.
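For illustration, the wake-up pattern described above could look roughly
like this (a minimal sketch; `background` and `notify` are invented names,
the real implementation lives in `client/network/src/gossip.rs`):

    use futures::{channel::mpsc, StreamExt};

    async fn background(mut wake: mpsc::Receiver<()>) {
        // Sleep until the sender side signals that the shared queue
        // changed; `None` means the sender was dropped, so shut down.
        while let Some(()) = wake.next().await {
            // lock the shared queue here and send the messages out
        }
    }

    fn notify(wake_tx: &mut mpsc::Sender<()>) {
        // A failed send is fine: either a wake-up is already pending
        // (the buffer is full) or the background future is gone.
        let _ = wake_tx.try_send(());
    }

Unlike a `Condvar`, both ends are plain `futures` primitives, so the same
code also compiles and behaves correctly on `wasm32-unknown-unknown`.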
--- Cargo.lock | 172 ++++++++++++++++++++--------- client/network-gossip/Cargo.toml | 2 +- client/network/Cargo.toml | 2 +- client/network/src/gossip.rs | 161 ++++++++++++--------------- client/network/src/gossip/tests.rs | 2 +- client/service/Cargo.toml | 2 +- utils/prometheus/Cargo.toml | 2 +- 7 files changed, 194 insertions(+), 149 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b3bc394d61..c698041dcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,9 +220,9 @@ checksum = "7deb0a829ca7bcfaf5da70b073a8d128619259a7be8216a355e23f00763059e5" [[package]] name = "async-channel" -version = "1.1.1" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee81ba99bee79f3c8ae114ae4baa7eaa326f63447cf2ec65e4393618b63f8770" +checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9" dependencies = [ "concurrent-queue", "event-listener", @@ -230,34 +230,95 @@ dependencies = [ ] [[package]] -name = "async-std" -version = "1.6.2" +name = "async-executor" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00d68a33ebc8b57800847d00787307f84a562224a14db069b0acefe4c2abbf5d" +checksum = "d373d78ded7d0b3fa8039375718cde0aace493f2e34fb60f51cbf567562ca801" dependencies = [ "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "once_cell 1.4.1", + "vec-arena", +] + +[[package]] +name = "async-global-executor" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fefeb39da249f4c33af940b779a56723ce45809ef5c54dad84bb538d4ffb6d9e" +dependencies = [ + "async-executor", + "async-io", + "futures-lite", + "num_cpus", + "once_cell 1.4.1", +] + +[[package]] +name = "async-io" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38628c78a34f111c5a6b98fc87dfc056cd1590b61afe748b145be4623c56d194" +dependencies = [ + "cfg-if", + "concurrent-queue", + "fastrand", + "futures-lite", + "libc", + "log", + "once_cell 1.4.1", + "parking", + "polling", + "socket2", + "vec-arena", + "waker-fn", + "wepoll-sys-stjepang", + "winapi 0.3.9", +] + +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-std" +version = "1.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9fa76751505e8df1c7a77762f60486f60c71bbd9b8557f4da6ad47d083732ed" +dependencies = [ + "async-global-executor", + "async-io", + "async-mutex", + "blocking", "crossbeam-utils", "futures-channel", "futures-core", "futures-io", - "futures-timer 3.0.2", + "futures-lite", + "gloo-timers", "kv-log-macro", "log", "memchr", "num_cpus", - "once_cell 1.4.0", + "once_cell 1.4.1", "pin-project-lite", "pin-utils", "slab", - "smol", "wasm-bindgen-futures", ] [[package]] name = "async-task" -version = "3.0.0" +version = "4.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c17772156ef2829aadc587461c7753af20b7e8db1529bc66855add962a3b35d3" +checksum = "8ab27c1aa62945039e44edaeee1dc23c74cc0c303dd5fe0fb462a184f1c3a518" [[package]] name = "async-tls" @@ -506,16 +567,16 @@ dependencies = [ [[package]] name = "blocking" -version = "0.4.7" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2468ff7bf85066b4a3678fede6fe66db31846d753ff0adfbfab2c6a6e81612b" +checksum = 
"c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9" dependencies = [ "async-channel", + "async-task", "atomic-waker", + "fastrand", "futures-lite", - "once_cell 1.4.0", - "parking", - "waker-fn", + "once_cell 1.4.1", ] [[package]] @@ -744,9 +805,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "1.1.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f83c06aff61f2d899eb87c379df3cbf7876f14471dcab474e0b6dc90ab96c080" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" dependencies = [ "cache-padded", ] @@ -1332,9 +1393,9 @@ dependencies = [ [[package]] name = "event-listener" -version = "2.2.0" +version = "2.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "699d84875f1b72b4da017e6b0f77dfa88c0137f089958a88974d15938cbc2976" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" [[package]] name = "evm" @@ -1427,9 +1488,12 @@ checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" [[package]] name = "fastrand" -version = "1.3.3" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36a9cb09840f81cd211e435d00a4e487edd263dc3c8ff815c32dd76ad668ebed" +checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3" +dependencies = [ + "instant", +] [[package]] name = "fdlimit" @@ -1585,7 +1649,7 @@ dependencies = [ "frame-system", "impl-trait-for-tuples", "log", - "once_cell 1.4.0", + "once_cell 1.4.1", "parity-scale-codec", "parity-util-mem", "paste", @@ -1842,9 +1906,9 @@ checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" [[package]] name = "futures-lite" -version = "0.1.8" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180d8fc9819eb48a0c976672fbeea13a73e10999e812bdc9e14644c25ad51d60" +checksum = "381a7ad57b1bad34693f63f6f377e1abded7a9c85c9d3eb6771e11c60aaadab9" dependencies = [ "fastrand", "futures-core", @@ -1879,7 +1943,7 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" dependencies = [ - "once_cell 1.4.0", + "once_cell 1.4.1", ] [[package]] @@ -2738,9 +2802,9 @@ checksum = "3576a87f2ba00f6f106fdfcd16db1d698d648a26ad8e0573cad8537c3c362d2a" [[package]] name = "libc" -version = "0.2.73" +version = "0.2.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd7d4bd64732af4bf3a67f367c27df8520ad7e230c5817b8ff485864d80242b9" +checksum = "2448f6066e80e3bfc792e9c98bf705b4b0fc6e8ef5b43e5889aff0eaa9c58743" [[package]] name = "libloading" @@ -4144,11 +4208,11 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" +checksum = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad" dependencies = [ - "parking_lot 0.10.2", + "parking_lot 0.11.0", ] [[package]] @@ -5203,9 +5267,9 @@ dependencies = [ [[package]] name = "parking" -version = "1.0.5" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d4a6da31f8144a32532fe38fe8fb439a6842e0ec633f0037f0144c14e7f907" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" [[package]] name = "parking_lot" @@ -5426,6 +5490,19 @@ dependencies = [ "web-sys", 
] +[[package]] +name = "polling" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0720e0b9ea9d52451cf29d3413ba8a9303f8815d9d9653ef70e03ff73e65566" +dependencies = [ + "cfg-if", + "libc", + "log", + "wepoll-sys-stjepang", + "winapi 0.3.9", +] + [[package]] name = "poly1305" version = "0.6.0" @@ -6104,7 +6181,7 @@ checksum = "952cd6b98c85bbc30efa1ba5783b8abf12fec8b3287ffa52605b9432313e34e4" dependencies = [ "cc", "libc", - "once_cell 1.4.0", + "once_cell 1.4.1", "spin", "untrusted", "web-sys", @@ -7761,27 +7838,6 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f" -[[package]] -name = "smol" -version = "0.1.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "620cbb3c6e34da57d3a248cda0cd01cd5848164dc062e764e65d06fe3ea7aed5" -dependencies = [ - "async-task", - "blocking", - "concurrent-queue", - "fastrand", - "futures-io", - "futures-util", - "libc", - "once_cell 1.4.0", - "scoped-tls", - "slab", - "socket2", - "wepoll-sys-stjepang", - "winapi 0.3.9", -] - [[package]] name = "snow" version = "0.7.1" @@ -9098,7 +9154,7 @@ checksum = "b0165e045cc2ae1660270ca65e1676dbaab60feb0f91b10f7d0665e9b47e31f2" dependencies = [ "failure", "hmac", - "once_cell 1.4.0", + "once_cell 1.4.1", "pbkdf2", "rand 0.7.3", "rustc-hash", @@ -9738,6 +9794,12 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" +[[package]] +name = "vec-arena" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eafc1b9b2dfc6f5529177b62cf806484db55b32dc7c9658a118e11bbeb33061d" + [[package]] name = "vec_map" version = "0.8.2" diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml index 53610650c0..36f877da9a 100644 --- a/client/network-gossip/Cargo.toml +++ b/client/network-gossip/Cargo.toml @@ -25,7 +25,7 @@ sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" } wasm-timer = "0.2" [dev-dependencies] -async-std = "1.6.2" +async-std = "1.6.5" quickcheck = "0.9.0" rand = "0.7.2" substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" } diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 76d8f0a23e..af0e2a2dc1 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -18,7 +18,7 @@ prost-build = "0.6.1" [dependencies] async-trait = "0.1" -async-std = { version = "1.6.2", features = ["unstable"] } +async-std = "1.6.5" bitflags = "1.2.0" bs58 = "0.3.1" bytes = "0.5.0" diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs index 0650e7a2f8..9d20229288 100644 --- a/client/network/src/gossip.rs +++ b/client/network/src/gossip.rs @@ -49,15 +49,15 @@ use crate::{ExHashT, NetworkService}; -use async_std::sync::{Condvar, Mutex, MutexGuard}; +use async_std::sync::{Mutex, MutexGuard}; use futures::prelude::*; +use futures::channel::mpsc::{channel, Receiver, Sender}; use libp2p::PeerId; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::{ collections::VecDeque, fmt, - sync::{atomic, Arc}, - time::Duration, + sync::Arc, }; #[cfg(test)] @@ -65,8 +65,12 @@ mod tests; /// Notifications sender for a specific combination of network service, peer, and protocol. pub struct QueuedSender { - /// Shared between the front and the back task. 
- shared: Arc>, + /// Shared between the user-facing [`QueuedSender`] and the background future. + shared_message_queue: SharedMessageQueue, + /// Used to notify the background future to check for new messages in the message queue. + notify_background_future: Sender<()>, + /// Maximum number of elements in [`QueuedSender::shared_message_queue`]. + queue_size_limit: usize, } impl QueuedSender { @@ -88,39 +92,45 @@ impl QueuedSender { H: ExHashT, F: Fn(M) -> Vec + Send + 'static, { - let shared = Arc::new(Shared { - stop_task: atomic::AtomicBool::new(false), - condvar: Condvar::new(), - queue_size_limit, - messages_queue: Mutex::new(VecDeque::with_capacity(queue_size_limit)), - }); + let (notify_background_future, wait_for_sender) = channel(0); + + let shared_message_queue = Arc::new(Mutex::new( + VecDeque::with_capacity(queue_size_limit), + )); - let task = spawn_task( + let background_future = create_background_future( + wait_for_sender, service, peer_id, protocol, - shared.clone(), + shared_message_queue.clone(), messages_encode ); - (QueuedSender { shared }, task) + let sender = QueuedSender { + shared_message_queue, + notify_background_future, + queue_size_limit, + }; + + (sender, background_future) } /// Locks the queue of messages towards this peer. /// /// The returned `Future` is expected to be ready quite quickly. - pub async fn lock_queue<'a>(&'a self) -> QueueGuard<'a, M> { + pub async fn lock_queue<'a>(&'a mut self) -> QueueGuard<'a, M> { QueueGuard { - messages_queue: self.shared.messages_queue.lock().await, - condvar: &self.shared.condvar, - queue_size_limit: self.shared.queue_size_limit, + message_queue: self.shared_message_queue.lock().await, + queue_size_limit: self.queue_size_limit, + notify_background_future: &mut self.notify_background_future, } } /// Pushes a message to the queue, or discards it if the queue is full. /// /// The returned `Future` is expected to be ready quite quickly. - pub async fn queue_or_discard(&self, message: M) + pub async fn queue_or_discard(&mut self, message: M) where M: Send + 'static { @@ -134,28 +144,17 @@ impl fmt::Debug for QueuedSender { } } -impl Drop for QueuedSender { - fn drop(&mut self) { - // The "clean" way to notify the `Condvar` here is normally to first lock the `Mutex`, - // then notify the `Condvar` while the `Mutex` is locked. Unfortunately, the `Mutex` - // being asynchronous, it can't reasonably be locked from within a destructor. - // See also the corresponding code in the background task. - self.shared.stop_task.store(true, atomic::Ordering::Release); - self.shared.condvar.notify_all(); - } -} - /// Locked queue of messages to the given peer. /// -/// As long as this struct exists, the background task is asleep and the owner of the [`QueueGuard`] -/// is in total control of the buffer. Messages can only ever be sent out after the [`QueueGuard`] -/// is dropped. +/// As long as this struct exists, the background future is asleep and the owner of the +/// [`QueueGuard`] is in total control of the message queue. Messages can only ever be sent out on +/// the network after the [`QueueGuard`] is dropped. #[must_use] pub struct QueueGuard<'a, M> { - messages_queue: MutexGuard<'a, VecDeque>, - condvar: &'a Condvar, - /// Same as [`Shared::queue_size_limit`]. + message_queue: MutexGuard<'a, MessageQueue>, + /// Same as [`QueuedSender::queue_size_limit`]. 
queue_size_limit: usize, + notify_background_future: &'a mut Sender<()>, } impl<'a, M: Send + 'static> QueueGuard<'a, M> { @@ -163,8 +162,8 @@ impl<'a, M: Send + 'static> QueueGuard<'a, M> { /// /// The message will only start being sent out after the [`QueueGuard`] is dropped. pub fn push_or_discard(&mut self, message: M) { - if self.messages_queue.len() < self.queue_size_limit { - self.messages_queue.push_back(message); + if self.message_queue.len() < self.queue_size_limit { + self.message_queue.push_back(message); } } @@ -174,72 +173,56 @@ impl<'a, M: Send + 'static> QueueGuard<'a, M> { /// > **Note**: The parameter of `filter` is a `&M` and not a `&mut M` (which would be /// > better) because the underlying implementation relies on `VecDeque::retain`. pub fn retain(&mut self, filter: impl FnMut(&M) -> bool) { - self.messages_queue.retain(filter); + self.message_queue.retain(filter); } } impl<'a, M> Drop for QueueGuard<'a, M> { fn drop(&mut self) { - // We notify the `Condvar` in the destructor in order to be able to push multiple - // messages and wake up the background task only once afterwards. - self.condvar.notify_one(); + // Notify background future to check for new messages in the message queue. + let _ = self.notify_background_future.try_send(()); } } -#[derive(Debug)] -struct Shared { - /// Read by the background task after locking `locked`. If true, the task stops. - stop_task: atomic::AtomicBool, - /// Queue of messages waiting to be sent out. - messages_queue: Mutex>, - /// Must be notified every time the content of `locked` changes. - condvar: Condvar, - /// Maximum number of elements in `messages_queue`. - queue_size_limit: usize, -} +type MessageQueue = VecDeque; + +/// [`MessageQueue`] shared between [`QueuedSender`] and background future. +type SharedMessageQueue = Arc>>; -async fn spawn_task Vec>( +async fn create_background_future Vec>( + mut wait_for_sender: Receiver<()>, service: Arc>, peer_id: PeerId, protocol: ConsensusEngineId, - shared: Arc>, + shared_message_queue: SharedMessageQueue, messages_encode: F, ) { loop { - let next_message = 'next_msg: loop { - let mut queue = shared.messages_queue.lock().await; - - loop { - if shared.stop_task.load(atomic::Ordering::Acquire) { - return; - } - - if let Some(msg) = queue.pop_front() { - break 'next_msg msg; - } - - // It is possible that the destructor of `QueuedSender` sets `stop_task` to - // true and notifies the `Condvar` after the background task loads `stop_task` - // and before it calls `Condvar::wait`. - // See also the corresponding comment in `QueuedSender::drop`. - // For this reason, we use `wait_timeout`. In the worst case scenario, - // `stop_task` will always be checked again after the timeout is reached. - queue = shared.condvar.wait_timeout(queue, Duration::from_secs(10)).await.0; - } - }; - - // Starting from below, we try to send the message. If an error happens when sending, - // the only sane option we have is to silently discard the message. - let sender = match service.notification_sender(peer_id.clone(), protocol) { - Ok(s) => s, - Err(_) => continue, - }; - - let ready = match sender.ready().await { - Ok(r) => r, - Err(_) => continue, - }; + if wait_for_sender.next().await.is_none() { + return + } - let _ = ready.send(messages_encode(next_message)); + loop { + let mut queue_guard = shared_message_queue.lock().await; + let next_message = match queue_guard.pop_front() { + Some(msg) => msg, + None => break, + }; + drop(queue_guard); + + // Starting from below, we try to send the message. 
If an error happens when sending, + // the only sane option we have is to silently discard the message. + let sender = match service.notification_sender(peer_id.clone(), protocol) { + Ok(s) => s, + Err(_) => continue, + }; + + let ready = match sender.ready().await { + Ok(r) => r, + Err(_) => continue, + }; + + let _ = ready.send(messages_encode(next_message)); + } } } diff --git a/client/network/src/gossip/tests.rs b/client/network/src/gossip/tests.rs index 9ba44f564e..0f01ed81bf 100644 --- a/client/network/src/gossip/tests.rs +++ b/client/network/src/gossip/tests.rs @@ -180,7 +180,7 @@ fn basic_works() { }); async_std::task::block_on(async move { - let (sender, bg_future) = + let (mut sender, bg_future) = QueuedSender::new(node1, node2_id, ENGINE_ID, NUM_NOTIFS, |msg| msg); async_std::task::spawn(bg_future); diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index f57a145422..2288412474 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -88,4 +88,4 @@ sp-consensus-babe = { version = "0.8.0", path = "../../primitives/consensus/babe grandpa = { version = "0.8.0", package = "sc-finality-grandpa", path = "../finality-grandpa" } grandpa-primitives = { version = "2.0.0", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" } tokio = { version = "0.2", default-features = false } -async-std = { version = "1.6", default-features = false } +async-std = { version = "1.6.5", default-features = false } diff --git a/utils/prometheus/Cargo.toml b/utils/prometheus/Cargo.toml index 8fdbbc6124..9eed7a2fdc 100644 --- a/utils/prometheus/Cargo.toml +++ b/utils/prometheus/Cargo.toml @@ -19,6 +19,6 @@ futures-util = { version = "0.3.1", default-features = false, features = ["io"] derive_more = "0.99" [target.'cfg(not(target_os = "unknown"))'.dependencies] -async-std = { version = "1.6.2", features = ["unstable"] } +async-std = { version = "1.6.5", features = ["unstable"] } hyper = { version = "0.13.1", default-features = false, features = ["stream"] } tokio = "0.2" From 55dc78839e2c3057160d8e5f224426cdc8ff99e6 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 19 Oct 2020 21:43:32 +0200 Subject: [PATCH 08/13] client/network: Remove option to disable yamux flow control (#7358) With the `OnRead` flow control option yamux "send[s] window updates only when data is read on the receiving end" and not as soon as "a Stream's receive window drops to 0". Yamux flow control has proven itself. This commit removes the feature flag. Yamux flow control is now always enabled. --- client/cli/src/params/network_params.rs | 6 ------ client/network/src/config.rs | 3 --- client/network/src/service.rs | 10 +++++----- client/network/src/transport.rs | 10 +++------- client/service/test/src/lib.rs | 1 - utils/browser/src/lib.rs | 1 - 6 files changed, 8 insertions(+), 23 deletions(-) diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs index faaf2c2bd2..79e7a70024 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -92,11 +92,6 @@ pub struct NetworkParams { #[structopt(flatten)] pub node_key_params: NodeKeyParams, - /// Disable the yamux flow control. This option will be removed in the future once there is - /// enough confidence that this feature is properly working. - #[structopt(long)] - pub no_yamux_flow_control: bool, - /// Enable peer discovery on local networks. /// /// By default this option is true for `--dev` and false otherwise. 
@@ -158,7 +153,6 @@ impl NetworkParams { enable_mdns: !is_dev && !self.no_mdns, allow_private_ipv4: !self.no_private_ipv4, wasm_external_transport: None, - use_yamux_flow_control: !self.no_yamux_flow_control, }, max_parallel_downloads: self.max_parallel_downloads, allow_non_globals_in_dht: self.discover_local || is_dev, diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 7445ea0534..c144bebb5f 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -451,7 +451,6 @@ impl NetworkConfiguration { enable_mdns: false, allow_private_ipv4: true, wasm_external_transport: None, - use_yamux_flow_control: false, }, max_parallel_downloads: 5, allow_non_globals_in_dht: false, @@ -519,8 +518,6 @@ pub enum TransportConfig { /// This parameter exists whatever the target platform is, but it is expected to be set to /// `Some` only when compiling for WASM. wasm_external_transport: Option, - /// Use flow control for yamux streams if set to true. - use_yamux_flow_control: bool, }, /// Only allow connections within the same process. diff --git a/client/network/src/service.rs b/client/network/src/service.rs index b5dc7ac830..f1ee94bf85 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -336,12 +336,12 @@ impl NetworkWorker { behaviour.register_notifications_protocol(*engine_id, protocol_name.clone()); } let (transport, bandwidth) = { - let (config_mem, config_wasm, flowctrl) = match params.network_config.transport { - TransportConfig::MemoryOnly => (true, None, false), - TransportConfig::Normal { wasm_external_transport, use_yamux_flow_control, .. } => - (false, wasm_external_transport, use_yamux_flow_control) + let (config_mem, config_wasm) = match params.network_config.transport { + TransportConfig::MemoryOnly => (true, None), + TransportConfig::Normal { wasm_external_transport, .. } => + (false, wasm_external_transport) }; - transport::build_transport(local_identity, config_mem, config_wasm, flowctrl) + transport::build_transport(local_identity, config_mem, config_wasm) }; let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) .peer_connection_limit(crate::MAX_CONNECTIONS_PER_PEER) diff --git a/client/network/src/transport.rs b/client/network/src/transport.rs index 626f84b6b5..b64e7d5792 100644 --- a/client/network/src/transport.rs +++ b/client/network/src/transport.rs @@ -41,7 +41,6 @@ pub fn build_transport( keypair: identity::Keypair, memory_only: bool, wasm_external_transport: Option, - use_yamux_flow_control: bool ) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc) { // Build the base layer of the transport. let transport = if let Some(t) = wasm_external_transport { @@ -110,12 +109,9 @@ pub fn build_transport( mplex_config.max_buffer_len(usize::MAX); let mut yamux_config = libp2p::yamux::Config::default(); - - if use_yamux_flow_control { - // Enable proper flow-control: window updates are only sent when - // buffered data has been consumed. - yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::OnRead); - } + // Enable proper flow-control: window updates are only sent when + // buffered data has been consumed. 
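+	// (`WindowUpdateMode::OnRead` sends a window update once buffered data has
+	// actually been read by the application, instead of as soon as a stream's
+	// receive window drops to 0.)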
+	yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::OnRead);

 	core::upgrade::SelectUpgrade::new(yamux_config, mplex_config)
 		.map_inbound(move |muxer| core::muxing::StreamMuxerBox::new(muxer))
diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs
index cfe815f174..ccc1bb71f9 100644
--- a/client/service/test/src/lib.rs
+++ b/client/service/test/src/lib.rs
@@ -225,7 +225,6 @@ fn node_config
Date: Sun, 18 Oct 2020 19:58:52 +0200
Subject: [PATCH 09/13] Make `queryStorage` and `storagePairs` unsafe RPC
 functions (#7342)

The RPC calls can be rather expensive and can easily get an RPC node
into trouble ;)
---
 client/rpc-api/src/state/error.rs |  2 ++
 client/rpc/src/state/mod.rs       | 18 ++++++++++--
 client/rpc/src/state/tests.rs     | 49 ++++++++++++++++++++++++++-----
 client/service/src/builder.rs     |  7 ++++-
 4 files changed, 64 insertions(+), 12 deletions(-)

diff --git a/client/rpc-api/src/state/error.rs b/client/rpc-api/src/state/error.rs
index 2fcca3c343..1c22788062 100644
--- a/client/rpc-api/src/state/error.rs
+++ b/client/rpc-api/src/state/error.rs
@@ -51,6 +51,8 @@ pub enum Error {
 		/// Maximum allowed value
 		max: u32,
 	},
+	/// Call to an unsafe RPC was denied.
+	UnsafeRpcCalled(crate::policy::UnsafeRpcError),
 }

 impl std::error::Error for Error {
diff --git a/client/rpc/src/state/mod.rs b/client/rpc/src/state/mod.rs
index 01c7c5f1eb..8573b3cf82 100644
--- a/client/rpc/src/state/mod.rs
+++ b/client/rpc/src/state/mod.rs
@@ -28,7 +28,7 @@ use std::sync::Arc;
 use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
 use rpc::{Result as RpcResult, futures::{Future, future::result}};

-use sc_rpc_api::state::ReadProof;
+use sc_rpc_api::{DenyUnsafe, state::ReadProof};
 use sc_client_api::light::{RemoteBlockchain, Fetcher};
 use sp_core::{Bytes, storage::{StorageKey, PrefixedStorageKey, StorageData, StorageChangeSet}};
 use sp_version::RuntimeVersion;
@@ -171,6 +171,7 @@ pub trait StateBackend<Block: BlockT, Client>: Send + Sync + 'static
 pub fn new_full<BE, Block: BlockT, Client>(
 	client: Arc<Client>,
 	subscriptions: SubscriptionManager,
+	deny_unsafe: DenyUnsafe,
 ) -> (State<Block, Client>, ChildState<Block, Client>)
 	where
 		Block: BlockT + 'static,
@@ -185,7 +186,7 @@ pub fn new_full<BE, Block: BlockT, Client>(
 		self::state_full::FullState::new(client.clone(), subscriptions.clone())
 	);
 	let backend = Box::new(self::state_full::FullState::new(client, subscriptions));
-	(State { backend }, ChildState { backend: child_backend })
+	(State { backend, deny_unsafe }, ChildState { backend: child_backend })
 }

 /// Create new state API that works on light node.
@@ -194,6 +195,7 @@ pub fn new_light<BE, Block: BlockT, Client, F: Fetcher<Block>>(
 	subscriptions: SubscriptionManager,
 	remote_blockchain: Arc<dyn RemoteBlockchain<Block>>,
 	fetcher: Arc<F>,
+	deny_unsafe: DenyUnsafe,
 ) -> (State<Block, Client>, ChildState<Block, Client>)
 	where
 		Block: BlockT + 'static,
@@ -217,12 +219,14 @@ pub fn new_light<BE, Block: BlockT, Client, F: Fetcher<Block>>(
 		remote_blockchain,
 		fetcher,
 	));
-	(State { backend }, ChildState { backend: child_backend })
+	(State { backend, deny_unsafe }, ChildState { backend: child_backend })
 }

 /// State API with subscriptions support.
pub struct State { backend: Box>, + /// Whether to deny unsafe calls + deny_unsafe: DenyUnsafe, } impl StateApi for State @@ -249,6 +253,10 @@ impl StateApi for State key_prefix: StorageKey, block: Option, ) -> FutureResult> { + if let Err(err) = self.deny_unsafe.check_if_safe() { + return Box::new(result(Err(err.into()))) + } + self.backend.storage_pairs(block, key_prefix) } @@ -292,6 +300,10 @@ impl StateApi for State from: Block::Hash, to: Option ) -> FutureResult>> { + if let Err(err) = self.deny_unsafe.check_if_safe() { + return Box::new(result(Err(err.into()))) + } + self.backend.query_storage(from, to, keys) } diff --git a/client/rpc/src/state/tests.rs b/client/rpc/src/state/tests.rs index b6677a1f2f..d145ac5e55 100644 --- a/client/rpc/src/state/tests.rs +++ b/client/rpc/src/state/tests.rs @@ -32,6 +32,7 @@ use substrate_test_runtime_client::{ sp_consensus::BlockOrigin, runtime, }; +use sc_rpc_api::DenyUnsafe; use sp_runtime::generic::BlockId; use crate::testing::TaskExecutor; use futures::{executor, compat::Future01CompatExt}; @@ -58,7 +59,11 @@ fn should_return_storage() { .add_extra_storage(b":map:acc2".to_vec(), vec![1, 2, 3]) .build(); let genesis_hash = client.genesis_hash(); - let (client, child) = new_full(Arc::new(client), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (client, child) = new_full( + Arc::new(client), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); let key = StorageKey(KEY.to_vec()); assert_eq!( @@ -96,7 +101,11 @@ fn should_return_child_storage() { .add_child_storage(&child_info, "key", vec![42_u8]) .build()); let genesis_hash = client.genesis_hash(); - let (_client, child) = new_full(client, SubscriptionManager::new(Arc::new(TaskExecutor))); + let (_client, child) = new_full( + client, + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); let child_key = prefixed_storage_key(); let key = StorageKey(b"key".to_vec()); @@ -131,7 +140,11 @@ fn should_return_child_storage() { fn should_call_contract() { let client = Arc::new(substrate_test_runtime_client::new()); let genesis_hash = client.genesis_hash(); - let (client, _child) = new_full(client, SubscriptionManager::new(Arc::new(TaskExecutor))); + let (client, _child) = new_full( + client, + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); assert_matches!( client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()).wait(), @@ -145,7 +158,11 @@ fn should_notify_about_storage_changes() { { let mut client = Arc::new(substrate_test_runtime_client::new()); - let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (api, _child) = new_full( + client.clone(), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); api.subscribe_storage(Default::default(), subscriber, None.into()); @@ -179,7 +196,11 @@ fn should_send_initial_storage_changes_and_notifications() { { let mut client = Arc::new(substrate_test_runtime_client::new()); - let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (api, _child) = new_full( + client.clone(), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); let alice_balance_key = blake2_256(&runtime::system::balance_of_key(AccountKeyring::Alice.into())); @@ -217,7 +238,11 @@ fn should_send_initial_storage_changes_and_notifications() { #[test] fn should_query_storage() { fn run_tests(mut client: Arc, has_changes_trie_config: bool) { - let (api, _child) = 
new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (api, _child) = new_full( + client.clone(), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); let mut add_block = |nonce| { let mut builder = client.new_block(Default::default()).unwrap(); @@ -434,7 +459,11 @@ fn should_split_ranges() { #[test] fn should_return_runtime_version() { let client = Arc::new(substrate_test_runtime_client::new()); - let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (api, _child) = new_full( + client.clone(), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); let result = "{\"specName\":\"test\",\"implName\":\"parity-test\",\"authoringVersion\":1,\ \"specVersion\":2,\"implVersion\":2,\"apis\":[[\"0xdf6acb689907609b\",3],\ @@ -457,7 +486,11 @@ fn should_notify_on_runtime_version_initially() { { let client = Arc::new(substrate_test_runtime_client::new()); - let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (api, _child) = new_full( + client.clone(), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); api.subscribe_runtime_version(Default::default(), subscriber); diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 410198af26..e0661f7f62 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -720,13 +720,18 @@ fn gen_handler( subscriptions.clone(), remote_blockchain.clone(), on_demand, + deny_unsafe, ); (chain, state, child_state) } else { // Full nodes let chain = sc_rpc::chain::new_full(client.clone(), subscriptions.clone()); - let (state, child_state) = sc_rpc::state::new_full(client.clone(), subscriptions.clone()); + let (state, child_state) = sc_rpc::state::new_full( + client.clone(), + subscriptions.clone(), + deny_unsafe, + ); (chain, state, child_state) }; From 9169f46c12de258490edc49cd4ce18402e1942fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= <123550+andresilva@users.noreply.github.com> Date: Fri, 16 Oct 2020 15:22:50 +0100 Subject: [PATCH 10/13] consensus: prioritize finality work over block import in queue (#7307) * consensus: prioritize finality work over block import in queue * consensus: add test for import queue task priority --- primitives/consensus/common/Cargo.toml | 1 + .../common/src/import_queue/basic_queue.rs | 337 +++++++++++++++--- 2 files changed, 281 insertions(+), 57 deletions(-) diff --git a/primitives/consensus/common/Cargo.toml b/primitives/consensus/common/Cargo.toml index a6f8c01928..e8eaa06ee0 100644 --- a/primitives/consensus/common/Cargo.toml +++ b/primitives/consensus/common/Cargo.toml @@ -36,6 +36,7 @@ prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../.. wasm-timer = "0.2.4" [dev-dependencies] +futures = "0.3.4" sp-test-primitives = { version = "2.0.0", path = "../../test-primitives" } [features] diff --git a/primitives/consensus/common/src/import_queue/basic_queue.rs b/primitives/consensus/common/src/import_queue/basic_queue.rs index e59f7ab5b6..6c1c820072 100644 --- a/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -36,8 +36,10 @@ use crate::{ /// Interface to a basic block import queue that is importing blocks sequentially in a separate /// task, with plugable verification. pub struct BasicQueue { - /// Channel to send messages to the background task. 
-	sender: TracingUnboundedSender<ToWorkerMsg<B>>,
+	/// Channel to send finality work messages to the background task.
+	finality_sender: TracingUnboundedSender<worker_messages::Finality<B>>,
+	/// Channel to send block import messages to the background task.
+	block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
 	/// Results coming from the worker task.
 	result_port: BufferedLinkReceiver<B>,
 	_phantom: PhantomData<Transaction>,
 }
@@ -46,7 +48,8 @@ pub struct BasicQueue<B: BlockT, Transaction> {
 impl<B: BlockT, Transaction> Drop for BasicQueue<B, Transaction> {
 	fn drop(&mut self) {
 		// Flush the queue and close the receiver to terminate the future.
-		self.sender.close_channel();
+		self.finality_sender.close_channel();
+		self.block_import_sender.close_channel();
 		self.result_port.close();
 	}
 }
@@ -65,12 +68,16 @@ impl<B: BlockT, Transaction> BasicQueue<B, Transaction> {
 		prometheus_registry: Option<&Registry>,
 	) -> Self {
 		let (result_sender, result_port) = buffered_link::buffered_link();
-		let metrics = prometheus_registry.and_then(|r|
+
+		let metrics = prometheus_registry.and_then(|r| {
 			Metrics::register(r)
-				.map_err(|err| { log::warn!("Failed to register Prometheus metrics: {}", err); })
-				.ok()
-		);
-		let (future, worker_sender) = BlockImportWorker::new(
+				.map_err(|err| {
+					log::warn!("Failed to register Prometheus metrics: {}", err);
+				})
+				.ok()
+		});
+
+		let (future, finality_sender, block_import_sender) = BlockImportWorker::new(
 			result_sender,
 			verifier,
 			block_import,
@@ -82,7 +89,8 @@
 		spawner.spawn_blocking("basic-block-import-worker", future.boxed());
 
 		Self {
-			sender: worker_sender,
+			finality_sender,
+			block_import_sender,
 			result_port,
 			_phantom: PhantomData,
 		}
@@ -96,7 +104,9 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
 		}
 
 		trace!(target: "sync", "Scheduling {} blocks for import", blocks.len());
-		let res = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks));
+		let res =
+			self.block_import_sender.unbounded_send(worker_messages::ImportBlocks(origin, blocks));
+
 		if res.is_err() {
 			log::error!(
 				target: "sync",
@@ -110,12 +120,12 @@
 		who: Origin,
 		hash: B::Hash,
 		number: NumberFor<B>,
-		justification: Justification
+		justification: Justification,
 	) {
-		let res = self.sender
-			.unbounded_send(
-				ToWorkerMsg::ImportJustification(who, hash, number, justification)
-			);
+		let res = self.finality_sender.unbounded_send(
+			worker_messages::Finality::ImportJustification(who, hash, number, justification),
+		);
+
 		if res.is_err() {
 			log::error!(
 				target: "sync",
@@ -132,10 +142,10 @@
 		finality_proof: Vec<u8>,
 	) {
 		trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash);
-		let res = self.sender
-			.unbounded_send(
-				ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)
-			);
+		let res = self.finality_sender.unbounded_send(
+			worker_messages::Finality::ImportFinalityProof(who, hash, number, finality_proof),
+		);
+
 		if res.is_err() {
 			log::error!(
 				target: "sync",
@@ -151,12 +161,16 @@
 		}
 	}
 }
 
-/// Message destined to the background worker.
-#[derive(Debug)]
-enum ToWorkerMsg<B: BlockT> {
-	ImportBlocks(BlockOrigin, Vec<IncomingBlock<B>>),
-	ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
-	ImportFinalityProof(Origin, B::Hash, NumberFor<B>, Vec<u8>),
+/// Messages destined for the background worker.
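+///
+/// Using two dedicated channels (instead of the single `ToWorkerMsg` channel
+/// this replaces) is what lets the worker drain every pending finality
+/// message before it polls the block import channel at all, giving finality
+/// work strict priority.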
+mod worker_messages { + use super::*; + + pub struct ImportBlocks(pub BlockOrigin, pub Vec>); + + pub enum Finality { + ImportJustification(Origin, B::Hash, NumberFor, Justification), + ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), + } } struct BlockImportWorker { @@ -176,8 +190,18 @@ impl BlockImportWorker { justification_import: Option>, finality_proof_import: Option>, metrics: Option, - ) -> (impl Future + Send, TracingUnboundedSender>) { - let (sender, mut port) = tracing_unbounded("mpsc_block_import_worker"); + ) -> ( + impl Future + Send, + TracingUnboundedSender>, + TracingUnboundedSender>, + ) { + use worker_messages::*; + + let (finality_sender, mut finality_port) = + tracing_unbounded("mpsc_import_queue_worker_finality"); + + let (block_import_sender, mut block_import_port) = + tracing_unbounded("mpsc_import_queue_worker_blocks"); let mut worker = BlockImportWorker { result_sender, @@ -206,6 +230,8 @@ impl BlockImportWorker { // `Future`, and `block_import` is `None`. // - Something else, in which case `block_import` is `Some` and `importing` is None. // + // Additionally, the task will prioritize processing of finality work messages over + // block import messages, hence why two distinct channels are used. let mut block_import_verifier = Some((block_import, verifier)); let mut importing = None; @@ -217,7 +243,30 @@ impl BlockImportWorker { return Poll::Ready(()) } - // If we are in the process of importing a bunch of block, let's resume this + // Grab the next finality action request sent to the import queue. + let finality_work = match Stream::poll_next(Pin::new(&mut finality_port), cx) { + Poll::Ready(Some(msg)) => Some(msg), + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => None, + }; + + match finality_work { + Some(Finality::ImportFinalityProof(who, hash, number, proof)) => { + let (_, verif) = block_import_verifier + .as_mut() + .expect("block_import_verifier is always Some; qed"); + + worker.import_finality_proof(verif, who, hash, number, proof); + continue; + } + Some(Finality::ImportJustification(who, hash, number, justification)) => { + worker.import_justification(who, hash, number, justification); + continue; + } + None => {} + } + + // If we are in the process of importing a bunch of blocks, let's resume this // process before doing anything more. if let Some(imp_fut) = importing.as_mut() { match Future::poll(Pin::new(imp_fut), cx) { @@ -232,34 +281,25 @@ impl BlockImportWorker { debug_assert!(importing.is_none()); debug_assert!(block_import_verifier.is_some()); - // Grab the next action request sent to the import queue. - let msg = match Stream::poll_next(Pin::new(&mut port), cx) { - Poll::Ready(Some(msg)) => msg, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => return Poll::Pending, - }; + // Grab the next block import request sent to the import queue. + let ImportBlocks(origin, blocks) = + match Stream::poll_next(Pin::new(&mut block_import_port), cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + }; - match msg { - ToWorkerMsg::ImportBlocks(origin, blocks) => { - // On blocks import request, we merely *start* the process and store - // a `Future` into `importing`. 
- let (bi, verif) = block_import_verifier.take() - .expect("block_import_verifier is always Some; qed"); - importing = Some(worker.import_batch(bi, verif, origin, blocks)); - }, - ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { - let (_, verif) = block_import_verifier.as_mut() - .expect("block_import_verifier is always Some; qed"); - worker.import_finality_proof(verif, who, hash, number, proof); - }, - ToWorkerMsg::ImportJustification(who, hash, number, justification) => { - worker.import_justification(who, hash, number, justification); - } - } + // On blocks import request, we merely *start* the process and store + // a `Future` into `importing`. + let (block_import, verifier) = block_import_verifier + .take() + .expect("block_import_verifier is always Some; qed"); + + importing = Some(worker.import_batch(block_import, verifier, origin, blocks)); } }); - (future, sender) + (future, finality_sender, block_import_sender) } /// Returns a `Future` that imports the given blocks and sends the results on @@ -353,12 +393,11 @@ fn import_many_blocks, Transaction>( Output = ( usize, usize, - Vec<(Result>, BlockImportError>, B::Hash,)>, + Vec<(Result>, BlockImportError>, B::Hash)>, BoxBlockImport, - V - ) -> -{ + V, + ), +> { let count = blocks.len(); let blocks_range = match ( @@ -451,3 +490,187 @@ fn import_many_blocks, Transaction>( Poll::Pending }) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + import_queue::{CacheKeyId, Verifier}, + BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport, + }; + use futures::{executor::block_on, Future}; + use sp_test_primitives::{Block, BlockNumber, Extrinsic, Hash, Header}; + use std::collections::HashMap; + + impl Verifier for () { + fn verify( + &mut self, + origin: BlockOrigin, + header: Header, + _justification: Option, + _body: Option>, + ) -> Result<(BlockImportParams, Option)>>), String> { + Ok((BlockImportParams::new(origin, header), None)) + } + } + + impl BlockImport for () { + type Error = crate::Error; + type Transaction = Extrinsic; + + fn check_block( + &mut self, + _block: BlockCheckParams, + ) -> Result { + Ok(ImportResult::imported(false)) + } + + fn import_block( + &mut self, + _block: BlockImportParams, + _cache: HashMap>, + ) -> Result { + Ok(ImportResult::imported(true)) + } + } + + impl JustificationImport for () { + type Error = crate::Error; + + fn import_justification( + &mut self, + _hash: Hash, + _number: BlockNumber, + _justification: Justification, + ) -> Result<(), Self::Error> { + Ok(()) + } + } + + #[derive(Debug, PartialEq)] + enum Event { + JustificationImported(Hash), + BlockImported(Hash), + } + + #[derive(Default)] + struct TestLink { + events: Vec, + } + + impl Link for TestLink { + fn blocks_processed( + &mut self, + _imported: usize, + _count: usize, + results: Vec<(Result, BlockImportError>, Hash)>, + ) { + if let Some(hash) = results.into_iter().find_map(|(r, h)| r.ok().map(|_| h)) { + self.events.push(Event::BlockImported(hash)); + } + } + + fn justification_imported( + &mut self, + _who: Origin, + hash: &Hash, + _number: BlockNumber, + _success: bool, + ) { + self.events.push(Event::JustificationImported(hash.clone())) + } + } + + #[test] + fn prioritizes_finality_work_over_block_import() { + let (result_sender, mut result_port) = buffered_link::buffered_link(); + + let (mut worker, mut finality_sender, mut block_import_sender) = + BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None, None); + + let mut import_block = |n| { + 
let header = Header { + parent_hash: Hash::random(), + number: n, + extrinsics_root: Hash::random(), + state_root: Default::default(), + digest: Default::default(), + }; + + let hash = header.hash(); + + block_on(block_import_sender.send(worker_messages::ImportBlocks( + BlockOrigin::Own, + vec![IncomingBlock { + hash, + header: Some(header), + body: None, + justification: None, + origin: None, + allow_missing_state: false, + import_existing: false, + }], + ))) + .unwrap(); + + hash + }; + + let mut import_justification = || { + let hash = Hash::random(); + + block_on(finality_sender.send(worker_messages::Finality::ImportJustification( + libp2p::PeerId::random(), + hash, + 1, + Vec::new(), + ))) + .unwrap(); + + hash + }; + + let mut link = TestLink::default(); + + // we send a bunch of tasks to the worker + let block1 = import_block(1); + let block2 = import_block(2); + let block3 = import_block(3); + let justification1 = import_justification(); + let justification2 = import_justification(); + let block4 = import_block(4); + let block5 = import_block(5); + let block6 = import_block(6); + let justification3 = import_justification(); + + // we poll the worker until we have processed 9 events + block_on(futures::future::poll_fn(|cx| { + while link.events.len() < 9 { + match Future::poll(Pin::new(&mut worker), cx) { + Poll::Pending => {} + Poll::Ready(()) => panic!("import queue worker should not conclude."), + } + + result_port.poll_actions(cx, &mut link).unwrap(); + } + + Poll::Ready(()) + })); + + // all justification tasks must be done before any block import work + assert_eq!( + link.events, + vec![ + Event::JustificationImported(justification1), + Event::JustificationImported(justification2), + Event::JustificationImported(justification3), + Event::BlockImported(block1), + Event::BlockImported(block2), + Event::BlockImported(block3), + Event::BlockImported(block4), + Event::BlockImported(block5), + Event::BlockImported(block6), + ] + ); + } +} From eee68d19e23f1933dd8e46639a684beaa239f3ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= <123550+andresilva@users.noreply.github.com> Date: Fri, 16 Oct 2020 11:15:56 +0100 Subject: [PATCH 11/13] sync: only restart peers not doing finality related requests (#7322) * sync: only restart peers not doing finality related requests * sync: add test for sync restart * sync: add better docs to restart method --- client/network/src/protocol/sync.rs | 131 ++++++++++++++++++++++++++-- 1 file changed, 122 insertions(+), 9 deletions(-) diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index bfd8c4fe21..303f40c582 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -1270,8 +1270,13 @@ impl ChainSync { self.pending_requests.set_all(); } - /// Restart the sync process. - fn restart<'a>(&'a mut self) -> impl Iterator), BadPeer>> + 'a { + /// Restart the sync process. This will reset all pending block requests and return an iterator + /// of new block requests to make to peers. Peers that were downloading finality data (i.e. + /// their state was `DownloadingJustification` or `DownloadingFinalityProof`) are unaffected and + /// will stay in the same state. 
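+	///
+	/// For example, a peer in `DownloadingJustification(hash)` keeps both its
+	/// state and its in-flight request across the restart, while a peer that
+	/// was merely available is re-registered through `new_peer` and may be
+	/// handed a fresh block request by the returned iterator.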
+	fn restart<'a>(
+		&'a mut self,
+	) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
 		self.blocks.clear();
 		let info = self.client.info();
 		self.best_queued_hash = info.best_hash;
@@ -1279,11 +1284,24 @@ impl<B: BlockT> ChainSync<B> {
 		self.pending_requests.set_all();
 		debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
 		let old_peers = std::mem::take(&mut self.peers);
+
 		old_peers.into_iter().filter_map(move |(id, p)| {
+			// peers that were downloading justifications or finality proofs
+			// should be kept in that state.
+			match p.state {
+				PeerSyncState::DownloadingJustification(_)
+				| PeerSyncState::DownloadingFinalityProof(_) => {
+					self.peers.insert(id, p);
+					return None;
+				}
+				_ => {}
+			}
+
+			// handle peers that were in other states.
 			match self.new_peer(id.clone(), p.best_hash, p.best_number) {
 				Ok(None) => None,
 				Ok(Some(x)) => Some(Ok((id, x))),
-				Err(e) => Some(Err(e))
+				Err(e) => Some(Err(e)),
 			}
 		})
 	}
@@ -1552,15 +1570,15 @@ fn validate_blocks<Block: BlockT>(blocks: &Vec<message::BlockData<Block>>, who:
 #[cfg(test)]
 mod test {
-	use super::*;
 	use super::message::FromBlock;
-	use substrate_test_runtime_client::{
-		runtime::Block,
-		DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
-	};
-	use sp_blockchain::HeaderBackend;
+	use super::*;
 	use sc_block_builder::BlockBuilderProvider;
+	use sp_blockchain::HeaderBackend;
 	use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
+	use substrate_test_runtime_client::{
+		runtime::{Block, Hash},
+		ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
+	};
 
 	#[test]
 	fn processes_empty_response_on_justification_request_for_unknown_block() {
@@ -1639,4 +1657,99 @@
 			})
 		);
 	}
+
+	#[test]
+	fn restart_doesnt_affect_peers_downloading_finality_data() {
+		let mut client = Arc::new(TestClientBuilder::new().build());
+		let info = client.info();
+
+		let mut sync = ChainSync::new(
+			Roles::AUTHORITY,
+			client.clone(),
+			&info,
+			None,
+			Box::new(DefaultBlockAnnounceValidator),
+			1,
+		);
+
+		let peer_id1 = PeerId::random();
+		let peer_id2 = PeerId::random();
+		let peer_id3 = PeerId::random();
+		let peer_id4 = PeerId::random();
+
+		let mut new_blocks = |n| {
+			for _ in 0..n {
+				let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
+				client.import(BlockOrigin::Own, block.clone()).unwrap();
+			}
+
+			let info = client.info();
+			(info.best_hash, info.best_number)
+		};
+
+		let (b1_hash, b1_number) = new_blocks(50);
+		let (b2_hash, b2_number) = new_blocks(10);
+
+		// add 2 peers at blocks that we don't have locally
+		sync.new_peer(peer_id1.clone(), Hash::random(), 42).unwrap();
+		sync.new_peer(peer_id2.clone(), Hash::random(), 10).unwrap();
+
+		// we will send block requests to these peers
+		// for these blocks we don't know about
+		assert!(sync.block_requests().all(|(p, _)| { *p == peer_id1 || *p == peer_id2 }));
+
+		// add a new peer at a known block
+		sync.new_peer(peer_id3.clone(), b1_hash, b1_number).unwrap();
+
+		// we request a justification for a block we have locally
+		sync.request_justification(&b1_hash, b1_number);
+
+		// the justification request should be scheduled to the
+		// new peer which is at the given block
+		assert!(sync.justification_requests().any(|(p, r)| {
+			p == peer_id3
+				&& r.fields == BlockAttributes::JUSTIFICATION
+				&& r.from == message::FromBlock::Hash(b1_hash)
+				&& r.to == None
+		}));
+
+		assert_eq!(
+			sync.peers.get(&peer_id3).unwrap().state,
+			PeerSyncState::DownloadingJustification(b1_hash),
+		);
+
+		// add another peer at a known later block
sync.new_peer(peer_id4.clone(), b2_hash, b2_number).unwrap(); + + // we request a finality proof for a block we have locally + sync.request_finality_proof(&b2_hash, b2_number); + + // the finality proof request should be scheduled to peer 4 + // which is at that block + assert!( + sync.finality_proof_requests().any(|(p, r)| { p == peer_id4 && r.block == b2_hash }) + ); + + assert_eq!( + sync.peers.get(&peer_id4).unwrap().state, + PeerSyncState::DownloadingFinalityProof(b2_hash), + ); + + // we restart the sync state + let block_requests = sync.restart(); + + // which should make us send out block requests to the first two peers + assert!(block_requests.map(|r| r.unwrap()).all(|(p, _)| { p == peer_id1 || p == peer_id2 })); + + // peer 3 and 4 should be unaffected as they were downloading finality data + assert_eq!( + sync.peers.get(&peer_id3).unwrap().state, + PeerSyncState::DownloadingJustification(b1_hash), + ); + + assert_eq!( + sync.peers.get(&peer_id4).unwrap().state, + PeerSyncState::DownloadingFinalityProof(b2_hash), + ); + } } From 9857ac1e1275b8f0002b46d643965342c4687287 Mon Sep 17 00:00:00 2001 From: Jordan Beauchamp Date: Tue, 10 Nov 2020 15:53:08 +1300 Subject: [PATCH 12/13] Undo phragmen merge --- frame/elections-phragmen/src/lib.rs | 94 +++++++++-------------------- 1 file changed, 29 insertions(+), 65 deletions(-) diff --git a/frame/elections-phragmen/src/lib.rs b/frame/elections-phragmen/src/lib.rs index 7891853530..dd816033ae 100644 --- a/frame/elections-phragmen/src/lib.rs +++ b/frame/elections-phragmen/src/lib.rs @@ -891,18 +891,16 @@ impl Module { num_to_elect, 0, candidates, - voters_and_votes.clone(), - None, - ).map(|ElectionResult { winners, assignments: _ }| { - // this is already sorted by id. - let old_members_ids_sorted = >::take().into_iter() + voters_and_votes, + ); + + if let Some(ElectionResult { winners, assignments }) = maybe_phragmen_result { + let old_members_ids = >::take().into_iter() .map(|(m, _)| m) .collect::>(); - // this one needs a sort by id. - let mut old_runners_up_ids_sorted = >::take().into_iter() + let old_runners_up_ids = >::take().into_iter() .map(|(r, _)| r) .collect::>(); - old_runners_up_ids_sorted.sort(); // filter out those who had literally no votes at all. // NOTE: the need to do this is because all candidates, even those who have no @@ -941,17 +939,17 @@ impl Module { // split new set into winners and runners up. let split_point = desired_seats.min(new_set_with_stake.len()); - let mut new_members_sorted_by_id = (&new_set_with_stake[..split_point]).to_vec(); + let mut new_members = (&new_set_with_stake[..split_point]).to_vec(); // save the runners up as-is. They are sorted based on desirability. // save the members, sorted based on account id. - new_members_sorted_by_id.sort_by(|i, j| i.0.cmp(&j.0)); + new_members.sort_by(|i, j| i.0.cmp(&j.0)); // Now we select a prime member using a [Borda count](https://en.wikipedia.org/wiki/Borda_count). // We weigh everyone's vote for that new member by a multiplier based on the order // of the votes. i.e. the first person a voter votes for gets a 16x multiplier, // the next person gets a 15x multiplier, an so on... 
(assuming `MAXIMUM_VOTE` = 16) - let mut prime_votes: Vec<_> = new_members_sorted_by_id.iter().map(|c| (&c.0, BalanceOf::::zero())).collect(); + let mut prime_votes: Vec<_> = new_members.iter().map(|c| (&c.0, BalanceOf::::zero())).collect(); for (_, stake, votes) in voters_and_stakes.into_iter() { for (vote_multiplier, who) in votes.iter() .enumerate() @@ -969,58 +967,54 @@ impl Module { // the person with the "highest" account id based on the sort above. let prime = prime_votes.into_iter().max_by_key(|x| x.1).map(|x| x.0.clone()); - // new_members_sorted_by_id is sorted by account id. - let new_members_ids_sorted = new_members_sorted_by_id + // new_members_ids is sorted by account id. + let new_members_ids = new_members .iter() .map(|(m, _)| m.clone()) .collect::>(); - let new_runners_up_sorted_by_rank = &new_set_with_stake[split_point..] + let new_runners_up = &new_set_with_stake[split_point..] .into_iter() .cloned() .rev() .collect::)>>(); // new_runners_up remains sorted by desirability. - let mut new_runners_up_ids_sorted = new_runners_up_sorted_by_rank + let new_runners_up_ids = new_runners_up .iter() .map(|(r, _)| r.clone()) .collect::>(); - new_runners_up_ids_sorted.sort(); // report member changes. We compute diff because we need the outgoing list. let (incoming, outgoing) = T::ChangeMembers::compute_members_diff( - &new_members_ids_sorted, - &old_members_ids_sorted, + &new_members_ids, + &old_members_ids, ); T::ChangeMembers::change_members_sorted( &incoming, &outgoing, - &new_members_ids_sorted, + &new_members_ids, ); T::ChangeMembers::set_prime(prime); - // outgoing members lose their bond. + // outgoing candidates lose their bond. let mut to_burn_bond = outgoing.to_vec(); // compute the outgoing of runners up as well and append them to the `to_burn_bond` { let (_, outgoing) = T::ChangeMembers::compute_members_diff( - &new_runners_up_ids_sorted, - &old_runners_up_ids_sorted, + &new_runners_up_ids, + &old_runners_up_ids, ); - // none of the ones computed to be outgoing must still be in the list. - debug_assert!(outgoing.iter().all(|o| !new_runners_up_ids_sorted.contains(o))); to_burn_bond.extend(outgoing); } // Burn loser bond. members list is sorted. O(NLogM) (N candidates, M members) - // runner up list is also sorted. O(NLogK) given K runner ups. Overall: O(NLogM + N*K) + // runner up list is not sorted. O(K*N) given K runner ups. Overall: O(NLogM + N*K) // both the member and runner counts are bounded. exposed_candidates.into_iter().for_each(|c| { // any candidate who is not a member and not a runner up. - if - new_members_ids_sorted.binary_search(&c).is_err() && - new_runners_up_ids_sorted.binary_search(&c).is_err() + if new_members.binary_search_by_key(&c, |(m, _)| m.clone()).is_err() + && !new_runners_up_ids.contains(&c) { let (imbalance, _) = T::Currency::slash_reserved(&c, T::CandidacyBond::get()); T::LoserCandidate::on_unbalanced(imbalance); @@ -1033,10 +1027,13 @@ impl Module { T::LoserCandidate::on_unbalanced(imbalance); }); - >::put(&new_members_sorted_by_id); - >::put(new_runners_up_sorted_by_rank); + >::put(&new_members); + >::put(new_runners_up); - Self::deposit_event(RawEvent::NewTerm(new_members_sorted_by_id.clone().to_vec())); + Self::deposit_event(RawEvent::NewTerm(new_members.clone().to_vec())); + } else { + Self::deposit_event(RawEvent::EmptyTerm); + } // clean candidates. 
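The Borda-count prime selection described in the comment above is compact enough to sketch on its own. The following is an illustration with simplified types (`u64` stakes, a free function instead of the pallet's internals), not the pallet's actual implementation:

    // Illustrative Borda count for prime selection (not pallet code): each
    // voter adds `stake * (MAXIMUM_VOTE - position)` to every member they
    // voted for; the member with the largest weighted total becomes prime.
    const MAXIMUM_VOTE: usize = 16;

    fn elect_prime<AccountId: Eq + Clone>(
        members: &[AccountId],
        voters: &[(u64, Vec<AccountId>)], // (stake, votes in order of preference)
    ) -> Option<AccountId> {
        let mut totals: Vec<(AccountId, u64)> =
            members.iter().map(|m| (m.clone(), 0)).collect();
        for (stake, votes) in voters {
            for (pos, who) in votes.iter().enumerate().take(MAXIMUM_VOTE) {
                // only votes for actual members count towards the prime
                if let Some(entry) = totals.iter_mut().find(|(m, _)| *m == *who) {
                    entry.1 += stake * (MAXIMUM_VOTE - pos) as u64;
                }
            }
        }
        totals.into_iter().max_by_key(|(_, total)| *total).map(|(m, _)| m)
    }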
>::kill(); @@ -1300,6 +1297,7 @@ mod tests { self.genesis_members = members; self } + #[cfg(feature = "runtime-benchmarks")] pub fn desired_members(mut self, count: u32) -> Self { self.desired_members = count; self @@ -2820,38 +2818,4 @@ mod tests { assert!(Elections::candidates().is_empty()); }) } - - #[test] - fn unsorted_runners_up_are_detected() { - ExtBuilder::default().desired_runners_up(2).desired_members(1).build_and_execute(|| { - assert_ok!(submit_candidacy(Origin::signed(5))); - assert_ok!(submit_candidacy(Origin::signed(4))); - assert_ok!(submit_candidacy(Origin::signed(3))); - - - assert_ok!(vote(Origin::signed(5), vec![5], 50)); - assert_ok!(vote(Origin::signed(4), vec![4], 5)); - assert_ok!(vote(Origin::signed(3), vec![3], 15)); - - System::set_block_number(5); - Elections::end_block(System::block_number()); - - assert_eq!(Elections::members_ids(), vec![5]); - assert_eq!(Elections::runners_up_ids(), vec![4, 3]); - - assert_ok!(submit_candidacy(Origin::signed(2))); - assert_ok!(vote(Origin::signed(2), vec![2], 10)); - - System::set_block_number(10); - Elections::end_block(System::block_number()); - - assert_eq!(Elections::members_ids(), vec![5]); - assert_eq!(Elections::runners_up_ids(), vec![2, 3]); - - // 4 is outgoing runner-up. Slash candidacy bond. - assert_eq!(balances(&4), (35, 2)); - // 3 stays. - assert_eq!(balances(&3), (25, 5)); - }) - } } From 30932b2bf5f28a7cfd56905d672656c2421274e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= <123550+andresilva@users.noreply.github.com> Date: Mon, 26 Oct 2020 12:29:36 +0000 Subject: [PATCH 13/13] grandpa: fix early enactment of forced changes (#7321) * grandpa: fix early enactment of forced authority set changes * grandpa: add test for early enactment of forced changes * grandpa: fix typo in log message * grandpa: only allow one pending forced change per fork * grandpa: fix tests --- client/finality-grandpa/src/authorities.rs | 192 ++++++++++++++------- 1 file changed, 132 insertions(+), 60 deletions(-) diff --git a/client/finality-grandpa/src/authorities.rs b/client/finality-grandpa/src/authorities.rs index b57e0347fb..f263fcc101 100644 --- a/client/finality-grandpa/src/authorities.rs +++ b/client/finality-grandpa/src/authorities.rs @@ -32,14 +32,42 @@ use std::ops::Add; use std::sync::Arc; /// Error type returned on operations on the `AuthoritySet`. 
-#[derive(Debug, derive_more::Display, derive_more::From)]
-pub enum Error<E> {
-	#[display("Invalid authority set, either empty or with an authority weight set to 0.")]
+#[derive(Debug, derive_more::Display)]
+pub enum Error<N, E> {
+	#[display(fmt = "Invalid authority set, either empty or with an authority weight set to 0.")]
 	InvalidAuthoritySet,
+	#[display(fmt = "Client error during ancestry lookup: {}", _0)]
+	Client(E),
+	#[display(fmt = "Duplicate authority set change.")]
+	DuplicateAuthoritySetChange,
+	#[display(fmt = "Multiple pending forced authority set changes are not allowed.")]
+	MultiplePendingForcedAuthoritySetChanges,
+	#[display(
+		fmt = "A pending forced authority set change could not be applied since it must be applied after \
+		 the pending standard change at #{}",
+		_0
+	)]
+	ForcedAuthoritySetChangeDependencyUnsatisfied(N),
 	#[display(fmt = "Invalid operation in the pending changes tree: {}", _0)]
 	ForkTree(fork_tree::Error<E>),
 }
 
+impl<N, E: std::error::Error> From<fork_tree::Error<E>> for Error<N, E> {
+	fn from(err: fork_tree::Error<E>) -> Error<N, E> {
+		match err {
+			fork_tree::Error::Client(err) => Error::Client(err),
+			fork_tree::Error::Duplicate => Error::DuplicateAuthoritySetChange,
+			err => Error::ForkTree(err),
+		}
+	}
+}
+
+impl<N, E: std::error::Error> From<E> for Error<N, E> {
+	fn from(err: E) -> Error<N, E> {
+		Error::Client(err)
+	}
+}
+
 /// A shared authority set.
 pub struct SharedAuthoritySet<H, N> {
 	inner: Arc<RwLock<AuthoritySet<H, N>>>,
@@ -111,14 +139,20 @@ pub(crate) struct AuthoritySet<H, N> {
 	/// a given branch
 	pub(crate) pending_standard_changes: ForkTree<H, N, PendingChange<H, N>>,
 	/// Pending forced changes across different forks (at most one per fork).
-	/// Forced changes are enacted on block depth (not finality), for this reason
-	/// only one forced change should exist per fork.
+	/// Forced changes are enacted on block depth (not finality), for this
+	/// reason only one forced change should exist per fork. When trying to
+	/// apply forced changes we keep track of any pending standard changes that
+	/// they may depend on, this is done by making sure that any pending change
+	/// that is an ancestor of the forced change and whose effective block number
+	/// is lower than the last finalized block (as signaled in the forced
+	/// change) must be applied beforehand.
 	pending_forced_changes: Vec<PendingChange<H, N>>,
 }
 
 impl<H, N> AuthoritySet<H, N>
-where H: PartialEq,
-	N: Ord,
+where
+	H: PartialEq,
+	N: Ord,
 {
 	// authority sets must be non-empty and all weights must be greater than 0
 	fn invalid_authority_list(authorities: &AuthorityList) -> bool {
@@ -180,7 +214,7 @@ where
 		&self,
 		best_hash: &H,
 		is_descendent_of: &F,
-	) -> Result<Option<(H, N)>, fork_tree::Error<E>>
+	) -> Result<Option<(H, N)>, Error<N, E>>
 	where
 		F: Fn(&H, &H) -> Result<bool, E>,
 		E: std::error::Error,
@@ -219,7 +253,8 @@ where
 		&mut self,
 		pending: PendingChange<H, N>,
 		is_descendent_of: &F,
-	) -> Result<(), Error<E>> where
+	) -> Result<(), Error<N, E>>
+	where
 		F: Fn(&H, &H) -> Result<bool, E>,
 		E: std::error::Error,
 	{
@@ -250,16 +285,18 @@ where
 		&mut self,
 		pending: PendingChange<H, N>,
 		is_descendent_of: &F,
-	) -> Result<(), Error<E>> where
+	) -> Result<(), Error<N, E>>
+	where
 		F: Fn(&H, &H) -> Result<bool, E>,
 		E: std::error::Error,
 	{
-		for change in self.pending_forced_changes.iter() {
-			if change.canon_hash == pending.canon_hash ||
-				is_descendent_of(&change.canon_hash, &pending.canon_hash)
-					.map_err(fork_tree::Error::Client)?
-			{
-				return Err(fork_tree::Error::UnfinalizedAncestor.into());
+		for change in &self.pending_forced_changes {
+			if change.canon_hash == pending.canon_hash {
+				return Err(Error::DuplicateAuthoritySetChange);
+			}
+
+			if is_descendent_of(&change.canon_hash, &pending.canon_hash)?
{ + return Err(Error::MultiplePendingForcedAuthoritySetChanges); } } @@ -293,7 +330,8 @@ where &mut self, pending: PendingChange, is_descendent_of: &F, - ) -> Result<(), Error> where + ) -> Result<(), Error> + where F: Fn(&H, &H) -> Result, E: std::error::Error, { @@ -341,52 +379,92 @@ where /// /// These transitions are always forced and do not lead to justifications /// which light clients can follow. + /// + /// Forced changes can only be applied after all pending standard changes + /// that it depends on have been applied. If any pending standard change + /// exists that is an ancestor of a given forced changed and which effective + /// block number is lower than the last finalized block (as defined by the + /// forced change), then the forced change cannot be applied. An error will + /// be returned in that case which will prevent block import. pub(crate) fn apply_forced_changes( &self, best_hash: H, best_number: N, is_descendent_of: &F, initial_sync: bool, - ) -> Result, E> - where F: Fn(&H, &H) -> Result, + ) -> Result, Error> + where + F: Fn(&H, &H) -> Result, + E: std::error::Error, { let mut new_set = None; - for change in self.pending_forced_changes.iter() + for change in self + .pending_forced_changes + .iter() .take_while(|c| c.effective_number() <= best_number) // to prevent iterating too far .filter(|c| c.effective_number() == best_number) { // check if the given best block is in the same branch as // the block that signaled the change. if change.canon_hash == best_hash || is_descendent_of(&change.canon_hash, &best_hash)? { + let median_last_finalized = match change.delay_kind { + DelayKind::Best { + ref median_last_finalized, + } => median_last_finalized.clone(), + _ => unreachable!( + "pending_forced_changes only contains forced changes; forced changes have delay kind Best; qed." + ), + }; + + // check if there's any pending standard change that we depend on + for (_, _, standard_change) in self.pending_standard_changes.roots() { + if standard_change.effective_number() <= median_last_finalized + && is_descendent_of(&standard_change.canon_hash, &change.canon_hash)? + { + log::info!(target: "afg", + "Not applying authority set change forced at block #{:?}, due to pending standard change at block #{:?}", + change.canon_height, + standard_change.effective_number(), + ); + + return Err( + Error::ForcedAuthoritySetChangeDependencyUnsatisfied( + standard_change.effective_number() + ) + ); + } + } + // apply this change: make the set canonical - afg_log!(initial_sync, + afg_log!( + initial_sync, "👴 Applying authority set change forced at block #{:?}", change.canon_height, ); - telemetry!(CONSENSUS_INFO; "afg.applying_forced_authority_set_change"; + + telemetry!( + CONSENSUS_INFO; + "afg.applying_forced_authority_set_change"; "block" => ?change.canon_height ); - let median_last_finalized = match change.delay_kind { - DelayKind::Best { ref median_last_finalized } => median_last_finalized.clone(), - _ => unreachable!("pending_forced_changes only contains forced changes; forced changes have delay kind Best; qed."), - }; - - new_set = Some((median_last_finalized, AuthoritySet { - current_authorities: change.next_authorities.clone(), - set_id: self.set_id + 1, - pending_standard_changes: ForkTree::new(), // new set, new changes. - pending_forced_changes: Vec::new(), - })); + new_set = Some(( + median_last_finalized, + AuthoritySet { + current_authorities: change.next_authorities.clone(), + set_id: self.set_id + 1, + pending_standard_changes: ForkTree::new(), // new set, new changes. 
+ pending_forced_changes: Vec::new(), + }, + )); break; } - - // we don't wipe forced changes until another change is - // applied } + // we don't wipe forced changes until another change is applied, hence + // why we return a new set instead of mutating. Ok(new_set) } @@ -406,7 +484,8 @@ where finalized_number: N, is_descendent_of: &F, initial_sync: bool, - ) -> Result, Error> where + ) -> Result, Error> + where F: Fn(&H, &H) -> Result, E: std::error::Error, { @@ -429,12 +508,11 @@ where Vec::new(), ); - // we will keep all forced change for any later blocks and that are a - // descendent of the finalized block (i.e. they are from this fork). + // we will keep all forced changes for any later blocks and that are a + // descendent of the finalized block (i.e. they are part of this branch). for change in pending_forced_changes { if change.effective_number() > finalized_number && - is_descendent_of(&finalized_hash, &change.canon_hash) - .map_err(fork_tree::Error::Client)? + is_descendent_of(&finalized_hash, &change.canon_hash)? { self.pending_forced_changes.push(change) } @@ -479,7 +557,8 @@ where finalized_hash: H, finalized_number: N, is_descendent_of: &F, - ) -> Result, Error> where + ) -> Result, Error> + where F: Fn(&H, &H) -> Result, E: std::error::Error, { @@ -928,7 +1007,13 @@ mod tests { }; authorities.add_pending_change(change_a, &static_is_descendent_of(false)).unwrap(); - authorities.add_pending_change(change_b, &static_is_descendent_of(false)).unwrap(); + authorities.add_pending_change(change_b.clone(), &static_is_descendent_of(false)).unwrap(); + + // no duplicates are allowed + assert!(matches!( + authorities.add_pending_change(change_b, &static_is_descendent_of(false)), + Err(Error::DuplicateAuthoritySetChange) + )); // there's an effective change triggered at block 15 but not a standard one. // so this should do nothing. @@ -953,9 +1038,11 @@ mod tests { Err(Error::MultiplePendingForcedAuthoritySetChanges) )); - // too early. + // let's try and apply the forced changes. + // too early and there's no forced changes to apply. assert!( - authorities.apply_forced_changes("hash_a10", 10, &static_is_descendent_of(true), false) + authorities + .apply_forced_changes("hash_a10", 10, &static_is_descendent_of(true), false) .unwrap() .is_none() ); @@ -1379,26 +1466,11 @@ mod tests { add_pending_change(15, "C3", true); add_pending_change(20, "D", true); - println!( - "pending_changes: {:?}", - authorities - .pending_changes() - .map(|c| c.canon_hash) - .collect::>() - ); - // applying the standard change at A should not prune anything // other then the change that was applied authorities .apply_standard_changes("A", 5, &is_descendent_of, false) .unwrap(); - println!( - "pending_changes: {:?}", - authorities - .pending_changes() - .map(|c| c.canon_hash) - .collect::>() - ); assert_eq!(authorities.pending_changes().count(), 6);
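The dependency rule this patch enforces in `apply_forced_changes` can be modelled in isolation. The following is a sketch under simplified assumptions (plain byte-array hashes, `u64` block numbers, ancestry supplied as a closure); it is not the crate's real API:

    // Sketch of the forced-change dependency rule introduced above, using
    // stand-in types rather than the real `AuthoritySet` machinery.
    type Hash = [u8; 32];

    struct StandardChange {
        canon_hash: Hash,
        effective_number: u64,
    }

    /// A forced change carries the median last-finalized block of the voters.
    /// It may only be enacted once every pending standard change on the same
    /// branch whose effective number is at or below that median has been
    /// applied; otherwise the check fails with the offending block number.
    fn check_forced_change_dependencies(
        forced_canon_hash: Hash,
        forced_median_last_finalized: u64,
        pending_standard_changes: &[StandardChange],
        is_descendent_of: impl Fn(&Hash, &Hash) -> bool,
    ) -> Result<(), u64> {
        for standard in pending_standard_changes {
            if standard.effective_number <= forced_median_last_finalized
                && is_descendent_of(&standard.canon_hash, &forced_canon_hash)
            {
                return Err(standard.effective_number);
            }
        }
        Ok(())
    }

This mirrors the loop over `pending_standard_changes.roots()` in the diff: when an unapplied standard change on the forced change's branch takes effect at or below the forced change's median last-finalized block, `apply_forced_changes` returns `ForcedAuthoritySetChangeDependencyUnsatisfied` and block import fails.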