Skip to content

Commit 0afe5ae

Browse files
committed
Introduce TransactionBroadcaster
We introduce a separate `TransactionBroadcaster` which will regularly process a message queue of transactions to be broadcast. This allows us to a) decouple broadcasting from the BDK wallet and b) add transactions to the queue from a blocking context, without the need to schlep around a separate runtime or mess with it.
1 parent 5dda29c commit 0afe5ae

File tree

6 files changed

+156
-75
lines changed

6 files changed

+156
-75
lines changed

src/builder.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::io::sqlite_store::SqliteStore;
55
use crate::logger::{log_error, FilesystemLogger, Logger};
66
use crate::payment_store::PaymentStore;
77
use crate::peer_store::PeerStore;
8+
use crate::tx_broadcaster::TransactionBroadcaster;
89
use crate::types::{
910
ChainMonitor, ChannelManager, FakeMessageRouter, GossipSync, KeysManager, NetworkGraph,
1011
OnionMessenger, PeerManager,
@@ -464,13 +465,17 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
464465
BuildError::WalletSetupFailed
465466
})?;
466467

467-
let (blockchain, tx_sync) = match chain_data_source_config {
468+
let (blockchain, tx_sync, tx_broadcaster) = match chain_data_source_config {
468469
Some(ChainDataSourceConfig::Esplora(server_url)) => {
469470
let tx_sync = Arc::new(EsploraSyncClient::new(server_url.clone(), Arc::clone(&logger)));
470471
let blockchain =
471472
EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP)
472473
.with_concurrency(BDK_CLIENT_CONCURRENCY);
473-
(blockchain, tx_sync)
474+
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
475+
tx_sync.client().clone(),
476+
Arc::clone(&logger),
477+
));
478+
(blockchain, tx_sync, tx_broadcaster)
474479
}
475480
None => {
476481
// Default to Esplora client.
@@ -479,18 +484,26 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
479484
let blockchain =
480485
EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP)
481486
.with_concurrency(BDK_CLIENT_CONCURRENCY);
482-
(blockchain, tx_sync)
487+
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
488+
tx_sync.client().clone(),
489+
Arc::clone(&logger),
490+
));
491+
(blockchain, tx_sync, tx_broadcaster)
483492
}
484493
};
485494

486495
let runtime = Arc::new(RwLock::new(None));
487-
let wallet =
488-
Arc::new(Wallet::new(blockchain, bdk_wallet, Arc::clone(&runtime), Arc::clone(&logger)));
496+
let wallet = Arc::new(Wallet::new(
497+
blockchain,
498+
bdk_wallet,
499+
Arc::clone(&tx_broadcaster),
500+
Arc::clone(&logger),
501+
));
489502

