*** a/doc/src/sgml/mvcc.sgml
--- b/doc/src/sgml/mvcc.sgml
***************
*** 386,407 **** COMMIT;
      behave the same as <command>SELECT</command>
      in terms of searching for target rows: they will only find target rows
      that were committed as of the transaction start time.  However, such a
!     target
!     row might have already been updated (or deleted or locked) by
!     another concurrent transaction by the time it is found.  In this case, the
!     serializable transaction will wait for the first updating transaction to commit or
!     roll back (if it is still in progress).  If the first updater rolls back,
!     then its effects are negated and the serializable transaction can proceed
!     with updating the originally found row.  But if the first updater commits
!     (and actually updated or deleted the row, not just locked it)
!     then the serializable transaction will be rolled back with the message
  
  <screen>
  ERROR:  could not serialize access due to concurrent update
  </screen>
  
!     because a serializable transaction cannot modify or lock rows changed by
!     other transactions after the serializable transaction began.
     </para>
  
     <para>
--- 386,407 ----
      behave the same as <command>SELECT</command>
      in terms of searching for target rows: they will only find target rows
      that were committed as of the transaction start time.  However, such a
!     target row might have already been subject to a concurrent
!     <command>UPDATE</command>, <command>DELETE</command>, <command>SELECT
!     FOR UPDATE</command>, or <command>SELECT FOR SHARE</command>. In this case,
!     the serializable transaction will wait for the other transaction to commit
!     or roll back (if it is still in progress). If it rolls back then its effects
!     are negated and the serializable transaction can proceed with modifying
!     or locking the originally found row. If it commits, and the two commands
!     conflict according to <xref linkend="mvcc-serialization-conflicts">,
!     the serializable transaction is rolled back with the message
  
  <screen>
  ERROR:  could not serialize access due to concurrent update
  </screen>
  
!     since serializable transaction cannot simply proceed with the newer row
!     version like read committed ones do.
     </para>
  
     <para>
***************
*** 418,423 **** ERROR:  could not serialize access due to concurrent update
--- 418,463 ----
      transactions will never have serialization conflicts.
     </para>
  
+    <table tocentry="1" id="mvcc-serialization-conflicts">
+     <title>Serialization Conflicts</title>
+     <tgroup cols="4">
+      <colspec colnum="2" colname="lockst">
+      <colspec colnum="4" colname="lockend">
+      <spanspec namest="lockst" nameend="lockend" spanname="lockreq">
+      <thead>
+       <row>
+        <entry morerows="1">Serializable Transaction</entry>
+        <entry spanname="lockreq">Concurrent Transaction</entry>
+       </row>
+       <row>
+        <entry><command>UPDATE</command>, <command>DELETE</command></entry>
+        <entry><command>SELECT FOR UPDATE</command></entry>
+        <entry><command>SELECT FOR SHARE</command></entry>
+       </row>
+      </thead>
+      <tbody>
+       <row>
+        <entry><command>UPDATE</command>, <command>DELETE</command></entry>
+        <entry align="center">X</entry>
+        <entry align="center">X</entry>
+        <entry align="center">X</entry>
+       </row>
+       <row>
+        <entry><command>SELECT FOR UPDATE</command></entry>
+        <entry align="center">X</entry>
+        <entry align="center">X</entry>
+        <entry align="center">X</entry>
+       </row>
+       <row>
+        <entry><command>SELECT FOR SHARE</command></entry>
+        <entry align="center">X</entry>
+        <entry align="center"></entry>
+        <entry align="center"></entry>
+       </row>
+      </tbody>
+     </tgroup>
+    </table>
+ 
     <para>
      The Serializable mode provides a rigorous guarantee that each
      transaction sees a wholly consistent view of the database.  However,
***************
*** 921,926 **** SELECT SUM(value) FROM mytab WHERE class = 2;
--- 961,974 ----
      </para>
  
      <para>
+       Serializable transactions are affected by concurrent
+       <command>SELECT FOR SHARE</command> and <command>SELECT FOR UPDATE</command>
+       for longer than those locks are actually held, and may be aborted
+       when trying to obtain a conflicting lock. For details,
+       see <xref linkend="xact-serializable">
+     </para>
+ 
+     <para>
       <productname>PostgreSQL</productname> doesn't remember any
       information about modified rows in memory, so there is no limit on
       the number of rows locked at one time.  However, locking a row
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 83,88 **** static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
--- 83,89 ----
  				bool all_visible_cleared, bool new_all_visible_cleared);
  static bool HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
  					   HeapTuple oldtup, HeapTuple newtup);
+ static bool HeapSatisfiesLockersVisible(HeapTupleHeader tuple, Snapshot snapshot);
  
  
  /* ----------------------------------------------------------------
***************
*** 2033,2040 **** simple_heap_insert(Relation relation, HeapTuple tup)
   *	update_xmax - output parameter, used only for failure case (see below)
   *	cid - delete command ID (used for visibility test, and stored into
   *		cmax if successful)
-  *	crosscheck - if not InvalidSnapshot, also check tuple against this
   *	wait - true if should wait for any conflicting update to commit/abort
   *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
   * actually means we did delete it.  Failure return codes are
--- 2034,2042 ----
   *	update_xmax - output parameter, used only for failure case (see below)
   *	cid - delete command ID (used for visibility test, and stored into
   *		cmax if successful)
   *	wait - true if should wait for any conflicting update to commit/abort
+  *	lockcheck_snapshot - if not NULL, report the tuple as updated if it
+  *		was locked by a transaction not visible under this snapshot
   *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
   * actually means we did delete it.  Failure return codes are
***************
*** 2049,2055 **** simple_heap_insert(Relation relation, HeapTuple tup)
  HTSU_Result
  heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, Snapshot crosscheck, bool wait)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
--- 2051,2057 ----
  HTSU_Result
  heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, bool wait, Snapshot lockcheck_snapshot)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
***************
*** 2171,2189 **** l1:
  			result = HeapTupleUpdated;
  	}
  
! 	if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
! 	{
! 		/* Perform additional check for transaction-snapshot mode RI updates */
! 		if (!HeapTupleSatisfiesVisibility(&tp, crosscheck, buffer))
! 			result = HeapTupleUpdated;
! 	}
  
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated ||
  			   result == HeapTupleUpdated ||
  			   result == HeapTupleBeingUpdated);
! 		Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID));
  		*ctid = tp.t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(tp.t_data);
  		UnlockReleaseBuffer(buffer);
--- 2173,2190 ----
  			result = HeapTupleUpdated;
  	}
  
