Skip to content

Commit 91a18d0

Browse files
committed
Switch to a future-based background processor
We switch to an async BP as upstream now allows us to.
1 parent 09825ac commit 91a18d0

File tree

3 files changed

+52
-36
lines changed

3 files changed

+52
-36
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ lightning = { version = "0.0.115", features = ["max_level_trace", "std"] }
1313
lightning-invoice = { version = "0.23" }
1414
lightning-net-tokio = { version = "0.0.115" }
1515
lightning-persister = { version = "0.0.115" }
16-
lightning-background-processor = { version = "0.0.115" }
16+
lightning-background-processor = { version = "0.0.115", features = ["futures"] }
1717
lightning-rapid-gossip-sync = { version = "0.0.115" }
1818
lightning-transaction-sync = { version = "0.0.115", features = ["esplora-async-https"] }
1919

@@ -29,7 +29,7 @@ lightning-transaction-sync = { version = "0.0.115", features = ["esplora-async-h
2929
#lightning-invoice = { path = "../rust-lightning/lightning-invoice" }
3030
#lightning-net-tokio = { path = "../rust-lightning/lightning-net-tokio" }
3131
#lightning-persister = { path = "../rust-lightning/lightning-persister" }
32-
#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor" }
32+
#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor", features = ["futures"] }
3333
#lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" }
3434
#lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync", features = ["esplora-async"] }
3535

src/event.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use crate::logger::{log_error, log_info, Logger};
1111

1212
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
1313
use lightning::events::Event as LdkEvent;
14-
use lightning::events::EventHandler as LdkEventHandler;
1514
use lightning::events::PaymentPurpose;
1615
use lightning::impl_writeable_tlv_based_enum;
1716
use lightning::ln::PaymentHash;
@@ -276,14 +275,8 @@ where
276275
_config,
277276
}
278277
}
279-
}
280278

