1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-02-13 14:58:35 +02:00

pgpro-2065. WIP. fix amcheck return status, call amcheck function using correct schema name

This commit is contained in:
Anastasia 2018-12-12 18:19:04 +03:00
parent 26c2e066f6
commit ffd4b2f5cf
3 changed files with 76 additions and 35 deletions

View File

@ -95,6 +95,7 @@ static bool pg_stop_backup_is_sent = false;
*/
static void backup_cleanup(bool fatal, void *userdata);
static void backup_disconnect(bool fatal, void *userdata);
static void threads_conn_disconnect(bool fatal, void *userdata);
static void *backup_files(void *arg);
static void *check_files(void *arg);
@ -104,7 +105,7 @@ static void do_backup_instance(void);
static void do_check_instance(bool do_amcheck);
static parray* get_index_list(void);
static bool amcheck_one_index(backup_files_arg *arguments,
Oid indexrelid);
pg_indexEntry *ind);
static void pg_start_backup(const char *label, bool smooth, pgBackup *backup);
static void pg_switch_wal(PGconn *conn);
@ -535,6 +536,8 @@ do_check_instance(bool do_amcheck)
arg->ret = 1;
}
pgut_atexit_push(threads_conn_disconnect, NULL);
/* Run threads */
elog(INFO, "Start checking data files");
for (i = 0; i < num_threads; i++)
@ -2345,6 +2348,27 @@ backup_disconnect(bool fatal, void *userdata)
pgut_disconnect(master_conn);
}
/*
* Disconnect backup connections created in threads during quit pg_probackup.
*/
static void
threads_conn_disconnect(bool fatal, void *userdata)
{
int i;
elog(VERBOSE, "threads_conn_disconnect, num_threads %d", num_threads);
for (i = 0; i < num_threads; i++)
{
// backup_files_arg *arg = &(threads_args[i]);
//
// if (arg->backup_conn)
// {
// pgut_cancel(arg->backup_conn);
// pgut_disconnect(arg->backup_conn);
// }
}
}
static void *
check_files(void *arg)
{
@ -2352,6 +2376,7 @@ check_files(void *arg)
backup_files_arg *arguments = (backup_files_arg *) arg;
int n_backup_files_list = parray_num(arguments->files_list);
int n_indexes = 0;
char dbname[NAMEDATALEN];
/* We only have index_list in amcheck mode */
if (arguments->index_list)
@ -2421,11 +2446,11 @@ check_files(void *arg)
/* check index with amcheck */
/* TODO sort index_list by dbname */
for (i = 0; i < n_indexes; i++)
{
pg_indexEntry *ind = (pg_indexEntry *) parray_get(arguments->index_list, i);
elog(VERBOSE, "Checking index: \"%s\" ", ind->name);
if (!pg_atomic_test_set_flag(&ind->lock))
continue;
@ -2433,7 +2458,22 @@ check_files(void *arg)
if (interrupted)
elog(ERROR, "interrupted during checkdb --amcheck");
amcheck_one_index(arguments, ind->indexrelid);
elog(VERBOSE, "Checking index: \"%s\" ", ind->name);
/* reconnect, if index to check is in another database */
if (strcmp((const char *) &dbname, ind->dbname) != 0)
{
if (arguments->backup_conn)
pgut_disconnect(arguments->backup_conn);
arguments->backup_conn = pgut_connect(instance_config.pghost,
instance_config.pgport,
ind->dbname,
instance_config.pguser);
arguments->cancel_conn = PQgetCancel(arguments->backup_conn);
}
amcheck_one_index(arguments, ind);
}
/* Close connection */
@ -3150,10 +3190,10 @@ get_index_list(void)
PGresult *res_db,
*res;
const char *dbname;
const char *nspname = NULL;
int i;
Oid dbOid, tblspcOid;
char *params[2];
const char *indexname;
Oid indexrelid;
params[0] = palloc(64);
@ -3176,7 +3216,9 @@ get_index_list(void)
dbname,
instance_config.pguser);
res = pgut_execute(tmp_conn, "select * from pg_extension where extname='amcheck'", 0, NULL);
res = pgut_execute(tmp_conn, "select extname, nspname, extversion from pg_namespace "
"n join pg_extension e on n.oid=e.extnamespace where e.extname='amcheck'",
0, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
@ -3191,7 +3233,14 @@ get_index_list(void)
continue;
}
if (nspname)
free(nspname);
nspname = pgut_malloc(strlen(PQgetvalue(res, 0, 1)) + 1);
strcpy(nspname, PQgetvalue(res, 0, 1));
PQclear(res);
res = pgut_execute(tmp_conn, "SELECT cls.oid, cls.relname"
" FROM pg_index idx "
" JOIN pg_class cls ON cls.oid=idx.indexrelid "
@ -3210,17 +3259,20 @@ get_index_list(void)
ind->name = pgut_malloc(strlen(name) + 1);
strcpy(ind->name, name); /* enough buffer size guaranteed */
ind->dbname = pgut_malloc(strlen(dbname) + 1);
strcpy(ind->dbname, dbname); /* enough buffer size guaranteed */
ind->amcheck_nspname = pgut_malloc(strlen(nspname) + 1);
strcpy(ind->amcheck_nspname, nspname);
pg_atomic_clear_flag(&ind->lock);
if (index_list == NULL)
index_list = parray_new();
/* TODO maybe keep structure with index name and some other fileds? */
parray_append(index_list, ind);
}
PQclear(res);
pgut_disconnect(tmp_conn);
}
@ -3233,48 +3285,35 @@ get_index_list(void)
/* check one index. Return true if everything is ok, false otherwise. */
static bool
amcheck_one_index(backup_files_arg *arguments,
Oid indexrelid)
pg_indexEntry *ind)
{
PGresult *res;
char *params[1];
char *result;
char *query;
params[0] = palloc(64);
/*
* Use tmp_conn, since we may work in parallel threads.
* We can connect to any database.
*/
sprintf(params[0], "%i", indexrelid);
sprintf(params[0], "%i", ind->indexrelid);
if (arguments->backup_conn == NULL)
{
arguments->backup_conn = pgut_connect(instance_config.pghost,
instance_config.pgport,
instance_config.pgdatabase,
instance_config.pguser);
}
if (arguments->cancel_conn == NULL)
arguments->cancel_conn = PQgetCancel(arguments->backup_conn);
query = palloc(strlen(ind->amcheck_nspname)+strlen("SELECT .bt_index_check($1)")+1);
sprintf(query, "SELECT %s.bt_index_check($1)", ind->amcheck_nspname);
res = pgut_execute_parallel(arguments->backup_conn,
arguments->cancel_conn,
"SELECT bt_index_check($1)",
1, (const char **)params, true);
query, 1, (const char **)params, true);
/* TODO now this check is hidden inside pgut_execute_parallel
* We need to handle failed check as an error.
*/
if (PQresultStatus(res) != PGRES_COMMAND_OK)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
elog(VERBOSE, "amcheck failed for relation oid %u: %s",
indexrelid, PQresultErrorMessage(res));
ind->indexrelid, PQresultErrorMessage(res));
pfree(params[0]);
PQclear(res);
return false;
}
else
elog(VERBOSE, "amcheck succeeded for relation oid %u",
ind->indexrelid);
pfree(params[0]);
PQclear(res);

View File

@ -396,7 +396,7 @@ main(int argc, char *argv[])
config_get_opt_env(instance_options);
if (backup_subcmd == CHECKDB_CMD
&& (&instance_config.logger.log_level_file != LOG_OFF)
&& (instance_config.logger.log_level_file != LOG_OFF)
&& (&instance_config.logger.log_directory == NULL))
elog(ERROR, "Cannot save checkdb logs to a file. "
"You must specify --log-directory option when running checkdb with "

View File

@ -133,6 +133,8 @@ typedef struct pg_indexEntry
{
Oid indexrelid;
char *name;
char *dbname;
char *amcheck_nspname; /* schema where amcheck extention is located */
volatile pg_atomic_flag lock; /* lock for synchronization of parallel threads */
} pg_indexEntry;