@@ -32,12 +32,14 @@ use system_interface::io::ReadReady;
32
32
/// ```
33
33
#[ derive( Debug ) ]
34
34
pub struct ReadPipe < R : Read + ReadReady > {
35
+ notify : Arc < tokio:: sync:: Notify > ,
35
36
reader : Arc < RwLock < R > > ,
36
37
}
37
38
38
39
impl < R : Read + ReadReady > Clone for ReadPipe < R > {
39
40
fn clone ( & self ) -> Self {
40
41
Self {
42
+ notify : self . notify . clone ( ) ,
41
43
reader : self . reader . clone ( ) ,
42
44
}
43
45
}
@@ -55,7 +57,11 @@ impl<R: Read + ReadReady> ReadPipe<R> {
55
57
///
56
58
/// All `Handle` read operations delegate to reading from this underlying reader.
57
59
pub fn from_shared ( reader : Arc < RwLock < R > > ) -> Self {
58
- Self { reader }
60
+ Self {
61
+ // TODO(elliottt): should the shared notify be an argument as well?
62
+ notify : Arc :: new ( tokio:: sync:: Notify :: new ( ) ) ,
63
+ reader,
64
+ }
59
65
}
60
66
61
67
/// Try to convert this `ReadPipe<R>` back to the underlying `R` type.
@@ -119,25 +125,50 @@ impl<R: Read + ReadReady + Any + Send + Sync> HostInputStream for ReadPipe<R> {
119
125
}
120
126
121
127
fn pollable ( & self ) -> HostPollable {
128
+ // This is a standalone function because RwLockReadGuard does not implement Send -- calling
129
+ // `reader.read()` from within the async closure below is just not possible.
130
+ fn ready < T : Read + ReadReady + Any + Send + Sync > ( reader : & RwLock < T > ) -> bool {
131
+ if let Ok ( g) = reader. read ( ) {
132
+ if let Ok ( n) = g. num_ready_bytes ( ) {
133
+ return n > 0 ;
134
+ }
135
+ }
136
+
137
+ // If either read or num_ready_bytes raised an error, we want to consider the pipe
138
+ // ready for reading.
139
+ true
140
+ }
141
+
142
+ let notify = Arc :: clone ( & self . notify ) ;
122
143
let reader = Arc :: clone ( & self . reader ) ;
123
144
HostPollable :: new ( move || {
145
+ // TODO(elliottt): is it possible to avoid these clones? They're needed because `Arc`
146
+ // isn't copy, and we need to move values into the async closure.
147
+ let notify = Arc :: clone ( & notify) ;
124
148
let reader = Arc :: clone ( & reader) ;
125
149
Box :: pin ( async move {
126
- loop {
127
- let amount = match reader. read ( ) {
128
- Ok ( g) => g. num_ready_bytes ( ) ?,
129
- Err ( _) => {
130
- // TODO(elliottt): are there any circumstances where we want to clear
131
- // the poisoned state of the pipe?
132
- return Err ( anyhow ! ( "pipe has been poisoned" ) ) ;
150
+ {
151
+ let reader = reader. clone ( ) ;
152
+ let sender = notify. clone ( ) ;
153
+ tokio:: spawn ( async move {
154
+ while !ready ( & reader) {
155
+ tokio:: task:: yield_now ( ) . await ;
133
156
}
134
- } ;
135
- if amount > 0 {
136
- return Ok ( ( ) ) ;
137
- }
138
157
139
- // TODO(elliottt): is there a better way to wait on the pipe to become ready?
140
- tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 1 ) ) . await ;
158
+ sender. notify_one ( ) ;
159
+ } ) ;
160
+ }
161
+
162
+ notify. notified ( ) . await ;
163
+
164
+ let g = match reader. read ( ) {
165
+ Ok ( g) => g,
166
+ Err ( _) => return Err ( anyhow ! ( "pipe has been poisoned" ) ) ,
167
+ } ;
168
+
169
+ match g. num_ready_bytes ( ) {
170
+ Ok ( _) => Ok ( ( ) ) ,
171
+ Err ( e) => Err ( anyhow ! ( e) ) ,
141
172
}
142
173
} )
143
174
} )
0 commit comments