1
0
mirror of https://github.com/BurntSushi/ripgrep.git synced 2025-06-30 22:23:44 +02:00

Lots of improvements. Most notably, removal of memory maps for searching.

Memory maps appear to degrade quite a bit in the presence of multithreading.

Also, switch to lock free data structures for synchronization. Give each
worker an input and output buffer which require no synchronization.
This commit is contained in:
Andrew Gallant
2016-08-28 20:18:34 -04:00
parent 1c8379f55a
commit c809679cf2
5 changed files with 258 additions and 261 deletions

View File

@ -9,27 +9,32 @@ extern crate log;
extern crate memchr;
extern crate memmap;
extern crate num_cpus;
extern crate parking_lot;
extern crate regex;
extern crate regex_syntax as syntax;
extern crate rustc_serialize;
extern crate thread_local;
extern crate walkdir;
use std::cmp;
use std::error::Error;
use std::fs::File;
use std::io::{self, Write};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process;
use std::result;
use std::sync::Arc;
use std::thread;
use crossbeam::sync::{MsQueue, TreiberStack};
use crossbeam::sync::SegQueue;
use docopt::Docopt;
use grep::{Grep, GrepBuilder};
use parking_lot::Mutex;
use walkdir::WalkDir;
use ignore::Ignore;
use printer::Printer;
use search::Searcher;
use search::{InputBuffer, Searcher};
macro_rules! errored {
($($tt:tt)*) => {
@ -53,6 +58,7 @@ mod walk;
const USAGE: &'static str = "
Usage: xrep [options] <pattern> [<path> ...]
xrep --files [<path> ...]
xrep is like the silver searcher and grep, but faster than both.
@ -66,7 +72,7 @@ Options:
-L, --follow Follow symlinks.
--hidden Search hidden directories and files.
-i, --ignore-case Case insensitive search.
--threads ARG The number of threads to use. Defaults to the number
-t, --threads ARG The number of threads to use. Defaults to the number
of logical CPUs. [default: 0]
";
@ -83,18 +89,12 @@ struct Args {
flag_threads: usize,
}
impl Args {
fn printer<W: io::Write>(&self, wtr: W) -> Printer<W> {
Printer::new(wtr)
}
}
pub type Result<T> = result::Result<T, Box<Error + Send + Sync>>;
fn main() {
let args: Args = Docopt::new(USAGE).and_then(|d| d.decode())
.unwrap_or_else(|e| e.exit());
match real_main(args) {
match run(args) {
Ok(_) => process::exit(0),
Err(err) => {
let _ = writeln!(&mut io::stderr(), "{}", err);
@ -103,7 +103,7 @@ fn main() {
}
}
fn real_main(args: Args) -> Result<()> {
fn run(mut args: Args) -> Result<()> {
let mut logb = env_logger::LogBuilder::new();
if args.flag_debug {
logb.filter(None, log::LogLevelFilter::Debug);
@ -111,185 +111,137 @@ fn real_main(args: Args) -> Result<()> {
logb.filter(None, log::LogLevelFilter::Warn);
}
if let Err(err) = logb.init() {
return errored!("failed to initialize logger: {}", err);
errored!("failed to initialize logger: {}", err);
}
let mut main = Main::new(args);
try!(main.run_workers());
let writer = main.run_writer();
main.scan();
main.finish_workers();
main.chan_results.push(Message::Quit);
writer.join().unwrap();
if args.arg_path.is_empty() {
args.arg_path.push("./".to_string());
}
if args.arg_path.iter().any(|p| p == "-") {
errored!("searching <stdin> isn't yet supported");
}
if args.flag_files {
return run_files(args);
}
let args = Arc::new(args);
let mut workers = vec![];
let stdout = Arc::new(Mutex::new(io::BufWriter::new(io::stdout())));
let chan_work_send = {
let chan_work = Arc::new(SegQueue::new());
for _ in 0..args.num_workers() {
let grepb =
GrepBuilder::new(&args.arg_pattern)
.case_insensitive(args.flag_ignore_case);
let worker = Worker {
args: args.clone(),
stdout: stdout.clone(),
chan_work: chan_work.clone(),
inpbuf: InputBuffer::new(),
outbuf: Some(vec![]),
grep: try!(grepb.build()),
};
workers.push(thread::spawn(move || worker.run()));
}
chan_work
};
for p in &args.arg_path {
for path in args.walker(p) {
chan_work_send.push(Message::Some(path));
}
}
for _ in 0..workers.len() {
chan_work_send.push(Message::Quit);
}
for worker in workers {
worker.join().unwrap();
}
Ok(())
}
type ChanWork = Arc<MsQueue<Message<Work>>>;
fn run_files(args: Args) -> Result<()> {
let mut printer = Printer::new(io::BufWriter::new(io::stdout()));
for p in &args.arg_path {
for path in args.walker(p) {
printer.path(path);
}
}
Ok(())
}
type ChanResults = Arc<MsQueue<Message<Vec<u8>>>>;
impl Args {
fn printer<W: io::Write>(&self, wtr: W) -> Printer<W> {
Printer::new(wtr)
}
fn num_workers(&self) -> usize {
let mut num = self.flag_threads;
if num == 0 {
num = cmp::min(8, num_cpus::get());
}
num
}
fn walker<P: AsRef<Path>>(&self, path: P) -> walk::Iter {
let wd = WalkDir::new(path).follow_links(self.flag_follow);
let mut ig = Ignore::new();
ig.ignore_hidden(!self.flag_hidden);
walk::Iter::new(ig, wd)
}
}
enum Message<T> {
Some(T),
Quit,
}
struct Main {
args: Arc<Args>,
chan_work: ChanWork,
chan_results: ChanResults,
bufs: Arc<Bufs>,
workers: Vec<thread::JoinHandle<()>>,
}
impl Main {
fn new(mut args: Args) -> Main {
if args.arg_path.is_empty() {
args.arg_path.push("./".to_string());
}
Main {
args: Arc::new(args),
chan_work: Arc::new(MsQueue::new()),
chan_results: Arc::new(MsQueue::new()),
bufs: Arc::new(Bufs::new()),
workers: vec![],
}
}
fn scan(&mut self) {
for p in &self.args.arg_path {
if p == "-" {
eprintln!("searching <stdin> isn't yet supported");
continue;
}
let wd = WalkDir::new(p).follow_links(self.args.flag_follow);
let mut ig = Ignore::new();
ig.ignore_hidden(!self.args.flag_hidden);
for ent in walk::Iter::new(ig, wd) {
let mut path = ent.path();
if let Ok(p) = path.strip_prefix("./") {
path = p;
}
self.chan_work.push(Message::Some(Work {
path: path.to_path_buf(),
out: self.bufs.pop(),
}));
}
}
}
fn run_writer(&self) -> thread::JoinHandle<()> {
let wtr = Writer {
args: self.args.clone(),
chan_results: self.chan_results.clone(),
bufs: self.bufs.clone(),
};
thread::spawn(move || wtr.run())
}
fn run_workers(&mut self) -> Result<()> {
let mut num = self.args.flag_threads;
if num == 0 {
num = num_cpus::get();
}
if num < 4 {
num = 1;
} else {
num -= 2;
}
println!("running {} workers", num);
for _ in 0..num {
try!(self.run_worker());
}
Ok(())
}
fn run_worker(&mut self) -> Result<()> {
let grepb =
GrepBuilder::new(&self.args.arg_pattern)
.case_insensitive(self.args.flag_ignore_case);
let worker = Worker {
args: self.args.clone(),
chan_work: self.chan_work.clone(),
chan_results: self.chan_results.clone(),
grep: try!(grepb.build()),
};
self.workers.push(thread::spawn(move || worker.run()));
Ok(())
}
fn finish_workers(&mut self) {
// We can stop all of the works by sending a quit message.
// Each worker is guaranteed to receive the quit message exactly
// once, so we only need to send `self.workers.len()` of them
for _ in 0..self.workers.len() {
self.chan_work.push(Message::Quit);
}
// Now wait for each to finish.
while let Some(thread) = self.workers.pop() {
thread.join().unwrap();
}
}
}
struct Writer {
args: Arc<Args>,
chan_results: ChanResults,
bufs: Arc<Bufs>,
}
impl Writer {
fn run(self) {
let mut stdout = io::BufWriter::new(io::stdout());
while let Message::Some(res) = self.chan_results.pop() {
let _ = stdout.write_all(&res);
self.bufs.push(res);
}
}
}
struct Work {
path: PathBuf,
out: Vec<u8>,
}
struct Worker {
args: Arc<Args>,
chan_work: ChanWork,
chan_results: ChanResults,
stdout: Arc<Mutex<io::BufWriter<io::Stdout>>>,
chan_work: Arc<SegQueue<Message<PathBuf>>>,
inpbuf: InputBuffer,
outbuf: Option<Vec<u8>>,
grep: Grep,
}
impl Worker {
fn run(self) {
while let Message::Some(mut work) = self.chan_work.pop() {
work.out.clear();
let printer = self.args.printer(work.out);
let searcher = Searcher::new(&self.grep, work.path).unwrap();
let buf = searcher.search(printer);
self.chan_results.push(Message::Some(buf));
fn run(mut self) {
loop {
let path = match self.chan_work.try_pop() {
None => continue,
Some(Message::Quit) => break,
Some(Message::Some(path)) => path,
};
let file = match File::open(&path) {
Ok(file) => file,
Err(err) => {
eprintln!("{}: {}", path.display(), err);
continue;
}
};
let mut outbuf = self.outbuf.take().unwrap();
outbuf.clear();
let mut printer = self.args.printer(outbuf);
{
let searcher = Searcher {
grep: &self.grep,
path: &path,
haystack: file,
inp: &mut self.inpbuf,
printer: &mut printer,
};
if let Err(err) = searcher.run() {
eprintln!("{}", err);
}
}
let outbuf = printer.into_inner();
if !outbuf.is_empty() {
let mut stdout = self.stdout.lock();
let _ = stdout.write_all(&outbuf);
let _ = stdout.flush();
}
self.outbuf = Some(outbuf);
}
}
}
/// A pool of buffers used by each worker thread to write matches.
struct Bufs {
bufs: TreiberStack<Vec<u8>>,
}
impl Bufs {
pub fn new() -> Bufs {
Bufs { bufs: TreiberStack::new() }
}
pub fn pop(&self) -> Vec<u8> {
match self.bufs.pop() {
None => vec![],
Some(buf) => buf,
}
}
pub fn push(&self, buf: Vec<u8>) {
self.bufs.push(buf);
}
}