Skip to content

Commit 18dc7f2

Browse files
committed
Avoid a tokio::mpsc::Sender clone for each P2P send operation
Whenever we go to send bytes to a peer, we need to construct a waker for tokio to call back into if we need to finish sending later. That waker needs some reference to the peer's read task to wake it up, hidden behind a single `*const ()`. To do this, we'd previously simply stored a `Box<tokio::mpsc::Sender>` in that pointer, which requires a `clone` for each waker construction. This leads to substantial malloc traffic. Instead, here, we replace this box with an `Arc`, leaving a single `tokio::mpsc::Sender` floating around and simply change the refcounts whenever we construct a new waker, which we can do without allocations.
1 parent 969085b commit 18dc7f2

File tree

1 file changed

+22
-10
lines changed

1 file changed

+22
-10
lines changed

lightning-net-tokio/src/lib.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,11 @@ const SOCK_WAKER_VTABLE: task::RawWakerVTable =
422422
task::RawWakerVTable::new(clone_socket_waker, wake_socket_waker, wake_socket_waker_by_ref, drop_socket_waker);
423423

424424
fn clone_socket_waker(orig_ptr: *const ()) -> task::RawWaker {
425-
write_avail_to_waker(orig_ptr as *const mpsc::Sender<()>)
425+
let new_waker = unsafe { Arc::from_raw(orig_ptr as *const mpsc::Sender<()>) };
426+
let res = write_avail_to_waker(&new_waker);
427+
// Don't decrement the refcount when dropping new_waker by turning it back `into_raw`.
428+
let _ = Arc::into_raw(new_waker);
429+
res
426430
}
427431
// When waking, an error should be fine. Most likely we got two send_datas in a row, both of which
428432
// failed to fully write, but we only need to call write_buffer_space_avail() once. Otherwise, the
@@ -435,29 +439,36 @@ fn wake_socket_waker(orig_ptr: *const ()) {
435439
}
436440
fn wake_socket_waker_by_ref(orig_ptr: *const ()) {
437441
let sender_ptr = orig_ptr as *const mpsc::Sender<()>;
438-
let sender = unsafe { (*sender_ptr).clone() };
442+
let sender = unsafe { &*sender_ptr };
439443
let _ = sender.try_send(());
440444
}
441445
fn drop_socket_waker(orig_ptr: *const ()) {
442-
let _orig_box = unsafe { Box::from_raw(orig_ptr as *mut mpsc::Sender<()>) };
443-
// _orig_box is now dropped
446+
let _orig_arc = unsafe { Arc::from_raw(orig_ptr as *mut mpsc::Sender<()>) };
447+
// _orig_arc is now dropped
444448
}
445-
fn write_avail_to_waker(sender: *const mpsc::Sender<()>) -> task::RawWaker {
446-
let new_box = Box::leak(Box::new(unsafe { (*sender).clone() }));
447-
let new_ptr = new_box as *const mpsc::Sender<()>;
449+
fn write_avail_to_waker(sender: &Arc<mpsc::Sender<()>>) -> task::RawWaker {
450+
let new_ptr = Arc::into_raw(Arc::clone(&sender));
448451
task::RawWaker::new(new_ptr as *const (), &SOCK_WAKER_VTABLE)
449452
}
450453

451454
/// The SocketDescriptor used to refer to sockets by a PeerHandler. This is pub only as it is a
452455
/// type in the template of PeerHandler.
453456
pub struct SocketDescriptor {
454457
conn: Arc<Mutex<Connection>>,
458+
// We store a copy of the mpsc::Sender to wake the read task in an Arc here. While we can
459+
// simply clone the sender and store a copy in each waker, that would require allocating for
460+
// each waker. Instead, we can simply `Arc::clone`, creating a new reference and store the
461+
// pointer in the waker.
462+
write_avail_sender: Arc<mpsc::Sender<()>>,
455463
id: u64,
456464
}
457465
impl SocketDescriptor {
458466
fn new(conn: Arc<Mutex<Connection>>) -> Self {
459-
let id = conn.lock().unwrap().id;
460-
Self { conn, id }
467+
let (id, write_avail_sender) = {
468+
let us = conn.lock().unwrap();
469+
(us.id, Arc::new(us.write_avail.clone()))
470+
};
471+
Self { conn, id, write_avail_sender }
461472
}
462473
}
463474
impl peer_handler::SocketDescriptor for SocketDescriptor {
@@ -480,7 +491,7 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
480491
let _ = us.read_waker.try_send(());
481492
}
482493
if data.is_empty() { return 0; }
483-
let waker = unsafe { task::Waker::from_raw(write_avail_to_waker(&us.write_avail)) };
494+
let waker = unsafe { task::Waker::from_raw(write_avail_to_waker(&self.write_avail_sender)) };
484495
let mut ctx = task::Context::from_waker(&waker);
485496
let mut written_len = 0;
486497
loop {
@@ -522,6 +533,7 @@ impl Clone for SocketDescriptor {
522533
Self {
523534
conn: Arc::clone(&self.conn),
524535
id: self.id,
536+
write_avail_sender: Arc::clone(&self.write_avail_sender),
525537
}
526538
}
527539
}

0 commit comments

Comments
 (0)