1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-03-19 21:38:02 +02:00

Merge branch 'master' into pgpro-2071

This commit is contained in:
Arthur Zakirov 2018-11-23 19:07:19 +03:00
commit 0e5cc88062
11 changed files with 422 additions and 193 deletions

View File

@ -108,7 +108,7 @@ static int checkpoint_timeout(void);
//static void backup_list_file(parray *files, const char *root, ) //static void backup_list_file(parray *files, const char *root, )
static void parse_backup_filelist_filenames(parray *files, const char *root); static void parse_backup_filelist_filenames(parray *files, const char *root);
static void wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, static XLogRecPtr wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn,
bool wait_prev_segment); bool wait_prev_segment);
static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup); static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup);
static void make_pagemap_from_ptrack(parray *files); static void make_pagemap_from_ptrack(parray *files);
@ -1524,8 +1524,11 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_filenode,
* be archived in archive 'wal' directory regardless stream mode. * be archived in archive 'wal' directory regardless stream mode.
* *
* If 'wait_prev_segment' wait for previous segment. * If 'wait_prev_segment' wait for previous segment.
*
* Returns LSN of last valid record if wait_prev_segment is not true, otherwise
* returns InvalidXLogRecPtr.
*/ */
static void static XLogRecPtr
wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment) wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment)
{ {
TimeLineID tli; TimeLineID tli;
@ -1565,25 +1568,22 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment)
DATABASE_DIR, PG_XLOG_DIR); DATABASE_DIR, PG_XLOG_DIR);
join_path_components(wal_segment_path, pg_wal_dir, wal_segment); join_path_components(wal_segment_path, pg_wal_dir, wal_segment);
wal_segment_dir = pg_wal_dir; wal_segment_dir = pg_wal_dir;
timeout = (uint32) checkpoint_timeout();
timeout = timeout + timeout * 0.1;
} }
else else
{ {
join_path_components(wal_segment_path, arclog_path, wal_segment); join_path_components(wal_segment_path, arclog_path, wal_segment);
wal_segment_dir = arclog_path; wal_segment_dir = arclog_path;
}
if (instance_config.archive_timeout > 0) if (instance_config.archive_timeout > 0)
timeout = instance_config.archive_timeout; timeout = instance_config.archive_timeout;
else else
timeout = ARCHIVE_TIMEOUT_DEFAULT; timeout = ARCHIVE_TIMEOUT_DEFAULT;
}
if (wait_prev_segment) if (wait_prev_segment)
elog(LOG, "Looking for segment: %s", wal_segment); elog(LOG, "Looking for segment: %s", wal_segment);
else else
elog(LOG, "Looking for LSN: %X/%X in segment: %s", elog(LOG, "Looking for LSN %X/%X in segment: %s",
(uint32) (lsn >> 32), (uint32) lsn, wal_segment); (uint32) (lsn >> 32), (uint32) lsn, wal_segment);
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
@ -1615,7 +1615,7 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment)
{ {
/* Do not check LSN for previous WAL segment */ /* Do not check LSN for previous WAL segment */
if (wait_prev_segment) if (wait_prev_segment)
return; return InvalidXLogRecPtr;
/* /*
* A WAL segment found. Check LSN on it. * A WAL segment found. Check LSN on it.
@ -1625,7 +1625,29 @@ wait_wal_lsn(XLogRecPtr lsn, bool is_start_lsn, bool wait_prev_segment)
/* Target LSN was found */ /* Target LSN was found */
{ {
elog(LOG, "Found LSN: %X/%X", (uint32) (lsn >> 32), (uint32) lsn); elog(LOG, "Found LSN: %X/%X", (uint32) (lsn >> 32), (uint32) lsn);
return; return lsn;
}
/*
* If we failed to get LSN of valid record in a reasonable time, try
* to get LSN of last valid record prior to the target LSN. But only
* in case of a backup from a replica.
*/
if (!exclusive_backup && current.from_replica &&
(try_count > timeout / 4))
{
XLogRecPtr res;
res = get_last_wal_lsn(wal_segment_dir, current.start_lsn,
lsn, tli, false,
instance_config.xlog_seg_size);
if (!XLogRecPtrIsInvalid(res))
{
/* LSN of the prior record was found */
elog(LOG, "Found prior LSN: %X/%X, it is used as stop LSN",
(uint32) (res >> 32), (uint32) res);
return res;
}
} }
} }
@ -1748,6 +1770,7 @@ pg_stop_backup(pgBackup *backup)
size_t len; size_t len;
char *val = NULL; char *val = NULL;
char *stop_backup_query = NULL; char *stop_backup_query = NULL;
bool stop_lsn_exists = false;
/* /*
* We will use this values if there are no transactions between start_lsn * We will use this values if there are no transactions between start_lsn
@ -1826,7 +1849,11 @@ pg_stop_backup(pgBackup *backup)
#endif #endif
" labelfile," " labelfile,"
" spcmapfile" " spcmapfile"
#if PG_VERSION_NUM >= 100000
" FROM pg_catalog.pg_stop_backup(false, false)";
#else
" FROM pg_catalog.pg_stop_backup(false)"; " FROM pg_catalog.pg_stop_backup(false)";
#endif
else else
stop_backup_query = "SELECT" stop_backup_query = "SELECT"
" pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot())," " pg_catalog.txid_snapshot_xmax(pg_catalog.txid_current_snapshot()),"
@ -1834,7 +1861,11 @@ pg_stop_backup(pgBackup *backup)
" lsn," " lsn,"
" labelfile," " labelfile,"
" spcmapfile" " spcmapfile"
#if PG_VERSION_NUM >= 100000
" FROM pg_catalog.pg_stop_backup(false, false)";
#else
" FROM pg_catalog.pg_stop_backup(false)"; " FROM pg_catalog.pg_stop_backup(false)";
#endif
} }
else else
@ -1923,7 +1954,29 @@ pg_stop_backup(pgBackup *backup)
if (!XRecOffIsValid(stop_backup_lsn)) if (!XRecOffIsValid(stop_backup_lsn))
{ {
if (XRecOffIsNull(stop_backup_lsn)) if (XRecOffIsNull(stop_backup_lsn))
stop_backup_lsn = stop_backup_lsn + SizeOfXLogLongPHD; {
char *xlog_path,
stream_xlog_path[MAXPGPATH];
if (stream_wal)
{
pgBackupGetPath2(backup, stream_xlog_path,
lengthof(stream_xlog_path),
DATABASE_DIR, PG_XLOG_DIR);
xlog_path = stream_xlog_path;
}
else
xlog_path = arclog_path;
stop_backup_lsn = get_last_wal_lsn(xlog_path, backup->start_lsn,
stop_backup_lsn, backup->tli,
true, instance_config.xlog_seg_size);
/*
* Do not check existance of LSN again below using
* wait_wal_lsn().
*/
stop_lsn_exists = true;
}
else else
elog(ERROR, "Invalid stop_backup_lsn value %X/%X", elog(ERROR, "Invalid stop_backup_lsn value %X/%X",
(uint32) (stop_backup_lsn >> 32), (uint32) (stop_backup_lsn)); (uint32) (stop_backup_lsn >> 32), (uint32) (stop_backup_lsn));
@ -2029,13 +2082,15 @@ pg_stop_backup(pgBackup *backup)
stream_xlog_path[MAXPGPATH]; stream_xlog_path[MAXPGPATH];
/* Wait for stop_lsn to be received by replica */ /* Wait for stop_lsn to be received by replica */
if (current.from_replica) /* XXX Do we need this? */
wait_replica_wal_lsn(stop_backup_lsn, false); // if (current.from_replica)
// wait_replica_wal_lsn(stop_backup_lsn, false);
/* /*
* Wait for stop_lsn to be archived or streamed. * Wait for stop_lsn to be archived or streamed.
* We wait for stop_lsn in stream mode just in case. * We wait for stop_lsn in stream mode just in case.
*/ */
wait_wal_lsn(stop_backup_lsn, false, false); if (!stop_lsn_exists)
stop_backup_lsn = wait_wal_lsn(stop_backup_lsn, false, false);
if (stream_wal) if (stream_wal)
{ {
@ -2623,7 +2678,7 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
if (!XLogRecPtrIsInvalid(stop_backup_lsn)) if (!XLogRecPtrIsInvalid(stop_backup_lsn))
{ {
if (xlogpos > stop_backup_lsn) if (xlogpos >= stop_backup_lsn)
{ {
stop_stream_lsn = xlogpos; stop_stream_lsn = xlogpos;
return true; return true;

View File

@ -29,6 +29,9 @@ typedef union DataPage
char data[BLCKSZ]; char data[BLCKSZ];
} DataPage; } DataPage;
static bool
fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed);
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
/* Implementation of zlib compression method */ /* Implementation of zlib compression method */
static int32 static int32
@ -1092,14 +1095,21 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
FILE *in = NULL; FILE *in = NULL;
FILE *out=NULL; FILE *out=NULL;
char buf[XLOG_BLCKSZ]; char buf[XLOG_BLCKSZ];
const char *to_path_p = to_path; const char *to_path_p;
char to_path_temp[MAXPGPATH]; char to_path_temp[MAXPGPATH];
int errno_temp; int errno_temp;
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
char gz_to_path[MAXPGPATH]; char gz_to_path[MAXPGPATH];
gzFile gz_out = NULL; gzFile gz_out = NULL;
if (is_compress)
{
snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path);
to_path_p = gz_to_path;
}
else
#endif #endif
to_path_p = to_path;
/* open file for read */ /* open file for read */
in = fopen(from_path, PG_BINARY_R); in = fopen(from_path, PG_BINARY_R);
@ -1107,15 +1117,20 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path, elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path,
strerror(errno)); strerror(errno));
/* Check if possible to skip copying */
if (fileExists(to_path_p))
{
if (fileEqualCRC(from_path, to_path_p, is_compress))
return;
/* Do not copy and do not rise error. Just quit as normal. */
else if (!overwrite)
elog(ERROR, "WAL segment \"%s\" already exists.", to_path_p);
}
/* open backup file for write */ /* open backup file for write */
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
if (is_compress) if (is_compress)
{ {
snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path);
if (!overwrite && fileExists(gz_to_path))
elog(ERROR, "WAL segment \"%s\" already exists.", gz_to_path);
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path); snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path);
gz_out = gzopen(to_path_temp, PG_BINARY_W); gz_out = gzopen(to_path_temp, PG_BINARY_W);
@ -1123,15 +1138,10 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
elog(ERROR, "Cannot set compression level %d to file \"%s\": %s", elog(ERROR, "Cannot set compression level %d to file \"%s\": %s",
instance_config.compress_level, to_path_temp, instance_config.compress_level, to_path_temp,
get_gz_error(gz_out, errno)); get_gz_error(gz_out, errno));
to_path_p = gz_to_path;
} }
else else
#endif #endif
{ {
if (!overwrite && fileExists(to_path))
elog(ERROR, "WAL segment \"%s\" already exists.", to_path);
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path); snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path);
out = fopen(to_path_temp, PG_BINARY_W); out = fopen(to_path_temp, PG_BINARY_W);
@ -1409,75 +1419,13 @@ get_wal_file(const char *from_path, const char *to_path)
* but created in process of backup, such as stream XLOG files, * but created in process of backup, such as stream XLOG files,
* PG_TABLESPACE_MAP_FILE and PG_BACKUP_LABEL_FILE. * PG_TABLESPACE_MAP_FILE and PG_BACKUP_LABEL_FILE.
*/ */
bool void
calc_file_checksum(pgFile *file) calc_file_checksum(pgFile *file)
{ {
FILE *in;
size_t read_len = 0;
int errno_tmp;
char buf[BLCKSZ];
struct stat st;
pg_crc32 crc;
Assert(S_ISREG(file->mode)); Assert(S_ISREG(file->mode));
INIT_TRADITIONAL_CRC32(crc);
/* reset size summary */ file->crc = pgFileGetCRC(file->path, false, false, &file->read_size);
file->read_size = 0; file->write_size = file->read_size;
file->write_size = 0;
/* open backup mode file for read */
in = fopen(file->path, PG_BINARY_R);
if (in == NULL)
{
FIN_TRADITIONAL_CRC32(crc);
file->crc = crc;
/* maybe deleted, it's not error */
if (errno == ENOENT)
return false;
elog(ERROR, "cannot open source file \"%s\": %s", file->path,
strerror(errno));
}
/* stat source file to change mode of destination file */
if (fstat(fileno(in), &st) == -1)
{
fclose(in);
elog(ERROR, "cannot stat \"%s\": %s", file->path,
strerror(errno));
}
for (;;)
{
read_len = fread(buf, 1, sizeof(buf), in);
if(read_len == 0)
break;
/* update CRC */
COMP_TRADITIONAL_CRC32(crc, buf, read_len);
file->write_size += read_len;
file->read_size += read_len;
}
errno_tmp = errno;
if (!feof(in))
{
fclose(in);
elog(ERROR, "cannot read backup mode file \"%s\": %s",
file->path, strerror(errno_tmp));
}
/* finish CRC calculation and store into pgFile */
FIN_TRADITIONAL_CRC32(crc);
file->crc = crc;
fclose(in);
return true;
} }
/* /*
@ -1725,3 +1673,56 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn,
return is_valid; return is_valid;
} }
/*
 * Tell whether two WAL files have the same contents by comparing their
 * CRC-32C checksums.
 *
 * path1 - original file; its checksum is always taken with pgFileGetCRC().
 * path2 - file to compare against. If 'path2_is_compressed' is true and the
 *         build has zlib (HAVE_LIBZ), the file is read through gzread(), so
 *         the checksum is computed over the data gzread() returns. Otherwise
 *         it is read with pgFileGetCRC() exactly like path1.
 *
 * Returns true when both checksums are equal.
 *
 * Failure to open, read or close the compressed file is reported with
 * elog(ERROR).
 * NOTE(review): this code presumably relies on elog(ERROR, ...) not
 * returning - confirm against the logger implementation.
 */
