From 9b36081659e96939b9cb09302e724bb7518b052a Mon Sep 17 00:00:00 2001 From: Grigory Smolkin Date: Wed, 17 Jun 2020 13:39:42 +0300 Subject: [PATCH] [Issue #228] various fixes and improvements --- src/backup.c | 28 +- src/catalog.c | 5 + src/data.c | 722 +++++++++++++++++++++++++++++---------------- src/dir.c | 6 +- src/merge.c | 26 +- src/pg_probackup.h | 22 +- src/utils/file.c | 12 +- src/validate.c | 2 +- 8 files changed, 548 insertions(+), 275 deletions(-) diff --git a/src/backup.c b/src/backup.c index 00896637..57bde384 100644 --- a/src/backup.c +++ b/src/backup.c @@ -664,8 +664,8 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool for (i = 0; i < parray_num(backup_files_list); i++) { - char to_fullpath[MAXPGPATH]; - pgFile *file = (pgFile *) parray_get(backup_files_list, i); + char to_fullpath[MAXPGPATH]; + pgFile *file = (pgFile *) parray_get(backup_files_list, i); /* TODO: sync directory ? */ if (S_ISDIR(file->mode)) @@ -687,7 +687,21 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool } if (fio_sync(to_fullpath, FIO_BACKUP_HOST) != 0) - elog(ERROR, "Failed to sync file \"%s\": %s", to_fullpath, strerror(errno)); + elog(ERROR, "Cannot sync file \"%s\": %s", to_fullpath, strerror(errno)); + + /* fsync header file */ + if (file->external_dir_num == 0 && + file->is_datafile && !file->is_cfs && + file->n_headers > 0) + { + char to_fullpath_hdr[MAXPGPATH]; + + snprintf(to_fullpath_hdr, MAXPGPATH, "%s_hdr", to_fullpath); + + if (fio_sync(to_fullpath_hdr, FIO_BACKUP_HOST) != 0) + elog(ERROR, "Cannot sync file \"%s\": %s", to_fullpath_hdr, strerror(errno)); + + } } time(&end_time); @@ -2131,7 +2145,7 @@ backup_files(void *arg) /* backup file */ if (file->is_datafile && !file->is_cfs) { - backup_data_file(&(arguments->conn_arg), file, from_fullpath, to_fullpath, + backup_data_file_new(&(arguments->conn_arg), file, from_fullpath, to_fullpath, arguments->prev_start_lsn, current.backup_mode, 
instance_config.compress_alg, @@ -2147,12 +2161,6 @@ backup_files(void *arg) current.backup_mode, current.parent_backup, true); } - /* No point in storing empty, missing or not changed files */ - if (file->write_size <= 0) - unlink(to_fullpath); -// elog(ERROR, "Cannot remove file \"%s\": %s", to_fullpath, -// strerror(errno)); - if (file->write_size == FILE_NOT_FOUND) continue; diff --git a/src/catalog.c b/src/catalog.c index 686b21e7..450f1100 100644 --- a/src/catalog.c +++ b/src/catalog.c @@ -1906,7 +1906,12 @@ write_backup_filelist(pgBackup *backup, parray *files, const char *root, len += sprintf(line+len, ",\"n_blocks\":\"%i\"", file->n_blocks); if (file->n_headers > 0) + { len += sprintf(line+len, ",\"n_headers\":\"%i\"", file->n_headers); + len += sprintf(line+len, ",\"hdr_crc\":\"%u\"", file->hdr_crc); + +// elog(INFO, "CRC INT: %li, CRC UINT: %u", file->crc_hdr, file->crc_hdr); + } sprintf(line+len, "}\n"); diff --git a/src/data.c b/src/data.c index 9e9f6b4a..c4ddcc98 100644 --- a/src/data.c +++ b/src/data.c @@ -32,6 +32,7 @@ typedef union DataPage } DataPage; static BackupPageHeader2* get_data_file_headers(const char *fullpath, pgFile *file, uint32 backup_version); +static void write_page_headers(BackupPageHeader2 *headers, pgFile *file, const char* to_fullpath); static bool get_compressed_page_meta(FILE *in, const char *fullpath, int32 *compressed_size, BlockNumber *blknum, pg_crc32 *crc, bool use_crc32c); @@ -472,6 +473,7 @@ prepare_page(ConnectionArgs *conn_arg, return PageIsOk; } +/* split this function in two: compress() and backup() */ static int compress_and_backup_page(pgFile *file, BlockNumber blknum, FILE *in, FILE *out, pg_crc32 *crc, @@ -526,7 +528,174 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum, file->write_size += write_buffer_size; file->uncompressed_size += BLCKSZ; - return compressed_size; + return write_buffer_size; +} + +/* + * Backup data file in the from_root directory to the to_root directory with + * same relative 
path. If prev_backup_start_lsn is not NULL, only pages with + * higher lsn will be copied. + * Not just copy file, but read it block by block (use bitmap in case of + * incremental backup), validate checksum, optionally compress and write to + * backup with special header. + */ +void +backup_data_file_new(ConnectionArgs* conn_arg, pgFile *file, + const char *from_fullpath, const char *to_fullpath, + XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode, + CompressAlg calg, int clevel, uint32 checksum_version, + int ptrack_version_num, const char *ptrack_schema, bool missing_ok) +{ + int rc; + bool use_pagemap; + char *errmsg = NULL; + BlockNumber err_blknum = 0; + /* page headers */ + BackupPageHeader2 *headers = NULL; + + /* sanity */ + if (file->size % BLCKSZ != 0) + elog(WARNING, "File: \"%s\", invalid file size %zu", from_fullpath, file->size); + + /* + * Compute expected number of blocks in the file. + * NOTE This is a normal situation, if the file size has changed + * since the moment we computed it. + */ + file->n_blocks = file->size/BLCKSZ; + + /* + * Skip unchanged file only if it exists in previous backup. + * This way we can correctly handle null-sized files which are + * not tracked by pagemap and thus always marked as unchanged. + */ + if ((backup_mode == BACKUP_MODE_DIFF_PAGE || + backup_mode == BACKUP_MODE_DIFF_PTRACK) && + file->pagemap.bitmapsize == PageBitmapIsEmpty && + file->exists_in_prev && !file->pagemap_isabsent) + { + /* + * There are no changed blocks since last backup. We want to make + * incremental backup, so we should exit. + */ + file->write_size = BYTES_INVALID; + return; + } + + /* reset size summary */ + file->read_size = 0; + file->write_size = 0; + file->uncompressed_size = 0; + INIT_FILE_CRC32(true, file->crc); + + /* + * Read each page, verify checksum and write it to backup. + * If page map is empty or file is not present in previous backup + * backup all pages of the relation. 
+ * + * In PTRACK 1.x there was a problem + * of data files with missing _ptrack map. + * Such files should be fully copied. + */ + + if (file->pagemap.bitmapsize == PageBitmapIsEmpty || + file->pagemap_isabsent || !file->exists_in_prev || + !file->pagemap.bitmap) + use_pagemap = false; + else + use_pagemap = true; + + /* Remote mode */ + if (fio_is_remote(FIO_DB_HOST)) + { + + rc = fio_send_pages(to_fullpath, from_fullpath, file, + /* send prev backup START_LSN */ + backup_mode == BACKUP_MODE_DIFF_DELTA && + file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr, + calg, clevel, checksum_version, + /* send pagemap if any */ + use_pagemap ? &file->pagemap : NULL, + /* variables for error reporting */ + &err_blknum, &errmsg, &headers); + } + else + { + /* TODO: stop handling errors internally */ + rc = send_pages(conn_arg, to_fullpath, from_fullpath, file, + /* send prev backup START_LSN */ + backup_mode == BACKUP_MODE_DIFF_DELTA && + file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr, + calg, clevel, checksum_version, + /* send_pages() takes a bool and reads file->pagemap itself */ + use_pagemap, 
+ &headers, backup_mode, ptrack_version_num, ptrack_schema); + } + + /* check for errors */ + if (rc == FILE_MISSING) + { + elog(LOG, "File \"%s\" is not found", from_fullpath); + file->write_size = FILE_NOT_FOUND; + goto cleanup; + } + + else if (rc == WRITE_FAILED) + elog(ERROR, "Cannot write block %u of \"%s\": %s", + err_blknum, to_fullpath, strerror(errno)); + + else if (rc == PAGE_CORRUPTION) + { + if (errmsg) + elog(ERROR, "Corruption detected in file \"%s\", block %u: %s", + from_fullpath, err_blknum, errmsg); + else + elog(ERROR, "Corruption detected in file \"%s\", block %u", + from_fullpath, err_blknum); + } + /* OPEN_FAILED and READ_FAILED */ + else if (rc == OPEN_FAILED) + { + if (errmsg) + elog(ERROR, "%s", errmsg); + else + elog(ERROR, "Cannot open file \"%s\"", from_fullpath); + } + else if (rc == READ_FAILED) + { + if (errmsg) + elog(ERROR, "%s", errmsg); + else + elog(ERROR, "Cannot read file \"%s\"", from_fullpath); + } + + file->read_size = rc * BLCKSZ; + + /* refresh n_blocks for FULL and DELTA */ + if (backup_mode == BACKUP_MODE_FULL || + backup_mode == BACKUP_MODE_DIFF_DELTA) + file->n_blocks = file->read_size / BLCKSZ; + + /* Determine that file didn`t changed in case of incremental backup */ + if (backup_mode != BACKUP_MODE_FULL && + file->exists_in_prev && + file->write_size == 0 && + file->n_blocks > 0) + { + file->write_size = BYTES_INVALID; + } + +cleanup: + + /* finish CRC calculation */ + FIN_FILE_CRC32(true, file->crc); + + /* dump page headers */ + write_page_headers(headers, file, to_fullpath); + + pg_free(errmsg); + pg_free(file->pagemap.bitmap); + pg_free(headers); } /* @@ -600,17 +769,6 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file, file->uncompressed_size = 0; INIT_FILE_CRC32(true, file->crc); - /* open backup file for write */ - out = fopen(to_fullpath, PG_BINARY_W); - if (out == NULL) - elog(ERROR, "Cannot open backup file \"%s\": %s", - to_fullpath, strerror(errno)); - - /* update file 
permission */ - if (chmod(to_fullpath, FILE_PERMISSION) == -1) - elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath, - strerror(errno)); - /* * Read each page, verify checksum and write it to backup. * If page map is empty or file is not present in previous backup @@ -628,17 +786,13 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file, else use_pagemap = true; - /* enable stdio buffering for output file */ - out_buf = pgut_malloc(STDIO_BUFSIZE); - setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE); - /* Remote mode */ if (fio_is_remote(FIO_DB_HOST)) { char *errmsg = NULL; BlockNumber err_blknum = 0; - int rc = fio_send_pages(out, from_fullpath, file, + int rc = fio_send_pages(to_fullpath, from_fullpath, file, /* send prev backup START_LSN */ backup_mode == BACKUP_MODE_DIFF_DELTA && file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr, @@ -692,6 +846,7 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file, /* Local mode */ else { + uint cur_pos_out = 0; /* open source file for read */ in = fopen(from_fullpath, PG_BINARY_R); if (in == NULL) @@ -753,6 +908,11 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file, else if (rc == PageIsOk) { + + /* open local backup file for write */ + if (!out) + out = open_local_file_rw(to_fullpath, &out_buf, STDIO_BUFSIZE); + hdr_num++; if (!headers) @@ -763,16 +923,15 @@ backup_data_file(ConnectionArgs* conn_arg, pgFile *file, headers[hdr_num].block = blknum; headers[hdr_num].lsn = page_st.lsn; headers[hdr_num].checksum = page_st.checksum; - headers[hdr_num].pos = ftell(out); /* optimize */ - -// elog(INFO, "CRC: %u", headers[hdr_num].checksum); -// elog(INFO, "POS: %u", headers[hdr_num].pos); + headers[hdr_num].pos = cur_pos_out; /* optimize */ headers[hdr_num].compressed_size = compress_and_backup_page(file, blknum, in, out, &(file->crc), rc, curr_page, calg, clevel, from_fullpath, to_fullpath); file->n_headers = hdr_num +1; + + cur_pos_out += headers[hdr_num].compressed_size; } /* TODO: handle 
PageIsCorrupted, currently it is done in prepare_page */ else @@ -846,8 +1005,6 @@ cleanup: elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath, strerror(errno)); - /* TODO: calculate checksums */ - /* TODO: save file to backup_content metainformation */ hdr_size = file->n_headers * sizeof(BackupPageHeader2); // elog(INFO, "Size: %lu, aligh: %lu", hdr_size, MAXALIGN(hdr_size)); @@ -996,8 +1153,12 @@ restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out, /* set stdio buffering for input data file */ setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE); + /* get headers for this file */ headers = get_data_file_headers(from_fullpath, tmp_file, parse_program_version(backup->program_version)); + if (!headers && tmp_file->n_headers > 0) + elog(ERROR, "Failed to get headers for file \"%s\"", from_fullpath); + /* * Restore the file. * Datafiles are backed up block by block and every block @@ -1136,7 +1297,7 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers break; if (compressed_size > BLCKSZ) - elog(ERROR, "Size of a blknum %i exceed BLCKSZ", blknum); + elog(ERROR, "Size of a blknum %i exceed BLCKSZ: %i", blknum, compressed_size); /* incremental restore in LSN mode */ if (map && lsn_map && datapagemap_is_set(lsn_map, blknum)) @@ -1171,7 +1332,6 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers if (headers && cur_pos_in != headers[n_hdr].pos) { - elog(INFO, "Seek to %u", headers[n_hdr].pos); if (fseek(in, headers[n_hdr].pos, SEEK_SET) != 0) elog(ERROR, "Cannot seek to offset %u of '%s': %s", headers[n_hdr].pos, from_fullpath, strerror(errno)); @@ -1179,10 +1339,6 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers cur_pos_in = headers[n_hdr].pos; } -// elog(INFO, "BKLKUM: %u", blknum); - -// elog(INFO, "Cur_pos: %u", ftell(in)); - /* read a page from file */ read_len = fread(page.data, 1, MAXALIGN(compressed_size), in); @@ -1712,7 +1868,7 @@ 
check_data_file(ConnectionArgs *arguments, pgFile *file, /* Valiate pages of datafile in backup one by one */ bool -check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, +validate_file_pages(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, uint32 checksum_version, uint32 backup_version) { size_t read_len = 0; @@ -1726,21 +1882,21 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, elog(VERBOSE, "Validate relation blocks for file \"%s\"", fullpath); + /* nothing to validate */ + if (backup_version >= 20400 && + file->n_headers <= 0) + return true; + in = fopen(fullpath, PG_BINARY_R); if (in == NULL) - { - if (errno == ENOENT) - { - elog(WARNING, "File \"%s\" is not found", fullpath); - return false; - } - elog(ERROR, "Cannot open file \"%s\": %s", fullpath, strerror(errno)); - } headers = get_data_file_headers(fullpath, file, backup_version); + if (!headers && file->n_headers > 0) + elog(ERROR, "Failed to get headers for file \"%s\"", fullpath); + /* calc CRC of backup file */ INIT_FILE_CRC32(use_crc32c, crc); @@ -1767,6 +1923,8 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, blknum = headers[n_hdr].block; compressed_size = headers[n_hdr].compressed_size; +// elog(INFO, "POS: %u", headers[n_hdr].pos); + if (cur_pos != headers[n_hdr].pos && fio_fseek(in, headers[n_hdr].pos) < 0) { @@ -1774,14 +1932,25 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, blknum, fullpath, strerror(errno)); } } - else if (!get_compressed_page_meta(in, fullpath, &compressed_size, - &blknum, &crc, use_crc32c)) + else { + if (!get_compressed_page_meta(in, fullpath, &compressed_size, + &blknum, &crc, use_crc32c)) break; } - read_len = fread(compressed_page.data, 1, - MAXALIGN(compressed_size), in); + /* backward compatibility kludge TODO: remove in 3.0 */ + if (compressed_size == PageIsTruncated) + { + elog(LOG, "Block %u of \"%s\" is truncated", + blknum, fullpath); + 
continue; + } + + Assert(compressed_size <= BLCKSZ); + Assert(compressed_size > 0); + + read_len = fread(compressed_page.data, 1, MAXALIGN(compressed_size), in); if (read_len != MAXALIGN(compressed_size)) { elog(WARNING, "Cannot read block %u of \"%s\" read %zu of %d", @@ -1789,7 +1958,12 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, return false; } - cur_pos += MAXALIGN(compressed_size); +// elog(INFO, "POS1: %lu", cur_pos); + + cur_pos += read_len; + +// elog(INFO, "POS2: %lu", cur_pos); +// elog(INFO, "Compressed size: %i", compressed_size); COMP_FILE_CRC32(use_crc32c, crc, compressed_page.data, read_len); @@ -1806,8 +1980,11 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, file->compress_alg, &errormsg); if (uncompressed_size < 0 && errormsg != NULL) + { elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s", blknum, fullpath, errormsg); + return false; + } if (uncompressed_size != BLCKSZ) { @@ -1816,8 +1993,8 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, is_valid = false; continue; } - elog(WARNING, "Page of file \"%s\" uncompressed to %d bytes. != BLCKSZ", - fullpath, uncompressed_size); + elog(WARNING, "Page %u of file \"%s\" uncompressed to %d bytes. 
!= BLCKSZ", + blknum, fullpath, uncompressed_size); return false; } @@ -1872,185 +2049,9 @@ check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, return is_valid; } -/* Valiate pages of datafile in backup one by one */ -bool -check_file_pages(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, - uint32 checksum_version, uint32 backup_version) -{ - size_t read_len = 0; - bool is_valid = true; - FILE *in; - pg_crc32 crc; - bool use_crc32c = backup_version <= 20021 || backup_version >= 20025; - - elog(VERBOSE, "Validate relation blocks for file \"%s\"", fullpath); - - in = fopen(fullpath, PG_BINARY_R); - if (in == NULL) - { - if (errno == ENOENT) - { - elog(WARNING, "File \"%s\" is not found", fullpath); - return false; - } - - elog(ERROR, "Cannot open file \"%s\": %s", - fullpath, strerror(errno)); - } - - /* calc CRC of backup file */ - INIT_FILE_CRC32(use_crc32c, crc); - - /* read and validate pages one by one */ - while (true) - { - int rc = 0; - DataPage compressed_page; /* used as read buffer */ - DataPage page; - BackupPageHeader header; - BlockNumber blknum = 0; - XLogRecPtr page_lsn = 0; - - if (interrupted || thread_interrupted) - elog(ERROR, "Interrupted during data file validation"); - - /* read BackupPageHeader */ - read_len = fread(&header, 1, sizeof(header), in); - - if (ferror(in)) - elog(ERROR, "Cannot read header of block %u of \"%s\": %s", - blknum, fullpath, strerror(errno)); - - if (read_len != sizeof(header)) - { - if (read_len == 0 && feof(in)) - break; /* EOF found */ - else if (read_len != 0 && feof(in)) - elog(WARNING, - "Odd size page found at block %u of \"%s\"", - blknum, fullpath); - else - elog(WARNING, "Cannot read header of block %u of \"%s\": %s", - blknum, fullpath, strerror(errno)); - return false; - } - - COMP_FILE_CRC32(use_crc32c, crc, &header, read_len); - - if (header.block == 0 && header.compressed_size == 0) - { - elog(VERBOSE, "Skip empty block of \"%s\"", fullpath); - continue; - } - - if 
(header.block < blknum) - { - elog(WARNING, "Backup is broken at block %u of \"%s\"", - blknum, fullpath); - return false; - } - - blknum = header.block; - - if (header.compressed_size == PageIsTruncated) - { - elog(LOG, "Block %u of \"%s\" is truncated", - blknum, fullpath); - continue; - } - - Assert(header.compressed_size <= BLCKSZ); - - read_len = fread(compressed_page.data, 1, - MAXALIGN(header.compressed_size), in); - if (read_len != MAXALIGN(header.compressed_size)) - { - elog(WARNING, "Cannot read block %u of \"%s\" read %zu of %d", - blknum, fullpath, read_len, header.compressed_size); - return false; - } - - COMP_FILE_CRC32(use_crc32c, crc, compressed_page.data, read_len); - - if (header.compressed_size != BLCKSZ - || page_may_be_compressed(compressed_page.data, file->compress_alg, - backup_version)) - { - int32 uncompressed_size = 0; - const char *errormsg = NULL; - - uncompressed_size = do_decompress(page.data, BLCKSZ, - compressed_page.data, - header.compressed_size, - file->compress_alg, - &errormsg); - if (uncompressed_size < 0 && errormsg != NULL) - elog(WARNING, "An error occured during decompressing block %u of file \"%s\": %s", - blknum, fullpath, errormsg); - - if (uncompressed_size != BLCKSZ) - { - if (header.compressed_size == BLCKSZ) - { - is_valid = false; - continue; - } - elog(WARNING, "Page of file \"%s\" uncompressed to %d bytes. 
!= BLCKSZ", - fullpath, uncompressed_size); - return false; - } - -// rc = validate_one_page(page.data, -// file->segno * RELSEG_SIZE + blknum, -// stop_lsn, &page_lsn, NULL, checksum_version); - } - else -// rc = validate_one_page(compressed_page.data, -// file->segno * RELSEG_SIZE + blknum, -// stop_lsn, &page_lsn, NULL, checksum_version); - - switch (rc) - { - case PAGE_IS_NOT_FOUND: - elog(LOG, "File \"%s\", block %u, page is NULL", file->rel_path, blknum); - break; - case PAGE_IS_ZEROED: - elog(LOG, "File: %s blknum %u, empty zeroed page", file->rel_path, blknum); - break; - case PAGE_HEADER_IS_INVALID: - elog(WARNING, "Page header is looking insane: %s, block %i", file->rel_path, blknum); - is_valid = false; - break; - case PAGE_CHECKSUM_MISMATCH: - elog(WARNING, "File: %s blknum %u have wrong checksum", file->rel_path, blknum); - is_valid = false; - break; - case PAGE_LSN_FROM_FUTURE: - elog(WARNING, "File: %s, block %u, checksum is %s. " - "Page is from future: pageLSN %X/%X stopLSN %X/%X", - file->rel_path, blknum, - checksum_version ? "correct" : "not enabled", - (uint32) (page_lsn >> 32), (uint32) page_lsn, - (uint32) (stop_lsn >> 32), (uint32) stop_lsn); - break; - } - } - - FIN_FILE_CRC32(use_crc32c, crc); - fclose(in); - - if (crc != file->crc) - { - elog(WARNING, "Invalid CRC of backup file \"%s\": %X. Expected %X", - fullpath, crc, file->crc); - is_valid = false; - } - - return is_valid; -} - /* read local data file and construct map with block checksums */ -PageState *get_checksum_map(const char *fullpath, uint32 checksum_version, +PageState* +get_checksum_map(const char *fullpath, uint32 checksum_version, int n_blocks, XLogRecPtr dest_stop_lsn, BlockNumber segmentno) { PageState *checksum_map = NULL; @@ -2186,39 +2187,6 @@ get_lsn_map(const char *fullpath, uint32 checksum_version, return lsn_map; } -/* attempt to open header file, read content and return as - * array of headers. 
- */ -BackupPageHeader2* -get_data_file_headers(const char *fullpath, pgFile *file, uint32 backup_version) -{ - int len; - FILE *in = NULL; - char fullpath_hdr[MAXPGPATH]; - BackupPageHeader2 *headers = NULL; - - if (backup_version < 20400) - return NULL; - - snprintf(fullpath_hdr, MAXPGPATH, "%s_hdr", fullpath); - - in = fopen(fullpath_hdr, PG_BINARY_R); - - if (!in) - elog(ERROR, "Cannot open header file \"%s\": %s", fullpath_hdr, strerror(errno)); - - len = file->n_headers * sizeof(BackupPageHeader2); - headers = pgut_malloc(len); - - if (fread(headers, 1, len, in) != len) - elog(ERROR, "Cannot read header file \"%s\": %s", fullpath_hdr, strerror(errno)); - - if (fclose(in)) - elog(ERROR, "Cannot close header file \"%s\": %s", fullpath_hdr, strerror(errno)); - - return headers; -} - /* */ bool get_compressed_page_meta(FILE *in, const char *fullpath, int32 *compressed_size, @@ -2256,8 +2224,241 @@ get_compressed_page_meta(FILE *in, const char *fullpath, int32 *compressed_size, *blknum = header.block; *compressed_size = header.compressed_size; - Assert(*compressed_size <= BLCKSZ); + Assert(*compressed_size != 0); return true; } +/* Open local backup file for writing, set permissions and buffering */ +FILE* +open_local_file_rw(const char *to_fullpath, char **out_buf, uint32 buf_size) +{ + FILE *out = NULL; + /* open backup file for write */ + out = fopen(to_fullpath, PG_BINARY_W); + if (out == NULL) + elog(ERROR, "Cannot open backup file \"%s\": %s", + to_fullpath, strerror(errno)); + + /* update file permission */ + if (chmod(to_fullpath, FILE_PERMISSION) == -1) + elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath, + strerror(errno)); + + /* enable stdio buffering for output file */ + *out_buf = pgut_malloc(buf_size); + setvbuf(out, *out_buf, 
_IOFBF, buf_size); + + return out; +} + +int +send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_fullpath, + pgFile *file, XLogRecPtr prev_backup_start_lsn, CompressAlg calg, int clevel, + uint32 checksum_version, bool use_pagemap, BackupPageHeader2 **headers, + BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema) +{ + FILE *in = NULL; + FILE *out = NULL; + int hdr_num = -1; + uint cur_pos_out = 0; + char curr_page[BLCKSZ]; + int n_blocks_read = 0; + BlockNumber blknum = 0; + datapagemap_iterator_t *iter = NULL; + + /* stdio buffers */ + char *in_buf = NULL; + char *out_buf = NULL; + + /* open source file for read */ + in = fopen(from_fullpath, PG_BINARY_R); + if (in == NULL) + { + /* + * If file is not found, this is not en error. + * It could have been deleted by concurrent postgres transaction. + */ + if (errno == ENOENT) + return FILE_MISSING; + + elog(ERROR, "Cannot open file \"%s\": %s", from_fullpath, strerror(errno)); + } + + /* + * Enable stdio buffering for local input file, + * unless the pagemap is involved, which + * imply a lot of random access. 
+ */ + + if (use_pagemap) + { + iter = datapagemap_iterate(&file->pagemap); + datapagemap_next(iter, &blknum); /* set first block */ + + setvbuf(in, NULL, _IONBF, BUFSIZ); + } + else + { + in_buf = pgut_malloc(STDIO_BUFSIZE); + setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE); + } + + while (blknum < file->n_blocks) + { + PageState page_st; + int rc = prepare_page(conn_arg, file, prev_backup_start_lsn, + blknum, in, backup_mode, curr_page, + true, checksum_version, + ptrack_version_num, ptrack_schema, + from_fullpath, &page_st); + if (rc == PageIsTruncated) + break; + + else if (rc == PageIsOk) + { + /* lazily open backup file (useful for s3) */ + if (!out) + out = open_local_file_rw(to_fullpath, &out_buf, STDIO_BUFSIZE); + + hdr_num++; + + if (!*headers) + *headers = (BackupPageHeader2 *) pgut_malloc(sizeof(BackupPageHeader2)); + else + *headers = (BackupPageHeader2 *) pgut_realloc(*headers, (hdr_num+1 ) * sizeof(BackupPageHeader2)); + + (*headers)[hdr_num].block = blknum; + (*headers)[hdr_num].pos = cur_pos_out; + (*headers)[hdr_num].lsn = page_st.lsn; + (*headers)[hdr_num].checksum = page_st.checksum; + + (*headers)[hdr_num].compressed_size = compress_and_backup_page(file, blknum, in, out, &(file->crc), + rc, curr_page, calg, clevel, + from_fullpath, to_fullpath); + cur_pos_out += MAXALIGN((*headers)[hdr_num].compressed_size); + } + + n_blocks_read++; + + /* next block */ + if (use_pagemap) + { + /* exit if pagemap is exhausted */ + if (!datapagemap_next(iter, &blknum)) + break; + } + else + blknum++; + } + + file->n_headers = hdr_num +1; + + /* cleanup */ + if (in && fclose(in)) + elog(ERROR, "Cannot close the source file \"%s\": %s", + from_fullpath, strerror(errno)); + + /* close local output file */ + if (out && fclose(out)) + elog(ERROR, "Cannot close the backup file \"%s\": %s", + to_fullpath, strerror(errno)); + + pg_free(iter); + pg_free(in_buf); + pg_free(out_buf); + + return n_blocks_read; +} + +/* attempt to open header file, read content and return as + * 
array of headers. + */ +BackupPageHeader2* +get_data_file_headers(const char *fullpath, pgFile *file, uint32 backup_version) +{ + int len; + FILE *in = NULL; + pg_crc32 hdr_crc; + char fullpath_hdr[MAXPGPATH]; + BackupPageHeader2 *headers = NULL; + +// elog(INFO, "Backup Version: %u", backup_version); + + if (backup_version < 20400) + { + /* header files are only read for backup_version >= 20400 */ + return NULL; + } + + if (file->n_headers <= 0) + { + /* no page headers were recorded for this file */ + return NULL; + } + + snprintf(fullpath_hdr, MAXPGPATH, "%s_hdr", fullpath); + + in = fopen(fullpath_hdr, PG_BINARY_R); + + if (!in) + elog(ERROR, "Cannot open header file \"%s\": %s", fullpath_hdr, strerror(errno)); + + len = file->n_headers * sizeof(BackupPageHeader2); + headers = pgut_malloc(len); + + if (fread(headers, 1, len, in) != len) + elog(ERROR, "Cannot read header file \"%s\": %s", fullpath_hdr, strerror(errno)); + + /* validate checksum */ + INIT_FILE_CRC32(true, hdr_crc); + COMP_FILE_CRC32(true, hdr_crc, headers, len); + FIN_FILE_CRC32(true, hdr_crc); + + if (hdr_crc != file->hdr_crc) + { + elog(ERROR, "Header file crc mismatch \"%s\", current: %u, expected: %u", + fullpath_hdr, hdr_crc, file->hdr_crc); + } + + if (fclose(in)) + elog(ERROR, "Cannot close header file \"%s\": %s", fullpath_hdr, strerror(errno)); + + return headers; +} + +void +write_page_headers(BackupPageHeader2 *headers, pgFile *file, const char* to_fullpath) +{ + FILE *out = NULL; + size_t hdr_size = 0; + char to_fullpath_hdr[MAXPGPATH]; + + if (file->n_headers <= 0) + return; + + snprintf(to_fullpath_hdr, MAXPGPATH, "%s_hdr", to_fullpath); + + out = fopen(to_fullpath_hdr, PG_BINARY_W); + if (out == NULL) + elog(ERROR, "Cannot open header file \"%s\": %s", + to_fullpath_hdr, strerror(errno)); + + /* update file permission */ + if (chmod(to_fullpath_hdr, FILE_PERMISSION) == -1) + elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath_hdr, + strerror(errno)); + + hdr_size = 
file->n_headers * sizeof(BackupPageHeader2); + + /* calculate checksums */ + 
INIT_FILE_CRC32(true, file->hdr_crc); + COMP_FILE_CRC32(true, file->hdr_crc, headers, hdr_size); + FIN_FILE_CRC32(true, file->hdr_crc); + + if (fwrite(headers, 1, hdr_size, out) != hdr_size) + elog(ERROR, "Cannot write to file \"%s\": %s", to_fullpath_hdr, strerror(errno)); + + if (fclose(out)) + elog(ERROR, "Cannot close file \"%s\": %s", to_fullpath_hdr, strerror(errno)); +} diff --git a/src/dir.c b/src/dir.c index 7b367a34..a7e93fa1 100644 --- a/src/dir.c +++ b/src/dir.c @@ -1550,7 +1550,8 @@ dir_read_file_list(const char *root, const char *external_prefix, segno, n_blocks, n_headers, - dbOid; /* used for partial restore */ + dbOid, /* used for partial restore */ + hdr_crc; pgFile *file; COMP_FILE_CRC32(true, content_crc, buf, strlen(buf)); @@ -1594,6 +1595,9 @@ dir_read_file_list(const char *root, const char *external_prefix, if (get_control_value(buf, "n_headers", NULL, &n_headers, false)) file->n_headers = (int) n_headers; + if (get_control_value(buf, "hdr_crc", NULL, &hdr_crc, false)) + file->hdr_crc = (pg_crc32) hdr_crc; + parray_append(files, file); } diff --git a/src/merge.c b/src/merge.c index fb2f3918..c0709e3e 100644 --- a/src/merge.c +++ b/src/merge.c @@ -1020,6 +1020,9 @@ merge_files(void *arg) tmp_file->n_blocks = file->n_blocks; tmp_file->compress_alg = file->compress_alg; tmp_file->uncompressed_size = file->n_blocks * BLCKSZ; + + tmp_file->n_headers = file->n_headers; + tmp_file->hdr_crc = file->hdr_crc; } else tmp_file->uncompressed_size = tmp_file->write_size; @@ -1140,8 +1143,11 @@ merge_data_file(parray *parent_chain, pgBackup *full_backup, FILE *out = NULL; char *buffer = pgut_malloc(STDIO_BUFSIZE); char to_fullpath[MAXPGPATH]; + char to_fullpath_hdr[MAXPGPATH]; char to_fullpath_tmp1[MAXPGPATH]; /* used for restore */ char to_fullpath_tmp2[MAXPGPATH]; /* used for backup */ + char to_fullpath_tmp2_hdr[MAXPGPATH]; + /* The next possible optimization is copying "as is" the file * from intermediate incremental backup, that didn`t changed in @@ 
-1152,6 +1158,9 @@ merge_data_file(parray *parent_chain, pgBackup *full_backup, join_path_components(to_fullpath, full_database_dir, tmp_file->rel_path); snprintf(to_fullpath_tmp1, MAXPGPATH, "%s_tmp1", to_fullpath); snprintf(to_fullpath_tmp2, MAXPGPATH, "%s_tmp2", to_fullpath); + /* header files */ + snprintf(to_fullpath_hdr, MAXPGPATH, "%s_hdr", to_fullpath); + snprintf(to_fullpath_tmp2_hdr, MAXPGPATH, "%s_hdr", to_fullpath_tmp2); /* open temp file */ out = fopen(to_fullpath_tmp1, PG_BINARY_W); @@ -1177,7 +1186,7 @@ merge_data_file(parray *parent_chain, pgBackup *full_backup, * 2 backups of old versions, were n_blocks is missing. */ - backup_data_file(NULL, tmp_file, to_fullpath_tmp1, to_fullpath_tmp2, + backup_data_file_new(NULL, tmp_file, to_fullpath_tmp1, to_fullpath_tmp2, InvalidXLogRecPtr, BACKUP_MODE_FULL, dest_backup->compress_alg, dest_backup->compress_level, dest_backup->checksum_version, 0, NULL, false); @@ -1207,11 +1216,26 @@ merge_data_file(parray *parent_chain, pgBackup *full_backup, elog(ERROR, "Cannot sync merge temp file \"%s\": %s", to_fullpath_tmp2, strerror(errno)); + /* sync header file */ + if (fio_sync(to_fullpath_tmp2, FIO_BACKUP_HOST) != 0) + elog(ERROR, "Cannot sync temp header file \"%s\": %s", + to_fullpath_tmp2_hdr, strerror(errno)); + +//<- CRITICAL SECTION + /* Do atomic rename from second temp file to destination file */ if (rename(to_fullpath_tmp2, to_fullpath) == -1) elog(ERROR, "Could not rename file \"%s\" to \"%s\": %s", to_fullpath_tmp2, to_fullpath, strerror(errno)); +//<- If we crash here, merge cannot be continued. 
+ + /* Do atomic rename from temp header file to destination header file */ + if (rename(to_fullpath_tmp2_hdr, to_fullpath_hdr) == -1) + elog(ERROR, "Could not rename file \"%s\" to \"%s\": %s", + to_fullpath_tmp2_hdr, to_fullpath_hdr, strerror(errno)); +//<- + /* drop temp file */ unlink(to_fullpath_tmp1); } diff --git a/src/pg_probackup.h b/src/pg_probackup.h index fcaf3957..3f3e90a3 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -192,6 +192,7 @@ typedef struct pgFile */ /* we need int64 here to store '-1' value */ pg_crc32 crc; /* CRC value of the file, regular file only */ + pg_crc32 hdr_crc; /* CRC value of header file: name_hdr */ char *rel_path; /* relative path of the file */ char *linked; /* path of the linked file */ bool is_datafile; /* true if the file is PostgreSQL data file */ @@ -585,7 +586,7 @@ typedef struct BackupPageHeader typedef struct BackupPageHeader2 { int32 block; /* block number */ - int32 pos; + int32 pos; /* position in backup file */ int32 compressed_size; XLogRecPtr lsn; uint16 checksum; @@ -963,6 +964,11 @@ extern void backup_data_file(ConnectionArgs* conn_arg, pgFile *file, XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode, CompressAlg calg, int clevel, uint32 checksum_version, int ptrack_version_num, const char *ptrack_schema, bool missing_ok); +extern void backup_data_file_new(ConnectionArgs* conn_arg, pgFile *file, + const char *from_fullpath, const char *to_fullpath, + XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode, + CompressAlg calg, int clevel, uint32 checksum_version, + int ptrack_version_num, const char *ptrack_schema, bool missing_ok); extern void backup_non_data_file(pgFile *file, pgFile *prev_file, const char *from_fullpath, const char *to_fullpath, BackupMode backup_mode, time_t parent_backup_time, @@ -993,8 +999,8 @@ extern datapagemap_t *get_lsn_map(const char *fullpath, uint32 checksum_version, int n_blocks, XLogRecPtr shift_lsn, BlockNumber segmentno); extern pid_t check_postmaster(const char *pgdata); -extern bool 
check_file_pages_new(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, - uint32 checksum_version, uint32 backup_version); +extern bool validate_file_pages(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn, + uint32 checksum_version, uint32 backup_version); /* parsexlog.c */ extern bool extractPageMap(const char *archivedir, uint32 wal_seg_size, XLogRecPtr startpoint, TimeLineID start_tli, @@ -1076,8 +1082,16 @@ extern XLogRecPtr get_last_ptrack_lsn(PGconn *backup_conn, PGNodeInfo *nodeInfo) extern parray * pg_ptrack_get_pagemapset(PGconn *backup_conn, const char *ptrack_schema, int ptrack_version_num, XLogRecPtr lsn); +/* open local file to writing */ +extern FILE* open_local_file_rw(const char *to_fullpath, char **out_buf, uint32 buf_size); + +extern int send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_fullpath, + pgFile *file, XLogRecPtr prev_backup_start_lsn, CompressAlg calg, int clevel, + uint32 checksum_version, bool use_pagemap, BackupPageHeader2 **headers, + BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema); + /* FIO */ -extern int fio_send_pages(FILE* out, const char *from_fullpath, pgFile *file, XLogRecPtr horizonLsn, +extern int fio_send_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file, XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version, datapagemap_t *pagemap, BlockNumber* err_blknum, char **errormsg, BackupPageHeader2 **headers); /* return codes for fio_send_pages */ diff --git a/src/utils/file.c b/src/utils/file.c index 70191b6c..75fbca21 100644 --- a/src/utils/file.c +++ b/src/utils/file.c @@ -1405,11 +1405,13 @@ static void fio_load_file(int out, char const* path) * In case of DELTA mode horizonLsn must be a valid lsn, * otherwise it should be set to InvalidXLogRecPtr. 
*/ -int fio_send_pages(FILE* out, const char *from_fullpath, pgFile *file, XLogRecPtr horizonLsn, +int fio_send_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file, XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version, datapagemap_t *pagemap, BlockNumber* err_blknum, char **errormsg, BackupPageHeader2 **headers) { + FILE *out = NULL; + char *out_buf = NULL; struct { fio_header hdr; fio_send_request arg; @@ -1532,6 +1534,10 @@ int fio_send_pages(FILE* out, const char *from_fullpath, pgFile *file, XLogRecPt COMP_FILE_CRC32(true, file->crc, buf, hdr.size); + /* lazily open backup file */ + if (!out) + out = open_local_file_rw(to_fullpath, &out_buf, STDIO_BUFSIZE); + if (fio_fwrite(out, buf, hdr.size) != hdr.size) { fio_fclose(out); @@ -1545,6 +1551,10 @@ int fio_send_pages(FILE* out, const char *from_fullpath, pgFile *file, XLogRecPt elog(ERROR, "Remote agent returned message of unexpected type: %i", hdr.cop); } + if (out) + fclose(out); + pg_free(out_buf); + return n_blocks_read; } diff --git a/src/validate.c b/src/validate.c index 0579df14..d7c6ff6a 100644 --- a/src/validate.c +++ b/src/validate.c @@ -363,7 +363,7 @@ pgBackupValidateFiles(void *arg) * check page headers, checksums (if enabled) * and compute checksum of the file */ - if (!check_file_pages_new(file, file_fullpath, arguments->stop_lsn, + if (!validate_file_pages(file, file_fullpath, arguments->stop_lsn, arguments->checksum_version, arguments->backup_version)) arguments->corrupted = true;