Skip to content

Implement process bindings to libuv (again) #9260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 19, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/libstd/rt/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ pub use self::net::ip::IpAddr;
pub use self::net::tcp::TcpListener;
pub use self::net::tcp::TcpStream;
pub use self::net::udp::UdpStream;
pub use self::pipe::PipeStream;
pub use self::pipe::UnboundPipeStream;
pub use self::process::Process;

// Some extension traits that all Readers and Writers get.
pub use self::extensions::ReaderUtil;
Expand All @@ -269,6 +272,12 @@ pub use self::extensions::WriterByteConversions;
/// Synchronous, non-blocking file I/O.
pub mod file;

/// Synchronous, in-memory I/O.
pub mod pipe;

/// Child process management.
pub mod process;

/// Synchronous, non-blocking network I/O.
pub mod net;

Expand Down
76 changes: 76 additions & 0 deletions src/libstd/rt/io/pipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Synchronous, in-memory pipes.
//!
//! Currently these aren't particularly useful, there only exists bindings
//! enough so that pipes can be created to child processes.

use prelude::*;
use super::{Reader, Writer};
use rt::io::{io_error, read_error, EndOfFile};
use rt::local::Local;
use rt::rtio::{RtioPipe, RtioPipeObject, IoFactoryObject, IoFactory};
use rt::rtio::RtioUnboundPipeObject;

pub struct PipeStream(RtioPipeObject);
pub struct UnboundPipeStream(~RtioUnboundPipeObject);

impl PipeStream {
/// Creates a new pipe initialized, but not bound to any particular
/// source/destination
pub fn new() -> Option<UnboundPipeStream> {
let pipe = unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
(*io).pipe_init(false)
};
match pipe {
Ok(p) => Some(UnboundPipeStream(p)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
}
}

pub fn bind(inner: RtioPipeObject) -> PipeStream {
PipeStream(inner)
}
}

impl Reader for PipeStream {
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
match (**self).read(buf) {
Ok(read) => Some(read),
Err(ioerr) => {
// EOF is indicated by returning None
if ioerr.kind != EndOfFile {
read_error::cond.raise(ioerr);
}
return None;
}
}
}

fn eof(&mut self) -> bool { fail!() }
}

impl Writer for PipeStream {
fn write(&mut self, buf: &[u8]) {
match (**self).write(buf) {
Ok(_) => (),
Err(ioerr) => {
io_error::cond.raise(ioerr);
}
}
}

fn flush(&mut self) { fail!() }
}
278 changes: 278 additions & 0 deletions src/libstd/rt/io/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Bindings for executing child processes

use prelude::*;

use libc;
use rt::io;
use rt::io::io_error;
use rt::local::Local;
use rt::rtio::{RtioProcess, RtioProcessObject, IoFactoryObject, IoFactory};

pub struct Process {
priv handle: ~RtioProcessObject,
io: ~[Option<io::PipeStream>],
}

/// This configuration describes how a new process should be spawned. This is
/// translated to libuv's own configuration
pub struct ProcessConfig<'self> {
/// Path to the program to run
program: &'self str,

/// Arguments to pass to the program (doesn't include the program itself)
args: &'self [~str],

/// Optional environment to specify for the program. If this is None, then
/// it will inherit the current process's environment.
env: Option<&'self [(~str, ~str)]>,

/// Optional working directory for the new process. If this is None, then
/// the current directory of the running process is inherited.
cwd: Option<&'self str>,

/// Any number of streams/file descriptors/pipes may be attached to this
/// process. This list enumerates the file descriptors and such for the
/// process to be spawned, and the file descriptors inherited will start at
/// 0 and go to the length of this array.
///
/// Standard file descriptors are:
///
/// 0 - stdin
/// 1 - stdout
/// 2 - stderr
io: ~[StdioContainer]
}

/// Describes what to do with a standard io stream for a child process.
pub enum StdioContainer {
/// This stream will be ignored. This is the equivalent of attaching the
/// stream to `/dev/null`
Ignored,

/// The specified file descriptor is inherited for the stream which it is
/// specified for.
InheritFd(libc::c_int),

// XXX: these two shouldn't have libuv-specific implementation details

/// The specified libuv stream is inherited for the corresponding file
/// descriptor it is assigned to.
// XXX: this needs to be thought out more.
//InheritStream(uv::net::StreamWatcher),

/// Creates a pipe for the specified file descriptor which will be directed
/// into the previously-initialized pipe passed in.
///
/// The first boolean argument is whether the pipe is readable, and the
/// second is whether it is writable. These properties are from the view of
/// the *child* process, not the parent process.
CreatePipe(io::UnboundPipeStream,
bool /* readable */,
bool /* writable */),
}