281-
impl<K: Deref + Clone, L: Deref> LdkEventHandler for EventHandler<K, L>
282-
where
283-
K::Target: KVStore,
284-
L::Target: Logger,
285-
{
286-
fn handle_event(&self, event: LdkEvent) {
279+
pub async fn handle_event(&self, event: LdkEvent) {
287280
match event {
288281
LdkEvent::FundingGenerationReady {
289282
temporary_channel_id,

src/lib.rs

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ use lightning::routing::utxo::UtxoLookup;
117117
use lightning::util::config::{ChannelHandshakeConfig, ChannelHandshakeLimits, UserConfig};
118118
use lightning::util::ser::ReadableArgs;
119119

120-
use lightning_background_processor::BackgroundProcessor;
120+
use lightning_background_processor::process_events_async;
121121
use lightning_background_processor::GossipSync as BPGossipSync;
122122

123123
use lightning_transaction_sync::EsploraSyncClient;
@@ -583,9 +583,7 @@ impl Builder {
583583
/// upon [`Node::stop()`].
584584
struct Runtime {
585585
tokio_runtime: Arc<tokio::runtime::Runtime>,
586-
_background_processor: BackgroundProcessor,
587-
stop_networking: Arc<AtomicBool>,
588-
stop_wallet_sync: Arc<AtomicBool>,
586+
stop_runtime: Arc<AtomicBool>,
589587
}
590588

591589
/// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities.
@@ -640,11 +638,10 @@ impl Node {
640638

641639
let runtime = run_lock.as_ref().unwrap();
642640

643-
// Stop wallet sync
644-
runtime.stop_wallet_sync.store(true, Ordering::Release);
641+
// Stop the runtime.
642+
runtime.stop_runtime.store(true, Ordering::Release);
645643

646-
// Stop networking
647-
runtime.stop_networking.store(true, Ordering::Release);
644+
// Stop disconnect peers.
648645
self.peer_manager.disconnect_all_peers();
649646

650647
// Drop the held runtimes.
@@ -661,6 +658,8 @@ impl Node {
661658

662659
self.wallet.set_runtime(Arc::clone(&tokio_runtime));
663660

661+
let stop_runtime = Arc::new(AtomicBool::new(false));
662+
664663
let event_handler = Arc::new(EventHandler::new(
665664
Arc::clone(&self.wallet),
666665
Arc::clone(&self.event_queue),
@@ -679,8 +678,7 @@ impl Node {
679678
let sync_cman = Arc::clone(&self.channel_manager);
680679
let sync_cmon = Arc::clone(&self.chain_monitor);
681680
let sync_logger = Arc::clone(&self.logger);
682-
let stop_wallet_sync = Arc::new(AtomicBool::new(false));
683-
let stop_sync = Arc::clone(&stop_wallet_sync);
681+
let stop_sync = Arc::clone(&stop_runtime);
684682

685683
std::thread::spawn(move || {
686684
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
@@ -711,7 +709,7 @@ impl Node {
711709
});
712710

713711
let sync_logger = Arc::clone(&self.logger);
714-
let stop_sync = Arc::clone(&stop_wallet_sync);
712+
let stop_sync = Arc::clone(&stop_runtime);
715713
tokio_runtime.spawn(async move {
716714
loop {
717715
if stop_sync.load(Ordering::Acquire) {
@@ -736,11 +734,10 @@ impl Node {
736734
}
737735
});
738736

739-
let stop_networking = Arc::new(AtomicBool::new(false));
740737
if let Some(listening_address) = &self.config.listening_address {
741738
// Setup networking
742739
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
743-
let stop_listen = Arc::clone(&stop_networking);
740+
let stop_listen = Arc::clone(&stop_runtime);
744741
let listening_address = listening_address.clone();
745742

746743
tokio_runtime.spawn(async move {
@@ -770,7 +767,7 @@ impl Node {
770767
let connect_pm = Arc::clone(&self.peer_manager);
771768
let connect_logger = Arc::clone(&self.logger);
772769
let connect_peer_store = Arc::clone(&self.peer_store);
773-
let stop_connect = Arc::clone(&stop_networking);
770+
let stop_connect = Arc::clone(&stop_runtime);
774771
tokio_runtime.spawn(async move {
775772
let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
776773
loop {
@@ -803,19 +800,45 @@ impl Node {
803800
});
804801

805802
// Setup background processing
806-
let _background_processor = BackgroundProcessor::start(
807-
Arc::clone(&self.kv_store),
808-
Arc::clone(&event_handler),
809-
Arc::clone(&self.chain_monitor),
810-
Arc::clone(&self.channel_manager),
811-
BPGossipSync::p2p(Arc::clone(&self.gossip_sync)),
812-
Arc::clone(&self.peer_manager),
813-
Arc::clone(&self.logger),
814-
Some(Arc::clone(&self.scorer)),
815-
);
803+
let background_persister = Arc::clone(&self.kv_store);
804+
let background_event_handler = Arc::clone(&event_handler);
805+
let background_chain_mon = Arc::clone(&self.chain_monitor);
806+
let background_chan_man = Arc::clone(&self.channel_manager);
807+
let background_gossip_sync = BPGossipSync::p2p(Arc::clone(&self.gossip_sync));
808+
let background_peer_man = Arc::clone(&self.peer_manager);
809+
let background_logger = Arc::clone(&self.logger);
810+
let background_scorer = Arc::clone(&self.scorer);
811+
let stop_background_processing = Arc::clone(&stop_runtime);
812+
let sleeper = move |d| {
813+
let stop = Arc::clone(&stop_background_processing);
814+
Box::pin(async move {
815+
if stop.load(Ordering::Acquire) {
816+
true
817+
} else {
818+
tokio::time::sleep(d).await;
819+
false
820+
}
821+
})
822+
};
823+
824+
tokio_runtime.spawn(async move {
825+
process_events_async(
826+
background_persister,
827+
|e| background_event_handler.handle_event(e),
828+
background_chain_mon,
829+
background_chan_man,
830+
background_gossip_sync,
831+
background_peer_man,
832+
background_logger,
833+
Some(background_scorer),
834+
sleeper,
835+
true,
836+
)
837+
.await
838+
.expect("Failed to process events");
839+
});
816840

817-
// TODO: frequently check back on background_processor if there was an error
818-
Ok(Runtime { tokio_runtime, _background_processor, stop_networking, stop_wallet_sync })
841+
Ok(Runtime { tokio_runtime, stop_runtime })
819842
}
820843

821844
/// Blocks until the next event is available.

0 commit comments

Comments
 (0)