490503
// Initialize the ChainMonitor
491504
let chain_monitor: Arc<ChainMonitor<K>> = Arc::new(chainmonitor::ChainMonitor::new(
492505
Some(Arc::clone(&tx_sync)),
493-
Arc::clone(&wallet),
506+
Arc::clone(&tx_broadcaster),
494507
Arc::clone(&logger),
495508
Arc::clone(&wallet),
496509
Arc::clone(&kv_store),
@@ -594,7 +607,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
594607
Arc::clone(&keys_manager),
595608
Arc::clone(&wallet),
596609
Arc::clone(&chain_monitor),
597-
Arc::clone(&wallet),
610+
Arc::clone(&tx_broadcaster),
598611
Arc::clone(&router),
599612
Arc::clone(&logger),
600613
user_config,
@@ -618,7 +631,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
618631
channelmanager::ChannelManager::new(
619632
Arc::clone(&wallet),
620633
Arc::clone(&chain_monitor),
621-
Arc::clone(&wallet),
634+
Arc::clone(&tx_broadcaster),
622635
Arc::clone(&router),
623636
Arc::clone(&logger),
624637
Arc::clone(&keys_manager),
@@ -767,6 +780,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
767780
config,
768781
wallet,
769782
tx_sync,
783+
tx_broadcaster,
770784
event_queue,
771785
channel_manager,
772786
chain_monitor,

src/event.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::types::Wallet;
1+
use crate::types::{Broadcaster, Wallet};
22
use crate::{hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, UserChannelId};
33

44
use crate::payment_store::{
@@ -241,9 +241,10 @@ pub(crate) struct EventHandler<K: KVStore + Sync + Send, L: Deref>
241241
where
242242
L::Target: Logger,
243243
{
244-
wallet: Arc<Wallet>,
245244
event_queue: Arc<EventQueue<K, L>>,
245+
wallet: Arc<Wallet>,
246246
channel_manager: Arc<ChannelManager<K>>,
247+
tx_broadcaster: Arc<Broadcaster>,
247248
network_graph: Arc<NetworkGraph>,
248249
keys_manager: Arc<KeysManager>,
249250
payment_store: Arc<PaymentStore<K, L>>,
@@ -257,15 +258,17 @@ where
257258
L::Target: Logger,
258259
{
259260
pub fn new(
260-
wallet: Arc<Wallet>, event_queue: Arc<EventQueue<K, L>>,
261-
channel_manager: Arc<ChannelManager<K>>, network_graph: Arc<NetworkGraph>,
262-
keys_manager: Arc<KeysManager>, payment_store: Arc<PaymentStore<K, L>>,
261+
event_queue: Arc<EventQueue<K, L>>, wallet: Arc<Wallet>,
262+
channel_manager: Arc<ChannelManager<K>>, tx_broadcaster: Arc<Broadcaster>,
263+
network_graph: Arc<NetworkGraph>, keys_manager: Arc<KeysManager>,
264+
payment_store: Arc<PaymentStore<K, L>>,
263265
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>, logger: L, config: Arc<Config>,
264266
) -> Self {
265267
Self {
266268
event_queue,
267269
wallet,
268270
channel_manager,
271+
tx_broadcaster,
269272
network_graph,
270273
keys_manager,
271274
payment_store,
@@ -598,7 +601,9 @@ where
598601
);
599602

600603
match res {
601-
Ok(Some(spending_tx)) => self.wallet.broadcast_transactions(&[&spending_tx]),
604+
Ok(Some(spending_tx)) => {
605+
self.tx_broadcaster.broadcast_transactions(&[&spending_tx])
606+
}
602607
Ok(None) => {
603608
log_debug!(self.logger, "Omitted spending static outputs: {:?}", outputs);
604609
}

src/lib.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ mod payment_store;
8686
mod peer_store;
8787
#[cfg(test)]
8888
mod test;
89+
mod tx_broadcaster;
8990
mod types;
9091
#[cfg(feature = "uniffi")]
9192
mod uniffi_types;
@@ -119,7 +120,8 @@ use payment_store::PaymentStore;
119120
pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus};
120121
use peer_store::{PeerInfo, PeerStore};
121122
use types::{
122-
ChainMonitor, ChannelManager, KeysManager, NetworkGraph, PeerManager, Router, Scorer, Wallet,
123+
Broadcaster, ChainMonitor, ChannelManager, KeysManager, NetworkGraph, PeerManager, Router,
124+
Scorer, Wallet,
123125
};
124126
pub use types::{ChannelDetails, PeerDetails, UserChannelId};
125127

@@ -287,6 +289,7 @@ pub struct Node<K: KVStore + Sync + Send + 'static> {
287289
config: Arc<Config>,
288290
wallet: Arc<Wallet>,
289291
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
292+
tx_broadcaster: Arc<Broadcaster>,
290293
event_queue: Arc<EventQueue<K, Arc<FilesystemLogger>>>,
291294
channel_manager: Arc<ChannelManager<K>>,
292295
chain_monitor: Arc<ChainMonitor<K>>,
@@ -647,10 +650,29 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
647650
}
648651
});
649652

653+
let mut stop_tx_bcast = self.stop_receiver.clone();
654+
let tx_bcaster = Arc::clone(&self.tx_broadcaster);
655+
runtime.spawn(async move {
656+
// Every second we try to clear our broadcasting queue.
657+
let mut interval = tokio::time::interval(Duration::from_secs(1));
658+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
659+
loop {
660+
tokio::select! {
661+
_ = stop_tx_bcast.changed() => {
662+
return;
663+
}
664+
_ = interval.tick() => {
665+
tx_bcaster.process_next_package().await;
666+
}
667+
}
668+
}
669+
});
670+
650671
let event_handler = Arc::new(EventHandler::new(
651-
Arc::clone(&self.wallet),
652672
Arc::clone(&self.event_queue),
673+
Arc::clone(&self.wallet),
653674
Arc::clone(&self.channel_manager),
675+
Arc::clone(&self.tx_broadcaster),
654676
Arc::clone(&self.network_graph),
655677
Arc::clone(&self.keys_manager),
656678
Arc::clone(&self.payment_store),

src/tx_broadcaster.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use crate::logger::{log_error, log_trace, Logger};
2+
3+
use lightning::chain::chaininterface::BroadcasterInterface;
4+
5+
use esplora_client::AsyncClient as EsploraClient;
6+
7+
use bitcoin::Transaction;
8+
9+
use tokio::sync::mpsc;
10+
use tokio::sync::Mutex;
11+
12+
use std::ops::Deref;
13+
14+
const BCAST_PACKAGE_QUEUE_SIZE: usize = 50;
15+
16+
pub(crate) struct TransactionBroadcaster<L: Deref>
17+
where
18+
L::Target: Logger,
19+
{
20+
queue_sender: mpsc::Sender<Vec<Transaction>>,
21+
queue_receiver: Mutex<mpsc::Receiver<Vec<Transaction>>>,
22+
esplora_client: EsploraClient,
23+
logger: L,
24+
}
25+
26+
impl<L: Deref> TransactionBroadcaster<L>
27+
where
28+
L::Target: Logger,
29+
{
30+
pub(crate) fn new(esplora_client: EsploraClient, logger: L) -> Self {
31+
let (queue_sender, queue_receiver) = mpsc::channel(BCAST_PACKAGE_QUEUE_SIZE);
32+
Self { queue_sender, queue_receiver: Mutex::new(queue_receiver), esplora_client, logger }
33+
}
34+
35+
pub(crate) async fn process_next_package(&self) {
36+
let mut receiver = self.queue_receiver.lock().await;
37+
if let Some(next_package) = receiver.recv().await {
38+
for tx in &next_package {
39+
match self.esplora_client.broadcast(tx).await {
40+
Ok(()) => {
41+
log_trace!(self.logger, "Successfully broadcast transaction {}", tx.txid());
42+
}
43+
Err(e) => {
44+
log_error!(
45+
self.logger,
46+
"Failed to broadcast transaction {}: {}",
47+
tx.txid(),
48+
e
49+
);
50+
}
51+
}
52+
}
53+
}
54+
}
55+
}
56+
57+
impl<L: Deref> BroadcasterInterface for TransactionBroadcaster<L>
58+
where
59+
L::Target: Logger,
60+
{
61+
fn broadcast_transactions(&self, txs: &[&Transaction]) {
62+
let package = txs.iter().map(|&t| t.clone()).collect::<Vec<Transaction>>();
63+
self.queue_sender.try_send(package).unwrap_or_else(|e| {
64+
log_error!(self.logger, "Failed to broadcast transactions: {}", e);
65+
});
66+
}
67+
}

src/types.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::sync::{Arc, Mutex, RwLock};
2424
pub(crate) type ChainMonitor<K> = chainmonitor::ChainMonitor<
2525
InMemorySigner,
2626
Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
27-
Arc<Wallet>,
27+
Arc<Broadcaster>,
2828
Arc<Wallet>,
2929
Arc<FilesystemLogger>,
3030
Arc<K>,
@@ -42,7 +42,7 @@ pub(crate) type PeerManager<K> = lightning::ln::peer_handler::PeerManager<
4242

4343
pub(crate) type ChannelManager<K> = lightning::ln::channelmanager::ChannelManager<
4444
Arc<ChainMonitor<K>>,
45-
Arc<Wallet>,
45+
Arc<Broadcaster>,
4646
Arc<KeysManager>,
4747
Arc<KeysManager>,
4848
Arc<KeysManager>,
@@ -51,11 +51,16 @@ pub(crate) type ChannelManager<K> = lightning::ln::channelmanager::ChannelManage
5151
Arc<FilesystemLogger>,
5252
>;
5353

54+
pub(crate) type Broadcaster = crate::tx_broadcaster::TransactionBroadcaster<Arc<FilesystemLogger>>;
55+
5456
pub(crate) type Wallet =
55-
crate::wallet::Wallet<bdk::database::SqliteDatabase, Arc<FilesystemLogger>>;
57+
crate::wallet::Wallet<bdk::database::SqliteDatabase, Arc<Broadcaster>, Arc<FilesystemLogger>>;
5658

57-
pub(crate) type KeysManager =
58-
crate::wallet::WalletKeysManager<bdk::database::SqliteDatabase, Arc<FilesystemLogger>>;
59+
pub(crate) type KeysManager = crate::wallet::WalletKeysManager<
60+
bdk::database::SqliteDatabase,
61+
Arc<Broadcaster>,
62+
Arc<FilesystemLogger>,
63+
>;
5964

6065
pub(crate) type Router = DefaultRouter<
6166
Arc<NetworkGraph>,

0 commit comments

Comments
 (0)