Skip to content

Commit dbdef7e

Browse files
authored
Gracefully handle the inability to spawn threads, take 2 (#31)
* Gracefully handle the inability to spawn threads * Ensure the thread limit is never zero
1 parent 59b51b6 commit dbdef7e

File tree

2 files changed

+26
-7
lines changed

2 files changed

+26
-7
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ async-task = "4.0.2"
2121
atomic-waker = "1.0.0"
2222
fastrand = "1.3.4"
2323
futures-lite = "1.11.0"
24+
log = "0.4.17"

src/lib.rs

+25-7
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ use std::env;
8383
use std::fmt;
8484
use std::io::{self, Read, Seek, SeekFrom, Write};
8585
use std::mem;
86+
use std::num::NonZeroUsize;
8687
use std::panic;
8788
use std::pin::Pin;
8889
use std::slice;
@@ -120,9 +121,6 @@ struct Executor {
120121

121122
/// Used to put idle threads to sleep and wake them up when new work comes in.
122123
cvar: Condvar,
123-
124-
/// Maximum number of threads in the pool
125-
thread_limit: usize,
126124
}
127125

128126
/// Inner state of the blocking executor.
@@ -139,6 +137,9 @@ struct Inner {
139137

140138
/// The queue of blocking tasks.
141139
queue: VecDeque<Runnable>,
140+
141+
/// Maximum number of threads in the pool
142+
thread_limit: NonZeroUsize,
142143
}
143144

144145
impl Executor {
@@ -167,9 +168,9 @@ impl Executor {
167168
idle_count: 0,
168169
thread_count: 0,
169170
queue: VecDeque::new(),
171+
thread_limit: NonZeroUsize::new(thread_limit).unwrap(),
170172
}),
171173
cvar: Condvar::new(),
172-
thread_limit,
173174
}
174175
});
175176

@@ -232,7 +233,9 @@ impl Executor {
232233
fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
233234
// If runnable tasks greatly outnumber idle threads and there aren't too many threads
234235
// already, then be aggressive: wake all idle threads and spawn one more thread.
235-
while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < self.thread_limit {
236+
while inner.queue.len() > inner.idle_count * 5
237+
&& inner.thread_count < inner.thread_limit.get()
238+
{
236239
// The new thread starts in idle state.
237240
inner.idle_count += 1;
238241
inner.thread_count += 1;
@@ -245,10 +248,25 @@ impl Executor {
245248
let id = ID.fetch_add(1, Ordering::Relaxed);
246249

247250
// Spawn the new thread.
248-
thread::Builder::new()
251+
if let Err(e) = thread::Builder::new()
249252
.name(format!("blocking-{}", id))
250253
.spawn(move || self.main_loop())
251-
.unwrap();
254+
{
255+
// We were unable to spawn the thread, so we need to undo the state changes.
256+
log::error!("Failed to spawn a blocking thread: {}", e);
257+
inner.idle_count -= 1;
258+
inner.thread_count -= 1;
259+
260+
// The current number of threads is likely to be the system's upper limit, so update
261+
// thread_limit accordingly.
262+
inner.thread_limit = {
263+
let new_limit = inner.thread_count;
264+
265+
// If the limit is about to be set to zero, set it to one instead so that if,
266+
// in the future, we are able to spawn more threads, we will be able to do so.
267+
NonZeroUsize::new(new_limit).unwrap_or_else(|| NonZeroUsize::new(1).unwrap())
268+
};
269+
}
252270
}
253271
}
254272
}

0 commit comments

Comments
 (0)