1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-01-09 14:45:47 +02:00

[Issue #228] various fixes and improvements

This commit is contained in:
Grigory Smolkin 2020-06-17 13:39:42 +03:00
parent 29adb5bfdf
commit 9b36081659
8 changed files with 548 additions and 275 deletions

View File

@ -664,8 +664,8 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
for (i = 0; i < parray_num(backup_files_list); i++) for (i = 0; i < parray_num(backup_files_list); i++)
{ {
char to_fullpath[MAXPGPATH]; char to_fullpath[MAXPGPATH];
pgFile *file = (pgFile *) parray_get(backup_files_list, i); pgFile *file = (pgFile *) parray_get(backup_files_list, i);
/* TODO: sync directory ? */ /* TODO: sync directory ? */
if (S_ISDIR(file->mode)) if (S_ISDIR(file->mode))
@ -687,7 +687,21 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
} }
if (fio_sync(to_fullpath, FIO_BACKUP_HOST) != 0) if (fio_sync(to_fullpath, FIO_BACKUP_HOST) != 0)
elog(ERROR, "Failed to sync file \"%s\": %s", to_fullpath, strerror(errno)); elog(ERROR, "Cannot sync file \"%s\": %s", to_fullpath, strerror(errno));
/* fsync header file */
if (file->external_dir_num == 0 &&
file->is_datafile && !file->is_cfs &&
file->n_headers > 0)
{
char to_fullpath_hdr[MAXPGPATH];
snprintf(to_fullpath_hdr, MAXPGPATH, "%s_hdr", to_fullpath);
if (fio_sync(to_fullpath, FIO_BACKUP_HOST) != 0)
elog(ERROR, "Cannot sync file \"%s\": %s", to_fullpath_hdr, strerror(errno));
}
} }
time(&end_time); time(&end_time);
@ -2131,7 +2145,7 @@ backup_files(void *arg)
/* backup file */ /* backup file */
if (file->is_datafile && !file->is_cfs) if (file->is_datafile && !file->is_cfs)
{ {
backup_data_file(&(arguments->conn_arg), file, from_fullpath, to_fullpath, backup_data_file_new(&(arguments->conn_arg), file, from_fullpath, to_fullpath,
arguments->prev_start_lsn, arguments->prev_start_lsn,
current.backup_mode, current.backup_mode,
instance_config.compress_alg, instance_config.compress_alg,
@ -2147,12 +2161,6 @@ backup_files(void *arg)
current.backup_mode, current.parent_backup, true); current.backup_mode, current.parent_backup, true);
} }
/* No point in storing empty, missing or not changed files */
if (file->write_size <= 0)
unlink(to_fullpath);
// elog(ERROR, "Cannot remove file \"%s\": %s", to_fullpath,
// strerror(errno));
if (file->write_size == FILE_NOT_FOUND) if (file->write_size == FILE_NOT_FOUND)
continue; continue;

View File

