@@ -80,6 +80,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
80
80
81
81
use lightning:: ln:: peer_handler;
82
82
use lightning:: ln:: peer_handler:: SocketDescriptor as LnSocketTrait ;
83
+ use lightning:: ln:: peer_handler:: CustomMessageHandler ;
83
84
use lightning:: ln:: msgs:: { ChannelMessageHandler , RoutingMessageHandler } ;
84
85
use lightning:: util:: logger:: Logger ;
85
86
@@ -119,10 +120,11 @@ struct Connection {
119
120
id : u64 ,
120
121
}
121
122
impl Connection {
122
- async fn schedule_read < CMH , RMH , L > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor , Arc < CMH > , Arc < RMH > , Arc < L > > > , us : Arc < Mutex < Self > > , mut reader : io:: ReadHalf < TcpStream > , mut read_wake_receiver : mpsc:: Receiver < ( ) > , mut write_avail_receiver : mpsc:: Receiver < ( ) > ) where
123
+ async fn schedule_read < CMH , RMH , L , UMH > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor , Arc < CMH > , Arc < RMH > , Arc < L > , Arc < UMH > > > , us : Arc < Mutex < Self > > , mut reader : io:: ReadHalf < TcpStream > , mut read_wake_receiver : mpsc:: Receiver < ( ) > , mut write_avail_receiver : mpsc:: Receiver < ( ) > ) where
123
124
CMH : ChannelMessageHandler + ' static ,
124
125
RMH : RoutingMessageHandler + ' static ,
125
- L : Logger + ' static + ?Sized {
126
+ L : Logger + ' static + ?Sized ,
127
+ UMH : CustomMessageHandler + ' static {
126
128
// 8KB is nice and big but also should never cause any issues with stack overflowing.
127
129
let mut buf = [ 0 ; 8192 ] ;
128
130
@@ -215,10 +217,11 @@ impl Connection {
215
217
/// The returned future will complete when the peer is disconnected and associated handling
216
218
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
217
219
/// not need to poll the provided future in order to make progress.
218
- pub fn setup_inbound < CMH , RMH , L > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor , Arc < CMH > , Arc < RMH > , Arc < L > > > , stream : StdTcpStream ) -> impl std:: future:: Future < Output =( ) > where
220
+ pub fn setup_inbound < CMH , RMH , L , UMH > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor , Arc < CMH > , Arc < RMH > , Arc < L > , Arc < UMH > > > , stream : StdTcpStream ) -> impl std:: future:: Future < Output =( ) > where
219
221
CMH : ChannelMessageHandler + ' static + Send + Sync ,
220
222
RMH : RoutingMessageHandler + ' static + Send + Sync ,
221
- L : Logger + ' static + ?Sized + Send + Sync {
223
+ L : Logger + ' static + ?Sized + Send + Sync ,
224
+ UMH : CustomMessageHandler + ' static + Send + Sync {
222
225
let ( reader, write_receiver, read_receiver, us) = Connection :: new ( stream) ;
223
226
#[ cfg( debug_assertions) ]
224
227
let last_us = Arc :: clone ( & us) ;
@@ -255,10 +258,11 @@ pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<So
255
258
/// The returned future will complete when the peer is disconnected and associated handling
256
259
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
257
260
/// not need to poll the provided future in order to make progress.
258
- pub fn setup_outbound < CMH , RMH , L > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor , Arc < CMH > , Arc < RMH > , Arc < L > > > , their_node_id : PublicKey , stream : StdTcpStream ) -> impl std:: future:: Future < Output =( ) > where
261
+ pub fn setup_outbound < CMH , RMH , L , UMH > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor , Arc < CMH > , Arc < RMH > , Arc < L > , Arc < UMH > > > , their_node_id : PublicKey , stream : StdTcpStream ) -> impl std:: future:: Future < Output =( ) > where
259
262
CMH : ChannelMessageHandler + ' static + Send + Sync ,
260
263
RMH : RoutingMessageHandler + ' static + Send + Sync ,
261
- L : Logger + ' static + ?Sized + Send + Sync {
264
+ L : Logger + ' static + ?Sized + Send + Sync ,
265
+ UMH : CustomMessageHandler + ' static + Send + Sync {
262
266
let ( reader, mut write_receiver, read_receiver, us) = Connection :: new ( stream) ;
263
267
#[ cfg( debug_assertions) ]
264
268
let last_us = Arc :: clone ( & us) ;
@@ -325,10 +329,11 @@ pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<S
325
329
/// disconnected and associated handling futures are freed, though, because all processing in said
326
330
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
327
331
/// make progress.
328
- pub async fn connect_outbound < CMH , RMH , L > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor , Arc < CMH > , Arc < RMH > , Arc < L > > > , their_node_id : PublicKey , addr : SocketAddr ) -> Option < impl std:: future:: Future < Output =( ) > > where
332
+ pub async fn connect_outbound < CMH , RMH , L , UMH > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor , Arc < CMH > , Arc < RMH > , Arc < L > , Arc < UMH > > > , their_node_id : PublicKey , addr : SocketAddr ) -> Option < impl std:: future:: Future < Output =( ) > > where
329
333
CMH : ChannelMessageHandler + ' static + Send + Sync ,
330
334
RMH : RoutingMessageHandler + ' static + Send + Sync ,
331
- L : Logger + ' static + ?Sized + Send + Sync {
335
+ L : Logger + ' static + ?Sized + Send + Sync ,
336
+ UMH : CustomMessageHandler + ' static + Send + Sync {
332
337
if let Ok ( Ok ( stream) ) = time:: timeout ( Duration :: from_secs ( 10 ) , async { TcpStream :: connect ( & addr) . await . map ( |s| s. into_std ( ) . unwrap ( ) ) } ) . await {
333
338
Some ( setup_outbound ( peer_manager, their_node_id, stream) )
334
339
} else { None }
@@ -556,7 +561,7 @@ mod tests {
556
561
let a_manager = Arc :: new ( PeerManager :: new ( MessageHandler {
557
562
chan_handler : Arc :: clone ( & a_handler) ,
558
563
route_handler : Arc :: clone ( & a_handler) ,
559
- } , a_key. clone ( ) , & [ 1 ; 32 ] , Arc :: new ( TestLogger ( ) ) ) ) ;
564
+ } , a_key. clone ( ) , & [ 1 ; 32 ] , Arc :: new ( TestLogger ( ) ) , Arc :: new ( lightning :: ln :: peer_handler :: IgnoringCustomMessageHandler { } ) ) ) ;
560
565
561
566
let ( b_connected_sender, mut b_connected) = mpsc:: channel ( 1 ) ;
562
567
let ( b_disconnected_sender, mut b_disconnected) = mpsc:: channel ( 1 ) ;
@@ -570,7 +575,7 @@ mod tests {
570
575
let b_manager = Arc :: new ( PeerManager :: new ( MessageHandler {
571
576
chan_handler : Arc :: clone ( & b_handler) ,
572
577
route_handler : Arc :: clone ( & b_handler) ,
573
- } , b_key. clone ( ) , & [ 2 ; 32 ] , Arc :: new ( TestLogger ( ) ) ) ) ;
578
+ } , b_key. clone ( ) , & [ 2 ; 32 ] , Arc :: new ( TestLogger ( ) ) , Arc :: new ( lightning :: ln :: peer_handler :: IgnoringCustomMessageHandler { } ) ) ) ;
574
579
575
580
// We bind on localhost, hoping the environment is properly configured with a local
576
581
// address. This may not always be the case in containers and the like, so if this test is
0 commit comments