Skip to content

Commit 5ccf10e

Browse files
authored
Merge pull request rust-lang#4112 from RalfJung/socket-cleanup
Socket read/write cleanup
2 parents 3623dfd + 6383513 commit 5ccf10e

File tree

4 files changed

+169
-116
lines changed

4 files changed

+169
-116
lines changed

src/tools/miri/src/shims/unix/linux_like/eventfd.rs

+25-30
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,10 @@ impl FileDescription for Event {
6262
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
6363
}
6464

65-
// eventfd read at the size of u64.
65+
// Turn the pointer into a place at the right type.
6666
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
6767

68-
let weak_eventfd = self_ref.downgrade();
69-
eventfd_read(buf_place, dest, weak_eventfd, ecx)
68+
eventfd_read(buf_place, dest, self_ref, ecx)
7069
}
7170

7271
/// A write call adds the 8-byte integer value supplied in
@@ -97,18 +96,10 @@ impl FileDescription for Event {
9796
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
9897
}
9998

100-
// Read the user-supplied value from the pointer.
99+
// Turn the pointer into a place at the right type.
101100
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
102-
let num = ecx.read_scalar(&buf_place)?.to_u64()?;
103101

104-
// u64::MAX as input is invalid because the maximum value of counter is u64::MAX - 1.
105-
if num == u64::MAX {
106-
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
107-
}
108-
// If the addition does not let the counter to exceed the maximum value, update the counter.
109-
// Else, block.
110-
let weak_eventfd = self_ref.downgrade();
111-
eventfd_write(num, buf_place, dest, weak_eventfd, ecx)
102+
eventfd_write(buf_place, dest, self_ref, ecx)
112103
}
113104

114105
fn as_unix(&self) -> &dyn UnixFileDescription {
@@ -193,20 +184,22 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
193184
/// Block thread if the value addition will exceed u64::MAX -1,
194185
/// else just add the user-supplied value to current counter.
195186
fn eventfd_write<'tcx>(
196-
num: u64,
197187
buf_place: MPlaceTy<'tcx>,
198188
dest: &MPlaceTy<'tcx>,
199-
weak_eventfd: WeakFileDescriptionRef,
189+
eventfd_ref: &FileDescriptionRef,
200190
ecx: &mut MiriInterpCx<'tcx>,
201191
) -> InterpResult<'tcx> {
202-
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
203-
throw_unsup_format!("eventfd FD got closed while blocking.")
204-
};
205-
206192
// Since we pass the weak file description ref, it is guaranteed to be
207193
// an eventfd file description.
208194
let eventfd = eventfd_ref.downcast::<Event>().unwrap();
209195

196+
// Figure out which value we should add.
197+
let num = ecx.read_scalar(&buf_place)?.to_u64()?;
198+
// u64::MAX as input is invalid because the maximum value of counter is u64::MAX - 1.
199+
if num == u64::MAX {
200+
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
201+
}
202+
210203
match eventfd.counter.get().checked_add(num) {
211204
Some(new_count @ 0..=MAX_COUNTER) => {
212205
// Future `read` calls will synchronize with this write, so update the FD clock.
@@ -219,7 +212,7 @@ fn eventfd_write<'tcx>(
219212

220213
// The state changed; we check and update the status of all supported event
221214
// types for current file description.
222-
ecx.check_and_update_readiness(&eventfd_ref)?;
215+
ecx.check_and_update_readiness(eventfd_ref)?;
223216

224217
// Unblock *all* threads previously blocked on `read`.
225218
// We need to take out the blocked thread ids and unblock them together,
@@ -244,6 +237,7 @@ fn eventfd_write<'tcx>(
244237

245238
eventfd.blocked_write_tid.borrow_mut().push(ecx.active_thread());
246239

240+
let weak_eventfd = eventfd_ref.downgrade();
247241
ecx.block_thread(
248242
BlockReason::Eventfd,
249243
None,
@@ -255,8 +249,10 @@ fn eventfd_write<'tcx>(
255249
weak_eventfd: WeakFileDescriptionRef,
256250
}
257251
@unblock = |this| {
258-
// When we get unblocked, try again.
259-
eventfd_write(num, buf_place, &dest, weak_eventfd, this)
252+
// When we get unblocked, try again. We know the ref is still valid,
253+
// otherwise there couldn't be a `write` that unblocks us.
254+
let eventfd_ref = weak_eventfd.upgrade().unwrap();
255+
eventfd_write(buf_place, &dest, &eventfd_ref, this)
260256
}
261257
),
262258
);
@@ -270,13 +266,9 @@ fn eventfd_write<'tcx>(
270266
fn eventfd_read<'tcx>(
271267
buf_place: MPlaceTy<'tcx>,
272268
dest: &MPlaceTy<'tcx>,
273-
weak_eventfd: WeakFileDescriptionRef,
269+
eventfd_ref: &FileDescriptionRef,
274270
ecx: &mut MiriInterpCx<'tcx>,
275271
) -> InterpResult<'tcx> {
276-
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
277-
throw_unsup_format!("eventfd FD got closed while blocking.")
278-
};
279-
280272
// Since we pass the weak file description ref to the callback function, it is guaranteed to be
281273
// an eventfd file description.
282274
let eventfd = eventfd_ref.downcast::<Event>().unwrap();
@@ -293,6 +285,7 @@ fn eventfd_read<'tcx>(
293285

