diff --git a/Cargo.lock b/Cargo.lock
index adaf3f76..3c6dacb9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -238,6 +238,7 @@ name = "ignore"
 version = "0.4.11"
 dependencies = [
  "crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "globset 0.4.4",
  "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/ignore/Cargo.toml b/ignore/Cargo.toml
index 20b58016..5f201fa0 100644
--- a/ignore/Cargo.toml
+++ b/ignore/Cargo.toml
@@ -19,6 +19,7 @@ bench = false
 
 [dependencies]
 crossbeam-channel = "0.4.0"
+crossbeam-utils = "0.7.0"
 globset = { version = "0.4.3", path = "../globset" }
 lazy_static = "1.1"
 log = "0.4.5"
diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs
index e00b29a5..658c2dbb 100644
--- a/ignore/src/walk.rs
+++ b/ignore/src/walk.rs
@@ -1068,6 +1068,10 @@ impl WalkState {
     }
 }
 
+type FnVisitor<'s> = Box<
+    dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 's
+>;
+
 /// WalkParallel is a parallel recursive directory iterator over files paths
 /// in one or more directories.
 ///
@@ -1091,11 +1095,10 @@ impl WalkParallel {
     /// Execute the parallel recursive directory iterator. `mkf` is called
     /// for each thread used for iteration. The function produced by `mkf`
     /// is then in turn called for each visited file path.
-    pub fn run<F>(self, mut mkf: F)
+    pub fn run<'s, F>(mut self, mut mkf: F)
     where
-        F: FnMut() -> Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
+        F: FnMut() -> FnVisitor<'s>,
     {
-        let mut f = mkf();
         let threads = self.threads();
         // TODO: Figure out how to use a bounded channel here. With an
         // unbounded channel, the workers can run away and fill up memory
@@ -1106,21 +1109,37 @@ impl WalkParallel {
         // this. The best case scenario would be finding a way to use rayon
         // to do this.
         let (tx, rx) = channel::unbounded();
-        let mut any_work = false;
-        // Send the initial set of root paths to the pool of workers.
-        // Note that we only send directories. For files, we send to them the
-        // callback directly.
-        for path in self.paths {
-            let (dent, root_device) = if path == Path::new("-") {
-                (DirEntry::new_stdin(), None)
-            } else {
-                let root_device = if !self.same_file_system {
-                    None
+        {
+            let mut f = mkf();
+            let mut any_work = false;
+            let mut paths = Vec::new().into_iter();
+            std::mem::swap(&mut paths, &mut self.paths);
+            // Send the initial set of root paths to the pool of workers. Note
+            // that we only send directories. For files, we send to them the
+            // callback directly.
+            for path in paths {
+                let (dent, root_device) = if path == Path::new("-") {
+                    (DirEntry::new_stdin(), None)
                 } else {
-                    match device_num(&path) {
-                        Ok(root_device) => Some(root_device),
+                    let root_device = if !self.same_file_system {
+                        None
+                    } else {
+                        match device_num(&path) {
+                            Ok(root_device) => Some(root_device),
+                            Err(err) => {
+                                let err = Error::Io(err).with_path(path);
+                                if f(Err(err)).is_quit() {
+                                    return;
+                                }
+                                continue;
+                            }
+                        }
+                    };
+                    match DirEntryRaw::from_path(0, path, false) {
+                        Ok(dent) => {
+                            (DirEntry::new_raw(dent, None), root_device)
+                        }
                         Err(err) => {
-                            let err = Error::Io(err).with_path(path);
                             if f(Err(err)).is_quit() {
                                 return;
                             }
@@ -1128,56 +1147,50 @@ impl WalkParallel {
                             continue;
                         }
                     }
                 };
-                match DirEntryRaw::from_path(0, path, false) {
-                    Ok(dent) => (DirEntry::new_raw(dent, None), root_device),
-                    Err(err) => {
-                        if f(Err(err)).is_quit() {
-                            return;
-                        }
-                        continue;
-                    }
-                }
-            };
-            tx.send(Message::Work(Work {
-                dent: dent,
-                ignore: self.ig_root.clone(),
-                root_device: root_device,
-            }))
-            .unwrap();
-            any_work = true;
-        }
-        // ... but there's no need to start workers if we don't need them.
-        if !any_work {
-            return;
+                tx.send(Message::Work(Work {
+                    dent: dent,
+                    ignore: self.ig_root.clone(),
+                    root_device: root_device,
+                }))
+                .unwrap();
+                any_work = true;
+            }
+            // ... but there's no need to start workers if we don't need them.
+            if !any_work {
+                return;
+            }
         }
         // Create the workers and then wait for them to finish.
         let num_waiting = Arc::new(AtomicUsize::new(0));
         let num_quitting = Arc::new(AtomicUsize::new(0));
         let quit_now = Arc::new(AtomicBool::new(false));
-        let mut handles = vec![];
-        for _ in 0..threads {
-            let worker = Worker {
-                f: mkf(),
-                tx: tx.clone(),
-                rx: rx.clone(),
-                quit_now: quit_now.clone(),
-                is_waiting: false,
-                is_quitting: false,
-                num_waiting: num_waiting.clone(),
-                num_quitting: num_quitting.clone(),
-                threads: threads,
-                max_depth: self.max_depth,
-                max_filesize: self.max_filesize,
-                follow_links: self.follow_links,
-                skip: self.skip.clone(),
-            };
-            handles.push(thread::spawn(|| worker.run()));
-        }
-        drop(tx);
-        drop(rx);
-        for handle in handles {
-            handle.join().unwrap();
-        }
+        crossbeam_utils::thread::scope(|s| {
+            let mut handles = vec![];
+            for _ in 0..threads {
+                let worker = Worker {
+                    f: mkf(),
+                    tx: tx.clone(),
+                    rx: rx.clone(),
+                    quit_now: quit_now.clone(),
+                    is_waiting: false,
+                    is_quitting: false,
+                    num_waiting: num_waiting.clone(),
+                    num_quitting: num_quitting.clone(),
+                    threads: threads,
+                    max_depth: self.max_depth,
+                    max_filesize: self.max_filesize,
+                    follow_links: self.follow_links,
+                    skip: self.skip.clone(),
+                };
+                handles.push(s.spawn(|_| worker.run()));
+            }
+            drop(tx);
+            drop(rx);
+            for handle in handles {
+                handle.join().unwrap();
+            }
+        })
+        .unwrap(); // Pass along panics from threads
     }
 
     fn threads(&self) -> usize {
@@ -1267,9 +1280,9 @@ impl Work {
 /// ignore matchers, producing new work and invoking the caller's callback.
 ///
 /// Note that a worker is *both* a producer and a consumer.
-struct Worker {
+struct Worker<'s> {
     /// The caller's callback.
-    f: Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
+    f: FnVisitor<'s>,
     /// The push side of our mpmc queue.
     tx: channel::Sender<Message>,
     /// The receive side of our mpmc queue.
@@ -1303,7 +1316,7 @@ struct Worker {
     skip: Option<Arc<Handle>>,
 }
 
-impl Worker {
+impl<'s> Worker<'s> {
     /// Runs this worker until there is no more work left to do.
    ///
    /// The worker will call the caller's callback for all entries that aren't
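
Not part of the patch itself: a minimal sketch of what the relaxed bound buys callers, assuming the `ignore` crate's public `WalkBuilder`/`WalkState` API. Because `run` now takes an `FnVisitor<'s>` and joins its workers inside a crossbeam scope, each per-thread visitor can borrow stack-local state (the `collected` vector below is a hypothetical example) instead of needing `'static` captures.

```rust
use std::sync::Mutex;

use ignore::{WalkBuilder, WalkState};

fn main() {
    // Stack-local, non-'static state; every worker's visitor borrows it.
    let collected = Mutex::new(Vec::new());
    WalkBuilder::new("./").build_parallel().run(|| {
        // One boxed visitor per worker thread, each borrowing `collected`.
        Box::new(|result| {
            if let Ok(entry) = result {
                collected.lock().unwrap().push(entry.into_path());
            }
            WalkState::Continue
        })
    });
    // The scoped worker threads have joined by the time `run` returns, so
    // the borrow ends here and the results can be read directly.
    println!("visited {} entries", collected.lock().unwrap().len());
}
```

Before this change, the `'static` bound forced such state behind an `Arc` or a channel; scoping the workers guarantees they finish before `run` returns, which is what makes the shorter `'s` lifetime sound.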