1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-11-24 08:52:38 +02:00

Add stream mode for saving WAL during the backup process.

This commit is contained in:
stalkerg 2016-05-26 15:56:32 +03:00
parent ccd4f48017
commit 9471875b15
14 changed files with 353 additions and 23 deletions

View File

@ -16,6 +16,8 @@ OBJS = backup.o \
datapagemap.o \
parsexlog.o \
xlogreader.o \
streamutil.o \
receivelog.o \
pgut/pgut.o \
pgut/pgut-port.o
@ -39,7 +41,7 @@ PG_LIBS = $(libpq_pgport) ${PTHREAD_LIBS} ${PTHREAD_CFLAGS}
REGRESS = init option show delete backup restore
all: checksrcdir docs datapagemap.h pg_arman
all: checksrcdir docs datapagemap.h receivelog.h streamutil.h pg_arman
# This rule's only purpose is to give the user instructions on how to pass
# the path to PostgreSQL source tree to the makefile.

219
backup.c
View File

@ -23,14 +23,19 @@
#include "pgut/pgut-port.h"
#include "storage/bufpage.h"
#include "datapagemap.h"
#include "streamutil.h"
#include "receivelog.h"
/* wait 10 sec until WAL archive complete */
#define TIMEOUT_ARCHIVE 10
#define TIMEOUT_ARCHIVE 10
/* Server version */
static int server_version = 0;
static bool in_backup = false; /* TODO: more robust logic */
static bool in_backup = false; /* TODO: more robust logic */
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr;
const char *progname = "pg_arman";
/* list of files contained in backup */
parray *backup_files_list;
@ -71,6 +76,15 @@ static void create_file_list(parray *files,
bool is_append);
static void wait_for_archive(pgBackup *backup, const char *sql);
static void make_pagemap_from_ptrack(parray *files);
static void StreamLog(void *arg);
/*
 * Close the connection held in "conn", if there is one, and terminate the
 * process with the given exit code.
 *
 * The body is wrapped in do { ... } while (0) so that the macro expands to
 * exactly one statement: "if (cond) disconnect_and_exit(1); else ..." then
 * parses as written instead of breaking on the stray ';' after a bare block.
 */
#define disconnect_and_exit(code) \
	do \
	{ \
		if (conn != NULL) \
			PQfinish(conn); \
		exit(code); \
	} while (0)
