1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-12-12 11:45:24 +02:00

Make replication connection outside StreamLog()

This commit is contained in:
Arthur Zakirov 2018-04-10 19:02:00 +03:00
parent ad083f827b
commit 45988aae68

View File

@ -46,14 +46,14 @@ const char *progname = "pg_probackup";
/* list of files contained in backup */ /* list of files contained in backup */
static parray *backup_files_list = NULL; static parray *backup_files_list = NULL;
static pthread_mutex_t start_stream_mut = PTHREAD_MUTEX_INITIALIZER;
/* /*
* We need to wait end of WAL streaming before execute pg_stop_backup(). * We need to wait end of WAL streaming before execute pg_stop_backup().
*/ */
typedef struct typedef struct
{ {
const char *basedir; const char *basedir;
PGconn *conn;
/* /*
* Return value from the thread. * Return value from the thread.
* 0 means there is no error, 1 - there is an error. * 0 means there is no error, 1 - there is an error.
@ -62,7 +62,7 @@ typedef struct
} StreamThreadArg; } StreamThreadArg;
static pthread_t stream_thread; static pthread_t stream_thread;
static StreamThreadArg stream_thread_arg = {"", 1}; static StreamThreadArg stream_thread_arg = {"", NULL, 1};
static int is_ptrack_enable = false; static int is_ptrack_enable = false;
bool is_ptrack_support = false; bool is_ptrack_support = false;
@ -558,17 +558,39 @@ do_backup_instance(void)
dir_create_dir(dst_backup_path, DIR_PERMISSION); dir_create_dir(dst_backup_path, DIR_PERMISSION);
stream_thread_arg.basedir = dst_backup_path; stream_thread_arg.basedir = dst_backup_path;
/*
* Connect in replication mode to the server.
*/
stream_thread_arg.conn = pgut_connect_replication(pgut_dbname);
if (!CheckServerVersionForStreaming(stream_thread_arg.conn))
{
PQfinish(stream_thread_arg.conn);
/*
* Error message already written in CheckServerVersionForStreaming().
* There's no hope of recovering from a version mismatch, so don't
* retry.
*/
elog(ERROR, "Cannot continue backup because stream connect has failed.");
}
/*
* Identify server, obtaining start LSN position and current timeline ID
* at the same time, necessary if not valid data can be found in the
* existing output directory.
*/
if (!RunIdentifySystem(stream_thread_arg.conn, NULL, NULL, NULL, NULL))
{
PQfinish(stream_thread_arg.conn);
elog(ERROR, "Cannot continue backup because stream connect has failed.");
}
/* By default there are some error */ /* By default there are some error */
stream_thread_arg.ret = 1; stream_thread_arg.ret = 1;
pthread_mutex_lock(&start_stream_mut);
pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog, pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog,
&stream_thread_arg); &stream_thread_arg);
pthread_mutex_lock(&start_stream_mut);
if (conn == NULL)
elog(ERROR, "Cannot continue backup because stream connect has failed.");
pthread_mutex_unlock(&start_stream_mut);
} }
/* initialize backup list */ /* initialize backup list */
@ -2653,43 +2675,11 @@ StreamLog(void *arg)
TimeLineID starttli; TimeLineID starttli;
StreamThreadArg *stream_arg = (StreamThreadArg *) arg; StreamThreadArg *stream_arg = (StreamThreadArg *) arg;
/*
* Connect in replication mode to the server
*/
if (conn == NULL)
conn = pgut_connect_replication(pgut_dbname);
if (!conn)
{
pthread_mutex_unlock(&start_stream_mut);
/* Error message already written in GetConnection() */
return;
}
if (!CheckServerVersionForStreaming(conn))
{
/*
* Error message already written in CheckServerVersionForStreaming().
* There's no hope of recovering from a version mismatch, so don't
* retry.
*/
disconnect_and_exit(1);
}
/*
* Identify server, obtaining start LSN position and current timeline ID
* at the same time, necessary if not valid data can be found in the
* existing output directory.
*/
if (!RunIdentifySystem(conn, NULL, &starttli, &startpos, NULL))
disconnect_and_exit(1);
/* Ok we have normal stream connect and main process can work again */
pthread_mutex_unlock(&start_stream_mut);
/* /*
* We must use startpos as start_lsn from start_backup * We must use startpos as start_lsn from start_backup
*/ */
startpos = current.start_lsn; startpos = current.start_lsn;
starttli = current.tli;
/* /*
* Always start streaming at the beginning of a segment * Always start streaming at the beginning of a segment
@ -2730,7 +2720,7 @@ StreamLog(void *arg)
ctl.synchronous = false; ctl.synchronous = false;
ctl.mark_done = false; ctl.mark_done = false;
if(ReceiveXlogStream(conn, &ctl) == false) if(ReceiveXlogStream(stream_arg->conn, &ctl) == false)
elog(ERROR, "Problem in receivexlog"); elog(ERROR, "Problem in receivexlog");
#if PG_VERSION_NUM >= 100000 #if PG_VERSION_NUM >= 100000
@ -2740,7 +2730,7 @@ StreamLog(void *arg)
#endif #endif
} }
#else #else
if(ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, if(ReceiveXlogStream(stream_arg->conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, NULL, stop_streaming, standby_message_timeout, NULL,
false, false) == false) false, false) == false)
elog(ERROR, "Problem in receivexlog"); elog(ERROR, "Problem in receivexlog");
@ -2750,8 +2740,8 @@ StreamLog(void *arg)
(uint32) (stop_stream_lsn >> 32), (uint32) stop_stream_lsn, starttli); (uint32) (stop_stream_lsn >> 32), (uint32) stop_stream_lsn, starttli);
stream_arg->ret = 0; stream_arg->ret = 0;
PQfinish(conn); PQfinish(stream_arg->conn);
conn = NULL; stream_arg->conn = NULL;
} }
/* /*