From 23bb871f617e7085b5ea869e2a83dcb6d38c91aa Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 17 Mar 2025 11:25:49 +0900 Subject: [PATCH v20_REL_13 2/2] Backpatch introducing invalidation messages in ReorderBufferChangeType --- .../replication/logical/reorderbuffer.c | 72 +++++++++++++++++-- src/include/replication/reorderbuffer.h | 16 ++++- 2 files changed, 80 insertions(+), 8 deletions(-) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index fa9413fa2a0..697b45675a6 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -220,7 +220,7 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state); -static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs); /* * --------------------------------------- @@ -484,6 +484,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + if (change->data.inval.invalidations) + pfree(change->data.inval.invalidations); + change->data.inval.invalidations = NULL; + break; } pfree(change); @@ -1883,7 +1888,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, * see new catalog contents, so execute all * invalidations. */ - ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferExecuteInvalidations(txn->ninvalidations, + txn->invalidations); } break; @@ -1891,6 +1897,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + /* Execute the invalidation messages locally */ + ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations, + change->data.inval.invalidations); } } @@ -1921,7 +1931,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -1947,7 +1957,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, AbortCurrentTransaction(); /* make sure there's no cache pollution */ - ReorderBufferExecuteInvalidations(rb, txn); + ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); if (using_subtxn) RollbackAndReleaseCurrentSubTransaction(); @@ -2265,6 +2275,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, { ReorderBufferTXN *txn; MemoryContext oldcontext; + ReorderBufferChange *change; txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); @@ -2302,6 +2313,16 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, txn->ninvalidations += nmsgs; } + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_INVALIDATION; + change->data.inval.ninvalidations = nmsgs; + change->data.inval.invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(change->data.inval.invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + + ReorderBufferQueueChange(rb, xid, lsn, change); + MemoryContextSwitchTo(oldcontext); } @@ -2310,12 +2331,12 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, * in the changestream but we don't know which those are. */ static void -ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn) +ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs) { int i; - for (i = 0; i < txn->ninvalidations; i++) - LocalExecuteInvalidationMessage(&txn->invalidations[i]); + for (i = 0; i < nmsgs; i++) + LocalExecuteInvalidationMessage(&msgs[i]); } /* @@ -2725,6 +2746,24 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + char *data; + Size inval_size = sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + + sz += inval_size; + + ReorderBufferSerializeReserve(rb, sz); + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + memcpy(data, change->data.inval.invalidations, inval_size); + data += inval_size; + + break; + } } ondisk->size = sz; @@ -2833,6 +2872,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + sz += sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + break; + } } return sz; @@ -3120,6 +3165,19 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + Size inval_size = sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + + change->data.inval.invalidations = + MemoryContextAlloc(rb->context, inval_size); + + /* read the message */ + memcpy(change->data.inval.invalidations, data, inval_size); + + break; + } } dlist_push_tail(&txn->changes, &change->node); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 545cee891ed..dff58a2fd8f 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -63,7 +63,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_TRUNCATE, - REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT + REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, + REORDER_BUFFER_CHANGE_INVALIDATION }; /* forward declaration */ @@ -150,6 +151,13 @@ typedef struct ReorderBufferChange CommandId cmax; CommandId combocid; } tuplecid; + + /* Invalidation. */ + struct + { + uint32 ninvalidations; /* Number of messages */ + SharedInvalidationMessage *invalidations; /* invalidation message */ + } inval; } data; /* @@ -467,6 +475,12 @@ uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs); +void ReorderBufferAddInvalidationsForDistribute(ReorderBuffer *, + TransactionId, + XLogRecPtr lsn, + Size nmsgs, + SharedInvalidationMessage *msgs); + void StartupReorderBuffer(void); #endif -- 2.43.5