Skip to content

Fix channel manager race panic #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 2, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 38 additions & 41 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager, Recipient};
use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus};
use lightning::chain::{BestBlock, Filter, Watch};
use lightning::chain::{Filter, Watch};
use lightning::ln::channelmanager;
use lightning::ln::channelmanager::{
ChainParameters, ChannelManagerReadArgs, SimpleArcChannelManager,
Expand Down Expand Up @@ -458,7 +458,12 @@ async fn start_ldk() {
// Step 7: Read ChannelMonitor state from disk
let mut channelmonitors = persister.read_channelmonitors(keys_manager.clone()).unwrap();

// Step 8: Initialize the ChannelManager
// Step 8: Poll for the best chain tip, which may be used by the channel manager & spv client
let polled_chain_tip = init::validate_best_block_header(bitcoind_client.as_ref())
.await
.expect("Failed to fetch best block header and best block");

// Step 9: Initialize the ChannelManager
let mut user_config = UserConfig::default();
user_config.channel_handshake_limits.force_announced_channel_preference = false;
let mut restarting_node = true;
Expand All @@ -481,15 +486,11 @@ async fn start_ldk() {
} else {
// We're starting a fresh node.
restarting_node = false;
let getinfo_resp = bitcoind_client.get_blockchain_info().await;

let chain_params = ChainParameters {
network: args.network,
best_block: BestBlock::new(
getinfo_resp.latest_blockhash,
getinfo_resp.latest_height as u32,
),
};

let polled_best_block = polled_chain_tip.to_best_block();
let polled_best_block_hash = polled_best_block.block_hash();
let chain_params =
ChainParameters { network: args.network, best_block: polled_best_block };
let fresh_channel_manager = channelmanager::ChannelManager::new(
fee_estimator.clone(),
chain_monitor.clone(),
Expand All @@ -499,15 +500,14 @@ async fn start_ldk() {
user_config,
chain_params,
);
(getinfo_resp.latest_blockhash, fresh_channel_manager)
(polled_best_block_hash, fresh_channel_manager)
}
};

// Step 9: Sync ChannelMonitors and ChannelManager to chain tip
// Step 10: Sync ChannelMonitors and ChannelManager to chain tip
let mut chain_listener_channel_monitors = Vec::new();
let mut cache = UnboundedCache::new();
let mut chain_tip: Option<poll::ValidatedBlockHeader> = None;
if restarting_node {
let chain_tip = if restarting_node {
let mut chain_listeners = vec![(
channel_manager_blockhash,
&channel_manager as &(dyn chain::Listen + Send + Sync),
Expand All @@ -528,19 +528,20 @@ async fn start_ldk() {
&monitor_listener_info.1 as &(dyn chain::Listen + Send + Sync),
));
}
chain_tip = Some(
init::synchronize_listeners(
bitcoind_client.as_ref(),
args.network,
&mut cache,
chain_listeners,
)
.await
.unwrap(),
);
}

// Step 10: Give ChannelMonitors to ChainMonitor
init::synchronize_listeners(
bitcoind_client.as_ref(),
args.network,
&mut cache,
chain_listeners,
)
.await
.unwrap()
} else {
polled_chain_tip
};

// Step 11: Give ChannelMonitors to ChainMonitor
for item in chain_listener_channel_monitors.drain(..) {
let channel_monitor = item.1 .0;
let funding_outpoint = item.2;
Expand All @@ -550,7 +551,7 @@ async fn start_ldk() {
);
}

// Step 11: Optional: Initialize the P2PGossipSync
// Step 12: Optional: Initialize the P2PGossipSync
let genesis = genesis_block(args.network).header.block_hash();
let network_graph_path = format!("{}/network_graph", ldk_data_dir.clone());
let network_graph =
Expand All @@ -561,7 +562,7 @@ async fn start_ldk() {
logger.clone(),
));

// Step 12: Initialize the PeerManager
// Step 13: Initialize the PeerManager
let channel_manager: Arc<ChannelManager> = Arc::new(channel_manager);
let onion_messenger: Arc<OnionMessenger> = Arc::new(OnionMessenger::new(
Arc::clone(&keys_manager),
Expand All @@ -586,7 +587,7 @@ async fn start_ldk() {
));

// ## Running LDK
// Step 13: Initialize networking
// Step 14: Initialize networking

let peer_manager_connection_handler = peer_manager.clone();
let listening_port = args.ldk_peer_listening_port;
Expand All @@ -612,26 +613,22 @@ async fn start_ldk() {
}
});

// Step 14: Connect and Disconnect Blocks
if chain_tip.is_none() {
chain_tip = Some(init::validate_best_block_header(bitcoind_client.as_ref()).await.unwrap());
}
// Step 15: Connect and Disconnect Blocks
let channel_manager_listener = channel_manager.clone();
let chain_monitor_listener = chain_monitor.clone();
let bitcoind_block_source = bitcoind_client.clone();
let network = args.network;
tokio::spawn(async move {
let chain_poller = poll::ChainPoller::new(bitcoind_block_source.as_ref(), network);
let chain_listener = (chain_monitor_listener, channel_manager_listener);
let mut spv_client =
SpvClient::new(chain_tip.unwrap(), chain_poller, &mut cache, &chain_listener);
let mut spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener);
loop {
spv_client.poll_best_tip().await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
}
});

// Step 15: Handle LDK Events
// Step 16: Handle LDK Events
let channel_manager_event_listener = channel_manager.clone();
let keys_manager_listener = keys_manager.clone();
// TODO: persist payment info to disk
Expand All @@ -656,15 +653,15 @@ async fn start_ldk() {
));
};

// Step 16: Initialize routing ProbabilisticScorer
// Step 17: Initialize routing ProbabilisticScorer
let scorer_path = format!("{}/scorer", ldk_data_dir.clone());
let scorer = Arc::new(Mutex::new(disk::read_scorer(
Path::new(&scorer_path),
Arc::clone(&network_graph),
Arc::clone(&logger),
)));

// Step 17: Create InvoicePayer
// Step 18: Create InvoicePayer
let router = DefaultRouter::new(
network_graph.clone(),
logger.clone(),
Expand All @@ -679,10 +676,10 @@ async fn start_ldk() {
payment::Retry::Timeout(Duration::from_secs(10)),
));

// Step 18: Persist ChannelManager and NetworkGraph
// Step 19: Persist ChannelManager and NetworkGraph
let persister = Arc::new(FilesystemPersister::new(ldk_data_dir.clone()));

// Step 19: Background Processing
// Step 20: Background Processing
let background_processor = BackgroundProcessor::start(
persister,
invoice_payer.clone(),
Expand Down