diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 4aff315b7c..3fd1abb8f5 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -103,8 +103,8 @@ static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); static bool pgfdw_cancel_query(PGconn *conn); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors); -static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, - PGresult **result); +static bool pgfdw_get_result_with_timeout(PGconn *conn, TimestampTz endtime, + PGresult **result, const char *query); static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel); static bool UserMappingPasswordRequired(UserMapping *user); @@ -729,53 +729,13 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state) PGresult * pgfdw_get_result(PGconn *conn, const char *query) { - PGresult *volatile last_res = NULL; + PGresult *res; + bool timed_out; - /* In what follows, do not leak any PGresults on an error. */ - PG_TRY(); - { - for (;;) - { - PGresult *res; + timed_out = pgfdw_get_result_with_timeout(conn, -1, &res, query); + Assert(!timed_out); - while (PQisBusy(conn)) - { - int wc; - - /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_EXIT_ON_PM_DEATH, - PQsocket(conn), - -1L, PG_WAIT_EXTENSION); - ResetLatch(MyLatch); - - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket? */ - if (wc & WL_SOCKET_READABLE) - { - if (!PQconsumeInput(conn)) - pgfdw_report_error(ERROR, NULL, conn, false, query); - } - } - - res = PQgetResult(conn); - if (res == NULL) - break; /* query is complete */ - - PQclear(last_res); - last_res = res; - } - } - PG_CATCH(); - { - PQclear(last_res); - PG_RE_THROW(); - } - PG_END_TRY(); - - return last_res; + return res; } /* @@ -1179,7 +1139,7 @@ pgfdw_cancel_query(PGconn *conn) } /* Get and discard the result of the query. */ - if (pgfdw_get_cleanup_result(conn, endtime, &result)) + if (pgfdw_get_result_with_timeout(conn, endtime, &result, NULL)) return false; PQclear(result); @@ -1223,7 +1183,7 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) } /* Get the result of the query. */ - if (pgfdw_get_cleanup_result(conn, endtime, &result)) + if (pgfdw_get_result_with_timeout(conn, endtime, &result, query)) return false; /* Issue a warning if not successful. */ @@ -1248,10 +1208,16 @@ pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors) * Sets *result except in case of a timeout. */ static bool -pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result) +pgfdw_get_result_with_timeout(PGconn *conn, TimestampTz endtime, + PGresult **result, const char *query) { volatile bool timed_out = false; PGresult *volatile last_res = NULL; + int wakeEvents; + + wakeEvents = WL_LATCH_SET | WL_SOCKET_READABLE | WL_EXIT_ON_PM_DEATH; + if (endtime >= 0) + wakeEvents |= WL_TIMEOUT; /* In what follows, do not leak any PGresults on an error. */ PG_TRY(); @@ -1263,37 +1229,44 @@ pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result) while (PQisBusy(conn)) { int wc; - TimestampTz now = GetCurrentTimestamp(); - long cur_timeout; + long cur_timeout = -1; - /* If timeout has expired, give up, else get sleep time. */ - cur_timeout = TimestampDifferenceMilliseconds(now, endtime); - if (cur_timeout <= 0) + if (endtime >= 0) { - timed_out = true; - goto exit; - } + TimestampTz now = GetCurrentTimestamp(); - /* Sleep until there's something to do */ - wc = WaitLatchOrSocket(MyLatch, - WL_LATCH_SET | WL_SOCKET_READABLE | - WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - PQsocket(conn), - cur_timeout, PG_WAIT_EXTENSION); - ResetLatch(MyLatch); - - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket? */ - if (wc & WL_SOCKET_READABLE) - { - if (!PQconsumeInput(conn)) + /* If timeout has expired, give up, else get sleep time. */ + cur_timeout = TimestampDifferenceMilliseconds(now, endtime); + if (cur_timeout <= 0) { - /* connection trouble; treat the same as a timeout */ timed_out = true; goto exit; } } + + /* Sleep until there's something to do */ + wc = WaitLatchOrSocket(MyLatch, wakeEvents, + PQsocket(conn), + cur_timeout, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Data available in socket? */ + if (wc & WL_SOCKET_READABLE) + { + if (!PQconsumeInput(conn)) + { + if (endtime >= 0) + { + /* connection trouble; treat the same as a timeout */ + timed_out = true; + goto exit; + } + else + pgfdw_report_error(ERROR, NULL, conn, false, query); + } + } } res = PQgetResult(conn);