1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-02-03 14:01:57 +02:00

Merge branch 'pgpro-1184'

This commit is contained in:
Arthur Zakirov 2017-12-14 18:10:14 +03:00
commit 4457529351
8 changed files with 556 additions and 159 deletions

View File

@ -18,26 +18,27 @@
* --wal-file-path %p --wal-file-name %f', to move backups into arclog_path.
* Where archlog_path is $BACKUP_PATH/wal/system_id.
* Currently it just copies wal files to the new location.
* TODO: Planned options: compress, list the arclog content,
* TODO: Planned options: list the arclog content,
* compute and validate checksums.
*/
int
do_archive_push(char *wal_file_path, char *wal_file_name)
do_archive_push(char *wal_file_path, char *wal_file_name, bool overwrite)
{
char backup_wal_file_path[MAXPGPATH];
char absolute_wal_file_path[MAXPGPATH];
char current_dir[MAXPGPATH];
int64 system_id;
pgBackupConfig *config;
bool is_compress = false;
if (wal_file_name == NULL && wal_file_path == NULL)
elog(ERROR, "required parameters are not specified: --wal_file_name %%f --wal_file_path %%p");
elog(ERROR, "required parameters are not specified: --wal-file-name %%f --wal-file-path %%p");
if (wal_file_name == NULL)
elog(ERROR, "required parameter not specified: --wal_file_name %%f");
elog(ERROR, "required parameter not specified: --wal-file-name %%f");
if (wal_file_path == NULL)
elog(ERROR, "required parameter not specified: --wal_file_path %%p");
elog(ERROR, "required parameter not specified: --wal-file-path %%p");
if (!getcwd(current_dir, sizeof(current_dir)))
elog(ERROR, "getcwd() error");
@ -61,10 +62,16 @@ do_archive_push(char *wal_file_path, char *wal_file_name)
join_path_components(backup_wal_file_path, arclog_path, wal_file_name);
elog(INFO, "pg_probackup archive-push from %s to %s", absolute_wal_file_path, backup_wal_file_path);
if (access(backup_wal_file_path, F_OK) != -1)
elog(ERROR, "file '%s', already exists.", backup_wal_file_path);
copy_wal_file(absolute_wal_file_path, backup_wal_file_path);
#ifdef HAVE_LIBZ
if (compress_alg == PGLZ_COMPRESS)
elog(ERROR, "pglz compression is not supported");
if (compress_alg == ZLIB_COMPRESS)
is_compress = IsXLogFileName(wal_file_name);
#endif
push_wal_file(absolute_wal_file_path, backup_wal_file_path, is_compress,
overwrite);
elog(INFO, "pg_probackup archive-push completed successfully");
return 0;
@ -82,13 +89,13 @@ do_archive_get(char *wal_file_path, char *wal_file_name)
char current_dir[MAXPGPATH];
if (wal_file_name == NULL && wal_file_path == NULL)
elog(ERROR, "required parameters are not specified: --wal_file_name %%f --wal_file_path %%p");
elog(ERROR, "required parameters are not specified: --wal-file-name %%f --wal-file-path %%p");
if (wal_file_name == NULL)
elog(ERROR, "required parameter not specified: --wal_file_name %%f");
elog(ERROR, "required parameter not specified: --wal-file-name %%f");
if (wal_file_path == NULL)
elog(ERROR, "required parameter not specified: --wal_file_path %%p");
elog(ERROR, "required parameter not specified: --wal-file-path %%p");
if (!getcwd(current_dir, sizeof(current_dir)))
elog(ERROR, "getcwd() error");
@ -96,8 +103,9 @@ do_archive_get(char *wal_file_path, char *wal_file_name)
join_path_components(absolute_wal_file_path, current_dir, wal_file_path);
join_path_components(backup_wal_file_path, arclog_path, wal_file_name);
elog(INFO, "pg_probackup archive-get from %s to %s", backup_wal_file_path, absolute_wal_file_path);
copy_wal_file(backup_wal_file_path, absolute_wal_file_path);
elog(INFO, "pg_probackup archive-get from %s to %s",
backup_wal_file_path, absolute_wal_file_path);
get_wal_file(backup_wal_file_path, absolute_wal_file_path);
elog(INFO, "pg_probackup archive-get completed successfully");
return 0;

View File

@ -1314,11 +1314,16 @@ wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment)
TimeLineID tli;
XLogSegNo targetSegNo;
char wal_dir[MAXPGPATH],
wal_segment_full_path[MAXPGPATH];
wal_segment_path[MAXPGPATH];
char wal_segment[MAXFNAMELEN];
bool file_exists = false;
uint32 try_count = 0,
timeout;
#ifdef HAVE_LIBZ
char gz_wal_segment_path[MAXPGPATH];
#endif
tli = get_current_timeline(false);
/* Compute the name of the WAL file containig requested LSN */
@ -1331,14 +1336,14 @@ wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment)
{
pgBackupGetPath2(&current, wal_dir, lengthof(wal_dir),
DATABASE_DIR, PG_XLOG_DIR);
join_path_components(wal_segment_full_path, wal_dir, wal_segment);
join_path_components(wal_segment_path, wal_dir, wal_segment);
timeout = (uint32) checkpoint_timeout();
timeout = timeout + timeout * 0.1;
}
else
{
join_path_components(wal_segment_full_path, arclog_path, wal_segment);
join_path_components(wal_segment_path, arclog_path, wal_segment);
timeout = archive_timeout;
}
@ -1347,14 +1352,33 @@ wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment)
else
elog(LOG, "Looking for LSN: %X/%X in segment: %s", (uint32) (lsn >> 32), (uint32) lsn, wal_segment);
#ifdef HAVE_LIBZ
snprintf(gz_wal_segment_path, sizeof(gz_wal_segment_path), "%s.gz",
wal_segment_path);
#endif
/* Wait until target LSN is archived or streamed */
while (true)
{
bool file_exists = fileExists(wal_segment_full_path);
if (!file_exists)
{
file_exists = fileExists(wal_segment_path);
/* Try to find compressed WAL file */
if (!file_exists)
{
#ifdef HAVE_LIBZ
file_exists = fileExists(gz_wal_segment_path);
if (file_exists)
elog(LOG, "Found compressed WAL segment: %s", wal_segment_path);
#endif
}
else
elog(LOG, "Found WAL segment: %s", wal_segment_path);
}
if (file_exists)
{
elog(LOG, "Found segment: %s", wal_segment);
/* Do not check LSN for previous WAL segment */
if (wait_prev_segment)
return;
@ -1373,18 +1397,18 @@ wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment)
sleep(1);
if (interrupted)
elog(ERROR, "interrupted during waiting for WAL archiving");
elog(ERROR, "Interrupted during waiting for WAL archiving");
try_count++;
/* Inform user if WAL segment is absent in first attempt */
if (try_count == 1)
{
if (wait_prev_segment)
elog(INFO, "wait for WAL segment %s to be archived",
wal_segment_full_path);
elog(INFO, "Wait for WAL segment %s to be archived",
wal_segment_path);
else
elog(INFO, "wait for LSN %X/%X in archived WAL segment %s",
(uint32) (lsn >> 32), (uint32) lsn, wal_segment_full_path);
elog(INFO, "Wait for LSN %X/%X in archived WAL segment %s",
(uint32) (lsn >> 32), (uint32) lsn, wal_segment_path);
}
if (timeout > 0 && try_count > timeout)
@ -1396,7 +1420,7 @@ wait_wal_lsn(XLogRecPtr lsn, bool wait_prev_segment)
/* If WAL segment doesn't exist or we wait for previous segment */
else
elog(ERROR,
"switched WAL segment %s could not be archived in %d seconds",
"Switched WAL segment %s could not be archived in %d seconds",
wal_segment, timeout);
}
}
@ -1807,22 +1831,6 @@ checkpoint_timeout(void)
return val_int;
}
/*
* Return true if the path is a existing regular file.
*/
bool
fileExists(const char *path)
{
struct stat buf;
if (stat(path, &buf) == -1 && errno == ENOENT)
return false;
else if (!S_ISREG(buf.st_mode))
return false;
else
return true;
}
/*
* Notify end of backup to server when "backup_label" is in the root directory
* of the DB cluster.

View File

@ -20,8 +20,12 @@
#include "storage/bufpage.h"
#include "storage/checksum_impl.h"
#include <common/pg_lzcompress.h>
#include <zlib.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
#ifdef HAVE_LIBZ
/* Implementation of zlib compression method */
static size_t zlib_compress(void* dst, size_t dst_size, void const* src, size_t src_size)
{
@ -37,6 +41,7 @@ static size_t zlib_decompress(void* dst, size_t dst_size, void const* src, size_
int rc = uncompress(dst, &dest_len, src, src_size);
return rc == Z_OK ? dest_len : rc;
}
#endif
/*
* Compresses source into dest using algorithm. Returns the number of bytes
@ -50,8 +55,10 @@ do_compress(void* dst, size_t dst_size, void const* src, size_t src_size, Compre
case NONE_COMPRESS:
case NOT_DEFINED_COMPRESS:
return -1;
#ifdef HAVE_LIBZ
case ZLIB_COMPRESS:
return zlib_compress(dst, dst_size, src, src_size);
#endif
case PGLZ_COMPRESS:
return pglz_compress(src, src_size, dst, PGLZ_strategy_always);
}
@ -71,8 +78,10 @@ do_decompress(void* dst, size_t dst_size, void const* src, size_t src_size, Comp
case NONE_COMPRESS:
case NOT_DEFINED_COMPRESS:
return -1;
#ifdef HAVE_LIBZ
case ZLIB_COMPRESS:
return zlib_decompress(dst, dst_size, src, src_size);
#endif
case PGLZ_COMPRESS:
return pglz_decompress(src, src_size, dst, dst_size);
}
@ -702,105 +711,321 @@ copy_file(const char *from_root, const char *to_root, pgFile *file)
return true;
}
/* Almost like copy file, except the fact we don't calculate checksum */
void
copy_wal_file(const char *from_path, const char *to_path)
#ifdef HAVE_LIBZ
/*
* Show error during work with compressed file
*/
static const char *
get_gz_error(gzFile gzf)
{
FILE *in;
int errnum;
const char *errmsg;
errmsg = gzerror(gzf, &errnum);
if (errnum == Z_ERRNO)
return strerror(errno);
else
return errmsg;
}
#endif
/*
* Copy file attributes
*/
static void
copy_meta(const char *from_path, const char *to_path, bool unlink_on_error)
{
struct stat st;
if (stat(from_path, &st) == -1)
{
if (unlink_on_error)
unlink(to_path);
elog(ERROR, "Cannot stat file \"%s\": %s",
from_path, strerror(errno));
}
if (chmod(to_path, st.st_mode) == -1)
{
if (unlink_on_error)
unlink(to_path);
elog(ERROR, "Cannot change mode of file \"%s\": %s",
to_path, strerror(errno));
}
}
/*
* Copy WAL segment from pgdata to archive catalog with possible compression.
*/
void
push_wal_file(const char *from_path, const char *to_path, bool is_compress,
bool overwrite)
{
FILE *in = NULL;
FILE *out;
size_t read_len = 0;
int errno_tmp;
char buf[XLOG_BLCKSZ];
struct stat st;
const char *to_path_p = to_path;
#ifdef HAVE_LIBZ
char gz_to_path[MAXPGPATH];
gzFile gz_out = NULL;
#endif
/* open file for read */
in = fopen(from_path, "r");
if (in == NULL)
elog(ERROR, "Cannot open source WAL file \"%s\": %s", from_path,
strerror(errno));
/* open backup file for write */
#ifdef HAVE_LIBZ
if (is_compress)
{
snprintf(gz_to_path, sizeof(gz_to_path), "%s.gz", to_path);
if (!overwrite && fileExists(gz_to_path))
elog(ERROR, "WAL segment \"%s\" already exists.", gz_to_path);
gz_out = gzopen(gz_to_path, "wb");
if (gzsetparams(gz_out, compress_level, Z_DEFAULT_STRATEGY) != Z_OK)
elog(ERROR, "Cannot set compression level %d to file \"%s\": %s",
compress_level, gz_to_path, get_gz_error(gz_out));
to_path_p = gz_to_path;
}
else
#endif
{
if (!overwrite && fileExists(to_path))
elog(ERROR, "WAL segment \"%s\" already exists.", to_path);
out = fopen(to_path, "w");
if (out == NULL)
elog(ERROR, "Cannot open destination WAL file \"%s\": %s",
to_path, strerror(errno));
}
/* copy content */
for (;;)
{
size_t read_len = 0;
read_len = fread(buf, 1, sizeof(buf), in);
if (ferror(in))
{
unlink(to_path_p);
elog(ERROR, "Cannot read source WAL file \"%s\": %s",
from_path, strerror(errno));
}
if (read_len > 0)
{
#ifdef HAVE_LIBZ
if (is_compress)
{
if (gzwrite(gz_out, buf, read_len) != read_len)
{
unlink(to_path_p);
elog(ERROR, "Cannot write to compressed WAL file \"%s\": %s",
gz_to_path, get_gz_error(gz_out));
}
}
else
#endif
{
if (fwrite(buf, 1, read_len, out) != read_len)
{
unlink(to_path_p);
elog(ERROR, "Cannot write to WAL file \"%s\": %s",
to_path, strerror(errno));
}
}
}
if (feof(in) || read_len == 0)
break;
}
#ifdef HAVE_LIBZ
if (is_compress)
{
if (gzclose(gz_out) != 0)
{
unlink(to_path_p);
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
gz_to_path, get_gz_error(gz_out));
}
elog(INFO, "WAL file compressed to \"%s\"", gz_to_path);
}
else
#endif
{
if (fflush(out) != 0 ||
fsync(fileno(out)) != 0 ||
fclose(out))
{
unlink(to_path_p);
elog(ERROR, "Cannot write WAL file \"%s\": %s",
to_path, strerror(errno));
}
}
if (fclose(in))
{
unlink(to_path_p);
elog(ERROR, "Cannot close source WAL file \"%s\": %s",
from_path, strerror(errno));
}
/* update file permission. */
copy_meta(from_path, to_path_p, true);
}
/*
* Copy WAL segment from archive catalog to pgdata with possible decompression.
*/
void
get_wal_file(const char *from_path, const char *to_path)
{
FILE *in = NULL;
FILE *out;
char buf[XLOG_BLCKSZ];
const char *from_path_p = from_path;
bool is_decompress = false;
#ifdef HAVE_LIBZ
char gz_from_path[MAXPGPATH];
gzFile gz_in = NULL;
#endif
/* open file for read */
in = fopen(from_path, "r");
if (in == NULL)
{
/* maybe deleted, it's not error */
if (errno == ENOENT)
elog(ERROR, "cannot open source WAL file \"%s\": %s", from_path,
strerror(errno));
#ifdef HAVE_LIBZ
/*
* Maybe we need to decompress the file. Check it with .gz
* extension.
*/
snprintf(gz_from_path, sizeof(gz_from_path), "%s.gz", from_path);
gz_in = gzopen(gz_from_path, "rb");
if (gz_in == NULL)
{
if (errno == ENOENT)
{
/* There is no compressed file too, raise an error below */
}
/* Cannot open compressed file for some reason */
else
elog(ERROR, "Cannot open compressed WAL file \"%s\": %s",
gz_from_path, strerror(errno));
}
else
{
/* Found compressed file */
is_decompress = true;
from_path_p = gz_from_path;
}
#endif
/* Didn't find compressed file */
if (!is_decompress)
elog(ERROR, "Cannot open source WAL file \"%s\": %s",
from_path, strerror(errno));
}
/* open backup file for write */
out = fopen(to_path, "w");
if (out == NULL)
{
int errno_tmp = errno;
fclose(in);
elog(ERROR, "cannot open destination file \"%s\": %s",
to_path, strerror(errno_tmp));
}
/* stat source file to change mode of destination file */
if (fstat(fileno(in), &st) == -1)
{
fclose(in);
fclose(out);
elog(ERROR, "cannot stat \"%s\": %s", from_path,
strerror(errno));
}
if (st.st_size > XLOG_SEG_SIZE)
elog(ERROR, "Unexpected wal file size %s : %ld", from_path, st.st_size);
elog(ERROR, "Cannot open destination WAL file \"%s\": %s",
to_path, strerror(errno));
/* copy content */
for (;;)
{
if ((read_len = fread(buf, 1, sizeof(buf), in)) != sizeof(buf))
break;
size_t read_len = 0;
if (fwrite(buf, 1, read_len, out) != read_len)
#ifdef HAVE_LIBZ
if (is_decompress)
{
errno_tmp = errno;
/* oops */
fclose(in);
fclose(out);
elog(ERROR, "cannot write to \"%s\": %s", to_path,
strerror(errno_tmp));
read_len = gzread(gz_in, buf, sizeof(buf));
if (read_len != sizeof(buf) && !gzeof(gz_in))
{
unlink(to_path);
elog(ERROR, "Cannot read compressed WAL file \"%s\": %s",
gz_from_path, get_gz_error(gz_in));
}
}
}
errno_tmp = errno;
if (!feof(in))
{
fclose(in);
fclose(out);
elog(ERROR, "cannot read backup mode file \"%s\": %s",
from_path, strerror(errno_tmp));
}
/* copy odd part */
if (read_len > 0)
{
if (fwrite(buf, 1, read_len, out) != read_len)
else
#endif
{
errno_tmp = errno;
/* oops */
fclose(in);
fclose(out);
elog(ERROR, "cannot write to \"%s\": %s", to_path,
strerror(errno_tmp));
read_len = fread(buf, 1, sizeof(buf), in);
if (ferror(in))
{
unlink(to_path);
elog(ERROR, "Cannot read source WAL file \"%s\": %s",
from_path, strerror(errno));
}
}
}
if (read_len > 0)
{
if (fwrite(buf, 1, read_len, out) != read_len)
{
unlink(to_path);
elog(ERROR, "Cannot write to WAL file \"%s\": %s", to_path,
strerror(errno));
}
}
/* update file permission. */
if (chmod(to_path, st.st_mode) == -1)
{
errno_tmp = errno;
fclose(in);
fclose(out);
elog(ERROR, "cannot change mode of \"%s\": %s", to_path,
strerror(errno_tmp));
/* Check for EOF */
#ifdef HAVE_LIBZ
if (is_decompress)
{
if (gzeof(gz_in) || read_len == 0)
break;
}
else
#endif
{
if (feof(in) || read_len == 0)
break;
}
}
if (fflush(out) != 0 ||
fsync(fileno(out)) != 0 ||
fclose(out))
elog(ERROR, "cannot write \"%s\": %s", to_path, strerror(errno));
fclose(in);
{
unlink(to_path);
elog(ERROR, "Cannot write WAL file \"%s\": %s",
to_path, strerror(errno));
}
#ifdef HAVE_LIBZ
if (is_decompress)
{
if (gzclose(gz_in) != 0)
{
unlink(to_path);
elog(ERROR, "Cannot close compressed WAL file \"%s\": %s",
gz_from_path, get_gz_error(gz_in));
}
elog(INFO, "WAL file decompressed from \"%s\"", gz_from_path);
}
else
#endif
{
if (fclose(in))
{
unlink(to_path);
elog(ERROR, "Cannot close source WAL file \"%s\": %s",
from_path, strerror(errno));
}
}
/* update file permission. */
copy_meta(from_path_p, to_path, true);
}
/*

View File

@ -80,7 +80,6 @@ static char *pgdata_exclude_files[] =
NULL
};
pgFile *pgFileNew(const char *path, bool omit_symlink);
static int BlackListCompare(const void *str1, const void *str2);
static void dir_list_file_internal(parray *files, const char *root,
@ -947,3 +946,19 @@ dir_is_empty(const char *path)
return true;
}
/*
* Return true if the path is a existing regular file.
*/
bool
fileExists(const char *path)
{
struct stat buf;
if (stat(path, &buf) == -1 && errno == ENOENT)
return false;
else if (!S_ISREG(buf.st_mode))
return false;
else
return true;
}

