1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-07-15 07:04:14 +02:00

[Issue #385] pg_stop_backup refactoring, part 1

This commit is contained in:
Grigory Smolkin
2021-05-18 12:12:42 +03:00
parent cf0888594f
commit 0e4b3a970a
6 changed files with 533 additions and 439 deletions

View File

@ -32,13 +32,25 @@ static parray *backup_files_list = NULL;
/* We need critical section for datapagemap_add() in case of using threads */ /* We need critical section for datapagemap_add() in case of using threads */
static pthread_mutex_t backup_pagemap_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t backup_pagemap_mutex = PTHREAD_MUTEX_INITIALIZER;
// TODO: move to PGnodeInfo
bool exclusive_backup = false; bool exclusive_backup = false;
/* Is pg_start_backup() was executed */ /* Is pg_start_backup() was executed */
static bool backup_in_progress = false; static bool backup_in_progress = false;
/* Is pg_stop_backup() was sent */
static bool pg_stop_backup_is_sent = false; struct pg_stop_backup_result {
/*
 * struct pg_stop_backup_result: values that pg_stop_backup_consume() extracts
 * from the first row of the stop-backup query result.
 *
 * We will use values of snapshot_xid and invocation_time if there are
 * no transactions between start_lsn and stop_lsn.
 */
/* parsed from the txid_snapshot_xmax() column of the query result */
TransactionId snapshot_xid;
/* parsed from the current_timestamp column of the query result */
time_t invocation_time;
/* stop LSN column, combined from its high and low 32-bit halves */
XLogRecPtr lsn;
/* PQgetlength() of the backup label column; 0 for an exclusive backup */
size_t backup_label_content_len;
/* pgut_strndup() copy of the backup label column, or NULL if exclusive/empty; freed by the caller */
char *backup_label_content;
/* PQgetlength() of the tablespace map column; 0 for an exclusive backup */
size_t tablespace_map_content_len;
/* pgut_strndup() copy of the tablespace map column, or NULL if exclusive/empty; freed by the caller */
char *tablespace_map_content;
};
/* /*
* Backup routines * Backup routines
@ -53,7 +65,11 @@ static void do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
static void pg_start_backup(InstanceState *instanceState, const char *label, bool smooth, pgBackup *backup, static void pg_start_backup(InstanceState *instanceState, const char *label, bool smooth, pgBackup *backup,
PGNodeInfo *nodeInfo, PGconn *conn); PGNodeInfo *nodeInfo, PGconn *conn);
static void pg_switch_wal(PGconn *conn); static void pg_switch_wal(PGconn *conn);
static void pg_silent_client_messages(PGconn *conn);
static void pg_create_restore_point(PGconn *conn, time_t backup_start_time);
static void pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startbackup_conn, PGNodeInfo *nodeInfo); static void pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startbackup_conn, PGNodeInfo *nodeInfo);
static void pg_stop_backup_send(PGconn *conn, int server_version, bool is_started_on_replica, bool is_exclusive, char **query_text);
static XLogRecPtr wait_wal_lsn(InstanceState *instanceState, XLogRecPtr lsn, bool is_start_lsn, TimeLineID tli, static XLogRecPtr wait_wal_lsn(InstanceState *instanceState, XLogRecPtr lsn, bool is_start_lsn, TimeLineID tli,
bool in_prev_segment, bool segment_only, bool in_prev_segment, bool segment_only,
@ -74,18 +90,20 @@ static void check_server_version(PGconn *conn, PGNodeInfo *nodeInfo);
static void confirm_block_size(PGconn *conn, const char *name, int blcksz); static void confirm_block_size(PGconn *conn, const char *name, int blcksz);
static void set_cfs_datafiles(parray *files, const char *root, char *relative, size_t i); static void set_cfs_datafiles(parray *files, const char *root, char *relative, size_t i);
static StopBackupCallbackState stop_callback_state;
static void static void
backup_stopbackup_callback(bool fatal, void *userdata) backup_stopbackup_callback(bool fatal, void *userdata)
{ {
InstanceState *instanceState = (InstanceState *) userdata; StopBackupCallbackState *st = (StopBackupCallbackState *) userdata;
PGconn *pg_startbackup_conn = instanceState->conn;
/* /*
* If backup is in progress, notify stop of backup to PostgreSQL * If backup is in progress, notify stop of backup to PostgreSQL
*/ */
if (backup_in_progress) if (backup_in_progress)
{ {
elog(WARNING, "backup in progress, stop backup"); elog(WARNING, "backup in progress, stop backup");
pg_stop_backup(instanceState, NULL, pg_startbackup_conn, NULL); /* don't care about stop_lsn in case of error */ /* don't care about stop_lsn in case of error */
pg_stop_backup_send(st->conn, st->server_version, current.from_replica, exclusive_backup, NULL);
} }
} }
@ -1048,7 +1066,6 @@ pg_start_backup(InstanceState *instanceState, const char *label, bool smooth, pg
const char *params[2]; const char *params[2];
uint32 lsn_hi; uint32 lsn_hi;
uint32 lsn_lo; uint32 lsn_lo;
params[0] = label; params[0] = label;
elog(INFO, "wait for pg_start_backup()"); elog(INFO, "wait for pg_start_backup()");
@ -1071,8 +1088,9 @@ pg_start_backup(InstanceState *instanceState, const char *label, bool smooth, pg
* is necessary to call pg_stop_backup() in backup_cleanup(). * is necessary to call pg_stop_backup() in backup_cleanup().
*/ */
backup_in_progress = true; backup_in_progress = true;
instanceState->conn = conn; stop_callback_state.conn = conn;
pgut_atexit_push(backup_stopbackup_callback, instanceState); stop_callback_state.server_version = nodeInfo->server_version;
pgut_atexit_push(backup_stopbackup_callback, &stop_callback_state);
/* Extract timeline and LSN from results of pg_start_backup() */ /* Extract timeline and LSN from results of pg_start_backup() */
XLogDataFromLSN(PQgetvalue(res, 0, 0), &lsn_hi, &lsn_lo); XLogDataFromLSN(PQgetvalue(res, 0, 0), &lsn_hi, &lsn_lo);
@ -1103,9 +1121,7 @@ pg_switch_wal(PGconn *conn)
{ {
PGresult *res; PGresult *res;
/* Remove annoying NOTICE messages generated by backend */ pg_silent_client_messages(conn);
res = pgut_execute(conn, "SET client_min_messages = warning;", 0, NULL);
PQclear(res);
#if PG_VERSION_NUM >= 100000 #if PG_VERSION_NUM >= 100000
res = pgut_execute(conn, "SELECT pg_catalog.pg_switch_wal()", 0, NULL); res = pgut_execute(conn, "SELECT pg_catalog.pg_switch_wal()", 0, NULL);
@ -1450,70 +1466,101 @@ wait_wal_lsn(InstanceState *instanceState, XLogRecPtr target_lsn, bool is_start_
} }
} }
/* /* Remove annoying NOTICE messages generated by backend */
* Notify end of backup to PostgreSQL server.
*/
static void static void
pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startbackup_conn, pg_silent_client_messages(PGconn *conn)
PGNodeInfo *nodeInfo)
{ {
PGconn *conn;
PGresult *res; PGresult *res;
PGresult *tablespace_map_content = NULL;
uint32 lsn_hi;
uint32 lsn_lo;
//XLogRecPtr restore_lsn = InvalidXLogRecPtr;
int pg_stop_backup_timeout = 0;
char path[MAXPGPATH];
char backup_label[MAXPGPATH];
FILE *fp;
pgFile *file;
size_t len;
char *val = NULL;
char *stop_backup_query = NULL;
bool stop_lsn_exists = false;
XLogRecPtr stop_backup_lsn_tmp = InvalidXLogRecPtr;
/*
* We will use this values if there are no transactions between start_lsn
* and stop_lsn.
*/
time_t recovery_time;
TransactionId recovery_xid;
if (!backup_in_progress)
elog(ERROR, "backup is not in progress");
conn = pg_startbackup_conn;
/* Remove annoying NOTICE messages generated by backend */
res = pgut_execute(conn, "SET client_min_messages = warning;", res = pgut_execute(conn, "SET client_min_messages = warning;",
0, NULL); 0, NULL);
PQclear(res); PQclear(res);
}
/*
 * Create a named restore point on the server.
 *
 * The restore point name is "pg_probackup, backup_id " followed by the
 * base36 encoding of backup_start_time. Deciding whether a restore point
 * should be created at all is left to the caller.
 */
static void
pg_create_restore_point(PGconn *conn, time_t backup_start_time)
{
	char		restore_point_name[1024];
	const char *query_args[1];
	PGresult   *result;

	snprintf(restore_point_name, lengthof(restore_point_name),
			 "pg_probackup, backup_id %s", base36enc(backup_start_time));
	query_args[0] = restore_point_name;

	result = pgut_execute(conn,
						  "SELECT pg_catalog.pg_create_restore_point($1)",
						  1, query_args);
	PQclear(result);
}
/*
 * pg_stop_backup_send -- send the stop-backup query without waiting for its result
 *
 * Chooses the query text from is_exclusive, server_version and
 * is_started_on_replica, sets the session datestyle so the timestamp column
 * can be parsed later, and submits the query with pgut_send().
 *
 * parameters:
 *   conn                  - connection to send the query on
 *   server_version        - numeric server version; 100000 separates the two
 *                           variants of the non-exclusive query
 *   is_started_on_replica - select the last replayed LSN instead of the LSN
 *                           returned by pg_stop_backup()
 *   is_exclusive          - use the exclusive-backup query, which returns no
 *                           backup label and tablespace map columns
 *   query_text            - if not NULL, receives a pgut_strdup() copy of the
 *                           query text; the caller must free it
 *
 * side effects:
 *   - raises ERROR if the query could not be sent
 *   - pops backup_stopbackup_callback from the atexit stack
 */
static void
pg_stop_backup_send(PGconn *conn, int server_version, bool is_started_on_replica, bool is_exclusive, char **query_text)
{
	static const char
		/* Stop the exclusive backup: only xid, timestamp and lsn are returned. */
		stop_exlusive_backup_query[] =
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" pg_catalog.pg_stop_backup() as lsn",
		/*
		 * Stop the non-exclusive backup. Besides stop_lsn it returns from
		 * pg_stop_backup(false) copy of the backup label and tablespace map
		 * so they can be written to disk by the caller.
		 * TODO, question: add NULLs as backup_label and tablespace_map
		 * to the exclusive query?
		 */
		stop_backup_on_master_query[] =
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" lsn,"
			" labelfile,"
			" spcmapfile"
			" FROM pg_catalog.pg_stop_backup(false, false)",
		stop_backup_on_master_before10_query[] =
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" lsn,"
			" labelfile,"
			" spcmapfile"
			" FROM pg_catalog.pg_stop_backup(false)",
		/*
		 * In case of backup from replica >= 9.6 we do not trust minRecPoint
		 * and stop_backup LSN, so we use latest replayed LSN as STOP LSN.
		 */
		stop_backup_on_replica_query[] =
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" pg_catalog.pg_last_wal_replay_lsn(),"
			" labelfile,"
			" spcmapfile"
			" FROM pg_catalog.pg_stop_backup(false, false)",
		stop_backup_on_replica_before10_query[] =
			"SELECT"
			" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
			" current_timestamp(0)::timestamptz,"
			" pg_catalog.pg_last_xlog_replay_location(),"
			" labelfile,"
			" spcmapfile"
			" FROM pg_catalog.pg_stop_backup(false)";

	const char * const stop_backup_query =
		is_exclusive ?
			stop_exlusive_backup_query :
			server_version >= 100000 ?
				(is_started_on_replica ?
					stop_backup_on_replica_query :
					stop_backup_on_master_query
				) :
				(is_started_on_replica ?
					stop_backup_on_replica_before10_query :
					stop_backup_on_master_before10_query
				);
	PGresult   *res;
	bool		sent;

	/*
	 * Make proper timestamp format for parse_time(recovery_time).
	 * The result itself is not needed, so release it the same way the other
	 * "SET ..." call sites in this file do.
	 */
	res = pgut_execute(conn, "SET datestyle = 'ISO, DMY';", 0, NULL);
	PQclear(res);

	/*
	 * send pg_stop_backup asynchronously because we could came
	 * here from backup_cleanup() after some error caused by
	 * postgres archive_command problem and in this case we will
	 * wait for pg_stop_backup() forever.
	 */
	sent = pgut_send(conn, stop_backup_query, 0, NULL, WARNING);
	if (!sent)
		elog(ERROR, "Failed to send pg_stop_backup query");

	/* After we have sent pg_stop_backup, we don't need this callback anymore */
	pgut_atexit_pop(backup_stopbackup_callback, &stop_callback_state);

	/* Hand a private copy of the query text to the caller for error reporting */
	if (query_text)
		*query_text = pgut_strdup(stop_backup_query);
}
/*
* pg_stop_backup_consume -- get 'pg_stop_backup' query results
* side effects:
* - allocates memory for tablespace_map and backup_label contents; the caller must free them (if they are not NULL)
* parameters:
* -
*/ */
if (pg_stop_backup_is_sent && !in_cleanup) static void
{ pg_stop_backup_consume(PGconn *conn, int server_version,
int timeout = ARCHIVE_TIMEOUT_DEFAULT; bool is_exclusive, uint32 timeout, const char *query_text,
res = NULL; struct pg_stop_backup_result *result)
{
/* kludge against some old bug in archive_timeout. TODO: remove in 3.0.0 */ PGresult *query_result;
if (instance_config.archive_timeout > 0) uint32 pg_stop_backup_timeout = 0;
timeout = instance_config.archive_timeout; enum stop_backup_query_result_column_numbers {
recovery_xid_colno = 0,
recovery_time_colno,
lsn_colno,
backup_label_colno,
tablespace_map_colno
};
/* and now wait */
while (1) while (1)
{ {
if (!PQconsumeInput(conn)) if (!PQconsumeInput(conn))
@ -1630,17 +1634,17 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb
} }
else else
{ {
res = PQgetResult(conn); query_result = PQgetResult(conn);
break; break;
} }
} }
/* Check successfull execution of pg_stop_backup() */ /* Check successfull execution of pg_stop_backup() */
if (!res) if (!query_result)
elog(ERROR, "pg_stop backup() failed"); elog(ERROR, "pg_stop_backup() failed");
else else
{ {
switch (PQresultStatus(res)) switch (PQresultStatus(query_result))
{ {
/* /*
* We should expect only PGRES_TUPLES_OK since pg_stop_backup * We should expect only PGRES_TUPLES_OK since pg_stop_backup
@ -1650,25 +1654,155 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb
break; break;
default: default:
elog(ERROR, "query failed: %s query was: %s", elog(ERROR, "query failed: %s query was: %s",
PQerrorMessage(conn), stop_backup_query); PQerrorMessage(conn), query_text);
} }
backup_in_progress = false;
elog(INFO, "pg_stop backup() successfully executed"); elog(INFO, "pg_stop backup() successfully executed");
} }
backup_in_progress = false; /* get results and fill result structure */
/* get&check recovery_xid */
if (sscanf(PQgetvalue(query_result, 0, recovery_xid_colno), XID_FMT, &result->snapshot_xid) != 1)
elog(ERROR,
"result of txid_snapshot_xmax() is invalid: %s",
PQgetvalue(query_result, 0, recovery_xid_colno));
/* get&check recovery_time */
if (!parse_time(PQgetvalue(query_result, 0, recovery_time_colno), &result->invocation_time, true))
elog(ERROR,
"result of current_timestamp is invalid: %s",
PQgetvalue(query_result, 0, recovery_time_colno));
/* get stop_backup_lsn */
{
uint32 lsn_hi;
uint32 lsn_lo;
// char *target_lsn = "2/F578A000"; // char *target_lsn = "2/F578A000";
// XLogDataFromLSN(target_lsn, &lsn_hi, &lsn_lo); // XLogDataFromLSN(target_lsn, &lsn_hi, &lsn_lo);
/* Extract timeline and LSN from results of pg_stop_backup() */ /* Extract timeline and LSN from results of pg_stop_backup() */
XLogDataFromLSN(PQgetvalue(res, 0, 2), &lsn_hi, &lsn_lo); XLogDataFromLSN(PQgetvalue(query_result, 0, lsn_colno), &lsn_hi, &lsn_lo);
/* Calculate LSN */ /* Calculate LSN */
stop_backup_lsn_tmp = ((uint64) lsn_hi) << 32 | lsn_lo; result->lsn = ((uint64) lsn_hi) << 32 | lsn_lo;
}
/* get backup_label_content */
result->backup_label_content = NULL;
// if (!PQgetisnull(query_result, 0, backup_label_colno))
if (!is_exclusive)
{
result->backup_label_content_len = PQgetlength(query_result, 0, backup_label_colno);
if (result->backup_label_content_len > 0)
result->backup_label_content = pgut_strndup(PQgetvalue(query_result, 0, backup_label_colno),
result->backup_label_content_len);
} else {
result->backup_label_content_len = 0;
}
/* get tablespace_map_content */
result->tablespace_map_content = NULL;
// if (!PQgetisnull(query_result, 0, tablespace_map_colno))
if (!is_exclusive)
{
result->tablespace_map_content_len = PQgetlength(query_result, 0, tablespace_map_colno);
if (result->tablespace_map_content_len > 0)
result->tablespace_map_content = pgut_strndup(PQgetvalue(query_result, 0, tablespace_map_colno),
result->tablespace_map_content_len);
} else {
result->tablespace_map_content_len = 0;
}
}
/*
 * Helper used by pg_stop_backup() to store backup_label and tablespace_map.
 *
 * Writes 'len' bytes of 'data' into the file 'filename' inside directory
 * 'path' (opened through fio with the FIO_BACKUP_HOST location) and, when
 * 'file_list' is not NULL, appends a pgFile entry for the new file to it.
 *
 * 'error_msg_filename' is the human-readable kind of file that is put into
 * error messages. A failure to open, write, flush or close the file is
 * reported with elog(ERROR).
 */
static void
pg_stop_backup_write_file_helper(const char *path, const char *filename, const char *error_msg_filename,
		const void *data, size_t len, parray *file_list)
{
	char		target_path[MAXPGPATH];
	FILE	   *out;
	pgFile	   *entry;
	bool		written;

	join_path_components(target_path, path, filename);

	out = fio_fopen(target_path, PG_BINARY_W, FIO_BACKUP_HOST);
	if (out == NULL)
		elog(ERROR, "can't open %s file \"%s\": %s",
			 error_msg_filename, target_path, strerror(errno));

	/* Each step runs only if the previous one succeeded */
	written = fio_fwrite(out, data, len) == len &&
			  fio_fflush(out) == 0 &&
			  !fio_fclose(out);
	if (!written)
		elog(ERROR, "can't write %s file \"%s\": %s",
			 error_msg_filename, target_path, strerror(errno));

	/*
	 * It's vital to check if the file list is initialized,
	 * because we could get here because the backup was interrupted
	 */
	if (!file_list)
		return;

	entry = pgFileNew(target_path, filename, true, 0,
					  FIO_BACKUP_HOST);

	/* CRC and sizes are only filled in for regular files */
	if (S_ISREG(entry->mode))
	{
		entry->crc = pgFileGetCRC(target_path, true, false);
		entry->write_size = entry->size;
		entry->uncompressed_size = entry->size;
	}

	parray_append(file_list, entry);
}
/*
* Notify end of backup to PostgreSQL server.
*/
static void
pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startbackup_conn,
PGNodeInfo *nodeInfo)
{
PGconn *conn;
bool stop_lsn_exists = false;
struct pg_stop_backup_result stop_backup_result;
char *xlog_path,stream_xlog_path[MAXPGPATH];
/* kludge against some old bug in archive_timeout. TODO: remove in 3.0.0 */
int timeout = (instance_config.archive_timeout > 0) ?
instance_config.archive_timeout : ARCHIVE_TIMEOUT_DEFAULT;
char *query_text = NULL;
/* Remove it ? */
if (!backup_in_progress)
elog(ERROR, "backup is not in progress");
conn = pg_startbackup_conn;
pg_silent_client_messages(conn);
/* Create restore point
* Only if backup is from master.
* For PG 9.5 create restore point only if pguser is superuser.
*/
if (!backup->from_replica &&
!(nodeInfo->server_version < 90600 &&
!nodeInfo->is_superuser)) //TODO: check correctness
pg_create_restore_point(conn, backup->start_time);
/* Execute pg_stop_backup using PostgreSQL connection */
pg_stop_backup_send(conn, nodeInfo->server_version, current.from_replica, exclusive_backup, &query_text);
/*
* Wait for the result of pg_stop_backup(), but no longer than
* archive_timeout seconds
*/
pg_stop_backup_consume(conn, nodeInfo->server_version, exclusive_backup, timeout, query_text, &stop_backup_result);
/* It is ok for replica to return invalid STOP LSN /* It is ok for replica to return invalid STOP LSN
* UPD: Apparently it is ok even for a master. * UPD: Apparently it is ok even for a master.
*/ */
if (!XRecOffIsValid(stop_backup_lsn_tmp)) if (!XRecOffIsValid(stop_backup_result.lsn))
{ {
char *xlog_path, char *xlog_path,
stream_xlog_path[MAXPGPATH]; stream_xlog_path[MAXPGPATH];
@ -1680,15 +1814,15 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb
* and we're trying to fix it below. * and we're trying to fix it below.
*/ */
elog(LOG, "Invalid offset in stop_lsn value %X/%X, trying to fix", elog(LOG, "Invalid offset in stop_lsn value %X/%X, trying to fix",
(uint32) (stop_backup_lsn_tmp >> 32), (uint32) (stop_backup_lsn_tmp)); (uint32) (stop_backup_result.lsn >> 32), (uint32) (stop_backup_result.lsn));
/* /*
* Note: even with gdb it is very hard to produce automated tests for * Note: even with gdb it is very hard to produce automated tests for
* contrecord + invalid LSN, so emulate it for manual testing. * contrecord + invalid LSN, so emulate it for manual testing.
*/ */
//stop_backup_lsn_tmp = stop_backup_lsn_tmp - XLOG_SEG_SIZE; //stop_backup_result.lsn = stop_backup_result.lsn - XLOG_SEG_SIZE;
//elog(WARNING, "New Invalid stop_backup_lsn value %X/%X", //elog(WARNING, "New Invalid stop_backup_lsn value %X/%X",
// (uint32) (stop_backup_lsn_tmp >> 32), (uint32) (stop_backup_lsn_tmp)); // (uint32) (stop_backup_result.lsn >> 32), (uint32) (stop_backup_result.lsn));
if (stream_wal) if (stream_wal)
{ {
@ -1701,7 +1835,7 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb
else else
xlog_path = instanceState->instance_wal_subdir_path; xlog_path = instanceState->instance_wal_subdir_path;
GetXLogSegNo(stop_backup_lsn_tmp, segno, instance_config.xlog_seg_size); GetXLogSegNo(stop_backup_result.lsn, segno, instance_config.xlog_seg_size);
/* /*
* Note, that there is no guarantee that corresponding WAL file even exists. * Note, that there is no guarantee that corresponding WAL file even exists.
@ -1721,10 +1855,10 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb
*/ */
/* stop_lsn is pointing to a 0 byte of xlog segment */ /* stop_lsn is pointing to a 0 byte of xlog segment */
if (stop_backup_lsn_tmp % instance_config.xlog_seg_size == 0) if (stop_backup_result.lsn % instance_config.xlog_seg_size == 0)
{ {
/* Wait for segment with current stop_lsn, it is ok for it to never arrive */ /* Wait for segment with current stop_lsn, it is ok for it to never arrive */
wait_wal_lsn(instanceState, stop_backup_lsn_tmp, false, backup->tli, wait_wal_lsn(instanceState, stop_backup_result.lsn, false, backup->tli,
false, true, WARNING, stream_wal, backup); false, true, WARNING, stream_wal, backup);
/* Get the first record in segment with current stop_lsn */ /* Get the first record in segment with current stop_lsn */
@ -1735,56 +1869,56 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb
/* Check that returned LSN is valid and greater than stop_lsn */ /* Check that returned LSN is valid and greater than stop_lsn */
if (XLogRecPtrIsInvalid(lsn_tmp) || if (XLogRecPtrIsInvalid(lsn_tmp) ||
!XRecOffIsValid(lsn_tmp) || !XRecOffIsValid(lsn_tmp) ||
lsn_tmp < stop_backup_lsn_tmp) lsn_tmp < stop_backup_result.lsn)
{ {
/* Backup from master should error out here */ /* Backup from master should error out here */
if (!backup->from_replica) if (!backup->from_replica)
elog(ERROR, "Failed to get next WAL record after %X/%X", elog(ERROR, "Failed to get next WAL record after %X/%X",
(uint32) (stop_backup_lsn_tmp >> 32), (uint32) (stop_backup_result.lsn >> 32),
(uint32) (stop_backup_lsn_tmp)); (uint32) (stop_backup_result.lsn));
/* No luck, falling back to looking up for previous record */ /* No luck, falling back to looking up for previous record */
elog(WARNING, "Failed to get next WAL record after %X/%X, " elog(WARNING, "Failed to get next WAL record after %X/%X, "
"looking for previous WAL record", "looking for previous WAL record",
(uint32) (stop_backup_lsn_tmp >> 32), (uint32) (stop_backup_result.lsn >> 32),
(uint32) (stop_backup_lsn_tmp)); (uint32) (stop_backup_result.lsn));
/* Despite looking for previous record there is not guarantee of success /* Despite looking for previous record there is not guarantee of success
* because previous record can be the contrecord. * because previous record can be the contrecord.
*/ */
lsn_tmp = wait_wal_lsn(instanceState, stop_backup_lsn_tmp, false, backup->tli, lsn_tmp = wait_wal_lsn(instanceState, stop_backup_result.lsn, false, backup->tli,
true, false, ERROR, stream_wal, backup); true, false, ERROR, stream_wal, backup);
/* sanity */ /* sanity */
if (!XRecOffIsValid(lsn_tmp) || XLogRecPtrIsInvalid(lsn_tmp)) if (!XRecOffIsValid(lsn_tmp) || XLogRecPtrIsInvalid(lsn_tmp))
elog(ERROR, "Failed to get WAL record prior to %X/%X", elog(ERROR, "Failed to get WAL record prior to %X/%X",
(uint32) (stop_backup_lsn_tmp >> 32), (uint32) (stop_backup_result.lsn >> 32),
(uint32) (stop_backup_lsn_tmp)); (uint32) (stop_backup_result.lsn));
} }
} }
/* stop lsn is aligned to xlog block size, just find next lsn */ /* stop lsn is aligned to xlog block size, just find next lsn */
else if (stop_backup_lsn_tmp % XLOG_BLCKSZ == 0) else if (stop_backup_result.lsn % XLOG_BLCKSZ == 0)
{ {
/* Wait for segment with current stop_lsn */ /* Wait for segment with current stop_lsn */
wait_wal_lsn(instanceState, stop_backup_lsn_tmp, false, backup->tli, wait_wal_lsn(instanceState, stop_backup_result.lsn, false, backup->tli,
false, true, ERROR, stream_wal, backup); false, true, ERROR, stream_wal, backup);
/* Get the next closest record in segment with current stop_lsn */ /* Get the next closest record in segment with current stop_lsn */
lsn_tmp = get_next_record_lsn(xlog_path, segno, backup->tli, lsn_tmp = get_next_record_lsn(xlog_path, segno, backup->tli,
instance_config.xlog_seg_size, instance_config.xlog_seg_size,
instance_config.archive_timeout, instance_config.archive_timeout,
stop_backup_lsn_tmp); stop_backup_result.lsn);
/* sanity */ /* sanity */
if (!XRecOffIsValid(lsn_tmp) || XLogRecPtrIsInvalid(lsn_tmp)) if (!XRecOffIsValid(lsn_tmp) || XLogRecPtrIsInvalid(lsn_tmp))
elog(ERROR, "Failed to get WAL record next to %X/%X", elog(ERROR, "Failed to get WAL record next to %X/%X",
(uint32) (stop_backup_lsn_tmp >> 32), (uint32) (stop_backup_result.lsn >> 32),
(uint32) (stop_backup_lsn_tmp)); (uint32) (stop_backup_result.lsn));
} }
/* PostgreSQL returned something very illegal as STOP_LSN, error out */ /* PostgreSQL returned something very illegal as STOP_LSN, error out */
else else
elog(ERROR, "Invalid stop_backup_lsn value %X/%X", elog(ERROR, "Invalid stop_backup_lsn value %X/%X",
(uint32) (stop_backup_lsn_tmp >> 32), (uint32) (stop_backup_lsn_tmp)); (uint32) (stop_backup_result.lsn >> 32), (uint32) (stop_backup_result.lsn));
/* Setting stop_backup_lsn will set stop point for streaming */ /* Setting stop_backup_lsn will set stop point for streaming */
stop_backup_lsn = lsn_tmp; stop_backup_lsn = lsn_tmp;
@ -1792,111 +1926,44 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb
} }
elog(LOG, "stop_lsn: %X/%X", elog(LOG, "stop_lsn: %X/%X",
(uint32) (stop_backup_lsn_tmp >> 32), (uint32) (stop_backup_lsn_tmp)); (uint32) (stop_backup_result.lsn >> 32), (uint32) (stop_backup_result.lsn));
/* Write backup_label and tablespace_map */ /* Write backup_label and tablespace_map */
if (!exclusive_backup) if (!exclusive_backup)
{ {
Assert(PQnfields(res) >= 4); char path[MAXPGPATH];
Assert(stop_backup_result.backup_label_content != NULL);
snprintf(path, lengthof(path), "%s/%s/%s", instanceState->instance_backup_subdir_path, snprintf(path, lengthof(path), "%s/%s/%s", instanceState->instance_backup_subdir_path,
base36enc(backup->start_time), DATABASE_DIR); base36enc(backup->start_time), DATABASE_DIR);
/* Write backup_label */ /* Write backup_label */
join_path_components(backup_label, path, PG_BACKUP_LABEL_FILE); pg_stop_backup_write_file_helper(path, PG_BACKUP_LABEL_FILE, "backup label",
fp = fio_fopen(backup_label, PG_BINARY_W, FIO_BACKUP_HOST); stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
if (fp == NULL) backup_files_list);
elog(ERROR, "can't open backup label file \"%s\": %s", free(stop_backup_result.backup_label_content);
backup_label, strerror(errno)); stop_backup_result.backup_label_content = NULL;
stop_backup_result.backup_label_content_len = 0;
len = strlen(PQgetvalue(res, 0, 3));
if (fio_fwrite(fp, PQgetvalue(res, 0, 3), len) != len ||
fio_fflush(fp) != 0 ||
fio_fclose(fp))
elog(ERROR, "can't write backup label file \"%s\": %s",
backup_label, strerror(errno));
/*
* It's vital to check if backup_files_list is initialized,
* because we could get here because the backup was interrupted
*/
if (backup_files_list)
{
file = pgFileNew(backup_label, PG_BACKUP_LABEL_FILE, true, 0,
FIO_BACKUP_HOST);
file->crc = pgFileGetCRC(backup_label, true, false);
file->write_size = file->size;
file->uncompressed_size = file->size;
parray_append(backup_files_list, file);
}
}
if (sscanf(PQgetvalue(res, 0, 0), XID_FMT, &recovery_xid) != 1)
elog(ERROR,
"result of txid_snapshot_xmax() is invalid: %s",
PQgetvalue(res, 0, 0));
if (!parse_time(PQgetvalue(res, 0, 1), &recovery_time, true))
elog(ERROR,
"result of current_timestamp is invalid: %s",
PQgetvalue(res, 0, 1));
/* Get content for tablespace_map from stop_backup results
* in case of non-exclusive backup
*/
if (!exclusive_backup)
val = PQgetvalue(res, 0, 4);
/* Write tablespace_map */ /* Write tablespace_map */
if (!exclusive_backup && val && strlen(val) > 0) if (stop_backup_result.tablespace_map_content != NULL)
{ {
char tablespace_map[MAXPGPATH]; pg_stop_backup_write_file_helper(path, PG_TABLESPACE_MAP_FILE, "tablespace map",
stop_backup_result.tablespace_map_content, stop_backup_result.tablespace_map_content_len,
join_path_components(tablespace_map, path, PG_TABLESPACE_MAP_FILE); backup_files_list);
fp = fio_fopen(tablespace_map, PG_BINARY_W, FIO_BACKUP_HOST); free(stop_backup_result.tablespace_map_content);
if (fp == NULL) stop_backup_result.tablespace_map_content = NULL;
elog(ERROR, "can't open tablespace map file \"%s\": %s", stop_backup_result.tablespace_map_content_len = 0;
tablespace_map, strerror(errno));
len = strlen(val);
if (fio_fwrite(fp, val, len) != len ||
fio_fflush(fp) != 0 ||
fio_fclose(fp))
elog(ERROR, "can't write tablespace map file \"%s\": %s",
tablespace_map, strerror(errno));
if (backup_files_list)
{
file = pgFileNew(tablespace_map, PG_TABLESPACE_MAP_FILE, true, 0,
FIO_BACKUP_HOST);
if (S_ISREG(file->mode))
{
file->crc = pgFileGetCRC(tablespace_map, true, false);
file->write_size = file->size;
}
parray_append(backup_files_list, file);
} }
} }
if (tablespace_map_content)
PQclear(tablespace_map_content);
PQclear(res);
}
/* Fill in fields if that is the correct end of backup. */
if (backup != NULL)
{
char *xlog_path,
stream_xlog_path[MAXPGPATH];
/* /*
* Wait for stop_lsn to be archived or streamed. * Wait for stop_lsn to be archived or streamed.
* If replica returned valid STOP_LSN of not actually existing record, * If replica returned valid STOP_LSN of not actually existing record,
* look for previous record with endpoint >= STOP_LSN. * look for previous record with endpoint >= STOP_LSN.
*/ */
if (!stop_lsn_exists) if (!stop_lsn_exists)
stop_backup_lsn = wait_wal_lsn(instanceState, stop_backup_lsn_tmp, false, backup->tli, stop_backup_lsn = wait_wal_lsn(instanceState, stop_backup_result.lsn, false, backup->tli,
false, false, ERROR, stream_wal, backup); false, false, ERROR, stream_wal, backup);
if (stream_wal) if (stream_wal)
@ -1916,7 +1983,7 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb
xlog_path = instanceState->instance_wal_subdir_path; xlog_path = instanceState->instance_wal_subdir_path;
backup->stop_lsn = stop_backup_lsn; backup->stop_lsn = stop_backup_lsn;
backup->recovery_xid = recovery_xid; backup->recovery_xid = stop_backup_result.snapshot_xid;
elog(LOG, "Getting the Recovery Time from WAL"); elog(LOG, "Getting the Recovery Time from WAL");
@ -1927,9 +1994,11 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb
&backup->recovery_time)) &backup->recovery_time))
{ {
elog(LOG, "Failed to find Recovery Time in WAL, forced to trust current_timestamp"); elog(LOG, "Failed to find Recovery Time in WAL, forced to trust current_timestamp");
backup->recovery_time = recovery_time; backup->recovery_time = stop_backup_result.invocation_time;
}
} }
/* Cleanup */
pg_free(query_text);
} }
/* /*

View File

@ -679,6 +679,11 @@ typedef struct BackupPageHeader2
uint16 checksum; uint16 checksum;
} BackupPageHeader2; } BackupPageHeader2;
/*
 * State bundle for the stop-backup callback.
 * NOTE(review): presumably handed to the cleanup hook that deals with an
 * unfinished pg_stop_backup() -- confirm against the code that registers it.
 */
