1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-01-05 13:20:31 +02:00

PGPRO-1973: Add support pthreads for VALIDATE command

This commit is contained in:
Arthur Zakirov 2019-03-04 19:02:29 +03:00
parent 001c0c2670
commit 62a2850de0
13 changed files with 711 additions and 521 deletions

View File

@ -406,7 +406,7 @@ remote_backup_files(void *arg)
instance_config.pguser);
/* check for interrupt */
if (interrupted)
if (interrupted || thread_interrupted)
elog(ERROR, "interrupted during backup");
query_str = psprintf("FILE_BACKUP FILEPATH '%s'",file->path);
@ -631,6 +631,7 @@ do_backup_instance(void)
/* By default there are some error */
stream_thread_arg.ret = 1;
thread_interrupted = false;
pthread_create(&stream_thread, NULL, StreamLog, &stream_thread_arg);
}
@ -698,8 +699,7 @@ do_backup_instance(void)
* where this backup has started.
*/
extractPageMap(arclog_path, current.tli, instance_config.xlog_seg_size,
prev_backup->start_lsn, current.start_lsn,
backup_files_list);
prev_backup->start_lsn, current.start_lsn);
}
else if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
{
@ -778,6 +778,7 @@ do_backup_instance(void)
}
/* Run threads */
thread_interrupted = false;
elog(INFO, "Start transfering data files");
for (i = 0; i < num_threads; i++)
{
@ -2288,7 +2289,7 @@ backup_files(void *arg)
elog(VERBOSE, "Copying file: \"%s\" ", file->path);
/* check for interrupt */
if (interrupted)
if (interrupted || thread_interrupted)
elog(ERROR, "interrupted during backup");
if (progress)
@ -2757,7 +2758,7 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
static XLogRecPtr prevpos = InvalidXLogRecPtr;
/* check for interrupt */
if (interrupted)
if (interrupted || thread_interrupted)
elog(ERROR, "Interrupted during backup");
/* we assume that we get called once at the end of each segment */

View File

@ -22,6 +22,8 @@
#include <zlib.h>
#endif
#include "utils/thread.h"
/* Union to ease operations on relation pages */
typedef union DataPage
{
@ -318,7 +320,7 @@ prepare_page(backup_files_arg *arguments,
BlockNumber absolute_blknum = file->segno * RELSEG_SIZE + blknum;
/* check for interrupt */
if (interrupted)
if (interrupted || thread_interrupted)
elog(ERROR, "Interrupted during backup");
/*

View File

@ -134,7 +134,7 @@ help_pg_probackup(void)
printf(_(" [--skip-external-dirs]\n"));
printf(_("\n %s validate -B backup-path [--instance=instance_name]\n"), PROGRAM_NAME);
printf(_(" [-i backup-id] [--progress]\n"));
printf(_(" [-i backup-id] [--progress] [-j num-threads]\n"));
printf(_(" [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]\n"));
printf(_(" [--recovery-target-name=target-name]\n"));
printf(_(" [--timeline=timeline]\n"));
@ -361,6 +361,7 @@ help_validate(void)
printf(_(" -i, --backup-id=backup-id backup to validate\n"));
printf(_(" --progress show progress\n"));
printf(_(" -j, --threads=NUM number of parallel threads\n"));
printf(_(" --time=time time stamp up to which recovery will proceed\n"));
printf(_(" --xid=xid transaction ID up to which recovery will proceed\n"));
printf(_(" --lsn=lsn LSN of the write-ahead log location up to which recovery will proceed\n"));

View File

@ -284,6 +284,7 @@ merge_backups(pgBackup *to_backup, pgBackup *from_backup)
pg_atomic_init_flag(&file->lock);
}
thread_interrupted = false;
for (i = 0; i < num_threads; i++)
{
merge_files_arg *arg = &(threads_args[i]);
@ -456,7 +457,7 @@ merge_files(void *arg)
continue;
/* check for interrupt */
if (interrupted)
if (interrupted || thread_interrupted)
elog(ERROR, "Interrupted during merging backups");
/* Directories were created before */

File diff suppressed because it is too large Load Diff

View File

@ -573,14 +573,11 @@ extern bool check_file_pages(pgFile *file, XLogRecPtr stop_lsn,
/* parsexlog.c */
extern void extractPageMap(const char *archivedir,
TimeLineID tli, uint32 seg_size,
XLogRecPtr startpoint, XLogRecPtr endpoint,
parray *files);
extern void validate_wal(pgBackup *backup,
const char *archivedir,
time_t target_time,
TransactionId target_xid,
XLogRecPtr target_lsn,
TimeLineID tli, uint32 seg_size);
XLogRecPtr startpoint, XLogRecPtr endpoint);
extern void validate_wal(pgBackup *backup, const char *archivedir,
time_t target_time, TransactionId target_xid,
XLogRecPtr target_lsn, TimeLineID tli,
uint32 seg_size);
extern bool read_recovery_info(const char *archivedir, TimeLineID tli,
uint32 seg_size,
XLogRecPtr start_lsn, XLogRecPtr stop_lsn,

View File

@ -569,6 +569,7 @@ restore_backup(pgBackup *backup, const char *external_dir_str)
threads_args = (restore_files_arg *) palloc(sizeof(restore_files_arg)*num_threads);
/* Restore files into target directory */
thread_interrupted = false;
for (i = 0; i < num_threads; i++)
{
restore_files_arg *arg = &(threads_args[i]);
@ -680,7 +681,7 @@ restore_files(void *arg)
lengthof(from_root), DATABASE_DIR);
/* check for interrupt */
if (interrupted)
if (interrupted || thread_interrupted)
elog(ERROR, "interrupted during restore database");
rel_path = GetRelativePath(file->path,from_root);

View File

@ -121,9 +121,6 @@ exit_if_necessary(int elevel)
{
if (elevel > WARNING && !in_cleanup)
{
/* Interrupt other possible routines */
interrupted = true;
if (loggin_in_progress)
{
loggin_in_progress = false;
@ -132,11 +129,15 @@ exit_if_necessary(int elevel)
/* If this is not the main thread then don't call exit() */
if (main_tid != pthread_self())
{
#ifdef WIN32
ExitThread(elevel);
#else
pthread_exit(NULL);
#endif
/* Interrupt other possible routines */
thread_interrupted = true;
}
else
exit(elevel);
}

View File

@ -700,7 +700,7 @@ on_interrupt(void)
int save_errno = errno;
char errbuf[256];
/* Set interruped flag */
/* Set interrupted flag */
interrupted = true;
/*

View File

@ -7,8 +7,12 @@
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include "thread.h"
bool thread_interrupted = false;
#ifdef WIN32
DWORD main_tid = 0;
#else

View File

@ -34,7 +34,7 @@ extern DWORD main_tid;
extern pthread_t main_tid;
#endif
extern bool thread_interrupted;
extern int pthread_lock(pthread_mutex_t *mp);

View File

@ -117,6 +117,7 @@ pgBackupValidate(pgBackup *backup)
palloc(sizeof(validate_files_arg) * num_threads);
/* Validate files */
thread_interrupted = false;
for (i = 0; i < num_threads; i++)
{
validate_files_arg *arg = &(threads_args[i]);
@ -186,7 +187,7 @@ pgBackupValidateFiles(void *arg)
if (!pg_atomic_test_set_flag(&file->lock))
continue;
if (interrupted)
if (interrupted || thread_interrupted)
elog(ERROR, "Interrupted during validate");
/* Validate only regular files */

View File

@ -63,7 +63,7 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database.
[--skip-block-validation]
pg_probackup validate -B backup-path [--instance=instance_name]
[-i backup-id] [--progress]
[-i backup-id] [--progress] [-j num-threads]
[--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]
[--recovery-target-name=target-name]
[--timeline=timeline]