1
0
mirror of https://github.com/facebook/zstd.git synced 2025-10-31 16:47:48 +02:00

Merge pull request #4517 from Cyan4973/asyncio_revisit

Remove asyncio from the compression path
This commit is contained in:
Yann Collet
2025-10-26 13:39:25 -08:00
committed by GitHub

View File

@@ -125,6 +125,271 @@ char const* FIO_lzmaVersion(void)
#define TEMPORARY_FILE_PERMISSIONS (0600)
#endif
#ifndef ZSTD_NOCOMPRESS
/* *************************************
* Synchronous compression IO helpers
* Lightweight wrapper used by compression paths to manage buffered
* reads/writes without the async job machinery.
***************************************/
typedef struct {
const FIO_prefs_t* prefs;
FILE* srcFile;
FILE* dstFile;
unsigned storedSkips;
U8* inBuffer;
size_t inCapacity;
U8* srcBuffer;
size_t srcBufferLoaded;
U8* outBuffer;
size_t outCapacity;
} FIO_SyncCompressIO;
static void FIO_SyncCompressIO_init(FIO_SyncCompressIO* io,
const FIO_prefs_t* prefs,
size_t inCapacity,
size_t outCapacity);
static void FIO_SyncCompressIO_destroy(FIO_SyncCompressIO* io);
static void FIO_SyncCompressIO_setSrc(FIO_SyncCompressIO* io, FILE* file);
static void FIO_SyncCompressIO_clearSrc(FIO_SyncCompressIO* io);
static void FIO_SyncCompressIO_setDst(FIO_SyncCompressIO* io, FILE* file);
static int FIO_SyncCompressIO_closeDst(FIO_SyncCompressIO* io);
static size_t FIO_SyncCompressIO_fillBuffer(FIO_SyncCompressIO* io, size_t minToHave);
static void FIO_SyncCompressIO_consumeBytes(FIO_SyncCompressIO* io, size_t n);
static void FIO_SyncCompressIO_commitOut(FIO_SyncCompressIO* io, const void* buffer, size_t size);
static void FIO_SyncCompressIO_finish(FIO_SyncCompressIO* io);
static unsigned FIO_sparseWrite(FILE* file,
const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs,
unsigned storedSkips)
{
const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */
size_t bufferSizeT = bufferSize / sizeof(size_t);
const size_t* const bufferTEnd = bufferT + bufferSizeT;
const size_t* ptrT = bufferT;
static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */
if (prefs->testMode) return 0; /* do not output anything in test mode */
if (!prefs->sparseFileSupport) { /* normal write */
size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
if (sizeCheck != bufferSize)
EXM_THROW(70, "Write error : cannot write block : %s",
strerror(errno));
return 0;
}
/* avoid int overflow */
if (storedSkips > 1 GB) {
if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0)
EXM_THROW(91, "1 GB skip error (sparse file support)");
storedSkips -= 1 GB;
}
while (ptrT < bufferTEnd) {
size_t nb0T;
/* adjust last segment if < 32 KB */
size_t seg0SizeT = segmentSizeT;
if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT;
bufferSizeT -= seg0SizeT;
/* count leading zeroes */
for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ;
storedSkips += (unsigned)(nb0T * sizeof(size_t));
if (nb0T != seg0SizeT) { /* not all 0s */
size_t const nbNon0ST = seg0SizeT - nb0T;
/* skip leading zeros */
if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
EXM_THROW(92, "Sparse skip error ; try --no-sparse");
storedSkips = 0;
/* write the rest */
if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
EXM_THROW(93, "Write error : cannot write block : %s",
strerror(errno));
}
ptrT += seg0SizeT;
}
{ static size_t const maskT = sizeof(size_t)-1;
if (bufferSize & maskT) {
/* size not multiple of sizeof(size_t) : implies end of block */
const char* const restStart = (const char*)bufferTEnd;
const char* restPtr = restStart;
const char* const restEnd = (const char*)buffer + bufferSize;
assert(restEnd > restStart && restEnd < restStart + sizeof(size_t));
for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ;
storedSkips += (unsigned) (restPtr - restStart);
if (restPtr != restEnd) {
/* not all remaining bytes are 0 */
size_t const restSize = (size_t)(restEnd - restPtr);
if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0)
EXM_THROW(92, "Sparse skip error ; try --no-sparse");
if (fwrite(restPtr, 1, restSize, file) != restSize)
EXM_THROW(95, "Write error : cannot write end of decoded block : %s",
strerror(errno));
storedSkips = 0;
} } }
return storedSkips;
}
static void FIO_sparseWriteEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
{
if (file == NULL) return;
if (prefs->testMode) {
assert(storedSkips == 0);
return;
}
if (storedSkips>0) {
assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */
if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0)
EXM_THROW(69, "Final skip error (sparse file support)");
/* last zero must be explicitly written,
* so that skipped ones get implicitly translated as zero by FS */
{ const char lastZeroByte[1] = { 0 };
if (fwrite(lastZeroByte, 1, 1, file) != 1)
EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno));
}
}
}
static void FIO_SyncCompressIO_init(FIO_SyncCompressIO* io,
const FIO_prefs_t* prefs,
size_t inCapacity,
size_t outCapacity)
{
memset(io, 0, sizeof(*io));
io->prefs = prefs;
io->inCapacity = inCapacity;
io->outCapacity = outCapacity;
io->inBuffer = (U8*)malloc(inCapacity);
if (!io->inBuffer)
EXM_THROW(101, "Allocation error : not enough memory");
io->outBuffer = (U8*)malloc(outCapacity);
if (!io->outBuffer) {
free(io->inBuffer);
io->inBuffer = NULL;
EXM_THROW(101, "Allocation error : not enough memory");
}
io->srcBuffer = io->inBuffer;
io->srcBufferLoaded = 0;
}
static void FIO_SyncCompressIO_destroy(FIO_SyncCompressIO* io)
{
if (!io) return;
free(io->inBuffer);
free(io->outBuffer);
io->inBuffer = NULL;
io->outBuffer = NULL;
io->srcBuffer = NULL;
io->srcBufferLoaded = 0;
io->srcFile = NULL;
io->dstFile = NULL;
io->storedSkips = 0;
}
static void FIO_SyncCompressIO_setSrc(FIO_SyncCompressIO* io, FILE* file)
{
io->srcFile = file;
io->srcBuffer = io->inBuffer;
io->srcBufferLoaded = 0;
}
static void FIO_SyncCompressIO_clearSrc(FIO_SyncCompressIO* io)
{
io->srcFile = NULL;
io->srcBuffer = io->inBuffer;
io->srcBufferLoaded = 0;
}
static void FIO_SyncCompressIO_setDst(FIO_SyncCompressIO* io, FILE* file)
{
io->dstFile = file;
io->storedSkips = 0;
}
static int FIO_SyncCompressIO_closeDst(FIO_SyncCompressIO* io)
{
int result = 0;
if (io->dstFile != NULL) {
FIO_SyncCompressIO_finish(io);
result = fclose(io->dstFile);
io->dstFile = NULL;
}
return result;
}
static size_t FIO_SyncCompressIO_fillBuffer(FIO_SyncCompressIO* io, size_t minToHave)
{
size_t added = 0;
if (io->srcFile == NULL)
return 0;
if (minToHave > io->inCapacity)
minToHave = io->inCapacity;
if (io->srcBufferLoaded >= minToHave)
return 0;
if (io->srcBuffer != io->inBuffer) {
if (io->srcBufferLoaded > 0)
memmove(io->inBuffer, io->srcBuffer, io->srcBufferLoaded);
io->srcBuffer = io->inBuffer;
}
while (io->srcBufferLoaded < minToHave) {
size_t const toRead = io->inCapacity - io->srcBufferLoaded;
size_t const readBytes = fread(io->inBuffer + io->srcBufferLoaded, 1, toRead, io->srcFile);
if (readBytes == 0) {
if (ferror(io->srcFile))
EXM_THROW(37, "Read error");
break; /* EOF */
}
io->srcBufferLoaded += readBytes;
added += readBytes;
if (readBytes < toRead)
break;
}
return added;
}
static void FIO_SyncCompressIO_consumeBytes(FIO_SyncCompressIO* io, size_t n)
{
assert(n <= io->srcBufferLoaded);
io->srcBuffer += n;
io->srcBufferLoaded -= n;
if (io->srcBufferLoaded == 0)
io->srcBuffer = io->inBuffer;
}
static void FIO_SyncCompressIO_commitOut(FIO_SyncCompressIO* io, const void* buffer, size_t size)
{
if (size == 0)
return;
if (io->dstFile == NULL) {
assert(io->prefs->testMode);
return;
}
io->storedSkips = FIO_sparseWrite(io->dstFile, buffer, size, io->prefs, io->storedSkips);
}
static void FIO_SyncCompressIO_finish(FIO_SyncCompressIO* io)
{
if (io->dstFile == NULL)
return;
FIO_sparseWriteEnd(io->prefs, io->dstFile, io->storedSkips);
io->storedSkips = 0;
}
#endif /* ZSTD_NOCOMPRESS */
/*-************************************
* Signal (Ctrl-C trapping)
**************************************/
@@ -1078,8 +1343,7 @@ typedef struct {
const char* dictFileName;
stat_t dictFileStat;
ZSTD_CStream* cctx;
WritePoolCtx_t *writeCtx;
ReadPoolCtx_t *readCtx;
FIO_SyncCompressIO io;
} cRess_t;
/** ZSTD_cycleLog() :
@@ -1147,8 +1411,7 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
dictBufferType = (useMMap && !forceNoUseMMap) ? FIO_mmapDict : FIO_mallocDict;
FIO_initDict(&ress.dict, dictFileName, prefs, &ress.dictFileStat, dictBufferType); /* works with dictFileName==NULL */
ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize());
FIO_SyncCompressIO_init(&ress.io, prefs, ZSTD_CStreamInSize(), ZSTD_CStreamOutSize());
/* Advanced parameters, including dictionary */
if (dictFileName && (ress.dict.dictBuffer==NULL))
@@ -1212,21 +1475,20 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
static void FIO_freeCResources(cRess_t* const ress)
{
FIO_freeDict(&(ress->dict));
AIO_WritePool_free(ress->writeCtx);
AIO_ReadPool_free(ress->readCtx);
FIO_SyncCompressIO_destroy(&ress->io);
ZSTD_freeCStream(ress->cctx); /* never fails */
}
#ifdef ZSTD_GZCOMPRESS
static unsigned long long
FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but not changed */
FIO_compressGzFrame(cRess_t* ress,
const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize)
{
FIO_SyncCompressIO* const syncIO = &ress->io;
unsigned long long inFileSize = 0, outFileSize = 0;
z_stream strm;
IOJob_t *writeJob = NULL;
if (compressionLevel > Z_BEST_COMPRESSION)
compressionLevel = Z_BEST_COMPRESSION;
@@ -1242,37 +1504,36 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret);
} }
writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_in = 0;
strm.avail_in = 0;
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
strm.next_out = (Bytef*)syncIO->outBuffer;
strm.avail_out = (uInt)syncIO->outCapacity;
while (1) {
int ret;
if (strm.avail_in == 0) {
AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
if (ress->readCtx->srcBufferLoaded == 0) break;
inFileSize += ress->readCtx->srcBufferLoaded;
strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize());
if (syncIO->srcBufferLoaded == 0) break;
inFileSize += added;
*readsize += added;
strm.next_in = (z_const unsigned char*)syncIO->srcBuffer;
strm.avail_in = (uInt)syncIO->srcBufferLoaded;
}
{
size_t const availBefore = strm.avail_in;
ret = deflate(&strm, Z_NO_FLUSH);
AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
FIO_SyncCompressIO_consumeBytes(syncIO, availBefore - strm.avail_in);
}
if (ret != Z_OK)
EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
{ size_t const cSize = writeJob->bufferSize - strm.avail_out;
{ size_t const cSize = (size_t)((uInt)syncIO->outCapacity - strm.avail_out);
if (cSize) {
writeJob->usedBufferSize = cSize;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, cSize);
outFileSize += cSize;
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
strm.next_out = (Bytef*)syncIO->outBuffer;
strm.avail_out = (uInt)syncIO->outCapacity;
} }
if (srcFileSize == UTIL_FILESIZE_UNKNOWN) {
DISPLAYUPDATE_PROGRESS(
@@ -1288,13 +1549,12 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
while (1) {
int const ret = deflate(&strm, Z_FINISH);
{ size_t const cSize = writeJob->bufferSize - strm.avail_out;
{ size_t const cSize = (size_t)((uInt)syncIO->outCapacity - strm.avail_out);
if (cSize) {
writeJob->usedBufferSize = cSize;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, cSize);
outFileSize += cSize;
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
strm.next_out = (Bytef*)syncIO->outBuffer;
strm.avail_out = (uInt)syncIO->outCapacity;
} }
if (ret == Z_STREAM_END) break;
if (ret != Z_BUF_ERROR)
@@ -1306,8 +1566,7 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no
EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret);
} }
*readsize = inFileSize;
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
FIO_SyncCompressIO_finish(syncIO);
return outFileSize;
}
#endif
@@ -1319,11 +1578,11 @@ FIO_compressLzmaFrame(cRess_t* ress,
const char* srcFileName, U64 const srcFileSize,
int compressionLevel, U64* readsize, int plain_lzma)
{
FIO_SyncCompressIO* const syncIO = &ress->io;
unsigned long long inFileSize = 0, outFileSize = 0;
lzma_stream strm = LZMA_STREAM_INIT;
lzma_action action = LZMA_RUN;
lzma_ret ret;
IOJob_t *writeJob = NULL;
if (compressionLevel < 0) compressionLevel = 0;
if (compressionLevel > 9) compressionLevel = 9;
@@ -1341,37 +1600,35 @@ FIO_compressLzmaFrame(cRess_t* ress,
EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret);
}
writeJob =AIO_WritePool_acquireJob(ress->writeCtx);
strm.next_out = (BYTE*)writeJob->buffer;
strm.avail_out = writeJob->bufferSize;
strm.next_out = (BYTE*)syncIO->outBuffer;
strm.avail_out = syncIO->outCapacity;
strm.next_in = 0;
strm.avail_in = 0;
while (1) {
if (strm.avail_in == 0) {
size_t const inSize = AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
inFileSize += inSize;
strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
strm.avail_in = ress->readCtx->srcBufferLoaded;
size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize());
if (syncIO->srcBufferLoaded == 0) action = LZMA_FINISH;
inFileSize += added;
*readsize += added;
strm.next_in = (BYTE const*)syncIO->srcBuffer;
strm.avail_in = syncIO->srcBufferLoaded;
}
{
size_t const availBefore = strm.avail_in;
ret = lzma_code(&strm, action);
AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
FIO_SyncCompressIO_consumeBytes(syncIO, availBefore - strm.avail_in);
}
if (ret != LZMA_OK && ret != LZMA_STREAM_END)
EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret);
{ size_t const compBytes = writeJob->bufferSize - strm.avail_out;
{ size_t const compBytes = syncIO->outCapacity - strm.avail_out;
if (compBytes) {
writeJob->usedBufferSize = compBytes;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, compBytes);
outFileSize += compBytes;
strm.next_out = (BYTE*)writeJob->buffer;
strm.avail_out = writeJob->bufferSize;
strm.next_out = (BYTE*)syncIO->outBuffer;
strm.avail_out = syncIO->outCapacity;
} }
if (srcFileSize == UTIL_FILESIZE_UNKNOWN)
DISPLAYUPDATE_PROGRESS("\rRead : %u MB ==> %.2f%%",
@@ -1387,8 +1644,7 @@ FIO_compressLzmaFrame(cRess_t* ress,
lzma_end(&strm);
*readsize = inFileSize;
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
FIO_SyncCompressIO_finish(syncIO);
return outFileSize;
}
@@ -1409,21 +1665,20 @@ FIO_compressLz4Frame(cRess_t* ress,
int compressionLevel, int checksumFlag,
U64* readsize)
{
FIO_SyncCompressIO* const syncIO = &ress->io;
const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB);
unsigned long long inFileSize = 0, outFileSize = 0;
LZ4F_preferences_t prefs;
LZ4F_compressionContext_t ctx;
IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
if (LZ4F_isError(errorCode))
EXM_THROW(31, "zstd: failed to create lz4 compression context");
memset(&prefs, 0, sizeof(prefs));
assert(blockSize <= ress->readCtx->base.jobBufferSize);
assert(blockSize <= syncIO->inCapacity);
/* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */
prefs.autoFlush = 0;
@@ -1434,25 +1689,26 @@ FIO_compressLz4Frame(cRess_t* ress,
#if LZ4_VERSION_NUMBER >= 10600
prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize;
#endif
assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize);
assert(LZ4F_compressBound(blockSize, &prefs) <= syncIO->outCapacity);
{
size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs);
size_t headerSize = LZ4F_compressBegin(ctx, syncIO->outBuffer, syncIO->outCapacity, &prefs);
if (LZ4F_isError(headerSize))
EXM_THROW(33, "File header generation failed : %s",
LZ4F_getErrorName(headerSize));
writeJob->usedBufferSize = headerSize;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, headerSize);
outFileSize += headerSize;
/* Read first block */
inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
{
size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, blockSize);
inFileSize += added;
*readsize += added;
}
/* Main Loop */
while (ress->readCtx->srcBufferLoaded) {
size_t inSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize,
ress->readCtx->srcBuffer, inSize, NULL);
while (syncIO->srcBufferLoaded) {
size_t const inSize = MIN(blockSize, syncIO->srcBufferLoaded);
size_t const outSize = LZ4F_compressUpdate(ctx, syncIO->outBuffer, syncIO->outCapacity,
syncIO->srcBuffer, inSize, NULL);
if (LZ4F_isError(outSize))
EXM_THROW(35, "zstd: %s: lz4 compression failed : %s",
srcFileName, LZ4F_getErrorName(outSize));
@@ -1467,30 +1723,27 @@ FIO_compressLz4Frame(cRess_t* ress,
(double)outFileSize/(double)inFileSize*100);
}
/* Write Block */
writeJob->usedBufferSize = outSize;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, outSize);
/* Read next block */
AIO_ReadPool_consumeBytes(ress->readCtx, inSize);
inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
FIO_SyncCompressIO_consumeBytes(syncIO, inSize);
{
size_t const added = FIO_SyncCompressIO_fillBuffer(syncIO, blockSize);
inFileSize += added;
*readsize += added;
}
}
/* End of Stream mark */
headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL);
headerSize = LZ4F_compressEnd(ctx, syncIO->outBuffer, syncIO->outCapacity, NULL);
if (LZ4F_isError(headerSize))
EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
srcFileName, LZ4F_getErrorName(headerSize));
writeJob->usedBufferSize = headerSize;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, headerSize);
outFileSize += headerSize;
}
*readsize = inFileSize;
LZ4F_freeCompressionContext(ctx);
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ress->writeCtx);
FIO_SyncCompressIO_finish(syncIO);
return outFileSize;
}
@@ -1499,12 +1752,11 @@ FIO_compressLz4Frame(cRess_t* ress,
static unsigned long long
FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
FIO_prefs_t* const prefs,
const cRess_t* ressPtr,
cRess_t* ress,
const char* srcFileName, U64 fileSize,
int compressionLevel, U64* readsize)
{
cRess_t const ress = *ressPtr;
IOJob_t* writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx);
FIO_SyncCompressIO* const syncIO = &ress->io;
U64 compressedfilesize = 0;
ZSTD_EndDirective directive = ZSTD_e_continue;
@@ -1529,16 +1781,16 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
/* init */
if (fileSize != UTIL_FILESIZE_UNKNOWN) {
pledgedSrcSize = fileSize;
CHECK(ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize));
CHECK(ZSTD_CCtx_setPledgedSrcSize(ress->cctx, fileSize));
} else if (prefs->streamSrcSize > 0) {
/* unknown source size; use the declared stream size */
pledgedSrcSize = prefs->streamSrcSize;
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, prefs->streamSrcSize) );
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress->cctx, prefs->streamSrcSize) );
}
{ int windowLog;
UTIL_HumanReadableSize_t windowSize;
CHECK(ZSTD_CCtx_getParameter(ress.cctx, ZSTD_c_windowLog, &windowLog));
CHECK(ZSTD_CCtx_getParameter(ress->cctx, ZSTD_c_windowLog, &windowLog));
if (windowLog == 0) {
if (prefs->ldmFlag) {
/* If long mode is set without a window size libzstd will set this size internally */
@@ -1556,12 +1808,12 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
do {
size_t stillToFlush;
/* Fill input Buffer */
size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize());
ZSTD_inBuffer inBuff = setInBuffer( ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 );
size_t const inSize = FIO_SyncCompressIO_fillBuffer(syncIO, ZSTD_CStreamInSize());
ZSTD_inBuffer inBuff = setInBuffer( syncIO->srcBuffer, syncIO->srcBufferLoaded, 0 );
DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
*readsize += inSize;
if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize))
if ((syncIO->srcBufferLoaded == 0) || (*readsize == fileSize))
directive = ZSTD_e_end;
stillToFlush = 1;
@@ -1569,10 +1821,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
|| (directive == ZSTD_e_end && stillToFlush != 0) ) {
size_t const oldIPos = inBuff.pos;
ZSTD_outBuffer outBuff = setOutBuffer( writeJob->buffer, writeJob->bufferSize, 0 );
size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);
ZSTD_outBuffer outBuff = setOutBuffer( syncIO->outBuffer, syncIO->outCapacity, 0 );
size_t const toFlushNow = ZSTD_toFlushNow(ress->cctx);
CHECK_V(stillToFlush, ZSTD_compressStream2(ress->cctx, &outBuff, &inBuff, directive));
FIO_SyncCompressIO_consumeBytes(syncIO, inBuff.pos - oldIPos);
/* count stats */
inputPresented++;
@@ -1583,14 +1835,13 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
(unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
if (outBuff.pos) {
writeJob->usedBufferSize = outBuff.pos;
AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
FIO_SyncCompressIO_commitOut(syncIO, syncIO->outBuffer, outBuff.pos);
compressedfilesize += outBuff.pos;
}
/* adaptive mode : statistics measurement and speed correction */
if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress->cctx);
lastAdaptTime = UTIL_getTime();
@@ -1663,14 +1914,14 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
compressionLevel += (compressionLevel == 0); /* skip 0 */
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
ZSTD_CCtx_setParameter(ress->cctx, ZSTD_c_compressionLevel, compressionLevel);
}
if (speedChange == faster) {
DISPLAYLEVEL(6, "faster speed , lighter compression \n")
compressionLevel --;
if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
compressionLevel -= (compressionLevel == 0); /* skip 0 */
ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
ZSTD_CCtx_setParameter(ress->cctx, ZSTD_c_compressionLevel, compressionLevel);
}
speedChange = noChange;
@@ -1680,7 +1931,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
/* display notification */
if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress->cctx);
double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed);
UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed);
@@ -1727,8 +1978,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
(unsigned long long)*readsize, (unsigned long long)fileSize);
}
AIO_WritePool_releaseIoJob(writeJob);
AIO_WritePool_sparseWriteEnd(ressPtr->writeCtx);
FIO_SyncCompressIO_finish(syncIO);
return compressedfilesize;
}
@@ -1741,7 +1991,7 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
static int
FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
FIO_prefs_t* const prefs,
cRess_t ress,
cRess_t* ress,
const char* dstFileName, const char* srcFileName,
int compressionLevel)
{
@@ -1756,12 +2006,12 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
switch (prefs->compressionType) {
default:
case FIO_zstdCompression:
compressedfilesize = FIO_compressZstdFrame(fCtx, prefs, &ress, srcFileName, fileSize, compressionLevel, &readsize);
compressedfilesize = FIO_compressZstdFrame(fCtx, prefs, ress, srcFileName, fileSize, compressionLevel, &readsize);
break;
case FIO_gzipCompression:
#ifdef ZSTD_GZCOMPRESS
compressedfilesize = FIO_compressGzFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize);
compressedfilesize = FIO_compressGzFrame(ress, srcFileName, fileSize, compressionLevel, &readsize);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as gzip (zstd compiled without ZSTD_GZCOMPRESS) -- ignored \n",
@@ -1772,7 +2022,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
case FIO_xzCompression:
case FIO_lzmaCompression:
#ifdef ZSTD_LZMACOMPRESS
compressedfilesize = FIO_compressLzmaFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize, prefs->compressionType==FIO_lzmaCompression);
compressedfilesize = FIO_compressLzmaFrame(ress, srcFileName, fileSize, compressionLevel, &readsize, prefs->compressionType==FIO_lzmaCompression);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as xz/lzma (zstd compiled without ZSTD_LZMACOMPRESS) -- ignored \n",
@@ -1782,7 +2032,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
case FIO_lz4Compression:
#ifdef ZSTD_LZ4COMPRESS
compressedfilesize = FIO_compressLz4Frame(&ress, srcFileName, fileSize, compressionLevel, prefs->checksumFlag, &readsize);
compressedfilesize = FIO_compressLz4Frame(ress, srcFileName, fileSize, compressionLevel, prefs->checksumFlag, &readsize);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as lz4 (zstd compiled without ZSTD_LZ4COMPRESS) -- ignored \n",
@@ -1838,7 +2088,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
*/
static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
FIO_prefs_t* const prefs,
cRess_t ress,
cRess_t* ress,
const char* dstFileName,
const char* srcFileName,
const stat_t* srcFileStat,
@@ -1849,8 +2099,7 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
int transferStat = 0;
int dstFd = -1;
assert(AIO_ReadPool_getFile(ress.readCtx) != NULL);
if (AIO_WritePool_getFile(ress.writeCtx) == NULL) {
if (ress->io.dstFile == NULL) {
int dstFileInitialPermissions = DEFAULT_FILE_PERMISSIONS;
if ( strcmp (srcFileName, stdinmark)
&& strcmp (dstFileName, stdoutmark)
@@ -1861,15 +2110,13 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
closeDstFile = 1;
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName);
{ FILE *dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFileInitialPermissions);
{
FILE *dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFileInitialPermissions);
if (dstFile==NULL) return 1; /* could not open dstFileName */
dstFd = fileno(dstFile);
AIO_WritePool_setFile(ress.writeCtx, dstFile);
FIO_SyncCompressIO_setDst(&ress->io, dstFile);
}
/* Must only be added after FIO_openDstFile() succeeds.
* Otherwise we may delete the destination file if it already exists,
* and the user presses Ctrl-C when asked if they wish to overwrite.
*/
/* Must only be added after FIO_openDstFile() succeeds. */
addHandler(dstFileName);
}
@@ -1883,7 +2130,7 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
}
DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName);
if (AIO_WritePool_closeFile(ress.writeCtx)) { /* error closing file */
if (FIO_SyncCompressIO_closeDst(&ress->io)) { /* error closing file */
DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
result=1;
}
@@ -1892,10 +2139,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
UTIL_utime(dstFileName, srcFileStat);
}
if ( (result != 0) /* operation failure */
&& strcmp(dstFileName, stdoutmark) /* special case : don't remove() stdout */
) {
FIO_removeFile(dstFileName); /* remove compression artefact; note don't do anything special if remove() fails */
if ( (result != 0)
&& strcmp(dstFileName, stdoutmark) ) {
FIO_removeFile(dstFileName);
}
}
@@ -2029,7 +2275,7 @@ static const char *compressedFileExtensions[] = {
static int
FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
FIO_prefs_t* const prefs,
cRess_t ress,
cRess_t* ress,
const char* dstFileName,
const char* srcFileName,
int compressionLevel)
@@ -2051,7 +2297,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
}
/* ensure src is not the same as dict (if present) */
if (ress.dictFileName != NULL && UTIL_isSameFileStat(srcFileName, ress.dictFileName, &srcFileStat, &ress.dictFileStat)) {
if (ress->dictFileName != NULL && UTIL_isSameFileStat(srcFileName, ress->dictFileName, &srcFileStat, &ress->dictFileStat)) {
DISPLAYLEVEL(1, "zstd: cannot use %s as an input file and dictionary \n", srcFileName);
return 1;
}
@@ -2070,23 +2316,21 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat);
if (srcFile == NULL) return 1; /* srcFile could not be opened */
/* Don't use AsyncIO for small files */
if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */
fileSize = UTIL_getFileSizeStat(&srcFileStat);
if(fileSize != UTIL_FILESIZE_UNKNOWN && fileSize < ZSTD_BLOCKSIZE_MAX * 3) {
AIO_ReadPool_setAsync(ress.readCtx, 0);
AIO_WritePool_setAsync(ress.writeCtx, 0);
} else {
AIO_ReadPool_setAsync(ress.readCtx, 1);
AIO_WritePool_setAsync(ress.writeCtx, 1);
}
(void)fileSize;
AIO_ReadPool_setFile(ress.readCtx, srcFile);
FIO_SyncCompressIO_setSrc(&ress->io, srcFile);
result = FIO_compressFilename_dstFile(
fCtx, prefs, ress,
dstFileName, srcFileName,
&srcFileStat, compressionLevel);
AIO_ReadPool_closeFile(ress.readCtx);
FIO_SyncCompressIO_clearSrc(&ress->io);
if (srcFile != NULL && fclose(srcFile)) {
DISPLAYLEVEL(1, "zstd: %s: %s \n", srcFileName, strerror(errno));
return 1;
}
if ( prefs->removeSrcFile /* --rm */
&& result == 0 /* success */
@@ -2153,7 +2397,7 @@ int FIO_compressFilename(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs, const
int compressionLevel, ZSTD_compressionParameters comprParams)
{
cRess_t ress = FIO_createCResources(prefs, dictFileName, UTIL_getFileSize(srcFileName), compressionLevel, comprParams);
int const result = FIO_compressFilename_srcFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
int const result = FIO_compressFilename_srcFile(fCtx, prefs, &ress, dstFileName, srcFileName, compressionLevel);
#define DISPLAY_LEVEL_DEFAULT 2
@@ -2250,13 +2494,13 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
if (dstFile == NULL) { /* could not open outFileName */
error = 1;
} else {
AIO_WritePool_setFile(ress.writeCtx, dstFile);
FIO_SyncCompressIO_setDst(&ress.io, dstFile);
for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) {
status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
status = FIO_compressFilename_srcFile(fCtx, prefs, &ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
if (!status) fCtx->nbFilesProcessed++;
error |= status;
}
if (AIO_WritePool_closeFile(ress.writeCtx))
if (FIO_SyncCompressIO_closeDst(&ress.io))
EXM_THROW(29, "Write error (%s) : cannot properly close %s",
strerror(errno), outFileName);
}
@@ -2280,7 +2524,7 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
} else {
dstFileName = FIO_determineCompressedName(srcFileName, outDirName, suffix); /* cannot fail */
}
status = FIO_compressFilename_srcFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
status = FIO_compressFilename_srcFile(fCtx, prefs, &ress, dstFileName, srcFileName, compressionLevel);
if (!status) fCtx->nbFilesProcessed++;
error |= status;
}