impl Process {
/// Creates a new pipe initialized, but not bound to any particular
/// source/destination
pub fn new(config: ProcessConfig) -> Option<Process> {
let process = unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
(*io).spawn(config)
};
match process {
Ok((p, io)) => Some(Process{
handle: p,
io: io.move_iter().map(|p|
p.map_move(|p| io::PipeStream::bind(p))
).collect()
}),
Err(ioerr) => {
io_error::cond.raise(ioerr);
None
}
}
}

/// Returns the process id of this child process
pub fn id(&self) -> libc::pid_t { self.handle.id() }

/// Sends the specified signal to the child process, returning whether the
/// signal could be delivered or not.
///
/// Note that this is purely a wrapper around libuv's `uv_process_kill`
/// function.
///
/// If the signal delivery fails, then the `io_error` condition is raised on
pub fn signal(&mut self, signal: int) {
match self.handle.kill(signal) {
Ok(()) => {}
Err(err) => {
io_error::cond.raise(err)
}
}
}

/// Wait for the child to exit completely, returning the status that it
/// exited with. This function will continue to have the same return value
/// after it has been called at least once.
pub fn wait(&mut self) -> int { self.handle.wait() }
}

impl Drop for Process {
fn drop(&mut self) {
// Close all I/O before exiting to ensure that the child doesn't wait
// forever to print some text or something similar.
for _ in range(0, self.io.len()) {
self.io.pop();
}

self.wait();
}
}

#[cfg(test)]
mod tests {
use prelude::*;
use super::*;

use rt::io::{Reader, Writer};
use rt::io::pipe::*;
use str;

#[test]
#[cfg(unix, not(android))]
fn smoke() {
let io = ~[];
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"true"],
env: None,
cwd: None,
io: io,
};
let p = Process::new(args);
assert!(p.is_some());
let mut p = p.unwrap();
assert_eq!(p.wait(), 0);
}

#[test]
#[cfg(unix, not(android))]
fn smoke_failure() {
let io = ~[];
let args = ProcessConfig {
program: "if-this-is-a-binary-then-the-world-has-ended",
args: [],
env: None,
cwd: None,
io: io,
};
let p = Process::new(args);
assert!(p.is_some());
let mut p = p.unwrap();
assert!(p.wait() != 0);
}

#[test]
#[cfg(unix, not(android))]
fn exit_reported_right() {
let io = ~[];
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"exit 1"],
env: None,
cwd: None,
io: io,
};
let p = Process::new(args);
assert!(p.is_some());
let mut p = p.unwrap();
assert_eq!(p.wait(), 1);
}

fn read_all(input: &mut Reader) -> ~str {
let mut ret = ~"";
let mut buf = [0, ..1024];
loop {
match input.read(buf) {
None | Some(0) => { break }
Some(n) => { ret = ret + str::from_utf8(buf.slice_to(n)); }
}
}
return ret;
}

fn run_output(args: ProcessConfig) -> ~str {
let p = Process::new(args);
assert!(p.is_some());
let mut p = p.unwrap();
assert!(p.io[0].is_none());
assert!(p.io[1].is_some());
let ret = read_all(p.io[1].get_mut_ref() as &mut Reader);
assert_eq!(p.wait(), 0);
return ret;
}

#[test]
#[cfg(unix, not(android))]
fn stdout_works() {
let pipe = PipeStream::new().unwrap();
let io = ~[Ignored, CreatePipe(pipe, false, true)];
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"echo foobar"],
env: None,
cwd: None,
io: io,
};
assert_eq!(run_output(args), ~"foobar\n");
}

#[test]
#[cfg(unix, not(android))]
fn set_cwd_works() {
let pipe = PipeStream::new().unwrap();
let io = ~[Ignored, CreatePipe(pipe, false, true)];
let cwd = Some("/");
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"pwd"],
env: None,
cwd: cwd,
io: io,
};
assert_eq!(run_output(args), ~"/\n");
}

#[test]
#[cfg(unix, not(android))]
fn stdin_works() {
let input = PipeStream::new().unwrap();
let output = PipeStream::new().unwrap();
let io = ~[CreatePipe(input, true, false),
CreatePipe(output, false, true)];
let args = ProcessConfig {
program: "/bin/sh",
args: [~"-c", ~"read line; echo $line"],
env: None,
cwd: None,
io: io,
};
let mut p = Process::new(args).expect("didn't create a proces?!");
p.io[0].get_mut_ref().write("foobar".as_bytes());
p.io[0] = None; // close stdin;
let out = read_all(p.io[1].get_mut_ref() as &mut Reader);
assert_eq!(p.wait(), 0);
assert_eq!(out, ~"foobar\n");
}
}
Loading