diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 76994f3820..8bbad61c3b 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -74,6 +74,7 @@ static unsigned int prep_stmt_number = 0; static bool xact_got_connection = false; /* prototypes of private functions */ +static void make_new_connection(ConnCacheEntry *entry, UserMapping *user); static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values, UserMapping *user); @@ -108,9 +109,10 @@ PGconn * GetConnection(UserMapping *user, bool will_prep_stmt) { bool found; - volatile bool retry_conn = false; + bool retry = false; ConnCacheEntry *entry; ConnCacheKey key; + MemoryContext ccxt = CurrentMemoryContext; /* First time through, initialize connection cache hashtable */ if (ConnectionHash == NULL) @@ -160,23 +162,14 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Reject further use of connections which failed abort cleanup. */ pgfdw_reject_incomplete_xact_state_change(entry); -retry: - /* * If the connection needs to be remade due to invalidation, disconnect as - * soon as we're out of all transactions. Also, if previous attempt to - * start new remote transaction failed on the cached connection, - * disconnect it to retry a new connection. + * soon as we're out of all transactions. */ - if ((entry->conn != NULL && entry->invalidated && - entry->xact_depth == 0) || retry_conn) + if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0) { - if (retry_conn) - elog(DEBUG3, "closing connection %p to reestablish a new one", - entry->conn); - else - elog(DEBUG3, "closing connection %p for option changes to take effect", - entry->conn); + elog(DEBUG3, "closing connection %p for option changes to take effect", + entry->conn); disconnect_pg_server(entry); } @@ -186,58 +179,67 @@ retry: * will remain in a valid empty state, ie conn == NULL.) */ if (entry->conn == NULL) - { - ForeignServer *server = GetForeignServer(user->serverid); - - /* Reset all transient state fields, to be sure all are clean */ - entry->xact_depth = 0; - entry->have_prep_stmt = false; - entry->have_error = false; - entry->changing_xact_state = false; - entry->invalidated = false; - entry->server_hashvalue = - GetSysCacheHashValue1(FOREIGNSERVEROID, - ObjectIdGetDatum(server->serverid)); - entry->mapping_hashvalue = - GetSysCacheHashValue1(USERMAPPINGOID, - ObjectIdGetDatum(user->umid)); - - /* Now try to make the connection */ - entry->conn = connect_pg_server(server, user); - - elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", - entry->conn, server->servername, user->umid, user->userid); - } + make_new_connection(entry, user); /* * We check the health of the cached connection here when starting a new - * remote transaction. If a broken connection is detected in the first - * attempt, we try to reestablish a new connection. If broken connection - * is detected again here, we give up getting a connection. + * remote transaction. If a broken connection is detected, we try to + * reestablish a new connection later. */ PG_TRY(); { /* Start a new transaction or subtransaction if needed. */ begin_remote_xact(entry); - retry_conn = false; } PG_CATCH(); { - if (PQstatus(entry->conn) != CONNECTION_BAD || - entry->xact_depth > 0 || - retry_conn) + MemoryContext ecxt = MemoryContextSwitchTo(ccxt); + ErrorData *errdata = CopyErrorData(); + + /* + * The error code ERRCODE_CONNECTION_FAILURE indicates a broken + * connection. If it's detected when starting a new remote transaction + * (not subtransaction), new connection will be reestablished later. + */ + if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE || + entry->xact_depth > 0) + { + MemoryContextSwitchTo(ecxt); PG_RE_THROW(); - retry_conn = true; + } + + /* Clean up the error state */ + FlushErrorState(); + FreeErrorData(errdata); + errdata = NULL; + + retry = true; } PG_END_TRY(); - if (retry_conn) + /* + * If a broken connection is detected, disconnect it, reestablish a new + * connection and retry a new remote transaction. If connection failure + * is reported again, we give up getting a connection. + */ + if (retry) { + Assert(PQstatus(entry->conn) == CONNECTION_BAD); + Assert(entry->xact_depth == 0); + ereport(DEBUG3, (errmsg_internal("could not start remote transaction on connection %p", entry->conn)), errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn)))); - goto retry; + + elog(DEBUG3, "closing connection %p to reestablish a new one", + entry->conn); + disconnect_pg_server(entry); + + if (entry->conn == NULL) + make_new_connection(entry, user); + + begin_remote_xact(entry); } /* Remember if caller will prepare statements */ @@ -246,6 +248,37 @@ retry: return entry->conn; } +/* + * Reset all transient state fields in the cached connection entry and + * establish new connection to the remote server. + */ +static void +make_new_connection(ConnCacheEntry *entry, UserMapping *user) +{ + ForeignServer *server = GetForeignServer(user->serverid); + + Assert(entry->conn == NULL); + + /* Reset all transient state fields, to be sure all are clean */ + entry->xact_depth = 0; + entry->have_prep_stmt = false; + entry->have_error = false; + entry->changing_xact_state = false; + entry->invalidated = false; + entry->server_hashvalue = + GetSysCacheHashValue1(FOREIGNSERVEROID, + ObjectIdGetDatum(server->serverid)); + entry->mapping_hashvalue = + GetSysCacheHashValue1(USERMAPPINGOID, + ObjectIdGetDatum(user->umid)); + + /* Now try to make the connection */ + entry->conn = connect_pg_server(server, user); + + elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", + entry->conn, server->servername, user->umid, user->userid); +} + /* * Connect to remote server using specified server and user mapping properties. */