/*
* Take a backup of database and return the list of files backed up.
@ -82,12 +96,14 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
parray *prev_files = NULL; /* file list of previous database backup */
FILE *fp;
char path[MAXPGPATH];
char dst_backup_path[MAXPGPATH];
char label[1024];
XLogRecPtr *lsn = NULL;
char prev_file_txt[MAXPGPATH]; /* path of the previous backup
* list file */
bool has_backup_label = true; /* flag if backup_label is there */
pthread_t backup_threads[num_threads];
pthread_t stream_thread;
backup_files_args *backup_threads_args[num_threads];
/* repack the options */
@ -129,9 +145,19 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
"or validate existing one.");
}
/* clear ptrack files for FULL and DIFF backup */
if (current.backup_mode != BACKUP_MODE_DIFF_PTRACK)
pg_ptrack_clear();
/* start stream replication */
if (stream_wal)
{
pgBackupGetPath(&current, path, lengthof(path), DATABASE_DIR);
join_path_components(dst_backup_path, path, "pg_xlog");
dir_create_dir(dst_backup_path, DIR_PERMISSION);
pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog, dst_backup_path);
}
/* notify start of backup to PostgreSQL server */
time2iso(label, lengthof(label), current.start_time);
strncat(label, " with pg_arman", lengthof(label));
@ -322,6 +348,35 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
/* Notify end of backup */
pg_stop_backup(&current);
if (stream_wal)
{
parray *list_file;
char pg_xlog_path[MAXPGPATH];
/* We expect the completion of stream */
pthread_join(stream_thread, NULL);
/* Scan backup pg_xlog dir */
list_file = parray_new();
join_path_components(pg_xlog_path, path, "pg_xlog");
dir_list_file(list_file, pg_xlog_path, NULL, true, false);
/* Remove file path root prefix and calc meta */
for (i = 0; i < parray_num(list_file); i++)
{
pgFile *file = (pgFile *)parray_get(list_file, i);
calc_file(file);
if (strstr(file->path, path) == file->path)
{
char *ptr = file->path;
file->path = pstrdup(JoinPathEnd(ptr, path));
free(ptr);
}
}
parray_concat(backup_files_list, list_file);
}
/* Create file list */
create_file_list(backup_files_list, pgdata, DATABASE_FILE_LIST, NULL, false);
@ -549,7 +604,7 @@ static void
pg_ptrack_clear(void)
{
PGresult *res_db, *res;
const char *old_dbname = dbname;
const char *old_dbname = pgut_dbname;
int i;
reconnect();
@ -557,8 +612,8 @@ pg_ptrack_clear(void)
disconnect();
for(i=0; i < PQntuples(res_db); i++)
{
dbname = PQgetvalue(res_db, i, 0);
if (!strcmp(dbname, "template0"))
pgut_dbname = PQgetvalue(res_db, i, 0);
if (!strcmp(pgut_dbname, "template0"))
continue;
reconnect();
res = execute("SELECT pg_ptrack_clear()", 0, NULL);
@ -566,14 +621,14 @@ pg_ptrack_clear(void)
}
PQclear(res_db);
disconnect();
dbname = old_dbname;
pgut_dbname = old_dbname;
}
static char *
pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *result_size)
{
PGresult *res_db, *res;
const char *old_dbname = dbname;
const char *old_dbname = pgut_dbname;
char *params[2];
char *result;
@ -584,7 +639,7 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res
sprintf(params[1], "%i", rel_oid);
res_db = execute("SELECT datname FROM pg_database WHERE oid=$1", 1, (const char **)params);
disconnect();
dbname = pstrdup(PQgetvalue(res_db, 0, 0));
pgut_dbname = pstrdup(PQgetvalue(res_db, 0, 0));
PQclear(res_db);
reconnect();
@ -595,8 +650,8 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res
pfree(params[0]);
pfree(params[1]);
pfree((char *)dbname);
dbname = old_dbname;
pfree((char *)pgut_dbname);
pgut_dbname = old_dbname;
return result;
}
@ -683,7 +738,52 @@ wait_for_archive(pgBackup *backup, const char *sql)
/*
 * Notify the server that the backup has ended.
 *
 * When stream_wal is set, pg_stop_backup() is executed directly on a fresh
 * connection: the returned LSN is stored in the file-level stop_backup_lsn
 * (the value stop_streaming() compares the received position against), and,
 * if "backup" is not NULL, its tli, stop_lsn, recovery_xid and recovery_time
 * fields are filled in.  Otherwise the work is delegated to
 * wait_for_archive() with the same SQL.
 */
static void
pg_stop_backup(pgBackup *backup)
{
wait_for_archive(backup,
if (stream_wal)
{
PGresult *res;
TimeLineID tli;
reconnect();
/* Remove annoying NOTICE messages generated by backend */
res = execute("SET client_min_messages = warning;", 0, NULL);
PQclear(res);
/* And execute the query wanted */
res = execute("SELECT * FROM pg_stop_backup()", 0, NULL);
/* Get LSN from execution result; this publishes the stop position */
get_lsn(res, &stop_backup_lsn);
PQclear(res);
/*
* Always fetch the timeline, even when no backup was passed in, because
* this code path can also be taken as a callback at exit.
*/
tli = get_current_timeline(false);
/* Fill in fields if backup exists */
if (backup != NULL)
{
backup->tli = tli;
backup->stop_lsn = stop_backup_lsn;
elog(LOG, "%s(): tli=%X lsn=%X/%08X",
__FUNCTION__, backup->tli,
(uint32) (backup->stop_lsn >> 32),
(uint32) backup->stop_lsn);
}
/* Run TXID_CURRENT_SQL; its result feeds recovery_xid below */
res = execute(TXID_CURRENT_SQL, 0, NULL);
if (backup != NULL)
{
get_xid(res, &backup->recovery_xid);
backup->recovery_time = time(NULL);
}
PQclear(res);
disconnect();
}
else
/* Non-stream mode: let wait_for_archive() run the stop query */
wait_for_archive(backup,
"SELECT * FROM pg_stop_backup()");
}
@ -719,8 +819,8 @@ get_lsn(PGresult *res, XLogRecPtr *lsn)
* Extract timeline and LSN from results of pg_stop_backup()
* and friends.
*/
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
/* Calculate LSN */
*lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff;
}
@ -1137,3 +1237,98 @@ void make_pagemap_from_ptrack(parray *files)
}
}
}
/*
 * Stop callback passed to ReceiveXlogStream().
 *
 * xlogpos is the position reached so far, timeline the timeline being
 * streamed, and segment_finished tells whether a WAL segment was just
 * completed.  Returns true once stop_backup_lsn has been set and the
 * received position has moved past it; returns false to keep streaming.
 *
 * The timeline and position of the previous call are remembered in function
 * statics so that a timeline switch can be reported.
 */