294286
eventfd.blocked_read_tid.borrow_mut().push(ecx.active_thread());
295287

288+
let weak_eventfd = eventfd_ref.downgrade();
296289
ecx.block_thread(
297290
BlockReason::Eventfd,
298291
None,
@@ -303,8 +296,10 @@ fn eventfd_read<'tcx>(
303296
weak_eventfd: WeakFileDescriptionRef,
304297
}
305298
@unblock = |this| {
306-
// When we get unblocked, try again.
307-
eventfd_read(buf_place, &dest, weak_eventfd, this)
299+
// When we get unblocked, try again. We know the ref is still valid,
300+
// otherwise there couldn't be a `write` that unblocks us.
301+
let eventfd_ref = weak_eventfd.upgrade().unwrap();
302+
eventfd_read(buf_place, &dest, &eventfd_ref, this)
308303
}
309304
),
310305
);
@@ -317,7 +312,7 @@ fn eventfd_read<'tcx>(
317312

318313
// The state changed; we check and update the status of all supported event
319314
// types for current file description.
320-
ecx.check_and_update_readiness(&eventfd_ref)?;
315+
ecx.check_and_update_readiness(eventfd_ref)?;
321316

322317
// Unblock *all* threads previously blocked on `write`.
323318
// We need to take out the blocked thread ids and unblock them together,

src/tools/miri/src/shims/unix/unnamed_socket.rs

+72-86
Original file line numberDiff line numberDiff line change
@@ -96,26 +96,7 @@ impl FileDescription for AnonSocket {
9696
dest: &MPlaceTy<'tcx>,
9797
ecx: &mut MiriInterpCx<'tcx>,
9898
) -> InterpResult<'tcx> {
99-
// Always succeed on read size 0.
100-
if len == 0 {
101-
return ecx.return_read_success(ptr, &[], 0, dest);
102-
}
103-
104-
let Some(readbuf) = &self.readbuf else {
105-
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
106-
// corresponding ErrorKind variant.
107-
throw_unsup_format!("reading from the write end of a pipe");
108-
};
109-
110-
if readbuf.borrow().buf.is_empty() && self.is_nonblock {
111-
// Non-blocking socketpair with writer and empty buffer.
112-
// https://linux.die.net/man/2/read
113-
// EAGAIN or EWOULDBLOCK can be returned for socket,
114-
// POSIX.1-2001 allows either error to be returned for this case.
115-
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
116-
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
117-
}
118-
anonsocket_read(self_ref.downgrade(), len, ptr, dest.clone(), ecx)
99+
anonsocket_read(self_ref, len, ptr, dest, ecx)
119100
}
120101

