diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d5772a6b22..090aae126b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -391,8 +391,9 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; static BufFile *stream_fd = NULL; /* - * The remote WAL position that has been applied and flushed locally. Refer to - * send_feedback() for details on its usage. + * The remote WAL position that has been applied and flushed locally. We + * record this information while sending feedback to the server and use it + * both when sending feedback and when advancing oldest_nonremovable_xid. */ static XLogRecPtr last_flushpos = InvalidXLogRecPtr; @@ -3849,7 +3850,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) wait_time = NAPTIME_PER_CYCLE; /* - * Ensure to wake up when it's possible to attempt advancing the + * Ensure we wake up when it's possible to attempt to advance the * non-removable transaction ID. */ if (data.phase == RCI_GET_CANDIDATE_XID && data.xid_advancement_interval) @@ -4066,7 +4067,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) * * The overall state progression is: GET_CANDIDATE_XID -> * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to - * REQUEST_PUBLISHER_STATUS if concurrent remote transactions persist) -> + * REQUEST_PUBLISHER_STATUS until concurrent remote transactions end) -> * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID. * * Retaining the dead tuples for this period is sufficient for ensuring @@ -4184,11 +4185,14 @@ get_candidate_xid(RetainConflictInfoData *data) /* * Adjust the interval for advancing non-removable transaction IDs. * - * If no new transaction ID has been assigned since the last advancement, the - * interval is doubled. This increase is limited by the - * wal_receiver_status_interval if it is not zero, or otherwise restricted to a - * maximum of 3 minutes.
If a new transaction ID is detected, the interval is - * reset to a minimum of 100ms. + * We double the interval to try advancing the non-removable transaction IDs + * if there is no activity on the node. The maximum value of the interval is + * capped by wal_receiver_status_interval if it is not zero, otherwise by 3 + * minutes, which should be sufficient to avoid using CPU or network + * resources without much benefit. + * + * The interval is reset to a minimum value of 100ms once there is some + * activity on the node. */ static void adjust_xid_advancement_interval(RetainConflictInfoData *data, bool new_xid_found) @@ -4200,8 +4204,8 @@ adjust_xid_advancement_interval(RetainConflictInfoData *data, bool new_xid_found : MAX_XID_ADVANCEMENT_INTERVAL; /* - * No new transaction ID assigned since the last check, so double the - * interval, but not beyond the maximum allowable value. + * No new transaction ID has been assigned since the last check, so + * double the interval, but not beyond the maximum allowable value. */ data->xid_advancement_interval = Min(data->xid_advancement_interval * 2, max_interval); @@ -4331,11 +4335,8 @@ wait_for_local_flush(RetainConflictInfoData *data) * effort. * * It is safe to add new tables with initial states to the subscription - * after this check because WAL positions of changes from these new - * tables, which will be applied, should be greater than remote_lsn and - * are included in transactions with later commit timestamps. So, there is - * no need to wait for these changes to be applied in this round of - * advancement. + * after this check because any changes applied to these tables should have + * a WAL position greater than data->remote_lsn. */ if (!AllTablesyncsReady()) return; @@ -4354,8 +4355,7 @@ wait_for_local_flush(RetainConflictInfoData *data) /* * Reaching here means the remote WAL position has been received, and all * transactions up to that position on the publisher have been applied and - * flushed locally.
So, now we can advance the non-removable transaction - * ID. + * flushed locally. So, we can advance the non-removable transaction ID. */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->oldest_nonremovable_xid = data->candidate_xid; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 1eab8a5e46..22cdd0a591 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -87,20 +87,18 @@ typedef struct LogicalRepWorker bool parallel_apply; /* - * The changes made by this and later transactions are still non-removable - * to allow for the detection of update_deleted conflicts when applying + * The changes made by this and later transactions shouldn't be removed. + * This allows the detection of update_deleted conflicts when applying * changes in this logical replication worker. * * Note that this info cannot directly protect dead tuples from being * prematurely frozen or removed. The logical replication launcher * asynchronously collects this info to determine whether to advance the - * xmin value of the replication slot. + * xmin value of its replication slot. * - * Therefore, FullTransactionId that includes both the transaction ID and - * its epoch is used here instead of a single Transaction ID. This is - * critical because without considering the epoch, the transaction ID - * alone may appear as if it is in the future due to transaction ID - * wraparound. + * We need to use FullTransactionId here because without considering the + * epoch, the transaction ID alone may appear as if it is in the future due + * to the transaction ID wraparound. */ FullTransactionId oldest_nonremovable_xid;