From 64939fd328c674038994b904cf7715de5bf3ae2c Mon Sep 17 00:00:00 2001 From: Anastasia Date: Thu, 28 Sep 2017 17:40:24 +0300 Subject: [PATCH] backup via repprotocol v1. WIP --- src/backup.c | 382 +++++++++++++++++++++++++++++++++++++++++++-- src/pg_probackup.c | 2 + src/pg_probackup.h | 1 + src/utils/pgut.c | 96 ++++++++++++ src/utils/pgut.h | 4 + 5 files changed, 472 insertions(+), 13 deletions(-) diff --git a/src/backup.c b/src/backup.c index 62c2fe8d..84b5d587 100644 --- a/src/backup.c +++ b/src/backup.c @@ -25,6 +25,7 @@ #include "datapagemap.h" #include "receivelog.h" #include "streamutil.h" +#include "pgtar.h" static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr; @@ -53,6 +54,7 @@ static int is_ptrack_enable = false; /* Backup connections */ static PGconn *backup_conn = NULL; static PGconn *master_conn = NULL; +static PGconn *backup_conn_replication = NULL; /* PostgreSQL server version from "backup_conn" */ static int server_version = 0; @@ -77,6 +79,8 @@ static void backup_cleanup(bool fatal, void *userdata); static void backup_disconnect(bool fatal, void *userdata); static void backup_files(void *arg); +static void remote_backup_files(void *arg); + static void do_backup_instance(void); static void pg_start_backup(const char *label, bool smooth, pgBackup *backup); @@ -91,6 +95,10 @@ static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup); static void make_pagemap_from_ptrack(parray *files); static void StreamLog(void *arg); +static void get_remote_pgdata_filelist(parray *files); +static void ReceiveFileList(parray* files, PGconn *conn, PGresult *res, int rownum); +static void remote_copy_file(PGconn *conn, pgFile* file); + /* Ptrack functions */ static void pg_ptrack_clear(void); static bool pg_ptrack_support(void); @@ -114,6 +122,307 @@ static void confirm_block_size(const char *name, int blcksz); exit(code); \ } +/* Fill "files" with data about all the files to backup */ +static void +get_remote_pgdata_filelist(parray *files) +{ + PGresult *res; + int resultStatus; + int i; + + backup_conn_replication = pgut_connect_replication(pgut_dbname); + + if (PQsendQuery(backup_conn_replication, "FILE_BACKUP FILELIST") == 0) + elog(ERROR,"%s: could not send replication command \"%s\": %s", + PROGRAM_NAME, "FILE_BACKUP", PQerrorMessage(backup_conn_replication)); + + res = PQgetResult(backup_conn_replication); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + resultStatus = PQresultStatus(res); + PQclear(res); + elog(ERROR, "cannot start getting FILE_BACKUP filelist: %s, result_status %d", + PQerrorMessage(backup_conn_replication), resultStatus); + } + + if (PQntuples(res) < 1) + elog(ERROR, "%s: no data returned from server", PROGRAM_NAME); + + for (i = 0; i < PQntuples(res); i++) + { + ReceiveFileList(files, backup_conn_replication, res, i); + } + + res = PQgetResult(backup_conn_replication); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + elog(ERROR, "%s: final receive failed: %s", + PROGRAM_NAME, PQerrorMessage(backup_conn_replication)); + } + + PQfinish(backup_conn_replication); +} + +/* + * workhorse for get_remote_pgdata_filelist(). + * Parse received message into pgFile structure. + */ +static void +ReceiveFileList(parray* files, PGconn *conn, PGresult *res, int rownum) +{ + char filename[MAXPGPATH]; + pgoff_t current_len_left = 0; + bool basetablespace; + char *copybuf = NULL; + pgFile *pgfile; + + /* What for do we need this basetablespace field?? */ + basetablespace = PQgetisnull(res, rownum, 0); + if (basetablespace) + elog(LOG,"basetablespace"); + else + elog(LOG, "basetablespace %s", PQgetvalue(res, rownum, 1)); + + res = PQgetResult(conn); + + if (PQresultStatus(res) != PGRES_COPY_OUT) + elog(ERROR, "Could not get COPY data stream: %s", PQerrorMessage(conn)); + + while (1) + { + int r; + int filemode; + + if (copybuf != NULL) + { + PQfreemem(copybuf); + copybuf = NULL; + } + + r = PQgetCopyData(conn, ©buf, 0); + + if (r == -2) + elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(conn)); + + /* end of copy */ + if (r == -1) + break; + + /* This must be the header for a new file */ + if (r != 512) + elog(ERROR, "Invalid tar block header size: %d\n", r); + + current_len_left = read_tar_number(©buf[124], 12); + + /* Set permissions on the file */ + filemode = read_tar_number(©buf[100], 8); + + /* First part of header is zero terminated filename */ + snprintf(filename, sizeof(filename), "%s", copybuf); + + pgfile = pgFileInit(filename); + pgfile->size = current_len_left; + pgfile->mode |= filemode; + + if (filename[strlen(filename) - 1] == '/') + { + /* Symbolic link or directory has size zero */ + Assert (pgfile->size == 0); + /* Ends in a slash means directory or symlink to directory */ + if (copybuf[156] == '5') + { + /* Directory */ + pgfile->mode |= __S_IFDIR; + } + else if (copybuf[156] == '2') + { + /* Symlink */ + pgfile->mode |= __S_IFLNK; + } + else + elog(ERROR, "Unrecognized link indicator \"%c\"\n", + copybuf[156]); + } + else + { + /* regular file */ + pgfile->mode |= __S_IFREG; + } + + parray_append(files, pgfile); + } + + if (copybuf != NULL) + PQfreemem(copybuf); +} + +/* read one file via replication protocol + * and write it to the destination subdir in 'backup_path' */ +static void remote_copy_file(PGconn *conn, pgFile* file) +{ + PGresult *res; + char *copybuf = NULL; + char buf[32768]; + FILE *out; + char database_path[MAXPGPATH]; + char to_path[MAXPGPATH]; + bool skip_padding = false; + + pgBackupGetPath(¤t, database_path, lengthof(database_path), + DATABASE_DIR); + join_path_components(to_path, database_path, file->path); + + out = fopen(to_path, "w"); + if (out == NULL) + { + int errno_tmp = errno; + elog(ERROR, "cannot open destination file \"%s\": %s", + to_path, strerror(errno_tmp)); + } + + INIT_CRC32C(file->crc); + + /* read from stream and write to backup file */ + while (1) + { + int row_length; + int errno_tmp; + int write_buffer_size = 0; + if (copybuf != NULL) + { + PQfreemem(copybuf); + copybuf = NULL; + } + + row_length = PQgetCopyData(conn, ©buf, 0); + + if (row_length == -2) + elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(conn)); + + if (row_length == -1) + break; + + if (!skip_padding) + { + write_buffer_size = Min(row_length, sizeof(buf)); + memcpy(buf, copybuf, write_buffer_size); + COMP_CRC32C(file->crc, &buf, write_buffer_size); + + /* TODO calc checksum*/ + if (fwrite(buf, 1, write_buffer_size, out) != write_buffer_size) + { + errno_tmp = errno; + /* oops */ + FIN_CRC32C(file->crc); + fclose(out); + PQfinish(conn); + elog(ERROR, "cannot write to \"%s\": %s", to_path, + strerror(errno_tmp)); + } + + file->read_size += write_buffer_size; + } + if (file->read_size >= file->size) + { + skip_padding = true; + } + } + + res = PQgetResult(conn); + + /* File is not found. That's normal. */ + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + elog(ERROR, "final receive failed: status %d ; %s",PQresultStatus(res), PQerrorMessage(conn)); + } + + file->write_size = file->read_size; + FIN_CRC32C(file->crc); + + fclose(out); +} + +/* + * Take a remote backup of the PGDATA at a file level. + * Copy all directories and files listed in backup_files_list. + */ +static void +remote_backup_files(void *arg) +{ + int i; + backup_files_args *arguments = (backup_files_args *) arg; + int n_backup_files_list = parray_num(arguments->backup_files_list); + PGconn *file_backup_conn = NULL; + + for (i = 0; i < n_backup_files_list; i++) + { + char *query_str; + PGresult *res; + char *copybuf = NULL; + pgFile *file; + int row_length; + + file = (pgFile *) parray_get(arguments->backup_files_list, i); + + /* We have already copied all directories */ + if (S_ISDIR(file->mode)) + continue; + + if (__sync_lock_test_and_set(&file->lock, 1) != 0) + continue; + + file_backup_conn = pgut_connect_replication(pgut_dbname); + + /* check for interrupt */ + if (interrupted) + elog(ERROR, "interrupted during backup"); + + query_str = psprintf("FILE_BACKUP FILEPATH '%s'",file->path); + + if (PQsendQuery(file_backup_conn, query_str) == 0) + elog(ERROR,"%s: could not send replication command \"%s\": %s", + PROGRAM_NAME, query_str, PQerrorMessage(file_backup_conn)); + + res = PQgetResult(file_backup_conn); + + /* File is not found. That's normal. */ + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + PQfinish(file_backup_conn); + continue; + } + + if (PQresultStatus(res) != PGRES_COPY_OUT) + { + PQclear(res); + PQfinish(file_backup_conn); + elog(ERROR, "Could not get COPY data stream: %s", PQerrorMessage(file_backup_conn)); + } + + /* read the header of the file */ + row_length = PQgetCopyData(file_backup_conn, ©buf, 0); + + if (row_length == -2) + elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(file_backup_conn)); + + /* end of copy TODO handle it */ + if (row_length == -1) + elog(ERROR, "Unexpected end of COPY data"); + + if(row_length != 512) + elog(ERROR, "Invalid tar block header size: %d\n", row_length); + file->size = read_tar_number(©buf[124], 12); + + /* receive the data from stream and write to backup file */ + remote_copy_file(file_backup_conn, file); + + elog(LOG, "File \"%s\". Copied %lu bytes", + file->path, (unsigned long) file->write_size); + PQfinish(file_backup_conn); + } +} /* * Take a backup of a single postgresql instance. @@ -122,7 +431,7 @@ static void confirm_block_size(const char *name, int blcksz); static void do_backup_instance(void) { - size_t i; + int i; char database_path[MAXPGPATH]; char dst_backup_path[MAXPGPATH]; char label[1024]; @@ -140,9 +449,30 @@ do_backup_instance(void) /* Initialize size summary */ current.data_bytes = 0; - /* Obtain current timeline from PGDATA control file */ - current.tli = get_current_timeline(false); + /* Obtain current timeline */ + if (is_remote_backup) + { + char *sysidentifier; + TimeLineID starttli; + XLogRecPtr startpos; + backup_conn_replication = pgut_connect_replication(pgut_dbname); + + /* Check replication prorocol connection */ + if (!RunIdentifySystem(backup_conn_replication, &sysidentifier, &starttli, &startpos, NULL)) + elog(ERROR, "Failed to send command for remote backup"); + +// TODO implement the check +// if (&sysidentifier != system_identifier) +// elog(ERROR, "Backup data directory was initialized for system id %ld, but target backup directory system id is %ld", +// system_identifier, sysidentifier); + + current.tli = starttli; + + PQfinish(backup_conn_replication); + } + else + current.tli = get_current_timeline(false); /* * In incremental backup mode ensure that already-validated * backup on current timeline exists and get its filelist. @@ -222,7 +552,11 @@ do_backup_instance(void) backup_files_list = parray_new(); /* list files with the logical path. omit $PGDATA */ - dir_list_file(backup_files_list, pgdata, true, true, false); + if (is_remote_backup) + get_remote_pgdata_filelist(backup_files_list); + else + dir_list_file(backup_files_list, pgdata, true, true, false); + add_pgdata_files(backup_files_list, pgdata); if (current.backup_mode != BACKUP_MODE_FULL) @@ -289,9 +623,17 @@ do_backup_instance(void) if (S_ISDIR(file->mode)) { char dirpath[MAXPGPATH]; - char *dir_name = GetRelativePath(file->path, pgdata); + char *dir_name; + char database_path[MAXPGPATH]; + + if (!is_remote_backup) + dir_name = GetRelativePath(file->path, pgdata); + else + dir_name = file->path; elog(LOG, "Create directory \"%s\"", dir_name); + pgBackupGetPath(¤t, database_path, lengthof(database_path), + DATABASE_DIR); join_path_components(dirpath, database_path, dir_name); dir_create_dir(dirpath, DIR_PERMISSION); @@ -320,8 +662,16 @@ do_backup_instance(void) /* Run threads */ for (i = 0; i < num_threads; i++) { - elog(LOG, "Start thread num:%li", parray_num(backup_threads_args[i]->backup_files_list)); - pthread_create(&backup_threads[i], NULL, (void *(*)(void *)) backup_files, backup_threads_args[i]); + elog(LOG, "Start thread num:%i", i); + + if (!is_remote_backup) + pthread_create(&backup_threads[i], NULL, + (void *(*)(void *)) backup_files, + backup_threads_args[i]); + else + pthread_create(&backup_threads[i], NULL, + (void *(*)(void *)) remote_backup_files, + backup_threads_args[i]); } /* Wait threads */ @@ -355,7 +705,8 @@ do_backup_instance(void) for (i = 0; i < parray_num(xlog_files_list); i++) { pgFile *file = (pgFile *) parray_get(xlog_files_list, i); - calc_file_checksum(file); + if (S_ISREG(file->mode)) + calc_file_checksum(file); /* Remove file path root prefix*/ if (strstr(file->path, database_path) == file->path) { @@ -418,7 +769,9 @@ do_backup(void) /* Confirm that this server version is supported */ check_server_version(); - current.checksum_version = get_data_checksum_version(true); + /* TODO fix it for remote backup*/ + if (!is_remote_backup) + current.checksum_version = get_data_checksum_version(true); current.stream = stream_wal; /* ptrack backup checks */ @@ -461,10 +814,13 @@ do_backup(void) * instance we opened connection to. And that target backup database PGDATA * belogns to the same instance. */ - check_system_identifiers(); + /* TODO fix it for remote backup */ + if (!is_remote_backup) + check_system_identifiers(); - elog(LOG, "Backup start. backup-mode = %s, stream = %s", - pgBackupGetBackupMode(¤t), current.stream ? "true" : "false"); + elog(LOG, "Backup start. backup-mode = %s, stream = %s, remote = %s", + pgBackupGetBackupMode(¤t), current.stream ? "true" : "false", + is_remote_backup ? "true" : "false"); /* Start backup. Update backup status. */ current.status = BACKUP_STATUS_RUNNING; @@ -1506,7 +1862,7 @@ backup_files(void *arg) backup_files_args *arguments = (backup_files_args *) arg; int n_backup_files_list = parray_num(arguments->backup_files_list); - /* backup a file or create a directory */ + /* backup a file */ for (i = 0; i < n_backup_files_list; i++) { int ret; diff --git a/src/pg_probackup.c b/src/pg_probackup.c index e9472c49..34d072cf 100644 --- a/src/pg_probackup.c +++ b/src/pg_probackup.c @@ -48,6 +48,7 @@ char *replication_slot = NULL; bool backup_logs = false; bool smooth_checkpoint; bool from_replica = false; +bool is_remote_backup = false; /* Wait timeout for WAL segment archiving */ uint32 archive_timeout = 300; /* default is 300 seconds */ const char *master_db = NULL; @@ -118,6 +119,7 @@ static pgut_option options[] = { 's', 15, "master-port", &master_port, SOURCE_CMDLINE, }, { 's', 16, "master-user", &master_user, SOURCE_CMDLINE, }, { 'u', 17, "replica-timeout", &replica_timeout, SOURCE_CMDLINE, }, + { 'b', 18, "remote", &is_remote_backup, SOURCE_CMDLINE, }, /* restore options */ { 's', 20, "time", &target_time, SOURCE_CMDLINE }, { 's', 21, "xid", &target_xid, SOURCE_CMDLINE }, diff --git a/src/pg_probackup.h b/src/pg_probackup.h index 57dbb5e6..d365e75c 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -285,6 +285,7 @@ extern char *replication_slot; extern bool smooth_checkpoint; extern uint32 archive_timeout; extern bool from_replica; +extern bool is_remote_backup; extern const char *master_db; extern const char *master_host; extern const char *master_port; diff --git a/src/utils/pgut.c b/src/utils/pgut.c index d607605e..e805d922 100644 --- a/src/utils/pgut.c +++ b/src/utils/pgut.c @@ -1054,6 +1054,102 @@ pgut_connect_extended(const char *pghost, const char *pgport, } } +PGconn * +pgut_connect_replication(const char *dbname) +{ + return pgut_connect_replication_extended(host, port, dbname, username, password); +} + +PGconn * +pgut_connect_replication_extended(const char *pghost, const char *pgport, + const char *dbname, const char *pguser, const char *pwd) +{ + PGconn *tmpconn; + int argcount = 7; /* dbname, replication, fallback_app_name, + * host, user, port, password */ + int i; + const char **keywords; + const char **values; + + if (interrupted && !in_cleanup) + elog(ERROR, "interrupted"); + + i = 0; + + keywords = pg_malloc0((argcount + 1) * sizeof(*keywords)); + values = pg_malloc0((argcount + 1) * sizeof(*values)); + + + keywords[i] = "dbname"; + values[i] = "replication"; + i++; + keywords[i] = "replication"; + values[i] = "true"; + i++; + keywords[i] = "fallback_application_name"; + values[i] = PROGRAM_NAME; + i++; + + if (pghost) + { + keywords[i] = "host"; + values[i] = pghost; + i++; + } + if (pguser) + { + keywords[i] = "user"; + values[i] = pguser; + i++; + } + if (pgport) + { + keywords[i] = "port"; + values[i] = pgport; + i++; + } + + /* Use (or reuse, on a subsequent connection) password if we have it */ + if (password) + { + keywords[i] = "password"; + values[i] = password; + } + else + { + keywords[i] = NULL; + values[i] = NULL; + } + + for (;;) + { + tmpconn = PQconnectdbParams(keywords, values, true); + + + if (PQstatus(tmpconn) == CONNECTION_OK) + { + free(values); + free(keywords); + return tmpconn; + } + + if (tmpconn && PQconnectionNeedsPassword(tmpconn) && prompt_password) + { + PQfinish(tmpconn); + prompt_for_password(username); + continue; + } + + elog(ERROR, "could not connect to database %s: %s", + dbname, PQerrorMessage(tmpconn)); + PQfinish(tmpconn); + free(values); + free(keywords); + return NULL; + } +} + + void pgut_disconnect(PGconn *conn) { diff --git a/src/utils/pgut.h b/src/utils/pgut.h index 240a4596..18acfdae 100644 --- a/src/utils/pgut.h +++ b/src/utils/pgut.h @@ -118,6 +118,10 @@ extern void pgut_atexit_pop(pgut_atexit_callback callback, void *userdata); extern PGconn *pgut_connect(const char *dbname); extern PGconn *pgut_connect_extended(const char *pghost, const char *pgport, const char *dbname, const char *login); +extern PGconn *pgut_connect_replication(const char *dbname); +extern PGconn *pgut_connect_replication_extended(const char *pghost, const char *pgport, + const char *dbname, const char *login, + const char *pwd); extern void pgut_disconnect(PGconn *conn); extern PGresult *pgut_execute(PGconn* conn, const char *query, int nParams, const char **params); extern bool pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int elevel);