Author: Noah Misch Commit: Noah Misch Make dblink interruptible, via new libpqsrv APIs. This replaces dblink's blocking libpq calls, allowing cancellation and allowing DROP DATABASE (of a database not involved in the query). Apart from explicit dblink_cancel_query() calls, dblink still doesn't cancel the remote side. The replacement for the blocking calls consists of new, general-purpose query execution wrappers in the libpqsrv facility. Out-of-tree extensions should adopt these. Use them in postgres_fdw, replacing a local implementation from which the libpqsrv implementation derives. This is a bug fix for dblink. Code inspection identified the bug at least thirteen years ago, but user complaints have not appeared. Hence, no back-patch for now. Reviewed by FIXME. Discussion: https://postgr.es/m/FIXME diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 195b278..4624e53 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -133,6 +133,7 @@ static HTAB *remoteConnHash = NULL; /* custom wait event values, retrieved from shared memory */ static uint32 dblink_we_connect = 0; static uint32 dblink_we_get_conn = 0; +static uint32 dblink_we_get_result = 0; /* * Following is list that holds multiple remote connections. @@ -252,6 +253,9 @@ dblink_init(void) { if (!pconn) { + if (dblink_we_get_result == 0) + dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult"); + pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); pconn->conn = NULL; pconn->openCursorCount = 0; @@ -442,7 +446,7 @@ dblink_open(PG_FUNCTION_ARGS) /* If we are not in a transaction, start one */ if (PQtransactionStatus(conn) == PQTRANS_IDLE) { - res = PQexec(conn, "BEGIN"); + res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result); if (PQresultStatus(res) != PGRES_COMMAND_OK) dblink_res_internalerror(conn, res, "begin error"); PQclear(res); @@ -461,7 +465,7 @@ dblink_open(PG_FUNCTION_ARGS) (rconn->openCursorCount)++; appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql); - res = PQexec(conn, buf.data); + res = libpqsrv_exec(conn, buf.data, dblink_we_get_result); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { dblink_res_error(conn, conname, res, fail, @@ -530,7 +534,7 @@ dblink_close(PG_FUNCTION_ARGS) appendStringInfo(&buf, "CLOSE %s", curname); /* close the cursor */ - res = PQexec(conn, buf.data); + res = libpqsrv_exec(conn, buf.data, dblink_we_get_result); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) { dblink_res_error(conn, conname, res, fail, @@ -550,7 +554,7 @@ dblink_close(PG_FUNCTION_ARGS) { rconn->newXactForCursor = false; - res = PQexec(conn, "COMMIT"); + res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result); if (PQresultStatus(res) != PGRES_COMMAND_OK) dblink_res_internalerror(conn, res, "commit error"); PQclear(res); @@ -632,7 +636,7 @@ dblink_fetch(PG_FUNCTION_ARGS) * PGresult will be long-lived even though we are still in a short-lived * memory context. */ - res = PQexec(conn, buf.data); + res = libpqsrv_exec(conn, buf.data, dblink_we_get_result); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) @@ -780,7 +784,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) else { /* async result retrieval, do it the old way */ - PGresult *res = PQgetResult(conn); + PGresult *res = libpqsrv_get_result(conn, dblink_we_get_result); /* NULL means we're all done with the async results */ if (res) @@ -1088,7 +1092,8 @@ materializeQueryResult(FunctionCallInfo fcinfo, PQclear(sinfo.last_res); PQclear(sinfo.cur_res); /* and clear out any pending data in libpq */ - while ((res = PQgetResult(conn)) != NULL) + while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) != + NULL) PQclear(res); PG_RE_THROW(); } @@ -1115,7 +1120,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql) { CHECK_FOR_INTERRUPTS(); - sinfo->cur_res = PQgetResult(conn); + sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result); if (!sinfo->cur_res) break; @@ -1443,7 +1448,7 @@ dblink_exec(PG_FUNCTION_ARGS) if (!conn) dblink_conn_not_avail(conname); - res = PQexec(conn, sql); + res = libpqsrv_exec(conn, sql, dblink_we_get_result); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) @@ -2740,8 +2745,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res, /* * If we don't get a message from the PGresult, try the PGconn. This is - * needed because for connection-level failures, PQexec may just return - * NULL, not a PGresult at all. + * needed because for connection-level failures, PQgetResult may just + * return NULL, not a PGresult at all. */ if (message_primary == NULL) message_primary = pchomp(PQerrorMessage(conn)); diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 5800c6a..8755244 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -187,6 +187,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) { HASHCTL ctl; + if (pgfdw_we_get_result == 0) + pgfdw_we_get_result = + WaitEventExtensionNew("PostgresFdwGetResult"); + ctl.keysize = sizeof(ConnCacheKey); ctl.entrysize = sizeof(ConnCacheEntry); ConnectionHash = hash_create("postgres_fdw connections", 8, @@ -716,7 +720,7 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input) */ if (consume_input && !PQconsumeInput(conn)) pgfdw_report_error(ERROR, NULL, conn, false, sql); - res = pgfdw_get_result(conn, sql); + res = pgfdw_get_result(conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); @@ -819,7 +823,9 @@ GetPrepStmtNumber(PGconn *conn) /* * Submit a query and wait for the result. * - * This function is interruptible by signals. + * Since we don't use non-blocking mode, this can't process interrupts while + * pushing the query text to the server. That risk is relatively small, so we + * ignore that for now. * * Caller is responsible for the error handling on the result. */ @@ -830,81 +836,20 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state) if (state && state->pendingAreq) process_pending_request(state->pendingAreq); - /* - * Submit a query. Since we don't use non-blocking mode, this also can - * block. But its risk is relatively small, so we ignore that for now. - */ if (!PQsendQuery(conn, query)) - pgfdw_report_error(ERROR, NULL, conn, false, query); - - /* Wait for the result. */ - return pgfdw_get_result(conn, query); + return NULL; + return pgfdw_get_result(conn); } /* - * Wait for the result from a prior asynchronous execution function call. - * - * This function offers quick responsiveness by checking for any interruptions. - * - * This function emulates PQexec()'s behavior of returning the last result - * when there are many. + * Wrap libpqsrv_get_result_last(), adding wait event. * * Caller is responsible for the error handling on the result. */ PGresult * -pgfdw_get_result(PGconn *conn, const char *query) +pgfdw_get_result(PGconn *conn) { - PGresult *volatile last_res = NULL; - - /* In what follows, do not leak any PGresults on an error. */ - PG_TRY(); - { - for (;;) - { - PGresult *res; - - while (PQisBusy(conn)) - { - int wc; - - /* first time, allocate or get the custom wait event */ - if (pgfdw_we_get_result == 0) - pgfdw_we_get_result = WaitEventExtensionNew("PostgresFdwGetResult"); - - /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_EXIT_ON_PM_DEATH, - PQsocket(conn), - -1L, pgfdw_we_get_result); - ResetLatch(MyLatch); - - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket? */ - if (wc & WL_SOCKET_READABLE) - { - if (!PQconsumeInput(conn)) - pgfdw_report_error(ERROR, NULL, conn, false, query); - } - } - - res = PQgetResult(conn); - if (res == NULL) - break; /* query is complete */ - - PQclear(last_res); - last_res = res; - } - } - PG_CATCH(); - { - PQclear(last_res); - PG_RE_THROW(); - } - PG_END_TRY(); - - return last_res; + return libpqsrv_get_result_last(conn, pgfdw_we_get_result); } /* @@ -945,8 +890,8 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, /* * If we don't get a message from the PGresult, try the PGconn. This - * is needed because for connection-level failures, PQexec may just - * return NULL, not a PGresult at all. + * is needed because for connection-level failures, PQgetResult may + * just return NULL, not a PGresult at all. */ if (message_primary == NULL) message_primary = pchomp(PQerrorMessage(conn)); @@ -1046,7 +991,8 @@ pgfdw_xact_callback(XactEvent event, void *arg) */ if (entry->have_prep_stmt && entry->have_error) { - res = PQexec(entry->conn, "DEALLOCATE ALL"); + res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL", + NULL); PQclear(res); } entry->have_prep_stmt = false; diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c index 09fd489..5084aa6 100644 --- a/contrib/postgres_fdw/deparse.c +++ b/contrib/postgres_fdw/deparse.c @@ -3673,7 +3673,7 @@ appendOrderBySuffix(Oid sortop, Oid sortcoltype, bool nulls_first, * Print the representation of a parameter to be sent to the remote side. * * Note: we always label the Param's type explicitly rather than relying on - * transmitting a numeric type OID in PQexecParams(). This allows us to + * transmitting a numeric type OID in PQsendQueryParams(). This allows us to * avoid assuming that types have the same OIDs on the remote side as they * do locally --- they need only have the same names. */ diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 6de2bec..7d509c6 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -3759,7 +3759,7 @@ create_cursor(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(conn, buf.data); + res = pgfdw_get_result(conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, conn, true, fsstate->query); PQclear(res); @@ -3809,7 +3809,7 @@ fetch_more_data(ForeignScanState *node) * The query was already sent by an earlier call to * fetch_more_data_begin. So now we just fetch the result. */ - res = pgfdw_get_result(conn, fsstate->query); + res = pgfdw_get_result(conn); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); @@ -4158,7 +4158,7 @@ execute_foreign_modify(EState *estate, * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->conn); if (PQresultStatus(res) != (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); @@ -4228,7 +4228,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = pgfdw_get_result(fmstate->conn, fmstate->query); + res = pgfdw_get_result(fmstate->conn); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); PQclear(res); @@ -4570,7 +4570,7 @@ execute_dml_stmt(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); + dmstate->result = pgfdw_get_result(dmstate->conn); if (PQresultStatus(dmstate->result) != (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 47157ac..3e94d51 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -158,7 +158,7 @@ extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); extern void do_sql_command(PGconn *conn, const char *sql); -extern PGresult *pgfdw_get_result(PGconn *conn, const char *query); +extern PGresult *pgfdw_get_result(PGconn *conn); extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, diff --git a/doc/src/sgml/dblink.sgml b/doc/src/sgml/dblink.sgml index e8de5a6..81f3598 100644 --- a/doc/src/sgml/dblink.sgml +++ b/doc/src/sgml/dblink.sgml @@ -37,6 +37,15 @@ + + + DblinkGetResult + + + Waiting to receive the results of a query from a remote server. + + + diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 60d5c1f..2825001 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -648,12 +648,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * Send a query and wait for the results by using the asynchronous libpq * functions and socket readiness events. * - * We must not use the regular blocking libpq functions like PQexec() - * since they are uninterruptible by signals on some platforms, such as - * Windows. - * - * The function is modeled on PQexec() in libpq, but only implements - * those parts that are in use in the walreceiver api. + * The function is modeled on libpqsrv_exec(), with the behavior difference + * being that it calls ProcessWalRcvInterrupts(). As an optimization, it + * skips try/catch, since all errors terminate the process. * * May return NULL, rather than an error result, on failure. */ diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h index 41e3bb4..a4b3e80 100644 --- a/src/include/libpq/libpq-be-fe-helpers.h +++ b/src/include/libpq/libpq-be-fe-helpers.h @@ -49,6 +49,8 @@ static inline void libpqsrv_connect_prepare(void); static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info); +static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info); +static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info); /* @@ -239,4 +241,129 @@ libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info) PG_END_TRY(); } +/* + * PQexec() wrapper that processes interrupts. + * + * Unless PQsetnonblocking(conn, 1) is in effect, this can't process + * interrupts while pushing the query text to the server. Consider that + * setting if query strings can be long relative to TCP buffer size. + * + * This has the preconditions of PQsendQuery(), not those of PQexec(). Most + * notably, PQexec() would silently discard any prior query results. + */ +static inline PGresult * +libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info) +{ + if (!PQsendQuery(conn, query)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * PQexecParams() wrapper that processes interrupts. + * + * See notes at libpqsrv_exec(). + */ +static inline PGresult * +libpqsrv_exec_params(PGconn *conn, + const char *command, + int nParams, + const Oid *paramTypes, + const char *const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat, + uint32 wait_event_info) +{ + if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues, + paramLengths, paramFormats, resultFormat)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * Like PQexec(), loop over PQgetResult() until it returns NULL or another + * terminal state. Return the last non-NULL result or the terminal state. + */ +static inline PGresult * +libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info) +{ + PGresult *volatile lastResult = NULL; + + /* In what follows, do not leak any PGresults on an error. */ + PG_TRY(); + { + for (;;) + { + /* Wait for, and collect, the next PGresult. */ + PGresult *result; + + result = libpqsrv_get_result(conn, wait_event_info); + if (result == NULL) + break; /* query is complete, or failure */ + + /* + * Emulate PQexec()'s behavior of returning the last result when + * there are many. + */ + PQclear(lastResult); + lastResult = result; + + if (PQresultStatus(lastResult) == PGRES_COPY_IN || + PQresultStatus(lastResult) == PGRES_COPY_OUT || + PQresultStatus(lastResult) == PGRES_COPY_BOTH || + PQstatus(conn) == CONNECTION_BAD) + break; + } + } + PG_CATCH(); + { + PQclear(lastResult); + PG_RE_THROW(); + } + PG_END_TRY(); + + return lastResult; +} + +/* + * Perform the equivalent of PQgetResult(), but watch for interrupts. + */ +static inline PGresult * +libpqsrv_get_result(PGconn *conn, uint32 wait_event_info) +{ + /* + * Collect data until PQgetResult is ready to get the result without + * blocking. + */ + while (PQisBusy(conn)) + { + int rc; + + rc = WaitLatchOrSocket(MyLatch, + WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | + WL_SOCKET_READABLE, + PQsocket(conn), + 0, + wait_event_info); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* Consume whatever data is available from the socket */ + if (PQconsumeInput(conn) == 0) + { + /* trouble; expect PQgetResult() to return NULL */ + break; + } + } + + /* Now we can collect and return the next PGresult */ + return PQgetResult(conn); +} + #endif /* LIBPQ_BE_FE_HELPERS_H */