Handle retrying sign_counterparty_commitment failures #2558


Merged · 7 commits · Nov 2, 2023
1 change: 1 addition & 0 deletions fuzz/src/chanmon_consistency.rs
@@ -267,6 +267,7 @@ impl SignerProvider for KeyProvider {
inner,
state,
disable_revocation_policy_check: false,
available: Arc::new(Mutex::new(true)),
})
}

323 changes: 323 additions & 0 deletions lightning/src/ln/async_signer_tests.rs

Large diffs are not rendered by default.
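Since the new test file is not rendered, here is a minimal sketch of the kind of test it adds, reconstructed from the helpers this PR introduces (`set_channel_signer_available`, `signer_unblocked`) and the standard functional-test utilities of this era of the codebase; the test name and exact flow are assumptions, not the file's literal contents.

// Sketch only: exercise the funding_created path with a temporarily unavailable signer.
#[test]
fn test_async_commitment_signature_for_funding_created() {
	let chanmon_cfgs = create_chanmon_cfgs(2);
	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

	nodes[0].node.create_channel(nodes[1].node.get_our_node_id(), 100_000, 10_001, 42, None).unwrap();
	let open_chan = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, nodes[1].node.get_our_node_id());
	nodes[1].node.handle_open_channel(&nodes[0].node.get_our_node_id(), &open_chan);
	let accept_chan = get_event_msg!(nodes[1], MessageSendEvent::SendAcceptChannel, nodes[0].node.get_our_node_id());
	nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), &accept_chan);

	// With the signer offline, funding_transaction_generated must *not* emit a
	// funding_created message; the signature is recorded as pending instead.
	let (temporary_channel_id, tx, _) = create_funding_transaction(&nodes[0], &nodes[1].node.get_our_node_id(), 100_000, 42);
	nodes[0].set_channel_signer_available(&nodes[1].node.get_our_node_id(), &temporary_channel_id, false);
	nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), tx.clone()).unwrap();
	check_added_monitors(&nodes[0], 0);
	assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());

	// The channel has moved to its funded phase, so it is now keyed by the real channel ID.
	let chan_id = nodes[0].node.list_channels()[0].channel_id;

	// Restore the signer and retry: the pending funding_created is now released.
	nodes[0].set_channel_signer_available(&nodes[1].node.get_our_node_id(), &chan_id, true);
	nodes[0].node.signer_unblocked(Some((nodes[1].node.get_our_node_id(), chan_id)));
	let funding_created = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id());
	nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created);
	check_added_monitors(&nodes[1], 1);
	expect_channel_pending_event(&nodes[1], &nodes[0].node.get_our_node_id());
}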

318 changes: 213 additions & 105 deletions lightning/src/ln/channel.rs

Large diffs are not rendered by default.
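The channel.rs diff is likewise not rendered. Judging purely from its call sites in channelmanager.rs below, the new per-channel API has roughly this shape; this is a reconstruction under assumptions (inside lightning::ln::channel, with the crate's usual imports in scope), not the PR's actual code.

/// Sketch: messages that were withheld while the signer was unavailable, returned by
/// `Channel::signer_maybe_unblocked`. Field names are inferred from `signer_unblocked` below.
pub(super) struct SignerResumeUpdates {
	pub commitment_update: Option<msgs::CommitmentUpdate>,
	pub funding_signed: Option<msgs::FundingSigned>,
	pub funding_created: Option<msgs::FundingCreated>,
	pub channel_ready: Option<msgs::ChannelReady>,
}

impl<SP: Deref> Channel<SP> where SP::Target: SignerProvider {
	/// Sketch: re-attempts any signing operation that previously returned `Err` and returns
	/// whichever messages are now ready to send; `None` means nothing was (or still is) pending.
	pub fn signer_maybe_unblocked<L: Deref>(&mut self, _logger: &L) -> SignerResumeUpdates
	where L::Target: Logger {
		SignerResumeUpdates {
			commitment_update: None, // e.g. from retrying sign_counterparty_commitment
			funding_signed: None,
			funding_created: None,
			channel_ready: None,
		}
	}
}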

99 changes: 85 additions & 14 deletions lightning/src/ln/channelmanager.rs
@@ -3802,7 +3802,7 @@ where

let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
- let (chan, msg) = match peer_state.channel_by_id.remove(temporary_channel_id) {
+ let (chan, msg_opt) = match peer_state.channel_by_id.remove(temporary_channel_id) {
Some(ChannelPhase::UnfundedOutboundV1(chan)) => {
let funding_txo = find_funding_output(&chan, &funding_transaction)?;

@@ -3841,10 +3841,12 @@
}),
};

- peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
- node_id: chan.context.get_counterparty_node_id(),
- msg,
- });
+ if let Some(msg) = msg_opt {
+ peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
+ node_id: chan.context.get_counterparty_node_id(),
+ msg,
+ });
+ }
match peer_state.channel_by_id.entry(chan.context.channel_id()) {
hash_map::Entry::Occupied(_) => {
panic!("Generated duplicate funding txid?");
@@ -6229,7 +6231,7 @@ where

let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
- let (chan, funding_msg, monitor) =
+ let (chan, funding_msg_opt, monitor) =
match peer_state.channel_by_id.remove(&msg.temporary_channel_id) {
Some(ChannelPhase::UnfundedInboundV1(inbound_chan)) => {
match inbound_chan.funding_created(msg, best_block, &self.signer_provider, &self.logger) {
@@ -6252,17 +6254,20 @@
None => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.temporary_channel_id))
};

- match peer_state.channel_by_id.entry(funding_msg.channel_id) {
+ match peer_state.channel_by_id.entry(chan.context.channel_id()) {
hash_map::Entry::Occupied(_) => {
Err(MsgHandleErrInternal::send_err_msg_no_close("Already had channel with the new channel_id".to_owned(), funding_msg.channel_id))
Err(MsgHandleErrInternal::send_err_msg_no_close(
"Already had channel with the new channel_id".to_owned(),
chan.context.channel_id()
))
},
hash_map::Entry::Vacant(e) => {
let mut id_to_peer_lock = self.id_to_peer.lock().unwrap();
match id_to_peer_lock.entry(chan.context.channel_id()) {
hash_map::Entry::Occupied(_) => {
return Err(MsgHandleErrInternal::send_err_msg_no_close(
"The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
- funding_msg.channel_id))
+ chan.context.channel_id()))
},
hash_map::Entry::Vacant(i_e) => {
let monitor_res = self.chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
@@ -6274,10 +6279,12 @@
// hasn't persisted to disk yet - we can't lose money on a transaction that we haven't
// accepted payment from yet. We do, however, need to wait to send our channel_ready
// until we have persisted our monitor.
- peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
- node_id: counterparty_node_id.clone(),
- msg: funding_msg,
- });
+ if let Some(msg) = funding_msg_opt {
+ peer_state.pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
+ node_id: counterparty_node_id.clone(),
+ msg,
+ });
+ }

if let ChannelPhase::Funded(chan) = e.insert(ChannelPhase::Funded(chan)) {
handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state,
@@ -6288,9 +6295,13 @@
Ok(())
} else {
log_error!(self.logger, "Persisting initial ChannelMonitor failed, implying the funding outpoint was duplicated");
+ let channel_id = match funding_msg_opt {
+ Some(msg) => msg.channel_id,
+ None => chan.context.channel_id(),
+ };
return Err(MsgHandleErrInternal::send_err_msg_no_close(
"The funding_created message had the same funding_txid as an existing channel - funding is not possible".to_owned(),
- funding_msg.channel_id));
+ channel_id));
}
}
}
@@ -7216,6 +7227,66 @@ where
has_update
}

/// When a call to a [`ChannelSigner`] method returns an error, this indicates that the signer
/// is (temporarily) unavailable, and the operation should be retried later.
///
/// This method performs that retry: it checks either every channel, or only the specifically
/// provided channel, for signer-pending messages and attempts to send them.
///
/// [`ChannelSigner`]: crate::sign::ChannelSigner
#[cfg(test)] // This is only implemented for one signer method, and should be private until we
// actually finish implementing it fully.
pub fn signer_unblocked(&self, channel_opt: Option<(PublicKey, ChannelId)>) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);

let unblock_chan = |phase: &mut ChannelPhase<SP>, pending_msg_events: &mut Vec<MessageSendEvent>| {
let node_id = phase.context().get_counterparty_node_id();
if let ChannelPhase::Funded(chan) = phase {
let msgs = chan.signer_maybe_unblocked(&self.logger);
if let Some(updates) = msgs.commitment_update {
pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
node_id,
updates,
});
}
if let Some(msg) = msgs.funding_signed {
pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
node_id,
msg,
});
}
if let Some(msg) = msgs.funding_created {
pending_msg_events.push(events::MessageSendEvent::SendFundingCreated {
node_id,
msg,
});
}
if let Some(msg) = msgs.channel_ready {
send_channel_ready!(self, pending_msg_events, chan, msg);
}
}
};

let per_peer_state = self.per_peer_state.read().unwrap();
if let Some((counterparty_node_id, channel_id)) = channel_opt {
if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
if let Some(chan) = peer_state.channel_by_id.get_mut(&channel_id) {
unblock_chan(chan, &mut peer_state.pending_msg_events);
}
}
} else {
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
for (_, chan) in peer_state.channel_by_id.iter_mut() {
unblock_chan(chan, &mut peer_state.pending_msg_events);
}
}
}
}

/// Check whether any channels have finished removing all pending updates after a shutdown
/// exchange and can now send a closing_signed.
/// Returns whether any closing_signed messages were generated.
43 changes: 35 additions & 8 deletions lightning/src/ln/functional_test_utils.rs
@@ -30,6 +30,8 @@ use crate::util::test_utils::{panicking, TestChainMonitor, TestScorer, TestKeysI
use crate::util::errors::APIError;
use crate::util::config::{UserConfig, MaxDustHTLCExposure};
use crate::util::ser::{ReadableArgs, Writeable};
#[cfg(test)]
use crate::util::logger::Logger;

use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::blockdata::transaction::{Transaction, TxOut};
@@ -436,6 +438,25 @@ impl<'a, 'b, 'c> Node<'a, 'b, 'c> {
pub fn get_block_header(&self, height: u32) -> BlockHeader {
self.blocks.lock().unwrap()[height as usize].0.header
}
/// Changes the channel signer's availability for the specified peer and channel.
///
/// When `available` is set to `true`, the channel signer will behave normally. When set to
/// `false`, the channel signer will act like an offline remote signer and will return `Err` for
/// several of the signing methods. Currently, only `sign_counterparty_commitment`,
/// `validate_counterparty_revocation`, and `sign_holder_commitment` are affected by this setting.
#[cfg(test)]
pub fn set_channel_signer_available(&self, peer_id: &PublicKey, chan_id: &ChannelId, available: bool) {
let per_peer_state = self.node.per_peer_state.read().unwrap();
let chan_lock = per_peer_state.get(peer_id).unwrap().lock().unwrap();
let signer = (|| {
match chan_lock.channel_by_id.get(chan_id) {
Some(phase) => phase.context().get_signer(),
None => panic!("Couldn't find a channel with id {}", chan_id),
}
})();
log_debug!(self.logger, "Setting channel signer for {} as available={}", chan_id, available);
signer.as_ecdsa().unwrap().set_available(available);
}
}
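For example, a hypothetical test fragment (not part of this diff) pairs this helper with `ChannelManager::signer_unblocked` to force a signing failure and then retry everything; `nodes` is assumed to come from `create_network` and `chan_id` to name an open channel with `nodes[1]`.

// Hypothetical helper: take the signer offline, then sweep all channels for pending messages.
fn take_signer_offline_then_retry<'a, 'b, 'c>(nodes: &[Node<'a, 'b, 'c>], chan_id: ChannelId) {
	let peer = nodes[1].node.get_our_node_id();
	nodes[0].set_channel_signer_available(&peer, &chan_id, false);
	// ...drive the protocol to where a signature is needed; the message is withheld...
	nodes[0].set_channel_signer_available(&peer, &chan_id, true);
	// Passing None retries signer-pending messages on every channel with every peer.
	nodes[0].node.signer_unblocked(None);
}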

/// If we need an unsafe pointer to a `Node` (ie to reference it in a thread
@@ -924,7 +945,8 @@ macro_rules! unwrap_send_err {
pub fn check_added_monitors<CM: AChannelManager, H: NodeHolder<CM=CM>>(node: &H, count: usize) {
if let Some(chain_monitor) = node.chain_monitor() {
let mut added_monitors = chain_monitor.added_monitors.lock().unwrap();
- assert_eq!(added_monitors.len(), count);
+ let n = added_monitors.len();
+ assert_eq!(n, count, "expected {} monitors to be added, not {}", count, n);
added_monitors.clear();
}
}
@@ -2119,12 +2141,13 @@ macro_rules! expect_channel_shutdown_state {
}

#[cfg(any(test, ldk_bench, feature = "_test_utils"))]
- pub fn expect_channel_pending_event<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, expected_counterparty_node_id: &PublicKey) {
+ pub fn expect_channel_pending_event<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, expected_counterparty_node_id: &PublicKey) -> ChannelId {
let events = node.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
- match events[0] {
- crate::events::Event::ChannelPending { ref counterparty_node_id, .. } => {
+ match &events[0] {
+ crate::events::Event::ChannelPending { channel_id, counterparty_node_id, .. } => {
assert_eq!(*expected_counterparty_node_id, *counterparty_node_id);
+ *channel_id
},
_ => panic!("Unexpected event"),
}
@@ -3175,24 +3198,28 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) {
// If a expects a channel_ready, it better not think it has received a revoke_and_ack
// from b
for reestablish in reestablish_1.iter() {
- assert_eq!(reestablish.next_remote_commitment_number, 0);
+ let n = reestablish.next_remote_commitment_number;
+ assert_eq!(n, 0, "expected a->b next_remote_commitment_number to be 0, got {}", n);
}
}
if send_channel_ready.1 {
// If b expects a channel_ready, it better not think it has received a revoke_and_ack
// from a
for reestablish in reestablish_2.iter() {
- assert_eq!(reestablish.next_remote_commitment_number, 0);
+ let n = reestablish.next_remote_commitment_number;
+ assert_eq!(n, 0, "expected b->a next_remote_commitment_number to be 0, got {}", n);
}
}
if send_channel_ready.0 || send_channel_ready.1 {
// If we expect any channel_ready's, both sides better have set
// next_holder_commitment_number to 1
for reestablish in reestablish_1.iter() {
- assert_eq!(reestablish.next_local_commitment_number, 1);
+ let n = reestablish.next_local_commitment_number;
+ assert_eq!(n, 1, "expected a->b next_local_commitment_number to be 1, got {}", n);
}
for reestablish in reestablish_2.iter() {
- assert_eq!(reestablish.next_local_commitment_number, 1);
+ let n = reestablish.next_local_commitment_number;
+ assert_eq!(n, 1, "expected b->a next_local_commitment_number to be 1, got {}", n);
}
}

2 changes: 1 addition & 1 deletion lightning/src/ln/functional_tests.rs
@@ -9045,7 +9045,7 @@ fn test_duplicate_chan_id() {
}
};
check_added_monitors!(nodes[0], 0);
- nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created);
+ nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created.unwrap());
// At this point we'll look up if the channel_id is present and immediately fail the channel
// without trying to persist the `ChannelMonitor`.
check_added_monitors!(nodes[1], 0);
3 changes: 3 additions & 0 deletions lightning/src/ln/mod.rs
@@ -73,6 +73,9 @@ mod monitor_tests;
#[cfg(test)]
#[allow(unused_mut)]
mod shutdown_tests;
#[cfg(test)]
#[allow(unused_mut)]
mod async_signer_tests;

pub use self::peer_channel_encryptor::LN_MAX_MSG_LEN;

28 changes: 26 additions & 2 deletions lightning/src/util/test_channel_signer.rs
@@ -56,6 +56,9 @@ pub struct TestChannelSigner {
/// Channel state used for policy enforcement
pub state: Arc<Mutex<EnforcementState>>,
pub disable_revocation_policy_check: bool,
/// When `true` (the default), the signer will respond immediately with signatures. When `false`,
/// the signer will return an error indicating that it is unavailable.
pub available: Arc<Mutex<bool>>,
}

impl PartialEq for TestChannelSigner {
@@ -71,7 +74,8 @@ impl TestChannelSigner {
Self {
inner,
state,
- disable_revocation_policy_check: false
+ disable_revocation_policy_check: false,
+ available: Arc::new(Mutex::new(true)),
}
}

@@ -84,7 +88,8 @@ impl TestChannelSigner {
Self {
inner,
state,
- disable_revocation_policy_check
+ disable_revocation_policy_check,
+ available: Arc::new(Mutex::new(true)),
}
}

@@ -94,6 +99,16 @@ impl TestChannelSigner {
pub fn get_enforcement_state(&self) -> MutexGuard<EnforcementState> {
self.state.lock().unwrap()
}

/// Marks the signer's availability.
///
/// When `true`, methods are forwarded to the underlying signer as normal. When `false`, some
/// methods will return `Err` indicating that the signer is unavailable. Intended to be used for
/// testing asynchronous signing.
#[cfg(test)]
pub fn set_available(&self, available: bool) {
*self.available.lock().unwrap() = available;
}
}

impl ChannelSigner for TestChannelSigner {
@@ -133,6 +148,9 @@ impl EcdsaChannelSigner for TestChannelSigner {
self.verify_counterparty_commitment_tx(commitment_tx, secp_ctx);

{
if !*self.available.lock().unwrap() {
return Err(());
}
let mut state = self.state.lock().unwrap();
let actual_commitment_number = commitment_tx.commitment_number();
let last_commitment_number = state.last_counterparty_commitment;
@@ -149,13 +167,19 @@
}

fn validate_counterparty_revocation(&self, idx: u64, _secret: &SecretKey) -> Result<(), ()> {
if !*self.available.lock().unwrap() {
return Err(());
}
let mut state = self.state.lock().unwrap();
assert!(idx == state.last_counterparty_revoked_commitment || idx == state.last_counterparty_revoked_commitment - 1, "expecting to validate the current or next counterparty revocation - trying {}, current {}", idx, state.last_counterparty_revoked_commitment);
state.last_counterparty_revoked_commitment = idx;
Ok(())
}

fn sign_holder_commitment(&self, commitment_tx: &HolderCommitmentTransaction, secp_ctx: &Secp256k1<secp256k1::All>) -> Result<Signature, ()> {
if !*self.available.lock().unwrap() {
return Err(());
}
let trusted_tx = self.verify_holder_commitment_tx(commitment_tx, secp_ctx);
let state = self.state.lock().unwrap();
let commitment_number = trusted_tx.commitment_number();