From 3873318eec4bf817a782d9ff879370d0a1ccdb5e Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 3 Jul 2019 23:22:48 +0300 Subject: [PATCH] Decompress restored file by remote agent --- src/data.c | 77 +++++++++++++++++----------------------------- src/pg_probackup.h | 2 ++ src/utils/file.c | 45 +++++++++++++++++++++++++++ src/utils/file.h | 4 ++- 4 files changed, 79 insertions(+), 49 deletions(-) diff --git a/src/data.c b/src/data.c index 0304f10b..950fe400 100644 --- a/src/data.c +++ b/src/data.c @@ -89,7 +89,7 @@ do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, * Decompresses source into dest using algorithm. Returns the number of bytes * decompressed in the destination buffer, or -1 if decompression fails. */ -static int32 +int32 do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size, CompressAlg alg, const char **errormsg) { @@ -719,6 +719,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, BlockNumber blknum = 0, truncate_from = 0; bool need_truncate = false; + size_t rc; /* BYTES_INVALID allowed only in case of restoring file from DELTA backup */ if (file->write_size != BYTES_INVALID) @@ -750,9 +751,9 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, { off_t write_pos; size_t read_len; - DataPage compressed_page; /* used as read buffer */ DataPage page; - int32 uncompressed_size = 0; + int32 compressed_size; + const char *errormsg = NULL; /* File didn`t changed. Nothig to copy */ if (file->write_size == BYTES_INVALID) @@ -789,7 +790,9 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, blknum, file->path, strerror(errno_tmp)); } - if (header.block == 0 && header.compressed_size == 0) + compressed_size = header.compressed_size; + + if (header.block == 0 && compressed_size) { elog(VERBOSE, "Skip empty block of \"%s\"", file->path); continue; @@ -801,7 +804,7 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, blknum = header.block; - if (header.compressed_size == PageIsTruncated) + if (compressed_size == PageIsTruncated) { /* * Backup contains information that this block was truncated. @@ -812,39 +815,14 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, break; } - Assert(header.compressed_size <= BLCKSZ); + Assert(compressed_size <= BLCKSZ); /* read a page from file */ - read_len = fread(compressed_page.data, 1, - MAXALIGN(header.compressed_size), in); - if (read_len != MAXALIGN(header.compressed_size)) + read_len = fread(page.data, 1, + MAXALIGN(compressed_size), in); + if (read_len != MAXALIGN(compressed_size)) elog(ERROR, "Cannot read block %u of \"%s\" read %zu of %d", - blknum, file->path, read_len, header.compressed_size); - - /* - * if page size is smaller than BLCKSZ, decompress the page. - * BUGFIX for versions < 2.0.23: if page size is equal to BLCKSZ. - * we have to check, whether it is compressed or not using - * page_may_be_compressed() function. - */ - if (header.compressed_size != BLCKSZ - || page_may_be_compressed(compressed_page.data, file->compress_alg, - backup_version)) - { - const char *errormsg = NULL; - - uncompressed_size = do_decompress(page.data, BLCKSZ, - compressed_page.data, - header.compressed_size, - file->compress_alg, &errormsg); - if (uncompressed_size < 0 && errormsg != NULL) - elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s", - blknum, file->path, errormsg); - - if (uncompressed_size != BLCKSZ) - elog(ERROR, "Page of file \"%s\" uncompressed to %d bytes. != BLCKSZ", - file->path, uncompressed_size); - } + blknum, file->path, read_len, compressed_size); write_pos = (write_header) ? blknum * (BLCKSZ + sizeof(header)) : blknum * BLCKSZ; @@ -865,21 +843,24 @@ restore_data_file(const char *to_path, pgFile *file, bool allow_truncate, blknum, file->path, strerror(errno)); } - /* if we uncompressed the page - write page.data, - * if page wasn't compressed - - * write what we've read - compressed_page.data + /* + * if page size is smaller than BLCKSZ, decompress the page. + * BUGFIX for versions < 2.0.23: if page size is equal to BLCKSZ. + * we have to check, whether it is compressed or not using + * page_may_be_compressed() function. */ - if (uncompressed_size == BLCKSZ) + rc = (compressed_size != BLCKSZ || page_may_be_compressed(page.data, file->compress_alg, backup_version)) + ? fio_fwrite_compressed(out, page.data, compressed_size, file->compress_alg, &errormsg) + : fio_fwrite(out, page.data, compressed_size); + + if (rc != compressed_size) { - if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ) - elog(ERROR, "Cannot write block %u of \"%s\": %s", - blknum, file->path, strerror(errno)); - } - else - { - if (fio_fwrite(out, compressed_page.data, BLCKSZ) != BLCKSZ) - elog(ERROR, "Cannot write block %u of \"%s\": %s", - blknum, file->path, strerror(errno)); + if (errormsg != NULL) + elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s", + blknum, file->path, errormsg); + + elog(ERROR, "Cannot write block %u of \"%s\": %s", + blknum, file->path, strerror(errno)); } } diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 36e23db4..8a02fbb7 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -700,6 +700,8 @@ int32 do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, extern PGconn *pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo); extern void check_system_identifiers(PGconn *conn, char *pgdata); extern void parse_filelist_filenames(parray *files, const char *root); +extern int32 do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size, + CompressAlg alg, const char **errormsg); #endif /* PG_PROBACKUP_H */ diff --git a/src/utils/file.c b/src/utils/file.c index d9f669a3..a8b72dfb 100644 --- a/src/utils/file.c +++ b/src/utils/file.c @@ -566,6 +566,35 @@ size_t fio_fwrite(FILE* f, void const* buf, size_t size) : fwrite(buf, 1, size, f); } +/* Write data to stdio file */ +size_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg, const char** errmsg) +{ + uint32 decompressed_size; + char decompressed_page[BLCKSZ]; + + if (fio_is_remote_file(f)) + { + fio_header hdr; + + hdr.cop = FIO_WRITE_COMPRESSED; + hdr.handle = fio_fileno(f) & ~FIO_PIPE_MARKER; + hdr.size = size; + hdr.arg = compress_alg; + + IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(fio_stdout, buf, size), size); + + return size; + } + decompressed_size = do_decompress(decompressed_page, + BLCKSZ, + buf, + size, + (CompressAlg)compress_alg, errmsg); + return decompressed_size != BLCKSZ + ? 0 : fwrite(decompressed_page, 1, decompressed_size, f); +} + /* Write data to the file */ ssize_t fio_write(int fd, void const* buf, size_t size) { @@ -1381,6 +1410,22 @@ void fio_communicate(int in, int out) case FIO_WRITE: /* Write to the current position in file */ IO_CHECK(fio_write_all(fd[hdr.handle], buf, hdr.size), hdr.size); break; + case FIO_WRITE_COMPRESSED: /* Write to the current position in file */ + { + char decompressed_page[BLCKSZ]; + char const* errmsg = NULL; + int32 decompressed_size = do_decompress(decompressed_page, BLCKSZ, + buf, + hdr.size, + (CompressAlg)hdr.arg, &errmsg); + if (errmsg != NULL || decompressed_size != BLCKSZ) + { + fprintf(stderr, "Failed to decompress block: %s", errmsg ? errmsg: "unknown error"); + exit(EXIT_FAILURE); + } + IO_CHECK(fio_write_all(fd[hdr.handle], decompressed_page, BLCKSZ), BLCKSZ); + } + break; case FIO_READ: /* Read from the current position in file */ if ((size_t)hdr.arg > buf_size) { buf_size = hdr.arg; diff --git a/src/utils/file.h b/src/utils/file.h index bb610101..1d19ec00 100644 --- a/src/utils/file.h +++ b/src/utils/file.h @@ -33,7 +33,8 @@ typedef enum FIO_READDIR, FIO_CLOSEDIR, FIO_SEND_PAGES, - FIO_PAGE + FIO_PAGE, + FIO_WRITE_COMPRESSED, } fio_operations; typedef enum @@ -69,6 +70,7 @@ extern void fio_communicate(int in, int out); extern FILE* fio_fopen(char const* name, char const* mode, fio_location location); extern size_t fio_fwrite(FILE* f, void const* buf, size_t size); +extern size_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg, const char** errmsg); extern ssize_t fio_fread(FILE* f, void* buf, size_t size); extern int fio_pread(FILE* f, void* buf, off_t offs); extern int fio_fprintf(FILE* f, char const* arg, ...) pg_attribute_printf(2, 3);