1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-02-13 14:58:35 +02:00

backup via repprotocol v1. WIP

This commit is contained in:
Anastasia 2017-09-28 17:40:24 +03:00
parent e47b02018b
commit 64939fd328
5 changed files with 472 additions and 13 deletions

View File

@ -25,6 +25,7 @@
#include "datapagemap.h"
#include "receivelog.h"
#include "streamutil.h"
#include "pgtar.h"
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr;
@ -53,6 +54,7 @@ static int is_ptrack_enable = false;
/* Backup connections */
static PGconn *backup_conn = NULL;
static PGconn *master_conn = NULL;
static PGconn *backup_conn_replication = NULL;
/* PostgreSQL server version from "backup_conn" */
static int server_version = 0;
@ -77,6 +79,8 @@ static void backup_cleanup(bool fatal, void *userdata);
static void backup_disconnect(bool fatal, void *userdata);
static void backup_files(void *arg);
static void remote_backup_files(void *arg);
static void do_backup_instance(void);
static void pg_start_backup(const char *label, bool smooth, pgBackup *backup);
@ -91,6 +95,10 @@ static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup);
static void make_pagemap_from_ptrack(parray *files);
static void StreamLog(void *arg);
static void get_remote_pgdata_filelist(parray *files);
static void ReceiveFileList(parray* files, PGconn *conn, PGresult *res, int rownum);
static void remote_copy_file(PGconn *conn, pgFile* file);
/* Ptrack functions */
static void pg_ptrack_clear(void);
static bool pg_ptrack_support(void);
@ -114,6 +122,307 @@ static void confirm_block_size(const char *name, int blcksz);
exit(code); \
}
/*
 * Fill "files" with data about all the files to backup.
 *
 * Opens a replication connection (kept in backup_conn_replication for the
 * duration of the call), sends the "FILE_BACKUP FILELIST" command and passes
 * every row of the first result to ReceiveFileList(), which appends the
 * parsed entries to "files". The connection is closed before returning.
 *
 * Any protocol failure is reported through elog(ERROR).
 */
static void
get_remote_pgdata_filelist(parray *files)
{
	static const char *query = "FILE_BACKUP FILELIST";
	PGresult   *res;
	int			resultStatus;
	int			ntuples;
	int			i;

	backup_conn_replication = pgut_connect_replication(pgut_dbname);

	if (PQsendQuery(backup_conn_replication, query) == 0)
		elog(ERROR,"%s: could not send replication command \"%s\": %s",
			 PROGRAM_NAME, query, PQerrorMessage(backup_conn_replication));

	/* The first result describes the streams that follow, one row each */
	res = PQgetResult(backup_conn_replication);
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		resultStatus = PQresultStatus(res);
		PQclear(res);
		elog(ERROR, "cannot start getting FILE_BACKUP filelist: %s, result_status %d",
			 PQerrorMessage(backup_conn_replication), resultStatus);
	}

	ntuples = PQntuples(res);
	if (ntuples < 1)
		elog(ERROR, "%s: no data returned from server", PROGRAM_NAME);

	for (i = 0; i < ntuples; i++)
		ReceiveFileList(files, backup_conn_replication, res, i);

	/* Release the header result before it is replaced by the final one */
	PQclear(res);

	res = PQgetResult(backup_conn_replication);
	if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		elog(ERROR, "%s: final receive failed: %s",
			 PROGRAM_NAME, PQerrorMessage(backup_conn_replication));
	}
	PQclear(res);

	PQfinish(backup_conn_replication);
	/* Do not leave a dangling pointer in the file-level variable */
	backup_conn_replication = NULL;
}
/*
 * Workhorse for get_remote_pgdata_filelist().
 *
 * Fetches the next result from "conn" (it must be a COPY OUT stream) and
 * reads it message by message. Every message is expected to be exactly one
 * 512-byte tar header; each header is parsed into a pgFile structure
 * (name, size, permission bits and file type) that is appended to "files".
 *
 * files  - array receiving the parsed pgFile entries
 * conn   - replication connection the file list is read from
 * res    - result holding the stream description rows (only inspected for
 *          logging; it stays owned by the caller)
 * rownum - row of "res" that corresponds to this stream
 */
