diff --git a/backup.c b/backup.c index fc60495b..2ccdd728 100644 --- a/backup.c +++ b/backup.c @@ -28,6 +28,15 @@ static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr; + +/* + * How long we should wait for streaming end in seconds. + * Retreived as checkpoint_timeout + checkpoint_timeout * 0.1 + */ +static uint32 stream_stop_timeout = 0; +/* Time in which we started to wait for streaming end */ +static time_t stream_stop_begin = 0; + const char *progname = "pg_probackup"; /* list of files contained in backup */ @@ -65,6 +74,7 @@ static void do_backup_database(parray *backup_list); static void pg_start_backup(const char *label, bool smooth, pgBackup *backup); static void pg_switch_wal(void); static void pg_stop_backup(pgBackup *backup); +static int checkpoint_timeout(void); static void add_pgdata_files(parray *files, const char *root); static void write_backup_file_list(parray *files, const char *root); @@ -785,8 +795,12 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, } /* - * Wait for target 'lsn' to be archived in archive 'wal' directory with - * WAL segment file. + * Wait for target 'lsn'. + * + * If current backup started in archive mode wait for 'lsn' to be archived in + * archive 'wal' directory with WAL segment file. + * If current backup started in stream mode wait for 'lsn' to be streamed in + * 'pg_xlog' directory. */ static void wait_wal_lsn(XLogRecPtr lsn) @@ -807,29 +821,12 @@ wait_wal_lsn(XLogRecPtr lsn) if (stream_wal) { - PGresult *res; - const char *val; - const char *hintmsg; - pgBackupGetPath2(¤t, wal_dir, lengthof(wal_dir), DATABASE_DIR, PG_XLOG_DIR); join_path_components(wal_segment_full_path, wal_dir, wal_segment); - res = pgut_execute(backup_conn, "show checkpoint_timeout", 0, NULL); - val = PQgetvalue(res, 0, 0); - PQclear(res); - - if (!parse_int(val, (int *) &timeout, OPTION_UNIT_S, - &hintmsg)) - { - if (hintmsg) - elog(ERROR, "Invalid value of checkout_timeout %s: %s", val, - hintmsg); - else - elog(ERROR, "Invalid value of checkout_timeout %s", val); - } - /* Add 3 seconds to the initial value of checkpoint_timeout */ - timeout = timeout + 3; + timeout = (uint32) checkpoint_timeout(); + timeout = timeout + timeout * 0.1; } else { @@ -837,7 +834,7 @@ wait_wal_lsn(XLogRecPtr lsn) timeout = archive_timeout; } - /* Wait until target LSN is archived */ + /* Wait until target LSN is archived or streamed */ while (true) { bool file_exists = fileExists(wal_segment_full_path); @@ -845,7 +842,7 @@ wait_wal_lsn(XLogRecPtr lsn) if (file_exists) { /* - * WAL segment was archived. Check LSN on it. + * A WAL segment found. Check LSN on it. */ if ((stream_wal && wal_contains_lsn(wal_dir, lsn, tli)) || (!stream_wal && wal_contains_lsn(arclog_path, lsn, tli))) @@ -996,11 +993,12 @@ pg_stop_backup(pgBackup *backup) PQclear(res); if (stream_wal) - { /* Wait for the completion of stream */ - elog(INFO, "Wait end of WAL streaming"); pthread_join(stream_thread, NULL); - } + /* + * Wait for stop_lsn to be archived or streamed. + * We wait for stop_lsn in stream mode just in case. + */ wait_wal_lsn(stop_backup_lsn); /* Fill in fields if that is the correct end of backup. */ @@ -1032,6 +1030,33 @@ pg_stop_backup(pgBackup *backup) } } +/* + * Retreive checkpoint_timeout GUC value in seconds. + */ +static int +checkpoint_timeout(void) +{ + PGresult *res; + const char *val; + const char *hintmsg; + int val_int; + + res = pgut_execute(backup_conn, "show checkpoint_timeout", 0, NULL); + val = PQgetvalue(res, 0, 0); + PQclear(res); + + if (!parse_int(val, &val_int, OPTION_UNIT_S, &hintmsg)) + { + if (hintmsg) + elog(ERROR, "Invalid value of checkout_timeout %s: %s", val, + hintmsg); + else + elog(ERROR, "Invalid value of checkout_timeout %s", val); + } + + return val_int; +} + /* * Return true if the path is a existing regular file. */ @@ -1647,8 +1672,28 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) elog(LOG, _("switched to timeline %u at %X/%X\n"), timeline, (uint32) (prevpos >> 32), (uint32) prevpos); - if (stop_backup_lsn != InvalidXLogRecPtr && xlogpos > stop_backup_lsn) - return true; + if (!XLogRecPtrIsInvalid(stop_backup_lsn)) + { + if (xlogpos > stop_backup_lsn) + return true; + + /* pg_stop_backup() was executed, wait for the completion of stream */ + if (stream_stop_timeout == 0) + { + elog(INFO, "Wait for LSN %X/%X to be streamed", + (uint32) (stop_backup_lsn >> 32), (uint32) stop_backup_lsn); + + stream_stop_timeout = checkpoint_timeout(); + stream_stop_timeout = stream_stop_timeout + stream_stop_timeout * 0.1; + + stream_stop_begin = time(NULL); + } + + if (time(NULL) - stream_stop_begin > stream_stop_timeout) + elog(ERROR, "Target LSN %X/%X could not be streamed in %d seconds", + (uint32) (stop_backup_lsn >> 32), (uint32) stop_backup_lsn, + stream_stop_timeout); + } prevtimeline = timeline; prevpos = xlogpos; @@ -1709,6 +1754,10 @@ StreamLog(void *arg) */ startpos -= startpos % XLOG_SEG_SIZE; + /* Initialize timeout */ + stream_stop_timeout = 0; + stream_stop_begin = 0; + /* * Start the replication */