static bool
stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
{
	static uint32 last_tli = 0;
	static XLogRecPtr last_pos = InvalidXLogRecPtr;
	bool		tli_switched;
	bool		past_stop_point;

	/* the caller is assumed to invoke us once at the end of every segment */
	if (segment_finished && verbose)
	{
		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
				progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
				timeline);
	}

	/*
	 * The position printed here is the previous one, not the current one.
	 * Right after a timeline switch xlogpos points at the start of the
	 * segment, because streaming always begins there.  The end of the old
	 * timeline is only an approximation (the new timeline may start a little
	 * before the last WAL received on the old one), but that is accurate
	 * enough for a progress message.
	 */
	tli_switched = (last_tli != 0 && timeline != last_tli);
	if (tli_switched)
	{
		fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
				progname, timeline,
				(uint32) (last_pos >> 32), (uint32) last_pos);
	}

	/* Done as soon as we are beyond the LSN recorded at backup stop. */
	past_stop_point = (stop_backup_lsn != InvalidXLogRecPtr &&
					   xlogpos > stop_backup_lsn);
	if (past_stop_point)
		return true;

	/* Remember where we are for the next invocation. */
	last_tli = timeline;
	last_pos = xlogpos;

	return false;
}
/*
* Start the log streaming
*/
static void
StreamLog(void *arg)
{
XLogRecPtr startpos;
TimeLineID starttli;
char *basedir = (char *)arg;
/*
* Connect in replication mode to the server
*/
if (conn == NULL)
conn = GetConnection();
if (!conn)
/* Error message already written in GetConnection() */
return;
if (!CheckServerVersionForStreaming(conn))
{
/*
* Error message already written in CheckServerVersionForStreaming().
* There's no hope of recovering from a version mismatch, so don't
* retry.
*/
disconnect_and_exit(1);
}
/*
* Identify server, obtaining start LSN position and current timeline ID
* at the same time, necessary if not valid data can be found in the
* existing output directory.
*/
if (!RunIdentifySystem(conn, NULL, &starttli, &startpos, NULL))
disconnect_and_exit(1);
/*
* Always start streaming at the beginning of a segment
*/
startpos -= startpos % XLOG_SEG_SIZE;
/*
* Start the replication
*/
if (verbose)
fprintf(stderr,
_("%s: starting log streaming at %X/%X (timeline %u)\n"),
progname, (uint32) (startpos >> 32), (uint32) startpos,
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial",
false, false);
PQfinish(conn);
conn = NULL;
}

68
data.c
View File

@ -542,3 +542,71 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
return true;
}
/*
 * Compute the CRC-32C and the size of the file at file->path without
 * copying it.
 *
 * On success file->crc holds the checksum of the whole file and both
 * file->read_size and file->write_size hold its length in bytes.
 *
 * Returns true on success and false when the file does not exist (ENOENT);
 * in that case the sizes are 0 and file->crc is the CRC of empty input.
 * Any other open, stat or read failure is reported with elog(ERROR).
 */
