diff --git a/src/backup.c b/src/backup.c index 52bc72cf..b41cbdeb 100644 --- a/src/backup.c +++ b/src/backup.c @@ -109,7 +109,7 @@ static void do_amcheck(void); static void *check_files(void *arg); static void *check_indexes(void *arg); static parray* get_index_list(PGresult* res_db, int db_number, - bool* first_db_with_amcheck, PGconn* db_conn); + bool first_db_with_amcheck, PGconn* db_conn); static bool amcheck_one_index(backup_files_arg *arguments, pg_indexEntry *ind); @@ -1049,13 +1049,19 @@ do_amcheck(void) pgBackupGetPath(&current, database_path, lengthof(database_path), DATABASE_DIR); - res_db = pgut_execute(backup_conn, "SELECT datname, oid, dattablespace FROM pg_database", + res_db = pgut_execute(backup_conn, + "SELECT datname, oid, dattablespace " + "FROM pg_database " + "WHERE datname NOT IN ('template0', 'template1')", 0, NULL); + n_databases = PQntuples(res_db); /* TODO Warn user that one connection is used for snaphot */ - if (num_threads > 1) - num_threads--; + //if (num_threads > 1) + // num_threads--; + + elog(INFO, "Start checking instance with amcheck"); /* For each database check indexes. In parallel. 
*/ for(i = 0; i < n_databases; i++) @@ -1069,7 +1075,8 @@ do_amcheck(void) } index_list = get_index_list(res_db, i, - &first_db_with_amcheck, db_conn); + first_db_with_amcheck, db_conn); + if (index_list == NULL) { if (db_conn) @@ -1077,6 +1084,8 @@ do_amcheck(void) continue; } + first_db_with_amcheck = false; + /* init thread args with own file lists */ threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads); @@ -1099,8 +1108,6 @@ do_amcheck(void) pgut_atexit_push(threads_conn_disconnect, NULL); - elog(INFO, "Start checking indexes with amcheck"); - /* Run threads */ for (j = 0; j < num_threads; j++) { @@ -1115,7 +1122,7 @@ do_amcheck(void) for (j = 0; j < num_threads; j++) { pthread_join(threads[j], NULL); - if (threads_args[j].ret == 1) + if (threads_args[j].ret > 0) check_isok = false; } pgut_disconnect(db_conn); @@ -1123,16 +1130,16 @@ do_amcheck(void) /* TODO write better info message */ if (check_isok) - elog(INFO, "Indexes are checked"); + elog(INFO, "Indexes are valid"); else - elog(ERROR, "Indexs checking failed"); + elog(ERROR, "Some indexes are corrupted"); - if (backup_files_list) - { - parray_walk(backup_files_list, pgFileFree); - parray_free(backup_files_list); - backup_files_list = NULL; - } + //if (backup_files_list) + //{ + // parray_walk(backup_files_list, pgFileFree); + // parray_free(backup_files_list); + // backup_files_list = NULL; + //} } /* Entry point of pg_probackup CHECKDB subcommand. 
*/ @@ -1150,8 +1157,8 @@ do_checkdb(bool need_amcheck) if (!skip_block_validation) do_block_validation(); - //if (need_amcheck) - // do_amcheck(); + if (need_amcheck) + do_amcheck(); return 0; } @@ -2540,7 +2547,7 @@ threads_conn_disconnect(bool fatal, void *userdata) { // int i; - elog(VERBOSE, "threads_conn_disconnect, num_threads %d", num_threads); +// elog(VERBOSE, "threads_conn_disconnect, num_threads %d", num_threads); // for (i = 0; i < num_threads; i++) // { // backup_files_arg *arg = &(threads_args[i]); @@ -2654,7 +2661,9 @@ check_indexes(void *arg) if (interrupted || thread_interrupted) elog(ERROR, "interrupted during checkdb --amcheck"); - elog(VERBOSE, "Checking index number %d of %d : \"%s\" ", i,n_indexes, ind->name); + if (progress) + elog(INFO, "Progress: (%d/%d). Processing index '%s.%s'", + i + 1, n_indexes, ind->amcheck_nspname, ind->name); if (arguments->backup_conn == NULL) { @@ -2666,26 +2675,11 @@ check_indexes(void *arg) ind->dbname, instance_config.pguser); arguments->cancel_conn = PQgetCancel(arguments->backup_conn); - - res = pgut_execute_parallel(arguments->backup_conn, - arguments->cancel_conn, - "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;", 0, NULL, false); - PQclear(res); - - query = palloc(strlen("SET TRANSACTION SNAPSHOT '' ") + strlen(ind->snapshot)); - sprintf(query, "SET TRANSACTION SNAPSHOT '%s'", ind->snapshot); - - res = pgut_execute_parallel(arguments->backup_conn, - arguments->cancel_conn, - query, 0, NULL, false); - PQclear(res); - - free(query); } /* remember that we have a failed check */ if (!amcheck_one_index(arguments, ind)) - arguments->ret = 1; + arguments->ret = 2; } /* Close connection */ @@ -2693,7 +2687,8 @@ check_indexes(void *arg) pgut_disconnect(arguments->backup_conn); /* TODO where should we set arguments->ret to 1? 
*/ - arguments->ret = 0; + if (arguments->ret == 1) + arguments->ret = 0; return NULL; } @@ -3410,7 +3405,7 @@ pg_ptrack_get_block(backup_files_arg *arguments, res = pgut_execute_parallel(arguments->backup_conn, arguments->cancel_conn, "SELECT pg_catalog.pg_ptrack_get_block_2($1, $2, $3, $4)", - 4, (const char **)params, true); + 4, (const char **)params, true, false); if (PQnfields(res) != 1) { @@ -3478,10 +3473,10 @@ check_external_for_tablespaces(parray *external_list) PQclear(res); } -/* Clear ptrack files in all databases of the instance we connected to */ +/* Get index list for given database */ static parray* get_index_list(PGresult *res_db, int db_number, - bool *first_db_with_amcheck, PGconn *db_conn) + bool first_db_with_amcheck, PGconn *db_conn) { PGresult *res; char *nspname = NULL; @@ -3489,27 +3484,28 @@ get_index_list(PGresult *res_db, int db_number, int i; dbname = PQgetvalue(res_db, db_number, 0); - if (strcmp(dbname, "template0") == 0) - return NULL; db_conn = pgut_connect(instance_config.pghost, instance_config.pgport, dbname, instance_config.pguser); - res = pgut_execute(db_conn, "select extname, nspname, extversion from pg_namespace " - "n join pg_extension e on n.oid=e.extnamespace where e.extname='amcheck'", + res = pgut_execute(db_conn, "SELECT extname, nspname, extversion " + "FROM pg_namespace n " + "JOIN pg_extension e " + "ON n.oid=e.extnamespace " + "WHERE e.extname='amcheck'", 0, NULL); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); - elog(ERROR, "cannot check if amcheck is installed in database %s: %s", + elog(ERROR, "Cannot check if amcheck is installed in database %s: %s", dbname, PQerrorMessage(db_conn)); } if (PQntuples(res) < 1) { - elog(WARNING, "extension amcheck is not installed in database %s", dbname); + elog(WARNING, "Extension amcheck is not installed in database %s", dbname); return NULL; } @@ -3520,56 +3516,32 @@ get_index_list(PGresult *res_db, int db_number, * In order to avoid duplicates, select global 
indexes * (tablespace pg_global with oid 1664) only once */ - if (*first_db_with_amcheck) + if (first_db_with_amcheck) { - res = pgut_execute(db_conn, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;", 0, NULL); - PQclear(res); - - res = pgut_execute(db_conn, "SELECT pg_export_snapshot();", 0, NULL); - - if (PQntuples(res) < 1) - elog(ERROR, "Failed to export snapshot for amcheck in database %s", dbname); - - snapshot = pgut_malloc(strlen(PQgetvalue(res, 0, 0)) + 1); - strcpy(snapshot, PQgetvalue(res, 0, 0)); - - PQclear(res); + /* select only valid btree and persistent indexes */ res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname" " FROM pg_index idx " " JOIN pg_class cls ON cls.oid=idx.indexrelid " " JOIN pg_am am ON am.oid=cls.relam " " WHERE am.amname='btree' AND cls.relpersistence != 't'" " AND idx.indisready AND idx.indisvalid; ", 0, NULL); - *first_db_with_amcheck = false; } else { - res = pgut_execute(db_conn, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;", 0, NULL); - PQclear(res); - res = pgut_execute(db_conn, "SELECT pg_export_snapshot();", 0, NULL); - - if (PQntuples(res) < 1) - elog(ERROR, "Failed to export snapshot for amcheck in database %s", dbname); - - if (snapshot) - free(snapshot); - snapshot = pgut_malloc(strlen(PQgetvalue(res, 0, 0)) + 1); - strcpy(snapshot, PQgetvalue(res, 0, 0)); - - PQclear(res); - - res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname" - " FROM pg_index idx " - " JOIN pg_class cls ON cls.oid=idx.indexrelid " - " JOIN pg_am am ON am.oid=cls.relam " - " WHERE am.amname='btree' AND cls.relpersistence != 't'" - " AND idx.indisready AND idx.indisvalid AND cls.reltablespace!=1664; ", 0, NULL); + res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname " + "FROM pg_index idx " + "JOIN pg_class cls ON cls.oid=idx.indexrelid " + "JOIN pg_am am ON am.oid=cls.relam " + "JOIN pg_tablespace tbl ON tbl.oid=cls.reltablespace " + "WHERE am.amname='btree' AND cls.relpersistence != 't' " + "AND idx.indisready AND 
idx.indisvalid " + "AND tbl.spcname !='pg_global'", 0, NULL); } /* add info needed to check indexes into index_list */ - for(i = 0; i < PQntuples(res); i++) + for (i = 0; i < PQntuples(res); i++) { pg_indexEntry *ind = (pg_indexEntry *) pgut_malloc(sizeof(pg_indexEntry)); char *name = NULL; @@ -3582,9 +3554,6 @@ get_index_list(PGresult *res_db, int db_number, ind->dbname = pgut_malloc(strlen(dbname) + 1); strcpy(ind->dbname, dbname); - ind->snapshot = pgut_malloc(strlen(snapshot) + 1); - strcpy(ind->snapshot, snapshot); - ind->amcheck_nspname = pgut_malloc(strlen(nspname) + 1); strcpy(ind->amcheck_nspname, nspname); pg_atomic_clear_flag(&ind->lock); @@ -3598,6 +3567,8 @@ get_index_list(PGresult *res_db, int db_number, PQclear(res); + elog(INFO, "Amcheck database '%s'", dbname); + return index_list; } @@ -3613,27 +3584,26 @@ amcheck_one_index(backup_files_arg *arguments, sprintf(params[0], "%i", ind->indexrelid); - elog(VERBOSE, "amcheck index: '%s'", ind->name); - query = palloc(strlen(ind->amcheck_nspname)+strlen("SELECT .bt_index_check($1)")+1); sprintf(query, "SELECT %s.bt_index_check($1)", ind->amcheck_nspname); res = pgut_execute_parallel(arguments->backup_conn, arguments->cancel_conn, - query, 1, (const char **)params, true); + query, 1, (const char **)params, true, true); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - elog(VERBOSE, "amcheck failed for relation oid %u: %s", - ind->indexrelid, PQresultErrorMessage(res)); + elog(WARNING, "Amcheck failed for index: '%s.%s': %s", + ind->amcheck_nspname, + ind->name, PQresultErrorMessage(res)); pfree(params[0]); PQclear(res); return false; } else - elog(VERBOSE, "amcheck succeeded for relation oid %u", - ind->indexrelid); + elog(LOG, "Amcheck succeeded for index: '%s.%s'", + ind->amcheck_nspname, ind->name); pfree(params[0]); PQclear(res); diff --git a/src/utils/pgut.c b/src/utils/pgut.c index 7e36eaf8..bdd99f2b 100644 --- a/src/utils/pgut.c +++ b/src/utils/pgut.c @@ -359,7 +359,7 @@ PGresult * 
pgut_execute_parallel(PGconn* conn, PGcancel* thread_cancel_conn, const char *query, int nParams, const char **params, - bool text_result) + bool text_result, bool ok_error) { PGresult *res; @@ -405,6 +405,9 @@ pgut_execute_parallel(PGconn* conn, case PGRES_COPY_IN: break; default: + if (ok_error && PQresultStatus(res) == PGRES_FATAL_ERROR) + break; + elog(ERROR, "query failed: %squery was: %s", PQerrorMessage(conn), query); break; diff --git a/src/utils/pgut.h b/src/utils/pgut.h index ca44fb39..350b9672 100644 --- a/src/utils/pgut.h +++ b/src/utils/pgut.h @@ -57,7 +57,7 @@ extern PGresult *pgut_execute_extended(PGconn* conn, const char *query, int nPar const char **params, bool text_result, bool ok_error); extern PGresult *pgut_execute_parallel(PGconn* conn, PGcancel* thread_cancel_conn, const char *query, int nParams, - const char **params, bool text_result); + const char **params, bool text_result, bool ok_error); extern bool pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int elevel); extern void pgut_cancel(PGconn* conn); extern int pgut_wait(int num, PGconn *connections[], struct timeval *timeout);