Skip to content

Commit 620667a

Browse files
committed
Implement Confirm/Listen interfaces
1 parent 18edaaf commit 620667a

File tree

1 file changed

+156
-5
lines changed

1 file changed

+156
-5
lines changed

src/sweep.rs

Lines changed: 156 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::wallet::{num_blocks_from_conf_target, Wallet};
55
use crate::{Error, KeysManager};
66

77
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
8-
use lightning::chain::BestBlock;
8+
use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
99
use lightning::impl_writeable_tlv_based;
1010
use lightning::sign::{EntropySource, SpendableOutputDescriptor};
1111
use lightning::util::ser::Writeable;
@@ -16,6 +16,8 @@ use bitcoin::{BlockHash, BlockHeader, LockTime, PackedLockTime, Script, Transact
1616
use std::ops::Deref;
1717
use std::sync::{Arc, Mutex};
1818

19+
const CONSIDERED_SPENT_THRESHOLD_CONF: u32 = 6;
20+
1921
#[derive(Clone, Debug, PartialEq, Eq)]
2022
pub(crate) struct SpendableOutputInfo {
2123
id: [u8; 32],
@@ -33,29 +35,33 @@ impl_writeable_tlv_based!(SpendableOutputInfo, {
3335
(8, confirmed_in_block, option),
3436
});
3537

36-
pub(crate) struct OutputSweeper<K: KVStore + Sync + Send, L: Deref>
38+
pub(crate) struct OutputSweeper<K: KVStore + Sync + Send, F: Deref, L: Deref>
3739
where
40+
F::Target: Filter,
3841
L::Target: Logger,
3942
{
4043
outputs: Mutex<Vec<SpendableOutputInfo>>,
4144
wallet: Arc<Wallet<bdk::database::SqliteDatabase, L>>,
4245
keys_manager: Arc<KeysManager>,
4346
kv_store: Arc<K>,
4447
best_block: Mutex<BestBlock>,
48+
chain_source: Option<F>,
4549
logger: L,
4650
}
4751

48-
impl<K: KVStore + Sync + Send, L: Deref> OutputSweeper<K, L>
52+
impl<K: KVStore + Sync + Send, F: Deref, L: Deref> OutputSweeper<K, F, L>
4953
where
54+
F::Target: Filter,
5055
L::Target: Logger,
5156
{
5257
pub(crate) fn new(
5358
outputs: Vec<SpendableOutputInfo>, wallet: Arc<Wallet<bdk::database::SqliteDatabase, L>>,
54-
keys_manager: Arc<KeysManager>, kv_store: Arc<K>, best_block: BestBlock, logger: L,
59+
keys_manager: Arc<KeysManager>, kv_store: Arc<K>, best_block: BestBlock,
60+
chain_source: Option<F>, logger: L,
5561
) -> Self {
5662
let outputs = Mutex::new(outputs);
5763
let best_block = Mutex::new(best_block);
58-
Self { outputs, wallet, keys_manager, kv_store, best_block, logger }
64+
Self { outputs, wallet, keys_manager, kv_store, best_block, chain_source, logger }
5965
}
6066

6167
pub(crate) fn add_outputs(&self, output_descriptors: Vec<SpendableOutputDescriptor>) {
@@ -64,6 +70,9 @@ where
6470
let spending_tx = match self.get_spending_tx(&output_descriptors) {
6571
Ok(Some(spending_tx)) => {
6672
self.wallet.broadcast_transactions(&[&spending_tx]);
73+
if let Some(filter) = self.chain_source.as_ref() {
74+
filter.register_tx(&spending_tx.txid(), &Script::new())
75+
}
6776
spending_tx
6877
}
6978
Ok(None) => {
@@ -138,3 +147,145 @@ where
138147
})
139148
}
140149
}
150+
151+
impl<K: KVStore + Sync + Send, F: Deref, L: Deref> Listen for OutputSweeper<K, F, L>
152+
where
153+
F::Target: Filter,
154+
L::Target: Logger,
155+
{
156+
fn filtered_block_connected(
157+
&self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32,
158+
) {
159+
{
160+
let best_block = self.best_block.lock().unwrap();
161+
assert_eq!(best_block.block_hash(), header.prev_blockhash,
162+
"Blocks must be connected in chain-order - the connected header must build on the last connected header");
163+
assert_eq!(best_block.height(), height - 1,
164+
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
165+
}
166+
167+
self.transactions_confirmed(header, txdata, height);
168+
self.best_block_updated(header, height);
169+
}
170+
171+
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
172+
let new_height = height - 1;
173+
{
174+
let mut best_block = self.best_block.lock().unwrap();
175+
assert_eq!(best_block.block_hash(), header.block_hash(),
176+
"Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
177+
assert_eq!(best_block.height(), height,
178+
"Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
179+
*best_block = BestBlock::new(header.prev_blockhash, new_height)
180+
}
181+
182+
let mut locked_outputs = self.outputs.lock().unwrap();
183+
for output_info in locked_outputs.iter_mut() {
184+
if output_info.confirmed_in_block == Some((height, header.block_hash())) {
185+
output_info.confirmed_in_block = None;
186+
}
187+
}
188+
}
189+
}
190+
191+
impl<K: KVStore + Sync + Send, F: Deref, L: Deref> Confirm for OutputSweeper<K, F, L>
192+
where
193+
F::Target: Filter,
194+
L::Target: Logger,
195+
{
196+
fn transactions_confirmed(
197+
&self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32,
198+
) {
199+
let mut locked_outputs = self.outputs.lock().unwrap();
200+
for (_, tx) in txdata {
201+
locked_outputs
202+
.iter_mut()
203+
.filter(|o| o.spending_tx.txid() == tx.txid())
204+
.for_each(|o| o.confirmed_in_block = Some((height, header.block_hash())));
205+
}
206+
}
207+
208+
fn transaction_unconfirmed(&self, txid: &Txid) {
209+
let mut locked_outputs = self.outputs.lock().unwrap();
210+
211+
// Get what height was unconfirmed.
212+
let unconf_height = locked_outputs
213+
.iter()
214+
.find(|o| o.spending_tx.txid() == *txid)
215+
.and_then(|o| o.confirmed_in_block)
216+
.map(|t| t.0);
217+
218+
// Unconfirm all >= this height.
219+
locked_outputs
220+
.iter_mut()
221+
.filter(|o| o.confirmed_in_block.map(|t| t.0) >= unconf_height)
222+
.for_each(|o| o.confirmed_in_block = None);
223+
}
224+
225+
fn best_block_updated(&self, header: &BlockHeader, height: u32) {
226+
*self.best_block.lock().unwrap() = BestBlock::new(header.block_hash(), height);
227+
228+
let mut locked_outputs = self.outputs.lock().unwrap();
229+
230+
// Rebroadcast all outputs that didn't get confirmed by now.
231+
for output_info in locked_outputs.iter_mut().filter(|o| o.confirmed_in_block.is_none()) {
232+
if height
233+
>= output_info.broadcast_height
234+
+ num_blocks_from_conf_target(ConfirmationTarget::Background)
235+
{
236+
let output_descriptors = vec![output_info.descriptor.clone()];
237+
match self.get_spending_tx(&output_descriptors) {
238+
Ok(Some(spending_tx)) => {
239+
self.wallet.broadcast_transactions(&[&spending_tx]);
240+
if let Some(filter) = self.chain_source.as_ref() {
241+
filter.register_tx(&spending_tx.txid(), &Script::new())
242+
}
243+
output_info.spending_tx = spending_tx;
244+
output_info.broadcast_height = height;
245+
}
246+
Ok(None) => {
247+
log_debug!(
248+
self.logger,
249+
"Omitted spending static outputs: {:?}",
250+
output_descriptors
251+
);
252+
}
253+
Err(err) => {
254+
log_error!(self.logger, "Error spending outputs: {:?}", err);
255+
}
256+
};
257+
}
258+
}
259+
260+
// Prune all outputs that have sufficient depth by now.
261+
locked_outputs.retain(|o| {
262+
if let Some((conf_height, _)) = o.confirmed_in_block {
263+
if height >= conf_height + CONSIDERED_SPENT_THRESHOLD_CONF {
264+
let key = hex_utils::to_string(&o.id);
265+
match self.kv_store.remove(SPENDABLE_OUTPUT_INFO_PERSISTENCE_NAMESPACE, &key) {
266+
Ok(_) => return false,
267+
Err(e) => {
268+
log_error!(
269+
self.logger,
270+
"Removal of key {}/{} failed due to: {}",
271+
SPENDABLE_OUTPUT_INFO_PERSISTENCE_NAMESPACE,
272+
key,
273+
e
274+
);
275+
return true;
276+
}
277+
}
278+
}
279+
}
280+
true
281+
});
282+
}
283+
284+
fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
285+
let locked_outputs = self.outputs.lock().unwrap();
286+
locked_outputs
287+
.iter()
288+
.map(|o| (o.spending_tx.txid(), o.confirmed_in_block.map(|c| c.1)))
289+
.collect::<Vec<_>>()
290+
}
291+
}

0 commit comments

Comments
 (0)