diff --git a/src/backup.c b/src/backup.c index fc47d22d..a8e1e117 100644 --- a/src/backup.c +++ b/src/backup.c @@ -108,8 +108,8 @@ static int checkpoint_timeout(void); //static void backup_list_file(parray *files, const char *root, ) static void parse_backup_filelist_filenames(parray *files, const char *root); -static void wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, - bool wait_prev_segment); +static XLogRecPtr wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, + bool wait_prev_segment); static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup); static void make_pagemap_from_ptrack(parray *files); static void *StreamLog(void *arg); @@ -1184,7 +1184,7 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup) if (current.backup_mode == BACKUP_MODE_DIFF_PAGE) /* In PAGE mode wait for current segment... */ - wait_wal_lsn(backup->start_lsn, true, false); + wait_wal_lsn(backup->start_lsn, true, false); /* * Do not wait start_lsn for stream backup. * Because WAL streaming will start after pg_start_backup() in stream @@ -1524,8 +1524,11 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_filenode, * be archived in archive 'wal' directory regardless stream mode. * * If 'wait_prev_segment' wait for previous segment. + * + * Returns LSN of last valid record if wait_prev_segment is not true, otherwise + * returns InvalidXLogRecPtr. 
*/ -static void +static XLogRecPtr wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) { TimeLineID tli; @@ -1565,25 +1568,22 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) DATABASE_DIR, PG_XLOG_DIR); join_path_components(wal_segment_path, pg_wal_dir, wal_segment); wal_segment_dir = pg_wal_dir; - - timeout = (uint32) checkpoint_timeout(); - timeout = timeout + timeout * 0.1; } else { join_path_components(wal_segment_path, arclog_path, wal_segment); wal_segment_dir = arclog_path; - - if (instance_config.archive_timeout > 0) - timeout = instance_config.archive_timeout; - else - timeout = ARCHIVE_TIMEOUT_DEFAULT; } + if (instance_config.archive_timeout > 0) + timeout = instance_config.archive_timeout; + else + timeout = ARCHIVE_TIMEOUT_DEFAULT; + if (wait_prev_segment) elog(LOG, "Looking for segment: %s", wal_segment); else - elog(LOG, "Looking for LSN: %X/%X in segment: %s", + elog(LOG, "Looking for LSN %X/%X in segment: %s", (uint32) (lsn >> 32), (uint32) lsn, wal_segment); #ifdef HAVE_LIBZ @@ -1615,7 +1615,7 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) { /* Do not check LSN for previous WAL segment */ if (wait_prev_segment) - return; + return InvalidXLogRecPtr; /* * A WAL segment found. Check LSN on it. @@ -1625,7 +1625,29 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) /* Target LSN was found */ { elog(LOG, "Found LSN: %X/%X", (uint32) (lsn >> 32), (uint32) lsn); - return; + return lsn; + } + + /* + * If we failed to get LSN of valid record in a reasonable time, try + * to get LSN of last valid record prior to the target LSN. But only + * in case of a backup from a replica. 
+ */ + if (!exclusive_backup && current.from_replica && + (try_count > timeout / 4)) + { + XLogRecPtr res; + + res = get_last_wal_lsn(wal_segment_dir, current.start_lsn, + lsn, tli, false, + instance_config.xlog_seg_size); + if (!XLogRecPtrIsInvalid(res)) + { + /* LSN of the prior record was found */ + elog(LOG, "Found prior LSN: %X/%X, it is used as stop LSN", + (uint32) (res >> 32), (uint32) res); + return res; + } + } } @@ -1748,6 +1770,7 @@ pg_stop_backup(pgBackup *backup) size_t len; char *val = NULL; char *stop_backup_query = NULL; + bool stop_lsn_exists = false; /* * We will use this values if there are no transactions between start_lsn @@ -1826,7 +1849,11 @@ pg_stop_backup(pgBackup *backup) #endif " labelfile," " spcmapfile" +#if PG_VERSION_NUM >= 100000 + " FROM pg_catalog.pg_stop_backup(false, false)"; +#else " FROM pg_catalog.pg_stop_backup(false)"; +#endif else stop_backup_query = "SELECT" " pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot())," " current_timestamp(0)::timestamptz," " lsn," " labelfile," " spcmapfile" +#if PG_VERSION_NUM >= 100000 + " FROM pg_catalog.pg_stop_backup(false, false)"; +#else " FROM pg_catalog.pg_stop_backup(false)"; +#endif } else @@ -1923,7 +1954,29 @@ pg_stop_backup(pgBackup *backup) if (!XRecOffIsValid(stop_backup_lsn)) { if (XRecOffIsNull(stop_backup_lsn)) - stop_backup_lsn = stop_backup_lsn + SizeOfXLogLongPHD; + { + char *xlog_path, + stream_xlog_path[MAXPGPATH]; + + if (stream_wal) + { + pgBackupGetPath2(backup, stream_xlog_path, + lengthof(stream_xlog_path), + DATABASE_DIR, PG_XLOG_DIR); + xlog_path = stream_xlog_path; + } + else + xlog_path = arclog_path; + + stop_backup_lsn = get_last_wal_lsn(xlog_path, backup->start_lsn, + stop_backup_lsn, backup->tli, + true, instance_config.xlog_seg_size); + /* + * Do not check existence of LSN again below using + * wait_wal_lsn(). 
+ */ + stop_lsn_exists = true; + } else elog(ERROR, "Invalid stop_backup_lsn value %X/%X", (uint32) (stop_backup_lsn >> 32), (uint32) (stop_backup_lsn)); @@ -2029,13 +2082,15 @@ pg_stop_backup(pgBackup *backup) stream_xlog_path[MAXPGPATH]; /* Wait for stop_lsn to be received by replica */ - if (current.from_replica) - wait_replica_wal_lsn(stop_backup_lsn, false); + /* XXX Do we need this? */ +// if (current.from_replica) +// wait_replica_wal_lsn(stop_backup_lsn, false); /* * Wait for stop_lsn to be archived or streamed. * We wait for stop_lsn in stream mode just in case. */ - wait_wal_lsn(stop_backup_lsn, false, false); + if (!stop_lsn_exists) + stop_backup_lsn = wait_wal_lsn(stop_backup_lsn, false, false); if (stream_wal) { @@ -2623,7 +2678,7 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) if (!XLogRecPtrIsInvalid(stop_backup_lsn)) { - if (xlogpos > stop_backup_lsn) + if (xlogpos >= stop_backup_lsn) { stop_stream_lsn = xlogpos; return true; diff --git a/src/data.c b/src/data.c index c063bcd5..f44b0d86 100644 --- a/src/data.c +++ b/src/data.c @@ -29,6 +29,9 @@ typedef union DataPage char data[BLCKSZ]; } DataPage; +static bool +fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed); + #ifdef HAVE_LIBZ /* Implementation of zlib compression method */ static int32 @@ -1092,14 +1095,21 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, FILE *in = NULL; FILE *out=NULL; char buf[XLOG_BLCKSZ]; - const char *to_path_p = to_path; + const char *to_path_p; char to_path_temp[MAXPGPATH]; int errno_temp; #ifdef HAVE_LIBZ char gz_to_path[MAXPGPATH]; gzFile gz_out = NULL; + if (is_compress) + { + snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path); + to_path_p = gz_to_path; + } + else #endif + to_path_p = to_path; /* open file for read */ in = fopen(from_path, PG_BINARY_R); @@ -1107,15 +1117,20 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, elog(ERROR, "Cannot open 
source WAL file \"%s\": %s", from_path, strerror(errno)); + /* Check if possible to skip copying */ + if (fileExists(to_path_p)) + { + if (fileEqualCRC(from_path, to_path_p, is_compress)) + return; + /* Do not copy and do not raise error. Just quit as normal. */ + else if (!overwrite) + elog(ERROR, "WAL segment \"%s\" already exists.", to_path_p); + } + /* open backup file for write */ #ifdef HAVE_LIBZ if (is_compress) { - snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path); - - if (!overwrite && fileExists(gz_to_path)) - elog(ERROR, "WAL segment \"%s\" already exists.", gz_to_path); - snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path); gz_out = gzopen(to_path_temp, PG_BINARY_W); @@ -1123,15 +1138,10 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress, elog(ERROR, "Cannot set compression level %d to file \"%s\": %s", instance_config.compress_level, to_path_temp, get_gz_error(gz_out, errno)); - - to_path_p = gz_to_path; } else #endif { - if (!overwrite && fileExists(to_path)) - elog(ERROR, "WAL segment \"%s\" already exists.", to_path); - snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path); out = fopen(to_path_temp, PG_BINARY_W); @@ -1409,75 +1419,13 @@ get_wal_file(const char *from_path, const char *to_path) * but created in process of backup, such as stream XLOG files, * PG_TABLESPACE_MAP_FILE and PG_BACKUP_LABEL_FILE. 
*/ -bool +void calc_file_checksum(pgFile *file) { - FILE *in; - size_t read_len = 0; - int errno_tmp; - char buf[BLCKSZ]; - struct stat st; - pg_crc32 crc; - Assert(S_ISREG(file->mode)); - INIT_TRADITIONAL_CRC32(crc); - /* reset size summary */ - file->read_size = 0; - file->write_size = 0; - - /* open backup mode file for read */ - in = fopen(file->path, PG_BINARY_R); - if (in == NULL) - { - FIN_TRADITIONAL_CRC32(crc); - file->crc = crc; - - /* maybe deleted, it's not error */ - if (errno == ENOENT) - return false; - - elog(ERROR, "cannot open source file \"%s\": %s", file->path, - strerror(errno)); - } - - /* stat source file to change mode of destination file */ - if (fstat(fileno(in), &st) == -1) - { - fclose(in); - elog(ERROR, "cannot stat \"%s\": %s", file->path, - strerror(errno)); - } - - for (;;) - { - read_len = fread(buf, 1, sizeof(buf), in); - - if(read_len == 0) - break; - - /* update CRC */ - COMP_TRADITIONAL_CRC32(crc, buf, read_len); - - file->write_size += read_len; - file->read_size += read_len; - } - - errno_tmp = errno; - if (!feof(in)) - { - fclose(in); - elog(ERROR, "cannot read backup mode file \"%s\": %s", - file->path, strerror(errno_tmp)); - } - - /* finish CRC calculation and store into pgFile */ - FIN_TRADITIONAL_CRC32(crc); - file->crc = crc; - - fclose(in); - - return true; + file->crc = pgFileGetCRC(file->path, false, false, &file->read_size); + file->write_size = file->read_size; } /* @@ -1725,3 +1673,56 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn, return is_valid; } + +static bool +fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) +{ + pg_crc32 crc1; + pg_crc32 crc2; + + /* Get checksum of backup file */ +#ifdef HAVE_LIBZ + if (path2_is_compressed) + { + char buf [1024]; + gzFile gz_in = NULL; + + INIT_FILE_CRC32(true, crc2); + gz_in = gzopen(path2, PG_BINARY_R); + if (gz_in == NULL) + /* File cannot be read */ + elog(ERROR, + "Cannot compare WAL file \"%s\" with compressed \"%s\"", + path1, 
path2); + + for (;;) + { + size_t read_len = 0; + read_len = gzread(gz_in, buf, sizeof(buf)); + if (read_len != sizeof(buf) && !gzeof(gz_in)) + /* An error occurred while reading the file */ + elog(ERROR, + "Cannot compare WAL file \"%s\" with compressed \"%s\"", + path1, path2); + + COMP_FILE_CRC32(true, crc2, buf, read_len); + if (gzeof(gz_in) || read_len == 0) + break; + } + FIN_FILE_CRC32(true, crc2); + + if (gzclose(gz_in) != 0) + elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", + path2, get_gz_error(gz_in, errno)); + } + else +#endif + { + crc2 = pgFileGetCRC(path2, true, true, NULL); + } + + /* Get checksum of original file */ + crc1 = pgFileGetCRC(path1, true, true, NULL); + + return EQ_CRC32C(crc1, crc2); +} diff --git a/src/dir.c b/src/dir.c index 241ddb5f..710598b7 100644 --- a/src/dir.c +++ b/src/dir.c @@ -261,36 +261,55 @@ delete_file: } pg_crc32 -pgFileGetCRC(const char *file_path, bool use_crc32c) +pgFileGetCRC(const char *file_path, bool use_crc32c, bool raise_on_deleted, + size_t *bytes_read) { FILE *fp; pg_crc32 crc = 0; char buf[1024]; size_t len; + size_t total = 0; int errno_tmp; + INIT_FILE_CRC32(use_crc32c, crc); + /* open file in binary read mode */ fp = fopen(file_path, PG_BINARY_R); if (fp == NULL) - elog(ERROR, "cannot open file \"%s\": %s", - file_path, strerror(errno)); + { + if (!raise_on_deleted && errno == ENOENT) + { + FIN_FILE_CRC32(use_crc32c, crc); + return crc; + } + else + elog(ERROR, "cannot open file \"%s\": %s", + file_path, strerror(errno)); + } - /* calc CRC of backup file */ - INIT_FILE_CRC32(use_crc32c, crc); - while ((len = fread(buf, 1, sizeof(buf), fp)) == sizeof(buf)) + /* calc CRC of file */ + for (;;) { if (interrupted) elog(ERROR, "interrupted during CRC calculation"); + + len = fread(buf, 1, sizeof(buf), fp); + if(len == 0) + break; + /* update CRC */ COMP_FILE_CRC32(use_crc32c, crc, buf, len); + total += len; } + + if (bytes_read) + *bytes_read = total; + errno_tmp = errno; if (!feof(fp)) elog(WARNING, 
"cannot read \"%s\": %s", file_path, strerror(errno_tmp)); - if (len > 0) - COMP_FILE_CRC32(use_crc32c, crc, buf, len); - FIN_FILE_CRC32(use_crc32c, crc); + FIN_FILE_CRC32(use_crc32c, crc); fclose(fp); return crc; diff --git a/src/merge.c b/src/merge.c index a6ca504a..c0175f70 100644 --- a/src/merge.c +++ b/src/merge.c @@ -524,7 +524,7 @@ merge_files(void *arg) * do that. */ file->write_size = pgFileSize(to_path_tmp); - file->crc = pgFileGetCRC(to_path_tmp, false); + file->crc = pgFileGetCRC(to_path_tmp, false, true, NULL); } } else diff --git a/src/parsexlog.c b/src/parsexlog.c index 1912e7b6..a1ca14da 100644 --- a/src/parsexlog.c +++ b/src/parsexlog.c @@ -88,7 +88,7 @@ static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime typedef struct XLogPageReadPrivate { - int thread_num; + int thread_num; const char *archivedir; TimeLineID tli; uint32 xlog_seg_size; @@ -132,8 +132,7 @@ static XLogReaderState *InitXLogPageRead(XLogPageReadPrivate *private_data, TimeLineID tli, uint32 xlog_seg_size, bool allocate_reader); static void CleanupXLogPageRead(XLogReaderState *xlogreader); -static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, - int elevel); +static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel); static XLogSegNo nextSegNoToRead = 0; static pthread_mutex_t wal_segment_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -239,11 +238,17 @@ doExtractPageMap(void *arg) */ if (XLogRecPtrIsInvalid(found)) { - elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X. 
%s", - private_data->thread_num, - (uint32) (extract_arg->startpoint >> 32), - (uint32) (extract_arg->startpoint), - (xlogreader->errormsg_buf[0] != '\0')?xlogreader->errormsg_buf:""); + if (xlogreader->errormsg_buf[0] != '\0') + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X: %s", + private_data->thread_num, + (uint32) (extract_arg->startpoint >> 32), + (uint32) (extract_arg->startpoint), + xlogreader->errormsg_buf); + else + elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X", + private_data->thread_num, + (uint32) (extract_arg->startpoint >> 32), + (uint32) (extract_arg->startpoint)); PrintXLogCorruptionMsg(private_data, ERROR); } extract_arg->startpoint = found; @@ -766,6 +771,116 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, return res; } +/* + * Get LSN of last or prior record within the WAL segment with number 'segno'. + * If 'start_lsn' + * is in the segment with number 'segno' then start from 'start_lsn', otherwise + * start from offset 0 within the segment. + * + * Returns LSN which points to end+1 of the last WAL record if seek_prev_segment + * is true. Otherwise returns LSN of the record prior to stop_lsn. + */ +XLogRecPtr +get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, + XLogRecPtr stop_lsn, TimeLineID tli, bool seek_prev_segment, + uint32 seg_size) +{ + XLogReaderState *xlogreader; + XLogPageReadPrivate private; + XLogRecPtr startpoint; + XLogSegNo start_segno; + XLogSegNo segno; + XLogRecPtr res = InvalidXLogRecPtr; + + GetXLogSegNo(stop_lsn, segno, seg_size); + + if (segno <= 1) + elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, segno); + + if (seek_prev_segment) + segno = segno - 1; + + xlogreader = InitXLogPageRead(&private, archivedir, tli, seg_size, true); + + /* + * Calculate startpoint. Decide: we should use 'start_lsn' or offset 0. 
+ */ + GetXLogSegNo(start_lsn, start_segno, seg_size); + if (start_segno == segno) + startpoint = start_lsn; + else + { + XLogRecPtr found; + + GetXLogRecPtr(segno, 0, seg_size, startpoint); + found = XLogFindNextRecord(xlogreader, startpoint); + + if (XLogRecPtrIsInvalid(found)) + { + if (xlogreader->errormsg_buf[0] != '\0') + elog(WARNING, "Could not read WAL record at %X/%X: %s", + (uint32) (startpoint >> 32), (uint32) (startpoint), + xlogreader->errormsg_buf); + else + elog(WARNING, "Could not read WAL record at %X/%X", + (uint32) (startpoint >> 32), (uint32) (startpoint)); + PrintXLogCorruptionMsg(&private, ERROR); + } + startpoint = found; + } + + while (true) + { + XLogRecord *record; + char *errormsg; + XLogSegNo next_segno = 0; + + if (interrupted) + elog(ERROR, "Interrupted during WAL reading"); + + record = XLogReadRecord(xlogreader, startpoint, &errormsg); + if (record == NULL) + { + XLogRecPtr errptr; + + errptr = XLogRecPtrIsInvalid(startpoint) ? xlogreader->EndRecPtr : + startpoint; + + if (errormsg) + elog(WARNING, "Could not read WAL record at %X/%X: %s", + (uint32) (errptr >> 32), (uint32) (errptr), + errormsg); + else + elog(WARNING, "Could not read WAL record at %X/%X", + (uint32) (errptr >> 32), (uint32) (errptr)); + PrintXLogCorruptionMsg(&private, ERROR); + } + + /* continue reading at next record */ + startpoint = InvalidXLogRecPtr; + + GetXLogSegNo(xlogreader->EndRecPtr, next_segno, seg_size); + if (next_segno > segno) + break; + + if (seek_prev_segment) + { + /* end+1 of last record read */ + res = xlogreader->EndRecPtr; + } + else + res = xlogreader->ReadRecPtr; + + if (xlogreader->EndRecPtr >= stop_lsn) + break; + } + + CleanupXLogPageRead(xlogreader); + XLogReaderFree(xlogreader); + + return res; +} + #ifdef HAVE_LIBZ /* * Show error during work with compressed file @@ -807,14 +922,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (!IsInXLogSeg(targetPagePtr, private_data->xlogsegno, 
private_data->xlog_seg_size)) { - elog(VERBOSE, "Thread [%d]: Need to switch to segno next to %X/%X, current LSN %X/%X", + elog(VERBOSE, "Thread [%d]: Need to switch to the next WAL segment, page LSN %X/%X, record being read LSN %X/%X", private_data->thread_num, (uint32) (targetPagePtr >> 32), (uint32) (targetPagePtr), (uint32) (xlogreader->currRecPtr >> 32), (uint32) (xlogreader->currRecPtr )); /* - * if the last record on the page is not complete, + * If the last record on the page is not complete, * we must continue reading pages in the same thread */ if (!XLogRecPtrIsInvalid(xlogreader->currRecPtr) && @@ -1035,6 +1150,12 @@ PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel) private_data->gz_xlogpath); #endif } + else + { + /* Cannot tell what happened specifically */ + elog(elevel, "Thread [%d]: An error occurred during WAL reading", + private_data->thread_num); + } } /* diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 54ab6f2f..0dc0cbf1 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -515,7 +515,8 @@ extern pgFile *pgFileNew(const char *path, bool omit_symlink); extern pgFile *pgFileInit(const char *path); extern void pgFileDelete(pgFile *file); extern void pgFileFree(void *file); -extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c); +extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c, + bool raise_on_deleted, size_t *bytes_read); extern int pgFileComparePath(const void *f1, const void *f2); extern int pgFileComparePathDesc(const void *f1, const void *f2); extern int pgFileCompareLinked(const void *f1, const void *f2); @@ -536,7 +537,7 @@ extern void push_wal_file(const char *from_path, const char *to_path, bool is_compress, bool overwrite); extern void get_wal_file(const char *from_path, const char *to_path); -extern bool calc_file_checksum(pgFile *file); +extern void calc_file_checksum(pgFile *file); extern bool check_file_pages(pgFile* file, XLogRecPtr stop_lsn, @@ -559,6 +560,9 @@ 
extern bool read_recovery_info(const char *archivedir, TimeLineID tli, TransactionId *recovery_xid); extern bool wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, TimeLineID target_tli, uint32 seg_size); +extern XLogRecPtr get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn, + XLogRecPtr stop_lsn, TimeLineID tli, + bool seek_prev_segment, uint32 seg_size); /* in util.c */ extern TimeLineID get_current_timeline(bool safe); diff --git a/src/util.c b/src/util.c index 26329cec..b171f408 100644 --- a/src/util.c +++ b/src/util.c @@ -337,7 +337,7 @@ set_min_recovery_point(pgFile *file, const char *backup_path, writeControlFile(&ControlFile, fullpath); /* Update pg_control checksum in backup_list */ - file->crc = pgFileGetCRC(fullpath, false); + file->crc = pgFileGetCRC(fullpath, false, true, NULL); pg_free(buffer); } diff --git a/src/validate.c b/src/validate.c index 69f62cba..8f0c1c08 100644 --- a/src/validate.c +++ b/src/validate.c @@ -224,7 +224,8 @@ pgBackupValidateFiles(void *arg) * To avoid this problem we need to use different algorithm, CRC-32 in * this case. */ - crc = pgFileGetCRC(file->path, arguments->backup_version <= 20021); + crc = pgFileGetCRC(file->path, arguments->backup_version <= 20021, + true, NULL); if (crc != file->crc) { elog(WARNING, "Invalid CRC of backup file \"%s\" : %X. 
Expected %X", diff --git a/tests/archive.py b/tests/archive.py index 4ed783d6..355c4b07 100644 --- a/tests/archive.py +++ b/tests/archive.py @@ -1,11 +1,12 @@ import os +import shutil +import gzip import unittest from .helpers.ptrack_helpers import ProbackupTest, ProbackupException, archive_script from datetime import datetime, timedelta import subprocess from sys import exit from time import sleep -from shutil import copyfile module_name = 'archive' @@ -246,7 +247,9 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): node.append_conf( 'postgresql.auto.conf', "archive_command = '{0} %p %f'".format( archive_script_path)) + node.slow_start() + try: self.backup_node( backup_dir, 'node', node, @@ -260,6 +263,7 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): "Expecting Error because pg_stop_backup failed to answer.\n " "Output: {0} \n CMD: {1}".format( repr(self.output), self.cmd)) + except ProbackupException as e: self.assertTrue( "ERROR: pg_stop_backup doesn't answer" in e.message and @@ -325,7 +329,15 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): ) self.assertFalse('pg_probackup archive-push completed successfully' in log_content) - os.remove(file) + wal_src = os.path.join( + node.data_dir, 'pg_wal', '000000010000000000000001') + if self.archive_compress: + with open(wal_src, 'rb') as f_in, gzip.open( + file, 'wb', compresslevel=1) as f_out: + shutil.copyfileobj(f_in, f_out) + else: + shutil.copyfile(wal_src, file) + self.switch_wal_segment(node) sleep(5) @@ -498,10 +510,6 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): "postgres", "CHECKPOINT") -# copyfile( -# os.path.join(backup_dir, 'wal/master/000000010000000000000002'), -# os.path.join(backup_dir, 'wal/replica/000000010000000000000002')) - backup_id = self.backup_node( backup_dir, 'replica', replica, backup_type='page', @@ -596,10 +604,9 @@ class ArchiveTest(ProbackupTest, unittest.TestCase): "md5(repeat(i::text,10))::tsvector as tsvector " "from generate_series(0, 60000) i") - # 
TAKE FULL ARCHIVE BACKUP FROM REPLICA - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000001'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000001')) + master.psql( + "postgres", + "CHECKPOINT") backup_id = self.backup_node( backup_dir, 'replica', replica, diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py index 7b4b410b..b8a09343 100644 --- a/tests/helpers/ptrack_helpers.py +++ b/tests/helpers/ptrack_helpers.py @@ -874,8 +874,8 @@ class ProbackupTest(object): return out_dict def set_archiving( - self, backup_dir, instance, node, replica=False, overwrite=False, compress=False, - old_binary=False): + self, backup_dir, instance, node, replica=False, + overwrite=False, compress=False, old_binary=False): if replica: archive_mode = 'always' diff --git a/tests/replica.py b/tests/replica.py index 1ab8515e..ce976397 100644 --- a/tests/replica.py +++ b/tests/replica.py @@ -5,7 +5,6 @@ from datetime import datetime, timedelta import subprocess from sys import exit import time -from shutil import copyfile module_name = 'replica' @@ -27,8 +26,9 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): set_replication=True, initdb_params=['--data-checksums'], pg_options={ - 'wal_level': 'replica', 'max_wal_senders': '2', - 'checkpoint_timeout': '30s', 'ptrack_enable': 'on'} + 'wal_level': 'replica', + 'max_wal_senders': '2', + 'ptrack_enable': 'on'} ) master.start() self.init_pb(backup_dir) @@ -144,7 +144,6 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): pg_options={ 'wal_level': 'replica', 'max_wal_senders': '2', - 'checkpoint_timeout': '30s', 'archive_timeout': '10s'} ) self.init_pb(backup_dir) @@ -171,7 +170,8 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): self.restore_node(backup_dir, 'master', replica) # Settings for Replica - self.set_replica(master, replica) + self.add_instance(backup_dir, 'replica', replica) + self.set_replica(master, replica, synchronous=True) self.set_archiving(backup_dir, 
'replica', replica, replica=True) replica.slow_start(replica=True) @@ -187,31 +187,23 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): "postgres", "insert into t_heap as select i as id, md5(i::text) as text, " "md5(repeat(i::text,10))::tsvector as tsvector " - "from generate_series(256,5120) i") + "from generate_series(256,25120) i") before = master.safe_psql("postgres", "SELECT * FROM t_heap") - self.add_instance(backup_dir, 'replica', replica) - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000003'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000003')) + master.psql( + "postgres", + "CHECKPOINT") - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000004'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000004')) - - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000005'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000005')) + self.wait_until_replica_catch_with_master(master, replica) backup_id = self.backup_node( backup_dir, 'replica', replica, options=[ - '--archive-timeout=30', + '--archive-timeout=60', '--master-host=localhost', '--master-db=postgres', - '--master-port={0}'.format(master.port), - '--stream']) + '--master-port={0}'.format(master.port)]) self.validate_pb(backup_dir, 'replica') self.assertEqual( @@ -222,8 +214,13 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): base_dir="{0}/{1}/node".format(module_name, fname)) node.cleanup() self.restore_node(backup_dir, 'replica', data_dir=node.data_dir) + node.append_conf( 'postgresql.auto.conf', 'port = {0}'.format(node.port)) + + node.append_conf( + 'postgresql.auto.conf', 'archive_mode = off'.format(node.port)) + node.slow_start() # CHECK DATA CORRECTNESS @@ -234,23 +231,31 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): # Change data on master, make PAGE backup from replica, # restore taken backup and check that restored data equal # to original data - master.psql( - "postgres", - 
"insert into t_heap as select i as id, md5(i::text) as text, " - "md5(repeat(i::text,10))::tsvector as tsvector " - "from generate_series(512,22680) i") + master.pgbench_init(scale=5) - before = master.safe_psql("postgres", "SELECT * FROM t_heap") + pgbench = master.pgbench( + options=['-T', '30', '-c', '2', '--no-vacuum']) + +# master.psql( +# "postgres", +# "insert into t_heap as select i as id, md5(i::text) as text, " +# "md5(repeat(i::text,10))::tsvector as tsvector " +# "from generate_series(512,25120) i") backup_id = self.backup_node( backup_dir, 'replica', replica, backup_type='page', options=[ - '--archive-timeout=30', + '--archive-timeout=60', '--master-host=localhost', '--master-db=postgres', - '--master-port={0}'.format(master.port), - '--stream']) + '--master-port={0}'.format(master.port)]) + + pgbench.wait() + + self.switch_wal_segment(master) + + before = master.safe_psql("postgres", "SELECT * FROM pgbench_accounts") self.validate_pb(backup_dir, 'replica') self.assertEqual( @@ -258,17 +263,21 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): # RESTORE PAGE BACKUP TAKEN FROM replica self.restore_node( - backup_dir, 'replica', data_dir=node.data_dir, backup_id=backup_id) + backup_dir, 'replica', data_dir=node.data_dir, + backup_id=backup_id) node.append_conf( 'postgresql.auto.conf', 'port = {0}'.format(node.port)) + node.append_conf( 'postgresql.auto.conf', 'archive_mode = off') + node.slow_start() # CHECK DATA CORRECTNESS - after = node.safe_psql("postgres", "SELECT * FROM t_heap") - self.assertEqual(before, after) + after = node.safe_psql("postgres", "SELECT * FROM pgbench_accounts") + self.assertEqual( + before, after, 'Restored data is not equal to original') self.add_instance(backup_dir, 'node', node) self.backup_node( @@ -290,8 +299,9 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): set_replication=True, initdb_params=['--data-checksums'], pg_options={ - 'wal_level': 'replica', 'max_wal_senders': '2', - 'checkpoint_timeout': '30s'} + 
'wal_level': 'replica', + 'max_wal_senders': '2', + 'archive_timeout': '10s'} ) self.init_pb(backup_dir) self.add_instance(backup_dir, 'master', master) @@ -310,7 +320,7 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): "postgres", "create table t_heap as select i as id, md5(i::text) as text, " "md5(repeat(i::text,10))::tsvector as tsvector " - "from generate_series(0,256) i") + "from generate_series(0,8192) i") before = master.safe_psql("postgres", "SELECT * FROM t_heap") @@ -320,6 +330,7 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): backup_dir, 'master', replica, options=['-R']) # Settings for Replica + self.add_instance(backup_dir, 'replica', replica) self.set_archiving(backup_dir, 'replica', replica, replica=True) replica.append_conf( 'postgresql.auto.conf', 'port = {0}'.format(replica.port)) @@ -328,13 +339,9 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): replica.slow_start(replica=True) - self.add_instance(backup_dir, 'replica', replica) - - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000003'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000003')) - - self.backup_node(backup_dir, 'replica', replica) + self.backup_node( + backup_dir, 'replica', replica, + options=['--archive-timeout=30s', '--stream']) # Clean after yourself self.del_test_dir(module_name, fname) @@ -353,14 +360,13 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): set_replication=True, initdb_params=['--data-checksums'], pg_options={ - 'wal_level': 'replica', 'max_wal_senders': '2', - 'checkpoint_timeout': '30s'} + 'wal_level': 'replica', + 'max_wal_senders': '2', + 'archive_timeout': '10s'} ) self.init_pb(backup_dir) self.add_instance(backup_dir, 'master', master) self.set_archiving(backup_dir, 'master', master) - # force more frequent wal switch - #master.append_conf('postgresql.auto.conf', 'archive_timeout = 10') master.slow_start() replica = self.make_simple_node( @@ -369,6 +375,22 @@ class ReplicaTest(ProbackupTest, 
unittest.TestCase): self.backup_node(backup_dir, 'master', master) + master.psql( + "postgres", + "create table t_heap as select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(0,165000) i") + + master.psql( + "postgres", + "CHECKPOINT") + + master.psql( + "postgres", + "create table t_heap_1 as select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(0,165000) i") + self.restore_node( backup_dir, 'master', replica, options=['-R']) @@ -376,36 +398,35 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): self.add_instance(backup_dir, 'replica', replica) self.set_archiving(backup_dir, 'replica', replica, replica=True) - # stupid hack - copyfile( - os.path.join(backup_dir, 'wal/master/000000010000000000000001'), - os.path.join(backup_dir, 'wal/replica/000000010000000000000001')) - replica.append_conf( 'postgresql.auto.conf', 'port = {0}'.format(replica.port)) - replica.append_conf( - 'postgresql.auto.conf', 'hot_standby = on') + replica.slow_start(replica=True) + + self.wait_until_replica_catch_with_master(master, replica) replica.append_conf( 'recovery.conf', "recovery_min_apply_delay = '300s'") - replica.slow_start(replica=True) + replica.restart() master.pgbench_init(scale=10) pgbench = master.pgbench( - options=['-T', '30', '-c', '2', '--no-vacuum']) + options=['-T', '60', '-c', '2', '--no-vacuum']) self.backup_node( - backup_dir, 'replica', replica) + backup_dir, 'replica', + replica, options=['--archive-timeout=60s']) self.backup_node( backup_dir, 'replica', replica, - data_dir=replica.data_dir, backup_type='page') + data_dir=replica.data_dir, + backup_type='page', options=['--archive-timeout=60s']) self.backup_node( - backup_dir, 'replica', replica, backup_type='delta') + backup_dir, 'replica', replica, + backup_type='delta', options=['--archive-timeout=60s']) pgbench.wait() @@ -442,8 +463,8 @@ class ReplicaTest(ProbackupTest, 
unittest.TestCase): set_replication=True, initdb_params=['--data-checksums'], pg_options={ - 'wal_level': 'replica', 'max_wal_senders': '2', - 'checkpoint_timeout': '30s'} + 'wal_level': 'replica', + 'max_wal_senders': '2'} ) self.init_pb(backup_dir) self.add_instance(backup_dir, 'master', master) @@ -530,4 +551,4 @@ class ReplicaTest(ProbackupTest, unittest.TestCase): exit(1) # Clean after yourself - self.del_test_dir(module_name, fname) \ No newline at end of file + self.del_test_dir(module_name, fname)