Skip to content

Commit 9af6b95

Browse files
authored
Optimize jobserver try_acquire (#1037)
* Bump dep jobserver from 0.1.20 to 0.1.30 Signed-off-by: Jiahao XU <[email protected]> * Add `parallel::OnceLock` impl copied from `std::sync::OnceLock` Signed-off-by: Jiahao XU <[email protected]> * Optimize `inherited_jobserver` acquire First try `jobserver::Client::try_acquire`, which will work: - If a fifo is used as jobserver - On linux and: - preadv2 with non-blocking read available (>=5.6) - /proc is available - On Windows - On wasm if not, we will simply fallback to help thread implementation, spawning one thread to maintain compatibility with other platforms. Signed-off-by: Jiahao XU <[email protected]> * Use `OnceLock` in `JobTokenServer::new` Also impls `Send`, `Sync`, `RefUnwindSafe` and `UnwindSafed` when the `T` meets the criterior. Signed-off-by: Jiahao XU <[email protected]> * Replace vendored `OnceLock` with dep `once_cell` Signed-off-by: Jiahao XU <[email protected]> * Fix dep: `once_cell` is needed on all targets whenever feature parallel is enabled. Signed-off-by: Jiahao XU <[email protected]> * Refactor: `ActiveJobTokenServer::new` no longer returns `Result` There is no need to, it never fails. Signed-off-by: Jiahao XU <[email protected]> * Add back TODO --------- Signed-off-by: Jiahao XU <[email protected]>
1 parent c284566 commit 9af6b95

File tree

3 files changed

+62
-46
lines changed

3 files changed

+62
-46
lines changed

Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@ edition = "2018"
2020
rust-version = "1.63"
2121

2222
[dependencies]
23-
jobserver = { version = "0.1.20", default-features = false, optional = true }
23+
jobserver = { version = "0.1.30", default-features = false, optional = true }
24+
once_cell = { version = "1.19", optional = true }
2425

2526
[target.'cfg(unix)'.dependencies]
2627
# Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866
2728
# which is still an issue with `resolver = "1"`.
2829
libc = { version = "0.2.62", default-features = false, optional = true }
2930

3031
[features]
31-
parallel = ["libc", "jobserver"]
32+
parallel = ["libc", "jobserver", "once_cell"]
3233

3334
[dev-dependencies]
3435
tempfile = "3"

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1449,7 +1449,7 @@ impl Build {
14491449
}
14501450

14511451
// Limit our parallelism globally with a jobserver.
1452-
let tokens = parallel::job_token::ActiveJobTokenServer::new()?;
1452+
let tokens = parallel::job_token::ActiveJobTokenServer::new();
14531453

14541454
// When compiling objects in parallel we do a few dirty tricks to speed
14551455
// things up:

src/parallel/job_token.rs

+58-43
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
use std::{marker::PhantomData, mem::MaybeUninit, sync::Once};
1+
use std::marker::PhantomData;
22

33
use crate::Error;
44

5+
use once_cell::sync::OnceCell;
6+
57
pub(crate) struct JobToken(PhantomData<()>);
68

79
impl JobToken {
@@ -35,18 +37,13 @@ impl JobTokenServer {
3537
/// compilation.
3638
fn new() -> &'static Self {
3739
// TODO: Replace with a OnceLock once MSRV is 1.70
38-
static INIT: Once = Once::new();
39-
static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();
40-
41-
unsafe {
42-
INIT.call_once(|| {
43-
let server = inherited_jobserver::JobServer::from_env()
44-
.map(Self::Inherited)
45-
.unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new()));
46-
JOBSERVER.write(server);
47-
});
48-
JOBSERVER.assume_init_ref()
49-
}
40+
static JOBSERVER: OnceCell<JobTokenServer> = OnceCell::new();
41+
42+
JOBSERVER.get_or_init(|| {
43+
unsafe { inherited_jobserver::JobServer::from_env() }
44+
.map(Self::Inherited)
45+
.unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new()))
46+
})
5047
}
5148
}
5249

@@ -56,14 +53,12 @@ pub(crate) enum ActiveJobTokenServer {
5653
}
5754

