@@ -1363,6 +1363,10 @@ where
1363
1363
1364
1364
pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
1365
1365
1366
+ /// Tracks the channel_update message that were not broadcasted because
1367
+ /// we were not connected to any peers.
1368
+ pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
1369
+
1366
1370
entropy_source: ES,
1367
1371
node_signer: NS,
1368
1372
signer_provider: SP,
@@ -2441,6 +2445,7 @@ where
2441
2445
funding_batch_states: Mutex::new(BTreeMap::new()),
2442
2446
2443
2447
pending_offers_messages: Mutex::new(Vec::new()),
2448
+ pending_broadcast_messages: Mutex::new(Vec::new()),
2444
2449
2445
2450
entropy_source,
2446
2451
node_signer,
@@ -2936,15 +2941,30 @@ where
2936
2941
};
2937
2942
if let Some(update) = update_opt {
2938
2943
// Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
2939
- // not try to broadcast it via whatever peer we have.
2944
+ // not try to broadcast it via whatever peer we are connected to.
2945
+ let brodcast_message_evt = events::MessageSendEvent::BroadcastChannelUpdate {
2946
+ msg: update
2947
+ };
2948
+
2940
2949
let per_peer_state = self.per_peer_state.read().unwrap();
2941
- let a_peer_state_opt = per_peer_state.get(peer_node_id)
2942
- .ok_or(per_peer_state.values().next());
2943
- if let Ok(a_peer_state_mutex) = a_peer_state_opt {
2944
- let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
2945
- a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
2946
- msg: update
2947
- });
2950
+
2951
+ // Attempt to get the peer_state_mutex for the peer we force-closed on (counterparty).
2952
+ let peer_state_mutex_opt = per_peer_state.get(peer_node_id);
2953
+
2954
+ match peer_state_mutex_opt {
2955
+ Some(peer_state_mutex) => {
2956
+ let mut peer_state = peer_state_mutex.lock().unwrap();
2957
+ peer_state.pending_msg_events.push(brodcast_message_evt);
2958
+ }
2959
+ None => {
2960
+ // If we could not find the couterparty in our per_peer_state, we poll
2961
+ // the messages together in pending_broadcast_messages, and broadcast
2962
+ // them later.
2963
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
2964
+ pending_broadcast_messages.push(brodcast_message_evt);
2965
+ log_info!(self.logger, "Not able to broadcast channel_update of force-closed channel right now.
2966
+ Will try rebroadcasting later.");
2967
+ }
2948
2968
}
2949
2969
}
2950
2970
@@ -4859,6 +4879,7 @@ where
4859
4879
4860
4880
{
4861
4881
let per_peer_state = self.per_peer_state.read().unwrap();
4882
+
4862
4883
for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() {
4863
4884
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
4864
4885
let peer_state = &mut *peer_state_lock;
@@ -8056,6 +8077,8 @@ where
8056
8077
pending_events.append(&mut peer_state.pending_msg_events);
8057
8078
}
8058
8079
}
8080
+ let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap();
8081
+ pending_events.append(&mut broadcast_msgs);
8059
8082
8060
8083
if !pending_events.is_empty() {
8061
8084
events.replace(pending_events);
@@ -8881,6 +8904,16 @@ where
8881
8904
msg: chan.get_channel_reestablish(&&logger),
8882
8905
});
8883
8906
});
8907
+
8908
+ {
8909
+ // Get pending messages to be broadcasted.
8910
+ let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap();
8911
+
8912
+ // If we have some pending message to broadcast, and we are connected to peers.
8913
+ if broadcast_msgs.len() > 0 {
8914
+ pending_msg_events.append(&mut broadcast_msgs);
8915
+ }
8916
+ }
8884
8917
}
8885
8918
8886
8919
return NotifyOption::SkipPersistHandleEvents;
@@ -10894,6 +10927,8 @@ where
10894
10927
10895
10928
pending_offers_messages: Mutex::new(Vec::new()),
10896
10929
10930
+ pending_broadcast_messages: Mutex::new(Vec::new()),
10931
+
10897
10932
entropy_source: args.entropy_source,
10898
10933
node_signer: args.node_signer,
10899
10934
signer_provider: args.signer_provider,
0 commit comments