1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-12-04 10:44:46 +02:00

Merge branch 'pgpro-2065'

This commit is contained in:
Grigory Smolkin 2019-04-19 11:18:23 +03:00
commit 05c1c4e0aa
12 changed files with 1493 additions and 165 deletions

View File

@ -49,6 +49,8 @@ const char *progname = "pg_probackup";
/* list of files contained in backup */
static parray *backup_files_list = NULL;
/* list of indexes for use in checkdb --amcheck */
static parray *index_list = NULL;
/* We need critical section for datapagemap_add() in case of using threads */
static pthread_mutex_t backup_pagemap_mutex = PTHREAD_MUTEX_INITIALIZER;
@ -73,8 +75,8 @@ static StreamThreadArg stream_thread_arg = {"", NULL, 1};
static int is_ptrack_enable = false;
bool is_ptrack_support = false;
bool is_checksum_enabled = false;
bool exclusive_backup = false;
bool heapallindexed_is_supported = false;
/* Backup connections */
static PGconn *backup_conn = NULL;
@ -96,11 +98,22 @@ static bool pg_stop_backup_is_sent = false;
static void backup_cleanup(bool fatal, void *userdata);
static void backup_disconnect(bool fatal, void *userdata);
static void pgdata_basic_setup(bool amcheck_only);
static void *backup_files(void *arg);
static void *remote_backup_files(void *arg);
static void do_backup_instance(void);
static void do_block_validation(void);
static void do_amcheck(void);
static void *check_files(void *arg);
static void *check_indexes(void *arg);
static parray* get_index_list(PGresult* res_db, int db_number,
bool first_db_with_amcheck, PGconn* db_conn);
static bool amcheck_one_index(backup_files_arg *arguments,
pg_indexEntry *ind);
static void pg_start_backup(const char *label, bool smooth, pgBackup *backup);
static void pg_switch_wal(PGconn *conn);
static void pg_stop_backup(pgBackup *backup);
@ -929,19 +942,268 @@ do_backup_instance(void)
backup_files_list = NULL;
}
/*
* Entry point of pg_probackup BACKUP subcommand.
*/
int
do_backup(time_t start_time, bool no_validate)
/*
 * Collect the list of files in the instance and run num_threads worker
 * threads (check_files) over it.
 *
 * Raises elog(ERROR, "Checkdb failed") if any worker finishes with a
 * non-zero 'ret'; otherwise reports that data files are valid.
 */
static void
do_block_validation(void)
{
/*
 * NOTE(review): the comment and the dangling 'if' below look like residue
 * of the removed do_backup() prologue in this diff view, not part of
 * this function -- confirm against the real file.
 */
/* PGDATA and BACKUP_MODE are always required */
if (instance_config.pgdata == NULL)
int i;
char database_path[MAXPGPATH];
/* arrays with meta info for multi threaded backup */
pthread_t *threads;
backup_files_arg *threads_args;
/* cleared as soon as one worker reports a non-zero result */
bool check_isok = true;
pgBackupGetPath(&current, database_path, lengthof(database_path),
DATABASE_DIR);
/* initialize file list */
backup_files_list = parray_new();
/* list files with the logical path. omit $PGDATA */
dir_list_file(backup_files_list, instance_config.pgdata,
true, true, false, 0, FIO_DB_HOST);
/*
* Sort pathname ascending.
*
* For example:
* 1 - create 'base'
* 2 - create 'base/1'
*/
parray_qsort(backup_files_list, pgFileComparePath);
/* Extract information about files in backup_list parsing their names:*/
parse_backup_filelist_filenames(backup_files_list, instance_config.pgdata);
/* setup threads: clear the per-file flag that workers use to claim a file */
for (i = 0; i < parray_num(backup_files_list); i++)
{
pgFile *file = (pgFile *) parray_get(backup_files_list, i);
pg_atomic_clear_flag(&file->lock);
}
/* Sort by size for load balancing */
parray_qsort(backup_files_list, pgFileCompareSize);
/*
 * init thread args; every worker gets the same shared file list.
 * NOTE(review): neither array is freed in this function.
 */
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads);
for (i = 0; i < num_threads; i++)
{
backup_files_arg *arg = &(threads_args[i]);
arg->from_root = instance_config.pgdata;
arg->to_root = database_path;
arg->files_list = backup_files_list;
arg->index_list = NULL;
arg->prev_filelist = NULL;
arg->prev_start_lsn = InvalidXLogRecPtr;
arg->backup_conn = NULL;
arg->cancel_conn = NULL;
/*
 * Pessimistic default: a worker that never reaches its normal exit
 * keeps ret == 1 and is counted as failed below.
 * NOTE(review): arg->thread_num is not set here, unlike in do_amcheck().
 */
arg->ret = 1;
}
/* TODO write better info message */
elog(INFO, "Start checking data files");
/* Run threads (the pthread_create() result is not checked) */
for (i = 0; i < num_threads; i++)
{
backup_files_arg *arg = &(threads_args[i]);
elog(VERBOSE, "Start thread num: %i", i);
pthread_create(&threads[i], NULL, check_files, arg);
}
/* Wait threads; any ret > 0 marks the whole check as failed */
for (i = 0; i < num_threads; i++)
{
pthread_join(threads[i], NULL);
if (threads_args[i].ret > 0)
check_isok = false;
}
/* cleanup: release every pgFile and the list itself */
if (backup_files_list)
{
parray_walk(backup_files_list, pgFileFree);
parray_free(backup_files_list);
backup_files_list = NULL;
}
/* TODO write better info message */
if (check_isok)
elog(INFO, "Data files are valid");
else
elog(ERROR, "Checkdb failed");
}
/*
* Entry point of checkdb --amcheck.
*
* Connect to all databases in the cluster
* and get list of persistent indexes,
* then run parallel threads to perform bt_index_check()
* for all indexes from the list.
*
* If amcheck extension is not installed in the database,
* skip this database and report it via warning message.
*/
static void
do_amcheck(void)
{
int i;
char database_path[MAXPGPATH];
/* arrays with meta info for multi threaded backup */
pthread_t *threads;
backup_files_arg *threads_args;
bool check_isok = true;
PGresult *res_db;
int n_databases = 0;
bool first_db_with_amcheck = true;
PGconn *db_conn = NULL;
bool db_skipped = false;
pgBackupGetPath(&current, database_path, lengthof(database_path),
DATABASE_DIR);
res_db = pgut_execute(backup_conn,
"SELECT datname, oid, dattablespace "
"FROM pg_database "
"WHERE datname NOT IN ('template0', 'template1')",
0, NULL);
n_databases = PQntuples(res_db);
elog(INFO, "Start amchecking PostgreSQL instance");
/* For each database check indexes. In parallel. */
for(i = 0; i < n_databases; i++)
{
int j;
if (index_list != NULL)
{
free(index_list);
index_list = NULL;
}
index_list = get_index_list(res_db, i,
first_db_with_amcheck, db_conn);
if (index_list == NULL)
{
if (db_conn)
pgut_disconnect(db_conn);
db_skipped = true;
continue;
}
first_db_with_amcheck = false;
/* init thread args with own file lists */
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads);
for (j = 0; j < num_threads; j++)
{
backup_files_arg *arg = &(threads_args[j]);
arg->from_root = instance_config.pgdata;
arg->to_root = database_path;
arg->files_list = NULL;
arg->index_list = index_list;
arg->prev_filelist = NULL;
arg->prev_start_lsn = InvalidXLogRecPtr;
arg->backup_conn = NULL;
arg->cancel_conn = NULL;
arg->thread_num = j + 1;
/* By default there are some error */
arg->ret = 1;
}
/* Run threads */
for (j = 0; j < num_threads; j++)
{
backup_files_arg *arg = &(threads_args[j]);
elog(VERBOSE, "Start thread num: %i", j);
pthread_create(&threads[j], NULL, check_indexes, arg);
}
/* Wait threads */
for (j = 0; j < num_threads; j++)
{
pthread_join(threads[j], NULL);
if (threads_args[j].ret > 0)
check_isok = false;
}
/* cleanup */
pgut_disconnect(db_conn);
if (interrupted)
break;
}
/* Inform user about amcheck results */
if (!check_isok || interrupted)
elog(ERROR, "Checkdb --amcheck failed");
if (db_skipped)
elog(ERROR, "Some databases were not amchecked");
elog(INFO, "Checkdb --amcheck executed successfully");
/* We cannot state that all indexes are ok
* without checking indexes in all databases
*/
if (check_isok && !interrupted && !db_skipped)
elog(INFO, "Indexes are valid");
/* cleanup */
PQclear(res_db);
}
/*
 * Entry point of pg_probackup CHECKDB subcommand.
 *
 * need_amcheck: true when '--amcheck' was requested.
 *
 * Block validation runs unless '--skip-block-validation' was given;
 * skipping it is only accepted together with '--amcheck'.
 */
