From 9ec02abc0df1843d81201a68c135407de4c55361 Mon Sep 17 00:00:00 2001 From: Anastasia Date: Fri, 2 Mar 2018 15:22:57 +0300 Subject: [PATCH] WIP. Doesn't work. Cross platform multi threaded backup/restore --- Makefile | 7 +++++-- src/backup.c | 6 +++--- src/pg_probackup.h | 3 ++- src/restore.c | 4 ++-- src/validate.c | 4 ++-- 5 files changed, 14 insertions(+), 10 deletions(-) diff --git a/Makefile b/Makefile index 0f4fa672..e7106d55 100644 --- a/Makefile +++ b/Makefile @@ -7,9 +7,10 @@ OBJS = src/backup.o src/catalog.o src/configure.o src/data.o \ src/archive.o src/utils/parray.o src/utils/pgut.o src/utils/logger.o EXTRA_CLEAN = src/datapagemap.c src/datapagemap.h src/xlogreader.c \ - src/receivelog.c src/receivelog.h src/streamutil.c src/streamutil.h src/logging.h + src/receivelog.c src/receivelog.h src/streamutil.c src/streamutil.h src/logging.h \ + src/atomics.h -INCLUDES = src/datapagemap.h src/logging.h src/receivelog.h src/streamutil.h +INCLUDES = src/datapagemap.h src/logging.h src/receivelog.h src/streamutil.h src/atomics.h ifdef USE_PGXS PG_CONFIG = pg_config @@ -61,6 +62,8 @@ src/streamutil.c: $(top_srcdir)/src/bin/pg_basebackup/streamutil.c rm -f $@ && $(LN_S) $(srchome)/src/bin/pg_basebackup/streamutil.c $@ src/streamutil.h: $(top_srcdir)/src/bin/pg_basebackup/streamutil.h rm -f $@ && $(LN_S) $(srchome)/src/bin/pg_basebackup/streamutil.h $@ +src/atomics.h: $(top_srcdir)/src/include/port/atomics.h + rm -f $@ && $(LN_S) $(srchome)/src/include/port/atomics.h $@ ifeq ($(MAJORVERSION),10) src/walmethods.c: $(top_srcdir)/src/bin/pg_basebackup/walmethods.c diff --git a/src/backup.c b/src/backup.c index 0ee2a9ba..77ca3311 100644 --- a/src/backup.c +++ b/src/backup.c @@ -383,7 +383,7 @@ remote_backup_files(void *arg) if (S_ISDIR(file->mode)) continue; - if (__sync_lock_test_and_set(&file->lock, 1) != 0) + if (pg_atomic_test_set_flag(&file->lock)) continue; file_backup_conn = pgut_connect_replication(pgut_dbname); @@ -646,7 +646,7 @@ do_backup_instance(void) } /* setup threads */ - __sync_lock_release(&file->lock); + pg_atomic_clear_flag(&file->lock); } /* sort by size for load balancing */ @@ -1922,7 +1922,7 @@ backup_files(void *arg) pgFile *file = (pgFile *) parray_get(arguments->backup_files_list, i); elog(VERBOSE, "Copying file: \"%s\" ", file->path); - if (__sync_lock_test_and_set(&file->lock, 1) != 0) + if (pg_atomic_test_set_flag(&file->lock)) continue; /* check for interrupt */ diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 7da50800..713b8e62 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -33,6 +33,7 @@ #include "common/relpath.h" #include "port.h" +#include "atomics.h" #include "utils/parray.h" #include "utils/pgut.h" @@ -105,7 +106,7 @@ typedef struct pgFile bool is_cfs; /* Flag to distinguish files compressed by CFS*/ bool is_database; CompressAlg compress_alg; /* compression algorithm applied to the file */ - volatile uint32 lock; /* lock for synchronization of parallel threads */ + volatile pg_atomic_flag lock; /* lock for synchronization of parallel threads */ datapagemap_t pagemap; /* bitmap of pages updated since previous backup */ } pgFile; diff --git a/src/restore.c b/src/restore.c index a63a4bd3..24f6bfae 100644 --- a/src/restore.c +++ b/src/restore.c @@ -398,7 +398,7 @@ restore_backup(pgBackup *backup) for (i = 0; i < parray_num(files); i++) { pgFile *file = (pgFile *) parray_get(files, i); - __sync_lock_release(&file->lock); + pg_atomic_clear_flag(&file->lock); } /* Restore files into target directory */ @@ -703,7 +703,7 @@ restore_files(void *arg) char *rel_path; pgFile *file = (pgFile *) parray_get(arguments->files, i); - if (__sync_lock_test_and_set(&file->lock, 1) != 0) + if (pg_atomic_test_set_flag(&file->lock)) continue; pgBackupGetPath(arguments->backup, from_root, diff --git a/src/validate.c b/src/validate.c index 2be75760..e6bb3b22 100644 --- a/src/validate.c +++ b/src/validate.c @@ -62,7 +62,7 @@ pgBackupValidate(pgBackup *backup) for (i = 0; i < parray_num(files); i++) { pgFile *file = (pgFile *) parray_get(files, i); - __sync_lock_release(&file->lock); + pg_atomic_clear_flag(&file->lock); } /* Validate files */ @@ -116,7 +116,7 @@ pgBackupValidateFiles(void *arg) struct stat st; pgFile *file = (pgFile *) parray_get(arguments->files, i); - if (__sync_lock_test_and_set(&file->lock, 1) != 0) + if (pg_atomic_test_set_flag(&file->lock)) continue; if (interrupted)