From 1b809742f1de7a87e159ac8b4f120e66ed7172ce Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 20 Mar 2025 18:55:11 +0800 Subject: [PATCH 2/3] refactor code --- src/backend/replication/logical/conflict.c | 71 +++++++++------------- src/backend/replication/logical/worker.c | 55 ++++++++++++----- 2 files changed, 68 insertions(+), 58 deletions(-) diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 84ef4648747..9beded483d4 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -111,59 +111,36 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, List *conflictindexes, List *localxmins, List *localorigins, List *localts) { - int conflictNum = 0; Relation localrel = relinfo->ri_RelationDesc; - ListCell *slotlc = list_head(conflictSlots); StringInfoData err_detail; + ListCell *lslot; + ListCell *lindex; + ListCell *lxmin; + ListCell *lorigin; + ListCell *lts; initStringInfo(&err_detail); - do + /* + * Iterate over conflicting tuples, along with their commit timestamps, + * origins, and the conflicting indexes to assemble an errdetail() line. + */ + forfive(lslot, conflictslots, lindex, conflictindexes, lxmin, localxmins, + lorigin, localorigins, lts, localts) { - Oid indexoid = InvalidOid; - TimestampTz committs = 0; - RepOriginId origin = InvalidRepOriginId; - TransactionId xmin = InvalidTransactionId; - TupleTableSlot *slot = NULL; - - if (slotlc) - { - Assert(localxmins && localorigins && localts); - - slot = lfirst(slotlc); - origin = list_nth_int(localorigins, conflictNum); - xmin = lfirst_xid(list_nth_cell(localxmins, conflictNum)); - committs = (TimestampTz) lfirst(list_nth_cell(localts, conflictNum)); - - slotlc = lnext(conflictSlots, slotlc); - } - - if (conflictIndexes) - indexoid = list_nth_oid(conflictIndexes, conflictNum); - - Assert(!OidIsValid(indexoid) || - CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); - - /* - * Build the error detail message containing the conflicting key and - * tuple information. The details for each conflict will be appended - * to err_detail. - */ errdetail_apply_conflict(estate, relinfo, type, searchslot, - slot, remoteslot, indexoid, - xmin, origin, committs, &err_detail); - - conflictNum++; - - } while (slotlc); + lfirst(lslot), remoteslot, + lfirst_oid(lindex), + lfirst_xid(lxmin), + lfirst_int(lorigin), + (TimestampTz) lfirst(lts), + &err_detail); + } /* Conflict stats are not gathered for multiple_unique_conflicts */ if (type != CT_MULTIPLE_UNIQUE_CONFLICTS) pgstat_report_subscription_conflict(MySubscription->oid, type); - /* Remove the extra newline at the end of err_detail */ - err_detail.data[err_detail.len - 1] = '\0'; - ereport(elevel, errcode_apply_conflict(type), errmsg("conflict detected on relation \"%s.%s\": conflict=%s", @@ -257,7 +234,8 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, case CT_INSERT_EXISTS: case CT_UPDATE_EXISTS: case CT_MULTIPLE_UNIQUE_CONFLICTS: - Assert(OidIsValid(indexoid)); + Assert(OidIsValid(indexoid) && + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); if (localts) { @@ -339,7 +317,14 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, if (val_desc) appendStringInfo(&err_detail, "\n%s", val_desc); - appendStringInfo(err_msg, "%s\n", err_detail.data); + /* + * Insert a blank line to visually separate the new detail line from the + * existing ones. + */ + if (err_msg->len > 0) + appendStringInfoChar(err_msg, '\n'); + + appendStringInfo(err_msg, "%s", err_detail.data); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4a595e7906f..c19c4f938db 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2674,7 +2674,11 @@ apply_handle_update_internal(ApplyExecutionData *edata, LogicalRepRelMapEntry *relmapentry = edata->targetRel; Relation localrel = relinfo->ri_RelationDesc; EPQState epqstate; - TupleTableSlot *localslot; + TupleTableSlot *localslot = NULL; + Oid conflictindex = InvalidOid; + RepOriginId localorigin = InvalidRepOriginId; + TransactionId localxmin = InvalidTransactionId; + TimestampTz localts = 0; bool found; MemoryContext oldctx; @@ -2693,10 +2697,6 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (found) { - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; - /* * Report the conflict if the tuple was modified by a different * origin. @@ -2712,7 +2712,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, remoteslot, list_make1(localslot), newslot, - NULL, list_make1_xid(localxmin), + list_make1_oid(conflictindex), + list_make1_xid(localxmin), list_make1_int(localorigin), list_make1(DatumGetPointer(Int64GetDatum(localts)))); } @@ -2735,6 +2736,8 @@ apply_handle_update_internal(ApplyExecutionData *edata, { TupleTableSlot *newslot = localslot; + localslot = NULL; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, relmapentry, newtup); @@ -2743,8 +2746,11 @@ apply_handle_update_internal(ApplyExecutionData *edata, * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, - remoteslot, NULL, newslot, NULL, - NULL, NULL, NULL); + remoteslot, list_make1(localslot), newslot, + list_make1_oid(conflictindex), + list_make1_xid(localxmin), + list_make1_int(localorigin), + list_make1(DatumGetPointer(Int64GetDatum(localts)))); } /* Cleanup. */ @@ -2862,6 +2868,10 @@ apply_handle_delete_internal(ApplyExecutionData *edata, LogicalRepRelation *remoterel = &edata->targetRel->remoterel; EPQState epqstate; TupleTableSlot *localslot; + Oid conflictindex = InvalidOid; + RepOriginId localorigin = InvalidRepOriginId; + TransactionId localxmin = InvalidTransactionId; + TimestampTz localts = 0; bool found; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); @@ -2889,7 +2899,8 @@ apply_handle_delete_internal(ApplyExecutionData *edata, localorigin != replorigin_session_origin) ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS, remoteslot, list_make1(localslot), NULL, - NULL, list_make1_xid(localxmin), + list_make1_oid(conflictindex), + list_make1_xid(localxmin), list_make1_int(localorigin), list_make1(DatumGetPointer(Int64GetDatum(localts)))); @@ -2901,12 +2912,18 @@ apply_handle_delete_internal(ApplyExecutionData *edata, } else { + localslot = NULL; + /* * The tuple to be deleted could not be found. Do nothing except for * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING, - remoteslot, NULL, NULL, NULL, NULL, NULL, NULL); + remoteslot, list_make1(localslot), NULL, + list_make1_oid(conflictindex), + list_make1_xid(localxmin), + list_make1_int(localorigin), + list_make1(DatumGetPointer(Int64GetDatum(localts)))); } /* Cleanup. */ @@ -3074,9 +3091,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, Relation partrel_new; bool found; EPQState epqstate; - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; + RepOriginId localorigin = InvalidRepOriginId; + TransactionId localxmin = InvalidTransactionId; + TimestampTz localts = 0; + Oid conflictindex = InvalidOid; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3087,6 +3105,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, { TupleTableSlot *newslot = localslot; + localslot = NULL; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, part_entry, newtup); @@ -3096,7 +3116,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, */ ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_MISSING, remoteslot_part, - NULL, newslot, NULL, NULL, NULL, NULL); + list_make1(localslot), newslot, + list_make1_oid(conflictindex), + list_make1_xid(localxmin), + list_make1_int(localorigin), + list_make1(DatumGetPointer(Int64GetDatum(localts)))); return; } @@ -3116,7 +3140,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, remoteslot_part, list_make1(localslot), - newslot, NULL, list_make1_xid(localxmin), + newslot, list_make1_oid(conflictindex), + list_make1_xid(localxmin), list_make1_int(localorigin), list_make1(DatumGetPointer(Int64GetDatum(localts)))); } -- 2.30.0.windows.2