1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-11-24 08:52:38 +02:00

[pbckp-128] dry-run option for catchup (#477)

* Added dry-run option for catchup. Runs catchup without affecting the files or WAL
This commit is contained in:
dlepikhova 2022-06-01 12:49:09 +05:00 committed by GitHub
parent 7be2e738a9
commit 884e8b09f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 219 additions and 31 deletions

View File

@ -2,7 +2,7 @@
*
* catchup.c: sync DB cluster
*
* Copyright (c) 2021, Postgres Professional
* Copyright (c) 2022, Postgres Professional
*
*-------------------------------------------------------------------------
*/
@ -507,16 +507,20 @@ catchup_multithreaded_copy(int num_threads,
/* Run threads */
thread_interrupted = false;
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
for (i = 0; i < num_threads; i++)
if (!dry_run)
{
elog(VERBOSE, "Start thread num: %i", i);
pthread_create(&threads[i], NULL, &catchup_thread_runner, &(threads_args[i]));
for (i = 0; i < num_threads; i++)
{
elog(VERBOSE, "Start thread num: %i", i);
pthread_create(&threads[i], NULL, &catchup_thread_runner, &(threads_args[i]));
}
}
/* Wait threads */
for (i = 0; i < num_threads; i++)
{
pthread_join(threads[i], NULL);
if (!dry_run)
pthread_join(threads[i], NULL);
all_threads_successful &= threads_args[i].completed;
transfered_bytes_result += threads_args[i].transfered_bytes;
}
@ -706,9 +710,14 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
/* Start stream replication */
join_path_components(dest_xlog_path, dest_pgdata, PG_XLOG_DIR);
fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST);
start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt,
current.start_lsn, current.tli, false);
if (!dry_run)
{
fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST);
start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt,
current.start_lsn, current.tli, false);
}
else
elog(INFO, "WAL streaming skipping with --dry-run option");
source_filelist = parray_new();
@ -779,9 +788,9 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
/* Build the page map from ptrack information */
make_pagemap_from_ptrack_2(source_filelist, source_conn,
source_node_info.ptrack_schema,
source_node_info.ptrack_version_num,
dest_redo.lsn);
source_node_info.ptrack_schema,
source_node_info.ptrack_version_num,
dest_redo.lsn);
time(&end_time);
elog(INFO, "Pagemap successfully extracted, time elapsed: %.0f sec",
difftime(end_time, start_time));
@ -820,9 +829,9 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
char dirpath[MAXPGPATH];
join_path_components(dirpath, dest_pgdata, file->rel_path);
elog(VERBOSE, "Create directory '%s'", dirpath);
fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST);
if (!dry_run)
fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST);
}
else
{
@ -853,15 +862,18 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
elog(VERBOSE, "Create directory \"%s\" and symbolic link \"%s\"",
linked_path, to_path);
/* create tablespace directory */
if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0)
elog(ERROR, "Could not create tablespace directory \"%s\": %s",
linked_path, strerror(errno));
if (!dry_run)
{
/* create tablespace directory */
if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0)
elog(ERROR, "Could not create tablespace directory \"%s\": %s",
linked_path, strerror(errno));
/* create link to linked_path */
if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0)
elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s",
linked_path, to_path, strerror(errno));
/* create link to linked_path */
if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0)
elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s",
linked_path, to_path, strerror(errno));
}
}
}
@ -930,7 +942,10 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
char fullpath[MAXPGPATH];
join_path_components(fullpath, dest_pgdata, file->rel_path);
fio_delete(file->mode, fullpath, FIO_LOCAL_HOST);
if (!dry_run)
{
fio_delete(file->mode, fullpath, FIO_LOCAL_HOST);
}
elog(VERBOSE, "Deleted file \"%s\"", fullpath);
/* shrink dest pgdata list */
@ -961,7 +976,7 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
catchup_isok = transfered_datafiles_bytes != -1;
/* at last copy control file */
if (catchup_isok)
if (catchup_isok && !dry_run)
{
char from_fullpath[MAXPGPATH];
char to_fullpath[MAXPGPATH];
@ -972,7 +987,7 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
transfered_datafiles_bytes += source_pg_control_file->size;
}
if (!catchup_isok)
if (!catchup_isok && !dry_run)
{
char pretty_time[20];
char pretty_transfered_data_bytes[20];
@ -1010,14 +1025,18 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
pg_free(stop_backup_query_text);
}
wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, &current);
if (!dry_run)
wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, &current);
#if PG_VERSION_NUM >= 90600
/* Write backup_label */
Assert(stop_backup_result.backup_label_content != NULL);
pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label",
stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
NULL);
if (!dry_run)
{
pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label",
stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
NULL);
}
free(stop_backup_result.backup_label_content);
stop_backup_result.backup_label_content = NULL;
stop_backup_result.backup_label_content_len = 0;
@ -1040,6 +1059,7 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
#endif
/* wait for end of wal streaming and calculate wal size transfered */
if (!dry_run)
{
parray *wal_files_list = NULL;
wal_files_list = parray_new();
@ -1091,17 +1111,17 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
}
/* Sync all copied files unless '--no-sync' flag is used */
if (sync_dest_files)
if (sync_dest_files && !dry_run)
catchup_sync_destination_files(dest_pgdata, FIO_LOCAL_HOST, source_filelist, source_pg_control_file);
else
elog(WARNING, "Files are not synced to disk");
/* Cleanup */
if (dest_filelist)
if (dest_filelist && !dry_run)
{
parray_walk(dest_filelist, pgFileFree);
parray_free(dest_filelist);
}
parray_free(dest_filelist);
parray_walk(source_filelist, pgFileFree);
parray_free(source_filelist);
pgFileFree(source_pg_control_file);

