1
0
mirror of https://github.com/BurntSushi/ripgrep.git synced 2025-04-24 17:12:16 +02:00

ignore: fix parallel traversal

It turns out that the previous version wasn't quite correct. Namely, it
was possible for the following sequence to occur:

1. Consider that all workers, except for one, are `waiting`.
2. The last remaining worker finds one more job to do and sends it on
   the channel.
3. One of the previously `waiting` workers wakes up from the job that
   the last running worker sent, but `self.resume()` has not been
   called yet.
4. The last worker, from (2), calls `get_work` and sees that the
   channel has nothing on it, so it executes `self.waiting() ==
   1`. Since the worker in (3) hasn't called `self.resume()` yet,
   `self.waiting() == 1` evaluates to true.
5. This sets off a chain reaction that stops all workers, despite that
   fact that (3) got more work (which could itself spawn more work).

The end result is that the traversal may terminate while their are still
outstanding work items to process. This problem was observed through
spurious failures in CI. I was not actually able to reproduce the bug
locally.

We fix this by changing our strategy to detect termination using a
counter. Namely, we increment the counter just before sending new work
and decrement the counter just after finishing work. In this way, we
guarantee that the counter only ever reaches 0 once there is no more
work to process.

See #1337 for more discussion. Many thanks to @zsugabubus for helping me
work through this.
This commit is contained in:
Andrew Gallant 2020-02-20 15:08:37 -05:00
parent fab5c812f3
commit f314b0d55f

View File

