Skip to content

Deduplicate PendingHTLCsForwardable events on generation #2026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3737,16 +3737,19 @@ where
// being fully configured. See the docs for `ChannelManagerReadArgs` for more.
match source {
HTLCSource::OutboundRoute { ref path, ref session_priv, ref payment_id, ref payment_params, .. } => {
self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path, session_priv, payment_id, payment_params, self.probing_cookie_secret, &self.secp_ctx, &self.pending_events, &self.logger);
if self.pending_outbound_payments.fail_htlc(source, payment_hash, onion_error, path,
session_priv, payment_id, payment_params, self.probing_cookie_secret, &self.secp_ctx,
&self.pending_events, &self.logger)
{ self.push_pending_forwards_ev(); }
},
HTLCSource::PreviousHopData(HTLCPreviousHopData { ref short_channel_id, ref htlc_id, ref incoming_packet_shared_secret, ref phantom_shared_secret, ref outpoint }) => {
log_trace!(self.logger, "Failing HTLC with payment_hash {} backwards from us with {:?}", log_bytes!(payment_hash.0), onion_error);
let err_packet = onion_error.get_encrypted_failure_packet(incoming_packet_shared_secret, phantom_shared_secret);

let mut forward_event = None;
let mut push_forward_ev = false;
let mut forward_htlcs = self.forward_htlcs.lock().unwrap();
if forward_htlcs.is_empty() {
forward_event = Some(Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS));
push_forward_ev = true;
}
match forward_htlcs.entry(*short_channel_id) {
hash_map::Entry::Occupied(mut entry) => {
Expand All @@ -3757,12 +3760,8 @@ where
}
}
mem::drop(forward_htlcs);
if push_forward_ev { self.push_pending_forwards_ev(); }
let mut pending_events = self.pending_events.lock().unwrap();
if let Some(time) = forward_event {
pending_events.push(events::Event::PendingHTLCsForwardable {
time_forwardable: time
});
}
pending_events.push(events::Event::HTLCHandlingFailed {
prev_channel_id: outpoint.to_channel_id(),
failed_next_destination: destination,
Expand Down Expand Up @@ -4839,7 +4838,7 @@ where
#[inline]
fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)]) {
for &mut (prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
let mut forward_event = None;
let mut push_forward_event = false;
let mut new_intercept_events = Vec::new();
let mut failed_intercept_forwards = Vec::new();
if !pending_forwards.is_empty() {
Expand Down Expand Up @@ -4897,7 +4896,7 @@ where
// We don't want to generate a PendingHTLCsForwardable event if only intercepted
// payments are being processed.
if forward_htlcs_empty {
forward_event = Some(Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS));
push_forward_event = true;
}
entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info })));
Expand All @@ -4915,16 +4914,21 @@ where
let mut events = self.pending_events.lock().unwrap();
events.append(&mut new_intercept_events);
}
if push_forward_event { self.push_pending_forwards_ev() }
}
}

match forward_event {
Some(time) => {
let mut pending_events = self.pending_events.lock().unwrap();
pending_events.push(events::Event::PendingHTLCsForwardable {
time_forwardable: time
});
}
None => {},
}
// We only want to push a PendingHTLCsForwardable event if no others are queued.
fn push_pending_forwards_ev(&self) {
let mut pending_events = self.pending_events.lock().unwrap();
let forward_ev_exists = pending_events.iter()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, oops, this check isn't sufficient - I mean it reduces duplication, but the issue is we send the user a PendingHTLCsForwardable event, then they wait (during which time we don't have one lying around here) and then they call pending_htlcs_forwardable. Avoiding duplicating entirely is gonna be a bit hard, but I think we could at least only have two in-flight ones at once by filtering the outbound_payment-generated one on needs_abandon (but under the same lock as the update in fail_htlc).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I see the case we're not covering. I don't think we'd want needs_abandon, though, seems we'd want a new needs_retry check IIUC. Will add this. (Note that needs_abandon should only apply to restart since we set all payments as non-retryable on deser.)

