Skip to content

Added support for configuring number of threads, and thread name, used by POOL in task::executor #690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions examples/a-chat/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use async_std::{
net::{TcpListener, TcpStream, ToSocketAddrs},
prelude::*,
task,
task::RuntimeConfig,
};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand All @@ -20,6 +21,11 @@ type Receiver<T> = mpsc::UnboundedReceiver<T>;
enum Void {}

pub(crate) fn main() -> Result<()> {
let mut config = RuntimeConfig::new();
let num_threads = num_cpus::get() / 2;
config.num_thread(num_threads).thread_name("a-chat-server");
assert!(config.finalize().is_ok());

task::block_on(accept_loop("127.0.0.1:8080"))
}

Expand Down
1 change: 1 addition & 0 deletions src/task/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! * The only import is the `crate::task::Runnable` type.

pub(crate) use pool::schedule;
pub use pool::RuntimeConfig;

use sleepers::Sleepers;

Expand Down
49 changes: 47 additions & 2 deletions src/task/executor/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,50 @@ use crate::task::executor::Sleepers;
use crate::task::Runnable;
use crate::utils::{abort_on_panic, random};

type SyncOnceCell<T> = once_cell::sync::OnceCell<T>;
static RUNTIME_CONFIG: SyncOnceCell<RuntimeConfig> = SyncOnceCell::new();

/// configuration parameters for executor
#[derive(Debug)]
pub struct RuntimeConfig {
/// Name given to created worker threads
pub thread_name: String,

/// Number of threads executor is allowed to create
pub num_threads: usize,
}
impl Default for RuntimeConfig {
fn default() -> Self {
Self {
thread_name: "async-std/executor".to_string(),
num_threads: num_cpus::get(),
}
}
}
impl RuntimeConfig {
/// Creates new config with predefined defaults
pub fn new() -> RuntimeConfig {
RuntimeConfig::default()
}

/// Configures name given to worker threads
pub fn thread_name(&mut self, thread_name: impl Into<String>) -> &mut Self {
self.thread_name = thread_name.into();
self
}

/// Configures number of worker threads
pub fn num_thread(&mut self, num_threads: usize) -> &mut Self {
self.num_threads = num_threads;
self
}

/// Sets `RUNTIME_CONFIG` with self
pub fn finalize(self) -> Result<(), RuntimeConfig> {
RUNTIME_CONFIG.set(self)
}
}

/// The state of an executor.
struct Pool {
/// The global queue of tasks.
Expand All @@ -25,7 +69,8 @@ struct Pool {

/// Global executor that runs spawned tasks.
static POOL: Lazy<Pool> = Lazy::new(|| {
let num_threads = num_cpus::get().max(1);
let runtime_config = RUNTIME_CONFIG.get_or_init(|| RuntimeConfig::default());
let num_threads = runtime_config.num_threads.max(1);
let mut stealers = Vec::new();

// Spawn worker threads.
Expand All @@ -40,7 +85,7 @@ static POOL: Lazy<Pool> = Lazy::new(|| {
};

thread::Builder::new()
.name("async-std/executor".to_string())
.name(runtime_config.thread_name.clone())
.spawn(|| {
let _ = PROCESSOR.with(|p| p.set(proc));
abort_on_panic(main_loop);
Expand Down
1 change: 1 addition & 0 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ cfg_default! {
pub use sleep::sleep;
pub use spawn::spawn;
pub use task_local::{AccessError, LocalKey};
pub use executor::RuntimeConfig;

use builder::Runnable;
use task_local::LocalsMap;
Expand Down