diff --git a/configure.c b/configure.c
index b02e6db7..6926b0bc 100644
--- a/configure.c
+++ b/configure.c
@@ -10,6 +10,7 @@
 #include "pg_probackup.h"
 
 static void opt_log_level(pgut_option *opt, const char *arg);
+static void opt_compress_alg(pgut_option *opt, const char *arg);
 
 static pgBackupConfig *cur_config = NULL;
 
@@ -47,6 +48,11 @@ do_configure(bool show_only)
 	if (retention_window)
 		config->retention_window = retention_window;
 
+	if (compress_alg != NOT_DEFINED_COMPRESS)
+		config->compress_alg = compress_alg;
+	if (compress_level != -1)
+		config->compress_level = compress_level;
+
 	if (show_only)
 		writeBackupCatalogConfig(stderr, config);
 	else
@@ -74,6 +80,9 @@ pgBackupConfigInit(pgBackupConfig *config)
 
 	config->retention_redundancy = 0;
 	config->retention_window = 0;
+
+	config->compress_alg = NOT_DEFINED_COMPRESS;
+	config->compress_level = -1;
 }
 
 void
@@ -113,6 +122,15 @@ writeBackupCatalogConfig(FILE *out, pgBackupConfig *config)
 
 	if (config->retention_window)
 		fprintf(out, "retention-window = %u\n", config->retention_window);
+	fprintf(out, "#Compression parameters:\n");
+
+	fprintf(out, "compress-algorithm = %s\n", deparse_compress_alg(config->compress_alg));
+
+	/* compress_level is an int; -1 means no level was configured, so show the default */
+	if (config->compress_level == -1)
+		fprintf(out, "compress-level = %d\n", DEFAULT_COMPRESS_LEVEL);
+	else
+		fprintf(out, "compress-level = %d\n", config->compress_level);
 }
 
 void
@@ -143,6 +161,9 @@ readBackupCatalogConfigFile(void)
 		/* retention options */
 		{ 'u', 0, "retention-redundancy",	&(config->retention_redundancy),SOURCE_FILE_STRICT },
 		{ 'u', 0, "retention-window",		&(config->retention_window),	SOURCE_FILE_STRICT },
+		/* compression options */
+		{ 'f', 36, "compress-algorithm",	opt_compress_alg,	SOURCE_CMDLINE },
+		{ 'u', 37, "compress-level",		&(config->compress_level),	SOURCE_CMDLINE },
 		/* logging options */
 		{ 'f', 40, "log-level",				opt_log_level,		SOURCE_CMDLINE },
 		{ 's', 41, "log-filename",			&(config->log_filename), SOURCE_CMDLINE },
@@ -177,3 +198,9 @@
opt_log_level(pgut_option *opt, const char *arg)
 {
 	cur_config->log_level = parse_log_level(arg);
 }
+
+static void
+opt_compress_alg(pgut_option *opt, const char *arg)
+{
+	cur_config->compress_alg = parse_compress_alg(arg);
+}
diff --git a/data.c b/data.c
index abe54688..b485c087 100644
--- a/data.c
+++ b/data.c
@@ -19,10 +19,63 @@
 #include "storage/block.h"
 #include "storage/bufpage.h"
 #include "storage/checksum_impl.h"
+#include <common/pg_lzcompress.h>
+#include <zlib.h>
+
+/* zlib wrappers: return the number of bytes produced in dst, or a negative zlib error code. */
+static int32 zlib_compress(void *dst, size_t dst_size, void const *src, size_t src_size)
+{
+	uLongf		compressed_size = dst_size;
+	int			rc = compress2(dst, &compressed_size, src, src_size, compress_level);
+	return rc == Z_OK ? (int32) compressed_size : rc;
+}
+
+static int32 zlib_decompress(void *dst, size_t dst_size, void const *src, size_t src_size)
+{
+	uLongf		dest_len = dst_size;
+	int			rc = uncompress(dst, &dest_len, src, src_size);
+	return rc == Z_OK ? (int32) dest_len : rc;
+}
+
+/* Compress src into dst with alg; returns the compressed size, or a negative value when alg is none/undefined or the compressor fails. */
+static int32
+do_compress(void *dst, size_t dst_size, void const *src, size_t src_size, CompressAlg alg)
+{
+	switch (alg)
+	{
+		case NONE_COMPRESS:
+		case NOT_DEFINED_COMPRESS:
+			return -1;
+		case ZLIB_COMPRESS:
+			return zlib_compress(dst, dst_size, src, src_size);
+		case PGLZ_COMPRESS:
+			return pglz_compress(src, src_size, dst, PGLZ_strategy_always);
+	}
+	return -1;
+}
+
+/* Decompress src into dst with alg; returns the decompressed size, or a negative value when alg is none/undefined or the data cannot be decompressed. */
+static int32
+do_decompress(void *dst, size_t dst_size, void const *src, size_t src_size, CompressAlg alg)
+{
+	switch (alg)
+	{
+		case NONE_COMPRESS:
+		case NOT_DEFINED_COMPRESS:
+			return -1;
+		case ZLIB_COMPRESS:
+			return zlib_decompress(dst, dst_size, src, src_size);
+		case PGLZ_COMPRESS:
+			return pglz_decompress(src, src_size, dst, dst_size);
+	}
+	return -1;
+}
+
 
 typedef struct BackupPageHeader
 {
 	BlockNumber	block;			/* block number */
+	int32		compressed_size;	/* size of the stored page image; BLCKSZ when the page is stored uncompressed */
 } BackupPageHeader;
 
 /* Verify page's header */
