diff --git a/Cargo.toml b/Cargo.toml index 9a19a660..e93f412b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,9 +27,11 @@ log = "0.3" memchr = "0.1" memmap = "0.2" num_cpus = "1" +parking_lot = "0.3" regex = { version = "0.1", path = "/home/andrew/rust/regex" } regex-syntax = { version = "0.3.1", path = "/home/andrew/rust/regex/regex-syntax" } rustc-serialize = "0.3" +thread_local = "0.2" walkdir = "0.1" [features] @@ -41,3 +43,4 @@ lazy_static = "0.2" [profile.release] debug = true +panic = "abort" diff --git a/src/main.rs b/src/main.rs index 31da883c..067b0847 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,27 +9,32 @@ extern crate log; extern crate memchr; extern crate memmap; extern crate num_cpus; +extern crate parking_lot; extern crate regex; extern crate regex_syntax as syntax; extern crate rustc_serialize; +extern crate thread_local; extern crate walkdir; +use std::cmp; use std::error::Error; +use std::fs::File; use std::io::{self, Write}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::process; use std::result; use std::sync::Arc; use std::thread; -use crossbeam::sync::{MsQueue, TreiberStack}; +use crossbeam::sync::SegQueue; use docopt::Docopt; use grep::{Grep, GrepBuilder}; +use parking_lot::Mutex; use walkdir::WalkDir; use ignore::Ignore; use printer::Printer; -use search::Searcher; +use search::{InputBuffer, Searcher}; macro_rules! errored { ($($tt:tt)*) => { @@ -53,6 +58,7 @@ mod walk; const USAGE: &'static str = " Usage: xrep [options] [ ...] + xrep --files [ ...] xrep is like the silver searcher and grep, but faster than both. @@ -66,7 +72,7 @@ Options: -L, --follow Follow symlinks. --hidden Search hidden directories and files. -i, --ignore-case Case insensitive search. - --threads ARG The number of threads to use. Defaults to the number + -t, --threads ARG The number of threads to use. Defaults to the number of logical CPUs. [default: 0] "; @@ -83,18 +89,12 @@ struct Args { flag_threads: usize, } -impl Args { - fn printer(&self, wtr: W) -> Printer { - Printer::new(wtr) - } -} - pub type Result = result::Result>; fn main() { let args: Args = Docopt::new(USAGE).and_then(|d| d.decode()) .unwrap_or_else(|e| e.exit()); - match real_main(args) { + match run(args) { Ok(_) => process::exit(0), Err(err) => { let _ = writeln!(&mut io::stderr(), "{}", err); @@ -103,7 +103,7 @@ fn main() { } } -fn real_main(args: Args) -> Result<()> { +fn run(mut args: Args) -> Result<()> { let mut logb = env_logger::LogBuilder::new(); if args.flag_debug { logb.filter(None, log::LogLevelFilter::Debug); @@ -111,185 +111,137 @@ fn real_main(args: Args) -> Result<()> { logb.filter(None, log::LogLevelFilter::Warn); } if let Err(err) = logb.init() { - return errored!("failed to initialize logger: {}", err); + errored!("failed to initialize logger: {}", err); } - let mut main = Main::new(args); - try!(main.run_workers()); - let writer = main.run_writer(); - main.scan(); - main.finish_workers(); - main.chan_results.push(Message::Quit); - writer.join().unwrap(); + if args.arg_path.is_empty() { + args.arg_path.push("./".to_string()); + } + if args.arg_path.iter().any(|p| p == "-") { + errored!("searching isn't yet supported"); + } + if args.flag_files { + return run_files(args); + } + let args = Arc::new(args); + let mut workers = vec![]; + let stdout = Arc::new(Mutex::new(io::BufWriter::new(io::stdout()))); + + let chan_work_send = { + let chan_work = Arc::new(SegQueue::new()); + for _ in 0..args.num_workers() { + let grepb = + GrepBuilder::new(&args.arg_pattern) + .case_insensitive(args.flag_ignore_case); + let worker = Worker { + args: args.clone(), + stdout: stdout.clone(), + chan_work: chan_work.clone(), + inpbuf: InputBuffer::new(), + outbuf: Some(vec![]), + grep: try!(grepb.build()), + }; + workers.push(thread::spawn(move || worker.run())); + } + chan_work + }; + + for p in &args.arg_path { + for path in args.walker(p) { + chan_work_send.push(Message::Some(path)); + } + } + for _ in 0..workers.len() { + chan_work_send.push(Message::Quit); + } + for worker in workers { + worker.join().unwrap(); + } Ok(()) } -type ChanWork = Arc>>; +fn run_files(args: Args) -> Result<()> { + let mut printer = Printer::new(io::BufWriter::new(io::stdout())); + for p in &args.arg_path { + for path in args.walker(p) { + printer.path(path); + } + } + Ok(()) +} -type ChanResults = Arc>>>; +impl Args { + fn printer(&self, wtr: W) -> Printer { + Printer::new(wtr) + } + + fn num_workers(&self) -> usize { + let mut num = self.flag_threads; + if num == 0 { + num = cmp::min(8, num_cpus::get()); + } + num + } + + fn walker>(&self, path: P) -> walk::Iter { + let wd = WalkDir::new(path).follow_links(self.flag_follow); + let mut ig = Ignore::new(); + ig.ignore_hidden(!self.flag_hidden); + walk::Iter::new(ig, wd) + } +} enum Message { Some(T), Quit, } -struct Main { - args: Arc, - chan_work: ChanWork, - chan_results: ChanResults, - bufs: Arc, - workers: Vec>, -} - -impl Main { - fn new(mut args: Args) -> Main { - if args.arg_path.is_empty() { - args.arg_path.push("./".to_string()); - } - Main { - args: Arc::new(args), - chan_work: Arc::new(MsQueue::new()), - chan_results: Arc::new(MsQueue::new()), - bufs: Arc::new(Bufs::new()), - workers: vec![], - } - } - - fn scan(&mut self) { - for p in &self.args.arg_path { - if p == "-" { - eprintln!("searching isn't yet supported"); - continue; - } - let wd = WalkDir::new(p).follow_links(self.args.flag_follow); - let mut ig = Ignore::new(); - ig.ignore_hidden(!self.args.flag_hidden); - - for ent in walk::Iter::new(ig, wd) { - let mut path = ent.path(); - if let Ok(p) = path.strip_prefix("./") { - path = p; - } - self.chan_work.push(Message::Some(Work { - path: path.to_path_buf(), - out: self.bufs.pop(), - })); - } - } - } - - fn run_writer(&self) -> thread::JoinHandle<()> { - let wtr = Writer { - args: self.args.clone(), - chan_results: self.chan_results.clone(), - bufs: self.bufs.clone(), - }; - thread::spawn(move || wtr.run()) - } - - fn run_workers(&mut self) -> Result<()> { - let mut num = self.args.flag_threads; - if num == 0 { - num = num_cpus::get(); - } - if num < 4 { - num = 1; - } else { - num -= 2; - } - println!("running {} workers", num); - for _ in 0..num { - try!(self.run_worker()); - } - Ok(()) - } - - fn run_worker(&mut self) -> Result<()> { - let grepb = - GrepBuilder::new(&self.args.arg_pattern) - .case_insensitive(self.args.flag_ignore_case); - let worker = Worker { - args: self.args.clone(), - chan_work: self.chan_work.clone(), - chan_results: self.chan_results.clone(), - grep: try!(grepb.build()), - }; - self.workers.push(thread::spawn(move || worker.run())); - Ok(()) - } - - fn finish_workers(&mut self) { - // We can stop all of the works by sending a quit message. - // Each worker is guaranteed to receive the quit message exactly - // once, so we only need to send `self.workers.len()` of them - for _ in 0..self.workers.len() { - self.chan_work.push(Message::Quit); - } - // Now wait for each to finish. - while let Some(thread) = self.workers.pop() { - thread.join().unwrap(); - } - } -} - -struct Writer { - args: Arc, - chan_results: ChanResults, - bufs: Arc, -} - -impl Writer { - fn run(self) { - let mut stdout = io::BufWriter::new(io::stdout()); - while let Message::Some(res) = self.chan_results.pop() { - let _ = stdout.write_all(&res); - self.bufs.push(res); - } - } -} - -struct Work { - path: PathBuf, - out: Vec, -} - struct Worker { args: Arc, - chan_work: ChanWork, - chan_results: ChanResults, + stdout: Arc>>, + chan_work: Arc>>, + inpbuf: InputBuffer, + outbuf: Option>, grep: Grep, } impl Worker { - fn run(self) { - while let Message::Some(mut work) = self.chan_work.pop() { - work.out.clear(); - let printer = self.args.printer(work.out); - let searcher = Searcher::new(&self.grep, work.path).unwrap(); - let buf = searcher.search(printer); - self.chan_results.push(Message::Some(buf)); + fn run(mut self) { + loop { + let path = match self.chan_work.try_pop() { + None => continue, + Some(Message::Quit) => break, + Some(Message::Some(path)) => path, + }; + let file = match File::open(&path) { + Ok(file) => file, + Err(err) => { + eprintln!("{}: {}", path.display(), err); + continue; + } + }; + let mut outbuf = self.outbuf.take().unwrap(); + outbuf.clear(); + let mut printer = self.args.printer(outbuf); + { + let searcher = Searcher { + grep: &self.grep, + path: &path, + haystack: file, + inp: &mut self.inpbuf, + printer: &mut printer, + }; + if let Err(err) = searcher.run() { + eprintln!("{}", err); + } + } + let outbuf = printer.into_inner(); + if !outbuf.is_empty() { + let mut stdout = self.stdout.lock(); + let _ = stdout.write_all(&outbuf); + let _ = stdout.flush(); + } + self.outbuf = Some(outbuf); } } } - -/// A pool of buffers used by each worker thread to write matches. -struct Bufs { - bufs: TreiberStack>, -} - -impl Bufs { - pub fn new() -> Bufs { - Bufs { bufs: TreiberStack::new() } - } - - pub fn pop(&self) -> Vec { - match self.bufs.pop() { - None => vec![], - Some(buf) => buf, - } - } - - pub fn push(&self, buf: Vec) { - self.bufs.push(buf); - } -} diff --git a/src/printer.rs b/src/printer.rs index 229fe151..e0885047 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -28,6 +28,10 @@ impl Printer { wln!(&mut self.wtr, "{}", path.as_ref().display()); } + pub fn path_count>(&mut self, path: P, count: u64) { + wln!(&mut self.wtr, "{}:{}", path.as_ref().display(), count); + } + pub fn count(&mut self, count: u64) { wln!(&mut self.wtr, "{}", count); } diff --git a/src/search.rs b/src/search.rs index f0e297ab..7d1fc523 100644 --- a/src/search.rs +++ b/src/search.rs @@ -6,16 +6,16 @@ matches. use std::cmp; use std::error::Error as StdError; use std::fmt; -use std::fs::File; use std::io; use std::path::{Path, PathBuf}; -use grep::Grep; -use memchr::memchr; -use memmap::{Mmap, Protection}; +use grep::{Grep, Match}; +use memchr::{memchr, memrchr}; use printer::Printer; +const READ_SIZE: usize = 8 * (1<<10); + /// Error describes errors that can occur while searching. #[derive(Debug)] pub enum Error { @@ -56,89 +56,121 @@ impl fmt::Display for Error { } } -/// Searcher searches a memory mapped buffer. -/// -/// The `'g` lifetime refers to the lifetime of the underlying matcher. -pub struct Searcher<'g> { - grep: &'g Grep, - path: PathBuf, - mmap: Option, +pub struct Searcher<'a, R, W: 'a> { + pub grep: &'a Grep, + pub path: &'a Path, + pub haystack: R, + pub inp: &'a mut InputBuffer, + pub printer: &'a mut Printer, } -impl<'g> Searcher<'g> { - /// Create a new memory map based searcher using the given matcher for the - /// file path given. - pub fn new>( - grep: &'g Grep, - path: P, - ) -> Result, Error> { - let file = try!(File::open(&path).map_err(|err| { - Error::from_io(err, &path) - })); - let md = try!(file.metadata().map_err(|err| { - Error::from_io(err, &path) - })); - let mmap = - if md.len() == 0 { - None - } else { - Some(try!(Mmap::open(&file, Protection::Read).map_err(|err| { - Error::from_io(err, &path) - }))) - }; - Ok(Searcher { - grep: grep, - path: path.as_ref().to_path_buf(), - mmap: mmap, - }) - } - - /// Execute the search, writing the results to the printer given and - /// returning the underlying buffer. - pub fn search(&self, printer: Printer) -> W { - Search { - grep: &self.grep, - path: &*self.path, - buf: self.buf(), - printer: printer, - }.run() - } - - /// Execute the search, returning a count of the number of hits. - pub fn count(&self) -> u64 { - self.grep.iter(self.buf()).count() as u64 - } - - fn buf(&self) -> &[u8] { - self.mmap.as_ref().map(|m| unsafe { m.as_slice() }).unwrap_or(&[]) - } -} - -struct Search<'a, W> { - grep: &'a Grep, - path: &'a Path, - buf: &'a [u8], - printer: Printer, -} - -impl<'a, W: io::Write> Search<'a, W> { - fn run(mut self) -> W { - let is_binary = self.is_binary(); - let mut it = self.grep.iter(self.buf).peekable(); - if is_binary && it.peek().is_some() { - self.printer.binary_matched(self.path); - return self.printer.into_inner(); +impl<'a, R: io::Read, W: io::Write> Searcher<'a, R, W> { + #[inline(never)] + pub fn run(mut self) -> Result<(), Error> { + self.inp.reset(); + let mut mat = Match::default(); + loop { + let ok = try!(self.inp.fill(&mut self.haystack).map_err(|err| { + Error::from_io(err, &self.path) + })); + if !ok { + return Ok(()); + } + loop { + let ok = self.grep.read_match( + &mut mat, + &mut self.inp.buf[..self.inp.lastnl], + self.inp.pos); + if !ok { + break; + } + self.inp.pos = mat.end() + 1; + self.printer.matched(self.path, &self.inp.buf, &mat); + } } - for m in it { - self.printer.matched(self.path, self.buf, &m); - } - self.printer.into_inner() - } - - fn is_binary(&self) -> bool { - if self.buf.len() >= 4 && &self.buf[0..4] == b"%PDF" { - return true; - } - memchr(b'\x00', &self.buf[0..cmp::min(1024, self.buf.len())]).is_some() } } + +pub struct InputBuffer { + buf: Vec, + tmp: Vec, + pos: usize, + lastnl: usize, + end: usize, + first: bool, + is_binary: bool, +} + +impl InputBuffer { + pub fn new() -> InputBuffer { + InputBuffer { + buf: vec![0; READ_SIZE], + tmp: vec![], + pos: 0, + lastnl: 0, + end: 0, + first: true, + is_binary: false, + } + } + + fn reset(&mut self) { + self.pos = 0; + self.lastnl = 0; + self.end = 0; + self.first = true; + self.is_binary = false; + } + + fn fill(&mut self, rdr: &mut R) -> Result { + if self.lastnl < self.end { + self.tmp.clear(); + self.tmp.extend_from_slice(&self.buf[self.lastnl..self.end]); + self.buf[0..self.tmp.len()].copy_from_slice(&self.tmp); + self.end = self.tmp.len(); + } else { + self.end = 0; + } + self.pos = 0; + self.lastnl = 0; + while self.lastnl == 0 { + if self.buf.len() - self.end < READ_SIZE { + let min_len = READ_SIZE + self.buf.len() - self.end; + let new_len = cmp::max(min_len, self.buf.len() * 2); + self.buf.resize(new_len, 0); + } + let n = try!(rdr.read( + &mut self.buf[self.end..self.end + READ_SIZE])); + if self.first { + if is_binary(&self.buf[self.end..self.end + n]) { + return Ok(false); + } + } + self.first = false; + if n == 0 { + if self.end == 0 { + return Ok(false); + } + self.lastnl = self.end; + break; + } + // We know there is no nl between self.start..self.end since: + // 1) If this is the first iteration, then any bytes preceding + // self.end do not contain nl by construction. + // 2) Subsequent iterations only occur if no nl could be found. + self.lastnl = + memrchr(b'\n', &self.buf[self.end..self.end + n]) + .map(|i| self.end + i) + .unwrap_or(0); + self.end += n; + } + Ok(true) + } +} + +fn is_binary(buf: &[u8]) -> bool { + if buf.len() >= 4 && &buf[0..4] == b"%PDF" { + return true; + } + memchr(b'\x00', &buf[0..cmp::min(1024, buf.len())]).is_some() +} diff --git a/src/walk.rs b/src/walk.rs index e60e2605..524e6f0b 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -4,6 +4,8 @@ crate that can efficiently skip and ignore files and directories specified in a user's ignore patterns. */ +use std::path::PathBuf; + use walkdir::{self, DirEntry, WalkDir, WalkDirIterator}; use ignore::Ignore; @@ -39,9 +41,9 @@ impl Iter { } impl Iterator for Iter { - type Item = DirEntry; + type Item = PathBuf; - fn next(&mut self) -> Option { + fn next(&mut self) -> Option { while let Some(ev) = self.it.next() { match ev { Err(err) => { @@ -74,7 +76,11 @@ impl Iterator for Iter { if !ent.file_type().is_file() { continue; } - return Some(ent); + let mut path = ent.path(); + if let Ok(p) = path.strip_prefix("./") { + path = p; + } + return Some(path.to_path_buf()); } } }