From cf93ea5272c3ea225af1a52b979a21a1bcb00c95 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 20 Mar 2025 19:20:17 +0800 Subject: [PATCH 3/3] add a struct --- src/backend/executor/execReplication.c | 33 +++----- src/backend/replication/logical/conflict.c | 17 ++--- src/backend/replication/logical/worker.c | 87 ++++++++-------------- src/include/replication/conflict.h | 17 ++++- 4 files changed, 59 insertions(+), 95 deletions(-) diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index a2a0ab90ab0..bf352f69e2f 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -493,12 +493,7 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot) { - int conflicts = 0; - List *conflictslots = NIL; - List *conflictindexes = NIL; - List *localxmins = NIL; - List *localorigins = NIL; - List *localts = NIL; + List *conflicttuples = NIL; TupleTableSlot *conflictslot; /* Check all the unique indexes for conflicts */ @@ -508,32 +503,22 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot, &conflictslot)) { - RepOriginId origin; - TimestampTz committs; - TransactionId xmin; + ConflictTupleInfo *conflicttuple = palloc0_object(ConflictTupleInfo); - GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs); + conflicttuple->slot = conflictslot; - /* - * Add the conflict slot, index, and the transaction info to the - * respective lists. - */ - conflictslots = lappend(conflictslots, conflictslot); - conflictindexes = lappend_oid(conflictindexes, uniqueidx); - localxmins = lappend_xid(localxmins, xmin); - localorigins = lappend_int(localorigins, origin); - localts = lappend(localts, DatumGetPointer(Int64GetDatum(committs))); + GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin, + &conflicttuple->origin, &conflicttuple->ts); - conflicts++; + conflicttuples = lappend(conflicttuples, conflicttuple); } } /* Report the conflict if found */ - if (conflicts) + if (conflicttuples) ReportApplyConflict(estate, resultRelInfo, ERROR, - conflicts > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type, - searchslot, conflictslots, remoteslot, - conflictindexes, localxmins, localorigins, localts); + list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type, + searchslot, remoteslot, conflicttuples); } /* diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 9beded483d4..10f07e18a4b 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -107,9 +107,7 @@ GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, - List *conflictslots, TupleTableSlot *remoteslot, - List *conflictindexes, List *localxmins, - List *localorigins, List *localts) + TupleTableSlot *remoteslot, List *conflicttuples) { Relation localrel = relinfo->ri_RelationDesc; StringInfoData err_detail; @@ -125,15 +123,14 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, * Iterate over conflicting tuples, along with their commit timestamps, * origins, and the conflicting indexes to assemble an errdetail() line. */ - forfive(lslot, conflictslots, lindex, conflictindexes, lxmin, localxmins, - lorigin, localorigins, lts, localts) + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) { errdetail_apply_conflict(estate, relinfo, type, searchslot, - lfirst(lslot), remoteslot, - lfirst_oid(lindex), - lfirst_xid(lxmin), - lfirst_int(lorigin), - (TimestampTz) lfirst(lts), + conflicttuple->slot, remoteslot, + conflicttuple->indexoid, + conflicttuple->xmin, + conflicttuple->origin, + conflicttuple->ts, &err_detail); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c19c4f938db..058c353f992 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2675,10 +2675,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, Relation localrel = relinfo->ri_RelationDesc; EPQState epqstate; TupleTableSlot *localslot = NULL; - Oid conflictindex = InvalidOid; - RepOriginId localorigin = InvalidRepOriginId; - TransactionId localxmin = InvalidTransactionId; - TimestampTz localts = 0; + ConflictTupleInfo conflicttuple = {0}; bool found; MemoryContext oldctx; @@ -2701,8 +2698,9 @@ apply_handle_update_internal(ApplyExecutionData *edata, * Report the conflict if the tuple was modified by a different * origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -2710,12 +2708,11 @@ apply_handle_update_internal(ApplyExecutionData *edata, newslot = table_slot_create(localrel, &estate->es_tupleTable); slot_store_data(newslot, relmapentry, newtup); + conflicttuple.slot = localslot; + ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, - remoteslot, list_make1(localslot), newslot, - list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + remoteslot, newslot, + list_make1(&conflicttuple)); } /* Process and store remote tuple in the slot */ @@ -2736,8 +2733,6 @@ apply_handle_update_internal(ApplyExecutionData *edata, { TupleTableSlot *newslot = localslot; - localslot = NULL; - /* Store the new tuple for conflict reporting */ slot_store_data(newslot, relmapentry, newtup); @@ -2746,11 +2741,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING, - remoteslot, list_make1(localslot), newslot, - list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + remoteslot, newslot, list_make1(&conflicttuple)); } /* Cleanup. */ @@ -2868,10 +2859,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, LogicalRepRelation *remoterel = &edata->targetRel->remoterel; EPQState epqstate; TupleTableSlot *localslot; - Oid conflictindex = InvalidOid; - RepOriginId localorigin = InvalidRepOriginId; - TransactionId localxmin = InvalidTransactionId; - TimestampTz localts = 0; + ConflictTupleInfo conflicttuple = {0}; bool found; EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); @@ -2887,22 +2875,19 @@ apply_handle_delete_internal(ApplyExecutionData *edata, /* If found delete it. */ if (found) { - RepOriginId localorigin; - TransactionId localxmin; - TimestampTz localts; - /* * Report the conflict if the tuple was modified by a different * origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) + { + conflicttuple.slot = localslot; ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS, - remoteslot, list_make1(localslot), NULL, - list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + remoteslot, NULL, + list_make1(&conflicttuple)); + } EvalPlanQualSetSlot(&epqstate, localslot); @@ -2912,18 +2897,12 @@ apply_handle_delete_internal(ApplyExecutionData *edata, } else { - localslot = NULL; - /* * The tuple to be deleted could not be found. Do nothing except for * emitting a log message. */ ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_MISSING, - remoteslot, list_make1(localslot), NULL, - list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + remoteslot, NULL, list_make1(&conflicttuple)); } /* Cleanup. */ @@ -3091,10 +3070,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, Relation partrel_new; bool found; EPQState epqstate; - RepOriginId localorigin = InvalidRepOriginId; - TransactionId localxmin = InvalidTransactionId; - TimestampTz localts = 0; - Oid conflictindex = InvalidOid; + ConflictTupleInfo conflicttuple = {0}; /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(edata, partrel, @@ -3105,8 +3081,6 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, { TupleTableSlot *newslot = localslot; - localslot = NULL; - /* Store the new tuple for conflict reporting */ slot_store_data(newslot, part_entry, newtup); @@ -3116,11 +3090,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, */ ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_MISSING, remoteslot_part, - list_make1(localslot), newslot, - list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + newslot, list_make1(&conflicttuple)); return; } @@ -3129,8 +3099,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * Report the conflict if the tuple was modified by a * different origin. */ - if (GetTupleTransactionInfo(localslot, &localxmin, &localorigin, &localts) && - localorigin != replorigin_session_origin) + if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, + &conflicttuple.origin, + &conflicttuple.ts) && + conflicttuple.origin != replorigin_session_origin) { TupleTableSlot *newslot; @@ -3138,12 +3110,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, newslot = table_slot_create(partrel, &estate->es_tupleTable); slot_store_data(newslot, part_entry, newtup); + conflicttuple.slot = localslot; + ReportApplyConflict(estate, partrelinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS, - remoteslot_part, list_make1(localslot), - newslot, list_make1_oid(conflictindex), - list_make1_xid(localxmin), - list_make1_int(localorigin), - list_make1(DatumGetPointer(Int64GetDatum(localts)))); + remoteslot_part, newslot, + list_make1(&conflicttuple)); } /* diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index f32c3266719..976f1708095 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -53,6 +53,19 @@ typedef enum #define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1) + +/* + * Information for the exiting local tuple that caused the conflict. + */ +typedef struct ConflictTupleInfo +{ + TupleTableSlot *slot; + Oid indexoid; /* conflicting index */ + TransactionId xmin; /* transaction ID that modified the existing local tuple */ + RepOriginId origin; /* which origin modified the exiting local tuple */ + TimestampTz ts; /* when the exiting local tuple was modified by the origin */ +} ConflictTupleInfo; + extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, @@ -60,9 +73,7 @@ extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, - List *conflictslots, TupleTableSlot *remoteslot, - List *conflictindexes, List *localxmins, - List *localorigins, List *localts); + List *conflicttuples); extern void InitConflictIndexes(ResultRelInfo *relInfo); #endif -- 2.30.0.windows.2