From 4b525af53afbc6fd5f13b021b8f707ebe2ac4514 Mon Sep 17 00:00:00 2001 From: Yann Collet Date: Fri, 2 Feb 2018 15:58:13 -0800 Subject: [PATCH] zstdmt: applies new parameters on the fly when invoked from ZSTD_compress_generic() --- lib/compress/zstd_compress.c | 16 ++++++++++------ lib/compress/zstdmt_compress.c | 4 ++-- lib/compress/zstdmt_compress.h | 4 ++-- programs/fileio.c | 3 ++- tests/fuzzer.c | 4 ++-- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 67cc49b7b..fe916b50c 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -3222,13 +3222,17 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, /* compression stage */ #ifdef ZSTD_MULTITHREAD if (cctx->appliedParams.nbWorkers > 0) { - size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); - if ( ZSTD_isError(flushMin) - || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ - ZSTD_startNewCompression(cctx); + if (cctx->cParamsChanged) { + ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, cctx->requestedParams.compressionLevel, cctx->requestedParams.cParams); + cctx->cParamsChanged = 0; } - return flushMin; - } + { size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); + if ( ZSTD_isError(flushMin) + || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ + ZSTD_startNewCompression(cctx); + } + return flushMin; + } } #endif CHECK_F( ZSTD_compressStream_generic(cctx, output, input, endOp) ); DEBUGLOG(5, "completed ZSTD_compress_generic"); diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 3542623e9..a2deac145 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -666,11 +666,11 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) return jobParams; } -/*! ZSTDMT_MTCtx_updateParametersWhileCompressing() : +/*! ZSTDMT_updateCParams_whileCompressing() : * Update compression level and parameters (except wlog) * while compression is ongoing. * New parameters will be applied to next compression job. */ -void ZSTDMT_MTCtx_updateParametersWhileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams) +void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams) { U32 const wlog = cParams.windowLog; mtctx->params.cParams = cParams; diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index c25521d38..4364f100d 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -121,11 +121,11 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_param * Also reset jobSize and overlapLog */ size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers); -/*! ZSTDMT_MTCtx_updateParametersWhileCompressing() : +/*! ZSTDMT_updateCParams_whileCompressing() : * Update compression level and parameters (except wlog) * while compression is ongoing. * New parameters will be applied to next compression job. */ -void ZSTDMT_MTCtx_updateParametersWhileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams); +void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams); /* ZSTDMT_getNbWorkers(): * @return nb threads currently active in mtctx. diff --git a/programs/fileio.c b/programs/fileio.c index f1c9b9224..0cc807a11 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -781,7 +781,8 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, } if (READY_FOR_UPDATE()) { ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); - DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%", + DISPLAYUPDATE(2, "\r(%i) Read :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%", + compressionLevel, (U32)(zfp.ingested >> 20), (U32)(zfp.consumed >> 20), (U32)(zfp.produced >> 20), diff --git a/tests/fuzzer.c b/tests/fuzzer.c index f42b8b7a7..e7c92edc0 100644 --- a/tests/fuzzer.c +++ b/tests/fuzzer.c @@ -53,7 +53,7 @@ static const U32 nbTestsDefault = 30000; /*-************************************ * Display Macros **************************************/ -#define DISPLAY(...) fprintf(stdout, __VA_ARGS__) +#define DISPLAY(...) fprintf(stderr, __VA_ARGS__) #define DISPLAYLEVEL(l, ...) if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } static U32 g_displayLevel = 2; @@ -63,7 +63,7 @@ static UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER; #define DISPLAYUPDATE(l, ...) if (g_displayLevel>=l) { \ if ((UTIL_clockSpanMicro(g_displayClock) > g_refreshRate) || (g_displayLevel>=4)) \ { g_displayClock = UTIL_getTime(); DISPLAY(__VA_ARGS__); \ - if (g_displayLevel>=4) fflush(stdout); } } + if (g_displayLevel>=4) fflush(stderr); } } #undef MIN