diff --git a/src/data.c b/src/data.c index a68197cd..1d0fada2 100644 --- a/src/data.c +++ b/src/data.c @@ -757,7 +757,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, } /* - * Open backup file for write. We use "r+" at first to overwrite only + * Open backup file for write. We use "r+" at first to overwrite only * modified pages for differential restore. If the file does not exist, * re-open it with "w" to create an empty file. */ @@ -961,6 +961,193 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, fclose(in); } +/* + * Restore files in the from_root directory to the to_root directory with + * same relative path. + * + * If write_header is true then we add header to each restored block, currently + * it is used for MERGE command. + * + * to_fullpath and from_fullpath are provided strictly for ERROR reporting + */ +void +restore_data_file_new(FILE *in, FILE *out, pgFile *file, uint32 backup_version, + const char *from_fullpath, const char *to_fullpath, int nblocks) +{ + BackupPageHeader header; + BlockNumber blknum = 0; + size_t write_len = 0; + + while (true) + { + off_t write_pos; + size_t read_len; + DataPage compressed_page; /* used as read buffer */ + DataPage page; + int32 uncompressed_size = 0; + + /* read BackupPageHeader */ + read_len = fread(&header, 1, sizeof(header), in); + + if (read_len != sizeof(header)) + { + int errno_tmp = errno; + if (read_len == 0 && feof(in)) + break; /* EOF found */ + else if (read_len != 0 && feof(in)) + elog(ERROR, "Odd size page found at block %u of \"%s\"", + blknum, from_fullpath); + else + elog(ERROR, "Cannot read header of block %u of \"%s\": %s", + blknum, from_fullpath, strerror(errno_tmp)); + } + + /* Consider empty block */ + if (header.block == 0 && header.compressed_size == 0) + { + elog(VERBOSE, "Skip empty block of \"%s\"", from_fullpath); + continue; + } + + /* sanity? */ + if (header.block < blknum) + elog(ERROR, "Backup is broken at block %u of \"%s\"", + blknum, from_fullpath); + + blknum = header.block; + + /* no point in writing redundant data */ + if (nblocks > 0 && blknum >= nblocks) + return; + + if (header.compressed_size > BLCKSZ) + elog(ERROR, "Size of a blknum %i exceed BLCKSZ", blknum); + + /* read a page from file */ + read_len = fread(compressed_page.data, 1, + MAXALIGN(header.compressed_size), in); + + if (read_len != MAXALIGN(header.compressed_size)) + elog(ERROR, "Cannot read block %u of \"%s\", read %zu of %d", + blknum, from_fullpath, read_len, header.compressed_size); + + /* + * if page size is smaller than BLCKSZ, decompress the page. + * BUGFIX for versions < 2.0.23: if page size is equal to BLCKSZ. + * we have to check, whether it is compressed or not using + * page_may_be_compressed() function. + */ + if (header.compressed_size != BLCKSZ + || page_may_be_compressed(compressed_page.data, file->compress_alg, + backup_version)) + { + const char *errormsg = NULL; + + uncompressed_size = do_decompress(page.data, BLCKSZ, + compressed_page.data, + header.compressed_size, + file->compress_alg, &errormsg); + + if (uncompressed_size < 0 && errormsg != NULL) + elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s", + blknum, from_fullpath, errormsg); + + if (uncompressed_size != BLCKSZ) + elog(ERROR, "Page of file \"%s\" uncompressed to %d bytes. != BLCKSZ", + from_fullpath, uncompressed_size); + } + + write_pos = blknum * BLCKSZ; + + /* + * Seek and write the restored page. + * TODO: invent fio_pwrite(). + */ + if (fio_fseek(out, write_pos) < 0) + elog(ERROR, "Cannot seek block %u of \"%s\": %s", + blknum, to_fullpath, strerror(errno)); + + /* if we uncompressed the page - write page.data, + * if page wasn't compressed - + * write what we've read - compressed_page.data + */ + if (uncompressed_size == BLCKSZ) + { + if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ) + elog(ERROR, "Cannot write block %u of \"%s\": %s", + blknum, to_fullpath, strerror(errno)); + } + else + { + if (fio_fwrite(out, compressed_page.data, BLCKSZ) != BLCKSZ) + elog(ERROR, "Cannot write block %u of \"%s\": %s", + blknum, to_fullpath, strerror(errno)); + } + + write_len += BLCKSZ; + } + + elog(VERBOSE, "Copied file \"%s\": %lu bytes", from_fullpath, write_len); +} + +/* + * Copy file to backup. + * We do not apply compression to these files, because + * it is either small control file or already compressed cfs file. + */ +void +restore_non_data_file(FILE *in, FILE *out, pgFile *file, + const char *from_fullpath, const char *to_fullpath) +{ + size_t read_len = 0; + int errno_tmp; + char buf[BLCKSZ]; + + /* copy content */ + for (;;) + { + read_len = 0; + + if ((read_len = fio_fread(in, buf, sizeof(buf))) != sizeof(buf)) + break; + + if (fio_fwrite(out, buf, read_len) != read_len) + { + errno_tmp = errno; + /* oops */ + fio_fclose(in); + fio_fclose(out); + elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath, + strerror(errno_tmp)); + } + } + + errno_tmp = errno; + if (read_len < 0) + { + fio_fclose(in); + fio_fclose(out); + elog(ERROR, "Cannot read backup mode file \"%s\": %s", + from_fullpath, strerror(errno_tmp)); + } + + /* copy odd part. */ + if (read_len > 0) + { + if (fio_fwrite(out, buf, read_len) != read_len) + { + errno_tmp = errno; + /* oops */ + fio_fclose(in); + fio_fclose(out); + elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath, + strerror(errno_tmp)); + } + } + + elog(VERBOSE, "Copied file \"%s\": %lu bytes", from_fullpath, file->write_size); +} + /* * Copy file to backup. * We do not apply compression to these files, because diff --git a/src/pg_probackup.h b/src/pg_probackup.h index af782f1d..d5d5f2b4 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -372,6 +372,8 @@ struct pgBackup * in the format suitable for recovery.conf */ char *external_dir_str; /* List of external directories, * separated by ':' */ + parray *files; /* list of files belonging to this backup + * must be populated by calling backup_populate() */ }; /* Recovery target for restore and validate subcommands */ @@ -835,6 +837,10 @@ extern void restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, bool write_header, uint32 backup_version); +extern void restore_data_file_new(FILE *in, FILE *out, pgFile *file, uint32 backup_version, + const char *from_fullpath, const char *to_fullpath, int nblocks); +extern void restore_non_data_file(FILE *in, FILE *out, pgFile *file, + const char *from_fullpath, const char *to_fullpath); extern bool copy_file(fio_location from_location, const char *to_root, fio_location to_location, pgFile *file, bool missing_ok); extern bool create_empty_file(fio_location from_location, const char *to_root, diff --git a/src/restore.c b/src/restore.c index 3d4e544c..13b6184d 100644 --- a/src/restore.c +++ b/src/restore.c @@ -35,6 +35,24 @@ typedef struct int ret; } restore_files_arg; +typedef struct +{ + parray *dest_files; + pgBackup *dest_backup; + char *external_prefix; + parray *dest_external_dirs; + parray *parent_chain; + parray *dbOid_exclude_list; + bool skip_external_dirs; + const char *to_root; + + /* + * Return value from the thread. + * 0 means there is no error, 1 - there is an error. + */ + int ret; +} restore_files_arg_new; + static void restore_backup(pgBackup *backup, parray *dest_external_dirs, parray *dest_files, parray *dbOid_exclude_list, pgRestoreParams *params); @@ -46,6 +64,12 @@ static void *restore_files(void *arg); static void set_orphan_status(parray *backups, pgBackup *parent_backup); static void pg12_recovery_config(pgBackup *backup, bool add_include); +static void restore_chain(pgBackup *dest_backup, parray *dest_files, + parray *parent_chain, parray *dbOid_exclude_list, + pgRestoreParams *params, const char *pgdata_path); + +static void *restore_files_new(void *arg); + /* * Iterate over backup list to find all ancestors of the broken parent_backup @@ -499,29 +523,74 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt, DIR_PERMISSION, FIO_DB_HOST); } + if (rt->lsn_string && + parse_server_version(dest_backup->server_version) < 100000) + elog(ERROR, "Backup %s was created for version %s which doesn't support recovery_target_lsn", + base36enc(dest_backup->start_time), + dest_backup->server_version); + /* * Restore backups files starting from the parent backup. */ +// for (i = parray_num(parent_chain) - 1; i >= 0; i--) +// { +// pgBackup *backup = (pgBackup *) parray_get(parent_chain, i); +// +// /* +// * Backup was locked during validation if no-validate wasn't +// * specified. +// */ +// if (params->no_validate && !lock_backup(backup)) +// elog(ERROR, "Cannot lock backup directory"); +// +// restore_backup(backup, dest_external_dirs, dest_files, dbOid_exclude_list, params); +// } + + // lock entire chain + + // sanity: + // 1. check status of every backup in chain for (i = parray_num(parent_chain) - 1; i >= 0; i--) { pgBackup *backup = (pgBackup *) parray_get(parent_chain, i); - if (rt->lsn_string && - parse_server_version(backup->server_version) < 100000) - elog(ERROR, "Backup %s was created for version %s which doesn't support recovery_target_lsn", - base36enc(dest_backup->start_time), - dest_backup->server_version); + if (backup->status != BACKUP_STATUS_OK && + backup->status != BACKUP_STATUS_DONE) + { + if (params->force) + elog(WARNING, "Backup %s is not valid, restore is forced", + base36enc(backup->start_time)); + else + elog(ERROR, "Backup %s cannot be restored because it is not valid", + base36enc(backup->start_time)); + } - /* - * Backup was locked during validation if no-validate wasn't - * specified. - */ - if (params->no_validate && !lock_backup(backup)) - elog(ERROR, "Cannot lock backup directory"); + /* confirm block size compatibility */ + if (backup->block_size != BLCKSZ) + elog(ERROR, + "BLCKSZ(%d) is not compatible(%d expected)", + backup->block_size, BLCKSZ); - restore_backup(backup, dest_external_dirs, dest_files, dbOid_exclude_list, params); + if (backup->wal_block_size != XLOG_BLCKSZ) + elog(ERROR, + "XLOG_BLCKSZ(%d) is not compatible(%d expected)", + backup->wal_block_size, XLOG_BLCKSZ); + + /* populate backup filelist */ + if (backup->start_time != dest_backup->start_time) + { + pgBackupGetPath(backup, control_file, lengthof(control_file), DATABASE_FILE_LIST); + backup->files = dir_read_file_list(NULL, NULL, control_file, FIO_BACKUP_HOST); + } + else + backup->files = dest_files; + + parray_qsort(backup->files, pgFileCompareRelPathWithExternal); } + restore_chain(dest_backup, dest_files, parent_chain, dbOid_exclude_list, + params, instance_config.pgdata); + if (dest_external_dirs != NULL) free_dir_list(dest_external_dirs); @@ -542,6 +611,384 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt, return 0; } +/* + * Restore backup chain. + */ +void +restore_chain(pgBackup *dest_backup, parray *dest_files, + parray *parent_chain, parray *dbOid_exclude_list, + pgRestoreParams *params, const char *pgdata_path) +{ + char timestamp[100]; + char external_prefix[MAXPGPATH]; + parray *external_dirs = NULL; + int i; + /* arrays with meta info for multi threaded backup */ + pthread_t *threads; + restore_files_arg_new *threads_args; + bool restore_isok = true; + + time2iso(timestamp, lengthof(timestamp), dest_backup->start_time); + elog(LOG, "Restoring database from backup %s", timestamp); + + if (dest_backup->external_dir_str) + external_dirs = make_external_directory_list(dest_backup->external_dir_str, + true); + + /* Restore directories first */ + parray_qsort(dest_files, pgFileCompareRelPathWithExternal); + + /* + * Setup file locks + */ + for (i = 0; i < parray_num(dest_files); i++) + { + pgFile *file = (pgFile *) parray_get(dest_files, i); + + /* setup threads */ + pg_atomic_clear_flag(&file->lock); + } + + threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); + threads_args = (restore_files_arg_new *) palloc(sizeof(restore_files_arg_new) * + num_threads); + + /* Restore files into target directory */ + thread_interrupted = false; + for (i = 0; i < num_threads; i++) + { + restore_files_arg_new *arg = &(threads_args[i]); + + arg->dest_files = dest_files; + arg->dest_backup = dest_backup; + arg->external_prefix = external_prefix; + arg->dest_external_dirs = external_dirs; + arg->parent_chain = parent_chain; + arg->dbOid_exclude_list = dbOid_exclude_list; + arg->skip_external_dirs = params->skip_external_dirs; + arg->to_root = pgdata_path; + /* By default there are some error */ + threads_args[i].ret = 1; + + /* Useless message TODO: rewrite */ + elog(LOG, "Start thread %i", i + 1); + + pthread_create(&threads[i], NULL, restore_files_new, arg); + } + + /* Wait theads */ + for (i = 0; i < num_threads; i++) + { + pthread_join(threads[i], NULL); + if (threads_args[i].ret == 1) + restore_isok = false; + } + if (!restore_isok) + elog(ERROR, "Data files restoring failed"); + + pfree(threads); + pfree(threads_args); + + if (external_dirs != NULL) + free_dir_list(external_dirs); + +// elog(LOG, "Restore of backup %s is completed", base36enc(backup->start_time)); +} + +/* + * Restore files into $PGDATA. + */ +static void * +restore_files_new(void *arg) +{ + int i, j; + char to_fullpath[MAXPGPATH]; + FILE *out = NULL; + + restore_files_arg_new *arguments = (restore_files_arg_new *) arg; + +// for (i = parray_num(arguments->parent_chain) - 1; i >= 0; i--) +// { +// pgBackup *backup = (pgBackup *) parray_get(arguments->parent_chain, i); +// +// for (j = 0; j < parray_num(backup->files); j++) +// { +// pgFile *file = (pgFile *) parray_get(backup->files, j); +// +// elog(INFO, "Backup %s;File: %s, Size: %li", +// base36enc(backup->start_time), file->name, file->write_size); +// } +// } +// +// elog(ERROR, "HELLO"); + + for (i = 0; i < parray_num(arguments->dest_files); i++) + { + pgFile *file = (pgFile *) parray_get(arguments->dest_files, i); + + /* Directories were created before */ + if (S_ISDIR(file->mode)) + continue; + + if (!pg_atomic_test_set_flag(&file->lock)) + continue; + + /* check for interrupt */ + if (interrupted || thread_interrupted) + elog(ERROR, "Interrupted during restore"); + + if (progress) + elog(INFO, "Progress: (%d/%lu). Process file %s ", + i + 1, (unsigned long) parray_num(arguments->dest_files), + file->rel_path); + + /* Only files from pgdata can be skipped by partial restore */ + if (arguments->dbOid_exclude_list && file->external_dir_num == 0) + { + /* Check if the file belongs to the database we exclude */ + if (parray_bsearch(arguments->dbOid_exclude_list, + &file->dbOid, pgCompareOid)) + { + /* + * We cannot simply skip the file, because it may lead to + * failure during WAL redo; hence, create empty file. + */ + create_empty_file(FIO_BACKUP_HOST, + arguments->to_root, FIO_DB_HOST, file); + + elog(VERBOSE, "Exclude file due to partial restore: \"%s\"", + file->rel_path); + continue; + } + } + + /* Do not restore tablespace_map file */ + if (path_is_prefix_of_path(PG_TABLESPACE_MAP_FILE, file->rel_path)) + { + elog(VERBOSE, "Skip tablespace_map"); + continue; + } + + /* Do not restore database_map file */ + if ((file->external_dir_num == 0) && + strcmp(DATABASE_MAP, file->rel_path) == 0) + { + elog(VERBOSE, "Skip database_map"); + continue; + } + + /* Do no restore external directory file if a user doesn't want */ + if (arguments->skip_external_dirs && file->external_dir_num > 0) + continue; + + //set max_blknum based on file->n_blocks + /* set fullpath of destination file */ + if (file->external_dir_num == 0) + join_path_components(to_fullpath, arguments->to_root, file->rel_path); + //else + // TODO + + /* open destination file */ + out = fio_fopen(to_fullpath, PG_BINARY_W, FIO_DB_HOST); + if (out == NULL) + { + int errno_tmp = errno; + elog(ERROR, "Cannot open restore target file \"%s\": %s", + to_fullpath, strerror(errno_tmp)); + } + + if (!file->is_datafile || file->is_cfs) + elog(VERBOSE, "Restoring non-data file: \"%s\"", to_fullpath); + else + elog(VERBOSE, "Restoring data file: \"%s\"", to_fullpath); + + // if dest file is 0 sized, then just close it and go for the next + if (file->write_size == 0) + goto done; + + // TODO + // optimize copying of non-data files: + // lookup latest backup with file that has not BYTES_INVALID size + // and copy only it. + + if (!file->is_datafile || file->is_cfs) + { + char from_root[MAXPGPATH]; + char from_fullpath[MAXPGPATH]; + FILE *in = NULL; + + pgFile *tmp_file = NULL; + + if (file->write_size > 0) + { + tmp_file = file; + pgBackupGetPath(arguments->dest_backup, from_root, lengthof(from_root), DATABASE_DIR); + join_path_components(from_fullpath, from_root, file->rel_path); + } + else + { + for (j = 0; j < parray_num(arguments->parent_chain); j++) + { + pgFile **res_file = NULL; + pgBackup *backup = (pgBackup *) parray_get(arguments->parent_chain, j); + + /* lookup file in intermediate backup */ + res_file = parray_bsearch(backup->files, file, pgFileCompareRelPathWithExternal); + tmp_file = (res_file) ? *res_file : NULL; + + if (!tmp_file) + continue; + + if (tmp_file->write_size == 0) + goto done; + + if (tmp_file->write_size > 0) + { + pgBackupGetPath(backup, from_root, lengthof(from_root), DATABASE_DIR); + join_path_components(from_fullpath, from_root, file->rel_path); + break; + } + } + } + + if (!tmp_file) + elog(ERROR, "Something went wrong"); + + in = fopen(from_fullpath, PG_BINARY_R); + if (in == NULL) + { + elog(ERROR, "Cannot open backup file \"%s\": %s", from_fullpath, + strerror(errno)); + } + + if (strcmp(file->name, "pg_control") == 0) + copy_pgcontrol_file(from_root, FIO_BACKUP_HOST, + instance_config.pgdata, FIO_DB_HOST, + file); + else + restore_non_data_file(in, out, tmp_file, from_fullpath, to_fullpath); + + if (fio_fclose(in) != 0) + elog(ERROR, "Cannot close file \"%s\": %s", from_fullpath, + strerror(errno)); + + goto done; + } + + for (j = parray_num(arguments->parent_chain) - 1; j >= 0; j--) + { + char from_root[MAXPGPATH]; + char from_fullpath[MAXPGPATH]; + FILE *in = NULL; + + pgFile **res_file = NULL; + pgFile *tmp_file = NULL; + + pgBackup *backup = (pgBackup *) parray_get(arguments->parent_chain, j); + + /* lookup file in intermediate backup */ + res_file = parray_bsearch(backup->files, file, pgFileCompareRelPathWithExternal); + tmp_file = (res_file) ? *res_file : NULL; + + /* destination file is not exists in this intermediate backup */ + if (tmp_file == NULL) + continue; + + /* check for interrupt */ + if (interrupted || thread_interrupted) + elog(ERROR, "Interrupted during restore"); + + /* + * For incremental backups skip files which haven't changed + * since previous backup and thus were not backed up. + */ + if (tmp_file->write_size == BYTES_INVALID) + { + elog(VERBOSE, "The file didn`t change. Skip restore: \"%s\"", from_fullpath); + continue; + } + + /* + * At this point we are sure, that something is going to be copied + * Open source file. + */ + + /* TODO: special handling for files from external directories */ + pgBackupGetPath(backup, from_root, lengthof(from_root), DATABASE_DIR); + join_path_components(from_fullpath, from_root, file->rel_path); + + in = fopen(from_fullpath, PG_BINARY_R); + if (in == NULL) + { + elog(ERROR, "Cannot open backup file \"%s\": %s", from_fullpath, + strerror(errno)); + } + + /* + * restore the file. + * We treat datafiles separately, cause they were backed up block by + * block and have BackupPageHeader meta information, so we cannot just + * copy the file from backup. + */ +// if (file->is_datafile && file->is_cfs) + restore_data_file_new(in, out, tmp_file, + parse_program_version(backup->program_version), + from_fullpath, to_fullpath, file->n_blocks); +// else if (strcmp(file->name, "pg_control") == 0) +// copy_pgcontrol_file(from_root, FIO_BACKUP_HOST, +// instance_config.pgdata, FIO_DB_HOST, +// file); +// else +// restore_non_data_file(in, out, tmp_file, from_fullpath, to_fullpath); + + if (fio_fclose(in) != 0) + elog(ERROR, "Cannot close file \"%s\": %s", from_fullpath, + strerror(errno)); + } + + done: + // chmod + // fsync + // close + + /* truncate file up to n_blocks. NOTE: no need, we just should not write + * blocks that are exceeding n_blocks. + * But for this to work, n_blocks should be trusted. + */ + + /* update file permission */ + if (fio_chmod(to_fullpath, file->mode, FIO_DB_HOST) == -1) + { + int errno_tmp = errno; + fio_fclose(out); + elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath, + strerror(errno_tmp)); + } + + /* flush file */ + if (fio_fflush(out) != 0) + elog(ERROR, "Cannot flush file \"%s\": %s", to_fullpath, + strerror(errno)); + + /* fsync file */ + + /* close file */ + if (fio_fclose(out) != 0) + elog(ERROR, "Cannot close file \"%s\": %s", to_fullpath, + strerror(errno)); + + /* print size of restored file */ +// if (file->write_size != BYTES_INVALID) +// elog(VERBOSE, "Restored file %s : " INT64_FORMAT " bytes", +// file->path, file->write_size); + } + + /* Data files restoring is successful */ + arguments->ret = 0; + + return NULL; +} + /* * Restore one backup. */