diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c index d0d1abda58..b8be124555 100644 --- a/src/backend/catalog/indexing.c +++ b/src/backend/catalog/indexing.c @@ -22,6 +22,7 @@ #include "catalog/index.h" #include "catalog/indexing.h" #include "executor/executor.h" +#include "miscadmin.h" #include "utils/rel.h" @@ -234,6 +235,14 @@ CatalogTupleInsert(Relation heapRel, HeapTuple tup) { CatalogIndexState indstate; + /* + * If we might need TOAST access, make sure the caller has set up a valid + * snapshot. + */ + Assert(HaveRegisteredOrActiveSnapshot() || + !OidIsValid(heapRel->rd_rel->reltoastrelid) || + !IsNormalProcessingMode()); + CatalogTupleCheckConstraints(heapRel, tup); indstate = CatalogOpenIndexes(heapRel); @@ -256,6 +265,14 @@ void CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup, CatalogIndexState indstate) { + /* + * If we might need TOAST access, make sure the caller has set up a valid + * snapshot. + */ + Assert(HaveRegisteredOrActiveSnapshot() || + !OidIsValid(heapRel->rd_rel->reltoastrelid) || + !IsNormalProcessingMode()); + CatalogTupleCheckConstraints(heapRel, tup); simple_heap_insert(heapRel, tup); @@ -273,6 +290,14 @@ void CatalogTuplesMultiInsertWithInfo(Relation heapRel, TupleTableSlot **slot, int ntuples, CatalogIndexState indstate) { + /* + * If we might need TOAST access, make sure the caller has set up a valid + * snapshot. + */ + Assert(HaveRegisteredOrActiveSnapshot() || + !OidIsValid(heapRel->rd_rel->reltoastrelid) || + !IsNormalProcessingMode()); + /* Nothing to do */ if (ntuples <= 0) return; @@ -315,6 +340,14 @@ CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup) CatalogIndexState indstate; TU_UpdateIndexes updateIndexes = TU_All; + /* + * If we might need TOAST access, make sure the caller has set up a valid + * snapshot. + */ + Assert(HaveRegisteredOrActiveSnapshot() || + !OidIsValid(heapRel->rd_rel->reltoastrelid) || + !IsNormalProcessingMode()); + CatalogTupleCheckConstraints(heapRel, tup); indstate = CatalogOpenIndexes(heapRel); @@ -339,6 +372,14 @@ CatalogTupleUpdateWithInfo(Relation heapRel, ItemPointer otid, HeapTuple tup, { TU_UpdateIndexes updateIndexes = TU_All; + /* + * If we might need TOAST access, make sure the caller has set up a valid + * snapshot. + */ + Assert(HaveRegisteredOrActiveSnapshot() || + !OidIsValid(heapRel->rd_rel->reltoastrelid) || + !IsNormalProcessingMode()); + CatalogTupleCheckConstraints(heapRel, tup); simple_heap_update(heapRel, otid, tup, &updateIndexes); @@ -364,5 +405,13 @@ CatalogTupleUpdateWithInfo(Relation heapRel, ItemPointer otid, HeapTuple tup, void CatalogTupleDelete(Relation heapRel, ItemPointer tid) { + /* + * If we might need TOAST access, make sure the caller has set up a valid + * snapshot. + */ + Assert(HaveRegisteredOrActiveSnapshot() || + !OidIsValid(heapRel->rd_rel->reltoastrelid) || + !IsNormalProcessingMode()); + simple_heap_delete(heapRel, tid); } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index d27e6cf345..06ac1d39d2 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -19292,9 +19292,17 @@ ATExecDetachPartition(List **wqueue, AlteredTableInfo *tab, Relation rel, tab->rel = rel; } + if (concurrent) + PushActiveSnapshot(GetTransactionSnapshot()); + else + Assert(HaveRegisteredOrActiveSnapshot()); + /* Do the final part of detaching */ DetachPartitionFinalize(rel, partRel, concurrent, defaultPartOid); + if (concurrent) + PopActiveSnapshot(); + ObjectAddressSet(address, RelationRelationId, RelationGetRelid(partRel)); /* keep our lock until commit */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e03e761392..ed7d187bfe 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -377,6 +377,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) replorigin_session_origin_lsn = InvalidXLogRecPtr; replorigin_session_origin_timestamp = 0; + PushActiveSnapshot(GetTransactionSnapshot()); + /* * Drop the tablesync's origin tracking if exists. * @@ -387,6 +389,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) */ replorigin_drop_by_name(originname, true, false); + PopActiveSnapshot(); + finish_sync_worker(); } else @@ -483,6 +487,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) started_tx = true; } + PushActiveSnapshot(GetTransactionSnapshot()); + /* * Remove the tablesync origin tracking if exists. * @@ -500,6 +506,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) sizeof(originname)); replorigin_drop_by_name(originname, true, false); + PopActiveSnapshot(); + /* * Update the state to READY only after the origin cleanup. */ @@ -1456,7 +1464,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * logged for the purpose of recovery. Locks are to prevent the * replication origin from vanishing while advancing. */ + PushActiveSnapshot(GetTransactionSnapshot()); originid = replorigin_create(originname); + PopActiveSnapshot(); LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 925dff9cc4..502033f3d8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4604,8 +4604,10 @@ run_apply_worker() walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + PopActiveSnapshot(); CommitTransactionCommand(); } else @@ -4821,7 +4823,9 @@ DisableSubscriptionAndExit(void) /* Disable the subscription */ StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); DisableSubscription(MySubscription->oid); + PopActiveSnapshot(); CommitTransactionCommand(); /* Ensure we remove no-longer-useful entry for worker's start time */ @@ -4925,6 +4929,8 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn) started_tx = true; } + PushActiveSnapshot(GetTransactionSnapshot()); + /* * Protect subskiplsn of pg_subscription from being concurrently updated * while clearing it. @@ -4983,6 +4989,8 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn) heap_freetuple(tup); table_close(rel, NoLock); + PopActiveSnapshot(); + if (started_tx) CommitTransactionCommand(); }