mirror of
https://github.com/postgrespro/pg_probackup.git
synced 2025-02-13 14:58:35 +02:00
send pg_stop_backup() asynchronously, cancel query if no answer in 300 seconds
This commit is contained in:
parent
1a9dd343d2
commit
ab3b160a1a
60
backup.c
60
backup.c
@ -909,6 +909,9 @@ pg_stop_backup(pgBackup *backup)
|
||||
uint32 xlogid;
|
||||
uint32 xrecoff;
|
||||
XLogRecPtr restore_lsn;
|
||||
bool sent = false;
|
||||
int pg_stop_backup_timeout = 0;
|
||||
int is_busy = 1;
|
||||
|
||||
/*
|
||||
* We will use this values if there are no transactions between start_lsn
|
||||
@ -999,23 +1002,72 @@ pg_stop_backup(pgBackup *backup)
|
||||
pfree(backup_id);
|
||||
}
|
||||
|
||||
/*
|
||||
* send pg_stop_backup asynchronously because we could came
|
||||
* here from backup_cleanup() after some error caused by
|
||||
* postgres archive_command problem and in this case we will
|
||||
* wait for pg_stop_backup() forever.
|
||||
*/
|
||||
if (!exclusive_backup)
|
||||
/*
|
||||
* Stop the non-exclusive backup. Besides stop_lsn it returns from
|
||||
* pg_stop_backup(false) copy of the backup label and tablespace map
|
||||
* so they can be written to disk by the caller.
|
||||
*/
|
||||
res = pgut_execute(backup_conn,
|
||||
sent = pgut_send(backup_conn,
|
||||
"SELECT *, txid_snapshot_xmax(txid_current_snapshot()),"
|
||||
" current_timestamp(0)::timestamp"
|
||||
" FROM pg_stop_backup(false)",
|
||||
0, NULL);
|
||||
0, NULL, WARNING);
|
||||
else
|
||||
res = pgut_execute(backup_conn,
|
||||
sent = pgut_send(backup_conn,
|
||||
"SELECT *, txid_snapshot_xmax(txid_current_snapshot()),"
|
||||
" current_timestamp(0)::timestamp"
|
||||
" FROM pg_stop_backup()",
|
||||
0, NULL);
|
||||
0, NULL, WARNING);
|
||||
|
||||
if (!sent)
|
||||
elog(WARNING, "Failed to send pg_stop_backup query");
|
||||
|
||||
|
||||
/*
|
||||
* Wait for the result of pg_stop_backup(),
|
||||
* but no longer than PG_STOP_BACKUP_TIMEOUT seconds
|
||||
*/
|
||||
elog(INFO, "wait for pg_stop_backup()");
|
||||
do
|
||||
{
|
||||
/*
|
||||
* PQisBusy returns 1 if a command is busy, that is, PQgetResult would
|
||||
* block waiting for input. A 0 return indicates that PQgetResult can
|
||||
* be called with assurance of not blocking
|
||||
*/
|
||||
is_busy = PQisBusy(backup_conn);
|
||||
pg_stop_backup_timeout++;
|
||||
sleep(1);
|
||||
|
||||
if (interrupted)
|
||||
{
|
||||
pgut_cancel(backup_conn);
|
||||
elog(ERROR, "interrupted during waiting for pg_stop_backup");
|
||||
}
|
||||
|
||||
} while (is_busy && pg_stop_backup_timeout < PG_STOP_BACKUP_TIMEOUT);
|
||||
|
||||
/*
|
||||
* If postgres haven't answered in PG_STOP_BACKUP_TIMEOUT seconds,
|
||||
* send an interrupt.
|
||||
*/
|
||||
if (is_busy)
|
||||
{
|
||||
pgut_cancel(backup_conn);
|
||||
elog(ERROR, "pg_stop_backup doesn't finish in 300 seconds.");
|
||||
}
|
||||
|
||||
res = PQgetResult(backup_conn);
|
||||
|
||||
if (!res)
|
||||
elog(ERROR, "pg_stop backup() failed");
|
||||
|
||||
backup_in_progress = false;
|
||||
|
||||
|
19
data.c
19
data.c
@ -570,8 +570,6 @@ restore_data_file(const char *from_root,
|
||||
if (uncompressed_size != BLCKSZ)
|
||||
elog(ERROR, "page uncompressed to %ld bytes. != BLCKSZ", uncompressed_size);
|
||||
}
|
||||
else
|
||||
memcpy(page.data, compressed_page.data, BLCKSZ);
|
||||
|
||||
/*
|
||||
* Seek and write the restored page.
|
||||
@ -580,9 +578,20 @@ restore_data_file(const char *from_root,
|
||||
if (fseek(out, blknum * BLCKSZ, SEEK_SET) < 0)
|
||||
elog(ERROR, "cannot seek block %u of \"%s\": %s",
|
||||
blknum, to_path, strerror(errno));
|
||||
if (fwrite(page.data, 1, sizeof(page), out) != sizeof(page))
|
||||
elog(ERROR, "cannot write block %u of \"%s\": %s",
|
||||
blknum, file->path, strerror(errno));
|
||||
|
||||
if (header.compressed_size < BLCKSZ)
|
||||
{
|
||||
if (fwrite(page.data, 1, BLCKSZ, out) != BLCKSZ)
|
||||
elog(ERROR, "cannot write block %u of \"%s\": %s",
|
||||
blknum, file->path, strerror(errno));
|
||||
}
|
||||
else
|
||||
{
|
||||
/* if page wasn't compressed, we've read full block */
|
||||
if (fwrite(compressed_page.data, 1, BLCKSZ, out) != BLCKSZ)
|
||||
elog(ERROR, "cannot write block %u of \"%s\": %s",
|
||||
blknum, file->path, strerror(errno));
|
||||
}
|
||||
}
|
||||
|
||||
/* update file permission */
|
||||
|
@ -33,6 +33,7 @@
|
||||
|
||||
#include "datapagemap.h"
|
||||
|
||||
# define PG_STOP_BACKUP_TIMEOUT 300
|
||||
/*
|
||||
* Macro needed to parse ptrack.
|
||||
* NOTE Keep those values syncronised with definitions in ptrack.h
|
||||
|
18
utils/pgut.c
18
utils/pgut.c
@ -1174,6 +1174,24 @@ pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
pgut_cancel(PGconn* conn)
|
||||
{
|
||||
PGcancel *cancel_conn = PQgetCancel(conn);
|
||||
char errbuf[256];
|
||||
|
||||
if (cancel_conn != NULL)
|
||||
{
|
||||
if (PQcancel(cancel_conn, errbuf, sizeof(errbuf)))
|
||||
elog(WARNING, "Cancel request sent");
|
||||
else
|
||||
elog(WARNING, "Cancel request failed");
|
||||
}
|
||||
|
||||
if (cancel_conn)
|
||||
PQfreeCancel(cancel_conn);
|
||||
}
|
||||
|
||||
int
|
||||
pgut_wait(int num, PGconn *connections[], struct timeval *timeout)
|
||||
{
|
||||
|
@ -122,6 +122,7 @@ extern PGconn *pgut_connect_extended(const char *pghost, const char *pgport,
|
||||
extern void pgut_disconnect(PGconn *conn);
|
||||
extern PGresult *pgut_execute(PGconn* conn, const char *query, int nParams, const char **params);
|
||||
extern bool pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int elevel);
|
||||
extern void pgut_cancel(PGconn* conn);
|
||||
extern int pgut_wait(int num, PGconn *connections[], struct timeval *timeout);
|
||||
|
||||
extern const char *pgut_get_host(void);
|
||||
|
Loading…
x
Reference in New Issue
Block a user