static void
ReceiveFileList(parray* files, PGconn *conn, PGresult *res, int rownum)
{
	char		filename[MAXPGPATH];
	pgoff_t		current_len_left = 0;
	bool		basetablespace;
	char	   *copybuf = NULL;
	pgFile	   *pgfile;
	PGresult   *copy_res;

	/* What for do we need this basetablespace field?? */
	basetablespace = PQgetisnull(res, rownum, 0);
	if (basetablespace)
		elog(LOG,"basetablespace");
	else
		elog(LOG, "basetablespace %s", PQgetvalue(res, rownum, 1));

	/*
	 * Use a separate variable for the COPY result so that the caller's "res"
	 * is left alone and this one can be released as soon as it is checked.
	 */
	copy_res = PQgetResult(conn);
	if (PQresultStatus(copy_res) != PGRES_COPY_OUT)
		elog(ERROR, "Could not get COPY data stream: %s", PQerrorMessage(conn));
	PQclear(copy_res);

	while (1)
	{
		int			r;
		int			filemode;
		size_t		filename_len;

		if (copybuf != NULL)
		{
			PQfreemem(copybuf);
			copybuf = NULL;
		}

		r = PQgetCopyData(conn, &copybuf, 0);

		if (r == -2)
			elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(conn));

		/* end of copy */
		if (r == -1)
			break;

		/* This must be the header for a new file */
		if (r != 512)
			elog(ERROR, "Invalid tar block header size: %d\n", r);

		/* Size field of the tar header */
		current_len_left = read_tar_number(&copybuf[124], 12);

		/* Permission bits of the file */
		filemode = read_tar_number(&copybuf[100], 8);

		/* First part of header is zero terminated filename */
		snprintf(filename, sizeof(filename), "%s", copybuf);

		/* An empty name would make the trailing-slash test read out of bounds */
		filename_len = strlen(filename);
		if (filename_len == 0)
			elog(ERROR, "Invalid tar block header: empty file name");

		pgfile = pgFileInit(filename);
		pgfile->size = current_len_left;
		pgfile->mode |= filemode;

		if (filename[filename_len - 1] == '/')
		{
			/* Symbolic link or directory has size zero */
			Assert (pgfile->size == 0);

			/*
			 * Ends in a slash means directory or symlink to directory.
			 * NOTE(review): the __S_IF* names are glibc-internal spellings;
			 * the portable macros are S_IFDIR / S_IFLNK / S_IFREG.
			 */
			if (copybuf[156] == '5')
			{
				/* Directory */
				pgfile->mode |= __S_IFDIR;
			}
			else if (copybuf[156] == '2')
			{
				/* Symlink */
				pgfile->mode |= __S_IFLNK;
			}
			else
				elog(ERROR, "Unrecognized link indicator \"%c\"\n",
					 copybuf[156]);
		}
		else
		{
			/* regular file */
			pgfile->mode |= __S_IFREG;
		}

		parray_append(files, pgfile);
	}

	if (copybuf != NULL)
		PQfreemem(copybuf);
}
/*
 * Read one file via replication protocol and write it to the destination
 * subdir in 'backup_path'.
 *
 * conn - connection positioned inside a COPY OUT stream, right after the
 *        tar header of the file has been consumed by the caller
 * file - entry describing the file; file->path selects the destination
 *        below the backup's DATABASE_DIR and file->size tells how many
 *        bytes of the stream belong to the file
 *
 * At most file->size bytes are written; whatever follows in the stream is
 * treated as padding and discarded. While copying, file->crc and
 * file->read_size are updated; on success file->write_size is set to the
 * number of bytes written. Failures are reported through elog(ERROR).
 */
