diff --git a/src/backup.c b/src/backup.c index 140188fb..1bf4e02f 100644 --- a/src/backup.c +++ b/src/backup.c @@ -2708,7 +2708,10 @@ pg_ptrack_get_block(backup_files_args *arguments, sprintf(params[3], "%u", blknum); if (arguments->thread_backup_conn == NULL) + { arguments->thread_backup_conn = pgut_connect(pgut_dbname); + } + arguments->thread_cancel_conn = PQgetCancel(arguments->thread_backup_conn); //elog(LOG, "db %i pg_ptrack_get_block(%i, %i, %u)",dbOid, tblsOid, relOid, blknum); res = pgut_execute_parallel(arguments->thread_backup_conn, diff --git a/src/pg_probackup.h b/src/pg_probackup.h index c48c2357..743e670f 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -250,7 +250,7 @@ typedef struct parray *prev_backup_filelist; XLogRecPtr prev_backup_start_lsn; PGconn *thread_backup_conn; - PGconn *thread_cancel_conn; + PGcancel *thread_cancel_conn; } backup_files_args; /* diff --git a/src/utils/pgut.c b/src/utils/pgut.c index b1abbcab..2627c24d 100644 --- a/src/utils/pgut.c +++ b/src/utils/pgut.c @@ -54,8 +54,8 @@ static bool parse_pair(const char buffer[], char key[], char value[]); /* Connection routines */ static void init_cancel_handler(void); -static void on_before_exec(PGconn *conn); -static void on_after_exec(void); +static void on_before_exec(PGconn *conn, PGcancel *thread_cancel_conn); +static void on_after_exec(PGcancel *thread_cancel_conn); static void on_interrupt(void); static void on_cleanup(void); static void exit_or_abort(int exitcode); @@ -1512,7 +1512,7 @@ pgut_set_port(const char *new_port) PGresult * pgut_execute_parallel(PGconn* conn, - PGconn* cancel_conn, const char *query, + PGcancel* thread_cancel_conn, const char *query, int nParams, const char **params, bool text_result) { @@ -1540,7 +1540,7 @@ pgut_execute_parallel(PGconn* conn, return NULL; } - //on_before_exec(conn); + on_before_exec(conn, thread_cancel_conn); if (nParams == 0) res = PQexec(conn, query); else @@ -1550,7 +1550,7 @@ pgut_execute_parallel(PGconn* conn, * or one to obtain results in binary format. */ (text_result) ? 0 : 1); - //on_after_exec(); + on_after_exec(thread_cancel_conn); switch (PQresultStatus(res)) { @@ -1594,7 +1594,7 @@ pgut_execute(PGconn* conn, const char *query, int nParams, const char **params, return NULL; } - on_before_exec(conn); + on_before_exec(conn, NULL); if (nParams == 0) res = PQexec(conn, query); else @@ -1604,7 +1604,7 @@ pgut_execute(PGconn* conn, const char *query, int nParams, const char **params, * or one to obtain results in binary format. */ (text_result) ? 0 : 1); - on_after_exec(); + on_after_exec(NULL); switch (PQresultStatus(res)) { @@ -1745,7 +1745,7 @@ static CRITICAL_SECTION cancelConnLock; * Set cancel_conn to point to the current database connection. */ static void -on_before_exec(PGconn *conn) +on_before_exec(PGconn *conn, PGcancel *thread_cancel_conn) { PGcancel *old; @@ -1756,16 +1756,32 @@ on_before_exec(PGconn *conn) EnterCriticalSection(&cancelConnLock); #endif - /* Free the old one if we have one */ - old = cancel_conn; + if (thread_cancel_conn) + { + elog(WARNING, "Handle tread_cancel_conn. on_before_exec"); + old = thread_cancel_conn; - /* be sure handle_sigint doesn't use pointer while freeing */ - cancel_conn = NULL; + /* be sure handle_sigint doesn't use pointer while freeing */ + thread_cancel_conn = NULL; - if (old != NULL) - PQfreeCancel(old); + if (old != NULL) + PQfreeCancel(old); - cancel_conn = PQgetCancel(conn); + thread_cancel_conn = PQgetCancel(conn); + } + else + { + /* Free the old one if we have one */ + old = cancel_conn; + + /* be sure handle_sigint doesn't use pointer while freeing */ + cancel_conn = NULL; + + if (old != NULL) + PQfreeCancel(old); + + cancel_conn = PQgetCancel(conn); + } #ifdef WIN32 LeaveCriticalSection(&cancelConnLock); @@ -1778,7 +1794,7 @@ on_before_exec(PGconn *conn) * Free the current cancel connection, if any, and set to NULL. */ static void -on_after_exec(void) +on_after_exec(PGcancel *thread_cancel_conn) { PGcancel *old; @@ -1789,14 +1805,27 @@ on_after_exec(void) EnterCriticalSection(&cancelConnLock); #endif - old = cancel_conn; + if (thread_cancel_conn) + { + elog(WARNING, "Handle tread_cancel_conn. on_after_exec"); + old = thread_cancel_conn; - /* be sure handle_sigint doesn't use pointer while freeing */ - cancel_conn = NULL; + /* be sure handle_sigint doesn't use pointer while freeing */ + thread_cancel_conn = NULL; - if (old != NULL) - PQfreeCancel(old); + if (old != NULL) + PQfreeCancel(old); + } + else + { + old = cancel_conn; + /* be sure handle_sigint doesn't use pointer while freeing */ + cancel_conn = NULL; + + if (old != NULL) + PQfreeCancel(old); + } #ifdef WIN32 LeaveCriticalSection(&cancelConnLock); #endif diff --git a/src/utils/pgut.h b/src/utils/pgut.h index 1a35a48c..def0f9d2 100644 --- a/src/utils/pgut.h +++ b/src/utils/pgut.h @@ -127,7 +127,7 @@ extern PGconn *pgut_connect_replication_extended(const char *pghost, const char extern void pgut_disconnect(PGconn *conn); extern PGresult *pgut_execute(PGconn* conn, const char *query, int nParams, const char **params, bool text_result); -extern PGresult *pgut_execute_parallel(PGconn* conn, PGconn* cancel_conn, +extern PGresult *pgut_execute_parallel(PGconn* conn, PGcancel* thread_cancel_conn, const char *query, int nParams, const char **params, bool text_result); extern bool pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int elevel);