1
0
mirror of https://github.com/facebook/zstd.git synced 2025-03-07 01:10:04 +02:00

Async write for decompression (#2975)

* Async IO decompression:
- Added --[no-]asyncio flag for CLI decompression.
- Replaced dstBuffer in decompression with a pool of write jobs.
- Added an ability to execute write jobs in a separate thread.
- Added an ability to wait (join) on all jobs in a thread pool (queued and running).
This commit is contained in:
Yonatan Komornik 2022-01-21 13:55:41 -08:00 committed by GitHub
parent 2f03c1996f
commit 1598e6c634
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 394 additions and 103 deletions

View File

@ -20,14 +20,24 @@ zstd_programs_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'),
join_paths(zstd_rootdir, 'programs/dibio.c'),
join_paths(zstd_rootdir, 'programs/zstdcli_trace.c'),
# needed due to use of private symbol + -fvisibility=hidden
join_paths(zstd_rootdir, 'lib/common/xxhash.c')]
zstd_c_args = libzstd_debug_cflags
if use_multi_thread
zstd_c_args += [ '-DZSTD_MULTITHREAD' ]
endif
join_paths(zstd_rootdir, 'lib/common/xxhash.c'),
join_paths(zstd_rootdir, 'lib/common/pool.c'),
join_paths(zstd_rootdir, 'lib/common/zstd_common.c'),
join_paths(zstd_rootdir, 'lib/common/error_private.c')]
zstd_deps = [ libzstd_dep ]
zstd_c_args = libzstd_debug_cflags
zstd_frugal_deps = [ libzstd_dep ]
zstd_frugal_c_args = [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ]
if use_multi_thread
zstd_deps += [ thread_dep ]
zstd_c_args += [ '-DZSTD_MULTITHREAD' ]
zstd_frugal_deps += [ thread_dep ]
zstd_frugal_c_args += [ '-DZSTD_MULTITHREAD' ]
endif
if use_zlib
zstd_deps += [ zlib_dep ]
zstd_c_args += [ '-DZSTD_GZCOMPRESS', '-DZSTD_GZDECOMPRESS' ]
@ -69,14 +79,17 @@ zstd = executable('zstd',
zstd_frugal_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'),
join_paths(zstd_rootdir, 'programs/timefn.c'),
join_paths(zstd_rootdir, 'programs/util.c'),
join_paths(zstd_rootdir, 'programs/fileio.c')]
join_paths(zstd_rootdir, 'programs/fileio.c'),
join_paths(zstd_rootdir, 'lib/common/pool.c'),
join_paths(zstd_rootdir, 'lib/common/zstd_common.c'),
join_paths(zstd_rootdir, 'lib/common/error_private.c')]
# Minimal target, with only zstd compression and decompression.
# No bench. No legacy.
executable('zstd-frugal',
zstd_frugal_sources,
dependencies: libzstd_dep,
c_args: [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ],
dependencies: zstd_frugal_deps,
c_args: zstd_frugal_c_args,
install: true)
install_data(join_paths(zstd_rootdir, 'programs/zstdgrep'),

View File

@ -96,9 +96,7 @@ static void* POOL_thread(void* opaque) {
/* If the intended queue size was 0, signal after finishing job */
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
ctx->numThreadsBusy--;
if (ctx->queueSize == 1) {
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
}
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
} /* for (;;) */
@ -190,6 +188,17 @@ void POOL_free(POOL_ctx *ctx) {
ZSTD_customFree(ctx, ctx->customMem);
}
/*! POOL_joinJobs() :
* Waits for all queued jobs to finish executing.
*/
void POOL_joinJobs(POOL_ctx* ctx) {
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
}
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
void ZSTD_freeThreadPool (ZSTD_threadPool* pool) {
POOL_free (pool);
}
@ -330,6 +339,11 @@ void POOL_free(POOL_ctx* ctx) {
(void)ctx;
}
void POOL_joinJobs(POOL_ctx* ctx){
assert(!ctx || ctx == &g_poolCtx);
(void)ctx;
}
int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
(void)ctx; (void)numThreads;
return 0;

View File

@ -38,6 +38,12 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
*/
void POOL_free(POOL_ctx* ctx);
/*! POOL_joinJobs() :
* Waits for all queued jobs to finish executing.
*/
void POOL_joinJobs(POOL_ctx* ctx);
/*! POOL_resize() :
* Expands or shrinks pool's number of threads.
* This is more efficient than releasing + creating a new context,

View File

@ -34,6 +34,8 @@
#include <limits.h> /* INT_MAX */
#include <signal.h>
#include "timefn.h" /* UTIL_getTime, UTIL_clockSpanMicro */
#include "../lib/common/pool.h"
#include "../lib/common/threading.h"
#if defined (_MSC_VER)
# include <sys/stat.h>
@ -325,6 +327,7 @@ struct FIO_prefs_s {
/* IO preferences */
U32 removeSrcFile;
U32 overwrite;
U32 asyncIO;
/* Computation resources preferences */
unsigned memLimit;
@ -395,6 +398,7 @@ FIO_prefs_t* FIO_createPreferences(void)
ret->literalCompressionMode = ZSTD_ps_auto;
ret->excludeCompressedFiles = 0;
ret->allowBlockDevices = 0;
ret->asyncIO = 0;
return ret;
}
@ -558,6 +562,10 @@ void FIO_setContentSize(FIO_prefs_t* const prefs, int value)
prefs->contentSize = value != 0;
}
void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value) {
prefs->asyncIO = value;
}
/* FIO_ctx_t functions */
void FIO_setHasStdoutOutput(FIO_ctx_t* const fCtx, int value) {
@ -1798,7 +1806,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
static const char* checked_index(const char* options[], size_t length, size_t index) {
assert(index < length);
// Necessary to avoid warnings since -O3 will omit the above `assert`
/* Necessary to avoid warnings since -O3 will omit the above `assert` */
(void) length;
return options[index];
}
@ -2000,16 +2008,124 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
/* **************************************************************************
* Decompression
***************************************************************************/
#define DECOMPRESSION_MAX_WRITE_JOBS (10)
typedef struct {
/* These struct fields should be set only on creation and not changed afterwards */
POOL_ctx* writerPool;
int totalWriteJobs;
FIO_prefs_t* prefs;
/* Controls the file we currently write to, make changes only by using provided utility functions */
FILE* dstFile;
unsigned storedSkips;
/* The jobs and availableWriteJobs fields are access by both the main and writer threads and should
* only be mutated after locking the mutex */
ZSTD_pthread_mutex_t writeJobsMutex;
void* jobs[DECOMPRESSION_MAX_WRITE_JOBS];
int availableWriteJobs;
} write_pool_ctx_t;
typedef struct {
/* These fields are automaically set and shouldn't be changed by non WritePool code. */
write_pool_ctx_t *ctx;
FILE* dstFile;
void *buffer;
size_t bufferSize;
/* This field should be changed before a job is queued for execution and should contain the number
* of bytes to write from the buffer. */
size_t usedBufferSize;
} write_job_t;
typedef struct {
void* srcBuffer;
size_t srcBufferSize;
size_t srcBufferLoaded;
void* dstBuffer;
size_t dstBufferSize;
ZSTD_DStream* dctx;
FILE* dstFile;
write_pool_ctx_t *writePoolCtx;
} dRess_t;
static write_job_t *FIO_createWriteJob(write_pool_ctx_t *ctx) {
void *buffer;
write_job_t *job;
job = (write_job_t*) malloc(sizeof(write_job_t));
buffer = malloc(ZSTD_DStreamOutSize());
if(!job || !buffer)
EXM_THROW(101, "Allocation error : not enough memory");
job->buffer = buffer;
job->bufferSize = ZSTD_DStreamOutSize();
job->usedBufferSize = 0;
job->dstFile = NULL;
job->ctx = ctx;
return job;
}
/* WritePool_createThreadPool:
* Creates a thread pool and a mutex for threaded write pool.
* Displays warning if asyncio is requested but MT isn't available. */
static void WritePool_createThreadPool(write_pool_ctx_t *ctx, const FIO_prefs_t *prefs) {
ctx->writerPool = NULL;
if(prefs->asyncIO) {
#ifdef ZSTD_MULTITHREAD
if (ZSTD_pthread_mutex_init(&ctx->writeJobsMutex, NULL))
EXM_THROW(102, "Failed creating write jobs mutex");
/* We want DECOMPRESSION_MAX_WRITE_JOBS-2 queue items because we need to always have 1 free buffer to
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
assert(DECOMPRESSION_MAX_WRITE_JOBS >= 2);
ctx->writerPool = POOL_create(1, DECOMPRESSION_MAX_WRITE_JOBS - 2);
if (!ctx->writerPool)
EXM_THROW(103, "Failed creating writer thread pool");
#else
DISPLAYLEVEL(2, "Note : asyncio decompression is disabled (lack of multithreading support) \n");
#endif
}
}
/* WritePool_create:
* Allocates and sets and a new write pool including its included jobs. */
static write_pool_ctx_t* WritePool_create(FIO_prefs_t* const prefs) {
write_pool_ctx_t *ctx;
int i;
ctx = (write_pool_ctx_t*) malloc(sizeof(write_pool_ctx_t));
if(!ctx)
EXM_THROW(100, "Allocation error : not enough memory");
WritePool_createThreadPool(ctx, prefs);
ctx->prefs = prefs;
ctx->totalWriteJobs = ctx->writerPool ? DECOMPRESSION_MAX_WRITE_JOBS : 1;
ctx->availableWriteJobs = ctx->totalWriteJobs;
for(i=0; i < ctx->availableWriteJobs; i++) {
ctx->jobs[i] = FIO_createWriteJob(ctx);
}
ctx->storedSkips = 0;
ctx->dstFile = NULL;
return ctx;
}
/* WritePool_free:
* Release a previously allocated write thread pool. Makes sure all takss are done and released. */
static void WritePool_free(write_pool_ctx_t* ctx) {
int i=0;
if(ctx->writerPool) {
/* Make sure we finish all tasks and then free the resources */
POOL_joinJobs(ctx->writerPool);
/* Make sure we are not leaking jobs */
assert(ctx->availableWriteJobs==ctx->totalWriteJobs);
POOL_free(ctx->writerPool);
ZSTD_pthread_mutex_destroy(&ctx->writeJobsMutex);
}
assert(ctx->dstFile==NULL);
assert(ctx->storedSkips==0);
for(i=0; i<ctx->availableWriteJobs; i++) {
write_job_t* job = (write_job_t*) ctx->jobs[i];
free(job->buffer);
free(job);
}
free(ctx);
}
static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
{
dRess_t ress;
@ -2027,9 +2143,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
ress.srcBufferSize = ZSTD_DStreamInSize();
ress.srcBuffer = malloc(ress.srcBufferSize);
ress.dstBufferSize = ZSTD_DStreamOutSize();
ress.dstBuffer = malloc(ress.dstBufferSize);
if (!ress.srcBuffer || !ress.dstBuffer)
if (!ress.srcBuffer)
EXM_THROW(61, "Allocation error : not enough memory");
/* dictionary */
@ -2039,6 +2153,8 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
free(dictBuffer);
}
ress.writePoolCtx = WritePool_create(prefs);
return ress;
}
@ -2046,9 +2162,16 @@ static void FIO_freeDResources(dRess_t ress)
{
CHECK( ZSTD_freeDStream(ress.dctx) );
free(ress.srcBuffer);
free(ress.dstBuffer);
WritePool_free(ress.writePoolCtx);
}
/* FIO_consumeDSrcBuffer:
* Consumes len bytes from srcBuffer's start and moves the remaining data and srcBufferLoaded accordingly. */
static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) {
assert(ress->srcBufferLoaded >= len);
ress->srcBufferLoaded -= len;
memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded);
}
/** FIO_fwriteSparse() :
* @return : storedSkips,
@ -2148,6 +2271,106 @@ FIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedS
} }
}
/* WritePool_releaseWriteJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
static void WritePool_releaseWriteJob(write_job_t *job) {
write_pool_ctx_t *ctx = job->ctx;
if(ctx->writerPool) {
ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex);
assert(ctx->availableWriteJobs < DECOMPRESSION_MAX_WRITE_JOBS);
ctx->jobs[ctx->availableWriteJobs++] = job;
ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex);
} else {
ctx->availableWriteJobs++;
}
}
/* WritePool_acquireWriteJob:
* Returns an available write job to be used for a future write. */
static write_job_t* WritePool_acquireWriteJob(write_pool_ctx_t *ctx) {
write_job_t *job;
assert(ctx->dstFile!=NULL || ctx->prefs->testMode);
if(ctx->writerPool) {
ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex);
assert(ctx->availableWriteJobs > 0);
job = (write_job_t*) ctx->jobs[--ctx->availableWriteJobs];
ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex);
} else {
assert(ctx->availableWriteJobs==1);
ctx->availableWriteJobs--;
job = (write_job_t*)ctx->jobs[0];
}
job->usedBufferSize = 0;
job->dstFile = ctx->dstFile;
return job;
}
/* WritePool_executeWriteJob:
* Executes a write job synchronously. Can be used as a function for a thread pool. */
static void WritePool_executeWriteJob(void* opaque){
write_job_t* job = (write_job_t*) opaque;
write_pool_ctx_t* ctx = job->ctx;
ctx->storedSkips = FIO_fwriteSparse(job->dstFile, job->buffer, job->usedBufferSize, ctx->prefs, ctx->storedSkips);
WritePool_releaseWriteJob(job);
}
/* WritePool_queueWriteJob:
* Queues a write job for execution.
* Make sure to set `usedBufferSize` to the wanted length before call.
* The queued job shouldn't be used directly after queueing it. */
static void WritePool_queueWriteJob(write_job_t *job) {
write_pool_ctx_t* ctx = job->ctx;
if(ctx->writerPool)
POOL_add(ctx->writerPool, WritePool_executeWriteJob, job);
else
WritePool_executeWriteJob(job);
}
/* WritePool_queueAndReacquireWriteJob:
* Queues a write job for execution and acquires a new one.
* After execution `job`'s pointed value would change to the newly acquired job.
* Make sure to set `usedBufferSize` to the wanted length before call.
* The queued job shouldn't be used directly after queueing it. */
static void WritePool_queueAndReacquireWriteJob(write_job_t **job) {
WritePool_queueWriteJob(*job);
*job = WritePool_acquireWriteJob((*job)->ctx);
}
/* WritePool_sparseWriteEnd:
* Ends sparse writes to the current dstFile.
* Blocks on completion of all current write jobs before executing. */
static void WritePool_sparseWriteEnd(write_pool_ctx_t* ctx) {
assert(ctx != NULL);
if(ctx->writerPool)
POOL_joinJobs(ctx->writerPool);
FIO_fwriteSparseEnd(ctx->prefs, ctx->dstFile, ctx->storedSkips);
ctx->storedSkips = 0;
}
/* WritePool_setDstFile:
* Sets the destination file for future files in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
static void WritePool_setDstFile(write_pool_ctx_t *ctx, FILE* dstFile) {
assert(ctx!=NULL);
/* We can change the dst file only if we have finished writing */
if(ctx->writerPool)
POOL_joinJobs(ctx->writerPool);
assert(ctx->storedSkips == 0);
assert(ctx->availableWriteJobs == ctx->totalWriteJobs);
ctx->dstFile = dstFile;
}
/* WritePool_closeDstFile:
* Ends sparse write and closes the writePool's current dstFile and sets the dstFile to NULL.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
static int WritePool_closeDstFile(write_pool_ctx_t *ctx) {
FILE *dstFile = ctx->dstFile;
assert(dstFile!=NULL || ctx->prefs->testMode!=0);
WritePool_sparseWriteEnd(ctx);
WritePool_setDstFile(ctx, NULL);
return fclose(dstFile);
}
/** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode
@return : 0 (no error) */
@ -2224,7 +2447,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
U64 alreadyDecoded) /* for multi-frames streams */
{
U64 frameSize = 0;
U32 storedSkips = 0;
write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
/* display last 20 characters only */
{ size_t const srcFileLength = strlen(srcFileName);
@ -2244,7 +2467,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
/* Main decompression Loop */
while (1) {
ZSTD_inBuffer inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 };
ZSTD_outBuffer outBuff= { ress->dstBuffer, ress->dstBufferSize, 0 };
ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff);
const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2;
UTIL_HumanReadableSize_t const hrs = UTIL_makeHumanReadableSize(alreadyDecoded+frameSize);
@ -2256,7 +2479,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
}
/* Write block */
storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, prefs, storedSkips);
writeJob->usedBufferSize = outBuff.pos;
WritePool_queueAndReacquireWriteJob(&writeJob);
frameSize += outBuff.pos;
if (fCtx->nbFilesTotal > 1) {
size_t srcFileNameSize = strlen(srcFileName);
@ -2273,10 +2497,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
srcFileName, hrs.precision, hrs.value, hrs.suffix);
}
if (inBuff.pos > 0) {
memmove(ress->srcBuffer, (char*)ress->srcBuffer + inBuff.pos, inBuff.size - inBuff.pos);
ress->srcBufferLoaded -= inBuff.pos;
}
FIO_consumeDSrcBuffer(ress, inBuff.pos);
if (readSizeHint == 0) break; /* end of frame */
@ -2294,7 +2515,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
ress->srcBufferLoaded += readSize;
} } }
FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
WritePool_releaseWriteJob(writeJob);
WritePool_sparseWriteEnd(ress->writePoolCtx);
return frameSize;
}
@ -2302,15 +2524,13 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
#ifdef ZSTD_GZDECOMPRESS
static unsigned long long
FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile,
const FIO_prefs_t* const prefs,
const char* srcFileName)
FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
{
unsigned long long outFileSize = 0;
z_stream strm;
int flush = Z_NO_FLUSH;
int decodingError = 0;
unsigned storedSkips = 0;
write_job_t *writeJob = NULL;
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
@ -2321,8 +2541,9 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile,
if (inflateInit2(&strm, 15 /* maxWindowLogSize */ + 16 /* gzip only */) != Z_OK)
return FIO_ERROR_FRAME_DECODING;
strm.next_out = (Bytef*)ress->dstBuffer;
strm.avail_out = (uInt)ress->dstBufferSize;
writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
strm.avail_in = (uInt)ress->srcBufferLoaded;
strm.next_in = (z_const unsigned char*)ress->srcBuffer;
@ -2343,35 +2564,34 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile,
DISPLAYLEVEL(1, "zstd: %s: inflate error %d \n", srcFileName, ret);
decodingError = 1; break;
}
{ size_t const decompBytes = ress->dstBufferSize - strm.avail_out;
{ size_t const decompBytes = writeJob->bufferSize - strm.avail_out;
if (decompBytes) {
storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips);
writeJob->usedBufferSize = decompBytes;
WritePool_queueAndReacquireWriteJob(&writeJob);
outFileSize += decompBytes;
strm.next_out = (Bytef*)ress->dstBuffer;
strm.avail_out = (uInt)ress->dstBufferSize;
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
}
}
if (ret == Z_STREAM_END) break;
}
if (strm.avail_in > 0)
memmove(ress->srcBuffer, strm.next_in, strm.avail_in);
ress->srcBufferLoaded = strm.avail_in;
FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
if ( (inflateEnd(&strm) != Z_OK) /* release resources ; error detected */
&& (decodingError==0) ) {
DISPLAYLEVEL(1, "zstd: %s: inflateEnd error \n", srcFileName);
decodingError = 1;
}
FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
WritePool_releaseWriteJob(writeJob);
WritePool_sparseWriteEnd(ress->writePoolCtx);
return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize;
}
#endif
#ifdef ZSTD_LZMADECOMPRESS
static unsigned long long
FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
const FIO_prefs_t* const prefs,
const char* srcFileName, int plain_lzma)
{
unsigned long long outFileSize = 0;
@ -2379,7 +2599,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
lzma_action action = LZMA_RUN;
lzma_ret initRet;
int decodingError = 0;
unsigned storedSkips = 0;
write_job_t *writeJob = NULL;
strm.next_in = 0;
strm.avail_in = 0;
@ -2396,8 +2616,9 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
return FIO_ERROR_FRAME_DECODING;
}
strm.next_out = (BYTE*)ress->dstBuffer;
strm.avail_out = ress->dstBufferSize;
writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = (uInt)writeJob->bufferSize;
strm.next_in = (BYTE const*)ress->srcBuffer;
strm.avail_in = ress->srcBufferLoaded;
@ -2420,21 +2641,21 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
srcFileName, ret);
decodingError = 1; break;
}
{ size_t const decompBytes = ress->dstBufferSize - strm.avail_out;
{ size_t const decompBytes = writeJob->bufferSize - strm.avail_out;
if (decompBytes) {
storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips);
writeJob->usedBufferSize = decompBytes;
WritePool_queueAndReacquireWriteJob(&writeJob);
outFileSize += decompBytes;
strm.next_out = (BYTE*)ress->dstBuffer;
strm.avail_out = ress->dstBufferSize;
strm.next_out = (Bytef*)writeJob->buffer;
strm.avail_out = writeJob->bufferSize;
} }
if (ret == LZMA_STREAM_END) break;
}
if (strm.avail_in > 0)
memmove(ress->srcBuffer, strm.next_in, strm.avail_in);
ress->srcBufferLoaded = strm.avail_in;
FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
lzma_end(&strm);
FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
WritePool_releaseWriteJob(writeJob);
WritePool_sparseWriteEnd(ress->writePoolCtx);
return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize;
}
#endif
@ -2442,60 +2663,57 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
#ifdef ZSTD_LZ4DECOMPRESS
static unsigned long long
FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
const FIO_prefs_t* const prefs,
const char* srcFileName)
{
unsigned long long filesize = 0;
LZ4F_errorCode_t nextToLoad;
LZ4F_errorCode_t nextToLoad = 4;
LZ4F_decompressionContext_t dCtx;
LZ4F_errorCode_t const errorCode = LZ4F_createDecompressionContext(&dCtx, LZ4F_VERSION);
int decodingError = 0;
unsigned storedSkips = 0;
write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
if (LZ4F_isError(errorCode)) {
DISPLAYLEVEL(1, "zstd: failed to create lz4 decompression context \n");
return FIO_ERROR_FRAME_DECODING;
}
/* Init feed with magic number (already consumed from FILE* sFile) */
{ size_t inSize = 4;
size_t outSize= 0;
MEM_writeLE32(ress->srcBuffer, LZ4_MAGICNUMBER);
nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &outSize, ress->srcBuffer, &inSize, NULL);
if (LZ4F_isError(nextToLoad)) {
DISPLAYLEVEL(1, "zstd: %s: lz4 header error : %s \n",
srcFileName, LZ4F_getErrorName(nextToLoad));
LZ4F_freeDecompressionContext(dCtx);
return FIO_ERROR_FRAME_DECODING;
} }
/* Main Loop */
for (;nextToLoad;) {
size_t readSize;
size_t pos = 0;
size_t decodedBytes = ress->dstBufferSize;
size_t decodedBytes = writeJob->bufferSize;
int fullBufferDecoded = 0;
/* Read input */
if (nextToLoad > ress->srcBufferSize) nextToLoad = ress->srcBufferSize;
readSize = fread(ress->srcBuffer, 1, nextToLoad, srcFile);
if (!readSize) break; /* reached end of file or stream */
nextToLoad = MIN(nextToLoad, ress->srcBufferSize-ress->srcBufferLoaded);
readSize = fread((char *)ress->srcBuffer + ress->srcBufferLoaded, 1, nextToLoad, srcFile);
if(!readSize && ferror(srcFile)) {
DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName);
decodingError=1;
break;
}
if(!readSize && !ress->srcBufferLoaded) break; /* reached end of file */
ress->srcBufferLoaded += readSize;
while ((pos < readSize) || (decodedBytes == ress->dstBufferSize)) { /* still to read, or still to flush */
while ((pos < ress->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */
/* Decode Input (at least partially) */
size_t remaining = readSize - pos;
decodedBytes = ress->dstBufferSize;
nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL);
size_t remaining = ress->srcBufferLoaded - pos;
decodedBytes = writeJob->bufferSize;
nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL);
if (LZ4F_isError(nextToLoad)) {
DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n",
srcFileName, LZ4F_getErrorName(nextToLoad));
decodingError = 1; nextToLoad = 0; break;
}
pos += remaining;
assert(pos <= ress->srcBufferLoaded);
fullBufferDecoded = decodedBytes == writeJob->bufferSize;
/* Write Block */
if (decodedBytes) {
UTIL_HumanReadableSize_t hrs;
storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decodedBytes, prefs, storedSkips);
writeJob->usedBufferSize = decodedBytes;
WritePool_queueAndReacquireWriteJob(&writeJob);
filesize += decodedBytes;
hrs = UTIL_makeHumanReadableSize(filesize);
DISPLAYUPDATE(2, "\rDecompressed : %.*f%s ", hrs.precision, hrs.value, hrs.suffix);
@ -2503,21 +2721,16 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
if (!nextToLoad) break;
}
FIO_consumeDSrcBuffer(ress, pos);
}
/* can be out because readSize == 0, which could be an fread() error */
if (ferror(srcFile)) {
DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName);
decodingError=1;
}
if (nextToLoad!=0) {
DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName);
decodingError=1;
}
LZ4F_freeDecompressionContext(dCtx);
ress->srcBufferLoaded = 0; /* LZ4F will reach exact frame boundary */
FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
WritePool_releaseWriteJob(writeJob);
WritePool_sparseWriteEnd(ress->writePoolCtx);
return decodingError ? FIO_ERROR_FRAME_DECODING : filesize;
}
@ -2566,7 +2779,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
filesize += frameSize;
} else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */
#ifdef ZSTD_GZDECOMPRESS
unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, prefs, srcFileName);
unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, srcFileName);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
@ -2576,7 +2789,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
} else if ((buf[0] == 0xFD && buf[1] == 0x37) /* xz magic number */
|| (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */
#ifdef ZSTD_LZMADECOMPRESS
unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, prefs, srcFileName, buf[0] != 0xFD);
unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, srcFileName, buf[0] != 0xFD);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
@ -2585,7 +2798,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
#endif
} else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) {
#ifdef ZSTD_LZ4DECOMPRESS
unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, prefs, srcFileName);
unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, srcFileName);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
@ -2594,7 +2807,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
#endif
} else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */
return FIO_passThrough(prefs,
ress.dstFile, srcFile,
ress.writePoolCtx->dstFile, srcFile,
ress.srcBuffer, ress.srcBufferSize,
ress.srcBufferLoaded);
} else {
@ -2632,7 +2845,8 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
int releaseDstFile = 0;
int transferMTime = 0;
if ((ress.dstFile == NULL) && (prefs->testMode==0)) {
if ((ress.writePoolCtx->dstFile == NULL) && (prefs->testMode==0)) {
FILE *dstFile;
int dstFilePermissions = DEFAULT_FILE_PERMISSIONS;
if ( strcmp(srcFileName, stdinmark) /* special case : don't transfer permissions from stdin */
&& strcmp(dstFileName, stdoutmark)
@ -2644,8 +2858,9 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
releaseDstFile = 1;
ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
if (ress.dstFile==NULL) return 1;
dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
if (dstFile==NULL) return 1;
WritePool_setDstFile(ress.writePoolCtx, dstFile);
/* Must only be added after FIO_openDstFile() succeeds.
* Otherwise we may delete the destination file if it already exists,
@ -2657,10 +2872,8 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName);
if (releaseDstFile) {
FILE* const dstFile = ress.dstFile;
clearHandler();
ress.dstFile = NULL;
if (fclose(dstFile)) {
if (WritePool_closeDstFile(ress.writePoolCtx)) {
DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
result = 1;
}
@ -2874,15 +3087,16 @@ FIO_decompressMultipleFilenames(FIO_ctx_t* const fCtx,
return 1;
}
if (!prefs->testMode) {
ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
if (ress.dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName);
FILE* dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
if (dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName);
WritePool_setDstFile(ress.writePoolCtx, dstFile);
}
for (; fCtx->currFileIdx < fCtx->nbFilesTotal; fCtx->currFileIdx++) {
status = FIO_decompressSrcFile(fCtx, prefs, ress, outFileName, srcNamesTable[fCtx->currFileIdx]);
if (!status) fCtx->nbFilesProcessed++;
error |= status;
}
if ((!prefs->testMode) && (fclose(ress.dstFile)))
if ((!prefs->testMode) && (WritePool_closeDstFile(ress.writePoolCtx)))
EXM_THROW(72, "Write error : %s : cannot properly close output file",
strerror(errno));
} else {

View File

@ -109,6 +109,7 @@ void FIO_setAllowBlockDevices(FIO_prefs_t* const prefs, int allowBlockDevices);
void FIO_setPatchFromMode(FIO_prefs_t* const prefs, int value);
void FIO_setContentSize(FIO_prefs_t* const prefs, int value);
void FIO_displayCompressionParameters(const FIO_prefs_t* prefs);
void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value);
/* FIO_ctx_t functions */
void FIO_setNbFilesTotal(FIO_ctx_t* const fCtx, int value);

View File

@ -239,9 +239,12 @@ static void usage_advanced(const char* programName)
#ifndef ZSTD_NODECOMPRESS
DISPLAYOUT( "\n");
DISPLAYOUT( "Advanced decompression arguments : \n");
DISPLAYOUT( " -l : print information about zstd compressed files \n");
DISPLAYOUT( "--test : test compressed file integrity \n");
DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n");
DISPLAYOUT( " -l : print information about zstd compressed files \n");
DISPLAYOUT( "--test : test compressed file integrity \n");
DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n");
#ifdef ZSTD_MULTITHREAD
DISPLAYOUT( "--[no-]asyncio : use threaded asynchronous IO for output (default: disabled) \n");
#endif
# if ZSTD_SPARSE_DEFAULT
DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n");
# else
@ -912,6 +915,8 @@ int main(int argCount, const char* argv[])
if (!strcmp(argument, "--sparse")) { FIO_setSparseWrite(prefs, 2); continue; }
if (!strcmp(argument, "--no-sparse")) { FIO_setSparseWrite(prefs, 0); continue; }
if (!strcmp(argument, "--test")) { operation=zom_test; continue; }
if (!strcmp(argument, "--asyncio")) { FIO_setAsyncIOFlag(prefs, 1); continue;}
if (!strcmp(argument, "--no-asyncio")) { FIO_setAsyncIOFlag(prefs, 0); continue;}
if (!strcmp(argument, "--train")) { operation=zom_train; if (outFileName==NULL) outFileName=g_defaultDictName; continue; }
if (!strcmp(argument, "--no-dictID")) { FIO_setDictIDFlag(prefs, 0); continue; }
if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(prefs, 0); continue; }

View File

@ -1575,6 +1575,44 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then
exit 1
fi
println "\n===> zstd asyncio decompression tests "
addFrame() {
datagen -g2M -s$2 >> tmp_uncompressed
datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst
}
addTwoFrames() {
addFrame $1 1
addFrame $1 2
}
testAsyncIO() {
roundTripTest -g2M "3 --asyncio --format=$1"
roundTripTest -g2M "3 --no-asyncio --format=$1"
}
rm -f tmp_compressed tmp_uncompressed
testAsyncIO zstd
addTwoFrames zstd
if [ $GZIPMODE -eq 1 ]; then
testAsyncIO gzip
addTwoFrames gzip
fi
if [ $LZMAMODE -eq 1 ]; then
testAsyncIO lzma
addTwoFrames lzma
fi
if [ $LZ4MODE -eq 1 ]; then
testAsyncIO lz4
addTwoFrames lz4
fi
cat tmp_uncompressed | $MD5SUM > tmp2
zstd -d tmp_compressed.zst --asyncio -c | $MD5SUM > tmp1
$DIFF -q tmp1 tmp2
rm tmp1
zstd -d tmp_compressed.zst --no-asyncio -c | $MD5SUM > tmp1
$DIFF -q tmp1 tmp2
if [ "$1" != "--test-large-data" ]; then
println "Skipping large data tests"