1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-03-05 16:07:35 +02:00

PGPRO-2065: --amcheck minor fixes

This commit is contained in:
Grigory Smolkin 2019-04-01 03:45:15 +03:00
parent 19405a95b3
commit 030832c0ba
3 changed files with 66 additions and 93 deletions

View File

@ -109,7 +109,7 @@ static void do_amcheck(void);
static void *check_files(void *arg); static void *check_files(void *arg);
static void *check_indexes(void *arg); static void *check_indexes(void *arg);
static parray* get_index_list(PGresult* res_db, int db_number, static parray* get_index_list(PGresult* res_db, int db_number,
bool* first_db_with_amcheck, PGconn* db_conn); bool first_db_with_amcheck, PGconn* db_conn);
static bool amcheck_one_index(backup_files_arg *arguments, static bool amcheck_one_index(backup_files_arg *arguments,
pg_indexEntry *ind); pg_indexEntry *ind);
@ -1049,13 +1049,19 @@ do_amcheck(void)
pgBackupGetPath(&current, database_path, lengthof(database_path), pgBackupGetPath(&current, database_path, lengthof(database_path),
DATABASE_DIR); DATABASE_DIR);
res_db = pgut_execute(backup_conn, "SELECT datname, oid, dattablespace FROM pg_database", res_db = pgut_execute(backup_conn,
"SELECT datname, oid, dattablespace "
"FROM pg_database "
"WHERE datname NOT IN ('template0', 'template1')",
0, NULL); 0, NULL);
n_databases = PQntuples(res_db); n_databases = PQntuples(res_db);
/* TODO Warn user that one connection is used for snapshot */ /* TODO Warn user that one connection is used for snapshot */
if (num_threads > 1) //if (num_threads > 1)
num_threads--; // num_threads--;
elog(INFO, "Start checking instance with amcheck");
/* For each database check indexes. In parallel. */ /* For each database check indexes. In parallel. */
for(i = 0; i < n_databases; i++) for(i = 0; i < n_databases; i++)
@ -1069,7 +1075,8 @@ do_amcheck(void)
} }
index_list = get_index_list(res_db, i, index_list = get_index_list(res_db, i,
&first_db_with_amcheck, db_conn); first_db_with_amcheck, db_conn);
if (index_list == NULL) if (index_list == NULL)
{ {
if (db_conn) if (db_conn)
@ -1077,6 +1084,8 @@ do_amcheck(void)
continue; continue;
} }
first_db_with_amcheck = false;
/* init thread args with own file lists */ /* init thread args with own file lists */
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads); threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads);
@ -1099,8 +1108,6 @@ do_amcheck(void)
pgut_atexit_push(threads_conn_disconnect, NULL); pgut_atexit_push(threads_conn_disconnect, NULL);
elog(INFO, "Start checking indexes with amcheck");
/* Run threads */ /* Run threads */
for (j = 0; j < num_threads; j++) for (j = 0; j < num_threads; j++)
{ {
@ -1115,7 +1122,7 @@ do_amcheck(void)
for (j = 0; j < num_threads; j++) for (j = 0; j < num_threads; j++)
{ {
pthread_join(threads[j], NULL); pthread_join(threads[j], NULL);
if (threads_args[j].ret == 1) if (threads_args[j].ret > 0)
check_isok = false; check_isok = false;
} }
pgut_disconnect(db_conn); pgut_disconnect(db_conn);
@ -1123,16 +1130,16 @@ do_amcheck(void)
/* TODO write better info message */ /* TODO write better info message */
if (check_isok) if (check_isok)
elog(INFO, "Indexes are checked"); elog(INFO, "Indexes are valid");
else else
elog(ERROR, "Indexs checking failed"); elog(ERROR, "Some indexes are corrupted");
if (backup_files_list) //if (backup_files_list)
{ //{
parray_walk(backup_files_list, pgFileFree); // parray_walk(backup_files_list, pgFileFree);
parray_free(backup_files_list); // parray_free(backup_files_list);
backup_files_list = NULL; // backup_files_list = NULL;
} //}
} }
/* Entry point of pg_probackup CHECKDB subcommand. */ /* Entry point of pg_probackup CHECKDB subcommand. */
@ -1150,8 +1157,8 @@ do_checkdb(bool need_amcheck)
if (!skip_block_validation) if (!skip_block_validation)
do_block_validation(); do_block_validation();
//if (need_amcheck) if (need_amcheck)
// do_amcheck(); do_amcheck();
return 0; return 0;
} }
@ -2540,7 +2547,7 @@ threads_conn_disconnect(bool fatal, void *userdata)
{ {
// int i; // int i;
elog(VERBOSE, "threads_conn_disconnect, num_threads %d", num_threads); // elog(VERBOSE, "threads_conn_disconnect, num_threads %d", num_threads);
// for (i = 0; i < num_threads; i++) // for (i = 0; i < num_threads; i++)
// { // {
// backup_files_arg *arg = &(threads_args[i]); // backup_files_arg *arg = &(threads_args[i]);
@ -2654,7 +2661,9 @@ check_indexes(void *arg)
if (interrupted || thread_interrupted) if (interrupted || thread_interrupted)
elog(ERROR, "interrupted during checkdb --amcheck"); elog(ERROR, "interrupted during checkdb --amcheck");
elog(VERBOSE, "Checking index number %d of %d : \"%s\" ", i,n_indexes, ind->name); if (progress)
elog(INFO, "Progress: (%d/%d). Processing index '%s.%s'",
i + 1, n_indexes, ind->amcheck_nspname, ind->name);
if (arguments->backup_conn == NULL) if (arguments->backup_conn == NULL)
{ {
@ -2666,26 +2675,11 @@ check_indexes(void *arg)
ind->dbname, ind->dbname,
instance_config.pguser); instance_config.pguser);
arguments->cancel_conn = PQgetCancel(arguments->backup_conn); arguments->cancel_conn = PQgetCancel(arguments->backup_conn);
res = pgut_execute_parallel(arguments->backup_conn,
arguments->cancel_conn,
"BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;", 0, NULL, false);
PQclear(res);
query = palloc(strlen("SET TRANSACTION SNAPSHOT '' ") + strlen(ind->snapshot));
sprintf(query, "SET TRANSACTION SNAPSHOT '%s'", ind->snapshot);
res = pgut_execute_parallel(arguments->backup_conn,
arguments->cancel_conn,
query, 0, NULL, false);
PQclear(res);
free(query);
} }
/* remember that we have a failed check */ /* remember that we have a failed check */
if (!amcheck_one_index(arguments, ind)) if (!amcheck_one_index(arguments, ind))
arguments->ret = 1; arguments->ret = 2;
} }
/* Close connection */ /* Close connection */
@ -2693,6 +2687,7 @@ check_indexes(void *arg)
pgut_disconnect(arguments->backup_conn); pgut_disconnect(arguments->backup_conn);
/* TODO where should we set arguments->ret to 1? */ /* TODO where should we set arguments->ret to 1? */
if (arguments->ret == 1)
arguments->ret = 0; arguments->ret = 0;
return NULL; return NULL;
@ -3410,7 +3405,7 @@ pg_ptrack_get_block(backup_files_arg *arguments,
res = pgut_execute_parallel(arguments->backup_conn, res = pgut_execute_parallel(arguments->backup_conn,
arguments->cancel_conn, arguments->cancel_conn,
"SELECT pg_catalog.pg_ptrack_get_block_2($1, $2, $3, $4)", "SELECT pg_catalog.pg_ptrack_get_block_2($1, $2, $3, $4)",
4, (const char **)params, true); 4, (const char **)params, true, false);
if (PQnfields(res) != 1) if (PQnfields(res) != 1)
{ {
@ -3478,10 +3473,10 @@ check_external_for_tablespaces(parray *external_list)
PQclear(res); PQclear(res);
} }
/* Clear ptrack files in all databases of the instance we connected to */ /* Get index list for given database */
static parray* static parray*
get_index_list(PGresult *res_db, int db_number, get_index_list(PGresult *res_db, int db_number,
bool *first_db_with_amcheck, PGconn *db_conn) bool first_db_with_amcheck, PGconn *db_conn)
{ {
PGresult *res; PGresult *res;
char *nspname = NULL; char *nspname = NULL;
@ -3489,27 +3484,28 @@ get_index_list(PGresult *res_db, int db_number,
int i; int i;
dbname = PQgetvalue(res_db, db_number, 0); dbname = PQgetvalue(res_db, db_number, 0);
if (strcmp(dbname, "template0") == 0)
return NULL;
db_conn = pgut_connect(instance_config.pghost, instance_config.pgport, db_conn = pgut_connect(instance_config.pghost, instance_config.pgport,
dbname, dbname,
instance_config.pguser); instance_config.pguser);
res = pgut_execute(db_conn, "select extname, nspname, extversion from pg_namespace " res = pgut_execute(db_conn, "SELECT extname, nspname, extversion "
"n join pg_extension e on n.oid=e.extnamespace where e.extname='amcheck'", "FROM pg_namespace n "
"JOIN pg_extension e "
"ON n.oid=e.extnamespace "
"WHERE e.extname='amcheck'",
0, NULL); 0, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
PQclear(res); PQclear(res);
elog(ERROR, "cannot check if amcheck is installed in database %s: %s", elog(ERROR, "Cannot check if amcheck is installed in database %s: %s",
dbname, PQerrorMessage(db_conn)); dbname, PQerrorMessage(db_conn));
} }
if (PQntuples(res) < 1) if (PQntuples(res) < 1)
{ {
elog(WARNING, "extension amcheck is not installed in database %s", dbname); elog(WARNING, "Extension amcheck is not installed in database %s", dbname);
return NULL; return NULL;
} }
@ -3520,52 +3516,28 @@ get_index_list(PGresult *res_db, int db_number,
* In order to avoid duplicates, select global indexes * In order to avoid duplicates, select global indexes
* (tablespace pg_global with oid 1664) only once * (tablespace pg_global with oid 1664) only once
*/ */
if (*first_db_with_amcheck) if (first_db_with_amcheck)
{ {
res = pgut_execute(db_conn, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;", 0, NULL);
PQclear(res);
res = pgut_execute(db_conn, "SELECT pg_export_snapshot();", 0, NULL);
if (PQntuples(res) < 1)
elog(ERROR, "Failed to export snapshot for amcheck in database %s", dbname);
snapshot = pgut_malloc(strlen(PQgetvalue(res, 0, 0)) + 1);
strcpy(snapshot, PQgetvalue(res, 0, 0));
PQclear(res);
/* select only valid btree and persistent indexes */
res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname" res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname"
" FROM pg_index idx " " FROM pg_index idx "
" JOIN pg_class cls ON cls.oid=idx.indexrelid " " JOIN pg_class cls ON cls.oid=idx.indexrelid "
" JOIN pg_am am ON am.oid=cls.relam " " JOIN pg_am am ON am.oid=cls.relam "
" WHERE am.amname='btree' AND cls.relpersistence != 't'" " WHERE am.amname='btree' AND cls.relpersistence != 't'"
" AND idx.indisready AND idx.indisvalid; ", 0, NULL); " AND idx.indisready AND idx.indisvalid; ", 0, NULL);
*first_db_with_amcheck = false;
} }
else else
{ {
res = pgut_execute(db_conn, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;", 0, NULL);
PQclear(res);
res = pgut_execute(db_conn, "SELECT pg_export_snapshot();", 0, NULL);
if (PQntuples(res) < 1)
elog(ERROR, "Failed to export snapshot for amcheck in database %s", dbname);
if (snapshot)
free(snapshot);
snapshot = pgut_malloc(strlen(PQgetvalue(res, 0, 0)) + 1);
strcpy(snapshot, PQgetvalue(res, 0, 0));
PQclear(res);
res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname " res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname "
"FROM pg_index idx " "FROM pg_index idx "
"JOIN pg_class cls ON cls.oid=idx.indexrelid " "JOIN pg_class cls ON cls.oid=idx.indexrelid "
"JOIN pg_am am ON am.oid=cls.relam " "JOIN pg_am am ON am.oid=cls.relam "
"JOIN pg_tablespace tbl ON tbl.oid=cls.reltablespace "
"WHERE am.amname='btree' AND cls.relpersistence != 't' " "WHERE am.amname='btree' AND cls.relpersistence != 't' "
" AND idx.indisready AND idx.indisvalid AND cls.reltablespace!=1664; ", 0, NULL); "AND idx.indisready AND idx.indisvalid "
"AND tbl.spcname !='pg_global'", 0, NULL);
} }
/* add info needed to check indexes into index_list */ /* add info needed to check indexes into index_list */
@ -3582,9 +3554,6 @@ get_index_list(PGresult *res_db, int db_number,
ind->dbname = pgut_malloc(strlen(dbname) + 1); ind->dbname = pgut_malloc(strlen(dbname) + 1);
strcpy(ind->dbname, dbname); strcpy(ind->dbname, dbname);
ind->snapshot = pgut_malloc(strlen(snapshot) + 1);
strcpy(ind->snapshot, snapshot);
ind->amcheck_nspname = pgut_malloc(strlen(nspname) + 1); ind->amcheck_nspname = pgut_malloc(strlen(nspname) + 1);
strcpy(ind->amcheck_nspname, nspname); strcpy(ind->amcheck_nspname, nspname);
pg_atomic_clear_flag(&ind->lock); pg_atomic_clear_flag(&ind->lock);
@ -3598,6 +3567,8 @@ get_index_list(PGresult *res_db, int db_number,
PQclear(res); PQclear(res);
elog(INFO, "Amcheck database '%s'", dbname);
return index_list; return index_list;
} }
@ -3613,27 +3584,26 @@ amcheck_one_index(backup_files_arg *arguments,
sprintf(params[0], "%i", ind->indexrelid); sprintf(params[0], "%i", ind->indexrelid);
elog(VERBOSE, "amcheck index: '%s'", ind->name);
query = palloc(strlen(ind->amcheck_nspname)+strlen("SELECT .bt_index_check($1)")+1); query = palloc(strlen(ind->amcheck_nspname)+strlen("SELECT .bt_index_check($1)")+1);
sprintf(query, "SELECT %s.bt_index_check($1)", ind->amcheck_nspname); sprintf(query, "SELECT %s.bt_index_check($1)", ind->amcheck_nspname);
res = pgut_execute_parallel(arguments->backup_conn, res = pgut_execute_parallel(arguments->backup_conn,
arguments->cancel_conn, arguments->cancel_conn,
query, 1, (const char **)params, true); query, 1, (const char **)params, true, true);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
elog(VERBOSE, "amcheck failed for relation oid %u: %s", elog(WARNING, "Amcheck failed for index: '%s.%s': %s",
ind->indexrelid, PQresultErrorMessage(res)); ind->amcheck_nspname,
ind->name, PQresultErrorMessage(res));
pfree(params[0]); pfree(params[0]);
PQclear(res); PQclear(res);
return false; return false;
} }
else else
elog(VERBOSE, "amcheck succeeded for relation oid %u", elog(LOG, "Amcheck succeeded for index: '%s.%s'",
ind->indexrelid); ind->amcheck_nspname, ind->name);
pfree(params[0]); pfree(params[0]);
PQclear(res); PQclear(res);

View File

@ -359,7 +359,7 @@ PGresult *
pgut_execute_parallel(PGconn* conn, pgut_execute_parallel(PGconn* conn,
PGcancel* thread_cancel_conn, const char *query, PGcancel* thread_cancel_conn, const char *query,
int nParams, const char **params, int nParams, const char **params,
bool text_result) bool text_result, bool ok_error)
{ {
PGresult *res; PGresult *res;
@ -405,6 +405,9 @@ pgut_execute_parallel(PGconn* conn,
case PGRES_COPY_IN: case PGRES_COPY_IN:
break; break;
default: default:
if (ok_error && PQresultStatus(res) == PGRES_FATAL_ERROR)
break;
elog(ERROR, "query failed: %squery was: %s", elog(ERROR, "query failed: %squery was: %s",
PQerrorMessage(conn), query); PQerrorMessage(conn), query);
break; break;

View File

@ -57,7 +57,7 @@ extern PGresult *pgut_execute_extended(PGconn* conn, const char *query, int nPar
const char **params, bool text_result, bool ok_error); const char **params, bool text_result, bool ok_error);
extern PGresult *pgut_execute_parallel(PGconn* conn, PGcancel* thread_cancel_conn, extern PGresult *pgut_execute_parallel(PGconn* conn, PGcancel* thread_cancel_conn,
const char *query, int nParams, const char *query, int nParams,
const char **params, bool text_result); const char **params, bool text_result, bool ok_error);
extern bool pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int elevel); extern bool pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int elevel);
extern void pgut_cancel(PGconn* conn); extern void pgut_cancel(PGconn* conn);
extern int pgut_wait(int num, PGconn *connections[], struct timeval *timeout); extern int pgut_wait(int num, PGconn *connections[], struct timeval *timeout);