diff --git a/Makefile b/Makefile index 3b450367..3902485c 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,8 @@ OBJS = backup.o \ datapagemap.o \ parsexlog.o \ xlogreader.o \ + streamutil.o \ + receivelog.o \ pgut/pgut.o \ pgut/pgut-port.o @@ -39,7 +41,7 @@ PG_LIBS = $(libpq_pgport) ${PTHREAD_LIBS} ${PTHREAD_CFLAGS} REGRESS = init option show delete backup restore -all: checksrcdir docs datapagemap.h pg_arman +all: checksrcdir docs datapagemap.h receivelog.h streamutil.h pg_arman # This rule's only purpose is to give the user instructions on how to pass # the path to PostgreSQL source tree to the makefile. diff --git a/backup.c b/backup.c index 1ee4a82d..323d62d6 100644 --- a/backup.c +++ b/backup.c @@ -23,14 +23,19 @@ #include "pgut/pgut-port.h" #include "storage/bufpage.h" #include "datapagemap.h" +#include "streamutil.h" +#include "receivelog.h" /* wait 10 sec until WAL archive complete */ -#define TIMEOUT_ARCHIVE 10 +#define TIMEOUT_ARCHIVE 10 /* Server version */ static int server_version = 0; -static bool in_backup = false; /* TODO: more robust logic */ +static bool in_backup = false; /* TODO: more robust logic */ +static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ +static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr; +const char *progname = "pg_arman"; /* list of files contained in backup */ parray *backup_files_list; @@ -71,6 +76,15 @@ static void create_file_list(parray *files, bool is_append); static void wait_for_archive(pgBackup *backup, const char *sql); static void make_pagemap_from_ptrack(parray *files); +static void StreamLog(void *arg); + + +#define disconnect_and_exit(code) \ + { \ + if (conn != NULL) PQfinish(conn); \ + exit(code); \ + } + /* * Take a backup of database and return the list of files backed up. 
@@ -82,12 +96,14 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt) parray *prev_files = NULL; /* file list of previous database backup */ FILE *fp; char path[MAXPGPATH]; + char dst_backup_path[MAXPGPATH]; char label[1024]; XLogRecPtr *lsn = NULL; char prev_file_txt[MAXPGPATH]; /* path of the previous backup * list file */ bool has_backup_label = true; /* flag if backup_label is there */ pthread_t backup_threads[num_threads]; + pthread_t stream_thread; backup_files_args *backup_threads_args[num_threads]; /* repack the options */ @@ -129,9 +145,19 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt) "or validate existing one."); } + /* clear ptrack files for FULL and DIFF backup */ if (current.backup_mode != BACKUP_MODE_DIFF_PTRACK) pg_ptrack_clear(); + /* start stream replication */ + if (stream_wal) + { + pgBackupGetPath(¤t, path, lengthof(path), DATABASE_DIR); + join_path_components(dst_backup_path, path, "pg_xlog"); + dir_create_dir(dst_backup_path, DIR_PERMISSION); + pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog, dst_backup_path); + } + /* notify start of backup to PostgreSQL server */ time2iso(label, lengthof(label), current.start_time); strncat(label, " with pg_arman", lengthof(label)); @@ -322,6 +348,35 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt) /* Notify end of backup */ pg_stop_backup(¤t); + if (stream_wal) + { + parray *list_file; + char pg_xlog_path[MAXPGPATH]; + + /* We expect the completion of stream */ + pthread_join(stream_thread, NULL); + + /* Scan backup pg_xlog dir */ + list_file = parray_new(); + join_path_components(pg_xlog_path, path, "pg_xlog"); + dir_list_file(list_file, pg_xlog_path, NULL, true, false); + + /* Remove file path root prefix and calc meta */ + for (i = 0; i < parray_num(list_file); i++) + { + pgFile *file = (pgFile *)parray_get(list_file, i); + + calc_file(file); + if (strstr(file->path, path) == file->path) + { + char *ptr = file->path; + file->path = 
pstrdup(JoinPathEnd(ptr, path)); + free(ptr); + } + } + parray_concat(backup_files_list, list_file); + } + /* Create file list */ create_file_list(backup_files_list, pgdata, DATABASE_FILE_LIST, NULL, false); @@ -549,7 +604,7 @@ static void pg_ptrack_clear(void) { PGresult *res_db, *res; - const char *old_dbname = dbname; + const char *old_dbname = pgut_dbname; int i; reconnect(); @@ -557,8 +612,8 @@ pg_ptrack_clear(void) disconnect(); for(i=0; i < PQntuples(res_db); i++) { - dbname = PQgetvalue(res_db, i, 0); - if (!strcmp(dbname, "template0")) + pgut_dbname = PQgetvalue(res_db, i, 0); + if (!strcmp(pgut_dbname, "template0")) continue; reconnect(); res = execute("SELECT pg_ptrack_clear()", 0, NULL); @@ -566,14 +621,14 @@ pg_ptrack_clear(void) } PQclear(res_db); disconnect(); - dbname = old_dbname; + pgut_dbname = old_dbname; } static char * pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *result_size) { PGresult *res_db, *res; - const char *old_dbname = dbname; + const char *old_dbname = pgut_dbname; char *params[2]; char *result; @@ -584,7 +639,7 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res sprintf(params[1], "%i", rel_oid); res_db = execute("SELECT datname FROM pg_database WHERE oid=$1", 1, (const char **)params); disconnect(); - dbname = pstrdup(PQgetvalue(res_db, 0, 0)); + pgut_dbname = pstrdup(PQgetvalue(res_db, 0, 0)); PQclear(res_db); reconnect(); @@ -595,8 +650,8 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res pfree(params[0]); pfree(params[1]); - pfree((char *)dbname); - dbname = old_dbname; + pfree((char *)pgut_dbname); + pgut_dbname = old_dbname; return result; } @@ -683,7 +738,52 @@ wait_for_archive(pgBackup *backup, const char *sql) static void pg_stop_backup(pgBackup *backup) { - wait_for_archive(backup, + if (stream_wal) + { + PGresult *res; + TimeLineID tli; + + reconnect(); + + /* Remove annoying NOTICE messages generated by backend */ + res = 
execute("SET client_min_messages = warning;", 0, NULL); + PQclear(res); + + /* And execute the query wanted */ + res = execute("SELECT * FROM pg_stop_backup()", 0, NULL); + + /* Get LSN from execution result */ + get_lsn(res, &stop_backup_lsn); + PQclear(res); + + /* + * Enforce TLI obtention if backup is not present as this code + * path can be taken as a callback at exit. + */ + tli = get_current_timeline(false); + + /* Fill in fields if backup exists */ + if (backup != NULL) + { + backup->tli = tli; + backup->stop_lsn = stop_backup_lsn; + elog(LOG, "%s(): tli=%X lsn=%X/%08X", + __FUNCTION__, backup->tli, + (uint32) (backup->stop_lsn >> 32), + (uint32) backup->stop_lsn); + } + + res = execute(TXID_CURRENT_SQL, 0, NULL); + if (backup != NULL) + { + get_xid(res, &backup->recovery_xid); + backup->recovery_time = time(NULL); + } + PQclear(res); + disconnect(); + } + else + wait_for_archive(backup, "SELECT * FROM pg_stop_backup()"); } @@ -719,8 +819,8 @@ get_lsn(PGresult *res, XLogRecPtr *lsn) * Extract timeline and LSN from results of pg_stop_backup() * and friends. */ - XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff); + XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff); /* Calculate LSN */ *lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff; } @@ -1137,3 +1237,98 @@ void make_pagemap_from_ptrack(parray *files) } } } + + +static bool +stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) +{ + static uint32 prevtimeline = 0; + static XLogRecPtr prevpos = InvalidXLogRecPtr; + + /* we assume that we get called once at the end of each segment */ + if (verbose && segment_finished) + fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"), + progname, (uint32) (xlogpos >> 32), (uint32) xlogpos, + timeline); + + /* + * Note that we report the previous, not current, position here. After a + * timeline switch, xlogpos points to the beginning of the segment because + * that's where we always begin streaming. 
Reporting the end of previous + * timeline isn't totally accurate, because the next timeline can begin + * slightly before the end of the WAL that we received on the previous + * timeline, but it's close enough for reporting purposes. + */ + if (prevtimeline != 0 && prevtimeline != timeline) + fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"), + progname, timeline, + (uint32) (prevpos >> 32), (uint32) prevpos); + + if (stop_backup_lsn != InvalidXLogRecPtr && xlogpos > stop_backup_lsn) + return true; + + prevtimeline = timeline; + prevpos = xlogpos; + + return false; +} + +/* + * Start the log streaming + */ +static void +StreamLog(void *arg) +{ + XLogRecPtr startpos; + TimeLineID starttli; + char *basedir = (char *)arg; + + /* + * Connect in replication mode to the server + */ + if (conn == NULL) + conn = GetConnection(); + if (!conn) + /* Error message already written in GetConnection() */ + return; + + if (!CheckServerVersionForStreaming(conn)) + { + /* + * Error message already written in CheckServerVersionForStreaming(). + * There's no hope of recovering from a version mismatch, so don't + * retry. + */ + disconnect_and_exit(1); + } + + /* + * Identify server, obtaining start LSN position and current timeline ID + * at the same time, necessary if not valid data can be found in the + * existing output directory. 
+	 */
+	if (!RunIdentifySystem(conn, NULL, &starttli, &startpos, NULL))
+		disconnect_and_exit(1);
+
+
+	/*
+	 * Always start streaming at the beginning of a segment
+	 */
+	startpos -= startpos % XLOG_SEG_SIZE;
+
+	/*
+	 * Start the replication
+	 */
+	if (verbose)
+		fprintf(stderr,
+				_("%s: starting log streaming at %X/%X (timeline %u)\n"),
+				progname, (uint32) (startpos >> 32), (uint32) startpos,
+				starttli);
+
+	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
+					  stop_streaming, standby_message_timeout, ".partial",
+					  false, false);
+
+	PQfinish(conn);
+	conn = NULL;
+}
diff --git a/data.c b/data.c
index 29bba5c3..75ad3de5 100644
--- a/data.c
+++ b/data.c
@@ -542,3 +542,97 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
 
 	return true;
 }
