diff --git a/src/backup.c b/src/backup.c index 0843fe24..6acbda70 100644 --- a/src/backup.c +++ b/src/backup.c @@ -49,6 +49,8 @@ const char *progname = "pg_probackup"; /* list of files contained in backup */ static parray *backup_files_list = NULL; +/* list of indexes for use in checkdb --amcheck */ +static parray *index_list = NULL; /* We need critical section for datapagemap_add() in case of using threads */ static pthread_mutex_t backup_pagemap_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -73,8 +75,8 @@ static StreamThreadArg stream_thread_arg = {"", NULL, 1}; static int is_ptrack_enable = false; bool is_ptrack_support = false; -bool is_checksum_enabled = false; bool exclusive_backup = false; +bool heapallindexed_is_supported = false; /* Backup connections */ static PGconn *backup_conn = NULL; @@ -96,11 +98,22 @@ static bool pg_stop_backup_is_sent = false; static void backup_cleanup(bool fatal, void *userdata); static void backup_disconnect(bool fatal, void *userdata); +static void pgdata_basic_setup(bool amcheck_only); + static void *backup_files(void *arg); static void *remote_backup_files(void *arg); static void do_backup_instance(void); +static void do_block_validation(void); +static void do_amcheck(void); +static void *check_files(void *arg); +static void *check_indexes(void *arg); +static parray* get_index_list(PGresult* res_db, int db_number, + bool first_db_with_amcheck, PGconn* db_conn); +static bool amcheck_one_index(backup_files_arg *arguments, + pg_indexEntry *ind); + static void pg_start_backup(const char *label, bool smooth, pgBackup *backup); static void pg_switch_wal(PGconn *conn); static void pg_stop_backup(pgBackup *backup); @@ -929,19 +942,268 @@ do_backup_instance(void) backup_files_list = NULL; } -/* - * Entry point of pg_probackup BACKUP subcommand. - */ -int -do_backup(time_t start_time, bool no_validate) +/* collect list of files and run threads to check files in the instance */ +static void +do_block_validation(void) { - /* PGDATA and BACKUP_MODE are always required */ - if (instance_config.pgdata == NULL) + int i; + char database_path[MAXPGPATH]; + /* arrays with meta info for multi threaded backup */ + pthread_t *threads; + backup_files_arg *threads_args; + bool check_isok = true; + + pgBackupGetPath(¤t, database_path, lengthof(database_path), + DATABASE_DIR); + + /* initialize file list */ + backup_files_list = parray_new(); + + /* list files with the logical path. omit $PGDATA */ + dir_list_file(backup_files_list, instance_config.pgdata, + true, true, false, 0, FIO_DB_HOST); + + /* + * Sort pathname ascending. 
+ * + * For example: + * 1 - create 'base' + * 2 - create 'base/1' + */ + parray_qsort(backup_files_list, pgFileComparePath); + /* Extract information about files in backup_list parsing their names:*/ + parse_backup_filelist_filenames(backup_files_list, instance_config.pgdata); + + /* setup threads */ + for (i = 0; i < parray_num(backup_files_list); i++) + { + pgFile *file = (pgFile *) parray_get(backup_files_list, i); + pg_atomic_clear_flag(&file->lock); + } + + /* Sort by size for load balancing */ + parray_qsort(backup_files_list, pgFileCompareSize); + + /* init thread args with own file lists */ + threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); + threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads); + + for (i = 0; i < num_threads; i++) + { + backup_files_arg *arg = &(threads_args[i]); + + arg->from_root = instance_config.pgdata; + arg->to_root = database_path; + arg->files_list = backup_files_list; + arg->index_list = NULL; + arg->prev_filelist = NULL; + arg->prev_start_lsn = InvalidXLogRecPtr; + arg->backup_conn = NULL; + arg->cancel_conn = NULL; + /* By default there are some error */ + arg->ret = 1; + } + + /* TODO write better info message */ + elog(INFO, "Start checking data files"); + + /* Run threads */ + for (i = 0; i < num_threads; i++) + { + backup_files_arg *arg = &(threads_args[i]); + + elog(VERBOSE, "Start thread num: %i", i); + + pthread_create(&threads[i], NULL, check_files, arg); + } + + /* Wait threads */ + for (i = 0; i < num_threads; i++) + { + pthread_join(threads[i], NULL); + if (threads_args[i].ret > 0) + check_isok = false; + } + + /* cleanup */ + if (backup_files_list) + { + parray_walk(backup_files_list, pgFileFree); + parray_free(backup_files_list); + backup_files_list = NULL; + } + + /* TODO write better info message */ + if (check_isok) + elog(INFO, "Data files are valid"); + else + elog(ERROR, "Checkdb failed"); +} + +/* + * Entry point of checkdb --amcheck. + * + * Connect to all databases in the cluster + * and get list of persistent indexes, + * then run parallel threads to perform bt_index_check() + * for all indexes from the list. + * + * If amcheck extension is not installed in the database, + * skip this database and report it via warning message. + */ +static void +do_amcheck(void) +{ + int i; + char database_path[MAXPGPATH]; + /* arrays with meta info for multi threaded backup */ + pthread_t *threads; + backup_files_arg *threads_args; + bool check_isok = true; + PGresult *res_db; + int n_databases = 0; + bool first_db_with_amcheck = true; + PGconn *db_conn = NULL; + bool db_skipped = false; + + pgBackupGetPath(¤t, database_path, lengthof(database_path), + DATABASE_DIR); + + res_db = pgut_execute(backup_conn, + "SELECT datname, oid, dattablespace " + "FROM pg_database " + "WHERE datname NOT IN ('template0', 'template1')", + 0, NULL); + + n_databases = PQntuples(res_db); + + elog(INFO, "Start amchecking PostgreSQL instance"); + + /* For each database check indexes. In parallel. 
*/ + for(i = 0; i < n_databases; i++) + { + int j; + + if (index_list != NULL) + { + free(index_list); + index_list = NULL; + } + + index_list = get_index_list(res_db, i, + first_db_with_amcheck, db_conn); + + if (index_list == NULL) + { + if (db_conn) + pgut_disconnect(db_conn); + db_skipped = true; + continue; + } + + first_db_with_amcheck = false; + + /* init thread args with own file lists */ + threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); + threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads); + + for (j = 0; j < num_threads; j++) + { + backup_files_arg *arg = &(threads_args[j]); + + arg->from_root = instance_config.pgdata; + arg->to_root = database_path; + arg->files_list = NULL; + arg->index_list = index_list; + arg->prev_filelist = NULL; + arg->prev_start_lsn = InvalidXLogRecPtr; + arg->backup_conn = NULL; + arg->cancel_conn = NULL; + arg->thread_num = j + 1; + /* By default there are some error */ + arg->ret = 1; + } + + /* Run threads */ + for (j = 0; j < num_threads; j++) + { + backup_files_arg *arg = &(threads_args[j]); + elog(VERBOSE, "Start thread num: %i", j); + pthread_create(&threads[j], NULL, check_indexes, arg); + } + + /* Wait threads */ + for (j = 0; j < num_threads; j++) + { + pthread_join(threads[j], NULL); + if (threads_args[j].ret > 0) + check_isok = false; + } + + /* cleanup */ + pgut_disconnect(db_conn); + + if (interrupted) + break; + } + + /* Inform user about amcheck results */ + if (!check_isok || interrupted) + elog(ERROR, "Checkdb --amcheck failed"); + + if (db_skipped) + elog(ERROR, "Some databases were not amchecked"); + + elog(INFO, "Checkdb --amcheck executed successfully"); + + /* We cannot state that all indexes are ok + * without checking indexes in all databases + */ + if (check_isok && !interrupted && !db_skipped) + elog(INFO, "Indexes are valid"); + + /* cleanup */ + PQclear(res_db); +} + +/* Entry point of pg_probackup CHECKDB subcommand. */ +void +do_checkdb(bool need_amcheck) +{ + bool amcheck_only = false; + + if (skip_block_validation && !need_amcheck) + elog(ERROR, "Option '--skip-block-validation' must be used with '--amcheck' option"); + + if (skip_block_validation && need_amcheck) + amcheck_only = true; + + pgdata_basic_setup(amcheck_only); + + if (!skip_block_validation) + do_block_validation(); + + if (need_amcheck) + do_amcheck(); +} + +/* + * Common code for CHECKDB and BACKUP commands. + * Ensure that we're able to connect to the instance + * check compatibility and fill basic info. + * For checkdb launched in amcheck mode with pgdata validation + * do not check system ID, it gives user an opportunity to + * check remote PostgreSQL instance. + * Also checking system ID in this case serves no purpose, because + * all work is done by server. 
+ */ +static void +pgdata_basic_setup(bool amcheck_only) +{ + /* PGDATA is always required unless running checkdb in amcheck only mode */ + if (!instance_config.pgdata && !amcheck_only) elog(ERROR, "required parameter not specified: PGDATA " "(-D, --pgdata)"); - if (current.backup_mode == BACKUP_MODE_INVALID) - elog(ERROR, "required parameter not specified: BACKUP_MODE " - "(-b, --backup-mode)"); /* Create connection for PostgreSQL */ backup_conn = pgut_connect(instance_config.pghost, instance_config.pgport, @@ -951,14 +1213,6 @@ do_backup(time_t start_time, bool no_validate) current.primary_conninfo = pgut_get_conninfo_string(backup_conn); -#if PG_VERSION_NUM >= 110000 - if (!RetrieveWalSegSize(backup_conn)) - elog(ERROR, "Failed to retreive wal_segment_size"); -#endif - - current.compress_alg = instance_config.compress_alg; - current.compress_level = instance_config.compress_level; - /* Confirm data block size and xlog block size are compatible */ confirm_block_size("block_size", BLCKSZ); confirm_block_size("wal_block_size", XLOG_BLCKSZ); @@ -968,13 +1222,21 @@ do_backup(time_t start_time, bool no_validate) /* Confirm that this server version is supported */ check_server_version(); - /* TODO fix it for remote backup*/ - if (!IsReplicationProtocol()) - current.checksum_version = get_data_checksum_version(true); + if (pg_checksum_enable()) + current.checksum_version = 1; + else + current.checksum_version = 0; - is_checksum_enabled = pg_checksum_enable(); + /* + * Ensure that backup directory was initialized for the same PostgreSQL + * instance we opened connection to. And that target backup database PGDATA + * belogns to the same instance. + */ + /* TODO fix it for remote backup */ + if (!IsReplicationProtocol() && !amcheck_only) + check_system_identifiers(); - if (is_checksum_enabled) + if (current.checksum_version) elog(LOG, "This PostgreSQL instance was initialized with data block checksums. " "Data block corruption will be detected"); else @@ -984,6 +1246,29 @@ do_backup(time_t start_time, bool no_validate) StrNCpy(current.server_version, server_version_str, sizeof(current.server_version)); +} + +/* + * Entry point of pg_probackup BACKUP subcommand. + */ +int +do_backup(time_t start_time, bool no_validate) +{ + /* + * setup backup_conn, do some compatibility checks and + * fill basic info about instance + */ + pgdata_basic_setup(false); + + /* below perform checks specific for backup command */ +#if PG_VERSION_NUM >= 110000 + if (!RetrieveWalSegSize(backup_conn)) + elog(ERROR, "Failed to retreive wal_segment_size"); +#endif + + current.compress_alg = instance_config.compress_alg; + current.compress_level = instance_config.compress_level; + current.stream = stream_wal; is_ptrack_support = pg_ptrack_support(); @@ -1016,15 +1301,6 @@ do_backup(time_t start_time, bool no_validate) instance_config.master_user); } - /* - * Ensure that backup directory was initialized for the same PostgreSQL - * instance we opened connection to. And that target backup database PGDATA - * belogns to the same instance. - */ - /* TODO fix it for remote backup */ - if (!IsReplicationProtocol()) - check_system_identifiers(); - /* Start backup. Update backup status. 
*/ current.status = BACKUP_STATUS_RUNNING; current.start_time = start_time; @@ -1171,6 +1447,18 @@ check_system_identifiers(void) system_id_pgdata = get_system_identifier(instance_config.pgdata); system_id_conn = get_remote_system_identifier(backup_conn); + /* for checkdb check only system_id_pgdata and system_id_conn */ + if (current.backup_mode == BACKUP_MODE_INVALID) + { + if (system_id_conn != system_id_pgdata) + { + elog(ERROR, "Data directory initialized with system id " UINT64_FORMAT ", " + "but connected instance system id is " UINT64_FORMAT, + system_id_pgdata, system_id_conn); + } + return; + } + if (system_id_conn != instance_config.system_identifier) elog(ERROR, "Backup data directory was initialized for system id " UINT64_FORMAT ", " "but connected instance system id is " UINT64_FORMAT, @@ -2272,6 +2560,154 @@ backup_disconnect(bool fatal, void *userdata) pgut_disconnect(master_conn); } +/* + * Check files in PGDATA. + * Read all files listed in backup_files_list. + * If the file is 'datafile' (regular relation's main fork), read it page by page, + * verify checksum and copy. + */ +static void * +check_files(void *arg) +{ + int i; + backup_files_arg *arguments = (backup_files_arg *) arg; + int n_backup_files_list = 0; + + if (arguments->files_list) + n_backup_files_list = parray_num(arguments->files_list); + + /* check a file */ + for (i = 0; i < n_backup_files_list; i++) + { + int ret; + struct stat buf; + pgFile *file = (pgFile *) parray_get(arguments->files_list, i); + + if (!pg_atomic_test_set_flag(&file->lock)) + continue; + + elog(VERBOSE, "Checking file: \"%s\" ", file->path); + + /* check for interrupt */ + if (interrupted || thread_interrupted) + elog(ERROR, "interrupted during checkdb"); + + if (progress) + elog(INFO, "Progress: (%d/%d). Process file \"%s\"", + i + 1, n_backup_files_list, file->path); + + /* stat file to check its current state */ + ret = stat(file->path, &buf); + if (ret == -1) + { + if (errno == ENOENT) + { + /* + * If file is not found, this is not en error. + * It could have been deleted by concurrent postgres transaction. + */ + elog(LOG, "File \"%s\" is not found", file->path); + continue; + } + else + { + elog(ERROR, + "can't stat file to check \"%s\": %s", + file->path, strerror(errno)); + } + } + + /* No need to check directories */ + if (S_ISDIR(buf.st_mode)) + continue; + + if (S_ISREG(buf.st_mode)) + { + /* check only uncompressed by cfs datafiles */ + if (file->is_datafile && !file->is_cfs) + { + char to_path[MAXPGPATH]; + + join_path_components(to_path, arguments->to_root, + file->path + strlen(arguments->from_root) + 1); + + if (!check_data_file(arguments, file)) + arguments->ret = 2; /* corruption found */ + } + } + else + elog(WARNING, "unexpected file type %d", buf.st_mode); + } + + /* Ret values: + * 0 everything is ok + * 1 thread errored during execution, e.g. interruption (default value) + * 2 corruption is definitely(!) 
found + */ + if (arguments->ret == 1) + arguments->ret = 0; + + return NULL; +} + +/* Check indexes with amcheck */ +static void * +check_indexes(void *arg) +{ + int i; + backup_files_arg *arguments = (backup_files_arg *) arg; + int n_indexes = 0; + + if (arguments->index_list) + n_indexes = parray_num(arguments->index_list); + + for (i = 0; i < n_indexes; i++) + { + pg_indexEntry *ind = (pg_indexEntry *) parray_get(arguments->index_list, i); + + if (!pg_atomic_test_set_flag(&ind->lock)) + continue; + + /* check for interrupt */ + if (interrupted || thread_interrupted) + elog(ERROR, "Thread [%d]: interrupted during checkdb --amcheck", + arguments->thread_num); + + if (progress) + elog(INFO, "Thread [%d]. Progress: (%d/%d). Amchecking index '%s.%s'", + arguments->thread_num, i + 1, n_indexes, + ind->amcheck_nspname, ind->name); + + if (arguments->backup_conn == NULL) + { + + arguments->backup_conn = pgut_connect(instance_config.pghost, + instance_config.pgport, + ind->dbname, + instance_config.pguser); + arguments->cancel_conn = PQgetCancel(arguments->backup_conn); + } + + /* remember that we have a failed check */ + if (!amcheck_one_index(arguments, ind)) + arguments->ret = 2; /* corruption found */ + } + + /* Close connection */ + if (arguments->backup_conn) + pgut_disconnect(arguments->backup_conn); + + /* Ret values: + * 0 everything is ok + * 1 thread errored during execution, e.g. interruption (default value) + * 2 corruption is definitely(!) found + */ + if (arguments->ret == 1) + arguments->ret = 0; + + return NULL; +} + /* * Take a backup of the PGDATA at a file level. * Copy all directories and files listed in backup_files_list. @@ -2358,6 +2794,7 @@ backup_files(void *arg) /* File exists in previous backup */ file->exists_in_prev = true; } + /* copy the file into backup */ if (file->is_datafile && !file->is_cfs) { @@ -2985,7 +3422,7 @@ pg_ptrack_get_block(backup_files_arg *arguments, res = pgut_execute_parallel(arguments->backup_conn, arguments->cancel_conn, "SELECT pg_catalog.pg_ptrack_get_block_2($1, $2, $3, $4)", - 4, (const char **)params, true); + 4, (const char **)params, true, false, false); if (PQnfields(res) != 1) { @@ -3052,3 +3489,172 @@ check_external_for_tablespaces(parray *external_list) } PQclear(res); } + +/* Get index list for given database */ +static parray* +get_index_list(PGresult *res_db, int db_number, + bool first_db_with_amcheck, PGconn *db_conn) +{ + PGresult *res; + char *nspname = NULL; + int i; + + dbname = PQgetvalue(res_db, db_number, 0); + + db_conn = pgut_connect(instance_config.pghost, instance_config.pgport, + dbname, + instance_config.pguser); + + res = pgut_execute(db_conn, "SELECT " + "extname, nspname, extversion " + "FROM pg_namespace n " + "JOIN pg_extension e " + "ON n.oid=e.extnamespace " + "WHERE e.extname IN ('amcheck', 'amcheck_next') " + "ORDER BY extversion DESC " + "LIMIT 1", + 0, NULL); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + elog(ERROR, "Cannot check if amcheck is installed in database %s: %s", + dbname, PQerrorMessage(db_conn)); + } + + if (PQntuples(res) < 1) + { + elog(WARNING, "Extension 'amcheck' or 'amcheck_next' are not installed in database %s", dbname); + return NULL; + } + + nspname = pgut_malloc(strlen(PQgetvalue(res, 0, 1)) + 1); + strcpy(nspname, PQgetvalue(res, 0, 1)); + + /* heapallindexed_is_supported is database specific */ + if (strcmp(PQgetvalue(res, 0, 2), "1.0") != 0 && + strcmp(PQgetvalue(res, 0, 2), "1") != 0) + heapallindexed_is_supported = true; + + elog(INFO, "Amchecking 
database '%s' using extension '%s' version %s from schema '%s'", + dbname, PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 2), PQgetvalue(res, 0, 1)); + + if (!heapallindexed_is_supported && heapallindexed) + elog(WARNING, "Extension '%s' verion %s in schema '%s' do not support 'heapallindexed' option", + PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 2), PQgetvalue(res, 0, 1)); + + /* + * In order to avoid duplicates, select global indexes + * (tablespace pg_global with oid 1664) only once. + * + * select only persistent btree indexes. + */ + if (first_db_with_amcheck) + { + + res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname " + "FROM pg_index idx " + "JOIN pg_class cls ON idx.indexrelid=cls.oid " + "JOIN pg_am am ON cls.relam=am.oid " + "WHERE am.amname='btree' AND cls.relpersistence != 't'", + 0, NULL); + } + else + { + + res = pgut_execute(db_conn, "SELECT cls.oid, cls.relname " + "FROM pg_index idx " + "JOIN pg_class cls ON idx.indexrelid=cls.oid " + "JOIN pg_am am ON cls.relam=am.oid " + "LEFT JOIN pg_tablespace tbl " + "ON cls.reltablespace=tbl.oid " + "AND tbl.spcname <> 'pg_global' " + "WHERE am.amname='btree' AND cls.relpersistence != 't'", + 0, NULL); + } + + /* add info needed to check indexes into index_list */ + for (i = 0; i < PQntuples(res); i++) + { + pg_indexEntry *ind = (pg_indexEntry *) pgut_malloc(sizeof(pg_indexEntry)); + char *name = NULL; + + ind->indexrelid = atoi(PQgetvalue(res, i, 0)); + name = PQgetvalue(res, i, 1); + ind->name = pgut_malloc(strlen(name) + 1); + strcpy(ind->name, name); /* enough buffer size guaranteed */ + + ind->dbname = pgut_malloc(strlen(dbname) + 1); + strcpy(ind->dbname, dbname); + + ind->amcheck_nspname = pgut_malloc(strlen(nspname) + 1); + strcpy(ind->amcheck_nspname, nspname); + pg_atomic_clear_flag(&ind->lock); + + if (index_list == NULL) + index_list = parray_new(); + + parray_append(index_list, ind); + } + + PQclear(res); + + return index_list; +} + +/* check one index. Return true if everything is ok, false otherwise. */ +static bool +amcheck_one_index(backup_files_arg *arguments, + pg_indexEntry *ind) +{ + PGresult *res; + char *params[2]; + char *query = NULL; + + params[0] = palloc(64); + + /* first argument is index oid */ + sprintf(params[0], "%i", ind->indexrelid); + /* second argument is heapallindexed */ + params[1] = heapallindexed ? "true" : "false"; + + if (interrupted) + elog(ERROR, "Interrupted"); + + if (heapallindexed_is_supported) + { + query = palloc(strlen(ind->amcheck_nspname)+strlen("SELECT .bt_index_check($1, $2)")+1); + sprintf(query, "SELECT %s.bt_index_check($1, $2)", ind->amcheck_nspname); + + res = pgut_execute_parallel(arguments->backup_conn, + arguments->cancel_conn, + query, 2, (const char **)params, true, true, true); + } + else + { + query = palloc(strlen(ind->amcheck_nspname)+strlen("SELECT .bt_index_check($1)")+1); + sprintf(query, "SELECT %s.bt_index_check($1)", ind->amcheck_nspname); + + res = pgut_execute_parallel(arguments->backup_conn, + arguments->cancel_conn, + query, 1, (const char **)params, true, true, true); + } + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + elog(WARNING, "Thread [%d]. Amcheck failed for index: '%s.%s': %s", + arguments->thread_num, ind->amcheck_nspname, + ind->name, PQresultErrorMessage(res)); + + pfree(params[0]); + PQclear(res); + return false; + } + else + elog(LOG, "Thread [%d]. 
Amcheck succeeded for index: '%s.%s'", + arguments->thread_num, ind->amcheck_nspname, ind->name); + + pfree(params[0]); + PQclear(res); + return true; +} diff --git a/src/data.c b/src/data.c index b70c4dd9..f9cb6630 100644 --- a/src/data.c +++ b/src/data.c @@ -263,7 +263,7 @@ read_page_from_file(pgFile *file, BlockNumber blknum, */ if (pg_checksum_page(page, blkno) != ((PageHeader) page)->pd_checksum) { - elog(WARNING, "File: %s blknum %u have wrong checksum, try again", + elog(LOG, "File: %s blknum %u have wrong checksum, try again", file->path, blknum); return -1; } @@ -289,6 +289,7 @@ read_page_from_file(pgFile *file, BlockNumber blknum, * Returns 0 if page was successfully retrieved * SkipCurrentPage(-3) if we need to skip this page * PageIsTruncated(-2) if the page was truncated + * PageIsCorrupted(-4) if the page check mismatch */ static int32 prepare_page(backup_files_arg *arguments, @@ -296,7 +297,8 @@ prepare_page(backup_files_arg *arguments, BlockNumber blknum, BlockNumber nblocks, FILE *in, BlockNumber *n_skipped, BackupMode backup_mode, - Page page) + Page page, + bool strict) { XLogRecPtr page_lsn = 0; int try_again = 100; @@ -306,7 +308,7 @@ prepare_page(backup_files_arg *arguments, /* check for interrupt */ if (interrupted || thread_interrupted) - elog(ERROR, "Interrupted during backup"); + elog(ERROR, "Interrupted during page reading"); /* * Read the page and verify its header and checksum. @@ -338,7 +340,7 @@ prepare_page(backup_files_arg *arguments, */ //elog(WARNING, "Checksum_Version: %i", current.checksum_version ? 1 : 0); - if (result == -1 && is_ptrack_support) + if (result == -1 && is_ptrack_support && strict) { elog(WARNING, "File %s, block %u, try to fetch via SQL", file->path, blknum); @@ -349,8 +351,27 @@ prepare_page(backup_files_arg *arguments, * If page is not valid after 100 attempts to read it * throw an error. */ - if(!page_is_valid && !is_ptrack_support) - elog(ERROR, "Data file checksum mismatch. Canceling backup"); + + if (!page_is_valid && + ((strict && !is_ptrack_support) || !strict)) + { + /* show this message for checkdb or backup without ptrack support */ + elog(WARNING, "CORRUPTION in file %s, block %u", + file->path, blknum); + } + + /* Backup with invalid block and without ptrack support must throw error */ + if (!page_is_valid && strict && !is_ptrack_support) + elog(ERROR, "Data file corruption. 
Canceling backup"); + + /* Checkdb not going futher */ + if (!strict) + { + if (page_is_valid) + return 0; + else + return PageIsCorrupted; + } } if (backup_mode == BACKUP_MODE_DIFF_PTRACK || (!page_is_valid && is_ptrack_support)) @@ -381,7 +402,7 @@ prepare_page(backup_files_arg *arguments, */ memcpy(page, ptrack_page, BLCKSZ); free(ptrack_page); - if (is_checksum_enabled) + if (current.checksum_version) ((PageHeader) page)->pd_checksum = pg_checksum_page(page, absolute_blknum); } /* get lsn from page, provided by pg_ptrack_get_block() */ @@ -610,9 +631,9 @@ backup_data_file(backup_files_arg* arguments, { page_state = prepare_page(arguments, file, prev_backup_start_lsn, blknum, nblocks, in, &n_blocks_skipped, - backup_mode, curr_page); + backup_mode, curr_page, true); compress_and_backup_page(file, blknum, in, out, &(file->crc), - page_state, curr_page, calg, clevel); + page_state, curr_page, calg, clevel); n_blocks_read++; if (page_state == PageIsTruncated) break; @@ -634,7 +655,7 @@ backup_data_file(backup_files_arg* arguments, { page_state = prepare_page(arguments, file, prev_backup_start_lsn, blknum, nblocks, in, &n_blocks_skipped, - backup_mode, curr_page); + backup_mode, curr_page, true); compress_and_backup_page(file, blknum, in, out, &(file->crc), page_state, curr_page, calg, clevel); n_blocks_read++; @@ -1456,8 +1477,6 @@ validate_one_page(Page page, pgFile *file, { PageHeader phdr; XLogRecPtr lsn; - bool page_header_is_sane = false; - bool checksum_is_ok = false; /* new level of paranoia */ if (page == NULL) @@ -1487,72 +1506,146 @@ validate_one_page(Page page, pgFile *file, file->path, blknum); } - /* Page is zeroed. No sense to check header and checksum. */ - page_header_is_sane = false; + /* Page is zeroed. No sense in checking header and checksum. */ + return PAGE_IS_FOUND_AND_VALID; } - else + + /* Verify checksum */ + if (checksum_version) { - if (PageGetPageSize(phdr) == BLCKSZ && - PageGetPageLayoutVersion(phdr) == PG_PAGE_LAYOUT_VERSION && - (phdr->pd_flags & ~PD_VALID_FLAG_BITS) == 0 && - phdr->pd_lower >= SizeOfPageHeaderData && - phdr->pd_lower <= phdr->pd_upper && - phdr->pd_upper <= phdr->pd_special && - phdr->pd_special <= BLCKSZ && - phdr->pd_special == MAXALIGN(phdr->pd_special)) - page_header_is_sane = true; + /* Checksums are enabled, so check them. */ + if (!(pg_checksum_page(page, file->segno * RELSEG_SIZE + blknum) + == ((PageHeader) page)->pd_checksum)) + { + elog(WARNING, "File: %s blknum %u have wrong checksum", + file->path, blknum); + return PAGE_IS_FOUND_AND_NOT_VALID; + } } - if (page_header_is_sane) + /* Check page for the sights of insanity. + * TODO: We should give more information about what exactly is looking "wrong" + */ + if (!(PageGetPageSize(phdr) == BLCKSZ && + PageGetPageLayoutVersion(phdr) == PG_PAGE_LAYOUT_VERSION && + (phdr->pd_flags & ~PD_VALID_FLAG_BITS) == 0 && + phdr->pd_lower >= SizeOfPageHeaderData && + phdr->pd_lower <= phdr->pd_upper && + phdr->pd_upper <= phdr->pd_special && + phdr->pd_special <= BLCKSZ && + phdr->pd_special == MAXALIGN(phdr->pd_special))) { - /* Verify checksum */ - if (checksum_version) - { - /* - * If checksum is wrong, sleep a bit and then try again - * several times. If it didn't help, throw error - */ - if (pg_checksum_page(page, file->segno * RELSEG_SIZE + blknum) - == ((PageHeader) page)->pd_checksum) - { - checksum_is_ok = true; - } - else - { - elog(WARNING, "File: %s blknum %u have wrong checksum", - file->path, blknum); - } - } - else - { - /* Get lsn from page header. 
Ensure that page is from our time */ - lsn = PageXLogRecPtrGet(phdr->pd_lsn); + /* Page does not looking good */ + elog(WARNING, "Page header is looking insane: %s, block %i", + file->path, blknum); + return PAGE_IS_FOUND_AND_NOT_VALID; + } - if (lsn > stop_lsn) - elog(WARNING, "File: %s, block %u, checksum is not enabled. " - "Page is from future: pageLSN %X/%X stopLSN %X/%X", - file->path, blknum, (uint32) (lsn >> 32), (uint32) lsn, - (uint32) (stop_lsn >> 32), (uint32) stop_lsn); - else - return PAGE_IS_FOUND_AND_VALID; - } + /* At this point page header is sane, if checksums are enabled - the`re ok. + * Check that page is not from future. + */ + if (stop_lsn > 0) + { + /* Get lsn from page header. Ensure that page is from our time. */ + lsn = PageXLogRecPtrGet(phdr->pd_lsn); - if (checksum_is_ok) + if (lsn > stop_lsn) { - /* Get lsn from page header. Ensure that page is from our time */ - lsn = PageXLogRecPtrGet(phdr->pd_lsn); - - if (lsn > stop_lsn) - elog(WARNING, "File: %s, block %u, checksum is correct. " - "Page is from future: pageLSN %X/%X stopLSN %X/%X", - file->path, blknum, (uint32) (lsn >> 32), (uint32) lsn, - (uint32) (stop_lsn >> 32), (uint32) stop_lsn); - else - return PAGE_IS_FOUND_AND_VALID; + elog(WARNING, "File: %s, block %u, checksum is %s. " + "Page is from future: pageLSN %X/%X stopLSN %X/%X", + file->path, blknum, checksum_version ? "correct" : "not enabled", + (uint32) (lsn >> 32), (uint32) lsn, + (uint32) (stop_lsn >> 32), (uint32) stop_lsn); + return PAGE_IS_FOUND_AND_NOT_VALID; } } - return PAGE_IS_FOUND_AND_NOT_VALID; + return PAGE_IS_FOUND_AND_VALID; +} + +/* + * Valiate pages of datafile in PGDATA one by one. + * + * returns true if the file is valid + * also returns true if the file was not found + */ +bool +check_data_file(backup_files_arg* arguments, + pgFile *file) +{ + FILE *in; + BlockNumber blknum = 0; + BlockNumber nblocks = 0; + BlockNumber n_blocks_skipped = 0; + int page_state; + char curr_page[BLCKSZ]; + bool is_valid = true; + + in = fopen(file->path, PG_BINARY_R); + if (in == NULL) + { + /* + * If file is not found, this is not en error. + * It could have been deleted by concurrent postgres transaction. + */ + if (errno == ENOENT) + { + elog(LOG, "File \"%s\" is not found", file->path); + return true; + } + + elog(WARNING, "cannot open file \"%s\": %s", + file->path, strerror(errno)); + return false; + } + + if (file->size % BLCKSZ != 0) + { + fclose(in); + elog(WARNING, "File: %s, invalid file size %zu", file->path, file->size); + } + + /* + * Compute expected number of blocks in the file. + * NOTE This is a normal situation, if the file size has changed + * since the moment we computed it. 
+ */ + nblocks = file->size/BLCKSZ; + + for (blknum = 0; blknum < nblocks; blknum++) + { + page_state = prepare_page(arguments, file, InvalidXLogRecPtr, + blknum, nblocks, in, &n_blocks_skipped, + BACKUP_MODE_FULL, curr_page, false); + + if (page_state == PageIsTruncated) + break; + + if (page_state == PageIsCorrupted) + { + /* Page is corrupted, no need to elog about it, + * prepare_page() already done that + */ + is_valid = false; + continue; + } + + /* At this point page is found and its checksum is ok, if any + * but could be 'insane' + * TODO: between prepare_page and validate_one_page we + * compute and compare checksum twice, it`s ineffective + */ + if (validate_one_page(curr_page, file, blknum, + InvalidXLogRecPtr, + 0) == PAGE_IS_FOUND_AND_NOT_VALID) + { + /* Page is corrupted */ + is_valid = false; + } + } + + fclose(in); + return is_valid; } /* Valiate pages of datafile in backup one by one */ diff --git a/src/help.c b/src/help.c index 455211a1..d26c4f8b 100644 --- a/src/help.c +++ b/src/help.c @@ -22,6 +22,7 @@ static void help_add_instance(void); static void help_del_instance(void); static void help_archive_push(void); static void help_archive_get(void); +static void help_checkdb(void); void help_command(char *command) @@ -52,6 +53,8 @@ help_command(char *command) help_archive_push(); else if (strcmp(command, "archive-get") == 0) help_archive_get(); + else if (strcmp(command, "checkdb") == 0) + help_checkdb(); else if (strcmp(command, "--help") == 0 || strcmp(command, "help") == 0 || strcmp(command, "-?") == 0 @@ -147,6 +150,11 @@ help_pg_probackup(void) printf(_(" [--recovery-target-name=target-name]\n")); printf(_(" [--skip-block-validation]\n")); + printf(_("\n %s checkdb [-B backup-path] [--instance=instance_name]\n"), PROGRAM_NAME); + printf(_(" [-D pgdata-path] [--progress] [-j num-threads]\n")); + printf(_(" [--amcheck] [--skip-block-validation]\n")); + printf(_(" [--heapallindexed]\n")); + printf(_("\n %s show -B backup-path\n"), PROGRAM_NAME); printf(_(" [--instance=instance_name [-i backup-id]]\n")); printf(_(" [--format=format]\n")); @@ -200,14 +208,14 @@ help_pg_probackup(void) static void help_init(void) { - printf(_("%s init -B backup-path\n\n"), PROGRAM_NAME); + printf(_("\n%s init -B backup-path\n\n"), PROGRAM_NAME); printf(_(" -B, --backup-path=backup-path location of the backup storage area\n")); } static void help_backup(void) { - printf(_("%s backup -B backup-path -b backup-mode --instance=instance_name\n"), PROGRAM_NAME); + printf(_("\n%s backup -B backup-path -b backup-mode --instance=instance_name\n"), PROGRAM_NAME); printf(_(" [-C] [--stream [-S slot-name] [--temp-slot]\n")); printf(_(" [--backup-pg-log] [-j num-threads]\n")); printf(_(" [--archive-timeout=archive-timeout] [--progress]\n")); @@ -320,7 +328,7 @@ help_backup(void) static void help_restore(void) { - printf(_("%s restore -B backup-path --instance=instance_name\n"), PROGRAM_NAME); + printf(_("\n%s restore -B backup-path --instance=instance_name\n"), PROGRAM_NAME); printf(_(" [-D pgdata-path] [-i backup-id] [-j num-threads]\n")); printf(_(" [--recovery-target-time=time|--recovery-target-xid=xid|--recovery-target-lsn=lsn [--recovery-target-inclusive=boolean]]\n")); printf(_(" [--recovery-target-timeline=timeline]\n")); @@ -404,7 +412,7 @@ help_restore(void) static void help_validate(void) { - printf(_("%s validate -B backup-path [--instance=instance_name]\n"), PROGRAM_NAME); + printf(_("\n%s validate -B backup-path [--instance=instance_name]\n"), PROGRAM_NAME); printf(_(" [-i backup-id] 
[--progress] [-j num-threads]\n")); printf(_(" [--recovery-target-time=time|--recovery-target-xid=xid|--recovery-target-lsn=lsn [--recovery-target-inclusive=boolean]]\n")); printf(_(" [--recovery-target-timeline=timeline]\n")); @@ -450,10 +458,62 @@ help_validate(void) printf(_(" available units: 'ms', 's', 'min', 'h', 'd' (default: min)\n")); } +static void +help_checkdb(void) +{ + printf(_("\n%s checkdb [-B backup-path] [--instance=instance_name]\n"), PROGRAM_NAME); + printf(_(" [-D pgdata-path] [-j num-threads] [--progress]\n")); + printf(_(" [--amcheck] [--skip-block-validation]\n")); + printf(_(" [--heapallindexed]\n\n")); + + printf(_(" -B, --backup-path=backup-path location of the backup storage area\n")); + printf(_(" --instance=instance_name name of the instance\n")); + printf(_(" -D, --pgdata=pgdata-path location of the database storage area\n")); + + printf(_(" --progress show progress\n")); + printf(_(" -j, --threads=NUM number of parallel threads\n")); + printf(_(" --skip-block-validation skip file-level block checking\n")); + printf(_(" can be used only with '--amcheck' option\n")); + printf(_(" --amcheck in addition to file-level block checking\n")); + printf(_(" check btree indexes via function 'bt_index_check()'\n")); + printf(_(" using 'amcheck' or 'amcheck_next' extensions\n")); + printf(_(" --heapallindexed also check that heap is indexed\n")); + printf(_(" can be used only with '--amcheck' option\n")); + + printf(_("\n Logging options:\n")); + printf(_(" --log-level-console=log-level-console\n")); + printf(_(" level for console logging (default: info)\n")); + printf(_(" available options: 'off', 'error', 'warning', 'info', 'log', 'verbose'\n")); + printf(_(" --log-level-file=log-level-file\n")); + printf(_(" level for file logging (default: off)\n")); + printf(_(" available options: 'off', 'error', 'warning', 'info', 'log', 'verbose'\n")); + printf(_(" --log-filename=log-filename\n")); + printf(_(" filename for file logging (default: 'pg_probackup.log')\n")); + printf(_(" support strftime format (example: pg_probackup-%%Y-%%m-%%d_%%H%%M%%S.log\n")); + printf(_(" --error-log-filename=error-log-filename\n")); + printf(_(" filename for error logging (default: none)\n")); + printf(_(" --log-directory=log-directory\n")); + printf(_(" directory for file logging (default: BACKUP_PATH/log)\n")); + printf(_(" --log-rotation-size=log-rotation-size\n")); + printf(_(" rotate logfile if its size exceeds this value; 0 disables; (default: 0)\n")); + printf(_(" available units: 'kB', 'MB', 'GB', 'TB' (default: kB)\n")); + printf(_(" --log-rotation-age=log-rotation-age\n")); + printf(_(" rotate logfile if its age exceeds this value; 0 disables; (default: 0)\n")); + printf(_(" available units: 'ms', 's', 'min', 'h', 'd' (default: min)\n")); + + printf(_("\n Connection options:\n")); + printf(_(" -U, --username=USERNAME user name to connect as (default: current local user)\n")); + printf(_(" -d, --dbname=DBNAME database to connect (default: username)\n")); + printf(_(" -h, --host=HOSTNAME database server host or socket directory(default: 'local socket')\n")); + printf(_(" -p, --port=PORT database server port (default: 5432)\n")); + printf(_(" -w, --no-password never prompt for password\n")); + printf(_(" -W, --password force password prompt\n")); +} + static void help_show(void) { - printf(_("%s show -B backup-path\n"), PROGRAM_NAME); + printf(_("\n%s show -B backup-path\n"), PROGRAM_NAME); printf(_(" [--instance=instance_name [-i backup-id]]\n")); printf(_(" [--format=format]\n\n")); @@ 
-466,7 +526,7 @@ help_show(void) static void help_delete(void) { - printf(_("%s delete -B backup-path --instance=instance_name\n"), PROGRAM_NAME); + printf(_("\n%s delete -B backup-path --instance=instance_name\n"), PROGRAM_NAME); printf(_(" [-i backup-id | --expired | --merge-expired] [--wal]\n")); printf(_(" [-j num-threads] [--dry-run]\n\n")); @@ -506,7 +566,7 @@ help_delete(void) static void help_merge(void) { - printf(_("%s merge -B backup-path --instance=instance_name\n"), PROGRAM_NAME); + printf(_("\n%s merge -B backup-path --instance=instance_name\n"), PROGRAM_NAME); printf(_(" -i backup-id [-j num-threads] [--progress]\n")); printf(_(" [--log-level-console=log-level-console]\n")); printf(_(" [--log-level-file=log-level-file]\n")); @@ -548,7 +608,7 @@ help_merge(void) static void help_set_config(void) { - printf(_("%s set-config -B backup-path --instance=instance_name\n"), PROGRAM_NAME); + printf(_("\n%s set-config -B backup-path --instance=instance_name\n"), PROGRAM_NAME); printf(_(" [--log-level-console=log-level-console]\n")); printf(_(" [--log-level-file=log-level-file]\n")); printf(_(" [--log-filename=log-filename]\n")); @@ -627,7 +687,7 @@ help_set_config(void) static void help_show_config(void) { - printf(_("%s show-config -B backup-path --instance=instance_name\n"), PROGRAM_NAME); + printf(_("\n%s show-config -B backup-path --instance=instance_name\n"), PROGRAM_NAME); printf(_(" [--format=format]\n\n")); printf(_(" -B, --backup-path=backup-path location of the backup storage area\n")); @@ -638,7 +698,7 @@ help_show_config(void) static void help_add_instance(void) { - printf(_("%s add-instance -B backup-path -D pgdata-path\n"), PROGRAM_NAME); + printf(_("\n%s add-instance -B backup-path -D pgdata-path\n"), PROGRAM_NAME); printf(_(" --instance=instance_name\n")); printf(_(" [-E external-directory-path]\n")); printf(_(" [--remote-proto] [--remote-host]\n")); @@ -665,7 +725,7 @@ help_add_instance(void) static void help_del_instance(void) { - printf(_("%s del-instance -B backup-path --instance=instance_name\n\n"), PROGRAM_NAME); + printf(_("\n%s del-instance -B backup-path --instance=instance_name\n\n"), PROGRAM_NAME); printf(_(" -B, --backup-path=backup-path location of the backup storage area\n")); printf(_(" --instance=instance_name name of the instance to delete\n")); @@ -674,7 +734,7 @@ help_del_instance(void) static void help_archive_push(void) { - printf(_("\n %s archive-push -B backup-path --instance=instance_name\n"), PROGRAM_NAME); + printf(_("\n%s archive-push -B backup-path --instance=instance_name\n"), PROGRAM_NAME); printf(_(" --wal-file-path=wal-file-path\n")); printf(_(" --wal-file-name=wal-file-name\n")); printf(_(" [--compress]\n")); @@ -702,7 +762,7 @@ help_archive_push(void) static void help_archive_get(void) { - printf(_("\n %s archive-get -B backup-path --instance=instance_name\n"), PROGRAM_NAME); + printf(_("\n%s archive-get -B backup-path --instance=instance_name\n"), PROGRAM_NAME); printf(_(" --wal-file-path=wal-file-path\n")); printf(_(" --wal-file-name=wal-file-name\n")); printf(_(" [--remote-proto] [--remote-host]\n")); diff --git a/src/pg_probackup.c b/src/pg_probackup.c index 5b491602..dc40fe1c 100644 --- a/src/pg_probackup.c +++ b/src/pg_probackup.c @@ -39,7 +39,8 @@ typedef enum ProbackupSubcmd MERGE_CMD, SHOW_CMD, SET_CONFIG_CMD, - SHOW_CONFIG_CMD + SHOW_CONFIG_CMD, + CHECKDB_CMD } ProbackupSubcmd; @@ -94,6 +95,11 @@ bool no_validate = false; bool skip_block_validation = false; bool skip_external_dirs = false; +/* checkdb options */ +bool 
need_amcheck = false; +bool heapallindexed = false; +bool amcheck_parent = false; + /* delete options */ bool delete_wal = false; bool delete_expired = false; @@ -164,6 +170,10 @@ static ConfigOption cmd_options[] = { 'b', 143, "no-validate", &no_validate, SOURCE_CMD_STRICT }, { 'b', 154, "skip-block-validation", &skip_block_validation, SOURCE_CMD_STRICT }, { 'b', 156, "skip-external-dirs", &skip_external_dirs, SOURCE_CMD_STRICT }, + /* checkdb options */ + { 'b', 195, "amcheck", &need_amcheck, SOURCE_CMD_STRICT }, + { 'b', 196, "heapallindexed", &heapallindexed, SOURCE_CMD_STRICT }, + { 'b', 197, "parent", &amcheck_parent, SOURCE_CMD_STRICT }, /* delete options */ { 'b', 145, "wal", &delete_wal, SOURCE_CMD_STRICT }, { 'b', 146, "expired", &delete_expired, SOURCE_CMD_STRICT }, @@ -279,6 +289,8 @@ main(int argc, char *argv[]) backup_subcmd = SET_CONFIG_CMD; else if (strcmp(argv[1], "show-config") == 0) backup_subcmd = SHOW_CONFIG_CMD; + else if (strcmp(argv[1], "checkdb") == 0) + backup_subcmd = CHECKDB_CMD; else if (strcmp(argv[1], "agent") == 0 && argc > 2) { remote_agent = argv[2]; @@ -326,6 +338,7 @@ main(int argc, char *argv[]) * Make command string before getopt_long() will call. It permutes the * content of argv. */ + /* TODO why do we do that only for some commands? */ command_name = pstrdup(argv[1]); if (backup_subcmd == BACKUP_CMD || backup_subcmd == RESTORE_CMD || @@ -367,7 +380,7 @@ main(int argc, char *argv[]) if (help_opt) help_command(command_name); - /* backup_path is required for all pg_probackup commands except help */ + /* backup_path is required for all pg_probackup commands except help and checkdb */ if (backup_path == NULL) { /* @@ -375,28 +388,36 @@ main(int argc, char *argv[]) * from environment variable */ backup_path = getenv("BACKUP_PATH"); - if (backup_path == NULL) + if (backup_path == NULL && backup_subcmd != CHECKDB_CMD) elog(ERROR, "required parameter not specified: BACKUP_PATH (-B, --backup-path)"); } - canonicalize_path(backup_path); setMyLocation(); - /* Ensure that backup_path is a path to a directory */ - rc = fio_stat(backup_path, &stat_buf, true, FIO_BACKUP_HOST); - if (rc != -1 && !S_ISDIR(stat_buf.st_mode)) - elog(ERROR, "-B, --backup-path must be a path to directory"); + if (backup_path != NULL) + { + canonicalize_path(backup_path); + + /* Ensure that backup_path is an absolute path */ + if (!is_absolute_path(backup_path)) + elog(ERROR, "-B, --backup-path must be an absolute path"); + + /* Ensure that backup_path is a path to a directory */ + rc = stat(backup_path, &stat_buf); + if (rc != -1 && !S_ISDIR(stat_buf.st_mode)) + elog(ERROR, "-B, --backup-path must be a path to directory"); + } /* Ensure that backup_path is an absolute path */ - if (!is_absolute_path(backup_path)) + if (backup_path && !is_absolute_path(backup_path)) elog(ERROR, "-B, --backup-path must be an absolute path"); /* Option --instance is required for all commands except init and show */ - if (backup_subcmd != INIT_CMD && backup_subcmd != SHOW_CMD && - backup_subcmd != VALIDATE_CMD) + if (instance_name == NULL) { - if (instance_name == NULL) + if (backup_subcmd != INIT_CMD && backup_subcmd != SHOW_CMD && + backup_subcmd != VALIDATE_CMD && backup_subcmd != CHECKDB_CMD) elog(ERROR, "required parameter not specified: --instance"); } @@ -404,7 +425,7 @@ main(int argc, char *argv[]) * If --instance option was passed, construct paths for backup data and * xlog files of this backup instance. 
*/ - if (instance_name) + if ((backup_path != NULL) && instance_name) { sprintf(backup_instance_path, "%s/%s/%s", backup_path, BACKUPS_DIR, instance_name); @@ -431,7 +452,6 @@ main(int argc, char *argv[]) if (instance_name) { char path[MAXPGPATH]; - /* Read environment variables */ config_get_opt_env(instance_options); @@ -440,11 +460,38 @@ main(int argc, char *argv[]) { join_path_components(path, backup_instance_path, BACKUP_CATALOG_CONF_FILE); - config_read_opt(path, instance_options, ERROR, true, false); + + if (backup_subcmd == CHECKDB_CMD) + config_read_opt(path, instance_options, ERROR, true, true); + else + config_read_opt(path, instance_options, ERROR, true, false); } setMyLocation(); } + /* Just read environment variables */ + if (backup_path == NULL && backup_subcmd == CHECKDB_CMD) + config_get_opt_env(instance_options); + + /* Sanity for checkdb, if backup_dir is provided but pgdata and instance are not */ + if (backup_subcmd == CHECKDB_CMD && + backup_path != NULL && + instance_name == NULL && + instance_config.pgdata == NULL) + elog(ERROR, "required parameter not specified: --instance"); + + /* Usually checkdb for file logging requires log_directory + * to be specified explicitly, but if backup_dir and instance name are provided, + * checkdb can use the tusual default values or values from config + */ + if (backup_subcmd == CHECKDB_CMD && + (instance_config.logger.log_level_file != LOG_OFF && + instance_config.logger.log_directory == NULL) && + (!instance_config.pgdata || !instance_name)) + elog(ERROR, "Cannot save checkdb logs to a file. " + "You must specify --log-directory option when running checkdb with " + "--log-level-file option enabled."); + /* Initialize logger */ init_logger(backup_path, &instance_config.logger); @@ -559,6 +606,11 @@ main(int argc, char *argv[]) PROGRAM_VERSION, base36enc(start_time), backup_mode, instance_name, stream_wal ? "true" : "false", instance_config.remote.host ? "true" : "false"); + /* sanity */ + if (current.backup_mode == BACKUP_MODE_INVALID) + elog(ERROR, "required parameter not specified: BACKUP_MODE " + "(-b, --backup-mode)"); + return do_backup(start_time, no_validate); } case RESTORE_CMD: @@ -596,6 +648,9 @@ main(int argc, char *argv[]) case SET_CONFIG_CMD: do_set_config(false); break; + case CHECKDB_CMD: + do_checkdb(need_amcheck); + break; case NO_CMD: /* Should not happen */ elog(ERROR, "Unknown subcommand"); diff --git a/src/pg_probackup.h b/src/pg_probackup.h index a4ee9951..1ad5ad0c 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -136,6 +136,15 @@ typedef struct pgFile * i.e. datafiles without _ptrack */ } pgFile; +typedef struct pg_indexEntry +{ + Oid indexrelid; + char *name; + char *dbname; + char *amcheck_nspname; /* schema where amcheck extention is located */ + volatile pg_atomic_flag lock; /* lock for synchronization of parallel threads */ +} pg_indexEntry; + /* Special values of datapagemap_t bitmapsize */ #define PageBitmapIsEmpty 0 /* Used to mark unchanged datafiles */ @@ -311,6 +320,8 @@ typedef struct PGconn *backup_conn; PGcancel *cancel_conn; + parray *index_list; + int thread_num; /* * Return value from the thread. 
@@ -332,6 +343,7 @@ typedef struct BackupPageHeader /* Special value for compressed_size field */ #define PageIsTruncated -2 #define SkipCurrentPage -3 +#define PageIsCorrupted -4 /* used by checkdb */ /* @@ -399,7 +411,6 @@ extern bool smooth_checkpoint; extern char* remote_agent; extern bool is_ptrack_support; -extern bool is_checksum_enabled; extern bool exclusive_backup; /* restore options */ @@ -423,6 +434,9 @@ extern char *instance_name; /* show options */ extern ShowFormat show_format; +/* checkdb options */ +extern bool heapallindexed; + /* current settings */ extern pgBackup current; @@ -432,6 +446,7 @@ extern const char *pgdata_exclude_dir[]; /* in backup.c */ extern int do_backup(time_t start_time, bool no_validate); +extern void do_checkdb(bool need_amcheck); extern BackupMode parse_backup_mode(const char *value); extern const char *deparse_backup_mode(BackupMode mode); extern void process_block_change(ForkNumber forknum, RelFileNode rnode, @@ -587,6 +602,8 @@ extern int pgFileCompareLinked(const void *f1, const void *f2); extern int pgFileCompareSize(const void *f1, const void *f2); /* in data.c */ +extern bool check_data_file(backup_files_arg* arguments, + pgFile *file); extern bool backup_data_file(backup_files_arg* arguments, const char *to_path, pgFile *file, XLogRecPtr prev_backup_start_lsn, diff --git a/src/utils/configuration.c b/src/utils/configuration.c index 7d181452..d4fb8950 100644 --- a/src/utils/configuration.c +++ b/src/utils/configuration.c @@ -189,10 +189,8 @@ assign_option(ConfigOption *opt, const char *optarg, OptionSource src) const char *message; if (opt == NULL) - { - fprintf(stderr, "Try \"%s --help\" for more information.\n", PROGRAM_NAME); - exit_or_abort(ERROR); - } + elog(ERROR, "Option is not found. Try \"%s --help\" for more information.\n", + PROGRAM_NAME); if (opt->source > src) { diff --git a/src/utils/logger.c b/src/utils/logger.c index 763b3f33..acd9c476 100644 --- a/src/utils/logger.c +++ b/src/utils/logger.c @@ -72,8 +72,12 @@ static pthread_mutex_t log_file_mutex = PTHREAD_MUTEX_INITIALIZER; void init_logger(const char *root_path, LoggerConfig *config) { - /* Set log path */ - if (config->log_directory == NULL) + /* + * If logging to file is enabled and log_directory wasn't set + * by user, init the path with default value: backup_directory/log/ + * */ + if (config->log_level_file != LOG_OFF + && config->log_directory == NULL) { config->log_directory = pgut_malloc(MAXPGPATH); join_path_components(config->log_directory, diff --git a/src/utils/pgut.c b/src/utils/pgut.c index 372de099..6b2147ae 100644 --- a/src/utils/pgut.c +++ b/src/utils/pgut.c @@ -22,7 +22,7 @@ #include "file.h" -const char *PROGRAM_NAME = NULL; +const char *PROGRAM_NAME = "pg_probackup"; static char *password = NULL; bool prompt_password = true; @@ -44,6 +44,8 @@ static void on_interrupt(void); static void on_cleanup(void); static pqsigfunc oldhandler = NULL; +void discard_response(PGconn *conn); + void pgut_init(void) { @@ -360,7 +362,7 @@ PGresult * pgut_execute_parallel(PGconn* conn, PGcancel* thread_cancel_conn, const char *query, int nParams, const char **params, - bool text_result) + bool text_result, bool ok_error, bool async) { PGresult *res; @@ -388,15 +390,56 @@ pgut_execute_parallel(PGconn* conn, } //on_before_exec(conn, thread_cancel_conn); - if (nParams == 0) - res = PQexec(conn, query); + if (async) + { + /* clean any old data */ + discard_response(conn); + + if (nParams == 0) + PQsendQuery(conn, query); + else + PQsendQueryParams(conn, query, nParams, NULL, 
params, NULL, NULL, + /* + * Specify zero to obtain results in text format, + * or one to obtain results in binary format. + */ + (text_result) ? 0 : 1); + + /* wait for processing, TODO: timeout */ + for (;;) + { + if (interrupted) + { + pgut_cancel(conn); + pgut_disconnect(conn); + elog(ERROR, "interrupted"); + } + + if (!PQconsumeInput(conn)) + elog(ERROR, "query failed: %squery was: %s", + PQerrorMessage(conn), query); + + /* query is no done */ + if (!PQisBusy(conn)) + break; + + usleep(10000); + } + + res = PQgetResult(conn); + } else - res = PQexecParams(conn, query, nParams, NULL, params, NULL, NULL, - /* - * Specify zero to obtain results in text format, - * or one to obtain results in binary format. - */ - (text_result) ? 0 : 1); + { + if (nParams == 0) + res = PQexec(conn, query); + else + res = PQexecParams(conn, query, nParams, NULL, params, NULL, NULL, + /* + * Specify zero to obtain results in text format, + * or one to obtain results in binary format. + */ + (text_result) ? 0 : 1); + } //on_after_exec(thread_cancel_conn); switch (PQresultStatus(res)) @@ -406,6 +449,9 @@ pgut_execute_parallel(PGconn* conn, case PGRES_COPY_IN: break; default: + if (ok_error && PQresultStatus(res) == PGRES_FATAL_ERROR) + break; + elog(ERROR, "query failed: %squery was: %s", PQerrorMessage(conn), query); break; @@ -787,22 +833,6 @@ on_cleanup(void) call_atexit_callbacks(false); } -void -exit_or_abort(int exitcode) -{ - if (in_cleanup) - { - /* oops, error in cleanup*/ - call_atexit_callbacks(true); - abort(); - } - else - { - /* normal exit */ - exit(exitcode); - } -} - void * pgut_malloc(size_t size) { @@ -998,3 +1028,16 @@ select_win32(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, con } #endif /* WIN32 */ + +void +discard_response(PGconn *conn) +{ + PGresult *res; + + do + { + res = PQgetResult(conn); + if (res) + PQclear(res); + } while (res); +} diff --git a/src/utils/pgut.h b/src/utils/pgut.h index 41a261d2..b527c760 100644 --- a/src/utils/pgut.h +++ b/src/utils/pgut.h @@ -41,7 +41,6 @@ extern void pgut_atexit_push(pgut_atexit_callback callback, void *userdata); extern void pgut_atexit_pop(pgut_atexit_callback callback, void *userdata); extern void pgut_init(void); -extern void exit_or_abort(int exitcode); /* * Database connections @@ -59,7 +58,7 @@ extern PGresult *pgut_execute_extended(PGconn* conn, const char *query, int nPar const char **params, bool text_result, bool ok_error); extern PGresult *pgut_execute_parallel(PGconn* conn, PGcancel* thread_cancel_conn, const char *query, int nParams, - const char **params, bool text_result); + const char **params, bool text_result, bool ok_error, bool async); extern bool pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int elevel); extern void pgut_cancel(PGconn* conn); extern int pgut_wait(int num, PGconn *connections[], struct timeval *timeout); diff --git a/tests/__init__.py b/tests/__init__.py index bc787ca4..5e006f41 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -5,7 +5,7 @@ from . 
import init_test, merge, option_test, show_test, compatibility, \ retention, pgpro560, pgpro589, pgpro2068, false_positive, replica, \ compression, page, ptrack, archive, exclude, cfs_backup, cfs_restore, \ cfs_validate_backup, auth_test, time_stamp, snapfs, logging, \ - locking, remote, external, config + locking, remote, external, config, checkdb def load_tests(loader, tests, pattern): @@ -14,6 +14,7 @@ def load_tests(loader, tests, pattern): suite.addTests(loader.loadTestsFromModule(archive)) suite.addTests(loader.loadTestsFromModule(backup_test)) suite.addTests(loader.loadTestsFromModule(compatibility)) + suite.addTests(loader.loadTestsFromModule(checkdb)) suite.addTests(loader.loadTestsFromModule(config)) # suite.addTests(loader.loadTestsFromModule(cfs_backup)) # suite.addTests(loader.loadTestsFromModule(cfs_restore)) diff --git a/tests/checkdb.py b/tests/checkdb.py new file mode 100644 index 00000000..5a133cf9 --- /dev/null +++ b/tests/checkdb.py @@ -0,0 +1,422 @@ +import os +import unittest +from .helpers.ptrack_helpers import ProbackupTest, ProbackupException +from datetime import datetime, timedelta +import subprocess +from testgres import QueryException +import shutil +import sys +import time + + +module_name = 'checkdb' + + +class CheckdbTest(ProbackupTest, unittest.TestCase): + + # @unittest.skip("skip") + def test_checkdb_amcheck_only_sanity(self): + """""" + fname = self.id().split('.')[3] + backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup') + node = self.make_simple_node( + base_dir="{0}/{1}/node".format(module_name, fname), + set_replication=True, + initdb_params=['--data-checksums']) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, 'node', node) + node.slow_start() + + node.safe_psql( + "postgres", + "create table t_heap as select i" + " as id from generate_series(0,100) i") + + node.safe_psql( + "postgres", + "create index on t_heap(id)") + + node.safe_psql( + "postgres", + "create extension amcheck") + + log_file_path = os.path.join( + backup_dir, 'log', 'pg_probackup.log') + + # simple sanity + try: + self.checkdb_node( + options=['--skip-block-validation']) + # we should die here because exception is what we expect to happen + self.assertEqual( + 1, 0, + "Expecting Error because --amcheck option is missing\n" + " Output: {0} \n CMD: {1}".format( + repr(self.output), self.cmd)) + except ProbackupException as e: + self.assertIn( + "ERROR: Option '--skip-block-validation' must be " + "used with '--amcheck' option", + e.message, + "\n Unexpected Error Message: {0}\n CMD: {1}".format( + repr(e.message), self.cmd)) + + # simple sanity + output = self.checkdb_node( + options=[ + '--amcheck', + '--skip-block-validation', + '-d', 'postgres', '-p', str(node.port)]) + + self.assertIn( + 'INFO: Checkdb --amcheck executed successfully', + output) + self.assertIn( + 'INFO: Indexes are valid', + output) + + # logging to file sanity + try: + self.checkdb_node( + options=[ + '--amcheck', + '--skip-block-validation', + '--log-level-file=verbose', + '-d', 'postgres','-p', str(node.port)]) + # we should die here because exception is what we expect to happen + self.assertEqual( + 1, 0, + "Expecting Error because log_directory missing\n" + " Output: {0} \n CMD: {1}".format( + repr(self.output), self.cmd)) + except ProbackupException as e: + self.assertIn( + "ERROR: Cannot save checkdb logs to a file. 
" + "You must specify --log-directory option when " + "running checkdb with --log-level-file option enabled", + e.message, + "\n Unexpected Error Message: {0}\n CMD: {1}".format( + repr(e.message), self.cmd)) + + # If backup_dir provided, then instance name must be + # provided too + try: + self.checkdb_node( + backup_dir, + options=[ + '--amcheck', + '--skip-block-validation', + '--log-level-file=verbose', + '-d', 'postgres', '-p', str(node.port)]) + # we should die here because exception is what we expect to happen + self.assertEqual( + 1, 0, + "Expecting Error because log_directory missing\n" + " Output: {0} \n CMD: {1}".format( + repr(self.output), self.cmd)) + except ProbackupException as e: + self.assertIn( + "ERROR: required parameter not specified: --instance", + e.message, + "\n Unexpected Error Message: {0}\n CMD: {1}".format( + repr(e.message), self.cmd)) + + # checkdb can use default or set in config values, + # if backup_dir and instance name are provided + self.checkdb_node( + backup_dir, + 'node', + options=[ + '--amcheck', + '--skip-block-validation', + '--log-level-file=verbose', + '-d', 'postgres', '-p', str(node.port)]) + + # check that file present and full of messages + os.path.isfile(log_file_path) + with open(log_file_path) as f: + log_file_content = f.read() + self.assertIn( + 'INFO: Checkdb --amcheck executed successfully', + log_file_content) + self.assertIn( + 'VERBOSE: (query)', + log_file_content) + os.unlink(log_file_path) + + # log-level-file and log-directory are provided + self.checkdb_node( + backup_dir, + 'node', + options=[ + '--amcheck', + '--skip-block-validation', + '--log-level-file=verbose', + '--log-directory={0}'.format( + os.path.join(backup_dir, 'log')), + '-d', 'postgres', '-p', str(node.port)]) + + # check that file present and full of messages + os.path.isfile(log_file_path) + with open(log_file_path) as f: + log_file_content = f.read() + self.assertIn( + 'INFO: Checkdb --amcheck executed successfully', + log_file_content) + self.assertIn( + 'VERBOSE: (query)', + log_file_content) + os.unlink(log_file_path) + + gdb = self.checkdb_node( + gdb=True, + options=[ + '--amcheck', + '--skip-block-validation', + '--log-level-file=verbose', + '--log-directory={0}'.format( + os.path.join(backup_dir, 'log')), + '-d', 'postgres', '-p', str(node.port)]) + + gdb.set_breakpoint('amcheck_one_index') + gdb.run_until_break() + + node.safe_psql( + "postgres", + "drop table t_heap") + + gdb.remove_all_breakpoints() + + gdb.continue_execution_until_exit() + + # check that message about missing index is present + with open(log_file_path) as f: + log_file_content = f.read() + self.assertIn( + 'ERROR: Checkdb --amcheck failed', + log_file_content) + self.assertIn( + "WARNING: Thread [1]. 
+                log_file_content)
+            self.assertIn(
+                'ERROR: could not open relation with OID',
+                log_file_content)
+
+        # Clean after yourself
+        self.del_test_dir(module_name, fname)
+
+    # @unittest.skip("skip")
+    def test_checkdb_amcheck_only_sanity_1(self):
+        """"""
+        fname = self.id().split('.')[3]
+        backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
+        node = self.make_simple_node(
+            base_dir="{0}/{1}/node".format(module_name, fname),
+            set_replication=True)
+
+        self.init_pb(backup_dir)
+        self.add_instance(backup_dir, 'node', node)
+        node.slow_start()
+
+        # create two databases
+        node.safe_psql("postgres", "create database db1")
+        node.safe_psql("db1", "create extension amcheck")
+
+        node.safe_psql("postgres", "create database db2")
+        node.safe_psql("db2", "create extension amcheck")
+
+        # init pgbench in two databases and corrupt both indexes
+        node.pgbench_init(scale=5, dbname='db1')
+        node.pgbench_init(scale=5, dbname='db2')
+
+        node.safe_psql(
+            "db2",
+            "alter index pgbench_accounts_pkey rename to some_index")
+
+        index_path_1 = os.path.join(
+            node.data_dir,
+            node.safe_psql(
+                "db1",
+                "select pg_relation_filepath('pgbench_accounts_pkey')").rstrip())
+
+        index_path_2 = os.path.join(
+            node.data_dir,
+            node.safe_psql(
+                "db2",
+                "select pg_relation_filepath('some_index')").rstrip())
+
+        try:
+            self.checkdb_node(
+                options=[
+                    '--amcheck',
+                    '--skip-block-validation',
+                    '-d', 'postgres', '-p', str(node.port)])
+            # we should die here because exception is what we expect to happen
+            self.assertEqual(
+                1, 0,
+                "Expecting Error because some db was not amchecked"
+                " Output: {0} \n CMD: {1}".format(
+                    repr(self.output), self.cmd))
+        except ProbackupException as e:
+            self.assertIn(
+                "ERROR: Some databases were not amchecked",
+                e.message,
+                "\n Unexpected Error Message: {0}\n CMD: {1}".format(
+                    repr(e.message), self.cmd))
+
+        node.stop()
+
+        # Let's do index corruption
+        with open(index_path_1, "rb+", 0) as f:
+            f.seek(42000)
+            f.write(b"blablahblahs")
+            f.flush()
+            f.close()
+
+        with open(index_path_2, "rb+", 0) as f:
+            f.seek(42000)
+            f.write(b"blablahblahs")
+            f.flush()
+            f.close()
+
+        node.slow_start()
+
+        log_file_path = os.path.join(
+            backup_dir, 'log', 'pg_probackup.log')
+
+        try:
+            self.checkdb_node(
+                options=[
+                    '--amcheck',
+                    '--skip-block-validation',
+                    '--log-level-file=verbose',
+                    '--log-directory={0}'.format(
+                        os.path.join(backup_dir, 'log')),
+                    '-d', 'postgres', '-p', str(node.port)])
+            # we should die here because exception is what we expect to happen
+            self.assertEqual(
+                1, 0,
+                "Expecting Error because some db was not amchecked"
+                " Output: {0} \n CMD: {1}".format(
+                    repr(self.output), self.cmd))
+        except ProbackupException as e:
+            self.assertIn(
+                "ERROR: Checkdb --amcheck failed",
+                e.message,
+                "\n Unexpected Error Message: {0}\n CMD: {1}".format(
+                    repr(e.message), self.cmd))
+
+        # corruption of both indexes in db1 and db2 must be detected,
+        # and the fact that amcheck is not installed in 'postgres'
+        # must be logged
+        with open(log_file_path) as f:
+            log_file_content = f.read()
+            self.assertIn(
+                "WARNING: Thread [1]. Amcheck failed for index: 'public.pgbench_accounts_pkey':",
+                log_file_content)
+
+            self.assertIn(
+                "WARNING: Thread [1]. Amcheck failed for index: 'public.some_index':",
+                log_file_content)
+
+            self.assertIn(
+                "ERROR: Checkdb --amcheck failed",
+                log_file_content)
+
+        # Clean after yourself
+        self.del_test_dir(module_name, fname)
+
+    # @unittest.skip("skip")
+    def test_checkdb_block_validation_sanity(self):
+        """make node, corrupt some pages, check that checkdb failed"""
+        fname = self.id().split('.')[3]
+        node = self.make_simple_node(
+            base_dir=os.path.join(module_name, fname, 'node'),
+            set_replication=True,
+            initdb_params=['--data-checksums'])
+
+        backup_dir = os.path.join(self.tmp_path, module_name, fname, 'backup')
+
+        self.init_pb(backup_dir)
+        self.add_instance(backup_dir, 'node', node)
+        node.slow_start()
+
+        node.safe_psql(
+            "postgres",
+            "create table t_heap as select 1 as id, md5(i::text) as text, "
+            "md5(repeat(i::text,10))::tsvector as tsvector "
+            "from generate_series(0,1000) i")
+        node.safe_psql(
+            "postgres",
+            "CHECKPOINT;")
+
+        heap_path = node.safe_psql(
+            "postgres",
+            "select pg_relation_filepath('t_heap')").rstrip()
+
+        # sanity
+        try:
+            self.checkdb_node()
+            # we should die here because exception is what we expect to happen
+            self.assertEqual(
+                1, 0,
+                "Expecting Error because pgdata must be specified\n"
+                " Output: {0} \n CMD: {1}".format(
+                    repr(self.output), self.cmd))
+        except ProbackupException as e:
+            self.assertIn(
+                "ERROR: required parameter not specified: PGDATA (-D, --pgdata)",
+                e.message,
+                "\n Unexpected Error Message: {0}\n CMD: {1}".format(
+                    repr(e.message), self.cmd))
+
+        self.checkdb_node(
+            data_dir=node.data_dir,
+            options=['-d', 'postgres', '-p', str(node.port)])
+
+        self.checkdb_node(
+            backup_dir, 'node',
+            options=['-d', 'postgres', '-p', str(node.port)])
+
+        heap_full_path = os.path.join(node.data_dir, heap_path)
+
+        with open(heap_full_path, "rb+", 0) as f:
+            f.seek(9000)
+            f.write(b"bla")
+            f.flush()
+            f.close()
+
+        with open(heap_full_path, "rb+", 0) as f:
+            f.seek(42000)
+            f.write(b"bla")
+            f.flush()
+            f.close()
+
+        try:
+            self.checkdb_node(
+                backup_dir, 'node',
+                options=['-d', 'postgres', '-p', str(node.port)])
+            # we should die here because exception is what we expect to happen
+            self.assertEqual(
+                1, 0,
+                "Expecting Error because of data corruption\n"
+                " Output: {0} \n CMD: {1}".format(
+                    repr(self.output), self.cmd))
+        except ProbackupException as e:
+            self.assertIn(
+                "ERROR: Checkdb failed",
+                e.message,
+                "\n Unexpected Error Message: {0}\n CMD: {1}".format(
+                    repr(e.message), self.cmd))
+
+            self.assertIn(
+                "WARNING: CORRUPTION in file {0}, block 1".format(heap_full_path),
+                e.message)
+
+            self.assertIn(
+                "WARNING: CORRUPTION in file {0}, block 5".format(heap_full_path),
+                e.message)
+
+        # Clean after yourself
+        self.del_test_dir(module_name, fname)
diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py
index 55eafc2c..f2ae21e2 100644
--- a/tests/helpers/ptrack_helpers.py
+++ b/tests/helpers/ptrack_helpers.py
@@ -732,6 +732,24 @@ class ProbackupTest(object):
 
         return self.run_pb(cmd_list + options, asynchronous, gdb, old_binary)
 
+    def checkdb_node(
+            self, backup_dir=False, instance=False, data_dir=False,
+            options=[], asynchronous=False, gdb=False, old_binary=False
+            ):
+
+        cmd_list = ["checkdb"]
+
+        if backup_dir:
+            cmd_list += ["-B", backup_dir]
+
+        if instance:
+            cmd_list += ["--instance={0}".format(instance)]
+
+        if data_dir:
+            cmd_list += ["-D", data_dir]
+
+        return self.run_pb(cmd_list + options, asynchronous, gdb, old_binary)
+
     def merge_backup(
             self, backup_dir, instance, backup_id, asynchronous=False, gdb=False,
             old_binary=False, options=[]):
@@ -1441,6 +1459,18 @@ class GDBobj(ProbackupTest):
             'Failed to set breakpoint.\n Output:\n {0}'.format(result)
         )
 
+    def remove_all_breakpoints(self):
+
+        result = self._execute('delete')
+        for line in result:
+
+            if line.startswith('^done'):
+                return
+
+        raise GdbException(
+            'Failed to remove breakpoints.\n Output:\n {0}'.format(result)
+        )
+
     def run_until_break(self):
         result = self._execute('run', False)
         for line in result: