Index: src/backend/access/transam/xact.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/access/transam/xact.c,v
retrieving revision 1.262
diff -c -p -r1.262 xact.c
*** src/backend/access/transam/xact.c	26 Mar 2008 18:48:59 -0000	1.262
--- src/backend/access/transam/xact.c	27 Mar 2008 13:16:15 -0000
*************** CommitTransaction(void)
*** 1752,1757 ****
--- 1752,1758 ----
  	AtEOXact_ComboCid();
  	AtEOXact_HashTables(true);
  	AtEOXact_PgStat(true);
+ 	AtEOXact_Snapshot(true);
  	pgstat_report_xact_timestamp(0);
  
  	CurrentResourceOwner = NULL;
*************** PrepareTransaction(void)
*** 1984,1989 ****
--- 1985,1991 ----
  	AtEOXact_ComboCid();
  	AtEOXact_HashTables(true);
  	/* don't call AtEOXact_PgStat here */
+ 	AtEOXact_Snapshot(true);
  
  	CurrentResourceOwner = NULL;
  	ResourceOwnerDelete(TopTransactionResourceOwner);
*************** AbortTransaction(void)
*** 2128,2133 ****
--- 2130,2136 ----
  	AtEOXact_ComboCid();
  	AtEOXact_HashTables(false);
  	AtEOXact_PgStat(false);
+ 	AtEOXact_Snapshot(false);
  	pgstat_report_xact_timestamp(0);
  
  	/*
*************** CommitSubTransaction(void)
*** 3806,3811 ****
--- 3809,3815 ----
  	AtSubCommit_Notify();
  	AtEOSubXact_UpdateFlatFiles(true, s->subTransactionId,
  								s->parent->subTransactionId);
+ 	AtSubCommit_Snapshot();
  
  	CallSubXactCallbacks(SUBXACT_EVENT_COMMIT_SUB, s->subTransactionId,
  						 s->parent->subTransactionId);
*************** AbortSubTransaction(void)
*** 3926,3931 ****
--- 3930,3936 ----
  		AtSubAbort_Notify();
  		AtEOSubXact_UpdateFlatFiles(false, s->subTransactionId,
  									s->parent->subTransactionId);
+ 		AtSubAbort_Snapshot();
  
  		/* Advertise the fact that we aborted in pg_clog. */
  		(void) RecordTransactionAbort(true);
Index: src/backend/catalog/index.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/catalog/index.c,v
retrieving revision 1.296
diff -c -p -r1.296 index.c
*** src/backend/catalog/index.c	26 Mar 2008 21:10:37 -0000	1.296
--- src/backend/catalog/index.c	27 Mar 2008 13:16:15 -0000
*************** IndexBuildHeapScan(Relation heapRelation
*** 1466,1471 ****
--- 1466,1472 ----
  	TransactionId OldestXmin;
  	BlockNumber root_blkno = InvalidBlockNumber;
  	OffsetNumber root_offsets[MaxHeapTuplesPerPage];
+ 	bool		unreg_snapshot = false;
  
  	/*
  	 * sanity checks
*************** IndexBuildHeapScan(Relation heapRelation
*** 1502,1508 ****
  	}
  	else if (indexInfo->ii_Concurrent)
  	{
! 		snapshot = CopySnapshot(GetTransactionSnapshot());
  		OldestXmin = InvalidTransactionId;		/* not used */
  	}
  	else
--- 1503,1510 ----
  	}
  	else if (indexInfo->ii_Concurrent)
  	{
! 		snapshot = RegisterSnapshot(GetTransactionSnapshot());
! 		unreg_snapshot = true;
  		OldestXmin = InvalidTransactionId;		/* not used */
  	}
  	else
*************** IndexBuildHeapScan(Relation heapRelation
*** 1787,1792 ****
--- 1789,1796 ----
  	}
  
  	heap_endscan(scan);
+ 	if (unreg_snapshot)
+ 		UnregisterSnapshot(snapshot);
  
  	ExecDropSingleTupleTableSlot(slot);
  
Index: src/backend/commands/cluster.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/cluster.c,v
retrieving revision 1.172
diff -c -p -r1.172 cluster.c
*** src/backend/commands/cluster.c	26 Mar 2008 21:10:37 -0000	1.172
--- src/backend/commands/cluster.c	27 Mar 2008 13:16:15 -0000
*************** cluster(ClusterStmt *stmt, bool isTopLev
*** 211,216 ****
--- 211,217 ----
  		rvs = get_tables_to_cluster(cluster_context);
  
  		/* Commit to get out of starting transaction */
+ 		UnregisterSnapshot(ActiveSnapshot);
  		CommitTransactionCommand();
  
  		/* Ok, now that we've got them all, cluster them one by one */
*************** cluster(ClusterStmt *stmt, bool isTopLev
*** 221,228 ****
  			/* Start a new transaction for each relation. */
  			StartTransactionCommand();
  			/* functions in indexes may want a snapshot set */
! 			ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  			cluster_rel(rvtc, true);
  			CommitTransactionCommand();
  		}
  
--- 222,230 ----
  			/* Start a new transaction for each relation. */
  			StartTransactionCommand();
  			/* functions in indexes may want a snapshot set */
! 			ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  			cluster_rel(rvtc, true);
+ 			UnregisterSnapshot(ActiveSnapshot);
  			CommitTransactionCommand();
  		}
  
