1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-11-28 09:33:54 +02:00

Refactoring of checkdb. Move code to a separate file, improve error messages

This commit is contained in:
Anastasia 2019-05-31 18:15:43 +03:00
parent a9bbff9fa4
commit 30126c6eb5
10 changed files with 987 additions and 819 deletions

View File

@ -4,7 +4,7 @@ PROGRAM = pg_probackup
OBJS = src/utils/configuration.o src/utils/json.o src/utils/logger.o \
src/utils/parray.o src/utils/pgut.o src/utils/thread.o src/utils/remote.o src/utils/file.o
OBJS += src/archive.o src/backup.o src/catalog.o src/configure.o src/data.o \
OBJS += src/archive.o src/backup.o src/catalog.o src/checkdb.o src/configure.o src/data.o \
src/delete.o src/dir.o src/fetch.o src/help.o src/init.o src/merge.o \
src/parsexlog.o src/pg_probackup.o src/restore.o src/show.o src/util.o \
src/validate.o

File diff suppressed because it is too large Load Diff

709
src/checkdb.c Normal file
View File

@ -0,0 +1,709 @@
/*-------------------------------------------------------------------------
*
* src/checkdb.c
* pg_probackup checkdb subcommand
*
* It validates all data files located in PGDATA
* by matching block checksums and running page header sanity checks.
* Optionally, all btree indexes in all databases of the PostgreSQL
* instance can be logically verified using the extensions
* amcheck or amcheck_next.
*
* Portions Copyright (c) 2019-2019, Postgres Professional
*
*-------------------------------------------------------------------------
*/
#include "pg_probackup.h"
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
#include "utils/thread.h"
#include "utils/file.h"
/*
 * Per-thread argument of check_files().
 * One instance per worker thread is filled in by do_block_validation().
 */
typedef struct
{
/* list of files to validate; every thread receives the same list */
parray *files_list;
/*
 * checksum version of this postgres instance, forwarded to
 * check_data_file(); presumably non-zero means page checksums are enabled
 */
uint32 checksum_version;
/*
 * conn and cancel_conn
 * passed to check_data_file(),
 * which may connect to postgres if we've failed to validate a page
 * and want to read it via the buffer cache instead
 */
ConnectionArgs conn_arg;
/* 1-based number of the thread, used for debugging */
int thread_num;
/*
 * Return value from the thread:
 * 0 everything is ok
 * 1 thread errored during execution, e.g. interruption (default value)
 * 2 corruption is definitely(!) found
 */
int ret;
} check_files_arg;
/*
 * Per-thread argument of check_indexes().
 * One instance per worker thread is filled in by do_amcheck()
 * for every database that is amchecked.
 */
typedef struct
{
/* list of indexes to amcheck; every thread receives the same list */
parray *index_list;
/*
 * credentials used by the thread to open its own connection
 * to the database that is being checked
 */
ConnectionOptions conn_opt;
/*
 * conn and cancel_conn
 * used by the thread to talk to the database;
 * the connection is opened by check_indexes() on first use
 */
ConnectionArgs conn_arg;
/* 1-based number of the thread, used in log messages */
int thread_num;
/*
 * Return value from the thread:
 * 0 everything is ok
 * 1 thread errored during execution, e.g. interruption (default value)
 * 2 corruption is definitely(!) found
 */
int ret;
} check_indexes_arg;
/*
 * Description of a single index to be verified with amcheck.
 * Entries are created by get_index_list() and released
 * by pg_indexEntry_free().
 */
typedef struct pg_indexEntry
{
/* oid of the index relation, passed to bt_index_check() */
Oid indexrelid;
/* name of the index (pg_class.relname), owned by the entry */
char *name;
/* true if the installed amcheck accepts the 'heapallindexed' argument */
bool heapallindexed_is_supported;
/*
 * schema where the amcheck extension is located, owned by the entry
 * (note: this is not the schema of the index itself)
 */
char *amcheck_nspname;
/* lock for synchronization of parallel threads */
volatile pg_atomic_flag lock;
} pg_indexEntry;
/*
 * Release a pg_indexEntry together with the strings it owns.
 * The signature takes void * so the function can be used as
 * a parray_walk() callback. Passing NULL is allowed and does nothing.
 */
static void
pg_indexEntry_free(void *index)
{
	pg_indexEntry *entry = (pg_indexEntry *) index;

	if (entry == NULL)
		return;

	/* free(NULL) is a no-op, so the members need no separate guards */
	free(entry->name);
	free(entry->amcheck_nspname);
	free(entry);
}
/*
 * Forward declarations of the file-local helpers.
 * check_files() and check_indexes() are thread routines
 * started via pthread_create().
 */
static void *check_files(void *arg);
static void do_block_validation(char *pgdata, uint32 checksum_version);
static void *check_indexes(void *arg);
static parray* get_index_list(const char *dbname, bool first_db_with_amcheck,
PGconn *db_conn);
static bool amcheck_one_index(check_indexes_arg *arguments,
pg_indexEntry *ind);
static void do_amcheck(ConnectionOptions conn_opt, PGconn *conn);
/*
 * Thread routine of block validation.
 *
 * Walks the shared files_list and claims entries one by one via the
 * per-file lock flag, so that every file is processed by one thread only.
 * Regular files that are datafiles and are not compressed by cfs are
 * passed to check_data_file(); everything else is skipped.
 *
 * The outcome is stored in arguments->ret (see check_files_arg),
 * the return value is always NULL.
 */