! 	/* Verify visibility of locking transactions. */
! 	if ((result == HeapTupleMayBeUpdated) &&
! 		!HeapSatisfiesLockersVisible(tp.t_data, lockcheck_snapshot))
! 		result = HeapTupleUpdated;
  
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated ||
  			   result == HeapTupleUpdated ||
  			   result == HeapTupleBeingUpdated);
! 		Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID) ||
! 			   (tp.t_data->t_infomask & HEAP_IS_LOCKED));
  		*ctid = tp.t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(tp.t_data);
  		UnlockReleaseBuffer(buffer);
***************
*** 2313,2320 **** simple_heap_delete(Relation relation, ItemPointer tid)
  
  	result = heap_delete(relation, tid,
  						 &update_ctid, &update_xmax,
! 						 GetCurrentCommandId(true), InvalidSnapshot,
! 						 true /* wait for commit */ );
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
--- 2314,2322 ----
  
  	result = heap_delete(relation, tid,
  						 &update_ctid, &update_xmax,
! 						 GetCurrentCommandId(true),
! 						 true /* wait for commit */ ,
! 						 InvalidSnapshot);
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
***************
*** 2349,2355 **** simple_heap_delete(Relation relation, ItemPointer tid)
   *	update_xmax - output parameter, used only for failure case (see below)
   *	cid - update command ID (used for visibility test, and stored into
   *		cmax/cmin if successful)
!  *	crosscheck - if not InvalidSnapshot, also check old tuple against this
   *	wait - true if should wait for any conflicting update to commit/abort
   *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
--- 2351,2358 ----
   *	update_xmax - output parameter, used only for failure case (see below)
   *	cid - update command ID (used for visibility test, and stored into
   *		cmax/cmin if successful)
!  *	lockcheck_snapshot - if not NULL, report the tuple as updated if it
!  *		was locked by a transaction not visible under this snapshot
   *	wait - true if should wait for any conflicting update to commit/abort
   *
   * Normal, successful return value is HeapTupleMayBeUpdated, which
***************
*** 2371,2377 **** simple_heap_delete(Relation relation, ItemPointer tid)
  HTSU_Result
  heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, Snapshot crosscheck, bool wait)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
--- 2374,2380 ----
  HTSU_Result
  heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, bool wait, Snapshot lockcheck_snapshot)
  {
  	HTSU_Result result;
  	TransactionId xid = GetCurrentTransactionId();
***************
*** 2523,2541 **** l2:
  			result = HeapTupleUpdated;
  	}
  
! 	if (crosscheck != InvalidSnapshot && result == HeapTupleMayBeUpdated)
! 	{
! 		/* Perform additional check for transaction-snapshot mode RI updates */
! 		if (!HeapTupleSatisfiesVisibility(&oldtup, crosscheck, buffer))
! 			result = HeapTupleUpdated;
! 	}
  
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated ||
  			   result == HeapTupleUpdated ||
  			   result == HeapTupleBeingUpdated);
! 		Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));
  		*ctid = oldtup.t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(oldtup.t_data);
  		UnlockReleaseBuffer(buffer);
--- 2526,2543 ----
  			result = HeapTupleUpdated;
  	}
  
! 	/* Verify visibility of locking transactions. */
! 	if ((result == HeapTupleMayBeUpdated) &&
! 		!HeapSatisfiesLockersVisible(oldtup.t_data, lockcheck_snapshot))
! 		result = HeapTupleUpdated;
  
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated ||
  			   result == HeapTupleUpdated ||
  			   result == HeapTupleBeingUpdated);
! 		Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID) ||
! 			   (oldtup.t_data->t_infomask & HEAP_IS_LOCKED));
  		*ctid = oldtup.t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(oldtup.t_data);
  		UnlockReleaseBuffer(buffer);
***************
*** 2961,2966 **** HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,
--- 2963,3063 ----
  	return true;
  }
  
+ 
+ /*
+  * Returns false if one of the tuple's lockers committed but aren't visible
+  * according to lockcheck_snapshot, excluding subtransactions of the current
+  * transaction.  Assumes that all locking transaction either committed or
+  * aborted, but aren't still in progress.
+  */
+ static bool
+ HeapSatisfiesLockersVisible(HeapTupleHeader tuple,
+ 							Snapshot lockcheck_snapshot)
+ {
+ 	if (lockcheck_snapshot == InvalidSnapshot)
+ 		return true;
+ 
+ 	if (tuple->t_infomask & HEAP_IS_LOCKED)
+ 	{
+ 		/*
+ 		 * If the tuple was locked, we now check whether the locking
+ 		 * transaction(s) are visible under lockcheck_snapshot. If
+ 		 * they aren't, we pretend that the tuple was updated.
+ 		 */
+ 
+ 		if (tuple->t_infomask & HEAP_XMAX_IS_MULTI)
+ 		{
+ 			TransactionId* xids;
+ 			int xids_l = GetMultiXactIdMembers(HeapTupleHeaderGetXmax(tuple),
+ 											   &xids);
+ 
+ 			if (xids_l < 1)
+ 			{
+ 				/*
+ 				 * The multi xact either is too old to be inspected or
+ 				 * doesn't contain members.  The second case is probably
+ 				 * impossible, but even if not it doesn't pose any problem.
+ 				 *
+ 				 * In the first case, we have to trust that all xids that
+ 				 * were contained in the xact are in fact visible under
+ 				 * lockcheck_snapshot. Currently this is always the case,
+ 				 * since lockcheck_snapshot is always the transaction's
+ 				 * snapshot, and we call MultiXactIdSetOldestVisible() before
+ 				 * acquiring that snapshot.
+ 				 */
+ 				return true;
+ 			}
+ 			else
+ 			{
+ 				int i;
+ 				for (i = 0; i < xids_l; ++i)
+ 				{
+ 					/* Ignore our own subtransactions */
+ 					if (TransactionIdIsCurrentTransactionId(xids[i]))
+ 						continue;
+ 
+ 					/*
+ 					 * We expect to be called after the locking transactions'
+ 					 * fates have been decided
+ 					 */
+ 					Assert(!TransactionIdIsInProgress(xids[i]));
+ 
+ 					if (!TransactionIdDidAbort(xids[i]) &&
+ 					    XidInMVCCSnapshot(xids[i], lockcheck_snapshot))
+ 					{
+ 						/* Non-aborted, invisible locker */
+ 						return false;
+ 					}
+ 				}
+ 				return true;
+ 			}
+ 		}
+ 		else
+ 		{
+ 			TransactionId xid = HeapTupleHeaderGetXmax(tuple);
+ 
+ 			/* Ignore our own subtransactions */
+ 			if (TransactionIdIsCurrentTransactionId(xid))
+ 				return true;
+ 
+ 			/*
+ 			 * We expect to be called after the locking transactions' fates
+ 			 * have been decided
+ 			 */
+ 			Assert(!TransactionIdIsInProgress(xid));
+ 
+ 			/* Locker must either be visible or have aborted */
+ 			return TransactionIdDidAbort(xid) ||
+ 				   !XidInMVCCSnapshot(xid, lockcheck_snapshot);
+ 		}
+ 	}
+ 	else
+ 	{
+ 		/* Tuple wasn't locked */
+ 		return true;
+ 	}
+ }
+ 
  /*
   *	simple_heap_update - replace a tuple
   *
***************
*** 2978,2985 **** simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
  
  	result = heap_update(relation, otid, tup,
  						 &update_ctid, &update_xmax,
! 						 GetCurrentCommandId(true), InvalidSnapshot,
! 						 true /* wait for commit */ );
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
--- 3075,3082 ----
  
  	result = heap_update(relation, otid, tup,
  						 &update_ctid, &update_xmax,
! 						 GetCurrentCommandId(true),
! 						 true /* wait for commit */, InvalidSnapshot);
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
***************
*** 3013,3018 **** simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
--- 3110,3118 ----
   *		tuple's cmax if lock is successful)
   *	mode: indicates if shared or exclusive tuple lock is desired
   *	nowait: if true, ereport rather than blocking if lock not available
