From dad2747ddc593382058358cff50969022ef54832 Mon Sep 17 00:00:00 2001 From: anastasia Date: Wed, 11 Nov 2020 22:19:09 +0300 Subject: [PATCH] Refactor code related to WAL streaming. Move it to stream.c --- Makefile | 4 +- src/backup.c | 314 +------------------------------------- src/pg_probackup.h | 7 + src/stream.c | 368 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 380 insertions(+), 313 deletions(-) create mode 100644 src/stream.c diff --git a/Makefile b/Makefile index bc37860b..1431be4e 100644 --- a/Makefile +++ b/Makefile @@ -6,8 +6,8 @@ OBJS = src/utils/configuration.o src/utils/json.o src/utils/logger.o \ OBJS += src/archive.o src/backup.o src/catalog.o src/checkdb.o src/configure.o src/data.o \ src/delete.o src/dir.o src/fetch.o src/help.o src/init.o src/merge.o \ - src/parsexlog.o src/ptrack.o src/pg_probackup.o src/restore.o src/show.o src/util.o \ - src/validate.o src/datapagemap.o + src/parsexlog.o src/ptrack.o src/pg_probackup.o src/restore.o src/show.o src/stream.o \ + src/util.o src/validate.o src/datapagemap.o # borrowed files OBJS += src/pg_crc.o src/receivelog.o src/streamutil.o \ diff --git a/src/backup.c b/src/backup.c index 931a8367..c65a2171 100644 --- a/src/backup.c +++ b/src/backup.c @@ -15,7 +15,6 @@ #endif #include "catalog/pg_tablespace.h" #include "pgtar.h" -#include "receivelog.h" #include "streamutil.h" #include @@ -25,18 +24,6 @@ #include "utils/thread.h" #include "utils/file.h" -static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ -static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr; -static XLogRecPtr stop_stream_lsn = InvalidXLogRecPtr; - -/* - * How long we should wait for streaming end in seconds. - * Retrieved as checkpoint_timeout + checkpoint_timeout * 0.1 - */ -static uint32 stream_stop_timeout = 0; -/* Time in which we started to wait for streaming end */ -static time_t stream_stop_begin = 0; - const char *progname = "pg_probackup"; /* list of files contained in backup */ @@ -45,26 +32,6 @@ static parray *backup_files_list = NULL; /* We need critical section for datapagemap_add() in case of using threads */ static pthread_mutex_t backup_pagemap_mutex = PTHREAD_MUTEX_INITIALIZER; -/* - * We need to wait end of WAL streaming before execute pg_stop_backup(). - */ -typedef struct -{ - const char *basedir; - PGconn *conn; - - /* - * Return value from the thread. - * 0 means there is no error, 1 - there is an error. - */ - int ret; - - XLogRecPtr startpos; - TimeLineID starttli; -} StreamThreadArg; - -static pthread_t stream_thread; -static StreamThreadArg stream_thread_arg = {"", NULL, 1}; bool exclusive_backup = false; @@ -86,15 +53,11 @@ static void pg_start_backup(const char *label, bool smooth, pgBackup *backup, PGNodeInfo *nodeInfo, PGconn *conn); static void pg_switch_wal(PGconn *conn); static void pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn, PGNodeInfo *nodeInfo); -static int checkpoint_timeout(PGconn *backup_conn); static XLogRecPtr wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, TimeLineID tli, bool in_prev_segment, bool segment_only, int timeout_elevel, bool in_stream_dir); -static void *StreamLog(void *arg); -static void IdentifySystem(StreamThreadArg *stream_thread_arg); - static void check_external_for_tablespaces(parray *external_list, PGconn *backup_conn); static parray *get_database_map(PGconn *pg_startbackup_conn); @@ -310,33 +273,11 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool /* start stream replication */ if (stream_wal) { - /* How long we should wait for streaming end after pg_stop_backup */ - stream_stop_timeout = checkpoint_timeout(backup_conn); - stream_stop_timeout = stream_stop_timeout + stream_stop_timeout * 0.1; - join_path_components(dst_backup_path, database_path, PG_XLOG_DIR); fio_mkdir(dst_backup_path, DIR_PERMISSION, FIO_BACKUP_HOST); - stream_thread_arg.basedir = dst_backup_path; - - /* - * Connect in replication mode to the server. - */ - stream_thread_arg.conn = pgut_connect_replication(instance_config.conn_opt.pghost, - instance_config.conn_opt.pgport, - instance_config.conn_opt.pgdatabase, - instance_config.conn_opt.pguser); - /* sanity */ - IdentifySystem(&stream_thread_arg); - - /* By default there are some error */ - stream_thread_arg.ret = 1; - /* we must use startpos as start_lsn from start_backup */ - stream_thread_arg.startpos = current.start_lsn; - stream_thread_arg.starttli = current.tli; - - thread_interrupted = false; - pthread_create(&stream_thread, NULL, StreamLog, &stream_thread_arg); + start_WAL_streaming(backup_conn, dst_backup_path, &instance_config.conn_opt, + current.start_lsn, current.tli); } /* initialize backup list */ @@ -1979,10 +1920,7 @@ pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn, if (stream_wal) { - /* Wait for the completion of stream */ - pthread_join(stream_thread, NULL); - if (stream_thread_arg.ret == 1) - elog(ERROR, "WAL streaming failed"); + wait_WAL_streaming_end(); pgBackupGetPath2(backup, stream_xlog_path, lengthof(stream_xlog_path), @@ -2009,35 +1947,6 @@ pg_stop_backup(pgBackup *backup, PGconn *pg_startbackup_conn, } } -/* - * Retrieve checkpoint_timeout GUC value in seconds. - */ -static int -checkpoint_timeout(PGconn *backup_conn) -{ - PGresult *res; - const char *val; - const char *hintmsg; - int val_int; - - res = pgut_execute(backup_conn, "show checkpoint_timeout", 0, NULL); - val = PQgetvalue(res, 0, 0); - - if (!parse_int(val, &val_int, OPTION_UNIT_S, &hintmsg)) - { - PQclear(res); - if (hintmsg) - elog(ERROR, "Invalid value of checkout_timeout %s: %s", val, - hintmsg); - else - elog(ERROR, "Invalid value of checkout_timeout %s", val); - } - - PQclear(res); - - return val_int; -} - /* * Notify end of backup to server when "backup_label" is in the root directory * of the DB cluster. @@ -2391,164 +2300,6 @@ process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno) } -/* - * Stop WAL streaming if current 'xlogpos' exceeds 'stop_backup_lsn', which is - * set by pg_stop_backup(). - * - * TODO: Add streamed file to file list when segment is finished - */ -static bool -stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) -{ - static uint32 prevtimeline = 0; - static XLogRecPtr prevpos = InvalidXLogRecPtr; - - /* check for interrupt */ - if (interrupted || thread_interrupted) - elog(ERROR, "Interrupted during WAL streaming"); - - /* we assume that we get called once at the end of each segment */ - if (segment_finished) - elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"), - (uint32) (xlogpos >> 32), (uint32) xlogpos, timeline); - - /* - * Note that we report the previous, not current, position here. After a - * timeline switch, xlogpos points to the beginning of the segment because - * that's where we always begin streaming. Reporting the end of previous - * timeline isn't totally accurate, because the next timeline can begin - * slightly before the end of the WAL that we received on the previous - * timeline, but it's close enough for reporting purposes. - */ - if (prevtimeline != 0 && prevtimeline != timeline) - elog(LOG, _("switched to timeline %u at %X/%X\n"), - timeline, (uint32) (prevpos >> 32), (uint32) prevpos); - - if (!XLogRecPtrIsInvalid(stop_backup_lsn)) - { - if (xlogpos >= stop_backup_lsn) - { - stop_stream_lsn = xlogpos; - return true; - } - - /* pg_stop_backup() was executed, wait for the completion of stream */ - if (stream_stop_begin == 0) - { - elog(INFO, "Wait for LSN %X/%X to be streamed", - (uint32) (stop_backup_lsn >> 32), (uint32) stop_backup_lsn); - - stream_stop_begin = time(NULL); - } - - if (time(NULL) - stream_stop_begin > stream_stop_timeout) - elog(ERROR, "Target LSN %X/%X could not be streamed in %d seconds", - (uint32) (stop_backup_lsn >> 32), (uint32) stop_backup_lsn, - stream_stop_timeout); - } - - prevtimeline = timeline; - prevpos = xlogpos; - - return false; -} - -/* - * Start the log streaming - */ -static void * -StreamLog(void *arg) -{ - StreamThreadArg *stream_arg = (StreamThreadArg *) arg; - - /* - * Always start streaming at the beginning of a segment - */ - stream_arg->startpos -= stream_arg->startpos % instance_config.xlog_seg_size; - - /* Initialize timeout */ - stream_stop_begin = 0; - -#if PG_VERSION_NUM >= 100000 - /* if slot name was not provided for temp slot, use default slot name */ - if (!replication_slot && temp_slot) - replication_slot = "pg_probackup_slot"; -#endif - - -#if PG_VERSION_NUM >= 110000 - /* Create temp repslot */ - if (temp_slot) - CreateReplicationSlot(stream_arg->conn, replication_slot, - NULL, temp_slot, true, true, false); -#endif - - /* - * Start the replication - */ - elog(LOG, "started streaming WAL at %X/%X (timeline %u)", - (uint32) (stream_arg->startpos >> 32), (uint32) stream_arg->startpos, - stream_arg->starttli); - -#if PG_VERSION_NUM >= 90600 - { - StreamCtl ctl; - - MemSet(&ctl, 0, sizeof(ctl)); - - ctl.startpos = stream_arg->startpos; - ctl.timeline = stream_arg->starttli; - ctl.sysidentifier = NULL; - -#if PG_VERSION_NUM >= 100000 - ctl.walmethod = CreateWalDirectoryMethod( - stream_arg->basedir, -// (instance_config.compress_alg == NONE_COMPRESS) ? 0 : instance_config.compress_level, - 0, - true); - ctl.replication_slot = replication_slot; - ctl.stop_socket = PGINVALID_SOCKET; - ctl.do_sync = false; /* We sync all files at the end of backup */ -// ctl.mark_done /* for future use in s3 */ -#if PG_VERSION_NUM >= 100000 && PG_VERSION_NUM < 110000 - ctl.temp_slot = temp_slot; -#endif -#else - ctl.basedir = (char *) stream_arg->basedir; -#endif - - ctl.stream_stop = stop_streaming; - ctl.standby_message_timeout = standby_message_timeout; - ctl.partial_suffix = NULL; - ctl.synchronous = false; - ctl.mark_done = false; - - if(ReceiveXlogStream(stream_arg->conn, &ctl) == false) - elog(ERROR, "Problem in receivexlog"); - -#if PG_VERSION_NUM >= 100000 - if (!ctl.walmethod->finish()) - elog(ERROR, "Could not finish writing WAL files: %s", - strerror(errno)); -#endif - } -#else - if(ReceiveXlogStream(stream_arg->conn, stream_arg->startpos, stream_arg->starttli, - NULL, (char *) stream_arg->basedir, stop_streaming, - standby_message_timeout, NULL, false, false) == false) - elog(ERROR, "Problem in receivexlog"); -#endif - - elog(LOG, "finished streaming WAL at %X/%X (timeline %u)", - (uint32) (stop_stream_lsn >> 32), (uint32) stop_stream_lsn, stream_arg->starttli); - stream_arg->ret = 0; - - PQfinish(stream_arg->conn); - stream_arg->conn = NULL; - - return NULL; -} - static void check_external_for_tablespaces(parray *external_list, PGconn *backup_conn) { @@ -2613,62 +2364,3 @@ check_external_for_tablespaces(parray *external_list, PGconn *backup_conn) } } } - -/* - * Run IDENTIFY_SYSTEM through a given connection and - * check system identifier and timeline are matching - */ -void -IdentifySystem(StreamThreadArg *stream_thread_arg) -{ - PGresult *res; - - uint64 stream_conn_sysidentifier = 0; - char *stream_conn_sysidentifier_str; - TimeLineID stream_conn_tli = 0; - - if (!CheckServerVersionForStreaming(stream_thread_arg->conn)) - { - PQfinish(stream_thread_arg->conn); - /* - * Error message already written in CheckServerVersionForStreaming(). - * There's no hope of recovering from a version mismatch, so don't - * retry. - */ - elog(ERROR, "Cannot continue backup because stream connect has failed."); - } - - /* - * Identify server, obtain server system identifier and timeline - */ - res = pgut_execute(stream_thread_arg->conn, "IDENTIFY_SYSTEM", 0, NULL); - - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - elog(WARNING,"Could not send replication command \"%s\": %s", - "IDENTIFY_SYSTEM", PQerrorMessage(stream_thread_arg->conn)); - PQfinish(stream_thread_arg->conn); - elog(ERROR, "Cannot continue backup because stream connect has failed."); - } - - stream_conn_sysidentifier_str = PQgetvalue(res, 0, 0); - stream_conn_tli = atoll(PQgetvalue(res, 0, 1)); - - /* Additional sanity, primary for PG 9.5, - * where system id can be obtained only via "IDENTIFY SYSTEM" - */ - if (!parse_uint64(stream_conn_sysidentifier_str, &stream_conn_sysidentifier, 0)) - elog(ERROR, "%s is not system_identifier", stream_conn_sysidentifier_str); - - if (stream_conn_sysidentifier != instance_config.system_identifier) - elog(ERROR, "System identifier mismatch. Connected PostgreSQL instance has system id: " - "" UINT64_FORMAT ". Expected: " UINT64_FORMAT ".", - stream_conn_sysidentifier, instance_config.system_identifier); - - if (stream_conn_tli != current.tli) - elog(ERROR, "Timeline identifier mismatch. " - "Connected PostgreSQL instance has timeline id: %X. Expected: %X.", - stream_conn_tli, current.tli); - - PQclear(res); -} diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 9e9fd01a..1827780a 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -1172,4 +1172,11 @@ datapagemap_is_set(datapagemap_t *map, BlockNumber blkno); extern void datapagemap_print_debug(datapagemap_t *map); +/* in stream.c */ +extern XLogRecPtr stop_backup_lsn; +extern void start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, + ConnectionOptions *conn_opt, + XLogRecPtr startpos, TimeLineID starttli); +extern void wait_WAL_streaming_end(void); + #endif /* PG_PROBACKUP_H */ diff --git a/src/stream.c b/src/stream.c new file mode 100644 index 00000000..e7d6ce68 --- /dev/null +++ b/src/stream.c @@ -0,0 +1,368 @@ +/*------------------------------------------------------------------------- + * + * stream.c: pg_probackup specific code for WAL streaming + * + * Portions Copyright (c) 2015-2020, Postgres Professional + * + *------------------------------------------------------------------------- + */ + +#include "pg_probackup.h" +#include "receivelog.h" +#include "streamutil.h" + +#include +#include + +/* + * global variable needed by ReceiveXlogStream() + * + * standby_message_timeout controls how often we send a message + * back to the primary letting it know our progress, in milliseconds. + * + * in pg_probackup we use a default setting = 10 sec + */ +static int standby_message_timeout = 10 * 1000; + +/* stop_backup_lsn is set by pg_stop_backup() to stop streaming */ +XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr; +static XLogRecPtr stop_stream_lsn = InvalidXLogRecPtr; + +/* + * How long we should wait for streaming end in seconds. + * Retrieved as checkpoint_timeout + checkpoint_timeout * 0.1 + */ +static uint32 stream_stop_timeout = 0; +/* Time in which we started to wait for streaming end */ +static time_t stream_stop_begin = 0; + +/* + * We need to wait end of WAL streaming before execute pg_stop_backup(). + */ +typedef struct +{ + const char *basedir; + PGconn *conn; + + /* + * Return value from the thread. + * 0 means there is no error, 1 - there is an error. + */ + int ret; + + XLogRecPtr startpos; + TimeLineID starttli; +} StreamThreadArg; + +static pthread_t stream_thread; +static StreamThreadArg stream_thread_arg = {"", NULL, 1}; + +static void IdentifySystem(StreamThreadArg *stream_thread_arg); +static int checkpoint_timeout(PGconn *backup_conn); +static void *StreamLog(void *arg); +static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline, + bool segment_finished); + +/* + * Run IDENTIFY_SYSTEM through a given connection and + * check system identifier and timeline are matching + */ +static void +IdentifySystem(StreamThreadArg *stream_thread_arg) +{ + PGresult *res; + + uint64 stream_conn_sysidentifier = 0; + char *stream_conn_sysidentifier_str; + TimeLineID stream_conn_tli = 0; + + if (!CheckServerVersionForStreaming(stream_thread_arg->conn)) + { + PQfinish(stream_thread_arg->conn); + /* + * Error message already written in CheckServerVersionForStreaming(). + * There's no hope of recovering from a version mismatch, so don't + * retry. + */ + elog(ERROR, "Cannot continue backup because stream connect has failed."); + } + + /* + * Identify server, obtain server system identifier and timeline + */ + res = pgut_execute(stream_thread_arg->conn, "IDENTIFY_SYSTEM", 0, NULL); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + elog(WARNING,"Could not send replication command \"%s\": %s", + "IDENTIFY_SYSTEM", PQerrorMessage(stream_thread_arg->conn)); + PQfinish(stream_thread_arg->conn); + elog(ERROR, "Cannot continue backup because stream connect has failed."); + } + + stream_conn_sysidentifier_str = PQgetvalue(res, 0, 0); + stream_conn_tli = atoll(PQgetvalue(res, 0, 1)); + + /* Additional sanity, primary for PG 9.5, + * where system id can be obtained only via "IDENTIFY SYSTEM" + */ + if (!parse_uint64(stream_conn_sysidentifier_str, &stream_conn_sysidentifier, 0)) + elog(ERROR, "%s is not system_identifier", stream_conn_sysidentifier_str); + + if (stream_conn_sysidentifier != instance_config.system_identifier) + elog(ERROR, "System identifier mismatch. Connected PostgreSQL instance has system id: " + "" UINT64_FORMAT ". Expected: " UINT64_FORMAT ".", + stream_conn_sysidentifier, instance_config.system_identifier); + + if (stream_conn_tli != current.tli) + elog(ERROR, "Timeline identifier mismatch. " + "Connected PostgreSQL instance has timeline id: %X. Expected: %X.", + stream_conn_tli, current.tli); + + PQclear(res); +} + +/* + * Retrieve checkpoint_timeout GUC value in seconds. + */ +static int +checkpoint_timeout(PGconn *backup_conn) +{ + PGresult *res; + const char *val; + const char *hintmsg; + int val_int; + + res = pgut_execute(backup_conn, "show checkpoint_timeout", 0, NULL); + val = PQgetvalue(res, 0, 0); + + if (!parse_int(val, &val_int, OPTION_UNIT_S, &hintmsg)) + { + PQclear(res); + if (hintmsg) + elog(ERROR, "Invalid value of checkout_timeout %s: %s", val, + hintmsg); + else + elog(ERROR, "Invalid value of checkout_timeout %s", val); + } + + PQclear(res); + + return val_int; +} + +/* + * Start the log streaming + */ +static void * +StreamLog(void *arg) +{ + StreamThreadArg *stream_arg = (StreamThreadArg *) arg; + + /* + * Always start streaming at the beginning of a segment + */ + stream_arg->startpos -= stream_arg->startpos % instance_config.xlog_seg_size; + + /* Initialize timeout */ + stream_stop_begin = 0; + +#if PG_VERSION_NUM >= 100000 + /* if slot name was not provided for temp slot, use default slot name */ + if (!replication_slot && temp_slot) + replication_slot = "pg_probackup_slot"; +#endif + + +#if PG_VERSION_NUM >= 110000 + /* Create temp repslot */ + if (temp_slot) + CreateReplicationSlot(stream_arg->conn, replication_slot, + NULL, temp_slot, true, true, false); +#endif + + /* + * Start the replication + */ + elog(LOG, "started streaming WAL at %X/%X (timeline %u)", + (uint32) (stream_arg->startpos >> 32), (uint32) stream_arg->startpos, + stream_arg->starttli); + +#if PG_VERSION_NUM >= 90600 + { + StreamCtl ctl; + + MemSet(&ctl, 0, sizeof(ctl)); + + ctl.startpos = stream_arg->startpos; + ctl.timeline = stream_arg->starttli; + ctl.sysidentifier = NULL; + +#if PG_VERSION_NUM >= 100000 + ctl.walmethod = CreateWalDirectoryMethod( + stream_arg->basedir, +// (instance_config.compress_alg == NONE_COMPRESS) ? 0 : instance_config.compress_level, + 0, + true); + ctl.replication_slot = replication_slot; + ctl.stop_socket = PGINVALID_SOCKET; + ctl.do_sync = false; /* We sync all files at the end of backup */ +// ctl.mark_done /* for future use in s3 */ +#if PG_VERSION_NUM >= 100000 && PG_VERSION_NUM < 110000 + ctl.temp_slot = temp_slot; +#endif +#else + ctl.basedir = (char *) stream_arg->basedir; +#endif + + ctl.stream_stop = stop_streaming; + ctl.standby_message_timeout = standby_message_timeout; + ctl.partial_suffix = NULL; + ctl.synchronous = false; + ctl.mark_done = false; + + if(ReceiveXlogStream(stream_arg->conn, &ctl) == false) + elog(ERROR, "Problem in receivexlog"); + +#if PG_VERSION_NUM >= 100000 + if (!ctl.walmethod->finish()) + elog(ERROR, "Could not finish writing WAL files: %s", + strerror(errno)); +#endif + } +#else + if(ReceiveXlogStream(stream_arg->conn, stream_arg->startpos, stream_arg->starttli, + NULL, (char *) stream_arg->basedir, stop_streaming, + standby_message_timeout, NULL, false, false) == false) + elog(ERROR, "Problem in receivexlog"); +#endif + + elog(LOG, "finished streaming WAL at %X/%X (timeline %u)", + (uint32) (stop_stream_lsn >> 32), (uint32) stop_stream_lsn, stream_arg->starttli); + stream_arg->ret = 0; + + PQfinish(stream_arg->conn); + stream_arg->conn = NULL; + + return NULL; +} + +/* + * for ReceiveXlogStream + * + * The stream_stop callback will be called every time data + * is received, and whenever a segment is completed. If it returns + * true, the streaming will stop and the function + * return. As long as it returns false, streaming will continue + * indefinitely. + * + * Stop WAL streaming if current 'xlogpos' exceeds 'stop_backup_lsn', which is + * set by pg_stop_backup(). + * + */ +static bool +stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) +{ + static uint32 prevtimeline = 0; + static XLogRecPtr prevpos = InvalidXLogRecPtr; + + /* check for interrupt */ + if (interrupted || thread_interrupted) + elog(ERROR, "Interrupted during WAL streaming"); + + /* we assume that we get called once at the end of each segment */ + if (segment_finished) + { + elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"), + (uint32) (xlogpos >> 32), (uint32) xlogpos, timeline); + + /* TODO Add streamed file to file list */ + } + + /* + * Note that we report the previous, not current, position here. After a + * timeline switch, xlogpos points to the beginning of the segment because + * that's where we always begin streaming. Reporting the end of previous + * timeline isn't totally accurate, because the next timeline can begin + * slightly before the end of the WAL that we received on the previous + * timeline, but it's close enough for reporting purposes. + */ + if (prevtimeline != 0 && prevtimeline != timeline) + elog(LOG, _("switched to timeline %u at %X/%X\n"), + timeline, (uint32) (prevpos >> 32), (uint32) prevpos); + + if (!XLogRecPtrIsInvalid(stop_backup_lsn)) + { + if (xlogpos >= stop_backup_lsn) + { + stop_stream_lsn = xlogpos; + return true; + } + + /* pg_stop_backup() was executed, wait for the completion of stream */ + if (stream_stop_begin == 0) + { + elog(INFO, "Wait for LSN %X/%X to be streamed", + (uint32) (stop_backup_lsn >> 32), (uint32) stop_backup_lsn); + + stream_stop_begin = time(NULL); + } + + if (time(NULL) - stream_stop_begin > stream_stop_timeout) + elog(ERROR, "Target LSN %X/%X could not be streamed in %d seconds", + (uint32) (stop_backup_lsn >> 32), (uint32) stop_backup_lsn, + stream_stop_timeout); + } + + prevtimeline = timeline; + prevpos = xlogpos; + + return false; +} + + +/* --- External API --- */ + +/* + * Maybe add a StreamOptions struct ? + * Backup conn only needed to calculate stream_stop_timeout. Think about refactoring it. + */ +void +start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOptions *conn_opt, + XLogRecPtr startpos, TimeLineID starttli) +{ + /* How long we should wait for streaming end after pg_stop_backup */ + stream_stop_timeout = checkpoint_timeout(backup_conn); + //TODO Add a comment about this calculation + stream_stop_timeout = stream_stop_timeout + stream_stop_timeout * 0.1; + + stream_thread_arg.basedir = stream_dst_path; + + /* + * Connect in replication mode to the server. + */ + stream_thread_arg.conn = pgut_connect_replication(conn_opt->pghost, + conn_opt->pgport, + conn_opt->pgdatabase, + conn_opt->pguser); + /* sanity check*/ + IdentifySystem(&stream_thread_arg); + + /* Set error exit code as default */ + stream_thread_arg.ret = 1; + /* we must use startpos as start_lsn from start_backup */ + stream_thread_arg.startpos = current.start_lsn; + stream_thread_arg.starttli = current.tli; + + thread_interrupted = false; + pthread_create(&stream_thread, NULL, StreamLog, &stream_thread_arg); +} + +/* Wait for the completion of stream */ +void +wait_WAL_streaming_end(void) +{ + pthread_join(stream_thread, NULL); + if (stream_thread_arg.ret == 1) + elog(ERROR, "WAL streaming failed"); +}