Skip to content

Commit a84efca

Browse files
committed
Remove Runtime object and cleanup runtime handling
So far we instantiated a dedicated `Runtime` object which also held the BP. This was clunky and somewhat error-prone under different circumstances. As we now got rid of the BP object, we here remove the dedicated `Runtime` object and just pass around an `Arc<RwLock<Option<Runtime>>>` upon init, which allows us to get rid of the `set`/`drop` methods on `Wallet` as an added benefit.
1 parent 5b0747e commit a84efca

File tree

3 files changed

+77
-94
lines changed

3 files changed

+77
-94
lines changed

src/event.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use bitcoin::secp256k1::Secp256k1;
2222
use rand::{thread_rng, Rng};
2323
use std::collections::VecDeque;
2424
use std::ops::Deref;
25-
use std::sync::{Arc, Condvar, Mutex};
25+
use std::sync::{Arc, Condvar, Mutex, RwLock};
2626
use std::time::Duration;
2727

2828
/// An event emitted by [`Node`], which should be handled by the user.
@@ -226,7 +226,7 @@ where
226226
network_graph: Arc<NetworkGraph>,
227227
keys_manager: Arc<KeysManager>,
228228
payment_store: Arc<PaymentStore<K, L>>,
229-
tokio_runtime: Arc<tokio::runtime::Runtime>,
229+
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
230230
logger: L,
231231
_config: Arc<Config>,
232232
}
@@ -240,7 +240,7 @@ where
240240
wallet: Arc<Wallet<bdk::database::SqliteDatabase>>, event_queue: Arc<EventQueue<K, L>>,
241241
channel_manager: Arc<ChannelManager>, network_graph: Arc<NetworkGraph>,
242242
keys_manager: Arc<KeysManager>, payment_store: Arc<PaymentStore<K, L>>,
243-
tokio_runtime: Arc<tokio::runtime::Runtime>, logger: L, _config: Arc<Config>,
243+
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>, logger: L, _config: Arc<Config>,
244244
) -> Self {
245245
Self {
246246
event_queue,
@@ -250,7 +250,7 @@ where
250250
keys_manager,
251251
payment_store,
252252
logger,
253-
tokio_runtime,
253+
runtime,
254254
_config,
255255
}
256256
}
@@ -509,12 +509,17 @@ where
509509
let forwarding_channel_manager = self.channel_manager.clone();
510510
let min = time_forwardable.as_millis() as u64;
511511

512-
self.tokio_runtime.spawn(async move {
513-
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
514-
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
512+
let runtime_lock = self.runtime.read().unwrap();
513+
debug_assert!(runtime_lock.is_some());
515514

516-
forwarding_channel_manager.process_pending_htlc_forwards();
517-
});
515+
if let Some(runtime) = runtime_lock.as_ref() {
516+
runtime.spawn(async move {
517+
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
518+
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
519+
520+
forwarding_channel_manager.process_pending_htlc_forwards();
521+
});
522+
}
518523
}
519524
LdkEvent::SpendableOutputs { outputs } => {
520525
// TODO: We should eventually remember the outputs and supply them to the wallet's coin selection, once BDK allows us to do so.

src/lib.rs

Lines changed: 58 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,13 @@ impl Builder {
329329
EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP)
330330
.with_concurrency(BDK_CLIENT_CONCURRENCY);
331331

332-
let wallet = Arc::new(Wallet::new(blockchain, bdk_wallet, Arc::clone(&logger)));
332+
let runtime = Arc::new(RwLock::new(None));
333+
let wallet = Arc::new(Wallet::new(
334+
blockchain,
335+
bdk_wallet,
336+
Arc::clone(&runtime),
337+
Arc::clone(&logger),
338+
));
333339

334340
let kv_store = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into()));
335341

@@ -538,10 +544,11 @@ impl Builder {
538544
}
539545
};
540546

541-
let running = RwLock::new(None);
547+
let stop_running = Arc::new(AtomicBool::new(false));
542548

543549
Node {
544-
running,
550+
runtime,
551+
stop_running,
545552
config,
546553
wallet,
547554
tx_sync,
@@ -561,18 +568,12 @@ impl Builder {
561568
}
562569
}
563570

