1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-02-07 14:18:17 +02:00

[Issue #169] Restore optimization

This commit is contained in:
Grigory Smolkin 2020-01-22 02:58:40 +03:00
parent c6e7490caf
commit 7f983c599a
3 changed files with 653 additions and 13 deletions

View File

@ -757,7 +757,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
}
/*
* Open backup file for write. We use "r+" at first to overwrite only
* Open backup file for write. We use "r+" at first to overwrite only
* modified pages for differential restore. If the file does not exist,
* re-open it with "w" to create an empty file.
*/
@ -961,6 +961,193 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
fclose(in);
}
/*
* Restore files in the from_root directory to the to_root directory with
* same relative path.
*
* If write_header is true then we add header to each restored block, currently
* it is used for MERGE command.
*
* to_fullpath and from_fullpath are provided strictly for ERROR reporting
*/
void
restore_data_file_new(FILE *in, FILE *out, pgFile *file, uint32 backup_version,
const char *from_fullpath, const char *to_fullpath, int nblocks)
{
BackupPageHeader header;
BlockNumber blknum = 0;
size_t write_len = 0;
while (true)
{
off_t write_pos;
size_t read_len;
DataPage compressed_page; /* used as read buffer */
DataPage page;
int32 uncompressed_size = 0;
/* read BackupPageHeader */
read_len = fread(&header, 1, sizeof(header), in);
if (read_len != sizeof(header))
{
int errno_tmp = errno;
if (read_len == 0 && feof(in))
break; /* EOF found */
else if (read_len != 0 && feof(in))
elog(ERROR, "Odd size page found at block %u of \"%s\"",
blknum, from_fullpath);
else
elog(ERROR, "Cannot read header of block %u of \"%s\": %s",
blknum, from_fullpath, strerror(errno_tmp));
}
/* Consider empty block */
if (header.block == 0 && header.compressed_size == 0)
{
elog(VERBOSE, "Skip empty block of \"%s\"", from_fullpath);
continue;
}
/* sanity? */
if (header.block < blknum)
elog(ERROR, "Backup is broken at block %u of \"%s\"",
blknum, from_fullpath);
blknum = header.block;
/* no point in writing redundant data */
if (nblocks > 0 && blknum >= nblocks)
return;
if (header.compressed_size > BLCKSZ)
elog(ERROR, "Size of a blknum %i exceed BLCKSZ", blknum);
/* read a page from file */
read_len = fread(compressed_page.data, 1,
MAXALIGN(header.compressed_size), in);
if (read_len != MAXALIGN(header.compressed_size))
elog(ERROR, "Cannot read block %u of \"%s\", read %zu of %d",
blknum, from_fullpath, read_len, header.compressed_size);
/*
* if page size is smaller than BLCKSZ, decompress the page.
* BUGFIX for versions < 2.0.23: if page size is equal to BLCKSZ.
* we have to check, whether it is compressed or not using
* page_may_be_compressed() function.
*/
if (header.compressed_size != BLCKSZ
|| page_may_be_compressed(compressed_page.data, file->compress_alg,
backup_version))
{
const char *errormsg = NULL;
uncompressed_size = do_decompress(page.data, BLCKSZ,
compressed_page.data,
header.compressed_size,
file->compress_alg, &errormsg);
if (uncompressed_size < 0 && errormsg != NULL)
elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s",
blknum, from_fullpath, errormsg);
if (uncompressed_size != BLCKSZ)
elog(ERROR, "Page of file \"%s\" uncompressed to %d bytes. != BLCKSZ",
from_fullpath, uncompressed_size);
}
write_pos = blknum * BLCKSZ;
/*
* Seek and write the restored page.
* TODO: invent fio_pwrite().
*/
if (fio_fseek(out, write_pos) < 0)
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
/* if we uncompressed the page - write page.data,
* if page wasn't compressed -
* write what we've read - compressed_page.data
*/
if (uncompressed_size == BLCKSZ)
{
if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ)
elog(ERROR, "Cannot write block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
}
else
{
if (fio_fwrite(out, compressed_page.data, BLCKSZ) != BLCKSZ)
elog(ERROR, "Cannot write block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
}
write_len += BLCKSZ;
}
elog(VERBOSE, "Copied file \"%s\": %lu bytes", from_fullpath, write_len);
}
/*
* Copy file to backup.
* We do not apply compression to these files, because
* it is either small control file or already compressed cfs file.
*/
void
restore_non_data_file(FILE *in, FILE *out, pgFile *file,
const char *from_fullpath, const char *to_fullpath)
{
size_t read_len = 0;
int errno_tmp;
char buf[BLCKSZ];
/* copy content */
for (;;)
{
read_len = 0;
if ((read_len = fio_fread(in, buf, sizeof(buf))) != sizeof(buf))
break;
if (fio_fwrite(out, buf, read_len) != read_len)
{
errno_tmp = errno;
/* oops */
fio_fclose(in);
fio_fclose(out);
elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath,
strerror(errno_tmp));
}
}
errno_tmp = errno;
if (read_len < 0)
{
fio_fclose(in);
fio_fclose(out);
elog(ERROR, "Cannot read backup mode file \"%s\": %s",
from_fullpath, strerror(errno_tmp));
}
/* copy odd part. */
if (read_len > 0)
{
if (fio_fwrite(out, buf, read_len) != read_len)
{
errno_tmp = errno;
/* oops */
fio_fclose(in);
fio_fclose(out);
elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath,
strerror(errno_tmp));
}
}
elog(VERBOSE, "Copied file \"%s\": %lu bytes", from_fullpath, file->write_size);
}
/*
* Copy file to backup.
* We do not apply compression to these files, because

View File

@ -372,6 +372,8 @@ struct pgBackup
* in the format suitable for recovery.conf */
char *external_dir_str; /* List of external directories,
* separated by ':' */
parray *files; /* list of files belonging to this backup
* must be populated by calling backup_populate() */
};
/* Recovery target for restore and validate subcommands */
@ -835,6 +837,10 @@ extern void restore_data_file(const char *to_path,
pgFile *file, bool allow_truncate,
bool write_header,
uint32 backup_version);
extern void restore_data_file_new(FILE *in, FILE *out, pgFile *file, uint32 backup_version,
const char *from_fullpath, const char *to_fullpath, int nblocks);
extern void restore_non_data_file(FILE *in, FILE *out, pgFile *file,
const char *from_fullpath, const char *to_fullpath);
extern bool copy_file(fio_location from_location, const char *to_root,
fio_location to_location, pgFile *file, bool missing_ok);
extern bool create_empty_file(fio_location from_location, const char *to_root,

View File

@ -35,6 +35,24 @@ typedef struct
int ret;
} restore_files_arg;
typedef struct
{
parray *dest_files;
pgBackup *dest_backup;
char *external_prefix;
parray *dest_external_dirs;
parray *parent_chain;
parray *dbOid_exclude_list;
bool skip_external_dirs;
const char *to_root;
/*
* Return value from the thread.
* 0 means there is no error, 1 - there is an error.
*/
int ret;
} restore_files_arg_new;
static void restore_backup(pgBackup *backup, parray *dest_external_dirs,
parray *dest_files, parray *dbOid_exclude_list,
pgRestoreParams *params);
@ -46,6 +64,12 @@ static void *restore_files(void *arg);
static void set_orphan_status(parray *backups, pgBackup *parent_backup);
static void pg12_recovery_config(pgBackup *backup, bool add_include);
static void restore_chain(pgBackup *dest_backup, parray *dest_files,
parray *parent_chain, parray *dbOid_exclude_list,
pgRestoreParams *params, const char *pgdata_path);
static void *restore_files_new(void *arg);
/*
* Iterate over backup list to find all ancestors of the broken parent_backup
@ -499,29 +523,74 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt,
DIR_PERMISSION, FIO_DB_HOST);
}
if (rt->lsn_string &&
parse_server_version(dest_backup->server_version) < 100000)
elog(ERROR, "Backup %s was created for version %s which doesn't support recovery_target_lsn",
base36enc(dest_backup->start_time),
dest_backup->server_version);
/*
* Restore backups files starting from the parent backup.
*/
// for (i = parray_num(parent_chain) - 1; i >= 0; i--)
// {
// pgBackup *backup = (pgBackup *) parray_get(parent_chain, i);
//
// /*
// * Backup was locked during validation if no-validate wasn't
// * specified.
// */
// if (params->no_validate && !lock_backup(backup))
// elog(ERROR, "Cannot lock backup directory");
//
// restore_backup(backup, dest_external_dirs, dest_files, dbOid_exclude_list, params);
// }
// lock entire chain
// sanity:
// 1. check status of every backup in chain
for (i = parray_num(parent_chain) - 1; i >= 0; i--)
{
pgBackup *backup = (pgBackup *) parray_get(parent_chain, i);
if (rt->lsn_string &&
parse_server_version(backup->server_version) < 100000)
elog(ERROR, "Backup %s was created for version %s which doesn't support recovery_target_lsn",
base36enc(dest_backup->start_time),
dest_backup->server_version);
if (backup->status != BACKUP_STATUS_OK &&
backup->status != BACKUP_STATUS_DONE)
{
if (params->force)
elog(WARNING, "Backup %s is not valid, restore is forced",
base36enc(backup->start_time));
else
elog(ERROR, "Backup %s cannot be restored because it is not valid",
base36enc(backup->start_time));
}
/*
* Backup was locked during validation if no-validate wasn't
* specified.
*/
if (params->no_validate && !lock_backup(backup))
elog(ERROR, "Cannot lock backup directory");
/* confirm block size compatibility */
if (backup->block_size != BLCKSZ)
elog(ERROR,
"BLCKSZ(%d) is not compatible(%d expected)",
backup->block_size, BLCKSZ);
restore_backup(backup, dest_external_dirs, dest_files, dbOid_exclude_list, params);
if (backup->wal_block_size != XLOG_BLCKSZ)
elog(ERROR,
"XLOG_BLCKSZ(%d) is not compatible(%d expected)",
backup->wal_block_size, XLOG_BLCKSZ);
/* populate backup filelist */
if (backup->start_time != dest_backup->start_time)
{
pgBackupGetPath(backup, control_file, lengthof(control_file), DATABASE_FILE_LIST);
backup->files = dir_read_file_list(NULL, NULL, control_file, FIO_BACKUP_HOST);
}
else
backup->files = dest_files;
parray_qsort(backup->files, pgFileCompareRelPathWithExternal);
}
restore_chain(dest_backup, dest_files, parent_chain, dbOid_exclude_list,
params, instance_config.pgdata);
if (dest_external_dirs != NULL)
free_dir_list(dest_external_dirs);
@ -542,6 +611,384 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt,
return 0;
}
/*
* Restore backup chain.
*/
void
restore_chain(pgBackup *dest_backup, parray *dest_files,
parray *parent_chain, parray *dbOid_exclude_list,
pgRestoreParams *params, const char *pgdata_path)
{
char timestamp[100];
char external_prefix[MAXPGPATH];
parray *external_dirs = NULL;
int i;
/* arrays with meta info for multi threaded backup */
pthread_t *threads;
restore_files_arg_new *threads_args;
bool restore_isok = true;
time2iso(timestamp, lengthof(timestamp), dest_backup->start_time);
elog(LOG, "Restoring database from backup %s", timestamp);
if (dest_backup->external_dir_str)
external_dirs = make_external_directory_list(dest_backup->external_dir_str,
true);
/* Restore directories first */
parray_qsort(dest_files, pgFileCompareRelPathWithExternal);
/*
* Setup file locks
*/
for (i = 0; i < parray_num(dest_files); i++)
{
pgFile *file = (pgFile *) parray_get(dest_files, i);
/* setup threads */
pg_atomic_clear_flag(&file->lock);
}
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
threads_args = (restore_files_arg_new *) palloc(sizeof(restore_files_arg_new) *
num_threads);
/* Restore files into target directory */
thread_interrupted = false;
for (i = 0; i < num_threads; i++)
{
restore_files_arg_new *arg = &(threads_args[i]);
arg->dest_files = dest_files;
arg->dest_backup = dest_backup;
arg->external_prefix = external_prefix;
arg->dest_external_dirs = external_dirs;
arg->parent_chain = parent_chain;
arg->dbOid_exclude_list = dbOid_exclude_list;
arg->skip_external_dirs = params->skip_external_dirs;
arg->to_root = pgdata_path;
/* By default there are some error */
threads_args[i].ret = 1;
/* Useless message TODO: rewrite */
elog(LOG, "Start thread %i", i + 1);
pthread_create(&threads[i], NULL, restore_files_new, arg);
}
/* Wait theads */
for (i = 0; i < num_threads; i++)
{
pthread_join(threads[i], NULL);
if (threads_args[i].ret == 1)
restore_isok = false;
}
if (!restore_isok)
elog(ERROR, "Data files restoring failed");
pfree(threads);
pfree(threads_args);
if (external_dirs != NULL)
free_dir_list(external_dirs);
// elog(LOG, "Restore of backup %s is completed", base36enc(backup->start_time));
}
/*
* Restore files into $PGDATA.
*/
static void *
restore_files_new(void *arg)
{
int i, j;
char to_fullpath[MAXPGPATH];
FILE *out = NULL;
restore_files_arg_new *arguments = (restore_files_arg_new *) arg;
// for (i = parray_num(arguments->parent_chain) - 1; i >= 0; i--)
// {
// pgBackup *backup = (pgBackup *) parray_get(arguments->parent_chain, i);
//
// for (j = 0; j < parray_num(backup->files); j++)
// {
// pgFile *file = (pgFile *) parray_get(backup->files, j);
//
// elog(INFO, "Backup %s;File: %s, Size: %li",
// base36enc(backup->start_time), file->name, file->write_size);
// }
// }
//
// elog(ERROR, "HELLO");
for (i = 0; i < parray_num(arguments->dest_files); i++)
{
pgFile *file = (pgFile *) parray_get(arguments->dest_files, i);
/* Directories were created before */
if (S_ISDIR(file->mode))
continue;
if (!pg_atomic_test_set_flag(&file->lock))
continue;
/* check for interrupt */
if (interrupted || thread_interrupted)
elog(ERROR, "Interrupted during restore");
if (progress)
elog(INFO, "Progress: (%d/%lu). Process file %s ",
i + 1, (unsigned long) parray_num(arguments->dest_files),
file->rel_path);
/* Only files from pgdata can be skipped by partial restore */
if (arguments->dbOid_exclude_list && file->external_dir_num == 0)
{
/* Check if the file belongs to the database we exclude */
if (parray_bsearch(arguments->dbOid_exclude_list,
&file->dbOid, pgCompareOid))
{
/*
* We cannot simply skip the file, because it may lead to
* failure during WAL redo; hence, create empty file.
*/
create_empty_file(FIO_BACKUP_HOST,
arguments->to_root, FIO_DB_HOST, file);
elog(VERBOSE, "Exclude file due to partial restore: \"%s\"",
file->rel_path);
continue;
}
}
/* Do not restore tablespace_map file */
if (path_is_prefix_of_path(PG_TABLESPACE_MAP_FILE, file->rel_path))
{
elog(VERBOSE, "Skip tablespace_map");
continue;
}
/* Do not restore database_map file */
if ((file->external_dir_num == 0) &&
strcmp(DATABASE_MAP, file->rel_path) == 0)
{
elog(VERBOSE, "Skip database_map");
continue;
}
/* Do no restore external directory file if a user doesn't want */
if (arguments->skip_external_dirs && file->external_dir_num > 0)
continue;
//set max_blknum based on file->n_blocks
/* set fullpath of destination file */
if (file->external_dir_num == 0)
join_path_components(to_fullpath, arguments->to_root, file->rel_path);
//else
// TODO
/* open destination file */
out = fio_fopen(to_fullpath, PG_BINARY_W, FIO_DB_HOST);
if (out == NULL)
{
int errno_tmp = errno;
elog(ERROR, "Cannot open restore target file \"%s\": %s",
to_fullpath, strerror(errno_tmp));
}
if (!file->is_datafile || file->is_cfs)
elog(VERBOSE, "Restoring non-data file: \"%s\"", to_fullpath);
else
elog(VERBOSE, "Restoring data file: \"%s\"", to_fullpath);
// if dest file is 0 sized, then just close it and go for the next
if (file->write_size == 0)
goto done;
// TODO
// optimize copying of non-data files:
// lookup latest backup with file that has not BYTES_INVALID size
// and copy only it.
if (!file->is_datafile || file->is_cfs)
{
char from_root[MAXPGPATH];
char from_fullpath[MAXPGPATH];
FILE *in = NULL;
pgFile *tmp_file = NULL;
if (file->write_size > 0)
{
tmp_file = file;
pgBackupGetPath(arguments->dest_backup, from_root, lengthof(from_root), DATABASE_DIR);
join_path_components(from_fullpath, from_root, file->rel_path);
}
else
{
for (j = 0; j < parray_num(arguments->parent_chain); j++)
{
pgFile **res_file = NULL;
pgBackup *backup = (pgBackup *) parray_get(arguments->parent_chain, j);
/* lookup file in intermediate backup */
res_file = parray_bsearch(backup->files, file, pgFileCompareRelPathWithExternal);
tmp_file = (res_file) ? *res_file : NULL;
if (!tmp_file)
continue;
if (tmp_file->write_size == 0)
goto done;
if (tmp_file->write_size > 0)
{
pgBackupGetPath(backup, from_root, lengthof(from_root), DATABASE_DIR);
join_path_components(from_fullpath, from_root, file->rel_path);
break;
}
}
}
if (!tmp_file)
elog(ERROR, "Something went wrong");
in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL)
{
elog(ERROR, "Cannot open backup file \"%s\": %s", from_fullpath,
strerror(errno));
}
if (strcmp(file->name, "pg_control") == 0)
copy_pgcontrol_file(from_root, FIO_BACKUP_HOST,
instance_config.pgdata, FIO_DB_HOST,
file);
else
restore_non_data_file(in, out, tmp_file, from_fullpath, to_fullpath);
if (fio_fclose(in) != 0)
elog(ERROR, "Cannot close file \"%s\": %s", from_fullpath,
strerror(errno));
goto done;
}
for (j = parray_num(arguments->parent_chain) - 1; j >= 0; j--)
{
char from_root[MAXPGPATH];
char from_fullpath[MAXPGPATH];
FILE *in = NULL;
pgFile **res_file = NULL;
pgFile *tmp_file = NULL;
pgBackup *backup = (pgBackup *) parray_get(arguments->parent_chain, j);
/* lookup file in intermediate backup */
res_file = parray_bsearch(backup->files, file, pgFileCompareRelPathWithExternal);
tmp_file = (res_file) ? *res_file : NULL;
/* destination file is not exists in this intermediate backup */
if (tmp_file == NULL)
continue;
/* check for interrupt */
if (interrupted || thread_interrupted)
elog(ERROR, "Interrupted during restore");
/*
* For incremental backups skip files which haven't changed
* since previous backup and thus were not backed up.
*/
if (tmp_file->write_size == BYTES_INVALID)
{
elog(VERBOSE, "The file didn`t change. Skip restore: \"%s\"", from_fullpath);
continue;
}
/*
* At this point we are sure, that something is going to be copied
* Open source file.
*/
/* TODO: special handling for files from external directories */
pgBackupGetPath(backup, from_root, lengthof(from_root), DATABASE_DIR);
join_path_components(from_fullpath, from_root, file->rel_path);
in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL)
{
elog(ERROR, "Cannot open backup file \"%s\": %s", from_fullpath,
strerror(errno));
}
/*
* restore the file.
* We treat datafiles separately, cause they were backed up block by
* block and have BackupPageHeader meta information, so we cannot just
* copy the file from backup.
*/
// if (file->is_datafile && file->is_cfs)
restore_data_file_new(in, out, tmp_file,
parse_program_version(backup->program_version),
from_fullpath, to_fullpath, file->n_blocks);
// else if (strcmp(file->name, "pg_control") == 0)
// copy_pgcontrol_file(from_root, FIO_BACKUP_HOST,
// instance_config.pgdata, FIO_DB_HOST,
// file);
// else
// restore_non_data_file(in, out, tmp_file, from_fullpath, to_fullpath);
if (fio_fclose(in) != 0)
elog(ERROR, "Cannot close file \"%s\": %s", from_fullpath,
strerror(errno));
}
done:
// chmod
// fsync
// close
/* truncate file up to n_blocks. NOTE: no need, we just should not write
* blocks that are exceeding n_blocks.
* But for this to work, n_blocks should be trusted.
*/
/* update file permission */
if (fio_chmod(to_fullpath, file->mode, FIO_DB_HOST) == -1)
{
int errno_tmp = errno;
fio_fclose(out);
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
strerror(errno_tmp));
}
/* flush file */
if (fio_fflush(out) != 0)
elog(ERROR, "Cannot flush file \"%s\": %s", to_fullpath,
strerror(errno));
/* fsync file */
/* close file */
if (fio_fclose(out) != 0)
elog(ERROR, "Cannot close file \"%s\": %s", to_fullpath,
strerror(errno));
/* print size of restored file */
// if (file->write_size != BYTES_INVALID)
// elog(VERBOSE, "Restored file %s : " INT64_FORMAT " bytes",
// file->path, file->write_size);
}
/* Data files restoring is successful */
arguments->ret = 0;
return NULL;
}
/*
* Restore one backup.
*/