121102
fn write<'tcx>(
@@ -127,31 +108,7 @@ impl FileDescription for AnonSocket {
127108
dest: &MPlaceTy<'tcx>,
128109
ecx: &mut MiriInterpCx<'tcx>,
129110
) -> InterpResult<'tcx> {
130-
// Always succeed on write size 0.
131-
// ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
132-
if len == 0 {
133-
return ecx.return_write_success(0, dest);
134-
}
135-
136-
// We are writing to our peer's readbuf.
137-
let Some(peer_fd) = self.peer_fd().upgrade() else {
138-
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
139-
// closed.
140-
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest);
141-
};
142-
143-
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
144-
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
145-
// corresponding ErrorKind variant.
146-
throw_unsup_format!("writing to the reading end of a pipe");
147-
};
148-
let available_space =
149-
MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
150-
if available_space == 0 && self.is_nonblock {
151-
// Non-blocking socketpair with a full buffer.
152-
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
153-
}
154-
anonsocket_write(self_ref.downgrade(), ptr, len, dest.clone(), ecx)
111+
anonsocket_write(self_ref, ptr, len, dest, ecx)
155112
}
156113

157114
fn as_unix(&self) -> &dyn UnixFileDescription {
@@ -161,50 +118,65 @@ impl FileDescription for AnonSocket {
161118

162119
/// Write to AnonSocket based on the space available and return the written byte size.
163120
fn anonsocket_write<'tcx>(
164-
weak_self_ref: WeakFileDescriptionRef,
121+
self_ref: &FileDescriptionRef,
165122
ptr: Pointer,
166123
len: usize,
167-
dest: MPlaceTy<'tcx>,
124+
dest: &MPlaceTy<'tcx>,
168125
ecx: &mut MiriInterpCx<'tcx>,
169126
) -> InterpResult<'tcx> {
170-
let Some(self_ref) = weak_self_ref.upgrade() else {
171-
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
172-
throw_unsup_format!("This will be a deadlock error in future")
173-
};
174127
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
128+
129+
// Always succeed on write size 0.
130+
// ("If count is zero and fd refers to a file other than a regular file, the results are not specified.")
131+
if len == 0 {
132+
return ecx.return_write_success(0, dest);
133+
}
134+
135+
// We are writing to our peer's readbuf.
175136
let Some(peer_fd) = self_anonsocket.peer_fd().upgrade() else {
176137
// If the upgrade from Weak to Rc fails, it indicates that all read ends have been
177-
// closed.
178-
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, &dest);
138+
// closed. It is an error to write even if there would be space.
139+
return ecx.set_last_error_and_return(ErrorKind::BrokenPipe, dest);
179140
};
141+
180142
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
181-
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
182-
// corresponding ErrorKind variant.
183-
throw_unsup_format!("writing to the reading end of a pipe")
143+
// Writing to the read end of a pipe.
144+
return ecx.set_last_error_and_return(IoError::LibcError("EBADF"), dest);
184145
};
185146

147+
// Let's see if we can write.
186148
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(writebuf.borrow().buf.len());
187-
188149
if available_space == 0 {
189-
// Blocking socketpair with a full buffer.
190-
let dest = dest.clone();
191-
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
192-
ecx.block_thread(
193-
BlockReason::UnnamedSocket,
194-
None,
195-
callback!(
196-
@capture<'tcx> {
197-
weak_self_ref: WeakFileDescriptionRef,
198-
ptr: Pointer,
199-
len: usize,
200-
dest: MPlaceTy<'tcx>,
201-
}
202-
@unblock = |this| {
203-
anonsocket_write(weak_self_ref, ptr, len, dest, this)
204-
}
205-
),
206-
);
150+
if self_anonsocket.is_nonblock {
151+
// Non-blocking socketpair with a full buffer.
152+
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
153+
} else {
154+
// Blocking socketpair with a full buffer.
155+
// Block the current thread; only keep a weak ref for this.
156+
let weak_self_ref = self_ref.downgrade();
157+
let dest = dest.clone();
158+
self_anonsocket.blocked_write_tid.borrow_mut().push(ecx.active_thread());
159+
ecx.block_thread(
160+
BlockReason::UnnamedSocket,
161+
None,
162+
callback!(
163+
@capture<'tcx> {
164+
weak_self_ref: WeakFileDescriptionRef,
165+
ptr: Pointer,
166+
len: usize,
167+
dest: MPlaceTy<'tcx>,
168+
}
169+
@unblock = |this| {
170+
// If we got unblocked, then our peer successfully upgraded its weak
171+
// ref to us. That means we can also upgrade our weak ref.
172+
let self_ref = weak_self_ref.upgrade().unwrap();
173+
anonsocket_write(&self_ref, ptr, len, &dest, this)
174+
}
175+
),
176+
);
177+
}
207178
} else {
179+
// There is space to write!
208180
let mut writebuf = writebuf.borrow_mut();
209181
// Remember this clock so `read` can synchronize with us.
210182
ecx.release_clock(|clock| {
@@ -229,25 +201,26 @@ fn anonsocket_write<'tcx>(
229201
ecx.unblock_thread(thread_id, BlockReason::UnnamedSocket)?;
230202
}
231203

232-
return ecx.return_write_success(actual_write_size, &dest);
204+
return ecx.return_write_success(actual_write_size, dest);
233205
}
234206
interp_ok(())
235207
}
236208

