1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-12-03 09:59:53 +02:00

Merge remote-tracking branch 'origin/master' into issue_63

This commit is contained in:
Arthur Zakirov 2019-05-06 17:51:41 +03:00
commit 0ee0226b82
6 changed files with 472 additions and 868 deletions

View File

@ -12,6 +12,19 @@
#include <unistd.h>
static void push_wal_file(const char *from_path, const char *to_path,
bool is_compress, bool overwrite);
static void get_wal_file(const char *from_path, const char *to_path);
#ifdef HAVE_LIBZ
static const char *get_gz_error(gzFile gzf, int errnum);
#endif
static bool fileEqualCRC(const char *path1, const char *path2,
bool path2_is_compressed);
static void copy_file_attributes(const char *from_path,
fio_location from_location,
const char *to_path, fio_location to_location,
bool unlink_on_error);
/*
* pg_probackup specific archive command for archive backups
* set archive_command = 'pg_probackup archive-push -B /home/anastasia/backup
@ -114,3 +127,436 @@ do_archive_get(char *wal_file_path, char *wal_file_name)
return 0;
}
/* ------------- INTERNAL FUNCTIONS ---------- */
/*
* Copy WAL segment from pgdata to archive catalog with possible compression.
*/
void
push_wal_file(const char *from_path, const char *to_path, bool is_compress,
bool overwrite)
{
FILE *in = NULL;
int out = -1;
char buf[XLOG_BLCKSZ];
const char *to_path_p;
char to_path_temp[MAXPGPATH];
int errno_temp;
#ifdef HAVE_LIBZ
char gz_to_path[MAXPGPATH];
gzFile gz_out = NULL;
if (is_compress)
{
snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path);
to_path_p = gz_to_path;
}
else
#endif
to_path_p = to_path;
/* open file for read */
in = fio_fopen(from_path, PG_BINARY_R, FIO_DB_HOST);
if (in == NULL)
elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path,
strerror(errno));
/* Check if possible to skip copying */
if (fileExists(to_path_p, FIO_BACKUP_HOST))
{
if (fileEqualCRC(from_path, to_path_p, is_compress))
return;
/* Do not copy and do not rise error. Just quit as normal. */
else if (!overwrite)
elog(ERROR, "WAL segment \"%s\" already exists.", to_path_p);
}
/* open backup file for write */
#ifdef HAVE_LIBZ
if (is_compress)
{
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path);
gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, instance_config.compress_level, FIO_BACKUP_HOST);
if (gz_out == NULL)
elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s",
to_path_temp, strerror(errno));
}
else
#endif
{
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path);
out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST);
if (out < 0)
elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s",
to_path_temp, strerror(errno));
}
/* copy content */
for (;;)
{
ssize_t read_len = 0;
read_len = fio_fread(in, buf, sizeof(buf));
if (read_len < 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR,
"Cannot read source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
}
if (read_len > 0)
{
#ifdef HAVE_LIBZ
if (is_compress)
{
if (fio_gzwrite(gz_out, buf, read_len) != read_len)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot write to compressed WAL file \"%s\": %s",
to_path_temp, get_gz_error(gz_out, errno_temp));
}
}
else
#endif
{
if (fio_write(out, buf, read_len) != read_len)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot write to WAL file \"%s\": %s",
to_path_temp, strerror(errno_temp));
}
}
}
if (read_len == 0)
break;
}
#ifdef HAVE_LIBZ
if (is_compress)
{
if (fio_gzclose(gz_out) != 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
to_path_temp, get_gz_error(gz_out, errno_temp));
}
}
else
#endif
{
if (fio_flush(out) != 0 || fio_close(out) != 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot write WAL file \"%s\": %s",
to_path_temp, strerror(errno_temp));
}
}
if (fio_fclose(in))
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot close source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
}
/* update file permission. */
copy_file_attributes(from_path, FIO_DB_HOST, to_path_temp, FIO_BACKUP_HOST, true);
if (fio_rename(to_path_temp, to_path_p, FIO_BACKUP_HOST) < 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s",
to_path_temp, to_path_p, strerror(errno_temp));
}
#ifdef HAVE_LIBZ
if (is_compress)
elog(INFO, "WAL file compressed to \"%s\"", gz_to_path);
#endif
}
/*
* Copy WAL segment from archive catalog to pgdata with possible decompression.
*/
void
get_wal_file(const char *from_path, const char *to_path)
{
FILE *in = NULL;
int out;
char buf[XLOG_BLCKSZ];
const char *from_path_p = from_path;
char to_path_temp[MAXPGPATH];
int errno_temp;
bool is_decompress = false;
#ifdef HAVE_LIBZ
char gz_from_path[MAXPGPATH];
gzFile gz_in = NULL;
#endif
/* First check source file for existance */
if (fio_access(from_path, F_OK, FIO_BACKUP_HOST) != 0)
{
#ifdef HAVE_LIBZ
/*
* Maybe we need to decompress the file. Check it with .gz
* extension.
*/
snprintf(gz_from_path, sizeof(gz_from_path), "%s.gz", from_path);
if (fio_access(gz_from_path, F_OK, FIO_BACKUP_HOST) == 0)
{
/* Found compressed file */
is_decompress = true;
from_path_p = gz_from_path;
}
#endif
/* Didn't find compressed file */
if (!is_decompress)
elog(ERROR, "Source WAL file \"%s\" doesn't exist",
from_path);
}
/* open file for read */
if (!is_decompress)
{
in = fio_fopen(from_path, PG_BINARY_R, FIO_BACKUP_HOST);
if (in == NULL)
elog(ERROR, "Cannot open source WAL file \"%s\": %s",
from_path, strerror(errno));
}
#ifdef HAVE_LIBZ
else
{
gz_in = fio_gzopen(gz_from_path, PG_BINARY_R, Z_DEFAULT_COMPRESSION,
FIO_BACKUP_HOST);
if (gz_in == NULL)
elog(ERROR, "Cannot open compressed WAL file \"%s\": %s",
gz_from_path, strerror(errno));
}
#endif
/* open backup file for write */
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path);
out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_DB_HOST);
if (out < 0)
elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s",
to_path_temp, strerror(errno));
/* copy content */
for (;;)
{
int read_len = 0;
#ifdef HAVE_LIBZ
if (is_decompress)
{
read_len = fio_gzread(gz_in, buf, sizeof(buf));
if (read_len <= 0 && !fio_gzeof(gz_in))
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot read compressed WAL file \"%s\": %s",
gz_from_path, get_gz_error(gz_in, errno_temp));
}
}
else
#endif
{
read_len = fio_fread(in, buf, sizeof(buf));
if (read_len < 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot read source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
}
}
if (read_len > 0)
{
if (fio_write(out, buf, read_len) != read_len)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot write to WAL file \"%s\": %s", to_path_temp,
strerror(errno_temp));
}
}
/* Check for EOF */
#ifdef HAVE_LIBZ
if (is_decompress)
{
if (fio_gzeof(gz_in) || read_len == 0)
break;
}
else
#endif
{
if (/* feof(in) || */ read_len == 0)
break;
}
}
if (fio_flush(out) != 0 || fio_close(out) != 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot write WAL file \"%s\": %s",
to_path_temp, strerror(errno_temp));
}
#ifdef HAVE_LIBZ
if (is_decompress)
{
if (fio_gzclose(gz_in) != 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
gz_from_path, get_gz_error(gz_in, errno_temp));
}
}
else
#endif
{
if (fio_fclose(in))
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot close source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
}
}
/* update file permission. */
copy_file_attributes(from_path_p, FIO_BACKUP_HOST, to_path_temp, FIO_DB_HOST, true);
if (fio_rename(to_path_temp, to_path, FIO_DB_HOST) < 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s",
to_path_temp, to_path, strerror(errno_temp));
}
#ifdef HAVE_LIBZ
if (is_decompress)
elog(INFO, "WAL file decompressed from \"%s\"", gz_from_path);
#endif
}
#ifdef HAVE_LIBZ
/*
* Show error during work with compressed file
*/
static const char *
get_gz_error(gzFile gzf, int errnum)
{
int gz_errnum;
const char *errmsg;
errmsg = fio_gzerror(gzf, &gz_errnum);
if (gz_errnum == Z_ERRNO)
return strerror(errnum);
else
return errmsg;
}
#endif
/*
* compare CRC of two WAL files.
* If necessary, decompress WAL file from path2
*/
static bool
fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed)
{
pg_crc32 crc1;
pg_crc32 crc2;
/* Get checksum of backup file */
#ifdef HAVE_LIBZ
if (path2_is_compressed)
{
char buf [1024];
gzFile gz_in = NULL;
INIT_FILE_CRC32(true, crc2);
gz_in = fio_gzopen(path2, PG_BINARY_R, Z_DEFAULT_COMPRESSION, FIO_BACKUP_HOST);
if (gz_in == NULL)
/* File cannot be read */
elog(ERROR,
"Cannot compare WAL file \"%s\" with compressed \"%s\"",
path1, path2);
for (;;)
{
int read_len = fio_gzread(gz_in, buf, sizeof(buf));
if (read_len <= 0 && !fio_gzeof(gz_in))
{
/* An error occurred while reading the file */
elog(WARNING,
"Cannot compare WAL file \"%s\" with compressed \"%s\": %d",
path1, path2, read_len);
return false;
}
COMP_FILE_CRC32(true, crc2, buf, read_len);
if (fio_gzeof(gz_in) || read_len == 0)
break;
}
FIN_FILE_CRC32(true, crc2);
if (fio_gzclose(gz_in) != 0)
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
path2, get_gz_error(gz_in, errno));
}
else
#endif
{
crc2 = pgFileGetCRC(path2, true, true, NULL, FIO_BACKUP_HOST);
}
/* Get checksum of original file */
crc1 = pgFileGetCRC(path1, true, true, NULL, FIO_DB_HOST);
return EQ_CRC32C(crc1, crc2);
}
/* Copy file attributes */
static void
copy_file_attributes(const char *from_path, fio_location from_location,
const char *to_path, fio_location to_location,
bool unlink_on_error)
{
struct stat st;
if (fio_stat(from_path, &st, true, from_location) == -1)
{
if (unlink_on_error)
fio_unlink(to_path, to_location);
elog(ERROR, "Cannot stat file \"%s\": %s",
from_path, strerror(errno));
}
if (fio_chmod(to_path, st.st_mode, to_location) == -1)
{
if (unlink_on_error)
fio_unlink(to_path, to_location);
elog(ERROR, "Cannot change mode of file \"%s\": %s",
to_path, strerror(errno));
}
}