static void *
check_files(void *arg)
{
	check_files_arg *args = (check_files_arg *) arg;
	int			total = 0;
	int			idx;

	if (args->files_list)
		total = parray_num(args->files_list);

	for (idx = 0; idx < total; idx++)
	{
		struct stat st;
		pgFile	   *file = (pgFile *) parray_get(args->files_list, idx);

		/* somebody else has already taken this file */
		if (!pg_atomic_test_set_flag(&file->lock))
			continue;

		elog(VERBOSE, "Checking file: \"%s\" ", file->path);

		/* check for interrupt */
		if (interrupted || thread_interrupted)
			elog(ERROR, "interrupted during checkdb");

		if (progress)
			elog(INFO, "Progress: (%d/%d). Process file \"%s\"",
				 idx + 1, total, file->path);

		/* stat file to check its current state */
		if (stat(file->path, &st) == -1)
		{
			if (errno != ENOENT)
			{
				elog(ERROR,
					 "can't stat file to check \"%s\": %s",
					 file->path, strerror(errno));
			}
			else
			{
				/*
				 * A missing file is not an error: it could have been
				 * deleted by a concurrent postgres transaction.
				 */
				elog(LOG, "File \"%s\" is not found", file->path);
				continue;
			}
		}

		/* No need to check directories */
		if (S_ISDIR(st.st_mode))
			continue;

		if (!S_ISREG(st.st_mode))
		{
			elog(WARNING, "unexpected file type %d", st.st_mode);
			continue;
		}

		/*
		 * Check only datafiles that are not compressed by cfs.
		 *
		 * TODO deep inside check_data_file
		 * uses global variables to set connections.
		 * Need refactoring.
		 */
		if (file->is_datafile && !file->is_cfs &&
			!check_data_file(&(args->conn_arg), file,
							 args->checksum_version))
			args->ret = 2;		/* corruption found */
	}

	/*
	 * Ret values:
	 * 0 everything is ok
	 * 1 thread errored during execution, e.g. interruption (default value)
	 * 2 corruption is definitely(!) found
	 *
	 * Reaching this point with the default value means that the loop
	 * has finished without errors.
	 */
	if (args->ret == 1)
		args->ret = 0;

	return NULL;
}
/*
 * Collect the list of files in pgdata and run num_threads threads
 * (check_files) to validate them.
 *
 * pgdata            path to the data directory to scan
 * checksum_version  checksum version of the instance, forwarded to the
 *                   worker threads
 *
 * Reports "Data files are valid" on success and raises ERROR if any thread
 * has failed or has found corruption.
 */
static void
do_block_validation(char *pgdata, uint32 checksum_version)
{
	int			i;
	/* arrays with meta info for multi threaded check */
	pthread_t  *threads;
	check_files_arg *threads_args;
	bool		check_isok = true;
	parray	   *files_list = NULL;

	/* initialize file list */
	files_list = parray_new();

	/* list files with the logical path. omit $PGDATA */
	dir_list_file(files_list, pgdata,
				  true, true, false, 0, FIO_DB_HOST);

	/*
	 * Sort pathname ascending.
	 *
	 * For example:
	 * 1 - create 'base'
	 * 2 - create 'base/1'
	 */
	parray_qsort(files_list, pgFileComparePath);

	/* Extract information about files in pgdata parsing their names:*/
	parse_filelist_filenames(files_list, pgdata);

	/* setup threads: reset the per-file flag used to hand out the files */
	for (i = 0; i < parray_num(files_list); i++)
	{
		pgFile	   *file = (pgFile *) parray_get(files_list, i);

		pg_atomic_init_flag(&file->lock);
	}

	/* Sort by size for load balancing */
	parray_qsort(files_list, pgFileCompareSize);

	/* init thread args; all threads share the same file list */
	threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
	threads_args = (check_files_arg *) palloc(sizeof(check_files_arg) * num_threads);

	for (i = 0; i < num_threads; i++)
	{
		check_files_arg *arg = &(threads_args[i]);

		arg->files_list = files_list;
		arg->checksum_version = checksum_version;
		arg->conn_arg.conn = NULL;
		arg->conn_arg.cancel_conn = NULL;
		arg->thread_num = i + 1;
		/* By default there is some error */
		arg->ret = 1;
	}

	elog(INFO, "Start checking data files");

	/* Run threads */
	for (i = 0; i < num_threads; i++)
	{
		check_files_arg *arg = &(threads_args[i]);
		int			rc;

		elog(VERBOSE, "Start thread num: %i", i);

		/*
		 * pthread_create() returns an error number on failure. Joining
		 * a thread that was never created is undefined, so give up here.
		 */
		rc = pthread_create(&threads[i], NULL, check_files, arg);
		if (rc != 0)
			elog(ERROR, "Cannot create thread %d: %s", i, strerror(rc));
	}

	/* Wait threads */
	for (i = 0; i < num_threads; i++)
	{
		pthread_join(threads[i], NULL);
		if (threads_args[i].ret > 0)
			check_isok = false;
	}

	/* cleanup */
	pfree(threads);
	pfree(threads_args);

	if (files_list)
	{
		parray_walk(files_list, pgFileFree);
		parray_free(files_list);
		files_list = NULL;
	}

	if (check_isok)
		elog(INFO, "Data files are valid");
	else
		elog(ERROR, "Checkdb failed");
}
/* Check indexes with amcheck */
static void *
check_indexes(void *arg)
{
int i;
check_indexes_arg *arguments = (check_indexes_arg *) arg;
int n_indexes = 0;
if (arguments->index_list)
n_indexes = parray_num(arguments->index_list);
for (i = 0; i < n_indexes; i++)
{
pg_indexEntry *ind = (pg_indexEntry *) parray_get(arguments->index_list, i);
if (!pg_atomic_test_set_flag(&ind->lock))
continue;
/* check for interrupt */
if (interrupted || thread_interrupted)
elog(ERROR, "Thread [%d]: interrupted during checkdb --amcheck",
arguments->thread_num);
if (progress)
elog(INFO, "Thread [%d]. Progress: (%d/%d). Amchecking index '%s.%s'",
arguments->thread_num, i + 1, n_indexes,
ind->amcheck_nspname, ind->name);
if (arguments->conn_arg.conn == NULL)
{
arguments->conn_arg.conn = pgut_connect(arguments->conn_opt.pghost,
arguments->conn_opt.pgport,
arguments->conn_opt.pgdatabase,
arguments->conn_opt.pguser);
arguments->conn_arg.cancel_conn = PQgetCancel(arguments->conn_arg.conn);
}
/* remember that we have a failed check */
if (!amcheck_one_index(arguments, ind))
arguments->ret = 2; /* corruption found */
}
/* Close connection. */
if (arguments->conn_arg.conn)
pgut_disconnect(arguments->conn_arg.conn);
/* Ret values:
* 0 everything is ok
* 1 thread errored during execution, e.g. interruption (default value)
* 2 corruption is definitely(!) found
*/
if (arguments->ret == 1)
arguments->ret = 0;
return NULL;
}
/*
 * Get the list of indexes to amcheck for the given database.
 *
 * dbname                 name of the database, used in messages only
 * first_db_with_amcheck  true for the first database where amcheck is found;
 *                        only for this database are the indexes located in
 *                        tablespace pg_global included in the list
 * db_conn                open connection to the database
 *
 * Returns a newly allocated parray of pg_indexEntry (the caller releases it
 * with pg_indexEntry_free), or NULL if neither 'amcheck' nor 'amcheck_next'
 * is installed in the database or no index has been selected.
 */