564-
/// Wraps all objects that need to be preserved during the run time of [`Node`]. Will be dropped
565-
/// upon [`Node::stop()`].
566-
struct Runtime {
567-
tokio_runtime: Arc<tokio::runtime::Runtime>,
568-
stop_runtime: Arc<AtomicBool>,
569-
}
570-
571571
/// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities.
572572
///
573573
/// Needs to be initialized and instantiated through [`Builder::build`].
574574
pub struct Node {
575-
running: RwLock<Option<Runtime>>,
575+
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
576+
stop_running: Arc<AtomicBool>,
576577
config: Arc<Config>,
577578
wallet: Arc<Wallet<bdk::database::SqliteDatabase>>,
578579
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -598,49 +599,15 @@ impl Node {
598599
/// a thread-safe manner.
599600
pub fn start(&self) -> Result<(), Error> {
600601
// Acquire a run lock and hold it until we're setup.
601-
let mut run_lock = self.running.write().unwrap();
602-
if run_lock.is_some() {
602+
let mut runtime_lock = self.runtime.write().unwrap();
603+
if runtime_lock.is_some() {
603604
// We're already running.
604605
return Err(Error::AlreadyRunning);
605606
}
606607

607-
let runtime = self.setup_runtime()?;
608-
*run_lock = Some(runtime);
609-
Ok(())
610-
}
611-
612-
/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
613-
///
614-
/// After this returns most API methods will return [`Error::NotRunning`].
615-
pub fn stop(&self) -> Result<(), Error> {
616-
let mut run_lock = self.running.write().unwrap();
617-
if run_lock.is_none() {
618-
return Err(Error::NotRunning);
619-
}
620-
621-
let runtime = run_lock.as_ref().unwrap();
622-
623-
// Stop the runtime.
624-
runtime.stop_runtime.store(true, Ordering::Release);
625-
626-
// Stop disconnect peers.
627-
self.peer_manager.disconnect_all_peers();
628-
629-
// Drop the held runtimes.
630-
self.wallet.drop_runtime();
608+
let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
631609

632-
// Drop the runtime, which stops the background processor and any possibly remaining tokio threads.
633-
*run_lock = None;
634-
Ok(())
635-
}
636-
637-
fn setup_runtime(&self) -> Result<Runtime, Error> {
638-
let tokio_runtime =
639-
Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap());
640-
641-
self.wallet.set_runtime(Arc::clone(&tokio_runtime));
642-
643-
let stop_runtime = Arc::new(AtomicBool::new(false));
610+
let stop_running = Arc::new(AtomicBool::new(false));
644611

645612
let event_handler = Arc::new(EventHandler::new(
646613
Arc::clone(&self.wallet),
@@ -649,7 +616,7 @@ impl Node {
649616
Arc::clone(&self.network_graph),
650617
Arc::clone(&self.keys_manager),
651618
Arc::clone(&self.payment_store),
652-
Arc::clone(&tokio_runtime),
619+
Arc::clone(&self.runtime),
653620
Arc::clone(&self.logger),
654621
Arc::clone(&self.config),
655622
));
@@ -660,7 +627,7 @@ impl Node {
660627
let sync_cman = Arc::clone(&self.channel_manager);
661628
let sync_cmon = Arc::clone(&self.chain_monitor);
662629
let sync_logger = Arc::clone(&self.logger);
663-
let stop_sync = Arc::clone(&stop_runtime);
630+
let stop_sync = Arc::clone(&stop_running);
664631

665632
std::thread::spawn(move || {
666633
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
@@ -691,8 +658,8 @@ impl Node {
691658
});
692659

693660
let sync_logger = Arc::clone(&self.logger);
694-
let stop_sync = Arc::clone(&stop_runtime);
695-
tokio_runtime.spawn(async move {
661+
let stop_sync = Arc::clone(&stop_running);
662+
runtime.spawn(async move {
696663
loop {
697664
if stop_sync.load(Ordering::Acquire) {
698665
return;
@@ -719,10 +686,10 @@ impl Node {
719686
if let Some(listening_address) = &self.config.listening_address {
720687
// Setup networking
721688
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
722-
let stop_listen = Arc::clone(&stop_runtime);
689+
let stop_listen = Arc::clone(&stop_running);
723690
let listening_address = listening_address.clone();
724691

725-
tokio_runtime.spawn(async move {
692+
runtime.spawn(async move {
726693
let listener =
727694
tokio::net::TcpListener::bind(listening_address).await.expect(
728695
"Failed to bind to listen address/port - is something else already listening on it?",
@@ -749,8 +716,8 @@ impl Node {
749716
let connect_pm = Arc::clone(&self.peer_manager);
750717
let connect_logger = Arc::clone(&self.logger);
751718
let connect_peer_store = Arc::clone(&self.peer_store);
752-
let stop_connect = Arc::clone(&stop_runtime);
753-
tokio_runtime.spawn(async move {
719+
let stop_connect = Arc::clone(&stop_running);
720+
runtime.spawn(async move {
754721
let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
755722
loop {
756723
if stop_connect.load(Ordering::Acquire) {
@@ -790,7 +757,7 @@ impl Node {
790757
let background_peer_man = Arc::clone(&self.peer_manager);
791758
let background_logger = Arc::clone(&self.logger);
792759
let background_scorer = Arc::clone(&self.scorer);
793-
let stop_background_processing = Arc::clone(&stop_runtime);
760+
let stop_background_processing = Arc::clone(&stop_running);
794761
let sleeper = move |d| {
795762
let stop = Arc::clone(&stop_background_processing);
796763
Box::pin(async move {
@@ -803,7 +770,7 @@ impl Node {
803770
})
804771
};
805772

806-
tokio_runtime.spawn(async move {
773+
runtime.spawn(async move {
807774
process_events_async(
808775
background_persister,
809776
|e| background_event_handler.handle_event(e),
@@ -820,7 +787,23 @@ impl Node {
820787
.expect("Failed to process events");
821788
});
822789

823-
Ok(Runtime { tokio_runtime, stop_runtime })
790+
*runtime_lock = Some(runtime);
791+
Ok(())
792+
}
793+
794+
/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
795+
///
796+
/// After this returns most API methods will return [`Error::NotRunning`].
797+
pub fn stop(&self) -> Result<(), Error> {
798+
let runtime = self.runtime.write().unwrap().take().ok_or(Error::NotRunning)?;
799+
// Stop the runtime.
800+
self.stop_running.store(true, Ordering::Release);
801+
runtime.shutdown_timeout(Duration::from_secs(10));
802+
803+
// Stop disconnect peers.
804+
self.peer_manager.disconnect_all_peers();
805+
806+
Ok(())
824807
}
825808

826809
/// Blocks until the next event is available.
@@ -870,12 +853,11 @@ impl Node {
870853
pub fn connect_open_channel(
871854
&self, node_pubkey_and_address: &str, channel_amount_sats: u64, announce_channel: bool,
872855
) -> Result<(), Error> {
873-
let runtime_lock = self.running.read().unwrap();
874-
if runtime_lock.is_none() {
856+
let rt_lock = self.runtime.read().unwrap();
857+
if rt_lock.is_none() {
875858
return Err(Error::NotRunning);
876859
}
877-
878-
let runtime = runtime_lock.as_ref().unwrap();
860+
let runtime = rt_lock.as_ref().unwrap();
879861

880862
let cur_balance = self.wallet.get_balance()?;
881863
if cur_balance.get_spendable() < channel_amount_sats {
@@ -893,7 +875,7 @@ impl Node {
893875
let con_pm = Arc::clone(&self.peer_manager);
894876

895877
tokio::task::block_in_place(move || {
896-
runtime.tokio_runtime.block_on(async move {
878+
runtime.block_on(async move {
897879
let res =
898880
connect_peer_if_necessary(con_peer_pubkey, con_peer_addr, con_pm, con_logger)
899881
.await;
@@ -947,10 +929,12 @@ impl Node {
947929
///
948930
/// Note that the wallets will be also synced regularly in the background.
949931
pub fn sync_wallets(&self) -> Result<(), Error> {
950-
let runtime_lock = self.running.read().unwrap();
951-
if runtime_lock.is_none() {
932+
let rt_lock = self.runtime.read().unwrap();
933+
if rt_lock.is_none() {
952934
return Err(Error::NotRunning);
953935
}
936+
let runtime = rt_lock.as_ref().unwrap();
937+
954938
let wallet = Arc::clone(&self.wallet);
955939
let tx_sync = Arc::clone(&self.tx_sync);
956940
let sync_cman = Arc::clone(&self.channel_manager);
@@ -961,7 +945,6 @@ impl Node {
961945
&*sync_cmon as &(dyn Confirm + Sync + Send),
962946
];
963947

964-
let runtime = runtime_lock.as_ref().unwrap();
965948
tokio::task::block_in_place(move || {
966949
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
967950
async move {
@@ -986,7 +969,7 @@ impl Node {
986969

987970
let sync_logger = Arc::clone(&self.logger);
988971
tokio::task::block_in_place(move || {
989-
runtime.tokio_runtime.block_on(async move {
972+
runtime.block_on(async move {
990973
let now = Instant::now();
991974
match tx_sync.sync(confirmables).await {
992975
Ok(()) => {
@@ -1021,7 +1004,8 @@ impl Node {
10211004

10221005
/// Send a payement given an invoice.
10231006
pub fn send_payment(&self, invoice: Invoice) -> Result<PaymentHash, Error> {
1024-
if self.running.read().unwrap().is_none() {
1007+
let rt_lock = self.runtime.read().unwrap();
1008+
if rt_lock.is_none() {
10251009
return Err(Error::NotRunning);
10261010
}
10271011

@@ -1087,7 +1071,8 @@ impl Node {
10871071
pub fn send_payment_using_amount(
10881072
&self, invoice: Invoice, amount_msat: u64,
10891073
) -> Result<PaymentHash, Error> {
1090-
if self.running.read().unwrap().is_none() {
1074+
let rt_lock = self.runtime.read().unwrap();
1075+
if rt_lock.is_none() {
10911076
return Err(Error::NotRunning);
10921077
}
10931078

@@ -1175,7 +1160,8 @@ impl Node {
11751160
pub fn send_spontaneous_payment(
11761161
&self, amount_msat: u64, node_id: &str,
11771162
) -> Result<PaymentHash, Error> {
1178-
if self.running.read().unwrap().is_none() {
1163+
let rt_lock = self.runtime.read().unwrap();
1164+
if rt_lock.is_none() {
11791165
return Err(Error::NotRunning);
11801166
}
11811167

src/wallet.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ where
3838
inner: Mutex<bdk::Wallet<D>>,
3939
// A cache storing the most recently retrieved fee rate estimations.
4040
fee_rate_cache: RwLock<HashMap<ConfirmationTarget, FeeRate>>,
41-
tokio_runtime: RwLock<Option<Arc<tokio::runtime::Runtime>>>,
41+
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
4242
sync_lock: (Mutex<()>, Condvar),
4343
logger: Arc<FilesystemLogger>,
4444
}
@@ -48,13 +48,13 @@ where
4848
D: BatchDatabase,
4949
{
5050
pub(crate) fn new(
51-
blockchain: EsploraBlockchain, wallet: bdk::Wallet<D>, logger: Arc<FilesystemLogger>,
51+
blockchain: EsploraBlockchain, wallet: bdk::Wallet<D>,
52+
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>, logger: Arc<FilesystemLogger>,
5253
) -> Self {
5354
let inner = Mutex::new(wallet);
5455
let fee_rate_cache = RwLock::new(HashMap::new());
55-
let tokio_runtime = RwLock::new(None);
5656
let sync_lock = (Mutex::new(()), Condvar::new());
57-
Self { blockchain, inner, fee_rate_cache, tokio_runtime, sync_lock, logger }
57+
Self { blockchain, inner, fee_rate_cache, runtime, sync_lock, logger }
5858
}
5959

6060
pub(crate) async fn sync(&self) -> Result<(), Error> {
@@ -115,14 +115,6 @@ where
115115
res
116116
}
117117

118-
pub(crate) fn set_runtime(&self, tokio_runtime: Arc<tokio::runtime::Runtime>) {
119-
*self.tokio_runtime.write().unwrap() = Some(tokio_runtime);
120-
}
121-
122-
pub(crate) fn drop_runtime(&self) {
123-
*self.tokio_runtime.write().unwrap() = None;
124-
}
125-
126118
pub(crate) async fn update_fee_estimates(&self) -> Result<(), Error> {
127119
let mut locked_fee_rate_cache = self.fee_rate_cache.write().unwrap();
128120

@@ -239,7 +231,7 @@ where
239231
D: BatchDatabase,
240232
{
241233
fn broadcast_transaction(&self, tx: &Transaction) {
242-
let locked_runtime = self.tokio_runtime.read().unwrap();
234+
let locked_runtime = self.runtime.read().unwrap();
243235
if locked_runtime.as_ref().is_none() {
244236
log_error!(self.logger, "Failed to broadcast transaction: No runtime.");
245237
return;

0 commit comments

Comments
 (0)