From 0de4ec09aba4713fd41da5aad39c81086bee7cab Mon Sep 17 00:00:00 2001 From: Grigory Smolkin Date: Wed, 10 Jan 2018 16:03:33 +0300 Subject: [PATCH] tests: PGPRO-1230 ptrack race condition tests --- tests/Readme.md | 2 +- tests/helpers/ptrack_helpers.py | 150 +++++++++++++++--- tests/ptrack.py | 267 +++++++++++++++++++++++++++++--- 3 files changed, 378 insertions(+), 41 deletions(-) diff --git a/tests/Readme.md b/tests/Readme.md index 2a514410..63eefd7d 100644 --- a/tests/Readme.md +++ b/tests/Readme.md @@ -14,7 +14,7 @@ Check archive compression: export ARCHIVE_COMPRESSION=ON Usage: - pip install testgres=0.4.0 + pip install testgres==0.4.0 export PG_CONFIG=/path/to/pg_config python -m unittest [-v] tests[.specific_module][.class.test] ``` diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py index 84012dc4..452e7359 100644 --- a/tests/helpers/ptrack_helpers.py +++ b/tests/helpers/ptrack_helpers.py @@ -8,6 +8,9 @@ import testgres import hashlib import re import pwd +import select +import psycopg2 +from time import sleep idx_ptrack = { @@ -946,6 +949,34 @@ class ProbackupTest(object): fail = True self.assertFalse(fail, error_message) + def get_asyc_connect(self, database=None, host=None, port=5432): + if not database: + database = 'postgres' + if not host: + host = '127.0.0.1' + + return psycopg2.connect( + database="postgres", + host='127.0.0.1', + port=port, + async=True + ) + + def wait(self, connection): + while True: + state = connection.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_WRITE: + select.select([], [connection.fileno()], []) + elif state == psycopg2.extensions.POLL_READ: + select.select([connection.fileno()], [], []) + else: + raise psycopg2.OperationalError("poll() returned %s" % state) + + def gdb_attach(self, pid): + return GDBobj([str(pid)], self.verbose, attach=True) + class GdbException(Exception): def __init__(self, message=False): @@ -956,7 +987,7 @@ class 
GdbException(Exception): class GDBobj(ProbackupTest): - def __init__(self, cmd, verbose): + def __init__(self, cmd, verbose, attach=False): self.verbose = verbose # Check gdb presense @@ -972,8 +1003,12 @@ class GDBobj(ProbackupTest): 'gdb', '--interpreter', 'mi2', - '--args' - ] + cmd + ] + + if attach: + self.cmd = self.base_cmd + ['--pid'] + cmd + else: + self.cmd = self.base_cmd + ['--args'] + cmd # Get version gdb_version_number = re.search( @@ -981,15 +1016,19 @@ class GDBobj(ProbackupTest): gdb_version) self.major_version = int(gdb_version_number.group(1)) self.minor_version = int(gdb_version_number.group(2)) + if self.verbose: - print([' '.join(map(str, self.base_cmd))]) + print([' '.join(map(str, self.cmd))]) + + print(self.cmd) self.proc = subprocess.Popen( - self.base_cmd, + self.cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - bufsize=0, universal_newlines=True + bufsize=0, + universal_newlines=True ) self.gdb_pid = self.proc.pid @@ -997,6 +1036,10 @@ class GDBobj(ProbackupTest): # is there a way to do it a less derpy way? 
while True: line = self.proc.stdout.readline() + + if 'No such process' in line: + raise GdbException(line) + if not line.startswith('(gdb)'): pass else: @@ -1004,52 +1047,115 @@ class GDBobj(ProbackupTest): def set_breakpoint(self, location): result = self._execute('break ' + location) - success = False for line in result: + print(line) if line.startswith('~"Breakpoint'): - success = True - break - if line.startswith('^error') or line.startswith('(gdb)'): + return + + elif line.startswith('^error') or line.startswith('(gdb)'): break - if line.startswith('&"break'): + elif line.startswith('&"break'): pass - if line.startswith('&"Function'): + elif line.startswith('&"Function'): raise GdbException(line) - if line.startswith('&"No line'): + elif line.startswith('&"No line'): raise GdbException(line) - return success - def run(self): - result = self._execute('run') + elif line.startswith('~"Make breakpoint pending on future shared'): + raise GdbException(line) + + raise GdbException( + 'Failed to set breakpoint.\n Output:\n {0}'.format(result) + ) + + def run_until_break(self): + result = self._execute('run', False) for line in result: if line.startswith('*stopped,reason="breakpoint-hit"'): - return 'breakpoint-hit' - if line.startswith('*stopped,reason="exited-normally"'): - return 'exit correct' + return + raise GdbException( + 'Failed to run until breakpoint.\n' + ) - def continue_execution(self, sync=True): + def continue_execution_until_running(self): result = self._execute('continue') + + running = False for line in result: + if line.startswith('*running'): + running = True + break + if line.startswith('*stopped,reason="breakpoint-hit"'): + running = False + continue + if line.startswith('*stopped,reason="exited-normally"'): + running = False + continue + return running + + def continue_execution_until_exit(self): + result = self._execute('continue', False) + + for line in result: + if line.startswith('*running'): + continue + if 
line.startswith('*stopped,reason="breakpoint-hit"'): + continue + if line.startswith('*stopped,reason="exited-normally"'): + return + raise GdbException( + 'Failed to continue execution until exit.\n' + ) + + def continue_execution_until_break(self, ignore_count=0): + if ignore_count > 0: + result = self._execute( + 'continue ' + str(ignore_count), + False + ) + else: + result = self._execute('continue', False) + + running = False + for line in result: + if line.startswith('*running'): + running = True if line.startswith('*stopped,reason="breakpoint-hit"'): return 'breakpoint-hit' if line.startswith('*stopped,reason="exited-normally"'): - return 'exit correct' + return 'exited-normally' + if running: + return 'running' + + def stopped_in_breakpoint(self): + output = [] + while True: + line = self.proc.stdout.readline() + output += [line] + if self.verbose: + print(line) + if line.startswith('*stopped,reason="breakpoint-hit"'): + return True + return False # use for breakpoint, run, continue - def _execute(self, cmd): + def _execute(self, cmd, running=True): output = [] self.proc.stdin.flush() self.proc.stdin.write(cmd + '\n') self.proc.stdin.flush() while True: + sleep(1) line = self.proc.stdout.readline() output += [line] if self.verbose: print(line) if line == '^done\n' or line.startswith('*stopped'): break + if running and line.startswith('*running'): + break return output diff --git a/tests/ptrack.py b/tests/ptrack.py index d6c18177..eb7260d2 100644 --- a/tests/ptrack.py +++ b/tests/ptrack.py @@ -128,6 +128,245 @@ class PtrackBackupTest(ProbackupTest, unittest.TestCase): # Clean after yourself self.del_test_dir(module_name, fname) + # @unittest.skip("skip") + def test_ptrack_uncommited_xact(self): + """make ptrack backup while there is uncommitted open transaction""" + fname = self.id().split('.')[3] + backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + node = self.make_simple_node( + base_dir="{0}/{1}/node".format(module_name, fname), + 
set_replication=True, + initdb_params=['--data-checksums'], + pg_options={ + 'wal_level': 'replica', + 'max_wal_senders': '2', + 'checkpoint_timeout': '300s', + 'ptrack_enable': 'on' + } + ) + node_restored = self.make_simple_node( + base_dir="{0}/{1}/node_restored".format(module_name, fname), + ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, 'node', node) + self.set_archiving(backup_dir, 'node', node) + node_restored.cleanup() + node.start() + + self.backup_node(backup_dir, 'node', node) + con = node.connect("postgres") + con.execute( + "create table t_heap as select i" + " as id from generate_series(0,1) i" + ) + + self.backup_node( + backup_dir, 'node', node, backup_type='ptrack', + options=['--stream', '--log-level-file=verbose'] + ) + + self.backup_node( + backup_dir, 'node', node, backup_type='ptrack', + options=['--stream', '--log-level-file=verbose'] + ) + + pgdata = self.pgdata_content(node.data_dir) + + self.restore_node( + backup_dir, 'node', node_restored, options=["-j", "4"]) + pgdata_restored = self.pgdata_content(node_restored.data_dir) + node_restored.append_conf( + "postgresql.auto.conf", "port = {0}".format(node_restored.port)) + + node_restored.start() + + # Physical comparison + self.compare_pgdata(pgdata, pgdata_restored) + + # Clean after yourself + self.del_test_dir(module_name, fname) + + # @unittest.skip("skip") + def test_ptrack_vacuum_full(self): + """make node, make full and ptrack stream backups, + restore them and check data correctness""" + fname = self.id().split('.')[3] + backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + node = self.make_simple_node( + base_dir="{0}/{1}/node".format(module_name, fname), + set_replication=True, + initdb_params=['--data-checksums'], + pg_options={ + 'wal_level': 'replica', + 'max_wal_senders': '2', + 'checkpoint_timeout': '300s', + 'ptrack_enable': 'on' + } + ) + node_restored = self.make_simple_node( + base_dir="{0}/{1}/node_restored".format(module_name, fname), 
+ ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, 'node', node) + self.set_archiving(backup_dir, 'node', node) + node_restored.cleanup() + node.start() + self.create_tblspace_in_node(node, 'somedata') + + self.backup_node(backup_dir, 'node', node) + + node.safe_psql( + "postgres", + "create table t_heap tablespace somedata as select i" + " as id from generate_series(0,1000000) i" + ) + + # create async connection + conn = self.get_asyc_connect(port=node.port) + + self.wait(conn) + + acurs = conn.cursor() + acurs.execute("select pg_backend_pid()") + + self.wait(conn) + pid = acurs.fetchall()[0][0] + print(pid) + + gdb = self.gdb_attach(pid) + gdb.set_breakpoint('reform_and_rewrite_tuple') + + if not gdb.continue_execution_until_running(): + print('Failed gdb continue') + exit(1) + + acurs.execute("VACUUM FULL t_heap") + + if gdb.stopped_in_breakpoint(): + if gdb.continue_execution_until_break(20) != 'breakpoint-hit': + print('Failed to hit breakpoint') + exit(1) + + self.backup_node( + backup_dir, 'node', node, backup_type='ptrack', + options=['--stream', '--log-level-file=verbose'] + ) + + self.backup_node( + backup_dir, 'node', node, backup_type='ptrack', + options=['--stream', '--log-level-file=verbose'] + ) + + pgdata = self.pgdata_content(node.data_dir) + + old_tablespace = self.get_tblspace_path(node, 'somedata') + new_tablespace = self.get_tblspace_path(node_restored, 'somedata_new') + + self.restore_node( + backup_dir, 'node', node_restored, + options=["-j", "4", "-T", "{0}={1}".format( + old_tablespace, new_tablespace)] + ) + pgdata_restored = self.pgdata_content(node_restored.data_dir) + node_restored.append_conf( + "postgresql.auto.conf", "port = {0}".format(node_restored.port)) + + node_restored.start() + + # Physical comparison + self.compare_pgdata(pgdata, pgdata_restored) + + # Clean after yourself + self.del_test_dir(module_name, fname) + + # @unittest.skip("skip") + def test_ptrack_vacuum_truncate(self): + """make node, create table, 
take full backup, + delete last 3 pages, vacuum relation, + take ptrack backup, take second ptrack backup, + restore last ptrack backup and check data correctness""" + fname = self.id().split('.')[3] + backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + node = self.make_simple_node( + base_dir="{0}/{1}/node".format(module_name, fname), + set_replication=True, + initdb_params=['--data-checksums'], + pg_options={ + 'wal_level': 'replica', + 'max_wal_senders': '2', + 'checkpoint_timeout': '300s', + 'ptrack_enable': 'on' + } + ) + node_restored = self.make_simple_node( + base_dir="{0}/{1}/node_restored".format(module_name, fname), + ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, 'node', node) + self.set_archiving(backup_dir, 'node', node) + node_restored.cleanup() + node.start() + self.create_tblspace_in_node(node, 'somedata') + + node.safe_psql( + "postgres", + "create sequence t_seq; " + "create table t_heap tablespace somedata as select i as id, " + "md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(0,1024) i;" + ) + node.safe_psql( + "postgres", + "vacuum t_heap" + ) + + self.backup_node(backup_dir, 'node', node) + + node.safe_psql( + "postgres", + "delete from t_heap where ctid >= '(11,0)'" + ) + node.safe_psql( + "postgres", + "vacuum t_heap" + ) + + self.backup_node( + backup_dir, 'node', node, backup_type='ptrack', + options=['--stream', '--log-level-file=verbose'] + ) + + self.backup_node( + backup_dir, 'node', node, backup_type='ptrack', + options=['--stream', '--log-level-file=verbose'] + ) + + pgdata = self.pgdata_content(node.data_dir) + + old_tablespace = self.get_tblspace_path(node, 'somedata') + new_tablespace = self.get_tblspace_path(node_restored, 'somedata_new') + + self.restore_node( + backup_dir, 'node', node_restored, + options=["-j", "4", "-T", "{0}={1}".format( + old_tablespace, new_tablespace)] + ) + pgdata_restored = 
self.pgdata_content(node_restored.data_dir) + node_restored.append_conf( + "postgresql.auto.conf", "port = {0}".format(node_restored.port)) + + node_restored.start() + + # Physical comparison + self.compare_pgdata(pgdata, pgdata_restored) + + # Clean after yourself + self.del_test_dir(module_name, fname) + # @unittest.skip("skip") def test_ptrack_simple(self): """make node, make full and ptrack stream backups," @@ -179,9 +418,11 @@ class PtrackBackupTest(ProbackupTest, unittest.TestCase): pgdata = self.pgdata_content(node.data_dir) result = node.safe_psql("postgres", "SELECT * FROM t_heap") - self.restore_node(backup_dir, 'node', node_restored, options=["-j", "4"]) + self.restore_node( + backup_dir, 'node', node_restored, options=["-j", "4"]) pgdata_restored = self.pgdata_content(node_restored.data_dir) - node_restored.append_conf("postgresql.auto.conf", "port = {0}".format(node_restored.port)) + node_restored.append_conf( + "postgresql.auto.conf", "port = {0}".format(node_restored.port)) node_restored.start() # Logical comparison @@ -194,7 +435,7 @@ class PtrackBackupTest(ProbackupTest, unittest.TestCase): self.compare_pgdata(pgdata, pgdata_restored) # Clean after yourself - # self.del_test_dir(module_name, fname) + self.del_test_dir(module_name, fname) # @unittest.skip("skip") def test_ptrack_get_block(self): @@ -231,24 +472,14 @@ class PtrackBackupTest(ProbackupTest, unittest.TestCase): gdb=True ) - if gdb.set_breakpoint('make_pagemap_from_ptrack'): - result = gdb.run() - else: - self.assertTrue(False, 'Failed to set gdb breakpoint') - - if result != 'breakpoint-hit': - print('Error in hitting breaking point') - sys.exit(1) + gdb.set_breakpoint('make_pagemap_from_ptrack') + gdb.run_until_break() node.safe_psql( "postgres", "update t_heap set id = 100500") - if not gdb.continue_execution(): - self.assertTrue( - False, - 'Failed to continue execution after breakpoint' - ) + gdb.continue_execution_until_exit() self.backup_node( backup_dir, 'node', node, @@ -271,7 
+502,7 @@ class PtrackBackupTest(ProbackupTest, unittest.TestCase): self.compare_pgdata(pgdata, pgdata_restored) # Clean after yourself - # self.del_test_dir(module_name, fname) + self.del_test_dir(module_name, fname) # @unittest.skip("skip") def test_ptrack_stream(self): @@ -802,7 +1033,7 @@ class PtrackBackupTest(ProbackupTest, unittest.TestCase): self.compare_pgdata(pgdata, pgdata_restored) # Clean after yourself - # self.del_test_dir(module_name, fname) + self.del_test_dir(module_name, fname) # @unittest.skip("skip") def test_alter_database_set_tablespace_ptrack(self):