1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-12-12 11:45:24 +02:00

Rewrite backup method to use push inside segment

This commit is contained in:
Konstantin Knizhnik 2019-02-15 14:36:16 +03:00
parent 35f2b5b636
commit fb74027c42
4 changed files with 252 additions and 104 deletions

View File

@ -60,7 +60,7 @@ zlib_decompress(void *dst, size_t dst_size, void const *src, size_t src_size)
* Compresses source into dest using algorithm. Returns the number of bytes
* written in the destination buffer, or -1 if compression fails.
*/
static int32
int32
do_compress(void* dst, size_t dst_size, void const* src, size_t src_size,
CompressAlg alg, int level, const char **errormsg)
{
@ -166,22 +166,8 @@ page_may_be_compressed(Page page, CompressAlg alg, uint32 backup_version)
return false;
}
/*
* When copying datafiles to backup we validate and compress them block
* by block. Thus special header is required for each data block.
*/
typedef struct BackupPageHeader
{
BlockNumber block; /* block number */
int32 compressed_size;
} BackupPageHeader;
/* Special value for compressed_size field */
#define PageIsTruncated -2
#define SkipCurrentPage -3
/* Verify page's header */
static bool
bool
parse_page(Page page, XLogRecPtr *lsn)
{
PageHeader phdr = (PageHeader) page;
@ -210,15 +196,15 @@ parse_page(Page page, XLogRecPtr *lsn)
*/
static int
read_page_from_file(pgFile *file, BlockNumber blknum,
FILE *in, Page page, XLogRecPtr *page_lsn, XLogRecPtr horizon_lsn)
FILE *in, Page page, XLogRecPtr *page_lsn)
{
off_t offset = blknum * BLCKSZ;
ssize_t read_len = 0;
/* read the block */
read_len = fio_pread(in, page, offset, horizon_lsn);
read_len = fio_pread(in, page, offset);
if (read_len != BLCKSZ && read_len != sizeof(PageHeaderData))
if (read_len != BLCKSZ)
{
/* The block could have been truncated. It is fine. */
if (read_len == 0)
@ -269,19 +255,11 @@ read_page_from_file(pgFile *file, BlockNumber blknum,
if (current.checksum_version)
{
BlockNumber blkno = file->segno * RELSEG_SIZE + blknum;
uint16 page_crc = read_len == BLCKSZ
? pg_checksum_page(page, blkno)
/*
* Recompute Cpage checksum calculated by agent with blkno=0
* pg_checksum_page is calculating it in this way:
* (((checksum ^ blkno) % 65535) + 1)
*/
: (uint16)(((*PAGE_CHECKSUM(page) - 1) ^ blkno) + 1);
/*
* If checksum is wrong, sleep a bit and then try again
* several times. If it didn't help, throw error
*/
if (page_crc != ((PageHeader) page)->pd_checksum)
if (pg_checksum_page(page, blkno) != ((PageHeader) page)->pd_checksum)
{
elog(WARNING, "File: %s blknum %u have wrong checksum, try again",
file->path, blknum);
@ -314,7 +292,7 @@ static int32
prepare_page(backup_files_arg *arguments,
pgFile *file, XLogRecPtr prev_backup_start_lsn,
BlockNumber blknum, BlockNumber nblocks,
FILE *in, int *n_skipped,
FILE *in, BlockNumber *n_skipped,
BackupMode backup_mode,
Page page)
{
@ -337,11 +315,7 @@ prepare_page(backup_files_arg *arguments,
{
while(!page_is_valid && try_again)
{
bool check_lsn = (backup_mode == BACKUP_MODE_DIFF_DELTA
&& file->exists_in_prev
&& !page_is_truncated);
int result = read_page_from_file(file, blknum, in, page, &page_lsn,
check_lsn ? prev_backup_start_lsn : InvalidXLogRecPtr);
int result = read_page_from_file(file, blknum, in, page, &page_lsn);
try_again--;
if (result == 0)
@ -533,8 +507,8 @@ backup_data_file(backup_files_arg* arguments,
FILE *out;
BlockNumber blknum = 0;
BlockNumber nblocks = 0;
int n_blocks_skipped = 0;
int n_blocks_read = 0;
BlockNumber n_blocks_skipped = 0;
BlockNumber n_blocks_read = 0;
int page_state;
char curr_page[BLCKSZ];
@ -614,16 +588,29 @@ backup_data_file(backup_files_arg* arguments,
if (file->pagemap.bitmapsize == PageBitmapIsEmpty ||
file->pagemap_isabsent || !file->exists_in_prev)
{
for (blknum = 0; blknum < nblocks; blknum++)
if (backup_mode != BACKUP_MODE_DIFF_PTRACK && fio_is_remote_file(in))
{
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
blknum, nblocks, in, &n_blocks_skipped,
backup_mode, curr_page);
compress_and_backup_page(file, blknum, in, out, &(file->crc),
page_state, curr_page, calg, clevel);
n_blocks_read++;
if (page_state == PageIsTruncated)
break;
int rc = fio_send_pages(in, out, file,
backup_mode == BACKUP_MODE_DIFF_DELTA && file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
&n_blocks_skipped, calg, clevel);
if (rc < 0)
elog(ERROR, "Failed to read file %s: %s",
file->path, rc == PAGE_CHECKSUM_MISMATCH ? "data file checksum mismatch" : strerror(-rc));
n_blocks_read = rc;
}
else
{
for (blknum = 0; blknum < nblocks; blknum++)
{
page_state = prepare_page(arguments, file, prev_backup_start_lsn,
blknum, nblocks, in, &n_blocks_skipped,
backup_mode, curr_page);
compress_and_backup_page(file, blknum, in, out, &(file->crc),
page_state, curr_page, calg, clevel);
n_blocks_read++;
if (page_state == PageIsTruncated)
break;
}
}
if (backup_mode == BACKUP_MODE_DIFF_DELTA)
file->n_blocks = n_blocks_read;

View File

@ -309,6 +309,21 @@ typedef struct
int ret;
} backup_files_arg;
/*
* When copying datafiles to backup we validate and compress them block
* by block. Thus special header is required for each data block.
*/
typedef struct BackupPageHeader
{
BlockNumber block; /* block number */
int32 compressed_size;
} BackupPageHeader;
/* Special value for compressed_size field */
#define PageIsTruncated -2
#define SkipCurrentPage -3
/*
* return pointer that exceeds the length of prefix from character string.
* ex. str="/xxx/yyy/zzz", prefix="/xxx/yyy", return="zzz".
@ -603,6 +618,9 @@ extern char *base36enc_dup(long unsigned int value);
extern long unsigned int base36dec(const char *text);
extern uint32 parse_server_version(const char *server_version_str);
extern uint32 parse_program_version(const char *program_version);
extern bool parse_page(Page page, XLogRecPtr *lsn);
int32 do_compress(void* dst, size_t dst_size, void const* src, size_t src_size,
CompressAlg alg, int level, const char **errormsg);
#ifdef WIN32
#ifdef _DEBUG

View File

@ -4,24 +4,29 @@
#include <sys/stat.h>
#include "pg_probackup.h"
#include "storage/checksum.h"
#include "file.h"
#include "storage/checksum.h"
#define PRINTF_BUF_SIZE 1024
#define FILE_PERMISSIONS 0600
#define PAGE_READ_ATTEMPTS 100
static __thread unsigned long fio_fdset = 0;
static __thread void* fio_stdin_buffer;
static __thread int fio_stdout = 0;
static __thread int fio_stdin = 0;
fio_location MyLocation;
typedef struct
{
fio_header hdr;
XLogRecPtr lsn;
} fio_pread_request;
fio_location MyLocation;
BlockNumber nblocks;
BlockNumber segBlockNum;
XLogRecPtr horizonLsn;
uint32 checksumVersion;
int calg;
int clevel;
} fio_send_request;
/* Convert FIO pseudo handle to index in file descriptor array */
@ -34,12 +39,6 @@ void fio_redirect(int in, int out)
fio_stdout = out;
}
/* Check if FILE handle is local or remote (created by FIO) */
static bool fio_is_remote_file(FILE* file)
{
return (size_t)file <= FIO_FDMAX;
}
/* Check if file descriptor is local or remote (created by FIO) */
static bool fio_is_remote_fd(int fd)
{
@ -388,42 +387,30 @@ int fio_truncate(int fd, off_t size)
/*
* Read file from specified location.
* This call is optimized for delat backup, to avoid trasfer of old pages to backup host.
* For delta backup horizon_lsn parameter is assigned value of last backup and for all pages with LSN smaller than horizon_lsn only page header is sent.
*/
int fio_pread(FILE* f, void* buf, off_t offs, XLogRecPtr horizon_lsn)
int fio_pread(FILE* f, void* buf, off_t offs)
{
if (fio_is_remote_file(f))
{
int fd = fio_fileno(f);
fio_pread_request req;
fio_header hdr;
req.hdr.cop = FIO_PREAD;
req.hdr.handle = fd & ~FIO_PIPE_MARKER;
req.hdr.size = sizeof(XLogRecPtr);
req.hdr.arg = offs;
req.lsn = horizon_lsn;
hdr.cop = FIO_PREAD;
hdr.handle = fd & ~FIO_PIPE_MARKER;
hdr.size = sizeof(XLogRecPtr);
hdr.arg = offs;
IO_CHECK(fio_write_all(fio_stdout, &req, sizeof(req)), sizeof(req));
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
Assert(hdr.cop == FIO_SEND);
if (hdr.size != 0)
IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
/*
* We either return <0 for error, either 0 for EOF, either received size (page or page header size)
* for fully read page either 1 for partly read page. 1 is used just to distinguish it with page header size.
*/
return hdr.arg <= 0 ? hdr.arg : hdr.arg == BLCKSZ ? hdr.size : 1;
return hdr.arg;
}
else
{
int rc = pread(fileno(f), buf, BLCKSZ, offs);
/* See comment above consernign returned value */
return rc <= 0 || rc == BLCKSZ ? rc : 1;
}
return pread(fileno(f), buf, BLCKSZ, offs);
}
/* Set position in stdio file */
@ -804,6 +791,164 @@ static void fio_send_file(int out, char const* path)
}
}
int fio_send_pages(FILE* in, FILE* out, pgFile *file,
XLogRecPtr horizonLsn, BlockNumber* nBlocksSkipped, int calg, int clevel)
{
struct {
fio_header hdr;
fio_send_request arg;
} req;
BlockNumber n_blocks_read = 0;
BlockNumber blknum = 0;
Assert(fio_is_remote_file(in));
req.hdr.cop = FIO_SEND_PAGES;
req.hdr.size = sizeof(fio_send_request);
req.hdr.handle = fio_fileno(in) & ~FIO_PIPE_MARKER;
req.arg.nblocks = file->size/BLCKSZ;
req.arg.segBlockNum = file->segno * RELSEG_SIZE;
req.arg.horizonLsn = horizonLsn;
req.arg.checksumVersion = current.checksum_version;
req.arg.calg = calg;
req.arg.clevel = clevel;
IO_CHECK(fio_write_all(fio_stdout, &req, sizeof(req)), sizeof(req));
while (true)
{
fio_header hdr;
char buf[BLCKSZ + sizeof(BackupPageHeader)];
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
Assert(hdr.cop == FIO_PAGE);
if (hdr.arg < 0) /* read error */
return hdr.arg;
blknum = hdr.arg;
if (hdr.size == 0) /* end of segment */
break;
Assert(hdr.size <= sizeof(buf));
IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
COMP_FILE_CRC32(true, file->crc, buf, hdr.size);
if (fio_fwrite(out, buf, hdr.size) != hdr.size)
{
int errno_tmp = errno;
fio_fclose(out);
elog(ERROR, "File: %s, cannot write backup at block %u: %s",
file->path, blknum, strerror(errno_tmp));
}
file->compress_alg = calg;
file->read_size += BLCKSZ;
file->write_size += hdr.size;
n_blocks_read++;
}
*nBlocksSkipped = blknum - n_blocks_read;
return blknum;
}
static void fio_send_pages_impl(int fd, int out, fio_send_request* req)
{
BlockNumber blknum;
char read_buffer[BLCKSZ+1];
fio_header hdr;
hdr.cop = FIO_PAGE;
read_buffer[BLCKSZ] = 1; /* barrier */
for (blknum = 0; blknum < req->nblocks; blknum++)
{
int retry_attempts = PAGE_READ_ATTEMPTS;
XLogRecPtr page_lsn = InvalidXLogRecPtr;
bool is_empty_page = false;
do
{
ssize_t rc = pread(fd, read_buffer, BLCKSZ, blknum*BLCKSZ);
if (rc <= 0)
{
hdr.size = 0;
if (rc < 0)
{
hdr.arg = -errno;
Assert(hdr.arg < 0);
}
else
{
/* This is the last page */
hdr.arg = blknum;
}
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
return;
}
else if (rc == BLCKSZ)
{
if (!parse_page((Page)read_buffer, &page_lsn))
{
int i;
for (i = 0; read_buffer[i] == 0; i++);
/* Page is zeroed. No need to check header and checksum. */
if (i == BLCKSZ)
{
is_empty_page = true;
break;
}
}
else if (!req->checksumVersion
|| pg_checksum_page(read_buffer, req->segBlockNum + blknum) == ((PageHeader)read_buffer)->pd_checksum)
{
break;
}
}
} while (--retry_attempts != 0);
if (retry_attempts == 0)
{
hdr.size = 0;
hdr.arg = PAGE_CHECKSUM_MISMATCH;
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
return;
}
/* horizonLsn is not 0 for delta backup. As far as unsigned number are always greater or equal than zero, there is no sense to add more checks */
if (page_lsn >= req->horizonLsn)
{
char write_buffer[BLCKSZ*2];
BackupPageHeader* bph = (BackupPageHeader*)write_buffer;
hdr.arg = bph->block = blknum;
hdr.size = sizeof(BackupPageHeader);
if (is_empty_page)
{
bph->compressed_size = PageIsTruncated;
}
else
{
const char *errormsg = NULL;
bph->compressed_size = do_compress(write_buffer + sizeof(BackupPageHeader), sizeof(write_buffer) - sizeof(BackupPageHeader),
read_buffer, BLCKSZ, req->calg, req->clevel,
&errormsg);
if (bph->compressed_size <= 0 || bph->compressed_size >= BLCKSZ)
{
/* Do not compress page */
memcpy(write_buffer + sizeof(BackupPageHeader), read_buffer, BLCKSZ);
bph->compressed_size = BLCKSZ;
}
hdr.size += bph->compressed_size;
}
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(out, write_buffer, hdr.size), hdr.size);
}
}
hdr.size = 0;
hdr.arg = blknum;
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
}
/* Execute commands at remote host */
void fio_communicate(int in, int out)
@ -820,7 +965,6 @@ void fio_communicate(int in, int out)
char* buf = (char*)malloc(buf_size);
fio_header hdr;
struct stat st;
XLogRecPtr horizon_lsn;
int rc;
/* Main loop until command of processing master command */
@ -872,26 +1016,19 @@ void fio_communicate(int in, int out)
rc = read(fd[hdr.handle], buf, hdr.arg);
hdr.cop = FIO_SEND;
hdr.size = rc > 0 ? rc : 0;
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
IO_CHECK(fio_write_all(out, buf, hdr.size), hdr.size);
break;
case FIO_PREAD: /* Read from specified position in file, ignoring pages beyond horizon of delta backup */
horizon_lsn = *(XLogRecPtr*)buf;
rc = pread(fd[hdr.handle], buf, BLCKSZ, hdr.arg);
hdr.cop = FIO_SEND;
hdr.arg = rc;
/* For pages beyond horizon of delta backup transfer only page header */
hdr.size = (rc == BLCKSZ)
? PageXLogRecPtrGet(((PageHeader)buf)->pd_lsn) < horizon_lsn /* For non-delta backup horizon_lsn == 0, so this condition is always false */
? sizeof(PageHeaderData) : BLCKSZ
: 0;
if (hdr.size == sizeof(PageHeaderData))
/* calculate checksum without XOR-ing with block number to compare it with page CRC at master */
*PAGE_CHECKSUM(buf) = pg_checksum_page(buf, 0);
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
if (hdr.size != 0)
IO_CHECK(fio_write_all(out, buf, hdr.size), hdr.size);
break;
case FIO_PREAD: /* Read from specified position in file, ignoring pages beyond horizon of delta backup */
rc = pread(fd[hdr.handle], buf, BLCKSZ, hdr.arg);
hdr.cop = FIO_SEND;
hdr.arg = rc;
hdr.size = rc >= 0 ? rc : 0;
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
if (hdr.size != 0)
IO_CHECK(fio_write_all(out, buf, hdr.size), hdr.size);
break;
case FIO_FSTAT: /* Get information about opened file */
hdr.size = sizeof(st);
hdr.arg = fstat(fd[hdr.handle], &st);
@ -927,6 +1064,10 @@ void fio_communicate(int in, int out)
case FIO_TRUNCATE: /* Truncate file */
SYS_CHECK(ftruncate(fd[hdr.handle], hdr.arg));
break;
case FIO_SEND_PAGES:
Assert(hdr.size == sizeof(fio_send_request));
fio_send_pages_impl(fd[hdr.handle], out, (fio_send_request*)buf);
break;
default:
Assert(false);
}

View File

@ -30,7 +30,9 @@ typedef enum
FIO_ACCESS,
FIO_OPENDIR,
FIO_READDIR,
FIO_CLOSEDIR
FIO_CLOSEDIR,
FIO_SEND_PAGES,
FIO_PAGE
} fio_operations;
typedef enum
@ -43,18 +45,11 @@ typedef enum
#define FIO_FDMAX 64
#define FIO_PIPE_MARKER 0x40000000
#define PAGE_CHECKSUM_MISMATCH (-256)
#define SYS_CHECK(cmd) do if ((cmd) < 0) { fprintf(stderr, "%s:%d: (%s) %s\n", __FILE__, __LINE__, #cmd, strerror(errno)); exit(EXIT_FAILURE); } while (0)
#define IO_CHECK(cmd, size) do { int _rc = (cmd); if (_rc != (size)) { fprintf(stderr, "%s:%d: proceeds %d bytes instead of %d: %s\n", __FILE__, __LINE__, _rc, (int)(size), _rc < 0 ? "end of data" : strerror(errno)); exit(EXIT_FAILURE); } } while (0)
/*
* Store one more checksum in page header.
* There is free space at the ned of page header (not used for page verification)
* While delta backup we need to calculate checksum at agent, send it to maini pg_probackup instance
* adjust it according to the real block number and compare with checksum stored in pd_checksum
*/
#define PAGE_CHECKSUM(p) ((uint16*)((p) + sizeof(PageHeaderData)) - 1)
typedef struct
{
unsigned cop : 5;
@ -65,13 +60,16 @@ typedef struct
extern fio_location MyLocation;
/* Check if FILE handle is local or remote (created by FIO) */
#define fio_is_remote_file(file) ((size_t)(file) <= FIO_FDMAX)
extern void fio_redirect(int in, int out);
extern void fio_communicate(int in, int out);
extern FILE* fio_fopen(char const* name, char const* mode, fio_location location);
extern size_t fio_fwrite(FILE* f, void const* buf, size_t size);
extern ssize_t fio_fread(FILE* f, void* buf, size_t size);
extern int fio_pread(FILE* f, void* buf, off_t offs, XLogRecPtr horizon_lsn);
extern int fio_pread(FILE* f, void* buf, off_t offs);
extern int fio_fprintf(FILE* f, char const* arg, ...) __attribute__((format(printf, 2, 3)));
extern int fio_fflush(FILE* f);
extern int fio_fseek(FILE* f, off_t offs);
@ -79,6 +77,10 @@ extern int fio_ftruncate(FILE* f, off_t size);
extern int fio_fclose(FILE* f);
extern int fio_ffstat(FILE* f, struct stat* st);
struct pgFile;
extern int fio_send_pages(FILE* in, FILE* out, struct pgFile *file, XLogRecPtr horizonLsn,
BlockNumber* nBlocksSkipped, int calg, int clevel);
extern int fio_open(char const* name, int mode, fio_location location);
extern ssize_t fio_write(int fd, void const* buf, size_t size);
extern ssize_t fio_read(int fd, void* buf, size_t size);