From 9f038eeb5ee815b7e4905cc89dffc21013d01fb4 Mon Sep 17 00:00:00 2001 From: stalkerg Date: Thu, 29 Sep 2016 17:33:21 +0300 Subject: [PATCH] Add support backup from replica. --- README.md | 6 ++ backup.c | 149 +++++++++++++++++++++++++++++++++----------- doc/pg_arman.md | 6 +- expected/option.out | 1 + pg_arman.c | 3 + pg_arman.h | 2 + 6 files changed, 129 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 6ccaa900..3d57e0d4 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,12 @@ server because of a reason or another. Its differential backup facility reduces the amount of data necessary to be taken between two consecutive backups. +Main features: +* incremental backup from WAL and PTRACK +* backup from replica +* multithreaded backup and restore +* autonomous backup without archive command (will need slot replication) + Download -------- diff --git a/backup.c b/backup.c index a1c85fef..e288ea7b 100644 --- a/backup.c +++ b/backup.c @@ -41,6 +41,7 @@ const char *progname = "pg_arman"; parray *backup_files_list; static volatile uint32 total_copy_files_increment; static uint32 total_files_num; +static PGconn *start_stop_connect = NULL; typedef struct { @@ -61,7 +62,7 @@ static void confirm_block_size(const char *name, int blcksz); static void pg_start_backup(const char *label, bool smooth, pgBackup *backup); static void pg_stop_backup(pgBackup *backup); static bool pg_is_standby(void); -static void get_lsn(PGresult *res, XLogRecPtr *lsn); +static void get_lsn(PGconn *conn, PGresult *res, XLogRecPtr *lsn, bool stop_backup); static void get_xid(PGresult *res, uint32 *xid); static void pg_ptrack_clear(void); static char *pg_ptrack_get_and_clear(Oid tablespace_oid, @@ -74,7 +75,7 @@ static void create_file_list(parray *files, const char *subdir, const char *prefix, bool is_append); -static void wait_for_archive(pgBackup *backup, const char *sql); +static void wait_for_archive(PGconn *conn, pgBackup *backup, const char *sql, bool stop_backup); static void make_pagemap_from_ptrack(parray *files); static void StreamLog(void *arg); @@ -112,7 +113,7 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt) pgBackup *prev_backup = NULL; /* Block backup operations on a standby */ - if (pg_is_standby()) + if (pg_is_standby() && !from_replica) elog(ERROR, "Backup cannot run on a standby."); elog(LOG, "database backup start"); @@ -164,18 +165,21 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt) strncat(label, " with pg_arman", lengthof(label)); pg_start_backup(label, smooth_checkpoint, ¤t); - /* If backup_label does not exist in $PGDATA, stop taking backup */ - snprintf(path, lengthof(path), "%s/backup_label", pgdata); - make_native_path(path); - if (!fileExists(path)) - has_backup_label = false; - - /* Leave if no backup file */ - if (!has_backup_label) + if(!from_replica) { - elog(LOG, "backup_label does not exist, stopping backup"); - pg_stop_backup(NULL); - elog(ERROR, "backup_label does not exist in PGDATA."); + /* If backup_label does not exist in $PGDATA, stop taking backup */ + snprintf(path, lengthof(path), "%s/backup_label", pgdata); + make_native_path(path); + if (!fileExists(path)) + has_backup_label = false; + + /* Leave if no backup file */ + if (!has_backup_label) + { + elog(LOG, "backup_label does not exist, stopping backup"); + pg_stop_backup(NULL); + elog(ERROR, "backup_label does not exist in PGDATA."); + } } /* @@ -246,7 +250,7 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt) if (current.backup_mode == BACKUP_MODE_DIFF_PAGE) { /* Enforce archiving of last segment and wait for it to be here */ - wait_for_archive(¤t, "SELECT * FROM pg_switch_xlog()"); + wait_for_archive(connection, ¤t, "SELECT * FROM pg_switch_xlog()", false); /* Now build the page map */ parray_qsort(backup_files_list, pgFileComparePathDesc); @@ -563,6 +567,13 @@ check_server_version(void) (server_version / 100) % 100, server_version % 100, "9.5"); + if (from_replica && server_version < 90600) + elog(ERROR, + "server version is %d.%d.%d, must be %s or higher for backup from replica.", + server_version / 10000, + (server_version / 100) % 100, + server_version % 100, "9.6"); + /* confirm block_size (BLCKSZ) and wal_block_size (XLOG_BLCKSZ) */ confirm_block_size("block_size", BLCKSZ); confirm_block_size("wal_block_size", XLOG_BLCKSZ); @@ -601,16 +612,27 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup) params[0] = label; - reconnect(); + if (start_stop_connect == NULL) + start_stop_connect = pgut_connect(ERROR); /* 2nd argument is 'fast'*/ params[1] = smooth ? "false" : "true"; - res = execute("SELECT pg_start_backup($1, $2)", 2, params); + if (from_replica) + res = pgut_execute(start_stop_connect, + "SELECT pg_start_backup($1, $2, false)", + 2, + params, + ERROR); + else + res = pgut_execute(start_stop_connect, + "SELECT pg_start_backup($1, $2)", + 2, + params, + ERROR); if (backup != NULL) - get_lsn(res, &backup->start_lsn); + get_lsn(start_stop_connect, res, &backup->start_lsn, false); PQclear(res); - disconnect(); } static void @@ -670,7 +692,7 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res } static void -wait_for_archive(pgBackup *backup, const char *sql) +wait_for_archive(PGconn *conn, pgBackup *backup, const char *sql, bool stop_backup) { PGresult *res; char ready_path[MAXPGPATH]; @@ -680,17 +702,18 @@ wait_for_archive(pgBackup *backup, const char *sql) TimeLineID tli; XLogSegNo targetSegNo; - reconnect(); + if (conn == NULL) + conn = pgut_connect(ERROR); /* Remove annoying NOTICE messages generated by backend */ - res = execute("SET client_min_messages = warning;", 0, NULL); + res = pgut_execute(conn, "SET client_min_messages = warning;", 0, NULL, ERROR); PQclear(res); /* And execute the query wanted */ - res = execute(sql, 0, NULL); + res = pgut_execute(conn, sql, 0, NULL, ERROR); /* Get LSN from execution result */ - get_lsn(res, &lsn); + get_lsn(conn, res, &lsn, stop_backup); /* * Enforce TLI obtention if backup is not present as this code @@ -720,13 +743,15 @@ wait_for_archive(pgBackup *backup, const char *sql) PQclear(res); - res = execute(TXID_CURRENT_SQL, 0, NULL); + if (from_replica) + res = pgut_execute(conn, TXID_CURRENT_IF_SQL, 0, NULL, ERROR); + else + res = pgut_execute(conn, TXID_CURRENT_SQL, 0, NULL, ERROR); if (backup != NULL) { get_xid(res, &backup->recovery_xid); backup->recovery_time = time(NULL); } - disconnect(); /* wait until switched WAL is archived */ try_count = 0; @@ -756,17 +781,22 @@ pg_stop_backup(pgBackup *backup) PGresult *res; TimeLineID tli; - reconnect(); + //reconnect(); + if (start_stop_connect == NULL) + start_stop_connect = pgut_connect(ERROR); /* Remove annoying NOTICE messages generated by backend */ - res = execute("SET client_min_messages = warning;", 0, NULL); + res = pgut_execute(start_stop_connect, "SET client_min_messages = warning;", 0, NULL, ERROR); PQclear(res); /* And execute the query wanted */ - res = execute("SELECT * FROM pg_stop_backup()", 0, NULL); + if (from_replica) + res = pgut_execute(start_stop_connect,"SELECT * FROM pg_stop_backup(false)", 0, NULL, ERROR); + else + res = pgut_execute(start_stop_connect,"SELECT * FROM pg_stop_backup()", 0, NULL, ERROR); /* Get LSN from execution result */ - get_lsn(res, &stop_backup_lsn); + get_lsn(start_stop_connect, res, &stop_backup_lsn, true); PQclear(res); /* @@ -786,18 +816,33 @@ pg_stop_backup(pgBackup *backup) (uint32) backup->stop_lsn); } - res = execute(TXID_CURRENT_SQL, 0, NULL); + if (from_replica) + res = pgut_execute(start_stop_connect, TXID_CURRENT_IF_SQL, 0, NULL, ERROR); + else + res = pgut_execute(start_stop_connect, TXID_CURRENT_SQL, 0, NULL, ERROR); if (backup != NULL) { get_xid(res, &backup->recovery_xid); backup->recovery_time = time(NULL); } PQclear(res); - disconnect(); + //disconnect(); + pgut_disconnect(start_stop_connect); } else - wait_for_archive(backup, - "SELECT * FROM pg_stop_backup()"); + { + if (from_replica) + wait_for_archive(start_stop_connect, + backup, + "SELECT * FROM pg_stop_backup(false)", + true); + else + wait_for_archive(start_stop_connect, + backup, + "SELECT * FROM pg_stop_backup()", + true); + pgut_disconnect(start_stop_connect); + } } @@ -818,15 +863,15 @@ pg_is_standby(void) * Get LSN from result of pg_start_backup() or pg_stop_backup(). */ static void -get_lsn(PGresult *res, XLogRecPtr *lsn) +get_lsn(PGconn *conn, PGresult *res, XLogRecPtr *lsn, bool stop_backup) { uint32 xlogid; uint32 xrecoff; - if (res == NULL || PQntuples(res) != 1 || PQnfields(res) != 1) + if (res == NULL || PQntuples(res) != 1 || (PQnfields(res) != 1 && PQnfields(res) != 3)) elog(ERROR, "result of backup command is invalid: %s", - PQerrorMessage(connection)); + PQerrorMessage(conn)); /* * Extract timeline and LSN from results of pg_stop_backup() @@ -836,6 +881,36 @@ get_lsn(PGresult *res, XLogRecPtr *lsn) XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff); /* Calculate LSN */ *lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff; + + if (stop_backup && from_replica && PQnfields(res) == 3) + { + char path[MAXPGPATH]; + char path_backup_label[MAXPGPATH]; + char path_tablespace_map[MAXPGPATH]; + FILE *fp; + + pgBackupGetPath(¤t, path, lengthof(path), DATABASE_DIR); + snprintf(path_backup_label, lengthof(path_backup_label), "%s/backup_label", path); + snprintf(path_tablespace_map, lengthof(path_tablespace_map), "%s/tablespace_map", path); + + fp = fopen(path_backup_label, "w"); + if (fp == NULL) + elog(ERROR, "can't open backup label file \"%s\": %s", + path_backup_label, strerror(errno)); + + fwrite(PQgetvalue(res, 0, 1), 1, strlen(PQgetvalue(res, 0, 1)), fp); + fclose(fp); + if (strlen(PQgetvalue(res, 0, 2)) == 0) + return; + + fp = fopen(path_tablespace_map, "w"); + if (fp == NULL) + elog(ERROR, "can't open tablespace map file \"%s\": %s", + path_tablespace_map, strerror(errno)); + + fwrite(PQgetvalue(res, 0, 2), 1, strlen(PQgetvalue(res, 0, 2)), fp); + fclose(fp); + } } /* diff --git a/doc/pg_arman.md b/doc/pg_arman.md index 079c099b..9ae00229 100644 --- a/doc/pg_arman.md +++ b/doc/pg_arman.md @@ -241,7 +241,7 @@ absolute paths; relative paths are not allowed. Only files exceeded one of those settings are deleted. **-j**=NUMBER / **--threads**=NUMBER: - Number of threads for backup. + Number of threads for backup. **--stream**: Enable stream replication for save WAL during backup process. @@ -249,6 +249,10 @@ absolute paths; relative paths are not allowed. **--disable-ptrack-clear**: Disable clear ptrack files for postgres without ptrack patch. +**--from-replica**: + Use non exclusive start backup for replica. Only for 9.6 and higher. + + ### RESTORE OPTIONS The parameters whose name start are started with --recovery refer to diff --git a/expected/option.out b/expected/option.out index 7467cf05..c2c6227e 100644 --- a/expected/option.out +++ b/expected/option.out @@ -28,6 +28,7 @@ Backup options: --keep-data-days=DAY keep enough data backup to recover to DAY days age --disable-ptrack-clear disable clear ptrack for postgres without ptrack --backup-pg-log start backup pg_log directory + --from-replica use non exclusive start backup for replica Restore options: --recovery-target-time time stamp up to which recovery will proceed diff --git a/pg_arman.c b/pg_arman.c index a91af247..f01b0595 100644 --- a/pg_arman.c +++ b/pg_arman.c @@ -36,6 +36,7 @@ static int keep_data_generations = KEEP_INFINITE; static int keep_data_days = KEEP_INFINITE; int num_threads = 1; bool stream_wal = false; +bool from_replica = false; bool disable_ptrack_clear = false; static bool backup_logs = false; static bool backup_validate = false; @@ -69,6 +70,7 @@ static pgut_option options[] = { 'b', 10, "backup-pg-log", &backup_logs }, { 'f', 'b', "backup-mode", opt_backup_mode, SOURCE_ENV }, { 'b', 'C', "smooth-checkpoint", &smooth_checkpoint, SOURCE_ENV }, + { 'b', 12, "from-replica", &from_replica }, /* options with only long name (keep-xxx) */ { 'i', 1, "keep-data-generations", &keep_data_generations, SOURCE_ENV }, { 'i', 2, "keep-data-days", &keep_data_days, SOURCE_ENV }, @@ -250,6 +252,7 @@ pgut_help(bool details) printf(_(" --keep-data-days=DAY keep enough data backup to recover to DAY days age\n")); printf(_(" --disable-ptrack-clear disable clear ptrack for postgres without ptrack\n")); printf(_(" --backup-pg-log start backup pg_log directory\n")); + printf(_(" --from-replica use non exclusive start backup for replica\n")); printf(_("\nRestore options:\n")); printf(_(" --recovery-target-time time stamp up to which recovery will proceed\n")); printf(_(" --recovery-target-xid transaction ID up to which recovery will proceed\n")); diff --git a/pg_arman.h b/pg_arman.h index fb11bce7..5db23d5f 100644 --- a/pg_arman.h +++ b/pg_arman.h @@ -27,6 +27,7 @@ /* Query to fetch current transaction ID */ #define TXID_CURRENT_SQL "SELECT txid_current();" +#define TXID_CURRENT_IF_SQL "SELECT txid_snapshot_xmax(txid_current_snapshot());" /* Directory/File names */ #define DATABASE_DIR "database" @@ -211,6 +212,7 @@ extern parray *backup_files_list; extern int num_threads; extern bool stream_wal; +extern bool from_replica; extern bool disable_ptrack_clear; extern bool progress;