mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-11-24 08:52:38 +02:00

PGPRO-1449: Add MERGE command: merge PAGE with FULL backup
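The new subcommand folds an incremental backup into its FULL parent. Going by the help text added in this commit, it is invoked as

    pg_probackup merge -B backup-dir --instance=instance_name -i backup-id

where -i names the incremental backup to merge; do_merge() locates the parent FULL backup itself and folds in any intermediate backups along the chain.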

Arthur Zakirov 2018-08-02 11:57:39 +03:00
parent e7d088c238
commit 4ed28cf6e5
18 changed files with 1657 additions and 547 deletions

Makefile

@ -5,7 +5,7 @@ OBJS = src/backup.o src/catalog.o src/configure.o src/data.o \
src/util.o src/validate.o src/datapagemap.o src/parsexlog.o \
src/xlogreader.o src/streamutil.o src/receivelog.o \
src/archive.o src/utils/parray.o src/utils/pgut.o src/utils/logger.o \
src/utils/json.o src/utils/thread.o
src/utils/json.o src/utils/thread.o src/merge.o
EXTRA_CLEAN = src/datapagemap.c src/datapagemap.h src/xlogreader.c \
src/receivelog.c src/receivelog.h src/streamutil.c src/streamutil.h src/logging.h

src/backup.c

@ -104,7 +104,6 @@ static int checkpoint_timeout(void);
//static void backup_list_file(parray *files, const char *root, )
static void parse_backup_filelist_filenames(parray *files, const char *root);
static void write_backup_file_list(parray *files, const char *root);
static void wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment);
static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup);
static void make_pagemap_from_ptrack(parray *files);
@ -515,8 +514,6 @@ do_backup_instance(void)
/* get list of backups already taken */
backup_list = catalog_get_backup_list(INVALID_BACKUP_ID);
if (backup_list == NULL)
elog(ERROR, "Failed to get backup list.");
prev_backup = catalog_get_last_data_backup(backup_list, current.tli);
if (prev_backup == NULL)
@ -697,8 +694,11 @@ do_backup_instance(void)
pg_atomic_clear_flag(&file->lock);
}
/* sort by size for load balancing */
/* Sort by size for load balancing */
parray_qsort(backup_files_list, pgFileCompareSize);
/* Sort the array for binary search */
if (prev_backup_filelist)
parray_qsort(prev_backup_filelist, pgFileComparePath);
/* init thread args with own file lists */
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
@ -788,7 +788,7 @@ do_backup_instance(void)
}
/* Print the list of files to backup catalog */
write_backup_file_list(backup_files_list, pgdata);
pgBackupWriteFileList(&current, backup_files_list, pgdata);
/* Compute summary of size of regular files in the backup */
for (i = 0; i < parray_num(backup_files_list); i++)
@ -2076,35 +2076,32 @@ backup_files(void *arg)
/* Check that file exist in previous backup */
if (current.backup_mode != BACKUP_MODE_FULL)
{
int p;
char *relative;
int n_prev_files = parray_num(arguments->prev_filelist);
pgFile key;
pgFile **prev_file;
relative = GetRelativePath(file->path, arguments->from_root);
for (p = 0; p < n_prev_files; p++)
{
pgFile *prev_file;
key.path = relative;
prev_file = (pgFile *) parray_get(arguments->prev_filelist, p);
if (strcmp(relative, prev_file->path) == 0)
{
/* File exists in previous backup */
file->exists_in_prev = true;
// elog(VERBOSE, "File exists at the time of previous backup %s", relative);
break;
}
}
prev_file = (pgFile **) parray_bsearch(arguments->prev_filelist,
&key, pgFileComparePath);
if (prev_file)
/* File exists in previous backup */
file->exists_in_prev = true;
}
/* copy the file into backup */
if (file->is_datafile && !file->is_cfs)
{
char to_path[MAXPGPATH];
join_path_components(to_path, arguments->to_root,
file->path + strlen(arguments->from_root) + 1);
/* backup block by block if datafile AND not compressed by cfs */
if (!backup_data_file(arguments,
arguments->from_root,
arguments->to_root, file,
if (!backup_data_file(arguments, to_path, file,
arguments->prev_start_lsn,
current.backup_mode))
current.backup_mode,
compress_alg, compress_level))
{
file->write_size = BYTES_INVALID;
elog(VERBOSE, "File \"%s\" was not copied to backup", file->path);
@ -2279,31 +2276,6 @@ set_cfs_datafiles(parray *files, const char *root, char *relative, size_t i)
free(cfs_tblspc_path);
}
/*
* Output the list of files to backup catalog DATABASE_FILE_LIST
*/
static void
write_backup_file_list(parray *files, const char *root)
{
FILE *fp;
char path[MAXPGPATH];
pgBackupGetPath(&current, path, lengthof(path), DATABASE_FILE_LIST);
fp = fopen(path, "wt");
if (fp == NULL)
elog(ERROR, "cannot open file list \"%s\": %s", path,
strerror(errno));
print_file_list(fp, files, root);
if (fflush(fp) != 0 ||
fsync(fileno(fp)) != 0 ||
fclose(fp))
elog(ERROR, "cannot write file list \"%s\": %s", path, strerror(errno));
}
/*
* Find pgfile by given rnode in the backup_files_list
* and add given blkno to its pagemap.

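The change to backup_files() above swaps an O(n) scan per file for one O(n log n) sort plus an O(log n) binary search per lookup. The essential contract is that parray_qsort() and parray_bsearch() use the same comparator. A minimal standalone sketch of the pattern, with libc qsort/bsearch standing in for the parray helpers and pgFile reduced to its path member:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    typedef struct { const char *path; } pgFile;

    /* Comparator over pgFile* elements; must be used for both sorting and
     * searching, exactly as pgFileComparePath is in the diff above. */
    static int compare_path(const void *a, const void *b)
    {
        const pgFile *f1 = *(pgFile *const *) a;
        const pgFile *f2 = *(pgFile *const *) b;
        return strcmp(f1->path, f2->path);
    }

    int main(void)
    {
        pgFile a = {"base/1/112"}, b = {"base/1/113"}, c = {"global/pg_control"};
        pgFile *prev_filelist[] = {&c, &a, &b};
        size_t n = sizeof(prev_filelist) / sizeof(prev_filelist[0]);

        /* Sort once with the same comparator that bsearch will use */
        qsort(prev_filelist, n, sizeof(pgFile *), compare_path);

        /* Probe with a stack-allocated key, as backup_files() now does */
        pgFile key = {"base/1/113"};
        pgFile *key_ptr = &key;
        pgFile **prev_file = bsearch(&key_ptr, prev_filelist, n,
                                     sizeof(pgFile *), compare_path);

        printf("exists_in_prev = %d\n", prev_file != NULL);    /* prints 1 */
        return 0;
    }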
src/catalog.c

@ -325,6 +325,9 @@ err_proc:
if (backups)
parray_walk(backups, pgBackupFree);
parray_free(backups);
elog(ERROR, "Failed to get backup list");
return NULL;
}
@ -389,7 +392,7 @@ pgBackupWriteControl(FILE *out, pgBackup *backup)
deparse_compress_alg(backup->compress_alg));
fprintf(out, "compress-level = %d\n", backup->compress_level);
fprintf(out, "from-replica = %s\n", backup->from_replica ? "true" : "false");
fprintf(out, "\n#Compatibility\n");
fprintf(out, "block-size = %u\n", backup->block_size);
fprintf(out, "xlog-block-size = %u\n", backup->wal_block_size);
@ -462,6 +465,30 @@ pgBackupWriteBackupControlFile(pgBackup *backup)
fclose(fp);
}
/*
* Output the list of files to backup catalog DATABASE_FILE_LIST
*/
void
pgBackupWriteFileList(pgBackup *backup, parray *files, const char *root)
{
FILE *fp;
char path[MAXPGPATH];
pgBackupGetPath(backup, path, lengthof(path), DATABASE_FILE_LIST);
fp = fopen(path, "wt");
if (fp == NULL)
elog(ERROR, "cannot open file list \"%s\": %s", path,
strerror(errno));
print_file_list(fp, files, root);
if (fflush(fp) != 0 ||
fsync(fileno(fp)) != 0 ||
fclose(fp))
elog(ERROR, "cannot write file list \"%s\": %s", path, strerror(errno));
}
/*
* Read BACKUP_CONTROL_FILE and create pgBackup.
* - Comment starts with ';'.
@ -515,7 +542,7 @@ readBackupControlFile(const char *path)
return NULL;
}
pgBackup_init(backup);
pgBackupInit(backup);
parsed_options = pgut_readopt(path, options, WARNING);
if (parsed_options == 0)
@ -566,10 +593,12 @@ readBackupControlFile(const char *path)
{
if (strcmp(status, "OK") == 0)
backup->status = BACKUP_STATUS_OK;
else if (strcmp(status, "RUNNING") == 0)
backup->status = BACKUP_STATUS_RUNNING;
else if (strcmp(status, "ERROR") == 0)
backup->status = BACKUP_STATUS_ERROR;
else if (strcmp(status, "RUNNING") == 0)
backup->status = BACKUP_STATUS_RUNNING;
else if (strcmp(status, "MERGING") == 0)
backup->status = BACKUP_STATUS_MERGING;
else if (strcmp(status, "DELETING") == 0)
backup->status = BACKUP_STATUS_DELETING;
else if (strcmp(status, "DELETED") == 0)
@ -698,11 +727,63 @@ deparse_compress_alg(int alg)
return NULL;
}
/*
* Fill pgBackup struct with default values.
*/
void
pgBackupInit(pgBackup *backup)
{
backup->backup_id = INVALID_BACKUP_ID;
backup->backup_mode = BACKUP_MODE_INVALID;
backup->status = BACKUP_STATUS_INVALID;
backup->tli = 0;
backup->start_lsn = 0;
backup->stop_lsn = 0;
backup->start_time = (time_t) 0;
backup->end_time = (time_t) 0;
backup->recovery_xid = 0;
backup->recovery_time = (time_t) 0;
backup->data_bytes = BYTES_INVALID;
backup->wal_bytes = BYTES_INVALID;
backup->compress_alg = COMPRESS_ALG_DEFAULT;
backup->compress_level = COMPRESS_LEVEL_DEFAULT;
backup->block_size = BLCKSZ;
backup->wal_block_size = XLOG_BLCKSZ;
backup->checksum_version = 0;
backup->stream = false;
backup->from_replica = false;
backup->parent_backup = INVALID_BACKUP_ID;
backup->primary_conninfo = NULL;
backup->program_version[0] = '\0';
backup->server_version[0] = '\0';
}
/*
* Copy backup metadata from **src** into **dst**.
*/
void
pgBackupCopy(pgBackup *dst, pgBackup *src)
{
pfree(dst->primary_conninfo);
memcpy(dst, src, sizeof(pgBackup));
if (src->primary_conninfo)
dst->primary_conninfo = pstrdup(src->primary_conninfo);
}
/* free pgBackup object */
void
pgBackupFree(void *backup)
{
free(backup);
pgBackup *b = (pgBackup *) backup;
pfree(b->primary_conninfo);
pfree(backup);
}
/* Compare two pgBackup with their IDs (start time) in ascending order */

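pgBackupCopy() and the reworked pgBackupFree() above implement a common C ownership pattern: a struct memcpy leaves pointer members aliased, so the string member is re-duplicated and each object frees its own copy. A small standalone illustration, with libc malloc/strdup/free standing in for pgut's pstrdup/pfree:

    #include <stdlib.h>
    #include <string.h>

    typedef struct { char *conninfo; } Meta;

    static void meta_copy(Meta *dst, const Meta *src)
    {
        free(dst->conninfo);            /* drop dst's old allocation first */
        memcpy(dst, src, sizeof(Meta)); /* shallow copy: conninfo now aliases */
        if (src->conninfo)
            dst->conninfo = strdup(src->conninfo); /* deep-copy the string */
    }

    int main(void)
    {
        Meta src = { strdup("host=primary port=5432") };
        Meta dst = { strdup("host=old") };

        meta_copy(&dst, &src);

        free(src.conninfo);
        free(dst.conninfo);             /* safe: dst owns a separate copy */
        return 0;
    }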
src/data.c

@ -27,18 +27,24 @@
#ifdef HAVE_LIBZ
/* Implementation of zlib compression method */
static int32 zlib_compress(void* dst, size_t dst_size, void const* src, size_t src_size)
static int32
zlib_compress(void *dst, size_t dst_size, void const *src, size_t src_size,
int level)
{
uLongf compressed_size = dst_size;
int rc = compress2(dst, &compressed_size, src, src_size, compress_level);
uLongf compressed_size = dst_size;
int rc = compress2(dst, &compressed_size, src, src_size,
level);
return rc == Z_OK ? compressed_size : rc;
}
/* Implementation of zlib decompression method */
static int32 zlib_decompress(void* dst, size_t dst_size, void const* src, size_t src_size)
static int32
zlib_decompress(void *dst, size_t dst_size, void const *src, size_t src_size)
{
uLongf dest_len = dst_size;
int rc = uncompress(dst, &dest_len, src, src_size);
uLongf dest_len = dst_size;
int rc = uncompress(dst, &dest_len, src, src_size);
return rc == Z_OK ? dest_len : rc;
}
#endif
@ -48,7 +54,8 @@ static int32 zlib_decompress(void* dst, size_t dst_size, void const* src, size_t
* written in the destination buffer, or -1 if compression fails.
*/
static int32
do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg)
do_compress(void* dst, size_t dst_size, void const* src, size_t src_size,
CompressAlg alg, int level)
{
switch (alg)
{
@ -57,7 +64,7 @@ do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, Compre
return -1;
#ifdef HAVE_LIBZ
case ZLIB_COMPRESS:
return zlib_compress(dst, dst_size, src, src_size);
return zlib_compress(dst, dst_size, src, src_size, level);
#endif
case PGLZ_COMPRESS:
return pglz_compress(src, src_size, dst, PGLZ_strategy_always);
@ -71,7 +78,8 @@ do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, Compre
* decompressed in the destination buffer, or -1 if decompression fails.
*/
static int32
do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg)
do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size,
CompressAlg alg)
{
switch (alg)
{
@ -350,7 +358,8 @@ prepare_page(backup_files_arg *arguments,
static void
compress_and_backup_page(pgFile *file, BlockNumber blknum,
FILE *in, FILE *out, pg_crc32 *crc,
int page_state, Page page)
int page_state, Page page,
CompressAlg calg, int clevel)
{
BackupPageHeader header;
size_t write_buffer_size = sizeof(header);
@ -375,9 +384,9 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum,
{
/* The page was not truncated, so we need to compress it */
header.compressed_size = do_compress(compressed_page, BLCKSZ,
page, BLCKSZ, compress_alg);
page, BLCKSZ, calg, clevel);
file->compress_alg = compress_alg;
file->compress_alg = calg;
file->read_size += BLCKSZ;
Assert (header.compressed_size <= BLCKSZ);
@ -386,7 +395,7 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum,
{
memcpy(write_buffer, &header, sizeof(header));
memcpy(write_buffer + sizeof(header),
compressed_page, header.compressed_size);
compressed_page, header.compressed_size);
write_buffer_size += MAXALIGN(header.compressed_size);
}
/* Nonpositive value means that compression failed. Write it as is. */
@ -408,11 +417,12 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum,
/* write data page */
if(fwrite(write_buffer, 1, write_buffer_size, out) != write_buffer_size)
{
int errno_tmp = errno;
int errno_tmp = errno;
fclose(in);
fclose(out);
elog(ERROR, "File: %s, cannot write backup at block %u : %s",
file->path, blknum, strerror(errno_tmp));
file->path, blknum, strerror(errno_tmp));
}
file->write_size += write_buffer_size;
@ -428,11 +438,10 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum,
*/
bool
backup_data_file(backup_files_arg* arguments,
const char *from_root, const char *to_root,
pgFile *file, XLogRecPtr prev_backup_start_lsn,
BackupMode backup_mode)
const char *to_path, pgFile *file,
XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
CompressAlg calg, int clevel)
{
char to_path[MAXPGPATH];
FILE *in;
FILE *out;
BlockNumber blknum = 0;
@ -499,7 +508,6 @@ backup_data_file(backup_files_arg* arguments,
nblocks = file->size/BLCKSZ;
/* open backup file for write */
join_path_components(to_path, to_root, file->path + strlen(from_root) + 1);
out = fopen(to_path, PG_BINARY_W);
if (out == NULL)
{
@ -522,10 +530,10 @@ backup_data_file(backup_files_arg* arguments,
for (blknum = 0; blknum < nblocks; blknum++)
{
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
blknum, nblocks, in, &n_blocks_skipped,
blknum, nblocks, in, &n_blocks_skipped,
backup_mode, curr_page);
compress_and_backup_page(file, blknum, in, out, &(file->crc),
page_state, curr_page);
page_state, curr_page, calg, clevel);
n_blocks_read++;
if (page_state == PageIsTruncated)
break;
@ -545,10 +553,10 @@ backup_data_file(backup_files_arg* arguments,
while (datapagemap_next(iter, &blknum))
{
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
blknum, nblocks, in, &n_blocks_skipped,
blknum, nblocks, in, &n_blocks_skipped,
backup_mode, curr_page);
compress_and_backup_page(file, blknum, in, out, &(file->crc),
page_state, curr_page);
page_state, curr_page, calg, clevel);
n_blocks_read++;
if (page_state == PageIsTruncated)
break;
@ -595,14 +603,14 @@ backup_data_file(backup_files_arg* arguments,
/*
* Restore files in the from_root directory to the to_root directory with
* same relative path.
*
* If write_header is true then we add a header to each restored block;
* currently this is used by the MERGE command.
*/
void
restore_data_file(const char *from_root,
const char *to_root,
pgFile *file,
pgBackup *backup)
restore_data_file(const char *to_path, pgFile *file, bool allow_truncate,
bool write_header)
{
char to_path[MAXPGPATH];
FILE *in = NULL;
FILE *out = NULL;
BackupPageHeader header;
@ -627,7 +635,6 @@ restore_data_file(const char *from_root,
* modified pages for differential restore. If the file does not exist,
* re-open it with "w" to create an empty file.
*/
join_path_components(to_path, to_root, file->path + strlen(from_root) + 1);
out = fopen(to_path, PG_BINARY_R "+");
if (out == NULL && errno == ENOENT)
out = fopen(to_path, PG_BINARY_W);
@ -641,6 +648,7 @@ restore_data_file(const char *from_root,
while (true)
{
off_t write_pos;
size_t read_len;
DataPage compressed_page; /* used as read buffer */
DataPage page;
@ -682,7 +690,7 @@ restore_data_file(const char *from_root,
if (header.block < blknum)
elog(ERROR, "backup is broken at file->path %s block %u",
file->path, blknum);
file->path, blknum);
blknum = header.block;
@ -707,37 +715,47 @@ restore_data_file(const char *from_root,
if (header.compressed_size != BLCKSZ)
{
size_t uncompressed_size = 0;
int32 uncompressed_size = 0;
uncompressed_size = do_decompress(page.data, BLCKSZ,
compressed_page.data,
header.compressed_size,
file->compress_alg);
compressed_page.data,
MAXALIGN(header.compressed_size),
file->compress_alg);
if (uncompressed_size != BLCKSZ)
elog(ERROR, "page uncompressed to %ld bytes. != BLCKSZ",
uncompressed_size);
elog(ERROR, "page of file \"%s\" uncompressed to %d bytes. != BLCKSZ",
file->path, uncompressed_size);
}
write_pos = (write_header) ? blknum * (BLCKSZ + sizeof(header)) :
blknum * BLCKSZ;
/*
* Seek and write the restored page.
*/
if (fseek(out, blknum * BLCKSZ, SEEK_SET) < 0)
if (fseek(out, write_pos, SEEK_SET) < 0)
elog(ERROR, "cannot seek block %u of \"%s\": %s",
blknum, to_path, strerror(errno));
if (write_header)
{
if (fwrite(&header, 1, sizeof(header), out) != sizeof(header))
elog(ERROR, "cannot write header of block %u of \"%s\": %s",
blknum, file->path, strerror(errno));
}
if (header.compressed_size < BLCKSZ)
{
if (fwrite(page.data, 1, BLCKSZ, out) != BLCKSZ)
elog(ERROR, "cannot write block %u of \"%s\": %s",
blknum, file->path, strerror(errno));
blknum, file->path, strerror(errno));
}
else
{
/* if page wasn't compressed, we've read full block */
if (fwrite(compressed_page.data, 1, BLCKSZ, out) != BLCKSZ)
elog(ERROR, "cannot write block %u of \"%s\": %s",
blknum, file->path, strerror(errno));
blknum, file->path, strerror(errno));
}
}
@ -748,8 +766,7 @@ restore_data_file(const char *from_root,
* So when restoring a file from a DELTA backup we, knowing its size at
* the time of the backup, can truncate the file to this size.
*/
if (backup->backup_mode == BACKUP_MODE_DIFF_DELTA &&
file->n_blocks != BLOCKNUM_INVALID && !need_truncate)
if (allow_truncate && file->n_blocks != BLOCKNUM_INVALID && !need_truncate)
{
size_t file_size = 0;
@ -766,10 +783,15 @@ restore_data_file(const char *from_root,
if (need_truncate)
{
off_t write_pos;
write_pos = (write_header) ? truncate_from * (BLCKSZ + sizeof(header)) :
truncate_from * BLCKSZ;
/*
* Truncate file to this length.
*/
if (ftruncate(fileno(out), truncate_from * BLCKSZ) != 0)
if (ftruncate(fileno(out), write_pos) != 0)
elog(ERROR, "cannot truncate \"%s\": %s",
file->path, strerror(errno));
elog(INFO, "Delta truncate file %s to block %u",
@ -928,6 +950,22 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
return true;
}
/*
* Move file from one backup to another.
* We do not apply compression to these files, because each is either a
* small control file or an already compressed cfs file.
*/
void
move_file(const char *from_root, const char *to_root, pgFile *file)
{
char to_path[MAXPGPATH];
join_path_components(to_path, to_root, file->path + strlen(from_root) + 1);
if (rename(file->path, to_path) == -1)
elog(ERROR, "Cannot move file \"%s\" to path \"%s\": %s",
file->path, to_path, strerror(errno));
}
#ifdef HAVE_LIBZ
/*
* Show error during work with compressed file

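restore_data_file() above computes the target offset as write_pos = blknum * (BLCKSZ + sizeof(header)) when write_header is set, so merged files keep the per-page BackupPageHeader framing that backup files use. A standalone check of the arithmetic, assuming the 8-byte {block, compressed_size} header layout used elsewhere in pg_probackup:

    #include <stdio.h>
    #include <stdint.h>

    #define BLCKSZ 8192

    /* Assumed layout; the real BackupPageHeader lives in pg_probackup.h */
    typedef struct
    {
        uint32_t block;            /* BlockNumber */
        int32_t  compressed_size;
    } BackupPageHeader;

    int main(void)
    {
        uint32_t blknum = 3;

        long with_header = (long) (blknum * (BLCKSZ + sizeof(BackupPageHeader)));
        long without     = (long) blknum * BLCKSZ;

        /* 3 * (8192 + 8) = 24600 vs 3 * 8192 = 24576 */
        printf("block %u: %ld with header, %ld without\n",
               blknum, with_header, without);
        return 0;
    }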
src/delete.c

@ -33,8 +33,6 @@ do_delete(time_t backup_id)
/* Get complete list of backups */
backup_list = catalog_get_backup_list(INVALID_BACKUP_ID);
if (backup_list == NULL)
elog(ERROR, "Failed to get backup list.");
if (backup_id != 0)
{

src/dir.c

@ -87,6 +87,34 @@ static char *pgdata_exclude_files_non_exclusive[] =
NULL
};
/* Tablespace mapping structures */
typedef struct TablespaceListCell
{
struct TablespaceListCell *next;
char old_dir[MAXPGPATH];
char new_dir[MAXPGPATH];
} TablespaceListCell;
typedef struct TablespaceList
{
TablespaceListCell *head;
TablespaceListCell *tail;
} TablespaceList;
typedef struct TablespaceCreatedListCell
{
struct TablespaceCreatedListCell *next;
char link_name[MAXPGPATH];
char linked_dir[MAXPGPATH];
} TablespaceCreatedListCell;
typedef struct TablespaceCreatedList
{
TablespaceCreatedListCell *head;
TablespaceCreatedListCell *tail;
} TablespaceCreatedList;
static int BlackListCompare(const void *str1, const void *str2);
static bool dir_check_file(const char *root, pgFile *file);
@ -94,6 +122,13 @@ static void dir_list_file_internal(parray *files, const char *root,
pgFile *parent, bool exclude,
bool omit_symlink, parray *black_list);
static void list_data_directories(parray *files, const char *path, bool is_root,
bool exclude);
/* Tablespace mapping */
static TablespaceList tablespace_dirs = {NULL, NULL};
static TablespaceCreatedList tablespace_created_dirs = {NULL, NULL};
/*
* Create directory, also create parent directories if necessary.
*/
@ -225,7 +260,7 @@ delete_file:
}
pg_crc32
pgFileGetCRC(pgFile *file)
pgFileGetCRC(const char *file_path)
{
FILE *fp;
pg_crc32 crc = 0;
@ -234,10 +269,10 @@ pgFileGetCRC(pgFile *file)
int errno_tmp;
/* open file in binary read mode */
fp = fopen(file->path, PG_BINARY_R);
fp = fopen(file_path, PG_BINARY_R);
if (fp == NULL)
elog(ERROR, "cannot open file \"%s\": %s",
file->path, strerror(errno));
file_path, strerror(errno));
/* calc CRC of backup file */
INIT_CRC32C(crc);
@ -249,7 +284,7 @@ pgFileGetCRC(pgFile *file)
}
errno_tmp = errno;
if (!feof(fp))
elog(WARNING, "cannot read \"%s\": %s", file->path,
elog(WARNING, "cannot read \"%s\": %s", file_path,
strerror(errno_tmp));
if (len > 0)
COMP_CRC32C(crc, buf, len);
@ -677,7 +712,7 @@ dir_list_file_internal(parray *files, const char *root, pgFile *parent,
* **is_root** is a bit of a hack. We exclude only the first level of
* directories, and on the first level we check all files and directories.
*/
void
static void
list_data_directories(parray *files, const char *path, bool is_root,
bool exclude)
{
@ -749,6 +784,277 @@ list_data_directories(parray *files, const char *path, bool is_root,
path, strerror(prev_errno));
}
/*
* Save the created directory path in memory. We can use it in the next
* restore to avoid raising the error "restore tablespace destination is not
* empty" in create_data_directories().
*/
static void
set_tablespace_created(const char *link, const char *dir)
{
TablespaceCreatedListCell *cell = pgut_new(TablespaceCreatedListCell);
strcpy(cell->link_name, link);
strcpy(cell->linked_dir, dir);
cell->next = NULL;
if (tablespace_created_dirs.tail)
tablespace_created_dirs.tail->next = cell;
else
tablespace_created_dirs.head = cell;
tablespace_created_dirs.tail = cell;
}
/*
* Retrieve tablespace path, either relocated or original depending on whether
* -T was passed or not.
*
* Copy of function get_tablespace_mapping() from pg_basebackup.c.
*/
static const char *
get_tablespace_mapping(const char *dir)
{
TablespaceListCell *cell;
for (cell = tablespace_dirs.head; cell; cell = cell->next)
if (strcmp(dir, cell->old_dir) == 0)
return cell->new_dir;
return dir;
}
/*
* Check whether the directory was created when the symlink was created in restore_directories().
*/
static const char *
get_tablespace_created(const char *link)
{
TablespaceCreatedListCell *cell;
for (cell = tablespace_created_dirs.head; cell; cell = cell->next)
if (strcmp(link, cell->link_name) == 0)
return cell->linked_dir;
return NULL;
}
/*
* Split argument into old_dir and new_dir and append to tablespace mapping
* list.
*
* Copy of function tablespace_list_append() from pg_basebackup.c.
*/
void
opt_tablespace_map(pgut_option *opt, const char *arg)
{
TablespaceListCell *cell = pgut_new(TablespaceListCell);
char *dst;
char *dst_ptr;
const char *arg_ptr;
dst_ptr = dst = cell->old_dir;
for (arg_ptr = arg; *arg_ptr; arg_ptr++)
{
if (dst_ptr - dst >= MAXPGPATH)
elog(ERROR, "directory name too long");
if (*arg_ptr == '\\' && *(arg_ptr + 1) == '=')
; /* skip backslash escaping = */
else if (*arg_ptr == '=' && (arg_ptr == arg || *(arg_ptr - 1) != '\\'))
{
if (*cell->new_dir)
elog(ERROR, "multiple \"=\" signs in tablespace mapping\n");
else
dst = dst_ptr = cell->new_dir;
}
else
*dst_ptr++ = *arg_ptr;
}
if (!*cell->old_dir || !*cell->new_dir)
elog(ERROR, "invalid tablespace mapping format \"%s\", "
"must be \"OLDDIR=NEWDIR\"", arg);
/*
* This check isn't absolutely necessary. But all tablespaces are created
* with absolute directories, so specifying a non-absolute path here would
* just never match, possibly confusing users. It's also good to be
* consistent with the new_dir check.
*/
if (!is_absolute_path(cell->old_dir))
elog(ERROR, "old directory is not an absolute path in tablespace mapping: %s\n",
cell->old_dir);
if (!is_absolute_path(cell->new_dir))
elog(ERROR, "new directory is not an absolute path in tablespace mapping: %s\n",
cell->new_dir);
if (tablespace_dirs.tail)
tablespace_dirs.tail->next = cell;
else
tablespace_dirs.head = cell;
tablespace_dirs.tail = cell;
}
/*
* Create backup directories from **backup_dir** to **data_dir**. Doesn't raise
* an error if target directories exist.
*
* If **extract_tablespaces** is true then try to extract tablespace data
* directories into their initial path using tablespace_map file.
*/
void
create_data_directories(const char *data_dir, const char *backup_dir,
bool extract_tablespaces)
{
parray *dirs,
*links = NULL;
size_t i;
char backup_database_dir[MAXPGPATH],
to_path[MAXPGPATH];
dirs = parray_new();
if (extract_tablespaces)
{
links = parray_new();
read_tablespace_map(links, backup_dir);
}
join_path_components(backup_database_dir, backup_dir, DATABASE_DIR);
list_data_directories(dirs, backup_database_dir, true, false);
elog(LOG, "restore directories and symlinks...");
for (i = 0; i < parray_num(dirs); i++)
{
pgFile *dir = (pgFile *) parray_get(dirs, i);
char *relative_ptr = GetRelativePath(dir->path, backup_database_dir);
Assert(S_ISDIR(dir->mode));
/* Try to create symlink and linked directory if necessary */
if (extract_tablespaces &&
path_is_prefix_of_path(PG_TBLSPC_DIR, relative_ptr))
{
char *link_ptr = GetRelativePath(relative_ptr, PG_TBLSPC_DIR),
*link_sep,
*tmp_ptr;
char link_name[MAXPGPATH];
pgFile **link;
/* Extract link name from relative path */
link_sep = first_dir_separator(link_ptr);
if (link_sep != NULL)
{
int len = link_sep - link_ptr;
strncpy(link_name, link_ptr, len);
link_name[len] = '\0';
}
else
goto create_directory;
tmp_ptr = dir->path;
dir->path = link_name;
/* Search only by symlink name without path */
link = (pgFile **) parray_bsearch(links, dir, pgFileComparePath);
dir->path = tmp_ptr;
if (link)
{
const char *linked_path = get_tablespace_mapping((*link)->linked);
const char *dir_created;
if (!is_absolute_path(linked_path))
elog(ERROR, "tablespace directory is not an absolute path: %s\n",
linked_path);
/* Check if linked directory was created earlier */
dir_created = get_tablespace_created(link_name);
if (dir_created)
{
/*
* If the symlink and linked directory were already created, do not
* create them a second time.
*/
if (strcmp(dir_created, linked_path) == 0)
{
/*
* Create rest of directories.
* First check whether there is any directory name after the
* separator.
*/
if (link_sep != NULL && *(link_sep + 1) != '\0')
goto create_directory;
else
continue;
}
else
elog(ERROR, "tablespace directory \"%s\" of page backup does not "
"match with previous created tablespace directory \"%s\" of symlink \"%s\"",
linked_path, dir_created, link_name);
}
/*
* This check was done in check_tablespace_mapping(). But do
* it again.
*/
if (!dir_is_empty(linked_path))
elog(ERROR, "restore tablespace destination is not empty: \"%s\"",
linked_path);
if (link_sep)
elog(LOG, "create directory \"%s\" and symbolic link \"%.*s\"",
linked_path,
(int) (link_sep - relative_ptr), relative_ptr);
else
elog(LOG, "create directory \"%s\" and symbolic link \"%s\"",
linked_path, relative_ptr);
/* Firstly, create linked directory */
dir_create_dir(linked_path, DIR_PERMISSION);
join_path_components(to_path, data_dir, PG_TBLSPC_DIR);
/* Create pg_tblspc directory just in case */
dir_create_dir(to_path, DIR_PERMISSION);
/* Secondly, create link */
join_path_components(to_path, to_path, link_name);
if (symlink(linked_path, to_path) < 0)
elog(ERROR, "could not create symbolic link \"%s\": %s",
to_path, strerror(errno));
/* Save linked directory */
set_tablespace_created(link_name, linked_path);
/*
* Create rest of directories.
* First check whether there is any directory name after the separator.
*/
if (link_sep != NULL && *(link_sep + 1) != '\0')
goto create_directory;
continue;
}
}
create_directory:
elog(LOG, "create directory \"%s\"", relative_ptr);
/* This is not symlink, create directory */
join_path_components(to_path, data_dir, relative_ptr);
dir_create_dir(to_path, DIR_PERMISSION);
}
if (extract_tablespaces)
{
parray_walk(links, pgFileFree);
parray_free(links);
}
parray_walk(dirs, pgFileFree);
parray_free(dirs);
}
/*
* Read the symbolic link names of tablespaces, and the directories they
* point to, from tablespace_map or tablespace_map.txt.
@ -800,6 +1106,70 @@ read_tablespace_map(parray *files, const char *backup_dir)
fclose(fp);
}
/*
* Check that all tablespace mapping entries have correct linked directory
* paths. Linked directories must either be empty or not exist.
*
* If tablespace-mapping option is supplied, all OLDDIR entries must have
* entries in tablespace_map file.
*/
void
check_tablespace_mapping(pgBackup *backup)
{
char this_backup_path[MAXPGPATH];
parray *links;
size_t i;
TablespaceListCell *cell;
pgFile *tmp_file = pgut_new(pgFile);
links = parray_new();
pgBackupGetPath(backup, this_backup_path, lengthof(this_backup_path), NULL);
read_tablespace_map(links, this_backup_path);
if (log_level_console <= LOG || log_level_file <= LOG)
elog(LOG, "check tablespace directories of backup %s",
base36enc(backup->start_time));
/* 1 - each OLDDIR must have an entry in tablespace_map file (links) */
for (cell = tablespace_dirs.head; cell; cell = cell->next)
{
tmp_file->linked = cell->old_dir;
if (parray_bsearch(links, tmp_file, pgFileCompareLinked) == NULL)
elog(ERROR, "--tablespace-mapping option's old directory "
"doesn't have an entry in tablespace_map file: \"%s\"",
cell->old_dir);
}
/* 2 - all linked directories must be empty */
for (i = 0; i < parray_num(links); i++)
{
pgFile *link = (pgFile *) parray_get(links, i);
const char *linked_path = link->linked;
TablespaceListCell *cell;
for (cell = tablespace_dirs.head; cell; cell = cell->next)
if (strcmp(link->linked, cell->old_dir) == 0)
{
linked_path = cell->new_dir;
break;
}
if (!is_absolute_path(linked_path))
elog(ERROR, "tablespace directory is not an absolute path: %s\n",
linked_path);
if (!dir_is_empty(linked_path))
elog(ERROR, "restore tablespace destination is not empty: \"%s\"",
linked_path);
}
free(tmp_file);
parray_walk(links, pgFileFree);
parray_free(links);
}
/*
* Print backup content list.
*/
@ -858,7 +1228,7 @@ print_file_list(FILE *out, const parray *files, const char *root)
* {"name1":"value1", "name2":"value2"}
*
* The value will be returned to "value_str" as string if it is not NULL. If it
* is NULL the value will be returned to "value_uint64" as int64.
* is NULL the value will be returned to "value_int64" as int64.
*
* Returns true if the value was found in the line.
*/
@ -1108,3 +1478,14 @@ fileExists(const char *path)
else
return true;
}
size_t
pgFileSize(const char *path)
{
struct stat buf;
if (stat(path, &buf) == -1)
elog(ERROR, "Cannot stat file \"%s\": %s", path, strerror(errno));
return buf.st_size;
}

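opt_tablespace_map() above parses OLDDIR=NEWDIR pairs, splitting at the first unescaped "=" (a "\=" sequence is treated as a literal "=" inside a path). A hypothetical restore invocation relocating one tablespace (paths made up for illustration):

    pg_probackup restore -B /backup -D /restored --instance=main \
        --tablespace-mapping=/mnt/ts_old=/mnt/ts_new

Both sides must be absolute paths, and per check_tablespace_mapping() every OLDDIR given this way must have an entry in the backup's tablespace_map file.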
src/help.c

@ -14,6 +14,7 @@ static void help_restore(void);
static void help_validate(void);
static void help_show(void);
static void help_delete(void);
static void help_merge(void);
static void help_set_config(void);
static void help_show_config(void);
static void help_add_instance(void);
@ -36,6 +37,8 @@ help_command(char *command)
help_show();
else if (strcmp(command, "delete") == 0)
help_delete();
else if (strcmp(command, "merge") == 0)
help_merge();
else if (strcmp(command, "set-config") == 0)
help_set_config();
else if (strcmp(command, "show-config") == 0)
@ -113,7 +116,7 @@ help_pg_probackup(void)
printf(_(" [-w --no-password] [-W --password]\n"));
printf(_(" [--master-db=db_name] [--master-host=host_name]\n"));
printf(_(" [--master-port=port] [--master-user=user_name]\n"));
printf(_(" [--replica-timeout=timeout]\n\n"));
printf(_(" [--replica-timeout=timeout]\n"));
printf(_("\n %s restore -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" [-D pgdata-dir] [-i backup-id] [--progress]\n"));
@ -136,6 +139,8 @@ help_pg_probackup(void)
printf(_("\n %s delete -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" [--wal] [-i backup-id | --expired]\n"));
printf(_("\n %s merge -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" -i backup-id\n"));
printf(_("\n %s add-instance -B backup-dir -D pgdata-dir\n"), PROGRAM_NAME);
printf(_(" --instance=instance_name\n"));
@ -411,6 +416,17 @@ help_delete(void)
printf(_(" available units: 'ms', 's', 'min', 'h', 'd' (default: min)\n"));
}
static void
help_merge(void)
{
printf(_("%s merge -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" -i backup-id\n\n"));
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
printf(_(" --instance=instance_name name of the instance\n"));
printf(_(" -i, --backup-id=backup-id backup to merge\n"));
}
static void
help_set_config(void)
{

src/merge.c (new file)

@ -0,0 +1,520 @@
/*-------------------------------------------------------------------------
*
* merge.c: merge FULL and incremental backups
*
* Copyright (c) 2018, Postgres Professional
*
*-------------------------------------------------------------------------
*/
#include "pg_probackup.h"
#include <sys/stat.h>
#include <unistd.h>
#include "utils/thread.h"
typedef struct
{
parray *to_files;
parray *files;
pgBackup *to_backup;
pgBackup *from_backup;
const char *to_root;
const char *from_root;
/*
* Return value from the thread.
* 0 means there is no error, 1 means there is an error.
*/
int ret;
} merge_files_arg;
static void merge_backups(pgBackup *backup, pgBackup *next_backup);
static void *merge_files(void *arg);
/*
* Implementation of MERGE command.
*
* - Find target and its parent full backup
* - Merge data files of the target, parent and intermediate backups
* - Remove unnecessary files, which no longer exist in the target backup
*/
void
do_merge(time_t backup_id)
{
parray *backups;
pgBackup *dest_backup = NULL;
pgBackup *full_backup = NULL;
time_t prev_parent = INVALID_BACKUP_ID;
int i;
int dest_backup_idx = 0;
int full_backup_idx = 0;
if (backup_id == INVALID_BACKUP_ID)
elog(ERROR, "required parameter is not specified: --backup-id");
if (instance_name == NULL)
elog(ERROR, "required parameter is not specified: --instance");
elog(LOG, "Merge started");
catalog_lock();
/* Get list of all backups sorted in order of descending start time */
backups = catalog_get_backup_list(INVALID_BACKUP_ID);
/* Find destination and parent backups */
for (i = 0; i < parray_num(backups); i++)
{
pgBackup *backup = (pgBackup *) parray_get(backups, i);
if (backup->start_time > backup_id)
continue;
else if (backup->start_time == backup_id && !dest_backup)
{
if (backup->status != BACKUP_STATUS_OK)
elog(ERROR, "Backup %s has status: %s",
base36enc(backup->start_time), status2str(backup->status));
if (backup->backup_mode == BACKUP_MODE_FULL)
elog(ERROR, "Backup %s if full backup",
base36enc(backup->start_time));
dest_backup = backup;
dest_backup_idx = i;
}
else
{
Assert(dest_backup);
if (backup->start_time != prev_parent)
continue;
if (backup->status != BACKUP_STATUS_OK)
elog(ERROR, "Skipping backup %s, because it has non-valid status: %s",
base36enc(backup->start_time), status2str(backup->status));
/* If we already found dest_backup, look for full backup */
if (dest_backup && backup->backup_mode == BACKUP_MODE_FULL)
{
if (backup->status != BACKUP_STATUS_OK)
elog(ERROR, "Parent full backup %s for the given backup %s has status: %s",
base36enc_dup(backup->start_time),
base36enc_dup(dest_backup->start_time),
status2str(backup->status));
full_backup = backup;
full_backup_idx = i;
/* Found target and full backups, so break the loop */
break;
}
}
prev_parent = backup->parent_backup;
}
if (dest_backup == NULL)
elog(ERROR, "Target backup %s was not found", base36enc(backup_id));
if (full_backup == NULL)
elog(ERROR, "Parent full backup for the given backup %s was not found",
base36enc(backup_id));
Assert(full_backup_idx != dest_backup_idx);
/*
* Found target and full backups, merge them and intermediate backups
*/
for (i = full_backup_idx; i > dest_backup_idx; i--)
{
pgBackup *to_backup = (pgBackup *) parray_get(backups, i);
pgBackup *from_backup = (pgBackup *) parray_get(backups, i - 1);
merge_backups(to_backup, from_backup);
}
/* cleanup */
parray_walk(backups, pgBackupFree);
parray_free(backups);
elog(LOG, "Merge completed");
}
/*
* Merge two backups data files using threads.
* - move instance files from from_backup to to_backup
* - remove unnecessary directories and files from to_backup
* - update metadata of from_backup, it becomes a FULL backup
*/
static void
merge_backups(pgBackup *to_backup, pgBackup *from_backup)
{
char *to_backup_id = base36enc_dup(to_backup->start_time),
*from_backup_id = base36enc_dup(from_backup->start_time);
char to_backup_path[MAXPGPATH],
to_database_path[MAXPGPATH],
from_backup_path[MAXPGPATH],
from_database_path[MAXPGPATH],
control_file[MAXPGPATH];
parray *files,
*to_files;
pthread_t *threads;
merge_files_arg *threads_args;
int i;
bool merge_isok = true;
elog(LOG, "Merging backup %s with backup %s", from_backup_id, to_backup_id);
to_backup->status = BACKUP_STATUS_MERGING;
pgBackupWriteBackupControlFile(to_backup);
from_backup->status = BACKUP_STATUS_MERGING;
pgBackupWriteBackupControlFile(from_backup);
/*
* Make backup paths.
*/
pgBackupGetPath(to_backup, to_backup_path, lengthof(to_backup_path), NULL);
pgBackupGetPath(to_backup, to_database_path, lengthof(to_database_path),
DATABASE_DIR);
pgBackupGetPath(from_backup, from_backup_path, lengthof(from_backup_path), NULL);
pgBackupGetPath(from_backup, from_database_path, lengthof(from_database_path),
DATABASE_DIR);
create_data_directories(to_database_path, from_backup_path, false);
/*
* Get list of files which will be modified or removed.
*/
pgBackupGetPath(to_backup, control_file, lengthof(control_file),
DATABASE_FILE_LIST);
to_files = dir_read_file_list(from_database_path, /* Use from_database_path
* so the root path will be
* the same as 'files' */
control_file);
/* To delete from leaf, sort in reversed order */
parray_qsort(to_files, pgFileComparePathDesc);
/*
* Get list of files which need to be moved.
*/
pgBackupGetPath(from_backup, control_file, lengthof(control_file),
DATABASE_FILE_LIST);
files = dir_read_file_list(from_database_path, control_file);
/* sort by size for load balancing */
parray_qsort(files, pgFileCompareSize);
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
threads_args = (merge_files_arg *) palloc(sizeof(merge_files_arg) * num_threads);
/* Setup threads */
for (i = 0; i < parray_num(files); i++)
{
pgFile *file = (pgFile *) parray_get(files, i);
pg_atomic_init_flag(&file->lock);
}
for (i = 0; i < num_threads; i++)
{
merge_files_arg *arg = &(threads_args[i]);
arg->to_files = to_files;
arg->files = files;
arg->to_backup = to_backup;
arg->from_backup = from_backup;
arg->to_root = to_database_path;
arg->from_root = from_database_path;
/* By default assume there is an error */
arg->ret = 1;
elog(VERBOSE, "Start thread: %d", i);
pthread_create(&threads[i], NULL, merge_files, arg);
}
/* Wait threads */
for (i = 0; i < num_threads; i++)
{
pthread_join(threads[i], NULL);
if (threads_args[i].ret == 1)
merge_isok = false;
}
if (!merge_isok)
elog(ERROR, "Data files merging failed");
/*
* Files were copied into to_backup and deleted from from_backup. Remove
* remaining directories from from_backup.
*/
parray_qsort(files, pgFileComparePathDesc);
for (i = 0; i < parray_num(files); i++)
{
pgFile *file = (pgFile *) parray_get(files, i);
if (!S_ISDIR(file->mode))
continue;
if (rmdir(file->path))
elog(ERROR, "Could not remove directory \"%s\": %s",
file->path, strerror(errno));
}
if (rmdir(from_database_path))
elog(ERROR, "Could not remove directory \"%s\": %s",
from_database_path, strerror(errno));
if (unlink(control_file))
elog(ERROR, "Could not remove file \"%s\": %s",
control_file, strerror(errno));
pgBackupGetPath(from_backup, control_file, lengthof(control_file),
BACKUP_CONTROL_FILE);
if (unlink(control_file))
elog(ERROR, "Could not remove file \"%s\": %s",
control_file, strerror(errno));
if (rmdir(from_backup_path))
elog(ERROR, "Could not remove directory \"%s\": %s",
from_backup_path, strerror(errno));
/*
* Delete files which are not in from_backup file list.
*/
for (i = 0; i < parray_num(to_files); i++)
{
pgFile *file = (pgFile *) parray_get(to_files, i);
if (parray_bsearch(files, file, pgFileComparePathDesc) == NULL)
{
pgFileDelete(file);
elog(LOG, "Deleted \"%s\"", file->path);
}
}
/*
* Rename FULL backup directory.
*/
if (rename(to_backup_path, from_backup_path) == -1)
elog(ERROR, "Could not rename directory \"%s\" to \"%s\": %s",
to_backup_path, from_backup_path, strerror(errno));
/*
* Update to_backup metadata.
*/
pgBackupCopy(to_backup, from_backup);
/* Correct metadata */
to_backup->backup_mode = BACKUP_MODE_FULL;
to_backup->status = BACKUP_STATUS_OK;
to_backup->parent_backup = INVALID_BACKUP_ID;
/* Compute summary of size of regular files in the backup */
to_backup->data_bytes = 0;
for (i = 0; i < parray_num(files); i++)
{
pgFile *file = (pgFile *) parray_get(files, i);
if (S_ISDIR(file->mode))
to_backup->data_bytes += 4096;
/* Count the amount of the data actually copied */
else if (S_ISREG(file->mode))
to_backup->data_bytes += file->write_size;
}
/* compute size of wal files of this backup stored in the archive */
if (!current.stream)
to_backup->wal_bytes = XLOG_SEG_SIZE *
(to_backup->stop_lsn / XLogSegSize - to_backup->start_lsn / XLogSegSize + 1);
else
to_backup->wal_bytes = BYTES_INVALID;
pgBackupWriteFileList(to_backup, files, from_database_path);
pgBackupWriteBackupControlFile(to_backup);
/* Cleanup */
pfree(threads_args);
pfree(threads);
parray_walk(to_files, pgFileFree);
parray_free(to_files);
parray_walk(files, pgFileFree);
parray_free(files);
pfree(to_backup_id);
pfree(from_backup_id);
}
/*
* Thread worker of merge_backups().
*/
static void *
merge_files(void *arg)
{
merge_files_arg *argument = (merge_files_arg *) arg;
pgBackup *to_backup = argument->to_backup;
pgBackup *from_backup = argument->from_backup;
char tmp_file_path[MAXPGPATH];
int i;
int to_root_len = strlen(argument->to_root);
if (to_backup->compress_alg == PGLZ_COMPRESS ||
to_backup->compress_alg == ZLIB_COMPRESS)
join_path_components(tmp_file_path, argument->to_root, "tmp");
for (i = 0; i < parray_num(argument->files); i++)
{
pgFile *file = (pgFile *) parray_get(argument->files, i);
if (!pg_atomic_test_set_flag(&file->lock))
continue;
/* check for interrupt */
if (interrupted)
elog(ERROR, "Interrupted during merging backups");
/*
* Skip files which haven't changed since previous backup. But in case
* of DELTA backup we should consider n_blocks to truncate the target
* backup.
*/
if (file->write_size == BYTES_INVALID &&
file->n_blocks == -1)
{
elog(VERBOSE, "Skip merging file \"%s\", the file didn't change",
file->path);
/*
* If the file wasn't changed in the PAGE backup, retrieve its
* write_size from previous FULL backup.
*/
if (S_ISREG(file->mode))
{
pgFile **res_file;
res_file = parray_bsearch(argument->to_files, file,
pgFileComparePathDesc);
if (res_file && *res_file)
{
file->compress_alg = (*res_file)->compress_alg;
file->write_size = (*res_file)->write_size;
file->crc = (*res_file)->crc;
}
}
continue;
}
/* Directories were created before */
if (S_ISDIR(file->mode))
continue;
/*
* Move the file. We need to decompress it and compress again if
* necessary.
*/
elog(VERBOSE, "Moving file \"%s\", is_datafile %d, is_cfs %d",
file->path, file->is_datafile, file->is_cfs);
if (file->is_datafile && !file->is_cfs)
{
char to_path_tmp[MAXPGPATH]; /* Path of target file */
join_path_components(to_path_tmp, argument->to_root,
file->path + to_root_len + 1);
/*
* We need a more complicated algorithm if the target file exists and is
* compressed.
*/
if (to_backup->compress_alg == PGLZ_COMPRESS ||
to_backup->compress_alg == ZLIB_COMPRESS)
{
char *prev_path;
/* Start the magic */
/*
* Merge files:
* - decompress first file
* - decompress second file and merge with first decompressed file
* - compress result file
*/
elog(VERBOSE, "File is compressed, decompress to the temporary file \"%s\"",
tmp_file_path);
prev_path = file->path;
/*
* We need to decompress target file only if it exists.
*/
if (fileExists(to_path_tmp))
{
/*
* file->path points to the file in the from_root directory, but we
* need the file in the to_root directory.
*/
file->path = to_path_tmp;
/* Decompress first/target file */
restore_data_file(tmp_file_path, file, false, false);
file->path = prev_path;
}
/* Merge second/source file with first/target file */
restore_data_file(tmp_file_path, file,
from_backup->backup_mode == BACKUP_MODE_DIFF_DELTA,
false);
elog(VERBOSE, "Compress file and save it to the directory \"%s\"",
argument->to_root);
/* Again we need change path */
file->path = tmp_file_path;
/* backup_data_file() requires file size to calculate nblocks */
file->size = pgFileSize(file->path);
/* Now we can compress the file */
backup_data_file(NULL, /* We shouldn't need 'arguments' here */
to_path_tmp, file,
to_backup->start_lsn,
to_backup->backup_mode,
to_backup->compress_alg,
to_backup->compress_level);
file->path = prev_path;
/* We can remove temporary file now */
if (unlink(tmp_file_path))
elog(ERROR, "Could not remove temporary file \"%s\": %s",
tmp_file_path, strerror(errno));
}
/*
* Otherwise the merging algorithm is simpler.
*/
else
{
/* We can merge in-place here */
restore_data_file(to_path_tmp, file,
from_backup->backup_mode == BACKUP_MODE_DIFF_DELTA,
true);
/*
* We need to calculate write_size, restore_data_file() doesn't
* do that.
*/
file->write_size = pgFileSize(to_path_tmp);
file->crc = pgFileGetCRC(to_path_tmp);
}
pgFileDelete(file);
}
else
move_file(argument->from_root, argument->to_root, file);
if (file->write_size != BYTES_INVALID)
elog(LOG, "Moved file \"%s\": " INT64_FORMAT " bytes",
file->path, file->write_size);
}
/* Data files merging is successful */
argument->ret = 0;
return NULL;
}

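For compressed data files, merge_files() above cannot overlay pages in place, because each stored page is compressed and framed by a BackupPageHeader. Its decompress-overlay-recompress path condenses to the following (error handling elided, names as in the diff):

    /* 1. Decompress the target (FULL) copy, if one exists, into a temp file */
    if (fileExists(to_path_tmp))
    {
        file->path = to_path_tmp;
        restore_data_file(tmp_file_path, file, false, false);
        file->path = prev_path;
    }

    /* 2. Overlay the source (incremental) pages onto the same temp file */
    restore_data_file(tmp_file_path, file,
                      from_backup->backup_mode == BACKUP_MODE_DIFF_DELTA,
                      false);

    /* 3. Recompress the merged result into the target backup */
    file->path = tmp_file_path;
    file->size = pgFileSize(file->path);
    backup_data_file(NULL, to_path_tmp, file,
                     to_backup->start_lsn, to_backup->backup_mode,
                     to_backup->compress_alg, to_backup->compress_level);
    file->path = prev_path;
    unlink(tmp_file_path);

In the uncompressed case the overlay happens directly on the target file with write_header=true, which is why restore_data_file() learned to emit the per-page headers it normally strips.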
src/pg_probackup.c

@ -38,7 +38,7 @@ char backup_instance_path[MAXPGPATH];
char arclog_path[MAXPGPATH] = "";
/* common options */
char *backup_id_string_param = NULL;
static char *backup_id_string = NULL;
int num_threads = 1;
bool stream_wal = false;
bool progress = false;
@ -125,7 +125,7 @@ static pgut_option options[] =
{ 'u', 'j', "threads", &num_threads, SOURCE_CMDLINE },
{ 'b', 2, "stream", &stream_wal, SOURCE_CMDLINE },
{ 'b', 3, "progress", &progress, SOURCE_CMDLINE },
{ 's', 'i', "backup-id", &backup_id_string_param, SOURCE_CMDLINE },
{ 's', 'i', "backup-id", &backup_id_string, SOURCE_CMDLINE },
/* backup options */
{ 'b', 10, "backup-pg-log", &backup_logs, SOURCE_CMDLINE },
{ 'f', 'b', "backup-mode", opt_backup_mode, SOURCE_CMDLINE },
@ -206,7 +206,7 @@ main(int argc, char *argv[])
int rc;
/* initialize configuration */
pgBackup_init(&current);
pgBackupInit(&current);
PROGRAM_NAME = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], "pgscripts");
@ -235,10 +235,12 @@ main(int argc, char *argv[])
backup_subcmd = RESTORE_CMD;
else if (strcmp(argv[1], "validate") == 0)
backup_subcmd = VALIDATE_CMD;
else if (strcmp(argv[1], "show") == 0)
backup_subcmd = SHOW_CMD;
else if (strcmp(argv[1], "delete") == 0)
backup_subcmd = DELETE_CMD;
else if (strcmp(argv[1], "merge") == 0)
backup_subcmd = MERGE_CMD;
else if (strcmp(argv[1], "show") == 0)
backup_subcmd = SHOW_CMD;
else if (strcmp(argv[1], "set-config") == 0)
backup_subcmd = SET_CONFIG_CMD;
else if (strcmp(argv[1], "show-config") == 0)
@ -281,7 +283,8 @@ main(int argc, char *argv[])
if (backup_subcmd == BACKUP_CMD ||
backup_subcmd == RESTORE_CMD ||
backup_subcmd == VALIDATE_CMD ||
backup_subcmd == DELETE_CMD)
backup_subcmd == DELETE_CMD ||
backup_subcmd == MERGE_CMD)
{
int i,
len = 0,
@ -347,7 +350,8 @@ main(int argc, char *argv[])
}
/* Option --instance is required for all commands except init and show */
if (backup_subcmd != INIT_CMD && backup_subcmd != SHOW_CMD && backup_subcmd != VALIDATE_CMD)
if (backup_subcmd != INIT_CMD && backup_subcmd != SHOW_CMD &&
backup_subcmd != VALIDATE_CMD)
{
if (instance_name == NULL)
elog(ERROR, "required parameter not specified: --instance");
@ -359,7 +363,8 @@ main(int argc, char *argv[])
*/
if (instance_name)
{
sprintf(backup_instance_path, "%s/%s/%s", backup_path, BACKUPS_DIR, instance_name);
sprintf(backup_instance_path, "%s/%s/%s",
backup_path, BACKUPS_DIR, instance_name);
sprintf(arclog_path, "%s/%s/%s", backup_path, "wal", instance_name);
/*
@ -402,18 +407,19 @@ main(int argc, char *argv[])
elog(ERROR, "-D, --pgdata must be an absolute path");
/* Sanity check of --backup-id option */
if (backup_id_string_param != NULL)
if (backup_id_string != NULL)
{
if (backup_subcmd != RESTORE_CMD
&& backup_subcmd != VALIDATE_CMD
&& backup_subcmd != DELETE_CMD
&& backup_subcmd != SHOW_CMD)
elog(ERROR, "Cannot use -i (--backup-id) option together with the '%s' command",
argv[1]);
if (backup_subcmd != RESTORE_CMD &&
backup_subcmd != VALIDATE_CMD &&
backup_subcmd != DELETE_CMD &&
backup_subcmd != MERGE_CMD &&
backup_subcmd != SHOW_CMD)
elog(ERROR, "Cannot use -i (--backup-id) option together with the \"%s\" command",
command_name);
current.backup_id = base36dec(backup_id_string_param);
current.backup_id = base36dec(backup_id_string);
if (current.backup_id == 0)
elog(ERROR, "Invalid backup-id");
elog(ERROR, "Invalid backup-id \"%s\"", backup_id_string);
}
/* Setup stream options. They are used in streamutil.c. */
@ -490,16 +496,19 @@ main(int argc, char *argv[])
case SHOW_CMD:
return do_show(current.backup_id);
case DELETE_CMD:
if (delete_expired && backup_id_string_param)
if (delete_expired && backup_id_string)
elog(ERROR, "You cannot specify --delete-expired and --backup-id options together");
if (!delete_expired && !delete_wal && !backup_id_string_param)
if (!delete_expired && !delete_wal && !backup_id_string)
elog(ERROR, "You must specify at least one of the delete options: --expired |--wal |--backup_id");
if (delete_wal && !delete_expired && !backup_id_string_param)
if (delete_wal && !delete_expired && !backup_id_string)
return do_retention_purge();
if (delete_expired)
return do_retention_purge();
else
return do_delete(current.backup_id);
case MERGE_CMD:
do_merge(current.backup_id);
break;
case SHOW_CONFIG_CMD:
return do_configure(true);
case SET_CONFIG_CMD:

src/pg_probackup.h

@ -122,8 +122,9 @@ typedef enum BackupStatus
{
BACKUP_STATUS_INVALID, /* the pgBackup is invalid */
BACKUP_STATUS_OK, /* completed backup */
BACKUP_STATUS_RUNNING, /* running backup */
BACKUP_STATUS_ERROR, /* aborted because of unexpected error */
BACKUP_STATUS_RUNNING, /* running backup */
BACKUP_STATUS_MERGING, /* merging backups */
BACKUP_STATUS_DELETING, /* data files are being deleted */
BACKUP_STATUS_DELETED, /* data files have been deleted */
BACKUP_STATUS_DONE, /* completed but not validated yet */
@ -144,15 +145,16 @@ typedef enum ProbackupSubcmd
{
NO_CMD = 0,
INIT_CMD,
ARCHIVE_PUSH_CMD,
ARCHIVE_GET_CMD,
ADD_INSTANCE_CMD,
DELETE_INSTANCE_CMD,
ARCHIVE_PUSH_CMD,
ARCHIVE_GET_CMD,
BACKUP_CMD,
RESTORE_CMD,
VALIDATE_CMD,
SHOW_CMD,
DELETE_CMD,
MERGE_CMD,
SHOW_CMD,
SET_CONFIG_CMD,
SHOW_CONFIG_CMD
} ProbackupSubcmd;
@ -418,7 +420,8 @@ extern pgRecoveryTarget *parseRecoveryTargetOptions(
bool target_immediate, const char *target_name,
const char *target_action, bool restore_no_validate);
extern void opt_tablespace_map(pgut_option *opt, const char *arg);
/* in merge.c */
extern void do_merge(time_t backup_id);
/* in init.c */
extern int do_init(void);
@ -470,10 +473,15 @@ extern pgBackup *catalog_get_last_data_backup(parray *backup_list,
extern void catalog_lock(void);
extern void pgBackupWriteControl(FILE *out, pgBackup *backup);
extern void pgBackupWriteBackupControlFile(pgBackup *backup);
extern void pgBackupWriteFileList(pgBackup *backup, parray *files,
const char *root);
extern void pgBackupGetPath(const pgBackup *backup, char *path, size_t len, const char *subdir);
extern void pgBackupGetPath2(const pgBackup *backup, char *path, size_t len,
const char *subdir1, const char *subdir2);
extern int pgBackupCreateDir(pgBackup *backup);
extern void pgBackupInit(pgBackup *backup);
extern void pgBackupCopy(pgBackup *dst, pgBackup *src);
extern void pgBackupFree(void *backup);
extern int pgBackupCompareId(const void *f1, const void *f2);
extern int pgBackupCompareIdDesc(const void *f1, const void *f2);
@ -481,10 +489,13 @@ extern int pgBackupCompareIdDesc(const void *f1, const void *f2);
/* in dir.c */
extern void dir_list_file(parray *files, const char *root, bool exclude,
bool omit_symlink, bool add_root);
extern void list_data_directories(parray *files, const char *path,
bool is_root, bool exclude);
extern void create_data_directories(const char *data_dir,
const char *backup_dir,
bool extract_tablespaces);
extern void read_tablespace_map(parray *files, const char *backup_dir);
extern void opt_tablespace_map(pgut_option *opt, const char *arg);
extern void check_tablespace_mapping(pgBackup *backup);
extern void print_file_list(FILE *out, const parray *files, const char *root);
extern parray *dir_read_file_list(const char *root, const char *file_txt);
@ -493,12 +504,13 @@ extern int dir_create_dir(const char *path, mode_t mode);
extern bool dir_is_empty(const char *path);
extern bool fileExists(const char *path);
extern size_t pgFileSize(const char *path);
extern pgFile *pgFileNew(const char *path, bool omit_symlink);
extern pgFile *pgFileInit(const char *path);
extern void pgFileDelete(pgFile *file);
extern void pgFileFree(void *file);
extern pg_crc32 pgFileGetCRC(pgFile *file);
extern pg_crc32 pgFileGetCRC(const char *file_path);
extern int pgFileComparePath(const void *f1, const void *f2);
extern int pgFileComparePathDesc(const void *f1, const void *f2);
extern int pgFileCompareLinked(const void *f1, const void *f2);
@ -506,13 +518,15 @@ extern int pgFileCompareSize(const void *f1, const void *f2);
/* in data.c */
extern bool backup_data_file(backup_files_arg* arguments,
const char *from_root, const char *to_root,
pgFile *file, XLogRecPtr prev_backup_start_lsn,
BackupMode backup_mode);
extern void restore_data_file(const char *from_root, const char *to_root,
pgFile *file, pgBackup *backup);
extern bool copy_file(const char *from_root, const char *to_root,
pgFile *file);
const char *to_path, pgFile *file,
XLogRecPtr prev_backup_start_lsn,
BackupMode backup_mode,
CompressAlg calg, int clevel);
extern void restore_data_file(const char *to_path,
pgFile *file, bool allow_truncate,
bool write_header);
extern bool copy_file(const char *from_root, const char *to_root, pgFile *file);
extern void move_file(const char *from_root, const char *to_root, pgFile *file);
extern void push_wal_file(const char *from_path, const char *to_path,
bool is_compress, bool overwrite);
extern void get_wal_file(const char *from_path, const char *to_path);
@ -553,7 +567,6 @@ extern uint64 get_system_identifier(char *pgdata);
extern uint64 get_remote_system_identifier(PGconn *conn);
extern pg_time_t timestamptz_to_time_t(TimestampTz t);
extern int parse_server_version(char *server_version_str);
extern void pgBackup_init(pgBackup *backup);
/* in status.c */
extern bool is_pg_running(void);

src/restore.c

@ -32,50 +32,12 @@ typedef struct
int ret;
} restore_files_arg;
/* Tablespace mapping structures */
typedef struct TablespaceListCell
{
struct TablespaceListCell *next;
char old_dir[MAXPGPATH];
char new_dir[MAXPGPATH];
} TablespaceListCell;
typedef struct TablespaceList
{
TablespaceListCell *head;
TablespaceListCell *tail;
} TablespaceList;
typedef struct TablespaceCreatedListCell
{
struct TablespaceCreatedListCell *next;
char link_name[MAXPGPATH];
char linked_dir[MAXPGPATH];
} TablespaceCreatedListCell;
typedef struct TablespaceCreatedList
{
TablespaceCreatedListCell *head;
TablespaceCreatedListCell *tail;
} TablespaceCreatedList;
static void restore_backup(pgBackup *backup);
static void restore_directories(const char *pg_data_dir,
const char *backup_dir);
static void check_tablespace_mapping(pgBackup *backup);
static void create_recovery_conf(time_t backup_id,
pgRecoveryTarget *rt,
pgBackup *backup);
static void *restore_files(void *arg);
static void remove_deleted_files(pgBackup *backup);
static const char *get_tablespace_mapping(const char *dir);
static void set_tablespace_created(const char *link, const char *dir);
static const char *get_tablespace_created(const char *link);
/* Tablespace mapping */
static TablespaceList tablespace_dirs = {NULL, NULL};
static TablespaceCreatedList tablespace_created_dirs = {NULL, NULL};
/*
@ -115,8 +77,6 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt,
catalog_lock();
/* Get list of all backups sorted in order of descending start time */
backups = catalog_get_backup_list(INVALID_BACKUP_ID);
if (backups == NULL)
elog(ERROR, "Failed to get backup list.");
/* Find backup range we should restore or validate. */
for (i = 0; i < parray_num(backups); i++)
@ -256,6 +216,7 @@ do_restore_or_validate(time_t target_backup_id, pgRecoveryTarget *rt,
for (i = base_full_backup_index; i >= dest_backup_index; i--)
{
pgBackup *backup = (pgBackup *) parray_get(backups, i);
pgBackupValidate(backup);
/* Maybe we should be more paranoid and check for !BACKUP_STATUS_OK? */
if (backup->status == BACKUP_STATUS_CORRUPT)
@ -405,7 +366,7 @@ restore_backup(pgBackup *backup)
* this_backup_path = $BACKUP_PATH/backups/instance_name/backup_id
*/
pgBackupGetPath(backup, this_backup_path, lengthof(this_backup_path), NULL);
restore_directories(pgdata, this_backup_path);
create_data_directories(pgdata, this_backup_path, true);
/*
* Get list of files which need to be restored.
@ -506,212 +467,6 @@ remove_deleted_files(pgBackup *backup)
parray_free(files_restored);
}
/*
* Restore backup directories from **backup_database_dir** to **pg_data_dir**.
*
* TODO: Think about simplification and clarity of the function.
*/
static void
restore_directories(const char *pg_data_dir, const char *backup_dir)
{
parray *dirs,
*links;
size_t i;
char backup_database_dir[MAXPGPATH],
to_path[MAXPGPATH];
dirs = parray_new();
links = parray_new();
join_path_components(backup_database_dir, backup_dir, DATABASE_DIR);
list_data_directories(dirs, backup_database_dir, true, false);
read_tablespace_map(links, backup_dir);
elog(LOG, "restore directories and symlinks...");
for (i = 0; i < parray_num(dirs); i++)
{
pgFile *dir = (pgFile *) parray_get(dirs, i);
char *relative_ptr = GetRelativePath(dir->path, backup_database_dir);
Assert(S_ISDIR(dir->mode));
/* First try to create symlink and linked directory */
if (path_is_prefix_of_path(PG_TBLSPC_DIR, relative_ptr))
{
char *link_ptr = GetRelativePath(relative_ptr, PG_TBLSPC_DIR),
*link_sep,
*tmp_ptr;
char link_name[MAXPGPATH];
pgFile **link;
/* Extract link name from relative path */
link_sep = first_dir_separator(link_ptr);
if (link_sep != NULL)
{
int len = link_sep - link_ptr;
strncpy(link_name, link_ptr, len);
link_name[len] = '\0';
}
else
goto create_directory;
tmp_ptr = dir->path;
dir->path = link_name;
/* Search only by symlink name without path */
link = (pgFile **) parray_bsearch(links, dir, pgFileComparePath);
dir->path = tmp_ptr;
if (link)
{
const char *linked_path = get_tablespace_mapping((*link)->linked);
const char *dir_created;
if (!is_absolute_path(linked_path))
elog(ERROR, "tablespace directory is not an absolute path: %s\n",
linked_path);
/* Check if linked directory was created earlier */
dir_created = get_tablespace_created(link_name);
if (dir_created)
{
/*
* If the symlink and linked directory were already
* created, do not create them a second time.
*/
if (strcmp(dir_created, linked_path) == 0)
{
/*
* Create the rest of the directories.
* First check whether there is any directory name
* after the separator.
*/
if (link_sep != NULL && *(link_sep + 1) != '\0')
goto create_directory;
else
continue;
}
else
elog(ERROR, "tablespace directory \"%s\" of page backup does not "
"match the previously created tablespace directory \"%s\" of symlink \"%s\"",
linked_path, dir_created, link_name);
}
if (link_sep)
elog(LOG, "create directory \"%s\" and symbolic link \"%.*s\"",
linked_path,
(int) (link_sep - relative_ptr), relative_ptr);
else
elog(LOG, "create directory \"%s\" and symbolic link \"%s\"",
linked_path, relative_ptr);
/* Firstly, create linked directory */
dir_create_dir(linked_path, DIR_PERMISSION);
join_path_components(to_path, pg_data_dir, PG_TBLSPC_DIR);
/* Create pg_tblspc directory just in case */
dir_create_dir(to_path, DIR_PERMISSION);
/* Secondly, create link */
join_path_components(to_path, to_path, link_name);
if (symlink(linked_path, to_path) < 0)
elog(ERROR, "could not create symbolic link \"%s\": %s",
to_path, strerror(errno));
/* Save linked directory */
set_tablespace_created(link_name, linked_path);
/*
* Create the rest of the directories.
* First check whether there is any directory name after the separator.
*/
if (link_sep != NULL && *(link_sep + 1) != '\0')
goto create_directory;
continue;
}
}
create_directory:
elog(LOG, "create directory \"%s\"", relative_ptr);
/* This is not symlink, create directory */
join_path_components(to_path, pg_data_dir, relative_ptr);
dir_create_dir(to_path, DIR_PERMISSION);
}
parray_walk(links, pgFileFree);
parray_free(links);
parray_walk(dirs, pgFileFree);
parray_free(dirs);
}
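For clarity, here is a minimal standalone sketch of the link-name extraction performed above: everything before the first separator in the path relative to pg_tblspc is the symlink name. The strchr() call stands in for first_dir_separator(), and the sample path is illustrative, not taken from a real backup.

#include <stdio.h>
#include <string.h>

int
main(void)
{
    /* path relative to pg_tblspc, e.g. "<link>/<version dir>/..." */
    const char *link_ptr = "16384/PG_10_201707211/12345";
    const char *link_sep = strchr(link_ptr, '/');   /* first_dir_separator() stand-in */
    char        link_name[64];

    if (link_sep != NULL)
    {
        int         len = (int) (link_sep - link_ptr);

        strncpy(link_name, link_ptr, len);
        link_name[len] = '\0';
    }
    else
        strcpy(link_name, link_ptr);    /* the whole path is the link name */

    printf("symlink name: %s\n", link_name);    /* prints 16384 */
    return 0;
}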
/*
* Check that all tablespace mapping entries have correct linked directory
* paths. Linked directories must be empty or must not exist.
*
* If the tablespace-mapping option is supplied, all OLDDIR entries must have
* entries in the tablespace_map file.
*/
static void
check_tablespace_mapping(pgBackup *backup)
{
char this_backup_path[MAXPGPATH];
parray *links;
size_t i;
TablespaceListCell *cell;
pgFile *tmp_file = pgut_new(pgFile);
links = parray_new();
pgBackupGetPath(backup, this_backup_path, lengthof(this_backup_path), NULL);
read_tablespace_map(links, this_backup_path);
if (log_level_console <= LOG || log_level_file <= LOG)
elog(LOG, "check tablespace directories of backup %s",
base36enc(backup->start_time));
/* 1 - each OLDDIR must have an entry in tablespace_map file (links) */
for (cell = tablespace_dirs.head; cell; cell = cell->next)
{
tmp_file->linked = cell->old_dir;
if (parray_bsearch(links, tmp_file, pgFileCompareLinked) == NULL)
elog(ERROR, "--tablespace-mapping option's old directory "
"doesn't have an entry in tablespace_map file: \"%s\"",
cell->old_dir);
}
/* 2 - all linked directories must be empty */
for (i = 0; i < parray_num(links); i++)
{
pgFile *link = (pgFile *) parray_get(links, i);
const char *linked_path = link->linked;
TablespaceListCell *cell;
for (cell = tablespace_dirs.head; cell; cell = cell->next)
if (strcmp(link->linked, cell->old_dir) == 0)
{
linked_path = cell->new_dir;
break;
}
if (!is_absolute_path(linked_path))
elog(ERROR, "tablespace directory is not an absolute path: %s\n",
linked_path);
if (!dir_is_empty(linked_path))
elog(ERROR, "restore tablespace destination is not empty: \"%s\"",
linked_path);
}
free(tmp_file);
parray_walk(links, pgFileFree);
parray_free(links);
}
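A note on the parray_bsearch() calls above: like the C library's bsearch(), they only return a hit if the array is kept sorted with the same comparator that the search uses. A minimal standalone sketch of the pattern (the sample paths are illustrative):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static int
compare_path(const void *a, const void *b)
{
    return strcmp(*(const char *const *) a, *(const char *const *) b);
}

int
main(void)
{
    const char *paths[] = {"base/1/1259", "base/1/1249", "global/pg_control"};
    size_t      n = sizeof(paths) / sizeof(paths[0]);
    const char *key = "base/1/1259";
    const char **found;

    /* sort first; bsearch on an unsorted array returns garbage */
    qsort(paths, n, sizeof(paths[0]), compare_path);
    found = bsearch(&key, paths, n, sizeof(paths[0]), compare_path);
    printf("%s\n", found ? *found : "not found");
    return 0;
}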
/*
* Restore files into $PGDATA.
*/
@@ -743,7 +498,6 @@ restore_files(void *arg)
elog(LOG, "Progress: (%d/%lu). Process file %s ",
i + 1, (unsigned long) parray_num(arguments->files), rel_path);
/*
* For PAGE and PTRACK backups skip files which haven't changed
* since previous backup and thus were not backed up.
@@ -754,11 +508,11 @@ restore_files(void *arg)
(arguments->backup->backup_mode == BACKUP_MODE_DIFF_PAGE
|| arguments->backup->backup_mode == BACKUP_MODE_DIFF_PTRACK))
{
elog(VERBOSE, "The file didn`t changed. Skip restore: %s", file->path);
elog(VERBOSE, "The file didn`t change. Skip restore: %s", file->path);
continue;
}
/* Directories was created before */
/* Directories were created before */
if (S_ISDIR(file->mode))
{
elog(VERBOSE, "directory, skip");
@@ -778,9 +532,18 @@ restore_files(void *arg)
* block and have BackupPageHeader meta information, so we cannot just
* copy the file from backup.
*/
elog(VERBOSE, "Restoring file %s, is_datafile %i, is_cfs %i", file->path, file->is_datafile?1:0, file->is_cfs?1:0);
elog(VERBOSE, "Restoring file %s, is_datafile %i, is_cfs %i",
file->path, file->is_datafile?1:0, file->is_cfs?1:0);
if (file->is_datafile && !file->is_cfs)
restore_data_file(from_root, pgdata, file, arguments->backup);
{
char to_path[MAXPGPATH];
join_path_components(to_path, pgdata,
file->path + strlen(from_root) + 1);
restore_data_file(to_path, file,
arguments->backup->backup_mode == BACKUP_MODE_DIFF_DELTA,
false);
}
else
copy_file(from_root, pgdata, file);
@@ -1151,115 +914,3 @@ parseRecoveryTargetOptions(const char *target_time,
return rt;
}
/*
* Split argument into old_dir and new_dir and append to tablespace mapping
* list.
*
* Copy of function tablespace_list_append() from pg_basebackup.c.
*/
void
opt_tablespace_map(pgut_option *opt, const char *arg)
{
TablespaceListCell *cell = pgut_new(TablespaceListCell);
char *dst;
char *dst_ptr;
const char *arg_ptr;
dst_ptr = dst = cell->old_dir;
for (arg_ptr = arg; *arg_ptr; arg_ptr++)
{
if (dst_ptr - dst >= MAXPGPATH)
elog(ERROR, "directory name too long");
if (*arg_ptr == '\\' && *(arg_ptr + 1) == '=')
; /* skip backslash escaping = */
else if (*arg_ptr == '=' && (arg_ptr == arg || *(arg_ptr - 1) != '\\'))
{
if (*cell->new_dir)
elog(ERROR, "multiple \"=\" signs in tablespace mapping\n");
else
dst = dst_ptr = cell->new_dir;
}
else
*dst_ptr++ = *arg_ptr;
}
if (!*cell->old_dir || !*cell->new_dir)
elog(ERROR, "invalid tablespace mapping format \"%s\", "
"must be \"OLDDIR=NEWDIR\"", arg);
/*
* This check isn't absolutely necessary. But all tablespaces are created
* with absolute directories, so specifying a non-absolute path here would
* just never match, possibly confusing users. It's also good to be
* consistent with the new_dir check.
*/
if (!is_absolute_path(cell->old_dir))
elog(ERROR, "old directory is not an absolute path in tablespace mapping: %s\n",
cell->old_dir);
if (!is_absolute_path(cell->new_dir))
elog(ERROR, "new directory is not an absolute path in tablespace mapping: %s\n",
cell->new_dir);
if (tablespace_dirs.tail)
tablespace_dirs.tail->next = cell;
else
tablespace_dirs.head = cell;
tablespace_dirs.tail = cell;
}
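To make the escaping rule concrete, here is a self-contained sketch of the same OLDDIR=NEWDIR split: '=' separates the two paths unless it is written as '\='. The buffer size, sample mapping, and error handling are illustrative, not the pgut option API.

#include <stdio.h>
#include <string.h>

#define PATHLEN 1024

int
main(void)
{
    const char *arg = "/ts/old\\=dir=/ts/new";  /* OLDDIR itself contains '=' */
    char        old_dir[PATHLEN] = "";
    char        new_dir[PATHLEN] = "";
    char       *dst = old_dir;
    char       *dst_ptr = old_dir;
    const char *p;

    for (p = arg; *p; p++)
    {
        if (dst_ptr - dst >= PATHLEN - 1)
        {
            fprintf(stderr, "directory name too long\n");
            return 1;
        }
        if (*p == '\\' && *(p + 1) == '=')
            ;                   /* skip the backslash escaping '=' */
        else if (*p == '=' && (p == arg || *(p - 1) != '\\'))
            dst = dst_ptr = new_dir;    /* switch target to NEWDIR */
        else
            *dst_ptr++ = *p;
    }
    /* arrays were zero-initialized, so both strings are NUL-terminated */
    printf("old_dir = %s\nnew_dir = %s\n", old_dir, new_dir);
    return 0;
}

Running this prints old_dir = /ts/old=dir and new_dir = /ts/new, showing why the escape exists: without it, a tablespace path containing '=' could not be mapped.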
/*
* Retrieve tablespace path, either relocated or original depending on whether
* -T was passed or not.
*
* Copy of function get_tablespace_mapping() from pg_basebackup.c.
*/
static const char *
get_tablespace_mapping(const char *dir)
{
TablespaceListCell *cell;
for (cell = tablespace_dirs.head; cell; cell = cell->next)
if (strcmp(dir, cell->old_dir) == 0)
return cell->new_dir;
return dir;
}
/*
* Save the created directory path in memory. We can use it during the next
* page restore to avoid raising the error "restore tablespace destination
* is not empty" in restore_directories().
*/
static void
set_tablespace_created(const char *link, const char *dir)
{
TablespaceCreatedListCell *cell = pgut_new(TablespaceCreatedListCell);
strcpy(cell->link_name, link);
strcpy(cell->linked_dir, dir);
cell->next = NULL;
if (tablespace_created_dirs.tail)
tablespace_created_dirs.tail->next = cell;
else
tablespace_created_dirs.head = cell;
tablespace_created_dirs.tail = cell;
}
/*
* Return the linked directory if it was created when the symlink was
* created in restore_directories(), otherwise NULL.
*/
static const char *
get_tablespace_created(const char *link)
{
TablespaceCreatedListCell *cell;
for (cell = tablespace_created_dirs.head; cell; cell = cell->next)
if (strcmp(link, cell->link_name) == 0)
return cell->linked_dir;
return NULL;
}
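set_tablespace_created() above uses the conventional head/tail append so the list preserves insertion order without walking it on every add. A standalone sketch of that pattern, with illustrative stand-ins for TablespaceCreatedListCell and TablespaceCreatedList:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct Cell
{
    struct Cell *next;
    char        link_name[64];
} Cell;

typedef struct
{
    Cell       *head;
    Cell       *tail;
} List;

static void
list_append(List *list, const char *name)
{
    Cell       *cell = calloc(1, sizeof(Cell));

    if (cell == NULL)
        abort();                /* out of memory */
    strncpy(cell->link_name, name, sizeof(cell->link_name) - 1);
    /* append at the tail so iteration preserves insertion order */
    if (list->tail)
        list->tail->next = cell;
    else
        list->head = cell;
    list->tail = cell;
}

int
main(void)
{
    List        list = {NULL, NULL};
    Cell       *c;

    list_append(&list, "16384");
    list_append(&list, "16385");
    for (c = list.head; c; c = c->next)
        printf("%s\n", c->link_name);
    return 0;
}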


@@ -247,8 +247,6 @@ show_instance(time_t requested_backup_id, bool show_name)
parray *backup_list;
backup_list = catalog_get_backup_list(requested_backup_id);
if (backup_list == NULL)
elog(ERROR, "Failed to get backup list.");
if (show_format == SHOW_PLAIN)
show_instance_plain(backup_list, show_name);


@@ -271,8 +271,9 @@ status2str(BackupStatus status)
{
"UNKNOWN",
"OK",
"RUNNING",
"ERROR",
"RUNNING",
"MERGING",
"DELETING",
"DELETED",
"DONE",
@@ -322,36 +323,3 @@ remove_not_digit(char *buf, size_t len, const char *str)
}
buf[j] = '\0';
}
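The status2str() hunk above apparently moves "RUNNING" after "ERROR" and inserts "MERGING": the string table must stay index-aligned with the BackupStatus enum, which gains a merging state in this commit. A sketch of that assumed alignment (the enum order here mirrors the strings and is an assumption, not a copy of the project's header):

#include <stdio.h>

typedef enum BackupStatus
{
    BACKUP_STATUS_INVALID = 0,  /* "UNKNOWN" */
    BACKUP_STATUS_OK,           /* "OK" */
    BACKUP_STATUS_ERROR,        /* "ERROR" */
    BACKUP_STATUS_RUNNING,      /* "RUNNING" */
    BACKUP_STATUS_MERGING,      /* "MERGING" */
    BACKUP_STATUS_DELETING      /* "DELETING" */
} BackupStatus;

static const char *statuses[] =
{"UNKNOWN", "OK", "ERROR", "RUNNING", "MERGING", "DELETING"};

int
main(void)
{
    /* status2str() is essentially an index into the parallel array */
    printf("%s\n", statuses[BACKUP_STATUS_MERGING]);    /* prints MERGING */
    return 0;
}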
/* Fill pgBackup struct with default values */
void
pgBackup_init(pgBackup *backup)
{
backup->backup_id = INVALID_BACKUP_ID;
backup->backup_mode = BACKUP_MODE_INVALID;
backup->status = BACKUP_STATUS_INVALID;
backup->tli = 0;
backup->start_lsn = 0;
backup->stop_lsn = 0;
backup->start_time = (time_t) 0;
backup->end_time = (time_t) 0;
backup->recovery_xid = 0;
backup->recovery_time = (time_t) 0;
backup->data_bytes = BYTES_INVALID;
backup->wal_bytes = BYTES_INVALID;
backup->compress_alg = COMPRESS_ALG_DEFAULT;
backup->compress_level = COMPRESS_LEVEL_DEFAULT;
backup->block_size = BLCKSZ;
backup->wal_block_size = XLOG_BLCKSZ;
backup->checksum_version = 0;
backup->stream = false;
backup->from_replica = false;
backup->parent_backup = 0;
backup->primary_conninfo = NULL;
backup->program_version[0] = '\0';
backup->server_version[0] = '\0';
}


@@ -195,7 +195,7 @@ pgBackupValidateFiles(void *arg)
break;
}
crc = pgFileGetCRC(file);
crc = pgFileGetCRC(file->path);
if (crc != file->crc)
{
elog(WARNING, "Invalid CRC of backup file \"%s\" : %X. Expected %X",
@@ -290,8 +290,6 @@ do_validate_instance(void)
/* Get list of all backups sorted in order of descending start time */
backups = catalog_get_backup_list(INVALID_BACKUP_ID);
if (backups == NULL)
elog(ERROR, "Failed to get backup list.");
/* Validate each backup along with its xlog files. */
for (i = 0; i < parray_num(backups); i++)


@@ -50,7 +50,6 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database.
[--master-port=port] [--master-user=user_name]
[--replica-timeout=timeout]
pg_probackup restore -B backup-dir --instance=instance_name
[-D pgdata-dir] [-i backup-id] [--progress]
[--time=time|--xid=xid|--lsn=lsn [--inclusive=boolean]]
@@ -73,6 +72,9 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database.
pg_probackup delete -B backup-dir --instance=instance_name
[--wal] [-i backup-id | --expired]
pg_probackup merge -B backup-dir --instance=instance_name
-i backup-id
pg_probackup add-instance -B backup-dir -D pgdata-dir
--instance=instance_name
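As a usage sketch, merging an incremental backup into its parent FULL chain follows the syntax added above (the backup directory and backup ID here are hypothetical):

pg_probackup merge -B /mnt/backups --instance=node -i QZPG5T

The tests below exercise exactly this flow: take a FULL backup and one or more PAGE backups, merge, and verify that a single FULL backup remains.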


@@ -614,6 +614,16 @@ class ProbackupTest(object):
return self.run_pb(cmd_list + options, async, gdb)
def merge_backup(self, backup_dir, instance, backup_id):
cmd_list = [
"merge",
"-B", backup_dir,
"--instance={0}".format(instance),
"-i", backup_id
]
return self.run_pb(cmd_list)
def restore_node(
self, backup_dir, instance, node=False,
data_dir=None, backup_id=None, options=[]

tests/merge.py (new file, 454 lines)

@@ -0,0 +1,454 @@
# coding: utf-8
import unittest
import os
from .helpers.ptrack_helpers import ProbackupTest
module_name = "merge"
class MergeTest(ProbackupTest, unittest.TestCase):
def test_merge_full_page(self):
"""
Test the MERGE command: it merges the FULL backup with the target PAGE backups
"""
fname = self.id().split(".")[3]
backup_dir = os.path.join(self.tmp_path, module_name, fname, "backup")
# Initialize instance and backup directory
node = self.make_simple_node(
base_dir="{0}/{1}/node".format(module_name, fname),
initdb_params=["--data-checksums"]
)
self.init_pb(backup_dir)
self.add_instance(backup_dir, "node", node)
self.set_archiving(backup_dir, "node", node)
node.start()
# Do full backup
self.backup_node(backup_dir, "node", node)
show_backup = self.show_pb(backup_dir, "node")[0]
self.assertEqual(show_backup["status"], "OK")
self.assertEqual(show_backup["backup-mode"], "FULL")
# Fill with data
with node.connect() as conn:
conn.execute("create table test (id int)")
conn.execute(
"insert into test select i from generate_series(1,10) s(i)")
conn.commit()
# Do first page backup
self.backup_node(backup_dir, "node", node, backup_type="page")
show_backup = self.show_pb(backup_dir, "node")[1]
# sanity check
self.assertEqual(show_backup["status"], "OK")
self.assertEqual(show_backup["backup-mode"], "PAGE")
# Fill with data
with node.connect() as conn:
conn.execute(
"insert into test select i from generate_series(1,10) s(i)")
count1 = conn.execute("select count(*) from test")
conn.commit()
# Do second page backup
self.backup_node(backup_dir, "node", node, backup_type="page")
show_backup = self.show_pb(backup_dir, "node")[2]
page_id = show_backup["id"]
if self.paranoia:
pgdata = self.pgdata_content(node.data_dir)
# sanity check
self.assertEqual(show_backup["status"], "OK")
self.assertEqual(show_backup["backup-mode"], "PAGE")
# Merge all backups
self.merge_backup(backup_dir, "node", page_id)
show_backups = self.show_pb(backup_dir, "node")
# sanity check
self.assertEqual(len(show_backups), 1)
self.assertEqual(show_backups[0]["status"], "OK")
self.assertEqual(show_backups[0]["backup-mode"], "FULL")
# Drop node and restore it
node.cleanup()
self.restore_node(backup_dir, 'node', node)
# Check physical correctness
if self.paranoia:
pgdata_restored = self.pgdata_content(
node.data_dir, ignore_ptrack=False)
self.compare_pgdata(pgdata, pgdata_restored)
node.slow_start()
# Check restored node
count2 = node.execute("postgres", "select count(*) from test")
self.assertEqual(count1, count2)
# Clean after yourself
node.cleanup()
self.del_test_dir(module_name, fname)
def test_merge_compressed_backups(self):
"""
Test MERGE command with compressed backups
"""
fname = self.id().split(".")[3]
backup_dir = os.path.join(self.tmp_path, module_name, fname, "backup")
# Initialize instance and backup directory
node = self.make_simple_node(
base_dir="{0}/{1}/node".format(module_name, fname),
initdb_params=["--data-checksums"]
)
self.init_pb(backup_dir)
self.add_instance(backup_dir, "node", node)
self.set_archiving(backup_dir, "node", node)
node.start()
# Do full compressed backup
self.backup_node(backup_dir, "node", node, options=[
'--compress-algorithm=zlib'])
show_backup = self.show_pb(backup_dir, "node")[0]
self.assertEqual(show_backup["status"], "OK")
self.assertEqual(show_backup["backup-mode"], "FULL")
# Fill with data
with node.connect() as conn:
conn.execute("create table test (id int)")
conn.execute(
"insert into test select i from generate_series(1,10) s(i)")
count1 = conn.execute("select count(*) from test")
conn.commit()
# Do compressed page backup
self.backup_node(
backup_dir, "node", node, backup_type="page",
options=['--compress-algorithm=zlib'])
show_backup = self.show_pb(backup_dir, "node")[1]
page_id = show_backup["id"]
self.assertEqual(show_backup["status"], "OK")
self.assertEqual(show_backup["backup-mode"], "PAGE")
# Merge all backups
self.merge_backup(backup_dir, "node", page_id)
show_backups = self.show_pb(backup_dir, "node")
self.assertEqual(len(show_backups), 1)
self.assertEqual(show_backups[0]["status"], "OK")
self.assertEqual(show_backups[0]["backup-mode"], "FULL")
# Drop node and restore it
node.cleanup()
self.restore_node(backup_dir, 'node', node)
node.slow_start()
# Check restored node
count2 = node.execute("postgres", "select count(*) from test")
self.assertEqual(count1, count2)
# Clean after yourself
node.cleanup()
self.del_test_dir(module_name, fname)
# @unittest.skip("skip")
def test_merge_tablespaces(self):
"""
Some test here
"""
def test_merge_page_truncate(self):
"""
make node, create table, take full backup,
delete last 3 pages, vacuum relation,
take page backup, merge full and page,
restore the merged backup and check data correctness
"""
fname = self.id().split('.')[3]
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
node = self.make_simple_node(
base_dir="{0}/{1}/node".format(module_name, fname),
set_replication=True,
initdb_params=['--data-checksums'],
pg_options={
'wal_level': 'replica',
'max_wal_senders': '2',
'checkpoint_timeout': '300s',
'autovacuum': 'off'
}
)
node_restored = self.make_simple_node(
base_dir="{0}/{1}/node_restored".format(module_name, fname))
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
node_restored.cleanup()
node.start()
self.create_tblspace_in_node(node, 'somedata')
node.safe_psql(
"postgres",
"create sequence t_seq; "
"create table t_heap tablespace somedata as select i as id, "
"md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(0,1024) i;")
node.safe_psql(
"postgres",
"vacuum t_heap")
self.backup_node(backup_dir, 'node', node)
node.safe_psql(
"postgres",
"delete from t_heap where ctid >= '(11,0)'")
node.safe_psql(
"postgres",
"vacuum t_heap")
self.backup_node(
backup_dir, 'node', node, backup_type='page')
if self.paranoia:
pgdata = self.pgdata_content(node.data_dir)
page_id = self.show_pb(backup_dir, "node")[1]["id"]
self.merge_backup(backup_dir, "node", page_id)
self.validate_pb(backup_dir)
old_tablespace = self.get_tblspace_path(node, 'somedata')
new_tablespace = self.get_tblspace_path(node_restored, 'somedata_new')
self.restore_node(
backup_dir, 'node', node_restored,
options=[
"-j", "4",
"-T", "{0}={1}".format(old_tablespace, new_tablespace),
"--recovery-target-action=promote"])
# Physical comparison
if self.paranoia:
pgdata_restored = self.pgdata_content(node_restored.data_dir)
self.compare_pgdata(pgdata, pgdata_restored)
node_restored.append_conf(
"postgresql.auto.conf", "port = {0}".format(node_restored.port))
node_restored.slow_start()
# Logical comparison
result1 = node.safe_psql(
"postgres",
"select * from t_heap")
result2 = node_restored.safe_psql(
"postgres",
"select * from t_heap")
self.assertEqual(result1, result2)
# Clean after yourself
self.del_test_dir(module_name, fname)
def test_merge_delta_truncate(self):
"""
make node, create table, take full backup,
delete last 3 pages, vacuum relation,
take delta backup, merge full and delta,
restore the merged backup and check data correctness
"""
fname = self.id().split('.')[3]
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
node = self.make_simple_node(
base_dir="{0}/{1}/node".format(module_name, fname),
set_replication=True,
initdb_params=['--data-checksums'],
pg_options={
'wal_level': 'replica',
'max_wal_senders': '2',
'checkpoint_timeout': '300s',
'autovacuum': 'off'
}
)
node_restored = self.make_simple_node(
base_dir="{0}/{1}/node_restored".format(module_name, fname))
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
node_restored.cleanup()
node.start()
self.create_tblspace_in_node(node, 'somedata')
node.safe_psql(
"postgres",
"create sequence t_seq; "
"create table t_heap tablespace somedata as select i as id, "
"md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(0,1024) i;")
node.safe_psql(
"postgres",
"vacuum t_heap")
self.backup_node(backup_dir, 'node', node)
node.safe_psql(
"postgres",
"delete from t_heap where ctid >= '(11,0)'")
node.safe_psql(
"postgres",
"vacuum t_heap")
self.backup_node(
backup_dir, 'node', node, backup_type='delta')
if self.paranoia:
pgdata = self.pgdata_content(node.data_dir)
page_id = self.show_pb(backup_dir, "node")[1]["id"]
self.merge_backup(backup_dir, "node", page_id)
self.validate_pb(backup_dir)
old_tablespace = self.get_tblspace_path(node, 'somedata')
new_tablespace = self.get_tblspace_path(node_restored, 'somedata_new')
self.restore_node(
backup_dir, 'node', node_restored,
options=[
"-j", "4",
"-T", "{0}={1}".format(old_tablespace, new_tablespace),
"--recovery-target-action=promote"])
# Physical comparison
if self.paranoia:
pgdata_restored = self.pgdata_content(node_restored.data_dir)
self.compare_pgdata(pgdata, pgdata_restored)
node_restored.append_conf(
"postgresql.auto.conf", "port = {0}".format(node_restored.port))
node_restored.slow_start()
# Logical comparison
result1 = node.safe_psql(
"postgres",
"select * from t_heap")
result2 = node_restored.safe_psql(
"postgres",
"select * from t_heap")
self.assertEqual(result1, result2)
# Clean after yourself
self.del_test_dir(module_name, fname)
def test_merge_ptrack_truncate(self):
"""
make node, create table, take full backup,
delete last 3 pages, vacuum relation,
take delta backup, merge full and delta,
restore the merged backup and check data correctness
"""
fname = self.id().split('.')[3]
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
node = self.make_simple_node(
base_dir="{0}/{1}/node".format(module_name, fname),
set_replication=True,
initdb_params=['--data-checksums'],
pg_options={
'wal_level': 'replica',
'max_wal_senders': '2',
'checkpoint_timeout': '300s',
'autovacuum': 'off'
}
)
node_restored = self.make_simple_node(
base_dir="{0}/{1}/node_restored".format(module_name, fname))
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
node_restored.cleanup()
node.start()
self.create_tblspace_in_node(node, 'somedata')
node.safe_psql(
"postgres",
"create sequence t_seq; "
"create table t_heap tablespace somedata as select i as id, "
"md5(i::text) as text, "
"md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(0,1024) i;")
node.safe_psql(
"postgres",
"vacuum t_heap")
self.backup_node(backup_dir, 'node', node)
node.safe_psql(
"postgres",
"delete from t_heap where ctid >= '(11,0)'")
node.safe_psql(
"postgres",
"vacuum t_heap")
self.backup_node(
backup_dir, 'node', node, backup_type='delta')
if self.paranoia:
pgdata = self.pgdata_content(node.data_dir)
page_id = self.show_pb(backup_dir, "node")[1]["id"]
self.merge_backup(backup_dir, "node", page_id)
self.validate_pb(backup_dir)
old_tablespace = self.get_tblspace_path(node, 'somedata')
new_tablespace = self.get_tblspace_path(node_restored, 'somedata_new')
self.restore_node(
backup_dir, 'node', node_restored,
options=[
"-j", "4",
"-T", "{0}={1}".format(old_tablespace, new_tablespace),
"--recovery-target-action=promote"])
# Physical comparison
if self.paranoia:
pgdata_restored = self.pgdata_content(node_restored.data_dir)
self.compare_pgdata(pgdata, pgdata_restored)
node_restored.append_conf(
"postgresql.auto.conf", "port = {0}".format(node_restored.port))
node_restored.slow_start()
# Logical comparison
result1 = node.safe_psql(
"postgres",
"select * from t_heap")
result2 = node_restored.safe_psql(
"postgres",
"select * from t_heap")
self.assertEqual(result1, result2)
# Clean after yourself
self.del_test_dir(module_name, fname)


@@ -12,6 +12,7 @@ class OptionTest(ProbackupTest, unittest.TestCase):
# @unittest.expectedFailure
def test_help_1(self):
"""help options"""
self.maxDiff = None
fname = self.id().split(".")[3]
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
with open(os.path.join(self.dir_path, "expected/option_help.out"), "rb") as help_out: