mirror of
https://github.com/BurntSushi/ripgrep.git
synced 2025-04-14 00:58:43 +02:00
ignore: allow post-processing at end-of-thread
On top of the parallel-walk's closures, this provides a Visitor API. This clarifies the role of the two different closures in the `run` API and allows implementing of `Drop` for post-processing once traversal is finished. The closure API is maintained not just for compatibility but also convinience for simple cases. Fixes #469, Closes #1430
This commit is contained in:
parent
578e2d47a8
commit
f8e70294d5
@ -65,7 +65,10 @@ use std::fmt;
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
pub use walk::{DirEntry, Walk, WalkBuilder, WalkParallel, WalkState};
|
||||
pub use walk::{
|
||||
DirEntry, Walk, WalkBuilder, WalkParallel, WalkState,
|
||||
ParallelVisitorBuilder, ParallelVisitor,
|
||||
};
|
||||
|
||||
mod dir;
|
||||
pub mod gitignore;
|
||||
|
@ -1068,10 +1068,60 @@ impl WalkState {
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder for constructing a visitor when using
|
||||
/// [`WalkParallel::visit`](struct.WalkParallel.html#method.visit). The builder
|
||||
/// will be called for each thread started by `WalkParallel`. The visitor
|
||||
/// returned from each builder is then called for every directory entry.
|
||||
pub trait ParallelVisitorBuilder<'s> {
|
||||
/// Create per-thread `ParallelVisitor`s for `WalkParallel`.
|
||||
fn build(&mut self) -> Box<dyn ParallelVisitor + 's>;
|
||||
}
|
||||
|
||||
impl<'a, 's, P: ParallelVisitorBuilder<'s>>
|
||||
ParallelVisitorBuilder<'s> for &'a mut P
|
||||
{
|
||||
fn build(&mut self) -> Box<dyn ParallelVisitor + 's> {
|
||||
(**self).build()
|
||||
}
|
||||
}
|
||||
|
||||
/// Receives files and directories for the current thread.
|
||||
///
|
||||
/// Setup for the traversal can be implemented as part of
|
||||
/// [`ParallelVisitorBuilder::build`](trait.ParallelVisitorBuilder.html#tymethod.build).
|
||||
/// Teardown when traversal finishes can be implemented by implementing the
|
||||
/// `Drop` trait on your traversal type.
|
||||
pub trait ParallelVisitor: Send {
|
||||
/// Receives files and directories for the current thread. This is called
|
||||
/// once for every directory entry visited by traversal.
|
||||
fn visit(&mut self, entry: Result<DirEntry, Error>) -> WalkState;
|
||||
}
|
||||
|
||||
struct FnBuilder<F> {
|
||||
builder: F,
|
||||
}
|
||||
|
||||
impl<'s, F: FnMut() -> FnVisitor<'s>> ParallelVisitorBuilder<'s> for FnBuilder<F> {
|
||||
fn build(&mut self) -> Box<dyn ParallelVisitor + 's> {
|
||||
let visitor = (self.builder)();
|
||||
Box::new(FnVisitorImp { visitor })
|
||||
}
|
||||
}
|
||||
|
||||
type FnVisitor<'s> = Box<
|
||||
dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 's
|
||||
>;
|
||||
|
||||
struct FnVisitorImp<'s> {
|
||||
visitor: FnVisitor<'s>,
|
||||
}
|
||||
|
||||
impl<'s> ParallelVisitor for FnVisitorImp<'s> {
|
||||
fn visit(&mut self, entry: Result<DirEntry, Error>) -> WalkState {
|
||||
(self.visitor)(entry)
|
||||
}
|
||||
}
|
||||
|
||||
/// WalkParallel is a parallel recursive directory iterator over files paths
|
||||
/// in one or more directories.
|
||||
///
|
||||
@ -1095,10 +1145,31 @@ impl WalkParallel {
|
||||
/// Execute the parallel recursive directory iterator. `mkf` is called
|
||||
/// for each thread used for iteration. The function produced by `mkf`
|
||||
/// is then in turn called for each visited file path.
|
||||
pub fn run<'s, F>(mut self, mut mkf: F)
|
||||
pub fn run<'s, F>(self, mkf: F)
|
||||
where
|
||||
F: FnMut() -> FnVisitor<'s>,
|
||||
{
|
||||
self.visit(&mut FnBuilder { builder: mkf })
|
||||
}
|
||||
|
||||
/// Execute the parallel recursive directory iterator using a custom
|
||||
/// visitor.
|
||||
///
|
||||
/// The builder given is used to construct a visitor for every thread
|
||||
/// used by this traversal. The visitor returned from each builder is then
|
||||
/// called for every directory entry seen by that thread.
|
||||
///
|
||||
/// Typically, creating a custom visitor is useful if you need to perform
|
||||
/// some kind of cleanup once traversal is finished. This can be achieved
|
||||
/// by implementing `Drop` for your builder (or for your visitor, if you
|
||||
/// want to execute cleanup for every thread that is launched).
|
||||
///
|
||||
/// For example, each visitor might build up a data structure of results
|
||||
/// corresponding to the directory entries seen for each thread. Since each
|
||||
/// visitor runs on only one thread, this build-up can be done without
|
||||
/// synchronization. Then, once traversal is complete, all of the results
|
||||
/// can be merged together into a single data structure.
|
||||
pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder) {
|
||||
let threads = self.threads();
|
||||
// TODO: Figure out how to use a bounded channel here. With an
|
||||
// unbounded channel, the workers can run away and fill up memory
|
||||
@ -1110,7 +1181,7 @@ impl WalkParallel {
|
||||
// to do this.
|
||||
let (tx, rx) = channel::unbounded();
|
||||
{
|
||||
let mut f = mkf();
|
||||
let mut visitor = builder.build();
|
||||
let mut any_work = false;
|
||||
let mut paths = Vec::new().into_iter();
|
||||
std::mem::swap(&mut paths, &mut self.paths);
|
||||
@ -1128,7 +1199,7 @@ impl WalkParallel {
|
||||
Ok(root_device) => Some(root_device),
|
||||
Err(err) => {
|
||||
let err = Error::Io(err).with_path(path);
|
||||
if f(Err(err)).is_quit() {
|
||||
if visitor.visit(Err(err)).is_quit() {
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
@ -1140,7 +1211,7 @@ impl WalkParallel {
|
||||
(DirEntry::new_raw(dent, None), root_device)
|
||||
}
|
||||
Err(err) => {
|
||||
if f(Err(err)).is_quit() {
|
||||
if visitor.visit(Err(err)).is_quit() {
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
@ -1168,7 +1239,7 @@ impl WalkParallel {
|
||||
let mut handles = vec![];
|
||||
for _ in 0..threads {
|
||||
let worker = Worker {
|
||||
f: mkf(),
|
||||
visitor: builder.build(),
|
||||
tx: tx.clone(),
|
||||
rx: rx.clone(),
|
||||
quit_now: quit_now.clone(),
|
||||
@ -1282,7 +1353,7 @@ impl Work {
|
||||
/// Note that a worker is *both* a producer and a consumer.
|
||||
struct Worker<'s> {
|
||||
/// The caller's callback.
|
||||
f: FnVisitor<'s>,
|
||||
visitor: Box<dyn ParallelVisitor + 's>,
|
||||
/// The push side of our mpmc queue.
|
||||
tx: channel::Sender<Message>,
|
||||
/// The receive side of our mpmc queue.
|
||||
@ -1326,14 +1397,14 @@ impl<'s> Worker<'s> {
|
||||
// If the work is not a directory, then we can just execute the
|
||||
// caller's callback immediately and move on.
|
||||
if work.is_symlink() || !work.is_dir() {
|
||||
if (self.f)(Ok(work.dent)).is_quit() {
|
||||
if self.visitor.visit(Ok(work.dent)).is_quit() {
|
||||
self.quit_now();
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if let Some(err) = work.add_parents() {
|
||||
if (self.f)(Err(err)).is_quit() {
|
||||
if self.visitor.visit(Err(err)).is_quit() {
|
||||
self.quit_now();
|
||||
return;
|
||||
}
|
||||
@ -1341,7 +1412,7 @@ impl<'s> Worker<'s> {
|
||||
let readdir = match work.read_dir() {
|
||||
Ok(readdir) => readdir,
|
||||
Err(err) => {
|
||||
if (self.f)(Err(err)).is_quit() {
|
||||
if self.visitor.visit(Err(err)).is_quit() {
|
||||
self.quit_now();
|
||||
return;
|
||||
}
|
||||
@ -1353,7 +1424,7 @@ impl<'s> Worker<'s> {
|
||||
Ok(true) => true,
|
||||
Ok(false) => false,
|
||||
Err(err) => {
|
||||
if (self.f)(Err(err)).is_quit() {
|
||||
if self.visitor.visit(Err(err)).is_quit() {
|
||||
self.quit_now();
|
||||
return;
|
||||
}
|
||||
@ -1365,7 +1436,7 @@ impl<'s> Worker<'s> {
|
||||
};
|
||||
|
||||
let depth = work.dent.depth();
|
||||
match (self.f)(Ok(work.dent)) {
|
||||
match self.visitor.visit(Ok(work.dent)) {
|
||||
WalkState::Continue => {}
|
||||
WalkState::Skip => continue,
|
||||
WalkState::Quit => {
|
||||
@ -1411,13 +1482,13 @@ impl<'s> Worker<'s> {
|
||||
let fs_dent = match result {
|
||||
Ok(fs_dent) => fs_dent,
|
||||
Err(err) => {
|
||||
return (self.f)(Err(Error::from(err).with_depth(depth)));
|
||||
return self.visitor.visit(Err(Error::from(err).with_depth(depth)));
|
||||
}
|
||||
};
|
||||
let mut dent = match DirEntryRaw::from_entry(depth, &fs_dent) {
|
||||
Ok(dent) => DirEntry::new_raw(dent, None),
|
||||
Err(err) => {
|
||||
return (self.f)(Err(err));
|
||||
return self.visitor.visit(Err(err));
|
||||
}
|
||||
};
|
||||
let is_symlink = dent.file_type().map_or(false, |ft| ft.is_symlink());
|
||||
@ -1426,19 +1497,19 @@ impl<'s> Worker<'s> {
|
||||
dent = match DirEntryRaw::from_path(depth, path, true) {
|
||||
Ok(dent) => DirEntry::new_raw(dent, None),
|
||||
Err(err) => {
|
||||
return (self.f)(Err(err));
|
||||
return self.visitor.visit(Err(err));
|
||||
}
|
||||
};
|
||||
if dent.is_dir() {
|
||||
if let Err(err) = check_symlink_loop(ig, dent.path(), depth) {
|
||||
return (self.f)(Err(err));
|
||||
return self.visitor.visit(Err(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(ref stdout) = self.skip {
|
||||
let is_stdout = match path_equals(&dent, stdout) {
|
||||
Ok(is_stdout) => is_stdout,
|
||||
Err(err) => return (self.f)(Err(err)),
|
||||
Err(err) => return self.visitor.visit(Err(err)),
|
||||
};
|
||||
if is_stdout {
|
||||
return WalkState::Continue;
|
||||
|
Loading…
x
Reference in New Issue
Block a user