diff --git a/lib/common/pool.c b/lib/common/pool.c index 41a216f16..281b3824a 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -190,20 +190,19 @@ size_t POOL_sizeof(POOL_ctx *ctx) { } -/* @return : a working pool on success, NULL on failure - * note : starting context is considered consumed. */ -static POOL_ctx* POOL_resize_internal(POOL_ctx* ctx, size_t numThreads) +/* @return : 0 on success, 1 on error */ +static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads) { if (numThreads <= ctx->threadCapacity) { - if (!numThreads) return NULL; + if (!numThreads) return 1; ctx->threadLimit = numThreads; - return ctx; + return 0; } /* numThreads > threadCapacity */ { ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem); - if (!threadPool) return NULL; + if (!threadPool) return 1; /* replace existing thread pool */ - memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(ctx->threads[0])); + memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool)); ZSTD_free(ctx->threads, ctx->customMem); ctx->threads = threadPool; /* Initialize additional threads */ @@ -211,30 +210,25 @@ static POOL_ctx* POOL_resize_internal(POOL_ctx* ctx, size_t numThreads) for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) { if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) { ctx->threadCapacity = threadId; - ctx->threadLimit = threadId; - return NULL; /* will release the pool */ + return 1; } } } } /* successfully expanded */ ctx->threadCapacity = numThreads; ctx->threadLimit = numThreads; - return ctx; + return 0; } -/* @return : a working pool on success, NULL on failure - * note : starting context is considered consumed. */ -POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads) +/* @return : 0 on success, 1 on error */ +int POOL_resize(POOL_ctx* ctx, size_t numThreads) { - if (ctx==NULL) return NULL; + int result; + if (ctx==NULL) return 1; ZSTD_pthread_mutex_lock(&ctx->queueMutex); - { POOL_ctx* const newCtx = POOL_resize_internal(ctx, numThreads); - if (newCtx!=ctx) { - POOL_free(ctx); - return newCtx; - } } + result = POOL_resize_internal(ctx, numThreads); ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); ZSTD_pthread_mutex_unlock(&ctx->queueMutex); - return ctx; + return result; } /** @@ -321,9 +315,9 @@ void POOL_free(POOL_ctx* ctx) { (void)ctx; } -POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads) { - (void)numThreads; - return ctx; +int POOL_resize(POOL_ctx* ctx, size_t numThreads) { + (void)ctx; (void)numThreads; + return 0; } void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) { diff --git a/lib/common/pool.h b/lib/common/pool.h index caf514907..458d37f13 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -40,14 +40,14 @@ void POOL_free(POOL_ctx* ctx); /*! POOL_resize() : * Expands or shrinks pool's number of threads. - * This is more efficient than releasing and creating a new context. - * @return : a new pool context on success, NULL on failure - * note : new pool context might have same address as original one, but it's not guaranteed. - * consider starting context as consumed, only rely on returned one. - * note 2 : only numThreads can be resized, queueSize is unchanged. - * note 3 : `numThreads` must be at least 1 + * This is more efficient than releasing + creating a new context, + * since it tries to preserve and re-use existing threads. + * `numThreads` must be at least 1. + * @return : 0 when resize was successful, + * !0 (typically 1) if there is an error. + * note : only numThreads can be resized, queueSize remains unchanged. */ -POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads); +int POOL_resize(POOL_ctx* ctx, size_t numThreads); /*! POOL_sizeof() : * @return threadpool memory usage diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index bbbdc5cd4..dc025e2a5 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1018,8 +1018,7 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) * @return : error code if fails, 0 on success */ static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers) { - mtctx->factory = POOL_resize(mtctx->factory, nbWorkers); - if (mtctx->factory == NULL) return ERROR(memory_allocation); + if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation); CHECK_F( ZSTDMT_expandJobsTable(mtctx, nbWorkers) ); mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers); if (mtctx->bufPool == NULL) return ERROR(memory_allocation); diff --git a/tests/poolTests.c b/tests/poolTests.c index d5768967a..6a058a5a3 100644 --- a/tests/poolTests.c +++ b/tests/poolTests.c @@ -121,8 +121,7 @@ static int testThreadReduction_internal(POOL_ctx* ctx, poolTest_t test) ZSTD_pthread_mutex_unlock(&test.mut); time4threads = UTIL_clockSpanNano(startTime); - ctx = POOL_resize(ctx, 2/*nbThreads*/); - ASSERT_TRUE(ctx); + ASSERT_EQ( POOL_resize(ctx, 2/*nbThreads*/) , 0 ); test.val = 0; startTime = UTIL_getTime(); { int i; @@ -142,7 +141,7 @@ static int testThreadReduction_internal(POOL_ctx* ctx, poolTest_t test) static int testThreadReduction(void) { int result; poolTest_t test; - POOL_ctx* ctx = POOL_create(4 /*nbThreads*/, 2 /*queueSize*/); + POOL_ctx* const ctx = POOL_create(4 /*nbThreads*/, 2 /*queueSize*/); ASSERT_TRUE(ctx); @@ -179,7 +178,7 @@ static int testAbruptEnding_internal(abruptEndCanary_t test) { int const nbWaits = 16; - POOL_ctx* ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/); + POOL_ctx* const ctx = POOL_create(3 /*numThreads*/, nbWaits /*queueSize*/); ASSERT_TRUE(ctx); test.val = 0; @@ -187,7 +186,7 @@ static int testAbruptEnding_internal(abruptEndCanary_t test) for (i=0; i