@ -1906,7 +1906,12 @@ write_backup_filelist(pgBackup *backup, parray *files, const char *root,
len += sprintf(line+len, ",\"n_blocks\":\"%i\"", file->n_blocks); len += sprintf(line+len, ",\"n_blocks\":\"%i\"", file->n_blocks);
if (file->n_headers > 0) if (file->n_headers > 0)
{
len += sprintf(line+len, ",\"n_headers\":\"%i\"", file->n_headers); len += sprintf(line+len, ",\"n_headers\":\"%i\"", file->n_headers);
len += sprintf(line+len, ",\"hdr_crc\":\"%u\"", file->hdr_crc);
// elog(INFO, "CRC INT: %li, CRC UINT: %u", file->crc_hdr, file->crc_hdr);
}
sprintf(line+len, "}\n"); sprintf(line+len, "}\n");

View File

@ -32,6 +32,7 @@ typedef union DataPage
} DataPage; } DataPage;
static BackupPageHeader2* get_data_file_headers(const char *fullpath, pgFile *file, uint32 backup_version); static BackupPageHeader2* get_data_file_headers(const char *fullpath, pgFile *file, uint32 backup_version);
static void write_page_headers(BackupPageHeader2 *headers, pgFile *file, const char* to_fullpath);
static bool get_compressed_page_meta(FILE *in, const char *fullpath, int32 *compressed_size, static bool get_compressed_page_meta(FILE *in, const char *fullpath, int32 *compressed_size,
BlockNumber *blknum, pg_crc32 *crc, bool use_crc32c); BlockNumber *blknum, pg_crc32 *crc, bool use_crc32c);
@ -472,6 +473,7 @@ prepare_page(ConnectionArgs *conn_arg,
return PageIsOk; return PageIsOk;
} }
/* split this function in two: compress() and backup() */
static int static int
compress_and_backup_page(pgFile *file, BlockNumber blknum, compress_and_backup_page(pgFile *file, BlockNumber blknum,
FILE *in, FILE *out, pg_crc32 *crc, FILE *in, FILE *out, pg_crc32 *crc,
@ -526,7 +528,174 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum,
file->write_size += write_buffer_size; file->write_size += write_buffer_size;
file->uncompressed_size += BLCKSZ; file->uncompressed_size += BLCKSZ;
return compressed_size; return write_buffer_size;
}
/*
* Backup data file in the from_root directory to the to_root directory with
* same relative path. If prev_backup_start_lsn is not NULL, only pages with
* higher lsn will be copied.
* Not just copy file, but read it block by block (use bitmap in case of
* incremental backup), validate checksum, optionally compress and write to
* backup with special header.
*/
void
backup_data_file_new(ConnectionArgs* conn_arg, pgFile *file,
const char *from_fullpath, const char *to_fullpath,
XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
CompressAlg calg, int clevel, uint32 checksum_version,
int ptrack_version_num, const char *ptrack_schema, bool missing_ok)
{
int rc;
bool use_pagemap;
char *errmsg = NULL;
BlockNumber err_blknum = 0;
/* page headers */
BackupPageHeader2 *headers = NULL;
/* sanity */
if (file->size % BLCKSZ != 0)
elog(WARNING, "File: \"%s\", invalid file size %zu", from_fullpath, file->size);
/*
* Compute expected number of blocks in the file.
* NOTE This is a normal situation, if the file size has changed
* since the moment we computed it.
*/
file->n_blocks = file->size/BLCKSZ;
/*
* Skip unchanged file only if it exists in previous backup.
* This way we can correctly handle null-sized files which are
* not tracked by pagemap and thus always marked as unchanged.
*/
if ((backup_mode == BACKUP_MODE_DIFF_PAGE ||
backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
file->pagemap.bitmapsize == PageBitmapIsEmpty &&
file->exists_in_prev && !file->pagemap_isabsent)
{
/*
* There are no changed blocks since last backup. We want to make
* incremental backup, so we should exit.
*/
file->write_size = BYTES_INVALID;
return;
}
/* reset size summary */
file->read_size = 0;
file->write_size = 0;
file->uncompressed_size = 0;
INIT_FILE_CRC32(true, file->crc);
/*
* Read each page, verify checksum and write it to backup.
* If page map is empty or file is not present in previous backup
* backup all pages of the relation.
*
* In PTRACK 1.x there was a problem
* of data files with missing _ptrack map.
* Such files should be fully copied.
*/
if (file->pagemap.bitmapsize == PageBitmapIsEmpty ||
file->pagemap_isabsent || !file->exists_in_prev ||
!file->pagemap.bitmap)
use_pagemap = false;
else
use_pagemap = true;
/* Remote mode */
if (fio_is_remote(FIO_DB_HOST))
{
rc = fio_send_pages(to_fullpath, from_fullpath, file,
/* send prev backup START_LSN */
backup_mode == BACKUP_MODE_DIFF_DELTA &&
file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
calg, clevel, checksum_version,
/* send pagemap if any */
use_pagemap ? &file->pagemap : NULL,
/* variables for error reporting */
&err_blknum, &errmsg, &headers);
}
else
{
/* TODO: stop handling errors internally */
rc = send_pages(conn_arg, to_fullpath, from_fullpath, file,
/* send prev backup START_LSN */
backup_mode == BACKUP_MODE_DIFF_DELTA &&
file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
calg, clevel, checksum_version,
/* send pagemap if any */
use_pagemap ? &file->pagemap : NULL,
&headers, backup_mode, ptrack_version_num, ptrack_schema);
}
/* check for errors */
if (rc == FILE_MISSING)
{
elog(LOG, "File \"%s\" is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else if (rc == WRITE_FAILED)
elog(ERROR, "Cannot write block %u of \"%s\": %s",
err_blknum, to_fullpath, strerror(errno));
else if (rc == PAGE_CORRUPTION)
{
if (errmsg)
elog(ERROR, "Corruption detected in file \"%s\", block %u: %s",
from_fullpath, err_blknum, errmsg);
else
elog(ERROR, "Corruption detected in file \"%s\", block %u",
from_fullpath, err_blknum);
}
/* OPEN_FAILED and READ_FAILED */
else if (rc == OPEN_FAILED)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot open file \"%s\"", from_fullpath);
}
else if (rc == READ_FAILED)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot read file \"%s\"", from_fullpath);
}
file->read_size = rc * BLCKSZ;
/* refresh n_blocks for FULL and DELTA */
if (backup_mode == BACKUP_MODE_FULL ||
backup_mode == BACKUP_MODE_DIFF_DELTA)
file->n_blocks = file->read_size / BLCKSZ;
/* Determine that file didn`t changed in case of incremental backup */
if (backup_mode != BACKUP_MODE_FULL &&
file->exists_in_prev &&
file->write_size == 0 &&
file->n_blocks > 0)
{
file->write_size = BYTES_INVALID;
}
cleanup:
/* finish CRC calculation */
FIN_FILE_CRC32(true, file->crc);
/* dump page headers */
write_page_headers(headers, file, to_fullpath);
pg_free(errmsg);
pg_free(file->pagemap.bitmap);
pg_free(headers);
} }
/* /*
@ -600,17 +769,6 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
file->uncompressed_size = 0; file->uncompressed_size = 0;
INIT_FILE_CRC32(true, file->crc); INIT_FILE_CRC32(true, file->crc);
/* open backup file for write */
out = fopen(to_fullpath, PG_BINARY_W);
if (out == NULL)
elog(ERROR, "Cannot open backup file \"%s\": %s",
to_fullpath, strerror(errno));
/* update file permission */
if (chmod(to_fullpath, FILE_PERMISSION) == -1)
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
strerror(errno));
/* /*
* Read each page, verify checksum and write it to backup. * Read each page, verify checksum and write it to backup.
* If page map is empty or file is not present in previous backup * If page map is empty or file is not present in previous backup
@ -628,17 +786,13 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
else else
use_pagemap = true; use_pagemap = true;
/* enable stdio buffering for output file */
out_buf = pgut_malloc(STDIO_BUFSIZE);
setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE);
/* Remote mode */ /* Remote mode */
if (fio_is_remote(FIO_DB_HOST)) if (fio_is_remote(FIO_DB_HOST))
{ {
char *errmsg = NULL; char *errmsg = NULL;
BlockNumber err_blknum = 0; BlockNumber err_blknum = 0;
int rc = fio_send_pages(out, from_fullpath, file, int rc = fio_send_pages(to_fullpath, from_fullpath, file,
/* send prev backup START_LSN */ /* send prev backup START_LSN */
backup_mode == BACKUP_MODE_DIFF_DELTA && backup_mode == BACKUP_MODE_DIFF_DELTA &&
file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr, file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
@ -692,6 +846,7 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
/* Local mode */ /* Local mode */
else else
{ {
uint cur_pos_out = 0;
/* open source file for read */ /* open source file for read */
in = fopen(from_fullpath, PG_BINARY_R); in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL) if (in == NULL)
@ -753,6 +908,11 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
else if (rc == PageIsOk) else if (rc == PageIsOk)
{ {
/* open local backup file for write */
if (!out)
out = open_local_file_rw(to_fullpath, &out_buf, STDIO_BUFSIZE);
hdr_num++; hdr_num++;
if (!headers) if (!headers)
@ -763,16 +923,15 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
headers[hdr_num].block = blknum; headers[hdr_num].block = blknum;
headers[hdr_num].lsn = page_st.lsn; headers[hdr_num].lsn = page_st.lsn;
headers[hdr_num].checksum = page_st.checksum; headers[hdr_num].checksum = page_st.checksum;
headers[hdr_num].pos = ftell(out); /* optimize */ headers[hdr_num].pos = cur_pos_out; /* optimize */
// elog(INFO, "CRC: %u", headers[hdr_num].checksum);
// elog(INFO, "POS: %u", headers[hdr_num].pos);
headers[hdr_num].compressed_size = compress_and_backup_page(file, blknum, in, out, &(file->crc), headers[hdr_num].compressed_size = compress_and_backup_page(file, blknum, in, out, &(file->crc),
rc, curr_page, calg, clevel, rc, curr_page, calg, clevel,
from_fullpath, to_fullpath); from_fullpath, to_fullpath);
file->n_headers = hdr_num +1; file->n_headers = hdr_num +1;
cur_pos_out += headers[hdr_num].compressed_size;
} }
/* TODO: handle PageIsCorrupted, currently it is done in prepare_page */ /* TODO: handle PageIsCorrupted, currently it is done in prepare_page */
else else
@ -846,8 +1005,6 @@ cleanup:
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath, elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
strerror(errno)); strerror(errno));
/* TODO: calculate checksums */
/* TODO: save file to backup_content metainformation */
hdr_size = file->n_headers * sizeof(BackupPageHeader2); hdr_size = file->n_headers * sizeof(BackupPageHeader2);
// elog(INFO, "Size: %lu, aligh: %lu", hdr_size, MAXALIGN(hdr_size)); // elog(INFO, "Size: %lu, aligh: %lu", hdr_size, MAXALIGN(hdr_size));
@ -996,8 +1153,12 @@ restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out,
/* set stdio buffering for input data file */ /* set stdio buffering for input data file */
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE); setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
/* get headers for this file */
headers = get_data_file_headers(from_fullpath, tmp_file, parse_program_version(backup->program_version)); headers = get_data_file_headers(from_fullpath, tmp_file, parse_program_version(backup->program_version));
if (!headers && tmp_file->n_headers > 0)
elog(ERROR, "Failed to get headers for file \"%s\"", from_fullpath);
/* /*
* Restore the file. * Restore the file.
* Datafiles are backed up block by block and every block * Datafiles are backed up block by block and every block
@ -1136,7 +1297,7 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
break; break;
if (compressed_size > BLCKSZ) if (compressed_size > BLCKSZ)
elog(ERROR, "Size of a blknum %i exceed BLCKSZ", blknum); elog(ERROR, "Size of a blknum %i exceed BLCKSZ: %i", blknum, compressed_size);
/* incremental restore in LSN mode */ /* incremental restore in LSN mode */
if (map && lsn_map && datapagemap_is_set(lsn_map, blknum)) if (map && lsn_map && datapagemap_is_set(lsn_map, blknum))
@ -1171,7 +1332,6 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
if (headers && if (headers &&
cur_pos_in != headers[n_hdr].pos) cur_pos_in != headers[n_hdr].pos)
{ {
elog(INFO, "Seek to %u", headers[n_hdr].pos);
if (fseek(in, headers[n_hdr].pos, SEEK_SET) != 0) if (fseek(in, headers[n_hdr].pos, SEEK_SET) != 0)
elog(ERROR, "Cannot seek to offset %u of '%s': %s", elog(ERROR, "Cannot seek to offset %u of '%s': %s",
headers[n_hdr].pos, from_fullpath, strerror(errno)); headers[n_hdr].pos, from_fullpath, strerror(errno));
@ -1179,10 +1339,6 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
cur_pos_in = headers[n_hdr].pos; cur_pos_in = headers[n_hdr].pos;
} }
// elog(INFO, "BKLKUM: %u", blknum);
// elog(INFO, "Cur_pos: %u", ftell(in));
/* read a page from file */ /* read a page from file */
read_len = fread(page.data, 1, MAXALIGN(compressed_size), in); read_len = fread(page.data, 1, MAXALIGN(compressed_size), in);
@ -1712,7 +1868,7 @@ check_data_file(ConnectionArgs *arguments, pgFile *file,
/* Valiate pages of datafile in backup one by one */ /* Valiate pages of datafile in backup one by one */
bool bool
check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, validate_file_pages(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
uint32 checksum_version, uint32 backup_version) uint32 checksum_version, uint32 backup_version)
{ {
size_t read_len = 0; size_t read_len = 0;
@ -1726,21 +1882,21 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
elog(VERBOSE, "Validate relation blocks for file \"%s\"", fullpath); elog(VERBOSE, "Validate relation blocks for file \"%s\"", fullpath);
/* nothing to validate */
if (backup_version >= 20400 &&
file->n_headers <= 0)
return true;
in = fopen(fullpath, PG_BINARY_R); in = fopen(fullpath, PG_BINARY_R);
if (in == NULL) if (in == NULL)
{
if (errno == ENOENT)
{
elog(WARNING, "File \"%s\" is not found", fullpath);
return false;
}
elog(ERROR, "Cannot open file \"%s\": %s", elog(ERROR, "Cannot open file \"%s\": %s",
fullpath, strerror(errno)); fullpath, strerror(errno));
}
headers = get_data_file_headers(fullpath, file, backup_version); headers = get_data_file_headers(fullpath, file, backup_version);
if (!headers && file->n_headers > 0)
elog(ERROR, "Failed to get headers for file \"%s\"", fullpath);
/* calc CRC of backup file */ /* calc CRC of backup file */
INIT_FILE_CRC32(use_crc32c, crc); INIT_FILE_CRC32(use_crc32c, crc);
@ -1767,6 +1923,8 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
blknum = headers[n_hdr].block; blknum = headers[n_hdr].block;
compressed_size = headers[n_hdr].compressed_size; compressed_size = headers[n_hdr].compressed_size;
// elog(INFO, "POS: %u", headers[n_hdr].pos);
if (cur_pos != headers[n_hdr].pos && if (cur_pos != headers[n_hdr].pos &&
fio_fseek(in, headers[n_hdr].pos) < 0) fio_fseek(in, headers[n_hdr].pos) < 0)
{ {
@ -1774,14 +1932,25 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
blknum, fullpath, strerror(errno)); blknum, fullpath, strerror(errno));
} }
} }
else if (!get_compressed_page_meta(in, fullpath, &compressed_size, else
&blknum, &crc, use_crc32c))
{ {
if (!get_compressed_page_meta(in, fullpath, &compressed_size,
&blknum, &crc, use_crc32c))
break; break;
} }
read_len = fread(compressed_page.data, 1, /* backward compatibility kludge TODO: remove in 3.0 */
MAXALIGN(compressed_size), in); if (compressed_size == PageIsTruncated)
{
elog(LOG, "Block %u of \"%s\" is truncated",
blknum, fullpath);
continue;
}
Assert(compressed_size <= BLCKSZ);
Assert(compressed_size > 0);
read_len = fread(compressed_page.data, 1, MAXALIGN(compressed_size), in);
if (read_len != MAXALIGN(compressed_size)) if (read_len != MAXALIGN(compressed_size))
{ {
elog(WARNING, "Cannot read block %u of \"%s\" read %zu of %d", elog(WARNING, "Cannot read block %u of \"%s\" read %zu of %d",
@ -1789,7 +1958,12 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
return false; return false;
} }
cur_pos += MAXALIGN(compressed_size); // elog(INFO, "POS1: %lu", cur_pos);
cur_pos += read_len;
// elog(INFO, "POS2: %lu", cur_pos);
// elog(INFO, "Compressed size: %i", compressed_size);
COMP_FILE_CRC32(use_crc32c, crc, compressed_page.data, read_len); COMP_FILE_CRC32(use_crc32c, crc, compressed_page.data, read_len);
@ -1806,8 +1980,11 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
file->compress_alg, file->compress_alg,
&errormsg); &errormsg);
if (uncompressed_size < 0 && errormsg != NULL) if (uncompressed_size < 0 && errormsg != NULL)
{
elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s", elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s",
blknum, fullpath, errormsg); blknum, fullpath, errormsg);
return false;
}
if (uncompressed_size != BLCKSZ) if (uncompressed_size != BLCKSZ)
{ {
@ -1816,8 +1993,8 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
is_valid = false; is_valid = false;
continue; continue;
} }
elog(WARNING, "Page of file \"%s\" uncompressed to %d bytes. != BLCKSZ", elog(WARNING, "Page %u of file \"%s\" uncompressed to %d bytes. != BLCKSZ",
fullpath, uncompressed_size); blknum, fullpath, uncompressed_size);
return false; return false;
} }
@ -1872,185 +2049,9 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
return is_valid; return is_valid;
} }
/* Valiate pages of datafile in backup one by one */
bool
check_file_pages(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
uint32 checksum_version, uint32 backup_version)
{
size_t read_len = 0;
bool is_valid = true;
FILE *in;
pg_crc32 crc;
bool use_crc32c = backup_version <= 20021 || backup_version >= 20025;
elog(VERBOSE, "Validate relation blocks for file \"%s\"", fullpath);
in = fopen(fullpath, PG_BINARY_R);
if (in == NULL)
{
if (errno == ENOENT)
{
elog(WARNING, "File \"%s\" is not found", fullpath);
return false;
}
elog(ERROR, "Cannot open file \"%s\": %s",
fullpath, strerror(errno));
}
/* calc CRC of backup file */
INIT_FILE_CRC32(use_crc32c, crc);
/* read and validate pages one by one */
while (true)
{
int rc = 0;
DataPage compressed_page; /* used as read buffer */
DataPage page;
BackupPageHeader header;
BlockNumber blknum = 0;
XLogRecPtr page_lsn = 0;
if (interrupted || thread_interrupted)
elog(ERROR, "Interrupted during data file validation");
/* read BackupPageHeader */
read_len = fread(&header, 1, sizeof(header), in);
if (ferror(in))
elog(ERROR, "Cannot read header of block %u of \"%s\": %s",
blknum, fullpath, strerror(errno));
if (read_len != sizeof(header))
{
if (read_len == 0 && feof(in))
break; /* EOF found */
else if (read_len != 0 && feof(in))
elog(WARNING,
"Odd size page found at block %u of \"%s\"",
blknum, fullpath);
else
elog(WARNING, "Cannot read header of block %u of \"%s\": %s",
blknum, fullpath, strerror(errno));
return false;
}
COMP_FILE_CRC32(use_crc32c, crc, &header, read_len);
if (header.block == 0 && header.compressed_size == 0)
{
elog(VERBOSE, "Skip empty block of \"%s\"", fullpath);
continue;
}
if (header.block < blknum)
{
elog(WARNING, "Backup is broken at block %u of \"%s\"",
blknum, fullpath);
return false;
}
blknum = header.block;
if (header.compressed_size == PageIsTruncated)
{
elog(LOG, "Block %u of \"%s\" is truncated",
blknum, fullpath);
continue;
}
Assert(header.compressed_size <= BLCKSZ);
read_len = fread(compressed_page.data, 1,
MAXALIGN(header.compressed_size), in);
if (read_len != MAXALIGN(header.compressed_size))
{
elog(WARNING, "Cannot read block %u of \"%s\" read %zu of %d",
blknum, fullpath, read_len, header.compressed_size);
return false;
}
COMP_FILE_CRC32(use_crc32c, crc, compressed_page.data, read_len);
if (header.compressed_size != BLCKSZ
|| page_may_be_compressed(compressed_page.data, file->compress_alg,
backup_version))
{
int32 uncompressed_size = 0;
const char *errormsg = NULL;
uncompressed_size = do_decompress(page.data, BLCKSZ,
compressed_page.data,
header.compressed_size,
file->compress_alg,
&errormsg);
if (uncompressed_size < 0 && errormsg != NULL)
elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s",
blknum, fullpath, errormsg);
if (uncompressed_size != BLCKSZ)
{
if (header.compressed_size == BLCKSZ)
{
is_valid = false;
continue;
}
elog(WARNING, "Page of file \"%s\" uncompressed to %d bytes. != BLCKSZ",
fullpath, uncompressed_size);
return false;
}
// rc = validate_one_page(page.data,
// file->segno * RELSEG_SIZE + blknum,
// stop_lsn, &page_lsn, NULL, checksum_version);
}
else
// rc = validate_one_page(compressed_page.data,
// file->segno * RELSEG_SIZE + blknum,
// stop_lsn, &page_lsn, NULL, checksum_version);
switch (rc)
{
case PAGE_IS_NOT_FOUND:
elog(LOG, "File \"%s\", block %u, page is NULL", file->rel_path, blknum);
break;
case PAGE_IS_ZEROED:
elog(LOG, "File: %s blknum %u, empty zeroed page", file->rel_path, blknum);
break;
case PAGE_HEADER_IS_INVALID:
elog(WARNING, "Page header is looking insane: %s, block %i", file->rel_path, blknum);
is_valid = false;
break;
case PAGE_CHECKSUM_MISMATCH:
elog(WARNING, "File: %s blknum %u have wrong checksum", file->rel_path, blknum);
is_valid = false;
break;
case PAGE_LSN_FROM_FUTURE:
elog(WARNING, "File: %s, block %u, checksum is %s. "
"Page is from future: pageLSN %X/%X stopLSN %X/%X",
file->rel_path, blknum,
checksum_version ? "correct" : "not enabled",
(uint32) (page_lsn >> 32), (uint32) page_lsn,
(uint32) (stop_lsn >> 32), (uint32) stop_lsn);
break;
}
}
FIN_FILE_CRC32(use_crc32c, crc);
fclose(in);
if (crc != file->crc)
{
elog(WARNING, "Invalid CRC of backup file \"%s\": %X. Expected %X",
fullpath, crc, file->crc);
is_valid = false;
}
return is_valid;
}
/* read local data file and construct map with block checksums */ /* read local data file and construct map with block checksums */
PageState *get_checksum_map(const char *fullpath, uint32 checksum_version, PageState*
get_checksum_map(const char *fullpath, uint32 checksum_version,
int n_blocks, XLogRecPtr dest_stop_lsn, BlockNumber segmentno) int n_blocks, XLogRecPtr dest_stop_lsn, BlockNumber segmentno)
{ {
PageState *checksum_map = NULL; PageState *checksum_map = NULL;
@ -2186,39 +2187,6 @@ get_lsn_map(const char *fullpath, uint32 checksum_version,
return lsn_map; return lsn_map;
} }
/* attempt to open header file, read content and return as
* array of headers.
*/
BackupPageHeader2*
get_data_file_headers(const char *fullpath, pgFile *file, uint32 backup_version)
{
int len;
FILE *in = NULL;
char fullpath_hdr[MAXPGPATH];
BackupPageHeader2 *headers = NULL;
if (backup_version < 20400)
return NULL;
snprintf(fullpath_hdr, MAXPGPATH, "%s_hdr", fullpath);
in = fopen(fullpath_hdr, PG_BINARY_R);
if (!in)
elog(ERROR, "Cannot open header file \"%s\": %s", fullpath_hdr, strerror(errno));
len = file->n_headers * sizeof(BackupPageHeader2);
headers = pgut_malloc(len);
if (fread(headers, 1, len, in) != len)
elog(ERROR, "Cannot read header file \"%s\": %s", fullpath_hdr, strerror(errno));
if (fclose(in))
elog(ERROR, "Cannot close header file \"%s\": %s", fullpath_hdr, strerror(errno));
return headers;
}
/* */ /* */
bool bool
get_compressed_page_meta(FILE *in, const char *fullpath, int32 *compressed_size, get_compressed_page_meta(FILE *in, const char *fullpath, int32 *compressed_size,
@ -2256,8 +2224,248 @@ get_compressed_page_meta(FILE *in, const char *fullpath, int32 *compressed_size,
*blknum = header.block; *blknum = header.block;
*compressed_size = header.compressed_size; *compressed_size = header.compressed_size;
Assert(*compressed_size <= BLCKSZ); elog(INFO, "blknum: %i", header.block);
elog(INFO, "size: %i", header.compressed_size);
elog(INFO, "size2: %i", *compressed_size);
elog(INFO, "BLKNUM: %i", *blknum);
elog(INFO, "File: %s", fullpath);
Assert(*compressed_size != 0);
return true; return true;
} }
/* Open local backup file for writing, set permissions and buffering */
FILE*
open_local_file_rw(const char *to_fullpath, char **out_buf, uint32 buf_size)
{
FILE *out = NULL;
/* open backup file for write */
out = fopen(to_fullpath, PG_BINARY_W);
if (out == NULL)
elog(ERROR, "Cannot open backup file \"%s\": %s",
to_fullpath, strerror(errno));
/* update file permission */
if (chmod(to_fullpath, FILE_PERMISSION) == -1)
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
strerror(errno));
/* enable stdio buffering for output file */
*out_buf = pgut_malloc(buf_size);
setvbuf(out, *out_buf, _IOFBF, buf_size);
return out;
}
int
send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_fullpath,
pgFile *file, XLogRecPtr prev_backup_start_lsn, CompressAlg calg, int clevel,
uint32 checksum_version, bool use_pagemap, BackupPageHeader2 **headers,
BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema)
{
FILE *in = NULL;
FILE *out = NULL;
int hdr_num = -1;
uint cur_pos_out = 0;
char curr_page[BLCKSZ];
int n_blocks_read = 0;
BlockNumber blknum = 0;
datapagemap_iterator_t *iter = NULL;
/* stdio buffers */
char *in_buf = NULL;
char *out_buf = NULL;
/* open source file for read */
in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL)
{
/*
* If file is not found, this is not en error.
* It could have been deleted by concurrent postgres transaction.
*/
if (errno == ENOENT)
return FILE_MISSING;
elog(ERROR, "Cannot open file \"%s\": %s", from_fullpath, strerror(errno));
}
/*
* Enable stdio buffering for local input file,
* unless the pagemap is involved, which
* imply a lot of random access.
*/
if (use_pagemap)
{
iter = datapagemap_iterate(&file->pagemap);
datapagemap_next(iter, &blknum); /* set first block */
setvbuf(in, NULL, _IONBF, BUFSIZ);
}
else
{
in_buf = pgut_malloc(STDIO_BUFSIZE);
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
}
while (blknum < file->n_blocks)
{
PageState page_st;
int rc = prepare_page(conn_arg, file, prev_backup_start_lsn,
blknum, in, backup_mode, curr_page,
true, checksum_version,
ptrack_version_num, ptrack_schema,
from_fullpath, &page_st);
if (rc == PageIsTruncated)
break;
else if (rc == PageIsOk)
{
/* lazily open backup file (useful for s3) */
if (!out)
out = open_local_file_rw(to_fullpath, &out_buf, STDIO_BUFSIZE);
hdr_num++;
if (!*headers)
*headers = (BackupPageHeader2 *) pgut_malloc(sizeof(BackupPageHeader2));
else
*headers = (BackupPageHeader2 *) pgut_realloc(*headers, (hdr_num+1 ) * sizeof(BackupPageHeader2));
(*headers)[hdr_num].block = blknum;
(*headers)[hdr_num].pos = cur_pos_out;
(*headers)[hdr_num].lsn = page_st.lsn;
(*headers)[hdr_num].checksum = page_st.checksum;
(*headers)[hdr_num].compressed_size = compress_and_backup_page(file, blknum, in, out, &(file->crc),
rc, curr_page, calg, clevel,
from_fullpath, to_fullpath);
cur_pos_out += MAXALIGN((*headers)[hdr_num].compressed_size);
}
n_blocks_read++;
/* next block */
if (use_pagemap)
{
/* exit if pagemap is exhausted */
if (!datapagemap_next(iter, &blknum))
break;
}
else
blknum++;
}
file->n_headers = hdr_num +1;
/* cleanup */
if (in && fclose(in))
elog(ERROR, "Cannot close the source file \"%s\": %s",
to_fullpath, strerror(errno));
/* close local output file */
if (out && fclose(out))
elog(ERROR, "Cannot close the backup file \"%s\": %s",
to_fullpath, strerror(errno));
pg_free(iter);
pg_free(in_buf);
pg_free(out_buf);
return n_blocks_read;
}
/* attempt to open header file, read content and return as
* array of headers.
*/
BackupPageHeader2*
get_data_file_headers(const char *fullpath, pgFile *file, uint32 backup_version)
{
int len;
FILE *in = NULL;
pg_crc32 hdr_crc;
char fullpath_hdr[MAXPGPATH];
BackupPageHeader2 *headers = NULL;
// elog(INFO, "Backup Version: %u", backup_version);
if (backup_version < 20400)
{
elog(INFO, "HELLO1");
return NULL;
}
if (file->n_headers <= 0)
{
elog(INFO, "HELLO2");
return NULL;
}
snprintf(fullpath_hdr, MAXPGPATH, "%s_hdr", fullpath);
in = fopen(fullpath_hdr, PG_BINARY_R);
if (!in)
elog(ERROR, "Cannot open header file \"%s\": %s", fullpath_hdr, strerror(errno));
len = file->n_headers * sizeof(BackupPageHeader2);
headers = pgut_malloc(len);
if (fread(headers, 1, len, in) != len)
elog(ERROR, "Cannot read header file \"%s\": %s", fullpath_hdr, strerror(errno));
/* validate checksum */
INIT_FILE_CRC32(true, hdr_crc);
COMP_FILE_CRC32(true, hdr_crc, headers, len);
FIN_FILE_CRC32(true, hdr_crc);
if (hdr_crc != file->hdr_crc)
{
elog(ERROR, "Header file crc mismatch \"%s\", current: %u, expected: %u",
fullpath_hdr, hdr_crc, file->hdr_crc);
}
if (fclose(in))
elog(ERROR, "Cannot close header file \"%s\": %s", fullpath_hdr, strerror(errno));
return headers;
}
void
write_page_headers(BackupPageHeader2 *headers, pgFile *file, const char* to_fullpath)
{
FILE *out = NULL;
size_t hdr_size = 0;
char to_fullpath_hdr[MAXPGPATH];
if (file->n_headers <= 0)
return;
snprintf(to_fullpath_hdr, MAXPGPATH, "%s_hdr", to_fullpath);
out = fopen(to_fullpath_hdr, PG_BINARY_W);
if (out == NULL)
elog(ERROR, "Cannot open header file \"%s\": %s",
to_fullpath, strerror(errno));
/* update file permission */
if (chmod(to_fullpath_hdr, FILE_PERMISSION) == -1)
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
strerror(errno));
hdr_size = file->n_headers * sizeof(BackupPageHeader2);
/* calculate checksums */
INIT_FILE_CRC32(true, file->hdr_crc);
COMP_FILE_CRC32(true, file->hdr_crc, headers, hdr_size);
FIN_FILE_CRC32(true, file->hdr_crc);
if (fwrite(headers, 1, hdr_size, out) != hdr_size)
elog(ERROR, "Cannot write to file \"%s\": %s", to_fullpath_hdr, strerror(errno));
if (fclose(out))
elog(ERROR, "Cannot close file \"%s\": %s", to_fullpath_hdr, strerror(errno));
}