static bool
fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed)
{
	pg_crc32	crc1;
	pg_crc32	crc2;

	/* Get checksum of backup file */
#ifdef HAVE_LIBZ
	if (path2_is_compressed)
	{
		char		buf[1024];
		gzFile		gz_in = NULL;
		int			gz_rc;

		INIT_FILE_CRC32(true, crc2);
		gz_in = gzopen(path2, PG_BINARY_R);
		if (gz_in == NULL)
			/* File cannot be read */
			elog(ERROR,
				 "Cannot compare WAL file \"%s\" with compressed \"%s\"",
				 path1, path2);

		for (;;)
		{
			/*
			 * gzread() returns an int: the number of bytes read, or a
			 * negative value on error. Keep it signed so that an error can
			 * never be mistaken for a huge length.
			 */
			int			read_len = gzread(gz_in, buf, sizeof(buf));

			if (read_len < 0 ||
				((size_t) read_len != sizeof(buf) && !gzeof(gz_in)))
				/* An error occurred while reading the file */
				elog(ERROR,
					 "Cannot compare WAL file \"%s\" with compressed \"%s\"",
					 path1, path2);

			if (read_len > 0)
			{
				COMP_FILE_CRC32(true, crc2, buf, read_len);
			}

			if (gzeof(gz_in) || read_len == 0)
				break;
		}
		FIN_FILE_CRC32(true, crc2);

		/*
		 * gzclose() releases the handle even when it fails, so 'gz_in' must
		 * not be touched afterwards. Build the error text from the return
		 * code and errno instead of querying the closed handle.
		 */
		gz_rc = gzclose(gz_in);
		if (gz_rc != Z_OK)
		{
			if (gz_rc == Z_ERRNO)
				elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
					 path2, strerror(errno));
			else
				elog(ERROR,
					 "Cannot close compressed WAL file \"%s\": zlib error code %d",
					 path2, gz_rc);
		}
	}
	else
