diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 06ba6d3a64..e89e811c51 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4135,6 +4135,7 @@ get_candidate_xid(RetainConflictInfoData *data) data->last_phase_at = InvalidFullTransactionId; data->phase = RCI_REQUEST_PUBLISHER_STATUS; + /* process the next phase */ maybe_advance_nonremovable_xid(data); } @@ -4156,6 +4157,10 @@ request_publisher_status(RetainConflictInfoData *data) else resetStringInfo(request_message); + /* + * We send the current time to update the remote walsender's latest reply + * message received time. + */ pq_sendbyte(request_message, 'S'); pq_sendint64(request_message, GetCurrentTimestamp()); @@ -4213,6 +4218,7 @@ wait_for_publisher_status(RetainConflictInfoData *data) else data->phase = RCI_REQUEST_PUBLISHER_STATUS; + /* process the next phase */ maybe_advance_nonremovable_xid(data); } @@ -4226,8 +4232,10 @@ wait_for_local_flush(RetainConflictInfoData *data) FullTransactionIdIsValid(data->candidate_xid)); /* - * Issue a warning if there is a detected clock skew between the publisher - * and subscriber. + * We expect the publisher and subscriber clocks to be in sync using + * time sync service like NTP. Otherwise, we will advance this worker's + * oldest_nonremovable_xid prematurely, leading to the removal of rows + * required to detect update_delete conflict. * * XXX Consider waiting for the publisher's clock to catch up with the * subscriber's before proceeding to the next phase. @@ -4235,7 +4243,7 @@ wait_for_local_flush(RetainConflictInfoData *data) if (TimestampDifferenceExceeds(data->reply_time, data->candidate_xid_time, 0)) ereport(WARNING, - errmsg("non-removable transaction ID may be advanced prematurely"), + errmsg("oldest_nonremovable_xid transaction ID may be advanced prematurely"), errdetail("The clock on the publisher is behind that of the subscriber.")); /* @@ -4276,6 +4284,7 @@ wait_for_local_flush(RetainConflictInfoData *data) data->phase = RCI_GET_CANDIDATE_XID; + /* process the next phase */ maybe_advance_nonremovable_xid(data); }