+
+/*
+ * Calculate the CRC and size of an existing file without copying it.
+ *
+ * Reads file->path in full, storing the CRC (CRC32C macros) of its contents
+ * in file->crc and the number of bytes read in both file->read_size and
+ * file->write_size.
+ *
+ * Returns false if the file does not exist (ENOENT), true on success. Any
+ * other open, stat or read failure is reported with elog(ERROR).
+ */
+bool
+calc_file(pgFile *file)
+{
+	FILE	   *in;
+	size_t		read_len = 0;
+	int			errno_tmp;
+	char		buf[8192];
+	struct stat	st;
+	pg_crc32	crc;
+
+	INIT_CRC32C(crc);
+
+	/* reset size summary */
+	file->read_size = 0;
+	file->write_size = 0;
+
+	/* open the file for read */
+	in = fopen(file->path, "r");
+	if (in == NULL)
+	{
+		FIN_CRC32C(crc);
+		file->crc = crc;
+
+		/* maybe deleted, it's not error */
+		if (errno == ENOENT)
+			return false;
+
+		elog(ERROR, "cannot open source file \"%s\": %s", file->path,
+			 strerror(errno));
+	}
+
+	/* make sure the file can be stat'ed; the result itself is not used */
+	if (fstat(fileno(in), &st) == -1)
+	{
+		errno_tmp = errno;		/* fclose() may overwrite errno */
+		fclose(in);
+		elog(ERROR, "cannot stat \"%s\": %s", file->path,
+			 strerror(errno_tmp));
+	}
+
+	/* consume the file in full-buffer chunks; a short read ends the loop */
+	for (;;)
+	{
+		if ((read_len = fread(buf, 1, sizeof(buf), in)) != sizeof(buf))
+			break;
+
+		/* update CRC */
+		COMP_CRC32C(crc, buf, read_len);
+
+		file->write_size += sizeof(buf);
+		file->read_size += sizeof(buf);
+	}
+
+	/* a short read is either end-of-file or an error; save errno first */
+	errno_tmp = errno;
+	if (!feof(in))
+	{
+		fclose(in);
+		elog(ERROR, "cannot read backup mode file \"%s\": %s",
+			 file->path, strerror(errno_tmp));
+	}
+
+	/*
+	 * Account for the final partial buffer: the loop above exits before
+	 * processing a short read, so without this the tail of any file whose
+	 * size is not a multiple of sizeof(buf) would be missing from both the
+	 * CRC and the size counters.
+	 */
+	if (read_len > 0)
+	{
+		COMP_CRC32C(crc, buf, read_len);
+
+		file->write_size += read_len;
+		file->read_size += read_len;
+	}
+
+	/* finish CRC calculation and store into pgFile */
+	FIN_CRC32C(crc);
+	file->crc = crc;
+
+	fclose(in);
+
+	
return true; +} diff --git a/expected/backup.out b/expected/backup.out index 9770dd16..94496c12 100644 --- a/expected/backup.out +++ b/expected/backup.out @@ -44,3 +44,9 @@ page-level backup without validated full backup 0 2 6 +###### BACKUP COMMAND TEST-0008 ###### +###### ptrack multi thread backup mode + stream ###### +0 +0 +2 +6 diff --git a/expected/option.out b/expected/option.out index 6b90f690..4a01617d 100644 --- a/expected/option.out +++ b/expected/option.out @@ -17,6 +17,7 @@ Common Options: -B, --backup-path=PATH location of the backup storage area -c, --check show what would have been done -j, --threads=NUM num threads for backup and restore + --stream use stream for save/restore WAL during backup Backup options: -b, --backup-mode=MODE full,page,ptrack diff --git a/expected/restore.out b/expected/restore.out index de6c532e..4d539647 100644 --- a/expected/restore.out +++ b/expected/restore.out @@ -42,6 +42,12 @@ OK: recovery-target-xid options works well. 0 0 +###### RESTORE COMMAND TEST-0011 ###### +###### recovery in stream mode to latest from full + ptrack backups ###### +0 +0 +0 + ###### RESTORE COMMAND TEST-0010 ###### ###### recovery to latest from full + page backups with loads when ptrack backup do ###### 0 diff --git a/pg_arman.c b/pg_arman.c index 2edd0632..9a575503 100644 --- a/pg_arman.c +++ b/pg_arman.c @@ -8,6 +8,7 @@ */ #include "pg_arman.h" +#include "streamutil.h" #include #include @@ -34,6 +35,7 @@ static bool smooth_checkpoint; static int keep_data_generations = KEEP_INFINITE; static int keep_data_days = KEEP_INFINITE; int num_threads = 1; +bool stream_wal = false; static bool backup_validate = false; /* restore configuration */ @@ -57,6 +59,7 @@ static pgut_option options[] = /* common options */ { 'b', 'c', "check", &check }, { 'i', 'j', "threads", &num_threads }, + { 'b', 8, "stream", &stream_wal }, /* backup options */ { 'f', 'b', "backup-mode", opt_backup_mode, SOURCE_ENV }, { 'b', 'C', "smooth-checkpoint", &smooth_checkpoint, 
SOURCE_ENV }, @@ -136,6 +139,16 @@ main(int argc, char *argv[]) join_path_components(path, backup_path, PG_RMAN_INI_FILE); pgut_readopt(path, options, ERROR); + + /* setup stream options */ + if (pgut_dbname != NULL) + dbname = pstrdup(pgut_dbname); + if (host != NULL) + dbhost = pstrdup(host); + if (port != NULL) + dbport = pstrdup(port); + if (username != NULL) + dbuser = pstrdup(username); } /* BACKUP_PATH is always required */ @@ -218,6 +231,7 @@ pgut_help(bool details) printf(_(" -B, --backup-path=PATH location of the backup storage area\n")); printf(_(" -c, --check show what would have been done\n")); printf(_(" -j, --threads=NUM num threads for backup and restore\n")); + printf(_(" --stream use stream for save/restore WAL during backup\n")); printf(_("\nBackup options:\n")); printf(_(" -b, --backup-mode=MODE full,page,ptrack\n")); printf(_(" -C, --smooth-checkpoint do smooth checkpoint before backup\n")); diff --git a/pg_arman.h b/pg_arman.h index d70623a7..2ff2225b 100644 --- a/pg_arman.h +++ b/pg_arman.h @@ -207,6 +207,7 @@ extern const char *pgdata_exclude[]; extern parray *backup_files_list; extern int num_threads; +extern bool stream_wal; /* in backup.c */ extern int do_backup(pgBackupOption bkupopt); @@ -291,6 +292,8 @@ extern void restore_data_file(const char *from_root, const char *to_root, extern bool copy_file(const char *from_root, const char *to_root, pgFile *file); +extern bool calc_file(pgFile *file); + /* parsexlog.c */ extern void extractPageMap(const char *datadir, XLogRecPtr startpoint, TimeLineID tli, XLogRecPtr endpoint); diff --git a/pgut/pgut.c b/pgut/pgut.c index 58deed52..ff71d9e1 100644 --- a/pgut/pgut.c +++ b/pgut/pgut.c @@ -28,7 +28,7 @@ const char *PROGRAM_NAME = NULL; -const char *dbname = NULL; +const char *pgut_dbname = NULL; const char *host = NULL; const char *port = NULL; const char *username = NULL; @@ -61,7 +61,7 @@ static const char *get_username(void); static pgut_option default_options[] = { - { 's', 'd', "dbname" , 
&dbname }, + { 's', 'd', "dbname" , &pgut_dbname }, { 's', 'h', "host" , &host }, { 's', 'p', "port" , &port }, { 'b', 'q', "quiet" , &quiet }, @@ -627,10 +627,10 @@ pgut_getopt(int argc, char **argv, pgut_option options[]) /* Read environment variables */ option_from_env(options); - (void) (dbname || - (dbname = getenv("PGDATABASE")) || - (dbname = getenv("PGUSER")) || - (dbname = get_username())); + (void) (pgut_dbname || + (pgut_dbname = getenv("PGDATABASE")) || + (pgut_dbname = getenv("PGUSER")) || + (pgut_dbname = get_username())); init_cancel_handler(); atexit(on_cleanup); @@ -897,7 +897,7 @@ pgut_connect(int elevel) /* Start the connection. Loop until we have a password if requested by backend. */ for (;;) { - conn = PQsetdbLogin(host, port, NULL, NULL, dbname, username, password); + conn = PQsetdbLogin(host, port, NULL, NULL, pgut_dbname, username, password); if (PQstatus(conn) == CONNECTION_OK) return conn; @@ -910,7 +910,7 @@ pgut_connect(int elevel) continue; } #endif - elog(elevel, "could not connect to database %s: %s", dbname, PQerrorMessage(conn)); + elog(elevel, "could not connect to database %s: %s", pgut_dbname, PQerrorMessage(conn)); PQfinish(conn); return NULL; } diff --git a/pgut/pgut.h b/pgut/pgut.h index f0fe9cb9..068f9378 100644 --- a/pgut/pgut.h +++ b/pgut/pgut.h @@ -86,7 +86,7 @@ extern void pgut_help(bool details); /* * pgut framework variables and functions */ -extern const char *dbname; +extern const char *pgut_dbname; extern const char *host; extern const char *port; extern const char *username; diff --git a/restore.c b/restore.c index b26b92ec..e46d5cd9 100644 --- a/restore.c +++ b/restore.c @@ -208,7 +208,8 @@ base_backup_found: } /* create recovery.conf */ - create_recovery_conf(target_time, target_xid, target_inclusive, target_tli); + if (!stream_wal) + create_recovery_conf(target_time, target_xid, target_inclusive, target_tli); /* release catalog lock */ catalog_unlock(); @@ -470,6 +471,7 @@ create_recovery_conf(const char 
*target_time,
 	fprintf(fp, "# recovery.conf generated by pg_arman %s\n",
 		PROGRAM_VERSION);
 	fprintf(fp, "restore_command = 'cp %s/%%f %%p'\n", arclog_path);
+
 	if (target_time)
 		fprintf(fp, "recovery_target_time = '%s'\n", target_time);
 	if (target_xid)
diff --git a/sql/backup.sh b/sql/backup.sh
index 0cf7012b..3ebfcd69 100644
--- a/sql/backup.sh
+++ b/sql/backup.sh
@@ -96,6 +96,18 @@ pg_arman show -B ${BACKUP_PATH} > ${TEST_BASE}/TEST-0007.log 2>&1
 grep -c OK ${TEST_BASE}/TEST-0007.log
 grep OK ${TEST_BASE}/TEST-0007.log | sed -e 's@[^-]@@g' | wc -c | sed 's/^ *//'
 
+echo '###### BACKUP COMMAND TEST-0008 ######'
+echo '###### ptrack multi thread backup mode + stream ######'
+init_catalog
+pg_arman backup -B ${BACKUP_PATH} -b full -j 4 --stream -p ${TEST_PGPORT} -d postgres --verbose > ${TEST_BASE}/TEST-0008-run.log 2>&1;echo $?
+pg_arman validate -B ${BACKUP_PATH} --verbose >> ${TEST_BASE}/TEST-0008-run.log 2>&1
+# append (>>) so the full backup and validate output above stays in the run log
+pg_arman backup -B ${BACKUP_PATH} -b ptrack -j 4 --stream -p ${TEST_PGPORT} -d postgres --verbose >> ${TEST_BASE}/TEST-0008-run.log 2>&1;echo $?
+pg_arman validate -B ${BACKUP_PATH} >> ${TEST_BASE}/TEST-0008-run.log 2>&1 +pg_arman show -B ${BACKUP_PATH} > ${TEST_BASE}/TEST-0008.log 2>&1 +grep -c OK ${TEST_BASE}/TEST-0008.log +grep OK ${TEST_BASE}/TEST-0008.log | sed -e 's@[^-]@@g' | wc -c | sed 's/^ *//' + # cleanup ## clean up the temporal test data pg_ctl stop -m immediate -D ${PGDATA_PATH} > /dev/null 2>&1 diff --git a/sql/common.sh b/sql/common.sh index dd994400..dfd6e470 100644 --- a/sql/common.sh +++ b/sql/common.sh @@ -74,8 +74,14 @@ wal_log_hints = on archive_mode = on archive_command = 'cp %p ${ARCLOG_PATH}/%f' ptrack_enable = on +max_wal_senders = 5 +wal_keep_segments = 32 EOF + cat << EOF >> $PGDATA_PATH/pg_hba.conf +local replication all trust +host replication all 127.0.0.1/32 trust +EOF # start PostgreSQL pg_ctl start -D ${PGDATA_PATH} -w -t 300 > /dev/null 2>&1 pgbench -i -p ${TEST_PGPORT} -d postgres > ${TEST_BASE}/pgbench.log 2>&1 diff --git a/sql/restore.sh b/sql/restore.sh index 8f9b4e29..ea2b6574 100644 --- a/sql/restore.sh +++ b/sql/restore.sh @@ -170,6 +170,22 @@ psql --no-psqlrc -p ${TEST_PGPORT} -d pgbench -c "SELECT * FROM pgbench_branches diff ${TEST_BASE}/TEST-0007-before.out ${TEST_BASE}/TEST-0007-after.out echo '' +echo '###### RESTORE COMMAND TEST-0011 ######' +echo '###### recovery in stream mode to latest from full + ptrack backups ######' +init_backup +pgbench_objs 0006 +pg_arman backup -B ${BACKUP_PATH} -b full -j 4 --stream -p ${TEST_PGPORT} -d postgres --verbose > ${TEST_BASE}/TEST-0011-run.out 2>&1;echo $? +pg_arman validate -B ${BACKUP_PATH} --verbose >> ${TEST_BASE}/TEST-0011-run.out 2>&1 +pgbench -p ${TEST_PGPORT} -d pgbench > /dev/null 2>&1 +pg_arman backup -B ${BACKUP_PATH} -b ptrack -j 4 --stream -p ${TEST_PGPORT} -d postgres --verbose >> ${TEST_BASE}/TEST-0011-run.out 2>&1;echo $? 
+pg_arman validate -B ${BACKUP_PATH} --verbose >> ${TEST_BASE}/TEST-0011-run.out 2>&1 +psql --no-psqlrc -p ${TEST_PGPORT} -d pgbench -c "SELECT * FROM pgbench_branches;" > ${TEST_BASE}/TEST-0011-before.out +pg_ctl stop -m immediate > /dev/null 2>&1 +pg_arman restore -B ${BACKUP_PATH} --stream --verbose >> ${TEST_BASE}/TEST-0011-run.out 2>&1;echo $? +pg_ctl start -w -t 600 > /dev/null 2>&1 +psql --no-psqlrc -p ${TEST_PGPORT} -d pgbench -c "SELECT * FROM pgbench_branches;" > ${TEST_BASE}/TEST-0011-after.out +diff ${TEST_BASE}/TEST-0011-before.out ${TEST_BASE}/TEST-0011-after.out +echo '' echo '###### RESTORE COMMAND TEST-0010 ######' echo '###### recovery to latest from full + page backups with loads when ptrack backup do ######'