1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-01-22 11:44:34 +02:00

Merge branch 'pgpro-1643-squashed'

This commit is contained in:
Arthur Zakirov 2018-07-27 17:56:55 +03:00
commit 9a7daf009f
12 changed files with 265 additions and 16 deletions

View File

@ -116,7 +116,7 @@ help_pg_probackup(void)
printf(_("\n %s restore -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" [-D pgdata-dir] [-i backup-id] [--progress]\n"));
printf(_(" [--time=time|--xid=xid [--inclusive=boolean]]\n"));
printf(_(" [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]\n"));
printf(_(" [--timeline=timeline] [-T OLDDIR=NEWDIR]\n"));
printf(_(" [--immediate] [--recovery-target-name=target-name]\n"));
printf(_(" [--recovery-target-action=pause|promote|shutdown]\n"));
@ -125,7 +125,7 @@ help_pg_probackup(void)
printf(_("\n %s validate -B backup-dir [--instance=instance_name]\n"), PROGRAM_NAME);
printf(_(" [-i backup-id] [--progress]\n"));
printf(_(" [--time=time|--xid=xid [--inclusive=boolean]]\n"));
printf(_(" [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]\n"));
printf(_(" [--recovery-target-name=target-name]\n"));
printf(_(" [--timeline=timeline]\n"));
@ -265,7 +265,7 @@ help_restore(void)
{
printf(_("%s restore -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" [-D pgdata-dir] [-i backup-id] [--progress]\n"));
printf(_(" [--time=time|--xid=xid [--inclusive=boolean]]\n"));
printf(_(" [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]\n"));
printf(_(" [--timeline=timeline] [-T OLDDIR=NEWDIR]\n"));
printf(_(" [--immediate] [--recovery-target-name=target-name]\n"));
printf(_(" [--recovery-target-action=pause|promote|shutdown]\n"));
@ -280,6 +280,7 @@ help_restore(void)
printf(_(" --progress show progress\n"));
printf(_(" --time=time time stamp up to which recovery will proceed\n"));
printf(_(" --xid=xid transaction ID up to which recovery will proceed\n"));
printf(_(" --lsn=lsn LSN of the write-ahead log location up to which recovery will proceed\n"));
printf(_(" --inclusive=boolean whether we stop just after the recovery target\n"));
printf(_(" --timeline=timeline recovering into a particular timeline\n"));
printf(_(" -T, --tablespace-mapping=OLDDIR=NEWDIR\n"));
@ -323,7 +324,7 @@ help_validate(void)
{
printf(_("%s validate -B backup-dir [--instance=instance_name]\n"), PROGRAM_NAME);
printf(_(" [-i backup-id] [--progress]\n"));
printf(_(" [--time=time|--xid=xid [--inclusive=boolean]]\n"));
printf(_(" [--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]\n"));
printf(_(" [--timeline=timeline]\n\n"));
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
@ -333,6 +334,7 @@ help_validate(void)
printf(_(" --progress show progress\n"));
printf(_(" --time=time time stamp up to which recovery will proceed\n"));
printf(_(" --xid=xid transaction ID up to which recovery will proceed\n"));
printf(_(" --lsn=lsn LSN of the write-ahead log location up to which recovery will proceed\n"));
printf(_(" --inclusive=boolean whether we stop just after the recovery target\n"));
printf(_(" --timeline=timeline recovering into a particular timeline\n"));
printf(_(" --recovery-target-name=target-name\n"));

View File

@ -421,6 +421,7 @@ validate_wal(pgBackup *backup,
const char *archivedir,
time_t target_time,
TransactionId target_xid,
XLogRecPtr target_lsn,
TimeLineID tli)
{
XLogRecPtr startpoint = backup->start_lsn;
@ -472,7 +473,7 @@ validate_wal(pgBackup *backup,
* If recovery target is provided check that we can restore backup to a
* recovery target time or xid.
*/
if (!TransactionIdIsValid(target_xid) && target_time == 0)
if (!TransactionIdIsValid(target_xid) && target_time == 0 && !XRecOffIsValid(target_lsn))
{
/* Recovery target is not given so exit */
elog(INFO, "Backup %s WAL segments are valid", backup_id);
@ -498,7 +499,8 @@ validate_wal(pgBackup *backup,
last_xid = backup->recovery_xid;
if ((TransactionIdIsValid(target_xid) && target_xid == last_xid)
|| (target_time != 0 && backup->recovery_time >= target_time))
|| (target_time != 0 && backup->recovery_time >= target_time)
|| (XRecOffIsValid(target_lsn) && backup->stop_lsn >= target_lsn))
all_wal = true;
startpoint = backup->stop_lsn;
@ -570,6 +572,9 @@ validate_wal(pgBackup *backup,
else if (target_time != 0)
elog(ERROR, "not enough WAL records to time %s",
target_timestamp);
else if (XRecOffIsValid(target_lsn))
elog(ERROR, "not enough WAL records to lsn %X/%X",
(uint32) (target_lsn >> 32), (uint32) (target_lsn));
}
/* clean */

View File

@ -61,6 +61,7 @@ uint32 replica_timeout = 300; /* default is 300 seconds */
/* restore options */
static char *target_time;
static char *target_xid;
static char *target_lsn;
static char *target_inclusive;
static TimeLineID target_tli;
static bool target_immediate;
@ -150,6 +151,7 @@ static pgut_option options[] =
{ 's', 26, "recovery-target-action", &target_action, SOURCE_CMDLINE },
{ 'b', 'R', "restore-as-replica", &restore_as_replica, SOURCE_CMDLINE },
{ 'b', 27, "no-validate", &restore_no_validate, SOURCE_CMDLINE },
{ 's', 28, "lsn", &target_lsn, SOURCE_CMDLINE },
/* delete options */
{ 'b', 130, "wal", &delete_wal, SOURCE_CMDLINE },
{ 'b', 131, "expired", &delete_expired, SOURCE_CMDLINE },
@ -436,7 +438,7 @@ main(int argc, char *argv[])
{
/* parse all recovery target options into recovery_target_options structure */
recovery_target_options = parseRecoveryTargetOptions(target_time, target_xid,
target_inclusive, target_tli, target_immediate,
target_inclusive, target_tli, target_lsn, target_immediate,
target_name, target_action, restore_no_validate);
}

View File

@ -259,6 +259,10 @@ typedef struct pgRecoveryTarget
TransactionId recovery_target_xid;
/* add one more field in order to avoid deparsing recovery_target_xid back */
const char *target_xid_string;
bool lsn_specified;
XLogRecPtr recovery_target_lsn;
/* add one more field in order to avoid deparsing recovery_target_lsn back */
const char *target_lsn_string;
TimeLineID recovery_target_tli;
bool recovery_target_inclusive;
bool inclusive_specified;
@ -400,8 +404,9 @@ extern bool satisfy_recovery_target(const pgBackup *backup,
extern parray * readTimeLineHistory_probackup(TimeLineID targetTLI);
extern pgRecoveryTarget *parseRecoveryTargetOptions(
const char *target_time, const char *target_xid,
const char *target_inclusive, TimeLineID target_tli, bool target_immediate,
const char *target_name, const char *target_action, bool restore_no_validate);
const char *target_inclusive, TimeLineID target_tli, const char* target_lsn,
bool target_immediate, const char *target_name,
const char *target_action, bool restore_no_validate);
extern void opt_tablespace_map(pgut_option *opt, const char *arg);
@ -514,6 +519,7 @@ extern void validate_wal(pgBackup *backup,
const char *archivedir,
time_t target_time,
TransactionId target_xid,
XLogRecPtr target_lsn,
TimeLineID tli);
extern bool read_recovery_info(const char *archivedir, TimeLineID tli,
XLogRecPtr start_lsn, XLogRecPtr stop_lsn,
@ -536,6 +542,7 @@ extern long unsigned int base36dec(const char *text);
extern uint64 get_system_identifier(char *pgdata);
extern uint64 get_remote_system_identifier(PGconn *conn);
extern pg_time_t timestamptz_to_time_t(TimestampTz t);
extern int parse_server_version(char *server_version_str);
extern void pgBackup_init(pgBackup *backup);
/* in status.c */

View File

@ -276,7 +276,8 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt,
* because it's needed to form the name of xlog file.
*/
validate_wal(dest_backup, arclog_path, rt->recovery_target_time,
rt->recovery_target_xid, base_full_backup->tli);
rt->recovery_target_xid, rt->recovery_target_lsn,
base_full_backup->tli);
/* Set every incremental backup between corrupted backup and nearest FULL backup as orphans */
if (corrupted_backup)
@ -335,6 +336,11 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt,
for (i = base_full_backup_index; i >= dest_backup_index; i--)
{
pgBackup *backup = (pgBackup *) parray_get(backups, i);
if (rt->lsn_specified && parse_server_version(backup->server_version) < 100000)
elog(ERROR, "Backup %s was created for version %s which doesn't support recovery_target_lsn",
base36enc(dest_backup->start_time), dest_backup->server_version);
restore_backup(backup);
}
@ -838,6 +844,9 @@ create_recovery_conf(time_t backup_id,
if (rt->xid_specified)
fprintf(fp, "recovery_target_xid = '%s'\n", rt->target_xid_string);
if (rt->recovery_target_lsn)
fprintf(fp, "recovery_target_lsn = '%s'\n", rt->target_lsn_string);
if (rt->recovery_target_immediate)
fprintf(fp, "recovery_target = 'immediate'\n");
@ -982,6 +991,9 @@ satisfy_recovery_target(const pgBackup *backup, const pgRecoveryTarget *rt)
if (rt->time_specified)
return backup->recovery_time <= rt->recovery_target_time;
if (rt->lsn_specified)
return backup->stop_lsn <= rt->recovery_target_lsn;
return true;
}
@ -1010,6 +1022,7 @@ parseRecoveryTargetOptions(const char *target_time,
const char *target_xid,
const char *target_inclusive,
TimeLineID target_tli,
const char *target_lsn,
bool target_immediate,
const char *target_name,
const char *target_action,
@ -1018,6 +1031,7 @@ parseRecoveryTargetOptions(const char *target_time,
time_t dummy_time;
TransactionId dummy_xid;
bool dummy_bool;
XLogRecPtr dummy_lsn;
/*
* count the number of the mutually exclusive options which may specify
* recovery target. If final value > 1, throw an error.
@ -1029,10 +1043,13 @@ parseRecoveryTargetOptions(const char *target_time,
rt->time_specified = false;
rt->xid_specified = false;
rt->inclusive_specified = false;
rt->lsn_specified = false;
rt->recovery_target_time = 0;
rt->recovery_target_xid = 0;
rt->recovery_target_lsn = InvalidXLogRecPtr;
rt->target_time_string = NULL;
rt->target_xid_string = NULL;
rt->target_lsn_string = NULL;
rt->recovery_target_inclusive = false;
rt->recovery_target_tli = 0;
rt->recovery_target_immediate = false;
@ -1069,6 +1086,17 @@ parseRecoveryTargetOptions(const char *target_time,
elog(ERROR, "Invalid value of --xid option %s", target_xid);
}
if (target_lsn)
{
recovery_target_specified++;
rt->lsn_specified = true;
rt->target_lsn_string = target_lsn;
if (parse_lsn(target_lsn, &dummy_lsn))
rt->recovery_target_lsn = dummy_lsn;
else
elog(ERROR, "Invalid value of --lsn option %s", target_lsn);
}
if (target_inclusive)
{
rt->inclusive_specified = true;
@ -1113,10 +1141,10 @@ parseRecoveryTargetOptions(const char *target_time,
/* More than one mutually exclusive option was defined. */
if (recovery_target_specified > 1)
elog(ERROR, "At most one of --immediate, --target-name, --time, or --xid can be used");
elog(ERROR, "At most one of --immediate, --target-name, --time, --xid, or --lsn can be used");
/* If none of the options is defined, '--inclusive' option is meaningless */
if (!(rt->xid_specified || rt->time_specified) && rt->recovery_target_inclusive)
if (!(rt->xid_specified || rt->time_specified || rt->lsn_specified) && rt->recovery_target_inclusive)
elog(ERROR, "--inclusive option applies when either --time or --xid is specified");
return rt;

View File

@ -235,6 +235,35 @@ timestamptz_to_time_t(TimestampTz t)
return result;
}
/* Parse string representation of the server version */
int
parse_server_version(char *server_version_str)
{
int nfields;
int result = 0;
int major_version = 0;
int minor_version = 0;
nfields = sscanf(server_version_str, "%d.%d", &major_version, &minor_version);
if (nfields == 2)
{
/* Server version lower than 10 */
if (major_version > 10)
elog(ERROR, "Server version format doesn't match major version %d", major_version);
result = major_version * 10000 + minor_version * 100;
}
else if (nfields == 1)
{
if (major_version < 10)
elog(ERROR, "Server version format doesn't match major version %d", major_version);
result = major_version * 10000;
}
else
elog(ERROR, "Unknown server version format");
return result;
}
const char *
status2str(BackupStatus status)
{

View File

@ -31,6 +31,7 @@
#define MAX_TZDISP_HOUR 15 /* maximum allowed hour part */
#define SECS_PER_MINUTE 60
#define MINS_PER_HOUR 60
#define MAXPG_LSNCOMPONENT 8
const char *PROGRAM_NAME = NULL;
@ -983,6 +984,32 @@ parse_int(const char *value, int *result, int flags, const char **hintmsg)
return true;
}
bool
parse_lsn(const char *value, XLogRecPtr *result)
{
uint32 xlogid;
uint32 xrecoff;
int len1;
int len2;
len1 = strspn(value, "0123456789abcdefABCDEF");
if (len1 < 1 || len1 > MAXPG_LSNCOMPONENT || value[len1] != '/')
elog(ERROR, "invalid LSN \"%s\"", value);
len2 = strspn(value + len1 + 1, "0123456789abcdefABCDEF");
if (len2 < 1 || len2 > MAXPG_LSNCOMPONENT || value[len1 + 1 + len2] != '\0')
elog(ERROR, "invalid LSN \"%s\"", value);
if (sscanf(value, "%X/%X", &xlogid, &xrecoff) == 2)
*result = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff;
else
{
elog(ERROR, "invalid LSN \"%s\"", value);
return false;
}
return true;
}
static char *
longopts_to_optstring(const struct option opts[], const size_t len)
{

View File

@ -17,6 +17,7 @@
#include <assert.h>
#include <sys/time.h>
#include "access/xlogdefs.h"
#include "logger.h"
#if !defined(C_H) && !defined(__cplusplus)
@ -207,6 +208,7 @@ extern bool parse_uint64(const char *value, uint64 *result, int flags);
extern bool parse_time(const char *value, time_t *result, bool utc_default);
extern bool parse_int(const char *value, int *result, int flags,
const char **hintmsg);
extern bool parse_lsn(const char *value, XLogRecPtr *result);
extern void convert_from_base_unit(int64 base_value, int base_unit,
int64 *value, const char **unit);

View File

@ -328,7 +328,7 @@ do_validate_instance(void)
base36enc(current_backup->start_time));
/* Validate corresponding WAL files */
validate_wal(current_backup, arclog_path, 0,
0, base_full_backup->tli);
0, 0, base_full_backup->tli);
}
/* Mark every incremental backup between corrupted backup and nearest FULL backup as orphans */
if (current_backup->status == BACKUP_STATUS_CORRUPT)

View File

@ -4,7 +4,7 @@ restore
pg_probackup restore -B backupdir --instance instance_name
[-D datadir]
[ -i backup_id | [{--time=time | --xid=xid } [--inclusive=boolean]]][--timeline=timeline] [-T OLDDIR=NEWDIR]
[ -i backup_id | [{--time=time | --xid=xid | --lsn=lsn } [--inclusive=boolean]]][--timeline=timeline] [-T OLDDIR=NEWDIR]
[-j num_threads] [--progress] [-q] [-v]
"""

View File

@ -53,7 +53,7 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database.
pg_probackup restore -B backup-dir --instance=instance_name
[-D pgdata-dir] [-i backup-id] [--progress]
[--time=time|--xid=xid [--inclusive=boolean]]
[--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]
[--timeline=timeline] [-T OLDDIR=NEWDIR]
[--immediate] [--recovery-target-name=target-name]
[--recovery-target-action=pause|promote|shutdown]
@ -62,7 +62,7 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database.
pg_probackup validate -B backup-dir [--instance=instance_name]
[-i backup-id] [--progress]
[--time=time|--xid=xid [--inclusive=boolean]]
[--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]
[--recovery-target-name=target-name]
[--timeline=timeline]

View File

@ -353,6 +353,153 @@ class RestoreTest(ProbackupTest, unittest.TestCase):
# Clean after yourself
self.del_test_dir(module_name, fname)
# @unittest.skip("skip")
def test_restore_to_lsn_inclusive(self):
"""recovery to target lsn"""
fname = self.id().split('.')[3]
node = self.make_simple_node(
base_dir="{0}/{1}/node".format(module_name, fname),
initdb_params=['--data-checksums'],
pg_options={'wal_level': 'replica'}
)
if self.get_version(node) < self.version_to_num('10.0'):
self.del_test_dir(module_name, fname)
return
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
node.start()
node.pgbench_init(scale=2)
with node.connect("postgres") as con:
con.execute("CREATE TABLE tbl0005 (a int)")
con.commit()
backup_id = self.backup_node(backup_dir, 'node', node)
pgbench = node.pgbench(
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
pgbench.wait()
pgbench.stdout.close()
before = node.safe_psql("postgres", "SELECT * FROM pgbench_branches")
with node.connect("postgres") as con:
con.execute("INSERT INTO tbl0005 VALUES (1)")
con.commit()
res = con.execute("SELECT pg_current_wal_lsn()")
con.commit()
con.execute("INSERT INTO tbl0005 VALUES (2)")
con.commit()
xlogid, xrecoff = res[0][0].split('/')
xrecoff = hex(int(xrecoff, 16) + 1)[2:]
target_lsn = "{0}/{1}".format(xlogid, xrecoff)
pgbench = node.pgbench(
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
pgbench.wait()
pgbench.stdout.close()
node.stop()
node.cleanup()
self.assertIn(
"INFO: Restore of backup {0} completed.".format(backup_id),
self.restore_node(
backup_dir, 'node', node,
options=[
"-j", "4", '--lsn={0}'.format(target_lsn),
"--recovery-target-action=promote"]
),
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(
repr(self.output), self.cmd))
node.slow_start()
after = node.safe_psql("postgres", "SELECT * FROM pgbench_branches")
self.assertEqual(before, after)
self.assertEqual(
len(node.execute("postgres", "SELECT * FROM tbl0005")), 2)
# Clean after yourself
self.del_test_dir(module_name, fname)
# @unittest.skip("skip")
def test_restore_to_lsn_not_inclusive(self):
"""recovery to target lsn"""
fname = self.id().split('.')[3]
node = self.make_simple_node(
base_dir="{0}/{1}/node".format(module_name, fname),
initdb_params=['--data-checksums'],
pg_options={'wal_level': 'replica'}
)
if self.get_version(node) < self.version_to_num('10.0'):
self.del_test_dir(module_name, fname)
return
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
node.start()
node.pgbench_init(scale=2)
with node.connect("postgres") as con:
con.execute("CREATE TABLE tbl0005 (a int)")
con.commit()
backup_id = self.backup_node(backup_dir, 'node', node)
pgbench = node.pgbench(
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
pgbench.wait()
pgbench.stdout.close()
before = node.safe_psql("postgres", "SELECT * FROM pgbench_branches")
with node.connect("postgres") as con:
con.execute("INSERT INTO tbl0005 VALUES (1)")
con.commit()
res = con.execute("SELECT pg_current_wal_lsn()")
con.commit()
con.execute("INSERT INTO tbl0005 VALUES (2)")
con.commit()
xlogid, xrecoff = res[0][0].split('/')
xrecoff = hex(int(xrecoff, 16) + 1)[2:]
target_lsn = "{0}/{1}".format(xlogid, xrecoff)
pgbench = node.pgbench(
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
pgbench.wait()
pgbench.stdout.close()
node.stop()
node.cleanup()
self.assertIn(
"INFO: Restore of backup {0} completed.".format(backup_id),
self.restore_node(
backup_dir, 'node', node,
options=[
"--inclusive=false",
"-j", "4", '--lsn={0}'.format(target_lsn),
"--recovery-target-action=promote"]
),
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(
repr(self.output), self.cmd))
node.slow_start()
after = node.safe_psql("postgres", "SELECT * FROM pgbench_branches")
self.assertEqual(before, after)
self.assertEqual(
len(node.execute("postgres", "SELECT * FROM tbl0005")), 1)
# Clean after yourself
self.del_test_dir(module_name, fname)
# @unittest.skip("skip")
def test_restore_full_ptrack_archive(self):
"""recovery to latest from archive full+ptrack backups"""