Skip to content

Commit 15f4f2a

Browse files
authored
Merge pull request #152 from tnull/2023-08-sweep-sweep-sweep
Add `OutputSweeper` persisting and spending outputs
2 parents 4685ad1 + 707c170 commit 15f4f2a

File tree

8 files changed

+556
-77
lines changed

8 files changed

+556
-77
lines changed

src/builder.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::io::sqlite_store::SqliteStore;
66
use crate::logger::{log_error, FilesystemLogger, Logger};
77
use crate::payment_store::PaymentStore;
88
use crate::peer_store::PeerStore;
9+
use crate::sweep::OutputSweeper;
910
use crate::tx_broadcaster::TransactionBroadcaster;
1011
use crate::types::{
1112
ChainMonitor, ChannelManager, FakeMessageRouter, GossipSync, KeysManager, NetworkGraph,
@@ -777,6 +778,25 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
777778
}
778779
};
779780

781+
let best_block = channel_manager.current_best_block();
782+
let output_sweeper =
783+
match io::utils::read_spendable_outputs(Arc::clone(&kv_store), Arc::clone(&logger)) {
784+
Ok(outputs) => Arc::new(OutputSweeper::new(
785+
outputs,
786+
Arc::clone(&wallet),
787+
Arc::clone(&tx_broadcaster),
788+
Arc::clone(&fee_estimator),
789+
Arc::clone(&keys_manager),
790+
Arc::clone(&kv_store),
791+
best_block,
792+
Some(Arc::clone(&tx_sync)),
793+
Arc::clone(&logger),
794+
)),
795+
Err(_) => {
796+
return Err(BuildError::ReadFailed);
797+
}
798+
};
799+
780800
let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
781801

