|
13 | 13 | // limitations under the License.
|
14 | 14 |
|
15 | 15 | use std::future::Future;
|
16 |
| -use std::sync::mpsc::channel; |
17 | 16 | use std::sync::Arc;
|
18 | 17 | use std::thread;
|
19 |
| -use std::time::Duration; |
20 | 18 |
|
21 | 19 | use common_exception::ErrorCode;
|
22 | 20 | use common_exception::Result;
|
23 |
| -use futures::future::Either; |
24 | 21 | use tokio::runtime::Handle;
|
25 | 22 | use tokio::sync::oneshot;
|
26 | 23 | use tokio::task::JoinHandle;
|
@@ -67,79 +64,6 @@ impl<S: TrySpawn> TrySpawn for Arc<S> {
|
67 | 64 | }
|
68 | 65 | }
|
69 | 66 |
|
70 |
| -/// Blocking wait for a future to complete. |
71 |
| -/// |
72 |
| -/// This trait turns an `async` function into `sync`. |
73 |
| -/// It is meant to provide convenience for building a proof-of-concept demo or else. |
74 |
| -/// Always avoid using it in a real world production, |
75 |
| -/// unless **you KNOW what you are doing**: |
76 |
| -/// |
77 |
| -/// - `wait()` runs the future in current thread and **blocks** current thread until the future is finished. |
78 |
| -/// - `wait_in(rt)` runs the future in the specified runtime, and **blocks** current thread until the future is finished. |
79 |
| -pub trait BlockingWait |
80 |
| -where |
81 |
| - Self: Future + Send + 'static, |
82 |
| - Self::Output: Send + 'static, |
83 |
| -{ |
84 |
| - /// Runs the future and blocks current thread. |
85 |
| - /// |
86 |
| - /// ```ignore |
87 |
| - /// use runtime::BlockingWait; |
88 |
| - /// async fn five() -> u8 { 5 } |
89 |
| - /// assert_eq!(5, five().wait()); |
90 |
| - /// ``` |
91 |
| - fn wait(self, timeout: Option<Duration>) -> Result<Self::Output>; |
92 |
| - |
93 |
| - /// Runs the future in provided runtime and blocks current thread. |
94 |
| - fn wait_in<RT: TrySpawn>(self, rt: &RT, timeout: Option<Duration>) -> Result<Self::Output>; |
95 |
| -} |
96 |
| - |
97 |
| -impl<T> BlockingWait for T |
98 |
| -where |
99 |
| - T: Future + Send + 'static, |
100 |
| - T::Output: Send + 'static, |
101 |
| -{ |
102 |
| - fn wait(self, timeout: Option<Duration>) -> Result<T::Output> { |
103 |
| - match timeout { |
104 |
| - None => Ok(futures::executor::block_on(self)), |
105 |
| - Some(d) => { |
106 |
| - let rt = tokio::runtime::Builder::new_current_thread() |
107 |
| - .enable_time() |
108 |
| - .build() |
109 |
| - .map_err(|e| ErrorCode::TokioError(format!("{}", e)))?; |
110 |
| - |
111 |
| - rt.block_on(async move { |
112 |
| - let sl = tokio::time::sleep(d); |
113 |
| - let sl = Box::pin(sl); |
114 |
| - let task = Box::pin(self); |
115 |
| - |
116 |
| - match futures::future::select(sl, task).await { |
117 |
| - Either::Left((_, _)) => Err::<T::Output, ErrorCode>(ErrorCode::Timeout( |
118 |
| - format!("timeout: {:?}", d), |
119 |
| - )), |
120 |
| - Either::Right((res, _)) => Ok(res), |
121 |
| - } |
122 |
| - }) |
123 |
| - } |
124 |
| - } |
125 |
| - } |
126 |
| - |
127 |
| - fn wait_in<RT: TrySpawn>(self, rt: &RT, timeout: Option<Duration>) -> Result<T::Output> { |
128 |
| - let (tx, rx) = channel(); |
129 |
| - let _jh = rt.spawn(async move { |
130 |
| - let r = self.await; |
131 |
| - let _ = tx.send(r); |
132 |
| - }); |
133 |
| - let reply = match timeout { |
134 |
| - Some(to) => rx |
135 |
| - .recv_timeout(to) |
136 |
| - .map_err(|timeout_err| ErrorCode::Timeout(timeout_err.to_string()))?, |
137 |
| - None => rx.recv().map_err(ErrorCode::from_std_error)?, |
138 |
| - }; |
139 |
| - Ok(reply) |
140 |
| - } |
141 |
| -} |
142 |
| - |
143 | 67 | /// Tokio Runtime wrapper.
|
144 | 68 | /// If a runtime is in an asynchronous context, shutdown it first.
|
145 | 69 | pub struct Runtime {
|
|
0 commit comments