mirror of
https://github.com/facebook/zstd.git
synced 2025-03-06 16:56:49 +02:00
fixed : calling ZSTD_compress_generic() to end-flush a stream in multiple steps
This commit is contained in:
parent
bd18c885a3
commit
559ee82e90
@ -24,7 +24,8 @@ const char* ERR_getErrorString(ERR_enum code)
|
|||||||
case PREFIX(frameParameter_unsupported): return "Unsupported frame parameter";
|
case PREFIX(frameParameter_unsupported): return "Unsupported frame parameter";
|
||||||
case PREFIX(frameParameter_unsupportedBy32bits): return "Frame parameter unsupported in 32-bits mode";
|
case PREFIX(frameParameter_unsupportedBy32bits): return "Frame parameter unsupported in 32-bits mode";
|
||||||
case PREFIX(frameParameter_windowTooLarge): return "Frame requires too much memory for decoding";
|
case PREFIX(frameParameter_windowTooLarge): return "Frame requires too much memory for decoding";
|
||||||
case PREFIX(compressionParameter_unsupported): return "Compression parameter is out of bound";
|
case PREFIX(compressionParameter_unsupported): return "Compression parameter is not supported";
|
||||||
|
case PREFIX(compressionParameter_outOfBound): return "Compression parameter is out of bound";
|
||||||
case PREFIX(init_missing): return "Context should be init first";
|
case PREFIX(init_missing): return "Context should be init first";
|
||||||
case PREFIX(memory_allocation): return "Allocation error : not enough memory";
|
case PREFIX(memory_allocation): return "Allocation error : not enough memory";
|
||||||
case PREFIX(stage_wrong): return "Operation not authorized at current processing stage";
|
case PREFIX(stage_wrong): return "Operation not authorized at current processing stage";
|
||||||
|
@ -35,8 +35,11 @@ extern "C" {
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
/*-****************************************
|
/*-****************************************
|
||||||
* error codes list
|
* error codes list
|
||||||
******************************************/
|
* note : this API is still considered unstable
|
||||||
|
* it should not be used with a dynamic library
|
||||||
|
* only static linking is allowed
|
||||||
|
******************************************/
|
||||||
typedef enum {
|
typedef enum {
|
||||||
ZSTD_error_no_error,
|
ZSTD_error_no_error,
|
||||||
ZSTD_error_GENERIC,
|
ZSTD_error_GENERIC,
|
||||||
@ -47,6 +50,7 @@ typedef enum {
|
|||||||
ZSTD_error_frameParameter_unsupportedBy32bits,
|
ZSTD_error_frameParameter_unsupportedBy32bits,
|
||||||
ZSTD_error_frameParameter_windowTooLarge,
|
ZSTD_error_frameParameter_windowTooLarge,
|
||||||
ZSTD_error_compressionParameter_unsupported,
|
ZSTD_error_compressionParameter_unsupported,
|
||||||
|
ZSTD_error_compressionParameter_outOfBound,
|
||||||
ZSTD_error_init_missing,
|
ZSTD_error_init_missing,
|
||||||
ZSTD_error_memory_allocation,
|
ZSTD_error_memory_allocation,
|
||||||
ZSTD_error_stage_wrong,
|
ZSTD_error_stage_wrong,
|
||||||
@ -67,7 +71,7 @@ typedef enum {
|
|||||||
|
|
||||||
/*! ZSTD_getErrorCode() :
|
/*! ZSTD_getErrorCode() :
|
||||||
convert a `size_t` function result into a `ZSTD_ErrorCode` enum type,
|
convert a `size_t` function result into a `ZSTD_ErrorCode` enum type,
|
||||||
which can be used to compare directly with enum list published into "error_public.h" */
|
which can be used to compare with enum list published above */
|
||||||
ZSTDERRORLIB_API ZSTD_ErrorCode ZSTD_getErrorCode(size_t functionResult);
|
ZSTDERRORLIB_API ZSTD_ErrorCode ZSTD_getErrorCode(size_t functionResult);
|
||||||
ZSTDERRORLIB_API const char* ZSTD_getErrorString(ZSTD_ErrorCode code);
|
ZSTDERRORLIB_API const char* ZSTD_getErrorString(ZSTD_ErrorCode code);
|
||||||
|
|
||||||
|
@ -241,7 +241,7 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
|
|||||||
{
|
{
|
||||||
# define CLAMPCHECK(val,min,max) { \
|
# define CLAMPCHECK(val,min,max) { \
|
||||||
if ((val<min) | (val>max)) { \
|
if ((val<min) | (val>max)) { \
|
||||||
return ERROR(compressionParameter_unsupported); \
|
return ERROR(compressionParameter_outOfBound); \
|
||||||
} }
|
} }
|
||||||
|
|
||||||
if (cctx->streamStage != zcss_init) return ERROR(stage_wrong);
|
if (cctx->streamStage != zcss_init) return ERROR(stage_wrong);
|
||||||
@ -342,19 +342,20 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
|
|||||||
if (cctx->staticSize) /* MT not compatible with static alloc */
|
if (cctx->staticSize) /* MT not compatible with static alloc */
|
||||||
return ERROR(compressionParameter_unsupported);
|
return ERROR(compressionParameter_unsupported);
|
||||||
ZSTDMT_freeCCtx(cctx->mtctx);
|
ZSTDMT_freeCCtx(cctx->mtctx);
|
||||||
cctx->nbThreads = value;
|
cctx->nbThreads = 1;
|
||||||
cctx->mtctx = ZSTDMT_createCCtx(value);
|
cctx->mtctx = ZSTDMT_createCCtx(value);
|
||||||
if (cctx->mtctx == NULL) return ERROR(memory_allocation);
|
if (cctx->mtctx == NULL) return ERROR(memory_allocation);
|
||||||
}
|
cctx->nbThreads = value;
|
||||||
cctx->nbThreads = 1;
|
} else
|
||||||
|
cctx->nbThreads = 1;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
case ZSTDMT_p_jobSize:
|
case ZSTD_p_jobSize:
|
||||||
if (cctx->nbThreads <= 1) return ERROR(compressionParameter_unsupported);
|
if (cctx->nbThreads <= 1) return ERROR(compressionParameter_unsupported);
|
||||||
assert(cctx->mtctx != NULL);
|
assert(cctx->mtctx != NULL);
|
||||||
return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_sectionSize, value);
|
return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_sectionSize, value);
|
||||||
|
|
||||||
case ZSTDMT_p_overlapSizeLog:
|
case ZSTD_p_overlapSizeLog:
|
||||||
if (cctx->nbThreads <= 1) return ERROR(compressionParameter_unsupported);
|
if (cctx->nbThreads <= 1) return ERROR(compressionParameter_unsupported);
|
||||||
assert(cctx->mtctx != NULL);
|
assert(cctx->mtctx != NULL);
|
||||||
return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_overlapSectionLog, value);
|
return ZSTDMT_setMTCtxParameter(cctx->mtctx, ZSTDMT_p_overlapSectionLog, value);
|
||||||
@ -3648,8 +3649,8 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
|
|||||||
&& (zcs->inBuffPos == zcs->inToCompress) ) {
|
&& (zcs->inBuffPos == zcs->inToCompress) ) {
|
||||||
/* empty */
|
/* empty */
|
||||||
someMoreWork = 0; break;
|
someMoreWork = 0; break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* compress current block (note : this stage cannot be stopped in the middle) */
|
/* compress current block (note : this stage cannot be stopped in the middle) */
|
||||||
DEBUGLOG(5, "stream compression stage (flushMode==%u)", flushMode);
|
DEBUGLOG(5, "stream compression stage (flushMode==%u)", flushMode);
|
||||||
{ void* cDst;
|
{ void* cDst;
|
||||||
@ -3658,7 +3659,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
|
|||||||
size_t oSize = oend-op;
|
size_t oSize = oend-op;
|
||||||
unsigned const lastBlock = (flushMode == ZSTD_e_end) && (ip==iend);
|
unsigned const lastBlock = (flushMode == ZSTD_e_end) && (ip==iend);
|
||||||
if (oSize >= ZSTD_compressBound(iSize))
|
if (oSize >= ZSTD_compressBound(iSize))
|
||||||
cDst = op; /* compress directly into output buffer (skip flush stage) */
|
cDst = op; /* compress into output buffer, to skip flush stage */
|
||||||
else
|
else
|
||||||
cDst = zcs->outBuff, oSize = zcs->outBuffSize;
|
cDst = zcs->outBuff, oSize = zcs->outBuffSize;
|
||||||
cSize = lastBlock ?
|
cSize = lastBlock ?
|
||||||
@ -3667,7 +3668,6 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
|
|||||||
ZSTD_compressContinue(zcs, cDst, oSize,
|
ZSTD_compressContinue(zcs, cDst, oSize,
|
||||||
zcs->inBuff + zcs->inToCompress, iSize);
|
zcs->inBuff + zcs->inToCompress, iSize);
|
||||||
if (ZSTD_isError(cSize)) return cSize;
|
if (ZSTD_isError(cSize)) return cSize;
|
||||||
DEBUGLOG(5, "cSize = %u (lastBlock:%u)", (U32)cSize, lastBlock);
|
|
||||||
zcs->frameEnded = lastBlock;
|
zcs->frameEnded = lastBlock;
|
||||||
/* prepare next block */
|
/* prepare next block */
|
||||||
zcs->inBuffTarget = zcs->inBuffPos + zcs->blockSize;
|
zcs->inBuffTarget = zcs->inBuffPos + zcs->blockSize;
|
||||||
@ -3681,7 +3681,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
|
|||||||
if (cDst == op) { /* no need to flush */
|
if (cDst == op) { /* no need to flush */
|
||||||
op += cSize;
|
op += cSize;
|
||||||
if (zcs->frameEnded) {
|
if (zcs->frameEnded) {
|
||||||
DEBUGLOG(5, "Frame directly completed");
|
DEBUGLOG(5, "Frame completed directly in outBuffer");
|
||||||
someMoreWork = 0;
|
someMoreWork = 0;
|
||||||
zcs->streamStage = zcss_init;
|
zcs->streamStage = zcss_init;
|
||||||
}
|
}
|
||||||
@ -3707,7 +3707,7 @@ static size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
|
|||||||
}
|
}
|
||||||
zcs->outBuffContentSize = zcs->outBuffFlushedSize = 0;
|
zcs->outBuffContentSize = zcs->outBuffFlushedSize = 0;
|
||||||
if (zcs->frameEnded) {
|
if (zcs->frameEnded) {
|
||||||
DEBUGLOG(5, "Frame completed");
|
DEBUGLOG(5, "Frame completed on flush");
|
||||||
someMoreWork = 0;
|
someMoreWork = 0;
|
||||||
zcs->streamStage = zcss_init;
|
zcs->streamStage = zcss_init;
|
||||||
break;
|
break;
|
||||||
@ -3781,15 +3781,19 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
|
|||||||
|
|
||||||
#ifdef ZSTD_MULTITHREAD
|
#ifdef ZSTD_MULTITHREAD
|
||||||
if (cctx->nbThreads > 1) {
|
if (cctx->nbThreads > 1) {
|
||||||
|
DEBUGLOG(5, "calling ZSTDMT_compressStream_generic(%i,...)", endOp);
|
||||||
size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
|
size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
|
||||||
|
DEBUGLOG(5, "ZSTDMT result : %u", (U32)flushMin);
|
||||||
if (ZSTD_isError(flushMin)) cctx->streamStage = zcss_init;
|
if (ZSTD_isError(flushMin)) cctx->streamStage = zcss_init;
|
||||||
|
if (endOp == ZSTD_e_end && flushMin==0)
|
||||||
|
cctx->streamStage = zcss_init; /* compression completed */
|
||||||
return flushMin;
|
return flushMin;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
DEBUGLOG(5, "starting ZSTD_compressStream_generic");
|
DEBUGLOG(5, "calling ZSTD_compressStream_generic(%i,...)", endOp);
|
||||||
CHECK_F( ZSTD_compressStream_generic(cctx, output, input, endOp) );
|
CHECK_F( ZSTD_compressStream_generic(cctx, output, input, endOp) );
|
||||||
DEBUGLOG(5, "completing ZSTD_compress_generic");
|
DEBUGLOG(5, "completed ZSTD_compress_generic");
|
||||||
return cctx->outBuffContentSize - cctx->outBuffFlushedSize; /* remaining to flush */
|
return cctx->outBuffContentSize - cctx->outBuffFlushedSize; /* remaining to flush */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -704,7 +704,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
|
|||||||
zcs->inBuff.filled);
|
zcs->inBuff.filled);
|
||||||
DEBUGLOG(5, "new inBuff pre-filled");
|
DEBUGLOG(5, "new inBuff pre-filled");
|
||||||
zcs->dictSize = newDictSize;
|
zcs->dictSize = newDictSize;
|
||||||
} else {
|
} else { /* if (endFrame==1) */
|
||||||
zcs->inBuff.buffer = g_nullBuffer;
|
zcs->inBuff.buffer = g_nullBuffer;
|
||||||
zcs->inBuff.filled = 0;
|
zcs->inBuff.filled = 0;
|
||||||
zcs->dictSize = 0;
|
zcs->dictSize = 0;
|
||||||
@ -768,7 +768,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
|
|||||||
zcs->jobs[wJobID].jobScanned = 1;
|
zcs->jobs[wJobID].jobScanned = 1;
|
||||||
}
|
}
|
||||||
{ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
|
{ size_t const toWrite = MIN(job.cSize - job.dstFlushed, output->size - output->pos);
|
||||||
DEBUGLOG(4, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
|
DEBUGLOG(5, "Flushing %u bytes from job %u ", (U32)toWrite, zcs->doneJobID);
|
||||||
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
|
memcpy((char*)output->dst + output->pos, (const char*)job.dstBuff.start + job.dstFlushed, toWrite);
|
||||||
output->pos += toWrite;
|
output->pos += toWrite;
|
||||||
job.dstFlushed += toWrite;
|
job.dstFlushed += toWrite;
|
||||||
@ -808,11 +808,11 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
|
|||||||
|
|
||||||
if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
|
if ( (zcs->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */
|
||||||
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */
|
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) { /* avoid overwriting job round buffer */
|
||||||
CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0) );
|
CHECK_F( ZSTDMT_createCompressionJob(zcs, zcs->targetSectionSize, 0 /* blockToFlush */) );
|
||||||
}
|
}
|
||||||
|
|
||||||
/* check for data to flush */
|
/* check for data to flush */
|
||||||
CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize)) ); /* block if it wasn't possible to create new job due to saturation */
|
CHECK_F( ZSTDMT_flushNextJob(zcs, output, (zcs->inBuff.filled == zcs->inBuffSize) /* blockToFlush */) ); /* block if it wasn't possible to create new job due to saturation */
|
||||||
|
|
||||||
/* recommended next input size : fill current input buffer */
|
/* recommended next input size : fill current input buffer */
|
||||||
return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
|
return zcs->inBuffSize - zcs->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */
|
||||||
@ -823,16 +823,20 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
|
|||||||
{
|
{
|
||||||
size_t const srcSize = zcs->inBuff.filled - zcs->dictSize;
|
size_t const srcSize = zcs->inBuff.filled - zcs->dictSize;
|
||||||
|
|
||||||
if (srcSize) DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize);
|
if (srcSize)
|
||||||
|
DEBUGLOG(5, "flushing : %u bytes left to compress", (U32)srcSize);
|
||||||
if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
|
if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
|
||||||
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
|
&& (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
|
||||||
|
DEBUGLOG(5, "create new job with %u bytes to compress", (U32)srcSize);
|
||||||
|
DEBUGLOG(5, "end order : %u", endFrame);
|
||||||
CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) );
|
CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) );
|
||||||
|
DEBUGLOG(5, "resulting zcs->frameEnded : %u", zcs->frameEnded);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* check if there is any data available to flush */
|
/* check if there is any data available to flush */
|
||||||
DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u",
|
DEBUGLOG(5, "zcs->doneJobID : %u ; zcs->nextJobID : %u",
|
||||||
zcs->doneJobID, zcs->nextJobID);
|
zcs->doneJobID, zcs->nextJobID);
|
||||||
return ZSTDMT_flushNextJob(zcs, output, 1);
|
return ZSTDMT_flushNextJob(zcs, output, 1 /*blockToFlush */);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -840,14 +844,14 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
|
|||||||
{
|
{
|
||||||
if (zcs->nbThreads==1)
|
if (zcs->nbThreads==1)
|
||||||
return ZSTD_flushStream(zcs->cctxPool->cctx[0], output);
|
return ZSTD_flushStream(zcs->cctxPool->cctx[0], output);
|
||||||
return ZSTDMT_flushStream_internal(zcs, output, 0);
|
return ZSTDMT_flushStream_internal(zcs, output, 0 /* endFrame */);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
|
size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output)
|
||||||
{
|
{
|
||||||
if (zcs->nbThreads==1)
|
if (zcs->nbThreads==1)
|
||||||
return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
|
return ZSTD_endStream(zcs->cctxPool->cctx[0], output);
|
||||||
return ZSTDMT_flushStream_internal(zcs, output, 1);
|
return ZSTDMT_flushStream_internal(zcs, output, 1 /* endFrame */);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
||||||
@ -855,7 +859,8 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
|
|||||||
ZSTD_inBuffer* input,
|
ZSTD_inBuffer* input,
|
||||||
ZSTD_EndDirective endOp)
|
ZSTD_EndDirective endOp)
|
||||||
{
|
{
|
||||||
CHECK_F(ZSTDMT_compressStream(mtctx, output, input));
|
if (input->pos < input->size) /* exclude final flushes */
|
||||||
|
CHECK_F(ZSTDMT_compressStream(mtctx, output, input));
|
||||||
switch(endOp)
|
switch(endOp)
|
||||||
{
|
{
|
||||||
case ZSTD_e_flush:
|
case ZSTD_e_flush:
|
||||||
|
@ -702,11 +702,11 @@ typedef enum {
|
|||||||
* More threads improve speed, but also increase memory usage.
|
* More threads improve speed, but also increase memory usage.
|
||||||
* Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled.
|
* Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled.
|
||||||
* Special: value 0 means "do not change nbThreads" */
|
* Special: value 0 means "do not change nbThreads" */
|
||||||
ZSTDMT_p_jobSize, /* Size of a compression job. Each compression job is completed in parallel.
|
ZSTD_p_jobSize, /* Size of a compression job. Each compression job is completed in parallel.
|
||||||
* 0 means default, which is dynamically determined based on compression parameters.
|
* 0 means default, which is dynamically determined based on compression parameters.
|
||||||
* Job size must be a minimum of overlapSize, or 1 KB, whichever is largest
|
* Job size must be a minimum of overlapSize, or 1 KB, whichever is largest
|
||||||
* The minimum size is automatically and transparently enforced */
|
* The minimum size is automatically and transparently enforced */
|
||||||
ZSTDMT_p_overlapSizeLog, /* Size of previous input reloaded at the beginning of each job.
|
ZSTD_p_overlapSizeLog, /* Size of previous input reloaded at the beginning of each job.
|
||||||
* 0 => no overlap, 6(default) => use 1/8th of windowSize, >=9 => use full windowSize */
|
* 0 => no overlap, 6(default) => use 1/8th of windowSize, >=9 => use full windowSize */
|
||||||
|
|
||||||
/* advanced parameters - may not remain available after API update */
|
/* advanced parameters - may not remain available after API update */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user