mirror of
https://github.com/BurntSushi/ripgrep.git
synced 2025-03-17 20:28:03 +02:00
ignore: use work-stealing stack instead of Arc<Mutex<Vec<_>>>
This represents yet another iteration on how `ignore` enqueues and distributes work in parallel. The original implementation used a multi-producer/multi-consumer thread safe queue from crossbeam. At some point, I migrated to a simple `Arc<Mutex<Vec<_>>>` and treated it as a stack so that we did depth first traversal. This helped with memory usage in very wide directories. But it turns out that a naive stack-behind-a-mutex can be quite a bit slower than something that's a little smarter, such as a work-stealing stack used in this commit. My hypothesis for why this helps is that without the stealing component, work distribution can get stuck in sub-optimal configurations that depend on which directory entries get assigned to a particular worker. It's likely that this can result in some workers getting "more" work than others, just by chance, and thus remain idle. But the work-stealing approach heads that off. This does re-introduce a dependency on parts of crossbeam which is kind of a bummer, but it's carrying its weight for now. Closes #1823, Closes #2591 Ref https://github.com/sharkdp/fd/issues/28
This commit is contained in:
parent
cad1f5fae2
commit
d938e955af
@ -8,6 +8,11 @@ Unreleased changes. Release notes have not yet been written.
|
||||
`rg -B1 -A2`. That is, `-A` and `-B` no longer completely override `-C`.
|
||||
Instead, they only partially override `-C`.
|
||||
|
||||
Performance improvements:
|
||||
|
||||
* [PERF #2591](https://github.com/BurntSushi/ripgrep/pull/2591):
|
||||
Parallel directory traversal now uses work stealing for faster searches.
|
||||
|
||||
Feature enhancements:
|
||||
|
||||
* Added or improved file type filtering for Ada, DITA, Elixir, Fuchsia, Gentoo,
|
||||
|
40
Cargo.lock
generated
40
Cargo.lock
generated
@ -78,6 +78,30 @@ dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-deque"
|
||||
version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.9.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"cfg-if",
|
||||
"crossbeam-utils",
|
||||
"memoffset",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.16"
|
||||
@ -224,6 +248,7 @@ name = "ignore"
|
||||
version = "0.4.20"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"crossbeam-deque",
|
||||
"globset",
|
||||
"lazy_static",
|
||||
"log",
|
||||
@ -309,6 +334,15 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-traits"
|
||||
version = "0.2.16"
|
||||
@ -444,6 +478,12 @@ dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.188"
|
||||
|
@ -19,6 +19,7 @@ name = "ignore"
|
||||
bench = false
|
||||
|
||||
[dependencies]
|
||||
crossbeam-deque = "0.8.3"
|
||||
globset = { version = "0.4.10", path = "../globset" }
|
||||
lazy_static = "1.1"
|
||||
log = "0.4.5"
|
||||
|
@ -3,14 +3,15 @@ use std::ffi::OsStr;
|
||||
use std::fmt;
|
||||
use std::fs::{self, FileType, Metadata};
|
||||
use std::io;
|
||||
use std::iter::FusedIterator;
|
||||
use std::iter::{self, FusedIterator};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::vec;
|
||||
|
||||
use crossbeam_deque::{Stealer, Worker as Deque};
|
||||
use same_file::Handle;
|
||||
use walkdir::{self, WalkDir};
|
||||
|
||||
@ -1231,9 +1232,8 @@ impl WalkParallel {
|
||||
/// can be merged together into a single data structure.
|
||||
pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder<'_>) {
|
||||
let threads = self.threads();
|
||||
let stack = Arc::new(Mutex::new(vec![]));
|
||||
let mut stack = vec![];
|
||||
{
|
||||
let mut stack = stack.lock().unwrap();
|
||||
let mut visitor = builder.build();
|
||||
let mut paths = Vec::new().into_iter();
|
||||
std::mem::swap(&mut paths, &mut self.paths);
|
||||
@ -1283,14 +1283,14 @@ impl WalkParallel {
|
||||
}
|
||||
// Create the workers and then wait for them to finish.
|
||||
let quit_now = Arc::new(AtomicBool::new(false));
|
||||
let num_pending =
|
||||
Arc::new(AtomicUsize::new(stack.lock().unwrap().len()));
|
||||
let num_pending = Arc::new(AtomicUsize::new(stack.len()));
|
||||
let stacks = Stack::new_for_each_thread(threads, stack);
|
||||
std::thread::scope(|s| {
|
||||
let mut handles = vec![];
|
||||
for _ in 0..threads {
|
||||
let worker = Worker {
|
||||
let handles: Vec<_> = stacks
|
||||
.into_iter()
|
||||
.map(|stack| Worker {
|
||||
visitor: builder.build(),
|
||||
stack: stack.clone(),
|
||||
stack,
|
||||
quit_now: quit_now.clone(),
|
||||
num_pending: num_pending.clone(),
|
||||
max_depth: self.max_depth,
|
||||
@ -1298,9 +1298,9 @@ impl WalkParallel {
|
||||
follow_links: self.follow_links,
|
||||
skip: self.skip.clone(),
|
||||
filter: self.filter.clone(),
|
||||
};
|
||||
handles.push(s.spawn(|| worker.run()));
|
||||
}
|
||||
})
|
||||
.map(|worker| s.spawn(|| worker.run()))
|
||||
.collect();
|
||||
for handle in handles {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
@ -1390,6 +1390,73 @@ impl Work {
|
||||
}
|
||||
}
|
||||
|
||||
/// A work-stealing stack.
|
||||
#[derive(Debug)]
|
||||
struct Stack {
|
||||
/// This thread's index.
|
||||
index: usize,
|
||||
/// The thread-local stack.
|
||||
deque: Deque<Message>,
|
||||
/// The work stealers.
|
||||
stealers: Arc<[Stealer<Message>]>,
|
||||
}
|
||||
|
||||
impl Stack {
|
||||
/// Create a work-stealing stack for each thread. The given messages
|
||||
/// correspond to the initial paths to start the search at. They will
|
||||
/// be distributed automatically to each stack in a round-robin fashion.
|
||||
fn new_for_each_thread(threads: usize, init: Vec<Message>) -> Vec<Stack> {
|
||||
// Using new_lifo() ensures each worker operates depth-first, not
|
||||
// breadth-first. We do depth-first because a breadth first traversal
|
||||
// on wide directories with a lot of gitignores is disastrous (for
|
||||
// example, searching a directory tree containing all of crates.io).
|
||||
let deques: Vec<Deque<Message>> =
|
||||
iter::repeat_with(Deque::new_lifo).take(threads).collect();
|
||||
let stealers = Arc::<[Stealer<Message>]>::from(
|
||||
deques.iter().map(Deque::stealer).collect::<Vec<_>>(),
|
||||
);
|
||||
let stacks: Vec<Stack> = deques
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(index, deque)| Stack {
|
||||
index,
|
||||
deque,
|
||||
stealers: stealers.clone(),
|
||||
})
|
||||
.collect();
|
||||
// Distribute the initial messages.
|
||||
init.into_iter()
|
||||
.zip(stacks.iter().cycle())
|
||||
.for_each(|(m, s)| s.push(m));
|
||||
stacks
|
||||
}
|
||||
|
||||
/// Push a message.
|
||||
fn push(&self, msg: Message) {
|
||||
self.deque.push(msg);
|
||||
}
|
||||
|
||||
/// Pop a message.
|
||||
fn pop(&self) -> Option<Message> {
|
||||
self.deque.pop().or_else(|| self.steal())
|
||||
}
|
||||
|
||||
/// Steal a message from another queue.
|
||||
fn steal(&self) -> Option<Message> {
|
||||
// For fairness, try to steal from index - 1, then index - 2, ... 0,
|
||||
// then wrap around to len - 1, len - 2, ... index + 1.
|
||||
let (left, right) = self.stealers.split_at(self.index);
|
||||
// Don't steal from ourselves
|
||||
let right = &right[1..];
|
||||
|
||||
left.iter()
|
||||
.rev()
|
||||
.chain(right.iter().rev())
|
||||
.map(|s| s.steal_batch_and_pop(&self.deque))
|
||||
.find_map(|s| s.success())
|
||||
}
|
||||
}
|
||||
|
||||
/// A worker is responsible for descending into directories, updating the
|
||||
/// ignore matchers, producing new work and invoking the caller's callback.
|
||||
///
|
||||
@ -1397,13 +1464,13 @@ impl Work {
|
||||
struct Worker<'s> {
|
||||
/// The caller's callback.
|
||||
visitor: Box<dyn ParallelVisitor + 's>,
|
||||
/// A stack of work to do.
|
||||
/// A work-stealing stack of work to do.
|
||||
///
|
||||
/// We use a stack instead of a channel because a stack lets us visit
|
||||
/// directories in depth first order. This can substantially reduce peak
|
||||
/// memory usage by keeping both the number of files path and gitignore
|
||||
/// memory usage by keeping both the number of file paths and gitignore
|
||||
/// matchers in memory lower.
|
||||
stack: Arc<Mutex<Vec<Message>>>,
|
||||
stack: Stack,
|
||||
/// Whether all workers should terminate at the next opportunity. Note
|
||||
/// that we need this because we don't want other `Work` to be done after
|
||||
/// we quit. We wouldn't need this if have a priority channel.
|
||||
@ -1668,20 +1735,17 @@ impl<'s> Worker<'s> {
|
||||
/// Send work.
|
||||
fn send(&self, work: Work) {
|
||||
self.num_pending.fetch_add(1, Ordering::SeqCst);
|
||||
let mut stack = self.stack.lock().unwrap();
|
||||
stack.push(Message::Work(work));
|
||||
self.stack.push(Message::Work(work));
|
||||
}
|
||||
|
||||
/// Send a quit message.
|
||||
fn send_quit(&self) {
|
||||
let mut stack = self.stack.lock().unwrap();
|
||||
stack.push(Message::Quit);
|
||||
self.stack.push(Message::Quit);
|
||||
}
|
||||
|
||||
/// Receive work.
|
||||
fn recv(&self) -> Option<Message> {
|
||||
let mut stack = self.stack.lock().unwrap();
|
||||
stack.pop()
|
||||
self.stack.pop()
|
||||
}
|
||||
|
||||
/// Signal that work has been finished.
|
||||
|
Loading…
x
Reference in New Issue
Block a user