1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-01-07 13:40:17 +02:00

Fix MERGE of compressed and uncompressed backups

This commit is contained in:
Arthur Zakirov 2018-12-07 17:24:21 +03:00 committed by Anastasia
parent b36c9c7180
commit affc01012f
2 changed files with 51 additions and 67 deletions

View File

@ -131,7 +131,6 @@ do_merge(time_t backup_id)
{
pgBackup *from_backup = (pgBackup *) parray_get(backups, i - 1);
full_backup = (pgBackup *) parray_get(backups, i);
merge_backups(full_backup, from_backup);
}
@ -168,8 +167,6 @@ merge_backups(pgBackup *to_backup, pgBackup *from_backup)
merge_files_arg *threads_args = NULL;
int i;
bool merge_isok = true;
int64 to_data_bytes,
to_wal_bytes;
elog(INFO, "Merging backup %s with backup %s", from_backup_id, to_backup_id);
@ -283,29 +280,37 @@ merge_backups(pgBackup *to_backup, pgBackup *from_backup)
* Update to_backup metadata.
*/
to_backup->status = BACKUP_STATUS_OK;
to_backup->parent_backup = INVALID_BACKUP_ID;
to_backup->start_lsn = from_backup->start_lsn;
to_backup->stop_lsn = from_backup->stop_lsn;
to_backup->recovery_time = from_backup->recovery_time;
to_backup->recovery_xid = from_backup->recovery_xid;
/*
* If one of the backups isn't "stream" backup then the target backup become
* non-stream backup too.
*/
to_backup->stream = to_backup->stream && from_backup->stream;
/* Compute summary of size of regular files in the backup */
to_data_bytes = 0;
to_backup->data_bytes = 0;
for (i = 0; i < parray_num(files); i++)
{
pgFile *file = (pgFile *) parray_get(files, i);
if (S_ISDIR(file->mode))
to_data_bytes += 4096;
to_backup->data_bytes += 4096;
/* Count the amount of the data actually copied */
else if (S_ISREG(file->mode))
to_data_bytes += file->write_size;
to_backup->data_bytes += file->write_size;
}
/* compute size of wal files of this backup stored in the archive */
if (!to_backup->stream)
to_wal_bytes = instance_config.xlog_seg_size *
to_backup->wal_bytes = instance_config.xlog_seg_size *
(to_backup->stop_lsn / instance_config.xlog_seg_size -
to_backup->start_lsn / instance_config.xlog_seg_size + 1);
else
to_wal_bytes = BYTES_INVALID;
to_backup->wal_bytes = BYTES_INVALID;
write_backup_filelist(to_backup, files, from_database_path);
to_backup->data_bytes = to_data_bytes;
to_backup->wal_bytes = to_wal_bytes;
write_backup(to_backup);
delete_source_backup:
@ -330,27 +335,6 @@ delete_source_backup:
}
}
/*
* Rename FULL backup directory.
*/
elog(INFO, "Rename %s to %s", to_backup_id, from_backup_id);
if (rename(to_backup_path, from_backup_path) == -1)
elog(ERROR, "Could not rename directory \"%s\" to \"%s\": %s",
to_backup_path, from_backup_path, strerror(errno));
/*
* Merging finished, now we can safely update ID of the destination backup.
*/
pgBackupCopy(to_backup, from_backup);
/* Correct metadata */
to_backup->backup_mode = BACKUP_MODE_FULL;
to_backup->status = BACKUP_STATUS_OK;
to_backup->parent_backup = INVALID_BACKUP_ID;
/* Restore sizes */
to_backup->data_bytes = to_data_bytes;
to_backup->wal_bytes = to_wal_bytes;
write_backup(to_backup);
/* Cleanup */
if (threads)
{
@ -384,6 +368,8 @@ merge_files(void *arg)
for (i = 0; i < num_files; i++)
{
pgFile *file = (pgFile *) parray_get(argument->files, i);
pgFile *to_file;
pgFile **res_file;
if (!pg_atomic_test_set_flag(&file->lock))
continue;
@ -392,17 +378,24 @@ merge_files(void *arg)
if (interrupted)
elog(ERROR, "Interrupted during merging backups");
/* Directories were created before */
if (S_ISDIR(file->mode))
continue;
if (progress)
elog(INFO, "Progress: (%d/%d). Process file \"%s\"",
i + 1, num_files, file->path);
res_file = parray_bsearch(argument->to_files, file,
pgFileComparePathDesc);
to_file = (res_file) ? *res_file : NULL;
/*
* Skip files which haven't changed since previous backup. But in case
* of DELTA backup we should consider n_blocks to truncate the target
* backup.
*/
if (file->write_size == BYTES_INVALID &&
file->n_blocks == -1)
if (file->write_size == BYTES_INVALID && file->n_blocks == -1)
{
elog(VERBOSE, "Skip merging file \"%s\", the file didn't change",
file->path);
@ -411,27 +404,16 @@ merge_files(void *arg)
* If the file wasn't changed in PAGE backup, retreive its
* write_size from previous FULL backup.
*/
if (S_ISREG(file->mode))
if (to_file)
{
pgFile **res_file;
res_file = parray_bsearch(argument->to_files, file,
pgFileComparePathDesc);
if (res_file && *res_file)
{
file->compress_alg = (*res_file)->compress_alg;
file->write_size = (*res_file)->write_size;
file->crc = (*res_file)->crc;
}
file->compress_alg = to_file->compress_alg;
file->write_size = to_file->write_size;
file->crc = to_file->crc;
}
continue;
}
/* Directories were created before */
if (S_ISDIR(file->mode))
continue;
/*
* Move the file. We need to decompress it and compress again if
* necessary.
@ -447,7 +429,7 @@ merge_files(void *arg)
file->path + to_root_len + 1);
/*
* We need more complicate algorithm if target file exists and it is
* We need more complicate algorithm if target file should be
* compressed.
*/
if (to_backup->compress_alg == PGLZ_COMPRESS ||
@ -462,33 +444,36 @@ merge_files(void *arg)
/*
* Merge files:
* - decompress first file
* - decompress second file and merge with first decompressed file
* - if target file exists restore and decompress it to the temp
* path
* - decompress source file if necessary and merge it with the
* target decompressed file
* - compress result file
*/
elog(VERBOSE, "File is compressed, decompress to the temporary file \"%s\"",
tmp_file_path);
prev_path = file->path;
/*
* We need to decompress target file only if it exists.
* We need to decompress target file if it exists.
*/
if (fileExists(to_path_tmp))
if (to_file)
{
elog(VERBOSE, "Merge target and source files into the temporary path \"%s\"",
tmp_file_path);
/*
* file->path points to the file in from_root directory. But we
* need the file in directory to_root.
*/
file->path = to_path_tmp;
/* Decompress first/target file */
restore_data_file(tmp_file_path, file, false, false,
prev_path = to_file->path;
to_file->path = to_path_tmp;
/* Decompress target file into temporary one */
restore_data_file(tmp_file_path, to_file, false, false,
parse_program_version(to_backup->program_version));
file->path = prev_path;
to_file->path = prev_path;
}
/* Merge second/source file with first/target file */
else
elog(VERBOSE, "Restore source file into the temporary path \"%s\"",
tmp_file_path);
/* Merge source file with target file */
restore_data_file(tmp_file_path, file,
from_backup->backup_mode == BACKUP_MODE_DIFF_DELTA,
false,
@ -497,7 +482,8 @@ merge_files(void *arg)
elog(VERBOSE, "Compress file and save it to the directory \"%s\"",
argument->to_root);
/* Again we need change path */
/* Again we need to change path */
prev_path = file->path;
file->path = tmp_file_path;
/* backup_data_file() requires file size to calculate nblocks */
file->size = pgFileSize(file->path);

View File

@ -875,8 +875,6 @@ class MergeTest(ProbackupTest, unittest.TestCase):
repr(self.output), self.cmd))
except ProbackupException as e:
self.assertTrue(
"WARNING: Backup {0} data files are corrupted".format(
backup_id) in e.message and
"ERROR: Merging of backup {0} failed".format(
backup_id) in e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(