diff --git a/Cargo.lock b/Cargo.lock index 6e171451..d6f512f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3,7 +3,6 @@ name = "ripgrep" version = "0.2.6" dependencies = [ "ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "grep 0.1.3", @@ -29,6 +28,11 @@ dependencies = [ "memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "ctrlc" version = "2.0.1" @@ -39,14 +43,6 @@ dependencies = [ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "deque" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "docopt" version = "0.6.86" @@ -109,6 +105,7 @@ dependencies = [ name = "ignore" version = "0.1.3" dependencies = [ + "crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", "globset 0.1.1", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -169,14 +166,6 @@ dependencies = [ "libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rand" -version = "0.3.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "regex" version = "0.1.80" @@ -293,8 +282,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [metadata] "checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" 
= "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66" +"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97" "checksum ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77f98bb69e3fefadcc5ca80a1368a55251f70295168203e01165bcaecb270891" -"checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf" "checksum docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)" = "4a7ef30445607f6fc8720f0a0a2c7442284b629cf0d049286860fae23e71c4d9" "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344" @@ -306,7 +295,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" "checksum memmap 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "065ce59af31c18ea2c419100bda6247dd4ec3099423202b12f0bd32e529fabd2" "checksum num_cpus 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8890e6084723d57d0df8d2720b0d60c6ee67d6c93e7169630e4371e88765dcad" -"checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5" "checksum regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)" = "4fd4ace6a8cf7860714a2c2280d6c1f7e6a413486c13298bbc86fd3da019402f" "checksum regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "f9ec002c35e86791825ed294b50008eea9ddfc8def4420124fbc6b08db834957" "checksum rustc-serialize 0.3.19 
(registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b" diff --git a/Cargo.toml b/Cargo.toml index daec1490..95cc97d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ path = "tests/tests.rs" [dependencies] ctrlc = "2.0" -deque = "0.3" docopt = "0.6" env_logger = "0.3" grep = { version = "0.1.3", path = "grep" } diff --git a/ignore/Cargo.toml b/ignore/Cargo.toml index 1d336441..635299f9 100644 --- a/ignore/Cargo.toml +++ b/ignore/Cargo.toml @@ -18,6 +18,7 @@ name = "ignore" bench = false [dependencies] +crossbeam = "0.2" globset = { version = "0.1.1", path = "../globset" } lazy_static = "0.2" log = "0.3" diff --git a/ignore/examples/walk.rs b/ignore/examples/walk.rs index 0ce0a086..0ff4ea94 100644 --- a/ignore/examples/walk.rs +++ b/ignore/examples/walk.rs @@ -1,28 +1,92 @@ -/* +#![allow(dead_code, unused_imports, unused_mut, unused_variables)] + +extern crate crossbeam; extern crate ignore; extern crate walkdir; use std::env; use std::io::{self, Write}; -use std::os::unix::ffi::OsStrExt; +use std::path::Path; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread; -use ignore::ignore::IgnoreBuilder; +use crossbeam::sync::MsQueue; +use ignore::WalkBuilder; use walkdir::WalkDir; fn main() { - let path = env::args().nth(1).unwrap(); - let ig = IgnoreBuilder::new().build(); - let wd = WalkDir::new(path); - let walker = ignore::walk::Iter::new(ig, wd); - - let mut stdout = io::BufWriter::new(io::stdout()); - // let mut count = 0; - for dirent in walker { - // count += 1; - stdout.write(dirent.path().as_os_str().as_bytes()).unwrap(); - stdout.write(b"\n").unwrap(); + let mut path = env::args().nth(1).unwrap(); + let mut parallel = false; + let mut simple = false; + let queue: Arc>> = Arc::new(MsQueue::new()); + if path == "parallel" { + path = env::args().nth(2).unwrap(); + parallel = true; + } else if path == "walkdir" { + path = env::args().nth(2).unwrap(); + 
simple = true; } - // println!("{}", count); + + let stdout_queue = queue.clone(); + let stdout_thread = thread::spawn(move || { + let mut stdout = io::BufWriter::new(io::stdout()); + while let Some(dent) = stdout_queue.pop() { + write_path(&mut stdout, dent.path()); + } + }); + + if parallel { + let walker = WalkBuilder::new(path).threads(6).build_parallel(); + walker.run(|| { + let queue = queue.clone(); + Box::new(move |result| { + use ignore::WalkState::*; + + queue.push(Some(DirEntry::Y(result.unwrap()))); + Continue + }) + }); + } else if simple { + let mut stdout = io::BufWriter::new(io::stdout()); + let walker = WalkDir::new(path); + for result in walker { + queue.push(Some(DirEntry::X(result.unwrap()))); + } + } else { + let mut stdout = io::BufWriter::new(io::stdout()); + let walker = WalkBuilder::new(path).build(); + for result in walker { + queue.push(Some(DirEntry::Y(result.unwrap()))); + } + } + queue.push(None); + stdout_thread.join().unwrap(); +} + +enum DirEntry { + X(walkdir::DirEntry), + Y(ignore::DirEntry), +} + +impl DirEntry { + fn path(&self) -> &Path { + match *self { + DirEntry::X(ref x) => x.path(), + DirEntry::Y(ref y) => y.path(), + } + } +} + +#[cfg(unix)] +fn write_path(mut wtr: W, path: &Path) { + use std::os::unix::ffi::OsStrExt; + wtr.write(path.as_os_str().as_bytes()).unwrap(); + wtr.write(b"\n").unwrap(); +} + +#[cfg(not(unix))] +fn write_path(mut wtr: W, path: &Path) { + wtr.write(path.to_string_lossy().as_bytes()).unwrap(); + wtr.write(b"\n").unwrap(); } -*/ -fn main() {} diff --git a/ignore/src/dir.rs b/ignore/src/dir.rs index 6ac00627..496664f3 100644 --- a/ignore/src/dir.rs +++ b/ignore/src/dir.rs @@ -137,6 +137,11 @@ impl Ignore { self.0.parent.is_none() } + /// Returns true if this matcher was added via the `add_parents` method. + pub fn is_absolute_parent(&self) -> bool { + self.0.is_absolute_parent + } + /// Return this matcher's parent, if one exists. 
pub fn parent(&self) -> Option { self.0.parent.clone() @@ -376,7 +381,7 @@ impl Ignore { } /// Returns an iterator over parent ignore matchers, including this one. - fn parents(&self) -> Parents { + pub fn parents(&self) -> Parents { Parents(Some(self)) } @@ -387,7 +392,10 @@ impl Ignore { } } -struct Parents<'a>(Option<&'a Ignore>); +/// An iterator over all parents of an ignore matcher, including itself. +/// +/// The lifetime `'a` refers to the lifetime of the initial `Ignore` matcher. +pub struct Parents<'a>(Option<&'a Ignore>); impl<'a> Iterator for Parents<'a> { type Item = &'a Ignore; diff --git a/ignore/src/lib.rs b/ignore/src/lib.rs index a3aa0c8f..d489712d 100644 --- a/ignore/src/lib.rs +++ b/ignore/src/lib.rs @@ -44,6 +44,7 @@ for result in WalkBuilder::new("./").hidden(false).build() { See the documentation for `WalkBuilder` for many other options. */ +extern crate crossbeam; extern crate globset; #[macro_use] extern crate lazy_static; @@ -61,7 +62,7 @@ use std::fmt; use std::io; use std::path::{Path, PathBuf}; -pub use walk::{DirEntry, Walk, WalkBuilder}; +pub use walk::{DirEntry, Walk, WalkBuilder, WalkParallel, WalkState}; mod dir; pub mod gitignore; @@ -80,6 +81,12 @@ pub enum Error { WithLineNumber { line: u64, err: Box }, /// An error associated with a particular file path. WithPath { path: PathBuf, err: Box }, + /// An error associated with a particular directory depth when recursively + /// walking a directory. + WithDepth { depth: usize, err: Box }, + /// An error that occurs when a file loop is detected when traversing + /// symbolic links. + Loop { ancestor: PathBuf, child: PathBuf }, /// An error that occurs when doing I/O, such as reading an ignore file. Io(io::Error), /// An error that occurs when trying to parse a glob. @@ -101,6 +108,7 @@ impl Error { Error::Partial(_) => true, Error::WithLineNumber { ref err, .. } => err.is_partial(), Error::WithPath { ref err, .. } => err.is_partial(), + Error::WithDepth { ref err, .. 
} => err.is_partial(), _ => false, } } @@ -111,6 +119,8 @@ impl Error { Error::Partial(ref errs) => errs.len() == 1 && errs[0].is_io(), Error::WithLineNumber { ref err, .. } => err.is_io(), Error::WithPath { ref err, .. } => err.is_io(), + Error::WithDepth { ref err, .. } => err.is_io(), + Error::Loop { .. } => false, Error::Io(_) => true, Error::Glob(_) => false, Error::UnrecognizedFileType(_) => false, @@ -118,6 +128,16 @@ impl Error { } } + /// Returns a depth associated with recursively walking a directory (if + /// this error was generated from a recursive directory iterator). + pub fn depth(&self) -> Option { + match *self { + Error::WithPath { ref err, .. } => err.depth(), + Error::WithDepth { depth, .. } => Some(depth), + _ => None, + } + } + /// Turn an error into a tagged error with the given file path. fn with_path>(self, path: P) -> Error { Error::WithPath { @@ -126,6 +146,14 @@ impl Error { } } + /// Turn an error into a tagged error with the given depth. + fn with_depth(self, depth: usize) -> Error { + Error::WithDepth { + depth: depth, + err: Box::new(self), + } + } + /// Turn an error into a tagged error with the given file path and line /// number. If path is empty, then it is omitted from the error. fn tagged>(self, path: P, lineno: u64) -> Error { @@ -146,6 +174,8 @@ impl error::Error for Error { Error::Partial(_) => "partial error", Error::WithLineNumber { ref err, .. } => err.description(), Error::WithPath { ref err, .. } => err.description(), + Error::WithDepth { ref err, .. } => err.description(), + Error::Loop { .. } => "file system loop found", Error::Io(ref err) => err.description(), Error::Glob(ref msg) => msg, Error::UnrecognizedFileType(_) => "unrecognized file type", @@ -168,6 +198,12 @@ impl fmt::Display for Error { Error::WithPath { ref path, ref err } => { write!(f, "{}: {}", path.display(), err) } + Error::WithDepth { ref err, .. 
} => err.fmt(f), + Error::Loop { ref ancestor, ref child } => { + write!(f, "File system loop found: \ + {} points to an ancestor {}", + child.display(), ancestor.display()) + } Error::Io(ref err) => err.fmt(f), Error::Glob(ref msg) => write!(f, "{}", msg), Error::UnrecognizedFileType(ref ty) => { @@ -187,6 +223,30 @@ impl From for Error { } } +impl From for Error { + fn from(err: walkdir::Error) -> Error { + let depth = err.depth(); + if let (Some(anc), Some(child)) = (err.loop_ancestor(), err.path()) { + return Error::WithDepth { + depth: depth, + err: Box::new(Error::Loop { + ancestor: anc.to_path_buf(), + child: child.to_path_buf(), + }), + }; + } + let path = err.path().map(|p| p.to_path_buf()); + let mut ig_err = Error::Io(io::Error::from(err)); + if let Some(path) = path { + ig_err = Error::WithPath { + path: path, + err: Box::new(ig_err), + }; + } + ig_err + } +} + #[derive(Debug, Default)] struct PartialErrorBuilder(Vec); diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs index 0bcc6136..a1ac2de5 100644 --- a/ignore/src/walk.rs +++ b/ignore/src/walk.rs @@ -1,10 +1,15 @@ use std::ffi::OsStr; -use std::fs::{FileType, Metadata}; +use std::fmt; +use std::fs::{self, FileType, Metadata}; use std::io; use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::thread; use std::vec; -use walkdir::{self, WalkDir, WalkDirIterator}; +use crossbeam::sync::MsQueue; +use walkdir::{self, WalkDir, WalkDirIterator, is_same_file}; use dir::{Ignore, IgnoreBuilder}; use gitignore::GitignoreBuilder; @@ -12,6 +17,278 @@ use overrides::Override; use types::Types; use {Error, PartialErrorBuilder}; +/// A directory entry with a possible error attached. +/// +/// The error typically refers to a problem parsing ignore files in a +/// particular directory. +#[derive(Debug)] +pub struct DirEntry { + dent: DirEntryInner, + err: Option, +} + +impl DirEntry { + /// The full path that this entry represents. 
+ pub fn path(&self) -> &Path { + self.dent.path() + } + + /// Whether this entry corresponds to a symbolic link or not. + pub fn path_is_symbolic_link(&self) -> bool { + self.dent.path_is_symbolic_link() + } + + /// Returns true if and only if this entry corresponds to stdin. + /// + /// i.e., The entry has depth 0 and its file name is `-`. + pub fn is_stdin(&self) -> bool { + self.dent.is_stdin() + } + + /// Return the metadata for the file that this entry points to. + pub fn metadata(&self) -> Result { + self.dent.metadata() + } + + /// Return the file type for the file that this entry points to. + /// + /// This entry doesn't have a file type if it corresponds to stdin. + pub fn file_type(&self) -> Option { + self.dent.file_type() + } + + /// Return the file name of this entry. + /// + /// If this entry has no file name (e.g., `/`), then the full path is + /// returned. + pub fn file_name(&self) -> &OsStr { + self.dent.file_name() + } + + /// Returns the depth at which this entry was created relative to the root. + pub fn depth(&self) -> usize { + self.dent.depth() + } + + /// Returns an error, if one exists, associated with processing this entry. + /// + /// An example of an error is one that occurred while parsing an ignore + /// file. + pub fn error(&self) -> Option<&Error> { + self.err.as_ref() + } + + fn new_stdin() -> DirEntry { + DirEntry { + dent: DirEntryInner::Stdin, + err: None, + } + } + + fn new_walkdir(dent: walkdir::DirEntry, err: Option) -> DirEntry { + DirEntry { + dent: DirEntryInner::Walkdir(dent), + err: err, + } + } + + fn new_raw(dent: DirEntryRaw, err: Option) -> DirEntry { + DirEntry { + dent: DirEntryInner::Raw(dent), + err: err, + } + } +} + +/// DirEntryInner is the implementation of DirEntry. +/// +/// It specifically represents three distinct sources of directory entries: +/// +/// 1. From the walkdir crate. +/// 2. Special entries that represent things like stdin. +/// 3. From a path. 
+/// +/// Specifically, (3) has to essentially re-create the DirEntry implementation +/// from WalkDir. +#[derive(Debug)] +enum DirEntryInner { + Stdin, + Walkdir(walkdir::DirEntry), + Raw(DirEntryRaw), +} + +impl DirEntryInner { + fn path(&self) -> &Path { + use self::DirEntryInner::*; + match *self { + Stdin => Path::new(""), + Walkdir(ref x) => x.path(), + Raw(ref x) => x.path(), + } + } + + fn path_is_symbolic_link(&self) -> bool { + use self::DirEntryInner::*; + match *self { + Stdin => false, + Walkdir(ref x) => x.path_is_symbolic_link(), + Raw(ref x) => x.path_is_symbolic_link(), + } + } + + fn is_stdin(&self) -> bool { + match *self { + DirEntryInner::Stdin => true, + _ => false, + } + } + + fn metadata(&self) -> Result { + use self::DirEntryInner::*; + match *self { + Stdin => { + let err = Error::Io(io::Error::new( + io::ErrorKind::Other, " has no metadata")); + Err(err.with_path("")) + } + Walkdir(ref x) => { + x.metadata().map_err(|err| { + Error::Io(io::Error::from(err)).with_path(x.path()) + }) + } + Raw(ref x) => x.metadata(), + } + } + + fn file_type(&self) -> Option { + use self::DirEntryInner::*; + match *self { + Stdin => None, + Walkdir(ref x) => Some(x.file_type()), + Raw(ref x) => Some(x.file_type()), + } + } + + fn file_name(&self) -> &OsStr { + use self::DirEntryInner::*; + match *self { + Stdin => OsStr::new(""), + Walkdir(ref x) => x.file_name(), + Raw(ref x) => x.file_name(), + } + } + + fn depth(&self) -> usize { + use self::DirEntryInner::*; + match *self { + Stdin => 0, + Walkdir(ref x) => x.depth(), + Raw(ref x) => x.depth(), + } + } +} + +/// DirEntryRaw is essentially copied from the walkdir crate so that we can +/// build `DirEntry`s from whole cloth in the parallel iterator. +struct DirEntryRaw { + /// The path as reported by the `fs::ReadDir` iterator (even if it's a + /// symbolic link). + path: PathBuf, + /// The file type. Necessary for recursive iteration, so store it. 
+ ty: FileType, + /// Is set when this entry was created from a symbolic link and the user + /// expects the iterator to follow symbolic links. + follow_link: bool, + /// The depth at which this entry was generated relative to the root. + depth: usize, +} + +impl fmt::Debug for DirEntryRaw { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // Leaving out FileType because it doesn't have a debug impl + // in Rust 1.9. We could add it if we really wanted to by manually + // querying each possible file type. Meh. ---AG + f.debug_struct("DirEntryRaw") + .field("path", &self.path) + .field("follow_link", &self.follow_link) + .field("depth", &self.depth) + .finish() + } +} + +impl DirEntryRaw { + fn path(&self) -> &Path { + &self.path + } + + fn path_is_symbolic_link(&self) -> bool { + self.ty.is_symlink() || self.follow_link + } + + fn metadata(&self) -> Result { + if self.follow_link { + fs::metadata(&self.path) + } else { + fs::symlink_metadata(&self.path) + }.map_err(|err| Error::Io(io::Error::from(err)).with_path(&self.path)) + } + + fn file_type(&self) -> FileType { + self.ty + } + + fn file_name(&self) -> &OsStr { + self.path.file_name().unwrap_or_else(|| self.path.as_os_str()) + } + + fn depth(&self) -> usize { + self.depth + } + + fn from_entry( + depth: usize, + ent: &fs::DirEntry, + ) -> Result { + let ty = try!(ent.file_type().map_err(|err| { + let err = Error::Io(io::Error::from(err)).with_path(ent.path()); + Error::WithDepth { + depth: depth, + err: Box::new(err), + } + })); + Ok(DirEntryRaw { + path: ent.path(), + ty: ty, + follow_link: false, + depth: depth, + }) + } + + fn from_link(depth: usize, pb: PathBuf) -> Result { + let md = try!(fs::metadata(&pb).map_err(|err| { + Error::Io(err).with_path(&pb) + })); + Ok(DirEntryRaw { + path: pb, + ty: md.file_type(), + follow_link: true, + depth: depth, + }) + } + + fn from_path(depth: usize, pb: PathBuf) -> Result { + let md = try!(fs::symlink_metadata(&pb).map_err(|err| { +
Error::Io(err).with_path(&pb) + })); + Ok(DirEntryRaw { + path: pb, + ty: md.file_type(), + follow_link: false, + depth: depth, + }) + } +} + /// WalkBuilder builds a recursive directory iterator. /// /// The builder supports a large number of configurable options. This includes @@ -58,12 +335,14 @@ use {Error, PartialErrorBuilder}; /// path is skipped. /// * Sixth, if the path has made it this far then it is yielded in the /// iterator. +#[derive(Clone, Debug)] pub struct WalkBuilder { paths: Vec, ig_builder: IgnoreBuilder, parents: bool, max_depth: Option, follow_links: bool, + threads: usize, } impl WalkBuilder { @@ -80,6 +359,7 @@ impl WalkBuilder { parents: true, max_depth: None, follow_links: false, + threads: 0, } } @@ -109,6 +389,22 @@ impl WalkBuilder { } } + /// Build a new `WalkParallel` iterator. + /// + /// Note that this *doesn't* return something that implements `Iterator`. + /// Instead, the returned value must be run with a closure. e.g., + /// `builder.build_parallel().run(|| |path| println!("{:?}", path))`. + pub fn build_parallel(&self) -> WalkParallel { + WalkParallel { + paths: self.paths.clone().into_iter(), + ig_root: self.ig_builder.build(), + max_depth: self.max_depth, + follow_links: self.follow_links, + parents: self.parents, + threads: self.threads, + } + } + /// Add a file path to the iterator. /// /// Each additional file path added is traversed recursively. This should @@ -133,6 +429,17 @@ impl WalkBuilder { self } + /// The number of threads to use for traversal. + /// + /// Note that this only has an effect when using `build_parallel`. + /// + /// The default setting is `0`, which chooses the number of threads + /// automatically using heuristics. + pub fn threads(&mut self, n: usize) -> &mut WalkBuilder { + self.threads = n; + self + } + /// Add an ignore file to the matcher. /// /// This has lower precedence than all other sources of ignore rules. 
@@ -239,7 +546,8 @@ impl WalkBuilder { } } -/// Walk is a recursive directory iterator over file paths in a directory. +/// Walk is a recursive directory iterator over file paths in one or more +/// directories. /// /// Only file and directory paths matching the rules are returned. By default, /// ignore files like `.gitignore` are respected. The precise matching rules @@ -264,17 +572,9 @@ impl Walk { fn skip_entry(&self, ent: &walkdir::DirEntry) -> bool { if ent.depth() == 0 { - // Never skip the root directory. return false; } - let m = self.ig.matched(ent.path(), ent.file_type().is_dir()); - if m.is_ignore() { - debug!("ignoring {}: {:?}", ent.path().display(), m); - return true; - } else if m.is_whitelist() { - debug!("whitelisting {}: {:?}", ent.path().display(), m); - } - false + skip_path(&self.ig, ent.path(), ent.file_type().is_dir()) } } @@ -290,10 +590,7 @@ impl Iterator for Walk { match self.its.next() { None => return None, Some((_, None)) => { - return Some(Ok(DirEntry { - dent: None, - err: None, - })); + return Some(Ok(DirEntry::new_stdin())); } Some((path, Some(it))) => { self.it = Some(it); @@ -313,15 +610,7 @@ impl Iterator for Walk { }; match ev { Err(err) => { - let path = err.path().map(|p| p.to_path_buf()); - let mut ig_err = Error::Io(io::Error::from(err)); - if let Some(path) = path { - ig_err = Error::WithPath { - path: path.to_path_buf(), - err: Box::new(ig_err), - }; - } - return Some(Err(ig_err)); + return Some(Err(Error::from(err))); } Ok(WalkEvent::Exit) => { self.ig = self.ig.parent().unwrap(); @@ -338,98 +627,19 @@ impl Iterator for Walk { } let (igtmp, err) = self.ig.add_child(ent.path()); self.ig = igtmp; - return Some(Ok(DirEntry { dent: Some(ent), err: err })); + return Some(Ok(DirEntry::new_walkdir(ent, err))); } Ok(WalkEvent::File(ent)) => { if self.skip_entry(&ent) { continue; } - // If this isn't actually a file (e.g., a symlink), - // then skip it. 
- if !ent.file_type().is_file() { - continue; - } - return Some(Ok(DirEntry { dent: Some(ent), err: None })); + return Some(Ok(DirEntry::new_walkdir(ent, None))); } } } } } -/// A directory entry with a possible error attached. -/// -/// The error typically refers to a problem parsing ignore files in a -/// particular directory. -#[derive(Debug)] -pub struct DirEntry { - dent: Option, - err: Option, -} - -impl DirEntry { - /// The full path that this entry represents. - pub fn path(&self) -> &Path { - self.dent.as_ref().map_or(Path::new(""), |x| x.path()) - } - - /// Whether this entry corresponds to a symbolic link or not. - pub fn path_is_symbolic_link(&self) -> bool { - self.dent.as_ref().map_or(false, |x| x.path_is_symbolic_link()) - } - - /// Returns true if and only if this entry corresponds to stdin. - /// - /// i.e., The entry has depth 0 and its file name is `-`. - pub fn is_stdin(&self) -> bool { - self.dent.is_none() - } - - /// Return the metadata for the file that this entry points to. - pub fn metadata(&self) -> Result { - if let Some(dent) = self.dent.as_ref() { - dent.metadata().map_err(|err| Error::WithPath { - path: self.path().to_path_buf(), - err: Box::new(Error::Io(io::Error::from(err))), - }) - } else { - let ioerr = io::Error::new( - io::ErrorKind::Other, "stdin has no metadata"); - Err(Error::WithPath { - path: Path::new("").to_path_buf(), - err: Box::new(Error::Io(ioerr)), - }) - } - } - - /// Return the file type for the file that this entry points to. - /// - /// This entry doesn't have a file type if it corresponds to stdin. - pub fn file_type(&self) -> Option { - self.dent.as_ref().map(|x| x.file_type()) - } - - /// Return the file name of this entry. - /// - /// If this entry has no file name (e.g., `/`), then the full path is - /// returned. - pub fn file_name(&self) -> &OsStr { - self.dent.as_ref().map_or(OsStr::new(""), |x| x.file_name()) - } - - /// Returns the depth at which this entry was created relative to the root. 
- pub fn depth(&self) -> usize { - self.dent.as_ref().map_or(0, |x| x.depth()) - } - - /// Returns an error, if one exists, associated with processing this entry. - /// - /// An example of an error is one that occurred while parsing an ignore - /// file. - pub fn error(&self) -> Option<&Error> { - self.err.as_ref() - } -} - /// WalkEventIter transforms a WalkDir iterator into an iterator that more /// accurately describes the directory tree. Namely, it emits events that are /// one of three types: directory, file or "exit." An "exit" event means that @@ -485,21 +695,497 @@ impl Iterator for WalkEventIter { } } +/// WalkState is used in the parallel recursive directory iterator to indicate +/// whether walking should continue as normal, skip descending into a +/// particular directory or quit the walk entirely. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum WalkState { + /// Continue walking as normal. + Continue, + /// If the directory entry given is a directory, don't descend into it. + /// In all other cases, this has no effect. + Skip, + /// Quit the entire iterator as soon as possible. + /// + /// Note that this is an inherently asynchronous action. It is possible + /// for more entries to be yielded even after instructing the iterator + /// to quit. + Quit, +} + +impl WalkState { + fn is_quit(&self) -> bool { + *self == WalkState::Quit + } +} + +/// WalkParallel is a parallel recursive directory iterator over file paths +/// in one or more directories. +/// +/// Only file and directory paths matching the rules are returned. By default, +/// ignore files like `.gitignore` are respected. The precise matching rules +/// and precedence are explained in the documentation for `WalkBuilder`. +/// +/// Unlike `Walk`, this uses multiple threads for traversing a directory.
+pub struct WalkParallel { + paths: vec::IntoIter, + ig_root: Ignore, + parents: bool, + max_depth: Option, + follow_links: bool, + threads: usize, +} + +impl WalkParallel { + /// Execute the parallel recursive directory iterator. `mkf` is called + /// for each thread used for iteration. The function produced by `mkf` + /// is then in turn called for each visited file path. + pub fn run( + self, + mut mkf: F, + ) where F: FnMut() -> Box) -> WalkState + Send + 'static> { + let mut f = mkf(); + let threads = self.threads(); + let queue = Arc::new(MsQueue::new()); + let mut any_dirs = false; + // Send the initial set of root paths to the pool of workers. + // Note that we only send directories. For files, we send them to the + // callback directly. + for path in self.paths { + if path == Path::new("-") { + if f(Ok(DirEntry::new_stdin())).is_quit() { + return; + } + continue; + } + let dent = match DirEntryRaw::from_path(0, path) { + Ok(dent) => DirEntry::new_raw(dent, None), + Err(err) => { + if f(Err(err)).is_quit() { + return; + } + continue; + } + }; + if !dent.file_type().map_or(false, |t| t.is_dir()) { + if f(Ok(dent)).is_quit() { + return; + } + } else { + any_dirs = true; + queue.push(Message::Work(Work { + dent: dent, + ignore: self.ig_root.clone(), + })); + } + } + // ... but there's no need to start workers if we don't need them. + if !any_dirs { + return; + } + // Create the workers and then wait for them to finish.
+ let num_waiting = Arc::new(AtomicUsize::new(0)); + let num_quitting = Arc::new(AtomicUsize::new(0)); + let mut handles = vec![]; + for _ in 0..threads { + let worker = Worker { + f: mkf(), + queue: queue.clone(), + quit_now: Arc::new(AtomicBool::new(false)), + is_waiting: false, + is_quitting: false, + num_waiting: num_waiting.clone(), + num_quitting: num_quitting.clone(), + threads: threads, + parents: self.parents, + max_depth: self.max_depth, + follow_links: self.follow_links, + }; + handles.push(thread::spawn(|| worker.run())); + } + for handle in handles { + handle.join().unwrap(); + } + } + + fn threads(&self) -> usize { + if self.threads == 0 { + 2 + } else { + self.threads + } + } +} + +/// Message is the set of instructions that a worker knows how to process. +enum Message { + /// A work item corresponds to a directory that should be descended into. + /// Work items for entries that should be skipped or ignored should not + /// be produced. + Work(Work), + /// This instruction indicates that the worker should start quitting. + Quit, +} + +/// A unit of work for each worker to process. +/// +/// Each unit of work corresponds to a directory that should be descended +/// into. +struct Work { + /// The directory entry. + dent: DirEntry, + /// Any ignore matchers that have been built for this directory's parents. + ignore: Ignore, +} + +impl Work { + /// Adds ignore rules for parent directories. + /// + /// Note that this only applies to entries at depth 0. On all other + /// entries, this is a no-op. + fn add_parents(&mut self) -> Option { + if self.dent.depth() > 0 { + return None; + } + // At depth 0, the path of this entry is a root path, so we can + // use it directly to add parent ignore rules. + let (ig, err) = self.ignore.add_parents(self.dent.path()); + self.ignore = ig; + err + } + + /// Reads the directory contents of this work item and adds ignore + /// rules for this directory. 
+ /// + /// If there was a problem with reading the directory contents, then + /// an error is returned. If there was a problem reading the ignore + /// rules for this directory, then the error is attached to this + /// work item's directory entry. + fn read_dir(&mut self) -> Result { + let readdir = match fs::read_dir(self.dent.path()) { + Ok(readdir) => readdir, + Err(err) => { + let err = Error::from(err) + .with_path(self.dent.path()) + .with_depth(self.dent.depth()); + return Err(err); + } + }; + let (ig, err) = self.ignore.add_child(self.dent.path()); + self.ignore = ig; + self.dent.err = err; + Ok(readdir) + } +} + +/// A worker is responsible for descending into directories, updating the +/// ignore matchers, producing new work and invoking the caller's callback. +/// +/// Note that a worker is *both* a producer and a consumer. +struct Worker { + /// The caller's callback. + f: Box) -> WalkState + Send + 'static>, + /// A queue of work items. This is multi-producer and multi-consumer. + queue: Arc>, + /// Whether all workers should quit at the next opportunity. Note that + /// this is distinct from quitting because of exhausting the contents of + /// a directory. Instead, this is used when the caller's callback indicates + /// that the iterator should quit immediately. + quit_now: Arc, + /// Whether this worker is waiting for more work. + is_waiting: bool, + /// Whether this worker has started to quit. + is_quitting: bool, + /// The number of workers waiting for more work. + num_waiting: Arc, + /// The number of workers waiting to quit. + num_quitting: Arc, + /// The total number of workers. + threads: usize, + /// Whether to create ignore matchers for parents of caller specified + /// directories. + parents: bool, + /// The maximum depth of directories to descend. A value of `0` means no + /// descension at all. + max_depth: Option, + /// Whether to follow symbolic links or not. When this is enabled, loop + /// detection is performed. 
+ follow_links: bool, +} + +impl Worker { + /// Runs this worker until there is no more work left to do. + /// + /// The worker will call the caller's callback for all entries that aren't + /// skipped by the ignore matcher. + fn run(mut self) { + while let Some(mut work) = self.get_work() { + let depth = work.dent.depth(); + if self.parents { + if let Some(err) = work.add_parents() { + if (self.f)(Err(err)).is_quit() { + self.quit_now(); + return; + } + } + } + let readdir = match work.read_dir() { + Ok(readdir) => readdir, + Err(err) => { + if (self.f)(Err(err)).is_quit() { + self.quit_now(); + return; + } + continue; + } + }; + match (self.f)(Ok(work.dent)) { + WalkState::Continue => {} + WalkState::Skip => continue, + WalkState::Quit => { + self.quit_now(); + return; + } + } + if self.max_depth.map_or(false, |max| depth >= max) { + continue; + } + for result in readdir { + if self.run_one(&work.ignore, depth + 1, result).is_quit() { + self.quit_now(); + return; + } + } + } + } + + /// Runs the worker on a single entry from a directory iterator. + /// + /// If the entry is a path that should be ignored, then this is a no-op. + /// Otherwise, if the entry is a directory, then it is pushed on to the + /// queue. If the entry isn't a directory, the caller's callback is + /// applied. + /// + /// If an error occurs while reading the entry, then it is sent to the + /// caller's callback. + /// + /// `ig` is the `Ignore` matcher for the parent directory. `depth` should + /// be the depth of this entry. `result` should be the item yielded by + /// a directory iterator. 
+ fn run_one( + &mut self, + ig: &Ignore, + depth: usize, + result: Result, + ) -> WalkState { + let fs_dent = match result { + Ok(fs_dent) => fs_dent, + Err(err) => { + return (self.f)(Err(Error::from(err).with_depth(depth))); + } + }; + let mut dent = match DirEntryRaw::from_entry(depth, &fs_dent) { + Ok(dent) => DirEntry::new_raw(dent, None), + Err(err) => { + return (self.f)(Err(err)); + } + }; + let is_symlink = dent.file_type().map_or(false, |ft| ft.is_symlink()); + if self.follow_links && is_symlink { + let path = dent.path().to_path_buf(); + dent = match DirEntryRaw::from_link(depth, path) { + Ok(dent) => DirEntry::new_raw(dent, None), + Err(err) => { + return (self.f)(Err(err)); + } + }; + if dent.file_type().map_or(false, |ft| ft.is_dir()) { + if let Err(err) = check_symlink_loop(ig, dent.path(), depth) { + return (self.f)(Err(err)); + } + } + } + let is_dir = dent.file_type().map_or(false, |ft| ft.is_dir()); + if skip_path(ig, dent.path(), is_dir) { + WalkState::Continue + } else if !is_dir { + (self.f)(Ok(dent)) + } else { + self.queue.push(Message::Work(Work { + dent: dent, + ignore: ig.clone(), + })); + WalkState::Continue + } + } + + /// Returns the next directory to descend into. + /// + /// If all work has been exhausted, then this returns None. The worker + /// should then subsequently quit. + fn get_work(&mut self) -> Option { + loop { + if self.is_quit_now() { + return None; + } + match self.queue.try_pop() { + Some(Message::Work(work)) => { + self.waiting(false); + self.quitting(false); + return Some(work); + } + Some(Message::Quit) => { + // We can't just quit because a Message::Quit could be + // spurious. For example, it's possible to observe that + // all workers are waiting even if there's more work to + // be done. + // + // Therefore, we do a bit of a dance to wait until all + // workers have signaled that they're ready to quit before + // actually quitting. 
+ // + // If the Quit message turns out to be spurious, then the + // loop below will break and we'll go back to looking for + // more work. + self.waiting(true); + self.quitting(true); + while !self.is_quit_now() { + let nwait = self.num_waiting(); + let nquit = self.num_quitting(); + // If the number of waiting workers dropped, then + // abort our attempt to quit. + if nwait < self.threads { + break; + } + // If all workers are in this quit loop, then we + // can stop. + if nquit == self.threads { + return None; + } + // Otherwise, spin. + } + // If we're here, then we've aborted our quit attempt. + continue; + } + None => { + self.waiting(true); + self.quitting(false); + if self.num_waiting() == self.threads { + for _ in 0..self.threads { + self.queue.push(Message::Quit); + } + } + continue; + } + } + } + } + + /// Indicates that all workers should quit immediately. + fn quit_now(&self) { + self.quit_now.store(true, Ordering::SeqCst); + } + + /// Returns true if this worker should quit immediately. + fn is_quit_now(&self) -> bool { + self.quit_now.load(Ordering::SeqCst) + } + + /// Returns the total number of workers waiting for work. + fn num_waiting(&self) -> usize { + self.num_waiting.load(Ordering::SeqCst) + } + + /// Returns the total number of workers ready to quit. + fn num_quitting(&self) -> usize { + self.num_quitting.load(Ordering::SeqCst) + } + + /// Sets this worker's "quitting" state to the value of `yes`. + fn quitting(&mut self, yes: bool) { + if yes { + if !self.is_quitting { + self.is_quitting = true; + self.num_quitting.fetch_add(1, Ordering::SeqCst); + } + } else { + if self.is_quitting { + self.is_quitting = false; + self.num_quitting.fetch_sub(1, Ordering::SeqCst); + } + } + } + + /// Sets this worker's "waiting" state to the value of `yes`. 
+ fn waiting(&mut self, yes: bool) { + if yes { + if !self.is_waiting { + self.is_waiting = true; + self.num_waiting.fetch_add(1, Ordering::SeqCst); + } + } else { + if self.is_waiting { + self.is_waiting = false; + self.num_waiting.fetch_sub(1, Ordering::SeqCst); + } + } + } +} + +fn check_symlink_loop( + ig_parent: &Ignore, + child_path: &Path, + child_depth: usize, +) -> Result<(), Error> { + for ig in ig_parent.parents().take_while(|ig| !ig.is_absolute_parent()) { + let same = try!(is_same_file(ig.path(), child_path).map_err(|err| { + Error::from(err).with_path(child_path).with_depth(child_depth) + })); + if same { + return Err(Error::Loop { + ancestor: ig.path().to_path_buf(), + child: child_path.to_path_buf(), + }.with_depth(child_depth)); + } + } + Ok(()) +} + +fn skip_path(ig: &Ignore, path: &Path, is_dir: bool) -> bool { + let m = ig.matched(path, is_dir); + if m.is_ignore() { + debug!("ignoring {}: {:?}", path.display(), m); + true + } else if m.is_whitelist() { + debug!("whitelisting {}: {:?}", path.display(), m); + false + } else { + false + } +} + #[cfg(test)] mod tests { use std::fs::{self, File}; use std::io::Write; use std::path::Path; + use std::sync::{Arc, Mutex}; use tempdir::TempDir; - use super::{Walk, WalkBuilder}; + use super::{WalkBuilder, WalkState}; fn wfile>(path: P, contents: &str) { let mut file = File::create(path).unwrap(); file.write_all(contents.as_bytes()).unwrap(); } + #[cfg(unix)] + fn symlink, Q: AsRef>(src: P, dst: Q) { + use std::os::unix::fs::symlink; + symlink(src, dst).unwrap(); + } + fn mkdirp>(path: P) { fs::create_dir_all(path).unwrap(); } @@ -512,10 +1198,13 @@ mod tests { } } - fn walk_collect(prefix: &Path, walk: Walk) -> Vec { + fn walk_collect(prefix: &Path, builder: &WalkBuilder) -> Vec { let mut paths = vec![]; - for dent in walk { - let dent = dent.unwrap(); + for result in builder.build() { + let dent = match result { + Err(_) => continue, + Ok(dent) => dent, + }; let path = 
dent.path().strip_prefix(prefix).unwrap(); if path.as_os_str().is_empty() { continue; @@ -526,12 +1215,51 @@ mod tests { paths } + fn walk_collect_parallel( + prefix: &Path, + builder: &WalkBuilder, + ) -> Vec { + let paths = Arc::new(Mutex::new(vec![])); + let prefix = Arc::new(prefix.to_path_buf()); + builder.build_parallel().run(|| { + let paths = paths.clone(); + let prefix = prefix.clone(); + Box::new(move |result| { + let dent = match result { + Err(_) => return WalkState::Continue, + Ok(dent) => dent, + }; + let path = dent.path().strip_prefix(&**prefix).unwrap(); + if path.as_os_str().is_empty() { + return WalkState::Continue; + } + let mut paths = paths.lock().unwrap(); + paths.push(normal_path(path.to_str().unwrap())); + WalkState::Continue + }) + }); + let mut paths = paths.lock().unwrap(); + paths.sort(); + paths.to_vec() + } + fn mkpaths(paths: &[&str]) -> Vec { let mut paths: Vec<_> = paths.iter().map(|s| s.to_string()).collect(); paths.sort(); paths } + fn assert_paths( + prefix: &Path, + builder: &WalkBuilder, + expected: &[&str], + ) { + let got = walk_collect(prefix, builder); + assert_eq!(got, mkpaths(expected)); + let got = walk_collect_parallel(prefix, builder); + assert_eq!(got, mkpaths(expected)); + } + #[test] fn no_ignores() { let td = TempDir::new("walk-test-").unwrap(); @@ -540,10 +1268,9 @@ mod tests { wfile(td.path().join("a/b/foo"), ""); wfile(td.path().join("x/y/foo"), ""); - let got = walk_collect(td.path(), Walk::new(td.path())); - assert_eq!(got, mkpaths(&[ + assert_paths(td.path(), &WalkBuilder::new(td.path()), &[ "x", "x/y", "x/y/foo", "a", "a/b", "a/b/foo", "a/b/c", - ])); + ]); } #[test] @@ -556,8 +1283,9 @@ mod tests { wfile(td.path().join("bar"), ""); wfile(td.path().join("a/bar"), ""); - let got = walk_collect(td.path(), Walk::new(td.path())); - assert_eq!(got, mkpaths(&["bar", "a", "a/bar"])); + assert_paths(td.path(), &WalkBuilder::new(td.path()), &[ + "bar", "a", "a/bar", + ]); } #[test] @@ -573,8 +1301,7 @@ mod tests { 
let mut builder = WalkBuilder::new(td.path()); assert!(builder.add_ignore(&igpath).is_none()); - let got = walk_collect(td.path(), builder.build()); - assert_eq!(got, mkpaths(&["bar", "a", "a/bar"])); + assert_paths(td.path(), &builder, &["bar", "a", "a/bar"]); } #[test] @@ -586,7 +1313,59 @@ mod tests { wfile(td.path().join("a/bar"), ""); let root = td.path().join("a"); - let got = walk_collect(&root, Walk::new(&root)); - assert_eq!(got, mkpaths(&["bar"])); + assert_paths(&root, &WalkBuilder::new(&root), &["bar"]); + } + + #[test] + fn max_depth() { + let td = TempDir::new("walk-test-").unwrap(); + mkdirp(td.path().join("a/b/c")); + wfile(td.path().join("foo"), ""); + wfile(td.path().join("a/foo"), ""); + wfile(td.path().join("a/b/foo"), ""); + wfile(td.path().join("a/b/c/foo"), ""); + + let mut builder = WalkBuilder::new(td.path()); + assert_paths(td.path(), &builder, &[ + "a", "a/b", "a/b/c", "foo", "a/foo", "a/b/foo", "a/b/c/foo", + ]); + assert_paths(td.path(), builder.max_depth(Some(0)), &[]); + assert_paths(td.path(), builder.max_depth(Some(1)), &["a", "foo"]); + assert_paths(td.path(), builder.max_depth(Some(2)), &[ + "a", "a/b", "foo", "a/foo", + ]); + } + + #[cfg(unix)] // because symlinks on windows are weird + #[test] + fn symlinks() { + let td = TempDir::new("walk-test-").unwrap(); + mkdirp(td.path().join("a/b")); + symlink(td.path().join("a/b"), td.path().join("z")); + wfile(td.path().join("a/b/foo"), ""); + + let mut builder = WalkBuilder::new(td.path()); + assert_paths(td.path(), &builder, &[ + "a", "a/b", "a/b/foo", "z", + ]); + assert_paths(td.path(), &builder.follow_links(true), &[ + "a", "a/b", "a/b/foo", "z", "z/foo", + ]); + } + + #[cfg(unix)] // because symlinks on windows are weird + #[test] + fn symlink_loop() { + let td = TempDir::new("walk-test-").unwrap(); + mkdirp(td.path().join("a/b")); + symlink(td.path().join("a"), td.path().join("a/b/c")); + + let mut builder = WalkBuilder::new(td.path()); + assert_paths(td.path(), &builder, &[ + 
"a", "a/b", "a/b/c", + ]); + assert_paths(td.path(), &builder.follow_links(true), &[ + "a", "a/b", + ]); } } diff --git a/src/args.rs b/src/args.rs index 9d2923b8..017ade4c 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,4 +1,3 @@ -use std::cmp; use std::env; use std::io; use std::path::{Path, PathBuf}; @@ -21,10 +20,9 @@ use ignore::types::{FileTypeDef, Types, TypesBuilder}; use ignore; use out::{Out, ColoredTerminal}; use printer::Printer; -use search_buffer::BufferSearcher; -use search_stream::{InputBuffer, Searcher}; #[cfg(windows)] use terminal_win::WindowsBuffer; +use worker::{Worker, WorkerBuilder}; use Result; @@ -364,7 +362,7 @@ impl RawArgs { }; let threads = if self.flag_threads == 0 { - cmp::min(8, num_cpus::get()) + num_cpus::get() } else { self.flag_threads }; @@ -576,18 +574,6 @@ impl Args { self.grep.clone() } - /// Creates a new input buffer that is used in searching. - pub fn input_buffer(&self) -> InputBuffer { - let mut inp = InputBuffer::new(); - inp.eol(self.eol); - inp - } - - /// Whether we should prefer memory maps for searching or not. - pub fn mmap(&self) -> bool { - self.mmap - } - /// Whether ripgrep should be quiet or not. pub fn quiet(&self) -> bool { self.quiet @@ -662,18 +648,16 @@ impl Args { &self.paths } - /// Create a new line based searcher whose configuration is taken from the - /// command line. This searcher supports a dizzying array of features: - /// inverted matching, line counting, context control and more. - pub fn searcher<'a, R: io::Read, W: Send + Terminal>( - &self, - inp: &'a mut InputBuffer, - printer: &'a mut Printer, - grep: &'a Grep, - path: &'a Path, - rdr: R, - ) -> Searcher<'a, R, W> { - Searcher::new(inp, printer, grep, path, rdr) + /// Returns true if there is exactly one file path given to search. 
+ pub fn is_one_path(&self) -> bool { + self.paths.len() == 1 + && (self.paths[0] == Path::new("-") || self.paths[0].is_file()) + } + + /// Create a worker whose configuration is taken from the + /// command line. + pub fn worker(&self) -> Worker { + WorkerBuilder::new(self.grep()) .after_context(self.after_context) .before_context(self.before_context) .count(self.count) @@ -681,28 +665,10 @@ impl Args { .eol(self.eol) .line_number(self.line_number) .invert_match(self.invert_match) + .mmap(self.mmap) .quiet(self.quiet) .text(self.text) - } - - /// Create a new line based searcher whose configuration is taken from the - /// command line. This search operates on an entire file all once (which - /// may have been memory mapped). - pub fn searcher_buffer<'a, W: Send + Terminal>( - &self, - printer: &'a mut Printer, - grep: &'a Grep, - path: &'a Path, - buf: &'a [u8], - ) -> BufferSearcher<'a, W> { - BufferSearcher::new(printer, grep, path, buf) - .count(self.count) - .files_with_matches(self.files_with_matches) - .eol(self.eol) - .line_number(self.line_number) - .invert_match(self.invert_match) - .quiet(self.quiet) - .text(self.text) + .build() } /// Returns the number of worker search threads that should be used. @@ -722,7 +688,17 @@ impl Args { } /// Create a new recursive directory iterator over the paths in argv. - pub fn walker(&self) -> Walk { + pub fn walker(&self) -> ignore::Walk { + self.walker_builder().build() + } + + /// Create a new parallel recursive directory iterator over the paths + /// in argv. + pub fn walker_parallel(&self) -> ignore::WalkParallel { + self.walker_builder().build_parallel() + } + + fn walker_builder(&self) -> ignore::WalkBuilder { let paths = self.paths(); let mut wd = ignore::WalkBuilder::new(&paths[0]); for path in &paths[1..] 
{ @@ -744,7 +720,8 @@ impl Args { wd.git_exclude(!self.no_ignore && !self.no_ignore_vcs); wd.ignore(!self.no_ignore); wd.parents(!self.no_ignore_parent); - Walk(wd.build()) + wd.threads(self.threads()); + wd } } @@ -761,34 +738,6 @@ fn version() -> String { } } -/// A simple wrapper around the ignore::Walk iterator. This will -/// automatically emit error messages to stderr and will skip directories. -pub struct Walk(ignore::Walk); - -impl Iterator for Walk { - type Item = ignore::DirEntry; - - fn next(&mut self) -> Option { - while let Some(result) = self.0.next() { - match result { - Ok(dent) => { - if let Some(err) = dent.error() { - eprintln!("{}", err); - } - if dent.file_type().map_or(false, |x| x.is_dir()) { - continue; - } - return Some(dent); - } - Err(err) => { - eprintln!("{}", err); - } - } - } - None - } -} - /// A single state in the state machine used by `unescape`. #[derive(Clone, Copy, Eq, PartialEq)] enum State { diff --git a/src/main.rs b/src/main.rs index 71c4da32..276ee059 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,4 @@ extern crate ctrlc; -extern crate deque; extern crate docopt; extern crate env_logger; extern crate grep; @@ -21,30 +20,20 @@ extern crate term; extern crate winapi; use std::error::Error; -use std::fs::File; use std::io; use std::io::Write; -use std::path::Path; use std::process; use std::result; use std::sync::{Arc, Mutex}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::mpsc; use std::thread; use std::cmp; -use deque::{Stealer, Stolen}; -use grep::Grep; -use memmap::{Mmap, Protection}; use term::Terminal; -use ignore::DirEntry; use args::Args; -use out::{ColoredTerminal, Out}; -use pathutil::strip_prefix; -use printer::Printer; -use search_stream::InputBuffer; -#[cfg(windows)] -use terminal_win::WindowsBuffer; +use worker::Work; macro_rules! 
errored { ($($tt:tt)*) => { @@ -68,11 +57,12 @@ mod search_buffer; mod search_stream; #[cfg(windows)] mod terminal_win; +mod worker; pub type Result = result::Result>; fn main() { - match Args::parse().and_then(run) { + match Args::parse().map(Arc::new).and_then(run) { Ok(count) if count == 0 => process::exit(1), Ok(_) => process::exit(0), Err(err) => { @@ -82,95 +72,108 @@ fn main() { } } -fn run(args: Args) -> Result { - let args = Arc::new(args); +fn run(args: Arc) -> Result { + { + let args = args.clone(); + ctrlc::set_handler(move || { + let stdout = io::stdout(); + let mut stdout = stdout.lock(); - let handler_args = args.clone(); - ctrlc::set_handler(move || { - let stdout = io::stdout(); - let mut stdout = stdout.lock(); + let _ = args.stdout().reset(); + let _ = stdout.flush(); - let _ = handler_args.stdout().reset(); - let _ = stdout.flush(); - - process::exit(1); - }); - - let paths = args.paths(); + process::exit(1); + }); + } let threads = cmp::max(1, args.threads() - 1); - let isone = - paths.len() == 1 && (paths[0] == Path::new("-") || paths[0].is_file()); if args.files() { - return run_files(args.clone()); - } - if args.type_list() { - return run_types(args.clone()); - } - if threads == 1 || isone { - return run_one_thread(args.clone()); + if threads == 1 || args.is_one_path() { + run_files_one_thread(args) + } else { + run_files_parallel(args) + } + } else if args.type_list() { + run_types(args) + } else if threads == 1 || args.is_one_path() { + run_one_thread(args) + } else { + run_parallel(args) } +} + +fn run_parallel(args: Arc) -> Result { let out = Arc::new(Mutex::new(args.out())); let quiet_matched = QuietMatched::new(args.quiet()); - let mut workers = vec![]; + let paths_searched = Arc::new(AtomicUsize::new(0)); + let match_count = Arc::new(AtomicUsize::new(0)); - let workq = { - let (workq, stealer) = deque::new(); - for _ in 0..threads { - let worker = MultiWorker { - chan_work: stealer.clone(), - quiet_matched: quiet_matched.clone(), - 
out: out.clone(), - outbuf: Some(args.outbuf()), - worker: Worker { - args: args.clone(), - inpbuf: args.input_buffer(), - grep: args.grep(), - match_count: 0, - }, + args.walker_parallel().run(|| { + let args = args.clone(); + let quiet_matched = quiet_matched.clone(); + let paths_searched = paths_searched.clone(); + let match_count = match_count.clone(); + let out = out.clone(); + let mut outbuf = args.outbuf(); + let mut worker = args.worker(); + Box::new(move |result| { + use ignore::WalkState::*; + + if quiet_matched.has_match() { + return Quit; + } + let dent = match get_or_log_dir_entry(result) { + None => return Continue, + Some(dent) => dent, }; - workers.push(thread::spawn(move || worker.run())); - } - workq - }; - let mut paths_searched: u64 = 0; - for dent in args.walker() { - if quiet_matched.has_match() { - break; - } - paths_searched += 1; - if dent.is_stdin() { - workq.push(Work::Stdin); - } else { - workq.push(Work::File(dent)); - } + paths_searched.fetch_add(1, Ordering::SeqCst); + outbuf.clear(); + { + // This block actually executes the search and prints the + // results into outbuf. + let mut printer = args.printer(&mut outbuf); + let count = + if dent.is_stdin() { + worker.run(&mut printer, Work::Stdin) + } else { + worker.run(&mut printer, Work::DirEntry(dent)) + }; + match_count.fetch_add(count as usize, Ordering::SeqCst); + if quiet_matched.set_match(count > 0) { + return Quit; + } + } + if !outbuf.get_ref().is_empty() { + // This should be the only mutex in all of ripgrep. Since the + // common case is to report a small number of matches relative + // to the corpus, this really shouldn't matter much. + // + // Still, it'd be nice to send this on a channel, but then we'd + // need to manage a pool of outbufs, which would complicate the + // code. 
+ let mut out = out.lock().unwrap(); + out.write(&outbuf); + } + Continue + }) + }); + if !args.paths().is_empty() && paths_searched.load(Ordering::SeqCst) == 0 { + eprint_nothing_searched(); } - if !paths.is_empty() && paths_searched == 0 { - eprintln!("No files were searched, which means ripgrep probably \ - applied a filter you didn't expect. \ - Try running again with --debug."); - } - for _ in 0..workers.len() { - workq.push(Work::Quit); - } - let mut match_count = 0; - for worker in workers { - match_count += worker.join().unwrap(); - } - Ok(match_count) + Ok(match_count.load(Ordering::SeqCst) as u64) } fn run_one_thread(args: Arc) -> Result { - let mut worker = Worker { - args: args.clone(), - inpbuf: args.input_buffer(), - grep: args.grep(), - match_count: 0, - }; + let mut worker = args.worker(); let mut term = args.stdout(); let mut paths_searched: u64 = 0; - for dent in args.walker() { + let mut match_count = 0; + for result in args.walker() { + let dent = match get_or_log_dir_entry(result) { + None => continue, + Some(dent) => dent, + }; let mut printer = args.printer(&mut term); - if worker.match_count > 0 { + if match_count > 0 { if args.quiet() { break; } @@ -179,32 +182,53 @@ fn run_one_thread(args: Arc) -> Result { } } paths_searched += 1; - if dent.is_stdin() { - worker.do_work(&mut printer, WorkReady::Stdin); - } else { - let file = match File::open(dent.path()) { - Ok(file) => file, - Err(err) => { - eprintln!("{}: {}", dent.path().display(), err); - continue; - } + match_count += + if dent.is_stdin() { + worker.run(&mut printer, Work::Stdin) + } else { + worker.run(&mut printer, Work::DirEntry(dent)) }; - worker.do_work(&mut printer, WorkReady::DirFile(dent, file)); - } } if !args.paths().is_empty() && paths_searched == 0 { - eprintln!("No files were searched, which means ripgrep probably \ - applied a filter you didn't expect. 
\ - Try running again with --debug."); + eprint_nothing_searched(); } - Ok(worker.match_count) + Ok(match_count) } -fn run_files(args: Arc) -> Result { +fn run_files_parallel(args: Arc) -> Result { + let print_args = args.clone(); + let (tx, rx) = mpsc::channel::(); + let print_thread = thread::spawn(move || { + let term = print_args.stdout(); + let mut printer = print_args.printer(term); + let mut file_count = 0; + for dent in rx.iter() { + printer.path(dent.path()); + file_count += 1; + } + file_count + }); + args.walker_parallel().run(move || { + let tx = tx.clone(); + Box::new(move |result| { + if let Some(dent) = get_or_log_dir_entry(result) { + tx.send(dent).unwrap(); + } + ignore::WalkState::Continue + }) + }); + Ok(print_thread.join().unwrap()) +} + +fn run_files_one_thread(args: Arc) -> Result { let term = args.stdout(); let mut printer = args.printer(term); let mut file_count = 0; - for dent in args.walker() { + for result in args.walker() { + let dent = match get_or_log_dir_entry(result) { + None => continue, + Some(dent) => dent, + }; printer.path(dent.path()); file_count += 1; } @@ -222,163 +246,64 @@ fn run_types(args: Arc) -> Result { Ok(ty_count) } -enum Work { - Stdin, - File(DirEntry), - Quit, -} - -enum WorkReady { - Stdin, - DirFile(DirEntry, File), -} - -struct MultiWorker { - chan_work: Stealer, - quiet_matched: QuietMatched, - out: Arc>, - #[cfg(not(windows))] - outbuf: Option>>>, - #[cfg(windows)] - outbuf: Option>, - worker: Worker, -} - -struct Worker { - args: Arc, - inpbuf: InputBuffer, - grep: Grep, - match_count: u64, -} - -impl MultiWorker { - fn run(mut self) -> u64 { - loop { - if self.quiet_matched.has_match() { - break; - } - let work = match self.chan_work.steal() { - Stolen::Empty | Stolen::Abort => continue, - Stolen::Data(Work::Quit) => break, - Stolen::Data(Work::Stdin) => WorkReady::Stdin, - Stolen::Data(Work::File(ent)) => { - match File::open(ent.path()) { - Ok(file) => WorkReady::DirFile(ent, file), - Err(err) => { - 
eprintln!("{}: {}", ent.path().display(), err); - continue; - } - } - } - }; - let mut outbuf = self.outbuf.take().unwrap(); - outbuf.clear(); - let mut printer = self.worker.args.printer(outbuf); - self.worker.do_work(&mut printer, work); - if self.quiet_matched.set_match(self.worker.match_count > 0) { - break; - } - let outbuf = printer.into_inner(); - if !outbuf.get_ref().is_empty() { - let mut out = self.out.lock().unwrap(); - out.write(&outbuf); - } - self.outbuf = Some(outbuf); +fn get_or_log_dir_entry( + result: result::Result, +) -> Option { + match result { + Err(err) => { + eprintln!("{}", err); + None } - self.worker.match_count - } -} - -impl Worker { - fn do_work( - &mut self, - printer: &mut Printer, - work: WorkReady, - ) { - let result = match work { - WorkReady::Stdin => { - let stdin = io::stdin(); - let stdin = stdin.lock(); - self.search(printer, &Path::new(""), stdin) - } - WorkReady::DirFile(ent, file) => { - let mut path = ent.path(); - if let Some(p) = strip_prefix("./", path) { - path = p; - } - if self.args.mmap() { - self.search_mmap(printer, path, &file) - } else { - self.search(printer, path, file) - } - } - }; - match result { - Ok(count) => { - self.match_count += count; - } - Err(err) => { + Ok(dent) => { + if let Some(err) = dent.error() { eprintln!("{}", err); } + if !dent.file_type().map_or(true, |x| x.is_file()) { + None + } else { + Some(dent) + } } } - - fn search( - &mut self, - printer: &mut Printer, - path: &Path, - rdr: R, - ) -> Result { - self.args.searcher( - &mut self.inpbuf, - printer, - &self.grep, - path, - rdr, - ).run().map_err(From::from) - } - - fn search_mmap( - &mut self, - printer: &mut Printer, - path: &Path, - file: &File, - ) -> Result { - if try!(file.metadata()).len() == 0 { - // Opening a memory map with an empty file results in an error. - // However, this may not actually be an empty file! For example, - // /proc/cpuinfo reports itself as an empty file, but it can - // produce data when it's read from. 
Therefore, we fall back to - // regular read calls. - return self.search(printer, path, file); - } - let mmap = try!(Mmap::open(file, Protection::Read)); - Ok(self.args.searcher_buffer( - printer, - &self.grep, - path, - unsafe { mmap.as_slice() }, - ).run()) - } } +fn eprint_nothing_searched() { + eprintln!("No files were searched, which means ripgrep probably \ + applied a filter you didn't expect. \ + Try running again with --debug."); +} + +/// A simple thread safe abstraction for determining whether a search should +/// stop if the user has requested quiet mode. #[derive(Clone, Debug)] -struct QuietMatched(Arc>); +pub struct QuietMatched(Arc>); impl QuietMatched { - fn new(quiet: bool) -> QuietMatched { + /// Create a new QuietMatched value. + /// + /// If quiet is true, then set_match and has_match will reflect whether + /// a search should quit or not because it found a match. + /// + /// If quiet is false, then set_match is always a no-op and has_match + /// always returns false. + pub fn new(quiet: bool) -> QuietMatched { let atomic = if quiet { Some(AtomicBool::new(false)) } else { None }; QuietMatched(Arc::new(atomic)) } - fn has_match(&self) -> bool { + /// Returns true if and only if quiet mode is enabled and a match has + /// occurred. + pub fn has_match(&self) -> bool { match *self.0 { None => false, Some(ref matched) => matched.load(Ordering::SeqCst), } } - fn set_match(&self, yes: bool) -> bool { + /// Sets whether a match has occurred or not. + /// + /// If quiet mode is disabled, then this is a no-op. + pub fn set_match(&self, yes: bool) -> bool { match *self.0 { None => false, Some(_) if !yes => false, diff --git a/src/printer.rs b/src/printer.rs index e7373bce..1b8e5965 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -158,6 +158,7 @@ impl Printer { } /// Flushes the underlying writer and returns it. 
+ #[allow(dead_code)] pub fn into_inner(mut self) -> W { let _ = self.wtr.flush(); self.wtr
diff --git a/src/worker.rs b/src/worker.rs
new file mode 100644
index 00000000..797fe9d7
--- /dev/null
+++ b/src/worker.rs
@@ -0,0 +1,294 @@
+use std::fs::File;
+use std::io;
+use std::path::Path;
+
+use grep::Grep;
+use ignore::DirEntry;
+use memmap::{Mmap, Protection};
+use term::Terminal;
+
+use pathutil::strip_prefix;
+use printer::Printer;
+use search_buffer::BufferSearcher;
+use search_stream::{InputBuffer, Searcher};
+
+use Result;
+
+/// A unit of work that a `Worker` can search.
+pub enum Work {
+    /// Search standard input.
+    Stdin,
+    /// Search the file at the path of this directory entry.
+    DirEntry(DirEntry),
+}
+
+/// A builder that collects search options and produces a `Worker`.
+pub struct WorkerBuilder {
+    grep: Grep,
+    opts: Options,
+}
+
+/// The search options set through `WorkerBuilder` and read by `Worker`.
+#[derive(Clone, Debug)]
+struct Options {
+    mmap: bool,
+    after_context: usize,
+    before_context: usize,
+    count: bool,
+    files_with_matches: bool,
+    eol: u8,
+    invert_match: bool,
+    line_number: bool,
+    quiet: bool,
+    text: bool,
+}
+
+impl Default for Options {
+    /// All flags disabled, no context lines, and `\n` as the line terminator.
+    fn default() -> Options {
+        Options {
+            mmap: false,
+            after_context: 0,
+            before_context: 0,
+            count: false,
+            files_with_matches: false,
+            eol: b'\n',
+            invert_match: false,
+            line_number: false,
+            quiet: false,
+            text: false,
+        }
+    }
+}
+
+impl WorkerBuilder {
+    /// Create a new builder for a worker.
+    ///
+    /// A grep matcher is required, but there are numerous additional options
+    /// that can be configured on this builder. The reusable input buffer is
+    /// not supplied by the caller; `build` creates it.
+    pub fn new(grep: Grep) -> WorkerBuilder {
+        WorkerBuilder {
+            grep: grep,
+            opts: Options::default(),
+        }
+    }
+
+    /// Create the worker from this builder.
+    ///
+    /// A fresh input buffer is allocated here and configured with the
+    /// builder's end-of-line byte.
+    pub fn build(self) -> Worker {
+        let mut inpbuf = InputBuffer::new();
+        inpbuf.eol(self.opts.eol);
+        Worker {
+            grep: self.grep,
+            inpbuf: inpbuf,
+            opts: self.opts,
+        }
+    }
+
+    /// The number of contextual lines to show after each match. The default
+    /// is zero.
+    pub fn after_context(mut self, count: usize) -> Self {
+        self.opts.after_context = count;
+        self
+    }
+
+    /// The number of contextual lines to show before each match. The default
+    /// is zero.
+    pub fn before_context(mut self, count: usize) -> Self {
+        self.opts.before_context = count;
+        self
+    }
+
+    /// If enabled, searching will print a count instead of each match.
+    ///
+    /// Disabled by default.
+    pub fn count(mut self, yes: bool) -> Self {
+        self.opts.count = yes;
+        self
+    }
+
+    /// If enabled, searching will print the path instead of each match.
+    ///
+    /// Disabled by default.
+    pub fn files_with_matches(mut self, yes: bool) -> Self {
+        self.opts.files_with_matches = yes;
+        self
+    }
+
+    /// Set the end-of-line byte used by this searcher. The default is `\n`.
+    pub fn eol(mut self, eol: u8) -> Self {
+        self.opts.eol = eol;
+        self
+    }
+
+    /// If enabled, matching is inverted so that lines that *don't* match the
+    /// given pattern are treated as matches.
+    ///
+    /// Disabled by default.
+    pub fn invert_match(mut self, yes: bool) -> Self {
+        self.opts.invert_match = yes;
+        self
+    }
+
+    /// If enabled, compute line numbers and prefix each line of output with
+    /// them.
+    ///
+    /// Disabled by default.
+    pub fn line_number(mut self, yes: bool) -> Self {
+        self.opts.line_number = yes;
+        self
+    }
+
+    /// If enabled, try to use memory maps for searching if possible.
+    ///
+    /// Disabled by default.
+    pub fn mmap(mut self, yes: bool) -> Self {
+        self.opts.mmap = yes;
+        self
+    }
+
+    /// If enabled, don't show any output and quit searching after the first
+    /// match is found.
+    ///
+    /// Disabled by default.
+    pub fn quiet(mut self, yes: bool) -> Self {
+        self.opts.quiet = yes;
+        self
+    }
+
+    /// If enabled, search binary files as if they were text.
+    ///
+    /// Disabled by default.
+    pub fn text(mut self, yes: bool) -> Self {
+        self.opts.text = yes;
+        self
+    }
+}
+
+/// Worker is responsible for executing searches on file paths, while choosing
+/// streaming search or memory map search as appropriate.
+pub struct Worker {
+    inpbuf: InputBuffer,
+    grep: Grep,
+    opts: Options,
+}
+
+impl Worker {
+    /// Execute the worker with the given printer and work item.
+    ///
+    /// A work item is either stdin or a directory entry whose path is opened
+    /// and searched. On success the value produced by the underlying searcher
+    /// is returned. If the file cannot be opened or the search fails, the
+    /// error is reported with `eprintln!` and `0` is returned.
+    //
+    // NOTE(review): the `W: Terminal + Send` bound is assumed to match what
+    // `Printer` requires; confirm against printer.rs.
+    pub fn run<W: Terminal + Send>(
+        &mut self,
+        printer: &mut Printer<W>,
+        work: Work,
+    ) -> u64 {
+        let result = match work {
+            Work::Stdin => {
+                let stdin = io::stdin();
+                let stdin = stdin.lock();
+                // Stdin has no path, so a placeholder label is used instead.
+                self.search(printer, Path::new("<stdin>"), stdin)
+            }
+            Work::DirEntry(dent) => {
+                let mut path = dent.path();
+                let file = match File::open(path) {
+                    Ok(file) => file,
+                    Err(err) => {
+                        eprintln!("{}: {}", path.display(), err);
+                        return 0;
+                    }
+                };
+                // Open with the path as given; afterwards a leading "./" is
+                // stripped from the path handed to the searcher.
+                if let Some(p) = strip_prefix("./", path) {
+                    path = p;
+                }
+                if self.opts.mmap {
+                    self.search_mmap(printer, path, &file)
+                } else {
+                    self.search(printer, path, file)
+                }
+            }
+        };
+        match result {
+            Ok(count) => count,
+            Err(err) => {
+                eprintln!("{}", err);
+                0
+            }
+        }
+    }
+
+    /// Search `rdr` with the streaming searcher, forwarding every option
+    /// except `mmap`, and convert any error with `From`.
+    fn search<R: io::Read, W: Terminal + Send>(
+        &mut self,
+        printer: &mut Printer<W>,
+        path: &Path,
+        rdr: R,
+    ) -> Result<u64> {
+        let searcher = Searcher::new(
+            &mut self.inpbuf, printer, &self.grep, path, rdr);
+        searcher
+            .after_context(self.opts.after_context)
+            .before_context(self.opts.before_context)
+            .count(self.opts.count)
+            .files_with_matches(self.opts.files_with_matches)
+            .eol(self.opts.eol)
+            .line_number(self.opts.line_number)
+            .invert_match(self.opts.invert_match)
+            .quiet(self.opts.quiet)
+            .text(self.opts.text)
+            .run()
+            .map_err(From::from)
+    }
+
+    /// Search `file` through a read-only memory map.
+    ///
+    /// Files whose metadata reports a length of zero are searched with
+    /// `search` instead.
+    //
+    // NOTE(review): `before_context` and `after_context` are not forwarded
+    // to the buffer searcher here; presumably callers only enable `mmap`
+    // when no context is requested. Confirm against the option handling.
+    fn search_mmap<W: Terminal + Send>(
+        &mut self,
+        printer: &mut Printer<W>,
+        path: &Path,
+        file: &File,
+    ) -> Result<u64> {
+        if try!(file.metadata()).len() == 0 {
+            // Opening a memory map with an empty file results in an error.
+            // However, this may not actually be an empty file! For example,
+            // /proc/cpuinfo reports itself as an empty file, but it can
+            // produce data when it's read from. Therefore, we fall back to
+            // regular read calls.
+            return self.search(printer, path, file);
+        }
+        let mmap = try!(Mmap::open(file, Protection::Read));
+        // SAFETY: `mmap` is declared before `searcher`, so the mapping outlives
+        // every use of the slice borrowed from it.
+        // NOTE(review): this also relies on the file not being truncated or
+        // rewritten while mapped, which cannot be enforced from here.
+        let searcher = BufferSearcher::new(
+            printer, &self.grep, path, unsafe { mmap.as_slice() });
+        Ok(searcher
+            .count(self.opts.count)
+            .files_with_matches(self.opts.files_with_matches)
+            .eol(self.opts.eol)
+            .line_number(self.opts.line_number)
+            .invert_match(self.opts.invert_match)
+            .quiet(self.opts.quiet)
+            .text(self.opts.text)
+            .run())
+    }
+}