1
0
mirror of https://github.com/facebook/zstd.git synced 2025-03-07 01:10:04 +02:00

Merge pull request #3364 from yoniko/fix-windows-mt-thread-resize-bug

Windows MT layer bug fixes
This commit is contained in:
Yonatan Komornik 2022-12-19 15:54:01 -08:00 committed by GitHub
commit a8add436ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 73 additions and 26 deletions

View File

@ -192,8 +192,6 @@ test('test-zstream-1',
test('test-zstream-3',
zstreamtest,
args: ['--newapi', '-t1', ZSTREAM_TESTTIME] + FUZZER_FLAGS,
# --newapi dies on Windows with "exit status 3221225477 or signal 3221225349 SIGinvalid"
should_fail: host_machine_os == os_windows,
timeout: 120)
test('test-longmatch', longmatch, timeout: 36)
test('test-invalidDictionaries', invalidDictionaries) # should be fast

View File

@ -173,7 +173,7 @@ static void POOL_join(POOL_ctx* ctx) {
/* Join all of the threads */
{ size_t i;
for (i = 0; i < ctx->threadCapacity; ++i) {
ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */
ZSTD_pthread_join(ctx->threads[i]); /* note : could fail */
} }
}

View File

@ -34,39 +34,92 @@ int g_ZSTD_threading_useless_symbol;
/* === Implementation === */
typedef struct {
void* (*start_routine)(void*);
void* arg;
int initialized;
ZSTD_pthread_cond_t initialized_cond;
ZSTD_pthread_mutex_t initialized_mutex;
} ZSTD_thread_params_t;
static unsigned __stdcall worker(void *arg)
{
ZSTD_pthread_t* const thread = (ZSTD_pthread_t*) arg;
thread->arg = thread->start_routine(thread->arg);
void* (*start_routine)(void*);
void* thread_arg;
/* Inialized thread_arg and start_routine and signal main thread that we don't need it
* to wait any longer.
*/
{
ZSTD_thread_params_t* thread_param = (ZSTD_thread_params_t*)arg;
thread_arg = thread_param->arg;
start_routine = thread_param->start_routine;
/* Signal main thread that we are running and do not depend on its memory anymore */
ZSTD_pthread_mutex_lock(&thread_param->initialized_mutex);
thread_param->initialized = 1;
ZSTD_pthread_cond_signal(&thread_param->initialized_cond);
ZSTD_pthread_mutex_unlock(&thread_param->initialized_mutex);
}
start_routine(thread_arg);
return 0;
}
int ZSTD_pthread_create(ZSTD_pthread_t* thread, const void* unused,
void* (*start_routine) (void*), void* arg)
{
ZSTD_thread_params_t thread_param;
(void)unused;
thread->arg = arg;
thread->start_routine = start_routine;
thread->handle = (HANDLE) _beginthreadex(NULL, 0, worker, thread, 0, NULL);
if (!thread->handle)
thread_param.start_routine = start_routine;
thread_param.arg = arg;
thread_param.initialized = 0;
*thread = NULL;
/* Setup thread initialization synchronization */
if(ZSTD_pthread_cond_init(&thread_param.initialized_cond, NULL)) {
/* Should never happen on Windows */
return -1;
}
if(ZSTD_pthread_mutex_init(&thread_param.initialized_mutex, NULL)) {
/* Should never happen on Windows */
ZSTD_pthread_cond_destroy(&thread_param.initialized_cond);
return -1;
}
/* Spawn thread */
*thread = (HANDLE)_beginthreadex(NULL, 0, worker, &thread_param, 0, NULL);
if (!thread) {
ZSTD_pthread_mutex_destroy(&thread_param.initialized_mutex);
ZSTD_pthread_cond_destroy(&thread_param.initialized_cond);
return errno;
else
return 0;
}
/* Wait for thread to be initialized */
ZSTD_pthread_mutex_lock(&thread_param.initialized_mutex);
while(!thread_param.initialized) {
ZSTD_pthread_cond_wait(&thread_param.initialized_cond, &thread_param.initialized_mutex);
}
ZSTD_pthread_mutex_unlock(&thread_param.initialized_mutex);
ZSTD_pthread_mutex_destroy(&thread_param.initialized_mutex);
ZSTD_pthread_cond_destroy(&thread_param.initialized_cond);
return 0;
}
int ZSTD_pthread_join(ZSTD_pthread_t thread, void **value_ptr)
int ZSTD_pthread_join(ZSTD_pthread_t thread)
{
DWORD result;
if (!thread.handle) return 0;
if (!thread) return 0;
result = WaitForSingleObject(thread.handle, INFINITE);
CloseHandle(thread.handle);
result = WaitForSingleObject(thread, INFINITE);
CloseHandle(thread);
switch (result) {
case WAIT_OBJECT_0:
if (value_ptr) *value_ptr = thread.arg;
return 0;
case WAIT_ABANDONED:
return EINVAL;

View File

@ -61,16 +61,12 @@ extern "C" {
#define ZSTD_pthread_cond_broadcast(a) WakeAllConditionVariable((a))
/* ZSTD_pthread_create() and ZSTD_pthread_join() */
typedef struct {
HANDLE handle;
void* (*start_routine)(void*);
void* arg;
} ZSTD_pthread_t;
typedef HANDLE ZSTD_pthread_t;
int ZSTD_pthread_create(ZSTD_pthread_t* thread, const void* unused,
void* (*start_routine) (void*), void* arg);
int ZSTD_pthread_join(ZSTD_pthread_t thread, void** value_ptr);
int ZSTD_pthread_join(ZSTD_pthread_t thread);
/**
* add here more wrappers as required
@ -98,7 +94,7 @@ int ZSTD_pthread_join(ZSTD_pthread_t thread, void** value_ptr);
#define ZSTD_pthread_t pthread_t
#define ZSTD_pthread_create(a, b, c, d) pthread_create((a), (b), (c), (d))
#define ZSTD_pthread_join(a, b) pthread_join((a),(b))
#define ZSTD_pthread_join(a) pthread_join((a),NULL)
#else /* DEBUGLEVEL >= 1 */
@ -123,7 +119,7 @@ int ZSTD_pthread_cond_destroy(ZSTD_pthread_cond_t* cond);
#define ZSTD_pthread_t pthread_t
#define ZSTD_pthread_create(a, b, c, d) pthread_create((a), (b), (c), (d))
#define ZSTD_pthread_join(a, b) pthread_join((a),(b))
#define ZSTD_pthread_join(a) pthread_join((a),NULL)
#endif

View File

@ -431,8 +431,8 @@ static int threadPoolTests(void) {
ZSTD_pthread_create(&t1, NULL, threadPoolTests_compressionJob, &p1);
ZSTD_pthread_create(&t2, NULL, threadPoolTests_compressionJob, &p2);
ZSTD_pthread_join(t1, NULL);
ZSTD_pthread_join(t2, NULL);
ZSTD_pthread_join(t1);
ZSTD_pthread_join(t2);
assert(!memcmp(decodedBuffer, decodedBuffer2, CNBuffSize));
free(decodedBuffer2);