@@ -61,8 +114,10 @@ backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
 	BackupPageHeader	header;
 	off_t				offset;
 	DataPage			page; /* used as read buffer */
-
size_t				write_buffer_size = sizeof(header) + BLCKSZ;
-	char				write_buffer[write_buffer_size];
+	char				compressed_page[PGLZ_MAX_OUTPUT(BLCKSZ)]; /* compression output; pglz_compress() needs 4 spare bytes past the input size */
+	size_t				write_buffer_size;
+	/* maximum size of write buffer */
+	char				write_buffer[BLCKSZ+sizeof(header)];
 	size_t				read_len = 0;
 	XLogRecPtr			page_lsn;
 	int					try_checksum = 100;
@@ -162,9 +217,31 @@ backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
 
 	file->read_size += read_len;
 
-	memcpy(write_buffer, &header, sizeof(header));
-	/* TODO implement block compression here? */
-	memcpy(write_buffer + sizeof(header), page.data, BLCKSZ);
+	/* Cap the output at BLCKSZ; a page that does not shrink is stored raw below. */
+	header.compressed_size = do_compress(compressed_page, BLCKSZ,
+										 page.data, sizeof(page.data), compress_alg);
+	file->compress_alg = compress_alg;
+
+	Assert (header.compressed_size <= BLCKSZ);
+	write_buffer_size = sizeof(header);
+	if (header.compressed_size > 0 && header.compressed_size < BLCKSZ)
+	{
+		memcpy(write_buffer, &header, sizeof(header));
+		memcpy(write_buffer + sizeof(header), compressed_page, header.compressed_size);
+		/* zero the MAXALIGN padding so no uninitialized stack bytes are written or checksummed */
+		memset(write_buffer + sizeof(header) + header.compressed_size, 0, MAXALIGN(header.compressed_size) - header.compressed_size);
+		write_buffer_size += MAXALIGN(header.compressed_size);
+	}
+	else
+	{
+		header.compressed_size = BLCKSZ;
+		memcpy(write_buffer, &header, sizeof(header));
+		memcpy(write_buffer + sizeof(header), page.data, BLCKSZ);
+		write_buffer_size += header.compressed_size;
+	}
+
+	/* Update CRC over exactly the bytes written below */
+	COMP_CRC32C(*crc, write_buffer, write_buffer_size);
 	/* write data page */
 	if(fwrite(write_buffer, 1, write_buffer_size, out) != write_buffer_size)
 	{
@@ -175,10 +252,6 @@ backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
 				file->path, blknum, strerror(errno_tmp));
 	}
 
-	/* update CRC */
-	COMP_CRC32C(*crc, &header, sizeof(header));
-	COMP_CRC32C(*crc, page.data, BLCKSZ);
-
 	file->write_size += write_buffer_size;
 }
 
