1
0
mirror of https://github.com/facebook/zstd.git synced 2025-07-15 19:54:20 +02:00

[pzstd] Fix lantent bug in WorkQueue::push()

This commit is contained in:
Nick Terrell
2016-10-13 12:03:02 -07:00
parent baa152e56e
commit 8c6c686d0a
3 changed files with 21 additions and 7 deletions

View File

@ -27,7 +27,7 @@ class ThreadPool {
explicit ThreadPool(std::size_t numThreads) { explicit ThreadPool(std::size_t numThreads) {
threads_.reserve(numThreads); threads_.reserve(numThreads);
for (std::size_t i = 0; i < numThreads; ++i) { for (std::size_t i = 0; i < numThreads; ++i) {
threads_.emplace_back([&] { threads_.emplace_back([this] {
std::function<void()> task; std::function<void()> task;
while (tasks_.pop(task)) { while (tasks_.pop(task)) {
task(); task();

View File

@ -54,12 +54,13 @@ class WorkQueue {
/** /**
* Push an item onto the work queue. Notify a single thread that work is * Push an item onto the work queue. Notify a single thread that work is
* available. If `finish()` has been called, do nothing and return false. * available. If `finish()` has been called, do nothing and return false.
* If `push()` returns false, then `item` has not been moved from.
* *
* @param item Item to push onto the queue. * @param item Item to push onto the queue.
* @returns True upon success, false if `finish()` has been called. An * @returns True upon success, false if `finish()` has been called. An
* item was pushed iff `push()` returns true. * item was pushed iff `push()` returns true.
*/ */
bool push(T item) { bool push(T&& item) {
{ {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
while (full() && !done_) { while (full() && !done_) {

View File

@ -10,6 +10,7 @@
#include "utils/WorkQueue.h" #include "utils/WorkQueue.h"
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <memory>
#include <mutex> #include <mutex>
#include <thread> #include <thread>
#include <vector> #include <vector>
@ -64,7 +65,7 @@ TEST(WorkQueue, SPSC) {
const int max = 100; const int max = 100;
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
queue.push(i); queue.push(int{i});
} }
std::thread thread([ &queue, max ] { std::thread thread([ &queue, max ] {
@ -80,7 +81,7 @@ TEST(WorkQueue, SPSC) {
std::this_thread::yield(); std::this_thread::yield();
for (int i = 10; i < max; ++i) { for (int i = 10; i < max; ++i) {
queue.push(i); queue.push(int{i});
} }
queue.finish(); queue.finish();
@ -97,7 +98,7 @@ TEST(WorkQueue, SPMC) {
} }
for (int i = 0; i < 50; ++i) { for (int i = 0; i < 50; ++i) {
queue.push(i); queue.push(int{i});
} }
queue.finish(); queue.finish();
@ -126,7 +127,7 @@ TEST(WorkQueue, MPMC) {
pusherThreads.emplace_back( pusherThreads.emplace_back(
[ &queue, min, max ] { [ &queue, min, max ] {
for (int i = min; i < max; ++i) { for (int i = min; i < max; ++i) {
queue.push(i); queue.push(int{i});
} }
}); });
} }
@ -212,7 +213,7 @@ TEST(WorkQueue, BoundedSizeMPMC) {
pusherThreads.emplace_back( pusherThreads.emplace_back(
[ &queue, min, max ] { [ &queue, min, max ] {
for (int i = min; i < max; ++i) { for (int i = min; i < max; ++i) {
queue.push(i); queue.push(int{i});
} }
}); });
} }
@ -231,6 +232,18 @@ TEST(WorkQueue, BoundedSizeMPMC) {
} }
} }
TEST(WorkQueue, FailedPush) {
WorkQueue<std::unique_ptr<int>> queue;
std::unique_ptr<int> x(new int{5});
EXPECT_TRUE(queue.push(std::move(x)));
EXPECT_EQ(nullptr, x);
queue.finish();
x.reset(new int{6});
EXPECT_FALSE(queue.push(std::move(x)));
EXPECT_NE(nullptr, x);
EXPECT_EQ(6, *x);
}
TEST(BufferWorkQueue, SizeCalculatedCorrectly) { TEST(BufferWorkQueue, SizeCalculatedCorrectly) {
{ {
BufferWorkQueue queue; BufferWorkQueue queue;