mirror of
https://github.com/postgrespro/pg_probackup.git
synced 2025-02-09 14:33:17 +02:00
Merge branch 'master' into pgpro_1053_697
This commit is contained in:
commit
5f62980cb6
176
src/backup.c
176
src/backup.c
@ -31,6 +31,7 @@
|
||||
|
||||
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
|
||||
static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr;
|
||||
static XLogRecPtr stop_stream_lsn = InvalidXLogRecPtr;
|
||||
|
||||
/*
|
||||
* How long we should wait for streaming end in seconds.
|
||||
@ -45,11 +46,23 @@ const char *progname = "pg_probackup";
|
||||
/* list of files contained in backup */
|
||||
static parray *backup_files_list = NULL;
|
||||
|
||||
static pthread_mutex_t start_stream_mut = PTHREAD_MUTEX_INITIALIZER;
|
||||
/*
|
||||
* We need to wait end of WAL streaming before execute pg_stop_backup().
|
||||
*/
|
||||
typedef struct
|
||||
{
|
||||
const char *basedir;
|
||||
PGconn *conn;
|
||||
|
||||
/*
|
||||
* Return value from the thread.
|
||||
* 0 means there is no error, 1 - there is an error.
|
||||
*/
|
||||
int ret;
|
||||
} StreamThreadArg;
|
||||
|
||||
static pthread_t stream_thread;
|
||||
static StreamThreadArg stream_thread_arg = {"", NULL, 1};
|
||||
|
||||
static int is_ptrack_enable = false;
|
||||
bool is_ptrack_support = false;
|
||||
@ -423,6 +436,9 @@ remote_backup_files(void *arg)
|
||||
file->path, (unsigned long) file->write_size);
|
||||
PQfinish(file_backup_conn);
|
||||
}
|
||||
|
||||
/* Data files transferring is successful */
|
||||
arguments->ret = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -440,6 +456,7 @@ do_backup_instance(void)
|
||||
|
||||
pthread_t backup_threads[num_threads];
|
||||
backup_files_args *backup_threads_args[num_threads];
|
||||
bool backup_isok = true;
|
||||
|
||||
pgBackup *prev_backup = NULL;
|
||||
char prev_backup_filelist_path[MAXPGPATH];
|
||||
@ -540,13 +557,40 @@ do_backup_instance(void)
|
||||
join_path_components(dst_backup_path, database_path, PG_XLOG_DIR);
|
||||
dir_create_dir(dst_backup_path, DIR_PERMISSION);
|
||||
|
||||
pthread_mutex_lock(&start_stream_mut);
|
||||
pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog, dst_backup_path);
|
||||
pthread_mutex_lock(&start_stream_mut);
|
||||
if (conn == NULL)
|
||||
elog(ERROR, "Cannot continue backup because stream connect has failed.");
|
||||
stream_thread_arg.basedir = dst_backup_path;
|
||||
|
||||
pthread_mutex_unlock(&start_stream_mut);
|
||||
/*
|
||||
* Connect in replication mode to the server.
|
||||
*/
|
||||
stream_thread_arg.conn = pgut_connect_replication(pgut_dbname);
|
||||
|
||||
if (!CheckServerVersionForStreaming(stream_thread_arg.conn))
|
||||
{
|
||||
PQfinish(stream_thread_arg.conn);
|
||||
/*
|
||||
* Error message already written in CheckServerVersionForStreaming().
|
||||
* There's no hope of recovering from a version mismatch, so don't
|
||||
* retry.
|
||||
*/
|
||||
elog(ERROR, "Cannot continue backup because stream connect has failed.");
|
||||
}
|
||||
|
||||
/*
|
||||
* Identify server, obtaining start LSN position and current timeline ID
|
||||
* at the same time, necessary if not valid data can be found in the
|
||||
* existing output directory.
|
||||
*/
|
||||
if (!RunIdentifySystem(stream_thread_arg.conn, NULL, NULL, NULL, NULL))
|
||||
{
|
||||
PQfinish(stream_thread_arg.conn);
|
||||
elog(ERROR, "Cannot continue backup because stream connect has failed.");
|
||||
}
|
||||
|
||||
/* By default there are some error */
|
||||
stream_thread_arg.ret = 1;
|
||||
|
||||
pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog,
|
||||
&stream_thread_arg);
|
||||
}
|
||||
|
||||
/* initialize backup list */
|
||||
@ -652,6 +696,8 @@ do_backup_instance(void)
|
||||
arg->prev_backup_start_lsn = prev_backup_start_lsn;
|
||||
arg->thread_backup_conn = NULL;
|
||||
arg->thread_cancel_conn = NULL;
|
||||
/* By default there are some error */
|
||||
arg->ret = 1;
|
||||
backup_threads_args[i] = arg;
|
||||
}
|
||||
|
||||
@ -675,9 +721,15 @@ do_backup_instance(void)
|
||||
for (i = 0; i < num_threads; i++)
|
||||
{
|
||||
pthread_join(backup_threads[i], NULL);
|
||||
if (backup_threads_args[i]->ret == 1)
|
||||
backup_isok = false;
|
||||
|
||||
pg_free(backup_threads_args[i]);
|
||||
}
|
||||
elog(LOG, "Data files are transfered");
|
||||
if (backup_isok)
|
||||
elog(LOG, "Data files are transfered");
|
||||
else
|
||||
elog(ERROR, "Data files transferring failed");
|
||||
|
||||
/* clean previous backup file list */
|
||||
if (prev_backup_filelist)
|
||||
@ -776,10 +828,10 @@ do_backup(time_t start_time)
|
||||
is_checksum_enabled = pg_checksum_enable();
|
||||
|
||||
if (is_checksum_enabled)
|
||||
elog(LOG, "This PostgreSQL instance initialized with data block checksums. "
|
||||
elog(LOG, "This PostgreSQL instance was initialized with data block checksums. "
|
||||
"Data block corruption will be detected");
|
||||
else
|
||||
elog(WARNING, "This PostgreSQL instance initialized without data block checksums. "
|
||||
elog(WARNING, "This PostgreSQL instance was initialized without data block checksums. "
|
||||
"pg_probackup have no way to detect data block corruption without them. "
|
||||
"Reinitialize PGDATA with option '--data-checksums'.");
|
||||
|
||||
@ -1544,7 +1596,8 @@ pg_stop_backup(pgBackup *backup)
|
||||
FILE *fp;
|
||||
pgFile *file;
|
||||
size_t len;
|
||||
char *val = NULL;
|
||||
char *val = NULL;
|
||||
char *stop_backup_query = NULL;
|
||||
|
||||
/*
|
||||
* We will use this values if there are no transactions between start_lsn
|
||||
@ -1601,26 +1654,25 @@ pg_stop_backup(pgBackup *backup)
|
||||
* pg_stop_backup(false) copy of the backup label and tablespace map
|
||||
* so they can be written to disk by the caller.
|
||||
*/
|
||||
sent = pgut_send(conn,
|
||||
"SELECT"
|
||||
stop_backup_query = "SELECT"
|
||||
" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
|
||||
" current_timestamp(0)::timestamptz,"
|
||||
" lsn,"
|
||||
" labelfile,"
|
||||
" spcmapfile"
|
||||
" FROM pg_catalog.pg_stop_backup(false)",
|
||||
0, NULL, WARNING);
|
||||
" FROM pg_catalog.pg_stop_backup(false)";
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
sent = pgut_send(conn,
|
||||
"SELECT"
|
||||
stop_backup_query = "SELECT"
|
||||
" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
|
||||
" current_timestamp(0)::timestamptz,"
|
||||
" pg_catalog.pg_stop_backup() as lsn",
|
||||
0, NULL, WARNING);
|
||||
" pg_catalog.pg_stop_backup() as lsn";
|
||||
}
|
||||
|
||||
sent = pgut_send(conn, stop_backup_query, 0, NULL, WARNING);
|
||||
pg_stop_backup_is_sent = true;
|
||||
if (!sent)
|
||||
elog(ERROR, "Failed to send pg_stop_backup query");
|
||||
@ -1665,10 +1717,23 @@ pg_stop_backup(pgBackup *backup)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* Check successfull execution of pg_stop_backup() */
|
||||
if (!res)
|
||||
elog(ERROR, "pg_stop backup() failed");
|
||||
else
|
||||
{
|
||||
switch (PQresultStatus(res))
|
||||
{
|
||||
case PGRES_TUPLES_OK:
|
||||
case PGRES_COMMAND_OK:
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "query failed: %s query was: %s",
|
||||
PQerrorMessage(conn), stop_backup_query);
|
||||
}
|
||||
elog(INFO, "pg_stop backup() successfully executed");
|
||||
}
|
||||
|
||||
backup_in_progress = false;
|
||||
|
||||
@ -1771,8 +1836,12 @@ pg_stop_backup(pgBackup *backup)
|
||||
PQclear(res);
|
||||
|
||||
if (stream_wal)
|
||||
{
|
||||
/* Wait for the completion of stream */
|
||||
pthread_join(stream_thread, NULL);
|
||||
if (stream_thread_arg.ret == 1)
|
||||
elog(ERROR, "WAL streaming failed");
|
||||
}
|
||||
}
|
||||
|
||||
/* Fill in fields if that is the correct end of backup. */
|
||||
@ -1858,7 +1927,7 @@ backup_cleanup(bool fatal, void *userdata)
|
||||
*/
|
||||
if (current.status == BACKUP_STATUS_RUNNING && current.end_time == 0)
|
||||
{
|
||||
elog(INFO, "Backup %s is running, setting its status to ERROR",
|
||||
elog(WARNING, "Backup %s is running, setting its status to ERROR",
|
||||
base36enc(current.start_time));
|
||||
current.end_time = time(NULL);
|
||||
current.status = BACKUP_STATUS_ERROR;
|
||||
@ -1870,7 +1939,7 @@ backup_cleanup(bool fatal, void *userdata)
|
||||
*/
|
||||
if (backup_in_progress)
|
||||
{
|
||||
elog(LOG, "backup in progress, stop backup");
|
||||
elog(WARNING, "backup in progress, stop backup");
|
||||
pg_stop_backup(NULL); /* don't care stop_lsn on error case */
|
||||
}
|
||||
}
|
||||
@ -2012,6 +2081,8 @@ backup_files(void *arg)
|
||||
if (arguments->thread_backup_conn)
|
||||
pgut_disconnect(arguments->thread_backup_conn);
|
||||
|
||||
/* Data files transferring is successful */
|
||||
arguments->ret = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -2548,7 +2619,7 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
|
||||
|
||||
/* we assume that we get called once at the end of each segment */
|
||||
if (segment_finished)
|
||||
elog(LOG, _("finished segment at %X/%X (timeline %u)\n"),
|
||||
elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"),
|
||||
(uint32) (xlogpos >> 32), (uint32) xlogpos, timeline);
|
||||
|
||||
/*
|
||||
@ -2566,7 +2637,10 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
|
||||
if (!XLogRecPtrIsInvalid(stop_backup_lsn))
|
||||
{
|
||||
if (xlogpos > stop_backup_lsn)
|
||||
{
|
||||
stop_stream_lsn = xlogpos;
|
||||
return true;
|
||||
}
|
||||
|
||||
/* pg_stop_backup() was executed, wait for the completion of stream */
|
||||
if (stream_stop_timeout == 0)
|
||||
@ -2600,45 +2674,13 @@ StreamLog(void *arg)
|
||||
{
|
||||
XLogRecPtr startpos;
|
||||
TimeLineID starttli;
|
||||
char *basedir = (char *)arg;
|
||||
|
||||
/*
|
||||
* Connect in replication mode to the server
|
||||
*/
|
||||
if (conn == NULL)
|
||||
conn = pgut_connect_replication(pgut_dbname);
|
||||
if (!conn)
|
||||
{
|
||||
pthread_mutex_unlock(&start_stream_mut);
|
||||
/* Error message already written in GetConnection() */
|
||||
return;
|
||||
}
|
||||
|
||||
if (!CheckServerVersionForStreaming(conn))
|
||||
{
|
||||
/*
|
||||
* Error message already written in CheckServerVersionForStreaming().
|
||||
* There's no hope of recovering from a version mismatch, so don't
|
||||
* retry.
|
||||
*/
|
||||
disconnect_and_exit(1);
|
||||
}
|
||||
|
||||
/*
|
||||
* Identify server, obtaining start LSN position and current timeline ID
|
||||
* at the same time, necessary if not valid data can be found in the
|
||||
* existing output directory.
|
||||
*/
|
||||
if (!RunIdentifySystem(conn, NULL, &starttli, &startpos, NULL))
|
||||
disconnect_and_exit(1);
|
||||
|
||||
/* Ok we have normal stream connect and main process can work again */
|
||||
pthread_mutex_unlock(&start_stream_mut);
|
||||
StreamThreadArg *stream_arg = (StreamThreadArg *) arg;
|
||||
|
||||
/*
|
||||
* We must use startpos as start_lsn from start_backup
|
||||
*/
|
||||
startpos = current.start_lsn;
|
||||
starttli = current.tli;
|
||||
|
||||
/*
|
||||
* Always start streaming at the beginning of a segment
|
||||
@ -2652,7 +2694,7 @@ StreamLog(void *arg)
|
||||
/*
|
||||
* Start the replication
|
||||
*/
|
||||
elog(LOG, _("starting log streaming at %X/%X (timeline %u)\n"),
|
||||
elog(LOG, _("started streaming WAL at %X/%X (timeline %u)"),
|
||||
(uint32) (startpos >> 32), (uint32) startpos, starttli);
|
||||
|
||||
#if PG_VERSION_NUM >= 90600
|
||||
@ -2666,11 +2708,11 @@ StreamLog(void *arg)
|
||||
ctl.sysidentifier = NULL;
|
||||
|
||||
#if PG_VERSION_NUM >= 100000
|
||||
ctl.walmethod = CreateWalDirectoryMethod(basedir, 0, true);
|
||||
ctl.walmethod = CreateWalDirectoryMethod(stream_arg->basedir, 0, true);
|
||||
ctl.replication_slot = replication_slot;
|
||||
ctl.stop_socket = PGINVALID_SOCKET;
|
||||
#else
|
||||
ctl.basedir = basedir;
|
||||
ctl.basedir = (char *) stream_arg->basedir;
|
||||
#endif
|
||||
|
||||
ctl.stream_stop = stop_streaming;
|
||||
@ -2679,7 +2721,7 @@ StreamLog(void *arg)
|
||||
ctl.synchronous = false;
|
||||
ctl.mark_done = false;
|
||||
|
||||
if(ReceiveXlogStream(conn, &ctl) == false)
|
||||
if(ReceiveXlogStream(stream_arg->conn, &ctl) == false)
|
||||
elog(ERROR, "Problem in receivexlog");
|
||||
|
||||
#if PG_VERSION_NUM >= 100000
|
||||
@ -2689,14 +2731,18 @@ StreamLog(void *arg)
|
||||
#endif
|
||||
}
|
||||
#else
|
||||
if(ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
|
||||
stop_streaming, standby_message_timeout, NULL,
|
||||
false, false) == false)
|
||||
if(ReceiveXlogStream(stream_arg->conn, startpos, starttli, NULL, basedir,
|
||||
stop_streaming, standby_message_timeout, NULL,
|
||||
false, false) == false)
|
||||
elog(ERROR, "Problem in receivexlog");
|
||||
#endif
|
||||
|
||||
PQfinish(conn);
|
||||
conn = NULL;
|
||||
elog(LOG, _("finished streaming WAL at %X/%X (timeline %u)"),
|
||||
(uint32) (stop_stream_lsn >> 32), (uint32) stop_stream_lsn, starttli);
|
||||
stream_arg->ret = 0;
|
||||
|
||||
PQfinish(stream_arg->conn);
|
||||
stream_arg->conn = NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -321,9 +321,6 @@ main(int argc, char *argv[])
|
||||
if (rc != -1 && !S_ISDIR(stat_buf.st_mode))
|
||||
elog(ERROR, "-B, --backup-path must be a path to directory");
|
||||
|
||||
/* Initialize logger */
|
||||
init_logger(backup_path);
|
||||
|
||||
/* command was initialized for a few commands */
|
||||
if (command)
|
||||
{
|
||||
@ -376,6 +373,9 @@ main(int argc, char *argv[])
|
||||
pgut_readopt(path, options, ERROR);
|
||||
}
|
||||
|
||||
/* Initialize logger */
|
||||
init_logger(backup_path);
|
||||
|
||||
/*
|
||||
* We have read pgdata path from command line or from configuration file.
|
||||
* Ensure that pgdata is an absolute path.
|
||||
|
@ -252,19 +252,25 @@ typedef struct pgRecoveryTarget
|
||||
/* Union to ease operations on relation pages */
|
||||
typedef union DataPage
|
||||
{
|
||||
PageHeaderData page_data;
|
||||
char data[BLCKSZ];
|
||||
PageHeaderData page_data;
|
||||
char data[BLCKSZ];
|
||||
} DataPage;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
const char *from_root;
|
||||
const char *to_root;
|
||||
parray *backup_files_list;
|
||||
parray *prev_backup_filelist;
|
||||
XLogRecPtr prev_backup_start_lsn;
|
||||
PGconn *thread_backup_conn;
|
||||
PGcancel *thread_cancel_conn;
|
||||
parray *backup_files_list;
|
||||
parray *prev_backup_filelist;
|
||||
XLogRecPtr prev_backup_start_lsn;
|
||||
PGconn *thread_backup_conn;
|
||||
PGcancel *thread_cancel_conn;
|
||||
|
||||
/*
|
||||
* Return value from the thread.
|
||||
* 0 means there is no error, 1 - there is an error.
|
||||
*/
|
||||
int ret;
|
||||
} backup_files_args;
|
||||
|
||||
/*
|
||||
|
@ -22,6 +22,12 @@ typedef struct
|
||||
{
|
||||
parray *files;
|
||||
pgBackup *backup;
|
||||
|
||||
/*
|
||||
* Return value from the thread.
|
||||
* 0 means there is no error, 1 - there is an error.
|
||||
*/
|
||||
int ret;
|
||||
} restore_files_args;
|
||||
|
||||
/* Tablespace mapping structures */
|
||||
@ -358,6 +364,7 @@ restore_backup(pgBackup *backup)
|
||||
int i;
|
||||
pthread_t restore_threads[num_threads];
|
||||
restore_files_args *restore_threads_args[num_threads];
|
||||
bool restore_isok = true;
|
||||
|
||||
if (backup->status != BACKUP_STATUS_OK)
|
||||
elog(ERROR, "Backup %s cannot be restored because it is not valid",
|
||||
@ -403,19 +410,27 @@ restore_backup(pgBackup *backup)
|
||||
restore_files_args *arg = pg_malloc(sizeof(restore_files_args));
|
||||
arg->files = files;
|
||||
arg->backup = backup;
|
||||
/* By default there are some error */
|
||||
arg->ret = 1;
|
||||
|
||||
elog(LOG, "Start thread for num:%li", parray_num(files));
|
||||
|
||||
restore_threads_args[i] = arg;
|
||||
pthread_create(&restore_threads[i], NULL, (void *(*)(void *)) restore_files, arg);
|
||||
pthread_create(&restore_threads[i], NULL,
|
||||
(void *(*)(void *)) restore_files, arg);
|
||||
}
|
||||
|
||||
/* Wait theads */
|
||||
for (i = 0; i < num_threads; i++)
|
||||
{
|
||||
pthread_join(restore_threads[i], NULL);
|
||||
if (restore_threads_args[i]->ret == 1)
|
||||
restore_isok = false;
|
||||
|
||||
pg_free(restore_threads_args[i]);
|
||||
}
|
||||
if (!restore_isok)
|
||||
elog(ERROR, "Data files restoring failed");
|
||||
|
||||
/* cleanup */
|
||||
parray_walk(files, pgFileFree);
|
||||
@ -761,6 +776,9 @@ restore_files(void *arg)
|
||||
elog(LOG, "Restored file %s : %lu bytes",
|
||||
file->path, (unsigned long) file->write_size);
|
||||
}
|
||||
|
||||
/* Data files restoring is successful */
|
||||
arguments->ret = 0;
|
||||
}
|
||||
|
||||
/* Create recovery.conf with given recovery target parameters */
|
||||
|
@ -142,7 +142,7 @@ elog_internal(int elevel, bool file_only, const char *fmt, va_list args)
|
||||
* There is no need to lock if this is elog() from upper elog() and
|
||||
* logging is not initialized.
|
||||
*/
|
||||
if (write_to_file || write_to_error_log)
|
||||
if (write_to_file || write_to_error_log || write_to_stderr)
|
||||
pthread_mutex_lock(&log_file_mutex);
|
||||
|
||||
/* We need copy args only if we need write to error log file */
|
||||
@ -228,12 +228,25 @@ elog_internal(int elevel, bool file_only, const char *fmt, va_list args)
|
||||
va_end(std_args);
|
||||
}
|
||||
|
||||
if (write_to_file || write_to_error_log)
|
||||
if (write_to_file || write_to_error_log || write_to_stderr)
|
||||
pthread_mutex_unlock(&log_file_mutex);
|
||||
|
||||
/* Exit with code if it is an error */
|
||||
if (elevel > WARNING)
|
||||
exit(elevel);
|
||||
/*
|
||||
* Exit with code if it is an error.
|
||||
* Check for in_cleanup flag to avoid deadlock in case of ERROR in cleanup
|
||||
* routines.
|
||||
*/
|
||||
if (elevel > WARNING && !in_cleanup)
|
||||
{
|
||||
/* Interrupt other possible routines */
|
||||
interrupted = true;
|
||||
|
||||
/* If this is not the main thread then don't call exit() */
|
||||
if (!pthread_equal(main_tid, pthread_self()))
|
||||
pthread_exit(NULL);
|
||||
else
|
||||
exit(elevel);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -42,6 +42,8 @@ static char *password = NULL;
|
||||
bool prompt_password = true;
|
||||
bool force_password = false;
|
||||
|
||||
pthread_t main_tid = 0;
|
||||
|
||||
/* Database connections */
|
||||
static PGcancel *volatile cancel_conn = NULL;
|
||||
|
||||
@ -1065,6 +1067,7 @@ pgut_getopt(int argc, char **argv, pgut_option options[])
|
||||
|
||||
init_cancel_handler();
|
||||
atexit(on_cleanup);
|
||||
main_tid = pthread_self();
|
||||
|
||||
return optind;
|
||||
}
|
||||
@ -1640,7 +1643,7 @@ pgut_execute_parallel(PGconn* conn,
|
||||
elog(ERROR, "interrupted");
|
||||
|
||||
/* write query to elog if verbose */
|
||||
if (LOG_LEVEL_CONSOLE <= LOG || LOG_LEVEL_FILE <= LOG)
|
||||
if (LOG_LEVEL_CONSOLE <= VERBOSE || LOG_LEVEL_FILE <= VERBOSE)
|
||||
{
|
||||
int i;
|
||||
|
||||
@ -1694,7 +1697,7 @@ pgut_execute(PGconn* conn, const char *query, int nParams, const char **params,
|
||||
elog(ERROR, "interrupted");
|
||||
|
||||
/* write query to elog if verbose */
|
||||
if (LOG_LEVEL_CONSOLE <= LOG || LOG_LEVEL_FILE <= LOG)
|
||||
if (LOG_LEVEL_CONSOLE <= VERBOSE || LOG_LEVEL_FILE <= VERBOSE)
|
||||
{
|
||||
int i;
|
||||
|
||||
@ -1748,16 +1751,16 @@ pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int
|
||||
elog(ERROR, "interrupted");
|
||||
|
||||
/* write query to elog if verbose */
|
||||
if (LOG_LEVEL_CONSOLE <= LOG)
|
||||
if (LOG_LEVEL_CONSOLE <= VERBOSE || LOG_LEVEL_FILE <= VERBOSE)
|
||||
{
|
||||
int i;
|
||||
|
||||
if (strchr(query, '\n'))
|
||||
elog(LOG, "(query)\n%s", query);
|
||||
elog(VERBOSE, "(query)\n%s", query);
|
||||
else
|
||||
elog(LOG, "(query) %s", query);
|
||||
elog(VERBOSE, "(query) %s", query);
|
||||
for (i = 0; i < nParams; i++)
|
||||
elog(LOG, "\t(param:%d) = %s", i, params[i] ? params[i] : "(null)");
|
||||
elog(VERBOSE, "\t(param:%d) = %s", i, params[i] ? params[i] : "(null)");
|
||||
}
|
||||
|
||||
if (conn == NULL)
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "pqexpbuffer.h"
|
||||
|
||||
#include <assert.h>
|
||||
#include <pthread.h>
|
||||
#include <sys/time.h>
|
||||
|
||||
#include "logger.h"
|
||||
@ -93,6 +94,9 @@ extern const char *PROGRAM_VERSION;
|
||||
extern const char *PROGRAM_URL;
|
||||
extern const char *PROGRAM_EMAIL;
|
||||
|
||||
/* ID of the main thread */
|
||||
extern pthread_t main_tid;
|
||||
|
||||
extern void pgut_help(bool details);
|
||||
|
||||
/*
|
||||
|
@ -23,6 +23,12 @@ typedef struct
|
||||
{
|
||||
parray *files;
|
||||
bool corrupted;
|
||||
|
||||
/*
|
||||
* Return value from the thread.
|
||||
* 0 means there is no error, 1 - there is an error.
|
||||
*/
|
||||
int ret;
|
||||
} validate_files_args;
|
||||
|
||||
/*
|
||||
@ -35,6 +41,7 @@ pgBackupValidate(pgBackup *backup)
|
||||
char path[MAXPGPATH];
|
||||
parray *files;
|
||||
bool corrupted = false;
|
||||
bool validation_isok = true;
|
||||
pthread_t validate_threads[num_threads];
|
||||
validate_files_args *validate_threads_args[num_threads];
|
||||
int i;
|
||||
@ -79,8 +86,12 @@ pgBackupValidate(pgBackup *backup)
|
||||
validate_files_args *arg = pg_malloc(sizeof(validate_files_args));
|
||||
arg->files = files;
|
||||
arg->corrupted = false;
|
||||
/* By default there are some error */
|
||||
arg->ret = 1;
|
||||
|
||||
validate_threads_args[i] = arg;
|
||||
pthread_create(&validate_threads[i], NULL, (void *(*)(void *)) pgBackupValidateFiles, arg);
|
||||
pthread_create(&validate_threads[i], NULL,
|
||||
(void *(*)(void *)) pgBackupValidateFiles, arg);
|
||||
}
|
||||
|
||||
/* Wait theads */
|
||||
@ -89,8 +100,12 @@ pgBackupValidate(pgBackup *backup)
|
||||
pthread_join(validate_threads[i], NULL);
|
||||
if (validate_threads_args[i]->corrupted)
|
||||
corrupted = true;
|
||||
if (validate_threads_args[i]->ret == 1)
|
||||
validation_isok = false;
|
||||
pg_free(validate_threads_args[i]);
|
||||
}
|
||||
if (!validation_isok)
|
||||
elog(ERROR, "Data files validation failed");
|
||||
|
||||
/* cleanup */
|
||||
parray_walk(files, pgFileFree);
|
||||
@ -159,7 +174,7 @@ pgBackupValidateFiles(void *arg)
|
||||
elog(WARNING, "Cannot stat backup file \"%s\": %s",
|
||||
file->path, strerror(errno));
|
||||
arguments->corrupted = true;
|
||||
return;
|
||||
break;
|
||||
}
|
||||
|
||||
if (file->write_size != st.st_size)
|
||||
@ -168,7 +183,7 @@ pgBackupValidateFiles(void *arg)
|
||||
file->path, (unsigned long) file->write_size,
|
||||
(unsigned long) st.st_size);
|
||||
arguments->corrupted = true;
|
||||
return;
|
||||
break;
|
||||
}
|
||||
|
||||
crc = pgFileGetCRC(file);
|
||||
@ -177,9 +192,12 @@ pgBackupValidateFiles(void *arg)
|
||||
elog(WARNING, "Invalid CRC of backup file \"%s\" : %X. Expected %X",
|
||||
file->path, file->crc, crc);
|
||||
arguments->corrupted = true;
|
||||
return;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* Data files validation is successful */
|
||||
arguments->ret = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -6,6 +6,7 @@ The Test suite check behavior of pg_probackup utility, if password is required f
|
||||
import os
|
||||
import unittest
|
||||
import signal
|
||||
import time
|
||||
|
||||
from .helpers.ptrack_helpers import ProbackupTest, ProbackupException
|
||||
from testgres import StartNodeException
|
||||
@ -20,6 +21,162 @@ except ImportError:
|
||||
skip_test = True
|
||||
|
||||
|
||||
class SimpleAuthTest(ProbackupTest, unittest.TestCase):
|
||||
|
||||
# @unittest.skip("skip")
|
||||
def test_backup_via_unpriviledged_user(self):
|
||||
"""
|
||||
Make node, create unpriviledged user, try to
|
||||
run a backups without EXECUTE rights on
|
||||
certain functions
|
||||
"""
|
||||
fname = self.id().split('.')[3]
|
||||
node = self.make_simple_node(
|
||||
base_dir="{0}/{1}/node".format(module_name, fname),
|
||||
set_replication=True,
|
||||
initdb_params=['--data-checksums'],
|
||||
pg_options={
|
||||
'wal_level': 'replica',
|
||||
'max_wal_senders': '2'}
|
||||
)
|
||||
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
|
||||
self.init_pb(backup_dir)
|
||||
self.add_instance(backup_dir, 'node', node)
|
||||
self.set_archiving(backup_dir, 'node', node)
|
||||
node.start()
|
||||
|
||||
node.safe_psql("postgres", "CREATE ROLE backup with LOGIN")
|
||||
|
||||
try:
|
||||
self.backup_node(
|
||||
backup_dir, 'node', node, options=['-U', 'backup'])
|
||||
self.assertEqual(
|
||||
1, 0,
|
||||
"Expecting Error due to missing grant on EXECUTE.")
|
||||
except ProbackupException as e:
|
||||
self.assertIn(
|
||||
"ERROR: query failed: ERROR: permission denied "
|
||||
"for function pg_start_backup", e.message,
|
||||
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(
|
||||
repr(e.message), self.cmd))
|
||||
|
||||
node.safe_psql(
|
||||
"postgres",
|
||||
"GRANT EXECUTE ON FUNCTION"
|
||||
" pg_start_backup(text, boolean, boolean) TO backup;")
|
||||
|
||||
time.sleep(1)
|
||||
try:
|
||||
self.backup_node(
|
||||
backup_dir, 'node', node, options=['-U', 'backup'])
|
||||
self.assertEqual(
|
||||
1, 0,
|
||||
"Expecting Error due to missing grant on EXECUTE.")
|
||||
except ProbackupException as e:
|
||||
self.assertIn(
|
||||
"ERROR: query failed: ERROR: permission denied for function "
|
||||
"pg_create_restore_point\nquery was: "
|
||||
"SELECT pg_catalog.pg_create_restore_point($1)", e.message,
|
||||
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(
|
||||
repr(e.message), self.cmd))
|
||||
|
||||
node.safe_psql(
|
||||
"postgres",
|
||||
"GRANT EXECUTE ON FUNCTION"
|
||||
" pg_create_restore_point(text) TO backup;")
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
self.backup_node(
|
||||
backup_dir, 'node', node, options=['-U', 'backup'])
|
||||
self.assertEqual(
|
||||
1, 0,
|
||||
"Expecting Error due to missing grant on EXECUTE.")
|
||||
except ProbackupException as e:
|
||||
self.assertIn(
|
||||
"ERROR: query failed: ERROR: permission denied "
|
||||
"for function pg_stop_backup", e.message,
|
||||
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(
|
||||
repr(e.message), self.cmd))
|
||||
|
||||
if self.get_version(node) < self.version_to_num('10.0'):
|
||||
node.safe_psql(
|
||||
"postgres",
|
||||
"GRANT EXECUTE ON FUNCTION pg_stop_backup(boolean) TO backup")
|
||||
else:
|
||||
node.safe_psql(
|
||||
"postgres",
|
||||
"GRANT EXECUTE ON FUNCTION "
|
||||
"pg_stop_backup(boolean, boolean) TO backup")
|
||||
# Do this for ptrack backups
|
||||
node.safe_psql(
|
||||
"postgres",
|
||||
"GRANT EXECUTE ON FUNCTION pg_stop_backup() TO backup")
|
||||
|
||||
self.backup_node(
|
||||
backup_dir, 'node', node, options=['-U', 'backup'])
|
||||
|
||||
node.safe_psql("postgres", "CREATE DATABASE test1")
|
||||
|
||||
self.backup_node(
|
||||
backup_dir, 'node', node, options=['-U', 'backup'])
|
||||
|
||||
node.safe_psql(
|
||||
"test1", "create table t1 as select generate_series(0,100)")
|
||||
|
||||
node.append_conf("postgresql.auto.conf", "ptrack_enable = 'on'")
|
||||
node.restart()
|
||||
|
||||
try:
|
||||
self.backup_node(
|
||||
backup_dir, 'node', node, options=['-U', 'backup'])
|
||||
self.assertEqual(
|
||||
1, 0,
|
||||
"Expecting Error due to missing grant on clearing ptrack_files.")
|
||||
except ProbackupException as e:
|
||||
self.assertIn(
|
||||
"ERROR: must be superuser or replication role to clear ptrack files\n"
|
||||
"query was: SELECT pg_catalog.pg_ptrack_clear()", e.message,
|
||||
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(
|
||||
repr(e.message), self.cmd))
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
self.backup_node(
|
||||
backup_dir, 'node', node,
|
||||
backup_type='ptrack', options=['-U', 'backup'])
|
||||
self.assertEqual(
|
||||
1, 0,
|
||||
"Expecting Error due to missing grant on clearing ptrack_files.")
|
||||
except ProbackupException as e:
|
||||
self.assertIn(
|
||||
"ERROR: must be superuser or replication role read ptrack files\n"
|
||||
"query was: select pg_catalog.pg_ptrack_control_lsn()", e.message,
|
||||
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(
|
||||
repr(e.message), self.cmd))
|
||||
|
||||
node.safe_psql(
|
||||
"postgres",
|
||||
"ALTER ROLE backup REPLICATION")
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
# FULL
|
||||
self.backup_node(
|
||||
backup_dir, 'node', node,
|
||||
options=['-U', 'backup'])
|
||||
|
||||
# PTRACK
|
||||
self.backup_node(
|
||||
backup_dir, 'node', node,
|
||||
backup_type='ptrack', options=['-U', 'backup'])
|
||||
|
||||
# Clean after yourself
|
||||
self.del_test_dir(module_name, fname)
|
||||
|
||||
|
||||
class AuthTest(unittest.TestCase):
|
||||
pb = None
|
||||
node = None
|
||||
|
@ -1708,3 +1708,56 @@ class PtrackTest(ProbackupTest, unittest.TestCase):
|
||||
|
||||
# Clean after yourself
|
||||
self.del_test_dir(module_name, fname)
|
||||
|
||||
# @unittest.skip("skip")
|
||||
# @unittest.expectedFailure
|
||||
def test_atexit_fail(self):
|
||||
"""
|
||||
Take backups of every available types and check that PTRACK is clean
|
||||
"""
|
||||
fname = self.id().split('.')[3]
|
||||
node = self.make_simple_node(
|
||||
base_dir="{0}/{1}/node".format(module_name, fname),
|
||||
set_replication=True,
|
||||
initdb_params=['--data-checksums'],
|
||||
pg_options={
|
||||
'ptrack_enable': 'on',
|
||||
'wal_level': 'replica',
|
||||
'max_wal_senders': '2'})
|
||||
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
|
||||
self.init_pb(backup_dir)
|
||||
self.add_instance(backup_dir, 'node', node)
|
||||
node.start()
|
||||
|
||||
# Take FULL backup to clean every ptrack
|
||||
self.backup_node(
|
||||
backup_dir, 'node', node, options=['--stream'])
|
||||
|
||||
try:
|
||||
self.backup_node(
|
||||
backup_dir, 'node', node, backup_type='ptrack',
|
||||
options=["--stream", "-j 300"]
|
||||
)
|
||||
# we should die here because exception is what we expect to happen
|
||||
self.assertEqual(
|
||||
1, 0,
|
||||
"Expecting Error because we are opening too many connections"
|
||||
"\n Output: {0} \n CMD: {1}".format(
|
||||
repr(self.output), self.cmd)
|
||||
)
|
||||
except ProbackupException as e:
|
||||
self.assertIn(
|
||||
'setting its status to ERROR',
|
||||
e.message,
|
||||
'\n Unexpected Error Message: {0}\n'
|
||||
' CMD: {1}'.format(repr(e.message), self.cmd)
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
node.safe_psql(
|
||||
"postgres",
|
||||
"select * from pg_is_in_backup()").rstrip(),
|
||||
"f")
|
||||
|
||||
# Clean after yourself
|
||||
self.del_test_dir(module_name, fname)
|
||||
|
Loading…
x
Reference in New Issue
Block a user