typedef struct StopBackupCallbackState
{
	PGconn	   *conn;			/* libpq connection handle */
	int			server_version;	/* server version, kept as an integer */
} StopBackupCallbackState;
/* Special value for compressed_size field */ /* Special value for compressed_size field */
#define PageIsOk 0 #define PageIsOk 0
#define SkipCurrentPage -1 #define SkipCurrentPage -1

View File

@ -3,7 +3,7 @@
* pgut.c * pgut.c
* *
* Portions Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION * Portions Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
* Portions Copyright (c) 2017-2019, Postgres Professional * Portions Copyright (c) 2017-2021, Postgres Professional
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -902,6 +902,20 @@ pgut_strdup(const char *str)
return ret; return ret;
} }
char *
pgut_strndup(const char *str, size_t n)
{
char *ret;
if (str == NULL)
return NULL;
if ((ret = strndup(str, n)) == NULL)
elog(ERROR, "could not duplicate string \"%s\": %s",
str, strerror(errno));
return ret;
}
FILE * FILE *
pgut_fopen(const char *path, const char *mode, bool missing_ok) pgut_fopen(const char *path, const char *mode, bool missing_ok)
{ {

View File

@ -3,7 +3,7 @@
* pgut.h * pgut.h
* *
* Portions Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION * Portions Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
* Portions Copyright (c) 2017-2019, Postgres Professional * Portions Copyright (c) 2017-2021, Postgres Professional
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -61,6 +61,7 @@ extern int pgut_wait(int num, PGconn *connections[], struct timeval *timeout);
extern void *pgut_malloc(size_t size); extern void *pgut_malloc(size_t size);
extern void *pgut_realloc(void *p, size_t size); extern void *pgut_realloc(void *p, size_t size);
extern char *pgut_strdup(const char *str); extern char *pgut_strdup(const char *str);
extern char *pgut_strndup(const char *str, size_t n);
#define pgut_new(type) ((type *) pgut_malloc(sizeof(type))) #define pgut_new(type) ((type *) pgut_malloc(sizeof(type)))
#define pgut_newarray(type, n) ((type *) pgut_malloc(sizeof(type) * (n))) #define pgut_newarray(type, n) ((type *) pgut_malloc(sizeof(type) * (n)))

View File

@ -733,7 +733,7 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
# to original data # to original data
master.psql( master.psql(
"postgres", "postgres",
"insert into t_heap as select i as id, md5(i::text) as text, " "insert into t_heap select i as id, md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector " "md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(256,512) i") "from generate_series(256,512) i")
before = master.safe_psql("postgres", "SELECT * FROM t_heap") before = master.safe_psql("postgres", "SELECT * FROM t_heap")
@ -768,7 +768,7 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
# to original data # to original data
master.psql( master.psql(
"postgres", "postgres",
"insert into t_heap as select i as id, md5(i::text) as text, " "insert into t_heap select i as id, md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector " "md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(512,80680) i") "from generate_series(512,80680) i")
@ -911,6 +911,11 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
'autovacuum': 'off', 'autovacuum': 'off',
'archive_timeout': '10s'}) 'archive_timeout': '10s'})
if self.get_version(master) < self.version_to_num('9.6.0'):
self.del_test_dir(module_name, fname)
return unittest.skip(
'Skipped because backup from replica is not supported in PG 9.5')
replica = self.make_simple_node( replica = self.make_simple_node(
base_dir=os.path.join(module_name, fname, 'replica')) base_dir=os.path.join(module_name, fname, 'replica'))
replica.cleanup() replica.cleanup()
@ -956,7 +961,7 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
master.psql( master.psql(
"postgres", "postgres",
"insert into t_heap as select i as id, md5(i::text) as text, " "insert into t_heap select i as id, md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector " "md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(0,10000) i") "from generate_series(0,10000) i")

View File

@ -149,7 +149,7 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
# to original data # to original data
master.psql( master.psql(
"postgres", "postgres",
"insert into t_heap as select i as id, md5(i::text) as text, " "insert into t_heap select i as id, md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector " "md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(256,512) i") "from generate_series(256,512) i")
before = master.safe_psql("postgres", "SELECT * FROM t_heap") before = master.safe_psql("postgres", "SELECT * FROM t_heap")
@ -185,7 +185,7 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
# to original data # to original data
master.psql( master.psql(
"postgres", "postgres",
"insert into t_heap as select i as id, md5(i::text) as text, " "insert into t_heap select i as id, md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector " "md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(512,768) i") "from generate_series(512,768) i")
@ -279,7 +279,7 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
# equal to original data # equal to original data
master.psql( master.psql(
"postgres", "postgres",
"insert into t_heap as select i as id, md5(i::text) as text, " "insert into t_heap select i as id, md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector " "md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(256,25120) i") "from generate_series(256,25120) i")