@@ -468,7 +541,8 @@ restore_data_file(const char *from_root,
 	for (blknum = 0; ; blknum++)
 	{
 		size_t		read_len;
-		DataPage	page; /* used as read buffer */
+		DataPage	compressed_page; /* used as read buffer */
+		DataPage	page;
 
 		/* read BackupPageHeader
*/
 		read_len = fread(&header, 1, sizeof(header), in);
@@ -482,18 +556,37 @@ restore_data_file(const char *from_root,
 					"odd size page found at block %u of \"%s\"",
 					blknum, file->path);
 			else
-				elog(ERROR, "cannot read block %u of \"%s\": %s",
+				elog(ERROR, "cannot read header of block %u of \"%s\": %s",
 					blknum, file->path, strerror(errno_tmp));
 		}
 
 		if (header.block < blknum)
-			elog(ERROR, "backup is broken at block %u",
-				 blknum);
+			elog(ERROR, "backup is broken at block %u", blknum);
 
+		/* The size comes from the backup file: reject values that would overflow the page buffers. */
+		if (header.compressed_size <= 0 || header.compressed_size > BLCKSZ)
+			elog(ERROR, "invalid compressed size %d of block %u of \"%s\"",
+				 header.compressed_size, blknum, file->path);
 
-		if (fread(page.data, 1, BLCKSZ, in) != BLCKSZ)
-			elog(ERROR, "cannot read block %u of \"%s\": %s",
-				blknum, file->path, strerror(errno));
+		read_len = fread(compressed_page.data, 1,
+			MAXALIGN(header.compressed_size), in);
+		if (read_len != MAXALIGN(header.compressed_size))
+			elog(ERROR, "cannot read block %u of \"%s\" read %lu of %d",
+				blknum, file->path, (unsigned long) read_len, header.compressed_size);
+
+		if (header.compressed_size < BLCKSZ)
+		{
+			size_t		uncompressed_size = 0;
+
+			uncompressed_size = do_decompress(page.data, BLCKSZ,
+											  compressed_page.data,
+											  header.compressed_size, file->compress_alg);
+
+			if (uncompressed_size != BLCKSZ)
+				elog(ERROR, "page uncompressed to %ld bytes. != BLCKSZ", (long) uncompressed_size);
+		}
+		else
+			memcpy(page.data, compressed_page.data, BLCKSZ);
 
 		/* update checksum because we are not save whole */
 		if(backup->checksum_version)
diff --git a/dir.c b/dir.c
index 21e65ef1..5ced50f6 100644
--- a/dir.c
+++ b/dir.c
@@ -156,6 +156,7 @@ pgFileInit(const char *path)
 	strcpy(file->path, path);		/* enough buffer size guaranteed */
 	file->generation = -1;
 	file->is_partial_copy = 0;
+	file->compress_alg = NOT_DEFINED_COMPRESS;
 	return file;
 }
 
@@ -671,9 +672,9 @@ print_file_list(FILE *out, const parray *files, const char *root)
 			path = GetRelativePath(path, root);
 
 		fprintf(out, "{\"path\":\"%s\", \"size\":\"%lu\",\"mode\":\"%u\","
-					 "\"is_datafile\":\"%u\", \"crc\":\"%u\"",
+					 "\"is_datafile\":\"%u\", \"crc\":\"%u\", \"compress_alg\":\"%s\"",
 				path, (unsigned long) file->write_size, file->mode,
-				file->is_datafile?1:0, file->crc);
+				file->is_datafile?1:0, file->crc, deparse_compress_alg(file->compress_alg));
 
 		if (file->is_datafile)
 			fprintf(out, ",\"segno\":\"%d\"", file->segno);
@@ -847,6 +848,7 @@ dir_read_file_list(const char *root, const char *file_txt)
 		char		path[MAXPGPATH];
 		char		filepath[MAXPGPATH];
 		char		linked[MAXPGPATH];
+		char		compress_alg_string[MAXPGPATH] = "none"; /* default for lists without the field; assumes get_control_value() leaves the buffer untouched when the key is absent - TODO confirm */
 		uint64		write_size,
 					mode,		/* bit length of mode_t depends on platforms */
 					is_datafile,
@@ -867,6 +869,7 @@ dir_read_file_list(const char *root, const char *file_txt)
 		/* optional fields */
 		get_control_value(buf, "linked", linked, NULL, false);
 		get_control_value(buf, "segno", NULL, &segno, false);
+		get_control_value(buf, "compress_alg", compress_alg_string, NULL, false);
 
 #ifdef PGPRO_EE
 		get_control_value(buf, "CFS_generation", NULL, &generation, true);
@@ -883,6 +886,7 @@ dir_read_file_list(const char *root, const char *file_txt)
 		file->mode = (mode_t) mode;
 		file->is_datafile = is_datafile ?
