From 39d899a8ce9a38bca6439ac36158e52bbdc088ab Mon Sep 17 00:00:00 2001 From: Antonin Houska Date: Fri, 11 Apr 2025 11:13:14 +0200 Subject: [PATCH 4/7] Add CONCURRENTLY option to REPACK command. The REPACK command copies the relation data into a new file, creates new indexes and eventually swaps the files. To make sure that the old file does not change during the copying, the relation is locked in an exclusive mode, which prevents applications from both reading and writing. (To keep the data consistent, we'd only need to prevent the applications from writing, but even reading needs to be blocked before we can swap the files - otherwise some applications could continue using the old file. Since we should not request a stronger lock without releasing the weaker one first, we acquire the exclusive lock in the beginning and keep it till the end of the processing.) This patch introduces an alternative workflow, which only requires the exclusive lock when the relation (and index) files are being swapped. (Supposedly, the swapping should be pretty fast.) On the other hand, when we copy the data to the new file, we allow applications to read from the relation and even to write to it. First, we scan the relation using a "historic snapshot", and insert all the tuples satisfying this snapshot into the new file. Second, logical decoding is used to capture the data changes done by applications during the copying (i.e. changes that do not satisfy the historic snapshot mentioned above), and those are applied to the new file before we acquire the exclusive lock that we need to swap the files. (Of course, more data changes can take place while we are waiting for the lock - these will be applied to the new file after we have acquired the lock, before we swap the files.) Since the logical decoding system, during its startup, waits until all the transactions which already have XID assigned have finished, there is a risk of deadlock if a transaction that already changed anything in the database tries to acquire a conflicting lock on the table REPACK CONCURRENTLY is working on. As an example, consider transaction running CREATE INDEX command on the table that is being REPACKed CONCURRENTLY. On the other hand, DML commands (INSERT, UPDATE, DELETE) are not a problem as their lock does not conflict with REPACK CONCURRENTLY. The current approach is that we accept the risk. If we tried to avoid it, it'd be necessary to unlock the table before the logical decoding is setup and lock it again afterwards. Such temporary unlocking would imply re-checking if the table still meets all the requirements for REPACK CONCURRENTLY. Like the existing implementation of REPACK, the variant with the CONCURRENTLY option also requires an extra space for the new relation and index files (which coexist with the old files for some time). In addition, the CONCURRENTLY option might introduce a lag in releasing WAL segments for archiving / recycling. This is due to the decoding of the data changes done by applications concurrently. When copying the table contents into the new file, we check the lag periodically. If it exceeds the size of a WAL segment, we decode all the available WAL before resuming the copying. (Of course, the changes are not applied until the whole table contents is copied.) A background worker might be a better approach for the decoding - let's consider implementing it in the future. The WAL records produced by running DML commands on the new relation do not contain enough information to be processed by the logical decoding system. All we need from the new relation is the file (relfilenode), while the actual relation is eventually dropped. Thus there is no point in replaying the DMLs anywhere. --- doc/src/sgml/monitoring.sgml | 37 +- doc/src/sgml/mvcc.sgml | 12 +- doc/src/sgml/ref/repack.sgml | 134 +- src/Makefile | 1 + src/backend/access/heap/heapam.c | 34 +- src/backend/access/heap/heapam_handler.c | 215 +- src/backend/access/heap/rewriteheap.c | 6 +- src/backend/access/transam/xact.c | 11 +- src/backend/catalog/index.c | 43 +- src/backend/catalog/system_views.sql | 30 +- src/backend/commands/cluster.c | 1895 +++++++++++++++-- src/backend/commands/matview.c | 2 +- src/backend/commands/tablecmds.c | 1 + src/backend/commands/vacuum.c | 12 +- src/backend/meson.build | 1 + src/backend/parser/gram.y | 15 +- src/backend/replication/logical/decode.c | 83 + src/backend/replication/logical/snapbuild.c | 20 + .../replication/pgoutput_repack/Makefile | 32 + .../replication/pgoutput_repack/meson.build | 18 + .../pgoutput_repack/pgoutput_repack.c | 288 +++ src/backend/storage/ipc/ipci.c | 1 + .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/cache/relcache.c | 1 + src/backend/utils/time/snapmgr.c | 3 +- src/bin/psql/tab-complete.in.c | 25 +- src/include/access/heapam.h | 9 +- src/include/access/heapam_xlog.h | 2 + src/include/access/tableam.h | 10 + src/include/catalog/index.h | 3 + src/include/commands/cluster.h | 87 +- src/include/commands/progress.h | 23 +- src/include/nodes/parsenodes.h | 1 + src/include/replication/snapbuild.h | 1 + src/include/storage/lockdefs.h | 4 +- src/include/storage/lwlocklist.h | 1 + src/include/utils/snapmgr.h | 2 + src/test/regress/expected/rules.out | 29 +- src/tools/pgindent/typedefs.list | 4 + 39 files changed, 2767 insertions(+), 330 deletions(-) create mode 100644 src/backend/replication/pgoutput_repack/Makefile create mode 100644 src/backend/replication/pgoutput_repack/meson.build create mode 100644 src/backend/replication/pgoutput_repack/pgoutput_repack.c diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 9f1432c1ae6..8a9d16dcc5b 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -6051,14 +6051,35 @@ FROM pg_stat_get_backend_idset() AS backendid; - heap_tuples_written bigint + heap_tuples_inserted bigint - Number of heap tuples written. + Number of heap tuples inserted. This counter only advances when the phase is seq scanning heap, - index scanning heap - or writing new heap. + index scanning heap, + writing new heap + or catch-up. + + + + + + heap_tuples_updated bigint + + + Number of heap tuples updated. + This counter only advances when the phase is catch-up. + + + + + + heap_tuples_deleted bigint + + + Number of heap tuples deleted. + This counter only advances when the phase is catch-up. @@ -6139,6 +6160,14 @@ FROM pg_stat_get_backend_idset() AS backendid; REPACK is currently writing the new heap. + + catch-up + + REPACK CONCURRENTLY is currently processing the DML + commands that other transactions executed during any of the preceding + phase. + + swapping relation files diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml index 049ee75a4ba..0f5c34af542 100644 --- a/doc/src/sgml/mvcc.sgml +++ b/doc/src/sgml/mvcc.sgml @@ -1833,15 +1833,17 @@ SELECT pg_advisory_lock(q.id) FROM Caveats - Some DDL commands, currently only TRUNCATE and the - table-rewriting forms of ALTER TABLE, are not + Some commands, currently only TRUNCATE, the + table-rewriting forms of ALTER + TABLE and REPACK with + the CONCURRENTLY option, are not MVCC-safe. This means that after the truncation or rewrite commits, the table will appear empty to concurrent transactions, if they are using a - snapshot taken before the DDL command committed. This will only be an + snapshot taken before the command committed. This will only be an issue for a transaction that did not access the table in question - before the DDL command started — any transaction that has done so + before the command started — any transaction that has done so would hold at least an ACCESS SHARE table lock, - which would block the DDL command until that transaction completes. + which would block the truncating or rewriting command until that transaction completes. So these commands will not cause any apparent inconsistency in the table contents for successive queries on the target table, but they could cause visible inconsistency between the contents of the target diff --git a/doc/src/sgml/ref/repack.sgml b/doc/src/sgml/ref/repack.sgml index c74a5023a54..c837e4614f3 100644 --- a/doc/src/sgml/ref/repack.sgml +++ b/doc/src/sgml/ref/repack.sgml @@ -22,8 +22,10 @@ PostgreSQL documentation REPACK [ ( option [, ...] ) ] -[ table_name [ USING INDEX -index_name ] ] +[ table_name [ USING INDEX index_name ] ] +REPACK [ ( option [, ...] ) ] +CONCURRENTLY table_name [ USING +INDEX index_name ] where option can be one of: @@ -50,7 +52,8 @@ REPACK [ ( option [, ...] ) ] processes every table and materialized view in the current database that the current user has the MAINTAIN privilege on. This form of REPACK cannot be executed inside a transaction - block. + block. Also, this form is not allowed if + the CONCURRENTLY option is used. @@ -63,7 +66,8 @@ REPACK [ ( option [, ...] ) ] When a table is being repacked, an ACCESS EXCLUSIVE lock is acquired on it. This prevents any other database operations (both reads and writes) from operating on the table until the REPACK - is finished. + is finished. If you want to keep the table accessible during the repacking, + consider using the CONCURRENTLY option. @@ -162,6 +166,128 @@ REPACK [ ( option [, ...] ) ] + + CONCURRENTLY + + + Allow other transactions to use the table while it is being repacked. + + + + Internally, REPACK copies the contents of the table + (ignoring dead tuples) into a new file, sorted by the specified index, + and also creates a new file for each index. Then it swaps the old and + new files for the table and all the indexes, and deletes the old + files. The ACCESS EXCLUSIVE lock is needed to make + sure that the old files do not change during the processing because the + changes would get lost due to the swap. + + + + With the CONCURRENTLY option, the ACCESS + EXCLUSIVE lock is only acquired to swap the table and index + files. The data changes that took place during the creation of the new + table and index files are captured using logical decoding + () and applied before + the ACCESS EXCLUSIVE lock is requested. Thus the lock + is typically held only for the time needed to swap the files, which + should be pretty short. However, the time might still be noticeable if + too many data changes have been done to the table while + REPACK was waiting for the lock: those changes must + be processed just before the files are swapped, while the + ACCESS EXCLUSIVE lock is being held. + + + + Note that REPACK with the + the CONCURRENTLY option does not try to order the + rows inserted into the table after the repacking started. Also + note REPACK might fail to complete due to DDL + commands executed on the table by other transactions during the + repacking. + + + + + In addition to the temporary space requirements explained in + , + the CONCURRENTLY option can add to the usage of + temporary space a bit more. The reason is that other transactions can + perform DML operations which cannot be applied to the new file until + REPACK has copied all the tuples from the old + file. Thus the tuples inserted into the old file during the copying are + also stored separately in a temporary file, so they can eventually be + applied to the new file. + + + + Furthermore, the data changes performed during the copying are + extracted from write-ahead log (WAL), and + this extraction (decoding) only takes place when certain amount of WAL + has been written. Therefore, WAL removal can be delayed by this + threshold. Currently the threshold is equal to the value of + the wal_segment_size + configuration parameter. + + + + + The CONCURRENTLY option cannot be used in the + following cases: + + + + + The table is UNLOGGED. + + + + + + The table is partitioned. + + + + + + The table is a system catalog or a TOAST table. + + + + + + REPACK is executed inside a transaction block. + + + + + + The wal_level + configuration parameter is less than logical. + + + + + + The max_replication_slots + configuration parameter does not allow for creation of an additional + replication slot. + + + + + + + + REPACK with the CONCURRENTLY + option is not MVCC-safe, see for + details. + + + + + + VERBOSE diff --git a/src/Makefile b/src/Makefile index 2f31a2f20a7..b18c9a14ffa 100644 --- a/src/Makefile +++ b/src/Makefile @@ -23,6 +23,7 @@ SUBDIRS = \ interfaces \ backend/replication/libpqwalreceiver \ backend/replication/pgoutput \ + backend/replication/pgoutput_repack \ fe_utils \ bin \ pl \ diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index ed2e3021799..da98aadf39f 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -60,7 +60,8 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, HeapTuple old_key_tuple, - bool all_visible_cleared, bool new_all_visible_cleared); + bool all_visible_cleared, bool new_all_visible_cleared, + bool wal_logical); #ifdef USE_ASSERT_CHECKING static void check_lock_if_inplace_updateable_rel(Relation relation, ItemPointer otid, @@ -2744,7 +2745,7 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask) TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart) + TM_FailureData *tmfd, bool changingPart, bool wal_logical) { TM_Result result; TransactionId xid = GetCurrentTransactionId(); @@ -2989,7 +2990,8 @@ l1: * Compute replica identity tuple before entering the critical section so * we don't PANIC upon a memory allocation failure. */ - old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied); + old_key_tuple = wal_logical ? + ExtractReplicaIdentity(relation, &tp, true, &old_key_copied) : NULL; /* * If this is the first possibly-multixact-able operation in the current @@ -3079,6 +3081,15 @@ l1: xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY; } + /* + * Unlike UPDATE, DELETE is decoded even if there is no old key, so it + * does not help to clear both XLH_DELETE_CONTAINS_OLD_TUPLE and + * XLH_DELETE_CONTAINS_OLD_KEY. Thus we need an extra flag. TODO + * Consider not decoding tuples w/o the old tuple/key instead. + */ + if (!wal_logical) + xlrec.flags |= XLH_DELETE_NO_LOGICAL; + XLogBeginInsert(); XLogRegisterData(&xlrec, SizeOfHeapDelete); @@ -3171,7 +3182,8 @@ simple_heap_delete(Relation relation, ItemPointer tid) result = heap_delete(relation, tid, GetCurrentCommandId(true), InvalidSnapshot, true /* wait for commit */ , - &tmfd, false /* changingPart */ ); + &tmfd, false, /* changingPart */ + true /* wal_logical */); switch (result) { case TM_SelfModified: @@ -3212,7 +3224,7 @@ TM_Result heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, CommandId cid, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, - TU_UpdateIndexes *update_indexes) + TU_UpdateIndexes *update_indexes, bool wal_logical) { TM_Result result; TransactionId xid = GetCurrentTransactionId(); @@ -4103,7 +4115,8 @@ l2: newbuf, &oldtup, heaptup, old_key_tuple, all_visible_cleared, - all_visible_cleared_new); + all_visible_cleared_new, + wal_logical); if (newbuf != buffer) { PageSetLSN(BufferGetPage(newbuf), recptr); @@ -4461,7 +4474,8 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup, result = heap_update(relation, otid, tup, GetCurrentCommandId(true), InvalidSnapshot, true /* wait for commit */ , - &tmfd, &lockmode, update_indexes); + &tmfd, &lockmode, update_indexes, + true /* wal_logical */); switch (result) { case TM_SelfModified: @@ -8794,7 +8808,8 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, HeapTuple old_key_tuple, - bool all_visible_cleared, bool new_all_visible_cleared) + bool all_visible_cleared, bool new_all_visible_cleared, + bool wal_logical) { xl_heap_update xlrec; xl_heap_header xlhdr; @@ -8805,7 +8820,8 @@ log_heap_update(Relation reln, Buffer oldbuf, suffixlen = 0; XLogRecPtr recptr; Page page = BufferGetPage(newbuf); - bool need_tuple_data = RelationIsLogicallyLogged(reln); + bool need_tuple_data = RelationIsLogicallyLogged(reln) && + wal_logical; bool init; int bufflags; diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index d91e66241fb..9d55004305f 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -33,6 +33,7 @@ #include "catalog/index.h" #include "catalog/storage.h" #include "catalog/storage_xlog.h" +#include "commands/cluster.h" #include "commands/progress.h" #include "executor/executor.h" #include "miscadmin.h" @@ -309,7 +310,8 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, * the storage itself is cleaning the dead tuples by itself, it is the * time to call the index tuple deletion also. */ - return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart); + return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart, + true); } @@ -328,7 +330,7 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, tuple->t_tableOid = slot->tts_tableOid; result = heap_update(relation, otid, tuple, cid, crosscheck, wait, - tmfd, lockmode, update_indexes); + tmfd, lockmode, update_indexes, true); ItemPointerCopy(&tuple->t_self, &slot->tts_tid); /* @@ -685,13 +687,15 @@ static void heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, Relation OldIndex, bool use_sort, TransactionId OldestXmin, + Snapshot snapshot, + LogicalDecodingContext *decoding_ctx, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, double *tups_vacuumed, double *tups_recently_dead) { - RewriteState rwstate; + RewriteState rwstate = NULL; IndexScanDesc indexScan; TableScanDesc tableScan; HeapScanDesc heapScan; @@ -705,6 +709,8 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, bool *isnull; BufferHeapTupleTableSlot *hslot; BlockNumber prev_cblock = InvalidBlockNumber; + bool concurrent = snapshot != NULL; + XLogRecPtr end_of_wal_prev = GetFlushRecPtr(NULL); /* Remember if it's a system catalog */ is_system_catalog = IsSystemRelation(OldHeap); @@ -720,9 +726,12 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, values = (Datum *) palloc(natts * sizeof(Datum)); isnull = (bool *) palloc(natts * sizeof(bool)); - /* Initialize the rewrite operation */ - rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, *xid_cutoff, - *multi_cutoff); + /* + * Initialize the rewrite operation. + */ + if (!concurrent) + rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, + *xid_cutoff, *multi_cutoff); /* Set up sorting if wanted */ @@ -737,6 +746,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, * Prepare to scan the OldHeap. To ensure we see recently-dead tuples * that still need to be copied, we scan with SnapshotAny and use * HeapTupleSatisfiesVacuum for the visibility test. + * + * In the CONCURRENTLY case, we do regular MVCC visibility tests, using + * the snapshot passed by the caller. */ if (OldIndex != NULL && !use_sort) { @@ -753,7 +765,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, tableScan = NULL; heapScan = NULL; - indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, NULL, 0, 0); + indexScan = index_beginscan(OldHeap, OldIndex, + snapshot ? snapshot :SnapshotAny, + NULL, 0, 0); index_rescan(indexScan, NULL, 0, NULL, 0); } else @@ -762,7 +776,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, pgstat_progress_update_param(PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_SEQ_SCAN_HEAP); - tableScan = table_beginscan(OldHeap, SnapshotAny, 0, (ScanKey) NULL); + tableScan = table_beginscan(OldHeap, + snapshot ? snapshot :SnapshotAny, + 0, (ScanKey) NULL); heapScan = (HeapScanDesc) tableScan; indexScan = NULL; @@ -785,6 +801,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, HeapTuple tuple; Buffer buf; bool isdead; + HTSV_Result vis; CHECK_FOR_INTERRUPTS(); @@ -837,70 +854,84 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, tuple = ExecFetchSlotHeapTuple(slot, false, NULL); buf = hslot->buffer; - LockBuffer(buf, BUFFER_LOCK_SHARE); - - switch (HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf)) + /* + * Regarding CONCURRENTLY, see the comments on MVCC snapshot above. + */ + if (!concurrent) { - case HEAPTUPLE_DEAD: - /* Definitely dead */ - isdead = true; - break; - case HEAPTUPLE_RECENTLY_DEAD: - *tups_recently_dead += 1; - /* fall through */ - case HEAPTUPLE_LIVE: - /* Live or recently dead, must copy it */ - isdead = false; - break; - case HEAPTUPLE_INSERT_IN_PROGRESS: + LockBuffer(buf, BUFFER_LOCK_SHARE); - /* - * Since we hold exclusive lock on the relation, normally the - * only way to see this is if it was inserted earlier in our - * own transaction. However, it can happen in system - * catalogs, since we tend to release write lock before commit - * there. Give a warning if neither case applies; but in any - * case we had better copy it. - */ - if (!is_system_catalog && - !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data))) - elog(WARNING, "concurrent insert in progress within table \"%s\"", - RelationGetRelationName(OldHeap)); - /* treat as live */ - isdead = false; - break; - case HEAPTUPLE_DELETE_IN_PROGRESS: + switch ((vis = HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf))) + { + case HEAPTUPLE_DEAD: + /* Definitely dead */ + isdead = true; + break; + case HEAPTUPLE_RECENTLY_DEAD: + *tups_recently_dead += 1; + /* fall through */ + case HEAPTUPLE_LIVE: + /* Live or recently dead, must copy it */ + isdead = false; + break; + case HEAPTUPLE_INSERT_IN_PROGRESS: /* - * Similar situation to INSERT_IN_PROGRESS case. + * As long as we hold exclusive lock on the relation, normally + * the only way to see this is if it was inserted earlier in + * our own transaction. However, it can happen in system + * catalogs, since we tend to release write lock before commit + * there. Also, there's no exclusive lock during concurrent + * processing. Give a warning if neither case applies; but in + * any case we had better copy it. */ - if (!is_system_catalog && - !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple->t_data))) - elog(WARNING, "concurrent delete in progress within table \"%s\"", - RelationGetRelationName(OldHeap)); - /* treat as recently dead */ - *tups_recently_dead += 1; - isdead = false; - break; - default: - elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result"); - isdead = false; /* keep compiler quiet */ - break; - } + if (!is_system_catalog && !concurrent && + !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data))) + elog(WARNING, "concurrent insert in progress within table \"%s\"", + RelationGetRelationName(OldHeap)); + /* treat as live */ + isdead = false; + break; + case HEAPTUPLE_DELETE_IN_PROGRESS: - LockBuffer(buf, BUFFER_LOCK_UNLOCK); + /* + * Similar situation to INSERT_IN_PROGRESS case. + */ + if (!is_system_catalog && !concurrent && + !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple->t_data))) + elog(WARNING, "concurrent delete in progress within table \"%s\"", + RelationGetRelationName(OldHeap)); + /* treat as recently dead */ + *tups_recently_dead += 1; + isdead = false; + break; + default: + elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result"); + isdead = false; /* keep compiler quiet */ + break; + } - if (isdead) - { - *tups_vacuumed += 1; - /* heap rewrite module still needs to see it... */ - if (rewrite_heap_dead_tuple(rwstate, tuple)) + if (isdead) { - /* A previous recently-dead tuple is now known dead */ *tups_vacuumed += 1; - *tups_recently_dead -= 1; + /* heap rewrite module still needs to see it... */ + if (rewrite_heap_dead_tuple(rwstate, tuple)) + { + /* A previous recently-dead tuple is now known dead */ + *tups_vacuumed += 1; + *tups_recently_dead -= 1; + } + + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + continue; } - continue; + + /* + * In the concurrent case, we have a copy of the tuple, so we + * don't worry whether the source tuple will be deleted / updated + * after we release the lock. + */ + LockBuffer(buf, BUFFER_LOCK_UNLOCK); } *num_tuples += 1; @@ -919,7 +950,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, { const int ct_index[] = { PROGRESS_REPACK_HEAP_TUPLES_SCANNED, - PROGRESS_REPACK_HEAP_TUPLES_WRITTEN + PROGRESS_REPACK_HEAP_TUPLES_INSERTED }; int64 ct_val[2]; @@ -934,6 +965,31 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, ct_val[1] = *num_tuples; pgstat_progress_update_multi_param(2, ct_index, ct_val); } + + /* + * Process the WAL produced by the load, as well as by other + * transactions, so that the replication slot can advance and WAL does + * not pile up. Use wal_segment_size as a threshold so that we do not + * introduce the decoding overhead too often. + * + * Of course, we must not apply the changes until the initial load has + * completed. + * + * Note that our insertions into the new table should not be decoded + * as we (intentionally) do not write the logical decoding specific + * information to WAL. + */ + if (concurrent) + { + XLogRecPtr end_of_wal; + + end_of_wal = GetFlushRecPtr(NULL); + if ((end_of_wal - end_of_wal_prev) > wal_segment_size) + { + repack_decode_concurrent_changes(decoding_ctx, end_of_wal); + end_of_wal_prev = end_of_wal; + } + } } if (indexScan != NULL) @@ -977,7 +1033,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, values, isnull, rwstate); /* Report n_tuples */ - pgstat_progress_update_param(PROGRESS_REPACK_HEAP_TUPLES_WRITTEN, + pgstat_progress_update_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, n_tuples); } @@ -985,7 +1041,8 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, } /* Write out any remaining tuples, and fsync if needed */ - end_heap_rewrite(rwstate); + if (rwstate) + end_heap_rewrite(rwstate); /* Clean up */ pfree(values); @@ -2376,6 +2433,10 @@ heapam_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate, * SET WITHOUT OIDS. * * So, we must reconstruct the tuple from component Datums. + * + * If rwstate=NULL, use simple_heap_insert() instead of rewriting - in that + * case we still need to deform/form the tuple. TODO Shouldn't we rename the + * function, as might not do any rewrite? */ static void reform_and_rewrite_tuple(HeapTuple tuple, @@ -2398,8 +2459,28 @@ reform_and_rewrite_tuple(HeapTuple tuple, copiedTuple = heap_form_tuple(newTupDesc, values, isnull); - /* The heap rewrite module does the rest */ - rewrite_heap_tuple(rwstate, tuple, copiedTuple); + if (rwstate) + /* The heap rewrite module does the rest */ + rewrite_heap_tuple(rwstate, tuple, copiedTuple); + else + { + /* + * Insert tuple when processing REPACK CONCURRENTLY. + * + * rewriteheap.c is not used in the CONCURRENTLY case because it'd be + * difficult to do the same in the catch-up phase (as the logical + * decoding does not provide us with sufficient visibility + * information). Thus we must use heap_insert() both during the + * catch-up and here. + * + * The following is like simple_heap_insert() except that we pass the + * flag to skip logical decoding: as soon as REPACK CONCURRENTLY swaps + * the relation files, it drops this relation, so no logical + * replication subscription should need the data. + */ + heap_insert(NewHeap, copiedTuple, GetCurrentCommandId(true), + HEAP_INSERT_NO_LOGICAL, NULL); + } heap_freetuple(copiedTuple); } diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index e6d2b5fced1..6aa2ed214f2 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -617,9 +617,9 @@ raw_heap_insert(RewriteState state, HeapTuple tup) int options = HEAP_INSERT_SKIP_FSM; /* - * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data - * for the TOAST table are not logically decoded. The main heap is - * WAL-logged as XLOG FPI records, which are not logically decoded. + * While rewriting the heap for REPACK, make sure data for the TOAST + * table are not logically decoded. The main heap is WAL-logged as + * XLOG FPI records, which are not logically decoded. */ options |= HEAP_INSERT_NO_LOGICAL; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b885513f765..23f2de587a1 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -215,6 +215,7 @@ typedef struct TransactionStateData bool parallelChildXact; /* is any parent transaction parallel? */ bool chain; /* start a new block after this one */ bool topXidLogged; /* for a subxact: is top-level XID logged? */ + bool internal; /* for a subxact: launched internally? */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -4723,6 +4724,7 @@ BeginInternalSubTransaction(const char *name) /* Normal subtransaction start */ PushTransaction(); s = CurrentTransactionState; /* changed by push */ + s->internal = true; /* * Savepoint names, like the TransactionState block itself, live @@ -5239,7 +5241,13 @@ AbortSubTransaction(void) LWLockReleaseAll(); pgstat_report_wait_end(); - pgstat_progress_end_command(); + + /* + * Internal subtransacion might be used by an user command, in which case + * the command outlives the subtransaction. + */ + if (!s->internal) + pgstat_progress_end_command(); pgaio_error_cleanup(); @@ -5456,6 +5464,7 @@ PushTransaction(void) s->parallelModeLevel = 0; s->parallelChildXact = (p->parallelModeLevel != 0 || p->parallelChildXact); s->topXidLogged = false; + s->internal = false; CurrentTransactionState = s; diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 466cf0fdef6..c70521d1d54 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1418,22 +1418,7 @@ index_concurrently_create_copy(Relation heapRelation, Oid oldIndexId, for (int i = 0; i < newInfo->ii_NumIndexAttrs; i++) opclassOptions[i] = get_attoptions(oldIndexId, i + 1); - /* Extract statistic targets for each attribute */ - stattargets = palloc0_array(NullableDatum, newInfo->ii_NumIndexAttrs); - for (int i = 0; i < newInfo->ii_NumIndexAttrs; i++) - { - HeapTuple tp; - Datum dat; - - tp = SearchSysCache2(ATTNUM, ObjectIdGetDatum(oldIndexId), Int16GetDatum(i + 1)); - if (!HeapTupleIsValid(tp)) - elog(ERROR, "cache lookup failed for attribute %d of relation %u", - i + 1, oldIndexId); - dat = SysCacheGetAttr(ATTNUM, tp, Anum_pg_attribute_attstattarget, &isnull); - ReleaseSysCache(tp); - stattargets[i].value = dat; - stattargets[i].isnull = isnull; - } + stattargets = get_index_stattargets(oldIndexId, newInfo); /* * Now create the new index. @@ -1472,6 +1457,32 @@ index_concurrently_create_copy(Relation heapRelation, Oid oldIndexId, return newIndexId; } +NullableDatum * +get_index_stattargets(Oid indexid, IndexInfo *indInfo) +{ + NullableDatum *stattargets; + + /* Extract statistic targets for each attribute */ + stattargets = palloc0_array(NullableDatum, indInfo->ii_NumIndexAttrs); + for (int i = 0; i < indInfo->ii_NumIndexAttrs; i++) + { + HeapTuple tp; + Datum dat; + bool isnull; + + tp = SearchSysCache2(ATTNUM, ObjectIdGetDatum(indexid), Int16GetDatum(i + 1)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + i + 1, indexid); + dat = SysCacheGetAttr(ATTNUM, tp, Anum_pg_attribute_attstattarget, &isnull); + ReleaseSysCache(tp); + stattargets[i].value = dat; + stattargets[i].isnull = isnull; + } + + return stattargets; +} + /* * index_concurrently_build * diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 2ff3322580f..d19770451d0 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1263,16 +1263,17 @@ CREATE VIEW pg_stat_progress_cluster AS WHEN 2 THEN 'index scanning heap' WHEN 3 THEN 'sorting tuples' WHEN 4 THEN 'writing new heap' - WHEN 5 THEN 'swapping relation files' - WHEN 6 THEN 'rebuilding index' - WHEN 7 THEN 'performing final cleanup' + -- 5 is 'catch-up', but that should not appear here. + WHEN 6 THEN 'swapping relation files' + WHEN 7 THEN 'rebuilding index' + WHEN 8 THEN 'performing final cleanup' END AS phase, CAST(S.param3 AS oid) AS cluster_index_relid, S.param4 AS heap_tuples_scanned, S.param5 AS heap_tuples_written, - S.param6 AS heap_blks_total, - S.param7 AS heap_blks_scanned, - S.param8 AS index_rebuild_count + S.param8 AS heap_blks_total, + S.param9 AS heap_blks_scanned, + S.param10 AS index_rebuild_count FROM pg_stat_get_progress_info('CLUSTER') AS S LEFT JOIN pg_database D ON S.datid = D.oid; @@ -1288,16 +1289,19 @@ CREATE VIEW pg_stat_progress_repack AS WHEN 2 THEN 'index scanning heap' WHEN 3 THEN 'sorting tuples' WHEN 4 THEN 'writing new heap' - WHEN 5 THEN 'swapping relation files' - WHEN 6 THEN 'rebuilding index' - WHEN 7 THEN 'performing final cleanup' + WHEN 5 THEN 'catch-up' + WHEN 6 THEN 'swapping relation files' + WHEN 7 THEN 'rebuilding index' + WHEN 8 THEN 'performing final cleanup' END AS phase, CAST(S.param3 AS oid) AS repack_index_relid, S.param4 AS heap_tuples_scanned, - S.param5 AS heap_tuples_written, - S.param6 AS heap_blks_total, - S.param7 AS heap_blks_scanned, - S.param8 AS index_rebuild_count + S.param5 AS heap_tuples_inserted, + S.param6 AS heap_tuples_updated, + S.param7 AS heap_tuples_deleted, + S.param8 AS heap_blks_total, + S.param9 AS heap_blks_scanned, + S.param10 AS index_rebuild_count FROM pg_stat_get_progress_info('REPACK') AS S LEFT JOIN pg_database D ON S.datid = D.oid; diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index c463cd58672..592909f453f 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -25,6 +25,10 @@ #include "access/toast_internals.h" #include "access/transam.h" #include "access/xact.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xloginsert.h" +#include "access/xlogutils.h" #include "catalog/catalog.h" #include "catalog/dependency.h" #include "catalog/heap.h" @@ -32,6 +36,7 @@ #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/pg_am.h" +#include "catalog/pg_control.h" #include "catalog/pg_inherits.h" #include "catalog/toasting.h" #include "commands/cluster.h" @@ -39,10 +44,15 @@ #include "commands/progress.h" #include "commands/tablecmds.h" #include "commands/vacuum.h" +#include "executor/executor.h" #include "miscadmin.h" #include "optimizer/optimizer.h" #include "pgstat.h" +#include "replication/decode.h" +#include "replication/logical.h" +#include "replication/snapbuild.h" #include "storage/bufmgr.h" +#include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/predicate.h" #include "utils/acl.h" @@ -67,15 +77,45 @@ typedef struct Oid indexOid; } RelToCluster; +/* + * The following definitions are used for concurrent processing. + */ + +/* + * The locators are used to avoid logical decoding of data that we do not need + * for our table. + */ +RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid}; +RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid}; + +/* + * Everything we need to call ExecInsertIndexTuples(). + */ +typedef struct IndexInsertState +{ + ResultRelInfo *rri; + EState *estate; + + Relation ident_index; +} IndexInsertState; + +/* The WAL segment being decoded. */ +static XLogSegNo repack_current_segment = 0; + static void cluster_multiple_rels(List *rtcs, ClusterParams *params, - ClusterCommand cmd); + ClusterCommand cmd, LOCKMODE lockmode, + bool isTopLevel); static bool cluster_rel_recheck(Relation OldHeap, Oid indexOid, Oid userid, - int options); + LOCKMODE lmode, int options); +static void check_repack_concurrently_requirements(Relation rel); static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, - ClusterCommand cmd); + bool concurrent, Oid userid, ClusterCommand cmd); static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, - bool verbose, bool *pSwapToastByContent, - TransactionId *pFreezeXid, MultiXactId *pCutoffMulti); + Snapshot snapshot, LogicalDecodingContext *decoding_ctx, + bool verbose, + bool *pSwapToastByContent, + TransactionId *pFreezeXid, + MultiXactId *pCutoffMulti); static List *get_tables_to_cluster(MemoryContext cluster_context); static List *get_tables_to_repack(MemoryContext repack_context); static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context, @@ -83,7 +123,53 @@ static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context, ClusterCommand cmd); static bool cluster_is_permitted_for_relation(Oid relid, Oid userid, ClusterCommand cmd); +static void begin_concurrent_repack(Relation rel); +static void end_concurrent_repack(void); +static LogicalDecodingContext *setup_logical_decoding(Oid relid, + const char *slotname, + TupleDesc tupdesc); +static HeapTuple get_changed_tuple(char *change); +static void apply_concurrent_changes(RepackDecodingState *dstate, + Relation rel, ScanKey key, int nkeys, + IndexInsertState *iistate); +static void apply_concurrent_insert(Relation rel, ConcurrentChange *change, + HeapTuple tup, IndexInsertState *iistate, + TupleTableSlot *index_slot); +static void apply_concurrent_update(Relation rel, HeapTuple tup, + HeapTuple tup_target, + ConcurrentChange *change, + IndexInsertState *iistate, + TupleTableSlot *index_slot); +static void apply_concurrent_delete(Relation rel, HeapTuple tup_target, + ConcurrentChange *change); +static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys, + HeapTuple tup_key, + IndexInsertState *iistate, + TupleTableSlot *ident_slot, + IndexScanDesc *scan_p); +static void process_concurrent_changes(LogicalDecodingContext *ctx, + XLogRecPtr end_of_wal, + Relation rel_dst, + Relation rel_src, + ScanKey ident_key, + int ident_key_nentries, + IndexInsertState *iistate); +static IndexInsertState *get_index_insert_state(Relation relation, + Oid ident_index_id); +static ScanKey build_identity_key(Oid ident_idx_oid, Relation rel_src, + int *nentries); +static void free_index_insert_state(IndexInsertState *iistate); +static void cleanup_logical_decoding(LogicalDecodingContext *ctx); +static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, + Relation cl_index, + LogicalDecodingContext *ctx, + bool swap_toast_by_content, + TransactionId frozenXid, + MultiXactId cutoffMulti); +static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes); static Relation process_single_relation(RangeVar *relation, char *indexname, + LOCKMODE lockmode, + bool isTopLevel, ClusterParams *params, ClusterCommand cmd, Oid *indexOid_p); @@ -142,8 +228,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) if (stmt->relation != NULL) { - /* This is the single-relation case. */ rel = process_single_relation(stmt->relation, stmt->indexname, + AccessExclusiveLock, isTopLevel, ¶ms, CLUSTER_COMMAND_CLUSTER, &indexOid); if (rel == NULL) @@ -194,7 +280,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) } /* Do the job. */ - cluster_multiple_rels(rtcs, ¶ms, CLUSTER_COMMAND_CLUSTER); + cluster_multiple_rels(rtcs, ¶ms, CLUSTER_COMMAND_CLUSTER, + AccessExclusiveLock, isTopLevel); /* Start a new transaction for the cleanup work. */ StartTransactionCommand(); @@ -211,7 +298,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) * return. */ static void -cluster_multiple_rels(List *rtcs, ClusterParams *params, ClusterCommand cmd) +cluster_multiple_rels(List *rtcs, ClusterParams *params, ClusterCommand cmd, + LOCKMODE lockmode, bool isTopLevel) { ListCell *lc; @@ -231,10 +319,10 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params, ClusterCommand cmd) /* functions in indexes may want a snapshot set */ PushActiveSnapshot(GetTransactionSnapshot()); - rel = table_open(rtc->tableOid, AccessExclusiveLock); + rel = table_open(rtc->tableOid, lockmode); /* Process this table */ - cluster_rel(rel, rtc->indexOid, params, cmd); + cluster_rel(rel, rtc->indexOid, params, cmd, isTopLevel); /* cluster_rel closes the relation, but keeps lock */ PopActiveSnapshot(); @@ -258,12 +346,18 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params, ClusterCommand cmd) * instead of index order. This is the new implementation of VACUUM FULL, * and error messages should refer to the operation as VACUUM not CLUSTER. * + * Note that, in the concurrent case, the function releases the lock at some + * point, in order to get AccessExclusiveLock for the final steps (i.e. to + * swap the relation files). To make things simpler, the caller should expect + * OldHeap to be closed on return, regardless CLUOPT_CONCURRENT. (The + * AccessExclusiveLock is kept till the end of the transaction.) + * * 'cmd' indicates which command is being executed. REPACK should be the only * caller of this function in the future. */ void cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, - ClusterCommand cmd) + ClusterCommand cmd, bool isTopLevel) { Oid tableOid = RelationGetRelid(OldHeap); Oid save_userid; @@ -272,8 +366,34 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, bool verbose = ((params->options & CLUOPT_VERBOSE) != 0); bool recheck = ((params->options & CLUOPT_RECHECK) != 0); Relation index; + bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0); + LOCKMODE lmode; + + /* + * Check that the correct lock is held. The lock mode is + * AccessExclusiveLock for normal processing and ShareUpdateExclusiveLock + * for concurrent processing (so that SELECT, INSERT, UPDATE and DELETE + * commands work, but cluster_rel() cannot be called concurrently for the + * same relation). + */ + lmode = !concurrent ? AccessExclusiveLock : ShareUpdateExclusiveLock; + + /* There are specific requirements on concurrent processing. */ + if (concurrent) + { + /* + * Make sure we have no XID assigned, otherwise call of + * setup_logical_decoding() can cause a deadlock. + * + * The existence of transaction block actually does not imply that XID + * was already assigned, but it very likely is. We might want to check + * the result of GetCurrentTransactionIdIfAny() instead, but that + * would be less clear from user's perspective. + */ + PreventInTransactionBlock(isTopLevel, "REPACK CONCURRENTLY"); - Assert(CheckRelationLockedByMe(OldHeap, AccessExclusiveLock, false)); + check_repack_concurrently_requirements(OldHeap); + } /* Check for user-requested abort. */ CHECK_FOR_INTERRUPTS(); @@ -319,7 +439,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, * to cluster a not-previously-clustered index. */ if (recheck) - if (!cluster_rel_recheck(OldHeap, indexOid, save_userid, + if (!cluster_rel_recheck(OldHeap, indexOid, save_userid, lmode, params->options)) goto out; @@ -338,6 +458,12 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot cluster a shared catalog"))); + /* + * The CONCURRENTLY case should have been rejected earlier because it does + * not support system catalogs. + */ + Assert(!(OldHeap->rd_rel->relisshared && concurrent)); + /* * Don't process temp tables of other backends ... their local buffer * manager is not going to cope. @@ -376,7 +502,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, if (OidIsValid(indexOid)) { /* verify the index is good and lock it */ - check_index_is_clusterable(OldHeap, indexOid, AccessExclusiveLock); + check_index_is_clusterable(OldHeap, indexOid, lmode); /* also open it */ index = index_open(indexOid, NoLock); } @@ -393,7 +519,9 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW && !RelationIsPopulated(OldHeap)) { - relation_close(OldHeap, AccessExclusiveLock); + if (index) + index_close(index, lmode); + relation_close(OldHeap, lmode); goto out; } @@ -406,11 +534,35 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, * invalid, because we move tuples around. Promote them to relation * locks. Predicate locks on indexes will be promoted when they are * reindexed. + * + * During concurrent processing, the heap as well as its indexes stay in + * operation, so we postpone this step until they are locked using + * AccessExclusiveLock near the end of the processing. */ - TransferPredicateLocksToHeapRelation(OldHeap); + if (!concurrent) + TransferPredicateLocksToHeapRelation(OldHeap); /* rebuild_relation does all the dirty work */ - rebuild_relation(OldHeap, index, verbose, cmd); + PG_TRY(); + { + /* + * For concurrent processing, make sure that our logical decoding + * ignores data changes of other tables than the one we are + * processing. + */ + if (concurrent) + begin_concurrent_repack(OldHeap); + + rebuild_relation(OldHeap, index, verbose, concurrent, save_userid, + cmd); + } + PG_FINALLY(); + { + if (concurrent) + end_concurrent_repack(); + } + PG_END_TRY(); + /* rebuild_relation closes OldHeap, and index if valid */ out: @@ -429,7 +581,7 @@ out: */ static bool cluster_rel_recheck(Relation OldHeap, Oid indexOid, Oid userid, - int options) + LOCKMODE lmode, int options) { Oid tableOid = RelationGetRelid(OldHeap); @@ -437,7 +589,7 @@ cluster_rel_recheck(Relation OldHeap, Oid indexOid, Oid userid, if (!cluster_is_permitted_for_relation(tableOid, userid, CLUSTER_COMMAND_CLUSTER)) { - relation_close(OldHeap, AccessExclusiveLock); + relation_close(OldHeap, lmode); return false; } @@ -451,7 +603,7 @@ cluster_rel_recheck(Relation OldHeap, Oid indexOid, Oid userid, */ if (RELATION_IS_OTHER_TEMP(OldHeap)) { - relation_close(OldHeap, AccessExclusiveLock); + relation_close(OldHeap, lmode); return false; } @@ -462,7 +614,7 @@ cluster_rel_recheck(Relation OldHeap, Oid indexOid, Oid userid, */ if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(indexOid))) { - relation_close(OldHeap, AccessExclusiveLock); + relation_close(OldHeap, lmode); return false; } @@ -473,7 +625,7 @@ cluster_rel_recheck(Relation OldHeap, Oid indexOid, Oid userid, if ((options & CLUOPT_RECHECK_ISCLUSTERED) != 0 && !get_index_isclustered(indexOid)) { - relation_close(OldHeap, AccessExclusiveLock); + relation_close(OldHeap, lmode); return false; } } @@ -614,19 +766,87 @@ mark_index_clustered(Relation rel, Oid indexOid, bool is_internal) table_close(pg_index, RowExclusiveLock); } +/* + * Check if the CONCURRENTLY option is legal for the relation. + */ +static void +check_repack_concurrently_requirements(Relation rel) +{ + char relpersistence, + replident; + Oid ident_idx; + + /* Data changes in system relations are not logically decoded. */ + if (IsCatalogRelation(rel)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot repack relation \"%s\"", + RelationGetRelationName(rel)), + errhint("REPACK CONCURRENTLY is not supported for catalog relations."))); + + /* + * reorderbuffer.c does not seem to handle processing of TOAST relation + * alone. + */ + if (IsToastRelation(rel)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot repack relation \"%s\"", + RelationGetRelationName(rel)), + errhint("REPACK CONCURRENTLY is not supported for TOAST relations, unless the main relation is repacked too."))); + + relpersistence = rel->rd_rel->relpersistence; + if (relpersistence != RELPERSISTENCE_PERMANENT) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot repack relation \"%s\"", + RelationGetRelationName(rel)), + errhint("REPACK CONCURRENTLY is only allowed for permanent relations."))); + + /* With NOTHING, WAL does not contain the old tuple. */ + replident = rel->rd_rel->relreplident; + if (replident == REPLICA_IDENTITY_NOTHING) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot repack relation \"%s\"", + RelationGetRelationName(rel)), + errhint("Relation \"%s\" has insufficient replication identity.", + RelationGetRelationName(rel)))); + + /* + * Identity index is not set if the replica identity is FULL, but PK might + * exist in such a case. + */ + ident_idx = RelationGetReplicaIndex(rel); + if (!OidIsValid(ident_idx) && OidIsValid(rel->rd_pkindex)) + ident_idx = rel->rd_pkindex; + if (!OidIsValid(ident_idx)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot process relation \"%s\"", + RelationGetRelationName(rel)), + (errhint("Relation \"%s\" has no identity index.", + RelationGetRelationName(rel))))); +} + /* * rebuild_relation: rebuild an existing relation in index or physical order * - * OldHeap: table to rebuild. + * OldHeap: table to rebuild. See cluster_rel() for comments on the required + * lock strength. + * * index: index to cluster by, or NULL to rewrite in physical order. * - * On entry, heap and index (if one is given) must be open, and - * AccessExclusiveLock held on them. - * On exit, they are closed, but locks on them are not released. + * On entry, heap and index (if one is given) must be open, and the + * appropriate lock held on them (AccessExclusiveLock for exclusive processing + * and ShareUpdateExclusiveLock for concurrent processing).. + * + * On exit, they are closed, but still locked with AccessExclusiveLock (The + * function handles the lock upgrade if 'concurrent' is true.) */ static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, - ClusterCommand cmd) + bool concurrent, Oid userid, ClusterCommand cmd) { Oid tableOid = RelationGetRelid(OldHeap); Oid accessMethod = OldHeap->rd_rel->relam; @@ -634,13 +854,55 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, Oid OIDNewHeap; Relation NewHeap; char relpersistence; - bool is_system_catalog; bool swap_toast_by_content; TransactionId frozenXid; MultiXactId cutoffMulti; + NameData slotname; + LogicalDecodingContext *ctx = NULL; + Snapshot snapshot = NULL; +#if USE_ASSERT_CHECKING + LOCKMODE lmode; + + lmode = !concurrent ? AccessExclusiveLock : ShareUpdateExclusiveLock; + + Assert(CheckRelationLockedByMe(OldHeap, lmode, false) && + (index == NULL || CheckRelationLockedByMe(index, lmode, false))); +#endif + + if (concurrent) + { + TupleDesc tupdesc; - Assert(CheckRelationLockedByMe(OldHeap, AccessExclusiveLock, false) && - (index == NULL || CheckRelationLockedByMe(index, AccessExclusiveLock, false))); + /* + * REPACK CONCURRENTLY is not allowed in a transaction block, so this + * should never fire. + */ + Assert(GetTopTransactionIdIfAny() == InvalidTransactionId); + + /* + * A single backend should not execute multiple REPACK commands at a + * time, so use PID to make the slot unique. + */ + snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid); + + tupdesc = CreateTupleDescCopy(RelationGetDescr(OldHeap)); + + /* + * Prepare to capture the concurrent data changes. + * + * Note that this call waits for all transactions with XID already + * assigned to finish. If some of those transactions is waiting for a + * lock conflicting with ShareUpdateExclusiveLock on our table (e.g. + * it runs CREATE INDEX), we can end up in a deadlock. Not sure this + * risk is worth unlocking/locking the table (and its clustering + * index) and checking again if its still eligible for REPACK + * CONCURRENTLY. + */ + ctx = setup_logical_decoding(tableOid, NameStr(slotname), tupdesc); + + snapshot = SnapBuildInitialSnapshotForRepack(ctx->snapshot_builder); + PushActiveSnapshot(snapshot); + } if (index && cmd == CLUSTER_COMMAND_CLUSTER) /* Mark the correct index as clustered */ @@ -648,7 +910,6 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, /* Remember info about rel before closing OldHeap */ relpersistence = OldHeap->rd_rel->relpersistence; - is_system_catalog = IsSystemRelation(OldHeap); /* * Create the transient table that will receive the re-ordered data. @@ -664,30 +925,67 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, NewHeap = table_open(OIDNewHeap, NoLock); /* Copy the heap data into the new table in the desired order */ - copy_table_data(NewHeap, OldHeap, index, verbose, + copy_table_data(NewHeap, OldHeap, index, snapshot, ctx, verbose, &swap_toast_by_content, &frozenXid, &cutoffMulti); + /* The historic snapshot won't be needed anymore. */ + if (snapshot) + PopActiveSnapshot(); - /* Close relcache entries, but keep lock until transaction commit */ - table_close(OldHeap, NoLock); - if (index) - index_close(index, NoLock); + if (concurrent) + { + /* + * Push a snapshot that we will use to find old versions of rows when + * processing concurrent UPDATE and DELETE commands. (That snapshot + * should also be used by index expressions.) + */ + PushActiveSnapshot(GetTransactionSnapshot()); - /* - * Close the new relation so it can be dropped as soon as the storage is - * swapped. The relation is not visible to others, so no need to unlock it - * explicitly. - */ - table_close(NewHeap, NoLock); + /* + * Make sure we can find the tuples just inserted when applying DML + * commands on top of those. + */ + CommandCounterIncrement(); + UpdateActiveSnapshotCommandId(); - /* - * Swap the physical files of the target and transient tables, then - * rebuild the target's indexes and throw away the transient table. - */ - finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog, - swap_toast_by_content, false, true, - frozenXid, cutoffMulti, - relpersistence); + rebuild_relation_finish_concurrent(NewHeap, OldHeap, index, + ctx, swap_toast_by_content, + frozenXid, cutoffMulti); + PopActiveSnapshot(); + + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_FINAL_CLEANUP); + + /* Done with decoding. */ + cleanup_logical_decoding(ctx); + ReplicationSlotRelease(); + ReplicationSlotDrop(NameStr(slotname), false); + } + else + { + bool is_system_catalog = IsSystemRelation(OldHeap); + + /* Close relcache entries, but keep lock until transaction commit */ + table_close(OldHeap, NoLock); + if (index) + index_close(index, NoLock); + + /* + * Close the new relation so it can be dropped as soon as the storage + * is swapped. The relation is not visible to others, so no need to + * unlock it explicitly. + */ + table_close(NewHeap, NoLock); + + /* + * Swap the physical files of the target and transient tables, then + * rebuild the target's indexes and throw away the transient table. + */ + finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog, + swap_toast_by_content, false, true, true, + frozenXid, cutoffMulti, + relpersistence); + } } @@ -822,15 +1120,19 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, /* * Do the physical copying of table data. * + * 'snapshot' and 'decoding_ctx': see table_relation_copy_for_cluster(). Pass + * iff concurrent processing is required. + * * There are three output parameters: * *pSwapToastByContent is set true if toast tables must be swapped by content. * *pFreezeXid receives the TransactionId used as freeze cutoff point. * *pCutoffMulti receives the MultiXactId used as a cutoff point. */ static void -copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verbose, - bool *pSwapToastByContent, TransactionId *pFreezeXid, - MultiXactId *pCutoffMulti) +copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, + Snapshot snapshot, LogicalDecodingContext *decoding_ctx, + bool verbose, bool *pSwapToastByContent, + TransactionId *pFreezeXid, MultiXactId *pCutoffMulti) { Relation relRelation; HeapTuple reltup; @@ -848,6 +1150,8 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb PGRUsage ru0; char *nspname; + bool concurrent = snapshot != NULL; + pg_rusage_init(&ru0); /* Store a copy of the namespace name for logging purposes */ @@ -950,8 +1254,48 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb * provided, else plain seqscan. */ if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID) + { + ResourceOwner oldowner = NULL; + ResourceOwner resowner = NULL; + + /* + * In the CONCURRENT case, use a dedicated resource owner so we don't + * leave any additional locks behind us that we cannot release easily. + */ + if (concurrent) + { + Assert(CheckRelationLockedByMe(OldHeap, ShareUpdateExclusiveLock, + false)); + Assert(CheckRelationLockedByMe(OldIndex, ShareUpdateExclusiveLock, + false)); + + resowner = ResourceOwnerCreate(CurrentResourceOwner, + "plan_cluster_use_sort"); + oldowner = CurrentResourceOwner; + CurrentResourceOwner = resowner; + } + use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap), RelationGetRelid(OldIndex)); + + if (concurrent) + { + CurrentResourceOwner = oldowner; + + /* + * We are primarily concerned about locks, but if the planner + * happened to allocate any other resources, we should release + * them too because we're going to delete the whole resowner. + */ + ResourceOwnerRelease(resowner, RESOURCE_RELEASE_BEFORE_LOCKS, + false, false); + ResourceOwnerRelease(resowner, RESOURCE_RELEASE_LOCKS, + false, false); + ResourceOwnerRelease(resowner, RESOURCE_RELEASE_AFTER_LOCKS, + false, false); + ResourceOwnerDelete(resowner); + } + } else use_sort = false; @@ -980,7 +1324,9 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb * values (e.g. because the AM doesn't use freezing). */ table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort, - cutoffs.OldestXmin, &cutoffs.FreezeLimit, + cutoffs.OldestXmin, snapshot, + decoding_ctx, + &cutoffs.FreezeLimit, &cutoffs.MultiXactCutoff, &num_tuples, &tups_vacuumed, &tups_recently_dead); @@ -989,7 +1335,11 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb *pFreezeXid = cutoffs.FreezeLimit; *pCutoffMulti = cutoffs.MultiXactCutoff; - /* Reset rd_toastoid just to be tidy --- it shouldn't be looked at again */ + /* + * Reset rd_toastoid just to be tidy --- it shouldn't be looked at again. + * In the CONCURRENTLY case, we need to set it again before applying the + * concurrent changes. + */ NewHeap->rd_toastoid = InvalidOid; num_pages = RelationGetNumberOfBlocks(NewHeap); @@ -1447,14 +1797,13 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool swap_toast_by_content, bool check_constraints, bool is_internal, + bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence) { ObjectAddress object; Oid mapped_tables[4]; - int reindex_flags; - ReindexParams reindex_params = {0}; int i; /* Report that we are now swapping relation files */ @@ -1480,39 +1829,47 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, if (is_system_catalog) CacheInvalidateCatalog(OIDOldHeap); - /* - * Rebuild each index on the relation (but not the toast table, which is - * all-new at this point). It is important to do this before the DROP - * step because if we are processing a system catalog that will be used - * during DROP, we want to have its indexes available. There is no - * advantage to the other order anyway because this is all transactional, - * so no chance to reclaim disk space before commit. We do not need a - * final CommandCounterIncrement() because reindex_relation does it. - * - * Note: because index_build is called via reindex_relation, it will never - * set indcheckxmin true for the indexes. This is OK even though in some - * sense we are building new indexes rather than rebuilding existing ones, - * because the new heap won't contain any HOT chains at all, let alone - * broken ones, so it can't be necessary to set indcheckxmin. - */ - reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE; - if (check_constraints) - reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS; + if (reindex) + { + int reindex_flags; + ReindexParams reindex_params = {0}; - /* - * Ensure that the indexes have the same persistence as the parent - * relation. - */ - if (newrelpersistence == RELPERSISTENCE_UNLOGGED) - reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED; - else if (newrelpersistence == RELPERSISTENCE_PERMANENT) - reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT; + /* + * Rebuild each index on the relation (but not the toast table, which + * is all-new at this point). It is important to do this before the + * DROP step because if we are processing a system catalog that will + * be used during DROP, we want to have its indexes available. There + * is no advantage to the other order anyway because this is all + * transactional, so no chance to reclaim disk space before commit. We + * do not need a final CommandCounterIncrement() because + * reindex_relation does it. + * + * Note: because index_build is called via reindex_relation, it will + * never set indcheckxmin true for the indexes. This is OK even + * though in some sense we are building new indexes rather than + * rebuilding existing ones, because the new heap won't contain any + * HOT chains at all, let alone broken ones, so it can't be necessary + * to set indcheckxmin. + */ + reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE; + if (check_constraints) + reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS; - /* Report that we are now reindexing relations */ - pgstat_progress_update_param(PROGRESS_REPACK_PHASE, - PROGRESS_REPACK_PHASE_REBUILD_INDEX); + /* + * Ensure that the indexes have the same persistence as the parent + * relation. + */ + if (newrelpersistence == RELPERSISTENCE_UNLOGGED) + reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED; + else if (newrelpersistence == RELPERSISTENCE_PERMANENT) + reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT; - reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params); + /* Report that we are now reindexing relations */ + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_REBUILD_INDEX); + + reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params); + } /* Report that we are now doing clean up */ pgstat_progress_update_param(PROGRESS_REPACK_PHASE, @@ -1833,90 +2190,1315 @@ cluster_is_permitted_for_relation(Oid relid, Oid userid, ClusterCommand cmd) return false; } +#define REPL_PLUGIN_NAME "pgoutput_repack" + /* - * REPACK is intended to be a replacement of both CLUSTER and VACUUM FULL. + * Call this function before REPACK CONCURRENTLY starts to setup logical + * decoding. It makes sure that other users of the table put enough + * information into WAL. + * + * The point is that at various places we expect that the table we're + * processing is treated like a system catalog. For example, we need to be + * able to scan it using a "historic snapshot" anytime during the processing + * (as opposed to scanning only at the start point of the decoding, as logical + * replication does during initial table synchronization), in order to apply + * concurrent UPDATE / DELETE commands. + * + * Note that TOAST table needs no attention here as it's not scanned using + * historic snapshot. */ -void -repack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel) +static void +begin_concurrent_repack(Relation rel) { - ListCell *lc; - ClusterParams params = {0}; - bool verbose = false; - Relation rel = NULL; - Oid indexOid = InvalidOid; - MemoryContext repack_context; - List *rtcs; + Oid toastrelid; - /* Parse option list */ - foreach(lc, stmt->params) + /* Avoid logical decoding of other relations by this backend. */ + repacked_rel_locator = rel->rd_locator; + toastrelid = rel->rd_rel->reltoastrelid; + if (OidIsValid(toastrelid)) { - DefElem *opt = (DefElem *) lfirst(lc); + Relation toastrel; - if (strcmp(opt->defname, "verbose") == 0) - verbose = defGetBoolean(opt); - else - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("unrecognized REPACK option \"%s\"", - opt->defname), - parser_errposition(pstate, opt->location))); + /* Avoid logical decoding of other TOAST relations. */ + toastrel = table_open(toastrelid, AccessShareLock); + repacked_rel_toast_locator = toastrel->rd_locator; + table_close(toastrel, AccessShareLock); } +} - params.options = (verbose ? CLUOPT_VERBOSE : 0); +/* + * Call this when done with REPACK CONCURRENTLY. + */ +static void +end_concurrent_repack(void) +{ + /* + * Restore normal function of (future) logical decoding for this backend. + */ + repacked_rel_locator.relNumber = InvalidOid; + repacked_rel_toast_locator.relNumber = InvalidOid; +} - if (stmt->relation != NULL) - { - /* This is the single-relation case. */ - rel = process_single_relation(stmt->relation, stmt->indexname, - ¶ms, CLUSTER_COMMAND_REPACK, - &indexOid); - if (rel == NULL) - return; - } +/* + * This function is much like pg_create_logical_replication_slot() except that + * the new slot is neither released (if anyone else could read changes from + * our slot, we could miss changes other backends do while we copy the + * existing data into temporary table), nor persisted (it's easier to handle + * crash by restarting all the work from scratch). + */ +static LogicalDecodingContext * +setup_logical_decoding(Oid relid, const char *slotname, TupleDesc tupdesc) +{ + LogicalDecodingContext *ctx; + RepackDecodingState *dstate; /* - * By here, we know we are in a multi-table situation. In order to avoid - * holding locks for too long, we want to process each table in its own - * transaction. This forces us to disallow running inside a user - * transaction block. + * Check if we can use logical decoding. */ - PreventInTransactionBlock(isTopLevel, "REPACK"); + CheckSlotPermissions(); + CheckLogicalDecodingRequirements(); - /* Also, we need a memory context to hold our list of relations */ - repack_context = AllocSetContextCreate(PortalContext, - "Repack", - ALLOCSET_DEFAULT_SIZES); + /* RS_TEMPORARY so that the slot gets cleaned up on ERROR. */ + ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, false, false); - params.options |= CLUOPT_RECHECK; - if (rel != NULL) - { - Oid relid; - bool rel_is_index; + /* + * Neither prepare_write nor do_write callback nor update_progress is + * useful for us. + * + * Regarding the value of need_full_snapshot, we pass false because the + * table we are processing is present in RepackedRelsHash and therefore, + * regarding logical decoding, treated like a catalog. + */ + ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME, + NIL, + false, + InvalidXLogRecPtr, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); - Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + /* + * We don't have control on setting fast_forward, so at least check it. + */ + Assert(!ctx->fast_forward); - if (OidIsValid(indexOid)) - { - relid = indexOid; - rel_is_index = true; - } - else + DecodingContextFindStartpoint(ctx); + + /* Some WAL records should have been read. */ + Assert(ctx->reader->EndRecPtr != InvalidXLogRecPtr); + + XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment, + wal_segment_size); + + /* + * Setup structures to store decoded changes. + */ + dstate = palloc0(sizeof(RepackDecodingState)); + dstate->relid = relid; + dstate->tstore = tuplestore_begin_heap(false, false, + maintenance_work_mem); + + dstate->tupdesc = tupdesc; + + /* Initialize the descriptor to store the changes ... */ + dstate->tupdesc_change = CreateTemplateTupleDesc(1); + + TupleDescInitEntry(dstate->tupdesc_change, 1, NULL, BYTEAOID, -1, 0); + /* ... as well as the corresponding slot. */ + dstate->tsslot = MakeSingleTupleTableSlot(dstate->tupdesc_change, + &TTSOpsMinimalTuple); + + dstate->resowner = ResourceOwnerCreate(CurrentResourceOwner, + "logical decoding"); + + ctx->output_writer_private = dstate; + return ctx; +} + +/* + * Retrieve tuple from ConcurrentChange structure. + * + * The input data starts with the structure but it might not be appropriately + * aligned. + */ +static HeapTuple +get_changed_tuple(char *change) +{ + HeapTupleData tup_data; + HeapTuple result; + char *src; + + /* + * Ensure alignment before accessing the fields. (This is why we can't use + * heap_copytuple() instead of this function.) + */ + src = change + offsetof(ConcurrentChange, tup_data); + memcpy(&tup_data, src, sizeof(HeapTupleData)); + + result = (HeapTuple) palloc(HEAPTUPLESIZE + tup_data.t_len); + memcpy(result, &tup_data, sizeof(HeapTupleData)); + result->t_data = (HeapTupleHeader) ((char *) result + HEAPTUPLESIZE); + src = change + SizeOfConcurrentChange; + memcpy(result->t_data, src, result->t_len); + + return result; +} + +/* + * Decode logical changes from the WAL sequence up to end_of_wal. + */ +void +repack_decode_concurrent_changes(LogicalDecodingContext *ctx, + XLogRecPtr end_of_wal) +{ + RepackDecodingState *dstate; + ResourceOwner resowner_old; + + /* + * Invalidate the "present" cache before moving to "(recent) history". + */ + InvalidateSystemCaches(); + + dstate = (RepackDecodingState *) ctx->output_writer_private; + resowner_old = CurrentResourceOwner; + CurrentResourceOwner = dstate->resowner; + + PG_TRY(); + { + while (ctx->reader->EndRecPtr < end_of_wal) { - relid = RelationGetRelid(rel); - rel_is_index = false; - } - rtcs = get_tables_to_cluster_partitioned(repack_context, relid, - rel_is_index, - CLUSTER_COMMAND_REPACK); + XLogRecord *record; + XLogSegNo segno_new; + char *errm = NULL; + XLogRecPtr end_lsn; - /* close relation, releasing lock on parent table */ - table_close(rel, AccessExclusiveLock); - } - else - rtcs = get_tables_to_repack(repack_context); + record = XLogReadRecord(ctx->reader, &errm); + if (errm) + elog(ERROR, "%s", errm); + + if (record != NULL) + LogicalDecodingProcessRecord(ctx, ctx->reader); + + /* + * If WAL segment boundary has been crossed, inform the decoding + * system that the catalog_xmin can advance. (We can confirm more + * often, but a filling a single WAL segment should not take much + * time.) + */ + end_lsn = ctx->reader->EndRecPtr; + XLByteToSeg(end_lsn, segno_new, wal_segment_size); + if (segno_new != repack_current_segment) + { + LogicalConfirmReceivedLocation(end_lsn); + elog(DEBUG1, "REPACK: confirmed receive location %X/%X", + (uint32) (end_lsn >> 32), (uint32) end_lsn); + repack_current_segment = segno_new; + } + + CHECK_FOR_INTERRUPTS(); + } + InvalidateSystemCaches(); + CurrentResourceOwner = resowner_old; + } + PG_CATCH(); + { + /* clear all timetravel entries */ + InvalidateSystemCaches(); + CurrentResourceOwner = resowner_old; + PG_RE_THROW(); + } + PG_END_TRY(); +} + +/* + * Apply changes that happened during the initial load. + * + * Scan key is passed by caller, so it does not have to be constructed + * multiple times. Key entries have all fields initialized, except for + * sk_argument. + */ +static void +apply_concurrent_changes(RepackDecodingState *dstate, Relation rel, + ScanKey key, int nkeys, IndexInsertState *iistate) +{ + TupleTableSlot *index_slot, + *ident_slot; + HeapTuple tup_old = NULL; + + if (dstate->nchanges == 0) + return; + + /* TupleTableSlot is needed to pass the tuple to ExecInsertIndexTuples(). */ + index_slot = MakeSingleTupleTableSlot(dstate->tupdesc, &TTSOpsHeapTuple); + + /* A slot to fetch tuples from identity index. */ + ident_slot = table_slot_create(rel, NULL); + + while (tuplestore_gettupleslot(dstate->tstore, true, false, + dstate->tsslot)) + { + bool shouldFree; + HeapTuple tup_change, + tup, + tup_exist; + char *change_raw, + *src; + ConcurrentChange change; + bool isnull[1]; + Datum values[1]; + + CHECK_FOR_INTERRUPTS(); + + /* Get the change from the single-column tuple. */ + tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree); + heap_deform_tuple(tup_change, dstate->tupdesc_change, values, isnull); + Assert(!isnull[0]); + + /* Make sure we access aligned data. */ + change_raw = (char *) DatumGetByteaP(values[0]); + src = (char *) VARDATA(change_raw); + memcpy(&change, src, SizeOfConcurrentChange); + + /* TRUNCATE change contains no tuple, so process it separately. */ + if (change.kind == CHANGE_TRUNCATE) + { + /* + * All the things that ExecuteTruncateGuts() does (such as firing + * triggers or handling the DROP_CASCADE behavior) should have + * taken place on the source relation. Thus we only do the actual + * truncation of the new relation (and its indexes). + */ + heap_truncate_one_rel(rel); + + pfree(tup_change); + continue; + } + + /* + * Extract the tuple from the change. The tuple is copied here because + * it might be assigned to 'tup_old', in which case it needs to + * survive into the next iteration. + */ + tup = get_changed_tuple(src); + + if (change.kind == CHANGE_UPDATE_OLD) + { + Assert(tup_old == NULL); + tup_old = tup; + } + else if (change.kind == CHANGE_INSERT) + { + Assert(tup_old == NULL); + + apply_concurrent_insert(rel, &change, tup, iistate, index_slot); + + pfree(tup); + } + else if (change.kind == CHANGE_UPDATE_NEW || + change.kind == CHANGE_DELETE) + { + IndexScanDesc ind_scan = NULL; + HeapTuple tup_key; + + if (change.kind == CHANGE_UPDATE_NEW) + { + tup_key = tup_old != NULL ? tup_old : tup; + } + else + { + Assert(tup_old == NULL); + tup_key = tup; + } + + /* + * Find the tuple to be updated or deleted. + */ + tup_exist = find_target_tuple(rel, key, nkeys, tup_key, + iistate, ident_slot, &ind_scan); + if (tup_exist == NULL) + elog(ERROR, "Failed to find target tuple"); + + if (change.kind == CHANGE_UPDATE_NEW) + apply_concurrent_update(rel, tup, tup_exist, &change, iistate, + index_slot); + else + apply_concurrent_delete(rel, tup_exist, &change); + + if (tup_old != NULL) + { + pfree(tup_old); + tup_old = NULL; + } + + pfree(tup); + index_endscan(ind_scan); + } + else + elog(ERROR, "Unrecognized kind of change: %d", change.kind); + + /* + * If a change was applied now, increment CID for next writes and + * update the snapshot so it sees the changes we've applied so far. + */ + if (change.kind != CHANGE_UPDATE_OLD) + { + CommandCounterIncrement(); + UpdateActiveSnapshotCommandId(); + } + + /* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */ + Assert(shouldFree); + pfree(tup_change); + } + + tuplestore_clear(dstate->tstore); + dstate->nchanges = 0; + + /* Cleanup. */ + ExecDropSingleTupleTableSlot(index_slot); + ExecDropSingleTupleTableSlot(ident_slot); +} + +static void +apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup, + IndexInsertState *iistate, TupleTableSlot *index_slot) +{ + List *recheck; + + + /* + * Like simple_heap_insert(), but make sure that the INSERT is not + * logically decoded - see reform_and_rewrite_tuple() for more + * information. + */ + heap_insert(rel, tup, GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, + NULL); + + /* + * Update indexes. + * + * In case functions in the index need the active snapshot and caller + * hasn't set one. + */ + ExecStoreHeapTuple(tup, index_slot, false); + recheck = ExecInsertIndexTuples(iistate->rri, + index_slot, + iistate->estate, + false, /* update */ + false, /* noDupErr */ + NULL, /* specConflict */ + NIL, /* arbiterIndexes */ + false /* onlySummarizing */ + ); + + /* + * If recheck is required, it must have been preformed on the source + * relation by now. (All the logical changes we process here are already + * committed.) + */ + list_free(recheck); + + pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, 1); +} + +static void +apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, + ConcurrentChange *change, IndexInsertState *iistate, + TupleTableSlot *index_slot) +{ + LockTupleMode lockmode; + TM_FailureData tmfd; + TU_UpdateIndexes update_indexes; + TM_Result res; + List *recheck; + + /* + * Write the new tuple into the new heap. ('tup' gets the TID assigned + * here.) + * + * Do it like in simple_heap_update(), except for 'wal_logical' (and + * except for 'wait'). + */ + res = heap_update(rel, &tup_target->t_self, tup, + GetCurrentCommandId(true), + InvalidSnapshot, + false, /* no wait - only we are doing changes */ + &tmfd, &lockmode, &update_indexes, + false /* wal_logical */); + if (res != TM_Ok) + ereport(ERROR, (errmsg("failed to apply concurrent UPDATE"))); + + ExecStoreHeapTuple(tup, index_slot, false); + + if (update_indexes != TU_None) + { + recheck = ExecInsertIndexTuples(iistate->rri, + index_slot, + iistate->estate, + true, /* update */ + false, /* noDupErr */ + NULL, /* specConflict */ + NIL, /* arbiterIndexes */ + /* onlySummarizing */ + update_indexes == TU_Summarizing); + list_free(recheck); + } + + pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_UPDATED, 1); +} + +static void +apply_concurrent_delete(Relation rel, HeapTuple tup_target, + ConcurrentChange *change) +{ + TM_Result res; + TM_FailureData tmfd; + + /* + * Delete tuple from the new heap. + * + * Do it like in simple_heap_delete(), except for 'wal_logical' (and + * except for 'wait'). + */ + res = heap_delete(rel, &tup_target->t_self, GetCurrentCommandId(true), + InvalidSnapshot, false, + &tmfd, + false, /* no wait - only we are doing changes */ + false /* wal_logical */); + + if (res != TM_Ok) + ereport(ERROR, (errmsg("failed to apply concurrent DELETE"))); + + pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_DELETED, 1); +} + +/* + * Find the tuple to be updated or deleted. + * + * 'key' is a pre-initialized scan key, into which the function will put the + * key values. + * + * 'tup_key' is a tuple containing the key values for the scan. + * + * On exit,'*scan_p' contains the scan descriptor used. The caller must close + * it when he no longer needs the tuple returned. + */ +static HeapTuple +find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key, + IndexInsertState *iistate, + TupleTableSlot *ident_slot, IndexScanDesc *scan_p) +{ + IndexScanDesc scan; + Form_pg_index ident_form; + int2vector *ident_indkey; + HeapTuple result = NULL; + + /* XXX no instrumentation for now */ + scan = index_beginscan(rel, iistate->ident_index, GetActiveSnapshot(), + NULL, nkeys, 0); + *scan_p = scan; + index_rescan(scan, key, nkeys, NULL, 0); + + /* Info needed to retrieve key values from heap tuple. */ + ident_form = iistate->ident_index->rd_index; + ident_indkey = &ident_form->indkey; + + /* Use the incoming tuple to finalize the scan key. */ + for (int i = 0; i < scan->numberOfKeys; i++) + { + ScanKey entry; + bool isnull; + int16 attno_heap; + + entry = &scan->keyData[i]; + attno_heap = ident_indkey->values[i]; + entry->sk_argument = heap_getattr(tup_key, + attno_heap, + rel->rd_att, + &isnull); + Assert(!isnull); + } + if (index_getnext_slot(scan, ForwardScanDirection, ident_slot)) + { + bool shouldFree; + + result = ExecFetchSlotHeapTuple(ident_slot, false, &shouldFree); + /* TTSOpsBufferHeapTuple has .get_heap_tuple != NULL. */ + Assert(!shouldFree); + } + + return result; +} + +/* + * Decode and apply concurrent changes. + * + * Pass rel_src iff its reltoastrelid is needed. + */ +static void +process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal, + Relation rel_dst, Relation rel_src, ScanKey ident_key, + int ident_key_nentries, IndexInsertState *iistate) +{ + RepackDecodingState *dstate; + + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_CATCH_UP); + + dstate = (RepackDecodingState *) ctx->output_writer_private; + + repack_decode_concurrent_changes(ctx, end_of_wal); + + if (dstate->nchanges == 0) + return; + + PG_TRY(); + { + /* + * Make sure that TOAST values can eventually be accessed via the old + * relation - see comment in copy_table_data(). + */ + if (rel_src) + rel_dst->rd_toastoid = rel_src->rd_rel->reltoastrelid; + + apply_concurrent_changes(dstate, rel_dst, ident_key, + ident_key_nentries, iistate); + } + PG_FINALLY(); + { + if (rel_src) + rel_dst->rd_toastoid = InvalidOid; + } + PG_END_TRY(); +} + +static IndexInsertState * +get_index_insert_state(Relation relation, Oid ident_index_id) +{ + EState *estate; + int i; + IndexInsertState *result; + + result = (IndexInsertState *) palloc0(sizeof(IndexInsertState)); + estate = CreateExecutorState(); + + result->rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo)); + InitResultRelInfo(result->rri, relation, 0, 0, 0); + ExecOpenIndices(result->rri, false); + + /* + * Find the relcache entry of the identity index so that we spend no extra + * effort to open / close it. + */ + for (i = 0; i < result->rri->ri_NumIndices; i++) + { + Relation ind_rel; + + ind_rel = result->rri->ri_IndexRelationDescs[i]; + if (ind_rel->rd_id == ident_index_id) + result->ident_index = ind_rel; + } + if (result->ident_index == NULL) + elog(ERROR, "Failed to open identity index"); + + /* Only initialize fields needed by ExecInsertIndexTuples(). */ + result->estate = estate; + + return result; +} + +/* + * Build scan key to process logical changes. + */ +static ScanKey +build_identity_key(Oid ident_idx_oid, Relation rel_src, int *nentries) +{ + Relation ident_idx_rel; + Form_pg_index ident_idx; + int n, + i; + ScanKey result; + + Assert(OidIsValid(ident_idx_oid)); + ident_idx_rel = index_open(ident_idx_oid, AccessShareLock); + ident_idx = ident_idx_rel->rd_index; + n = ident_idx->indnatts; + result = (ScanKey) palloc(sizeof(ScanKeyData) * n); + for (i = 0; i < n; i++) + { + ScanKey entry; + int16 relattno; + Form_pg_attribute att; + Oid opfamily, + opcintype, + opno, + opcode; + + entry = &result[i]; + relattno = ident_idx->indkey.values[i]; + if (relattno >= 1) + { + TupleDesc desc; + + desc = rel_src->rd_att; + att = TupleDescAttr(desc, relattno - 1); + } + else + elog(ERROR, "Unexpected attribute number %d in index", relattno); + + opfamily = ident_idx_rel->rd_opfamily[i]; + opcintype = ident_idx_rel->rd_opcintype[i]; + opno = get_opfamily_member(opfamily, opcintype, opcintype, + BTEqualStrategyNumber); + + if (!OidIsValid(opno)) + elog(ERROR, "Failed to find = operator for type %u", opcintype); + + opcode = get_opcode(opno); + if (!OidIsValid(opcode)) + elog(ERROR, "Failed to find = operator for operator %u", opno); + + /* Initialize everything but argument. */ + ScanKeyInit(entry, + i + 1, + BTEqualStrategyNumber, opcode, + (Datum) NULL); + entry->sk_collation = att->attcollation; + } + index_close(ident_idx_rel, AccessShareLock); + + *nentries = n; + return result; +} + +static void +free_index_insert_state(IndexInsertState *iistate) +{ + ExecCloseIndices(iistate->rri); + FreeExecutorState(iistate->estate); + pfree(iistate->rri); + pfree(iistate); +} + +static void +cleanup_logical_decoding(LogicalDecodingContext *ctx) +{ + RepackDecodingState *dstate; + + dstate = (RepackDecodingState *) ctx->output_writer_private; + + ExecDropSingleTupleTableSlot(dstate->tsslot); + FreeTupleDesc(dstate->tupdesc_change); + FreeTupleDesc(dstate->tupdesc); + tuplestore_end(dstate->tstore); + + FreeDecodingContext(ctx); +} + +/* + * The final steps of rebuild_relation() for concurrent processing. + * + * On entry, NewHeap is locked in AccessExclusiveLock mode. OldHeap and its + * clustering index (if one is passed) are still locked in a mode that allows + * concurrent data changes. On exit, both tables and their indexes are closed, + * but locked in AccessExclusiveLock mode. + */ +static void +rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, + Relation cl_index, + LogicalDecodingContext *ctx, + bool swap_toast_by_content, + TransactionId frozenXid, + MultiXactId cutoffMulti) +{ + LOCKMODE lockmode_old PG_USED_FOR_ASSERTS_ONLY; + List *ind_oids_new; + Oid old_table_oid = RelationGetRelid(OldHeap); + Oid new_table_oid = RelationGetRelid(NewHeap); + List *ind_oids_old = RelationGetIndexList(OldHeap); + ListCell *lc, + *lc2; + char relpersistence; + bool is_system_catalog; + Oid ident_idx_old, + ident_idx_new; + IndexInsertState *iistate; + ScanKey ident_key; + int ident_key_nentries; + XLogRecPtr wal_insert_ptr, + end_of_wal; + char dummy_rec_data = '\0'; + Relation *ind_refs, + *ind_refs_p; + int nind; + + /* Like in cluster_rel(). */ + lockmode_old = ShareUpdateExclusiveLock; + Assert(CheckRelationLockedByMe(OldHeap, lockmode_old, false)); + Assert(cl_index == NULL || + CheckRelationLockedByMe(cl_index, lockmode_old, false)); + /* This is expected from the caller. */ + Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false)); + + ident_idx_old = RelationGetReplicaIndex(OldHeap); + + /* + * Unlike the exclusive case, we build new indexes for the new relation + * rather than swapping the storage and reindexing the old relation. The + * point is that the index build can take some time, so we do it before we + * get AccessExclusiveLock on the old heap and therefore we cannot swap + * the heap storage yet. + * + * index_create() will lock the new indexes using AccessExclusiveLock - no + * need to change that. + * + * We assume that ShareUpdateExclusiveLock on the table prevents anyone + * from dropping the existing indexes or adding new ones, so the lists of + * old and new indexes should match at the swap time. On the other hand we + * do not block ALTER INDEX commands that do not require table lock + * (e.g. ALTER INDEX ... SET ...). + * + * XXX Should we check a the end of our work if another transaction + * executed such a command and issue a NOTICE that we might have discarded + * its effects? (For example, someone changes storage parameter after we + * have created the new index, the new value of that parameter is lost.) + * Alternatively, we can lock all the indexes now in a mode that blocks + * all the ALTER INDEX commands (ShareUpdateExclusiveLock ?), and keep + * them locked till the end of the transactions. That might increase the + * risk of deadlock during the lock upgrade below, however SELECT / DML + * queries should not be involved in such a deadlock. + */ + ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old); + + /* + * Processing shouldn't start w/o valid identity index. + */ + Assert(OidIsValid(ident_idx_old)); + + /* Find "identity index" on the new relation. */ + ident_idx_new = InvalidOid; + forboth(lc, ind_oids_old, lc2, ind_oids_new) + { + Oid ind_old = lfirst_oid(lc); + Oid ind_new = lfirst_oid(lc2); + + if (ident_idx_old == ind_old) + { + ident_idx_new = ind_new; + break; + } + } + if (!OidIsValid(ident_idx_new)) + /* + * Should not happen, given our lock on the old relation. + */ + ereport(ERROR, + (errmsg("Identity index missing on the new relation"))); + + /* Executor state to update indexes. */ + iistate = get_index_insert_state(NewHeap, ident_idx_new); + + /* + * Build scan key that we'll use to look for rows to be updated / deleted + * during logical decoding. + */ + ident_key = build_identity_key(ident_idx_new, OldHeap, &ident_key_nentries); + + /* + * Flush all WAL records inserted so far (possibly except for the last + * incomplete page, see GetInsertRecPtr), to minimize the amount of data + * we need to flush while holding exclusive lock on the source table. + */ + wal_insert_ptr = GetInsertRecPtr(); + XLogFlush(wal_insert_ptr); + end_of_wal = GetFlushRecPtr(NULL); + + /* + * Apply concurrent changes first time, to minimize the time we need to + * hold AccessExclusiveLock. (Quite some amount of WAL could have been + * written during the data copying and index creation.) + */ + process_concurrent_changes(ctx, end_of_wal, NewHeap, + swap_toast_by_content ? OldHeap : NULL, + ident_key, ident_key_nentries, iistate); + + /* + * Acquire AccessExclusiveLock on the table, its TOAST relation (if there + * is one), all its indexes, so that we can swap the files. + * + * Before that, unlock the index temporarily to avoid deadlock in case + * another transaction is trying to lock it while holding the lock on the + * table. + */ + if (cl_index) + { + index_close(cl_index, ShareUpdateExclusiveLock); + cl_index = NULL; + } + /* For the same reason, unlock TOAST relation. */ + if (OldHeap->rd_rel->reltoastrelid) + LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock); + /* Finally lock the table */ + LockRelationOid(old_table_oid, AccessExclusiveLock); + + /* + * Lock all indexes now, not only the clustering one: all indexes need to + * have their files swapped. While doing that, store their relation + * references in an array, to handle predicate locks below. + */ + ind_refs_p = ind_refs = palloc_array(Relation, list_length(ind_oids_old)); + nind = 0; + foreach(lc, ind_oids_old) + { + Oid ind_oid; + Relation index; + + ind_oid = lfirst_oid(lc); + index = index_open(ind_oid, AccessExclusiveLock); + /* + * TODO 1) Do we need to check if ALTER INDEX was executed since the + * new index was created in build_new_indexes()? 2) Specifically for + * the clustering index, should check_index_is_clusterable() be called + * here? (Not sure about the latter: ShareUpdateExclusiveLock on the + * table probably blocks all commands that affect the result of + * check_index_is_clusterable().) + */ + *ind_refs_p = index; + ind_refs_p++; + nind++; + } + + /* + * In addition, lock the OldHeap's TOAST relation exclusively - again, the + * lock is needed to swap the files. + */ + if (OidIsValid(OldHeap->rd_rel->reltoastrelid)) + LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock); + + /* + * Tuples and pages of the old heap will be gone, but the heap will stay. + */ + TransferPredicateLocksToHeapRelation(OldHeap); + /* The same for indexes. */ + for (int i = 0; i < nind; i++) + { + Relation index = ind_refs[i]; + + TransferPredicateLocksToHeapRelation(index); + + /* + * References to indexes on the old relation are not needed anymore, + * however locks stay till the end of the transaction. + */ + index_close(index, NoLock); + } + pfree(ind_refs); + + /* + * Flush anything we see in WAL, to make sure that all changes committed + * while we were waiting for the exclusive lock are available for + * decoding. This should not be necessary if all backends had + * synchronous_commit set, but we can't rely on this setting. + * + * Unfortunately, GetInsertRecPtr() may lag behind the actual insert + * position, and GetLastImportantRecPtr() points at the start of the last + * record rather than at the end. Thus the simplest way to determine the + * insert position is to insert a dummy record and use its LSN. + * + * XXX Consider using GetLastImportantRecPtr() and adding the size of the + * last record (plus the total size of all the page headers the record + * spans)? + */ + XLogBeginInsert(); + XLogRegisterData(&dummy_rec_data, 1); + wal_insert_ptr = XLogInsert(RM_XLOG_ID, XLOG_NOOP); + XLogFlush(wal_insert_ptr); + end_of_wal = GetFlushRecPtr(NULL); + + /* Apply the concurrent changes again. */ + process_concurrent_changes(ctx, end_of_wal, NewHeap, + swap_toast_by_content ? OldHeap : NULL, + ident_key, ident_key_nentries, iistate); + + /* Remember info about rel before closing OldHeap */ + relpersistence = OldHeap->rd_rel->relpersistence; + is_system_catalog = IsSystemRelation(OldHeap); + + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_SWAP_REL_FILES); + + /* + * Even ShareUpdateExclusiveLock should have prevented others from + * creating / dropping indexes (even using the CONCURRENTLY option), so we + * do not need to check whether the lists match. + */ + forboth(lc, ind_oids_old, lc2, ind_oids_new) + { + Oid ind_old = lfirst_oid(lc); + Oid ind_new = lfirst_oid(lc2); + Oid mapped_tables[4]; + + /* Zero out possible results from swapped_relation_files */ + memset(mapped_tables, 0, sizeof(mapped_tables)); + + swap_relation_files(ind_old, ind_new, + (old_table_oid == RelationRelationId), + swap_toast_by_content, + true, + InvalidTransactionId, + InvalidMultiXactId, + mapped_tables); + +#ifdef USE_ASSERT_CHECKING + + /* + * Concurrent processing is not supported for system relations, so + * there should be no mapped tables. + */ + for (int i = 0; i < 4; i++) + Assert(mapped_tables[i] == 0); +#endif + } + + /* The new indexes must be visible for deletion. */ + CommandCounterIncrement(); + + /* Close the old heap but keep lock until transaction commit. */ + table_close(OldHeap, NoLock); + /* Close the new heap. (We didn't have to open its indexes). */ + table_close(NewHeap, NoLock); + + /* Cleanup what we don't need anymore. (And close the identity index.) */ + pfree(ident_key); + free_index_insert_state(iistate); + + /* + * Swap the relations and their TOAST relations and TOAST indexes. This + * also drops the new relation and its indexes. + * + * (System catalogs are currently not supported.) + */ + Assert(!is_system_catalog); + finish_heap_swap(old_table_oid, new_table_oid, + is_system_catalog, + swap_toast_by_content, + false, true, false, + frozenXid, cutoffMulti, + relpersistence); +} + +/* + * Build indexes on NewHeap according to those on OldHeap. + * + * OldIndexes is the list of index OIDs on OldHeap. + * + * A list of OIDs of the corresponding indexes created on NewHeap is + * returned. The order of items does match, so we can use these arrays to swap + * index storage. + */ +static List * +build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes) +{ + StringInfo ind_name; + ListCell *lc; + List *result = NIL; + + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_REBUILD_INDEX); + + ind_name = makeStringInfo(); + + foreach(lc, OldIndexes) + { + Oid ind_oid, + ind_oid_new, + tbsp_oid; + Relation ind; + IndexInfo *ind_info; + int i, + heap_col_id; + List *colnames; + int16 indnatts; + Oid *collations, + *opclasses; + HeapTuple tup; + bool isnull; + Datum d; + oidvector *oidvec; + int2vector *int2vec; + size_t oid_arr_size; + size_t int2_arr_size; + int16 *indoptions; + text *reloptions = NULL; + bits16 flags; + Datum *opclassOptions; + NullableDatum *stattargets; + + ind_oid = lfirst_oid(lc); + ind = index_open(ind_oid, AccessShareLock); + ind_info = BuildIndexInfo(ind); + + tbsp_oid = ind->rd_rel->reltablespace; + + /* + * Index name really doesn't matter, we'll eventually use only their + * storage. Just make them unique within the table. + */ + resetStringInfo(ind_name); + appendStringInfo(ind_name, "ind_%d", + list_cell_number(OldIndexes, lc)); + + flags = 0; + if (ind->rd_index->indisprimary) + flags |= INDEX_CREATE_IS_PRIMARY; + + colnames = NIL; + indnatts = ind->rd_index->indnatts; + oid_arr_size = sizeof(Oid) * indnatts; + int2_arr_size = sizeof(int16) * indnatts; + + collations = (Oid *) palloc(oid_arr_size); + for (i = 0; i < indnatts; i++) + { + char *colname; + + heap_col_id = ind->rd_index->indkey.values[i]; + if (heap_col_id > 0) + { + Form_pg_attribute att; + + /* Normal attribute. */ + att = TupleDescAttr(OldHeap->rd_att, heap_col_id - 1); + colname = pstrdup(NameStr(att->attname)); + collations[i] = att->attcollation; + } + else if (heap_col_id == 0) + { + HeapTuple tuple; + Form_pg_attribute att; + + /* + * Expression column is not present in relcache. What we need + * here is an attribute of the *index* relation. + */ + tuple = SearchSysCache2(ATTNUM, + ObjectIdGetDatum(ind_oid), + Int16GetDatum(i + 1)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, + "cache lookup failed for attribute %d of relation %u", + i + 1, ind_oid); + att = (Form_pg_attribute) GETSTRUCT(tuple); + colname = pstrdup(NameStr(att->attname)); + collations[i] = att->attcollation; + ReleaseSysCache(tuple); + } + else + elog(ERROR, "Unexpected column number: %d", + heap_col_id); + + colnames = lappend(colnames, colname); + } + + /* + * Special effort needed for variable length attributes of + * Form_pg_index. + */ + tup = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(ind_oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for index %u", ind_oid); + d = SysCacheGetAttr(INDEXRELID, tup, Anum_pg_index_indclass, &isnull); + Assert(!isnull); + oidvec = (oidvector *) DatumGetPointer(d); + opclasses = (Oid *) palloc(oid_arr_size); + memcpy(opclasses, oidvec->values, oid_arr_size); + + d = SysCacheGetAttr(INDEXRELID, tup, Anum_pg_index_indoption, + &isnull); + Assert(!isnull); + int2vec = (int2vector *) DatumGetPointer(d); + indoptions = (int16 *) palloc(int2_arr_size); + memcpy(indoptions, int2vec->values, int2_arr_size); + ReleaseSysCache(tup); + + tup = SearchSysCache1(RELOID, ObjectIdGetDatum(ind_oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for index relation %u", ind_oid); + d = SysCacheGetAttr(RELOID, tup, Anum_pg_class_reloptions, &isnull); + reloptions = !isnull ? DatumGetTextPCopy(d) : NULL; + ReleaseSysCache(tup); + + opclassOptions = palloc0(sizeof(Datum) * ind_info->ii_NumIndexAttrs); + for (i = 0; i < ind_info->ii_NumIndexAttrs; i++) + opclassOptions[i] = get_attoptions(ind_oid, i + 1); + + stattargets = get_index_stattargets(ind_oid, ind_info); + + /* + * Neither parentIndexRelid nor parentConstraintId needs to be passed + * since the new catalog entries (pg_constraint, pg_inherits) would + * eventually be dropped. Therefore there's no need to record valid + * dependency on parents. + */ + ind_oid_new = index_create(NewHeap, + ind_name->data, + InvalidOid, + InvalidOid, /* parentIndexRelid */ + InvalidOid, /* parentConstraintId */ + InvalidOid, + ind_info, + colnames, + ind->rd_rel->relam, + tbsp_oid, + collations, + opclasses, + opclassOptions, + indoptions, + stattargets, + PointerGetDatum(reloptions), + flags, /* flags */ + 0, /* constr_flags */ + false, /* allow_system_table_mods */ + false, /* is_internal */ + NULL /* constraintId */ + ); + result = lappend_oid(result, ind_oid_new); + + index_close(ind, AccessShareLock); + list_free_deep(colnames); + pfree(collations); + pfree(opclasses); + pfree(indoptions); + if (reloptions) + pfree(reloptions); + } + + return result; +} + +/* + * REPACK is intended to be a replacement of both CLUSTER and VACUUM FULL. + */ +void +repack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel) +{ + ListCell *lc; + ClusterParams params = {0}; + bool verbose = false; + Relation rel = NULL; + Oid indexOid = InvalidOid; + MemoryContext repack_context; + List *rtcs; + LOCKMODE lockmode; + + /* Parse option list */ + foreach(lc, stmt->params) + { + DefElem *opt = (DefElem *) lfirst(lc); + + if (strcmp(opt->defname, "verbose") == 0) + verbose = defGetBoolean(opt); + else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unrecognized REPACK option \"%s\"", + opt->defname), + parser_errposition(pstate, opt->location))); + } + + params.options = + (verbose ? CLUOPT_VERBOSE : 0) | + (stmt->concurrent ? CLUOPT_CONCURRENT : 0); + + /* + * Determine the lock mode expected by cluster_rel(). + * + * In the exclusive case, we obtain AccessExclusiveLock right away to + * avoid lock-upgrade hazard in the single-transaction case. In the + * CONCURRENTLY case, the AccessExclusiveLock will only be used at the end + * of processing, supposedly for very short time. Until then, we'll have + * to unlock the relation temporarily, so there's no lock-upgrade hazard. + */ + lockmode = (params.options & CLUOPT_CONCURRENT) == 0 ? + AccessExclusiveLock : ShareUpdateExclusiveLock; + + if (stmt->relation != NULL) + { + /* This is the single-relation case. */ + rel = process_single_relation(stmt->relation, stmt->indexname, + lockmode, isTopLevel, ¶ms, + CLUSTER_COMMAND_REPACK, &indexOid); + if (rel == NULL) + return; + } + + /* + * By here, we know we are in a multi-table situation. + * + * Concurrent processing is currently considered rather special (e.g. in + * terms of resources consumed) so it is not performed in bulk. + */ + if (params.options & CLUOPT_CONCURRENT) + { + if (rel != NULL) + { + Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + ereport(ERROR, + (errmsg("REPACK CONCURRENTLY not supported for partitioned tables"), + errhint("Consider running the command for individual partitions."))); + } + else + ereport(ERROR, + (errmsg("REPACK CONCURRENTLY requires explicit table name"))); + } + + /* + * In order to avoid holding locks for too long, we want to process each + * table in its own transaction. This forces us to disallow running + * inside a user transaction block. + */ + PreventInTransactionBlock(isTopLevel, "REPACK"); + + /* Also, we need a memory context to hold our list of relations */ + repack_context = AllocSetContextCreate(PortalContext, + "Repack", + ALLOCSET_DEFAULT_SIZES); + + params.options |= CLUOPT_RECHECK; + if (rel != NULL) + { + Oid relid; + bool rel_is_index; + + Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + /* See the ereport() above. */ + Assert((params.options & CLUOPT_CONCURRENT) == 0); + + if (OidIsValid(indexOid)) + { + relid = indexOid; + rel_is_index = true; + } + else + { + relid = RelationGetRelid(rel); + rel_is_index = false; + } + rtcs = get_tables_to_cluster_partitioned(repack_context, relid, + rel_is_index, + CLUSTER_COMMAND_REPACK); + + /* close relation, releasing lock on parent table */ + table_close(rel, lockmode); + } + else + rtcs = get_tables_to_repack(repack_context); + + /* Do the job. */ + cluster_multiple_rels(rtcs, ¶ms, CLUSTER_COMMAND_REPACK, lockmode, + isTopLevel); - /* Do the job. */ - cluster_multiple_rels(rtcs, ¶ms, CLUSTER_COMMAND_REPACK); /* Start a new transaction for the cleanup work. */ StartTransactionCommand(); @@ -1933,6 +3515,7 @@ repack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel) */ static Relation process_single_relation(RangeVar *relation, char *indexname, + LOCKMODE lockmode, bool isTopLevel, ClusterParams *params, ClusterCommand cmd, Oid *indexOid_p) { @@ -1943,12 +3526,10 @@ process_single_relation(RangeVar *relation, char *indexname, Oid tableOid; /* - * Find, lock, and check permissions on the table. We obtain - * AccessExclusiveLock right away to avoid lock-upgrade hazard in the - * single-transaction case. + * Find, lock, and check permissions on the table. */ tableOid = RangeVarGetRelidExtended(relation, - AccessExclusiveLock, + lockmode, 0, RangeVarCallbackMaintainsTable, NULL); @@ -2012,7 +3593,7 @@ process_single_relation(RangeVar *relation, char *indexname, /* For non-partitioned tables, do what we came here to do. */ if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) { - cluster_rel(rel, indexOid, params, cmd); + cluster_rel(rel, indexOid, params, cmd, isTopLevel); /* cluster_rel closes the relation, but keeps lock */ return NULL; diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index e7854add178..df879c2a18d 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -904,7 +904,7 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence) { - finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, + finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, true, RecentXmin, ReadNextMultiXactId(), relpersistence); } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 686f1850cab..f4e1ed0ec5f 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -5989,6 +5989,7 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode, finish_heap_swap(tab->relid, OIDNewHeap, false, false, true, !OidIsValid(tab->newTableSpace), + true, RecentXmin, ReadNextMultiXactId(), persistence); diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index a4ad23448f8..f9f8f5ebb58 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -124,7 +124,7 @@ static void vac_truncate_clog(TransactionId frozenXID, TransactionId lastSaneFrozenXid, MultiXactId lastSaneMinMulti); static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, - BufferAccessStrategy bstrategy); + BufferAccessStrategy bstrategy, bool isTopLevel); static double compute_parallel_delay(void); static VacOptValue get_vacoptval_from_boolean(DefElem *def); static bool vac_tid_reaped(ItemPointer itemptr, void *state); @@ -634,7 +634,8 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, if (params->options & VACOPT_VACUUM) { - if (!vacuum_rel(vrel->oid, vrel->relation, params, bstrategy)) + if (!vacuum_rel(vrel->oid, vrel->relation, params, bstrategy, + isTopLevel)) continue; } @@ -1996,7 +1997,7 @@ vac_truncate_clog(TransactionId frozenXID, */ static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, - BufferAccessStrategy bstrategy) + BufferAccessStrategy bstrategy, bool isTopLevel) { LOCKMODE lmode; Relation rel; @@ -2264,7 +2265,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, /* VACUUM FULL is now a variant of CLUSTER; see cluster.c */ cluster_rel(rel, InvalidOid, &cluster_params, - CLUSTER_COMMAND_VACUUM); + CLUSTER_COMMAND_VACUUM, isTopLevel); /* cluster_rel closes the relation, but keeps lock */ rel = NULL; @@ -2310,7 +2311,8 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, toast_vacuum_params.options |= VACOPT_PROCESS_MAIN; toast_vacuum_params.toast_parent = relid; - vacuum_rel(toast_relid, NULL, &toast_vacuum_params, bstrategy); + vacuum_rel(toast_relid, NULL, &toast_vacuum_params, bstrategy, + isTopLevel); } /* diff --git a/src/backend/meson.build b/src/backend/meson.build index 2b0db214804..50aa385a581 100644 --- a/src/backend/meson.build +++ b/src/backend/meson.build @@ -194,5 +194,6 @@ pg_test_mod_args = pg_mod_args + { subdir('jit/llvm') subdir('replication/libpqwalreceiver') subdir('replication/pgoutput') +subdir('replication/pgoutput_repack') subdir('snowball') subdir('utils/mb/conversion_procs') diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 00813f88b47..72f75f25fcc 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -11897,27 +11897,30 @@ cluster_index_specification: * * QUERY: * REPACK [ (options) ] [ [ USING INDEX ] ] + * REPACK [ (options) ] CONCURRENTLY [ USING INDEX ] * *****************************************************************************/ RepackStmt: - REPACK opt_repack_args + REPACK opt_concurrently opt_repack_args { RepackStmt *n = makeNode(RepackStmt); - n->relation = $2 ? (RangeVar *) linitial($2) : NULL; - n->indexname = $2 ? (char *) lsecond($2) : NULL; + n->relation = $3 ? (RangeVar *) linitial($3) : NULL; + n->indexname = $3 ? (char *) lsecond($3) : NULL; n->params = NIL; + n->concurrent = $2; $$ = (Node *) n; } - | REPACK '(' utility_option_list ')' opt_repack_args + | REPACK '(' utility_option_list ')' opt_concurrently opt_repack_args { RepackStmt *n = makeNode(RepackStmt); - n->relation = $5 ? (RangeVar *) linitial($5) : NULL; - n->indexname = $5 ? (char *) lsecond($5) : NULL; + n->relation = $6 ? (RangeVar *) linitial($6) : NULL; + n->indexname = $6 ? (char *) lsecond($6) : NULL; n->params = $3; + n->concurrent = $5; $$ = (Node *) n; } ; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 78f9a0a11c4..bc0e4397f35 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -33,6 +33,7 @@ #include "access/xlogreader.h" #include "access/xlogrecord.h" #include "catalog/pg_control.h" +#include "commands/cluster.h" #include "replication/decode.h" #include "replication/logical.h" #include "replication/message.h" @@ -467,6 +468,88 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + /* + * If the change is not intended for logical decoding, do not even + * establish transaction for it - REPACK CONCURRENTLY is the typical use + * case. + * + * First, check if REPACK CONCURRENTLY is being performed by this backend. + * If so, only decode data changes of the table that it is processing, and + * the changes of its TOAST relation. + * + * (TOAST locator should not be set unless the main is.) + */ + Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) || + OidIsValid(repacked_rel_locator.relNumber)); + + if (OidIsValid(repacked_rel_locator.relNumber)) + { + XLogReaderState *r = buf->record; + RelFileLocator locator; + + /* Not all records contain the block. */ + if (XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL) && + !RelFileLocatorEquals(locator, repacked_rel_locator) && + (!OidIsValid(repacked_rel_toast_locator.relNumber) || + !RelFileLocatorEquals(locator, repacked_rel_toast_locator))) + return; + } + + /* + * Second, skip records which do not contain sufficient information for + * the decoding. + * + * The problem we solve here is that REPACK CONCURRENTLY generates WAL + * when doing changes in the new table. Those changes should not be useful + * for any other user (such as logical replication subscription) because + * the new table will eventually be dropped (after REPACK CONCURRENTLY has + * assigned its file to the "old table"). + */ + switch (info) + { + case XLOG_HEAP_INSERT: + { + xl_heap_insert *rec; + + rec = (xl_heap_insert *) XLogRecGetData(buf->record); + + /* + * This does happen when 1) raw_heap_insert marks the TOAST + * record as HEAP_INSERT_NO_LOGICAL, 2) REPACK CONCURRENTLY + * replays inserts performed by other backends. + */ + if ((rec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) == 0) + return; + + break; + } + + case XLOG_HEAP_HOT_UPDATE: + case XLOG_HEAP_UPDATE: + { + xl_heap_update *rec; + + rec = (xl_heap_update *) XLogRecGetData(buf->record); + if ((rec->flags & + (XLH_UPDATE_CONTAINS_NEW_TUPLE | + XLH_UPDATE_CONTAINS_OLD_TUPLE | + XLH_UPDATE_CONTAINS_OLD_KEY)) == 0) + return; + + break; + } + + case XLOG_HEAP_DELETE: + { + xl_heap_delete *rec; + + rec = (xl_heap_delete *) XLogRecGetData(buf->record); + if (rec->flags & XLH_DELETE_NO_LOGICAL) + return; + break; + } + } + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); /* diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index feaa3ac5ad4..5d552f9ce74 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -486,6 +486,26 @@ SnapBuildInitialSnapshot(SnapBuild *builder) return SnapBuildMVCCFromHistoric(snap, true); } +/* + * Build an MVCC snapshot for the initial data load performed by REPACK + * CONCURRENTLY command. + * + * The snapshot will only be used to scan one particular relation, which is + * treated like a catalog (therefore ->building_full_snapshot is not + * important), and the caller should already have a replication slot setup (so + * we do not set MyProc->xmin). XXX Do we yet need to add some restrictions? + */ +Snapshot +SnapBuildInitialSnapshotForRepack(SnapBuild *builder) +{ + Snapshot snap; + + Assert(builder->state == SNAPBUILD_CONSISTENT); + + snap = SnapBuildBuildSnapshot(builder); + return SnapBuildMVCCFromHistoric(snap, false); +} + /* * Turn a historic MVCC snapshot into an ordinary MVCC snapshot. * diff --git a/src/backend/replication/pgoutput_repack/Makefile b/src/backend/replication/pgoutput_repack/Makefile new file mode 100644 index 00000000000..4efeb713b70 --- /dev/null +++ b/src/backend/replication/pgoutput_repack/Makefile @@ -0,0 +1,32 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication/pgoutput_repack +# +# IDENTIFICATION +# src/backend/replication/pgoutput_repack +# +#------------------------------------------------------------------------- + +subdir = src/backend/replication/pgoutput_repack +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + $(WIN32RES) \ + pgoutput_repack.o +PGFILEDESC = "pgoutput_repack - logical replication output plugin for REPACK command" +NAME = pgoutput_repack + +all: all-shared-lib + +include $(top_srcdir)/src/Makefile.shlib + +install: all installdirs install-lib + +installdirs: installdirs-lib + +uninstall: uninstall-lib + +clean distclean: clean-lib + rm -f $(OBJS) diff --git a/src/backend/replication/pgoutput_repack/meson.build b/src/backend/replication/pgoutput_repack/meson.build new file mode 100644 index 00000000000..133e865a4a0 --- /dev/null +++ b/src/backend/replication/pgoutput_repack/meson.build @@ -0,0 +1,18 @@ +# Copyright (c) 2022-2024, PostgreSQL Global Development Group + +pgoutput_repack_sources = files( + 'pgoutput_repack.c', +) + +if host_system == 'windows' + pgoutput_repack_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'pgoutput_repack', + '--FILEDESC', 'pgoutput_repack - logical replication output plugin for REPACK command',]) +endif + +pgoutput_repack = shared_module('pgoutput_repack', + pgoutput_repack_sources, + kwargs: pg_mod_args, +) + +backend_targets += pgoutput_repack diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c new file mode 100644 index 00000000000..687fbbc59bb --- /dev/null +++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c @@ -0,0 +1,288 @@ +/*------------------------------------------------------------------------- + * + * pgoutput_cluster.c + * Logical Replication output plugin for REPACK command + * + * Copyright (c) 2012-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/pgoutput_cluster/pgoutput_cluster.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/heaptoast.h" +#include "commands/cluster.h" +#include "replication/snapbuild.h" + +PG_MODULE_MAGIC; + +static void plugin_startup(LogicalDecodingContext *ctx, + OutputPluginOptions *opt, bool is_init); +static void plugin_shutdown(LogicalDecodingContext *ctx); +static void plugin_begin_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void plugin_commit_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation rel, ReorderBufferChange *change); +static void plugin_truncate(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, int nrelations, + Relation relations[], + ReorderBufferChange *change); +static void store_change(LogicalDecodingContext *ctx, + ConcurrentChangeKind kind, HeapTuple tuple); + +void +_PG_output_plugin_init(OutputPluginCallbacks *cb) +{ + AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit); + + cb->startup_cb = plugin_startup; + cb->begin_cb = plugin_begin_txn; + cb->change_cb = plugin_change; + cb->truncate_cb = plugin_truncate; + cb->commit_cb = plugin_commit_txn; + cb->shutdown_cb = plugin_shutdown; +} + + +/* initialize this plugin */ +static void +plugin_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, + bool is_init) +{ + ctx->output_plugin_private = NULL; + + /* Probably unnecessary, as we don't use the SQL interface ... */ + opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; + + if (ctx->output_plugin_options != NIL) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("This plugin does not expect any options"))); + } +} + +static void +plugin_shutdown(LogicalDecodingContext *ctx) +{ +} + +/* + * As we don't release the slot during processing of particular table, there's + * no room for SQL interface, even for debugging purposes. Therefore we need + * neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the plugin + * callbacks. (Although we might want to write custom callbacks, this API + * seems to be unnecessarily generic for our purposes.) + */ + +/* BEGIN callback */ +static void +plugin_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ +} + +/* COMMIT callback */ +static void +plugin_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ +} + +/* + * Callback for individual changed tuples + */ +static void +plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + RepackDecodingState *dstate; + + dstate = (RepackDecodingState *) ctx->output_writer_private; + + /* Only interested in one particular relation. */ + if (relation->rd_id != dstate->relid) + return; + + /* Decode entry depending on its type */ + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + HeapTuple newtuple; + + newtuple = change->data.tp.newtuple != NULL ? + change->data.tp.newtuple : NULL; + + /* + * Identity checks in the main function should have made this + * impossible. + */ + if (newtuple == NULL) + elog(ERROR, "Incomplete insert info."); + + store_change(ctx, CHANGE_INSERT, newtuple); + } + break; + case REORDER_BUFFER_CHANGE_UPDATE: + { + HeapTuple oldtuple, + newtuple; + + oldtuple = change->data.tp.oldtuple != NULL ? + change->data.tp.oldtuple : NULL; + newtuple = change->data.tp.newtuple != NULL ? + change->data.tp.newtuple : NULL; + + if (newtuple == NULL) + elog(ERROR, "Incomplete update info."); + + if (oldtuple != NULL) + store_change(ctx, CHANGE_UPDATE_OLD, oldtuple); + + store_change(ctx, CHANGE_UPDATE_NEW, newtuple); + } + break; + case REORDER_BUFFER_CHANGE_DELETE: + { + HeapTuple oldtuple; + + oldtuple = change->data.tp.oldtuple ? + change->data.tp.oldtuple : NULL; + + if (oldtuple == NULL) + elog(ERROR, "Incomplete delete info."); + + store_change(ctx, CHANGE_DELETE, oldtuple); + } + break; + default: + /* Should not come here */ + Assert(false); + break; + } +} + +static void +plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change) +{ + RepackDecodingState *dstate; + int i; + Relation relation = NULL; + + dstate = (RepackDecodingState *) ctx->output_writer_private; + + /* Find the relation we are processing. */ + for (i = 0; i < nrelations; i++) + { + relation = relations[i]; + + if (RelationGetRelid(relation) == dstate->relid) + break; + } + + /* Is this truncation of another relation? */ + if (i == nrelations) + return; + + store_change(ctx, CHANGE_TRUNCATE, NULL); +} + +/* Store concurrent data change. */ +static void +store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, + HeapTuple tuple) +{ + RepackDecodingState *dstate; + char *change_raw; + ConcurrentChange change; + bool flattened = false; + Size size; + Datum values[1]; + bool isnull[1]; + char *dst, + *dst_start; + + dstate = (RepackDecodingState *) ctx->output_writer_private; + + size = MAXALIGN(VARHDRSZ) + SizeOfConcurrentChange; + + if (tuple) + { + /* + * ReorderBufferCommit() stores the TOAST chunks in its private memory + * context and frees them after having called apply_change(). + * Therefore we need flat copy (including TOAST) that we eventually + * copy into the memory context which is available to + * decode_concurrent_changes(). + */ + if (HeapTupleHasExternal(tuple)) + { + /* + * toast_flatten_tuple_to_datum() might be more convenient but we + * don't want the decompression it does. + */ + tuple = toast_flatten_tuple(tuple, dstate->tupdesc); + flattened = true; + } + + size += tuple->t_len; + } + + /* XXX Isn't there any function / macro to do this? */ + if (size >= 0x3FFFFFFF) + elog(ERROR, "Change is too big."); + + /* Construct the change. */ + change_raw = (char *) palloc0(size); + SET_VARSIZE(change_raw, size); + + /* + * Since the varlena alignment might not be sufficient for the structure, + * set the fields in a local instance and remember where it should + * eventually be copied. + */ + change.kind = kind; + dst_start = (char *) VARDATA(change_raw); + + /* No other information is needed for TRUNCATE. */ + if (change.kind == CHANGE_TRUNCATE) + { + memcpy(dst_start, &change, SizeOfConcurrentChange); + goto store; + } + + /* + * Copy the tuple. + * + * CAUTION: change->tup_data.t_data must be fixed on retrieval! + */ + memcpy(&change.tup_data, tuple, sizeof(HeapTupleData)); + dst = dst_start + SizeOfConcurrentChange; + memcpy(dst, tuple->t_data, tuple->t_len); + + /* The data has been copied. */ + if (flattened) + pfree(tuple); + +store: + /* Copy the structure so it can be stored. */ + memcpy(dst_start, &change, SizeOfConcurrentChange); + + /* Store as tuple of 1 bytea column. */ + values[0] = PointerGetDatum(change_raw); + isnull[0] = false; + tuplestore_putvalues(dstate->tstore, dstate->tupdesc_change, + values, isnull); + + /* Accounting. */ + dstate->nchanges++; + + /* Cleanup. */ + pfree(change_raw); +} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 00c76d05356..f247e4e7c16 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -25,6 +25,7 @@ #include "access/xlogprefetcher.h" #include "access/xlogrecovery.h" #include "commands/async.h" +#include "commands/cluster.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 930321905f1..4155faf6c76 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -353,6 +353,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry." InjectionPoint "Waiting to read or update information related to injection points." SerialControl "Waiting to read or update shared pg_serial state." AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue." +RepackedRels "Waiting to read or update information on tables being repacked concurrently." # # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE) diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 2905ae86a20..75434e32198 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -64,6 +64,7 @@ #include "catalog/pg_type.h" #include "catalog/schemapg.h" #include "catalog/storage.h" +#include "commands/cluster.h" #include "commands/policy.h" #include "commands/publicationcmds.h" #include "commands/trigger.h" diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 70a6b8902d1..7f1c220e00b 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -213,7 +213,6 @@ static List *exportedSnapshots = NIL; /* Prototypes for local functions */ static void UnregisterSnapshotNoOwner(Snapshot snapshot); -static void FreeSnapshot(Snapshot snapshot); static void SnapshotResetXmin(void); /* ResourceOwner callbacks to track snapshot references */ @@ -646,7 +645,7 @@ CopySnapshot(Snapshot snapshot) * FreeSnapshot * Free the memory associated with a snapshot. */ -static void +void FreeSnapshot(Snapshot snapshot) { Assert(snapshot->regd_count == 0); diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 8512e099b03..24016228522 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -4914,18 +4914,27 @@ match_previous_words(int pattern_id, } /* REPACK */ - else if (Matches("REPACK")) + else if (Matches("REPACK") || Matches("REPACK", "(*)")) + COMPLETE_WITH_SCHEMA_QUERY_PLUS(Query_for_list_of_clusterables, + "CONCURRENTLY"); + else if (Matches("REPACK", "CONCURRENTLY")) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_clusterables); - else if (Matches("REPACK", "(*)")) + else if (Matches("REPACK", "(*)", "CONCURRENTLY")) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_clusterables); - /* If we have REPACK , then add "USING INDEX" */ - else if (Matches("REPACK", MatchAnyExcept("("))) + /* If we have REPACK [ CONCURRENTLY ] , then add "USING INDEX" */ + else if (Matches("REPACK", MatchAnyExcept("(|CONCURRENTLY")) || + Matches("REPACK", "CONCURRENTLY", MatchAnyExcept("("))) COMPLETE_WITH("USING INDEX"); - /* If we have REPACK (*) , then add "USING INDEX" */ - else if (Matches("REPACK", "(*)", MatchAny)) + /* If we have REPACK (*) [ CONCURRENTLY ] , then add "USING INDEX" */ + else if (Matches("REPACK", "(*)", MatchAnyExcept("CONCURRENTLY")) || + Matches("REPACK", "(*)", "CONCURRENTLY", MatchAnyExcept("("))) COMPLETE_WITH("USING INDEX"); - /* If we have REPACK USING, then add the index as well */ - else if (Matches("REPACK", MatchAny, "USING", "INDEX")) + + /* + * Complete ... [ (*) ] [ CONCURRENTLY ] USING INDEX, with a list of + * indexes for . + */ + else if (TailMatches(MatchAnyExcept("(|CONCURRENTLY"), "USING", "INDEX")) { set_completion_reference(prev3_wd); COMPLETE_WITH_SCHEMA_QUERY(Query_for_index_of_table); diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index e48fe434cd3..be36bb51d0e 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -322,14 +322,15 @@ extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, BulkInsertState bistate); extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, - struct TM_FailureData *tmfd, bool changingPart); + struct TM_FailureData *tmfd, bool changingPart, + bool wal_logical); extern void heap_finish_speculative(Relation relation, ItemPointer tid); extern void heap_abort_speculative(Relation relation, ItemPointer tid); extern TM_Result heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, LockTupleMode *lockmode, - TU_UpdateIndexes *update_indexes); + TU_UpdateIndexes *update_indexes, bool wal_logical); extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, bool follow_updates, @@ -411,6 +412,10 @@ extern HTSV_Result HeapTupleSatisfiesVacuumHorizon(HeapTuple htup, Buffer buffer TransactionId *dead_after); extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer, uint16 infomask, TransactionId xid); +extern bool HeapTupleMVCCInserted(HeapTuple htup, Snapshot snapshot, + Buffer buffer); +extern bool HeapTupleMVCCNotDeleted(HeapTuple htup, Snapshot snapshot, + Buffer buffer); extern bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple); extern bool HeapTupleIsSurelyDead(HeapTuple htup, struct GlobalVisState *vistest); diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 277df6b3cf0..8d4af07f840 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -104,6 +104,8 @@ #define XLH_DELETE_CONTAINS_OLD_KEY (1<<2) #define XLH_DELETE_IS_SUPER (1<<3) #define XLH_DELETE_IS_PARTITION_MOVE (1<<4) +/* See heap_delete() */ +#define XLH_DELETE_NO_LOGICAL (1<<5) /* convenience macro for checking whether any form of old tuple was logged */ #define XLH_DELETE_CONTAINS_OLD \ diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 8713e12cbfb..58356392895 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -21,6 +21,7 @@ #include "access/sdir.h" #include "access/xact.h" #include "executor/tuptable.h" +#include "replication/logical.h" #include "storage/read_stream.h" #include "utils/rel.h" #include "utils/snapshot.h" @@ -623,6 +624,8 @@ typedef struct TableAmRoutine Relation OldIndex, bool use_sort, TransactionId OldestXmin, + Snapshot snapshot, + LogicalDecodingContext *decoding_ctx, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, @@ -1627,6 +1630,10 @@ table_relation_copy_data(Relation rel, const RelFileLocator *newrlocator) * not needed for the relation's AM * - *xid_cutoff - ditto * - *multi_cutoff - ditto + * - snapshot - if != NULL, ignore data changes done by transactions that this + * (MVCC) snapshot considers still in-progress or in the future. + * - decoding_ctx - logical decoding context, to capture concurrent data + * changes. * * Output parameters: * - *xid_cutoff - rel's new relfrozenxid value, may be invalid @@ -1639,6 +1646,8 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable, Relation OldIndex, bool use_sort, TransactionId OldestXmin, + Snapshot snapshot, + LogicalDecodingContext *decoding_ctx, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, @@ -1647,6 +1656,7 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable, { OldTable->rd_tableam->relation_copy_for_cluster(OldTable, NewTable, OldIndex, use_sort, OldestXmin, + snapshot, decoding_ctx, xid_cutoff, multi_cutoff, num_tuples, tups_vacuumed, tups_recently_dead); diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index 4daa8bef5ee..66431cc19e5 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -100,6 +100,9 @@ extern Oid index_concurrently_create_copy(Relation heapRelation, Oid tablespaceOid, const char *newName); +extern NullableDatum *get_index_stattargets(Oid indexid, + IndexInfo *indInfo); + extern void index_concurrently_build(Oid heapRelationId, Oid indexRelationId); diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h index 3be57c97b3f..0a7e72bc74a 100644 --- a/src/include/commands/cluster.h +++ b/src/include/commands/cluster.h @@ -13,10 +13,15 @@ #ifndef CLUSTER_H #define CLUSTER_H +#include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" +#include "replication/logical.h" #include "storage/lock.h" +#include "storage/relfilelocator.h" #include "utils/relcache.h" +#include "utils/resowner.h" +#include "utils/tuplestore.h" /* flag bits for ClusterParams->options */ @@ -24,6 +29,7 @@ #define CLUOPT_RECHECK 0x02 /* recheck relation state */ #define CLUOPT_RECHECK_ISCLUSTERED 0x04 /* recheck relation state for * indisclustered */ +#define CLUOPT_CONCURRENT 0x08 /* allow concurrent data changes */ /* options for CLUSTER */ typedef struct ClusterParams @@ -46,13 +52,89 @@ typedef enum ClusterCommand CLUSTER_COMMAND_VACUUM } ClusterCommand; +/* + * The following definitions are used by REPACK CONCURRENTLY. + */ + +extern RelFileLocator repacked_rel_locator; +extern RelFileLocator repacked_rel_toast_locator; + +typedef enum +{ + CHANGE_INSERT, + CHANGE_UPDATE_OLD, + CHANGE_UPDATE_NEW, + CHANGE_DELETE, + CHANGE_TRUNCATE +} ConcurrentChangeKind; + +typedef struct ConcurrentChange +{ + /* See the enum above. */ + ConcurrentChangeKind kind; + + /* + * The actual tuple. + * + * The tuple data follows the ConcurrentChange structure. Before use make + * sure the tuple is correctly aligned (ConcurrentChange can be stored as + * bytea) and that tuple->t_data is fixed. + */ + HeapTupleData tup_data; +} ConcurrentChange; + +#define SizeOfConcurrentChange (offsetof(ConcurrentChange, tup_data) + \ + sizeof(HeapTupleData)) + +/* + * Logical decoding state. + * + * Here we store the data changes that we decode from WAL while the table + * contents is being copied to a new storage. Also the necessary metadata + * needed to apply these changes to the table is stored here. + */ +typedef struct RepackDecodingState +{ + /* The relation whose changes we're decoding. */ + Oid relid; + + /* + * Decoded changes are stored here. Although we try to avoid excessive + * batches, it can happen that the changes need to be stored to disk. The + * tuplestore does this transparently. + */ + Tuplestorestate *tstore; + + /* The current number of changes in tstore. */ + double nchanges; + + /* + * Descriptor to store the ConcurrentChange structure serialized (bytea). + * We can't store the tuple directly because tuplestore only supports + * minimum tuple and we may need to transfer OID system column from the + * output plugin. Also we need to transfer the change kind, so it's better + * to put everything in the structure than to use 2 tuplestores "in + * parallel". + */ + TupleDesc tupdesc_change; + + /* Tuple descriptor needed to update indexes. */ + TupleDesc tupdesc; + + /* Slot to retrieve data from tstore. */ + TupleTableSlot *tsslot; + + ResourceOwner resowner; +} RepackDecodingState; + extern void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel); extern void cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, - ClusterCommand cmd); + ClusterCommand cmd, bool isTopLevel); extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode); extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal); - +extern void repack_decode_concurrent_changes(LogicalDecodingContext *ctx, + XLogRecPtr end_of_wal); extern Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode); extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, @@ -60,6 +142,7 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool swap_toast_by_content, bool check_constraints, bool is_internal, + bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence); diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h index f92ff524031..4cbf4d16529 100644 --- a/src/include/commands/progress.h +++ b/src/include/commands/progress.h @@ -59,18 +59,20 @@ /* * Progress parameters for REPACK. * - * Note: Since REPACK shares some code with CLUSTER, these values are also - * used by CLUSTER. (CLUSTER is now deprecated, so it makes little sense to - * introduce a separate set of constants.) + * Note: Since REPACK shares some code with CLUSTER, (some of) these values + * are also used by CLUSTER. (CLUSTER is now deprecated, so it makes little + * sense to introduce a separate set of constants.) */ #define PROGRESS_REPACK_COMMAND 0 #define PROGRESS_REPACK_PHASE 1 #define PROGRESS_REPACK_INDEX_RELID 2 #define PROGRESS_REPACK_HEAP_TUPLES_SCANNED 3 -#define PROGRESS_REPACK_HEAP_TUPLES_WRITTEN 4 -#define PROGRESS_REPACK_TOTAL_HEAP_BLKS 5 -#define PROGRESS_REPACK_HEAP_BLKS_SCANNED 6 -#define PROGRESS_REPACK_INDEX_REBUILD_COUNT 7 +#define PROGRESS_REPACK_HEAP_TUPLES_INSERTED 4 +#define PROGRESS_REPACK_HEAP_TUPLES_UPDATED 5 +#define PROGRESS_REPACK_HEAP_TUPLES_DELETED 6 +#define PROGRESS_REPACK_TOTAL_HEAP_BLKS 7 +#define PROGRESS_REPACK_HEAP_BLKS_SCANNED 8 +#define PROGRESS_REPACK_INDEX_REBUILD_COUNT 9 /* * Phases of repack (as advertised via PROGRESS_REPACK_PHASE). @@ -83,9 +85,10 @@ #define PROGRESS_REPACK_PHASE_INDEX_SCAN_HEAP 2 #define PROGRESS_REPACK_PHASE_SORT_TUPLES 3 #define PROGRESS_REPACK_PHASE_WRITE_NEW_HEAP 4 -#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES 5 -#define PROGRESS_REPACK_PHASE_REBUILD_INDEX 6 -#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP 7 +#define PROGRESS_REPACK_PHASE_CATCH_UP 5 +#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES 6 +#define PROGRESS_REPACK_PHASE_REBUILD_INDEX 7 +#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP 8 /* * Commands of PROGRESS_REPACK diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 648484205cb..d12827f4b5e 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3933,6 +3933,7 @@ typedef struct RepackStmt RangeVar *relation; /* relation being repacked */ char *indexname; /* order tuples by this index */ List *params; /* list of DefElem nodes */ + bool concurrent; /* allow concurrent access? */ } RepackStmt; diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 6d4d2d1814c..802fc4b0823 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -73,6 +73,7 @@ extern void FreeSnapshotBuilder(SnapBuild *builder); extern void SnapBuildSnapDecRefcount(Snapshot snap); extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder); +extern Snapshot SnapBuildInitialSnapshotForRepack(SnapBuild *builder); extern Snapshot SnapBuildMVCCFromHistoric(Snapshot snapshot, bool in_place); extern const char *SnapBuildExportSnapshot(SnapBuild *builder); extern void SnapBuildClearExportedSnapshot(void); diff --git a/src/include/storage/lockdefs.h b/src/include/storage/lockdefs.h index 7f3ba0352f6..2739327b0da 100644 --- a/src/include/storage/lockdefs.h +++ b/src/include/storage/lockdefs.h @@ -36,8 +36,8 @@ typedef int LOCKMODE; #define AccessShareLock 1 /* SELECT */ #define RowShareLock 2 /* SELECT FOR UPDATE/FOR SHARE */ #define RowExclusiveLock 3 /* INSERT, UPDATE, DELETE */ -#define ShareUpdateExclusiveLock 4 /* VACUUM (non-FULL), ANALYZE, CREATE - * INDEX CONCURRENTLY */ +#define ShareUpdateExclusiveLock 4 /* VACUUM (non-exclusive), ANALYZE, CREATE + * INDEX CONCURRENTLY, REPACK CONCURRENTLY */ #define ShareLock 5 /* CREATE INDEX (WITHOUT CONCURRENTLY) */ #define ShareRowExclusiveLock 6 /* like EXCLUSIVE MODE, but allows ROW * SHARE */ diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index a9681738146..9bb2f7ae1a8 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry) PG_LWLOCK(51, InjectionPoint) PG_LWLOCK(52, SerialControl) PG_LWLOCK(53, AioWorkerSubmissionQueue) +PG_LWLOCK(54, RepackedRels) diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 147b190210a..5eeabdc6c4f 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -61,6 +61,8 @@ extern Snapshot GetLatestSnapshot(void); extern void SnapshotSetCommandId(CommandId curcid); extern Snapshot CopySnapshot(Snapshot snapshot); +extern void FreeSnapshot(Snapshot snapshot); + extern Snapshot GetCatalogSnapshot(Oid relid); extern Snapshot GetNonHistoricCatalogSnapshot(Oid relid); extern void InvalidateCatalogSnapshot(void); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 328235044d9..ebaf8fdd268 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1990,17 +1990,17 @@ pg_stat_progress_cluster| SELECT s.pid, WHEN 2 THEN 'index scanning heap'::text WHEN 3 THEN 'sorting tuples'::text WHEN 4 THEN 'writing new heap'::text - WHEN 5 THEN 'swapping relation files'::text - WHEN 6 THEN 'rebuilding index'::text - WHEN 7 THEN 'performing final cleanup'::text + WHEN 6 THEN 'swapping relation files'::text + WHEN 7 THEN 'rebuilding index'::text + WHEN 8 THEN 'performing final cleanup'::text ELSE NULL::text END AS phase, (s.param3)::oid AS cluster_index_relid, s.param4 AS heap_tuples_scanned, s.param5 AS heap_tuples_written, - s.param6 AS heap_blks_total, - s.param7 AS heap_blks_scanned, - s.param8 AS index_rebuild_count + s.param8 AS heap_blks_total, + s.param9 AS heap_blks_scanned, + s.param10 AS index_rebuild_count FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_progress_copy| SELECT s.pid, @@ -2072,17 +2072,20 @@ pg_stat_progress_repack| SELECT s.pid, WHEN 2 THEN 'index scanning heap'::text WHEN 3 THEN 'sorting tuples'::text WHEN 4 THEN 'writing new heap'::text - WHEN 5 THEN 'swapping relation files'::text - WHEN 6 THEN 'rebuilding index'::text - WHEN 7 THEN 'performing final cleanup'::text + WHEN 5 THEN 'catch-up'::text + WHEN 6 THEN 'swapping relation files'::text + WHEN 7 THEN 'rebuilding index'::text + WHEN 8 THEN 'performing final cleanup'::text ELSE NULL::text END AS phase, (s.param3)::oid AS repack_index_relid, s.param4 AS heap_tuples_scanned, - s.param5 AS heap_tuples_written, - s.param6 AS heap_blks_total, - s.param7 AS heap_blks_scanned, - s.param8 AS index_rebuild_count + s.param5 AS heap_tuples_inserted, + s.param6 AS heap_tuples_updated, + s.param7 AS heap_tuples_deleted, + s.param8 AS heap_blks_total, + s.param9 AS heap_blks_scanned, + s.param10 AS index_rebuild_count FROM (pg_stat_get_progress_info('REPACK'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_progress_vacuum| SELECT s.pid, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index bc2176b62ec..6bbc8b419f8 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -486,6 +486,8 @@ CompressFileHandle CompressionLocation CompressorState ComputeXidHorizonsResult +ConcurrentChange +ConcurrentChangeKind ConditionVariable ConditionVariableMinimallyPadded ConditionalStack @@ -1252,6 +1254,7 @@ IndexElem IndexFetchHeapData IndexFetchTableData IndexInfo +IndexInsertState IndexList IndexOnlyScan IndexOnlyScanState @@ -2526,6 +2529,7 @@ ReorderBufferTupleCidKey ReorderBufferUpdateProgressTxnCB ReorderTuple RepOriginId +RepackDecodingState RepackStmt ReparameterizeForeignPathByChild_function ReplaceVarsFromTargetList_context -- 2.43.5