#endif
	{
		crc2 = pgFileGetCRC(path2, true, true, NULL);
	}

	/* Get checksum of original file */
	crc1 = pgFileGetCRC(path1, true, true, NULL);

	return EQ_CRC32C(crc1, crc2);
}

View File

@ -261,36 +261,55 @@ delete_file:
} }
pg_crc32 pg_crc32
pgFileGetCRC(const char *file_path, bool use_crc32c) pgFileGetCRC(const char *file_path, bool use_crc32c, bool raise_on_deleted,
size_t *bytes_read)
{ {
FILE *fp; FILE *fp;
pg_crc32 crc = 0; pg_crc32 crc = 0;
char buf[1024]; char buf[1024];
size_t len; size_t len;
size_t total = 0;
int errno_tmp; int errno_tmp;
INIT_FILE_CRC32(use_crc32c, crc);
/* open file in binary read mode */ /* open file in binary read mode */
fp = fopen(file_path, PG_BINARY_R); fp = fopen(file_path, PG_BINARY_R);
if (fp == NULL) if (fp == NULL)
{
if (!raise_on_deleted && errno == ENOENT)
{
FIN_FILE_CRC32(use_crc32c, crc);
return crc;
}
else
elog(ERROR, "cannot open file \"%s\": %s", elog(ERROR, "cannot open file \"%s\": %s",
file_path, strerror(errno)); file_path, strerror(errno));
}
/* calc CRC of backup file */ /* calc CRC of file */
INIT_FILE_CRC32(use_crc32c, crc); for (;;)
while ((len = fread(buf, 1, sizeof(buf), fp)) == sizeof(buf))
{ {
if (interrupted) if (interrupted)
elog(ERROR, "interrupted during CRC calculation"); elog(ERROR, "interrupted during CRC calculation");
len = fread(buf, 1, sizeof(buf), fp);
if(len == 0)
break;
/* update CRC */
COMP_FILE_CRC32(use_crc32c, crc, buf, len); COMP_FILE_CRC32(use_crc32c, crc, buf, len);
total += len;
} }
if (bytes_read)
*bytes_read = total;
errno_tmp = errno; errno_tmp = errno;
if (!feof(fp)) if (!feof(fp))
elog(WARNING, "cannot read \"%s\": %s", file_path, elog(WARNING, "cannot read \"%s\": %s", file_path,
strerror(errno_tmp)); strerror(errno_tmp));
if (len > 0)
COMP_FILE_CRC32(use_crc32c, crc, buf, len);
FIN_FILE_CRC32(use_crc32c, crc);
FIN_FILE_CRC32(use_crc32c, crc);
fclose(fp); fclose(fp);
return crc; return crc;

View File

@ -524,7 +524,7 @@ merge_files(void *arg)
* do that. * do that.
*/ */
file->write_size = pgFileSize(to_path_tmp); file->write_size = pgFileSize(to_path_tmp);
file->crc = pgFileGetCRC(to_path_tmp, false); file->crc = pgFileGetCRC(to_path_tmp, false, true, NULL);
} }
} }
else else

