1
0
mirror of https://github.com/BurntSushi/ripgrep.git synced 2025-03-03 14:32:22 +02:00

ignore: allow parallel walker to borrow data

This makes it so the caller can more easily refactor from
single-threaded to multi-threaded walking. If they want to support both,
this makes it easier to do so with a single initialization code-path. In
particular, it side-steps the need to put everything into an `Arc`.

This is not a breaking change because it strictly increases the number
of allowed inputs to `WalkParallel::run`.

Closes #1410, Closes #1432
This commit is contained in:
Ed Page 2019-10-26 19:02:10 -06:00 committed by Andrew Gallant
parent 5c1eac41a3
commit 9f7c2ebc09
3 changed files with 79 additions and 64 deletions

1
Cargo.lock generated
View File

@ -238,6 +238,7 @@ name = "ignore"
version = "0.4.11" version = "0.4.11"
dependencies = [ dependencies = [
"crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"globset 0.4.4", "globset 0.4.4",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -19,6 +19,7 @@ bench = false
[dependencies] [dependencies]
crossbeam-channel = "0.4.0" crossbeam-channel = "0.4.0"
crossbeam-utils = "0.7.0"
globset = { version = "0.4.3", path = "../globset" } globset = { version = "0.4.3", path = "../globset" }
lazy_static = "1.1" lazy_static = "1.1"
log = "0.4.5" log = "0.4.5"

View File

@ -1068,6 +1068,10 @@ impl WalkState {
} }
} }
type FnVisitor<'s> = Box<
dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 's
>;
/// WalkParallel is a parallel recursive directory iterator over file paths /// in one or more directories.
/// in one or more directories. /// in one or more directories.
/// ///
@ -1091,11 +1095,10 @@ impl WalkParallel {
/// Execute the parallel recursive directory iterator. `mkf` is called /// Execute the parallel recursive directory iterator. `mkf` is called
/// for each thread used for iteration. The function produced by `mkf` /// for each thread used for iteration. The function produced by `mkf`
/// is then in turn called for each visited file path. /// is then in turn called for each visited file path.
pub fn run<F>(self, mut mkf: F) pub fn run<'s, F>(mut self, mut mkf: F)
where where
F: FnMut() -> Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>, F: FnMut() -> FnVisitor<'s>,
{ {
let mut f = mkf();
let threads = self.threads(); let threads = self.threads();
// TODO: Figure out how to use a bounded channel here. With an // TODO: Figure out how to use a bounded channel here. With an
// unbounded channel, the workers can run away and fill up memory // unbounded channel, the workers can run away and fill up memory
@ -1106,21 +1109,37 @@ impl WalkParallel {
// this. The best case scenario would be finding a way to use rayon // this. The best case scenario would be finding a way to use rayon
// to do this. // to do this.
let (tx, rx) = channel::unbounded(); let (tx, rx) = channel::unbounded();
let mut any_work = false; {
// Send the initial set of root paths to the pool of workers. let mut f = mkf();
// Note that we only send directories. For files, we send to them the let mut any_work = false;
// callback directly. let mut paths = Vec::new().into_iter();
for path in self.paths { std::mem::swap(&mut paths, &mut self.paths);
let (dent, root_device) = if path == Path::new("-") { // Send the initial set of root paths to the pool of workers. Note
(DirEntry::new_stdin(), None) // that we only send directories. For files, we send to them the
} else { // callback directly.
let root_device = if !self.same_file_system { for path in paths {
None let (dent, root_device) = if path == Path::new("-") {
(DirEntry::new_stdin(), None)
} else { } else {
match device_num(&path) { let root_device = if !self.same_file_system {
Ok(root_device) => Some(root_device), None
} else {
match device_num(&path) {
Ok(root_device) => Some(root_device),
Err(err) => {
let err = Error::Io(err).with_path(path);
if f(Err(err)).is_quit() {
return;
}
continue;
}
}
};
match DirEntryRaw::from_path(0, path, false) {
Ok(dent) => {
(DirEntry::new_raw(dent, None), root_device)
}
Err(err) => { Err(err) => {
let err = Error::Io(err).with_path(path);
if f(Err(err)).is_quit() { if f(Err(err)).is_quit() {
return; return;
} }
@ -1128,56 +1147,50 @@ impl WalkParallel {
} }
} }
}; };
match DirEntryRaw::from_path(0, path, false) { tx.send(Message::Work(Work {
Ok(dent) => (DirEntry::new_raw(dent, None), root_device), dent: dent,
Err(err) => { ignore: self.ig_root.clone(),
if f(Err(err)).is_quit() { root_device: root_device,
return; }))
} .unwrap();
continue; any_work = true;
} }
} // ... but there's no need to start workers if we don't need them.
}; if !any_work {
tx.send(Message::Work(Work { return;
dent: dent, }
ignore: self.ig_root.clone(),
root_device: root_device,
}))
.unwrap();
any_work = true;
}
// ... but there's no need to start workers if we don't need them.
if !any_work {
return;
} }
// Create the workers and then wait for them to finish. // Create the workers and then wait for them to finish.
let num_waiting = Arc::new(AtomicUsize::new(0)); let num_waiting = Arc::new(AtomicUsize::new(0));
let num_quitting = Arc::new(AtomicUsize::new(0)); let num_quitting = Arc::new(AtomicUsize::new(0));
let quit_now = Arc::new(AtomicBool::new(false)); let quit_now = Arc::new(AtomicBool::new(false));
let mut handles = vec![]; crossbeam_utils::thread::scope(|s| {
for _ in 0..threads { let mut handles = vec![];
let worker = Worker { for _ in 0..threads {
f: mkf(), let worker = Worker {
tx: tx.clone(), f: mkf(),
rx: rx.clone(), tx: tx.clone(),
quit_now: quit_now.clone(), rx: rx.clone(),
is_waiting: false, quit_now: quit_now.clone(),
is_quitting: false, is_waiting: false,
num_waiting: num_waiting.clone(), is_quitting: false,
num_quitting: num_quitting.clone(), num_waiting: num_waiting.clone(),
threads: threads, num_quitting: num_quitting.clone(),
max_depth: self.max_depth, threads: threads,
max_filesize: self.max_filesize, max_depth: self.max_depth,
follow_links: self.follow_links, max_filesize: self.max_filesize,
skip: self.skip.clone(), follow_links: self.follow_links,
}; skip: self.skip.clone(),
handles.push(thread::spawn(|| worker.run())); };
} handles.push(s.spawn(|_| worker.run()));
drop(tx); }
drop(rx); drop(tx);
for handle in handles { drop(rx);
handle.join().unwrap(); for handle in handles {
} handle.join().unwrap();
}
})
.unwrap(); // Pass along panics from threads
} }
fn threads(&self) -> usize { fn threads(&self) -> usize {
@ -1267,9 +1280,9 @@ impl Work {
/// ignore matchers, producing new work and invoking the caller's callback. /// ignore matchers, producing new work and invoking the caller's callback.
/// ///
/// Note that a worker is *both* a producer and a consumer. /// Note that a worker is *both* a producer and a consumer.
struct Worker { struct Worker<'s> {
/// The caller's callback. /// The caller's callback.
f: Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>, f: FnVisitor<'s>,
/// The push side of our mpmc queue. /// The push side of our mpmc queue.
tx: channel::Sender<Message>, tx: channel::Sender<Message>,
/// The receive side of our mpmc queue. /// The receive side of our mpmc queue.
@ -1303,7 +1316,7 @@ struct Worker {
skip: Option<Arc<Handle>>, skip: Option<Arc<Handle>>,
} }
impl Worker { impl<'s> Worker<'s> {
/// Runs this worker until there is no more work left to do. /// Runs this worker until there is no more work left to do.
/// ///
/// The worker will call the caller's callback for all entries that aren't /// The worker will call the caller's callback for all entries that aren't