diff --git a/data.c b/data.c index 7f54dc9b..c3054923 100644 --- a/data.c +++ b/data.c @@ -359,11 +359,6 @@ restore_data_file(const char *from_root, } } - elog(LOG, "header block: %i, blknum: %i, hole_offset: %i, BLCKSZ:%i", - header.block, - blknum, - header.hole_offset, - BLCKSZ); if (header.block < blknum || header.hole_offset > BLCKSZ || (int) header.hole_offset + (int) header.hole_length > BLCKSZ) { diff --git a/restore.c b/restore.c index 5819a14a..e37fb463 100644 --- a/restore.c +++ b/restore.c @@ -13,9 +13,18 @@ #include #include #include +#include #include "catalog/pg_control.h" +typedef struct +{ + parray *files; + pgBackup *backup; + unsigned int start_file_idx; + unsigned int end_file_idx; +} restore_files_args; + static void backup_online_files(bool re_recovery); static void restore_database(pgBackup *backup); static void create_recovery_conf(const char *target_time, @@ -35,6 +44,8 @@ static void print_backup_lsn(const pgBackup *backup); static void search_next_wal(const char *path, XLogRecPtr *need_lsn, parray *timelines); +static void restore_files(void *arg); + int do_restore(const char *target_time, @@ -230,6 +241,8 @@ restore_database(pgBackup *backup) int ret; parray *files; int i; + pthread_t restore_threads[num_threads]; + restore_files_args *restore_threads_args[num_threads]; /* confirm block size compatibility */ if (backup->block_size != BLCKSZ) @@ -300,46 +313,36 @@ restore_database(pgBackup *backup) pgFileFree(parray_remove(files, i)); } + if (num_threads < 1) + num_threads = 1; + /* restore files into $PGDATA */ - for (i = 0; i < parray_num(files); i++) + for (i = 0; i < num_threads; i++) { - char from_root[MAXPGPATH]; - pgFile *file = (pgFile *) parray_get(files, i); + restore_files_args *arg = pg_malloc(sizeof(restore_files_args)); + arg->files = files; + arg->backup = backup; + arg->start_file_idx = i * (parray_num(files)/num_threads); + if (i == num_threads - 1) + arg->end_file_idx = parray_num(files); + else + arg->end_file_idx = (i + 1) * (parray_num(files)/num_threads); - pgBackupGetPath(backup, from_root, lengthof(from_root), DATABASE_DIR); + if (verbose) + elog(WARNING, "Start thread for start_file_idx:%i end_file_idx:%i num:%li", + arg->start_file_idx, + arg->end_file_idx, + parray_num(files)); - /* check for interrupt */ - if (interrupted) - elog(ERROR, "interrupted during restore database"); + restore_threads_args[i] = arg; + pthread_create(&restore_threads[i], NULL, (void *(*)(void *)) restore_files, arg); + } - /* print progress */ - if (!check) - elog(LOG, "(%d/%lu) %s ", i + 1, (unsigned long) parray_num(files), - file->path + strlen(from_root) + 1); - - /* directories are created with mkdirs.sh */ - if (S_ISDIR(file->mode)) - { - if (!check) - elog(LOG, "directory, skip"); - continue; - } - - /* not backed up */ - if (file->write_size == BYTES_INVALID) - { - if (!check) - elog(LOG, "not backed up, skip"); - continue; - } - - /* restore file */ - if (!check) - restore_data_file(from_root, pgdata, file); - - /* print size of restored file */ - if (!check) - elog(LOG, "restored %lu\n", (unsigned long) file->write_size); + /* Wait theads */ + for (i = 0; i < num_threads; i++) + { + pthread_join(restore_threads[i], NULL); + pg_free(restore_threads_args[i]); } /* Delete files which are not in file list. */ @@ -391,6 +394,56 @@ restore_database(pgBackup *backup) } +static void +restore_files(void *arg) +{ + int i; + + restore_files_args *arguments = (restore_files_args *)arg; + + /* restore files into $PGDATA */ + for (i = arguments->start_file_idx; i < arguments->end_file_idx; i++) + { + char from_root[MAXPGPATH]; + pgFile *file = (pgFile *) parray_get(arguments->files, i); + + pgBackupGetPath(arguments->backup, from_root, lengthof(from_root), DATABASE_DIR); + + /* check for interrupt */ + if (interrupted) + elog(ERROR, "interrupted during restore database"); + + /* print progress */ + if (!check) + elog(LOG, "(%d/%lu) %s ", i + 1, (unsigned long) parray_num(arguments->files), + file->path + strlen(from_root) + 1); + + /* directories are created with mkdirs.sh */ + if (S_ISDIR(file->mode)) + { + if (!check) + elog(LOG, "directory, skip"); + continue; + } + + /* not backed up */ + if (file->write_size == BYTES_INVALID) + { + if (!check) + elog(LOG, "not backed up, skip"); + continue; + } + + /* restore file */ + if (!check) + restore_data_file(from_root, pgdata, file); + + /* print size of restored file */ + if (!check) + elog(LOG, "restored %lu\n", (unsigned long) file->write_size); + } +} + static void create_recovery_conf(const char *target_time, const char *target_xid, diff --git a/sql/restore.sh b/sql/restore.sh index 5f775adb..c96f289a 100644 --- a/sql/restore.sh +++ b/sql/restore.sh @@ -50,7 +50,7 @@ pg_arman backup -B ${BACKUP_PATH} -b page -p ${TEST_PGPORT} -d postgres --verbos pg_arman validate -B ${BACKUP_PATH} --verbose >> ${TEST_BASE}/TEST-0002-run.out 2>&1 psql --no-psqlrc -p ${TEST_PGPORT} -d pgbench -c "SELECT * FROM pgbench_branches;" > ${TEST_BASE}/TEST-0002-before.out pg_ctl stop -m immediate > /dev/null 2>&1 -pg_arman restore -B ${BACKUP_PATH} --verbose >> ${TEST_BASE}/TEST-0002-run.out 2>&1;echo $? +pg_arman restore -B ${BACKUP_PATH} -j 4 --verbose >> ${TEST_BASE}/TEST-0002-run.out 2>&1;echo $? pg_ctl start -w -t 600 > /dev/null 2>&1 psql --no-psqlrc -p ${TEST_PGPORT} -d pgbench -c "SELECT * FROM pgbench_branches;" > ${TEST_BASE}/TEST-0002-after.out diff ${TEST_BASE}/TEST-0002-before.out ${TEST_BASE}/TEST-0002-after.out