From debdffade7abcdbf29031bda6c8359a89776ad36 Mon Sep 17 00:00:00 2001
From: Alexey Kondratov <kondratov.aleksey@gmail.com>
Date: Fri, 7 Aug 2020 16:50:57 +0300
Subject: [PATCH] Add postgres_fdw.use_twophase GUC to use 2PC for transactions
 involving several servers.

---
 contrib/postgres_fdw/connection.c   | 234 +++++++++++++++++++++++++---
 contrib/postgres_fdw/postgres_fdw.c |  17 ++
 contrib/postgres_fdw/postgres_fdw.h |   2 +
 3 files changed, 228 insertions(+), 25 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 08daf26fdf0..d18fdd1f94e 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -66,6 +66,20 @@ typedef struct ConnCacheEntry
  */
 static HTAB *ConnectionHash = NULL;
 
+/*
+ * FdwTransactionState
+ *
+ * Backend-local bookkeeping shared by all connection cache entries for the
+ * current transaction; it is zeroed at the end of pgfdw_xact_callback().
+ */
+typedef struct FdwTransactionState
+{
+	char	   *gid;			/* GID used for PREPARE/COMMIT PREPARED */
+	int			nparticipants;	/* remote xacts begun, plus local if it has an XID */
+	bool		two_phase_commit;	/* finish this xact with two-phase commit? */
+} FdwTransactionState;
+static FdwTransactionState *fdwTransState;	/* allocated on first GetConnection() */
+
 /* for assigning cursor numbers and prepared statement numbers */
 static unsigned int cursor_number = 0;
 static unsigned int prep_stmt_number = 0;
@@ -73,6 +87,9 @@ static unsigned int prep_stmt_number = 0;
 /* tracks whether any work is needed in callback functions */
 static bool xact_got_connection = false;
 
+/* counter of prepared tx made by this backend */
+static int two_phase_xact_count = 0;
+
 /* prototypes of private functions */
 static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
 static void disconnect_pg_server(ConnCacheEntry *entry);
@@ -81,6 +98,7 @@ static void configure_remote_session(PGconn *conn);
 static void do_sql_command(PGconn *conn, const char *sql);
 static void begin_remote_xact(ConnCacheEntry *entry);
 static void pgfdw_xact_callback(XactEvent event, void *arg);
+static void deallocate_prepared_stmts(ConnCacheEntry *entry);
 static void pgfdw_subxact_callback(SubXactEvent event,
 								   SubTransactionId mySubid,
 								   SubTransactionId parentSubid,
@@ -137,6 +155,16 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 									  pgfdw_inval_callback, (Datum) 0);
 	}
 
+	/* Allocate FdwTransactionState */
+	if (fdwTransState == NULL)
+	{
+		MemoryContext oldcxt;
+		oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+		fdwTransState = palloc0(sizeof(FdwTransactionState));
+		fdwTransState->nparticipants = 0;
+		MemoryContextSwitchTo(oldcxt);
+	}
+
 	/* Set flag that we did GetConnection during the current transaction */
 	xact_got_connection = true;
 
@@ -448,7 +476,8 @@ configure_remote_session(PGconn *conn)
 }
 
 /*
- * Convenience subroutine to issue a non-data-returning SQL command to remote
+ * Convenience subroutine to issue a non-data-returning SQL command or
+ * statement to remote node.
  */
 static void
 do_sql_command(PGconn *conn, const char *sql)
@@ -494,6 +523,8 @@ begin_remote_xact(ConnCacheEntry *entry)
 		do_sql_command(entry->conn, sql);
 		entry->xact_depth = 1;
 		entry->changing_xact_state = false;
