1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-04-08 16:54:08 +02:00

PGPRO-2180: In pg_stop_backup for replica wait for LSN of prior record

This commit is contained in:
Arthur Zakirov 2018-11-21 18:30:03 +03:00
parent 6ea7c61c33
commit b4672e3ac8
4 changed files with 72 additions and 37 deletions

View File

@ -109,7 +109,7 @@ static int checkpoint_timeout(void);
//static void backup_list_file(parray *files, const char *root, )
static void parse_backup_filelist_filenames(parray *files, const char *root);
static void wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn,
bool wait_prev_segment);
bool wait_prev_lsn, bool wait_prev_segment);
static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup);
static void make_pagemap_from_ptrack(parray *files);
static void *StreamLog(void *arg);
@ -1166,7 +1166,7 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup)
if (current.backup_mode == BACKUP_MODE_DIFF_PAGE)
/* In PAGE mode wait for current segment... */
wait_wal_lsn(backup->start_lsn, true, false);
wait_wal_lsn(backup->start_lsn, true, false, false);
/*
* Do not wait start_lsn for stream backup.
* Because WAL streaming will start after pg_start_backup() in stream
@ -1174,7 +1174,7 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup)
*/
else if (!stream_wal)
/* ...for others wait for previous segment */
wait_wal_lsn(backup->start_lsn, true, true);
wait_wal_lsn(backup->start_lsn, true, false, true);
/* In case of backup from replica for PostgreSQL 9.5
* wait for start_lsn to be replayed by replica
@ -1504,7 +1504,8 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_filenode,
* If 'wait_prev_segment' is true, wait for the previous segment.
*/
static void
wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment)
wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn,
bool wait_prev_segment)
{
TimeLineID tli;
XLogSegNo targetSegNo;
@ -1515,6 +1516,7 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment)
bool file_exists = false;
uint32 try_count = 0,
timeout;
char *prior_to = (wait_prev_lsn) ? " prior to " : "";
#ifdef HAVE_LIBZ
char gz_wal_segment_path[MAXPGPATH];
@ -1555,14 +1557,13 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment)
timeout = archive_timeout;
else
timeout = ARCHIVE_TIMEOUT_DEFAULT;
}
if (wait_prev_segment)
elog(LOG, "Looking for segment: %s", wal_segment);
else
elog(LOG, "Looking for LSN: %X/%X in segment: %s",
(uint32) (lsn >> 32), (uint32) lsn, wal_segment);
elog(LOG, "Looking for LSN %s%X/%X in segment: %s",
prior_to, (uint32) (lsn >> 32), (uint32) lsn, wal_segment);
#ifdef HAVE_LIBZ
snprintf(gz_wal_segment_path, sizeof(gz_wal_segment_path), "%s.gz",
@ -1598,11 +1599,27 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment)
/*
* A WAL segment found. Check LSN on it.
*/
if (wal_contains_lsn(wal_segment_dir, lsn, tli, xlog_seg_size))
/* Target LSN was found */
if (!wait_prev_lsn)
{
elog(LOG, "Found LSN: %X/%X", (uint32) (lsn >> 32), (uint32) lsn);
return;
if (wal_contains_lsn(wal_segment_dir, lsn, tli, xlog_seg_size))
/* Target LSN was found */
{
elog(LOG, "Found LSN: %X/%X", (uint32) (lsn >> 32), (uint32) lsn);
return;
}
}
else
{
XLogRecPtr res;
res = get_last_wal_lsn(wal_segment_dir, current.start_lsn,
lsn, tli, false, xlog_seg_size);
if (!XLogRecPtrIsInvalid(res))
{
/* LSN of the prior record was found */
elog(LOG, "Found LSN: %X/%X", (uint32) (res >> 32), (uint32) res);
return;
}
}
}
@ -1618,16 +1635,18 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment)
elog(INFO, "Wait for WAL segment %s to be archived",
wal_segment_path);
else
elog(INFO, "Wait for LSN %X/%X in archived WAL segment %s",
(uint32) (lsn >> 32), (uint32) lsn, wal_segment_path);
elog(INFO, "Wait for LSN %s%X/%X in archived WAL segment %s",
prior_to, (uint32) (lsn >> 32), (uint32) lsn,
wal_segment_path);
}
if (timeout > 0 && try_count > timeout)
{
if (file_exists)
elog(ERROR, "WAL segment %s was archived, "
"but target LSN %X/%X could not be archived in %d seconds",
wal_segment, (uint32) (lsn >> 32), (uint32) lsn, timeout);
"but target LSN %s%X/%X could not be archived in %d seconds",
wal_segment, prior_to, (uint32) (lsn >> 32), (uint32) lsn,
timeout);
/* If WAL segment doesn't exist or we wait for previous segment */
else
elog(ERROR,
@ -1724,6 +1743,7 @@ pg_stop_backup(pgBackup *backup)
size_t len;
char *val = NULL;
char *stop_backup_query = NULL;
bool stop_lsn_exists = false;
/*
* We will use this values if there are no transactions between start_lsn
@ -1910,7 +1930,6 @@ pg_stop_backup(pgBackup *backup)
{
char *xlog_path,
stream_xlog_path[MAXPGPATH];
XLogSegNo segno;
if (stream_wal)
{
@ -1922,12 +1941,14 @@ pg_stop_backup(pgBackup *backup)
else
xlog_path = arclog_path;
GetXLogSegNo(stop_backup_lsn, segno, xlog_seg_size);
/* Retrieve stop_lsn from previous segment */
segno = segno - 1;
stop_backup_lsn = get_last_wal_lsn(xlog_path, backup->start_lsn,
segno, backup->tli,
xlog_seg_size);
stop_backup_lsn, backup->tli,
true, xlog_seg_size);
/*
* Do not check existence of LSN again below using
* wait_wal_lsn().
*/
stop_lsn_exists = true;
}
else
elog(ERROR, "Invalid stop_backup_lsn value %X/%X",
@ -2040,7 +2061,9 @@ pg_stop_backup(pgBackup *backup)
* Wait for stop_lsn to be archived or streamed.
* We wait for stop_lsn in stream mode just in case.
*/
wait_wal_lsn(stop_backup_lsn, false, false);
if (!stop_lsn_exists)
wait_wal_lsn(stop_backup_lsn, false,
!exclusive_backup && current.from_replica, false);
if (stream_wal)
{

View File

@ -772,25 +772,33 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
}
/*
* Get last valid LSN within the WAL segment with number 'segno'. If 'start_lsn'
* Get LSN of last or prior record within the WAL segment with number 'segno'.
* If 'start_lsn'
* is in the segment with number 'segno' then start from 'start_lsn', otherwise
* start from offset 0 within the segment.
*
* Returns LSN which points to end+1 of the last WAL record if seek_prev_segment
* is true. Otherwise returns LSN of the record prior to stop_lsn.
*/
XLogRecPtr
get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn,
XLogSegNo segno, TimeLineID tli, uint32 seg_size)
XLogRecPtr stop_lsn, TimeLineID tli, bool seek_prev_segment,
uint32 seg_size)
{
XLogReaderState *xlogreader;
XLogPageReadPrivate private;
XLogRecPtr startpoint;
XLogSegNo start_segno;
XLogSegNo segno;
XLogRecPtr res = InvalidXLogRecPtr;
if (segno == 0)
GetXLogSegNo(stop_lsn, segno, seg_size);
if (segno <= 1)
elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno);
elog(LOG, "Retreiving last LSN of the segment with number " UINT64_FORMAT,
segno);
if (seek_prev_segment)
segno = segno - 1;
xlogreader = InitXLogPageRead(&private, archivedir, tli, seg_size, true);
@ -821,9 +829,6 @@ get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn,
startpoint = found;
}
elog(VERBOSE, "Starting LSN is %X/%X",
(uint32) (startpoint >> 32), (uint32) (startpoint));
while (true)
{
XLogRecord *record;
@ -851,21 +856,28 @@ get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn,
PrintXLogCorruptionMsg(&private, ERROR);
}
res = xlogreader->ReadRecPtr;
/* continue reading at next record */
startpoint = InvalidXLogRecPtr;
GetXLogSegNo(xlogreader->EndRecPtr, next_segno, seg_size);
if (next_segno > segno)
break;
if (seek_prev_segment)
{
/* end+1 of last record read */
res = xlogreader->EndRecPtr;
}
else
res = xlogreader->ReadRecPtr;
if (xlogreader->EndRecPtr >= stop_lsn)
break;
}
CleanupXLogPageRead(xlogreader);
XLogReaderFree(xlogreader);
elog(VERBOSE, "Last LSN is %X/%X", (uint32) (res >> 32), (uint32) (res));
return res;
}

View File

@ -577,8 +577,8 @@ extern bool read_recovery_info(const char *archivedir, TimeLineID tli,
extern bool wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
TimeLineID target_tli, uint32 seg_size);
extern XLogRecPtr get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn,
XLogSegNo segno, TimeLineID tli,
uint32 seg_size);
XLogRecPtr stop_lsn, TimeLineID tli,
bool seek_prev_segment, uint32 seg_size);
/* in util.c */
extern TimeLineID get_current_timeline(bool safe);

View File

@ -551,4 +551,4 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
exit(1)
# Clean after yourself
self.del_test_dir(module_name, fname)
self.del_test_dir(module_name, fname)