782802
Ok(Node {
@@ -791,6 +811,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
791811
event_queue,
792812
channel_manager,
793813
chain_monitor,
814+
output_sweeper,
794815
peer_manager,
795816
keys_manager,
796817
network_graph,

src/event.rs

Lines changed: 12 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
use crate::types::{Broadcaster, FeeEstimator, Wallet};
1+
use crate::types::{Sweeper, Wallet};
22
use crate::{
3-
hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, PeerInfo, PeerStore,
4-
UserChannelId,
3+
hex_utils, ChannelManager, Config, Error, NetworkGraph, PeerInfo, PeerStore, UserChannelId,
54
};
65

76
use crate::payment_store::{
@@ -12,11 +11,9 @@ use crate::io::{
1211
EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
1312
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
1413
};
15-
use crate::logger::{log_debug, log_error, log_info, Logger};
14+
use crate::logger::{log_error, log_info, Logger};
1615

17-
use lightning::chain::chaininterface::{
18-
BroadcasterInterface, ConfirmationTarget, FeeEstimator as LDKFeeEstimator,
19-
};
16+
use lightning::chain::chaininterface::ConfirmationTarget;
2017
use lightning::events::Event as LdkEvent;
2118
use lightning::events::PaymentPurpose;
2219
use lightning::impl_writeable_tlv_based_enum;
@@ -26,8 +23,8 @@ use lightning::util::errors::APIError;
2623
use lightning::util::persist::KVStore;
2724
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
2825

29-
use bitcoin::secp256k1::{PublicKey, Secp256k1};
30-
use bitcoin::{LockTime, OutPoint, PackedLockTime};
26+
use bitcoin::secp256k1::PublicKey;
27+
use bitcoin::{LockTime, OutPoint};
3128
use rand::{thread_rng, Rng};
3229
use std::collections::VecDeque;
3330
use std::ops::Deref;
@@ -249,10 +246,8 @@ where
249246
event_queue: Arc<EventQueue<K, L>>,
250247
wallet: Arc<Wallet>,
251248
channel_manager: Arc<ChannelManager<K>>,
252-
tx_broadcaster: Arc<Broadcaster>,
253-
fee_estimator: Arc<FeeEstimator>,
249+
output_sweeper: Arc<Sweeper<K>>,
254250
network_graph: Arc<NetworkGraph>,
255-
keys_manager: Arc<KeysManager>,
256251
payment_store: Arc<PaymentStore<K, L>>,
257252
peer_store: Arc<PeerStore<K, L>>,
258253
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
@@ -266,20 +261,17 @@ where
266261
{
267262
pub fn new(
268263
event_queue: Arc<EventQueue<K, L>>, wallet: Arc<Wallet>,
269-
channel_manager: Arc<ChannelManager<K>>, tx_broadcaster: Arc<Broadcaster>,
270-
fee_estimator: Arc<FeeEstimator>, network_graph: Arc<NetworkGraph>,
271-
keys_manager: Arc<KeysManager>, payment_store: Arc<PaymentStore<K, L>>,
264+
channel_manager: Arc<ChannelManager<K>>, output_sweeper: Arc<Sweeper<K>>,
265+
network_graph: Arc<NetworkGraph>, payment_store: Arc<PaymentStore<K, L>>,
272266
peer_store: Arc<PeerStore<K, L>>, runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
273267
logger: L, config: Arc<Config>,
274268
) -> Self {
275269
Self {
276270
event_queue,
277271
wallet,
278272
channel_manager,
279-
tx_broadcaster,
280-
fee_estimator,
273+
output_sweeper,
281274
network_graph,
282-
keys_manager,
283275
payment_store,
284276
peer_store,
285277
logger,
@@ -585,42 +577,8 @@ where
585577
});
586578
}
587579
}
588-
LdkEvent::SpendableOutputs { outputs, channel_id: _ } => {
589-
// TODO: We should eventually remember the outputs and supply them to the wallet's coin selection, once BDK allows us to do so.
590-
let destination_address = self.wallet.get_new_address().unwrap_or_else(|e| {
591-
log_error!(self.logger, "Failed to get destination address: {}", e);
592-
panic!("Failed to get destination address");
593-
});
594-
595-
let output_descriptors = &outputs.iter().collect::<Vec<_>>();
596-
let tx_feerate = self
597-
.fee_estimator
598-
.get_est_sat_per_1000_weight(ConfirmationTarget::NonAnchorChannelFee);
599-
600-
// We set nLockTime to the current height to discourage fee sniping.
601-
let cur_height = self.channel_manager.current_best_block().height();
602-
let locktime: PackedLockTime =
603-
LockTime::from_height(cur_height).map_or(PackedLockTime::ZERO, |l| l.into());
604-
let res = self.keys_manager.spend_spendable_outputs(
605-
output_descriptors,
606-
Vec::new(),
607-
destination_address.script_pubkey(),
608-
tx_feerate,
609-
Some(locktime),
610-
&Secp256k1::new(),
611-
);
612-
613-
match res {
614-
Ok(Some(spending_tx)) => {
615-
self.tx_broadcaster.broadcast_transactions(&[&spending_tx])
616-
}
617-
Ok(None) => {
618-
log_debug!(self.logger, "Omitted spending static outputs: {:?}", outputs);
619-
}
620-
Err(err) => {
621-
log_error!(self.logger, "Error spending outputs: {:?}", err);
622-
}
623-
}
580+
LdkEvent::SpendableOutputs { outputs, channel_id } => {
581+
self.output_sweeper.add_outputs(outputs, channel_id)
624582
}
625583
LdkEvent::OpenChannelRequest {
626584
temporary_channel_id,

src/io/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers";
2121
pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments";
2222
pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
2323

24+
/// The spendable output information will be persisted under this prefix.
25+
pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "spendable_outputs";
26+
pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
27+
2428
/// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key.
2529
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE: &str = "";
2630
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE: &str = "";

src/io/utils.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::WALLET_KEYS_SEED_LEN;
33

44
use crate::logger::log_error;
55
use crate::peer_store::PeerStore;
6+
use crate::sweep::SpendableOutputInfo;
67
use crate::{Error, EventQueue, PaymentDetails};
78

89
use lightning::routing::gossip::NetworkGraph;
@@ -199,6 +200,36 @@ where
199200
Ok(res)
200201
}
201202

203+
/// Read previously persisted spendable output information from the store.
204+
pub(crate) fn read_spendable_outputs<K: KVStore + Sync + Send, L: Deref>(
205+
kv_store: Arc<K>, logger: L,
206+
) -> Result<Vec<SpendableOutputInfo>, std::io::Error>
207+
where
208+
L::Target: Logger,
209+
{
210+
let mut res = Vec::new();
211+
212+
for stored_key in kv_store.list(
213+
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
214+
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
215+
)? {
216+
let mut reader = Cursor::new(kv_store.read(
217+
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
218+
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
219+
&stored_key,
220+
)?);
221+
let output = SpendableOutputInfo::read(&mut reader).map_err(|e| {
222+
log_error!(logger, "Failed to deserialize SpendableOutputInfo: {}", e);
223+
std::io::Error::new(
224+
std::io::ErrorKind::InvalidData,
225+
"Failed to deserialize SpendableOutputInfo",
226+
)
227+
})?;
228+
res.push(output);
229+
}
230+
Ok(res)
231+
}
232+
202233
pub(crate) fn read_latest_rgs_sync_timestamp<K: KVStore + Sync + Send, L: Deref>(
203234
kv_store: Arc<K>, logger: L,
204235
) -> Result<u32, std::io::Error>

src/lib.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ pub mod io;
8585
mod logger;
8686
mod payment_store;
8787
mod peer_store;
88+
mod sweep;
8889
#[cfg(test)]
8990
mod test;
9091
mod tx_broadcaster;
@@ -122,7 +123,7 @@ pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus};
122123
use peer_store::{PeerInfo, PeerStore};
123124
use types::{
124125
Broadcaster, ChainMonitor, ChannelManager, FeeEstimator, KeysManager, NetworkGraph,
125-
PeerManager, Router, Scorer, Wallet,
126+
PeerManager, Router, Scorer, Sweeper, Wallet,
126127
};
127128
pub use types::{ChannelDetails, PeerDetails, UserChannelId};
128129

@@ -295,6 +296,7 @@ pub struct Node<K: KVStore + Sync + Send + 'static> {
295296
event_queue: Arc<EventQueue<K, Arc<FilesystemLogger>>>,
296297
channel_manager: Arc<ChannelManager<K>>,
297298
chain_monitor: Arc<ChainMonitor<K>>,
299+
output_sweeper: Arc<Sweeper<K>>,
298300
peer_manager: Arc<PeerManager<K>>,
299301
keys_manager: Arc<KeysManager>,
300302
network_graph: Arc<NetworkGraph>,
@@ -432,6 +434,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
432434
let tx_sync = Arc::clone(&self.tx_sync);
433435
let sync_cman = Arc::clone(&self.channel_manager);
434436
let sync_cmon = Arc::clone(&self.chain_monitor);
437+
let sync_sweeper = Arc::clone(&self.output_sweeper);
435438
let sync_logger = Arc::clone(&self.logger);
436439
let mut stop_sync = self.stop_receiver.clone();
437440
let wallet_sync_interval_secs =
@@ -449,6 +452,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
449452
let confirmables = vec![
450453
&*sync_cman as &(dyn Confirm + Sync + Send),
451454
&*sync_cmon as &(dyn Confirm + Sync + Send),
455+
&*sync_sweeper as &(dyn Confirm + Sync + Send),
452456
];
453457
let now = Instant::now();
454458
match tx_sync.sync(confirmables).await {
@@ -695,10 +699,8 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
695699
Arc::clone(&self.event_queue),
696700
Arc::clone(&self.wallet),
697701
Arc::clone(&self.channel_manager),
698-
Arc::clone(&self.tx_broadcaster),
699-
Arc::clone(&self.fee_estimator),
702+
Arc::clone(&self.output_sweeper),
700703
Arc::clone(&self.network_graph),
701-
Arc::clone(&self.keys_manager),
702704
Arc::clone(&self.payment_store),
703705
Arc::clone(&self.peer_store),
704706
Arc::clone(&self.runtime),
@@ -1036,10 +1038,12 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
10361038
let tx_sync = Arc::clone(&self.tx_sync);
10371039
let sync_cman = Arc::clone(&self.channel_manager);
10381040
let sync_cmon = Arc::clone(&self.chain_monitor);
1041+
let sync_sweeper = Arc::clone(&self.output_sweeper);
10391042
let sync_logger = Arc::clone(&self.logger);
10401043
let confirmables = vec![
10411044
&*sync_cman as &(dyn Confirm + Sync + Send),
10421045
&*sync_cmon as &(dyn Confirm + Sync + Send),
1046+
&*sync_sweeper as &(dyn Confirm + Sync + Send),
10431047
];
10441048

10451049
tokio::task::block_in_place(move || {

0 commit comments

Comments
 (0)