View File

@ -81,7 +81,6 @@ bool heapallindexed_is_supported = false;
/* Backup connections */
static PGconn *backup_conn = NULL;
static PGconn *master_conn = NULL;
static PGconn *backup_conn_replication = NULL;
/* PostgreSQL server version from "backup_conn" */
static int server_version = 0;
@ -101,7 +100,6 @@ static void backup_disconnect(bool fatal, void *userdata);
static void pgdata_basic_setup(bool amcheck_only);
static void *backup_files(void *arg);
static void *remote_backup_files(void *arg);
static void do_backup_instance(void);
@ -127,9 +125,6 @@ static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup);
static void make_pagemap_from_ptrack(parray *files);
static void *StreamLog(void *arg);
static void get_remote_pgdata_filelist(parray *files);
static void ReceiveFileList(parray* files, PGconn *conn, PGresult *res, int rownum);
static void remote_copy_file(PGconn *conn, pgFile* file);
static void check_external_for_tablespaces(parray *external_list);
/* Ptrack functions */
@ -157,324 +152,6 @@ static void set_cfs_datafiles(parray *files, const char *root, char *relative, s
exit(code); \
}
/* Fill "files" with data about all the files to backup */
static void
get_remote_pgdata_filelist(parray *files)
{
PGresult *res;
int resultStatus;
int i;
backup_conn_replication = pgut_connect_replication(instance_config.pghost,
instance_config.pgport,
instance_config.pgdatabase,
instance_config.pguser);
if (PQsendQuery(backup_conn_replication, "FILE_BACKUP FILELIST") == 0)
elog(ERROR,"%s: could not send replication command \"%s\": %s",
PROGRAM_NAME, "FILE_BACKUP", PQerrorMessage(backup_conn_replication));
res = PQgetResult(backup_conn_replication);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
resultStatus = PQresultStatus(res);
PQclear(res);
elog(ERROR, "cannot start getting FILE_BACKUP filelist: %s, result_status %d",
PQerrorMessage(backup_conn_replication), resultStatus);
}
if (PQntuples(res) < 1)
elog(ERROR, "%s: no data returned from server", PROGRAM_NAME);
for (i = 0; i < PQntuples(res); i++)
{
ReceiveFileList(files, backup_conn_replication, res, i);
}
res = PQgetResult(backup_conn_replication);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
elog(ERROR, "%s: final receive failed: %s",
PROGRAM_NAME, PQerrorMessage(backup_conn_replication));
}
PQfinish(backup_conn_replication);
}
/*
* workhorse for get_remote_pgdata_filelist().
* Parse received message into pgFile structure.
*/
static void
ReceiveFileList(parray* files, PGconn *conn, PGresult *res, int rownum)
{
char filename[MAXPGPATH];
pgoff_t current_len_left = 0;
bool basetablespace;
char *copybuf = NULL;
pgFile *pgfile;
/* What for do we need this basetablespace field?? */
basetablespace = PQgetisnull(res, rownum, 0);
if (basetablespace)
elog(LOG,"basetablespace");
else
elog(LOG, "basetablespace %s", PQgetvalue(res, rownum, 1));
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COPY_OUT)
elog(ERROR, "Could not get COPY data stream: %s", PQerrorMessage(conn));
while (1)
{
int r;
int filemode;
if (copybuf != NULL)
{
PQfreemem(copybuf);
copybuf = NULL;
}
r = PQgetCopyData(conn, &copybuf, 0);
if (r == -2)
elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(conn));
/* end of copy */
if (r == -1)
break;
/* This must be the header for a new file */
if (r != 512)
elog(ERROR, "Invalid tar block header size: %d\n", r);
current_len_left = read_tar_number(&copybuf[124], 12);
/* Set permissions on the file */
filemode = read_tar_number(&copybuf[100], 8);
/* First part of header is zero terminated filename */
snprintf(filename, sizeof(filename), "%s", copybuf);
pgfile = pgFileInit(filename, filename);
pgfile->size = current_len_left;
pgfile->mode |= filemode;
if (filename[strlen(filename) - 1] == '/')
{
/* Symbolic link or directory has size zero */
Assert (pgfile->size == 0);
/* Ends in a slash means directory or symlink to directory */
if (copybuf[156] == '5')
{
/* Directory */
pgfile->mode |= S_IFDIR;
}
else if (copybuf[156] == '2')
{
/* Symlink */
#ifndef WIN32
pgfile->mode |= S_IFLNK;
#else
pgfile->mode |= S_IFDIR;
#endif
}
else
elog(ERROR, "Unrecognized link indicator \"%c\"\n",
copybuf[156]);
}
else
{
/* regular file */
pgfile->mode |= S_IFREG;
}
parray_append(files, pgfile);
}
if (copybuf != NULL)
PQfreemem(copybuf);
}
/* read one file via replication protocol
* and write it to the destination subdir in 'backup_path' */
static void
remote_copy_file(PGconn *conn, pgFile* file)
{
PGresult *res;
char *copybuf = NULL;
char buf[32768];
FILE *out;
char database_path[MAXPGPATH];
char to_path[MAXPGPATH];
bool skip_padding = false;
pgBackupGetPath(&current, database_path, lengthof(database_path),
DATABASE_DIR);
join_path_components(to_path, database_path, file->path);
out = fopen(to_path, PG_BINARY_W);
if (out == NULL)
{
int errno_tmp = errno;
elog(ERROR, "cannot open destination file \"%s\": %s",
to_path, strerror(errno_tmp));
}
INIT_FILE_CRC32(true, file->crc);
/* read from stream and write to backup file */
while (1)
{
int row_length;
int errno_tmp;
int write_buffer_size = 0;
if (copybuf != NULL)
{
PQfreemem(copybuf);
copybuf = NULL;
}
row_length = PQgetCopyData(conn, &copybuf, 0);
if (row_length == -2)
elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(conn));
if (row_length == -1)
break;
if (!skip_padding)
{
write_buffer_size = Min(row_length, sizeof(buf));
memcpy(buf, copybuf, write_buffer_size);
COMP_FILE_CRC32(true, file->crc, buf, write_buffer_size);
/* TODO calc checksum*/
if (fwrite(buf, 1, write_buffer_size, out) != write_buffer_size)
{
errno_tmp = errno;
/* oops */
FIN_FILE_CRC32(true, file->crc);
fclose(out);
PQfinish(conn);
elog(ERROR, "cannot write to \"%s\": %s", to_path,
strerror(errno_tmp));
}
file->read_size += write_buffer_size;
}
if (file->read_size >= file->size)
{
skip_padding = true;
}
}
res = PQgetResult(conn);
/* File is not found. That's normal. */
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
elog(ERROR, "final receive failed: status %d ; %s",PQresultStatus(res), PQerrorMessage(conn));
}
file->write_size = (int64) file->read_size;
FIN_FILE_CRC32(true, file->crc);
fclose(out);
}
/*
* Take a remote backup of the PGDATA at a file level.
* Copy all directories and files listed in backup_files_list.
*/
static void *
remote_backup_files(void *arg)
{
int i;
backup_files_arg *arguments = (backup_files_arg *) arg;
int n_backup_files_list = parray_num(arguments->files_list);
PGconn *file_backup_conn = NULL;
for (i = 0; i < n_backup_files_list; i++)
{
char *query_str;
PGresult *res;
char *copybuf = NULL;
pgFile *file;
int row_length;
file = (pgFile *) parray_get(arguments->files_list, i);
/* We have already copied all directories */
if (S_ISDIR(file->mode))
continue;
if (!pg_atomic_test_set_flag(&file->lock))
continue;
file_backup_conn = pgut_connect_replication(instance_config.pghost,
instance_config.pgport,
instance_config.pgdatabase,
instance_config.pguser);
/* check for interrupt */
if (interrupted || thread_interrupted)
elog(ERROR, "interrupted during backup");
query_str = psprintf("FILE_BACKUP FILEPATH '%s'",file->path);
if (PQsendQuery(file_backup_conn, query_str) == 0)
elog(ERROR,"%s: could not send replication command \"%s\": %s",
PROGRAM_NAME, query_str, PQerrorMessage(file_backup_conn));
res = PQgetResult(file_backup_conn);
/* File is not found. That's normal. */
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
PQfinish(file_backup_conn);
continue;
}
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
PQclear(res);
PQfinish(file_backup_conn);
elog(ERROR, "Could not get COPY data stream: %s", PQerrorMessage(file_backup_conn));
}
/* read the header of the file */
row_length = PQgetCopyData(file_backup_conn, &copybuf, 0);
if (row_length == -2)
elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(file_backup_conn));
/* end of copy TODO handle it */
if (row_length == -1)
elog(ERROR, "Unexpected end of COPY data");
if(row_length != 512)
elog(ERROR, "Invalid tar block header size: %d\n", row_length);
file->size = read_tar_number(&copybuf[124], 12);
/* receive the data from stream and write to backup file */
remote_copy_file(file_backup_conn, file);
elog(VERBOSE, "File \"%s\". Copied " INT64_FORMAT " bytes",
file->path, file->write_size);
PQfinish(file_backup_conn);
}
/* Data files transferring is successful */
arguments->ret = 0;
return NULL;
}
/*
* Take a backup of a single postgresql instance.
* Move files from 'pgdata' to a subdirectory in 'backup_path'.
@ -513,32 +190,7 @@ do_backup_instance(void)
current.data_bytes = 0;
/* Obtain current timeline */
if (IsReplicationProtocol())
{
char *sysidentifier;
TimeLineID starttli;
XLogRecPtr startpos;
backup_conn_replication = pgut_connect_replication(instance_config.pghost,
instance_config.pgport,
instance_config.pgdatabase,
instance_config.pguser);
/* Check replication prorocol connection */
if (!RunIdentifySystem(backup_conn_replication, &sysidentifier, &starttli, &startpos, NULL))
elog(ERROR, "Failed to send command for remote backup");
// TODO implement the check
// if (&sysidentifier != instance_config.system_identifier)
// elog(ERROR, "Backup data directory was initialized for system id %ld, but target backup directory system id is %ld",
// instance_config.system_identifier, sysidentifier);
current.tli = starttli;
PQfinish(backup_conn_replication);
}
else
current.tli = get_current_timeline(false);
current.tli = get_current_timeline(false);
/*
* In incremental backup mode ensure that already-validated
@ -664,12 +316,8 @@ do_backup_instance(void)
backup_files_list = parray_new();
/* list files with the logical path. omit $PGDATA */
if (IsReplicationProtocol())
get_remote_pgdata_filelist(backup_files_list);
else
dir_list_file(backup_files_list, instance_config.pgdata,
true, true, false, 0, FIO_DB_HOST);
dir_list_file(backup_files_list, instance_config.pgdata,
true, true, false, 0, FIO_DB_HOST);
/*
* Append to backup list all files and directories
@ -749,15 +397,12 @@ do_backup_instance(void)
char dirpath[MAXPGPATH];
char *dir_name;
if (!IsReplicationProtocol())
if (file->external_dir_num)
dir_name = GetRelativePath(file->path,
parray_get(external_dirs,
file->external_dir_num - 1));
else
dir_name = GetRelativePath(file->path, instance_config.pgdata);
if (file->external_dir_num)
dir_name = GetRelativePath(file->path,
parray_get(external_dirs,
file->external_dir_num - 1));
else
dir_name = file->path;
dir_name = GetRelativePath(file->path, instance_config.pgdata);
elog(VERBOSE, "Create directory \"%s\"", dir_name);
@ -812,11 +457,7 @@ do_backup_instance(void)
backup_files_arg *arg = &(threads_args[i]);
elog(VERBOSE, "Start thread num: %i", i);
if (!IsReplicationProtocol())
pthread_create(&threads[i], NULL, backup_files, arg);
else
pthread_create(&threads[i], NULL, remote_backup_files, arg);
pthread_create(&threads[i], NULL, backup_files, arg);
}
/* Wait threads */
@ -896,7 +537,11 @@ do_backup_instance(void)
{
pgFile *file = (pgFile *) parray_get(xlog_files_list, i);
if (S_ISREG(file->mode))
calc_file_checksum(file, FIO_BACKUP_HOST);
{
file->crc = pgFileGetCRC(file->path, true, false,
&file->read_size, FIO_BACKUP_HOST);
file->write_size = file->read_size;
}
/* Remove file path root prefix*/
if (strstr(file->path, database_path) == file->path)
{
@ -1234,8 +879,7 @@ pgdata_basic_setup(bool amcheck_only)
* instance we opened connection to. And that target backup database PGDATA
* belogns to the same instance.
*/
/* TODO fix it for remote backup */
if (!IsReplicationProtocol() && !amcheck_only)
if (!amcheck_only)
check_system_identifiers();
if (current.checksum_version)
@ -2383,7 +2027,9 @@ pg_stop_backup(pgBackup *backup)
{
file = pgFileNew(backup_label, backup_label, true, 0,
FIO_BACKUP_HOST);
calc_file_checksum(file, FIO_BACKUP_HOST);
file->crc = pgFileGetCRC(file->path, true, false,
&file->read_size, FIO_BACKUP_HOST);
file->write_size = file->read_size;
free(file->path);
file->path = strdup(PG_BACKUP_LABEL_FILE);
parray_append(backup_files_list, file);
@ -2428,7 +2074,11 @@ pg_stop_backup(pgBackup *backup)
file = pgFileNew(tablespace_map, tablespace_map, true, 0,
FIO_BACKUP_HOST);
if (S_ISREG(file->mode))
calc_file_checksum(file, FIO_BACKUP_HOST);
{
file->crc = pgFileGetCRC(file->path, true, false,
&file->read_size, FIO_BACKUP_HOST);
file->write_size = file->read_size;
}
free(file->path);
file->path = strdup(PG_TABLESPACE_MAP_FILE);
parray_append(backup_files_list, file);
@ -2836,7 +2486,9 @@ backup_files(void *arg)
if (prev_file && file->exists_in_prev &&
buf.st_mtime < current.parent_backup)
{
calc_file_checksum(file, FIO_DB_HOST);
file->crc = pgFileGetCRC(file->path, true, false,
&file->read_size, FIO_DB_HOST);
file->write_size = file->read_size;
/* ...and checksum is the same... */
if (EQ_TRADITIONAL_CRC32(file->crc, (*prev_file)->crc))
skip = true; /* ...skip copying file. */

View File

@ -31,9 +31,6 @@ typedef union DataPage
char data[BLCKSZ];
} DataPage;
static bool
fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed);
#ifdef HAVE_LIBZ
/* Implementation of zlib compression method */
static int32
@ -1071,394 +1068,6 @@ copy_file(fio_location from_location, const char *to_root,
return true;
}
#ifdef HAVE_LIBZ
/*
* Show error during work with compressed file
*/
static const char *
get_gz_error(gzFile gzf, int errnum)
{
int gz_errnum;
const char *errmsg;
errmsg = fio_gzerror(gzf, &gz_errnum);
if (gz_errnum == Z_ERRNO)
return strerror(errnum);
else
return errmsg;
}
#endif
/*
* Copy file attributes
*/
static void
copy_meta(const char *from_path, fio_location from_location, const char *to_path, fio_location to_location, bool unlink_on_error)
{
struct stat st;
if (fio_stat(from_path, &st, true, from_location) == -1)
{
if (unlink_on_error)
fio_unlink(to_path, to_location);
elog(ERROR, "Cannot stat file \"%s\": %s",
from_path, strerror(errno));
}
if (fio_chmod(to_path, st.st_mode, to_location) == -1)
{
if (unlink_on_error)
fio_unlink(to_path, to_location);
elog(ERROR, "Cannot change mode of file \"%s\": %s",
to_path, strerror(errno));
}
}
/*
* Copy WAL segment from pgdata to archive catalog with possible compression.
*/
void
push_wal_file(const char *from_path, const char *to_path, bool is_compress,
bool overwrite)
{
FILE *in = NULL;
int out = -1;
char buf[XLOG_BLCKSZ];
const char *to_path_p;
char to_path_temp[MAXPGPATH];
int errno_temp;
#ifdef HAVE_LIBZ
char gz_to_path[MAXPGPATH];
gzFile gz_out = NULL;
if (is_compress)
{
snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path);
to_path_p = gz_to_path;
}
else
#endif
to_path_p = to_path;
/* open file for read */
in = fio_fopen(from_path, PG_BINARY_R, FIO_DB_HOST);
if (in == NULL)
elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path,
strerror(errno));
/* Check if possible to skip copying */
if (fileExists(to_path_p, FIO_BACKUP_HOST))
{
if (fileEqualCRC(from_path, to_path_p, is_compress))
return;
/* Do not copy and do not rise error. Just quit as normal. */
else if (!overwrite)
elog(ERROR, "WAL segment \"%s\" already exists.", to_path_p);
}
/* open backup file for write */
#ifdef HAVE_LIBZ
if (is_compress)
{
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", gz_to_path);
gz_out = fio_gzopen(to_path_temp, PG_BINARY_W, instance_config.compress_level, FIO_BACKUP_HOST);
if (gz_out == NULL)
elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s",
to_path_temp, strerror(errno));
}
else
#endif
{
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path);
out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST);
if (out < 0)
elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s",
to_path_temp, strerror(errno));
}
/* copy content */
for (;;)
{
ssize_t read_len = 0;
read_len = fio_fread(in, buf, sizeof(buf));
if (read_len < 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR,
"Cannot read source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
}
if (read_len > 0)
{
#ifdef HAVE_LIBZ
if (is_compress)
{
if (fio_gzwrite(gz_out, buf, read_len) != read_len)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot write to compressed WAL file \"%s\": %s",
to_path_temp, get_gz_error(gz_out, errno_temp));
}
}
else
#endif
{
if (fio_write(out, buf, read_len) != read_len)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot write to WAL file \"%s\": %s",
to_path_temp, strerror(errno_temp));
}
}
}
if (read_len == 0)
break;
}
#ifdef HAVE_LIBZ
if (is_compress)
{
if (fio_gzclose(gz_out) != 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
to_path_temp, get_gz_error(gz_out, errno_temp));
}
}
else
#endif
{
if (fio_flush(out) != 0 || fio_close(out) != 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot write WAL file \"%s\": %s",
to_path_temp, strerror(errno_temp));
}
}
if (fio_fclose(in))
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot close source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
}
/* update file permission. */
copy_meta(from_path, FIO_DB_HOST, to_path_temp, FIO_BACKUP_HOST, true);
if (fio_rename(to_path_temp, to_path_p, FIO_BACKUP_HOST) < 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_BACKUP_HOST);
elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s",
to_path_temp, to_path_p, strerror(errno_temp));
}
#ifdef HAVE_LIBZ
if (is_compress)
elog(INFO, "WAL file compressed to \"%s\"", gz_to_path);
#endif
}
/*
* Copy WAL segment from archive catalog to pgdata with possible decompression.
*/
void
get_wal_file(const char *from_path, const char *to_path)
{
FILE *in = NULL;
int out;
char buf[XLOG_BLCKSZ];
const char *from_path_p = from_path;
char to_path_temp[MAXPGPATH];
int errno_temp;
bool is_decompress = false;
#ifdef HAVE_LIBZ
char gz_from_path[MAXPGPATH];
gzFile gz_in = NULL;
#endif
/* First check source file for existance */
if (fio_access(from_path, F_OK, FIO_BACKUP_HOST) != 0)
{
#ifdef HAVE_LIBZ
/*
* Maybe we need to decompress the file. Check it with .gz
* extension.
*/
snprintf(gz_from_path, sizeof(gz_from_path), "%s.gz", from_path);
if (fio_access(gz_from_path, F_OK, FIO_BACKUP_HOST) == 0)
{
/* Found compressed file */
is_decompress = true;
from_path_p = gz_from_path;
}
#endif
/* Didn't find compressed file */
if (!is_decompress)
elog(ERROR, "Source WAL file \"%s\" doesn't exist",
from_path);
}
/* open file for read */
if (!is_decompress)
{
in = fio_fopen(from_path, PG_BINARY_R, FIO_BACKUP_HOST);
if (in == NULL)
elog(ERROR, "Cannot open source WAL file \"%s\": %s",
from_path, strerror(errno));
}
#ifdef HAVE_LIBZ
else
{
gz_in = fio_gzopen(gz_from_path, PG_BINARY_R, Z_DEFAULT_COMPRESSION,
FIO_BACKUP_HOST);
if (gz_in == NULL)
elog(ERROR, "Cannot open compressed WAL file \"%s\": %s",
gz_from_path, strerror(errno));
}
#endif
/* open backup file for write */
snprintf(to_path_temp, sizeof(to_path_temp), "%s.partial", to_path);
out = fio_open(to_path_temp, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_DB_HOST);
if (out < 0)
elog(ERROR, "Cannot open destination temporary WAL file \"%s\": %s",
to_path_temp, strerror(errno));
/* copy content */
for (;;)
{
int read_len = 0;
#ifdef HAVE_LIBZ
if (is_decompress)
{
read_len = fio_gzread(gz_in, buf, sizeof(buf));
if (read_len <= 0 && !fio_gzeof(gz_in))
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot read compressed WAL file \"%s\": %s",
gz_from_path, get_gz_error(gz_in, errno_temp));
}
}
else
#endif
{
read_len = fio_fread(in, buf, sizeof(buf));
if (read_len < 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot read source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
}
}
if (read_len > 0)
{
if (fio_write(out, buf, read_len) != read_len)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot write to WAL file \"%s\": %s", to_path_temp,
strerror(errno_temp));
}
}
/* Check for EOF */
#ifdef HAVE_LIBZ
if (is_decompress)
{
if (fio_gzeof(gz_in) || read_len == 0)
break;
}
else
#endif
{
if (/* feof(in) || */ read_len == 0)
break;
}
}
if (fio_flush(out) != 0 || fio_close(out) != 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot write WAL file \"%s\": %s",
to_path_temp, strerror(errno_temp));
}
#ifdef HAVE_LIBZ
if (is_decompress)
{
if (fio_gzclose(gz_in) != 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
gz_from_path, get_gz_error(gz_in, errno_temp));
}
}
else
#endif
{
if (fio_fclose(in))
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot close source WAL file \"%s\": %s",
from_path, strerror(errno_temp));
}
}
/* update file permission. */
copy_meta(from_path_p, FIO_BACKUP_HOST, to_path_temp, FIO_DB_HOST, true);
if (fio_rename(to_path_temp, to_path, FIO_DB_HOST) < 0)
{
errno_temp = errno;
fio_unlink(to_path_temp, FIO_DB_HOST);
elog(ERROR, "Cannot rename WAL file \"%s\" to \"%s\": %s",
to_path_temp, to_path, strerror(errno_temp));
}
#ifdef HAVE_LIBZ
if (is_decompress)
elog(INFO, "WAL file decompressed from \"%s\"", gz_from_path);
#endif
}
/*
* Calculate checksum of various files which are not copied from PGDATA,
* but created in process of backup, such as stream XLOG files,
* PG_TABLESPACE_MAP_FILE and PG_BACKUP_LABEL_FILE.
*/
void
calc_file_checksum(pgFile *file, fio_location location)
{
Assert(S_ISREG(file->mode));
file->crc = pgFileGetCRC(file->path, true, false, &file->read_size, location);
file->write_size = file->read_size;
}
/*
* Validate given page.
*
@ -1794,57 +1403,3 @@ check_file_pages(pgFile *file, XLogRecPtr stop_lsn, uint32 checksum_version,
return is_valid;
}
static bool
fileEqualCRC(const char *path1, const char *path2, bool path2_is_compressed)
{
pg_crc32 crc1;
pg_crc32 crc2;
/* Get checksum of backup file */
#ifdef HAVE_LIBZ
if (path2_is_compressed)
{
char buf [1024];
gzFile gz_in = NULL;
INIT_FILE_CRC32(true, crc2);
gz_in = fio_gzopen(path2, PG_BINARY_R, Z_DEFAULT_COMPRESSION, FIO_BACKUP_HOST);
if (gz_in == NULL)
/* File cannot be read */
elog(ERROR,
"Cannot compare WAL file \"%s\" with compressed \"%s\"",
path1, path2);
for (;;)
{
int read_len = fio_gzread(gz_in, buf, sizeof(buf));
if (read_len <= 0 && !fio_gzeof(gz_in))
{
/* An error occurred while reading the file */
elog(WARNING,
"Cannot compare WAL file \"%s\" with compressed \"%s\": %d",
path1, path2, read_len);
return false;
}
COMP_FILE_CRC32(true, crc2, buf, read_len);
if (fio_gzeof(gz_in) || read_len == 0)
break;
}
FIN_FILE_CRC32(true, crc2);
if (fio_gzclose(gz_in) != 0)
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
path2, get_gz_error(gz_in, errno));
}
else
#endif
{
crc2 = pgFileGetCRC(path2, true, true, NULL, FIO_BACKUP_HOST);
}
/* Get checksum of original file */
crc1 = pgFileGetCRC(path1, true, true, NULL, FIO_DB_HOST);
return EQ_CRC32C(crc1, crc2);
}