237209
/// Read from AnonSocket and return the number of bytes read.
238210
fn anonsocket_read<'tcx>(
239-
weak_self_ref: WeakFileDescriptionRef,
211+
self_ref: &FileDescriptionRef,
240212
len: usize,
241213
ptr: Pointer,
242-
dest: MPlaceTy<'tcx>,
214+
dest: &MPlaceTy<'tcx>,
243215
ecx: &mut MiriInterpCx<'tcx>,
244216
) -> InterpResult<'tcx> {
245-
let Some(self_ref) = weak_self_ref.upgrade() else {
246-
// FIXME: We should raise a deadlock error if the self_ref upgrade failed.
247-
throw_unsup_format!("This will be a deadlock error in future")
248-
};
249217
let self_anonsocket = self_ref.downcast::<AnonSocket>().unwrap();
250218

219+
// Always succeed on read size 0.
220+
if len == 0 {
221+
return ecx.return_read_success(ptr, &[], 0, dest);
222+
}
223+
251224
let Some(readbuf) = &self_anonsocket.readbuf else {
252225
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
253226
// corresponding ErrorKind variant.
@@ -258,10 +231,19 @@ fn anonsocket_read<'tcx>(
258231
if self_anonsocket.peer_fd().upgrade().is_none() {
259232
// Socketpair with no peer and empty buffer.
260233
// 0 bytes successfully read indicates end-of-file.
261-
return ecx.return_read_success(ptr, &[], 0, &dest);
234+
return ecx.return_read_success(ptr, &[], 0, dest);
235+
} else if self_anonsocket.is_nonblock {
236+
// Non-blocking socketpair with writer and empty buffer.
237+
// https://linux.die.net/man/2/read
238+
// EAGAIN or EWOULDBLOCK can be returned for socket,
239+
// POSIX.1-2001 allows either error to be returned for this case.
240+
// Since there is no ErrorKind for EAGAIN, WouldBlock is used.
241+
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
262242
} else {
263243
// Blocking socketpair with writer and empty buffer.
264-
let weak_self_ref = weak_self_ref.clone();
244+
// Block the current thread; only keep a weak ref for this.
245+
let weak_self_ref = self_ref.downgrade();
246+
let dest = dest.clone();
265247
self_anonsocket.blocked_read_tid.borrow_mut().push(ecx.active_thread());
266248
ecx.block_thread(
267249
BlockReason::UnnamedSocket,
@@ -274,12 +256,16 @@ fn anonsocket_read<'tcx>(
274256
dest: MPlaceTy<'tcx>,
275257
}
276258
@unblock = |this| {
277-
anonsocket_read(weak_self_ref, len, ptr, dest, this)
259+
// If we got unblocked, then our peer successfully upgraded its weak
260+
// ref to us. That means we can also upgrade our weak ref.
261+
let self_ref = weak_self_ref.upgrade().unwrap();
262+
anonsocket_read(&self_ref, len, ptr, &dest, this)
278263
}
279264
),
280265
);
281266
}
282267
} else {
268+
// There's data to be read!
283269
let mut bytes = vec![0; len];
284270
let mut readbuf = readbuf.borrow_mut();
285271
// Synchronize with all previous writes to this buffer.
@@ -313,7 +299,7 @@ fn anonsocket_read<'tcx>(
313299
}
314300
};
315301

316-
return ecx.return_read_success(ptr, &bytes, actual_read_size, &dest);
302+
return ecx.return_read_success(ptr, &bytes, actual_read_size, dest);
317303
}
318304
interp_ok(())
319305
}

0 commit comments

Comments
 (0)