View File

@ -132,8 +132,7 @@ static XLogReaderState *InitXLogPageRead(XLogPageReadPrivate *private_data,
TimeLineID tli, uint32 xlog_seg_size, TimeLineID tli, uint32 xlog_seg_size,
bool allocate_reader); bool allocate_reader);
static void CleanupXLogPageRead(XLogReaderState *xlogreader); static void CleanupXLogPageRead(XLogReaderState *xlogreader);
static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, static void PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel);
int elevel);
static XLogSegNo nextSegNoToRead = 0; static XLogSegNo nextSegNoToRead = 0;
static pthread_mutex_t wal_segment_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t wal_segment_mutex = PTHREAD_MUTEX_INITIALIZER;
@ -239,11 +238,17 @@ doExtractPageMap(void *arg)
*/ */
if (XLogRecPtrIsInvalid(found)) if (XLogRecPtrIsInvalid(found))
{ {
elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X. %s", if (xlogreader->errormsg_buf[0] != '\0')
elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X: %s",
private_data->thread_num, private_data->thread_num,
(uint32) (extract_arg->startpoint >> 32), (uint32) (extract_arg->startpoint >> 32),
(uint32) (extract_arg->startpoint), (uint32) (extract_arg->startpoint),
(xlogreader->errormsg_buf[0] != '\0')?xlogreader->errormsg_buf:""); xlogreader->errormsg_buf);
else
elog(WARNING, "Thread [%d]: could not read WAL record at %X/%X",
private_data->thread_num,
(uint32) (extract_arg->startpoint >> 32),
(uint32) (extract_arg->startpoint));
PrintXLogCorruptionMsg(private_data, ERROR); PrintXLogCorruptionMsg(private_data, ERROR);
} }
extract_arg->startpoint = found; extract_arg->startpoint = found;
@ -766,6 +771,116 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
return res; return res;
} }
/*
 * Scan a single WAL segment and report the position of a record in it.
 *
 * The scanned segment is the one that contains 'stop_lsn', or the segment
 * right before it when 'seek_prev_segment' is true. Reading begins at
 * 'start_lsn' if that LSN lies in the scanned segment; otherwise it begins at
 * the first record XLogFindNextRecord() locates from offset 0 of the segment.
 *
 * Return value:
 *  - seek_prev_segment is true: end+1 of the last record read whose end is
 *    still inside the scanned segment;
 *  - seek_prev_segment is false: start of the last record read; scanning
 *    stops as soon as a record ends at or after 'stop_lsn';
 *  - InvalidXLogRecPtr if the very first record read already ends in a later
 *    segment.
 *
 * Read failures are logged as WARNING and then passed to
 * PrintXLogCorruptionMsg() with ERROR level.
 * NOTE(review): the code after those calls presumably assumes they do not
 * return - confirm.
 */