@ -1073,6 +1073,10 @@ pub enum WalkState {
}
impl WalkState {
fn is_continue(&self) -> bool {
*self == WalkState::Continue
}
fn is_quit(&self) -> bool {
*self == WalkState::Quit
}
@ -1191,6 +1195,7 @@ impl WalkParallel {
// this. The best case scenario would be finding a way to use rayon
// to do this.
let (tx, rx) = channel::unbounded();
let num_pending = Arc::new(AtomicUsize::new(0));
{
let mut visitor = builder.build();
let mut any_work = false;
@ -1229,6 +1234,7 @@ impl WalkParallel {
}
}
};
num_pending.fetch_add(1, Ordering::SeqCst);
tx.send(Message::Work(Work {
dent: dent,
ignore: self.ig_root.clone(),
@ -1243,7 +1249,6 @@ impl WalkParallel {
}
}
// Create the workers and then wait for them to finish.
let num_running = Arc::new(AtomicUsize::new(threads));
let quit_now = Arc::new(AtomicBool::new(false));
crossbeam_utils::thread::scope(|s| {
let mut handles = vec![];
@ -1253,7 +1258,7 @@ impl WalkParallel {
tx: tx.clone(),
rx: rx.clone(),
quit_now: quit_now.clone(),
num_running: num_running.clone(),
num_pending: num_pending.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
@ -1368,8 +1373,8 @@ struct Worker<'s> {
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if have a priority channel.
quit_now: Arc<AtomicBool>,
/// The number of workers waiting for more work.
num_running: Arc<AtomicUsize>,
/// The number of outstanding work items.
num_pending: Arc<AtomicUsize>,
/// The maximum depth of directories to descend. A value of `0` means no
/// descension at all.
max_depth: Option<usize>,
@ -1390,91 +1395,89 @@ impl<'s> Worker<'s> {
/// The worker will call the caller's callback for all entries that aren't
/// skipped by the ignore matcher.
fn run(mut self) {
'get_work: while let Some(mut work) = self.get_work() {
// If the work is not a directory, then we can just execute the
// caller's callback immediately and move on.
if work.is_symlink() || !work.is_dir() {
if self.visitor.visit(Ok(work.dent)).is_quit() {
self.quit_now();
}
continue;
}
if let Some(err) = work.add_parents() {
if self.visitor.visit(Err(err)).is_quit() {
self.quit_now();
continue;
}
}
let descend = if let Some(root_device) = work.root_device {
match is_same_file_system(root_device, work.dent.path()) {
Ok(true) => true,
Ok(false) => false,
Err(err) => {
if self.visitor.visit(Err(err)).is_quit() {
self.quit_now();
continue;
}
false
}
}
} else {
true
};
// Try to read the directory first before we transfer ownership
// to the provided closure. Do not unwrap it immediately, though,
// as we may receive an `Err` value e.g. in the case when we do not
// have sufficient read permissions to list the directory.
// In that case we still want to provide the closure with a valid
// entry before passing the error value.
let readdir = work.read_dir();
let depth = work.dent.depth();
match self.visitor.visit(Ok(work.dent)) {
WalkState::Continue => {}
WalkState::Skip => continue,
WalkState::Quit => {
self.quit_now();
continue;
}
}
if !descend {
continue;
}
let readdir = match readdir {
Ok(readdir) => readdir,
Err(err) => {
if self.visitor.visit(Err(err)).is_quit() {
self.quit_now();
}
continue;
}
};
if self.max_depth.map_or(false, |max| depth >= max) {
continue;
}
for result in readdir {
let state = self.run_one(
&work.ignore,
depth + 1,
work.root_device,
result,
);
if state.is_quit() {
self.quit_now();
continue 'get_work;
}
while let Some(work) = self.get_work() {
if let WalkState::Quit = self.run_one(work) {
self.quit_now();
}
self.work_done();
}
}
/// Runs the worker on a single entry from a directory iterator.
fn run_one(&mut self, mut work: Work) -> WalkState {
// If the work is not a directory, then we can just execute the
// caller's callback immediately and move on.
if work.is_symlink() || !work.is_dir() {
return self.visitor.visit(Ok(work.dent));
}
if let Some(err) = work.add_parents() {
let state = self.visitor.visit(Err(err));
if state.is_quit() {
return state;
}
}
let descend = if let Some(root_device) = work.root_device {
match is_same_file_system(root_device, work.dent.path()) {
Ok(true) => true,
Ok(false) => false,
Err(err) => {
let state = self.visitor.visit(Err(err));
if state.is_quit() {
return state;
}
false
}
}
} else {
true
};
// Try to read the directory first before we transfer ownership
// to the provided closure. Do not unwrap it immediately, though,
// as we may receive an `Err` value e.g. in the case when we do not
// have sufficient read permissions to list the directory.
// In that case we still want to provide the closure with a valid
// entry before passing the error value.
let readdir = work.read_dir();
let depth = work.dent.depth();
let state = self.visitor.visit(Ok(work.dent));
if !state.is_continue() {
return state;
}
if !descend {
return WalkState::Skip;
}
let readdir = match readdir {
Ok(readdir) => readdir,
Err(err) => {
return self.visitor.visit(Err(err));
}
};
if self.max_depth.map_or(false, |max| depth >= max) {
return WalkState::Skip;
}
for result in readdir {
let state = self.generate_work(
&work.ignore,
depth + 1,
work.root_device,
result,
);
if state.is_quit() {
return state;
}
}
WalkState::Continue
}
/// Decides whether to submit the given directory entry as a file to
/// search.
///
/// If the entry is a path that should be ignored, then this is a no-op.
/// Otherwise, the entry is pushed on to the queue. (The actual execution
/// of the callback happens in `run`.)
/// of the callback happens in `run_one`.)
///
/// If an error occurs while reading the entry, then it is sent to the
/// caller's callback.
@ -1482,7 +1485,7 @@ impl<'s> Worker<'s> {
/// `ig` is the `Ignore` matcher for the parent directory. `depth` should
/// be the depth of this entry. `result` should be the item yielded by
/// a directory iterator.
fn run_one(
fn generate_work(
&mut self,
ig: &Ignore,
depth: usize,
@ -1540,13 +1543,7 @@ impl<'s> Worker<'s> {
};
if !should_skip_path && !should_skip_filesize {
self.tx
.send(Message::Work(Work {
dent: dent,
ignore: ig.clone(),
root_device: root_device,
}))
.unwrap();
self.send(Work { dent, ignore: ig.clone(), root_device });
}
WalkState::Continue
}
@ -1571,15 +1568,19 @@ impl<'s> Worker<'s> {
// Repeat quit message to wake up sleeping threads, if
// any. The domino effect will ensure that every thread
// will quit.
self.waiting();
self.tx.send(Message::Quit).unwrap();
return None;
}
Err(TryRecvError::Empty) => {
// If it was the last running thread, then no more work can
// arrive, thus we can safely start quitting. Otherwise, a
// thread may spawn new work to be done.
if self.waiting() == 1 {
// Once num_pending reaches 0, it is impossible for it to
// ever increase again. Namely, it only reaches 0 once
// all jobs have run such that no jobs have produced more
// work. We have this guarantee because num_pending is
// always incremented before each job is submitted and only
// decremented once each job is completely finished.
// Therefore, if this reaches zero, then there can be no
// other job running.
if self.num_pending() == 0 {
// Every other thread is blocked at the next recv().
// Send the initial quit message and quit.
self.tx.send(Message::Quit).unwrap();
@ -1590,7 +1591,6 @@ impl<'s> Worker<'s> {
.rx
.recv()
.expect("channel disconnected while worker is alive"));
self.resume();
}
Err(TryRecvError::Disconnected) => {
unreachable!("channel disconnected while worker is alive");
@ -1609,15 +1609,20 @@ impl<'s> Worker<'s> {
self.quit_now.load(Ordering::SeqCst)
}
/// Sets this worker's "running" state to false. Returns the previous
/// number of running workers.
fn waiting(&self) -> usize {
self.num_running.fetch_sub(1, Ordering::SeqCst)
/// Returns the number of pending jobs.
fn num_pending(&self) -> usize {
self.num_pending.load(Ordering::SeqCst)
}
/// Sets this worker's "running" state to true.
fn resume(&self) {
self.num_running.fetch_add(1, Ordering::SeqCst);
/// Send work.
fn send(&self, work: Work) {
self.num_pending.fetch_add(1, Ordering::SeqCst);
self.tx.send(Message::Work(work)).unwrap();
}
/// Signal that work has been received.
fn work_done(&self) {
self.num_pending.fetch_sub(1, Ordering::SeqCst);
}
}