1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-01-05 13:20:31 +02:00

Add threads for restore.

This commit is contained in:
Zhuravlev Uriy aka stalkerg 2016-02-29 21:49:53 +03:00
parent fed13c298e
commit 07d4b8a030
3 changed files with 89 additions and 41 deletions

5
data.c
View File

@ -359,11 +359,6 @@ restore_data_file(const char *from_root,
}
}
elog(LOG, "header block: %i, blknum: %i, hole_offset: %i, BLCKSZ:%i",
header.block,
blknum,
header.hole_offset,
BLCKSZ);
if (header.block < blknum || header.hole_offset > BLCKSZ ||
(int) header.hole_offset + (int) header.hole_length > BLCKSZ)
{

123
restore.c
View File

@ -13,9 +13,18 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "catalog/pg_control.h"
typedef struct
{
parray *files;
pgBackup *backup;
unsigned int start_file_idx;
unsigned int end_file_idx;
} restore_files_args;
static void backup_online_files(bool re_recovery);
static void restore_database(pgBackup *backup);
static void create_recovery_conf(const char *target_time,
@ -35,6 +44,8 @@ static void print_backup_lsn(const pgBackup *backup);
static void search_next_wal(const char *path,
XLogRecPtr *need_lsn,
parray *timelines);
static void restore_files(void *arg);
int
do_restore(const char *target_time,
@ -230,6 +241,8 @@ restore_database(pgBackup *backup)
int ret;
parray *files;
int i;
pthread_t restore_threads[num_threads];
restore_files_args *restore_threads_args[num_threads];
/* confirm block size compatibility */
if (backup->block_size != BLCKSZ)
@ -300,46 +313,36 @@ restore_database(pgBackup *backup)
pgFileFree(parray_remove(files, i));
}
if (num_threads < 1)
num_threads = 1;
/* restore files into $PGDATA */
for (i = 0; i < parray_num(files); i++)
for (i = 0; i < num_threads; i++)
{
char from_root[MAXPGPATH];
pgFile *file = (pgFile *) parray_get(files, i);
restore_files_args *arg = pg_malloc(sizeof(restore_files_args));
arg->files = files;
arg->backup = backup;
arg->start_file_idx = i * (parray_num(files)/num_threads);
if (i == num_threads - 1)
arg->end_file_idx = parray_num(files);
else
arg->end_file_idx = (i + 1) * (parray_num(files)/num_threads);
pgBackupGetPath(backup, from_root, lengthof(from_root), DATABASE_DIR);
if (verbose)
elog(WARNING, "Start thread for start_file_idx:%i end_file_idx:%i num:%li",
arg->start_file_idx,
arg->end_file_idx,
parray_num(files));
/* check for interrupt */
if (interrupted)
elog(ERROR, "interrupted during restore database");
restore_threads_args[i] = arg;
pthread_create(&restore_threads[i], NULL, (void *(*)(void *)) restore_files, arg);
}
/* print progress */
if (!check)
elog(LOG, "(%d/%lu) %s ", i + 1, (unsigned long) parray_num(files),
file->path + strlen(from_root) + 1);
/* directories are created with mkdirs.sh */
if (S_ISDIR(file->mode))
{
if (!check)
elog(LOG, "directory, skip");
continue;
}
/* not backed up */
if (file->write_size == BYTES_INVALID)
{
if (!check)
elog(LOG, "not backed up, skip");
continue;
}
/* restore file */
if (!check)
restore_data_file(from_root, pgdata, file);
/* print size of restored file */
if (!check)
elog(LOG, "restored %lu\n", (unsigned long) file->write_size);
/* Wait theads */
for (i = 0; i < num_threads; i++)
{
pthread_join(restore_threads[i], NULL);
pg_free(restore_threads_args[i]);
}
/* Delete files which are not in file list. */
@ -391,6 +394,56 @@ restore_database(pgBackup *backup)
}
static void
restore_files(void *arg)
{
int i;
restore_files_args *arguments = (restore_files_args *)arg;
/* restore files into $PGDATA */
for (i = arguments->start_file_idx; i < arguments->end_file_idx; i++)
{
char from_root[MAXPGPATH];
pgFile *file = (pgFile *) parray_get(arguments->files, i);
pgBackupGetPath(arguments->backup, from_root, lengthof(from_root), DATABASE_DIR);
/* check for interrupt */
if (interrupted)
elog(ERROR, "interrupted during restore database");
/* print progress */
if (!check)
elog(LOG, "(%d/%lu) %s ", i + 1, (unsigned long) parray_num(arguments->files),
file->path + strlen(from_root) + 1);
/* directories are created with mkdirs.sh */
if (S_ISDIR(file->mode))
{
if (!check)
elog(LOG, "directory, skip");
continue;
}
/* not backed up */
if (file->write_size == BYTES_INVALID)
{
if (!check)
elog(LOG, "not backed up, skip");
continue;
}
/* restore file */
if (!check)
restore_data_file(from_root, pgdata, file);
/* print size of restored file */
if (!check)
elog(LOG, "restored %lu\n", (unsigned long) file->write_size);
}
}
static void
create_recovery_conf(const char *target_time,
const char *target_xid,

View File

@ -50,7 +50,7 @@ pg_arman backup -B ${BACKUP_PATH} -b page -p ${TEST_PGPORT} -d postgres --verbos
pg_arman validate -B ${BACKUP_PATH} --verbose >> ${TEST_BASE}/TEST-0002-run.out 2>&1
psql --no-psqlrc -p ${TEST_PGPORT} -d pgbench -c "SELECT * FROM pgbench_branches;" > ${TEST_BASE}/TEST-0002-before.out
pg_ctl stop -m immediate > /dev/null 2>&1
pg_arman restore -B ${BACKUP_PATH} --verbose >> ${TEST_BASE}/TEST-0002-run.out 2>&1;echo $?
pg_arman restore -B ${BACKUP_PATH} -j 4 --verbose >> ${TEST_BASE}/TEST-0002-run.out 2>&1;echo $?
pg_ctl start -w -t 600 > /dev/null 2>&1
psql --no-psqlrc -p ${TEST_PGPORT} -d pgbench -c "SELECT * FROM pgbench_branches;" > ${TEST_BASE}/TEST-0002-after.out
diff ${TEST_BASE}/TEST-0002-before.out ${TEST_BASE}/TEST-0002-after.out