diff --git a/compiler/rustc_data_structures/src/jobserver.rs b/compiler/rustc_data_structures/src/jobserver.rs index 1204f2d692d6c..3ed1ea7543f40 100644 --- a/compiler/rustc_data_structures/src/jobserver.rs +++ b/compiler/rustc_data_structures/src/jobserver.rs @@ -1,7 +1,8 @@ -use std::sync::{LazyLock, OnceLock}; +use std::sync::{Arc, LazyLock, OnceLock}; pub use jobserver_crate::{Acquired, Client, HelperThread}; use jobserver_crate::{FromEnv, FromEnvErrorKind}; +use parking_lot::{Condvar, Mutex}; // We can only call `from_env_ext` once per process @@ -71,10 +72,93 @@ pub fn client() -> Client { GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).clone() } -pub fn acquire_thread() { - GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).acquire_raw().ok(); +struct ProxyData { + /// The number of tokens assigned to threads. + /// If this is 0, a single token is still assigned to this process, but is unused. + used: u16, + + /// The number of threads requesting a token + pending: u16, +} + +/// This is a jobserver proxy used to ensure that we hold on to at least one token. +pub struct Proxy { + client: Client, + data: Mutex, + + /// Threads which are waiting on a token will wait on this. + wake_pending: Condvar, + + helper: OnceLock, } -pub fn release_thread() { - GLOBAL_CLIENT_CHECKED.get().expect(ACCESS_ERROR).release_raw().ok(); +impl Proxy { + pub fn new() -> Arc { + let proxy = Arc::new(Proxy { + client: client(), + data: Mutex::new(ProxyData { used: 1, pending: 0 }), + wake_pending: Condvar::new(), + helper: OnceLock::new(), + }); + let proxy_ = Arc::clone(&proxy); + let helper = proxy + .client + .clone() + .into_helper_thread(move |token| { + if let Ok(token) = token { + let mut data = proxy_.data.lock(); + if data.pending > 0 { + // Give the token to a waiting thread + token.drop_without_releasing(); + assert!(data.used > 0); + data.used += 1; + data.pending -= 1; + proxy_.wake_pending.notify_one(); + } else { + // The token is no longer needed, drop it. + drop(data); + drop(token); + } + } + }) + .expect("failed to create helper thread"); + proxy.helper.set(helper).unwrap(); + proxy + } + + pub fn acquire_thread(&self) { + let mut data = self.data.lock(); + + if data.used == 0 { + // There was a free token around. This can + // happen when all threads release their token. + assert_eq!(data.pending, 0); + data.used += 1; + } else { + // Request a token from the helper thread. We can't directly use `acquire_raw` + // as we also need to be able to wait for the final token in the process which + // does not get a corresponding `release_raw` call. + self.helper.get().unwrap().request_token(); + data.pending += 1; + self.wake_pending.wait(&mut data); + } + } + + pub fn release_thread(&self) { + let mut data = self.data.lock(); + + if data.pending > 0 { + // Give the token to a waiting thread + data.pending -= 1; + self.wake_pending.notify_one(); + } else { + data.used -= 1; + + // Release the token unless it's the last one in the process + if data.used > 0 { + drop(data); + self.client.release_raw().ok(); + } + } + } } diff --git a/compiler/rustc_data_structures/src/marker.rs b/compiler/rustc_data_structures/src/marker.rs index dfd9bd3207616..e0df1b232e134 100644 --- a/compiler/rustc_data_structures/src/marker.rs +++ b/compiler/rustc_data_structures/src/marker.rs @@ -59,8 +59,8 @@ macro_rules! already_send { // These structures are already `Send`. already_send!( [std::backtrace::Backtrace][std::io::Stdout][std::io::Stderr][std::io::Error][std::fs::File] - [rustc_arena::DroplessArena][crate::memmap::Mmap][crate::profiling::SelfProfiler] - [crate::owned_slice::OwnedSlice] + [rustc_arena::DroplessArena][jobserver_crate::Client][jobserver_crate::HelperThread] + [crate::memmap::Mmap][crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice] ); macro_rules! impl_dyn_send { @@ -134,8 +134,8 @@ macro_rules! already_sync { already_sync!( [std::sync::atomic::AtomicBool][std::sync::atomic::AtomicUsize][std::sync::atomic::AtomicU8] [std::sync::atomic::AtomicU32][std::backtrace::Backtrace][std::io::Error][std::fs::File] - [jobserver_crate::Client][crate::memmap::Mmap][crate::profiling::SelfProfiler] - [crate::owned_slice::OwnedSlice] + [jobserver_crate::Client][jobserver_crate::HelperThread][crate::memmap::Mmap] + [crate::profiling::SelfProfiler][crate::owned_slice::OwnedSlice] ); // Use portable AtomicU64 for targets without native 64-bit atomics diff --git a/compiler/rustc_interface/src/interface.rs b/compiler/rustc_interface/src/interface.rs index 708fe23b79150..cf494f8d686e8 100644 --- a/compiler/rustc_interface/src/interface.rs +++ b/compiler/rustc_interface/src/interface.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use rustc_ast::{LitKind, MetaItemKind, token}; use rustc_codegen_ssa::traits::CodegenBackend; use rustc_data_structures::fx::{FxHashMap, FxHashSet}; -use rustc_data_structures::jobserver; +use rustc_data_structures::jobserver::{self, Proxy}; use rustc_data_structures::stable_hasher::StableHasher; use rustc_errors::registry::Registry; use rustc_errors::{DiagCtxtHandle, ErrorGuaranteed}; @@ -40,7 +40,12 @@ pub struct Compiler { pub sess: Session, pub codegen_backend: Box, pub(crate) override_queries: Option, + + /// A reference to the current `GlobalCtxt` which we pass on to `GlobalCtxt`. pub(crate) current_gcx: CurrentGcx, + + /// A jobserver reference which we pass on to `GlobalCtxt`. + pub(crate) jobserver_proxy: Arc, } /// Converts strings provided as `--cfg [cfgspec]` into a `Cfg`. @@ -415,7 +420,7 @@ pub fn run_compiler(config: Config, f: impl FnOnce(&Compiler) -> R + Se config.opts.unstable_opts.threads, &config.extra_symbols, SourceMapInputs { file_loader, path_mapping, hash_kind, checksum_hash_kind }, - |current_gcx| { + |current_gcx, jobserver_proxy| { // The previous `early_dcx` can't be reused here because it doesn't // impl `Send`. Creating a new one is fine. let early_dcx = EarlyDiagCtxt::new(config.opts.error_format); @@ -511,6 +516,7 @@ pub fn run_compiler(config: Config, f: impl FnOnce(&Compiler) -> R + Se codegen_backend, override_queries: config.override_queries, current_gcx, + jobserver_proxy, }; // There are two paths out of `f`. diff --git a/compiler/rustc_interface/src/passes.rs b/compiler/rustc_interface/src/passes.rs index 66d2a79b93a4a..c95442d908d65 100644 --- a/compiler/rustc_interface/src/passes.rs +++ b/compiler/rustc_interface/src/passes.rs @@ -7,6 +7,7 @@ use std::{env, fs, iter}; use rustc_ast as ast; use rustc_codegen_ssa::traits::CodegenBackend; +use rustc_data_structures::jobserver::Proxy; use rustc_data_structures::parallel; use rustc_data_structures::steal::Steal; use rustc_data_structures::sync::{AppendOnlyIndexVec, FreezeLock, WorkerLocal}; @@ -841,12 +842,13 @@ pub fn create_and_enter_global_ctxt FnOnce(TyCtxt<'tcx>) -> T>( dyn for<'tcx> FnOnce( &'tcx Session, CurrentGcx, + Arc, &'tcx OnceLock>, &'tcx WorkerLocal>, &'tcx WorkerLocal>, F, ) -> T, - > = Box::new(move |sess, current_gcx, gcx_cell, arena, hir_arena, f| { + > = Box::new(move |sess, current_gcx, jobserver_proxy, gcx_cell, arena, hir_arena, f| { TyCtxt::create_global_ctxt( gcx_cell, sess, @@ -865,6 +867,7 @@ pub fn create_and_enter_global_ctxt FnOnce(TyCtxt<'tcx>) -> T>( ), providers.hooks, current_gcx, + jobserver_proxy, |tcx| { let feed = tcx.create_crate_num(stable_crate_id).unwrap(); assert_eq!(feed.key(), LOCAL_CRATE); @@ -887,7 +890,15 @@ pub fn create_and_enter_global_ctxt FnOnce(TyCtxt<'tcx>) -> T>( ) }); - inner(&compiler.sess, compiler.current_gcx.clone(), &gcx_cell, &arena, &hir_arena, f) + inner( + &compiler.sess, + compiler.current_gcx.clone(), + Arc::clone(&compiler.jobserver_proxy), + &gcx_cell, + &arena, + &hir_arena, + f, + ) } /// Runs all analyses that we guarantee to run, even if errors were reported in earlier analyses. diff --git a/compiler/rustc_interface/src/util.rs b/compiler/rustc_interface/src/util.rs index c3a939f1ab086..3a291bbe802d9 100644 --- a/compiler/rustc_interface/src/util.rs +++ b/compiler/rustc_interface/src/util.rs @@ -1,11 +1,12 @@ use std::env::consts::{DLL_PREFIX, DLL_SUFFIX}; use std::path::{Path, PathBuf}; -use std::sync::OnceLock; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, OnceLock}; use std::{env, iter, thread}; use rustc_ast as ast; use rustc_codegen_ssa::traits::CodegenBackend; +use rustc_data_structures::jobserver::Proxy; use rustc_data_structures::sync; use rustc_metadata::{DylibError, load_symbol_from_dylib}; use rustc_middle::ty::CurrentGcx; @@ -113,7 +114,7 @@ fn init_stack_size(early_dcx: &EarlyDiagCtxt) -> usize { }) } -fn run_in_thread_with_globals R + Send, R: Send>( +fn run_in_thread_with_globals) -> R + Send, R: Send>( thread_stack_size: usize, edition: Edition, sm_inputs: SourceMapInputs, @@ -139,7 +140,7 @@ fn run_in_thread_with_globals R + Send, R: Send>( edition, extra_symbols, Some(sm_inputs), - || f(CurrentGcx::new()), + || f(CurrentGcx::new(), Proxy::new()), ) }) .unwrap() @@ -152,7 +153,10 @@ fn run_in_thread_with_globals R + Send, R: Send>( }) } -pub(crate) fn run_in_thread_pool_with_globals R + Send, R: Send>( +pub(crate) fn run_in_thread_pool_with_globals< + F: FnOnce(CurrentGcx, Arc) -> R + Send, + R: Send, +>( thread_builder_diag: &EarlyDiagCtxt, edition: Edition, threads: usize, @@ -162,8 +166,8 @@ pub(crate) fn run_in_thread_pool_with_globals R + Send, ) -> R { use std::process; + use rustc_data_structures::defer; use rustc_data_structures::sync::FromDyn; - use rustc_data_structures::{defer, jobserver}; use rustc_middle::ty::tls; use rustc_query_impl::QueryCtxt; use rustc_query_system::query::{QueryContext, break_query_cycles}; @@ -178,11 +182,11 @@ pub(crate) fn run_in_thread_pool_with_globals R + Send, edition, sm_inputs, extra_symbols, - |current_gcx| { + |current_gcx, jobserver_proxy| { // Register the thread for use with the `WorkerLocal` type. registry.register(); - f(current_gcx) + f(current_gcx, jobserver_proxy) }, ); } @@ -190,10 +194,14 @@ pub(crate) fn run_in_thread_pool_with_globals R + Send, let current_gcx = FromDyn::from(CurrentGcx::new()); let current_gcx2 = current_gcx.clone(); + let proxy = Proxy::new(); + + let proxy_ = Arc::clone(&proxy); + let proxy__ = Arc::clone(&proxy); let builder = rayon_core::ThreadPoolBuilder::new() .thread_name(|_| "rustc".to_string()) - .acquire_thread_handler(jobserver::acquire_thread) - .release_thread_handler(jobserver::release_thread) + .acquire_thread_handler(move || proxy_.acquire_thread()) + .release_thread_handler(move || proxy__.release_thread()) .num_threads(threads) .deadlock_handler(move || { // On deadlock, creates a new thread and forwards information in thread @@ -257,7 +265,7 @@ pub(crate) fn run_in_thread_pool_with_globals R + Send, }, // Run `f` on the first thread in the thread pool. move |pool: &rayon_core::ThreadPool| { - pool.install(|| f(current_gcx.into_inner())) + pool.install(|| f(current_gcx.into_inner(), proxy)) }, ) .unwrap() diff --git a/compiler/rustc_middle/src/ty/context.rs b/compiler/rustc_middle/src/ty/context.rs index 1efd0d1d14beb..a185b29092a16 100644 --- a/compiler/rustc_middle/src/ty/context.rs +++ b/compiler/rustc_middle/src/ty/context.rs @@ -21,6 +21,7 @@ use rustc_data_structures::defer; use rustc_data_structures::fingerprint::Fingerprint; use rustc_data_structures::fx::FxHashMap; use rustc_data_structures::intern::Interned; +use rustc_data_structures::jobserver::Proxy; use rustc_data_structures::profiling::SelfProfilerRef; use rustc_data_structures::sharded::{IntoPointer, ShardedHashMap}; use rustc_data_structures::stable_hasher::{HashStable, StableHasher}; @@ -1438,6 +1439,9 @@ pub struct GlobalCtxt<'tcx> { pub(crate) alloc_map: interpret::AllocMap<'tcx>, current_gcx: CurrentGcx, + + /// A jobserver reference used to release then acquire a token while waiting on a query. + pub jobserver_proxy: Arc, } impl<'tcx> GlobalCtxt<'tcx> { @@ -1642,6 +1646,7 @@ impl<'tcx> TyCtxt<'tcx> { query_system: QuerySystem<'tcx>, hooks: crate::hooks::Providers, current_gcx: CurrentGcx, + jobserver_proxy: Arc, f: impl FnOnce(TyCtxt<'tcx>) -> T, ) -> T { let data_layout = s.target.parse_data_layout().unwrap_or_else(|err| { @@ -1676,6 +1681,7 @@ impl<'tcx> TyCtxt<'tcx> { data_layout, alloc_map: interpret::AllocMap::new(), current_gcx, + jobserver_proxy, }); // This is a separate function to work around a crash with parallel rustc (#135870) diff --git a/compiler/rustc_query_impl/src/plumbing.rs b/compiler/rustc_query_impl/src/plumbing.rs index 19ccc5587d6a6..ea37dc5489bd8 100644 --- a/compiler/rustc_query_impl/src/plumbing.rs +++ b/compiler/rustc_query_impl/src/plumbing.rs @@ -4,6 +4,7 @@ use std::num::NonZero; +use rustc_data_structures::jobserver::Proxy; use rustc_data_structures::stable_hasher::{HashStable, StableHasher}; use rustc_data_structures::sync::{DynSend, DynSync}; use rustc_data_structures::unord::UnordMap; @@ -69,6 +70,11 @@ impl<'tcx> HasDepContext for QueryCtxt<'tcx> { impl<'tcx> QueryContext for QueryCtxt<'tcx> { type QueryInfo = QueryStackDeferred<'tcx>; + #[inline] + fn jobserver_proxy(&self) -> &Proxy { + &*self.jobserver_proxy + } + #[inline] fn next_job_id(self) -> QueryJobId { QueryJobId( diff --git a/compiler/rustc_query_system/src/query/job.rs b/compiler/rustc_query_system/src/query/job.rs index de35cd79ea275..6321abc5087f5 100644 --- a/compiler/rustc_query_system/src/query/job.rs +++ b/compiler/rustc_query_system/src/query/job.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use parking_lot::{Condvar, Mutex}; use rustc_data_structures::fx::{FxHashMap, FxHashSet}; -use rustc_data_structures::jobserver; use rustc_errors::{Diag, DiagCtxtHandle}; use rustc_hir::def::DefKind; use rustc_session::Session; @@ -207,12 +206,13 @@ impl QueryLatch { /// Awaits for the query job to complete. pub(super) fn wait_on( &self, + qcx: impl QueryContext, query: Option, span: Span, ) -> Result<(), CycleError> { let waiter = Arc::new(QueryWaiter { query, span, cycle: Mutex::new(None), condvar: Condvar::new() }); - self.wait_on_inner(&waiter); + self.wait_on_inner(qcx, &waiter); // FIXME: Get rid of this lock. We have ownership of the QueryWaiter // although another thread may still have a Arc reference so we cannot // use Arc::get_mut @@ -224,7 +224,7 @@ impl QueryLatch { } /// Awaits the caller on this latch by blocking the current thread. - fn wait_on_inner(&self, waiter: &Arc>) { + fn wait_on_inner(&self, qcx: impl QueryContext, waiter: &Arc>) { let mut info = self.info.lock(); if !info.complete { // We push the waiter on to the `waiters` list. It can be accessed inside @@ -237,11 +237,12 @@ impl QueryLatch { // we have to be in the `wait` call. This is ensured by the deadlock handler // getting the self.info lock. rayon_core::mark_blocked(); - jobserver::release_thread(); + let proxy = qcx.jobserver_proxy(); + proxy.release_thread(); waiter.condvar.wait(&mut info); // Release the lock before we potentially block in `acquire_thread` drop(info); - jobserver::acquire_thread(); + proxy.acquire_thread(); } } diff --git a/compiler/rustc_query_system/src/query/mod.rs b/compiler/rustc_query_system/src/query/mod.rs index ef21af7dafba8..855769dacc3e1 100644 --- a/compiler/rustc_query_system/src/query/mod.rs +++ b/compiler/rustc_query_system/src/query/mod.rs @@ -16,6 +16,7 @@ mod caches; pub use self::caches::{DefIdCache, DefaultCache, QueryCache, SingleCache, VecCache}; mod config; +use rustc_data_structures::jobserver::Proxy; use rustc_data_structures::sync::{DynSend, DynSync}; use rustc_errors::DiagInner; use rustc_hashes::Hash64; @@ -151,6 +152,10 @@ pub enum QuerySideEffect { pub trait QueryContext: HasDepContext { type QueryInfo: Clone; + /// Gets a jobserver reference which is used to release then acquire + /// a token while waiting on a query. + fn jobserver_proxy(&self) -> &Proxy; + fn next_job_id(self) -> QueryJobId; /// Get the query information from the TLS context. diff --git a/compiler/rustc_query_system/src/query/plumbing.rs b/compiler/rustc_query_system/src/query/plumbing.rs index 6ea8e3b920084..3c1fc7317848c 100644 --- a/compiler/rustc_query_system/src/query/plumbing.rs +++ b/compiler/rustc_query_system/src/query/plumbing.rs @@ -297,7 +297,7 @@ where // With parallel queries we might just have to wait on some other // thread. - let result = latch.wait_on(current, span); + let result = latch.wait_on(qcx, current, span); match result { Ok(()) => {