From 0a89f702e30f333a7923dd4689a2be0272ff85e8 Mon Sep 17 00:00:00 2001 From: Arthur Zakirov Date: Tue, 4 Sep 2018 14:08:50 +0300 Subject: [PATCH] Issue #32: Wait for LSN in archived dir for PAGE backup --- src/backup.c | 57 +++++++++++++++++++++++++++++-------------------- src/parsexlog.c | 35 +++++------------------------- 2 files changed, 39 insertions(+), 53 deletions(-) diff --git a/src/backup.c b/src/backup.c index 107ceea1..3e0c211b 100644 --- a/src/backup.c +++ b/src/backup.c @@ -104,7 +104,8 @@ static int checkpoint_timeout(void); //static void backup_list_file(parray *files, const char *root, ) static void parse_backup_filelist_filenames(parray *files, const char *root); -static void wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment); +static void wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, + bool wait_prev_segment); static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup); static void make_pagemap_from_ptrack(parray *files); static void *StreamLog(void *arg); @@ -1112,20 +1113,17 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup) */ pg_switch_wal(conn); - if (!stream_wal) - { - /* - * Do not wait start_lsn for stream backup. - * Because WAL streaming will start after pg_start_backup() in stream - * mode. - */ + if (current.backup_mode == BACKUP_MODE_DIFF_PAGE) /* In PAGE mode wait for current segment... */ - if (current.backup_mode == BACKUP_MODE_DIFF_PAGE) - wait_wal_lsn(backup->start_lsn, false); + wait_wal_lsn(backup->start_lsn, true, false); + /* + * Do not wait start_lsn for stream backup. + * Because WAL streaming will start after pg_start_backup() in stream + * mode. + */ + else if (!stream_wal) /* ...for others wait for previous segment */ - else - wait_wal_lsn(backup->start_lsn, true); - } + wait_wal_lsn(backup->start_lsn, true, true); /* Wait for start_lsn to be replayed by replica */ if (backup->from_replica) @@ -1443,16 +1441,20 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_filenode, * If current backup started in stream mode wait for 'lsn' to be streamed in * 'pg_wal' directory. * + * If 'is_start_lsn' is true and backup mode is PAGE then we wait for 'lsn' to + * be archived in archive 'wal' directory regardless stream mode. + * * If 'wait_prev_segment' wait for previous segment. */ static void -wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment) +wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) { TimeLineID tli; XLogSegNo targetSegNo; - char wal_dir[MAXPGPATH], - wal_segment_path[MAXPGPATH]; - char wal_segment[MAXFNAMELEN]; + char pg_wal_dir[MAXPGPATH]; + char wal_segment_path[MAXPGPATH], + *wal_segment_dir, + wal_segment[MAXFNAMELEN]; bool file_exists = false; uint32 try_count = 0, timeout; @@ -1469,11 +1471,20 @@ wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment) targetSegNo--; XLogFileName(wal_segment, tli, targetSegNo); - if (stream_wal) + /* + * In pg_start_backup we wait for 'lsn' in 'pg_wal' directory iff it is + * stream and non-page backup. Page backup needs archived WAL files, so we + * wait for 'lsn' in archive 'wal' directory for page backups. + * + * In pg_stop_backup it depends only on stream_wal. + */ + if (stream_wal && + (current.backup_mode != BACKUP_MODE_DIFF_PAGE || !is_start_lsn)) { - pgBackupGetPath2(¤t, wal_dir, lengthof(wal_dir), + pgBackupGetPath2(¤t, pg_wal_dir, lengthof(pg_wal_dir), DATABASE_DIR, PG_XLOG_DIR); - join_path_components(wal_segment_path, wal_dir, wal_segment); + join_path_components(wal_segment_path, pg_wal_dir, wal_segment); + wal_segment_dir = pg_wal_dir; timeout = (uint32) checkpoint_timeout(); timeout = timeout + timeout * 0.1; @@ -1481,6 +1492,7 @@ wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment) else { join_path_components(wal_segment_path, arclog_path, wal_segment); + wal_segment_dir = arclog_path; timeout = archive_timeout; } @@ -1523,8 +1535,7 @@ wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment) /* * A WAL segment found. Check LSN on it. */ - if ((stream_wal && wal_contains_lsn(wal_dir, lsn, tli)) || - (!stream_wal && wal_contains_lsn(arclog_path, lsn, tli))) + if (wal_contains_lsn(wal_segment_dir, lsn, tli)) /* Target LSN was found */ { elog(LOG, "Found LSN: %X/%X", (uint32) (lsn >> 32), (uint32) lsn); @@ -1912,7 +1923,7 @@ pg_stop_backup(pgBackup *backup) * Wait for stop_lsn to be archived or streamed. * We wait for stop_lsn in stream mode just in case. */ - wait_wal_lsn(stop_backup_lsn, false); + wait_wal_lsn(stop_backup_lsn, false, false); if (stream_wal) { diff --git a/src/parsexlog.c b/src/parsexlog.c index 79247f25..dc34dfe3 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -170,15 +170,16 @@ switchToNextWal(XLogReaderState *xlogreader, xlog_thread_arg *arg) */ if (XLogRecPtrIsInvalid(found)) { - elog(WARNING, "could not read WAL record at %X/%X", - (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint)); + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", + private_data->thread_num, + (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint)); PrintXLogCorruptionMsg(private_data, ERROR); } arg->startpoint = found; elog(VERBOSE, "Thread [%d]: switched to LSN %X/%X", - private_data->thread_num, - (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint)); + private_data->thread_num, + (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint)); return true; } @@ -261,32 +262,6 @@ doExtractPageMap(void *arg) continue; else break; - - /* Adjust next record position */ - XLogSegNoOffsetToRecPtr(private_data->xlogsegno, 0, - extract_arg->startpoint); - /* Skip over the page header */ - found = XLogFindNextRecord(xlogreader, extract_arg->startpoint); - /* - * We get invalid WAL record pointer usually when WAL segment is - * absent or is corrupted. - */ - if (XLogRecPtrIsInvalid(found)) - { - elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", - private_data->thread_num, - (uint32) (extract_arg->startpoint >> 32), - (uint32) (extract_arg->startpoint)); - PrintXLogCorruptionMsg(private_data, ERROR); - } - extract_arg->startpoint = found; - - elog(VERBOSE, "Thread [%d]: switched to LSN %X/%X", - private_data->thread_num, - (uint32) (extract_arg->startpoint >> 32), - (uint32) (extract_arg->startpoint)); - - continue; } errptr = extract_arg->startpoint ?