static parray*
get_index_list(const char *dbname, bool first_db_with_amcheck,
			   PGconn *db_conn)
{
	PGresult   *res;
	char	   *nspname = NULL;
	const char *index_query;
	int			i;
	int			n_indexes;
	bool		heapallindexed_is_supported = false;
	parray	   *index_list = NULL;

	res = pgut_execute(db_conn, "SELECT "
								"extname, nspname, extversion "
								"FROM pg_namespace n "
								"JOIN pg_extension e "
								"ON n.oid=e.extnamespace "
								"WHERE e.extname IN ('amcheck', 'amcheck_next') "
								"ORDER BY extversion DESC "
								"LIMIT 1",
								0, NULL);

	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		PQclear(res);
		elog(ERROR, "Cannot check if amcheck is installed in database %s: %s",
			 dbname, PQerrorMessage(db_conn));
	}

	if (PQntuples(res) < 1)
	{
		elog(WARNING, "Extension 'amcheck' or 'amcheck_next' are "
					  "not installed in database %s", dbname);
		/* do not leak the result on the early exit */
		PQclear(res);
		return NULL;
	}

	/* keep a private copy: the result is released before the next query */
	nspname = pgut_malloc(strlen(PQgetvalue(res, 0, 1)) + 1);
	strcpy(nspname, PQgetvalue(res, 0, 1));

	/* heapallindexed_is_supported is database specific */
	if (strcmp(PQgetvalue(res, 0, 2), "1.0") != 0 &&
		strcmp(PQgetvalue(res, 0, 2), "1") != 0)
		heapallindexed_is_supported = true;

	elog(INFO, "Amchecking database '%s' using extension '%s' "
			   "version %s from schema '%s'",
		 dbname, PQgetvalue(res, 0, 0),
		 PQgetvalue(res, 0, 2), PQgetvalue(res, 0, 1));

	if (!heapallindexed_is_supported && heapallindexed)
		elog(WARNING, "Extension '%s' version %s in schema '%s' "
					  "do not support 'heapallindexed' option",
			 PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 2),
			 PQgetvalue(res, 0, 1));

	/* the extension info is not needed anymore */
	PQclear(res);

	/*
	 * In order to avoid duplicates, select global indexes
	 * (tablespace pg_global with oid 1664) only once.
	 *
	 * select only persistent btree indexes.
	 */
	if (first_db_with_amcheck)
		index_query = "SELECT cls.oid, cls.relname "
					  "FROM pg_index idx "
					  "JOIN pg_class cls ON idx.indexrelid=cls.oid "
					  "JOIN pg_am am ON cls.relam=am.oid "
					  "WHERE am.amname='btree' AND cls.relpersistence != 't'";
	else
		/*
		 * The filter on pg_global has to be part of WHERE: a condition in
		 * the ON clause of a LEFT JOIN never removes rows of pg_class.
		 * reltablespace = 0 (default tablespace) passes the NOT IN test.
		 */
		index_query = "SELECT cls.oid, cls.relname "
					  "FROM pg_index idx "
					  "JOIN pg_class cls ON idx.indexrelid=cls.oid "
					  "JOIN pg_am am ON cls.relam=am.oid "
					  "WHERE am.amname='btree' AND cls.relpersistence != 't' "
					  "AND cls.reltablespace NOT IN "
					  "(SELECT tbl.oid FROM pg_tablespace tbl "
					  "WHERE tbl.spcname = 'pg_global')";

	res = pgut_execute(db_conn, index_query, 0, NULL);

	/* add info needed to check indexes into index_list */
	n_indexes = PQntuples(res);
	for (i = 0; i < n_indexes; i++)
	{
		pg_indexEntry *ind = (pg_indexEntry *) pgut_malloc(sizeof(pg_indexEntry));
		char	   *name = NULL;

		/* Oid is unsigned, atoi() cannot represent values above INT_MAX */
		ind->indexrelid = (Oid) strtoul(PQgetvalue(res, i, 0), NULL, 10);

		name = PQgetvalue(res, i, 1);
		ind->name = pgut_malloc(strlen(name) + 1);
		strcpy(ind->name, name);	/* enough buffer size guaranteed */

		ind->heapallindexed_is_supported = heapallindexed_is_supported;

		/* every entry owns its copy, see pg_indexEntry_free() */
		ind->amcheck_nspname = pgut_malloc(strlen(nspname) + 1);
		strcpy(ind->amcheck_nspname, nspname);

		pg_atomic_clear_flag(&ind->lock);

		if (index_list == NULL)
			index_list = parray_new();

		parray_append(index_list, ind);
	}

	PQclear(res);
	free(nspname);

	return index_list;
}
/*
 * Check one index with bt_index_check() of the amcheck extension.
 *
 * arguments  per-thread state; supplies the connection, the cancel handle
 *            and the thread number / database name for the messages
 * ind        index to check
 *
 * Returns true if the query has returned PGRES_TUPLES_OK, false otherwise
 * (a WARNING with the server message is logged in that case).
 */
