mirror of
https://github.com/postgrespro/pg_probackup.git
synced 2025-01-06 13:26:28 +02:00
Merge branch 'master' of git.postgrespro.ru:pgpro-dev/pg_probackup
This commit is contained in:
commit
39519f74c7
@ -1,15 +1,14 @@
|
||||
"""
|
||||
Need to install pexpect.
|
||||
pip install pexpect
|
||||
The Test suite check behavior of pg_probackup utility, if password is required for connection to PostgreSQL instance.
|
||||
- https://confluence.postgrespro.ru/pages/viewpage.action?pageId=16777522
|
||||
"""
|
||||
|
||||
import os
|
||||
import unittest
|
||||
import tempfile
|
||||
import signal
|
||||
|
||||
from .helpers.ptrack_helpers import ProbackupTest
|
||||
from testgres import StartNodeException, configure_testgres
|
||||
from .helpers.ptrack_helpers import ProbackupTest, ProbackupException
|
||||
from testgres import ClusterException as StartNodeException
|
||||
|
||||
module_name = 'auth_test'
|
||||
skip_test = False
|
||||
@ -33,7 +32,6 @@ class AuthTest(unittest.TestCase):
|
||||
cls.pb = ProbackupTest()
|
||||
cls.backup_dir = os.path.join(cls.pb.tmp_path, module_name, 'backup')
|
||||
|
||||
configure_testgres(cache_pg_config=False, cache_initdb=False)
|
||||
cls.node = cls.pb.make_simple_node(
|
||||
base_dir="{}/node".format(module_name),
|
||||
set_replication=True,
|
||||
@ -44,7 +42,6 @@ class AuthTest(unittest.TestCase):
|
||||
)
|
||||
modify_pg_hba(cls.node)
|
||||
|
||||
cls.backup_dir = os.path.join(tempfile.tempdir, "backups")
|
||||
cls.pb.init_pb(cls.backup_dir)
|
||||
cls.pb.add_instance(cls.backup_dir, cls.node.name, cls.node)
|
||||
cls.pb.set_archiving(cls.backup_dir, cls.node.name, cls.node)
|
||||
@ -68,6 +65,7 @@ class AuthTest(unittest.TestCase):
|
||||
GRANT EXECUTE ON FUNCTION txid_snapshot_xmax(txid_snapshot) TO backup; \
|
||||
GRANT EXECUTE ON FUNCTION pg_ptrack_clear() TO backup; \
|
||||
GRANT EXECUTE ON FUNCTION pg_ptrack_get_and_clear(oid, oid) TO backup;")
|
||||
cls.pgpass_file = os.path.join(os.path.expanduser('~'), '.pgpass')
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
@ -76,7 +74,7 @@ class AuthTest(unittest.TestCase):
|
||||
|
||||
@unittest.skipIf(skip_test, "Module pexpect isn't installed. You need to install it.")
|
||||
def setUp(self):
|
||||
self.cmd = [self.pb.probackup_path, 'backup',
|
||||
self.cmd = ['backup',
|
||||
'-B', self.backup_dir,
|
||||
'--instance', self.node.name,
|
||||
'-h', '127.0.0.1',
|
||||
@ -86,51 +84,115 @@ class AuthTest(unittest.TestCase):
|
||||
]
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
if "PGPASSWORD" in self.pb.test_env.keys():
|
||||
del self.pb.test_env["PGPASSWORD"]
|
||||
|
||||
if "PGPASSWORD" in self.pb.test_env.keys():
|
||||
del self.pb.test_env["PGPASSFILE"]
|
||||
|
||||
try:
|
||||
os.remove(self.pgpass_file)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def test_empty_password(self):
|
||||
""" Test case: PGPB_AUTH03 - zero password length """
|
||||
try:
|
||||
self.assertIn("ERROR: no password supplied",
|
||||
"".join(map(lambda x: x.decode("utf-8"),
|
||||
run_pb_with_auth(self.cmd, '\0\r\n'))
|
||||
)
|
||||
str(run_pb_with_auth([self.pb.probackup_path] + self.cmd, '\0\r\n'))
|
||||
)
|
||||
except (TIMEOUT, ExceptionPexpect) as e:
|
||||
self.fail(e.value)
|
||||
|
||||
def test_wrong_password(self):
|
||||
""" Test case: PGPB_AUTH04 - incorrect password """
|
||||
try:
|
||||
self.assertIn("password authentication failed",
|
||||
"".join(map(lambda x: x.decode("utf-8"),
|
||||
run_pb_with_auth(self.cmd, 'wrong_password\r\n'))
|
||||
)
|
||||
str(run_pb_with_auth([self.pb.probackup_path] + self.cmd, 'wrong_password\r\n'))
|
||||
)
|
||||
except (TIMEOUT, ExceptionPexpect) as e:
|
||||
self.fail(e.value)
|
||||
|
||||
def test_right_password(self):
|
||||
""" Test case: PGPB_AUTH01 - correct password """
|
||||
try:
|
||||
self.assertIn("completed",
|
||||
"".join(map(lambda x: x.decode("utf-8"),
|
||||
run_pb_with_auth(self.cmd, 'password\r\n'))
|
||||
)
|
||||
str(run_pb_with_auth([self.pb.probackup_path] + self.cmd, 'password\r\n'))
|
||||
)
|
||||
except (TIMEOUT, ExceptionPexpect) as e:
|
||||
self.fail(e.value)
|
||||
|
||||
def test_right_password_and_wrong_pgpass(self):
|
||||
""" Test case: PGPB_AUTH05 - correct password and incorrect .pgpass"""
|
||||
line = ":".join(['127.0.0.1', str(self.node.port), 'postgres', 'backup', 'wrong_password'])
|
||||
create_pgpass(self.pgpass_file, line)
|
||||
try:
|
||||
self.assertIn("completed",
|
||||
str(run_pb_with_auth([self.pb.probackup_path] + self.cmd, 'password\r\n'))
|
||||
)
|
||||
except (TIMEOUT, ExceptionPexpect) as e:
|
||||
self.fail(e.value)
|
||||
|
||||
def test_ctrl_c_event(self):
|
||||
""" Test case: PGPB_AUTH02 - send interrupt signal """
|
||||
try:
|
||||
run_pb_with_auth(self.cmd, kill=True)
|
||||
run_pb_with_auth([self.pb.probackup_path] + self.cmd, kill=True)
|
||||
except TIMEOUT:
|
||||
self.fail("Error: CTRL+C event ignored")
|
||||
|
||||
def test_pgpassfile_env(self):
|
||||
""" Test case: PGPB_AUTH06 - set environment var PGPASSFILE """
|
||||
path = os.path.join(self.pb.tmp_path, module_name, 'pgpass.conf')
|
||||
line = ":".join(['127.0.0.1', str(self.node.port), 'postgres', 'backup', 'password'])
|
||||
create_pgpass(path, line)
|
||||
self.pb.test_env["PGPASSFILE"] = path
|
||||
try:
|
||||
self.assertEqual(
|
||||
"OK",
|
||||
self.pb.show_pb(self.backup_dir, self.node.name, self.pb.run_pb(self.cmd + ['-w']))["status"],
|
||||
"ERROR: Full backup status is not valid."
|
||||
)
|
||||
except ProbackupException as e:
|
||||
self.fail(e)
|
||||
|
||||
def modify_pg_hba(node):
|
||||
hba_conf = os.path.join(node.data_dir, "pg_hba.conf")
|
||||
with open(hba_conf, 'r+') as fio:
|
||||
data = fio.read()
|
||||
fio.seek(0)
|
||||
fio.write('host\tall\tpostgres\t127.0.0.1/0\ttrust\n' + data)
|
||||
def test_pgpass(self):
|
||||
""" Test case: PGPB_AUTH07 - Create file .pgpass in home dir. """
|
||||
line = ":".join(['127.0.0.1', str(self.node.port), 'postgres', 'backup', 'password'])
|
||||
create_pgpass(self.pgpass_file, line)
|
||||
try:
|
||||
self.assertEqual(
|
||||
"OK",
|
||||
self.pb.show_pb(self.backup_dir, self.node.name, self.pb.run_pb(self.cmd + ['-w']))["status"],
|
||||
"ERROR: Full backup status is not valid."
|
||||
)
|
||||
except ProbackupException as e:
|
||||
self.fail(e)
|
||||
|
||||
def test_pgpassword(self):
|
||||
""" Test case: PGPB_AUTH08 - set environment var PGPASSWORD """
|
||||
self.pb.test_env["PGPASSWORD"] = "password"
|
||||
try:
|
||||
self.assertEqual(
|
||||
"OK",
|
||||
self.pb.show_pb(self.backup_dir, self.node.name, self.pb.run_pb(self.cmd + ['-w']))["status"],
|
||||
"ERROR: Full backup status is not valid."
|
||||
)
|
||||
except ProbackupException as e:
|
||||
self.fail(e)
|
||||
|
||||
def test_pgpassword_and_wrong_pgpass(self):
|
||||
""" Test case: PGPB_AUTH09 - Check priority between PGPASSWORD and .pgpass file"""
|
||||
line = ":".join(['127.0.0.1', str(self.node.port), 'postgres', 'backup', 'wrong_password'])
|
||||
create_pgpass(self.pgpass_file, line)
|
||||
self.pb.test_env["PGPASSWORD"] = "password"
|
||||
try:
|
||||
self.assertEqual(
|
||||
"OK",
|
||||
self.pb.show_pb(self.backup_dir, self.node.name, self.pb.run_pb(self.cmd + ['-w']))["status"],
|
||||
"ERROR: Full backup status is not valid."
|
||||
)
|
||||
except ProbackupException as e:
|
||||
self.fail(e)
|
||||
|
||||
|
||||
def run_pb_with_auth(cmd, password=None, kill=False):
|
||||
@ -141,10 +203,32 @@ def run_pb_with_auth(cmd, password=None, kill=False):
|
||||
probackup.kill(signal.SIGINT)
|
||||
elif result == 0:
|
||||
probackup.sendline(password)
|
||||
return probackup.readlines()
|
||||
probackup.expect(EOF)
|
||||
return probackup.before
|
||||
else:
|
||||
raise TIMEOUT("")
|
||||
raise ExceptionPexpect("Other pexpect errors.")
|
||||
except TIMEOUT:
|
||||
raise TIMEOUT("Timeout error.")
|
||||
except ExceptionPexpect:
|
||||
raise ExceptionPexpect("Pexpect error.")
|
||||
|
||||
|
||||
def modify_pg_hba(node):
|
||||
"""
|
||||
Description:
|
||||
Add trust authentication for user postgres. Need for add new role and set grant.
|
||||
:param node:
|
||||
:return None:
|
||||
"""
|
||||
hba_conf = os.path.join(node.data_dir, "pg_hba.conf")
|
||||
with open(hba_conf, 'r+') as fio:
|
||||
data = fio.read()
|
||||
fio.seek(0)
|
||||
fio.write('host\tall\tpostgres\t127.0.0.1/0\ttrust\n' + data)
|
||||
|
||||
|
||||
def create_pgpass(path, line):
|
||||
with open(path, 'w') as passfile:
|
||||
# host:port:db:username:password
|
||||
passfile.write(line)
|
||||
os.chmod(path, 0o600)
|
||||
|
Loading…
Reference in New Issue
Block a user