diff --git a/src/backup.c b/src/backup.c index beca8bc8..63a11bee 100644 --- a/src/backup.c +++ b/src/backup.c @@ -372,10 +372,10 @@ remote_copy_file(PGconn *conn, pgFile* file) static void * remote_backup_files(void *arg) { - int i; - backup_files_args *arguments = (backup_files_args *) arg; - int n_backup_files_list = parray_num(arguments->backup_files_list); - PGconn *file_backup_conn = NULL; + int i; + backup_files_arg *arguments = (backup_files_arg *) arg; + int n_backup_files_list = parray_num(arguments->files_list); + PGconn *file_backup_conn = NULL; for (i = 0; i < n_backup_files_list; i++) { @@ -385,7 +385,7 @@ remote_backup_files(void *arg) pgFile *file; int row_length; - file = (pgFile *) parray_get(arguments->backup_files_list, i); + file = (pgFile *) parray_get(arguments->files_list, i); /* We have already copied all directories */ if (S_ISDIR(file->mode)) @@ -465,12 +465,11 @@ do_backup_instance(void) XLogRecPtr prev_backup_start_lsn = InvalidXLogRecPtr; /* arrays with meta info for multi threaded backup */ - pthread_t *backup_threads; - backup_files_args *backup_threads_args; + pthread_t *threads; + backup_files_arg *threads_args; bool backup_isok = true; pgBackup *prev_backup = NULL; - char prev_backup_filelist_path[MAXPGPATH]; parray *prev_backup_filelist = NULL; elog(LOG, "Database backup start"); @@ -512,6 +511,7 @@ do_backup_instance(void) current.backup_mode == BACKUP_MODE_DIFF_DELTA) { parray *backup_list; + char prev_backup_filelist_path[MAXPGPATH]; /* get list of backups already taken */ backup_list = catalog_get_backup_list(INVALID_BACKUP_ID); @@ -524,8 +524,8 @@ do_backup_instance(void) "Create new FULL backup before an incremental one."); parray_free(backup_list); - pgBackupGetPath(prev_backup, prev_backup_filelist_path, lengthof(prev_backup_filelist_path), - DATABASE_FILE_LIST); + pgBackupGetPath(prev_backup, prev_backup_filelist_path, + lengthof(prev_backup_filelist_path), DATABASE_FILE_LIST); /* Files of previous backup needed by DELTA backup */ prev_backup_filelist = dir_read_file_list(NULL, prev_backup_filelist_path); @@ -701,20 +701,20 @@ do_backup_instance(void) parray_qsort(backup_files_list, pgFileCompareSize); /* init thread args with own file lists */ - backup_threads = (pthread_t *) palloc(sizeof(pthread_t)*num_threads); - backup_threads_args = (backup_files_args *) palloc(sizeof(backup_files_args)*num_threads); + threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); + threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads); for (i = 0; i < num_threads; i++) { - backup_files_args *arg = &(backup_threads_args[i]); + backup_files_arg *arg = &(threads_args[i]); arg->from_root = pgdata; arg->to_root = database_path; - arg->backup_files_list = backup_files_list; - arg->prev_backup_filelist = prev_backup_filelist; - arg->prev_backup_start_lsn = prev_backup_start_lsn; - arg->thread_backup_conn = NULL; - arg->thread_cancel_conn = NULL; + arg->files_list = backup_files_list; + arg->prev_filelist = prev_backup_filelist; + arg->prev_start_lsn = prev_backup_start_lsn; + arg->backup_conn = NULL; + arg->cancel_conn = NULL; /* By default there are some error */ arg->ret = 1; } @@ -723,20 +723,21 @@ do_backup_instance(void) elog(LOG, "Start transfering data files"); for (i = 0; i < num_threads; i++) { - backup_files_args *arg = &(backup_threads_args[i]); + backup_files_arg *arg = &(threads_args[i]); + elog(VERBOSE, "Start thread num: %i", i); if (!is_remote_backup) - pthread_create(&backup_threads[i], NULL, backup_files, arg); + pthread_create(&threads[i], NULL, backup_files, arg); else - pthread_create(&backup_threads[i], NULL, remote_backup_files, arg); + pthread_create(&threads[i], NULL, remote_backup_files, arg); } /* Wait threads */ for (i = 0; i < num_threads; i++) { - pthread_join(backup_threads[i], NULL); - if (backup_threads_args[i].ret == 1) + pthread_join(threads[i], NULL); + if (threads_args[i].ret == 1) backup_isok = false; } if (backup_isok) @@ -2021,17 +2022,17 @@ backup_disconnect(bool fatal, void *userdata) static void * backup_files(void *arg) { - int i; - backup_files_args *arguments = (backup_files_args *) arg; - int n_backup_files_list = parray_num(arguments->backup_files_list); + int i; + backup_files_arg *arguments = (backup_files_arg *) arg; + int n_backup_files_list = parray_num(arguments->files_list); /* backup a file */ for (i = 0; i < n_backup_files_list; i++) { int ret; struct stat buf; + pgFile *file = (pgFile *) parray_get(arguments->files_list, i); - pgFile *file = (pgFile *) parray_get(arguments->backup_files_list, i); elog(VERBOSE, "Copying file: \"%s\" ", file->path); if (!pg_atomic_test_set_flag(&file->lock)) continue; @@ -2077,11 +2078,15 @@ backup_files(void *arg) { int p; char *relative; - int n_prev_backup_files_list = parray_num(arguments->prev_backup_filelist); + int n_prev_files = parray_num(arguments->prev_filelist); + relative = GetRelativePath(file->path, arguments->from_root); - for (p = 0; p < n_prev_backup_files_list; p++) + for (p = 0; p < n_prev_files; p++) { - pgFile *prev_file = (pgFile *) parray_get(arguments->prev_backup_filelist, p); + pgFile *prev_file; + + prev_file = (pgFile *) parray_get(arguments->prev_filelist, p); + if (strcmp(relative, prev_file->path) == 0) { /* File exists in previous backup */ @@ -2098,7 +2103,7 @@ backup_files(void *arg) if (!backup_data_file(arguments, arguments->from_root, arguments->to_root, file, - arguments->prev_backup_start_lsn, + arguments->prev_start_lsn, current.backup_mode)) { file->write_size = BYTES_INVALID; @@ -2130,8 +2135,8 @@ backup_files(void *arg) } /* Close connection */ - if (arguments->thread_backup_conn) - pgut_disconnect(arguments->thread_backup_conn); + if (arguments->backup_conn) + pgut_disconnect(arguments->backup_conn); /* Data files transferring is successful */ arguments->ret = 0; @@ -2633,7 +2638,7 @@ get_last_ptrack_lsn(void) } char * -pg_ptrack_get_block(backup_files_args *arguments, +pg_ptrack_get_block(backup_files_arg *arguments, Oid dbOid, Oid tblsOid, Oid relOid, @@ -2658,17 +2663,17 @@ pg_ptrack_get_block(backup_files_args *arguments, sprintf(params[2], "%i", relOid); sprintf(params[3], "%u", blknum); - if (arguments->thread_backup_conn == NULL) + if (arguments->backup_conn == NULL) { - arguments->thread_backup_conn = pgut_connect(pgut_dbname); + arguments->backup_conn = pgut_connect(pgut_dbname); } - if (arguments->thread_cancel_conn == NULL) - arguments->thread_cancel_conn = PQgetCancel(arguments->thread_backup_conn); + if (arguments->cancel_conn == NULL) + arguments->cancel_conn = PQgetCancel(arguments->backup_conn); //elog(LOG, "db %i pg_ptrack_get_block(%i, %i, %u)",dbOid, tblsOid, relOid, blknum); - res = pgut_execute_parallel(arguments->thread_backup_conn, - arguments->thread_cancel_conn, + res = pgut_execute_parallel(arguments->backup_conn, + arguments->cancel_conn, "SELECT pg_catalog.pg_ptrack_get_block_2($1, $2, $3, $4)", 4, (const char **)params, true); diff --git a/src/data.c b/src/data.c index 4ab8a1c0..b874b1bd 100644 --- a/src/data.c +++ b/src/data.c @@ -221,7 +221,7 @@ read_page_from_file(pgFile *file, BlockNumber blknum, * to the backup file. */ static void -backup_data_page(backup_files_args *arguments, +backup_data_page(backup_files_arg *arguments, pgFile *file, XLogRecPtr prev_backup_start_lsn, BlockNumber blknum, BlockNumber nblocks, FILE *in, FILE *out, @@ -409,7 +409,7 @@ backup_data_page(backup_files_args *arguments, * backup with special header. */ bool -backup_data_file(backup_files_args* arguments, +backup_data_file(backup_files_arg* arguments, const char *from_root, const char *to_root, pgFile *file, XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode) diff --git a/src/parsexlog.c b/src/parsexlog.c index eacf9ec3..5d558ff4 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -5,7 +5,7 @@ * * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California - * Portions Copyright (c) 2015-2017, Postgres Professional + * Portions Copyright (c) 2015-2018, Postgres Professional * *------------------------------------------------------------------------- */ @@ -101,6 +101,19 @@ typedef struct XLogPageReadPrivate TimeLineID tli; } XLogPageReadPrivate; +/* An argument for a thread function */ +typedef struct +{ + parray *files; + bool corrupted; + + /* + * Return value from the thread. + * 0 means there is no error, 1 - there is an error. + */ + int ret; +} xlog_thread_arg; + static int SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, @@ -125,6 +138,9 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli, XLogSegNo endSegNo, nextSegNo = 0; + pthread_t threads[num_threads]; + xlog_thread_arg thread_args[num_threads]; + elog(LOG, "Compiling pagemap"); if (!XRecOffIsValid(startpoint)) elog(ERROR, "Invalid startpoint value %X/%X", diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 39e2db67..887f6e5b 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -276,18 +276,20 @@ typedef struct { const char *from_root; const char *to_root; - parray *backup_files_list; - parray *prev_backup_filelist; - XLogRecPtr prev_backup_start_lsn; - PGconn *thread_backup_conn; - PGcancel *thread_cancel_conn; + + parray *files_list; + parray *prev_filelist; + XLogRecPtr prev_start_lsn; + + PGconn *backup_conn; + PGcancel *cancel_conn; /* * Return value from the thread. * 0 means there is no error, 1 - there is an error. */ int ret; -} backup_files_args; +} backup_files_arg; /* * return pointer that exceeds the length of prefix from character string. @@ -381,7 +383,7 @@ extern const char *deparse_backup_mode(BackupMode mode); extern void process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno); -extern char *pg_ptrack_get_block(backup_files_args *arguments, +extern char *pg_ptrack_get_block(backup_files_arg *arguments, Oid dbOid, Oid tblsOid, Oid relOid, BlockNumber blknum, size_t *result_size); @@ -485,7 +487,7 @@ extern int pgFileCompareLinked(const void *f1, const void *f2); extern int pgFileCompareSize(const void *f1, const void *f2); /* in data.c */ -extern bool backup_data_file(backup_files_args* arguments, +extern bool backup_data_file(backup_files_arg* arguments, const char *from_root, const char *to_root, pgFile *file, XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode); diff --git a/src/restore.c b/src/restore.c index dc339fd5..60bd32c2 100644 --- a/src/restore.c +++ b/src/restore.c @@ -28,7 +28,7 @@ typedef struct * 0 means there is no error, 1 - there is an error. */ int ret; -} restore_files_args; +} restore_files_arg; /* Tablespace mapping structures */ @@ -363,8 +363,8 @@ restore_backup(pgBackup *backup) parray *files; int i; /* arrays with meta info for multi threaded backup */ - pthread_t *restore_threads; - restore_files_args *restore_threads_args; + pthread_t *threads; + restore_files_arg *threads_args; bool restore_isok = true; if (backup->status != BACKUP_STATUS_OK) @@ -398,43 +398,44 @@ restore_backup(pgBackup *backup) pgBackupGetPath(backup, list_path, lengthof(list_path), DATABASE_FILE_LIST); files = dir_read_file_list(database_path, list_path); - restore_threads = (pthread_t *) palloc(sizeof(pthread_t)*num_threads); - restore_threads_args = (restore_files_args *) palloc(sizeof(restore_files_args)*num_threads); + threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); + threads_args = (restore_files_arg *) palloc(sizeof(restore_files_arg)*num_threads); /* setup threads */ for (i = 0; i < parray_num(files); i++) { - pgFile *file = (pgFile *) parray_get(files, i); + pgFile *file = (pgFile *) parray_get(files, i); + pg_atomic_clear_flag(&file->lock); } /* Restore files into target directory */ for (i = 0; i < num_threads; i++) { - restore_files_args *arg = &(restore_threads_args[i]); + restore_files_arg *arg = &(threads_args[i]); arg->files = files; arg->backup = backup; /* By default there are some error */ - arg->ret = 1; + threads_args[i].ret = 1; elog(LOG, "Start thread for num:%li", parray_num(files)); - pthread_create(&restore_threads[i], NULL, restore_files, arg); + pthread_create(&threads[i], NULL, restore_files, arg); } /* Wait theads */ for (i = 0; i < num_threads; i++) { - pthread_join(restore_threads[i], NULL); - if (restore_threads_args[i].ret == 1) + pthread_join(threads[i], NULL); + if (threads_args[i].ret == 1) restore_isok = false; } if (!restore_isok) elog(ERROR, "Data files restoring failed"); - pfree(restore_threads); - pfree(restore_threads_args); + pfree(threads); + pfree(threads_args); /* cleanup */ parray_walk(files, pgFileFree); @@ -710,7 +711,7 @@ static void * restore_files(void *arg) { int i; - restore_files_args *arguments = (restore_files_args *)arg; + restore_files_arg *arguments = (restore_files_arg *)arg; for (i = 0; i < parray_num(arguments->files); i++) { diff --git a/src/validate.c b/src/validate.c index 41467f25..61120d91 100644 --- a/src/validate.c +++ b/src/validate.c @@ -22,15 +22,15 @@ static bool corrupted_backup_found = false; typedef struct { - parray *files; - bool corrupted; + parray *files; + bool corrupted; /* * Return value from the thread. * 0 means there is no error, 1 - there is an error. */ int ret; -} validate_files_args; +} validate_files_arg; /* * Validate backup files. @@ -44,8 +44,8 @@ pgBackupValidate(pgBackup *backup) bool corrupted = false; bool validation_isok = true; /* arrays with meta info for multi threaded validate */ - pthread_t *validate_threads; - validate_files_args *validate_threads_args; + pthread_t *threads; + validate_files_arg *threads_args; int i; /* Revalidation is attempted for DONE, ORPHAN and CORRUPT backups */ @@ -83,29 +83,29 @@ pgBackupValidate(pgBackup *backup) } /* init thread args with own file lists */ - validate_threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); - validate_threads_args = (validate_files_args *) - palloc(sizeof(validate_files_args) * num_threads); + threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); + threads_args = (validate_files_arg *) + palloc(sizeof(validate_files_arg) * num_threads); /* Validate files */ for (i = 0; i < num_threads; i++) { - validate_files_args *arg = &(validate_threads_args[i]); + validate_files_arg *arg = &(threads_args[i]); arg->files = files; arg->corrupted = false; /* By default there are some error */ - arg->ret = 1; + threads_args[i].ret = 1; - pthread_create(&validate_threads[i], NULL, pgBackupValidateFiles, arg); + pthread_create(&threads[i], NULL, pgBackupValidateFiles, arg); } /* Wait theads */ for (i = 0; i < num_threads; i++) { - validate_files_args *arg = &(validate_threads_args[i]); + validate_files_arg *arg = &(threads_args[i]); - pthread_join(validate_threads[i], NULL); + pthread_join(threads[i], NULL); if (arg->corrupted) corrupted = true; if (arg->ret == 1) @@ -114,8 +114,8 @@ pgBackupValidate(pgBackup *backup) if (!validation_isok) elog(ERROR, "Data files validation failed"); - pfree(validate_threads); - pfree(validate_threads_args); + pfree(threads); + pfree(threads_args); /* cleanup */ parray_walk(files, pgFileFree); @@ -141,14 +141,14 @@ static void * pgBackupValidateFiles(void *arg) { int i; - validate_files_args *arguments = (validate_files_args *)arg; + validate_files_arg *arguments = (validate_files_arg *)arg; pg_crc32 crc; for (i = 0; i < parray_num(arguments->files); i++) { struct stat st; + pgFile *file = (pgFile *) parray_get(arguments->files, i); - pgFile *file = (pgFile *) parray_get(arguments->files, i); if (!pg_atomic_test_set_flag(&file->lock)) continue; @@ -334,11 +334,13 @@ do_validate_instance(void) if (current_backup->status == BACKUP_STATUS_CORRUPT) { int j; + corrupted_backup_found = true; current_backup_id = base36enc_dup(current_backup->start_time); for (j = i - 1; j >= 0; j--) { pgBackup *backup = (pgBackup *) parray_get(backups, j); + if (backup->backup_mode == BACKUP_MODE_FULL) break; if (backup->status != BACKUP_STATUS_OK)