From 3dfff90032c38daba43e1e0c4d3221053d6386ac Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@otacoo.com>
Date: Sat, 9 Aug 2014 14:49:24 +0900
Subject: [PATCH] Add parameter synchronous_standby_num

This makes possible support of synchronous replication on a number of standby
nodes equal to the new parameter. The synchronous standbys are chosen in the
order they are listed in synchronous_standby_names.
---
 doc/src/sgml/config.sgml            | 32 ++++++++++++---
 doc/src/sgml/high-availability.sgml | 18 ++++-----
 src/backend/replication/syncrep.c   | 81 ++++++++++++++++++++++++++++++-------
 src/backend/replication/walsender.c | 74 ++++++++++++++++++++++++++++-----
 src/backend/utils/misc/guc.c        | 10 +++++
 src/include/replication/syncrep.h   |  1 +
 6 files changed, 175 insertions(+), 41 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index be5c25b..c40de16 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2586,12 +2586,13 @@ include_dir 'conf.d'
         Specifies a comma-separated list of standby names that can support
         <firstterm>synchronous replication</>, as described in
         <xref linkend="synchronous-replication">.
-        At any one time there will be at most one active synchronous standby;
-        transactions waiting for commit will be allowed to proceed after
-        this standby server confirms receipt of their data.
-        The synchronous standby will be the first standby named in this list
-        that is both currently connected and streaming data in real-time
-        (as shown by a state of <literal>streaming</literal> in the
+        At any one time there will be at a number of active synchronous standbys
+        defined by <varname>synchronous_standby_num</>; transactions waiting
+        for commit will be allowed to proceed after those standby servers
+        confirms receipt of their data. The synchronous standbys will be
+        the first entries named in this list that are both currently connected
+        and streaming data in real-time (as shown by a state of
+        <literal>streaming</literal> in the
         <link linkend="monitoring-stats-views-table">
         <literal>pg_stat_replication</></link> view).
         Other standby servers appearing later in this list represent potential
@@ -2627,6 +2628,25 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-synchronous-standby-num" xreflabel="synchronous_standby_num">
+      <term><varname>synchronous_standby_num</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>synchronous_standby_num</> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies the number of standbys that support
+        <firstterm>synchronous replication</>, as described in
+        <xref linkend="synchronous-replication">, and listed as the first
+        elements of <xref linkend="guc-synchronous-standby-names">.
+       </para>
+       <para>
+        Default value is 1.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-vacuum-defer-cleanup-age" xreflabel="vacuum_defer_cleanup_age">
       <term><varname>vacuum_defer_cleanup_age</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index d249959..085d51b 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -1081,12 +1081,12 @@ primary_slot_name = 'node_a_slot'
     WAL record is then sent to the standby. The standby sends reply
     messages each time a new batch of WAL data is written to disk, unless
     <varname>wal_receiver_status_interval</> is set to zero on the standby.
-    If the standby is the first matching standby, as specified in
-    <varname>synchronous_standby_names</> on the primary, the reply
-    messages from that standby will be used to wake users waiting for
-    confirmation that the commit record has been received. These parameters
-    allow the administrator to specify which standby servers should be
-    synchronous standbys. Note that the configuration of synchronous
+    If the standby is the first <varname>synchronous_standby_num</> matching
+    standbys, as specified in <varname>synchronous_standby_names</> on the
+    primary, the reply messages from that standby will be used to wake users
+    waiting for confirmation that the commit record has been received. These
+    parameters allow the administrator to specify which standby servers should
+    be synchronous standbys. Note that the configuration of synchronous
     replication is mainly on the master. Named standbys must be directly
     connected to the master; the master knows nothing about downstream
     standby servers using cascaded replication.
@@ -1169,9 +1169,9 @@ primary_slot_name = 'node_a_slot'
     The best solution for avoiding data loss is to ensure you don't lose
     your last remaining synchronous standby. This can be achieved by naming multiple
     potential synchronous standbys using <varname>synchronous_standby_names</>.
-    The first named standby will be used as the synchronous standby. Standbys
-    listed after this will take over the role of synchronous standby if the
-    first one should fail.
+    The first <varname>synchronous_standby_num</> named standbys will be used as
+    the synchronous standbys. Standbys listed after this will take over the role
+    of synchronous standby if the first one should fail.
    </para>
 
    <para>
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index aa54bfb..524ff6c 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -59,6 +59,7 @@
 
 /* User-settable parameters for sync rep */
 char	   *SyncRepStandbyNames;
+int			synchronous_standby_num = 1;
 
 #define SyncStandbysDefined() \
 	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
@@ -206,7 +207,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
 			ereport(WARNING,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
-					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
+					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
 			whereToSendOutput = DestNone;
 			SyncRepCancelWait();
 			break;
@@ -223,7 +224,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
 			QueryCancelPending = false;
 			ereport(WARNING,
 					(errmsg("canceling wait for synchronous replication due to user request"),
-					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
+					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s).")));
 			SyncRepCancelWait();
 			break;
 		}
@@ -368,11 +369,15 @@ void
 SyncRepReleaseWaiters(void)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
-	volatile WalSnd *syncWalSnd = NULL;
+	volatile WalSnd *syncWalSnd[synchronous_standby_num];
 	int			numwrite = 0;
 	int			numflush = 0;
 	int			priority = 0;
+	int			num_sync = 0;
 	int			i;
+	bool		found = false;
+
+	syncWalSnd[0] = NULL;
 
 	/*
 	 * If this WALSender is serving a standby that is not on the list of
@@ -388,7 +393,7 @@ SyncRepReleaseWaiters(void)
 	/*
 	 * We're a potential sync standby. Release waiters if we are the highest
 	 * priority standby. If there are multiple standbys with same priorities
-	 * then we use the first mentioned standby. If you change this, also
+	 * then we use the first mentioned standbys. If you change this, also
 	 * change pg_stat_get_wal_senders().
 	 */
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
@@ -398,33 +403,79 @@ SyncRepReleaseWaiters(void)
 		/* use volatile pointer to prevent code rearrangement */
 		volatile WalSnd *walsnd = &walsndctl->walsnds[i];
 
-		if (walsnd->pid != 0 &&
-			walsnd->state == WALSNDSTATE_STREAMING &&
-			walsnd->sync_standby_priority > 0 &&
-			(priority == 0 ||
-			 priority > walsnd->sync_standby_priority) &&
-			!XLogRecPtrIsInvalid(walsnd->flush))
+		/* Leave if not streaming */
+		if (walsnd->state != WALSNDSTATE_STREAMING)
+			continue;
+
+		/* Leave if asynchronous */
+		if (walsnd->sync_standby_priority == 0)
+			continue;
+
+		/* Leave if priority conditions not satisfied */
+		if (priority != 0 &&
+			priority <= walsnd->sync_standby_priority &&
+			num_sync == synchronous_standby_num)
+			continue;
+
+		/* Leave if invalid flush position */
+		if (XLogRecPtrIsInvalid(walsnd->flush))
+			continue;
+
+		/*
+		 * We have a potential synchronous candidate, add it to the
+		 * list of nodes already present or evict the node with highest
+		 * priority found until now.
+		 */
+
+		if (num_sync == synchronous_standby_num)
+		{
+			int j;
+
+			for (j = 0; j < num_sync; j++)
+			{
+				if (syncWalSnd[j]->sync_standby_priority == priority)
+				{
+					syncWalSnd[j] = walsnd;
+					break;
+				}
+			}
+		}
+		else
 		{
-			priority = walsnd->sync_standby_priority;
-			syncWalSnd = walsnd;
+			syncWalSnd[num_sync] = walsnd;
+			num_sync++;
 		}
+
+		/* Update priority for next tracking */
+		priority = walsnd->sync_standby_priority;
 	}
 
 	/*
 	 * We should have found ourselves at least.
 	 */
-	Assert(syncWalSnd);
+	Assert(syncWalSnd[0]);
 
 	/*
-	 * If we aren't managing the highest priority standby then just leave.
+	 * If we aren't managing one of the highest priority standby then just leave.
 	 */
-	if (syncWalSnd != MyWalSnd)
+	for (i = 0; i < num_sync; i++)
+	{
+		if (syncWalSnd[i] == MyWalSnd)
+		{
+			found = true;
+			break;
+		}
+	}
+
+	/* We are definitely not one of the chosen... */
+	if (!found)
 	{
 		LWLockRelease(SyncRepLock);
 		announce_next_takeover = true;
 		return;
 	}
 
+
 	/*
 	 * Set the lsn first so that when we wake backends they will release up to
 	 * this location.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3189793..8c74c86 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2734,9 +2734,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 	MemoryContext oldcontext;
 	int		   *sync_priority;
 	int			priority = 0;
-	int			sync_standby = -1;
+	int			sync_standbys[max_wal_senders];
+	int			num_sync = 0;
 	int			i;
 
+	sync_standbys[0] = -1;
+
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 		ereport(ERROR,
@@ -2784,15 +2787,50 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
 				0 : walsnd->sync_standby_priority;
 
-			if (walsnd->state == WALSNDSTATE_STREAMING &&
-				walsnd->sync_standby_priority > 0 &&
-				(priority == 0 ||
-				 priority > walsnd->sync_standby_priority) &&
-				!XLogRecPtrIsInvalid(walsnd->flush))
+			/* Leave if not streaming */
+			if (walsnd->state != WALSNDSTATE_STREAMING)
+				continue;
+
+			/* Leave if asynchronous */
+			if (walsnd->sync_standby_priority == 0)
+				continue;
+
+			/* Leave if priority conditions not satisfied */
+			if (priority != 0 &&
+				priority <= walsnd->sync_standby_priority &&
+				num_sync == synchronous_standby_num)
+				continue;
+
+			/* Leave if invalid flush position */
+			if (XLogRecPtrIsInvalid(walsnd->flush))
+				continue;
+
+			/*
+			 * We have a potential synchronous candidate, add it to the
+			 * list of nodes already present or evict the node with highest
+			 * priority found until now.
+			 */
+			if (num_sync == synchronous_standby_num)
+			{
+				int j;
+				for (j = 0; j < num_sync; j++)
+				{
+					volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[j]];
+					if (walsndloc->sync_standby_priority == priority)
+					{
+						sync_standbys[j] = i;
+						break;
+					}
+				}
+			}
+			else
 			{
-				priority = walsnd->sync_standby_priority;
-				sync_standby = i;
+				sync_standbys[num_sync] = i;
+				num_sync++;
 			}
