diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c
index 9548287217..4c12aa33df 100644
--- a/src/backend/commands/functioncmds.c
+++ b/src/backend/commands/functioncmds.c
@@ -64,6 +64,7 @@
 #include "parser/parse_func.h"
 #include "parser/parse_type.h"
 #include "pgstat.h"
+#include "tcop/pquery.h"
 #include "tcop/utility.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -2319,6 +2320,20 @@ ExecuteCallStmt(CallStmt *stmt, ParamListInfo params, bool atomic, DestReceiver
 		if (fcinfo->isnull)
 			elog(ERROR, "procedure returned null record");
 
+		/*
+		 * Ensure there's an active snapshot whilst we execute whatever's
+		 * involved here.  Note that this is *not* sufficient to make the
+		 * world safe for TOAST pointers to be included in the returned data:
+		 * the referenced data could have gone away while we didn't hold a
+		 * snapshot.  Hence, it's incumbent on PLs that can do COMMIT/ROLLBACK
+		 * to not return TOAST pointers, unless those pointers were fetched
+		 * after the last COMMIT/ROLLBACK in the procedure.
+		 *
+		 * XXX that is a really nasty, hard-to-test requirement.  Is there a
+		 * way to remove it?
+		 */
+		EnsurePortalSnapshotExists();
+
 		td = DatumGetHeapTupleHeader(retval);
 		tupType = HeapTupleHeaderGetTypeId(td);
 		tupTypmod = HeapTupleHeaderGetTypMod(td);
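
The comment above puts a burden on PLs that execute COMMIT/ROLLBACK: any value
returned afterwards must not carry TOAST pointers fetched under a
now-destroyed snapshot.  As a hedged illustration of what discharging that
duty can look like (not part of this patch; the helper name is invented), a PL
could flatten a value by detoasting it before reaching a transaction boundary:

    #include "postgres.h"
    #include "fmgr.h"

    /*
     * Illustrative sketch only: force any out-of-line TOAST data inline so
     * the datum stays valid after the snapshot that fetched it is gone.
     * Only varlena values (typLen == -1) can be toasted at all.
     */
    static Datum
    pl_flatten_datum(Datum value, bool isnull, int16 typLen)
    {
        if (isnull || typLen != -1)
            return value;
        /* PG_DETOAST_DATUM fetches and inlines external TOAST data */
        return PointerGetDatum(PG_DETOAST_DATUM(value));
    }
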
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 00aa78ea53..0912967fe3 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -66,7 +66,7 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
 
 static int	_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 							  Snapshot snapshot, Snapshot crosscheck_snapshot,
-							  bool read_only, bool no_snapshots,
+							  bool read_only, bool allow_nonatomic,
 							  bool fire_triggers, uint64 tcount,
 							  DestReceiver *caller_dest,
 							  ResourceOwner plan_owner);
@@ -260,12 +260,8 @@ _SPI_commit(bool chain)
 	/* Start the actual commit */
 	_SPI_current->internal_xact = true;
 
-	/*
-	 * Before committing, pop all active snapshots to avoid error about
-	 * "snapshot %p still active".
-	 */
-	while (ActiveSnapshotSet())
-		PopActiveSnapshot();
+	/* Release snapshots associated with portals */
+	ForgetPortalSnapshots();
 
 	if (chain)
 		SaveTransactionCharacteristics();
@@ -322,6 +318,9 @@ _SPI_rollback(bool chain)
 	/* Start the actual rollback */
 	_SPI_current->internal_xact = true;
 
+	/* Release snapshots associated with portals */
+	ForgetPortalSnapshots();
+
 	if (chain)
 		SaveTransactionCharacteristics();
 
@@ -2264,7 +2263,7 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
  *		behavior of taking a new snapshot for each query.
  * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot
  * read_only: true for read-only execution (no CommandCounterIncrement)
- * no_snapshots: true to skip snapshot management
+ * allow_nonatomic: true to allow nonatomic CALL/DO execution
  * fire_triggers: true to fire AFTER triggers at end of query (normal case);
  *		false means any AFTER triggers are postponed to end of outer query
  * tcount: execution tuple-count limit, or 0 for none
@@ -2275,7 +2274,7 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
-				  bool read_only, bool no_snapshots,
+				  bool read_only, bool allow_nonatomic,
 				  bool fire_triggers, uint64 tcount,
 				  DestReceiver *caller_dest, ResourceOwner plan_owner)
 {
@@ -2318,11 +2317,12 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 	 * In the first two cases, we can just push the snap onto the stack once
 	 * for the whole plan list.
 	 *
-	 * But if no_snapshots is true, then don't manage snapshots at all here.
-	 * The caller must then take care of that.
+	 * Note that snapshot != InvalidSnapshot implies an atomic execution
+	 * context.
 	 */
-	if (snapshot != InvalidSnapshot && !no_snapshots)
+	if (snapshot != InvalidSnapshot)
 	{
+		Assert(!allow_nonatomic);
 		if (read_only)
 		{
 			PushActiveSnapshot(snapshot);
@@ -2408,15 +2408,39 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 		stmt_list = cplan->stmt_list;
 
 		/*
-		 * In the default non-read-only case, get a new snapshot, replacing
-		 * any that we pushed in a previous cycle.
+		 * If we weren't given a specific snapshot to use, and the statement
+		 * list requires a snapshot, set that up.
 		 */
-		if (snapshot == InvalidSnapshot && !read_only && !no_snapshots)
+		if (snapshot == InvalidSnapshot &&
+			(list_length(stmt_list) > 1 ||
+			 (list_length(stmt_list) == 1 &&
+			  PlannedStmtRequiresSnapshot(linitial_node(PlannedStmt,
+														stmt_list)))))
 		{
-			if (pushed_active_snap)
-				PopActiveSnapshot();
-			PushActiveSnapshot(GetTransactionSnapshot());
-			pushed_active_snap = true;
+			/*
+			 * First, ensure there's a Portal-level snapshot.  This back-fills
+			 * the snapshot stack in case the previous operation was a COMMIT
+			 * or ROLLBACK inside a procedure or DO block.  (We can't put back
+			 * the Portal snapshot any sooner, or we'd break cases like doing
+			 * SET or LOCK just after COMMIT.)  It's enough to check once per
+			 * statement list, since COMMIT/ROLLBACK/CALL/DO can't appear
+			 * within a multi-statement list.
+			 */
+			EnsurePortalSnapshotExists();
+
+			/*
+			 * In the default non-read-only case, get a new per-statement-list
+			 * snapshot, replacing any that we pushed in a previous cycle.
+			 * Skip it when doing non-atomic execution, though (we rely
+			 * entirely on the Portal snapshot in that case).
+			 */
+			if (!read_only && !allow_nonatomic)
+			{
+				if (pushed_active_snap)
+					PopActiveSnapshot();
+				PushActiveSnapshot(GetTransactionSnapshot());
+				pushed_active_snap = true;
+			}
 		}
 
 		foreach(lc2, stmt_list)
@@ -2434,6 +2458,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 			_SPI_current->processed = 0;
 			_SPI_current->tuptable = NULL;
 
+			/* Check for unsupported cases. */
 			if (stmt->utilityStmt)
 			{
 				if (IsA(stmt->utilityStmt, CopyStmt))
@@ -2462,9 +2487,10 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 
 			/*
 			 * If not read-only mode, advance the command counter before each
-			 * command and update the snapshot.
+			 * command and update the snapshot.  (But skip it if the snapshot
+			 * isn't under our control.)
 			 */
-			if (!read_only && !no_snapshots)
+			if (!read_only && pushed_active_snap)
 			{
 				CommandCounterIncrement();
 				UpdateActiveSnapshotCommandId();
@@ -2507,13 +2533,11 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				QueryCompletion qc;
 
 				/*
-				 * If the SPI context is atomic, or we are asked to manage
-				 * snapshots, then we are in an atomic execution context.
-				 * Conversely, to propagate a nonatomic execution context, the
-				 * caller must be in a nonatomic SPI context and manage
-				 * snapshots itself.
+				 * If the SPI context is atomic, or we were not told to allow
+				 * nonatomic operations, tell ProcessUtility this is an atomic
+				 * execution context.
 				 */
-				if (_SPI_current->atomic || !no_snapshots)
+				if (_SPI_current->atomic || !allow_nonatomic)
 					context = PROCESS_UTILITY_QUERY;
 				else
 					context = PROCESS_UTILITY_QUERY_NONATOMIC;
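
With no_snapshots gone, SPI itself back-fills the Portal snapshot after an
intra-procedure COMMIT, instead of requiring the caller to juggle snapshots.
A hedged sketch of a C-language procedure relying on that (the function name
and table demo_t are invented; error checking omitted; assumes it is created
as a C PROCEDURE and invoked via CALL so the nonatomic connection is legal):

    #include "postgres.h"
    #include "fmgr.h"
    #include "executor/spi.h"

    PG_MODULE_MAGIC;

    PG_FUNCTION_INFO_V1(demo_nonatomic);

    Datum
    demo_nonatomic(PG_FUNCTION_ARGS)
    {
        /* SPI_OPT_NONATOMIC is only permitted under CALL/DO */
        SPI_connect_ext(SPI_OPT_NONATOMIC);
        SPI_execute("INSERT INTO demo_t VALUES (1)", false, 0);
        SPI_commit();           /* pops every active snapshot ... */
        /* ... yet this works: spi.c back-fills the Portal snapshot */
        SPI_execute("SELECT * FROM demo_t", true, 0);
        SPI_finish();
        PG_RETURN_VOID();
    }
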
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 1432554d5a..d92a512839 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -349,6 +349,13 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel,
 	EState	   *estate;
 	RangeTblEntry *rte;
 
+	/*
+	 * Input functions may need an active snapshot, as may AFTER triggers
+	 * invoked during finish_estate.  For safety, ensure an active snapshot
+	 * exists throughout all our usage of the executor.
+	 */
+	PushActiveSnapshot(GetTransactionSnapshot());
+
 	estate = CreateExecutorState();
 
 	rte = makeNode(RangeTblEntry);
@@ -400,6 +407,7 @@ finish_estate(EState *estate)
 	/* Cleanup. */
 	ExecResetTupleTable(estate->es_tupleTable, false);
 	FreeExecutorState(estate);
+	PopActiveSnapshot();
 }
 
 /*
@@ -1212,9 +1220,6 @@ apply_handle_insert(StringInfo s)
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
 
-	/* Input functions may need an active snapshot, so get one */
-	PushActiveSnapshot(GetTransactionSnapshot());
-
 	/* Process and store remote tuple in the slot */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	slot_store_data(remoteslot, rel, &newtup);
@@ -1229,8 +1234,6 @@ apply_handle_insert(StringInfo s)
 		apply_handle_insert_internal(resultRelInfo, estate,
 									 remoteslot);
 
-	PopActiveSnapshot();
-
 	finish_estate(estate);
 
 	logicalrep_rel_close(rel, NoLock);
@@ -1358,8 +1361,6 @@ apply_handle_update(StringInfo s)
 	/* Also populate extraUpdatedCols, in case we have generated columns */
 	fill_extraUpdatedCols(target_rte, rel->localrel);
 
-	PushActiveSnapshot(GetTransactionSnapshot());
-
 	/* Build the search tuple. */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	slot_store_data(remoteslot, rel,
@@ -1374,8 +1375,6 @@ apply_handle_update(StringInfo s)
 		apply_handle_update_internal(resultRelInfo, estate,
 									 remoteslot, &newtup, rel);
 
-	PopActiveSnapshot();
-
 	finish_estate(estate);
 
 	logicalrep_rel_close(rel, NoLock);
@@ -1482,8 +1481,6 @@ apply_handle_delete(StringInfo s)
 										RelationGetDescr(rel->localrel),
 										&TTSOpsVirtual);
 
-	PushActiveSnapshot(GetTransactionSnapshot());
-
 	/* Build the search tuple. */
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	slot_store_data(remoteslot, rel, &oldtup);
@@ -1497,8 +1494,6 @@ apply_handle_delete(StringInfo s)
 		apply_handle_delete_internal(resultRelInfo, estate,
 									 remoteslot, &rel->remoterel);
 
-	PopActiveSnapshot();
-
 	finish_estate(estate);
 
 	logicalrep_rel_close(rel, NoLock);
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 44f5fe8fc9..f7f08e6c05 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -476,6 +476,13 @@ PortalStart(Portal portal, ParamListInfo params,
 				else
 					PushActiveSnapshot(GetTransactionSnapshot());
 
+				/*
+				 * We could remember the snapshot in portal->portalSnapshot,
+				 * but presently there seems no need to, as this code path
+				 * cannot be used for non-atomic execution.  Hence there can't
+				 * be any commit/abort that might destroy the snapshot.
+				 */
+
 				/*
 				 * Create QueryDesc in portal's context; for the moment, set
 				 * the destination to DestNone.
@@ -1116,45 +1123,26 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
 				 bool isTopLevel, bool setHoldSnapshot,
 				 DestReceiver *dest, QueryCompletion *qc)
 {
-	Node	   *utilityStmt = pstmt->utilityStmt;
-	Snapshot	snapshot;
-
 	/*
-	 * Set snapshot if utility stmt needs one.  Most reliable way to do this
-	 * seems to be to enumerate those that do not need one; this is a short
-	 * list.  Transaction control, LOCK, and SET must *not* set a snapshot
-	 * since they need to be executable at the start of a transaction-snapshot
-	 * mode transaction without freezing a snapshot.  By extension we allow
-	 * SHOW not to set a snapshot.  The other stmts listed are just efficiency
-	 * hacks.  Beware of listing anything that can modify the database --- if,
-	 * say, it has to update an index with expressions that invoke
-	 * user-defined functions, then it had better have a snapshot.
+	 * Set snapshot if utility stmt needs one.
 	 */
-	if (!(IsA(utilityStmt, TransactionStmt) ||
-		  IsA(utilityStmt, LockStmt) ||
-		  IsA(utilityStmt, VariableSetStmt) ||
-		  IsA(utilityStmt, VariableShowStmt) ||
-		  IsA(utilityStmt, ConstraintsSetStmt) ||
-	/* efficiency hacks from here down */
-		  IsA(utilityStmt, FetchStmt) ||
-		  IsA(utilityStmt, ListenStmt) ||
-		  IsA(utilityStmt, NotifyStmt) ||
-		  IsA(utilityStmt, UnlistenStmt) ||
-		  IsA(utilityStmt, CheckPointStmt)))
+	if (PlannedStmtRequiresSnapshot(pstmt))
 	{
-		snapshot = GetTransactionSnapshot();
+		Snapshot	snapshot = GetTransactionSnapshot();
+
 		/* If told to, register the snapshot we're using and save in portal */
 		if (setHoldSnapshot)
 		{
 			snapshot = RegisterSnapshot(snapshot);
 			portal->holdSnapshot = snapshot;
 		}
+		/* In any case, make the snapshot active and remember it in portal */
 		PushActiveSnapshot(snapshot);
 		/* PushActiveSnapshot might have copied the snapshot */
-		snapshot = GetActiveSnapshot();
+		portal->portalSnapshot = GetActiveSnapshot();
 	}
 	else
-		snapshot = NULL;
+		portal->portalSnapshot = NULL;
 
 	ProcessUtility(pstmt,
 				   portal->sourceText,
@@ -1168,13 +1156,17 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
 	MemoryContextSwitchTo(portal->portalContext);
 
 	/*
-	 * Some utility commands may pop the ActiveSnapshot stack from under us,
-	 * so be careful to only pop the stack if our snapshot is still at the
-	 * top.
+	 * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
+	 * under us, so don't complain if it's now empty.  Otherwise, our snapshot
+	 * should be the top one; pop it.  Note that this could be a different
+	 * snapshot from the one we made above; see EnsurePortalSnapshotExists.
 	 */
-	if (snapshot != NULL && ActiveSnapshotSet() &&
-		snapshot == GetActiveSnapshot())
+	if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
+	{
+		Assert(portal->portalSnapshot == GetActiveSnapshot());
 		PopActiveSnapshot();
+	}
+	portal->portalSnapshot = NULL;
 }
 
 /*
@@ -1256,6 +1248,12 @@ PortalRunMulti(Portal portal,
 				 * from what holdSnapshot has.)
 				 */
 				PushCopiedSnapshot(snapshot);
+
+				/*
+				 * As for PORTAL_ONE_SELECT portals, it does not seem
+				 * necessary to maintain portal->portalSnapshot here.
+				 */
+
 				active_snapshot_set = true;
 			}
 			else
@@ -1692,3 +1690,78 @@ DoPortalRewind(Portal portal)
 	portal->atEnd = false;
 	portal->portalPos = 0;
 }
+
+/*
+ * PlannedStmtRequiresSnapshot - what it says on the tin
+ */
+bool
+PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
+{
+	Node	   *utilityStmt = pstmt->utilityStmt;
+
+	/* If it's not a utility statement, it definitely needs a snapshot */
+	if (utilityStmt == NULL)
+		return true;
+
+	/*
+	 * Most utility statements need a snapshot, and the default presumption
+	 * about new ones should be that they do too.  Hence, enumerate those that
+	 * do not need one.
+	 *
+	 * Transaction control, LOCK, and SET must *not* set a snapshot, since
+	 * they need to be executable at the start of a transaction-snapshot-mode
+	 * transaction without freezing a snapshot.  By extension we allow SHOW
+	 * not to set a snapshot.  The other stmts listed are just efficiency
+	 * hacks.  Beware of listing anything that can modify the database --- if,
+	 * say, it has to update an index with expressions that invoke
+	 * user-defined functions, then it had better have a snapshot.
+	 */
+	if (IsA(utilityStmt, TransactionStmt) ||
+		IsA(utilityStmt, LockStmt) ||
+		IsA(utilityStmt, VariableSetStmt) ||
+		IsA(utilityStmt, VariableShowStmt) ||
+		IsA(utilityStmt, ConstraintsSetStmt) ||
+	/* efficiency hacks from here down */
+		IsA(utilityStmt, FetchStmt) ||
+		IsA(utilityStmt, ListenStmt) ||
+		IsA(utilityStmt, NotifyStmt) ||
+		IsA(utilityStmt, UnlistenStmt) ||
+		IsA(utilityStmt, CheckPointStmt))
+		return false;
+
+	return true;
+}
+
+/*
+ * EnsurePortalSnapshotExists - recreate Portal-level snapshot, if needed
+ *
+ * Generally, we will have an active snapshot whenever we are executing
+ * inside a Portal, unless the Portal's query is one of the utility
+ * statements exempted from that rule (see PlannedStmtRequiresSnapshot).
+ * However, procedures and DO blocks can commit or abort the transaction,
+ * and thereby destroy all snapshots.  This function can be called to
+ * re-establish the Portal-level snapshot when none exists.
+ */
+void
+EnsurePortalSnapshotExists(void)
+{
+	Portal		portal;
+
+	/*
+	 * Nothing to do if a snapshot is set.  (We take it on faith that the
+	 * outermost active snapshot belongs to some Portal; or if there is no
+	 * Portal, it's somebody else's responsibility to manage things.)
+	 */
+	if (ActiveSnapshotSet())
+		return;
+
+	/* Otherwise, we'd better have an active Portal */
+	portal = ActivePortal;
+	Assert(portal != NULL);
+	Assert(portal->portalSnapshot == NULL);
+
+	/* Create a new snapshot and make it active */
+	PushActiveSnapshot(GetTransactionSnapshot());
+	/* PushActiveSnapshot might have copied the snapshot */
+	portal->portalSnapshot = GetActiveSnapshot();
+}
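
A hedged sketch of how these two functions are meant to combine in a caller
that runs a statement list in a possibly-nonatomic context, mirroring the
spi.c change above (run_statements and run_stmt are invented for
illustration):

    #include "postgres.h"
    #include "nodes/pg_list.h"
    #include "nodes/plannodes.h"
    #include "tcop/pquery.h"

    extern void run_stmt(PlannedStmt *pstmt);  /* hypothetical executor entry */

    static void
    run_statements(List *stmts)
    {
        ListCell   *lc;

        foreach(lc, stmts)
        {
            PlannedStmt *pstmt = lfirst_node(PlannedStmt, lc);

            /*
             * Back-fill the Portal snapshot if an intervening COMMIT or
             * ROLLBACK destroyed it, but only for statements that need one;
             * snapshot-less statements such as SET and LOCK must stay
             * snapshot-less.
             */
            if (PlannedStmtRequiresSnapshot(pstmt))
                EnsurePortalSnapshotExists();
            run_stmt(pstmt);
        }
    }
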
diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c
index 66e3181815..5c30e141f5 100644
--- a/src/backend/utils/mmgr/portalmem.c
+++ b/src/backend/utils/mmgr/portalmem.c
@@ -502,6 +502,9 @@ PortalDrop(Portal portal, bool isTopCommit)
 		portal->cleanup = NULL;
 	}
 
+	/* There shouldn't be an active snapshot anymore, except after error */
+	Assert(portal->portalSnapshot == NULL || !isTopCommit);
+
 	/*
 	 * Remove portal from hash table.  Because we do this here, we will not
 	 * come back to try to remove the portal again if there's any error in the
@@ -709,6 +712,8 @@ PreCommit_Portals(bool isPrepare)
 				portal->holdSnapshot = NULL;
 			}
 			portal->resowner = NULL;
+			/* Clear portalSnapshot too, for cleanliness */
+			portal->portalSnapshot = NULL;
 			continue;
 		}
 
@@ -1278,3 +1283,54 @@ HoldPinnedPortals(void)
 		}
 	}
 }
+
+/*
+ * Drop the outer active snapshots for all portals, so that no snapshots
+ * remain active.
+ *
+ * Like HoldPinnedPortals, this must be called when initiating a COMMIT or
+ * ROLLBACK inside a procedure.  This has to be separate from that since it
+ * should not be run until we're done with steps that are likely to fail.
+ *
+ * It's tempting to fold this into PreCommit_Portals, but to do so, we'd
+ * need to clean up snapshot management in VACUUM and perhaps other places.
+ */
+void
+ForgetPortalSnapshots(void)
+{
+	HASH_SEQ_STATUS status;
+	PortalHashEnt *hentry;
+	int			numPortalSnaps = 0;
+	int			numActiveSnaps = 0;
+
+	/* First, scan PortalHashTable and clear portalSnapshot fields */
+	hash_seq_init(&status, PortalHashTable);
+
+	while ((hentry = (PortalHashEnt *) hash_seq_search(&status)) != NULL)
+	{
+		Portal		portal = hentry->portal;
+
+		if (portal->portalSnapshot != NULL)
+		{
+			portal->portalSnapshot = NULL;
+			numPortalSnaps++;
+		}
+		/* portal->holdSnapshot will be cleaned up in PreCommit_Portals */
+	}
+
+	/*
+	 * Now, pop all the active snapshots, which should be just those that were
+	 * portal snapshots.  Ideally we'd drive this directly off the portal
+	 * scan, but there's no good way to visit the portals in the correct
+	 * order.  So just cross-check after the fact.
+	 */
+	while (ActiveSnapshotSet())
+	{
+		PopActiveSnapshot();
+		numActiveSnaps++;
+	}
+
+	if (numPortalSnaps != numActiveSnaps)
+		elog(ERROR, "portal snapshots (%d) did not account for all active snapshots (%d)",
+			 numPortalSnaps, numActiveSnaps);
+}
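
The ordering constraint described in the header comment, as a hedged and
simplified sketch of the commit path shown in the spi.c hunks above (error
recovery and other details elided):

    #include "postgres.h"
    #include "access/xact.h"
    #include "utils/portal.h"

    /* Simplified sketch of _SPI_commit's intra-procedure COMMIT sequence */
    static void
    spi_commit_sketch(bool chain)
    {
        HoldPinnedPortals();     /* runs user code and may fail, so do it first */
        ForgetPortalSnapshots(); /* then drop portal snapshots; must not fail */
        if (chain)
            SaveTransactionCharacteristics();
        CommitTransactionCommand();
        StartTransactionCommand(); /* SPI starts the next transaction at once */
        if (chain)
            RestoreTransactionCharacteristics();
    }
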
diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h
index ed2c4d374b..2318f04ff0 100644
--- a/src/include/tcop/pquery.h
+++ b/src/include/tcop/pquery.h
@@ -17,6 +17,8 @@
 #include "nodes/parsenodes.h"
 #include "utils/portal.h"
 
+struct PlannedStmt;				/* avoid including plannodes.h here */
+
 
 extern PGDLLIMPORT Portal ActivePortal;
 
@@ -42,4 +44,8 @@ extern uint64 PortalRunFetch(Portal portal,
 							 long count,
 							 DestReceiver *dest);
 
+extern bool PlannedStmtRequiresSnapshot(struct PlannedStmt *pstmt);
+
+extern void EnsurePortalSnapshotExists(void);
+
 #endif							/* PQUERY_H */
diff --git a/src/include/utils/portal.h b/src/include/utils/portal.h
index 3c17b039cc..2e5bbdd0ec 100644
--- a/src/include/utils/portal.h
+++ b/src/include/utils/portal.h
@@ -160,6 +160,14 @@ typedef struct PortalData
 	/* and these are the format codes to use for the columns: */
 	int16	   *formats;		/* a format code for each column */
 
+	/*
+	 * Outermost ActiveSnapshot for execution of the portal's queries.  For
+	 * all but a few utility commands, we require such a snapshot to exist.
+	 * This ensures that TOAST references in query results can be detoasted,
+	 * and helps to reduce thrashing of the process's exposed xmin.
+	 */
+	Snapshot	portalSnapshot; /* active snapshot, or NULL if none */
+
 	/*
 	 * Where we store tuples for a held cursor or a PORTAL_ONE_RETURNING or
 	 * PORTAL_UTIL_SELECT query.  (A cursor held past the end of its
@@ -237,5 +245,6 @@ extern void PortalCreateHoldStore(Portal portal);
 extern void PortalHashTableDeleteAll(void);
 extern bool ThereAreNoReadyPortals(void);
 extern void HoldPinnedPortals(void);
+extern void ForgetPortalSnapshots(void);
 
 #endif							/* PORTAL_H */
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index b4c70aaa7f..5b0f698ac7 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -2159,7 +2159,6 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
 	PLpgSQL_expr *expr = stmt->expr;
 	LocalTransactionId before_lxid;
 	LocalTransactionId after_lxid;
-	bool		pushed_active_snap = false;
 	ParamListInfo paramLI;
 	SPIExecuteOptions options;
 	int			rc;
@@ -2189,22 +2188,14 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
 
 	before_lxid = MyProc->lxid;
 
-	/*
-	 * The procedure call could end transactions, which would upset the
-	 * snapshot management in SPI_execute*, so handle snapshots here instead.
-	 * Set a snapshot only for non-read-only procedures, similar to SPI
-	 * behavior.
-	 */
-	if (!estate->readonly_func)
-	{
-		PushActiveSnapshot(GetTransactionSnapshot());
-		pushed_active_snap = true;
-	}
-
 	/*
 	 * If we have a procedure-lifespan resowner, use that to hold the refcount
 	 * for the plan.  This avoids refcount leakage complaints if the called
 	 * procedure ends the current transaction.
+	 *
+	 * Also, tell SPI to allow non-atomic execution.
 	 */
 	memset(&options, 0, sizeof(options));
 	options.params = paramLI;
+	options.allow_nonatomic = true;
@@ -2220,17 +2211,7 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
 
 	after_lxid = MyProc->lxid;
 
-	if (before_lxid == after_lxid)
-	{
-		/*
-		 * If we are still in the same transaction after the call, pop the
-		 * snapshot that we might have pushed.  (If it's a new transaction,
-		 * then all the snapshots are gone already.)
-		 */
-		if (pushed_active_snap)
-			PopActiveSnapshot();
-	}
-	else
+	if (before_lxid != after_lxid)
 	{
 		/*
 		 * If we are in a new transaction after the call, we need to build new
@@ -4954,6 +4935,7 @@ exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt)
  *
  * We just parse and execute the statement normally, but we have to do it
  * without setting a snapshot, for things like SET TRANSACTION.
+ * XXX spi.c now handles this correctly, so we no longer need a special case.
  */
 static int
 exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt)
@@ -4967,7 +4949,6 @@ exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt)
 
 	memset(&options, 0, sizeof(options));
 	options.read_only = estate->readonly_func;
-	options.no_snapshots = true;	/* disable SPI's snapshot management */
 
 	rc = SPI_execute_plan_extended(expr->plan, &options);
 
diff --git a/src/test/isolation/expected/plpgsql-toast.out b/src/test/isolation/expected/plpgsql-toast.out
index fc557da5e7..61a33c113c 100644
--- a/src/test/isolation/expected/plpgsql-toast.out
+++ b/src/test/isolation/expected/plpgsql-toast.out
@@ -192,3 +192,30 @@ pg_advisory_unlock
 t              
 s1: NOTICE:  length(r) = 6002
 step assign5: <... completed>
+
+starting permutation: fetch-after-commit
+pg_advisory_unlock_all
+
+               
+pg_advisory_unlock_all
+
+               
+s1: NOTICE:  length(t) = 6000
+s1: NOTICE:  length(t) = 9000
+s1: NOTICE:  length(t) = 12000
+step fetch-after-commit: 
+do $$
+  declare
+    r record;
+    t text;
+  begin
+    insert into test1 values (2, repeat('bar', 3000));
+    insert into test1 values (3, repeat('baz', 4000));
+    for r in select test1.a from test1 loop
+      commit;
+      select b into t from test1 where a = r.a;
+      raise notice 'length(t) = %', length(t);
+    end loop;
+  end;
+$$;
+
diff --git a/src/test/isolation/specs/plpgsql-toast.spec b/src/test/isolation/specs/plpgsql-toast.spec
index fe7090addb..8806e3e6dd 100644
--- a/src/test/isolation/specs/plpgsql-toast.spec
+++ b/src/test/isolation/specs/plpgsql-toast.spec
@@ -112,6 +112,26 @@ do $$
 $$;
 }
 
+# Check that the results of a query can be detoasted just after committing
+# (there's no interaction with VACUUM here)
+step "fetch-after-commit"
+{
+do $$
+  declare
+    r record;
+    t text;
+  begin
+    insert into test1 values (2, repeat('bar', 3000));
+    insert into test1 values (3, repeat('baz', 4000));
+    for r in select test1.a from test1 loop
+      commit;
+      select b into t from test1 where a = r.a;
+      raise notice 'length(t) = %', length(t);
+    end loop;
+  end;
+$$;
+}
+
 session "s2"
 setup
 {
@@ -135,3 +155,4 @@ permutation "lock" "assign2" "vacuum" "unlock"
 permutation "lock" "assign3" "vacuum" "unlock"
 permutation "lock" "assign4" "vacuum" "unlock"
 permutation "lock" "assign5" "vacuum" "unlock"
+permutation "fetch-after-commit"
