1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-03-17 21:18:00 +02:00

Merge branch 'master' into stable

This commit is contained in:
Anastasia 2018-08-17 16:31:39 +03:00
commit 96bd64dde7
6 changed files with 384 additions and 69 deletions

View File

@ -650,12 +650,7 @@ do_backup_instance(void)
* where this backup has started.
*/
extractPageMap(arclog_path, prev_backup->start_lsn, current.tli,
current.start_lsn,
/*
* For backup from master wait for previous segment.
* For backup from replica wait for current segment.
*/
!current.from_replica, backup_files_list);
current.start_lsn, backup_files_list);
}
else if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
{

View File

@ -86,6 +86,7 @@ static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime
typedef struct XLogPageReadPrivate
{
int thread_num;
const char *archivedir;
TimeLineID tli;
@ -106,7 +107,6 @@ typedef struct XLogPageReadPrivate
/* An argument for a thread function */
typedef struct
{
int thread_num;
XLogPageReadPrivate private_data;
XLogRecPtr startpoint;
@ -134,6 +134,55 @@ static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data,
static XLogSegNo nextSegNoToRead = 0;
static pthread_mutex_t wal_segment_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Move the reader on to the next WAL segment that has not been claimed yet.
 *
 * The segment number is taken from the shared nextSegNoToRead counter while
 * wal_segment_mutex is held. After that arg->startpoint is repositioned onto
 * the record returned by XLogFindNextRecord() for the claimed segment.
 *
 * Returns false if the reader reaches the end of a WAL segment list, i.e. the
 * claimed segment number is greater than arg->endSegNo. Returns true after
 * the switch has been done.
 */
static bool
switchToNextWal(XLogReaderState *xlogreader, xlog_thread_arg *arg)
{
	XLogPageReadPrivate *reader_data =
		(XLogPageReadPrivate *) xlogreader->private_data;
	XLogRecPtr	first_record;

	/* The pending switch request is being served right now */
	reader_data->need_switch = false;

	/* Critical section: take the next segment number and bump the counter */
	pthread_lock(&wal_segment_mutex);
	Assert(nextSegNoToRead);
	reader_data->xlogsegno = nextSegNoToRead++;
	pthread_mutex_unlock(&wal_segment_mutex);

	/* Nothing is left to read once we are past the last requested segment */
	if (reader_data->xlogsegno > arg->endSegNo)
		return false;

	/* Adjust next record position to offset 0 of the claimed segment */
	XLogSegNoOffsetToRecPtr(reader_data->xlogsegno, 0, arg->startpoint);

	/* Skip over the page header and contrecord if any */
	first_record = XLogFindNextRecord(xlogreader, arg->startpoint);

	/*
	 * An invalid record pointer usually means that the WAL segment is absent
	 * or corrupted. Report the position we tried to read from, then let
	 * PrintXLogCorruptionMsg() raise the error at ERROR level.
	 */
	if (XLogRecPtrIsInvalid(first_record))
	{
		elog(WARNING, "could not read WAL record at %X/%X",
			 (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint));
		PrintXLogCorruptionMsg(reader_data, ERROR);
	}
	arg->startpoint = first_record;

	elog(VERBOSE, "Thread [%d]: switched to LSN %X/%X",
		 reader_data->thread_num,
		 (uint32) (arg->startpoint >> 32), (uint32) (arg->startpoint));

	return true;
}
/*
* extractPageMap() worker.
*/
@ -144,18 +193,33 @@ doExtractPageMap(void *arg)
XLogPageReadPrivate *private_data;
XLogReaderState *xlogreader;
XLogSegNo nextSegNo = 0;
XLogRecPtr found;
char *errormsg;
private_data = &extract_arg->private_data;
xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, private_data);
if (xlogreader == NULL)
elog(ERROR, "out of memory");
elog(ERROR, "Thread [%d]: out of memory", private_data->thread_num);
xlogreader->system_identifier = system_identifier;
extract_arg->startpoint = XLogFindNextRecord(xlogreader,
extract_arg->startpoint);
found = XLogFindNextRecord(xlogreader, extract_arg->startpoint);
elog(VERBOSE, "Start LSN of thread %d: %X/%X",
extract_arg->thread_num,
/*
* We get invalid WAL record pointer usually when WAL segment is absent or
* is corrupted.
*/
if (XLogRecPtrIsInvalid(found))
{
elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X",
private_data->thread_num,
(uint32) (extract_arg->startpoint >> 32),
(uint32) (extract_arg->startpoint));
PrintXLogCorruptionMsg(private_data, ERROR);
}
extract_arg->startpoint = found;
elog(VERBOSE, "Thread [%d]: Starting LSN: %X/%X",
private_data->thread_num,
(uint32) (extract_arg->startpoint >> 32),
(uint32) (extract_arg->startpoint));
@ -167,7 +231,18 @@ doExtractPageMap(void *arg)
XLogRecord *record;
if (interrupted)
elog(ERROR, "Interrupted during WAL reading");
elog(ERROR, "Thread [%d]: Interrupted during WAL reading",
private_data->thread_num);
/*
* We need to switch to the next WAL segment after reading previous
* record. It may happen if we read contrecord.
*/
if (private_data->need_switch)
{
if (!switchToNextWal(xlogreader, extract_arg))
break;
}
record = XLogReadRecord(xlogreader, extract_arg->startpoint, &errormsg);
@ -176,34 +251,38 @@ doExtractPageMap(void *arg)
XLogRecPtr errptr;
/*
* Try to switch to the next WAL segment. Usually
* SimpleXLogPageRead() does it by itself. But here we need to do it
* manually to support threads.
* There is no record, try to switch to the next WAL segment.
* Usually SimpleXLogPageRead() does it by itself. But here we need
* to do it manually to support threads.
*/
if (private_data->need_switch)
if (private_data->need_switch && errormsg == NULL)
{
private_data->need_switch = false;
/* Critical section */
pthread_lock(&wal_segment_mutex);
Assert(nextSegNoToRead);
private_data->xlogsegno = nextSegNoToRead;
nextSegNoToRead++;
pthread_mutex_unlock(&wal_segment_mutex);
/* We reach the end */
if (private_data->xlogsegno > extract_arg->endSegNo)
if (switchToNextWal(xlogreader, extract_arg))
continue;
else
break;
/* Adjust next record position */
XLogSegNoOffsetToRecPtr(private_data->xlogsegno, 0,
extract_arg->startpoint);
/* Skip over the page header */
extract_arg->startpoint = XLogFindNextRecord(xlogreader,
extract_arg->startpoint);
found = XLogFindNextRecord(xlogreader, extract_arg->startpoint);
/*
* We get invalid WAL record pointer usually when WAL segment is
* absent or is corrupted.
*/
if (XLogRecPtrIsInvalid(found))
{
elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X",
private_data->thread_num,
(uint32) (extract_arg->startpoint >> 32),
(uint32) (extract_arg->startpoint));
PrintXLogCorruptionMsg(private_data, ERROR);
}
extract_arg->startpoint = found;
elog(VERBOSE, "Thread %d switched to LSN %X/%X",
extract_arg->thread_num,
elog(VERBOSE, "Thread [%d]: switched to LSN %X/%X",
private_data->thread_num,
(uint32) (extract_arg->startpoint >> 32),
(uint32) (extract_arg->startpoint));
@ -214,11 +293,13 @@ doExtractPageMap(void *arg)
extract_arg->startpoint : xlogreader->EndRecPtr;
if (errormsg)
elog(WARNING, "could not read WAL record at %X/%X: %s",
elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X: %s",
private_data->thread_num,
(uint32) (errptr >> 32), (uint32) (errptr),
errormsg);
else
elog(WARNING, "could not read WAL record at %X/%X",
elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X",
private_data->thread_num,
(uint32) (errptr >> 32), (uint32) (errptr));
/*
@ -236,7 +317,7 @@ doExtractPageMap(void *arg)
XLByteToSeg(xlogreader->EndRecPtr, nextSegNo);
} while (nextSegNo <= extract_arg->endSegNo &&
xlogreader->EndRecPtr < extract_arg->endpoint);
xlogreader->ReadRecPtr < extract_arg->endpoint);
CleanupXLogPageRead(xlogreader);
XLogReaderFree(xlogreader);
@ -250,15 +331,12 @@ doExtractPageMap(void *arg)
* Read WAL from the archive directory, from 'startpoint' to 'endpoint' on the
* given timeline. Collect data blocks touched by the WAL records into a page map.
*
* If **prev_segno** is true then read all segments up to **endpoint** segment
* minus one. Else read all segments up to **endpoint** segment.
*
* Pagemap extraction is performed using threads. Each thread reads a single WAL
* file.
*/
void
extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
XLogRecPtr endpoint, bool prev_seg, parray *files)
XLogRecPtr endpoint, parray *files)
{
int i;
int threads_need = 0;
@ -279,8 +357,6 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
(uint32) (endpoint >> 32), (uint32) (endpoint));
XLByteToSeg(endpoint, endSegNo);
if (prev_seg)
endSegNo--;
nextSegNoToRead = 0;
time(&start_time);
@ -297,7 +373,7 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
for (i = 0; i < num_threads; i++)
{
InitXLogPageRead(&thread_args[i].private_data, archivedir, tli, false);
thread_args[i].thread_num = i;
thread_args[i].private_data.thread_num = i + 1;
thread_args[i].startpoint = startpoint;
thread_args[i].endpoint = endpoint;
@ -324,7 +400,7 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
/* Run threads */
for (i = 0; i < threads_need; i++)
{
elog(VERBOSE, "Start WAL reader thread: %d", i);
elog(VERBOSE, "Start WAL reader thread: %d", i + 1);
pthread_create(&threads[i], NULL, doExtractPageMap, &thread_args[i]);
}
@ -716,15 +792,38 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
*/
if (!XLByteInSeg(targetPagePtr, private_data->xlogsegno))
{
CleanupXLogPageRead(xlogreader);
elog(VERBOSE, "Need to switch to segno next to %X/%X, current LSN %X/%X",
(uint32) (targetPagePtr >> 32), (uint32) (targetPagePtr),
(uint32) (xlogreader->currRecPtr >> 32),
(uint32) (xlogreader->currRecPtr ));
/*
* Do not switch to next WAL segment in this function. Currently it is
* manually switched only in doExtractPageMap().
* if the last record on the page is not complete,
* we must continue reading pages in the same thread
*/
if (private_data->manual_switch)
if (!XLogRecPtrIsInvalid(xlogreader->currRecPtr) &&
xlogreader->currRecPtr < targetPagePtr)
{
private_data->need_switch = true;
return -1;
CleanupXLogPageRead(xlogreader);
/*
* Switch to the next WAL segment after reading contrecord.
*/
if (private_data->manual_switch)
private_data->need_switch = true;
}
else
{
CleanupXLogPageRead(xlogreader);
/*
* Do not switch to next WAL segment in this function. Currently it is
* manually switched only in doExtractPageMap().
*/
if (private_data->manual_switch)
{
private_data->need_switch = true;
return -1;
}
}
}
@ -741,7 +840,9 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
if (fileExists(private_data->xlogpath))
{
elog(LOG, "Opening WAL segment \"%s\"", private_data->xlogpath);
elog(LOG, "Thread [%d]: Opening WAL segment \"%s\"",
private_data->thread_num,
private_data->xlogpath);
private_data->xlogexists = true;
private_data->xlogfile = open(private_data->xlogpath,
@ -749,8 +850,10 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
if (private_data->xlogfile < 0)
{
elog(WARNING, "Could not open WAL segment \"%s\": %s",
private_data->xlogpath, strerror(errno));
elog(WARNING, "Thread [%d]: Could not open WAL segment \"%s\": %s",
private_data->thread_num,
private_data->xlogpath,
strerror(errno));
return -1;
}
}
@ -763,16 +866,16 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
private_data->xlogpath);
if (fileExists(private_data->gz_xlogpath))
{
elog(LOG, "Opening compressed WAL segment \"%s\"",
private_data->gz_xlogpath);
elog(LOG, "Thread [%d]: Opening compressed WAL segment \"%s\"",
private_data->thread_num, private_data->gz_xlogpath);
private_data->xlogexists = true;
private_data->gz_xlogfile = gzopen(private_data->gz_xlogpath,
"rb");
if (private_data->gz_xlogfile == NULL)
{
elog(WARNING, "Could not open compressed WAL segment \"%s\": %s",
private_data->gz_xlogpath, strerror(errno));
elog(WARNING, "Thread [%d]: Could not open compressed WAL segment \"%s\": %s",
private_data->thread_num, private_data->gz_xlogpath, strerror(errno));
return -1;
}
}
@ -794,15 +897,15 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
{
if (lseek(private_data->xlogfile, (off_t) targetPageOff, SEEK_SET) < 0)
{
elog(WARNING, "Could not seek in WAL segment \"%s\": %s",
private_data->xlogpath, strerror(errno));
elog(WARNING, "Thread [%d]: Could not seek in WAL segment \"%s\": %s",
private_data->thread_num, private_data->xlogpath, strerror(errno));
return -1;
}
if (read(private_data->xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
elog(WARNING, "Could not read from WAL segment \"%s\": %s",
private_data->xlogpath, strerror(errno));
elog(WARNING, "Thread [%d]: Could not read from WAL segment \"%s\": %s",
private_data->thread_num, private_data->xlogpath, strerror(errno));
return -1;
}
}
@ -811,7 +914,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
{
if (gzseek(private_data->gz_xlogfile, (z_off_t) targetPageOff, SEEK_SET) == -1)
{
elog(WARNING, "Could not seek in compressed WAL segment \"%s\": %s",
elog(WARNING, "Thread [%d]: Could not seek in compressed WAL segment \"%s\": %s",
private_data->thread_num,
private_data->gz_xlogpath,
get_gz_error(private_data->gz_xlogfile));
return -1;
@ -819,7 +923,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
if (gzread(private_data->gz_xlogfile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
elog(WARNING, "Could not read from compressed WAL segment \"%s\": %s",
elog(WARNING, "Thread [%d]: Could not read from compressed WAL segment \"%s\": %s",
private_data->thread_num,
private_data->gz_xlogpath,
get_gz_error(private_data->gz_xlogfile));
return -1;
@ -850,6 +955,7 @@ InitXLogPageRead(XLogPageReadPrivate *private_data, const char *archivedir,
xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, private_data);
if (xlogreader == NULL)
elog(ERROR, "out of memory");
xlogreader->system_identifier = system_identifier;
}
return xlogreader;
@ -889,15 +995,19 @@ PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel)
* We throw a WARNING here to be able to update backup status.
*/
if (!private_data->xlogexists)
elog(elevel, "WAL segment \"%s\" is absent", private_data->xlogpath);
elog(elevel, "Thread [%d]: WAL segment \"%s\" is absent",
private_data->thread_num,
private_data->xlogpath);
else if (private_data->xlogfile != -1)
elog(elevel, "Possible WAL corruption. "
elog(elevel, "Thread [%d]: Possible WAL corruption. "
"Error has occured during reading WAL segment \"%s\"",
private_data->thread_num,
private_data->xlogpath);
#ifdef HAVE_LIBZ
else if (private_data->gz_xlogfile != NULL)
elog(elevel, "Possible WAL corruption. "
elog(elevel, "Thread [%d]: Possible WAL corruption. "
"Error has occured during reading WAL segment \"%s\"",
private_data->thread_num,
private_data->gz_xlogpath);
#endif
}

View File

@ -19,7 +19,7 @@
#include <unistd.h>
#include "pg_getopt.h"
const char *PROGRAM_VERSION = "2.0.18";
const char *PROGRAM_VERSION = "2.0.19";
const char *PROGRAM_URL = "https://github.com/postgrespro/pg_probackup";
const char *PROGRAM_EMAIL = "https://github.com/postgrespro/pg_probackup/issues";

View File

@ -540,7 +540,7 @@ extern bool calc_file_checksum(pgFile *file);
extern void extractPageMap(const char *datadir,
XLogRecPtr startpoint,
TimeLineID tli,
XLogRecPtr endpoint, bool prev_seg,
XLogRecPtr endpoint,
parray *backup_files_list);
extern void validate_wal(pgBackup *backup,
const char *archivedir,

View File

@ -1 +1 @@
pg_probackup 2.0.18
pg_probackup 2.0.19

View File

@ -726,7 +726,8 @@ class PageBackupTest(ProbackupTest, unittest.TestCase):
try:
self.backup_node(
backup_dir, 'node', node,
backup_type='page', options=["-j", "4"])
backup_type='page',
options=["-j", "4", '--log-level-file=verbose'])
self.assertEqual(
1, 0,
"Expecting Error because of wal segment disappearance.\n "
@ -749,3 +750,212 @@ class PageBackupTest(ProbackupTest, unittest.TestCase):
# Clean after yourself
self.del_test_dir(module_name, fname)
# @unittest.skip("skip")
def test_page_backup_with_corrupted_wal_segment(self):
"""
Make a node with archiving and take a full archive backup,
then generate some WAL with pgbench.
Corrupt the archived WAL segment 000000010000000000000004 by
overwriting four bytes at offset 42.
Run PAGE backup single-threaded and then with "-j 4", expecting
an error because of the corrupted WAL segment each time.
Make sure that the status of both PAGE backups is 'ERROR'.
"""
fname = self.id().split('.')[3]
node = self.make_simple_node(
base_dir="{0}/{1}/node".format(module_name, fname),
initdb_params=['--data-checksums'],
pg_options={'wal_level': 'replica'}
)
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
node.start()
# Full backup that the PAGE backups below are taken against
self.backup_node(backup_dir, 'node', node)
# Generate some WAL
node.pgbench_init(scale=3)
# List the archived WAL segments (every regular file except *.backup).
# NOTE(review): `wals` is not used afterwards, the segment name is
# hard-coded below instead of being derived from this list.
wals_dir = os.path.join(backup_dir, 'wal', 'node')
wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(
wals_dir, f)) and not f.endswith('.backup')]
wals = map(str, wals)
# file = os.path.join(wals_dir, max(wals))
file = os.path.join(wals_dir, '000000010000000000000004')
print(file)
# Corrupt the segment in place: overwrite 4 bytes at offset 42
# (file is opened unbuffered for binary read/write).
with open(file, "rb+", 0) as f:
f.seek(42)
f.write(b"blah")
f.flush()
# NOTE(review): `f.close` is a bare attribute reference, not a call;
# the `with` statement closes the file anyway.
f.close
# NOTE(review): drops the last three characters of the path, presumably a
# '.gz' suffix, but `file` was built above without one -- confirm the
# behaviour with archive compression enabled.
if self.archive_compress:
file = file[:-3]
# Single-thread PAGE backup
try:
self.backup_node(
backup_dir, 'node', node,
backup_type='page', options=['--log-level-file=verbose'])
# Deliberately failing assertion (1 != 0); presumably reached only when
# the backup above did not raise.
# NOTE(review): the message talks about segment disappearance although
# this test corrupts the segment.
self.assertEqual(
1, 0,
"Expecting Error because of wal segment disappearance.\n "
"Output: {0} \n CMD: {1}".format(
self.output, self.cmd))
except ProbackupException as e:
# The error output must mention the LSN wait, the unreadable record,
# the checksum failure and the corrupted segment path.
self.assertTrue(
'INFO: Wait for LSN' in e.message and
'in archived WAL segment' in e.message and
'WARNING: could not read WAL record at' in e.message and
'incorrect resource manager data checksum in record at' in e.message and
'ERROR: Possible WAL corruption. Error has occured during reading WAL segment "{0}"'.format(
file) in e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(
repr(e.message), self.cmd))
# Entry 1 of show_pb() is presumably the PAGE backup just attempted
# (entry 0 being the full backup) -- TODO confirm ordering.
# NOTE(review): the '{0}' placeholder in the message is never formatted.
self.assertEqual(
'ERROR',
self.show_pb(backup_dir, 'node')[1]['status'],
'Backup {0} should have STATUS "ERROR"')
# Multi-thread PAGE backup
try:
self.backup_node(
backup_dir, 'node', node,
backup_type='page', options=["-j", "4"])
# Deliberately failing assertion (1 != 0); presumably reached only when
# the backup above did not raise.
self.assertEqual(
1, 0,
"Expecting Error because of wal segment disappearance.\n "
"Output: {0} \n CMD: {1}".format(
self.output, self.cmd))
except ProbackupException as e:
# Same expectations as for the single-thread run
self.assertTrue(
'INFO: Wait for LSN' in e.message and
'in archived WAL segment' in e.message and
'WARNING: could not read WAL record at' in e.message and
'incorrect resource manager data checksum in record at' in e.message and
'ERROR: Possible WAL corruption. Error has occured during reading WAL segment "{0}"'.format(
file) in e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(
repr(e.message), self.cmd))
# Entry 2 of show_pb() is presumably the second PAGE backup attempt
self.assertEqual(
'ERROR',
self.show_pb(backup_dir, 'node')[2]['status'],
'Backup {0} should have STATUS "ERROR"')
# Clean after yourself
self.del_test_dir(module_name, fname)
# @unittest.skip("skip")
def test_page_backup_with_alien_wal_segment(self):
"""
Make two nodes with archiving and take a full archive backup
from both of them.
Generate some WAL on both nodes by creating tables.
Move the latest archived WAL segment of the second node into
the first node's archive.
Run PAGE backup on the first node single-threaded and then with
"-j 4", expecting an error because of the alien WAL segment.
Make sure that the status of both PAGE backups is 'ERROR'.
"""
fname = self.id().split('.')[3]
node = self.make_simple_node(
base_dir="{0}/{1}/node".format(module_name, fname),
initdb_params=['--data-checksums'],
pg_options={'wal_level': 'replica'}
)
alien_node = self.make_simple_node(
base_dir="{0}/{1}/alien_node".format(module_name, fname)
)
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
self.init_pb(backup_dir)
# Both nodes are registered as separate instances of one backup catalog
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
node.start()
self.add_instance(backup_dir, 'alien_node', alien_node)
self.set_archiving(backup_dir, 'alien_node', alien_node)
alien_node.start()
# Full backups that the PAGE backups below are taken against
self.backup_node(backup_dir, 'node', node)
self.backup_node(backup_dir, 'alien_node', alien_node)
# Generate some WAL on both nodes
node.safe_psql(
"postgres",
"create sequence t_seq; "
"create table t_heap as select i as id, "
"md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(0,1000) i;")
alien_node.safe_psql(
"postgres",
"create database alien")
alien_node.safe_psql(
"alien",
"create sequence t_seq; "
"create table t_heap_alien as select i as id, "
"md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(0,1000) i;")
# Move the latest WAL segment of alien_node (largest file name, *.backup
# files excluded) into the archive of node, keeping the file name.
wals_dir = os.path.join(backup_dir, 'wal', 'alien_node')
wals = [f for f in os.listdir(wals_dir) if os.path.isfile(os.path.join(
wals_dir, f)) and not f.endswith('.backup')]
wals = map(str, wals)
filename = max(wals)
file = os.path.join(wals_dir, filename)
file_destination = os.path.join(
os.path.join(backup_dir, 'wal', 'node'), filename)
# file = os.path.join(wals_dir, '000000010000000000000004')
print(file)
print(file_destination)
os.rename(file, file_destination)
# NOTE(review): drops the last three characters of the path, presumably a
# '.gz' suffix; `file` is not used again in the visible part of this test.
if self.archive_compress:
file = file[:-3]
# Single-thread PAGE backup
try:
self.backup_node(
backup_dir, 'node', node,
backup_type='page')
# Deliberately failing assertion (1 != 0); presumably reached only when
# the backup above did not raise.
self.assertEqual(
1, 0,
"Expecting Error because of alien wal segment.\n "
"Output: {0} \n CMD: {1}".format(
self.output, self.cmd))
except ProbackupException as e:
# NOTE(review): the error text itself is not checked here, any
# ProbackupException is accepted.
print("SUCCESS")
# Entry 1 of show_pb() is presumably the PAGE backup just attempted
# (entry 0 being the full backup) -- TODO confirm ordering.
# NOTE(review): the '{0}' placeholder in the message is never formatted.
self.assertEqual(
'ERROR',
self.show_pb(backup_dir, 'node')[1]['status'],
'Backup {0} should have STATUS "ERROR"')
# Multi-thread PAGE backup
try:
self.backup_node(
backup_dir, 'node', node,
backup_type='page', options=["-j", "4"])
# Deliberately failing assertion (1 != 0); presumably reached only when
# the backup above did not raise.
self.assertEqual(
1, 0,
"Expecting Error because of alien wal segment.\n "
"Output: {0} \n CMD: {1}".format(
self.output, self.cmd))
except ProbackupException as e:
print("SUCCESS")
# Entry 2 of show_pb() is presumably the second PAGE backup attempt
self.assertEqual(
'ERROR',
self.show_pb(backup_dir, 'node')[2]['status'],
'Backup {0} should have STATUS "ERROR"')
# Clean after yourself
self.del_test_dir(module_name, fname)