Skip to content

Commit 7e49abd

Browse files
Introduce LocalTaskObj
1 parent 7a67744 commit 7e49abd

File tree

10 files changed

+211
-60
lines changed

10 files changed

+211
-60
lines changed

futures-core/src/executor.rs

-4
This file was deleted.

futures-core/src/lib.rs

-2
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,3 @@ pub use stream::Stream;
8383

8484
pub mod task;
8585
pub use task::Poll;
86-
87-
pub mod executor;

futures-core/src/task/local_task.rs

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
use core::fmt;
2+
use core::mem::{self, PinMut};
3+
4+
use Future;
5+
use super::{Context, Poll, TaskObj, SpawnErrorKind, SpawnObjError};
6+
7+
/// A custom trait object for polling tasks, roughly akin to
8+
/// `Box<Future<Output = ()>>`.
9+
/// Contrary to `TaskObj`, `LocalTaskObj` does not have a `Send` bound
10+
pub struct LocalTaskObj {
11+
ptr: *mut (),
12+
poll_fn: unsafe fn(*mut (), &mut Context) -> Poll<()>,
13+
drop_fn: unsafe fn(*mut ()),
14+
}
15+
16+
impl fmt::Debug for LocalTaskObj {
17+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
18+
f.debug_struct("LocalTaskObj")
19+
.finish()
20+
}
21+
}
22+
23+
/// A custom implementation of a task trait object for `LocalTaskObj`, providing
24+
/// a hand-rolled vtable.
25+
///
26+
/// This custom representation is typically used only in `no_std` contexts,
27+
/// where the default `Box`-based implementation is not available.
28+
///
29+
/// The implementor must guarantee that it is safe to call `poll` repeatedly (in
30+
/// a non-concurrent fashion) with the result of `into_raw` until `drop` is
31+
/// called.
32+
pub unsafe trait UnsafeLocalTask: 'static {
33+
/// Convert a owned instance into a (conceptually owned) void pointer.
34+
fn into_raw(self) -> *mut ();
35+
36+
/// Poll the task represented by the given void pointer.
37+
///
38+
/// # Safety
39+
///
40+
/// The trait implementor must guarantee that it is safe to repeatedly call
41+
/// `poll` with the result of `into_raw` until `drop` is called; such calls
42+
/// are not, however, allowed to race with each other or with calls to `drop`.
43+
unsafe fn poll(task: *mut (), cx: &mut Context) -> Poll<()>;
44+
45+
/// Drops the task represented by the given void pointer.
46+
///
47+
/// # Safety
48+
///
49+
/// The trait implementor must guarantee that it is safe to call this
50+
/// function once per `into_raw` invocation; that call cannot race with
51+
/// other calls to `drop` or `poll`.
52+
unsafe fn drop(task: *mut ());
53+
}
54+
55+
impl LocalTaskObj {
56+
/// Create a `LocalTaskObj` from a custom trait object representation.
57+
#[inline]
58+
pub fn new<T: UnsafeLocalTask>(t: T) -> LocalTaskObj {
59+
LocalTaskObj {
60+
ptr: t.into_raw(),
61+
poll_fn: T::poll,
62+
drop_fn: T::drop,
63+
}
64+
}
65+
66+
/// Converts the `LocalTaskObj` into a `TaskObj`
67+
/// To make this operation safe one has to ensure that the `LocalTaskObj`
68+
/// implements `Send`.
69+
pub unsafe fn as_task_obj(self) -> TaskObj {
70+
// Safety: Both structs have the same memory layout
71+
mem::transmute::<LocalTaskObj, TaskObj>(self)
72+
}
73+
}
74+
75+
impl From<TaskObj> for LocalTaskObj {
76+
fn from(task: TaskObj) -> LocalTaskObj {
77+
unsafe {
78+
// Safety: Both structs have the same memory layout
79+
mem::transmute::<TaskObj, LocalTaskObj>(task)
80+
}
81+
}
82+
}
83+
84+
impl Future for LocalTaskObj {
85+
type Output = ();
86+
87+
#[inline]
88+
fn poll(self: PinMut<Self>, cx: &mut Context) -> Poll<()> {
89+
unsafe {
90+
(self.poll_fn)(self.ptr, cx)
91+
}
92+
}
93+
}
94+
95+
impl Drop for LocalTaskObj {
96+
fn drop(&mut self) {
97+
unsafe {
98+
(self.drop_fn)(self.ptr)
99+
}
100+
}
101+
}
102+
103+
/// The result of a failed spawn
104+
#[derive(Debug)]
105+
pub struct SpawnLocalObjError {
106+
/// The kind of error
107+
pub kind: SpawnErrorKind,
108+
109+
/// The task for which spawning was attempted
110+
pub task: LocalTaskObj,
111+
}
112+
113+
impl SpawnLocalObjError {
114+
/// Converts the `SpawnLocalObjError` into a `SpawnObjError`
115+
/// To make this operation safe one has to ensure that the `LocalTaskObj`
116+
/// stored inside implements `Send`.
117+
pub unsafe fn as_spawn_obj_error(self) -> SpawnObjError {
118+
// Safety: Both structs have the same memory layout
119+
mem::transmute::<SpawnLocalObjError, SpawnObjError>(self)
120+
}
121+
}
122+
123+
impl From<SpawnObjError> for SpawnLocalObjError {
124+
fn from(error: SpawnObjError) -> SpawnLocalObjError {
125+
unsafe {
126+
// Safety: Both structs have the same memory layout
127+
mem::transmute::<SpawnObjError, SpawnLocalObjError>(error)
128+
}
129+
}
130+
}
131+
132+
if_std! {
133+
use std::boxed::{Box, PinBox};
134+
135+
unsafe impl<F: Future<Output = ()> + 'static> UnsafeLocalTask for PinBox<F> {
136+
fn into_raw(self) -> *mut () {
137+
PinBox::into_raw(self) as *mut ()
138+
}
139+
140+
unsafe fn poll(task: *mut (), cx: &mut Context) -> Poll<()> {
141+
let ptr = task as *mut F;
142+
let pin: PinMut<F> = PinMut::new_unchecked(&mut *ptr);
143+
pin.poll(cx)
144+
}
145+
146+
unsafe fn drop(task: *mut ()) {
147+
drop(PinBox::from_raw(task as *mut F))
148+
}
149+
}
150+
151+
impl<F: Future<Output = ()> + 'static> From<PinBox<F>> for LocalTaskObj {
152+
fn from(boxed: PinBox<F>) -> Self {
153+
LocalTaskObj::new(boxed)
154+
}
155+
}
156+
157+
impl<F: Future<Output = ()> + 'static> From<Box<F>> for LocalTaskObj {
158+
fn from(boxed: Box<F>) -> Self {
159+
LocalTaskObj::new(PinBox::from(boxed))
160+
}
161+
}
162+
}

