diff --git a/crates/cli/src/decompress.rs b/crates/cli/src/decompress.rs index 813cca6c..9bd9100a 100644 --- a/crates/cli/src/decompress.rs +++ b/crates/cli/src/decompress.rs @@ -366,6 +366,30 @@ impl DecompressionReader { let file = File::open(path)?; Ok(DecompressionReader { rdr: Err(file) }) } + + /// Closes this reader, freeing any resources used by its underlying child + /// process, if one was used. If the child process exits with a nonzero + /// exit code, the returned Err value will include its stderr. + /// + /// `close` is idempotent, meaning it can be safely called multiple times. + /// The first call closes the CommandReader and any subsequent calls do + /// nothing. + /// + /// This method should be called after partially reading a file to prevent + /// resource leakage. However there is no need to call `close` explicitly + /// if your code always calls `read` to EOF, as `read` takes care of + /// calling `close` in this case. + /// + /// `close` is also called in `drop` as a last line of defense against + /// resource leakage. Any error from the child process is then printed as a + /// warning to stderr. This can be avoided by explicitly calling `close` + /// before the CommandReader is dropped. + pub fn close(&mut self) -> io::Result<()> { + match self.rdr { + Ok(ref mut rdr) => rdr.close(), + Err(_) => Ok(()), + } + } } impl io::Read for DecompressionReader { diff --git a/crates/cli/src/process.rs b/crates/cli/src/process.rs index 4ec5af7f..8ce94f15 100644 --- a/crates/cli/src/process.rs +++ b/crates/cli/src/process.rs @@ -30,6 +30,14 @@ impl CommandError { pub(crate) fn stderr(bytes: Vec) -> CommandError { CommandError { kind: CommandErrorKind::Stderr(bytes) } } + + /// Returns true if and only if this error has empty data from stderr. 
+ pub(crate) fn is_empty(&self) -> bool { + match self.kind { + CommandErrorKind::Stderr(ref bytes) => bytes.is_empty(), + _ => false, + } + } } impl error::Error for CommandError { @@ -107,18 +115,12 @@ impl CommandReaderBuilder { .stdout(process::Stdio::piped()) .stderr(process::Stdio::piped()) .spawn()?; - let stdout = child.stdout.take().unwrap(); let stderr = if self.async_stderr { StderrReader::async(child.stderr.take().unwrap()) } else { StderrReader::sync(child.stderr.take().unwrap()) }; - Ok(CommandReader { - child: child, - stdout: stdout, - stderr: stderr, - done: false, - }) + Ok(CommandReader { child, stderr, eof: false }) } /// When enabled, the reader will asynchronously read the contents of the @@ -175,9 +177,11 @@ impl CommandReaderBuilder { #[derive(Debug)] pub struct CommandReader { child: process::Child, - stdout: process::ChildStdout, stderr: StderrReader, - done: bool, + /// This is set to true once 'read' returns zero bytes. When this isn't + /// set and we close the reader, then we anticipate a pipe error when + /// reaping the child process and silence it. + eof: bool, } impl CommandReader { @@ -201,23 +205,73 @@ impl CommandReader { ) -> Result { CommandReaderBuilder::new().build(cmd) } + + /// Closes the CommandReader, freeing any resources used by its underlying + /// child process. If the child process exits with a nonzero exit code, the + /// returned Err value will include its stderr. + /// + /// `close` is idempotent, meaning it can be safely called multiple times. + /// The first call closes the CommandReader and any subsequent calls do + /// nothing. + /// + /// This method should be called after partially reading a file to prevent + /// resource leakage. However there is no need to call `close` explicitly + /// if your code always calls `read` to EOF, as `read` takes care of + /// calling `close` in this case. + /// + /// `close` is also called in `drop` as a last line of defense against + /// resource leakage. 
Any error from the child process is then printed as a + /// warning to stderr. This can be avoided by explicitly calling `close` + /// before the CommandReader is dropped. + pub fn close(&mut self) -> io::Result<()> { + // Dropping stdout closes the underlying file descriptor, which should + // cause a well-behaved child process to exit. If child.stdout is None + // we assume that close() has already been called and do nothing. + let stdout = match self.child.stdout.take() { + None => return Ok(()), + Some(stdout) => stdout, + }; + drop(stdout); + if self.child.wait()?.success() { + Ok(()) + } else { + let err = self.stderr.read_to_end(); + // In the specific case where we haven't consumed the full data + // from the child process, then closing stdout above results in + // a pipe signal being thrown in most cases. But I don't think + // there is any reliable and portable way of detecting it. Instead, + // if we know we haven't hit EOF (so we anticipate a broken pipe + // error) and if stderr otherwise doesn't have anything on it, then + // we assume total success. + if !self.eof && err.is_empty() { + return Ok(()); + } + Err(io::Error::from(err)) + } + } +} + +impl Drop for CommandReader { + fn drop(&mut self) { + if let Err(error) = self.close() { + warn!("{}", error); + } + } } impl io::Read for CommandReader { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - if self.done { - return Ok(0); - } - let nread = self.stdout.read(buf)?; + let stdout = match self.child.stdout { + None => return Ok(0), + Some(ref mut stdout) => stdout, + }; + let nread = stdout.read(buf)?; if nread == 0 { - self.done = true; - // Reap the child now that we're done reading. If the command - // failed, report stderr as an error. 
- if !self.child.wait()?.success() { - return Err(io::Error::from(self.stderr.read_to_end())); - } + self.eof = true; + self.close().map(|_| 0) + } else { + Ok(nread) } - Ok(nread) } } diff --git a/crates/core/search.rs b/crates/core/search.rs index 98f524fb..6dc51cfe 100644 --- a/crates/core/search.rs +++ b/crates/core/search.rs @@ -335,7 +335,7 @@ impl SearchWorker { self.searcher.set_binary_detection(bin); if subject.is_stdin() { - self.search_reader(path, io::stdin().lock()) + self.search_reader(path, &mut io::stdin().lock()) } else if self.should_preprocess(path) { self.search_preprocessor(path) } else if self.should_decompress(path) { @@ -399,7 +399,7 @@ impl SearchWorker { let mut cmd = Command::new(bin); cmd.arg(path).stdin(Stdio::from(File::open(path)?)); - let rdr = self.command_builder.build(&mut cmd).map_err(|err| { + let mut rdr = self.command_builder.build(&mut cmd).map_err(|err| { io::Error::new( io::ErrorKind::Other, format!( @@ -408,20 +408,28 @@ impl SearchWorker { ), ) })?; - self.search_reader(path, rdr).map_err(|err| { + let result = self.search_reader(path, &mut rdr).map_err(|err| { io::Error::new( io::ErrorKind::Other, format!("preprocessor command failed: '{:?}': {}", cmd, err), ) - }) + }); + let close_result = rdr.close(); + let search_result = result?; + close_result?; + Ok(search_result) } /// Attempt to decompress the data at the given file path and search the /// result. If the given file path isn't recognized as a compressed file, /// then search it without doing any decompression. fn search_decompress(&mut self, path: &Path) -> io::Result { - let rdr = self.decomp_builder.build(path)?; - self.search_reader(path, rdr) + let mut rdr = self.decomp_builder.build(path)?; + let result = self.search_reader(path, &mut rdr); + let close_result = rdr.close(); + let search_result = result?; + close_result?; + Ok(search_result) } /// Search the contents of the given file path. 
@@ -448,7 +456,7 @@ impl SearchWorker { fn search_reader( &mut self, path: &Path, - rdr: R, + rdr: &mut R, ) -> io::Result { use self::PatternMatcher::*; @@ -504,12 +512,12 @@ fn search_reader( searcher: &mut Searcher, printer: &mut Printer, path: &Path, - rdr: R, + mut rdr: R, ) -> io::Result { match *printer { Printer::Standard(ref mut p) => { let mut sink = p.sink_with_path(&matcher, path); - searcher.search_reader(&matcher, rdr, &mut sink)?; + searcher.search_reader(&matcher, &mut rdr, &mut sink)?; Ok(SearchResult { has_match: sink.has_match(), stats: sink.stats().map(|s| s.clone()), @@ -517,7 +525,7 @@ fn search_reader( } Printer::Summary(ref mut p) => { let mut sink = p.sink_with_path(&matcher, path); - searcher.search_reader(&matcher, rdr, &mut sink)?; + searcher.search_reader(&matcher, &mut rdr, &mut sink)?; Ok(SearchResult { has_match: sink.has_match(), stats: sink.stats().map(|s| s.clone()), @@ -525,7 +533,7 @@ fn search_reader( } Printer::JSON(ref mut p) => { let mut sink = p.sink_with_path(&matcher, path); - searcher.search_reader(&matcher, rdr, &mut sink)?; + searcher.search_reader(&matcher, &mut rdr, &mut sink)?; Ok(SearchResult { has_match: sink.has_match(), stats: Some(sink.stats().clone()),