Skip to content

Commit f175e79

Browse files
committed
Remove Runtime object and cleanup runtime handling
So far we instantiated a dedicated `Runtime` object which also held the BP. This was clunky and somewhat error-prone under different circumstances. As we now got rid of the BP object, we here remove the dedicated `Runtime` object and just pass around an `Arc<RwLock<Option<Runtime>>>` upon init, which allows us to get rid of the `set`/`drop` methods on `Wallet` as an added benefit.
1 parent 91a18d0 commit f175e79

File tree

3 files changed

+83
-101
lines changed

3 files changed

+83
-101
lines changed

src/event.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use bitcoin::OutPoint;
2323
use rand::{thread_rng, Rng};
2424
use std::collections::VecDeque;
2525
use std::ops::Deref;
26-
use std::sync::{Arc, Condvar, Mutex};
26+
use std::sync::{Arc, Condvar, Mutex, RwLock};
2727
use std::time::Duration;
2828

2929
/// An event emitted by [`Node`], which should be handled by the user.
@@ -247,7 +247,7 @@ where
247247
network_graph: Arc<NetworkGraph>,
248248
keys_manager: Arc<KeysManager>,
249249
payment_store: Arc<PaymentStore<K, L>>,
250-
tokio_runtime: Arc<tokio::runtime::Runtime>,
250+
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
251251
logger: L,
252252
_config: Arc<Config>,
253253
}
@@ -261,7 +261,7 @@ where
261261
wallet: Arc<Wallet<bdk::database::SqliteDatabase>>, event_queue: Arc<EventQueue<K, L>>,
262262
channel_manager: Arc<ChannelManager>, network_graph: Arc<NetworkGraph>,
263263
keys_manager: Arc<KeysManager>, payment_store: Arc<PaymentStore<K, L>>,
264-
tokio_runtime: Arc<tokio::runtime::Runtime>, logger: L, _config: Arc<Config>,
264+
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>, logger: L, _config: Arc<Config>,
265265
) -> Self {
266266
Self {
267267
event_queue,
@@ -271,7 +271,7 @@ where
271271
keys_manager,
272272
payment_store,
273273
logger,
274-
tokio_runtime,
274+
runtime,
275275
_config,
276276
}
277277
}
@@ -531,12 +531,17 @@ where
531531
let forwarding_channel_manager = self.channel_manager.clone();
532532
let min = time_forwardable.as_millis() as u64;
533533

534-
self.tokio_runtime.spawn(async move {
535-
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
536-
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
534+
let runtime_lock = self.runtime.read().unwrap();
535+
debug_assert!(runtime_lock.is_some());
537536

538-
forwarding_channel_manager.process_pending_htlc_forwards();
539-
});
537+
if let Some(runtime) = runtime_lock.as_ref() {
538+
runtime.spawn(async move {
539+
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
540+
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
541+
542+
forwarding_channel_manager.process_pending_htlc_forwards();
543+
});
544+
}
540545
}
541546
LdkEvent::SpendableOutputs { outputs } => {
542547
// TODO: We should eventually remember the outputs and supply them to the wallet's coin selection, once BDK allows us to do so.

src/lib.rs

Lines changed: 64 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,13 @@ impl Builder {
347347
EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP)
348348
.with_concurrency(BDK_CLIENT_CONCURRENCY);
349349

350-
let wallet = Arc::new(Wallet::new(blockchain, bdk_wallet, Arc::clone(&logger)));
350+
let runtime = Arc::new(RwLock::new(None));
351+
let wallet = Arc::new(Wallet::new(
352+
blockchain,
353+
bdk_wallet,
354+
Arc::clone(&runtime),
355+
Arc::clone(&logger),
356+
));
351357

352358
let kv_store = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into()));
353359

@@ -556,10 +562,11 @@ impl Builder {
556562
}
557563
};
558564

559-
let running = RwLock::new(None);
565+
let stop_running = Arc::new(AtomicBool::new(false));
560566

561567
Node {
562-
running,
568+
runtime,
569+
stop_running,
563570
config,
564571
wallet,
565572
tx_sync,
@@ -579,18 +586,12 @@ impl Builder {
579586
}
580587
}
581588

