Skip to content

Commit f238bae

Browse files
committed
Recover from panic without requiring Unwind safety in the job
1 parent b54c09f commit f238bae

File tree

3 files changed

+95
-41
lines changed

3 files changed

+95
-41
lines changed

src/future.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use parking_lot::{Mutex, MutexGuard};
1616
use std::{
1717
future::Future,
18+
panic::{AssertUnwindSafe, RefUnwindSafe},
1819
sync::{
1920
atomic::{AtomicBool, Ordering},
2021
Arc,
@@ -33,14 +34,14 @@ pub struct PromiseFuture<T> {
3334
}
3435

3536
struct PromiseShared<T> {
36-
inner: Mutex<PromiseInner<T>>,
37+
inner: AssertUnwindSafe<Mutex<PromiseInner<T>>>,
3738
promise_dropped: AtomicBool,
3839
}
3940

4041
impl<T> Default for PromiseShared<T> {
4142
fn default() -> Self {
4243
Self {
43-
inner: Default::default(),
44+
inner: AssertUnwindSafe(Default::default()),
4445
promise_dropped: Default::default(),
4546
}
4647
}
@@ -86,7 +87,7 @@ impl<T> Drop for Promise<T> {
8687
}
8788
}
8889

89-
impl<T: Clone> Promise<T> {
90+
impl<T: Clone + RefUnwindSafe> Promise<T> {
9091
/// Create the sending and receiving parts of the promise.
9192
pub fn new() -> (Promise<T>, PromiseFuture<T>) {
9293
let shared = Default::default();

src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ use parking_lot::Mutex;
8686

8787
use std::{
8888
fmt,
89-
panic::UnwindSafe,
9089
sync::Arc,
9190
time::{Duration, Instant},
9291
};
@@ -183,7 +182,7 @@ impl<J: Job + Send + Clone + 'static> Default for Builder<J, source::IntervalRec
183182
}
184183
}
185184

186-
impl<J: Job + UnwindSafe + 'static, R: RecurringJob<J> + Send + 'static> Builder<J, R> {
185+
impl<J: Job + 'static, R: RecurringJob<J> + Send + 'static> Builder<J, R> {
187186
/// Function determining, for each priority, how many threads can be allocated to jobs of this priority, any remaining threads will be left idle to service higher-priority jobs. `None` means parallelism won't be limited
188187
pub fn limit_concurrency(
189188
mut self,

src/runner.rs

Lines changed: 90 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use parking_lot::{Mutex, MutexGuard};
22
use std::{
33
fmt::Debug,
44
iter,
5-
panic::{self, UnwindSafe},
65
sync::{Arc, Barrier},
76
thread::{self, JoinHandle},
87
};
@@ -28,7 +27,7 @@ pub fn spawn<J, R: RecurringJob<J> + Send + 'static>(
2827
concurrency_limit: Box<ConcurrencyLimitFn<J>>,
2928
) -> Vec<JoinHandle<()>>
3029
where
31-
J: Job + UnwindSafe + 'static,
30+
J: Job + 'static,
3231
<J as Prioritised>::Priority: Send,
3332
{
3433
let queue = jobs.lock().queue();
@@ -41,57 +40,112 @@ where
4140
thread::Builder::new()
4241
.name(format!("gaffer#{}", state.worker_index))
4342
.spawn(move || {
44-
run(state, jobs, queue, barrier, recv);
43+
Runner::new(state, jobs, queue).run(barrier, recv);
4544
})
4645
.unwrap()
4746
})
4847
.collect()
4948
}
5049

51-
/// Run the runner loop, `ready_barrier` syncronizes with the start of the other runners and decides the initial supervisor
52-
fn run<J: Job + UnwindSafe + 'static, R: RecurringJob<J>>(
50+
struct Runner<J: Job + 'static, R: RecurringJob<J> + Send + 'static> {
5351
state: RunnerState<J>,
5452
jobs: Arc<Mutex<SourceManager<J, R>>>,
5553
queue: Arc<Mutex<PriorityQueue<J>>>,
56-
ready_barrier: Arc<Barrier>,
57-
recv: crossbeam_channel::Receiver<J>,
58-
) -> ! {
59-
let mut job = if ready_barrier.wait().is_leader() {
60-
// become the supervisor
61-
state.become_supervisor();
62-
run_supervisor(&state, &jobs)
63-
} else {
64-
// worker is available
65-
recv.recv()
66-
.expect("Available worker is not connected to shared runner state")
67-
};
68-
drop(recv);
69-
loop {
70-
let _ = panic::catch_unwind(|| job.execute()); // so a panicking job doesn't kill workers
71-
let transition = state.completed_job(queue.lock().drain());
72-
job = match transition {
54+
}
55+
56+
impl<J, R> Runner<J, R>
57+
where
58+
J: Job + 'static,
59+
R: RecurringJob<J> + Send,
60+
{
61+
fn new(
62+
state: RunnerState<J>,
63+
jobs: Arc<Mutex<SourceManager<J, R>>>,
64+
queue: Arc<Mutex<PriorityQueue<J>>>,
65+
) -> Self {
66+
Self { state, jobs, queue }
67+
}
68+
69+
/// Run the runner loop, `ready_barrier` syncronizes with the start of the other runners and decides the initial supervisor
70+
fn run(self, ready_barrier: Arc<Barrier>, recv: crossbeam_channel::Receiver<J>) -> ! {
71+
let job = if ready_barrier.wait().is_leader() {
72+
// become the supervisor
73+
self.state.become_supervisor();
74+
self.run_supervisor()
75+
} else {
76+
// worker is available
77+
recv.recv()
78+
.expect("Available worker is not connected to shared runner state")
79+
};
80+
drop(recv);
81+
self.run_worker(job);
82+
}
83+
84+
fn run_worker(self, mut job: J) -> ! {
85+
loop {
86+
job.execute(); // so a panicking job doesn't kill workers
87+
job = self.next_job();
88+
}
89+
}
90+
91+
fn next_job(&self) -> J {
92+
let transition = self.state.completed_job(self.queue.lock().drain());
93+
match transition {
7394
PostJobTransition::BecomeAvailable(recv) => recv
7495
.recv()
7596
.expect("Available worker is not connected to shared runner state"),
76-
PostJobTransition::BecomeSupervisor => run_supervisor(&state, &jobs),
97+
PostJobTransition::BecomeSupervisor => self.run_supervisor(),
7798
PostJobTransition::KeepWorking(job) => job,
78-
};
99+
}
100+
}
101+
102+
/// Run the supervisor loop, jobs are retrieved and assigned. Returns when the supervisor has a job to execute and it becomes a worker
103+
fn run_supervisor(&self) -> J {
104+
let mut wait_for_new = false;
105+
let mut jobs = self.jobs.lock();
106+
loop {
107+
if let Some(job) = self.state.assign_jobs(jobs.get(wait_for_new)) {
108+
// become a worker
109+
return job;
110+
}
111+
wait_for_new = true;
112+
}
113+
}
114+
115+
/// Entry point for a new thread, replacing one which panicked whilst executing a job
116+
fn panic_recover(self) -> ! {
117+
let job = self.next_job();
118+
self.run_worker(job);
79119
}
80120
}
81121

82-
/// Run the supervisor loop, jobs are retrieved and assigned. Returns when the supervisor has a job to execute and it becomes a worker
83-
fn run_supervisor<J: Job + 'static, R: RecurringJob<J>>(
84-
state: &RunnerState<J>,
85-
jobs: &Arc<Mutex<SourceManager<J, R>>>,
86-
) -> J {
87-
let mut wait_for_new = false;
88-
let mut jobs = jobs.lock();
89-
loop {
90-
if let Some(job) = state.assign_jobs(jobs.get(wait_for_new)) {
91-
// become a worker
92-
return job;
122+
impl<J: Job + 'static, R: RecurringJob<J> + Send + 'static> Drop for Runner<J, R> {
123+
fn drop(&mut self) {
124+
if thread::panicking() {
125+
// spawn another thread to take over
126+
let Runner {
127+
state:
128+
RunnerState {
129+
workers,
130+
worker_index,
131+
concurrency_limit,
132+
},
133+
jobs,
134+
queue,
135+
} = self;
136+
let state = RunnerState {
137+
workers: workers.clone(),
138+
worker_index: *worker_index,
139+
concurrency_limit: concurrency_limit.clone(),
140+
};
141+
let runner = Runner::new(state, jobs.clone(), queue.clone());
142+
thread::Builder::new()
143+
.name(format!("gaffer#{}", worker_index))
144+
.spawn(move || {
145+
runner.panic_recover();
146+
})
147+
.unwrap();
93148
}
94-
wait_for_new = true;
95149
}
96150
}
97151

0 commit comments

Comments
 (0)