diff --git a/src/parsexlog.c b/src/parsexlog.c index 71e6ddc5..927f3605 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -141,12 +141,13 @@ static void * doExtractPageMap(void *arg) { xlog_thread_arg *extract_arg = (xlog_thread_arg *) arg; + XLogPageReadPrivate *private_data; XLogReaderState *xlogreader; XLogSegNo nextSegNo = 0; char *errormsg; - xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, - &extract_arg->private_data); + private_data = &extract_arg->private_data; + xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, private_data); if (xlogreader == NULL) elog(ERROR, "out of memory"); @@ -158,6 +159,9 @@ doExtractPageMap(void *arg) (uint32) (extract_arg->startpoint >> 32), (uint32) (extract_arg->startpoint)); + /* Switch WAL segment manually below without using SimpleXLogPageRead() */ + private_data->manual_switch = true; + do { XLogRecord *record; @@ -171,23 +175,28 @@ doExtractPageMap(void *arg) { XLogRecPtr errptr; - /* Try to switch to the next WAL segment */ - if (extract_arg->private_data.need_switch) + /* + * Try to switch to the next WAL segment. Usually + * SimpleXLogPageRead() does it by itself. But here we need to do it + * manually to support threads. + */ + if (private_data->need_switch) { - extract_arg->private_data.need_switch = false; + private_data->need_switch = false; + /* Critical section */ pthread_lock(&wal_segment_mutex); Assert(nextSegNoToRead); - extract_arg->private_data.xlogsegno = nextSegNoToRead; + private_data->xlogsegno = nextSegNoToRead; nextSegNoToRead++; pthread_mutex_unlock(&wal_segment_mutex); /* We reach the end */ - if (extract_arg->private_data.xlogsegno > extract_arg->endSegNo) + if (private_data->xlogsegno > extract_arg->endSegNo) break; /* Adjust next record position */ - XLogSegNoOffsetToRecPtr(extract_arg->private_data.xlogsegno, 0, + XLogSegNoOffsetToRecPtr(private_data->xlogsegno, 0, extract_arg->startpoint); /* Skip over the page header */ extract_arg->startpoint = XLogFindNextRecord(xlogreader, @@ -217,7 +226,7 @@ doExtractPageMap(void *arg) * start_lsn, we won't be able to build page map and PAGE backup will * be incorrect. Stop it and throw an error. */ - PrintXLogCorruptionMsg(&extract_arg->private_data, ERROR); + PrintXLogCorruptionMsg(private_data, ERROR); } extractPageInfo(xlogreader); @@ -255,8 +264,8 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli, int threads_need = 0; XLogSegNo endSegNo; bool extract_isok = true; - pthread_t threads[num_threads]; - xlog_thread_arg thread_args[num_threads]; + pthread_t *threads; + xlog_thread_arg *thread_args; time_t start_time, end_time; @@ -276,6 +285,9 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli, nextSegNoToRead = 0; time(&start_time); + threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); + thread_args = (xlog_thread_arg *) palloc(sizeof(xlog_thread_arg)*num_threads); + /* * Initialize thread args. * @@ -286,7 +298,6 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli, { InitXLogPageRead(&thread_args[i].private_data, archivedir, tli, false); thread_args[i].thread_num = i; - thread_args[i].private_data.manual_switch = true; thread_args[i].startpoint = startpoint; thread_args[i].endpoint = endpoint; @@ -327,6 +338,9 @@ extractPageMap(const char *archivedir, XLogRecPtr startpoint, TimeLineID tli, extract_isok = false; } + pfree(threads); + pfree(thread_args); + time(&end_time); if (extract_isok) elog(LOG, "Pagemap compiled, time elapsed %.0f sec", @@ -700,6 +714,10 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (!XLByteInSeg(targetPagePtr, private_data->xlogsegno)) { CleanupXLogPageRead(xlogreader); + /* + * Do not switch to next WAL segment in this function. Currently it is + * manually switched only in doExtractPageMap(). + */ if (private_data->manual_switch) { private_data->need_switch = true; @@ -709,6 +727,7 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XLByteToSeg(targetPagePtr, private_data->xlogsegno); + /* Try to switch to the next WAL segment */ if (!private_data->xlogexists) { char xlogfname[MAXFNAMELEN]; diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py index c430738c..3fd34de6 100644 --- a/tests/helpers/ptrack_helpers.py +++ b/tests/helpers/ptrack_helpers.py @@ -907,13 +907,26 @@ class ProbackupTest(object): return num def switch_wal_segment(self, node): - """ Execute pg_switch_wal/xlog() in given node""" - if self.version_to_num( - node.safe_psql("postgres", "show server_version") - ) >= self.version_to_num('10.0'): - node.safe_psql("postgres", "select pg_switch_wal()") + """ + Execute pg_switch_wal/xlog() in given node + + Args: + node: an instance of PostgresNode or NodeConnection class + """ + if isinstance(node, testgres.PostgresNode): + if self.version_to_num( + node.safe_psql("postgres", "show server_version") + ) >= self.version_to_num('10.0'): + node.safe_psql("postgres", "select pg_switch_wal()") + else: + node.safe_psql("postgres", "select pg_switch_xlog()") else: - node.safe_psql("postgres", "select pg_switch_xlog()") + if self.version_to_num( + node.execute("show server_version")[0][0] + ) >= self.version_to_num('10.0'): + node.execute("select pg_switch_wal()") + else: + node.execute("select pg_switch_xlog()") sleep(1) def get_version(self, node): diff --git a/tests/page.py b/tests/page.py index 1ad1a75d..ef7122b6 100644 --- a/tests/page.py +++ b/tests/page.py @@ -528,9 +528,13 @@ class PageBackupTest(ProbackupTest, unittest.TestCase): "hot_standby": "on" } ) + node_restored = self.make_simple_node( + base_dir="{0}/{1}/node_restored".format(module_name, fname), + ) self.init_pb(backup_dir) self.add_instance(backup_dir, 'node', node) + node_restored.cleanup() self.set_archiving(backup_dir, 'node', node) node.start() @@ -542,13 +546,14 @@ class PageBackupTest(ProbackupTest, unittest.TestCase): self.assertEqual(show_backup['backup-mode'], "FULL") # Fill instance with data and make several WAL segments ... - node.safe_psql("postgres", "create table test (id int)") - for x in range(0, 8): - node.safe_psql( - "postgres", - "insert into test select i from generate_series(1,100) s(i)") - self.switch_wal_segment(node) - count1 = node.safe_psql("postgres", "select count(*) from test") + with node.connect() as conn: + conn.execute("create table test (id int)") + for x in range(0, 8): + conn.execute( + "insert into test select i from generate_series(1,100) s(i)") + conn.commit() + self.switch_wal_segment(conn) + count1 = conn.execute("select count(*) from test") # ... and do page backup with parallel pagemap self.backup_node( @@ -558,18 +563,29 @@ class PageBackupTest(ProbackupTest, unittest.TestCase): self.assertEqual(show_backup['status'], "OK") self.assertEqual(show_backup['backup-mode'], "PAGE") - # Drop node and restore it - node.cleanup() - self.restore_node(backup_dir, 'node', node) - node.start() + if self.paranoia: + pgdata = self.pgdata_content(node.data_dir) + + # Restore it + self.restore_node(backup_dir, 'node', node_restored) + + # Physical comparison + if self.paranoia: + pgdata_restored = self.pgdata_content(node_restored.data_dir) + self.compare_pgdata(pgdata, pgdata_restored) + + node_restored.append_conf( + "postgresql.auto.conf", "port = {0}".format(node_restored.port)) + node_restored.start() # Check restored node - count2 = node.safe_psql("postgres", "select count(*) from test") + count2 = node_restored.execute("postgres", "select count(*) from test") self.assertEqual(count1, count2) # Clean after yourself node.cleanup() + node_restored.cleanup() self.del_test_dir(module_name, fname) def test_parallel_pagemap_1(self):