diff --git a/crates/ignore/src/walk.rs b/crates/ignore/src/walk.rs index b2063cde..ecf87b8d 100644 --- a/crates/ignore/src/walk.rs +++ b/crates/ignore/src/walk.rs @@ -1073,6 +1073,10 @@ pub enum WalkState { } impl WalkState { + fn is_continue(&self) -> bool { + *self == WalkState::Continue + } + fn is_quit(&self) -> bool { *self == WalkState::Quit } @@ -1191,6 +1195,7 @@ impl WalkParallel { // this. The best case scenario would be finding a way to use rayon // to do this. let (tx, rx) = channel::unbounded(); + let num_pending = Arc::new(AtomicUsize::new(0)); { let mut visitor = builder.build(); let mut any_work = false; @@ -1229,6 +1234,7 @@ impl WalkParallel { } } }; + num_pending.fetch_add(1, Ordering::SeqCst); tx.send(Message::Work(Work { dent: dent, ignore: self.ig_root.clone(), @@ -1243,7 +1249,6 @@ impl WalkParallel { } } // Create the workers and then wait for them to finish. - let num_running = Arc::new(AtomicUsize::new(threads)); let quit_now = Arc::new(AtomicBool::new(false)); crossbeam_utils::thread::scope(|s| { let mut handles = vec![]; @@ -1253,7 +1258,7 @@ impl WalkParallel { tx: tx.clone(), rx: rx.clone(), quit_now: quit_now.clone(), - num_running: num_running.clone(), + num_pending: num_pending.clone(), max_depth: self.max_depth, max_filesize: self.max_filesize, follow_links: self.follow_links, @@ -1368,8 +1373,8 @@ struct Worker<'s> { /// that we need this because we don't want other `Work` to be done after /// we quit. We wouldn't need this if have a priority channel. quit_now: Arc, - /// The number of workers waiting for more work. - num_running: Arc, + /// The number of outstanding work items. + num_pending: Arc, /// The maximum depth of directories to descend. A value of `0` means no /// descension at all. max_depth: Option, @@ -1390,91 +1395,89 @@ impl<'s> Worker<'s> { /// The worker will call the caller's callback for all entries that aren't /// skipped by the ignore matcher. fn run(mut self) { - 'get_work: while let Some(mut work) = self.get_work() { - // If the work is not a directory, then we can just execute the - // caller's callback immediately and move on. - if work.is_symlink() || !work.is_dir() { - if self.visitor.visit(Ok(work.dent)).is_quit() { - self.quit_now(); - } - continue; - } - if let Some(err) = work.add_parents() { - if self.visitor.visit(Err(err)).is_quit() { - self.quit_now(); - continue; - } - } - - let descend = if let Some(root_device) = work.root_device { - match is_same_file_system(root_device, work.dent.path()) { - Ok(true) => true, - Ok(false) => false, - Err(err) => { - if self.visitor.visit(Err(err)).is_quit() { - self.quit_now(); - continue; - } - false - } - } - } else { - true - }; - - // Try to read the directory first before we transfer ownership - // to the provided closure. Do not unwrap it immediately, though, - // as we may receive an `Err` value e.g. in the case when we do not - // have sufficient read permissions to list the directory. - // In that case we still want to provide the closure with a valid - // entry before passing the error value. - let readdir = work.read_dir(); - let depth = work.dent.depth(); - match self.visitor.visit(Ok(work.dent)) { - WalkState::Continue => {} - WalkState::Skip => continue, - WalkState::Quit => { - self.quit_now(); - continue; - } - } - if !descend { - continue; - } - - let readdir = match readdir { - Ok(readdir) => readdir, - Err(err) => { - if self.visitor.visit(Err(err)).is_quit() { - self.quit_now(); - } - continue; - } - }; - - if self.max_depth.map_or(false, |max| depth >= max) { - continue; - } - for result in readdir { - let state = self.run_one( - &work.ignore, - depth + 1, - work.root_device, - result, - ); - if state.is_quit() { - self.quit_now(); - continue 'get_work; - } + while let Some(work) = self.get_work() { + if let WalkState::Quit = self.run_one(work) { + self.quit_now(); } + self.work_done(); } } - /// Runs the worker on a single entry from a directory iterator. + fn run_one(&mut self, mut work: Work) -> WalkState { + // If the work is not a directory, then we can just execute the + // caller's callback immediately and move on. + if work.is_symlink() || !work.is_dir() { + return self.visitor.visit(Ok(work.dent)); + } + if let Some(err) = work.add_parents() { + let state = self.visitor.visit(Err(err)); + if state.is_quit() { + return state; + } + } + + let descend = if let Some(root_device) = work.root_device { + match is_same_file_system(root_device, work.dent.path()) { + Ok(true) => true, + Ok(false) => false, + Err(err) => { + let state = self.visitor.visit(Err(err)); + if state.is_quit() { + return state; + } + false + } + } + } else { + true + }; + + // Try to read the directory first before we transfer ownership + // to the provided closure. Do not unwrap it immediately, though, + // as we may receive an `Err` value e.g. in the case when we do not + // have sufficient read permissions to list the directory. + // In that case we still want to provide the closure with a valid + // entry before passing the error value. + let readdir = work.read_dir(); + let depth = work.dent.depth(); + let state = self.visitor.visit(Ok(work.dent)); + if !state.is_continue() { + return state; + } + if !descend { + return WalkState::Skip; + } + + let readdir = match readdir { + Ok(readdir) => readdir, + Err(err) => { + return self.visitor.visit(Err(err)); + } + }; + + if self.max_depth.map_or(false, |max| depth >= max) { + return WalkState::Skip; + } + for result in readdir { + let state = self.generate_work( + &work.ignore, + depth + 1, + work.root_device, + result, + ); + if state.is_quit() { + return state; + } + } + WalkState::Continue + } + + /// Decides whether to submit the given directory entry as a file to + /// search. /// /// If the entry is a path that should be ignored, then this is a no-op. /// Otherwise, the entry is pushed on to the queue. (The actual execution - /// of the callback happens in `run`.) + /// of the callback happens in `run_one`.) /// /// If an error occurs while reading the entry, then it is sent to the /// caller's callback. @@ -1482,7 +1485,7 @@ impl<'s> Worker<'s> { /// `ig` is the `Ignore` matcher for the parent directory. `depth` should /// be the depth of this entry. `result` should be the item yielded by /// a directory iterator. - fn run_one( + fn generate_work( &mut self, ig: &Ignore, depth: usize, @@ -1540,13 +1543,7 @@ impl<'s> Worker<'s> { }; if !should_skip_path && !should_skip_filesize { - self.tx - .send(Message::Work(Work { - dent: dent, - ignore: ig.clone(), - root_device: root_device, - })) - .unwrap(); + self.send(Work { dent, ignore: ig.clone(), root_device }); } WalkState::Continue } @@ -1571,15 +1568,19 @@ impl<'s> Worker<'s> { // Repeat quit message to wake up sleeping threads, if // any. The domino effect will ensure that every thread // will quit. - self.waiting(); self.tx.send(Message::Quit).unwrap(); return None; } Err(TryRecvError::Empty) => { - // If it was the last running thread, then no more work can - // arrive, thus we can safely start quitting. Otherwise, a - // thread may spawn new work to be done. - if self.waiting() == 1 { + // Once num_pending reaches 0, it is impossible for it to + // ever increase again. Namely, it only reaches 0 once + // all jobs have run such that no jobs have produced more + // work. We have this guarantee because num_pending is + // always incremented before each job is submitted and only + // decremented once each job is completely finished. + // Therefore, if this reaches zero, then there can be no + // other job running. + if self.num_pending() == 0 { // Every other thread is blocked at the next recv(). // Send the initial quit message and quit. self.tx.send(Message::Quit).unwrap(); @@ -1590,7 +1591,6 @@ impl<'s> Worker<'s> { .rx .recv() .expect("channel disconnected while worker is alive")); - self.resume(); } Err(TryRecvError::Disconnected) => { unreachable!("channel disconnected while worker is alive"); @@ -1609,15 +1609,20 @@ impl<'s> Worker<'s> { self.quit_now.load(Ordering::SeqCst) } - /// Sets this worker's "running" state to false. Returns the previous - /// number of running workers. - fn waiting(&self) -> usize { - self.num_running.fetch_sub(1, Ordering::SeqCst) + /// Returns the number of pending jobs. + fn num_pending(&self) -> usize { + self.num_pending.load(Ordering::SeqCst) } - /// Sets this worker's "running" state to true. - fn resume(&self) { - self.num_running.fetch_add(1, Ordering::SeqCst); + /// Send work. + fn send(&self, work: Work) { + self.num_pending.fetch_add(1, Ordering::SeqCst); + self.tx.send(Message::Work(work)).unwrap(); + } + + /// Signal that work has been received. + fn work_done(&self) { + self.num_pending.fetch_sub(1, Ordering::SeqCst); } }