mirror of
https://github.com/postgrespro/pg_probackup.git
synced 2025-01-20 11:34:51 +02:00
fix auth_test
This commit is contained in:
parent
942b2df4a9
commit
45d38d6130
@ -43,7 +43,6 @@ class AuthTest(unittest.TestCase):
|
||||
)
|
||||
modify_pg_hba(cls.node)
|
||||
|
||||
cls.backup_dir = os.path.join(tempfile.tempdir, "backups")
|
||||
cls.pb.init_pb(cls.backup_dir)
|
||||
cls.pb.add_instance(cls.backup_dir, cls.node.name, cls.node)
|
||||
cls.pb.set_archiving(cls.backup_dir, cls.node.name, cls.node)
|
||||
@ -86,7 +85,7 @@ class AuthTest(unittest.TestCase):
|
||||
self.pgpass_file = os.path.join(os.path.expanduser('~'), '.pgpass')
|
||||
try:
|
||||
os.remove(self.pgpass_file)
|
||||
except FileNotFoundError:
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def tearDown(self):
|
||||
@ -96,9 +95,7 @@ class AuthTest(unittest.TestCase):
|
||||
""" Test case: PGPB_AUTH03 - zero password length """
|
||||
try:
|
||||
self.assertIn("ERROR: no password supplied",
|
||||
"".join(map(lambda x: x.decode("utf-8"),
|
||||
run_pb_with_auth([self.pb.probackup_path] + self.cmd, '\0\r\n'))
|
||||
)
|
||||
str(run_pb_with_auth([self.pb.probackup_path] + self.cmd, '\0\r\n'))
|
||||
)
|
||||
except (TIMEOUT, ExceptionPexpect) as e:
|
||||
self.fail(e.value)
|
||||
@ -107,9 +104,7 @@ class AuthTest(unittest.TestCase):
|
||||
""" Test case: PGPB_AUTH04 - incorrect password """
|
||||
try:
|
||||
self.assertIn("password authentication failed",
|
||||
"".join(map(lambda x: x.decode("utf-8"),
|
||||
run_pb_with_auth([self.pb.probackup_path] + self.cmd, 'wrong_password\r\n'))
|
||||
)
|
||||
str(run_pb_with_auth([self.pb.probackup_path] + self.cmd, 'wrong_password\r\n'))
|
||||
)
|
||||
except (TIMEOUT, ExceptionPexpect) as e:
|
||||
self.fail(e.value)
|
||||
@ -118,9 +113,7 @@ class AuthTest(unittest.TestCase):
|
||||
""" Test case: PGPB_AUTH01 - correct password """
|
||||
try:
|
||||
self.assertIn("completed",
|
||||
"".join(map(lambda x: x.decode("utf-8"),
|
||||
run_pb_with_auth([self.pb.probackup_path] + self.cmd, 'password\r\n'))
|
||||
)
|
||||
str(run_pb_with_auth([self.pb.probackup_path] + self.cmd, 'password\r\n'))
|
||||
)
|
||||
except (TIMEOUT, ExceptionPexpect) as e:
|
||||
self.fail(e.value)
|
||||
@ -161,7 +154,7 @@ class AuthTest(unittest.TestCase):
|
||||
def test_pgpassword(self):
|
||||
line = ":".join(['127.0.0.1', str(self.node.port), 'postgres', 'backup', 'wrong_password'])
|
||||
create_pgpass(self.pgpass_file, line)
|
||||
os.environ["PGPASSWORD"] = 'password'
|
||||
os.environ["PGPASSWORD"] = "password"
|
||||
try:
|
||||
self.assertEqual(
|
||||
"OK",
|
||||
@ -174,15 +167,16 @@ class AuthTest(unittest.TestCase):
|
||||
|
||||
def run_pb_with_auth(cmd, password=None, kill=False):
|
||||
try:
|
||||
with spawn(" ".join(cmd), timeout=10) as probackup:
|
||||
with spawn(" ".join(cmd), encoding='utf-8', timeout=10) as probackup:
|
||||
result = probackup.expect("Password for user .*:", 5)
|
||||
if kill:
|
||||
probackup.kill(signal.SIGINT)
|
||||
elif result == 0:
|
||||
probackup.sendline(password)
|
||||
return probackup.readlines()
|
||||
probackup.expect(EOF)
|
||||
return probackup.before
|
||||
else:
|
||||
raise TIMEOUT("")
|
||||
raise ExceptionPexpect("Other pexpect errors.")
|
||||
except TIMEOUT:
|
||||
raise TIMEOUT("Timeout error.")
|
||||
except ExceptionPexpect:
|
||||
|
Loading…
x
Reference in New Issue
Block a user