From 0d59417473747c7ffc184c310ef4146f525ee91c Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Mon, 17 Apr 2023 13:41:24 +0200
Subject: [PATCH 1/4] Fix unrelated warnings

Fixes two trivial compiler warnings that are unrelated to the changes
made here.
---
 lightning-net-tokio/src/lib.rs     | 2 +-
 lightning/src/ln/channelmanager.rs | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs
index 37c9ddad762..48f1736d0c2 100644
--- a/lightning-net-tokio/src/lib.rs
+++ b/lightning-net-tokio/src/lib.rs
@@ -189,7 +189,7 @@ impl Connection {
 				// our timeslice to another task we may just spin on this peer, starving other peers
 				// and eventually disconnecting them for ping timeouts. Instead, we explicitly yield
 				// here.
-				tokio::task::yield_now().await;
+				let _ = tokio::task::yield_now().await;
 			};
 			let writer_option = us.lock().unwrap().writer.take();
 			if let Some(mut writer) = writer_option {
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 9f403f85b2f..213f3b954a2 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -7947,8 +7947,6 @@ mod tests {
 	use bitcoin::hashes::Hash;
 	use bitcoin::hashes::sha256::Hash as Sha256;
 	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
-	#[cfg(feature = "std")]
-	use core::time::Duration;
 	use core::sync::atomic::Ordering;
 	use crate::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason};
 	use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret};

From 36bf817ec70ff9f5ea3ba0317555a97f206a038f Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Tue, 18 Apr 2023 16:27:02 +0200
Subject: [PATCH 2/4] Actually test `futures` builds

Currently the BP `futures` tests rely on `std`. In order to actually
have them run, we should enable `std`, i.e., remove
`--no-default-features`.
---
 ci/ci-tests.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ci/ci-tests.sh b/ci/ci-tests.sh
index 7dad1436e98..37d8e06de1c 100755
--- a/ci/ci-tests.sh
+++ b/ci/ci-tests.sh
@@ -86,7 +86,7 @@ fi
 
 echo -e "\n\nTest futures builds"
 pushd lightning-background-processor
-cargo test --verbose --color always --no-default-features --features futures
+cargo test --verbose --color always --features futures
 popd
 
 if [ "$RUSTC_MINOR_VERSION" -gt 55 ]; then

From b5468226cce7bfb2e754b7b46d3bf7137e79562e Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Thu, 20 Apr 2023 15:37:11 +0200
Subject: [PATCH 3/4] Fix BP prune timer and don't panic on persistence
 notification failure
---
 lightning-background-processor/src/lib.rs | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 7d705bdcc3a..9be5127472f 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -342,7 +342,8 @@ macro_rules! define_run_body {
 				// falling back to our usual hourly prunes. This avoids short-lived clients never
 				// pruning their network graph. We run once 60 seconds after startup before
 				// continuing our normal cadence.
-				if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
+				let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+				if $timer_elapsed(&mut last_prune_call, prune_timer) {
 					// The network graph must not be pruned while rapid sync completion is pending
 					if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
 						#[cfg(feature = "std")] {
@@ -360,7 +361,8 @@ macro_rules! define_run_body {
 					have_pruned = true;
 				}
-				last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
+				let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+				last_prune_call = $get_timer(prune_timer);
 			}
 
 			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
@@ -867,7 +869,10 @@ mod tests {
 
 			if key == "network_graph" {
 				if let Some(sender) = &self.graph_persistence_notifier {
-					sender.send(()).unwrap();
+					match sender.send(()) {
+						Ok(()) => {},
+						Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
+					}
 				};
 
 				if let Some((error, message)) = self.graph_error {

From f2453b7fffd3ca79cac92803c8c6c99cea970e51 Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Fri, 21 Apr 2023 18:02:54 +0200
Subject: [PATCH 4/4] Allow events processing without holding
 `total_consistency_lock`

Unfortunately, the RAII types used by `RwLock` are not `Send`, which is
why they can't be held over `await` boundaries. In order to allow
asynchronous event processing in multi-threaded environments, we here
allow processing events without holding the `total_consistency_lock`.
---
 lightning-background-processor/src/lib.rs | 22 ++++----
 lightning/src/ln/channelmanager.rs        | 61 +++++++++++++++--------
 2 files changed, 53 insertions(+), 30 deletions(-)

diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 9be5127472f..7b7a75d60e6 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -1485,10 +1485,9 @@ mod tests {
 			})
 		}, false,
 	);
-	// TODO: Drop _local and simply spawn after #2003
-	let local_set = tokio::task::LocalSet::new();
-	local_set.spawn_local(bp_future);
-	local_set.spawn_local(async move {
+
+	let t1 = tokio::spawn(bp_future);
+	let t2 = tokio::spawn(async move {
 		do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
 			let mut i = 0;
 			loop {
@@ -1500,7 +1499,9 @@ mod tests {
 		}, tokio::time::sleep(Duration::from_millis(1)).await);
 		exit_sender.send(()).unwrap();
 	});
-	local_set.await;
+	let (r1, r2) = tokio::join!(t1, t2);
+	r1.unwrap().unwrap();
+	r2.unwrap()
 }
 
 macro_rules! do_test_payment_path_scoring {
@@ -1654,13 +1655,14 @@ mod tests {
 			})
 		}, false,
 	);
-	// TODO: Drop _local and simply spawn after #2003
-	let local_set = tokio::task::LocalSet::new();
-	local_set.spawn_local(bp_future);
-	local_set.spawn_local(async move {
+	let t1 = tokio::spawn(bp_future);
+	let t2 = tokio::spawn(async move {
 		do_test_payment_path_scoring!(nodes, receiver.recv().await);
 		exit_sender.send(()).unwrap();
 	});
-	local_set.await;
+
+	let (r1, r2) = tokio::join!(t1, t2);
+	r1.unwrap().unwrap();
+	r2.unwrap()
 }
 }
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 213f3b954a2..37b22a18428 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -72,7 +72,7 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
@@ -926,6 +926,8 @@ where
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_events: Mutex<Vec<events::Event>>,
+	/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+	pending_events_processor: AtomicBool,
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_background_events: Mutex<Vec<BackgroundEvent>>,
 	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1680,30 +1682,47 @@ macro_rules! handle_new_monitor_update {
 
 macro_rules! process_events_body {
 	($self: expr, $event_to_handle: expr, $handle_event: expr) => {
-		// We'll acquire our total consistency lock until the returned future completes so that
-		// we can be sure no other persists happen while processing events.
-		let _read_guard = $self.total_consistency_lock.read().unwrap();
+		let mut processed_all_events = false;
+		while !processed_all_events {
+			if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+				return;
+			}
 
-		let mut result = NotifyOption::SkipPersist;
+			let mut result = NotifyOption::SkipPersist;
 
-		// TODO: This behavior should be documented. It's unintuitive that we query
-		// ChannelMonitors when clearing other events.
-		if $self.process_pending_monitor_events() {
-			result = NotifyOption::DoPersist;
-		}
+			{
+				// We'll acquire our total consistency lock so that we can be sure no other
+				// persists happen while processing monitor events.
+				let _read_guard = $self.total_consistency_lock.read().unwrap();
+
+				// TODO: This behavior should be documented. It's unintuitive that we query
+				// ChannelMonitors when clearing other events.
+				if $self.process_pending_monitor_events() {
+					result = NotifyOption::DoPersist;
+				}
+			}
 
-		let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
-		if !pending_events.is_empty() {
-			result = NotifyOption::DoPersist;
-		}
+			let pending_events = $self.pending_events.lock().unwrap().clone();
+			let num_events = pending_events.len();
+			if !pending_events.is_empty() {
+				result = NotifyOption::DoPersist;
+			}
 
-		for event in pending_events {
-			$event_to_handle = event;
-			$handle_event;
-		}
+			for event in pending_events {
+				$event_to_handle = event;
+				$handle_event;
+			}
 
-		if result == NotifyOption::DoPersist {
-			$self.persistence_notifier.notify();
+			{
+				let mut pending_events = $self.pending_events.lock().unwrap();
+				pending_events.drain(..num_events);
+				processed_all_events = pending_events.is_empty();
+				$self.pending_events_processor.store(false, Ordering::Release);
+			}
+
+			if result == NotifyOption::DoPersist {
+				$self.persistence_notifier.notify();
+			}
 		}
 	}
 }
@@ -1771,6 +1790,7 @@ where
 			per_peer_state: FairRwLock::new(HashMap::new()),
 
 			pending_events: Mutex::new(Vec::new()),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(Vec::new()),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
@@ -7916,6 +7936,7 @@ where
 			per_peer_state: FairRwLock::new(per_peer_state),
 
 			pending_events: Mutex::new(pending_events_read),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(pending_background_events),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
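
A note on the motivation for PATCH 4/4: `std::sync::RwLockReadGuard` is not `Send`, so a future holding one across an `.await` point is itself not `Send` and cannot be handed to `tokio::spawn` -- which is what previously forced the `LocalSet`/`spawn_local` workaround removed from the tests above. The following is a minimal standalone sketch of the failure mode and the usual fix; `SharedState`, `load_value`, and `some_async_op` are illustrative names, not LDK API:

	use std::sync::RwLock;

	struct SharedState {
		value: RwLock<u64>,
	}

	// Holding the guard across `.await` would make the future non-`Send`,
	// so it could not be spawned onto a multi-threaded runtime:
	//
	//     async fn load_value(state: &SharedState) -> u64 {
	//         let guard = state.value.read().unwrap();
	//         some_async_op().await; // error: `RwLockReadGuard` is not `Send`
	//         *guard
	//     }

	// Copying the value out first drops the guard before the `.await`,
	// keeping the future `Send`.
	async fn load_value(state: &SharedState) -> u64 {
		let value = *state.value.read().unwrap();
		some_async_op().await;
		value
	}

	async fn some_async_op() {}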
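
The reworked `process_events_body` above follows a general single-flight pattern: an `AtomicBool` claimed via `compare_exchange` guarantees at most one processor at a time, events are cloned out so no lock is held while user code runs, only the handled prefix is drained afterwards, and the outer loop re-checks the queue once the flag is released so events enqueued during processing are never stranded. Below is a minimal standalone sketch of that pattern; the `EventQueue` type and plain `u32` events are illustrative stand-ins, not LDK's actual types:

	use std::sync::Mutex;
	use std::sync::atomic::{AtomicBool, Ordering};

	struct EventQueue {
		// Events awaiting delivery to the handler.
		pending: Mutex<Vec<u32>>,
		// True while some task is inside `process_events`.
		processor_busy: AtomicBool,
	}

	impl EventQueue {
		fn process_events(&self, mut handle_event: impl FnMut(u32)) {
			let mut processed_all_events = false;
			while !processed_all_events {
				// Claim the processor role; if another task holds it, that
				// task will also deliver any events we may have just enqueued.
				if self.processor_busy
					.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
					.is_err()
				{
					return;
				}

				// Clone the pending events so no lock is held while the
				// (possibly slow) handler runs.
				let pending = self.pending.lock().unwrap().clone();
				let num_events = pending.len();
				for event in pending {
					handle_event(event);
				}

				// Drop only what we handled; concurrently pushed events stay
				// queued. Release the flag while still holding the lock, then
				// loop again if anything new arrived.
				let mut queue = self.pending.lock().unwrap();
				queue.drain(..num_events);
				processed_all_events = queue.is_empty();
				self.processor_busy.store(false, Ordering::Release);
			}
		}
	}

One subtlety mirrored from the patch: the flag is released while the pending-events lock is still held, so a concurrent `process_events` call either observes the flag as taken (and returns, leaving its events to the current processor) or runs after the drain and sees exactly the events that were not yet handled.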