+
+		fdwTransState->nparticipants += 1;
 	}
 
 	/*
@@ -701,6 +732,76 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
 	PG_END_TRY();
 }
 
+/* Callback typedef for BroadcastStmt */
+typedef bool (*BroadcastCmdResHandler) (PGresult *result, void *arg);
+
+/*
+ * Send sql to every ConnectionHash entry with an open remote transaction,
+ * then collect the replies.  A reply fails if its status is not
+ * expectedStatus or if handler (when given) rejects it.  With elevel < ERROR
+ * failures are reported at elevel and false is returned, so the caller can
+ * handle them; a failed PGresult is released by pgfdw_report_error().
+ */
+static bool
+BroadcastStmt(char const * sql, unsigned expectedStatus,
+			  int elevel, BroadcastCmdResHandler handler, void *arg)
+{
+	HASH_SEQ_STATUS scan;
+	ConnCacheEntry *entry;
+	bool		allOk = true;
+
+	/* Broadcast sql */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		pgfdw_reject_incomplete_xact_state_change(entry);
+
+		if (entry->xact_depth > 0 && entry->conn != NULL)
+		{
+			if (!PQsendQuery(entry->conn, sql))
+			{
+				PGresult   *res = PQgetResult(entry->conn);
+
+				elog(elevel < ERROR ? elevel : WARNING, "failed to send command %s", sql);
+				pgfdw_report_error(elevel, res, entry->conn, true, sql);
+				allOk = false;	/* res was already released by the call above */
+			}
+		}
+	}
+
+	/* Collect responses */
+	hash_seq_init(&scan, ConnectionHash);
+	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
+	{
+		if (entry->xact_depth > 0 && entry->conn != NULL)
+		{
+			PGresult   *result = PQgetResult(entry->conn);
+
+			if (PQresultStatus(result) != expectedStatus ||
+				(handler && !handler(result, arg)))
+			{
+				elog(elevel < ERROR ? elevel : WARNING,
+					 "failed command %s: status=%d, expected status=%d",
+					 sql, PQresultStatus(result), (int) expectedStatus);
+				pgfdw_report_error(elevel, result, entry->conn, true, sql);
+				allOk = false;	/* result was released by the call above */
+			}
+			else
+				PQclear(result);
+			PQclear(PQgetResult(entry->conn));	/* consume NULL result */
+		}
+	}
+
+	return allOk;
+}
+
+/* Broadcast sql via BroadcastStmt(), expecting PGRES_COMMAND_OK from each server */
+static bool
+BroadcastCmd(char const *sql, int elevel)
+{
+	return BroadcastStmt(sql, PGRES_COMMAND_OK, elevel, NULL, NULL);
+}
+
 /*
  * pgfdw_xact_callback --- cleanup at main-transaction end.
  */
@@ -714,6 +815,74 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	if (!xact_got_connection)
 		return;
 
+	/*
+	 * On PRE_COMMIT event we should figure out whether to use 2PC or not.
+	 * This decision is based on two factors:
+	 *    # postgres_fdw.use_twophase is turned on;
+	 *    # more than one server has participated in this transaction.
+	 *
+	 * If we decide to use 2PC for this xact, then we should broadcast
+	 * PREPARE TRANSACTION to all participating foreign servers.
+	 */
+	if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT)
+	{
+		/* Count the local node as a participant if it has an assigned XID */
+		if (TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+			fdwTransState->nparticipants += 1;
+
+		/* Switch to 2PC mode if there was more than one participant */
+		if (Use2PC && fdwTransState->nparticipants > 1)
+			fdwTransState->two_phase_commit = true;
+
+		if (fdwTransState->two_phase_commit)
+		{
+			char   *sql;
+
+			fdwTransState->gid = psprintf("pgfdw:%lld:%llu:%d:%u:%d:%d",
+										  (long long) GetCurrentTimestamp(),
+										  (unsigned long long) GetSystemIdentifier(),
+										  MyProcPid,
+										  GetCurrentTransactionIdIfAny(),
+										  ++two_phase_xact_count,
+										  fdwTransState->nparticipants);
+
+			/* Broadcast PREPARE */
+			sql = psprintf("PREPARE TRANSACTION '%s'", fdwTransState->gid);
+
+			/*
+			 * If we got any problem, then it does not make much sense to
+			 * broadcast ROLLBACK PREPARED in order to clean up prepared xacts
+			 * everywhere, since this method does not guarantee a 100%
+			 * success.  This is a job for external tools.  Raise an ERROR
+			 * immediately in the case of failure during broadcast.
+			 */
+			BroadcastCmd(sql, ERROR);
+
+			/*
+			 * Do not fall through; the COMMIT event will clean things up.
+			 */
+			return;
+		}
+	}
+
+	/*
+	 * COMMIT event occurs when the local transaction is fully committed.
+	 * That way, we have to broadcast COMMIT PREPARED to all participating
+	 * foreign servers in order to finalize this 'distributed' transaction.
+	 * It is too late to abort the host transaction if any error occurs
+	 * during the COMMIT PREPARED broadcast, so failures are only WARNINGs.
+	 */
+	if (fdwTransState->two_phase_commit &&
+		(event == XACT_EVENT_PARALLEL_COMMIT || event == XACT_EVENT_COMMIT))
+	{
+		if (!BroadcastCmd(psprintf("COMMIT PREPARED '%s'", fdwTransState->gid), WARNING))
+			ereport(WARNING,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("canceling the wait for foreign servers to commit prepared transaction due to the error occurred on one of them"),
+					 errdetail("The transaction has already committed locally, but might not have been committed on all participated foreign servers."),
+					 errhint("Consider committing it everywhere manually with COMMIT PREPARED '%s'", fdwTransState->gid)));
+	}
+
 	/*
 	 * Scan all connection cache entries to find open remote transactions, and
 	 * close them.
@@ -721,8 +890,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 	hash_seq_init(&scan, ConnectionHash);
 	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
 	{
-		PGresult   *res;
-
 		/* Ignore cache entry if no open connection right now */
 		if (entry->conn == NULL)
 			continue;