true : false; file->crc = (pg_crc32) crc; + file->compress_alg = parse_compress_alg(compress_alg_string); if (linked[0]) file->linked = pgut_strdup(linked); file->segno = (int) segno; diff --git a/help.c b/help.c index 4459198b..a3f4f0b1 100644 --- a/help.c +++ b/help.c @@ -75,12 +75,16 @@ help_pg_probackup(void) printf(_(" [--log-rotation-age=log-rotation-age]\n")); printf(_(" [--retention-redundancy=retention-redundancy]\n")); printf(_(" [--retention-window=retention-window]\n")); + printf(_(" [--compress-algorithm=compress-algorithm]\n")); + printf(_(" [--compress-level=compress-level]\n")); printf(_("\n %s show-config -B backup-dir --instance=instance_name\n"), PROGRAM_NAME); printf(_("\n %s backup -B backup-path -b backup-mode --instance=instance_name\n"), PROGRAM_NAME); printf(_(" [-D pgdata-dir] [-C] [--stream [-S slot-name]] [--backup-pg-log]\n")); printf(_(" [-j num-threads] [--archive-timeout=archive-timeout]\n")); + printf(_(" [--compress-algorithm=compress-algorithm]\n")); + printf(_(" [--compress-level=compress-level]\n")); printf(_(" [--progress] [--delete-expired]\n")); printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n")); @@ -132,6 +136,8 @@ help_backup(void) printf(_(" [-D pgdata-dir] [-C] [--stream [-S slot-name]] [--backup-pg-log]\n")); printf(_(" [-j num-threads] [--archive-timeout=archive-timeout]\n")); printf(_(" [--progress] [--delete-expired]\n")); + printf(_(" [--compress-algorithm=compress-algorithm]\n")); + printf(_(" [--compress-level=compress-level]\n")); printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n\n")); printf(_(" -B, --backup-path=backup-path location of the backup storage area\n")); @@ -148,6 +154,11 @@ help_backup(void) printf(_(" --delete-expired delete backups expired according to current\n")); printf(_(" retention policy after successful backup completion\n")); + printf(_("\n Compression options:\n")); + printf(_(" --compress-algorithm=compress-algorithm\n")); + printf(_(" available options: 
'zlib','pglz','none'\n"));
+	printf(_(" --compress-level=compress-level level of compression [0-9]\n"));
+
 	printf(_("\n Connection options:\n"));
 	printf(_(" -d, --dbname=DBNAME database to connect\n"));
 	printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
@@ -236,6 +247,8 @@ help_set_config(void)
 	printf(_(" [--log-rotation-age=log-rotation-age]\n"));
 	printf(_(" [--retention-redundancy=retention-redundancy]\n"));
-	printf(_(" [--retention-window=retention-window]\n\n"));
+	printf(_(" [--retention-window=retention-window]\n"));
+	printf(_(" [--compress-algorithm=compress-algorithm]\n"));
+	printf(_(" [--compress-level=compress-level]\n\n"));
 
 	printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
 	printf(_(" --instance=instance_name name of the instance\n"));
@@ -264,6 +277,11 @@ help_set_config(void)
 
 	printf(_(" --retention-window=retention-window\n"));
 	printf(_(" number of days of recoverability\n"));
+	printf(_("\n Compression options:\n"));
+	printf(_(" --compress-algorithm=compress-algorithm\n"));
+	printf(_(" available options: 'zlib','pglz','none'\n"));
+	printf(_(" --compress-level=compress-level\n"));
+	printf(_(" level of compression [0-9]\n"));
 }
 
 static void
diff --git a/pg_probackup.c b/pg_probackup.c
index 80ff493b..607dede6 100644
--- a/pg_probackup.c
+++ b/pg_probackup.c
@@ -63,6 +63,9 @@ uint64 system_identifier = 0;
 uint32 retention_redundancy = 0;
 uint32 retention_window = 0;
 
+CompressAlg compress_alg = NOT_DEFINED_COMPRESS;	/* set by --compress-algorithm */
+int compress_level = -1;	/* set by --compress-level; -1 means not given */
+
 /* restore configuration */
 static char *target_time;
 static char *target_xid;
@@ -75,6 +78,7 @@ static char *wal_file_name;
 
 static void opt_backup_mode(pgut_option *opt, const char *arg);
 static void opt_log_level(pgut_option *opt, const char *arg);
+static void opt_compress_alg(pgut_option *opt, const char *arg);
 
 static pgut_option options[] =
 {
@@ -109,6 +113,9 @@ static pgut_option options[] =
 	/* retention options */
 	{ 'u', 34, "retention-redundancy", &retention_redundancy, SOURCE_CMDLINE },
 	{ 'u', 35, "retention-window", &retention_window, SOURCE_CMDLINE },
+	/* compression options */
+	{ 'f', 36, "compress-algorithm", opt_compress_alg, SOURCE_CMDLINE },
+	{ 'u', 37, "compress-level", &compress_level, SOURCE_CMDLINE },
 	/* logging options */
 	{ 'f', 40, "log-level", opt_log_level, SOURCE_CMDLINE },
 	{ 's', 41, "log-filename", &log_filename, SOURCE_CMDLINE },
@@ -330,6 +337,10 @@ main(int argc, char *argv[])
 	if (num_threads < 1)
 		num_threads = 1;
 
+	if (compress_level != -1 &&
+		(compress_level < 0 || compress_level > 9))
+		elog(ERROR, "--compress-level value must be in the range from 0 to 9");
+
 	/* do actual operation */
 	switch (backup_subcmd)
 	{
@@ -389,3 +400,65 @@ opt_log_level(pgut_option *opt, const char *arg)
 	log_level = parse_log_level(arg);
 	log_level_defined = true;
 }
+
+/*
+ * Convert an algorithm name to its CompressAlg value.
+ *
+ * Leading whitespace is skipped and the comparison is case-insensitive and
+ * limited to strlen(arg) characters, so any non-empty prefix of "zlib",
+ * "pglz" or "none" is accepted. An empty or unknown name is reported with
+ * elog(ERROR).
+ */
+CompressAlg
+parse_compress_alg(const char *arg)
+{
+	size_t		len;
+
+	/* Skip all spaces detected */
+	while (isspace((unsigned char)*arg))
+		arg++;
+	len = strlen(arg);
+
+	if (len == 0)
+		elog(ERROR, "compress algorithm is empty");
+
+	if (pg_strncasecmp("zlib", arg, len) == 0)
+		return ZLIB_COMPRESS;
+	else if (pg_strncasecmp("pglz", arg, len) == 0)
+		return PGLZ_COMPRESS;
+	else if (pg_strncasecmp("none", arg, len) == 0)
+		return NONE_COMPRESS;
+	else
+		elog(ERROR, "invalid compress algorithm value \"%s\"", arg);
+
+	return NOT_DEFINED_COMPRESS;
+}
+
+/*
+ * Return the textual name of alg. Both NOT_DEFINED_COMPRESS and
+ * NONE_COMPRESS are spelled "none"; NULL is returned for a value outside
+ * the CompressAlg enum.
+ */
+const char*
+deparse_compress_alg(int alg)
+{
+	switch (alg)
+	{
+		case NONE_COMPRESS:
+		case NOT_DEFINED_COMPRESS:
+			return "none";
+		case ZLIB_COMPRESS:
+			return "zlib";
+		case PGLZ_COMPRESS:
+			return "pglz";
+	}
+
+	return NULL;
+}
+
+/* Option callback for "compress-algorithm": stores the parsed value in the global compress_alg. */
+static void
+opt_compress_alg(pgut_option *opt, const char *arg)
+{
+	compress_alg = parse_compress_alg(arg);
+}
diff --git a/pg_probackup.h b/pg_probackup.h
index 03c1ed54..4e862d6f 100644
--- a/pg_probackup.h
+++ b/pg_probackup.h
@@ -56,6 +56,14 @@
 #define XID_FMT "%u"
 #endif
 
+typedef enum CompressAlg
+{
+	NOT_DEFINED_COMPRESS = 0,	/* no algorithm chosen yet (initial value) */
+	NONE_COMPRESS,				/* "none" */
+	PGLZ_COMPRESS,				/* "pglz" */
+	ZLIB_COMPRESS,				/* "zlib" */
+} CompressAlg;
+
 /*
Information about single file (or dir) in backup */ typedef struct pgFile { @@ -77,6 +85,7 @@ typedef struct pgFile * we cannot backup compressed file partially. */ int is_partial_copy; /* for compressed files. Set to '1' if backed up * via copy_file_partly() */ + CompressAlg compress_alg; /* compression algorithm applied to the file */ volatile uint32 lock; /* lock for synchronization of parallel threads */ datapagemap_t pagemap; /* bitmap of pages updated since previous backup */ } pgFile; @@ -118,6 +127,7 @@ typedef enum ProbackupSubcmd SHOW_CONFIG } ProbackupSubcmd; + /* special values of pgBackup fields */ #define INVALID_BACKUP_ID 0 #define BYTES_INVALID (-1) @@ -140,6 +150,9 @@ typedef struct pgBackupConfig uint32 retention_redundancy; uint32 retention_window; + + CompressAlg compress_alg; + int compress_level; } pgBackupConfig; /* Information about single backup stored in backup.conf */ @@ -261,6 +274,14 @@ extern uint64 system_identifier; extern uint32 retention_redundancy; extern uint32 retention_window; +extern CompressAlg compress_alg; +extern int compress_level; + +#define DEFAULT_COMPRESS_LEVEL 6 + +extern CompressAlg parse_compress_alg(const char *arg); +extern const char* deparse_compress_alg(int alg); + /* in dir.c */ /* exclude directory list for $PGDATA file listing */ extern const char *pgdata_exclude_dir[];