From 4befefada2eeda3eade56f532f28e62e78a638c9 Mon Sep 17 00:00:00 2001 From: Sergey Cherkashin <4erkashin@list.ru> Date: Wed, 17 Oct 2018 14:44:21 +0300 Subject: [PATCH 01/16] Skip copy WAL file to archive if already exists --- src/data.c | 90 +++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 79 insertions(+), 11 deletions(-) diff --git a/src/data.c b/src/data.c index b1acfecc..9d5267a8 100644 --- a/src/data.c +++ b/src/data.c @@ -29,6 +29,9 @@ typedef union DataPage char data[BLCKSZ]; } DataPage; +static bool +fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed); + #ifdef HAVE_LIBZ /* Implementation of zlib compression method */ static int32 @@ -1092,14 +1095,21 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, FILE *in = NULL; FILE *out=NULL; char buf[XLOG_BLCKSZ]; - const char *to_path_p = to_path; + const char *to_path_p; char to_path_temp[MAXPGPATH]; int errno_temp; #ifdef HAVE_LIBZ char gz_to_path[MAXPGPATH]; gzFile gz_out = NULL; + if (is_compress) + { + snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path); + to_path_p = gz_to_path; + } + else #endif + to_path_p = to_path; /* open file for read */ in = fopen(from_path, PG_BINARY_R); @@ -1107,30 +1117,30 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path, strerror(errno)); + /* Check if possible to skip copying */ + if (fileExists(to_path_p)) + { + if (fileEqualCRC(from_path, to_path_p, is_compress)) + return; + /* Do not copy and do not rise error. Just quit as normal. */ + else if (!overwrite) + elog(ERROR, "WAL segment \"%s\" already exists.", to_path); + } + /* open backup file for write */ #ifdef HAVE_LIBZ if (is_compress) { - snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path); - - if (!overwrite && fileExists(gz_to_path)) - elog(ERROR, "WAL segment \"%s\" already exists.", gz_to_path); - snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path); gz_out = gzopen(to_path_temp, PG_BINARY_W); if (gzsetparams(gz_out, compress_level, Z_DEFAULT_STRATEGY) != Z_OK) elog(ERROR, "Cannot set compression level %d to file \"%s\": %s", compress_level, to_path_temp, get_gz_error(gz_out, errno)); - - to_path_p = gz_to_path; } else #endif { - if (!overwrite && fileExists(to_path)) - elog(ERROR, "WAL segment \"%s\" already exists.", to_path); - snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path); out = fopen(to_path_temp, PG_BINARY_W); @@ -1724,3 +1734,61 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn, return is_valid; } + +static bool +fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) +{ + pg_crc32 crc_from; + pg_crc32 crc_to; + + /* Get checksum of backup file */ +#ifdef HAVE_LIBZ + if (path2_is_compressed) + { + char buf [1024]; + gzFile gz_in = NULL; + + INIT_CRC32C(crc_to); + gz_in = gzopen(path2, PG_BINARY_R); + if (gz_in == NULL) + { + /* There is no such file or it cannot be read */ + elog(LOG, + "Cannot compare WAL file \"%s\" with compressed \"%s\"", + path1, path2); + return false; + } + + for (;;) + { + size_t read_len = 0; + read_len = gzread(gz_in, buf, sizeof(buf)); + if (read_len != sizeof(buf) && !gzeof(gz_in)) + { + /* An error occurred while reading the file */ + elog(LOG, + "Cannot compare WAL file \"%s\" with compressed \"%s\"", + path1, path2); + return false; + } + COMP_CRC32C(crc_to, buf, read_len); + if (gzeof(gz_in) || read_len == 0) + break; + } + FIN_CRC32C(crc_to); + + if (gzclose(gz_in) != 0) + elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", + path2, get_gz_error(gz_in, errno)); + } + else +#endif + { + crc_to = pgFileGetCRC(path2); + } + + /* Get checksum of original file */ + crc_from = pgFileGetCRC(path1); + + return EQ_CRC32C(crc_from, crc_to); +} From 6deb3bbe2bda3f03c89cc64867a3f647d8bacdf5 Mon Sep 17 00:00:00 2001 From: Sergey Cherkashin <4erkashin@list.ru> Date: Wed, 17 Oct 2018 15:40:08 +0300 Subject: [PATCH 02/16] fileEqualCRC() refactoring --- src/data.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/data.c b/src/data.c index 9d5267a8..b9b25b11 100644 --- a/src/data.c +++ b/src/data.c @@ -1738,8 +1738,8 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn, static bool fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) { - pg_crc32 crc_from; - pg_crc32 crc_to; + pg_crc32 crc1; + pg_crc32 crc2; /* Get checksum of backup file */ #ifdef HAVE_LIBZ @@ -1748,7 +1748,7 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) char buf [1024]; gzFile gz_in = NULL; - INIT_CRC32C(crc_to); + INIT_CRC32C(crc2); gz_in = gzopen(path2, PG_BINARY_R); if (gz_in == NULL) { @@ -1771,11 +1771,11 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) path1, path2); return false; } - COMP_CRC32C(crc_to, buf, read_len); + COMP_CRC32C(crc2, buf, read_len); if (gzeof(gz_in) || read_len == 0) break; } - FIN_CRC32C(crc_to); + FIN_CRC32C(crc2); if (gzclose(gz_in) != 0) elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", @@ -1784,11 +1784,11 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) else #endif { - crc_to = pgFileGetCRC(path2); + crc2 = pgFileGetCRC(path2); } /* Get checksum of original file */ - crc_from = pgFileGetCRC(path1); + crc1 = pgFileGetCRC(path1); - return EQ_CRC32C(crc_from, crc_to); + return EQ_CRC32C(crc1, crc2); } From 20d4dcc291c817e0fd535fbf1b84b641335a4b51 Mon Sep 17 00:00:00 2001 From: Sergey Cherkashin <4erkashin@list.ru> Date: Thu, 18 Oct 2018 12:06:54 +0300 Subject: [PATCH 03/16] Modifyed test test_arhive_push_file_exists. Fixed fileEqualCRC() error handling --- src/data.c | 15 +++++---------- tests/archive.py | 12 +++++++++++- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/data.c b/src/data.c index b9b25b11..1ed5bfee 100644 --- a/src/data.c +++ b/src/data.c @@ -1751,32 +1751,27 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) INIT_CRC32C(crc2); gz_in = gzopen(path2, PG_BINARY_R); if (gz_in == NULL) - { - /* There is no such file or it cannot be read */ - elog(LOG, + /* File cannot be read */ + elog(ERROR, "Cannot compare WAL file \"%s\" with compressed \"%s\"", path1, path2); - return false; - } for (;;) { size_t read_len = 0; read_len = gzread(gz_in, buf, sizeof(buf)); if (read_len != sizeof(buf) && !gzeof(gz_in)) - { /* An error occurred while reading the file */ - elog(LOG, + elog(ERROR, "Cannot compare WAL file \"%s\" with compressed \"%s\"", path1, path2); - return false; - } + COMP_CRC32C(crc2, buf, read_len); if (gzeof(gz_in) || read_len == 0) break; } FIN_CRC32C(crc2); - + if (gzclose(gz_in) != 0) elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", path2, get_gz_error(gz_in, errno)); diff --git a/tests/archive.py b/tests/archive.py index 4ed783d6..45b235cd 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1,4 +1,6 @@ import os +import shutil +import zlib import unittest from .helpers.ptrack_helpers import ProbackupTest, ProbackupException, archive_script from datetime import datetime, timedelta @@ -325,7 +327,15 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): ) self.assertFalse('pg_probackup archive-push completed successfully' in log_content) - os.remove(file) + wal_src = os.path.join(node.data_dir, 'pg_wal', '000000010000000000000001') + if self.archive_compress: + with open(wal_src, 'rb') as f_in, open(file, 'wb') as f_out: + original_wal = f_in.read() + compressed_wal = zlib.compress(original_wal, 1) + f_out.write(compressed_wal) + else: + shutil.copyfile(wal_src, file) + self.switch_wal_segment(node) sleep(5) From 6d0cbfa2328b2903e2aaa9d216b221524b4bdccb Mon Sep 17 00:00:00 2001 From: Sergey Cherkashin <4erkashin@list.ru> Date: Thu, 18 Oct 2018 15:05:51 +0300 Subject: [PATCH 04/16] Fix test test_arhive_push_file_exists --- src/data.c | 2 +- tests/archive.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/data.c b/src/data.c index 1ed5bfee..4c75e1c9 100644 --- a/src/data.c +++ b/src/data.c @@ -1124,7 +1124,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, return; /* Do not copy and do not rise error. Just quit as normal. */ else if (!overwrite) - elog(ERROR, "WAL segment \"%s\" already exists.", to_path); + elog(ERROR, "WAL segment \"%s\" already exists.", to_path_p); } /* open backup file for write */ diff --git a/tests/archive.py b/tests/archive.py index 45b235cd..a311ac44 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1,6 +1,6 @@ import os import shutil -import zlib +import gzip import unittest from .helpers.ptrack_helpers import ProbackupTest, ProbackupException, archive_script from datetime import datetime, timedelta @@ -329,13 +329,11 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): wal_src = os.path.join(node.data_dir, 'pg_wal', '000000010000000000000001') if self.archive_compress: - with open(wal_src, 'rb') as f_in, open(file, 'wb') as f_out: - original_wal = f_in.read() - compressed_wal = zlib.compress(original_wal, 1) - f_out.write(compressed_wal) + with open(wal_src, 'rb') as f_in, gzip.open(file, 'wb', compresslevel=1) as f_out: + shutil.copyfileobj(f_in, f_out) else: shutil.copyfile(wal_src, file) - + self.switch_wal_segment(node) sleep(5) From f53395529b95ed923c87271001531b6834853c54 Mon Sep 17 00:00:00 2001 From: Sergey Cherkashin <4erkashin@list.ru> Date: Thu, 15 Nov 2018 18:22:43 +0300 Subject: [PATCH 05/16] Refactored calc_file_checksum() --- src/data.c | 72 ++++------------------------------------------ src/dir.c | 37 ++++++++++++++++++------ src/merge.c | 2 +- src/pg_probackup.h | 5 ++-- src/util.c | 2 +- src/validate.c | 3 +- 6 files changed, 40 insertions(+), 81 deletions(-) diff --git a/src/data.c b/src/data.c index 4c75e1c9..924e0fb7 100644 --- a/src/data.c +++ b/src/data.c @@ -1418,75 +1418,13 @@ get_wal_file(const char *from_path, const char *to_path) * but created in process of backup, such as stream XLOG files, * PG_TABLESPACE_MAP_FILE and PG_BACKUP_LABEL_FILE. */ -bool +void calc_file_checksum(pgFile *file) { - FILE *in; - size_t read_len = 0; - int errno_tmp; - char buf[BLCKSZ]; - struct stat st; - pg_crc32 crc; - Assert(S_ISREG(file->mode)); - INIT_TRADITIONAL_CRC32(crc); - /* reset size summary */ - file->read_size = 0; - file->write_size = 0; - - /* open backup mode file for read */ - in = fopen(file->path, PG_BINARY_R); - if (in == NULL) - { - FIN_TRADITIONAL_CRC32(crc); - file->crc = crc; - - /* maybe deleted, it's not error */ - if (errno == ENOENT) - return false; - - elog(ERROR, "cannot open source file \"%s\": %s", file->path, - strerror(errno)); - } - - /* stat source file to change mode of destination file */ - if (fstat(fileno(in), &st) == -1) - { - fclose(in); - elog(ERROR, "cannot stat \"%s\": %s", file->path, - strerror(errno)); - } - - for (;;) - { - read_len = fread(buf, 1, sizeof(buf), in); - - if(read_len == 0) - break; - - /* update CRC */ - COMP_TRADITIONAL_CRC32(crc, buf, read_len); - - file->write_size += read_len; - file->read_size += read_len; - } - - errno_tmp = errno; - if (!feof(in)) - { - fclose(in); - elog(ERROR, "cannot read backup mode file \"%s\": %s", - file->path, strerror(errno_tmp)); - } - - /* finish CRC calculation and store into pgFile */ - FIN_TRADITIONAL_CRC32(crc); - file->crc = crc; - - fclose(in); - - return true; + file->crc = pgFileGetCRC(file->path, false, false, &file->read_size); + file->write_size = file->read_size; } /* @@ -1779,11 +1717,11 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) else #endif { - crc2 = pgFileGetCRC(path2); + crc2 = pgFileGetCRC(path2, false, true, NULL); } /* Get checksum of original file */ - crc1 = pgFileGetCRC(path1); + crc1 = pgFileGetCRC(path1, false, true, NULL); return EQ_CRC32C(crc1, crc2); } diff --git a/src/dir.c b/src/dir.c index 5c6f60d8..11712c03 100644 --- a/src/dir.c +++ b/src/dir.c @@ -259,36 +259,55 @@ delete_file: } pg_crc32 -pgFileGetCRC(const char *file_path, bool use_crc32c) +pgFileGetCRC(const char *file_path, bool use_crc32c, bool raise_on_deleted, + size_t *bytes_read) { FILE *fp; pg_crc32 crc = 0; char buf[1024]; size_t len; + size_t total = 0; int errno_tmp; + INIT_FILE_CRC32(use_crc32c, crc); + /* open file in binary read mode */ fp = fopen(file_path, PG_BINARY_R); if (fp == NULL) - elog(ERROR, "cannot open file \"%s\": %s", - file_path, strerror(errno)); + { + if (!raise_on_deleted && errno == ENOENT) + { + FIN_FILE_CRC32(use_crc32c, crc); + return crc; + } + else + elog(ERROR, "cannot open file \"%s\": %s", + file_path, strerror(errno)); + } - /* calc CRC of backup file */ - INIT_FILE_CRC32(use_crc32c, crc); - while ((len = fread(buf, 1, sizeof(buf), fp)) == sizeof(buf)) + /* calc CRC of file */ + for (;;) { if (interrupted) elog(ERROR, "interrupted during CRC calculation"); + + len = fread(buf, 1, sizeof(buf), fp); + if(len == 0) + break; + /* update CRC */ COMP_FILE_CRC32(use_crc32c, crc, buf, len); + total += len; } + + if (bytes_read) + *bytes_read = total; + errno_tmp = errno; if (!feof(fp)) elog(WARNING, "cannot read \"%s\": %s", file_path, strerror(errno_tmp)); - if (len > 0) - COMP_FILE_CRC32(use_crc32c, crc, buf, len); - FIN_FILE_CRC32(use_crc32c, crc); + FIN_FILE_CRC32(use_crc32c, crc); fclose(fp); return crc; diff --git a/src/merge.c b/src/merge.c index 13263c64..95682561 100644 --- a/src/merge.c +++ b/src/merge.c @@ -524,7 +524,7 @@ merge_files(void *arg) * do that. */ file->write_size = pgFileSize(to_path_tmp); - file->crc = pgFileGetCRC(to_path_tmp, false); + file->crc = pgFileGetCRC(to_path_tmp, false, true, NULL); } } else diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 0fad6d70..c838c923 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -531,7 +531,8 @@ extern pgFile *pgFileNew(const char *path, bool omit_symlink); extern pgFile *pgFileInit(const char *path); extern void pgFileDelete(pgFile *file); extern void pgFileFree(void *file); -extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c); +extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c, + bool raise_on_deleted, size_t *bytes_read); extern int pgFileComparePath(const void *f1, const void *f2); extern int pgFileComparePathDesc(const void *f1, const void *f2); extern int pgFileCompareLinked(const void *f1, const void *f2); @@ -552,7 +553,7 @@ extern void push_wal_file(const char *from_path, const char *to_path, bool is_compress, bool overwrite); extern void get_wal_file(const char *from_path, const char *to_path); -extern bool calc_file_checksum(pgFile *file); +extern void calc_file_checksum(pgFile *file); extern bool check_file_pages(pgFile* file, XLogRecPtr stop_lsn, diff --git a/src/util.c b/src/util.c index f2820650..946d5e49 100644 --- a/src/util.c +++ b/src/util.c @@ -334,7 +334,7 @@ set_min_recovery_point(pgFile *file, const char *backup_path, XLogRecPtr stop_ba writeControlFile(&ControlFile, fullpath); /* Update pg_control checksum in backup_list */ - file->crc = pgFileGetCRC(fullpath, false); + file->crc = pgFileGetCRC(fullpath, false, true, NULL); pg_free(buffer); } diff --git a/src/validate.c b/src/validate.c index 55ea5ebe..0e6fcb9a 100644 --- a/src/validate.c +++ b/src/validate.c @@ -224,7 +224,8 @@ pgBackupValidateFiles(void *arg) * To avoid this problem we need to use different algorithm, CRC-32 in * this case. */ - crc = pgFileGetCRC(file->path, arguments->backup_version <= 20021); + crc = pgFileGetCRC(file->path, arguments->backup_version <= 20021, + true, NULL); if (crc != file->crc) { elog(WARNING, "Invalid CRC of backup file \"%s\" : %X. Expected %X", From fe00564707f385870631be429eeb73c062a7edb6 Mon Sep 17 00:00:00 2001 From: Grigory Smolkin Date: Fri, 16 Nov 2018 09:35:41 +0300 Subject: [PATCH 06/16] tests: minor fixes --- tests/archive.py | 21 +++-- tests/helpers/ptrack_helpers.py | 4 +- tests/replica.py | 141 ++++++++++++++++++-------------- 3 files changed, 93 insertions(+), 73 deletions(-) diff --git a/tests/archive.py b/tests/archive.py index a311ac44..355c4b07 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -7,7 +7,6 @@ from datetime import datetime, timedelta import subprocess from sys import exit from time import sleep -from shutil import copyfile module_name = 'archive' @@ -248,7 +247,9 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): node.append_conf( 'postgresql.auto.conf', "archive_command = '{0} %p %f'".format( archive_script_path)) + node.slow_start() + try: self.backup_node( backup_dir, 'node', node, @@ -262,6 +263,7 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): "Expecting Error because pg_stop_backup failed to answer.\n " "Output: {0} \n CMD: {1}".format( repr(self.output), self.cmd)) + except ProbackupException as e: self.assertTrue( "ERROR: pg_stop_backup doesn't answer" in e.message and @@ -327,9 +329,11 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): ) self.assertFalse('pg_probackup archive-push completed successfully' in log_content) - wal_src = os.path.join(node.data_dir, 'pg_wal', '000000010000000000000001') + wal_src = os.path.join( + node.data_dir, 'pg_wal', '000000010000000000000001') if self.archive_compress: - with open(wal_src, 'rb') as f_in, gzip.open(file, 'wb', compresslevel=1) as f_out: + with open(wal_src, 'rb') as f_in, gzip.open( + file, 'wb', compresslevel=1) as f_out: shutil.copyfileobj(f_in, f_out) else: shutil.copyfile(wal_src, file) @@ -506,10 +510,6 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): "postgres", "CHECKPOINT") -# copyfile( -# os.path.join(backup_dir, 'wal/master/000000010000000000000002'), -# os.path.join(backup_dir, 'wal/replica/000000010000000000000002')) - backup_id = self.backup_node( backup_dir, 'replica', replica, backup_type='page', @@ -604,10 +604,9 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): "md5(repeat(i::text,10))::tsvector as tsvector " "from generate_series(0, 60000) i") - # TAKE FULL ARCHIVE BACKUP FROM REPLICA - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000001'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000001')) + master.psql( + "postgres", + "CHECKPOINT") backup_id = self.backup_node( backup_dir, 'replica', replica, diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py index 7b4b410b..b8a09343 100644 --- a/tests/helpers/ptrack_helpers.py +++ b/tests/helpers/ptrack_helpers.py @@ -874,8 +874,8 @@ class ProbackupTest(object): return out_dict def set_archiving( - self, backup_dir, instance, node, replica=False, overwrite=False, compress=False, - old_binary=False): + self, backup_dir, instance, node, replica=False, + overwrite=False, compress=False, old_binary=False): if replica: archive_mode = 'always' diff --git a/tests/replica.py b/tests/replica.py index 1ab8515e..f1591d81 100644 --- a/tests/replica.py +++ b/tests/replica.py @@ -5,7 +5,6 @@ from datetime import datetime, timedelta import subprocess from sys import exit import time -from shutil import copyfile module_name = 'replica' @@ -27,8 +26,9 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): set_replication=True, initdb_params=['--data-checksums'], pg_options={ - 'wal_level': 'replica', 'max_wal_senders': '2', - 'checkpoint_timeout': '30s', 'ptrack_enable': 'on'} + 'wal_level': 'replica', + 'max_wal_senders': '2', + 'ptrack_enable': 'on'} ) master.start() self.init_pb(backup_dir) @@ -144,7 +144,6 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): pg_options={ 'wal_level': 'replica', 'max_wal_senders': '2', - 'checkpoint_timeout': '30s', 'archive_timeout': '10s'} ) self.init_pb(backup_dir) @@ -171,7 +170,8 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): self.restore_node(backup_dir, 'master', replica) # Settings for Replica - self.set_replica(master, replica) + self.add_instance(backup_dir, 'replica', replica) + self.set_replica(master, replica, synchronous=True) self.set_archiving(backup_dir, 'replica', replica, replica=True) replica.slow_start(replica=True) @@ -187,31 +187,23 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): "postgres", "insert into t_heap as select i as id, md5(i::text) as text, " "md5(repeat(i::text,10))::tsvector as tsvector " - "from generate_series(256,5120) i") + "from generate_series(256,25120) i") before = master.safe_psql("postgres", "SELECT * FROM t_heap") - self.add_instance(backup_dir, 'replica', replica) - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000003'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000003')) + master.psql( + "postgres", + "CHECKPOINT") - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000004'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000004')) - - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000005'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000005')) + self.wait_until_replica_catch_with_master(master, replica) backup_id = self.backup_node( backup_dir, 'replica', replica, options=[ - '--archive-timeout=30', + '--archive-timeout=60', '--master-host=localhost', '--master-db=postgres', - '--master-port={0}'.format(master.port), - '--stream']) + '--master-port={0}'.format(master.port)]) self.validate_pb(backup_dir, 'replica') self.assertEqual( @@ -222,8 +214,13 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): base_dir="{0}/{1}/node".format(module_name, fname)) node.cleanup() self.restore_node(backup_dir, 'replica', data_dir=node.data_dir) + node.append_conf( 'postgresql.auto.conf', 'port = {0}'.format(node.port)) + + node.append_conf( + 'postgresql.auto.conf', 'archive_mode = off'.format(node.port)) + node.slow_start() # CHECK DATA CORRECTNESS @@ -234,23 +231,31 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): # Change data on master, make PAGE backup from replica, # restore taken backup and check that restored data equal # to original data - master.psql( - "postgres", - "insert into t_heap as select i as id, md5(i::text) as text, " - "md5(repeat(i::text,10))::tsvector as tsvector " - "from generate_series(512,22680) i") + master.pgbench_init(scale=5) - before = master.safe_psql("postgres", "SELECT * FROM t_heap") + pgbench = master.pgbench( + options=['-T', '30', '-c', '2', '--no-vacuum']) + +# master.psql( +# "postgres", +# "insert into t_heap as select i as id, md5(i::text) as text, " +# "md5(repeat(i::text,10))::tsvector as tsvector " +# "from generate_series(512,25120) i") backup_id = self.backup_node( backup_dir, 'replica', replica, backup_type='page', options=[ - '--archive-timeout=30', + '--archive-timeout=60', '--master-host=localhost', '--master-db=postgres', - '--master-port={0}'.format(master.port), - '--stream']) + '--master-port={0}'.format(master.port)]) + + pgbench.wait() + + self.switch_wal_segment(master) + + before = master.safe_psql("postgres", "SELECT * FROM pgbench_accounts") self.validate_pb(backup_dir, 'replica') self.assertEqual( @@ -258,17 +263,21 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): # RESTORE PAGE BACKUP TAKEN FROM replica self.restore_node( - backup_dir, 'replica', data_dir=node.data_dir, backup_id=backup_id) + backup_dir, 'replica', data_dir=node.data_dir, + backup_id=backup_id) node.append_conf( 'postgresql.auto.conf', 'port = {0}'.format(node.port)) + node.append_conf( 'postgresql.auto.conf', 'archive_mode = off') + node.slow_start() # CHECK DATA CORRECTNESS - after = node.safe_psql("postgres", "SELECT * FROM t_heap") - self.assertEqual(before, after) + after = node.safe_psql("postgres", "SELECT * FROM pgbench_accounts") + self.assertEqual( + before, after, 'Restored data is not equal to original') self.add_instance(backup_dir, 'node', node) self.backup_node( @@ -290,8 +299,9 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): set_replication=True, initdb_params=['--data-checksums'], pg_options={ - 'wal_level': 'replica', 'max_wal_senders': '2', - 'checkpoint_timeout': '30s'} + 'wal_level': 'replica', + 'max_wal_senders': '2', + 'archive_timeout': '10s'} ) self.init_pb(backup_dir) self.add_instance(backup_dir, 'master', master) @@ -310,7 +320,7 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): "postgres", "create table t_heap as select i as id, md5(i::text) as text, " "md5(repeat(i::text,10))::tsvector as tsvector " - "from generate_series(0,256) i") + "from generate_series(0,8192) i") before = master.safe_psql("postgres", "SELECT * FROM t_heap") @@ -320,6 +330,7 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): backup_dir, 'master', replica, options=['-R']) # Settings for Replica + self.add_instance(backup_dir, 'replica', replica) self.set_archiving(backup_dir, 'replica', replica, replica=True) replica.append_conf( 'postgresql.auto.conf', 'port = {0}'.format(replica.port)) @@ -328,13 +339,9 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): replica.slow_start(replica=True) - self.add_instance(backup_dir, 'replica', replica) - - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000003'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000003')) - - self.backup_node(backup_dir, 'replica', replica) + self.backup_node( + backup_dir, 'replica', replica, + options=['--archive-timeout=30s', '--stream']) # Clean after yourself self.del_test_dir(module_name, fname) @@ -353,14 +360,13 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): set_replication=True, initdb_params=['--data-checksums'], pg_options={ - 'wal_level': 'replica', 'max_wal_senders': '2', - 'checkpoint_timeout': '30s'} + 'wal_level': 'replica', + 'max_wal_senders': '2', + 'archive_timeout': '10s'} ) self.init_pb(backup_dir) self.add_instance(backup_dir, 'master', master) self.set_archiving(backup_dir, 'master', master) - # force more frequent wal switch - #master.append_conf('postgresql.auto.conf', 'archive_timeout = 10') master.slow_start() replica = self.make_simple_node( @@ -369,6 +375,22 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): self.backup_node(backup_dir, 'master', master) + master.psql( + "postgres", + "create table t_heap as select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(0,165000) i") + + master.psql( + "postgres", + "CHECKPOINT") + + master.psql( + "postgres", + "create table t_heap_1 as select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(0,165000) i") + self.restore_node( backup_dir, 'master', replica, options=['-R']) @@ -376,36 +398,35 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): self.add_instance(backup_dir, 'replica', replica) self.set_archiving(backup_dir, 'replica', replica, replica=True) - # stupid hack - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000001'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000001')) - replica.append_conf( 'postgresql.auto.conf', 'port = {0}'.format(replica.port)) - replica.append_conf( - 'postgresql.auto.conf', 'hot_standby = on') + replica.slow_start(replica=True) + + self.wait_until_replica_catch_with_master(master, replica) replica.append_conf( 'recovery.conf', "recovery_min_apply_delay = '300s'") - replica.slow_start(replica=True) + replica.restart() master.pgbench_init(scale=10) pgbench = master.pgbench( - options=['-T', '30', '-c', '2', '--no-vacuum']) + options=['-T', '60', '-c', '2', '--no-vacuum']) self.backup_node( - backup_dir, 'replica', replica) + backup_dir, 'replica', + replica, options=['--archive-timeout=60s']) self.backup_node( backup_dir, 'replica', replica, - data_dir=replica.data_dir, backup_type='page') + data_dir=replica.data_dir, + backup_type='page', options=['--archive-timeout=60s']) self.backup_node( - backup_dir, 'replica', replica, backup_type='delta') + backup_dir, 'replica', replica, + backup_type='delta', options=['--archive-timeout=60s']) pgbench.wait() @@ -442,8 +463,8 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): set_replication=True, initdb_params=['--data-checksums'], pg_options={ - 'wal_level': 'replica', 'max_wal_senders': '2', - 'checkpoint_timeout': '30s'} + 'wal_level': 'replica', + 'max_wal_senders': '2'} ) self.init_pb(backup_dir) self.add_instance(backup_dir, 'master', master) From d84d79668b0c1394aee17564d484c098a6d56eae Mon Sep 17 00:00:00 2001 From: Grigory Smolkin Date: Fri, 16 Nov 2018 09:39:32 +0300 Subject: [PATCH 07/16] disable wait for archive in pg_stop_backup --- src/backup.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/backup.c b/src/backup.c index dbafd7e1..a5317d8f 100644 --- a/src/backup.c +++ b/src/backup.c @@ -1802,7 +1802,11 @@ pg_stop_backup(pgBackup *backup) #endif " labelfile," " spcmapfile" +#if PG_VERSION_NUM >= 100000 + " FROM pg_catalog.pg_stop_backup(false, false)"; +#else " FROM pg_catalog.pg_stop_backup(false)"; +#endif else stop_backup_query = "SELECT" " pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot())," @@ -1810,7 +1814,11 @@ pg_stop_backup(pgBackup *backup) " lsn," " labelfile," " spcmapfile" +#if PG_VERSION_NUM >= 100000 + " FROM pg_catalog.pg_stop_backup(false, false)"; +#else " FROM pg_catalog.pg_stop_backup(false)"; +#endif } else From 6ea7c61c33c71b905ff6ac8f3316051afa94446b Mon Sep 17 00:00:00 2001 From: Arthur Zakirov Date: Tue, 20 Nov 2018 16:33:36 +0300 Subject: [PATCH 08/16] PGPRO-2180: Retreive stop_lsn from previous WAL segment --- src/backup.c | 23 ++++++++- src/parsexlog.c | 125 ++++++++++++++++++++++++++++++++++++++++++--- src/pg_probackup.h | 3 ++ 3 files changed, 142 insertions(+), 9 deletions(-) diff --git a/src/backup.c b/src/backup.c index a5317d8f..2e36f3f7 100644 --- a/src/backup.c +++ b/src/backup.c @@ -1907,7 +1907,28 @@ pg_stop_backup(pgBackup *backup) if (!XRecOffIsValid(stop_backup_lsn)) { if (XRecOffIsNull(stop_backup_lsn)) - stop_backup_lsn = stop_backup_lsn + SizeOfXLogLongPHD; + { + char *xlog_path, + stream_xlog_path[MAXPGPATH]; + XLogSegNo segno; + + if (stream_wal) + { + pgBackupGetPath2(backup, stream_xlog_path, + lengthof(stream_xlog_path), + DATABASE_DIR, PG_XLOG_DIR); + xlog_path = stream_xlog_path; + } + else + xlog_path = arclog_path; + + GetXLogSegNo(stop_backup_lsn, segno, xlog_seg_size); + /* Retreive stop_lsn from previous segment */ + segno = segno - 1; + stop_backup_lsn = get_last_wal_lsn(xlog_path, backup->start_lsn, + segno, backup->tli, + xlog_seg_size); + } else elog(ERROR, "Invalid stop_backup_lsn value %X/%X", (uint32) (stop_backup_lsn >> 32), (uint32) (stop_backup_lsn)); diff --git a/src/parsexlog.c b/src/parsexlog.c index 5b2e32af..65ae97eb 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -88,7 +88,7 @@ static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime typedef struct XLogPageReadPrivate { - int thread_num; + int thread_num; const char *archivedir; TimeLineID tli; uint32 xlog_seg_size; @@ -132,8 +132,7 @@ static XLogReaderState *InitXLogPageRead(XLogPageReadPrivate *private_data, TimeLineID tli, uint32 xlog_seg_size, bool allocate_reader); static void CleanupXLogPageRead(XLogReaderState *xlogreader); -static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, - int elevel); +static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel); static XLogSegNo nextSegNoToRead = 0; static pthread_mutex_t wal_segment_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -239,11 +238,17 @@ doExtractPageMap(void *arg) */ if (XLogRecPtrIsInvalid(found)) { - elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X. %s", - private_data->thread_num, - (uint32) (extract_arg->startpoint >> 32), - (uint32) (extract_arg->startpoint), - (xlogreader->errormsg_buf[0] != '\0')?xlogreader->errormsg_buf:""); + if (xlogreader->errormsg_buf[0] != '\0') + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X: %s", + private_data->thread_num, + (uint32) (extract_arg->startpoint >> 32), + (uint32) (extract_arg->startpoint), + xlogreader->errormsg_buf); + else + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", + private_data->thread_num, + (uint32) (extract_arg->startpoint >> 32), + (uint32) (extract_arg->startpoint)); PrintXLogCorruptionMsg(private_data, ERROR); } extract_arg->startpoint = found; @@ -766,6 +771,104 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, return res; } +/* + * Get last valid LSN within the WAL segment with number 'segno'. If 'start_lsn' + * is in the segment with number 'segno' then start from 'start_lsn', otherwise + * start from offset 0 within the segment. + */ +XLogRecPtr +get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, + XLogSegNo segno, TimeLineID tli, uint32 seg_size) +{ + XLogReaderState *xlogreader; + XLogPageReadPrivate private; + XLogRecPtr startpoint; + XLogSegNo start_segno; + XLogRecPtr res = InvalidXLogRecPtr; + + if (segno == 0) + elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno); + + elog(LOG, "Retreiving last LSN of the segment with number " UINT64_FORMAT, + segno); + + xlogreader = InitXLogPageRead(&private, archivedir, tli, seg_size, true); + + /* + * Calculate startpoint. Decide: we should use 'start_lsn' or offset 0. + */ + GetXLogSegNo(start_lsn, start_segno, seg_size); + if (start_segno == segno) + startpoint = start_lsn; + else + { + XLogRecPtr found; + + GetXLogRecPtr(segno, 0, seg_size, startpoint); + found = XLogFindNextRecord(xlogreader, startpoint); + + if (XLogRecPtrIsInvalid(found)) + { + if (xlogreader->errormsg_buf[0] != '\0') + elog(WARNING, "Could not read WAL record at %X/%X: %s", + (uint32) (startpoint >> 32), (uint32) (startpoint), + xlogreader->errormsg_buf); + else + elog(WARNING, "Could not read WAL record at %X/%X", + (uint32) (startpoint >> 32), (uint32) (startpoint)); + PrintXLogCorruptionMsg(&private, ERROR); + } + startpoint = found; + } + + elog(VERBOSE, "Starting LSN is %X/%X", + (uint32) (startpoint >> 32), (uint32) (startpoint)); + + while (true) + { + XLogRecord *record; + char *errormsg; + XLogSegNo next_segno = 0; + + if (interrupted) + elog(ERROR, "Interrupted during WAL reading"); + + record = XLogReadRecord(xlogreader, startpoint, &errormsg); + if (record == NULL) + { + XLogRecPtr errptr; + + errptr = XLogRecPtrIsInvalid(startpoint) ? xlogreader->EndRecPtr : + startpoint; + + if (errormsg) + elog(WARNING, "Could not read WAL record at %X/%X: %s", + (uint32) (errptr >> 32), (uint32) (errptr), + errormsg); + else + elog(WARNING, "Could not read WAL record at %X/%X", + (uint32) (errptr >> 32), (uint32) (errptr)); + PrintXLogCorruptionMsg(&private, ERROR); + } + + res = xlogreader->ReadRecPtr; + + /* continue reading at next record */ + startpoint = InvalidXLogRecPtr; + + GetXLogSegNo(xlogreader->EndRecPtr, next_segno, seg_size); + if (next_segno > segno) + break; + } + + CleanupXLogPageRead(xlogreader); + XLogReaderFree(xlogreader); + + elog(VERBOSE, "Last LSN is %X/%X", (uint32) (res >> 32), (uint32) (res)); + + return res; +} + #ifdef HAVE_LIBZ /* * Show error during work with compressed file @@ -1035,6 +1138,12 @@ PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel) private_data->gz_xlogpath); #endif } + else + { + /* Cannot tell what happened specifically */ + elog(elevel, "Thread [%d]: An error occured during WAL reading", + private_data->thread_num); + } } /* diff --git a/src/pg_probackup.h b/src/pg_probackup.h index c838c923..a437396a 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -576,6 +576,9 @@ extern bool read_recovery_info(const char *archivedir, TimeLineID tli, TransactionId *recovery_xid); extern bool wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, TimeLineID target_tli, uint32 seg_size); +extern XLogRecPtr get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, + XLogSegNo segno, TimeLineID tli, + uint32 seg_size); /* in util.c */ extern TimeLineID get_current_timeline(bool safe); From b4672e3ac85f20ed164c740fedfc65f34dc6594b Mon Sep 17 00:00:00 2001 From: Arthur Zakirov Date: Wed, 21 Nov 2018 18:30:03 +0300 Subject: [PATCH 09/16] PGPRO-2180: In pg_stop_backup for replica wait for LSN of prior record --- src/backup.c | 67 +++++++++++++++++++++++++++++++--------------- src/parsexlog.c | 36 ++++++++++++++++--------- src/pg_probackup.h | 4 +-- tests/replica.py | 2 +- 4 files changed, 72 insertions(+), 37 deletions(-) diff --git a/src/backup.c b/src/backup.c index 2e36f3f7..a0fa0023 100644 --- a/src/backup.c +++ b/src/backup.c @@ -109,7 +109,7 @@ static int checkpoint_timeout(void); //static void backup_list_file(parray *files, const char *root, ) static void parse_backup_filelist_filenames(parray *files, const char *root); static void wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, - bool wait_prev_segment); + bool wait_prev_lsn, bool wait_prev_segment); static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup); static void make_pagemap_from_ptrack(parray *files); static void *StreamLog(void *arg); @@ -1166,7 +1166,7 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup) if (current.backup_mode == BACKUP_MODE_DIFF_PAGE) /* In PAGE mode wait for current segment... */ - wait_wal_lsn(backup->start_lsn, true, false); + wait_wal_lsn(backup->start_lsn, true, false, false); /* * Do not wait start_lsn for stream backup. * Because WAL streaming will start after pg_start_backup() in stream @@ -1174,7 +1174,7 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup) */ else if (!stream_wal) /* ...for others wait for previous segment */ - wait_wal_lsn(backup->start_lsn, true, true); + wait_wal_lsn(backup->start_lsn, true, false, true); /* In case of backup from replica for PostgreSQL 9.5 * wait for start_lsn to be replayed by replica @@ -1504,7 +1504,8 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_filenode, * If 'wait_prev_segment' wait for previous segment. */ static void -wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) +wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn, + bool wait_prev_segment) { TimeLineID tli; XLogSegNo targetSegNo; @@ -1515,6 +1516,7 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) bool file_exists = false; uint32 try_count = 0, timeout; + char *prior_to = (wait_prev_lsn) ? " prior to " : ""; #ifdef HAVE_LIBZ char gz_wal_segment_path[MAXPGPATH]; @@ -1555,14 +1557,13 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) timeout = archive_timeout; else timeout = ARCHIVE_TIMEOUT_DEFAULT; - } if (wait_prev_segment) elog(LOG, "Looking for segment: %s", wal_segment); else - elog(LOG, "Looking for LSN: %X/%X in segment: %s", - (uint32) (lsn >> 32), (uint32) lsn, wal_segment); + elog(LOG, "Looking for LSN %s%X/%X in segment: %s", + prior_to, (uint32) (lsn >> 32), (uint32) lsn, wal_segment); #ifdef HAVE_LIBZ snprintf(gz_wal_segment_path, sizeof(gz_wal_segment_path), "%s.gz", @@ -1598,11 +1599,27 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) /* * A WAL segment found. Check LSN on it. */ - if (wal_contains_lsn(wal_segment_dir, lsn, tli, xlog_seg_size)) - /* Target LSN was found */ + if (!wait_prev_lsn) { - elog(LOG, "Found LSN: %X/%X", (uint32) (lsn >> 32), (uint32) lsn); - return; + if (wal_contains_lsn(wal_segment_dir, lsn, tli, xlog_seg_size)) + /* Target LSN was found */ + { + elog(LOG, "Found LSN: %X/%X", (uint32) (lsn >> 32), (uint32) lsn); + return; + } + } + else + { + XLogRecPtr res; + + res = get_last_wal_lsn(wal_segment_dir, current.start_lsn, + lsn, tli, false, xlog_seg_size); + if (!XLogRecPtrIsInvalid(res)) + { + /* LSN of the prior record was found */ + elog(LOG, "Found LSN: %X/%X", (uint32) (res >> 32), (uint32) res); + return; + } } } @@ -1618,16 +1635,18 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) elog(INFO, "Wait for WAL segment %s to be archived", wal_segment_path); else - elog(INFO, "Wait for LSN %X/%X in archived WAL segment %s", - (uint32) (lsn >> 32), (uint32) lsn, wal_segment_path); + elog(INFO, "Wait for LSN %s%X/%X in archived WAL segment %s", + prior_to, (uint32) (lsn >> 32), (uint32) lsn, + wal_segment_path); } if (timeout > 0 && try_count > timeout) { if (file_exists) elog(ERROR, "WAL segment %s was archived, " - "but target LSN %X/%X could not be archived in %d seconds", - wal_segment, (uint32) (lsn >> 32), (uint32) lsn, timeout); + "but target LSN %s%X/%X could not be archived in %d seconds", + wal_segment, prior_to, (uint32) (lsn >> 32), (uint32) lsn, + timeout); /* If WAL segment doesn't exist or we wait for previous segment */ else elog(ERROR, @@ -1724,6 +1743,7 @@ pg_stop_backup(pgBackup *backup) size_t len; char *val = NULL; char *stop_backup_query = NULL; + bool stop_lsn_exists = false; /* * We will use this values if there are no transactions between start_lsn @@ -1910,7 +1930,6 @@ pg_stop_backup(pgBackup *backup) { char *xlog_path, stream_xlog_path[MAXPGPATH]; - XLogSegNo segno; if (stream_wal) { @@ -1922,12 +1941,14 @@ pg_stop_backup(pgBackup *backup) else xlog_path = arclog_path; - GetXLogSegNo(stop_backup_lsn, segno, xlog_seg_size); - /* Retreive stop_lsn from previous segment */ - segno = segno - 1; stop_backup_lsn = get_last_wal_lsn(xlog_path, backup->start_lsn, - segno, backup->tli, - xlog_seg_size); + stop_backup_lsn, backup->tli, + true, xlog_seg_size); + /* + * Do not check existance of LSN again below using + * wait_wal_lsn(). + */ + stop_lsn_exists = true; } else elog(ERROR, "Invalid stop_backup_lsn value %X/%X", @@ -2040,7 +2061,9 @@ pg_stop_backup(pgBackup *backup) * Wait for stop_lsn to be archived or streamed. * We wait for stop_lsn in stream mode just in case. */ - wait_wal_lsn(stop_backup_lsn, false, false); + if (!stop_lsn_exists) + wait_wal_lsn(stop_backup_lsn, false, + !exclusive_backup && current.from_replica, false); if (stream_wal) { diff --git a/src/parsexlog.c b/src/parsexlog.c index 65ae97eb..1fc9ac05 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -772,25 +772,33 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, } /* - * Get last valid LSN within the WAL segment with number 'segno'. If 'start_lsn' + * Get LSN of last or prior record within the WAL segment with number 'segno'. + * If 'start_lsn' * is in the segment with number 'segno' then start from 'start_lsn', otherwise * start from offset 0 within the segment. + * + * Returns LSN which points to end+1 of the last WAL record if seek_prev_segment + * is true. Otherwise returns LSN of the record prior to stop_lsn. */ XLogRecPtr get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, - XLogSegNo segno, TimeLineID tli, uint32 seg_size) + XLogRecPtr stop_lsn, TimeLineID tli, bool seek_prev_segment, + uint32 seg_size) { XLogReaderState *xlogreader; XLogPageReadPrivate private; XLogRecPtr startpoint; XLogSegNo start_segno; + XLogSegNo segno; XLogRecPtr res = InvalidXLogRecPtr; - if (segno == 0) + GetXLogSegNo(stop_lsn, segno, seg_size); + + if (segno <= 1) elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno); - elog(LOG, "Retreiving last LSN of the segment with number " UINT64_FORMAT, - segno); + if (seek_prev_segment) + segno = segno - 1; xlogreader = InitXLogPageRead(&private, archivedir, tli, seg_size, true); @@ -821,9 +829,6 @@ get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, startpoint = found; } - elog(VERBOSE, "Starting LSN is %X/%X", - (uint32) (startpoint >> 32), (uint32) (startpoint)); - while (true) { XLogRecord *record; @@ -851,21 +856,28 @@ get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, PrintXLogCorruptionMsg(&private, ERROR); } - res = xlogreader->ReadRecPtr; - /* continue reading at next record */ startpoint = InvalidXLogRecPtr; GetXLogSegNo(xlogreader->EndRecPtr, next_segno, seg_size); if (next_segno > segno) break; + + if (seek_prev_segment) + { + /* end+1 of last record read */ + res = xlogreader->EndRecPtr; + } + else + res = xlogreader->ReadRecPtr; + + if (xlogreader->EndRecPtr >= stop_lsn) + break; } CleanupXLogPageRead(xlogreader); XLogReaderFree(xlogreader); - elog(VERBOSE, "Last LSN is %X/%X", (uint32) (res >> 32), (uint32) (res)); - return res; } diff --git a/src/pg_probackup.h b/src/pg_probackup.h index a437396a..6acdbe4e 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -577,8 +577,8 @@ extern bool read_recovery_info(const char *archivedir, TimeLineID tli, extern bool wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, TimeLineID target_tli, uint32 seg_size); extern XLogRecPtr get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, - XLogSegNo segno, TimeLineID tli, - uint32 seg_size); + XLogRecPtr stop_lsn, TimeLineID tli, + bool seek_prev_segment, uint32 seg_size); /* in util.c */ extern TimeLineID get_current_timeline(bool safe); diff --git a/tests/replica.py b/tests/replica.py index f1591d81..ce976397 100644 --- a/tests/replica.py +++ b/tests/replica.py @@ -551,4 +551,4 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): exit(1) # Clean after yourself - self.del_test_dir(module_name, fname) \ No newline at end of file + self.del_test_dir(module_name, fname) From aab0ce3615b0b97f7622bb876ecce16e795bebe8 Mon Sep 17 00:00:00 2001 From: Arthur Zakirov Date: Thu, 22 Nov 2018 14:44:57 +0300 Subject: [PATCH 10/16] PGPRO-2180: Fix stop_streaming(), read recovery info using valid stop LSN --- src/backup.c | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/backup.c b/src/backup.c index a0fa0023..04478d0a 100644 --- a/src/backup.c +++ b/src/backup.c @@ -108,8 +108,8 @@ static int checkpoint_timeout(void); //static void backup_list_file(parray *files, const char *root, ) static void parse_backup_filelist_filenames(parray *files, const char *root); -static void wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, - bool wait_prev_lsn, bool wait_prev_segment); +static XLogRecPtr wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, + bool wait_prev_lsn, bool wait_prev_segment); static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup); static void make_pagemap_from_ptrack(parray *files); static void *StreamLog(void *arg); @@ -1502,8 +1502,11 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_filenode, * be archived in archive 'wal' directory regardless stream mode. * * If 'wait_prev_segment' wait for previous segment. + * + * Returns LSN of last valid record if wait_prev_segment is not true, otherwise + * returns InvalidXLogRecPtr. */ -static void +static XLogRecPtr wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn, bool wait_prev_segment) { @@ -1594,7 +1597,7 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn, { /* Do not check LSN for previous WAL segment */ if (wait_prev_segment) - return; + return InvalidXLogRecPtr; /* * A WAL segment found. Check LSN on it. @@ -1605,7 +1608,7 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn, /* Target LSN was found */ { elog(LOG, "Found LSN: %X/%X", (uint32) (lsn >> 32), (uint32) lsn); - return; + return lsn; } } else @@ -1618,7 +1621,7 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn, { /* LSN of the prior record was found */ elog(LOG, "Found LSN: %X/%X", (uint32) (res >> 32), (uint32) res); - return; + return res; } } } @@ -2053,17 +2056,20 @@ pg_stop_backup(pgBackup *backup) { char *xlog_path, stream_xlog_path[MAXPGPATH]; + XLogRecPtr stop_valid_lsn = InvalidXLogRecPtr; /* Wait for stop_lsn to be received by replica */ - if (current.from_replica) - wait_replica_wal_lsn(stop_backup_lsn, false); + /* XXX Do we need this? */ +// if (current.from_replica) +// wait_replica_wal_lsn(stop_backup_lsn, false); /* * Wait for stop_lsn to be archived or streamed. * We wait for stop_lsn in stream mode just in case. */ if (!stop_lsn_exists) - wait_wal_lsn(stop_backup_lsn, false, - !exclusive_backup && current.from_replica, false); + stop_valid_lsn = wait_wal_lsn(stop_backup_lsn, false, + !exclusive_backup && current.from_replica, + false); if (stream_wal) { @@ -2082,7 +2088,7 @@ pg_stop_backup(pgBackup *backup) /* iterate over WAL from stop_backup lsn to start_backup lsn */ if (!read_recovery_info(xlog_path, backup->tli, xlog_seg_size, - backup->start_lsn, backup->stop_lsn, + backup->start_lsn, stop_valid_lsn, &backup->recovery_time, &backup->recovery_xid)) { elog(LOG, "Failed to find Recovery Time in WAL. Forced to trust current_timestamp"); @@ -2649,7 +2655,7 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) if (!XLogRecPtrIsInvalid(stop_backup_lsn)) { - if (xlogpos > stop_backup_lsn) + if (xlogpos >= stop_backup_lsn) { stop_stream_lsn = xlogpos; return true; From 06b1dbe6f1dcfcbaf663256c28ff81d744c20655 Mon Sep 17 00:00:00 2001 From: Arthur Zakirov Date: Fri, 23 Nov 2018 12:03:29 +0300 Subject: [PATCH 11/16] PGPRO-2180: Rewrite stop_lsn during pg_stop_backup --- src/backup.c | 53 +++++++++++++++++++++++++--------------------------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/src/backup.c b/src/backup.c index 04478d0a..23139841 100644 --- a/src/backup.c +++ b/src/backup.c @@ -109,7 +109,7 @@ static int checkpoint_timeout(void); //static void backup_list_file(parray *files, const char *root, ) static void parse_backup_filelist_filenames(parray *files, const char *root); static XLogRecPtr wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, - bool wait_prev_lsn, bool wait_prev_segment); + bool wait_prev_segment); static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup); static void make_pagemap_from_ptrack(parray *files); static void *StreamLog(void *arg); @@ -1166,7 +1166,7 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup) if (current.backup_mode == BACKUP_MODE_DIFF_PAGE) /* In PAGE mode wait for current segment... */ - wait_wal_lsn(backup->start_lsn, true, false, false); + wait_wal_lsn(backup->start_lsn, true, false); /* * Do not wait start_lsn for stream backup. * Because WAL streaming will start after pg_start_backup() in stream @@ -1174,7 +1174,7 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup) */ else if (!stream_wal) /* ...for others wait for previous segment */ - wait_wal_lsn(backup->start_lsn, true, false, true); + wait_wal_lsn(backup->start_lsn, true, true); /* In case of backup from replica for PostgreSQL 9.5 * wait for start_lsn to be replayed by replica @@ -1507,8 +1507,7 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_filenode, * returns InvalidXLogRecPtr. */ static XLogRecPtr -wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn, - bool wait_prev_segment) +wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) { TimeLineID tli; XLogSegNo targetSegNo; @@ -1519,7 +1518,6 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn, bool file_exists = false; uint32 try_count = 0, timeout; - char *prior_to = (wait_prev_lsn) ? " prior to " : ""; #ifdef HAVE_LIBZ char gz_wal_segment_path[MAXPGPATH]; @@ -1565,8 +1563,8 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn, if (wait_prev_segment) elog(LOG, "Looking for segment: %s", wal_segment); else - elog(LOG, "Looking for LSN %s%X/%X in segment: %s", - prior_to, (uint32) (lsn >> 32), (uint32) lsn, wal_segment); + elog(LOG, "Looking for LSN %X/%X in segment: %s", + (uint32) (lsn >> 32), (uint32) lsn, wal_segment); #ifdef HAVE_LIBZ snprintf(gz_wal_segment_path, sizeof(gz_wal_segment_path), "%s.gz", @@ -1602,16 +1600,19 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn, /* * A WAL segment found. Check LSN on it. */ - if (!wait_prev_lsn) + if (wal_contains_lsn(wal_segment_dir, lsn, tli, xlog_seg_size)) + /* Target LSN was found */ { - if (wal_contains_lsn(wal_segment_dir, lsn, tli, xlog_seg_size)) - /* Target LSN was found */ - { - elog(LOG, "Found LSN: %X/%X", (uint32) (lsn >> 32), (uint32) lsn); - return lsn; - } + elog(LOG, "Found LSN: %X/%X", (uint32) (lsn >> 32), (uint32) lsn); + return lsn; } - else + + /* + * If we failed to get LSN of valid record in a reasonable time, try + * to get LSN of last valid record prior to the target LSN. But only + * in case of a backup from a replica. + */ + if (!exclusive_backup && current.from_replica) { XLogRecPtr res; @@ -1620,7 +1621,8 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn, if (!XLogRecPtrIsInvalid(res)) { /* LSN of the prior record was found */ - elog(LOG, "Found LSN: %X/%X", (uint32) (res >> 32), (uint32) res); + elog(LOG, "Found prior LSN: %X/%X, it is used as stop LSN", + (uint32) (res >> 32), (uint32) res); return res; } } @@ -1638,18 +1640,16 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_lsn, elog(INFO, "Wait for WAL segment %s to be archived", wal_segment_path); else - elog(INFO, "Wait for LSN %s%X/%X in archived WAL segment %s", - prior_to, (uint32) (lsn >> 32), (uint32) lsn, - wal_segment_path); + elog(INFO, "Wait for LSN %X/%X in archived WAL segment %s", + (uint32) (lsn >> 32), (uint32) lsn, wal_segment_path); } if (timeout > 0 && try_count > timeout) { if (file_exists) elog(ERROR, "WAL segment %s was archived, " - "but target LSN %s%X/%X could not be archived in %d seconds", - wal_segment, prior_to, (uint32) (lsn >> 32), (uint32) lsn, - timeout); + "but target LSN %X/%X could not be archived in %d seconds", + wal_segment, (uint32) (lsn >> 32), (uint32) lsn, timeout); /* If WAL segment doesn't exist or we wait for previous segment */ else elog(ERROR, @@ -2056,7 +2056,6 @@ pg_stop_backup(pgBackup *backup) { char *xlog_path, stream_xlog_path[MAXPGPATH]; - XLogRecPtr stop_valid_lsn = InvalidXLogRecPtr; /* Wait for stop_lsn to be received by replica */ /* XXX Do we need this? */ @@ -2067,9 +2066,7 @@ pg_stop_backup(pgBackup *backup) * We wait for stop_lsn in stream mode just in case. */ if (!stop_lsn_exists) - stop_valid_lsn = wait_wal_lsn(stop_backup_lsn, false, - !exclusive_backup && current.from_replica, - false); + stop_backup_lsn = wait_wal_lsn(stop_backup_lsn, false, false); if (stream_wal) { @@ -2088,7 +2085,7 @@ pg_stop_backup(pgBackup *backup) /* iterate over WAL from stop_backup lsn to start_backup lsn */ if (!read_recovery_info(xlog_path, backup->tli, xlog_seg_size, - backup->start_lsn, stop_valid_lsn, + backup->start_lsn, stop_backup_lsn, &backup->recovery_time, &backup->recovery_xid)) { elog(LOG, "Failed to find Recovery Time in WAL. Forced to trust current_timestamp"); From bdf2539887ce6dba0bd20df60108e1639d695641 Mon Sep 17 00:00:00 2001 From: Arthur Zakirov Date: Fri, 23 Nov 2018 12:41:42 +0300 Subject: [PATCH 12/16] Improve log message within SimpleXLogPageRead --- src/parsexlog.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/parsexlog.c b/src/parsexlog.c index 1fc9ac05..bb0eb939 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -922,14 +922,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (!IsInXLogSeg(targetPagePtr, private_data->xlogsegno, private_data->xlog_seg_size)) { - elog(VERBOSE, "Thread [%d]: Need to switch to segno next to %X/%X, current LSN %X/%X", + elog(VERBOSE, "Thread [%d]: Need to switch to the next WAL segment, page LSN %X/%X, record being read LSN %X/%X", private_data->thread_num, (uint32) (targetPagePtr >> 32), (uint32) (targetPagePtr), (uint32) (xlogreader->currRecPtr >> 32), (uint32) (xlogreader->currRecPtr )); /* - * if the last record on the page is not complete, + * If the last record on the page is not complete, * we must continue reading pages in the same thread */ if (!XLogRecPtrIsInvalid(xlogreader->currRecPtr) && From 7532cb36d1e8791611446ea1d8746dfaed5c9816 Mon Sep 17 00:00:00 2001 From: Arthur Zakirov Date: Fri, 23 Nov 2018 13:20:16 +0300 Subject: [PATCH 13/16] PGPRO-2180: Reasonable time --- src/backup.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/backup.c b/src/backup.c index 23139841..0e76dde7 100644 --- a/src/backup.c +++ b/src/backup.c @@ -1612,7 +1612,8 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) * to get LSN of last valid record prior to the target LSN. But only * in case of a backup from a replica. */ - if (!exclusive_backup && current.from_replica) + if (!exclusive_backup && current.from_replica && + (try_count > timeout / 4)) { XLogRecPtr res; From bb3c0645aaa421dd44e46092923dcb7deca27709 Mon Sep 17 00:00:00 2001 From: Arthur Zakirov Date: Fri, 23 Nov 2018 15:08:04 +0300 Subject: [PATCH 14/16] PGPRO-2180: Use archive_timeout only, do not use checkpoint_timeout --- src/backup.c | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/backup.c b/src/backup.c index 0e76dde7..c9fcea3f 100644 --- a/src/backup.c +++ b/src/backup.c @@ -1545,21 +1545,18 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) DATABASE_DIR, PG_XLOG_DIR); join_path_components(wal_segment_path, pg_wal_dir, wal_segment); wal_segment_dir = pg_wal_dir; - - timeout = (uint32) checkpoint_timeout(); - timeout = timeout + timeout * 0.1; } else { join_path_components(wal_segment_path, arclog_path, wal_segment); wal_segment_dir = arclog_path; - - if (archive_timeout > 0) - timeout = archive_timeout; - else - timeout = ARCHIVE_TIMEOUT_DEFAULT; } + if (archive_timeout > 0) + timeout = archive_timeout; + else + timeout = ARCHIVE_TIMEOUT_DEFAULT; + if (wait_prev_segment) elog(LOG, "Looking for segment: %s", wal_segment); else From 7e2fccd0412e81b5949ae02a6360f9e992bbf284 Mon Sep 17 00:00:00 2001 From: Grigory Smolkin Date: Fri, 23 Nov 2018 18:16:49 +0300 Subject: [PATCH 15/16] bugfix: use CRC32 for crc comparison during archive-push if target WAL segment is compressed --- src/data.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/data.c b/src/data.c index 924e0fb7..9e54b2cc 100644 --- a/src/data.c +++ b/src/data.c @@ -1686,7 +1686,7 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) char buf [1024]; gzFile gz_in = NULL; - INIT_CRC32C(crc2); + INIT_FILE_CRC32(false, crc2); gz_in = gzopen(path2, PG_BINARY_R); if (gz_in == NULL) /* File cannot be read */ @@ -1704,11 +1704,11 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) "Cannot compare WAL file \"%s\" with compressed \"%s\"", path1, path2); - COMP_CRC32C(crc2, buf, read_len); + COMP_FILE_CRC32(false, crc2, buf, read_len); if (gzeof(gz_in) || read_len == 0) break; } - FIN_CRC32C(crc2); + FIN_FILE_CRC32(false, crc2); if (gzclose(gz_in) != 0) elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", From 57ca17ad6c8ad15b23ea940f145efbd1a7f37d36 Mon Sep 17 00:00:00 2001 From: Grigory Smolkin Date: Fri, 23 Nov 2018 18:40:21 +0300 Subject: [PATCH 16/16] bugfix: use CRC32C for crc comparison during archive-push if target WAL segment is compressed --- src/data.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/data.c b/src/data.c index 9e54b2cc..24c38110 100644 --- a/src/data.c +++ b/src/data.c @@ -1686,7 +1686,7 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) char buf [1024]; gzFile gz_in = NULL; - INIT_FILE_CRC32(false, crc2); + INIT_FILE_CRC32(true, crc2); gz_in = gzopen(path2, PG_BINARY_R); if (gz_in == NULL) /* File cannot be read */ @@ -1704,11 +1704,11 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) "Cannot compare WAL file \"%s\" with compressed \"%s\"", path1, path2); - COMP_FILE_CRC32(false, crc2, buf, read_len); + COMP_FILE_CRC32(true, crc2, buf, read_len); if (gzeof(gz_in) || read_len == 0) break; } - FIN_FILE_CRC32(false, crc2); + FIN_FILE_CRC32(true, crc2); if (gzclose(gz_in) != 0) elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", @@ -1717,11 +1717,11 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) else #endif { - crc2 = pgFileGetCRC(path2, false, true, NULL); + crc2 = pgFileGetCRC(path2, true, true, NULL); } /* Get checksum of original file */ - crc1 = pgFileGetCRC(path1, false, true, NULL); + crc1 = pgFileGetCRC(path1, true, true, NULL); return EQ_CRC32C(crc1, crc2); }