From 08048c5fc255c70d8306bd573ccb827ea320f481 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 19 Feb 2020 12:33:09 -0800
Subject: [PATCH v1 3/6] Use dlist for syncrep queue.

---
 src/include/replication/walsender_private.h |  3 +-
 src/include/storage/proc.h                  |  2 +-
 src/backend/replication/syncrep.c           | 89 +++++++++------------
 src/backend/replication/walsender.c         |  2 +-
 src/backend/storage/lmgr/proc.c             |  2 +-
 5 files changed, 41 insertions(+), 57 deletions(-)

diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 366828f0a47..c2f946c5c48 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_PRIVATE_H
 
 #include "access/xlog.h"
+#include "lib/ilist.h"
 #include "nodes/nodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
@@ -96,7 +97,7 @@ typedef struct
 	 * Synchronous replication queue with one queue per request type.
 	 * Protected by SyncRepLock.
 	 */
-	SHM_QUEUE	SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
+	dlist_head	SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
 
 	/*
 	 * Current location of the head of the queue. All waiters should have a
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 2ba37f250de..de222f96681 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -150,7 +150,7 @@ struct PGPROC
 	 */
 	XLogRecPtr	waitLSN;		/* waiting for this LSN or higher */
 	int			syncRepState;	/* wait state for sync rep */
-	SHM_QUEUE	syncRepLinks;	/* list link if process is in syncrep queue */
+	dlist_node	syncRepLinks;	/* list link if process is in syncrep queue */
 
 	/*
 	 * All PROCLOCK objects for locks held or awaited by this backend are
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index c284103b548..402d3e41fc6 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -168,7 +168,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	if (!SyncRepRequested())
 		return;
 
-	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+	Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
 	Assert(WalSndCtl != NULL);
 
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
@@ -304,7 +304,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	 * assertions, but better safe than sorry).
 	 */
 	pg_read_barrier();
-	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+	Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	MyProc->waitLSN = 0;
 
@@ -325,31 +325,32 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 static void
 SyncRepQueueInsert(int mode)
 {
-	PGPROC	   *proc;
+	dlist_head *queue;
+	dlist_iter iter;
 
 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
-	proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
-								   &(WalSndCtl->SyncRepQueue[mode]),
-								   offsetof(PGPROC, syncRepLinks));
+	queue = &WalSndCtl->SyncRepQueue[mode];
 
-	while (proc)
+	dlist_reverse_foreach(iter, queue)
 	{
+		PGPROC	   *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
 		/*
-		 * Stop at the queue element that we should after to ensure the queue
-		 * is ordered by LSN.
+		 * Stop at the queue element that we should insert after to ensure the
+		 * queue is ordered by LSN.
 		 */
 		if (proc->waitLSN < MyProc->waitLSN)
-			break;
-
-		proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
-									   &(proc->syncRepLinks),
-									   offsetof(PGPROC, syncRepLinks));
+		{
+			dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
+			return;
+		}
 	}
 
-	if (proc)
-		SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
-	else
-		SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
+	/*
+	 * If we get here, the list was either empty, or this process needs to be
+	 * at the head.
+	 */
+	dlist_push_head(queue, &MyProc->syncRepLinks);
 }
 
 /*
@@ -359,8 +360,8 @@ static void
 SyncRepCancelWait(void)
 {
 	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
-	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
-		SHMQueueDelete(&(MyProc->syncRepLinks));
+	if (!dlist_node_is_detached(&MyProc->syncRepLinks))
+		dlist_delete_thoroughly(&MyProc->syncRepLinks);
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 	LWLockRelease(SyncRepLock);
 }
@@ -372,13 +373,13 @@ SyncRepCleanupAtProcExit(void)
 	 * First check if we are removed from the queue without the lock to not
 	 * slow down backend exit.
 	 */