static bool
amcheck_one_index(check_indexes_arg *arguments,
				  pg_indexEntry *ind)
{
	enum { OID_BUF_SIZE = 64 };

	PGresult   *res;
	char	   *params[2];
	char	   *query = NULL;
	const char *func_args;
	size_t		query_len;
	int			n_params;
	bool		index_is_valid;

	params[0] = palloc(OID_BUF_SIZE);

	/*
	 * first argument is index oid.
	 * Oid is unsigned: printing it as a signed integer would turn
	 * oids above INT_MAX into negative numbers.
	 */
	snprintf(params[0], OID_BUF_SIZE, "%u", ind->indexrelid);
	/* second argument is heapallindexed */
	params[1] = heapallindexed ? "true" : "false";

	if (interrupted)
		elog(ERROR, "Interrupted");

	/* older amcheck versions take the index oid only */
	if (ind->heapallindexed_is_supported)
	{
		func_args = "$1, $2";
		n_params = 2;
	}
	else
	{
		func_args = "$1";
		n_params = 1;
	}

	query_len = strlen("SELECT .bt_index_check()") +
		strlen(ind->amcheck_nspname) + strlen(func_args) + 1;
	query = palloc(query_len);
	snprintf(query, query_len, "SELECT %s.bt_index_check(%s)",
			 ind->amcheck_nspname, func_args);

	res = pgut_execute_parallel(arguments->conn_arg.conn,
								arguments->conn_arg.cancel_conn,
								query, n_params, (const char **) params,
								true, true, true);

	index_is_valid = (PQresultStatus(res) == PGRES_TUPLES_OK);

	if (!index_is_valid)
		elog(WARNING, "Thread [%d]. Amcheck failed in database '%s' for index: '%s.%s': %s",
			 arguments->thread_num, arguments->conn_opt.pgdatabase,
			 ind->amcheck_nspname, ind->name, PQresultErrorMessage(res));
	else
		elog(LOG, "Thread [%d]. Amcheck succeeded in database '%s' for index: '%s.%s'",
			 arguments->thread_num,
			 arguments->conn_opt.pgdatabase, ind->amcheck_nspname, ind->name);

	pfree(params[0]);
	pfree(query);
	PQclear(res);

	return index_is_valid;
}
/*
 * Entry point of checkdb --amcheck.
 *
 * Connect to all databases in the cluster
 * and get list of persistent indexes,
 * then run parallel threads to perform bt_index_check()
 * for all indexes from the list.
 *
 * If amcheck extension is not installed in the database,
 * skip this database and report it via warning message.
 *
 * conn_opt  host, port and user used to connect to every database
 * conn      open connection used to fetch the list of databases;
 *           it is closed by this function
 *
 * Raises ERROR if the check was interrupted, if some index is not valid
 * or if some database was skipped.
 */
