Skip to content

fix memory buildup from JoinSet #229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 17, 2025

Conversation

elnosh
Copy link
Collaborator

@elnosh elnosh commented Mar 9, 2025

I ended up taking a different approach from what I mentioned here #221 (comment) to fix the memory buildup.

This adds a new dependency tokio-util but since it's from tokio which is already used, I though it would be ok. If not, let me know and I can try something else.

It uses a TaskTracker from tokio-util which works very similar to a JoinSet but with the difference that it will free the memory from tasks when they are finished without having to call something like join_next for a JoinSet.

From their docs https://docs.rs/tokio-util/latest/tokio_util/task/task_tracker/struct.TaskTracker.html#comparison-to-joinset:

A JoinSet keeps track of the return value of every inserted task. This means that if the caller keeps inserting tasks and never calls join_next, then their return values will keep building up and consuming memory, even if most of the tasks have already exited. This can cause the process to run out of memory. With a TaskTracker, this does not happen. Once tasks exit, they are immediately removed from the TaskTracker.

I tested it with the scenario from here: https://github.com/carlaKC/sim-ln/tree/review-club-memleak and it's avoiding the memory buildup that was happening with the JoinSet.

@carlaKC
Copy link
Contributor

carlaKC commented Mar 12, 2025

It uses a TaskTracker from tokio-util which works very similar to a JoinSet but with the difference that it will free the memory from tasks when they are finished without having to call something like join_next for a JoinSet.

Cool 😻 happy to add the dep!

A few bigger picture thoughts for this PR:

  1. Let's go ahead and replace all the Joinsets so that we can be consistent across the project
  2. WDYT about having a single TaskTracker that we pass into Simulation and SimGraph? That way we can get rid of SimGraph.wait_for_shutdown and Simulation.run takes care of waiting for all our tasks to clean up.

I'm also tempted to switch out triggered with CancellationToken to standardize on this library, but one thing at a time :')

@elnosh
Copy link
Collaborator Author

elnosh commented Mar 12, 2025

WDYT about having a single TaskTracker that we pass into Simulation and SimGraph? That way we can get rid of SimGraph.wait_for_shutdown and Simulation.run takes care of waiting for all our tasks to clean up.

sounds good to me :) Only a couple of questions. I see the JoinSets used in Simulation are created as needed inside of Simulation.run, so you mean have something like a shared TaskTracker as fields in both the Simulation and SimGraph? If that's the case, they will need to be created together in a method like this one: https://github.com/carlaKC/sim-ln/blob/review-club-memleak/simln-lib/src/lib.rs#L590 when SimGraph starts being used?

Let's go ahead and replace all the Joinsets so that we can be consistent across the project

cool, will do and consolidate on TaskTracker when we settle on approach regarding previous point of sharing it between Simulation and SimGraph.

I'm also tempted to switch out triggered with CancellationToken to standardize on this library, but one thing at a time :')

👍

@carlaKC
Copy link
Contributor

carlaKC commented Mar 12, 2025

so you mean have something like a shared TaskTracker as fields in both the Simulation and SimGraph

Yes that's what I was thinking, you create one TaskTracker in main.rs and then pass it in to whatever constructor needs it. That approach will also be helpful for people using sim-ln as a library - you can provide the simulator with a TaskTracker and also use it for whatever other tasks you need to run.

@elnosh
Copy link
Collaborator Author

elnosh commented Mar 12, 2025

I've added the changes as suggested to replace JoinSets with TaskTracker in other places and adding a tasks field to Simulation. Still creating a producer_tasks inside run to track activity tasks alone and know if it can shutdown if those have finished.

Comment on lines 743 to 744
self.tasks.close();
self.tasks.wait().await;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only place I called them alone to wait for any tasks and return

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pre-existing from the current design, but WDYT about having no task management in this method at all (except for producer_tasks which is internal), and then just wrapping it in a thin method that does the task management for us?

Eg:

pub fn run() {
   self.internal_run()
   self.tasks.close()
   self.tasks.wait()
}

fn internal_run() {
    <this method logic>
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like it better with everything in run but I don't have strong opinions so I can do the internal_run if you prefer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like it better with everything in run

motivation?

My thinking was that it's a little more future proof to have an internal_run just handle task cleanup, so that we don't run the risk of forgetting to handle tasks if we add a new exit point.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

motivation?

no reason really 😅

My thinking was that it's a little more future proof to have an internal_run just handle task cleanup,

yeah that's why I thought about maybe putting them in the shutdown method so that it could handle that cleanup too but shutdown isn't always called. Anyways, I'm sold, I'll move the cleanup to an internal_run method

Copy link
Contributor

@carlaKC carlaKC left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good!

Only one major comment about how we're going to clean up run a bit 👍

}
}

