import os from os import path, listdir import subprocess import shutil import six from testgres import get_new_node def dir_files(base_dir): out_list = [] for dir_name, subdir_list, file_list in os.walk(base_dir): if dir_name != base_dir: out_list.append(path.relpath(dir_name, base_dir)) for fname in file_list: out_list.append(path.relpath(path.join(dir_name, fname), base_dir)) out_list.sort() return out_list class ShowBackup(object): def __init__(self, split_line): self.counter = 0 self.id = self.get_inc(split_line) # TODO: parse to datetime if len(split_line) == 12: self.recovery_time = "%s %s" % (self.get_inc(split_line), self.get_inc(split_line)) # if recovery time is '----' else: self.recovery_time = self.get_inc(split_line) self.mode = self.get_inc(split_line) self.cur_tli = self.get_inc(split_line) # slash self.counter += 1 self.parent_tli = self.get_inc(split_line) # TODO: parse to interval self.time = self.get_inc(split_line) # TODO: maybe rename to size? self.data = self.get_inc(split_line) self.start_lsn = self.get_inc(split_line) self.stop_lsn = self.get_inc(split_line) self.status = self.get_inc(split_line) def get_inc(self, split_line): self.counter += 1 return split_line[self.counter - 1] class ProbackupTest(object): def __init__(self, *args, **kwargs): super(ProbackupTest, self).__init__(*args, **kwargs) self.test_env = os.environ.copy() envs_list = [ "LANGUAGE", "LC_ALL", "PGCONNECT_TIMEOUT", "PGDATA", "PGDATABASE", "PGHOSTADDR", "PGREQUIRESSL", "PGSERVICE", "PGSSLMODE", "PGUSER", "PGPORT", "PGHOST" ] for e in envs_list: try: del self.test_env[e] except: pass self.test_env["LC_MESSAGES"] = "C" self.test_env["LC_TIME"] = "C" self.dir_path = path.dirname(os.path.realpath(__file__)) try: os.makedirs(path.join(self.dir_path, "tmp_dirs")) except: pass self.probackup_path = os.path.abspath(path.join( self.dir_path, "../pg_probackup" )) def arcwal_dir(self, node): return "%s/backup/wal" % node.base_dir def backup_dir(self, node): return os.path.abspath("%s/backup" % node.base_dir) def make_bnode(self, base_dir=None, allows_streaming=False, options={}): real_base_dir = path.join(self.dir_path, base_dir) shutil.rmtree(real_base_dir, ignore_errors=True) node = get_new_node('test', base_dir=real_base_dir) node.init(allows_streaming=allows_streaming) if not allows_streaming: node.append_conf("postgresql.conf", "wal_level = hot_standby") node.append_conf("postgresql.conf", "archive_mode = on") node.append_conf( "postgresql.conf", """archive_command = 'cp "%%p" "%s/%%f"'""" % os.path.abspath(self.arcwal_dir(node)) ) for key, value in six.iteritems(options): node.append_conf("postgresql.conf", "%s = %s" % (key, value)) return node def make_bnode_replica(self, root_node, base_dir=None, options={}): real_base_dir = path.join(self.dir_path, base_dir) shutil.rmtree(real_base_dir, ignore_errors=True) root_node.backup("basebackup") replica = get_new_node("replica", base_dir=real_base_dir) # replica.init_from_backup(root_node, "data_replica", has_streaming=True) # Move data from backup backup_path = os.path.join(root_node.base_dir, "basebackup") shutil.move(backup_path, replica.data_dir) os.chmod(replica.data_dir, 0o0700) # Change port in config file replica.append_conf( "postgresql.conf", "port = {}".format(replica.port) ) # Enable streaming replica.enable_streaming(root_node) for key, value in six.iteritems(options): replica.append_conf("postgresql.conf", "%s = %s" % (key, value)) return replica def run_pb(self, command): try: return subprocess.check_output( [self.probackup_path] + command, stderr=subprocess.STDOUT, env=self.test_env ) except subprocess.CalledProcessError as err: return err.output def init_pb(self, node): return self.run_pb([ "init", "-B", self.backup_dir(node), "-D", node.data_dir ]) def clean_pb(self, node): shutil.rmtree(self.backup_dir(node), ignore_errors=True) def backup_pb(self, node, backup_type="full", options=[]): cmd_list = [ "backup", "-D", node.data_dir, "-B", self.backup_dir(node), "-p", "%i" % node.port, "-d", "postgres" ] if backup_type: cmd_list += ["-b", backup_type] return self.run_pb(cmd_list + options) def backup_pb_proc(self, node, backup_dir, backup_type="full", stdout=None, stderr=None, options=[]): cmd_list = [ self.probackup_path, "backup", "-D", node.data_dir, "-B", backup_dir, "-p", "%i" % (node.port), "-d", "postgres" ] if backup_type: cmd_list += ["-b", backup_type] proc = subprocess.Popen( cmd_list + options, stdout=stdout, stderr=stderr ) return proc def restore_pb(self, node, id=None, options=[]): cmd_list = [ "-D", node.data_dir, "-B", self.backup_dir(node), "restore" ] if id: cmd_list.append(id) # print(cmd_list) return self.run_pb(cmd_list + options) def show_pb(self, node, id=None, options=[], as_text=False): cmd_list = [ "-B", self.backup_dir(node), "show", ] if id: cmd_list += [id] # print(cmd_list) if as_text: return self.run_pb(options + cmd_list) elif id is None: return [ShowBackup(line.split()) for line in self.run_pb(options + cmd_list).splitlines()[3:]] else: return dict([ line.split(six.b("=")) for line in self.run_pb(options + cmd_list).splitlines() if line[0] != six.b("#")[0] ]) def validate_pb(self, node, id=None, options=[]): cmd_list = [ "-B", self.backup_dir(node), "validate", ] if id: cmd_list += [id] # print(cmd_list) return self.run_pb(options + cmd_list) def delete_pb(self, node, id=None, options=[]): cmd_list = [ "-B", self.backup_dir(node), "delete", ] if id: cmd_list += [id] # print(cmd_list) return self.run_pb(options + cmd_list) def retention_purge_pb(self, node, options=[]): cmd_list = [ "-B", self.backup_dir(node), "retention", "purge", ] return self.run_pb(options + cmd_list) def retention_show(self, node, options=[]): cmd_list = [ "-B", self.backup_dir(node), "retention", "show", ] return self.run_pb(options + cmd_list) def get_control_data(self, node): pg_controldata = node.get_bin_path("pg_controldata") out_data = {} lines = subprocess.check_output( [pg_controldata] + ["-D", node.data_dir], stderr=subprocess.STDOUT, env=self.test_env ).splitlines() for l in lines: key, value = l.split(b":", maxsplit=1) out_data[key.strip()] = value.strip() return out_data def get_recovery_conf(self, node): out_dict = {} with open(path.join(node.data_dir, "recovery.conf"), "r") as recovery_conf: for line in recovery_conf: try: key, value = line.split("=") except: continue out_dict[key.strip()] = value.strip(" '").replace("'\n", "") return out_dict def wrong_wal_clean(self, node, wal_size): wals_dir = path.join(self.backup_dir(node), "wal") wals = [f for f in listdir(wals_dir) if path.isfile(path.join(wals_dir, f))] wals.sort() file_path = path.join(wals_dir, wals[-1]) if path.getsize(file_path) != wal_size: os.remove(file_path) def guc_wal_segment_size(self, node): var = node.execute("postgres", "select setting from pg_settings where name = 'wal_segment_size'") return int(var[0][0]) * self.guc_wal_block_size(node) def guc_wal_block_size(self, node): var = node.execute("postgres", "select setting from pg_settings where name = 'wal_block_size'") return int(var[0][0])