
Implement pending claim rebroadcast on force-closed channels #2208


Merged
35 changes: 26 additions & 9 deletions lightning-background-processor/src/lib.rs
@@ -64,8 +64,8 @@ use alloc::vec::Vec;
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
/// writing it to disk/backups by invoking the callback given to it at startup.
/// [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
/// at the appropriate intervals.
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
/// and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
/// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
@@ -116,12 +116,17 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

#[cfg(not(test))]
const REBROADCAST_TIMER: u64 = 30;
#[cfg(test)]
const REBROADCAST_TIMER: u64 = 1;

#[cfg(feature = "futures")]
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
#[cfg(feature = "futures")]
const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
min_u64(SCORER_PERSIST_TIMER, FIRST_NETWORK_PRUNE_TIMER));
min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));

/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
pub enum GossipSync<
@@ -270,11 +275,14 @@ macro_rules! define_run_body {
=> { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();
log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
$chain_monitor.rebroadcast_pending_claims();

let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
let mut last_ping_call = $get_timer(PING_TIMER);
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
let mut have_pruned = false;

loop {
@@ -372,6 +380,12 @@ macro_rules! define_run_body {
}
last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
}

if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
log_trace!($logger, "Rebroadcasting monitor's pending claims");
$chain_monitor.rebroadcast_pending_claims();
last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
}
}

// After we exit, ensure we persist the ChannelManager one final time - this avoids
@@ -1189,19 +1203,22 @@ mod tests {

#[test]
fn test_timer_tick_called() {
// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
// `FRESHNESS_TIMER`.
// Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
// `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
// `PeerManager::timer_tick_occurred` every `PING_TIMER`.
let nodes = create_nodes(1, "test_timer_tick_called".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: _| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() &&
log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() &&
log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() {
break
}
}
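For readers who drive LDK without the background processor, the new duty added above boils down to the same "remember the last call, fire when the interval elapses" pattern that define_run_body! uses. The sketch below is illustrative only: run_rebroadcast_duty, its parameters, and the 100ms polling sleep are made up for the example and are not LDK API; in the real processor the closure would be || chain_monitor.rebroadcast_pending_claims().

use std::thread::sleep;
use std::time::{Duration, Instant};

// Mirrors the non-test REBROADCAST_TIMER constant added above (30 seconds).
const REBROADCAST_TIMER: u64 = 30;

// Illustrative sketch (not LDK API): call `rebroadcast` whenever REBROADCAST_TIMER seconds
// have elapsed since the last call, in the spirit of the `timer_elapsed` checks in
// `define_run_body!`.
fn run_rebroadcast_duty(rebroadcast: impl Fn(), should_exit: impl Fn() -> bool) {
    let mut last_rebroadcast_call = Instant::now();
    while !should_exit() {
        // The real processor waits on events and several other timers; a short sleep stands in here.
        sleep(Duration::from_millis(100));
        if last_rebroadcast_call.elapsed() >= Duration::from_secs(REBROADCAST_TIMER) {
            rebroadcast(); // e.g. `chain_monitor.rebroadcast_pending_claims()`
            last_rebroadcast_call = Instant::now();
        }
    }
}
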
21 changes: 21 additions & 0 deletions lightning/src/chain/chainmonitor.rs
@@ -217,8 +217,15 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> Deref for LockedChannelMonitor<
/// or used independently to monitor channels remotely. See the [module-level documentation] for
/// details.
///
/// Note that `ChainMonitor` should regularly trigger rebroadcasts/fee bumps of pending claims from
/// a force-closed channel. This is crucial in preventing certain classes of pinning attacks,
/// detecting substantial mempool feerate changes between blocks, and ensuring reliability if
/// broadcasting fails. We recommend invoking this every 30 seconds, or lower if running in an
/// environment with spotty connections, like on mobile.
///
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
/// [module-level documentation]: crate::chain::chainmonitor
/// [`rebroadcast_pending_claims`]: Self::rebroadcast_pending_claims
pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
@@ -533,6 +540,20 @@ where C::Target: chain::Filter,
pub fn get_update_future(&self) -> Future {
self.event_notifier.get_future()
}

/// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is
/// crucial in preventing certain classes of pinning attacks, detecting substantial mempool
/// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend
/// invoking this every 30 seconds, or lower if running in an environment with spotty
/// connections, like on mobile.
pub fn rebroadcast_pending_claims(&self) {
let monitors = self.monitors.read().unwrap();
for (_, monitor_holder) in &*monitors {
monitor_holder.monitor.rebroadcast_pending_claims(
&*self.broadcaster, &*self.fee_estimator, &*self.logger
)
}
}
}

impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
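Outside the background processor, an application can invoke the new ChainMonitor::rebroadcast_pending_claims itself. A rough usage sketch under assumptions: the chain_monitor binding is a hypothetical Arc around a fully-typed ChainMonitor, and the dedicated timer thread is just one reasonable way to schedule the call every 30 seconds.

// Sketch only: `chain_monitor` is a hypothetical Arc<ChainMonitor<...>> owned by the application.
let monitor_for_task = std::sync::Arc::clone(&chain_monitor);
std::thread::spawn(move || loop {
    // Every 30 seconds, matching the recommendation in the new documentation above.
    std::thread::sleep(std::time::Duration::from_secs(30));
    monitor_for_task.rebroadcast_pending_claims();
});
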
21 changes: 21 additions & 0 deletions lightning/src/chain/channelmonitor.rs
@@ -1467,6 +1467,27 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
pub fn current_best_block(&self) -> BestBlock {
self.inner.lock().unwrap().best_block.clone()
}

/// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is
/// crucial in preventing certain classes of pinning attacks, detecting substantial mempool
/// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend
/// invoking this every 30 seconds, or lower if running in an environment with spotty
/// connections, like on mobile.
pub fn rebroadcast_pending_claims<B: Deref, F: Deref, L: Deref>(
&self, broadcaster: B, fee_estimator: F, logger: L,
)
where
B::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
let fee_estimator = LowerBoundedFeeEstimator::new(fee_estimator);
let mut inner = self.inner.lock().unwrap();
let current_height = inner.best_block.height;
inner.onchain_tx_handler.rebroadcast_pending_claims(
current_height, &broadcaster, &fee_estimator, &logger,
);
}
}

impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
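Because the ChannelMonitor method is public, setups that watch monitors outside of a ChainMonitor (e.g. the remote-monitoring use mentioned in the ChainMonitor docs) can trigger the same behaviour per monitor. A sketch under assumed bindings: watched_monitors, tx_broadcaster, fee_estimator and logger are hypothetical and stand for values implementing BroadcasterInterface, FeeEstimator and Logger.

// Sketch only: every binding here is assumed to exist in the caller's context.
for monitor in watched_monitors.iter() {
    // `&T` implements `Deref<Target = T>`, so plain references satisfy the new method's bounds.
    monitor.rebroadcast_pending_claims(&tx_broadcaster, &fee_estimator, &logger);
}
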
85 changes: 75 additions & 10 deletions lightning/src/chain/onchaintx.rs
@@ -481,6 +481,59 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
events.into_iter().map(|(_, event)| event).collect()
}

/// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is
/// crucial in preventing certain classes of pinning attacks, detecting substantial mempool
/// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend
/// invoking this every 30 seconds, or lower if running in an environment with spotty
/// connections, like on mobile.
pub(crate) fn rebroadcast_pending_claims<B: Deref, F: Deref, L: Deref>(
&mut self, current_height: u32, broadcaster: &B, fee_estimator: &LowerBoundedFeeEstimator<F>,
logger: &L,
)
where
B::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
let mut bump_requests = Vec::with_capacity(self.pending_claim_requests.len());
[Review comment — Collaborator] god we really need to fix this so we can just iterate the map and build as we go, rather than adding intermediate vecs and then looking up in the map and updating it later.

[Reply — Contributor, author] Yeah I want to address this for the next release.

for (package_id, request) in self.pending_claim_requests.iter() {
let inputs = request.outpoints();
log_info!(logger, "Triggering rebroadcast/fee-bump for request with inputs {:?}", inputs);
bump_requests.push((*package_id, request.clone()));
}
for (package_id, request) in bump_requests {
[Review comment] It would be good to prefix `package_id` with something like `custom_package_id` or `ldk_package_id` as a sanity measure: Core is working on introducing packages at the p2p level, with a real, formalized package ID, so this would avoid confusion in the future.

[Reply — Contributor, author] It's also used in a few other places. We can likely address it when we update our BroadcasterInterface to support broadcasting transaction packages.

self.generate_claim(current_height, &request, false /* force_feerate_bump */, fee_estimator, logger)
.map(|(_, new_feerate, claim)| {
let mut bumped_feerate = false;
if let Some(mut_request) = self.pending_claim_requests.get_mut(&package_id) {
bumped_feerate = request.previous_feerate() > new_feerate;
mut_request.set_feerate(new_feerate);
}
match claim {
OnchainClaim::Tx(tx) => {
let log_start = if bumped_feerate { "Broadcasting RBF-bumped" } else { "Rebroadcasting" };
log_info!(logger, "{} onchain {}", log_start, log_tx!(tx));
broadcaster.broadcast_transaction(&tx);
},
#[cfg(anchors)]
OnchainClaim::Event(event) => {
let log_start = if bumped_feerate { "Yielding fee-bumped" } else { "Replaying" };
log_info!(logger, "{} onchain event to spend inputs {:?}", log_start,
request.outpoints());
#[cfg(debug_assertions)] {
debug_assert!(request.requires_external_funding());
let num_existing = self.pending_claim_events.iter()
.filter(|entry| entry.0 == package_id).count();
assert!(num_existing == 0 || num_existing == 1);
}
self.pending_claim_events.retain(|event| event.0 != package_id);
self.pending_claim_events.push((package_id, event));
}
}
});
}
}
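
Note the false /* force_feerate_bump */ above: a timer-driven rebroadcast should not be forced to outbid its own previous claim, whereas the other generate_claim call sites updated later in this diff pass true. The sketch below only illustrates that intended distinction; it is not the actual package.rs feerate logic, and the 25% minimum increment is an assumption in the spirit of common RBF increments.

// Illustrative only (assumed semantics, not the real implementation):
fn select_claim_feerate(previous: u64, estimated: u64, force_feerate_bump: bool) -> u64 {
    if force_feerate_bump {
        // Forced bumps (e.g. on new blocks) must strictly exceed the previous feerate so the
        // replacement actually propagates; a 25% increment is assumed here.
        core::cmp::max(estimated, previous + previous / 4)
    } else {
        // A periodic rebroadcast only pays more when the estimator now asks for more; otherwise
        // the existing claim is simply rebroadcast at its previous feerate.
        core::cmp::max(estimated, previous)
    }
}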

/// Lightning security model (i.e being able to redeem/timeout HTLC or penalize counterparty
/// onchain) lays on the assumption of claim transactions getting confirmed before timelock
/// expiration (CSV or CLTV following cases). In case of high-fee spikes, claim tx may get stuck
@@ -489,9 +542,13 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
///
/// Panics if there are signing errors, because signing operations in reaction to on-chain
/// events are not expected to fail, and if they do, we may lose funds.
fn generate_claim<F: Deref, L: Deref>(&mut self, cur_height: u32, cached_request: &PackageTemplate, fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L) -> Option<(u32, u64, OnchainClaim)>
where F::Target: FeeEstimator,
L::Target: Logger,
fn generate_claim<F: Deref, L: Deref>(
&mut self, cur_height: u32, cached_request: &PackageTemplate, force_feerate_bump: bool,
fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L,
) -> Option<(u32, u64, OnchainClaim)>
where
F::Target: FeeEstimator,
L::Target: Logger,
{
let request_outpoints = cached_request.outpoints();
if request_outpoints.is_empty() {
@@ -538,8 +595,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
#[cfg(anchors)]
{ // Attributes are not allowed on if expressions on our current MSRV of 1.41.
if cached_request.requires_external_funding() {
let target_feerate_sat_per_1000_weight = cached_request
.compute_package_feerate(fee_estimator, ConfirmationTarget::HighPriority);
let target_feerate_sat_per_1000_weight = cached_request.compute_package_feerate(
fee_estimator, ConfirmationTarget::HighPriority, force_feerate_bump
);
if let Some(htlcs) = cached_request.construct_malleable_package_with_external_funding(self) {
return Some((
new_timer,
@@ -558,7 +616,8 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>

let predicted_weight = cached_request.package_weight(&self.destination_script);
if let Some((output_value, new_feerate)) = cached_request.compute_package_output(
predicted_weight, self.destination_script.dust_value().to_sat(), fee_estimator, logger,
predicted_weight, self.destination_script.dust_value().to_sat(),
force_feerate_bump, fee_estimator, logger,
) {
assert!(new_feerate != 0);

@@ -601,7 +660,7 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
// counterparty's latest commitment don't have any HTLCs present.
let conf_target = ConfirmationTarget::HighPriority;
let package_target_feerate_sat_per_1000_weight = cached_request
.compute_package_feerate(fee_estimator, conf_target);
.compute_package_feerate(fee_estimator, conf_target, force_feerate_bump);
Some((
new_timer,
package_target_feerate_sat_per_1000_weight as u64,
@@ -700,7 +759,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
// Generate claim transactions and track them to bump if necessary at
// height timer expiration (i.e in how many blocks we're going to take action).
for mut req in preprocessed_requests {
if let Some((new_timer, new_feerate, claim)) = self.generate_claim(cur_height, &req, &*fee_estimator, &*logger) {
if let Some((new_timer, new_feerate, claim)) = self.generate_claim(
cur_height, &req, true /* force_feerate_bump */, &*fee_estimator, &*logger,
) {
req.set_timer(new_timer);
req.set_feerate(new_feerate);
let package_id = match claim {
@@ -893,7 +954,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
// Build, bump and rebroadcast tx accordingly
log_trace!(logger, "Bumping {} candidates", bump_candidates.len());
for (package_id, request) in bump_candidates.iter() {
if let Some((new_timer, new_feerate, bump_claim)) = self.generate_claim(cur_height, &request, &*fee_estimator, &*logger) {
if let Some((new_timer, new_feerate, bump_claim)) = self.generate_claim(
cur_height, &request, true /* force_feerate_bump */, &*fee_estimator, &*logger,
) {
match bump_claim {
OnchainClaim::Tx(bump_tx) => {
log_info!(logger, "Broadcasting RBF-bumped onchain {}", log_tx!(bump_tx));
@@ -973,7 +1036,9 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
}
}
for ((_package_id, _), ref mut request) in bump_candidates.iter_mut() {
if let Some((new_timer, new_feerate, bump_claim)) = self.generate_claim(height, &request, fee_estimator, &&*logger) {
if let Some((new_timer, new_feerate, bump_claim)) = self.generate_claim(
height, &request, true /* force_feerate_bump */, fee_estimator, &&*logger
) {
request.set_timer(new_timer);
request.set_feerate(new_feerate);
match bump_claim {