+
+			/* Update priority for next tracking */
+			priority = walsnd->sync_standby_priority;
 		}
 	}
 	LWLockRelease(SyncRepLock);
@@ -2856,10 +2894,24 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 			 */
 			if (sync_priority[i] == 0)
 				values[7] = CStringGetTextDatum("async");
-			else if (i == sync_standby)
-				values[7] = CStringGetTextDatum("sync");
 			else
-				values[7] = CStringGetTextDatum("potential");
+			{
+				int j;
+				bool found = false;
+
+				for (j = 0; j < num_sync; j++)
+				{
+					/* Found that this node is one in sync */
+					if (i == sync_standbys[j])
+					{
+						values[7] = CStringGetTextDatum("sync");
+						found = true;
+						break;
+					}
+				}
+				if (!found)
+					values[7] = CStringGetTextDatum("potential");
+			}
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6c52db8..73523db 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2551,6 +2551,16 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"synchronous_standby_num", PGC_SIGHUP, REPLICATION_MASTER,
+			gettext_noop("Number of potential synchronous standbys."),
+			NULL
+		},
+		&synchronous_standby_num,
+		1, 1, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 7eeaf3b..da1cf7c 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -33,6 +33,7 @@
 
 /* user-settable parameters for synchronous replication */
 extern char *SyncRepStandbyNames;
+extern int	synchronous_standby_num;
 
 /* called by user backend */
 extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
-- 
2.0.4