void
do_checkdb(bool need_amcheck)
{
	/* amcheck-only mode: block validation skipped, index check requested */
	bool		amcheck_only = skip_block_validation && need_amcheck;

	/* Skipping block validation without amcheck would leave nothing to do */
	if (skip_block_validation && !need_amcheck)
		elog(ERROR, "Option '--skip-block-validation' must be used with '--amcheck' option");

	pgdata_basic_setup(amcheck_only);

	if (!skip_block_validation)
		do_block_validation();

	if (need_amcheck)
		do_amcheck();
}
/*
* Common code for CHECKDB and BACKUP commands.
* Ensure that we're able to connect to the instance
* check compatibility and fill basic info.
* For checkdb launched in amcheck mode with pgdata validation
* do not check system ID, it gives user an opportunity to
* check remote PostgreSQL instance.
* Also checking system ID in this case serves no purpose, because
* all work is done by server.
*/
static void
pgdata_basic_setup(bool amcheck_only)
{
/* PGDATA is always required unless running checkdb in amcheck only mode */
if (!instance_config.pgdata && !amcheck_only)
elog(ERROR, "required parameter not specified: PGDATA "
"(-D, --pgdata)");
if (current.backup_mode == BACKUP_MODE_INVALID)
elog(ERROR, "required parameter not specified: BACKUP_MODE "
"(-b, --backup-mode)");
/* Create connection for PostgreSQL */
backup_conn = pgut_connect(instance_config.pghost, instance_config.pgport,
@ -951,14 +1213,6 @@ do_backup(time_t start_time, bool no_validate)
current.primary_conninfo = pgut_get_conninfo_string(backup_conn);
#if PG_VERSION_NUM >= 110000
if (!RetrieveWalSegSize(backup_conn))
elog(ERROR, "Failed to retreive wal_segment_size");
#endif
current.compress_alg = instance_config.compress_alg;
current.compress_level = instance_config.compress_level;
/* Confirm data block size and xlog block size are compatible */
confirm_block_size("block_size", BLCKSZ);
confirm_block_size("wal_block_size", XLOG_BLCKSZ);
@ -968,13 +1222,21 @@ do_backup(time_t start_time, bool no_validate)
/* Confirm that this server version is supported */
check_server_version();
/* TODO fix it for remote backup*/
if (!IsReplicationProtocol())
current.checksum_version = get_data_checksum_version(true);
if (pg_checksum_enable())
current.checksum_version = 1;
else
current.checksum_version = 0;
is_checksum_enabled = pg_checksum_enable();
/*
* Ensure that backup directory was initialized for the same PostgreSQL
* instance we opened connection to. And that target backup database PGDATA
* belongs to the same instance.
*/
/* TODO fix it for remote backup */
if (!IsReplicationProtocol() && !amcheck_only)
check_system_identifiers();
if (is_checksum_enabled)
if (current.checksum_version)
elog(LOG, "This PostgreSQL instance was initialized with data block checksums. "
"Data block corruption will be detected");
else
@ -984,6 +1246,29 @@ do_backup(time_t start_time, bool no_validate)
StrNCpy(current.server_version, server_version_str,
sizeof(current.server_version));
}
/*
* Entry point of pg_probackup BACKUP subcommand.
*/
int
do_backup(time_t start_time, bool no_validate)
{
/*
* setup backup_conn, do some compatibility checks and
* fill basic info about instance
*/
pgdata_basic_setup(false);
/* below perform checks specific for backup command */
#if PG_VERSION_NUM >= 110000
if (!RetrieveWalSegSize(backup_conn))
elog(ERROR, "Failed to retreive wal_segment_size");
#endif
current.compress_alg = instance_config.compress_alg;
current.compress_level = instance_config.compress_level;
current.stream = stream_wal;
is_ptrack_support = pg_ptrack_support();
@ -1016,15 +1301,6 @@ do_backup(time_t start_time, bool no_validate)
instance_config.master_user);
}
/*
* Ensure that backup directory was initialized for the same PostgreSQL
* instance we opened connection to. And that target backup database PGDATA
* belogns to the same instance.
*/
/* TODO fix it for remote backup */
if (!IsReplicationProtocol())
check_system_identifiers();
/* Start backup. Update backup status. */
current.status = BACKUP_STATUS_RUNNING;
current.start_time = start_time;
@ -1171,6 +1447,18 @@ check_system_identifiers(void)
system_id_pgdata = get_system_identifier(instance_config.pgdata);
system_id_conn = get_remote_system_identifier(backup_conn);
/* for checkdb check only system_id_pgdata and system_id_conn */
if (current.backup_mode == BACKUP_MODE_INVALID)
{
if (system_id_conn != system_id_pgdata)
{
elog(ERROR, "Data directory initialized with system id " UINT64_FORMAT ", "
"but connected instance system id is " UINT64_FORMAT,
system_id_pgdata, system_id_conn);
}
return;
}
if (system_id_conn != instance_config.system_identifier)
elog(ERROR, "Backup data directory was initialized for system id " UINT64_FORMAT ", "
"but connected instance system id is " UINT64_FORMAT,
@ -2272,6 +2560,154 @@ backup_disconnect(bool fatal, void *userdata)
pgut_disconnect(master_conn);
}
/*
 * Check files in PGDATA (thread entry point).
 *
 * Walk arguments->files_list; each entry is claimed through its atomic
 * 'lock' flag so that only one thread processes it.  Regular files that
 * are data files and not CFS-compressed are validated with
 * check_data_file().  Nothing is copied.
 *
 * arg: pointer to this thread's backup_files_arg.
 *
 * The outcome is stored in arguments->ret:
 *   0 everything is ok
 *   1 thread errored during execution, e.g. interruption (default value)
 *   2 corruption is definitely(!) found
 *
 * Always returns NULL.
 */
static void *
check_files(void *arg)
{
	int			i;
	backup_files_arg *arguments = (backup_files_arg *) arg;
	int			n_backup_files_list = 0;

	if (arguments->files_list)
		n_backup_files_list = parray_num(arguments->files_list);

	/* check a file */
	for (i = 0; i < n_backup_files_list; i++)
	{
		struct stat buf;
		pgFile	   *file = (pgFile *) parray_get(arguments->files_list, i);

		/* Somebody else already claimed this file */
		if (!pg_atomic_test_set_flag(&file->lock))
			continue;

		elog(VERBOSE, "Checking file: \"%s\" ", file->path);

		/* check for interrupt */
		if (interrupted || thread_interrupted)
			elog(ERROR, "interrupted during checkdb");

		if (progress)
			elog(INFO, "Progress: (%d/%d). Process file \"%s\"",
				 i + 1, n_backup_files_list, file->path);

		/* stat file to check its current state */
		if (stat(file->path, &buf) == -1)
		{
			if (errno == ENOENT)
			{
				/*
				 * If file is not found, this is not an error.
				 * It could have been deleted by concurrent postgres transaction.
				 */
				elog(LOG, "File \"%s\" is not found", file->path);
				continue;
			}

			elog(ERROR,
				 "can't stat file to check \"%s\": %s",
				 file->path, strerror(errno));
		}

		/* No need to check directories */
		if (S_ISDIR(buf.st_mode))
			continue;

		if (!S_ISREG(buf.st_mode))
		{
			/* st_mode is a mode_t; cast so the argument matches %d */
			elog(WARNING, "unexpected file type %d", (int) buf.st_mode);
			continue;
		}

		/* check only uncompressed by cfs datafiles */
		if (file->is_datafile && !file->is_cfs &&
			!check_data_file(arguments, file))
			arguments->ret = 2; /* corruption found */
	}

	/* Reaching this point with the default value means no error occurred */
	if (arguments->ret == 1)
		arguments->ret = 0;

	return NULL;
}
/*
 * Check indexes with amcheck (thread entry point).
 *
 * Walk arguments->index_list; each entry is claimed through its atomic
 * 'lock' flag so that only one thread checks it.  The thread opens its
 * own connection (to the database named in the first entry it claims)
 * and runs amcheck_one_index() for every claimed entry.
 *
 * arg: pointer to this thread's backup_files_arg.
 *
 * The outcome is stored in arguments->ret:
 *   0 everything is ok
 *   1 thread errored during execution, e.g. interruption (default value)
 *   2 corruption is definitely(!) found
 *
 * Always returns NULL.
 */
static void *
check_indexes(void *arg)
{
	int			i;
	backup_files_arg *arguments = (backup_files_arg *) arg;
	int			n_indexes = 0;

	if (arguments->index_list)
		n_indexes = parray_num(arguments->index_list);

	for (i = 0; i < n_indexes; i++)
	{
		pg_indexEntry *ind = (pg_indexEntry *) parray_get(arguments->index_list, i);

		/* Somebody else already claimed this index */
		if (!pg_atomic_test_set_flag(&ind->lock))
			continue;

		/* check for interrupt */
		if (interrupted || thread_interrupted)
			elog(ERROR, "Thread [%d]: interrupted during checkdb --amcheck",
				 arguments->thread_num);

		if (progress)
			elog(INFO, "Thread [%d]. Progress: (%d/%d). Amchecking index '%s.%s'",
				 arguments->thread_num, i + 1, n_indexes,
				 ind->amcheck_nspname, ind->name);

		/* Connect lazily: a thread that claims nothing never connects */
		if (arguments->backup_conn == NULL)
		{
			arguments->backup_conn = pgut_connect(instance_config.pghost,
												  instance_config.pgport,
												  ind->dbname,
												  instance_config.pguser);
			arguments->cancel_conn = PQgetCancel(arguments->backup_conn);
		}

		/* remember that we have a failed check */
		if (!amcheck_one_index(arguments, ind))
			arguments->ret = 2; /* corruption found */
	}

	/*
	 * The handle returned by PQgetCancel() is a separate allocation that
	 * closing the connection does not release; free it explicitly.
	 */
	if (arguments->cancel_conn)
	{
		PQfreeCancel(arguments->cancel_conn);
		arguments->cancel_conn = NULL;
	}

	/* Close connection */
	if (arguments->backup_conn)
	{
		pgut_disconnect(arguments->backup_conn);
		arguments->backup_conn = NULL;
	}

	/* Reaching this point with the default value means no error occurred */
	if (arguments->ret == 1)
		arguments->ret = 0;

	return NULL;
}
/*
* Take a backup of the PGDATA at a file level.
* Copy all directories and files listed in backup_files_list.
@ -2358,6 +2794,7 @@ backup_files(void *arg)
/* File exists in previous backup */
file->exists_in_prev = true;
}
/* copy the file into backup */
if (file->is_datafile && !file->is_cfs)
{
@ -2985,7 +3422,7 @@ pg_ptrack_get_block(backup_files_arg *arguments,
res = pgut_execute_parallel(arguments->backup_conn,
arguments->cancel_conn,
"SELECT pg_catalog.pg_ptrack_get_block_2($1, $2, $3, $4)",
4, (const char **)params, true);
4, (const char **)params, true, false, false);
if (PQnfields(res) != 1)
{
@ -3052,3 +3489,172 @@ check_external_for_tablespaces(parray *external_list)
}
PQclear(res);
}
/*
 * Get index list for given database.
 *
 * res_db:                result set whose column 0 holds database names
 * db_number:             row of res_db to process
 * first_db_with_amcheck: true if no earlier database produced a list;
 *                        only then are indexes in pg_global included
 * db_conn:               received by value and overwritten locally, so the
 *                        caller never sees the connection opened here;
 *                        this function therefore closes it itself
 *
 * Returns the global index_list with one pg_indexEntry appended per
 * persistent btree index, or NULL if neither 'amcheck' nor 'amcheck_next'
 * is installed in the database.  Also sets heapallindexed_is_supported
 * for this database.
 *
 * NOTE(review): 'dbname' is not declared in this function; presumably it
 * is a variable defined elsewhere in the project -- verify.
 */
