1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-03-17 21:18:00 +02:00

PGPRO-427: Fix bug in SimpleXLogPageRead() with parallel mode

This commit is contained in:
Arthur Zakirov 2018-07-25 14:17:04 +03:00
parent e8037dd3b0
commit 6d912340d6
3 changed files with 78 additions and 30 deletions

View File

@@ -141,12 +141,13 @@ static void *
doExtractPageMap(void *arg)
{
xlog_thread_arg *extract_arg = (xlog_thread_arg *) arg;
XLogPageReadPrivate *private_data;
XLogReaderState *xlogreader;
XLogSegNo nextSegNo = 0;
char *errormsg;
xlogreader = XLogReaderAllocate(&SimpleXLogPageRead,
&extract_arg->private_data);
private_data = &extract_arg->private_data;
xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, private_data);
if (xlogreader == NULL)
elog(ERROR, "out of memory");
@@ -158,6 +159,9 @@ doExtractPageMap(void *arg)
(uint32) (extract_arg->startpoint >> 32),
(uint32) (extract_arg->startpoint));
/* Switch WAL segment manually below without using SimpleXLogPageRead() */
private_data->manual_switch = true;
do
{
XLogRecord *record;
@@ -171,23 +175,28 @@ doExtractPageMap(void *arg)
{
XLogRecPtr errptr;
/* Try to switch to the next WAL segment */
if (extract_arg->private_data.need_switch)
/*
* Try to switch to the next WAL segment. Usually
* SimpleXLogPageRead() does it by itself. But here we need to do it
* manually to support threads.
*/
if (private_data->need_switch)
{
extract_arg->private_data.need_switch = false;
private_data->need_switch = false;
/* Critical section */
pthread_lock(&wal_segment_mutex);
Assert(nextSegNoToRead);
extract_arg->private_data.xlogsegno = nextSegNoToRead;
private_data->xlogsegno = nextSegNoToRead;
nextSegNoToRead++;
pthread_mutex_unlock(&wal_segment_mutex);
/* We reach the end */
if (extract_arg->private_data.xlogsegno > extract_arg->endSegNo)
if (private_data->xlogsegno > extract_arg->endSegNo)
break;
/* Adjust next record position */
XLogSegNoOffsetToRecPtr(extract_arg->private_data.xlogsegno, 0,
XLogSegNoOffsetToRecPtr(private_data->xlogsegno, 0,
extract_arg->startpoint);
/* Skip over the page header */
extract_arg->startpoint = XLogFindNextRecord(xlogreader,
@@ -217,7 +226,7 @@ doExtractPageMap(void *arg)
* start_lsn, we won't be able to build page map and PAGE backup will
* be incorrect. Stop it and throw an error.
*/
PrintXLogCorruptionMsg(&extract_arg->private_data, ERROR);
PrintXLogCorruptionMsg(private_data, ERROR);
}
extractPageInfo(xlogreader);
@@ -255,8 +264,8 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
int threads_need = 0;
XLogSegNo endSegNo;
bool extract_isok = true;
pthread_t threads[num_threads];
xlog_thread_arg thread_args[num_threads];
pthread_t *threads;
xlog_thread_arg *thread_args;
time_t start_time,
end_time;
@@ -276,6 +285,9 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
nextSegNoToRead = 0;
time(&start_time);
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
thread_args = (xlog_thread_arg *) palloc(sizeof(xlog_thread_arg)*num_threads);
/*
* Initialize thread args.
*
@@ -286,7 +298,6 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
{
InitXLogPageRead(&thread_args[i].private_data, archivedir, tli, false);
thread_args[i].thread_num = i;
thread_args[i].private_data.manual_switch = true;
thread_args[i].startpoint = startpoint;
thread_args[i].endpoint = endpoint;
@@ -327,6 +338,9 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli,
extract_isok = false;
}
pfree(threads);
pfree(thread_args);
time(&end_time);
if (extract_isok)
elog(LOG, "Pagemap compiled, time elapsed %.0f sec",
@@ -700,6 +714,10 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
if (!XLByteInSeg(targetPagePtr, private_data->xlogsegno))
{
CleanupXLogPageRead(xlogreader);
/*
* Do not switch to next WAL segment in this function. Currently it is
* manually switched only in doExtractPageMap().
*/
if (private_data->manual_switch)
{
private_data->need_switch = true;
@@ -709,6 +727,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
XLByteToSeg(targetPagePtr, private_data->xlogsegno);
/* Try to switch to the next WAL segment */
if (!private_data->xlogexists)
{
char xlogfname[MAXFNAMELEN];

View File

@@ -907,13 +907,26 @@ class ProbackupTest(object):
return num
def switch_wal_segment(self, node):
""" Execute pg_switch_wal/xlog() in given node"""
if self.version_to_num(
node.safe_psql("postgres", "show server_version")
) >= self.version_to_num('10.0'):
node.safe_psql("postgres", "select pg_switch_wal()")
"""
Execute pg_switch_wal/xlog() in given node
Args:
node: an instance of PostgresNode or NodeConnection class
"""
if isinstance(node, testgres.PostgresNode):
if self.version_to_num(
node.safe_psql("postgres", "show server_version")
) >= self.version_to_num('10.0'):
node.safe_psql("postgres", "select pg_switch_wal()")
else:
node.safe_psql("postgres", "select pg_switch_xlog()")
else:
node.safe_psql("postgres", "select pg_switch_xlog()")
if self.version_to_num(
node.execute("show server_version")[0][0]
) >= self.version_to_num('10.0'):
node.execute("select pg_switch_wal()")
else:
node.execute("select pg_switch_xlog()")
sleep(1)
def get_version(self, node):

View File

@@ -528,9 +528,13 @@ class PageBackupTest(ProbackupTest, unittest.TestCase):
"hot_standby": "on"
}
)
node_restored = self.make_simple_node(
base_dir="{0}/{1}/node_restored".format(module_name, fname),
)
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
node_restored.cleanup()
self.set_archiving(backup_dir, 'node', node)
node.start()
@@ -542,13 +546,14 @@ class PageBackupTest(ProbackupTest, unittest.TestCase):
self.assertEqual(show_backup['backup-mode'], "FULL")
# Fill instance with data and make several WAL segments ...
node.safe_psql("postgres", "create table test (id int)")
for x in range(0, 8):
node.safe_psql(
"postgres",
"insert into test select i from generate_series(1,100) s(i)")
self.switch_wal_segment(node)
count1 = node.safe_psql("postgres", "select count(*) from test")
with node.connect() as conn:
conn.execute("create table test (id int)")
for x in range(0, 8):
conn.execute(
"insert into test select i from generate_series(1,100) s(i)")
conn.commit()
self.switch_wal_segment(conn)
count1 = conn.execute("select count(*) from test")
# ... and do page backup with parallel pagemap
self.backup_node(
@@ -558,18 +563,29 @@ class PageBackupTest(ProbackupTest, unittest.TestCase):
self.assertEqual(show_backup['status'], "OK")
self.assertEqual(show_backup['backup-mode'], "PAGE")
# Drop node and restore it
node.cleanup()
self.restore_node(backup_dir, 'node', node)
node.start()
if self.paranoia:
pgdata = self.pgdata_content(node.data_dir)
# Restore it
self.restore_node(backup_dir, 'node', node_restored)
# Physical comparison
if self.paranoia:
pgdata_restored = self.pgdata_content(node_restored.data_dir)
self.compare_pgdata(pgdata, pgdata_restored)
node_restored.append_conf(
"postgresql.auto.conf", "port = {0}".format(node_restored.port))
node_restored.start()
# Check restored node
count2 = node.safe_psql("postgres", "select count(*) from test")
count2 = node_restored.execute("postgres", "select count(*) from test")
self.assertEqual(count1, count2)
# Clean after yourself
node.cleanup()
node_restored.cleanup()
self.del_test_dir(module_name, fname)
def test_parallel_pagemap_1(self):