static void
do_amcheck(ConnectionOptions conn_opt, PGconn *conn)
{
	int			i;
	/* arrays with meta info for multi threaded amcheck */
	pthread_t  *threads;
	check_indexes_arg *threads_args;
	/* overall result; the result of one database is tracked separately */
	bool		check_isok = true;
	PGresult   *res_db;
	int			n_databases = 0;
	bool		first_db_with_amcheck = true;
	bool		db_skipped = false;

	elog(INFO, "Start amchecking PostgreSQL instance");

	res_db = pgut_execute(conn,
						  "SELECT datname, oid, dattablespace "
						  "FROM pg_database "
						  "WHERE datname NOT IN ('template0', 'template1')",
						  0, NULL);

	/* we don't need this connection anymore */
	if (conn)
		pgut_disconnect(conn);

	n_databases = PQntuples(res_db);

	/* the thread arrays are reused for every database */
	threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
	threads_args = (check_indexes_arg *) palloc(sizeof(check_indexes_arg) * num_threads);

	/* For each database check indexes. In parallel. */
	for (i = 0; i < n_databases; i++)
	{
		int			j;
		const char *dbname;
		PGconn	   *db_conn = NULL;
		parray	   *index_list = NULL;
		bool		db_isok = true;

		dbname = PQgetvalue(res_db, i, 0);
		db_conn = pgut_connect(conn_opt.pghost, conn_opt.pgport,
							   dbname, conn_opt.pguser);

		index_list = get_index_list(dbname, first_db_with_amcheck,
									db_conn);

		/* we don't need this connection anymore */
		if (db_conn)
			pgut_disconnect(db_conn);

		if (index_list == NULL)
		{
			db_skipped = true;
			continue;
		}

		first_db_with_amcheck = false;

		/* init thread args; all threads share the same index list */
		for (j = 0; j < num_threads; j++)
		{
			check_indexes_arg *arg = &(threads_args[j]);

			arg->index_list = index_list;
			arg->conn_arg.conn = NULL;
			arg->conn_arg.cancel_conn = NULL;

			arg->conn_opt.pghost = conn_opt.pghost;
			arg->conn_opt.pgport = conn_opt.pgport;
			arg->conn_opt.pgdatabase = dbname;
			arg->conn_opt.pguser = conn_opt.pguser;

			arg->thread_num = j + 1;
			/* By default there are some error */
			arg->ret = 1;
		}

		/* Run threads */
		for (j = 0; j < num_threads; j++)
		{
			check_indexes_arg *arg = &(threads_args[j]);
			int			rc;

			elog(VERBOSE, "Start thread num: %i", j);

			/*
			 * pthread_create() returns an error number on failure. Joining
			 * a thread that was never created is undefined, so give up here.
			 */
			rc = pthread_create(&threads[j], NULL, check_indexes, arg);
			if (rc != 0)
				elog(ERROR, "Cannot create thread %d: %s", j, strerror(rc));
		}

		/* Wait threads */
		for (j = 0; j < num_threads; j++)
		{
			pthread_join(threads[j], NULL);
			if (threads_args[j].ret > 0)
				db_isok = false;
		}

		/*
		 * Report the result of this database only: a failure in one of
		 * the previous databases must not mark this one as failed.
		 */
		if (db_isok)
			elog(INFO, "Amcheck succeeded for database '%s'", dbname);
		else
		{
			elog(WARNING, "Amcheck failed for database %s", dbname);
			check_isok = false;
		}

		parray_walk(index_list, pg_indexEntry_free);
		parray_free(index_list);

		if (interrupted)
			break;
	}

	/* cleanup */
	pfree(threads);
	pfree(threads_args);
	PQclear(res_db);

	/* Inform user about amcheck results */
	if (interrupted)
		elog(ERROR, "checkdb --amcheck is interrupted.");

	if (check_isok)
	{
		elog(INFO, "checkdb --amcheck finished successfully. "
				   "All checked indexes are valid.");

		if (db_skipped)
			elog(ERROR, "Some databases were not amchecked.");
		else
			elog(INFO, "All databases were amchecked.");
	}
	else
		elog(ERROR, "checkdb --amcheck finished with failure. "
					"Not all checked indexes are valid. %s",
			 db_skipped?"Some databases were not amchecked.":
						"All databases were amchecked.");
}
/*
 * Entry point of pg_probackup CHECKDB subcommand.
 *
 * need_amcheck  run the amcheck based verification of indexes
 * conn_opt      credentials used to connect to the instance
 * pgdata        path to the data directory; required unless
 *               block validation is skipped
 *
 * Block validation runs first (unless --skip-block-validation is given),
 * the optional amcheck step runs after it on a fresh connection.
 */
void
do_checkdb(bool need_amcheck,
		   ConnectionOptions conn_opt, char *pgdata)
{
	PGNodeInfo	node_info;

	/* skipping block validation without amcheck leaves nothing to do */
	if (skip_block_validation && !need_amcheck)
		elog(ERROR, "Option '--skip-block-validation' must be used with '--amcheck' option");

	if (!skip_block_validation)
	{
		PGconn	   *setup_conn;

		if (pgdata == NULL)
			elog(ERROR, "required parameter not specified: PGDATA "
						"(-D, --pgdata)");

		/* get node info */
		setup_conn = pgdata_basic_setup(conn_opt, &node_info);

		/* ensure that conn credentials and pgdata are consistent */
		check_system_identifiers(setup_conn, pgdata);

		/*
		 * Block validation can take a long time, so the connection is not
		 * held open meanwhile; amcheck opens a new one afterwards.
		 */
		if (setup_conn)
			pgut_disconnect(setup_conn);

		do_block_validation(pgdata, node_info.checksum_version);
	}

	if (need_amcheck)
	{
		PGconn	   *amcheck_conn = pgdata_basic_setup(conn_opt, &node_info);

		do_amcheck(conn_opt, amcheck_conn);
	}
}

View File