static parray*
get_index_list(PGresult *res_db, int db_number,
			   bool first_db_with_amcheck, PGconn *db_conn)
{
	PGresult   *ext_res;
	PGresult   *idx_res;
	const char *extname;
	const char *nspname;
	const char *extversion;
	int			i;

	dbname = PQgetvalue(res_db, db_number, 0);

	db_conn = pgut_connect(instance_config.pghost, instance_config.pgport,
						   dbname,
						   instance_config.pguser);

	ext_res = pgut_execute(db_conn, "SELECT "
						   "extname, nspname, extversion "
						   "FROM pg_namespace n "
						   "JOIN pg_extension e "
						   "ON n.oid=e.extnamespace "
						   "WHERE e.extname IN ('amcheck', 'amcheck_next') "
						   "ORDER BY extversion DESC "
						   "LIMIT 1",
						   0, NULL);

	if (PQresultStatus(ext_res) != PGRES_TUPLES_OK)
	{
		PQclear(ext_res);
		elog(ERROR, "Cannot check if amcheck is installed in database %s: %s",
			 dbname, PQerrorMessage(db_conn));
	}

	if (PQntuples(ext_res) < 1)
	{
		elog(WARNING, "Extension 'amcheck' or 'amcheck_next' are not installed in database %s", dbname);
		/* Nothing to check here: release the result and the connection */
		PQclear(ext_res);
		pgut_disconnect(db_conn);
		return NULL;
	}

	/* These point into ext_res, which stays alive until the end */
	extname = PQgetvalue(ext_res, 0, 0);
	nspname = PQgetvalue(ext_res, 0, 1);
	extversion = PQgetvalue(ext_res, 0, 2);

	/*
	 * heapallindexed_is_supported is database specific, so assign it
	 * unconditionally: a stale 'true' from a previous database would make
	 * the two-argument bt_index_check() call fail on an old extension.
	 */
	heapallindexed_is_supported = (strcmp(extversion, "1.0") != 0 &&
								   strcmp(extversion, "1") != 0);

	elog(INFO, "Amchecking database '%s' using extension '%s' version %s from schema '%s'",
		 dbname, extname, extversion, nspname);

	if (!heapallindexed_is_supported && heapallindexed)
		elog(WARNING, "Extension '%s' verion %s in schema '%s' do not support 'heapallindexed' option",
			 extname, extversion, nspname);

	/*
	 * In order to avoid duplicates, select global indexes
	 * (tablespace pg_global with oid 1664) only once.
	 *
	 * select only persistent btree indexes.
	 */
	if (first_db_with_amcheck)
	{
		idx_res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname "
							   "FROM pg_index idx "
							   "JOIN pg_class cls ON idx.indexrelid=cls.oid "
							   "JOIN pg_am am ON cls.relam=am.oid "
							   "WHERE am.amname='btree' AND cls.relpersistence != 't'",
							   0, NULL);
	}
	else
	{
		/*
		 * The tablespace filter has to live in WHERE: as a condition of a
		 * LEFT JOIN it removed no rows, so global indexes were returned
		 * for every database.  reltablespace = 0 is the database default.
		 */
		idx_res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname "
							   "FROM pg_index idx "
							   "JOIN pg_class cls ON idx.indexrelid=cls.oid "
							   "JOIN pg_am am ON cls.relam=am.oid "
							   "WHERE am.amname='btree' AND cls.relpersistence != 't' "
							   "AND (cls.reltablespace = 0 OR cls.reltablespace IN "
							   "(SELECT oid FROM pg_tablespace WHERE spcname <> 'pg_global'))",
							   0, NULL);
	}

	/* add info needed to check indexes into index_list */
	for (i = 0; i < PQntuples(idx_res); i++)
	{
		pg_indexEntry *ind = (pg_indexEntry *) pgut_malloc(sizeof(pg_indexEntry));
		const char *name = PQgetvalue(idx_res, i, 1);

		/* OIDs are unsigned 32-bit values; atoi() cannot represent the upper half */
		ind->indexrelid = (unsigned int) strtoul(PQgetvalue(idx_res, i, 0), NULL, 10);

		/* Each entry owns private copies of its strings */
		ind->name = pgut_malloc(strlen(name) + 1);
		strcpy(ind->name, name);	/* enough buffer size guaranteed */

		ind->dbname = pgut_malloc(strlen(dbname) + 1);
		strcpy(ind->dbname, dbname);

		ind->amcheck_nspname = pgut_malloc(strlen(nspname) + 1);
		strcpy(ind->amcheck_nspname, nspname);

		/* Entry starts unclaimed; worker threads take it via this flag */
		pg_atomic_clear_flag(&ind->lock);

		if (index_list == NULL)
			index_list = parray_new();

		parray_append(index_list, ind);
	}

	PQclear(idx_res);
	PQclear(ext_res);

	/* Worker threads open their own connections; this one is done */
	pgut_disconnect(db_conn);

	return index_list;
}
/*
 * Check one index with bt_index_check() from the amcheck extension.
 *
 * arguments: thread context; supplies the connection, the cancel handle
 *            and the thread number used in log messages
 * ind:       index to check; amcheck_nspname is the schema in which the
 *            extension functions live
 *
 * Return true if everything is ok, false otherwise (a WARNING carrying
 * the server error message is emitted in that case).
 *
 * NOTE(review): the schema name is inserted into the query unquoted, so
 * a schema that needs identifier quoting will break the call.
 */
static bool
amcheck_one_index(backup_files_arg *arguments,
				  pg_indexEntry *ind)
{
	PGresult   *res;
	const char *params[2];
	char		indexrelid_str[32];
	const char *call_args;
	int			n_params;
	size_t		query_len;
	char	   *query;
	bool		index_is_ok;

	if (interrupted)
		elog(ERROR, "Interrupted");

	/*
	 * first argument is index oid; printed as unsigned because OIDs above
	 * INT_MAX would otherwise come out negative
	 */
	snprintf(indexrelid_str, sizeof(indexrelid_str), "%u",
			 (unsigned int) ind->indexrelid);
	params[0] = indexrelid_str;

	/* second argument is heapallindexed */
	params[1] = heapallindexed ? "true" : "false";

	/* Old extension versions only know the one-argument form */
	if (heapallindexed_is_supported)
	{
		call_args = "$1, $2";
		n_params = 2;
	}
	else
	{
		call_args = "$1";
		n_params = 1;
	}

	query_len = strlen("SELECT .bt_index_check()") +
		strlen(ind->amcheck_nspname) + strlen(call_args) + 1;
	query = palloc(query_len);
	snprintf(query, query_len, "SELECT %s.bt_index_check(%s)",
			 ind->amcheck_nspname, call_args);

	res = pgut_execute_parallel(arguments->backup_conn,
								arguments->cancel_conn,
								query, n_params, params, true, true, true);

	index_is_ok = (PQresultStatus(res) == PGRES_TUPLES_OK);

	if (index_is_ok)
		elog(LOG, "Thread [%d]. Amcheck succeeded for index: '%s.%s'",
			 arguments->thread_num, ind->amcheck_nspname, ind->name);
	else
		elog(WARNING, "Thread [%d]. Amcheck failed for index: '%s.%s': %s",
			 arguments->thread_num, ind->amcheck_nspname,
			 ind->name, PQresultErrorMessage(res));

	/* The query text used to be leaked on every call */
	pfree(query);
	PQclear(res);

	return index_is_ok;
}

View File

