Skip to content

Commit 5b0747e

Browse files
committed
Switch to a future-based background processor
We switch to an async BP as upstream now allows us to.
1 parent 65d14dc commit 5b0747e

File tree

3 files changed

+66
-42
lines changed

3 files changed

+66
-42
lines changed

Cargo.toml

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,30 @@ description = "A ready-to-go node implementation built using LDK."
2525
#lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" }
2626
#lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["esplora-async"] }
2727

28-
lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506", features = ["max_level_trace", "std"] }
29-
lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506" }
30-
lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506" }
31-
lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506" }
32-
lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506" }
33-
lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506" }
34-
lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506", features = ["esplora-async"] }
28+
#lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506", features = ["max_level_trace", "std"] }
29+
#lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506" }
30+
#lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506" }
31+
#lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506" }
32+
#lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506", features = ["futures"] }
33+
#lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506" }
34+
#lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev="2ebbe6f30467cd7f0688673f7bfbac297e5c3506", features = ["esplora-async"] }
3535

3636
#lightning = { path = "../rust-lightning/lightning", features = ["max_level_trace", "std"] }
3737
#lightning-invoice = { path = "../rust-lightning/lightning-invoice" }
3838
#lightning-net-tokio = { path = "../rust-lightning/lightning-net-tokio" }
3939
#lightning-persister = { path = "../rust-lightning/lightning-persister" }
40-
#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor" }
40+
#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor", features = ["futures"] }
4141
#lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" }
4242
#lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync", features = ["esplora-async"] }
4343

44+
lightning = { git = "https://github.com/tnull/rust-lightning", branch="2023-04-fix-async-event-processing", features = ["max_level_trace", "std"] }
45+
lightning-invoice = { git = "https://github.com/tnull/rust-lightning", branch="2023-04-fix-async-event-processing" }
46+
lightning-net-tokio = { git = "https://github.com/tnull/rust-lightning", branch="2023-04-fix-async-event-processing" }
47+
lightning-persister = { git = "https://github.com/tnull/rust-lightning", branch="2023-04-fix-async-event-processing" }
48+
lightning-background-processor = { git = "https://github.com/tnull/rust-lightning", branch="2023-04-fix-async-event-processing", features = ["futures"] }
49+
lightning-rapid-gossip-sync = { git = "https://github.com/tnull/rust-lightning", branch="2023-04-fix-async-event-processing" }
50+
lightning-transaction-sync = { git = "https://github.com/tnull/rust-lightning", branch="2023-04-fix-async-event-processing", features = ["esplora-async"] }
51+
4452
bdk = { version = "0.28.0", default-features = false, features = ["std", "async-interface", "use-esplora-async", "sqlite-bundled"]}
4553
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
4654
rusqlite = { version = "0.28.0", features = ["bundled"] }

src/event.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use crate::logger::{log_error, log_info, Logger};
1111

1212
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
1313
use lightning::events::Event as LdkEvent;
14-
use lightning::events::EventHandler as LdkEventHandler;
1514
use lightning::events::PaymentPurpose;
1615
use lightning::impl_writeable_tlv_based_enum;
1716
use lightning::ln::PaymentHash;
@@ -255,14 +254,8 @@ where
255254
_config,
256255
}
257256
}
258-
}
259257

