From 32dbc891cc829f9dbbfa69690743d97699862453 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Mon, 3 Oct 2022 19:39:44 -0700
Subject: [PATCH v3 1/2] Move WAL receivers' non-shared state to a new struct.

This is preparatory work for a follow-up change that will revamp
the wakeup mechanism for periodic tasks that WAL receivers must
perform.
---
 src/backend/replication/walreceiver.c | 46 +++++++++++++++------------
 1 file changed, 26 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 3767466ef3..83e333e89c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -116,6 +116,14 @@ static struct
 	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
 }			LogstreamResult;
 
+/*
+ * A struct to keep track of non-shared state.
+ */
+typedef struct WalRcvInfo
+{
+	TimeLineID	startpointTLI;
+} WalRcvInfo;
+
 static StringInfoData reply_message;
 static StringInfoData incoming_message;
 
@@ -175,7 +183,6 @@ WalReceiverMain(void)
 	char		slotname[NAMEDATALEN];
 	bool		is_temp_slot;
 	XLogRecPtr	startpoint;
-	TimeLineID	startpointTLI;
 	TimeLineID	primaryTLI;
 	bool		first_stream;
 	WalRcvData *walrcv = WalRcv;
@@ -185,6 +192,7 @@ WalReceiverMain(void)
 	char	   *err;
 	char	   *sender_host = NULL;
 	int			sender_port = 0;
+	WalRcvInfo	state = {0};
 
 	/*
 	 * WalRcv should be set up already (if we are a backend, we inherit this
@@ -238,7 +246,7 @@ WalReceiverMain(void)
 	strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
 	is_temp_slot = walrcv->is_temp_slot;
 	startpoint = walrcv->receiveStart;
-	startpointTLI = walrcv->receiveStartTLI;
+	state.startpointTLI = walrcv->receiveStartTLI;
 
 	/*
 	 * At most one of is_temp_slot and slotname can be set; otherwise,
@@ -258,7 +266,7 @@ WalReceiverMain(void)
 	pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
 
 	/* Arrange to clean up at walreceiver exit */
-	on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
+	on_shmem_exit(WalRcvDie, PointerGetDatum(&state));
 
 	/* Properly accept or ignore signals the postmaster might send us */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
@@ -345,11 +353,11 @@ WalReceiverMain(void)
 		 * Confirm that the current timeline of the primary is the same or
 		 * ahead of ours.
 		 */
-		if (primaryTLI < startpointTLI)
+		if (primaryTLI < state.startpointTLI)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
-							primaryTLI, startpointTLI)));
+							primaryTLI, state.startpointTLI)));
 
 		/*
 		 * Get any missing history files. We do this always, even when we're
@@ -361,7 +369,7 @@ WalReceiverMain(void)
 		 * but let's avoid the confusion of timeline id collisions where we
 		 * can.
 		 */
-		WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
+		WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI);
 
 		/*
 		 * Create temporary replication slot if requested, and update slot
@@ -396,17 +404,17 @@ WalReceiverMain(void)
 		options.logical = false;
 		options.startpoint = startpoint;
 		options.slotname = slotname[0] != '\0' ? slotname : NULL;
-		options.proto.physical.startpointTLI = startpointTLI;
+		options.proto.physical.startpointTLI = state.startpointTLI;
 		if (walrcv_startstreaming(wrconn, &options))
 		{
 			if (first_stream)
 				ereport(LOG,
 						(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
-								LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+								LSN_FORMAT_ARGS(startpoint), state.startpointTLI)));
 			else
 				ereport(LOG,
 						(errmsg("restarted WAL streaming at %X/%X on timeline %u",
-								LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+								LSN_FORMAT_ARGS(startpoint), state.startpointTLI)));
 			first_stream = false;
 
 			/* Initialize LogstreamResult and buffers for processing messages */
@@ -465,7 +473,7 @@ WalReceiverMain(void)
 							last_recv_timestamp = GetCurrentTimestamp();
 							ping_sent = false;
 							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
-												 startpointTLI);
+												 state.startpointTLI);
 						}
 						else if (len == 0)
 							break;
@@ -474,7 +482,7 @@ WalReceiverMain(void)
 							ereport(LOG,
 									(errmsg("replication terminated by primary server"),
 									 errdetail("End of WAL reached on timeline %u at %X/%X.",
-											   startpointTLI,
+											   state.startpointTLI,
 											   LSN_FORMAT_ARGS(LogstreamResult.Write))));
 							endofwal = true;
 							break;
@@ -490,7 +498,7 @@ WalReceiverMain(void)
 					 * let the startup process and primary server know about
 					 * them.
 					 */
-					XLogWalRcvFlush(false, startpointTLI);
+					XLogWalRcvFlush(false, state.startpointTLI);
 				}
 
 				/* Check if we need to exit the streaming loop. */
@@ -596,12 +604,12 @@ WalReceiverMain(void)
 			 * know about when we began streaming, fetch its timeline history
 			 * file now.
 			 */
-			WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
+			WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI);
 		}
 		else
 			ereport(LOG,
 					(errmsg("primary server contains no more WAL on requested timeline %u",
-							startpointTLI)));
+							state.startpointTLI)));
 
 		/*
 		 * End of WAL reached on the requested timeline. Close the last
@@ -611,7 +619,7 @@ WalReceiverMain(void)
 		{
 			char		xlogfname[MAXFNAMELEN];
 
-			XLogWalRcvFlush(false, startpointTLI);
+			XLogWalRcvFlush(false, state.startpointTLI);
 			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 			if (close(recvFile) != 0)
 				ereport(PANIC,
@@ -631,7 +639,7 @@ WalReceiverMain(void)
 		recvFile = -1;
 
 		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
-		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
+		WalRcvWaitForStartPosition(&startpoint, &state.startpointTLI);
 	}
 	/* not reached */
 }
@@ -779,12 +787,10 @@ static void
 WalRcvDie(int code, Datum arg)
 {
 	WalRcvData *walrcv = WalRcv;
-	TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
-
-	Assert(*startpointTLI_p != 0);
+	WalRcvInfo *state = (WalRcvInfo *) DatumGetPointer(arg);
 
 	/* Ensure that all WAL records received are flushed to disk */
-	XLogWalRcvFlush(true, *startpointTLI_p);
+	XLogWalRcvFlush(true, state->startpointTLI);
 
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
-- 
2.25.1

