diff --git a/src/help.c b/src/help.c index 549a2e2b..e11f398f 100644 --- a/src/help.c +++ b/src/help.c @@ -116,7 +116,7 @@ help_pg_probackup(void) printf(_("\n %s restore -B backup-dir --instance=instance_name\n"), PROGRAM_NAME); printf(_(" [-D pgdata-dir] [-i backup-id] [--progress]\n")); - printf(_(" [--time=time|--xid=xid [--inclusive=boolean]]\n")); + printf(_(" [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]\n")); printf(_(" [--timeline=timeline] [-T OLDDIR=NEWDIR]\n")); printf(_(" [--immediate] [--recovery-target-name=target-name]\n")); printf(_(" [--recovery-target-action=pause|promote|shutdown]\n")); @@ -125,7 +125,7 @@ help_pg_probackup(void) printf(_("\n %s validate -B backup-dir [--instance=instance_name]\n"), PROGRAM_NAME); printf(_(" [-i backup-id] [--progress]\n")); - printf(_(" [--time=time|--xid=xid [--inclusive=boolean]]\n")); + printf(_(" [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]\n")); printf(_(" [--recovery-target-name=target-name]\n")); printf(_(" [--timeline=timeline]\n")); @@ -265,7 +265,7 @@ help_restore(void) { printf(_("%s restore -B backup-dir --instance=instance_name\n"), PROGRAM_NAME); printf(_(" [-D pgdata-dir] [-i backup-id] [--progress]\n")); - printf(_(" [--time=time|--xid=xid [--inclusive=boolean]]\n")); + printf(_(" [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]\n")); printf(_(" [--timeline=timeline] [-T OLDDIR=NEWDIR]\n")); printf(_(" [--immediate] [--recovery-target-name=target-name]\n")); printf(_(" [--recovery-target-action=pause|promote|shutdown]\n")); @@ -280,6 +280,7 @@ help_restore(void) printf(_(" --progress show progress\n")); printf(_(" --time=time time stamp up to which recovery will proceed\n")); printf(_(" --xid=xid transaction ID up to which recovery will proceed\n")); + printf(_(" --lsn=lsn LSN of the write-ahead log location up to which recovery will proceed\n")); printf(_(" --inclusive=boolean whether we stop just after the recovery target\n")); printf(_(" --timeline=timeline recovering into a particular timeline\n")); printf(_(" -T, --tablespace-mapping=OLDDIR=NEWDIR\n")); @@ -323,7 +324,7 @@ help_validate(void) { printf(_("%s validate -B backup-dir [--instance=instance_name]\n"), PROGRAM_NAME); printf(_(" [-i backup-id] [--progress]\n")); - printf(_(" [--time=time|--xid=xid [--inclusive=boolean]]\n")); + printf(_(" [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]\n")); printf(_(" [--timeline=timeline]\n\n")); printf(_(" -B, --backup-path=backup-path location of the backup storage area\n")); @@ -333,6 +334,7 @@ help_validate(void) printf(_(" --progress show progress\n")); printf(_(" --time=time time stamp up to which recovery will proceed\n")); printf(_(" --xid=xid transaction ID up to which recovery will proceed\n")); + printf(_(" --lsn=lsn LSN of the write-ahead log location up to which recovery will proceed\n")); printf(_(" --inclusive=boolean whether we stop just after the recovery target\n")); printf(_(" --timeline=timeline recovering into a particular timeline\n")); printf(_(" --recovery-target-name=target-name\n")); diff --git a/src/parsexlog.c b/src/parsexlog.c index 927f3605..3e5e8790 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -421,6 +421,7 @@ validate_wal(pgBackup *backup, const char *archivedir, time_t target_time, TransactionId target_xid, + XLogRecPtr target_lsn, TimeLineID tli) { XLogRecPtr startpoint = backup->start_lsn; @@ -472,7 +473,7 @@ validate_wal(pgBackup *backup, * If recovery target is provided check that we can restore backup to a * recovery target time or xid. */ - if (!TransactionIdIsValid(target_xid) && target_time == 0) + if (!TransactionIdIsValid(target_xid) && target_time == 0 && !XRecOffIsValid(target_lsn)) { /* Recovery target is not given so exit */ elog(INFO, "Backup %s WAL segments are valid", backup_id); @@ -498,7 +499,8 @@ validate_wal(pgBackup *backup, last_xid = backup->recovery_xid; if ((TransactionIdIsValid(target_xid) && target_xid == last_xid) - || (target_time != 0 && backup->recovery_time >= target_time)) + || (target_time != 0 && backup->recovery_time >= target_time) + || (XRecOffIsValid(target_lsn) && backup->stop_lsn >= target_lsn)) all_wal = true; startpoint = backup->stop_lsn; @@ -570,6 +572,9 @@ validate_wal(pgBackup *backup, else if (target_time != 0) elog(ERROR, "not enough WAL records to time %s", target_timestamp); + else if (XRecOffIsValid(target_lsn)) + elog(ERROR, "not enough WAL records to lsn %X/%X", + (uint32) (target_lsn >> 32), (uint32) (target_lsn)); } /* clean */ diff --git a/src/pg_probackup.c b/src/pg_probackup.c index d1217e00..fdfdf5aa 100644 --- a/src/pg_probackup.c +++ b/src/pg_probackup.c @@ -61,6 +61,7 @@ uint32 replica_timeout = 300; /* default is 300 seconds */ /* restore options */ static char *target_time; static char *target_xid; +static char *target_lsn; static char *target_inclusive; static TimeLineID target_tli; static bool target_immediate; @@ -150,6 +151,7 @@ static pgut_option options[] = { 's', 26, "recovery-target-action", &target_action, SOURCE_CMDLINE }, { 'b', 'R', "restore-as-replica", &restore_as_replica, SOURCE_CMDLINE }, { 'b', 27, "no-validate", &restore_no_validate, SOURCE_CMDLINE }, + { 's', 28, "lsn", &target_lsn, SOURCE_CMDLINE }, /* delete options */ { 'b', 130, "wal", &delete_wal, SOURCE_CMDLINE }, { 'b', 131, "expired", &delete_expired, SOURCE_CMDLINE }, @@ -436,7 +438,7 @@ main(int argc, char *argv[]) { /* parse all recovery target options into recovery_target_options structure */ recovery_target_options = parseRecoveryTargetOptions(target_time, target_xid, - target_inclusive, target_tli, target_immediate, + target_inclusive, target_tli, target_lsn, target_immediate, target_name, target_action, restore_no_validate); } diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 5f37553e..25314f10 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -259,6 +259,10 @@ typedef struct pgRecoveryTarget TransactionId recovery_target_xid; /* add one more field in order to avoid deparsing recovery_target_xid back */ const char *target_xid_string; + bool lsn_specified; + XLogRecPtr recovery_target_lsn; + /* add one more field in order to avoid deparsing recovery_target_lsn back */ + const char *target_lsn_string; TimeLineID recovery_target_tli; bool recovery_target_inclusive; bool inclusive_specified; @@ -400,8 +404,9 @@ extern bool satisfy_recovery_target(const pgBackup *backup, extern parray * readTimeLineHistory_probackup(TimeLineID targetTLI); extern pgRecoveryTarget *parseRecoveryTargetOptions( const char *target_time, const char *target_xid, - const char *target_inclusive, TimeLineID target_tli, bool target_immediate, - const char *target_name, const char *target_action, bool restore_no_validate); + const char *target_inclusive, TimeLineID target_tli, const char* target_lsn, + bool target_immediate, const char *target_name, + const char *target_action, bool restore_no_validate); extern void opt_tablespace_map(pgut_option *opt, const char *arg); @@ -514,6 +519,7 @@ extern void validate_wal(pgBackup *backup, const char *archivedir, time_t target_time, TransactionId target_xid, + XLogRecPtr target_lsn, TimeLineID tli); extern bool read_recovery_info(const char *archivedir, TimeLineID tli, XLogRecPtr start_lsn, XLogRecPtr stop_lsn, @@ -536,6 +542,7 @@ extern long unsigned int base36dec(const char *text); extern uint64 get_system_identifier(char *pgdata); extern uint64 get_remote_system_identifier(PGconn *conn); extern pg_time_t timestamptz_to_time_t(TimestampTz t); +extern int parse_server_version(char *server_version_str); extern void pgBackup_init(pgBackup *backup); /* in status.c */ diff --git a/src/restore.c b/src/restore.c index 5e7f4eb3..02296c2c 100644 --- a/src/restore.c +++ b/src/restore.c @@ -276,7 +276,8 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt, * because it's needed to form the name of xlog file. */ validate_wal(dest_backup, arclog_path, rt->recovery_target_time, - rt->recovery_target_xid, base_full_backup->tli); + rt->recovery_target_xid, rt->recovery_target_lsn, + base_full_backup->tli); /* Set every incremental backup between corrupted backup and nearest FULL backup as orphans */ if (corrupted_backup) @@ -335,6 +336,11 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt, for (i = base_full_backup_index; i >= dest_backup_index; i--) { pgBackup *backup = (pgBackup *) parray_get(backups, i); + + if (rt->lsn_specified && parse_server_version(backup->server_version) < 100000) + elog(ERROR, "Backup %s was created for version %s which doesn't support recovery_target_lsn", + base36enc(dest_backup->start_time), dest_backup->server_version); + restore_backup(backup); } @@ -838,6 +844,9 @@ create_recovery_conf(time_t backup_id, if (rt->xid_specified) fprintf(fp, "recovery_target_xid = '%s'\n", rt->target_xid_string); + if (rt->recovery_target_lsn) + fprintf(fp, "recovery_target_lsn = '%s'\n", rt->target_lsn_string); + if (rt->recovery_target_immediate) fprintf(fp, "recovery_target = 'immediate'\n"); @@ -982,6 +991,9 @@ satisfy_recovery_target(const pgBackup *backup, const pgRecoveryTarget *rt) if (rt->time_specified) return backup->recovery_time <= rt->recovery_target_time; + if (rt->lsn_specified) + return backup->stop_lsn <= rt->recovery_target_lsn; + return true; } @@ -1010,6 +1022,7 @@ parseRecoveryTargetOptions(const char *target_time, const char *target_xid, const char *target_inclusive, TimeLineID target_tli, + const char *target_lsn, bool target_immediate, const char *target_name, const char *target_action, @@ -1018,6 +1031,7 @@ parseRecoveryTargetOptions(const char *target_time, time_t dummy_time; TransactionId dummy_xid; bool dummy_bool; + XLogRecPtr dummy_lsn; /* * count the number of the mutually exclusive options which may specify * recovery target. If final value > 1, throw an error. @@ -1029,10 +1043,13 @@ parseRecoveryTargetOptions(const char *target_time, rt->time_specified = false; rt->xid_specified = false; rt->inclusive_specified = false; + rt->lsn_specified = false; rt->recovery_target_time = 0; rt->recovery_target_xid = 0; + rt->recovery_target_lsn = InvalidXLogRecPtr; rt->target_time_string = NULL; rt->target_xid_string = NULL; + rt->target_lsn_string = NULL; rt->recovery_target_inclusive = false; rt->recovery_target_tli = 0; rt->recovery_target_immediate = false; @@ -1069,6 +1086,17 @@ parseRecoveryTargetOptions(const char *target_time, elog(ERROR, "Invalid value of --xid option %s", target_xid); } + if (target_lsn) + { + recovery_target_specified++; + rt->lsn_specified = true; + rt->target_lsn_string = target_lsn; + if (parse_lsn(target_lsn, &dummy_lsn)) + rt->recovery_target_lsn = dummy_lsn; + else + elog(ERROR, "Invalid value of --lsn option %s", target_lsn); + } + if (target_inclusive) { rt->inclusive_specified = true; @@ -1113,10 +1141,10 @@ parseRecoveryTargetOptions(const char *target_time, /* More than one mutually exclusive option was defined. */ if (recovery_target_specified > 1) - elog(ERROR, "At most one of --immediate, --target-name, --time, or --xid can be used"); + elog(ERROR, "At most one of --immediate, --target-name, --time, --xid, or --lsn can be used"); /* If none of the options is defined, '--inclusive' option is meaningless */ - if (!(rt->xid_specified || rt->time_specified) && rt->recovery_target_inclusive) + if (!(rt->xid_specified || rt->time_specified || rt->lsn_specified) && rt->recovery_target_inclusive) elog(ERROR, "--inclusive option applies when either --time or --xid is specified"); return rt; diff --git a/src/util.c b/src/util.c index 051f4d54..0df94da6 100644 --- a/src/util.c +++ b/src/util.c @@ -235,6 +235,35 @@ timestamptz_to_time_t(TimestampTz t) return result; } +/* Parse string representation of the server version */ +int +parse_server_version(char *server_version_str) +{ + int nfields; + int result = 0; + int major_version = 0; + int minor_version = 0; + + nfields = sscanf(server_version_str, "%d.%d", &major_version, &minor_version); + if (nfields == 2) + { + /* Server version lower than 10 */ + if (major_version > 10) + elog(ERROR, "Server version format doesn't match major version %d", major_version); + result = major_version * 10000 + minor_version * 100; + } + else if (nfields == 1) + { + if (major_version < 10) + elog(ERROR, "Server version format doesn't match major version %d", major_version); + result = major_version * 10000; + } + else + elog(ERROR, "Unknown server version format"); + + return result; +} + const char * status2str(BackupStatus status) { diff --git a/src/utils/pgut.c b/src/utils/pgut.c index 6948a0c1..c062aec8 100644 --- a/src/utils/pgut.c +++ b/src/utils/pgut.c @@ -31,6 +31,7 @@ #define MAX_TZDISP_HOUR 15 /* maximum allowed hour part */ #define SECS_PER_MINUTE 60 #define MINS_PER_HOUR 60 +#define MAXPG_LSNCOMPONENT 8 const char *PROGRAM_NAME = NULL; @@ -983,6 +984,32 @@ parse_int(const char *value, int *result, int flags, const char **hintmsg) return true; } +bool +parse_lsn(const char *value, XLogRecPtr *result) +{ + uint32 xlogid; + uint32 xrecoff; + int len1; + int len2; + + len1 = strspn(value, "0123456789abcdefABCDEF"); + if (len1 < 1 || len1 > MAXPG_LSNCOMPONENT || value[len1] != '/') + elog(ERROR, "invalid LSN \"%s\"", value); + len2 = strspn(value + len1 + 1, "0123456789abcdefABCDEF"); + if (len2 < 1 || len2 > MAXPG_LSNCOMPONENT || value[len1 + 1 + len2] != '\0') + elog(ERROR, "invalid LSN \"%s\"", value); + + if (sscanf(value, "%X/%X", &xlogid, &xrecoff) == 2) + *result = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff; + else + { + elog(ERROR, "invalid LSN \"%s\"", value); + return false; + } + + return true; +} + static char * longopts_to_optstring(const struct option opts[], const size_t len) { diff --git a/src/utils/pgut.h b/src/utils/pgut.h index 5e447a07..0947fb7f 100644 --- a/src/utils/pgut.h +++ b/src/utils/pgut.h @@ -17,6 +17,7 @@ #include #include +#include "access/xlogdefs.h" #include "logger.h" #if !defined(C_H) && !defined(__cplusplus) @@ -207,6 +208,7 @@ extern bool parse_uint64(const char *value, uint64 *result, int flags); extern bool parse_time(const char *value, time_t *result, bool utc_default); extern bool parse_int(const char *value, int *result, int flags, const char **hintmsg); +extern bool parse_lsn(const char *value, XLogRecPtr *result); extern void convert_from_base_unit(int64 base_value, int base_unit, int64 *value, const char **unit); diff --git a/src/validate.c b/src/validate.c index 61120d91..7130b1a5 100644 --- a/src/validate.c +++ b/src/validate.c @@ -328,7 +328,7 @@ do_validate_instance(void) base36enc(current_backup->start_time)); /* Validate corresponding WAL files */ validate_wal(current_backup, arclog_path, 0, - 0, base_full_backup->tli); + 0, 0, base_full_backup->tli); } /* Mark every incremental backup between corrupted backup and nearest FULL backup as orphans */ if (current_backup->status == BACKUP_STATUS_CORRUPT) diff --git a/tests/cfs_restore.py b/tests/cfs_restore.py index 2004cb14..73553a30 100644 --- a/tests/cfs_restore.py +++ b/tests/cfs_restore.py @@ -4,7 +4,7 @@ restore pg_probackup restore -B backupdir --instance instance_name [-D datadir] - [ -i backup_id | [{--time=time | --xid=xid } [--inclusive=boolean]]][--timeline=timeline] [-T OLDDIR=NEWDIR] + [ -i backup_id | [{--time=time | --xid=xid | --lsn=lsn } [--inclusive=boolean]]][--timeline=timeline] [-T OLDDIR=NEWDIR] [-j num_threads] [--progress] [-q] [-v] """ diff --git a/tests/expected/option_help.out b/tests/expected/option_help.out index f9c59e0e..d60d4d08 100644 --- a/tests/expected/option_help.out +++ b/tests/expected/option_help.out @@ -53,7 +53,7 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database. pg_probackup restore -B backup-dir --instance=instance_name [-D pgdata-dir] [-i backup-id] [--progress] - [--time=time|--xid=xid [--inclusive=boolean]] + [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]] [--timeline=timeline] [-T OLDDIR=NEWDIR] [--immediate] [--recovery-target-name=target-name] [--recovery-target-action=pause|promote|shutdown] @@ -62,7 +62,7 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database. pg_probackup validate -B backup-dir [--instance=instance_name] [-i backup-id] [--progress] - [--time=time|--xid=xid [--inclusive=boolean]] + [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]] [--recovery-target-name=target-name] [--timeline=timeline] diff --git a/tests/restore_test.py b/tests/restore_test.py index 862c8662..c33a1e29 100644 --- a/tests/restore_test.py +++ b/tests/restore_test.py @@ -353,6 +353,153 @@ class RestoreTest(ProbackupTest, unittest.TestCase): # Clean after yourself self.del_test_dir(module_name, fname) + # @unittest.skip("skip") + def test_restore_to_lsn_inclusive(self): + """recovery to target lsn""" + fname = self.id().split('.')[3] + node = self.make_simple_node( + base_dir="{0}/{1}/node".format(module_name, fname), + initdb_params=['--data-checksums'], + pg_options={'wal_level': 'replica'} + ) + + if self.get_version(node) < self.version_to_num('10.0'): + self.del_test_dir(module_name, fname) + return + + backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + self.init_pb(backup_dir) + self.add_instance(backup_dir, 'node', node) + self.set_archiving(backup_dir, 'node', node) + node.start() + + node.pgbench_init(scale=2) + with node.connect("postgres") as con: + con.execute("CREATE TABLE tbl0005 (a int)") + con.commit() + + backup_id = self.backup_node(backup_dir, 'node', node) + + pgbench = node.pgbench( + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + pgbench.wait() + pgbench.stdout.close() + + before = node.safe_psql("postgres", "SELECT * FROM pgbench_branches") + with node.connect("postgres") as con: + con.execute("INSERT INTO tbl0005 VALUES (1)") + con.commit() + res = con.execute("SELECT pg_current_wal_lsn()") + con.commit() + con.execute("INSERT INTO tbl0005 VALUES (2)") + con.commit() + xlogid, xrecoff = res[0][0].split('/') + xrecoff = hex(int(xrecoff, 16) + 1)[2:] + target_lsn = "{0}/{1}".format(xlogid, xrecoff) + + pgbench = node.pgbench( + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + pgbench.wait() + pgbench.stdout.close() + + node.stop() + node.cleanup() + + self.assertIn( + "INFO: Restore of backup {0} completed.".format(backup_id), + self.restore_node( + backup_dir, 'node', node, + options=[ + "-j", "4", '--lsn={0}'.format(target_lsn), + "--recovery-target-action=promote"] + ), + '\n Unexpected Error Message: {0}\n CMD: {1}'.format( + repr(self.output), self.cmd)) + + node.slow_start() + + after = node.safe_psql("postgres", "SELECT * FROM pgbench_branches") + self.assertEqual(before, after) + self.assertEqual( + len(node.execute("postgres", "SELECT * FROM tbl0005")), 2) + + # Clean after yourself + self.del_test_dir(module_name, fname) + + # @unittest.skip("skip") + def test_restore_to_lsn_not_inclusive(self): + """recovery to target lsn""" + fname = self.id().split('.')[3] + node = self.make_simple_node( + base_dir="{0}/{1}/node".format(module_name, fname), + initdb_params=['--data-checksums'], + pg_options={'wal_level': 'replica'} + ) + + if self.get_version(node) < self.version_to_num('10.0'): + self.del_test_dir(module_name, fname) + return + + backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + self.init_pb(backup_dir) + self.add_instance(backup_dir, 'node', node) + self.set_archiving(backup_dir, 'node', node) + node.start() + + node.pgbench_init(scale=2) + with node.connect("postgres") as con: + con.execute("CREATE TABLE tbl0005 (a int)") + con.commit() + + backup_id = self.backup_node(backup_dir, 'node', node) + + pgbench = node.pgbench( + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + pgbench.wait() + pgbench.stdout.close() + + before = node.safe_psql("postgres", "SELECT * FROM pgbench_branches") + with node.connect("postgres") as con: + con.execute("INSERT INTO tbl0005 VALUES (1)") + con.commit() + res = con.execute("SELECT pg_current_wal_lsn()") + con.commit() + con.execute("INSERT INTO tbl0005 VALUES (2)") + con.commit() + xlogid, xrecoff = res[0][0].split('/') + xrecoff = hex(int(xrecoff, 16) + 1)[2:] + target_lsn = "{0}/{1}".format(xlogid, xrecoff) + + pgbench = node.pgbench( + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + pgbench.wait() + pgbench.stdout.close() + + node.stop() + node.cleanup() + + self.assertIn( + "INFO: Restore of backup {0} completed.".format(backup_id), + self.restore_node( + backup_dir, 'node', node, + options=[ + "--inclusive=false", + "-j", "4", '--lsn={0}'.format(target_lsn), + "--recovery-target-action=promote"] + ), + '\n Unexpected Error Message: {0}\n CMD: {1}'.format( + repr(self.output), self.cmd)) + + node.slow_start() + + after = node.safe_psql("postgres", "SELECT * FROM pgbench_branches") + self.assertEqual(before, after) + self.assertEqual( + len(node.execute("postgres", "SELECT * FROM tbl0005")), 1) + + # Clean after yourself + self.del_test_dir(module_name, fname) + # @unittest.skip("skip") def test_restore_full_ptrack_archive(self): """recovery to latest from archive full+ptrack backups"""