
Catchup command implementation (#392)

[PR #392] A new command, "catchup", is added: it allows a fallen-behind standby to "catch up" with the master, or to create a standby from scratch, without resorting to restoring from a backup.

Co-authored-by: Grigory Smolkin <g.smolkin@postgrespro.ru>
Co-authored-by: anastasia <lubennikovaav@gmail.com>
Co-authored-by: Elena Indrupskaya <e.indrupskaya@postgrespro.ru>
Mikhail A. Kulagin 2021-06-21 11:45:10 +03:00 committed by GitHub
parent b13d3b8c88
commit 7de728496d
25 changed files with 3146 additions and 102 deletions


@ -7,7 +7,7 @@ OBJS = src/utils/configuration.o src/utils/json.o src/utils/logger.o \
OBJS += src/archive.o src/backup.o src/catalog.o src/checkdb.o src/configure.o src/data.o \
src/delete.o src/dir.o src/fetch.o src/help.o src/init.o src/merge.o \
src/parsexlog.o src/ptrack.o src/pg_probackup.o src/restore.o src/show.o src/stream.o \
src/util.o src/validate.o src/datapagemap.o
src/util.o src/validate.o src/datapagemap.o src/catchup.o
# borrowed files
OBJS += src/pg_crc.o src/receivelog.o src/streamutil.o \


@ -143,6 +143,14 @@ doc/src/sgml/pgprobackup.sgml
<arg choice="plain"><option>--wal-file-name</option> <replaceable>wal_file_name</replaceable></arg>
<arg rep="repeat"><replaceable>option</replaceable></arg>
</cmdsynopsis>
<cmdsynopsis condition="12+">
<command>pg_probackup</command>
<arg choice="plain"><option>catchup</option></arg>
<arg choice="plain"><option>-b</option> <replaceable>catchup_mode</replaceable></arg>
<arg choice="plain"><option>--source-pgdata</option>=<replaceable>path_to_pgdata_on_remote_server</replaceable></arg>
<arg choice="plain"><option>--destination-pgdata</option>=<replaceable>path_to_local_dir</replaceable></arg>
<arg rep="repeat"><replaceable>option</replaceable></arg>
</cmdsynopsis>
</refsynopsisdiv>
@ -283,6 +291,11 @@ doc/src/sgml/pgprobackup.sgml
Partial restore: restoring only the specified databases.
</para>
</listitem>
<listitem condition="12+">
<para>
Catchup: cloning a <productname>PostgreSQL</productname> instance so that a fallen-behind standby server can <quote>catch up</quote> with the master.
</para>
</listitem>
</itemizedlist>
<para>
To manage backup data, <application>pg_probackup</application> creates a
@ -1076,7 +1089,8 @@ GRANT SELECT ON TABLE pg_catalog.pg_database TO backup;
mode: <xref linkend="pbk-add-instance"/>,
<xref linkend="pbk-backup"/>,
<xref linkend="pbk-restore"/>,
<xref linkend="pbk-archive-push"/>,
<phrase condition="12+"><xref linkend="pbk-catchup"/>,</phrase>
<xref linkend="pbk-archive-push"/>, and
<xref linkend="pbk-archive-get"/>.
</para>
</listitem>
@ -1431,6 +1445,7 @@ pg_probackup backup -B <replaceable>backup_dir</replaceable> --instance <replace
</para>
</refsect3>
</refsect2>
<refsect2 id="pbk-verifying-a-cluster">
<title>Performing Cluster Verification</title>
<para>
@ -1506,6 +1521,7 @@ pg_probackup checkdb --amcheck --skip-block-validation [<replaceable>connection_
higher cost of CPU, memory, and I/O consumption.
</para>
</refsect2>
<refsect2 id="pbk-validating-backups">
<title>Validating a Backup</title>
<para>
@ -2073,6 +2089,7 @@ pg_probackup restore -B <replaceable>backup_dir</replaceable> --instance <replac
<xref linkend="pbk-restore"/>,
<xref linkend="pbk-merge"/>,
<xref linkend="pbk-delete"/>,
<phrase condition="12+"><xref linkend="pbk-catchup"/>,</phrase>
<xref linkend="pbk-checkdb"/> and
<xref linkend="pbk-validate"/> processes can be
executed on several parallel threads. This can significantly
@ -3390,6 +3407,148 @@ pg_probackup delete -B <replaceable>backup_dir</replaceable> --instance <replace
</para>
</refsect2>
<refsect2 id="pbk-creating-backup-to-catchup" condition="12+">
<title>Cloning <productname>PostgreSQL</productname> Instance</title>
<para>
<application>pg_probackup</application> can create a copy of a <productname>PostgreSQL</productname>
instance directly, without using the backup catalog. This allows you
to add a new standby server, or to have a standby server that has
fallen behind <quote>catch up</quote> with the master.
</para>
<para>
Cloning a <productname>PostgreSQL</productname> instance is different from other <application>pg_probackup</application>
operations:
<itemizedlist>
<listitem>
<para>
The backup catalog is not required.
</para>
</listitem>
<listitem>
<para>
Only the STREAM WAL delivery mode is supported.
</para>
</listitem>
<listitem>
<para>
Copying external directories
is not supported.
</para>
</listitem>
<listitem>
<para>
No SQL commands involving tablespaces, such as
<ulink url="https://postgrespro.com/docs/postgresql/current/sql-createtablespace"
><command>CREATE TABLESPACE</command></ulink
>/<ulink url="https://postgrespro.com/docs/postgresql/current/sql-droptablespace"
><command>DROP TABLESPACE</command></ulink>,
can be run simultaneously with <command>catchup</command>.
</para>
</listitem>
<listitem>
<para>
<command>catchup</command> takes configuration files, such as
<filename>postgresql.conf</filename>, <filename>postgresql.auto.conf</filename>,
or <filename>pg_hba.conf</filename>, from the source server and overwrites them
on the target server.
</para>
</listitem>
</itemizedlist>
</para>
<para>
Before cloning a <productname>PostgreSQL</productname> instance, set up the source database server as follows:
<itemizedlist>
<listitem>
<para>
<link linkend="pbk-configuring-the-database-cluster">Configure
the database cluster</link> for the instance to copy.
</para>
</listitem>
<listitem>
<para>
To copy from a remote server, <link linkend="pbk-configuring-the-remote-mode">configure the remote mode</link>.
</para>
</listitem>
<listitem>
<para>
To use the PTRACK catchup mode, <link linkend="pbk-setting-up-ptrack-backups">set up PTRACK backups</link>.
</para>
</listitem>
</itemizedlist>
</para>
<para>
To clone a <productname>PostgreSQL</productname> instance, ensure that the source
database server is running and accepting connections. Then, on the
server with the destination database, run the following command:
</para>
<programlisting>
pg_probackup catchup -b <replaceable>catchup_mode</replaceable> --source-pgdata=<replaceable>path_to_pgdata_on_remote_server</replaceable> --destination-pgdata=<replaceable>path_to_local_dir</replaceable> --stream [<replaceable>connection_options</replaceable>] [<replaceable>remote_options</replaceable>]
</programlisting>
<para>
Where <replaceable>catchup_mode</replaceable> can take one of the
following values: <literal>FULL</literal>, <literal>DELTA</literal>, or <literal>PTRACK</literal>.
</para>
<itemizedlist spacing="compact">
<listitem>
<para>
<literal>FULL</literal> — creates a full copy of the <productname>PostgreSQL</productname> instance.
The destination directory must be empty for this mode.
</para>
</listitem>
<listitem>
<para>
<literal>DELTA</literal> — reads all data files in the data directory and
creates an incremental copy for pages that have changed
since the destination database was shut down cleanly.
For this mode, the destination directory must contain a previous
copy of the database that was shut down cleanly.
</para>
</listitem>
<listitem>
<para>
<literal>PTRACK</literal> — tracks page changes on the fly and copies
only those pages that have changed since the point of divergence
of the source and destination databases.
For this mode, the destination directory must contain a previous
copy of the database that was shut down cleanly.
</para>
</listitem>
</itemizedlist>
<para>
You can use <link linkend="pbk-connection-opts">connection_options</link> to specify
the connection to the source database cluster. If it is located on a different server,
also specify <link linkend="pbk-remote-server-opts">remote_options</link>.
If the source database contains tablespaces that must be located in
a different directory, additionally specify the <option>--tablespace-mapping</option>
option:
<programlisting>
pg_probackup catchup -b <replaceable>catchup_mode</replaceable> --source-pgdata=<replaceable>path_to_pgdata_on_remote_server</replaceable> --destination-pgdata=<replaceable>path_to_local_dir</replaceable> --stream --tablespace-mapping=<replaceable>OLDDIR</replaceable>=<replaceable>NEWDIR</replaceable>
</programlisting>
To run the <command>catchup</command> command on parallel threads, specify the number
of threads with the <option>--threads</option> option:
<programlisting>
pg_probackup catchup -b <replaceable>catchup_mode</replaceable> --source-pgdata=<replaceable>path_to_pgdata_on_remote_server</replaceable> --destination-pgdata=<replaceable>path_to_local_dir</replaceable> --stream --threads=<replaceable>num_threads</replaceable>
</programlisting>
</para>
<para>
For example, assume that a remote standby server with the <productname>PostgreSQL</productname> instance in the <filename>/replica-pgdata</filename> data directory has fallen behind. To sync this instance with the one in the <filename>/master-pgdata</filename> data directory, you can run
the <command>catchup</command> command in the <literal>PTRACK</literal> mode on four parallel threads as follows:
<programlisting>
pg_probackup catchup --source-pgdata=/master-pgdata --destination-pgdata=/replica-pgdata -p 5432 -d postgres -U remote-postgres-user --stream --backup-mode=PTRACK --remote-host=remote-hostname --remote-user=remote-unix-username -j 4
</programlisting>
</para>
<para>
Another example shows how you can add a new remote standby server with the <productname>PostgreSQL</productname> data directory <filename>/replica-pgdata</filename> by running the <command>catchup</command> command in the <literal>FULL</literal> mode
on four parallel threads:
<programlisting>
pg_probackup catchup --source-pgdata=/master-pgdata --destination-pgdata=/replica-pgdata -p 5432 -d postgres -U remote-postgres-user --stream --backup-mode=FULL --remote-host=remote-hostname --remote-user=remote-unix-username -j 4
</programlisting>
</para>
</refsect2>
</refsect1>
<refsect1 id="pbk-reference">
@ -4262,6 +4421,121 @@ pg_probackup archive-get -B <replaceable>backup_dir</replaceable> --instance <re
For details, see section <link linkend="pbk-archiving-options">Archiving Options</link>.
</para>
</refsect3>
<refsect3 id="pbk-catchup" xreflabel="catchup" condition="12+">
<title>catchup</title>
<programlisting>
pg_probackup catchup -b <replaceable>catchup_mode</replaceable>
--source-pgdata=<replaceable>path_to_pgdata_on_remote_server</replaceable>
--destination-pgdata=<replaceable>path_to_local_dir</replaceable>
[--help] [--stream] [-j <replaceable>num_threads</replaceable>]
[-T <replaceable>OLDDIR</replaceable>=<replaceable>NEWDIR</replaceable>]
[<replaceable>connection_options</replaceable>] [<replaceable>remote_options</replaceable>]
</programlisting>
<para>
Creates a copy of a <productname>PostgreSQL</productname>
instance without using the backup catalog.
<variablelist>
<varlistentry>
<term><option>-b <replaceable>catchup_mode</replaceable></option></term>
<term><option>--backup-mode=<replaceable>catchup_mode</replaceable></option></term>
<listitem>
<para>
Specifies the catchup mode to use. Possible values are:
<itemizedlist spacing="compact">
<listitem>
<para>
<literal>FULL</literal> — creates a full copy of the <productname>PostgreSQL</productname> instance.
</para>
</listitem>
<listitem>
<para>
<literal>DELTA</literal> — reads all data files in the data directory and
creates an incremental copy for pages that have changed
since the destination database was shut down cleanly.
</para>
</listitem>
<listitem>
<para>
<literal>PTRACK</literal> — tracks page changes on the fly and copies
only those pages that have changed since the point of divergence
of the source and destination databases.
</para>
</listitem>
</itemizedlist>
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--source-pgdata=<replaceable>path_to_pgdata_on_remote_server</replaceable></option></term>
<listitem>
<para>
Specifies the path to the data directory of the instance to be copied. The path can be local or remote.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--destination-pgdata=<replaceable>path_to_local_dir</replaceable></option></term>
<listitem>
<para>
Specifies the path to the local data directory to copy to.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--stream</option></term>
<listitem>
<para>
Makes a <link linkend="pbk-stream-mode">STREAM</link> backup, which
includes all the necessary WAL files by streaming them from
the database server via the replication protocol.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-j <replaceable>num_threads</replaceable></option></term>
<term><option>--threads=<replaceable>num_threads</replaceable></option></term>
<listitem>
<para>
Sets the number of parallel threads for the
<command>catchup</command> process.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-T <replaceable>OLDDIR</replaceable>=<replaceable>NEWDIR</replaceable></option></term>
<term><option>--tablespace-mapping=<replaceable>OLDDIR</replaceable>=<replaceable>NEWDIR</replaceable></option></term>
<listitem>
<para>
Relocates the tablespace from the <replaceable>OLDDIR</replaceable> to the <replaceable>NEWDIR</replaceable>
directory at the time of cloning. Both <replaceable>OLDDIR</replaceable> and <replaceable>NEWDIR</replaceable> must
be absolute paths. If a path contains the equals sign (<literal>=</literal>),
escape it with a backslash (for example, to map <filename>/old=dir</filename>,
write <literal>--tablespace-mapping=/old\=dir=/new_dir</literal>). This option
can be specified multiple times for multiple tablespaces.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
<para>
Additionally, <link linkend="pbk-connection-opts">connection
options</link> and <link linkend="pbk-remote-server-opts">remote
mode options</link> can be used.
</para>
<para>
For details on usage, see the section
<link linkend="pbk-creating-backup-to-catchup">Cloning <productname>PostgreSQL</productname> Instance</link>.
</para>
</refsect3>
</refsect2>
<refsect2 id="pbk-options">
<title>Options</title>
@ -4651,7 +4925,7 @@ pg_probackup archive-get -B <replaceable>backup_dir</replaceable> --instance <re
<term><option>--no-color</option></term>
<listitem>
<para>
Disable the coloring for console log messages of <literal>warning</literal> and <literal>error</literal> levels.
Disable coloring for console log messages of <literal>warning</literal> and <literal>error</literal> levels.
</para>
</listitem>
</varlistentry>
@ -4804,7 +5078,8 @@ pg_probackup archive-get -B <replaceable>backup_dir</replaceable> --instance <re
<title>Connection Options</title>
<para>
You can use these options together with
<xref linkend="pbk-backup"/> and
<xref linkend="pbk-backup"/>
<phrase condition="12+">, <xref linkend="pbk-catchup"/>,</phrase> and
<xref linkend="pbk-checkdb"/> commands.
</para>
<para>
@ -5095,6 +5370,7 @@ pg_probackup archive-get -B <replaceable>backup_dir</replaceable> --instance <re
used with <xref linkend="pbk-add-instance"/>,
<xref linkend="pbk-set-config"/>,
<xref linkend="pbk-backup"/>,
<phrase condition="12+"><xref linkend="pbk-catchup"/>,</phrase>
<xref linkend="pbk-restore"/>,
<xref linkend="pbk-archive-push"/>, and
<xref linkend="pbk-archive-get"/> commands.


@ -148,7 +148,7 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa
elog(ERROR, "getcwd() error");
/* verify that archive-push --instance parameter is valid */
system_id = get_system_identifier(current_dir);
system_id = get_system_identifier(current_dir, FIO_DB_HOST);
if (instance->pgdata == NULL)
elog(ERROR, "Cannot read pg_probackup.conf for this instance");


@ -94,7 +94,6 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
{
int i;
char external_prefix[MAXPGPATH]; /* Temp value. Used as template */
char dst_backup_path[MAXPGPATH];
char label[1024];
XLogRecPtr prev_backup_start_lsn = InvalidXLogRecPtr;
@ -137,7 +136,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
#if PG_VERSION_NUM >= 90600
current.tli = get_current_timeline(backup_conn);
#else
current.tli = get_current_timeline_from_control(false);
current.tli = get_current_timeline_from_control(instance_config.pgdata, FIO_DB_HOST, false);
#endif
/*
@ -258,17 +257,19 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
/* start stream replication */
if (current.stream)
{
join_path_components(dst_backup_path, current.database_dir, PG_XLOG_DIR);
fio_mkdir(dst_backup_path, DIR_PERMISSION, FIO_BACKUP_HOST);
char stream_xlog_path[MAXPGPATH];
start_WAL_streaming(backup_conn, dst_backup_path, &instance_config.conn_opt,
join_path_components(stream_xlog_path, current.database_dir, PG_XLOG_DIR);
fio_mkdir(stream_xlog_path, DIR_PERMISSION, FIO_BACKUP_HOST);
start_WAL_streaming(backup_conn, stream_xlog_path, &instance_config.conn_opt,
current.start_lsn, current.tli);
/* Make sure that WAL streaming is working
* PAGE backup in stream mode is waited twice, first for
* segment in WAL archive and then for streamed segment
*/
wait_wal_lsn(dst_backup_path, current.start_lsn, true, current.tli, false, true, ERROR, true);
wait_wal_lsn(stream_xlog_path, current.start_lsn, true, current.tli, false, true, ERROR, true);
}
/* initialize backup's file list */
@ -315,23 +316,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
elog(ERROR, "PGDATA is almost empty. Either it was concurrently deleted or "
"pg_probackup do not possess sufficient permissions to list PGDATA content");
/* Calculate pgdata_bytes */
for (i = 0; i < parray_num(backup_files_list); i++)
{
pgFile *file = (pgFile *) parray_get(backup_files_list, i);
if (file->external_dir_num != 0)
continue;
if (S_ISDIR(file->mode))
{
current.pgdata_bytes += 4096;
continue;
}
current.pgdata_bytes += file->size;
}
current.pgdata_bytes += calculate_datasize_of_filelist(backup_files_list);
pretty_size(current.pgdata_bytes, pretty_bytes, lengthof(pretty_bytes));
elog(INFO, "PGDATA size: %s", pretty_bytes);
@ -697,7 +682,7 @@ pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo)
if (nodeInfo->is_superuser)
elog(WARNING, "Current PostgreSQL role is superuser. "
"It is not recommended to run backup or checkdb as superuser.");
"It is not recommended to run pg_probackup under superuser.");
strlcpy(current.server_version, nodeInfo->server_version_str,
sizeof(current.server_version));
@ -786,7 +771,7 @@ do_backup(InstanceState *instanceState, pgSetBackupParams *set_backup_params,
// elog(WARNING, "ptrack_version_num %d", ptrack_version_num);
if (nodeInfo.ptrack_version_num > 0)
nodeInfo.is_ptrack_enable = pg_ptrack_enable(backup_conn, nodeInfo.ptrack_version_num);
nodeInfo.is_ptrack_enabled = pg_is_ptrack_enabled(backup_conn, nodeInfo.ptrack_version_num);
if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
{
@ -795,7 +780,7 @@ do_backup(InstanceState *instanceState, pgSetBackupParams *set_backup_params,
elog(ERROR, "This PostgreSQL instance does not support ptrack");
else
{
if (!nodeInfo.is_ptrack_enable)
if (!nodeInfo.is_ptrack_enabled)
elog(ERROR, "Ptrack is disabled");
}
}
@ -953,12 +938,12 @@ check_server_version(PGconn *conn, PGNodeInfo *nodeInfo)
* All system identifiers must be equal.
*/
void
check_system_identifiers(PGconn *conn, char *pgdata)
check_system_identifiers(PGconn *conn, const char *pgdata)
{
uint64 system_id_conn;
uint64 system_id_pgdata;
system_id_pgdata = get_system_identifier(pgdata);
system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST);
system_id_conn = get_remote_system_identifier(conn);
/* for checkdb check only system_id_pgdata and system_id_conn */
@ -1069,7 +1054,7 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup,
* Switch to a new WAL segment. It should be called only for master.
* For PG 9.5 it should be called only if pguser is superuser.
*/
static void
void
pg_switch_wal(PGconn *conn)
{
PGresult *res;
@ -2282,7 +2267,7 @@ process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno)
}
static void
void
check_external_for_tablespaces(parray *external_list, PGconn *backup_conn)
{
PGresult *res;
@ -2346,3 +2331,36 @@ check_external_for_tablespaces(parray *external_list, PGconn *backup_conn)
}
}
}
/*
* Calculate pgdata_bytes
* accepts (parray *) of (pgFile *)
*/
int64
calculate_datasize_of_filelist(parray *filelist)
{
int64 bytes = 0;
int i;
/* parray_num() doesn't check for NULL */
if (filelist == NULL)
return 0;
for (i = 0; i < parray_num(filelist); i++)
{
pgFile *file = (pgFile *) parray_get(filelist, i);
if (file->external_dir_num != 0)
continue;
if (S_ISDIR(file->mode))
{
// TODO is a dir always 4K?
bytes += 4096;
continue;
}
bytes += file->size;
}
return bytes;
}


@ -2883,7 +2883,7 @@ pgNodeInit(PGNodeInfo *node)
node->server_version_str[0] = '\0';
node->ptrack_version_num = 0;
node->is_ptrack_enable = false;
node->is_ptrack_enabled = false;
node->ptrack_schema = NULL;
}

src/catchup.c (new file, 1020 lines)

File diff suppressed because it is too large.


@ -268,7 +268,7 @@ get_checksum_errormsg(Page page, char **errormsg, BlockNumber absolute_blkno)
* PageIsOk(0) if page was successfully retrieved
* PageIsTruncated(-1) if the page was truncated
* SkipCurrentPage(-2) if we need to skip this page,
* only used for DELTA backup
* only used for DELTA and PTRACK backup
* PageIsCorrupted(-3) if the page checksum mismatch
* or header corruption,
* only used for checkdb
@ -400,7 +400,12 @@ prepare_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
page_st->lsn > 0 &&
page_st->lsn < prev_backup_start_lsn)
{
elog(VERBOSE, "Skipping blknum %u in file: \"%s\"", blknum, from_fullpath);
elog(VERBOSE, "Skipping blknum %u in file: \"%s\", file->exists_in_prev: %s, page_st->lsn: %X/%X, prev_backup_start_lsn: %X/%X",
blknum, from_fullpath,
file->exists_in_prev ? "true" : "false",
(uint32) (page_st->lsn >> 32), (uint32) page_st->lsn,
(uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn
);
return SkipCurrentPage;
}
@ -458,6 +463,23 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum,
return compressed_size;
}
/* Taken from compress_and_backup_page(), but with all the header and compression magic stripped out: a plain 1-to-1 copy */
static int
copy_page(pgFile *file, BlockNumber blknum,
FILE *in, FILE *out, Page page,
const char *to_fullpath)
{
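	/* NOTE: the caller must already have positioned `out` at blknum * BLCKSZ
	 * (see the fio_fseek() call in copy_pages() below); `in` is not read here
	 * and appears to be kept for symmetry with compress_and_backup_page(). */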
/* write data page */
if (fio_fwrite(out, page, BLCKSZ) != BLCKSZ)
elog(ERROR, "File: \"%s\", cannot write at block %u: %s",
to_fullpath, blknum, strerror(errno));
file->write_size += BLCKSZ;
file->uncompressed_size += BLCKSZ;
return BLCKSZ;
}
/*
* Backup data file in the from_root directory to the to_root directory with
* same relative path. If prev_backup_start_lsn is not NULL, only pages with
@ -623,6 +645,169 @@ cleanup:
pg_free(headers);
}
/*
* Copy a data file from the source directory to the destination directory
* with the same relative path. If prev_backup_start_lsn is not
* InvalidXLogRecPtr, only pages with a higher lsn are copied.
* The file is not copied as-is: it is read block by block (using the bitmap
* in case of incremental catchup) and checksums are validated, but pages are
* written 1-to-1, without compression or backup page headers.
*/
void
catchup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpath,
XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
CompressAlg calg, int clevel, uint32 checksum_version,
int ptrack_version_num, const char *ptrack_schema,
bool is_merge, size_t prev_size)
{
int rc;
bool use_pagemap;
char *errmsg = NULL;
BlockNumber err_blknum = 0;
/* page headers */
BackupPageHeader2 *headers = NULL;
/* sanity */
if (file->size % BLCKSZ != 0)
elog(WARNING, "File: \"%s\", invalid file size %zu", from_fullpath, file->size);
/*
* Compute expected number of blocks in the file.
* NOTE: it is a normal situation if the file size has changed
* since the moment we computed it.
*/
file->n_blocks = file->size/BLCKSZ;
/*
* Skip unchanged file only if it exists in previous backup.
* This way we can correctly handle null-sized files which are
* not tracked by pagemap and thus always marked as unchanged.
*/
if (backup_mode == BACKUP_MODE_DIFF_PTRACK &&
file->pagemap.bitmapsize == PageBitmapIsEmpty &&
file->exists_in_prev && file->size == prev_size && !file->pagemap_isabsent)
{
/*
* There are no changed blocks since last backup. We want to make
* incremental backup, so we should exit.
*/
file->write_size = BYTES_INVALID;
return;
}
/* reset size summary */
file->read_size = 0;
file->write_size = 0;
file->uncompressed_size = 0;
INIT_FILE_CRC32(true, file->crc);
/*
* Read each page, verify checksum and write it to backup.
* If page map is empty or file is not present in previous backup
* backup all pages of the relation.
*
* In PTRACK 1.x there was a problem
* of data files with missing _ptrack map.
* Such files should be fully copied.
*/
if (file->pagemap.bitmapsize == PageBitmapIsEmpty ||
file->pagemap_isabsent || !file->exists_in_prev ||
!file->pagemap.bitmap)
use_pagemap = false;
else
use_pagemap = true;
if (use_pagemap)
elog(VERBOSE, "Using pagemap for file \"%s\"", file->rel_path);
/* Remote mode */
if (fio_is_remote(FIO_DB_HOST))
{
rc = fio_copy_pages(to_fullpath, from_fullpath, file,
/* send prev backup START_LSN */
(backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
calg, clevel, checksum_version,
/* send pagemap if any */
use_pagemap,
/* variables for error reporting */
&err_blknum, &errmsg, &headers);
}
else
{
/* TODO: stop handling errors internally */
rc = copy_pages(to_fullpath, from_fullpath, file,
/* send prev backup START_LSN */
(backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
checksum_version, use_pagemap,
backup_mode, ptrack_version_num, ptrack_schema);
}
/* check for errors */
if (rc == FILE_MISSING)
{
elog(is_merge ? ERROR : LOG, "File not found: \"%s\"", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else if (rc == WRITE_FAILED)
elog(ERROR, "Cannot write block %u of \"%s\": %s",
err_blknum, to_fullpath, strerror(errno));
else if (rc == PAGE_CORRUPTION)
{
if (errmsg)
elog(ERROR, "Corruption detected in file \"%s\", block %u: %s",
from_fullpath, err_blknum, errmsg);
else
elog(ERROR, "Corruption detected in file \"%s\", block %u",
from_fullpath, err_blknum);
}
/* OPEN_FAILED and READ_FAILED */
else if (rc == OPEN_FAILED)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot open file \"%s\"", from_fullpath);
}
else if (rc == READ_FAILED)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot read file \"%s\"", from_fullpath);
}
file->read_size = rc * BLCKSZ;
/* refresh n_blocks for FULL and DELTA */
if (backup_mode == BACKUP_MODE_FULL ||
backup_mode == BACKUP_MODE_DIFF_DELTA)
file->n_blocks = file->read_size / BLCKSZ;
/* Mark the file as unchanged if nothing was written during incremental catchup */
if (backup_mode != BACKUP_MODE_FULL &&
file->exists_in_prev &&
file->write_size == 0 &&
file->n_blocks > 0)
{
file->write_size = BYTES_INVALID;
}
cleanup:
/* finish CRC calculation */
FIN_FILE_CRC32(true, file->crc);
pg_free(errmsg);
pg_free(file->pagemap.bitmap);
pg_free(headers);
}
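/* Reading aid (hedged summary, not code from this diff): per file, the function
 * above either skips the file entirely (PTRACK mode, empty pagemap, present in
 * the destination with an unchanged size => write_size = BYTES_INVALID), or
 * delegates the page copy to fio_copy_pages() when the source is remote, or to
 * the local copy_pages() below. */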
/*
* Backup non data file
* We do not apply compression to this file.
@ -1992,6 +2177,7 @@ send_pages(const char *to_fullpath, const char *from_fullpath,
true, checksum_version,
ptrack_version_num, ptrack_schema,
from_fullpath, &page_st);
if (rc == PageIsTruncated)
break;
@ -2068,6 +2254,130 @@ send_pages(const char *to_fullpath, const char *from_fullpath,
return n_blocks_read;
}
/* Copy a local file (taken from send_pages(), but pages are copied verbatim, without adding headers or compression) */
int
copy_pages(const char *to_fullpath, const char *from_fullpath,
pgFile *file, XLogRecPtr sync_lsn,
uint32 checksum_version, bool use_pagemap,
BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema)
{
FILE *in = NULL;
FILE *out = NULL;
char curr_page[BLCKSZ];
int n_blocks_read = 0;
BlockNumber blknum = 0;
datapagemap_iterator_t *iter = NULL;
/* stdio buffers */
char *in_buf = NULL;
char *out_buf = NULL;
/* open source file for read */
in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL)
{
/*
* If the file is not found, this is not an error.
* It could have been deleted by concurrent postgres transaction.
*/
if (errno == ENOENT)
return FILE_MISSING;
elog(ERROR, "Cannot open file \"%s\": %s", from_fullpath, strerror(errno));
}
/*
* Enable stdio buffering for local input file,
* unless the pagemap is involved, which
* implies a lot of random access.
*/
if (use_pagemap)
{
iter = datapagemap_iterate(&file->pagemap);
datapagemap_next(iter, &blknum); /* set first block */
setvbuf(in, NULL, _IONBF, BUFSIZ);
}
else
{
in_buf = pgut_malloc(STDIO_BUFSIZE);
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
}
out = fio_fopen(to_fullpath, PG_BINARY_R "+", FIO_BACKUP_HOST);
if (out == NULL)
elog(ERROR, "Cannot open destination file \"%s\": %s",
to_fullpath, strerror(errno));
/* update file permission */
if (fio_chmod(to_fullpath, file->mode, FIO_BACKUP_HOST) == -1)
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
strerror(errno));
elog(VERBOSE, "ftruncate file \"%s\" to size %lu",
to_fullpath, file->size);
if (fio_ftruncate(out, file->size) == -1)
elog(ERROR, "Cannot ftruncate file \"%s\" to size %lu: %s",
to_fullpath, file->size, strerror(errno));
if (!fio_is_remote_file(out))
{
out_buf = pgut_malloc(STDIO_BUFSIZE);
setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE);
}
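	/* Iterate over blocks: with a pagemap, blknum already points at the first
	 * changed block and datapagemap_next() advances it; without a pagemap,
	 * scan all n_blocks sequentially. */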
while (blknum < file->n_blocks)
{
PageState page_st;
int rc = prepare_page(file, sync_lsn,
blknum, in, backup_mode, curr_page,
true, checksum_version,
ptrack_version_num, ptrack_schema,
from_fullpath, &page_st);
if (rc == PageIsTruncated)
break;
else if (rc == PageIsOk)
{
if (fio_fseek(out, blknum * BLCKSZ) < 0)
{
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
}
copy_page(file, blknum, in, out, curr_page, to_fullpath);
}
n_blocks_read++;
/* next block */
if (use_pagemap)
{
/* exit if pagemap is exhausted */
if (!datapagemap_next(iter, &blknum))
break;
}
else
blknum++;
}
/* cleanup */
if (in && fclose(in))
elog(ERROR, "Cannot close the source file \"%s\": %s",
from_fullpath, strerror(errno));
/* close local output file */
if (out && fio_fclose(out))
elog(ERROR, "Cannot close the destination file \"%s\": %s",
to_fullpath, strerror(errno));
pg_free(iter);
pg_free(in_buf);
pg_free(out_buf);
return n_blocks_read;
}
/*
* Attempt to open header file, read content and return as
* array of headers.


@ -485,6 +485,13 @@ pgFileCompareSize(const void *f1, const void *f2)
return 0;
}
/* Compare two pgFile by size, in descending order */
int
pgFileCompareSizeDesc(const void *f1, const void *f2)
{
return -1 * pgFileCompareSize(f1, f2);
}
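/* A hedged usage sketch (not part of this diff): a descending comparator like
 * this is typically handed to the existing parray_qsort() helper so that the
 * biggest files come first, e.g.
 *
 *     parray_qsort(source_filelist, pgFileCompareSizeDesc);
 *
 * which helps balance work when files are distributed across parallel threads
 * (source_filelist here is an illustrative name).
 */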
static int
pgCompareString(const void *str1, const void *str2)
{
@ -887,7 +894,7 @@ dir_list_file_internal(parray *files, pgFile *parent, const char *parent_dir,
*
* Copy of function get_tablespace_mapping() from pg_basebackup.c.
*/
static const char *
const char *
get_tablespace_mapping(const char *dir)
{
TablespaceListCell *cell;


@ -2,7 +2,7 @@
*
* help.c
*
* Copyright (c) 2017-2019, Postgres Professional
* Copyright (c) 2017-2021, Postgres Professional
*
*-------------------------------------------------------------------------
*/
@ -29,6 +29,7 @@ static void help_archive_get(void);
static void help_checkdb(void);
static void help_help(void);
static void help_version(void);
static void help_catchup(void);
void
help_print_version(void)
@ -70,6 +71,7 @@ help_command(ProbackupSubcmd const subcmd)
&help_internal, // AGENT_CMD
&help_help,
&help_version,
&help_catchup,
};
Assert((int)subcmd < sizeof(help_functions) / sizeof(help_functions[0]));
@ -246,6 +248,19 @@ help_pg_probackup(void)
printf(_(" [--ssh-options]\n"));
printf(_(" [--help]\n"));
printf(_("\n %s catchup -b catchup-mode\n"), PROGRAM_NAME);
printf(_(" --source-pgdata=path_to_pgdata_on_remote_server\n"));
printf(_(" --destination-pgdata=path_to_local_dir\n"));
printf(_(" [--stream [-S slot-name]] [--temp-slot]\n"));
printf(_(" [-j num-threads]\n"));
printf(_(" [-T OLDDIR=NEWDIR]\n"));
printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n"));
printf(_(" [-w --no-password] [-W --password]\n"));
printf(_(" [--remote-proto] [--remote-host]\n"));
printf(_(" [--remote-port] [--remote-path] [--remote-user]\n"));
printf(_(" [--ssh-options]\n"));
printf(_(" [--help]\n"));
if ((PROGRAM_URL || PROGRAM_EMAIL))
{
printf("\n");
@ -1009,3 +1024,49 @@ help_version(void)
printf(_("\n%s version\n"), PROGRAM_NAME);
printf(_("%s --version\n\n"), PROGRAM_NAME);
}
static void
help_catchup(void)
{
printf(_("\n%s catchup -b catchup-mode\n"), PROGRAM_NAME);
printf(_(" --source-pgdata=path_to_pgdata_on_remote_server\n"));
printf(_(" --destination-pgdata=path_to_local_dir\n"));
printf(_(" [--stream [-S slot-name]] [--temp-slot]\n"));
printf(_(" [-j num-threads]\n"));
printf(_(" [-T OLDDIR=NEWDIR]\n"));
printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n"));
printf(_(" [-w --no-password] [-W --password]\n"));
printf(_(" [--remote-proto] [--remote-host]\n"));
printf(_(" [--remote-port] [--remote-path] [--remote-user]\n"));
printf(_(" [--ssh-options]\n"));
printf(_(" [--help]\n\n"));
printf(_(" -b, --backup-mode=catchup-mode catchup mode=FULL|DELTA|PTRACK\n"));
printf(_(" --stream stream the transaction log (only supported mode)\n"));
printf(_(" -S, --slot=SLOTNAME replication slot to use\n"));
printf(_(" --temp-slot use temporary replication slot\n"));
printf(_(" -j, --threads=NUM number of parallel threads\n"));
printf(_(" -T, --tablespace-mapping=OLDDIR=NEWDIR\n"));
printf(_(" relocate the tablespace from directory OLDDIR to NEWDIR\n"));
printf(_("\n Connection options:\n"));
printf(_(" -U, --pguser=USERNAME user name to connect as (default: current local user)\n"));
printf(_(" -d, --pgdatabase=DBNAME database to connect (default: username)\n"));
printf(_(" -h, --pghost=HOSTNAME database server host or socket directory(default: 'local socket')\n"));
printf(_(" -p, --pgport=PORT database server port (default: 5432)\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt\n\n"));
printf(_("\n Remote options:\n"));
printf(_(" --remote-proto=protocol remote protocol to use\n"));
printf(_(" available options: 'ssh', 'none' (default: ssh)\n"));
printf(_(" --remote-host=hostname remote host address or hostname\n"));
printf(_(" --remote-port=port remote host port (default: 22)\n"));
printf(_(" --remote-path=path path to directory with pg_probackup binary on remote host\n"));
printf(_(" (default: current binary path)\n"));
printf(_(" --remote-user=username user name for ssh connection (default: current user)\n"));
printf(_(" --ssh-options=ssh_options additional ssh options (default: none)\n"));
printf(_(" (example: --ssh-options='-c cipher_spec -F configfile')\n\n"));
}


@ -57,7 +57,7 @@ do_add_instance(InstanceState *instanceState, InstanceConfig *instance)
"(-D, --pgdata)");
/* Read system_identifier from PGDATA */
instance->system_identifier = get_system_identifier(instance->pgdata);
instance->system_identifier = get_system_identifier(instance->pgdata, FIO_DB_HOST);
/* Starting from PostgreSQL 11 read WAL segment size from PGDATA */
instance->xlog_seg_size = get_xlog_seg_size(instance->pgdata);


@ -88,6 +88,9 @@ bool backup_logs = false;
bool smooth_checkpoint;
char *remote_agent;
static char *backup_note = NULL;
/* catchup options */
static char *catchup_source_pgdata = NULL;
static char *catchup_destination_pgdata = NULL;
/* restore options */
static char *target_time = NULL;
static char *target_xid = NULL;
@ -201,6 +204,9 @@ static ConfigOption cmd_options[] =
{ 'b', 184, "merge-expired", &merge_expired, SOURCE_CMD_STRICT },
{ 'b', 185, "dry-run", &dry_run, SOURCE_CMD_STRICT },
{ 's', 238, "note", &backup_note, SOURCE_CMD_STRICT },
/* catchup options */
{ 's', 239, "source-pgdata", &catchup_source_pgdata, SOURCE_CMD_STRICT },
{ 's', 240, "destination-pgdata", &catchup_destination_pgdata, SOURCE_CMD_STRICT },
/* restore options */
{ 's', 136, "recovery-target-time", &target_time, SOURCE_CMD_STRICT },
{ 's', 137, "recovery-target-xid", &target_xid, SOURCE_CMD_STRICT },
@ -445,11 +451,12 @@ main(int argc, char *argv[])
catalogState->catalog_path, WAL_SUBDIR);
}
/* backup_path is required for all pg_probackup commands except help, version and checkdb */
/* backup_path is required for all pg_probackup commands except help, version, checkdb and catchup */
if (backup_path == NULL &&
backup_subcmd != CHECKDB_CMD &&
backup_subcmd != HELP_CMD &&
backup_subcmd != VERSION_CMD)
backup_subcmd != VERSION_CMD &&
backup_subcmd != CATCHUP_CMD)
elog(ERROR, "required parameter not specified: BACKUP_PATH (-B, --backup-path)");
/* ===== catalogState (END) ======*/
@ -458,12 +465,12 @@ main(int argc, char *argv[])
/*
* Option --instance is required for all commands except
* init, show, checkdb and validate
* init, show, checkdb, validate and catchup
*/
if (instance_name == NULL)
{
if (backup_subcmd != INIT_CMD && backup_subcmd != SHOW_CMD &&
backup_subcmd != VALIDATE_CMD && backup_subcmd != CHECKDB_CMD)
backup_subcmd != VALIDATE_CMD && backup_subcmd != CHECKDB_CMD && backup_subcmd != CATCHUP_CMD)
elog(ERROR, "required parameter not specified: --instance");
}
else
@ -545,6 +552,10 @@ main(int argc, char *argv[])
setMyLocation(backup_subcmd);
}
}
else if (backup_subcmd == CATCHUP_CMD)
{
config_get_opt_env(instance_options);
}
/*
* Disable logging into file for archive-push and archive-get.
@ -587,6 +598,13 @@ main(int argc, char *argv[])
"You must specify --log-directory option when running checkdb with "
"--log-level-file option enabled.");
if (backup_subcmd == CATCHUP_CMD &&
instance_config.logger.log_level_file != LOG_OFF &&
instance_config.logger.log_directory == NULL)
elog(ERROR, "Cannot save catchup logs to a file. "
"You must specify --log-directory option when running catchup with "
"--log-level-file option enabled.");
/* Initialize logger */
init_logger(backup_path, &instance_config.logger);
@ -745,6 +763,25 @@ main(int argc, char *argv[])
}
}
/* checking required options */
if (backup_subcmd == CATCHUP_CMD)
{
if (catchup_source_pgdata == NULL)
elog(ERROR, "You must specify \"--source-pgdata\" option with the \"%s\" command", get_subcmd_name(backup_subcmd));
if (catchup_destination_pgdata == NULL)
elog(ERROR, "You must specify \"--destination-pgdata\" option with the \"%s\" command", get_subcmd_name(backup_subcmd));
if (current.backup_mode == BACKUP_MODE_INVALID)
elog(ERROR, "Required parameter not specified: BACKUP_MODE (-b, --backup-mode)");
if (current.backup_mode != BACKUP_MODE_FULL && current.backup_mode != BACKUP_MODE_DIFF_PTRACK && current.backup_mode != BACKUP_MODE_DIFF_DELTA)
elog(ERROR, "Only \"FULL\", \"PTRACK\" and \"DELTA\" modes are supported with the \"%s\" command", get_subcmd_name(backup_subcmd));
if (!stream_wal)
elog(INFO, "--stream is required, forcing stream mode");
current.stream = stream_wal = true;
if (instance_config.external_dir_str)
elog(ERROR, "external directories not supported fom \"%s\" command", get_subcmd_name(backup_subcmd));
// TODO: check instance_config.conn_opt
}
/* sanity */
if (backup_subcmd == VALIDATE_CMD && restore_params->no_validate)
elog(ERROR, "You cannot specify \"--no-validate\" option with the \"%s\" command",
@ -787,6 +824,8 @@ main(int argc, char *argv[])
return do_backup(instanceState, set_backup_params,
no_validate, no_sync, backup_logs);
}
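		/* Note: catchup deliberately runs without a backup catalog; the
		 * BACKUP_PATH and --instance checks above explicitly exempt CATCHUP_CMD. */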
case CATCHUP_CMD:
return do_catchup(catchup_source_pgdata, catchup_destination_pgdata, num_threads, !no_sync);
case RESTORE_CMD:
return do_restore_or_validate(instanceState, current.backup_id,
recovery_target_options,


@ -17,6 +17,7 @@
#include "access/xlog_internal.h"
#include "utils/pg_crc.h"
#include "catalog/pg_control.h"
#if PG_VERSION_NUM >= 120000
#include "common/logging.h"
@ -420,7 +421,7 @@ typedef struct PGNodeInfo
char server_version_str[100];
int ptrack_version_num;
bool is_ptrack_enable;
bool is_ptrack_enabled;
const char *ptrack_schema; /* used only for ptrack 2.x */
} PGNodeInfo;
@ -840,13 +841,16 @@ extern const char *deparse_backup_mode(BackupMode mode);
extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
BlockNumber blkno);
/* in catchup.c */
extern int do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads, bool sync_dest_files);
/* in restore.c */
extern int do_restore_or_validate(InstanceState *instanceState,
time_t target_backup_id,
pgRecoveryTarget *rt,
pgRestoreParams *params,
bool no_sync);
extern bool satisfy_timeline(const parray *timelines, const pgBackup *backup);
extern bool satisfy_timeline(const parray *timelines, TimeLineID tli, XLogRecPtr lsn);
extern bool satisfy_recovery_target(const pgBackup *backup,
const pgRecoveryTarget *rt);
extern pgRecoveryTarget *parseRecoveryTargetOptions(
@ -861,6 +865,8 @@ extern parray *get_dbOid_exclude_list(pgBackup *backup, parray *datname_list,
extern parray *get_backup_filelist(pgBackup *backup, bool strict);
extern parray *read_timeline_history(const char *arclog_path, TimeLineID targetTLI, bool strict);
extern bool tliIsPartOfHistory(const parray *timelines, TimeLineID tli);
extern DestDirIncrCompatibility check_incremental_compatibility(const char *pgdata, uint64 system_identifier,
IncrRestoreMode incremental_mode);
/* in merge.c */
extern void do_merge(InstanceState *instanceState, time_t backup_id, bool no_validate, bool no_sync);
@ -1002,6 +1008,7 @@ extern void dir_list_file(parray *files, const char *root, bool exclude,
bool follow_symlink, bool add_root, bool backup_logs,
bool skip_hidden, int external_dir_num, fio_location location);
extern const char *get_tablespace_mapping(const char *dir);
extern void create_data_directories(parray *dest_files,
const char *data_dir,
const char *backup_dir,
@ -1054,6 +1061,7 @@ extern int pgFileCompareRelPathWithExternal(const void *f1, const void *f2);
extern int pgFileCompareRelPathWithExternalDesc(const void *f1, const void *f2);
extern int pgFileCompareLinked(const void *f1, const void *f2);
extern int pgFileCompareSize(const void *f1, const void *f2);
extern int pgFileCompareSizeDesc(const void *f1, const void *f2);
extern int pgCompareOid(const void *f1, const void *f2);
extern void pfilearray_clear_locks(parray *file_list);
@ -1061,6 +1069,12 @@ extern void pfilearray_clear_locks(parray *file_list);
extern bool check_data_file(ConnectionArgs *arguments, pgFile *file,
const char *from_fullpath, uint32 checksum_version);
extern void catchup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpath,
XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
CompressAlg calg, int clevel, uint32 checksum_version,
int ptrack_version_num, const char *ptrack_schema,
bool is_merge, size_t prev_size);
extern void backup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpath,
XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
CompressAlg calg, int clevel, uint32 checksum_version,
@ -1129,14 +1143,15 @@ extern XLogRecPtr get_next_record_lsn(const char *archivedir, XLogSegNo segno, T
/* in util.c */
extern TimeLineID get_current_timeline(PGconn *conn);
extern TimeLineID get_current_timeline_from_control(bool safe);
extern TimeLineID get_current_timeline_from_control(const char *pgdata_path, fio_location location, bool safe);
extern XLogRecPtr get_checkpoint_location(PGconn *conn);
extern uint64 get_system_identifier(const char *pgdata_path);
extern uint64 get_system_identifier(const char *pgdata_path, fio_location location);
extern uint64 get_remote_system_identifier(PGconn *conn);
extern uint32 get_data_checksum_version(bool safe);
extern pg_crc32c get_pgcontrol_checksum(const char *pgdata_path);
extern uint32 get_xlog_seg_size(char *pgdata_path);
extern void get_redo(const char *pgdata_path, RedoParams *redo);
extern DBState get_system_dbstate(const char *pgdata_path, fio_location location);
extern uint32 get_xlog_seg_size(const char *pgdata_path);
extern void get_redo(const char *pgdata_path, fio_location pgdata_location, RedoParams *redo);
extern void set_min_recovery_point(pgFile *file, const char *backup_path,
XLogRecPtr stop_backup_lsn);
extern void copy_pgcontrol_file(const char *from_fullpath, fio_location from_location,
@ -1161,7 +1176,7 @@ extern void pretty_size(int64 size, char *buf, size_t len);
extern void pretty_time_interval(double time, char *buf, size_t len);
extern PGconn *pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo);
extern void check_system_identifiers(PGconn *conn, char *pgdata);
extern void check_system_identifiers(PGconn *conn, const char *pgdata);
extern void parse_filelist_filenames(parray *files, const char *root);
/* in ptrack.c */
@ -1170,7 +1185,8 @@ extern void make_pagemap_from_ptrack_2(parray* files, PGconn* backup_conn,
int ptrack_version_num,
XLogRecPtr lsn);
extern void get_ptrack_version(PGconn *backup_conn, PGNodeInfo *nodeInfo);
extern bool pg_ptrack_enable(PGconn *backup_conn, int ptrack_version_num);
extern bool pg_is_ptrack_enabled(PGconn *backup_conn, int ptrack_version_num);
extern XLogRecPtr get_last_ptrack_lsn(PGconn *backup_conn, PGNodeInfo *nodeInfo);
extern parray * pg_ptrack_get_pagemapset(PGconn *backup_conn, const char *ptrack_schema,
int ptrack_version_num, XLogRecPtr lsn);
@ -1182,6 +1198,10 @@ extern int send_pages(const char *to_fullpath, const char *from_fullpath,
pgFile *file, XLogRecPtr prev_backup_start_lsn, CompressAlg calg, int clevel,
uint32 checksum_version, bool use_pagemap, BackupPageHeader2 **headers,
BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema);
extern int copy_pages(const char *to_fullpath, const char *from_fullpath,
pgFile *file, XLogRecPtr prev_backup_start_lsn,
uint32 checksum_version, bool use_pagemap,
BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema);
/* FIO */
extern void setMyLocation(ProbackupSubcmd const subcmd);
@ -1190,6 +1210,10 @@ extern int fio_send_pages(const char *to_fullpath, const char *from_fullpath, pg
XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version,
bool use_pagemap, BlockNumber *err_blknum, char **errormsg,
BackupPageHeader2 **headers);
extern int fio_copy_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version,
bool use_pagemap, BlockNumber *err_blknum, char **errormsg,
BackupPageHeader2 **headers);
/* return codes for fio_send_pages */
extern int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out, char **errormsg);
extern int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
@ -1243,6 +1267,7 @@ extern void start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path,
ConnectionOptions *conn_opt,
XLogRecPtr startpos, TimeLineID starttli);
extern int wait_WAL_streaming_end(parray *backup_files_list);
extern parray* parse_tli_history_buffer(char *history, TimeLineID tli);
/* external variables and functions, implemented in backup.c */
typedef struct PGStopBackupResult
@ -1280,5 +1305,6 @@ extern XLogRecPtr wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr lsn, bool
bool in_prev_segment, bool segment_only,
int timeout_elevel, bool in_stream_dir);
extern void wait_wal_and_calculate_stop_lsn(const char *xlog_path, XLogRecPtr stop_lsn, pgBackup *backup);
extern int64 calculate_datasize_of_filelist(parray *filelist);
#endif /* PG_PROBACKUP_H */


@ -118,7 +118,7 @@ get_ptrack_version(PGconn *backup_conn, PGNodeInfo *nodeInfo)
* Check if ptrack is enabled in target instance
*/
bool
pg_ptrack_enable(PGconn *backup_conn, int ptrack_version_num)
pg_is_ptrack_enabled(PGconn *backup_conn, int ptrack_version_num)
{
PGresult *res_db;
bool result = false;


@ -67,8 +67,6 @@ static void restore_chain(pgBackup *dest_backup, parray *parent_chain,
parray *dbOid_exclude_list, pgRestoreParams *params,
const char *pgdata_path, bool no_sync, bool cleanup_pgdata,
bool backup_has_tblspc);
static DestDirIncrCompatibility check_incremental_compatibility(const char *pgdata, uint64 system_identifier,
IncrRestoreMode incremental_mode);
/*
* Iterate over backup list to find all ancestors of the broken parent_backup
@ -293,7 +291,7 @@ do_restore_or_validate(InstanceState *instanceState, time_t target_backup_id, pg
if (!timelines)
elog(ERROR, "Failed to get history file for target timeline %i", rt->target_tli);
if (!satisfy_timeline(timelines, current_backup))
if (!satisfy_timeline(timelines, current_backup->tli, current_backup->stop_lsn))
{
if (target_backup_id != INVALID_BACKUP_ID)
elog(ERROR, "target backup %s does not satisfy target timeline",
@ -487,7 +485,7 @@ do_restore_or_validate(InstanceState *instanceState, time_t target_backup_id, pg
{
RedoParams redo;
parray *timelines = NULL;
get_redo(instance_config.pgdata, &redo);
get_redo(instance_config.pgdata, FIO_DB_HOST, &redo);
if (redo.checksum_version == 0)
elog(ERROR, "Incremental restore in 'lsn' mode require "
@ -1819,7 +1817,7 @@ satisfy_recovery_target(const pgBackup *backup, const pgRecoveryTarget *rt)
/* Returns true if the given (tli, lsn) pair belongs to one of the timelines
 * in the list, i.e. tli matches and lsn does not lie past that timeline's end */
bool
satisfy_timeline(const parray *timelines, const pgBackup *backup)
satisfy_timeline(const parray *timelines, TimeLineID tli, XLogRecPtr lsn)
{
int i;
@ -1828,9 +1826,9 @@ satisfy_timeline(const parray *timelines, const pgBackup *backup)
TimeLineHistoryEntry *timeline;
timeline = (TimeLineHistoryEntry *) parray_get(timelines, i);
if (backup->tli == timeline->tli &&
if (tli == timeline->tli &&
(XLogRecPtrIsInvalid(timeline->end) ||
backup->stop_lsn <= timeline->end))
lsn <= timeline->end))
return true;
}
return false;
@ -2186,9 +2184,9 @@ check_incremental_compatibility(const char *pgdata, uint64 system_identifier,
* data files content, because based on pg_control information we will
* choose a backup suitable for lsn based incremental restore.
*/
elog(INFO, "Trying to read pg_control file in destination direstory");
elog(INFO, "Trying to read pg_control file in destination directory");
system_id_pgdata = get_system_identifier(pgdata);
system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST);
if (system_id_pgdata == instance_config.system_identifier)
system_id_match = true;


@ -70,7 +70,6 @@ static void add_walsegment_to_filelist(parray *filelist, uint32 timeline,
uint32 xlog_seg_size);
static void add_history_file_to_filelist(parray *filelist, uint32 timeline,
char *basedir);
static parray* parse_tli_history_buffer(char *history, TimeLineID tli);
/*
* Run IDENTIFY_SYSTEM through a given connection and
@ -173,7 +172,7 @@ StreamLog(void *arg)
*/
stream_arg->startpos -= stream_arg->startpos % instance_config.xlog_seg_size;
xlog_files_list = parray_new();
xlog_files_list = parray_new();
/* Initialize timeout */
stream_stop_begin = 0;
@ -308,14 +307,14 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
/* we assume that we get called once at the end of each segment */
if (segment_finished)
{
elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"),
(uint32) (xlogpos >> 32), (uint32) xlogpos, timeline);
{
elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"),
(uint32) (xlogpos >> 32), (uint32) xlogpos, timeline);
add_walsegment_to_filelist(xlog_files_list, timeline, xlogpos,
(char*) stream_thread_arg.basedir,
instance_config.xlog_seg_size);
}
add_walsegment_to_filelist(xlog_files_list, timeline, xlogpos,
(char*) stream_thread_arg.basedir,
instance_config.xlog_seg_size);
}
/*
* Note that we report the previous, not current, position here. After a
@ -588,20 +587,25 @@ start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOption
/* Set error exit code as default */
stream_thread_arg.ret = 1;
/* we must use startpos as start_lsn from start_backup */
stream_thread_arg.startpos = current.start_lsn;
stream_thread_arg.starttli = current.tli;
stream_thread_arg.startpos = startpos;
stream_thread_arg.starttli = starttli;
thread_interrupted = false;
pthread_create(&stream_thread, NULL, StreamLog, &stream_thread_arg);
}
/* Wait for the completion of stream */
/*
* Wait for streaming to complete, then append the list of streamed
* xlog files to backup_files_list (if it is not NULL)
*/
int
wait_WAL_streaming_end(parray *backup_files_list)
{
pthread_join(stream_thread, NULL);
parray_concat(backup_files_list, xlog_files_list);
if(backup_files_list != NULL)
parray_concat(backup_files_list, xlog_files_list);
parray_free(xlog_files_list);
return stream_thread_arg.ret;
}


@ -10,8 +10,6 @@
#include "pg_probackup.h"
#include "catalog/pg_control.h"
#include <time.h>
#include <unistd.h>
@ -174,7 +172,7 @@ get_current_timeline(PGconn *conn)
if (PQresultStatus(res) == PGRES_TUPLES_OK)
val = PQgetvalue(res, 0, 0);
else
return get_current_timeline_from_control(false);
return get_current_timeline_from_control(instance_config.pgdata, FIO_DB_HOST, false);
if (!parse_uint32(val, &tli, 0))
{
@ -182,7 +180,7 @@ get_current_timeline(PGconn *conn)
elog(WARNING, "Invalid value of timeline_id %s", val);
/* TODO 3.0 remove it and just error out */
return get_current_timeline_from_control(false);
return get_current_timeline_from_control(instance_config.pgdata, FIO_DB_HOST, false);
}
return tli;
@ -190,15 +188,15 @@ get_current_timeline(PGconn *conn)
/* Get timeline from pg_control file */
TimeLineID
get_current_timeline_from_control(bool safe)
get_current_timeline_from_control(const char *pgdata_path, fio_location location, bool safe)
{
ControlFileData ControlFile;
char *buffer;
size_t size;
/* First fetch file... */
buffer = slurpFile(instance_config.pgdata, XLOG_CONTROL_FILE, &size,
safe, FIO_DB_HOST);
buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size,
safe, location);
if (safe && buffer == NULL)
return 0;
@ -249,14 +247,14 @@ get_checkpoint_location(PGconn *conn)
}
uint64
get_system_identifier(const char *pgdata_path)
get_system_identifier(const char *pgdata_path, fio_location location)
{
ControlFileData ControlFile;
char *buffer;
size_t size;
/* First fetch file... */
buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, FIO_DB_HOST);
buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, location);
if (buffer == NULL)
return 0;
digestControlFile(&ControlFile, buffer, size);
@ -299,7 +297,7 @@ get_remote_system_identifier(PGconn *conn)
}
uint32
get_xlog_seg_size(char *pgdata_path)
get_xlog_seg_size(const char *pgdata_path)
{
#if PG_VERSION_NUM >= 110000
ControlFileData ControlFile;
@ -351,15 +349,31 @@ get_pgcontrol_checksum(const char *pgdata_path)
return ControlFile.crc;
}
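/* Get the cluster state (DBState, e.g. DB_SHUTDOWNED) from the pg_control
 * file of the given PGDATA directory, locally or via the remote agent. */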
DBState
get_system_dbstate(const char *pgdata_path, fio_location location)
{
ControlFileData ControlFile;
char *buffer;
size_t size;
buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, location);
if (buffer == NULL)
return 0;
digestControlFile(&ControlFile, buffer, size);
pg_free(buffer);
return ControlFile.state;
}
void
get_redo(const char *pgdata_path, RedoParams *redo)
get_redo(const char *pgdata_path, fio_location pgdata_location, RedoParams *redo)
{
ControlFileData ControlFile;
char *buffer;
size_t size;
/* First fetch file... */
buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, FIO_DB_HOST);
buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, pgdata_location);
digestControlFile(&ControlFile, buffer, size);
pg_free(buffer);


@ -110,6 +110,7 @@ static char const * const subcmd_names[] =
"agent",
"help",
"version",
"catchup",
};
ProbackupSubcmd


@ -38,7 +38,8 @@ typedef enum ProbackupSubcmd
SSH_CMD,
AGENT_CMD,
HELP_CMD,
VERSION_CMD
VERSION_CMD,
CATCHUP_CMD,
} ProbackupSubcmd;
typedef enum OptionSource


@ -94,7 +94,7 @@ setMyLocation(ProbackupSubcmd const subcmd)
MyLocation = IsSshProtocol()
? (subcmd == ARCHIVE_PUSH_CMD || subcmd == ARCHIVE_GET_CMD)
? FIO_DB_HOST
: (subcmd == BACKUP_CMD || subcmd == RESTORE_CMD || subcmd == ADD_INSTANCE_CMD)
: (subcmd == BACKUP_CMD || subcmd == RESTORE_CMD || subcmd == ADD_INSTANCE_CMD || subcmd == CATCHUP_CMD)
? FIO_BACKUP_HOST
: FIO_LOCAL_HOST
: FIO_LOCAL_HOST;
@ -1139,6 +1139,46 @@ fio_stat(char const* path, struct stat* st, bool follow_symlink, fio_location lo
}
}
/*
* Read the value of a symbolic link.
* This is a wrapper around the readlink() syscall.
* Side effect: the result may be truncated (the caller can detect
* this by checking whether the returned value >= valsiz).
*/
ssize_t
fio_readlink(const char *path, char *value, size_t valsiz, fio_location location)
{
if (!fio_is_remote(location))
{
/* readlink() does not append a trailing \0 */
ssize_t len = readlink(path, value, valsiz);
if (len >= 0)
value[len < (ssize_t) valsiz ? len : (ssize_t) valsiz - 1] = '\0';
return len;
}
else
{
fio_header hdr;
size_t path_len = strlen(path) + 1;
hdr.cop = FIO_READLINK;
hdr.handle = -1;
Assert(valsiz <= UINT_MAX); /* max value of fio_header.arg */
hdr.arg = valsiz;
hdr.size = path_len;
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len);
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
Assert(hdr.cop == FIO_READLINK);
Assert(hdr.size <= valsiz);
IO_CHECK(fio_read_all(fio_stdin, value, hdr.size), hdr.size);
		value[hdr.size < valsiz ? hdr.size : valsiz - 1] = '\0';
return hdr.size;
}
}
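/*
 * Illustrative sketch, not part of the patch: reading a tablespace symlink
 * with fio_readlink() and detecting truncation via the "returned value >=
 * valsiz" contract described above. The function name is hypothetical;
 * MAXPGPATH comes from the PostgreSQL headers.
 */
static void
example_read_link(const char *link_path, fio_location location)
{
	char		target[MAXPGPATH];
	ssize_t		len = fio_readlink(link_path, target, sizeof(target), location);

	if (len < 0)
		elog(ERROR, "Cannot read symbolic link \"%s\": %s",
			 link_path, strerror(errno));
	if ((size_t) len >= sizeof(target))
		elog(ERROR, "Symbolic link \"%s\" target is too long", link_path);
	elog(VERBOSE, "Link \"%s\" points to \"%s\"", link_path, target);
}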
/* Check presence of the file */
int
fio_access(char const* path, int mode, fio_location location)
@ -1769,7 +1809,7 @@ fio_send_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
/* send message with header
8bytes 24bytes var var
16bytes 24bytes var var
--------------------------------------------------------------
| fio_header | fio_send_request | FILE PATH | BITMAP(if any) |
--------------------------------------------------------------
@ -1903,6 +1943,198 @@ fio_send_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
return n_blocks_read;
}
/*
 * Return the number of actually(!) read blocks; attempted or
 * half-read blocks are not counted.
 * Return values in case of error:
 *  FILE_MISSING
 *  OPEN_FAILED
 *  READ_FAILED
 *  PAGE_CORRUPTION
 *  WRITE_FAILED
 *
 * If none of the above, this function returns the number of blocks
 * read by the remote agent.
 *
 * In DELTA mode, horizonLsn must be a valid LSN,
 * otherwise it should be set to InvalidXLogRecPtr.
 * Derived from fio_send_pages().
*/
int
fio_copy_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version,
bool use_pagemap, BlockNumber* err_blknum, char **errormsg,
BackupPageHeader2 **headers)
{
FILE *out = NULL;
char *out_buf = NULL;
struct {
fio_header hdr;
fio_send_request arg;
} req;
BlockNumber n_blocks_read = 0;
BlockNumber blknum = 0;
/* send message with header
16bytes 24bytes var var
--------------------------------------------------------------
| fio_header | fio_send_request | FILE PATH | BITMAP(if any) |
--------------------------------------------------------------
*/
req.hdr.cop = FIO_SEND_PAGES;
if (use_pagemap)
{
req.hdr.size = sizeof(fio_send_request) + (*file).pagemap.bitmapsize + strlen(from_fullpath) + 1;
req.arg.bitmapsize = (*file).pagemap.bitmapsize;
/* TODO: add optimization for the case of pagemap
* containing small number of blocks with big serial numbers:
* https://github.com/postgrespro/pg_probackup/blob/remote_page_backup/src/utils/file.c#L1211
*/
}
else
{
req.hdr.size = sizeof(fio_send_request) + strlen(from_fullpath) + 1;
req.arg.bitmapsize = 0;
}
req.arg.nblocks = file->size/BLCKSZ;
req.arg.segmentno = file->segno * RELSEG_SIZE;
req.arg.horizonLsn = horizonLsn;
req.arg.checksumVersion = checksum_version;
req.arg.calg = calg;
req.arg.clevel = clevel;
req.arg.path_len = strlen(from_fullpath) + 1;
	file->compress_alg = calg; /* TODO: clarify why compress_alg is set here */
/* send header */
IO_CHECK(fio_write_all(fio_stdout, &req, sizeof(req)), sizeof(req));
/* send file path */
IO_CHECK(fio_write_all(fio_stdout, from_fullpath, req.arg.path_len), req.arg.path_len);
/* send pagemap if any */
if (use_pagemap)
IO_CHECK(fio_write_all(fio_stdout, (*file).pagemap.bitmap, (*file).pagemap.bitmapsize), (*file).pagemap.bitmapsize);
out = fio_fopen(to_fullpath, PG_BINARY_R "+", FIO_BACKUP_HOST);
if (out == NULL)
elog(ERROR, "Cannot open restore target file \"%s\": %s", to_fullpath, strerror(errno));
/* update file permission */
if (fio_chmod(to_fullpath, file->mode, FIO_BACKUP_HOST) == -1)
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
strerror(errno));
elog(VERBOSE, "ftruncate file \"%s\" to size %lu",
to_fullpath, file->size);
if (fio_ftruncate(out, file->size) == -1)
elog(ERROR, "Cannot ftruncate file \"%s\" to size %lu: %s",
to_fullpath, file->size, strerror(errno));
if (!fio_is_remote_file(out))
{
out_buf = pgut_malloc(STDIO_BUFSIZE);
setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE);
}
while (true)
{
fio_header hdr;
char buf[BLCKSZ + sizeof(BackupPageHeader)];
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
if (interrupted)
elog(ERROR, "Interrupted during page reading");
if (hdr.cop == FIO_ERROR)
{
/* FILE_MISSING, OPEN_FAILED and READ_FAILED */
if (hdr.size > 0)
{
IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
*errormsg = pgut_malloc(hdr.size);
snprintf(*errormsg, hdr.size, "%s", buf);
}
return hdr.arg;
}
else if (hdr.cop == FIO_SEND_FILE_CORRUPTION)
{
*err_blknum = hdr.arg;
if (hdr.size > 0)
{
IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
*errormsg = pgut_malloc(hdr.size);
snprintf(*errormsg, hdr.size, "%s", buf);
}
return PAGE_CORRUPTION;
}
else if (hdr.cop == FIO_SEND_FILE_EOF)
{
/* n_blocks_read reported by EOF */
n_blocks_read = hdr.arg;
/* receive headers if any */
if (hdr.size > 0)
{
*headers = pgut_malloc(hdr.size);
IO_CHECK(fio_read_all(fio_stdin, *headers, hdr.size), hdr.size);
file->n_headers = (hdr.size / sizeof(BackupPageHeader2)) -1;
}
break;
}
else if (hdr.cop == FIO_PAGE)
{
blknum = hdr.arg;
Assert(hdr.size <= sizeof(buf));
IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
COMP_FILE_CRC32(true, file->crc, buf, hdr.size);
if (fio_fseek(out, blknum * BLCKSZ) < 0)
{
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
}
			// an uncompressed block with a BackupPageHeader is expected here
			// TODO: add an assert?
if (fio_fwrite(out, buf + sizeof(BackupPageHeader), hdr.size - sizeof(BackupPageHeader)) != BLCKSZ)
{
fio_fclose(out);
*err_blknum = blknum;
return WRITE_FAILED;
}
file->write_size += BLCKSZ;
file->uncompressed_size += BLCKSZ;
}
else
elog(ERROR, "Remote agent returned message of unexpected type: %i", hdr.cop);
}
	if (out)
		fio_fclose(out);
pg_free(out_buf);
return n_blocks_read;
}
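/*
 * Illustrative sketch, not part of the patch: a caller-side error-handling
 * pattern for fio_copy_pages(), following the contract in the comment above
 * and assuming (as with fio_send_pages) that the error codes are negative.
 * NONE_COMPRESS is the existing "no compression" code; the function name
 * below is hypothetical.
 */
static void
example_copy_data_file(pgFile *file, const char *from_fullpath,
					   const char *to_fullpath, XLogRecPtr horizon_lsn,
					   uint32 checksum_version)
{
	BlockNumber	err_blknum = 0;
	char	   *errmsg = NULL;
	BackupPageHeader2 *headers = NULL;
	int			rc = fio_copy_pages(to_fullpath, from_fullpath, file,
									horizon_lsn, NONE_COMPRESS, 1,
									checksum_version,
									file->pagemap.bitmapsize > 0, /* use_pagemap */
									&err_blknum, &errmsg, &headers);

	if (rc == PAGE_CORRUPTION)
		elog(ERROR, "Corrupted page %u in file \"%s\": %s",
			 err_blknum, from_fullpath, errmsg ? errmsg : "");
	else if (rc < 0)
		elog(ERROR, "Cannot copy file \"%s\": %s",
			 from_fullpath, errmsg ? errmsg : "");
	else
		elog(VERBOSE, "Copied %d blocks of \"%s\"", rc, from_fullpath);

	pg_free(errmsg);
	pg_free(headers);
}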
/* TODO: read file using large buffer
* Return codes:
* FIO_ERROR:
@ -3147,6 +3379,26 @@ fio_communicate(int in, int out)
case FIO_GET_ASYNC_ERROR:
fio_get_async_error_impl(out);
break;
case FIO_READLINK: /* Read content of a symbolic link */
{
/*
				 * We need one buffer to hold both the arguments and the result:
				 * hdr.size = strlen(symlink_name) + 1
				 * hdr.arg  = buffer size for the answer (symlink content)
*/
size_t filename_size = (size_t)hdr.size;
				if (filename_size + hdr.arg > buf_size) {
					buf_size = filename_size + hdr.arg;
					buf = (char*)realloc(buf, buf_size);
				}
rc = readlink(buf, buf + filename_size, hdr.arg);
hdr.cop = FIO_READLINK;
hdr.size = rc > 0 ? rc : 0;
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
if (hdr.size != 0)
IO_CHECK(fio_write_all(out, buf + filename_size, hdr.size), hdr.size);
}
break;
default:
Assert(false);
}

View File

@ -55,7 +55,8 @@ typedef enum
FIO_LIST_DIR,
FIO_CHECK_POSTMASTER,
FIO_GET_ASYNC_ERROR,
FIO_WRITE_ASYNC
FIO_WRITE_ASYNC,
FIO_READLINK
} fio_operations;
typedef enum
@ -128,6 +129,7 @@ extern int fio_mkdir(char const* path, int mode, fio_location location);
extern int fio_chmod(char const* path, int mode, fio_location location);
extern int fio_access(char const* path, int mode, fio_location location);
extern int fio_stat(char const* path, struct stat* st, bool follow_symlinks, fio_location location);
extern ssize_t fio_readlink(const char *path, char *value, size_t valsiz, fio_location location);
extern DIR* fio_opendir(char const* path, fio_location location);
extern struct dirent * fio_readdir(DIR *dirp);
extern int fio_closedir(DIR *dirp);

View File

@ -198,6 +198,13 @@ parray_bsearch(parray *array, const void *key, int(*compare)(const void *, const
return bsearch(&key, array->data, array->used, sizeof(void *), compare);
}
int
parray_bsearch_index(parray *array, const void *key, int(*compare)(const void *, const void *))
{
void **elem = parray_bsearch(array, key, compare);
return elem != NULL ? elem - array->data : -1;
}
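/*
 * Illustrative sketch, not part of the patch: unlike parray_bsearch(),
 * parray_bsearch_index() returns a position, so the caller can keep scanning
 * neighboring elements of the sorted array. The example_* names are
 * hypothetical; parray_get()/parray_num() are the existing accessors and
 * <string.h> is assumed.
 */
static int
example_cmp_str(const void *a, const void *b)
{
	return strcmp(*(const char **) a, *(const char **) b);
}

/* count the exact match plus consecutive successors sharing its prefix */
static int
example_count_prefix(parray *sorted_names, const char *prefix)
{
	int		i = parray_bsearch_index(sorted_names, prefix, example_cmp_str);
	int		n = 0;

	if (i < 0)
		return 0;			/* exact match not present */
	while (i < (int) parray_num(sorted_names) &&
		   strncmp((char *) parray_get(sorted_names, i), prefix, strlen(prefix)) == 0)
	{
		n++;
		i++;
	}
	return n;
}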
/* checks that parray contains element */
bool parray_contains(parray *array, void *elem)
{

View File

@ -29,6 +29,7 @@ extern bool parray_rm(parray *array, const void *key, int(*compare)(const void *
extern size_t parray_num(const parray *array);
extern void parray_qsort(parray *array, int(*compare)(const void *, const void *));
extern void *parray_bsearch(parray *array, const void *key, int(*compare)(const void *, const void *));
extern int parray_bsearch_index(parray *array, const void *key, int(*compare)(const void *, const void *));
extern void parray_walk(parray *array, void (*action)(void *));
extern bool parray_contains(parray *array, void *elem);

View File

@ -6,7 +6,8 @@ from . import init, merge, option, show, compatibility, \
retention, pgpro560, pgpro589, pgpro2068, false_positive, replica, \
compression, page, ptrack, archive, exclude, cfs_backup, cfs_restore, \
cfs_validate_backup, auth_test, time_stamp, snapfs, logging, \
locking, remote, external, config, checkdb, set_backup, incr_restore
locking, remote, external, config, checkdb, set_backup, incr_restore, \
catchup
def load_tests(loader, tests, pattern):
@ -23,6 +24,7 @@ def load_tests(loader, tests, pattern):
# suite.addTests(loader.loadTestsFromModule(auth_test))
suite.addTests(loader.loadTestsFromModule(archive))
suite.addTests(loader.loadTestsFromModule(backup))
suite.addTests(loader.loadTestsFromModule(catchup))
suite.addTests(loader.loadTestsFromModule(compatibility))
suite.addTests(loader.loadTestsFromModule(checkdb))
suite.addTests(loader.loadTestsFromModule(config))

977
tests/catchup.py Normal file
View File

@ -0,0 +1,977 @@
import os
import signal
import unittest
from .helpers.ptrack_helpers import ProbackupTest, ProbackupException
module_name = 'catchup'
class CatchupTest(ProbackupTest, unittest.TestCase):
def setUp(self):
self.fname = self.id().split('.')[3]
#########################################
# Basic tests
#########################################
def test_basic_full_catchup(self):
"""
Test 'multithreaded basebackup' mode (aka FULL catchup)
"""
# preparation
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True
)
src_pg.slow_start()
src_pg.safe_psql(
"postgres",
"CREATE TABLE ultimate_question AS SELECT 42 AS answer")
src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
# do full catchup
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
# 1st check: compare data directories
self.compare_pgdata(
self.pgdata_content(src_pg.data_dir),
self.pgdata_content(dst_pg.data_dir)
)
# run & recover the catchup'ed instance
src_pg.stop()
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
# 2nd check: run verification query
dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
# Cleanup
dst_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_full_catchup_with_tablespace(self):
"""
Test tablespace transfers
"""
# preparation
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True
)
src_pg.slow_start()
tblspace1_old_path = self.get_tblspace_path(src_pg, 'tblspace1_old')
self.create_tblspace_in_node(src_pg, 'tblspace1', tblspc_path = tblspace1_old_path)
src_pg.safe_psql(
"postgres",
"CREATE TABLE ultimate_question TABLESPACE tblspace1 AS SELECT 42 AS answer")
src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
# do full catchup with tablespace mapping
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
tblspace1_new_path = self.get_tblspace_path(dst_pg, 'tblspace1_new')
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = [
'-d', 'postgres',
'-p', str(src_pg.port),
'--stream',
'-T', '{0}={1}'.format(tblspace1_old_path, tblspace1_new_path)
]
)
# 1st check: compare data directories
self.compare_pgdata(
self.pgdata_content(src_pg.data_dir),
self.pgdata_content(dst_pg.data_dir)
)
# make changes in master tablespace
src_pg.safe_psql(
"postgres",
"UPDATE ultimate_question SET answer = -1")
src_pg.stop()
# run & recover the catchup'ed instance
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
# 2nd check: run verification query
dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
# Cleanup
dst_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_basic_delta_catchup(self):
"""
Test delta catchup
"""
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
pg_options = { 'wal_log_hints': 'on' }
)
src_pg.slow_start()
src_pg.safe_psql(
"postgres",
"CREATE TABLE ultimate_question(answer int)")
# preparation 2: make a cleanly shut down replica that lags behind the source
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
self.set_replica(src_pg, dst_pg)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start(replica = True)
dst_pg.stop()
# preparation 3: make changes on master (source)
src_pg.pgbench_init(scale = 10)
pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
pgbench.wait()
src_pg.safe_psql("postgres", "INSERT INTO ultimate_question VALUES(42)")
src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
# do delta catchup
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
# 1st check: compare data directories
self.compare_pgdata(
self.pgdata_content(src_pg.data_dir),
self.pgdata_content(dst_pg.data_dir)
)
# run & recover the catchup'ed instance
src_pg.stop()
self.set_replica(master = src_pg, replica = dst_pg)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start(replica = True)
# 2nd check: run verification query
dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
# Cleanup
dst_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_basic_ptrack_catchup(self):
"""
Test ptrack catchup
"""
if not self.ptrack:
return unittest.skip('Skipped because ptrack support is disabled')
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
ptrack_enable = True,
initdb_params = ['--data-checksums']
)
src_pg.slow_start()
src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")
src_pg.safe_psql(
"postgres",
"CREATE TABLE ultimate_question(answer int)")
# preparation 2: make a cleanly shut down replica that lags behind the source
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
self.set_replica(src_pg, dst_pg)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start(replica = True)
dst_pg.stop()
# preparation 3: make changes on master (source)
src_pg.pgbench_init(scale = 10)
pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
pgbench.wait()
src_pg.safe_psql("postgres", "INSERT INTO ultimate_question VALUES(42)")
src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
# do ptrack catchup
self.catchup_node(
backup_mode = 'PTRACK',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
# 1st check: compare data directories
self.compare_pgdata(
self.pgdata_content(src_pg.data_dir),
self.pgdata_content(dst_pg.data_dir)
)
# run & recover the catchup'ed instance
src_pg.stop()
self.set_replica(master = src_pg, replica = dst_pg)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start(replica = True)
# 2nd check: run verification query
dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
# Cleanup
dst_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_tli_delta_catchup(self):
"""
Test that we correctly follow a timeline change with delta catchup
"""
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
pg_options = { 'wal_log_hints': 'on' }
)
src_pg.slow_start()
# preparation 2: destination
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
dst_pg.stop()
# preparation 3: promote source
src_pg.stop()
self.set_replica(dst_pg, src_pg) # fake replication
src_pg.slow_start(replica = True)
src_pg.promote()
src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
# do catchup
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
# 1st check: compare data directories
self.compare_pgdata(
self.pgdata_content(src_pg.data_dir),
self.pgdata_content(dst_pg.data_dir)
)
# run & recover the catchup'ed instance
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
# 2nd check: run verification query
dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
# Cleanup
src_pg.stop()
dst_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_tli_ptrack_catchup(self):
"""
Test that we correctly follow a timeline change with ptrack catchup
"""
if not self.ptrack:
return unittest.skip('Skipped because ptrack support is disabled')
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
ptrack_enable = True,
initdb_params = ['--data-checksums']
)
src_pg.slow_start()
src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")
# preparation 2: destination
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
dst_pg.stop()
# preparation 3: promote source
src_pg.stop()
self.set_replica(dst_pg, src_pg) # fake replication
src_pg.slow_start(replica = True)
src_pg.promote()
src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
# do catchup
self.catchup_node(
backup_mode = 'PTRACK',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
# 1st check: compare data directories
self.compare_pgdata(
self.pgdata_content(src_pg.data_dir),
self.pgdata_content(dst_pg.data_dir)
)
# run & recover the catchup'ed instance
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
# 2nd check: run verification query
dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
# Cleanup
src_pg.stop()
dst_pg.stop()
self.del_test_dir(module_name, self.fname)
#########################################
# Test various corner conditions
#########################################
def test_table_drop_with_delta(self):
"""
Test that a table dropped in the source is also dropped in the delta catchup'ed instance
"""
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
pg_options = { 'wal_log_hints': 'on' }
)
src_pg.slow_start()
src_pg.safe_psql(
"postgres",
"CREATE TABLE ultimate_question AS SELECT 42 AS answer")
# preparation 2: make a cleanly shut down replica that lags behind the source
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
dst_pg.stop()
# preparation 3: make changes on master (source)
# perform checkpoint twice to ensure that the data file is actually deleted from the filesystem
src_pg.safe_psql("postgres", "DROP TABLE ultimate_question")
src_pg.safe_psql("postgres", "CHECKPOINT")
src_pg.safe_psql("postgres", "CHECKPOINT")
# do delta catchup
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
# Check: compare data directories
self.compare_pgdata(
self.pgdata_content(src_pg.data_dir),
self.pgdata_content(dst_pg.data_dir)
)
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_table_drop_with_ptrack(self):
"""
Test that a table dropped in the source is also dropped in the ptrack catchup'ed instance
"""
if not self.ptrack:
return unittest.skip('Skipped because ptrack support is disabled')
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
ptrack_enable = True,
initdb_params = ['--data-checksums']
)
src_pg.slow_start()
src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")
src_pg.safe_psql(
"postgres",
"CREATE TABLE ultimate_question AS SELECT 42 AS answer")
# preparation 2: make a cleanly shut down replica that lags behind the source
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
dst_pg.stop()
# preparation 3: make changes on master (source)
# perform checkpoint twice to ensure that the data file is actually deleted from the filesystem
src_pg.safe_psql("postgres", "DROP TABLE ultimate_question")
src_pg.safe_psql("postgres", "CHECKPOINT")
src_pg.safe_psql("postgres", "CHECKPOINT")
# do ptrack catchup
self.catchup_node(
backup_mode = 'PTRACK',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
# Check: compare data directories
self.compare_pgdata(
self.pgdata_content(src_pg.data_dir),
self.pgdata_content(dst_pg.data_dir)
)
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_tablefile_truncation_with_delta(self):
"""
Test that a table truncated in the source is also truncated in the delta catchup'ed instance
"""
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
pg_options = { 'wal_log_hints': 'on' }
)
src_pg.slow_start()
src_pg.safe_psql(
"postgres",
"CREATE SEQUENCE t_seq; "
"CREATE TABLE t_heap AS SELECT i AS id, "
"md5(i::text) AS text, "
"md5(repeat(i::text, 10))::tsvector AS tsvector "
"FROM generate_series(0, 1024) i")
src_pg.safe_psql("postgres", "VACUUM t_heap")
# preparation 2: make a cleanly shut down replica that lags behind the source
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
dst_pg.stop()
# preparation 3: make changes on master (source)
src_pg.safe_psql("postgres", "DELETE FROM t_heap WHERE ctid >= '(11,0)'")
src_pg.safe_psql("postgres", "VACUUM t_heap")
# do delta catchup
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
# Check: compare data directories
self.compare_pgdata(
self.pgdata_content(src_pg.data_dir),
self.pgdata_content(dst_pg.data_dir)
)
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_tablefile_truncation_with_ptrack(self):
"""
Test that a table truncated in the source is also truncated in the ptrack catchup'ed instance
"""
if not self.ptrack:
return unittest.skip('Skipped because ptrack support is disabled')
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
ptrack_enable = True,
initdb_params = ['--data-checksums']
)
src_pg.slow_start()
src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")
src_pg.safe_psql(
"postgres",
"CREATE SEQUENCE t_seq; "
"CREATE TABLE t_heap AS SELECT i AS id, "
"md5(i::text) AS text, "
"md5(repeat(i::text, 10))::tsvector AS tsvector "
"FROM generate_series(0, 1024) i")
src_pg.safe_psql("postgres", "VACUUM t_heap")
# preparation 2: make a cleanly shut down replica that lags behind the source
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
dst_pg.stop()
# preparation 3: make changes on master (source)
src_pg.safe_psql("postgres", "DELETE FROM t_heap WHERE ctid >= '(11,0)'")
src_pg.safe_psql("postgres", "VACUUM t_heap")
# do ptrack catchup
self.catchup_node(
backup_mode = 'PTRACK',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
# Check: compare data directories
self.compare_pgdata(
self.pgdata_content(src_pg.data_dir),
self.pgdata_content(dst_pg.data_dir)
)
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
#########################################
# Test reaction on user errors
#########################################
def test_local_tablespace_without_mapping(self):
"""
Test that we detect the absence of a required --tablespace-mapping option
"""
if self.remote:
return unittest.skip('Skipped because this test tests local catchup error handling')
src_pg = self.make_simple_node(base_dir = os.path.join(module_name, self.fname, 'src'))
src_pg.slow_start()
tblspace_path = self.get_tblspace_path(src_pg, 'tblspace')
self.create_tblspace_in_node(
src_pg, 'tblspace',
tblspc_path = tblspace_path)
src_pg.safe_psql(
"postgres",
"CREATE TABLE ultimate_question TABLESPACE tblspace AS SELECT 42 AS answer")
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
try:
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = [
'-d', 'postgres',
'-p', str(src_pg.port),
'--stream',
]
)
self.assertEqual(1, 0, "Expecting Error because '-T' parameter is not specified.\n Output: {0} \n CMD: {1}".format(
repr(self.output), self.cmd))
except ProbackupException as e:
self.assertIn(
'ERROR: Local catchup executed, but source database contains tablespace',
e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_running_dest_postmaster(self):
"""
Test that we detect a running postmaster in the destination
"""
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
pg_options = { 'wal_log_hints': 'on' }
)
src_pg.slow_start()
# preparation 2: destination
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
# leave running destination postmaster
# so don't call dst_pg.stop()
# try delta catchup
try:
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
self.assertEqual(1, 0, "Expecting Error because postmaster in destination is running.\n Output: {0} \n CMD: {1}".format(
repr(self.output), self.cmd))
except ProbackupException as e:
self.assertIn(
'ERROR: Postmaster with pid ',
e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_same_db_id(self):
"""
Test that we detect mismatched system identifiers of source and destination
"""
# preparation:
# source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True
)
src_pg.slow_start()
# destination
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
dst_pg.stop()
# fake destination
fake_dst_pg = self.make_simple_node(base_dir = os.path.join(module_name, self.fname, 'fake_dst'))
# fake source
fake_src_pg = self.make_simple_node(base_dir = os.path.join(module_name, self.fname, 'fake_src'))
# try delta catchup (src (with correct src conn), fake_dst)
try:
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = src_pg.data_dir,
destination_node = fake_dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
self.assertEqual(1, 0, "Expecting Error because database identifiers mismatch.\n Output: {0} \n CMD: {1}".format(
repr(self.output), self.cmd))
except ProbackupException as e:
self.assertIn(
'ERROR: Database identifiers mismatch: ',
e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
# try delta catchup (fake_src (with wrong src conn), dst)
try:
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = fake_src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
self.assertEqual(1, 0, "Expecting Error because database identifiers mismatch.\n Output: {0} \n CMD: {1}".format(
repr(self.output), self.cmd))
except ProbackupException as e:
self.assertIn(
'ERROR: Database identifiers mismatch: ',
e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_destination_dbstate(self):
"""
Test that we detect that the destination instance was not shut down cleanly
"""
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
pg_options = { 'wal_log_hints': 'on' }
)
src_pg.slow_start()
# preparation 2: destination
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
# try #1
try:
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
self.assertEqual(1, 0, "Expecting Error because destination pg is not cleanly shutdowned.\n Output: {0} \n CMD: {1}".format(
repr(self.output), self.cmd))
except ProbackupException as e:
self.assertIn(
'ERROR: Destination directory contains "backup_label" file',
e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
# try #2
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
self.assertNotEqual(dst_pg.pid, 0, "Cannot detect pid of running postgres")
os.kill(dst_pg.pid, signal.SIGKILL)
try:
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
self.assertEqual(1, 0, "Expecting Error because destination pg is not cleanly shutdowned.\n Output: {0} \n CMD: {1}".format(
repr(self.output), self.cmd))
except ProbackupException as e:
self.assertIn(
'must be stopped cleanly',
e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_tli_destination_mismatch(self):
"""
Test that we detect a timeline (TLI) mismatch in the destination
"""
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
pg_options = { 'wal_log_hints': 'on' }
)
src_pg.slow_start()
# preparation 2: destination
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
self.set_replica(src_pg, dst_pg)
dst_pg.slow_start(replica = True)
dst_pg.promote()
dst_pg.stop()
# preparation 3: "useful" changes
src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
# try catchup
try:
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
dst_pg.stop()
self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
except ProbackupException as e:
self.assertIn(
'ERROR: Source is behind destination in timeline history',
e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_tli_source_mismatch(self):
"""
Test that we detect a timeline (TLI) mismatch in the source history
"""
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
pg_options = { 'wal_log_hints': 'on' }
)
src_pg.slow_start()
# preparation 2: fake source (promoted copy)
fake_src_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'fake_src'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = fake_src_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
fake_src_options = {}
fake_src_options['port'] = str(fake_src_pg.port)
self.set_auto_conf(fake_src_pg, fake_src_options)
self.set_replica(src_pg, fake_src_pg)
fake_src_pg.slow_start(replica = True)
fake_src_pg.promote()
self.switch_wal_segment(fake_src_pg)
fake_src_pg.safe_psql(
"postgres",
"CREATE TABLE t_heap AS SELECT i AS id, "
"md5(i::text) AS text, "
"md5(repeat(i::text, 10))::tsvector AS tsvector "
"FROM generate_series(0, 256) i")
self.switch_wal_segment(fake_src_pg)
fake_src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 'trash' AS garbage")
# preparation 3: destination
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
dst_pg.stop()
# preparation 4: "useful" changes
src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
# try catchup
try:
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = fake_src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(fake_src_pg.port), '--stream']
)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start()
dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
dst_pg.stop()
self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
except ProbackupException as e:
self.assertIn(
'ERROR: Destination is not in source timeline history',
e.message,
'\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
# Cleanup
src_pg.stop()
fake_src_pg.stop()
self.del_test_dir(module_name, self.fname)

View File

@ -345,14 +345,9 @@ class ProbackupTest(object):
# print('PGPROBACKUP_SSH_USER is not set')
# exit(1)
def make_simple_node(
def make_empty_node(
self,
base_dir=None,
set_replication=False,
ptrack_enable=False,
initdb_params=[],
pg_options={}):
base_dir=None):
real_base_dir = os.path.join(self.tmp_path, base_dir)
shutil.rmtree(real_base_dir, ignore_errors=True)
os.makedirs(real_base_dir)
@ -361,6 +356,17 @@ class ProbackupTest(object):
# bound method slow_start() to 'node' class instance
node.slow_start = slow_start.__get__(node)
node.should_rm_dirs = True
return node
def make_simple_node(
self,
base_dir=None,
set_replication=False,
ptrack_enable=False,
initdb_params=[],
pg_options={}):
node = self.make_empty_node(base_dir)
node.init(
initdb_params=initdb_params, allow_streaming=set_replication)
@ -1036,6 +1042,28 @@ class ProbackupTest(object):
return self.run_pb(cmd_list + options, gdb=gdb, old_binary=old_binary)
def catchup_node(
self,
backup_mode, source_pgdata, destination_node,
options = []
):
cmd_list = [
'catchup',
'--backup-mode={0}'.format(backup_mode),
'--source-pgdata={0}'.format(source_pgdata),
'--destination-pgdata={0}'.format(destination_node.data_dir)
]
if self.remote:
cmd_list += ['--remote-proto=ssh', '--remote-host=localhost']
if self.verbose:
cmd_list += [
'--log-level-file=VERBOSE',
'--log-directory={0}'.format(destination_node.logs_dir)
]
return self.run_pb(cmd_list + options)
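    # For reference, a typical command constructed by catchup_node() looks
    # like this (paths and port are hypothetical):
    #   pg_probackup catchup --backup-mode=DELTA \
    #       --source-pgdata=/tmp/tests/catchup/src/data \
    #       --destination-pgdata=/tmp/tests/catchup/dst/data \
    #       --remote-proto=ssh --remote-host=localhost \
    #       -d postgres -p 15432 --stream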
def show_pb(
self, backup_dir, instance=None, backup_id=None,
options=[], as_text=False, as_json=True, old_binary=False,
@ -1736,10 +1764,10 @@ class ProbackupTest(object):
):
fail = True
error_message += '\nFile permissions mismatch:\n'
error_message += ' File_old: {0} Permissions: {1}\n'.format(
error_message += ' File_old: {0} Permissions: {1:o}\n'.format(
os.path.join(original_pgdata['pgdata'], file),
original_pgdata['files'][file]['mode'])
error_message += ' File_new: {0} Permissions: {1}\n'.format(
error_message += ' File_new: {0} Permissions: {1:o}\n'.format(
os.path.join(restored_pgdata['pgdata'], file),
restored_pgdata['files'][file]['mode'])