260-
impl<K: Deref + Clone, L: Deref> LdkEventHandler for EventHandler<K, L>
261-
where
262-
K::Target: KVStore,
263-
L::Target: Logger,
264-
{
265-
fn handle_event(&self, event: LdkEvent) {
258+
pub async fn handle_event(&self, event: LdkEvent) {
266259
match event {
267260
LdkEvent::FundingGenerationReady {
268261
temporary_channel_id,

src/lib.rs

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ use lightning::routing::utxo::UtxoLookup;
113113
use lightning::util::config::{ChannelHandshakeConfig, ChannelHandshakeLimits, UserConfig};
114114
use lightning::util::ser::ReadableArgs;
115115

116-
use lightning_background_processor::BackgroundProcessor;
116+
use lightning_background_processor::process_events_async;
117117
use lightning_background_processor::GossipSync as BPGossipSync;
118118

119119
use lightning_transaction_sync::EsploraSyncClient;
@@ -565,9 +565,7 @@ impl Builder {
565565
/// upon [`Node::stop()`].
566566
struct Runtime {
567567
tokio_runtime: Arc<tokio::runtime::Runtime>,
568-
_background_processor: BackgroundProcessor,
569-
stop_networking: Arc<AtomicBool>,
570-
stop_wallet_sync: Arc<AtomicBool>,
568+
stop_runtime: Arc<AtomicBool>,
571569
}
572570

573571
/// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities.
@@ -622,11 +620,10 @@ impl Node {
622620

623621
let runtime = run_lock.as_ref().unwrap();
624622

625-
// Stop wallet sync
626-
runtime.stop_wallet_sync.store(true, Ordering::Release);
623+
// Stop the runtime.
624+
runtime.stop_runtime.store(true, Ordering::Release);
627625

628-
// Stop networking
629-
runtime.stop_networking.store(true, Ordering::Release);
626+
// Stop disconnect peers.
630627
self.peer_manager.disconnect_all_peers();
631628

632629
// Drop the held runtimes.
@@ -643,6 +640,8 @@ impl Node {
643640

644641
self.wallet.set_runtime(Arc::clone(&tokio_runtime));
645642

643+
let stop_runtime = Arc::new(AtomicBool::new(false));
644+
646645
let event_handler = Arc::new(EventHandler::new(
647646
Arc::clone(&self.wallet),
648647
Arc::clone(&self.event_queue),
@@ -661,8 +660,7 @@ impl Node {
661660
let sync_cman = Arc::clone(&self.channel_manager);
662661
let sync_cmon = Arc::clone(&self.chain_monitor);
663662
let sync_logger = Arc::clone(&self.logger);
664-
let stop_wallet_sync = Arc::new(AtomicBool::new(false));
665-
let stop_sync = Arc::clone(&stop_wallet_sync);
663+
let stop_sync = Arc::clone(&stop_runtime);
666664

667665
std::thread::spawn(move || {
668666
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
@@ -693,7 +691,7 @@ impl Node {
693691
});
694692

695693
let sync_logger = Arc::clone(&self.logger);
696-
let stop_sync = Arc::clone(&stop_wallet_sync);
694+
let stop_sync = Arc::clone(&stop_runtime);
697695
tokio_runtime.spawn(async move {
698696
loop {
699697
if stop_sync.load(Ordering::Acquire) {
@@ -718,11 +716,10 @@ impl Node {
718716
}
719717
});
720718

721-
let stop_networking = Arc::new(AtomicBool::new(false));
722719
if let Some(listening_address) = &self.config.listening_address {
723720
// Setup networking
724721
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
725-
let stop_listen = Arc::clone(&stop_networking);
722+
let stop_listen = Arc::clone(&stop_runtime);
726723
let listening_address = listening_address.clone();
727724

728725
tokio_runtime.spawn(async move {
@@ -752,7 +749,7 @@ impl Node {
752749
let connect_pm = Arc::clone(&self.peer_manager);
753750
let connect_logger = Arc::clone(&self.logger);
754751
let connect_peer_store = Arc::clone(&self.peer_store);
755-
let stop_connect = Arc::clone(&stop_networking);
752+
let stop_connect = Arc::clone(&stop_runtime);
756753
tokio_runtime.spawn(async move {
757754
let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
758755
loop {
@@ -785,19 +782,45 @@ impl Node {
785782
});
786783

787784
// Setup background processing
788-
let _background_processor = BackgroundProcessor::start(
789-
Arc::clone(&self.kv_store),
790-
Arc::clone(&event_handler),
791-
Arc::clone(&self.chain_monitor),
792-
Arc::clone(&self.channel_manager),
793-
BPGossipSync::p2p(Arc::clone(&self.gossip_sync)),
794-
Arc::clone(&self.peer_manager),
795-
Arc::clone(&self.logger),
796-
Some(Arc::clone(&self.scorer)),
797-
);
785+
let background_persister = Arc::clone(&self.kv_store);
786+
let background_event_handler = Arc::clone(&event_handler);
787+
let background_chain_mon = Arc::clone(&self.chain_monitor);
788+
let background_chan_man = Arc::clone(&self.channel_manager);
789+
let background_gossip_sync = BPGossipSync::p2p(Arc::clone(&self.gossip_sync));
790+
let background_peer_man = Arc::clone(&self.peer_manager);
791+
let background_logger = Arc::clone(&self.logger);
792+
let background_scorer = Arc::clone(&self.scorer);
793+
let stop_background_processing = Arc::clone(&stop_runtime);
794+
let sleeper = move |d| {
795+
let stop = Arc::clone(&stop_background_processing);
796+
Box::pin(async move {
797+
if stop.load(Ordering::Acquire) {
798+
true
799+
} else {
800+
tokio::time::sleep(d).await;
801+
false
802+
}
803+
})
804+
};
805+
806+
tokio_runtime.spawn(async move {
807+
process_events_async(
808+
background_persister,
809+
|e| background_event_handler.handle_event(e),
810+
background_chain_mon,
811+
background_chan_man,
812+
background_gossip_sync,
813+
background_peer_man,
814+
background_logger,
815+
Some(background_scorer),
816+
sleeper,
817+
true,
818+
)
819+
.await
820+
.expect("Failed to process events");
821+
});
798822

799-
// TODO: frequently check back on background_processor if there was an error
800-
Ok(Runtime { tokio_runtime, _background_processor, stop_networking, stop_wallet_sync })
823+
Ok(Runtime { tokio_runtime, stop_runtime })
801824
}
802825

803826
/// Blocks until the next event is available.

0 commit comments

Comments
 (0)