diff -c -Nr --exclude-from=diff-ignore 00orig/doc/src/sgml/ref/allfiles.sgml 07twophase/doc/src/sgml/ref/allfiles.sgml
*** 00orig/doc/src/sgml/ref/allfiles.sgml 2004-08-23 11:14:41.000000000 -0400
--- 07twophase/doc/src/sgml/ref/allfiles.sgml 2005-05-25 17:35:12.000000000 -0400
***************
*** 30,35 ****
--- 30,36 ----
+
***************
*** 88,98 ****
--- 89,101 ----
+
+
diff -c -Nr --exclude-from=diff-ignore 00orig/doc/src/sgml/ref/commit_prepared.sgml 07twophase/doc/src/sgml/ref/commit_prepared.sgml
*** 00orig/doc/src/sgml/ref/commit_prepared.sgml 1969-12-31 21:00:00.000000000 -0300
--- 07twophase/doc/src/sgml/ref/commit_prepared.sgml 2005-05-25 17:35:12.000000000 -0400
***************
*** 0 ****
--- 1,105 ----
+
+
+
+
+ COMMIT PREPARED
+ SQL - Language Statements
+
+
+
+ COMMIT PREPARED
+ commit a transaction that was earlier prepared for two-phase commit
+
+
+
+ COMMIT PREPARED
+
+
+
+
+ COMMIT PREPARED 'global transaction id'
+
+
+
+
+ Description
+
+
+ COMMIT PREPARED commits a transaction that is in
+ prepared state.
+
+
+
+ You can check all current transactions from the pg_prepared_xacts
+ system view.
+
+
+
+
+ Parameters
+
+
+
+ global_transaction_id
+
+
+ The global transaction identifier of the transaction that is to be
+ rolled back.
+
+
+
+
+
+
+
+ Notes
+
+
+ This command works outside transaction control. The prepared
+ transaction is committed immediately.
+
+
+
+
+ Examples
+
+ Commits the transaction identified by the global transaction
+ identifier 'foobar':
+
+
+ COMMIT PREPARED 'foobar'
+
+
+
+
+
+
+ See Also
+
+
+
+
+
+
+
+
+
+
diff -c -Nr --exclude-from=diff-ignore 00orig/doc/src/sgml/ref/prepare_transaction.sgml 07twophase/doc/src/sgml/ref/prepare_transaction.sgml
*** 00orig/doc/src/sgml/ref/prepare_transaction.sgml 1969-12-31 21:00:00.000000000 -0300
--- 07twophase/doc/src/sgml/ref/prepare_transaction.sgml 2005-05-25 17:35:12.000000000 -0400
***************
*** 0 ****
--- 1,105 ----
+
+
+
+
+ PREPARE TRANSACTION
+ SQL - Language Statements
+
+
+
+ PREPARE TRANSACTION
+ prepare the current transaction for two-phase commit
+
+
+
+ PREPARE TRANSACTION
+
+
+
+
+ PREPARE TRANSACTION 'global transaction id'
+
+
+
+
+ Description
+
+
+ PREPARE TRANSACTION prepares the transaction for two-phase
+ commit. After this command, the current transaction is no longer
+ associated with its backend. It can later be committed or rolled
+ back with COMMIT PREPARED or
+ ROLLBACK PREPARED, respectively.
+
+
+
+ All prepared transactions are listed in the pg_prepared_xacts system view.
+
+
+
+
+ Parameters
+
+
+
+ global transaction id
+
+
+ An arbitrary identifier that later acts as a handle to this transaction.
+
+
+
+
+
+
+
+ Notes
+
+
+ This command must be used inside a transaction. Use
+ BEGIN to start one.
+
+
+
+
+ Examples
+
+ Prepare the current transaction for two-phase commit, using
+ 'foobar' as the global transaction identifier:
+
+
+ PREPARE TRANSACTION 'foobar'
+
+
+
+
+
+ See Also
+
+
+
+
+
+
+
+
+
+
diff -c -Nr --exclude-from=diff-ignore 00orig/doc/src/sgml/ref/rollback_prepared.sgml 07twophase/doc/src/sgml/ref/rollback_prepared.sgml
*** 00orig/doc/src/sgml/ref/rollback_prepared.sgml 1969-12-31 21:00:00.000000000 -0300
--- 07twophase/doc/src/sgml/ref/rollback_prepared.sgml 2005-05-25 17:35:12.000000000 -0400
***************
*** 0 ****
--- 1,105 ----
+
+
+
+
+ ROLLBACK PREPARED
+ SQL - Language Statements
+
+
+
+ ROLLBACK PREPARED
+ abort a transaction that was earlier prepared for two-phase commit
+
+
+
+ ROLLBACK PREPARED
+
+
+
+
+ ROLLBACK PREPARED 'global transaction id'
+
+
+
+
+ Description
+
+
+ ROLLBACK PREPARED rolls back a transaction that is in
+ prepared state.
+
+
+
+ You can check all current transactions from the pg_prepared_xacts
+ system view.
+
+
+
+
+ Parameters
+
+
+
+ global_transaction_id
+
+
+ The global transaction identifier of the transaction that is to be
+ rolled back.
+
+
+
+
+
+
+
+ Notes
+
+
+ This command works outside transaction control. The prepared
+ transaction is rolled back immediately.
+
+
+
+
+ Examples
+
+ Rolls back the transaction identified by the global transaction
+ identifier 'foobar':
+
+
+ ROLLBACK PREPARED 'foobar'
+
+
+
+
+
+
+ See Also
+
+
+
+
+
+
+
+
+
+
diff -c -Nr --exclude-from=diff-ignore 00orig/doc/src/sgml/reference.sgml 07twophase/doc/src/sgml/reference.sgml
*** 00orig/doc/src/sgml/reference.sgml 2004-08-23 11:14:41.000000000 -0400
--- 07twophase/doc/src/sgml/reference.sgml 2005-05-25 17:35:12.000000000 -0400
***************
*** 62,67 ****
--- 62,68 ----
&cluster;
&commentOn;
&commit;
+ &commitPrepared;
©Table;
&createAggregate;
&createCast;
***************
*** 120,130 ****
--- 121,133 ----
&move;
¬ify;
&prepare;
+ &prepareTransaction;
&reindex;
&releaseSavepoint;
&reset;
&revoke;
&rollback;
+ &rollbackPrepared;
&rollbackTo;
&savepoint;
&select;
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/access/transam/Makefile 07twophase/src/backend/access/transam/Makefile
*** 00orig/src/backend/access/transam/Makefile 2005-04-28 19:39:54.000000000 -0400
--- 07twophase/src/backend/access/transam/Makefile 2005-05-25 17:35:12.000000000 -0400
***************
*** 12,18 ****
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
! OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o
all: SUBSYS.o
--- 12,18 ----
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
! OBJS = clog.o transam.o varsup.o xact.o xlog.o xlogutils.o rmgr.o slru.o subtrans.o multixact.o twophase.o twophase_rmgr.o
all: SUBSYS.o
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/access/transam/twophase.c 07twophase/src/backend/access/transam/twophase.c
*** 00orig/src/backend/access/transam/twophase.c 1969-12-31 21:00:00.000000000 -0300
--- 07twophase/src/backend/access/transam/twophase.c 2005-06-07 17:11:07.000000000 -0400
***************
*** 0 ****
--- 1,1112 ----
+ /*-------------------------------------------------------------------------
+ *
+ * twophase.c
+ * Two-phase commit support functions.
+ *
+ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ * NOTES
+ * Each global transaction is associated with a global transaction
+ * identifier (GID). The client assigns a GID to a postgres
+ * transaction with the PREPARE TRANSACTION command.
+ *
+ * We keep all active global transactions in a shared memory array.
+ * When the PREPARE TRANSACTION command is issued, the GID is
+ * reserved for the transaction in the array. This is done before
+ * a WAL entry is made, because the reservation checks for duplicate
+ * GIDs and aborts the transaction if there already is a global
+ * transaction in prepared state with the same GID.
+ *
+ * In order to survive crashes and shutdowns, all prepared
+ * transactions must be stored in permanent storage. This includes
+ * locking information, pending notifications etc. All that state
+ * information is written to the per-transaction state file in
+ * the pg_twophase directory.
+ *
+ *-------------------------------------------------------------------------
+ */
+ #include "postgres.h"
+
+ #include
+ #include
+ #include
+ #include
+
+ #include "access/heapam.h"
+ #include "access/xact.h"
+ #include "access/twophase.h"
+ #include "access/twophase_rmgr.h"
+ #include "access/subtrans.h"
+ #include "catalog/pg_type.h"
+ #include "funcapi.h"
+ #include "miscadmin.h"
+ #include "utils/builtins.h"
+ #include "storage/fd.h"
+ #include "storage/lock.h"
+ #include "storage/lwlock.h"
+ #include "storage/proc.h"
+
+
+ /*
+ * Directory where Two-phase commit files reside within PGDATA
+ */
+ #define TWOPHASE_DIR "pg_twophase"
+
+ /* GUC variable, can't be changed after startup */
+ int max_prepared_xacts = 100;
+
+ /*
+ * This struct describes one global transaction that is in prepared state.
+ */
+ typedef struct GlobalTransactionData
+ {
+ char gid[GIDSIZE];
+ TransactionId xid;
+ AclId owner;
+
+ /*
+ * Indicates that the prepare phase has completely finished. When false,
+ * the transaction is still under original backends control and must not be
+ * committed or rolled back yet.
+ */
+ bool fullyPrepared;
+ } GlobalTransactionData;
+ /*
+ * typedef struct GlobalTransactionData *GlobalTransaction appears in
+ * twophase.h
+ */
+
+ /*
+ * Two Phase Commit shared state. Access to this struct is protected
+ * by TwoPhaseStateLock.
+ */
+ typedef struct TwoPhaseStateData
+ {
+ /* Dummy PGPROC entry which will hold the persisted locks. */
+ PGPROC pgproc;
+
+ /* number of used prepXacts items. */
+ int numPrepXacts;
+ /*
+ * There are max_prepared_xacts items in this array, but C wants a
+ * fixed-size array.
+ */
+ GlobalTransactionData prepXacts[1];
+ } TwoPhaseStateData; /* Actual data follows at end of struct */
+
+ static TwoPhaseStateData *TwoPhaseState;
+
+
+ static void RecordTransactionCommitPrepared(TransactionId xid, int nchildren,
+ TransactionId *children);
+ static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren,
+ TransactionId *children);
+ static void ProcessRecords(int fd, TransactionId xid,
+ const TwoPhaseCallback callbacks[]);
+
+
+ /*
+ * MarkAsPreparing
+ * Reserves the gid for the given transaction.
+ */
+ GlobalTransaction
+ MarkAsPreparing(TransactionId xid, char *gid, AclId owner)
+ {
+ GlobalTransaction gxact;
+ int i;
+
+ printf("MarkAsPreparing: %d, %s\n", xid, gid);
+
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+ if (TwoPhaseState->numPrepXacts >= max_prepared_xacts)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("maximum number of prepared transactions reached")));
+ /* TODO: proper error code and hint */
+ }
+
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ if (strcmp(TwoPhaseState->prepXacts[i].gid, gid) == 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+ errmsg("global transaction identifier \"%s\" is already in use", gid)));
+ }
+ }
+
+ gxact = &TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
+ (TwoPhaseState->numPrepXacts)++;
+
+ TransactionIdStore(xid, &gxact->xid);
+ strncpy(gxact->gid, gid, GIDSIZE);
+ gxact->owner = owner;
+ gxact->fullyPrepared = false;
+
+ LWLockRelease(TwoPhaseStateLock);
+ return gxact;
+ }
+
+ void
+ MarkAsPrepared(GlobalTransaction gxact)
+ {
+ /* Since the flag is only set, no need for locking. */
+ Assert(!gxact->fullyPrepared);
+ gxact->fullyPrepared = true;
+ }
+
+ /*
+ * Removes the prepared transaction from the shared memory array.
+ */
+ TransactionId
+ MarkAsNoLongerPrepared(char *gid, AclId user)
+ {
+ GlobalTransaction gxact;
+ TransactionId xid;
+ int i;
+
+ printf("MarkAsNoLongerPrepared: %s\n", gid);
+
+ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ gxact = &TwoPhaseState->prepXacts[i];
+ if (strncmp(gxact->gid, gid, GIDSIZE) == 0)
+ {
+ if (!gxact->fullyPrepared)
+ elog(ERROR, "The prepare phase for that transaction isn't finished yet.");
+
+ xid = gxact->xid;
+
+ if (user != gxact->owner && !superuser_arg(user))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied to finish prepared transaction"),
+ errhint("Must be superuser or the user that prepared the transaction to finish it.")));
+
+ /* Move the last gxact to this slot */
+ if (i != (TwoPhaseState->numPrepXacts) - 1)
+ {
+ TwoPhaseState->prepXacts[i] =
+ TwoPhaseState->prepXacts[(TwoPhaseState->numPrepXacts) - 1];
+ }
+ TwoPhaseState->numPrepXacts--;
+
+ LWLockRelease(TwoPhaseStateLock);
+ return xid;
+ }
+ }
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ elog(ERROR, "No prepared transaction with gid %s", gid);
+
+ return InvalidTransactionId; /* Unreachable code */
+ }
+
+ /*
+ * Returns an array of all prepared transactions for the user-level
+ * function pg_prepared_xact.
+ *
+ * The returned array and all its elements are copies of internal data
+ * structures, to minimize the time we need to hold the TwoPhaseStateLock.
+ *
+ * WARNING -- we return even those transactions that are not fully prepared
+ * yet. The caller should filter them out if he doesn't want them.
+ *
+ * The returned array is palloc'd.
+ */
+ static int
+ GetPreparedTransactionList(GlobalTransactionData **gxacts)
+ {
+ GlobalTransactionData *array;
+ int num;
+
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+ if (TwoPhaseState->numPrepXacts == 0)
+ {
+ LWLockRelease(TwoPhaseStateLock);
+
+ *gxacts = NULL;
+ return 0;
+ }
+
+ num = TwoPhaseState->numPrepXacts;
+ array = palloc(sizeof(GlobalTransactionData) * num);
+ memcpy(array, TwoPhaseState->prepXacts, sizeof(GlobalTransactionData) * num);
+ *gxacts = array;
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ return num;
+ }
+
+ /* Working status for pg_prepared_xact */
+ typedef struct
+ {
+ GlobalTransactionData *array;
+ int ngxacts;
+ int currIdx;
+ } Working_State;
+
+ /*
+ * pg_prepared_xact
+ * Produce a view with one row per prepared transaction.
+ *
+ * This function is here so we don't have to export the
+ * GlobalTransactionData struct definition.
+ */
+ Datum
+ pg_prepared_xact(PG_FUNCTION_ARGS)
+ {
+ FuncCallContext *funcctx;
+ Working_State *status;
+
+ if (SRF_IS_FIRSTCALL())
+ {
+ TupleDesc tupdesc;
+ MemoryContext oldcontext;
+
+ /* create a function context for cross-call persistence */
+ funcctx = SRF_FIRSTCALL_INIT();
+
+ /*
+ * Switch to memory context appropriate for multiple function
+ * calls
+ */
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ /* build tupdesc for result tuples */
+ /* this had better match pg_prepared_xacts view in system_views.sql */
+ tupdesc = CreateTemplateTupleDesc(3, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
+ XIDOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
+ TEXTOID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "owner",
+ NAMEOID, -1, 0);
+
+ funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+ /*
+ * Collect all the 2PC status information that we will format and
+ * send out as a result set.
+ */
+ status = (Working_State *) palloc(sizeof(Working_State));
+ funcctx->user_fctx = (void *) status;
+
+ status->ngxacts = GetPreparedTransactionList(&status->array);
+ status->currIdx = 0;
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ funcctx = SRF_PERCALL_SETUP();
+ status = (Working_State *) funcctx->user_fctx;
+
+ while (status->array != NULL && status->currIdx < status->ngxacts)
+ {
+ Datum values[3];
+ bool nulls[3];
+ HeapTuple tuple;
+ Datum result;
+ GlobalTransactionData gxact = status->array[status->currIdx++];
+
+ if (!gxact.fullyPrepared)
+ continue;
+
+ /*
+ * Form tuple with appropriate data.
+ */
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, 0, sizeof(nulls));
+
+ values[0] = TransactionIdGetDatum(gxact.xid);
+ values[1] = DirectFunctionCall1(textin, CStringGetDatum(gxact.gid));
+ values[2] = DirectFunctionCall1(namein,
+ CStringGetDatum(GetUserNameFromId(gxact.owner)));
+
+ tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+ result = HeapTupleGetDatum(tuple);
+ SRF_RETURN_NEXT(funcctx, result);
+ }
+
+ SRF_RETURN_DONE(funcctx);
+ }
+
+
+ /*
+ * Iterator routines for scanning through all currently
+ * prepared transactions.
+ */
+
+ static int scan_ptr;
+
+ int
+ StartPreparedTransactionListScan(void)
+ {
+ scan_ptr = 0;
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+ return TwoPhaseState->numPrepXacts;
+ }
+
+ TransactionId
+ NextPreparedTransactionId(void)
+ {
+ if (scan_ptr >= TwoPhaseState->numPrepXacts)
+ return InvalidTransactionId;
+
+ return TwoPhaseState->prepXacts[scan_ptr++].xid;
+ }
+
+
+ void
+ EndPreparedTransactionListScan(void)
+ {
+ LWLockRelease(TwoPhaseStateLock);
+ }
+
+ /*
+ * TransactionIdIsPrepared
+ * True iff transaction associated with the identifier is prepared
+ * for two-phase commit
+ *
+ * Note:
+ * Assumes transaction identifier is valid.
+ */
+ bool
+ TransactionIdIsPrepared(TransactionId xid)
+ {
+ int i;
+ bool result = false;
+
+ xid = SubTransGetTopmostTransaction(xid);
+
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+ for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ if (TransactionIdEquals(TwoPhaseState->prepXacts[i].xid, xid))
+ {
+ result = true;
+ break;
+ }
+ }
+
+ LWLockRelease(TwoPhaseStateLock);
+
+ return result;
+ }
+
+ void
+ BootStrapTwoPhase(void)
+ {
+ /* NOP */
+ }
+
+ /*
+ * Initialization of shared memory
+ */
+ int
+ TwoPhaseShmemSize(void)
+ {
+ #define SIZEOF_TWOPHASE_SHMEM \
+ sizeof(TwoPhaseStateData) + \
+ sizeof(GlobalTransactionData) * max_prepared_xacts
+ return SIZEOF_TWOPHASE_SHMEM;
+ }
+
+
+ void
+ TwoPhaseShmemInit(void)
+ {
+ bool found;
+
+ TwoPhaseState = ShmemInitStruct("global transaction lookup table",
+ SIZEOF_TWOPHASE_SHMEM,
+ &found);
+ if (!IsUnderPostmaster)
+ {
+ Assert(!found);
+ MemSet(&TwoPhaseState->pgproc, 0, sizeof(PGPROC));
+ TwoPhaseState->pgproc.pid = 0;
+ SHMQueueInit(&TwoPhaseState->pgproc.procLocks);
+ }
+ else
+ Assert(found);
+ }
+
+ PGPROC *
+ TwoPhaseGetDummyProc(void)
+ {
+ return &TwoPhaseState->pgproc;
+ }
+
+ /************************************************************************/
+ /* State file support */
+ /************************************************************************/
+
+ #define TwoPhaseFilePath(path, xid) \
+ snprintf(path, MAXPGPATH, "%s/%s/%08X", DataDir, TWOPHASE_DIR, xid)
+
+ static void register_statefileblock(void *data, uint32 len);
+
+
+ /**
+ * File format
+ *
+ * 1. TwoPhaseFileHeader
+ * 2. TransactionId (subtransactions)
+ * 3. TwoPhaseRecordOnDisk
+ * 4. ...
+ *
+ * 7. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END)
+ * 8. CRC32
+ */
+
+ /*
+ * Header for a 2PC state file
+ */
+ typedef struct TwoPhaseFileHeader
+ {
+ TransactionId xid;
+ AclId owner;
+ char gid[GIDSIZE];
+ int nsubxacts;
+ } TwoPhaseFileHeader;
+
+ /*
+ * Header for each record in a state file
+ *
+ * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
+ */
+ typedef struct TwoPhaseRecordOnDisk
+ {
+ uint32 len; /* length of rmgr data */
+ TwoPhaseRmgrId rmid; /* resource manager for this record */
+
+ /* ACTUAL DATA FOLLOWS AT END OF STRUCT */
+ char data[0];
+ } TwoPhaseRecordOnDisk;
+
+ /* On prepare, the state file is assembled in memory before writing it to
+ * WAL and the actual state file.
+ */
+ struct xllist
+ {
+ XLogRecData *head; /* first data block in the chain */
+ XLogRecData *tail; /* last block in chain */
+ int bytesFree; /* there is bytesFree bytes left in tail block */
+ } records;
+
+ /*
+ * Appends a block of data to records data structure.
+ * The data is copied, so the caller is free to modify it
+ * afterwards.
+ */
+ static void
+ register_statefileblock(void *data, uint32 len)
+ {
+ if (len > records.bytesFree)
+ {
+ records.tail->next = palloc0(sizeof(XLogRecData));
+ records.tail = records.tail->next;
+ records.tail->buffer = InvalidBuffer;
+ records.tail->next = NULL;
+
+ records.bytesFree = len > 512 ? len : 512;
+ records.tail->data = palloc0(records.bytesFree);
+ }
+ else
+ records.bytesFree -= len;
+
+ memcpy(&((char *)records.tail->data)[records.tail->len], data, len);
+ records.tail->len += len;
+ }
+
+ /*
+ * Starts preparing a state file.
+ */
+ void
+ StartPrepare(char *gid)
+ {
+ TwoPhaseFileHeader hdr;
+ TransactionId *children;
+
+ /* Initialize linked list */
+ records.head = palloc(sizeof(XLogRecData));
+ records.head->data = palloc(512);
+ records.head->len = 0;
+ records.head->next = NULL;
+ records.head->buffer = InvalidBuffer;
+ records.bytesFree = 512;
+
+ records.tail = records.head;
+
+ /* Create header */
+
+ strncpy(hdr.gid, gid, GIDSIZE);
+ TransactionIdStore(GetTopTransactionId(), &hdr.xid);
+ hdr.owner = GetUserId();
+ hdr.nsubxacts = xactGetCommittedChildren(&children);
+
+ register_statefileblock(&hdr, sizeof(TwoPhaseFileHeader));
+ register_statefileblock(children, hdr.nsubxacts * sizeof(TransactionId));
+ }
+
+ /*
+ * Finishes state file. Calculates CRC and writes state file to WAL and in
+ * pg_twophase directory.
+ */
+ void
+ EndPrepare(void)
+ {
+ char path[MAXPGPATH];
+ TwoPhaseRecordOnDisk end_record;
+ TransactionId xid = GetTopTransactionId();
+ XLogRecData *record;
+ XLogRecPtr recptr;
+ static pg_crc32 statefile_crc;
+ int twophase_fd;
+
+ /* register end sentinel */
+ end_record.len = 0;
+ end_record.rmid = 0;
+ register_statefileblock(&end_record, sizeof(end_record));
+
+ /*
+ * We have to record transaction prepares even if we didn't
+ * make any updates, because the transaction manager might
+ * get confused if we lose a global transaction.
+ */
+
+ /* Create file and initialize CRC */
+ INIT_CRC32(statefile_crc);
+
+ TwoPhaseFilePath(path, xid);
+
+ twophase_fd = BasicOpenFile(path, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+ if (twophase_fd < 0)
+ {
+ close(twophase_fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create twophase state file \"%s\": %m",
+ path)));
+ }
+
+ /* Write records to file. */
+
+ record = records.head;
+ while (record != NULL)
+ {
+ if ((write(twophase_fd, record->data, record->len)) != record->len)
+ {
+ close(twophase_fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to twophase state file: %m")));
+ }
+
+ COMP_CRC32(statefile_crc, record->data, record->len);
+
+ record = record->next;
+ }
+
+ FIN_CRC32(statefile_crc);
+
+ /* The state file isn't valid yet, because we haven't written the CRC yet. */
+ /* But before we do that, insert entry to WAL */
+
+ START_CRIT_SECTION();
+ recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, records.head);
+ XLogFlush(recptr);
+ END_CRIT_SECTION();
+
+ /* If we crash now, we have already prepared */
+
+ /* write CRC and close file */
+
+ if ((write(twophase_fd, &statefile_crc, sizeof(statefile_crc))) != sizeof(statefile_crc))
+ {
+ close(twophase_fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to twophase state file: %m")));
+ }
+
+ close(twophase_fd);
+
+ records.tail = records.head = NULL;
+ }
+
+ /*
+ * Registers a record to be written to state file.
+ */
+ void
+ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, void *data, uint32 len)
+ {
+ TwoPhaseRecordOnDisk record;
+ record.rmid = rmid;
+ record.len = len;
+
+ register_statefileblock(&record, sizeof(TwoPhaseRecordOnDisk));
+ register_statefileblock(data, len);
+ }
+
+
+ /* Buffer size used in ValidateTwoPhaseFile */
+ #define BUF_SIZE 512
+
+ /*
+ * Validates the state file for xid. Returns true if
+ * it looks OK.
+ *
+ * A state file is valid if it has a valid checksum.
+ */
+ static bool
+ ValidateTwoPhaseFile(TransactionId xid)
+ {
+ char path[MAXPGPATH];
+ char buf[BUF_SIZE];
+ pg_crc32 calc_crc, file_crc;
+ struct stat stat;
+ off_t total_read = 0;
+ int fd;
+ int len;
+
+ TwoPhaseFilePath(path, xid);
+
+ elog(DEBUG1, "ValidateTwoPhaseFile(%x)", xid);
+
+ fd = BasicOpenFile(path, PG_BINARY, S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not open twophase state file \"%s\": %m",
+ path)));
+ return false;
+ }
+
+ fstat(fd, &stat);
+
+ /* Read through the file and accumulate the CRC until
+ * end - sizeof(pg_crc32). */
+
+ INIT_CRC32(calc_crc);
+ for (;;)
+ {
+ int bytesleft = stat.st_size - total_read - sizeof(pg_crc32);
+
+ len = read(fd, buf, bytesleft < BUF_SIZE ? bytesleft : BUF_SIZE);
+ total_read += len;
+
+ if (len == -1)
+ {
+ close(fd);
+ return false;
+ }
+
+ if (len == 0)
+ break;
+
+ COMP_CRC32(calc_crc, buf, len);
+ }
+ FIN_CRC32(calc_crc);
+
+ len = read(fd, &file_crc, sizeof(pg_crc32));
+ close(fd);
+
+ if (len != sizeof(pg_crc32))
+ return false;
+
+ return EQ_CRC32(calc_crc, file_crc);
+ }
+
+ static void
+ read_with_ereport(int fd, void *data, int len)
+ {
+ int read_len;
+
+ read_len = read(fd, data, len);
+
+ if (read_len != len)
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("error reading twophase state file: %m")));
+ }
+ }
+
+ void
+ FinishPreparedTransaction(char *gid, bool isCommit)
+ {
+ TransactionId xid;
+ TransactionId *children;
+ char path[MAXPGPATH];
+ int len;
+ int fd;
+ TwoPhaseFileHeader hdr;
+
+ xid = MarkAsNoLongerPrepared(gid, GetUserId());
+
+ TwoPhaseFilePath(path, xid);
+
+ elog(DEBUG1, "FinishPreparedTransaction (%x)", xid);
+
+ fd = BasicOpenFile(path, PG_BINARY, S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open twophase state file \"%s\": %m",
+ path)));
+ }
+
+ read_with_ereport(fd, &hdr, sizeof(TwoPhaseFileHeader));
+
+ Assert(TransactionIdEquals(hdr.xid, xid));
+
+ len = hdr.nsubxacts * sizeof(TransactionId);
+ children = palloc(len);
+ read_with_ereport(fd, children, len);
+
+ if (isCommit)
+ {
+ RecordTransactionCommitPrepared(xid, hdr.nsubxacts, children);
+ ProcessRecords(fd, xid, postcommit_callbacks);
+ }
+ else
+ {
+ RecordTransactionAbortPrepared(xid, hdr.nsubxacts, children);
+ ProcessRecords(fd, xid, postabort_callbacks);
+ }
+ LockReleaseAllForPrepared(xid);
+
+ close(fd);
+
+ RemoveTwoPhaseFile(xid);
+ }
+
+ /*
+ * Reads the state file for the transaction. Calls the
+ * callbacks for each record in the file.
+ */
+ static void
+ ProcessRecords(int fd, TransactionId xid, const TwoPhaseCallback callbacks[])
+ {
+ TwoPhaseRecordOnDisk *record;
+
+ elog(DEBUG1, "ProcessRecords (%x)", xid);
+
+ record = palloc(sizeof(TwoPhaseRecordOnDisk));
+ for (;;)
+ {
+ /* Read record header */
+ read_with_ereport(fd, record, sizeof(TwoPhaseRecordOnDisk));
+
+ Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
+
+ /* Watch for end sentinel */
+ if (record->rmid == TWOPHASE_RM_END_ID)
+ break;
+
+ /* Read resource manager specific data and call the hook */
+ record = repalloc(record, sizeof(TwoPhaseRecordOnDisk) + record->len);
+ read_with_ereport(fd, record->data, record->len);
+
+ if (callbacks[record->rmid] != NULL)
+ callbacks[record->rmid](xid, record->data, record->len);
+ }
+ pfree(record);
+ }
+
+ void RemoveTwoPhaseFile(TransactionId xid)
+ {
+ char path[MAXPGPATH];
+ TwoPhaseFilePath(path, xid);
+
+ if (unlink(path))
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("could not remove two-phase state file \"%s\": %m",
+ path)));
+ }
+
+ /*
+ * Recreates a state file. This is used in WAL replay.
+ *
+ * Note: content and len doesn't include CRC.
+ */
+ void
+ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
+ {
+ char path[MAXPGPATH];
+ pg_crc32 statefile_crc;
+ int fd;
+
+ TwoPhaseFilePath(path, xid);
+
+ fd = BasicOpenFile(path, O_CREAT | O_WRONLY | PG_BINARY, S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not recreate twophase state file \"%s\": %m",
+ path)));
+ }
+
+ /* Write content */
+ INIT_CRC32(statefile_crc);
+ COMP_CRC32(statefile_crc, content, len);
+ FIN_CRC32(statefile_crc);
+
+ if (write(fd, content, len) != len)
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to twophase state file: %m")));
+ }
+
+ if (write(fd, &statefile_crc, sizeof(statefile_crc)) != sizeof(statefile_crc))
+ {
+ close(fd);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to twophase state file: %m")));
+ }
+
+ close(fd);
+ }
+
+ /*
+ * Scans the pg_twophase directory and recovers every found
+ * prepared transaction
+ */
+ void
+ RecoverPreparedTransactions(void)
+ {
+ DIR *cldir;
+ struct dirent *clde;
+ char dir[MAXPGPATH];
+
+ elog(LOG, "Recovering prepared transactions");
+ snprintf(dir, MAXPGPATH, "%s/%s", DataDir, TWOPHASE_DIR);
+
+ cldir = AllocateDir(dir);
+ if (cldir == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open directory \"%s\": %m", dir)));
+
+ errno = 0;
+
+ while ((clde = readdir(cldir)) != NULL)
+ {
+ if (strlen(clde->d_name) == 8 &&
+ strspn(clde->d_name, "0123456789ABCDEF") == 8)
+ {
+ TransactionId xid;
+ int i;
+ sscanf(clde->d_name, "%X", &xid);
+
+ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ {
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("removing stale twophase state file \"%s\"",
+ clde->d_name)));
+ RemoveTwoPhaseFile(xid);
+ continue;
+ }
+
+ if (!ValidateTwoPhaseFile(xid))
+ {
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("removing corrupt twophase state file \"%s\"",
+ clde->d_name)));
+
+ RemoveTwoPhaseFile(xid);
+ continue;
+ }
+
+ /* The transaction looks good. Read state file. */
+ {
+ char path[MAXPGPATH];
+ int fd;
+ TwoPhaseFileHeader hdr;
+ GlobalTransaction gxact;
+
+ TwoPhaseFilePath(path, xid);
+
+ elog(DEBUG1, "OpenTwoPhaseFile (%x)", xid);
+
+ fd = BasicOpenFile(path, PG_BINARY, S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open twophase state file \"%s\": %m",
+ path)));
+ }
+
+ read_with_ereport(fd, &hdr, sizeof(TwoPhaseFileHeader));
+
+ Assert(TransactionIdEquals(hdr.xid, xid));
+
+ for (i = 0; i < hdr.nsubxacts; i++)
+ {
+ TransactionId subxid;
+ read_with_ereport(fd, &subxid, sizeof(TransactionId));
+
+ SubTransSetParent(subxid, xid);
+ }
+
+ gxact = MarkAsPreparing(xid, hdr.gid, hdr.owner);
+ MarkAsPrepared(gxact);
+ ProcessRecords(fd, xid, recover_callbacks);
+ close(fd);
+ }
+ }
+ }
+ if (errno)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read directory \"%s\": %m", dir)));
+
+ FreeDir(cldir);
+ }
+
+ /*
+ * Since we don't fsync when we write a state file, we need to fsync them
+ * on checkpoint.
+ */
+ void
+ CheckPointTwoPhase(void)
+ {
+ int n;
+ int i;
+
+ n = StartPreparedTransactionListScan();
+
+ {
+ TransactionId xids[n];
+
+ for (i = 0; i < n; i++)
+ xids[i] = NextPreparedTransactionId();
+
+ EndPreparedTransactionListScan();
+
+ for (i = 0; i < n; i++)
+ {
+ char path[MAXPGPATH];
+ int fd;
+ int ret;
+
+ TwoPhaseFilePath(path, xids[i]);
+
+ fd = BasicOpenFile(path, PG_BINARY, S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ {
+ /*
+ * This can happen, if the transaction is committed/rolled back
+ * after the call to EndPreparedTransactionListScan above
+ * and before we get here.
+ */
+ }
+ else
+ {
+ ret = pg_fsync(fd);
+ if (ret != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not fsync twophase state file \"%s\": %m",
+ path)));
+ close(fd);
+ }
+ }
+ }
+ }
+
+
+ /*
+ * RecordTransactionCommitPrepared
+ */
+ static void
+ RecordTransactionCommitPrepared(TransactionId xid, int nchildren, TransactionId *children)
+ {
+ XLogRecData rdata[2];
+ XLogRecPtr recptr;
+
+ elog(DEBUG1, "Committing prepared transaction %d\n", xid);
+
+ START_CRIT_SECTION();
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *)&nchildren;
+ rdata[0].len = sizeof(nchildren);
+ rdata[0].next = &rdata[1];
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *)children;
+ rdata[1].len = sizeof(TransactionId) * nchildren;
+ rdata[1].next = NULL;
+
+ recptr = XLogInsertOnBehalf(RM_XACT_ID,
+ XLOG_XACT_COMMIT_PREPARED | XLOG_NO_TRAN,
+ rdata, xid);
+ XLogFlush(recptr);
+
+ TransactionIdCommit(xid);
+ TransactionIdCommitTree(nchildren, children);
+
+ END_CRIT_SECTION();
+ }
+
+ /*
+ * RecordTransactionAbortPrepared
+ */
+ static void
+ RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children)
+ {
+ XLogRecData rdata[2];
+ XLogRecPtr recptr;
+
+ START_CRIT_SECTION();
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *)&nchildren;
+ rdata[0].len = sizeof(nchildren);
+ rdata[0].next = &rdata[1];
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = (char *)children;
+ rdata[1].len = sizeof(TransactionId) * nchildren;
+ rdata[1].next = NULL;
+
+ recptr = XLogInsertOnBehalf(RM_XACT_ID,
+ XLOG_XACT_ABORT_PREPARED | XLOG_NO_TRAN,
+ rdata, xid);
+ XLogFlush(recptr);
+
+ /*
+ * Mark the transaction aborted in clog. This is not absolutely
+ * necessary but we may as well do it while we are here.
+ */
+ TransactionIdAbort(xid);
+ TransactionIdAbortTree(nchildren, children);
+
+ END_CRIT_SECTION();
+ }
+
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/access/transam/twophase_rmgr.c 07twophase/src/backend/access/transam/twophase_rmgr.c
*** 00orig/src/backend/access/transam/twophase_rmgr.c 1969-12-31 21:00:00.000000000 -0300
--- 07twophase/src/backend/access/transam/twophase_rmgr.c 2005-05-25 17:35:12.000000000 -0400
***************
*** 0 ****
--- 1,47 ----
+ /*
+ * twophase_rmgr.c
+ *
+ * Twophase resource managers definition
+ *
+ * $PostgreSQL$
+ */
+ #include "postgres.h"
+
+ #include "access/twophase.h"
+ #include "access/twophase_rmgr.h"
+
+ #include "storage/lockrmgr.h"
+ #include "storage/smgr.h"
+ #include "utils/inval.h"
+ #include "utils/flatfiles.h"
+ #include "commands/async.h"
+
+ const TwoPhaseCallback recover_callbacks[TWOPHASE_RM_MAX_ID + 1] =
+ {
+ NULL, /* Reserved for twophase.c internal logic */
+ lock_recover_record, /* Lock */
+ NULL, /* Inval */
+ NULL, /* smgr */
+ NULL, /* user/group file update */
+ NULL /* notify/listen */
+ };
+
+ const TwoPhaseCallback postcommit_callbacks[TWOPHASE_RM_MAX_ID + 1] =
+ {
+ NULL, /* Reserved for twophase.c internal logic */
+ NULL, /* Lock */
+ inval_postcommit_record, /* Inval */
+ smgr_postcommit_record, /* smgr */
+ flatfile_postcommit_record, /* user/group file update */
+ notify_postcommit_record /* notify/listen */
+ };
+
+ const TwoPhaseCallback postabort_callbacks[TWOPHASE_RM_MAX_ID + 1] =
+ {
+ NULL, /* Reserved for twophase.c internal logic */
+ NULL, /* Lock */
+ NULL, /* Inval */
+ smgr_postabort_record, /* smgr */
+ NULL, /* user/group file update */
+ NULL
+ };
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/access/transam/xact.c 07twophase/src/backend/access/transam/xact.c
*** 00orig/src/backend/access/transam/xact.c 2005-06-07 16:44:05.000000000 -0400
--- 07twophase/src/backend/access/transam/xact.c 2005-06-06 21:02:41.000000000 -0400
***************
*** 23,28 ****
--- 23,30 ----
#include "access/multixact.h"
#include "access/subtrans.h"
#include "access/xact.h"
+ #include "access/twophase.h"
+ #include "access/twophase_rmgr.h"
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
***************
*** 68,74 ****
TRANS_START,
TRANS_INPROGRESS,
TRANS_COMMIT,
! TRANS_ABORT
} TransState;
/*
--- 70,77 ----
TRANS_START,
TRANS_INPROGRESS,
TRANS_COMMIT,
! TRANS_ABORT,
! TRANS_PREPARE
} TransState;
/*
***************
*** 99,105 ****
TBLOCK_SUBABORT_END, /* failed subxact, ROLLBACK received */
TBLOCK_SUBABORT_PENDING, /* live subxact, ROLLBACK received */
TBLOCK_SUBRESTART, /* live subxact, ROLLBACK TO received */
! TBLOCK_SUBABORT_RESTART /* failed subxact, ROLLBACK TO received */
} TBlockState;
/*
--- 102,111 ----
TBLOCK_SUBABORT_END, /* failed subxact, ROLLBACK received */
TBLOCK_SUBABORT_PENDING, /* live subxact, ROLLBACK received */
TBLOCK_SUBRESTART, /* live subxact, ROLLBACK TO received */
! TBLOCK_SUBABORT_RESTART, /* failed subxact, ROLLBACK TO received */
!
! /* two-phase commit states */
! TBLOCK_PREPARE
} TBlockState;
/*
***************
*** 197,202 ****
--- 203,210 ----
static SubXactCallbackItem *SubXact_callbacks = NULL;
+ /* stash for the gid between calls to PrepareTransaction and PrepareTransactionBlock */
+ static char gidStash[GIDSIZE+1];
/* local function prototypes */
static void AssignSubTransactionId(TransactionState s);
***************
*** 265,270 ****
--- 273,280 ----
return true;
case TRANS_COMMIT:
return true;
+ case TRANS_PREPARE:
+ return true;
case TRANS_ABORT:
return true;
}
***************
*** 1609,1614 ****
--- 1619,1845 ----
}
/*
+ * AbortPreparedTransaction
+ */
+ void
+ AbortPreparedTransaction(char *gid)
+ {
+ FinishPreparedTransaction(gid, false);
+ }
+
+ /*
+ * CommitPreparedTransaction
+ */
+ void
+ CommitPreparedTransaction(char *gid)
+ {
+ FinishPreparedTransaction(gid, true);
+ }
+
+ /*
+ * PrepareTransaction
+ */
+ static void
+ PrepareTransaction(void)
+ {
+ TransactionState s = CurrentTransactionState;
+ TransactionId xid = GetCurrentTransactionId();
+ GlobalTransaction gxact;
+
+ Assert(MyProc != NULL);
+
+ ShowTransactionState("PrepareTransaction");
+
+ /*
+ * check the current transaction state
+ */
+ Assert(s->state == TRANS_INPROGRESS);
+ Assert(s->parent == NULL);
+
+ /*
+ * Do pre-commit processing (most of this stuff requires database
+ * access, and in fact could still cause an error...)
+ *
+ * It is possible for CommitHoldablePortals to invoke functions that
+ * queue deferred triggers, and it's also possible that triggers create
+ * holdable cursors. So we have to loop until there's nothing left to
+ * do.
+ */
+ for (;;)
+ {
+ /*
+ * Fire all currently pending deferred triggers.
+ */
+ AfterTriggerFireDeferred();
+
+ /*
+ * Convert any open holdable cursors into static portals. If there
+ * weren't any, we are done ... otherwise loop back to check if they
+ * queued deferred triggers. Lather, rinse, repeat.
+ */
+ if (!CommitHoldablePortals())
+ break;
+ }
+
+ /* Now we can shut down the deferred-trigger manager */
+ AfterTriggerEndXact(true);
+
+ /* Close any open regular cursors */
+ AtPrepare_Portals();
+
+ /*
+ * Let ON COMMIT management do its thing (must happen after closing
+ * cursors, to avoid dangling-reference problems)
+ */
+ PreCommit_on_commit_actions();
+
+ /* Prevent cancel/die interrupt while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /*
+ * set the current transaction state information appropriately during
+ * the processing
+ */
+ s->state = TRANS_PREPARE;
+
+ /* Reserve the gid for this transaction. This could still fail if the
+ * gid is already in use. */
+
+ /* TODO: If we fail later, the gid stays reserved until restart */
+ gxact = MarkAsPreparing(xid, gidStash, GetUserId());
+ StartPrepare(gidStash);
+ gidStash[0] = '\0';
+
+ /*
+ * Do pre-commit processing (most of this stuff requires database
+ * access, and in fact could still cause an error...)
+ */
+
+ /* handle prepare for large objects */
+ AtPrepare_LargeObject();
+
+ /* NOTIFY commit must come before lower-level cleanup
+ */
+ AtPrepare_Notify();
+
+ /* Update the flat files if we changed pg_database, pg_shadow or pg_group
+ */
+ AtPrepare_UpdateFlatFiles();
+
+ /*
+ * Let smgr persist its list of pending deletes.
+ */
+ AtPrepare_smgr();
+
+ AtPrepare_Inval();
+ AtPrepare_GUC();
+
+ LockPersistAll();
+
+ /*
+ * We have to record transaction prepares even if we didn't
+ * make any updates, because the transaction manager might
+ * get confused if we lose a global transaction.
+ */
+
+ /* Tell bufmgr and smgr to prepare for commit */
+ BufmgrCommit();
+
+ /*
+ * Here is where we really truly prepare.
+ */
+ EndPrepare();
+
+ /* Break the chain of back-links in the XLOG records I output */
+ MyLastRecPtr.xrecoff = 0;
+ MyXactMadeXLogEntry = false;
+ MyXactMadeTempRelUpdate = false;
+
+ /* Show myself as out of the transaction in PGPROC array */
+ MyProc->logRec.xrecoff = 0;
+
+ /*
+ * Let others know about no transaction in progress by me.
+ *
+ * LWLockAcquire(SInvalLock) is required: UPDATE with xid 0 is blocked by
+ * xid 1' UPDATE, xid 1 is doing commit while xid 2 gets snapshot - if
+ * xid 2' GetSnapshotData sees xid 1 as running then it must see xid 0
+ * as running as well or it will see two tuple versions - one deleted
+ * by xid 1 and one inserted by xid 0. See notes in GetSnapshotData.
+ */
+ /* Lock SInvalLock because that's what GetSnapshotData uses. */
+ LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
+ MyProc->xid = InvalidTransactionId;
+ MyProc->xmin = InvalidTransactionId;
+ LWLockRelease(SInvalLock);
+
+ /* Mark that the prepared is completely done. Only after this
+ * can others commit/rollback the transaction. */
+ MarkAsPrepared(gxact);
+
+ /*
+ * This is all post-transaction cleanup. Note that if an error is raised
+ * here, it's too late to abort the transaction. This should be just
+ * noncritical resource releasing.
+ *
+ * The ordering of operations is not entirely random. The idea is:
+ * release resources visible to other backends (eg, files, buffer
+ * pins); then release locks; then release backend-local resources. We
+ * want to release locks at the point where any backend waiting for us
+ * will see our transaction as being fully cleaned up.
+ *
+ * Resources that can be associated with individual queries are handled
+ * by the ResourceOwner mechanism. The other calls here are for
+ * backend-wide state.
+ */
+
+ CallXactCallbacks(XACT_EVENT_PREPARE);
+
+ ResourceOwnerRelease(TopTransactionResourceOwner,
+ RESOURCE_RELEASE_BEFORE_LOCKS,
+ true, true);
+
+ ResourceOwnerRelease(TopTransactionResourceOwner,
+ RESOURCE_RELEASE_LOCKS,
+ true, true);
+ ResourceOwnerRelease(TopTransactionResourceOwner,
+ RESOURCE_RELEASE_AFTER_LOCKS,
+ true, true);
+
+
+ AtEOXact_SPI(true);
+ AtEOXact_on_commit_actions(true);
+ AtEOXact_Namespace(true);
+ /* smgrcommit already done */
+ AtEOXact_Files();
+ /* TODO: */
+ /* pgstat_count_xact_commit(); */
+
+ CurrentResourceOwner = NULL;
+
+ ResourceOwnerDelete(TopTransactionResourceOwner);
+ s->curTransactionOwner = NULL;
+ CurTransactionResourceOwner = NULL;
+ TopTransactionResourceOwner = NULL;
+
+ AtCommit_Memory();
+
+ s->transactionId = InvalidTransactionId;
+ s->subTransactionId = InvalidSubTransactionId;
+ s->nestingLevel = 0;
+ s->childXids = NIL;
+
+ /*
+ * done with 1st phase commit processing, set current transaction
+ * state back to default
+ */
+ s->state = TRANS_DEFAULT;
+
+ RESUME_INTERRUPTS();
+ }
+
+
+ /*
* AbortTransaction
*/
static void
***************
*** 1640,1646 ****
/*
* check the current transaction state
*/
! if (s->state != TRANS_INPROGRESS)
elog(WARNING, "AbortTransaction while in %s state",
TransStateAsString(s->state));
Assert(s->parent == NULL);
--- 1871,1877 ----
/*
* check the current transaction state
*/
! if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE)
elog(WARNING, "AbortTransaction while in %s state",
TransStateAsString(s->state));
Assert(s->parent == NULL);
***************
*** 1833,1838 ****
--- 2064,2070 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(ERROR, "StartTransactionCommand: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 1965,1970 ****
--- 2197,2209 ----
s->blockState = TBLOCK_DEFAULT;
}
else
+ if (s->blockState == TBLOCK_PREPARE)
+ {
+ Assert(s->parent == NULL);
+ PrepareTransaction();
+ s->blockState = TBLOCK_DEFAULT;
+ }
+ else
{
Assert(s->blockState == TBLOCK_INPROGRESS ||
s->blockState == TBLOCK_SUBINPROGRESS);
***************
*** 2047,2052 ****
--- 2286,2301 ----
s->blockState = TBLOCK_SUBINPROGRESS;
}
break;
+
+ /*
+ * This is the case when we just got the "PREPARE TRANSACTION"
+ * statement, so we prepare the transaction and go back to the
+ * default state.
+ */
+ case TBLOCK_PREPARE:
+ PrepareTransaction();
+ s->blockState = TBLOCK_DEFAULT;
+ break;
}
}
***************
*** 2187,2192 ****
--- 2436,2448 ----
CleanupSubTransaction();
AbortCurrentTransaction();
break;
+
+ case TBLOCK_PREPARE:
+ s->blockState = TBLOCK_DEFAULT;
+ AbortTransaction();
+ CleanupTransaction();
+ break;
+
}
}
***************
*** 2487,2492 ****
--- 2743,2749 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "BeginTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2494,2499 ****
--- 2751,2781 ----
}
/*
+ * PrepareTransactionBlock
+ */
+ bool
+ PrepareTransactionBlock(char *gid)
+ {
+ TransactionState s;
+ bool result;
+
+ result = EndTransactionBlock();
+
+ if (result)
+ {
+ s = CurrentTransactionState;
+
+ while (s->parent != NULL)
+ s = s->parent;
+
+ s->blockState = TBLOCK_PREPARE;
+ strncpy(gidStash, gid, GIDSIZE);
+ }
+ return result;
+ }
+
+
+ /*
* EndTransactionBlock
* This executes a COMMIT command.
*
***************
*** 2603,2608 ****
--- 2885,2891 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "EndTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2694,2699 ****
--- 2977,2983 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "UserAbortTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2740,2745 ****
--- 3024,3030 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "DefineSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2795,2800 ****
--- 3080,3086 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "ReleaseSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2892,2897 ****
--- 3178,3184 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "RollbackToSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 2999,3004 ****
--- 3286,3292 ----
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "BeginInternalSubTransaction: unexpected state %s",
BlockStateAsString(s->blockState));
break;
***************
*** 3055,3060 ****
--- 3343,3349 ----
case TBLOCK_BEGIN:
case TBLOCK_SUBBEGIN:
case TBLOCK_INPROGRESS:
+ case TBLOCK_PREPARE:
case TBLOCK_END:
case TBLOCK_SUBEND:
case TBLOCK_ABORT:
***************
*** 3111,3116 ****
--- 3400,3406 ----
case TBLOCK_INPROGRESS:
case TBLOCK_END:
case TBLOCK_ABORT_PENDING:
+ case TBLOCK_PREPARE:
/* In a transaction, so clean up */
AbortTransaction();
CleanupTransaction();
***************
*** 3202,3207 ****
--- 3492,3498 ----
case TBLOCK_SUBINPROGRESS:
case TBLOCK_END:
case TBLOCK_SUBEND:
+ case TBLOCK_PREPARE:
return 'T'; /* in transaction */
case TBLOCK_ABORT:
case TBLOCK_SUBABORT:
***************
*** 3700,3705 ****
--- 3991,3998 ----
return "SUB RESTART";
case TBLOCK_SUBABORT_RESTART:
return "SUB AB RESTRT";
+ case TBLOCK_PREPARE:
+ return "PREPARE";
}
return "UNRECOGNIZED";
}
***************
*** 3723,3728 ****
--- 4016,4023 ----
return "ABORT";
case TRANS_INPROGRESS:
return "INPROGR";
+ case TRANS_PREPARE:
+ return "PREPARE";
}
return "UNRECOGNIZED";
}
***************
*** 3771,3804 ****
xact_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
- TransactionId *sub_xids;
- TransactionId max_xid;
- int i;
! TransactionIdCommit(record->xl_xid);
/* Mark committed subtransactions as committed */
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
TransactionIdCommitTree(xlrec->nsubxacts, sub_xids);
- /* Make sure nextXid is beyond any XID mentioned in the record */
- max_xid = record->xl_xid;
- for (i = 0; i < xlrec->nsubxacts; i++)
- {
- if (TransactionIdPrecedes(max_xid, sub_xids[i]))
- max_xid = sub_xids[i];
- }
- if (TransactionIdFollowsOrEquals(max_xid,
- ShmemVariableCache->nextXid))
- {
- ShmemVariableCache->nextXid = max_xid;
- TransactionIdAdvance(ShmemVariableCache->nextXid);
- }
-
/* Make sure files supposed to be dropped are dropped */
for (i = 0; i < xlrec->nrels; i++)
{
--- 4066,4088 ----
xact_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ TransactionId xid = record->xl_xid;
+ TransactionId max_xid;
+ TransactionId *sub_xids = NULL;
+ int nsubxacts = 0;
+ int i;
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
! TransactionIdCommit(xid);
/* Mark committed subtransactions as committed */
+ nsubxacts = xlrec->nsubxacts;
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
TransactionIdCommitTree(xlrec->nsubxacts, sub_xids);
/* Make sure files supposed to be dropped are dropped */
for (i = 0; i < xlrec->nrels; i++)
{
***************
*** 3810,3838 ****
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
TransactionId *sub_xids;
- TransactionId max_xid;
- int i;
TransactionIdAbort(record->xl_xid);
/* Mark subtransactions as aborted */
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
TransactionIdAbortTree(xlrec->nsubxacts, sub_xids);
- /* Make sure nextXid is beyond any XID mentioned in the record */
- max_xid = record->xl_xid;
- for (i = 0; i < xlrec->nsubxacts; i++)
- {
- if (TransactionIdPrecedes(max_xid, sub_xids[i]))
- max_xid = sub_xids[i];
- }
- if (TransactionIdFollowsOrEquals(max_xid,
- ShmemVariableCache->nextXid))
- {
- ShmemVariableCache->nextXid = max_xid;
- TransactionIdAdvance(ShmemVariableCache->nextXid);
- }
-
/* Make sure files supposed to be dropped are dropped */
for (i = 0; i < xlrec->nrels; i++)
{
--- 4094,4107 ----
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
TransactionId *sub_xids;
TransactionIdAbort(record->xl_xid);
/* Mark subtransactions as aborted */
+ nsubxacts = xlrec->nsubxacts;
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
TransactionIdAbortTree(xlrec->nsubxacts, sub_xids);
/* Make sure files supposed to be dropped are dropped */
for (i = 0; i < xlrec->nrels; i++)
{
***************
*** 3840,3847 ****
--- 4109,4150 ----
smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
}
}
+ else if (info == XLOG_XACT_PREPARE)
+ {
+ RecreateTwoPhaseFile(xid, XLogRecGetData(record), record->xl_len);
+ }
+ else if (info == XLOG_XACT_COMMIT_PREPARED)
+ {
+ nsubxacts = *((int *)XLogRecGetData(record));
+ sub_xids = (TransactionId *)(XLogRecGetData(record) + sizeof(int));
+ TransactionIdCommit(xid);
+ TransactionIdCommitTree(nsubxacts, sub_xids);
+ RemoveTwoPhaseFile(xid);
+ }
+ else if (info == XLOG_XACT_ABORT_PREPARED)
+ {
+ nsubxacts = *((int *)XLogRecGetData(record));
+ sub_xids = (TransactionId *)(XLogRecGetData(record) + sizeof(int));
+ TransactionIdAbort(xid);
+ TransactionIdAbortTree(nsubxacts, sub_xids);
+ RemoveTwoPhaseFile(xid);
+ }
else
elog(PANIC, "xact_redo: unknown op code %u", info);
+
+ /* Make sure nextXid is beyond any XID mentioned in the record */
+ max_xid = xid;
+ for (i = 0; i < nsubxacts; i++)
+ {
+ if (TransactionIdPrecedes(max_xid, sub_xids[i]))
+ max_xid = sub_xids[i];
+ }
+ if (TransactionIdFollowsOrEquals(max_xid,
+ ShmemVariableCache->nextXid))
+ {
+ ShmemVariableCache->nextXid = max_xid;
+ TransactionIdAdvance(ShmemVariableCache->nextXid);
+ }
}
void
***************
*** 3908,3913 ****
--- 4211,4246 ----
sprintf(buf + strlen(buf), " %u", xacts[i]);
}
}
+ else if (info == XLOG_XACT_PREPARE)
+ {
+ sprintf(buf + strlen(buf), "prepare");
+ }
+ else if (info == XLOG_XACT_COMMIT_PREPARED)
+ {
+ int nsubxacts = *((int *)rec);
+ TransactionId *sub_xids = (TransactionId *)(((char *)rec) + sizeof(int));
+
+ sprintf(buf + strlen(buf), "commit_prepared: ");
+ if (nsubxacts > 0)
+ {
+ sprintf(buf + strlen(buf), "; subxacts:");
+ for (i = 0; i < nsubxacts; i++)
+ sprintf(buf + strlen(buf), " %u", sub_xids[i]);
+ }
+ }
+ else if (info == XLOG_XACT_ABORT_PREPARED)
+ {
+ int nsubxacts = *((int *)rec);
+ TransactionId *sub_xids = (TransactionId *)(((char *)rec) + sizeof(int));
+
+ sprintf(buf + strlen(buf), "abort_prepared: ");
+ if (nsubxacts > 0)
+ {
+ sprintf(buf + strlen(buf), "; subxacts:");
+ for (i = 0; i < nsubxacts; i++)
+ sprintf(buf + strlen(buf), " %u", sub_xids[i]);
+ }
+ }
else
strcat(buf, "UNKNOWN");
}
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/access/transam/xlog.c 07twophase/src/backend/access/transam/xlog.c
*** 00orig/src/backend/access/transam/xlog.c 2005-06-07 16:44:06.000000000 -0400
--- 07twophase/src/backend/access/transam/xlog.c 2005-06-07 17:13:06.000000000 -0400
***************
*** 29,34 ****
--- 29,35 ----
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
+ #include "access/twophase.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "miscadmin.h"
***************
*** 489,494 ****
--- 490,502 ----
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
{
+ return XLogInsertOnBehalf(rmid, info, rdata, GetCurrentTransactionIdIfAny());
+ }
+
+
+ XLogRecPtr
+ XLogInsertOnBehalf(RmgrId rmid, uint8 info, XLogRecData *rdata, TransactionId xid)
+ {
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecord *record;
XLogContRecord *contrecord;
***************
*** 3766,3774 ****
WriteControlFile();
! /* Bootstrap the commit log, too */
BootStrapCLOG();
BootStrapSUBTRANS();
BootStrapMultiXact();
}
--- 3774,3783 ----
WriteControlFile();
! /* Bootstrap the commit log, subtransaction and twophase subsystems, too */
BootStrapCLOG();
BootStrapSUBTRANS();
+ BootStrapTwoPhase();
BootStrapMultiXact();
}
***************
*** 4645,4650 ****
--- 4654,4662 ----
}
#endif
+ StartupSUBTRANS();
+ RecoverPreparedTransactions();
+
if (InRecovery)
{
int rmid;
***************
*** 4706,4712 ****
/* Start up the commit log and related stuff, too */
StartupCLOG();
- StartupSUBTRANS();
StartupMultiXact();
ereport(LOG,
--- 4718,4723 ----
***************
*** 5101,5106 ****
--- 5112,5118 ----
CheckPointCLOG();
CheckPointSUBTRANS();
+ CheckPointTwoPhase();
CheckPointMultiXact();
FlushBufferPool();
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/catalog/system_views.sql 07twophase/src/backend/catalog/system_views.sql
*** 00orig/src/backend/catalog/system_views.sql 2005-05-18 21:30:56.000000000 -0400
--- 07twophase/src/backend/catalog/system_views.sql 2005-06-02 11:47:15.000000000 -0400
***************
*** 266,271 ****
--- 266,275 ----
transaction xid, classid oid, objid oid, objsubid int2,
pid int4, mode text, granted boolean);
+ CREATE VIEW pg_prepared_xacts AS
+ SELECT *
+ FROM pg_prepared_xact() AS L(transaction xid, gid text, owner name);
+
CREATE VIEW pg_settings AS
SELECT *
FROM pg_show_all_settings() AS A
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/commands/async.c 07twophase/src/backend/commands/async.c
*** 00orig/src/backend/commands/async.c 2005-05-11 18:09:28.000000000 -0400
--- 07twophase/src/backend/commands/async.c 2005-06-02 12:22:21.000000000 -0400
***************
*** 78,83 ****
--- 78,84 ----
#include
#include "access/heapam.h"
+ #include "access/twophase_rmgr.h"
#include "catalog/pg_listener.h"
#include "commands/async.h"
#include "libpq/libpq.h"
***************
*** 407,412 ****
--- 408,440 ----
CommitTransactionCommand();
}
+
+ /*
+ *--------------------------------------------------------------
+ * AtPrepare_Notify
+ *
+ * This is called at the prepare phase of a two-phase
+ * transaction.
+ *
+ * Not implemented. Throw an error if there was any notify
+ * requests in this transaction.
+ *
+ *--------------------------------------------------------------
+ */
+ void
+ AtPrepare_Notify(void)
+ {
+ ListCell *p;
+
+ /* elog(ERROR, "notifications with two-phase commit not implemented"); */
+
+ foreach(p, pendingNotifies) {
+ char *relname = lfirst(p);
+ RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, relname, strlen(relname) + 1);
+ }
+ ClearPendingNotifies();
+ }
+
/*
*--------------------------------------------------------------
* AtCommit_Notify
***************
*** 1037,1039 ****
--- 1065,1074 ----
*/
pendingNotifies = NIL;
}
+
+ void
+ notify_postcommit_record(TransactionId xid, void *recdata, uint32 len)
+ {
+ /* Piggyback the current transaction running COMMIT PREPARED */
+ Async_Notify((char *) recdata);
+ }
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/libpq/be-fsstubs.c 07twophase/src/backend/libpq/be-fsstubs.c
*** 00orig/src/backend/libpq/be-fsstubs.c 2005-01-10 19:05:18.000000000 -0300
--- 07twophase/src/backend/libpq/be-fsstubs.c 2005-05-25 17:35:12.000000000 -0400
***************
*** 503,508 ****
--- 503,515 ----
PG_RETURN_INT32(1);
}
+ void
+ AtPrepare_LargeObject(void)
+ {
+ if (fscxt != NULL)
+ elog(ERROR, "two-phase commit of large objects is not implemented");
+ }
+
/*
* AtEOXact_LargeObject -
* prepares large objects for transaction commit
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/parser/gram.y 07twophase/src/backend/parser/gram.y
*** 00orig/src/backend/parser/gram.y 2005-05-11 18:09:31.000000000 -0400
--- 07twophase/src/backend/parser/gram.y 2005-05-25 17:35:12.000000000 -0400
***************
*** 387,393 ****
ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNER
PARTIAL PASSWORD PLACING POSITION
! PRECISION PRESERVE PREPARE PRIMARY
PRIOR PRIVILEGES PROCEDURAL PROCEDURE
QUOTE
--- 387,393 ----
ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNER
PARTIAL PASSWORD PLACING POSITION
! PRECISION PRESERVE PREPARE PREPARED PRIMARY
PRIOR PRIVILEGES PROCEDURAL PROCEDURE
QUOTE
***************
*** 4059,4064 ****
--- 4059,4085 ----
n->options = $3;
$$ = (Node *)n;
}
+ | PREPARE TRANSACTION Sconst
+ {
+ TransactionStmt *n = makeNode(TransactionStmt);
+ n->kind = TRANS_STMT_PREPARE;
+ n->gid = $3;
+ $$ = (Node *)n;
+ }
+ | COMMIT PREPARED Sconst
+ {
+ TransactionStmt *n = makeNode(TransactionStmt);
+ n->kind = TRANS_STMT_COMMIT_PREPARED;
+ n->gid = $3;
+ $$ = (Node *)n;
+ }
+ | ROLLBACK PREPARED Sconst
+ {
+ TransactionStmt *n = makeNode(TransactionStmt);
+ n->kind = TRANS_STMT_ROLLBACK_PREPARED;
+ n->gid = $3;
+ $$ = (Node *)n;
+ }
| COMMIT opt_transaction
{
TransactionStmt *n = makeNode(TransactionStmt);
***************
*** 7842,7847 ****
--- 7863,7869 ----
| PARTIAL
| PASSWORD
| PREPARE
+ | PREPARED
| PRESERVE
| PRIOR
| PRIVILEGES
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/parser/keywords.c 07twophase/src/backend/parser/keywords.c
*** 00orig/src/backend/parser/keywords.c 2005-05-11 18:09:31.000000000 -0400
--- 07twophase/src/backend/parser/keywords.c 2005-05-25 17:35:12.000000000 -0400
***************
*** 242,247 ****
--- 242,248 ----
{"position", POSITION},
{"precision", PRECISION},
{"prepare", PREPARE},
+ {"prepared", PREPARED},
{"preserve", PRESERVE},
{"primary", PRIMARY},
{"prior", PRIOR},
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/storage/ipc/ipci.c 07twophase/src/backend/storage/ipc/ipci.c
*** 00orig/src/backend/storage/ipc/ipci.c 2005-05-19 18:47:04.000000000 -0400
--- 07twophase/src/backend/storage/ipc/ipci.c 2005-05-25 17:35:12.000000000 -0400
***************
*** 17,22 ****
--- 17,23 ----
#include "access/clog.h"
#include "access/multixact.h"
#include "access/subtrans.h"
+ #include "access/twophase.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "postmaster/bgwriter.h"
***************
*** 77,82 ****
--- 78,84 ----
size += XLOGShmemSize();
size += CLOGShmemSize();
size += SUBTRANSShmemSize();
+ size += TwoPhaseShmemSize();
size += MultiXactShmemSize();
size += LWLockShmemSize();
size += ProcArrayShmemSize(maxBackends);
***************
*** 144,149 ****
--- 146,152 ----
XLOGShmemInit();
CLOGShmemInit();
SUBTRANSShmemInit();
+ TwoPhaseShmemInit();
MultiXactShmemInit();
InitBufferPool();
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/storage/ipc/procarray.c 07twophase/src/backend/storage/ipc/procarray.c
*** 00orig/src/backend/storage/ipc/procarray.c 2005-05-19 21:43:03.000000000 -0400
--- 07twophase/src/backend/storage/ipc/procarray.c 2005-06-02 16:53:14.000000000 -0400
***************
*** 25,30 ****
--- 25,31 ----
#include "postgres.h"
#include "access/subtrans.h"
+ #include "access/twophase.h"
#include "miscadmin.h"
#include "storage/proc.h"
#include "storage/procarray.h"
***************
*** 51,61 ****
--- 52,64 ----
/* counters for XidCache measurement */
static long xc_by_recent_xmin = 0;
+ static long xc_is_prepared = 0;
static long xc_by_main_xid = 0;
static long xc_by_child_xid = 0;
static long xc_slow_answer = 0;
#define xc_by_recent_xmin_inc() (xc_by_recent_xmin++)
+ #define xc_is_prepared() (xc_is_prepared++)
#define xc_by_main_xid_inc() (xc_by_main_xid++)
#define xc_by_child_xid_inc() (xc_by_child_xid++)
#define xc_slow_answer_inc() (xc_slow_answer++)
***************
*** 65,70 ****
--- 68,74 ----
#else /* !XIDCACHE_DEBUG */
#define xc_by_recent_xmin_inc() ((void) 0)
+ #define xc_is_prepared() ((void) 0)
#define xc_by_main_xid_inc() ((void) 0)
#define xc_by_child_xid_inc() ((void) 0)
#define xc_slow_answer_inc() ((void) 0)
***************
*** 216,221 ****
--- 220,231 ----
return false;
}
+ if (TransactionIdIsPrepared(xid))
+ {
+ xc_is_prepared();
+ return true;
+ }
+
/* Get workspace to remember main XIDs in */
xids = (TransactionId *) palloc(sizeof(TransactionId) * arrayP->maxProcs);
***************
*** 390,395 ****
--- 400,419 ----
}
}
+ if (allDbs)
+ {
+ TransactionId xid;
+ StartPreparedTransactionListScan();
+
+ while ((xid = NextPreparedTransactionId()) != InvalidTransactionId)
+ {
+ if (TransactionIdPrecedes(xid, result))
+ result = xid;
+ }
+
+ EndPreparedTransactionListScan();
+ }
+
LWLockRelease(ProcArrayLock);
return result;
***************
*** 456,462 ****
* First call for this snapshot
*/
snapshot->xip = (TransactionId *)
! malloc(MaxBackends * sizeof(TransactionId));
if (snapshot->xip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
--- 480,486 ----
* First call for this snapshot
*/
snapshot->xip = (TransactionId *)
! malloc((MaxBackends + max_prepared_xacts) * sizeof(TransactionId));
if (snapshot->xip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
***************
*** 533,541 ****
--- 557,582 ----
globalxmin = xid;
}
+ /* Take a snapshot of prepared transactions too */
+ if (StartPreparedTransactionListScan() > 0)
+ {
+ TransactionId xid;
+ while ((xid = NextPreparedTransactionId()) != InvalidTransactionId)
+ {
+ if (!TransactionIdIsNormal(xid) || TransactionIdFollowsOrEquals(xid, xmax))
+ continue;
+
+ if (TransactionIdPrecedes(xid, xmin))
+ xmin = xid;
+
+ snapshot->xip[count++] = xid;
+ }
+ }
+
if (serializable)
MyProc->xmin = TransactionXmin = xmin;
+ EndPreparedTransactionListScan();
LWLockRelease(ProcArrayLock);
/*
***************
*** 777,784 ****
DisplayXidCache(void)
{
fprintf(stderr,
! "XidCache: xmin: %ld, mainxid: %ld, childxid: %ld, slow: %ld\n",
xc_by_recent_xmin,
xc_by_main_xid,
xc_by_child_xid,
xc_slow_answer);
--- 818,826 ----
DisplayXidCache(void)
{
fprintf(stderr,
! "XidCache: xmin: %ld, prepared: %ld, mainxid: %ld, childxid: %ld, slow: %ld\n",
xc_by_recent_xmin,
+ xc_is_prepared,
xc_by_main_xid,
xc_by_child_xid,
xc_slow_answer);
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/storage/lmgr/lock.c 07twophase/src/backend/storage/lmgr/lock.c
*** 00orig/src/backend/storage/lmgr/lock.c 2005-06-01 01:04:04.000000000 -0400
--- 07twophase/src/backend/storage/lmgr/lock.c 2005-06-07 17:44:43.085479584 -0400
***************
*** 34,43 ****
--- 34,47 ----
#include
#include "access/xact.h"
+ #include "access/twophase.h"
+ #include "access/twophase_rmgr.h"
#include "miscadmin.h"
#include "storage/proc.h"
+ #include "storage/lockrmgr.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
+ #include "utils/relcache.h"
#include "utils/resowner.h"
***************
*** 78,83 ****
--- 82,94 ----
};
+ /* Record that's written to xlog when a lock is persisted */
+ typedef struct TwoPhaseLockRecord
+ {
+ LOCKTAG locktag;
+ LOCKMASK holdMask;
+ } TwoPhaseLockRecord;
+
#ifdef LOCK_DEBUG
/*------
***************
*** 157,162 ****
--- 168,175 ----
#define PROCLOCK_PRINT(where, proclockP)
#endif /* not LOCK_DEBUG */
+ static bool LockAcquireForProc(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
+ TransactionId xid, LOCKMODE lockmode, bool dontWait, PGPROC *proc);
static void RemoveLocalLock(LOCALLOCK *locallock);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
***************
*** 171,178 ****
/*
! * InitLocks -- Init the lock module. Create a private data
! * structure for constructing conflict masks.
*/
void
InitLocks(void)
--- 184,190 ----
/*
! * InitLocks -- Init the lock module. Nothing to do here at present.
*/
void
InitLocks(void)
***************
*** 423,428 ****
--- 435,447 ----
LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode, bool dontWait)
{
+ return LockAcquireForProc(lockmethodid, locktag, xid, lockmode, dontWait, MyProc);
+ }
+
+ bool
+ LockAcquireForProc(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
+ TransactionId xid, LOCKMODE lockmode, bool dontWait, PGPROC *proc)
+ {
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
LOCK *lock;
***************
*** 565,571 ****
*/
MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
proclocktag.lock = MAKE_OFFSET(lock);
! proclocktag.proc = MAKE_OFFSET(MyProc);
TransactionIdStore(xid, &proclocktag.xid);
/*
--- 584,590 ----
*/
MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
proclocktag.lock = MAKE_OFFSET(lock);
! proclocktag.proc = MAKE_OFFSET(proc);
TransactionIdStore(xid, &proclocktag.xid);
/*
***************
*** 607,613 ****
proclock->holdMask = 0;
/* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
! SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
PROCLOCK_PRINT("LockAcquire: new", proclock);
}
else
--- 626,632 ----
proclock->holdMask = 0;
/* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
! SHMQueueInsertBefore(&proc->procLocks, &proclock->procLink);
PROCLOCK_PRINT("LockAcquire: new", proclock);
}
else
***************
*** 661,667 ****
* If this process (under any XID) is a holder of the lock, just grant
* myself another one without blocking.
*/
! LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
if (myHolding[lockmode] > 0)
{
GrantLock(lock, proclock, lockmode);
--- 680,686 ----
* If this process (under any XID) is a holder of the lock, just grant
* myself another one without blocking.
*/
! LockCountMyLocks(proclock->tag.lock, proc, myHolding);
if (myHolding[lockmode] > 0)
{
GrantLock(lock, proclock, lockmode);
***************
*** 681,687 ****
else
status = LockCheckConflicts(lockMethodTable, lockmode,
lock, proclock,
! MyProc, myHolding);
if (status == STATUS_OK)
{
--- 700,706 ----
else
status = LockCheckConflicts(lockMethodTable, lockmode,
lock, proclock,
! proc, myHolding);
if (status == STATUS_OK)
{
***************
*** 733,739 ****
if (myHolding[i] > 0)
heldLocks |= LOCKBIT_ON(i);
}
! MyProc->heldLocks = heldLocks;
}
/*
--- 752,758 ----
if (myHolding[i] > 0)
heldLocks |= LOCKBIT_ON(i);
}
! proc->heldLocks = heldLocks;
}
/*
***************
*** 1630,1635 ****
--- 1649,1867 ----
}
}
+ /*
+ * LockPersistAll
+ * Take care of the locks held by a transaction that is "preparing" for
+ * two-phase commit.
+ */
+ bool
+ LockPersistAll(void)
+ {
+ HASH_SEQ_STATUS status;
+ SHM_QUEUE *procLocks = &(MyProc->procLocks);
+ LWLockId masterLock;
+ LockMethod lockMethodTable;
+ int numLockModes;
+ LOCALLOCK *locallock;
+ PROCLOCK *oldproclock;
+ PROCLOCK *newproclock;
+ LOCK *lock;
+ LOCKMETHODID lockmethodid = DEFAULT_LOCKMETHOD;
+ PROCLOCKTAG proclocktag;
+ PGPROC *dummyProc;
+ bool found;
+
+ #ifdef LOCK_DEBUG
+ if (Trace_locks)
+ elog(LOG, "LockPersistAll: lockmethod=%d", lockmethodid);
+ #endif
+
+ lockMethodTable = LockMethods[lockmethodid];
+ if (!lockMethodTable)
+ {
+ elog(WARNING, "bad lock method: %d", lockmethodid);
+ return FALSE;
+ }
+
+ numLockModes = lockMethodTable->numLockModes;
+ masterLock = lockMethodTable->masterLock;
+
+ /*
+ * First we scan the locallock table and get rid of entries pointing to
+ * those locks we will persist. Then we scan the transaction's proclock
+ * table, record each proclock in the twophase state file, and remove it.
+ *
+ * We do this in two passes because we may have multiple locallock
+ * entries pointing to the same proclock, and we daren't end up with
+ * any dangling pointers.
+ */
+ hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
+
+ while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+ {
+ LOCK *lock;
+
+ if (locallock->proclock == NULL || locallock->lock == NULL)
+ {
+ /*
+ * We must've run out of shared memory while trying to set up
+ * this lock. Just forget the local entry.
+ */
+ Assert(locallock->nLocks == 0);
+ RemoveLocalLock(locallock);
+ continue;
+ }
+
+ /* Ignore items that are not of the lockmethod to be removed */
+ if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
+ continue;
+
+ /* Ignore session locks */
+ if (!TransactionIdIsValid(locallock->tag.xid))
+ continue;
+
+ /*
+ * Make sure the transaction didn't operate on a temp table.
+ * We have better do this before grabbing the lmgr's master lock,
+ * because RelationIdGetRelation would try to grab it again.
+ */
+ lock = locallock->lock;
+
+ if (lock->tag.locktag_type == LOCKTAG_RELATION)
+ {
+ Relation rel = RelationIdGetRelation(lock->tag.locktag_field2);
+
+ if (rel->rd_istemp)
+ ereport(ERROR,
+ /* FIXME -- get the right errcode and errmsg */
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("persisting transactions operating on temporal tables is not supported")));
+
+ RelationClose(rel);
+ }
+
+ RemoveLocalLock(locallock);
+ }
+
+ LWLockAcquire(masterLock, LW_EXCLUSIVE);
+
+ oldproclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+ offsetof(PROCLOCK, procLink));
+
+ dummyProc = TwoPhaseGetDummyProc();
+
+ while (oldproclock)
+ {
+ PROCLOCK *nextHolder;
+
+ /* Get link first, since we may unlink/delete this proclock */
+ nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &oldproclock->procLink,
+ offsetof(PROCLOCK, procLink));
+
+ Assert(oldproclock->tag.proc == MAKE_OFFSET(MyProc));
+
+ lock = (LOCK *) MAKE_PTR(oldproclock->tag.lock);
+
+ /* Ignore items that are not of the lockmethod to be removed */
+ if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
+ goto next_item;
+
+ /* Ignore session locks */
+ if (!TransactionIdIsValid(oldproclock->tag.xid))
+ goto next_item;
+
+ PROCLOCK_PRINT("LockPersistAll", oldproclock);
+ LOCK_PRINT("LockPersistAll", lock, 0);
+ Assert(lock->nRequested >= 0);
+ Assert(lock->nGranted >= 0);
+ Assert(lock->nGranted <= lock->nRequested);
+ Assert((oldproclock->holdMask & ~lock->grantMask) == 0);
+
+ /*
+ * To reassign the lock to the dummy PGPROC struct, we create a new
+ * proclock identical to the original, except that it points to the
+ * dummy PGPROC. We then insert that in the LOCK.lockHolders list,
+ * and remove the original proclock from the linked lists and the hash
+ * table.
+ */
+
+ /*
+ * Create the hash key for the proclock table.
+ */
+ MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding,
+ * needed */
+ proclocktag.lock = oldproclock->tag.lock;
+ proclocktag.proc = MAKE_OFFSET(dummyProc);
+ TransactionIdStore(oldproclock->tag.xid, &proclocktag.xid);
+
+ newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+ (void *) &proclocktag,
+ HASH_ENTER, &found);
+ if (!newproclock)
+ {
+ LWLockRelease(masterLock);
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("Not enough memory for storing the prepared transaction's held locks.")));
+ return FALSE;
+ }
+
+ /*
+ * If new, initialize the new entry
+ */
+ if (!found)
+ {
+ newproclock->holdMask = oldproclock->holdMask;
+ /* Add proclock to appropriate lists */
+ SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
+ SHMQueueInsertBefore(&dummyProc->procLocks, &newproclock->procLink);
+ PROCLOCK_PRINT("LockPersistAll: new", newproclock);
+ }
+
+ /* Make a recovery record that is used to reacquire the
+ * lock after shutdown or crash.
+ */
+ {
+ TwoPhaseLockRecord record;
+
+ LOCK *lock = (LOCK *) MAKE_PTR(oldproclock->tag.lock);
+ memcpy(&(record.locktag), &(lock->tag), sizeof(LOCKTAG));
+ record.holdMask = oldproclock->holdMask;
+
+ RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, &record, sizeof(TwoPhaseLockRecord));
+ }
+
+ PROCLOCK_PRINT("LockPersistAll: deleting", oldproclock);
+
+ /* remove the old proclock entry from the hashtable and linked lists */
+ SHMQueueDelete(&oldproclock->lockLink);
+ SHMQueueDelete(&oldproclock->procLink);
+ oldproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+ (void *) &(oldproclock->tag),
+ HASH_REMOVE,
+ NULL);
+ if (!oldproclock)
+ {
+ LWLockRelease(masterLock);
+ elog(WARNING, "proclock table corrupted");
+ return FALSE;
+ }
+
+ next_item:
+ oldproclock = nextHolder;
+ }
+
+ LWLockRelease(masterLock);
+
+ #ifdef LOCK_DEBUG
+ if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+ elog(LOG, "LockPersistAll done");
+ #endif
+
+ return TRUE;
+ }
+
/*
* Estimate shared-memory space used for lock tables
***************
*** 1734,1750 ****
*
* Must have already acquired the masterLock.
*/
! void
! DumpLocks(void)
{
- PGPROC *proc;
SHM_QUEUE *procLocks;
PROCLOCK *proclock;
LOCK *lock;
int lockmethodid = DEFAULT_LOCKMETHOD;
LockMethod lockMethodTable;
- proc = MyProc;
if (proc == NULL)
return;
--- 1966,1980 ----
*
* Must have already acquired the masterLock.
*/
! static void
! DumpLocks(PGPROC *proc)
{
SHM_QUEUE *procLocks;
PROCLOCK *proclock;
LOCK *lock;
int lockmethodid = DEFAULT_LOCKMETHOD;
LockMethod lockMethodTable;
if (proc == NULL)
return;
***************
*** 1819,1821 ****
--- 2049,2160 ----
}
#endif /* LOCK_DEBUG */
+
+ /*
+ * LOCK resource manager's routines
+ */
+
+ /*
+ * Re-acquire all locks that belong to transactions that were prepared. Some of
+ * these transactions might have committed/aborted later, but we release locks
+ * belonging to those transactions later in LockReleaseAllForPrepared, when we
+ * know their final state.
+ *
+ * Because this function is run at db startup, re-acquiring the locks should
+ * never conflict with running transactions because there is none. All locks
+ * are acquired for the same dummy PGPROC structure, and therefore they should
+ * also never conflict with each other.
+ */
+ void
+ lock_recover_record(TransactionId xid, void *recdata, uint32 len)
+ {
+ TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
+ int lockmode;
+ bool success;
+ PGPROC *dummyProc = TwoPhaseGetDummyProc();
+
+ for (lockmode = 0; lockmode < MAX_LOCKMODES; lockmode++)
+ {
+ if (rec->holdMask & LOCKBIT_ON(lockmode))
+ {
+ success = LockAcquireForProc(rec->locktag.locktag_lockmethodid, &rec->locktag,
+ xid, lockmode, true, dummyProc);
+ if (success)
+ elog(DEBUG3, "Lock re-acquired");
+ else
+ ereport(WARNING,
+ (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+ errmsg("couldn't re-acquire lock")));
+ }
+ }
+ }
+
+
+ /*
+ * Release locks held by a prepared transaction.
+ */
+ void
+ LockReleaseAllForPrepared(TransactionId xid)
+ {
+ PGPROC *dummyProc = TwoPhaseGetDummyProc();
+ SHM_QUEUE *procLocks = &(dummyProc->procLocks);
+ PROCLOCK *proclock;
+ PROCLOCK *nextHolder;
+ LWLockId masterLock;
+ LockMethod lockMethodTable;
+ int i,
+ numLockModes;
+ LOCK *lock;
+
+ lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
+
+ numLockModes = lockMethodTable->numLockModes;
+ masterLock = lockMethodTable->masterLock;
+
+ LWLockAcquire(masterLock, LW_EXCLUSIVE);
+
+ proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+ offsetof(PROCLOCK, procLink));
+
+ while (proclock)
+ {
+ bool wakeupNeeded = false;
+
+ /* Get link first, since we may unlink/delete this proclock */
+ nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
+ offsetof(PROCLOCK, procLink));
+
+ lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
+
+ if (!TransactionIdEquals(proclock->tag.xid, xid))
+ goto next_item;
+
+ PROCLOCK_PRINT("LockReleaseAllForPrepared", proclock);
+ LOCK_PRINT("LockReleaseAllForPrepared", lock, 0);
+
+ for (i = 1; i <= numLockModes; i++)
+ {
+ if (!(proclock->holdMask & LOCKBIT_ON(i)))
+ continue;
+
+ wakeupNeeded |= UnGrantLock(lock, i, proclock, lockMethodTable);
+ }
+
+ LOCK_PRINT("LockReleaseAllForPrepared: updated", lock, 0);
+
+ PROCLOCK_PRINT("LockReleaseAllForPrepared: deleting", proclock);
+
+ CleanUpLock(DEFAULT_LOCKMETHOD, lock, proclock, wakeupNeeded);
+ next_item:
+ proclock = nextHolder;
+ }
+
+ LWLockRelease(masterLock);
+
+ #ifdef LOCK_DEBUG
+ if (Trace_locks)
+ elog(LOG, "LockReleaseAllForPrepared done");
+ #endif
+
+ return;
+ }
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/storage/lmgr/Makefile 07twophase/src/backend/storage/lmgr/Makefile
*** 00orig/src/backend/storage/lmgr/Makefile 2004-10-15 21:16:22.000000000 -0300
--- 07twophase/src/backend/storage/lmgr/Makefile 2005-06-02 22:47:59.000000000 -0400
***************
*** 16,21 ****
--- 16,23 ----
all: SUBSYS.o
+ CFLAGS = -O0 -Wall -Wmissing-prototypes -Wpointer-arith -Wendif-labels -fno-strict-aliasing -g -I../../../../src/include -I/home/alvherre/CVS/pgsql/source/07twophase/src/include -D_GNU_SOURCE
+
SUBSYS.o: $(OBJS)
$(LD) $(LDREL) $(LDOUT) SUBSYS.o $(OBJS)
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/storage/smgr/smgr.c 07twophase/src/backend/storage/smgr/smgr.c
*** 00orig/src/backend/storage/smgr/smgr.c 2005-06-07 16:44:23.000000000 -0400
--- 07twophase/src/backend/storage/smgr/smgr.c 2005-06-06 21:02:50.000000000 -0400
***************
*** 18,23 ****
--- 18,24 ----
#include "postgres.h"
#include "access/xact.h"
+ #include "access/twophase_rmgr.h"
#include "commands/tablespace.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
***************
*** 677,682 ****
--- 678,731 ----
reln->smgr_rnode.relNode)));
}
+ void
+ smgr_postcommit_record(TransactionId xid, void *recdata, uint32 len)
+ {
+ PendingRelDelete *pending = (PendingRelDelete *) recdata;
+
+ if (pending->atCommit)
+ smgr_internal_unlink(pending->relnode,
+ pending->which,
+ pending->isTemp,
+ false);
+ }
+
+ void
+ smgr_postabort_record(TransactionId xid, void *recdata, uint32 len)
+ {
+ PendingRelDelete *pending = (PendingRelDelete *) recdata;
+
+ if (!pending->atCommit)
+ smgr_internal_unlink(pending->relnode,
+ pending->which,
+ pending->isTemp,
+ false);
+ }
+
+ /*
+ * AtPrepare_smgr -- Does things related to two-phase commit prepare phase
+ */
+ void
+ AtPrepare_smgr(void)
+ {
+ if(pendingDeletes != NULL) {
+ PendingRelDelete *pending;
+ PendingRelDelete *next;
+
+ for (pending = pendingDeletes; pending != NULL; pending = next)
+ {
+ next = pending->next;
+
+ pendingDeletes = next;
+
+ RegisterTwoPhaseRecord(TWOPHASE_RM_SMGR_ID, pending, sizeof(PendingRelDelete));
+ /* must explicitly free the list entry */
+ pfree(pending);
+ }
+ }
+ }
+
+
/*
* smgrDoPendingDeletes() -- Take care of relation deletes at end of xact.
*
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/tcop/postgres.c 07twophase/src/backend/tcop/postgres.c
*** 00orig/src/backend/tcop/postgres.c 2005-06-07 16:44:24.000000000 -0400
--- 07twophase/src/backend/tcop/postgres.c 2005-06-06 21:02:51.000000000 -0400
***************
*** 928,933 ****
--- 928,934 ----
if (stmt->kind == TRANS_STMT_COMMIT ||
stmt->kind == TRANS_STMT_ROLLBACK ||
+ stmt->kind == TRANS_STMT_PREPARE ||
stmt->kind == TRANS_STMT_ROLLBACK_TO)
allowit = true;
}
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/tcop/utility.c 07twophase/src/backend/tcop/utility.c
*** 00orig/src/backend/tcop/utility.c 2005-04-28 19:39:57.000000000 -0400
--- 07twophase/src/backend/tcop/utility.c 2005-05-25 17:35:13.000000000 -0400
***************
*** 383,393 ****
if (strcmp(item->defname, "transaction_isolation") == 0)
SetPGVariable("transaction_isolation",
list_make1(item->arg),
! false);
else if (strcmp(item->defname, "transaction_read_only") == 0)
SetPGVariable("transaction_read_only",
list_make1(item->arg),
! false);
}
}
break;
--- 383,393 ----
if (strcmp(item->defname, "transaction_isolation") == 0)
SetPGVariable("transaction_isolation",
list_make1(item->arg),
! true);
else if (strcmp(item->defname, "transaction_read_only") == 0)
SetPGVariable("transaction_read_only",
list_make1(item->arg),
! true);
}
}
break;
***************
*** 401,406 ****
--- 401,425 ----
}
break;
+ case TRANS_STMT_COMMIT_PREPARED:
+ PreventTransactionChain(stmt, "COMMIT PREPARED");
+ CommitPreparedTransaction(stmt->gid);
+ break;
+
+ case TRANS_STMT_ROLLBACK_PREPARED:
+ PreventTransactionChain(stmt, "ROLLBACK PREPARED");
+ AbortPreparedTransaction(stmt->gid);
+ break;
+
+ case TRANS_STMT_PREPARE:
+ if (!PrepareTransactionBlock(stmt->gid))
+ {
+ /* report unsuccessful commit in completionTag */
+ if (completionTag)
+ strcpy(completionTag, "ROLLBACK");
+ }
+ break;
+
case TRANS_STMT_ROLLBACK:
UserAbortTransactionBlock();
break;
***************
*** 1198,1207 ****
--- 1217,1238 ----
tag = "START TRANSACTION";
break;
+ case TRANS_STMT_PREPARE:
+ tag = "PREPARE TRANSACTION";
+ break;
+
case TRANS_STMT_COMMIT:
tag = "COMMIT";
break;
+ case TRANS_STMT_COMMIT_PREPARED:
+ tag = "COMMIT PREPARED";
+ break;
+
+ case TRANS_STMT_ROLLBACK_PREPARED:
+ tag = "ROLLBACK PREPARED";
+ break;
+
case TRANS_STMT_ROLLBACK:
case TRANS_STMT_ROLLBACK_TO:
tag = "ROLLBACK";
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/utils/adt/Makefile 07twophase/src/backend/utils/adt/Makefile
*** 00orig/src/backend/utils/adt/Makefile 2004-04-14 16:45:56.000000000 -0400
--- 07twophase/src/backend/utils/adt/Makefile 2005-06-02 12:16:44.000000000 -0400
***************
*** 1,7 ****
#
# Makefile for utils/adt
#
! # $PostgreSQL: pgsql-server/src/backend/utils/adt/Makefile,v 1.56 2003/11/29 19:51:57 pgsql Exp $
#
subdir = src/backend/utils/adt
--- 1,7 ----
#
# Makefile for utils/adt
#
! # $PostgreSQL: pgsql/src/backend/utils/adt/Makefile,v 1.57 2004/04/01 21:28:45 tgl Exp $
#
subdir = src/backend/utils/adt
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/utils/cache/inval.c 07twophase/src/backend/utils/cache/inval.c
*** 00orig/src/backend/utils/cache/inval.c 2005-04-20 17:25:43.000000000 -0400
--- 07twophase/src/backend/utils/cache/inval.c 2005-05-31 20:51:15.000000000 -0400
***************
*** 97,102 ****
--- 97,103 ----
#include "utils/relcache.h"
#include "utils/syscache.h"
+ #include "access/twophase_rmgr.h"
/*
* To minimize palloc traffic, we keep pending requests in successively-
***************
*** 171,176 ****
--- 172,181 ----
static int cache_callback_count = 0;
+ static void PersistInvalidationMessage(SharedInvalidationMessage *msg);
+
+ static char *
+ describeInvalidationMessage(SharedInvalidationMessage *msg);
/* ----------------------------------------------------------------
* Invalidation list support functions
***************
*** 426,431 ****
--- 431,440 ----
LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
{
int i;
+ char *desc = describeInvalidationMessage(msg);
+
+ elog(DEBUG3, "local process: %s", desc);
+ pfree(desc);
if (msg->id >= 0)
{
***************
*** 629,634 ****
--- 638,644 ----
void
AtStart_Inval(void)
{
+ elog(DEBUG3, "Inval starting transaction");
Assert(transInvalInfo == NULL);
transInvalInfo = (TransInvalidationInfo *)
MemoryContextAllocZero(TopTransactionContext,
***************
*** 637,642 ****
--- 647,678 ----
}
/*
+ * AtPrepare_Inval
+ * Save the inval lists state at 2PC transaction prepare.
+ *
+ * We need to do two things here: first, save the lists so they can be
+ * recovered after a crash. Second, send the messages to ourselves, because
+ * we need to see the preparing transaction as if it were in progress in a
+ * hypotetical remote backend (i.e. we don't see its changes.)
+ */
+ void
+ AtPrepare_Inval(void)
+ {
+ elog(DEBUG3, "Inval preparing transaction");
+ AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
+ &transInvalInfo->CurrentCmdInvalidMsgs);
+
+ ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
+ PersistInvalidationMessage);
+
+ ProcessInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
+ LocalExecuteInvalidationMessage);
+
+ /* Need not free anything explicitly */
+ transInvalInfo = NULL;
+ }
+
+ /*
* AtSubStart_Inval
* Initialize inval lists at start of a subtransaction.
*/
***************
*** 644,649 ****
--- 680,686 ----
AtSubStart_Inval(void)
{
TransInvalidationInfo *myInfo;
+ elog(DEBUG3, "Inval starting subtransaction");
Assert(transInvalInfo != NULL);
myInfo = (TransInvalidationInfo *)
***************
*** 654,659 ****
--- 691,767 ----
transInvalInfo = myInfo;
}
+ static char *
+ describeInvalidationMessage(SharedInvalidationMessage *msg)
+ {
+ char *result = palloc0(1024);
+
+ strcat(result, "message ");
+ switch (msg->id)
+ {
+ case SHAREDINVALRELCACHE_ID:
+ {
+ SharedInvalRelcacheMsg *relmsg = (SharedInvalRelcacheMsg *) msg;
+ char tmp[1024];
+ sprintf(tmp, "RelCache: dbId/relId %u/%u", relmsg->dbId, relmsg->relId);
+ strcat(result, tmp);
+ break;
+ };
+ case SHAREDINVALSMGR_ID:
+ {
+ SharedInvalSmgrMsg *smgrmsg = (SharedInvalSmgrMsg *) msg;
+ char tmp[1024];
+ sprintf(tmp, "Smgr: RelFileNode %u/%u/%u", smgrmsg->rnode.spcNode, smgrmsg->rnode.dbNode, smgrmsg->rnode.relNode);
+ strcat(result, tmp);
+ break;
+ }
+ default:
+ {
+ SharedInvalCatcacheMsg *ccmsg = (SharedInvalCatcacheMsg *) msg;
+ char tmp[1024];
+ sprintf(tmp, "CatCache: dbId: %u cacheId: %d iptr(%u/%d) hash: %x", ccmsg->dbId, ccmsg->id,
+ ItemPointerGetBlockNumber(&(ccmsg->tuplePtr)),
+ ItemPointerGetOffsetNumber(&(ccmsg->tuplePtr)), ccmsg->hashValue);
+ strcat(result, tmp);
+ break;
+ }
+ }
+ return result;
+ }
+
+ /*
+ * PersistInvalidationMessage
+ * Write an invalidation message to the 2PC state file.
+ */
+ static void
+ PersistInvalidationMessage(SharedInvalidationMessage *msg)
+ {
+ char *desc;
+ RegisterTwoPhaseRecord(TWOPHASE_RM_INVAL_ID, msg, sizeof(SharedInvalidationMessage));
+ desc = describeInvalidationMessage(msg);
+ elog(DEBUG3, "persisting message: %s", desc);
+ pfree(desc);
+ }
+
+ /*
+ * inval_postcommit_record
+ * Recover an invalidation message from the 2PC state file.
+ */
+ void
+ inval_postcommit_record(TransactionId xid, void *recdata, uint32 len)
+ {
+ SharedInvalidationMessage *msg = (SharedInvalidationMessage *) recdata;
+
+ SendSharedInvalidMessage(msg);
+ /*
+ * We need to do file invalidation only once here, because we are in
+ * recovery mode so there's no race condition. We use the locking path
+ * of RelationCacheInitFileInvalidate just for paranoia's sake.
+ */
+ RelationCacheInitFileInvalidate(false);
+ }
+
+
/*
* AtEOXact_Inval
* Process queued-up invalidation messages at end of main transaction.
***************
*** 681,686 ****
--- 789,795 ----
void
AtEOXact_Inval(bool isCommit)
{
+ elog(DEBUG3, "Inval finishing transaction (%s)", isCommit ? "commit" : "abort");
if (isCommit)
{
/* Must be at top of stack */
***************
*** 740,745 ****
--- 849,856 ----
int my_level = GetCurrentTransactionNestLevel();
TransInvalidationInfo *myInfo = transInvalInfo;
+ elog(DEBUG3, "Inval finishing subtransaction (%s)", isCommit ? "commit" : "abort");
+
if (isCommit)
{
/* Must be at non-top of stack */
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/utils/init/flatfiles.c 07twophase/src/backend/utils/init/flatfiles.c
*** 00orig/src/backend/utils/init/flatfiles.c 2005-06-07 16:44:28.000000000 -0400
--- 07twophase/src/backend/utils/init/flatfiles.c 2005-06-06 21:02:54.000000000 -0400
***************
*** 46,51 ****
--- 46,53 ----
#include "utils/flatfiles.h"
#include "utils/resowner.h"
#include "utils/syscache.h"
+ #include "access/twophase.h"
+ #include "access/twophase_rmgr.h"
#define DATABASE_FLAT_FILE "pg_database"
***************
*** 757,762 ****
--- 759,798 ----
SendPostmasterSignal(PMSIGNAL_PASSWORD_CHANGE);
}
+ #define BIT_USER 1
+ #define BIT_GROUP 2
+ #define BIT_DATABASE 4
+
+ /*
+ * This routine is called during transaction prepare.
+ *
+ * Record which files need to be refreshed if this transaction later
+ * commits.
+ */
+ void
+ AtPrepare_UpdateFlatFiles(void)
+ {
+ bits8 bits = 0;
+
+ /* We record information on which of the files need to be
+ * reloaded after commit */
+ if(database_file_update_subid != InvalidSubTransactionId) {
+ database_file_update_subid = InvalidSubTransactionId;
+ bits |= BIT_DATABASE;
+ }
+ if(group_file_update_subid != InvalidSubTransactionId) {
+ group_file_update_subid = InvalidSubTransactionId;
+ bits |= BIT_GROUP;
+ }
+ if(user_file_update_subid != InvalidSubTransactionId) {
+ user_file_update_subid = InvalidSubTransactionId;
+ bits |= BIT_USER;
+ }
+ if(bits != 0)
+ RegisterTwoPhaseRecord(TWOPHASE_RM_FLATFILES_ID, &bits, sizeof(bits));
+ }
+
+
/*
* AtEOSubXact_UpdateFlatFiles
*
***************
*** 831,833 ****
--- 867,889 ----
return PointerGetDatum(NULL);
}
+
+ void
+ flatfile_postcommit_record(TransactionId xid, void *recdata, uint32 len)
+ {
+ bits8 bits;
+
+ Assert(len == 1);
+
+ bits = *((bits8 *) recdata);
+
+ /* Pick up file updates to the current transaction running
+ * COMMIT PREPARED.
+ */
+ if (bits & BIT_DATABASE)
+ database_file_update_needed();
+ if (bits & BIT_GROUP)
+ group_file_update_needed();
+ if (bits & BIT_USER)
+ user_file_update_needed();
+ }
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/utils/misc/guc.c 07twophase/src/backend/utils/misc/guc.c
*** 00orig/src/backend/utils/misc/guc.c 2005-06-07 16:44:32.000000000 -0400
--- 07twophase/src/backend/utils/misc/guc.c 2005-06-06 21:02:55.000000000 -0400
***************
*** 25,30 ****
--- 25,31 ----
#include "utils/guc.h"
#include "utils/guc_tables.h"
+ #include "access/twophase.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "commands/async.h"
***************
*** 1094,1099 ****
--- 1095,1109 ----
},
{
+ {"max_prepared_transactions", PGC_POSTMASTER, RESOURCES,
+ gettext_noop("Sets the maximum number of simultaneously prepared transactions."),
+ NULL
+ },
+ &max_prepared_xacts,
+ 50, 0, 1000, NULL, NULL
+ },
+
+ {
{"max_files_per_process", PGC_POSTMASTER, RESOURCES_KERNEL,
gettext_noop("Sets the maximum number of simultaneously open files for each server process."),
NULL
***************
*** 1929,1934 ****
--- 1939,1946 ----
static bool guc_dirty; /* TRUE if need to do commit/abort work */
+ static bool guc_cannotprepare; /* TRUE if this transaction cannot be prepared for 2PC */
+
static bool reporting_enabled; /* TRUE to enable GUC_REPORT */
static char *guc_string_workspace; /* for avoiding memory leaks */
***************
*** 2456,2461 ****
--- 2468,2474 ----
}
guc_dirty = false;
+ guc_cannotprepare = false;
reporting_enabled = false;
***************
*** 2681,2686 ****
--- 2694,2700 ----
conf->gen.tentative_source = conf->gen.reset_source;
conf->gen.status |= GUC_HAVE_TENTATIVE;
guc_dirty = true;
+ guc_cannotprepare = true;
break;
}
case PGC_INT:
***************
*** 2697,2702 ****
--- 2711,2717 ----
conf->gen.tentative_source = conf->gen.reset_source;
conf->gen.status |= GUC_HAVE_TENTATIVE;
guc_dirty = true;
+ guc_cannotprepare = true;
break;
}
case PGC_REAL:
***************
*** 2713,2718 ****
--- 2728,2734 ----
conf->gen.tentative_source = conf->gen.reset_source;
conf->gen.status |= GUC_HAVE_TENTATIVE;
guc_dirty = true;
+ guc_cannotprepare = true;
break;
}
case PGC_STRING:
***************
*** 2753,2758 ****
--- 2769,2775 ----
conf->gen.tentative_source = conf->gen.reset_source;
conf->gen.status |= GUC_HAVE_TENTATIVE;
guc_dirty = true;
+ guc_cannotprepare = true;
break;
}
}
***************
*** 2845,2850 ****
--- 2862,2877 ----
}
/*
+ * Do GUC processing at transaction prepare.
+ */
+ void
+ AtPrepare_GUC(void)
+ {
+ if (guc_cannotprepare)
+ elog(ERROR, "two-phase commit of session variables is not implemented");
+ }
+
+ /*
* Do GUC processing at transaction or subtransaction commit or abort.
*/
void
***************
*** 3104,3111 ****
* know that all outer transaction levels will have stacked values to
* deal with.)
*/
! if (!isSubXact)
guc_dirty = false;
}
--- 3131,3140 ----
* know that all outer transaction levels will have stacked values to
* deal with.)
*/
! if (!isSubXact) {
guc_dirty = false;
+ guc_cannotprepare = false;
+ }
}
***************
*** 3512,3517 ****
--- 3541,3547 ----
conf->gen.tentative_source = source;
conf->gen.status |= GUC_HAVE_TENTATIVE;
guc_dirty = true;
+ guc_cannotprepare = true;
}
}
break;
***************
*** 3800,3805 ****
--- 3830,3836 ----
conf->gen.tentative_source = source;
conf->gen.status |= GUC_HAVE_TENTATIVE;
guc_dirty = true;
+ guc_cannotprepare = true;
}
}
else
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/utils/misc/postgresql.conf.sample 07twophase/src/backend/utils/misc/postgresql.conf.sample
*** 00orig/src/backend/utils/misc/postgresql.conf.sample 2005-06-07 16:44:32.000000000 -0400
--- 07twophase/src/backend/utils/misc/postgresql.conf.sample 2005-06-06 21:02:56.000000000 -0400
***************
*** 109,114 ****
--- 109,116 ----
#bgwriter_all_percent = 0.333 # 0-100% of all buffers scanned in each round
#bgwriter_all_maxpages = 5 # 0-1000 buffers max written per round
+ #max_prepared_transactions = 100 # 0-1000
+
#---------------------------------------------------------------------------
# WRITE AHEAD LOG
diff -c -Nr --exclude-from=diff-ignore 00orig/src/backend/utils/mmgr/portalmem.c 07twophase/src/backend/utils/mmgr/portalmem.c
*** 00orig/src/backend/utils/mmgr/portalmem.c 2005-06-01 01:04:11.000000000 -0400
--- 07twophase/src/backend/utils/mmgr/portalmem.c 2005-05-30 16:15:07.000000000 -0400
***************
*** 406,411 ****
--- 406,420 ----
}
}
+ /*
+ * This is called at prepare phase of a two-phase commit. Currently,
+ * we don't do anything different from a regular commit.
+ */
+ void
+ AtPrepare_Portals(void)
+ {
+ AtCommit_Portals();
+ }
/*
* Pre-commit processing for portals.
diff -c -Nr --exclude-from=diff-ignore 00orig/src/bin/initdb/initdb.c 07twophase/src/bin/initdb/initdb.c
*** 00orig/src/bin/initdb/initdb.c 2005-05-02 17:30:21.000000000 -0400
--- 07twophase/src/bin/initdb/initdb.c 2005-05-25 17:35:13.000000000 -0400
***************
*** 2124,2129 ****
--- 2124,2130 ----
"pg_xlog/archive_status",
"pg_clog",
"pg_subtrans",
+ "pg_twophase",
"pg_multixact/members",
"pg_multixact/offsets",
"base",
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/access/twophase.h 07twophase/src/include/access/twophase.h
*** 00orig/src/include/access/twophase.h 1969-12-31 21:00:00.000000000 -0300
--- 07twophase/src/include/access/twophase.h 2005-06-02 16:37:32.000000000 -0400
***************
*** 0 ****
--- 1,56 ----
+ /*
+ * twophase.h
+ *
+ * Two-phase commit related declarations.
+ *
+ * $PostgreSQL$
+ */
+ #ifndef TWOPHASE_H
+ #define TWOPHASE_H
+
+ #include "access/xlog.h"
+ #include "access/twophase_rmgr.h"
+ #include "storage/lock.h"
+
+ #define GIDSIZE 200
+
+
+ /*
+ * GlobalTransactionData is defined in twophase.c; other places have no
+ * business knowing the internal definition.
+ */
+ typedef struct GlobalTransactionData *GlobalTransaction;
+
+ /* GUC variable */
+ extern int max_prepared_xacts;
+
+ extern PGPROC *TwoPhaseGetDummyProc(void);
+
+ extern int StartPreparedTransactionListScan(void);
+ extern TransactionId NextPreparedTransactionId(void);
+ extern void EndPreparedTransactionListScan(void);
+
+ extern int TwoPhaseShmemSize(void);
+ extern void TwoPhaseShmemInit(void);
+ extern void BootStrapTwoPhase(void);
+
+ extern GlobalTransaction MarkAsPreparing(TransactionId xid, char *gid,
+ AclId owner);
+ extern void MarkAsPrepared(GlobalTransaction);
+ extern TransactionId MarkAsNoLongerPrepared(char *gid, AclId user);
+
+ extern void StartPrepare(char *gid);
+ extern void EndPrepare(void);
+
+ extern void RecoverPreparedTransactions(void);
+
+ extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
+ extern void RemoveTwoPhaseFile(TransactionId xid);
+
+ extern void CheckPointTwoPhase(void);
+
+ extern bool TransactionIdIsPrepared(TransactionId xid);
+
+ extern void FinishPreparedTransaction(char *gid, bool isCommit);
+
+ #endif /* TWOPHASE_H */
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/access/twophase_rmgr.h 07twophase/src/include/access/twophase_rmgr.h
*** 00orig/src/include/access/twophase_rmgr.h 1969-12-31 21:00:00.000000000 -0300
--- 07twophase/src/include/access/twophase_rmgr.h 2005-06-02 12:24:03.000000000 -0400
***************
*** 0 ****
--- 1,32 ----
+ /*
+ * twophase_rmgr.h
+ *
+ * Twophase resource managers definition
+ *
+ * $PostgreSQL$
+ */
+ #ifndef TWOPHASE_RMGR_H
+ #define TWOPHASE_RMGR_H
+
+ typedef void (*TwoPhaseCallback) (TransactionId xid, void *recdata, uint32 len);
+ typedef uint8 TwoPhaseRmgrId;
+
+ /*
+ * Built-in resource managers
+ */
+ #define TWOPHASE_RM_END_ID 0
+ #define TWOPHASE_RM_LOCK_ID 1
+ #define TWOPHASE_RM_INVAL_ID 2
+ #define TWOPHASE_RM_SMGR_ID 3
+ #define TWOPHASE_RM_FLATFILES_ID 4
+ #define TWOPHASE_RM_NOTIFY_ID 5
+ #define TWOPHASE_RM_MAX_ID TWOPHASE_RM_NOTIFY_ID
+
+ extern const TwoPhaseCallback recover_callbacks[];
+ extern const TwoPhaseCallback postcommit_callbacks[];
+ extern const TwoPhaseCallback postabort_callbacks[];
+
+
+ extern void RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, void *data, uint32 len);
+
+ #endif /* TWOPHASE_RMGR_H */
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/access/xact.h 07twophase/src/include/access/xact.h
*** 00orig/src/include/access/xact.h 2005-06-07 16:44:38.000000000 -0400
--- 07twophase/src/include/access/xact.h 2005-06-06 21:03:00.000000000 -0400
***************
*** 47,52 ****
--- 47,53 ----
typedef enum
{
XACT_EVENT_COMMIT,
+ XACT_EVENT_PREPARE,
XACT_EVENT_ABORT
} XactEvent;
***************
*** 72,79 ****
* XLOG allows to store some information in high 4 bits of log
* record xl_info field
*/
! #define XLOG_XACT_COMMIT 0x00
! #define XLOG_XACT_ABORT 0x20
typedef struct xl_xact_commit
{
--- 73,83 ----
* XLOG allows to store some information in high 4 bits of log
* record xl_info field
*/
! #define XLOG_XACT_COMMIT 0x00
! #define XLOG_XACT_PREPARE 0x10
! #define XLOG_XACT_ABORT 0x20
! #define XLOG_XACT_COMMIT_PREPARED 0x30
! #define XLOG_XACT_ABORT_PREPARED 0x40
typedef struct xl_xact_commit
{
***************
*** 87,92 ****
--- 91,101 ----
#define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
+
+ /* high bits of xl_info field tells us if it's a prepare,
+ * abort or commit record
+ */
+
typedef struct xl_xact_abort
{
time_t xtime;
***************
*** 143,148 ****
--- 152,161 ----
extern void RecordTransactionCommit(void);
+ extern void CommitPreparedTransaction(char *gid);
+ extern void AbortPreparedTransaction(char *gid);
+ extern bool PrepareTransactionBlock(char *gid);
+
extern int xactGetCommittedChildren(TransactionId **ptr);
extern void xact_redo(XLogRecPtr lsn, XLogRecord *record);
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/access/xlog.h 07twophase/src/include/access/xlog.h
*** 00orig/src/include/access/xlog.h 2005-06-07 16:44:38.000000000 -0400
--- 07twophase/src/include/access/xlog.h 2005-06-06 21:03:01.000000000 -0400
***************
*** 150,155 ****
--- 150,156 ----
#endif
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
+ extern XLogRecPtr XLogInsertOnBehalf(RmgrId rmid, uint8 info, XLogRecData *rdata, TransactionId xid);
extern void XLogFlush(XLogRecPtr RecPtr);
extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record);
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/catalog/pg_proc.h 07twophase/src/include/catalog/pg_proc.h
*** 00orig/src/include/catalog/pg_proc.h 2005-06-07 16:44:39.000000000 -0400
--- 07twophase/src/include/catalog/pg_proc.h 2005-06-07 16:43:40.000000000 -0400
***************
*** 3001,3006 ****
--- 3001,3008 ----
DESCR("SHOW ALL as a function");
DATA(insert OID = 1371 ( pg_lock_status PGNSP PGUID 12 f f t t v 0 2249 "" _null_ _null_ _null_ pg_lock_status - _null_ ));
DESCR("view system lock information");
+ DATA(insert OID = 1038 ( pg_prepared_xact PGNSP PGUID 12 f f f t v 0 2249 "" _null_ _null_ _null_ pg_prepared_xact - _null_ ));
+ DESCR("view two-phase transactions");
DATA(insert OID = 2079 ( pg_table_is_visible PGNSP PGUID 12 f f t f s 1 16 "26" _null_ _null_ _null_ pg_table_is_visible - _null_ ));
DESCR("is table visible in search path?");
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/commands/async.h 07twophase/src/include/commands/async.h
*** 00orig/src/include/commands/async.h 2005-01-10 19:05:41.000000000 -0300
--- 07twophase/src/include/commands/async.h 2005-05-25 17:35:13.000000000 -0400
***************
*** 26,31 ****
--- 26,32 ----
extern void AtSubStart_Notify(void);
extern void AtSubCommit_Notify(void);
extern void AtSubAbort_Notify(void);
+ extern void AtPrepare_Notify(void);
/* signal handler for inbound notifies (SIGUSR2) */
extern void NotifyInterruptHandler(SIGNAL_ARGS);
***************
*** 38,41 ****
--- 39,44 ----
extern void EnableNotifyInterrupt(void);
extern bool DisableNotifyInterrupt(void);
+ extern void notify_postcommit_record(TransactionId xid, void *recdata, uint32 len);
+
#endif /* ASYNC_H */
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/libpq/be-fsstubs.h 07twophase/src/include/libpq/be-fsstubs.h
*** 00orig/src/include/libpq/be-fsstubs.h 2005-01-10 19:05:41.000000000 -0300
--- 07twophase/src/include/libpq/be-fsstubs.h 2005-05-25 17:35:13.000000000 -0400
***************
*** 48,52 ****
--- 48,53 ----
extern void AtEOXact_LargeObject(bool isCommit);
extern void AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
SubTransactionId parentSubid);
+ extern void AtPrepare_LargeObject(void);
#endif /* BE_FSSTUBS_H */
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/nodes/parsenodes.h 07twophase/src/include/nodes/parsenodes.h
*** 00orig/src/include/nodes/parsenodes.h 2005-06-07 16:44:40.000000000 -0400
--- 07twophase/src/include/nodes/parsenodes.h 2005-06-06 21:03:03.000000000 -0400
***************
*** 1556,1562 ****
TRANS_STMT_ROLLBACK,
TRANS_STMT_SAVEPOINT,
TRANS_STMT_RELEASE,
! TRANS_STMT_ROLLBACK_TO
} TransactionStmtKind;
typedef struct TransactionStmt
--- 1556,1565 ----
TRANS_STMT_ROLLBACK,
TRANS_STMT_SAVEPOINT,
TRANS_STMT_RELEASE,
! TRANS_STMT_ROLLBACK_TO,
! TRANS_STMT_PREPARE,
! TRANS_STMT_COMMIT_PREPARED,
! TRANS_STMT_ROLLBACK_PREPARED
} TransactionStmtKind;
typedef struct TransactionStmt
***************
*** 1564,1569 ****
--- 1567,1573 ----
NodeTag type;
TransactionStmtKind kind; /* see above */
List *options; /* for BEGIN/START and savepoint commands */
+ char *gid; /* for two-phase commit related commands*/
} TransactionStmt;
/* ----------------------
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/postgres.h 07twophase/src/include/postgres.h
*** 00orig/src/include/postgres.h 2005-04-20 17:25:53.000000000 -0400
--- 07twophase/src/include/postgres.h 2005-05-25 17:35:13.000000000 -0400
***************
*** 277,282 ****
--- 277,296 ----
#define TransactionIdGetDatum(X) ((Datum) SET_4_BYTES((X)))
/*
+ * DatumGetAclId
+ * Returns user or group identifier value of a datum.
+ */
+
+ #define DatumGetAclId(X) ((AclId) GET_4_BYTES(X))
+
+ /*
+ * AclIdGetDatum
+ * Returns datum representation for a user or group identifier.
+ */
+
+ #define AclIdGetDatum(X) ((AclId) SET_4_BYTES((X)))
+
+ /*
* DatumGetCommandId
* Returns command identifier value of a datum.
*/
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/storage/lock.h 07twophase/src/include/storage/lock.h
*** 00orig/src/include/storage/lock.h 2005-06-01 01:04:21.000000000 -0400
--- 07twophase/src/include/storage/lock.h 2005-05-30 16:15:14.000000000 -0400
***************
*** 371,376 ****
--- 371,378 ----
extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode);
extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids);
+ extern bool LockPersistAll(void);
+ extern void LockReleaseAllForPrepared(TransactionId xid);
extern void LockReleaseCurrentOwner(void);
extern void LockReassignCurrentOwner(void);
extern int LockCheckConflicts(LockMethod lockMethodTable,
***************
*** 392,398 ****
extern const char *GetLockmodeName(LOCKMODE mode);
#ifdef LOCK_DEBUG
! extern void DumpLocks(void);
extern void DumpAllLocks(void);
#endif
--- 394,400 ----
extern const char *GetLockmodeName(LOCKMODE mode);
#ifdef LOCK_DEBUG
! extern void DumpLocks(PGPROC *proc);
extern void DumpAllLocks(void);
#endif
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/storage/lockrmgr.h 07twophase/src/include/storage/lockrmgr.h
*** 00orig/src/include/storage/lockrmgr.h 1969-12-31 21:00:00.000000000 -0300
--- 07twophase/src/include/storage/lockrmgr.h 2005-05-25 17:35:13.000000000 -0400
***************
*** 0 ****
--- 1,18 ----
+ /*-------------------------------------------------------------------------
+ *
+ * lockrmgr.h
+ *
+ * Lock subsystems two-phase state file hooks.
+ *
+ * $Id$
+ *
+ *-------------------------------------------------------------------------
+ */
+ #ifndef LOCKRMGR_H_
+ #define LOCKRMGR_H_
+
+ #include "access/xlog.h"
+
+ extern void lock_recover_record(TransactionId xid, void *data, uint32 len);
+
+ #endif /* LOCKRMGR_H */
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/storage/lwlock.h 07twophase/src/include/storage/lwlock.h
*** 00orig/src/include/storage/lwlock.h 2005-05-19 18:47:20.000000000 -0400
--- 07twophase/src/include/storage/lwlock.h 2005-05-26 18:27:15.000000000 -0400
***************
*** 46,51 ****
--- 46,52 ----
MultiXactMemberControlLock,
RelCacheInitLock,
BgWriterCommLock,
+ TwoPhaseStateLock,
NumFixedLWLocks, /* must be last except for
* MaxDynamicLWLock */
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/storage/procarray.h 07twophase/src/include/storage/procarray.h
*** 00orig/src/include/storage/procarray.h 2005-05-19 17:35:47.000000000 -0400
--- 07twophase/src/include/storage/procarray.h 2005-05-25 17:38:44.000000000 -0400
***************
*** 20,25 ****
--- 20,26 ----
extern void ProcArrayRemoveMyself(void);
extern bool TransactionIdIsInProgress(TransactionId xid);
+ extern bool TransactionIdIsPrepared(TransactionId transactionId);
extern TransactionId GetOldestXmin(bool allDbs);
/* Use "struct PGPROC", not PGPROC, to avoid including proc.h here */
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/storage/smgr.h 07twophase/src/include/storage/smgr.h
*** 00orig/src/include/storage/smgr.h 2005-06-07 16:44:43.000000000 -0400
--- 07twophase/src/include/storage/smgr.h 2005-06-06 21:03:05.000000000 -0400
***************
*** 83,91 ****
--- 83,95 ----
extern void smgrabort(void);
extern void smgrsync(void);
+ extern void AtPrepare_smgr(void);
+
extern void smgr_redo(XLogRecPtr lsn, XLogRecord *record);
extern void smgr_desc(char *buf, uint8 xl_info, char *rec);
+ extern void smgr_postcommit_record(TransactionId xid, void *recdata, uint32 len);
+ extern void smgr_postabort_record(TransactionId xid, void *recdata, uint32 len);
/* internals: move me elsewhere -- ay 7/94 */
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/utils/builtins.h 07twophase/src/include/utils/builtins.h
*** 00orig/src/include/utils/builtins.h 2005-06-01 01:04:22.000000000 -0400
--- 07twophase/src/include/utils/builtins.h 2005-05-30 16:15:14.000000000 -0400
***************
*** 825,830 ****
--- 825,833 ----
/* lockfuncs.c */
extern Datum pg_lock_status(PG_FUNCTION_ARGS);
+ /* pg_prepared_xact.c */
+ extern Datum pg_prepared_xact(PG_FUNCTION_ARGS);
+
/* catalog/pg_conversion.c */
extern Datum pg_convert_using(PG_FUNCTION_ARGS);
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/utils/flatfiles.h 07twophase/src/include/utils/flatfiles.h
*** 00orig/src/include/utils/flatfiles.h 2005-05-11 18:09:54.000000000 -0400
--- 07twophase/src/include/utils/flatfiles.h 2005-05-25 17:35:13.000000000 -0400
***************
*** 23,28 ****
--- 23,29 ----
extern void BuildFlatFiles(bool database_only);
+ extern void AtPrepare_UpdateFlatFiles(void);
extern void AtEOXact_UpdateFlatFiles(bool isCommit);
extern void AtEOSubXact_UpdateFlatFiles(bool isCommit,
SubTransactionId mySubid,
***************
*** 30,33 ****
--- 31,36 ----
extern Datum flatfile_update_trigger(PG_FUNCTION_ARGS);
+ extern void flatfile_postcommit_record(TransactionId xid, void *recdata, uint32 len);
+
#endif /* FLATFILES_H */
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/utils/guc.h 07twophase/src/include/utils/guc.h
*** 00orig/src/include/utils/guc.h 2005-03-25 18:19:35.000000000 -0400
--- 07twophase/src/include/utils/guc.h 2005-05-25 17:35:13.000000000 -0400
***************
*** 187,192 ****
--- 187,193 ----
extern bool SelectConfigFiles(const char *userDoption, const char *progname);
extern void ResetAllOptions(void);
extern void AtEOXact_GUC(bool isCommit, bool isSubXact);
+ extern void AtPrepare_GUC(void);
extern void BeginReportingGUCOptions(void);
extern void ParseLongOption(const char *string, char **name, char **value);
extern bool set_config_option(const char *name, const char *value,
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/utils/inval.h 07twophase/src/include/utils/inval.h
*** 00orig/src/include/utils/inval.h 2005-01-10 19:05:46.000000000 -0300
--- 07twophase/src/include/utils/inval.h 2005-05-25 17:35:13.000000000 -0400
***************
*** 30,35 ****
--- 30,37 ----
extern void AtEOSubXact_Inval(bool isCommit);
+ extern void AtPrepare_Inval(void);
+
extern void CommandEndInvalidationMessages(void);
extern void CacheInvalidateHeapTuple(Relation relation, HeapTuple tuple);
***************
*** 47,50 ****
--- 49,53 ----
extern void CacheRegisterRelcacheCallback(CacheCallbackFunction func,
Datum arg);
+ extern void inval_postcommit_record(TransactionId xid, void *recdata, uint32 len);
#endif /* INVAL_H */
diff -c -Nr --exclude-from=diff-ignore 00orig/src/include/utils/portal.h 07twophase/src/include/utils/portal.h
*** 00orig/src/include/utils/portal.h 2005-04-20 17:25:59.000000000 -0400
--- 07twophase/src/include/utils/portal.h 2005-05-25 17:35:13.000000000 -0400
***************
*** 185,190 ****
--- 185,191 ----
extern bool CommitHoldablePortals(void);
extern void AtCommit_Portals(void);
extern void AtAbort_Portals(void);
+ extern void AtPrepare_Portals(void);
extern void AtCleanup_Portals(void);
extern void AtSubCommit_Portals(SubTransactionId mySubid,
SubTransactionId parentSubid,
diff -c -Nr --exclude-from=diff-ignore 00orig/src/test/regress/expected/prepared_xacts.out 07twophase/src/test/regress/expected/prepared_xacts.out
*** 00orig/src/test/regress/expected/prepared_xacts.out 1969-12-31 21:00:00.000000000 -0300
--- 07twophase/src/test/regress/expected/prepared_xacts.out 2005-05-25 17:35:13.000000000 -0400
***************
*** 0 ****
--- 1,102 ----
+ --
+ -- PREPARED_XACTS
+ --
+ -- create a simple table that we'll use in the tests
+ CREATE TABLE p_xacts_test (
+ foobar VARCHAR(10)
+ );
+ INSERT INTO p_xacts_test VALUES ('aaa');
+ -- Test PREPARE TRANSACTION
+ BEGIN;
+ UPDATE p_xacts_test SET foobar = 'bbb' WHERE foobar = 'aaa';
+ SELECT * FROM p_xacts_test;
+ foobar
+ --------
+ bbb
+ (1 row)
+
+ PREPARE TRANSACTION 'foo1';
+ SELECT * FROM p_xacts_test;
+ foobar
+ --------
+ aaa
+ (1 row)
+
+ -- Test pg_prepared_xacts system view
+ SELECT gid FROM pg_prepared_xacts;
+ gid
+ ------
+ foo1
+ (1 row)
+
+ -- Test ROLLBACK PREPARED
+ ROLLBACK PREPARED 'foo1';
+ SELECT * FROM p_xacts_test;
+ foobar
+ --------
+ aaa
+ (1 row)
+
+ SELECT gid FROM pg_prepared_xacts;
+ gid
+ -----
+ (0 rows)
+
+ -- Test COMMIT PREPARED
+ BEGIN;
+ INSERT INTO p_xacts_test VALUES ('ddd');
+ SELECT * FROM p_xacts_test;
+ foobar
+ --------
+ aaa
+ ddd
+ (2 rows)
+
+ PREPARE TRANSACTION 'foo2';
+ SELECT * FROM p_xacts_test;
+ foobar
+ --------
+ aaa
+ (1 row)
+
+ COMMIT PREPARED 'foo2';
+ SELECT * FROM p_xacts_test;
+ foobar
+ --------
+ aaa
+ ddd
+ (2 rows)
+
+ -- Test duplicate gids
+ BEGIN;
+ UPDATE p_xacts_test SET foobar = 'eee' WHERE foobar = 'ddd';
+ SELECT * FROM p_xacts_test;
+ foobar
+ --------
+ aaa
+ eee
+ (2 rows)
+
+ PREPARE TRANSACTION 'foo3';
+ SELECT gid FROM pg_prepared_xacts;
+ gid
+ ------
+ foo3
+ (1 row)
+
+ BEGIN;
+ INSERT INTO p_xacts_test VALUES ('fff');
+ SELECT * FROM p_xacts_test;
+ foobar
+ --------
+ aaa
+ ddd
+ fff
+ (3 rows)
+
+ -- This should fail, because the gid foo3 is already in use
+ PREPARE TRANSACTION 'foo3';
+ ERROR: global transaction identifier "foo3" is already in use
+ -- Clean up
+ ROLLBACK PREPARED 'foo3';
+ DROP TABLE p_xacts_test;
diff -c -Nr --exclude-from=diff-ignore 00orig/src/test/regress/expected/rules.out 07twophase/src/test/regress/expected/rules.out
*** 00orig/src/test/regress/expected/rules.out 2005-05-18 21:31:05.000000000 -0400
--- 07twophase/src/test/regress/expected/rules.out 2005-05-25 17:35:13.000000000 -0400
***************
*** 1316,1322 ****
shoelace_obsolete | SELECT shoelace.sl_name, shoelace.sl_avail, shoelace.sl_color, shoelace.sl_len, shoelace.sl_unit, shoelace.sl_len_cm FROM shoelace WHERE (NOT (EXISTS (SELECT shoe.shoename FROM shoe WHERE (shoe.slcolor = shoelace.sl_color))));
street | SELECT r.name, r.thepath, c.cname FROM ONLY road r, real_city c WHERE (c.outline ## r.thepath);
toyemp | SELECT emp.name, emp.age, emp."location", (12 * emp.salary) AS annualsal FROM emp;
! (40 rows)
SELECT tablename, rulename, definition FROM pg_rules
ORDER BY tablename, rulename;
--- 1316,1322 ----
shoelace_obsolete | SELECT shoelace.sl_name, shoelace.sl_avail, shoelace.sl_color, shoelace.sl_len, shoelace.sl_unit, shoelace.sl_len_cm FROM shoelace WHERE (NOT (EXISTS (SELECT shoe.shoename FROM shoe WHERE (shoe.slcolor = shoelace.sl_color))));
street | SELECT r.name, r.thepath, c.cname FROM ONLY road r, real_city c WHERE (c.outline ## r.thepath);
toyemp | SELECT emp.name, emp.age, emp."location", (12 * emp.salary) AS annualsal FROM emp;
! (41 rows)
SELECT tablename, rulename, definition FROM pg_rules
ORDER BY tablename, rulename;
diff -c -Nr --exclude-from=diff-ignore 00orig/src/test/regress/parallel_schedule 07twophase/src/test/regress/parallel_schedule
*** 00orig/src/test/regress/parallel_schedule 2004-06-23 15:21:24.000000000 -0400
--- 07twophase/src/test/regress/parallel_schedule 2005-05-25 17:35:13.000000000 -0400
***************
*** 60,66 ****
# ----------
# The fourth group of parallel test
# ----------
! test: select_into select_distinct select_distinct_on select_implicit select_having subselect union case join aggregates transactions random portals arrays btree_index hash_index update namespace
test: privileges
test: misc
--- 60,66 ----
# ----------
# The fourth group of parallel test
# ----------
! test: select_into select_distinct select_distinct_on select_implicit select_having subselect union case join aggregates transactions random portals arrays btree_index hash_index update namespace prepared_xacts
test: privileges
test: misc
diff -c -Nr --exclude-from=diff-ignore 00orig/src/test/regress/sql/prepared_xacts.sql 07twophase/src/test/regress/sql/prepared_xacts.sql
*** 00orig/src/test/regress/sql/prepared_xacts.sql 1969-12-31 21:00:00.000000000 -0300
--- 07twophase/src/test/regress/sql/prepared_xacts.sql 2005-06-07 17:02:14.000000000 -0400
***************
*** 0 ****
--- 1,120 ----
+ --
+ -- PREPARED_XACTS
+ --
+
+ -- create a simple table that we'll use in the tests
+ CREATE TABLE p_xacts_test (
+ foobar VARCHAR(10)
+ );
+
+ INSERT INTO p_xacts_test VALUES ('aaa');
+
+
+ -- Test PREPARE TRANSACTION
+ BEGIN;
+ UPDATE p_xacts_test SET foobar = 'bbb' WHERE foobar = 'aaa';
+ SELECT * FROM p_xacts_test;
+ PREPARE TRANSACTION 'foo1';
+
+ SELECT * FROM p_xacts_test;
+
+ -- Test pg_prepared_xacts system view
+ SELECT gid FROM pg_prepared_xacts;
+
+ -- Test ROLLBACK PREPARED
+
+ ROLLBACK PREPARED 'foo1';
+
+ SELECT * FROM p_xacts_test;
+
+ SELECT gid FROM pg_prepared_xacts;
+
+
+ -- Test COMMIT PREPARED
+ BEGIN;
+ INSERT INTO p_xacts_test VALUES ('ddd');
+ SELECT * FROM p_xacts_test;
+ PREPARE TRANSACTION 'foo2';
+ SELECT * FROM p_xacts_test;
+
+ COMMIT PREPARED 'foo2';
+
+ SELECT * FROM p_xacts_test;
+
+ -- Test duplicate gids
+ BEGIN;
+ UPDATE p_xacts_test SET foobar = 'eee' WHERE foobar = 'ddd';
+ SELECT * FROM p_xacts_test;
+ PREPARE TRANSACTION 'foo3';
+
+ SELECT gid FROM pg_prepared_xacts;
+
+ BEGIN;
+ INSERT INTO p_xacts_test VALUES ('fff');
+ SELECT * FROM p_xacts_test;
+
+ -- This should fail, because the gid foo3 is already in use
+ PREPARE TRANSACTION 'foo3';
+
+ -- Clean up
+
+ ROLLBACK PREPARED 'foo3';
+
+ DROP TABLE p_xacts_test;
+
+ -- Test subtransactions
+ BEGIN;
+ CREATE TABLE foo (a int);
+ INSERT INTO foo VALUES (1);
+ SAVEPOINT a;
+ INSERT INTO foo VALUES (2);
+ ROLLBACK TO a;
+ INSERT INTO foo VALUES (3);
+ PREPARE TRANSACTION 'regress-one';
+
+ CREATE TABLE bar();
+
+ -- Test shared invalidation
+ BEGIN;
+ DROP TABLE bar;
+ CREATE TABLE baz (a int);
+ INSERT INTO baz VALUES (1);
+ INSERT INTO baz VALUES (2);
+ DECLARE foo CURSOR FOR SELECT * FROM baz;
+ -- Fetch 1 tuple, keeping the cursor open
+ FETCH 1 FROM foo;
+ PREPARE TRANSACTION 'regress-two';
+
+ -- No such cursor
+ FETCH 1 FROM foo;
+
+ -- Table doesn't exist, the creation hasn't been committed yet
+ SELECT * FROM foo;
+
+ -- There should be two prepared transactions
+ SELECT * FROM pg_prepared_xacts;
+
+ -- Disconnect, we will continue testing in a different backend
+ \c -
+
+ -- There should be two prepared transactions
+ SELECT * FROM pg_prepared_xacts;
+
+ -- Commit table creation
+ COMMIT PREPARED 'regress-one';
+ \d foo
+ SELECT * FROM foo;
+
+ -- There should be one prepared transaction
+ SELECT * FROM pg_prepared_xacts;
+
+ COMMIT PREPARED 'regress-two';
+ -- Error, table was dropped in prepared transaction "regress-two"
+ /*
+ * This test fails currently with an error relative to the relcache. Probably
+ * a problem with the sinval code. Must fix.
+ */
+ SELECT * FROM bar;
+
+ -- There should be no prepared transactions
+ SELECT * FROM pg_prepared_xacts;