1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2024-11-28 09:33:54 +02:00

Remove walmethods

This commit is contained in:
Konstantin Knizhnik 2019-01-27 01:11:52 +03:00
parent f7e2a7a8cb
commit 3bedd3b639
7 changed files with 39 additions and 512 deletions

View File

@ -11,13 +11,13 @@ OBJS += src/archive.o src/backup.o src/catalog.o src/configure.o src/data.o \
# borrowed files
OBJS += src/pg_crc.o src/datapagemap.o src/receivelog.o src/streamutil.o \
src/xlogreader.o src/walmethods.o
src/xlogreader.o
EXTRA_CLEAN = src/pg_crc.c src/datapagemap.c src/datapagemap.h src/logging.h \
src/receivelog.c src/receivelog.h src/streamutil.c src/streamutil.h \
src/xlogreader.c
INCLUDES = src/datapagemap.h src/logging.h src/streamutil.h src/receivelog.h src/walmethods.h
INCLUDES = src/datapagemap.h src/logging.h src/streamutil.h src/receivelog.h
ifdef USE_PGXS
PG_CONFIG = pg_config
@ -39,6 +39,12 @@ else
srchome=$(top_srcdir)
endif
ifeq (,$(filter 9.5 9.6,$(MAJORVERSION)))
OBJS += src/walmethods.o
EXTRA_CLEAN += src/walmethods.c src/walmethods.h
INCLUDES += src/walmethods.h
endif
PG_CPPFLAGS = -I$(libpq_srcdir) ${PTHREAD_CFLAGS} -Isrc -I$(top_srcdir)/$(subdir)/src
override CPPFLAGS := -DFRONTEND $(CPPFLAGS) $(PG_CPPFLAGS)
PG_LIBS = $(libpq_pgport) ${PTHREAD_CFLAGS}
@ -66,6 +72,12 @@ src/streamutil.h: $(top_srcdir)/src/bin/pg_basebackup/streamutil.h
src/xlogreader.c: $(top_srcdir)/src/backend/access/transam/xlogreader.c
rm -f $@ && $(LN_S) $(srchome)/src/backend/access/transam/xlogreader.c $@
ifeq (,$(filter 9.5 9.6,$(MAJORVERSION)))
src/walmethods.c: $(top_srcdir)/src/bin/pg_basebackup/walmethods.c
rm -f $@ && $(LN_S) $(srchome)/src/bin/pg_basebackup/walmethods.c $@
src/walmethods.h: $(top_srcdir)/src/bin/pg_basebackup/walmethods.h
rm -f $@ && $(LN_S) $(srchome)/src/bin/pg_basebackup/walmethods.h $@
endif
ifeq ($(PORTNAME), aix)
CC=xlc_r

View File