+  *	lockcheck_snapshot: if not NULL, report the tuple as updated if it
+  *						was locked by a transaction not visible under
+  *						this snapshot
   *
   * Output parameters:
   *	*tuple: all fields filled in
***************
*** 3066,3072 **** simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
  HTSU_Result
  heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
  				ItemPointer ctid, TransactionId *update_xmax,
! 				CommandId cid, LockTupleMode mode, bool nowait)
  {
  	HTSU_Result result;
  	ItemPointer tid = &(tuple->t_self);
--- 3166,3173 ----
  HTSU_Result
  heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
  				ItemPointer ctid, TransactionId *update_xmax,
! 				CommandId cid, LockTupleMode mode, bool nowait,
! 				Snapshot lockcheck_snapshot)
  {
  	HTSU_Result result;
  	ItemPointer tid = &(tuple->t_self);
***************
*** 3247,3256 **** l3:
  			result = HeapTupleUpdated;
  	}
  
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
! 		Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID));
  		*ctid = tuple->t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(tuple->t_data);
  		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
--- 3348,3363 ----
  			result = HeapTupleUpdated;
  	}
  
+ 	/* Verify visibility of locking transactions */
+ 	if ((result == HeapTupleMayBeUpdated) &&
+ 		!HeapSatisfiesLockersVisible(tuple->t_data, lockcheck_snapshot))
+ 		result = HeapTupleUpdated;
+ 
  	if (result != HeapTupleMayBeUpdated)
  	{
  		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
! 		Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
! 			   (tuple->t_data->t_infomask & HEAP_IS_LOCKED));
  		*ctid = tuple->t_data->t_ctid;
  		*update_xmax = HeapTupleHeaderGetXmax(tuple->t_data);
  		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
*** a/src/backend/access/transam/multixact.c
--- b/src/backend/access/transam/multixact.c
***************
*** 211,217 **** static MemoryContext MXactContext = NULL;
  #endif
  
  /* internal MultiXactId management */
- static void MultiXactIdSetOldestVisible(void);
  static MultiXactId CreateMultiXactId(int nxids, TransactionId *xids);
  static void RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
  				   int nxids, TransactionId *xids);
--- 211,216 ----
***************
*** 531,537 **** MultiXactIdSetOldestMember(void)
   * there is no live transaction, now or later, that can be a member of any
   * MultiXactId older than the OldestVisibleMXactId we compute here.
   */
