diff --git a/sim-cli/src/main.rs b/sim-cli/src/main.rs index 616756fb..118cb7e0 100644 --- a/sim-cli/src/main.rs +++ b/sim-cli/src/main.rs @@ -224,7 +224,7 @@ async fn main() -> anyhow::Result<()> { ctrlc::set_handler(move || { log::info!("Shutting down simulation."); - sim2.shutdown(); + sim2.abort(); })?; sim.run().await?; diff --git a/simln-lib/src/lib.rs b/simln-lib/src/lib.rs index 13d2ef78..8d710cc7 100644 --- a/simln-lib/src/lib.rs +++ b/simln-lib/src/lib.rs @@ -521,6 +521,8 @@ pub struct Simulation { /// High level triggers used to manage simulation tasks and shutdown. shutdown_trigger: Trigger, shutdown_listener: Listener, + abort_trigger: Trigger, + abort_listener: Listener, } #[derive(Clone)] @@ -548,6 +550,7 @@ impl Simulation { activity: Vec, ) -> Self { let (shutdown_trigger, shutdown_listener) = triggered::trigger(); + let (abort_trigger, abort_listener) = triggered::trigger(); Self { cfg, nodes, @@ -555,6 +558,8 @@ impl Simulation { results: Arc::new(Mutex::new(PaymentResultLogger::new())), shutdown_trigger, shutdown_listener, + abort_trigger, + abort_listener, } } @@ -661,90 +666,95 @@ impl Simulation { ); let mut tasks = JoinSet::new(); - // Before we start the simulation up, start tasks that will be responsible for gathering simulation data. - // The event channels are shared across our functionality: - // - Event Sender: used by the simulation to inform data reporting that it needs to start tracking the - // final result of the event that it has taken. - // - Event Receiver: used by data reporting to receive events that have been simulated that need to be - // tracked and recorded. - let (event_sender, event_receiver) = channel(1); - self.run_data_collection(event_receiver, &mut tasks); - - // Get an execution kit per activity that we need to generate and spin up consumers for each source node. - let activities = match self.activity_executors().await { - Ok(a) => a, - Err(e) => { - // If we encounter an error while setting up the activity_executors, - // we need to shutdown and wait for tasks to finish. We have started background tasks in the - // run_data_collection function, so we should shut those down before returning. - self.shutdown(); - while let Some(res) = tasks.join_next().await { - if let Err(e) = res { - log::error!("Task exited with error: {e}."); - } - } - return Err(e); - }, - }; - let consumer_channels = self.dispatch_consumers( - activities - .iter() - .map(|generator| generator.source_info.pubkey) - .collect(), - event_sender.clone(), - &mut tasks, - ); - - // Next, we'll spin up our actual producers that will be responsible for triggering the configured activity. - // The producers will use their own JoinSet so that the simulation can be shutdown if they all finish. - let mut producer_tasks = JoinSet::new(); - match self - .dispatch_producers(activities, consumer_channels, &mut producer_tasks) - .await { - Ok(_) => {}, - Err(e) => { - // If we encounter an error in dispatch_producers, we need to shutdown and wait for tasks to finish. - // We have started background tasks in the run_data_collection function, - // so we should shut those down before returning. - self.shutdown(); - while let Some(res) = tasks.join_next().await { - if let Err(e) = res { - log::error!("Task exited with error: {e}."); + // A block to control the scope of consumer_channels and event_sender. These need to go out of scope so that receivers will close. + + // Before we start the simulation up, start tasks that will be responsible for gathering simulation data. + // The event channels are shared across our functionality: + // - Event Sender: used by the simulation to inform data reporting that it needs to start tracking the + // final result of the event that it has taken. + // - Event Receiver: used by data reporting to receive events that have been simulated that need to be + // tracked and recorded. + let (event_sender, event_receiver) = channel(1); + self.run_data_collection(event_receiver, &mut tasks); + + // Get an execution kit per activity that we need to generate and spin up consumers for each source node. + let activities = match self.activity_executors().await { + Ok(a) => a, + Err(e) => { + // If we encounter an error while setting up the activity_executors, + // we need to shutdown and wait for tasks to finish. We have started background tasks in the + // run_data_collection function, so we should shut those down before returning. + // The tasks started in run_data_collection are listening for the abort trigger. + self.abort_trigger.trigger(); + while let Some(res) = tasks.join_next().await { + if let Err(e) = res { + log::error!("Task exited with error: {e}."); + } } - } - return Err(e); - }, - } + return Err(e); + }, + }; + let consumer_channels = self.dispatch_consumers( + activities + .iter() + .map(|generator| generator.source_info.pubkey) + .collect(), + event_sender.clone(), + &mut tasks, + ); - // Start a task that waits for the producers to finish. - // If all producers finish, then there is nothing left to do and the simulation can be shutdown. - let producer_trigger = self.shutdown_trigger.clone(); - tasks.spawn(async move { - while let Some(res) = producer_tasks.join_next().await { - if let Err(e) = res { - log::error!("Producer exited with error: {e}."); - } + // Next, we'll spin up our actual producers that will be responsible for triggering the configured activity. + // The producers will use their own JoinSet so that the simulation can be shutdown if they all finish. + let mut producer_tasks = JoinSet::new(); + match self + .dispatch_producers(activities, consumer_channels, &mut producer_tasks) + .await + { + Ok(_) => {}, + Err(e) => { + // If we encounter an error in dispatch_producers, we need to shutdown and wait for tasks to finish. + // We have started background tasks in the run_data_collection function, + // so we should shut those down before returning. + self.shutdown(); + while let Some(res) = tasks.join_next().await { + if let Err(e) = res { + log::error!("Task exited with error: {e}."); + } + } + return Err(e); + }, } - log::info!("All producers finished. Shutting down."); - producer_trigger.trigger() - }); - - // Start a task that will shutdown the simulation if the total_time is met. - if let Some(total_time) = self.cfg.total_time { - let t = self.shutdown_trigger.clone(); - let l = self.shutdown_listener.clone(); + // Start a task that waits for the producers to finish. + // If all producers finish, then there is nothing left to do and the simulation can be shutdown. + let producer_trigger = self.shutdown_trigger.clone(); tasks.spawn(async move { - if time::timeout(total_time, l).await.is_err() { - log::info!( - "Simulation run for {}s. Shutting down.", - total_time.as_secs() - ); - t.trigger() + while let Some(res) = producer_tasks.join_next().await { + if let Err(e) = res { + log::error!("Producer exited with error: {e}."); + } } + log::info!("All producers finished. Shutting down."); + producer_trigger.trigger() }); - } + + // Start a task that will shutdown the simulation if the total_time is met. + if let Some(total_time) = self.cfg.total_time { + let t = self.shutdown_trigger.clone(); + let l = self.shutdown_listener.clone(); + + tasks.spawn(async move { + if time::timeout(total_time, l).await.is_err() { + log::info!( + "Simulation run for {}s. Shutting down.", + total_time.as_secs() + ); + t.trigger() + } + }); + } + } // A block to control the scope of consumer_channels and event_sender. These need to go out of scope so that receivers will close. // We always want to wait for all threads to exit, so we wait for all of them to exit and track any errors // that surface. It's okay if there are multiple and one is overwritten, we just want to know whether we @@ -764,6 +774,11 @@ impl Simulation { self.shutdown_trigger.trigger() } + pub fn abort(&self) { + self.shutdown_trigger.trigger(); + self.abort_trigger.trigger(); + } + pub async fn get_total_payments(&self) -> u64 { self.results.lock().await.total_attempts() } @@ -779,8 +794,6 @@ impl Simulation { output_receiver: Receiver, tasks: &mut JoinSet<()>, ) { - let listener = self.shutdown_listener.clone(); - let shutdown = self.shutdown_trigger.clone(); log::debug!("Setting up simulator data collection."); // Create a sender/receiver pair that will be used to report final results of simulation. @@ -788,8 +801,10 @@ impl Simulation { let nodes = self.nodes.clone(); // psr: produce simulation results - let psr_listener = listener.clone(); - let psr_shutdown = shutdown.clone(); + // psr should trigger a clean shutdown if there is an error + // psr should listen for an abort, not shutdown because it will cleanly shutdown when the receiver closes + let psr_listener = self.abort_listener.clone(); + let psr_shutdown = self.shutdown_trigger.clone(); tasks.spawn(async move { log::debug!("Starting simulation results producer."); if let Err(e) = @@ -806,11 +821,15 @@ impl Simulation { let result_logger = self.results.clone(); let result_logger_clone = result_logger.clone(); - let result_logger_listener = listener.clone(); + // rl: results logger + // rl should listen for both shutdowns and aborts because it does not have any channels that will cause a shutdown + let rl_shutdown_listener = self.shutdown_listener.clone(); + let rl_abort_listener = self.abort_listener.clone(); tasks.spawn(async move { log::debug!("Starting results logger."); run_results_logger( - result_logger_listener, + rl_shutdown_listener, + rl_abort_listener, result_logger_clone, Duration::from_secs(60), ) @@ -819,18 +838,22 @@ impl Simulation { }); // csr: consume simulation results + // crs should trigger a clean shutdown if there is an error + // crs should listen for an abort, not shutdown because it will cleanly shutdown when the receiver closes let csr_write_results = self.cfg.write_results.clone(); + let csr_shutdown = self.shutdown_trigger.clone(); + let csr_abort = self.abort_listener.clone(); tasks.spawn(async move { log::debug!("Starting simulation results consumer."); if let Err(e) = consume_simulation_results( result_logger, results_receiver, - listener, + csr_abort, csr_write_results, ) .await { - shutdown.trigger(); + csr_shutdown.trigger(); log::error!("Consume simulation results exited with error: {e:?}."); } else { log::debug!("Consume simulation result received shutdown signal."); @@ -957,7 +980,7 @@ impl Simulation { // Generate a consumer for the receiving end of the channel. It takes the event receiver that it'll pull // events from and the results sender to report the events it has triggered for further monitoring. // ce: consume event - let ce_listener = self.shutdown_listener.clone(); + let ce_listener = self.abort_listener.clone(); let ce_shutdown = self.shutdown_trigger.clone(); let ce_output_sender = output_sender.clone(); let ce_node = node.clone(); @@ -1316,7 +1339,8 @@ impl Display for PaymentResultLogger { /// Note that `run_results_logger` does not error in any way, thus it has no /// trigger. It listens for triggers to ensure clean exit. async fn run_results_logger( - listener: Listener, + shutdown_listener: Listener, + abort_listener: Listener, logger: Arc>, interval: Duration, ) { @@ -1325,7 +1349,11 @@ async fn run_results_logger( loop { select! { biased; - _ = listener.clone() => { + _ = shutdown_listener.clone() => { + break + } + + _ = abort_listener.clone() => { break }