diff --git a/src/archive.c b/src/archive.c index fe42a728..5a1053cd 100644 --- a/src/archive.c +++ b/src/archive.c @@ -12,6 +12,19 @@ #include +static void push_wal_file(const char *from_path, const char *to_path, + bool is_compress, bool overwrite); +static void get_wal_file(const char *from_path, const char *to_path); +#ifdef HAVE_LIBZ +static const char *get_gz_error(gzFile gzf, int errnum); +#endif +static bool fileEqualCRC(const char *path1, const char *path2, + bool path2_is_compressed); +static void copy_file_attributes(const char *from_path, + fio_location from_location, + const char *to_path, fio_location to_location, + bool unlink_on_error); + /* * pg_probackup specific archive command for archive backups * set archive_command = 'pg_probackup archive-push -B /home/anastasia/backup @@ -114,3 +127,436 @@ do_archive_get(char *wal_file_path, char *wal_file_name) return 0; } + +/* ------------- INTERNAL FUNCTIONS ---------- */ +/* + * Copy WAL segment from pgdata to archive catalog with possible compression. + */ +void +push_wal_file(const char *from_path, const char *to_path, bool is_compress, + bool overwrite) +{ + FILE *in = NULL; + int out = -1; + char buf[XLOG_BLCKSZ]; + const char *to_path_p; + char to_path_temp[MAXPGPATH]; + int errno_temp; + +#ifdef HAVE_LIBZ + char gz_to_path[MAXPGPATH]; + gzFile gz_out = NULL; + + if (is_compress) + { + snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path); + to_path_p = gz_to_path; + } + else +#endif + to_path_p = to_path; + + /* open file for read */ + in = fio_fopen(from_path, PG_BINARY_R, FIO_DB_HOST); + if (in == NULL) + elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path, + strerror(errno)); + + /* Check if possible to skip copying */ + if (fileExists(to_path_p, FIO_BACKUP_HOST)) + { + if (fileEqualCRC(from_path, to_path_p, is_compress)) + return; + /* Do not copy and do not rise error. Just quit as normal. */ + else if (!overwrite) + elog(ERROR, "WAL segment \"%s\" already exists.", to_path_p); + } + + /* open backup file for write */ +#ifdef HAVE_LIBZ + if (is_compress) + { + snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path); + + gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, instance_config.compress_level, FIO_BACKUP_HOST); + if (gz_out == NULL) + elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s", + to_path_temp, strerror(errno)); + } + else +#endif + { + snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path); + + out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST); + if (out < 0) + elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s", + to_path_temp, strerror(errno)); + } + + /* copy content */ + for (;;) + { + ssize_t read_len = 0; + + read_len = fio_fread(in, buf, sizeof(buf)); + + if (read_len < 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, + "Cannot read source WAL file \"%s\": %s", + from_path, strerror(errno_temp)); + } + + if (read_len > 0) + { +#ifdef HAVE_LIBZ + if (is_compress) + { + if (fio_gzwrite(gz_out, buf, read_len) != read_len) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot write to compressed WAL file \"%s\": %s", + to_path_temp, get_gz_error(gz_out, errno_temp)); + } + } + else +#endif + { + if (fio_write(out, buf, read_len) != read_len) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot write to WAL file \"%s\": %s", + to_path_temp, strerror(errno_temp)); + } + } + } + + if (read_len == 0) + break; + } + +#ifdef HAVE_LIBZ + if (is_compress) + { + if (fio_gzclose(gz_out) != 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", + to_path_temp, get_gz_error(gz_out, errno_temp)); + } + } + else +#endif + { + if (fio_flush(out) != 0 || fio_close(out) != 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot write WAL file \"%s\": %s", + to_path_temp, strerror(errno_temp)); + } + } + + if (fio_fclose(in)) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot close source WAL file \"%s\": %s", + from_path, strerror(errno_temp)); + } + + /* update file permission. */ + copy_file_attributes(from_path, FIO_DB_HOST, to_path_temp, FIO_BACKUP_HOST, true); + + if (fio_rename(to_path_temp, to_path_p, FIO_BACKUP_HOST) < 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_BACKUP_HOST); + elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s", + to_path_temp, to_path_p, strerror(errno_temp)); + } + +#ifdef HAVE_LIBZ + if (is_compress) + elog(INFO, "WAL file compressed to \"%s\"", gz_to_path); +#endif +} + +/* + * Copy WAL segment from archive catalog to pgdata with possible decompression. + */ +void +get_wal_file(const char *from_path, const char *to_path) +{ + FILE *in = NULL; + int out; + char buf[XLOG_BLCKSZ]; + const char *from_path_p = from_path; + char to_path_temp[MAXPGPATH]; + int errno_temp; + bool is_decompress = false; + +#ifdef HAVE_LIBZ + char gz_from_path[MAXPGPATH]; + gzFile gz_in = NULL; +#endif + + /* First check source file for existance */ + if (fio_access(from_path, F_OK, FIO_BACKUP_HOST) != 0) + { +#ifdef HAVE_LIBZ + /* + * Maybe we need to decompress the file. Check it with .gz + * extension. + */ + snprintf(gz_from_path, sizeof(gz_from_path), "%s.gz", from_path); + if (fio_access(gz_from_path, F_OK, FIO_BACKUP_HOST) == 0) + { + /* Found compressed file */ + is_decompress = true; + from_path_p = gz_from_path; + } +#endif + /* Didn't find compressed file */ + if (!is_decompress) + elog(ERROR, "Source WAL file \"%s\" doesn't exist", + from_path); + } + + /* open file for read */ + if (!is_decompress) + { + in = fio_fopen(from_path, PG_BINARY_R, FIO_BACKUP_HOST); + if (in == NULL) + elog(ERROR, "Cannot open source WAL file \"%s\": %s", + from_path, strerror(errno)); + } +#ifdef HAVE_LIBZ + else + { + gz_in = fio_gzopen(gz_from_path, PG_BINARY_R, Z_DEFAULT_COMPRESSION, + FIO_BACKUP_HOST); + if (gz_in == NULL) + elog(ERROR, "Cannot open compressed WAL file \"%s\": %s", + gz_from_path, strerror(errno)); + } +#endif + + /* open backup file for write */ + snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path); + + out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_DB_HOST); + if (out < 0) + elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s", + to_path_temp, strerror(errno)); + + /* copy content */ + for (;;) + { + int read_len = 0; + +#ifdef HAVE_LIBZ + if (is_decompress) + { + read_len = fio_gzread(gz_in, buf, sizeof(buf)); + if (read_len <= 0 && !fio_gzeof(gz_in)) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_DB_HOST); + elog(ERROR, "Cannot read compressed WAL file \"%s\": %s", + gz_from_path, get_gz_error(gz_in, errno_temp)); + } + } + else +#endif + { + read_len = fio_fread(in, buf, sizeof(buf)); + if (read_len < 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_DB_HOST); + elog(ERROR, "Cannot read source WAL file \"%s\": %s", + from_path, strerror(errno_temp)); + } + } + + if (read_len > 0) + { + if (fio_write(out, buf, read_len) != read_len) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_DB_HOST); + elog(ERROR, "Cannot write to WAL file \"%s\": %s", to_path_temp, + strerror(errno_temp)); + } + } + + /* Check for EOF */ +#ifdef HAVE_LIBZ + if (is_decompress) + { + if (fio_gzeof(gz_in) || read_len == 0) + break; + } + else +#endif + { + if (/* feof(in) || */ read_len == 0) + break; + } + } + + if (fio_flush(out) != 0 || fio_close(out) != 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_DB_HOST); + elog(ERROR, "Cannot write WAL file \"%s\": %s", + to_path_temp, strerror(errno_temp)); + } + +#ifdef HAVE_LIBZ + if (is_decompress) + { + if (fio_gzclose(gz_in) != 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_DB_HOST); + elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", + gz_from_path, get_gz_error(gz_in, errno_temp)); + } + } + else +#endif + { + if (fio_fclose(in)) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_DB_HOST); + elog(ERROR, "Cannot close source WAL file \"%s\": %s", + from_path, strerror(errno_temp)); + } + } + + /* update file permission. */ + copy_file_attributes(from_path_p, FIO_BACKUP_HOST, to_path_temp, FIO_DB_HOST, true); + + if (fio_rename(to_path_temp, to_path, FIO_DB_HOST) < 0) + { + errno_temp = errno; + fio_unlink(to_path_temp, FIO_DB_HOST); + elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s", + to_path_temp, to_path, strerror(errno_temp)); + } + +#ifdef HAVE_LIBZ + if (is_decompress) + elog(INFO, "WAL file decompressed from \"%s\"", gz_from_path); +#endif +} + +#ifdef HAVE_LIBZ +/* + * Show error during work with compressed file + */ +static const char * +get_gz_error(gzFile gzf, int errnum) +{ + int gz_errnum; + const char *errmsg; + + errmsg = fio_gzerror(gzf, &gz_errnum); + if (gz_errnum == Z_ERRNO) + return strerror(errnum); + else + return errmsg; +} +#endif + +/* + * compare CRC of two WAL files. + * If necessary, decompress WAL file from path2 + */ +static bool +fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) +{ + pg_crc32 crc1; + pg_crc32 crc2; + + /* Get checksum of backup file */ +#ifdef HAVE_LIBZ + if (path2_is_compressed) + { + char buf [1024]; + gzFile gz_in = NULL; + + INIT_FILE_CRC32(true, crc2); + gz_in = fio_gzopen(path2, PG_BINARY_R, Z_DEFAULT_COMPRESSION, FIO_BACKUP_HOST); + if (gz_in == NULL) + /* File cannot be read */ + elog(ERROR, + "Cannot compare WAL file \"%s\" with compressed \"%s\"", + path1, path2); + + for (;;) + { + int read_len = fio_gzread(gz_in, buf, sizeof(buf)); + if (read_len <= 0 && !fio_gzeof(gz_in)) + { + /* An error occurred while reading the file */ + elog(WARNING, + "Cannot compare WAL file \"%s\" with compressed \"%s\": %d", + path1, path2, read_len); + return false; + } + COMP_FILE_CRC32(true, crc2, buf, read_len); + if (fio_gzeof(gz_in) || read_len == 0) + break; + } + FIN_FILE_CRC32(true, crc2); + + if (fio_gzclose(gz_in) != 0) + elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", + path2, get_gz_error(gz_in, errno)); + } + else +#endif + { + crc2 = pgFileGetCRC(path2, true, true, NULL, FIO_BACKUP_HOST); + } + + /* Get checksum of original file */ + crc1 = pgFileGetCRC(path1, true, true, NULL, FIO_DB_HOST); + + return EQ_CRC32C(crc1, crc2); +} + +/* Copy file attributes */ +static void +copy_file_attributes(const char *from_path, fio_location from_location, + const char *to_path, fio_location to_location, + bool unlink_on_error) +{ + struct stat st; + + if (fio_stat(from_path, &st, true, from_location) == -1) + { + if (unlink_on_error) + fio_unlink(to_path, to_location); + elog(ERROR, "Cannot stat file \"%s\": %s", + from_path, strerror(errno)); + } + + if (fio_chmod(to_path, st.st_mode, to_location) == -1) + { + if (unlink_on_error) + fio_unlink(to_path, to_location); + elog(ERROR, "Cannot change mode of file \"%s\": %s", + to_path, strerror(errno)); + } +} diff --git a/src/backup.c b/src/backup.c index e7acdd2e..87829d9d 100644 --- a/src/backup.c +++ b/src/backup.c @@ -81,7 +81,6 @@ bool heapallindexed_is_supported = false; /* Backup connections */ static PGconn *backup_conn = NULL; static PGconn *master_conn = NULL; -static PGconn *backup_conn_replication = NULL; /* PostgreSQL server version from "backup_conn" */ static int server_version = 0; @@ -101,7 +100,6 @@ static void backup_disconnect(bool fatal, void *userdata); static void pgdata_basic_setup(bool amcheck_only); static void *backup_files(void *arg); -static void *remote_backup_files(void *arg); static void do_backup_instance(void); @@ -127,9 +125,6 @@ static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup); static void make_pagemap_from_ptrack(parray *files); static void *StreamLog(void *arg); -static void get_remote_pgdata_filelist(parray *files); -static void ReceiveFileList(parray* files, PGconn *conn, PGresult *res, int rownum); -static void remote_copy_file(PGconn *conn, pgFile* file); static void check_external_for_tablespaces(parray *external_list); /* Ptrack functions */ @@ -157,324 +152,6 @@ static void set_cfs_datafiles(parray *files, const char *root, char *relative, s exit(code); \ } -/* Fill "files" with data about all the files to backup */ -static void -get_remote_pgdata_filelist(parray *files) -{ - PGresult *res; - int resultStatus; - int i; - - backup_conn_replication = pgut_connect_replication(instance_config.pghost, - instance_config.pgport, - instance_config.pgdatabase, - instance_config.pguser); - - if (PQsendQuery(backup_conn_replication, "FILE_BACKUP FILELIST") == 0) - elog(ERROR,"%s: could not send replication command \"%s\": %s", - PROGRAM_NAME, "FILE_BACKUP", PQerrorMessage(backup_conn_replication)); - - res = PQgetResult(backup_conn_replication); - - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - resultStatus = PQresultStatus(res); - PQclear(res); - elog(ERROR, "cannot start getting FILE_BACKUP filelist: %s, result_status %d", - PQerrorMessage(backup_conn_replication), resultStatus); - } - - if (PQntuples(res) < 1) - elog(ERROR, "%s: no data returned from server", PROGRAM_NAME); - - for (i = 0; i < PQntuples(res); i++) - { - ReceiveFileList(files, backup_conn_replication, res, i); - } - - res = PQgetResult(backup_conn_replication); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - elog(ERROR, "%s: final receive failed: %s", - PROGRAM_NAME, PQerrorMessage(backup_conn_replication)); - } - - PQfinish(backup_conn_replication); -} - -/* - * workhorse for get_remote_pgdata_filelist(). - * Parse received message into pgFile structure. - */ -static void -ReceiveFileList(parray* files, PGconn *conn, PGresult *res, int rownum) -{ - char filename[MAXPGPATH]; - pgoff_t current_len_left = 0; - bool basetablespace; - char *copybuf = NULL; - pgFile *pgfile; - - /* What for do we need this basetablespace field?? */ - basetablespace = PQgetisnull(res, rownum, 0); - if (basetablespace) - elog(LOG,"basetablespace"); - else - elog(LOG, "basetablespace %s", PQgetvalue(res, rownum, 1)); - - res = PQgetResult(conn); - - if (PQresultStatus(res) != PGRES_COPY_OUT) - elog(ERROR, "Could not get COPY data stream: %s", PQerrorMessage(conn)); - - while (1) - { - int r; - int filemode; - - if (copybuf != NULL) - { - PQfreemem(copybuf); - copybuf = NULL; - } - - r = PQgetCopyData(conn, ©buf, 0); - - if (r == -2) - elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(conn)); - - /* end of copy */ - if (r == -1) - break; - - /* This must be the header for a new file */ - if (r != 512) - elog(ERROR, "Invalid tar block header size: %d\n", r); - - current_len_left = read_tar_number(©buf[124], 12); - - /* Set permissions on the file */ - filemode = read_tar_number(©buf[100], 8); - - /* First part of header is zero terminated filename */ - snprintf(filename, sizeof(filename), "%s", copybuf); - - pgfile = pgFileInit(filename, filename); - pgfile->size = current_len_left; - pgfile->mode |= filemode; - - if (filename[strlen(filename) - 1] == '/') - { - /* Symbolic link or directory has size zero */ - Assert (pgfile->size == 0); - /* Ends in a slash means directory or symlink to directory */ - if (copybuf[156] == '5') - { - /* Directory */ - pgfile->mode |= S_IFDIR; - } - else if (copybuf[156] == '2') - { - /* Symlink */ -#ifndef WIN32 - pgfile->mode |= S_IFLNK; -#else - pgfile->mode |= S_IFDIR; -#endif - } - else - elog(ERROR, "Unrecognized link indicator \"%c\"\n", - copybuf[156]); - } - else - { - /* regular file */ - pgfile->mode |= S_IFREG; - } - - parray_append(files, pgfile); - } - - if (copybuf != NULL) - PQfreemem(copybuf); -} - -/* read one file via replication protocol - * and write it to the destination subdir in 'backup_path' */ -static void -remote_copy_file(PGconn *conn, pgFile* file) -{ - PGresult *res; - char *copybuf = NULL; - char buf[32768]; - FILE *out; - char database_path[MAXPGPATH]; - char to_path[MAXPGPATH]; - bool skip_padding = false; - - pgBackupGetPath(¤t, database_path, lengthof(database_path), - DATABASE_DIR); - join_path_components(to_path, database_path, file->path); - - out = fopen(to_path, PG_BINARY_W); - if (out == NULL) - { - int errno_tmp = errno; - elog(ERROR, "cannot open destination file \"%s\": %s", - to_path, strerror(errno_tmp)); - } - - INIT_FILE_CRC32(true, file->crc); - - /* read from stream and write to backup file */ - while (1) - { - int row_length; - int errno_tmp; - int write_buffer_size = 0; - if (copybuf != NULL) - { - PQfreemem(copybuf); - copybuf = NULL; - } - - row_length = PQgetCopyData(conn, ©buf, 0); - - if (row_length == -2) - elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(conn)); - - if (row_length == -1) - break; - - if (!skip_padding) - { - write_buffer_size = Min(row_length, sizeof(buf)); - memcpy(buf, copybuf, write_buffer_size); - COMP_FILE_CRC32(true, file->crc, buf, write_buffer_size); - - /* TODO calc checksum*/ - if (fwrite(buf, 1, write_buffer_size, out) != write_buffer_size) - { - errno_tmp = errno; - /* oops */ - FIN_FILE_CRC32(true, file->crc); - fclose(out); - PQfinish(conn); - elog(ERROR, "cannot write to \"%s\": %s", to_path, - strerror(errno_tmp)); - } - - file->read_size += write_buffer_size; - } - if (file->read_size >= file->size) - { - skip_padding = true; - } - } - - res = PQgetResult(conn); - - /* File is not found. That's normal. */ - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - elog(ERROR, "final receive failed: status %d ; %s",PQresultStatus(res), PQerrorMessage(conn)); - } - - file->write_size = (int64) file->read_size; - FIN_FILE_CRC32(true, file->crc); - - fclose(out); -} - -/* - * Take a remote backup of the PGDATA at a file level. - * Copy all directories and files listed in backup_files_list. - */ -static void * -remote_backup_files(void *arg) -{ - int i; - backup_files_arg *arguments = (backup_files_arg *) arg; - int n_backup_files_list = parray_num(arguments->files_list); - PGconn *file_backup_conn = NULL; - - for (i = 0; i < n_backup_files_list; i++) - { - char *query_str; - PGresult *res; - char *copybuf = NULL; - pgFile *file; - int row_length; - - file = (pgFile *) parray_get(arguments->files_list, i); - - /* We have already copied all directories */ - if (S_ISDIR(file->mode)) - continue; - - if (!pg_atomic_test_set_flag(&file->lock)) - continue; - - file_backup_conn = pgut_connect_replication(instance_config.pghost, - instance_config.pgport, - instance_config.pgdatabase, - instance_config.pguser); - - /* check for interrupt */ - if (interrupted || thread_interrupted) - elog(ERROR, "interrupted during backup"); - - query_str = psprintf("FILE_BACKUP FILEPATH '%s'",file->path); - - if (PQsendQuery(file_backup_conn, query_str) == 0) - elog(ERROR,"%s: could not send replication command \"%s\": %s", - PROGRAM_NAME, query_str, PQerrorMessage(file_backup_conn)); - - res = PQgetResult(file_backup_conn); - - /* File is not found. That's normal. */ - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - PQclear(res); - PQfinish(file_backup_conn); - continue; - } - - if (PQresultStatus(res) != PGRES_COPY_OUT) - { - PQclear(res); - PQfinish(file_backup_conn); - elog(ERROR, "Could not get COPY data stream: %s", PQerrorMessage(file_backup_conn)); - } - - /* read the header of the file */ - row_length = PQgetCopyData(file_backup_conn, ©buf, 0); - - if (row_length == -2) - elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(file_backup_conn)); - - /* end of copy TODO handle it */ - if (row_length == -1) - elog(ERROR, "Unexpected end of COPY data"); - - if(row_length != 512) - elog(ERROR, "Invalid tar block header size: %d\n", row_length); - file->size = read_tar_number(©buf[124], 12); - - /* receive the data from stream and write to backup file */ - remote_copy_file(file_backup_conn, file); - - elog(VERBOSE, "File \"%s\". Copied " INT64_FORMAT " bytes", - file->path, file->write_size); - PQfinish(file_backup_conn); - } - - /* Data files transferring is successful */ - arguments->ret = 0; - - return NULL; -} - /* * Take a backup of a single postgresql instance. * Move files from 'pgdata' to a subdirectory in 'backup_path'. @@ -513,32 +190,7 @@ do_backup_instance(void) current.data_bytes = 0; /* Obtain current timeline */ - if (IsReplicationProtocol()) - { - char *sysidentifier; - TimeLineID starttli; - XLogRecPtr startpos; - - backup_conn_replication = pgut_connect_replication(instance_config.pghost, - instance_config.pgport, - instance_config.pgdatabase, - instance_config.pguser); - - /* Check replication prorocol connection */ - if (!RunIdentifySystem(backup_conn_replication, &sysidentifier, &starttli, &startpos, NULL)) - elog(ERROR, "Failed to send command for remote backup"); - -// TODO implement the check -// if (&sysidentifier != instance_config.system_identifier) -// elog(ERROR, "Backup data directory was initialized for system id %ld, but target backup directory system id is %ld", -// instance_config.system_identifier, sysidentifier); - - current.tli = starttli; - - PQfinish(backup_conn_replication); - } - else - current.tli = get_current_timeline(false); + current.tli = get_current_timeline(false); /* * In incremental backup mode ensure that already-validated @@ -664,12 +316,8 @@ do_backup_instance(void) backup_files_list = parray_new(); /* list files with the logical path. omit $PGDATA */ - - if (IsReplicationProtocol()) - get_remote_pgdata_filelist(backup_files_list); - else - dir_list_file(backup_files_list, instance_config.pgdata, - true, true, false, 0, FIO_DB_HOST); + dir_list_file(backup_files_list, instance_config.pgdata, + true, true, false, 0, FIO_DB_HOST); /* * Append to backup list all files and directories @@ -749,15 +397,12 @@ do_backup_instance(void) char dirpath[MAXPGPATH]; char *dir_name; - if (!IsReplicationProtocol()) - if (file->external_dir_num) - dir_name = GetRelativePath(file->path, - parray_get(external_dirs, - file->external_dir_num - 1)); - else - dir_name = GetRelativePath(file->path, instance_config.pgdata); + if (file->external_dir_num) + dir_name = GetRelativePath(file->path, + parray_get(external_dirs, + file->external_dir_num - 1)); else - dir_name = file->path; + dir_name = GetRelativePath(file->path, instance_config.pgdata); elog(VERBOSE, "Create directory \"%s\"", dir_name); @@ -812,11 +457,7 @@ do_backup_instance(void) backup_files_arg *arg = &(threads_args[i]); elog(VERBOSE, "Start thread num: %i", i); - - if (!IsReplicationProtocol()) - pthread_create(&threads[i], NULL, backup_files, arg); - else - pthread_create(&threads[i], NULL, remote_backup_files, arg); + pthread_create(&threads[i], NULL, backup_files, arg); } /* Wait threads */ @@ -896,7 +537,11 @@ do_backup_instance(void) { pgFile *file = (pgFile *) parray_get(xlog_files_list, i); if (S_ISREG(file->mode)) - calc_file_checksum(file, FIO_BACKUP_HOST); + { + file->crc = pgFileGetCRC(file->path, true, false, + &file->read_size, FIO_BACKUP_HOST); + file->write_size = file->read_size; + } /* Remove file path root prefix*/ if (strstr(file->path, database_path) == file->path) { @@ -1234,8 +879,7 @@ pgdata_basic_setup(bool amcheck_only) * instance we opened connection to. And that target backup database PGDATA * belogns to the same instance. */ - /* TODO fix it for remote backup */ - if (!IsReplicationProtocol() && !amcheck_only) + if (!amcheck_only) check_system_identifiers(); if (current.checksum_version) @@ -2383,7 +2027,9 @@ pg_stop_backup(pgBackup *backup) { file = pgFileNew(backup_label, backup_label, true, 0, FIO_BACKUP_HOST); - calc_file_checksum(file, FIO_BACKUP_HOST); + file->crc = pgFileGetCRC(file->path, true, false, + &file->read_size, FIO_BACKUP_HOST); + file->write_size = file->read_size; free(file->path); file->path = strdup(PG_BACKUP_LABEL_FILE); parray_append(backup_files_list, file); @@ -2428,7 +2074,11 @@ pg_stop_backup(pgBackup *backup) file = pgFileNew(tablespace_map, tablespace_map, true, 0, FIO_BACKUP_HOST); if (S_ISREG(file->mode)) - calc_file_checksum(file, FIO_BACKUP_HOST); + { + file->crc = pgFileGetCRC(file->path, true, false, + &file->read_size, FIO_BACKUP_HOST); + file->write_size = file->read_size; + } free(file->path); file->path = strdup(PG_TABLESPACE_MAP_FILE); parray_append(backup_files_list, file); @@ -2836,7 +2486,9 @@ backup_files(void *arg) if (prev_file && file->exists_in_prev && buf.st_mtime < current.parent_backup) { - calc_file_checksum(file, FIO_DB_HOST); + file->crc = pgFileGetCRC(file->path, true, false, + &file->read_size, FIO_DB_HOST); + file->write_size = file->read_size; /* ...and checksum is the same... */ if (EQ_TRADITIONAL_CRC32(file->crc, (*prev_file)->crc)) skip = true; /* ...skip copying file. */ diff --git a/src/data.c b/src/data.c index cf020a1e..bcc92bd8 100644 --- a/src/data.c +++ b/src/data.c @@ -31,9 +31,6 @@ typedef union DataPage char data[BLCKSZ]; } DataPage; -static bool -fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed); - #ifdef HAVE_LIBZ /* Implementation of zlib compression method */ static int32 @@ -1071,394 +1068,6 @@ copy_file(fio_location from_location, const char *to_root, return true; } -#ifdef HAVE_LIBZ -/* - * Show error during work with compressed file - */ -static const char * -get_gz_error(gzFile gzf, int errnum) -{ - int gz_errnum; - const char *errmsg; - - errmsg = fio_gzerror(gzf, &gz_errnum); - if (gz_errnum == Z_ERRNO) - return strerror(errnum); - else - return errmsg; -} -#endif - -/* - * Copy file attributes - */ -static void -copy_meta(const char *from_path, fio_location from_location, const char *to_path, fio_location to_location, bool unlink_on_error) -{ - struct stat st; - - if (fio_stat(from_path, &st, true, from_location) == -1) - { - if (unlink_on_error) - fio_unlink(to_path, to_location); - elog(ERROR, "Cannot stat file \"%s\": %s", - from_path, strerror(errno)); - } - - if (fio_chmod(to_path, st.st_mode, to_location) == -1) - { - if (unlink_on_error) - fio_unlink(to_path, to_location); - elog(ERROR, "Cannot change mode of file \"%s\": %s", - to_path, strerror(errno)); - } -} - -/* - * Copy WAL segment from pgdata to archive catalog with possible compression. - */ -void -push_wal_file(const char *from_path, const char *to_path, bool is_compress, - bool overwrite) -{ - FILE *in = NULL; - int out = -1; - char buf[XLOG_BLCKSZ]; - const char *to_path_p; - char to_path_temp[MAXPGPATH]; - int errno_temp; - -#ifdef HAVE_LIBZ - char gz_to_path[MAXPGPATH]; - gzFile gz_out = NULL; - - if (is_compress) - { - snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path); - to_path_p = gz_to_path; - } - else -#endif - to_path_p = to_path; - - /* open file for read */ - in = fio_fopen(from_path, PG_BINARY_R, FIO_DB_HOST); - if (in == NULL) - elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path, - strerror(errno)); - - /* Check if possible to skip copying */ - if (fileExists(to_path_p, FIO_BACKUP_HOST)) - { - if (fileEqualCRC(from_path, to_path_p, is_compress)) - return; - /* Do not copy and do not rise error. Just quit as normal. */ - else if (!overwrite) - elog(ERROR, "WAL segment \"%s\" already exists.", to_path_p); - } - - /* open backup file for write */ -#ifdef HAVE_LIBZ - if (is_compress) - { - snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path); - - gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, instance_config.compress_level, FIO_BACKUP_HOST); - if (gz_out == NULL) - elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s", - to_path_temp, strerror(errno)); - } - else -#endif - { - snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path); - - out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST); - if (out < 0) - elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s", - to_path_temp, strerror(errno)); - } - - /* copy content */ - for (;;) - { - ssize_t read_len = 0; - - read_len = fio_fread(in, buf, sizeof(buf)); - - if (read_len < 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, - "Cannot read source WAL file \"%s\": %s", - from_path, strerror(errno_temp)); - } - - if (read_len > 0) - { -#ifdef HAVE_LIBZ - if (is_compress) - { - if (fio_gzwrite(gz_out, buf, read_len) != read_len) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot write to compressed WAL file \"%s\": %s", - to_path_temp, get_gz_error(gz_out, errno_temp)); - } - } - else -#endif - { - if (fio_write(out, buf, read_len) != read_len) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot write to WAL file \"%s\": %s", - to_path_temp, strerror(errno_temp)); - } - } - } - - if (read_len == 0) - break; - } - -#ifdef HAVE_LIBZ - if (is_compress) - { - if (fio_gzclose(gz_out) != 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", - to_path_temp, get_gz_error(gz_out, errno_temp)); - } - } - else -#endif - { - if (fio_flush(out) != 0 || fio_close(out) != 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot write WAL file \"%s\": %s", - to_path_temp, strerror(errno_temp)); - } - } - - if (fio_fclose(in)) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot close source WAL file \"%s\": %s", - from_path, strerror(errno_temp)); - } - - /* update file permission. */ - copy_meta(from_path, FIO_DB_HOST, to_path_temp, FIO_BACKUP_HOST, true); - - if (fio_rename(to_path_temp, to_path_p, FIO_BACKUP_HOST) < 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_BACKUP_HOST); - elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s", - to_path_temp, to_path_p, strerror(errno_temp)); - } - -#ifdef HAVE_LIBZ - if (is_compress) - elog(INFO, "WAL file compressed to \"%s\"", gz_to_path); -#endif -} - -/* - * Copy WAL segment from archive catalog to pgdata with possible decompression. - */ -void -get_wal_file(const char *from_path, const char *to_path) -{ - FILE *in = NULL; - int out; - char buf[XLOG_BLCKSZ]; - const char *from_path_p = from_path; - char to_path_temp[MAXPGPATH]; - int errno_temp; - bool is_decompress = false; - -#ifdef HAVE_LIBZ - char gz_from_path[MAXPGPATH]; - gzFile gz_in = NULL; -#endif - - /* First check source file for existance */ - if (fio_access(from_path, F_OK, FIO_BACKUP_HOST) != 0) - { -#ifdef HAVE_LIBZ - /* - * Maybe we need to decompress the file. Check it with .gz - * extension. - */ - snprintf(gz_from_path, sizeof(gz_from_path), "%s.gz", from_path); - if (fio_access(gz_from_path, F_OK, FIO_BACKUP_HOST) == 0) - { - /* Found compressed file */ - is_decompress = true; - from_path_p = gz_from_path; - } -#endif - /* Didn't find compressed file */ - if (!is_decompress) - elog(ERROR, "Source WAL file \"%s\" doesn't exist", - from_path); - } - - /* open file for read */ - if (!is_decompress) - { - in = fio_fopen(from_path, PG_BINARY_R, FIO_BACKUP_HOST); - if (in == NULL) - elog(ERROR, "Cannot open source WAL file \"%s\": %s", - from_path, strerror(errno)); - } -#ifdef HAVE_LIBZ - else - { - gz_in = fio_gzopen(gz_from_path, PG_BINARY_R, Z_DEFAULT_COMPRESSION, - FIO_BACKUP_HOST); - if (gz_in == NULL) - elog(ERROR, "Cannot open compressed WAL file \"%s\": %s", - gz_from_path, strerror(errno)); - } -#endif - - /* open backup file for write */ - snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path); - - out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_DB_HOST); - if (out < 0) - elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s", - to_path_temp, strerror(errno)); - - /* copy content */ - for (;;) - { - int read_len = 0; - -#ifdef HAVE_LIBZ - if (is_decompress) - { - read_len = fio_gzread(gz_in, buf, sizeof(buf)); - if (read_len <= 0 && !fio_gzeof(gz_in)) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_DB_HOST); - elog(ERROR, "Cannot read compressed WAL file \"%s\": %s", - gz_from_path, get_gz_error(gz_in, errno_temp)); - } - } - else -#endif - { - read_len = fio_fread(in, buf, sizeof(buf)); - if (read_len < 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_DB_HOST); - elog(ERROR, "Cannot read source WAL file \"%s\": %s", - from_path, strerror(errno_temp)); - } - } - - if (read_len > 0) - { - if (fio_write(out, buf, read_len) != read_len) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_DB_HOST); - elog(ERROR, "Cannot write to WAL file \"%s\": %s", to_path_temp, - strerror(errno_temp)); - } - } - - /* Check for EOF */ -#ifdef HAVE_LIBZ - if (is_decompress) - { - if (fio_gzeof(gz_in) || read_len == 0) - break; - } - else -#endif - { - if (/* feof(in) || */ read_len == 0) - break; - } - } - - if (fio_flush(out) != 0 || fio_close(out) != 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_DB_HOST); - elog(ERROR, "Cannot write WAL file \"%s\": %s", - to_path_temp, strerror(errno_temp)); - } - -#ifdef HAVE_LIBZ - if (is_decompress) - { - if (fio_gzclose(gz_in) != 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_DB_HOST); - elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", - gz_from_path, get_gz_error(gz_in, errno_temp)); - } - } - else -#endif - { - if (fio_fclose(in)) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_DB_HOST); - elog(ERROR, "Cannot close source WAL file \"%s\": %s", - from_path, strerror(errno_temp)); - } - } - - /* update file permission. */ - copy_meta(from_path_p, FIO_BACKUP_HOST, to_path_temp, FIO_DB_HOST, true); - - if (fio_rename(to_path_temp, to_path, FIO_DB_HOST) < 0) - { - errno_temp = errno; - fio_unlink(to_path_temp, FIO_DB_HOST); - elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s", - to_path_temp, to_path, strerror(errno_temp)); - } - -#ifdef HAVE_LIBZ - if (is_decompress) - elog(INFO, "WAL file decompressed from \"%s\"", gz_from_path); -#endif -} - -/* - * Calculate checksum of various files which are not copied from PGDATA, - * but created in process of backup, such as stream XLOG files, - * PG_TABLESPACE_MAP_FILE and PG_BACKUP_LABEL_FILE. - */ -void -calc_file_checksum(pgFile *file, fio_location location) -{ - Assert(S_ISREG(file->mode)); - - file->crc = pgFileGetCRC(file->path, true, false, &file->read_size, location); - file->write_size = file->read_size; -} - /* * Validate given page. * @@ -1794,57 +1403,3 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn, uint32 checksum_version, return is_valid; } - -static bool -fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed) -{ - pg_crc32 crc1; - pg_crc32 crc2; - - /* Get checksum of backup file */ -#ifdef HAVE_LIBZ - if (path2_is_compressed) - { - char buf [1024]; - gzFile gz_in = NULL; - - INIT_FILE_CRC32(true, crc2); - gz_in = fio_gzopen(path2, PG_BINARY_R, Z_DEFAULT_COMPRESSION, FIO_BACKUP_HOST); - if (gz_in == NULL) - /* File cannot be read */ - elog(ERROR, - "Cannot compare WAL file \"%s\" with compressed \"%s\"", - path1, path2); - - for (;;) - { - int read_len = fio_gzread(gz_in, buf, sizeof(buf)); - if (read_len <= 0 && !fio_gzeof(gz_in)) - { - /* An error occurred while reading the file */ - elog(WARNING, - "Cannot compare WAL file \"%s\" with compressed \"%s\": %d", - path1, path2, read_len); - return false; - } - COMP_FILE_CRC32(true, crc2, buf, read_len); - if (fio_gzeof(gz_in) || read_len == 0) - break; - } - FIN_FILE_CRC32(true, crc2); - - if (fio_gzclose(gz_in) != 0) - elog(ERROR, "Cannot close compressed WAL file \"%s\": %s", - path2, get_gz_error(gz_in, errno)); - } - else -#endif - { - crc2 = pgFileGetCRC(path2, true, true, NULL, FIO_BACKUP_HOST); - } - - /* Get checksum of original file */ - crc1 = pgFileGetCRC(path1, true, true, NULL, FIO_DB_HOST); - - return EQ_CRC32C(crc1, crc2); -} diff --git a/src/pg_probackup.h b/src/pg_probackup.h index abd13ee6..e07eccf5 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -394,7 +394,6 @@ typedef struct BackupPageHeader #endif #define IsSshProtocol() (instance_config.remote.host && strcmp(instance_config.remote.proto, "ssh") == 0) -#define IsReplicationProtocol() (instance_config.remote.host && strcmp(instance_config.remote.proto, "replication") == 0) /* directory options */ extern char *pg_probackup; @@ -631,12 +630,6 @@ extern void restore_data_file(const char *to_path, uint32 backup_version); extern bool copy_file(fio_location from_location, const char *to_root, fio_location to_location, pgFile *file); -extern void move_file(const char *from_root, const char *to_root, pgFile *file); -extern void push_wal_file(const char *from_path, const char *to_path, - bool is_compress, bool overwrite); -extern void get_wal_file(const char *from_path, const char *to_path); - -extern void calc_file_checksum(pgFile *file, fio_location location); extern bool check_file_pages(pgFile *file, XLogRecPtr stop_lsn, uint32 checksum_version, uint32 backup_version); @@ -672,11 +665,8 @@ extern void set_min_recovery_point(pgFile *file, const char *backup_path, extern void copy_pgcontrol_file(const char *from_root, fio_location location, const char *to_root, fio_location to_location, pgFile *file); -extern void sanityChecks(void); extern void time2iso(char *buf, size_t len, time_t time); extern const char *status2str(BackupStatus status); -extern void remove_trailing_space(char *buf, int comment_mark); -extern void remove_not_digit(char *buf, size_t len, const char *str); extern const char *base36enc(long unsigned int value); extern char *base36enc_dup(long unsigned int value); extern long unsigned int base36dec(const char *text); diff --git a/src/restore.c b/src/restore.c index ec7a3882..ce242485 100644 --- a/src/restore.c +++ b/src/restore.c @@ -40,7 +40,6 @@ static void create_recovery_conf(time_t backup_id, pgBackup *backup); static parray *read_timeline_history(TimeLineID targetTLI); static void *restore_files(void *arg); -static void remove_deleted_files(pgBackup *backup, parray *external_dirs); /* * Entry point of pg_probackup RESTORE and VALIDATE subcommands. diff --git a/src/util.c b/src/util.c index 788f5c9d..5f9e87f5 100644 --- a/src/util.c +++ b/src/util.c @@ -458,41 +458,3 @@ status2str(BackupStatus status) return statusName[status]; } - -void -remove_trailing_space(char *buf, int comment_mark) -{ - int i; - char *last_char = NULL; - - for (i = 0; buf[i]; i++) - { - if (buf[i] == comment_mark || buf[i] == '\n' || buf[i] == '\r') - { - buf[i] = '\0'; - break; - } - } - for (i = 0; buf[i]; i++) - { - if (!isspace(buf[i])) - last_char = buf + i; - } - if (last_char != NULL) - *(last_char + 1) = '\0'; - -} - -void -remove_not_digit(char *buf, size_t len, const char *str) -{ - int i, j; - - for (i = 0, j = 0; str[i] && j < len; i++) - { - if (!isdigit(str[i])) - continue; - buf[j++] = str[i]; - } - buf[j] = '\0'; -}