futures-core/src/task/mod.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,21 @@ pub use core::task::{UnsafeWake, Waker, LocalWaker};
77
#[cfg(feature = "std")]
88
pub use std::task::{Wake, local_waker, local_waker_from_nonlocal};
99

10-
pub use core::task::Context;
11-
1210
mod poll;
1311
pub use self::poll::Poll;
1412

13+
pub use core::task::{
14+
Context, Executor, TaskObj, UnsafeTask, SpawnErrorKind, SpawnObjError
15+
};
16+
17+
mod local_task;
18+
pub use self::local_task::{LocalTaskObj, SpawnLocalObjError};
19+
1520
#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]
1621
mod atomic_waker;
1722
#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]
1823
pub use self::atomic_waker::AtomicWaker;
1924

20-
pub use core::task::{TaskObj, UnsafeTask};
21-
2225
if_std! {
2326
use std::boxed::PinBox;
2427

futures-executor/src/local_pool.rs

+15-18
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use std::sync::Arc;
77
use std::thread::{self, Thread};
88

99
use futures_core::{Future, Poll, Stream};
10-
use futures_core::task::{self, Context, LocalWaker, TaskObj, Wake};
11-
use futures_core::executor::{Executor, SpawnObjError, SpawnErrorKind};
10+
use futures_core::task::{
11+
self, Context, LocalWaker, TaskObj, LocalTaskObj, Wake,
12+
Executor, SpawnObjError, SpawnLocalObjError, SpawnErrorKind};
1213
use futures_util::stream::FuturesUnordered;
1314
use futures_util::stream::StreamExt;
1415

@@ -27,7 +28,7 @@ use ThreadPool;
2728
/// single-threaded, it supports a special form of task spawning for non-`Send`
2829
/// futures, via [`spawn_local`](LocalExecutor::spawn_local).
2930
pub struct LocalPool {
30-
pool: FuturesUnordered<TaskObj>,
31+
pool: FuturesUnordered<LocalTaskObj>,
3132
incoming: Rc<Incoming>,
3233
}
3334

@@ -38,7 +39,7 @@ pub struct LocalExecutor {
3839
incoming: Weak<Incoming>,
3940
}
4041

41-
type Incoming = RefCell<Vec<TaskObj>>;
42+
type Incoming = RefCell<Vec<LocalTaskObj>>;
4243

4344
pub(crate) struct ThreadNotify {
4445
thread: Thread
@@ -254,12 +255,8 @@ impl<S: Stream> Iterator for BlockingStream<S> where S: Unpin {
254255

255256
impl Executor for LocalExecutor {
256257
fn spawn_obj(&mut self, task: TaskObj) -> Result<(), SpawnObjError> {
257-
if let Some(incoming) = self.incoming.upgrade() {
258-
incoming.borrow_mut().push(task);
259-
Ok(())
260-
} else {
261-
Err(SpawnObjError{ task, kind: SpawnErrorKind::shutdown() })
262-
}
258+
self.spawn_local_obj(task.into())
259+
.map_err(|err| unsafe { err.as_spawn_obj_error() })
263260
}
264261

265262
fn status(&self) -> Result<(), SpawnErrorKind> {
@@ -272,15 +269,15 @@ impl Executor for LocalExecutor {
272269
}
273270

274271
impl LocalExecutor {
275-
/*
276272
/// Spawn a non-`Send` future onto the associated [`LocalPool`](LocalPool).
277-
pub fn spawn_local<F>(&mut self, f: F) -> Result<(), SpawnObjError>
278-
where F: Future<Item = (), Error = Never> + 'static
273+
pub fn spawn_local_obj(&mut self, task: LocalTaskObj)
274+
-> Result<(), SpawnLocalObjError>
279275
{
280-
self.spawn_task(Task {
281-
fut: Box::new(f),
282-
map: LocalMap::new(),
283-
})
276+
if let Some(incoming) = self.incoming.upgrade() {
277+
incoming.borrow_mut().push(task);
278+
Ok(())
279+
} else {
280+
Err(SpawnLocalObjError{ task, kind: SpawnErrorKind::shutdown() })
281+
}
284282
}
285-
*/
286283
}

futures-executor/src/thread_pool.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ use std::thread;
88
use std::fmt;
99

1010
use futures_core::*;
11-
use futures_core::task::{self, Wake, TaskObj};
12-
use futures_core::executor::{Executor, SpawnObjError};
11+
use futures_core::task::{self, Wake, TaskObj, Executor, SpawnObjError};
1312

1413
use enter;
1514
use num_cpus;

0 commit comments

Comments
 (0)