XLogRecPtr
get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn,
				 XLogRecPtr stop_lsn, TimeLineID tli, bool seek_prev_segment,
				 uint32 seg_size)
{
	XLogPageReadPrivate reader_data;
	XLogReaderState *reader;
	XLogSegNo	target_segno;
	XLogSegNo	first_segno;
	XLogRecPtr	read_from;
	XLogRecPtr	last_lsn = InvalidXLogRecPtr;

	/* Pick the segment to scan */
	GetXLogSegNo(stop_lsn, target_segno, seg_size);

	if (target_segno <= 1)
		elog(ERROR, "Invalid WAL segment number " UINT64_FORMAT, target_segno);

	if (seek_prev_segment)
		target_segno -= 1;

	reader = InitXLogPageRead(&reader_data, archivedir, tli, seg_size, true);

	/*
	 * Choose where to start reading: 'start_lsn' when it belongs to the
	 * scanned segment, otherwise the first record from the segment start.
	 */
	GetXLogSegNo(start_lsn, first_segno, seg_size);

	if (first_segno != target_segno)
	{
		XLogRecPtr	seg_begin;

		GetXLogRecPtr(target_segno, 0, seg_size, seg_begin);

		read_from = XLogFindNextRecord(reader, seg_begin);
		if (XLogRecPtrIsInvalid(read_from))
		{
			if (reader->errormsg_buf[0] != '\0')
				elog(WARNING, "Could not read WAL record at %X/%X: %s",
					 (uint32) (seg_begin >> 32), (uint32) (seg_begin),
					 reader->errormsg_buf);
			else
				elog(WARNING, "Could not read WAL record at %X/%X",
					 (uint32) (seg_begin >> 32), (uint32) (seg_begin));
			PrintXLogCorruptionMsg(&reader_data, ERROR);
		}
	}
	else
		read_from = start_lsn;

	for (;;)
	{
		char	   *errormsg;
		XLogSegNo	rec_end_segno = 0;

		if (interrupted)
			elog(ERROR, "Interrupted during WAL reading");

		if (XLogReadRecord(reader, read_from, &errormsg) == NULL)
		{
			/* Report the position we tried to read from */
			XLogRecPtr	errptr = read_from;

			if (XLogRecPtrIsInvalid(read_from))
				errptr = reader->EndRecPtr;

			if (errormsg)
				elog(WARNING, "Could not read WAL record at %X/%X: %s",
					 (uint32) (errptr >> 32), (uint32) (errptr),
					 errormsg);
			else
				elog(WARNING, "Could not read WAL record at %X/%X",
					 (uint32) (errptr >> 32), (uint32) (errptr));
			PrintXLogCorruptionMsg(&reader_data, ERROR);
		}

		/* From now on let the reader continue at the next record */
		read_from = InvalidXLogRecPtr;

		/* Stop once the record just read ends beyond the scanned segment */
		GetXLogSegNo(reader->EndRecPtr, rec_end_segno, seg_size);
		if (rec_end_segno > target_segno)
			break;

		/* end+1 of the record when seeking the previous segment, else its start */
		last_lsn = seek_prev_segment ? reader->EndRecPtr : reader->ReadRecPtr;

		if (reader->EndRecPtr >= stop_lsn)
			break;
	}

	CleanupXLogPageRead(reader);
	XLogReaderFree(reader);

	return last_lsn;
}
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
/* /*
* Show error during work with compressed file * Show error during work with compressed file
@ -807,14 +922,14 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
if (!IsInXLogSeg(targetPagePtr, private_data->xlogsegno, if (!IsInXLogSeg(targetPagePtr, private_data->xlogsegno,
private_data->xlog_seg_size)) private_data->xlog_seg_size))
{ {
elog(VERBOSE, "Thread [%d]: Need to switch to segno next to %X/%X, current LSN %X/%X", elog(VERBOSE, "Thread [%d]: Need to switch to the next WAL segment, page LSN %X/%X, record being read LSN %X/%X",
private_data->thread_num, private_data->thread_num,
(uint32) (targetPagePtr >> 32), (uint32) (targetPagePtr), (uint32) (targetPagePtr >> 32), (uint32) (targetPagePtr),
(uint32) (xlogreader->currRecPtr >> 32), (uint32) (xlogreader->currRecPtr >> 32),
(uint32) (xlogreader->currRecPtr )); (uint32) (xlogreader->currRecPtr ));
/* /*
* if the last record on the page is not complete, * If the last record on the page is not complete,
* we must continue reading pages in the same thread * we must continue reading pages in the same thread
*/ */
if (!XLogRecPtrIsInvalid(xlogreader->currRecPtr) && if (!XLogRecPtrIsInvalid(xlogreader->currRecPtr) &&
@ -1035,6 +1150,12 @@ PrintXLogCorruptionMsg(XLogPageReadPrivate *private_data, int elevel)
private_data->gz_xlogpath); private_data->gz_xlogpath);
#endif #endif
} }
else
{
/* Cannot tell what happened specifically */
elog(elevel, "Thread [%d]: An error occured during WAL reading",
private_data->thread_num);
}
} }
/* /*

View File

@ -515,7 +515,8 @@ extern pgFile *pgFileNew(const char *path, bool omit_symlink);
extern pgFile *pgFileInit(const char *path); extern pgFile *pgFileInit(const char *path);
extern void pgFileDelete(pgFile *file); extern void pgFileDelete(pgFile *file);
extern void pgFileFree(void *file); extern void pgFileFree(void *file);
extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c); extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c,
bool raise_on_deleted, size_t *bytes_read);
extern int pgFileComparePath(const void *f1, const void *f2); extern int pgFileComparePath(const void *f1, const void *f2);
extern int pgFileComparePathDesc(const void *f1, const void *f2); extern int pgFileComparePathDesc(const void *f1, const void *f2);
extern int pgFileCompareLinked(const void *f1, const void *f2); extern int pgFileCompareLinked(const void *f1, const void *f2);
@ -536,7 +537,7 @@ extern void push_wal_file(const char *from_path, const char *to_path,
bool is_compress, bool overwrite); bool is_compress, bool overwrite);
extern void get_wal_file(const char *from_path, const char *to_path); extern void get_wal_file(const char *from_path, const char *to_path);
extern bool calc_file_checksum(pgFile *file); extern void calc_file_checksum(pgFile *file);
extern bool check_file_pages(pgFile* file, extern bool check_file_pages(pgFile* file,
XLogRecPtr stop_lsn, XLogRecPtr stop_lsn,
@ -559,6 +560,9 @@ extern bool read_recovery_info(const char *archivedir, TimeLineID tli,
TransactionId *recovery_xid); TransactionId *recovery_xid);
extern bool wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn, extern bool wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
TimeLineID target_tli, uint32 seg_size); TimeLineID target_tli, uint32 seg_size);
extern XLogRecPtr get_last_wal_lsn(const char *archivedir, XLogRecPtr start_lsn,
XLogRecPtr stop_lsn, TimeLineID tli,
bool seek_prev_segment, uint32 seg_size);
/* in util.c */ /* in util.c */
extern TimeLineID get_current_timeline(bool safe); extern TimeLineID get_current_timeline(bool safe);