582-
/// Wraps all objects that need to be preserved during the run time of [`Node`]. Will be dropped
583-
/// upon [`Node::stop()`].
584-
struct Runtime {
585-
tokio_runtime: Arc<tokio::runtime::Runtime>,
586-
stop_runtime: Arc<AtomicBool>,
587-
}
588-
589589
/// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities.
590590
///
591591
/// Needs to be initialized and instantiated through [`Builder::build`].
592592
pub struct Node {
593-
running: RwLock<Option<Runtime>>,
593+
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
594+
stop_running: Arc<AtomicBool>,
594595
config: Arc<Config>,
595596
wallet: Arc<Wallet<bdk::database::SqliteDatabase>>,
596597
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -616,49 +617,15 @@ impl Node {
616617
/// a thread-safe manner.
617618
pub fn start(&self) -> Result<(), Error> {
618619
// Acquire a run lock and hold it until we're setup.
619-
let mut run_lock = self.running.write().unwrap();
620-
if run_lock.is_some() {
620+
let mut runtime_lock = self.runtime.write().unwrap();
621+
if runtime_lock.is_some() {
621622
// We're already running.
622623
return Err(Error::AlreadyRunning);
623624
}
624625

625-
let runtime = self.setup_runtime()?;
626-
*run_lock = Some(runtime);
627-
Ok(())
628-
}
626+
let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
629627

630-
/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
631-
///
632-
/// After this returns most API methods will return [`Error::NotRunning`].
633-
pub fn stop(&self) -> Result<(), Error> {
634-
let mut run_lock = self.running.write().unwrap();
635-
if run_lock.is_none() {
636-
return Err(Error::NotRunning);
637-
}
638-
639-
let runtime = run_lock.as_ref().unwrap();
640-
641-
// Stop the runtime.
642-
runtime.stop_runtime.store(true, Ordering::Release);
643-
644-
// Stop disconnect peers.
645-
self.peer_manager.disconnect_all_peers();
646-
647-
// Drop the held runtimes.
648-
self.wallet.drop_runtime();
649-
650-
// Drop the runtime, which stops the background processor and any possibly remaining tokio threads.
651-
*run_lock = None;
652-
Ok(())
653-
}
654-
655-
fn setup_runtime(&self) -> Result<Runtime, Error> {
656-
let tokio_runtime =
657-
Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap());
658-
659-
self.wallet.set_runtime(Arc::clone(&tokio_runtime));
660-
661-
let stop_runtime = Arc::new(AtomicBool::new(false));
628+
let stop_running = Arc::new(AtomicBool::new(false));
662629

663630
let event_handler = Arc::new(EventHandler::new(
664631
Arc::clone(&self.wallet),
@@ -667,7 +634,7 @@ impl Node {
667634
Arc::clone(&self.network_graph),
668635
Arc::clone(&self.keys_manager),
669636
Arc::clone(&self.payment_store),
670-
Arc::clone(&tokio_runtime),
637+
Arc::clone(&self.runtime),
671638
Arc::clone(&self.logger),
672639
Arc::clone(&self.config),
673640
));
@@ -678,7 +645,7 @@ impl Node {
678645
let sync_cman = Arc::clone(&self.channel_manager);
679646
let sync_cmon = Arc::clone(&self.chain_monitor);
680647
let sync_logger = Arc::clone(&self.logger);
681-
let stop_sync = Arc::clone(&stop_runtime);
648+
let stop_sync = Arc::clone(&stop_running);
682649

683650
std::thread::spawn(move || {
684651
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
@@ -709,8 +676,8 @@ impl Node {
709676
});
710677

711678
let sync_logger = Arc::clone(&self.logger);
712-
let stop_sync = Arc::clone(&stop_runtime);
713-
tokio_runtime.spawn(async move {
679+
let stop_sync = Arc::clone(&stop_running);
680+
runtime.spawn(async move {
714681
loop {
715682
if stop_sync.load(Ordering::Acquire) {
716683
return;
@@ -737,10 +704,10 @@ impl Node {
737704
if let Some(listening_address) = &self.config.listening_address {
738705
// Setup networking
739706
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
740-
let stop_listen = Arc::clone(&stop_runtime);
707+
let stop_listen = Arc::clone(&stop_running);
741708
let listening_address = listening_address.clone();
742709

743-
tokio_runtime.spawn(async move {
710+
runtime.spawn(async move {
744711
let listener =
745712
tokio::net::TcpListener::bind(listening_address).await.expect(
746713
"Failed to bind to listen address/port - is something else already listening on it?",
@@ -767,8 +734,8 @@ impl Node {
767734
let connect_pm = Arc::clone(&self.peer_manager);
768735
let connect_logger = Arc::clone(&self.logger);
769736
let connect_peer_store = Arc::clone(&self.peer_store);
770-
let stop_connect = Arc::clone(&stop_runtime);
771-
tokio_runtime.spawn(async move {
737+
let stop_connect = Arc::clone(&stop_running);
738+
runtime.spawn(async move {
772739
let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
773740
loop {
774741
if stop_connect.load(Ordering::Acquire) {
@@ -808,7 +775,7 @@ impl Node {
808775
let background_peer_man = Arc::clone(&self.peer_manager);
809776
let background_logger = Arc::clone(&self.logger);
810777
let background_scorer = Arc::clone(&self.scorer);
811-
let stop_background_processing = Arc::clone(&stop_runtime);
778+
let stop_background_processing = Arc::clone(&stop_running);
812779
let sleeper = move |d| {
813780
let stop = Arc::clone(&stop_background_processing);
814781
Box::pin(async move {
@@ -821,7 +788,7 @@ impl Node {
821788
})
822789
};
823790

824-
tokio_runtime.spawn(async move {
791+
runtime.spawn(async move {
825792
process_events_async(
826793
background_persister,
827794
|e| background_event_handler.handle_event(e),
@@ -838,7 +805,23 @@ impl Node {
838805
.expect("Failed to process events");
839806
});
840807

841-
Ok(Runtime { tokio_runtime, stop_runtime })
808+
*runtime_lock = Some(runtime);
809+
Ok(())
810+
}
811+
812+
/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
813+
///
814+
/// After this returns most API methods will return [`Error::NotRunning`].
815+
pub fn stop(&self) -> Result<(), Error> {
816+
let runtime = self.runtime.write().unwrap().take().ok_or(Error::NotRunning)?;
817+
// Stop the runtime.
818+
self.stop_running.store(true, Ordering::Release);
819+
820+
// Stop disconnect peers.
821+
self.peer_manager.disconnect_all_peers();
822+
823+
runtime.shutdown_timeout(Duration::from_secs(10));
824+
Ok(())
842825
}
843826

844827
/// Blocks until the next event is available.
@@ -888,12 +871,11 @@ impl Node {
888871
pub fn connect(
889872
&self, node_id: PublicKey, address: SocketAddr, permanently: bool,
890873
) -> Result<(), Error> {
891-
let runtime_lock = self.running.read().unwrap();
892-
if runtime_lock.is_none() {
874+
let rt_lock = self.runtime.read().unwrap();
875+
if rt_lock.is_none() {
893876
return Err(Error::NotRunning);
894877
}
895-
896-
let runtime = runtime_lock.as_ref().unwrap();
878+
let runtime = rt_lock.as_ref().unwrap();
897879

898880
let peer_info = PeerInfo { pubkey: node_id, address };
899881

@@ -905,7 +887,7 @@ impl Node {
905887
let con_pm = Arc::clone(&self.peer_manager);
906888

907889
tokio::task::block_in_place(move || {
908-
runtime.tokio_runtime.block_on(async move {
890+
runtime.block_on(async move {
909891
let res =
910892
connect_peer_if_necessary(con_peer_pubkey, con_peer_addr, con_pm, con_logger)
911893
.await;
@@ -931,8 +913,8 @@ impl Node {
931913
/// Will also remove the peer from the peer store, i.e., after this has been called we won't
932914
/// try to reconnect on restart.
933915
pub fn disconnect(&self, counterparty_node_id: &PublicKey) -> Result<(), Error> {
934-
let runtime_lock = self.running.read().unwrap();
935-
if runtime_lock.is_none() {
916+
let rt_lock = self.runtime.read().unwrap();
917+
if rt_lock.is_none() {
936918
return Err(Error::NotRunning);
937919
}
938920

@@ -962,12 +944,11 @@ impl Node {
962944
&self, node_id: PublicKey, address: SocketAddr, channel_amount_sats: u64,
963945
push_to_counterparty_msat: Option<u64>, announce_channel: bool,
964946
) -> Result<(), Error> {
965-
let runtime_lock = self.running.read().unwrap();
966-
if runtime_lock.is_none() {
947+
let rt_lock = self.runtime.read().unwrap();
948+
if rt_lock.is_none() {
967949
return Err(Error::NotRunning);
968950
}
969-
970-
let runtime = runtime_lock.as_ref().unwrap();
951+
let runtime = rt_lock.as_ref().unwrap();
971952

972953
let cur_balance = self.wallet.get_balance()?;
973954
if cur_balance.get_spendable() < channel_amount_sats {
@@ -985,7 +966,7 @@ impl Node {
985966
let con_pm = Arc::clone(&self.peer_manager);
986967

987968
tokio::task::block_in_place(move || {
988-
runtime.tokio_runtime.block_on(async move {
969+
runtime.block_on(async move {
989970
let res =
990971
connect_peer_if_necessary(con_peer_pubkey, con_peer_addr, con_pm, con_logger)
991972
.await;
@@ -1040,10 +1021,12 @@ impl Node {
10401021
///
10411022
/// Note that the wallets will be also synced regularly in the background.
10421023
pub fn sync_wallets(&self) -> Result<(), Error> {
1043-
let runtime_lock = self.running.read().unwrap();
1044-
if runtime_lock.is_none() {
1024+
let rt_lock = self.runtime.read().unwrap();
1025+
if rt_lock.is_none() {
10451026
return Err(Error::NotRunning);
10461027
}
1028+
let runtime = rt_lock.as_ref().unwrap();
1029+
10471030
let wallet = Arc::clone(&self.wallet);
10481031
let tx_sync = Arc::clone(&self.tx_sync);
10491032
let sync_cman = Arc::clone(&self.channel_manager);
@@ -1054,7 +1037,6 @@ impl Node {
10541037
&*sync_cmon as &(dyn Confirm + Sync + Send),
10551038
];
10561039

1057-
let runtime = runtime_lock.as_ref().unwrap();
10581040
tokio::task::block_in_place(move || {
10591041
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
10601042
async move {
@@ -1079,7 +1061,7 @@ impl Node {
10791061

10801062
let sync_logger = Arc::clone(&self.logger);
10811063
tokio::task::block_in_place(move || {
1082-
runtime.tokio_runtime.block_on(async move {
1064+
runtime.block_on(async move {
10831065
let now = Instant::now();
10841066
match tx_sync.sync(confirmables).await {
10851067
Ok(()) => {
@@ -1114,7 +1096,8 @@ impl Node {
11141096

11151097
/// Send a payement given an invoice.
11161098
pub fn send_payment(&self, invoice: &Invoice) -> Result<PaymentHash, Error> {
1117-
if self.running.read().unwrap().is_none() {
1099+
let rt_lock = self.runtime.read().unwrap();
1100+
if rt_lock.is_none() {
11181101
return Err(Error::NotRunning);
11191102
}
11201103

@@ -1180,7 +1163,8 @@ impl Node {
11801163
pub fn send_payment_using_amount(
11811164
&self, invoice: &Invoice, amount_msat: u64,
11821165
) -> Result<PaymentHash, Error> {
1183-
if self.running.read().unwrap().is_none() {
1166+
let rt_lock = self.runtime.read().unwrap();
1167+
if rt_lock.is_none() {
11841168
return Err(Error::NotRunning);
11851169
}
11861170

@@ -1268,7 +1252,8 @@ impl Node {
12681252
pub fn send_spontaneous_payment(
12691253
&self, amount_msat: u64, node_id: &PublicKey,
12701254
) -> Result<PaymentHash, Error> {
1271-
if self.running.read().unwrap().is_none() {
1255+
let rt_lock = self.runtime.read().unwrap();
1256+
if rt_lock.is_none() {
12721257
return Err(Error::NotRunning);
12731258
}
12741259

0 commit comments

Comments
 (0)