View File

@ -261,6 +261,7 @@ help_pg_probackup(void)
printf(_(" [--remote-proto] [--remote-host]\n"));
printf(_(" [--remote-port] [--remote-path] [--remote-user]\n"));
printf(_(" [--ssh-options]\n"));
printf(_(" [--dry-run]\n"));
printf(_(" [--help]\n"));
if ((PROGRAM_URL || PROGRAM_EMAIL))
@ -1047,6 +1048,7 @@ help_catchup(void)
printf(_(" [--remote-proto] [--remote-host]\n"));
printf(_(" [--remote-port] [--remote-path] [--remote-user]\n"));
printf(_(" [--ssh-options]\n"));
printf(_(" [--dry-run]\n"));
printf(_(" [--help]\n\n"));
printf(_(" -b, --backup-mode=catchup-mode catchup mode=FULL|DELTA|PTRACK\n"));
@ -1081,4 +1083,6 @@ help_catchup(void)
printf(_(" --remote-user=username user name for ssh connection (default: current user)\n"));
printf(_(" --ssh-options=ssh_options additional ssh options (default: none)\n"));
printf(_(" (example: --ssh-options='-c cipher_spec -F configfile')\n\n"));
printf(_(" --dry-run perform a trial run without any changes\n\n"));
}

View File

