1
0
mirror of https://github.com/postgrespro/pg_probackup.git synced 2025-01-23 11:45:36 +02:00

Add load balance for backup threads.

This commit is contained in:
stalkerg 2016-09-02 19:31:49 +03:00
parent 3eb1013baa
commit 02ac17bb6b
4 changed files with 41 additions and 18 deletions

View File

@ -47,8 +47,6 @@ typedef struct
parray *files;
parray *prev_files;
const XLogRecPtr *lsn;
unsigned int start_file_idx;
unsigned int end_file_idx;
} backup_files_args;
/*
@ -106,6 +104,7 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
pthread_t stream_thread;
backup_files_args *backup_threads_args[num_threads];
/* repack the options */
bool smooth_checkpoint = bkupopt.smooth_checkpoint;
pgBackup *prev_backup = NULL;
@ -315,27 +314,35 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
if (num_threads < 1)
num_threads = 1;
/* sort by size for load balancing */
parray_qsort(backup_files_list, pgFileCompareSize);
/* init thread args with own file lists */
for (i = 0; i < num_threads; i++)
{
backup_files_args *arg = pg_malloc(sizeof(backup_files_args));
arg->from_root = pgdata;
arg->to_root = path;
arg->files = backup_files_list;
arg->files = parray_new();
arg->prev_files = prev_files;
arg->lsn = lsn;
arg->start_file_idx = i * (parray_num(backup_files_list)/num_threads);
if (i == num_threads - 1)
arg->end_file_idx = parray_num(backup_files_list);
else
arg->end_file_idx = (i + 1) * (parray_num(backup_files_list)/num_threads);
if (verbose)
elog(WARNING, "Start thread for start_file_idx:%i end_file_idx:%i num:%li",
arg->start_file_idx,
arg->end_file_idx,
parray_num(backup_files_list));
backup_threads_args[i] = arg;
pthread_create(&backup_threads[i], NULL, (void *(*)(void *)) backup_files, arg);
}
/* balance load between threads */
for (i = 0; i < parray_num(backup_files_list); i++)
{
int cur_thread = i % num_threads;
parray_append(backup_threads_args[cur_thread]->files,
parray_get(backup_files_list, i));
}
/* Run threads */
for (i = 0; i < num_threads; i++)
{
if (verbose)
elog(WARNING, "Start thread num:%li", parray_num(backup_threads_args[i]->files));
pthread_create(&backup_threads[i], NULL, (void *(*)(void *)) backup_files, backup_threads_args[i]);
}
/* Wait theads */
@ -911,7 +918,7 @@ backup_files(void *arg)
gettimeofday(&tv, NULL);
/* backup a file or create a directory */
for (i = arguments->start_file_idx; i < arguments->end_file_idx; i++)
for (i = 0; i < parray_num(arguments->files); i++)
{
int ret;
struct stat buf;

15
dir.c
View File

@ -193,6 +193,21 @@ pgFileComparePathDesc(const void *f1, const void *f2)
return -pgFileComparePath(f1, f2);
}
/* Compare two pgFile with their size */
int
pgFileCompareSize(const void *f1, const void *f2)
{
pgFile *f1p = *(pgFile **)f1;
pgFile *f2p = *(pgFile **)f2;
if (f1p->size > f2p->size)
return 1;
else if (f1p->size < f2p->size)
return -1;
else
return 0;
}
/* Compare two pgFile with their modify timestamp. */
int
pgFileCompareMtime(const void *f1, const void *f2)

View File

@ -172,8 +172,8 @@ main(int argc, char *argv[])
elog(ERROR, "delete command needs ARCLOG_PATH (-A, --arclog-path) to be set");
/* setup exclusion list for file search */
for (i = 0; pgdata_exclude[i]; i++) /* find first empty slot */
;
for (i = 0; pgdata_exclude[i]; i++); /* find first empty slot */
if (arclog_path)
pgdata_exclude[i++] = arclog_path;

View File

@ -284,6 +284,7 @@ extern void pgFileFree(void *file);
extern pg_crc32 pgFileGetCRC(pgFile *file);
extern int pgFileComparePath(const void *f1, const void *f2);
extern int pgFileComparePathDesc(const void *f1, const void *f2);
extern int pgFileCompareSize(const void *f1, const void *f2);
extern int pgFileCompareMtime(const void *f1, const void *f2);
extern int pgFileCompareMtimeDesc(const void *f1, const void *f2);