@ -66,11 +66,10 @@ typedef struct
* 0 means there is no error, 1 - there is an error.
*/
int ret;
parray *files_list;
} StreamThreadArg;
static pthread_t stream_thread;
static StreamThreadArg stream_thread_arg = {"", NULL, 1, NULL};
static StreamThreadArg stream_thread_arg = {"", NULL, 1};
static int is_ptrack_enable = false;
bool is_ptrack_support = false;
@ -482,7 +481,6 @@ do_backup_instance(void)
pgBackup *prev_backup = NULL;
parray *prev_backup_filelist = NULL;
parray *xlog_files_list = NULL;
parray *backup_list = NULL;
pgFile *pg_control = NULL;
@ -620,7 +618,6 @@ do_backup_instance(void)
/* By default there are some error */
stream_thread_arg.ret = 1;
stream_thread_arg.files_list = xlog_files_list;
pthread_create(&stream_thread, NULL, StreamLog, &stream_thread_arg);
}
@ -804,15 +801,14 @@ do_backup_instance(void)
/* Add archived xlog files into the list of files of this backup */
if (stream_wal)
{
if (xlog_files_list == NULL)
{
char pg_xlog_path[MAXPGPATH];
parray *xlog_files_list;
char pg_xlog_path[MAXPGPATH];
/* Scan backup PG_XLOG_DIR */
xlog_files_list = parray_new();
join_path_components(pg_xlog_path, database_path, PG_XLOG_DIR);
dir_list_file(xlog_files_list, pg_xlog_path, false, true, false, FIO_BACKUP_HOST);
/* Scan backup PG_XLOG_DIR */
xlog_files_list = parray_new();
join_path_components(pg_xlog_path, database_path, PG_XLOG_DIR);
dir_list_file(xlog_files_list, pg_xlog_path, false, true, false, FIO_BACKUP_HOST);
}
for (i = 0; i < parray_num(xlog_files_list); i++)
{
pgFile *file = (pgFile *) parray_get(xlog_files_list, i);
@ -2231,7 +2227,7 @@ backup_files(void *arg)
i + 1, n_backup_files_list, file->path);
/* stat file to check its current state */
ret = fio_stat(file->path, &buf, false, FIO_DB_HOST);
ret = fio_stat(file->path, &buf, true, FIO_DB_HOST);
if (ret == -1)
{
if (errno == ENOENT)
@ -2758,7 +2754,7 @@ StreamLog(void *arg)
ctl.sysidentifier = NULL;
#if PG_VERSION_NUM >= 100000
ctl.walmethod = CreateWalDirectoryMethod(stream_arg->basedir, 0, true, stream_arg->files_list);
ctl.walmethod = CreateWalDirectoryMethod(stream_arg->basedir, 0, true);
ctl.replication_slot = replication_slot;
ctl.stop_socket = PGINVALID_SOCKET;
#else

View File

@ -213,7 +213,7 @@ read_page_from_file(pgFile *file, BlockNumber blknum,
FILE *in, Page page, XLogRecPtr *page_lsn, XLogRecPtr horizon_lsn)
{
off_t offset = blknum * BLCKSZ;
size_t read_len = 0;
ssize_t read_len = 0;
/* read the block */
read_len = fio_pread(in, page, offset, horizon_lsn);
@ -1060,22 +1060,22 @@ get_gz_error(gzFile gzf, int errnum)
* Copy file attributes
*/
static void
copy_meta(const char *from_path, const char *to_path, bool unlink_on_error, fio_location location)
copy_meta(const char *from_path, fio_location from_location, const char *to_path, fio_location to_location, bool unlink_on_error)
{
struct stat st;
if (stat(from_path, &st) == -1)
if (fio_stat(from_path, &st, true, from_location) == -1)
{
if (unlink_on_error)
fio_unlink(to_path, location);
fio_unlink(to_path, to_location);
elog(ERROR, "Cannot stat file \"%s\": %s",
from_path, strerror(errno));
}
if (fio_chmod(to_path, st.st_mode, location) == -1)
if (fio_chmod(to_path, st.st_mode, to_location) == -1)
{
if (unlink_on_error)
fio_unlink(to_path, location);
fio_unlink(to_path, to_location);
elog(ERROR, "Cannot change mode of file \"%s\": %s",
to_path, strerror(errno));
}
@ -1227,7 +1227,7 @@ push_wal_file(const char *from_path, const char *to_path, bool is_compress,
}
/* update file permission. */
copy_meta(from_path, to_path_temp, true, FIO_BACKUP_HOST);
copy_meta(from_path, FIO_DB_HOST, to_path_temp, FIO_BACKUP_HOST, true);
if (fio_rename(to_path_temp, to_path_p, FIO_BACKUP_HOST) < 0)
{
@ -1394,7 +1394,7 @@ get_wal_file(const char *from_path, const char *to_path)
}
/* update file permission. */
copy_meta(from_path_p, to_path_temp, true, FIO_DB_HOST);
copy_meta(from_path_p, FIO_BACKUP_HOST, to_path_temp, FIO_DB_HOST, true);
if (fio_rename(to_path_temp, to_path, FIO_DB_HOST) < 0)
{

View File

@ -471,11 +471,13 @@ ssize_t fio_write(int fd, void const* buf, size_t size)
}
/* Read data from stdio file */
size_t fio_fread(FILE* f, void* buf, size_t size)
ssize_t fio_fread(FILE* f, void* buf, size_t size)
{
return fio_is_remote_file(f)
? fio_read(fio_fileno(f), buf, size)
: fread(buf, 1, size, f);
size_t rc;
if (fio_is_remote_file(f))
return fio_read(fio_fileno(f), buf, size);
rc = fread(buf, 1, size, f);
return rc == 0 && !feof(f) ? -1 : rc;
}
/* Read data from file */

View File

@ -61,7 +61,7 @@ extern void fio_communicate(int in, int out);
extern FILE* fio_fopen(char const* name, char const* mode, fio_location location);
extern size_t fio_fwrite(FILE* f, void const* buf, size_t size);
extern size_t fio_fread(FILE* f, void* buf, size_t size);
extern ssize_t fio_fread(FILE* f, void* buf, size_t size);
extern int fio_pread(FILE* f, void* buf, off_t offs, XLogRecPtr horizon_lsn);
extern int fio_fprintf(FILE* f, char const* arg, ...) __attribute__((format(printf, 2, 3)));
extern int fio_fflush(FILE* f);

View File

@ -1,391 +0,0 @@
/*-------------------------------------------------------------------------
*
* walmethods.c - implementations of different ways to write received wal
*
* NOTE! The caller must ensure that only one method is instantiated in
* any given program, and that it's only instantiated once!
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/walmethods.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
#include "pgtar.h"
#include "common/file_utils.h"
#include "receivelog.h"
#include "streamutil.h"
#include "pg_probackup.h"
/* Size of zlib buffer for .tar.gz */
#define ZLIB_OUT_SIZE 4096
/*-------------------------------------------------------------------------
* WalDirectoryMethod - write wal to a directory looking like pg_wal
*-------------------------------------------------------------------------
*/
/*
* Global static data for this method
*/
typedef struct DirectoryMethodData
{
char *basedir;
int compression;
bool sync;
parray *files_list;
} DirectoryMethodData;
static DirectoryMethodData *dir_data = NULL;
/*
* Local file handle
*/
typedef struct DirectoryMethodFile
{
int fd;
off_t currpos;
char *pathname;
char *fullpath;
char *temp_suffix;
#ifdef HAVE_LIBZ
gzFile gzfp;
int gz_tmp;
#endif
} DirectoryMethodFile;
static const char *
dir_getlasterror(void)
{
/* Directory method always sets errno, so just use strerror */
return strerror(errno);
}
static Walfile
dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
{
char tmppath[MAXPGPATH];
int fd = -1;
DirectoryMethodFile *f;
#ifdef HAVE_LIBZ
gzFile gzfp = NULL;
int gz_tmp = -1;
#endif
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
dir_data->basedir, pathname,
dir_data->compression > 0 ? ".gz" : "",
temp_suffix ? temp_suffix : "");
/*
* Open a file for non-compressed as well as compressed files. Tracking
* the file descriptor is important for dir_sync() method as gzflush()
* does not do any system calls to fsync() to make changes permanent on
* disk.
*/
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
{
gzfp = fio_gzopen(tmppath, "wb", &gz_tmp, FIO_BACKUP_HOST);
if (gzfp == NULL)
{
return NULL;
}
if (gzsetparams(gzfp, dir_data->compression,
Z_DEFAULT_STRATEGY) != Z_OK)
{
gzclose(gzfp);
return NULL;
}
}
else
#endif
{
fd = fio_open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, FIO_BACKUP_HOST);
if (fd < 0)
return NULL;
}
/* Do pre-padding on non-compressed files */
if (pad_to_size && dir_data->compression == 0)
{
char zerobuf[XLOG_BLCKSZ];
int bytes;
memset(zerobuf, 0, XLOG_BLCKSZ);
for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
{
errno = 0;
if (fio_write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
int save_errno = errno;
fio_close(fd);
/*
* If write didn't set errno, assume problem is no disk space.
*/
errno = save_errno ? save_errno : ENOSPC;
return NULL;
}
}
if (fio_seek(fd, 0) != 0)
{
int save_errno = errno;
fio_close(fd);
errno = save_errno;
return NULL;
}
}
/*
* fsync WAL file and containing directory, to ensure the file is
* persistently created and zeroed (if padded). That's particularly
* important when using synchronous mode, where the file is modified and
* fsynced in-place, without a directory fsync.
*/
if (!remote_agent && dir_data->sync)
{
if (fsync_fname(tmppath, false, progname) != 0 ||
fsync_parent_path(tmppath, progname) != 0)
{
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
gzclose(gzfp);
else
#endif
close(fd);
return NULL;
}
}
f = pg_malloc0(sizeof(DirectoryMethodFile));
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
{
f->gzfp = gzfp;
f->gz_tmp = gz_tmp;
}
#endif
f->fd = fd;
f->currpos = 0;
f->pathname = pg_strdup(pathname);
f->fullpath = pg_strdup(tmppath);
if (temp_suffix)
f->temp_suffix = pg_strdup(temp_suffix);
return f;
}
static ssize_t
dir_write(Walfile f, const void *buf, size_t count)
{
ssize_t r;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
Assert(f != NULL);
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
r = (ssize_t) gzwrite(df->gzfp, buf, count);
else
#endif
r = fio_write(df->fd, buf, count);
if (r > 0)
df->currpos += r;
return r;
}
static off_t
dir_get_current_pos(Walfile f)
{
Assert(f != NULL);
/* Use a cached value to prevent lots of reseeks */
return ((DirectoryMethodFile *) f)->currpos;
}
static int
dir_close(Walfile f, WalCloseMethod method)
{
int r;
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
char tmppath[MAXPGPATH];
char tmppath2[MAXPGPATH];
Assert(f != NULL);
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
r = fio_gzclose(df->gzfp, df->fullpath, df->gz_tmp);
else
#endif
r = fio_close(df->fd);
if (r == 0)
{
char const* file_path = NULL;
/* Build path to the current version of the file */
if (method == CLOSE_NORMAL && df->temp_suffix)
{
/*
* If we have a temp prefix, normal operation is to rename the
* file.
*/
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
dir_data->basedir, df->pathname,
dir_data->compression > 0 ? ".gz" : "",
df->temp_suffix);
snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
dir_data->basedir, df->pathname,
dir_data->compression > 0 ? ".gz" : "");
r = fio_rename(tmppath, tmppath2, FIO_BACKUP_HOST);
file_path = tmppath2;
}
else if (method == CLOSE_UNLINK)
{
/* Unlink the file once it's closed */
snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
dir_data->basedir, df->pathname,
dir_data->compression > 0 ? ".gz" : "",
df->temp_suffix ? df->temp_suffix : "");
r = fio_unlink(tmppath, FIO_BACKUP_HOST);
}
else
{
/*
* Else either CLOSE_NORMAL and no temp suffix, or
* CLOSE_NO_RENAME. In this case, fsync the file and containing
* directory if sync mode is requested.
*/
file_path = df->fullpath;
if (dir_data->sync && !remote_agent)
{
r = fsync_fname(df->fullpath, false, progname);
if (r == 0)
r = fsync_parent_path(df->fullpath, progname);
}
}
if (file_path && dir_data->files_list)
{
pgFile* file = pgFileNew(file_path, false, FIO_BACKUP_HOST);
Assert(file != NULL);
parray_append(dir_data->files_list, file);
}
}
pg_free(df->pathname);
pg_free(df->fullpath);
if (df->temp_suffix)
pg_free(df->temp_suffix);
pg_free(df);
return r;
}
static int
dir_sync(Walfile f)
{
Assert(f != NULL);
if (!dir_data->sync)
return 0;
#ifdef HAVE_LIBZ
if (dir_data->compression > 0)
{
if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
return -1;
}
#endif
return fio_flush(((DirectoryMethodFile *) f)->fd);
}
static ssize_t
dir_get_file_size(const char *pathname)
{
struct stat statbuf;
char tmppath[MAXPGPATH];
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, pathname);
if (fio_stat(tmppath, &statbuf, true, FIO_BACKUP_HOST) != 0)
return -1;
return statbuf.st_size;
}
static bool
dir_existsfile(const char *pathname)
{
char tmppath[MAXPGPATH];
snprintf(tmppath, sizeof(tmppath), "%s/%s",
dir_data->basedir, pathname);
return fio_access(tmppath, F_OK, FIO_BACKUP_HOST) == 0;
}
static bool
dir_finish(void)
{
if (dir_data->sync && !remote_agent)
{
/*
* Files are fsynced when they are closed, but we need to fsync the
* directory entry here as well.
*/
if (fsync_fname(dir_data->basedir, true, progname) != 0)
return false;
}
return true;
}
WalWriteMethod *
CreateWalDirectoryMethod(const char *basedir, int compression, bool sync, parray* files_list)
{
WalWriteMethod *method;
method = pg_malloc0(sizeof(WalWriteMethod));
method->open_for_write = dir_open_for_write;
method->write = dir_write;
method->get_current_pos = dir_get_current_pos;
method->get_file_size = dir_get_file_size;
method->close = dir_close;
method->sync = dir_sync;
method->existsfile = dir_existsfile;
method->finish = dir_finish;
method->getlasterror = dir_getlasterror;
dir_data = pg_malloc0(sizeof(DirectoryMethodData));
dir_data->compression = compression;
dir_data->basedir = pg_strdup(basedir);
dir_data->sync = sync;
dir_data->files_list = files_list;
return method;
}
void
FreeWalDirectoryMethod(void)
{
pg_free(dir_data->basedir);
pg_free(dir_data);
}

