mirror of
				https://github.com/facebook/zstd.git
				synced 2025-10-31 16:47:48 +02:00 
			
		
		
		
	Fix memory usage issues.
This commit is contained in:
		| @@ -63,7 +63,9 @@ googletest: | ||||
| 	@cd googletest/build && cmake .. && make | ||||
|  | ||||
| test: libzstd.a Pzstd.o Options.o SkippableFrame.o | ||||
| 	$(MAKE) -C utils/test clean | ||||
| 	$(MAKE) -C utils/test test | ||||
| 	$(MAKE) -C test clean | ||||
| 	$(MAKE) -C test test | ||||
|  | ||||
| clean: | ||||
|   | ||||
| @@ -67,11 +67,14 @@ size_t pzstdMain(const Options& options, ErrorHolder& errorHolder) { | ||||
|  | ||||
|   // WorkQueue outlives ThreadPool so in the case of error we are certain | ||||
|   // we don't accidently try to call push() on it after it is destroyed. | ||||
|   WorkQueue<std::shared_ptr<BufferWorkQueue>> outs; | ||||
|   WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{2 * options.numThreads}; | ||||
|   size_t bytesWritten; | ||||
|   { | ||||
|     // Initialize the thread pool with numThreads | ||||
|     ThreadPool executor(options.numThreads); | ||||
|     // Initialize the thread pool with numThreads + 1 | ||||
|     // We add one because the read thread spends most of its time waiting. | ||||
|     // This also sets the minimum number of threads to 2, so the algorithm | ||||
|     // doesn't deadlock. | ||||
|     ThreadPool executor(options.numThreads + 1); | ||||
|     if (!options.decompress) { | ||||
|       // Add a job that reads the input and starts all the compression jobs | ||||
|       executor.add( | ||||
| @@ -229,6 +232,15 @@ calculateStep(size_t size, size_t numThreads, const ZSTD_parameters& params) { | ||||
|  | ||||
| namespace { | ||||
| enum class FileStatus { Continue, Done, Error }; | ||||
| /// Determines the status of the file descriptor `fd`. | ||||
| FileStatus fileStatus(FILE* fd) { | ||||
|   if (std::feof(fd)) { | ||||
|     return FileStatus::Done; | ||||
|   } else if (std::ferror(fd)) { | ||||
|     return FileStatus::Error; | ||||
|   } | ||||
|   return FileStatus::Continue; | ||||
| } | ||||
| } // anonymous namespace | ||||
|  | ||||
| /** | ||||
| @@ -243,10 +255,9 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) { | ||||
|     auto bytesRead = | ||||
|         std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd); | ||||
|     queue.push(buffer.splitAt(bytesRead)); | ||||
|     if (std::feof(fd)) { | ||||
|       return FileStatus::Done; | ||||
|     } else if (std::ferror(fd) || bytesRead == 0) { | ||||
|       return FileStatus::Error; | ||||
|     auto status = fileStatus(fd); | ||||
|     if (status != FileStatus::Continue) { | ||||
|       return status; | ||||
|     } | ||||
|   } | ||||
|   return FileStatus::Continue; | ||||
| @@ -388,6 +399,7 @@ void asyncDecompressFrames( | ||||
|       // frameSize is 0 if the frame info can't be decoded. | ||||
|       Buffer buffer(SkippableFrame::kSize); | ||||
|       auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd); | ||||
|       status = fileStatus(fd); | ||||
|       if (bytesRead == 0 && status != FileStatus::Continue) { | ||||
|         break; | ||||
|       } | ||||
| @@ -395,6 +407,12 @@ void asyncDecompressFrames( | ||||
|       frameSize = SkippableFrame::tryRead(buffer.range()); | ||||
|       in->push(std::move(buffer)); | ||||
|     } | ||||
|     if (frameSize == 0) { | ||||
|       // We hit a non SkippableFrame, so this will be the last job. | ||||
|       // Make sure that we don't use too much memory | ||||
|       in->setMaxSize(64); | ||||
|       out->setMaxSize(64); | ||||
|     } | ||||
|     // Start decompression in the thread pool | ||||
|     executor.add([&errorHolder, in, out] { | ||||
|       return decompress(errorHolder, std::move(in), std::move(out)); | ||||
|   | ||||
| @@ -99,6 +99,19 @@ class WorkQueue { | ||||
|     return true; | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Sets the maximum queue size.  If `maxSize == 0` then it is unbounded. | ||||
|    * | ||||
|    * @param maxSize The new maximum queue size. | ||||
|    */ | ||||
|   void setMaxSize(std::size_t maxSize) { | ||||
|     { | ||||
|       std::lock_guard<std::mutex> lock(mutex_); | ||||
|       maxSize_ = maxSize; | ||||
|     } | ||||
|     writerCv_.notify_all(); | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Promise that `push()` won't be called again, so once the queue is empty | ||||
|    * there will never any more work. | ||||
| @@ -149,6 +162,10 @@ class BufferWorkQueue { | ||||
|     return result; | ||||
|   } | ||||
|  | ||||
|   void setMaxSize(std::size_t maxSize) { | ||||
|     queue_.setMaxSize(maxSize); | ||||
|   } | ||||
|  | ||||
|   void finish() { | ||||
|     queue_.finish(); | ||||
|   } | ||||
|   | ||||
| @@ -175,6 +175,27 @@ TEST(WorkQueue, BoundedSizePushAfterFinish) { | ||||
|   pusher.join(); | ||||
| } | ||||
|  | ||||
| TEST(WorkQueue, SetMaxSize) { | ||||
|   WorkQueue<int> queue(2); | ||||
|   int result; | ||||
|   queue.push(5); | ||||
|   queue.push(6); | ||||
|   queue.setMaxSize(1); | ||||
|   std::thread pusher([&queue] { | ||||
|     queue.push(7); | ||||
|   }); | ||||
|   // Dirtily try and make sure that pusher has run. | ||||
|   std::this_thread::sleep_for(std::chrono::seconds(1)); | ||||
|   queue.finish(); | ||||
|   EXPECT_TRUE(queue.pop(result)); | ||||
|   EXPECT_EQ(5, result); | ||||
|   EXPECT_TRUE(queue.pop(result)); | ||||
|   EXPECT_EQ(6, result); | ||||
|   EXPECT_FALSE(queue.pop(result)); | ||||
|  | ||||
|   pusher.join(); | ||||
| } | ||||
|  | ||||
| TEST(WorkQueue, BoundedSizeMPMC) { | ||||
|   WorkQueue<int> queue(100); | ||||
|   std::vector<int> results(10000, -1); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user