@ -263,7 +263,7 @@ read_page_from_file(pgFile *file, BlockNumber blknum,
*/
if (pg_checksum_page(page, blkno) != ((PageHeader) page)->pd_checksum)
{
elog(WARNING, "File: %s blknum %u have wrong checksum, try again",
elog(LOG, "File: %s blknum %u have wrong checksum, try again",
file->path, blknum);
return -1;
}
@ -289,6 +289,7 @@ read_page_from_file(pgFile *file, BlockNumber blknum,
* Returns 0 if page was successfully retrieved
* SkipCurrentPage(-3) if we need to skip this page
* PageIsTruncated(-2) if the page was truncated
* PageIsCorrupted(-4) if the page check mismatch
*/
static int32
prepare_page(backup_files_arg *arguments,
@ -296,7 +297,8 @@ prepare_page(backup_files_arg *arguments,
BlockNumber blknum, BlockNumber nblocks,
FILE *in, BlockNumber *n_skipped,
BackupMode backup_mode,
Page page)
Page page,
bool strict)
{
XLogRecPtr page_lsn = 0;
int try_again = 100;
@ -306,7 +308,7 @@ prepare_page(backup_files_arg *arguments,
/* check for interrupt */
if (interrupted || thread_interrupted)
elog(ERROR, "Interrupted during backup");
elog(ERROR, "Interrupted during page reading");
/*
* Read the page and verify its header and checksum.
@ -338,7 +340,7 @@ prepare_page(backup_files_arg *arguments,
*/
//elog(WARNING, "Checksum_Version: %i", current.checksum_version ? 1 : 0);
if (result == -1 && is_ptrack_support)
if (result == -1 && is_ptrack_support && strict)
{
elog(WARNING, "File %s, block %u, try to fetch via SQL",
file->path, blknum);
@ -349,8 +351,27 @@ prepare_page(backup_files_arg *arguments,
* If page is not valid after 100 attempts to read it
* throw an error.
*/
if(!page_is_valid && !is_ptrack_support)
elog(ERROR, "Data file checksum mismatch. Canceling backup");
if (!page_is_valid &&
((strict && !is_ptrack_support) || !strict))
{
/* show this message for checkdb or backup without ptrack support */
elog(WARNING, "CORRUPTION in file %s, block %u",
file->path, blknum);
}
/* Backup with invalid block and without ptrack support must throw error */
if (!page_is_valid && strict && !is_ptrack_support)
elog(ERROR, "Data file corruption. Canceling backup");
/* Checkdb goes no further than this point */
if (!strict)
{
if (page_is_valid)
return 0;
else
return PageIsCorrupted;
}
}
if (backup_mode == BACKUP_MODE_DIFF_PTRACK || (!page_is_valid && is_ptrack_support))
@ -381,7 +402,7 @@ prepare_page(backup_files_arg *arguments,
*/
memcpy(page, ptrack_page, BLCKSZ);
free(ptrack_page);
if (is_checksum_enabled)
if (current.checksum_version)
((PageHeader) page)->pd_checksum = pg_checksum_page(page, absolute_blknum);
}
/* get lsn from page, provided by pg_ptrack_get_block() */
@ -610,9 +631,9 @@ backup_data_file(backup_files_arg* arguments,
{
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
blknum, nblocks, in, &n_blocks_skipped,
backup_mode, curr_page);
backup_mode, curr_page, true);
compress_and_backup_page(file, blknum, in, out, &(file->crc),
page_state, curr_page, calg, clevel);
page_state, curr_page, calg, clevel);
n_blocks_read++;
if (page_state == PageIsTruncated)
break;
@ -634,7 +655,7 @@ backup_data_file(backup_files_arg* arguments,
{
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
blknum, nblocks, in, &n_blocks_skipped,
backup_mode, curr_page);
backup_mode, curr_page, true);
compress_and_backup_page(file, blknum, in, out, &(file->crc),
page_state, curr_page, calg, clevel);
n_blocks_read++;
@ -1456,8 +1477,6 @@ validate_one_page(Page page, pgFile *file,
{
PageHeader phdr;
XLogRecPtr lsn;
bool page_header_is_sane = false;
bool checksum_is_ok = false;
/* new level of paranoia */
if (page == NULL)
@ -1487,72 +1506,146 @@ validate_one_page(Page page, pgFile *file,
file->path, blknum);
}
/* Page is zeroed. No sense to check header and checksum. */
page_header_is_sane = false;
/* Page is zeroed. No sense in checking header and checksum. */
return PAGE_IS_FOUND_AND_VALID;
}
else
/* Verify checksum */
if (checksum_version)
{
if (PageGetPageSize(phdr) == BLCKSZ &&
PageGetPageLayoutVersion(phdr) == PG_PAGE_LAYOUT_VERSION &&
(phdr->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
phdr->pd_lower >= SizeOfPageHeaderData &&
phdr->pd_lower <= phdr->pd_upper &&
phdr->pd_upper <= phdr->pd_special &&
phdr->pd_special <= BLCKSZ &&
phdr->pd_special == MAXALIGN(phdr->pd_special))
page_header_is_sane = true;
/* Checksums are enabled, so check them. */
if (!(pg_checksum_page(page, file->segno * RELSEG_SIZE + blknum)
== ((PageHeader) page)->pd_checksum))
{
elog(WARNING, "File: %s blknum %u have wrong checksum",
file->path, blknum);
return PAGE_IS_FOUND_AND_NOT_VALID;
}
}
if (page_header_is_sane)
/* Check the page for signs of insanity.
* TODO: We should give more information about what exactly is looking "wrong"
*/
if (!(PageGetPageSize(phdr) == BLCKSZ &&
PageGetPageLayoutVersion(phdr) == PG_PAGE_LAYOUT_VERSION &&
(phdr->pd_flags & ~PD_VALID_FLAG_BITS) == 0 &&
phdr->pd_lower >= SizeOfPageHeaderData &&
phdr->pd_lower <= phdr->pd_upper &&
phdr->pd_upper <= phdr->pd_special &&
phdr->pd_special <= BLCKSZ &&
phdr->pd_special == MAXALIGN(phdr->pd_special)))
{
/* Verify checksum */
if (checksum_version)
{
/*
* If checksum is wrong, sleep a bit and then try again
* several times. If it didn't help, throw error
*/
if (pg_checksum_page(page, file->segno * RELSEG_SIZE + blknum)
== ((PageHeader) page)->pd_checksum)
{
checksum_is_ok = true;
}
else
{
elog(WARNING, "File: %s blknum %u have wrong checksum",
file->path, blknum);
}
}
else
{
/* Get lsn from page header. Ensure that page is from our time */
lsn = PageXLogRecPtrGet(phdr->pd_lsn);
/* Page does not looking good */
elog(WARNING, "Page header is looking insane: %s, block %i",
file->path, blknum);
return PAGE_IS_FOUND_AND_NOT_VALID;
}
if (lsn > stop_lsn)
elog(WARNING, "File: %s, block %u, checksum is not enabled. "
"Page is from future: pageLSN %X/%X stopLSN %X/%X",
file->path, blknum, (uint32) (lsn >> 32), (uint32) lsn,
(uint32) (stop_lsn >> 32), (uint32) stop_lsn);
else
return PAGE_IS_FOUND_AND_VALID;
}
/* At this point the page header is sane and, if checksums are enabled, they are OK.
* Check that page is not from future.
*/
if (stop_lsn > 0)
{
/* Get lsn from page header. Ensure that page is from our time. */
lsn = PageXLogRecPtrGet(phdr->pd_lsn);
if (checksum_is_ok)
if (lsn > stop_lsn)
{
/* Get lsn from page header. Ensure that page is from our time */
lsn = PageXLogRecPtrGet(phdr->pd_lsn);
if (lsn > stop_lsn)
elog(WARNING, "File: %s, block %u, checksum is correct. "
"Page is from future: pageLSN %X/%X stopLSN %X/%X",
file->path, blknum, (uint32) (lsn >> 32), (uint32) lsn,
(uint32) (stop_lsn >> 32), (uint32) stop_lsn);
else
return PAGE_IS_FOUND_AND_VALID;
elog(WARNING, "File: %s, block %u, checksum is %s. "
"Page is from future: pageLSN %X/%X stopLSN %X/%X",
file->path, blknum, checksum_version ? "correct" : "not enabled",
(uint32) (lsn >> 32), (uint32) lsn,
(uint32) (stop_lsn >> 32), (uint32) stop_lsn);
return PAGE_IS_FOUND_AND_NOT_VALID;
}
}
return PAGE_IS_FOUND_AND_NOT_VALID;
return PAGE_IS_FOUND_AND_VALID;
}
/*
 * Validate pages of a datafile in PGDATA one by one.
 *
 * arguments: thread context, passed through to prepare_page()
 * file:      file to check; file->path is opened and file->size
 *            determines how many blocks are read
 *
 * returns true if the file is valid
 * also returns true if the file was not found
 * returns false if the file cannot be opened for another reason or if
 * any page is reported corrupted or invalid
 */
bool
check_data_file(backup_files_arg* arguments,
				pgFile *file)
{
	FILE	   *in;
	BlockNumber blknum = 0;
	BlockNumber nblocks = 0;
	BlockNumber n_blocks_skipped = 0;
	int			page_state;
	char		curr_page[BLCKSZ];
	bool		is_valid = true;

	in = fopen(file->path, PG_BINARY_R);
	if (in == NULL)
	{
		/*
		 * If file is not found, this is not an error.
		 * It could have been deleted by concurrent postgres transaction.
		 */
		if (errno == ENOENT)
		{
			elog(LOG, "File \"%s\" is not found", file->path);
			return true;
		}

		elog(WARNING, "cannot open file \"%s\": %s",
			 file->path, strerror(errno));
		return false;
	}

	/*
	 * Only warn about an unaligned size.  The stream must stay open here:
	 * it is still read by the loop below and closed once at the end, so
	 * closing it at this point led to reads from, and a second fclose()
	 * of, an already closed FILE.
	 */
	if (file->size % BLCKSZ != 0)
		elog(WARNING, "File: %s, invalid file size %zu", file->path, file->size);

	/*
	 * Compute expected number of blocks in the file.
	 * NOTE This is a normal situation, if the file size has changed
	 * since the moment we computed it.
	 */
	nblocks = file->size/BLCKSZ;

	for (blknum = 0; blknum < nblocks; blknum++)
	{
		/* strict = false: report corruption instead of raising an error */
		page_state = prepare_page(arguments, file, InvalidXLogRecPtr,
								  blknum, nblocks, in, &n_blocks_skipped,
								  BACKUP_MODE_FULL, curr_page, false);

		if (page_state == PageIsTruncated)
			break;

		if (page_state == PageIsCorrupted)
		{
			/*
			 * Page is corrupted, no need to elog about it,
			 * prepare_page() already done that
			 */
			is_valid = false;
			continue;
		}

		/*
		 * At this point page is found and its checksum is ok, if any
		 * but could be 'insane'
		 * TODO: between prepare_page and validate_one_page we
		 * compute and compare checksum twice, it`s ineffective
		 */
		if (validate_one_page(curr_page, file, blknum,
							  InvalidXLogRecPtr,
							  0) == PAGE_IS_FOUND_AND_NOT_VALID)
		{
			/* Page is corrupted */
			is_valid = false;
		}
	}

	fclose(in);
	return is_valid;
}
/* Valiate pages of datafile in backup one by one */

View File

@ -22,6 +22,7 @@ static void help_add_instance(void);
static void help_del_instance(void);
static void help_archive_push(void);
static void help_archive_get(void);
static void help_checkdb(void);
void
help_command(char *command)
@ -52,6 +53,8 @@ help_command(char *command)
help_archive_push();
else if (strcmp(command, "archive-get") == 0)
help_archive_get();
else if (strcmp(command, "checkdb") == 0)
help_checkdb();
else if (strcmp(command, "--help") == 0
|| strcmp(command, "help") == 0
|| strcmp(command, "-?") == 0
@ -147,6 +150,11 @@ help_pg_probackup(void)
printf(_(" [--recovery-target-name=target-name]\n"));
printf(_(" [--skip-block-validation]\n"));
printf(_("\n %s checkdb [-B backup-path] [--instance=instance_name]\n"), PROGRAM_NAME);
printf(_(" [-D pgdata-path] [--progress] [-j num-threads]\n"));
printf(_(" [--amcheck] [--skip-block-validation]\n"));
printf(_(" [--heapallindexed]\n"));
printf(_("\n %s show -B backup-path\n"), PROGRAM_NAME);
printf(_(" [--instance=instance_name [-i backup-id]]\n"));
printf(_(" [--format=format]\n"));
@ -200,14 +208,14 @@ help_pg_probackup(void)
static void
help_init(void)
{
printf(_("%s init -B backup-path\n\n"), PROGRAM_NAME);
printf(_("\n%s init -B backup-path\n\n"), PROGRAM_NAME);
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
}
static void
help_backup(void)
{
printf(_("%s backup -B backup-path -b backup-mode --instance=instance_name\n"), PROGRAM_NAME);
printf(_("\n%s backup -B backup-path -b backup-mode --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" [-C] [--stream [-S slot-name] [--temp-slot]\n"));
printf(_(" [--backup-pg-log] [-j num-threads]\n"));
printf(_(" [--archive-timeout=archive-timeout] [--progress]\n"));
@ -320,7 +328,7 @@ help_backup(void)
static void
help_restore(void)
{
printf(_("%s restore -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_("\n%s restore -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" [-D pgdata-path] [-i backup-id] [-j num-threads]\n"));
printf(_(" [--recovery-target-time=time|--recovery-target-xid=xid|--recovery-target-lsn=lsn [--recovery-target-inclusive=boolean]]\n"));
printf(_(" [--recovery-target-timeline=timeline]\n"));
@ -404,7 +412,7 @@ help_restore(void)
static void
help_validate(void)
{
printf(_("%s validate -B backup-path [--instance=instance_name]\n"), PROGRAM_NAME);
printf(_("\n%s validate -B backup-path [--instance=instance_name]\n"), PROGRAM_NAME);
printf(_(" [-i backup-id] [--progress] [-j num-threads]\n"));
printf(_(" [--recovery-target-time=time|--recovery-target-xid=xid|--recovery-target-lsn=lsn [--recovery-target-inclusive=boolean]]\n"));
printf(_(" [--recovery-target-timeline=timeline]\n"));
@ -450,10 +458,62 @@ help_validate(void)
printf(_(" available units: 'ms', 's', 'min', 'h', 'd' (default: min)\n"));
}
/*
 * Print usage for the 'checkdb' subcommand: the synopsis, the
 * checkdb-specific options, then the logging and connection options.
 * Every line goes through _() before being handed to printf().
 */
static void
help_checkdb(void)
{
/* Synopsis; -B and --instance are shown as optional for this command */
printf(_("\n%s checkdb [-B backup-path] [--instance=instance_name]\n"), PROGRAM_NAME);
printf(_(" [-D pgdata-path] [-j num-threads] [--progress]\n"));
printf(_(" [--amcheck] [--skip-block-validation]\n"));
printf(_(" [--heapallindexed]\n\n"));
/* Command-specific options */
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
printf(_(" --instance=instance_name name of the instance\n"));
printf(_(" -D, --pgdata=pgdata-path location of the database storage area\n"));
printf(_(" --progress show progress\n"));
printf(_(" -j, --threads=NUM number of parallel threads\n"));
printf(_(" --skip-block-validation skip file-level block checking\n"));
printf(_(" can be used only with '--amcheck' option\n"));
printf(_(" --amcheck in addition to file-level block checking\n"));
printf(_(" check btree indexes via function 'bt_index_check()'\n"));
printf(_(" using 'amcheck' or 'amcheck_next' extensions\n"));
printf(_(" --heapallindexed also check that heap is indexed\n"));
printf(_(" can be used only with '--amcheck' option\n"));
printf(_("\n Logging options:\n"));
printf(_(" --log-level-console=log-level-console\n"));
printf(_(" level for console logging (default: info)\n"));
printf(_(" available options: 'off', 'error', 'warning', 'info', 'log', 'verbose'\n"));
printf(_(" --log-level-file=log-level-file\n"));
printf(_(" level for file logging (default: off)\n"));
printf(_(" available options: 'off', 'error', 'warning', 'info', 'log', 'verbose'\n"));
printf(_(" --log-filename=log-filename\n"));
printf(_(" filename for file logging (default: 'pg_probackup.log')\n"));
/*
 * '%%' is an escaped percent sign, so the strftime pattern is printed
 * literally.
 * NOTE(review): the example text has no closing ')' - confirm whether
 * this should be fixed together with the other help screens.
 */
printf(_(" support strftime format (example: pg_probackup-%%Y-%%m-%%d_%%H%%M%%S.log\n"));
printf(_(" --error-log-filename=error-log-filename\n"));
printf(_(" filename for error logging (default: none)\n"));
printf(_(" --log-directory=log-directory\n"));
printf(_(" directory for file logging (default: BACKUP_PATH/log)\n"));
printf(_(" --log-rotation-size=log-rotation-size\n"));
printf(_(" rotate logfile if its size exceeds this value; 0 disables; (default: 0)\n"));
printf(_(" available units: 'kB', 'MB', 'GB', 'TB' (default: kB)\n"));
printf(_(" --log-rotation-age=log-rotation-age\n"));
printf(_(" rotate logfile if its age exceeds this value; 0 disables; (default: 0)\n"));
printf(_(" available units: 'ms', 's', 'min', 'h', 'd' (default: min)\n"));
printf(_("\n Connection options:\n"));
printf(_(" -U, --username=USERNAME user name to connect as (default: current local user)\n"));
printf(_(" -d, --dbname=DBNAME database to connect (default: username)\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory(default: 'local socket')\n"));
printf(_(" -p, --port=PORT database server port (default: 5432)\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt\n"));
}
static void
help_show(void)
{
printf(_("%s show -B backup-path\n"), PROGRAM_NAME);
printf(_("\n%s show -B backup-path\n"), PROGRAM_NAME);
printf(_(" [--instance=instance_name [-i backup-id]]\n"));
printf(_(" [--format=format]\n\n"));
@ -466,7 +526,7 @@ help_show(void)
static void
help_delete(void)
{
printf(_("%s delete -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_("\n%s delete -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" [-i backup-id | --expired | --merge-expired] [--wal]\n"));
printf(_(" [-j num-threads] [--dry-run]\n\n"));
@ -506,7 +566,7 @@ help_delete(void)
static void
help_merge(void)
{
printf(_("%s merge -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_("\n%s merge -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" -i backup-id [-j num-threads] [--progress]\n"));
printf(_(" [--log-level-console=log-level-console]\n"));
printf(_(" [--log-level-file=log-level-file]\n"));
@ -548,7 +608,7 @@ help_merge(void)
static void
help_set_config(void)
{
printf(_("%s set-config -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_("\n%s set-config -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" [--log-level-console=log-level-console]\n"));
printf(_(" [--log-level-file=log-level-file]\n"));
printf(_(" [--log-filename=log-filename]\n"));
@ -627,7 +687,7 @@ help_set_config(void)
static void
help_show_config(void)
{
printf(_("%s show-config -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_("\n%s show-config -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" [--format=format]\n\n"));
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
@ -638,7 +698,7 @@ help_show_config(void)
static void
help_add_instance(void)
{
printf(_("%s add-instance -B backup-path -D pgdata-path\n"), PROGRAM_NAME);
printf(_("\n%s add-instance -B backup-path -D pgdata-path\n"), PROGRAM_NAME);
printf(_(" --instance=instance_name\n"));
printf(_(" [-E external-directory-path]\n"));
printf(_(" [--remote-proto] [--remote-host]\n"));
@ -665,7 +725,7 @@ help_add_instance(void)
static void
help_del_instance(void)
{
printf(_("%s del-instance -B backup-path --instance=instance_name\n\n"), PROGRAM_NAME);
printf(_("\n%s del-instance -B backup-path --instance=instance_name\n\n"), PROGRAM_NAME);
printf(_(" -B, --backup-path=backup-path location of the backup storage area\n"));
printf(_(" --instance=instance_name name of the instance to delete\n"));
@ -674,7 +734,7 @@ help_del_instance(void)
static void
help_archive_push(void)
{
printf(_("\n %s archive-push -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_("\n%s archive-push -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" --wal-file-path=wal-file-path\n"));
printf(_(" --wal-file-name=wal-file-name\n"));
printf(_(" [--compress]\n"));
@ -702,7 +762,7 @@ help_archive_push(void)
static void
help_archive_get(void)
{
printf(_("\n %s archive-get -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_("\n%s archive-get -B backup-path --instance=instance_name\n"), PROGRAM_NAME);
printf(_(" --wal-file-path=wal-file-path\n"));
printf(_(" --wal-file-name=wal-file-name\n"));
printf(_(" [--remote-proto] [--remote-host]\n"));

View File

@ -39,7 +39,8 @@ typedef enum ProbackupSubcmd
MERGE_CMD,
SHOW_CMD,
SET_CONFIG_CMD,
SHOW_CONFIG_CMD
SHOW_CONFIG_CMD,
CHECKDB_CMD
} ProbackupSubcmd;
@ -94,6 +95,11 @@ bool no_validate = false;
bool skip_block_validation = false;
bool skip_external_dirs = false;
/* checkdb options */
bool need_amcheck = false;
bool heapallindexed = false;
bool amcheck_parent = false;
/* delete options */
bool delete_wal = false;
bool delete_expired = false;
@ -164,6 +170,10 @@ static ConfigOption cmd_options[] =
{ 'b', 143, "no-validate", &no_validate, SOURCE_CMD_STRICT },
{ 'b', 154, "skip-block-validation", &skip_block_validation, SOURCE_CMD_STRICT },
{ 'b', 156, "skip-external-dirs", &skip_external_dirs, SOURCE_CMD_STRICT },
/* checkdb options */
{ 'b', 195, "amcheck", &need_amcheck, SOURCE_CMD_STRICT },
{ 'b', 196, "heapallindexed", &heapallindexed, SOURCE_CMD_STRICT },
{ 'b', 197, "parent", &amcheck_parent, SOURCE_CMD_STRICT },
/* delete options */
{ 'b', 145, "wal", &delete_wal, SOURCE_CMD_STRICT },
{ 'b', 146, "expired", &delete_expired, SOURCE_CMD_STRICT },
@ -279,6 +289,8 @@ main(int argc, char *argv[])
backup_subcmd = SET_CONFIG_CMD;
else if (strcmp(argv[1], "show-config") == 0)
backup_subcmd = SHOW_CONFIG_CMD;
else if (strcmp(argv[1], "checkdb") == 0)
backup_subcmd = CHECKDB_CMD;
else if (strcmp(argv[1], "agent") == 0 && argc > 2)
{
remote_agent = argv[2];
@ -326,6 +338,7 @@ main(int argc, char *argv[])
* Make command string before getopt_long() will call. It permutes the
* content of argv.
*/
/* TODO why do we do that only for some commands? */
command_name = pstrdup(argv[1]);
if (backup_subcmd == BACKUP_CMD ||
backup_subcmd == RESTORE_CMD ||
@ -367,7 +380,7 @@ main(int argc, char *argv[])
if (help_opt)
help_command(command_name);
/* backup_path is required for all pg_probackup commands except help */
/* backup_path is required for all pg_probackup commands except help and checkdb */
if (backup_path == NULL)
{
/*
@ -375,28 +388,36 @@ main(int argc, char *argv[])
* from environment variable
*/
backup_path = getenv("BACKUP_PATH");
if (backup_path == NULL)
if (backup_path == NULL && backup_subcmd != CHECKDB_CMD)
elog(ERROR, "required parameter not specified: BACKUP_PATH (-B, --backup-path)");
}
canonicalize_path(backup_path);
setMyLocation();
/* Ensure that backup_path is a path to a directory */
rc = fio_stat(backup_path, &stat_buf, true, FIO_BACKUP_HOST);
if (rc != -1 && !S_ISDIR(stat_buf.st_mode))
elog(ERROR, "-B, --backup-path must be a path to directory");
if (backup_path != NULL)
{
canonicalize_path(backup_path);
/* Ensure that backup_path is an absolute path */
if (!is_absolute_path(backup_path))
elog(ERROR, "-B, --backup-path must be an absolute path");
/* Ensure that backup_path is a path to a directory */
rc = stat(backup_path, &stat_buf);
if (rc != -1 && !S_ISDIR(stat_buf.st_mode))
elog(ERROR, "-B, --backup-path must be a path to directory");
}
/* Ensure that backup_path is an absolute path */
if (!is_absolute_path(backup_path))
if (backup_path && !is_absolute_path(backup_path))
elog(ERROR, "-B, --backup-path must be an absolute path");
/* Option --instance is required for all commands except init and show */
if (backup_subcmd != INIT_CMD && backup_subcmd != SHOW_CMD &&
backup_subcmd != VALIDATE_CMD)
if (instance_name == NULL)
{
if (instance_name == NULL)
if (backup_subcmd != INIT_CMD && backup_subcmd != SHOW_CMD &&
backup_subcmd != VALIDATE_CMD && backup_subcmd != CHECKDB_CMD)
elog(ERROR, "required parameter not specified: --instance");
}
@ -404,7 +425,7 @@ main(int argc, char *argv[])
* If --instance option was passed, construct paths for backup data and
* xlog files of this backup instance.
*/
if (instance_name)
if ((backup_path != NULL) && instance_name)
{
sprintf(backup_instance_path, "%s/%s/%s",
backup_path, BACKUPS_DIR, instance_name);
@ -431,7 +452,6 @@ main(int argc, char *argv[])
if (instance_name)
{
char path[MAXPGPATH];
/* Read environment variables */
config_get_opt_env(instance_options);
@ -440,11 +460,38 @@ main(int argc, char *argv[])
{
join_path_components(path, backup_instance_path,
BACKUP_CATALOG_CONF_FILE);
config_read_opt(path, instance_options, ERROR, true, false);
if (backup_subcmd == CHECKDB_CMD)
config_read_opt(path, instance_options, ERROR, true, true);
else
config_read_opt(path, instance_options, ERROR, true, false);
}
setMyLocation();
}
/* Just read environment variables */
if (backup_path == NULL && backup_subcmd == CHECKDB_CMD)
config_get_opt_env(instance_options);
/* Sanity for checkdb, if backup_dir is provided but pgdata and instance are not */
if (backup_subcmd == CHECKDB_CMD &&
backup_path != NULL &&
instance_name == NULL &&
instance_config.pgdata == NULL)
elog(ERROR, "required parameter not specified: --instance");
/* Usually checkdb for file logging requires log_directory
* to be specified explicitly, but if backup_dir and instance name are provided,
* checkdb can use the usual default values or values from config
*/
if (backup_subcmd == CHECKDB_CMD &&
(instance_config.logger.log_level_file != LOG_OFF &&
instance_config.logger.log_directory == NULL) &&
(!instance_config.pgdata || !instance_name))
elog(ERROR, "Cannot save checkdb logs to a file. "
"You must specify --log-directory option when running checkdb with "
"--log-level-file option enabled.");
/* Initialize logger */
init_logger(backup_path, &instance_config.logger);
@ -559,6 +606,11 @@ main(int argc, char *argv[])
PROGRAM_VERSION, base36enc(start_time), backup_mode, instance_name,
stream_wal ? "true" : "false", instance_config.remote.host ? "true" : "false");
/* sanity */
if (current.backup_mode == BACKUP_MODE_INVALID)
elog(ERROR, "required parameter not specified: BACKUP_MODE "
"(-b, --backup-mode)");
return do_backup(start_time, no_validate);
}
case RESTORE_CMD:
@ -596,6 +648,9 @@ main(int argc, char *argv[])
case SET_CONFIG_CMD:
do_set_config(false);
break;
case CHECKDB_CMD:
do_checkdb(need_amcheck);
break;
case NO_CMD:
/* Should not happen */
elog(ERROR, "Unknown subcommand");

View File

@ -136,6 +136,15 @@ typedef struct pgFile
* i.e. datafiles without _ptrack */
} pgFile;
/*
 * Description of one index to be verified.
 * NOTE(review): presumably one element of the index list built for
 * 'checkdb --amcheck' and handed to amcheck_one_index() - confirm in backup.c.
 */
typedef struct pg_indexEntry
{
Oid indexrelid; /* OID of the index relation */
char *name; /* index name; presumably schema-qualified - TODO confirm */
char *dbname; /* name of the database; presumably the one owning the index - TODO confirm */
char *amcheck_nspname; /* schema where amcheck extension is located */
volatile pg_atomic_flag lock; /* lock for synchronization of parallel threads */
} pg_indexEntry;
/* Special values of datapagemap_t bitmapsize */
#define PageBitmapIsEmpty 0 /* Used to mark unchanged datafiles */
@ -311,6 +320,8 @@ typedef struct
PGconn *backup_conn;
PGcancel *cancel_conn;
parray *index_list;
int thread_num;
/*
* Return value from the thread.
@ -332,6 +343,7 @@ typedef struct BackupPageHeader
/* Special value for compressed_size field */
#define PageIsTruncated -2
#define SkipCurrentPage -3
#define PageIsCorrupted -4 /* used by checkdb */
/*
@ -399,7 +411,6 @@ extern bool smooth_checkpoint;
extern char* remote_agent;
extern bool is_ptrack_support;
extern bool is_checksum_enabled;
extern bool exclusive_backup;
/* restore options */
@ -423,6 +434,9 @@ extern char *instance_name;
/* show options */
extern ShowFormat show_format;
/* checkdb options */
extern bool heapallindexed;
/* current settings */
extern pgBackup current;
@ -432,6 +446,7 @@ extern const char *pgdata_exclude_dir[];
/* in backup.c */
extern int do_backup(time_t start_time, bool no_validate);
extern void do_checkdb(bool need_amcheck);
extern BackupMode parse_backup_mode(const char *value);
extern const char *deparse_backup_mode(BackupMode mode);
extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
@ -587,6 +602,8 @@ extern int pgFileCompareLinked(const void *f1, const void *f2);
extern int pgFileCompareSize(const void *f1, const void *f2);
/* in data.c */
extern bool check_data_file(backup_files_arg* arguments,
pgFile *file);
extern bool backup_data_file(backup_files_arg* arguments,
const char *to_path, pgFile *file,
XLogRecPtr prev_backup_start_lsn,

View File

@ -189,10 +189,8 @@ assign_option(ConfigOption *opt, const char *optarg, OptionSource src)
const char *message;
if (opt == NULL)
{
fprintf(stderr, "Try \"%s --help\" for more information.\n", PROGRAM_NAME);
exit_or_abort(ERROR);
}
elog(ERROR, "Option is not found. Try \"%s --help\" for more information.\n",
PROGRAM_NAME);
if (opt->source > src)
{

View File

@ -72,8 +72,12 @@ static pthread_mutex_t log_file_mutex = PTHREAD_MUTEX_INITIALIZER;
void
init_logger(const char *root_path, LoggerConfig *config)
{
/* Set log path */
if (config->log_directory == NULL)
/*
* If logging to file is enabled and log_directory wasn't set
* by user, init the path with default value: backup_directory/log/
* */
if (config->log_level_file != LOG_OFF
&& config->log_directory == NULL)
{
config->log_directory = pgut_malloc(MAXPGPATH);
join_path_components(config->log_directory,

View File

@ -22,7 +22,7 @@
#include "file.h"
const char *PROGRAM_NAME = NULL;
const char *PROGRAM_NAME = "pg_probackup";
static char *password = NULL;
bool prompt_password = true;
@ -44,6 +44,8 @@ static void on_interrupt(void);
static void on_cleanup(void);
static pqsigfunc oldhandler = NULL;
void discard_response(PGconn *conn);
void
pgut_init(void)
{
@ -360,7 +362,7 @@ PGresult *
pgut_execute_parallel(PGconn* conn,
PGcancel* thread_cancel_conn, const char *query,
int nParams, const char **params,
bool text_result)
bool text_result, bool ok_error, bool async)
{
PGresult *res;
@ -388,15 +390,56 @@ pgut_execute_parallel(PGconn* conn,
}
//on_before_exec(conn, thread_cancel_conn);
if (nParams == 0)
res = PQexec(conn, query);
if (async)
{
/* clean any old data */
discard_response(conn);
if (nParams == 0)
PQsendQuery(conn, query);
else
PQsendQueryParams(conn, query, nParams, NULL, params, NULL, NULL,
/*
* Specify zero to obtain results in text format,
* or one to obtain results in binary format.
*/
(text_result) ? 0 : 1);
/* wait for processing, TODO: timeout */
for (;;)
{
if (interrupted)
{
pgut_cancel(conn);
pgut_disconnect(conn);
elog(ERROR, "interrupted");
}
if (!PQconsumeInput(conn))
elog(ERROR, "query failed: %squery was: %s",
PQerrorMessage(conn), query);
/* query is now done */
if (!PQisBusy(conn))
break;
usleep(10000);
}
res = PQgetResult(conn);
}
else
res = PQexecParams(conn, query, nParams, NULL, params, NULL, NULL,
/*
* Specify zero to obtain results in text format,
* or one to obtain results in binary format.
*/
(text_result) ? 0 : 1);
{
if (nParams == 0)
res = PQexec(conn, query);
else
res = PQexecParams(conn, query, nParams, NULL, params, NULL, NULL,
/*
* Specify zero to obtain results in text format,
* or one to obtain results in binary format.
*/
(text_result) ? 0 : 1);
}
//on_after_exec(thread_cancel_conn);
switch (PQresultStatus(res))
@ -406,6 +449,9 @@ pgut_execute_parallel(PGconn* conn,
case PGRES_COPY_IN:
break;
default:
if (ok_error && PQresultStatus(res) == PGRES_FATAL_ERROR)
break;
elog(ERROR, "query failed: %squery was: %s",
PQerrorMessage(conn), query);
break;
@ -787,22 +833,6 @@ on_cleanup(void)
call_atexit_callbacks(false);
}
/*
 * Terminate the process.
 *
 * Outside of cleanup this is a plain exit() with the given code.  If we are
 * already inside cleanup, run the atexit callbacks in "fatal" mode and
 * abort() instead.
 */
void
exit_or_abort(int exitcode)
{
	if (!in_cleanup)
	{
		/* normal exit */
		exit(exitcode);
	}

	/* oops, error in cleanup */
	call_atexit_callbacks(true);
	abort();
}
void *
pgut_malloc(size_t size)
{
@ -998,3 +1028,16 @@ select_win32(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, con
}
#endif /* WIN32 */
/*
 * Drain the connection: fetch results with PQgetResult() until it
 * returns NULL, releasing each one with PQclear().
 */
void
discard_response(PGconn *conn)
{
	PGresult   *pending;

	while ((pending = PQgetResult(conn)) != NULL)
		PQclear(pending);
}

View File

@ -41,7 +41,6 @@ extern void pgut_atexit_push(pgut_atexit_callback callback, void *userdata);
extern void pgut_atexit_pop(pgut_atexit_callback callback, void *userdata);
extern void pgut_init(void);
extern void exit_or_abort(int exitcode);
/*
* Database connections
@ -59,7 +58,7 @@ extern PGresult *pgut_execute_extended(PGconn* conn, const char *query, int nPar
const char **params, bool text_result, bool ok_error);
extern PGresult *pgut_execute_parallel(PGconn* conn, PGcancel* thread_cancel_conn,
const char *query, int nParams,
const char **params, bool text_result);
const char **params, bool text_result, bool ok_error, bool async);
extern bool pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int elevel);
extern void pgut_cancel(PGconn* conn);
extern int pgut_wait(int num, PGconn *connections[], struct timeval *timeout);

View File

@ -5,7 +5,7 @@ from . import init_test, merge, option_test, show_test, compatibility, \
retention, pgpro560, pgpro589, pgpro2068, false_positive, replica, \
compression, page, ptrack, archive, exclude, cfs_backup, cfs_restore, \
cfs_validate_backup, auth_test, time_stamp, snapfs, logging, \
locking, remote, external, config
locking, remote, external, config, checkdb
def load_tests(loader, tests, pattern):
@ -14,6 +14,7 @@ def load_tests(loader, tests, pattern):
suite.addTests(loader.loadTestsFromModule(archive))
suite.addTests(loader.loadTestsFromModule(backup_test))
suite.addTests(loader.loadTestsFromModule(compatibility))
suite.addTests(loader.loadTestsFromModule(checkdb))
suite.addTests(loader.loadTestsFromModule(config))
# suite.addTests(loader.loadTestsFromModule(cfs_backup))
# suite.addTests(loader.loadTestsFromModule(cfs_restore))

422
tests/checkdb.py Normal file
View File

@ -0,0 +1,422 @@
import os
import unittest
from .helpers.ptrack_helpers import ProbackupTest, ProbackupException
from datetime import datetime, timedelta
import subprocess
from testgres import QueryException
import shutil
import sys
import time
module_name = 'checkdb'
class CheckdbTest(ProbackupTest, unittest.TestCase):
# @unittest.skip("skip")
def test_checkdb_amcheck_only_sanity(self):
    """
    Sanity checks for 'checkdb --amcheck --skip-block-validation':
    option validation, file logging with and without a backup catalog,
    and reporting of an index that disappears while checkdb is running.
    """
    fname = self.id().split('.')[3]
    backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
    node = self.make_simple_node(
        base_dir="{0}/{1}/node".format(module_name, fname),
        set_replication=True,
        initdb_params=['--data-checksums'])

    self.init_pb(backup_dir)
    self.add_instance(backup_dir, 'node', node)
    node.slow_start()

    node.safe_psql(
        "postgres",
        "create table t_heap as select i"
        " as id from generate_series(0,100) i")

    node.safe_psql(
        "postgres",
        "create index on t_heap(id)")

    node.safe_psql(
        "postgres",
        "create extension amcheck")

    log_file_path = os.path.join(
        backup_dir, 'log', 'pg_probackup.log')

    # --skip-block-validation without --amcheck must be rejected
    try:
        self.checkdb_node(
            options=['--skip-block-validation'])
        # we should die here because exception is what we expect to happen
        self.assertEqual(
            1, 0,
            "Expecting Error because --amcheck option is missing\n"
            " Output: {0} \n CMD: {1}".format(
                repr(self.output), self.cmd))
    except ProbackupException as e:
        self.assertIn(
            "ERROR: Option '--skip-block-validation' must be "
            "used with '--amcheck' option",
            e.message,
            "\n Unexpected Error Message: {0}\n CMD: {1}".format(
                repr(e.message), self.cmd))

    # simple sanity
    output = self.checkdb_node(
        options=[
            '--amcheck',
            '--skip-block-validation',
            '-d', 'postgres', '-p', str(node.port)])

    self.assertIn(
        'INFO: Checkdb --amcheck executed successfully',
        output)
    self.assertIn(
        'INFO: Indexes are valid',
        output)

    # logging to file sanity: without backup_dir the log directory
    # has to be given explicitly
    try:
        self.checkdb_node(
            options=[
                '--amcheck',
                '--skip-block-validation',
                '--log-level-file=verbose',
                '-d', 'postgres', '-p', str(node.port)])
        # we should die here because exception is what we expect to happen
        self.assertEqual(
            1, 0,
            "Expecting Error because log_directory missing\n"
            " Output: {0} \n CMD: {1}".format(
                repr(self.output), self.cmd))
    except ProbackupException as e:
        self.assertIn(
            "ERROR: Cannot save checkdb logs to a file. "
            "You must specify --log-directory option when "
            "running checkdb with --log-level-file option enabled",
            e.message,
            "\n Unexpected Error Message: {0}\n CMD: {1}".format(
                repr(e.message), self.cmd))

    # If backup_dir provided, then instance name must be
    # provided too
    try:
        self.checkdb_node(
            backup_dir,
            options=[
                '--amcheck',
                '--skip-block-validation',
                '--log-level-file=verbose',
                '-d', 'postgres', '-p', str(node.port)])
        # we should die here because exception is what we expect to happen
        self.assertEqual(
            1, 0,
            "Expecting Error because instance name is missing\n"
            " Output: {0} \n CMD: {1}".format(
                repr(self.output), self.cmd))
    except ProbackupException as e:
        self.assertIn(
            "ERROR: required parameter not specified: --instance",
            e.message,
            "\n Unexpected Error Message: {0}\n CMD: {1}".format(
                repr(e.message), self.cmd))

    # checkdb can use default or set in config values,
    # if backup_dir and instance name are provided
    self.checkdb_node(
        backup_dir,
        'node',
        options=[
            '--amcheck',
            '--skip-block-validation',
            '--log-level-file=verbose',
            '-d', 'postgres', '-p', str(node.port)])

    # check that file present and full of messages
    self.assertTrue(os.path.isfile(log_file_path))
    with open(log_file_path) as f:
        log_file_content = f.read()
        self.assertIn(
            'INFO: Checkdb --amcheck executed successfully',
            log_file_content)
        self.assertIn(
            'VERBOSE: (query)',
            log_file_content)
    os.unlink(log_file_path)

    # log-level-file and log-directory are provided
    self.checkdb_node(
        backup_dir,
        'node',
        options=[
            '--amcheck',
            '--skip-block-validation',
            '--log-level-file=verbose',
            '--log-directory={0}'.format(
                os.path.join(backup_dir, 'log')),
            '-d', 'postgres', '-p', str(node.port)])

    # check that file present and full of messages
    self.assertTrue(os.path.isfile(log_file_path))
    with open(log_file_path) as f:
        log_file_content = f.read()
        self.assertIn(
            'INFO: Checkdb --amcheck executed successfully',
            log_file_content)
        self.assertIn(
            'VERBOSE: (query)',
            log_file_content)
    os.unlink(log_file_path)

    # Drop the table (and with it the index) after checkdb has reached
    # amcheck_one_index, so the check of that index has to fail.
    gdb = self.checkdb_node(
        gdb=True,
        options=[
            '--amcheck',
            '--skip-block-validation',
            '--log-level-file=verbose',
            '--log-directory={0}'.format(
                os.path.join(backup_dir, 'log')),
            '-d', 'postgres', '-p', str(node.port)])

    gdb.set_breakpoint('amcheck_one_index')
    gdb.run_until_break()

    node.safe_psql(
        "postgres",
        "drop table t_heap")

    gdb.remove_all_breakpoints()

    gdb.continue_execution_until_exit()

    # check that message about missing index is present
    with open(log_file_path) as f:
        log_file_content = f.read()
        self.assertIn(
            'ERROR: Checkdb --amcheck failed',
            log_file_content)
        self.assertIn(
            "WARNING: Thread [1]. Amcheck failed for index: 'public.t_heap_id_idx':",
            log_file_content)
        self.assertIn(
            'ERROR: could not open relation with OID',
            log_file_content)

    # Clean after yourself
    self.del_test_dir(module_name, fname)
# @unittest.skip("skip")
def test_checkdb_amcheck_only_sanity_1(self):
    """
    Run 'checkdb --amcheck' against a cluster where only some databases
    have amcheck installed, then corrupt one index in each of two
    databases and check that both corruptions are reported.
    """
    fname = self.id().split('.')[3]
    backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
    node = self.make_simple_node(
        base_dir="{0}/{1}/node".format(module_name, fname),
        set_replication=True)

    self.init_pb(backup_dir)
    self.add_instance(backup_dir, 'node', node)
    node.slow_start()

    # create two databases
    node.safe_psql("postgres", "create database db1")
    node.safe_psql("db1", "create extension amcheck")

    node.safe_psql("postgres", "create database db2")
    node.safe_psql("db2", "create extension amcheck")

    # init pgbench in two databases and corrupt both indexes
    node.pgbench_init(scale=5, dbname='db1')
    node.pgbench_init(scale=5, dbname='db2')

    node.safe_psql(
        "db2",
        "alter index pgbench_accounts_pkey rename to some_index")

    index_path_1 = os.path.join(
        node.data_dir,
        node.safe_psql(
            "db1",
            "select pg_relation_filepath('pgbench_accounts_pkey')").rstrip())

    index_path_2 = os.path.join(
        node.data_dir,
        node.safe_psql(
            "db2",
            "select pg_relation_filepath('some_index')").rstrip())

    # amcheck is not installed in 'postgres', so checkdb must complain
    try:
        self.checkdb_node(
            options=[
                '--amcheck',
                '--skip-block-validation',
                '-d', 'postgres', '-p', str(node.port)])
        # we should die here because exception is what we expect to happen
        self.assertEqual(
            1, 0,
            "Expecting Error because some db was not amchecked"
            " Output: {0} \n CMD: {1}".format(
                repr(self.output), self.cmd))
    except ProbackupException as e:
        self.assertIn(
            "ERROR: Some databases were not amchecked",
            e.message,
            "\n Unexpected Error Message: {0}\n CMD: {1}".format(
                repr(e.message), self.cmd))

    node.stop()

    # Let`s do index corruption; the 'with' block closes each file
    for index_path in (index_path_1, index_path_2):
        with open(index_path, "rb+", 0) as f:
            f.seek(42000)
            f.write(b"blablahblahs")
            f.flush()

    node.slow_start()

    log_file_path = os.path.join(
        backup_dir, 'log', 'pg_probackup.log')

    try:
        self.checkdb_node(
            options=[
                '--amcheck',
                '--skip-block-validation',
                '--log-level-file=verbose',
                '--log-directory={0}'.format(
                    os.path.join(backup_dir, 'log')),
                '-d', 'postgres', '-p', str(node.port)])
        # we should die here because exception is what we expect to happen
        self.assertEqual(
            1, 0,
            "Expecting Error because of index corruption"
            " Output: {0} \n CMD: {1}".format(
                repr(self.output), self.cmd))
    except ProbackupException as e:
        self.assertIn(
            "ERROR: Checkdb --amcheck failed",
            e.message,
            "\n Unexpected Error Message: {0}\n CMD: {1}".format(
                repr(e.message), self.cmd))

    # Corruption of both indexes in db1 and db2 must be detected
    # and logged, and the run as a whole must be reported as failed.
    with open(log_file_path) as f:
        log_file_content = f.read()
        self.assertIn(
            "WARNING: Thread [1]. Amcheck failed for index: 'public.pgbench_accounts_pkey':",
            log_file_content)

        self.assertIn(
            "WARNING: Thread [1]. Amcheck failed for index: 'public.some_index':",
            log_file_content)

        self.assertIn(
            "ERROR: Checkdb --amcheck failed",
            log_file_content)

    # Clean after yourself
    self.del_test_dir(module_name, fname)
# @unittest.skip("skip")
def test_checkdb_block_validation_sanity(self):
    """make node, corrupt some pages, check that checkdb failed"""
    fname = self.id().split('.')[3]
    node = self.make_simple_node(
        base_dir=os.path.join(module_name, fname, 'node'),
        set_replication=True,
        initdb_params=['--data-checksums'])

    backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
    self.init_pb(backup_dir)
    self.add_instance(backup_dir, 'node', node)
    node.slow_start()

    node.safe_psql(
        "postgres",
        "create table t_heap as select 1 as id, md5(i::text) as text, "
        "md5(repeat(i::text,10))::tsvector as tsvector "
        "from generate_series(0,1000) i")
    node.safe_psql(
        "postgres",
        "CHECKPOINT;")

    heap_path = node.safe_psql(
        "postgres",
        "select pg_relation_filepath('t_heap')").rstrip()

    # sanity: neither pgdata nor backup catalog is given
    try:
        self.checkdb_node()
        # we should die here because exception is what we expect to happen
        self.assertEqual(
            1, 0,
            "Expecting Error because pgdata must be specified\n"
            " Output: {0} \n CMD: {1}".format(
                repr(self.output), self.cmd))
    except ProbackupException as e:
        self.assertIn(
            "ERROR: required parameter not specified: PGDATA (-D, --pgdata)",
            e.message,
            "\n Unexpected Error Message: {0}\n CMD: {1}".format(
                repr(e.message), self.cmd))

    # healthy cluster: both ways of locating pgdata must succeed
    self.checkdb_node(
        data_dir=node.data_dir,
        options=['-d', 'postgres', '-p', str(node.port)])

    self.checkdb_node(
        backup_dir, 'node',
        options=['-d', 'postgres', '-p', str(node.port)])

    heap_full_path = os.path.join(node.data_dir, heap_path)

    # Damage two pages of the heap file; the assertions below expect
    # these offsets to fall into blocks 1 and 5 (presumably the default
    # 8 kB block size). The 'with' block closes the file.
    with open(heap_full_path, "rb+", 0) as f:
        for offset in (9000, 42000):
            f.seek(offset)
            f.write(b"bla")
            f.flush()

    try:
        self.checkdb_node(
            backup_dir, 'node',
            options=['-d', 'postgres', '-p', str(node.port)])
        # we should die here because exception is what we expect to happen
        self.assertEqual(
            1, 0,
            "Expecting Error because of data corruption\n"
            " Output: {0} \n CMD: {1}".format(
                repr(self.output), self.cmd))
    except ProbackupException as e:
        self.assertIn(
            "ERROR: Checkdb failed",
            e.message,
            "\n Unexpected Error Message: {0}\n CMD: {1}".format(
                repr(e.message), self.cmd))

        self.assertIn(
            "WARNING: CORRUPTION in file {0}, block 1".format(heap_full_path),
            e.message)

        self.assertIn(
            "WARNING: CORRUPTION in file {0}, block 5".format(heap_full_path),
            e.message)

    # Clean after yourself
    self.del_test_dir(module_name, fname)

View File

@ -732,6 +732,24 @@ class ProbackupTest(object):
return self.run_pb(cmd_list + options, asynchronous, gdb, old_binary)
def checkdb_node(
        self, backup_dir=False, instance=False, data_dir=False,
        options=[], asynchronous=False, gdb=False, old_binary=False
        ):
    """
    Run 'pg_probackup checkdb' through run_pb().

    backup_dir, instance and data_dir are each optional; when given they
    are passed as -B, --instance= and -D respectively. 'options' is
    appended to the command line unchanged. asynchronous, gdb and
    old_binary are forwarded to run_pb() as is.

    The flag is called 'asynchronous' (as in the sibling helpers) because
    'async' is a reserved keyword since Python 3.7.
    """
    cmd_list = ["checkdb"]

    if backup_dir:
        cmd_list += ["-B", backup_dir]

    if instance:
        cmd_list += ["--instance={0}".format(instance)]

    if data_dir:
        cmd_list += ["-D", data_dir]

    # 'options' is only read here (list concatenation), never mutated
    return self.run_pb(cmd_list + options, asynchronous, gdb, old_binary)
def merge_backup(
self, backup_dir, instance, backup_id, asynchronous=False,
gdb=False, old_binary=False, options=[]):
@ -1441,6 +1459,18 @@ class GDBobj(ProbackupTest):
'Failed to set breakpoint.\n Output:\n {0}'.format(result)
)
def remove_all_breakpoints(self):
    """
    Send 'delete' to gdb and return once a '^done' line is seen in the
    response; raise GdbException if no such line is found.
    """
    result = self._execute('delete')
    for line in result:
        if line.startswith('^done'):
            return

    raise GdbException(
        'Failed to remove breakpoints.\n Output:\n {0}'.format(result)
    )
def run_until_break(self):
result = self._execute('run', False)
for line in result: