1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-01-25 11:53:32 +02:00

[Issue #66] Add lsn-based incremental restore

This commit is contained in:
Grigory Smolkin 2020-05-30 07:15:11 +03:00
parent 87dc977baa
commit 45995f0b0d
8 changed files with 458 additions and 114 deletions

View File

@ -862,14 +862,19 @@ backup_non_data_file(pgFile *file, pgFile *prev_file,
*/
size_t
restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out,
const char *to_fullpath, bool use_bitmap, uint16 *checksum_map)
const char *to_fullpath, bool use_bitmap, PageState *checksum_map,
XLogRecPtr horizonLsn, datapagemap_t *lsn_map)
{
size_t total_write_len = 0;
char *in_buf = pgut_malloc(STDIO_BUFSIZE);
int backup_seq = 0;
// FULL -> INCR -> DEST
// 2 1 0
/*
* FULL -> INCR -> DEST
* 2 1 0
* Restore of backups of older versions cannot be optimized with bitmap
* because of n_blocks
*/
if (use_bitmap)
/* start with dest backup */
backup_seq = 0;
@ -942,7 +947,8 @@ restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out,
parse_program_version(backup->program_version),
from_fullpath, to_fullpath, dest_file->n_blocks,
use_bitmap ? &(dest_file)->pagemap : NULL,
checksum_map, backup->checksum_version);
checksum_map, backup->checksum_version,
backup->start_lsn <= horizonLsn ? lsn_map : NULL);
if (fclose(in) != 0)
elog(ERROR, "Cannot close file \"%s\": %s", from_fullpath,
@ -963,7 +969,8 @@ restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out,
size_t
restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_version,
const char *from_fullpath, const char *to_fullpath, int nblocks,
datapagemap_t *map, uint16 *checksum_map, int checksum_version)
datapagemap_t *map, PageState *checksum_map, int checksum_version,
datapagemap_t *lsn_map)
{
BackupPageHeader header;
BlockNumber blknum = 0;
@ -1071,6 +1078,9 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
if (compressed_size > BLCKSZ)
elog(ERROR, "Size of a blknum %i exceed BLCKSZ", blknum);
if (lsn_map && datapagemap_is_set(lsn_map, blknum))
datapagemap_add(map, blknum);
/* if this page is marked as already restored, then skip it */
if (map && datapagemap_is_set(map, blknum))
{
@ -1101,10 +1111,14 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
is_compressed = true;
}
/* Incremental restore */
if (checksum_map && checksum_map[blknum] != 0)
/* Incremental restore
* TODO: move to another function
*/
if (checksum_map && checksum_map[blknum].checksum != 0)
{
uint16 page_crc = 0;
XLogRecPtr page_lsn = InvalidXLogRecPtr;
PageHeader phdr;
if (is_compressed)
{
@ -1117,8 +1131,7 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
else
page_crc = pg_checksum_page(uncompressed_buf, file->segno * RELSEG_SIZE + blknum);
// page_crc_1 = pg_checksum_page(uncompressed_buf, file->segno * RELSEG_SIZE + blknum);
// Assert(page_crc == page_crc_1);
phdr = (PageHeader) uncompressed_buf;
}
else
{
@ -1127,10 +1140,19 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
page_crc = ((PageHeader) page.data)->pd_checksum;
else
page_crc = pg_checksum_page(page.data, file->segno + blknum);
phdr = (PageHeader) page.data;
}
/* the heart of incremental restore */
if (page_crc == checksum_map[blknum])
page_lsn = PageXLogRecPtrGet(phdr->pd_lsn);
/*
* The heart of incremental restore
* If page in backup has the same checksum and lsn as
* page in backup, then page can be skipped.
*/
if (page_crc == checksum_map[blknum].checksum &&
page_lsn == checksum_map[blknum].lsn)
{
if (map)
datapagemap_add(map, blknum);
@ -1812,10 +1834,10 @@ check_file_pages(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
}
/* read local data file and construct map with block checksums */
uint16 *get_checksum_map(const char *fullpath, uint32 checksum_version,
PageState *get_checksum_map(const char *fullpath, uint32 checksum_version,
int n_blocks, XLogRecPtr dest_stop_lsn, BlockNumber segmentno)
{
uint16 *checksum_map = NULL;
PageState *checksum_map = NULL;
FILE *in = NULL;
BlockNumber blknum = 0;
XLogRecPtr page_lsn = 0;
@ -1834,8 +1856,8 @@ uint16 *get_checksum_map(const char *fullpath, uint32 checksum_version,
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
/* initialize array of checksums */
checksum_map = pgut_malloc(n_blocks * sizeof(uint16));
memset(checksum_map, 0, n_blocks * sizeof(uint16));
checksum_map = pgut_malloc(n_blocks * sizeof(PageState));
memset(checksum_map, 0, n_blocks * sizeof(PageState));
for (blknum = 0; blknum < n_blocks; blknum++)
{
@ -1855,9 +1877,11 @@ uint16 *get_checksum_map(const char *fullpath, uint32 checksum_version,
if (rc == PAGE_IS_VALID)
{
if (checksum_version)
checksum_map[blknum] = ((PageHeader) read_buffer)->pd_checksum;
checksum_map[blknum].checksum = ((PageHeader) read_buffer)->pd_checksum;
else
checksum_map[blknum] = pg_checksum_page(read_buffer, segmentno + blknum);
checksum_map[blknum].checksum = pg_checksum_page(read_buffer, segmentno + blknum);
checksum_map[blknum].lsn = page_lsn;
}
}
else
@ -1875,3 +1899,71 @@ uint16 *get_checksum_map(const char *fullpath, uint32 checksum_version,
return checksum_map;
}
/* return bitmap of valid blocks, bitmap is empty, then NULL is returned */
datapagemap_t *
get_lsn_map(const char *fullpath, uint32 checksum_version,
int n_blocks, XLogRecPtr horizonLsn, BlockNumber segmentno)
{
FILE *in = NULL;
BlockNumber blknum = 0;
XLogRecPtr page_lsn = 0;
char read_buffer[BLCKSZ];
char in_buf[STDIO_BUFSIZE];
datapagemap_t *lsn_map = NULL;
/* truncate up to blocks */
if (truncate(fullpath, n_blocks * BLCKSZ) != 0)
elog(ERROR, "Cannot truncate file to blknum %u \"%s\": %s",
n_blocks, fullpath, strerror(errno));
Assert(horizonLsn > 0);
/* open file */
in = fopen(fullpath, PG_BINARY_R);
if (!in)
elog(ERROR, "Cannot open source file \"%s\": %s", fullpath, strerror(errno));
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
lsn_map = pgut_malloc(sizeof(datapagemap_t));
memset(lsn_map, 0, sizeof(datapagemap_t));
for (blknum = 0; blknum < n_blocks; blknum++)
{
size_t read_len = fread(read_buffer, 1, BLCKSZ, in);
page_lsn = InvalidXLogRecPtr;
/* report error */
if (ferror(in))
elog(ERROR, "Cannot read block %u of \"%s\": %s",
blknum, fullpath, strerror(errno));
if (read_len == BLCKSZ)
{
int rc = validate_one_page(read_buffer, segmentno + blknum,
horizonLsn, &page_lsn, checksum_version);
if (rc == PAGE_IS_VALID)
datapagemap_add(lsn_map, blknum);
}
else
elog(ERROR, "Failed to read blknum %u from file \"%s\"", blknum, fullpath);
if (feof(in))
break;
if (interrupted)
elog(ERROR, "Interrupted during page reading");
}
if (in)
fclose(in);
if (lsn_map->bitmapsize == 0)
{
pg_free(lsn_map);
lsn_map = NULL;
}
return lsn_map;
}

View File

@ -1161,8 +1161,12 @@ merge_data_file(parray *parent_chain, pgBackup *full_backup,
setvbuf(out, buffer, _IOFBF, STDIO_BUFSIZE);
/* restore file into temp file */
tmp_file->size = restore_data_file(parent_chain, dest_file, out, to_fullpath_tmp1, use_bitmap, NULL);
fclose(out);
tmp_file->size = restore_data_file(parent_chain, dest_file, out, to_fullpath_tmp1,
use_bitmap, NULL, InvalidXLogRecPtr, NULL);
if (fclose(out) != 0)
elog(ERROR, "Cannot close file \"%s\": %s",
to_fullpath_tmp1, strerror(errno));
pg_free(buffer);
/* tmp_file->size is greedy, even if there is single 8KB block in file,

View File

@ -98,7 +98,8 @@ static pgRestoreParams *restore_params = NULL;
time_t current_time = 0;
bool restore_as_replica = false;
bool no_validate = false;
bool incremental_restore = false;
bool incremental = false;
bool incremental_lsn = false;
bool skip_block_validation = false;
bool skip_external_dirs = false;
@ -204,7 +205,8 @@ static ConfigOption cmd_options[] =
{ 'b', 'R', "restore-as-replica", &restore_as_replica, SOURCE_CMD_STRICT },
{ 's', 160, "primary-conninfo", &primary_conninfo, SOURCE_CMD_STRICT },
{ 's', 'S', "primary-slot-name",&replication_slot, SOURCE_CMD_STRICT },
{ 'b', 161, "incremental", &incremental_restore, SOURCE_CMD_STRICT },
{ 'b', 161, "incremental", &incremental, SOURCE_CMD_STRICT },
{ 'b', 167, "incremental-lsn", &incremental_lsn, SOURCE_CMD_STRICT },
/* checkdb options */
{ 'b', 195, "amcheck", &need_amcheck, SOURCE_CMD_STRICT },
{ 'b', 196, "heapallindexed", &heapallindexed, SOURCE_CMD_STRICT },
@ -702,6 +704,9 @@ main(int argc, char *argv[])
if (replication_slot != NULL)
restore_as_replica = true;
if (!incremental && incremental_lsn)
incremental = true;
/* keep all params in one structure */
restore_params = pgut_new(pgRestoreParams);
restore_params->is_restore = (backup_subcmd == RESTORE_CMD);
@ -714,7 +719,8 @@ main(int argc, char *argv[])
restore_params->partial_db_list = NULL;
restore_params->partial_restore_type = NONE;
restore_params->primary_conninfo = primary_conninfo;
restore_params->incremental = incremental_restore;
restore_params->incremental = incremental;
restore_params->incremental_lsn = incremental_lsn;
/* handle partial restore parameters */
if (datname_exclude_list && datname_include_list)

View File

@ -101,6 +101,18 @@ extern const char *PROGRAM_EMAIL;
#define XRecOffIsNull(xlrp) \
((xlrp) % XLOG_BLCKSZ == 0)
typedef struct RedoParams
{
TimeLineID tli;
XLogRecPtr lsn;
} RedoParams;
typedef struct PageState
{
uint16 checksum;
XLogRecPtr lsn;
} PageState;
typedef struct db_map_entry
{
Oid dbOid;
@ -442,6 +454,8 @@ typedef struct pgRestoreParams
bool skip_external_dirs;
bool skip_block_validation; //Start using it
bool incremental;
bool incremental_lsn;
XLogRecPtr horizonLsn;
const char *restore_command;
const char *primary_slot_name;
@ -938,10 +952,12 @@ extern void backup_non_data_file_internal(const char *from_fullpath,
bool missing_ok);
extern size_t restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out,
const char *to_fullpath, bool use_bitmap, uint16 *checksum_map);
const char *to_fullpath, bool use_bitmap, PageState *checksum_map,
XLogRecPtr horizonLsn, datapagemap_t *lsn_map);
extern size_t restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_version,
const char *from_fullpath, const char *to_fullpath, int nblocks,
datapagemap_t *map, uint16 *checksum_map, int checksum_version);
datapagemap_t *map, PageState *checksum_map, int checksum_version,
datapagemap_t *lsn_map);
extern size_t restore_non_data_file(parray *parent_chain, pgBackup *dest_backup,
pgFile *dest_file, FILE *out, const char *to_fullpath,
bool already_exists);
@ -950,8 +966,10 @@ extern void restore_non_data_file_internal(FILE *in, FILE *out, pgFile *file,
extern bool create_empty_file(fio_location from_location, const char *to_root,
fio_location to_location, pgFile *file);
extern uint16 *get_checksum_map(const char *fullpath, uint32 checksum_version,
extern PageState *get_checksum_map(const char *fullpath, uint32 checksum_version,
int n_blocks, XLogRecPtr dest_stop_lsn, BlockNumber segmentno);
extern datapagemap_t *get_lsn_map(const char *fullpath, uint32 checksum_version,
int n_blocks, XLogRecPtr horizonLsn, BlockNumber segmentno);
extern pid_t check_postmaster(const char *pgdata);
extern bool check_file_pages(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
@ -989,6 +1007,7 @@ extern uint64 get_remote_system_identifier(PGconn *conn);
extern uint32 get_data_checksum_version(bool safe);
extern pg_crc32c get_pgcontrol_checksum(const char *pgdata_path);
extern uint32 get_xlog_seg_size(char *pgdata_path);
extern void get_redo(const char *pgdata_path, RedoParams *redo);
extern void set_min_recovery_point(pgFile *file, const char *backup_path,
XLogRecPtr stop_backup_lsn);
extern void copy_pgcontrol_file(const char *from_fullpath, fio_location from_location,
@ -1048,9 +1067,13 @@ extern void fio_list_dir(parray *files, const char *root, bool exclude, bool fol
extern bool pgut_rmtree(const char *path, bool rmtopdir, bool strict);
extern uint16 *fio_get_checksum_map(const char *fullpath, uint32 checksum_version,
int n_blocks, XLogRecPtr dest_stop_lsn, BlockNumber segmentno);
extern pid_t fio_check_postmaster(const char *pgdata);
extern PageState *fio_get_checksum_map(const char *fullpath, uint32 checksum_version, int n_blocks,
XLogRecPtr dest_stop_lsn, BlockNumber segmentno, fio_location location);
extern datapagemap_t *fio_get_lsn_map(const char *fullpath, uint32 checksum_version,
int n_blocks, XLogRecPtr horizonLsn, BlockNumber segmentno,
fio_location location);
extern pid_t fio_check_postmaster(const char *pgdata, fio_location location);
extern int32 fio_decompress(void* dst, void const* src, size_t size, int compress_alg);

View File

@ -30,6 +30,8 @@ typedef struct
size_t restored_bytes;
bool use_bitmap;
bool incremental;
bool incremental_lsn;
XLogRecPtr horizonLsn;
/*
* Return value from the thread.
@ -50,7 +52,7 @@ static void restore_chain(pgBackup *dest_backup, parray *parent_chain,
parray *dbOid_exclude_list, pgRestoreParams *params,
const char *pgdata_path, bool no_sync);
static void check_incremental_compatibility(const char *pgdata, uint64 system_identifier,
bool lsn_based);
bool incremental_lsn);
/*
* Iterate over backup list to find all ancestors of the broken parent_backup
@ -115,6 +117,7 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt,
parray *dbOid_exclude_list = NULL;
bool pgdata_is_empty = true;
bool tblspaces_are_empty = true;
XLogRecPtr horizonLsn = InvalidXLogRecPtr;
if (params->is_restore)
{
@ -124,8 +127,7 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt,
/* Check if restore destination empty */
if (!dir_is_empty(instance_config.pgdata, FIO_DB_HOST))
{
// TODO: check that remote systemd id is the same as ours.
// TODO: check that remote system is NOT running, check pg_control and pid.
/* Check that remote system is NOT running and systemd id is the same as ours */
if (params->incremental)
{
elog(INFO, "Running incremental restore into nonempty directory: \"%s\"",
@ -133,14 +135,14 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt,
check_incremental_compatibility(instance_config.pgdata,
instance_config.system_identifier,
false);
params->incremental_lsn);
}
else
elog(ERROR, "Restore destination is not empty: \"%s\"",
instance_config.pgdata);
/* if remote directory is empty then incremental restore may be disabled */
pgdata_is_empty = true;
/* if destination directory is empty, then incremental restore may be disabled */
pgdata_is_empty = false;
}
}
@ -366,13 +368,72 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt,
*/
tmp_backup = dest_backup;
while(tmp_backup->parent_backup_link)
while (tmp_backup)
{
parray_append(parent_chain, tmp_backup);
tmp_backup = tmp_backup->parent_backup_link;
}
parray_append(parent_chain, base_full_backup);
/*
* Determine horizon LSN
* Consider the following example:
*
*
* /----D----------F->
* -A--B---C---*-------X----->
*
* [A,F] - incremental chain
* X - the state of pgdata
* F - destination backup
*
* When running incremental-lsn restore, we get a bitmap of pages,
* whose LSN is less than C.
* So when restoring file, we can skip restore of pages coming from
* A, B and C.
* Pages from D and F cannot be skipped due to incremental restore.
*
*/
if (params->is_restore && params->incremental && params->incremental_lsn)
{
RedoParams redo;
get_redo(instance_config.pgdata, &redo);
tmp_backup = dest_backup;
while (tmp_backup)
{
if (tmp_backup->start_lsn < redo.lsn &&
redo.tli == tmp_backup->tli)
{
horizonLsn = tmp_backup->start_lsn;
break;
}
if (!tmp_backup->parent_backup_link)
break;
tmp_backup = tmp_backup->parent_backup_link;
}
if (XLogRecPtrIsInvalid(horizonLsn))
elog(ERROR, "Cannot perform lsn-based incremental restore of backup chain %s, "
"because destination directory redo point %X/%X on tli %i is less than "
"START LSN %X/%X of oldest backup in chain",
base36enc(dest_backup->start_time),
(uint32) (redo.lsn >> 32), (uint32) redo.lsn, redo.tli,
(uint32) (tmp_backup->start_lsn >> 32), (uint32) tmp_backup->start_lsn);
else
elog(INFO, "Destination directory redo point %X/%X on tli %i is within reach of "
"backup %s with START LSN %X/%X, lsn-based incremental restore is possible",
(uint32) (redo.lsn >> 32), (uint32) redo.lsn, redo.tli,
base36enc(tmp_backup->start_time),
(uint32) (tmp_backup->start_lsn >> 32), (uint32) tmp_backup->start_lsn);
elog(INFO, "Horizon LSN: %X/%X",
(uint32) (horizonLsn >> 32), (uint32) horizonLsn);
params->horizonLsn = horizonLsn;
}
/* for validation or restore with enabled validation */
if (!params->is_restore || !params->no_validate)
@ -601,10 +662,16 @@ restore_chain(pgBackup *dest_backup, parray *parent_chain,
elog(ERROR, "incremental restore is not possible for backups older than 2.3.0 version");
}
/* There is no point in bitmap restore, when restoring a single FULL backup */
// TODO: if there is lsn-based incremental restore, then bitmap is mandatory
if (parray_num(parent_chain) == 1)
use_bitmap = false;
/* There is no point in bitmap restore, when restoring a single FULL backup,
* unless we are running incremental-lsn restore, then bitmap is mandatory.
*/
if (use_bitmap && parray_num(parent_chain) == 1)
{
if (params->incremental && params->incremental_lsn)
use_bitmap = true;
else
use_bitmap = false;
}
/*
* Restore dest_backup internal directories.
@ -661,7 +728,7 @@ restore_chain(pgBackup *dest_backup, parray *parent_chain,
pg_atomic_clear_flag(&file->lock);
}
// Get list of files in destination directory and remove redundant files.
/* Get list of files in destination directory and remove redundant files */
if (params->incremental)
{
pgdata_files = parray_new();
@ -698,7 +765,7 @@ restore_chain(pgBackup *dest_backup, parray *parent_chain,
join_path_components(full_file_path, pgdata_path, file->rel_path);
fio_pgFileDelete(file, full_file_path);
elog(WARNING, "Deleted remote file \"%s\"", full_file_path);
elog(WARNING, "Deleted file \"%s\"", full_file_path);
}
}
@ -706,10 +773,8 @@ restore_chain(pgBackup *dest_backup, parray *parent_chain,
pretty_time_interval(difftime(end_time, start_time),
pretty_time, lengthof(pretty_time));
/* At this point PDATA do not contain files, that do not exists in dest backup file list */
elog(INFO, "Redundant files are removed, time elapsed: %s", pretty_time);
// use_bitmap = true;
/* At this point PDATA do not contain files, that also do not exists in backup filelist */
}
/*
@ -746,6 +811,8 @@ restore_chain(pgBackup *dest_backup, parray *parent_chain,
arg->to_root = pgdata_path;
arg->use_bitmap = use_bitmap;
arg->incremental = params->incremental;
arg->incremental_lsn = params->incremental_lsn;
arg->horizonLsn = params->horizonLsn;
threads_args[i].restored_bytes = 0;
/* By default there are some error */
threads_args[i].ret = 1;
@ -776,7 +843,7 @@ restore_chain(pgBackup *dest_backup, parray *parent_chain,
elog(INFO, "Backup files are restored. Transfered bytes: %s, time elapsed: %s",
pretty_total_bytes, pretty_time);
elog(INFO, "Restore overwriting ratio (less is better): %.f%% (%s/%s)",
elog(INFO, "Restore incremental ratio (less is better): %.f%% (%s/%s)",
((float) total_bytes / dest_bytes) * 100,
pretty_total_bytes, pretty_dest_bytes);
}
@ -871,7 +938,8 @@ restore_files(void *arg)
for (i = 0; i < parray_num(arguments->dest_files); i++)
{
bool already_exists = false;
uint16 *checksum_map = NULL; /* it should take 512kB at most */
PageState *checksum_map = NULL; /* it should take ~1.5MB at most */
datapagemap_t *lsn_map = NULL; /* it should take 16kB at most */
pgFile *dest_file = (pgFile *) parray_get(arguments->dest_files, i);
/* Directories were created before */
@ -955,16 +1023,19 @@ restore_files(void *arg)
dest_file->is_datafile && !dest_file->is_cfs &&
dest_file->n_blocks > 0)
{
/* remote mode */
if (fio_is_remote(FIO_DB_HOST))
if (arguments->incremental_lsn)
{
/* TODO: return lsn_map */
lsn_map = fio_get_lsn_map(to_fullpath, arguments->dest_backup->checksum_version,
dest_file->n_blocks, arguments->horizonLsn,
dest_file->segno * RELSEG_SIZE, FIO_DB_HOST);
}
else
{
checksum_map = fio_get_checksum_map(to_fullpath, arguments->dest_backup->checksum_version,
dest_file->n_blocks, arguments->dest_backup->stop_lsn,
dest_file->segno * RELSEG_SIZE);
/* local mode */
else
checksum_map = get_checksum_map(to_fullpath, arguments->dest_backup->checksum_version,
dest_file->n_blocks, arguments->dest_backup->stop_lsn,
dest_file->segno * RELSEG_SIZE);
dest_file->segno * RELSEG_SIZE, FIO_DB_HOST);
}
}
/* open destination file */
@ -999,7 +1070,8 @@ restore_files(void *arg)
/* Destination file is data file */
arguments->restored_bytes += restore_data_file(arguments->parent_chain,
dest_file, out, to_fullpath,
arguments->use_bitmap, checksum_map);
arguments->use_bitmap, checksum_map,
arguments->horizonLsn, lsn_map);
}
else
{
@ -1021,6 +1093,8 @@ done:
elog(ERROR, "Cannot close file \"%s\": %s", to_fullpath,
strerror(errno));
if (lsn_map)
pg_free(lsn_map->bitmap);
pg_free(checksum_map);
}
@ -1765,9 +1839,11 @@ get_dbOid_exclude_list(pgBackup *backup, parray *datname_list,
return dbOid_exclude_list;
}
/* check that instance has the same SYSTEM_ID, */
/* Check that instance is suitable for incremental restore
* Depending on type of incremental restore requirements are differs.
*/
void
check_incremental_compatibility(const char *pgdata, uint64 system_identifier, bool lsn_based)
check_incremental_compatibility(const char *pgdata, uint64 system_identifier, bool incremental_lsn)
{
uint64 system_id_pgdata;
bool success = true;
@ -1783,7 +1859,6 @@ check_incremental_compatibility(const char *pgdata, uint64 system_identifier, bo
* data files content, because based on pg_control information we will
* choose a backup suitable for lsn based incremental restore.
*/
/* TODO: handle timeline discrepancies */
system_id_pgdata = get_system_identifier(pgdata);
@ -1796,10 +1871,7 @@ check_incremental_compatibility(const char *pgdata, uint64 system_identifier, bo
}
/* check postmaster pid */
if (fio_is_remote(FIO_DB_HOST))
pid = fio_check_postmaster(pgdata);
else
pid = check_postmaster(pgdata);
pid = fio_check_postmaster(pgdata, FIO_DB_HOST);
if (pid == 1) /* postmaster.pid is mangled */
{
@ -1810,22 +1882,27 @@ check_incremental_compatibility(const char *pgdata, uint64 system_identifier, bo
pid_file);
success = false;
}
else if (pid > 1)
else if (pid > 1) /* postmaster is up */
{
elog(WARNING, "Postmaster with pid %u is running in destination directory \"%s\"",
pid, pgdata);
success = false;
}
if (lsn_based)
/*
* TODO: maybe there should be some other signs, pointing to pg_control
* desynchronization with cluster state.
*/
if (incremental_lsn)
{
snprintf(backup_label, MAXPGPATH, "%s/backup_label", pgdata);
if (fio_access(backup_label, F_OK, FIO_DB_HOST) == 0)
{
elog(WARNING, "Destination directory contains \"backup_control\" file. "
"It does not mean that you should delete this file, only that "
"lsn-based incremental restore is dangerous to use in this case. "
"Consider to use checksum-based incremental restore");
"This does NOT mean that you should delete this file and retry, only that "
"lsn-based incremental restore can corrupt data, when applied to instance "
"with pg_control not synchronized with cluster state."
"Consider to use checksum-based incremental restore.");
success = false;
}
}

View File

@ -351,6 +351,37 @@ get_pgcontrol_checksum(const char *pgdata_path)
return ControlFile.crc;
}
void
get_redo(const char *pgdata_path, RedoParams *redo)
{
ControlFileData ControlFile;
char *buffer;
size_t size;
/* First fetch file... */
buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, FIO_DB_HOST);
digestControlFile(&ControlFile, buffer, size);
pg_free(buffer);
redo->lsn = ControlFile.checkPointCopy.redo;
redo->tli = ControlFile.checkPointCopy.ThisTimeLineID;
if (ControlFile.minRecoveryPoint > 0 &&
ControlFile.minRecoveryPoint < redo->lsn)
{
redo->lsn = ControlFile.minRecoveryPoint;
redo->tli = ControlFile.minRecoveryPointTLI;
}
if (ControlFile.backupStartPoint > 0 &&
ControlFile.backupStartPoint < redo->lsn)
{
redo->lsn = ControlFile.backupStartPoint;
redo->tli = ControlFile.checkPointCopy.ThisTimeLineID;
}
}
/*
* Rewrite minRecoveryPoint of pg_control in backup directory. minRecoveryPoint
* 'as-is' is not to be trusted.
@ -551,8 +582,9 @@ datapagemap_print_debug(datapagemap_t *map)
}
/*
* return pid of postmaster process running in given pgdata.
* return 0 if there is none.
* Return pid of postmaster process running in given pgdata.
* Return 0 if there is none.
* Return 1 if postmaster.pid is mangled.
*/
pid_t
check_postmaster(const char *pgdata)

View File

@ -72,6 +72,14 @@ typedef struct
uint32 checksumVersion;
} fio_checksum_map_request;
typedef struct
{
BlockNumber n_blocks;
BlockNumber segmentno;
XLogRecPtr horizonLsn;
uint32 checksumVersion;
} fio_lsn_map_request;
/* Convert FIO pseudo handle to index in file descriptor array */
#define fio_fileno(f) (((size_t)f - 1) | FIO_PIPE_MARKER)
@ -99,6 +107,7 @@ void fio_error(int rc, int size, char const* file, int line)
else
{
char buf[PRINTF_BUF_SIZE+1];
// Assert(false);
int err_size = read(fio_stderr, buf, PRINTF_BUF_SIZE);
if (err_size > 0)
{
@ -200,14 +209,16 @@ static ssize_t fio_read_all(int fd, void* buf, size_t size)
while (offs < size)
{
ssize_t rc = read(fd, (char*)buf + offs, size - offs);
if (rc < 0) {
if (errno == EINTR) {
if (rc < 0)
{
if (errno == EINTR)
continue;
}
elog(WARNING, "fio_read_all error: %s", strerror(errno));
return rc;
} else if (rc == 0) {
break;
}
else if (rc == 0)
break;
offs += rc;
}
return offs;
@ -2260,43 +2271,53 @@ static void fio_list_dir_impl(int out, char* buf)
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
}
uint16 *fio_get_checksum_map(const char *fullpath, uint32 checksum_version,
int n_blocks, XLogRecPtr dest_stop_lsn, BlockNumber segmentno)
PageState *
fio_get_checksum_map(const char *fullpath, uint32 checksum_version, int n_blocks,
XLogRecPtr dest_stop_lsn, BlockNumber segmentno, fio_location location)
{
fio_header hdr;
fio_checksum_map_request req_hdr;
uint16 *checksum_map = NULL;
size_t path_len = strlen(fullpath) + 1;
req_hdr.n_blocks = n_blocks;
req_hdr.segmentno = segmentno;
req_hdr.stop_lsn = dest_stop_lsn;
req_hdr.checksumVersion = checksum_version;
hdr.cop = FIO_GET_CHECKSUM_MAP;
hdr.size = sizeof(req_hdr) + path_len;
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, &req_hdr, sizeof(req_hdr)), sizeof(req_hdr));
IO_CHECK(fio_write_all(fio_stdout, fullpath, path_len), path_len);
/* receive data */
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
if (hdr.size > 0)
if (fio_is_remote(location))
{
checksum_map = pgut_malloc(hdr.size * sizeof(uint16));
IO_CHECK(fio_read_all(fio_stdin, checksum_map, hdr.size * sizeof(uint16)), hdr.size * sizeof(uint16));
}
fio_header hdr;
fio_checksum_map_request req_hdr;
PageState *checksum_map = NULL;
size_t path_len = strlen(fullpath) + 1;
return checksum_map;
req_hdr.n_blocks = n_blocks;
req_hdr.segmentno = segmentno;
req_hdr.stop_lsn = dest_stop_lsn;
req_hdr.checksumVersion = checksum_version;
hdr.cop = FIO_GET_CHECKSUM_MAP;
hdr.size = sizeof(req_hdr) + path_len;
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, &req_hdr, sizeof(req_hdr)), sizeof(req_hdr));
IO_CHECK(fio_write_all(fio_stdout, fullpath, path_len), path_len);
/* receive data */
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
if (hdr.size > 0)
{
checksum_map = pgut_malloc(n_blocks * sizeof(PageState));
memset(checksum_map, 0, n_blocks * sizeof(PageState));
IO_CHECK(fio_read_all(fio_stdin, checksum_map, hdr.size * sizeof(PageState)), hdr.size * sizeof(PageState));
}
return checksum_map;
}
else
{
return get_checksum_map(fullpath, checksum_version,
n_blocks, dest_stop_lsn, segmentno);
}
}
/* TODO: sent n_blocks to truncate file before reading */
static void fio_get_checksum_map_impl(int out, char *buf)
{
fio_header hdr;
uint16 *checksum_map = NULL;
PageState *checksum_map = NULL;
char *fullpath = (char*) buf + sizeof(fio_checksum_map_request);
fio_checksum_map_request *req = (fio_checksum_map_request*) buf;
@ -2307,24 +2328,107 @@ static void fio_get_checksum_map_impl(int out, char *buf)
/* send arrays of checksums to main process */
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
if (hdr.size > 0)
IO_CHECK(fio_write_all(out, checksum_map, hdr.size * sizeof(uint16)), hdr.size * sizeof(uint16));
IO_CHECK(fio_write_all(out, checksum_map, hdr.size * sizeof(PageState)), hdr.size * sizeof(PageState));
pg_free(checksum_map);
}
pid_t fio_check_postmaster(const char *pgdata)
datapagemap_t *
fio_get_lsn_map(const char *fullpath, uint32 checksum_version,
int n_blocks, XLogRecPtr horizonLsn, BlockNumber segmentno,
fio_location location)
{
fio_header hdr;
datapagemap_t* lsn_map = NULL;
hdr.cop = FIO_CHECK_POSTMASTER;
hdr.size = strlen(pgdata) + 1;
if (fio_is_remote(location))
{
fio_header hdr;
fio_lsn_map_request req_hdr;
size_t path_len = strlen(fullpath) + 1;
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, pgdata, hdr.size), hdr.size);
req_hdr.n_blocks = n_blocks;
req_hdr.segmentno = segmentno;
req_hdr.horizonLsn = horizonLsn;
req_hdr.checksumVersion = checksum_version;
/* receive result */
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
return hdr.arg;
hdr.cop = FIO_GET_LSN_MAP;
hdr.size = sizeof(req_hdr) + path_len;
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, &req_hdr, sizeof(req_hdr)), sizeof(req_hdr));
IO_CHECK(fio_write_all(fio_stdout, fullpath, path_len), path_len);
/* receive data */
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
if (hdr.size > 0)
{
lsn_map = pgut_malloc(sizeof(datapagemap_t));
memset(lsn_map, 0, sizeof(datapagemap_t));
lsn_map->bitmap = pgut_malloc(hdr.size);
lsn_map->bitmapsize = hdr.size;
IO_CHECK(fio_read_all(fio_stdin, lsn_map->bitmap, hdr.size), hdr.size);
}
}
else
{
lsn_map = get_lsn_map(fullpath, checksum_version, n_blocks,
horizonLsn, segmentno);
}
return lsn_map;
}
static void fio_get_lsn_map_impl(int out, char *buf)
{
fio_header hdr;
datapagemap_t *lsn_map = NULL;
char *fullpath = (char*) buf + sizeof(fio_lsn_map_request);
fio_lsn_map_request *req = (fio_lsn_map_request*) buf;
lsn_map = get_lsn_map(fullpath, req->checksumVersion, req->n_blocks,
req->horizonLsn, req->segmentno);
if (lsn_map)
hdr.size = lsn_map->bitmapsize;
else
hdr.size = 0;
/* send arrays of checksums to main process */
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
if (hdr.size > 0)
IO_CHECK(fio_write_all(out, lsn_map->bitmap, hdr.size), hdr.size);
if (lsn_map)
{
pg_free(lsn_map->bitmap);
pg_free(lsn_map);
}
}
/*
* Go to the remote host and get postmaster pid from file postmaster.pid
* and check that process is running, if process is running, return its pid number.
*/
pid_t fio_check_postmaster(const char *pgdata, fio_location location)
{
if (fio_is_remote(location))
{
fio_header hdr;
hdr.cop = FIO_CHECK_POSTMASTER;
hdr.size = strlen(pgdata) + 1;
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, pgdata, hdr.size), hdr.size);
/* receive result */
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
return hdr.arg;
}
else
return check_postmaster(pgdata);
}
static void fio_check_postmaster_impl(int out, char *buf)
@ -2515,6 +2619,10 @@ void fio_communicate(int in, int out)
/* calculate crc32 for a file */
fio_get_checksum_map_impl(out, buf);
break;
case FIO_GET_LSN_MAP:
/* calculate crc32 for a file */
fio_get_lsn_map_impl(out, buf);
break;
case FIO_CHECK_POSTMASTER:
/* calculate crc32 for a file */
fio_check_postmaster_impl(out, buf);

View File

@ -35,7 +35,9 @@ typedef enum
FIO_PAGE,
FIO_WRITE_COMPRESSED,
FIO_GET_CRC32,
/* used for incremental restore */
FIO_GET_CHECKSUM_MAP,
FIO_GET_LSN_MAP,
/* used in fio_send_pages */
FIO_SEND_PAGES,
FIO_ERROR,