/*------------------------------------------------------------------------- * * backup.c: backup DB cluster, archived WAL * * Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION * *------------------------------------------------------------------------- */ #include "pg_arman.h" #include #include #include #include #include #include #include #include #include #include "libpq/pqsignal.h" #include "pgut/pgut-port.h" #include "storage/bufpage.h" #include "datapagemap.h" #include "streamutil.h" #include "receivelog.h" /* wait 10 sec until WAL archive complete */ #define TIMEOUT_ARCHIVE 10 /* Server version */ static int server_version = 0; static bool in_backup = false; /* TODO: more robust logic */ static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr; const char *progname = "pg_arman"; /* list of files contained in backup */ parray *backup_files_list; static volatile uint32 total_copy_files_increment; static uint32 total_files_num; static PGconn *start_stop_connect = NULL; static pthread_mutex_t check_stream_mut = PTHREAD_MUTEX_INITIALIZER; typedef struct { const char *from_root; const char *to_root; parray *files; parray *prev_files; const XLogRecPtr *lsn; } backup_files_args; /* * Backup routines */ static void backup_cleanup(bool fatal, void *userdata); static void backup_files(void *arg); static parray *do_backup_database(parray *backup_list, pgBackupOption bkupopt); static void confirm_block_size(const char *name, int blcksz); static void pg_start_backup(const char *label, bool smooth, pgBackup *backup); static void pg_stop_backup(pgBackup *backup); static bool pg_is_standby(void); static void get_lsn(PGconn *conn, PGresult *res, XLogRecPtr *lsn, bool stop_backup); static void get_xid(PGresult *res, uint32 *xid); static void pg_ptrack_clear(void); static bool pg_ptrack_support(void); static bool pg_is_in_recovery(void); static char *pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *result_size); static void add_files(parray *files, const char *root, bool add_root, bool is_pgdata); static void create_file_list(parray *files, const char *root, const char *subdir, const char *prefix, bool is_append); static void wait_for_archive(PGconn *conn, pgBackup *backup, const char *sql, bool stop_backup); static void make_pagemap_from_ptrack(parray *files); static void StreamLog(void *arg); #define disconnect_and_exit(code) \ { \ if (conn != NULL) PQfinish(conn); \ exit(code); \ } /* * Take a backup of database and return the list of files backed up. */ static parray * do_backup_database(parray *backup_list, pgBackupOption bkupopt) { int i; parray *prev_files = NULL; /* file list of previous database backup */ FILE *fp; char path[MAXPGPATH]; char dst_backup_path[MAXPGPATH]; char label[1024]; XLogRecPtr *lsn = NULL; char prev_file_txt[MAXPGPATH]; /* path of the previous backup * list file */ bool has_backup_label = true; /* flag if backup_label is there */ pthread_t backup_threads[num_threads]; pthread_t stream_thread; backup_files_args *backup_threads_args[num_threads]; bool is_ptrack_support; /* repack the options */ bool smooth_checkpoint = bkupopt.smooth_checkpoint; pgBackup *prev_backup = NULL; /* Block backup operations on a standby */ from_replica = pg_is_in_recovery(); if (pg_is_standby() && !from_replica) elog(ERROR, "Backup cannot run on a standby."); elog(LOG, "database backup start"); /* Initialize size summary */ current.data_bytes = 0; /* do some checks on the node */ sanityChecks(); /* * Obtain current timeline by scanning control file, theh LSN * obtained at output of pg_start_backup or pg_stop_backup does * not contain this information. */ current.tli = get_current_timeline(false); is_ptrack_support = pg_ptrack_support(); if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK && !is_ptrack_support) elog(ERROR, "Current Postgres instance is not support ptrack"); /* * In differential backup mode, check if there is an already-validated * full backup on current timeline. */ if (current.backup_mode == BACKUP_MODE_DIFF_PAGE || current.backup_mode == BACKUP_MODE_DIFF_PTRACK) { pgBackup *prev_backup; prev_backup = catalog_get_last_data_backup(backup_list, current.tli); if (prev_backup == NULL) elog(ERROR, "Valid full backup not found for " "differential backup. Either create a full backup " "or validate existing one."); } /* clear ptrack files for FULL and DIFF backup */ if (current.backup_mode != BACKUP_MODE_DIFF_PTRACK && is_ptrack_support) pg_ptrack_clear(); /* notify start of backup to PostgreSQL server */ time2iso(label, lengthof(label), current.start_time); strncat(label, " with pg_arman", lengthof(label)); pg_start_backup(label, smooth_checkpoint, ¤t); /* start stream replication */ if (stream_wal) { pgBackupGetPath(¤t, path, lengthof(path), DATABASE_DIR); join_path_components(dst_backup_path, path, "pg_xlog"); dir_create_dir(dst_backup_path, DIR_PERMISSION); pthread_mutex_lock(&check_stream_mut); pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog, dst_backup_path); pthread_mutex_lock(&check_stream_mut); if (conn == NULL) elog(ERROR, "I can't continue work because stream connect has failed."); pthread_mutex_unlock(&check_stream_mut); } if(!from_replica) { /* If backup_label does not exist in $PGDATA, stop taking backup */ snprintf(path, lengthof(path), "%s/backup_label", pgdata); make_native_path(path); if (!fileExists(path)) has_backup_label = false; /* Leave if no backup file */ if (!has_backup_label) { elog(LOG, "backup_label does not exist, stopping backup"); pg_stop_backup(NULL); elog(ERROR, "backup_label does not exist in PGDATA."); } } /* * List directories and symbolic links with the physical path to make * mkdirs.sh, then sort them in order of path. Omit $PGDATA. */ backup_files_list = parray_new(); dir_list_file(backup_files_list, pgdata, NULL, false, false); if (!check) { pgBackupGetPath(¤t, path, lengthof(path), MKDIRS_SH_FILE); fp = fopen(path, "wt"); if (fp == NULL) elog(ERROR, "can't open make directory script \"%s\": %s", path, strerror(errno)); dir_print_mkdirs_sh(fp, backup_files_list, pgdata); fclose(fp); if (chmod(path, DIR_PERMISSION) == -1) elog(ERROR, "can't change mode of \"%s\": %s", path, strerror(errno)); } /* clear directory list */ parray_walk(backup_files_list, pgFileFree); parray_free(backup_files_list); backup_files_list = NULL; /* * To take differential backup, the file list of the last completed database * backup is needed. */ if (current.backup_mode == BACKUP_MODE_DIFF_PAGE || current.backup_mode == BACKUP_MODE_DIFF_PTRACK) { /* find last completed database backup */ prev_backup = catalog_get_last_data_backup(backup_list, current.tli); pgBackupGetPath(prev_backup, prev_file_txt, lengthof(prev_file_txt), DATABASE_FILE_LIST); prev_files = dir_read_file_list(pgdata, prev_file_txt); /* * Do backup only pages having larger LSN than previous backup. */ lsn = &prev_backup->start_lsn; elog(LOG, "backup only the page that there was of the update from LSN(%X/%08X)", (uint32) (*lsn >> 32), (uint32) *lsn); } /* initialize backup list */ backup_files_list = parray_new(); /* list files with the logical path. omit $PGDATA */ add_files(backup_files_list, pgdata, false, true); /* backup files */ pgBackupGetPath(¤t, path, lengthof(path), DATABASE_DIR); /* * Build page mapping in differential mode. When using this mode, the * list of blocks to be taken is known by scanning the WAL segments * present in archives up to the point where start backup has begun. * However, normally this segment is not yet available in the archives, * leading to failures when building the page map. Hence before doing * anything and in order to ensure that all the segments needed for the * scan are here, for a switch of the last segment with pg_switch_xlog. */ if (current.backup_mode == BACKUP_MODE_DIFF_PAGE) { /* Enforce archiving of last segment and wait for it to be here */ wait_for_archive(connection, ¤t, "SELECT * FROM pg_switch_xlog()", false); /* Now build the page map */ parray_qsort(backup_files_list, pgFileComparePathDesc); elog(LOG, "extractPageMap"); elog(LOG, "current_tli:%X", current.tli); elog(LOG, "prev_backup->start_lsn: %X/%X", (uint32) (prev_backup->start_lsn >> 32), (uint32) (prev_backup->start_lsn)); elog(LOG, "current.start_lsn: %X/%X", (uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn)); extractPageMap(arclog_path, prev_backup->start_lsn, current.tli, current.start_lsn); } if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK) { XLogRecPtr ptrack_lsn = get_last_ptrack_lsn(); if (ptrack_lsn > prev_backup->stop_lsn) elog(ERROR, "Wrong ptrack lsn:%lx prev:%lx current:%lx", ptrack_lsn, prev_backup->start_lsn, current.start_lsn); parray_qsort(backup_files_list, pgFileComparePathDesc); make_pagemap_from_ptrack(backup_files_list); } /* sort pathname ascending */ parray_qsort(backup_files_list, pgFileComparePath); /* make dirs before backup */ for (i = 0; i < parray_num(backup_files_list); i++) { int ret; struct stat buf; pgFile *file = (pgFile *) parray_get(backup_files_list, i); ret = stat(file->path, &buf); if (ret == -1) { if (errno == ENOENT) { /* record as skipped file in file_xxx.txt */ file->write_size = BYTES_INVALID; elog(LOG, "skip"); continue; } else { elog(ERROR, "can't stat backup mode. \"%s\": %s", file->path, strerror(errno)); } } /* if the entry was a directory, create it in the backup */ if (S_ISDIR(buf.st_mode)) { char dirpath[MAXPGPATH]; if (verbose) elog(LOG, "Make dir %s", file->path + strlen(pgdata) + 1); join_path_components(dirpath, path, JoinPathEnd(file->path, pgdata)); if (!check) dir_create_dir(dirpath, DIR_PERMISSION); } else { total_files_num++; } __sync_lock_release(&file->lock); } if (num_threads < 1) num_threads = 1; /* sort by size for load balancing */ parray_qsort(backup_files_list, pgFileCompareSize); /* init thread args with own file lists */ for (i = 0; i < num_threads; i++) { backup_files_args *arg = pg_malloc(sizeof(backup_files_args)); arg->from_root = pgdata; arg->to_root = path; arg->files = backup_files_list; arg->prev_files = prev_files; arg->lsn = lsn; backup_threads_args[i] = arg; } total_copy_files_increment = 0; /* Run threads */ for (i = 0; i < num_threads; i++) { if (verbose) elog(WARNING, "Start thread num:%li", parray_num(backup_threads_args[i]->files)); pthread_create(&backup_threads[i], NULL, (void *(*)(void *)) backup_files, backup_threads_args[i]); } /* Wait theads */ for (i = 0; i < num_threads; i++) { pthread_join(backup_threads[i], NULL); pg_free(backup_threads_args[i]); } if (progress) fprintf(stderr, "\n"); /* Notify end of backup */ pg_stop_backup(¤t); if (stream_wal) { parray *list_file; char pg_xlog_path[MAXPGPATH]; /* We expect the completion of stream */ pthread_join(stream_thread, NULL); /* Scan backup pg_xlog dir */ list_file = parray_new(); join_path_components(pg_xlog_path, path, "pg_xlog"); dir_list_file(list_file, pg_xlog_path, NULL, true, false); /* Remove file path root prefix and calc meta */ for (i = 0; i < parray_num(list_file); i++) { pgFile *file = (pgFile *)parray_get(list_file, i); calc_file(file); if (strstr(file->path, path) == file->path) { char *ptr = file->path; file->path = pstrdup(JoinPathEnd(ptr, path)); free(ptr); } } parray_concat(backup_files_list, list_file); } /* Create file list */ create_file_list(backup_files_list, pgdata, DATABASE_FILE_LIST, NULL, false); /* Print summary of size of backup mode files */ for (i = 0; i < parray_num(backup_files_list); i++) { pgFile *file = (pgFile *) parray_get(backup_files_list, i); if (!S_ISREG(file->mode)) continue; /* * Count only the amount of data. For a full backup, the total * amount of data written counts while for an differential * backup only the data read counts. */ if (current.backup_mode == BACKUP_MODE_DIFF_PAGE || current.backup_mode == BACKUP_MODE_DIFF_PTRACK) current.data_bytes += file->write_size; else if (current.backup_mode == BACKUP_MODE_FULL) current.data_bytes += file->size; } elog(LOG, "database backup completed(Backup: " INT64_FORMAT ")", current.data_bytes); elog(LOG, "========================================"); return backup_files_list; } int do_backup(pgBackupOption bkupopt) { parray *backup_list; parray *files_database; int ret; /* repack the necessary options */ int keep_data_generations = bkupopt.keep_data_generations; int keep_data_days = bkupopt.keep_data_days; /* PGDATA and BACKUP_MODE are always required */ if (pgdata == NULL) elog(ERROR, "Required parameter not specified: PGDATA " "(-D, --pgdata)"); /* A backup mode is needed */ if (current.backup_mode == BACKUP_MODE_INVALID) elog(ERROR, "Required parameter not specified: BACKUP_MODE " "(-b, --backup-mode)"); /* Confirm data block size and xlog block size are compatible */ check_server_version(); /* setup cleanup callback function */ in_backup = true; /* show configuration actually used */ elog(LOG, "========================================"); elog(LOG, "backup start"); elog(LOG, "----------------------------------------"); if (verbose) pgBackupWriteConfigSection(stderr, ¤t); elog(LOG, "----------------------------------------"); /* get exclusive lock of backup catalog */ ret = catalog_lock(); if (ret == -1) elog(ERROR, "cannot lock backup catalog"); else if (ret == 1) elog(ERROR, "another pg_arman is running, skipping this backup"); /* initialize backup result */ current.status = BACKUP_STATUS_RUNNING; current.tli = 0; /* get from result of pg_start_backup() */ current.start_lsn = 0; current.stop_lsn = 0; current.start_time = time(NULL); current.end_time = (time_t) 0; current.data_bytes = BYTES_INVALID; current.block_size = BLCKSZ; current.wal_block_size = XLOG_BLCKSZ; current.recovery_xid = 0; current.recovery_time = (time_t) 0; current.checksum_version = get_data_checksum_version(true); current.stream = stream_wal; /* create backup directory and backup.ini */ if (!check) { if (pgBackupCreateDir(¤t)) elog(ERROR, "cannot create backup directory"); pgBackupWriteIni(¤t); } elog(LOG, "backup destination is initialized"); /* get list of backups already taken */ backup_list = catalog_get_backup_list(0); if (!backup_list) elog(ERROR, "cannot process any more"); /* set the error processing function for the backup process */ pgut_atexit_push(backup_cleanup, NULL); /* backup data */ files_database = do_backup_database(backup_list, bkupopt); pgut_atexit_pop(backup_cleanup, NULL); /* update backup status to DONE */ current.end_time = time(NULL); current.status = BACKUP_STATUS_DONE; if (!check) pgBackupWriteIni(¤t); /* Calculate the total data read */ if (verbose) { int64 total_read = 0; /* Database data */ if (current.backup_mode == BACKUP_MODE_FULL || current.backup_mode == BACKUP_MODE_DIFF_PAGE || current.backup_mode == BACKUP_MODE_DIFF_PTRACK) total_read += current.data_bytes; if (total_read == 0) elog(LOG, "nothing to backup"); else elog(LOG, "all backup completed(read: " INT64_FORMAT " write: " INT64_FORMAT ")", total_read, current.data_bytes); elog(LOG, "========================================"); } /* Delete old backup files after all backup operation. */ pgBackupDelete(keep_data_generations, keep_data_days); /* Cleanup backup mode file list */ if (files_database) parray_walk(files_database, pgFileFree); parray_free(files_database); /* release catalog lock */ catalog_unlock(); return 0; } /* * get server version and confirm block sizes. */ void check_server_version(void) { bool my_conn; /* Leave if server has already been checked */ if (server_version > 0) return; my_conn = (connection == NULL); if (my_conn) reconnect(); /* confirm server version */ server_version = PQserverVersion(connection); if (server_version < 90500) elog(ERROR, "server version is %d.%d.%d, must be %s or higher.", server_version / 10000, (server_version / 100) % 100, server_version % 100, "9.5"); if (from_replica && server_version < 90600) elog(ERROR, "server version is %d.%d.%d, must be %s or higher for backup from replica.", server_version / 10000, (server_version / 100) % 100, server_version % 100, "9.6"); /* confirm block_size (BLCKSZ) and wal_block_size (XLOG_BLCKSZ) */ confirm_block_size("block_size", BLCKSZ); confirm_block_size("wal_block_size", XLOG_BLCKSZ); if (my_conn) disconnect(); } static void confirm_block_size(const char *name, int blcksz) { PGresult *res; char *endp; int block_size; res = execute("SELECT current_setting($1)", 1, &name); if (PQntuples(res) != 1 || PQnfields(res) != 1) elog(ERROR, "cannot get %s: %s", name, PQerrorMessage(connection)); block_size = strtol(PQgetvalue(res, 0, 0), &endp, 10); PQclear(res); if ((endp && *endp) || block_size != blcksz) elog(ERROR, "%s(%d) is not compatible(%d expected)", name, block_size, blcksz); } /* * Notify start of backup to PostgreSQL server. */ static void pg_start_backup(const char *label, bool smooth, pgBackup *backup) { PGresult *res; const char *params[2]; params[0] = label; if (start_stop_connect == NULL) start_stop_connect = pgut_connect(ERROR); /* 2nd argument is 'fast'*/ params[1] = smooth ? "false" : "true"; if (from_replica) res = pgut_execute(start_stop_connect, "SELECT pg_start_backup($1, $2, false)", 2, params, ERROR); else res = pgut_execute(start_stop_connect, "SELECT pg_start_backup($1, $2)", 2, params, ERROR); if (backup != NULL) get_lsn(start_stop_connect, res, &backup->start_lsn, false); PQclear(res); } static bool pg_ptrack_support(void) { PGresult *res_db; reconnect(); res_db = execute("SELECT proname FROM pg_proc WHERE proname='pg_ptrack_clear'", 0, NULL); if (PQntuples(res_db) == 0) { PQclear(res_db); disconnect(); return false; } PQclear(res_db); disconnect(); return true; } static bool pg_is_in_recovery(void) { PGresult *res_db; reconnect(); res_db = execute("SELECT pg_is_in_recovery()", 0, NULL); if (PQgetvalue(res_db, 0, 0)[0] == 't') { PQclear(res_db); disconnect(); return true; } PQclear(res_db); disconnect(); return false; } static void pg_ptrack_clear(void) { PGresult *res_db, *res; const char *old_dbname = pgut_dbname; int i; reconnect(); res_db = execute("SELECT datname FROM pg_database", 0, NULL); disconnect(); for(i=0; i < PQntuples(res_db); i++) { pgut_dbname = PQgetvalue(res_db, i, 0); if (!strcmp(pgut_dbname, "template0")) continue; reconnect(); res = execute("SELECT pg_ptrack_clear()", 0, NULL); PQclear(res); } PQclear(res_db); disconnect(); pgut_dbname = old_dbname; } static char * pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *result_size) { PGresult *res_db, *res; const char *old_dbname = pgut_dbname; char *params[2]; char *result; reconnect(); params[0] = palloc(64); params[1] = palloc(64); sprintf(params[0], "%i", db_oid); sprintf(params[1], "%i", rel_oid); res_db = execute("SELECT datname FROM pg_database WHERE oid=$1", 1, (const char **)params); disconnect(); pgut_dbname = pstrdup(PQgetvalue(res_db, 0, 0)); PQclear(res_db); reconnect(); sprintf(params[0], "%i", tablespace_oid); res = execute("SELECT pg_ptrack_get_and_clear($1, $2)", 2, (const char **)params); result = (char *)PQunescapeBytea((unsigned char *)PQgetvalue(res, 0, 0), result_size); PQclear(res); pfree(params[0]); pfree(params[1]); pfree((char *)pgut_dbname); pgut_dbname = old_dbname; return result; } static void wait_for_archive(PGconn *conn, pgBackup *backup, const char *sql, bool stop_backup) { PGresult *res; char ready_path[MAXPGPATH]; char file_name[MAXFNAMELEN]; int try_count; XLogRecPtr lsn; TimeLineID tli; XLogSegNo targetSegNo; if (conn == NULL) conn = pgut_connect(ERROR); /* Remove annoying NOTICE messages generated by backend */ res = pgut_execute(conn, "SET client_min_messages = warning;", 0, NULL, ERROR); PQclear(res); /* And execute the query wanted */ res = pgut_execute(conn, sql, 0, NULL, ERROR); /* Get LSN from execution result */ get_lsn(conn, res, &lsn, stop_backup); /* * Enforce TLI obtention if backup is not present as this code * path can be taken as a callback at exit. */ tli = get_current_timeline(false); /* Fill in fields if backup exists */ if (backup != NULL) { backup->tli = tli; backup->stop_lsn = lsn; elog(LOG, "%s(): tli=%X lsn=%X/%08X", __FUNCTION__, backup->tli, (uint32) (backup->stop_lsn >> 32), (uint32) backup->stop_lsn); } /* As well as WAL file name */ XLByteToSeg(lsn, targetSegNo); XLogFileName(file_name, tli, targetSegNo); snprintf(ready_path, lengthof(ready_path), "%s/pg_xlog/archive_status/%s.ready", pgdata, file_name); elog(LOG, "%s() wait for %s", __FUNCTION__, ready_path); PQclear(res); if (from_replica) res = pgut_execute(conn, TXID_CURRENT_IF_SQL, 0, NULL, ERROR); else res = pgut_execute(conn, TXID_CURRENT_SQL, 0, NULL, ERROR); if (backup != NULL) { get_xid(res, &backup->recovery_xid); backup->recovery_time = time(NULL); } /* wait until switched WAL is archived */ try_count = 0; while (fileExists(ready_path)) { sleep(1); if (interrupted) elog(ERROR, "interrupted during waiting for WAL archiving"); try_count++; if (try_count > TIMEOUT_ARCHIVE) elog(ERROR, "switched WAL could not be archived in %d seconds", TIMEOUT_ARCHIVE); } elog(LOG, "%s() .ready deleted in %d try", __FUNCTION__, try_count); } /* * Notify end of backup to PostgreSQL server. */ static void pg_stop_backup(pgBackup *backup) { if (stream_wal) { PGresult *res; TimeLineID tli; //reconnect(); if (start_stop_connect == NULL) start_stop_connect = pgut_connect(ERROR); /* Remove annoying NOTICE messages generated by backend */ res = pgut_execute(start_stop_connect, "SET client_min_messages = warning;", 0, NULL, ERROR); PQclear(res); /* And execute the query wanted */ if (from_replica) res = pgut_execute(start_stop_connect,"SELECT * FROM pg_stop_backup(false)", 0, NULL, ERROR); else res = pgut_execute(start_stop_connect,"SELECT * FROM pg_stop_backup()", 0, NULL, ERROR); /* Get LSN from execution result */ get_lsn(start_stop_connect, res, &stop_backup_lsn, true); PQclear(res); /* * Enforce TLI obtention if backup is not present as this code * path can be taken as a callback at exit. */ tli = get_current_timeline(false); /* Fill in fields if backup exists */ if (backup != NULL) { backup->tli = tli; backup->stop_lsn = stop_backup_lsn; elog(LOG, "%s(): tli=%X lsn=%X/%08X", __FUNCTION__, backup->tli, (uint32) (backup->stop_lsn >> 32), (uint32) backup->stop_lsn); } if (from_replica) res = pgut_execute(start_stop_connect, TXID_CURRENT_IF_SQL, 0, NULL, ERROR); else res = pgut_execute(start_stop_connect, TXID_CURRENT_SQL, 0, NULL, ERROR); if (backup != NULL) { get_xid(res, &backup->recovery_xid); backup->recovery_time = time(NULL); } PQclear(res); //disconnect(); pgut_disconnect(start_stop_connect); } else { if (from_replica) wait_for_archive(start_stop_connect, backup, "SELECT * FROM pg_stop_backup(false)", true); else wait_for_archive(start_stop_connect, backup, "SELECT * FROM pg_stop_backup()", true); pgut_disconnect(start_stop_connect); } } /* * Check if node is a standby by looking at the presence of * recovery.conf. */ static bool pg_is_standby(void) { char path[MAXPGPATH]; snprintf(path, lengthof(path), "%s/recovery.conf", pgdata); make_native_path(path); return fileExists(path); } /* * Get LSN from result of pg_start_backup() or pg_stop_backup(). */ static void get_lsn(PGconn *conn, PGresult *res, XLogRecPtr *lsn, bool stop_backup) { uint32 xlogid; uint32 xrecoff; if (res == NULL || PQntuples(res) != 1 || (PQnfields(res) != 1 && PQnfields(res) != 3)) elog(ERROR, "result of backup command is invalid: %s", PQerrorMessage(conn)); /* * Extract timeline and LSN from results of pg_stop_backup() * and friends. */ XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff); /* Calculate LSN */ *lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff; if (stop_backup && from_replica && PQnfields(res) == 3) { char path[MAXPGPATH]; char path_backup_label[MAXPGPATH]; char path_tablespace_map[MAXPGPATH]; FILE *fp; pgFile *file; pgBackupGetPath(¤t, path, lengthof(path), DATABASE_DIR); snprintf(path_backup_label, lengthof(path_backup_label), "%s/backup_label", path); snprintf(path_tablespace_map, lengthof(path_tablespace_map), "%s/tablespace_map", path); fp = fopen(path_backup_label, "w"); if (fp == NULL) elog(ERROR, "can't open backup label file \"%s\": %s", path_backup_label, strerror(errno)); fwrite(PQgetvalue(res, 0, 1), 1, strlen(PQgetvalue(res, 0, 1)), fp); fclose(fp); file = pgFileNew(path_backup_label, true); calc_file(file); free(file->path); file->path = strdup("backup_label"); parray_append(backup_files_list, file); if (strlen(PQgetvalue(res, 0, 2)) == 0) return; fp = fopen(path_tablespace_map, "w"); if (fp == NULL) elog(ERROR, "can't open tablespace map file \"%s\": %s", path_tablespace_map, strerror(errno)); fwrite(PQgetvalue(res, 0, 2), 1, strlen(PQgetvalue(res, 0, 2)), fp); fclose(fp); file = pgFileNew(path_tablespace_map, true); calc_file(file); free(file->path); file->path = strdup("tablespace_map"); parray_append(backup_files_list, file); } } /* * Get XID from result of txid_current() after pg_stop_backup(). */ static void get_xid(PGresult *res, uint32 *xid) { if (res == NULL || PQntuples(res) != 1 || PQnfields(res) != 1) elog(ERROR, "result of txid_current() is invalid: %s", PQerrorMessage(connection)); if (sscanf(PQgetvalue(res, 0, 0), "%u", xid) != 1) { elog(ERROR, "result of txid_current() is invalid: %s", PQerrorMessage(connection)); } elog(LOG, "%s():%s", __FUNCTION__, PQgetvalue(res, 0, 0)); } /* * Return true if the path is a existing regular file. */ bool fileExists(const char *path) { struct stat buf; if (stat(path, &buf) == -1 && errno == ENOENT) return false; else if (!S_ISREG(buf.st_mode)) return false; else return true; } /* * Notify end of backup to server when "backup_label" is in the root directory * of the DB cluster. * Also update backup status to ERROR when the backup is not finished. */ static void backup_cleanup(bool fatal, void *userdata) { char path[MAXPGPATH]; if (!in_backup) return; /* If backup_label exist in $PGDATA, notify stop of backup to PostgreSQL */ snprintf(path, lengthof(path), "%s/backup_label", pgdata); make_native_path(path); if (fileExists(path)) { elog(LOG, "backup_label exists, stop backup"); pg_stop_backup(NULL); /* don't care stop_lsn on error case */ } /* * Update status of backup.ini to ERROR. * end_time != 0 means backup finished */ if (current.status == BACKUP_STATUS_RUNNING && current.end_time == 0) { elog(LOG, "backup is running, update its status to ERROR"); current.end_time = time(NULL); current.status = BACKUP_STATUS_ERROR; pgBackupWriteIni(¤t); } } /* * Take differential backup at page level. */ static void backup_files(void *arg) { int i; struct timeval tv; backup_files_args *arguments = (backup_files_args *) arg; gettimeofday(&tv, NULL); /* backup a file or create a directory */ for (i = 0; i < parray_num(arguments->files); i++) { int ret; struct stat buf; pgFile *file = (pgFile *) parray_get(arguments->files, i); if (__sync_lock_test_and_set(&file->lock, 1) != 0) continue; /* If current time is rewinded, abort this backup. */ if (tv.tv_sec < file->mtime) elog(ERROR, "current time may be rewound. Please retry with full backup mode."); /* check for interrupt */ if (interrupted) elog(ERROR, "interrupted during backup"); /* print progress in verbose mode */ if (verbose) elog(LOG, "(%d/%lu) %s", i + 1, (unsigned long) parray_num(arguments->files), file->path + strlen(arguments->from_root) + 1); /* stat file to get file type, size and modify timestamp */ ret = stat(file->path, &buf); if (ret == -1) { if (errno == ENOENT) { /* record as skipped file in file_xxx.txt */ file->write_size = BYTES_INVALID; elog(LOG, "skip"); continue; } else { elog(ERROR, "can't stat backup mode. \"%s\": %s", file->path, strerror(errno)); } } /* skip dir because make before */ if (S_ISDIR(buf.st_mode)) { continue; } else if (S_ISREG(buf.st_mode)) { /* skip files which have not been modified since last backup */ if (arguments->prev_files) { pgFile *prev_file = NULL; pgFile **p = (pgFile **) parray_bsearch(arguments->prev_files, file, pgFileComparePath); if (p) prev_file = *p; if (prev_file && prev_file->mtime == file->mtime) { /* record as skipped file in file_xxx.txt */ file->write_size = BYTES_INVALID; elog(LOG, "skip"); continue; } } /* * We will wait until the next second of mtime so that backup * file should contain all modifications at the clock of mtime. * timer resolution of ext3 file system is one second. */ if (tv.tv_sec == file->mtime) { /* update time and recheck */ gettimeofday(&tv, NULL); while (tv.tv_sec <= file->mtime) { usleep(1000000 - tv.tv_usec); gettimeofday(&tv, NULL); } } /* copy the file into backup */ if (!(file->is_datafile ? backup_data_file(arguments->from_root, arguments->to_root, file, arguments->lsn) : copy_file(arguments->from_root, arguments->to_root, file))) { /* record as skipped file in file_xxx.txt */ file->write_size = BYTES_INVALID; elog(LOG, "skip"); continue; } elog(LOG, "copied %lu", (unsigned long) file->write_size); } else elog(LOG, "unexpected file type %d", buf.st_mode); if (progress) fprintf(stderr, "\rProgress %i/%u", total_copy_files_increment, total_files_num-1); __sync_fetch_and_add(&total_copy_files_increment, 1); } } /* * Append files to the backup list array. */ static void add_files(parray *files, const char *root, bool add_root, bool is_pgdata) { parray *list_file; int i; list_file = parray_new(); /* list files with the logical path. omit $PGDATA */ dir_list_file(list_file, root, pgdata_exclude, true, add_root); /* mark files that are possible datafile as 'datafile' */ for (i = 0; i < parray_num(list_file); i++) { pgFile *file = (pgFile *) parray_get(list_file, i); char *relative; char *fname; int path_len; /* data file must be a regular file */ if (!S_ISREG(file->mode)) continue; /* data files are under "base", "global", or "pg_tblspc" */ relative = file->path + strlen(root) + 1; if (is_pgdata && !path_is_prefix_of_path("base", relative) && /*!path_is_prefix_of_path("global", relative) &&*/ !path_is_prefix_of_path("pg_tblspc", relative)) continue; path_len = strlen(file->path); if (path_len > 6 && strncmp(file->path+(path_len-6), "ptrack", 6) == 0) { pgFile *search_file; pgFile **pre_search_file; int segno = 0; while(true) { pgFile tmp_file; tmp_file.path = pg_strdup(file->path); /* I hope segno not more than 999999 */ if (segno > 0) sprintf(tmp_file.path+path_len-7, ".%d", segno); else tmp_file.path[path_len-7] = '\0'; pre_search_file = (pgFile **) parray_bsearch(list_file, &tmp_file, pgFileComparePath); if (pre_search_file != NULL) { search_file = *pre_search_file; search_file->ptrack_path = pg_strdup(file->path); search_file->segno = segno; } else { pg_free(tmp_file.path); break; } pg_free(tmp_file.path); segno++; } pgFileFree(file); parray_remove(list_file, i); i--; continue; } /* name of data file start with digit */ fname = last_dir_separator(relative); if (fname == NULL) fname = relative; else fname++; if (!isdigit(fname[0])) continue; file->is_datafile = true; { int find_dot; int check_digit; char *text_segno; for(find_dot = path_len-1; file->path[find_dot] != '.' && find_dot >= 0; find_dot--); if (find_dot <= 0) continue; text_segno = file->path + find_dot + 1; for(check_digit=0; text_segno[check_digit] != '\0'; check_digit++) if (!isdigit(text_segno[check_digit])) { check_digit = -1; break; } if (check_digit == -1) continue; file->segno = (int) strtol(text_segno, NULL, 10); } } parray_concat(files, list_file); } /* * Output the list of files to backup catalog */ static void create_file_list(parray *files, const char *root, const char *subdir, const char *prefix, bool is_append) { FILE *fp; char path[MAXPGPATH]; if (!check) { /* output path is '$BACKUP_PATH/file_database.txt' */ pgBackupGetPath(¤t, path, lengthof(path), subdir); fp = fopen(path, is_append ? "at" : "wt"); if (fp == NULL) elog(ERROR, "can't open file list \"%s\": %s", path, strerror(errno)); dir_print_file_list(fp, files, root, prefix); fclose(fp); } } /* * A helper function to create the path of a relation file and segment. * * The returned path is palloc'd */ static char * datasegpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno) { char *path; char *segpath; path = relpathperm(rnode, forknum); if (segno > 0) { segpath = psprintf("%s.%u", path, segno); pfree(path); return segpath; } else return path; } /* * This routine gets called while reading WAL segments from the WAL archive, * for every block that have changed in the target system. It makes note of * all the changed blocks in the pagemap of the file and adds them in the * things to track for the backup. */ void process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno) { char *path; char *rel_path; BlockNumber blkno_inseg; int segno; pgFile *file_item = NULL; int j; segno = blkno / RELSEG_SIZE; blkno_inseg = blkno % RELSEG_SIZE; rel_path = datasegpath(rnode, forknum, segno); path = pg_malloc(strlen(rel_path) + strlen(pgdata) + 2); sprintf(path, "%s/%s", pgdata, rel_path); for (j = 0; j < parray_num(backup_files_list); j++) { pgFile *p = (pgFile *) parray_get(backup_files_list, j); if (strcmp(p->path, path) == 0) { file_item = p; break; } } /* * If we don't have any record of this file in the file map, it means * that it's a relation that did not have much activity since the last * backup. We can safely ignore it. If it is a new relation file, the * backup would simply copy it as-is. */ if (file_item) datapagemap_add(&file_item->pagemap, blkno_inseg); pg_free(path); pg_free(rel_path); } void make_pagemap_from_ptrack(parray *files) { int i; for (i = 0; i < parray_num(files); i++) { pgFile *p = (pgFile *) parray_get(files, i); if (p->ptrack_path != NULL) { char *flat_memory; char *tmp_path = p->ptrack_path; char *tablespace; size_t path_length = strlen(p->ptrack_path); size_t flat_size = 0; size_t start_addr; Oid db_oid, rel_oid, tablespace_oid = 0; int sep_iter, sep_count = 0; /* Find target path*/ for(sep_iter = (int)path_length; sep_iter >= 0; sep_iter--) { if (IS_DIR_SEP(tmp_path[sep_iter])) { sep_count++; } if (sep_count == 2) { tmp_path += sep_iter + 1; break; } } /* For unix only now */ sscanf(tmp_path, "%u/%u_ptrack", &db_oid, &rel_oid); tablespace = strstr(p->ptrack_path, "pg_tblspc"); if (tablespace != NULL) sscanf(tablespace+10, "%i/", &tablespace_oid); flat_memory = pg_ptrack_get_and_clear(tablespace_oid, db_oid, rel_oid, &flat_size); start_addr = (RELSEG_SIZE/8)*p->segno; p->pagemap.bitmapsize = start_addr+RELSEG_SIZE/8 > flat_size ? flat_size - start_addr : RELSEG_SIZE/8; p->pagemap.bitmap = pg_malloc(p->pagemap.bitmapsize); memcpy(p->pagemap.bitmap, flat_memory+start_addr, p->pagemap.bitmapsize); pg_free(flat_memory); } } } static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) { static uint32 prevtimeline = 0; static XLogRecPtr prevpos = InvalidXLogRecPtr; /* we assume that we get called once at the end of each segment */ if (verbose && segment_finished) fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"), progname, (uint32) (xlogpos >> 32), (uint32) xlogpos, timeline); /* * Note that we report the previous, not current, position here. After a * timeline switch, xlogpos points to the beginning of the segment because * that's where we always begin streaming. Reporting the end of previous * timeline isn't totally accurate, because the next timeline can begin * slightly before the end of the WAL that we received on the previous * timeline, but it's close enough for reporting purposes. */ if (prevtimeline != 0 && prevtimeline != timeline) fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"), progname, timeline, (uint32) (prevpos >> 32), (uint32) prevpos); if (stop_backup_lsn != InvalidXLogRecPtr && xlogpos > stop_backup_lsn) return true; prevtimeline = timeline; prevpos = xlogpos; return false; } /* * Start the log streaming */ static void StreamLog(void *arg) { XLogRecPtr startpos; TimeLineID starttli; char *basedir = (char *)arg; /* * Connect in replication mode to the server */ if (conn == NULL) conn = GetConnection(); if (!conn) { pthread_mutex_unlock(&check_stream_mut); /* Error message already written in GetConnection() */ return; } if (!CheckServerVersionForStreaming(conn)) { /* * Error message already written in CheckServerVersionForStreaming(). * There's no hope of recovering from a version mismatch, so don't * retry. */ disconnect_and_exit(1); } /* * Identify server, obtaining start LSN position and current timeline ID * at the same time, necessary if not valid data can be found in the * existing output directory. */ if (!RunIdentifySystem(conn, NULL, &starttli, &startpos, NULL)) disconnect_and_exit(1); /* Ok we have normal stream connect and main process can work again */ pthread_mutex_unlock(&check_stream_mut); /* * We must use startpos as start_lsn from start_backup */ startpos = current.start_lsn; /* * Always start streaming at the beginning of a segment */ startpos -= startpos % XLOG_SEG_SIZE; /* * Start the replication */ if (verbose) fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"), progname, (uint32) (startpos >> 32), (uint32) startpos, starttli); #if PG_VERSION_NUM >= 90600 StreamCtl ctl; ctl.startpos = startpos; ctl.timeline = starttli; ctl.sysidentifier = NULL; ctl.basedir = basedir; ctl.stream_stop = stop_streaming; ctl.standby_message_timeout = standby_message_timeout; ctl.partial_suffix = NULL; ctl.synchronous = false; ctl.mark_done = false; if(ReceiveXlogStream(conn, &ctl) == false) elog(ERROR, "Problem in recivexlog"); #else if(ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, stop_streaming, standby_message_timeout, NULL, false, false) == false) elog(ERROR, "Problem in recivexlog"); #endif PQfinish(conn); conn = NULL; }