1
0
mirror of https://github.com/BurntSushi/ripgrep.git synced 2024-12-12 19:18:24 +02:00

Merge pull request #223 from BurntSushi/ignore-parallel

Add parallel recursive directory iterator.
This commit is contained in:
Andrew Gallant 2016-11-05 22:19:47 -04:00 committed by GitHub
commit 32db773d51
11 changed files with 1506 additions and 479 deletions

26
Cargo.lock generated
View File

@ -3,7 +3,6 @@ name = "ripgrep"
version = "0.2.6"
dependencies = [
"ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"grep 0.1.3",
@ -29,6 +28,11 @@ dependencies = [
"memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ctrlc"
version = "2.0.1"
@ -39,14 +43,6 @@ dependencies = [
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "deque"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "docopt"
version = "0.6.86"
@ -109,6 +105,7 @@ dependencies = [
name = "ignore"
version = "0.1.3"
dependencies = [
"crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"globset 0.1.1",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -169,14 +166,6 @@ dependencies = [
"libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rand"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "regex"
version = "0.1.80"
@ -293,8 +282,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata]
"checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66"
"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97"
"checksum ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77f98bb69e3fefadcc5ca80a1368a55251f70295168203e01165bcaecb270891"
"checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf"
"checksum docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)" = "4a7ef30445607f6fc8720f0a0a2c7442284b629cf0d049286860fae23e71c4d9"
"checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f"
"checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344"
@ -306,7 +295,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20"
"checksum memmap 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "065ce59af31c18ea2c419100bda6247dd4ec3099423202b12f0bd32e529fabd2"
"checksum num_cpus 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8890e6084723d57d0df8d2720b0d60c6ee67d6c93e7169630e4371e88765dcad"
"checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5"
"checksum regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)" = "4fd4ace6a8cf7860714a2c2280d6c1f7e6a413486c13298bbc86fd3da019402f"
"checksum regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "f9ec002c35e86791825ed294b50008eea9ddfc8def4420124fbc6b08db834957"
"checksum rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b"

View File

@ -25,7 +25,6 @@ path = "tests/tests.rs"
[dependencies]
ctrlc = "2.0"
deque = "0.3"
docopt = "0.6"
env_logger = "0.3"
grep = { version = "0.1.3", path = "grep" }

View File

@ -18,6 +18,7 @@ name = "ignore"
bench = false
[dependencies]
crossbeam = "0.2"
globset = { version = "0.1.1", path = "../globset" }
lazy_static = "0.2"
log = "0.3"

View File

@ -1,28 +1,92 @@
/*
#![allow(dead_code, unused_imports, unused_mut, unused_variables)]
extern crate crossbeam;
extern crate ignore;
extern crate walkdir;
use std::env;
use std::io::{self, Write};
use std::os::unix::ffi::OsStrExt;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use ignore::ignore::IgnoreBuilder;
use crossbeam::sync::MsQueue;
use ignore::WalkBuilder;
use walkdir::WalkDir;
fn main() {
let path = env::args().nth(1).unwrap();
let ig = IgnoreBuilder::new().build();
let wd = WalkDir::new(path);
let walker = ignore::walk::Iter::new(ig, wd);
let mut stdout = io::BufWriter::new(io::stdout());
// let mut count = 0;
for dirent in walker {
// count += 1;
stdout.write(dirent.path().as_os_str().as_bytes()).unwrap();
stdout.write(b"\n").unwrap();
let mut path = env::args().nth(1).unwrap();
let mut parallel = false;
let mut simple = false;
let queue: Arc<MsQueue<Option<DirEntry>>> = Arc::new(MsQueue::new());
if path == "parallel" {
path = env::args().nth(2).unwrap();
parallel = true;
} else if path == "walkdir" {
path = env::args().nth(2).unwrap();
simple = true;
}
// println!("{}", count);
let stdout_queue = queue.clone();
let stdout_thread = thread::spawn(move || {
let mut stdout = io::BufWriter::new(io::stdout());
while let Some(dent) = stdout_queue.pop() {
write_path(&mut stdout, dent.path());
}
});
if parallel {
let walker = WalkBuilder::new(path).threads(6).build_parallel();
walker.run(|| {
let queue = queue.clone();
Box::new(move |result| {
use ignore::WalkState::*;
queue.push(Some(DirEntry::Y(result.unwrap())));
Continue
})
});
} else if simple {
let mut stdout = io::BufWriter::new(io::stdout());
let walker = WalkDir::new(path);
for result in walker {
queue.push(Some(DirEntry::X(result.unwrap())));
}
} else {
let mut stdout = io::BufWriter::new(io::stdout());
let walker = WalkBuilder::new(path).build();
for result in walker {
queue.push(Some(DirEntry::Y(result.unwrap())));
}
}
queue.push(None);
stdout_thread.join().unwrap();
}
enum DirEntry {
X(walkdir::DirEntry),
Y(ignore::DirEntry),
}
impl DirEntry {
fn path(&self) -> &Path {
match *self {
DirEntry::X(ref x) => x.path(),
DirEntry::Y(ref y) => y.path(),
}
}
}
#[cfg(unix)]
fn write_path<W: Write>(mut wtr: W, path: &Path) {
use std::os::unix::ffi::OsStrExt;
wtr.write(path.as_os_str().as_bytes()).unwrap();
wtr.write(b"\n").unwrap();
}
#[cfg(not(unix))]
fn write_path<W: Write>(mut wtr: W, path: &Path) {
wtr.write(path.to_string_lossy().as_bytes()).unwrap();
wtr.write(b"\n").unwrap();
}
*/
fn main() {}

View File

@ -137,6 +137,11 @@ impl Ignore {
self.0.parent.is_none()
}
/// Returns true if this matcher was added via the `add_parents` method.
pub fn is_absolute_parent(&self) -> bool {
self.0.is_absolute_parent
}
/// Return this matcher's parent, if one exists.
pub fn parent(&self) -> Option<Ignore> {
self.0.parent.clone()
@ -376,7 +381,7 @@ impl Ignore {
}
/// Returns an iterator over parent ignore matchers, including this one.
fn parents(&self) -> Parents {
pub fn parents(&self) -> Parents {
Parents(Some(self))
}
@ -387,7 +392,10 @@ impl Ignore {
}
}
struct Parents<'a>(Option<&'a Ignore>);
/// An iterator over all parents of an ignore matcher, including itself.
///
/// The lifetime `'a` refers to the lifetime of the initial `Ignore` matcher.
pub struct Parents<'a>(Option<&'a Ignore>);
impl<'a> Iterator for Parents<'a> {
type Item = &'a Ignore;

View File

@ -44,6 +44,7 @@ for result in WalkBuilder::new("./").hidden(false).build() {
See the documentation for `WalkBuilder` for many other options.
*/
extern crate crossbeam;
extern crate globset;
#[macro_use]
extern crate lazy_static;
@ -61,7 +62,7 @@ use std::fmt;
use std::io;
use std::path::{Path, PathBuf};
pub use walk::{DirEntry, Walk, WalkBuilder};
pub use walk::{DirEntry, Walk, WalkBuilder, WalkParallel, WalkState};
mod dir;
pub mod gitignore;
@ -80,6 +81,12 @@ pub enum Error {
WithLineNumber { line: u64, err: Box<Error> },
/// An error associated with a particular file path.
WithPath { path: PathBuf, err: Box<Error> },
/// An error associated with a particular directory depth when recursively
/// walking a directory.
WithDepth { depth: usize, err: Box<Error> },
/// An error that occurs when a file loop is detected when traversing
/// symbolic links.
Loop { ancestor: PathBuf, child: PathBuf },
/// An error that occurs when doing I/O, such as reading an ignore file.
Io(io::Error),
/// An error that occurs when trying to parse a glob.
@ -101,6 +108,7 @@ impl Error {
Error::Partial(_) => true,
Error::WithLineNumber { ref err, .. } => err.is_partial(),
Error::WithPath { ref err, .. } => err.is_partial(),
Error::WithDepth { ref err, .. } => err.is_partial(),
_ => false,
}
}
@ -111,6 +119,8 @@ impl Error {
Error::Partial(ref errs) => errs.len() == 1 && errs[0].is_io(),
Error::WithLineNumber { ref err, .. } => err.is_io(),
Error::WithPath { ref err, .. } => err.is_io(),
Error::WithDepth { ref err, .. } => err.is_io(),
Error::Loop { .. } => false,
Error::Io(_) => true,
Error::Glob(_) => false,
Error::UnrecognizedFileType(_) => false,
@ -118,6 +128,16 @@ impl Error {
}
}
/// Returns a depth associated with recursively walking a directory (if
/// this error was generated from a recursive directory iterator).
pub fn depth(&self) -> Option<usize> {
match *self {
Error::WithPath { ref err, .. } => err.depth(),
Error::WithDepth { depth, .. } => Some(depth),
_ => None,
}
}
/// Turn an error into a tagged error with the given file path.
fn with_path<P: AsRef<Path>>(self, path: P) -> Error {
Error::WithPath {
@ -126,6 +146,14 @@ impl Error {
}
}
/// Turn an error into a tagged error with the given depth.
fn with_depth(self, depth: usize) -> Error {
Error::WithDepth {
depth: depth,
err: Box::new(self),
}
}
/// Turn an error into a tagged error with the given file path and line
/// number. If path is empty, then it is omitted from the error.
fn tagged<P: AsRef<Path>>(self, path: P, lineno: u64) -> Error {
@ -146,6 +174,8 @@ impl error::Error for Error {
Error::Partial(_) => "partial error",
Error::WithLineNumber { ref err, .. } => err.description(),
Error::WithPath { ref err, .. } => err.description(),
Error::WithDepth { ref err, .. } => err.description(),
Error::Loop { .. } => "file system loop found",
Error::Io(ref err) => err.description(),
Error::Glob(ref msg) => msg,
Error::UnrecognizedFileType(_) => "unrecognized file type",
@ -168,6 +198,12 @@ impl fmt::Display for Error {
Error::WithPath { ref path, ref err } => {
write!(f, "{}: {}", path.display(), err)
}
Error::WithDepth { ref err, .. } => err.fmt(f),
Error::Loop { ref ancestor, ref child } => {
write!(f, "File system loop found: \
{} points to an ancestor {}",
child.display(), ancestor.display())
}
Error::Io(ref err) => err.fmt(f),
Error::Glob(ref msg) => write!(f, "{}", msg),
Error::UnrecognizedFileType(ref ty) => {
@ -187,6 +223,30 @@ impl From<io::Error> for Error {
}
}
impl From<walkdir::Error> for Error {
fn from(err: walkdir::Error) -> Error {
let depth = err.depth();
if let (Some(anc), Some(child)) = (err.loop_ancestor(), err.path()) {
return Error::WithDepth {
depth: depth,
err: Box::new(Error::Loop {
ancestor: anc.to_path_buf(),
child: child.to_path_buf(),
}),
};
}
let path = err.path().map(|p| p.to_path_buf());
let mut ig_err = Error::Io(io::Error::from(err));
if let Some(path) = path {
ig_err = Error::WithPath {
path: path,
err: Box::new(ig_err),
};
}
ig_err
}
}
#[derive(Debug, Default)]
struct PartialErrorBuilder(Vec<Error>);

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,3 @@
use std::cmp;
use std::env;
use std::io;
use std::path::{Path, PathBuf};
@ -21,10 +20,9 @@ use ignore::types::{FileTypeDef, Types, TypesBuilder};
use ignore;
use out::{Out, ColoredTerminal};
use printer::Printer;
use search_buffer::BufferSearcher;
use search_stream::{InputBuffer, Searcher};
#[cfg(windows)]
use terminal_win::WindowsBuffer;
use worker::{Worker, WorkerBuilder};
use Result;
@ -364,7 +362,7 @@ impl RawArgs {
};
let threads =
if self.flag_threads == 0 {
cmp::min(8, num_cpus::get())
num_cpus::get()
} else {
self.flag_threads
};
@ -576,18 +574,6 @@ impl Args {
self.grep.clone()
}
/// Creates a new input buffer that is used in searching.
pub fn input_buffer(&self) -> InputBuffer {
let mut inp = InputBuffer::new();
inp.eol(self.eol);
inp
}
/// Whether we should prefer memory maps for searching or not.
pub fn mmap(&self) -> bool {
self.mmap
}
/// Whether ripgrep should be quiet or not.
pub fn quiet(&self) -> bool {
self.quiet
@ -662,18 +648,16 @@ impl Args {
&self.paths
}
/// Create a new line based searcher whose configuration is taken from the
/// command line. This searcher supports a dizzying array of features:
/// inverted matching, line counting, context control and more.
pub fn searcher<'a, R: io::Read, W: Send + Terminal>(
&self,
inp: &'a mut InputBuffer,
printer: &'a mut Printer<W>,
grep: &'a Grep,
path: &'a Path,
rdr: R,
) -> Searcher<'a, R, W> {
Searcher::new(inp, printer, grep, path, rdr)
/// Returns true if there is exactly one file path given to search.
pub fn is_one_path(&self) -> bool {
self.paths.len() == 1
&& (self.paths[0] == Path::new("-") || self.paths[0].is_file())
}
/// Create a worker whose configuration is taken from the
/// command line.
pub fn worker(&self) -> Worker {
WorkerBuilder::new(self.grep())
.after_context(self.after_context)
.before_context(self.before_context)
.count(self.count)
@ -681,28 +665,10 @@ impl Args {
.eol(self.eol)
.line_number(self.line_number)
.invert_match(self.invert_match)
.mmap(self.mmap)
.quiet(self.quiet)
.text(self.text)
}
/// Create a new line based searcher whose configuration is taken from the
/// command line. This search operates on an entire file all once (which
/// may have been memory mapped).
pub fn searcher_buffer<'a, W: Send + Terminal>(
&self,
printer: &'a mut Printer<W>,
grep: &'a Grep,
path: &'a Path,
buf: &'a [u8],
) -> BufferSearcher<'a, W> {
BufferSearcher::new(printer, grep, path, buf)
.count(self.count)
.files_with_matches(self.files_with_matches)
.eol(self.eol)
.line_number(self.line_number)
.invert_match(self.invert_match)
.quiet(self.quiet)
.text(self.text)
.build()
}
/// Returns the number of worker search threads that should be used.
@ -722,7 +688,17 @@ impl Args {
}
/// Create a new recursive directory iterator over the paths in argv.
pub fn walker(&self) -> Walk {
pub fn walker(&self) -> ignore::Walk {
self.walker_builder().build()
}
/// Create a new parallel recursive directory iterator over the paths
/// in argv.
pub fn walker_parallel(&self) -> ignore::WalkParallel {
self.walker_builder().build_parallel()
}
fn walker_builder(&self) -> ignore::WalkBuilder {
let paths = self.paths();
let mut wd = ignore::WalkBuilder::new(&paths[0]);
for path in &paths[1..] {
@ -744,7 +720,8 @@ impl Args {
wd.git_exclude(!self.no_ignore && !self.no_ignore_vcs);
wd.ignore(!self.no_ignore);
wd.parents(!self.no_ignore_parent);
Walk(wd.build())
wd.threads(self.threads());
wd
}
}
@ -761,34 +738,6 @@ fn version() -> String {
}
}
/// A simple wrapper around the ignore::Walk iterator. This will
/// automatically emit error messages to stderr and will skip directories.
pub struct Walk(ignore::Walk);
impl Iterator for Walk {
type Item = ignore::DirEntry;
fn next(&mut self) -> Option<ignore::DirEntry> {
while let Some(result) = self.0.next() {
match result {
Ok(dent) => {
if let Some(err) = dent.error() {
eprintln!("{}", err);
}
if dent.file_type().map_or(false, |x| x.is_dir()) {
continue;
}
return Some(dent);
}
Err(err) => {
eprintln!("{}", err);
}
}
}
None
}
}
/// A single state in the state machine used by `unescape`.
#[derive(Clone, Copy, Eq, PartialEq)]
enum State {

View File

@ -1,5 +1,4 @@
extern crate ctrlc;
extern crate deque;
extern crate docopt;
extern crate env_logger;
extern crate grep;
@ -21,30 +20,20 @@ extern crate term;
extern crate winapi;
use std::error::Error;
use std::fs::File;
use std::io;
use std::io::Write;
use std::path::Path;
use std::process;
use std::result;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread;
use std::cmp;
use deque::{Stealer, Stolen};
use grep::Grep;
use memmap::{Mmap, Protection};
use term::Terminal;
use ignore::DirEntry;
use args::Args;
use out::{ColoredTerminal, Out};
use pathutil::strip_prefix;
use printer::Printer;
use search_stream::InputBuffer;
#[cfg(windows)]
use terminal_win::WindowsBuffer;
use worker::Work;
macro_rules! errored {
($($tt:tt)*) => {
@ -68,11 +57,12 @@ mod search_buffer;
mod search_stream;
#[cfg(windows)]
mod terminal_win;
mod worker;
pub type Result<T> = result::Result<T, Box<Error + Send + Sync>>;
fn main() {
match Args::parse().and_then(run) {
match Args::parse().map(Arc::new).and_then(run) {
Ok(count) if count == 0 => process::exit(1),
Ok(_) => process::exit(0),
Err(err) => {
@ -82,95 +72,108 @@ fn main() {
}
}
fn run(args: Args) -> Result<u64> {
let args = Arc::new(args);
fn run(args: Arc<Args>) -> Result<u64> {
{
let args = args.clone();
ctrlc::set_handler(move || {
let stdout = io::stdout();
let mut stdout = stdout.lock();
let handler_args = args.clone();
ctrlc::set_handler(move || {
let stdout = io::stdout();
let mut stdout = stdout.lock();
let _ = args.stdout().reset();
let _ = stdout.flush();
let _ = handler_args.stdout().reset();
let _ = stdout.flush();
process::exit(1);
});
let paths = args.paths();
process::exit(1);
});
}
let threads = cmp::max(1, args.threads() - 1);
let isone =
paths.len() == 1 && (paths[0] == Path::new("-") || paths[0].is_file());
if args.files() {
return run_files(args.clone());
}
if args.type_list() {
return run_types(args.clone());
}
if threads == 1 || isone {
return run_one_thread(args.clone());
if threads == 1 || args.is_one_path() {
run_files_one_thread(args)
} else {
run_files_parallel(args)
}
} else if args.type_list() {
run_types(args)
} else if threads == 1 || args.is_one_path() {
run_one_thread(args)
} else {
run_parallel(args)
}
}
fn run_parallel(args: Arc<Args>) -> Result<u64> {
let out = Arc::new(Mutex::new(args.out()));
let quiet_matched = QuietMatched::new(args.quiet());
let mut workers = vec![];
let paths_searched = Arc::new(AtomicUsize::new(0));
let match_count = Arc::new(AtomicUsize::new(0));
let workq = {
let (workq, stealer) = deque::new();
for _ in 0..threads {
let worker = MultiWorker {
chan_work: stealer.clone(),
quiet_matched: quiet_matched.clone(),
out: out.clone(),
outbuf: Some(args.outbuf()),
worker: Worker {
args: args.clone(),
inpbuf: args.input_buffer(),
grep: args.grep(),
match_count: 0,
},
args.walker_parallel().run(|| {
let args = args.clone();
let quiet_matched = quiet_matched.clone();
let paths_searched = paths_searched.clone();
let match_count = match_count.clone();
let out = out.clone();
let mut outbuf = args.outbuf();
let mut worker = args.worker();
Box::new(move |result| {
use ignore::WalkState::*;
if quiet_matched.has_match() {
return Quit;
}
let dent = match get_or_log_dir_entry(result) {
None => return Continue,
Some(dent) => dent,
};
workers.push(thread::spawn(move || worker.run()));
}
workq
};
let mut paths_searched: u64 = 0;
for dent in args.walker() {
if quiet_matched.has_match() {
break;
}
paths_searched += 1;
if dent.is_stdin() {
workq.push(Work::Stdin);
} else {
workq.push(Work::File(dent));
}
paths_searched.fetch_add(1, Ordering::SeqCst);
outbuf.clear();
{
// This block actually executes the search and prints the
// results into outbuf.
let mut printer = args.printer(&mut outbuf);
let count =
if dent.is_stdin() {
worker.run(&mut printer, Work::Stdin)
} else {
worker.run(&mut printer, Work::DirEntry(dent))
};
match_count.fetch_add(count as usize, Ordering::SeqCst);
if quiet_matched.set_match(count > 0) {
return Quit;
}
}
if !outbuf.get_ref().is_empty() {
// This should be the only mutex in all of ripgrep. Since the
// common case is to report a small number of matches relative
// to the corpus, this really shouldn't matter much.
//
// Still, it'd be nice to send this on a channel, but then we'd
// need to manage a pool of outbufs, which would complicate the
// code.
let mut out = out.lock().unwrap();
out.write(&outbuf);
}
Continue
})
});
if !args.paths().is_empty() && paths_searched.load(Ordering::SeqCst) == 0 {
eprint_nothing_searched();
}
if !paths.is_empty() && paths_searched == 0 {
eprintln!("No files were searched, which means ripgrep probably \
applied a filter you didn't expect. \
Try running again with --debug.");
}
for _ in 0..workers.len() {
workq.push(Work::Quit);
}
let mut match_count = 0;
for worker in workers {
match_count += worker.join().unwrap();
}
Ok(match_count)
Ok(match_count.load(Ordering::SeqCst) as u64)
}
fn run_one_thread(args: Arc<Args>) -> Result<u64> {
let mut worker = Worker {
args: args.clone(),
inpbuf: args.input_buffer(),
grep: args.grep(),
match_count: 0,
};
let mut worker = args.worker();
let mut term = args.stdout();
let mut paths_searched: u64 = 0;
for dent in args.walker() {
let mut match_count = 0;
for result in args.walker() {
let dent = match get_or_log_dir_entry(result) {
None => continue,
Some(dent) => dent,
};
let mut printer = args.printer(&mut term);
if worker.match_count > 0 {
if match_count > 0 {
if args.quiet() {
break;
}
@ -179,32 +182,53 @@ fn run_one_thread(args: Arc<Args>) -> Result<u64> {
}
}
paths_searched += 1;
if dent.is_stdin() {
worker.do_work(&mut printer, WorkReady::Stdin);
} else {
let file = match File::open(dent.path()) {
Ok(file) => file,
Err(err) => {
eprintln!("{}: {}", dent.path().display(), err);
continue;
}
match_count +=
if dent.is_stdin() {
worker.run(&mut printer, Work::Stdin)
} else {
worker.run(&mut printer, Work::DirEntry(dent))
};
worker.do_work(&mut printer, WorkReady::DirFile(dent, file));
}
}
if !args.paths().is_empty() && paths_searched == 0 {
eprintln!("No files were searched, which means ripgrep probably \
applied a filter you didn't expect. \
Try running again with --debug.");
eprint_nothing_searched();
}
Ok(worker.match_count)
Ok(match_count)
}
fn run_files(args: Arc<Args>) -> Result<u64> {
fn run_files_parallel(args: Arc<Args>) -> Result<u64> {
let print_args = args.clone();
let (tx, rx) = mpsc::channel::<ignore::DirEntry>();
let print_thread = thread::spawn(move || {
let term = print_args.stdout();
let mut printer = print_args.printer(term);
let mut file_count = 0;
for dent in rx.iter() {
printer.path(dent.path());
file_count += 1;
}
file_count
});
args.walker_parallel().run(move || {
let tx = tx.clone();
Box::new(move |result| {
if let Some(dent) = get_or_log_dir_entry(result) {
tx.send(dent).unwrap();
}
ignore::WalkState::Continue
})
});
Ok(print_thread.join().unwrap())
}
fn run_files_one_thread(args: Arc<Args>) -> Result<u64> {
let term = args.stdout();
let mut printer = args.printer(term);
let mut file_count = 0;
for dent in args.walker() {
for result in args.walker() {
let dent = match get_or_log_dir_entry(result) {
None => continue,
Some(dent) => dent,
};
printer.path(dent.path());
file_count += 1;
}
@ -222,163 +246,64 @@ fn run_types(args: Arc<Args>) -> Result<u64> {
Ok(ty_count)
}
enum Work {
Stdin,
File(DirEntry),
Quit,
}
enum WorkReady {
Stdin,
DirFile(DirEntry, File),
}
struct MultiWorker {
chan_work: Stealer<Work>,
quiet_matched: QuietMatched,
out: Arc<Mutex<Out>>,
#[cfg(not(windows))]
outbuf: Option<ColoredTerminal<term::TerminfoTerminal<Vec<u8>>>>,
#[cfg(windows)]
outbuf: Option<ColoredTerminal<WindowsBuffer>>,
worker: Worker,
}
struct Worker {
args: Arc<Args>,
inpbuf: InputBuffer,
grep: Grep,
match_count: u64,
}
impl MultiWorker {
fn run(mut self) -> u64 {
loop {
if self.quiet_matched.has_match() {
break;
}
let work = match self.chan_work.steal() {
Stolen::Empty | Stolen::Abort => continue,
Stolen::Data(Work::Quit) => break,
Stolen::Data(Work::Stdin) => WorkReady::Stdin,
Stolen::Data(Work::File(ent)) => {
match File::open(ent.path()) {
Ok(file) => WorkReady::DirFile(ent, file),
Err(err) => {
eprintln!("{}: {}", ent.path().display(), err);
continue;
}
}
}
};
let mut outbuf = self.outbuf.take().unwrap();
outbuf.clear();
let mut printer = self.worker.args.printer(outbuf);
self.worker.do_work(&mut printer, work);
if self.quiet_matched.set_match(self.worker.match_count > 0) {
break;
}
let outbuf = printer.into_inner();
if !outbuf.get_ref().is_empty() {
let mut out = self.out.lock().unwrap();
out.write(&outbuf);
}
self.outbuf = Some(outbuf);
fn get_or_log_dir_entry(
result: result::Result<ignore::DirEntry, ignore::Error>,
) -> Option<ignore::DirEntry> {
match result {
Err(err) => {
eprintln!("{}", err);
None
}
self.worker.match_count
}
}
impl Worker {
fn do_work<W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
work: WorkReady,
) {
let result = match work {
WorkReady::Stdin => {
let stdin = io::stdin();
let stdin = stdin.lock();
self.search(printer, &Path::new("<stdin>"), stdin)
}
WorkReady::DirFile(ent, file) => {
let mut path = ent.path();
if let Some(p) = strip_prefix("./", path) {
path = p;
}
if self.args.mmap() {
self.search_mmap(printer, path, &file)
} else {
self.search(printer, path, file)
}
}
};
match result {
Ok(count) => {
self.match_count += count;
}
Err(err) => {
Ok(dent) => {
if let Some(err) = dent.error() {
eprintln!("{}", err);
}
if !dent.file_type().map_or(true, |x| x.is_file()) {
None
} else {
Some(dent)
}
}
}
fn search<R: io::Read, W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
path: &Path,
rdr: R,
) -> Result<u64> {
self.args.searcher(
&mut self.inpbuf,
printer,
&self.grep,
path,
rdr,
).run().map_err(From::from)
}
fn search_mmap<W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
path: &Path,
file: &File,
) -> Result<u64> {
if try!(file.metadata()).len() == 0 {
// Opening a memory map with an empty file results in an error.
// However, this may not actually be an empty file! For example,
// /proc/cpuinfo reports itself as an empty file, but it can
// produce data when it's read from. Therefore, we fall back to
// regular read calls.
return self.search(printer, path, file);
}
let mmap = try!(Mmap::open(file, Protection::Read));
Ok(self.args.searcher_buffer(
printer,
&self.grep,
path,
unsafe { mmap.as_slice() },
).run())
}
}
fn eprint_nothing_searched() {
eprintln!("No files were searched, which means ripgrep probably \
applied a filter you didn't expect. \
Try running again with --debug.");
}
/// A simple thread safe abstraction for determining whether a search should
/// stop if the user has requested quiet mode.
#[derive(Clone, Debug)]
struct QuietMatched(Arc<Option<AtomicBool>>);
pub struct QuietMatched(Arc<Option<AtomicBool>>);
impl QuietMatched {
fn new(quiet: bool) -> QuietMatched {
/// Create a new QuietMatched value.
///
/// If quiet is true, then set_match and has_match will reflect whether
/// a search should quit or not because it found a match.
///
/// If quiet is false, then set_match is always a no-op and has_match
/// always returns false.
pub fn new(quiet: bool) -> QuietMatched {
let atomic = if quiet { Some(AtomicBool::new(false)) } else { None };
QuietMatched(Arc::new(atomic))
}
fn has_match(&self) -> bool {
/// Returns true if and only if quiet mode is enabled and a match has
/// occurred.
pub fn has_match(&self) -> bool {
match *self.0 {
None => false,
Some(ref matched) => matched.load(Ordering::SeqCst),
}
}
fn set_match(&self, yes: bool) -> bool {
/// Sets whether a match has occurred or not.
///
/// If quiet mode is disabled, then this is a no-op.
pub fn set_match(&self, yes: bool) -> bool {
match *self.0 {
None => false,
Some(_) if !yes => false,

View File

@ -158,6 +158,7 @@ impl<W: Terminal + Send> Printer<W> {
}
/// Flushes the underlying writer and returns it.
#[allow(dead_code)]
pub fn into_inner(mut self) -> W {
let _ = self.wtr.flush();
self.wtr

253
src/worker.rs Normal file
View File

@ -0,0 +1,253 @@
use std::fs::File;
use std::io;
use std::path::Path;
use grep::Grep;
use ignore::DirEntry;
use memmap::{Mmap, Protection};
use term::Terminal;
use pathutil::strip_prefix;
use printer::Printer;
use search_buffer::BufferSearcher;
use search_stream::{InputBuffer, Searcher};
use Result;
pub enum Work {
Stdin,
DirEntry(DirEntry),
}
pub struct WorkerBuilder {
grep: Grep,
opts: Options,
}
#[derive(Clone, Debug)]
struct Options {
mmap: bool,
after_context: usize,
before_context: usize,
count: bool,
files_with_matches: bool,
eol: u8,
invert_match: bool,
line_number: bool,
quiet: bool,
text: bool,
}
impl Default for Options {
fn default() -> Options {
Options {
mmap: false,
after_context: 0,
before_context: 0,
count: false,
files_with_matches: false,
eol: b'\n',
invert_match: false,
line_number: false,
quiet: false,
text: false,
}
}
}
impl WorkerBuilder {
/// Create a new builder for a worker.
///
/// A reusable input buffer and a grep matcher are required, but there
/// are numerous additional options that can be configured on this builder.
pub fn new(grep: Grep) -> WorkerBuilder {
WorkerBuilder {
grep: grep,
opts: Options::default(),
}
}
/// Create the worker from this builder.
pub fn build(self) -> Worker {
let mut inpbuf = InputBuffer::new();
inpbuf.eol(self.opts.eol);
Worker {
grep: self.grep,
inpbuf: inpbuf,
opts: self.opts,
}
}
/// The number of contextual lines to show after each match. The default
/// is zero.
pub fn after_context(mut self, count: usize) -> Self {
self.opts.after_context = count;
self
}
/// The number of contextual lines to show before each match. The default
/// is zero.
pub fn before_context(mut self, count: usize) -> Self {
self.opts.before_context = count;
self
}
/// If enabled, searching will print a count instead of each match.
///
/// Disabled by default.
pub fn count(mut self, yes: bool) -> Self {
self.opts.count = yes;
self
}
/// If enabled, searching will print the path instead of each match.
///
/// Disabled by default.
pub fn files_with_matches(mut self, yes: bool) -> Self {
self.opts.files_with_matches = yes;
self
}
/// Set the end-of-line byte used by this searcher.
pub fn eol(mut self, eol: u8) -> Self {
self.opts.eol = eol;
self
}
/// If enabled, matching is inverted so that lines that *don't* match the
/// given pattern are treated as matches.
pub fn invert_match(mut self, yes: bool) -> Self {
self.opts.invert_match = yes;
self
}
/// If enabled, compute line numbers and prefix each line of output with
/// them.
pub fn line_number(mut self, yes: bool) -> Self {
self.opts.line_number = yes;
self
}
/// If enabled, try to use memory maps for searching if possible.
pub fn mmap(mut self, yes: bool) -> Self {
self.opts.mmap = yes;
self
}
/// If enabled, don't show any output and quit searching after the first
/// match is found.
pub fn quiet(mut self, yes: bool) -> Self {
self.opts.quiet = yes;
self
}
/// If enabled, search binary files as if they were text.
pub fn text(mut self, yes: bool) -> Self {
self.opts.text = yes;
self
}
}
/// Worker is responsible for executing searches on file paths, while choosing
/// streaming search or memory map search as appropriate.
pub struct Worker {
inpbuf: InputBuffer,
grep: Grep,
opts: Options,
}
impl Worker {
/// Execute the worker with the given printer and work item.
///
/// A work item can either be stdin or a file path.
pub fn run<W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
work: Work,
) -> u64 {
let result = match work {
Work::Stdin => {
let stdin = io::stdin();
let stdin = stdin.lock();
self.search(printer, &Path::new("<stdin>"), stdin)
}
Work::DirEntry(dent) => {
let mut path = dent.path();
let file = match File::open(path) {
Ok(file) => file,
Err(err) => {
eprintln!("{}: {}", path.display(), err);
return 0;
}
};
if let Some(p) = strip_prefix("./", path) {
path = p;
}
if self.opts.mmap {
self.search_mmap(printer, path, &file)
} else {
self.search(printer, path, file)
}
}
};
match result {
Ok(count) => {
count
}
Err(err) => {
eprintln!("{}", err);
0
}
}
}
fn search<R: io::Read, W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
path: &Path,
rdr: R,
) -> Result<u64> {
let searcher = Searcher::new(
&mut self.inpbuf, printer, &self.grep, path, rdr);
searcher
.after_context(self.opts.after_context)
.before_context(self.opts.before_context)
.count(self.opts.count)
.files_with_matches(self.opts.files_with_matches)
.eol(self.opts.eol)
.line_number(self.opts.line_number)
.invert_match(self.opts.invert_match)
.quiet(self.opts.quiet)
.text(self.opts.text)
.run()
.map_err(From::from)
}
fn search_mmap<W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
path: &Path,
file: &File,
) -> Result<u64> {
if try!(file.metadata()).len() == 0 {
// Opening a memory map with an empty file results in an error.
// However, this may not actually be an empty file! For example,
// /proc/cpuinfo reports itself as an empty file, but it can
// produce data when it's read from. Therefore, we fall back to
// regular read calls.
return self.search(printer, path, file);
}
let mmap = try!(Mmap::open(file, Protection::Read));
let searcher = BufferSearcher::new(
printer, &self.grep, path, unsafe { mmap.as_slice() });
Ok(searcher
.count(self.opts.count)
.files_with_matches(self.opts.files_with_matches)
.eol(self.opts.eol)
.line_number(self.opts.line_number)
.invert_match(self.opts.invert_match)
.quiet(self.opts.quiet)
.text(self.opts.text)
.run())
}
}