View File

@ -1550,7 +1550,8 @@ dir_read_file_list(const char *root, const char *external_prefix,
segno, segno,
n_blocks, n_blocks,
n_headers, n_headers,
dbOid; /* used for partial restore */ dbOid, /* used for partial restore */
hdr_crc;
pgFile *file; pgFile *file;
COMP_FILE_CRC32(true, content_crc, buf, strlen(buf)); COMP_FILE_CRC32(true, content_crc, buf, strlen(buf));
@ -1594,6 +1595,9 @@ dir_read_file_list(const char *root, const char *external_prefix,
if (get_control_value(buf, "n_headers", NULL, &n_headers, false)) if (get_control_value(buf, "n_headers", NULL, &n_headers, false))
file->n_headers = (int) n_headers; file->n_headers = (int) n_headers;
if (get_control_value(buf, "hdr_crc", NULL, &hdr_crc, false))
file->hdr_crc = (pg_crc32) hdr_crc;
parray_append(files, file); parray_append(files, file);
} }

View File

@ -1020,6 +1020,9 @@ merge_files(void *arg)
tmp_file->n_blocks = file->n_blocks; tmp_file->n_blocks = file->n_blocks;
tmp_file->compress_alg = file->compress_alg; tmp_file->compress_alg = file->compress_alg;
tmp_file->uncompressed_size = file->n_blocks * BLCKSZ; tmp_file->uncompressed_size = file->n_blocks * BLCKSZ;
tmp_file->n_headers = file->n_headers;
tmp_file->hdr_crc = file->hdr_crc;
} }
else else
tmp_file->uncompressed_size = tmp_file->write_size; tmp_file->uncompressed_size = tmp_file->write_size;
@ -1140,8 +1143,11 @@ merge_data_file(parray *parent_chain, pgBackup *full_backup,
FILE *out = NULL; FILE *out = NULL;
char *buffer = pgut_malloc(STDIO_BUFSIZE); char *buffer = pgut_malloc(STDIO_BUFSIZE);
char to_fullpath[MAXPGPATH]; char to_fullpath[MAXPGPATH];
char to_fullpath_hdr[MAXPGPATH];
char to_fullpath_tmp1[MAXPGPATH]; /* used for restore */ char to_fullpath_tmp1[MAXPGPATH]; /* used for restore */
char to_fullpath_tmp2[MAXPGPATH]; /* used for backup */ char to_fullpath_tmp2[MAXPGPATH]; /* used for backup */
char to_fullpath_tmp2_hdr[MAXPGPATH];
/* The next possible optimization is copying "as is" the file /* The next possible optimization is copying "as is" the file
* from intermediate incremental backup, that didn`t changed in * from intermediate incremental backup, that didn`t changed in
@ -1152,6 +1158,9 @@ merge_data_file(parray *parent_chain, pgBackup *full_backup,
join_path_components(to_fullpath, full_database_dir, tmp_file->rel_path); join_path_components(to_fullpath, full_database_dir, tmp_file->rel_path);
snprintf(to_fullpath_tmp1, MAXPGPATH, "%s_tmp1", to_fullpath); snprintf(to_fullpath_tmp1, MAXPGPATH, "%s_tmp1", to_fullpath);
snprintf(to_fullpath_tmp2, MAXPGPATH, "%s_tmp2", to_fullpath); snprintf(to_fullpath_tmp2, MAXPGPATH, "%s_tmp2", to_fullpath);
/* header files */
snprintf(to_fullpath_hdr, MAXPGPATH, "%s_hdr", to_fullpath);
snprintf(to_fullpath_tmp2_hdr, MAXPGPATH, "%s_hdr", to_fullpath_tmp2);
/* open temp file */ /* open temp file */
out = fopen(to_fullpath_tmp1, PG_BINARY_W); out = fopen(to_fullpath_tmp1, PG_BINARY_W);
@ -1177,7 +1186,7 @@ merge_data_file(parray *parent_chain, pgBackup *full_backup,
* 2 backups of old versions, were n_blocks is missing. * 2 backups of old versions, were n_blocks is missing.
*/ */
backup_data_file(NULL, tmp_file, to_fullpath_tmp1, to_fullpath_tmp2, backup_data_file_new(NULL, tmp_file, to_fullpath_tmp1, to_fullpath_tmp2,
InvalidXLogRecPtr, BACKUP_MODE_FULL, InvalidXLogRecPtr, BACKUP_MODE_FULL,
dest_backup->compress_alg, dest_backup->compress_level, dest_backup->compress_alg, dest_backup->compress_level,
dest_backup->checksum_version, 0, NULL, false); dest_backup->checksum_version, 0, NULL, false);
@ -1207,11 +1216,26 @@ merge_data_file(parray *parent_chain, pgBackup *full_backup,
elog(ERROR, "Cannot sync merge temp file \"%s\": %s", elog(ERROR, "Cannot sync merge temp file \"%s\": %s",
to_fullpath_tmp2, strerror(errno)); to_fullpath_tmp2, strerror(errno));
/* sync header file */
if (fio_sync(to_fullpath_tmp2, FIO_BACKUP_HOST) != 0)
elog(ERROR, "Cannot sync temp header file \"%s\": %s",
to_fullpath_tmp2_hdr, strerror(errno));
//<- CRITICAL SECTION
/* Do atomic rename from second temp file to destination file */ /* Do atomic rename from second temp file to destination file */
if (rename(to_fullpath_tmp2, to_fullpath) == -1) if (rename(to_fullpath_tmp2, to_fullpath) == -1)
elog(ERROR, "Could not rename file \"%s\" to \"%s\": %s", elog(ERROR, "Could not rename file \"%s\" to \"%s\": %s",
to_fullpath_tmp2, to_fullpath, strerror(errno)); to_fullpath_tmp2, to_fullpath, strerror(errno));
//<- If we crash here, merge cannot be continued.
/* Do atomic rename from header file */
if (rename(to_fullpath_tmp2_hdr, to_fullpath_hdr) == -1)
elog(ERROR, "Could not rename file \"%s\" to \"%s\": %s",
to_fullpath_tmp2, to_fullpath, strerror(errno));
//<-
/* drop temp file */ /* drop temp file */
unlink(to_fullpath_tmp1); unlink(to_fullpath_tmp1);
} }

View File

@ -192,6 +192,7 @@ typedef struct pgFile
*/ */
/* we need int64 here to store '-1' value */ /* we need int64 here to store '-1' value */
pg_crc32 crc; /* CRC value of the file, regular file only */ pg_crc32 crc; /* CRC value of the file, regular file only */
pg_crc32 hdr_crc; /* CRC value of header file: name_hdr */
char *rel_path; /* relative path of the file */ char *rel_path; /* relative path of the file */
char *linked; /* path of the linked file */ char *linked; /* path of the linked file */
bool is_datafile; /* true if the file is PostgreSQL data file */ bool is_datafile; /* true if the file is PostgreSQL data file */
@ -585,7 +586,7 @@ typedef struct BackupPageHeader
typedef struct BackupPageHeader2 typedef struct BackupPageHeader2
{ {
int32 block; /* block number */ int32 block; /* block number */
int32 pos; int32 pos; /* position in backup file */
int32 compressed_size; int32 compressed_size;
XLogRecPtr lsn; XLogRecPtr lsn;
uint16 checksum; uint16 checksum;
@ -963,6 +964,11 @@ extern void backup_data_file(ConnectionArgs* conn_arg, pgFile *file,
XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode, XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
CompressAlg calg, int clevel, uint32 checksum_version, CompressAlg calg, int clevel, uint32 checksum_version,
int ptrack_version_num, const char *ptrack_schema, bool missing_ok); int ptrack_version_num, const char *ptrack_schema, bool missing_ok);
extern void backup_data_file_new(ConnectionArgs* conn_arg, pgFile *file,
const char *from_fullpath, const char *to_fullpath,
XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
CompressAlg calg, int clevel, uint32 checksum_version,
int ptrack_version_num, const char *ptrack_schema, bool missing_ok);
extern void backup_non_data_file(pgFile *file, pgFile *prev_file, extern void backup_non_data_file(pgFile *file, pgFile *prev_file,
const char *from_fullpath, const char *to_fullpath, const char *from_fullpath, const char *to_fullpath,
BackupMode backup_mode, time_t parent_backup_time, BackupMode backup_mode, time_t parent_backup_time,
@ -993,8 +999,8 @@ extern datapagemap_t *get_lsn_map(const char *fullpath, uint32 checksum_version,
int n_blocks, XLogRecPtr shift_lsn, BlockNumber segmentno); int n_blocks, XLogRecPtr shift_lsn, BlockNumber segmentno);
extern pid_t check_postmaster(const char *pgdata); extern pid_t check_postmaster(const char *pgdata);
extern bool check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, extern bool validate_file_pages(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
uint32 checksum_version, uint32 backup_version); uint32 checksum_version, uint32 backup_version);
/* parsexlog.c */ /* parsexlog.c */
extern bool extractPageMap(const char *archivedir, uint32 wal_seg_size, extern bool extractPageMap(const char *archivedir, uint32 wal_seg_size,
XLogRecPtr startpoint, TimeLineID start_tli, XLogRecPtr startpoint, TimeLineID start_tli,
@ -1076,8 +1082,16 @@ extern XLogRecPtr get_last_ptrack_lsn(PGconn *backup_conn, PGNodeInfo *nodeInfo)
extern parray * pg_ptrack_get_pagemapset(PGconn *backup_conn, const char *ptrack_schema, extern parray * pg_ptrack_get_pagemapset(PGconn *backup_conn, const char *ptrack_schema,
int ptrack_version_num, XLogRecPtr lsn); int ptrack_version_num, XLogRecPtr lsn);
/* open local file to writing */
extern FILE* open_local_file_rw(const char *to_fullpath, char **out_buf, uint32 buf_size);
extern int send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_fullpath,
pgFile *file, XLogRecPtr prev_backup_start_lsn, CompressAlg calg, int clevel,
uint32 checksum_version, bool use_pagemap, BackupPageHeader2 **headers,
BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema);
/* FIO */ /* FIO */
extern int fio_send_pages(FILE* out, const char *from_fullpath, pgFile *file, XLogRecPtr horizonLsn, extern int fio_send_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file, XLogRecPtr horizonLsn,
int calg, int clevel, uint32 checksum_version, datapagemap_t *pagemap, int calg, int clevel, uint32 checksum_version, datapagemap_t *pagemap,
BlockNumber* err_blknum, char **errormsg, BackupPageHeader2 **headers); BlockNumber* err_blknum, char **errormsg, BackupPageHeader2 **headers);
/* return codes for fio_send_pages */ /* return codes for fio_send_pages */

View File

@ -1405,11 +1405,13 @@ static void fio_load_file(int out, char const* path)
* In case of DELTA mode horizonLsn must be a valid lsn, * In case of DELTA mode horizonLsn must be a valid lsn,
* otherwise it should be set to InvalidXLogRecPtr. * otherwise it should be set to InvalidXLogRecPtr.
*/ */
int fio_send_pages(FILE* out, const char *from_fullpath, pgFile *file, XLogRecPtr horizonLsn, int fio_send_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file, XLogRecPtr horizonLsn,
int calg, int clevel, uint32 checksum_version, int calg, int clevel, uint32 checksum_version,
datapagemap_t *pagemap, BlockNumber* err_blknum, datapagemap_t *pagemap, BlockNumber* err_blknum,
char **errormsg, BackupPageHeader2 **headers) char **errormsg, BackupPageHeader2 **headers)
{ {
FILE *out = NULL;
char *out_buf = NULL;
struct { struct {
fio_header hdr; fio_header hdr;
fio_send_request arg; fio_send_request arg;
@ -1532,6 +1534,10 @@ int fio_send_pages(FILE* out, const char *from_fullpath, pgFile *file, XLogRecPt
COMP_FILE_CRC32(true, file->crc, buf, hdr.size); COMP_FILE_CRC32(true, file->crc, buf, hdr.size);
/* lazily open backup file */
if (!out)
out = open_local_file_rw(to_fullpath, &out_buf, STDIO_BUFSIZE);
if (fio_fwrite(out, buf, hdr.size) != hdr.size) if (fio_fwrite(out, buf, hdr.size) != hdr.size)
{ {
fio_fclose(out); fio_fclose(out);
@ -1545,6 +1551,10 @@ int fio_send_pages(FILE* out, const char *from_fullpath, pgFile *file, XLogRecPt
elog(ERROR, "Remote agent returned message of unexpected type: %i", hdr.cop); elog(ERROR, "Remote agent returned message of unexpected type: %i", hdr.cop);
} }
if (out)
fclose(out);
pg_free(out_buf);
return n_blocks_read; return n_blocks_read;
} }

View File

@ -363,7 +363,7 @@ pgBackupValidateFiles(void *arg)
* check page headers, checksums (if enabled) * check page headers, checksums (if enabled)
* and compute checksum of the file * and compute checksum of the file
*/ */
if (!check_file_pages_new(file, file_fullpath, arguments->stop_lsn, if (!validate_file_pages(file, file_fullpath, arguments->stop_lsn,
arguments->checksum_version, arguments->checksum_version,
arguments->backup_version)) arguments->backup_version))
arguments->corrupted = true; arguments->corrupted = true;