@@ -47,46 +47,6 @@ use core::ops::Deref;
47
47
use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
48
48
use bitcoin:: secp256k1:: PublicKey ;
49
49
50
- mod update_origin {
51
- #[ derive( Debug , Clone , Copy , Hash , PartialEq , Eq ) ]
52
- /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
53
- /// entirely opaque.
54
- pub ( crate ) enum UpdateOrigin {
55
- /// An update that was generated by the `ChannelManager` (via our [`crate::chain::Watch`]
56
- /// implementation). This corresponds to an actual [ChannelMonitorUpdate::update_id] field
57
- /// and [ChannelMonitor::get_latest_update_id].
58
- ///
59
- /// [ChannelMonitor::get_latest_update_id]: crate::chain::channelmonitor::ChannelMonitor::get_latest_update_id
60
- /// [ChannelMonitorUpdate::update_id]: crate::chain::channelmonitor::ChannelMonitorUpdate::update_id
61
- OffChain ( u64 ) ,
62
- /// An update that was generated during blockchain processing. The ID here is specific to the
63
- /// generating [ChannelMonitor] and does *not* correspond to any on-disk IDs.
64
- ///
65
- /// [ChannelMonitor]: crate::chain::channelmonitor::ChannelMonitor
66
- ChainSync ( u64 ) ,
67
- }
68
- }
69
-
70
- #[ cfg( any( feature = "_test_utils" , test) ) ]
71
- pub ( crate ) use update_origin:: UpdateOrigin ;
72
- #[ cfg( not( any( feature = "_test_utils" , test) ) ) ]
73
- use update_origin:: UpdateOrigin ;
74
-
75
- /// An opaque identifier describing a specific [`Persist`] method call.
76
- #[ derive( Debug , Clone , Copy , Hash , PartialEq , Eq ) ]
77
- pub struct MonitorUpdateId {
78
- pub ( crate ) contents : UpdateOrigin ,
79
- }
80
-
81
- impl MonitorUpdateId {
82
- pub ( crate ) fn from_monitor_update ( update : & ChannelMonitorUpdate ) -> Self {
83
- Self { contents : UpdateOrigin :: OffChain ( update. update_id ) }
84
- }
85
- pub ( crate ) fn from_new_monitor < ChannelSigner : WriteableEcdsaChannelSigner > ( monitor : & ChannelMonitor < ChannelSigner > ) -> Self {
86
- Self { contents : UpdateOrigin :: OffChain ( monitor. get_latest_update_id ( ) ) }
87
- }
88
- }
89
-
90
50
/// `Persist` defines behavior for persisting channel monitors: this could mean
91
51
/// writing once to disk, and/or uploading to one or more backup services.
92
52
///
@@ -119,7 +79,7 @@ impl MonitorUpdateId {
119
79
/// All calls should generally spawn a background task and immediately return
120
80
/// [`ChannelMonitorUpdateStatus::InProgress`]. Once the update completes,
121
81
/// [`ChainMonitor::channel_monitor_updated`] should be called with the corresponding
122
- /// [`MonitorUpdateId `].
82
+ /// [`ChannelMonitor::get_latest_update_id`] or [`ChannelMonitorUpdate::update_id `].
123
83
///
124
84
/// Note that unlike the direct [`chain::Watch`] interface,
125
85
/// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
@@ -150,15 +110,16 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
150
110
/// channel's outpoint (and it is up to you to maintain a correct mapping between the outpoint
151
111
/// and the stored channel data). Note that you **must** persist every new monitor to disk.
152
112
///
153
- /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
154
- /// if you return [`ChannelMonitorUpdateStatus::InProgress`].
113
+ /// The [`ChannelMonitor::get_latest_update_id`] uniquely links this call to [`ChainMonitor::channel_monitor_updated`].
114
+ /// For [`Persist::persist_new_channel`], it is only necessary to call [`ChainMonitor::channel_monitor_updated`]
115
+ /// when you return [`ChannelMonitorUpdateStatus::InProgress`].
155
116
///
156
117
/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
157
118
/// and [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
158
119
///
159
120
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
160
121
/// [`Writeable::write`]: crate::util::ser::Writeable::write
161
- fn persist_new_channel ( & self , channel_funding_outpoint : OutPoint , data : & ChannelMonitor < ChannelSigner > , update_id : MonitorUpdateId ) -> ChannelMonitorUpdateStatus ;
122
+ fn persist_new_channel ( & self , channel_funding_outpoint : OutPoint , monitor : & ChannelMonitor < ChannelSigner > ) -> ChannelMonitorUpdateStatus ;
162
123
163
124
/// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
164
125
/// update.
@@ -185,15 +146,17 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
185
146
/// them in batches. The size of each monitor grows `O(number of state updates)`
186
147
/// whereas updates are small and `O(1)`.
187
148
///
188
- /// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
189
- /// if you return [`ChannelMonitorUpdateStatus::InProgress`].
149
+ /// The [`ChannelMonitorUpdate::update_id`] or [`ChannelMonitor::get_latest_update_id`] uniquely
150
+ /// links this call to [`ChainMonitor::channel_monitor_updated`].
151
+ /// For [`Persist::update_persisted_channel`], it is only necessary to call [`ChainMonitor::channel_monitor_updated`]
152
+ /// when an [`ChannelMonitorUpdate`] is provided and when you return [`ChannelMonitorUpdateStatus::InProgress`].
190
153
///
191
154
/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`,
192
155
/// [`Writeable::write`] on [`ChannelMonitorUpdate`] for writing out an update, and
193
156
/// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
194
157
///
195
158
/// [`Writeable::write`]: crate::util::ser::Writeable::write
196
- fn update_persisted_channel ( & self , channel_funding_outpoint : OutPoint , update : Option < & ChannelMonitorUpdate > , data : & ChannelMonitor < ChannelSigner > , update_id : MonitorUpdateId ) -> ChannelMonitorUpdateStatus ;
159
+ fn update_persisted_channel ( & self , channel_funding_outpoint : OutPoint , monitor_update : Option < & ChannelMonitorUpdate > , monitor : & ChannelMonitor < ChannelSigner > ) -> ChannelMonitorUpdateStatus ;
197
160
/// Prevents the channel monitor from being loaded on startup.
198
161
///
199
162
/// Archiving the data in a backup location (rather than deleting it fully) is useful for
@@ -209,13 +172,12 @@ struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
209
172
/// update_persisted_channel, the user returns a
210
173
/// [`ChannelMonitorUpdateStatus::InProgress`], and then calls channel_monitor_updated
211
174
/// immediately, racing our insertion of the pending update into the contained Vec.
212
- pending_monitor_updates : Mutex < Vec < MonitorUpdateId > > ,
175
+ pending_monitor_updates : Mutex < Vec < u64 > > ,
213
176
}
214
177
215
178
impl < ChannelSigner : WriteableEcdsaChannelSigner > MonitorHolder < ChannelSigner > {
216
- fn has_pending_offchain_updates ( & self , pending_monitor_updates_lock : & MutexGuard < Vec < MonitorUpdateId > > ) -> bool {
217
- pending_monitor_updates_lock. iter ( ) . any ( |update_id|
218
- if let UpdateOrigin :: OffChain ( _) = update_id. contents { true } else { false } )
179
+ fn has_pending_updates ( & self , pending_monitor_updates_lock : & MutexGuard < Vec < u64 > > ) -> bool {
180
+ !pending_monitor_updates_lock. is_empty ( )
219
181
}
220
182
}
221
183
@@ -259,7 +221,7 @@ pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T:
259
221
P :: Target : Persist < ChannelSigner > ,
260
222
{
261
223
monitors : RwLock < HashMap < OutPoint , MonitorHolder < ChannelSigner > > > ,
262
- /// When we generate a [`MonitorUpdateId`] for a chain-event monitor persistence, we need a
224
+ /// When we generate a monitor update for a chain-event monitor persistence, we need a
263
225
/// unique ID, which we calculate by simply getting the next value from this counter. Note that
264
226
/// the ID is never persisted so it's ok that they reset on restart.
265
227
sync_persistence_id : AtomicCounter ,
@@ -346,20 +308,11 @@ where C::Target: chain::Filter,
346
308
let mut txn_outputs;
347
309
{
348
310
txn_outputs = process ( monitor, txdata) ;
349
- let chain_sync_update_id = self . sync_persistence_id . get_increment ( ) ;
350
- let update_id = MonitorUpdateId {
351
- contents : UpdateOrigin :: ChainSync ( chain_sync_update_id) ,
352
- } ;
353
-
354
- log_trace ! ( logger, "Syncing Channel Monitor for channel {} for block-data update_id {}" ,
355
- log_funding_info!( monitor) ,
356
- chain_sync_update_id
357
- ) ;
358
- match self . persister . update_persisted_channel ( * funding_outpoint, None , monitor, update_id) {
311
+ log_trace ! ( logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
312
+ match self . persister . update_persisted_channel ( * funding_outpoint, None , monitor) {
359
313
ChannelMonitorUpdateStatus :: Completed =>
360
- log_trace ! ( logger, "Finished syncing Channel Monitor for channel {} for block-data update_id {}" ,
361
- log_funding_info!( monitor) ,
362
- chain_sync_update_id
314
+ log_trace ! ( logger, "Finished syncing Channel Monitor for channel {} for block-data" ,
315
+ log_funding_info!( monitor)
363
316
) ,
364
317
ChannelMonitorUpdateStatus :: InProgress => {
365
318
log_debug ! ( logger, "Channel Monitor sync for channel {} in progress." , log_funding_info!( monitor) ) ;
@@ -464,15 +417,21 @@ where C::Target: chain::Filter,
464
417
465
418
#[ cfg( not( c_bindings) ) ]
466
419
/// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
467
- pub fn list_pending_monitor_updates ( & self ) -> HashMap < OutPoint , Vec < MonitorUpdateId > > {
420
+ /// Each `Vec<u64>` contains `update_id`s from [`ChannelMonitor::get_latest_update_id`] for updates
421
+ /// that have not yet been fully persisted. Note that if a full monitor is persisted all the pending
422
+ /// monitor updates must be individually marked completed by calling [`ChainMonitor::channel_monitor_updated`].
423
+ pub fn list_pending_monitor_updates ( & self ) -> HashMap < OutPoint , Vec < u64 > > {
468
424
hash_map_from_iter ( self . monitors . read ( ) . unwrap ( ) . iter ( ) . map ( |( outpoint, holder) | {
469
425
( * outpoint, holder. pending_monitor_updates . lock ( ) . unwrap ( ) . clone ( ) )
470
426
} ) )
471
427
}
472
428
473
429
#[ cfg( c_bindings) ]
474
430
/// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
475
- pub fn list_pending_monitor_updates ( & self ) -> Vec < ( OutPoint , Vec < MonitorUpdateId > ) > {
431
+ /// Each `Vec<u64>` contains `update_id`s from [`ChannelMonitor::get_latest_update_id`] for updates
432
+ /// that have not yet been fully persisted. Note that if a full monitor is persisted all the pending
433
+ /// monitor updates must be individually marked completed by calling [`ChainMonitor::channel_monitor_updated`].
434
+ pub fn list_pending_monitor_updates ( & self ) -> Vec < ( OutPoint , Vec < u64 > ) > {
476
435
self . monitors . read ( ) . unwrap ( ) . iter ( ) . map ( |( outpoint, holder) | {
477
436
( * outpoint, holder. pending_monitor_updates . lock ( ) . unwrap ( ) . clone ( ) )
478
437
} ) . collect ( )
@@ -491,56 +450,49 @@ where C::Target: chain::Filter,
491
450
/// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
492
451
/// update to disk and begins updating any remote (e.g. watchtower/backup) copies,
493
452
/// returning [`ChannelMonitorUpdateStatus::InProgress`],
494
- /// 2) once all remote copies are updated, you call this function with the
495
- /// `completed_update_id` that completed, and once all pending updates have completed the
496
- /// channel will be re-enabled.
497
- // Note that we re-enable only after `UpdateOrigin::OffChain` updates complete, we don't
498
- // care about `UpdateOrigin::ChainSync` updates for the channel state being updated. We
499
- // only care about `UpdateOrigin::ChainSync` for returning `MonitorEvent`s.
453
+ /// 2) once all remote copies are updated, you call this function with [`ChannelMonitor::get_latest_update_id`]
454
+ /// or [`ChannelMonitorUpdate::update_id`] as the `completed_update_id`, and once all pending
455
+ /// updates have completed the channel will be re-enabled.
456
+ ///
457
+ /// It is only necessary to call [`ChainMonitor::channel_monitor_updated`] when you return [`ChannelMonitorUpdateStatus::InProgress`]
458
+ /// from [`Persist`] and either:
459
+ /// 1. A new [`ChannelMonitor`] was added in [`Persist::persist_new_channel`], or
460
+ /// 2. A [`ChannelMonitorUpdate`] was provided as part of [`Persist::update_persisted_channel`].
461
+ /// Note that we don't care about calls to [`Persist::update_persisted_channel`] where no
462
+ /// [`ChannelMonitorUpdate`] was provided.
500
463
///
501
464
/// Returns an [`APIError::APIMisuseError`] if `funding_txo` does not match any currently
502
465
/// registered [`ChannelMonitor`]s.
503
- pub fn channel_monitor_updated ( & self , funding_txo : OutPoint , completed_update_id : MonitorUpdateId ) -> Result < ( ) , APIError > {
466
+ pub fn channel_monitor_updated ( & self , funding_txo : OutPoint , completed_update_id : u64 ) -> Result < ( ) , APIError > {
504
467
let monitors = self . monitors . read ( ) . unwrap ( ) ;
505
468
let monitor_data = if let Some ( mon) = monitors. get ( & funding_txo) { mon } else {
506
469
return Err ( APIError :: APIMisuseError { err : format ! ( "No ChannelMonitor matching funding outpoint {:?} found" , funding_txo) } ) ;
507
470
} ;
508
471
let mut pending_monitor_updates = monitor_data. pending_monitor_updates . lock ( ) . unwrap ( ) ;
509
472
pending_monitor_updates. retain ( |update_id| * update_id != completed_update_id) ;
510
473
511
- match completed_update_id {
512
- MonitorUpdateId { contents : UpdateOrigin :: OffChain ( completed_update_id) } => {
513
- // Note that we only check for `UpdateOrigin::OffChain` failures here - if
514
- // we're being told that a `UpdateOrigin::OffChain` monitor update completed,
515
- // we only care about ensuring we don't tell the `ChannelManager` to restore
516
- // the channel to normal operation until all `UpdateOrigin::OffChain` updates
517
- // complete.
518
- // If there's some `UpdateOrigin::ChainSync` update still pending that's okay
519
- // - we can still update our channel state, just as long as we don't return
520
- // `MonitorEvent`s from the monitor back to the `ChannelManager` until they
521
- // complete.
522
- let monitor_is_pending_updates = monitor_data. has_pending_offchain_updates ( & pending_monitor_updates) ;
523
- log_debug ! ( self . logger, "Completed off-chain monitor update {} for channel with funding outpoint {:?}, {}" ,
524
- completed_update_id,
525
- funding_txo,
526
- if monitor_is_pending_updates {
527
- "still have pending off-chain updates"
528
- } else {
529
- "all off-chain updates complete, returning a MonitorEvent"
530
- } ) ;
531
- if monitor_is_pending_updates {
532
- // If there are still monitor updates pending, we cannot yet construct a
533
- // Completed event.
534
- return Ok ( ( ) ) ;
535
- }
536
- let channel_id = monitor_data. monitor . channel_id ( ) ;
537
- self . pending_monitor_events . lock ( ) . unwrap ( ) . push ( ( funding_txo, channel_id, vec ! [ MonitorEvent :: Completed {
538
- funding_txo, channel_id,
539
- monitor_update_id: monitor_data. monitor. get_latest_update_id( ) ,
540
- } ] , monitor_data. monitor . get_counterparty_node_id ( ) ) ) ;
541
- } ,
542
- MonitorUpdateId { contents : UpdateOrigin :: ChainSync ( _) } => { } ,
474
+ // Note that we only check for pending non-chainsync monitor updates and we don't track monitor
475
+ // updates resulting from chainsync in `pending_monitor_updates`.
476
+ let monitor_is_pending_updates = monitor_data. has_pending_updates ( & pending_monitor_updates) ;
477
+ log_debug ! ( self . logger, "Completed off-chain monitor update {} for channel with funding outpoint {:?}, {}" ,
478
+ completed_update_id,
479
+ funding_txo,
480
+ if monitor_is_pending_updates {
481
+ "still have pending off-chain updates"
482
+ } else {
483
+ "all off-chain updates complete, returning a MonitorEvent"
484
+ } ) ;
485
+ if monitor_is_pending_updates {
486
+ // If there are still monitor updates pending, we cannot yet construct a
487
+ // Completed event.
488
+ return Ok ( ( ) ) ;
543
489
}
490
+ let channel_id = monitor_data. monitor . channel_id ( ) ;
491
+ self . pending_monitor_events . lock ( ) . unwrap ( ) . push ( ( funding_txo, channel_id, vec ! [ MonitorEvent :: Completed {
492
+ funding_txo, channel_id,
493
+ monitor_update_id: monitor_data. monitor. get_latest_update_id( ) ,
494
+ } ] , monitor_data. monitor . get_counterparty_node_id ( ) ) ) ;
495
+
544
496
self . event_notifier . notify ( ) ;
545
497
Ok ( ( ) )
546
498
}
@@ -771,9 +723,9 @@ where C::Target: chain::Filter,
771
723
hash_map:: Entry :: Vacant ( e) => e,
772
724
} ;
773
725
log_trace ! ( logger, "Got new ChannelMonitor for channel {}" , log_funding_info!( monitor) ) ;
774
- let update_id = MonitorUpdateId :: from_new_monitor ( & monitor) ;
726
+ let update_id = monitor. get_latest_update_id ( ) ;
775
727
let mut pending_monitor_updates = Vec :: new ( ) ;
776
- let persist_res = self . persister . persist_new_channel ( funding_outpoint, & monitor, update_id ) ;
728
+ let persist_res = self . persister . persist_new_channel ( funding_outpoint, & monitor) ;
777
729
match persist_res {
778
730
ChannelMonitorUpdateStatus :: InProgress => {
779
731
log_info ! ( logger, "Persistence of new ChannelMonitor for channel {} in progress" , log_funding_info!( monitor) ) ;
@@ -823,7 +775,7 @@ where C::Target: chain::Filter,
823
775
log_trace ! ( logger, "Updating ChannelMonitor to id {} for channel {}" , update. update_id, log_funding_info!( monitor) ) ;
824
776
let update_res = monitor. update_monitor ( update, & self . broadcaster , & self . fee_estimator , & self . logger ) ;
825
777
826
- let update_id = MonitorUpdateId :: from_monitor_update ( update) ;
778
+ let update_id = update. update_id ;
827
779
let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
828
780
let persist_res = if update_res. is_err ( ) {
829
781
// Even if updating the monitor returns an error, the monitor's state will
@@ -832,9 +784,9 @@ where C::Target: chain::Filter,
832
784
// while reading `channel_monitor` with updates from storage. Instead, we should persist
833
785
// the entire `channel_monitor` here.
834
786
log_warn ! ( logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor" , log_funding_info!( monitor) ) ;
835
- self . persister . update_persisted_channel ( funding_txo, None , monitor, update_id )
787
+ self . persister . update_persisted_channel ( funding_txo, None , monitor)
836
788
} else {
837
- self . persister . update_persisted_channel ( funding_txo, Some ( update) , monitor, update_id )
789
+ self . persister . update_persisted_channel ( funding_txo, Some ( update) , monitor)
838
790
} ;
839
791
match persist_res {
840
792
ChannelMonitorUpdateStatus :: InProgress => {
0 commit comments