1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-01-05 13:20:31 +02:00

fix auth_test

This commit is contained in:
sfalkon 2017-11-30 13:41:26 +03:00
parent bdeff4029d
commit 462ce33f45

View File

@ -6,11 +6,10 @@ import signal
try:
import pexpect
except:
unittest.TestCase.fail("Error: The module pexpect not found")
pass
from .helpers.ptrack_helpers import ProbackupTest, ProbackupException
from testgres import QueryException
from testgres import StartNodeException, configure_testgres
module_name = 'auth_test'
@ -27,6 +26,7 @@ class AuthTest(unittest.TestCase):
cls.pb = ProbackupTest()
cls.backup_dir = os.path.join(cls.pb.tmp_path, module_name, 'backup')
configure_testgres(cache_pg_config=False, cache_initdb=False)
cls.node = cls.pb.make_simple_node(
base_dir="{}/node".format(module_name),
set_replication=True,
@ -41,11 +41,26 @@ class AuthTest(unittest.TestCase):
cls.pb.init_pb(cls.backup_dir)
cls.pb.add_instance(cls.backup_dir, cls.node.name, cls.node)
cls.pb.set_archiving(cls.backup_dir, cls.node.name, cls.node)
cls.node.start()
try:
add_backup_user(cls.node)
except QueryException:
assert False, "Query error. The backup user cannot be added."
cls.node.start()
except StartNodeException:
raise unittest.skip("Node hasn't started")
cls.node.safe_psql("postgres",
"CREATE ROLE backup WITH LOGIN PASSWORD 'password'; \
GRANT USAGE ON SCHEMA pg_catalog TO backup; \
GRANT EXECUTE ON FUNCTION current_setting(text) TO backup; \
GRANT EXECUTE ON FUNCTION pg_is_in_recovery() TO backup; \
GRANT EXECUTE ON FUNCTION pg_start_backup(text, boolean, boolean) TO backup; \
GRANT EXECUTE ON FUNCTION pg_stop_backup() TO backup; \
GRANT EXECUTE ON FUNCTION pg_stop_backup(boolean) TO backup; \
GRANT EXECUTE ON FUNCTION pg_create_restore_point(text) TO backup; \
GRANT EXECUTE ON FUNCTION pg_switch_xlog() TO backup; \
GRANT EXECUTE ON FUNCTION txid_current() TO backup; \
GRANT EXECUTE ON FUNCTION txid_current_snapshot() TO backup; \
GRANT EXECUTE ON FUNCTION txid_snapshot_xmax(txid_snapshot) TO backup; \
GRANT EXECUTE ON FUNCTION pg_ptrack_clear() TO backup; \
GRANT EXECUTE ON FUNCTION pg_ptrack_get_and_clear(oid, oid) TO backup;")
@classmethod
def tearDownClass(cls):
@ -54,28 +69,52 @@ class AuthTest(unittest.TestCase):
def setUp(self):
self.cmd = [self.pb.probackup_path, 'backup',
'-B', self.backup_dir,
'--instance', self.node.name,
'-h', '127.0.0.1',
'-p', str(self.node.port),
'-U', 'backup',
'-b', 'FULL'
]
'-B', self.backup_dir,
'--instance', self.node.name,
'-h', '127.0.0.1',
'-p', str(self.node.port),
'-U', 'backup',
'-b', 'FULL'
]
def tearDown(self):
pass
def test_empty_password(self):
print(run_pb_pexpect(self.cmd, '\0\r\n'))
try:
self.assertIn("ERROR: no password supplied",
"".join(map(lambda x: x.decode("utf-8"),
run_pb_with_auth(self.cmd, '\0\r\n'))
)
)
except (pexpect.TIMEOUT, pexpect.ExceptionPexpect) as e:
self.fail(e.value)
def test_wrong_password(self):
print(run_pb_pexpect(self.cmd, 'wrong_password\r\n'))
try:
self.assertIn("password authentication failed",
"".join(map(lambda x: x.decode("utf-8"),
run_pb_with_auth(self.cmd, 'wrong_password\r\n'))
)
)
except (pexpect.TIMEOUT, pexpect.ExceptionPexpect) as e:
self.fail(e.value)
def test_right_password(self):
print(run_pb_pexpect(self.cmd, 'password\r\n'))
try:
self.assertIn("complited",
"".join(map(lambda x: x.decode("utf-8"),
run_pb_with_auth(self.cmd, 'password\r\n'))
)
)
except (pexpect.TIMEOUT, pexpect.ExceptionPexpect) as e:
self.fail(e.value)
def test_ctrl_c_event(self):
print(run_pb_pexpect(self.cmd, kill=True))
try:
print(run_pb_with_auth(self.cmd, kill=True))
except pexpect.TIMEOUT:
self.fail("Error: CTRL+C event ignored")
def modify_pg_hba(node):
@ -86,40 +125,18 @@ def modify_pg_hba(node):
fio.write('host\tall\tpostgres\t127.0.0.1/0\ttrust\n' + data)
def run_pb_pexpect(cmd, password=None, kill=False):
def run_pb_with_auth(cmd, password=None, kill=False):
try:
with pexpect.spawn(" ".join(cmd)) as probackup:
result = probackup.expect("Password .*:")
result = probackup.expect("Password for user .*:", 5)
if kill:
try:
probackup.kill(signal.SIGINT)
except TIMEOUT:
raise ProbackupException("Error: Timeout exepired", " ".join(cmd))
probackup.kill(signal.SIGINT)
elif result == 0:
probackup.sendline(password)
return probackup.readlines()
else:
raise ProbackupTest("Error: runtime error", " ".join(cmd))
except ProbackupTest:
raise ProbackupException("Error: pexpect error", " ".join(cmd))
def add_backup_user(node):
query = """
CREATE ROLE backup WITH LOGIN PASSWORD 'password';
GRANT USAGE ON SCHEMA pg_catalog TO backup;
GRANT EXECUTE ON FUNCTION current_setting(text) TO backup;
GRANT EXECUTE ON FUNCTION pg_is_in_recovery() TO backup;
GRANT EXECUTE ON FUNCTION pg_start_backup(text, boolean, boolean) TO backup;
GRANT EXECUTE ON FUNCTION pg_stop_backup() TO backup;
GRANT EXECUTE ON FUNCTION pg_stop_backup(boolean) TO backup;
GRANT EXECUTE ON FUNCTION pg_create_restore_point(text) TO backup;
GRANT EXECUTE ON FUNCTION pg_switch_xlog() TO backup;
GRANT EXECUTE ON FUNCTION txid_current() TO backup;
GRANT EXECUTE ON FUNCTION txid_current_snapshot() TO backup;
GRANT EXECUTE ON FUNCTION txid_snapshot_xmax(txid_snapshot) TO backup;
GRANT EXECUTE ON FUNCTION pg_ptrack_clear() TO backup;
GRANT EXECUTE ON FUNCTION pg_ptrack_get_and_clear(oid, oid) TO backup;
"""
node.safe_psql("postgres", query)
raise pexpect.TIMEOUT("")
except pexpect.TIMEOUT:
raise pexpect.TIMEOUT("Timeout error.")
except pexpect.ExceptionPexpect:
raise pexpect.ExceptionPexpect("Pexpect error.")