diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 97f4d9fcba..06ba6d3a64 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3736,9 +3736,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* * Attempt to advance the non-removable transaction ID - * during change application to prevent it from - * remaining unchanged for long periods when the worker - * is busy. + * to avoid accumulating dead rows when the worker is + * busy. */ if (can_advance_nonremovable_xid(&data, last_recv_timestamp)) maybe_advance_nonremovable_xid(&data); @@ -3770,6 +3769,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received) data.remote_epoch = pq_getmsgint(&s, 4); data.reply_time = pq_getmsgint64(&s); + /* + * This should never happen, see + * ProcessStandbyPSRequestMessage. But if it happens + * due to a bug, we don't want to proceed as it can + * incorrectly advance oldest_nonremovable_xid. + */ if (XLogRecPtrIsInvalid(data.remote_lsn)) elog(ERROR, "cannot get the latest WAL position from the publisher"); @@ -4164,8 +4169,8 @@ request_publisher_status(RetainConflictInfoData *data) data->phase = RCI_WAIT_FOR_PUBLISHER_STATUS; /* - * Skip calling maybe_advance_nonremovable_xid() since further actions - * cannot proceed until the publisher status is received. + * Skip calling maybe_advance_nonremovable_xid() since further transition + * is possible only once we receive the publisher status message. */ } @@ -4244,7 +4249,14 @@ wait_for_local_flush(RetainConflictInfoData *data) if (!AllTablesyncsReady()) return; - /* Return to wait for the changes to be applied */ + /* + * Return to wait for the changes to be applied. + * + * XXX The remote flush location (last_flushpos) is updated only when + * feedback is sent to the server. So, the advancement of + * oldest_nonremovable_xid may be delayed. We can always update + * last_flushpos here if we notice such a delay. + */ if (last_flushpos < data->remote_lsn) return; @@ -4268,7 +4280,7 @@ wait_for_local_flush(RetainConflictInfoData *data) } /* - * Determine if the next round of transaction ID advancement can be attempted. + * Determine if we can attempt to advance transaction ID. * * TODO: The remote flush location (last_flushpos) is currently not updated * during change application, making it impossible to satisfy the condition of