1
0
mirror of https://github.com/facebook/zstd.git synced 2025-03-07 01:10:04 +02:00

Add optional max size to work queue

This commit is contained in:
Nick Terrell 2016-09-02 13:53:23 -07:00
parent 2fcf8a4b99
commit 64c1c065cc
4 changed files with 108 additions and 20 deletions

View File

@ -70,5 +70,5 @@ clean:
$(MAKE) -C $(ZSTDDIR) clean
$(MAKE) -C utils/test clean
$(MAKE) -C test clean
@$(RM) -rf googletest/ libzstd.a *.o pzstd$(EXT)
@$(RM) -rf libzstd.a *.o pzstd$(EXT)
@echo Cleaning completed

View File

@ -12,6 +12,7 @@
#include <atomic>
#include <cassert>
#include <cstddef>
#include <condition_variable>
#include <cstddef>
#include <functional>
@ -25,14 +26,29 @@ template <typename T>
class WorkQueue {
// Protects all member variable access
std::mutex mutex_;
std::condition_variable cv_;
std::condition_variable readerCv_;
std::condition_variable writerCv_;
std::queue<T> queue_;
bool done_;
std::size_t maxSize_;
// Must have lock to call this function
bool full() const {
if (maxSize_ == 0) {
return false;
}
return queue_.size() >= maxSize_;
}
public:
/// Constructs an empty work queue.
WorkQueue() : done_(false) {}
/**
* Constructs an empty work queue with an optional max size.
* If `maxSize == 0` the queue size is unbounded.
*
* @param maxSize The maximum allowed size of the work queue.
*/
WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
/**
* Push an item onto the work queue. Notify a single thread that work is
@ -44,13 +60,16 @@ class WorkQueue {
*/
bool push(T item) {
{
std::lock_guard<std::mutex> lock(mutex_);
std::unique_lock<std::mutex> lock(mutex_);
while (full() && !done_) {
writerCv_.wait(lock);
}
if (done_) {
return false;
}
queue_.push(std::move(item));
}
cv_.notify_one();
readerCv_.notify_one();
return true;
}
@ -64,16 +83,19 @@ class WorkQueue {
* `finish()` has been called.
*/
bool pop(T& item) {
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty() && !done_) {
cv_.wait(lock);
{
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty() && !done_) {
readerCv_.wait(lock);
}
if (queue_.empty()) {
assert(done_);
return false;
}
item = std::move(queue_.front());
queue_.pop();
}
if (queue_.empty()) {
assert(done_);
return false;
}
item = std::move(queue_.front());
queue_.pop();
writerCv_.notify_one();
return true;
}
@ -87,18 +109,19 @@ class WorkQueue {
assert(!done_);
done_ = true;
}
cv_.notify_all();
readerCv_.notify_all();
writerCv_.notify_all();
}
/// Blocks until `finish()` has been called (but the queue may not be empty).
void waitUntilFinished() {
std::unique_lock<std::mutex> lock(mutex_);
while (!done_) {
cv_.wait(lock);
readerCv_.wait(lock);
// If we were woken by a push, we need to wake a thread waiting on pop().
if (!done_) {
lock.unlock();
cv_.notify_one();
readerCv_.notify_one();
lock.lock();
}
}
@ -111,7 +134,7 @@ class BufferWorkQueue {
std::atomic<std::size_t> size_;
public:
BufferWorkQueue() : size_(0) {}
BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {}
void push(Buffer buffer) {
size_.fetch_add(buffer.size());

View File

@ -23,7 +23,7 @@ GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest
CPPFLAGS = -I$(PZSTDDIR) $(GTEST_INC) $(GTEST_LIB)
CXXFLAGS ?= -O3
CXXFLAGS += -std=c++11
CFLAGS += $(MOREFLAGS)
CXXFLAGS += $(MOREFLAGS)
FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS)
%: %.cpp

View File

@ -145,6 +145,71 @@ TEST(WorkQueue, MPMC) {
}
}
TEST(WorkQueue, BoundedSizeWorks) {
WorkQueue<int> queue(1);
int result;
queue.push(5);
queue.pop(result);
queue.push(5);
queue.pop(result);
queue.push(5);
queue.finish();
queue.pop(result);
EXPECT_EQ(5, result);
}
TEST(WorkQueue, BoundedSizePushAfterFinish) {
WorkQueue<int> queue(1);
int result;
queue.push(5);
std::thread pusher([&queue] {
queue.push(6);
});
// Dirtily try and make sure that pusher has run.
std::this_thread::sleep_for(std::chrono::seconds(1));
queue.finish();
EXPECT_TRUE(queue.pop(result));
EXPECT_EQ(5, result);
EXPECT_FALSE(queue.pop(result));
pusher.join();
}
TEST(WorkQueue, BoundedSizeMPMC) {
WorkQueue<int> queue(100);
std::vector<int> results(10000, -1);
std::mutex mutex;
std::vector<std::thread> popperThreads;
for (int i = 0; i < 10; ++i) {
popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
}
std::vector<std::thread> pusherThreads;
for (int i = 0; i < 100; ++i) {
auto min = i * 100;
auto max = (i + 1) * 100;
pusherThreads.emplace_back(
[ &queue, min, max ] {
for (int i = min; i < max; ++i) {
queue.push(i);
}
});
}
for (auto& thread : pusherThreads) {
thread.join();
}
queue.finish();
for (auto& thread : popperThreads) {
thread.join();
}
for (int i = 0; i < 10000; ++i) {
EXPECT_EQ(i, results[i]);
}
}
TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
{
BufferWorkQueue queue;