1
0
mirror of https://github.com/facebook/zstd.git synced 2025-03-06 08:49:28 +02:00

zbuff uses ZSTD_compressEnd()

This commit is contained in:
Yann Collet 2016-07-28 19:55:09 +02:00
parent 16e73033ad
commit 60ba31c570
10 changed files with 66 additions and 62 deletions

View File

@ -137,8 +137,8 @@ ZSTDLIB_API size_t ZBUFF_decompressContinue(ZBUFF_DCtx* dctx,
* The function will report how many bytes were read or written by modifying *srcSizePtr and *dstCapacityPtr.
* Note that it may not consume the entire input, in which case it's up to the caller to present remaining input again.
* The content of `dst` will be overwritten (up to *dstCapacityPtr) at each function call, so save its content if it matters, or change `dst`.
* @return : a hint to preferred nb of bytes to use as input for next function call (it's only a hint, to help latency),
* or 0 when a frame is completely decoded,
* @return : 0 when a frame is completely decoded and fully flushed,
>0 when decoding is not finished, with value being a suggested next input size (it's just a hint, tends to help latency),
* or an error code, which can be tested using ZBUFF_isError().
*
* Hint : recommended buffer sizes (not compulsory) : ZBUFF_recommendedDInSize() and ZBUFF_recommendedDOutSize()

View File

@ -46,7 +46,7 @@
static size_t const ZBUFF_endFrameSize = ZSTD_BLOCKHEADERSIZE;
/*_**************************************************
/*-***********************************************************
* Streaming compression
*
* A ZBUFF_CCtx object is required to track streaming operation.
@ -77,7 +77,7 @@ static size_t const ZBUFF_endFrameSize = ZSTD_BLOCKHEADERSIZE;
* Hint : recommended buffer sizes (not compulsory)
* input : ZSTD_BLOCKSIZE_MAX (128 KB), internal unit size, it improves latency to use this value.
* output : ZSTD_compressBound(ZSTD_BLOCKSIZE_MAX) + ZSTD_blockHeaderSize + ZBUFF_endFrameSize : ensures it's always possible to write/flush/end a full block at best speed.
* **************************************************/
* ***********************************************************/
typedef enum { ZBUFFcs_init, ZBUFFcs_load, ZBUFFcs_flush, ZBUFFcs_final } ZBUFF_cStage;
@ -96,6 +96,7 @@ struct ZBUFF_CCtx_s {
size_t outBuffFlushedSize;
ZBUFF_cStage stage;
U32 checksum;
U32 frameEnded;
ZSTD_customMem customMem;
}; /* typedef'd tp ZBUFF_CCtx within "zstd_buffered.h" */
@ -166,6 +167,7 @@ size_t ZBUFF_compressInit_advanced(ZBUFF_CCtx* zbc,
zbc->outBuffContentSize = zbc->outBuffFlushedSize = 0;
zbc->stage = ZBUFFcs_load;
zbc->checksum = params.fParams.checksumFlag > 0;
zbc->frameEnded = 0;
return 0; /* ready to go */
}
@ -198,7 +200,7 @@ typedef enum { zbf_gather, zbf_flush, zbf_end } ZBUFF_flush_e;
static size_t ZBUFF_compressContinue_generic(ZBUFF_CCtx* zbc,
void* dst, size_t* dstCapacityPtr,
const void* src, size_t* srcSizePtr,
ZBUFF_flush_e flush)
ZBUFF_flush_e const flush)
{
U32 someMoreWork = 1;
const char* const istart = (const char*)src;
@ -231,8 +233,11 @@ static size_t ZBUFF_compressContinue_generic(ZBUFF_CCtx* zbc,
cDst = op; /* compress directly into output buffer (avoid flush stage) */
else
cDst = zbc->outBuff, oSize = zbc->outBuffSize;
cSize = ZSTD_compressContinue(zbc->zc, cDst, oSize, zbc->inBuff + zbc->inToCompress, iSize);
cSize = (flush == zbf_end) ?
ZSTD_compressEnd(zbc->zc, cDst, oSize, zbc->inBuff + zbc->inToCompress, iSize) :
ZSTD_compressContinue(zbc->zc, cDst, oSize, zbc->inBuff + zbc->inToCompress, iSize);
if (ZSTD_isError(cSize)) return cSize;
if (flush == zbf_end) zbc->frameEnded = 1;
/* prepare next block */
zbc->inBuffTarget = zbc->inBuffPos + zbc->blockSize;
if (zbc->inBuffTarget > zbc->inBuffSize)
@ -266,6 +271,7 @@ static size_t ZBUFF_compressContinue_generic(ZBUFF_CCtx* zbc,
*srcSizePtr = ip - istart;
*dstCapacityPtr = op - ostart;
if (zbc->frameEnded) return 0;
{ size_t hintInSize = zbc->inBuffTarget - zbc->inBuffPos;
if (hintInSize==0) hintInSize = zbc->blockSize;
return hintInSize;
@ -301,16 +307,17 @@ size_t ZBUFF_compressEnd(ZBUFF_CCtx* zbc, void* dst, size_t* dstCapacityPtr)
/* flush whatever remains */
size_t outSize = *dstCapacityPtr;
size_t srcSize = 0;
size_t const uselessHint = ZBUFF_compressContinue_generic(zbc, dst, &outSize, &srcSize, &srcSize, zbf_end); /* use a valid address instead of NULL */
size_t const notEnded = ZBUFF_compressContinue_generic(zbc, dst, &outSize, &srcSize, &srcSize, zbf_end); /* use a valid address instead of NULL */
size_t const remainingToFlush = zbc->outBuffContentSize - zbc->outBuffFlushedSize;
op += outSize; (void)uselessHint;
op += outSize;
if (remainingToFlush) {
*dstCapacityPtr = op-ostart;
return remainingToFlush + ZBUFF_endFrameSize + (zbc->checksum * 4);
}
/* create epilogue */
zbc->stage = ZBUFFcs_final;
zbc->outBuffContentSize = ZSTD_compressEnd(zbc->zc, zbc->outBuff, zbc->outBuffSize, NULL, 0); /* epilogue into outBuff */
zbc->outBuffContentSize = !notEnded ? 0 :
ZSTD_compressEnd(zbc->zc, zbc->outBuff, zbc->outBuffSize, NULL, 0); /* write epilogue into outBuff */
}
/* flush epilogue */

View File

@ -158,9 +158,9 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd,
char* const ostart = (char*)dst;
char* const oend = ostart + *dstCapacityPtr;
char* op = ostart;
U32 notDone = 1;
U32 someMoreWork = 1;
while (notDone) {
while (someMoreWork) {
switch(zbd->stage)
{
case ZBUFFds_init :
@ -168,9 +168,9 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd,
case ZBUFFds_loadHeader :
{ size_t const hSize = ZSTD_getFrameParams(&(zbd->fParams), zbd->headerBuffer, zbd->lhSize);
if (hSize != 0) {
if (ZSTD_isError(hSize)) return hSize;
if (hSize != 0) { /* need more input */
size_t const toLoad = hSize - zbd->lhSize; /* if hSize!=0, hSize > zbd->lhSize */
if (ZSTD_isError(hSize)) return hSize;
if (toLoad > (size_t)(iend-ip)) { /* not enough input to load full header */
memcpy(zbd->headerBuffer + zbd->lhSize, ip, iend-ip);
zbd->lhSize += iend-ip;
@ -184,7 +184,7 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd,
/* Consume header */
{ size_t const h1Size = ZSTD_nextSrcSizeToDecompress(zbd->zd); /* == ZSTD_frameHeaderSize_min */
size_t const h1Result = ZSTD_decompressContinue(zbd->zd, NULL, 0, zbd->headerBuffer, h1Size);
if (ZSTD_isError(h1Result)) return h1Result;
if (ZSTD_isError(h1Result)) return h1Result; /* should not happen : already checked */
if (h1Size < zbd->lhSize) { /* long header */
size_t const h2Size = ZSTD_nextSrcSizeToDecompress(zbd->zd);
size_t const h2Result = ZSTD_decompressContinue(zbd->zd, NULL, 0, zbd->headerBuffer+h1Size, h2Size);
@ -195,6 +195,7 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd,
/* Frame header instruct buffer sizes */
{ size_t const blockSize = MIN(zbd->fParams.windowSize, ZSTD_BLOCKSIZE_ABSOLUTEMAX);
size_t const neededOutSize = zbd->fParams.windowSize + blockSize;
zbd->blockSize = blockSize;
if (zbd->inBuffSize < blockSize) {
zbd->customMem.customFree(zbd->customMem.opaque, zbd->inBuff);
@ -202,20 +203,20 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd,
zbd->inBuff = (char*)zbd->customMem.customAlloc(zbd->customMem.opaque, blockSize);
if (zbd->inBuff == NULL) return ERROR(memory_allocation);
}
{ size_t const neededOutSize = zbd->fParams.windowSize + blockSize;
if (zbd->outBuffSize < neededOutSize) {
zbd->customMem.customFree(zbd->customMem.opaque, zbd->outBuff);
zbd->outBuffSize = neededOutSize;
zbd->outBuff = (char*)zbd->customMem.customAlloc(zbd->customMem.opaque, neededOutSize);
if (zbd->outBuff == NULL) return ERROR(memory_allocation);
} } }
if (zbd->outBuffSize < neededOutSize) {
zbd->customMem.customFree(zbd->customMem.opaque, zbd->outBuff);
zbd->outBuffSize = neededOutSize;
zbd->outBuff = (char*)zbd->customMem.customAlloc(zbd->customMem.opaque, neededOutSize);
if (zbd->outBuff == NULL) return ERROR(memory_allocation);
} }
zbd->stage = ZBUFFds_read;
/* pass-through */
case ZBUFFds_read:
{ size_t const neededInSize = ZSTD_nextSrcSizeToDecompress(zbd->zd);
if (neededInSize==0) { /* end of frame */
zbd->stage = ZBUFFds_init;
notDone = 0;
someMoreWork = 0;
break;
}
if ((size_t)(iend-ip) >= neededInSize) { /* decode directly from src */
@ -230,8 +231,9 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd,
zbd->stage = ZBUFFds_flush;
break;
}
if (ip==iend) { notDone = 0; break; } /* no more input */
if (ip==iend) { someMoreWork = 0; break; } /* no more input */
zbd->stage = ZBUFFds_load;
/* pass-through */
}
case ZBUFFds_load:
@ -242,7 +244,7 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd,
loadedSize = ZBUFF_limitCopy(zbd->inBuff + zbd->inPos, toLoad, ip, iend-ip);
ip += loadedSize;
zbd->inPos += loadedSize;
if (loadedSize < toLoad) { notDone = 0; break; } /* not enough input, wait for more */
if (loadedSize < toLoad) { someMoreWork = 0; break; } /* not enough input, wait for more */
/* decode loaded input */
{ const int isSkipFrame = ZSTD_isSkipFrame(zbd->zd);
@ -254,7 +256,7 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd,
if (!decodedSize && !isSkipFrame) { zbd->stage = ZBUFFds_read; break; } /* this was just a header */
zbd->outEnd = zbd->outStart + decodedSize;
zbd->stage = ZBUFFds_flush;
// break; /* ZBUFFds_flush follows */
/* pass-through */
} }
case ZBUFFds_flush:
@ -262,14 +264,14 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd,
size_t const flushedSize = ZBUFF_limitCopy(op, oend-op, zbd->outBuff + zbd->outStart, toFlushSize);
op += flushedSize;
zbd->outStart += flushedSize;
if (flushedSize == toFlushSize) {
if (flushedSize == toFlushSize) { /* flush completed */
zbd->stage = ZBUFFds_read;
if (zbd->outStart + zbd->blockSize > zbd->outBuffSize)
zbd->outStart = zbd->outEnd = 0;
break;
}
/* cannot flush everything */
notDone = 0;
someMoreWork = 0;
break;
}
default: return ERROR(GENERIC); /* impossible */
@ -279,13 +281,15 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd,
*srcSizePtr = ip-istart;
*dstCapacityPtr = op-ostart;
{ size_t nextSrcSizeHint = ZSTD_nextSrcSizeToDecompress(zbd->zd);
if (!nextSrcSizeHint) return (zbd->outEnd != zbd->outStart); /* return 0 only if fully flushed too */
if (nextSrcSizeHint > 4) nextSrcSizeHint += ZSTD_blockHeaderSize;
if (zbd->inPos > nextSrcSizeHint) return ERROR(GENERIC); /* should never happen */
nextSrcSizeHint -= zbd->inPos; /* already loaded*/
return nextSrcSizeHint;
}
}
/* *************************************
* Tool functions
***************************************/

View File

@ -979,18 +979,12 @@ size_t ZSTD_decompress(void* dst, size_t dstCapacity, const void* src, size_t sr
}
/*_******************************
* Streaming Decompression API
********************************/
size_t ZSTD_nextSrcSizeToDecompress(ZSTD_DCtx* dctx)
{
return dctx->expected;
}
/*-**********************************
* Streaming Decompression API
************************************/
size_t ZSTD_nextSrcSizeToDecompress(ZSTD_DCtx* dctx) { return dctx->expected; }
int ZSTD_isSkipFrame(ZSTD_DCtx* dctx)
{
return dctx->stage == ZSTDds_skipFrame;
}
int ZSTD_isSkipFrame(ZSTD_DCtx* dctx) { return dctx->stage == ZSTDds_skipFrame; } /* for zbuff */
/** ZSTD_decompressContinue() :
* @return : nb of bytes generated into `dst` (necessarily <= `dstCapacity)
@ -1079,11 +1073,12 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c
if (dctx->stage == ZSTDds_decompressLastBlock) { /* end of frame */
if (dctx->fParams.checksumFlag) { /* another round for frame checksum */
dctx->expected = 0;
dctx->expected = 4;
dctx->stage = ZSTDds_checkChecksum;
} else {
dctx->expected = 0; /* ends here */
dctx->stage = ZSTDds_getFrameHeaderSize;
}
dctx->expected = 0; /* ends here */
dctx->stage = ZSTDds_getFrameHeaderSize;
} else {
dctx->stage = ZSTDds_decodeBlockHeader;
dctx->expected = ZSTD_blockHeaderSize;

View File

@ -316,6 +316,7 @@ ZSTDLIB_API size_t ZSTD_sizeofDCtx(const ZSTD_DCtx* dctx);
* But it's also a complex one, with a lot of restrictions (documented below).
* For an easier streaming API, look into common/zbuff.h
* which removes all restrictions by allocating and managing its own internal buffer */
ZSTDLIB_API size_t ZSTD_compressBegin(ZSTD_CCtx* cctx, int compressionLevel);
ZSTDLIB_API size_t ZSTD_compressBegin_usingDict(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, int compressionLevel);
ZSTDLIB_API size_t ZSTD_compressBegin_advanced(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, ZSTD_parameters params, unsigned long long pledgedSrcSize);
@ -360,7 +361,7 @@ typedef struct {
unsigned checksumFlag;
} ZSTD_frameParams;
ZSTDLIB_API size_t ZSTD_getFrameParams(ZSTD_frameParams* fparamsPtr, const void* src, size_t srcSize); /**< doesn't consume input */
ZSTDLIB_API size_t ZSTD_getFrameParams(ZSTD_frameParams* fparamsPtr, const void* src, size_t srcSize); /**< doesn't consume input, see details below */
ZSTDLIB_API size_t ZSTD_decompressBegin(ZSTD_DCtx* dctx);
ZSTDLIB_API size_t ZSTD_decompressBegin_usingDict(ZSTD_DCtx* dctx, const void* dict, size_t dictSize);

View File

@ -39,7 +39,7 @@
#define MB *(1 <<20)
#define GB *(1U<<30)
#define SIZE_DEFAULT (64 KB)
#define SIZE_DEFAULT ((64 KB) + 1)
#define SEED_DEFAULT 0
#define COMPRESSIBILITY_DEFAULT 50
@ -72,15 +72,13 @@ static int usage(const char* programName)
int main(int argc, const char** argv)
{
int argNb;
double proba = (double)COMPRESSIBILITY_DEFAULT / 100;
double litProba = 0.0;
U64 size = SIZE_DEFAULT;
U32 seed = SEED_DEFAULT;
const char* programName;
const char* const programName = argv[0];
/* Check command line */
programName = argv[0];
int argNb;
for(argNb=1; argNb<argc; argNb++) {
const char* argument = argv[argNb];

View File

@ -636,13 +636,12 @@ unsigned long long FIO_decompressFrame(dRess_t ress,
DISPLAYUPDATE(2, "\rDecoded : %u MB... ", (U32)(frameSize>>20) );
if (toRead == 0) break; /* end of frame */
if (readSize) EXM_THROW(38, "Decoding error : should consume entire input");
if (readSize) EXM_THROW(37, "Decoding error : should consume entire input");
/* Fill input buffer */
if (toRead > ress.srcBufferSize) EXM_THROW(34, "too large block");
if (toRead > ress.srcBufferSize) EXM_THROW(38, "too large block");
readSize = fread(ress.srcBuffer, 1, toRead, finput);
if (readSize != toRead)
EXM_THROW(35, "Read error");
if (readSize == 0) EXM_THROW(39, "Read error : premature end");
}
FIO_fwriteSparseEnd(foutput, storedSkips);
@ -710,8 +709,8 @@ static int FIO_decompressSrcFile(dRess_t ress, const char* srcFileName)
continue;
}
#endif
if (((magic & 0xFFFFFFF0U) != ZSTD_MAGIC_SKIPPABLE_START) && (magic != ZSTD_MAGICNUMBER)) {
if (g_overwrite) { /* -df : pass-through mode */
if (((magic & 0xFFFFFFF0U) != ZSTD_MAGIC_SKIPPABLE_START) & (magic != ZSTD_MAGICNUMBER)) {
if ((g_overwrite) && !strcmp (srcFileName, stdinmark)) { /* pass-through mode */
unsigned const result = FIO_passThrough(dstFile, srcFile, ress.srcBuffer, ress.srcBufferSize);
if (fclose(srcFile)) EXM_THROW(32, "zstd: %s close error", srcFileName); /* error should never happen */
return result;

View File

@ -142,8 +142,8 @@ $ECHO "\n**** multiple files tests **** "
./datagen -s1 > tmp1 2> $INTOVOID
./datagen -s2 -g100K > tmp2 2> $INTOVOID
./datagen -s3 -g1M > tmp3 2> $INTOVOID
$ZSTD -f tmp*
$ECHO "compress tmp* : "
$ZSTD -f tmp*
ls -ls tmp*
rm tmp1 tmp2 tmp3
$ECHO "decompress tmp* : "

View File

@ -424,23 +424,22 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres
U32 const enoughDstSize = dstBuffSize >= remainingToFlush;
remainingToFlush = ZBUFF_compressEnd(zc, cBuffer+cSize, &dstBuffSize);
CHECK (ZBUFF_isError(remainingToFlush), "flush error : %s", ZBUFF_getErrorName(remainingToFlush));
//DISPLAY("flush %u bytes : still within context : %i \n", (U32)dstBuffSize, (int)remainingToFlush);
CHECK (enoughDstSize && remainingToFlush, "ZBUFF_compressEnd() not fully flushed, but enough space available");
CHECK (enoughDstSize && remainingToFlush, "ZBUFF_compressEnd() not fully flushed (%u remaining), but enough space available", (U32)remainingToFlush);
cSize += dstBuffSize;
} }
crcOrig = XXH64_digest(&xxhState);
/* multi - fragments decompression test */
ZBUFF_decompressInitDictionary(zd, dict, dictSize);
for (totalCSize = 0, totalGenSize = 0 ; totalCSize < cSize ; ) {
errorCode = 1;
for (totalCSize = 0, totalGenSize = 0 ; errorCode ; ) {
size_t readCSrcSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog);
size_t dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize);
size_t const decompressError = ZBUFF_decompressContinue(zd, dstBuffer+totalGenSize, &dstBuffSize, cBuffer+totalCSize, &readCSrcSize);
CHECK (ZBUFF_isError(decompressError), "decompression error : %s", ZBUFF_getErrorName(decompressError));
errorCode = ZBUFF_decompressContinue(zd, dstBuffer+totalGenSize, &dstBuffSize, cBuffer+totalCSize, &readCSrcSize);
CHECK (ZBUFF_isError(errorCode), "decompression error : %s", ZBUFF_getErrorName(errorCode));
totalGenSize += dstBuffSize;
totalCSize += readCSrcSize;
errorCode = decompressError; /* needed for != 0 last test */
}
CHECK (errorCode != 0, "frame not fully decoded");
CHECK (totalGenSize != totalTestSize, "decompressed data : wrong size")

View File

@ -80,7 +80,8 @@ It also features a very fast decoder, with speed > 500 MB/s per core.
verbose mode
.TP
.BR \-q ", " --quiet
suppress warnings and notifications; specify twice to suppress errors too
suppress warnings, interactivity and notifications.
specify twice to suppress errors too.
.TP
.BR \-C ", " --check
add integrity check computed from uncompressed data