Commit 4b807d7
Stop cleaning monitor updates on new block connect
Previously, we cleaned up monitor updates both at the consolidation threshold and on every new block connect. With this change, we only clean up when the consolidation criterion is met. We also remove the monitor read from the cleanup logic in the update-consolidation case. Note: for a channel-closing monitor update, we still need to read the old monitor before persisting the new one in order to determine the cleanup range.
1 parent f07f4b9 commit 4b807d7
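In short, the cleanup range is now derived arithmetically from `maximum_pending_updates` instead of from a monitor read. Below is a minimal sketch of that range computation, assuming LDK's `CLOSED_CHANNEL_UPDATE_ID` sentinel is `u64::MAX`; `cleanup_range` is a hypothetical standalone helper for illustration, not the patched method itself.

```rust
use std::cmp;

// Sentinel update_id used for the final, channel-closing monitor update.
const CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;

// Returns the inclusive range of stale update keys to delete after a full
// monitor write succeeds.
fn cleanup_range(
    latest_update_id: u64,
    maximum_pending_updates: u64,
    // update_id of the previously persisted monitor; only read (and only
    // needed) in the channel-close case.
    old_update_id: Option<u64>,
) -> Option<(u64, u64)> {
    if latest_update_id == CLOSED_CHANNEL_UPDATE_ID {
        // Channel close: the stale range must be inferred from the old
        // monitor. Updates are never written at CLOSED_CHANNEL_UPDATE_ID
        // itself, so cap the range just below the sentinel.
        old_update_id.map(|start| {
            let end = cmp::min(
                start.saturating_add(maximum_pending_updates),
                CLOSED_CHANNEL_UPDATE_ID - 1,
            );
            (start, end)
        })
    } else {
        // Consolidation: the last `maximum_pending_updates` update keys are
        // now folded into the freshly written full monitor.
        let end = latest_update_id;
        Some((end.saturating_sub(maximum_pending_updates), end))
    }
}
```

The close case is the one place a read of the old monitor is still required: its `update_id` is the only reliable lower bound for the stale range.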

1 file changed: +83 -87 lines
lightning/src/util/persist.rs

@@ -24,6 +24,7 @@ use crate::chain::chainmonitor::{Persist, MonitorUpdateId};
 use crate::sign::{EntropySource, NodeSigner, ecdsa::WriteableEcdsaChannelSigner, SignerProvider};
 use crate::chain::transaction::OutPoint;
 use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID};
+use crate::chain::ChannelMonitorUpdateStatus;
 use crate::ln::channelmanager::ChannelManager;
 use crate::routing::router::Router;
 use crate::routing::gossip::NetworkGraph;
@@ -346,9 +347,10 @@ where
 ///
 /// # Pruning stale channel updates
 ///
-/// Stale updates are pruned when a full monitor is written. The old monitor is first read, and if
-/// that succeeds, updates in the range between the old and new monitors are deleted. The `lazy`
-/// flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions
+/// Stale updates are pruned when the consolidation threshold is reached according to
+/// `maximum_pending_updates`. Monitor updates in the range between the latest `update_id`
+/// and `update_id - maximum_pending_updates` are deleted.
+/// The `lazy` flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions
 /// will complete. However, stale updates are not a problem for data integrity, since updates are
 /// only read that are higher than the stored [`ChannelMonitor`]'s `update_id`.
 ///
@@ -610,24 +612,6 @@ where
 	) -> chain::ChannelMonitorUpdateStatus {
 		// Determine the proper key for this monitor
 		let monitor_name = MonitorName::from(funding_txo);
-		let maybe_old_monitor = self.read_monitor(&monitor_name);
-		match maybe_old_monitor {
-			Ok((_, ref old_monitor)) => {
-				// Check that this key isn't already storing a monitor with a higher update_id
-				// (collision)
-				if old_monitor.get_latest_update_id() > monitor.get_latest_update_id() {
-					log_error!(
-						self.logger,
-						"Tried to write a monitor at the same outpoint {} with a higher update_id!",
-						monitor_name.as_str()
-					);
-					return chain::ChannelMonitorUpdateStatus::UnrecoverableError;
-				}
-			}
-			// This means the channel monitor is new.
-			Err(ref e) if e.kind() == io::ErrorKind::NotFound => {}
-			_ => return chain::ChannelMonitorUpdateStatus::UnrecoverableError,
-		}
 		// Serialize and write the new monitor
 		let mut monitor_bytes = Vec::with_capacity(
 			MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
@@ -641,59 +625,6 @@ where
 			&monitor_bytes,
 		) {
 			Ok(_) => {
-				// Assess cleanup. Typically, we'll clean up only between the last two known full
-				// monitors.
-				if let Ok((_, old_monitor)) = maybe_old_monitor {
-					let start = old_monitor.get_latest_update_id();
-					let end = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
-						// We don't want to clean the rest of u64, so just do possible pending
-						// updates. Note that we never write updates at
-						// `CLOSED_CHANNEL_UPDATE_ID`.
-						cmp::min(
-							start.saturating_add(self.maximum_pending_updates),
-							CLOSED_CHANNEL_UPDATE_ID - 1,
-						)
-					} else {
-						monitor.get_latest_update_id().saturating_sub(1)
-					};
-					// We should bother cleaning up only if there's at least one update
-					// expected.
-					for update_id in start..=end {
-						let update_name = UpdateName::from(update_id);
-						#[cfg(debug_assertions)]
-						{
-							if let Ok(update) =
-								self.read_monitor_update(&monitor_name, &update_name)
-							{
-								// Assert that we are reading what we think we are.
-								debug_assert_eq!(update.update_id, update_name.0);
-							} else if update_id != start && monitor.get_latest_update_id() != CLOSED_CHANNEL_UPDATE_ID
-							{
-								// We're deleting something we should know doesn't exist.
-								panic!(
-									"failed to read monitor update {}",
-									update_name.as_str()
-								);
-							}
-							// On closed channels, we will unavoidably try to read
-							// non-existent updates since we have to guess at the range of
-							// stale updates, so do nothing.
-						}
-						if let Err(e) = self.kv_store.remove(
-							CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
-							monitor_name.as_str(),
-							update_name.as_str(),
-							true,
-						) {
-							log_error!(
-								self.logger,
-								"error cleaning up channel monitor updates for monitor {}, reason: {}",
-								monitor_name.as_str(),
-								e
-							);
-						};
-					}
-				};
 				chain::ChannelMonitorUpdateStatus::Completed
 			}
 			Err(e) => {
@@ -751,8 +682,43 @@ where
 				}
 			}
 		} else {
-			// We could write this update, but it meets criteria of our design that call for a full monitor write.
-			self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
+			let monitor_name = MonitorName::from(funding_txo);
+			// In case of a channel-close monitor update, we need to read the old monitor before
+			// persisting the new one in order to determine the cleanup range.
+			let maybe_old_monitor = match monitor.get_latest_update_id() {
+				CLOSED_CHANNEL_UPDATE_ID => Some(self.read_monitor(&monitor_name)),
+				_ => None
+			};
+
+			// We could write this update, but it meets criteria of our design that calls for a full monitor write.
+			let monitor_update_status = self.persist_new_channel(funding_txo, monitor, monitor_update_call_id);
+
+			if let ChannelMonitorUpdateStatus::Completed = monitor_update_status {
+				let cleanup_range = if monitor.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
+					match maybe_old_monitor {
+						Some(Ok((_, ref old_monitor))) => {
+							let start = old_monitor.get_latest_update_id();
+							// We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
+							let end = cmp::min(
+								start.saturating_add(self.maximum_pending_updates),
+								CLOSED_CHANNEL_UPDATE_ID - 1,
+							);
+							Some((start, end))
+						}
+						_ => None
+					}
+				} else {
+					let end = monitor.get_latest_update_id();
+					let start = end.saturating_sub(self.maximum_pending_updates);
+					Some((start, end))
+				};
+
+				if let Some((start, end)) = cleanup_range {
+					self.cleanup_in_range(monitor_name, start, end);
+				}
+			}
+
+			monitor_update_status
 		}
 	} else {
 		// There is no update given, so we must persist a new monitor.
@@ -761,6 +727,34 @@
 	}
 }
 
+impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
+where
+	ES::Target: EntropySource + Sized,
+	K::Target: KVStore,
+	L::Target: Logger,
+	SP::Target: SignerProvider + Sized
+{
+	// Cleans up monitor updates for the given monitor in range `start..=end`.
+	fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
+		for update_id in start..=end {
+			let update_name = UpdateName::from(update_id);
+			if let Err(e) = self.kv_store.remove(
+				CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+				monitor_name.as_str(),
+				update_name.as_str(),
+				true,
+			) {
+				log_error!(
+					self.logger,
+					"error cleaning up channel monitor updates for monitor {}, reason: {}",
+					monitor_name.as_str(),
+					e
+				);
+			};
+		}
+	}
+}
+
 /// A struct representing a name for a monitor.
 #[derive(Debug)]
 struct MonitorName(String);
@@ -896,20 +890,21 @@ mod tests {
 	#[test]
 	fn persister_with_real_monitors() {
 		// This value is used later to limit how many iterations we perform.
-		let test_max_pending_updates = 7;
+		let persister_0_max_pending_updates = 7;
+		// Intentionally set this to a smaller value to test a different alignment.
+		let persister_1_max_pending_updates = 3;
 		let chanmon_cfgs = create_chanmon_cfgs(4);
 		let persister_0 = MonitorUpdatingPersister {
 			kv_store: &TestStore::new(false),
 			logger: &TestLogger::new(),
-			maximum_pending_updates: test_max_pending_updates,
+			maximum_pending_updates: persister_0_max_pending_updates,
 			entropy_source: &chanmon_cfgs[0].keys_manager,
 			signer_provider: &chanmon_cfgs[0].keys_manager,
 		};
 		let persister_1 = MonitorUpdatingPersister {
 			kv_store: &TestStore::new(false),
 			logger: &TestLogger::new(),
-			// Intentionally set this to a smaller value to test a different alignment.
-			maximum_pending_updates: 3,
+			maximum_pending_updates: persister_1_max_pending_updates,
 			entropy_source: &chanmon_cfgs[1].keys_manager,
 			signer_provider: &chanmon_cfgs[1].keys_manager,
 		};
@@ -934,7 +929,6 @@ mod tests {
 		node_cfgs[1].chain_monitor = chain_mon_1;
 		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
 		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-
 		let broadcaster_0 = &chanmon_cfgs[2].tx_broadcaster;
 		let broadcaster_1 = &chanmon_cfgs[3].tx_broadcaster;
 
@@ -957,10 +951,11 @@ mod tests {
 			for (_, mon) in persisted_chan_data_0.iter() {
 				// check that when we read it, we got the right update id
 				assert_eq!(mon.get_latest_update_id(), $expected_update_id);
-				// if the CM is at the correct update id without updates, ensure no updates are stored
+
+				// if the CM is at the consolidation threshold, ensure no updates are stored.
 				let monitor_name = MonitorName::from(mon.get_funding_txo().0);
-				let (_, cm_0) = persister_0.read_monitor(&monitor_name).unwrap();
-				if cm_0.get_latest_update_id() == $expected_update_id {
+				if mon.get_latest_update_id() % persister_0_max_pending_updates == 0
+					|| mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
 					assert_eq!(
 						persister_0.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
 							monitor_name.as_str()).unwrap().len(),
@@ -975,8 +970,9 @@ mod tests {
 			for (_, mon) in persisted_chan_data_1.iter() {
 				assert_eq!(mon.get_latest_update_id(), $expected_update_id);
 				let monitor_name = MonitorName::from(mon.get_funding_txo().0);
-				let (_, cm_1) = persister_1.read_monitor(&monitor_name).unwrap();
-				if cm_1.get_latest_update_id() == $expected_update_id {
+				// if the CM is at the consolidation threshold, ensure no updates are stored.
+				if mon.get_latest_update_id() % persister_1_max_pending_updates == 0
+					|| mon.get_latest_update_id() == CLOSED_CHANNEL_UPDATE_ID {
 					assert_eq!(
 						persister_1.kv_store.list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
 							monitor_name.as_str()).unwrap().len(),
@@ -1001,7 +997,7 @@ mod tests {
 		// Send a few more payments to try all the alignments of max pending updates with
 		// updates for a payment sent and received.
 		let mut sender = 0;
-		for i in 3..=test_max_pending_updates * 2 {
+		for i in 3..=persister_0_max_pending_updates * 2 {
 			let receiver;
 			if sender == 0 {
 				sender = 1;