Index: src/backend/commands/copy.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/copy.c,v
retrieving revision 1.298
diff -c -p -r1.298 copy.c
*** src/backend/commands/copy.c	26 Mar 2008 18:48:59 -0000	1.298
--- src/backend/commands/copy.c	27 Mar 2008 13:16:15 -0000
*************** DoCopy(const CopyStmt *stmt, const char 
*** 1003,1008 ****
--- 1003,1009 ----
  		Query	   *query;
  		PlannedStmt *plan;
  		DestReceiver *dest;
+ 		Snapshot	snap;
  
  		Assert(!is_from);
  		cstate->rel = NULL;
*************** DoCopy(const CopyStmt *stmt, const char 
*** 1044,1056 ****
  		plan = planner(query, 0, NULL);
  
  		/*
! 		 * Update snapshot command ID to ensure this query sees results of any
! 		 * previously executed queries.  (It's a bit cheesy to modify
! 		 * ActiveSnapshot without making a copy, but for the limited ways in
! 		 * which COPY can be invoked, I think it's OK, because the active
! 		 * snapshot shouldn't be shared with anything else anyway.)
  		 */
! 		ActiveSnapshot->curcid = GetCurrentCommandId(false);
  
  		/* Create dest receiver for COPY OUT */
  		dest = CreateDestReceiver(DestCopyOut, NULL);
--- 1045,1056 ----
  		plan = planner(query, 0, NULL);
  
  		/*
! 		 * Use a snapshot with updated command ID to ensure this query sees
! 		 * results of any previously executed queries.  We don't register
! 		 * the snapshot here -- CreateQueryDesc does it.
  		 */
! 		snap = CopySnapshot(ActiveSnapshot);
! 		snap->curcid = GetCurrentCommandId(false);
  
  		/* Create dest receiver for COPY OUT */
  		dest = CreateDestReceiver(DestCopyOut, NULL);
*************** DoCopy(const CopyStmt *stmt, const char 
*** 1058,1065 ****
  
  		/* Create a QueryDesc requesting no output */
  		cstate->queryDesc = CreateQueryDesc(plan,
! 											ActiveSnapshot, InvalidSnapshot,
  											dest, NULL, false);
  
  		/*
  		 * Call ExecutorStart to prepare the plan for execution.
--- 1058,1066 ----
  
  		/* Create a QueryDesc requesting no output */
  		cstate->queryDesc = CreateQueryDesc(plan,
! 											snap, InvalidSnapshot,
  											dest, NULL, false);
+ 		FreeSnapshot(snap);
  
  		/*
  		 * Call ExecutorStart to prepare the plan for execution.
Index: src/backend/commands/explain.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/explain.c,v
retrieving revision 1.171
diff -c -p -r1.171 explain.c
*** src/backend/commands/explain.c	26 Mar 2008 18:48:59 -0000	1.171
--- src/backend/commands/explain.c	27 Mar 2008 13:17:43 -0000
*************** ExplainOnePlan(PlannedStmt *plannedstmt,
*** 226,246 ****
  	ExplainState *es;
  	StringInfoData buf;
  	int			eflags;
  
  	/*
! 	 * Update snapshot command ID to ensure this query sees results of any
! 	 * previously executed queries.  (It's a bit cheesy to modify
! 	 * ActiveSnapshot without making a copy, but for the limited ways in which
! 	 * EXPLAIN can be invoked, I think it's OK, because the active snapshot
! 	 * shouldn't be shared with anything else anyway.)
  	 */
! 	ActiveSnapshot->curcid = GetCurrentCommandId(false);
  
  	/* Create a QueryDesc requesting no output */
  	queryDesc = CreateQueryDesc(plannedstmt,
! 								ActiveSnapshot, InvalidSnapshot,
  								None_Receiver, params,
  								stmt->analyze);
  
  	INSTR_TIME_SET_CURRENT(starttime);
  
--- 226,246 ----
  	ExplainState *es;
  	StringInfoData buf;
  	int			eflags;
+ 	Snapshot	snap;
  
  	/*
! 	 * Update a snapshot with an updated command ID to ensure this query sees
! 	 * results of any previously executed queries.
  	 */
! 	snap = CopySnapshot(ActiveSnapshot);
! 	snap->curcid = GetCurrentCommandId(false);
  
  	/* Create a QueryDesc requesting no output */
  	queryDesc = CreateQueryDesc(plannedstmt,
! 								snap, InvalidSnapshot,
  								None_Receiver, params,
  								stmt->analyze);
+ 	FreeSnapshot(snap);
  
  	INSTR_TIME_SET_CURRENT(starttime);
  
Index: src/backend/commands/indexcmds.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/indexcmds.c,v
retrieving revision 1.174
diff -c -p -r1.174 indexcmds.c
*** src/backend/commands/indexcmds.c	26 Mar 2008 21:10:37 -0000	1.174
--- src/backend/commands/indexcmds.c	27 Mar 2008 13:16:15 -0000
*************** DefineIndex(RangeVar *heapRelation,
*** 483,488 ****
--- 483,491 ----
  	 */
  	LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
  
+ 	UnregisterSnapshot(ActiveSnapshot);
+ 	ActiveSnapshot = NULL;
+ 
  	CommitTransactionCommand();
  	StartTransactionCommand();
  
*************** DefineIndex(RangeVar *heapRelation,
*** 541,547 ****
  	indexRelation = index_open(indexRelationId, RowExclusiveLock);
  
  	/* Set ActiveSnapshot since functions in the indexes may need it */
! 	ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  
  	/* We have to re-build the IndexInfo struct, since it was lost in commit */
  	indexInfo = BuildIndexInfo(indexRelation);
--- 544,550 ----
  	indexRelation = index_open(indexRelationId, RowExclusiveLock);
  
  	/* Set ActiveSnapshot since functions in the indexes may need it */
! 	ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  
  	/* We have to re-build the IndexInfo struct, since it was lost in commit */
  	indexInfo = BuildIndexInfo(indexRelation);
*************** DefineIndex(RangeVar *heapRelation,
*** 580,585 ****
--- 583,592 ----
  
  	heap_close(pg_index, RowExclusiveLock);
  
+ 	/* we can do away with our snapshot */
+ 	UnregisterSnapshot(ActiveSnapshot);
+ 	ActiveSnapshot = NULL;
+ 
  	/*
  	 * Commit this transaction to make the indisready update visible.
  	 */
*************** DefineIndex(RangeVar *heapRelation,
*** 615,621 ****
  	 * We also set ActiveSnapshot to this snap, since functions in indexes may
  	 * need a snapshot.
  	 */
! 	snapshot = CopySnapshot(GetTransactionSnapshot());
  	ActiveSnapshot = snapshot;
  
  	/*
--- 622,628 ----
  	 * We also set ActiveSnapshot to this snap, since functions in indexes may
  	 * need a snapshot.
  	 */
! 	snapshot = RegisterSnapshot(GetTransactionSnapshot());
  	ActiveSnapshot = snapshot;
  
  	/*
*************** DefineIndex(RangeVar *heapRelation,
*** 624,629 ****
--- 631,642 ----
  	validate_index(relationId, indexRelationId, snapshot);
  
  	/*
+ 	 * Note: we no longer need the validating snapshot here anymore, but we
+ 	 * still need ActiveSnapshot, so we'll call UnregisterSnapshot once at the
+ 	 * end.
+ 	 */
+ 
+ 	/*
  	 * The index is now valid in the sense that it contains all currently
  	 * interesting tuples.	But since it might not contain tuples deleted just
  	 * before the reference snap was taken, we have to wait out any
*************** DefineIndex(RangeVar *heapRelation,
*** 685,690 ****
--- 698,707 ----
  	 */
  	CacheInvalidateRelcacheByRelid(heaprelid.relId);
  
+ 	/* we can now unregister ActiveSnapshot */
+ 	UnregisterSnapshot(ActiveSnapshot);
+ 	ActiveSnapshot = NULL;
+ 
  	/*
  	 * Last thing to do is release the session-level lock on the parent table.
  	 */
*************** ReindexDatabase(const char *databaseName
*** 1452,1457 ****
--- 1469,1476 ----
  	heap_close(relationRelation, AccessShareLock);
  
  	/* Now reindex each rel in a separate transaction */
+ 	UnregisterSnapshot(ActiveSnapshot);
+ 	ActiveSnapshot = NULL;
  	CommitTransactionCommand();
  	foreach(l, relids)
  	{
*************** ReindexDatabase(const char *databaseName
*** 1459,1469 ****
  
  		StartTransactionCommand();
  		/* functions in indexes may want a snapshot set */
! 		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  		if (reindex_relation(relid, true))
  			ereport(NOTICE,
  					(errmsg("table \"%s\" was reindexed",
  							get_rel_name(relid))));
  		CommitTransactionCommand();
  	}
  	StartTransactionCommand();
--- 1478,1490 ----
  
  		StartTransactionCommand();
  		/* functions in indexes may want a snapshot set */
! 		ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  		if (reindex_relation(relid, true))
  			ereport(NOTICE,
  					(errmsg("table \"%s\" was reindexed",
  							get_rel_name(relid))));
+ 		UnregisterSnapshot(ActiveSnapshot);
+ 		ActiveSnapshot = NULL;
  		CommitTransactionCommand();
  	}
  	StartTransactionCommand();
Index: src/backend/commands/trigger.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/trigger.c,v
retrieving revision 1.230
diff -c -p -r1.230 trigger.c
*** src/backend/commands/trigger.c	26 Mar 2008 21:10:38 -0000	1.230
--- src/backend/commands/trigger.c	27 Mar 2008 13:16:15 -0000
*************** void
*** 2884,2889 ****
--- 2884,2890 ----
  AfterTriggerFireDeferred(void)
  {
  	AfterTriggerEventList *events;
+ 	bool		unreg_snapshot = false;
  
  	/* Must be inside a transaction */
  	Assert(afterTriggers != NULL);
*************** AfterTriggerFireDeferred(void)
*** 2898,2904 ****
  	 */
  	events = &afterTriggers->events;
  	if (events->head != NULL)
! 		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  
  	/*
  	 * Run all the remaining triggers.	Loop until they are all gone, in case
--- 2899,2908 ----
  	 */
  	events = &afterTriggers->events;
  	if (events->head != NULL)
! 	{
! 		ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
! 		unreg_snapshot = true;
! 	}
  
  	/*
  	 * Run all the remaining triggers.	Loop until they are all gone, in case
*************** AfterTriggerFireDeferred(void)
*** 2911,2916 ****
--- 2915,2926 ----
  		afterTriggerInvokeEvents(events, firing_id, NULL, true);
  	}
  
+ 	if (unreg_snapshot)
+ 	{
+ 		UnregisterSnapshot(ActiveSnapshot);
+ 		ActiveSnapshot = NULL;
+ 	}
+ 
  	Assert(events->head == NULL);
  }
  
Index: src/backend/commands/vacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/commands/vacuum.c,v
retrieving revision 1.371
diff -c -p -r1.371 vacuum.c
*** src/backend/commands/vacuum.c	26 Mar 2008 21:10:38 -0000	1.371
--- src/backend/commands/vacuum.c	27 Mar 2008 13:16:15 -0000
*************** vacuum(VacuumStmt *vacstmt, List *relids
*** 407,412 ****
--- 407,418 ----
  	 */
  	if (use_own_xacts)
  	{
+ 		/* ActiveSnapshot is not set by autovacuum */
+ 		if (ActiveSnapshot)
+ 		{
+ 			UnregisterSnapshot(ActiveSnapshot);
+ 			ActiveSnapshot = NULL;
+ 		}
  		/* matches the StartTransaction in PostgresMain() */
  		CommitTransactionCommand();
  	}
*************** vacuum(VacuumStmt *vacstmt, List *relids
*** 444,450 ****
  				{
  					StartTransactionCommand();
  					/* functions in indexes may want a snapshot set */
! 					ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  				}
  				else
  					old_context = MemoryContextSwitchTo(anl_context);
--- 450,456 ----
  				{
  					StartTransactionCommand();
  					/* functions in indexes may want a snapshot set */
! 					ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  				}
  				else
  					old_context = MemoryContextSwitchTo(anl_context);
*************** vacuum(VacuumStmt *vacstmt, List *relids
*** 452,458 ****
--- 458,468 ----
  				analyze_rel(relid, vacstmt, vac_strategy);
  
  				if (use_own_xacts)
+ 				{
+ 					UnregisterSnapshot(ActiveSnapshot);
+ 					ActiveSnapshot = NULL;
  					CommitTransactionCommand();
+ 				}
  				else
  				{
  					MemoryContextSwitchTo(old_context);
*************** vacuum_rel(Oid relid, VacuumStmt *vacstm
*** 979,985 ****
  	if (vacstmt->full)
  	{
  		/* functions in indexes may want a snapshot set */
! 		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  	}
  	else
  	{
--- 989,995 ----
  	if (vacstmt->full)
  	{
  		/* functions in indexes may want a snapshot set */
! 		ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  	}
  	else
  	{
*************** vacuum_rel(Oid relid, VacuumStmt *vacstm
*** 1138,1143 ****
--- 1148,1160 ----
  	/* all done with this class, but hold lock until commit */
  	relation_close(onerel, NoLock);
  
+ 	/* do away with our snapshot */
+ 	if (vacstmt->full)
+ 	{
+ 		UnregisterSnapshot(ActiveSnapshot);
+ 		ActiveSnapshot = NULL;
+ 	}
+ 
  	/*
  	 * Complete the transaction and free all temporary memory used.
  	 */
Index: src/backend/executor/execMain.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/executor/execMain.c,v
retrieving revision 1.304
diff -c -p -r1.304 execMain.c
*** src/backend/executor/execMain.c	26 Mar 2008 21:10:38 -0000	1.304
--- src/backend/executor/execMain.c	27 Mar 2008 14:19:05 -0000
***************
*** 52,57 ****
--- 52,58 ----
  #include "utils/acl.h"
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
+ #include "utils/snapmgr.h"
  #include "utils/tqual.h"
  
  
*************** ExecutorStart(QueryDesc *queryDesc, int 
*** 188,195 ****
  	/*
  	 * Copy other important information into the EState
  	 */
! 	estate->es_snapshot = queryDesc->snapshot;
! 	estate->es_crosscheck_snapshot = queryDesc->crosscheck_snapshot;
  	estate->es_instrument = queryDesc->doInstrument;
  
  	/*
--- 189,196 ----
  	/*
  	 * Copy other important information into the EState
  	 */
! 	estate->es_snapshot = RegisterSnapshot(queryDesc->snapshot);
! 	estate->es_crosscheck_snapshot = RegisterSnapshot(queryDesc->crosscheck_snapshot);
  	estate->es_instrument = queryDesc->doInstrument;
  
  	/*
*************** ExecutorEnd(QueryDesc *queryDesc)
*** 316,321 ****
--- 317,326 ----
  	if (estate->es_select_into)
  		CloseIntoRel(queryDesc);
  
+ 	/* do away with our snapshots */
+ 	UnregisterSnapshot(estate->es_snapshot);
+ 	UnregisterSnapshot(estate->es_crosscheck_snapshot);
+ 
  	/*
  	 * Must switch out of context before destroying it
  	 */
Index: src/backend/executor/functions.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/executor/functions.c,v
retrieving revision 1.124
diff -c -p -r1.124 functions.c
*** src/backend/executor/functions.c	26 Mar 2008 18:48:59 -0000	1.124
--- src/backend/executor/functions.c	27 Mar 2008 13:16:15 -0000
*************** postquel_start(execution_state *es, SQLF
*** 295,309 ****
  	 * In a read-only function, use the surrounding query's snapshot;
  	 * otherwise take a new snapshot for each query.  The snapshot should
  	 * include a fresh command ID so that all work to date in this transaction
! 	 * is visible.	We copy in both cases so that postquel_end can
! 	 * unconditionally do FreeSnapshot.
  	 */
  	if (fcache->readonly_func)
! 		snapshot = CopySnapshot(ActiveSnapshot);
  	else
  	{
  		CommandCounterIncrement();
! 		snapshot = CopySnapshot(GetTransactionSnapshot());
  	}
  
  	if (IsA(es->stmt, PlannedStmt))
--- 295,309 ----
  	 * In a read-only function, use the surrounding query's snapshot;
  	 * otherwise take a new snapshot for each query.  The snapshot should
  	 * include a fresh command ID so that all work to date in this transaction
! 	 * is visible.	No need to create a separate copy; CreateQueryDesc will
! 	 * register it appropriately.
  	 */
  	if (fcache->readonly_func)
! 		snapshot = ActiveSnapshot;
  	else
  	{
  		CommandCounterIncrement();
! 		snapshot = GetTransactionSnapshot();
  	}
  
  	if (IsA(es->stmt, PlannedStmt))
*************** postquel_end(execution_state *es)
*** 425,431 ****
  		ActiveSnapshot = saveActiveSnapshot;
  	}
  
- 	FreeSnapshot(es->qd->snapshot);
  	FreeQueryDesc(es->qd);
  	es->qd = NULL;
  }
--- 425,430 ----
Index: src/backend/executor/spi.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/executor/spi.c,v
retrieving revision 1.191
diff -c -p -r1.191 spi.c
*** src/backend/executor/spi.c	26 Mar 2008 18:48:59 -0000	1.191
--- src/backend/executor/spi.c	27 Mar 2008 13:16:15 -0000
*************** SPI_execp(SPIPlanPtr plan, Datum *Values
*** 365,373 ****
  
  /*
   * SPI_execute_snapshot -- identical to SPI_execute_plan, except that we allow
!  * the caller to specify exactly which snapshots to use.  Also, the caller
!  * may specify that AFTER triggers should be queued as part of the outer
!  * query rather than being fired immediately at the end of the command.
   *
   * This is currently not documented in spi.sgml because it is only intended
   * for use by RI triggers.
--- 365,374 ----
  
  /*
   * SPI_execute_snapshot -- identical to SPI_execute_plan, except that we allow
!  * the caller to specify exactly which snapshots to use, which will be
!  * registered here.  Also, the caller may specify that AFTER triggers should be
!  * queued as part of the outer query rather than being fired immediately at the
!  * end of the command.
   *
   * This is currently not documented in spi.sgml because it is only intended
   * for use by RI triggers.
*************** SPI_cursor_open(const char *name, SPIPla
*** 1028,1034 ****
  	}
  
  	/*
! 	 * Set up the snapshot to use.	(PortalStart will do CopySnapshot, so we
  	 * skip that here.)
  	 */
  	if (read_only)
--- 1029,1035 ----
  	}
  
  	/*
! 	 * Set up the snapshot to use.	(PortalStart will do RegisterSnapshot, so we
  	 * skip that here.)
  	 */
  	if (read_only)
*************** _SPI_execute_plan(SPIPlanPtr plan, Datum
*** 1598,1606 ****
  					 * ActiveSnapshot; if read-write, grab a full new snap.
  					 */
  					if (read_only)
! 						ActiveSnapshot = CopySnapshot(saveActiveSnapshot);
  					else
! 						ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  				}
  				else
  				{
--- 1599,1607 ----
  					 * ActiveSnapshot; if read-write, grab a full new snap.
  					 */
  					if (read_only)
! 						ActiveSnapshot = RegisterSnapshot(saveActiveSnapshot);
  					else
! 						ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  				}
  				else
  				{
*************** _SPI_execute_plan(SPIPlanPtr plan, Datum
*** 1609,1617 ****
  					 * exactly that snapshot, but read-write means use the
  					 * snap with advancing of command ID.
  					 */
! 					ActiveSnapshot = CopySnapshot(snapshot);
! 					if (!read_only)
! 						ActiveSnapshot->curcid = GetCurrentCommandId(false);
  				}
  
  				if (IsA(stmt, PlannedStmt) &&
--- 1610,1626 ----
  					 * exactly that snapshot, but read-write means use the
  					 * snap with advancing of command ID.
  					 */
! 					if (read_only)
! 						ActiveSnapshot = RegisterSnapshot(snapshot);
! 					else
! 					{
! 						Snapshot	snap;
! 
! 						snap = CopySnapshot(snapshot);
! 						snap->curcid = GetCurrentCommandId(false);
! 						ActiveSnapshot = RegisterSnapshot(snap);
! 						FreeSnapshot(snap);
! 					}
  				}
  
  				if (IsA(stmt, PlannedStmt) &&
*************** _SPI_execute_plan(SPIPlanPtr plan, Datum
*** 1641,1647 ****
  						_SPI_current->processed = _SPI_current->tuptable->alloced - _SPI_current->tuptable->free;
  					res = SPI_OK_UTILITY;
  				}
! 				FreeSnapshot(ActiveSnapshot);
  				ActiveSnapshot = NULL;
  
  				/*
--- 1650,1656 ----
  						_SPI_current->processed = _SPI_current->tuptable->alloced - _SPI_current->tuptable->free;
  					res = SPI_OK_UTILITY;
  				}
! 				UnregisterSnapshot(ActiveSnapshot);
  				ActiveSnapshot = NULL;
  
  				/*
Index: src/backend/storage/ipc/procarray.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/ipc/procarray.c,v
retrieving revision 1.43
diff -c -p -r1.43 procarray.c
*** src/backend/storage/ipc/procarray.c	26 Mar 2008 18:48:59 -0000	1.43
--- src/backend/storage/ipc/procarray.c	28 Mar 2008 13:42:55 -0000
*************** GetSnapshotData(Snapshot snapshot, bool 
*** 681,691 ****
  
  	Assert(snapshot != NULL);
  
- 	/* Serializable snapshot must be computed before any other... */
- 	Assert(serializable ?
- 		   !TransactionIdIsValid(MyProc->xmin) :
- 		   TransactionIdIsValid(MyProc->xmin));
- 
  	/*
  	 * Allocating space for maxProcs xids is usually overkill; numProcs would
  	 * be sufficient.  But it seems better to do the malloc while not holding
--- 681,686 ----
Index: src/backend/storage/large_object/inv_api.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/large_object/inv_api.c,v
retrieving revision 1.131
diff -c -p -r1.131 inv_api.c
*** src/backend/storage/large_object/inv_api.c	26 Mar 2008 21:10:38 -0000	1.131
--- src/backend/storage/large_object/inv_api.c	27 Mar 2008 13:16:16 -0000
*************** inv_open(Oid lobjId, int flags, MemoryCo
*** 245,256 ****
  	}
  	else if (flags & INV_READ)
  	{
! 		/* be sure to copy snap into mcxt */
! 		MemoryContext oldContext = MemoryContextSwitchTo(mcxt);
! 
! 		retval->snapshot = CopySnapshot(ActiveSnapshot);
  		retval->flags = IFS_RDLOCK;
- 		MemoryContextSwitchTo(oldContext);
  	}
  	else
  		elog(ERROR, "invalid flags: %d", flags);
--- 245,252 ----
  	}
  	else if (flags & INV_READ)
  	{
! 		retval->snapshot = RegisterSnapshot(ActiveSnapshot);
  		retval->flags = IFS_RDLOCK;
  	}
  	else
  		elog(ERROR, "invalid flags: %d", flags);
*************** inv_close(LargeObjectDesc *obj_desc)
*** 273,279 ****
  {
  	Assert(PointerIsValid(obj_desc));
  	if (obj_desc->snapshot != SnapshotNow)
! 		FreeSnapshot(obj_desc->snapshot);
  	pfree(obj_desc);
  }
  
--- 269,275 ----
  {
  	Assert(PointerIsValid(obj_desc));
  	if (obj_desc->snapshot != SnapshotNow)
! 		UnregisterSnapshot(obj_desc->snapshot);
  	pfree(obj_desc);
  }
  
Index: src/backend/tcop/fastpath.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/tcop/fastpath.c,v
retrieving revision 1.99
diff -c -p -r1.99 fastpath.c
*** src/backend/tcop/fastpath.c	26 Mar 2008 18:48:59 -0000	1.99
--- src/backend/tcop/fastpath.c	27 Mar 2008 13:16:16 -0000
*************** HandleFunctionRequest(StringInfo msgBuf)
*** 309,315 ****
  	 * Now that we know we are in a valid transaction, set snapshot in case
  	 * needed by function itself or one of the datatype I/O routines.
  	 */
! 	ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  
  	/*
  	 * Begin parsing the buffer contents.
--- 309,315 ----
  	 * Now that we know we are in a valid transaction, set snapshot in case
  	 * needed by function itself or one of the datatype I/O routines.
  	 */
! 	ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  
  	/*
  	 * Begin parsing the buffer contents.
*************** HandleFunctionRequest(StringInfo msgBuf)
*** 396,401 ****
--- 396,405 ----
  
  	SendFunctionResult(retval, fcinfo.isnull, fip->rettype, rformat);
  
+ 	/* We no longer need the snapshot */
+ 	UnregisterSnapshot(ActiveSnapshot);
+ 	ActiveSnapshot = NULL;
+ 
  	/*
  	 * Emit duration logging if appropriate.
  	 */
Index: src/backend/tcop/postgres.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/tcop/postgres.c,v
retrieving revision 1.547
diff -c -p -r1.547 postgres.c
*** src/backend/tcop/postgres.c	26 Mar 2008 18:48:59 -0000	1.547
--- src/backend/tcop/postgres.c	27 Mar 2008 13:16:16 -0000
*************** pg_plan_queries(List *querytrees, int cu
*** 754,760 ****
  			{
  				if (needSnapshot && mySnapshot == NULL)
  				{
! 					mySnapshot = CopySnapshot(GetTransactionSnapshot());
  					ActiveSnapshot = mySnapshot;
  				}
  				stmt = (Node *) pg_plan_query(query, cursorOptions,
--- 754,760 ----
  			{
  				if (needSnapshot && mySnapshot == NULL)
  				{
! 					mySnapshot = RegisterSnapshot(GetTransactionSnapshot());
  					ActiveSnapshot = mySnapshot;
  				}
  				stmt = (Node *) pg_plan_query(query, cursorOptions,
*************** pg_plan_queries(List *querytrees, int cu
*** 765,771 ****
  		}
  
  		if (mySnapshot)
! 			FreeSnapshot(mySnapshot);
  	}
  	PG_CATCH();
  	{
--- 765,771 ----
  		}
  
  		if (mySnapshot)
! 			UnregisterSnapshot(mySnapshot);
  	}
  	PG_CATCH();
  	{
Index: src/backend/tcop/pquery.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/tcop/pquery.c,v
retrieving revision 1.122
diff -c -p -r1.122 pquery.c
*** src/backend/tcop/pquery.c	26 Mar 2008 18:48:59 -0000	1.122
--- src/backend/tcop/pquery.c	27 Mar 2008 13:16:16 -0000
*************** CreateQueryDesc(PlannedStmt *plannedstmt
*** 70,77 ****
  	qd->operation = plannedstmt->commandType;	/* operation */
  	qd->plannedstmt = plannedstmt;		/* plan */
  	qd->utilitystmt = plannedstmt->utilityStmt; /* in case DECLARE CURSOR */
! 	qd->snapshot = snapshot;	/* snapshot */
! 	qd->crosscheck_snapshot = crosscheck_snapshot;		/* RI check snapshot */
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
  	qd->doInstrument = doInstrument;	/* instrumentation wanted? */
--- 70,78 ----
  	qd->operation = plannedstmt->commandType;	/* operation */
  	qd->plannedstmt = plannedstmt;		/* plan */
  	qd->utilitystmt = plannedstmt->utilityStmt; /* in case DECLARE CURSOR */
! 	qd->snapshot = RegisterSnapshot(snapshot);	/* snapshot */
! 	/* RI check snapshot */
! 	qd->crosscheck_snapshot = RegisterSnapshot(crosscheck_snapshot);
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
  	qd->doInstrument = doInstrument;	/* instrumentation wanted? */
*************** CreateUtilityQueryDesc(Node *utilitystmt
*** 98,104 ****
  	qd->operation = CMD_UTILITY;	/* operation */
  	qd->plannedstmt = NULL;
  	qd->utilitystmt = utilitystmt;		/* utility command */
! 	qd->snapshot = snapshot;	/* snapshot */
  	qd->crosscheck_snapshot = InvalidSnapshot;	/* RI check snapshot */
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
--- 99,105 ----
  	qd->operation = CMD_UTILITY;	/* operation */
  	qd->plannedstmt = NULL;
  	qd->utilitystmt = utilitystmt;		/* utility command */
! 	qd->snapshot = RegisterSnapshot(snapshot);	/* snapshot */
  	qd->crosscheck_snapshot = InvalidSnapshot;	/* RI check snapshot */
  	qd->dest = dest;			/* output dest */
  	qd->params = params;		/* parameter values passed into query */
*************** CreateUtilityQueryDesc(Node *utilitystmt
*** 118,123 ****
--- 119,127 ----
  void
  FreeQueryDesc(QueryDesc *qdesc)
  {
+ 	UnregisterSnapshot(qdesc->snapshot);
+ 	UnregisterSnapshot(qdesc->crosscheck_snapshot);
+ 
  	/* Can't be a live query */
  	Assert(qdesc->estate == NULL);
  	/* Only the QueryDesc itself need be freed */
*************** ProcessQuery(PlannedStmt *plan,
*** 155,161 ****
  	 * Must always set snapshot for plannable queries.	Note we assume that
  	 * caller will take care of restoring ActiveSnapshot on exit/error.
  	 */
! 	ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  
  	/*
  	 * Create the QueryDesc object
--- 159,165 ----
  	 * Must always set snapshot for plannable queries.	Note we assume that
  	 * caller will take care of restoring ActiveSnapshot on exit/error.
  	 */
! 	ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  
  	/*
  	 * Create the QueryDesc object
*************** ProcessQuery(PlannedStmt *plan,
*** 220,230 ****
  	 * Now, we close down all the scans and free allocated resources.
  	 */
  	ExecutorEnd(queryDesc);
  
  	FreeQueryDesc(queryDesc);
- 
- 	FreeSnapshot(ActiveSnapshot);
- 	ActiveSnapshot = NULL;
  }
  
  /*
--- 224,232 ----
  	 * Now, we close down all the scans and free allocated resources.
  	 */
  	ExecutorEnd(queryDesc);
+ 	UnregisterSnapshot(ActiveSnapshot);
  
  	FreeQueryDesc(queryDesc);
  }
  
  /*
*************** PortalStart(Portal portal, ParamListInfo
*** 488,500 ****
  			case PORTAL_ONE_SELECT:
  
  				/*
! 				 * Must set snapshot before starting executor.	Be sure to
! 				 * copy it into the portal's context.
  				 */
  				if (snapshot)
! 					ActiveSnapshot = CopySnapshot(snapshot);
  				else
! 					ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  
  				/*
  				 * Create QueryDesc in portal's context; for the moment, set
--- 490,502 ----
  			case PORTAL_ONE_SELECT:
  
  				/*
! 				 * Must set snapshot before starting executor.  Note: we don't
! 				 * register them here; they're registered in ExecutorStart.
  				 */
  				if (snapshot)
! 					ActiveSnapshot = snapshot;
  				else
! 					ActiveSnapshot = GetTransactionSnapshot();
  
  				/*
  				 * Create QueryDesc in portal's context; for the moment, set
*************** PortalRunUtility(Portal portal, Node *ut
*** 1167,1173 ****
  		  IsA(utilityStmt, NotifyStmt) ||
  		  IsA(utilityStmt, UnlistenStmt) ||
  		  IsA(utilityStmt, CheckPointStmt)))
! 		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  	else
  		ActiveSnapshot = NULL;
  
--- 1169,1175 ----
  		  IsA(utilityStmt, NotifyStmt) ||
  		  IsA(utilityStmt, UnlistenStmt) ||
  		  IsA(utilityStmt, CheckPointStmt)))
! 		ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  	else
  		ActiveSnapshot = NULL;
  
*************** PortalRunUtility(Portal portal, Node *ut
*** 1182,1188 ****
  	MemoryContextSwitchTo(PortalGetHeapMemory(portal));
  
  	if (ActiveSnapshot)
! 		FreeSnapshot(ActiveSnapshot);
  	ActiveSnapshot = NULL;
  }
  
--- 1184,1190 ----
  	MemoryContextSwitchTo(PortalGetHeapMemory(portal));
  
  	if (ActiveSnapshot)
! 		UnregisterSnapshot(ActiveSnapshot);
  	ActiveSnapshot = NULL;
  }
  
Index: src/backend/utils/adt/ri_triggers.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/adt/ri_triggers.c,v
retrieving revision 1.107
diff -c -p -r1.107 ri_triggers.c
*** src/backend/utils/adt/ri_triggers.c	26 Mar 2008 21:10:39 -0000	1.107
--- src/backend/utils/adt/ri_triggers.c	27 Mar 2008 13:16:16 -0000
*************** RI_Initial_Check(Trigger *trigger, Relat
*** 2756,2767 ****
  	/*
  	 * Run the plan.  For safety we force a current snapshot to be used. (In
  	 * serializable mode, this arguably violates serializability, but we
! 	 * really haven't got much choice.)  We need at most one tuple returned,
! 	 * so pass limit = 1.
  	 */
  	spi_result = SPI_execute_snapshot(qplan,
  									  NULL, NULL,
! 									  CopySnapshot(GetLatestSnapshot()),
  									  InvalidSnapshot,
  									  true, false, 1);
  
--- 2756,2768 ----
  	/*
  	 * Run the plan.  For safety we force a current snapshot to be used. (In
  	 * serializable mode, this arguably violates serializability, but we
! 	 * really haven't got much choice.)  We don't need to register the
! 	 * snapshot, because SPI_execute_snapshot will see to it.  We need at most
! 	 * one tuple returned, so pass limit = 1.
  	 */
  	spi_result = SPI_execute_snapshot(qplan,
  									  NULL, NULL,
! 									  GetLatestSnapshot(),
  									  InvalidSnapshot,
  									  true, false, 1);
  
*************** ri_PerformCheck(RI_QueryKey *qkey, SPIPl
*** 3311,3323 ****
  	 * caller passes detectNewRows == false then it's okay to do the query
  	 * with the transaction snapshot; otherwise we use a current snapshot, and
  	 * tell the executor to error out if it finds any rows under the current
! 	 * snapshot that wouldn't be visible per the transaction snapshot.
  	 */
  	if (IsXactIsoLevelSerializable && detectNewRows)
  	{
  		CommandCounterIncrement();		/* be sure all my own work is visible */
! 		test_snapshot = CopySnapshot(GetLatestSnapshot());
! 		crosscheck_snapshot = CopySnapshot(GetTransactionSnapshot());
  	}
  	else
  	{
--- 3312,3326 ----
  	 * caller passes detectNewRows == false then it's okay to do the query
  	 * with the transaction snapshot; otherwise we use a current snapshot, and
  	 * tell the executor to error out if it finds any rows under the current
! 	 * snapshot that wouldn't be visible per the transaction snapshot.  Note
! 	 * that SPI_execute_snapshot will register the snapshots, so we don't need
! 	 * to bother here.
  	 */
  	if (IsXactIsoLevelSerializable && detectNewRows)
  	{
  		CommandCounterIncrement();		/* be sure all my own work is visible */
! 		test_snapshot = GetLatestSnapshot();
! 		crosscheck_snapshot = GetTransactionSnapshot();
  	}
  	else
  	{
Index: src/backend/utils/time/snapmgr.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/time/snapmgr.c,v
retrieving revision 1.1
diff -c -p -r1.1 snapmgr.c
*** src/backend/utils/time/snapmgr.c	26 Mar 2008 18:48:59 -0000	1.1
--- src/backend/utils/time/snapmgr.c	28 Mar 2008 13:43:43 -0000
***************
*** 14,20 ****
--- 14,22 ----
  
  #include "access/xact.h"
  #include "access/transam.h"
+ #include "storage/proc.h"
  #include "storage/procarray.h"
+ #include "utils/memutils.h"
  #include "utils/snapmgr.h"
  #include "utils/tqual.h"
  
*************** FreeXactSnapshot(void)
*** 170,172 ****
--- 172,372 ----
  	LatestSnapshot = NULL;
  	ActiveSnapshot = NULL;		/* just for cleanliness */
  }
+ 
+ /*
+  * Struct to keep track of the registered snapshots
+  */
+ typedef struct SnapshotElt
+ {
+ 	Snapshot	s_snap;
+ 	uint32		s_refcount;
+ 	uint32		s_level;
+ 	struct SnapshotElt	*s_next;
+ } SnapshotElt;
+ 
+ /*
+  * Head of the list of registered snapshots, and memory context where it
+  * resides
+  */
+ static SnapshotElt	   *SnapshotHead = NULL;
+ static MemoryContext	SnapshotCxt = NULL;
+ 
+ /*
+  * RegisterSnapshot
+  * 		Register a snapshot as being in use
+  *
+  * This registers a snapshot on our list.  If the snapshot has been registered
+  * previously, increment the reference count.
+  *
+  * If InvalidSnapshot or a non-MVCC snapshot is passed, it is not registered.
+  * XXX -- should it be a hard error if a non-MVCC snapshot is passed?
+  */
+ Snapshot
+ RegisterSnapshot(Snapshot snapshot)
+ {
+ 	SnapshotElt	*elt;
+ 	SnapshotElt	*newhead;
+ 	MemoryContext	oldcxt;
+ 
+ 	if (snapshot == InvalidSnapshot)
+ 		return InvalidSnapshot;
+ 
+ 	if (SnapshotCxt == NULL)
+ 	{
+ 		SnapshotCxt = AllocSetContextCreate(TopTransactionContext,
+ 											"Snapshot Context",
+ 											ALLOCSET_DEFAULT_MINSIZE,
+ 											ALLOCSET_DEFAULT_INITSIZE,
+ 											ALLOCSET_DEFAULT_MAXSIZE);
+ 	}
+ 
+ 	for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+ 	{
+ 		if (elt->s_snap == snapshot)
+ 		{
+ 			elt->s_refcount++;
+ 			return elt->s_snap;
+ 		}
+ 	}
+ 
+ 	oldcxt = MemoryContextSwitchTo(SnapshotCxt);
+ 	newhead = palloc(sizeof(SnapshotElt));
+ 	newhead->s_next = SnapshotHead;
+ 	newhead->s_snap = CopySnapshot(snapshot);
+ 	newhead->s_level = GetCurrentTransactionNestLevel();
+ 	newhead->s_refcount = 1;
+ 	MemoryContextSwitchTo(oldcxt);
+ 
+ 	SnapshotHead = newhead;
+ 
+ 	return SnapshotHead->s_snap;
+ }
+ 
+ /*
+  * UnregisterSnapshot
+  * 		Signals that a snapshot is no longer necessary
+  *
+  * If the reference count falls to zero, the memory is released.
+  */
+ void
+ UnregisterSnapshot(Snapshot snapshot)
+ {
+ 	SnapshotElt	*prev = NULL;
+ 	SnapshotElt	*elt;
+ 	bool		found = false;
+ 
+ 	if (snapshot == InvalidSnapshot)
+ 		return;
+ 
+ 	for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+ 	{
+ 		if (elt->s_snap == snapshot)
+ 		{
+ 			elt->s_refcount--;
+ 			found = true;
+ 
+ 			if (elt->s_refcount == 0)
+ 			{
+ 				if (prev)
+ 					prev->s_next = elt->s_next;
+ 				else
+ 					SnapshotHead = elt->s_next;
+ 
+ 				elog(LOG, "found: refcount is now 0");
+ 
+ 				pfree(elt);
+ 			}
+ 			break;
+ 		}
+ 
+ 		prev = elt;
+ 	}
+ 
+ 	if (!found)
+ 		elog(WARNING, "unregistering failed for snapshot %p", snapshot);
+ }
+ 
+ /*
+  * At subtransaction abort, unregister all snapshots registered during this
+  * subtransaction.
+  * */
+ void
+ AtSubAbort_Snapshot(void)
+ {
+ 	SnapshotElt	*prev = NULL;
+ 	SnapshotElt	*elt;
+ 	int		level = GetCurrentTransactionNestLevel();
+ 
+ 	for (elt = SnapshotHead; elt != NULL;)
+ 	{
+ 		if (elt->s_level == level)
+ 		{
+ 			SnapshotElt	*tofree;
+ 
+ 			if (prev)
+ 				prev->s_next = elt->s_next;
+ 			else
+ 				SnapshotHead = elt->s_next;
+ 
+ 			tofree = elt;
+ 			elt = elt->s_next;
+ 			pfree(tofree);
+ 		}
+ 		else
+ 		{
+ 			prev = elt;
+ 			elt = elt->s_next;
+ 		}
+ 	}
+ }
+ 
+ /*
+  * At subtransaction commit, reassign all snapshots to the parent subxact.
+  */
+ void
+ AtSubCommit_Snapshot(void)
+ {
+ 	SnapshotElt	*elt;
+ 	int		level = GetCurrentTransactionNestLevel();
+ 
+ 	for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+ 	{
+ 		if (elt->s_level == level)
+ 			elt->s_level--;
+ 	}
+ }
+ 
+ /*
+  * AtEOXact_Snapshot
+  *		Main transaction end handler for snapshots. 
+  *
+  * At xact commit, we want to complain about any leftover snapshots, as a
+  * measure against forgotten snapshot cleanup calls.  On xact abort, it is
+  * expected that some snapshots are left behind; just cleanup our module.
+  */
+ void
+ AtEOXact_Snapshot(bool is_commit)
+ {
+ 	if (SnapshotCxt == NULL)
+ 		return;
+ 
+ 	if (is_commit)
+ 	{
+ 		SnapshotElt	*elt;
+ 
+ 		/* complain about any unregistered snapshot */
+ 		for (elt = SnapshotHead; elt != NULL; elt = elt->s_next)
+ 		{
+ 			ereport(WARNING,
+ 					(errmsg("snapshot %p not destroyed at commit (%d refs)",
+ 							elt->s_snap, elt->s_refcount)));
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * And forget about them.  We don't need to reset the context -- it'll
+ 	 * go away with TopTransactionContext.
+ 	 */
+ 	SnapshotCxt = NULL;
+ 	SnapshotHead = NULL;
+ }
Index: src/include/utils/snapmgr.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/utils/snapmgr.h,v
retrieving revision 1.1
diff -c -p -r1.1 snapmgr.h
*** src/include/utils/snapmgr.h	26 Mar 2008 18:48:59 -0000	1.1
--- src/include/utils/snapmgr.h	27 Mar 2008 13:46:58 -0000
*************** extern Snapshot CopySnapshot(Snapshot sn
*** 30,33 ****
--- 30,41 ----
  extern void FreeSnapshot(Snapshot snapshot);
  extern void FreeXactSnapshot(void);
  
+ extern Snapshot RegisterSnapshot(Snapshot snapshot);
+ extern void UnregisterSnapshot(Snapshot snapshot);
+ extern void AtSubAbort_Snapshot(void);
+ extern void AtSubCommit_Snapshot(void);
+ extern void AtEOXact_Snapshot(bool is_commit);
+ extern void AtPrepare_Snapshot(void);
+ extern void PostPrepare_Snapshot(void);
+ 
  #endif /* SNAPMGR_H */
Index: src/pl/plpgsql/src/pl_exec.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/pl/plpgsql/src/pl_exec.c,v
retrieving revision 1.206
diff -c -p -r1.206 pl_exec.c
*** src/pl/plpgsql/src/pl_exec.c	26 Mar 2008 18:48:59 -0000	1.206
--- src/pl/plpgsql/src/pl_exec.c	27 Mar 2008 13:16:24 -0000
*************** exec_eval_simple_expr(PLpgSQL_execstate 
*** 4241,4247 ****
  		if (!estate->readonly_func)
  		{
  			CommandCounterIncrement();
! 			ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
  		}
  
  		/*
--- 4241,4247 ----
  		if (!estate->readonly_func)
  		{
  			CommandCounterIncrement();
! 			ActiveSnapshot = RegisterSnapshot(GetTransactionSnapshot());
  		}
  
  		/*
*************** exec_eval_simple_expr(PLpgSQL_execstate 
*** 4252,4257 ****
--- 4252,4260 ----
  							   isNull,
  							   NULL);
  		MemoryContextSwitchTo(oldcontext);
+ 
+ 		if (!estate->readonly_func)
+ 			UnregisterSnapshot(ActiveSnapshot);
  	}
  	PG_CATCH();
  	{
