Allow async events processing without holding total_consistency_lock #2199

Merged
2 changes: 1 addition & 1 deletion ci/ci-tests.sh
@@ -86,7 +86,7 @@ fi

echo -e "\n\nTest futures builds"
pushd lightning-background-processor
-cargo test --verbose --color always --no-default-features --features futures
+cargo test --verbose --color always --features futures
popd

if [ "$RUSTC_MINOR_VERSION" -gt 55 ]; then
33 changes: 20 additions & 13 deletions lightning-background-processor/src/lib.rs
@@ -342,7 +342,8 @@ macro_rules! define_run_body {
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence.
-if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
+let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+if $timer_elapsed(&mut last_prune_call, prune_timer) {
// The network graph must not be pruned while rapid sync completion is pending
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
#[cfg(feature = "std")] {
@@ -360,7 +361,8 @@

have_pruned = true;
}
-last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
+let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
+last_prune_call = $get_timer(prune_timer);
}

if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
@@ -867,7 +869,10 @@ mod tests {

if key == "network_graph" {
if let Some(sender) = &self.graph_persistence_notifier {
-sender.send(()).unwrap();
+match sender.send(()) {
+Ok(()) => {},
+Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
Collaborator

Wait, why?

@tnull (Contributor Author) commented on Apr 21, 2023

Because we're shutting the other task down after the first send. However, we also persist again on shutdown, which triggers a second send, which would panic as the receiver is already gone at that point.

Contributor

I think a comment explaining why this is OK would be helpful.
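As background for the change above, here is a minimal, standalone sketch (plain std, not code from this PR) of the failure mode tnull describes: std::sync::mpsc::Sender::send only returns an error once the corresponding Receiver has been dropped, so a second, shutdown-time notification would panic under unwrap() but can simply be logged instead.

```rust
use std::sync::mpsc;

fn main() {
    let (sender, receiver) = mpsc::channel::<()>();

    // While the receiver is alive, send() succeeds.
    sender.send(()).unwrap();
    assert!(receiver.recv().is_ok());

    // Simulate the receiving task finishing and going away.
    drop(receiver);

    // A second send now returns Err(SendError(())) instead of delivering.
    // Unwrapping here would panic; matching on the error keeps the shutdown
    // path quiet, which is what the persister change above does.
    match sender.send(()) {
        Ok(()) => {}
        Err(mpsc::SendError(())) => println!("receiver went away, ignoring"),
    }
}
```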

+}
};

if let Some((error, message)) = self.graph_error {
@@ -1480,10 +1485,9 @@ mod tests {
})
}, false,
);
-// TODO: Drop _local and simply spawn after #2003
-let local_set = tokio::task::LocalSet::new();
-local_set.spawn_local(bp_future);
-local_set.spawn_local(async move {
+
+let t1 = tokio::spawn(bp_future);
+let t2 = tokio::spawn(async move {
do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
let mut i = 0;
loop {
@@ -1495,7 +1499,9 @@
}, tokio::time::sleep(Duration::from_millis(1)).await);
exit_sender.send(()).unwrap();
});
-local_set.await;
+let (r1, r2) = tokio::join!(t1, t2);
+r1.unwrap().unwrap();
+r2.unwrap()
}

macro_rules! do_test_payment_path_scoring {
@@ -1649,13 +1655,14 @@ mod tests {
})
}, false,
);
-// TODO: Drop _local and simply spawn after #2003
-let local_set = tokio::task::LocalSet::new();
-local_set.spawn_local(bp_future);
-local_set.spawn_local(async move {
+let t1 = tokio::spawn(bp_future);
+let t2 = tokio::spawn(async move {
do_test_payment_path_scoring!(nodes, receiver.recv().await);
exit_sender.send(()).unwrap();
});
-local_set.await;
+
+let (r1, r2) = tokio::join!(t1, t2);
+r1.unwrap().unwrap();
+r2.unwrap()
}
}
2 changes: 1 addition & 1 deletion lightning-net-tokio/src/lib.rs
@@ -189,7 +189,7 @@ impl Connection {
// our timeslice to another task we may just spin on this peer, starving other peers
// and eventually disconnecting them for ping timeouts. Instead, we explicitly yield
// here.
-tokio::task::yield_now().await;
+let _ = tokio::task::yield_now().await;
};
let writer_option = us.lock().unwrap().writer.take();
if let Some(mut writer) = writer_option {
63 changes: 41 additions & 22 deletions lightning/src/ln/channelmanager.rs
@@ -72,7 +72,7 @@ use core::{cmp, mem};
use core::cell::RefCell;
use crate::io::Read;
use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use core::time::Duration;
use core::ops::Deref;

@@ -926,6 +926,8 @@ where

/// See `ChannelManager` struct-level documentation for lock order requirements.
pending_events: Mutex<Vec<events::Event>>,
+/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+pending_events_processor: AtomicBool,
/// See `ChannelManager` struct-level documentation for lock order requirements.
pending_background_events: Mutex<Vec<BackgroundEvent>>,
/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1680,30 +1682,47 @@ macro_rules! handle_new_monitor_update {

macro_rules! process_events_body {
($self: expr, $event_to_handle: expr, $handle_event: expr) => {
-// We'll acquire our total consistency lock until the returned future completes so that
-// we can be sure no other persists happen while processing events.
-let _read_guard = $self.total_consistency_lock.read().unwrap();
+let mut processed_all_events = false;
+while !processed_all_events {
@alecchendev (Contributor) commented on Apr 21, 2023

How come this is all run in a while loop? IIUC, other async tasks may add events to pending_events while we're handling events, which is how we end up not having processed all events. But why do we keep processing until pending_events is empty, as opposed to just processing the events that were present when this function was first called? Does it make much of a difference, or is it more that we might as well do it while we're here?

Collaborator

Because we no longer allow multiple processors to run at the same time: if one process_events call starts and makes some progress, then an event is generated, causing a second process_events call to happen, the second call might return early, but there are some events there that the user expects to have been processed. Thus, we need to make sure the first process_events goes around again and processes the remaining events.
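To make the shape being discussed concrete, here is a minimal, self-contained sketch of the approach in the diff, with a hypothetical EventQueue type and u32 events standing in for the real ChannelManager and Event types: a compare_exchange-guarded processor that keeps looping until the queue is empty, so events queued by concurrent callers that bailed out still get handled.

```rust
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

/// Simplified stand-in for the manager's pending-events state.
struct EventQueue {
    pending_events: Mutex<Vec<u32>>,
    pending_events_processor: AtomicBool,
}

impl EventQueue {
    fn process_events(&self, mut handle_event: impl FnMut(u32)) {
        let mut processed_all_events = false;
        while !processed_all_events {
            // Only one processor may run at a time: a concurrent caller bails out
            // here, and the loop below makes sure the running processor picks up
            // whatever events that caller would otherwise have handled.
            if self.pending_events_processor
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_err()
            {
                return;
            }

            // Snapshot the queue so the lock isn't held while events are handled.
            let events = self.pending_events.lock().unwrap().clone();
            let num_events = events.len();
            for event in events {
                handle_event(event);
            }

            // Drop only the events we handled; anything queued in the meantime
            // stays, leaves processed_all_events false, and sends us around again.
            let mut pending = self.pending_events.lock().unwrap();
            pending.drain(..num_events);
            processed_all_events = pending.is_empty();
            self.pending_events_processor.store(false, Ordering::Release);
        }
    }
}

fn main() {
    let q = EventQueue {
        pending_events: Mutex::new(vec![1, 2, 3]),
        pending_events_processor: AtomicBool::new(false),
    };
    q.process_events(|e| println!("handled event {}", e));
}
```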

+if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+return;
+}

-let mut result = NotifyOption::SkipPersist;
+let mut result = NotifyOption::SkipPersist;

-// TODO: This behavior should be documented. It's unintuitive that we query
-// ChannelMonitors when clearing other events.
-if $self.process_pending_monitor_events() {
-result = NotifyOption::DoPersist;
-}
+{
+// We'll acquire our total consistency lock so that we can be sure no other
+// persists happen while processing monitor events.
+let _read_guard = $self.total_consistency_lock.read().unwrap();
+
+// TODO: This behavior should be documented. It's unintuitive that we query
+// ChannelMonitors when clearing other events.
+if $self.process_pending_monitor_events() {
+result = NotifyOption::DoPersist;
+}
+}

-let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
-if !pending_events.is_empty() {
-result = NotifyOption::DoPersist;
-}
+let pending_events = $self.pending_events.lock().unwrap().clone();
+let num_events = pending_events.len();
+if !pending_events.is_empty() {
+result = NotifyOption::DoPersist;
+}

-for event in pending_events {
-$event_to_handle = event;
-$handle_event;
-}
+for event in pending_events {
+$event_to_handle = event;
+$handle_event;
+}

-if result == NotifyOption::DoPersist {
-$self.persistence_notifier.notify();
+{
+let mut pending_events = $self.pending_events.lock().unwrap();
+pending_events.drain(..num_events);
+processed_all_events = pending_events.is_empty();
+$self.pending_events_processor.store(false, Ordering::Release);
Collaborator

Should this only happen if !processed_all_events? Not a big deal either way, I think.

@tnull (Contributor Author)

You mean if we processed all events? Yeah, I think I'd leave it as is.

Collaborator

Oh, no, I mean literally just moving the setter here into a check for whether we're about to go around again.

@tnull (Contributor Author)

Ah, yes, I had understood as much, but we definitely need to reset the flag in the case where we leave the method. We could have moved the compare_exchange out of the loop and only reset the flag on exit, but given that this is a rare edge case anyway, I thought it made sense to leave it as is.
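For comparison, here is a rough sketch of the alternative tnull mentions, reusing the same hypothetical EventQueue shape as in the sketch above (again, not what this PR implements): the guard is taken once before the loop and reset once on the way out. The trade-off is the one discussed in this thread: both shapes must clear the flag on every exit path, and the PR keeps the per-iteration reset since contention here is a rare edge case anyway.

```rust
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

struct EventQueue {
    pending_events: Mutex<Vec<u32>>,
    pending_events_processor: AtomicBool,
}

impl EventQueue {
    /// Alternative shape: take the guard once, reset it once on the way out.
    fn process_events(&self, mut handle_event: impl FnMut(u32)) {
        // A concurrent caller still bails out here, but the flag is now only
        // touched twice per call instead of once per loop iteration.
        if self.pending_events_processor
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            return;
        }

        loop {
            let events = self.pending_events.lock().unwrap().clone();
            let num_events = events.len();
            for event in events {
                handle_event(event);
            }

            let mut pending = self.pending_events.lock().unwrap();
            pending.drain(..num_events);
            if pending.is_empty() {
                // Reset the flag before releasing the pending_events lock so a
                // task that queues a new event afterwards can start its own
                // processing run rather than bailing out.
                self.pending_events_processor.store(false, Ordering::Release);
                break;
            }
        }
    }
}

fn main() {
    let q = EventQueue {
        pending_events: Mutex::new(vec![1, 2, 3]),
        pending_events_processor: AtomicBool::new(false),
    };
    q.process_events(|e| println!("handled event {}", e));
}
```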

+}

+if result == NotifyOption::DoPersist {
+$self.persistence_notifier.notify();
+}
}
}
}
@@ -1771,6 +1790,7 @@ where
per_peer_state: FairRwLock::new(HashMap::new()),

pending_events: Mutex::new(Vec::new()),
+pending_events_processor: AtomicBool::new(false),
pending_background_events: Mutex::new(Vec::new()),
total_consistency_lock: RwLock::new(()),
persistence_notifier: Notifier::new(),
@@ -7916,6 +7936,7 @@ where
per_peer_state: FairRwLock::new(per_peer_state),

pending_events: Mutex::new(pending_events_read),
+pending_events_processor: AtomicBool::new(false),
pending_background_events: Mutex::new(pending_background_events),
total_consistency_lock: RwLock::new(()),
persistence_notifier: Notifier::new(),
@@ -7947,8 +7968,6 @@ mod tests {
use bitcoin::hashes::Hash;
use bitcoin::hashes::sha256::Hash as Sha256;
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
#[cfg(feature = "std")]
use core::time::Duration;
use core::sync::atomic::Ordering;
use crate::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason};
use crate::ln::{PaymentPreimage, PaymentHash, PaymentSecret};