static void remote_copy_file(PGconn *conn, pgFile* file)
{
	PGresult   *res;
	char	   *copybuf = NULL;
	FILE	   *out;
	char		database_path[MAXPGPATH];
	char		to_path[MAXPGPATH];
	int			result_status;

	pgBackupGetPath(&current, database_path, lengthof(database_path),
					DATABASE_DIR);
	join_path_components(to_path, database_path, file->path);

	/* Binary mode: the stream carries raw file contents */
	out = fopen(to_path, "wb");
	if (out == NULL)
	{
		int errno_tmp = errno;
		elog(ERROR, "cannot open destination file \"%s\": %s",
			 to_path, strerror(errno_tmp));
	}

	INIT_CRC32C(file->crc);

	/* read from stream and write to backup file */
	while (1)
	{
		int			row_length;
		size_t		remaining;
		size_t		write_size;

		if (copybuf != NULL)
		{
			PQfreemem(copybuf);
			copybuf = NULL;
		}

		row_length = PQgetCopyData(conn, &copybuf, 0);

		if (row_length == -2)
			elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(conn));

		if (row_length == -1)
			break;

		/* The whole file has been received, the rest is padding: drain it */
		if (file->read_size >= file->size)
			continue;

		/*
		 * Write straight from the libpq buffer so that rows of any length
		 * are stored completely, but never more than the declared size.
		 */
		remaining = (size_t) (file->size - file->read_size);
		write_size = Min((size_t) row_length, remaining);

		COMP_CRC32C(file->crc, copybuf, write_size);

		if (fwrite(copybuf, 1, write_size, out) != write_size)
		{
			int errno_tmp = errno;

			/* oops */
			FIN_CRC32C(file->crc);
			fclose(out);
			PQfinish(conn);
			elog(ERROR, "cannot write to \"%s\": %s", to_path,
				 strerror(errno_tmp));
		}

		file->read_size += write_size;
	}

	/* The COPY stream must be followed by a successful command completion */
	res = PQgetResult(conn);
	result_status = PQresultStatus(res);
	PQclear(res);
	if (result_status != PGRES_COMMAND_OK)
	{
		elog(ERROR, "final receive failed: status %d ; %s", result_status, PQerrorMessage(conn));
	}

	file->write_size = file->read_size;
	FIN_CRC32C(file->crc);

	/* fclose() flushes buffered data, so a failure here is a write failure */
	if (fclose(out) != 0)
	{
		int errno_tmp = errno;
		elog(ERROR, "cannot write to \"%s\": %s", to_path,
			 strerror(errno_tmp));
	}
}
/*
* Take a remote backup of the PGDATA at a file level.
* Copy all directories and files listed in backup_files_list.
*/
static void
remote_backup_files(void *arg)
{
int i;
backup_files_args *arguments = (backup_files_args *) arg;
int n_backup_files_list = parray_num(arguments->backup_files_list);
PGconn *file_backup_conn = NULL;
for (i = 0; i < n_backup_files_list; i++)
{
char *query_str;
PGresult *res;
char *copybuf = NULL;
pgFile *file;
int row_length;
file = (pgFile *) parray_get(arguments->backup_files_list, i);
/* We have already copied all directories */
if (S_ISDIR(file->mode))
continue;
if (__sync_lock_test_and_set(&file->lock, 1) != 0)
continue;
file_backup_conn = pgut_connect_replication(pgut_dbname);
/* check for interrupt */
if (interrupted)
elog(ERROR, "interrupted during backup");
query_str = psprintf("FILE_BACKUP FILEPATH '%s'",file->path);
if (PQsendQuery(file_backup_conn, query_str) == 0)
elog(ERROR,"%s: could not send replication command \"%s\": %s",
PROGRAM_NAME, query_str, PQerrorMessage(file_backup_conn));
res = PQgetResult(file_backup_conn);
/* File is not found. That's normal. */
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
PQfinish(file_backup_conn);
continue;
}
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
PQclear(res);
PQfinish(file_backup_conn);
elog(ERROR, "Could not get COPY data stream: %s", PQerrorMessage(file_backup_conn));
}
/* read the header of the file */
row_length = PQgetCopyData(file_backup_conn, &copybuf, 0);
if (row_length == -2)
elog(ERROR, "Could not read COPY data: %s", PQerrorMessage(file_backup_conn));
/* end of copy TODO handle it */
if (row_length == -1)
elog(ERROR, "Unexpected end of COPY data");
if(row_length != 512)
elog(ERROR, "Invalid tar block header size: %d\n", row_length);
file->size = read_tar_number(&copybuf[124], 12);
/* receive the data from stream and write to backup file */
remote_copy_file(file_backup_conn, file);
elog(LOG, "File \"%s\". Copied %lu bytes",
file->path, (unsigned long) file->write_size);
PQfinish(file_backup_conn);
}
}
/*
* Take a backup of a single postgresql instance.
@ -122,7 +431,7 @@ static void confirm_block_size(const char *name, int blcksz);
static void
do_backup_instance(void)
{
size_t i;
int i;
char database_path[MAXPGPATH];
char dst_backup_path[MAXPGPATH];
char label[1024];
@ -140,9 +449,30 @@ do_backup_instance(void)
/* Initialize size summary */
current.data_bytes = 0;
/* Obtain current timeline from PGDATA control file */
current.tli = get_current_timeline(false);
/* Obtain current timeline */
if (is_remote_backup)
{
char *sysidentifier;
TimeLineID starttli;
XLogRecPtr startpos;
backup_conn_replication = pgut_connect_replication(pgut_dbname);
/* Check replication protocol connection */
if (!RunIdentifySystem(backup_conn_replication, &sysidentifier, &starttli, &startpos, NULL))
elog(ERROR, "Failed to send command for remote backup");
// TODO implement the check
// if (&sysidentifier != system_identifier)
// elog(ERROR, "Backup data directory was initialized for system id %ld, but target backup directory system id is %ld",
// system_identifier, sysidentifier);
current.tli = starttli;
PQfinish(backup_conn_replication);
}
else
current.tli = get_current_timeline(false);
/*
* In incremental backup mode ensure that already-validated
* backup on current timeline exists and get its filelist.
@ -222,7 +552,11 @@ do_backup_instance(void)
backup_files_list = parray_new();
/* list files with the logical path. omit $PGDATA */
dir_list_file(backup_files_list, pgdata, true, true, false);
if (is_remote_backup)
get_remote_pgdata_filelist(backup_files_list);
else
dir_list_file(backup_files_list, pgdata, true, true, false);
add_pgdata_files(backup_files_list, pgdata);
if (current.backup_mode != BACKUP_MODE_FULL)
@ -289,9 +623,17 @@ do_backup_instance(void)
if (S_ISDIR(file->mode))
{
char dirpath[MAXPGPATH];
char *dir_name = GetRelativePath(file->path, pgdata);
char *dir_name;
char database_path[MAXPGPATH];
if (!is_remote_backup)
dir_name = GetRelativePath(file->path, pgdata);
else
dir_name = file->path;
elog(LOG, "Create directory \"%s\"", dir_name);
pgBackupGetPath(&current, database_path, lengthof(database_path),
DATABASE_DIR);
join_path_components(dirpath, database_path, dir_name);
dir_create_dir(dirpath, DIR_PERMISSION);
@ -320,8 +662,16 @@ do_backup_instance(void)
/* Run threads */
for (i = 0; i < num_threads; i++)
{
elog(LOG, "Start thread num:%li", parray_num(backup_threads_args[i]->backup_files_list));
pthread_create(&backup_threads[i], NULL, (void *(*)(void *)) backup_files, backup_threads_args[i]);
elog(LOG, "Start thread num:%i", i);
if (!is_remote_backup)
pthread_create(&backup_threads[i], NULL,
(void *(*)(void *)) backup_files,
backup_threads_args[i]);
else
pthread_create(&backup_threads[i], NULL,
(void *(*)(void *)) remote_backup_files,
backup_threads_args[i]);
}
/* Wait threads */
@ -355,7 +705,8 @@ do_backup_instance(void)
for (i = 0; i < parray_num(xlog_files_list); i++)
{
pgFile *file = (pgFile *) parray_get(xlog_files_list, i);
calc_file_checksum(file);
if (S_ISREG(file->mode))
calc_file_checksum(file);
/* Remove file path root prefix*/
if (strstr(file->path, database_path) == file->path)
{
@ -418,7 +769,9 @@ do_backup(void)
/* Confirm that this server version is supported */
check_server_version();
current.checksum_version = get_data_checksum_version(true);
/* TODO fix it for remote backup*/
if (!is_remote_backup)
current.checksum_version = get_data_checksum_version(true);
current.stream = stream_wal;
/* ptrack backup checks */
@ -461,10 +814,13 @@ do_backup(void)
* instance we opened connection to. And that target backup database PGDATA
* belongs to the same instance.
*/
check_system_identifiers();
/* TODO fix it for remote backup */
if (!is_remote_backup)
check_system_identifiers();
elog(LOG, "Backup start. backup-mode = %s, stream = %s",
pgBackupGetBackupMode(&current), current.stream ? "true" : "false");
elog(LOG, "Backup start. backup-mode = %s, stream = %s, remote = %s",
pgBackupGetBackupMode(&current), current.stream ? "true" : "false",
is_remote_backup ? "true" : "false");
/* Start backup. Update backup status. */
current.status = BACKUP_STATUS_RUNNING;
@ -1506,7 +1862,7 @@ backup_files(void *arg)
backup_files_args *arguments = (backup_files_args *) arg;
int n_backup_files_list = parray_num(arguments->backup_files_list);
/* backup a file or create a directory */
/* backup a file */
for (i = 0; i < n_backup_files_list; i++)
{
int ret;

View File

@ -48,6 +48,7 @@ char *replication_slot = NULL;
bool backup_logs = false;
bool smooth_checkpoint;
bool from_replica = false;
bool is_remote_backup = false;
/* Wait timeout for WAL segment archiving */
uint32 archive_timeout = 300; /* default is 300 seconds */
const char *master_db = NULL;
@ -118,6 +119,7 @@ static pgut_option options[] =
{ 's', 15, "master-port", &master_port, SOURCE_CMDLINE, },
{ 's', 16, "master-user", &master_user, SOURCE_CMDLINE, },
{ 'u', 17, "replica-timeout", &replica_timeout, SOURCE_CMDLINE, },
{ 'b', 18, "remote", &is_remote_backup, SOURCE_CMDLINE, },
/* restore options */
{ 's', 20, "time", &target_time, SOURCE_CMDLINE },
{ 's', 21, "xid", &target_xid, SOURCE_CMDLINE },

View File

@ -285,6 +285,7 @@ extern char *replication_slot;
extern bool smooth_checkpoint;
extern uint32 archive_timeout;
extern bool from_replica;
extern bool is_remote_backup;
extern const char *master_db;
extern const char *master_host;
extern const char *master_port;

View File

@ -1054,6 +1054,102 @@ pgut_connect_extended(const char *pghost, const char *pgport,
}
}
/*
 * Open a replication connection.
 *
 * Convenience wrapper around pgut_connect_replication_extended() that
 * supplies the file-level host, port, username and password values.
 * "dbname" is forwarded unchanged.
 */
PGconn *
pgut_connect_replication(const char *dbname)
{
	PGconn	   *replication_conn;

	replication_conn = pgut_connect_replication_extended(host, port, dbname,
														 username, password);

	return replication_conn;
}
/*
 * Open a replication connection with explicit connection parameters.
 *
 * pghost, pgport, pguser - passed as "host", "port" and "user" when not NULL
 * dbname                 - used only in the error message; the connection
 *                          itself is always made with dbname=replication
 *                          and replication=true
 * pwd                    - password to start with; when NULL, the file-level
 *                          "password" value is used instead
 *
 * When the server asks for a password that was not supplied and
 * prompt_password is set, the user is prompted and the connection attempt
 * is repeated with the new password.
 *
 * Returns the established connection. A connection failure is reported
 * through elog(ERROR).
 */
PGconn *
pgut_connect_replication_extended(const char *pghost, const char *pgport,
								  const char *dbname, const char *pguser, const char *pwd)
{
	PGconn	   *tmpconn;
	int			argcount = 7;	/* dbname, replication, fallback_app_name,
								 * host, user, port, password */
	int			i;
	int			pwd_slot;
	const char **keywords;
	const char **values;
	const char *cur_pwd = pwd ? pwd : password;

	if (interrupted && !in_cleanup)
		elog(ERROR, "interrupted");

	i = 0;

	/* One extra zeroed element terminates the arrays */
	keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
	values = pg_malloc0((argcount + 1) * sizeof(*values));

	keywords[i] = "dbname";
	values[i] = "replication";
	i++;
	keywords[i] = "replication";
	values[i] = "true";
	i++;
	keywords[i] = "fallback_application_name";
	values[i] = PROGRAM_NAME;
	i++;

	if (pghost)
	{
		keywords[i] = "host";
		values[i] = pghost;
		i++;
	}
	if (pguser)
	{
		keywords[i] = "user";
		values[i] = pguser;
		i++;
	}
	if (pgport)
	{
		keywords[i] = "port";
		values[i] = pgport;
		i++;
	}

	/* The password always occupies the last slot; it is filled per attempt */
	pwd_slot = i;

	for (;;)
	{
		/*
		 * Use (or reuse, on a subsequent connection) password if we have it.
		 * This is done inside the loop so that a password entered at the
		 * prompt below actually takes part in the next attempt.
		 */
		if (cur_pwd)
		{
			keywords[pwd_slot] = "password";
			values[pwd_slot] = cur_pwd;
		}
		else
		{
			keywords[pwd_slot] = NULL;
			values[pwd_slot] = NULL;
		}

		tmpconn = PQconnectdbParams(keywords, values, true);

		if (PQstatus(tmpconn) == CONNECTION_OK)
		{
			free(values);
			free(keywords);
			return tmpconn;
		}

		if (tmpconn && PQconnectionNeedsPassword(tmpconn) && prompt_password)
		{
			PQfinish(tmpconn);
			prompt_for_password(username);
			/*
			 * NOTE(review): prompt_for_password() is assumed to store the
			 * entered value in the file-level "password" - confirm.
			 */
			cur_pwd = password;
			continue;
		}

		elog(ERROR, "could not connect to database %s: %s",
			 dbname, PQerrorMessage(tmpconn));
		PQfinish(tmpconn);
		free(values);
		free(keywords);
		return NULL;
	}
}
void
pgut_disconnect(PGconn *conn)
{

View File

@ -118,6 +118,10 @@ extern void pgut_atexit_pop(pgut_atexit_callback callback, void *userdata);
extern PGconn *pgut_connect(const char *dbname);
extern PGconn *pgut_connect_extended(const char *pghost, const char *pgport,
const char *dbname, const char *login);
extern PGconn *pgut_connect_replication(const char *dbname);
extern PGconn *pgut_connect_replication_extended(const char *pghost, const char *pgport,
const char *dbname, const char *login,
const char *pwd);
extern void pgut_disconnect(PGconn *conn);
extern PGresult *pgut_execute(PGconn* conn, const char *query, int nParams, const char **params);
extern bool pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int elevel);