mirror of
https://github.com/facebook/zstd.git
synced 2025-03-07 01:10:04 +02:00
zstdmt: applies new parameters on the fly
when invoked from ZSTD_compress_generic()
This commit is contained in:
parent
90eca318a7
commit
4b525af53a
@ -3222,13 +3222,17 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
|
|||||||
/* compression stage */
|
/* compression stage */
|
||||||
#ifdef ZSTD_MULTITHREAD
|
#ifdef ZSTD_MULTITHREAD
|
||||||
if (cctx->appliedParams.nbWorkers > 0) {
|
if (cctx->appliedParams.nbWorkers > 0) {
|
||||||
size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
|
if (cctx->cParamsChanged) {
|
||||||
if ( ZSTD_isError(flushMin)
|
ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, cctx->requestedParams.compressionLevel, cctx->requestedParams.cParams);
|
||||||
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
|
cctx->cParamsChanged = 0;
|
||||||
ZSTD_startNewCompression(cctx);
|
|
||||||
}
|
}
|
||||||
return flushMin;
|
{ size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
|
||||||
}
|
if ( ZSTD_isError(flushMin)
|
||||||
|
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
|
||||||
|
ZSTD_startNewCompression(cctx);
|
||||||
|
}
|
||||||
|
return flushMin;
|
||||||
|
} }
|
||||||
#endif
|
#endif
|
||||||
CHECK_F( ZSTD_compressStream_generic(cctx, output, input, endOp) );
|
CHECK_F( ZSTD_compressStream_generic(cctx, output, input, endOp) );
|
||||||
DEBUGLOG(5, "completed ZSTD_compress_generic");
|
DEBUGLOG(5, "completed ZSTD_compress_generic");
|
||||||
|
@ -666,11 +666,11 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
|
|||||||
return jobParams;
|
return jobParams;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*! ZSTDMT_MTCtx_updateParametersWhileCompressing() :
|
/*! ZSTDMT_updateCParams_whileCompressing() :
|
||||||
* Update compression level and parameters (except wlog)
|
* Update compression level and parameters (except wlog)
|
||||||
* while compression is ongoing.
|
* while compression is ongoing.
|
||||||
* New parameters will be applied to next compression job. */
|
* New parameters will be applied to next compression job. */
|
||||||
void ZSTDMT_MTCtx_updateParametersWhileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams)
|
void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams)
|
||||||
{
|
{
|
||||||
U32 const wlog = cParams.windowLog;
|
U32 const wlog = cParams.windowLog;
|
||||||
mtctx->params.cParams = cParams;
|
mtctx->params.cParams = cParams;
|
||||||
|
@ -121,11 +121,11 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_param
|
|||||||
* Also reset jobSize and overlapLog */
|
* Also reset jobSize and overlapLog */
|
||||||
size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers);
|
size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers);
|
||||||
|
|
||||||
/*! ZSTDMT_MTCtx_updateParametersWhileCompressing() :
|
/*! ZSTDMT_updateCParams_whileCompressing() :
|
||||||
* Update compression level and parameters (except wlog)
|
* Update compression level and parameters (except wlog)
|
||||||
* while compression is ongoing.
|
* while compression is ongoing.
|
||||||
* New parameters will be applied to next compression job. */
|
* New parameters will be applied to next compression job. */
|
||||||
void ZSTDMT_MTCtx_updateParametersWhileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams);
|
void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams);
|
||||||
|
|
||||||
/* ZSTDMT_getNbWorkers():
|
/* ZSTDMT_getNbWorkers():
|
||||||
* @return nb threads currently active in mtctx.
|
* @return nb threads currently active in mtctx.
|
||||||
|
@ -781,7 +781,8 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
|
|||||||
}
|
}
|
||||||
if (READY_FOR_UPDATE()) {
|
if (READY_FOR_UPDATE()) {
|
||||||
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
|
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
|
||||||
DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
|
DISPLAYUPDATE(2, "\r(%i) Read :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
|
||||||
|
compressionLevel,
|
||||||
(U32)(zfp.ingested >> 20),
|
(U32)(zfp.ingested >> 20),
|
||||||
(U32)(zfp.consumed >> 20),
|
(U32)(zfp.consumed >> 20),
|
||||||
(U32)(zfp.produced >> 20),
|
(U32)(zfp.produced >> 20),
|
||||||
|
@ -53,7 +53,7 @@ static const U32 nbTestsDefault = 30000;
|
|||||||
/*-************************************
|
/*-************************************
|
||||||
* Display Macros
|
* Display Macros
|
||||||
**************************************/
|
**************************************/
|
||||||
#define DISPLAY(...) fprintf(stdout, __VA_ARGS__)
|
#define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
|
||||||
#define DISPLAYLEVEL(l, ...) if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); }
|
#define DISPLAYLEVEL(l, ...) if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); }
|
||||||
static U32 g_displayLevel = 2;
|
static U32 g_displayLevel = 2;
|
||||||
|
|
||||||
@ -63,7 +63,7 @@ static UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER;
|
|||||||
#define DISPLAYUPDATE(l, ...) if (g_displayLevel>=l) { \
|
#define DISPLAYUPDATE(l, ...) if (g_displayLevel>=l) { \
|
||||||
if ((UTIL_clockSpanMicro(g_displayClock) > g_refreshRate) || (g_displayLevel>=4)) \
|
if ((UTIL_clockSpanMicro(g_displayClock) > g_refreshRate) || (g_displayLevel>=4)) \
|
||||||
{ g_displayClock = UTIL_getTime(); DISPLAY(__VA_ARGS__); \
|
{ g_displayClock = UTIL_getTime(); DISPLAY(__VA_ARGS__); \
|
||||||
if (g_displayLevel>=4) fflush(stdout); } }
|
if (g_displayLevel>=4) fflush(stderr); } }
|
||||||
|
|
||||||
|
|
||||||
#undef MIN
|
#undef MIN
|
||||||
|
Loading…
x
Reference in New Issue
Block a user