View File

@ -394,7 +394,6 @@ typedef struct BackupPageHeader
#endif
#define IsSshProtocol() (instance_config.remote.host && strcmp(instance_config.remote.proto, "ssh") == 0)
#define IsReplicationProtocol() (instance_config.remote.host && strcmp(instance_config.remote.proto, "replication") == 0)
/* directory options */
extern char *pg_probackup;
@ -631,12 +630,6 @@ extern void restore_data_file(const char *to_path,
uint32 backup_version);
extern bool copy_file(fio_location from_location, const char *to_root,
fio_location to_location, pgFile *file);
extern void move_file(const char *from_root, const char *to_root, pgFile *file);
extern void push_wal_file(const char *from_path, const char *to_path,
bool is_compress, bool overwrite);
extern void get_wal_file(const char *from_path, const char *to_path);
extern void calc_file_checksum(pgFile *file, fio_location location);
extern bool check_file_pages(pgFile *file, XLogRecPtr stop_lsn,
uint32 checksum_version, uint32 backup_version);
@ -672,11 +665,8 @@ extern void set_min_recovery_point(pgFile *file, const char *backup_path,
extern void copy_pgcontrol_file(const char *from_root, fio_location location, const char *to_root, fio_location to_location,
pgFile *file);
extern void sanityChecks(void);
extern void time2iso(char *buf, size_t len, time_t time);
extern const char *status2str(BackupStatus status);
extern void remove_trailing_space(char *buf, int comment_mark);
extern void remove_not_digit(char *buf, size_t len, const char *str);
extern const char *base36enc(long unsigned int value);
extern char *base36enc_dup(long unsigned int value);
extern long unsigned int base36dec(const char *text);

View File

@ -40,7 +40,6 @@ static void create_recovery_conf(time_t backup_id,
pgBackup *backup);
static parray *read_timeline_history(TimeLineID targetTLI);
static void *restore_files(void *arg);
static void remove_deleted_files(pgBackup *backup, parray *external_dirs);
/*
* Entry point of pg_probackup RESTORE and VALIDATE subcommands.

View File

@ -458,41 +458,3 @@ status2str(BackupStatus status)
return statusName[status];
}
void
remove_trailing_space(char *buf, int comment_mark)
{
int i;
char *last_char = NULL;
for (i = 0; buf[i]; i++)
{
if (buf[i] == comment_mark || buf[i] == '\n' || buf[i] == '\r')
{
buf[i] = '\0';
break;
}
}
for (i = 0; buf[i]; i++)
{
if (!isspace(buf[i]))
last_char = buf + i;
}
if (last_char != NULL)
*(last_char + 1) = '\0';
}
void
remove_not_digit(char *buf, size_t len, const char *str)
{
int i, j;
for (i = 0, j = 0; str[i] && j < len; i++)
{
if (!isdigit(str[i]))
continue;
buf[j++] = str[i];
}
buf[j] = '\0';
}