diff --git a/src/backup.c b/src/backup.c index 2014a6ef..bd5ee590 100644 --- a/src/backup.c +++ b/src/backup.c @@ -406,7 +406,7 @@ remote_backup_files(void *arg) instance_config.pguser); /* check for interrupt */ - if (interrupted) + if (interrupted || thread_interrupted) elog(ERROR, "interrupted during backup"); query_str = psprintf("FILE_BACKUP FILEPATH '%s'",file->path); @@ -631,6 +631,7 @@ do_backup_instance(void) /* By default there are some error */ stream_thread_arg.ret = 1; + thread_interrupted = false; pthread_create(&stream_thread, NULL, StreamLog, &stream_thread_arg); } @@ -698,8 +699,7 @@ do_backup_instance(void) * where this backup has started. */ extractPageMap(arclog_path, current.tli, instance_config.xlog_seg_size, - prev_backup->start_lsn, current.start_lsn, - backup_files_list); + prev_backup->start_lsn, current.start_lsn); } else if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK) { @@ -778,6 +778,7 @@ do_backup_instance(void) } /* Run threads */ + thread_interrupted = false; elog(INFO, "Start transfering data files"); for (i = 0; i < num_threads; i++) { @@ -2288,7 +2289,7 @@ backup_files(void *arg) elog(VERBOSE, "Copying file: \"%s\" ", file->path); /* check for interrupt */ - if (interrupted) + if (interrupted || thread_interrupted) elog(ERROR, "interrupted during backup"); if (progress) @@ -2757,7 +2758,7 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) static XLogRecPtr prevpos = InvalidXLogRecPtr; /* check for interrupt */ - if (interrupted) + if (interrupted || thread_interrupted) elog(ERROR, "Interrupted during backup"); /* we assume that we get called once at the end of each segment */ diff --git a/src/data.c b/src/data.c index fc2d5c38..36e0de28 100644 --- a/src/data.c +++ b/src/data.c @@ -22,6 +22,8 @@ #include #endif +#include "utils/thread.h" + /* Union to ease operations on relation pages */ typedef union DataPage { @@ -318,7 +320,7 @@ prepare_page(backup_files_arg *arguments, BlockNumber absolute_blknum = file->segno * RELSEG_SIZE + blknum; /* check for interrupt */ - if (interrupted) + if (interrupted || thread_interrupted) elog(ERROR, "Interrupted during backup"); /* diff --git a/src/help.c b/src/help.c index 2bedbfef..84b56678 100644 --- a/src/help.c +++ b/src/help.c @@ -134,7 +134,7 @@ help_pg_probackup(void) printf(_(" [--skip-external-dirs]\n")); printf(_("\n %s validate -B backup-path [--instance=instance_name]\n"), PROGRAM_NAME); - printf(_(" [-i backup-id] [--progress]\n")); + printf(_(" [-i backup-id] [--progress] [-j num-threads]\n")); printf(_(" [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]\n")); printf(_(" [--recovery-target-name=target-name]\n")); printf(_(" [--timeline=timeline]\n")); @@ -361,6 +361,7 @@ help_validate(void) printf(_(" -i, --backup-id=backup-id backup to validate\n")); printf(_(" --progress show progress\n")); + printf(_(" -j, --threads=NUM number of parallel threads\n")); printf(_(" --time=time time stamp up to which recovery will proceed\n")); printf(_(" --xid=xid transaction ID up to which recovery will proceed\n")); printf(_(" --lsn=lsn LSN of the write-ahead log location up to which recovery will proceed\n")); diff --git a/src/merge.c b/src/merge.c index e5215cdc..5726e36c 100644 --- a/src/merge.c +++ b/src/merge.c @@ -284,6 +284,7 @@ merge_backups(pgBackup *to_backup, pgBackup *from_backup) pg_atomic_init_flag(&file->lock); } + thread_interrupted = false; for (i = 0; i < num_threads; i++) { merge_files_arg *arg = &(threads_args[i]); @@ -456,7 +457,7 @@ merge_files(void *arg) continue; /* check for interrupt */ - if (interrupted) + if (interrupted || thread_interrupted) elog(ERROR, "Interrupted during merging backups"); /* Directories were created before */ diff --git a/src/parsexlog.c b/src/parsexlog.c index 5a625884..198f9c5a 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -83,42 +83,61 @@ typedef struct xl_xact_abort /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */ } xl_xact_abort; -static void extractPageInfo(XLogReaderState *record); -static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime); +/* + * XLogRecTarget allows to track the last recovery targets. Currently used only + * within validate_wal(). + */ +typedef struct XLogRecTarget +{ + TimestampTz rec_time; + TransactionId rec_xid; + XLogRecPtr rec_lsn; +} XLogRecTarget; -typedef struct XLogPageReadPrivate +typedef struct XLogReaderData { int thread_num; - const char *archivedir; TimeLineID tli; - uint32 xlog_seg_size; + + XLogRecTarget cur_rec; + XLogSegNo xlogsegno; + bool xlogexists; char page_buf[XLOG_BLCKSZ]; uint32 prev_page_off; - bool manual_switch; bool need_switch; int xlogfile; - XLogSegNo xlogsegno; char xlogpath[MAXPGPATH]; - bool xlogexists; #ifdef HAVE_LIBZ gzFile gz_xlogfile; char gz_xlogpath[MAXPGPATH]; #endif -} XLogPageReadPrivate; +} XLogReaderData; + +/* Function to process a WAL record */ +typedef void (*xlog_record_function) (XLogReaderState *record, + XLogReaderData *reader_data, + bool *stop_reading); /* An argument for a thread function */ typedef struct { - XLogPageReadPrivate private_data; + XLogReaderData reader_data; + + xlog_record_function process_record; XLogRecPtr startpoint; XLogRecPtr endpoint; XLogSegNo endSegNo; + /* + * The thread got the recovery target. + */ + bool got_target; + /* * Return value from the thread. * 0 means there is no error, 1 - there is an error. @@ -130,14 +149,41 @@ static int SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *pageTLI); -static XLogReaderState *InitXLogPageRead(XLogPageReadPrivate *private_data, +static XLogReaderState *InitXLogPageRead(XLogReaderData *reader_data, const char *archivedir, - TimeLineID tli, uint32 xlog_seg_size, + TimeLineID tli, uint32 segment_size, + bool manual_switch, + bool consistent_read, bool allocate_reader); +static bool RunXLogThreads(const char *archivedir, + time_t target_time, TransactionId target_xid, + XLogRecPtr target_lsn, + TimeLineID tli, uint32 segment_size, + XLogRecPtr startpoint, XLogRecPtr endpoint, + bool consistent_read, + xlog_record_function process_record, + XLogRecTarget *last_rec); +//static XLogReaderState *InitXLogThreadRead(xlog_thread_arg *arg); +static bool SwitchThreadToNextWal(XLogReaderState *xlogreader, + xlog_thread_arg *arg); +static bool XLogWaitForConsistency(XLogReaderState *xlogreader); +static void *XLogThreadWorker(void *arg); static void CleanupXLogPageRead(XLogReaderState *xlogreader); -static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel); +static void PrintXLogCorruptionMsg(XLogReaderData *reader_data, int elevel); -static XLogSegNo nextSegNoToRead = 0; +static void extractPageInfo(XLogReaderState *record, + XLogReaderData *reader_data, bool *stop_reading); +static void validateXLogRecord(XLogReaderState *record, + XLogReaderData *reader_data, bool *stop_reading); +static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime); + +static XLogSegNo segno_start = 0; +/* Segment number where target record is located */ +static XLogSegNo segno_target = 0; +/* Next segment number to read by a thread */ +static XLogSegNo segno_next = 0; +/* Number of segments already read by threads */ +static uint32 segnum_read = 0; static pthread_mutex_t wal_segment_mutex = PTHREAD_MUTEX_INITIALIZER; /* copied from timestamp.c */ @@ -156,189 +202,25 @@ timestamptz_to_time_t(TimestampTz t) return result; } +static const char *wal_archivedir = NULL; +static uint32 wal_seg_size = 0; /* - * Do manual switch to the next WAL segment. - * - * Returns false if the reader reaches the end of a WAL segment list. + * If true a wal reader thread switches to the next segment using + * segno_next. */ -static bool -switchToNextWal(XLogReaderState *xlogreader, xlog_thread_arg *arg) -{ - XLogPageReadPrivate *private_data; - XLogRecPtr found; - - private_data = (XLogPageReadPrivate *) xlogreader->private_data; - private_data->need_switch = false; - - /* Critical section */ - pthread_lock(&wal_segment_mutex); - Assert(nextSegNoToRead); - private_data->xlogsegno = nextSegNoToRead; - nextSegNoToRead++; - pthread_mutex_unlock(&wal_segment_mutex); - - /* We've reached the end */ - if (private_data->xlogsegno > arg->endSegNo) - return false; - - /* Adjust next record position */ - GetXLogRecPtr(private_data->xlogsegno, 0, - private_data->xlog_seg_size, arg->startpoint); - /* We need to close previously opened file if it wasn't closed earlier */ - CleanupXLogPageRead(xlogreader); - /* Skip over the page header and contrecord if any */ - found = XLogFindNextRecord(xlogreader, arg->startpoint); - - /* - * We get invalid WAL record pointer usually when WAL segment is - * absent or is corrupted. - */ - if (XLogRecPtrIsInvalid(found)) - { - elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", - private_data->thread_num, - (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint)); - PrintXLogCorruptionMsg(private_data, ERROR); - } - arg->startpoint = found; - - elog(VERBOSE, "Thread [%d]: switched to LSN %X/%X", - private_data->thread_num, - (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint)); - - return true; -} +static bool wal_manual_switch = false; +/* + * If true a wal reader thread waits for other threads if the thread met absent + * wal segment. + */ +static bool wal_consistent_read = false; /* - * extractPageMap() worker. + * Variables used within validate_wal() and validateXLogRecord() to stop workers */ -static void * -doExtractPageMap(void *arg) -{ - xlog_thread_arg *extract_arg = (xlog_thread_arg *) arg; - XLogPageReadPrivate *private_data; - XLogReaderState *xlogreader; - XLogSegNo nextSegNo = 0; - XLogRecPtr found; - char *errormsg; - - private_data = &extract_arg->private_data; -#if PG_VERSION_NUM >= 110000 - xlogreader = XLogReaderAllocate(private_data->xlog_seg_size, - &SimpleXLogPageRead, private_data); -#else - xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, private_data); -#endif - if (xlogreader == NULL) - elog(ERROR, "Thread [%d]: out of memory", private_data->thread_num); - xlogreader->system_identifier = instance_config.system_identifier; - - found = XLogFindNextRecord(xlogreader, extract_arg->startpoint); - - /* - * We get invalid WAL record pointer usually when WAL segment is absent or - * is corrupted. - */ - if (XLogRecPtrIsInvalid(found)) - { - if (xlogreader->errormsg_buf[0] != '\0') - elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X: %s", - private_data->thread_num, - (uint32) (extract_arg->startpoint >> 32), - (uint32) (extract_arg->startpoint), - xlogreader->errormsg_buf); - else - elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", - private_data->thread_num, - (uint32) (extract_arg->startpoint >> 32), - (uint32) (extract_arg->startpoint)); - PrintXLogCorruptionMsg(private_data, ERROR); - } - extract_arg->startpoint = found; - - elog(VERBOSE, "Thread [%d]: Starting LSN: %X/%X", - private_data->thread_num, - (uint32) (extract_arg->startpoint >> 32), - (uint32) (extract_arg->startpoint)); - - /* Switch WAL segment manually below without using SimpleXLogPageRead() */ - private_data->manual_switch = true; - - do - { - XLogRecord *record; - - if (interrupted) - elog(ERROR, "Thread [%d]: Interrupted during WAL reading", - private_data->thread_num); - - /* - * We need to switch to the next WAL segment after reading previous - * record. It may happen if we read contrecord. - */ - if (private_data->need_switch) - { - if (!switchToNextWal(xlogreader, extract_arg)) - break; - } - - record = XLogReadRecord(xlogreader, extract_arg->startpoint, &errormsg); - - if (record == NULL) - { - XLogRecPtr errptr; - - /* - * There is no record, try to switch to the next WAL segment. - * Usually SimpleXLogPageRead() does it by itself. But here we need - * to do it manually to support threads. - */ - if (private_data->need_switch && errormsg == NULL) - { - if (switchToNextWal(xlogreader, extract_arg)) - continue; - else - break; - } - - errptr = extract_arg->startpoint ? - extract_arg->startpoint : xlogreader->EndRecPtr; - - if (errormsg) - elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X: %s", - private_data->thread_num, - (uint32) (errptr >> 32), (uint32) (errptr), - errormsg); - else - elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", - private_data->thread_num, - (uint32) (errptr >> 32), (uint32) (errptr)); - - /* - * If we don't have all WAL files from prev backup start_lsn to current - * start_lsn, we won't be able to build page map and PAGE backup will - * be incorrect. Stop it and throw an error. - */ - PrintXLogCorruptionMsg(private_data, ERROR); - } - - extractPageInfo(xlogreader); - - /* continue reading at next record */ - extract_arg->startpoint = InvalidXLogRecPtr; - - GetXLogSegNo(xlogreader->EndRecPtr, nextSegNo, - private_data->xlog_seg_size); - } while (nextSegNo <= extract_arg->endSegNo && - xlogreader->ReadRecPtr < extract_arg->endpoint); - - CleanupXLogPageRead(xlogreader); - XLogReaderFree(xlogreader); - - /* Extracting is successful */ - extract_arg->ret = 0; - return NULL; -} +static time_t wal_target_time = 0; +static TransactionId wal_target_xid = InvalidTransactionId; +static XLogRecPtr wal_target_lsn = InvalidXLogRecPtr; /* * Read WAL from the archive directory, from 'startpoint' to 'endpoint' on the @@ -348,86 +230,20 @@ doExtractPageMap(void *arg) * file. */ void -extractPageMap(const char *archivedir, TimeLineID tli, uint32 seg_size, - XLogRecPtr startpoint, XLogRecPtr endpoint, parray *files) +extractPageMap(const char *archivedir, TimeLineID tli, uint32 wal_seg_size, + XLogRecPtr startpoint, XLogRecPtr endpoint) { - int i; - int threads_need = 0; - XLogSegNo endSegNo; bool extract_isok = true; - pthread_t *threads; - xlog_thread_arg *thread_args; time_t start_time, end_time; elog(LOG, "Compiling pagemap"); - if (!XRecOffIsValid(startpoint)) - elog(ERROR, "Invalid startpoint value %X/%X", - (uint32) (startpoint >> 32), (uint32) (startpoint)); - - if (!XRecOffIsValid(endpoint)) - elog(ERROR, "Invalid endpoint value %X/%X", - (uint32) (endpoint >> 32), (uint32) (endpoint)); - - GetXLogSegNo(endpoint, endSegNo, seg_size); - - nextSegNoToRead = 0; time(&start_time); - threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); - thread_args = (xlog_thread_arg *) palloc(sizeof(xlog_thread_arg)*num_threads); - - /* - * Initialize thread args. - * - * Each thread works with its own WAL segment and we need to adjust - * startpoint value for each thread. - */ - for (i = 0; i < num_threads; i++) - { - InitXLogPageRead(&thread_args[i].private_data, archivedir, tli, - seg_size, false); - thread_args[i].private_data.thread_num = i + 1; - - thread_args[i].startpoint = startpoint; - thread_args[i].endpoint = endpoint; - thread_args[i].endSegNo = endSegNo; - /* By default there is some error */ - thread_args[i].ret = 1; - - threads_need++; - - /* Adjust startpoint to the next thread */ - if (nextSegNoToRead == 0) - GetXLogSegNo(startpoint, nextSegNoToRead, seg_size); - - nextSegNoToRead++; - /* - * If we need to read less WAL segments than num_threads, create less - * threads. - */ - if (nextSegNoToRead > endSegNo) - break; - GetXLogRecPtr(nextSegNoToRead, 0, seg_size, startpoint); - } - - /* Run threads */ - for (i = 0; i < threads_need; i++) - { - elog(VERBOSE, "Start WAL reader thread: %d", i + 1); - pthread_create(&threads[i], NULL, doExtractPageMap, &thread_args[i]); - } - - /* Wait for threads */ - for (i = 0; i < threads_need; i++) - { - pthread_join(threads[i], NULL); - if (thread_args[i].ret == 1) - extract_isok = false; - } - - pfree(threads); - pfree(thread_args); + extract_isok = RunXLogThreads(archivedir, 0, InvalidTransactionId, + InvalidXLogRecPtr, tli, wal_seg_size, + startpoint, endpoint, false, extractPageInfo, + NULL); time(&end_time); if (extract_isok) @@ -438,48 +254,26 @@ extractPageMap(const char *archivedir, TimeLineID tli, uint32 seg_size, } /* - * Ensure that the backup has all wal files needed for recovery to consistent state. + * Ensure that the backup has all wal files needed for recovery to consistent + * state. + * + * WAL records reading is processed using threads. Each thread reads single WAL + * file. */ static void validate_backup_wal_from_start_to_stop(pgBackup *backup, - char *backup_xlog_path, TimeLineID tli, + const char *archivedir, TimeLineID tli, uint32 xlog_seg_size) { - XLogRecPtr startpoint = backup->start_lsn; - XLogRecord *record; - XLogReaderState *xlogreader; - char *errormsg; - XLogPageReadPrivate private; - bool got_endpoint = false; + bool got_endpoint; - xlogreader = InitXLogPageRead(&private, backup_xlog_path, tli, - xlog_seg_size, true); - - while (true) - { - record = XLogReadRecord(xlogreader, startpoint, &errormsg); - - if (record == NULL) - { - if (errormsg) - elog(WARNING, "%s", errormsg); - - break; - } - - /* Got WAL record at stop_lsn */ - if (xlogreader->ReadRecPtr == backup->stop_lsn) - { - got_endpoint = true; - break; - } - startpoint = InvalidXLogRecPtr; /* continue reading at next record */ - } + got_endpoint = RunXLogThreads(archivedir, 0, InvalidTransactionId, + InvalidXLogRecPtr, tli, xlog_seg_size, + backup->start_lsn, backup->stop_lsn, + false, NULL, NULL); if (!got_endpoint) { - PrintXLogCorruptionMsg(&private, WARNING); - /* * If we don't have WAL between start_lsn and stop_lsn, * the backup is definitely corrupted. Update its status. @@ -494,10 +288,6 @@ validate_backup_wal_from_start_to_stop(pgBackup *backup, (uint32) (backup->stop_lsn >> 32), (uint32) (backup->stop_lsn)); } - - /* clean */ - CleanupXLogPageRead(xlogreader); - XLogReaderFree(xlogreader); } /* @@ -508,20 +298,12 @@ validate_backup_wal_from_start_to_stop(pgBackup *backup, void validate_wal(pgBackup *backup, const char *archivedir, time_t target_time, TransactionId target_xid, - XLogRecPtr target_lsn, - TimeLineID tli, uint32 seg_size) + XLogRecPtr target_lsn, TimeLineID tli, uint32 wal_seg_size) { - XLogRecPtr startpoint = backup->start_lsn; const char *backup_id; - XLogRecord *record; - XLogReaderState *xlogreader; - char *errormsg; - XLogPageReadPrivate private; - TransactionId last_xid = InvalidTransactionId; - TimestampTz last_time = 0; + XLogRecTarget last_rec; char last_timestamp[100], target_timestamp[100]; - XLogRecPtr last_lsn = InvalidXLogRecPtr; bool all_wal = false; char backup_xlog_path[MAXPGPATH]; @@ -548,11 +330,11 @@ validate_wal(pgBackup *backup, const char *archivedir, DATABASE_DIR, PG_XLOG_DIR); validate_backup_wal_from_start_to_stop(backup, backup_xlog_path, tli, - seg_size); + wal_seg_size); } else validate_backup_wal_from_start_to_stop(backup, (char *) archivedir, tli, - seg_size); + wal_seg_size); if (backup->status == BACKUP_STATUS_CORRUPT) { @@ -563,7 +345,8 @@ validate_wal(pgBackup *backup, const char *archivedir, * If recovery target is provided check that we can restore backup to a * recovery target time or xid. */ - if (!TransactionIdIsValid(target_xid) && target_time == 0 && !XRecOffIsValid(target_lsn)) + if (!TransactionIdIsValid(target_xid) && target_time == 0 && + !XRecOffIsValid(target_lsn)) { /* Recovery target is not given so exit */ elog(INFO, "Backup %s WAL segments are valid", backup_id); @@ -582,89 +365,41 @@ validate_wal(pgBackup *backup, const char *archivedir, * up to the given recovery target. * In any case we cannot restore to the point before stop_lsn. */ - xlogreader = InitXLogPageRead(&private, archivedir, tli, seg_size, - true); /* We can restore at least up to the backup end */ - time2iso(last_timestamp, lengthof(last_timestamp), backup->recovery_time); - last_xid = backup->recovery_xid; - last_lsn = backup->stop_lsn; + last_rec.rec_time = 0; + last_rec.rec_xid = backup->recovery_xid; + last_rec.rec_lsn = backup->stop_lsn; - if ((TransactionIdIsValid(target_xid) && target_xid == last_xid) + time2iso(last_timestamp, lengthof(last_timestamp), backup->recovery_time); + + if ((TransactionIdIsValid(target_xid) && target_xid == last_rec.rec_xid) || (target_time != 0 && backup->recovery_time >= target_time) - || (XRecOffIsValid(target_lsn) && backup->stop_lsn >= target_lsn)) + || (XRecOffIsValid(target_lsn) && last_rec.rec_lsn >= target_lsn)) all_wal = true; - startpoint = backup->stop_lsn; - while (true) - { - bool timestamp_record; - - record = XLogReadRecord(xlogreader, startpoint, &errormsg); - if (record == NULL) - { - if (errormsg) - elog(WARNING, "%s", errormsg); - - break; - } - - timestamp_record = getRecordTimestamp(xlogreader, &last_time); - if (XLogRecGetXid(xlogreader) != InvalidTransactionId) - last_xid = XLogRecGetXid(xlogreader); - last_lsn = xlogreader->ReadRecPtr; - - /* Check target xid */ - if (TransactionIdIsValid(target_xid) && target_xid == last_xid) - { - all_wal = true; - break; - } - /* Check target time */ - else if (target_time != 0 && timestamp_record && - timestamptz_to_time_t(last_time) >= target_time) - { - all_wal = true; - break; - } - /* Check target lsn */ - else if (XRecOffIsValid(target_lsn) && last_lsn >= target_lsn) - { - all_wal = true; - break; - } - /* If there are no target xid, target time and target lsn */ - else if (!TransactionIdIsValid(target_xid) && target_time == 0 && - !XRecOffIsValid(target_lsn)) - { - all_wal = true; - /* We don't stop here. We want to get last_xid and last_time */ - } - - startpoint = InvalidXLogRecPtr; /* continue reading at next record */ - } - - if (last_time > 0) + all_wal = all_wal || + RunXLogThreads(archivedir, target_time, target_xid, target_lsn, + tli, wal_seg_size, backup->stop_lsn, + InvalidXLogRecPtr, true, validateXLogRecord, &last_rec); + if (last_rec.rec_time > 0) time2iso(last_timestamp, lengthof(last_timestamp), - timestamptz_to_time_t(last_time)); + timestamptz_to_time_t(last_rec.rec_time)); /* There are all needed WAL records */ if (all_wal) elog(INFO, "backup validation completed successfully on time %s, xid " XID_FMT " and LSN %X/%X", - last_timestamp, last_xid, - (uint32) (last_lsn >> 32), (uint32) last_lsn); + last_timestamp, last_rec.rec_xid, + (uint32) (last_rec.rec_lsn >> 32), (uint32) last_rec.rec_lsn); /* Some needed WAL records are absent */ else { - PrintXLogCorruptionMsg(&private, WARNING); - elog(WARNING, "recovery can be done up to time %s, xid " XID_FMT " and LSN %X/%X", - last_timestamp, last_xid, - (uint32) (last_lsn >> 32), (uint32) last_lsn); + last_timestamp, last_rec.rec_xid, + (uint32) (last_rec.rec_lsn >> 32), (uint32) last_rec.rec_lsn); if (target_time > 0) - time2iso(target_timestamp, lengthof(target_timestamp), - target_time); + time2iso(target_timestamp, lengthof(target_timestamp), target_time); if (TransactionIdIsValid(target_xid) && target_time != 0) elog(ERROR, "not enough WAL records to time %s and xid " XID_FMT, target_timestamp, target_xid); @@ -678,10 +413,6 @@ validate_wal(pgBackup *backup, const char *archivedir, elog(ERROR, "not enough WAL records to lsn %X/%X", (uint32) (target_lsn >> 32), (uint32) (target_lsn)); } - - /* clean */ - CleanupXLogPageRead(xlogreader); - XLogReaderFree(xlogreader); } /* @@ -690,13 +421,13 @@ validate_wal(pgBackup *backup, const char *archivedir, * pg_stop_backup(). */ bool -read_recovery_info(const char *archivedir, TimeLineID tli, uint32 seg_size, +read_recovery_info(const char *archivedir, TimeLineID tli, uint32 wal_seg_size, XLogRecPtr start_lsn, XLogRecPtr stop_lsn, time_t *recovery_time, TransactionId *recovery_xid) { XLogRecPtr startpoint = stop_lsn; XLogReaderState *xlogreader; - XLogPageReadPrivate private; + XLogReaderData reader_data; bool res; if (!XRecOffIsValid(start_lsn)) @@ -707,7 +438,8 @@ read_recovery_info(const char *archivedir, TimeLineID tli, uint32 seg_size, elog(ERROR, "Invalid stop_lsn value %X/%X", (uint32) (stop_lsn >> 32), (uint32) (stop_lsn)); - xlogreader = InitXLogPageRead(&private, archivedir, tli, seg_size, true); + xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size, + false, true, true); /* Read records from stop_lsn down to start_lsn */ do @@ -762,10 +494,10 @@ cleanup: */ bool wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, - TimeLineID target_tli, uint32 seg_size) + TimeLineID target_tli, uint32 wal_seg_size) { XLogReaderState *xlogreader; - XLogPageReadPrivate private; + XLogReaderData reader_data; char *errormsg; bool res; @@ -773,8 +505,8 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, elog(ERROR, "Invalid target_lsn value %X/%X", (uint32) (target_lsn >> 32), (uint32) (target_lsn)); - xlogreader = InitXLogPageRead(&private, archivedir, target_tli, seg_size, - true); + xlogreader = InitXLogPageRead(&reader_data, archivedir, target_tli, + wal_seg_size, false, false, true); res = XLogReadRecord(xlogreader, target_lsn, &errormsg) != NULL; /* Didn't find 'target_lsn' and there is no error, return false */ @@ -797,16 +529,16 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, XLogRecPtr get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, XLogRecPtr stop_lsn, TimeLineID tli, bool seek_prev_segment, - uint32 seg_size) + uint32 wal_seg_size) { XLogReaderState *xlogreader; - XLogPageReadPrivate private; + XLogReaderData reader_data; XLogRecPtr startpoint; XLogSegNo start_segno; XLogSegNo segno; XLogRecPtr res = InvalidXLogRecPtr; - GetXLogSegNo(stop_lsn, segno, seg_size); + GetXLogSegNo(stop_lsn, segno, wal_seg_size); if (segno <= 1) elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno); @@ -814,19 +546,20 @@ get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, if (seek_prev_segment) segno = segno - 1; - xlogreader = InitXLogPageRead(&private, archivedir, tli, seg_size, true); + xlogreader = InitXLogPageRead(&reader_data, archivedir, tli, wal_seg_size, + false, false, true); /* * Calculate startpoint. Decide: we should use 'start_lsn' or offset 0. */ - GetXLogSegNo(start_lsn, start_segno, seg_size); + GetXLogSegNo(start_lsn, start_segno, wal_seg_size); if (start_segno == segno) startpoint = start_lsn; else { XLogRecPtr found; - GetXLogRecPtr(segno, 0, seg_size, startpoint); + GetXLogRecPtr(segno, 0, wal_seg_size, startpoint); found = XLogFindNextRecord(xlogreader, startpoint); if (XLogRecPtrIsInvalid(found)) @@ -838,7 +571,7 @@ get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, else elog(WARNING, "Could not read WAL record at %X/%X", (uint32) (startpoint >> 32), (uint32) (startpoint)); - PrintXLogCorruptionMsg(&private, ERROR); + PrintXLogCorruptionMsg(&reader_data, ERROR); } startpoint = found; } @@ -867,13 +600,13 @@ get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, else elog(WARNING, "Could not read WAL record at %X/%X", (uint32) (errptr >> 32), (uint32) (errptr)); - PrintXLogCorruptionMsg(&private, ERROR); + PrintXLogCorruptionMsg(&reader_data, ERROR); } /* continue reading at next record */ startpoint = InvalidXLogRecPtr; - GetXLogSegNo(xlogreader->EndRecPtr, next_segno, seg_size); + GetXLogSegNo(xlogreader->EndRecPtr, next_segno, wal_seg_size); if (next_segno > segno) break; @@ -919,25 +652,24 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *pageTLI) { - XLogPageReadPrivate *private_data; + XLogReaderData *reader_data; uint32 targetPageOff; - private_data = (XLogPageReadPrivate *) xlogreader->private_data; - targetPageOff = targetPagePtr % private_data->xlog_seg_size; + reader_data = (XLogReaderData *) xlogreader->private_data; + targetPageOff = targetPagePtr % wal_seg_size; - if (interrupted) + if (interrupted || thread_interrupted) elog(ERROR, "Thread [%d]: Interrupted during WAL reading", - private_data->thread_num); + reader_data->thread_num); /* * See if we need to switch to a new segment because the requested record * is not in the currently open one. */ - if (!IsInXLogSeg(targetPagePtr, private_data->xlogsegno, - private_data->xlog_seg_size)) + if (!IsInXLogSeg(targetPagePtr, reader_data->xlogsegno, wal_seg_size)) { elog(VERBOSE, "Thread [%d]: Need to switch to the next WAL segment, page LSN %X/%X, record being read LSN %X/%X", - private_data->thread_num, + reader_data->thread_num, (uint32) (targetPagePtr >> 32), (uint32) (targetPagePtr), (uint32) (xlogreader->currRecPtr >> 32), (uint32) (xlogreader->currRecPtr )); @@ -954,52 +686,49 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, /* * Switch to the next WAL segment after reading contrecord. */ - if (private_data->manual_switch) - private_data->need_switch = true; + if (wal_manual_switch) + reader_data->need_switch = true; } else { CleanupXLogPageRead(xlogreader); /* - * Do not switch to next WAL segment in this function. Currently it is - * manually switched only in doExtractPageMap(). - */ - if (private_data->manual_switch) + * Do not switch to next WAL segment in this function. It is + * manually switched by a thread routine. + */ + if (wal_manual_switch) { - private_data->need_switch = true; + reader_data->need_switch = true; return -1; } } } - GetXLogSegNo(targetPagePtr, private_data->xlogsegno, - private_data->xlog_seg_size); + GetXLogSegNo(targetPagePtr, reader_data->xlogsegno, wal_seg_size); /* Try to switch to the next WAL segment */ - if (!private_data->xlogexists) + if (!reader_data->xlogexists) { char xlogfname[MAXFNAMELEN]; - GetXLogFileName(xlogfname, private_data->tli, private_data->xlogsegno, - private_data->xlog_seg_size); - snprintf(private_data->xlogpath, MAXPGPATH, "%s/%s", - private_data->archivedir, xlogfname); + GetXLogFileName(xlogfname, reader_data->tli, reader_data->xlogsegno, + wal_seg_size); + snprintf(reader_data->xlogpath, MAXPGPATH, "%s/%s", wal_archivedir, + xlogfname); - if (fileExists(private_data->xlogpath)) + if (fileExists(reader_data->xlogpath)) { elog(LOG, "Thread [%d]: Opening WAL segment \"%s\"", - private_data->thread_num, - private_data->xlogpath); + reader_data->thread_num, reader_data->xlogpath); - private_data->xlogexists = true; - private_data->xlogfile = open(private_data->xlogpath, - O_RDONLY | PG_BINARY, 0); + reader_data->xlogexists = true; + reader_data->xlogfile = open(reader_data->xlogpath, + O_RDONLY | PG_BINARY, 0); - if (private_data->xlogfile < 0) + if (reader_data->xlogfile < 0) { elog(WARNING, "Thread [%d]: Could not open WAL segment \"%s\": %s", - private_data->thread_num, - private_data->xlogpath, + reader_data->thread_num, reader_data->xlogpath, strerror(errno)); return -1; } @@ -1008,21 +737,21 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, /* Try to open compressed WAL segment */ else { - snprintf(private_data->gz_xlogpath, - sizeof(private_data->gz_xlogpath), "%s.gz", - private_data->xlogpath); - if (fileExists(private_data->gz_xlogpath)) + snprintf(reader_data->gz_xlogpath, sizeof(reader_data->gz_xlogpath), + "%s.gz", reader_data->xlogpath); + if (fileExists(reader_data->gz_xlogpath)) { elog(LOG, "Thread [%d]: Opening compressed WAL segment \"%s\"", - private_data->thread_num, private_data->gz_xlogpath); + reader_data->thread_num, reader_data->gz_xlogpath); - private_data->xlogexists = true; - private_data->gz_xlogfile = gzopen(private_data->gz_xlogpath, - "rb"); - if (private_data->gz_xlogfile == NULL) + reader_data->xlogexists = true; + reader_data->gz_xlogfile = gzopen(reader_data->gz_xlogpath, + "rb"); + if (reader_data->gz_xlogfile == NULL) { elog(WARNING, "Thread [%d]: Could not open compressed WAL segment \"%s\": %s", - private_data->thread_num, private_data->gz_xlogpath, strerror(errno)); + reader_data->thread_num, reader_data->gz_xlogpath, + strerror(errno)); return -1; } } @@ -1030,69 +759,67 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, #endif /* Exit without error if WAL segment doesn't exist */ - if (!private_data->xlogexists) + if (!reader_data->xlogexists) return -1; } /* * At this point, we have the right segment open. */ - Assert(private_data->xlogexists); + Assert(reader_data->xlogexists); /* * Do not read same page read earlier from the file, read it from the buffer */ - if (private_data->prev_page_off != 0 && - private_data->prev_page_off == targetPageOff) + if (reader_data->prev_page_off != 0 && + reader_data->prev_page_off == targetPageOff) { - memcpy(readBuf, private_data->page_buf, XLOG_BLCKSZ); - *pageTLI = private_data->tli; + memcpy(readBuf, reader_data->page_buf, XLOG_BLCKSZ); + *pageTLI = reader_data->tli; return XLOG_BLCKSZ; } /* Read the requested page */ - if (private_data->xlogfile != -1) + if (reader_data->xlogfile != -1) { - if (lseek(private_data->xlogfile, (off_t) targetPageOff, SEEK_SET) < 0) + if (lseek(reader_data->xlogfile, (off_t) targetPageOff, SEEK_SET) < 0) { elog(WARNING, "Thread [%d]: Could not seek in WAL segment \"%s\": %s", - private_data->thread_num, private_data->xlogpath, strerror(errno)); + reader_data->thread_num, reader_data->xlogpath, strerror(errno)); return -1; } - if (read(private_data->xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + if (read(reader_data->xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) { elog(WARNING, "Thread [%d]: Could not read from WAL segment \"%s\": %s", - private_data->thread_num, private_data->xlogpath, strerror(errno)); + reader_data->thread_num, reader_data->xlogpath, strerror(errno)); return -1; } } #ifdef HAVE_LIBZ else { - if (gzseek(private_data->gz_xlogfile, (z_off_t) targetPageOff, SEEK_SET) == -1) + if (gzseek(reader_data->gz_xlogfile, (z_off_t) targetPageOff, SEEK_SET) == -1) { elog(WARNING, "Thread [%d]: Could not seek in compressed WAL segment \"%s\": %s", - private_data->thread_num, - private_data->gz_xlogpath, - get_gz_error(private_data->gz_xlogfile)); + reader_data->thread_num, reader_data->gz_xlogpath, + get_gz_error(reader_data->gz_xlogfile)); return -1; } - if (gzread(private_data->gz_xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) + if (gzread(reader_data->gz_xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) { elog(WARNING, "Thread [%d]: Could not read from compressed WAL segment \"%s\": %s", - private_data->thread_num, - private_data->gz_xlogpath, - get_gz_error(private_data->gz_xlogfile)); + reader_data->thread_num, reader_data->gz_xlogpath, + get_gz_error(reader_data->gz_xlogfile)); return -1; } } #endif - memcpy(private_data->page_buf, readBuf, XLOG_BLCKSZ); - private_data->prev_page_off = targetPageOff; - *pageTLI = private_data->tli; + memcpy(reader_data->page_buf, readBuf, XLOG_BLCKSZ); + reader_data->prev_page_off = targetPageOff; + *pageTLI = reader_data->tli; return XLOG_BLCKSZ; } @@ -1100,89 +827,521 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, * Initialize WAL segments reading. */ static XLogReaderState * -InitXLogPageRead(XLogPageReadPrivate *private_data, const char *archivedir, - TimeLineID tli, uint32 xlog_seg_size, bool allocate_reader) +InitXLogPageRead(XLogReaderData *reader_data, const char *archivedir, + TimeLineID tli, uint32 segment_size, bool manual_switch, + bool consistent_read, bool allocate_reader) { XLogReaderState *xlogreader = NULL; - MemSet(private_data, 0, sizeof(XLogPageReadPrivate)); - private_data->archivedir = archivedir; - private_data->tli = tli; - private_data->xlog_seg_size = xlog_seg_size; - private_data->xlogfile = -1; + wal_archivedir = archivedir; + wal_seg_size = segment_size; + wal_manual_switch = manual_switch; + wal_consistent_read = consistent_read; + + MemSet(reader_data, 0, sizeof(XLogReaderData)); + reader_data->tli = tli; + reader_data->xlogfile = -1; if (allocate_reader) { #if PG_VERSION_NUM >= 110000 - xlogreader = XLogReaderAllocate(xlog_seg_size, - &SimpleXLogPageRead, private_data); + xlogreader = XLogReaderAllocate(wal_seg_size, &SimpleXLogPageRead, + reader_data); #else - xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, private_data); + xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, reader_data); #endif if (xlogreader == NULL) - elog(ERROR, "out of memory"); + elog(ERROR, "Out of memory"); xlogreader->system_identifier = instance_config.system_identifier; } return xlogreader; } +/* + * Run WAL processing routines using threads. Start from startpoint up to + * endpoint. It is possible to send zero endpoint, threads will read WAL + * infinitely in this case. + */ +static bool +RunXLogThreads(const char *archivedir, time_t target_time, + TransactionId target_xid, XLogRecPtr target_lsn, TimeLineID tli, + uint32 segment_size, XLogRecPtr startpoint, XLogRecPtr endpoint, + bool consistent_read, xlog_record_function process_record, + XLogRecTarget *last_rec) +{ + pthread_t *threads; + xlog_thread_arg *thread_args; + int i; + int threads_need = 0; + XLogSegNo endSegNo = 0; + XLogSegNo errorSegNo = 0; + bool result = true; + + if (!XRecOffIsValid(startpoint)) + elog(ERROR, "Invalid startpoint value %X/%X", + (uint32) (startpoint >> 32), (uint32) (startpoint)); + + if (!XLogRecPtrIsInvalid(endpoint)) + { + if (!XRecOffIsValid(endpoint)) + elog(ERROR, "Invalid endpoint value %X/%X", + (uint32) (endpoint >> 32), (uint32) (endpoint)); + + GetXLogSegNo(endpoint, endSegNo, segment_size); + } + + /* Initialize static variables for workers */ + wal_target_time = target_time; + wal_target_xid = target_xid; + wal_target_lsn = target_lsn; + + GetXLogSegNo(startpoint, segno_start, segment_size); + segno_target = 0; + GetXLogSegNo(startpoint, segno_next, segment_size); + segnum_read = 0; + + threads = (pthread_t *) pgut_malloc(sizeof(pthread_t) * num_threads); + thread_args = (xlog_thread_arg *) pgut_malloc(sizeof(xlog_thread_arg) * num_threads); + + /* + * Initialize thread args. + * + * Each thread works with its own WAL segment and we need to adjust + * startpoint value for each thread. + */ + for (i = 0; i < num_threads; i++) + { + xlog_thread_arg *arg = &thread_args[i]; + + InitXLogPageRead(&arg->reader_data, archivedir, tli, segment_size, true, + consistent_read, false); + arg->reader_data.xlogsegno = segno_next; + arg->reader_data.thread_num = i + 1; + arg->process_record = process_record; + arg->startpoint = startpoint; + arg->endpoint = endpoint; + arg->endSegNo = endSegNo; + arg->got_target = false; + /* By default there is some error */ + arg->ret = 1; + + threads_need++; + segno_next++; + /* + * If we need to read less WAL segments than num_threads, create less + * threads. + */ + if (endSegNo != 0 && segno_next > endSegNo) + break; + GetXLogRecPtr(segno_next, 0, segment_size, startpoint); + } + + /* Run threads */ + thread_interrupted = false; + for (i = 0; i < threads_need; i++) + { + elog(VERBOSE, "Start WAL reader thread: %d", i + 1); + pthread_create(&threads[i], NULL, XLogThreadWorker, &thread_args[i]); + } + + /* Wait for threads */ + for (i = 0; i < threads_need; i++) + { + pthread_join(threads[i], NULL); + if (thread_args[i].ret == 1) + result = false; + } + + if (last_rec) + for (i = 0; i < threads_need; i++) + { + XLogRecTarget *cur_rec; + + if (thread_args[i].ret != 0) + { + /* + * Save invalid segment number after which all segments are not + * valid. + */ + if (errorSegNo == 0 || + errorSegNo > thread_args[i].reader_data.xlogsegno) + errorSegNo = thread_args[i].reader_data.xlogsegno; + continue; + } + + /* Is this segment valid */ + if (errorSegNo != 0 && + thread_args[i].reader_data.xlogsegno > errorSegNo) + continue; + + cur_rec = &thread_args[i].reader_data.cur_rec; + /* + * If we got the target return minimum possible record. + */ + if (segno_target > 0) + { + if (thread_args[i].got_target && + thread_args[i].reader_data.xlogsegno == segno_target) + { + *last_rec = *cur_rec; + break; + } + } + /* + * Else return maximum possible record up to which restore is + * possible. + */ + else if (last_rec->rec_lsn < cur_rec->rec_lsn) + *last_rec = *cur_rec; + } + + pfree(threads); + pfree(thread_args); + + return result; +} + +/* + * WAL reader worker. + */ +void * +XLogThreadWorker(void *arg) +{ + xlog_thread_arg *thread_arg = (xlog_thread_arg *) arg; + XLogReaderData *reader_data = &thread_arg->reader_data; + XLogReaderState *xlogreader; + XLogSegNo nextSegNo = 0; + XLogRecPtr found; + uint32 prev_page_off = 0; + bool need_read = true; + +#if PG_VERSION_NUM >= 110000 + xlogreader = XLogReaderAllocate(wal_seg_size, &SimpleXLogPageRead, + reader_data); +#else + xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, reader_data); +#endif + if (xlogreader == NULL) + elog(ERROR, "Thread [%d]: out of memory", reader_data->thread_num); + xlogreader->system_identifier = instance_config.system_identifier; + + found = XLogFindNextRecord(xlogreader, thread_arg->startpoint); + + /* + * We get invalid WAL record pointer usually when WAL segment is absent or + * is corrupted. + */ + if (XLogRecPtrIsInvalid(found)) + { + if (wal_consistent_read && XLogWaitForConsistency(xlogreader)) + need_read = false; + else + { + if (xlogreader->errormsg_buf[0] != '\0') + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X: %s", + reader_data->thread_num, + (uint32) (thread_arg->startpoint >> 32), + (uint32) (thread_arg->startpoint), + xlogreader->errormsg_buf); + else + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", + reader_data->thread_num, + (uint32) (thread_arg->startpoint >> 32), + (uint32) (thread_arg->startpoint)); + PrintXLogCorruptionMsg(reader_data, ERROR); + } + } + + thread_arg->startpoint = found; + + elog(VERBOSE, "Thread [%d]: Starting LSN: %X/%X", + reader_data->thread_num, + (uint32) (thread_arg->startpoint >> 32), + (uint32) (thread_arg->startpoint)); + + while (need_read) + { + XLogRecord *record; + char *errormsg; + bool stop_reading = false; + + if (interrupted || thread_interrupted) + elog(ERROR, "Thread [%d]: Interrupted during WAL reading", + reader_data->thread_num); + + /* + * We need to switch to the next WAL segment after reading previous + * record. It may happen if we read contrecord. + */ + if (reader_data->need_switch && + !SwitchThreadToNextWal(xlogreader, thread_arg)) + break; + + record = XLogReadRecord(xlogreader, thread_arg->startpoint, &errormsg); + + if (record == NULL) + { + XLogRecPtr errptr; + + /* + * There is no record, try to switch to the next WAL segment. + * Usually SimpleXLogPageRead() does it by itself. But here we need + * to do it manually to support threads. + */ + if (reader_data->need_switch && errormsg == NULL) + { + if (SwitchThreadToNextWal(xlogreader, thread_arg)) + continue; + else + break; + } + + /* + * XLogWaitForConsistency() is normally used only with threads. + * Call it here for just in case. + */ + if (wal_consistent_read && XLogWaitForConsistency(xlogreader)) + break; + + errptr = thread_arg->startpoint ? + thread_arg->startpoint : xlogreader->EndRecPtr; + + if (errormsg) + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X: %s", + reader_data->thread_num, + (uint32) (errptr >> 32), (uint32) (errptr), + errormsg); + else + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", + reader_data->thread_num, + (uint32) (errptr >> 32), (uint32) (errptr)); + + /* + * If we don't have all WAL files from prev backup start_lsn to current + * start_lsn, we won't be able to build page map and PAGE backup will + * be incorrect. Stop it and throw an error. + */ + PrintXLogCorruptionMsg(reader_data, ERROR); + } + + getRecordTimestamp(xlogreader, &reader_data->cur_rec.rec_time); + if (TransactionIdIsValid(XLogRecGetXid(xlogreader))) + reader_data->cur_rec.rec_xid = XLogRecGetXid(xlogreader); + reader_data->cur_rec.rec_lsn = xlogreader->ReadRecPtr; + + if (thread_arg->process_record) + thread_arg->process_record(xlogreader, reader_data, &stop_reading); + if (stop_reading) + { + thread_arg->got_target = true; + + pthread_lock(&wal_segment_mutex); + /* We should store least target segment number */ + if (segno_target == 0 || segno_target > reader_data->xlogsegno) + segno_target = reader_data->xlogsegno; + pthread_mutex_unlock(&wal_segment_mutex); + + break; + } + + /* + * Check if other thread got the target segment. Check it not very + * often, only every WAL page. + */ + if (wal_consistent_read && prev_page_off != 0 && + prev_page_off != reader_data->prev_page_off) + { + XLogSegNo segno; + + pthread_lock(&wal_segment_mutex); + segno = segno_target; + pthread_mutex_unlock(&wal_segment_mutex); + + if (segno != 0 && segno < reader_data->xlogsegno) + break; + } + prev_page_off = reader_data->prev_page_off; + + /* continue reading at next record */ + thread_arg->startpoint = InvalidXLogRecPtr; + + GetXLogSegNo(xlogreader->EndRecPtr, nextSegNo, wal_seg_size); + + if (thread_arg->endSegNo != 0 && + !XLogRecPtrIsInvalid(thread_arg->endpoint) && + /* + * Consider thread_arg->endSegNo and thread_arg->endpoint only if + * they are valid. + */ + xlogreader->ReadRecPtr == thread_arg->endpoint && + nextSegNo > thread_arg->endSegNo) + break; + } + + CleanupXLogPageRead(xlogreader); + XLogReaderFree(xlogreader); + + /* Extracting is successful */ + thread_arg->ret = 0; + return NULL; +} + +/* + * Do manual switch to the next WAL segment. + * + * Returns false if the reader reaches the end of a WAL segment list. + */ +static bool +SwitchThreadToNextWal(XLogReaderState *xlogreader, xlog_thread_arg *arg) +{ + XLogReaderData *reader_data; + XLogRecPtr found; + + reader_data = (XLogReaderData *) xlogreader->private_data; + reader_data->need_switch = false; + + /* Critical section */ + pthread_lock(&wal_segment_mutex); + Assert(segno_next); + reader_data->xlogsegno = segno_next; + segnum_read++; + segno_next++; + pthread_mutex_unlock(&wal_segment_mutex); + + /* We've reached the end */ + if (arg->endSegNo != 0 && reader_data->xlogsegno > arg->endSegNo) + return false; + + /* Adjust next record position */ + GetXLogRecPtr(reader_data->xlogsegno, 0, wal_seg_size, arg->startpoint); + /* We need to close previously opened file if it wasn't closed earlier */ + CleanupXLogPageRead(xlogreader); + /* Skip over the page header and contrecord if any */ + found = XLogFindNextRecord(xlogreader, arg->startpoint); + + /* + * We get invalid WAL record pointer usually when WAL segment is + * absent or is corrupted. + */ + if (XLogRecPtrIsInvalid(found)) + { + /* + * Check if we need to stop reading. We stop if other thread found a + * target segment. + */ + if (wal_consistent_read && XLogWaitForConsistency(xlogreader)) + return false; + + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", + reader_data->thread_num, + (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint)); + PrintXLogCorruptionMsg(reader_data, ERROR); + } + arg->startpoint = found; + + elog(VERBOSE, "Thread [%d]: switched to LSN %X/%X", + reader_data->thread_num, + (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint)); + + return true; +} + +/* + * Wait for other threads since the current thread couldn't read its segment. + * We need to decide is it fail or not. + * + * Returns true if there is no failure and previous target segment was found. + * Otherwise return false. + */ +static bool +XLogWaitForConsistency(XLogReaderState *xlogreader) +{ + uint32 segnum_need = 0; + XLogReaderData *reader_data =(XLogReaderData *) xlogreader->private_data; + + segnum_need = reader_data->xlogsegno - segno_start; + while (true) + { + uint32 segnum_current_read; + XLogSegNo segno; + + if (interrupted || thread_interrupted) + elog(ERROR, "Thread [%d]: Interrupted during WAL reading", + reader_data->thread_num); + + pthread_lock(&wal_segment_mutex); + segnum_current_read = segnum_read; + segno = segno_target; + pthread_mutex_unlock(&wal_segment_mutex); + + /* Other threads read all previous segments and didn't find target */ + if (segnum_need <= segnum_current_read) + return false; + + if (segno < reader_data->xlogsegno) + return true; + + pg_usleep(1000000L); /* 1000 ms */ + } + + /* We shouldn't reach it */ + return false; +} + /* * Cleanup after WAL segment reading. */ static void CleanupXLogPageRead(XLogReaderState *xlogreader) { - XLogPageReadPrivate *private_data; + XLogReaderData *reader_data; - private_data = (XLogPageReadPrivate *) xlogreader->private_data; - if (private_data->xlogfile >= 0) + reader_data = (XLogReaderData *) xlogreader->private_data; + if (reader_data->xlogfile >= 0) { - close(private_data->xlogfile); - private_data->xlogfile = -1; + close(reader_data->xlogfile); + reader_data->xlogfile = -1; } #ifdef HAVE_LIBZ - else if (private_data->gz_xlogfile != NULL) + else if (reader_data->gz_xlogfile != NULL) { - gzclose(private_data->gz_xlogfile); - private_data->gz_xlogfile = NULL; + gzclose(reader_data->gz_xlogfile); + reader_data->gz_xlogfile = NULL; } #endif - private_data->prev_page_off = 0; - private_data->xlogexists = false; + reader_data->prev_page_off = 0; + reader_data->xlogexists = false; } static void -PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel) +PrintXLogCorruptionMsg(XLogReaderData *reader_data, int elevel) { - if (private_data->xlogpath[0] != 0) + if (reader_data->xlogpath[0] != 0) { /* * XLOG reader couldn't read WAL segment. * We throw a WARNING here to be able to update backup status. */ - if (!private_data->xlogexists) + if (!reader_data->xlogexists) elog(elevel, "Thread [%d]: WAL segment \"%s\" is absent", - private_data->thread_num, - private_data->xlogpath); - else if (private_data->xlogfile != -1) + reader_data->thread_num, reader_data->xlogpath); + else if (reader_data->xlogfile != -1) elog(elevel, "Thread [%d]: Possible WAL corruption. " "Error has occured during reading WAL segment \"%s\"", - private_data->thread_num, - private_data->xlogpath); + reader_data->thread_num, reader_data->xlogpath); #ifdef HAVE_LIBZ - else if (private_data->gz_xlogfile != NULL) + else if (reader_data->gz_xlogfile != NULL) elog(elevel, "Thread [%d]: Possible WAL corruption. " "Error has occured during reading WAL segment \"%s\"", - private_data->thread_num, - private_data->gz_xlogpath); + reader_data->thread_num, reader_data->gz_xlogpath); #endif } else { /* Cannot tell what happened specifically */ elog(elevel, "Thread [%d]: An error occured during WAL reading", - private_data->thread_num); + reader_data->thread_num); } } @@ -1190,7 +1349,8 @@ PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel) * Extract information about blocks modified in this record. */ static void -extractPageInfo(XLogReaderState *record) +extractPageInfo(XLogReaderState *record, XLogReaderData *reader_data, + bool *stop_reading) { uint8 block_id; RmgrId rmid = XLogRecGetRmid(record); @@ -1258,6 +1418,27 @@ extractPageInfo(XLogReaderState *record) } } +/* + * Check the current read WAL record during validation. + */ +static void +validateXLogRecord(XLogReaderState *record, XLogReaderData *reader_data, + bool *stop_reading) +{ + /* Check target xid */ + if (TransactionIdIsValid(wal_target_xid) && + wal_target_xid == reader_data->cur_rec.rec_xid) + *stop_reading = true; + /* Check target time */ + else if (wal_target_time != 0 && + timestamptz_to_time_t(reader_data->cur_rec.rec_time) >= wal_target_time) + *stop_reading = true; + /* Check target lsn */ + else if (XRecOffIsValid(wal_target_lsn) && + reader_data->cur_rec.rec_lsn >= wal_target_lsn) + *stop_reading = true; +} + /* * Extract timestamp from WAL record. * diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 1bb08e32..2cceaaed 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -573,14 +573,11 @@ extern bool check_file_pages(pgFile *file, XLogRecPtr stop_lsn, /* parsexlog.c */ extern void extractPageMap(const char *archivedir, TimeLineID tli, uint32 seg_size, - XLogRecPtr startpoint, XLogRecPtr endpoint, - parray *files); -extern void validate_wal(pgBackup *backup, - const char *archivedir, - time_t target_time, - TransactionId target_xid, - XLogRecPtr target_lsn, - TimeLineID tli, uint32 seg_size); + XLogRecPtr startpoint, XLogRecPtr endpoint); +extern void validate_wal(pgBackup *backup, const char *archivedir, + time_t target_time, TransactionId target_xid, + XLogRecPtr target_lsn, TimeLineID tli, + uint32 seg_size); extern bool read_recovery_info(const char *archivedir, TimeLineID tli, uint32 seg_size, XLogRecPtr start_lsn, XLogRecPtr stop_lsn, diff --git a/src/restore.c b/src/restore.c index a486fb50..ae379e9f 100644 --- a/src/restore.c +++ b/src/restore.c @@ -569,6 +569,7 @@ restore_backup(pgBackup *backup, const char *external_dir_str) threads_args = (restore_files_arg *) palloc(sizeof(restore_files_arg)*num_threads); /* Restore files into target directory */ + thread_interrupted = false; for (i = 0; i < num_threads; i++) { restore_files_arg *arg = &(threads_args[i]); @@ -680,7 +681,7 @@ restore_files(void *arg) lengthof(from_root), DATABASE_DIR); /* check for interrupt */ - if (interrupted) + if (interrupted || thread_interrupted) elog(ERROR, "interrupted during restore database"); rel_path = GetRelativePath(file->path,from_root); diff --git a/src/utils/logger.c b/src/utils/logger.c index ba054a62..72840aa8 100644 --- a/src/utils/logger.c +++ b/src/utils/logger.c @@ -121,9 +121,6 @@ exit_if_necessary(int elevel) { if (elevel > WARNING && !in_cleanup) { - /* Interrupt other possible routines */ - interrupted = true; - if (loggin_in_progress) { loggin_in_progress = false; @@ -132,11 +129,15 @@ exit_if_necessary(int elevel) /* If this is not the main thread then don't call exit() */ if (main_tid != pthread_self()) + { #ifdef WIN32 ExitThread(elevel); #else pthread_exit(NULL); #endif + /* Interrupt other possible routines */ + thread_interrupted = true; + } else exit(elevel); } diff --git a/src/utils/pgut.c b/src/utils/pgut.c index a7e12e91..cdd4b26d 100644 --- a/src/utils/pgut.c +++ b/src/utils/pgut.c @@ -700,7 +700,7 @@ on_interrupt(void) int save_errno = errno; char errbuf[256]; - /* Set interruped flag */ + /* Set interrupted flag */ interrupted = true; /* diff --git a/src/utils/thread.c b/src/utils/thread.c index 0999a0d5..f1624be9 100644 --- a/src/utils/thread.c +++ b/src/utils/thread.c @@ -7,8 +7,12 @@ *------------------------------------------------------------------------- */ +#include "postgres_fe.h" + #include "thread.h" +bool thread_interrupted = false; + #ifdef WIN32 DWORD main_tid = 0; #else diff --git a/src/utils/thread.h b/src/utils/thread.h index 6b8349bf..a2948156 100644 --- a/src/utils/thread.h +++ b/src/utils/thread.h @@ -34,7 +34,7 @@ extern DWORD main_tid; extern pthread_t main_tid; #endif - +extern bool thread_interrupted; extern int pthread_lock(pthread_mutex_t *mp); diff --git a/src/validate.c b/src/validate.c index c19e319d..7d5e94f4 100644 --- a/src/validate.c +++ b/src/validate.c @@ -117,6 +117,7 @@ pgBackupValidate(pgBackup *backup) palloc(sizeof(validate_files_arg) * num_threads); /* Validate files */ + thread_interrupted = false; for (i = 0; i < num_threads; i++) { validate_files_arg *arg = &(threads_args[i]); @@ -186,7 +187,7 @@ pgBackupValidateFiles(void *arg) if (!pg_atomic_test_set_flag(&file->lock)) continue; - if (interrupted) + if (interrupted || thread_interrupted) elog(ERROR, "Interrupted during validate"); /* Validate only regular files */ diff --git a/tests/expected/option_help.out b/tests/expected/option_help.out index abb5ce71..a83c3905 100644 --- a/tests/expected/option_help.out +++ b/tests/expected/option_help.out @@ -63,7 +63,7 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database. [--skip-block-validation] pg_probackup validate -B backup-path [--instance=instance_name] - [-i backup-id] [--progress] + [-i backup-id] [--progress] [-j num-threads] [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]] [--recovery-target-name=target-name] [--timeline=timeline]