diff --git a/Cargo.lock b/Cargo.lock index cb1afaae13..c698041dcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,9 +220,9 @@ checksum = "7deb0a829ca7bcfaf5da70b073a8d128619259a7be8216a355e23f00763059e5" [[package]] name = "async-channel" -version = "1.1.1" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee81ba99bee79f3c8ae114ae4baa7eaa326f63447cf2ec65e4393618b63f8770" +checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9" dependencies = [ "concurrent-queue", "event-listener", @@ -230,34 +230,95 @@ dependencies = [ ] [[package]] -name = "async-std" -version = "1.6.2" +name = "async-executor" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00d68a33ebc8b57800847d00787307f84a562224a14db069b0acefe4c2abbf5d" +checksum = "d373d78ded7d0b3fa8039375718cde0aace493f2e34fb60f51cbf567562ca801" dependencies = [ "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "once_cell 1.4.1", + "vec-arena", +] + +[[package]] +name = "async-global-executor" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fefeb39da249f4c33af940b779a56723ce45809ef5c54dad84bb538d4ffb6d9e" +dependencies = [ + "async-executor", + "async-io", + "futures-lite", + "num_cpus", + "once_cell 1.4.1", +] + +[[package]] +name = "async-io" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38628c78a34f111c5a6b98fc87dfc056cd1590b61afe748b145be4623c56d194" +dependencies = [ + "cfg-if", + "concurrent-queue", + "fastrand", + "futures-lite", + "libc", + "log", + "once_cell 1.4.1", + "parking", + "polling", + "socket2", + "vec-arena", + "waker-fn", + "wepoll-sys-stjepang", + "winapi 0.3.9", +] + +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-std" +version = "1.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9fa76751505e8df1c7a77762f60486f60c71bbd9b8557f4da6ad47d083732ed" +dependencies = [ + "async-global-executor", + "async-io", + "async-mutex", + "blocking", "crossbeam-utils", "futures-channel", "futures-core", "futures-io", - "futures-timer 3.0.2", + "futures-lite", + "gloo-timers", "kv-log-macro", "log", "memchr", "num_cpus", - "once_cell 1.4.0", + "once_cell 1.4.1", "pin-project-lite", "pin-utils", "slab", - "smol", "wasm-bindgen-futures", ] [[package]] name = "async-task" -version = "3.0.0" +version = "4.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c17772156ef2829aadc587461c7753af20b7e8db1529bc66855add962a3b35d3" +checksum = "8ab27c1aa62945039e44edaeee1dc23c74cc0c303dd5fe0fb462a184f1c3a518" [[package]] name = "async-tls" @@ -506,16 +567,16 @@ dependencies = [ [[package]] name = "blocking" -version = "0.4.7" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2468ff7bf85066b4a3678fede6fe66db31846d753ff0adfbfab2c6a6e81612b" +checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9" dependencies = [ "async-channel", + "async-task", "atomic-waker", + "fastrand", "futures-lite", - "once_cell 1.4.0", - "parking", - "waker-fn", + "once_cell 1.4.1", ] [[package]] @@ -744,9 +805,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "1.1.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f83c06aff61f2d899eb87c379df3cbf7876f14471dcab474e0b6dc90ab96c080" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" dependencies = [ "cache-padded", ] @@ -1332,9 +1393,9 @@ dependencies = [ [[package]] name = "event-listener" -version = "2.2.0" +version = "2.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "699d84875f1b72b4da017e6b0f77dfa88c0137f089958a88974d15938cbc2976" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" [[package]] name = "evm" @@ -1427,9 +1488,12 @@ checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" [[package]] name = "fastrand" -version = "1.3.3" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36a9cb09840f81cd211e435d00a4e487edd263dc3c8ff815c32dd76ad668ebed" +checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3" +dependencies = [ + "instant", +] [[package]] name = "fdlimit" @@ -1585,13 +1649,14 @@ dependencies = [ "frame-system", "impl-trait-for-tuples", "log", - "once_cell 1.4.0", + "once_cell 1.4.1", "parity-scale-codec", "parity-util-mem", "paste", "pretty_assertions", "serde", "smallvec 1.4.1", + "sp-api", "sp-arithmetic", "sp-core", "sp-inherents", @@ -1600,6 +1665,7 @@ dependencies = [ "sp-state-machine", "sp-std", "sp-tracing", + "substrate-test-runtime-client", ] [[package]] @@ -1840,9 +1906,9 @@ checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" [[package]] name = "futures-lite" -version = "0.1.8" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180d8fc9819eb48a0c976672fbeea13a73e10999e812bdc9e14644c25ad51d60" +checksum = "381a7ad57b1bad34693f63f6f377e1abded7a9c85c9d3eb6771e11c60aaadab9" dependencies = [ "fastrand", "futures-core", @@ -1877,7 +1943,7 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" dependencies = [ - "once_cell 1.4.0", + "once_cell 1.4.1", ] [[package]] @@ -2736,9 +2802,9 @@ checksum = "3576a87f2ba00f6f106fdfcd16db1d698d648a26ad8e0573cad8537c3c362d2a" [[package]] name = "libc" -version = "0.2.73" +version = "0.2.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd7d4bd64732af4bf3a67f367c27df8520ad7e230c5817b8ff485864d80242b9" +checksum = "2448f6066e80e3bfc792e9c98bf705b4b0fc6e8ef5b43e5889aff0eaa9c58743" [[package]] name = "libloading" @@ -4142,11 +4208,11 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" +checksum = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad" dependencies = [ - "parking_lot 0.10.2", + "parking_lot 0.11.0", ] [[package]] @@ -5201,9 +5267,9 @@ dependencies = [ [[package]] name = "parking" -version = "1.0.5" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d4a6da31f8144a32532fe38fe8fb439a6842e0ec633f0037f0144c14e7f907" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" [[package]] name = "parking_lot" @@ -5424,6 +5490,19 @@ dependencies = [ "web-sys", ] +[[package]] +name = "polling" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0720e0b9ea9d52451cf29d3413ba8a9303f8815d9d9653ef70e03ff73e65566" +dependencies = [ + "cfg-if", + "libc", + "log", + "wepoll-sys-stjepang", + "winapi 0.3.9", +] + [[package]] name = "poly1305" version = "0.6.0" @@ -6102,7 +6181,7 @@ checksum = "952cd6b98c85bbc30efa1ba5783b8abf12fec8b3287ffa52605b9432313e34e4" dependencies = [ "cc", "libc", - "once_cell 1.4.0", + "once_cell 1.4.1", "spin", "untrusted", "web-sys", @@ -7759,27 +7838,6 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f" -[[package]] -name = "smol" -version = "0.1.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "620cbb3c6e34da57d3a248cda0cd01cd5848164dc062e764e65d06fe3ea7aed5" -dependencies = [ - "async-task", - "blocking", - "concurrent-queue", - "fastrand", - "futures-io", - "futures-util", - "libc", - "once_cell 1.4.0", - "scoped-tls", - "slab", - "socket2", - "wepoll-sys-stjepang", - "winapi 0.3.9", -] - [[package]] name = "snow" version = "0.7.1" @@ -9096,7 +9154,7 @@ checksum = "b0165e045cc2ae1660270ca65e1676dbaab60feb0f91b10f7d0665e9b47e31f2" dependencies = [ "failure", "hmac", - "once_cell 1.4.0", + "once_cell 1.4.1", "pbkdf2", "rand 0.7.3", "rustc-hash", @@ -9443,12 +9501,13 @@ checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" [[package]] name = "tracing" -version = "0.1.19" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d79ca061b032d6ce30c660fded31189ca0b9922bf483cd70759f13a2d86786c" +checksum = "b0987850db3733619253fe60e17cb59b82d37c7e6c0236bb81e4d6b87c879f27" dependencies = [ "cfg-if", "log", + "pin-project-lite", "tracing-attributes", "tracing-core", ] @@ -9466,9 +9525,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.16" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bcf46c1f1f06aeea2d6b81f3c863d0930a596c86ad1920d4e5bad6dd1d7119a" +checksum = "f50de3927f93d202783f4513cda820ab47ef17f624b03c096e86ef00c67e6b5f" dependencies = [ "lazy_static", ] @@ -9486,9 +9545,9 @@ dependencies = [ [[package]] name = "tracing-serde" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6ccba2f8f16e0ed268fc765d9b7ff22e965e7185d32f8f1ec8294fe17d86e79" +checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b" dependencies = [ "serde", "tracing-core", @@ -9496,9 +9555,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.2.10" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b33f8b2ef2ab0c3778c12646d9c42a24f7772bee4cdafc72199644a9f58fdc" +checksum = "4ef0a5e15477aa303afbfac3a44cba9b6430fdaad52423b1e6c0dbbe28c3eedd" dependencies = [ "ansi_term 0.12.1", "chrono", @@ -9509,6 +9568,8 @@ dependencies = [ "serde_json", "sharded-slab", "smallvec 1.4.1", + "thread_local", + "tracing", "tracing-core", "tracing-log", "tracing-serde", @@ -9733,6 +9794,12 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" +[[package]] +name = "vec-arena" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eafc1b9b2dfc6f5529177b62cf806484db55b32dc7c9658a118e11bbeb33061d" + [[package]] name = "vec_map" version = "0.8.2" diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index f16d02cab5..07c54d7a7e 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -43,7 +43,7 @@ use structopt::{ clap::{self, AppSettings}, StructOpt, }; -use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::{filter::Directive, layer::SubscriberExt}; /// Substrate client CLI /// @@ -234,6 +234,13 @@ pub fn init_logger( tracing_receiver: sc_tracing::TracingReceiver, tracing_targets: Option, ) -> std::result::Result<(), String> { + fn parse_directives(dirs: impl AsRef) -> Vec { + dirs.as_ref() + .split(',') + .filter_map(|s| s.parse().ok()) + .collect() + } + if let Err(e) = tracing_log::LogTracer::init() { return Err(format!( "Registering Substrate logger failed: {:}!", e @@ -257,7 +264,7 @@ pub fn init_logger( if lvl != "" { // We're not sure if log or tracing is available at this moment, so silently ignore the // parse error. - if let Ok(directive) = lvl.parse() { + for directive in parse_directives(lvl) { env_filter = env_filter.add_directive(directive); } } @@ -266,7 +273,7 @@ pub fn init_logger( if pattern != "" { // We're not sure if log or tracing is available at this moment, so silently ignore the // parse error. - if let Ok(directive) = pattern.parse() { + for directive in parse_directives(pattern) { env_filter = env_filter.add_directive(directive); } } @@ -299,3 +306,76 @@ pub fn init_logger( } Ok(()) } +mod tests { + use super::*; + use tracing::{metadata::Kind, subscriber::Interest, Callsite, Level, Metadata}; + use std::{process::Command, env}; + + #[test] + fn test_logger_filters() { + let test_pattern = "afg=debug,sync=trace,client=warn,telemetry,something-with-dash=error"; + init_logger(&test_pattern, Default::default(), Default::default()).unwrap(); + + tracing::dispatcher::get_default(|dispatcher| { + let test_filter = |target, level| { + struct DummyCallSite; + impl Callsite for DummyCallSite { + fn set_interest(&self, _: Interest) {} + fn metadata(&self) -> &Metadata<'_> { + unreachable!(); + } + } + + let metadata = tracing::metadata!( + name: "", + target: target, + level: level, + fields: &[], + callsite: &DummyCallSite, + kind: Kind::SPAN, + ); + + dispatcher.enabled(&metadata) + }; + + assert!(test_filter("afg", Level::INFO)); + assert!(test_filter("afg", Level::DEBUG)); + assert!(!test_filter("afg", Level::TRACE)); + + assert!(test_filter("sync", Level::TRACE)); + assert!(test_filter("client", Level::WARN)); + + assert!(test_filter("telemetry", Level::TRACE)); + assert!(test_filter("something-with-dash", Level::ERROR)); + }); + } + + const EXPECTED_LOG_MESSAGE: &'static str = "yeah logging works as expected"; + + #[test] + fn dash_in_target_name_works() { + let executable = env::current_exe().unwrap(); + let output = Command::new(executable) + .env("ENABLE_LOGGING", "1") + .args(&["--nocapture", "log_something_with_dash_target_name"]) + .output() + .unwrap(); + + let output = String::from_utf8(output.stderr).unwrap(); + assert!(output.contains(EXPECTED_LOG_MESSAGE)); + } + + /// This is no actual test, it will be used by the `dash_in_target_name_works` test. + /// The given test will call the test executable to only execute this test that + /// will only print `EXPECTED_LOG_MESSAGE` through logging while using a target + /// name that contains a dash. This ensures that targets names with dashes work. + #[test] + fn log_something_with_dash_target_name() { + if env::var("ENABLE_LOGGING").is_ok() { + let test_pattern = "test-target=info"; + init_logger(&test_pattern, Default::default(), Default::default()).unwrap(); + + log::info!(target: "test-target", "{}", EXPECTED_LOG_MESSAGE); + } + } +} diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs index faaf2c2bd2..79e7a70024 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -92,11 +92,6 @@ pub struct NetworkParams { #[structopt(flatten)] pub node_key_params: NodeKeyParams, - /// Disable the yamux flow control. This option will be removed in the future once there is - /// enough confidence that this feature is properly working. - #[structopt(long)] - pub no_yamux_flow_control: bool, - /// Enable peer discovery on local networks. /// /// By default this option is true for `--dev` and false otherwise. @@ -158,7 +153,6 @@ impl NetworkParams { enable_mdns: !is_dev && !self.no_mdns, allow_private_ipv4: !self.no_private_ipv4, wasm_external_transport: None, - use_yamux_flow_control: !self.no_yamux_flow_control, }, max_parallel_downloads: self.max_parallel_downloads, allow_non_globals_in_dht: self.discover_local || is_dev, diff --git a/client/finality-grandpa/src/authorities.rs b/client/finality-grandpa/src/authorities.rs index 7a064d7a62..f263fcc101 100644 --- a/client/finality-grandpa/src/authorities.rs +++ b/client/finality-grandpa/src/authorities.rs @@ -32,14 +32,42 @@ use std::ops::Add; use std::sync::Arc; /// Error type returned on operations on the `AuthoritySet`. -#[derive(Debug, derive_more::Display, derive_more::From)] -pub enum Error { - #[display("Invalid authority set, either empty or with an authority weight set to 0.")] +#[derive(Debug, derive_more::Display)] +pub enum Error { + #[display(fmt = "Invalid authority set, either empty or with an authority weight set to 0.")] InvalidAuthoritySet, + #[display(fmt = "Client error during ancestry lookup: {}", _0)] + Client(E), + #[display(fmt = "Duplicate authority set change.")] + DuplicateAuthoritySetChange, + #[display(fmt = "Multiple pending forced authority set changes are not allowed.")] + MultiplePendingForcedAuthoritySetChanges, + #[display( + fmt = "A pending forced authority set change could not be applied since it must be applied after \ + the pending standard change at #{}", + _0 + )] + ForcedAuthoritySetChangeDependencyUnsatisfied(N), #[display(fmt = "Invalid operation in the pending changes tree: {}", _0)] ForkTree(fork_tree::Error), } +impl From> for Error { + fn from(err: fork_tree::Error) -> Error { + match err { + fork_tree::Error::Client(err) => Error::Client(err), + fork_tree::Error::Duplicate => Error::DuplicateAuthoritySetChange, + err => Error::ForkTree(err), + } + } +} + +impl From for Error { + fn from(err: E) -> Error { + Error::Client(err) + } +} + /// A shared authority set. pub struct SharedAuthoritySet { inner: Arc>>, @@ -111,14 +139,20 @@ pub(crate) struct AuthoritySet { /// a given branch pub(crate) pending_standard_changes: ForkTree>, /// Pending forced changes across different forks (at most one per fork). - /// Forced changes are enacted on block depth (not finality), for this reason - /// only one forced change should exist per fork. + /// Forced changes are enacted on block depth (not finality), for this + /// reason only one forced change should exist per fork. When trying to + /// apply forced changes we keep track of any pending standard changes that + /// they may depend on, this is done by making sure that any pending change + /// that is an ancestor of the forced changed and its effective block number + /// is lower than the last finalized block (as signaled in the forced + /// change) must be applied beforehand. pending_forced_changes: Vec>, } impl AuthoritySet -where H: PartialEq, - N: Ord, +where + H: PartialEq, + N: Ord, { // authority sets must be non-empty and all weights must be greater than 0 fn invalid_authority_list(authorities: &AuthorityList) -> bool { @@ -180,7 +214,7 @@ where &self, best_hash: &H, is_descendent_of: &F, - ) -> Result, fork_tree::Error> + ) -> Result, Error> where F: Fn(&H, &H) -> Result, E: std::error::Error, @@ -219,7 +253,8 @@ where &mut self, pending: PendingChange, is_descendent_of: &F, - ) -> Result<(), Error> where + ) -> Result<(), Error> + where F: Fn(&H, &H) -> Result, E: std::error::Error, { @@ -250,16 +285,18 @@ where &mut self, pending: PendingChange, is_descendent_of: &F, - ) -> Result<(), Error> where + ) -> Result<(), Error> + where F: Fn(&H, &H) -> Result, E: std::error::Error, { - for change in self.pending_forced_changes.iter() { - if change.canon_hash == pending.canon_hash || - is_descendent_of(&change.canon_hash, &pending.canon_hash) - .map_err(fork_tree::Error::Client)? - { - return Err(fork_tree::Error::UnfinalizedAncestor.into()); + for change in &self.pending_forced_changes { + if change.canon_hash == pending.canon_hash { + return Err(Error::DuplicateAuthoritySetChange); + } + + if is_descendent_of(&change.canon_hash, &pending.canon_hash)? { + return Err(Error::MultiplePendingForcedAuthoritySetChanges); } } @@ -293,7 +330,8 @@ where &mut self, pending: PendingChange, is_descendent_of: &F, - ) -> Result<(), Error> where + ) -> Result<(), Error> + where F: Fn(&H, &H) -> Result, E: std::error::Error, { @@ -341,52 +379,92 @@ where /// /// These transitions are always forced and do not lead to justifications /// which light clients can follow. + /// + /// Forced changes can only be applied after all pending standard changes + /// that it depends on have been applied. If any pending standard change + /// exists that is an ancestor of a given forced changed and which effective + /// block number is lower than the last finalized block (as defined by the + /// forced change), then the forced change cannot be applied. An error will + /// be returned in that case which will prevent block import. pub(crate) fn apply_forced_changes( &self, best_hash: H, best_number: N, is_descendent_of: &F, initial_sync: bool, - ) -> Result, E> - where F: Fn(&H, &H) -> Result, + ) -> Result, Error> + where + F: Fn(&H, &H) -> Result, + E: std::error::Error, { let mut new_set = None; - for change in self.pending_forced_changes.iter() + for change in self + .pending_forced_changes + .iter() .take_while(|c| c.effective_number() <= best_number) // to prevent iterating too far .filter(|c| c.effective_number() == best_number) { // check if the given best block is in the same branch as // the block that signaled the change. if change.canon_hash == best_hash || is_descendent_of(&change.canon_hash, &best_hash)? { + let median_last_finalized = match change.delay_kind { + DelayKind::Best { + ref median_last_finalized, + } => median_last_finalized.clone(), + _ => unreachable!( + "pending_forced_changes only contains forced changes; forced changes have delay kind Best; qed." + ), + }; + + // check if there's any pending standard change that we depend on + for (_, _, standard_change) in self.pending_standard_changes.roots() { + if standard_change.effective_number() <= median_last_finalized + && is_descendent_of(&standard_change.canon_hash, &change.canon_hash)? + { + log::info!(target: "afg", + "Not applying authority set change forced at block #{:?}, due to pending standard change at block #{:?}", + change.canon_height, + standard_change.effective_number(), + ); + + return Err( + Error::ForcedAuthoritySetChangeDependencyUnsatisfied( + standard_change.effective_number() + ) + ); + } + } + // apply this change: make the set canonical - afg_log!(initial_sync, + afg_log!( + initial_sync, "👴 Applying authority set change forced at block #{:?}", change.canon_height, ); - telemetry!(CONSENSUS_INFO; "afg.applying_forced_authority_set_change"; + + telemetry!( + CONSENSUS_INFO; + "afg.applying_forced_authority_set_change"; "block" => ?change.canon_height ); - let median_last_finalized = match change.delay_kind { - DelayKind::Best { ref median_last_finalized } => median_last_finalized.clone(), - _ => unreachable!("pending_forced_changes only contains forced changes; forced changes have delay kind Best; qed."), - }; - - new_set = Some((median_last_finalized, AuthoritySet { - current_authorities: change.next_authorities.clone(), - set_id: self.set_id + 1, - pending_standard_changes: ForkTree::new(), // new set, new changes. - pending_forced_changes: Vec::new(), - })); + new_set = Some(( + median_last_finalized, + AuthoritySet { + current_authorities: change.next_authorities.clone(), + set_id: self.set_id + 1, + pending_standard_changes: ForkTree::new(), // new set, new changes. + pending_forced_changes: Vec::new(), + }, + )); break; } - - // we don't wipe forced changes until another change is - // applied } + // we don't wipe forced changes until another change is applied, hence + // why we return a new set instead of mutating. Ok(new_set) } @@ -406,7 +484,8 @@ where finalized_number: N, is_descendent_of: &F, initial_sync: bool, - ) -> Result, Error> where + ) -> Result, Error> + where F: Fn(&H, &H) -> Result, E: std::error::Error, { @@ -429,12 +508,11 @@ where Vec::new(), ); - // we will keep all forced change for any later blocks and that are a - // descendent of the finalized block (i.e. they are from this fork). + // we will keep all forced changes for any later blocks and that are a + // descendent of the finalized block (i.e. they are part of this branch). for change in pending_forced_changes { if change.effective_number() > finalized_number && - is_descendent_of(&finalized_hash, &change.canon_hash) - .map_err(fork_tree::Error::Client)? + is_descendent_of(&finalized_hash, &change.canon_hash)? { self.pending_forced_changes.push(change) } @@ -479,7 +557,8 @@ where finalized_hash: H, finalized_number: N, is_descendent_of: &F, - ) -> Result, Error> where + ) -> Result, Error> + where F: Fn(&H, &H) -> Result, E: std::error::Error, { @@ -928,7 +1007,13 @@ mod tests { }; authorities.add_pending_change(change_a, &static_is_descendent_of(false)).unwrap(); - authorities.add_pending_change(change_b, &static_is_descendent_of(false)).unwrap(); + authorities.add_pending_change(change_b.clone(), &static_is_descendent_of(false)).unwrap(); + + // no duplicates are allowed + assert!(matches!( + authorities.add_pending_change(change_b, &static_is_descendent_of(false)), + Err(Error::DuplicateAuthoritySetChange) + )); // there's an effective change triggered at block 15 but not a standard one. // so this should do nothing. @@ -937,12 +1022,7 @@ mod tests { None, ); - // throw a standard change into the mix to prove that it's discarded - // for being on the same fork. - // - // NOTE: after https://github.com/paritytech/substrate/issues/1861 - // this should still be rejected based on the "span" rule -- it overlaps - // with another change on the same fork. + // there can only be one pending forced change per fork let change_c = PendingChange { next_authorities: set_b.clone(), delay: 3, @@ -951,37 +1031,45 @@ mod tests { delay_kind: DelayKind::Best { median_last_finalized: 0 }, }; - let is_descendent_of_a = is_descendent_of(|base: &&str, _| { - base.starts_with("hash_a") - }); + let is_descendent_of_a = is_descendent_of(|base: &&str, _| base.starts_with("hash_a")); - assert!(authorities.add_pending_change(change_c, &is_descendent_of_a).is_err()); + assert!(matches!( + authorities.add_pending_change(change_c, &is_descendent_of_a), + Err(Error::MultiplePendingForcedAuthoritySetChanges) + )); - // too early. + // let's try and apply the forced changes. + // too early and there's no forced changes to apply. assert!( - authorities.apply_forced_changes("hash_a10", 10, &static_is_descendent_of(true), false) + authorities + .apply_forced_changes("hash_a10", 10, &static_is_descendent_of(true), false) .unwrap() .is_none() ); // too late. assert!( - authorities.apply_forced_changes("hash_a16", 16, &static_is_descendent_of(true), false) + authorities + .apply_forced_changes("hash_a16", 16, &is_descendent_of_a, false) .unwrap() .is_none() ); - // on time -- chooses the right change. + // on time -- chooses the right change for this fork. assert_eq!( - authorities.apply_forced_changes("hash_a15", 15, &is_descendent_of_a, false) + authorities + .apply_forced_changes("hash_a15", 15, &is_descendent_of_a, false) .unwrap() .unwrap(), - (42, AuthoritySet { - current_authorities: set_a, - set_id: 1, - pending_standard_changes: ForkTree::new(), - pending_forced_changes: Vec::new(), - }), + ( + 42, + AuthoritySet { + current_authorities: set_a, + set_id: 1, + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), + }, + ) ); } @@ -1022,6 +1110,106 @@ mod tests { ); } + #[test] + fn forced_changes_blocked_by_standard_changes() { + let set_a = vec![(AuthorityId::from_slice(&[1; 32]), 1)]; + + let mut authorities = AuthoritySet { + current_authorities: set_a.clone(), + set_id: 0, + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), + }; + + // effective at #15 + let change_a = PendingChange { + next_authorities: set_a.clone(), + delay: 5, + canon_height: 10, + canon_hash: "hash_a", + delay_kind: DelayKind::Finalized, + }; + + // effective #20 + let change_b = PendingChange { + next_authorities: set_a.clone(), + delay: 0, + canon_height: 20, + canon_hash: "hash_b", + delay_kind: DelayKind::Finalized, + }; + + // effective at #35 + let change_c = PendingChange { + next_authorities: set_a.clone(), + delay: 5, + canon_height: 30, + canon_hash: "hash_c", + delay_kind: DelayKind::Finalized, + }; + + // add some pending standard changes all on the same fork + authorities.add_pending_change(change_a, &static_is_descendent_of(true)).unwrap(); + authorities.add_pending_change(change_b, &static_is_descendent_of(true)).unwrap(); + authorities.add_pending_change(change_c, &static_is_descendent_of(true)).unwrap(); + + // effective at #45 + let change_d = PendingChange { + next_authorities: set_a.clone(), + delay: 5, + canon_height: 40, + canon_hash: "hash_d", + delay_kind: DelayKind::Best { + median_last_finalized: 31, + }, + }; + + // now add a forced change on the same fork + authorities.add_pending_change(change_d, &static_is_descendent_of(true)).unwrap(); + + // the forced change cannot be applied since the pending changes it depends on + // have not been applied yet. + assert!(matches!( + authorities.apply_forced_changes("hash_d45", 45, &static_is_descendent_of(true), false), + Err(Error::ForcedAuthoritySetChangeDependencyUnsatisfied(15)) + )); + + // we apply the first pending standard change at #15 + authorities + .apply_standard_changes("hash_a15", 15, &static_is_descendent_of(true), false) + .unwrap(); + + // but the forced change still depends on the next standard change + assert!(matches!( + authorities.apply_forced_changes("hash_d", 45, &static_is_descendent_of(true), false), + Err(Error::ForcedAuthoritySetChangeDependencyUnsatisfied(20)) + )); + + // we apply the pending standard change at #20 + authorities + .apply_standard_changes("hash_b", 20, &static_is_descendent_of(true), false) + .unwrap(); + + // afterwards the forced change at #45 can already be applied since it signals + // that finality stalled at #31, and the next pending standard change is effective + // at #35. subsequent forced changes on the same branch must be kept + assert_eq!( + authorities + .apply_forced_changes("hash_d", 45, &static_is_descendent_of(true), false) + .unwrap() + .unwrap(), + ( + 31, + AuthoritySet { + current_authorities: set_a.clone(), + set_id: 3, + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), + } + ), + ); + } + #[test] fn next_change_works() { let current_authorities = vec![(AuthorityId::from_slice(&[1; 32]), 1)]; @@ -1278,26 +1466,11 @@ mod tests { add_pending_change(15, "C3", true); add_pending_change(20, "D", true); - println!( - "pending_changes: {:?}", - authorities - .pending_changes() - .map(|c| c.canon_hash) - .collect::>() - ); - // applying the standard change at A should not prune anything // other then the change that was applied authorities .apply_standard_changes("A", 5, &is_descendent_of, false) .unwrap(); - println!( - "pending_changes: {:?}", - authorities - .pending_changes() - .map(|c| c.canon_hash) - .collect::>() - ); assert_eq!(authorities.pending_changes().count(), 6); diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml index 53610650c0..36f877da9a 100644 --- a/client/network-gossip/Cargo.toml +++ b/client/network-gossip/Cargo.toml @@ -25,7 +25,7 @@ sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" } wasm-timer = "0.2" [dev-dependencies] -async-std = "1.6.2" +async-std = "1.6.5" quickcheck = "0.9.0" rand = "0.7.2" substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" } diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 76d8f0a23e..af0e2a2dc1 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -18,7 +18,7 @@ prost-build = "0.6.1" [dependencies] async-trait = "0.1" -async-std = { version = "1.6.2", features = ["unstable"] } +async-std = "1.6.5" bitflags = "1.2.0" bs58 = "0.3.1" bytes = "0.5.0" diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 7445ea0534..c144bebb5f 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -451,7 +451,6 @@ impl NetworkConfiguration { enable_mdns: false, allow_private_ipv4: true, wasm_external_transport: None, - use_yamux_flow_control: false, }, max_parallel_downloads: 5, allow_non_globals_in_dht: false, @@ -519,8 +518,6 @@ pub enum TransportConfig { /// This parameter exists whatever the target platform is, but it is expected to be set to /// `Some` only when compiling for WASM. wasm_external_transport: Option, - /// Use flow control for yamux streams if set to true. - use_yamux_flow_control: bool, }, /// Only allow connections within the same process. diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs index 0650e7a2f8..9d20229288 100644 --- a/client/network/src/gossip.rs +++ b/client/network/src/gossip.rs @@ -49,15 +49,15 @@ use crate::{ExHashT, NetworkService}; -use async_std::sync::{Condvar, Mutex, MutexGuard}; +use async_std::sync::{Mutex, MutexGuard}; use futures::prelude::*; +use futures::channel::mpsc::{channel, Receiver, Sender}; use libp2p::PeerId; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::{ collections::VecDeque, fmt, - sync::{atomic, Arc}, - time::Duration, + sync::Arc, }; #[cfg(test)] @@ -65,8 +65,12 @@ mod tests; /// Notifications sender for a specific combination of network service, peer, and protocol. pub struct QueuedSender { - /// Shared between the front and the back task. - shared: Arc>, + /// Shared between the user-facing [`QueuedSender`] and the background future. + shared_message_queue: SharedMessageQueue, + /// Used to notify the background future to check for new messages in the message queue. + notify_background_future: Sender<()>, + /// Maximum number of elements in [`QueuedSender::shared_message_queue`]. + queue_size_limit: usize, } impl QueuedSender { @@ -88,39 +92,45 @@ impl QueuedSender { H: ExHashT, F: Fn(M) -> Vec + Send + 'static, { - let shared = Arc::new(Shared { - stop_task: atomic::AtomicBool::new(false), - condvar: Condvar::new(), - queue_size_limit, - messages_queue: Mutex::new(VecDeque::with_capacity(queue_size_limit)), - }); + let (notify_background_future, wait_for_sender) = channel(0); + + let shared_message_queue = Arc::new(Mutex::new( + VecDeque::with_capacity(queue_size_limit), + )); - let task = spawn_task( + let background_future = create_background_future( + wait_for_sender, service, peer_id, protocol, - shared.clone(), + shared_message_queue.clone(), messages_encode ); - (QueuedSender { shared }, task) + let sender = QueuedSender { + shared_message_queue, + notify_background_future, + queue_size_limit, + }; + + (sender, background_future) } /// Locks the queue of messages towards this peer. /// /// The returned `Future` is expected to be ready quite quickly. - pub async fn lock_queue<'a>(&'a self) -> QueueGuard<'a, M> { + pub async fn lock_queue<'a>(&'a mut self) -> QueueGuard<'a, M> { QueueGuard { - messages_queue: self.shared.messages_queue.lock().await, - condvar: &self.shared.condvar, - queue_size_limit: self.shared.queue_size_limit, + message_queue: self.shared_message_queue.lock().await, + queue_size_limit: self.queue_size_limit, + notify_background_future: &mut self.notify_background_future, } } /// Pushes a message to the queue, or discards it if the queue is full. /// /// The returned `Future` is expected to be ready quite quickly. - pub async fn queue_or_discard(&self, message: M) + pub async fn queue_or_discard(&mut self, message: M) where M: Send + 'static { @@ -134,28 +144,17 @@ impl fmt::Debug for QueuedSender { } } -impl Drop for QueuedSender { - fn drop(&mut self) { - // The "clean" way to notify the `Condvar` here is normally to first lock the `Mutex`, - // then notify the `Condvar` while the `Mutex` is locked. Unfortunately, the `Mutex` - // being asynchronous, it can't reasonably be locked from within a destructor. - // See also the corresponding code in the background task. - self.shared.stop_task.store(true, atomic::Ordering::Release); - self.shared.condvar.notify_all(); - } -} - /// Locked queue of messages to the given peer. /// -/// As long as this struct exists, the background task is asleep and the owner of the [`QueueGuard`] -/// is in total control of the buffer. Messages can only ever be sent out after the [`QueueGuard`] -/// is dropped. +/// As long as this struct exists, the background future is asleep and the owner of the +/// [`QueueGuard`] is in total control of the message queue. Messages can only ever be sent out on +/// the network after the [`QueueGuard`] is dropped. #[must_use] pub struct QueueGuard<'a, M> { - messages_queue: MutexGuard<'a, VecDeque>, - condvar: &'a Condvar, - /// Same as [`Shared::queue_size_limit`]. + message_queue: MutexGuard<'a, MessageQueue>, + /// Same as [`QueuedSender::queue_size_limit`]. queue_size_limit: usize, + notify_background_future: &'a mut Sender<()>, } impl<'a, M: Send + 'static> QueueGuard<'a, M> { @@ -163,8 +162,8 @@ impl<'a, M: Send + 'static> QueueGuard<'a, M> { /// /// The message will only start being sent out after the [`QueueGuard`] is dropped. pub fn push_or_discard(&mut self, message: M) { - if self.messages_queue.len() < self.queue_size_limit { - self.messages_queue.push_back(message); + if self.message_queue.len() < self.queue_size_limit { + self.message_queue.push_back(message); } } @@ -174,72 +173,56 @@ impl<'a, M: Send + 'static> QueueGuard<'a, M> { /// > **Note**: The parameter of `filter` is a `&M` and not a `&mut M` (which would be /// > better) because the underlying implementation relies on `VecDeque::retain`. pub fn retain(&mut self, filter: impl FnMut(&M) -> bool) { - self.messages_queue.retain(filter); + self.message_queue.retain(filter); } } impl<'a, M> Drop for QueueGuard<'a, M> { fn drop(&mut self) { - // We notify the `Condvar` in the destructor in order to be able to push multiple - // messages and wake up the background task only once afterwards. - self.condvar.notify_one(); + // Notify background future to check for new messages in the message queue. + let _ = self.notify_background_future.try_send(()); } } -#[derive(Debug)] -struct Shared { - /// Read by the background task after locking `locked`. If true, the task stops. - stop_task: atomic::AtomicBool, - /// Queue of messages waiting to be sent out. - messages_queue: Mutex>, - /// Must be notified every time the content of `locked` changes. - condvar: Condvar, - /// Maximum number of elements in `messages_queue`. - queue_size_limit: usize, -} +type MessageQueue = VecDeque; + +/// [`MessageQueue`] shared between [`QueuedSender`] and background future. +type SharedMessageQueue = Arc>>; -async fn spawn_task Vec>( +async fn create_background_future Vec>( + mut wait_for_sender: Receiver<()>, service: Arc>, peer_id: PeerId, protocol: ConsensusEngineId, - shared: Arc>, + shared_message_queue: SharedMessageQueue, messages_encode: F, ) { loop { - let next_message = 'next_msg: loop { - let mut queue = shared.messages_queue.lock().await; - - loop { - if shared.stop_task.load(atomic::Ordering::Acquire) { - return; - } - - if let Some(msg) = queue.pop_front() { - break 'next_msg msg; - } - - // It is possible that the destructor of `QueuedSender` sets `stop_task` to - // true and notifies the `Condvar` after the background task loads `stop_task` - // and before it calls `Condvar::wait`. - // See also the corresponding comment in `QueuedSender::drop`. - // For this reason, we use `wait_timeout`. In the worst case scenario, - // `stop_task` will always be checked again after the timeout is reached. - queue = shared.condvar.wait_timeout(queue, Duration::from_secs(10)).await.0; - } - }; - - // Starting from below, we try to send the message. If an error happens when sending, - // the only sane option we have is to silently discard the message. - let sender = match service.notification_sender(peer_id.clone(), protocol) { - Ok(s) => s, - Err(_) => continue, - }; - - let ready = match sender.ready().await { - Ok(r) => r, - Err(_) => continue, - }; + if wait_for_sender.next().await.is_none() { + return + } - let _ = ready.send(messages_encode(next_message)); + loop { + let mut queue_guard = shared_message_queue.lock().await; + let next_message = match queue_guard.pop_front() { + Some(msg) => msg, + None => break, + }; + drop(queue_guard); + + // Starting from below, we try to send the message. If an error happens when sending, + // the only sane option we have is to silently discard the message. + let sender = match service.notification_sender(peer_id.clone(), protocol) { + Ok(s) => s, + Err(_) => continue, + }; + + let ready = match sender.ready().await { + Ok(r) => r, + Err(_) => continue, + }; + + let _ = ready.send(messages_encode(next_message)); + } } } diff --git a/client/network/src/gossip/tests.rs b/client/network/src/gossip/tests.rs index 9ba44f564e..0f01ed81bf 100644 --- a/client/network/src/gossip/tests.rs +++ b/client/network/src/gossip/tests.rs @@ -180,7 +180,7 @@ fn basic_works() { }); async_std::task::block_on(async move { - let (sender, bg_future) = + let (mut sender, bg_future) = QueuedSender::new(node1, node2_id, ENGINE_ID, NUM_NOTIFS, |msg| msg); async_std::task::spawn(bg_future); diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index bfd8c4fe21..303f40c582 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -1270,8 +1270,13 @@ impl ChainSync { self.pending_requests.set_all(); } - /// Restart the sync process. - fn restart<'a>(&'a mut self) -> impl Iterator), BadPeer>> + 'a { + /// Restart the sync process. This will reset all pending block requests and return an iterator + /// of new block requests to make to peers. Peers that were downloading finality data (i.e. + /// their state was `DownloadingJustification` or `DownloadingFinalityProof`) are unaffected and + /// will stay in the same state. + fn restart<'a>( + &'a mut self, + ) -> impl Iterator), BadPeer>> + 'a { self.blocks.clear(); let info = self.client.info(); self.best_queued_hash = info.best_hash; @@ -1279,11 +1284,24 @@ impl ChainSync { self.pending_requests.set_all(); debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash); let old_peers = std::mem::take(&mut self.peers); + old_peers.into_iter().filter_map(move |(id, p)| { + // peers that were downloading justifications or finality proofs + // should be kept in that state. + match p.state { + PeerSyncState::DownloadingJustification(_) + | PeerSyncState::DownloadingFinalityProof(_) => { + self.peers.insert(id, p); + return None; + } + _ => {} + } + + // handle peers that were in other states. match self.new_peer(id.clone(), p.best_hash, p.best_number) { Ok(None) => None, Ok(Some(x)) => Some(Ok((id, x))), - Err(e) => Some(Err(e)) + Err(e) => Some(Err(e)), } }) } @@ -1552,15 +1570,15 @@ fn validate_blocks(blocks: &Vec>, who: #[cfg(test)] mod test { - use super::*; use super::message::FromBlock; - use substrate_test_runtime_client::{ - runtime::Block, - DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt, - }; - use sp_blockchain::HeaderBackend; + use super::*; use sc_block_builder::BlockBuilderProvider; + use sp_blockchain::HeaderBackend; use sp_consensus::block_validation::DefaultBlockAnnounceValidator; + use substrate_test_runtime_client::{ + runtime::{Block, Hash}, + ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt, + }; #[test] fn processes_empty_response_on_justification_request_for_unknown_block() { @@ -1639,4 +1657,99 @@ mod test { }) ); } + + #[test] + fn restart_doesnt_affect_peers_downloading_finality_data() { + let mut client = Arc::new(TestClientBuilder::new().build()); + let info = client.info(); + + let mut sync = ChainSync::new( + Roles::AUTHORITY, + client.clone(), + &info, + None, + Box::new(DefaultBlockAnnounceValidator), + 1, + ); + + let peer_id1 = PeerId::random(); + let peer_id2 = PeerId::random(); + let peer_id3 = PeerId::random(); + let peer_id4 = PeerId::random(); + + let mut new_blocks = |n| { + for _ in 0..n { + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block.clone()).unwrap(); + } + + let info = client.info(); + (info.best_hash, info.best_number) + }; + + let (b1_hash, b1_number) = new_blocks(50); + let (b2_hash, b2_number) = new_blocks(10); + + // add 2 peers at blocks that we don't have locally + sync.new_peer(peer_id1.clone(), Hash::random(), 42).unwrap(); + sync.new_peer(peer_id2.clone(), Hash::random(), 10).unwrap(); + + // we wil send block requests to these peers + // for these blocks we don't know about + assert!(sync.block_requests().all(|(p, _)| { *p == peer_id1 || *p == peer_id2 })); + + // add a new peer at a known block + sync.new_peer(peer_id3.clone(), b1_hash, b1_number).unwrap(); + + // we request a justification for a block we have locally + sync.request_justification(&b1_hash, b1_number); + + // the justification request should be scheduled to the + // new peer which is at the given block + assert!(sync.justification_requests().any(|(p, r)| { + p == peer_id3 + && r.fields == BlockAttributes::JUSTIFICATION + && r.from == message::FromBlock::Hash(b1_hash) + && r.to == None + })); + + assert_eq!( + sync.peers.get(&peer_id3).unwrap().state, + PeerSyncState::DownloadingJustification(b1_hash), + ); + + // add another peer at a known later block + sync.new_peer(peer_id4.clone(), b2_hash, b2_number).unwrap(); + + // we request a finality proof for a block we have locally + sync.request_finality_proof(&b2_hash, b2_number); + + // the finality proof request should be scheduled to peer 4 + // which is at that block + assert!( + sync.finality_proof_requests().any(|(p, r)| { p == peer_id4 && r.block == b2_hash }) + ); + + assert_eq!( + sync.peers.get(&peer_id4).unwrap().state, + PeerSyncState::DownloadingFinalityProof(b2_hash), + ); + + // we restart the sync state + let block_requests = sync.restart(); + + // which should make us send out block requests to the first two peers + assert!(block_requests.map(|r| r.unwrap()).all(|(p, _)| { p == peer_id1 || p == peer_id2 })); + + // peer 3 and 4 should be unaffected as they were downloading finality data + assert_eq!( + sync.peers.get(&peer_id3).unwrap().state, + PeerSyncState::DownloadingJustification(b1_hash), + ); + + assert_eq!( + sync.peers.get(&peer_id4).unwrap().state, + PeerSyncState::DownloadingFinalityProof(b2_hash), + ); + } } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 59f55f01a4..f1ee94bf85 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -336,12 +336,12 @@ impl NetworkWorker { behaviour.register_notifications_protocol(*engine_id, protocol_name.clone()); } let (transport, bandwidth) = { - let (config_mem, config_wasm, flowctrl) = match params.network_config.transport { - TransportConfig::MemoryOnly => (true, None, false), - TransportConfig::Normal { wasm_external_transport, use_yamux_flow_control, .. } => - (false, wasm_external_transport, use_yamux_flow_control) + let (config_mem, config_wasm) = match params.network_config.transport { + TransportConfig::MemoryOnly => (true, None), + TransportConfig::Normal { wasm_external_transport, .. } => + (false, wasm_external_transport) }; - transport::build_transport(local_identity, config_mem, config_wasm, flowctrl) + transport::build_transport(local_identity, config_mem, config_wasm) }; let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) .peer_connection_limit(crate::MAX_CONNECTIONS_PER_PEER) @@ -662,6 +662,11 @@ impl NetworkService { if let Some(protocol_name) = protocol_name { sink.send_sync_notification(protocol_name, message); } else { + log::error!( + target: "sub-libp2p", + "Attempted to send notification on unknown protocol: {:?}", + engine_id, + ); return; } diff --git a/client/network/src/transport.rs b/client/network/src/transport.rs index 626f84b6b5..b64e7d5792 100644 --- a/client/network/src/transport.rs +++ b/client/network/src/transport.rs @@ -41,7 +41,6 @@ pub fn build_transport( keypair: identity::Keypair, memory_only: bool, wasm_external_transport: Option, - use_yamux_flow_control: bool ) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc) { // Build the base layer of the transport. let transport = if let Some(t) = wasm_external_transport { @@ -110,12 +109,9 @@ pub fn build_transport( mplex_config.max_buffer_len(usize::MAX); let mut yamux_config = libp2p::yamux::Config::default(); - - if use_yamux_flow_control { - // Enable proper flow-control: window updates are only sent when - // buffered data has been consumed. - yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::OnRead); - } + // Enable proper flow-control: window updates are only sent when + // buffered data has been consumed. + yamux_config.set_window_update_mode(libp2p::yamux::WindowUpdateMode::OnRead); core::upgrade::SelectUpgrade::new(yamux_config, mplex_config) .map_inbound(move |muxer| core::muxing::StreamMuxerBox::new(muxer)) diff --git a/client/rpc-api/src/state/error.rs b/client/rpc-api/src/state/error.rs index 2fcca3c343..1c22788062 100644 --- a/client/rpc-api/src/state/error.rs +++ b/client/rpc-api/src/state/error.rs @@ -51,6 +51,8 @@ pub enum Error { /// Maximum allowed value max: u32, }, + /// Call to an unsafe RPC was denied. + UnsafeRpcCalled(crate::policy::UnsafeRpcError), } impl std::error::Error for Error { diff --git a/client/rpc/src/state/mod.rs b/client/rpc/src/state/mod.rs index 01c7c5f1eb..8573b3cf82 100644 --- a/client/rpc/src/state/mod.rs +++ b/client/rpc/src/state/mod.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager}; use rpc::{Result as RpcResult, futures::{Future, future::result}}; -use sc_rpc_api::state::ReadProof; +use sc_rpc_api::{DenyUnsafe, state::ReadProof}; use sc_client_api::light::{RemoteBlockchain, Fetcher}; use sp_core::{Bytes, storage::{StorageKey, PrefixedStorageKey, StorageData, StorageChangeSet}}; use sp_version::RuntimeVersion; @@ -171,6 +171,7 @@ pub trait StateBackend: Send + Sync + 'static pub fn new_full( client: Arc, subscriptions: SubscriptionManager, + deny_unsafe: DenyUnsafe, ) -> (State, ChildState) where Block: BlockT + 'static, @@ -185,7 +186,7 @@ pub fn new_full( self::state_full::FullState::new(client.clone(), subscriptions.clone()) ); let backend = Box::new(self::state_full::FullState::new(client, subscriptions)); - (State { backend }, ChildState { backend: child_backend }) + (State { backend, deny_unsafe }, ChildState { backend: child_backend }) } /// Create new state API that works on light node. @@ -194,6 +195,7 @@ pub fn new_light>( subscriptions: SubscriptionManager, remote_blockchain: Arc>, fetcher: Arc, + deny_unsafe: DenyUnsafe, ) -> (State, ChildState) where Block: BlockT + 'static, @@ -217,12 +219,14 @@ pub fn new_light>( remote_blockchain, fetcher, )); - (State { backend }, ChildState { backend: child_backend }) + (State { backend, deny_unsafe }, ChildState { backend: child_backend }) } /// State API with subscriptions support. pub struct State { backend: Box>, + /// Whether to deny unsafe calls + deny_unsafe: DenyUnsafe, } impl StateApi for State @@ -249,6 +253,10 @@ impl StateApi for State key_prefix: StorageKey, block: Option, ) -> FutureResult> { + if let Err(err) = self.deny_unsafe.check_if_safe() { + return Box::new(result(Err(err.into()))) + } + self.backend.storage_pairs(block, key_prefix) } @@ -292,6 +300,10 @@ impl StateApi for State from: Block::Hash, to: Option ) -> FutureResult>> { + if let Err(err) = self.deny_unsafe.check_if_safe() { + return Box::new(result(Err(err.into()))) + } + self.backend.query_storage(from, to, keys) } diff --git a/client/rpc/src/state/tests.rs b/client/rpc/src/state/tests.rs index b6677a1f2f..d145ac5e55 100644 --- a/client/rpc/src/state/tests.rs +++ b/client/rpc/src/state/tests.rs @@ -32,6 +32,7 @@ use substrate_test_runtime_client::{ sp_consensus::BlockOrigin, runtime, }; +use sc_rpc_api::DenyUnsafe; use sp_runtime::generic::BlockId; use crate::testing::TaskExecutor; use futures::{executor, compat::Future01CompatExt}; @@ -58,7 +59,11 @@ fn should_return_storage() { .add_extra_storage(b":map:acc2".to_vec(), vec![1, 2, 3]) .build(); let genesis_hash = client.genesis_hash(); - let (client, child) = new_full(Arc::new(client), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (client, child) = new_full( + Arc::new(client), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); let key = StorageKey(KEY.to_vec()); assert_eq!( @@ -96,7 +101,11 @@ fn should_return_child_storage() { .add_child_storage(&child_info, "key", vec![42_u8]) .build()); let genesis_hash = client.genesis_hash(); - let (_client, child) = new_full(client, SubscriptionManager::new(Arc::new(TaskExecutor))); + let (_client, child) = new_full( + client, + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); let child_key = prefixed_storage_key(); let key = StorageKey(b"key".to_vec()); @@ -131,7 +140,11 @@ fn should_return_child_storage() { fn should_call_contract() { let client = Arc::new(substrate_test_runtime_client::new()); let genesis_hash = client.genesis_hash(); - let (client, _child) = new_full(client, SubscriptionManager::new(Arc::new(TaskExecutor))); + let (client, _child) = new_full( + client, + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); assert_matches!( client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()).wait(), @@ -145,7 +158,11 @@ fn should_notify_about_storage_changes() { { let mut client = Arc::new(substrate_test_runtime_client::new()); - let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (api, _child) = new_full( + client.clone(), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); api.subscribe_storage(Default::default(), subscriber, None.into()); @@ -179,7 +196,11 @@ fn should_send_initial_storage_changes_and_notifications() { { let mut client = Arc::new(substrate_test_runtime_client::new()); - let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (api, _child) = new_full( + client.clone(), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); let alice_balance_key = blake2_256(&runtime::system::balance_of_key(AccountKeyring::Alice.into())); @@ -217,7 +238,11 @@ fn should_send_initial_storage_changes_and_notifications() { #[test] fn should_query_storage() { fn run_tests(mut client: Arc, has_changes_trie_config: bool) { - let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (api, _child) = new_full( + client.clone(), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); let mut add_block = |nonce| { let mut builder = client.new_block(Default::default()).unwrap(); @@ -434,7 +459,11 @@ fn should_split_ranges() { #[test] fn should_return_runtime_version() { let client = Arc::new(substrate_test_runtime_client::new()); - let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (api, _child) = new_full( + client.clone(), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); let result = "{\"specName\":\"test\",\"implName\":\"parity-test\",\"authoringVersion\":1,\ \"specVersion\":2,\"implVersion\":2,\"apis\":[[\"0xdf6acb689907609b\",3],\ @@ -457,7 +486,11 @@ fn should_notify_on_runtime_version_initially() { { let client = Arc::new(substrate_test_runtime_client::new()); - let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor))); + let (api, _child) = new_full( + client.clone(), + SubscriptionManager::new(Arc::new(TaskExecutor)), + DenyUnsafe::No, + ); api.subscribe_runtime_version(Default::default(), subscriber); diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index f57a145422..2288412474 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -88,4 +88,4 @@ sp-consensus-babe = { version = "0.8.0", path = "../../primitives/consensus/babe grandpa = { version = "0.8.0", package = "sc-finality-grandpa", path = "../finality-grandpa" } grandpa-primitives = { version = "2.0.0", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" } tokio = { version = "0.2", default-features = false } -async-std = { version = "1.6", default-features = false } +async-std = { version = "1.6.5", default-features = false } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 410198af26..e0661f7f62 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -720,13 +720,18 @@ fn gen_handler( subscriptions.clone(), remote_blockchain.clone(), on_demand, + deny_unsafe, ); (chain, state, child_state) } else { // Full nodes let chain = sc_rpc::chain::new_full(client.clone(), subscriptions.clone()); - let (state, child_state) = sc_rpc::state::new_full(client.clone(), subscriptions.clone()); + let (state, child_state) = sc_rpc::state::new_full( + client.clone(), + subscriptions.clone(), + deny_unsafe, + ); (chain, state, child_state) }; diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index cfe815f174..ccc1bb71f9 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -225,7 +225,6 @@ fn node_config { - /// Channel to send messages to the background task. - sender: TracingUnboundedSender>, + /// Channel to send finality work messages to the background task. + finality_sender: TracingUnboundedSender>, + /// Channel to send block import messages to the background task. + block_import_sender: TracingUnboundedSender>, /// Results coming from the worker task. result_port: BufferedLinkReceiver, _phantom: PhantomData, @@ -46,7 +48,8 @@ pub struct BasicQueue { impl Drop for BasicQueue { fn drop(&mut self) { // Flush the queue and close the receiver to terminate the future. - self.sender.close_channel(); + self.finality_sender.close_channel(); + self.block_import_sender.close_channel(); self.result_port.close(); } } @@ -65,12 +68,16 @@ impl BasicQueue { prometheus_registry: Option<&Registry>, ) -> Self { let (result_sender, result_port) = buffered_link::buffered_link(); - let metrics = prometheus_registry.and_then(|r| + + let metrics = prometheus_registry.and_then(|r| { Metrics::register(r) - .map_err(|err| { log::warn!("Failed to register Prometheus metrics: {}", err); }) - .ok() - ); - let (future, worker_sender) = BlockImportWorker::new( + .map_err(|err| { + log::warn!("Failed to register Prometheus metrics: {}", err); + }) + .ok() + }); + + let (future, finality_sender, block_import_sender) = BlockImportWorker::new( result_sender, verifier, block_import, @@ -82,7 +89,8 @@ impl BasicQueue { spawner.spawn_blocking("basic-block-import-worker", future.boxed()); Self { - sender: worker_sender, + finality_sender, + block_import_sender, result_port, _phantom: PhantomData, } @@ -96,7 +104,9 @@ impl ImportQueue for BasicQueue } trace!(target: "sync", "Scheduling {} blocks for import", blocks.len()); - let res = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks)); + let res = + self.block_import_sender.unbounded_send(worker_messages::ImportBlocks(origin, blocks)); + if res.is_err() { log::error!( target: "sync", @@ -110,12 +120,12 @@ impl ImportQueue for BasicQueue who: Origin, hash: B::Hash, number: NumberFor, - justification: Justification + justification: Justification, ) { - let res = self.sender - .unbounded_send( - ToWorkerMsg::ImportJustification(who, hash, number, justification) - ); + let res = self.finality_sender.unbounded_send( + worker_messages::Finality::ImportJustification(who, hash, number, justification), + ); + if res.is_err() { log::error!( target: "sync", @@ -132,10 +142,10 @@ impl ImportQueue for BasicQueue finality_proof: Vec, ) { trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash); - let res = self.sender - .unbounded_send( - ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof) - ); + let res = self.finality_sender.unbounded_send( + worker_messages::Finality::ImportFinalityProof(who, hash, number, finality_proof), + ); + if res.is_err() { log::error!( target: "sync", @@ -151,12 +161,16 @@ impl ImportQueue for BasicQueue } } -/// Message destinated to the background worker. -#[derive(Debug)] -enum ToWorkerMsg { - ImportBlocks(BlockOrigin, Vec>), - ImportJustification(Origin, B::Hash, NumberFor, Justification), - ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), +/// Messages destinated to the background worker. +mod worker_messages { + use super::*; + + pub struct ImportBlocks(pub BlockOrigin, pub Vec>); + + pub enum Finality { + ImportJustification(Origin, B::Hash, NumberFor, Justification), + ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), + } } struct BlockImportWorker { @@ -176,8 +190,18 @@ impl BlockImportWorker { justification_import: Option>, finality_proof_import: Option>, metrics: Option, - ) -> (impl Future + Send, TracingUnboundedSender>) { - let (sender, mut port) = tracing_unbounded("mpsc_block_import_worker"); + ) -> ( + impl Future + Send, + TracingUnboundedSender>, + TracingUnboundedSender>, + ) { + use worker_messages::*; + + let (finality_sender, mut finality_port) = + tracing_unbounded("mpsc_import_queue_worker_finality"); + + let (block_import_sender, mut block_import_port) = + tracing_unbounded("mpsc_import_queue_worker_blocks"); let mut worker = BlockImportWorker { result_sender, @@ -206,6 +230,8 @@ impl BlockImportWorker { // `Future`, and `block_import` is `None`. // - Something else, in which case `block_import` is `Some` and `importing` is None. // + // Additionally, the task will prioritize processing of finality work messages over + // block import messages, hence why two distinct channels are used. let mut block_import_verifier = Some((block_import, verifier)); let mut importing = None; @@ -217,7 +243,30 @@ impl BlockImportWorker { return Poll::Ready(()) } - // If we are in the process of importing a bunch of block, let's resume this + // Grab the next finality action request sent to the import queue. + let finality_work = match Stream::poll_next(Pin::new(&mut finality_port), cx) { + Poll::Ready(Some(msg)) => Some(msg), + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => None, + }; + + match finality_work { + Some(Finality::ImportFinalityProof(who, hash, number, proof)) => { + let (_, verif) = block_import_verifier + .as_mut() + .expect("block_import_verifier is always Some; qed"); + + worker.import_finality_proof(verif, who, hash, number, proof); + continue; + } + Some(Finality::ImportJustification(who, hash, number, justification)) => { + worker.import_justification(who, hash, number, justification); + continue; + } + None => {} + } + + // If we are in the process of importing a bunch of blocks, let's resume this // process before doing anything more. if let Some(imp_fut) = importing.as_mut() { match Future::poll(Pin::new(imp_fut), cx) { @@ -232,34 +281,25 @@ impl BlockImportWorker { debug_assert!(importing.is_none()); debug_assert!(block_import_verifier.is_some()); - // Grab the next action request sent to the import queue. - let msg = match Stream::poll_next(Pin::new(&mut port), cx) { - Poll::Ready(Some(msg)) => msg, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => return Poll::Pending, - }; + // Grab the next block import request sent to the import queue. + let ImportBlocks(origin, blocks) = + match Stream::poll_next(Pin::new(&mut block_import_port), cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + }; - match msg { - ToWorkerMsg::ImportBlocks(origin, blocks) => { - // On blocks import request, we merely *start* the process and store - // a `Future` into `importing`. - let (bi, verif) = block_import_verifier.take() - .expect("block_import_verifier is always Some; qed"); - importing = Some(worker.import_batch(bi, verif, origin, blocks)); - }, - ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { - let (_, verif) = block_import_verifier.as_mut() - .expect("block_import_verifier is always Some; qed"); - worker.import_finality_proof(verif, who, hash, number, proof); - }, - ToWorkerMsg::ImportJustification(who, hash, number, justification) => { - worker.import_justification(who, hash, number, justification); - } - } + // On blocks import request, we merely *start* the process and store + // a `Future` into `importing`. + let (block_import, verifier) = block_import_verifier + .take() + .expect("block_import_verifier is always Some; qed"); + + importing = Some(worker.import_batch(block_import, verifier, origin, blocks)); } }); - (future, sender) + (future, finality_sender, block_import_sender) } /// Returns a `Future` that imports the given blocks and sends the results on @@ -353,12 +393,11 @@ fn import_many_blocks, Transaction>( Output = ( usize, usize, - Vec<(Result>, BlockImportError>, B::Hash,)>, + Vec<(Result>, BlockImportError>, B::Hash)>, BoxBlockImport, - V - ) -> -{ + V, + ), +> { let count = blocks.len(); let blocks_range = match ( @@ -451,3 +490,187 @@ fn import_many_blocks, Transaction>( Poll::Pending }) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + import_queue::{CacheKeyId, Verifier}, + BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport, + }; + use futures::{executor::block_on, Future}; + use sp_test_primitives::{Block, BlockNumber, Extrinsic, Hash, Header}; + use std::collections::HashMap; + + impl Verifier for () { + fn verify( + &mut self, + origin: BlockOrigin, + header: Header, + _justification: Option, + _body: Option>, + ) -> Result<(BlockImportParams, Option)>>), String> { + Ok((BlockImportParams::new(origin, header), None)) + } + } + + impl BlockImport for () { + type Error = crate::Error; + type Transaction = Extrinsic; + + fn check_block( + &mut self, + _block: BlockCheckParams, + ) -> Result { + Ok(ImportResult::imported(false)) + } + + fn import_block( + &mut self, + _block: BlockImportParams, + _cache: HashMap>, + ) -> Result { + Ok(ImportResult::imported(true)) + } + } + + impl JustificationImport for () { + type Error = crate::Error; + + fn import_justification( + &mut self, + _hash: Hash, + _number: BlockNumber, + _justification: Justification, + ) -> Result<(), Self::Error> { + Ok(()) + } + } + + #[derive(Debug, PartialEq)] + enum Event { + JustificationImported(Hash), + BlockImported(Hash), + } + + #[derive(Default)] + struct TestLink { + events: Vec, + } + + impl Link for TestLink { + fn blocks_processed( + &mut self, + _imported: usize, + _count: usize, + results: Vec<(Result, BlockImportError>, Hash)>, + ) { + if let Some(hash) = results.into_iter().find_map(|(r, h)| r.ok().map(|_| h)) { + self.events.push(Event::BlockImported(hash)); + } + } + + fn justification_imported( + &mut self, + _who: Origin, + hash: &Hash, + _number: BlockNumber, + _success: bool, + ) { + self.events.push(Event::JustificationImported(hash.clone())) + } + } + + #[test] + fn prioritizes_finality_work_over_block_import() { + let (result_sender, mut result_port) = buffered_link::buffered_link(); + + let (mut worker, mut finality_sender, mut block_import_sender) = + BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None, None); + + let mut import_block = |n| { + let header = Header { + parent_hash: Hash::random(), + number: n, + extrinsics_root: Hash::random(), + state_root: Default::default(), + digest: Default::default(), + }; + + let hash = header.hash(); + + block_on(block_import_sender.send(worker_messages::ImportBlocks( + BlockOrigin::Own, + vec![IncomingBlock { + hash, + header: Some(header), + body: None, + justification: None, + origin: None, + allow_missing_state: false, + import_existing: false, + }], + ))) + .unwrap(); + + hash + }; + + let mut import_justification = || { + let hash = Hash::random(); + + block_on(finality_sender.send(worker_messages::Finality::ImportJustification( + libp2p::PeerId::random(), + hash, + 1, + Vec::new(), + ))) + .unwrap(); + + hash + }; + + let mut link = TestLink::default(); + + // we send a bunch of tasks to the worker + let block1 = import_block(1); + let block2 = import_block(2); + let block3 = import_block(3); + let justification1 = import_justification(); + let justification2 = import_justification(); + let block4 = import_block(4); + let block5 = import_block(5); + let block6 = import_block(6); + let justification3 = import_justification(); + + // we poll the worker until we have processed 9 events + block_on(futures::future::poll_fn(|cx| { + while link.events.len() < 9 { + match Future::poll(Pin::new(&mut worker), cx) { + Poll::Pending => {} + Poll::Ready(()) => panic!("import queue worker should not conclude."), + } + + result_port.poll_actions(cx, &mut link).unwrap(); + } + + Poll::Ready(()) + })); + + // all justification tasks must be done before any block import work + assert_eq!( + link.events, + vec![ + Event::JustificationImported(justification1), + Event::JustificationImported(justification2), + Event::JustificationImported(justification3), + Event::BlockImported(block1), + Event::BlockImported(block2), + Event::BlockImported(block3), + Event::BlockImported(block4), + Event::BlockImported(block5), + Event::BlockImported(block6), + ] + ); + } +} diff --git a/primitives/io/Cargo.toml b/primitives/io/Cargo.toml index 70a78f99d5..d2dd0a7c64 100644 --- a/primitives/io/Cargo.toml +++ b/primitives/io/Cargo.toml @@ -30,7 +30,7 @@ log = { version = "0.4.8", optional = true } futures = { version = "0.3.1", features = ["thread-pool"], optional = true } parking_lot = { version = "0.10.0", optional = true } tracing = { version = "0.1.19", default-features = false } -tracing-core = { version = "0.1.15", default-features = false} +tracing-core = { version = "0.1.17", default-features = false} [features] default = ["std"] diff --git a/primitives/runtime-interface/test/Cargo.toml b/primitives/runtime-interface/test/Cargo.toml index 0fd61f7bce..d802f9cb6b 100644 --- a/primitives/runtime-interface/test/Cargo.toml +++ b/primitives/runtime-interface/test/Cargo.toml @@ -21,4 +21,4 @@ sp-runtime = { version = "2.0.0", path = "../../runtime" } sp-core = { version = "2.0.0", path = "../../core" } sp-io = { version = "2.0.0", path = "../../io" } tracing = "0.1.19" -tracing-core = "0.1.15" +tracing-core = "0.1.17" diff --git a/primitives/tracing/Cargo.toml b/primitives/tracing/Cargo.toml index a1d89af58c..1000952b39 100644 --- a/primitives/tracing/Cargo.toml +++ b/primitives/tracing/Cargo.toml @@ -20,23 +20,23 @@ targets = ["x86_64-unknown-linux-gnu", "wasm32-unknown-unknown"] [dependencies] sp-std = { version = "2.0.0", path = "../std", default-features = false} codec = { version = "1.3.1", package = "parity-scale-codec", default-features = false, features = ["derive"]} -tracing = { version = "0.1.19", default-features = false } -tracing-core = { version = "0.1.16", default-features = false } +tracing = { version = "0.1.21", default-features = false } +tracing-core = { version = "0.1.17", default-features = false } log = { version = "0.4.8", optional = true } tracing-subscriber = { version = "0.2.10", optional = true, features = ["tracing-log"] } [features] default = [ "std" ] with-tracing = [ - "codec/derive", - "codec/full", + "codec/derive", + "codec/full", ] std = [ - "with-tracing", - "tracing/std", - "tracing-core/std", - "codec/std", - "sp-std/std", - "log", - "tracing-subscriber", + "with-tracing", + "tracing/std", + "tracing-core/std", + "codec/std", + "sp-std/std", + "log", + "tracing-subscriber", ] diff --git a/primitives/tracing/src/lib.rs b/primitives/tracing/src/lib.rs index fb074d5579..cb67d8a0c5 100644 --- a/primitives/tracing/src/lib.rs +++ b/primitives/tracing/src/lib.rs @@ -107,7 +107,9 @@ pub use crate::types::{ /// Ignores any error. Useful for testing. #[cfg(feature = "std")] pub fn try_init_simple() { - let _ = tracing_subscriber::fmt().with_writer(std::io::stderr).try_init(); + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_writer(std::io::stderr).try_init(); } #[cfg(feature = "std")] diff --git a/test-utils/runtime/src/lib.rs b/test-utils/runtime/src/lib.rs index 5ab4d99dee..e772a28ee3 100644 --- a/test-utils/runtime/src/lib.rs +++ b/test-utils/runtime/src/lib.rs @@ -340,6 +340,8 @@ cfg_if! { /// Test that ensures that we can call a function that takes multiple /// arguments. fn test_multiple_arguments(data: Vec, other: Vec, num: u32); + /// Traces log "Hey I'm runtime." + fn do_trace_log(); } } } else { @@ -391,6 +393,8 @@ cfg_if! { /// Test that ensures that we can call a function that takes multiple /// arguments. fn test_multiple_arguments(data: Vec, other: Vec, num: u32); + /// Traces log "Hey I'm runtime." + fn do_trace_log(); } } } @@ -698,6 +702,11 @@ cfg_if! { assert_eq!(&data[..], &other[..]); assert_eq!(data.len(), num as usize); } + + fn do_trace_log() { + frame_support::debug::RuntimeLogger::init(); + frame_support::debug::trace!("Hey I'm runtime"); + } } impl sp_consensus_aura::AuraApi for Runtime { @@ -944,6 +953,11 @@ cfg_if! { assert_eq!(&data[..], &other[..]); assert_eq!(data.len(), num as usize); } + + fn do_trace_log() { + frame_support::debug::RuntimeLogger::init(); + frame_support::debug::trace!("Hey I'm runtime"); + } } impl sp_consensus_aura::AuraApi for Runtime { diff --git a/utils/browser/src/lib.rs b/utils/browser/src/lib.rs index ffd0a134be..29b1848fa7 100644 --- a/utils/browser/src/lib.rs +++ b/utils/browser/src/lib.rs @@ -57,7 +57,6 @@ where wasm_external_transport: Some(transport.clone()), allow_private_ipv4: true, enable_mdns: false, - use_yamux_flow_control: true, }; let config = Configuration { diff --git a/utils/prometheus/Cargo.toml b/utils/prometheus/Cargo.toml index 8fdbbc6124..9eed7a2fdc 100644 --- a/utils/prometheus/Cargo.toml +++ b/utils/prometheus/Cargo.toml @@ -19,6 +19,6 @@ futures-util = { version = "0.3.1", default-features = false, features = ["io"] derive_more = "0.99" [target.'cfg(not(target_os = "unknown"))'.dependencies] -async-std = { version = "1.6.2", features = ["unstable"] } +async-std = { version = "1.6.5", features = ["unstable"] } hyper = { version = "0.13.1", default-features = false, features = ["stream"] } tokio = "0.2"