bool
calc_file(pgFile *file)
{
	FILE	   *in;
	size_t		read_len = 0;
	int			errno_tmp;
	char		buf[8192];
	struct stat st;
	pg_crc32	crc;

	INIT_CRC32C(crc);

	/* reset size summary */
	file->read_size = 0;
	file->write_size = 0;

	/* open the file for read */
	in = fopen(file->path, "r");
	if (in == NULL)
	{
		FIN_CRC32C(crc);
		file->crc = crc;

		/* maybe deleted, it's not error */
		if (errno == ENOENT)
			return false;

		elog(ERROR, "cannot open source file \"%s\": %s", file->path,
			 strerror(errno));
	}

	/* make sure the opened file can be stat'ed; st itself is not used */
	if (fstat(fileno(in), &st) == -1)
	{
		/* save errno first: fclose() may overwrite it */
		errno_tmp = errno;
		fclose(in);
		elog(ERROR, "cannot stat \"%s\": %s", file->path,
			 strerror(errno_tmp));
	}

	/* consume all full-sized chunks */
	for (;;)
	{
		if ((read_len = fread(buf, 1, sizeof(buf), in)) != sizeof(buf))
			break;

		/* update CRC */
		COMP_CRC32C(crc, buf, read_len);

		file->write_size += sizeof(buf);
		file->read_size += sizeof(buf);
	}

	/* a short read that is not end-of-file is a read error */
	errno_tmp = errno;
	if (!feof(in))
	{
		fclose(in);
		elog(ERROR, "cannot read backup mode file \"%s\": %s",
			 file->path, strerror(errno_tmp));
	}

	/*
	 * Account for the trailing partial chunk.  Without this, any bytes after
	 * the last full 8192-byte chunk would be left out of both the CRC and
	 * the size counters.
	 */
	if (read_len > 0)
	{
		COMP_CRC32C(crc, buf, read_len);

		file->write_size += read_len;
		file->read_size += read_len;
	}

	/* finish CRC calculation and store into pgFile */
	FIN_CRC32C(crc);
	file->crc = crc;

	fclose(in);

	return true;
}

View File

@ -44,3 +44,9 @@ page-level backup without validated full backup
0
2
6
###### BACKUP COMMAND TEST-0008 ######
###### ptrack multi thread backup mode + stream ######
0
0
2
6

View File

@ -17,6 +17,7 @@ Common Options:
-B, --backup-path=PATH location of the backup storage area
-c, --check show what would have been done
-j, --threads=NUM num threads for backup and restore
--stream use stream for save/restore WAL during backup
Backup options:
-b, --backup-mode=MODE full,page,ptrack

View File

@ -42,6 +42,12 @@ OK: recovery-target-xid options works well.
0
0
###### RESTORE COMMAND TEST-0011 ######
###### recovery in stream mode to latest from full + ptrack backups ######
0
0
0
###### RESTORE COMMAND TEST-0010 ######
###### recovery to latest from full + page backups with loads when ptrack backup do ######
0

View File

@ -8,6 +8,7 @@
*/
#include "pg_arman.h"
#include "streamutil.h"
#include <stdio.h>
#include <stdlib.h>
@ -34,6 +35,7 @@ static bool smooth_checkpoint;
static int keep_data_generations = KEEP_INFINITE;
static int keep_data_days = KEEP_INFINITE;
int num_threads = 1;
bool stream_wal = false;
static bool backup_validate = false;
/* restore configuration */
@ -57,6 +59,7 @@ static pgut_option options[] =
/* common options */
{ 'b', 'c', "check", &check },
{ 'i', 'j', "threads", &num_threads },
{ 'b', 8, "stream", &stream_wal },
/* backup options */
{ 'f', 'b', "backup-mode", opt_backup_mode, SOURCE_ENV },
{ 'b', 'C', "smooth-checkpoint", &smooth_checkpoint, SOURCE_ENV },
@ -136,6 +139,16 @@ main(int argc, char *argv[])
join_path_components(path, backup_path, PG_RMAN_INI_FILE);
pgut_readopt(path, options, ERROR);
/* setup stream options */
if (pgut_dbname != NULL)
dbname = pstrdup(pgut_dbname);
if (host != NULL)
dbhost = pstrdup(host);
if (port != NULL)
dbport = pstrdup(port);
if (username != NULL)
dbuser = pstrdup(username);
}
/* BACKUP_PATH is always required */
@ -218,6 +231,7 @@ pgut_help(bool details)
printf(_(" -B, --backup-path=PATH location of the backup storage area\n"));
printf(_(" -c, --check show what would have been done\n"));
printf(_(" -j, --threads=NUM num threads for backup and restore\n"));
printf(_(" --stream use stream for save/restore WAL during backup\n"));
printf(_("\nBackup options:\n"));
printf(_(" -b, --backup-mode=MODE full,page,ptrack\n"));
printf(_(" -C, --smooth-checkpoint do smooth checkpoint before backup\n"));

