From 7a94d736f4b1795c357eedbb7f1a4bb2ac7a6fc0 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Thu, 1 Oct 2020 20:53:04 -0300
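Subject: [PATCH 1/2] Restore double command completion tags

After commit 07082b08cc5d the replication protocol was no longer being
followed per expectations: the walsender relied on the START_REPLICATION
callees having already sent their own completion message, whereas
libpqrcv_endstreaming expects an additional one.  As a result, initial
table synchronization could fail to complete (bug #16643).

Send the command completion tag from exec_replication_command again.
While at it, make the tablesync worker always return to its caller to
complete the stream, end streaming in one place after the apply loop,
and add a test case to 100_bugs.pl.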
---
 src/backend/replication/logical/tablesync.c | 61 ++++++---------------
 src/backend/replication/logical/worker.c    | 13 ++---
 src/backend/replication/walsender.c         |  3 +-
 src/test/subscription/t/100_bugs.pl         | 37 ++++++++++++-
 4 files changed, 61 insertions(+), 53 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c27d970589..271fcb7822 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -832,6 +832,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+	switch (relstate)
+	{
+		case SUBREL_STATE_SYNCDONE:
+		case SUBREL_STATE_READY:
+		case SUBREL_STATE_UNKNOWN:
+
+			/*
+			 * Nothing to do here but finish.  (UNKNOWN means the relation was
+			 * removed from pg_subscription_rel before the sync worker could
+			 * start.)
+			 */
+			finish_sync_worker();
+	}
+
 	/*
 	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
 	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
@@ -856,10 +870,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		ereport(ERROR,
 				(errmsg("could not connect to the publisher: %s", err)));
 
-	switch (MyLogicalRepWorker->relstate)
-	{
-		case SUBREL_STATE_INIT:
-		case SUBREL_STATE_DATASYNC:
+	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+
 			{
 				Relation	rel;
 				WalRcvExecResult *res;
@@ -944,46 +957,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 				/* Wait for main apply worker to tell us to catchup. */
 				wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
-
-				/*----------
-				 * There are now two possible states here:
-				 * a) Sync is behind the apply.  If that's the case we need to
-				 *	  catch up with it by consuming the logical replication
-				 *	  stream up to the relstate_lsn.  For that, we exit this
-				 *	  function and continue in ApplyWorkerMain().
-				 * b) Sync is caught up with the apply.  So it can just set
-				 *	  the state to SYNCDONE and finish.
-				 *----------
-				 */
-				if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
-				{
-					/*
-					 * Update the new state in catalog.  No need to bother
-					 * with the shmem state as we are exiting for good.
-					 */
-					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-											   MyLogicalRepWorker->relid,
-											   SUBREL_STATE_SYNCDONE,
-											   *origin_startpos);
-					finish_sync_worker();
-				}
-				break;
 			}
-		case SUBREL_STATE_SYNCDONE:
-		case SUBREL_STATE_READY:
-		case SUBREL_STATE_UNKNOWN:
-
-			/*
-			 * Nothing to do here but finish.  (UNKNOWN means the relation was
-			 * removed from pg_subscription_rel before the sync worker could
-			 * start.)
-			 */
-			finish_sync_worker();
-			break;
-		default:
-			elog(ERROR, "unknown relation state \"%c\"",
-				 MyLogicalRepWorker->relstate);
-	}
 
+	/* Finally, let caller complete the stream. */
 	return slotname;
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9c6fdeeb56..6299baad27 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2061,6 +2061,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 {
 	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
 	bool		ping_sent = false;
+	TimeLineID	tli;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -2202,12 +2203,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		/* Check if we need to exit the streaming loop. */
 		if (endofstream)
-		{
-			TimeLineID	tli;
-
-			walrcv_endstreaming(wrconn, &tli);
 			break;
-		}
 
 		/*
 		 * Wait for more data or latch.  If we have unflushed transactions,
@@ -2284,6 +2280,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			send_feedback(last_received, requestReply, requestReply);
 		}
 	}
+
+	/* All done */
+	walrcv_endstreaming(wrconn, &tli);
 }
 
 /*
@@ -3026,9 +3025,7 @@ ApplyWorkerMain(Datum main_arg)
 		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
 
 		/* The slot name needs to be allocated in permanent memory context. */
-		oldctx = MemoryContextSwitchTo(ApplyContext);
-		myslotname = pstrdup(syncslotname);
-		MemoryContextSwitchTo(oldctx);
+		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
 
 		pfree(syncslotname);
 	}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7c9d1b67df..55bce510ab 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1656,7 +1656,8 @@ exec_replication_command(const char *cmd_string)
 				else
 					StartLogicalReplication(cmd);
 
-				/* callees already sent their own completion message */
+				/* necessary per libpqrcv_endstreaming expectation */
+				EndReplicationCommand(cmdtag);
 
 				Assert(xlogreader != NULL);
 				break;
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 366a7a9435..2d1618658f 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 3;
+use Test::More tests => 5;
 
 # Bug #15114
 
@@ -100,3 +100,38 @@ is( $node_publisher->psql(
 );
 
 $node_publisher->stop('fast');
+
+# Bug #16643
+# Initial sync doesn't complete; the protocol was not being followed per
+# expectations after commit 07082b08cc5d.
+my $node_twoways = get_new_node('twoways');
+$node_twoways->init(allows_streaming => 'logical');
+$node_twoways->start;
+for my $db (qw(d1 d2))
+{
+	$node_twoways->safe_psql('postgres', "CREATE DATABASE $db");
+	$node_twoways->safe_psql($db, "CREATE TABLE t (f int)");
+	$node_twoways->safe_psql($db, "CREATE TABLE t2 (f int)");
+}
+
+my $rows = 10_000_000;
+$node_twoways->safe_psql('d1', qq{
+	INSERT INTO t SELECT * FROM generate_series(1, $rows);
+	INSERT INTO t2 SELECT * FROM generate_series(1, $rows);
+	CREATE PUBLICATION testpub FOR TABLE t, t2;
+	SELECT pg_create_logical_replication_slot('testslot', 'pgoutput');
+	});
+$node_twoways->safe_psql('d2',
+	"CREATE SUBSCRIPTION testsub CONNECTION \$\$".
+	$node_twoways->connstr('d1').
+	"\$\$ PUBLICATION testpub WITH (create_slot=false, ".
+	"slot_name='testslot')");
+
+# wait_for_catchup() does not cover the tablesync workers, so also wait for
+# every subscribed table to reach the syncdone ('s') or ready ('r') state.
+$node_twoways->wait_for_catchup('testsub');
+$node_twoways->poll_query_until('d2',
+	"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's')")
+  or die "Timed out while waiting for subscriber to synchronize data";
+is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"), $rows, "$rows rows in t");
+is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"), $rows, "$rows rows in t2");
-- 
2.20.1

