1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-02-09 14:33:17 +02:00

Wait lsn in pg_start_backup() and pg_stop_backup(). Do pg_switch_xlog() only for master

This commit is contained in:
Artur Zakirov 2017-03-21 18:30:48 +03:00
parent 176102b159
commit 2671a0c08e

372
backup.c
View File

@ -64,15 +64,14 @@ static void backup_disconnect(bool fatal, void *userdata);
static void backup_files(void *arg);
static parray *do_backup_database(parray *backup_list, bool smooth_checkpoint);
static void pg_start_backup(const char *label, bool smooth, pgBackup *backup);
static void pg_stop_backup(pgBackup *backup);
static void pg_switch_xlog(void);
static bool pg_is_standby(void);
static void get_lsn(PGconn *conn, PGresult *res, XLogRecPtr *lsn, bool stop_backup);
static void get_xid(PGresult *res, TransactionId *xid);
static void add_pgdata_files(parray *files, const char *root);
static void create_file_list(parray *files, const char *root, bool is_append);
static void wait_for_archive(pgBackup *backup, const char *sql,
bool stop_backup);
static void wait_archive_lsn(XLogRecPtr lsn, bool last_segno);
static void make_pagemap_from_ptrack(parray *files);
static void StreamLog(void *arg);
@ -239,7 +238,8 @@ do_backup_database(parray *backup_list, bool smooth_checkpoint)
if (current.backup_mode == BACKUP_MODE_DIFF_PAGE)
{
/* Enforce archiving of last segment and wait for it to be here */
wait_for_archive(&current, "SELECT * FROM pg_switch_xlog()", false);
if (!from_replica)
pg_switch_xlog();
/* Now build the page map */
parray_qsort(backup_files_list, pgFileComparePathDesc);
@ -254,16 +254,16 @@ do_backup_database(parray *backup_list, bool smooth_checkpoint)
extractPageMap(arclog_path, prev_backup->start_lsn, current.tli,
current.start_lsn);
}
if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
else if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
{
XLogRecPtr ptrack_lsn = get_last_ptrack_lsn();
XLogRecPtr ptrack_lsn = get_last_ptrack_lsn();
if (ptrack_lsn > prev_backup->stop_lsn)
{
elog(ERROR, "Wrong ptrack lsn:%lx prev:%lx current:%lx",
ptrack_lsn,
prev_backup->start_lsn,
current.start_lsn);
ptrack_lsn,
prev_backup->start_lsn,
current.start_lsn);
elog(ERROR, "Create new full backup before an incremental one.");
}
parray_qsort(backup_files_list, pgFileComparePathDesc);
@ -578,8 +578,10 @@ confirm_block_size(const char *name, int blcksz)
static void
pg_start_backup(const char *label, bool smooth, pgBackup *backup)
{
PGresult *res;
const char *params[2];
PGresult *res;
const char *params[2];
uint32 xlogid;
uint32 xrecoff;
params[0] = label;
@ -596,12 +598,15 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup)
2,
params);
if (backup != NULL)
{
get_lsn(backup_conn, res, &backup->start_lsn, false);
if (!stream_wal && !from_replica)
wait_archive_lsn(backup->start_lsn, true);
}
/*
* Extract timeline and LSN from results of pg_start_backup()
*/
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
/* Calculate LSN */
backup->start_lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff;
if (!stream_wal)
wait_archive_lsn(backup->start_lsn, true);
PQclear(res);
}
@ -727,83 +732,6 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid,
return result;
}
static void
wait_for_archive(pgBackup *backup, const char *sql, bool stop_backup)
{
PGresult *res;
char ready_path[MAXPGPATH];
char file_name[MAXFNAMELEN];
int try_count;
XLogRecPtr lsn;
TimeLineID tli;
XLogSegNo targetSegNo;
/* Remove annoying NOTICE messages generated by backend */
res = pgut_execute(backup_conn, "SET client_min_messages = warning;", 0,
NULL);
PQclear(res);
/* And execute the query wanted */
res = pgut_execute(backup_conn, sql, 0, NULL);
/* Get LSN from execution result */
get_lsn(backup_conn, res, &lsn, stop_backup);
/*
* Enforce TLI obtention if backup is not present as this code
* path can be taken as a callback at exit.
*/
tli = get_current_timeline(false);
/* Fill in fields if backup exists */
if (backup != NULL)
{
backup->tli = tli;
backup->stop_lsn = lsn;
elog(LOG, "%s(): tli=%X lsn=%X/%08X",
__FUNCTION__, backup->tli,
(uint32) (backup->stop_lsn >> 32),
(uint32) backup->stop_lsn);
}
/* As well as WAL file name */
XLByteToSeg(lsn, targetSegNo);
XLogFileName(file_name, tli, targetSegNo);
snprintf(ready_path, lengthof(ready_path),
"%s/pg_xlog/archive_status/%s.ready", pgdata,
file_name);
elog(LOG, "%s() wait for %s", __FUNCTION__, ready_path);
PQclear(res);
if (from_replica)
res = pgut_execute(backup_conn, TXID_CURRENT_IF_SQL, 0, NULL);
else
res = pgut_execute(backup_conn, TXID_CURRENT_SQL, 0, NULL);
if (backup != NULL)
{
get_xid(res, &backup->recovery_xid);
backup->recovery_time = time(NULL);
}
/* wait until switched WAL is archived */
try_count = 0;
while (fileExists(ready_path))
{
sleep(1);
if (interrupted)
elog(ERROR,
"interrupted during waiting for WAL archiving");
try_count++;
if (try_count > TIMEOUT_ARCHIVE)
elog(ERROR,
"switched WAL could not be archived in %d seconds",
TIMEOUT_ARCHIVE);
}
elog(LOG, "%s() .ready deleted in %d try", __FUNCTION__, try_count);
}
static void
wait_archive_lsn(XLogRecPtr lsn, bool last_segno)
{
@ -844,68 +772,142 @@ wait_archive_lsn(XLogRecPtr lsn, bool last_segno)
static void
pg_stop_backup(pgBackup *backup)
{
if (stream_wal)
PGresult *res;
uint32 xlogid;
uint32 xrecoff;
/* Remove annoying NOTICE messages generated by backend */
res = pgut_execute(backup_conn, "SET client_min_messages = warning;",
0, NULL);
PQclear(res);
if (from_replica)
res = pgut_execute(backup_conn,
"SELECT * FROM pg_stop_backup(false)", 0, NULL);
else
res = pgut_execute(backup_conn,
"SELECT * FROM pg_stop_backup()", 0, NULL);
/*
* Extract timeline and LSN from results of pg_stop_backup()
*/
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
/* Calculate LSN */
stop_backup_lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff;
/* Write backup_label and tablespace_map for backup from replica */
if (from_replica)
{
PGresult *res;
TimeLineID tli;
char path[MAXPGPATH];
char backup_label[MAXPGPATH];
FILE *fp;
pgFile *file;
/* Remove annoying NOTICE messages generated by backend */
res = pgut_execute(backup_conn, "SET client_min_messages = warning;",
0, NULL);
PQclear(res);
Assert(PQnfields(res) >= 3);
/* And execute the query wanted */
if (from_replica)
res = pgut_execute(backup_conn,
"SELECT * FROM pg_stop_backup(false)", 0, NULL);
else
res = pgut_execute(backup_conn,
"SELECT * FROM pg_stop_backup()", 0, NULL);
pgBackupGetPath(&current, path, lengthof(path), DATABASE_DIR);
join_path_components(backup_label, path, "backup_label");
/* Get LSN from execution result */
get_lsn(backup_conn, res, &stop_backup_lsn, true);
PQclear(res);
/* Write backup_label */
fp = fopen(backup_label, "w");
if (fp == NULL)
elog(ERROR, "can't open backup label file \"%s\": %s",
backup_label, strerror(errno));
/*
* Enforce TLI obtention if backup is not present as this code
* path can be taken as a callback at exit.
*/
tli = get_current_timeline(false);
fwrite(PQgetvalue(res, 0, 1), 1, strlen(PQgetvalue(res, 0, 1)), fp);
fclose(fp);
/* Fill in fields if backup exists */
if (backup != NULL)
file = pgFileNew(backup_label, true);
calc_file(file);
free(file->path);
file->path = strdup("backup_label");
parray_append(backup_files_list, file);
/* Write tablespace_map */
if (strlen(PQgetvalue(res, 0, 2)) > 0)
{
backup->tli = tli;
backup->stop_lsn = stop_backup_lsn;
elog(LOG, "%s(): tli=%X lsn=%X/%08X",
__FUNCTION__, backup->tli,
(uint32) (backup->stop_lsn >> 32),
(uint32) backup->stop_lsn);
char tablespace_map[MAXPGPATH];
join_path_components(tablespace_map, path, "tablespace_map");
fp = fopen(tablespace_map, "w");
if (fp == NULL)
elog(ERROR, "can't open tablespace map file \"%s\": %s",
tablespace_map, strerror(errno));
fwrite(PQgetvalue(res, 0, 2), 1, strlen(PQgetvalue(res, 0, 2)), fp);
fclose(fp);
file = pgFileNew(tablespace_map, true);
calc_file(file);
free(file->path);
file->path = strdup("tablespace_map");
parray_append(backup_files_list, file);
}
}
PQclear(res);
/* Fill in fields if backup exists */
if (backup != NULL)
{
backup->tli = get_current_timeline(false);
backup->stop_lsn = stop_backup_lsn;
if (from_replica)
res = pgut_execute(backup_conn, TXID_CURRENT_IF_SQL, 0, NULL);
else
res = pgut_execute(backup_conn, TXID_CURRENT_SQL, 0, NULL);
if (backup != NULL)
{
get_xid(res, &backup->recovery_xid);
backup->recovery_time = time(NULL);
}
if (sscanf(PQgetvalue(res, 0, 0), XID_FMT, &backup->recovery_xid) != 1)
elog(ERROR,
"result of txid_current() is invalid: %s",
PQerrorMessage(backup_conn));
backup->recovery_time = time(NULL);
elog(LOG, "finish backup: tli=%X lsn=%X/%08X xid=%s",
backup->tli,
(uint32) (backup->stop_lsn >> 32), (uint32) backup->stop_lsn,
PQgetvalue(res, 0, 0));
PQclear(res);
}
else
{
if (from_replica)
wait_for_archive(backup,
"SELECT * FROM pg_stop_backup(false)",
true);
else
wait_for_archive(backup,
"SELECT * FROM pg_stop_backup()",
true);
}
if (!stream_wal)
wait_archive_lsn(stop_backup_lsn, false);
}
/*
* Switch to a new WAL segment for master.
*/
static void
pg_switch_xlog(void)
{
PGresult *res;
XLogRecPtr lsn;
uint32 xlogid;
uint32 xrecoff;
/* Remove annoying NOTICE messages generated by backend */
res = pgut_execute(backup_conn, "SET client_min_messages = warning;", 0,
NULL);
PQclear(res);
res = pgut_execute(backup_conn, "SELECT * FROM pg_switch_xlog()",
0, NULL);
/*
* Extract timeline and LSN from results of pg_stop_backup()
* and friends.
*/
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
/* Calculate LSN */
lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff;
PQclear(res);
/* Wait for returned lsn - 1 in archive folder */
wait_archive_lsn(lsn, false);
}
@ -922,92 +924,6 @@ pg_is_standby(void)
return fileExists(path);
}
/*
* Get LSN from result of pg_start_backup() or pg_stop_backup().
*/
static void
get_lsn(PGconn *conn, PGresult *res, XLogRecPtr *lsn, bool stop_backup)
{
uint32 xlogid;
uint32 xrecoff;
if (res == NULL || PQntuples(res) != 1 ||
(PQnfields(res) != 1 && PQnfields(res) != 3))
elog(ERROR,
"result of backup command is invalid: %s",
PQerrorMessage(conn));
/*
* Extract timeline and LSN from results of pg_stop_backup()
* and friends.
*/
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
/* Calculate LSN */
*lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff;
if (stop_backup && from_replica && PQnfields(res) == 3)
{
char path[MAXPGPATH];
char path_backup_label[MAXPGPATH];
char path_tablespace_map[MAXPGPATH];
FILE *fp;
pgFile *file;
pgBackupGetPath(&current, path, lengthof(path), DATABASE_DIR);
snprintf(path_backup_label, lengthof(path_backup_label), "%s/backup_label", path);
snprintf(path_tablespace_map, lengthof(path_tablespace_map), "%s/tablespace_map", path);
fp = fopen(path_backup_label, "w");
if (fp == NULL)
elog(ERROR, "can't open backup label file \"%s\": %s",
path_backup_label, strerror(errno));
fwrite(PQgetvalue(res, 0, 1), 1, strlen(PQgetvalue(res, 0, 1)), fp);
fclose(fp);
file = pgFileNew(path_backup_label, true);
calc_file(file);
free(file->path);
file->path = strdup("backup_label");
parray_append(backup_files_list, file);
if (strlen(PQgetvalue(res, 0, 2)) == 0)
return;
fp = fopen(path_tablespace_map, "w");
if (fp == NULL)
elog(ERROR, "can't open tablespace map file \"%s\": %s",
path_tablespace_map, strerror(errno));
fwrite(PQgetvalue(res, 0, 2), 1, strlen(PQgetvalue(res, 0, 2)), fp);
fclose(fp);
file = pgFileNew(path_tablespace_map, true);
calc_file(file);
free(file->path);
file->path = strdup("tablespace_map");
parray_append(backup_files_list, file);
}
}
/*
* Get XID from result of txid_current() after pg_stop_backup().
*/
static void
get_xid(PGresult *res, TransactionId *xid)
{
if (res == NULL || PQntuples(res) != 1 || PQnfields(res) != 1)
elog(ERROR,
"result of txid_current() is invalid: %s",
PQerrorMessage(backup_conn));
if (sscanf(PQgetvalue(res, 0, 0), XID_FMT, xid) != 1)
{
elog(ERROR,
"result of txid_current() is invalid: %s",
PQerrorMessage(backup_conn));
}
elog(LOG, "%s():%s", __FUNCTION__, PQgetvalue(res, 0, 0));
}
/*
* Return true if the path is a existing regular file.
*/