! static void
  MultiXactIdSetOldestVisible(void)
  {
  	if (!MultiXactIdIsValid(OldestVisibleMXactId[MyBackendId]))
--- 530,536 ----
   * there is no live transaction, now or later, that can be a member of any
   * MultiXactId older than the OldestVisibleMXactId we compute here.
   */
! void
  MultiXactIdSetOldestVisible(void)
  {
  	if (!MultiXactIdIsValid(OldestVisibleMXactId[MyBackendId]))
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 1088,1094 **** DoCopy(const CopyStmt *stmt, const char *queryString)
  		/* Create a QueryDesc requesting no output */
  		cstate->queryDesc = CreateQueryDesc(plan, queryString,
  											GetActiveSnapshot(),
- 											InvalidSnapshot,
  											dest, NULL, 0);
  
  		/*
--- 1088,1093 ----
*** a/src/backend/commands/explain.c
--- b/src/backend/commands/explain.c
***************
*** 372,378 **** ExplainOnePlan(PlannedStmt *plannedstmt, ExplainState *es,
  
  	/* Create a QueryDesc requesting no output */
  	queryDesc = CreateQueryDesc(plannedstmt, queryString,
! 								GetActiveSnapshot(), InvalidSnapshot,
  								None_Receiver, params, instrument_option);
  
  	INSTR_TIME_SET_CURRENT(starttime);
--- 372,378 ----
  
  	/* Create a QueryDesc requesting no output */
  	queryDesc = CreateQueryDesc(plannedstmt, queryString,
! 								GetActiveSnapshot(),
  								None_Receiver, params, instrument_option);
  
  	INSTR_TIME_SET_CURRENT(starttime);
*** a/src/backend/commands/trigger.c
--- b/src/backend/commands/trigger.c
***************
*** 2367,2380 **** GetTupleForTrigger(EState *estate,
  		Assert(epqstate != NULL);
  
  		/*
! 		 * lock tuple for update
  		 */
  ltrmark:;
  		tuple.t_self = *tid;
  		test = heap_lock_tuple(relation, &tuple, &buffer,
  							   &update_ctid, &update_xmax,
  							   estate->es_output_cid,
! 							   LockTupleExclusive, false);
  		switch (test)
  		{
  			case HeapTupleSelfUpdated:
--- 2367,2386 ----
  		Assert(epqstate != NULL);
  
  		/*
! 		 * Lock tuple for update.
! 		 *
! 		 * Transaction snapshot mode transactions pass their snapshot as the
! 		 * logcheck_snapshot.  This lets heap_lock_tuple report concurrently
! 		 * FOR SHARE or FOR UPDATE locked tuples as HeapTupleUpdated.
  		 */
  ltrmark:;
  		tuple.t_self = *tid;
  		test = heap_lock_tuple(relation, &tuple, &buffer,
  							   &update_ctid, &update_xmax,
  							   estate->es_output_cid,
! 							   LockTupleExclusive, false,
! 							   IsolationUsesXactSnapshot() ? estate->es_snapshot :
! 															 InvalidSnapshot);
  		switch (test)
  		{
  			case HeapTupleSelfUpdated:
*** a/src/backend/executor/README
--- b/src/backend/executor/README
***************
*** 139,157 **** be hidden inside function calls).  This case has a flow of control like
  		(a separate FreeExprContext call is not necessary)
  
  
! EvalPlanQual (READ COMMITTED Update Checking)
! ---------------------------------------------
  
  For simple SELECTs, the executor need only pay attention to tuples that are
  valid according to the snapshot seen by the current transaction (ie, they
  were inserted by a previously committed transaction, and not deleted by any
  previously committed transaction).  However, for UPDATE and DELETE it is not
  cool to modify or delete a tuple that's been modified by an open or
! concurrently-committed transaction.  If we are running in SERIALIZABLE
! isolation level then we just raise an error when this condition is seen to
! occur.  In READ COMMITTED isolation level, we must work a lot harder.
  
! The basic idea in READ COMMITTED mode is to take the modified tuple
  committed by the concurrent transaction (after waiting for it to commit,
  if need be) and re-evaluate the query qualifications to see if it would
  still meet the quals.  If so, we regenerate the updated tuple (if we are
--- 139,158 ----
  		(a separate FreeExprContext call is not necessary)
  
  
! EvalPlanQual (Statement-Snapshot Update Checking)
! -------------------------------------------------
  
  For simple SELECTs, the executor need only pay attention to tuples that are
  valid according to the snapshot seen by the current transaction (ie, they
  were inserted by a previously committed transaction, and not deleted by any
  previously committed transaction).  However, for UPDATE and DELETE it is not
  cool to modify or delete a tuple that's been modified by an open or
! concurrently-committed transaction.  If we are running in a transaction-
! snapshot isolation level then we just raise an error when this condition is
! seen to occur.  In statement-snapshot isolation levels, we must work a lot
! harder.
  
! The basic idea in statement-snapshot modes is to take the modified tuple
  committed by the concurrent transaction (after waiting for it to commit,
  if need be) and re-evaluate the query qualifications to see if it would
  still meet the quals.  If so, we regenerate the updated tuple (if we are
***************
*** 195,197 **** is no explicit prohibition on SRFs in UPDATE, but the net effect will be
--- 196,227 ----
  that only the first result row of an SRF counts, because all subsequent
  rows will result in attempts to re-update an already updated target row.
  This is historical behavior and seems not worth changing.)
+ 
+ Row Locks and Transaction-Snapshot Mode Transactions
+ ----------------------------------------------------
+ 
+ In a statement-snapshot mode, a transaction which encounters a locked row
+ during an UPDATE, DELETE, SELECT FOR UPDATE or SELECT FOR SHARE simply blocks
+ until the locking transaction commits or roll backs, and in the former case
+ then re-executes the statement using the new row version, as described above.
+ 
+ For transaction-snapshot mode transactions this is not satisfactory. The RI
+ triggers, for example, take a FOR SHARE lock on a parent row before allowing
+ a child row to be inserted and verify that deleting a parent row leaves no
+ orphaned children behind before allowing the delete to occur. From within
+ statement-snapshot mode transactions, blocking upon a delete or a parent row
+ until all lockers have finished is sufficient to guarantee that this check
+ finds any potential orphan, since the check will be executed with a up-to-
+ date snapshot to which the locking transaction's changes are visible. This,
+ however, is not true for transaction-snapshot mode transactions since these
+ will continue to use their old snapshot and hence miss newly inserted rows.
+ 
+ Transaction-snapshot mode transactions therefore treat a FOR SHARE or FOR
+ UPDATE lock on a tuple the same as an actual update during UPDATE and SELECT
+ FOR SHARE. They are thus aborted when trying to UPDATE or FOR UPDATE lock a
+ row that was FOR SHARE or FOR UPDATE locked by a concurrent transaction.
+ 
+ This is implemented by the lockcheck_snapshot parameter of heap_update,
+ heap_delete and heap_lock_tuple. If such a snapshot is provided to one of
+ these functions, they return HeapTupleUpdated if the tuple locked (but not
+ necessarily updated) by any transaction invisible to the snapshot.
*** a/src/backend/executor/execMain.c
--- b/src/backend/executor/execMain.c
***************
*** 183,189 **** standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
  	 * Copy other important information into the EState
  	 */
  	estate->es_snapshot = RegisterSnapshot(queryDesc->snapshot);
- 	estate->es_crosscheck_snapshot = RegisterSnapshot(queryDesc->crosscheck_snapshot);
  	estate->es_instrument = queryDesc->instrument_options;
  
  	/*
--- 183,188 ----
***************
*** 348,354 **** standard_ExecutorEnd(QueryDesc *queryDesc)
  
  	/* do away with our snapshots */
  	UnregisterSnapshot(estate->es_snapshot);
- 	UnregisterSnapshot(estate->es_crosscheck_snapshot);
  
  	/*
  	 * Must switch out of context before destroying it
--- 347,352 ----
***************
*** 1533,1543 **** EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
  
  			/*
  			 * This is a live tuple, so now try to lock it.
  			 */
  			test = heap_lock_tuple(relation, &tuple, &buffer,
  								   &update_ctid, &update_xmax,
  								   estate->es_output_cid,
! 								   lockmode, false);
  			/* We now have two pins on the buffer, get rid of one */
  			ReleaseBuffer(buffer);
  
--- 1531,1549 ----
  
  			/*
  			 * This is a live tuple, so now try to lock it.
+ 			 *
+ 			 * Transaction-snapshot mode transactions pass their snapshot as
+ 			 * the logcheck_snapshot.  This lets heap_lock_tuple report
+ 			 * concurrently FOR SHARE or FOR UPDATE locked tuples as
+ 			 * HeapTupleUpdated.
  			 */
+ 			Assert(!IsolationUsesXactSnapshot() || (estate->es_snapshot != InvalidSnapshot));
  			test = heap_lock_tuple(relation, &tuple, &buffer,
  								   &update_ctid, &update_xmax,
  								   estate->es_output_cid,
! 								   lockmode, false,
! 								   IsolationUsesXactSnapshot() ? estate->es_snapshot :
! 																 InvalidSnapshot);
  			/* We now have two pins on the buffer, get rid of one */
  			ReleaseBuffer(buffer);
  
***************
*** 1906,1912 **** EvalPlanQualStart(EPQState *epqstate, EState *parentestate, Plan *planTree)
  	 */
  	estate->es_direction = ForwardScanDirection;
  	estate->es_snapshot = parentestate->es_snapshot;
- 	estate->es_crosscheck_snapshot = parentestate->es_crosscheck_snapshot;
  	estate->es_range_table = parentestate->es_range_table;
  	estate->es_plannedstmt = parentestate->es_plannedstmt;
  	estate->es_junkFilter = parentestate->es_junkFilter;
--- 1912,1917 ----
*** a/src/backend/executor/execUtils.c
--- b/src/backend/executor/execUtils.c
***************
*** 109,115 **** CreateExecutorState(void)
  	 */
  	estate->es_direction = ForwardScanDirection;
  	estate->es_snapshot = SnapshotNow;
- 	estate->es_crosscheck_snapshot = InvalidSnapshot;	/* no crosscheck */
  	estate->es_range_table = NIL;
  	estate->es_plannedstmt = NULL;
  
--- 109,114 ----
*** a/src/backend/executor/functions.c
--- b/src/backend/executor/functions.c
***************
*** 415,421 **** postquel_start(execution_state *es, SQLFunctionCachePtr fcache)
  	if (IsA(es->stmt, PlannedStmt))
  		es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
  								 fcache->src,
! 								 snapshot, InvalidSnapshot,
  								 dest,
  								 fcache->paramLI, 0);
  	else
--- 415,421 ----
  	if (IsA(es->stmt, PlannedStmt))
  		es->qd = CreateQueryDesc((PlannedStmt *) es->stmt,
  								 fcache->src,
! 								 snapshot,
  								 dest,
  								 fcache->paramLI, 0);
  	else
*** a/src/backend/executor/nodeLockRows.c
--- b/src/backend/executor/nodeLockRows.c
***************
*** 71,76 **** lnext:
--- 71,77 ----
  		ItemPointerData update_ctid;
  		TransactionId update_xmax;
  		LockTupleMode lockmode;
+ 		Snapshot lockcheck_snapshot = InvalidSnapshot;
  		HTSU_Result test;
  		HeapTuple	copyTuple;
  
***************
*** 110,123 **** lnext:
  
  		/* okay, try to lock the tuple */
  		if (erm->markType == ROW_MARK_EXCLUSIVE)
  			lockmode = LockTupleExclusive;
  		else
  			lockmode = LockTupleShared;
  
  		test = heap_lock_tuple(erm->relation, &tuple, &buffer,
  							   &update_ctid, &update_xmax,
  							   estate->es_output_cid,
! 							   lockmode, erm->noWait);
  		ReleaseBuffer(buffer);
  		switch (test)
  		{
--- 111,136 ----
  
  		/* okay, try to lock the tuple */
  		if (erm->markType == ROW_MARK_EXCLUSIVE)
+ 		{
  			lockmode = LockTupleExclusive;
+ 
+ 			/*
+ 			 * Transaction-snapshot mode transactions pass their snapshot as
+ 			 * the logcheck_snapshot.  This lets heap_lock_tuple report
+ 			 * concurrently FOR SHARE or FOR UPDATE locked tuples as
+ 			 * HeapTupleUpdated.
+ 			 */
+ 			if (IsolationUsesXactSnapshot())
+ 				lockcheck_snapshot = estate->es_snapshot;
+ 		}
  		else
  			lockmode = LockTupleShared;
  
  		test = heap_lock_tuple(erm->relation, &tuple, &buffer,
  							   &update_ctid, &update_xmax,
  							   estate->es_output_cid,
! 							   lockmode, erm->noWait,
! 							   lockcheck_snapshot);
  		ReleaseBuffer(buffer);
  		switch (test)
  		{
*** a/src/backend/executor/nodeModifyTable.c
--- b/src/backend/executor/nodeModifyTable.c
***************
*** 307,323 **** ExecDelete(ItemPointer tupleid,
  	/*
  	 * delete the tuple
  	 *
! 	 * Note: if es_crosscheck_snapshot isn't InvalidSnapshot, we check that
! 	 * the row to be deleted is visible to that snapshot, and throw a can't-
! 	 * serialize error if not.	This is a special-case behavior needed for
! 	 * referential integrity updates in transaction-snapshot mode transactions.
  	 */
  ldelete:;
  	result = heap_delete(resultRelationDesc, tupleid,
  						 &update_ctid, &update_xmax,
  						 estate->es_output_cid,
! 						 estate->es_crosscheck_snapshot,
! 						 true /* wait for commit */ );
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
--- 307,323 ----
  	/*
  	 * delete the tuple
  	 *
! 	 * Transaction-snapshot mode transactions pass their snapshot as the
! 	 * logcheck_snapshot.  This lets heap_lock_tuple report concurrently FOR
! 	 * SHARE or FOR UPDATE locked tuples as HeapTupleUpdated.
  	 */
  ldelete:;
  	result = heap_delete(resultRelationDesc, tupleid,
  						 &update_ctid, &update_xmax,
  						 estate->es_output_cid,
! 						 true, /* wait for commit */
! 						 IsolationUsesXactSnapshot() ? estate->es_snapshot :
! 													   InvalidSnapshot);
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
***************
*** 496,511 **** lreplace:;
  	/*
  	 * replace the heap tuple
  	 *
! 	 * Note: if es_crosscheck_snapshot isn't InvalidSnapshot, we check that
! 	 * the row to be updated is visible to that snapshot, and throw a can't-
! 	 * serialize error if not.	This is a special-case behavior needed for
! 	 * referential integrity updates in transaction-snapshot mode transactions.
  	 */
  	result = heap_update(resultRelationDesc, tupleid, tuple,
  						 &update_ctid, &update_xmax,
  						 estate->es_output_cid,
! 						 estate->es_crosscheck_snapshot,
! 						 true /* wait for commit */ );
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
--- 496,511 ----
  	/*
  	 * replace the heap tuple
  	 *
! 	 * Transaction-snapshot mode transactions pass their snapshot as the
! 	 * logcheck_snapshot.  This lets heap_lock_tuple report concurrently FOR
! 	 * SHARE or FOR UPDATE locked tuples as HeapTupleUpdated.
  	 */
  	result = heap_update(resultRelationDesc, tupleid, tuple,
  						 &update_ctid, &update_xmax,
  						 estate->es_output_cid,
! 						 true, /* wait for commit */
! 						 IsolationUsesXactSnapshot() ? estate->es_snapshot :
! 													   InvalidSnapshot);
  	switch (result)
  	{
  		case HeapTupleSelfUpdated:
*** a/src/backend/executor/spi.c
--- b/src/backend/executor/spi.c
***************
*** 51,57 **** static void _SPI_prepare_plan(const char *src, SPIPlanPtr plan,
  				  ParamListInfo boundParams);
  
  static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
! 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
  				  bool read_only, bool fire_triggers, long tcount);
  
  static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
--- 51,57 ----
  				  ParamListInfo boundParams);
  
  static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
! 				  Snapshot snapshot,
  				  bool read_only, bool fire_triggers, long tcount);
  
  static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
***************
*** 357,363 **** SPI_execute(const char *src, bool read_only, long tcount)
  	_SPI_prepare_plan(src, &plan, NULL);
  
  	res = _SPI_execute_plan(&plan, NULL,
! 							InvalidSnapshot, InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
--- 357,363 ----
  	_SPI_prepare_plan(src, &plan, NULL);
  
  	res = _SPI_execute_plan(&plan, NULL,
! 							InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
***************
*** 392,398 **** SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
  							_SPI_convert_params(plan->nargs, plan->argtypes,
  												Values, Nulls,
  												0),
! 							InvalidSnapshot, InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
--- 392,398 ----
  							_SPI_convert_params(plan->nargs, plan->argtypes,
  												Values, Nulls,
  												0),
! 							InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
***************
*** 421,427 **** SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
  		return res;
  
  	res = _SPI_execute_plan(plan, params,
! 							InvalidSnapshot, InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
--- 421,427 ----
  		return res;
  
  	res = _SPI_execute_plan(plan, params,
! 							InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
***************
*** 444,450 **** SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
  int
  SPI_execute_snapshot(SPIPlanPtr plan,
  					 Datum *Values, const char *Nulls,
! 					 Snapshot snapshot, Snapshot crosscheck_snapshot,
  					 bool read_only, bool fire_triggers, long tcount)
  {
  	int			res;
--- 444,450 ----
  int
  SPI_execute_snapshot(SPIPlanPtr plan,
  					 Datum *Values, const char *Nulls,
! 					 Snapshot snapshot,
  					 bool read_only, bool fire_triggers, long tcount)
  {
  	int			res;
***************
*** 463,469 **** SPI_execute_snapshot(SPIPlanPtr plan,
  							_SPI_convert_params(plan->nargs, plan->argtypes,
  												Values, Nulls,
  												0),
! 							snapshot, crosscheck_snapshot,
  							read_only, fire_triggers, tcount);
  
  	_SPI_end_call(true);
--- 463,469 ----
  							_SPI_convert_params(plan->nargs, plan->argtypes,
  												Values, Nulls,
  												0),
! 							snapshot,
  							read_only, fire_triggers, tcount);
  
  	_SPI_end_call(true);
***************
*** 516,522 **** SPI_execute_with_args(const char *src,
  	/* We don't need to copy the plan since it will be thrown away anyway */
  
  	res = _SPI_execute_plan(&plan, paramLI,
! 							InvalidSnapshot, InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
--- 516,522 ----
  	/* We don't need to copy the plan since it will be thrown away anyway */
  
  	res = _SPI_execute_plan(&plan, paramLI,
! 							InvalidSnapshot,
  							read_only, true, tcount);
  
  	_SPI_end_call(true);
***************
*** 1752,1758 **** _SPI_prepare_plan(const char *src, SPIPlanPtr plan, ParamListInfo boundParams)
   *
   * snapshot: query snapshot to use, or InvalidSnapshot for the normal
   *		behavior of taking a new snapshot for each query.
-  * crosscheck_snapshot: for RI use, all others pass InvalidSnapshot
   * read_only: TRUE for read-only execution (no CommandCounterIncrement)
   * fire_triggers: TRUE to fire AFTER triggers at end of query (normal case);
   *		FALSE means any AFTER triggers are postponed to end of outer query
--- 1752,1757 ----
***************
*** 1760,1766 **** _SPI_prepare_plan(const char *src, SPIPlanPtr plan, ParamListInfo boundParams)
   */
  static int
  _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
! 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
  				  bool read_only, bool fire_triggers, long tcount)
  {
  	int			my_res = 0;
--- 1759,1765 ----
   */
  static int
  _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
! 				  Snapshot snapshot,
  				  bool read_only, bool fire_triggers, long tcount)
  {
  	int			my_res = 0;
***************
*** 1903,1909 **** _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  
  				qdesc = CreateQueryDesc((PlannedStmt *) stmt,
  										plansource->query_string,
! 										snap, crosscheck_snapshot,
  										dest,
  										paramLI, 0);
  				res = _SPI_pquery(qdesc, fire_triggers,
--- 1902,1908 ----
  
  				qdesc = CreateQueryDesc((PlannedStmt *) stmt,
  										plansource->query_string,
! 										snap,
  										dest,
  										paramLI, 0);
  				res = _SPI_pquery(qdesc, fire_triggers,
*** a/src/backend/tcop/pquery.c
--- b/src/backend/tcop/pquery.c
***************
*** 64,70 **** QueryDesc *
  CreateQueryDesc(PlannedStmt *plannedstmt,
  				const char *sourceText,
  				Snapshot snapshot,
- 				Snapshot crosscheck_snapshot,
  				DestReceiver *dest,
  				ParamListInfo params,
  				int instrument_options)
--- 64,69 ----
***************
*** 76,83 **** CreateQueryDesc(PlannedStmt *plannedstmt,
  	qd->utilitystmt = plannedstmt->utilityStmt; /* in case DECLARE CURSOR */
  	qd->sourceText = sourceText;	/* query text */
  	qd->snapshot = RegisterSnapshot(snapshot);	/* snapshot */
- 	/* RI check snapshot */
- 	qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
  	qd->instrument_options = instrument_options;		/* instrumentation
--- 75,80 ----
***************
*** 109,115 **** CreateUtilityQueryDesc(Node *utilitystmt,
  	qd->utilitystmt = utilitystmt;		/* utility command */
  	qd->sourceText = sourceText;	/* query text */
  	qd->snapshot = RegisterSnapshot(snapshot);	/* snapshot */
- 	qd->crosscheck_snapshot = InvalidSnapshot;	/* RI check snapshot */
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
  	qd->instrument_options = false;		/* uninteresting for utilities */
--- 106,111 ----
***************
*** 134,140 **** FreeQueryDesc(QueryDesc *qdesc)
  
  	/* forget our snapshots */
  	UnregisterSnapshot(qdesc->snapshot);
- 	UnregisterSnapshot(qdesc->crosscheck_snapshot);
  
  	/* Only the QueryDesc itself need be freed */
  	pfree(qdesc);
--- 130,135 ----
***************
*** 178,184 **** ProcessQuery(PlannedStmt *plan,
  	 * Create the QueryDesc object
  	 */
  	queryDesc = CreateQueryDesc(plan, sourceText,
! 								GetActiveSnapshot(), InvalidSnapshot,
  								dest, params, 0);
  
  	/*
--- 173,179 ----
  	 * Create the QueryDesc object
  	 */
  	queryDesc = CreateQueryDesc(plan, sourceText,
! 								GetActiveSnapshot(),
  								dest, params, 0);
  
  	/*
***************
*** 514,520 **** PortalStart(Portal portal, ParamListInfo params, Snapshot snapshot)
  				queryDesc = CreateQueryDesc((PlannedStmt *) linitial(portal->stmts),
  											portal->sourceText,
  											GetActiveSnapshot(),
- 											InvalidSnapshot,
  											None_Receiver,
  											params,
  											0);
--- 509,514 ----
*** a/src/backend/utils/adt/ri_triggers.c
--- b/src/backend/utils/adt/ri_triggers.c
***************
*** 230,236 **** static SPIPlanPtr ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
  static bool ri_PerformCheck(RI_QueryKey *qkey, SPIPlanPtr qplan,
  				Relation fk_rel, Relation pk_rel,
  				HeapTuple old_tuple, HeapTuple new_tuple,
- 				bool detectNewRows,
  				int expect_OK, const char *constrname);
  static void ri_ExtractValues(RI_QueryKey *qkey, int key_idx,
  				 Relation rel, HeapTuple tuple,
--- 230,235 ----
***************
*** 357,363 **** RI_FKey_check(PG_FUNCTION_ARGS)
  		ri_PerformCheck(&qkey, qplan,
  						fk_rel, pk_rel,
  						NULL, NULL,
- 						false,
  						SPI_OK_SELECT,
  						NameStr(riinfo.conname));
  
--- 356,361 ----
***************
*** 500,506 **** RI_FKey_check(PG_FUNCTION_ARGS)
  	ri_PerformCheck(&qkey, qplan,
  					fk_rel, pk_rel,
  					NULL, new_row,
- 					false,
  					SPI_OK_SELECT,
  					NameStr(riinfo.conname));
  
--- 498,503 ----
***************
*** 661,667 **** ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
  	result = ri_PerformCheck(&qkey, qplan,
  							 fk_rel, pk_rel,
  							 old_row, NULL,
- 							 true,		/* treat like update */
  							 SPI_OK_SELECT, NULL);
  
  	if (SPI_finish() != SPI_OK_FINISH)
--- 658,663 ----
***************
*** 818,824 **** RI_FKey_noaction_del(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_SELECT,
  							NameStr(riinfo.conname));
  
--- 814,819 ----
***************
*** 1006,1012 **** RI_FKey_noaction_upd(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_SELECT,
  							NameStr(riinfo.conname));
  
--- 1001,1006 ----
***************
*** 1168,1174 **** RI_FKey_cascade_del(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_DELETE,
  							NameStr(riinfo.conname));
  
--- 1162,1167 ----
***************
*** 1356,1362 **** RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, new_row,
- 							true,		/* must detect new rows */
  							SPI_OK_UPDATE,
  							NameStr(riinfo.conname));
  
--- 1349,1354 ----
***************
*** 1527,1533 **** RI_FKey_restrict_del(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_SELECT,
  							NameStr(riinfo.conname));
  
--- 1519,1524 ----
***************
*** 1710,1716 **** RI_FKey_restrict_upd(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_SELECT,
  							NameStr(riinfo.conname));
  
--- 1701,1706 ----
***************
*** 1881,1887 **** RI_FKey_setnull_del(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_UPDATE,
  							NameStr(riinfo.conname));
  
--- 1871,1876 ----
***************
*** 2097,2103 **** RI_FKey_setnull_upd(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_UPDATE,
  							NameStr(riinfo.conname));
  
--- 2086,2091 ----
***************
*** 2269,2275 **** RI_FKey_setdefault_del(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_UPDATE,
  							NameStr(riinfo.conname));
  
--- 2257,2262 ----
***************
*** 2472,2478 **** RI_FKey_setdefault_upd(PG_FUNCTION_ARGS)
  			ri_PerformCheck(&qkey, qplan,
  							fk_rel, pk_rel,
  							old_row, NULL,
- 							true,		/* must detect new rows */
  							SPI_OK_UPDATE,
  							NameStr(riinfo.conname));
  
--- 2459,2464 ----
***************
*** 2792,2798 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
  	spi_result = SPI_execute_snapshot(qplan,
  									  NULL, NULL,
  									  GetLatestSnapshot(),
- 									  InvalidSnapshot,
  									  true, false, 1);
  
  	/* Check result */
--- 2778,2783 ----
***************
*** 3271,3284 **** static bool
  ri_PerformCheck(RI_QueryKey *qkey, SPIPlanPtr qplan,
  				Relation fk_rel, Relation pk_rel,
  				HeapTuple old_tuple, HeapTuple new_tuple,
- 				bool detectNewRows,
  				int expect_OK, const char *constrname)
  {
  	Relation	query_rel,
  				source_rel;
  	int			key_idx;
  	Snapshot	test_snapshot;
- 	Snapshot	crosscheck_snapshot;
  	int			limit;
  	int			spi_result;
  	Oid			save_userid;
--- 3256,3267 ----
***************
*** 3330,3359 **** ri_PerformCheck(RI_QueryKey *qkey, SPIPlanPtr qplan,
  	}
  
  	/*
- 	 * In READ COMMITTED mode, we just need to use an up-to-date regular
- 	 * snapshot, and we will see all rows that could be interesting. But in
- 	 * transaction-snapshot mode, we can't change the transaction snapshot.
- 	 * If the caller passes detectNewRows == false then it's okay to do the query
- 	 * with the transaction snapshot; otherwise we use a current snapshot, and
- 	 * tell the executor to error out if it finds any rows under the current
- 	 * snapshot that wouldn't be visible per the transaction snapshot.  Note
- 	 * that SPI_execute_snapshot will register the snapshots, so we don't need
- 	 * to bother here.
- 	 */
- 	if (IsolationUsesXactSnapshot() && detectNewRows)
- 	{
- 		CommandCounterIncrement();		/* be sure all my own work is visible */
- 		test_snapshot = GetLatestSnapshot();
- 		crosscheck_snapshot = GetTransactionSnapshot();
- 	}
- 	else
- 	{
- 		/* the default SPI behavior is okay */
- 		test_snapshot = InvalidSnapshot;
- 		crosscheck_snapshot = InvalidSnapshot;
- 	}
- 
- 	/*
  	 * If this is a select query (e.g., for a 'no action' or 'restrict'
  	 * trigger), we only need to see if there is a single row in the table,
  	 * matching the key.  Otherwise, limit = 0 - because we want the query to
--- 3313,3318 ----
***************
*** 3369,3375 **** ri_PerformCheck(RI_QueryKey *qkey, SPIPlanPtr qplan,
  	/* Finally we can run the query. */
  	spi_result = SPI_execute_snapshot(qplan,
  									  vals, nulls,
! 									  test_snapshot, crosscheck_snapshot,
  									  false, false, limit);
  
  	/* Restore UID and security context */
--- 3328,3334 ----
  	/* Finally we can run the query. */
  	spi_result = SPI_execute_snapshot(qplan,
  									  vals, nulls,
! 									  InvalidSnapshot,
  									  false, false, limit);
  
  	/* Restore UID and security context */
*** a/src/backend/utils/time/snapmgr.c
--- b/src/backend/utils/time/snapmgr.c
***************
*** 27,32 ****
--- 27,33 ----
  
  #include "access/transam.h"
  #include "access/xact.h"
+ #include "access/multixact.h"
  #include "storage/proc.h"
  #include "storage/procarray.h"
  #include "utils/memutils.h"
***************
*** 126,131 **** GetTransactionSnapshot(void)
--- 127,142 ----
  	{
  		Assert(RegisteredSnapshots == 0);
  
+ 		/*
+ 		 * We must store the oldest visible multi xact *before* taking the
+ 		 * transaction snapshot. Otherwise HeapSatisfiesLockersVisible in
+ 		 * heapam.c will be in trouble, since it depends on being able to
+ 		 * inspect all multi xact ids which might contain xids invisible to
+ 		 * the transaction snapshot.
+ 		 */
+ 		if (IsolationUsesXactSnapshot())
+ 			MultiXactIdSetOldestVisible();
+ 
  		CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
  		FirstSnapshotSet = true;
  
*** a/src/backend/utils/time/tqual.c
--- b/src/backend/utils/time/tqual.c
***************
*** 72,80 **** SnapshotData SnapshotSelfData = {HeapTupleSatisfiesSelf};
  SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};
  SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast};
  
- /* local functions */
- static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
- 
  
  /*
   * SetHintBits()
--- 72,77 ----
***************
*** 1253,1259 **** HeapTupleSatisfiesVacuum(HeapTupleHeader tuple, TransactionId OldestXmin,
   * by this function.  This is OK for current uses, because we actually only
   * apply this for known-committed XIDs.
   */
! static bool
  XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
  {
  	uint32		i;
--- 1250,1256 ----
   * by this function.  This is OK for current uses, because we actually only
   * apply this for known-committed XIDs.
   */
! bool
  XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
  {
  	uint32		i;
*** a/src/include/access/heapam.h
--- b/src/include/access/heapam.h
***************
*** 98,112 **** extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
  			int options, BulkInsertState bistate);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, Snapshot crosscheck, bool wait);
  extern HTSU_Result heap_update(Relation relation, ItemPointer otid,
  			HeapTuple newtup,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, Snapshot crosscheck, bool wait);
  extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
  				Buffer *buffer, ItemPointer ctid,
  				TransactionId *update_xmax, CommandId cid,
! 				LockTupleMode mode, bool nowait);
  extern void heap_inplace_update(Relation relation, HeapTuple tuple);
  extern bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid,
  				  Buffer buf);
--- 98,113 ----
  			int options, BulkInsertState bistate);
  extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, bool wait, Snapshot lockcheck_snapshot);
  extern HTSU_Result heap_update(Relation relation, ItemPointer otid,
  			HeapTuple newtup,
  			ItemPointer ctid, TransactionId *update_xmax,
! 			CommandId cid, bool wait, Snapshot lockcheck_snapshot);
  extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
  				Buffer *buffer, ItemPointer ctid,
  				TransactionId *update_xmax, CommandId cid,
! 				LockTupleMode mode, bool nowait,
! 				Snapshot lockcheck_snapshot);
  extern void heap_inplace_update(Relation relation, HeapTuple tuple);
  extern bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid,
  				  Buffer buf);
*** a/src/include/access/multixact.h
--- b/src/include/access/multixact.h
***************
*** 49,54 **** extern bool MultiXactIdIsCurrent(MultiXactId multi);
--- 49,55 ----
  extern void MultiXactIdWait(MultiXactId multi);
  extern bool ConditionalMultiXactIdWait(MultiXactId multi);
  extern void MultiXactIdSetOldestMember(void);
+ extern void MultiXactIdSetOldestVisible(void);
  extern int	GetMultiXactIdMembers(MultiXactId multi, TransactionId **xids);
  
  extern void AtEOXact_MultiXact(void);
*** a/src/include/executor/execdesc.h
--- b/src/include/executor/execdesc.h
***************
*** 39,45 **** typedef struct QueryDesc
  	Node	   *utilitystmt;	/* utility statement, or null */
  	const char *sourceText;		/* source text of the query */
  	Snapshot	snapshot;		/* snapshot to use for query */
- 	Snapshot	crosscheck_snapshot;	/* crosscheck for RI update/delete */
  	DestReceiver *dest;			/* the destination for tuple output */
  	ParamListInfo params;		/* param values being passed in */
  	int			instrument_options;		/* OR of InstrumentOption flags */
--- 39,44 ----
***************
*** 57,63 **** typedef struct QueryDesc
  extern QueryDesc *CreateQueryDesc(PlannedStmt *plannedstmt,
  				const char *sourceText,
  				Snapshot snapshot,
- 				Snapshot crosscheck_snapshot,
  				DestReceiver *dest,
  				ParamListInfo params,
  				int instrument_options);
--- 56,61 ----
*** a/src/include/executor/spi.h
--- b/src/include/executor/spi.h
***************
*** 82,88 **** extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
  extern int SPI_execute_snapshot(SPIPlanPtr plan,
  					 Datum *Values, const char *Nulls,
  					 Snapshot snapshot,
- 					 Snapshot crosscheck_snapshot,
  					 bool read_only, bool fire_triggers, long tcount);
  extern int SPI_execute_with_args(const char *src,
  					  int nargs, Oid *argtypes,
--- 82,87 ----
*** a/src/include/nodes/execnodes.h
--- b/src/include/nodes/execnodes.h
***************
*** 337,343 **** typedef struct EState
  	/* Basic state for all query types: */
  	ScanDirection es_direction; /* current scan direction */
  	Snapshot	es_snapshot;	/* time qual to use */
- 	Snapshot	es_crosscheck_snapshot; /* crosscheck time qual for RI */
  	List	   *es_range_table; /* List of RangeTblEntry */
  	PlannedStmt *es_plannedstmt;	/* link to top of plan tree */
  
--- 337,342 ----
*** a/src/include/utils/tqual.h
--- b/src/include/utils/tqual.h
***************
*** 41,46 **** extern PGDLLIMPORT SnapshotData SnapshotToastData;
--- 41,48 ----
  #define IsMVCCSnapshot(snapshot)  \
  	((snapshot)->satisfies == HeapTupleSatisfiesMVCC)
  
+ bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
+ 
  /*
   * HeapTupleSatisfiesVisibility
   *		True iff heap tuple satisfies a time qual.