diff --git a/src/liblibc b/src/liblibc index e19309c8b4e8b..2278a549559c3 160000 --- a/src/liblibc +++ b/src/liblibc @@ -1 +1 @@ -Subproject commit e19309c8b4e8bbd11f4d84dfffd75e3d1ac477fe +Subproject commit 2278a549559c38872b4338cb002ecc2a80d860dc diff --git a/src/libstd/fs.rs b/src/libstd/fs.rs index 53384fb9b154b..6b88d498b1041 100644 --- a/src/libstd/fs.rs +++ b/src/libstd/fs.rs @@ -22,7 +22,6 @@ use ffi::OsString; use io::{self, SeekFrom, Seek, Read, Write}; use path::{Path, PathBuf}; use sys::fs as fs_imp; -use sys_common::io::read_to_end_uninitialized; use sys_common::{AsInnerMut, FromInner, AsInner, IntoInner}; use vec::Vec; use time::SystemTime; @@ -351,7 +350,7 @@ impl Read for File { self.inner.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.inner.read_to_end(buf) } } #[stable(feature = "rust1", since = "1.0.0")] @@ -372,6 +371,9 @@ impl<'a> Read for &'a File { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } } #[stable(feature = "rust1", since = "1.0.0")] impl<'a> Write for &'a File { diff --git a/src/libstd/io/stdio.rs b/src/libstd/io/stdio.rs index cd2d5e52462bb..25309a785c45a 100644 --- a/src/libstd/io/stdio.rs +++ b/src/libstd/io/stdio.rs @@ -18,7 +18,6 @@ use io::lazy::Lazy; use io::{self, BufReader, LineWriter}; use sync::{Arc, Mutex, MutexGuard}; use sys::stdio; -use sys_common::io::{read_to_end_uninitialized}; use sys_common::remutex::{ReentrantMutex, ReentrantMutexGuard}; use thread::LocalKeyState; @@ -78,6 +77,9 @@ fn stderr_raw() -> io::Result { stdio::Stderr::new().map(StderrRaw) } impl Read for StdinRaw { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } } impl Write for StdoutRaw { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf) } @@ -116,6 +118,12 @@ impl io::Read for Maybe { Maybe::Fake => Ok(0) } } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + match *self { + Maybe::Real(ref mut r) => handle_ebadf(r.read_to_end(buf), 0), + Maybe::Fake => Ok(0) + } + } } fn handle_ebadf(r: io::Result, default: T) -> io::Result { @@ -294,7 +302,7 @@ impl<'a> Read for StdinLock<'a> { self.inner.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.inner.read_to_end(buf) } } diff --git a/src/libstd/net/tcp.rs b/src/libstd/net/tcp.rs index f8e3b58bb3e95..414696413f494 100644 --- a/src/libstd/net/tcp.rs +++ b/src/libstd/net/tcp.rs @@ -14,7 +14,6 @@ use io::prelude::*; use fmt; use io; use net::{ToSocketAddrs, SocketAddr, Shutdown}; -use sys_common::io::read_to_end_uninitialized; use sys_common::net as net_imp; use sys_common::{AsInner, FromInner, IntoInner}; use time::Duration; @@ -269,7 +268,7 @@ impl TcpStream { impl Read for TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.0.read_to_end(buf) } } #[stable(feature = "rust1", since = "1.0.0")] @@ -281,7 +280,7 @@ impl Write for TcpStream { impl<'a> Read for &'a TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.0.read(buf) } fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - unsafe { read_to_end_uninitialized(self, buf) } + self.0.read_to_end(buf) } } #[stable(feature = "rust1", since = "1.0.0")] diff --git a/src/libstd/process.rs b/src/libstd/process.rs index 8db8ad324bea9..5813d82a315a6 100644 --- a/src/libstd/process.rs +++ b/src/libstd/process.rs @@ -20,10 +20,9 @@ use fmt; use io; use path::Path; use str; -use sys::pipe::AnonPipe; +use sys::pipe::{read2, AnonPipe}; use sys::process as imp; use sys_common::{AsInner, AsInnerMut, FromInner, IntoInner}; -use thread::{self, JoinHandle}; /// Representation of a running or exited child process. /// @@ -134,6 +133,9 @@ impl Read for ChildStdout { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } } impl AsInner for ChildStdout { @@ -161,6 +163,9 @@ impl Read for ChildStderr { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.read(buf) } + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } } impl AsInner for ChildStderr { @@ -289,7 +294,7 @@ impl Command { /// By default, stdin, stdout and stderr are inherited from the parent. #[stable(feature = "process", since = "1.0.0")] pub fn spawn(&mut self) -> io::Result { - self.inner.spawn(imp::Stdio::Inherit).map(Child::from_inner) + self.inner.spawn(imp::Stdio::Inherit, true).map(Child::from_inner) } /// Executes the command as a child process, waiting for it to finish and @@ -312,7 +317,7 @@ impl Command { /// ``` #[stable(feature = "process", since = "1.0.0")] pub fn output(&mut self) -> io::Result { - self.inner.spawn(imp::Stdio::MakePipe).map(Child::from_inner) + self.inner.spawn(imp::Stdio::MakePipe, false).map(Child::from_inner) .and_then(|p| p.wait_with_output()) } @@ -334,7 +339,8 @@ impl Command { /// ``` #[stable(feature = "process", since = "1.0.0")] pub fn status(&mut self) -> io::Result { - self.spawn().and_then(|mut p| p.wait()) + self.inner.spawn(imp::Stdio::Inherit, false).map(Child::from_inner) + .and_then(|mut p| p.wait()) } } @@ -496,24 +502,29 @@ impl Child { #[stable(feature = "process", since = "1.0.0")] pub fn wait_with_output(mut self) -> io::Result { drop(self.stdin.take()); - fn read(mut input: R) -> JoinHandle>> - where R: Read + Send + 'static - { - thread::spawn(move || { - let mut ret = Vec::new(); - input.read_to_end(&mut ret).map(|_| ret) - }) + + let (mut stdout, mut stderr) = (Vec::new(), Vec::new()); + match (self.stdout.take(), self.stderr.take()) { + (None, None) => {} + (Some(mut out), None) => { + let res = out.read_to_end(&mut stdout); + res.unwrap(); + } + (None, Some(mut err)) => { + let res = err.read_to_end(&mut stderr); + res.unwrap(); + } + (Some(out), Some(err)) => { + let res = read2(out.inner, &mut stdout, err.inner, &mut stderr); + res.unwrap(); + } } - let stdout = self.stdout.take().map(read); - let stderr = self.stderr.take().map(read); - let status = try!(self.wait()); - let stdout = stdout.and_then(|t| t.join().unwrap().ok()); - let stderr = stderr.and_then(|t| t.join().unwrap().ok()); + let status = try!(self.wait()); Ok(Output { status: status, - stdout: stdout.unwrap_or(Vec::new()), - stderr: stderr.unwrap_or(Vec::new()), + stdout: stdout, + stderr: stderr, }) } } diff --git a/src/libstd/sys/common/net.rs b/src/libstd/sys/common/net.rs index ca4f6e19882b6..aa92e5be11403 100644 --- a/src/libstd/sys/common/net.rs +++ b/src/libstd/sys/common/net.rs @@ -225,6 +225,10 @@ impl TcpStream { self.inner.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { let len = cmp::min(buf.len(), ::max_value() as usize) as wrlen_t; let ret = try!(cvt(unsafe { diff --git a/src/libstd/sys/unix/fd.rs b/src/libstd/sys/unix/fd.rs index 299c6ec2731d7..8ec073858fd21 100644 --- a/src/libstd/sys/unix/fd.rs +++ b/src/libstd/sys/unix/fd.rs @@ -8,12 +8,17 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use io; +#![unstable(reason = "not public", issue = "0", feature = "fd")] + +use prelude::v1::*; + +use io::{self, Read}; use libc::{self, c_int, size_t, c_void}; use mem; +use sync::atomic::{AtomicBool, Ordering}; use sys::cvt; use sys_common::AsInner; -use sync::atomic::{AtomicBool, Ordering}; +use sys_common::io::read_to_end_uninitialized; pub struct FileDesc { fd: c_int, @@ -42,6 +47,11 @@ impl FileDesc { Ok(ret as usize) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { let ret = try!(cvt(unsafe { libc::write(self.fd, @@ -67,6 +77,20 @@ impl FileDesc { } } + pub fn set_nonblocking(&self, nonblocking: bool) { + unsafe { + let previous = libc::fcntl(self.fd, libc::F_GETFL); + debug_assert!(previous != -1); + let new = if nonblocking { + previous | libc::O_NONBLOCK + } else { + previous & !libc::O_NONBLOCK + }; + let ret = libc::fcntl(self.fd, libc::F_SETFL, new); + debug_assert!(ret != -1); + } + } + pub fn duplicate(&self) -> io::Result { // We want to atomically duplicate this file descriptor and set the // CLOEXEC flag, and currently that's done via F_DUPFD_CLOEXEC. This @@ -118,6 +142,16 @@ impl FileDesc { } } +impl<'a> Read for &'a FileDesc { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } +} + impl AsInner for FileDesc { fn as_inner(&self) -> &c_int { &self.fd } } diff --git a/src/libstd/sys/unix/fs.rs b/src/libstd/sys/unix/fs.rs index d1b4b1c5c0895..3985a07470e0d 100644 --- a/src/libstd/sys/unix/fs.rs +++ b/src/libstd/sys/unix/fs.rs @@ -486,6 +486,10 @@ impl File { self.0.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.0.write(buf) } diff --git a/src/libstd/sys/unix/net.rs b/src/libstd/sys/unix/net.rs index 8785da51986db..acf501d5fda88 100644 --- a/src/libstd/sys/unix/net.rs +++ b/src/libstd/sys/unix/net.rs @@ -116,6 +116,10 @@ impl Socket { self.0.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } + pub fn set_timeout(&self, dur: Option, kind: libc::c_int) -> io::Result<()> { let timeout = match dur { Some(dur) => { diff --git a/src/libstd/sys/unix/pipe.rs b/src/libstd/sys/unix/pipe.rs index 667f0f9e6bf62..e5cb37610011b 100644 --- a/src/libstd/sys/unix/pipe.rs +++ b/src/libstd/sys/unix/pipe.rs @@ -8,8 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; + +use cmp; use io; use libc::{self, c_int}; +use mem; use sys::cvt_r; use sys::fd::FileDesc; @@ -57,6 +61,10 @@ impl AnonPipe { self.0.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.0.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.0.write(buf) } @@ -64,3 +72,54 @@ impl AnonPipe { pub fn fd(&self) -> &FileDesc { &self.0 } pub fn into_fd(self) -> FileDesc { self.0 } } + +pub fn read2(p1: AnonPipe, + v1: &mut Vec, + p2: AnonPipe, + v2: &mut Vec) -> io::Result<()> { + // Set both pipes into nonblocking mode as we're gonna be reading from both + // in the `select` loop below, and we wouldn't want one to block the other! + let p1 = p1.into_fd(); + let p2 = p2.into_fd(); + p1.set_nonblocking(true); + p2.set_nonblocking(true); + + let max = cmp::max(p1.raw(), p2.raw()); + loop { + // wait for either pipe to become readable using `select` + try!(cvt_r(|| unsafe { + let mut read: libc::fd_set = mem::zeroed(); + libc::FD_SET(p1.raw(), &mut read); + libc::FD_SET(p2.raw(), &mut read); + libc::select(max + 1, &mut read, 0 as *mut _, 0 as *mut _, + 0 as *mut _) + })); + + // Read as much as we can from each pipe, ignoring EWOULDBLOCK or + // EAGAIN. If we hit EOF, then this will happen because the underlying + // reader will return Ok(0), in which case we'll see `Ok` ourselves. In + // this case we flip the other fd back into blocking mode and read + // whatever's leftover on that file descriptor. + let read = |fd: &FileDesc, dst: &mut Vec| { + match fd.read_to_end(dst) { + Ok(_) => Ok(true), + Err(e) => { + if e.raw_os_error() == Some(libc::EWOULDBLOCK) || + e.raw_os_error() == Some(libc::EAGAIN) { + Ok(false) + } else { + Err(e) + } + } + } + }; + if try!(read(&p1, v1)) { + p2.set_nonblocking(false); + return p2.read_to_end(v2).map(|_| ()); + } + if try!(read(&p2, v2)) { + p1.set_nonblocking(false); + return p1.read_to_end(v1).map(|_| ()); + } + } +} diff --git a/src/libstd/sys/unix/process.rs b/src/libstd/sys/unix/process.rs index 28475f50ce63e..47b0ff42f9322 100644 --- a/src/libstd/sys/unix/process.rs +++ b/src/libstd/sys/unix/process.rs @@ -216,7 +216,7 @@ impl Command { self.stderr = Some(stderr); } - pub fn spawn(&mut self, default: Stdio) + pub fn spawn(&mut self, default: Stdio, needs_stdin: bool) -> io::Result<(Process, StdioPipes)> { const CLOEXEC_MSG_FOOTER: &'static [u8] = b"NOEX"; @@ -225,7 +225,7 @@ impl Command { "nul byte found in provided data")); } - let (ours, theirs) = try!(self.setup_io(default)); + let (ours, theirs) = try!(self.setup_io(default, needs_stdin)); let (input, output) = try!(sys::pipe::anon_pipe()); let pid = unsafe { @@ -298,7 +298,7 @@ impl Command { "nul byte found in provided data") } - match self.setup_io(default) { + match self.setup_io(default, true) { Ok((_, theirs)) => unsafe { self.do_exec(theirs) }, Err(e) => e, } @@ -408,8 +408,11 @@ impl Command { } - fn setup_io(&self, default: Stdio) -> io::Result<(StdioPipes, ChildPipes)> { - let stdin = self.stdin.as_ref().unwrap_or(&default); + fn setup_io(&self, default: Stdio, needs_stdin: bool) + -> io::Result<(StdioPipes, ChildPipes)> { + let null = Stdio::Null; + let default_stdin = if needs_stdin {&default} else {&null}; + let stdin = self.stdin.as_ref().unwrap_or(default_stdin); let stdout = self.stdout.as_ref().unwrap_or(&default); let stderr = self.stderr.as_ref().unwrap_or(&default); let (their_stdin, our_stdin) = try!(stdin.to_child_stdio(true)); @@ -648,7 +651,7 @@ mod tests { cmd.stdin(Stdio::MakePipe); cmd.stdout(Stdio::MakePipe); - let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null)); + let (mut cat, mut pipes) = t!(cmd.spawn(Stdio::Null, true)); let stdin_write = pipes.stdin.take().unwrap(); let stdout_read = pipes.stdout.take().unwrap(); diff --git a/src/libstd/sys/unix/stdio.rs b/src/libstd/sys/unix/stdio.rs index ccbb14677c7e4..37d1d9a969ed8 100644 --- a/src/libstd/sys/unix/stdio.rs +++ b/src/libstd/sys/unix/stdio.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; + use io; use libc; use sys::fd::FileDesc; @@ -25,6 +27,13 @@ impl Stdin { fd.into_raw(); ret } + + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let fd = FileDesc::new(libc::STDIN_FILENO); + let ret = fd.read_to_end(buf); + fd.into_raw(); + ret + } } impl Stdout { diff --git a/src/libstd/sys/windows/c.rs b/src/libstd/sys/windows/c.rs index 472ffdf9e1d93..002ffc7c8685a 100644 --- a/src/libstd/sys/windows/c.rs +++ b/src/libstd/sys/windows/c.rs @@ -12,6 +12,7 @@ #![allow(bad_style)] #![cfg_attr(test, allow(dead_code))] +#![unstable(issue = "0", feature = "windows_c")] use os::raw::{c_int, c_uint, c_ulong, c_long, c_longlong, c_ushort,}; use os::raw::{c_char, c_ulonglong}; @@ -181,6 +182,7 @@ pub const ERROR_PATH_NOT_FOUND: DWORD = 3; pub const ERROR_ACCESS_DENIED: DWORD = 5; pub const ERROR_INVALID_HANDLE: DWORD = 6; pub const ERROR_NO_MORE_FILES: DWORD = 18; +pub const ERROR_HANDLE_EOF: DWORD = 38; pub const ERROR_BROKEN_PIPE: DWORD = 109; pub const ERROR_CALL_NOT_IMPLEMENTED: DWORD = 120; pub const ERROR_INSUFFICIENT_BUFFER: DWORD = 122; @@ -188,6 +190,7 @@ pub const ERROR_ALREADY_EXISTS: DWORD = 183; pub const ERROR_NO_DATA: DWORD = 232; pub const ERROR_ENVVAR_NOT_FOUND: DWORD = 203; pub const ERROR_OPERATION_ABORTED: DWORD = 995; +pub const ERROR_IO_PENDING: DWORD = 997; pub const ERROR_TIMEOUT: DWORD = 0x5B4; pub const INVALID_HANDLE_VALUE: HANDLE = !0 as HANDLE; @@ -292,6 +295,14 @@ pub const EXCEPTION_UNWIND: DWORD = EXCEPTION_UNWINDING | EXCEPTION_TARGET_UNWIND | EXCEPTION_COLLIDED_UNWIND; +pub const PIPE_ACCESS_INBOUND: DWORD = 0x00000001; +pub const FILE_FLAG_FIRST_PIPE_INSTANCE: DWORD = 0x00080000; +pub const FILE_FLAG_OVERLAPPED: DWORD = 0x40000000; +pub const PIPE_WAIT: DWORD = 0x00000000; +pub const PIPE_TYPE_BYTE: DWORD = 0x00000000; +pub const PIPE_REJECT_REMOTE_CLIENTS: DWORD = 0x00000008; +pub const PIPE_READMODE_BYTE: DWORD = 0x00000000; + #[repr(C)] #[cfg(target_arch = "x86")] pub struct WSADATA { @@ -913,10 +924,6 @@ extern "system" { nOutBufferSize: DWORD, lpBytesReturned: LPDWORD, lpOverlapped: LPOVERLAPPED) -> BOOL; - pub fn CreatePipe(hReadPipe: LPHANDLE, - hWritePipe: LPHANDLE, - lpPipeAttributes: LPSECURITY_ATTRIBUTES, - nSize: DWORD) -> BOOL; pub fn CreateThread(lpThreadAttributes: LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, lpStartAddress: extern "system" fn(*mut c_void) @@ -1129,6 +1136,29 @@ extern "system" { OriginalContext: *const CONTEXT, HistoryTable: *const UNWIND_HISTORY_TABLE); pub fn GetSystemTimeAsFileTime(lpSystemTimeAsFileTime: LPFILETIME); + + pub fn CreateEventW(lpEventAttributes: LPSECURITY_ATTRIBUTES, + bManualReset: BOOL, + bInitialState: BOOL, + lpName: LPCWSTR) -> HANDLE; + pub fn WaitForMultipleObjects(nCount: DWORD, + lpHandles: *const HANDLE, + bWaitAll: BOOL, + dwMilliseconds: DWORD) -> DWORD; + pub fn CreateNamedPipeW(lpName: LPCWSTR, + dwOpenMode: DWORD, + dwPipeMode: DWORD, + nMaxInstances: DWORD, + nOutBufferSize: DWORD, + nInBufferSize: DWORD, + nDefaultTimeOut: DWORD, + lpSecurityAttributes: LPSECURITY_ATTRIBUTES) + -> HANDLE; + pub fn CancelIo(handle: HANDLE) -> BOOL; + pub fn GetOverlappedResult(hFile: HANDLE, + lpOverlapped: LPOVERLAPPED, + lpNumberOfBytesTransferred: LPDWORD, + bWait: BOOL) -> BOOL; } // Functions that aren't available on Windows XP, but we still use them and just diff --git a/src/libstd/sys/windows/fs.rs b/src/libstd/sys/windows/fs.rs index 95fb1e7c60052..624fef097fcc5 100644 --- a/src/libstd/sys/windows/fs.rs +++ b/src/libstd/sys/windows/fs.rs @@ -8,6 +8,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; use io::prelude::*; use os::windows::prelude::*; @@ -312,6 +313,10 @@ impl File { self.handle.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.handle.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.handle.write(buf) } diff --git a/src/libstd/sys/windows/handle.rs b/src/libstd/sys/windows/handle.rs index 47676a927f658..1396d670902bb 100644 --- a/src/libstd/sys/windows/handle.rs +++ b/src/libstd/sys/windows/handle.rs @@ -8,14 +8,19 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable(issue = "0", feature = "windows_handle")] + +use prelude::v1::*; + use cmp; -use io::ErrorKind; +use io::{ErrorKind, Read}; use io; use mem; use ops::Deref; use ptr; use sys::c; use sys::cvt; +use sys_common::io::read_to_end_uninitialized; use u32; /// An owned container for `HANDLE` object, closing them on Drop. @@ -39,6 +44,20 @@ impl Handle { Handle(RawHandle::new(handle)) } + pub fn new_event(manual: bool, init: bool) -> io::Result { + unsafe { + let event = c::CreateEventW(0 as *mut _, + manual as c::BOOL, + init as c::BOOL, + 0 as *const _); + if event.is_null() { + Err(io::Error::last_os_error()) + } else { + Ok(Handle::new(event)) + } + } + } + pub fn into_raw(self) -> c::HANDLE { let ret = self.raw(); mem::forget(self); @@ -87,6 +106,64 @@ impl RawHandle { } } + pub unsafe fn read_overlapped(&self, + buf: &mut [u8], + overlapped: *mut c::OVERLAPPED) + -> io::Result> { + let len = cmp::min(buf.len(), ::max_value() as usize) as c::DWORD; + let mut amt = 0; + let res = cvt({ + c::ReadFile(self.0, buf.as_ptr() as c::LPVOID, + len, &mut amt, overlapped) + }); + match res { + Ok(_) => Ok(Some(amt as usize)), + Err(e) => { + if e.raw_os_error() == Some(c::ERROR_IO_PENDING as i32) { + Ok(None) + } else if e.raw_os_error() == Some(c::ERROR_BROKEN_PIPE as i32) { + Ok(Some(0)) + } else { + Err(e) + } + } + } + } + + pub fn overlapped_result(&self, + overlapped: *mut c::OVERLAPPED, + wait: bool) -> io::Result { + unsafe { + let mut bytes = 0; + let wait = if wait {c::TRUE} else {c::FALSE}; + let res = cvt({ + c::GetOverlappedResult(self.raw(), overlapped, &mut bytes, wait) + }); + match res { + Ok(_) => Ok(bytes as usize), + Err(e) => { + if e.raw_os_error() == Some(c::ERROR_HANDLE_EOF as i32) || + e.raw_os_error() == Some(c::ERROR_BROKEN_PIPE as i32) { + Ok(0) + } else { + Err(e) + } + } + } + } + } + + pub fn cancel_io(&self) -> io::Result<()> { + unsafe { + cvt(c::CancelIo(self.raw())).map(|_| ()) + } + } + + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { let mut amt = 0; // WriteFile takes a DWORD (u32) for the length so it only supports @@ -111,3 +188,13 @@ impl RawHandle { Ok(Handle::new(ret)) } } + +impl<'a> Read for &'a RawHandle { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } +} diff --git a/src/libstd/sys/windows/net.rs b/src/libstd/sys/windows/net.rs index dfa44a651e61c..bb3c79c5a84fd 100644 --- a/src/libstd/sys/windows/net.rs +++ b/src/libstd/sys/windows/net.rs @@ -8,8 +8,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable(issue = "0", feature = "windows_net")] + +use prelude::v1::*; + use cmp; -use io; +use io::{self, Read}; use libc::{c_int, c_void, c_ulong}; use mem; use net::{SocketAddr, Shutdown}; @@ -20,6 +24,7 @@ use sync::Once; use sys::c; use sys; use sys_common::{self, AsInner, FromInner, IntoInner}; +use sys_common::io::read_to_end_uninitialized; use sys_common::net; use time::Duration; @@ -142,6 +147,11 @@ impl Socket { } } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } + pub fn set_timeout(&self, dur: Option, kind: c_int) -> io::Result<()> { let timeout = match dur { @@ -206,6 +216,17 @@ impl Socket { } } +#[unstable(reason = "not public", issue = "0", feature = "fd_read")] +impl<'a> Read for &'a Socket { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } +} + impl Drop for Socket { fn drop(&mut self) { let _ = unsafe { c::closesocket(self.0) }; diff --git a/src/libstd/sys/windows/pipe.rs b/src/libstd/sys/windows/pipe.rs index aec41885f3b87..fbe38d76e9571 100644 --- a/src/libstd/sys/windows/pipe.rs +++ b/src/libstd/sys/windows/pipe.rs @@ -8,10 +8,17 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use prelude::v1::*; +use os::windows::prelude::*; + +use ffi::OsStr; +use path::Path; use io; -use ptr; -use sys::cvt; +use mem; +use rand::{self, Rng}; +use slice; use sys::c; +use sys::fs::{File, OpenOptions}; use sys::handle::Handle; //////////////////////////////////////////////////////////////////////////////// @@ -23,14 +30,76 @@ pub struct AnonPipe { } pub fn anon_pipe() -> io::Result<(AnonPipe, AnonPipe)> { - let mut reader = c::INVALID_HANDLE_VALUE; - let mut writer = c::INVALID_HANDLE_VALUE; - try!(cvt(unsafe { - c::CreatePipe(&mut reader, &mut writer, ptr::null_mut(), 0) - })); - let reader = Handle::new(reader); - let writer = Handle::new(writer); - Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer })) + // Note that we specifically do *not* use `CreatePipe` here because + // unfortunately the anonymous pipes returned do not support overlapped + // operations. + // + // Instead, we create a "hopefully unique" name and create a named pipe + // which has overlapped operations enabled. + // + // Once we do this, we connect do it as usual via `CreateFileW`, and then we + // return those reader/writer halves. + unsafe { + let reader; + let mut name; + let mut tries = 0; + loop { + tries += 1; + let key: u64 = rand::thread_rng().gen(); + name = format!(r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}", + c::GetCurrentProcessId(), + key); + let wide_name = OsStr::new(&name) + .encode_wide() + .chain(Some(0)) + .collect::>(); + + let handle = c::CreateNamedPipeW(wide_name.as_ptr(), + c::PIPE_ACCESS_INBOUND | + c::FILE_FLAG_FIRST_PIPE_INSTANCE | + c::FILE_FLAG_OVERLAPPED, + c::PIPE_TYPE_BYTE | + c::PIPE_READMODE_BYTE | + c::PIPE_WAIT | + c::PIPE_REJECT_REMOTE_CLIENTS, + 1, + 4096, + 4096, + 0, + 0 as *mut _); + + // We pass the FILE_FLAG_FIRST_PIPE_INSTANCE flag above, and we're + // also just doing a best effort at selecting a unique name. If + // ERROR_ACCESS_DENIED is returned then it could mean that we + // accidentally conflicted with an already existing pipe, so we try + // again. + // + // Don't try again too much though as this could also perhaps be a + // legit error. + if handle == c::INVALID_HANDLE_VALUE { + let err = io::Error::last_os_error(); + if tries < 10 && + err.raw_os_error() == Some(c::ERROR_ACCESS_DENIED as i32) { + continue + } + return Err(err) + } + reader = Handle::new(handle); + break + } + + // Connect to the named pipe we just created in write-only mode (also + // overlapped for async I/O below). + let mut opts = OpenOptions::new(); + opts.write(true); + opts.read(false); + opts.share_mode(0); + opts.attributes(c::FILE_FLAG_OVERLAPPED); + let writer = try!(File::open(Path::new(&name), &opts)); + let writer = AnonPipe { inner: writer.into_handle() }; + + Ok((AnonPipe { inner: reader }, AnonPipe { inner: writer.into_handle() })) + } } impl AnonPipe { @@ -41,7 +110,193 @@ impl AnonPipe { self.inner.read(buf) } + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + self.inner.read_to_end(buf) + } + pub fn write(&self, buf: &[u8]) -> io::Result { self.inner.write(buf) } } + +pub fn read2(p1: AnonPipe, + v1: &mut Vec, + p2: AnonPipe, + v2: &mut Vec) -> io::Result<()> { + let p1 = p1.into_handle(); + let p2 = p2.into_handle(); + + let mut p1 = try!(AsyncPipe::new(p1, v1)); + let mut p2 = try!(AsyncPipe::new(p2, v2)); + let objs = [p1.event.raw(), p2.event.raw()]; + + // In a loop we wait for either pipe's scheduled read operation to complete. + // If the operation completes with 0 bytes, that means EOF was reached, in + // which case we just finish out the other pipe entirely. + // + // Note that overlapped I/O is in general super unsafe because we have to + // be careful to ensure that all pointers in play are valid for the entire + // duration of the I/O operation (where tons of operations can also fail). + // The destructor for `AsyncPipe` ends up taking care of most of this. + loop { + let res = unsafe { + c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) + }; + if res == c::WAIT_OBJECT_0 { + if !try!(p1.result()) || !try!(p1.schedule_read()) { + return p2.finish() + } + } else if res == c::WAIT_OBJECT_0 + 1 { + if !try!(p2.result()) || !try!(p2.schedule_read()) { + return p1.finish() + } + } else { + return Err(io::Error::last_os_error()) + } + } +} + +struct AsyncPipe<'a> { + pipe: Handle, + event: Handle, + overlapped: Box, // needs a stable address + dst: &'a mut Vec, + state: State, +} + +#[derive(PartialEq, Debug)] +enum State { + NotReading, + Reading, + Read(usize), +} + +impl<'a> AsyncPipe<'a> { + fn new(pipe: Handle, dst: &'a mut Vec) -> io::Result> { + // Create an event which we'll use to coordinate our overlapped + // opreations, this event will be used in WaitForMultipleObjects + // and passed as part of the OVERLAPPED handle. + // + // Note that we do a somewhat clever thing here by flagging the + // event as being manually reset and setting it initially to the + // signaled state. This means that we'll naturally fall through the + // WaitForMultipleObjects call above for pipes created initially, + // and the only time an even will go back to "unset" will be once an + // I/O operation is successfully scheduled (what we want). + let event = try!(Handle::new_event(true, true)); + let mut overlapped: Box = unsafe { + Box::new(mem::zeroed()) + }; + overlapped.hEvent = event.raw(); + Ok(AsyncPipe { + pipe: pipe, + overlapped: overlapped, + event: event, + dst: dst, + state: State::NotReading, + }) + } + + /// Executes an overlapped read operation. + /// + /// Must not currently be reading, and returns whether the pipe is currently + /// at EOF or not. If the pipe is not at EOF then `result()` must be called + /// to complete the read later on (may block), but if the pipe is at EOF + /// then `result()` should not be called as it will just block forever. + fn schedule_read(&mut self) -> io::Result { + assert_eq!(self.state, State::NotReading); + let amt = unsafe { + let slice = slice_to_end(self.dst); + try!(self.pipe.read_overlapped(slice, &mut *self.overlapped)) + }; + + // If this read finished immediately then our overlapped event will + // remain signaled (it was signaled coming in here) and we'll progress + // down to the method below. + // + // Otherwise the I/O operation is scheduled and the system set our event + // to not signaled, so we flag ourselves into the reading state and move + // on. + self.state = match amt { + Some(0) => return Ok(false), + Some(amt) => State::Read(amt), + None => State::Reading, + }; + Ok(true) + } + + /// Wait for the result of the overlapped operation previously executed. + /// + /// Takes a parameter `wait` which indicates if this pipe is currently being + /// read whether the function should block waiting for the read to complete. + /// + /// Return values: + /// + /// * `true` - finished any pending read and the pipe is not at EOF (keep + /// going) + /// * `false` - finished any pending read and pipe is at EOF (stop issuing + /// reads) + fn result(&mut self) -> io::Result { + let amt = match self.state { + State::NotReading => return Ok(true), + State::Reading => { + try!(self.pipe.overlapped_result(&mut *self.overlapped, true)) + } + State::Read(amt) => amt, + }; + self.state = State::NotReading; + unsafe { + let len = self.dst.len(); + self.dst.set_len(len + amt); + } + Ok(amt != 0) + } + + /// Finishes out reading this pipe entirely. + /// + /// Waits for any pending and schedule read, and then calls `read_to_end` + /// if necessary to read all the remaining information. + fn finish(&mut self) -> io::Result<()> { + while try!(self.result()) && try!(self.schedule_read()) { + // ... + } + Ok(()) + } +} + +impl<'a> Drop for AsyncPipe<'a> { + fn drop(&mut self) { + match self.state { + State::Reading => {} + _ => return, + } + + // If we have a pending read operation, then we have to make sure that + // it's *done* before we actually drop this type. The kernel requires + // that the `OVERLAPPED` and buffer pointers are valid for the entire + // I/O operation. + // + // To do that, we call `CancelIo` to cancel any pending operation, and + // if that succeeds we wait for the overlapped result. + // + // If anything here fails, there's not really much we can do, so we leak + // the buffer/OVERLAPPED pointers to ensure we're at least memory safe. + if self.pipe.cancel_io().is_err() || self.result().is_err() { + let buf = mem::replace(self.dst, Vec::new()); + let overlapped = Box::new(unsafe { mem::zeroed() }); + let overlapped = mem::replace(&mut self.overlapped, overlapped); + mem::forget((buf, overlapped)); + } + } +} + +unsafe fn slice_to_end(v: &mut Vec) -> &mut [u8] { + if v.capacity() == 0 { + v.reserve(16); + } + if v.capacity() == v.len() { + v.reserve(1); + } + slice::from_raw_parts_mut(v.as_mut_ptr().offset(v.len() as isize), + v.capacity() - v.len()) +} diff --git a/src/libstd/sys/windows/process.rs b/src/libstd/sys/windows/process.rs index fa118be6fe6b1..524c932eed439 100644 --- a/src/libstd/sys/windows/process.rs +++ b/src/libstd/sys/windows/process.rs @@ -123,7 +123,7 @@ impl Command { self.stderr = Some(stderr); } - pub fn spawn(&mut self, default: Stdio) + pub fn spawn(&mut self, default: Stdio, needs_stdin: bool) -> io::Result<(Process, StdioPipes)> { // To have the spawning semantics of unix/windows stay the same, we need // to read the *child's* PATH if one is provided. See #15149 for more @@ -181,7 +181,9 @@ impl Command { stdout: None, stderr: None, }; - let stdin = self.stdin.as_ref().unwrap_or(&default); + let null = Stdio::Null; + let default_stdin = if needs_stdin {&default} else {&null}; + let stdin = self.stdin.as_ref().unwrap_or(default_stdin); let stdout = self.stdout.as_ref().unwrap_or(&default); let stderr = self.stderr.as_ref().unwrap_or(&default); let stdin = try!(stdin.to_handle(c::STD_INPUT_HANDLE, &mut pipes.stdin)); diff --git a/src/libstd/sys/windows/stdio.rs b/src/libstd/sys/windows/stdio.rs index 1cd05b61d25b0..5883904c21d72 100644 --- a/src/libstd/sys/windows/stdio.rs +++ b/src/libstd/sys/windows/stdio.rs @@ -8,6 +8,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![unstable(issue = "0", feature = "windows_stdio")] + use prelude::v1::*; use io::prelude::*; @@ -18,6 +20,7 @@ use sync::Mutex; use sys::c; use sys::cvt; use sys::handle::Handle; +use sys_common::io::read_to_end_uninitialized; pub struct NoClose(Option); @@ -113,6 +116,22 @@ impl Stdin { // MemReader shouldn't error here since we just filled it utf8.read(buf) } + + pub fn read_to_end(&self, buf: &mut Vec) -> io::Result { + let mut me = self; + (&mut me).read_to_end(buf) + } +} + +#[unstable(reason = "not public", issue = "0", feature = "fd_read")] +impl<'a> Read for &'a Stdin { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (**self).read(buf) + } + + fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { + unsafe { read_to_end_uninitialized(self, buf) } + } } impl Stdout {