1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-02-04 14:11:31 +02:00

Fix MERGE of compressed and uncompressed backups

This commit is contained in:
Arthur Zakirov 2018-12-07 17:24:21 +03:00
parent 59ab21e5c2
commit 2837399e6b
2 changed files with 51 additions and 67 deletions

View File

@ -131,7 +131,6 @@ do_merge(time_t backup_id)
{ {
pgBackup *from_backup = (pgBackup *) parray_get(backups, i - 1); pgBackup *from_backup = (pgBackup *) parray_get(backups, i - 1);
full_backup = (pgBackup *) parray_get(backups, i);
merge_backups(full_backup, from_backup); merge_backups(full_backup, from_backup);
} }
@ -168,8 +167,6 @@ merge_backups(pgBackup *to_backup, pgBackup *from_backup)
merge_files_arg *threads_args = NULL; merge_files_arg *threads_args = NULL;
int i; int i;
bool merge_isok = true; bool merge_isok = true;
int64 to_data_bytes,
to_wal_bytes;
elog(INFO, "Merging backup %s with backup %s", from_backup_id, to_backup_id); elog(INFO, "Merging backup %s with backup %s", from_backup_id, to_backup_id);
@ -283,29 +280,37 @@ merge_backups(pgBackup *to_backup, pgBackup *from_backup)
* Update to_backup metadata. * Update to_backup metadata.
*/ */
to_backup->status = BACKUP_STATUS_OK; to_backup->status = BACKUP_STATUS_OK;
to_backup->parent_backup = INVALID_BACKUP_ID;
to_backup->start_lsn = from_backup->start_lsn;
to_backup->stop_lsn = from_backup->stop_lsn;
to_backup->recovery_time = from_backup->recovery_time;
to_backup->recovery_xid = from_backup->recovery_xid;
/*
* If one of the backups isn't "stream" backup then the target backup become
* non-stream backup too.
*/
to_backup->stream = to_backup->stream && from_backup->stream;
/* Compute summary of size of regular files in the backup */ /* Compute summary of size of regular files in the backup */
to_data_bytes = 0; to_backup->data_bytes = 0;
for (i = 0; i < parray_num(files); i++) for (i = 0; i < parray_num(files); i++)
{ {
pgFile *file = (pgFile *) parray_get(files, i); pgFile *file = (pgFile *) parray_get(files, i);
if (S_ISDIR(file->mode)) if (S_ISDIR(file->mode))
to_data_bytes += 4096; to_backup->data_bytes += 4096;
/* Count the amount of the data actually copied */ /* Count the amount of the data actually copied */
else if (S_ISREG(file->mode)) else if (S_ISREG(file->mode))
to_data_bytes += file->write_size; to_backup->data_bytes += file->write_size;
} }
/* compute size of wal files of this backup stored in the archive */ /* compute size of wal files of this backup stored in the archive */
if (!to_backup->stream) if (!to_backup->stream)
to_wal_bytes = instance_config.xlog_seg_size * to_backup->wal_bytes = instance_config.xlog_seg_size *
(to_backup->stop_lsn / instance_config.xlog_seg_size - (to_backup->stop_lsn / instance_config.xlog_seg_size -
to_backup->start_lsn / instance_config.xlog_seg_size + 1); to_backup->start_lsn / instance_config.xlog_seg_size + 1);
else else
to_wal_bytes = BYTES_INVALID; to_backup->wal_bytes = BYTES_INVALID;
write_backup_filelist(to_backup, files, from_database_path); write_backup_filelist(to_backup, files, from_database_path);
to_backup->data_bytes = to_data_bytes;
to_backup->wal_bytes = to_wal_bytes;
write_backup(to_backup); write_backup(to_backup);
delete_source_backup: delete_source_backup:
@ -330,27 +335,6 @@ delete_source_backup:
} }
} }
/*
* Rename FULL backup directory.
*/
elog(INFO, "Rename %s to %s", to_backup_id, from_backup_id);
if (rename(to_backup_path, from_backup_path) == -1)
elog(ERROR, "Could not rename directory \"%s\" to \"%s\": %s",
to_backup_path, from_backup_path, strerror(errno));
/*
* Merging finished, now we can safely update ID of the destination backup.
*/
pgBackupCopy(to_backup, from_backup);
/* Correct metadata */
to_backup->backup_mode = BACKUP_MODE_FULL;
to_backup->status = BACKUP_STATUS_OK;
to_backup->parent_backup = INVALID_BACKUP_ID;
/* Restore sizes */
to_backup->data_bytes = to_data_bytes;
to_backup->wal_bytes = to_wal_bytes;
write_backup(to_backup);
/* Cleanup */ /* Cleanup */
if (threads) if (threads)
{ {
@ -384,6 +368,8 @@ merge_files(void *arg)
for (i = 0; i < num_files; i++) for (i = 0; i < num_files; i++)
{ {
pgFile *file = (pgFile *) parray_get(argument->files, i); pgFile *file = (pgFile *) parray_get(argument->files, i);
pgFile *to_file;
pgFile **res_file;
if (!pg_atomic_test_set_flag(&file->lock)) if (!pg_atomic_test_set_flag(&file->lock))
continue; continue;
@ -392,17 +378,24 @@ merge_files(void *arg)
if (interrupted) if (interrupted)
elog(ERROR, "Interrupted during merging backups"); elog(ERROR, "Interrupted during merging backups");
/* Directories were created before */
if (S_ISDIR(file->mode))
continue;
if (progress) if (progress)
elog(INFO, "Progress: (%d/%d). Process file \"%s\"", elog(INFO, "Progress: (%d/%d). Process file \"%s\"",
i + 1, num_files, file->path); i + 1, num_files, file->path);
res_file = parray_bsearch(argument->to_files, file,
pgFileComparePathDesc);
to_file = (res_file) ? *res_file : NULL;
/* /*
* Skip files which haven't changed since previous backup. But in case * Skip files which haven't changed since previous backup. But in case
* of DELTA backup we should consider n_blocks to truncate the target * of DELTA backup we should consider n_blocks to truncate the target
* backup. * backup.
*/ */
if (file->write_size == BYTES_INVALID && if (file->write_size == BYTES_INVALID && file->n_blocks == -1)
file->n_blocks == -1)
{ {
elog(VERBOSE, "Skip merging file \"%s\", the file didn't change", elog(VERBOSE, "Skip merging file \"%s\", the file didn't change",
file->path); file->path);
@ -411,27 +404,16 @@ merge_files(void *arg)
* If the file wasn't changed in PAGE backup, retreive its * If the file wasn't changed in PAGE backup, retreive its
* write_size from previous FULL backup. * write_size from previous FULL backup.
*/ */
if (S_ISREG(file->mode)) if (to_file)
{ {
pgFile **res_file; file->compress_alg = to_file->compress_alg;
file->write_size = to_file->write_size;
res_file = parray_bsearch(argument->to_files, file, file->crc = to_file->crc;
pgFileComparePathDesc);
if (res_file && *res_file)
{
file->compress_alg = (*res_file)->compress_alg;
file->write_size = (*res_file)->write_size;
file->crc = (*res_file)->crc;
}
} }
continue; continue;
} }
/* Directories were created before */
if (S_ISDIR(file->mode))
continue;
/* /*
* Move the file. We need to decompress it and compress again if * Move the file. We need to decompress it and compress again if
* necessary. * necessary.
@ -447,7 +429,7 @@ merge_files(void *arg)
file->path + to_root_len + 1); file->path + to_root_len + 1);
/* /*
* We need more complicate algorithm if target file exists and it is * We need more complicate algorithm if target file should be
* compressed. * compressed.
*/ */
if (to_backup->compress_alg == PGLZ_COMPRESS || if (to_backup->compress_alg == PGLZ_COMPRESS ||
@ -462,33 +444,36 @@ merge_files(void *arg)
/* /*
* Merge files: * Merge files:
* - decompress first file * - if target file exists restore and decompress it to the temp
* - decompress second file and merge with first decompressed file * path
* - decompress source file if necessary and merge it with the
* target decompressed file
* - compress result file * - compress result file
*/ */
elog(VERBOSE, "File is compressed, decompress to the temporary file \"%s\"",
tmp_file_path);
prev_path = file->path;
/* /*
* We need to decompress target file only if it exists. * We need to decompress target file if it exists.
*/ */
if (fileExists(to_path_tmp)) if (to_file)
{ {
elog(VERBOSE, "Merge target and source files into the temporary path \"%s\"",
tmp_file_path);
/* /*
* file->path points to the file in from_root directory. But we * file->path points to the file in from_root directory. But we
* need the file in directory to_root. * need the file in directory to_root.
*/ */
file->path = to_path_tmp; prev_path = to_file->path;
to_file->path = to_path_tmp;
/* Decompress first/target file */ /* Decompress target file into temporary one */
restore_data_file(tmp_file_path, file, false, false, restore_data_file(tmp_file_path, to_file, false, false,
parse_program_version(to_backup->program_version)); parse_program_version(to_backup->program_version));
to_file->path = prev_path;
file->path = prev_path;
} }
/* Merge second/source file with first/target file */ else
elog(VERBOSE, "Restore source file into the temporary path \"%s\"",
tmp_file_path);
/* Merge source file with target file */
restore_data_file(tmp_file_path, file, restore_data_file(tmp_file_path, file,
from_backup->backup_mode == BACKUP_MODE_DIFF_DELTA, from_backup->backup_mode == BACKUP_MODE_DIFF_DELTA,
false, false,
@ -497,7 +482,8 @@ merge_files(void *arg)
elog(VERBOSE, "Compress file and save it to the directory \"%s\"", elog(VERBOSE, "Compress file and save it to the directory \"%s\"",
argument->to_root); argument->to_root);
/* Again we need change path */ /* Again we need to change path */
prev_path = file->path;
file->path = tmp_file_path; file->path = tmp_file_path;
/* backup_data_file() requires file size to calculate nblocks */ /* backup_data_file() requires file size to calculate nblocks */
file->size = pgFileSize(file->path); file->size = pgFileSize(file->path);

View File

@ -875,8 +875,6 @@ class MergeTest(ProbackupTest, unittest.TestCase):
repr(self.output), self.cmd)) repr(self.output), self.cmd))
except ProbackupException as e: except ProbackupException as e:
self.assertTrue( self.assertTrue(
"WARNING: Backup {0} data files are corrupted".format(
backup_id) in e.message and
"ERROR: Merging of backup {0} failed".format( "ERROR: Merging of backup {0} failed".format(
backup_id) in e.message, backup_id) in e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format( '\n Unexpected Error Message: {0}\n CMD: {1}'.format(