@@ -739,6 +906,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 			{
 				case XACT_EVENT_PARALLEL_PRE_COMMIT:
 				case XACT_EVENT_PRE_COMMIT:
+					Assert(!fdwTransState->two_phase_commit);
 
 					/*
 					 * If abort cleanup previously failed for this connection,
@@ -751,28 +919,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					do_sql_command(entry->conn, "COMMIT TRANSACTION");
 					entry->changing_xact_state = false;
 
-					/*
-					 * If there were any errors in subtransactions, and we
-					 * made prepared statements, do a DEALLOCATE ALL to make
-					 * sure we get rid of all prepared statements. This is
-					 * annoying and not terribly bulletproof, but it's
-					 * probably not worth trying harder.
-					 *
-					 * DEALLOCATE ALL only exists in 8.3 and later, so this
-					 * constrains how old a server postgres_fdw can
-					 * communicate with.  We intentionally ignore errors in
-					 * the DEALLOCATE, so that we can hobble along to some
-					 * extent with older servers (leaking prepared statements
-					 * as we go; but we don't really support update operations
-					 * pre-8.3 anyway).
-					 */
-					if (entry->have_prep_stmt && entry->have_error)
-					{
-						res = PQexec(entry->conn, "DEALLOCATE ALL");
-						PQclear(res);
-					}
-					entry->have_prep_stmt = false;
-					entry->have_error = false;
+					deallocate_prepared_stmts(entry);
 					break;
 				case XACT_EVENT_PRE_PREPARE:
 
@@ -791,6 +938,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 					break;
 				case XACT_EVENT_PARALLEL_COMMIT:
 				case XACT_EVENT_COMMIT:
+					if (fdwTransState->two_phase_commit)
+						deallocate_prepared_stmts(entry);
+					else /* Pre-commit should have closed the open transaction */
+						elog(ERROR, "missed cleaning up connection during pre-commit");
+					break;
 				case XACT_EVENT_PREPARE:
 					/* Pre-commit should have closed the open transaction */
 					elog(ERROR, "missed cleaning up connection during pre-commit");
@@ -886,6 +1038,38 @@ pgfdw_xact_callback(XactEvent event, void *arg)
 
 	/* Also reset cursor numbering for next transaction */
 	cursor_number = 0;
+
+	/* Reset fdwTransState */
+	memset(fdwTransState, '\0', sizeof(FdwTransactionState));
+}
+
+/*
+ * Clean up remote prepared statements on one connection at transaction end.
+ *
+ * If there were any errors in subtransactions, and we made prepared
+ * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared
+ * statements.  This is annoying and not terribly bulletproof, but it's
+ * probably not worth trying harder.
+ *
+ * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old a
+ * server postgres_fdw can communicate with.  We intentionally ignore errors
+ * in the DEALLOCATE, so that we can hobble along to some extent with older
+ * servers (leaking prepared statements as we go; but we don't really support
+ * update operations pre-8.3 anyway).
+ *
+ * Both per-transaction flags are cleared whether or not the command was sent.
+ */
+static void
+deallocate_prepared_stmts(ConnCacheEntry *entry)
+{
+	bool		need_cleanup = entry->have_prep_stmt && entry->have_error;
+
+	/* The result is discarded unexamined: failures are ignored on purpose. */
+	if (need_cleanup)
+		PQclear(PQexec(entry->conn, "DEALLOCATE ALL"));
+
+	entry->have_error = false;
+	entry->have_prep_stmt = false;
+}
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index a31abce7c99..7a7772f5dd3 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -301,6 +301,12 @@ typedef struct
 	List	   *already_used;	/* expressions already dealt with */
 } ec_member_foreign_arg;
 
+bool		Use2PC = false;	/* backs the postgres_fdw.use_twophase GUC */
+
+#ifndef PG_FDW_BUILTIN
+void		_PG_init(void);
+#endif
+
 /*
  * SQL functions
  */
@@ -6583,3 +6589,14 @@ find_em_expr_for_input_target(PlannerInfo *root,
 	elog(ERROR, "could not find pathkey item to sort");
 	return NULL;				/* keep compiler quiet */
 }
+
+#ifndef PG_FDW_BUILTIN
+/* Register the user-settable boolean GUC postgres_fdw.use_twophase (default off). */
+void
+_PG_init(void)
+{
+	DefineCustomBoolVariable("postgres_fdw.use_twophase",
+							 "Use two phase commit for distributed transactions",
+							 NULL, &Use2PC, false, PGC_USERSET, 0, NULL, NULL, NULL);
+}
+#endif
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db392..3c8cadc508a 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -208,4 +208,6 @@ extern const char *get_jointype_name(JoinType jointype);
 extern bool is_builtin(Oid objectId);
 extern bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo);
 
+extern bool Use2PC;
+
 #endif							/* POSTGRES_FDW_H */

base-commit: 49d7165117893405ae9b5b8d8e7877acff33c0e7
-- 
2.19.1

