From b676827e76a21eea6a9b67d9b65aa673dd85705d Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 26 Sep 2024 14:11:40 +0800 Subject: [PATCH v88] Maintain the oldest non-removable transaction ID by apply worker This set of patches aims to support the detection of update_deleted conflicts, which occur when the apply worker cannot find the target tuple to be updated (e.g., the tuple has been removed by a different origin). To detect this conflict consistently and correctly, we must ensure that tuples deleted by other origins are not prematurely removed by VACUUM before conflict detection. If these tuples are removed too soon, a different conflict might arise and be resolved incorrectly, causing data inconsistency between nodes. To achieve this, we will retain the dead tuples on the subscriber for some period. The concept is that dead tuples are useful for detecting conflicts only during the application of concurrent transactions from remote nodes. After applying and flushing all remote transactions that occurred concurrently with the tuple DELETE, any subsequent UPDATE from a remote node should have a later timestamp. In such cases, it is acceptable to detect an update_missing scenario and convert the UPDATE to an INSERT when applying it. But, for concurrent remote transactions with earlier timestamps than the DELETE, detecting update_deleted is necessary, as the UPDATEs in remote transactions should be ignored if their timestamp is earlier than that of the dead tuples. We assume that the appropriate resolution for update_deleted conflicts, to achieve eventual consistency, is the last-update-wins strategy. This means that when detecting the update_deleted conflict, and the remote update has a later timestamp, the resolution would be to convert the UPDATE to an INSERT. Remote updates with earlier timestamps compared to the dead tuples will be disregarded. 
To implement this, an additional replication slot named pg_conflict_detection will be created on the subscriber side and maintained by the launcher. This slot will be used to retain dead tuples. Each apply worker will maintain its own non-removable transaction ID, while the launcher collects these IDs to determine whether to advance the xmin value of the replication slot. The process of advancing the non-removable transaction ID in the apply worker involves: 1) Call GetOldestActiveTransactionId() to take oldestRunningXid as the candidate xid. 2) Send a message to the walsender requesting the publisher status, which includes the latest WAL write position and information about running transactions. 3) Wait for the status from the walsender. After receiving the first status after acquiring a new candidate transaction ID, do not proceed if there are ongoing concurrent remote transactions. These transactions might have been assigned an earlier commit timestamp but have not yet written the commit WAL record. Continue to request the publisher status until all these transactions have completed. 4) Advance the non-removable transaction ID if the current flush location has reached or surpassed the last received WAL position. These steps are repeated at intervals defined by wal_receiver_status_interval to minimize performance impact. This approach ensures that dead tuples are not removed until all concurrent transactions have been applied. It works for both bidirectional and non-bidirectional replication scenarios. This patch allows each apply worker to maintain the non-removable transaction ID in the shared memory following the steps described above. The actual replication slot management is implemented in the following patches. 
--- doc/src/sgml/protocol.sgml | 89 +++++++ src/backend/replication/logical/worker.c | 307 +++++++++++++++++++++- src/backend/replication/walsender.c | 57 ++++ src/include/replication/worker_internal.h | 18 ++ src/tools/pgindent/typedefs.list | 1 + 5 files changed, 470 insertions(+), 2 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index d5a78694b9..3d0cf32356 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2441,6 +2441,68 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + Primary status update (B) + + + + Byte1('s') + + + Identifies the message as a primary status update. + + + + + + Int64 + + + The latest WAL write position on the server. + + + + + + Int32 + + + The oldest running transaction ID on the server. + + + + + + Int32 + + + The next transaction ID to be assigned on the server. + + + + + + Int32 + + + The epoch of the next transaction ID to be assigned. + + + + + + Int64 + + + The server's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. + + + + + + @@ -2585,6 +2647,33 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + Request primary status update (F) + + + + Byte1('S') + + + Identifies the message as a request for a primary status update. + + + + + + Int64 + + + The client's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. 
+ + + + + + + diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 925dff9cc4..60d13c6e02 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -173,12 +173,14 @@ #include "replication/logicalrelation.h" #include "replication/logicalworker.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" #include "storage/buffile.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/dynahash.h" @@ -275,6 +277,20 @@ typedef enum TRANS_PARALLEL_APPLY, } TransApplyAction; +/* + * The phases involved in advancing the non-removable transaction ID. + * + * Refer to maybe_advance_nonremovable_xid() for details on how the function + * transitions between these phases. + */ +typedef enum +{ + DTR_GET_CANDIDATE_XID, + DTR_REQUEST_PUBLISHER_STATUS, + DTR_WAIT_FOR_PUBLISHER_STATUS, + DTR_WAIT_FOR_LOCAL_FLUSH +} DeadTupleRetainPhase; + /* errcontext tracker */ static ApplyErrorCallbackArg apply_error_callback_arg = { @@ -339,6 +355,10 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +static XLogRecPtr last_flushpos = InvalidXLogRecPtr; + +static StringInfo reply_message = NULL; + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -378,6 +398,11 @@ static void stream_open_and_write_change(TransactionId xid, char action, StringI static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); +static void maybe_advance_nonremovable_xid(XLogRecPtr *remote_lsn, + TransactionId remote_oldestxid, + TransactionId remote_nextxid, + uint32 remote_epoch, + DeadTupleRetainPhase *phase); static void 
apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, @@ -3573,6 +3598,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received) bool ping_sent = false; TimeLineID tli; ErrorContextCallback errcallback; + XLogRecPtr remote_lsn = InvalidXLogRecPtr; + TransactionId remote_oldestxid = InvalidTransactionId; + TransactionId remote_nextxid = InvalidTransactionId; + uint32 remote_epoch = 0; + DeadTupleRetainPhase phase = DTR_GET_CANDIDATE_XID; /* * Init the ApplyMessageContext which we clean up after each replication @@ -3694,6 +3724,50 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, reply_requested, false); UpdateWorkerStats(last_received, timestamp, true); } + else if (c == 's') /* Primary status update */ + { + TimestampTz timestamp; + + remote_lsn = pq_getmsgint64(&s); + remote_oldestxid = pq_getmsgint(&s, 4); + remote_nextxid = pq_getmsgint(&s, 4); + remote_epoch = pq_getmsgint(&s, 4); + timestamp = pq_getmsgint64(&s); + + /* + * An invalid position indicates the publisher is also + * a physical standby. In this scenario, advancing the + * non-removable transaction ID is not supported. This + * is because the logical walsender on the standby can + * only get the WAL replay position but there may be + * more WALs that are being replicated from the + * primary and those WALs could have earlier commit + * timestamps. Refer to + * maybe_advance_nonremovable_xid() for details. + */ + if (XLogRecPtrIsInvalid(remote_lsn)) + { + ereport(WARNING, + errmsg("cannot get the latest WAL position from the publisher"), + errdetail("The connected publisher is also a standby server.")); + + /* + * Continuously revert to the request phase until + * the standby server (publisher) is promoted, at + * which point a valid WAL position will be + * received. 
+ */ + phase = DTR_REQUEST_PUBLISHER_STATUS; + } + + maybe_advance_nonremovable_xid(&remote_lsn, + remote_oldestxid, + remote_nextxid, + remote_epoch, + &phase); + + UpdateWorkerStats(last_received, timestamp, false); + } /* other message types are purposefully ignored */ MemoryContextReset(ApplyMessageContext); @@ -3706,6 +3780,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* confirm all writes so far */ send_feedback(last_received, false, false); + maybe_advance_nonremovable_xid(&remote_lsn, remote_oldestxid, + remote_nextxid, remote_epoch, &phase); + if (!in_remote_transaction && !in_streamed_transaction) { /* @@ -3833,12 +3910,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) { - static StringInfo reply_message = NULL; static TimestampTz send_time = 0; static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; - static XLogRecPtr last_flushpos = InvalidXLogRecPtr; XLogRecPtr writepos; XLogRecPtr flushpos; @@ -3916,6 +3991,234 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) last_flushpos = flushpos; } +/* + * Attempt to advance the non-removable transaction ID. + * + * The oldest_nonremovable_xid is maintained in shared memory to prevent dead + * rows from being removed prematurely when the apply worker still needs them + * to detect update_deleted conflicts. + * + * The non-removable transaction ID is advanced to the oldest running + * transaction ID once all concurrent transactions on the publisher have been + * applied and flushed locally. The process involves: + * + * - DTR_GET_CANDIDATE_XID: + * Call GetOldestActiveTransactionId() to take oldestRunningXid as the + * candidate xid. + * + * - DTR_REQUEST_PUBLISHER_STATUS: + * Send a message to the walsender requesting the publisher status, which + * includes the latest WAL write position and information about running + * transactions. 
+ * + * - DTR_WAIT_FOR_PUBLISHER_STATUS: + * Wait for the status from the walsender. After receiving the first status + * after acquiring a new candidate transaction ID, do not proceed if there + * are ongoing concurrent remote transactions. These transactions might have + * been assigned an earlier commit timestamp but have not yet written the + * commit WAL record. Continue to request the publisher status + * (DTR_REQUEST_PUBLISHER_STATUS) until all these transactions have + * completed. + * + * - DTR_WAIT_FOR_LOCAL_FLUSH: + * Advance the non-removable transaction ID if the current flush location has + * reached or surpassed the last received WAL position. + * + * Retaining the dead tuples for this period is sufficient for ensuring + * eventual consistency using the last-update-wins strategy, as dead tuples are + * useful for detecting conflicts only during the application of concurrent + * transactions from remote nodes. After applying and flushing all remote + * transactions that occurred concurrently with the tuple DELETE, any + * subsequent UPDATE from a remote node should have a later timestamp. In such + * cases, it is acceptable to detect an update_missing scenario and convert the + * UPDATE to an INSERT when applying it. But, for concurrent remote + * transactions with earlier timestamps than the DELETE, detecting + * update_deleted is necessary, as the UPDATEs in remote transactions should be + * ignored if their timestamp is earlier than that of the dead tuples. + * + * The 'remote_lsn' is reset after sending a new request to the walsender. 
+ */ +static void +maybe_advance_nonremovable_xid(XLogRecPtr *remote_lsn, + TransactionId remote_oldestxid, + TransactionId remote_nextxid, + uint32 remote_epoch, + DeadTupleRetainPhase *phase) +{ + static TimestampTz xid_advance_attempt_time = 0; + static FullTransactionId candidate_xid; + static FullTransactionId next_phase_at; + + TimestampTz now = 0; + + Assert(remote_lsn); + + /* + * The non-removable transaction ID for a subscription is centrally + * managed by the main apply worker. + */ + if (!am_leader_apply_worker()) + return; + + if (*phase == DTR_WAIT_FOR_PUBLISHER_STATUS) + { + FullTransactionId remote_full_xid; + + Assert(xid_advance_attempt_time); + + /* + * Return if we have requested but not yet received the remote WAL + * position. + */ + if (XLogRecPtrIsInvalid(*remote_lsn)) + return; + + if (!FullTransactionIdIsValid(next_phase_at)) + next_phase_at = FullTransactionIdFromEpochAndXid(remote_epoch, + remote_nextxid); + + /* Compute the epoch of the remote oldest running transaction ID */ + if (remote_oldestxid > remote_nextxid) + remote_epoch--; + + remote_full_xid = FullTransactionIdFromEpochAndXid(remote_epoch, + remote_oldestxid); + + /* + * Check if all remote concurrent transactions that were active at the + * first status request have now completed. If completed, proceed to + * the next phase; otherwise, continue checking the publisher status + * until these transactions finish. + * + * Do not return here because the apply worker might have already + * applied all changes up to remote_lsn, or enough time might have + * passed. Instead, proceed to the next phase to check if we can + * immediately advance the transaction ID or send one more request. 
+ */ + if (FullTransactionIdPrecedesOrEquals(next_phase_at, remote_full_xid)) + *phase = DTR_WAIT_FOR_LOCAL_FLUSH; + else + *phase = DTR_REQUEST_PUBLISHER_STATUS; + } + + if (*phase == DTR_WAIT_FOR_LOCAL_FLUSH) + { + Assert(!XLogRecPtrIsInvalid(*remote_lsn) && + FullTransactionIdIsValid(candidate_xid)); + + /* + * Do not attempt to advance the non-removable transaction ID when + * table sync is in progress. During this time, changes from a single + * transaction may be applied by multiple table sync workers + * corresponding to the target tables. In this case, confirming the + * apply and flush progress across all table sync workers is complex + * and not worth the effort. + */ + if (!AllTablesyncsReady()) + return; + + /* Return to wait for the changes to be applied */ + if (last_flushpos < *remote_lsn) + return; + + /* + * Reaching here means the remote WAL position has been received, and + * all transactions up to that position on the publisher have been + * applied and flushed locally. So, now we can advance the + * non-removable transaction ID. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = candidate_xid; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* + * Do not return here as enough time might have passed since the last + * transaction ID advancement. Instead, proceed to the next phase to + * check if we can get the next candidate transaction ID. + */ + *phase = DTR_GET_CANDIDATE_XID; + } + + Assert(*phase == DTR_GET_CANDIDATE_XID || + *phase == DTR_REQUEST_PUBLISHER_STATUS); + + /* + * Exit early if the user has disabled sending messages to the + * publisher. + */ + if (wal_receiver_status_interval <= 0) + return; + + now = GetCurrentTimestamp(); + + /* + * Compute the candidate_xid and send a message at most once per + * wal_receiver_status_interval. 
+ */ + if (!TimestampDifferenceExceeds(xid_advance_attempt_time, now, + wal_receiver_status_interval * 1000)) + return; + + xid_advance_attempt_time = now; + + if (*phase == DTR_GET_CANDIDATE_XID) + { + TransactionId oldest_running_xid; + FullTransactionId next_full_xid; + FullTransactionId full_xid; + uint32 epoch; + + oldest_running_xid = GetOldestActiveTransactionId(); + next_full_xid = ReadNextFullTransactionId(); + epoch = EpochFromFullTransactionId(next_full_xid); + + /* Compute the epoch of the oldestRunningXid */ + if (oldest_running_xid > XidFromFullTransactionId(next_full_xid)) + epoch--; + + full_xid = FullTransactionIdFromEpochAndXid(epoch, oldest_running_xid); + + /* Return if the new transaction ID is unchanged */ + if (FullTransactionIdFollowsOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + full_xid)) + return; + + candidate_xid = full_xid; + next_phase_at = InvalidFullTransactionId; + + /* + * Do not return here to ensure an immediate request message is sent + * in the next phase. + */ + *phase = DTR_REQUEST_PUBLISHER_STATUS; + } + + if (*phase == DTR_REQUEST_PUBLISHER_STATUS) + { + if (!reply_message) + { + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + + reply_message = makeStringInfo(); + MemoryContextSwitchTo(oldctx); + } + else + resetStringInfo(reply_message); + + pq_sendbyte(reply_message, 'S'); + pq_sendint64(reply_message, now); + + elog(DEBUG2, "sending publisher status request message"); + + /* Send a publisher status request message to the server */ + walrcv_send(LogRepWorkerWalRcvConn, + reply_message->data, reply_message->len); + + *remote_lsn = InvalidXLogRecPtr; + *phase = DTR_WAIT_FOR_PUBLISHER_STATUS; + } +} + /* * Exit routine for apply workers due to subscription parameter changes.
*/ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 371eef3ddd..63eed24c5a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -83,6 +83,7 @@ #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "tcop/dest.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -253,6 +254,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); +static void ProcessStandbyPSRequestMessage(void); static void ProcessRepliesIfAny(void); static void ProcessPendingWrites(void); static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr); @@ -2314,6 +2316,10 @@ ProcessStandbyMessage(void) ProcessStandbyHSFeedbackMessage(); break; + case 'S': + ProcessStandbyPSRequestMessage(); + break; + default: ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -2660,6 +2666,57 @@ ProcessStandbyHSFeedbackMessage(void) } } +/* + * Process the request for a primary status update message. + */ +static void +ProcessStandbyPSRequestMessage(void) +{ + XLogRecPtr lsn = InvalidXLogRecPtr; + TransactionId oldestRunningXid = InvalidTransactionId; + FullTransactionId nextFullXid = InvalidFullTransactionId; + WalSnd *walsnd = MyWalSnd; + TimestampTz replyTime; + + replyTime = pq_getmsgint64(&reply_message); + + /* + * Update shared state for this WalSender process based on request data from + * the client. + */ + SpinLockAcquire(&walsnd->mutex); + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + + /* + * Information about running transactions and the WAL write position is + * only available on a non-standby server. + * + * XXX: We could consider forwarding the request to the primary server if + * the current server is a standby. 
+ */ + if (!RecoveryInProgress()) + { + oldestRunningXid = GetOldestActiveTransactionId(); + nextFullXid = ReadNextFullTransactionId(); + lsn = GetXLogWriteRecPtr(); + } + + elog(DEBUG2, "sending primary status"); + + /* construct the message... */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, 's'); + pq_sendint64(&output_message, lsn); + pq_sendint32(&output_message, oldestRunningXid); + pq_sendint32(&output_message, XidFromFullTransactionId(nextFullXid)); + pq_sendint32(&output_message, EpochFromFullTransactionId(nextFullXid)); + pq_sendint64(&output_message, GetCurrentTimestamp()); + + /* ... and send it wrapped in CopyData */ + pq_putmessage_noblock('d', output_message.data, output_message.len); +} + /* * Compute how long send/receive loops should sleep. * diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 9646261d7e..370c71c93e 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -86,6 +86,24 @@ typedef struct LogicalRepWorker /* Indicates whether apply can be performed in parallel. */ bool parallel_apply; + /* + * The changes made by this and later transactions are still non-removable + * to allow for the detection of update_deleted conflicts when applying + * changes in this logical replication worker. + * + * Note that this info cannot directly protect dead tuples from being + * prematurely frozen or removed. The logical replication launcher + * asynchronously collects this info to determine whether to advance the + * xmin value of the replication slot. + * + * Therefore, FullTransactionId that includes both the transaction ID and + * its epoch is used here instead of a single Transaction ID. This is + * critical because without considering the epoch, the transaction ID + * alone may appear as if it is in the future due to transaction ID + * wraparound. + */ + FullTransactionId oldest_nonremovable_xid; + /* Stats. 
*/ XLogRecPtr last_lsn; TimestampTz last_send_time; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 1847bbfa95..d24cc956f0 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -591,6 +591,7 @@ DbInfoArr DbLocaleInfo DeClonePtrType DeadLockState +DeadTupleRetainPhase DeallocateStmt DeclareCursorStmt DecodedBkpBlock -- 2.30.0.windows.2