1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-02-03 14:01:57 +02:00

Add support backup from replica.

This commit is contained in:
stalkerg 2016-09-29 17:33:21 +03:00
parent 43b7455601
commit 9f038eeb5e
6 changed files with 129 additions and 38 deletions

View File

@ -13,6 +13,12 @@ server because of a reason or another. Its differential backup
facility reduces the amount of data necessary to be taken between
two consecutive backups.
Main features:
* incremental backup from WAL and PTRACK
* backup from replica
* multithreaded backup and restore
* autonomous backup without archive command (will need slot replication)
Download
--------

149
backup.c
View File

@ -41,6 +41,7 @@ const char *progname = "pg_arman";
parray *backup_files_list;
static volatile uint32 total_copy_files_increment;
static uint32 total_files_num;
static PGconn *start_stop_connect = NULL;
typedef struct
{
@ -61,7 +62,7 @@ static void confirm_block_size(const char *name, int blcksz);
static void pg_start_backup(const char *label, bool smooth, pgBackup *backup);
static void pg_stop_backup(pgBackup *backup);
static bool pg_is_standby(void);
static void get_lsn(PGresult *res, XLogRecPtr *lsn);
static void get_lsn(PGconn *conn, PGresult *res, XLogRecPtr *lsn, bool stop_backup);
static void get_xid(PGresult *res, uint32 *xid);
static void pg_ptrack_clear(void);
static char *pg_ptrack_get_and_clear(Oid tablespace_oid,
@ -74,7 +75,7 @@ static void create_file_list(parray *files,
const char *subdir,
const char *prefix,
bool is_append);
static void wait_for_archive(pgBackup *backup, const char *sql);
static void wait_for_archive(PGconn *conn, pgBackup *backup, const char *sql, bool stop_backup);
static void make_pagemap_from_ptrack(parray *files);
static void StreamLog(void *arg);
@ -112,7 +113,7 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
pgBackup *prev_backup = NULL;
/* Block backup operations on a standby */
if (pg_is_standby())
if (pg_is_standby() && !from_replica)
elog(ERROR, "Backup cannot run on a standby.");
elog(LOG, "database backup start");
@ -164,18 +165,21 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
strncat(label, " with pg_arman", lengthof(label));
pg_start_backup(label, smooth_checkpoint, &current);
/* If backup_label does not exist in $PGDATA, stop taking backup */
snprintf(path, lengthof(path), "%s/backup_label", pgdata);
make_native_path(path);
if (!fileExists(path))
has_backup_label = false;
/* Leave if no backup file */
if (!has_backup_label)
if(!from_replica)
{
elog(LOG, "backup_label does not exist, stopping backup");
pg_stop_backup(NULL);
elog(ERROR, "backup_label does not exist in PGDATA.");
/* If backup_label does not exist in $PGDATA, stop taking backup */
snprintf(path, lengthof(path), "%s/backup_label", pgdata);
make_native_path(path);
if (!fileExists(path))
has_backup_label = false;
/* Leave if no backup file */
if (!has_backup_label)
{
elog(LOG, "backup_label does not exist, stopping backup");
pg_stop_backup(NULL);
elog(ERROR, "backup_label does not exist in PGDATA.");
}
}
/*
@ -246,7 +250,7 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
if (current.backup_mode == BACKUP_MODE_DIFF_PAGE)
{
/* Enforce archiving of last segment and wait for it to be here */
wait_for_archive(&current, "SELECT * FROM pg_switch_xlog()");
wait_for_archive(connection, &current, "SELECT * FROM pg_switch_xlog()", false);
/* Now build the page map */
parray_qsort(backup_files_list, pgFileComparePathDesc);
@ -563,6 +567,13 @@ check_server_version(void)
(server_version / 100) % 100,
server_version % 100, "9.5");
if (from_replica && server_version < 90600)
elog(ERROR,
"server version is %d.%d.%d, must be %s or higher for backup from replica.",
server_version / 10000,
(server_version / 100) % 100,
server_version % 100, "9.6");
/* confirm block_size (BLCKSZ) and wal_block_size (XLOG_BLCKSZ) */
confirm_block_size("block_size", BLCKSZ);
confirm_block_size("wal_block_size", XLOG_BLCKSZ);
@ -601,16 +612,27 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup)
params[0] = label;
reconnect();
if (start_stop_connect == NULL)
start_stop_connect = pgut_connect(ERROR);
/* 2nd argument is 'fast'*/
params[1] = smooth ? "false" : "true";
res = execute("SELECT pg_start_backup($1, $2)", 2, params);
if (from_replica)
res = pgut_execute(start_stop_connect,
"SELECT pg_start_backup($1, $2, false)",
2,
params,
ERROR);
else
res = pgut_execute(start_stop_connect,
"SELECT pg_start_backup($1, $2)",
2,
params,
ERROR);
if (backup != NULL)
get_lsn(res, &backup->start_lsn);
get_lsn(start_stop_connect, res, &backup->start_lsn, false);
PQclear(res);
disconnect();
}
static void
@ -670,7 +692,7 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res
}
static void
wait_for_archive(pgBackup *backup, const char *sql)
wait_for_archive(PGconn *conn, pgBackup *backup, const char *sql, bool stop_backup)
{
PGresult *res;
char ready_path[MAXPGPATH];
@ -680,17 +702,18 @@ wait_for_archive(pgBackup *backup, const char *sql)
TimeLineID tli;
XLogSegNo targetSegNo;
reconnect();
if (conn == NULL)
conn = pgut_connect(ERROR);
/* Remove annoying NOTICE messages generated by backend */
res = execute("SET client_min_messages = warning;", 0, NULL);
res = pgut_execute(conn, "SET client_min_messages = warning;", 0, NULL, ERROR);
PQclear(res);
/* And execute the query wanted */
res = execute(sql, 0, NULL);
res = pgut_execute(conn, sql, 0, NULL, ERROR);
/* Get LSN from execution result */
get_lsn(res, &lsn);
get_lsn(conn, res, &lsn, stop_backup);
/*
* Enforce TLI obtention if backup is not present as this code
@ -720,13 +743,15 @@ wait_for_archive(pgBackup *backup, const char *sql)
PQclear(res);
res = execute(TXID_CURRENT_SQL, 0, NULL);
if (from_replica)
res = pgut_execute(conn, TXID_CURRENT_IF_SQL, 0, NULL, ERROR);
else
res = pgut_execute(conn, TXID_CURRENT_SQL, 0, NULL, ERROR);
if (backup != NULL)
{
get_xid(res, &backup->recovery_xid);
backup->recovery_time = time(NULL);
}
disconnect();
/* wait until switched WAL is archived */
try_count = 0;
@ -756,17 +781,22 @@ pg_stop_backup(pgBackup *backup)
PGresult *res;
TimeLineID tli;
reconnect();
//reconnect();
if (start_stop_connect == NULL)
start_stop_connect = pgut_connect(ERROR);
/* Remove annoying NOTICE messages generated by backend */
res = execute("SET client_min_messages = warning;", 0, NULL);
res = pgut_execute(start_stop_connect, "SET client_min_messages = warning;", 0, NULL, ERROR);
PQclear(res);
/* And execute the query wanted */
res = execute("SELECT * FROM pg_stop_backup()", 0, NULL);
if (from_replica)
res = pgut_execute(start_stop_connect,"SELECT * FROM pg_stop_backup(false)", 0, NULL, ERROR);
else
res = pgut_execute(start_stop_connect,"SELECT * FROM pg_stop_backup()", 0, NULL, ERROR);
/* Get LSN from execution result */
get_lsn(res, &stop_backup_lsn);
get_lsn(start_stop_connect, res, &stop_backup_lsn, true);
PQclear(res);
/*
@ -786,18 +816,33 @@ pg_stop_backup(pgBackup *backup)
(uint32) backup->stop_lsn);
}
res = execute(TXID_CURRENT_SQL, 0, NULL);
if (from_replica)
res = pgut_execute(start_stop_connect, TXID_CURRENT_IF_SQL, 0, NULL, ERROR);
else
res = pgut_execute(start_stop_connect, TXID_CURRENT_SQL, 0, NULL, ERROR);
if (backup != NULL)
{
get_xid(res, &backup->recovery_xid);
backup->recovery_time = time(NULL);
}
PQclear(res);
disconnect();
//disconnect();
pgut_disconnect(start_stop_connect);
}
else
wait_for_archive(backup,
"SELECT * FROM pg_stop_backup()");
{
if (from_replica)
wait_for_archive(start_stop_connect,
backup,
"SELECT * FROM pg_stop_backup(false)",
true);
else
wait_for_archive(start_stop_connect,
backup,
"SELECT * FROM pg_stop_backup()",
true);
pgut_disconnect(start_stop_connect);
}
}
@ -818,15 +863,15 @@ pg_is_standby(void)
* Get LSN from result of pg_start_backup() or pg_stop_backup().
*/
static void
get_lsn(PGresult *res, XLogRecPtr *lsn)
get_lsn(PGconn *conn, PGresult *res, XLogRecPtr *lsn, bool stop_backup)
{
uint32 xlogid;
uint32 xrecoff;
if (res == NULL || PQntuples(res) != 1 || PQnfields(res) != 1)
if (res == NULL || PQntuples(res) != 1 || (PQnfields(res) != 1 && PQnfields(res) != 3))
elog(ERROR,
"result of backup command is invalid: %s",
PQerrorMessage(connection));
PQerrorMessage(conn));
/*
* Extract timeline and LSN from results of pg_stop_backup()
@ -836,6 +881,36 @@ get_lsn(PGresult *res, XLogRecPtr *lsn)
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
/* Calculate LSN */
*lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff;
if (stop_backup && from_replica && PQnfields(res) == 3)
{
char path[MAXPGPATH];
char path_backup_label[MAXPGPATH];
char path_tablespace_map[MAXPGPATH];
FILE *fp;
pgBackupGetPath(&current, path, lengthof(path), DATABASE_DIR);
snprintf(path_backup_label, lengthof(path_backup_label), "%s/backup_label", path);
snprintf(path_tablespace_map, lengthof(path_tablespace_map), "%s/tablespace_map", path);
fp = fopen(path_backup_label, "w");
if (fp == NULL)
elog(ERROR, "can't open backup label file \"%s\": %s",
path_backup_label, strerror(errno));
fwrite(PQgetvalue(res, 0, 1), 1, strlen(PQgetvalue(res, 0, 1)), fp);
fclose(fp);
if (strlen(PQgetvalue(res, 0, 2)) == 0)
return;
fp = fopen(path_tablespace_map, "w");
if (fp == NULL)
elog(ERROR, "can't open tablespace map file \"%s\": %s",
path_tablespace_map, strerror(errno));
fwrite(PQgetvalue(res, 0, 2), 1, strlen(PQgetvalue(res, 0, 2)), fp);
fclose(fp);
}
}
/*

View File

@ -241,7 +241,7 @@ absolute paths; relative paths are not allowed.
Only files exceeded one of those settings are deleted.
**-j**=NUMBER / **--threads**=NUMBER:
Number of threads for backup.
Number of threads for backup.
**--stream**:
Enable stream replication for save WAL during backup process.
@ -249,6 +249,10 @@ absolute paths; relative paths are not allowed.
**--disable-ptrack-clear**:
Disable clear ptrack files for postgres without ptrack patch.
**--from-replica**:
Use non exclusive start backup for replica. Only for 9.6 and higher.
### RESTORE OPTIONS
The parameters whose name start are started with --recovery refer to

View File

@ -28,6 +28,7 @@ Backup options:
--keep-data-days=DAY keep enough data backup to recover to DAY days age
--disable-ptrack-clear disable clear ptrack for postgres without ptrack
--backup-pg-log start backup pg_log directory
--from-replica use non exclusive start backup for replica
Restore options:
--recovery-target-time time stamp up to which recovery will proceed

View File

@ -36,6 +36,7 @@ static int keep_data_generations = KEEP_INFINITE;
static int keep_data_days = KEEP_INFINITE;
int num_threads = 1;
bool stream_wal = false;
bool from_replica = false;
bool disable_ptrack_clear = false;
static bool backup_logs = false;
static bool backup_validate = false;
@ -69,6 +70,7 @@ static pgut_option options[] =
{ 'b', 10, "backup-pg-log", &backup_logs },
{ 'f', 'b', "backup-mode", opt_backup_mode, SOURCE_ENV },
{ 'b', 'C', "smooth-checkpoint", &smooth_checkpoint, SOURCE_ENV },
{ 'b', 12, "from-replica", &from_replica },
/* options with only long name (keep-xxx) */
{ 'i', 1, "keep-data-generations", &keep_data_generations, SOURCE_ENV },
{ 'i', 2, "keep-data-days", &keep_data_days, SOURCE_ENV },
@ -250,6 +252,7 @@ pgut_help(bool details)
printf(_(" --keep-data-days=DAY keep enough data backup to recover to DAY days age\n"));
printf(_(" --disable-ptrack-clear disable clear ptrack for postgres without ptrack\n"));
printf(_(" --backup-pg-log start backup pg_log directory\n"));
printf(_(" --from-replica use non exclusive start backup for replica\n"));
printf(_("\nRestore options:\n"));
printf(_(" --recovery-target-time time stamp up to which recovery will proceed\n"));
printf(_(" --recovery-target-xid transaction ID up to which recovery will proceed\n"));

View File

@ -27,6 +27,7 @@
/* Query to fetch current transaction ID */
#define TXID_CURRENT_SQL "SELECT txid_current();"
#define TXID_CURRENT_IF_SQL "SELECT txid_snapshot_xmax(txid_current_snapshot());"
/* Directory/File names */
#define DATABASE_DIR "database"
@ -211,6 +212,7 @@ extern parray *backup_files_list;
extern int num_threads;
extern bool stream_wal;
extern bool from_replica;
extern bool disable_ptrack_clear;
extern bool progress;