.find(|ev| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lol if our MSRV was just one minor version up (1.42) we could just

.find(|ev| matches!(ev, events::Event::PendingHTLCsForwardable))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Soclose 🥲

.is_some();
if !forward_ev_exists {
pending_events.push(events::Event::PendingHTLCsForwardable {
time_forwardable:
Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
});
}
}

Expand Down Expand Up @@ -7526,7 +7530,8 @@ where
}
}

if !forward_htlcs.is_empty() {
let pending_outbounds = OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), retry_lock: Mutex::new(()) };
if !forward_htlcs.is_empty() || pending_outbounds.needs_abandon() {
// If we have pending HTLCs to forward, assume we either dropped a
// `PendingHTLCsForwardable` or the user received it but never processed it as they
// shut down before the timer hit. Either way, set the time_forwardable to a small
Expand Down Expand Up @@ -7694,7 +7699,7 @@ where

inbound_payment_key: expanded_inbound_key,
pending_inbound_payments: Mutex::new(pending_inbound_payments),
pending_outbound_payments: OutboundPayments { pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()), retry_lock: Mutex::new(()), },
pending_outbound_payments: pending_outbounds,
pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),

forward_htlcs: Mutex::new(forward_htlcs),
Expand Down
43 changes: 31 additions & 12 deletions lightning/src/ln/outbound_payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use bitcoin::secp256k1::{self, Secp256k1, SecretKey};

use crate::chain::keysinterface::{EntropySource, NodeSigner, Recipient};
use crate::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
use crate::ln::channelmanager::{ChannelDetails, HTLCSource, IDEMPOTENCY_TIMEOUT_TICKS, MIN_HTLC_RELAY_HOLDING_CELL_MILLIS, PaymentId};
use crate::ln::channelmanager::{ChannelDetails, HTLCSource, IDEMPOTENCY_TIMEOUT_TICKS, PaymentId};
use crate::ln::channelmanager::MIN_FINAL_CLTV_EXPIRY_DELTA as LDK_DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA;
use crate::ln::msgs::DecodeError;
use crate::ln::onion_utils::HTLCFailReason;
Expand All @@ -30,7 +30,6 @@ use crate::util::time::tests::SinceEpoch;
use core::cmp;
use core::fmt::{self, Display, Formatter};
use core::ops::Deref;
use core::time::Duration;

use crate::prelude::*;
use crate::sync::Mutex;
Expand Down Expand Up @@ -546,6 +545,12 @@ impl OutboundPayments {
});
}

pub(super) fn needs_abandon(&self) -> bool {
let outbounds = self.pending_outbound_payments.lock().unwrap();
outbounds.iter().any(|(_, pmt)|
!pmt.is_auto_retryable_now() && pmt.remaining_parts() == 0 && !pmt.is_fulfilled())
}

/// Will return `Ok(())` iff at least one HTLC is sent for the payment.
fn pay_internal<R: Deref, NS: Deref, ES: Deref, IH, SP, L: Deref>(
&self, payment_id: PaymentId,
Expand Down Expand Up @@ -1006,12 +1011,13 @@ impl OutboundPayments {
});
}