View File

@ -1,92 +0,0 @@
/*-------------------------------------------------------------------------
*
* walmethods.h
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/walmethods.h
*-------------------------------------------------------------------------
*/
#include "utils/parray.h"
typedef void *Walfile;
typedef enum
{
CLOSE_NORMAL,
CLOSE_UNLINK,
CLOSE_NO_RENAME
} WalCloseMethod;
/*
* A WalWriteMethod structure represents the different methods used
* to write the streaming WAL as it's received.
*
* All methods that have a failure return indicator will set state
* allowing the getlasterror() method to return a suitable message.
* Commonly, errno is this state (or part of it); so callers must take
* care not to clobber errno between a failed method call and use of
* getlasterror() to retrieve the message.
*/
typedef struct WalWriteMethod WalWriteMethod;
struct WalWriteMethod
{
/*
* Open a target file. Returns Walfile, or NULL if open failed. If a temp
* suffix is specified, a file with that name will be opened, and then
* automatically renamed in close(). If pad_to_size is specified, the file
* will be padded with NUL up to that size, if supported by the Walmethod.
*/
Walfile (*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
/*
* Close an open Walfile, using one or more methods for handling automatic
* unlinking etc. Returns 0 on success, other values for error.
*/
int (*close) (Walfile f, WalCloseMethod method);
/* Check if a file exist */
bool (*existsfile) (const char *pathname);
/* Return the size of a file, or -1 on failure. */
ssize_t (*get_file_size) (const char *pathname);
/*
* Write count number of bytes to the file, and return the number of bytes
* actually written or -1 for error.
*/
ssize_t (*write) (Walfile f, const void *buf, size_t count);
/* Return the current position in a file or -1 on error */
off_t (*get_current_pos) (Walfile f);
/*
* fsync the contents of the specified file. Returns 0 on success.
*/
int (*sync) (Walfile f);
/*
* Clean up the Walmethod, closing any shared resources. For methods like
* tar, this includes writing updated headers. Returns true if the
* close/write/sync of shared resources succeeded, otherwise returns false
* (but the resources are still closed).
*/
bool (*finish) (void);
/* Return a text for the last error in this Walfile */
const char *(*getlasterror) (void);
};
/*
* Available WAL methods:
* - WalDirectoryMethod - write WAL to regular files in a standard pg_wal
* - TarDirectoryMethod - write WAL to a tarfile corresponding to pg_wal
* (only implements the methods required for pg_basebackup,
* not all those required for pg_receivewal)
*/
WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
int compression, bool sync, parray* file_list);
/* Cleanup routines for previously-created methods */
void FreeWalDirectoryMethod(void);