@ -1455,3 +1455,157 @@ class CatchupTest(ProbackupTest, unittest.TestCase):
dst_pg.stop()
#self.assertEqual(1, 0, 'Stop test')
self.del_test_dir(module_name, self.fname)
#########################################
# --dry-run
#########################################
def test_dry_run_catchup_full(self):
"""
Verify that a FULL catchup run with --dry-run leaves the
destination data directory completely unchanged.
"""
# preparation 1: create and start the source node (replication enabled)
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True
)
src_pg.slow_start()
# preparation 2: create an empty destination node and generate some
# load on the source so there is real data a non-dry run would copy
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
src_pg.pgbench_init(scale = 10)
pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
pgbench.wait()
# snapshot the destination directory state before the dry run
content_before = self.pgdata_content(dst_pg.data_dir)
# do full catchup with --dry-run: must not modify the destination
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream', '--dry-run']
)
# compare destination data dir before and after catchup — must be identical
self.compare_pgdata(
content_before,
self.pgdata_content(dst_pg.data_dir)
)
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_dry_run_catchup_ptrack(self):
"""
Verify that an incremental PTRACK catchup run with --dry-run
leaves the destination data directory unchanged.
"""
# ptrack-based catchup requires ptrack support in the build under test
if not self.ptrack:
return unittest.skip('Skipped because ptrack support is disabled')
# preparation 1: create and start the source node with ptrack enabled
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
ptrack_enable = True,
initdb_params = ['--data-checksums']
)
src_pg.slow_start()
src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")
src_pg.pgbench_init(scale = 10)
pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
pgbench.wait()
# preparation 2: make a cleanly shut-down replica that lags behind the
# source (full catchup, start as replica, then stop it)
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
self.set_replica(src_pg, dst_pg)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start(replica = True)
dst_pg.stop()
# snapshot the destination directory state before the dry run
content_before = self.pgdata_content(dst_pg.data_dir)
# do incremental catchup with --dry-run: must not modify the destination
self.catchup_node(
backup_mode = 'PTRACK',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream', '--dry-run']
)
# compare destination data dir before and after catchup — must be identical
self.compare_pgdata(
content_before,
self.pgdata_content(dst_pg.data_dir)
)
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
def test_dry_run_catchup_delta(self):
"""
Verify that an incremental DELTA catchup run with --dry-run
leaves the destination data directory unchanged.
"""
# preparation 1: create and start the source node; wal_log_hints is
# required for DELTA mode page comparison
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
initdb_params = ['--data-checksums'],
pg_options = { 'wal_log_hints': 'on' }
)
src_pg.slow_start()
src_pg.pgbench_init(scale = 10)
pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
pgbench.wait()
# preparation 2: make a cleanly shut-down replica that lags behind the
# source (full catchup, start as replica, then stop it)
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
self.set_replica(src_pg, dst_pg)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start(replica = True)
dst_pg.stop()
# snapshot the destination directory state before the dry run
content_before = self.pgdata_content(dst_pg.data_dir)
# do delta catchup with --dry-run: must not modify the destination
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream', "--dry-run"]
)
# compare destination data dir before and after catchup — must be identical
self.compare_pgdata(
content_before,
self.pgdata_content(dst_pg.data_dir)
)
# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)

View File

@ -178,6 +178,7 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database.
[--remote-proto] [--remote-host]
[--remote-port] [--remote-path] [--remote-user]
[--ssh-options]
[--dry-run]
[--help]
Read the website for details <https://github.com/postgrespro/pg_probackup>.

View File

@ -100,11 +100,20 @@ source pyenv/bin/activate
pip3 install testgres
# Print the effective test-environment configuration for the CI log
echo "############### Testing:"
echo PG_PROBACKUP_PARANOIA=${PG_PROBACKUP_PARANOIA}
echo ARCHIVE_COMPRESSION=${ARCHIVE_COMPRESSION}
echo PGPROBACKUPBIN_OLD=${PGPROBACKUPBIN_OLD}
echo PGPROBACKUPBIN=${PGPROBACKUPBIN}
echo PGPROBACKUP_SSH_REMOTE=${PGPROBACKUP_SSH_REMOTE}
echo PGPROBACKUP_GDB=${PGPROBACKUP_GDB}
echo PG_PROBACKUP_PTRACK=${PG_PROBACKUP_PTRACK}
# "basic" mode runs the reduced test set (plus tests.init); any other
# MODE value runs the single named test module tests.$MODE
if [ "$MODE" = "basic" ]; then
export PG_PROBACKUP_TEST_BASIC=ON
echo PG_PROBACKUP_TEST_BASIC=${PG_PROBACKUP_TEST_BASIC}
python3 -m unittest -v tests
python3 -m unittest -v tests.init
else
echo PG_PROBACKUP_TEST_BASIC=${PG_PROBACKUP_TEST_BASIC}
python3 -m unittest -v tests.$MODE
fi