@ -72,43 +72,43 @@ ConfigOption instance_options[] =
/* Connection options */
{
's', 'd', "pgdatabase",
&instance_config.pgdatabase, SOURCE_CMD, 0,
&instance_config.conn_opt.pgdatabase, SOURCE_CMD, 0,
OPTION_CONN_GROUP, 0, option_get_value
},
{
's', 'h', "pghost",
&instance_config.pghost, SOURCE_CMD, 0,
&instance_config.conn_opt.pghost, SOURCE_CMD, 0,
OPTION_CONN_GROUP, 0, option_get_value
},
{
's', 'p', "pgport",
&instance_config.pgport, SOURCE_CMD, 0,
&instance_config.conn_opt.pgport, SOURCE_CMD, 0,
OPTION_CONN_GROUP, 0, option_get_value
},
{
's', 'U', "pguser",
&instance_config.pguser, SOURCE_CMD, 0,
&instance_config.conn_opt.pguser, SOURCE_CMD, 0,
OPTION_CONN_GROUP, 0, option_get_value
},
/* Replica options */
{
's', 202, "master-db",
&instance_config.master_db, SOURCE_CMD, 0,
&instance_config.master_conn_opt.pgdatabase, SOURCE_CMD, 0,
OPTION_REPLICA_GROUP, 0, option_get_value
},
{
's', 203, "master-host",
&instance_config.master_host, SOURCE_CMD, 0,
&instance_config.master_conn_opt.pghost, SOURCE_CMD, 0,
OPTION_REPLICA_GROUP, 0, option_get_value
},
{
's', 204, "master-port",
&instance_config.master_port, SOURCE_CMD, 0,
&instance_config.master_conn_opt.pgport, SOURCE_CMD, 0,
OPTION_REPLICA_GROUP, 0, option_get_value
},
{
's', 205, "master-user",
&instance_config.master_user, SOURCE_CMD, 0,
&instance_config.master_conn_opt.pguser, SOURCE_CMD, 0,
OPTION_REPLICA_GROUP, 0, option_get_value
},
{

View File

@ -195,7 +195,8 @@ parse_page(Page page, XLogRecPtr *lsn)
*/
static int
read_page_from_file(pgFile *file, BlockNumber blknum,
FILE *in, Page page, XLogRecPtr *page_lsn)
FILE *in, Page page, XLogRecPtr *page_lsn,
uint32 checksum_version)
{
off_t offset = blknum * BLCKSZ;
ssize_t read_len = 0;
@ -251,7 +252,7 @@ read_page_from_file(pgFile *file, BlockNumber blknum,
}
/* Verify checksum */
if (current.checksum_version)
if (checksum_version)
{
BlockNumber blkno = file->segno * RELSEG_SIZE + blknum;
/*
@ -289,13 +290,14 @@ read_page_from_file(pgFile *file, BlockNumber blknum,
* PageIsCorrupted(-4) if the page check mismatch
*/
static int32
prepare_page(backup_files_arg *arguments,
prepare_page(ConnectionArgs *arguments,
pgFile *file, XLogRecPtr prev_backup_start_lsn,
BlockNumber blknum, BlockNumber nblocks,
FILE *in, BlockNumber *n_skipped,
BackupMode backup_mode,
Page page,
bool strict)
bool strict,
uint32 checksum_version)
{
XLogRecPtr page_lsn = 0;
int try_again = 100;
@ -316,7 +318,8 @@ prepare_page(backup_files_arg *arguments,
{
while(!page_is_valid && try_again)
{
int result = read_page_from_file(file, blknum, in, page, &page_lsn);
int result = read_page_from_file(file, blknum, in, page,
&page_lsn, checksum_version);
try_again--;
if (result == 0)
@ -335,7 +338,7 @@ prepare_page(backup_files_arg *arguments,
* If ptrack support is available use it to get invalid block
* instead of rereading it 99 times
*/
//elog(WARNING, "Checksum_Version: %i", current.checksum_version ? 1 : 0);
//elog(WARNING, "Checksum_Version: %i", checksum_version ? 1 : 0);
if (result == -1 && is_ptrack_support && strict)
{
@ -399,7 +402,7 @@ prepare_page(backup_files_arg *arguments,
*/
memcpy(page, ptrack_page, BLCKSZ);
free(ptrack_page);
if (current.checksum_version)
if (checksum_version)
((PageHeader) page)->pd_checksum = pg_checksum_page(page, absolute_blknum);
}
/* get lsn from page, provided by pg_ptrack_get_block() */
@ -631,9 +634,9 @@ backup_data_file(backup_files_arg* arguments,
RetryUsingPtrack:
for (blknum = 0; blknum < nblocks; blknum++)
{
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
page_state = prepare_page(&(arguments->conn_arg), file, prev_backup_start_lsn,
blknum, nblocks, in, &n_blocks_skipped,
backup_mode, curr_page, true);
backup_mode, curr_page, true, current.checksum_version);
compress_and_backup_page(file, blknum, in, out, &(file->crc),
page_state, curr_page, calg, clevel);
n_blocks_read++;
@ -655,9 +658,9 @@ backup_data_file(backup_files_arg* arguments,
iter = datapagemap_iterate(&file->pagemap);
while (datapagemap_next(iter, &blknum))
{
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
page_state = prepare_page(&(arguments->conn_arg), file, prev_backup_start_lsn,
blknum, nblocks, in, &n_blocks_skipped,
backup_mode, curr_page, true);
backup_mode, curr_page, true, current.checksum_version);
compress_and_backup_page(file, blknum, in, out, &(file->crc),
page_state, curr_page, calg, clevel);
n_blocks_read++;
@ -1189,8 +1192,8 @@ validate_one_page(Page page, pgFile *file,
* also returns true if the file was not found
*/
bool
check_data_file(backup_files_arg* arguments,
pgFile *file)
check_data_file(ConnectionArgs *arguments,
pgFile *file, uint32 checksum_version)
{
FILE *in;
BlockNumber blknum = 0;
@ -1235,7 +1238,7 @@ check_data_file(backup_files_arg* arguments,
{
page_state = prepare_page(arguments, file, InvalidXLogRecPtr,
blknum, nblocks, in, &n_blocks_skipped,
BACKUP_MODE_FULL, curr_page, false);
BACKUP_MODE_FULL, curr_page, false, checksum_version);
if (page_state == PageIsTruncated)
break;

View File

@ -418,7 +418,10 @@ main(int argc, char *argv[])
elog(ERROR, "-B, --backup-path must be an absolute path");
/* Option --instance is required for all commands except init and show */
/*
* Option --instance is required for all commands except
* init, show, checkdb and validate
*/
if (instance_name == NULL)
{
if (backup_subcmd != INIT_CMD && backup_subcmd != SHOW_CMD &&
@ -553,16 +556,16 @@ main(int argc, char *argv[])
elog(ERROR, "Invalid backup-id \"%s\"", backup_id_string);
}
if (!instance_config.pghost && instance_config.remote.host)
instance_config.pghost = instance_config.remote.host;
if (!instance_config.conn_opt.pghost && instance_config.remote.host)
instance_config.conn_opt.pghost = instance_config.remote.host;
/* Setup stream options. They are used in streamutil.c. */
if (instance_config.pghost != NULL)
dbhost = pstrdup(instance_config.pghost);
if (instance_config.pgport != NULL)
dbport = pstrdup(instance_config.pgport);
if (instance_config.pguser != NULL)
dbuser = pstrdup(instance_config.pguser);
if (instance_config.conn_opt.pghost != NULL)
dbhost = pstrdup(instance_config.conn_opt.pghost);
if (instance_config.conn_opt.pgport != NULL)
dbport = pstrdup(instance_config.conn_opt.pgport);
if (instance_config.conn_opt.pguser != NULL)
dbuser = pstrdup(instance_config.conn_opt.pguser);
/* setup exclusion list for file search */
if (!backup_logs)
@ -663,7 +666,8 @@ main(int argc, char *argv[])
do_set_config(false);
break;
case CHECKDB_CMD:
do_checkdb(need_amcheck);
do_checkdb(need_amcheck,
instance_config.conn_opt, instance_config.pgdata);
break;
case NO_CMD:
/* Should not happen */

View File

@ -148,15 +148,6 @@ typedef struct pgFile
* i.e. datafiles without _ptrack */
} pgFile;
typedef struct pg_indexEntry
{
Oid indexrelid;
char *name;
char *dbname;
char *amcheck_nspname; /* schema where amcheck extention is located */
volatile pg_atomic_flag lock; /* lock for synchronization of parallel threads */
} pg_indexEntry;
/* Special values of datapagemap_t bitmapsize */
#define PageBitmapIsEmpty 0 /* Used to mark unchanged datafiles */
@ -199,6 +190,21 @@ typedef enum ShowFormat
#define PROGRAM_VERSION "2.1.3"
#define AGENT_PROTOCOL_VERSION 20103
typedef struct ConnectionOptions
{
const char *pgdatabase;
const char *pghost;
const char *pgport;
const char *pguser;
} ConnectionOptions;
typedef struct ConnectionArgs
{
PGconn *conn;
PGcancel *cancel_conn;
} ConnectionArgs;
/*
* An instance configuration. It can be stored in a configuration file or passed
* from command line.
@ -210,15 +216,10 @@ typedef struct InstanceConfig
char *pgdata;
char *external_dir_str;
const char *pgdatabase;
const char *pghost;
const char *pgport;
const char *pguser;
const char *master_host;
const char *master_port;
const char *master_db;
const char *master_user;
ConnectionOptions conn_opt;
ConnectionOptions master_conn_opt;
uint32 replica_timeout;
/* Wait timeout for WAL segment archiving */
@ -241,6 +242,16 @@ typedef struct InstanceConfig
extern ConfigOption instance_options[];
extern InstanceConfig instance_config;
typedef struct PGNodeInfo
{
uint32 block_size;
uint32 wal_block_size;
uint32 checksum_version;
char program_version[100];
char server_version[100];
} PGNodeInfo;
typedef struct pgBackup pgBackup;
/* Information about single backup stored in backup.conf */
@ -278,10 +289,10 @@ struct pgBackup
int compress_level;
/* Fields needed for compatibility check */
PGNodeInfo nodeInfo;
uint32 block_size;
uint32 wal_block_size;
uint32 checksum_version;
char program_version[100];
char server_version[100];
@ -330,9 +341,7 @@ typedef struct
parray *external_dirs;
XLogRecPtr prev_start_lsn;
PGconn *backup_conn;
PGcancel *cancel_conn;
parray *index_list;
ConnectionArgs conn_arg;
int thread_num;
/*
@ -342,6 +351,7 @@ typedef struct
int ret;
} backup_files_arg;
/*
* When copying datafiles to backup we validate and compress them block
* by block. Thus special header is required for each data block.
@ -462,13 +472,14 @@ extern const char *pgdata_exclude_dir[];
/* in backup.c */
extern int do_backup(time_t start_time, bool no_validate);
extern void do_checkdb(bool need_amcheck);
extern void do_checkdb(bool need_amcheck, ConnectionOptions conn_opt,
char *pgdata);
extern BackupMode parse_backup_mode(const char *value);
extern const char *deparse_backup_mode(BackupMode mode);
extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
BlockNumber blkno);
extern char *pg_ptrack_get_block(backup_files_arg *arguments,
extern char *pg_ptrack_get_block(ConnectionArgs *arguments,
Oid dbOid, Oid tblsOid, Oid relOid,
BlockNumber blknum,
size_t *result_size);
@ -625,8 +636,7 @@ extern int pgFileCompareLinked(const void *f1, const void *f2);
extern int pgFileCompareSize(const void *f1, const void *f2);
/* in data.c */
extern bool check_data_file(backup_files_arg* arguments,
pgFile *file);
extern bool check_data_file(ConnectionArgs* arguments, pgFile* file, uint32 checksum_version);
extern bool backup_data_file(backup_files_arg* arguments,
const char *to_path, pgFile *file,
XLogRecPtr prev_backup_start_lsn,
@ -685,4 +695,10 @@ extern bool parse_page(Page page, XLogRecPtr *lsn);
int32 do_compress(void* dst, size_t dst_size, void const* src, size_t src_size,
CompressAlg alg, int level, const char **errormsg);
extern PGconn *pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo);
extern void check_system_identifiers(PGconn *conn, char *pgdata);
extern void parse_filelist_filenames(parray *files, const char *root);
#endif /* PG_PROBACKUP_H */

View File

@ -228,7 +228,10 @@ pgut_connect(const char *host, const char *port,
dbname, username, password);
if (PQstatus(conn) == CONNECTION_OK)
{
pgut_atexit_push(pgut_disconnect_callback, conn);
return conn;
}
if (conn && PQconnectionNeedsPassword(conn) && prompt_password)
{
@ -354,6 +357,7 @@ pgut_disconnect(PGconn *conn)
{
if (conn)
PQfinish(conn);
pgut_atexit_pop(pgut_disconnect_callback, conn);
}
@ -781,6 +785,14 @@ struct pgut_atexit_item
static pgut_atexit_item *pgut_atexit_stack = NULL;
void
pgut_disconnect_callback(bool fatal, void *userdata)
{
PGconn *conn = (PGconn *) userdata;
if (conn)
pgut_disconnect(conn);
}
void
pgut_atexit_push(pgut_atexit_callback callback, void *userdata)
{

View File

@ -43,6 +43,7 @@ extern PGconn *pgut_connect_replication(const char *host, const char *port,
const char *dbname,
const char *username);
extern void pgut_disconnect(PGconn *conn);
extern void pgut_disconnect_callback(bool fatal, void *userdata);
extern PGresult *pgut_execute(PGconn* conn, const char *query, int nParams,
const char **params);
extern PGresult *pgut_execute_extended(PGconn* conn, const char *query, int nParams,

View File

@ -75,10 +75,10 @@ class CheckdbTest(ProbackupTest, unittest.TestCase):
'-d', 'postgres', '-p', str(node.port)])
self.assertIn(
'INFO: Checkdb --amcheck executed successfully',
'INFO: checkdb --amcheck finished successfully',
output)
self.assertIn(
'INFO: Indexes are valid',
'All checked indexes are valid',
output)
# logging to file sanity
@ -143,7 +143,7 @@ class CheckdbTest(ProbackupTest, unittest.TestCase):
with open(log_file_path) as f:
log_file_content = f.read()
self.assertIn(
'INFO: Checkdb --amcheck executed successfully',
'INFO: checkdb --amcheck finished successfully',
log_file_content)
self.assertIn(
'VERBOSE: (query)',
@ -167,7 +167,7 @@ class CheckdbTest(ProbackupTest, unittest.TestCase):
with open(log_file_path) as f:
log_file_content = f.read()
self.assertIn(
'INFO: Checkdb --amcheck executed successfully',
'INFO: checkdb --amcheck finished successfully',
log_file_content)
self.assertIn(
'VERBOSE: (query)',
@ -199,7 +199,7 @@ class CheckdbTest(ProbackupTest, unittest.TestCase):
with open(log_file_path) as f:
log_file_content = f.read()
self.assertIn(
'ERROR: Checkdb --amcheck failed',
'ERROR: checkdb --amcheck finished with failure',
log_file_content)
self.assertIn(
"WARNING: Thread [1]. Amcheck failed in database 'postgres' "
@ -323,7 +323,7 @@ class CheckdbTest(ProbackupTest, unittest.TestCase):
repr(self.output), self.cmd))
except ProbackupException as e:
self.assertIn(
"ERROR: Checkdb --amcheck failed",
"ERROR: checkdb --amcheck finished with failure",
e.message,
"\n Unexpected Error Message: {0}\n CMD: {1}".format(
repr(e.message), self.cmd))
@ -344,7 +344,7 @@ class CheckdbTest(ProbackupTest, unittest.TestCase):
log_file_content)
self.assertIn(
"ERROR: Checkdb --amcheck failed",
"ERROR: checkdb --amcheck finished with failure",
log_file_content)
# Clean after yourself