From 4befefada2eeda3eade56f532f28e62e78a638c9 Mon Sep 17 00:00:00 2001 From: Sergey Cherkashin <4erkashin@list.ru> Date: Wed, 17 Oct 2018 14:44:21 +0300 Subject: [PATCH 1/5] Skip copy WAL file to archive if already exists --- src/data.c | 90 +++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 79 insertions(+), 11 deletions(-) diff --git a/src/data.c b/src/data.c index b1acfecc..9d5267a8 100644 --- a/src/data.c +++ b/src/data.c @@ -29,6 +29,9 @@ typedef union DataPage char data[BLCKSZ]; } DataPage; +static bool +fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed); + #ifdef HAVE_LIBZ /* Implementation of zlib compression method */ static int32 @@ -1092,14 +1095,21 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, FILE *in = NULL; FILE *out=NULL; char buf[XLOG_BLCKSZ]; - const char *to_path_p = to_path; + const char *to_path_p; char to_path_temp[MAXPGPATH]; int errno_temp; #ifdef HAVE_LIBZ char gz_to_path[MAXPGPATH]; gzFile gz_out = NULL; + if (is_compress) + { + snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path); + to_path_p = gz_to_path; + } + else #endif + to_path_p = to_path; /* open file for read */ in = fopen(from_path, PG_BINARY_R); @@ -1107,30 +1117,30 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path, strerror(errno)); + /* Check if possible to skip copying */ + if (fileExists(to_path_p)) + { + if (fileEqualCRC(from_path, to_path_p, is_compress)) + return; + /* Do not copy and do not rise error. Just quit as normal. */ + else if (!overwrite) + elog(ERROR, "WAL segment \"%s\" already exists.", to_path); + } + /* open backup file for write */ #ifdef HAVE_LIBZ if (is_compress) { - snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path); - - if (!overwrite && fileExists(gz_to_path)) - elog(ERROR, "WAL segment \"%s\" already exists.", gz_to_path); - snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path); gz_out = gzopen(to_path_temp, PG_BINARY_W); if (gzsetparams(gz_out, compress_level, Z_DEFAULT_STRATEGY) != Z_OK) elog(ERROR, "Cannot set compression level %d to file \"%s\": %s", compress_level, to_path_temp, get_gz_error(gz_out, errno)); - - to_path_p = gz_to_path; } else #endif { - if (!overwrite && fileExists(to_path)) - elog(ERROR, "WAL segment \"%s\" already exists.", to_path); - snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path); out = fopen(to_path_temp, PG_BINARY_W); @@ -1724,3 +1734,61 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn, return is_valid; } + +static bool +fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) +{ + pg_crc32 crc_from; + pg_crc32 crc_to; + + /* Get checksum of backup file */ +#ifdef HAVE_LIBZ + if (path2_is_compressed) + { + char buf [1024]; + gzFile gz_in = NULL; + + INIT_CRC32C(crc_to); + gz_in = gzopen(path2, PG_BINARY_R); + if (gz_in == NULL) + { + /* There is no such file or it cannot be read */ + elog(LOG, + "Cannot compare WAL file \"%s\" with compressed \"%s\"", + path1, path2); + return false; + } + + for (;;) + { + size_t read_len = 0; + read_len = gzread(gz_in, buf, sizeof(buf)); + if (read_len != sizeof(buf) && !gzeof(gz_in)) + { + /* An error occurred while reading the file */ + elog(LOG, + "Cannot compare WAL file \"%s\" with compressed \"%s\"", + path1, path2); + return false; + } + COMP_CRC32C(crc_to, buf, read_len); + if (gzeof(gz_in) || read_len == 0) + break; + } + FIN_CRC32C(crc_to); + + if (gzclose(gz_in) != 0) + elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", + path2, get_gz_error(gz_in, errno)); + } + else +#endif + { + crc_to = pgFileGetCRC(path2); + } + + /* Get checksum of original file */ + crc_from = pgFileGetCRC(path1); + + return EQ_CRC32C(crc_from, crc_to); +} From 6deb3bbe2bda3f03c89cc64867a3f647d8bacdf5 Mon Sep 17 00:00:00 2001 From: Sergey Cherkashin <4erkashin@list.ru> Date: Wed, 17 Oct 2018 15:40:08 +0300 Subject: [PATCH 2/5] fileEqualCRC() refactoring --- src/data.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/data.c b/src/data.c index 9d5267a8..b9b25b11 100644 --- a/src/data.c +++ b/src/data.c @@ -1738,8 +1738,8 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn, static bool fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) { - pg_crc32 crc_from; - pg_crc32 crc_to; + pg_crc32 crc1; + pg_crc32 crc2; /* Get checksum of backup file */ #ifdef HAVE_LIBZ @@ -1748,7 +1748,7 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) char buf [1024]; gzFile gz_in = NULL; - INIT_CRC32C(crc_to); + INIT_CRC32C(crc2); gz_in = gzopen(path2, PG_BINARY_R); if (gz_in == NULL) { @@ -1771,11 +1771,11 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) path1, path2); return false; } - COMP_CRC32C(crc_to, buf, read_len); + COMP_CRC32C(crc2, buf, read_len); if (gzeof(gz_in) || read_len == 0) break; } - FIN_CRC32C(crc_to); + FIN_CRC32C(crc2); if (gzclose(gz_in) != 0) elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", @@ -1784,11 +1784,11 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) else #endif { - crc_to = pgFileGetCRC(path2); + crc2 = pgFileGetCRC(path2); } /* Get checksum of original file */ - crc_from = pgFileGetCRC(path1); + crc1 = pgFileGetCRC(path1); - return EQ_CRC32C(crc_from, crc_to); + return EQ_CRC32C(crc1, crc2); } From 20d4dcc291c817e0fd535fbf1b84b641335a4b51 Mon Sep 17 00:00:00 2001 From: Sergey Cherkashin <4erkashin@list.ru> Date: Thu, 18 Oct 2018 12:06:54 +0300 Subject: [PATCH 3/5] Modifyed test test_arhive_push_file_exists. Fixed fileEqualCRC() error handling --- src/data.c | 15 +++++---------- tests/archive.py | 12 +++++++++++- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/data.c b/src/data.c index b9b25b11..1ed5bfee 100644 --- a/src/data.c +++ b/src/data.c @@ -1751,32 +1751,27 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) INIT_CRC32C(crc2); gz_in = gzopen(path2, PG_BINARY_R); if (gz_in == NULL) - { - /* There is no such file or it cannot be read */ - elog(LOG, + /* File cannot be read */ + elog(ERROR, "Cannot compare WAL file \"%s\" with compressed \"%s\"", path1, path2); - return false; - } for (;;) { size_t read_len = 0; read_len = gzread(gz_in, buf, sizeof(buf)); if (read_len != sizeof(buf) && !gzeof(gz_in)) - { /* An error occurred while reading the file */ - elog(LOG, + elog(ERROR, "Cannot compare WAL file \"%s\" with compressed \"%s\"", path1, path2); - return false; - } + COMP_CRC32C(crc2, buf, read_len); if (gzeof(gz_in) || read_len == 0) break; } FIN_CRC32C(crc2); - + if (gzclose(gz_in) != 0) elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", path2, get_gz_error(gz_in, errno)); diff --git a/tests/archive.py b/tests/archive.py index 4ed783d6..45b235cd 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1,4 +1,6 @@ import os +import shutil +import zlib import unittest from .helpers.ptrack_helpers import ProbackupTest, ProbackupException, archive_script from datetime import datetime, timedelta @@ -325,7 +327,15 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): ) self.assertFalse('pg_probackup archive-push completed successfully' in log_content) - os.remove(file) + wal_src = os.path.join(node.data_dir, 'pg_wal', '000000010000000000000001') + if self.archive_compress: + with open(wal_src, 'rb') as f_in, open(file, 'wb') as f_out: + original_wal = f_in.read() + compressed_wal = zlib.compress(original_wal, 1) + f_out.write(compressed_wal) + else: + shutil.copyfile(wal_src, file) + self.switch_wal_segment(node) sleep(5) From 6d0cbfa2328b2903e2aaa9d216b221524b4bdccb Mon Sep 17 00:00:00 2001 From: Sergey Cherkashin <4erkashin@list.ru> Date: Thu, 18 Oct 2018 15:05:51 +0300 Subject: [PATCH 4/5] Fix test test_arhive_push_file_exists --- src/data.c | 2 +- tests/archive.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/data.c b/src/data.c index 1ed5bfee..4c75e1c9 100644 --- a/src/data.c +++ b/src/data.c @@ -1124,7 +1124,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, return; /* Do not copy and do not rise error. Just quit as normal. */ else if (!overwrite) - elog(ERROR, "WAL segment \"%s\" already exists.", to_path); + elog(ERROR, "WAL segment \"%s\" already exists.", to_path_p); } /* open backup file for write */ diff --git a/tests/archive.py b/tests/archive.py index 45b235cd..a311ac44 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1,6 +1,6 @@ import os import shutil -import zlib +import gzip import unittest from .helpers.ptrack_helpers import ProbackupTest, ProbackupException, archive_script from datetime import datetime, timedelta @@ -329,13 +329,11 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): wal_src = os.path.join(node.data_dir, 'pg_wal', '000000010000000000000001') if self.archive_compress: - with open(wal_src, 'rb') as f_in, open(file, 'wb') as f_out: - original_wal = f_in.read() - compressed_wal = zlib.compress(original_wal, 1) - f_out.write(compressed_wal) + with open(wal_src, 'rb') as f_in, gzip.open(file, 'wb', compresslevel=1) as f_out: + shutil.copyfileobj(f_in, f_out) else: shutil.copyfile(wal_src, file) - + self.switch_wal_segment(node) sleep(5) From f53395529b95ed923c87271001531b6834853c54 Mon Sep 17 00:00:00 2001 From: Sergey Cherkashin <4erkashin@list.ru> Date: Thu, 15 Nov 2018 18:22:43 +0300 Subject: [PATCH 5/5] Refactored calc_file_checksum() --- src/data.c | 72 ++++------------------------------------------ src/dir.c | 37 ++++++++++++++++++------ src/merge.c | 2 +- src/pg_probackup.h | 5 ++-- src/util.c | 2 +- src/validate.c | 3 +- 6 files changed, 40 insertions(+), 81 deletions(-) diff --git a/src/data.c b/src/data.c index 4c75e1c9..924e0fb7 100644 --- a/src/data.c +++ b/src/data.c @@ -1418,75 +1418,13 @@ get_wal_file(const char *from_path, const char *to_path) * but created in process of backup, such as stream XLOG files, * PG_TABLESPACE_MAP_FILE and PG_BACKUP_LABEL_FILE. */ -bool +void calc_file_checksum(pgFile *file) { - FILE *in; - size_t read_len = 0; - int errno_tmp; - char buf[BLCKSZ]; - struct stat st; - pg_crc32 crc; - Assert(S_ISREG(file->mode)); - INIT_TRADITIONAL_CRC32(crc); - /* reset size summary */ - file->read_size = 0; - file->write_size = 0; - - /* open backup mode file for read */ - in = fopen(file->path, PG_BINARY_R); - if (in == NULL) - { - FIN_TRADITIONAL_CRC32(crc); - file->crc = crc; - - /* maybe deleted, it's not error */ - if (errno == ENOENT) - return false; - - elog(ERROR, "cannot open source file \"%s\": %s", file->path, - strerror(errno)); - } - - /* stat source file to change mode of destination file */ - if (fstat(fileno(in), &st) == -1) - { - fclose(in); - elog(ERROR, "cannot stat \"%s\": %s", file->path, - strerror(errno)); - } - - for (;;) - { - read_len = fread(buf, 1, sizeof(buf), in); - - if(read_len == 0) - break; - - /* update CRC */ - COMP_TRADITIONAL_CRC32(crc, buf, read_len); - - file->write_size += read_len; - file->read_size += read_len; - } - - errno_tmp = errno; - if (!feof(in)) - { - fclose(in); - elog(ERROR, "cannot read backup mode file \"%s\": %s", - file->path, strerror(errno_tmp)); - } - - /* finish CRC calculation and store into pgFile */ - FIN_TRADITIONAL_CRC32(crc); - file->crc = crc; - - fclose(in); - - return true; + file->crc = pgFileGetCRC(file->path, false, false, &file->read_size); + file->write_size = file->read_size; } /* @@ -1779,11 +1717,11 @@ fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) else #endif { - crc2 = pgFileGetCRC(path2); + crc2 = pgFileGetCRC(path2, false, true, NULL); } /* Get checksum of original file */ - crc1 = pgFileGetCRC(path1); + crc1 = pgFileGetCRC(path1, false, true, NULL); return EQ_CRC32C(crc1, crc2); } diff --git a/src/dir.c b/src/dir.c index 5c6f60d8..11712c03 100644 --- a/src/dir.c +++ b/src/dir.c @@ -259,36 +259,55 @@ delete_file: } pg_crc32 -pgFileGetCRC(const char *file_path, bool use_crc32c) +pgFileGetCRC(const char *file_path, bool use_crc32c, bool raise_on_deleted, + size_t *bytes_read) { FILE *fp; pg_crc32 crc = 0; char buf[1024]; size_t len; + size_t total = 0; int errno_tmp; + INIT_FILE_CRC32(use_crc32c, crc); + /* open file in binary read mode */ fp = fopen(file_path, PG_BINARY_R); if (fp == NULL) - elog(ERROR, "cannot open file \"%s\": %s", - file_path, strerror(errno)); + { + if (!raise_on_deleted && errno == ENOENT) + { + FIN_FILE_CRC32(use_crc32c, crc); + return crc; + } + else + elog(ERROR, "cannot open file \"%s\": %s", + file_path, strerror(errno)); + } - /* calc CRC of backup file */ - INIT_FILE_CRC32(use_crc32c, crc); - while ((len = fread(buf, 1, sizeof(buf), fp)) == sizeof(buf)) + /* calc CRC of file */ + for (;;) { if (interrupted) elog(ERROR, "interrupted during CRC calculation"); + + len = fread(buf, 1, sizeof(buf), fp); + if(len == 0) + break; + /* update CRC */ COMP_FILE_CRC32(use_crc32c, crc, buf, len); + total += len; } + + if (bytes_read) + *bytes_read = total; + errno_tmp = errno; if (!feof(fp)) elog(WARNING, "cannot read \"%s\": %s", file_path, strerror(errno_tmp)); - if (len > 0) - COMP_FILE_CRC32(use_crc32c, crc, buf, len); - FIN_FILE_CRC32(use_crc32c, crc); + FIN_FILE_CRC32(use_crc32c, crc); fclose(fp); return crc; diff --git a/src/merge.c b/src/merge.c index 13263c64..95682561 100644 --- a/src/merge.c +++ b/src/merge.c @@ -524,7 +524,7 @@ merge_files(void *arg) * do that. */ file->write_size = pgFileSize(to_path_tmp); - file->crc = pgFileGetCRC(to_path_tmp, false); + file->crc = pgFileGetCRC(to_path_tmp, false, true, NULL); } } else diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 0fad6d70..c838c923 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -531,7 +531,8 @@ extern pgFile *pgFileNew(const char *path, bool omit_symlink); extern pgFile *pgFileInit(const char *path); extern void pgFileDelete(pgFile *file); extern void pgFileFree(void *file); -extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c); +extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c, + bool raise_on_deleted, size_t *bytes_read); extern int pgFileComparePath(const void *f1, const void *f2); extern int pgFileComparePathDesc(const void *f1, const void *f2); extern int pgFileCompareLinked(const void *f1, const void *f2); @@ -552,7 +553,7 @@ extern void push_wal_file(const char *from_path, const char *to_path, bool is_compress, bool overwrite); extern void get_wal_file(const char *from_path, const char *to_path); -extern bool calc_file_checksum(pgFile *file); +extern void calc_file_checksum(pgFile *file); extern bool check_file_pages(pgFile* file, XLogRecPtr stop_lsn, diff --git a/src/util.c b/src/util.c index f2820650..946d5e49 100644 --- a/src/util.c +++ b/src/util.c @@ -334,7 +334,7 @@ set_min_recovery_point(pgFile *file, const char *backup_path, XLogRecPtr stop_ba writeControlFile(&ControlFile, fullpath); /* Update pg_control checksum in backup_list */ - file->crc = pgFileGetCRC(fullpath, false); + file->crc = pgFileGetCRC(fullpath, false, true, NULL); pg_free(buffer); } diff --git a/src/validate.c b/src/validate.c index 55ea5ebe..0e6fcb9a 100644 --- a/src/validate.c +++ b/src/validate.c @@ -224,7 +224,8 @@ pgBackupValidateFiles(void *arg) * To avoid this problem we need to use different algorithm, CRC-32 in * this case. */ - crc = pgFileGetCRC(file->path, arguments->backup_version <= 20021); + crc = pgFileGetCRC(file->path, arguments->backup_version <= 20021, + true, NULL); if (crc != file->crc) { elog(WARNING, "Invalid CRC of backup file \"%s\" : %X. Expected %X",