-	if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
+	if (!dlist_node_is_detached(&MyProc->syncRepLinks))
 	{
 		LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
 		/* maybe we have just been removed, so recheck */
-		if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
-			SHMQueueDelete(&(MyProc->syncRepLinks));
+		if (!dlist_node_is_detached(&MyProc->syncRepLinks))
+			dlist_delete_thoroughly(&MyProc->syncRepLinks);
 
 		LWLockRelease(SyncRepLock);
 	}
@@ -1011,20 +1012,17 @@ static int
 SyncRepWakeQueue(bool all, int mode)
 {
 	volatile WalSndCtlData *walsndctl = WalSndCtl;
-	PGPROC	   *proc = NULL;
-	PGPROC	   *thisproc = NULL;
 	int			numprocs = 0;
+	dlist_mutable_iter iter;
 
 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
 	Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
 	Assert(SyncRepQueueIsOrderedByLSN(mode));
 
-	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-								   &(WalSndCtl->SyncRepQueue[mode]),
-								   offsetof(PGPROC, syncRepLinks));
-
-	while (proc)
+	dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
 	{
+		PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
 		/*
 		 * Assume the queue is ordered by LSN
 		 */
@@ -1032,18 +1030,9 @@ SyncRepWakeQueue(bool all, int mode)
 			return numprocs;
 
 		/*
-		 * Move to next proc, so we can delete thisproc from the queue.
-		 * thisproc is valid, proc may be NULL after this.
+		 * Remove from queue.
 		 */
-		thisproc = proc;
-		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-									   &(proc->syncRepLinks),
-									   offsetof(PGPROC, syncRepLinks));
-
-		/*
-		 * Remove thisproc from queue.
-		 */
-		SHMQueueDelete(&(thisproc->syncRepLinks));
+		dlist_delete_thoroughly(&proc->syncRepLinks);
 
 		/*
 		 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
@@ -1056,12 +1045,12 @@ SyncRepWakeQueue(bool all, int mode)
 		 * Set state to complete; see SyncRepWaitForLSN() for discussion of
 		 * the various states.
 		 */
-		thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
+		proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
 
 		/*
 		 * Wake only when we have set state and removed from queue.
 		 */
-		SetLatch(&(thisproc->procLatch));
+		SetLatch(&(proc->procLatch));
 
 		numprocs++;
 	}
@@ -1115,19 +1104,17 @@ SyncRepUpdateSyncStandbysDefined(void)
 static bool
 SyncRepQueueIsOrderedByLSN(int mode)
 {
-	PGPROC	   *proc = NULL;
 	XLogRecPtr	lastLSN;
+	dlist_iter	iter;
 
 	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
 
 	lastLSN = 0;
 
-	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-								   &(WalSndCtl->SyncRepQueue[mode]),
-								   offsetof(PGPROC, syncRepLinks));
-
-	while (proc)
+	dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
 	{
+		PGPROC	   *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
 		/*
 		 * Check the queue is ordered by LSN and that multiple procs don't
 		 * have matching LSNs
@@ -1136,10 +1123,6 @@ SyncRepQueueIsOrderedByLSN(int mode)
 			return false;
 
 		lastLSN = proc->waitLSN;
-
-		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-									   &(proc->syncRepLinks),
-									   offsetof(PGPROC, syncRepLinks));
 	}
 
 	return true;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index abb533b9d03..2d2a0ceb503 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3021,7 +3021,7 @@ WalSndShmemInit(void)
 		MemSet(WalSndCtl, 0, WalSndShmemSize());
 
 		for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
-			SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
+			dlist_init(&(WalSndCtl->SyncRepQueue[i]));
 
 		for (i = 0; i < max_wal_senders; i++)
 		{
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 5a157fc07d8..a4c338d7aff 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -415,7 +415,7 @@ InitProcess(void)
 	/* Initialize fields for sync rep */
 	MyProc->waitLSN = 0;
 	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
-	SHMQueueElemInit(&(MyProc->syncRepLinks));
+	dlist_node_init(&MyProc->syncRepLinks);
 
 	/* Initialize fields for group XID clearing. */
 	MyProc->procArrayGroupMember = false;
-- 
2.25.0.114.g5b0ca878e0

