diff --git a/src/data.c b/src/data.c index 476cbbdf..544fb98c 100644 --- a/src/data.c +++ b/src/data.c @@ -27,7 +27,7 @@ #ifdef HAVE_LIBZ /* Implementation of zlib compression method */ -static size_t zlib_compress(void* dst, size_t dst_size, void const* src, size_t src_size) +static int32 zlib_compress(void* dst, size_t dst_size, void const* src, size_t src_size) { uLongf compressed_size = dst_size; int rc = compress2(dst, &compressed_size, src, src_size, compress_level); @@ -35,7 +35,7 @@ static size_t zlib_compress(void* dst, size_t dst_size, void const* src, size_t } /* Implementation of zlib compression method */ -static size_t zlib_decompress(void* dst, size_t dst_size, void const* src, size_t src_size) +static int32 zlib_decompress(void* dst, size_t dst_size, void const* src, size_t src_size) { uLongf dest_len = dst_size; int rc = uncompress(dst, &dest_len, src, src_size); @@ -47,7 +47,7 @@ static size_t zlib_decompress(void* dst, size_t dst_size, void const* src, size_ * Compresses source into dest using algorithm. Returns the number of bytes * written in the destination buffer, or -1 if compression fails. */ -static size_t +static int32 do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg) { switch (alg) @@ -70,7 +70,7 @@ do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, Compre * Decompresses source into dest using algorithm. Returns the number of bytes * decompressed in the destination buffer, or -1 if decompression fails. */ -static size_t +static int32 do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg) { switch (alg) @@ -101,6 +101,7 @@ typedef struct BackupPageHeader /* Special value for compressed_size field */ #define PageIsTruncated -2 +#define SkipCurrentPage -3 /* Verify page's header */ static bool @@ -134,8 +135,8 @@ static int read_page_from_file(pgFile *file, BlockNumber blknum, FILE *in, Page page, XLogRecPtr *page_lsn) { - off_t offset = blknum*BLCKSZ; - size_t read_len = 0; + off_t offset = blknum * BLCKSZ; + size_t read_len = 0; /* read the block */ if (fseek(in, offset, SEEK_SET) != 0) @@ -216,32 +217,29 @@ read_page_from_file(pgFile *file, BlockNumber blknum, } /* - * Backup the specified block from a file of a relation. - * Verify page header and checksum of the page and write it - * to the backup file. + * Retrieves a page taking the backup mode into account + * and writes it into argument "page". Argument "page" + * should be a pointer to allocated BLCKSZ of bytes. + * + * Prints appropriate warnings/errors/etc into log. + * Returns 0 if page was successfully retrieved + * SkipCurrentPage(-3) if we need to skip this page + * PageIsTruncated(-2) if the page was truncated */ -static void -backup_data_page(backup_files_args *arguments, +static int32 +prepare_page(backup_files_args *arguments, pgFile *file, XLogRecPtr prev_backup_start_lsn, BlockNumber blknum, BlockNumber nblocks, - FILE *in, FILE *out, - pg_crc32 *crc, int *n_skipped, - BackupMode backup_mode) + FILE *in, int *n_skipped, + BackupMode backup_mode, + Page page) { - BackupPageHeader header; - Page page = malloc(BLCKSZ); - Page compressed_page = NULL; - XLogRecPtr page_lsn = 0; - size_t write_buffer_size; - char write_buffer[BLCKSZ+sizeof(header)]; - - int try_again = 100; - bool page_is_valid = false; + XLogRecPtr page_lsn = 0; + int try_again = 100; + bool page_is_valid = false; + bool page_is_truncated = false; BlockNumber absolute_blknum = file->segno * RELSEG_SIZE + blknum; - header.block = blknum; - header.compressed_size = 0; - /* check for interrupt */ if (interrupted) elog(ERROR, "Interrupted during backup"); @@ -262,7 +260,7 @@ backup_data_page(backup_files_args *arguments, if (result == 0) { /* This block was truncated.*/ - header.compressed_size = PageIsTruncated; + page_is_truncated = true; /* Page is not actually valid, but it is absent * and we're not going to reread it or validate */ page_is_valid = true; @@ -295,35 +293,38 @@ backup_data_page(backup_files_args *arguments, if (backup_mode == BACKUP_MODE_DIFF_PTRACK || (!page_is_valid && is_ptrack_support)) { size_t page_size = 0; - - free(page); - page = NULL; - page = (Page) pg_ptrack_get_block(arguments, file->dbOid, file->tblspcOid, + Page ptrack_page = NULL; + ptrack_page = (Page) pg_ptrack_get_block(arguments, file->dbOid, file->tblspcOid, file->relOid, absolute_blknum, &page_size); - if (page == NULL) + if (ptrack_page == NULL) { /* This block was truncated.*/ - header.compressed_size = PageIsTruncated; + page_is_truncated = true; } else if (page_size != BLCKSZ) { + free(ptrack_page); elog(ERROR, "File: %s, block %u, expected block size %d, but read %lu", file->path, absolute_blknum, BLCKSZ, page_size); } else { /* + * We need to copy the page that was successfully + * retreieved from ptrack into our output "page" parameter. * We must set checksum here, because it is outdated * in the block recieved from shared buffers. */ + memcpy(page, ptrack_page, BLCKSZ); + free(ptrack_page); if (is_checksum_enabled) ((PageHeader) page)->pd_checksum = pg_checksum_page(page, absolute_blknum); } /* get lsn from page, provided by pg_ptrack_get_block() */ if (backup_mode == BACKUP_MODE_DIFF_DELTA && file->exists_in_prev && - header.compressed_size != PageIsTruncated && + !page_is_truncated && !parse_page(page, &page_lsn)) elog(ERROR, "Cannot parse page after pg_ptrack_get_block. " "Possible risk of a memory corruption"); @@ -332,52 +333,70 @@ backup_data_page(backup_files_args *arguments, if (backup_mode == BACKUP_MODE_DIFF_DELTA && file->exists_in_prev && - header.compressed_size != PageIsTruncated && + !page_is_truncated && page_lsn < prev_backup_start_lsn) { elog(VERBOSE, "Skipping blknum: %u in file: %s", blknum, file->path); (*n_skipped)++; - free(page); - return; + return SkipCurrentPage; } - if (header.compressed_size != PageIsTruncated) - { - file->read_size += BLCKSZ; + if (page_is_truncated) + return PageIsTruncated; - compressed_page = malloc(BLCKSZ); + return 0; +} + +static void +compress_and_backup_page(pgFile *file, BlockNumber blknum, + FILE *in, FILE *out, pg_crc32 *crc, + int page_state, Page page) +{ + BackupPageHeader header; + size_t write_buffer_size = sizeof(header); + char write_buffer[BLCKSZ+sizeof(header)]; + char compressed_page[BLCKSZ]; + + if(page_state == SkipCurrentPage) + return; + + header.block = blknum; + header.compressed_size = page_state; + + if(page_state == PageIsTruncated) + { + /* + * The page was truncated. Write only header + * to know that we must truncate restored file + */ + memcpy(write_buffer, &header, sizeof(header)); + } + else + { + /* The page was not truncated, so we need to compress it */ header.compressed_size = do_compress(compressed_page, BLCKSZ, - page, BLCKSZ, compress_alg); + page, BLCKSZ, compress_alg); file->compress_alg = compress_alg; - + file->read_size += BLCKSZ; Assert (header.compressed_size <= BLCKSZ); - } - write_buffer_size = sizeof(header); - - /* - * The page was truncated. Write only header - * to know that we must truncate restored file - */ - if (header.compressed_size == PageIsTruncated) - { - memcpy(write_buffer, &header, sizeof(header)); - } - /* The page compression failed. Write it as is. */ - else if (header.compressed_size == -1) - { - header.compressed_size = BLCKSZ; - memcpy(write_buffer, &header, sizeof(header)); - memcpy(write_buffer + sizeof(header), page, BLCKSZ); - write_buffer_size += header.compressed_size; - } - /* The page was successfully compressed */ - else if (header.compressed_size > 0) - { - memcpy(write_buffer, &header, sizeof(header)); - memcpy(write_buffer + sizeof(header), compressed_page, header.compressed_size); - write_buffer_size += MAXALIGN(header.compressed_size); + /* The page was successfully compressed. */ + if (header.compressed_size > 0) + { + memcpy(write_buffer, &header, sizeof(header)); + memcpy(write_buffer + sizeof(header), + compressed_page, header.compressed_size); + write_buffer_size += MAXALIGN(header.compressed_size); + } + /* Nonpositive value means that compression failed. Write it as is. */ + else + { + header.compressed_size = BLCKSZ; + memcpy(write_buffer, &header, sizeof(header)); + memcpy(write_buffer + sizeof(header), page, BLCKSZ); + write_buffer_size += header.compressed_size; + } } /* elog(VERBOSE, "backup blkno %u, compressed_size %d write_buffer_size %ld", @@ -389,7 +408,7 @@ backup_data_page(backup_files_args *arguments, /* write data page */ if(fwrite(write_buffer, 1, write_buffer_size, out) != write_buffer_size) { - int errno_tmp = errno; + int errno_tmp = errno; fclose(in); fclose(out); elog(ERROR, "File: %s, cannot write backup at block %u : %s", @@ -397,11 +416,6 @@ backup_data_page(backup_files_args *arguments, } file->write_size += write_buffer_size; - - if (page != NULL) - free(page); - if (compressed_page != NULL) - free(compressed_page); } /* @@ -418,13 +432,15 @@ backup_data_file(backup_files_args* arguments, pgFile *file, XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode) { - char to_path[MAXPGPATH]; - FILE *in; - FILE *out; - BlockNumber blknum = 0; - BlockNumber nblocks = 0; - int n_blocks_skipped = 0; - int n_blocks_read = 0; + char to_path[MAXPGPATH]; + FILE *in; + FILE *out; + BlockNumber blknum = 0; + BlockNumber nblocks = 0; + int n_blocks_skipped = 0; + int n_blocks_read = 0; + int page_state; + char curr_page[BLCKSZ]; /* * Skip unchanged file only if it exists in previous backup. @@ -503,9 +519,11 @@ backup_data_file(backup_files_args* arguments, { for (blknum = 0; blknum < nblocks; blknum++) { - backup_data_page(arguments, file, prev_backup_start_lsn, blknum, - nblocks, in, out, &(file->crc), - &n_blocks_skipped, backup_mode); + page_state = prepare_page(arguments, file, prev_backup_start_lsn, + blknum, nblocks, in, &n_blocks_skipped, + backup_mode, curr_page); + compress_and_backup_page(file, blknum, in, out, &(file->crc), + page_state, curr_page); n_blocks_read++; } if (backup_mode == BACKUP_MODE_DIFF_DELTA) @@ -518,9 +536,11 @@ backup_data_file(backup_files_args* arguments, iter = datapagemap_iterate(&file->pagemap); while (datapagemap_next(iter, &blknum)) { - backup_data_page(arguments, file, prev_backup_start_lsn, blknum, - nblocks, in, out, &(file->crc), - &n_blocks_skipped, backup_mode); + page_state = prepare_page(arguments, file, prev_backup_start_lsn, + blknum, nblocks, in, &n_blocks_skipped, + backup_mode, curr_page); + compress_and_backup_page(file, blknum, in, out, &(file->crc), + page_state, curr_page); n_blocks_read++; } @@ -635,7 +655,8 @@ restore_data_file(const char *from_root, } if (header.block < blknum) - elog(ERROR, "backup is broken at file->path %s block %u",file->path, blknum); + elog(ERROR, "backup is broken at file->path %s block %u", + file->path, blknum); if (header.compressed_size == PageIsTruncated) { @@ -646,7 +667,8 @@ restore_data_file(const char *from_root, if (ftruncate(fileno(out), header.block * BLCKSZ) != 0) elog(ERROR, "cannot truncate \"%s\": %s", file->path, strerror(errno)); - elog(VERBOSE, "truncate file %s to block %u", file->path, header.block); + elog(VERBOSE, "truncate file %s to block %u", + file->path, header.block); break; } @@ -664,10 +686,12 @@ restore_data_file(const char *from_root, uncompressed_size = do_decompress(page.data, BLCKSZ, compressed_page.data, - header.compressed_size, file->compress_alg); + header.compressed_size, + file->compress_alg); if (uncompressed_size != BLCKSZ) - elog(ERROR, "page uncompressed to %ld bytes. != BLCKSZ", uncompressed_size); + elog(ERROR, "page uncompressed to %ld bytes. != BLCKSZ", + uncompressed_size); } /* @@ -714,7 +738,8 @@ restore_data_file(const char *from_root, if (ftruncate(fileno(out), file->n_blocks * BLCKSZ) != 0) elog(ERROR, "cannot truncate \"%s\": %s", file->path, strerror(errno)); - elog(INFO, "Delta truncate file %s to block %u", file->path, file->n_blocks); + elog(INFO, "Delta truncate file %s to block %u", + file->path, file->n_blocks); } }