From 6174a863cebdfe1712cf3f159f182273d269a08a Mon Sep 17 00:00:00 2001 From: "s.logvinenko" Date: Mon, 11 Dec 2017 15:31:16 +0300 Subject: [PATCH] fix auth_test for testgres 0.4.0 --- tests/auth_test.py | 138 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 111 insertions(+), 27 deletions(-) diff --git a/tests/auth_test.py b/tests/auth_test.py index ec5a1177..bbd20258 100644 --- a/tests/auth_test.py +++ b/tests/auth_test.py @@ -1,15 +1,14 @@ """ -Need to install pexpect. -pip install pexpect +The Test suite check behavior of pg_probackup utility, if password is required for connection to PostgreSQL instance. + - https://confluence.postgrespro.ru/pages/viewpage.action?pageId=16777522 """ import os import unittest -import tempfile import signal -from .helpers.ptrack_helpers import ProbackupTest -from testgres import StartNodeException, configure_testgres +from .helpers.ptrack_helpers import ProbackupTest, ProbackupException +from testgres import ClusterException as StartNodeException module_name = 'auth_test' skip_test = False @@ -33,7 +32,6 @@ class AuthTest(unittest.TestCase): cls.pb = ProbackupTest() cls.backup_dir = os.path.join(cls.pb.tmp_path, module_name, 'backup') - configure_testgres(cache_pg_config=False, cache_initdb=False) cls.node = cls.pb.make_simple_node( base_dir="{}/node".format(module_name), set_replication=True, @@ -44,7 +42,6 @@ class AuthTest(unittest.TestCase): ) modify_pg_hba(cls.node) - cls.backup_dir = os.path.join(tempfile.tempdir, "backups") cls.pb.init_pb(cls.backup_dir) cls.pb.add_instance(cls.backup_dir, cls.node.name, cls.node) cls.pb.set_archiving(cls.backup_dir, cls.node.name, cls.node) @@ -68,6 +65,7 @@ class AuthTest(unittest.TestCase): GRANT EXECUTE ON FUNCTION txid_snapshot_xmax(txid_snapshot) TO backup; \ GRANT EXECUTE ON FUNCTION pg_ptrack_clear() TO backup; \ GRANT EXECUTE ON FUNCTION pg_ptrack_get_and_clear(oid, oid) TO backup;") + cls.pgpass_file = os.path.join(os.path.expanduser('~'), '.pgpass') @classmethod def tearDownClass(cls): @@ -76,7 +74,7 @@ class AuthTest(unittest.TestCase): @unittest.skipIf(skip_test, "Module pexpect isn't installed. You need to install it.") def setUp(self): - self.cmd = [self.pb.probackup_path, 'backup', + self.cmd = ['backup', '-B', self.backup_dir, '--instance', self.node.name, '-h', '127.0.0.1', @@ -86,51 +84,115 @@ class AuthTest(unittest.TestCase): ] def tearDown(self): - pass + if "PGPASSWORD" in self.pb.test_env.keys(): + del self.pb.test_env["PGPASSWORD"] + + if "PGPASSWORD" in self.pb.test_env.keys(): + del self.pb.test_env["PGPASSFILE"] + + try: + os.remove(self.pgpass_file) + except OSError: + pass def test_empty_password(self): + """ Test case: PGPB_AUTH03 - zero password length """ try: self.assertIn("ERROR: no password supplied", - "".join(map(lambda x: x.decode("utf-8"), - run_pb_with_auth(self.cmd, '\0\r\n')) - ) + str(run_pb_with_auth([self.pb.probackup_path] + self.cmd, '\0\r\n')) ) except (TIMEOUT, ExceptionPexpect) as e: self.fail(e.value) def test_wrong_password(self): + """ Test case: PGPB_AUTH04 - incorrect password """ try: self.assertIn("password authentication failed", - "".join(map(lambda x: x.decode("utf-8"), - run_pb_with_auth(self.cmd, 'wrong_password\r\n')) - ) + str(run_pb_with_auth([self.pb.probackup_path] + self.cmd, 'wrong_password\r\n')) ) except (TIMEOUT, ExceptionPexpect) as e: self.fail(e.value) def test_right_password(self): + """ Test case: PGPB_AUTH01 - correct password """ try: self.assertIn("completed", - "".join(map(lambda x: x.decode("utf-8"), - run_pb_with_auth(self.cmd, 'password\r\n')) - ) + str(run_pb_with_auth([self.pb.probackup_path] + self.cmd, 'password\r\n')) + ) + except (TIMEOUT, ExceptionPexpect) as e: + self.fail(e.value) + + def test_right_password_and_wrong_pgpass(self): + """ Test case: PGPB_AUTH05 - correct password and incorrect .pgpass""" + line = ":".join(['127.0.0.1', str(self.node.port), 'postgres', 'backup', 'wrong_password']) + create_pgpass(self.pgpass_file, line) + try: + self.assertIn("completed", + str(run_pb_with_auth([self.pb.probackup_path] + self.cmd, 'password\r\n')) ) except (TIMEOUT, ExceptionPexpect) as e: self.fail(e.value) def test_ctrl_c_event(self): + """ Test case: PGPB_AUTH02 - send interrupt signal """ try: - run_pb_with_auth(self.cmd, kill=True) + run_pb_with_auth([self.pb.probackup_path] + self.cmd, kill=True) except TIMEOUT: self.fail("Error: CTRL+C event ignored") + def test_pgpassfile_env(self): + """ Test case: PGPB_AUTH06 - set environment var PGPASSFILE """ + path = os.path.join(self.pb.tmp_path, module_name, 'pgpass.conf') + line = ":".join(['127.0.0.1', str(self.node.port), 'postgres', 'backup', 'password']) + create_pgpass(path, line) + self.pb.test_env["PGPASSFILE"] = path + try: + self.assertEqual( + "OK", + self.pb.show_pb(self.backup_dir, self.node.name, self.pb.run_pb(self.cmd + ['-w']))["status"], + "ERROR: Full backup status is not valid." + ) + except ProbackupException as e: + self.fail(e) -def modify_pg_hba(node): - hba_conf = os.path.join(node.data_dir, "pg_hba.conf") - with open(hba_conf, 'r+') as fio: - data = fio.read() - fio.seek(0) - fio.write('host\tall\tpostgres\t127.0.0.1/0\ttrust\n' + data) + def test_pgpass(self): + """ Test case: PGPB_AUTH07 - Create file .pgpass in home dir. """ + line = ":".join(['127.0.0.1', str(self.node.port), 'postgres', 'backup', 'password']) + create_pgpass(self.pgpass_file, line) + try: + self.assertEqual( + "OK", + self.pb.show_pb(self.backup_dir, self.node.name, self.pb.run_pb(self.cmd + ['-w']))["status"], + "ERROR: Full backup status is not valid." + ) + except ProbackupException as e: + self.fail(e) + + def test_pgpassword(self): + """ Test case: PGPB_AUTH08 - set environment var PGPASSWORD """ + self.pb.test_env["PGPASSWORD"] = "password" + try: + self.assertEqual( + "OK", + self.pb.show_pb(self.backup_dir, self.node.name, self.pb.run_pb(self.cmd + ['-w']))["status"], + "ERROR: Full backup status is not valid." + ) + except ProbackupException as e: + self.fail(e) + + def test_pgpassword_and_wrong_pgpass(self): + """ Test case: PGPB_AUTH09 - Check priority between PGPASSWORD and .pgpass file""" + line = ":".join(['127.0.0.1', str(self.node.port), 'postgres', 'backup', 'wrong_password']) + create_pgpass(self.pgpass_file, line) + self.pb.test_env["PGPASSWORD"] = "password" + try: + self.assertEqual( + "OK", + self.pb.show_pb(self.backup_dir, self.node.name, self.pb.run_pb(self.cmd + ['-w']))["status"], + "ERROR: Full backup status is not valid." + ) + except ProbackupException as e: + self.fail(e) def run_pb_with_auth(cmd, password=None, kill=False): @@ -141,10 +203,32 @@ def run_pb_with_auth(cmd, password=None, kill=False): probackup.kill(signal.SIGINT) elif result == 0: probackup.sendline(password) - return probackup.readlines() + probackup.expect(EOF) + return probackup.before else: - raise TIMEOUT("") + raise ExceptionPexpect("Other pexpect errors.") except TIMEOUT: raise TIMEOUT("Timeout error.") except ExceptionPexpect: raise ExceptionPexpect("Pexpect error.") + + +def modify_pg_hba(node): + """ + Description: + Add trust authentication for user postgres. Need for add new role and set grant. + :param node: + :return None: + """ + hba_conf = os.path.join(node.data_dir, "pg_hba.conf") + with open(hba_conf, 'r+') as fio: + data = fio.read() + fio.seek(0) + fio.write('host\tall\tpostgres\t127.0.0.1/0\ttrust\n' + data) + + +def create_pgpass(path, line): + with open(path, 'w') as passfile: + # host:port:db:username:password + passfile.write(line) + os.chmod(path, 0o600)