View File

@ -337,7 +337,7 @@ set_min_recovery_point(pgFile *file, const char *backup_path,
writeControlFile(&ControlFile, fullpath); writeControlFile(&ControlFile, fullpath);
/* Update pg_control checksum in backup_list */ /* Update pg_control checksum in backup_list */
file->crc = pgFileGetCRC(fullpath, false); file->crc = pgFileGetCRC(fullpath, false, true, NULL);
pg_free(buffer); pg_free(buffer);
} }

View File

@ -224,7 +224,8 @@ pgBackupValidateFiles(void *arg)
* To avoid this problem we need to use different algorithm, CRC-32 in * To avoid this problem we need to use different algorithm, CRC-32 in
* this case. * this case.
*/ */
crc = pgFileGetCRC(file->path, arguments->backup_version <= 20021); crc = pgFileGetCRC(file->path, arguments->backup_version <= 20021,
true, NULL);
if (crc != file->crc) if (crc != file->crc)
{ {
elog(WARNING, "Invalid CRC of backup file \"%s\" : %X. Expected %X", elog(WARNING, "Invalid CRC of backup file \"%s\" : %X. Expected %X",

View File

@ -1,11 +1,12 @@
import os import os
import shutil
import gzip
import unittest import unittest
from .helpers.ptrack_helpers import ProbackupTest, ProbackupException, archive_script from .helpers.ptrack_helpers import ProbackupTest, ProbackupException, archive_script
from datetime import datetime, timedelta from datetime import datetime, timedelta
import subprocess import subprocess
from sys import exit from sys import exit
from time import sleep from time import sleep
from shutil import copyfile
module_name = 'archive' module_name = 'archive'
@ -246,7 +247,9 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
node.append_conf( node.append_conf(
'postgresql.auto.conf', "archive_command = '{0} %p %f'".format( 'postgresql.auto.conf', "archive_command = '{0} %p %f'".format(
archive_script_path)) archive_script_path))
node.slow_start() node.slow_start()
try: try:
self.backup_node( self.backup_node(
backup_dir, 'node', node, backup_dir, 'node', node,
@ -260,6 +263,7 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
"Expecting Error because pg_stop_backup failed to answer.\n " "Expecting Error because pg_stop_backup failed to answer.\n "
"Output: {0} \n CMD: {1}".format( "Output: {0} \n CMD: {1}".format(
repr(self.output), self.cmd)) repr(self.output), self.cmd))
except ProbackupException as e: except ProbackupException as e:
self.assertTrue( self.assertTrue(
"ERROR: pg_stop_backup doesn't answer" in e.message and "ERROR: pg_stop_backup doesn't answer" in e.message and
@ -325,7 +329,15 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
) )
self.assertFalse('pg_probackup archive-push completed successfully' in log_content) self.assertFalse('pg_probackup archive-push completed successfully' in log_content)
os.remove(file) wal_src = os.path.join(
node.data_dir, 'pg_wal', '000000010000000000000001')
if self.archive_compress:
with open(wal_src, 'rb') as f_in, gzip.open(
file, 'wb', compresslevel=1) as f_out:
shutil.copyfileobj(f_in, f_out)
else:
shutil.copyfile(wal_src, file)
self.switch_wal_segment(node) self.switch_wal_segment(node)
sleep(5) sleep(5)
@ -498,10 +510,6 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
"postgres", "postgres",
"CHECKPOINT") "CHECKPOINT")
# copyfile(
# os.path.join(backup_dir, 'wal/master/000000010000000000000002'),
# os.path.join(backup_dir, 'wal/replica/000000010000000000000002'))
backup_id = self.backup_node( backup_id = self.backup_node(
backup_dir, 'replica', backup_dir, 'replica',
replica, backup_type='page', replica, backup_type='page',
@ -596,10 +604,9 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
"md5(repeat(i::text,10))::tsvector as tsvector " "md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(0, 60000) i") "from generate_series(0, 60000) i")
# TAKE FULL ARCHIVE BACKUP FROM REPLICA master.psql(
copyfile( "postgres",
os.path.join(backup_dir, 'wal/master/000000010000000000000001'), "CHECKPOINT")
os.path.join(backup_dir, 'wal/replica/000000010000000000000001'))
backup_id = self.backup_node( backup_id = self.backup_node(
backup_dir, 'replica', replica, backup_dir, 'replica', replica,

View File

@ -874,8 +874,8 @@ class ProbackupTest(object):
return out_dict return out_dict
def set_archiving( def set_archiving(
self, backup_dir, instance, node, replica=False, overwrite=False, compress=False, self, backup_dir, instance, node, replica=False,
old_binary=False): overwrite=False, compress=False, old_binary=False):
if replica: if replica:
archive_mode = 'always' archive_mode = 'always'

View File

@ -5,7 +5,6 @@ from datetime import datetime, timedelta
import subprocess import subprocess
from sys import exit from sys import exit
import time import time
from shutil import copyfile
module_name = 'replica' module_name = 'replica'
@ -27,8 +26,9 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
set_replication=True, set_replication=True,
initdb_params=['--data-checksums'], initdb_params=['--data-checksums'],
pg_options={ pg_options={
'wal_level': 'replica', 'max_wal_senders': '2', 'wal_level': 'replica',
'checkpoint_timeout': '30s', 'ptrack_enable': 'on'} 'max_wal_senders': '2',
'ptrack_enable': 'on'}
) )
master.start() master.start()
self.init_pb(backup_dir) self.init_pb(backup_dir)
@ -144,7 +144,6 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
pg_options={ pg_options={
'wal_level': 'replica', 'wal_level': 'replica',
'max_wal_senders': '2', 'max_wal_senders': '2',
'checkpoint_timeout': '30s',
'archive_timeout': '10s'} 'archive_timeout': '10s'}
) )
self.init_pb(backup_dir) self.init_pb(backup_dir)
@ -171,7 +170,8 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
self.restore_node(backup_dir, 'master', replica) self.restore_node(backup_dir, 'master', replica)
# Settings for Replica # Settings for Replica
self.set_replica(master, replica) self.add_instance(backup_dir, 'replica', replica)
self.set_replica(master, replica, synchronous=True)
self.set_archiving(backup_dir, 'replica', replica, replica=True) self.set_archiving(backup_dir, 'replica', replica, replica=True)
replica.slow_start(replica=True) replica.slow_start(replica=True)
@ -187,31 +187,23 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
"postgres", "postgres",
"insert into t_heap as select i as id, md5(i::text) as text, " "insert into t_heap as select i as id, md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector " "md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(256,5120) i") "from generate_series(256,25120) i")
before = master.safe_psql("postgres", "SELECT * FROM t_heap") before = master.safe_psql("postgres", "SELECT * FROM t_heap")
self.add_instance(backup_dir, 'replica', replica)
copyfile( master.psql(
os.path.join(backup_dir, 'wal/master/000000010000000000000003'), "postgres",
os.path.join(backup_dir, 'wal/replica/000000010000000000000003')) "CHECKPOINT")
copyfile( self.wait_until_replica_catch_with_master(master, replica)
os.path.join(backup_dir, 'wal/master/000000010000000000000004'),
os.path.join(backup_dir, 'wal/replica/000000010000000000000004'))
copyfile(
os.path.join(backup_dir, 'wal/master/000000010000000000000005'),
os.path.join(backup_dir, 'wal/replica/000000010000000000000005'))
backup_id = self.backup_node( backup_id = self.backup_node(
backup_dir, 'replica', replica, backup_dir, 'replica', replica,
options=[ options=[
'--archive-timeout=30', '--archive-timeout=60',
'--master-host=localhost', '--master-host=localhost',
'--master-db=postgres', '--master-db=postgres',
'--master-port={0}'.format(master.port), '--master-port={0}'.format(master.port)])
'--stream'])
self.validate_pb(backup_dir, 'replica') self.validate_pb(backup_dir, 'replica')
self.assertEqual( self.assertEqual(
@ -222,8 +214,13 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
base_dir="{0}/{1}/node".format(module_name, fname)) base_dir="{0}/{1}/node".format(module_name, fname))
node.cleanup() node.cleanup()
self.restore_node(backup_dir, 'replica', data_dir=node.data_dir) self.restore_node(backup_dir, 'replica', data_dir=node.data_dir)
node.append_conf( node.append_conf(
'postgresql.auto.conf', 'port = {0}'.format(node.port)) 'postgresql.auto.conf', 'port = {0}'.format(node.port))
node.append_conf(
'postgresql.auto.conf', 'archive_mode = off'.format(node.port))
node.slow_start() node.slow_start()
# CHECK DATA CORRECTNESS # CHECK DATA CORRECTNESS
@ -234,23 +231,31 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
# Change data on master, make PAGE backup from replica, # Change data on master, make PAGE backup from replica,
# restore taken backup and check that restored data equal # restore taken backup and check that restored data equal
# to original data # to original data
master.psql( master.pgbench_init(scale=5)
"postgres",
"insert into t_heap as select i as id, md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(512,22680) i")
before = master.safe_psql("postgres", "SELECT * FROM t_heap") pgbench = master.pgbench(
options=['-T', '30', '-c', '2', '--no-vacuum'])
# master.psql(
# "postgres",
# "insert into t_heap as select i as id, md5(i::text) as text, "
# "md5(repeat(i::text,10))::tsvector as tsvector "
# "from generate_series(512,25120) i")
backup_id = self.backup_node( backup_id = self.backup_node(
backup_dir, 'replica', backup_dir, 'replica',
replica, backup_type='page', replica, backup_type='page',
options=[ options=[
'--archive-timeout=30', '--archive-timeout=60',
'--master-host=localhost', '--master-host=localhost',
'--master-db=postgres', '--master-db=postgres',
'--master-port={0}'.format(master.port), '--master-port={0}'.format(master.port)])
'--stream'])
pgbench.wait()
self.switch_wal_segment(master)
before = master.safe_psql("postgres", "SELECT * FROM pgbench_accounts")
self.validate_pb(backup_dir, 'replica') self.validate_pb(backup_dir, 'replica')
self.assertEqual( self.assertEqual(
@ -258,17 +263,21 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
# RESTORE PAGE BACKUP TAKEN FROM replica # RESTORE PAGE BACKUP TAKEN FROM replica
self.restore_node( self.restore_node(
backup_dir, 'replica', data_dir=node.data_dir, backup_id=backup_id) backup_dir, 'replica', data_dir=node.data_dir,
backup_id=backup_id)
node.append_conf( node.append_conf(
'postgresql.auto.conf', 'port = {0}'.format(node.port)) 'postgresql.auto.conf', 'port = {0}'.format(node.port))
node.append_conf( node.append_conf(
'postgresql.auto.conf', 'archive_mode = off') 'postgresql.auto.conf', 'archive_mode = off')
node.slow_start() node.slow_start()
# CHECK DATA CORRECTNESS # CHECK DATA CORRECTNESS
after = node.safe_psql("postgres", "SELECT * FROM t_heap") after = node.safe_psql("postgres", "SELECT * FROM pgbench_accounts")
self.assertEqual(before, after) self.assertEqual(
before, after, 'Restored data is not equal to original')
self.add_instance(backup_dir, 'node', node) self.add_instance(backup_dir, 'node', node)
self.backup_node( self.backup_node(
@ -290,8 +299,9 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
set_replication=True, set_replication=True,
initdb_params=['--data-checksums'], initdb_params=['--data-checksums'],
pg_options={ pg_options={
'wal_level': 'replica', 'max_wal_senders': '2', 'wal_level': 'replica',
'checkpoint_timeout': '30s'} 'max_wal_senders': '2',
'archive_timeout': '10s'}
) )
self.init_pb(backup_dir) self.init_pb(backup_dir)
self.add_instance(backup_dir, 'master', master) self.add_instance(backup_dir, 'master', master)
@ -310,7 +320,7 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
"postgres", "postgres",
"create table t_heap as select i as id, md5(i::text) as text, " "create table t_heap as select i as id, md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector " "md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(0,256) i") "from generate_series(0,8192) i")
before = master.safe_psql("postgres", "SELECT * FROM t_heap") before = master.safe_psql("postgres", "SELECT * FROM t_heap")
@ -320,6 +330,7 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
backup_dir, 'master', replica, options=['-R']) backup_dir, 'master', replica, options=['-R'])
# Settings for Replica # Settings for Replica
self.add_instance(backup_dir, 'replica', replica)
self.set_archiving(backup_dir, 'replica', replica, replica=True) self.set_archiving(backup_dir, 'replica', replica, replica=True)
replica.append_conf( replica.append_conf(
'postgresql.auto.conf', 'port = {0}'.format(replica.port)) 'postgresql.auto.conf', 'port = {0}'.format(replica.port))
@ -328,13 +339,9 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
replica.slow_start(replica=True) replica.slow_start(replica=True)
self.add_instance(backup_dir, 'replica', replica) self.backup_node(
backup_dir, 'replica', replica,
copyfile( options=['--archive-timeout=30s', '--stream'])
os.path.join(backup_dir, 'wal/master/000000010000000000000003'),
os.path.join(backup_dir, 'wal/replica/000000010000000000000003'))
self.backup_node(backup_dir, 'replica', replica)
# Clean after yourself # Clean after yourself
self.del_test_dir(module_name, fname) self.del_test_dir(module_name, fname)
@ -353,14 +360,13 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
set_replication=True, set_replication=True,
initdb_params=['--data-checksums'], initdb_params=['--data-checksums'],
pg_options={ pg_options={
'wal_level': 'replica', 'max_wal_senders': '2', 'wal_level': 'replica',
'checkpoint_timeout': '30s'} 'max_wal_senders': '2',
'archive_timeout': '10s'}
) )
self.init_pb(backup_dir) self.init_pb(backup_dir)
self.add_instance(backup_dir, 'master', master) self.add_instance(backup_dir, 'master', master)
self.set_archiving(backup_dir, 'master', master) self.set_archiving(backup_dir, 'master', master)
# force more frequent wal switch
#master.append_conf('postgresql.auto.conf', 'archive_timeout = 10')
master.slow_start() master.slow_start()
replica = self.make_simple_node( replica = self.make_simple_node(
@ -369,6 +375,22 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
self.backup_node(backup_dir, 'master', master) self.backup_node(backup_dir, 'master', master)
master.psql(
"postgres",
"create table t_heap as select i as id, md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(0,165000) i")
master.psql(
"postgres",
"CHECKPOINT")
master.psql(
"postgres",
"create table t_heap_1 as select i as id, md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(0,165000) i")
self.restore_node( self.restore_node(
backup_dir, 'master', replica, options=['-R']) backup_dir, 'master', replica, options=['-R'])
@ -376,36 +398,35 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
self.add_instance(backup_dir, 'replica', replica) self.add_instance(backup_dir, 'replica', replica)
self.set_archiving(backup_dir, 'replica', replica, replica=True) self.set_archiving(backup_dir, 'replica', replica, replica=True)
# stupid hack
copyfile(
os.path.join(backup_dir, 'wal/master/000000010000000000000001'),
os.path.join(backup_dir, 'wal/replica/000000010000000000000001'))
replica.append_conf( replica.append_conf(
'postgresql.auto.conf', 'port = {0}'.format(replica.port)) 'postgresql.auto.conf', 'port = {0}'.format(replica.port))
replica.append_conf( replica.slow_start(replica=True)
'postgresql.auto.conf', 'hot_standby = on')
self.wait_until_replica_catch_with_master(master, replica)
replica.append_conf( replica.append_conf(
'recovery.conf', "recovery_min_apply_delay = '300s'") 'recovery.conf', "recovery_min_apply_delay = '300s'")
replica.slow_start(replica=True) replica.restart()
master.pgbench_init(scale=10) master.pgbench_init(scale=10)
pgbench = master.pgbench( pgbench = master.pgbench(
options=['-T', '30', '-c', '2', '--no-vacuum']) options=['-T', '60', '-c', '2', '--no-vacuum'])
self.backup_node( self.backup_node(
backup_dir, 'replica', replica) backup_dir, 'replica',
replica, options=['--archive-timeout=60s'])
self.backup_node( self.backup_node(
backup_dir, 'replica', replica, backup_dir, 'replica', replica,
data_dir=replica.data_dir, backup_type='page') data_dir=replica.data_dir,
backup_type='page', options=['--archive-timeout=60s'])
self.backup_node( self.backup_node(
backup_dir, 'replica', replica, backup_type='delta') backup_dir, 'replica', replica,
backup_type='delta', options=['--archive-timeout=60s'])
pgbench.wait() pgbench.wait()
@ -442,8 +463,8 @@ class ReplicaTest(ProbackupTest, unittest.TestCase):
set_replication=True, set_replication=True,
initdb_params=['--data-checksums'], initdb_params=['--data-checksums'],
pg_options={ pg_options={
'wal_level': 'replica', 'max_wal_senders': '2', 'wal_level': 'replica',
'checkpoint_timeout': '30s'} 'max_wal_senders': '2'}
) )
self.init_pb(backup_dir) self.init_pb(backup_dir)
self.add_instance(backup_dir, 'master', master) self.add_instance(backup_dir, 'master', master)