Author: Noah Misch Commit: Noah Misch Fix twophase.c XID epoch tracking Half the time after epoch 0, allowable XIDs span two epochs. This would have no user-visible consequences during epoch 0, but I expect (unconfirmed) twophase breakage during other epochs. FIXME likely rework this in favor of broader fulltransaction use in twophase.c diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index a3190dc..7a16293 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -926,24 +926,6 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held) /* State file support */ /************************************************************************/ -/* - * Compute FullTransactionId for the given TransactionId, using the current - * epoch. - */ -static inline FullTransactionId -FullTransactionIdFromCurrentEpoch(TransactionId xid) -{ - FullTransactionId fxid; - FullTransactionId nextFullXid; - uint32 epoch; - - nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); - - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); - return fxid; -} - static inline int TwoPhaseFilePath(char *path, FullTransactionId fxid) { @@ -1283,7 +1265,7 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info, * contents of the file, issuing an error when finding corrupted data. If * missing_ok is true, which indicates that missing files can be safely * ignored, then return NULL. This state can be reached when doing recovery - * after discarding two-phase files from other epochs. + * after discarding two-phase files from frozen epochs. */ static char * ReadTwoPhaseFile(TransactionId xid, bool missing_ok) @@ -1299,7 +1281,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok) int r; FullTransactionId fxid; - fxid = FullTransactionIdFromCurrentEpoch(xid); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid); TwoPhaseFilePath(path, fxid); fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); @@ -1664,15 +1646,13 @@ FinishPreparedTransaction(const char *gid, bool isCommit) /* Count the prepared xact as committed or aborted */ AtEOXact_PgStat(isCommit, false); - /* - * And now we can clean up any files we may have left. These should be - * from the current epoch. - */ + /* And now we can clean up any files we may have left. */ if (ondisk) { FullTransactionId fxid; - fxid = FullTransactionIdFromCurrentEpoch(xid); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), + xid); RemoveTwoPhaseFile(fxid, true); } @@ -1749,8 +1729,7 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len) COMP_CRC32C(statefile_crc, content, len); FIN_CRC32C(statefile_crc); - /* Use current epoch */ - fxid = FullTransactionIdFromCurrentEpoch(xid); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid); TwoPhaseFilePath(path, fxid); fd = OpenTransientFile(path, @@ -1900,7 +1879,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon) * This is called once at the beginning of recovery, saving any extra * lookups in the future. Two-phase files that are newer than the * minimum XID horizon are discarded on the way. Two-phase files with - * an epoch older or newer than the current checkpoint's record epoch + * an epoch frozen relative to the current checkpoint's record epoch * are also discarded. */ void @@ -1971,7 +1950,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) TransactionId origNextXid = XidFromFullTransactionId(nextXid); TransactionId result = origNextXid; TransactionId *xids = NULL; - uint32 epoch = EpochFromFullTransactionId(nextXid); int nxids = 0; int allocsize = 0; int i; @@ -1988,11 +1966,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) xid = gxact->xid; - /* - * All two-phase files with past and future epoch in pg_twophase are - * gone at this point, so we're OK to rely on only the current epoch. - */ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); + fxid = FullTransactionIdFromAllowableAt(nextXid, xid); buf = ProcessTwoPhaseBuffer(fxid, gxact->prepare_start_lsn, gxact->ondisk, false, true); @@ -2055,12 +2029,9 @@ void StandbyRecoverPreparedTransactions(void) { int i; - uint32 epoch; FullTransactionId nextFullXid; - /* get current epoch */ nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) @@ -2074,11 +2045,7 @@ StandbyRecoverPreparedTransactions(void) xid = gxact->xid; - /* - * At this stage, we're OK to work with the current epoch as all past - * and future files have been already discarded. - */ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); + fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid); buf = ProcessTwoPhaseBuffer(fxid, gxact->prepare_start_lsn, gxact->ondisk, true, false); @@ -2108,12 +2075,9 @@ void RecoverPreparedTransactions(void) { int i; - uint32 epoch; FullTransactionId nextFullXid; - /* get current epoch */ nextFullXid = ReadNextFullTransactionId(); - epoch = EpochFromFullTransactionId(nextFullXid); LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) @@ -2127,10 +2091,6 @@ RecoverPreparedTransactions(void) TransactionId *subxids; const char *gid; - /* - * At this stage, we're OK to work with the current epoch as all past - * and future files have been already discarded. - */ xid = gxact->xid; /* @@ -2142,7 +2102,7 @@ RecoverPreparedTransactions(void) * SubTransSetParent has been set before, if the prepared transaction * generated xid assignment records. */ - fxid = FullTransactionIdFromEpochAndXid(epoch, xid); + fxid = FullTransactionIdFromAllowableAt(nextFullXid, xid); buf = ProcessTwoPhaseBuffer(fxid, gxact->prepare_start_lsn, gxact->ondisk, true, false); @@ -2260,8 +2220,9 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, return NULL; } - /* Discard files from past epochs */ - if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid)) + /* Discard files from frozen epochs */ + if (EpochFromFullTransactionId(fxid) + 1 < + EpochFromFullTransactionId(nextXid)) { if (fromdisk) { @@ -2576,8 +2537,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, char path[MAXPGPATH]; FullTransactionId fxid; - /* Use current epoch */ - fxid = FullTransactionIdFromCurrentEpoch(hdr->xid); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), + hdr->xid); TwoPhaseFilePath(path, fxid); if (access(path, F_OK) == 0) @@ -2676,10 +2637,8 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) { FullTransactionId fxid; - /* - * We should deal with a file at the current epoch here. - */ - fxid = FullTransactionIdFromCurrentEpoch(xid); + fxid = FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), + xid); RemoveTwoPhaseFile(fxid, giveWarning); } RemoveGXact(gxact); diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 3596af0..91b6a91 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -2166,28 +2166,14 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) FullTransactionId XLogRecGetFullXid(XLogReaderState *record) { - TransactionId xid, - next_xid; - uint32 epoch; - /* * This function is only safe during replay, because it depends on the * replay state. See AdvanceNextFullTransactionIdPastXid() for more. */ Assert(AmStartupProcess() || !IsUnderPostmaster); - xid = XLogRecGetXid(record); - next_xid = XidFromFullTransactionId(TransamVariables->nextXid); - epoch = EpochFromFullTransactionId(TransamVariables->nextXid); - - /* - * If xid is numerically greater than next_xid, it has to be from the last - * epoch. - */ - if (unlikely(xid > next_xid)) - --epoch; - - return FullTransactionIdFromEpochAndXid(epoch, xid); + return FullTransactionIdFromAllowableAt(TransamVariables->nextXid, + XLogRecGetXid(record)); } #endif diff --git a/src/backend/utils/adt/xid8funcs.c b/src/backend/utils/adt/xid8funcs.c index 4736755..b173956 100644 --- a/src/backend/utils/adt/xid8funcs.c +++ b/src/backend/utils/adt/xid8funcs.c @@ -155,35 +155,6 @@ TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid) } /* - * Convert a TransactionId obtained from a snapshot held by the caller to a - * FullTransactionId. Use next_fxid as a reference FullTransactionId, so that - * we can compute the high order bits. It must have been obtained by the - * caller with ReadNextFullTransactionId() after the snapshot was created. - */ -static FullTransactionId -widen_snapshot_xid(TransactionId xid, FullTransactionId next_fxid) -{ - TransactionId next_xid = XidFromFullTransactionId(next_fxid); - uint32 epoch = EpochFromFullTransactionId(next_fxid); - - /* Special transaction ID. */ - if (!TransactionIdIsNormal(xid)) - return FullTransactionIdFromEpochAndXid(0, xid); - - /* - * The 64 bit result must be <= next_fxid, since next_fxid hadn't been - * issued yet when the snapshot was created. Every TransactionId in the - * snapshot must therefore be from the same epoch as next_fxid, or the - * epoch before. We know this because next_fxid is never allow to get - * more than one epoch ahead of the TransactionIds in any snapshot. - */ - if (xid > next_xid) - epoch--; - - return FullTransactionIdFromEpochAndXid(epoch, xid); -} - -/* * txid comparator for qsort/bsearch */ static int @@ -420,12 +391,18 @@ pg_current_snapshot(PG_FUNCTION_ARGS) nxip = cur->xcnt; snap = palloc(PG_SNAPSHOT_SIZE(nxip)); - /* fill */ - snap->xmin = widen_snapshot_xid(cur->xmin, next_fxid); - snap->xmax = widen_snapshot_xid(cur->xmax, next_fxid); + /* + * Fill. This is the current backend's active snapshot, so MyProc->xmin + * is <= all these XIDs. As long as that remains so, oldestXid can't + * advance past any of these XIDs. Hence, these XIDs remain allowable + * relative to next_fxid. + */ + snap->xmin = FullTransactionIdFromAllowableAt(next_fxid, cur->xmin); + snap->xmax = FullTransactionIdFromAllowableAt(next_fxid, cur->xmax); snap->nxip = nxip; for (i = 0; i < nxip; i++) - snap->xip[i] = widen_snapshot_xid(cur->xip[i], next_fxid); + snap->xip[i] = + FullTransactionIdFromAllowableAt(next_fxid, cur->xip[i]); /* * We want them guaranteed to be in ascending order. This also removes diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 0cab865..48fce16 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -77,6 +77,35 @@ FullTransactionIdFromEpochAndXid(uint32 epoch, TransactionId xid) return result; } +/* + * Compute FullTransactionId for the given TransactionId, assuming xid was + * between [oldestXid, nextXid] at the time when TransamVariables->nextXid was + * nextFullXid. + */ +static inline FullTransactionId +FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, + TransactionId xid) +{ + uint32 epoch; + + /* Special transaction ID. */ + if (!TransactionIdIsNormal(xid)) + return FullTransactionIdFromEpochAndXid(0, xid); + + /* + * The 64 bit result must be <= nextFullXid, since nextFullXid hadn't been + * issued yet when xid was in the past. The xid must therefore be from + * the epoch of nextFullXid or the epoch before. We know this because we + * must remove (by freezing) an XID before assigning the XID half an epoch + * ahead of it. + */ + epoch = EpochFromFullTransactionId(nextFullXid); + if (xid > XidFromFullTransactionId(nextFullXid)) + epoch--; + + return FullTransactionIdFromEpochAndXid(epoch, xid); +} + static inline FullTransactionId FullTransactionIdFromU64(uint64 value) {