From 5fa71396e0f019467f5852a3f7391d8306d576f0 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Thu, 6 Oct 2016 12:45:47 -0700 Subject: [PATCH 01/13] [pzstd] Fix typo in readme --- contrib/pzstd/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/pzstd/README.md b/contrib/pzstd/README.md index 05ceb5599..3fe7b0b9d 100644 --- a/contrib/pzstd/README.md +++ b/contrib/pzstd/README.md @@ -10,7 +10,7 @@ When decompressing files compressed with Zstandard, PZstandard does IO in one th ## Usage -PZstandard supports the same command line interface as Zstandard, but also provies the `-p` option to specify the number of threads. +PZstandard supports the same command line interface as Zstandard, but also provides the `-p` option to specify the number of threads. Dictionary mode is not currently supported. Basic usage From b0f6d73002eff829e60c8344ac7bb2bac06564cb Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Thu, 6 Oct 2016 12:58:23 -0700 Subject: [PATCH 02/13] [pzstd] Remove empty else statement --- contrib/pzstd/Pzstd.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index e0826b9d8..59a2496c4 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -183,8 +183,6 @@ int pzstdMain(const Options &options) { std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(), errorHolder.getError().c_str()); } - } else { - } }); // Open the input file From 87629978d30fb262ba55214e6a92cc7608fe0062 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Thu, 6 Oct 2016 15:51:58 -0700 Subject: [PATCH 03/13] [pzstd] Fix latent bug in WorkQueue --- contrib/pzstd/utils/WorkQueue.h | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/contrib/pzstd/utils/WorkQueue.h b/contrib/pzstd/utils/WorkQueue.h index 538213500..c46e6cbcf 100644 --- a/contrib/pzstd/utils/WorkQueue.h +++ b/contrib/pzstd/utils/WorkQueue.h @@ -28,6 +28,7 @@ class WorkQueue { std::mutex mutex_; 
std::condition_variable readerCv_; std::condition_variable writerCv_; + std::condition_variable finishCv_; std::queue<T> queue_; bool done_; @@ -124,19 +125,14 @@ class WorkQueue { } readerCv_.notify_all(); writerCv_.notify_all(); + finishCv_.notify_all(); } /// Blocks until `finish()` has been called (but the queue may not be empty). void waitUntilFinished() { std::unique_lock<std::mutex> lock(mutex_); while (!done_) { - readerCv_.wait(lock); - // If we were woken by a push, we need to wake a thread waiting on pop(). - if (!done_) { - lock.unlock(); - readerCv_.notify_one(); - lock.lock(); - } + finishCv_.wait(lock); } } }; From 8b4e84249b1f6eba4ace7de3335cc2fdd34cd651 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Thu, 6 Oct 2016 18:17:08 -0700 Subject: [PATCH 04/13] [pzstd] Fix Makefile --- .gitignore | 1 + .travis.yml | 4 +- appveyor.yml | 5 +- contrib/pzstd/Makefile | 280 ++++++++++++++++++++++-------- contrib/pzstd/test/Makefile | 48 ----- contrib/pzstd/utils/test/Makefile | 42 ----- 6 files changed, 209 insertions(+), 171 deletions(-) delete mode 100644 contrib/pzstd/test/Makefile delete mode 100644 contrib/pzstd/utils/test/Makefile diff --git a/.gitignore b/.gitignore index c939f1228..220a1e0eb 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,4 @@ _zstdbench/ *.swp .DS_Store googletest/ +*.d diff --git a/.travis.yml b/.travis.yml index 972c1b4ba..f328f480d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,7 +22,7 @@ matrix: packages: - gcc-4.8 - g++-4.8 - env: PLATFORM="Ubuntu 12.04 container" CMD="make zlibwrapper && make clean && make -C tests test-zstd_nolegacy && make clean && make clean && make cmaketest && make clean && make -C contrib/pzstd pzstd && make -C contrib/pzstd googletest && make -C contrib/pzstd test && make -C contrib/pzstd clean" + env: PLATFORM="Ubuntu 12.04 container" CMD="make zlibwrapper && make clean && make -C tests test-zstd_nolegacy && make clean && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest && make
-C contrib/pzstd all && make -C contrib/pzstd check && make -C contrib/pzstd clean" - os: linux sudo: false env: PLATFORM="Ubuntu 12.04 container" CMD="make usan" @@ -69,7 +69,7 @@ matrix: sudo: required install: - export CXX="g++-4.8" CC="gcc-4.8" - env: PLATFORM="Ubuntu 14.04" CMD="make gpptest && make clean && make gnu90test && make clean && make c99test && make clean && make gnu99test && make clean && make clangtest && make clean && make -C contrib/pzstd pzstd32 && make -C contrib/pzstd googletest32 && make -C contrib/pzstd test32 && make -C contrib/pzstd clean" + env: PLATFORM="Ubuntu 14.04" CMD="make gpptest && make clean && make gnu90test && make clean && make c99test && make clean && make gnu99test && make clean && make clangtest && make clean && make -C contrib/pzstd googletest32 && make -C contrib/pzstd all32 && make -C contrib/pzstd check && make -C contrib/pzstd clean" addons: apt: packages: diff --git a/appveyor.yml b/appveyor.yml index 6345c7b39..fbdc30c40 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -50,10 +50,9 @@ build_script: ECHO *** && ECHO *** Building pzstd for %PLATFORM% && ECHO *** && - ECHO make -C contrib\pzstd pzstd && - make -C contrib\pzstd pzstd && make -C contrib\pzstd googletest-mingw64 && - make -C contrib\pzstd test && + make -C contrib\pzstd all && + make -C contrib\pzstd check && make -C contrib\pzstd clean ) - if [%COMPILER%]==[gcc] ( diff --git a/contrib/pzstd/Makefile b/contrib/pzstd/Makefile index e30be0bed..b998c1fff 100644 --- a/contrib/pzstd/Makefile +++ b/contrib/pzstd/Makefile @@ -7,20 +7,69 @@ # of patent rights can be found in the PATENTS file in the same directory. # ########################################################################## +# Standard variables for installation +DESTDIR ?= +PREFIX ?= /usr/local +BINDIR := $(DESTDIR)$(PREFIX)/bin + ZSTDDIR = ../../lib PROGDIR = ../../programs -CPPFLAGS = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I. 
-CXXFLAGS ?= -O3 -CXXFLAGS += -std=c++11 -CXXFLAGS += $(MOREFLAGS) -FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS) +# External program to use to run tests, e.g. qemu or valgrind +TESTPROG ?= +# Flags to pass to the tests +TESTFLAGS ?= + +# We use gcc/clang to generate the header dependencies of files +DEPFLAGS = -MMD -MP -MF $*.Td +POSTCOMPILE = mv -f $*.Td $*.d + +# CFLAGS, CXXFLAGS, and LDFLAGS are for the users to override +CFLAGS ?= -O3 -Wall -Wextra +CXXFLAGS ?= -O3 -Wall -Wextra -pedantic -std=c++11 +LDFLAGS ?= + +# Googletest default flags +PZSTD_INC = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I. +GTEST_INC = -isystem googletest/googletest/include + +PZSTD_CCXXFLAGS = $(PZSTD_INC) $(GTEST_INC) +PZSTD_CFLAGS = $(PZSTD_CCXXFLAGS) +PZSTD_CXXFLAGS = $(PZSTD_CCXXFLAGS) +PZSTD_LDFLAGS = +EXTRA_FLAGS = +ALL_CFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CFLAGS) $(PZSTD_CFLAGS) +ALL_CXXFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CXXFLAGS) $(PZSTD_CXXFLAGS) +ALL_LDFLAGS = $(EXTRA_FLAGS) $(LDFLAGS) $(PZSTD_LDFLAGS) -ZSTDCOMMON_FILES := $(ZSTDDIR)/common/*.c -ZSTDCOMP_FILES := $(ZSTDDIR)/compress/zstd_compress.c $(ZSTDDIR)/compress/fse_compress.c $(ZSTDDIR)/compress/huf_compress.c -ZSTDDECOMP_FILES := $(ZSTDDIR)/decompress/huf_decompress.c -ZSTD_FILES := $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES) +# gtest libraries need to go before "-lpthread" because they depend on it. 
+GTEST_LIB = -L googletest/build/googlemock/gtest +LIBS = $(GTEST_LIB) -lpthread + +# Compilation commands +LD_COMMAND = $(CXX) $^ $(ALL_LDFLAGS) $(LIBS) -o $@ +CC_COMMAND = $(CC) $(DEPFLAGS) $(ALL_CFLAGS) -c $< -o $@ +CXX_COMMAND = $(CXX) $(DEPFLAGS) $(ALL_CXXFLAGS) -c $< -o $@ + +# Get a list of all zstd files so we rebuild the static library when we need to +ZSTDCOMMON_FILES := $(wildcard $(ZSTDDIR)/common/*.c) \ + $(wildcard $(ZSTDDIR)/common/*.h) +ZSTDCOMP_FILES := $(wildcard $(ZSTDDIR)/compress/*.c) \ + $(wildcard $(ZSTDDIR)/compress/*.h) +ZSTDDECOMP_FILES := $(wildcard $(ZSTDDIR)/decompress/*.c) \ + $(wildcard $(ZSTDDIR)/decompress/*.h) +ZSTDPROG_FILES := $(wildcard $(PROGDIR)/*.c) \ + $(wildcard $(PROGDIR)/*.h) +ZSTD_FILES := $(wildcard $(ZSTDDIR)/*.h) \ + $(ZSTDDECOMP_FILES) $(ZSTDCOMMON_FILES) $(ZSTDCOMP_FILES) \ + $(ZSTDPROG_FILES) + +# List all the pzstd source files so we can determine their dependencies +PZSTD_SRCS := $(wildcard *.cpp) +PZSTD_TESTS := $(wildcard test/*.cpp) +UTILS_TESTS := $(wildcard utils/test/*.cpp) +ALL_SRCS := $(PZSTD_SRCS) $(PZSTD_TESTS) $(UTILS_TESTS) # Define *.exe as extension for Windows systems @@ -30,89 +79,168 @@ else EXT = endif -.PHONY: default all test clean test32 googletest googletest32 +# Standard targets +.PHONY: default +default: all -default: pzstd +.PHONY: check +check: + $(TESTPROG) ./utils/test/BufferTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./utils/test/RangeTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./utils/test/ScopeGuardTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./utils/test/ThreadPoolTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./utils/test/WorkQueueTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./test/OptionsTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./test/PzstdTest$(EXT) $(TESTFLAGS) -all: pzstd +.PHONY: install +install: PZSTD_CPPFLAGS += -DNDEBUG +install: pzstd$(EXT) + install -d -m 755 $(BINDIR)/ + install -m 755 pzstd$(EXT) $(BINDIR)/pzstd$(EXT) + +.PHONY: uninstall +uninstall: + $(RM) $(BINDIR)/pzstd$(EXT) + +# Targets for many 
different builds +.PHONY: all +all: PZSTD_CPPFLAGS += -DNDEBUG +all: pzstd$(EXT) tests roundtrip + +.PHONY: debug +debug: EXTRA_FLAGS += -g +debug: pzstd$(EXT) tests roundtrip + +.PHONY: tsan +tsan: PZSTD_CCXXFLAGS += -fsanitize=thread -fPIC +tsan: PZSTD_LDFLAGS += -fsanitize=thread -pie +tsan: debug + +.PHONY: asan +asan: EXTRA_FLAGS += -fsanitize=address +asan: debug + +.PHONY: ubsan +ubsan: EXTRA_FLAGS += -fsanitize=undefined +ubsan: debug + +.PHONY: all32 +all32: EXTRA_FLAGS += -m32 +all32: all + +.PHONY: debug32 +debug32: EXTRA_FLAGS += -m32 +debug32: debug + +.PHONY: asan32 +asan32: EXTRA_FLAGS += -m32 +asan32: asan + +.PHONY: tsan32 +tsan32: EXTRA_FLAGS += -m32 +tsan32: tsan + +.PHONY: ubsan32 +ubsan32: EXTRA_FLAGS += -m32 +ubsan32: ubsan + +# Run long round trip tests +.PHONY: roundtripcheck +roundtripcheck: roundtrip check + $(TESTPROG) ./test/RoundTripTest$(EXT) $(TESTFLAGS) + +# Build the main binary +pzstd$(EXT): main.o Options.o Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a + $(LD_COMMAND) + +# Target that depends on all the tests +.PHONY: tests +tests: EXTRA_FLAGS += -Wno-deprecated-declarations +tests: $(patsubst %,%$(EXT),$(basename $(PZSTD_TESTS) $(UTILS_TESTS))) + +# Build the round trip tests +.PHONY: roundtrip +roundtrip: EXTRA_FLAGS += -Wno-deprecated-declarations +roundtrip: test/RoundTripTest$(EXT) + +# Use the static library that zstd builds for simplicity and +# so we get the compiler options correct +$(ZSTDDIR)/libzstd.a: $(ZSTD_FILES) + $(MAKE) -C $(ZSTDDIR) libzstd CFLAGS="$(ALL_CFLAGS)" LDFLAGS="$(ALL_LDFLAGS)" -libzstd.a: $(ZSTD_FILES) - $(MAKE) -C $(ZSTDDIR) libzstd - @cp $(ZSTDDIR)/libzstd.a . 
+# Rules to build the tests +test/RoundTripTest$(EXT): test/RoundTripTest.o $(PROGDIR)/datagen.o Options.o \ + Pzstd.o SkippableFrame.o $(ZSTDDIR)/libzstd.a + $(LD_COMMAND) -Pzstd.o: Pzstd.h Pzstd.cpp ErrorHolder.h utils/*.h - $(CXX) $(FLAGS) -c Pzstd.cpp -o $@ +test/%Test$(EXT): GTEST_LIB += -lgtest -lgtest_main +test/%Test$(EXT): test/%Test.o $(PROGDIR)/datagen.o Options.o Pzstd.o \ + SkippableFrame.o $(ZSTDDIR)/libzstd.a + $(LD_COMMAND) -SkippableFrame.o: SkippableFrame.h SkippableFrame.cpp utils/*.h - $(CXX) $(FLAGS) -c SkippableFrame.cpp -o $@ +utils/test/%Test$(EXT): GTEST_LIB += -lgtest -lgtest_main +utils/test/%Test$(EXT): utils/test/%Test.o + $(LD_COMMAND) -Options.o: Options.h Options.cpp - $(CXX) $(FLAGS) -c Options.cpp -o $@ -main.o: main.cpp *.h utils/*.h - $(CXX) $(FLAGS) -c main.cpp -o $@ - -pzstd: Pzstd.o SkippableFrame.o Options.o main.o libzstd.a - $(CXX) $(FLAGS) $^ -o $@$(EXT) -lpthread - -libzstd32.a: $(ZSTD_FILES) - $(MAKE) -C $(ZSTDDIR) libzstd MOREFLAGS="-m32" - @cp $(ZSTDDIR)/libzstd.a libzstd32.a - -Pzstd32.o: Pzstd.h Pzstd.cpp ErrorHolder.h utils/*.h - $(CXX) -m32 $(FLAGS) -c Pzstd.cpp -o $@ - -SkippableFrame32.o: SkippableFrame.h SkippableFrame.cpp utils/*.h - $(CXX) -m32 $(FLAGS) -c SkippableFrame.cpp -o $@ - -Options32.o: Options.h Options.cpp - $(CXX) -m32 $(FLAGS) -c Options.cpp -o $@ - -main32.o: main.cpp *.h utils/*.h - $(CXX) -m32 $(FLAGS) -c main.cpp -o $@ - -pzstd32: Pzstd32.o SkippableFrame32.o Options32.o main32.o libzstd32.a - $(CXX) -m32 $(FLAGS) $^ -o $@$(EXT) -lpthread +GTEST_CMAKEFLAGS = +# Install googletest +.PHONY: googletest +googletest: PZSTD_CCXXFLAGS += -fPIC googletest: @$(RM) -rf googletest @git clone https://github.com/google/googletest @mkdir -p googletest/build - @cd googletest/build && cmake .. && make + @cd googletest/build && cmake $(GTEST_CMAKEFLAGS) -DCMAKE_CXX_FLAGS="$(ALL_CXXFLAGS)" .. 
&& $(MAKE) -googletest32: - @$(RM) -rf googletest - @git clone https://github.com/google/googletest - @mkdir -p googletest/build - @cd googletest/build && cmake .. -DCMAKE_CXX_FLAGS=-m32 && make - -googletest-mingw64: - $(RM) -rf googletest - git clone https://github.com/google/googletest - mkdir -p googletest/build - cd googletest/build && cmake -G "MSYS Makefiles" .. && $(MAKE) - -test: - $(MAKE) libzstd.a - $(MAKE) pzstd MOREFLAGS="-Wall -Wextra -pedantic -Werror" - $(MAKE) -C utils/test clean - $(MAKE) -C utils/test test MOREFLAGS="-Wall -Wextra -pedantic -Werror" - $(MAKE) -C test clean - $(MAKE) -C test test MOREFLAGS="-Wall -Wextra -pedantic -Werror" - -test32: - $(MAKE) libzstd.a MOREFLAGS="-m32" - $(MAKE) pzstd MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror" - $(MAKE) -C utils/test clean - $(MAKE) -C utils/test test MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror" - $(MAKE) -C test clean - $(MAKE) -C test test MOREFLAGS="-m32 -Wall -Wextra -pedantic -Werror" +.PHONY: googletest32 +googletest32: PZSTD_CCXXFLAGS += -m32 +googletest32: googletest +.PHONY: googletest-mingw64 +googletest-mingw64: GTEST_CMAKEFLAGS += -G "MSYS Makefiles" +googletest-mingw64: googletest +.PHONY: clean clean: + $(RM) -f *.o pzstd$(EXT) *.Td *.d + $(RM) -f test/*.o test/*Test$(EXT) test/*.Td test/*.d + $(RM) -f utils/test/*.o utils/test/*Test$(EXT) utils/test/*.Td utils/test/*.d + $(RM) -f $(PROGDIR)/*.o $(PROGDIR)/*.Td $(PROGDIR)/*.d $(MAKE) -C $(ZSTDDIR) clean - $(MAKE) -C utils/test clean - $(MAKE) -C test clean - @$(RM) -rf libzstd.a *.o pzstd$(EXT) pzstd32$(EXT) @echo Cleaning completed + + +# Cancel implicit rules +%.o: %.c +%.o: %.cpp + +# Object file rules +%.o: %.c + $(CC_COMMAND) + $(POSTCOMPILE) + +$(PROGDIR)/%.o: $(PROGDIR)/%.c + $(CC_COMMAND) + $(POSTCOMPILE) + +%.o: %.cpp + $(CXX_COMMAND) + $(POSTCOMPILE) + +test/%.o: test/%.cpp + $(CXX_COMMAND) + $(POSTCOMPILE) + +utils/test/%.o: utils/test/%.cpp + $(CXX_COMMAND) + $(POSTCOMPILE) + +# Dependency file stuff 
+.PRECIOUS: %.d test/%.d utils/test/%.d + +# Include rules that specify header file dependencies +-include $(patsubst %,%.d,$(basename $(ALL_SRCS))) diff --git a/contrib/pzstd/test/Makefile b/contrib/pzstd/test/Makefile deleted file mode 100644 index 4f6ba9997..000000000 --- a/contrib/pzstd/test/Makefile +++ /dev/null @@ -1,48 +0,0 @@ -# ########################################################################## -# Copyright (c) 2016-present, Facebook, Inc. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. An additional grant -# of patent rights can be found in the PATENTS file in the same directory. -# ########################################################################## - -# Define *.exe as extension for Windows systems -ifneq (,$(filter Windows%,$(OS))) -EXT =.exe -else -EXT = -endif - -PZSTDDIR = .. -PROGDIR = ../../../programs -ZSTDDIR = ../../../lib - -# Set GTEST_INC and GTEST_LIB to work with your install of gtest -GTEST_INC ?= -isystem $(PZSTDDIR)/googletest/googletest/include -GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest -GTEST_FLAGS = $(GTEST_INC) $(GTEST_LIB) -CPPFLAGS = -I$(PZSTDDIR) -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I. 
- -CXXFLAGS ?= -O3 -CXXFLAGS += -std=c++11 -Wno-deprecated-declarations -CXXFLAGS += $(MOREFLAGS) -FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS) - -datagen.o: $(PROGDIR)/datagen.* - $(CC) $(CPPFLAGS) -O3 $(MOREFLAGS) $(LDFLAGS) -Wno-long-long -Wno-variadic-macros $(PROGDIR)/datagen.c -c -o $@ - -%: %.cpp *.h datagen.o - $(CXX) $(FLAGS) $@.cpp datagen.o $(PZSTDDIR)/Pzstd.o $(PZSTDDIR)/SkippableFrame.o $(PZSTDDIR)/Options.o $(PZSTDDIR)/libzstd.a -o $@$(EXT) $(GTEST_FLAGS) -lgtest -lgtest_main -lpthread - -.PHONY: test clean - -test: OptionsTest PzstdTest - @./OptionsTest$(EXT) - @./PzstdTest$(EXT) - -roundtrip: RoundTripTest - @./RoundTripTest$(EXT) - -clean: - @rm -f datagen.o OptionsTest PzstdTest RoundTripTest diff --git a/contrib/pzstd/utils/test/Makefile b/contrib/pzstd/utils/test/Makefile deleted file mode 100644 index b9ea73e32..000000000 --- a/contrib/pzstd/utils/test/Makefile +++ /dev/null @@ -1,42 +0,0 @@ -# ########################################################################## -# Copyright (c) 2016-present, Facebook, Inc. -# All rights reserved. -# -# This source code is licensed under the BSD-style license found in the -# LICENSE file in the root directory of this source tree. An additional grant -# of patent rights can be found in the PATENTS file in the same directory. -# ########################################################################## - -# Define *.exe as extension for Windows systems -ifneq (,$(filter Windows%,$(OS))) -EXT =.exe -else -EXT = -endif - -PZSTDDIR = ../.. 
- -# Set GTEST_INC and GTEST_LIB to work with your install of gtest -GTEST_INC ?= -isystem $(PZSTDDIR)/googletest/googletest/include -GTEST_LIB ?= -L $(PZSTDDIR)/googletest/build/googlemock/gtest - -CPPFLAGS = -I$(PZSTDDIR) $(GTEST_INC) $(GTEST_LIB) -CXXFLAGS ?= -O3 -CXXFLAGS += -std=c++11 -CXXFLAGS += $(MOREFLAGS) -FLAGS = $(CPPFLAGS) $(CXXFLAGS) $(LDFLAGS) - -%: %.cpp - $(CXX) $(FLAGS) $^ -o $@$(EXT) -lgtest -lgtest_main -lpthread - -.PHONY: test clean - -test: BufferTest RangeTest ScopeGuardTest ThreadPoolTest WorkQueueTest - @./BufferTest$(EXT) - @./RangeTest$(EXT) - @./ScopeGuardTest$(EXT) - @./ThreadPoolTest$(EXT) - @./WorkQueueTest$(EXT) - -clean: - @rm -f BufferTest RangeTest ScopeGuardTest ThreadPoolTest WorkQueueTest From 96e0702c00c1ec9a9888a2d113b38d926681d3e0 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Thu, 6 Oct 2016 21:31:16 -0700 Subject: [PATCH 05/13] [pzstd] Print the correct width ints --- contrib/pzstd/Pzstd.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index 59a2496c4..68f5bb977 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -15,6 +15,7 @@ #include "utils/WorkQueue.h" #include <chrono> +#include <cinttypes> #include <cstddef> #include <cstdio> #include <memory> @@ -104,11 +105,12 @@ static std::uint64_t handleOneInput(const Options &options, if (!options.decompress) { double ratio = static_cast<double>(bytesWritten) / static_cast<double>(bytesRead + !bytesRead); - std::fprintf(stderr, "%-20s :%6.2f%% (%6llu => %6llu bytes, %s)\n", + std::fprintf(stderr, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64 + " bytes, %s)\n", inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten, outputFileName.c_str()); } else { - std::fprintf(stderr, "%-20s: %llu bytes \n", + std::fprintf(stderr, "%-20s: %" PRIu64 " bytes \n", inputFileName.c_str(),bytesWritten); } } From 4cb5e90a5c7ce0162c2b8dbf2e055e3951fd75fb Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Thu, 6 Oct 2016 21:32:06 -0700 Subject: [PATCH 06/13]
[pzstd] Add asan and tsan tests to travis gcc-6 tsan is buggy. It fails to use the correct linker. It is also broken with `-pie` with linux kernels newer than 4.1, but previous versions require `-pie`... --- .travis.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/.travis.yml b/.travis.yml index f328f480d..0c8960725 100644 --- a/.travis.yml +++ b/.travis.yml @@ -55,6 +55,20 @@ matrix: packages: - libc6-dev-i386 - gcc-multilib + - os: linux + sudo: required + install: + - export CXX="g++-6" CC="gcc-6" + - export LDFLAGS="-fuse-ld=gold" + - export TESTFLAGS='--gtest_filter=-*ExtremelyLarge*' + env: PLATFORM="Ubuntu 12.04" CMD='cd contrib/pzstd && make googletest && make tsan && make check && make clean && make asan && make check && make clean && cd ../..' + addons: + apt: + sources: + - ubuntu-toolchain-r-test + packages: + - gcc-6 + - g++-6 # Ubuntu 14.04 LTS Server Edition 64 bit - os: linux dist: trusty From 9b603ee284878039c076e1cc88215e4749d126dc Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Fri, 7 Oct 2016 15:04:34 -0700 Subject: [PATCH 07/13] [pzstd] Run the reading thread separately --- contrib/pzstd/Pzstd.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index 68f5bb977..db9b8c85b 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -62,19 +62,18 @@ static std::uint64_t handleOneInput(const Options &options, ErrorHolder &errorHolder) { auto inputSize = fileSizeOrZero(inputFile); // WorkQueue outlives ThreadPool so in the case of error we are certain - // we don't accidently try to call push() on it after it is destroyed. + // we don't accidently try to call push() on it after it is destroyed WorkQueue> outs{options.numThreads + 1}; std::uint64_t bytesRead; std::uint64_t bytesWritten; { - // Initialize the thread pool with numThreads + 1 - // We add one because the read thread spends most of its time waiting. 
- // This also sets the minimum number of threads to 2, so the algorithm - // doesn't deadlock. - ThreadPool executor(options.numThreads + 1); + // Initialize the (de)compression thread pool with numThreads + ThreadPool executor(options.numThreads); + // Run the reader thread on an extra thread + ThreadPool readExecutor(1); if (!options.decompress) { // Add a job that reads the input and starts all the compression jobs - executor.add( + readExecutor.add( [&errorHolder, &outs, &executor, inputFd, inputSize, &options, &bytesRead] { bytesRead = asyncCompressChunks( @@ -91,7 +90,7 @@ static std::uint64_t handleOneInput(const Options &options, options.verbosity); } else { // Add a job that reads the input and starts all the decompression jobs - executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] { + readExecutor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] { bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd); }); // Start writing From 48294b57c359bc0cfc6560df89f34507658bdc3d Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Wed, 12 Oct 2016 15:18:16 -0700 Subject: [PATCH 08/13] [pzstd] Put ErrorHolder into SharedState --- contrib/pzstd/Pzstd.cpp | 74 +++++++++++++++++++++-------------------- contrib/pzstd/Pzstd.h | 18 ++++++---- 2 files changed, 50 insertions(+), 42 deletions(-) diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index db9b8c85b..70c0515b3 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -59,7 +59,7 @@ static std::uint64_t handleOneInput(const Options &options, FILE* inputFd, const std::string &outputFile, FILE* outputFd, - ErrorHolder &errorHolder) { + SharedState& state) { auto inputSize = fileSizeOrZero(inputFile); // WorkQueue outlives ThreadPool so in the case of error we are certain // we don't accidently try to call push() on it after it is destroyed @@ -74,10 +74,9 @@ static std::uint64_t handleOneInput(const Options &options, if (!options.decompress) { // Add a job 
that reads the input and starts all the compression jobs readExecutor.add( - [&errorHolder, &outs, &executor, inputFd, inputSize, &options, - &bytesRead] { + [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] { bytesRead = asyncCompressChunks( - errorHolder, + state, outs, executor, inputFd, @@ -86,19 +85,19 @@ static std::uint64_t handleOneInput(const Options &options, options.determineParameters()); }); // Start writing - bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress, + bytesWritten = writeFile(state, outs, outputFd, options.decompress, options.verbosity); } else { // Add a job that reads the input and starts all the decompression jobs - readExecutor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] { - bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd); + readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] { + bytesRead = asyncDecompressFrames(state, outs, executor, inputFd); }); // Start writing - bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress, + bytesWritten = writeFile(state, outs, outputFd, options.decompress, options.verbosity); } } - if (options.verbosity > 1 && !errorHolder.hasError()) { + if (options.verbosity > 1 && !state.errorHolder.hasError()) { std::string inputFileName = inputFile == "-" ? "stdin" : inputFile; std::string outputFileName = outputFile == "-" ? 
"stdout" : outputFile; if (!options.decompress) { @@ -176,53 +175,53 @@ int pzstdMain(const Options &options) { int returnCode = 0; for (const auto& input : options.inputFiles) { // Setup the error holder - ErrorHolder errorHolder; + SharedState state; auto printErrorGuard = makeScopeGuard([&] { - if (errorHolder.hasError()) { + if (state.errorHolder.hasError()) { returnCode = 1; if (options.verbosity > 0) { std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(), - errorHolder.getError().c_str()); + state.errorHolder.getError().c_str()); } } }); // Open the input file - auto inputFd = openInputFile(input, errorHolder); + auto inputFd = openInputFile(input, state.errorHolder); if (inputFd == nullptr) { continue; } auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); }); // Open the output file auto outputFile = options.getOutputFile(input); - if (!errorHolder.check(outputFile != "", + if (!state.errorHolder.check(outputFile != "", "Input file does not have extension .zst")) { continue; } - auto outputFd = openOutputFile(options, outputFile, errorHolder); + auto outputFd = openOutputFile(options, outputFile, state.errorHolder); if (outputFd == nullptr) { continue; } auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); }); // (de)compress the file - handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder); - if (errorHolder.hasError()) { + handleOneInput(options, input, inputFd, outputFile, outputFd, state); + if (state.errorHolder.hasError()) { continue; } // Delete the input file if necessary if (!options.keepSource) { // Be sure that we are done and have written everything before we delete - if (!errorHolder.check(std::fclose(inputFd) == 0, + if (!state.errorHolder.check(std::fclose(inputFd) == 0, "Failed to close input file")) { continue; } closeInputGuard.dismiss(); - if (!errorHolder.check(std::fclose(outputFd) == 0, + if (!state.errorHolder.check(std::fclose(outputFd) == 0, "Failed to close output file")) { 
continue; } closeOutputGuard.dismiss(); if (std::remove(input.c_str()) != 0) { - errorHolder.setError("Failed to remove input file"); + state.errorHolder.setError("Failed to remove input file"); continue; } } @@ -268,18 +267,19 @@ Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) { /** * Stream chunks of input from `in`, compress it, and stream it out to `out`. * - * @param errorHolder Used to report errors and check if an error occured + * @param state The shared state * @param in Queue that we `pop()` input buffers from * @param out Queue that we `push()` compressed output buffers to * @param maxInputSize An upper bound on the size of the input * @param parameters The zstd parameters to use for compression */ static void compress( - ErrorHolder& errorHolder, + SharedState& state, std::shared_ptr in, std::shared_ptr out, size_t maxInputSize, ZSTD_parameters parameters) { + auto& errorHolder = state.errorHolder; auto guard = makeScopeGuard([&] { out->finish(); }); // Initialize the CCtx std::unique_ptr ctx( @@ -395,7 +395,7 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd, } std::uint64_t asyncCompressChunks( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& chunks, ThreadPool& executor, FILE* fd, @@ -409,23 +409,23 @@ std::uint64_t asyncCompressChunks( // independently. size_t step = calculateStep(size, numThreads, params); auto status = FileStatus::Continue; - while (status == FileStatus::Continue && !errorHolder.hasError()) { + while (status == FileStatus::Continue && !state.errorHolder.hasError()) { // Make a new input queue that we will put the chunk's input data into. auto in = std::make_shared(); auto inGuard = makeScopeGuard([&] { in->finish(); }); // Make a new output queue that compress will put the compressed data into. 
auto out = std::make_shared(); // Start compression in the thread pool - executor.add([&errorHolder, in, out, step, params] { + executor.add([&state, in, out, step, params] { return compress( - errorHolder, std::move(in), std::move(out), step, params); + state, std::move(in), std::move(out), step, params); }); // Pass the output queue to the writer thread. chunks.push(std::move(out)); // Fill the input queue for the compression job we just started status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead); } - errorHolder.check(status != FileStatus::Error, "Error reading input"); + state.errorHolder.check(status != FileStatus::Error, "Error reading input"); return bytesRead; } @@ -433,15 +433,16 @@ std::uint64_t asyncCompressChunks( * Decompress a frame, whose data is streamed into `in`, and stream the output * to `out`. * - * @param errorHolder Used to report errors and check if an error occured + * @param state The shared state * @param in Queue that we `pop()` input buffers from. It contains * exactly one compressed frame. * @param out Queue that we `push()` decompressed output buffers to */ static void decompress( - ErrorHolder& errorHolder, + SharedState& state, std::shared_ptr in, std::shared_ptr out) { + auto& errorHolder = state.errorHolder; auto guard = makeScopeGuard([&] { out->finish(); }); // Initialize the DCtx std::unique_ptr ctx( @@ -508,7 +509,7 @@ static void decompress( } std::uint64_t asyncDecompressFrames( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& frames, ThreadPool& executor, FILE* fd) { @@ -521,7 +522,7 @@ std::uint64_t asyncDecompressFrames( // Otherwise, we will decompress using only one decompression task. const size_t chunkSize = ZSTD_DStreamInSize(); auto status = FileStatus::Continue; - while (status == FileStatus::Continue && !errorHolder.hasError()) { + while (status == FileStatus::Continue && !state.errorHolder.hasError()) { // Make a new input queue that we will put the frames's bytes into. 
auto in = std::make_shared(); auto inGuard = makeScopeGuard([&] { in->finish(); }); @@ -550,15 +551,15 @@ std::uint64_t asyncDecompressFrames( out->setMaxSize(64); } // Start decompression in the thread pool - executor.add([&errorHolder, in, out] { - return decompress(errorHolder, std::move(in), std::move(out)); + executor.add([&state, in, out] { + return decompress(state, std::move(in), std::move(out)); }); // Pass the output queue to the writer thread frames.push(std::move(out)); if (frameSize == 0) { // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted // Pass the rest of the source to this decompression task - while (status == FileStatus::Continue && !errorHolder.hasError()) { + while (status == FileStatus::Continue && !state.errorHolder.hasError()) { status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead); } break; @@ -566,7 +567,7 @@ std::uint64_t asyncDecompressFrames( // Fill the input queue for the decompression job we just started status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead); } - errorHolder.check(status != FileStatus::Error, "Error reading input"); + state.errorHolder.check(status != FileStatus::Error, "Error reading input"); return totalBytesRead; } @@ -598,11 +599,12 @@ void updateWritten(int verbosity, std::uint64_t bytesWritten) { } std::uint64_t writeFile( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& outs, FILE* outputFd, bool decompress, int verbosity) { + auto& errorHolder = state.errorHolder; auto lineClearGuard = makeScopeGuard([verbosity] { if (verbosity > 1) { std::fprintf(stderr, "\r%79s\r", ""); diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h index fe44ccfde..469c20cd4 100644 --- a/contrib/pzstd/Pzstd.h +++ b/contrib/pzstd/Pzstd.h @@ -12,6 +12,7 @@ #include "Options.h" #include "utils/Buffer.h" #include "utils/Range.h" +#include "utils/ResourcePool.h" #include "utils/ThreadPool.h" #include "utils/WorkQueue.h" #define ZSTD_STATIC_LINKING_ONLY @@ -32,12 +33,17 @@ 
namespace pzstd { */ int pzstdMain(const Options& options); +class SharedState { + public: + ErrorHolder errorHolder; +}; + /** * Streams input from `fd`, breaks input up into chunks, and compresses each * chunk independently. Output of each chunk gets streamed to a queue, and * the output queues get put into `chunks` in order. * - * @param errorHolder Used to report errors and coordinate early shutdown + * @param state The shared state * @param chunks Each compression jobs output queue gets `pushed()` here * as soon as it is available * @param executor The thread pool to run compression jobs in @@ -48,7 +54,7 @@ int pzstdMain(const Options& options); * @returns The number of bytes read from the file */ std::uint64_t asyncCompressChunks( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& chunks, ThreadPool& executor, FILE* fd, @@ -62,7 +68,7 @@ std::uint64_t asyncCompressChunks( * decompression job. Output of each frame gets streamed to a queue, and * the output queues get put into `frames` in order. * - * @param errorHolder Used to report errors and coordinate early shutdown + * @param state The shared state * @param frames Each decompression jobs output queue gets `pushed()` here * as soon as it is available * @param executor The thread pool to run compression jobs in @@ -70,7 +76,7 @@ std::uint64_t asyncCompressChunks( * @returns The number of bytes read from the file */ std::uint64_t asyncDecompressFrames( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& frames, ThreadPool& executor, FILE* fd); @@ -79,7 +85,7 @@ std::uint64_t asyncDecompressFrames( * Streams input in from each queue in `outs` in order, and writes the data to * `outputFd`. * - * @param errorHolder Used to report errors and coordinate early exit + * @param state The shared state * @param outs A queue of output queues, one for each * (de)compression job. 
* @param outputFd The file descriptor to write to @@ -88,7 +94,7 @@ std::uint64_t asyncDecompressFrames( * @returns The number of bytes written */ std::uint64_t writeFile( - ErrorHolder& errorHolder, + SharedState& state, WorkQueue>& outs, FILE* outputFd, bool decompress, From e9e151ce31835accd17ed5dcdf87ba18c558606d Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Wed, 12 Oct 2016 17:23:38 -0700 Subject: [PATCH 09/13] [pzstd] Reuse ZSTD_{C,D}Stream --- contrib/pzstd/Makefile | 1 + contrib/pzstd/Pzstd.cpp | 22 ++--- contrib/pzstd/Pzstd.h | 39 ++++++++ contrib/pzstd/utils/ResourcePool.h | 96 +++++++++++++++++++ contrib/pzstd/utils/test/ResourcePoolTest.cpp | 72 ++++++++++++++ 5 files changed, 217 insertions(+), 13 deletions(-) create mode 100644 contrib/pzstd/utils/ResourcePool.h create mode 100644 contrib/pzstd/utils/test/ResourcePoolTest.cpp diff --git a/contrib/pzstd/Makefile b/contrib/pzstd/Makefile index b998c1fff..4f63887d3 100644 --- a/contrib/pzstd/Makefile +++ b/contrib/pzstd/Makefile @@ -87,6 +87,7 @@ default: all check: $(TESTPROG) ./utils/test/BufferTest$(EXT) $(TESTFLAGS) $(TESTPROG) ./utils/test/RangeTest$(EXT) $(TESTFLAGS) + $(TESTPROG) ./utils/test/ResourcePoolTest$(EXT) $(TESTFLAGS) $(TESTPROG) ./utils/test/ScopeGuardTest$(EXT) $(TESTFLAGS) $(TESTPROG) ./utils/test/ThreadPoolTest$(EXT) $(TESTFLAGS) $(TESTPROG) ./utils/test/WorkQueueTest$(EXT) $(TESTFLAGS) diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index 70c0515b3..6b3d27b3e 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -173,9 +173,9 @@ static FILE *openOutputFile(const Options &options, int pzstdMain(const Options &options) { int returnCode = 0; + SharedState state(options.decompress, options.determineParameters()); for (const auto& input : options.inputFiles) { - // Setup the error holder - SharedState state; + // Setup the shared state auto printErrorGuard = makeScopeGuard([&] { if (state.errorHolder.hasError()) { returnCode = 1; @@ -271,24 +271,21 @@ 
Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) { * @param in Queue that we `pop()` input buffers from * @param out Queue that we `push()` compressed output buffers to * @param maxInputSize An upper bound on the size of the input - * @param parameters The zstd parameters to use for compression */ static void compress( SharedState& state, std::shared_ptr in, std::shared_ptr out, - size_t maxInputSize, - ZSTD_parameters parameters) { + size_t maxInputSize) { auto& errorHolder = state.errorHolder; auto guard = makeScopeGuard([&] { out->finish(); }); // Initialize the CCtx - std::unique_ptr ctx( - ZSTD_createCStream(), ZSTD_freeCStream); + auto ctx = state.cStreamPool->get(); if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) { return; } { - auto err = ZSTD_initCStream_advanced(ctx.get(), nullptr, 0, parameters, 0); + auto err = ZSTD_resetCStream(ctx.get(), 0); if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) { return; } @@ -416,9 +413,9 @@ std::uint64_t asyncCompressChunks( // Make a new output queue that compress will put the compressed data into. auto out = std::make_shared(); // Start compression in the thread pool - executor.add([&state, in, out, step, params] { + executor.add([&state, in, out, step] { return compress( - state, std::move(in), std::move(out), step, params); + state, std::move(in), std::move(out), step); }); // Pass the output queue to the writer thread. 
chunks.push(std::move(out)); @@ -445,13 +442,12 @@ static void decompress( auto& errorHolder = state.errorHolder; auto guard = makeScopeGuard([&] { out->finish(); }); // Initialize the DCtx - std::unique_ptr ctx( - ZSTD_createDStream(), ZSTD_freeDStream); + auto ctx = state.dStreamPool->get(); if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) { return; } { - auto err = ZSTD_initDStream(ctx.get()); + auto err = ZSTD_resetDStream(ctx.get()); if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) { return; } diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h index 469c20cd4..b02fe7b19 100644 --- a/contrib/pzstd/Pzstd.h +++ b/contrib/pzstd/Pzstd.h @@ -35,7 +35,46 @@ int pzstdMain(const Options& options); class SharedState { public: + SharedState(bool decompress, ZSTD_parameters parameters) { + if (!decompress) { + cStreamPool.reset(new ResourcePool{ + [parameters]() -> ZSTD_CStream* { + auto zcs = ZSTD_createCStream(); + if (zcs) { + auto err = ZSTD_initCStream_advanced( + zcs, nullptr, 0, parameters, 0); + if (ZSTD_isError(err)) { + ZSTD_freeCStream(zcs); + return nullptr; + } + } + return zcs; + }, + [](ZSTD_CStream *zcs) { + ZSTD_freeCStream(zcs); + }}); + } else { + dStreamPool.reset(new ResourcePool{ + []() -> ZSTD_DStream* { + auto zds = ZSTD_createDStream(); + if (zds) { + auto err = ZSTD_initDStream(zds); + if (ZSTD_isError(err)) { + ZSTD_freeDStream(zds); + return nullptr; + } + } + return zds; + }, + [](ZSTD_DStream *zds) { + ZSTD_freeDStream(zds); + }}); + } + } + ErrorHolder errorHolder; + std::unique_ptr> cStreamPool; + std::unique_ptr> dStreamPool; }; /** diff --git a/contrib/pzstd/utils/ResourcePool.h b/contrib/pzstd/utils/ResourcePool.h new file mode 100644 index 000000000..ed011306b --- /dev/null +++ b/contrib/pzstd/utils/ResourcePool.h @@ -0,0 +1,96 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. 
+ * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#pragma once + +#include +#include +#include +#include +#include + +namespace pzstd { + +/** + * An unbounded pool of resources. + * A `ResourcePool` requires a factory function that allocates `T*` and + * a free function that frees a `T*`. + * Calling `ResourcePool::get()` will give you a new `ResourcePool::UniquePtr` + * to a `T`, and when it goes out of scope the resource will be returned to the + * pool. + * The `ResourcePool` *must* survive longer than any resources it hands out. + * Remember that `ResourcePool` hands out mutable `T`s, so make sure to clean + * up the resource before or after every use. + */ +template +class ResourcePool { + public: + class Deleter; + using Factory = std::function; + using Free = std::function; + using UniquePtr = std::unique_ptr; + + private: + std::mutex mutex_; + Factory factory_; + Free free_; + std::vector resources_; + unsigned inUse_; + + public: + /** + * Creates a `ResourcePool`. + * + * @param factory The function to use to create new resources. + * @param free The function to use to free resources created by `factory`. + */ + ResourcePool(Factory factory, Free free) + : factory_(std::move(factory)), free_(std::move(free)), inUse_(0) {} + + /** + * @returns A unique pointer to a resource. The resource is null iff + * there are no available resources and `factory()` returns null.
+ */ + UniquePtr get() { + std::lock_guard lock(mutex_); + if (!resources_.empty()) { + UniquePtr resource{resources_.back(), Deleter{*this}}; + resources_.pop_back(); + ++inUse_; + return resource; + } + UniquePtr resource{factory_(), Deleter{*this}}; + ++inUse_; + return resource; + } + + ~ResourcePool() noexcept { + assert(inUse_ == 0); + for (const auto resource : resources_) { + free_(resource); + } + } + + class Deleter { + ResourcePool *pool_; + public: + explicit Deleter(ResourcePool &pool) : pool_(&pool) {} + + void operator() (T *resource) { + std::lock_guard lock(pool_->mutex_); + // Make sure we don't put null resources into the pool + if (resource) { + pool_->resources_.push_back(resource); + } + assert(pool_->inUse_ > 0); + --pool_->inUse_; + } + }; +}; + +} diff --git a/contrib/pzstd/utils/test/ResourcePoolTest.cpp b/contrib/pzstd/utils/test/ResourcePoolTest.cpp new file mode 100644 index 000000000..a6a86b345 --- /dev/null +++ b/contrib/pzstd/utils/test/ResourcePoolTest.cpp @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. 
+ */ +#include "utils/ResourcePool.h" + +#include +#include +#include + +using namespace pzstd; + +TEST(ResourcePool, FullTest) { + unsigned numCreated = 0; + unsigned numDeleted = 0; + { + ResourcePool pool( + [&numCreated] { ++numCreated; return new int{5}; }, + [&numDeleted](int *x) { ++numDeleted; delete x; }); + + { + auto i = pool.get(); + EXPECT_EQ(5, *i); + *i = 6; + } + { + auto i = pool.get(); + EXPECT_EQ(6, *i); + auto j = pool.get(); + EXPECT_EQ(5, *j); + *j = 7; + } + { + auto i = pool.get(); + EXPECT_EQ(6, *i); + auto j = pool.get(); + EXPECT_EQ(7, *j); + } + } + EXPECT_EQ(2, numCreated); + EXPECT_EQ(numCreated, numDeleted); +} + +TEST(ResourcePool, ThreadSafe) { + std::atomic numCreated{0}; + std::atomic numDeleted{0}; + { + ResourcePool pool( + [&numCreated] { ++numCreated; return new int{0}; }, + [&numDeleted](int *x) { ++numDeleted; delete x; }); + auto push = [&pool] { + for (int i = 0; i < 100; ++i) { + auto x = pool.get(); + ++*x; + } + }; + std::thread t1{push}; + std::thread t2{push}; + t1.join(); + t2.join(); + + auto x = pool.get(); + auto y = pool.get(); + EXPECT_EQ(200, *x + *y); + } + EXPECT_GE(2, numCreated); + EXPECT_EQ(numCreated, numDeleted); +} From baa152e56e5f56e8991615998b3550b0a736d5cb Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Wed, 12 Oct 2016 19:02:27 -0700 Subject: [PATCH 10/13] [pzstd] Add Logger class --- contrib/pzstd/Logging.h | 72 +++++++++++++++++++++++++++++++++++++++++ contrib/pzstd/Pzstd.cpp | 66 +++++++++++++------------------------ contrib/pzstd/Pzstd.h | 11 ++++--- 3 files changed, 100 insertions(+), 49 deletions(-) create mode 100644 contrib/pzstd/Logging.h diff --git a/contrib/pzstd/Logging.h b/contrib/pzstd/Logging.h new file mode 100644 index 000000000..76c982ab2 --- /dev/null +++ b/contrib/pzstd/Logging.h @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. 
+ * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#pragma once + +#include +#include + +namespace pzstd { + +constexpr int ERROR = 1; +constexpr int INFO = 2; +constexpr int DEBUG = 3; +constexpr int VERBOSE = 4; + +class Logger { + std::mutex mutex_; + FILE* out_; + const int level_; + + using Clock = std::chrono::system_clock; + Clock::time_point lastUpdate_; + std::chrono::milliseconds refreshRate_; + + public: + explicit Logger(int level, FILE* out = stderr) + : out_(out), level_(level), lastUpdate_(Clock::now()), + refreshRate_(150) {} + + + bool logsAt(int level) { + return level <= level_; + } + + template + void operator()(int level, const char *fmt, Args... args) { + if (level > level_) { + return; + } + std::lock_guard lock(mutex_); + std::fprintf(out_, fmt, args...); + } + + template + void update(int level, const char *fmt, Args... 
args) { + if (level > level_) { + return; + } + std::lock_guard lock(mutex_); + auto now = Clock::now(); + if (now - lastUpdate_ > refreshRate_) { + lastUpdate_ = now; + std::fprintf(out_, "\r"); + std::fprintf(out_, fmt, args...); + } + } + + void clear(int level) { + if (level > level_) { + return; + } + std::lock_guard lock(mutex_); + std::fprintf(out_, "\r%79s\r", ""); + } +}; + +} diff --git a/contrib/pzstd/Pzstd.cpp b/contrib/pzstd/Pzstd.cpp index 6b3d27b3e..c5b4ce4cb 100644 --- a/contrib/pzstd/Pzstd.cpp +++ b/contrib/pzstd/Pzstd.cpp @@ -85,30 +85,28 @@ static std::uint64_t handleOneInput(const Options &options, options.determineParameters()); }); // Start writing - bytesWritten = writeFile(state, outs, outputFd, options.decompress, - options.verbosity); + bytesWritten = writeFile(state, outs, outputFd, options.decompress); } else { // Add a job that reads the input and starts all the decompression jobs readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] { bytesRead = asyncDecompressFrames(state, outs, executor, inputFd); }); // Start writing - bytesWritten = writeFile(state, outs, outputFd, options.decompress, - options.verbosity); + bytesWritten = writeFile(state, outs, outputFd, options.decompress); } } - if (options.verbosity > 1 && !state.errorHolder.hasError()) { + if (!state.errorHolder.hasError()) { std::string inputFileName = inputFile == "-" ? "stdin" : inputFile; std::string outputFileName = outputFile == "-" ? 
"stdout" : outputFile; if (!options.decompress) { double ratio = static_cast(bytesWritten) / static_cast(bytesRead + !bytesRead); - std::fprintf(stderr, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64 + state.log(INFO, "%-20s :%6.2f%% (%6" PRIu64 " => %6" PRIu64 " bytes, %s)\n", inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten, outputFileName.c_str()); } else { - std::fprintf(stderr, "%-20s: %" PRIu64 " bytes \n", + state.log(INFO, "%-20s: %" PRIu64 " bytes \n", inputFileName.c_str(),bytesWritten); } } @@ -138,7 +136,7 @@ static FILE *openInputFile(const std::string &inputFile, static FILE *openOutputFile(const Options &options, const std::string &outputFile, - ErrorHolder &errorHolder) { + SharedState& state) { if (outputFile == "-") { SET_BINARY_MODE(stdout); return stdout; @@ -148,41 +146,39 @@ static FILE *openOutputFile(const Options &options, auto outputFd = std::fopen(outputFile.c_str(), "rb"); if (outputFd != nullptr) { std::fclose(outputFd); - if (options.verbosity <= 1) { - errorHolder.setError("Output file exists"); + if (!state.log.logsAt(INFO)) { + state.errorHolder.setError("Output file exists"); return nullptr; } - std::fprintf( - stderr, + state.log( + INFO, "pzstd: %s already exists; do you wish to overwrite (y/n) ? 
", outputFile.c_str()); int c = getchar(); if (c != 'y' && c != 'Y') { - errorHolder.setError("Not overwritten"); + state.errorHolder.setError("Not overwritten"); return nullptr; } } } auto outputFd = std::fopen(outputFile.c_str(), "wb"); - if (!errorHolder.check( + if (!state.errorHolder.check( outputFd != nullptr, "Failed to open output file")) { - return 0; + return nullptr; } return outputFd; } int pzstdMain(const Options &options) { int returnCode = 0; - SharedState state(options.decompress, options.determineParameters()); + SharedState state(options); for (const auto& input : options.inputFiles) { // Setup the shared state auto printErrorGuard = makeScopeGuard([&] { if (state.errorHolder.hasError()) { returnCode = 1; - if (options.verbosity > 0) { - std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(), - state.errorHolder.getError().c_str()); - } + state.log(ERROR, "pzstd: %s: %s.\n", input.c_str(), + state.errorHolder.getError().c_str()); } }); // Open the input file @@ -197,7 +193,7 @@ int pzstdMain(const Options &options) { "Input file does not have extension .zst")) { continue; } - auto outputFd = openOutputFile(options, outputFile, state.errorHolder); + auto outputFd = openOutputFile(options, outputFile, state); if (outputFd == nullptr) { continue; } @@ -578,33 +574,14 @@ static bool writeData(ByteRange data, FILE* fd) { return true; } -void updateWritten(int verbosity, std::uint64_t bytesWritten) { - if (verbosity <= 1) { - return; - } - using Clock = std::chrono::system_clock; - static Clock::time_point then; - constexpr std::chrono::milliseconds refreshRate{150}; - - auto now = Clock::now(); - if (now - then > refreshRate) { - then = now; - std::fprintf(stderr, "\rWritten: %u MB ", - static_cast(bytesWritten >> 20)); - } -} - std::uint64_t writeFile( SharedState& state, WorkQueue>& outs, FILE* outputFd, - bool decompress, - int verbosity) { + bool decompress) { auto& errorHolder = state.errorHolder; - auto lineClearGuard = makeScopeGuard([verbosity] 
{ - if (verbosity > 1) { - std::fprintf(stderr, "\r%79s\r", ""); - } + auto lineClearGuard = makeScopeGuard([&state] { + state.log.clear(INFO); }); std::uint64_t bytesWritten = 0; std::shared_ptr out; @@ -630,7 +607,8 @@ std::uint64_t writeFile( return bytesWritten; } bytesWritten += buffer.size(); - updateWritten(verbosity, bytesWritten); + state.log.update(INFO, "Written: %u MB ", + static_cast(bytesWritten >> 20)); } } return bytesWritten; diff --git a/contrib/pzstd/Pzstd.h b/contrib/pzstd/Pzstd.h index b02fe7b19..9fb2c4884 100644 --- a/contrib/pzstd/Pzstd.h +++ b/contrib/pzstd/Pzstd.h @@ -9,6 +9,7 @@ #pragma once #include "ErrorHolder.h" +#include "Logging.h" #include "Options.h" #include "utils/Buffer.h" #include "utils/Range.h" @@ -35,8 +36,9 @@ int pzstdMain(const Options& options); class SharedState { public: - SharedState(bool decompress, ZSTD_parameters parameters) { - if (!decompress) { + SharedState(const Options& options) : log(options.verbosity) { + if (!options.decompress) { + auto parameters = options.determineParameters(); cStreamPool.reset(new ResourcePool{ [parameters]() -> ZSTD_CStream* { auto zcs = ZSTD_createCStream(); @@ -72,6 +74,7 @@ class SharedState { } } + Logger log; ErrorHolder errorHolder; std::unique_ptr> cStreamPool; std::unique_ptr> dStreamPool; @@ -129,13 +132,11 @@ std::uint64_t asyncDecompressFrames( * (de)compression job. * @param outputFd The file descriptor to write to * @param decompress Are we decompressing? 
- * @param verbosity The verbosity level to log at * @returns The number of bytes written */ std::uint64_t writeFile( SharedState& state, WorkQueue>& outs, FILE* outputFd, - bool decompress, - int verbosity); + bool decompress); } From 8c6c686d0ae9c25f249b41ea83a8f7909ccc0a38 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Thu, 13 Oct 2016 12:03:02 -0700 Subject: [PATCH 11/13] [pzstd] Fix latent bug in WorkQueue::push() --- contrib/pzstd/utils/ThreadPool.h | 2 +- contrib/pzstd/utils/WorkQueue.h | 3 ++- contrib/pzstd/utils/test/WorkQueueTest.cpp | 23 +++++++++++++++++----- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/contrib/pzstd/utils/ThreadPool.h b/contrib/pzstd/utils/ThreadPool.h index a1d1fc0b9..99b3ecfa5 100644 --- a/contrib/pzstd/utils/ThreadPool.h +++ b/contrib/pzstd/utils/ThreadPool.h @@ -27,7 +27,7 @@ class ThreadPool { explicit ThreadPool(std::size_t numThreads) { threads_.reserve(numThreads); for (std::size_t i = 0; i < numThreads; ++i) { - threads_.emplace_back([&] { + threads_.emplace_back([this] { std::function task; while (tasks_.pop(task)) { task(); diff --git a/contrib/pzstd/utils/WorkQueue.h b/contrib/pzstd/utils/WorkQueue.h index c46e6cbcf..780e5360f 100644 --- a/contrib/pzstd/utils/WorkQueue.h +++ b/contrib/pzstd/utils/WorkQueue.h @@ -54,12 +54,13 @@ class WorkQueue { /** * Push an item onto the work queue. Notify a single thread that work is * available. If `finish()` has been called, do nothing and return false. + * If `push()` returns false, then `item` has not been moved from. * * @param item Item to push onto the queue. * @returns True upon success, false if `finish()` has been called. An * item was pushed iff `push()` returns true.
*/ - bool push(T item) { + bool push(T&& item) { { std::unique_lock lock(mutex_); while (full() && !done_) { diff --git a/contrib/pzstd/utils/test/WorkQueueTest.cpp b/contrib/pzstd/utils/test/WorkQueueTest.cpp index ebf375a84..7f58ccb3f 100644 --- a/contrib/pzstd/utils/test/WorkQueueTest.cpp +++ b/contrib/pzstd/utils/test/WorkQueueTest.cpp @@ -10,6 +10,7 @@ #include "utils/WorkQueue.h" #include +#include #include #include #include @@ -64,7 +65,7 @@ TEST(WorkQueue, SPSC) { const int max = 100; for (int i = 0; i < 10; ++i) { - queue.push(i); + queue.push(int{i}); } std::thread thread([ &queue, max ] { @@ -80,7 +81,7 @@ TEST(WorkQueue, SPSC) { std::this_thread::yield(); for (int i = 10; i < max; ++i) { - queue.push(i); + queue.push(int{i}); } queue.finish(); @@ -97,7 +98,7 @@ TEST(WorkQueue, SPMC) { } for (int i = 0; i < 50; ++i) { - queue.push(i); + queue.push(int{i}); } queue.finish(); @@ -126,7 +127,7 @@ TEST(WorkQueue, MPMC) { pusherThreads.emplace_back( [ &queue, min, max ] { for (int i = min; i < max; ++i) { - queue.push(i); + queue.push(int{i}); } }); } @@ -212,7 +213,7 @@ TEST(WorkQueue, BoundedSizeMPMC) { pusherThreads.emplace_back( [ &queue, min, max ] { for (int i = min; i < max; ++i) { - queue.push(i); + queue.push(int{i}); } }); } @@ -231,6 +232,18 @@ TEST(WorkQueue, BoundedSizeMPMC) { } } +TEST(WorkQueue, FailedPush) { + WorkQueue> queue; + std::unique_ptr x(new int{5}); + EXPECT_TRUE(queue.push(std::move(x))); + EXPECT_EQ(nullptr, x); + queue.finish(); + x.reset(new int{6}); + EXPECT_FALSE(queue.push(std::move(x))); + EXPECT_NE(nullptr, x); + EXPECT_EQ(6, *x); +} + TEST(BufferWorkQueue, SizeCalculatedCorrectly) { { BufferWorkQueue queue; From ac4310d303f363df6b0ccea32cedbb6f6a893f00 Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Wed, 26 Oct 2016 00:09:39 -0700 Subject: [PATCH 12/13] [pzstd] Fix test mode for streaming input --- contrib/pzstd/Options.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git 
a/contrib/pzstd/Options.cpp b/contrib/pzstd/Options.cpp index ece8c0782..18c069eae 100644 --- a/contrib/pzstd/Options.cpp +++ b/contrib/pzstd/Options.cpp @@ -303,6 +303,12 @@ Options::Status Options::parse(int argc, const char **argv) { } // while (*options != 0); } // for (int i = 1; i < argc; ++i); + // Set options for test mode + if (test) { + outputFile = nullOutput; + keepSource = true; + } + // Input file defaults to standard input if not provided. if (localInputFiles.empty()) { localInputFiles.emplace_back(kStdIn); @@ -399,11 +405,6 @@ Options::Status Options::parse(int argc, const char **argv) { verbosity = 1; } - // Set options for test mode - if (test) { - outputFile = nullOutput; - keepSource = true; - } return Status::Success; } From 25086d9bc623cd7e1203d72e74f61c990ba46d0a Mon Sep 17 00:00:00 2001 From: Nick Terrell Date: Fri, 28 Oct 2016 14:24:15 -0700 Subject: [PATCH 13/13] [pzstd] Move -I flags to PZSTD_CPPFLAGS --- contrib/pzstd/Makefile | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/contrib/pzstd/Makefile b/contrib/pzstd/Makefile index 4f63887d3..2de50416c 100644 --- a/contrib/pzstd/Makefile +++ b/contrib/pzstd/Makefile @@ -24,16 +24,18 @@ TESTFLAGS ?= DEPFLAGS = -MMD -MP -MF $*.Td POSTCOMPILE = mv -f $*.Td $*.d -# CFLAGS, CXXFLAGS, and LDFLAGS are for the users to override +# CFLAGS, CXXFLAGS, CPPFLAGS, and LDFLAGS are for the users to override CFLAGS ?= -O3 -Wall -Wextra CXXFLAGS ?= -O3 -Wall -Wextra -pedantic -std=c++11 +CPPFLAGS ?= LDFLAGS ?= -# Googletest default flags +# Include flags PZSTD_INC = -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(PROGDIR) -I. GTEST_INC = -isystem googletest/googletest/include -PZSTD_CCXXFLAGS = $(PZSTD_INC) $(GTEST_INC) +PZSTD_CPPFLAGS = $(PZSTD_INC) $(GTEST_INC) +PZSTD_CCXXFLAGS = PZSTD_CFLAGS = $(PZSTD_CCXXFLAGS) PZSTD_CXXFLAGS = $(PZSTD_CCXXFLAGS) PZSTD_LDFLAGS =