1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-03-19 21:38:02 +02:00

PGPRO-2180: Retreive stop_lsn from previous WAL segment

This commit is contained in:
Arthur Zakirov 2018-11-20 16:33:36 +03:00
parent d84d79668b
commit 6ea7c61c33
3 changed files with 142 additions and 9 deletions

View File

@ -1907,7 +1907,28 @@ pg_stop_backup(pgBackup *backup)
if (!XRecOffIsValid(stop_backup_lsn))
{
if (XRecOffIsNull(stop_backup_lsn))
stop_backup_lsn = stop_backup_lsn + SizeOfXLogLongPHD;
{
char *xlog_path,
stream_xlog_path[MAXPGPATH];
XLogSegNo segno;
if (stream_wal)
{
pgBackupGetPath2(backup, stream_xlog_path,
lengthof(stream_xlog_path),
DATABASE_DIR, PG_XLOG_DIR);
xlog_path = stream_xlog_path;
}
else
xlog_path = arclog_path;
GetXLogSegNo(stop_backup_lsn, segno, xlog_seg_size);
/* Retreive stop_lsn from previous segment */
segno = segno - 1;
stop_backup_lsn = get_last_wal_lsn(xlog_path, backup->start_lsn,
segno, backup->tli,
xlog_seg_size);
}
else
elog(ERROR, "Invalid stop_backup_lsn value %X/%X",
(uint32) (stop_backup_lsn >> 32), (uint32) (stop_backup_lsn));

View File

@ -88,7 +88,7 @@ static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime
typedef struct XLogPageReadPrivate
{
int thread_num;
int thread_num;
const char *archivedir;
TimeLineID tli;
uint32 xlog_seg_size;
@ -132,8 +132,7 @@ static XLogReaderState *InitXLogPageRead(XLogPageReadPrivate *private_data,
TimeLineID tli, uint32 xlog_seg_size,
bool allocate_reader);
static void CleanupXLogPageRead(XLogReaderState *xlogreader);
static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data,
int elevel);
static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel);
static XLogSegNo nextSegNoToRead = 0;
static pthread_mutex_t wal_segment_mutex = PTHREAD_MUTEX_INITIALIZER;
@ -239,11 +238,17 @@ doExtractPageMap(void *arg)
*/
if (XLogRecPtrIsInvalid(found))
{
elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X. %s",
private_data->thread_num,
(uint32) (extract_arg->startpoint >> 32),
(uint32) (extract_arg->startpoint),
(xlogreader->errormsg_buf[0] != '\0')?xlogreader->errormsg_buf:"");
if (xlogreader->errormsg_buf[0] != '\0')
elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X: %s",
private_data->thread_num,
(uint32) (extract_arg->startpoint >> 32),
(uint32) (extract_arg->startpoint),
xlogreader->errormsg_buf);
else
elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X",
private_data->thread_num,
(uint32) (extract_arg->startpoint >> 32),
(uint32) (extract_arg->startpoint));
PrintXLogCorruptionMsg(private_data, ERROR);
}
extract_arg->startpoint = found;
@ -766,6 +771,104 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
return res;
}
/*
* Get last valid LSN within the WAL segment with number 'segno'. If 'start_lsn'
* is in the segment with number 'segno' then start from 'start_lsn', otherwise
* start from offset 0 within the segment.
*/
XLogRecPtr
get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn,
XLogSegNo segno, TimeLineID tli, uint32 seg_size)
{
XLogReaderState *xlogreader;
XLogPageReadPrivate private;
XLogRecPtr startpoint;
XLogSegNo start_segno;
XLogRecPtr res = InvalidXLogRecPtr;
if (segno == 0)
elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno);
elog(LOG, "Retreiving last LSN of the segment with number " UINT64_FORMAT,
segno);
xlogreader = InitXLogPageRead(&private, archivedir, tli, seg_size, true);
/*
* Calculate startpoint. Decide: we should use 'start_lsn' or offset 0.
*/
GetXLogSegNo(start_lsn, start_segno, seg_size);
if (start_segno == segno)
startpoint = start_lsn;
else
{
XLogRecPtr found;
GetXLogRecPtr(segno, 0, seg_size, startpoint);
found = XLogFindNextRecord(xlogreader, startpoint);
if (XLogRecPtrIsInvalid(found))
{
if (xlogreader->errormsg_buf[0] != '\0')
elog(WARNING, "Could not read WAL record at %X/%X: %s",
(uint32) (startpoint >> 32), (uint32) (startpoint),
xlogreader->errormsg_buf);
else
elog(WARNING, "Could not read WAL record at %X/%X",
(uint32) (startpoint >> 32), (uint32) (startpoint));
PrintXLogCorruptionMsg(&private, ERROR);
}
startpoint = found;
}
elog(VERBOSE, "Starting LSN is %X/%X",
(uint32) (startpoint >> 32), (uint32) (startpoint));
while (true)
{
XLogRecord *record;
char *errormsg;
XLogSegNo next_segno = 0;
if (interrupted)
elog(ERROR, "Interrupted during WAL reading");
record = XLogReadRecord(xlogreader, startpoint, &errormsg);
if (record == NULL)
{
XLogRecPtr errptr;
errptr = XLogRecPtrIsInvalid(startpoint) ? xlogreader->EndRecPtr :
startpoint;
if (errormsg)
elog(WARNING, "Could not read WAL record at %X/%X: %s",
(uint32) (errptr >> 32), (uint32) (errptr),
errormsg);
else
elog(WARNING, "Could not read WAL record at %X/%X",
(uint32) (errptr >> 32), (uint32) (errptr));
PrintXLogCorruptionMsg(&private, ERROR);
}
res = xlogreader->ReadRecPtr;
/* continue reading at next record */
startpoint = InvalidXLogRecPtr;
GetXLogSegNo(xlogreader->EndRecPtr, next_segno, seg_size);
if (next_segno > segno)
break;
}
CleanupXLogPageRead(xlogreader);
XLogReaderFree(xlogreader);
elog(VERBOSE, "Last LSN is %X/%X", (uint32) (res >> 32), (uint32) (res));
return res;
}
#ifdef HAVE_LIBZ
/*
* Show error during work with compressed file
@ -1035,6 +1138,12 @@ PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel)
private_data->gz_xlogpath);
#endif
}
else
{
/* Cannot tell what happened specifically */
elog(elevel, "Thread [%d]: An error occured during WAL reading",
private_data->thread_num);
}
}
/*

View File

@ -576,6 +576,9 @@ extern bool read_recovery_info(const char *archivedir, TimeLineID tli,
TransactionId *recovery_xid);
extern bool wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
TimeLineID target_tli, uint32 seg_size);
extern XLogRecPtr get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn,
XLogSegNo segno, TimeLineID tli,
uint32 seg_size);
/* in util.c */
extern TimeLineID get_current_timeline(bool safe);