// Returns a bool indicating whether a PendingHTLCsForwardable event should be generated.
pub(super) fn fail_htlc<L: Deref>(
&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason,
path: &Vec<RouteHop>, session_priv: &SecretKey, payment_id: &PaymentId,
payment_params: &Option<PaymentParameters>, probing_cookie_secret: [u8; 32],
secp_ctx: &Secp256k1<secp256k1::All>, pending_events: &Mutex<Vec<events::Event>>, logger: &L
) where L::Target: Logger {
) -> bool where L::Target: Logger {
#[cfg(test)]
let (network_update, short_channel_id, payment_retryable, onion_error_code, onion_error_data) = onion_error.decode_onion_failure(secp_ctx, logger, &source);
#[cfg(not(test))]
Expand All @@ -1021,18 +1027,33 @@ impl OutboundPayments {
let mut session_priv_bytes = [0; 32];
session_priv_bytes.copy_from_slice(&session_priv[..]);
let mut outbounds = self.pending_outbound_payments.lock().unwrap();

// If any payments already need retry, there's no need to generate a redundant
// `PendingHTLCsForwardable`.
let already_awaiting_retry = outbounds.iter().any(|(_, pmt)| {
let mut awaiting_retry = false;
if pmt.is_auto_retryable_now() {
if let PendingOutboundPayment::Retryable { pending_amt_msat, total_msat, .. } = pmt {
if pending_amt_msat < total_msat {
awaiting_retry = true;
}
}
}
awaiting_retry
});

let mut all_paths_failed = false;
let mut full_failure_ev = None;
let mut pending_retry_ev = None;
let mut pending_retry_ev = false;
let mut retry = None;
let attempts_remaining = if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(*payment_id) {
if !payment.get_mut().remove(&session_priv_bytes, Some(&path)) {
log_trace!(logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
return
return false
}
if payment.get().is_fulfilled() {
log_trace!(logger, "Received failure of HTLC with payment_hash {} after payment completion", log_bytes!(payment_hash.0));
return
return false
}
let mut is_retryable_now = payment.get().is_auto_retryable_now();
if let Some(scid) = short_channel_id {
Expand Down Expand Up @@ -1084,7 +1105,7 @@ impl OutboundPayments {
is_retryable_now
} else {
log_trace!(logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
return
return false
};
core::mem::drop(outbounds);
log_trace!(logger, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0));
Expand Down Expand Up @@ -1114,11 +1135,9 @@ impl OutboundPayments {
}
// If we miss abandoning the payment above, we *must* generate an event here or else the
// payment will sit in our outbounds forever.
if attempts_remaining {
if attempts_remaining && !already_awaiting_retry {
debug_assert!(full_failure_ev.is_none());
pending_retry_ev = Some(events::Event::PendingHTLCsForwardable {
time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
});
pending_retry_ev = true;
}
events::Event::PaymentPathFailed {
payment_id: Some(*payment_id),
Expand All @@ -1139,7 +1158,7 @@ impl OutboundPayments {
let mut pending_events = pending_events.lock().unwrap();
pending_events.push(path_failure);
if let Some(ev) = full_failure_ev { pending_events.push(ev); }
if let Some(ev) = pending_retry_ev { pending_events.push(ev); }
pending_retry_ev
}

pub(super) fn abandon_payment(
Expand Down
9 changes: 3 additions & 6 deletions lightning/src/ln/payment_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1721,8 +1721,9 @@ fn do_automatic_retries(test: AutoRetry) {
let chan_1_monitor_serialized = get_monitor!(nodes[0], channel_id_1).encode();
reload_node!(nodes[0], node_encoded, &[&chan_1_monitor_serialized], persister, new_chain_monitor, node_0_deserialized);

let mut events = nodes[0].node.get_and_clear_pending_events();
expect_pending_htlcs_forwardable_from_events!(nodes[0], events, true);
// Make sure we don't retry again.
nodes[0].node.process_pending_htlc_forwards();
let mut msg_events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(msg_events.len(), 0);

Expand Down Expand Up @@ -2348,7 +2349,7 @@ fn no_extra_retries_on_back_to_back_fail() {
// Because we now retry payments as a batch, we simply return a single-path route in the
// second, batched, request, have that fail, ensure the payment was abandoned.
let mut events = nodes[0].node.get_and_clear_pending_events();
assert_eq!(events.len(), 4);
assert_eq!(events.len(), 3);
match events[0] {
Event::PaymentPathFailed { payment_hash: ev_payment_hash, payment_failed_permanently, .. } => {
assert_eq!(payment_hash, ev_payment_hash);
Expand All @@ -2367,10 +2368,6 @@ fn no_extra_retries_on_back_to_back_fail() {
},
_ => panic!("Unexpected event"),
}
match events[3] {
Event::PendingHTLCsForwardable { .. } => {},
_ => panic!("Unexpected event"),
}

nodes[0].node.process_pending_htlc_forwards();
let retry_htlc_updates = SendEvent::from_node(&nodes[0]);
Expand Down