self.tasks.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we can get rid of this method completely and make a note that SimGraph expects the caller to wait for TaskTracker to close + complete? This method makes less sense in a world where we're sharing trackers IMO.

Ah, this is done in the last commit (reviewing one commit at a time). Could we move that into the first commit?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I didn't clean up the commits. I was thinking this could all be squashed into one commit as it all encapsulates the change to TaskTracker from JoinSet or should I have 3 separate ones as I currently have?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking this could all be squashed into one commit as it all encapsulates the change to TaskTracker from JoinSet

That sounds good! I was just looking at them individually because they weren't fixups, but that makes sense.

Why don't you go ahead and squash the next time you push, and then we can do fixups if there are any further changes? Just so it's easier to track in review.

Comment on lines 521 to 523
/// track all tasks spawned for use in the simulation.
tasks: TaskTracker,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitnit: capitalize "track" and make a note that the run function will wait for all tasks to complete so that people passing a tracker in that they're using elsewhere know that it'll be handled.

I think that we can put that both here and on run because it's important.

Comment on lines 743 to 744
self.tasks.close();
self.tasks.wait().await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pre-existing from the current design, but WDYT about having no task management in this method at all (except for producer_tasks which is internal), and then just wrapping it in a thin method that does the task management for us?

Eg:

pub fn run() {
   self.internal_run()
   self.tasks.close()
   self.tasks.wait()
}

fn internal_run() {
    <this method logic>
}

@elnosh
Copy link
Collaborator Author

elnosh commented Mar 14, 2025

I've squashed everything into one commit and tried to address the comments from your last review (:

Copy link
Contributor

@carlaKC carlaKC left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last few comments from me, sorry I didn't pick them up on the last review round!

@@ -518,6 +518,10 @@ pub struct Simulation {
activity: Vec<ActivityDefinition>,
/// Results logger that holds the simulation statistics.
results: Arc<Mutex<PaymentResultLogger>>,
/// Track all tasks spawned for use in the simulation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: comments can wrap at 100, here and a few other places

@@ -682,24 +682,10 @@ impl SimGraph {
Ok(SimGraph {
nodes,
channels: Arc::new(Mutex::new(channels)),
tasks: JoinSet::new(),
tasks: TaskTracker::new(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent with Simulation, let's pass tasks in here as well?

Even though we'll probably use a different constructor to run with a SimGraph when we implement this internally, we probably want other use cases that are using sim-ln as a library to have to provide (and manage) their own tasks - just gives more flexibility.

@@ -1351,7 +1341,7 @@ async fn produce_simulation_results(
results: Sender<(Payment, PaymentResult)>,
listener: Listener,
) -> Result<(), SimulationError> {
let mut set = tokio::task::JoinSet::new();
let set = TaskTracker::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we can pass the top level task set in here as well rather than having a new set?

This is pre-existing, sorry I didn't mention it earlier!

JoinSet was causing memory to build up while running because we were waiting
until shutdown to clean those tasks with join_next. Using a TaskTracker
because it will free the memory from the tasks immediately after they exit.
Copy link
Contributor

@carlaKC carlaKC left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tACK on 3a94d61, thanks for picking this one up!

@@ -638,8 +638,9 @@ pub struct SimGraph {
/// channels maps the scid of a channel to its current simulation state.
channels: Arc<Mutex<HashMap<ShortChannelID, SimulatedChannel>>>,

/// track all tasks spawned to process payments in the graph.
tasks: JoinSet<()>,
/// track all tasks spawned to process payments in the graph. Note that handling the shutdown of tasks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: capitalize comment

Comment on lines +652 to +658
/// run until the simulation completes or we hit an error.
/// Note that it will wait for the tasks in self.tasks to complete
/// before returning.
pub async fn run(&self) -> Result<(), SimulationError> {
self.internal_run().await?;
// Close our TaskTracker and wait for any background tasks
// spawned during internal_run to complete.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: line wrapping on comments

@carlaKC carlaKC merged commit f272dec into bitcoin-dev-project:main Mar 17, 2025
2 checks passed
@carlaKC carlaKC mentioned this pull request Mar 17, 2025
@elnosh elnosh deleted the use-tasktracker branch May 13, 2025 22:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants