
[Issue #174] archive-push improvements: batching, multi-thread support, checksum computation on remote agent, use O_EXCL flag, --no-sync flag support

Grigory Smolkin 2020-03-25 00:19:46 +03:00
parent 943548c857
commit a196073944
14 changed files with 1303 additions and 408 deletions

View File

@ -15,9 +15,9 @@ OBJS += src/pg_crc.o src/datapagemap.o src/receivelog.o src/streamutil.o \
EXTRA_CLEAN = src/pg_crc.c src/datapagemap.c src/datapagemap.h \
src/receivelog.c src/receivelog.h src/streamutil.c src/streamutil.h \
src/xlogreader.c
src/xlogreader.c src/instr_time.h
INCLUDES = src/datapagemap.h src/streamutil.h src/receivelog.h
INCLUDES = src/datapagemap.h src/streamutil.h src/receivelog.h src/instr_time.h
ifdef USE_PGXS
PG_CONFIG = pg_config
@ -60,6 +60,8 @@ all: checksrcdir $(INCLUDES);
$(PROGRAM): $(OBJS)
src/instr_time.h: $(top_srcdir)/src/include/portability/instr_time.h
rm -f $@ && $(LN_S) $(srchome)/src/include/portability/instr_time.h $@
src/datapagemap.c: $(top_srcdir)/src/bin/pg_rewind/datapagemap.c
rm -f $@ && $(LN_S) $(srchome)/src/bin/pg_rewind/datapagemap.c $@
src/datapagemap.h: $(top_srcdir)/src/bin/pg_rewind/datapagemap.h

View File

@ -131,7 +131,6 @@ doc/src/sgml/pgprobackup.sgml
<arg choice="plain"><option>archive-push</option></arg>
<arg choice="plain"><option>-B</option> <replaceable>backup_dir</replaceable></arg>
<arg choice="plain"><option>--instance</option> <replaceable>instance_name</replaceable></arg>
<arg choice="plain"><option>--wal-file-path</option> <replaceable>wal_file_path</replaceable></arg>
<arg choice="plain"><option>--wal-file-name</option> <replaceable>wal_file_name</replaceable></arg>
<arg rep="repeat"><replaceable>option</replaceable></arg>
</cmdsynopsis>
@ -786,7 +785,7 @@ ALTER ROLE backup WITH REPLICATION;
parameter, as follows:
</para>
<programlisting>
archive_command = '<replaceable>install_dir</replaceable>/pg_probackup archive-push -B <replaceable>backup_dir</replaceable> --instance <replaceable>instance_name</replaceable> --wal-file-path=%p --wal-file-name=%f [<replaceable>remote_options</replaceable>]'
archive_command = '<replaceable>install_dir</replaceable>/pg_probackup archive-push -B <replaceable>backup_dir</replaceable> --instance <replaceable>instance_name</replaceable> --wal-file-name=%f [<replaceable>remote_options</replaceable>]'
</programlisting>
</listitem>
</itemizedlist>
@ -3934,9 +3933,12 @@ pg_probackup delete -B <replaceable>backup_dir</replaceable> --instance <replace
<title>archive-push</title>
<programlisting>
pg_probackup archive-push -B <replaceable>backup_dir</replaceable> --instance <replaceable>instance_name</replaceable>
--wal-file-path=<replaceable>wal_file_path</replaceable> --wal-file-name=<replaceable>wal_file_name</replaceable>
[--help] [--compress] [--compress-algorithm=<replaceable>compression_algorithm</replaceable>]
[--compress-level=<replaceable>compression_level</replaceable>] [--overwrite]
--wal-file-name=<replaceable>wal_file_name</replaceable>
[--help] [--no-sync] [--compress] [--no-ready-rename] [--overwrite]
[-j <replaceable>num_threads</replaceable>] [--batch-size=<replaceable>batch_size</replaceable>]
[--archive-timeout=<replaceable>timeout</replaceable>]
[--compress-algorithm=<replaceable>compression_algorithm</replaceable>]
[--compress-level=<replaceable>compression_level</replaceable>]
[<replaceable>remote_options</replaceable>] [<replaceable>logging_options</replaceable>]
</programlisting>
<para>
@ -3961,13 +3963,24 @@ pg_probackup archive-push -B <replaceable>backup_dir</replaceable> --instance <r
with the <option>--overwrite</option> flag.
</para>
<para>
The files are copied to a temporary file with the
<literal>.part</literal> suffix. After the copy is
done, atomic rename is performed. This algorithm ensures that a
failed <command>archive-push</command> will not stall continuous archiving and
that concurrent archiving from multiple sources into a single
WAL archive have no risk of archive corruption. WAL segments copied to
the archive are synced to disk.
Every file is copied to a temporary file with the
<literal>.part</literal> suffix. If the temporary file already
exists, <application>pg_probackup</application> waits
<option>--archive-timeout</option> seconds before discarding it.
After the copy is done, an atomic rename is performed.
This algorithm ensures that a failed <command>archive-push</command>
will not stall continuous archiving and that concurrent archiving from
multiple sources into a single WAL archive carries no risk of archive
corruption.
</para>
<para>
To speed up archiving, especially in remote mode, <command>archive-push</command>
can be run in multiple threads using the <option>-j num_threads</option> option.
Files can also be copied in batches using the <option>--batch-size</option> option.
</para>
<para>
WAL segments copied to the archive are synced to disk unless
the <option>--no-sync</option> flag is used.
</para>
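<para>
To make this concrete, the per-segment flow can be sketched in C as
follows. This is a simplified illustration rather than the commit's
actual code: <function>wait_for_stale_temp</function> and
<function>copy_file_content</function> are hypothetical helpers, and
the real implementation uses the <literal>fio_*</literal> wrappers so
that the same logic also works in remote mode.
</para>
<programlisting>
static void
push_one_file(const char *from_path, const char *to_path,
              int archive_timeout, bool no_sync)
{
    char    temp_path[MAXPGPATH];
    int     out;

    snprintf(temp_path, sizeof(temp_path), "%s.part", to_path);

    /* O_EXCL guarantees that exactly one writer owns the temp file */
    out = open(temp_path, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY,
               FILE_PERMISSIONS);
    if (out == -1)
    {
        if (errno != EEXIST)
            elog(ERROR, "Cannot open \"%s\": %s", temp_path, strerror(errno));
        /* wait up to archive_timeout seconds, then reuse the stale file */
        out = wait_for_stale_temp(temp_path, archive_timeout);
    }

    copy_file_content(from_path, out);

    /* --no-sync skips the fsync, trading durability for speed */
    if (!no_sync)
        fsync(out);
    close(out);

    /* the atomic rename publishes the segment to readers all at once */
    if (rename(temp_path, to_path) != 0)
        elog(ERROR, "Cannot rename \"%s\" to \"%s\": %s",
             temp_path, to_path, strerror(errno));
}
</programlisting>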
<para>
You can use <command>archive-push</command> in the
@ -4073,7 +4086,8 @@ pg_probackup archive-get -B <replaceable>backup_dir</replaceable> --instance <re
<para>
Sets the number of parallel threads for <command>backup</command>,
<command>restore</command>, <command>merge</command>,
<command>validate</command>, and <command>checkdb</command> processes.
<command>validate</command>, <command>checkdb</command>, and
<command>archive-push</command> processes.
</para>
</listitem>
</varlistentry>
@ -4732,6 +4746,48 @@ pg_probackup archive-get -B <replaceable>backup_dir</replaceable> --instance <re
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--batch-size=<replaceable>batch_size</replaceable></option></term>
<listitem>
<para>
Sets the maximum number of files that can be copied into the archive by a single
<command>archive-push</command> process.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--archive-timeout=<replaceable>wait_time</replaceable></option></term>
<listitem>
<para>
Sets the timeout for considering an existing <literal>.part</literal> file stale. By default, <application>pg_probackup</application> waits 300 seconds.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--no-ready-rename</option></term>
<listitem>
<para>
Do not rename status files in the <literal>archive_status</literal> directory.
This option should be used only if <parameter>archive_command</parameter>
contains multiple commands.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--no-sync</option></term>
<listitem>
<para>
Do not sync copied WAL files to disk. You can use this flag to speed
up the archiving process; however, it can result in WAL archive
corruption in the event of an operating system or hardware crash.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
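<para>
Putting the new options together, an <parameter>archive_command</parameter>
similar to the one built by this commit's test helper (the thread count and
batch size below are illustrative values) enables batched, multi-threaded,
no-sync archiving:
</para>
<programlisting>
archive_command = '<replaceable>install_dir</replaceable>/pg_probackup archive-push -B <replaceable>backup_dir</replaceable> --instance <replaceable>instance_name</replaceable> --wal-file-name=%f -j 4 --batch-size=10 --no-sync'
</programlisting>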
</refsect3>

File diff suppressed because it is too large

View File

@ -803,7 +803,7 @@ backup_non_data_file(pgFile *file, pgFile *prev_file,
file->mtime <= parent_backup_time)
{
file->crc = fio_get_crc32(from_fullpath, FIO_DB_HOST);
file->crc = fio_get_crc32(from_fullpath, FIO_DB_HOST, false);
/* ...and checksum is the same... */
if (EQ_TRADITIONAL_CRC32(file->crc, prev_file->crc))
@ -1069,7 +1069,7 @@ restore_non_data_file_internal(FILE *in, FILE *out, pgFile *file,
break;
if (read_len < 0)
elog(ERROR, "Cannot read backup mode file \"%s\": %s",
elog(ERROR, "Cannot read backup file \"%s\": %s",
from_fullpath, strerror(errno));
if (fio_fwrite(out, buf, read_len) != read_len)

View File

@ -315,6 +315,72 @@ pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok)
return crc;
}
/*
* Read the local file to compute its CRC.
* We cannot make a decision about file decompression here, because
* the user may ask to back up already compressed files, so we must
* stay agnostic about their contents.
*/
pg_crc32
pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok)
{
gzFile fp;
pg_crc32 crc = 0;
char buf[STDIO_BUFSIZE];
int len = 0;
int err;
INIT_FILE_CRC32(use_crc32c, crc);
/* open file in binary read mode */
fp = gzopen(file_path, PG_BINARY_R);
if (fp == NULL)
{
if (errno == ENOENT)
{
if (missing_ok)
{
FIN_FILE_CRC32(use_crc32c, crc);
return crc;
}
}
elog(ERROR, "Cannot open file \"%s\": %s",
file_path, strerror(errno));
}
/* calc CRC of file */
for (;;)
{
if (interrupted)
elog(ERROR, "interrupted during CRC calculation");
len = gzread(fp, &buf, sizeof(buf));
if (len <= 0)
{
/* we either run into eof or error */
if (gzeof(fp))
break;
else
{
const char *err_str = NULL;
err_str = gzerror(fp, &err);
elog(ERROR, "Cannot read from compressed file %s", err_str);
}
}
/* update CRC */
COMP_FILE_CRC32(use_crc32c, crc, buf, len);
}
FIN_FILE_CRC32(use_crc32c, crc);
gzclose(fp);
return crc;
}
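/*
 * Usage sketch (illustrative, not part of this commit): compute the
 * checksum of an already-gzipped WAL segment, using CRC-32C and
 * treating a missing file as an error:
 *
 *     crc = pgFileGetCRCgz(gz_path, true, false);
 *
 * The argument order (use_crc32c, missing_ok) follows the definition above.
 */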
void
pgFileFree(void *file)
{

View File

@ -214,10 +214,11 @@ help_pg_probackup(void)
printf(_(" [--help]\n"));
printf(_("\n %s archive-push -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" --wal-file-path=wal-file-path\n"));
printf(_(" --wal-file-name=wal-file-name\n"));
printf(_(" [--overwrite]\n"));
printf(_(" [--compress]\n"));
printf(_(" [-j num-threads] [--batch-size batch_size]\n"));
printf(_(" [--archive-timeout=timeout]\n"));
printf(_(" [--no-ready-rename] [--no-sync]\n"));
printf(_(" [--overwrite] [--compress]\n"));
printf(_(" [--compress-algorithm=compress-algorithm]\n"));
printf(_(" [--compress-level=compress-level]\n"));
printf(_(" [--remote-proto] [--remote-host]\n"));
@ -868,10 +869,11 @@ static void
help_archive_push(void)
{
printf(_("\n%s archive-push -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" --wal-file-path=wal-file-path\n"));
printf(_(" --wal-file-name=wal-file-name\n"));
printf(_(" [--overwrite]\n"));
printf(_(" [--compress]\n"));
printf(_(" [-j num-threads] [--batch-size batch_size]\n"));
printf(_(" [--archive-timeout=timeout]\n"));
printf(_(" [--no-ready-rename] [--no-sync]\n"));
printf(_(" [--overwrite] [--compress]\n"));
printf(_(" [--compress-algorithm=compress-algorithm]\n"));
printf(_(" [--compress-level=compress-level]\n"));
printf(_(" [--remote-proto] [--remote-host]\n"));
@ -880,10 +882,13 @@ help_archive_push(void)
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
printf(_(" --instance=instance_name name of the instance to delete\n"));
printf(_(" --wal-file-path=wal-file-path\n"));
printf(_(" relative path name of the WAL file on the server\n"));
printf(_(" --wal-file-name=wal-file-name\n"));
printf(_(" name of the WAL file to retrieve from the server\n"));
printf(_(" name of the file to copy into WAL archive\n"));
printf(_(" -j, --threads=NUM number of parallel threads\n"));
printf(_(" --batch-size=NUM number of files to be copied\n"));
printf(_(" --archive-timeout=timeout wait timeout before discarding stale temp file(default: 5min)\n"));
printf(_(" --no-ready-rename do not rename '.ready' files in 'archive_status' directory\n"));
printf(_(" --no-sync do not sync WAL file to disk\n"));
printf(_(" --overwrite overwrite archived WAL file\n"));
printf(_("\n Compression options:\n"));

View File

@ -125,9 +125,11 @@ bool compress_shortcut = false;
char *instance_name;
/* archive push options */
int batch_size = 1;
static char *wal_file_path;
static char *wal_file_name;
static bool file_overwrite = false;
static bool file_overwrite = false;
static bool no_ready_rename = false;
/* show options */
ShowFormat show_format = SHOW_PLAIN;
@ -172,7 +174,6 @@ static ConfigOption cmd_options[] =
{ 'f', 'b', "backup-mode", opt_backup_mode, SOURCE_CMD_STRICT },
{ 'b', 'C', "smooth-checkpoint", &smooth_checkpoint, SOURCE_CMD_STRICT },
{ 's', 'S', "slot", &replication_slot, SOURCE_CMD_STRICT },
{ 's', 'S', "primary-slot-name",&replication_slot, SOURCE_CMD_STRICT },
{ 'b', 181, "temp-slot", &temp_slot, SOURCE_CMD_STRICT },
{ 'b', 182, "delete-wal", &delete_wal, SOURCE_CMD_STRICT },
{ 'b', 183, "delete-expired", &delete_expired, SOURCE_CMD_STRICT },
@ -189,13 +190,14 @@ static ConfigOption cmd_options[] =
{ 'f', 155, "external-mapping", opt_externaldir_map, SOURCE_CMD_STRICT },
{ 's', 141, "recovery-target-name", &target_name, SOURCE_CMD_STRICT },
{ 's', 142, "recovery-target-action", &target_action, SOURCE_CMD_STRICT },
{ 'b', 'R', "restore-as-replica", &restore_as_replica, SOURCE_CMD_STRICT },
{ 'b', 143, "no-validate", &no_validate, SOURCE_CMD_STRICT },
{ 'b', 154, "skip-block-validation", &skip_block_validation, SOURCE_CMD_STRICT },
{ 'b', 156, "skip-external-dirs", &skip_external_dirs, SOURCE_CMD_STRICT },
{ 'f', 158, "db-include", opt_datname_include_list, SOURCE_CMD_STRICT },
{ 'f', 159, "db-exclude", opt_datname_exclude_list, SOURCE_CMD_STRICT },
{ 'b', 'R', "restore-as-replica", &restore_as_replica, SOURCE_CMD_STRICT },
{ 's', 160, "primary-conninfo", &primary_conninfo, SOURCE_CMD_STRICT },
{ 's', 'S', "primary-slot-name",&replication_slot, SOURCE_CMD_STRICT },
/* checkdb options */
{ 'b', 195, "amcheck", &need_amcheck, SOURCE_CMD_STRICT },
{ 'b', 196, "heapallindexed", &heapallindexed, SOURCE_CMD_STRICT },
@ -216,9 +218,11 @@ static ConfigOption cmd_options[] =
{ 's', 150, "wal-file-path", &wal_file_path, SOURCE_CMD_STRICT },
{ 's', 151, "wal-file-name", &wal_file_name, SOURCE_CMD_STRICT },
{ 'b', 152, "overwrite", &file_overwrite, SOURCE_CMD_STRICT },
{ 'b', 153, "no-ready-rename", &no_ready_rename, SOURCE_CMD_STRICT },
{ 'i', 162, "batch-size", &batch_size, SOURCE_CMD_STRICT },
/* show options */
{ 'f', 153, "format", opt_show_format, SOURCE_CMD_STRICT },
{ 'b', 161, "archive", &show_archive, SOURCE_CMD_STRICT },
{ 'f', 163, "format", opt_show_format, SOURCE_CMD_STRICT },
{ 'b', 164, "archive", &show_archive, SOURCE_CMD_STRICT },
/* set-backup options */
{ 'I', 170, "ttl", &ttl, SOURCE_CMD_STRICT, SOURCE_DEFAULT, 0, OPTION_UNIT_S, option_get_value},
{ 's', 171, "expire-time", &expire_time_string, SOURCE_CMD_STRICT },
@ -746,14 +750,18 @@ main(int argc, char *argv[])
if (num_threads < 1)
num_threads = 1;
if (batch_size < 1)
batch_size = 1;
compress_init();
/* do actual operation */
switch (backup_subcmd)
{
case ARCHIVE_PUSH_CMD:
return do_archive_push(&instance_config, wal_file_path,
wal_file_name, file_overwrite);
do_archive_push(&instance_config, wal_file_path, wal_file_name,
batch_size, file_overwrite, no_sync, no_ready_rename);
break;
case ARCHIVE_GET_CMD:
return do_archive_get(&instance_config,
wal_file_path, wal_file_name);

View File

@ -67,7 +67,6 @@ extern const char *PROGRAM_EMAIL;
#define DATABASE_MAP "database_map"
/* Timeout defaults */
#define PARTIAL_WAL_TIMER 60
#define ARCHIVE_TIMEOUT_DEFAULT 300
#define REPLICA_TIMEOUT_DEFAULT 300
@ -573,6 +572,9 @@ typedef struct BackupPageHeader
#define GetXLogSegNoFromScrath(logSegNo, log, seg, wal_segsz_bytes) \
logSegNo = (uint64) log * XLogSegmentsPerXLogId(wal_segsz_bytes) + seg
#define GetXLogFromFileName(fname, tli, logSegNo, wal_segsz_bytes) \
XLogFromFileName(fname, tli, logSegNo, wal_segsz_bytes)
#else
#define GetXLogSegNo(xlrp, logSegNo, wal_segsz_bytes) \
XLByteToSeg(xlrp, logSegNo)
@ -589,6 +591,9 @@ typedef struct BackupPageHeader
#define GetXLogSegNoFromScrath(logSegNo, log, seg, wal_segsz_bytes) \
logSegNo = (uint64) log * XLogSegmentsPerXLogId + seg
#define GetXLogFromFileName(fname, tli, logSegNo, wal_segsz_bytes) \
XLogFromFileName(fname, tli, logSegNo)
#endif
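/*
 * Usage sketch (illustrative): GetXLogFromFileName hides the cross-version
 * signature difference; the segment-size argument is simply dropped when
 * building against PostgreSQL releases older than 11.
 *
 *     TimeLineID tli;
 *     XLogSegNo  segno;
 *
 *     GetXLogFromFileName("000000010000000200000045", &tli, &segno,
 *                         instance_config.xlog_seg_size);
 */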
#define IsSshProtocol() (instance_config.remote.host && strcmp(instance_config.remote.proto, "ssh") == 0)
@ -692,8 +697,9 @@ extern int do_init(void);
extern int do_add_instance(InstanceConfig *instance);
/* in archive.c */
extern int do_archive_push(InstanceConfig *instance, char *wal_file_path,
char *wal_file_name, bool overwrite);
extern void do_archive_push(InstanceConfig *instance, char *wal_file_path,
char *wal_file_name, int batch_size, bool overwrite,
bool no_sync, bool no_ready_rename);
extern int do_archive_get(InstanceConfig *instance, char *wal_file_path,
char *wal_file_name);
@ -846,6 +852,7 @@ extern void pgFileDelete(pgFile *file, const char *full_path);
extern void pgFileFree(void *file);
extern pg_crc32 pgFileGetCRC(const char *file_path, bool missing_ok, bool use_crc32c);
extern pg_crc32 pgFileGetCRCgz(const char *file_path, bool missing_ok, bool use_crc32c);
extern int pgFileCompareName(const void *f1, const void *f2);
extern int pgFileComparePath(const void *f1, const void *f2);
@ -939,7 +946,7 @@ extern int32 do_decompress(void* dst, size_t dst_size, void const* src, size_t
CompressAlg alg, const char **errormsg);
extern void pretty_size(int64 size, char *buf, size_t len);
extern void pretty_time_interval(int64 num_seconds, char *buf, size_t len);
extern void pretty_time_interval(double time, char *buf, size_t len);
extern PGconn *pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo);
extern void check_system_identifiers(PGconn *conn, char *pgdata);

View File

@ -191,14 +191,18 @@ pretty_size(int64 size, char *buf, size_t len)
}
void
pretty_time_interval(int64 num_seconds, char *buf, size_t len)
pretty_time_interval(double time, char *buf, size_t len)
{
int seconds = 0;
int minutes = 0;
int hours = 0;
int days = 0;
int num_seconds = 0;
int milliseconds = 0;
int seconds = 0;
int minutes = 0;
int hours = 0;
int days = 0;
if (num_seconds <= 0)
num_seconds = (int) time;
if (time <= 0)
{
strncpy(buf, "0", len);
return;
@ -214,6 +218,7 @@ pretty_time_interval(int64 num_seconds, char *buf, size_t len)
num_seconds %= 60;
seconds = num_seconds;
milliseconds = (int)((time - (int) time) * 1000.0);
if (days > 0)
{
@ -233,7 +238,16 @@ pretty_time_interval(int64 num_seconds, char *buf, size_t len)
return;
}
snprintf(buf, len, "%ds", seconds);
if (seconds > 0)
{
if (milliseconds > 0)
snprintf(buf, len, "%ds:%dms", seconds, milliseconds);
else
snprintf(buf, len, "%ds", seconds);
return;
}
snprintf(buf, len, "%dms", milliseconds);
return;
}
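/*
 * Examples of the new sub-second formatting, derived from the code above:
 *
 *     pretty_time_interval(0.25, buf, sizeof(buf));   => "250ms"
 *     pretty_time_interval(1.5,  buf, sizeof(buf));   => "1s:500ms"
 *     pretty_time_interval(0,    buf, sizeof(buf));   => "0"
 */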

View File

@ -340,7 +340,10 @@ int fio_open(char const* path, int mode, fio_location location)
hdr.cop = FIO_OPEN;
hdr.handle = i;
hdr.size = strlen(path) + 1;
hdr.arg = mode & ~O_EXCL;
hdr.arg = mode;
fio_fdset |= 1 << i;
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
@ -490,6 +493,7 @@ int fio_close(int fd)
fio_fdset &= ~(1 << hdr.handle);
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
/* Note that the file is closed without waiting for confirmation */
return 0;
}
@ -865,6 +869,8 @@ int fio_rename(char const* old_path, char const* new_path, fio_location location
IO_CHECK(fio_write_all(fio_stdout, old_path, old_path_len), old_path_len);
IO_CHECK(fio_write_all(fio_stdout, new_path, new_path_len), new_path_len);
//TODO: wait for confirmation.
return 0;
}
else
@ -916,7 +922,7 @@ int fio_sync(char const* path, fio_location location)
}
/* Get crc32 of file */
pg_crc32 fio_get_crc32(const char *file_path, fio_location location)
pg_crc32 fio_get_crc32(const char *file_path, fio_location location, bool decompress)
{
if (fio_is_remote(location))
{
@ -926,6 +932,10 @@ pg_crc32 fio_get_crc32(const char *file_path, fio_location location)
hdr.cop = FIO_GET_CRC32;
hdr.handle = -1;
hdr.size = path_len;
hdr.arg = 0;
if (decompress)
hdr.arg = 1;
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, file_path, path_len), path_len);
@ -934,7 +944,12 @@ pg_crc32 fio_get_crc32(const char *file_path, fio_location location)
return crc;
}
else
return pgFileGetCRC(file_path, true, true);
{
if (decompress)
return pgFileGetCRCgz(file_path, true, true);
else
return pgFileGetCRC(file_path, true, true);
}
}
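/*
 * Usage sketch (path and location are illustrative): with a remote backup
 * host, the CRC of a compressed segment is computed by the remote agent
 * (FIO_GET_CRC32 with hdr.arg == 1), so the file body never crosses the wire:
 *
 *     crc = fio_get_crc32(gz_path, FIO_BACKUP_HOST, true);
 */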
/* Remove file */
@ -1027,6 +1042,7 @@ typedef struct fioGZFile
Bytef buf[ZLIB_BUFFER_SIZE];
} fioGZFile;
/* On error returns NULL and errno should be checked */
gzFile
fio_gzopen(char const* path, char const* mode, int level, fio_location location)
{
@ -1037,6 +1053,7 @@ fio_gzopen(char const* path, char const* mode, int level, fio_location location)
memset(&gz->strm, 0, sizeof(gz->strm));
gz->eof = 0;
gz->errnum = Z_OK;
/* check if file opened for writing */
if (strcmp(mode, PG_BINARY_W) == 0) /* compress */
{
gz->strm.next_out = gz->buf;
@ -1049,14 +1066,12 @@ fio_gzopen(char const* path, char const* mode, int level, fio_location location)
if (rc == Z_OK)
{
gz->compress = 1;
if (fio_access(path, F_OK, location) == 0)
gz->fd = fio_open(path, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, location);
if (gz->fd < 0)
{
elog(LOG, "File %s exists", path);
free(gz);
errno = EEXIST;
return NULL;
}
gz->fd = fio_open(path, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, location);
}
}
else
@ -1069,21 +1084,27 @@ fio_gzopen(char const* path, char const* mode, int level, fio_location location)
{
gz->compress = 0;
gz->fd = fio_open(path, O_RDONLY | PG_BINARY, location);
if (gz->fd < 0)
{
free(gz);
return NULL;
}
}
}
if (rc != Z_OK)
{
free(gz);
return NULL;
elog(ERROR, "zlib internal error when opening file %s: %s",
path, gz->strm.msg);
}
return (gzFile)((size_t)gz + FIO_GZ_REMOTE_MARKER);
}
else
{
gzFile file;
/* check if file opened for writing */
if (strcmp(mode, PG_BINARY_W) == 0)
{
int fd = open(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FILE_PERMISSIONS);
int fd = open(path, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, FILE_PERMISSIONS);
if (fd < 0)
return NULL;
file = gzdopen(fd, mode);
@ -1143,7 +1164,8 @@ fio_gzread(gzFile f, void *buf, unsigned size)
{
gz->strm.next_in = gz->buf;
}
rc = fio_read(gz->fd, gz->strm.next_in + gz->strm.avail_in, gz->buf + ZLIB_BUFFER_SIZE - gz->strm.next_in - gz->strm.avail_in);
rc = fio_read(gz->fd, gz->strm.next_in + gz->strm.avail_in,
gz->buf + ZLIB_BUFFER_SIZE - gz->strm.next_in - gz->strm.avail_in);
if (rc > 0)
{
gz->strm.avail_in += rc;
@ -1772,7 +1794,10 @@ void fio_communicate(int in, int out)
break;
case FIO_GET_CRC32:
/* calculate crc32 for a file */
crc = pgFileGetCRC(buf, true, true);
if (hdr.arg == 1)
crc = pgFileGetCRCgz(buf, true, true);
else
crc = pgFileGetCRC(buf, true, true);
IO_CHECK(fio_write_all(out, &crc, sizeof(crc)), sizeof(crc));
break;
case FIO_DISCONNECT:

View File

@ -100,7 +100,7 @@ extern int fio_truncate(int fd, off_t size);
extern int fio_close(int fd);
extern void fio_disconnect(void);
extern int fio_sync(char const* path, fio_location location);
extern pg_crc32 fio_get_crc32(const char *file_path, fio_location location);
extern pg_crc32 fio_get_crc32(const char *file_path, fio_location location, bool decompress);
extern int fio_rename(char const* old_path, char const* new_path, fio_location location);
extern int fio_symlink(char const* target, char const* link_path, fio_location location);

View File

@ -220,7 +220,7 @@ bool launch_agent(void)
return false;
} else {
#endif
elog(LOG, "Spawn agent %d version %s", child_pid, PROGRAM_VERSION);
elog(LOG, "Start SSH client process, pid %d", child_pid);
SYS_CHECK(close(infd[1])); /* These are being used by the child */
SYS_CHECK(close(outfd[0]));
SYS_CHECK(close(errfd[1]));

View File

@ -281,7 +281,7 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
# @unittest.skip("skip")
def test_pgpro434_4(self):
"""
Check pg_stop_backup_timeout, needed backup_timeout
Check pg_stop_backup_timeout when a libpq timeout is requested.
Fixed in commit d84d79668b0c139 and assert fixed by ptrack 1.7
"""
fname = self.id().split('.')[3]
@ -398,15 +398,11 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
log_content)
self.assertIn(
'INFO: pg_probackup archive-push from',
'pg_probackup push file',
log_content)
self.assertIn(
'ERROR: WAL segment ',
log_content)
self.assertIn(
'already exists.',
'WAL file already exists in archive with different checksum',
log_content)
self.assertNotIn(
@ -448,8 +444,7 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
base_dir=os.path.join(module_name, fname, 'node'),
set_replication=True,
initdb_params=['--data-checksums'],
pg_options={
'checkpoint_timeout': '30s'})
pg_options={'checkpoint_timeout': '30s'})
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
@ -487,9 +482,13 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
self.assertIn(
'DETAIL: The failed archive command was:', log_content)
self.assertIn(
'INFO: pg_probackup archive-push from', log_content)
'pg_probackup push file', log_content)
self.assertNotIn(
'WAL file already exists in archive with '
'different checksum, overwriting', log_content)
self.assertIn(
'{0}" already exists.'.format(filename), log_content)
'WAL file already exists in archive with '
'different checksum', log_content)
self.assertNotIn(
'pg_probackup archive-push completed successfully', log_content)
@ -497,7 +496,7 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
self.set_archiving(backup_dir, 'node', node, overwrite=True)
node.reload()
self.switch_wal_segment(node)
sleep(2)
sleep(5)
with open(log_file, 'r') as f:
log_content = f.read()
@ -505,6 +504,10 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
'pg_probackup archive-push completed successfully' in log_content,
'Expecting messages about successful execution of archive_command')
self.assertIn(
'WAL file already exists in archive with '
'different checksum, overwriting', log_content)
# Clean after yourself
self.del_test_dir(module_name, fname)
@ -520,7 +523,9 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
self.set_archiving(
backup_dir, 'node', node,
log_level='verbose', archive_timeout=60)
node.slow_start()
@ -579,12 +584,9 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
log_file = os.path.join(node.logs_dir, 'postgresql.log')
with open(log_file, 'r') as f:
log_content = f.read()
self.assertIn(
'Cannot open destination temporary WAL file',
log_content)
self.assertIn(
'Reusing stale destination temporary WAL file',
'Reusing stale temp WAL file',
log_content)
# Clean after yourself
@ -602,7 +604,7 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node, archive_timeout=60)
node.slow_start()
@ -905,8 +907,8 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
initdb_params=['--data-checksums'],
pg_options={
'checkpoint_timeout': '30s',
'archive_timeout': '10s'}
)
'archive_timeout': '10s'})
replica = self.make_simple_node(
base_dir=os.path.join(module_name, fname, 'replica'))
replica.cleanup()
@ -923,6 +925,8 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
"md5(repeat(i::text,10))::tsvector as tsvector "
"from generate_series(0,10000) i")
master.pgbench_init(scale=5)
# TAKE FULL ARCHIVE BACKUP FROM MASTER
self.backup_node(backup_dir, 'master', master)
# GET LOGICAL CONTENT FROM MASTER
@ -1718,7 +1722,7 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node, log_level='verbose')
node.slow_start()
backup_id = self.backup_node(backup_dir, 'node', node)
@ -1734,6 +1738,8 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
node.slow_start()
node.pgbench_init(scale=2)
sleep(5)
show = self.show_archive(backup_dir)
timelines = show[0]['timelines']
@ -1761,6 +1767,146 @@ class ArchiveTest(ProbackupTest, unittest.TestCase):
# Clean after yourself
self.del_test_dir(module_name, fname)
# @unittest.skip("skip")
# @unittest.expectedFailure
def test_archiving_and_slots(self):
"""
Check that archiving doesn't break the slot
guarantee.
"""
fname = self.id().split('.')[3]
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
node = self.make_simple_node(
base_dir=os.path.join(module_name, fname, 'node'),
set_replication=True,
initdb_params=['--data-checksums'],
pg_options={
'autovacuum': 'off',
'checkpoint_timeout': '30s',
'max_wal_size': '64MB'})
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
self.set_archiving(backup_dir, 'node', node, log_level='verbose')
node.slow_start()
if self.get_version(node) < 100000:
pg_receivexlog_path = self.get_bin_path('pg_receivexlog')
else:
pg_receivexlog_path = self.get_bin_path('pg_receivewal')
# "pg_receivewal --create-slot --slot archive_slot --if-not-exists "
# "&& pg_receivewal --synchronous -Z 1 /tmp/wal --slot archive_slot --no-loop"
self.run_binary(
[
pg_receivexlog_path, '-p', str(node.port), '--synchronous',
'--create-slot', '--slot', 'archive_slot', '--if-not-exists'
])
node.pgbench_init(scale=10)
pg_receivexlog = self.run_binary(
[
pg_receivexlog_path, '-p', str(node.port), '--synchronous',
'-D', os.path.join(backup_dir, 'wal', 'node'),
'--no-loop', '--slot', 'archive_slot',
'-Z', '1'
], asynchronous=True)
if pg_receivexlog.returncode:
self.fail(
'Failed to start pg_receivexlog: {0}'.format(
pg_receivexlog.communicate()[1]))
sleep(2)
pg_receivexlog.kill()
backup_id = self.backup_node(backup_dir, 'node', node)
node.pgbench_init(scale=20)
# Clean after yourself
self.del_test_dir(module_name, fname)
def test_archive_push_sanity(self):
""""""
fname = self.id().split('.')[3]
backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
node = self.make_simple_node(
base_dir=os.path.join(module_name, fname, 'node'),
set_replication=True,
initdb_params=['--data-checksums'],
pg_options={
'archive_mode': 'on',
'archive_command': 'exit 1'})
self.init_pb(backup_dir)
self.add_instance(backup_dir, 'node', node)
node.slow_start()
node.pgbench_init(scale=50)
node.stop()
self.set_archiving(backup_dir, 'node', node)
os.remove(os.path.join(node.logs_dir, 'postgresql.log'))
node.slow_start()
self.backup_node(backup_dir, 'node', node)
with open(os.path.join(node.logs_dir, 'postgresql.log'), 'r') as f:
postgres_log_content = f.read()
# print(postgres_log_content)
# make sure that .backup file is not compressed
self.assertNotIn('.backup.gz', postgres_log_content)
self.assertNotIn('WARNING', postgres_log_content)
replica = self.make_simple_node(
base_dir=os.path.join(module_name, fname, 'replica'))
replica.cleanup()
self.restore_node(
backup_dir, 'node', replica,
data_dir=replica.data_dir, options=['-R'])
#self.set_archiving(backup_dir, 'replica', replica, replica=True)
self.set_auto_conf(replica, {'port': replica.port})
self.set_auto_conf(replica, {'archive_mode': 'always'})
self.set_auto_conf(replica, {'hot_standby': 'on'})
replica.slow_start(replica=True)
self.wait_until_replica_catch_with_master(node, replica)
node.pgbench_init(scale=5)
replica.promote()
replica.pgbench_init(scale=10)
with open(os.path.join(replica.logs_dir, 'postgresql.log'), 'r') as f:
replica_log_content = f.read()
# make sure that .partial file is not compressed
self.assertNotIn('.partial.gz', replica_log_content)
# make sure that .history file is not compressed
self.assertNotIn('.history.gz', replica_log_content)
self.assertNotIn('WARNING', replica_log_content)
output = self.show_archive(
backup_dir, 'node', as_json=False, as_text=True,
options=['--log-level-console=VERBOSE'])
self.assertNotIn('WARNING', output)
# Clean after yourself
self.del_test_dir(module_name, fname)
# TODO: test with multiple non-archived segments.
# Important: the switchpoint may be a NullOffset LSN that does not actually
# exist in the archive, so write the WAL validation code accordingly.

View File

@ -1131,7 +1131,8 @@ class ProbackupTest(object):
def set_archiving(
self, backup_dir, instance, node, replica=False,
overwrite=False, compress=False, old_binary=False):
overwrite=False, compress=False, old_binary=False,
log_level=False, archive_timeout=False):
# parse postgresql.auto.conf
options = {}
@ -1161,12 +1162,26 @@ class ProbackupTest(object):
if overwrite:
options['archive_command'] += '--overwrite '
options['archive_command'] += '--log-level-console=verbose '
options['archive_command'] += '-j 5 '
options['archive_command'] += '--batch-size 10 '
options['archive_command'] += '--no-sync '
if archive_timeout:
options['archive_command'] += '--archive-timeout={0} '.format(
archive_timeout)
if os.name == 'posix':
options['archive_command'] += '--wal-file-path=%p --wal-file-name=%f'
elif os.name == 'nt':
options['archive_command'] += '--wal-file-path="%p" --wal-file-name="%f"'
if log_level:
options['archive_command'] += ' --log-level-console={0}'.format(log_level)
options['archive_command'] += ' --log-level-file={0} '.format(log_level)
self.set_auto_conf(node, options)
def get_restore_command(self, backup_dir, instance, node):