mirror of
https://github.com/postgrespro/pg_probackup.git
synced 2025-01-09 14:45:47 +02:00
PGPRO-427: Put in order thread arguments
This commit is contained in:
parent
a3b9d1ebbb
commit
b97e9e8e29
79
src/backup.c
79
src/backup.c
@ -373,8 +373,8 @@ static void *
|
||||
remote_backup_files(void *arg)
|
||||
{
|
||||
int i;
|
||||
backup_files_args *arguments = (backup_files_args *) arg;
|
||||
int n_backup_files_list = parray_num(arguments->backup_files_list);
|
||||
backup_files_arg *arguments = (backup_files_arg *) arg;
|
||||
int n_backup_files_list = parray_num(arguments->files_list);
|
||||
PGconn *file_backup_conn = NULL;
|
||||
|
||||
for (i = 0; i < n_backup_files_list; i++)
|
||||
@ -385,7 +385,7 @@ remote_backup_files(void *arg)
|
||||
pgFile *file;
|
||||
int row_length;
|
||||
|
||||
file = (pgFile *) parray_get(arguments->backup_files_list, i);
|
||||
file = (pgFile *) parray_get(arguments->files_list, i);
|
||||
|
||||
/* We have already copied all directories */
|
||||
if (S_ISDIR(file->mode))
|
||||
@ -465,12 +465,11 @@ do_backup_instance(void)
|
||||
XLogRecPtr prev_backup_start_lsn = InvalidXLogRecPtr;
|
||||
|
||||
/* arrays with meta info for multi threaded backup */
|
||||
pthread_t *backup_threads;
|
||||
backup_files_args *backup_threads_args;
|
||||
pthread_t *threads;
|
||||
backup_files_arg *threads_args;
|
||||
bool backup_isok = true;
|
||||
|
||||
pgBackup *prev_backup = NULL;
|
||||
char prev_backup_filelist_path[MAXPGPATH];
|
||||
parray *prev_backup_filelist = NULL;
|
||||
|
||||
elog(LOG, "Database backup start");
|
||||
@ -512,6 +511,7 @@ do_backup_instance(void)
|
||||
current.backup_mode == BACKUP_MODE_DIFF_DELTA)
|
||||
{
|
||||
parray *backup_list;
|
||||
char prev_backup_filelist_path[MAXPGPATH];
|
||||
|
||||
/* get list of backups already taken */
|
||||
backup_list = catalog_get_backup_list(INVALID_BACKUP_ID);
|
||||
@ -524,8 +524,8 @@ do_backup_instance(void)
|
||||
"Create new FULL backup before an incremental one.");
|
||||
parray_free(backup_list);
|
||||
|
||||
pgBackupGetPath(prev_backup, prev_backup_filelist_path, lengthof(prev_backup_filelist_path),
|
||||
DATABASE_FILE_LIST);
|
||||
pgBackupGetPath(prev_backup, prev_backup_filelist_path,
|
||||
lengthof(prev_backup_filelist_path), DATABASE_FILE_LIST);
|
||||
/* Files of previous backup needed by DELTA backup */
|
||||
prev_backup_filelist = dir_read_file_list(NULL, prev_backup_filelist_path);
|
||||
|
||||
@ -701,20 +701,20 @@ do_backup_instance(void)
|
||||
parray_qsort(backup_files_list, pgFileCompareSize);
|
||||
|
||||
/* init thread args with own file lists */
|
||||
backup_threads = (pthread_t *) palloc(sizeof(pthread_t)*num_threads);
|
||||
backup_threads_args = (backup_files_args *) palloc(sizeof(backup_files_args)*num_threads);
|
||||
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
|
||||
threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads);
|
||||
|
||||
for (i = 0; i < num_threads; i++)
|
||||
{
|
||||
backup_files_args *arg = &(backup_threads_args[i]);
|
||||
backup_files_arg *arg = &(threads_args[i]);
|
||||
|
||||
arg->from_root = pgdata;
|
||||
arg->to_root = database_path;
|
||||
arg->backup_files_list = backup_files_list;
|
||||
arg->prev_backup_filelist = prev_backup_filelist;
|
||||
arg->prev_backup_start_lsn = prev_backup_start_lsn;
|
||||
arg->thread_backup_conn = NULL;
|
||||
arg->thread_cancel_conn = NULL;
|
||||
arg->files_list = backup_files_list;
|
||||
arg->prev_filelist = prev_backup_filelist;
|
||||
arg->prev_start_lsn = prev_backup_start_lsn;
|
||||
arg->backup_conn = NULL;
|
||||
arg->cancel_conn = NULL;
|
||||
/* By default there are some error */
|
||||
arg->ret = 1;
|
||||
}
|
||||
@ -723,20 +723,21 @@ do_backup_instance(void)
|
||||
elog(LOG, "Start transfering data files");
|
||||
for (i = 0; i < num_threads; i++)
|
||||
{
|
||||
backup_files_args *arg = &(backup_threads_args[i]);
|
||||
backup_files_arg *arg = &(threads_args[i]);
|
||||
|
||||
elog(VERBOSE, "Start thread num: %i", i);
|
||||
|
||||
if (!is_remote_backup)
|
||||
pthread_create(&backup_threads[i], NULL, backup_files, arg);
|
||||
pthread_create(&threads[i], NULL, backup_files, arg);
|
||||
else
|
||||
pthread_create(&backup_threads[i], NULL, remote_backup_files, arg);
|
||||
pthread_create(&threads[i], NULL, remote_backup_files, arg);
|
||||
}
|
||||
|
||||
/* Wait threads */
|
||||
for (i = 0; i < num_threads; i++)
|
||||
{
|
||||
pthread_join(backup_threads[i], NULL);
|
||||
if (backup_threads_args[i].ret == 1)
|
||||
pthread_join(threads[i], NULL);
|
||||
if (threads_args[i].ret == 1)
|
||||
backup_isok = false;
|
||||
}
|
||||
if (backup_isok)
|
||||
@ -2022,16 +2023,16 @@ static void *
|
||||
backup_files(void *arg)
|
||||
{
|
||||
int i;
|
||||
backup_files_args *arguments = (backup_files_args *) arg;
|
||||
int n_backup_files_list = parray_num(arguments->backup_files_list);
|
||||
backup_files_arg *arguments = (backup_files_arg *) arg;
|
||||
int n_backup_files_list = parray_num(arguments->files_list);
|
||||
|
||||
/* backup a file */
|
||||
for (i = 0; i < n_backup_files_list; i++)
|
||||
{
|
||||
int ret;
|
||||
struct stat buf;
|
||||
pgFile *file = (pgFile *) parray_get(arguments->files_list, i);
|
||||
|
||||
pgFile *file = (pgFile *) parray_get(arguments->backup_files_list, i);
|
||||
elog(VERBOSE, "Copying file: \"%s\" ", file->path);
|
||||
if (!pg_atomic_test_set_flag(&file->lock))
|
||||
continue;
|
||||
@ -2077,11 +2078,15 @@ backup_files(void *arg)
|
||||
{
|
||||
int p;
|
||||
char *relative;
|
||||
int n_prev_backup_files_list = parray_num(arguments->prev_backup_filelist);
|
||||
int n_prev_files = parray_num(arguments->prev_filelist);
|
||||
|
||||
relative = GetRelativePath(file->path, arguments->from_root);
|
||||
for (p = 0; p < n_prev_backup_files_list; p++)
|
||||
for (p = 0; p < n_prev_files; p++)
|
||||
{
|
||||
pgFile *prev_file = (pgFile *) parray_get(arguments->prev_backup_filelist, p);
|
||||
pgFile *prev_file;
|
||||
|
||||
prev_file = (pgFile *) parray_get(arguments->prev_filelist, p);
|
||||
|
||||
if (strcmp(relative, prev_file->path) == 0)
|
||||
{
|
||||
/* File exists in previous backup */
|
||||
@ -2098,7 +2103,7 @@ backup_files(void *arg)
|
||||
if (!backup_data_file(arguments,
|
||||
arguments->from_root,
|
||||
arguments->to_root, file,
|
||||
arguments->prev_backup_start_lsn,
|
||||
arguments->prev_start_lsn,
|
||||
current.backup_mode))
|
||||
{
|
||||
file->write_size = BYTES_INVALID;
|
||||
@ -2130,8 +2135,8 @@ backup_files(void *arg)
|
||||
}
|
||||
|
||||
/* Close connection */
|
||||
if (arguments->thread_backup_conn)
|
||||
pgut_disconnect(arguments->thread_backup_conn);
|
||||
if (arguments->backup_conn)
|
||||
pgut_disconnect(arguments->backup_conn);
|
||||
|
||||
/* Data files transferring is successful */
|
||||
arguments->ret = 0;
|
||||
@ -2633,7 +2638,7 @@ get_last_ptrack_lsn(void)
|
||||
}
|
||||
|
||||
char *
|
||||
pg_ptrack_get_block(backup_files_args *arguments,
|
||||
pg_ptrack_get_block(backup_files_arg *arguments,
|
||||
Oid dbOid,
|
||||
Oid tblsOid,
|
||||
Oid relOid,
|
||||
@ -2658,17 +2663,17 @@ pg_ptrack_get_block(backup_files_args *arguments,
|
||||
sprintf(params[2], "%i", relOid);
|
||||
sprintf(params[3], "%u", blknum);
|
||||
|
||||
if (arguments->thread_backup_conn == NULL)
|
||||
if (arguments->backup_conn == NULL)
|
||||
{
|
||||
arguments->thread_backup_conn = pgut_connect(pgut_dbname);
|
||||
arguments->backup_conn = pgut_connect(pgut_dbname);
|
||||
}
|
||||
|
||||
if (arguments->thread_cancel_conn == NULL)
|
||||
arguments->thread_cancel_conn = PQgetCancel(arguments->thread_backup_conn);
|
||||
if (arguments->cancel_conn == NULL)
|
||||
arguments->cancel_conn = PQgetCancel(arguments->backup_conn);
|
||||
|
||||
//elog(LOG, "db %i pg_ptrack_get_block(%i, %i, %u)",dbOid, tblsOid, relOid, blknum);
|
||||
res = pgut_execute_parallel(arguments->thread_backup_conn,
|
||||
arguments->thread_cancel_conn,
|
||||
res = pgut_execute_parallel(arguments->backup_conn,
|
||||
arguments->cancel_conn,
|
||||
"SELECT pg_catalog.pg_ptrack_get_block_2($1, $2, $3, $4)",
|
||||
4, (const char **)params, true);
|
||||
|
||||
|
@ -221,7 +221,7 @@ read_page_from_file(pgFile *file, BlockNumber blknum,
|
||||
* to the backup file.
|
||||
*/
|
||||
static void
|
||||
backup_data_page(backup_files_args *arguments,
|
||||
backup_data_page(backup_files_arg *arguments,
|
||||
pgFile *file, XLogRecPtr prev_backup_start_lsn,
|
||||
BlockNumber blknum, BlockNumber nblocks,
|
||||
FILE *in, FILE *out,
|
||||
@ -409,7 +409,7 @@ backup_data_page(backup_files_args *arguments,
|
||||
* backup with special header.
|
||||
*/
|
||||
bool
|
||||
backup_data_file(backup_files_args* arguments,
|
||||
backup_data_file(backup_files_arg* arguments,
|
||||
const char *from_root, const char *to_root,
|
||||
pgFile *file, XLogRecPtr prev_backup_start_lsn,
|
||||
BackupMode backup_mode)
|
||||
|
@ -5,7 +5,7 @@
|
||||
*
|
||||
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
* Portions Copyright (c) 2015-2017, Postgres Professional
|
||||
* Portions Copyright (c) 2015-2018, Postgres Professional
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
@ -101,6 +101,19 @@ typedef struct XLogPageReadPrivate
|
||||
TimeLineID tli;
|
||||
} XLogPageReadPrivate;
|
||||
|
||||
/* An argument for a thread function */
|
||||
typedef struct
|
||||
{
|
||||
parray *files;
|
||||
bool corrupted;
|
||||
|
||||
/*
|
||||
* Return value from the thread.
|
||||
* 0 means there is no error, 1 - there is an error.
|
||||
*/
|
||||
int ret;
|
||||
} xlog_thread_arg;
|
||||
|
||||
static int SimpleXLogPageRead(XLogReaderState *xlogreader,
|
||||
XLogRecPtr targetPagePtr,
|
||||
int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
|
||||
@ -125,6 +138,9 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
|
||||
XLogSegNo endSegNo,
|
||||
nextSegNo = 0;
|
||||
|
||||
pthread_t threads[num_threads];
|
||||
xlog_thread_arg thread_args[num_threads];
|
||||
|
||||
elog(LOG, "Compiling pagemap");
|
||||
if (!XRecOffIsValid(startpoint))
|
||||
elog(ERROR, "Invalid startpoint value %X/%X",
|
||||
|
@ -276,18 +276,20 @@ typedef struct
|
||||
{
|
||||
const char *from_root;
|
||||
const char *to_root;
|
||||
parray *backup_files_list;
|
||||
parray *prev_backup_filelist;
|
||||
XLogRecPtr prev_backup_start_lsn;
|
||||
PGconn *thread_backup_conn;
|
||||
PGcancel *thread_cancel_conn;
|
||||
|
||||
parray *files_list;
|
||||
parray *prev_filelist;
|
||||
XLogRecPtr prev_start_lsn;
|
||||
|
||||
PGconn *backup_conn;
|
||||
PGcancel *cancel_conn;
|
||||
|
||||
/*
|
||||
* Return value from the thread.
|
||||
* 0 means there is no error, 1 - there is an error.
|
||||
*/
|
||||
int ret;
|
||||
} backup_files_args;
|
||||
} backup_files_arg;
|
||||
|
||||
/*
|
||||
* return pointer that exceeds the length of prefix from character string.
|
||||
@ -381,7 +383,7 @@ extern const char *deparse_backup_mode(BackupMode mode);
|
||||
extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
|
||||
BlockNumber blkno);
|
||||
|
||||
extern char *pg_ptrack_get_block(backup_files_args *arguments,
|
||||
extern char *pg_ptrack_get_block(backup_files_arg *arguments,
|
||||
Oid dbOid, Oid tblsOid, Oid relOid,
|
||||
BlockNumber blknum,
|
||||
size_t *result_size);
|
||||
@ -485,7 +487,7 @@ extern int pgFileCompareLinked(const void *f1, const void *f2);
|
||||
extern int pgFileCompareSize(const void *f1, const void *f2);
|
||||
|
||||
/* in data.c */
|
||||
extern bool backup_data_file(backup_files_args* arguments,
|
||||
extern bool backup_data_file(backup_files_arg* arguments,
|
||||
const char *from_root, const char *to_root,
|
||||
pgFile *file, XLogRecPtr prev_backup_start_lsn,
|
||||
BackupMode backup_mode);
|
||||
|
@ -28,7 +28,7 @@ typedef struct
|
||||
* 0 means there is no error, 1 - there is an error.
|
||||
*/
|
||||
int ret;
|
||||
} restore_files_args;
|
||||
} restore_files_arg;
|
||||
|
||||
/* Tablespace mapping structures */
|
||||
|
||||
@ -363,8 +363,8 @@ restore_backup(pgBackup *backup)
|
||||
parray *files;
|
||||
int i;
|
||||
/* arrays with meta info for multi threaded backup */
|
||||
pthread_t *restore_threads;
|
||||
restore_files_args *restore_threads_args;
|
||||
pthread_t *threads;
|
||||
restore_files_arg *threads_args;
|
||||
bool restore_isok = true;
|
||||
|
||||
if (backup->status != BACKUP_STATUS_OK)
|
||||
@ -398,43 +398,44 @@ restore_backup(pgBackup *backup)
|
||||
pgBackupGetPath(backup, list_path, lengthof(list_path), DATABASE_FILE_LIST);
|
||||
files = dir_read_file_list(database_path, list_path);
|
||||
|
||||
restore_threads = (pthread_t *) palloc(sizeof(pthread_t)*num_threads);
|
||||
restore_threads_args = (restore_files_args *) palloc(sizeof(restore_files_args)*num_threads);
|
||||
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
|
||||
threads_args = (restore_files_arg *) palloc(sizeof(restore_files_arg)*num_threads);
|
||||
|
||||
/* setup threads */
|
||||
for (i = 0; i < parray_num(files); i++)
|
||||
{
|
||||
pgFile *file = (pgFile *) parray_get(files, i);
|
||||
|
||||
pg_atomic_clear_flag(&file->lock);
|
||||
}
|
||||
|
||||
/* Restore files into target directory */
|
||||
for (i = 0; i < num_threads; i++)
|
||||
{
|
||||
restore_files_args *arg = &(restore_threads_args[i]);
|
||||
restore_files_arg *arg = &(threads_args[i]);
|
||||
|
||||
arg->files = files;
|
||||
arg->backup = backup;
|
||||
/* By default there are some error */
|
||||
arg->ret = 1;
|
||||
threads_args[i].ret = 1;
|
||||
|
||||
elog(LOG, "Start thread for num:%li", parray_num(files));
|
||||
|
||||
pthread_create(&restore_threads[i], NULL, restore_files, arg);
|
||||
pthread_create(&threads[i], NULL, restore_files, arg);
|
||||
}
|
||||
|
||||
/* Wait theads */
|
||||
for (i = 0; i < num_threads; i++)
|
||||
{
|
||||
pthread_join(restore_threads[i], NULL);
|
||||
if (restore_threads_args[i].ret == 1)
|
||||
pthread_join(threads[i], NULL);
|
||||
if (threads_args[i].ret == 1)
|
||||
restore_isok = false;
|
||||
}
|
||||
if (!restore_isok)
|
||||
elog(ERROR, "Data files restoring failed");
|
||||
|
||||
pfree(restore_threads);
|
||||
pfree(restore_threads_args);
|
||||
pfree(threads);
|
||||
pfree(threads_args);
|
||||
|
||||
/* cleanup */
|
||||
parray_walk(files, pgFileFree);
|
||||
@ -710,7 +711,7 @@ static void *
|
||||
restore_files(void *arg)
|
||||
{
|
||||
int i;
|
||||
restore_files_args *arguments = (restore_files_args *)arg;
|
||||
restore_files_arg *arguments = (restore_files_arg *)arg;
|
||||
|
||||
for (i = 0; i < parray_num(arguments->files); i++)
|
||||
{
|
||||
|
@ -30,7 +30,7 @@ typedef struct
|
||||
* 0 means there is no error, 1 - there is an error.
|
||||
*/
|
||||
int ret;
|
||||
} validate_files_args;
|
||||
} validate_files_arg;
|
||||
|
||||
/*
|
||||
* Validate backup files.
|
||||
@ -44,8 +44,8 @@ pgBackupValidate(pgBackup *backup)
|
||||
bool corrupted = false;
|
||||
bool validation_isok = true;
|
||||
/* arrays with meta info for multi threaded validate */
|
||||
pthread_t *validate_threads;
|
||||
validate_files_args *validate_threads_args;
|
||||
pthread_t *threads;
|
||||
validate_files_arg *threads_args;
|
||||
int i;
|
||||
|
||||
/* Revalidation is attempted for DONE, ORPHAN and CORRUPT backups */
|
||||
@ -83,29 +83,29 @@ pgBackupValidate(pgBackup *backup)
|
||||
}
|
||||
|
||||
/* init thread args with own file lists */
|
||||
validate_threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
|
||||
validate_threads_args = (validate_files_args *)
|
||||
palloc(sizeof(validate_files_args) * num_threads);
|
||||
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
|
||||
threads_args = (validate_files_arg *)
|
||||
palloc(sizeof(validate_files_arg) * num_threads);
|
||||
|
||||
/* Validate files */
|
||||
for (i = 0; i < num_threads; i++)
|
||||
{
|
||||
validate_files_args *arg = &(validate_threads_args[i]);
|
||||
validate_files_arg *arg = &(threads_args[i]);
|
||||
|
||||
arg->files = files;
|
||||
arg->corrupted = false;
|
||||
/* By default there are some error */
|
||||
arg->ret = 1;
|
||||
threads_args[i].ret = 1;
|
||||
|
||||
pthread_create(&validate_threads[i], NULL, pgBackupValidateFiles, arg);
|
||||
pthread_create(&threads[i], NULL, pgBackupValidateFiles, arg);
|
||||
}
|
||||
|
||||
/* Wait theads */
|
||||
for (i = 0; i < num_threads; i++)
|
||||
{
|
||||
validate_files_args *arg = &(validate_threads_args[i]);
|
||||
validate_files_arg *arg = &(threads_args[i]);
|
||||
|
||||
pthread_join(validate_threads[i], NULL);
|
||||
pthread_join(threads[i], NULL);
|
||||
if (arg->corrupted)
|
||||
corrupted = true;
|
||||
if (arg->ret == 1)
|
||||
@ -114,8 +114,8 @@ pgBackupValidate(pgBackup *backup)
|
||||
if (!validation_isok)
|
||||
elog(ERROR, "Data files validation failed");
|
||||
|
||||
pfree(validate_threads);
|
||||
pfree(validate_threads_args);
|
||||
pfree(threads);
|
||||
pfree(threads_args);
|
||||
|
||||
/* cleanup */
|
||||
parray_walk(files, pgFileFree);
|
||||
@ -141,14 +141,14 @@ static void *
|
||||
pgBackupValidateFiles(void *arg)
|
||||
{
|
||||
int i;
|
||||
validate_files_args *arguments = (validate_files_args *)arg;
|
||||
validate_files_arg *arguments = (validate_files_arg *)arg;
|
||||
pg_crc32 crc;
|
||||
|
||||
for (i = 0; i < parray_num(arguments->files); i++)
|
||||
{
|
||||
struct stat st;
|
||||
|
||||
pgFile *file = (pgFile *) parray_get(arguments->files, i);
|
||||
|
||||
if (!pg_atomic_test_set_flag(&file->lock))
|
||||
continue;
|
||||
|
||||
@ -334,11 +334,13 @@ do_validate_instance(void)
|
||||
if (current_backup->status == BACKUP_STATUS_CORRUPT)
|
||||
{
|
||||
int j;
|
||||
|
||||
corrupted_backup_found = true;
|
||||
current_backup_id = base36enc_dup(current_backup->start_time);
|
||||
for (j = i - 1; j >= 0; j--)
|
||||
{
|
||||
pgBackup *backup = (pgBackup *) parray_get(backups, j);
|
||||
|
||||
if (backup->backup_mode == BACKUP_MODE_FULL)
|
||||
break;
|
||||
if (backup->status != BACKUP_STATUS_OK)
|
||||
|
Loading…
Reference in New Issue
Block a user