From c77a6a31a6cfc0ba6757376d1953ec913652a4a9 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Mon, 6 Feb 2023 10:52:54 +0000 Subject: [PATCH v47 2/6] Handle logical slot conflicts on standby. During WAL replay on standby, when slot conflict is identified, invalidate such slots. Also do the same thing if wal_level on the primary server is reduced to below logical and there are existing logical slots on standby. Introduce a new ProcSignalReason value for slot conflict recovery. Arrange for a new pg_stat_database_conflicts field: confl_active_logicalslot. Add a new field "conflicting" in pg_replication_slots. Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello, Bharath Rupireddy --- doc/src/sgml/monitoring.sgml | 11 + doc/src/sgml/system-views.sgml | 10 + src/backend/access/gist/gistxlog.c | 2 + src/backend/access/hash/hash_xlog.c | 1 + src/backend/access/heap/heapam.c | 3 + src/backend/access/nbtree/nbtxlog.c | 2 + src/backend/access/spgist/spgxlog.c | 1 + src/backend/access/transam/xlog.c | 24 ++- src/backend/catalog/system_views.sql | 6 +- .../replication/logical/logicalfuncs.c | 13 +- src/backend/replication/slot.c | 198 +++++++++++++----- src/backend/replication/slotfuncs.c | 13 +- src/backend/replication/walsender.c | 8 + src/backend/storage/ipc/procsignal.c | 3 + src/backend/storage/ipc/standby.c | 13 +- src/backend/tcop/postgres.c | 24 +++ src/backend/utils/activity/pgstat_database.c | 4 + src/backend/utils/adt/pgstatfuncs.c | 3 + src/include/catalog/pg_proc.dat | 11 +- src/include/pgstat.h | 1 + src/include/replication/slot.h | 5 +- src/include/storage/procsignal.h | 1 + src/include/storage/standby.h | 2 + src/test/regress/expected/rules.out | 8 +- 24 files changed, 304 insertions(+), 63 deletions(-) 5.4% doc/src/sgml/ 7.2% src/backend/access/transam/ 4.7% src/backend/replication/logical/ 56.8% src/backend/replication/ 4.5% src/backend/storage/ipc/ 6.5% src/backend/tcop/ 5.4% src/backend/ 3.9% src/include/catalog/ 3.0% src/include/replication/ diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 1756f1a4b6..e25f71a776 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -4365,6 +4365,17 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i deadlocks + + + + confl_active_logicalslot bigint + + + Number of active logical slots in this database that have been + invalidated because they conflict with recovery (note that inactive ones + are also invalidated but do not increment this counter) + + diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 7c8fc3f654..239f713295 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2516,6 +2516,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx false for physical slots. + + + + conflicting bool + + + True if this logical slot conflicted with recovery (and so is now + invalidated). Always NULL for physical slots. + + diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index b7678f3c14..9a86fb3fef 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -197,6 +197,7 @@ gistRedoDeleteRecord(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, + xldata->isCatalogRel, rlocator); } @@ -390,6 +391,7 @@ gistRedoPageReuse(XLogReaderState *record) */ if (InHotStandby) ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, xlrec->locator); } diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c index 08ceb91288..b856304746 100644 --- a/src/backend/access/hash/hash_xlog.c +++ b/src/backend/access/hash/hash_xlog.c @@ -1003,6 +1003,7 @@ hash_xlog_vacuum_one_page(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, + xldata->isCatalogRel, rlocator); } diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index b5acbb5d39..3f303a27b8 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8777,6 +8777,7 @@ heap_xlog_prune(XLogReaderState *record) */ if (InHotStandby) ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, rlocator); /* @@ -8946,6 +8947,7 @@ heap_xlog_visible(XLogReaderState *record) */ if (InHotStandby) ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->flags & VISIBILITYMAP_IS_CATALOG_REL, rlocator); /* @@ -9063,6 +9065,7 @@ heap_xlog_freeze_page(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, rlocator); } diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 414ca4f6de..c87e46ed66 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -669,6 +669,7 @@ btree_xlog_delete(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &rlocator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, rlocator); } @@ -1007,6 +1008,7 @@ btree_xlog_reuse_page(XLogReaderState *record) if (InHotStandby) ResolveRecoveryConflictWithSnapshotFullXid(xlrec->snapshotConflictHorizon, + xlrec->isCatalogRel, xlrec->locator); } diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c index b071b59c8a..459ac929ba 100644 --- a/src/backend/access/spgist/spgxlog.c +++ b/src/backend/access/spgist/spgxlog.c @@ -879,6 +879,7 @@ spgRedoVacuumRedirect(XLogReaderState *record) XLogRecGetBlockTag(record, 0, &locator, NULL, NULL); ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon, + xldata->isCatalogRel, locator); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index f9f0f6db8d..54d344a59c 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6444,6 +6444,7 @@ CreateCheckPoint(int flags) VirtualTransactionId *vxids; int nvxids; int oldXLogAllowed = 0; + bool invalidated = false; /* * An end-of-recovery checkpoint is really a shutdown checkpoint, just @@ -6804,7 +6805,8 @@ CreateCheckPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); KeepLogSeg(recptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(_logSegNo)) + InvalidateObsoleteReplicationSlots(_logSegNo, &invalidated, InvalidOid, NULL); + if (invalidated) { /* * Some slots have been invalidated; recalculate the old-segment @@ -7083,6 +7085,7 @@ CreateRestartPoint(int flags) XLogRecPtr endptr; XLogSegNo _logSegNo; TimestampTz xtime; + bool invalidated = false; /* Concurrent checkpoint/restartpoint cannot happen */ Assert(!IsUnderPostmaster || MyBackendType == B_CHECKPOINTER); @@ -7248,7 +7251,8 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(_logSegNo)) + InvalidateObsoleteReplicationSlots(_logSegNo, &invalidated, InvalidOid, NULL); + if (invalidated) { /* * Some slots have been invalidated; recalculate the old-segment @@ -7961,6 +7965,22 @@ xlog_redo(XLogReaderState *record) /* Update our copy of the parameters in pg_control */ memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); + /* + * Invalidate logical slots if we are in hot standby and the primary does not + * have a WAL level sufficient for logical decoding. No need to search + * for potentially conflicting logically slots if standby is running + * with wal_level lower than logical, because in that case, we would + * have either disallowed creation of logical slots or invalidated existing + * ones. + */ + if (InRecovery && InHotStandby && + xlrec.wal_level < WAL_LEVEL_LOGICAL && + wal_level >= WAL_LEVEL_LOGICAL) + { + TransactionId ConflictHorizon = InvalidTransactionId; + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, NULL, InvalidOid, &ConflictHorizon); + } + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; ControlFile->max_worker_processes = xlrec.max_worker_processes; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 8608e3fa5b..a272bd4a88 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -997,7 +997,8 @@ CREATE VIEW pg_replication_slots AS L.confirmed_flush_lsn, L.wal_status, L.safe_wal_size, - L.two_phase + L.two_phase, + L.conflicting FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); @@ -1065,7 +1066,8 @@ CREATE VIEW pg_stat_database_conflicts AS pg_stat_get_db_conflict_lock(D.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin, - pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock + pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock, + pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_active_logicalslot FROM pg_database D; CREATE VIEW pg_stat_user_functions AS diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index fa1b641a2b..070fd378e8 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -216,9 +216,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* * After the sanity checks in CreateDecodingContext, make sure the - * restart_lsn is valid. Avoid "cannot get changes" wording in this - * errmsg because that'd be confusingly ambiguous about no changes - * being available. + * restart_lsn is valid or both xmin and catalog_xmin are valid. Avoid + * "cannot get changes" wording in this errmsg because that'd be + * confusingly ambiguous about no changes being available. */ if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) ereport(ERROR, @@ -227,6 +227,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(*name)), errdetail("This slot has never previously reserved WAL, or it has been invalidated."))); + if (LogicalReplicationSlotIsInvalid(MyReplicationSlot)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot read from logical replication slot \"%s\"", + NameStr(*name)), + errdetail("This slot has been invalidated because it was conflicting with recovery."))); + MemoryContextSwitchTo(oldcontext); /* diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index f286918f69..38c6f18886 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -855,8 +855,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) SpinLockAcquire(&s->mutex); effective_xmin = s->effective_xmin; effective_catalog_xmin = s->effective_catalog_xmin; - invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) && - XLogRecPtrIsInvalid(s->data.restart_lsn)); + invalidated = ((!XLogRecPtrIsInvalid(s->data.invalidated_at) && + XLogRecPtrIsInvalid(s->data.restart_lsn)) + || (!TransactionIdIsValid(s->data.xmin) && + !TransactionIdIsValid(s->data.catalog_xmin))); SpinLockRelease(&s->mutex); /* invalidated slots need not apply */ @@ -1224,20 +1226,21 @@ ReplicationSlotReserveWal(void) } /* - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot - * and mark it invalid, if necessary and possible. + * Helper for InvalidateObsoleteReplicationSlots + * + * Acquires the given slot and mark it invalid, if necessary and possible. * * Returns whether ReplicationSlotControlLock was released in the interim (and * in that case we're not holding the lock at return, otherwise we are). * - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.) + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.) * * This is inherently racy, because we release the LWLock * for syscalls, so caller must restart if we return true. */ static bool -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, - bool *invalidated) +InvalidatePossiblyObsoleteOrConflictingLogicalSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, + bool *invalidated, TransactionId *xid) { int last_signaled_pid = 0; bool released_lock = false; @@ -1245,6 +1248,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, for (;;) { XLogRecPtr restart_lsn; + TransactionId slot_xmin; + TransactionId slot_catalog_xmin; + NameData slotname; int active_pid = 0; @@ -1261,18 +1267,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, * Check if the slot needs to be invalidated. If it needs to be * invalidated, and is not currently acquired, acquire it and mark it * as having been invalidated. We do this with the spinlock held to - * avoid race conditions -- for example the restart_lsn could move - * forward, or the slot could be dropped. + * avoid race conditions -- for example the restart_lsn (or the + * xmin(s) could) move forward or the slot could be dropped. */ SpinLockAcquire(&s->mutex); restart_lsn = s->data.restart_lsn; + slot_xmin = s->data.xmin; + slot_catalog_xmin = s->data.catalog_xmin; + + /* slot has been invalidated (logical decoding conflict case) */ + if ((xid && + ((LogicalReplicationSlotIsInvalid(s)) + || /* - * If the slot is already invalid or is fresh enough, we don't need to - * do anything. + * We are not forcing for invalidation because the xid is valid and + * this is a non conflicting slot. */ - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN) + (TransactionIdIsValid(*xid) && !( + (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, *xid)) + || + (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, *xid)) + )) + )) + || + /* slot has been invalidated (obsolete LSN case) */ + (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN))) { SpinLockRelease(&s->mutex); if (released_lock) @@ -1292,9 +1313,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, { MyReplicationSlot = s; s->active_pid = MyProcPid; - s->data.invalidated_at = restart_lsn; - s->data.restart_lsn = InvalidXLogRecPtr; - + if (xid) + { + s->data.xmin = InvalidTransactionId; + s->data.catalog_xmin = InvalidTransactionId; + } + else + { + s->data.invalidated_at = restart_lsn; + s->data.restart_lsn = InvalidXLogRecPtr; + } /* Let caller know */ *invalidated = true; } @@ -1327,15 +1355,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, */ if (last_signaled_pid != active_pid) { - ereport(LOG, - errmsg("terminating process %d to release replication slot \"%s\"", - active_pid, NameStr(slotname)), - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", - LSN_FORMAT_ARGS(restart_lsn), - (unsigned long long) (oldestLSN - restart_lsn)), - errhint("You might need to increase max_slot_wal_keep_size.")); + if (xid) + { + if (TransactionIdIsValid(*xid)) + { + ereport(LOG, + errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery", + active_pid, NameStr(slotname)), + errdetail("The slot conflicted with xid horizon %u.", + *xid)); + } + else + { + ereport(LOG, + errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery", + active_pid, NameStr(slotname)), + errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server")); + } + + (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId); + } + else + { + ereport(LOG, + errmsg("terminating process %d to release replication slot \"%s\"", + active_pid, NameStr(slotname)), + errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", + LSN_FORMAT_ARGS(restart_lsn), + (unsigned long long) (oldestLSN - restart_lsn)), + errhint("You might need to increase max_slot_wal_keep_size.")); + + (void) kill(active_pid, SIGTERM); + } - (void) kill(active_pid, SIGTERM); last_signaled_pid = active_pid; } @@ -1369,13 +1421,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, ReplicationSlotSave(); ReplicationSlotRelease(); - ereport(LOG, - errmsg("invalidating obsolete replication slot \"%s\"", - NameStr(slotname)), - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", - LSN_FORMAT_ARGS(restart_lsn), - (unsigned long long) (oldestLSN - restart_lsn)), - errhint("You might need to increase max_slot_wal_keep_size.")); + if (xid) + { + pgstat_drop_replslot(s); + + if (TransactionIdIsValid(*xid)) + { + ereport(LOG, + errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)), + errdetail("The slot conflicted with xid horizon %u.", *xid)); + } + else + { + ereport(LOG, + errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)), + errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server")); + } + } + else + { + ereport(LOG, + errmsg("invalidating obsolete replication slot \"%s\"", + NameStr(slotname)), + errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", + LSN_FORMAT_ARGS(restart_lsn), + (unsigned long long) (oldestLSN - restart_lsn)), + errhint("You might need to increase max_slot_wal_keep_size.")); + } /* done with this slot for now */ break; @@ -1388,20 +1460,40 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, } /* - * Mark any slot that points to an LSN older than the given segment - * as invalid; it requires WAL that's about to be removed. + * Invalidate Obsolete slots or resolve recovery conflicts with logical slots. * - * Returns true when any slot have got invalidated. + * Obsolete case (aka xid is NULL): * - * NB - this runs as part of checkpoint, so avoid raising errors if possible. + * Mark any slot that points to an LSN older than the given segment + * as invalid; it requires WAL that's about to be removed. + * invalidated is set to true when any slot have got invalidated. + * + * Logical replication slot case: + * + * When xid is valid, it means that we are about to remove rows older than xid. + * Therefore we need to invalidate slots that depend on seeing those rows. + * When xid is invalid, invalidate all logical slots. This is required when the + * master wal_level is set back to replica, so existing logical slots need to + * be invalidated. */ -bool -InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno) +void +InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, bool *invalidated, Oid dboid, TransactionId *xid) { - XLogRecPtr oldestLSN; - bool invalidated = false; - XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); + XLogRecPtr oldestLSN = InvalidXLogRecPtr; + bool logical_slot_invalidated = false; + + Assert(max_replication_slots >= 0); + + if (max_replication_slots == 0) + return; + + if (!xid) + { + Assert(invalidated); + *invalidated = false; + XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); + } restart: LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); @@ -1412,24 +1504,36 @@ restart: if (!s->in_use) continue; - if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated)) + if (xid) { - /* if the lock was released, start from scratch */ - goto restart; + /* we are only dealing with *logical* slot conflicts */ + if (!SlotIsLogical(s)) + continue; + + /* + * not the database of interest and we don't want all the + * database, skip + */ + if (s->data.database != dboid && TransactionIdIsValid(*xid)) + continue; } + + if (InvalidatePossiblyObsoleteOrConflictingLogicalSlot(s, oldestLSN, invalidated ? invalidated : &logical_slot_invalidated, xid)) + goto restart; } + LWLockRelease(ReplicationSlotControlLock); /* - * If any slots have been invalidated, recalculate the resource limits. + * If any slots have been invalidated, recalculate the required xmin + * and the required lsn (if appropriate). */ - if (invalidated) + if ((!xid && *invalidated) || (xid && logical_slot_invalidated)) { ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + if (!xid && *invalidated) + ReplicationSlotsComputeRequiredLSN(); } - - return invalidated; } /* diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 2f3c964824..44192bc32d 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 14 +#define PG_GET_REPLICATION_SLOTS_COLS 15 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -404,6 +404,17 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.two_phase); + if (slot_contents.data.database == InvalidOid) + nulls[i++] = true; + else + { + if (slot_contents.data.xmin == InvalidTransactionId && + slot_contents.data.catalog_xmin == InvalidTransactionId) + values[i++] = BoolGetDatum(true); + else + values[i++] = BoolGetDatum(false); + } + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4ed3747e3f..8885cdeebc 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1253,6 +1253,14 @@ StartLogicalReplication(StartReplicationCmd *cmd) ReplicationSlotAcquire(cmd->slotname, true); + if (!TransactionIdIsValid(MyReplicationSlot->data.xmin) + && !TransactionIdIsValid(MyReplicationSlot->data.catalog_xmin)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot read from logical replication slot \"%s\"", + cmd->slotname), + errdetail("This slot has been invalidated because it was conflicting with recovery."))); + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 395b2cf690..c85cb5cc18 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -673,6 +673,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT)) + RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK); diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 94cc860f5f..ec817381a1 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -35,6 +35,7 @@ #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/timestamp.h" +#include "replication/slot.h" /* User-settable GUC parameters */ int vacuum_defer_cleanup_age; @@ -475,6 +476,7 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, */ void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator) { VirtualTransactionId *backends; @@ -500,6 +502,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT, true); + + if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel) + InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, NULL, locator.dbOid, &snapshotConflictHorizon); } /* @@ -508,6 +513,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, */ void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator) { /* @@ -526,7 +532,9 @@ ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHor TransactionId truncated; truncated = XidFromFullTransactionId(snapshotConflictHorizon); - ResolveRecoveryConflictWithSnapshot(truncated, locator); + ResolveRecoveryConflictWithSnapshot(truncated, + isCatalogRel, + locator); } } @@ -1487,6 +1495,9 @@ get_recovery_conflict_desc(ProcSignalReason reason) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: reasonDesc = _("recovery conflict on snapshot"); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + reasonDesc = _("recovery conflict on replication slot"); + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: reasonDesc = _("recovery conflict on buffer deadlock"); break; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 5d439f2710..b2a75b6d72 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2481,6 +2481,9 @@ errdetail_recovery_conflict(void) case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: errdetail("User query might have needed to see row versions that must be removed."); break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + errdetail("User was using the logical slot that must be dropped."); + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: errdetail("User transaction caused buffer deadlock with recovery."); break; @@ -3050,6 +3053,27 @@ RecoveryConflictInterrupt(ProcSignalReason reason) case PROCSIG_RECOVERY_CONFLICT_LOCK: case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + + /* + * For conflicts that require a logical slot to be + * invalidated, the requirement is for the signal receiver to + * release the slot, so that it could be invalidated by the + * signal sender. So for normal backends, the transaction + * should be aborted, just like for other recovery conflicts. + * But if it's walsender on standby, we don't want to go + * through the following IsTransactionOrTransactionBlock() + * check, so break here. + */ + if (am_cascading_walsender && + reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT && + MyReplicationSlot && SlotIsLogical(MyReplicationSlot)) + { + RecoveryConflictPending = true; + QueryCancelPending = true; + InterruptPending = true; + break; + } /* * If we aren't in a transaction any longer then ignore. diff --git a/src/backend/utils/activity/pgstat_database.c b/src/backend/utils/activity/pgstat_database.c index 6e650ceaad..7149f22f72 100644 --- a/src/backend/utils/activity/pgstat_database.c +++ b/src/backend/utils/activity/pgstat_database.c @@ -109,6 +109,9 @@ pgstat_report_recovery_conflict(int reason) case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: dbentry->conflict_bufferpin++; break; + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + dbentry->conflict_logicalslot++; + break; case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: dbentry->conflict_startup_deadlock++; break; @@ -387,6 +390,7 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) PGSTAT_ACCUM_DBCOUNT(conflict_tablespace); PGSTAT_ACCUM_DBCOUNT(conflict_lock); PGSTAT_ACCUM_DBCOUNT(conflict_snapshot); + PGSTAT_ACCUM_DBCOUNT(conflict_logicalslot); PGSTAT_ACCUM_DBCOUNT(conflict_bufferpin); PGSTAT_ACCUM_DBCOUNT(conflict_startup_deadlock); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 6737493402..afd62d3cc0 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -1066,6 +1066,8 @@ PG_STAT_GET_DBENTRY_INT64(xact_commit) /* pg_stat_get_db_xact_rollback */ PG_STAT_GET_DBENTRY_INT64(xact_rollback) +/* pg_stat_get_db_conflict_logicalslot */ +PG_STAT_GET_DBENTRY_INT64(conflict_logicalslot) Datum pg_stat_get_db_stat_reset_time(PG_FUNCTION_ARGS) @@ -1099,6 +1101,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS) result = (int64) (dbentry->conflict_tablespace + dbentry->conflict_lock + dbentry->conflict_snapshot + + dbentry->conflict_logicalslot + dbentry->conflict_bufferpin + dbentry->conflict_startup_deadlock); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index c0f2a8a77c..c8e11ab710 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5577,6 +5577,11 @@ proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's', proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', prosrc => 'pg_stat_get_db_conflict_snapshot' }, +{ oid => '9901', + descr => 'statistics: recovery conflicts in database caused by logical replication slot', + proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's', + proparallel => 'r', prorettype => 'int8', proargtypes => 'oid', + prosrc => 'pg_stat_get_db_conflict_logicalslot' }, { oid => '3068', descr => 'statistics: recovery conflicts in database caused by shared buffer pin', proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's', @@ -10946,9 +10951,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 5e3326a3b9..872eb35757 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -291,6 +291,7 @@ typedef struct PgStat_StatDBEntry PgStat_Counter conflict_tablespace; PgStat_Counter conflict_lock; PgStat_Counter conflict_snapshot; + PgStat_Counter conflict_logicalslot; PgStat_Counter conflict_bufferpin; PgStat_Counter conflict_startup_deadlock; PgStat_Counter temp_files; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 8872c80cdf..236ebcdbdb 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -17,6 +17,8 @@ #include "storage/spin.h" #include "replication/walreceiver.h" +#define LogicalReplicationSlotIsInvalid(s) (!TransactionIdIsValid(s->data.xmin) && \ + !TransactionIdIsValid(s->data.catalog_xmin)) /* * Behaviour of replication slots, upon release or crash. * @@ -215,7 +217,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); -extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); +extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno, bool *invalidated, Oid dboid, TransactionId *xid); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); @@ -227,5 +229,6 @@ extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); +extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, char *reason); #endif /* SLOT_H */ diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 905af2231b..2f52100b00 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -42,6 +42,7 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_TABLESPACE, PROCSIG_RECOVERY_CONFLICT_LOCK, PROCSIG_RECOVERY_CONFLICT_SNAPSHOT, + PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h index 2effdea126..41f4dc372e 100644 --- a/src/include/storage/standby.h +++ b/src/include/storage/standby.h @@ -30,8 +30,10 @@ extern void InitRecoveryTransactionEnvironment(void); extern void ShutdownRecoveryTransactionEnvironment(void); extern void ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator); extern void ResolveRecoveryConflictWithSnapshotFullXid(FullTransactionId snapshotConflictHorizon, + bool isCatalogRel, RelFileLocator locator); extern void ResolveRecoveryConflictWithTablespace(Oid tsid); extern void ResolveRecoveryConflictWithDatabase(Oid dbid); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e7a2f5856a..11ea206337 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name, l.confirmed_flush_lsn, l.wal_status, l.safe_wal_size, - l.two_phase - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase) + l.two_phase, + l.conflicting + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, @@ -1868,7 +1869,8 @@ pg_stat_database_conflicts| SELECT oid AS datid, pg_stat_get_db_conflict_lock(oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(oid) AS confl_bufferpin, - pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock + pg_stat_get_db_conflict_startup_deadlock(oid) AS confl_deadlock, + pg_stat_get_db_conflict_logicalslot(oid) AS confl_active_logicalslot FROM pg_database d; pg_stat_gssapi| SELECT pid, gss_auth AS gss_authenticated, -- 2.34.1