View File

@ -207,6 +207,7 @@ extern const char *pgdata_exclude[];
extern parray *backup_files_list;
extern int num_threads;
extern bool stream_wal;
/* in backup.c */
extern int do_backup(pgBackupOption bkupopt);
@ -291,6 +292,8 @@ extern void restore_data_file(const char *from_root, const char *to_root,
extern bool copy_file(const char *from_root, const char *to_root,
pgFile *file);
extern bool calc_file(pgFile *file);
/* parsexlog.c */
extern void extractPageMap(const char *datadir, XLogRecPtr startpoint,
TimeLineID tli, XLogRecPtr endpoint);

View File

@ -28,7 +28,7 @@
const char *PROGRAM_NAME = NULL;
const char *dbname = NULL;
const char *pgut_dbname = NULL;
const char *host = NULL;
const char *port = NULL;
const char *username = NULL;
@ -61,7 +61,7 @@ static const char *get_username(void);
static pgut_option default_options[] =
{
{ 's', 'd', "dbname" , &dbname },
{ 's', 'd', "dbname" , &pgut_dbname },
{ 's', 'h', "host" , &host },
{ 's', 'p', "port" , &port },
{ 'b', 'q', "quiet" , &quiet },
@ -627,10 +627,10 @@ pgut_getopt(int argc, char **argv, pgut_option options[])
/* Read environment variables */
option_from_env(options);
(void) (dbname ||
(dbname = getenv("PGDATABASE")) ||
(dbname = getenv("PGUSER")) ||
(dbname = get_username()));
(void) (pgut_dbname ||
(pgut_dbname = getenv("PGDATABASE")) ||
(pgut_dbname = getenv("PGUSER")) ||
(pgut_dbname = get_username()));
init_cancel_handler();
atexit(on_cleanup);
@ -897,7 +897,7 @@ pgut_connect(int elevel)
/* Start the connection. Loop until we have a password if requested by backend. */
for (;;)
{
conn = PQsetdbLogin(host, port, NULL, NULL, dbname, username, password);
conn = PQsetdbLogin(host, port, NULL, NULL, pgut_dbname, username, password);
if (PQstatus(conn) == CONNECTION_OK)
return conn;
@ -910,7 +910,7 @@ pgut_connect(int elevel)
continue;
}
#endif
elog(elevel, "could not connect to database %s: %s", dbname, PQerrorMessage(conn));
elog(elevel, "could not connect to database %s: %s", pgut_dbname, PQerrorMessage(conn));
PQfinish(conn);
return NULL;
}

View File

@ -86,7 +86,7 @@ extern void pgut_help(bool details);
/*
* pgut framework variables and functions
*/
extern const char *dbname;
extern const char *pgut_dbname;
extern const char *host;
extern const char *port;
extern const char *username;

View File

@ -208,7 +208,8 @@ base_backup_found:
}
/* create recovery.conf */
create_recovery_conf(target_time, target_xid, target_inclusive, target_tli);
if (!stream_wal)
create_recovery_conf(target_time, target_xid, target_inclusive, target_tli);
/* release catalog lock */
catalog_unlock();
@ -470,6 +471,7 @@ create_recovery_conf(const char *target_time,
fprintf(fp, "# recovery.conf generated by pg_arman %s\n",
PROGRAM_VERSION);
fprintf(fp, "restore_command = 'cp %s/%%f %%p'\n", arclog_path);
if (target_time)
fprintf(fp, "recovery_target_time = '%s'\n", target_time);
if (target_xid)

View File

@ -96,6 +96,17 @@ pg_arman show -B ${BACKUP_PATH} > ${TEST_BASE}/TEST-0007.log 2>&1
grep -c OK ${TEST_BASE}/TEST-0007.log
grep OK ${TEST_BASE}/TEST-0007.log | sed -e 's@[^-]@@g' | wc -c | sed 's/^ *//'
echo '###### BACKUP COMMAND TEST-0008 ######'
echo '###### ptrack multi thread backup mode + stream ######'
init_catalog
# Full backup with 4 threads and WAL streaming; this first command creates the run log.
pg_arman backup -B ${BACKUP_PATH} -b full -j 4 --stream -p ${TEST_PGPORT} -d postgres --verbose > ${TEST_BASE}/TEST-0008-run.log 2>&1;echo $?
pg_arman validate -B ${BACKUP_PATH} --verbose >> ${TEST_BASE}/TEST-0008-run.log 2>&1
# ptrack backup on top of the full one; append (>>) so the full backup's log output is kept.
pg_arman backup -B ${BACKUP_PATH} -b ptrack -j 4 --stream -p ${TEST_PGPORT} -d postgres --verbose >> ${TEST_BASE}/TEST-0008-run.log 2>&1;echo $?
pg_arman validate -B ${BACKUP_PATH} >> ${TEST_BASE}/TEST-0008-run.log 2>&1
# Count the backups reported as OK and the '-' characters on those lines.
pg_arman show -B ${BACKUP_PATH} > ${TEST_BASE}/TEST-0008.log 2>&1
grep -c OK ${TEST_BASE}/TEST-0008.log
grep OK ${TEST_BASE}/TEST-0008.log | sed -e 's@[^-]@@g' | wc -c | sed 's/^ *//'
# cleanup
## clean up the temporal test data
pg_ctl stop -m immediate -D ${PGDATA_PATH} > /dev/null 2>&1

View File

@ -74,8 +74,14 @@ wal_log_hints = on
archive_mode = on
archive_command = 'cp %p ${ARCLOG_PATH}/%f'
ptrack_enable = on
max_wal_senders = 5
wal_keep_segments = 32
EOF
cat << EOF >> $PGDATA_PATH/pg_hba.conf
local replication all trust
host replication all 127.0.0.1/32 trust
EOF
# start PostgreSQL
pg_ctl start -D ${PGDATA_PATH} -w -t 300 > /dev/null 2>&1
pgbench -i -p ${TEST_PGPORT} -d postgres > ${TEST_BASE}/pgbench.log 2>&1

View File

@ -170,6 +170,22 @@ psql --no-psqlrc -p ${TEST_PGPORT} -d pgbench -c "SELECT * FROM pgbench_branches
diff ${TEST_BASE}/TEST-0007-before.out ${TEST_BASE}/TEST-0007-after.out
echo ''
echo '###### RESTORE COMMAND TEST-0011 ######'
echo '###### recovery in stream mode to latest from full + ptrack backups ######'
init_backup
pgbench_objs 0006
pg_arman backup -B ${BACKUP_PATH} -b full -j 4 --stream -p ${TEST_PGPORT} -d postgres --verbose > ${TEST_BASE}/TEST-0011-run.out 2>&1;echo $?
pg_arman validate -B ${BACKUP_PATH} --verbose >> ${TEST_BASE}/TEST-0011-run.out 2>&1
pgbench -p ${TEST_PGPORT} -d pgbench > /dev/null 2>&1
pg_arman backup -B ${BACKUP_PATH} -b ptrack -j 4 --stream -p ${TEST_PGPORT} -d postgres --verbose >> ${TEST_BASE}/TEST-0011-run.out 2>&1;echo $?
pg_arman validate -B ${BACKUP_PATH} --verbose >> ${TEST_BASE}/TEST-0011-run.out 2>&1
psql --no-psqlrc -p ${TEST_PGPORT} -d pgbench -c "SELECT * FROM pgbench_branches;" > ${TEST_BASE}/TEST-0011-before.out
pg_ctl stop -m immediate > /dev/null 2>&1
pg_arman restore -B ${BACKUP_PATH} --stream --verbose >> ${TEST_BASE}/TEST-0011-run.out 2>&1;echo $?
pg_ctl start -w -t 600 > /dev/null 2>&1
psql --no-psqlrc -p ${TEST_PGPORT} -d pgbench -c "SELECT * FROM pgbench_branches;" > ${TEST_BASE}/TEST-0011-after.out
diff ${TEST_BASE}/TEST-0011-before.out ${TEST_BASE}/TEST-0011-after.out
echo ''
echo '###### RESTORE COMMAND TEST-0010 ######'
echo '###### recovery to latest from full + page backups with loads when ptrack backup do ######'