1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-01-09 14:45:47 +02:00

Wait checkpoint_timeout + 3s for LSN in stream mode

This commit is contained in:
Arthur Zakirov 2017-05-25 14:05:48 +03:00
parent 82ca5a0213
commit aa9764d8e9
4 changed files with 271 additions and 32 deletions

View File

@ -33,7 +33,11 @@ const char *progname = "pg_probackup";
/* list of files contained in backup */
static parray *backup_files_list = NULL;
static pthread_mutex_t check_stream_mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t start_stream_mut = PTHREAD_MUTEX_INITIALIZER;
/*
* We need to wait end of WAL streaming before execute pg_stop_backup().
*/
static pthread_t stream_thread;
static int is_ptrack_enable = false;
@ -105,7 +109,6 @@ do_backup_database(parray *backup_list)
XLogRecPtr prev_backup_start_lsn = InvalidXLogRecPtr;
pthread_t backup_threads[num_threads];
pthread_t stream_thread;
backup_files_args *backup_threads_args[num_threads];
pgBackup *prev_backup = NULL;
@ -142,24 +145,6 @@ do_backup_database(parray *backup_list)
strncat(label, " with pg_probackup", lengthof(label));
pg_start_backup(label, smooth_checkpoint, &current);
pgBackupGetPath(&current, database_path, lengthof(database_path),
DATABASE_DIR);
/* start stream replication */
if (stream_wal)
{
join_path_components(dst_backup_path, database_path, PG_XLOG_DIR);
dir_create_dir(dst_backup_path, DIR_PERMISSION);
pthread_mutex_lock(&check_stream_mut);
pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog, dst_backup_path);
pthread_mutex_lock(&check_stream_mut);
if (conn == NULL)
elog(ERROR, "Cannot continue backup because stream connect has failed.");
pthread_mutex_unlock(&check_stream_mut);
}
/*
* If backup_label does not exist in $PGDATA, stop taking backup.
* NOTE. We can check it only on master, though.
@ -178,6 +163,24 @@ do_backup_database(parray *backup_list)
}
}
pgBackupGetPath(&current, database_path, lengthof(database_path),
DATABASE_DIR);
/* start stream replication */
if (stream_wal)
{
join_path_components(dst_backup_path, database_path, PG_XLOG_DIR);
dir_create_dir(dst_backup_path, DIR_PERMISSION);
pthread_mutex_lock(&start_stream_mut);
pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog, dst_backup_path);
pthread_mutex_lock(&start_stream_mut);
if (conn == NULL)
elog(ERROR, "Cannot continue backup because stream connect has failed.");
pthread_mutex_unlock(&start_stream_mut);
}
/*
* To take incremental backup get the filelist of the last completed database
*/
@ -299,7 +302,7 @@ do_backup_database(parray *backup_list)
pthread_create(&backup_threads[i], NULL, (void *(*)(void *)) backup_files, backup_threads_args[i]);
}
/* Wait theads */
/* Wait threads */
for (i = 0; i < num_threads; i++)
{
pthread_join(backup_threads[i], NULL);
@ -322,9 +325,6 @@ do_backup_database(parray *backup_list)
parray *xlog_files_list;
char pg_xlog_path[MAXPGPATH];
/* Wait for the completion of stream */
pthread_join(stream_thread, NULL);
/* Scan backup PG_XLOG_DIR */
xlog_files_list = parray_new();
join_path_components(pg_xlog_path, database_path, PG_XLOG_DIR);
@ -796,7 +796,8 @@ wait_wal_lsn(XLogRecPtr lsn)
char wal_dir[MAXPGPATH],
wal_segment_full_path[MAXPGPATH];
char wal_segment[MAXFNAMELEN];
uint32 try_count = 0;
uint32 try_count = 0,
timeout;
tli = get_current_timeline(false);
@ -806,12 +807,35 @@ wait_wal_lsn(XLogRecPtr lsn)
if (stream_wal)
{
PGresult *res;
const char *val;
const char *hintmsg;
pgBackupGetPath2(&current, wal_dir, lengthof(wal_dir),
DATABASE_DIR, PG_XLOG_DIR);
join_path_components(wal_segment_full_path, wal_dir, wal_segment);
res = pgut_execute(backup_conn, "show checkpoint_timeout", 0, NULL);
val = PQgetvalue(res, 0, 0);
PQclear(res);
if (!parse_int(val, (int *) &timeout, OPTION_UNIT_S,
&hintmsg))
{
if (hintmsg)
elog(ERROR, "Invalid value of checkout_timeout %s: %s", val,
hintmsg);
else
elog(ERROR, "Invalid value of checkout_timeout %s", val);
}
/* Add 3 seconds to the initial value of checkpoint_timeout */
timeout = timeout + 3;
}
else
{
join_path_components(wal_segment_full_path, arclog_path, wal_segment);
timeout = archive_timeout;
}
/* Wait until switched WAL is archived */
while (!fileExists(wal_segment_full_path))
@ -826,10 +850,10 @@ wait_wal_lsn(XLogRecPtr lsn)
elog(INFO, "wait for LSN %X/%X in archived WAL segment %s",
(uint32) (lsn >> 32), (uint32) lsn, wal_segment_full_path);
if (archive_timeout > 0 && try_count > archive_timeout)
if (timeout > 0 && try_count > timeout)
elog(ERROR,
"switched WAL segment %s could not be archived in %d seconds",
wal_segment, archive_timeout);
wal_segment, timeout);
}
/*
@ -959,6 +983,9 @@ pg_stop_backup(pgBackup *backup)
PQclear(res);
if (stream_wal)
/* Wait for the completion of stream */
pthread_join(stream_thread, NULL);
wait_wal_lsn(stop_backup_lsn);
/* Fill in fields if that is the correct end of backup. */
@ -1631,7 +1658,7 @@ StreamLog(void *arg)
conn = GetConnection();
if (!conn)
{
pthread_mutex_unlock(&check_stream_mut);
pthread_mutex_unlock(&start_stream_mut);
/* Error message already written in GetConnection() */
return;
}
@ -1655,7 +1682,7 @@ StreamLog(void *arg)
disconnect_and_exit(1);
/* Ok we have normal stream connect and main process can work again */
pthread_mutex_unlock(&check_stream_mut);
pthread_mutex_unlock(&start_stream_mut);
/*
* We must use startpos as start_lsn from start_backup

2
show.c
View File

@ -253,7 +253,7 @@ show_backup_list(FILE *out, parray *backup_list)
parent_tli = get_parent_tli(backup->tli);
backup_id = base36enc(backup->start_time);
fprintf(out, "%-6s %-19s %-6s %-7s %3d / %-3d %5s %6s %2X/%08X %2X/%08X %-8s\n",
fprintf(out, "%-6s %-19s %-6s %-7s %3d / %-3d %5s %6s %2X/%-8X %2X/%-8X %-8s\n",
backup_id,
timestamp,
pgBackupGetBackupMode(backup),

View File

@ -2,7 +2,8 @@
*
* pgut.c
*
* Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
* Portions Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
* Portions Copyright (c) 2017-2017, Postgres Professional
*
*-------------------------------------------------------------------------
*/
@ -54,6 +55,75 @@ static void on_cleanup(void);
static void exit_or_abort(int exitcode);
static const char *get_username(void);
/*
* Unit conversion tables.
*
* Copied from guc.c.
*/
#define MAX_UNIT_LEN 3 /* length of longest recognized unit string */
typedef struct
{
char unit[MAX_UNIT_LEN + 1]; /* unit, as a string, like "kB" or
* "min" */
int base_unit; /* OPTION_UNIT_XXX */
int multiplier; /* If positive, multiply the value with this
* for unit -> base_unit conversion. If
* negative, divide (with the absolute value) */
} unit_conversion;
static const char *memory_units_hint = "Valid units for this parameter are \"kB\", \"MB\", \"GB\", and \"TB\".";
static const unit_conversion memory_unit_conversion_table[] =
{
{"TB", OPTION_UNIT_KB, 1024 * 1024 * 1024},
{"GB", OPTION_UNIT_KB, 1024 * 1024},
{"MB", OPTION_UNIT_KB, 1024},
{"kB", OPTION_UNIT_KB, 1},
{"TB", OPTION_UNIT_BLOCKS, (1024 * 1024 * 1024) / (BLCKSZ / 1024)},
{"GB", OPTION_UNIT_BLOCKS, (1024 * 1024) / (BLCKSZ / 1024)},
{"MB", OPTION_UNIT_BLOCKS, 1024 / (BLCKSZ / 1024)},
{"kB", OPTION_UNIT_BLOCKS, -(BLCKSZ / 1024)},
{"TB", OPTION_UNIT_XBLOCKS, (1024 * 1024 * 1024) / (XLOG_BLCKSZ / 1024)},
{"GB", OPTION_UNIT_XBLOCKS, (1024 * 1024) / (XLOG_BLCKSZ / 1024)},
{"MB", OPTION_UNIT_XBLOCKS, 1024 / (XLOG_BLCKSZ / 1024)},
{"kB", OPTION_UNIT_XBLOCKS, -(XLOG_BLCKSZ / 1024)},
{"TB", OPTION_UNIT_XSEGS, (1024 * 1024 * 1024) / (XLOG_SEG_SIZE / 1024)},
{"GB", OPTION_UNIT_XSEGS, (1024 * 1024) / (XLOG_SEG_SIZE / 1024)},
{"MB", OPTION_UNIT_XSEGS, -(XLOG_SEG_SIZE / (1024 * 1024))},
{"kB", OPTION_UNIT_XSEGS, -(XLOG_SEG_SIZE / 1024)},
{""} /* end of table marker */
};
static const char *time_units_hint = "Valid units for this parameter are \"ms\", \"s\", \"min\", \"h\", and \"d\".";
static const unit_conversion time_unit_conversion_table[] =
{
{"d", OPTION_UNIT_MS, 1000 * 60 * 60 * 24},
{"h", OPTION_UNIT_MS, 1000 * 60 * 60},
{"min", OPTION_UNIT_MS, 1000 * 60},
{"s", OPTION_UNIT_MS, 1000},
{"ms", OPTION_UNIT_MS, 1},
{"d", OPTION_UNIT_S, 60 * 60 * 24},
{"h", OPTION_UNIT_S, 60 * 60},
{"min", OPTION_UNIT_S, 60},
{"s", OPTION_UNIT_S, 1},
{"ms", OPTION_UNIT_S, -1000},
{"d", OPTION_UNIT_MIN, 60 * 24},
{"h", OPTION_UNIT_MIN, 60},
{"min", OPTION_UNIT_MIN, 1},
{"s", OPTION_UNIT_MIN, -60},
{"ms", OPTION_UNIT_MIN, -1000 * 60},
{""} /* end of table marker */
};
static size_t
option_length(const pgut_option opts[])
{
@ -470,6 +540,129 @@ parse_time(const char *value, time_t *time)
return true;
}
/*
* Convert a value from one of the human-friendly units ("kB", "min" etc.)
* to the given base unit. 'value' and 'unit' are the input value and unit
* to convert from. The converted value is stored in *base_value.
*
* Returns true on success, false if the input unit is not recognized.
*/
static bool
convert_to_base_unit(int64 value, const char *unit,
int base_unit, int64 *base_value)
{
const unit_conversion *table;
int i;
if (base_unit & OPTION_UNIT_MEMORY)
table = memory_unit_conversion_table;
else
table = time_unit_conversion_table;
for (i = 0; *table[i].unit; i++)
{
if (base_unit == table[i].base_unit &&
strcmp(unit, table[i].unit) == 0)
{
if (table[i].multiplier < 0)
*base_value = value / (-table[i].multiplier);
else
*base_value = value * table[i].multiplier;
return true;
}
}
return false;
}
/*
* Try to parse value as an integer. The accepted formats are the
* usual decimal, octal, or hexadecimal formats, optionally followed by
* a unit name if "flags" indicates a unit is allowed.
*
* If the string parses okay, return true, else false.
* If okay and result is not NULL, return the value in *result.
* If not okay and hintmsg is not NULL, *hintmsg is set to a suitable
* HINT message, or NULL if no hint provided.
*/
bool
parse_int(const char *value, int *result, int flags, const char **hintmsg)
{
int64 val;
char *endptr;
/* To suppress compiler warnings, always set output params */
if (result)
*result = 0;
if (hintmsg)
*hintmsg = NULL;
/* We assume here that int64 is at least as wide as long */
errno = 0;
val = strtol(value, &endptr, 0);
if (endptr == value)
return false; /* no HINT for integer syntax error */
if (errno == ERANGE || val != (int64) ((int32) val))
{
if (hintmsg)
*hintmsg = "Value exceeds integer range.";
return false;
}
/* allow whitespace between integer and unit */
while (isspace((unsigned char) *endptr))
endptr++;
/* Handle possible unit */
if (*endptr != '\0')
{
char unit[MAX_UNIT_LEN + 1];
int unitlen;
bool converted = false;
if ((flags & OPTION_UNIT) == 0)
return false; /* this setting does not accept a unit */
unitlen = 0;
while (*endptr != '\0' && !isspace((unsigned char) *endptr) &&
unitlen < MAX_UNIT_LEN)
unit[unitlen++] = *(endptr++);
unit[unitlen] = '\0';
/* allow whitespace after unit */
while (isspace((unsigned char) *endptr))
endptr++;
if (*endptr == '\0')
converted = convert_to_base_unit(val, unit, (flags & OPTION_UNIT),
&val);
if (!converted)
{
/* invalid unit, or garbage after the unit; set hint and fail. */
if (hintmsg)
{
if (flags & OPTION_UNIT_MEMORY)
*hintmsg = memory_units_hint;
else
*hintmsg = time_units_hint;
}
return false;
}
/* Check for overflow due to units conversion */
if (val != (int64) ((int32) val))
{
if (hintmsg)
*hintmsg = "Value exceeds integer range.";
return false;
}
}
if (result)
*result = (int) val;
return true;
}
static char *
longopts_to_optstring(const struct option opts[])
{

View File

@ -2,7 +2,8 @@
*
* pgut.h
*
* Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
* Portions Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
* Portions Copyright (c) 2017-2017, Postgres Professional
*
*-------------------------------------------------------------------------
*/
@ -67,6 +68,22 @@ typedef struct pgut_option
typedef void (*pgut_optfn) (pgut_option *opt, const char *arg);
typedef void (*pgut_atexit_callback)(bool fatal, void *userdata);
/*
* bit values in "flags" of an option
*/
#define OPTION_UNIT_KB 0x1000 /* value is in kilobytes */
#define OPTION_UNIT_BLOCKS 0x2000 /* value is in blocks */
#define OPTION_UNIT_XBLOCKS 0x3000 /* value is in xlog blocks */
#define OPTION_UNIT_XSEGS 0x4000 /* value is in xlog segments */
#define OPTION_UNIT_MEMORY 0xF000 /* mask for size-related units */
#define OPTION_UNIT_MS 0x10000 /* value is in milliseconds */
#define OPTION_UNIT_S 0x20000 /* value is in seconds */
#define OPTION_UNIT_MIN 0x30000 /* value is in minutes */
#define OPTION_UNIT_TIME 0xF0000 /* mask for time-related units */
#define OPTION_UNIT (OPTION_UNIT_MEMORY | OPTION_UNIT_TIME)
/*
* pgut client variables and functions
*/
@ -172,6 +189,8 @@ extern bool parse_uint32(const char *value, uint32 *result);
extern bool parse_int64(const char *value, int64 *result);
extern bool parse_uint64(const char *value, uint64 *result);
extern bool parse_time(const char *value, time_t *time);
extern bool parse_int(const char *value, int *result, int flags,
const char **hintmsg);
#define IsSpace(c) (isspace((unsigned char)(c)))
#define IsAlpha(c) (isalpha((unsigned char)(c)))