5855
impl ActiveJobTokenServer {
59-
pub(crate) fn new() -> Result<Self, Error> {
56+
pub(crate) fn new() -> Self {
6057
match JobTokenServer::new() {
6158
JobTokenServer::Inherited(inherited_jobserver) => {
62-
inherited_jobserver.enter_active().map(Self::Inherited)
63-
}
64-
JobTokenServer::InProcess(inprocess_jobserver) => {
65-
Ok(Self::InProcess(inprocess_jobserver))
59+
Self::Inherited(inherited_jobserver.enter_active())
6660
}
61+
JobTokenServer::InProcess(inprocess_jobserver) => Self::InProcess(inprocess_jobserver),
6762
}
6863
}
6964

@@ -76,7 +71,7 @@ impl ActiveJobTokenServer {
7671
}
7772

7873
mod inherited_jobserver {
79-
use super::JobToken;
74+
use super::{JobToken, OnceCell};
8075

8176
use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind};
8277

@@ -139,31 +134,39 @@ mod inherited_jobserver {
139134
}
140135
}
141136

142-
pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> {
143-
ActiveJobServer::new(self)
137+
pub(super) fn enter_active(&self) -> ActiveJobServer<'_> {
138+
ActiveJobServer {
139+
jobserver: self,
140+
helper_thread: OnceCell::new(),
141+
}
144142
}
145143
}
146144

147-
pub(crate) struct ActiveJobServer<'a> {
148-
jobserver: &'a JobServer,
149-
helper_thread: jobserver::HelperThread,
145+
struct HelperThread {
146+
inner: jobserver::HelperThread,
150147
/// When rx is dropped, all the token stored within it will be dropped.
151148
rx: mpsc::Receiver<io::Result<jobserver::Acquired>>,
152149
}
153150

154-
impl<'a> ActiveJobServer<'a> {
155-
fn new(jobserver: &'a JobServer) -> Result<Self, Error> {
151+
impl HelperThread {
152+
fn new(jobserver: &JobServer) -> Result<Self, Error> {
156153
let (tx, rx) = mpsc::channel();
157154

158155
Ok(Self {
159156
rx,
160-
helper_thread: jobserver.inner.clone().into_helper_thread(move |res| {
157+
inner: jobserver.inner.clone().into_helper_thread(move |res| {
161158
let _ = tx.send(res);
162159
})?,
163-
jobserver,
164160
})
165161
}
162+
}
163+
164+
pub(crate) struct ActiveJobServer<'a> {
165+
jobserver: &'a JobServer,
166+
helper_thread: OnceCell<HelperThread>,
167+
}
166168

169+
impl<'a> ActiveJobServer<'a> {
167170
pub(super) async fn acquire(&self) -> Result<JobToken, Error> {
168171
let mut has_requested_token = false;
169172

@@ -173,26 +176,38 @@ mod inherited_jobserver {
173176
break Ok(JobToken::new());
174177
}
175178

176-
// Cold path, no global implicit token, obtain one
177-
match self.rx.try_recv() {
178-
Ok(res) => {
179-
let acquired = res?;
179+
match self.jobserver.inner.try_acquire() {
180+
Ok(Some(acquired)) => {
180181
acquired.drop_without_releasing();
181182
break Ok(JobToken::new());
182183
}
183-
Err(mpsc::TryRecvError::Disconnected) => {
184-
break Err(Error::new(
185-
ErrorKind::JobserverHelpThreadError,
186-
"jobserver help thread has returned before ActiveJobServer is dropped",
187-
))
188-
}
189-
Err(mpsc::TryRecvError::Empty) => {
190-
if !has_requested_token {
191-
self.helper_thread.request_token();
192-
has_requested_token = true;
184+
Ok(None) => YieldOnce::default().await,
185+
Err(err) if err.kind() == io::ErrorKind::Unsupported => {
186+
// Fallback to creating a help thread with blocking acquire
187+
let helper_thread = self
188+
.helper_thread
189+
.get_or_try_init(|| HelperThread::new(&self.jobserver))?;
190+
191+
match helper_thread.rx.try_recv() {
192+
Ok(res) => {
193+
let acquired = res?;
194+
acquired.drop_without_releasing();
195+
break Ok(JobToken::new());
196+
}
197+
Err(mpsc::TryRecvError::Disconnected) => break Err(Error::new(
198+
ErrorKind::JobserverHelpThreadError,
199+
"jobserver help thread has returned before ActiveJobServer is dropped",
200+
)),
201+
Err(mpsc::TryRecvError::Empty) => {
202+
if !has_requested_token {
203+
helper_thread.inner.request_token();
204+
has_requested_token = true;
205+
}
206+
YieldOnce::default().await
207+
}
193208
}
194-
YieldOnce::default().await
195209
}
210+
Err(err) => break Err(err.into()),
196211
}
197212
}
198213
}

0 commit comments

Comments
 (0)