
Commit 92a818f

Forward onion messages in PeerManager
Ensure channel messages are prioritized over onion messages (OMs), and only write OMs to a peer when there is sufficient space in that peer's buffer.
1 parent: ef5859a


lightning/src/ln/peer_handler.rs

Lines changed: 42 additions & 2 deletions
@@ -309,15 +309,23 @@ enum InitSyncTracker{
 /// forwarding gossip messages to peers altogether.
 const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
 
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause
+/// forwarding onion messages to peers altogether.
+const OM_BUFFER_LIMIT_RATIO: usize = 2;
+
 /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
 /// we have fewer than this many messages in the outbound buffer again.
-/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
-/// refilled as we send bytes.
+/// We also use this as the target number of outbound gossip and onion messages to keep in the write
+/// buffer, refilled as we send bytes.
 const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
 /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
 /// the peer.
 const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
 
+/// When the outbound buffer has this many messages, we won't poll for new onion messages for this
+/// peer.
+const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;
+
 /// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
 /// the socket receive buffer before receiving the ping.
 ///
@@ -393,6 +401,14 @@ impl Peer {
             InitSyncTracker::NodesSyncing(pk) => pk < node_id,
         }
     }
+
+    /// Returns the number of onion messages we can fit in this peer's buffer.
+    fn onion_message_buffer_slots_available(&self) -> usize {
+        cmp::min(
+            OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer.len()),
+            (BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(self.msgs_sent_since_pong))
+    }
+
     /// Returns whether this peer's buffer is full and we should drop gossip messages.
     fn buffer_full_drop_gossip(&self) -> bool {
         if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
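The new helper caps onion-message intake by two limits at once: remaining space below OUTBOUND_BUFFER_LIMIT_PAUSE_OMS, and remaining headroom in the per-ping-cycle message budget. A standalone restatement for illustration; BUFFER_DRAIN_MSGS_PER_TICK is not shown in this hunk, so its value below is an assumption:

use std::cmp;

// OUTBOUND_BUFFER_LIMIT_PAUSE_OMS = 10 * 2 per the first hunk; the drain-per-tick
// value is assumed for this sketch only.
const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = 20;
const OM_BUFFER_LIMIT_RATIO: usize = 2;
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 10; // assumption, not from the patch

// Mirrors onion_message_buffer_slots_available() with the peer fields passed in.
fn om_slots_available(pending_outbound: usize, msgs_sent_since_pong: usize) -> usize {
    cmp::min(
        OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(pending_outbound),
        (BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(msgs_sent_since_pong),
    )
}

fn main() {
    // 5 messages buffered, 12 sent since the last pong: the pong-based cap (20 - 12 = 8)
    // is tighter than the buffer-based cap (20 - 5 = 15), so 8 slots are free.
    assert_eq!(om_slots_available(5, 12), 8);
    // A full buffer yields zero slots regardless of the other limit.
    assert_eq!(om_slots_available(20, 0), 0);
}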
@@ -817,8 +833,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
     /// ready to call `[write_buffer_space_avail`] again if a write call generated here isn't
     /// sufficient!
     ///
+    /// If any bytes are written, [`process_events`] should be called afterwards.
+    // TODO: why?
+    ///
     /// [`send_data`]: SocketDescriptor::send_data
     /// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
+    /// [`process_events`]: PeerManager::process_events
     pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
         let peers = self.peers.read().unwrap();
         match peers.get(descriptor) {
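The added doc text sets a contract for I/O drivers: after a writable-socket notification lets write_buffer_space_avail flush bytes, process_events should run afterwards, presumably so freed buffer space can be refilled (the TODO in the hunk leaves the rationale open). A toy model of that loop, using illustrative stand-ins rather than LDK's actual driver code:

// ToyPeerManager and its methods are illustrative stand-ins, not the real PeerManager API
// (the real write_buffer_space_avail returns Result<(), PeerHandleError>, not a bool).
struct ToyPeerManager {
    outbound_buffer: Vec<Vec<u8>>,
}

impl ToyPeerManager {
    // Stand-in: flush what we can and report whether any bytes went out.
    fn write_buffer_space_avail(&mut self) -> bool {
        let wrote = !self.outbound_buffer.is_empty();
        self.outbound_buffer.clear();
        wrote
    }
    // Stand-in: surface newly ready work (channel messages first, then onion messages).
    fn process_events(&mut self) {}
}

fn on_socket_writable(pm: &mut ToyPeerManager) {
    // Per the doc change: if the flush wrote anything, follow up with process_events.
    if pm.write_buffer_space_avail() {
        pm.process_events();
    }
}

fn main() {
    let mut pm = ToyPeerManager { outbound_buffer: vec![vec![0u8; 16]] };
    on_socket_writable(&mut pm);
}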
@@ -1412,6 +1432,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
     /// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
     /// or one of the other clients provided in our language bindings.
     ///
+    /// Note that this method should be called again if any bytes are written.
+    ///
     /// Note that if there are any other calls to this function waiting on lock(s) this may return
     /// without doing any work. All available events that need handling will be handled before the
     /// other calls return.
@@ -1666,6 +1688,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
             for (descriptor, peer_mutex) in peers.iter() {
                 self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
+
+                // Only see if we have room for onion messages after we've written all channel messages, to
+                // ensure they take priority.
+                let (peer_node_id, om_buffer_slots_avail) = {
+                    let peer = peer_mutex.lock().unwrap();
+                    if let Some(peer_node_id) = peer.their_node_id {
+                        (Some(peer_node_id.clone()), peer.onion_message_buffer_slots_available())
+                    } else { (None, 0) }
+                };
+                if peer_node_id.is_some() && om_buffer_slots_avail > 0 {
+                    for event in self.message_handler.onion_message_handler.next_onion_messages_for_peer(
+                        peer_node_id.unwrap(), om_buffer_slots_avail)
+                    {
+                        if let MessageSendEvent::SendOnionMessage { ref node_id, ref msg } = event {
+                            self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                        }
+                    }
+                }
             }
         }
         if !peers_to_disconnect.is_empty() {
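Two details of this hunk are easy to miss: the peer mutex is held only inside the small block that reads their_node_id and the slot count, and is released before the onion message handler is queried; and the handler is asked for at most om_buffer_slots_avail messages, so onion messages never displace already-buffered channel traffic. A simplified model of the per-peer pass (illustrative types, not the PeerManager internals):

use std::collections::VecDeque;

// Illustrative stand-ins for a peer's outbound buffer and the handler's queued OMs.
struct ToyPeer {
    outbound: VecDeque<&'static str>,
    om_slots_available: usize,
}

// Stand-in for next_onion_messages_for_peer(peer_node_id, max): yield at most `max`
// queued onion messages destined for this peer.
fn next_onion_messages(queue: &mut VecDeque<&'static str>, max: usize) -> Vec<&'static str> {
    (0..max).filter_map(|_| queue.pop_front()).collect()
}

fn per_peer_pass(peer: &mut ToyPeer, om_queue: &mut VecDeque<&'static str>) {
    // 1. Flush the channel/gossip messages already buffered (do_attempt_write_data).
    peer.outbound.clear();
    // 2. Only then pull onion messages, bounded by the free slots, to keep priority.
    if peer.om_slots_available > 0 {
        for om in next_onion_messages(om_queue, peer.om_slots_available) {
            peer.outbound.push_back(om); // stand-in for enqueue_message
        }
    }
}

fn main() {
    let mut peer = ToyPeer { outbound: VecDeque::from(["channel_update"]), om_slots_available: 2 };
    let mut om_queue = VecDeque::from(["om_1", "om_2", "om_3"]);
    per_peer_pass(&mut peer, &mut om_queue);
    assert_eq!(peer.outbound.len(), 2); // two OMs fit; "om_3" stays queued for later
}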