View File

@ -18,6 +18,8 @@ static void help_set_config(void);
static void help_show_config(void);
static void help_add_instance(void);
static void help_del_instance(void);
static void help_archive_push(void);
static void help_archive_get(void);
void
help_command(char *command)
@ -42,6 +44,10 @@ help_command(char *command)
help_add_instance();
else if (strcmp(command, "del-instance") == 0)
help_del_instance();
else if (strcmp(command, "archive-push") == 0)
help_archive_push();
else if (strcmp(command, "archive-get") == 0)
help_archive_get();
else if (strcmp(command, "--help") == 0
|| strcmp(command, "help") == 0
|| strcmp(command, "-?") == 0
@ -117,6 +123,16 @@ help_pg_probackup(void)
printf(_("\n %s del-instance -B backup-dir\n"), PROGRAM_NAME);
printf(_(" --instance=instance_name\n"));
printf(_("\n %s archive-push -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" --wal-file-path=wal-file-path\n"));
printf(_(" --wal-file-name=wal-file-name\n"));
printf(_(" [--compress [--compress-level=compress-level]]\n"));
printf(_(" [--overwrite]\n"));
printf(_("\n %s archive-get -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" --wal-file-path=wal-file-path\n"));
printf(_(" --wal-file-name=wal-file-name\n"));
if ((PROGRAM_URL || PROGRAM_EMAIL))
{
printf("\n");
@ -308,7 +324,7 @@ help_set_config(void)
printf(_("\n Replica options:\n"));
printf(_(" --master-db=db_name database to connect to master\n"));
printf(_(" --master-host=host_name database server host of master\n"));
printf(_(" --master-port=port=port database server port of master\n"));
printf(_(" --master-port=port database server port of master\n"));
printf(_(" --master-user=user_name user name to connect to master\n"));
printf(_(" --replica-timeout=timeout wait timeout for WAL segment streaming through replication\n"));
}
@ -342,3 +358,39 @@ help_del_instance(void)
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
printf(_(" --instance=instance_name name of the instance to delete\n"));
}
static void
help_archive_push(void)
{
printf(_("\n %s archive-push -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" --wal-file-path=wal-file-path\n"));
printf(_(" --wal-file-name=wal-file-name\n"));
printf(_(" [--compress [--compress-level=compress-level]]\n\n"));
printf(_(" [--overwrite]\n"));
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
printf(_(" --instance=instance_name name of the instance to delete\n"));
printf(_(" --wal-file-path=wal-file-path\n"));
printf(_(" relative path name of the WAL file on the server\n"));
printf(_(" --wal-file-name=wal-file-name\n"));
printf(_(" name of the WAL file to retrieve from the server\n"));
printf(_(" --compress compress WAL file during archiving\n"));
printf(_(" --compress-level=compress-level\n"));
printf(_(" level of compression [0-9]\n"));
printf(_(" --overwrite overwrite archived WAL file\n"));
}
static void
help_archive_get(void)
{
printf(_("\n %s archive-get -B backup-dir --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" --wal-file-path=wal-file-path\n"));
printf(_(" --wal-file-name=wal-file-name\n\n"));
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
printf(_(" --instance=instance_name name of the instance to delete\n"));
printf(_(" --wal-file-path=wal-file-path\n"));
printf(_(" relative destination path name of the WAL file on the server\n"));
printf(_(" --wal-file-name=wal-file-name\n"));
printf(_(" name of the WAL file to retrieve from the archive\n"));
}

View File

@ -20,6 +20,10 @@
#include "catalog/storage_xlog.h"
#include "access/transam.h"
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
/*
* RmgrNames is an array of resource manager names, to make error messages
* a bit nicer.
@ -81,10 +85,15 @@ typedef struct xl_xact_abort
static void extractPageInfo(XLogReaderState *record);
static bool getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime);
static int xlogreadfd = -1;
static int xlogreadfd = -1;
static XLogSegNo xlogreadsegno = -1;
static char xlogfpath[MAXPGPATH];
static bool xlogexists = false;
static char xlogfpath[MAXPGPATH];
static bool xlogexists = false;
#ifdef HAVE_LIBZ
static gzFile gz_xlogread = NULL;
static char gz_xlogfpath[MAXPGPATH];
#endif
typedef struct XLogPageReadPrivate
{
@ -576,6 +585,24 @@ wal_contains_lsn(const char *archivedir, XLogRecPtr target_lsn,
return res;
}
#ifdef HAVE_LIBZ
/*
* Show error during work with compressed file
*/
static const char *
get_gz_error(gzFile gzf)
{
int errnum;
const char *errmsg;
errmsg = gzerror(gzf, &errnum);
if (errnum == Z_ERRNO)
return strerror(errno);
else
return errmsg;
}
#endif
/* XLogreader callback function, to read a WAL page */
static int
SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
@ -591,16 +618,27 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
* See if we need to switch to a new segment because the requested record
* is not in the currently open one.
*/
if (xlogreadfd >= 0 && !XLByteInSeg(targetPagePtr, xlogreadsegno))
if (!XLByteInSeg(targetPagePtr, xlogreadsegno))
{
close(xlogreadfd);
xlogreadfd = -1;
xlogexists = false;
if (xlogreadfd >= 0)
{
close(xlogreadfd);
xlogreadfd = -1;
xlogexists = false;
}
#ifdef HAVE_LIBZ
else if (gz_xlogread != NULL)
{
gzclose(gz_xlogread);
gz_xlogread = NULL;
xlogexists = false;
}
#endif
}
XLByteToSeg(targetPagePtr, xlogreadsegno);
if (xlogreadfd < 0)
if (!xlogexists)
{
char xlogfname[MAXFNAMELEN];
@ -610,42 +648,84 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
if (fileExists(xlogfpath))
{
elog(LOG, "opening WAL segment \"%s\"", xlogfpath);
elog(LOG, "Opening WAL segment \"%s\"", xlogfpath);
xlogexists = true;
xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
if (xlogreadfd < 0)
{
elog(WARNING, "could not open WAL segment \"%s\": %s",
elog(WARNING, "Could not open WAL segment \"%s\": %s",
xlogfpath, strerror(errno));
return -1;
}
}
/* Exit without error if WAL segment doesn't exist */
#ifdef HAVE_LIBZ
/* Try to open compressed WAL segment */
else
{
snprintf(gz_xlogfpath, sizeof(gz_xlogfpath), "%s.gz", xlogfpath);
if (fileExists(gz_xlogfpath))
{
elog(LOG, "Opening compressed WAL segment \"%s\"", gz_xlogfpath);
xlogexists = true;
gz_xlogread = gzopen(gz_xlogfpath, "rb");
if (gz_xlogread == NULL)
{
elog(WARNING, "Could not open compressed WAL segment \"%s\": %s",
gz_xlogfpath, strerror(errno));
return -1;
}
}
}
#endif
/* Exit without error if WAL segment doesn't exist */
if (!xlogexists)
return -1;
}
/*
* At this point, we have the right segment open.
*/
Assert(xlogreadfd != -1);
Assert(xlogexists);
/* Read the requested page */
if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
if (xlogreadfd != -1)
{
elog(WARNING, "could not seek in file \"%s\": %s", xlogfpath,
strerror(errno));
return -1;
}
if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
{
elog(WARNING, "Could not seek in WAL segment \"%s\": %s",
xlogfpath, strerror(errno));
return -1;
}
if (read(xlogreadfd, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
elog(WARNING, "could not read from file \"%s\": %s",
xlogfpath, strerror(errno));
return -1;
if (read(xlogreadfd, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
elog(WARNING, "Could not read from WAL segment \"%s\": %s",
xlogfpath, strerror(errno));
return -1;
}
}
#ifdef HAVE_LIBZ
else
{
if (gzseek(gz_xlogread, (z_off_t) targetPageOff, SEEK_SET) == -1)
{
elog(WARNING, "Could not seek in compressed WAL segment \"%s\": %s",
gz_xlogfpath, get_gz_error(gz_xlogread));
return -1;
}
if (gzread(gz_xlogread, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
elog(WARNING, "Could not read from compressed WAL segment \"%s\": %s",
gz_xlogfpath, get_gz_error(gz_xlogread));
return -1;
}
}
#endif
*pageTLI = private->tli;
return XLOG_BLCKSZ;

View File

@ -85,6 +85,7 @@ uint64 system_identifier = 0;
/* archive push options */
static char *wal_file_path;
static char *wal_file_name;
static bool file_overwrite = false;
/* current settings */
pgBackup current;
@ -161,6 +162,7 @@ static pgut_option options[] =
/* archive-push options */
{ 's', 160, "wal-file-path", &wal_file_path, SOURCE_CMDLINE },
{ 's', 161, "wal-file-name", &wal_file_name, SOURCE_CMDLINE },
{ 'b', 162, "overwrite", &file_overwrite, SOURCE_CMDLINE },
{ 0 }
};
@ -403,11 +405,14 @@ main(int argc, char *argv[])
if (compress_level < 0 || compress_level > 9)
elog(ERROR, "--compress-level value must be in the range from 0 to 9");
if (compress_level == 0)
compress_alg = NOT_DEFINED_COMPRESS;
/* do actual operation */
switch (backup_subcmd)
{
case ARCHIVE_PUSH:
return do_archive_push(wal_file_path, wal_file_name);
return do_archive_push(wal_file_path, wal_file_name, file_overwrite);
case ARCHIVE_GET:
return do_archive_get(wal_file_path, wal_file_name);
case ADD_INSTANCE:

View File

@ -256,44 +256,44 @@ typedef union DataPage
sscanf(data, "%X/%X", xlogid, xrecoff)
/* directory options */
extern char *backup_path;
extern char backup_instance_path[MAXPGPATH];
extern char *pgdata;
extern char arclog_path[MAXPGPATH];
extern char *backup_path;
extern char backup_instance_path[MAXPGPATH];
extern char *pgdata;
extern char arclog_path[MAXPGPATH];
/* common options */
extern int num_threads;
extern bool stream_wal;
extern bool progress;
extern int num_threads;
extern bool stream_wal;
extern bool progress;
#if PG_VERSION_NUM >= 100000
/* In pre-10 'replication_slot' is defined in receivelog.h */
extern char *replication_slot;
extern char *replication_slot;
#endif
/* backup options */
extern bool smooth_checkpoint;
extern uint32 archive_timeout;
extern bool from_replica;
extern bool is_remote_backup;
extern bool smooth_checkpoint;
extern uint32 archive_timeout;
extern bool from_replica;
extern bool is_remote_backup;
extern const char *master_db;
extern const char *master_host;
extern const char *master_port;
extern const char *master_user;
extern uint32 replica_timeout;
extern uint32 replica_timeout;
/* delete options */
extern bool delete_wal;
extern bool delete_expired;
extern bool apply_to_all;
extern bool force_delete;
extern bool delete_wal;
extern bool delete_expired;
extern bool apply_to_all;
extern bool force_delete;
/* retention options */
extern uint32 retention_redundancy;
extern uint32 retention_window;
extern uint32 retention_redundancy;
extern uint32 retention_window;
/* compression options */
extern CompressAlg compress_alg;
extern int compress_level;
extern int compress_level;
extern bool compress_shortcut;
#define DEFAULT_COMPRESS_LEVEL 6
@ -316,7 +316,6 @@ extern const char *pgdata_exclude_dir[];
extern int do_backup(time_t start_time);
extern BackupMode parse_backup_mode(const char *value);
extern const char *deparse_backup_mode(BackupMode mode);
extern bool fileExists(const char *path);
extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
BlockNumber blkno);
@ -343,7 +342,8 @@ extern int do_init(void);
extern int do_add_instance(void);
/* in archive.c */
extern int do_archive_push(char *wal_file_path, char *wal_file_name);
extern int do_archive_push(char *wal_file_path, char *wal_file_name,
bool overwrite);
extern int do_archive_get(char *wal_file_path, char *wal_file_name);
@ -409,6 +409,8 @@ extern parray *dir_read_file_list(const char *root, const char *file_txt);
extern int dir_create_dir(const char *path, mode_t mode);
extern bool dir_is_empty(const char *path);
extern bool fileExists(const char *path);
extern pgFile *pgFileNew(const char *path, bool omit_symlink);
extern pgFile *pgFileInit(const char *path);
extern void pgFileDelete(pgFile *file);
@ -427,7 +429,9 @@ extern void restore_data_file(const char *from_root, const char *to_root,
pgFile *file, pgBackup *backup);
extern bool copy_file(const char *from_root, const char *to_root,
pgFile *file);
extern void copy_wal_file(const char *from_root, const char *to_root);
extern void push_wal_file(const char *from_path, const char *to_path,
bool is_compress, bool overwrite);
extern void get_wal_file(const char *from_path, const char *to_path);
extern bool calc_file_checksum(pgFile *file);