1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-03-17 21:18:00 +02:00

Separated data.c => backup_data_page() into 2 functions:

prepare_page() and compress_and_backup_page().
Additionally, fixed formatting in some places.
(task pgpro-1726 in jira)
This commit is contained in:
Anna 2018-07-13 19:30:48 +03:00
parent f98c91b3ba
commit 086cd4f7d4

View File

@ -27,7 +27,7 @@
#ifdef HAVE_LIBZ
/* Implementation of zlib compression method */
static size_t zlib_compress(void* dst, size_t dst_size, void const* src, size_t src_size)
static int32 zlib_compress(void* dst, size_t dst_size, void const* src, size_t src_size)
{
uLongf compressed_size = dst_size;
int rc = compress2(dst, &compressed_size, src, src_size, compress_level);
@ -35,7 +35,7 @@ static size_t zlib_compress(void* dst, size_t dst_size, void const* src, size_t
}
/* Implementation of zlib compression method */
static size_t zlib_decompress(void* dst, size_t dst_size, void const* src, size_t src_size)
static int32 zlib_decompress(void* dst, size_t dst_size, void const* src, size_t src_size)
{
uLongf dest_len = dst_size;
int rc = uncompress(dst, &dest_len, src, src_size);
@ -47,7 +47,7 @@ static size_t zlib_decompress(void* dst, size_t dst_size, void const* src, size_
* Compresses source into dest using algorithm. Returns the number of bytes
* written in the destination buffer, or -1 if compression fails.
*/
static size_t
static int32
do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg)
{
switch (alg)
@ -70,7 +70,7 @@ do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, Compre
* Decompresses source into dest using algorithm. Returns the number of bytes
* decompressed in the destination buffer, or -1 if decompression fails.
*/
static size_t
static int32
do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg)
{
switch (alg)
@ -101,6 +101,7 @@ typedef struct BackupPageHeader
/* Special value for compressed_size field */
#define PageIsTruncated -2
#define SkipCurrentPage -3
/* Verify page's header */
static bool
@ -134,8 +135,8 @@ static int
read_page_from_file(pgFile *file, BlockNumber blknum,
FILE *in, Page page, XLogRecPtr *page_lsn)
{
off_t offset = blknum*BLCKSZ;
size_t read_len = 0;
off_t offset = blknum * BLCKSZ;
size_t read_len = 0;
/* read the block */
if (fseek(in, offset, SEEK_SET) != 0)
@ -216,32 +217,29 @@ read_page_from_file(pgFile *file, BlockNumber blknum,
}
/*
* Backup the specified block from a file of a relation.
* Verify page header and checksum of the page and write it
* to the backup file.
* Retrieves a page taking the backup mode into account
* and writes it into argument "page". Argument "page"
* should be a pointer to allocated BLCKSZ of bytes.
*
* Prints appropriate warnings/errors/etc into log.
* Returns 0 if page was successfully retrieved
* SkipCurrentPage(-3) if we need to skip this page
* PageIsTruncated(-2) if the page was truncated
*/
static void
backup_data_page(backup_files_args *arguments,
static int32
prepare_page(backup_files_args *arguments,
pgFile *file, XLogRecPtr prev_backup_start_lsn,
BlockNumber blknum, BlockNumber nblocks,
FILE *in, FILE *out,
pg_crc32 *crc, int *n_skipped,
BackupMode backup_mode)
FILE *in, int *n_skipped,
BackupMode backup_mode,
Page page)
{
BackupPageHeader header;
Page page = malloc(BLCKSZ);
Page compressed_page = NULL;
XLogRecPtr page_lsn = 0;
size_t write_buffer_size;
char write_buffer[BLCKSZ+sizeof(header)];
int try_again = 100;
bool page_is_valid = false;
XLogRecPtr page_lsn = 0;
int try_again = 100;
bool page_is_valid = false;
bool page_is_truncated = false;
BlockNumber absolute_blknum = file->segno * RELSEG_SIZE + blknum;
header.block = blknum;
header.compressed_size = 0;
/* check for interrupt */
if (interrupted)
elog(ERROR, "Interrupted during backup");
@ -262,7 +260,7 @@ backup_data_page(backup_files_args *arguments,
if (result == 0)
{
/* This block was truncated.*/
header.compressed_size = PageIsTruncated;
page_is_truncated = true;
/* Page is not actually valid, but it is absent
* and we're not going to reread it or validate */
page_is_valid = true;
@ -295,35 +293,38 @@ backup_data_page(backup_files_args *arguments,
if (backup_mode == BACKUP_MODE_DIFF_PTRACK || (!page_is_valid && is_ptrack_support))
{
size_t page_size = 0;
free(page);
page = NULL;
page = (Page) pg_ptrack_get_block(arguments, file->dbOid, file->tblspcOid,
Page ptrack_page = NULL;
ptrack_page = (Page) pg_ptrack_get_block(arguments, file->dbOid, file->tblspcOid,
file->relOid, absolute_blknum, &page_size);
if (page == NULL)
if (ptrack_page == NULL)
{
/* This block was truncated.*/
header.compressed_size = PageIsTruncated;
page_is_truncated = true;
}
else if (page_size != BLCKSZ)
{
free(ptrack_page);
elog(ERROR, "File: %s, block %u, expected block size %d, but read %lu",
file->path, absolute_blknum, BLCKSZ, page_size);
}
else
{
/*
* We need to copy the page that was successfully
* retreieved from ptrack into our output "page" parameter.
* We must set checksum here, because it is outdated
* in the block recieved from shared buffers.
*/
memcpy(page, ptrack_page, BLCKSZ);
free(ptrack_page);
if (is_checksum_enabled)
((PageHeader) page)->pd_checksum = pg_checksum_page(page, absolute_blknum);
}
/* get lsn from page, provided by pg_ptrack_get_block() */
if (backup_mode == BACKUP_MODE_DIFF_DELTA &&
file->exists_in_prev &&
header.compressed_size != PageIsTruncated &&
!page_is_truncated &&
!parse_page(page, &page_lsn))
elog(ERROR, "Cannot parse page after pg_ptrack_get_block. "
"Possible risk of a memory corruption");
@ -332,52 +333,70 @@ backup_data_page(backup_files_args *arguments,
if (backup_mode == BACKUP_MODE_DIFF_DELTA &&
file->exists_in_prev &&
header.compressed_size != PageIsTruncated &&
!page_is_truncated &&
page_lsn < prev_backup_start_lsn)
{
elog(VERBOSE, "Skipping blknum: %u in file: %s", blknum, file->path);
(*n_skipped)++;
free(page);
return;
return SkipCurrentPage;
}
if (header.compressed_size != PageIsTruncated)
{
file->read_size += BLCKSZ;
if (page_is_truncated)
return PageIsTruncated;
compressed_page = malloc(BLCKSZ);
return 0;
}
static void
compress_and_backup_page(pgFile *file, BlockNumber blknum,
FILE *in, FILE *out, pg_crc32 *crc,
int page_state, Page page)
{
BackupPageHeader header;
size_t write_buffer_size = sizeof(header);
char write_buffer[BLCKSZ+sizeof(header)];
char compressed_page[BLCKSZ];
if(page_state == SkipCurrentPage)
return;
header.block = blknum;
header.compressed_size = page_state;
if(page_state == PageIsTruncated)
{
/*
* The page was truncated. Write only header
* to know that we must truncate restored file
*/
memcpy(write_buffer, &header, sizeof(header));
}
else
{
/* The page was not truncated, so we need to compress it */
header.compressed_size = do_compress(compressed_page, BLCKSZ,
page, BLCKSZ, compress_alg);
page, BLCKSZ, compress_alg);
file->compress_alg = compress_alg;
file->read_size += BLCKSZ;
Assert (header.compressed_size <= BLCKSZ);
}
write_buffer_size = sizeof(header);
/*
* The page was truncated. Write only header
* to know that we must truncate restored file
*/
if (header.compressed_size == PageIsTruncated)
{
memcpy(write_buffer, &header, sizeof(header));
}
/* The page compression failed. Write it as is. */
else if (header.compressed_size == -1)
{
header.compressed_size = BLCKSZ;
memcpy(write_buffer, &header, sizeof(header));
memcpy(write_buffer + sizeof(header), page, BLCKSZ);
write_buffer_size += header.compressed_size;
}
/* The page was successfully compressed */
else if (header.compressed_size > 0)
{
memcpy(write_buffer, &header, sizeof(header));
memcpy(write_buffer + sizeof(header), compressed_page, header.compressed_size);
write_buffer_size += MAXALIGN(header.compressed_size);
/* The page was successfully compressed. */
if (header.compressed_size > 0)
{
memcpy(write_buffer, &header, sizeof(header));
memcpy(write_buffer + sizeof(header),
compressed_page, header.compressed_size);
write_buffer_size += MAXALIGN(header.compressed_size);
}
/* Nonpositive value means that compression failed. Write it as is. */
else
{
header.compressed_size = BLCKSZ;
memcpy(write_buffer, &header, sizeof(header));
memcpy(write_buffer + sizeof(header), page, BLCKSZ);
write_buffer_size += header.compressed_size;
}
}
/* elog(VERBOSE, "backup blkno %u, compressed_size %d write_buffer_size %ld",
@ -389,7 +408,7 @@ backup_data_page(backup_files_args *arguments,
/* write data page */
if(fwrite(write_buffer, 1, write_buffer_size, out) != write_buffer_size)
{
int errno_tmp = errno;
int errno_tmp = errno;
fclose(in);
fclose(out);
elog(ERROR, "File: %s, cannot write backup at block %u : %s",
@ -397,11 +416,6 @@ backup_data_page(backup_files_args *arguments,
}
file->write_size += write_buffer_size;
if (page != NULL)
free(page);
if (compressed_page != NULL)
free(compressed_page);
}
/*
@ -418,13 +432,15 @@ backup_data_file(backup_files_args* arguments,
pgFile *file, XLogRecPtr prev_backup_start_lsn,
BackupMode backup_mode)
{
char to_path[MAXPGPATH];
FILE *in;
FILE *out;
BlockNumber blknum = 0;
BlockNumber nblocks = 0;
int n_blocks_skipped = 0;
int n_blocks_read = 0;
char to_path[MAXPGPATH];
FILE *in;
FILE *out;
BlockNumber blknum = 0;
BlockNumber nblocks = 0;
int n_blocks_skipped = 0;
int n_blocks_read = 0;
int page_state;
char curr_page[BLCKSZ];
/*
* Skip unchanged file only if it exists in previous backup.
@ -503,9 +519,11 @@ backup_data_file(backup_files_args* arguments,
{
for (blknum = 0; blknum < nblocks; blknum++)
{
backup_data_page(arguments, file, prev_backup_start_lsn, blknum,
nblocks, in, out, &(file->crc),
&n_blocks_skipped, backup_mode);
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
blknum, nblocks, in, &n_blocks_skipped,
backup_mode, curr_page);
compress_and_backup_page(file, blknum, in, out, &(file->crc),
page_state, curr_page);
n_blocks_read++;
}
if (backup_mode == BACKUP_MODE_DIFF_DELTA)
@ -518,9 +536,11 @@ backup_data_file(backup_files_args* arguments,
iter = datapagemap_iterate(&file->pagemap);
while (datapagemap_next(iter, &blknum))
{
backup_data_page(arguments, file, prev_backup_start_lsn, blknum,
nblocks, in, out, &(file->crc),
&n_blocks_skipped, backup_mode);
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
blknum, nblocks, in, &n_blocks_skipped,
backup_mode, curr_page);
compress_and_backup_page(file, blknum, in, out, &(file->crc),
page_state, curr_page);
n_blocks_read++;
}
@ -635,7 +655,8 @@ restore_data_file(const char *from_root,
}
if (header.block < blknum)
elog(ERROR, "backup is broken at file->path %s block %u",file->path, blknum);
elog(ERROR, "backup is broken at file->path %s block %u",
file->path, blknum);
if (header.compressed_size == PageIsTruncated)
{
@ -646,7 +667,8 @@ restore_data_file(const char *from_root,
if (ftruncate(fileno(out), header.block * BLCKSZ) != 0)
elog(ERROR, "cannot truncate \"%s\": %s",
file->path, strerror(errno));
elog(VERBOSE, "truncate file %s to block %u", file->path, header.block);
elog(VERBOSE, "truncate file %s to block %u",
file->path, header.block);
break;
}
@ -664,10 +686,12 @@ restore_data_file(const char *from_root,
uncompressed_size = do_decompress(page.data, BLCKSZ,
compressed_page.data,
header.compressed_size, file->compress_alg);
header.compressed_size,
file->compress_alg);
if (uncompressed_size != BLCKSZ)
elog(ERROR, "page uncompressed to %ld bytes. != BLCKSZ", uncompressed_size);
elog(ERROR, "page uncompressed to %ld bytes. != BLCKSZ",
uncompressed_size);
}
/*
@ -714,7 +738,8 @@ restore_data_file(const char *from_root,
if (ftruncate(fileno(out), file->n_blocks * BLCKSZ) != 0)
elog(ERROR, "cannot truncate \"%s\": %s",
file->path, strerror(errno));
elog(INFO, "Delta truncate file %s to block %u", file->path, file->n_blocks);
elog(INFO, "Delta truncate file %s to block %u",
file->path, file->n_blocks);
}
}